
How do you perform a basic join of two RDD tables in Spark using Python? (joining two RDDs in Spark)



This article explains how to perform a basic join of two RDD tables in Spark using Python (joining two RDDs), and also covers: the Scala PythonRDD class behind pyspark, counting distinct texts in a Spark RDD of array objects with Python, using Spark intersection() by key or filter() with two RDDs in Scala, and creating a Spark RDD from Accumulo 1.6 in Spark-notebook.

Contents:

  1. How do you perform a basic join of two RDD tables in Spark using Python?
  2. The Scala code behind pyspark: the PythonRDD class
  3. python – Counting distinct texts in a Spark RDD of array objects
  4. scala – How to use spark intersection() by key, or filter(), with two RDDs?
  5. scala – How to create a Spark RDD from Accumulo 1.6 in Spark-notebook?

How do you perform a basic join of two RDD tables in Spark using Python?

How would you perform a basic join in Spark using Python? In R you can do this with merge(). What is the syntax for doing it with Python on Spark for:

  1. An inner join
  2. A left outer join
  3. A cross join

Given two tables (RDDs), each with a single column that holds a common key.

RDD(1): (key, U)
RDD(2): (key, V)

I think an inner join would be something like this:

rdd1.join(rdd2).map(case (key, u, v) => (key, ls ++ rs));

Is that right? I have searched the internet and cannot find a good example of a join. Thanks in advance.

Answer 1


This can be done either with PairRDDFunctions or with Spark DataFrames. Since DataFrame operations benefit from the Catalyst optimizer, the second option is worth considering.

Assuming your data looks like this:

rdd1 = sc.parallelize([("foo", 1), ("bar", 2), ("baz", 3)])
rdd2 = sc.parallelize([("foo", 4), ("bar", 5), ("bar", 6)])

Using PairRDDs:

Inner join:

rdd1.join(rdd2)

Left outer join:

rdd1.leftOuterJoin(rdd2)

Cartesian product (does not require RDD[(T, U)]):

rdd1.cartesian(rdd2)

Broadcast join (does not require RDD[(T, U)]; a minimal sketch follows the reference below):

  • See Spark: what is the best strategy for joining a 2-tuple-key RDD with a single-key RDD?
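
The linked approach boils down to collecting the smaller RDD on the driver and shipping it to the executors as a broadcast variable. A minimal sketch, assuming the rdd1/rdd2 defined above and that rdd2 is small enough to collect:

from collections import defaultdict

# Collect the small side on the driver and broadcast it as a key -> list-of-values dict.
small = defaultdict(list)
for k, v in rdd2.collect():
    small[k].append(v)
bsmall = sc.broadcast(dict(small))

# Map-side (broadcast) inner join: rdd1 is never shuffled.
joined = rdd1.flatMap(
    lambda kv: [(kv[0], (kv[1], w)) for w in bsmall.value.get(kv[0], [])])

joined.collect()
## [('foo', (1, 4)), ('bar', (2, 5)), ('bar', (2, 6))]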

Finally, there is cogroup, which has no direct SQL equivalent but can be useful in some situations:

cogrouped = rdd1.cogroup(rdd2)
cogrouped.mapValues(lambda x: (list(x[0]), list(x[1]))).collect()
## [('foo', ([1], [4])), ('bar', ([2], [5, 6])), ('baz', ([3], []))]

Using Spark DataFrames

You can either use the SQL DSL or execute raw SQL via sqlContext.sql.

df1 = spark.createDataFrame(rdd1, ('k', 'v1'))
df2 = spark.createDataFrame(rdd2, ('k', 'v2'))

# Register temporary tables to be able to use `sparkSession.sql`
df1.createOrReplaceTempView('df1')
df2.createOrReplaceTempView('df2')

Inner join:

# inner is a default value so it could be omitted
df1.join(df2, df1.k == df2.k, how='inner')
spark.sql('SELECT * FROM df1 JOIN df2 ON df1.k = df2.k')

Left outer join:

df1.join(df2, df1.k == df2.k, how='left_outer')
spark.sql('SELECT * FROM df1 LEFT OUTER JOIN df2 ON df1.k = df2.k')

Cross join (an explicit cross join or a configuration change is required in Spark 2.0; spark.sql.crossJoin.enabled for Spark 2.x):

df1.crossJoin(df2)
spark.sql('SELECT * FROM df1 CROSS JOIN df2')

In earlier versions (Spark 1.6, or 2.x with the configuration above enabled), the same result can also be produced with a join that has no condition:

df1.join(df2)
sqlContext.sql('SELECT * FROM df1 JOIN df2')

Since 1.6 (1.5 in Scala), each of these can be combined with the broadcast function:

from pyspark.sql.functions import broadcast

df1.join(broadcast(df2), df1.k == df2.k)

to perform a broadcast join. See also Why my BroadcastHashJoin is slower than ShuffledHashJoin in Spark.
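
If you want to confirm that the broadcast hint is actually picked up, one option is to inspect the physical plan with explain(); the exact plan text varies by Spark version, but the hinted join should show a broadcast join (e.g. BroadcastHashJoin):

from pyspark.sql.functions import broadcast

# With the hint, the plan should contain a broadcast join; without it,
# Spark may choose a shuffle-based join instead.
df1.join(broadcast(df2), df1.k == df2.k).explain()
df1.join(df2, df1.k == df2.k).explain()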

The Scala code behind pyspark: the PythonRDD class

The Scala code on the JVM side of pyspark: PythonRDD

The code shown is from Spark 2.2.0.

1.PythonRDD.class

This RDD type is the key to how Python plugs into Spark.

//This is a standard RDD implementation: it provides the usual compute, partitioner, getPartitions, etc.
//This PythonRDD is what the _jrdd property of pyspark's PipelinedRDD returns
//parent is the _prev_jrdd passed in by PipelinedRDD, i.e. the original data-source RDD
private[spark] class PythonRDD(
    parent: RDD[_],  //this parent RDD is the key: every data source Python reads through Spark comes from here
    func: PythonFunction, //the user-defined Python computation logic
    preservePartitoning: Boolean)
  extends RDD[Array[Byte]](parent) {

  val bufferSize = conf.getInt("spark.buffer.size", 65536)
  val reuse_worker = conf.getBoolean("spark.python.worker.reuse", true)

  override def getPartitions: Array[Partition] = firstParent.partitions

  override val partitioner: Option[Partitioner] = {
    if (preservePartitoning) firstParent.partitioner else None
  }

  val asJavaRDD: JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)

  override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
    //call PythonRunner to execute the task logic here
    //note: this PythonRunner is not the same thing as the PythonRunner used by spark-submit
    val runner = PythonRunner(func, bufferSize, reuse_worker)
    //run the runner's computation; the first argument is the computed result of the data-source RDD
    //firstParent.iterator triggers the computation of the parent RDD and returns its result
    //the RDD behind this first argument is the same thing as _jrdd on the pyspark RDD
    runner.compute(firstParent.iterator(split, context), split.index, context)
  }
}
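
On the Python side this correspondence can be observed directly: a transformed pyspark RDD is a PipelinedRDD, and its _jrdd property is the JVM-side object described above, exposed as a JavaRDD[Array[Byte]]. A small illustrative sketch, assuming an existing SparkContext sc:

from pyspark.rdd import PipelinedRDD

rdd = sc.parallelize(range(4)).map(lambda x: x + 1)  # map() produces a PipelinedRDD
print(isinstance(rdd, PipelinedRDD))                 # True
print(rdd._jrdd)                                     # the JVM-side RDD of serialized byte arrays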

2.PythonRunner.class

This class is the one that actually performs the computation inside the RDD; it is not the PythonRunner that starts py4j when the code is submitted.

/*
 * This class does three things:
 * 1. starts pyspark.daemon, which receives tasks and launches workers to execute them
 * 2. starts a WriterThread that writes the data source's computed results to the pyspark worker
 * 3. pulls the execution results back from the pyspark worker
 * 
 * The data written by the WriterThread is the result computed by _jrdd in pyspark, i.e. the data-source RDD's data
 */
private[spark] class PythonRunner(
    funcs: Seq[ChainedPythonFunctions],
    bufferSize: Int,
    reuse_worker: Boolean,
    isUDF: Boolean,
    argOffsets: Array[Array[Int]])
  extends Logging {

  require(funcs.length == argOffsets.length, "argOffsets should have the same length as funcs")

  //the environment and command used to execute Python
  private val envVars = funcs.head.funcs.head.envVars
  private val pythonExec = funcs.head.funcs.head.pythonExec
  private val pythonVer = funcs.head.funcs.head.pythonVer

  private val accumulator = funcs.head.funcs.head.accumulator

  def compute(
      inputIterator: Iterator[_],
      partitionIndex: Int,
      context: TaskContext): Iterator[Array[Byte]] = {
    val startTime = System.currentTimeMillis
    val env = SparkEnv.get
    val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",")
    envVars.put("SPARK_LOCAL_DIRS", localdir) // it''s also used in monitor thread
    if (reuse_worker) {
      envVars.put("SPARK_REUSE_WORKER", "1")
    }
    
    //create the pyspark worker process; under the hood this runs pyspark.daemon
    //this method guarantees that only one pyspark.daemon is started per task
    //the return value is the socket used to communicate with the worker
    //a detailed analysis is recorded elsewhere
    val worker: Socket = env.createPythonWorker(pythonExec, envVars.asScala.toMap)
    @volatile var released = false

    // create the WriterThread, which writes the source data to the socket and sends it to the pyspark worker
    val writerThread = new WriterThread(env, worker, inputIterator, partitionIndex, context)

    //register a task-completion listener; when the task finishes, stop the WriterThread
    context.addTaskCompletionListener { context =>
      writerThread.shutdownOnTaskCompletion()
      if (!reuse_worker || !released) {
        try {
          worker.close()
        } catch {
          case e: Exception =>
            logWarning("Failed to close worker socket", e)
        }
      }
    }

    writerThread.start()
    new MonitorThread(env, worker, context).start()

    val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
    // create the iterator that pulls the execution results back from the pyspark worker
    val stdoutIterator = new Iterator[Array[Byte]] {
      override def next(): Array[Byte] = {
        val obj = _nextObj
        if (hasNext) {
          _nextObj = read()
        }
        obj
      }

      private def read(): Array[Byte] = {
        if (writerThread.exception.isDefined) {
          throw writerThread.exception.get
        }
        try {
          stream.readInt() match {
            case length if length > 0 =>
              val obj = new Array[Byte](length)
              stream.readFully(obj)
              obj
            case 0 => Array.empty[Byte]
            case SpecialLengths.TIMING_DATA =>
              // Timing data from worker
              val bootTime = stream.readLong()
              val initTime = stream.readLong()
              val finishTime = stream.readLong()
              val boot = bootTime - startTime
              val init = initTime - bootTime
              val finish = finishTime - initTime
              val total = finishTime - startTime
              logInfo("Times: total = %s, boot = %s, init = %s, finish = %s".format(total, boot,
                init, finish))
              val memoryBytesSpilled = stream.readLong()
              val diskBytesSpilled = stream.readLong()
              context.taskMetrics.incMemoryBytesSpilled(memoryBytesSpilled)
              context.taskMetrics.incDiskBytesSpilled(diskBytesSpilled)
              read()
            case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
              // Signals that an exception has been thrown in python
              val exLength = stream.readInt()
              val obj = new Array[Byte](exLength)
              stream.readFully(obj)
              throw new PythonException(new String(obj, StandardCharsets.UTF_8),
                writerThread.exception.getOrElse(null))
            case SpecialLengths.END_OF_DATA_SECTION =>
              // We've finished the data section of the output, but we can still
              // read some accumulator updates:
              val numAccumulatorUpdates = stream.readInt()
              (1 to numAccumulatorUpdates).foreach { _ =>
                val updateLen = stream.readInt()
                val update = new Array[Byte](updateLen)
                stream.readFully(update)
                accumulator.add(update)
              }
              // Check whether the worker is ready to be re-used.
              if (stream.readInt() == SpecialLengths.END_OF_STREAM) {
                if (reuse_worker) {
                  env.releasePythonWorker(pythonExec, envVars.asScala.toMap, worker)
                  released = true
                }
              }
              null
          }
        } catch {

          case e: Exception if context.isInterrupted =>
            logDebug("Exception thrown after task interruption", e)
            throw new TaskKilledException(context.getKillReason().getOrElse("unknown reason"))

          case e: Exception if env.isStopped =>
            logDebug("Exception thrown after context is stopped", e)
            null  // exit silently

          case e: Exception if writerThread.exception.isDefined =>
            logError("Python worker exited unexpectedly (crashed)", e)
            logError("This may have been caused by a prior exception:", writerThread.exception.get)
            throw writerThread.exception.get

          case eof: EOFException =>
            throw new SparkException("Python worker exited unexpectedly (crashed)", eof)
        }
      }

      var _nextObj = read()

      override def hasNext: Boolean = _nextObj != null
    }
    //return this iterator that pulls back the result data
    new InterruptibleIterator(context, stdoutIterator)
  }

  /**
   * Implementation of the WriterThread.
   */
  class WriterThread(
      env: SparkEnv,
      worker: Socket,
      inputIterator: Iterator[_],
      partitionIndex: Int,
      context: TaskContext)
    extends Thread(s"stdout writer for $pythonExec") {

    @volatile private var _exception: Exception = null

    private val pythonIncludes = funcs.flatMap(_.funcs.flatMap(_.pythonIncludes.asScala)).toSet
    private val broadcastVars = funcs.flatMap(_.funcs.flatMap(_.broadcastVars.asScala))

    setDaemon(true)

    /** Contains the exception thrown while writing the parent iterator to the Python process. */
    def exception: Option[Exception] = Option(_exception)

    /** Terminates the writer thread, ignoring any exceptions that may occur due to cleanup. */
    def shutdownOnTaskCompletion() {
      assert(context.isCompleted)
      this.interrupt()
    }

    // The main logic lives in run(): it writes the data-source RDD's computed results,
    // the broadcast variables, the environment and the serialized Python code,
    // and the input data to be processed, into the worker
    override def run(): Unit = Utils.logUncaughtExceptions {
      try {
        TaskContext.setTaskContext(context)
        val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)
        val dataOut = new DataOutputStream(stream)
        // Partition index
        dataOut.writeInt(partitionIndex)
        // Python version of driver
        PythonRDD.writeUTF(pythonVer, dataOut)
        // Write out the TaskContextInfo
        dataOut.writeInt(context.stageId())
        dataOut.writeInt(context.partitionId())
        dataOut.writeInt(context.attemptNumber())
        dataOut.writeLong(context.taskAttemptId())
        // sparkFilesDir
        PythonRDD.writeUTF(SparkFiles.getRootDirectory(), dataOut)
        // Python includes (*.zip and *.egg files)
        dataOut.writeInt(pythonIncludes.size)
        for (include <- pythonIncludes) {
          PythonRDD.writeUTF(include, dataOut)
        }
        // Broadcast variables
        val oldBids = PythonRDD.getWorkerBroadcasts(worker)
        val newBids = broadcastVars.map(_.id).toSet
        // number of different broadcasts
        val toRemove = oldBids.diff(newBids)
        val cnt = toRemove.size + newBids.diff(oldBids).size
        dataOut.writeInt(cnt)
        for (bid <- toRemove) {
          // remove the broadcast from worker
          dataOut.writeLong(- bid - 1)  // bid >= 0
          oldBids.remove(bid)
        }
        for (broadcast <- broadcastVars) {
          if (!oldBids.contains(broadcast.id)) {
            // send new broadcast
            dataOut.writeLong(broadcast.id)
            PythonRDD.writeUTF(broadcast.value.path, dataOut)
            oldBids.add(broadcast.id)
          }
        }
        dataOut.flush()
        // Serialized command:
        if (isUDF) {
          dataOut.writeInt(1)
          dataOut.writeInt(funcs.length)
          funcs.zip(argOffsets).foreach { case (chained, offsets) =>
            dataOut.writeInt(offsets.length)
            offsets.foreach { offset =>
              dataOut.writeInt(offset)
            }
            dataOut.writeInt(chained.funcs.length)
            chained.funcs.foreach { f =>
              dataOut.writeInt(f.command.length)
              dataOut.write(f.command)
            }
          }
        } else {
          dataOut.writeInt(0)
          val command = funcs.head.funcs.head.command
          dataOut.writeInt(command.length)
          dataOut.write(command)
        }
        // Data values
        PythonRDD.writeIteratorToStream(inputIterator, dataOut)
        dataOut.writeInt(SpecialLengths.END_OF_DATA_SECTION)
        dataOut.writeInt(SpecialLengths.END_OF_STREAM)
        dataOut.flush()
      } catch {
        case e: Exception if context.isCompleted || context.isInterrupted =>
          logDebug("Exception thrown after task completion (likely due to cleanup)", e)
          if (!worker.isClosed) {
            Utils.tryLog(worker.shutdownOutput())
          }

        case e: Exception =>
          // We must avoid throwing exceptions here, because the thread uncaught exception handler
          // will kill the whole executor (see org.apache.spark.executor.Executor).
          _exception = e
          if (!worker.isClosed) {
            Utils.tryLog(worker.shutdownOutput())
          }
      }
    }
  }

  // monitors whether the task is still running
  class MonitorThread(env: SparkEnv, worker: Socket, context: TaskContext)
    extends Thread(s"Worker Monitor for $pythonExec") {

    setDaemon(true)

    override def run() {
      // Kill the worker if it is interrupted, checking until task completion.
      // TODO: This has a race condition if interruption occurs, as completed may still become true.
      while (!context.isInterrupted && !context.isCompleted) {
        Thread.sleep(2000)
      }
      if (!context.isCompleted) {
        try {
          logWarning("Incomplete task interrupted: Attempting to kill Python Worker")
          env.destroyPythonWorker(pythonExec, envVars.asScala.toMap, worker)
        } catch {
          case e: Exception =>
            logError("Exception when trying to kill worker", e)
        }
      }
    }
  }
}

python – Counting distinct texts in a Spark RDD of array objects

I have a Spark RDD (words) consisting of arrays of texts. For example,

words.take(3)

would return something like

[ ["A","B"],["B","C"],["C","A","D"] ]

Now I want to find the total number of texts as well as the number of unique texts. If the RDD has only the 3 records above,

total_words = 7
unique_words = 4 (only A,B,C,D)

To get the total, I did something like

text_count_rdd = words.map(lambda x: len(x))
text_count_rdd.sum()

But I am still stuck on how to retrieve the unique count.

Solution

Just flatMap, take distinct and count:

words.flatMap(set).distinct().count()
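
Putting both counts together on the sample data, a minimal sketch (assuming an existing SparkContext sc) looks like this:

words = sc.parallelize([["A", "B"], ["B", "C"], ["C", "A", "D"]])

total_words = words.map(len).sum()                    # 7
unique_words = words.flatMap(set).distinct().count()  # 4 -> A, B, C, D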

scala – How to use spark intersection() by key, or filter(), with two RDDs?

I want to use intersection() by key, or filter(), in Spark.

But I don't really know how to use intersection() by key.

So I tried filter(), but it didn't work.

Example – here are the two RDDs:

data1 //RDD[(String,Int)] = Array(("a",1),("a",2),("b",3),("c",1))
data2 //RDD[(String,Int)] = Array(("a",3),("b",5))

val data3 = data2.map{_._1}

data1.filter{_._1 == data3}.collect //Array[(String,Int)] = Array()

I want to get the (key, value) pairs from data1 whose keys also appear in data2.

Array(("a",1), ("a",2), ("b",3)) is the result I want.

Is there a way to solve this using intersection() by key, or filter()?

Solution

This can be achieved in different ways

1. A broadcast variable in filter() – limited scalability

val data1 = sc.parallelize(Seq(("a",1),("a",2),("b",3),("c",1)))
val data2 = sc.parallelize(Seq(("a",3),("b",5)))

// broadcast data2's key list for use in the filter method, which runs on the executor nodes
val bcast = sc.broadcast(data2.map(_._1).collect())

val result = data1.filter(r => bcast.value.contains(r._1))


println(result.collect().toList)
//Output
List((a,1), (a,2), (b,3))

2. cogroup (similar to grouping by key)

val data1 = sc.parallelize(Seq(("a",1),("a",2),("b",3),("c",1)))
val data2 = sc.parallelize(Seq(("a",3),("b",5)))

val cogroupRdd: RDD[(String,(Iterable[Int],Iterable[Int]))] = data1.cogroup(data2)
/* List(
  (a,(CompactBuffer(1, 2),CompactBuffer(3))),
  (b,(CompactBuffer(3),CompactBuffer(5))),
  (c,(CompactBuffer(1),CompactBuffer()))
) */

//Now filter keys which have two non empty CompactBuffer. You can do that with 
//filter(row => row._2._1.nonEmpty && row._2._2.nonEmpty) also. 
val filterRdd = cogroupRdd.filter { case (k,(v1,v2)) => v1.nonEmpty && v2.nonEmpty } 
/* List(
  (a,(CompactBuffer(1, 2),CompactBuffer(3))),
  (b,(CompactBuffer(3),CompactBuffer(5)))
) */

//As we care about the first dataset's values only, let's pick the first compact buffer only
// by doing v1.map(val1 => (k,val1))
val result = filterRdd.flatMap { case (k,(v1,v2)) => v1.map(val1 => (k,val1)) }
//List((a,1), (a,2), (b,3))

3. Using an inner join

val resultRdd = data1.join(data2).map(r => (r._1,r._2._1)).distinct()
//List((a,1), (a,2), (b,3))

Here data1.join(data2) keeps the pairs that have a common key (an inner join):

//List((a,(1,3)), (a,(2,3)), (b,(3,5)))

scala – How to create a Spark RDD from Accumulo 1.6 in Spark-notebook?

I have a Vagrant image running Spark Notebook, Spark, Accumulo 1.6 and Hadoop. From the notebook I can manually create a Scanner and pull test data from a table I created using one of the Accumulo examples:

val instanceNameS = "accumulo"
val zooServeRSS = "localhost:2181"
val instance: Instance = new ZooKeeperInstance(instanceNameS,zooServeRSS)
val connector: Connector = instance.getConnector( "root",new PasswordToken("password"))
val auths = new Authorizations("exampleVis")
val scanner = connector.createScanner("batchtest1",auths)

scanner.setRange(new Range("row_0000000000","row_0000000010"))

for(entry: Entry[Key,Value] <- scanner) {
  println(entry.getKey + " is " + entry.getValue)
}

will give the first ten rows of table data.

When I try to create the RDD:

val rdd2 = 
  sparkContext.newAPIHadoopRDD (
    new Configuration(),
    classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],
    classOf[org.apache.accumulo.core.data.Key],
    classOf[org.apache.accumulo.core.data.Value]
  )

I get an RDD back, but I can't do much with it because of the following error:

java.io.IOException: Input info has not been set. at
org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630)
at
org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.validateOptions(AbstractInputFormat.java:343)
at
org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.getSplits(AbstractInputFormat.java:538)
at
org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
at scala.Option.getorElse(Option.scala:120) at
org.apache.spark.rdd.RDD.partitions(RDD.scala:220) at
org.apache.spark.SparkContext.runJob(SparkContext.scala:1367) at
org.apache.spark.rdd.RDD.count(RDD.scala:927)

which makes perfect sense, given that I haven't specified any parameters about which table to connect to, what the auths are, and so on.

So my question is: what do I need to do from here to get the first ten rows of table data into my RDD?

Update 1
Still not working, but I did discover something. It turns out there are two nearly identical packages,

org.apache.accumulo.core.client.mapreduce

&安培;

org.apache.accumulo.core.client.mapred

Both have almost identical members, except for the fact that some method signatures differ. Not sure why both exist, since there is no deprecation notice that I can see. I tried to implement Sietse's answer with no joy. Here is what I did, and the responses:

import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.conf.Configuration
val jobConf = new JobConf(new Configuration)

import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.conf.Configuration

jobConf: org.apache.hadoop.mapred.JobConf = Configuration: core-default.xml,
core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml,
yarn-site.xml

Configuration: core-default.xml,core-site.xml,
mapred-site.xml,yarn-site.xml

AbstractInputFormat.setConnectorInfo(jobConf,"root",new PasswordToken("password")

AbstractInputFormat.setScanAuthorizations(jobConf,auths)

AbstractInputFormat.setZooKeeperInstance(jobConf,new ClientConfiguration)

val rdd2 = 
  sparkContext.hadoopRDD (
    jobConf,
    classOf[org.apache.accumulo.core.client.mapred.AccumuloInputFormat],
    classOf[org.apache.accumulo.core.data.Key],
    classOf[org.apache.accumulo.core.data.Value],
    1
  )

rdd2: org.apache.spark.rdd.RDD[(org.apache.accumulo.core.data.Key,
org.apache.accumulo.core.data.Value)] = HadoopRDD[1] at hadoopRDD at
:62

rdd2.first

java.io.IOException: Input info has not been set. at
org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630)
at
org.apache.accumulo.core.client.mapred.AbstractInputFormat.validateOptions(AbstractInputFormat.java:308)
at
org.apache.accumulo.core.client.mapred.AbstractInputFormat.getSplits(AbstractInputFormat.java:505)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
at scala.Option.getorElse(Option.scala:120) at
org.apache.spark.rdd.RDD.partitions(RDD.scala:220) at
org.apache.spark.rdd.RDD.take(RDD.scala:1077) at
org.apache.spark.rdd.RDD.first(RDD.scala:1110) at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:64)
at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:69)
at…

Edit 2

re: Holden's answer – still no joy:

AbstractInputFormat.setConnectorInfo(jobConf,new PasswordToken("password")
    AbstractInputFormat.setScanAuthorizations(jobConf,auths)
    AbstractInputFormat.setZooKeeperInstance(jobConf,new ClientConfiguration)
    InputFormatBase.setInputTableName(jobConf,"batchtest1")
    val rddX = sparkContext.newAPIHadoopRDD(
      jobConf,
      classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],
      classOf[org.apache.accumulo.core.data.Key],
      classOf[org.apache.accumulo.core.data.Value]
      )

rddX: org.apache.spark.rdd.RDD[(org.apache.accumulo.core.data.Key,
org.apache.accumulo.core.data.Value)] = NewHadoopRDD[0] at
newAPIHadoopRDD at :58

Out[15]: NewHadoopRDD[0] at newAPIHadoopRDD at :58

rddX.first

java.io.IOException: Input info has not been set. at
org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630)
at
org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.validateOptions(AbstractInputFormat.java:343)
at
org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.getSplits(AbstractInputFormat.java:538)
at
org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
at scala.Option.getorElse(Option.scala:120) at
org.apache.spark.rdd.RDD.partitions(RDD.scala:220) at
org.apache.spark.rdd.RDD.take(RDD.scala:1077) at
org.apache.spark.rdd.RDD.first(RDD.scala:1110) at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:61)
at

Edit 3 – progress!

I was able to figure out why the 'Input info has not been set' error was happening. The eagle-eyed among you will no doubt see that the following code is missing a closing ')':

AbstractInputFormat.setConnectorInfo(jobConf,new PasswordToken("password")

Because I was doing this in spark-notebook, I kept clicking the execute button and moving on, since I didn't see an error. What I forgot is that when you leave off a closing ')', the notebook does what spark-shell does – it waits forever for you to add it. So the error was the result of the 'setConnectorInfo' method never being executed.

Unfortunately, I still can't get the Accumulo table data into an RDD that is usable to me. When I execute

rddX.count

I get back

res15: Long = 10000

which is the correct response – there are 10,000 rows of data in the table I pointed to. However, when I try to grab the first element of the data:

rddX.first

I get the following error:

org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0.0 in stage 0.0 (TID 0) had a not serializable result:
org.apache.accumulo.core.data.Key

Any ideas on where to go from here?

Edit 4 – success!

The accepted answer plus its comments got me 90% of the way there – except for the fact that the Accumulo Key/Value need to be cast into something serializable. I accomplished this by calling the .toString() method on both. I'll try to post some full working code as soon as I can, in case anyone else runs into the same issue.

Solution

Generally with custom Hadoop InputFormats, the information is specified using a JobConf. As @Sietse pointed out, there are some static methods on AccumuloInputFormat that you can use to configure the JobConf. In this case, I think what you want to do is:

val jobConf = new JobConf() // Create a job conf
// Configure the job conf with our accumulo properties
AccumuloInputFormat.setConnectorInfo(jobConf,principal,token)
AccumuloInputFormat.setScanAuthorizations(jobConf,authorizations)
val clientConfig =  new ClientConfiguration().withInstance(instanceName).withZkHosts(zooKeepers)
AccumuloInputFormat.setZooKeeperInstance(jobConf,clientConfig)
AccumuloInputFormat.setInputTableName(jobConf,tableName)
// Create an RDD using the jobConf
val rdd2 = sc.newAPIHadoopRDD(jobConf,
  classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],
  classOf[org.apache.accumulo.core.data.Key],
  classOf[org.apache.accumulo.core.data.Value]
)

Note: after digging into the code a bit, it seems the configuration properties are set based in part on the class being called (deliberately, to avoid potential conflicts with other packages), so when we go to the concrete class later it can't find the configured flag. The solution is not to use the Abstract classes. See https://github.com/apache/accumulo/blob/bf102d0711103e903afa0589500f5796ad51c366/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/ConfiguratorBase.java#L127 for the implementation details. If you can't call this method on the concrete implementation in spark-notebook, using spark-shell or a regularly built application is probably the simplest solution.
