GVKun编程网logo

Pyspark - 在两个 String 列上合并导致错误 - 无法转换为 struct(pyspark 字符串拼接)

8

想了解Pyspark-在两个String列上合并导致错误-无法转换为struct的新动态吗?本文将为您提供详细的信息,我们还将为您解答关于pyspark字符串拼接的相关问题,此外,我们还将为您介绍关于

想了解Pyspark - 在两个 String 列上合并导致错误 - 无法转换为 struct的新动态吗?本文将为您提供详细的信息,我们还将为您解答关于pyspark 字符串拼接的相关问题,此外,我们还将为您介绍关于-Spark Scala Mongodb- MongoTypeConversionException 无法将 STRING 转换为 StructType(...)、Apache Spark™ 2.0: MQTT as a Source for Structured Streaming、beego.Controller.ParseForm () 方法没法转换成 struct 的对象、java – Spark Strutured Streaming自动将时间戳转换为本地时间的新知识。

本文目录一览:

Pyspark - 在两个 String 列上合并导致错误 - 无法转换为 struct(pyspark 字符串拼接)

Pyspark - 在两个 String 列上合并导致错误 - 无法转换为 struct(pyspark 字符串拼接)

如何解决Pyspark - 在两个 String 列上合并导致错误 - 无法转换为 struct?

我有两个具有以下架构的数据框

api_log_user_df

   root
 |-- action: string (nullable = true)
 |-- battleID: string (nullable = true)
 |-- chosenGrade: string (nullable = true)
 |-- classCode: string (nullable = true)
 |-- curriculumTreeID: string (nullable = true)
 |-- details: struct (nullable = true)
 |    |-- extensions: struct (nullable = true)
 |    |    |-- code: string (nullable = true)
 |    |    |-- exception: struct (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |    |-- locations: struct (nullable = true)
 |    |    |-- column: long (nullable = true)
 |    |    |-- line: long (nullable = true)
 |    |-- message: string (nullable = true)
 |    |-- path: string (nullable = true)
 |-- duration: long (nullable = true)
 |-- env: string (nullable = true)
 |-- err: struct (nullable = true)
 |    |-- code: string (nullable = true)
 |    |-- message: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- stack: string (nullable = true)
 |-- error: struct (nullable = true)
 |    |-- code: string (nullable = true)
 |    |-- message: string (nullable = true)
 |    |-- requestId: string (nullable = true)
 |    |-- retryDelay: float (nullable = true)
 |    |-- retryable: boolean (nullable = true)
 |    |-- statusCode: long (nullable = true)
 |    |-- time: timestamp (nullable = true)
 |-- giftBoxID: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- hostname: string (nullable = true)
 |-- key: long (nullable = true)
 |-- level: long (nullable = true)
 |-- message: string (nullable = true)
 |-- msg: string (nullable = true)
 |-- name: string (nullable = true)
 |-- parameters: struct (nullable = true)
 |    |-- DAPROPS: string (nullable = true)
 |    |-- userAgent: string (nullable = true)
 |-- pid: long (nullable = true)
 |-- properties: struct (nullable = true)
 |    |-- ExpressionAttributeValues: struct (nullable = true)
 |    |    |-- :giftBoxValue: struct (nullable = true)
 |    |    |    |-- M: struct (nullable = true)
 |    |    |    |    |-- ID: struct (nullable = true)
 |    |    |    |    |    |-- N: string (nullable = true)
 |    |    |    |    |-- isOpen: struct (nullable = true)
 |    |    |    |    |    |-- BOOL: boolean (nullable = true)
 |    |-- Key: struct (nullable = true)
 |    |    |-- id: struct (nullable = true)
 |    |    |    |-- N: string (nullable = true)
 |    |-- ReturnValues: string (nullable = true)
 |    |-- TableName: string (nullable = true)
 |    |-- UpdateExpression: string (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- userID: string (nullable = true)
 |-- v: long (nullable = true)
 |-- _Metadata: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- calendar_date: date (nullable = true)
 |-- _id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- FailedRules: struct (nullable = true)
 |    |-- characterSize: struct (nullable = true)
 |    |    |-- message: string (nullable = true)
 |    |    |-- property: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |    |-- gold: struct (nullable = true)
 |    |    |-- message: string (nullable = true)
 |    |    |-- property: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |    |-- level: struct (nullable = true)
 |    |    |-- message: string (nullable = true)
 |    |    |-- property: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |    |-- orbTiers: struct (nullable = true)
 |    |    |-- message: string (nullable = true)
 |    |    |-- property: string (nullable = true)
 |    |    |-- value: struct (nullable = true)
 |    |    |    |-- orb: long (nullable = true)
 |    |    |    |-- orbTier: long (nullable = true)
 |    |    |    |-- tier: long (nullable = true)
 |    |-- stars: struct (nullable = true)
 |    |    |-- message: string (nullable = true)
 |    |    |-- property: string (nullable = true)
 |    |    |-- value: string (nullable = true)

api_log_alt_user_df

_id:string
alt_userID:string

我正在尝试像下面那样加入这两个 DF

api_log_df = (
    api_log_user_df.join(api_log_alt_user_df,on="_id",how="left")
    .drop("_Metadata")
    .withColumn(
        "__user_id",F.coalesce(
            F.coalesce(F.col("userID"),F.col("user_id").cast("string")),F.col("alt_userID"),),)
)

我收到如下错误消息,但我不明白为什么?这里的所有列都是 (coalesce) 数据类型字符串。我不确定如何处理此错误以进行调试。

org.apache.spark.SparkException: Job aborted due to stage failure: Task 13 in stage 11162.0 Failed 4 times,most recent failure: Lost task 13.3 in stage 11162.0 (TID 10779,10.88.249.71,executor 1): java.lang.IllegalArgumentException: The value ([]) of the type (java.util.ArrayList) cannot be converted to struct<code:string,exception:struct<type:string>,type:string>
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:262)

仅在某些日子每天运行时不会出现此错误,但我无法找出可能导致该错误的数据类型。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

-Spark Scala Mongodb- MongoTypeConversionException 无法将 STRING 转换为 StructType(...)

-Spark Scala Mongodb- MongoTypeConversionException 无法将 STRING 转换为 StructType(...)

如何解决-Spark Scala Mongodb- MongoTypeConversionException 无法将 STRING 转换为 StructType(...)?

非常感谢任何帮助。

我正在尝试使用来自 mongodb 的数据构建一个数据框。

val spark = SparkSession.builder()
      .master("local")
      .appName("app")
      .config("spark.mongodb.input.uri",uri)
      .config("spark.mongodb.input.collection","collectionName")
      .config("spark.mongodb.input.readPreference.name","secondary")
      .getorCreate()

val df = MongoSpark.load(spark).limit(1)

然后我尝试逐行读取元素,数据框的架构如下所示:

root
 |-- A: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- B: boolean (nullable = true)
 |-- C: string (nullable = true)
 |-- D: string (nullable = true)
 |-- E: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: string (nullable = true)
 |    |    |-- b: string (nullable = true)
 |    |    |-- c: string (nullable = true)
 |    |    |-- d: string (nullable = true)

如果数据框不包含 E,dataframe.show() 会打印出来就好了。

但是,如果数据框确实包含 E,那么 dataframe.show() 会给我

Cannot cast STRING into a StructType(StructField(a,StringType,true),StructField(b,StructField(c,StructField(d,true)) (value: BsonString{value=''http://...some url...''})

我尝试了几乎所有与 stackoverflow 上列出的这个问题相关的解决方案,但我仍然没有通过这个错误。

我应该如何解决这个问题?谢谢!

解决方法

E 实际上是一个包含多个字符串的对象数组。

example of mongodb document

Apache Spark™ 2.0: MQTT as a Source for Structured Streaming

Apache Spark™ 2.0: MQTT as a Source for Structured Streaming

Apache Spark™ 2.0: MQTT as a Source for Structured Streaming

 

Apache Spark™ 2.0 introduced Structured Streaming as an alpha release and Spark contributors have already created a library for reading data from MQTT Servers that allows Spark users to process Internet-of-Things data. Specifically, the library allows developers to create a SQL Stream with data received through an MQTT server using sqlContext.readstream.

The work is being done in Apache Bahir™ which "provides extensions to distributed analytic platforms such as Apache Spark". For more detail about Bahir and to get started with MQTT Structured Streaming, check out this step-by-step post from Luciano Resende.

What is MQTT?

For those who might be unfamiliar with MQTT (Message Queuing Telemetry Transport), the mqtt.org site defines it as "a machine-to-machine (M2M)/Internet of Things connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport. It is a standard at OASIS-mqtt-v3.1.1"

MQTT has the potential to be a good fit for Spark because of its efficiency, low networking overhead, and a message protocol governed by Quality of Service (QoS) levels that establish different delivery guarantees. Originally authored in 1999, MQTT is already in wide use and its various implementations feature easy distribution, high performance, and high availability.

High throughput and fault tolerance for structured streaming sources

Currently, Spark provides fault tolerance in the sense that a streaming source can reliably replay the entire sequence of incoming messages. This is true of streaming sources like Kafka, S3, HDFS, and even local filesystems. However, it may not be the case with MQTT. (And it is definitely not the case with native socket-based sources.)

For now, all of the available sources in structured streaming are backed by a filesystem (S3, HDFS etc.). In these cases, all the executors can request offsets from the filesystem sources and process data in parallel.

However in cases where streaming sources have pub-sub as the only option for receiving the stream, supporting high throughput becomes a challenge. MQTT does not currently support reading in parallel and thus does not support high throughput.

Most message queues including MQTT lack built-in support for replaying the entire sequence of message. That deficiency might not be an issue since not all work loads require the entire sequence be retained on the server. If so, there will ideally be a limit on when a source can purge stored data without compromising any "exactly once" delivery guarantees. (Note that this is an outstanding issue. See SPARK-16963.)

For our current release, we implement a minimal fault tolerance guarantee by storing all incoming messages locally on the disk. This represents an intermediate solution, as it poses a limit on how many messages it can process.

For source code and documentation, visit the related github repository, or visit the Bahir website to learn about other extensions available within the project.

beego.Controller.ParseForm () 方法没法转换成 struct 的对象

beego.Controller.ParseForm () 方法没法转换成 struct 的对象

beego.Controller.ParseForm () 方法没法转换成 struct 的对象,网上找了各种资料 用法如下 func (this *OrgnizationController) DoAddOrg () { org := models.Orgnization {} if err := this.ParseForm (&org); err != nil {         fmt.Println (err)     }     fmt.Println ("转换后的值:" + org.Mastername + org.Orgname) } } 打印的结果为空,不知道哪位大侠能解答一下...

java – Spark Strutured Streaming自动将时间戳转换为本地时间

java – Spark Strutured Streaming自动将时间戳转换为本地时间

我有UTC和ISO8601的时间戳,但使用结构化流,它会自动转换为本地时间.有没有办法阻止这种转换?我想在UTC中使用它.

我正在从Kafka读取json数据,然后使用from_json Spark函数解析它们.

输入:

{"Timestamp":"2015-01-01T00:00:06.222Z"}

流:

SparkSession
  .builder()
  .master("local[*]")
  .appName("my-app")
  .getorCreate()
  .readStream()
  .format("kafka")
  ... //some magic
  .writeStream()
  .format("console")
  .start()
  .awaitTermination();

架构:

StructType schema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField("Timestamp",DataTypes.TimestampType,true),});

输出:

+--------------------+
|           Timestamp|
+--------------------+
|2015-01-01 01:00:...|
|2015-01-01 01:00:...|
+--------------------+

如您所见,小时数自动增加.

PS:我试着尝试使用from_utc_timestamp Spark函数,但没有运气.

解决方法

对我来说它起作用了:
spark.conf.set("spark.sql.session.timeZone","UTC")

它告诉spark sql使用UTC作为时间戳的默认时区.我在spark sql中使用它,例如:

select *,cast('2017-01-01 10:10:10' as timestamp) from soMetable

我知道它在2.0.1中不起作用.但适用于Spark 2.2.我在sqlTransformer中也使用过它.

我不确定流媒体.

关于Pyspark - 在两个 String 列上合并导致错误 - 无法转换为 structpyspark 字符串拼接的问题我们已经讲解完毕,感谢您的阅读,如果还想了解更多关于-Spark Scala Mongodb- MongoTypeConversionException 无法将 STRING 转换为 StructType(...)、Apache Spark™ 2.0: MQTT as a Source for Structured Streaming、beego.Controller.ParseForm () 方法没法转换成 struct 的对象、java – Spark Strutured Streaming自动将时间戳转换为本地时间等相关内容,可以在本站寻找。

本文标签: