This post shares how to perform a basic join of two RDD tables in Spark using Python, and explains joining two RDDs in Spark along the way. It also covers the Scala PythonRDD class behind pyspark, counting distinct text in a Spark RDD of array objects, using spark intersection() by key or filter() with two RDDs, and creating a Spark RDD with Accumulo 1.6 in spark-notebook. If it happens to solve the problem you are facing, don't forget to follow this site. Let's get started!
In this article:

- How to perform a basic join of two RDD tables in Spark using Python? (joining two RDDs)
- The Scala PythonRDD class behind pyspark
- python – Count distinct text in a Spark RDD of array objects
- scala – How to use spark intersection() by key, or filter(), with two RDDs?
- scala – How to create a Spark RDD with Accumulo 1.6 in spark-notebook?
How to perform a basic join of two RDD tables in Spark using Python? (joining two RDDs)
How would you perform a basic join in Spark using Python? In R you could do this with merge(). What is the syntax, using Python on Spark, for:

- an inner join
- a left outer join
- a cross join

with two tables (RDDs), each having a single column with a common key?

RDD(1): (key, U)
RDD(2): (key, V)

I think an inner join is something like this:

rdd1.join(rdd2).map(case (key, u, v) => (key, ls ++ rs));

Is that right? I have searched the internet and can't find a good join example. Thanks in advance.
Answer 1

This can be done either with PairRDDFunctions or with Spark DataFrames. Since DataFrame operations benefit from the Catalyst Optimizer, the second option is worth considering.
Assuming your data looks like this:

rdd1 = sc.parallelize([("foo", 1), ("bar", 2), ("baz", 3)])
rdd2 = sc.parallelize([("foo", 4), ("bar", 5), ("bar", 6)])
Using PairRDDs

Inner join:
rdd1.join(rdd2)
Left outer join:
rdd1.leftOuterJoin(rdd2)
Cartesian product (does not require RDD[(T, U)]):
rdd1.cartesian(rdd2)
Broadcast join (does not require RDD[(T, U)]):

- see "Spark: What is the best strategy for joining a 2-tuple-key RDD with a single-key RDD?" (a sketch of the idea follows below)
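The linked answer is not reproduced here, but the core idea is easy to sketch in PySpark: collect the smaller RDD on the driver, broadcast it as a dict, and do the lookup map-side. This is a hedged illustration under that assumption (the variable names and the requirement that rdd2 fits in driver memory are mine, not part of the original answer):

# Map-side (broadcast) join: only valid when rdd2 is small enough to collect.
small = sc.broadcast(rdd2.groupByKey().mapValues(list).collectAsMap())

def lookup(kv):
    k, v = kv
    # emit one pair per matching value on the broadcast side (inner-join semantics)
    return [(k, (v, w)) for w in small.value.get(k, [])]

rdd1.flatMap(lookup).collect()
# e.g. [('foo', (1, 4)), ('bar', (2, 5)), ('bar', (2, 6))]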
Finally, there is cogroup, which has no direct SQL equivalent but can be useful in some situations:

cogrouped = rdd1.cogroup(rdd2)
cogrouped.mapValues(lambda x: (list(x[0]), list(x[1]))).collect()
## [('foo', ([1], [4])), ('bar', ([2], [5, 6])), ('baz', ([3], []))]
Using Spark DataFrames

You can use either the SQL DSL or raw SQL via sqlContext.sql.

df1 = spark.createDataFrame(rdd1, ('k', 'v1'))
df2 = spark.createDataFrame(rdd2, ('k', 'v2'))

# Register temporary tables to be able to use `sparkSession.sql`
df1.createOrReplaceTempView('df1')
df2.createOrReplaceTempView('df2')
Inner join:

# inner is the default value, so it could be omitted
df1.join(df2, df1.k == df2.k, how='inner')
spark.sql('SELECT * FROM df1 JOIN df2 ON df1.k = df2.k')
Left outer join:

df1.join(df2, df1.k == df2.k, how='left_outer')
spark.sql('SELECT * FROM df1 LEFT OUTER JOIN df2 ON df1.k = df2.k')
Cross join (an explicit cross join, or a configuration change, is required in Spark 2.x: spark.sql.crossJoin.enabled):

df1.crossJoin(df2)
spark.sql('SELECT * FROM df1 CROSS JOIN df2')
or, using the implicit form (with spark.sql.crossJoin.enabled set to true on 2.x):

df1.join(df2)
sqlContext.sql('SELECT * FROM df1 JOIN df2')
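As a side note (not part of the original answer), the configuration change mentioned above can also be made at runtime on a Spark 2.x session, assuming the same spark, df1 and df2 objects used in the examples:

# allow implicit Cartesian products on Spark 2.x (disabled by default)
spark.conf.set("spark.sql.crossJoin.enabled", "true")
df1.join(df2).count()  # 3 * 3 = 9 rows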
Since 1.6 (1.5 in Scala), each of these can be combined with the broadcast function:

from pyspark.sql.functions import broadcast
df1.join(broadcast(df2), df1.k == df2.k)

to perform a broadcast join. See also: Why my BroadcastHashJoin is slower than ShuffledHashJoin in Spark.
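To tie the snippets above together, here is a self-contained sketch that runs the RDD and DataFrame joins end to end. It assumes a local Spark 2.x installation; the session setup and variable names are mine, not part of the original answer.

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("join-demo").getOrCreate()
sc = spark.sparkContext

rdd1 = sc.parallelize([("foo", 1), ("bar", 2), ("baz", 3)])
rdd2 = sc.parallelize([("foo", 4), ("bar", 5), ("bar", 6)])

print(rdd1.join(rdd2).collect())           # inner, e.g. [('foo', (1, 4)), ('bar', (2, 5)), ('bar', (2, 6))]
print(rdd1.leftOuterJoin(rdd2).collect())  # left outer: 'baz' is paired with None

df1 = spark.createDataFrame(rdd1, ("k", "v1"))
df2 = spark.createDataFrame(rdd2, ("k", "v2"))
df1.join(df2, df1.k == df2.k, how="inner").show()
df1.join(df2, df1.k == df2.k, how="left_outer").show()

spark.stop()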
pyspark's corresponding Scala code: the PythonRDD class

The JVM-side Scala code behind pyspark: PythonRDD. The code version is Spark 2.2.0.

1. PythonRDD.class

This RDD type is the key to how Python plugs into Spark.
//This is a standard RDD implementation, with the usual compute, partitioner, getPartitions, etc.
//This PythonRDD is exactly what the _jrdd property of pyspark's PipelinedRDD returns
//parent is the _prev_jrdd passed in from PipelinedRDD, i.e. the originally constructed source RDD
private[spark] class PythonRDD(
parent: RDD[_], //this parent RDD is the key: every data source pyspark reads from Spark comes through it
func: PythonFunction, //the Python computation logic implemented by the user
preservePartitoning: Boolean)
extends RDD[Array[Byte]](parent) {
val bufferSize = conf.getInt("spark.buffer.size", 65536)
val reuse_worker = conf.getBoolean("spark.python.worker.reuse", true)
override def getPartitions: Array[Partition] = firstParent.partitions
override val partitioner: Option[Partitioner] = {
if (preservePartitoning) firstParent.partitioner else None
}
val asJavaRDD: JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)
override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
//invoke PythonRunner to execute the task logic here
//note: this PythonRunner is not the same thing as the PythonRunner used at spark-submit time
val runner = PythonRunner(func, bufferSize, reuse_worker)
//run the runner's computation; the first argument is the computed result of the Spark source RDD
//firstParent.iterator triggers computation of the parent RDD and returns its result
//the RDD behind this first argument is the same thing as _jrdd on the pyspark RDD
runner.compute(firstParent.iterator(split, context), split.index, context)
}
}
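For orientation from the Python side: the two configuration keys read above (spark.buffer.size and spark.python.worker.reuse) are ordinary Spark confs, and the asJavaRDD handle is what PipelinedRDD._jrdd in pyspark ultimately wraps. A hedged illustration follows; _jrdd is a private PySpark attribute, so this is for inspection only and may change between versions.

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .set("spark.buffer.size", "65536")          # bufferSize in PythonRDD above
        .set("spark.python.worker.reuse", "true"))  # reuse_worker in PythonRDD above
sc = SparkContext(conf=conf)

pipelined = sc.parallelize(range(10)).map(lambda x: x * 2)
print(type(pipelined).__name__)  # PipelinedRDD on the Python side
print(pipelined._jrdd)           # the JavaRDD handle wrapping the JVM-side PythonRDD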
2. PythonRunner.class

This class does the actual per-task computation inside the RDD; it is not the py4j-launching PythonRunner used when the job is submitted.

/*
* This class does three things:
* 1. starts pyspark.daemon, which receives tasks and launches workers to execute them
* 2. starts a WriterThread that writes the source RDD's computed results to the pyspark worker
* 3. pulls the execution results back from the pyspark worker
*
* The data the WriterThread writes is what _jrdd computes in pyspark, i.e. the source RDD's data
*/
private[spark] class PythonRunner(
funcs: Seq[ChainedPythonFunctions],
bufferSize: Int,
reuse_worker: Boolean,
isUDF: Boolean,
argOffsets: Array[Array[Int]])
extends Logging {
require(funcs.length == argOffsets.length, "argOffsets should have the same length as funcs")
//the environment and command used to run Python
private val envVars = funcs.head.funcs.head.envVars
private val pythonExec = funcs.head.funcs.head.pythonExec
private val pythonVer = funcs.head.funcs.head.pythonVer
private val accumulator = funcs.head.funcs.head.accumulator
def compute(
inputIterator: Iterator[_],
partitionIndex: Int,
context: TaskContext): Iterator[Array[Byte]] = {
val startTime = System.currentTimeMillis
val env = SparkEnv.get
val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",")
envVars.put("SPARK_LOCAL_DIRS", localdir) // it''s also used in monitor thread
if (reuse_worker) {
envVars.put("SPARK_REUSE_WORKER", "1")
}
//create the pyspark worker process; under the hood this runs pyspark.daemon
//this method ensures that only a single pyspark.daemon is started
//the return value is the socket used to communicate with the worker
//the details are covered elsewhere
val worker: Socket = env.createPythonWorker(pythonExec, envVars.asScala.toMap)
@volatile var released = false
// create the WriterThread, which writes the source data to the socket and sends it to the pyspark worker
val writerThread = new WriterThread(env, worker, inputIterator, partitionIndex, context)
//register a task-completion listener that stops the WriterThread when the task finishes
context.addTaskCompletionListener { context =>
writerThread.shutdownOnTaskCompletion()
if (!reuse_worker || !released) {
try {
worker.close()
} catch {
case e: Exception =>
logWarning("Failed to close worker socket", e)
}
}
}
writerThread.start()
new MonitorThread(env, worker, context).start()
val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
// create the iterator that pulls the execution results back from the pyspark worker
val stdoutIterator = new Iterator[Array[Byte]] {
override def next(): Array[Byte] = {
val obj = _nextObj
if (hasNext) {
_nextObj = read()
}
obj
}
private def read(): Array[Byte] = {
if (writerThread.exception.isDefined) {
throw writerThread.exception.get
}
try {
stream.readInt() match {
case length if length > 0 =>
val obj = new Array[Byte](length)
stream.readFully(obj)
obj
case 0 => Array.empty[Byte]
case SpecialLengths.TIMING_DATA =>
// Timing data from worker
val bootTime = stream.readLong()
val initTime = stream.readLong()
val finishTime = stream.readLong()
val boot = bootTime - startTime
val init = initTime - bootTime
val finish = finishTime - initTime
val total = finishTime - startTime
logInfo("Times: total = %s, boot = %s, init = %s, finish = %s".format(total, boot,
init, finish))
val memoryBytesSpilled = stream.readLong()
val diskBytesSpilled = stream.readLong()
context.taskMetrics.incMemoryBytesSpilled(memoryBytesSpilled)
context.taskMetrics.incDiskBytesSpilled(diskBytesSpilled)
read()
case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
// Signals that an exception has been thrown in python
val exLength = stream.readInt()
val obj = new Array[Byte](exLength)
stream.readFully(obj)
throw new PythonException(new String(obj, StandardCharsets.UTF_8),
writerThread.exception.getOrElse(null))
case SpecialLengths.END_OF_DATA_SECTION =>
// We've finished the data section of the output, but we can still
// read some accumulator updates:
val numAccumulatorUpdates = stream.readInt()
(1 to numAccumulatorUpdates).foreach { _ =>
val updateLen = stream.readInt()
val update = new Array[Byte](updateLen)
stream.readFully(update)
accumulator.add(update)
}
// Check whether the worker is ready to be re-used.
if (stream.readInt() == SpecialLengths.END_OF_STREAM) {
if (reuse_worker) {
env.releasePythonWorker(pythonExec, envVars.asScala.toMap, worker)
released = true
}
}
null
}
} catch {
case e: Exception if context.isInterrupted =>
logDebug("Exception thrown after task interruption", e)
throw new TaskKilledException(context.getKillReason().getOrElse("unknown reason"))
case e: Exception if env.isStopped =>
logDebug("Exception thrown after context is stopped", e)
null // exit silently
case e: Exception if writerThread.exception.isDefined =>
logError("Python worker exited unexpectedly (crashed)", e)
logError("This may have been caused by a prior exception:", writerThread.exception.get)
throw writerThread.exception.get
case eof: EOFException =>
throw new SparkException("Python worker exited unexpectedly (crashed)", eof)
}
}
var _nextObj = read()
override def hasNext: Boolean = _nextObj != null
}
//return the iterator that pulls back the result data
new InterruptibleIterator(context, stdoutIterator)
}
/**
* Implementation of the WriterThread
*/
class WriterThread(
env: SparkEnv,
worker: Socket,
inputIterator: Iterator[_],
partitionIndex: Int,
context: TaskContext)
extends Thread(s"stdout writer for $pythonExec") {
@volatile private var _exception: Exception = null
private val pythonIncludes = funcs.flatMap(_.funcs.flatMap(_.pythonIncludes.asScala)).toSet
private val broadcastVars = funcs.flatMap(_.funcs.flatMap(_.broadcastVars.asScala))
setDaemon(true)
/** Contains the exception thrown while writing the parent iterator to the Python process. */
def exception: Option[Exception] = Option(_exception)
/** Terminates the writer thread, ignoring any exceptions that may occur due to cleanup. */
def shutdownOnTaskCompletion() {
assert(context.isCompleted)
this.interrupt()
}
// the main logic is in run(): write the source RDD's computed results to the worker,
// along with the broadcast variables, the environment, and the Python code to execute,
// followed by the data to be computed
override def run(): Unit = Utils.logUncaughtExceptions {
try {
TaskContext.setTaskContext(context)
val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)
val dataOut = new DataOutputStream(stream)
// Partition index
dataOut.writeInt(partitionIndex)
// Python version of driver
PythonRDD.writeUTF(pythonVer, dataOut)
// Write out the TaskContextInfo
dataOut.writeInt(context.stageId())
dataOut.writeInt(context.partitionId())
dataOut.writeInt(context.attemptNumber())
dataOut.writeLong(context.taskAttemptId())
// sparkFilesDir
PythonRDD.writeUTF(SparkFiles.getRootDirectory(), dataOut)
// Python includes (*.zip and *.egg files)
dataOut.writeInt(pythonIncludes.size)
for (include <- pythonIncludes) {
PythonRDD.writeUTF(include, dataOut)
}
// Broadcast variables
val oldBids = PythonRDD.getWorkerBroadcasts(worker)
val newBids = broadcastVars.map(_.id).toSet
// number of different broadcasts
val toRemove = oldBids.diff(newBids)
val cnt = toRemove.size + newBids.diff(oldBids).size
dataOut.writeInt(cnt)
for (bid <- toRemove) {
// remove the broadcast from worker
dataOut.writeLong(- bid - 1) // bid >= 0
oldBids.remove(bid)
}
for (broadcast <- broadcastVars) {
if (!oldBids.contains(broadcast.id)) {
// send new broadcast
dataOut.writeLong(broadcast.id)
PythonRDD.writeUTF(broadcast.value.path, dataOut)
oldBids.add(broadcast.id)
}
}
dataOut.flush()
// Serialized command:
if (isUDF) {
dataOut.writeInt(1)
dataOut.writeInt(funcs.length)
funcs.zip(argOffsets).foreach { case (chained, offsets) =>
dataOut.writeInt(offsets.length)
offsets.foreach { offset =>
dataOut.writeInt(offset)
}
dataOut.writeInt(chained.funcs.length)
chained.funcs.foreach { f =>
dataOut.writeInt(f.command.length)
dataOut.write(f.command)
}
}
} else {
dataOut.writeInt(0)
val command = funcs.head.funcs.head.command
dataOut.writeInt(command.length)
dataOut.write(command)
}
// Data values
PythonRDD.writeIteratorToStream(inputIterator, dataOut)
dataOut.writeInt(SpecialLengths.END_OF_DATA_SECTION)
dataOut.writeInt(SpecialLengths.END_OF_STREAM)
dataOut.flush()
} catch {
case e: Exception if context.isCompleted || context.isInterrupted =>
logDebug("Exception thrown after task completion (likely due to cleanup)", e)
if (!worker.isClosed) {
Utils.tryLog(worker.shutdownOutput())
}
case e: Exception =>
// We must avoid throwing exceptions here, because the thread uncaught exception handler
// will kill the whole executor (see org.apache.spark.executor.Executor).
_exception = e
if (!worker.isClosed) {
Utils.tryLog(worker.shutdownOutput())
}
}
}
}
// monitors whether the task is still running
class MonitorThread(env: SparkEnv, worker: Socket, context: TaskContext)
extends Thread(s"Worker Monitor for $pythonExec") {
setDaemon(true)
override def run() {
// Kill the worker if it is interrupted, checking until task completion.
// TODO: This has a race condition if interruption occurs, as completed may still become true.
while (!context.isInterrupted && !context.isCompleted) {
Thread.sleep(2000)
}
if (!context.isCompleted) {
try {
logWarning("Incomplete task interrupted: Attempting to kill Python Worker")
env.destroyPythonWorker(pythonExec, envVars.asScala.toMap, worker)
} catch {
case e: Exception =>
logError("Exception when trying to kill worker", e)
}
}
}
}
}
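Stripped of the accumulator and timing bookkeeping, the read/write loops above amount to a simple framing protocol: a 4-byte big-endian length, then that many payload bytes, with negative lengths reserved for SpecialLengths control codes. Below is a simplified, hypothetical reader in Python that illustrates just the framing; it is not the actual pyspark.worker implementation.

import struct

def read_frames(stream):
    """Yield ('data', bytes) or ('control', code) items from a length-prefixed stream."""
    while True:
        header = stream.read(4)
        if len(header) < 4:
            return  # stream closed
        (length,) = struct.unpack(">i", header)
        if length < 0:
            # negative lengths are SpecialLengths markers (timing data, end of data, ...)
            yield ("control", length)
        elif length == 0:
            yield ("data", b"")
        else:
            yield ("data", stream.read(length))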
python – Count distinct text in a Spark RDD of array objects
words.take(3)
returns something like:

[ ["A","B"], ["B","C"], ["C","A","D"] ]

Now I want to find the total number of texts as well as the number of unique texts. If the RDD had just these 3 records:

total_words = 7
unique_words = 4 (only A, B, C, D)

To get the total count I did something like:

text_count_rdd = words.map(lambda x: len(x))
text_count_rdd.sum()

but I am still stuck on how to retrieve the unique count.
Solution
words.flatMap(set).distinct().count()
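A runnable version of both counts, using the sample data from the question (sc is assumed to be an existing SparkContext):

words = sc.parallelize([["A", "B"], ["B", "C"], ["C", "A", "D"]])

total_words = words.map(len).sum()                     # 2 + 2 + 3 = 7
unique_words = words.flatMap(set).distinct().count()   # {'A', 'B', 'C', 'D'} -> 4

print(total_words, unique_words)  # 7 4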
scala – How to use spark intersection() by key, or filter(), with two RDDs?
But I really don't know how to use intersection() by key, so I tried using filter(), and it didn't work either.

Example – here are the two RDDs:

data1 //RDD[(String, Int)] = Array(("a",1), ("a",2), ("b",2), ("b",3), ("c",1))
data2 //RDD[(String, Int)] = Array(("a",3), ("b",5))
val data3 = data2.map{_._1}
data1.filter{_._1 == data3}.collect //Array[(String, Int)] = Array()

I want to get the (key, value) pairs of data1 whose keys also exist in data2.

Array(("a",1), ("a",2), ("b",2), ("b",3)) is the result I want.

Is there a way to solve this using intersection() by key, or filter()?
Solution
This can be achieved in different ways
1. Broadcast variable in filter() – scalability needs improvement
val data1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 2), ("b", 3), ("c", 1)))
val data2 = sc.parallelize(Seq(("a", 3), ("b", 5)))
// broadcast data2's key list for use in the filter method, which runs on executor nodes
val bcast = sc.broadcast(data2.map(_._1).collect())
val result = data1.filter(r => bcast.value.contains(r._1))
println(result.collect().toList)
//Output: List((a,1), (a,2), (b,2), (b,3))
2. cogroup (similar to a group by key)
val cogroupRdd: RDD[(String, (Iterable[Int], Iterable[Int]))] = data1.cogroup(data2)
/*
List(
  (a,(CompactBuffer(1, 2),CompactBuffer(3))),
  (b,(CompactBuffer(2, 3),CompactBuffer(5))),
  (c,(CompactBuffer(1),CompactBuffer()))
)
*/
//Now filter the keys that have two non-empty CompactBuffers. You can do that with
//filter(row => row._2._1.nonEmpty && row._2._2.nonEmpty) as well.
val filterRdd = cogroupRdd.filter { case (k, (v1, v2)) => v1.nonEmpty && v2.nonEmpty }
/*
List(
  (a,(CompactBuffer(1, 2),CompactBuffer(3))),
  (b,(CompactBuffer(2, 3),CompactBuffer(5)))
)
*/
//As we only care about the first data set's values, pick the first CompactBuffer
//by doing v1.map(val1 => (k, val1))
val result = filterRdd.flatMap { case (k, (v1, v2)) => v1.map(val1 => (k, val1)) }
//List((a,1), (a,2), (b,2), (b,3))
3. Using an inner join

val resultRdd = data1.join(data2).map(r => (r._1, r._2._1)).distinct()
//List((a,1), (a,2), (b,2), (b,3))

Here data1.join(data2) holds the pairs that share a common key (an inner join):

//List((a,(1,3)), (a,(2,3)), (b,(2,5)), (b,(3,5)))
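For readers following the Python thread of this post, the same key-based intersection can be sketched in PySpark as well. This is my own transliteration of the join approach, not part of the original Scala answer; the data mirrors the snippets above.

data1 = sc.parallelize([("a", 1), ("a", 2), ("b", 2), ("b", 3), ("c", 1)])
data2 = sc.parallelize([("a", 3), ("b", 5)])

# keep the data1 pairs whose key also appears in data2 (inner join, then drop data2's value)
result = data1.join(data2).map(lambda kv: (kv[0], kv[1][0])).distinct()
print(sorted(result.collect()))  # [('a', 1), ('a', 2), ('b', 2), ('b', 3)]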
scala – How to create a Spark RDD with Accumulo 1.6 in spark-notebook?
val instanceNameS = "accumulo"
val zooServeRSS = "localhost:2181"
val instance: Instance = new ZooKeeperInstance(instanceNameS, zooServeRSS)
val connector: Connector = instance.getConnector("root", new PasswordToken("password"))
val auths = new Authorizations("exampleVis")
val scanner = connector.createScanner("batchtest1", auths)
scanner.setRange(new Range("row_0000000000", "row_0000000010"))
for (entry: Entry[Key, Value] <- scanner) {
  println(entry.getKey + " is " + entry.getValue)
}
prints the first ten rows of table data.

When I try to create the RDD:
val rdd2 = sparkContext.newAPIHadoopRDD(
  new Configuration(),
  classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],
  classOf[org.apache.accumulo.core.data.Key],
  classOf[org.apache.accumulo.core.data.Value]
)
I get an RDD back, but I can't do much with it because of the following error:
java.io.IOException: Input info has not been set.
  at org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630)
  at org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.validateOptions(AbstractInputFormat.java:343)
  at org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.getSplits(AbstractInputFormat.java:538)
  at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
  at scala.Option.getOrElse(Option.scala:120)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1367)
  at org.apache.spark.rdd.RDD.count(RDD.scala:927)
Which makes perfect sense, given that I haven't specified any parameters about which table to connect to, what the auths are, and so on.

So my question is: what do I need to do from here to get those first ten rows of table data into my RDD?
Update 1

Still not working, but I did discover a few things. It turns out there are two nearly identical packages:

org.apache.accumulo.core.client.mapreduce
&
org.apache.accumulo.core.client.mapred

Both have almost identical members, except that some of the method signatures differ. Not sure why both exist, as there is no deprecation notice that I could see. I tried to implement Sietse's answer, with no joy. Below is what I did, and the responses:
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.conf.Configuration
val jobConf = new JobConf(new Configuration)
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.conf.Configuration
jobConf: org.apache.hadoop.mapred.JobConf = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml
AbstractInputFormat.setConnectorInfo(jobConf, "root", new PasswordToken("password")
AbstractInputFormat.setScanAuthorizations(jobConf, auths)
AbstractInputFormat.setZooKeeperInstance(jobConf, new ClientConfiguration)
val rdd2 = sparkContext.hadoopRDD(
  jobConf,
  classOf[org.apache.accumulo.core.client.mapred.AccumuloInputFormat],
  classOf[org.apache.accumulo.core.data.Key],
  classOf[org.apache.accumulo.core.data.Value],
  1
)
rdd2: org.apache.spark.rdd.RDD[(org.apache.accumulo.core.data.Key, org.apache.accumulo.core.data.Value)] = HadoopRDD[1] at hadoopRDD at <console>:62
rdd2.first
java.io.IOException: Input info has not been set.
  at org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630)
  at org.apache.accumulo.core.client.mapred.AbstractInputFormat.validateOptions(AbstractInputFormat.java:308)
  at org.apache.accumulo.core.client.mapred.AbstractInputFormat.getSplits(AbstractInputFormat.java:505)
  at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
  at scala.Option.getOrElse(Option.scala:120)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
  at org.apache.spark.rdd.RDD.take(RDD.scala:1077)
  at org.apache.spark.rdd.RDD.first(RDD.scala:1110)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:64)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:69)
  at ...
Edit 2

re: Holden's answer – still no joy:
AbstractInputFormat.setConnectorInfo(jobConf, "root", new PasswordToken("password")
AbstractInputFormat.setScanAuthorizations(jobConf, auths)
AbstractInputFormat.setZooKeeperInstance(jobConf, new ClientConfiguration)
InputFormatBase.setInputTableName(jobConf, "batchtest1")
val rddX = sparkContext.newAPIHadoopRDD(
  jobConf,
  classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],
  classOf[org.apache.accumulo.core.data.Key],
  classOf[org.apache.accumulo.core.data.Value]
)
rddX: org.apache.spark.rdd.RDD[(org.apache.accumulo.core.data.Key, org.apache.accumulo.core.data.Value)] = NewHadoopRDD[0] at newAPIHadoopRDD at <console>:58
Out[15]: NewHadoopRDD[0] at newAPIHadoopRDD at <console>:58
rddX.first
java.io.IOException: Input info has not been set.
  at org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630)
  at org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.validateOptions(AbstractInputFormat.java:343)
  at org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.getSplits(AbstractInputFormat.java:538)
  at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
  at scala.Option.getOrElse(Option.scala:120)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
  at org.apache.spark.rdd.RDD.take(RDD.scala:1077)
  at org.apache.spark.rdd.RDD.first(RDD.scala:1110)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:61)
  at ...
Edit 3 – progress!

I was able to figure out why the 'Input info has not been set' error was happening. The eagle-eyed among you will no doubt see that the following code is missing a closing ')':

AbstractInputFormat.setConnectorInfo(jobConf, "root", new PasswordToken("password")

Because I was doing this in spark-notebook, I kept clicking the execute button and moving on, since I wasn't seeing an error. What I forgot is that when you leave off a closing ')', the notebook does what spark-shell does: it waits forever for you to add it. So the error was the result of the 'setConnectorInfo' method never being executed.
Unfortunately, I still can't get the Accumulo table data into an RDD that is usable to me. When I execute

rddX.count

I get back

res15: Long = 10000

which is the correct response: there are 10,000 rows of data in the table I pointed at. However, when I try to grab the first element of the data:

rddX.first

I get the following error:
org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0.0 in stage 0.0 (TID 0) had a not serializable result:
org.apache.accumulo.core.data.Key
Any ideas on where to go from here?

Edit 4 – success!

The accepted answer and its comments got me 90% of the way there, apart from the fact that the Accumulo Key/Value need to be cast to something serializable. I got that working by calling the .toString() method on both. I'll try to post some complete working code as soon as I can, in case anyone else runs into the same problem.
Solution
// Create a job conf
val jobConf = new JobConf()
// Configure the job conf with our accumulo properties
AccumuloInputFormat.setConnectorInfo(jobConf, principal, token)
AccumuloInputFormat.setScanAuthorizations(jobConf, authorizations)
val clientConfig = new ClientConfiguration().withInstance(instanceName).withZkHosts(zooKeepers)
AccumuloInputFormat.setZooKeeperInstance(jobConf, clientConfig)
AccumuloInputFormat.setInputTableName(jobConf, tableName)
// Create an RDD using the jobConf
val rdd2 = sc.newAPIHadoopRDD(
  jobConf,
  classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],
  classOf[org.apache.accumulo.core.data.Key],
  classOf[org.apache.accumulo.core.data.Value]
)
Note: after digging through the code, it appears that the configuration properties are set partly based on the class being called (deliberately, to avoid clashing with other packages), so when we later go through the concrete class it can't find the configured flags. The fix is to not use the abstract classes. See https://github.com/apache/accumulo/blob/bf102d0711103e903afa0589500f5796ad51c366/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/ConfiguratorBase.java#L127 for the implementation details. If you cannot call this method on the concrete implementation from spark-notebook, using spark-shell or a regularly built application is probably the simplest solution.
That wraps up our look at how to perform a basic join of two RDD tables in Spark using Python (joining two RDDs). Thanks for reading; if you want to learn more about the related topics covered here – the Scala PythonRDD class behind pyspark, counting distinct text in a Spark RDD of array objects, using spark intersection() by key or filter() with two RDDs, and creating a Spark RDD with Accumulo 1.6 in spark-notebook – please search this site.