在本文中,我们将详细介绍PySparkDataFrames-枚举而不转换为熊猫的方法?的各个方面,并为您提供关于python3枚举的相关解答,同时,我们也将为您带来关于ApacheSparkDataF
在本文中,我们将详细介绍PySpark DataFrames-枚举而不转换为熊猫的方法?的各个方面,并为您提供关于python3枚举的相关解答,同时,我们也将为您带来关于Apache Spark DataFrames 入门指南:操作 DataFrame、Apache Spark DataFrames入门指南:操作DataFrame、pandas dataframe 与 spark dataframe 互相转换(数据类型应该怎么转换呢?)、pandas的DataFrameGroupBy转换为DataFrame的有用知识。
本文目录一览:- PySpark DataFrames-枚举而不转换为熊猫的方法?(python3枚举)
- Apache Spark DataFrames 入门指南:操作 DataFrame
- Apache Spark DataFrames入门指南:操作DataFrame
- pandas dataframe 与 spark dataframe 互相转换(数据类型应该怎么转换呢?)
- pandas的DataFrameGroupBy转换为DataFrame
PySpark DataFrames-枚举而不转换为熊猫的方法?(python3枚举)
我有一个很大的 pyspark.sql.dataframe.DataFrame 名为df。我需要某种枚举记录的方式-
因此,能够访问具有特定索引的记录。(或选择具有索引范围的记录组)
在大熊猫中,我可以
indexes=[2,3,6,7] df[indexes]
在这里我想要类似的东西 (并且不将数据框转换为熊猫)
我最接近的是:
- 通过以下方式枚举原始数据框中的所有对象:
indexes=np.arange(df.count()) df_indexed=df.withColumn(''index'', indexes)
* 使用where()函数搜索所需的值。
问题:
- 为什么它不起作用以及如何使其起作用?如何在数据框中添加一行?
- 以后可以做类似的事情吗:
indexes=[2,3,6,7] df1.where("index in indexes").collect()
- 有没有更快,更简单的处理方法?
答案1
小编典典它不起作用,因为:
- 的第二个参数
withColumn
应该Column
不是一个集合。np.array
在这里不会工作 - 当您将
"index in indexes"
SQL表达式传递给时where
indexes
超出范围,并且不能将其解析为有效标识符
PySpark > = 1.4.0
您可以使用相应的窗口函数添加行号,并使用Column.isin
方法或格式正确的查询字符串进行查询:
from pyspark.sql.functions import col, rowNumber from pyspark.sql.window import Window w = Window.orderBy() indexed = df.withColumn("index", rowNumber().over(w)) # Using DSL indexed.where(col("index").isin(set(indexes))) # Using SQL expression indexed.where("index in ({0})".format(",".join(str(x) for x in indexes)))
看起来调用无PARTITION BY
子句的窗口函数会将所有数据移动到单个分区,因此上述毕竟不是最佳解决方案。
有没有更快,更简单的处理方法?
并不是的。Spark DataFrames不支持随机行访问。
PairedRDD``lookup
如果使用进行分区,则可以使用相对较快的方法进行访问HashPartitioner
。还有一个index-
rdd项目,它支持有效的查找。
编辑 :
与PySpark版本无关,您可以尝试执行以下操作:
from pyspark.sql import Row from pyspark.sql.types import StructType, StructField, LongType row = Row("char") row_with_index = Row("char", "index") df = sc.parallelize(row(chr(x)) for x in range(97, 112)).toDF() df.show(5) ## +----+ ## |char| ## +----+ ## | a| ## | b| ## | c| ## | d| ## | e| ## +----+ ## only showing top 5 rows # This part is not tested but should work and save some work later schema = StructType( df.schema.fields[:] + [StructField("index", LongType(), False)]) indexed = (df.rdd # Extract rdd .zipWithIndex() # Add index .map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])) # Map to rows .toDF(schema)) # It will work without schema but will be more expensive # inSet in Spark < 1.3 indexed.where(col("index").isin(indexes))
Apache Spark DataFrames 入门指南:操作 DataFrame
文章目录
- 1 二、操作 DataFrame
- 1.1 打印 DataFrame 里面的模式
- 1.2 对 DataFrame 里面的数据进行采样
- 1.3 查询 DataFrame 里面的列
- 1.4 根据条件过滤数据
- 1.5 对 DataFrame 里面的数据进行排序
- 1.6 对列进行重命名
- 1.7 将 DataFrame 看作是关系型数据表
- 1.8 对两个 DataFrame 进行 Join 操作
- 1.9 将 DataFrame 保存成文件
二、操作 DataFrame
在前面的文章中,我们介绍了如何创建 DataFrame。本文将介绍如何操作 DataFrame 里面的数据和打印出 DataFrame 里面数据的模式
打印 DataFrame 里面的模式
在创建完 DataFrame 之后,我们一般都会查看里面数据的模式,我们可以通过 printSchema
函数来查看。它会打印出列的名称和类型:
|
如果采用的是 load 方式参见 DataFrame 的,students.printSchema
的输出则如下:
|
对 DataFrame 里面的数据进行采样
打印完模式之后,我们要做的第二件事就是看看加载进 DataFrame 里面的数据是否正确。从新创建的 DataFrame 里面采样数据的方法有很多种。我们来对其进行介绍。
最简单的就是使用 show 方法,show 方法有四个版本:
(1)、第一个需要我们指定采样的行数 def show(numRows: Int);
(2)、第二种不需要我们指定任何参数,这种情况下,show 函数默认会加载出 20 行的数据 def show();
(3)、第三种需要指定一个 boolean 值,这个值说明是否需要对超过 20 个字符的列进行截取 def show(truncate: Boolean);
(4)、最后一种需要指定采样的行和是否需要对列进行截断 def show(numRows: Int, truncate: Boolean)
。实际上,前三个函数都是调用这个函数实现的。
Show 函数和其他函数不同的地方在于其不仅会显示需要打印的行,而且还会打印出头信息,并且会直接在默认的输出流打出 (console)。来看看怎么使用吧:
|
我们还可以使用 head (n: Int) 方法来采样数据,这个函数也需要输入一个参数标明需要采样的行数,而且这个函数返回的是 Row 数组,我们需要遍历打印。当然,我们也可以使用 head () 函数直接打印,这个函数只是返回数据的一行,类型也是 Row。
|
除了 show、head 函数。我们还可以使用 first 和 take 函数,他们分别调用 head () 和 head (n)
|
查询 DataFrame 里面的列
正如你所看到的,所有的 DataFrame 里面的列都是有名称的。Select 函数可以帮助我们从 DataFrame 中选择需要的列,并且返回一个全新的 DataFrame,下面我将此进行介绍。
1、只选择一列。假如我们只想从 DataFrame 中选择 email 这列,因为 DataFrame 是不可变的 (immutable),所以这个操作会返回一个新的 DataFrame:
|
现在我们有一个名叫 emailDataFrame 全新的 DataFrame,而且其中只包含了 email 这列,让我们使用 show 来看看是否是这样的:
|
2、选择多列。其实 select 函数支持选择多列。
|
需要主要的是,我们 select 列的时候,需要保证 select 的列是有效的,换句话说,就是必须保证 select 的列是 printSchema
打印出来的。如果列的名称是无效的,将会出现 org.apache.spark.sql.AnalysisException
异常,如下:
|
根据条件过滤数据
现在我们已经知道如何在 DataFrame 中选择需要的列,让我们来看看如何根据条件来过滤 DataFrame 里面的数据。对应基于 Row 的数据,我们可以将 DataFrame 看作是普通的 Scala 集合,然后我们根据需要的条件进行相关的过滤,为了展示清楚,我在语句没后面都用 show 函数展示过滤的结果。
|
注意看第一个过滤语句,虽然 id 被解析成 String 了,但是程序依然正确地做出了比较。我们也可以对多个条件进行过滤:
|
我们还可以采用类 SQL 的语法对数据进行过滤:
|
对 DataFrame 里面的数据进行排序
使用 sort 函数我们可以对 DataFrame 中指定的列进行排序:
|
也可以对多列进行排序:
|
从上面的结果我们可以看出,默认是按照升序进行排序的。我们也可以将上面的语句写成下面的:
|
这两个语句运行的效果是一致的。
对列进行重命名
如果我们对 DataFrame 中默认的列名不感兴趣,我们可以在 select 的时候利用 as 对其进行重命名,下面的列子将 studentName
重命名为 name,而 email 这列名字不变:
|
将 DataFrame 看作是关系型数据表
DataFrame 的一个强大之处就是我们可以将它看作是一个关系型数据表,然后在其上运行 SQL 查询语句,只要我们进行下面两步即可实现:
(1)、将 DataFrame 注册成一张名为 students 的表:
|
(2)、然后我们在其上用标准的 SQL 进行查询:
|
对两个 DataFrame 进行 Join 操作
前面我们已经知道如何将 DataFrame 注册成一张表,现在我们来看看如何使用普通的 SQL 对两个 DataFrame 进行 Join 操作。
1、内联:内联是默认的 Join 操作,它仅仅返回两个 DataFrame 都匹配到的结果,来看看下面的例子:
|
2、右外联:在内连接的基础上,还包含右表中所有不符合条件的数据行,并在其中的左表列填写 NULL ,来看看下面的实例:
|
3、左外联:在内连接的基础上,还包含左表中所有不符合条件的数据行,并在其中的右表列填写 NULL ,同样我们来看看下面的实例:
|
将 DataFrame 保存成文件
下面我来介绍如何将 DataFrame 保存到一个文件里面。前面我们加载 csv 文件用到了 load 函数,与之对于的用于保存文件可以使用 save 函数。具体操作包括以下两步:
1、首先创建一个 map 对象,用于存储一些 save 函数需要用到的一些属性。这里我将制定保存文件的存放路径和 csv 的头信息。
|
为了基于学习的态度,我们从 DataFrame 里面选择出 studentName 和 email 两列,并且将 studentName 的列名重定义为 name。
|
2、下面我们调用 save 函数保存上面的 DataFrame 数据到 iteblog.csv 文件夹中
|
mode 函数可以接收的参数有 Overwrite、Append、Ignore 和 ErrorIfExists。从名字就可以很好的理解,Overwrite 代表覆盖目录下之前存在的数据;Append 代表给指定目录下追加数据;Ignore 代表如果目录下已经有文件,那就什么都不执行;ErrorIfExists 代表如果保存目录下存在文件,那么抛出相应的异常。
需要注意的是,上述 path 参数指定的是保存文件夹,并不是最后的保存文件名。
Apache Spark DataFrames入门指南:操作DataFrame
文章目录
- 1 二、操作DataFrame
- 1.1 打印DataFrame里面的模式
- 1.2 对DataFrame里面的数据进行采样
- 1.3 查询DataFrame里面的列
- 1.4 根据条件过滤数据
- 1.5 对DataFrame里面的数据进行排序
- 1.6 对列进行重命名
- 1.7 将DataFrame看作是关系型数据表
- 1.8 对两个DataFrame进行Join操作
- 1.9 将DataFrame保存成文件
二、操作DataFrame
在前面的文章中,我们介绍了如何创建DataFrame。本文将介绍如何操作DataFrame里面的数据和打印出DataFrame里面数据的模式
打印DataFrame里面的模式
在创建完DataFrame之后,我们一般都会查看里面数据的模式,我们可以通过printSchema
函数来查看。它会打印出列的名称和类型:
|
如果采用的是load方式参见DataFrame的,students.printSchema
的输出则如下:
|
对DataFrame里面的数据进行采样
打印完模式之后,我们要做的第二件事就是看看加载进DataFrame里面的数据是否正确。从新创建的DataFrame里面采样数据的方法有很多种。我们来对其进行介绍。
最简单的就是使用show方法,show方法有四个版本:
(1)、第一个需要我们指定采样的行数def show(numRows: Int);
(2)、第二种不需要我们指定任何参数,这种情况下,show函数默认会加载出20行的数据def show();
(3)、第三种需要指定一个boolean值,这个值说明是否需要对超过20个字符的列进行截取def show(truncate: Boolean);
(4)、最后一种需要指定采样的行和是否需要对列进行截断def show(numRows: Int, truncate: Boolean)
。实际上,前三个函数都是调用这个函数实现的。
Show函数和其他函数不同的地方在于其不仅会显示需要打印的行,而且还会打印出头信息,并且会直接在默认的输出流打出(console)。来看看怎么使用吧:
|
我们还可以使用head(n: Int)方法来采样数据,这个函数也需要输入一个参数标明需要采样的行数,而且这个函数返回的是Row数组,我们需要遍历打印。当然,我们也可以使用head()函数直接打印,这个函数只是返回数据的一行,类型也是Row。
|
除了show、head函数。我们还可以使用first和take函数,他们分别调用head()和head(n)
|
查询DataFrame里面的列
正如你所看到的,所有的DataFrame里面的列都是有名称的。Select函数可以帮助我们从DataFrame中选择需要的列,并且返回一个全新的DataFrame,下面我将此进行介绍。
1、只选择一列。假如我们只想从DataFrame中选择email这列,因为DataFrame是不可变的(immutable),所以这个操作会返回一个新的DataFrame:
|
现在我们有一个名叫emailDataFrame全新的DataFrame,而且其中只包含了email这列,让我们使用show来看看是否是这样的:
|
2、选择多列。其实select函数支持选择多列。
|
需要主要的是,我们select列的时候,需要保证select的列是有效的,换句话说,就是必须保证select的列是printSchema
打印出来的。如果列的名称是无效的,将会出现org.apache.spark.sql.AnalysisException
异常,如下:
|
根据条件过滤数据
现在我们已经知道如何在DataFrame中选择需要的列,让我们来看看如何根据条件来过滤DataFrame里面的数据。对应基于Row的数据,我们可以将DataFrame看作是普通的Scala集合,然后我们根据需要的条件进行相关的过滤,为了展示清楚,我在语句没后面都用show函数展示过滤的结果。
|
注意看第一个过滤语句,虽然id被解析成String了,但是程序依然正确地做出了比较。我们也可以对多个条件进行过滤:
|
我们还可以采用类SQL的语法对数据进行过滤:
|
对DataFrame里面的数据进行排序
使用sort函数我们可以对DataFrame中指定的列进行排序:
|
也可以对多列进行排序:
|
从上面的结果我们可以看出,默认是按照升序进行排序的。我们也可以将上面的语句写成下面的:
|
这两个语句运行的效果是一致的。
对列进行重命名
如果我们对DataFrame中默认的列名不感兴趣,我们可以在select的时候利用as对其进行重命名,下面的列子将studentName
重命名为name,而email这列名字不变:
|
将DataFrame看作是关系型数据表
DataFrame的一个强大之处就是我们可以将它看作是一个关系型数据表,然后在其上运行SQL查询语句,只要我们进行下面两步即可实现:
(1)、将DataFrame注册成一张名为students的表:
|
(2)、然后我们在其上用标准的SQL进行查询:
|
对两个DataFrame进行Join操作
前面我们已经知道如何将DataFrame注册成一张表,现在我们来看看如何使用普通的SQL对两个DataFrame进行Join操作。
1、内联:内联是默认的Join操作,它仅仅返回两个DataFrame都匹配到的结果,来看看下面的例子:
|
2、右外联:在内连接的基础上,还包含右表中所有不符合条件的数据行,并在其中的左表列填写NULL ,来看看下面的实例:
|
3、左外联:在内连接的基础上,还包含左表中所有不符合条件的数据行,并在其中的右表列填写NULL ,同样我们来看看下面的实例:
|
将DataFrame保存成文件
下面我来介绍如何将DataFrame保存到一个文件里面。前面我们加载csv文件用到了load函数,与之对于的用于保存文件可以使用save函数。具体操作包括以下两步:
1、首先创建一个map对象,用于存储一些save函数需要用到的一些属性。这里我将制定保存文件的存放路径和csv的头信息。
|
为了基于学习的态度,我们从DataFrame里面选择出studentName和email两列,并且将studentName的列名重定义为name。
|
2、下面我们调用save函数保存上面的DataFrame数据到iteblog.csv文件夹中
|
mode函数可以接收的参数有Overwrite、Append、Ignore和ErrorIfExists。从名字就可以很好的理解,Overwrite代表覆盖目录下之前存在的数据;Append代表给指定目录下追加数据;Ignore代表如果目录下已经有文件,那就什么都不执行;ErrorIfExists代表如果保存目录下存在文件,那么抛出相应的异常。
需要注意的是,上述path参数指定的是保存文件夹,并不是最后的保存文件名。
pandas dataframe 与 spark dataframe 互相转换(数据类型应该怎么转换呢?)
文章大纲
- spark 2.x 版本
- spark 3.2 版本及以上
- 参考文献
spark 2.x 版本
spark 2.4.8 版本:
- https://spark.apache.org/docs/2.4.8/api/python/_modules/pyspark/sql/dataframe.html#DataFrame.toPandas
源代码:
@since(1.3)
def toPandas(self):
"""
Returns the contents of this :class:`DataFrame
本文同步分享在 博客“shiter”(CSDN)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。
pandas的DataFrameGroupBy转换为DataFrame
import pandas as pd df = pd.DataFrame({''A'': [''a'', ''b'', ''c'', ''c'', ''b'', ''a'', ''a'', ''b''], ''B'': [1, 2, 3, 4, 5, 6, 7, 8], ''C'': [11, 12, 13, 14, 15, 16, 17, 18]}) print(df) mean = df.groupby(''A'').mean() print(mean) g = df.groupby(''A'').mean() g[''B''].mean() # 仅选择B列 b__mean = df.groupby([''A'', ''B'']).mean() print(b__mean) print(b__mean[''C''].values.tolist()) b__mean = b__mean.reset_index() print(b__mean)
reset_index()重新建立索引,就能变成DataFrame类型
我们今天的关于PySpark DataFrames-枚举而不转换为熊猫的方法?和python3枚举的分享已经告一段落,感谢您的关注,如果您想了解更多关于Apache Spark DataFrames 入门指南:操作 DataFrame、Apache Spark DataFrames入门指南:操作DataFrame、pandas dataframe 与 spark dataframe 互相转换(数据类型应该怎么转换呢?)、pandas的DataFrameGroupBy转换为DataFrame的相关信息,请在本站查询。
本文标签: