GVKun编程网logo

如何使用Spark(pyspark)编写镶木地板文件?

15

最近很多小伙伴都在问如何使用Spark和pyspark编写镶木地板文件?这两个问题,那么本篇文章就来给大家详细解答一下,同时本文还将给你拓展org.apache.spark.SparkExceptio

最近很多小伙伴都在问如何使用Sparkpyspark编写镶木地板文件?这两个问题,那么本篇文章就来给大家详细解答一下,同时本文还将给你拓展org.apache.spark.SparkException:无效的Spark URL:spark:// HeartbeatReceiver @ xxxx_LPT-324:51380 PySpark、Pandas无法读取在PySpark中创建的实木复合地板文件、pyspark:过滤具有不同列结构的镶木地板文件、python – pyspark:使用spark-submit运送jar依赖项等相关知识,下面开始了哦!

本文目录一览:

如何使用Spark(pyspark)编写镶木地板文件?

如何使用Spark(pyspark)编写镶木地板文件?

我在Spark中还很陌生,我一直在尝试将Dataframe转换为Spark中的镶木地板文件,但还没有成功。该文件说,我可以使用 write.parquet
函数来创建该文件。但是,当我运行脚本时,它向我显示: AttributeError:’RDD’对象没有属性’write’

from pyspark import SparkContext
sc = SparkContext("local","Protob Conversion to Parquet ")

# spark is an existing SparkSession
df = sc.textFile("/temp/proto_temp.csv")

# Displays the content of the DataFrame to stdout
df.write.parquet("/output/proto.parquet")

你知道怎么做吗?

我使用的Spark版本是为Hadoop 2.7.3构建的Spark 2.0.1。

org.apache.spark.SparkException:无效的Spark URL:spark:// HeartbeatReceiver @ xxxx_LPT-324:51380 PySpark

org.apache.spark.SparkException:无效的Spark URL:spark:// HeartbeatReceiver @ xxxx_LPT-324:51380 PySpark

如何解决org.apache.spark.SparkException:无效的Spark URL:spark:// HeartbeatReceiver @ xxxx_LPT-324:51380 PySpark?

尝试使用PySpark创建SparkConf,但出现错误

代码

from pyspark.python.pyspark.shell import spark
from pyspark import SparkConf,SparkContext
from pyspark.shell import sqlContext
from pyspark.sql import SparkSession
      
conf = SparkConf().setAppName("Test-1 ETL").setMaster("local[*]").set("spark.driver.host","localhost").set("spark.sql.execution.arrow.pyspark.enabled","true")
sc = SparkContext(conf=conf)

错误

org.apache.spark.SparkException: Invalid Spark URL: spark://HeartbeatReceiver@xxxx_LPT-324:51380

我还设置了set SPARK_LOCAL_HOSTNAME=localhost

有人可以帮助我吗?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

Pandas无法读取在PySpark中创建的实木复合地板文件

Pandas无法读取在PySpark中创建的实木复合地板文件

我正在通过以下方式从Spark DataFrame编写镶木地板文件:

df.write.parquet("path/myfile.parquet", mode = "overwrite", compression="gzip")

这将创建一个包含多个文件的文件夹。

当我尝试将其读入pandas时,会出现以下错误,具体取决于我使用的解析器:

import pandas as pddf = pd.read_parquet("path/myfile.parquet", engine="pyarrow")

PyArrow:

pyarrow.lib.check_status中的文件“ pyarrow \ error.pxi”,第83行

ArrowIOError:无效的实木复合地板文件。页脚已损坏。

快速镶木地板:

文件“ C:\ Program Files \ Anaconda3 \ lib \ site-packages \ fastparquet \
util.py”,行38,在default_open中返回open(f,mode)

PermissionError:[Errno 13]权限被拒绝:’path / myfile.parquet’

我正在使用以下版本:* Spark 2.4.0* Panda 0.23.4* Poppy 0.10.0* Fast parquet 0.2.1

我尝试了gzip以及灵活的压缩。两者都不起作用。我当然要确保文件位于Python有权读取/写入的位置。

如果有人能够重现此错误,则已经有所帮助。

答案1

小编典典

由于即使使用较新的pandas版本,这似乎仍然是一个问题,因此我编写了一些函数来规避此问题,这是更大的pyspark helpers库的一部分:

import pandas as pdimport datetimedef read_parquet_folder_as_pandas(path, verbosity=1):  files = [f for f in os.listdir(path) if f.endswith("parquet")]  if verbosity > 0:    print("{} parquet files found. Beginning reading...".format(len(files)), end="")    start = datetime.datetime.now()  df_list = [pd.read_parquet(os.path.join(path, f)) for f in files]  df = pd.concat(df_list, ignore_index=True)  if verbosity > 0:    end = datetime.datetime.now()    print(" Finished. Took {}".format(end-start))  return dfdef read_parquet_as_pandas(path, verbosity=1):  """Workaround for pandas not being able to read folder-style parquet files.  """  if os.path.isdir(path):    if verbosity>1: print("Parquet file is actually folder.")    return read_parquet_folder_as_pandas(path, verbosity)  else:    return pd.read_parquet(path)

这假定拼花地板“文件”中的相关文件(实际上是一个文件夹)以“
.parquet”结尾。这适用于数据砖导出的拼花文件,也可以与其他文件一起使用(未经测试,对评论中的反馈感到高兴)。

read_parquet_as_pandas()如果事先不知道是否为文件夹,则可以使用该功能。

pyspark:过滤具有不同列结构的镶木地板文件

pyspark:过滤具有不同列结构的镶木地板文件

如何解决pyspark:过滤具有不同列结构的镶木地板文件?

我的镶木地板数据保存在 aws s3 存储桶中。 Parquet 文件按日期分区,文件夹结构如下

MyFolder
  |-- date=20210701
    |--part-xysdf-snappy.parquet
  |-- date=20210702
    |--part-fasdf-snappy.parquet
  |-- date=20210703
    |--part-ghdfg-snappy.parquet
 ....
 ....

请注意Parquet in date=20210701(最早的条目)有问题,漏掉了两列

  +-------+-----+
  |   name|grade|
  +-------+-----+
  |Alberto|  100|
  | Dakota|   96|
  +-------+-----+

其余的镶木地板文件都很好,就像

  +-------+-----+------+-------+
  |   name|grade|height|  date |
  +-------+-----+--------------+
  |Karolin|  110|  173 |20210701
  | Lucas |   91|  178 |20210701
  +-------+-----+------+-------+

如果我只想关注''姓名''和''等级'',我可以使用下面的代码来显示结果

def check_data(start_date,end_date):
    cols = [''name'',''grade'']
    df = spark.read.parquet(''path/MyFolder'').select(cols)
    df = df.filter(f''date > "{start_date}" and date < "{end_date}"'')
    return df

上面的代码很方便,而且工作正常。但是,现在我想添加 ''height'' 和 ''date'' 列,并忽略 date=20210701(因为它错过了两列)。事情变得更诡异了。如果我使用

def check_data(start_date,''grade'',''height'',''date'']
    nan = ''Nan''
    df = spark.read.parquet(''path/MyFolder'').filter(f''height != "{nan}"'')
    df = df.filter(f''date > "{start_date}" and date < "{end_date}"'')
    df = df.select(cols)
    return df

我遇到了这个错误

   Cannot resolve ''height'' given input columns  [name,grade]..... 

我在这里得到的唯一解决方案是遍历所有 parquet 文件夹,然后附加 pyspark 数据框,但这需要额外的时间。

另外,如果我删除date=20210701,问题也解决了,但我就是做不到。

可以分享一下你的想法吗?谢谢。 ?

解决方法

如果缺少单行或少量行的数据,您可以将空值替换为该列的均值/中值。

在这种情况下,您可以添加计算镶木地板中所有高度的中位数,然后为 date=20210701 添加该值。

这样您的数据就不会出现偏差。

此外,中值优于均值,因为一些异常值可能会扭曲平均值。

,

其实解决方法很简单。

  df = spark.read.format(''parquet'').option(''mergeSchema'',''true'').load(path).select(''name'',''grade'',''height'',''date'')

python – pyspark:使用spark-submit运送jar依赖项

python – pyspark:使用spark-submit运送jar依赖项

我写了一个pyspark脚本,它读取两个json文件,coGroup它们并将结果发送到elasticsearch集群;当我在本地运行它时,一切都按预期工作(大部分),我为org.elasticsearch.hadoop.mr.EsOutputFormat和org.elasticsearch.hadoop.mr.LinkedMapWritable类下载了elasticsearch-hadoop jar文件,然后运行我的作业pyspark使用–jars参数,我可以看到弹性搜索集群中出现的文档.

但是,当我尝试在spark集群上运行它时,我收到此错误:

Traceback (most recent call last):
  File "/root/spark/spark_test.py",line 141,in <module>
    conf=es_write_conf
  File "/root/spark/python/pyspark/rdd.py",line 1302,in saveAsNewAPIHadoopFile
    keyConverter,valueConverter,jconf)
  File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",line 538,in __call__
  File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",line 300,in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
: java.lang.classNotFoundException: org.elasticsearch.hadoop.mr.LinkedMapWritable
    at java.net.urlclassloader$1.run(urlclassloader.java:366)
    at java.net.urlclassloader$1.run(urlclassloader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.urlclassloader.findClass(urlclassloader.java:354)
    at java.lang.classLoader.loadClass(ClassLoader.java:425)
    at java.lang.classLoader.loadClass(ClassLoader.java:358)
    at java.lang.class.forName0(Native Method)
    at java.lang.class.forName(Class.java:274)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:157)
    at org.apache.spark.api.python.PythonRDD$$anonfun$getkeyvalueTypes$1$$anonfun$apply$9.apply(PythonRDD.scala:611)
    at org.apache.spark.api.python.PythonRDD$$anonfun$getkeyvalueTypes$1$$anonfun$apply$9.apply(PythonRDD.scala:610)
    at scala.Option.map(Option.scala:145)
    at org.apache.spark.api.python.PythonRDD$$anonfun$getkeyvalueTypes$1.apply(PythonRDD.scala:610)
    at org.apache.spark.api.python.PythonRDD$$anonfun$getkeyvalueTypes$1.apply(PythonRDD.scala:609)
    at scala.Option.flatMap(Option.scala:170)
    at org.apache.spark.api.python.PythonRDD$.getkeyvalueTypes(PythonRDD.scala:609)
    at org.apache.spark.api.python.PythonRDD$.saveAsNewAPIHadoopFile(PythonRDD.scala:701)
    at org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)

对我来说这似乎很清楚:在工人身上没有弹性研究 – hadoop jar;所以问题是:我如何将它与我的应用程序一起发送?我可以使用sc.addPyFile作为python依赖项,但它不适用于jar,并且使用spark-submit的–jars参数也无济于事.

解决方法

–jars正常工作;问题是我如何首先运行火花提交工作;正确的执行方式是:

./bin/spark-submit <options> scriptname

因此,必须在脚本之前放置–jars选项:

./bin/spark-submit --jars /path/to/my.jar myscript.py

如果您认为这是将参数传递给脚本本身的唯一方法,那么这很明显,因为脚本名称后面的所有内容都将用作脚本的输入参数:

./bin/spark-submit --jars /path/to/my.jar myscript.py --do-magic=true

我们今天的关于如何使用Sparkpyspark编写镶木地板文件?的分享就到这里,谢谢您的阅读,如果想了解更多关于org.apache.spark.SparkException:无效的Spark URL:spark:// HeartbeatReceiver @ xxxx_LPT-324:51380 PySpark、Pandas无法读取在PySpark中创建的实木复合地板文件、pyspark:过滤具有不同列结构的镶木地板文件、python – pyspark:使用spark-submit运送jar依赖项的相关信息,可以在本站进行搜索。

本文标签: