这篇文章主要围绕如何使用来自另一个数据框的新值更新pyspark数据框?和pyspark改变数据类型展开,旨在为您提供一份详细的参考资料。我们将全面介绍如何使用来自另一个数据框的新值更新pyspark
这篇文章主要围绕如何使用来自另一个数据框的新值更新pyspark数据框?和pyspark改变数据类型展开,旨在为您提供一份详细的参考资料。我们将全面介绍如何使用来自另一个数据框的新值更新pyspark数据框?的优缺点,解答pyspark改变数据类型的相关问题,同时也会为您带来Pyspark数据框上的数据透视字符串列、PySpark数据框下推、Python,将另一个数据框的最大值附加到另一个数据框、Python:如何通过保留第一个数据框的信息来合并列上的两个数据框?的实用方法。
本文目录一览:- 如何使用来自另一个数据框的新值更新pyspark数据框?(pyspark改变数据类型)
- Pyspark数据框上的数据透视字符串列
- PySpark数据框下推
- Python,将另一个数据框的最大值附加到另一个数据框
- Python:如何通过保留第一个数据框的信息来合并列上的两个数据框?
如何使用来自另一个数据框的新值更新pyspark数据框?(pyspark改变数据类型)
我有两个Spark数据框:
数据框A:
|col_1 | col_2 | ... | col_n ||val_1 | val_2 | ... | val_n |
和数据框B:
|col_1 | col_2 | ... | col_m ||val_1 | val_2 | ... | val_m |
数据框B可以包含来自数据框A的重复行,更新行和新行。我想在spark中编写操作,在其中可以创建一个新数据框,其中包含数据框A的行以及数据框B的更新行和新行。
我首先创建一个仅包含不可更新列的哈希列。这是唯一的ID。所以我们可以说col1
,并col2
可以改变值(可更新),但是col3,..,coln
是唯一的。我创建了一个哈希函数为hash(col3,..,coln)
:
A=A.withColumn("hash", hash(*[col(colname) for colname in unique_cols_A]))B=B.withColumn("hash", hash(*[col(colname) for colname in unique_cols_B]))
现在,我想编写一些火花代码,基本上从B中选择哈希值不在A中的 行(因此,新行和更新后的行) ,并将它们与A中的行一起加入新的数据帧中。
pyspark?
编辑:数据框B可以有来自数据框A的额外列,因此无法进行联合。
样例
数据框A:
+-----+-----+|col_1|col_2|+-----+-----+| a| www|| b| eee|| c| rrr|+-----+-----+
数据框B:
+-----+-----+-----+|col_1|col_2|col_3|+-----+-----+-----+| a| wew| 1|| d| yyy| 2|| c| rer| 3|+-----+-----+-----+
结果:数据框C:
+-----+-----+-----+|col_1|col_2|col_3|+-----+-----+-----+| a| wew| 1|| b| eee| null|| c| rer| 3|| d| yyy| 2|+-----+-----+-----+
答案1
小编典典这与用新值更新数据框列密切相关,除了您还想添加数据框B中的行。一种方法是首先执行链接的问题中概述的操作,然后将结果与数据框B合并并删除重复。
例如:
dfA.alias(''a'').join(dfB.alias(''b''), on=[''col_1''], how=''left'')\ .select( ''col_1'', f.when( ~f.isnull(f.col(''b.col_2'')), f.col(''b.col_2'') ).otherwise(f.col(''a.col_2'')).alias(''col_2''), ''b.col_3'' )\ .union(dfB)\ .dropDuplicates()\ .sort(''col_1'')\ .show()#+-----+-----+-----+#|col_1|col_2|col_3|#+-----+-----+-----+#| a| wew| 1|#| b| eee| null|#| c| rer| 3|#| d| yyy| 2|#+-----+-----+-----+
如果您有很多要替换的列并且不想对它们全部进行硬编码,则可以更一般地使用列表推导:
cols_to_update = [''col_2'']dfA.alias(''a'').join(dfB.alias(''b''), on=[''col_1''], how=''left'')\ .select( *[ [''col_1''] + [ f.when( ~f.isnull(f.col(''b.{}''.format(c))), f.col(''b.{}''.format(c)) ).otherwise(f.col(''a.{}''.format(c))).alias(c) for c in cols_to_update ] + [''b.col_3''] ] )\ .union(dfB)\ .dropDuplicates()\ .sort(''col_1'')\ .show()
Pyspark数据框上的数据透视字符串列
我有一个像这样的简单数据框:
rdd = sc.parallelize(
[
(0,"A",223,"201603","PORT"),(0,22,"201602",422,"201601","DOCK"),(1,"B",3213,(2,"C",2321,"DOCK")
]
)
df_data = sqlContext.createDataFrame(rdd,["id","type","cost","date","ship"])
df_data.show()
+---+----+----+------+----+
| id|type|cost| date|ship|
+---+----+----+------+----+
| 0| A| 223|201603|PORT|
| 0| A| 22|201602|PORT|
| 0| A| 422|201601|DOCK|
| 1| B|3213|201602|DOCK|
| 1| B|3213|201601|PORT|
| 2| C|2321|201601|DOCK|
+---+----+----+------+----+
我需要按日期进行调整:
df_data.groupby(df_data.id,df_data.type).pivot("date").avg("cost").show()
+---+----+------+------+------+
| id|type|201601|201602|201603|
+---+----+------+------+------+
| 2| C|2321.0| null| null|
| 0| A| 422.0| 22.0| 223.0|
| 1| B|3213.0|3213.0| null|
+---+----+------+------+------+
一切正常。但是现在我需要对其进行透视,并获得一个非数字列:
df_data.groupby(df_data.id,df_data.type).pivot("date").avg("ship").show()
当然,我会得到一个例外:
AnalysisException: u'"ship" is not a numeric column. Aggregation function can only be applied on a numeric column.;'
我想产生一些东西
+---+----+------+------+------+
| id|type|201601|201602|201603|
+---+----+------+------+------+
| 2| C|DOCK | null| null|
| 0| A| DOCK | PORT| DOCK|
| 1| B|DOCK |PORT | null|
+---+----+------+------+------+
有可能pivot
吗?
PySpark数据框下推
如何解决PySpark数据框下推?
我在sqlServer中有下表
CREATE TABLE test (
id smallint,val,varchar(200)
);
我正在尝试使用Python中的Sparksql的DataFrame API来查询它,
df = sql.read.format("jdbc")
.option("url",url)
.option("query","SELECT * FROM test")
.option("driver",driver)
.load()
我希望能够使用id
将过滤器下推到数据库中的isin()
列,如下所示:
df = df.filter(df[''id''].isin([1,2,3]))
但是,由于id列在数据库中是smallint
,因此谓词不会被下推。如果我尝试使用相同的语法来过滤val
列,则谓词将成功下推。
有人知道是否有可能将smallint
列的谓词从Python下推到数据库吗?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)
Python,将另一个数据框的最大值附加到另一个数据框
如何解决Python,将另一个数据框的最大值附加到另一个数据框?
作为初学者,我一直无法破解。
我有一个df g1,它包含从0到5的索引以及数字值,例如250、4023、2045、2010、113。 我想将这些值移动到另一个数据帧中,该数据帧具有500k行和列,称为group1。 group1应该与df.g1索引匹配,因此当g1行1的值为250时,该值应该存在于df1且group1 == 1的所有行中。与其他数字类似,因此最终结果为df1且没有此新列中的NAN值,但只有值的多次[50,4023,2045,2010,113]。
很抱歉给我带来麻烦,我是stackoverflow的new = noob,只是尝试学习python。这个论坛对搜索问题有很大帮助!
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)
Python:如何通过保留第一个数据框的信息来合并列上的两个数据框?
如何解决Python:如何通过保留第一个数据框的信息来合并列上的两个数据框??
Sample:
df1 = pd.DataFrame({''Name'': [''Tom'', ''Sara'', ''Eva'', ''Jack'', ''Laura''],
''Age'': [34, 18, 44, 27, 30]})
#print (df1)
df3 = df1.copy()
df2 = pd.DataFrame({''Name'': [''Tom'', ''Paul'', ''Eva'', ''Jack'', ''Michelle''],
''Sex'': [''M'', ''M'', ''F'', ''M'', ''F'']})
#print (df2)
使用map由Series
创建人set_index
:
df1[''Sex''] = df1[''Name''].map(df2.set_index(''Name'')[''Sex''])
print (df1)
Name Age Sex
0 Tom 34 M
1 Sara 18 NaN
2 Eva 44 F
3 Jack 27 M
4 Laura 30 NaN
merge左连接的替代解决方案:
df = df3.merge(df2[[''Name'',''Sex'']], on=''Name'', how=''left'')
print (df)
Name Age Sex
0 Tom 34 M
1 Sara 18 NaN
2 Eva 44 F
3 Jack 27 M
4 Laura 30 NaN
如果需要通过多列映射(例如Year和Code),则需要merge左连接:
df1 = pd.DataFrame({''Name'': [''Tom'', ''Sara'', ''Eva'', ''Jack'', ''Laura''],
''Year'':[2000,2003,2003,2004,2007],
''Code'':[1,2,3,4,4],
''Age'': [34, 18, 44, 27, 30]})
print (df1)
Name Year Code Age
0 Tom 2000 1 34
1 Sara 2003 2 18
2 Eva 2003 3 44
3 Jack 2004 4 27
4 Laura 2007 4 30
df2 = pd.DataFrame({''Name'': [''Tom'', ''Paul'', ''Eva'', ''Jack'', ''Michelle''],
''Sex'': [''M'', ''M'', ''F'', ''M'', ''F''],
''Year'':[2001,2003,2003,2004,2007],
''Code'':[1,2,3,5,3],
''Val'':[21,34,23,44,67]})
print (df2)
Name Sex Year Code Val
0 Tom M 2001 1 21
1 Paul M 2003 2 34
2 Eva F 2003 3 23
3 Jack M 2004 5 44
4 Michelle F 2007 3 67
#merge by all columns
df = df1.merge(df2, on=[''Year'',''Code''], how=''left'')
print (df)
Name_x Year Code Age Name_y Sex Val
0 Tom 2000 1 34 NaN NaN NaN
1 Sara 2003 2 18 Paul M 34.0
2 Eva 2003 3 44 Eva F 23.0
3 Jack 2004 4 27 NaN NaN NaN
4 Laura 2007 4 30 NaN NaN NaN
#specified columns - columns for join (Year, Code) need always + appended columns (Val)
df = df1.merge(df2[[''Year'',''Code'', ''Val'']], on=[''Year'',''Code''], how=''left'')
print (df)
Name Year Code Age Val
0 Tom 2000 1 34 NaN
1 Sara 2003 2 18 34.0
2 Eva 2003 3 44 23.0
3 Jack 2004 4 27 NaN
4 Laura 2007 4 30 NaN
如果获取错误map意味着按连接列重复,则在这里Name:
df1 = pd.DataFrame({''Name'': [''Tom'', ''Sara'', ''Eva'', ''Jack'', ''Laura''],
''Age'': [34, 18, 44, 27, 30]})
print (df1)
Name Age
0 Tom 34
1 Sara 18
2 Eva 44
3 Jack 27
4 Laura 30
df3, df4 = df1.copy(), df1.copy()
df2 = pd.DataFrame({''Name'': [''Tom'', ''Tom'', ''Eva'', ''Jack'', ''Michelle''],
''Val'': [1,2,3,4,5]})
print (df2)
Name Val
0 Tom 1 <-duplicated name Tom
1 Tom 2 <-duplicated name Tom
2 Eva 3
3 Jack 4
4 Michelle 5
s = df2.set_index(''Name'')[''Val'']
df1[''New''] = df1[''Name''].map(s)
print (df1)
InvalidindexError
:重新索引仅对唯一值的Index对象有效
解决方案通过删除重复项DataFrame.drop_duplicates,或dict在最后一次重复匹配中使用map by :
#default keep first value
s = df2.drop_duplicates(''Name'').set_index(''Name'')[''Val'']
print (s)
Name
Tom 1
Eva 3
Jack 4
Michelle 5
Name: Val, dtype: int64
df1[''New''] = df1[''Name''].map(s)
print (df1)
Name Age New
0 Tom 34 1.0
1 Sara 18 NaN
2 Eva 44 3.0
3 Jack 27 4.0
4 Laura 30 NaN
#add parameter for keep last value
s = df2.drop_duplicates(''Name'', keep=''last'').set_index(''Name'')[''Val'']
print (s)
Name
Tom 2
Eva 3
Jack 4
Michelle 5
Name: Val, dtype: int64
df3[''New''] = df3[''Name''].map(s)
print (df3)
Name Age New
0 Tom 34 2.0
1 Sara 18 NaN
2 Eva 44 3.0
3 Jack 27 4.0
4 Laura 30 NaN
#map by dictionary
d = dict(zip(df2[''Name''], df2[''Val'']))
print (d)
{''Tom'': 2, ''Eva'': 3, ''Jack'': 4, ''Michelle'': 5}
df4[''New''] = df4[''Name''].map(d)
print (df4)
Name Age New
0 Tom 34 2.0
1 Sara 18 NaN
2 Eva 44 3.0
3 Jack 27 4.0
4 Laura 30 NaN
解决方法
我有两个数据框df1和df2。df1包含人的年龄信息,而df2包含人的性别信息。并非所有人都在里面df1或里面df2
df1
Name Age
0 Tom 34
1 Sara 18
2 Eva 44
3 Jack 27
4 Laura 30
df2
Name Sex
0 Tom M
1 Paul M
2 Eva F
3 Jack M
4 Michelle F
我想有人民的性别的信息df1和设置NaN,如果我没有在这个信息df2。我尝试这样做,df1 = pd.merge(df1,df2,on = ''Name'',how = ''outer'')
但是我保留了一些df2我不想要的信息。
df1
Name Age Sex
0 Tom 34 M
1 Sara 18 NaN
2 Eva 44 F
3 Jack 27 M
4 Laura 30 NaN
关于如何使用来自另一个数据框的新值更新pyspark数据框?和pyspark改变数据类型的问题就给大家分享到这里,感谢你花时间阅读本站内容,更多关于Pyspark数据框上的数据透视字符串列、PySpark数据框下推、Python,将另一个数据框的最大值附加到另一个数据框、Python:如何通过保留第一个数据框的信息来合并列上的两个数据框?等相关知识的信息别忘了在本站进行查找喔。
本文标签: