GVKun编程网logo

Python-使用Spark将列转置为行(python将一列数据转换成向量)

16

在本文中,您将会了解到关于Python-使用Spark将列转置为行的新资讯,同时我们还将为您解释python将一列数据转换成向量的相关在本文中,我们将带你探索Python-使用Spark将列转置为行的

在本文中,您将会了解到关于Python-使用Spark将列转置为行的新资讯,同时我们还将为您解释python将一列数据转换成向量的相关在本文中,我们将带你探索Python-使用Spark将列转置为行的奥秘,分析python将一列数据转换成向量的特点,并给出一些关于Javascript将Json数据从列转置为行、PySpark - 使用 Python 在 Spark 上编程、PySpark Dataframe 将列转换为行、Pyspark将列的类型从日期更改为字符串的实用技巧。

本文目录一览:

Python-使用Spark将列转置为行(python将一列数据转换成向量)

Python-使用Spark将列转置为行(python将一列数据转换成向量)

我正在尝试将表的某些列转置为行。我正在使用Python和Spark 1.5.0。这是我的初始表:

+-----+-----+-----+-------+|  A  |col_1|col_2|col_...|+-----+-------------------+|  1  |  0.0|  0.6|  ...  ||  2  |  0.6|  0.7|  ...  ||  3  |  0.5|  0.9|  ...  ||  ...|  ...|  ...|  ...  |

我想要这样的东西:

+-----+--------+-----------+|  A  | col_id | col_value |+-----+--------+-----------+|  1  |   col_1|        0.0||  1  |   col_2|        0.6|   |  ...|     ...|        ...|    |  2  |   col_1|        0.6||  2  |   col_2|        0.7| |  ...|     ...|        ...|  |  3  |   col_1|        0.5||  3  |   col_2|        0.9||  ...|     ...|        ...|

有人知道我能做到吗?谢谢你的帮助。

答案1

小编典典

使用基本的Spark SQL函数相对简单。

python

from pyspark.sql.functions import array, col, explode, struct, litdf = sc.parallelize([(1, 0.0, 0.6), (1, 0.6, 0.7)]).toDF(["A", "col_1", "col_2"])def to_long(df, by):    # Filter dtypes and split into column names and type description    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))    # Spark SQL supports only homogeneous columns    assert len(set(dtypes)) == 1, "All columns have to be of the same type"    # Create and explode an array of (column_name, column_value) structs    kvs = explode(array([      struct(lit(c).alias("key"), col(c).alias("val")) for c in cols    ])).alias("kvs")    return df.select(by + [kvs]).select(by + ["kvs.key", "kvs.val"])to_long(df, ["A"])

Scala:

import org.apache.spark.sql.DataFrameimport org.apache.spark.sql.functions.{array, col, explode, lit, struct}val df = Seq((1, 0.0, 0.6), (1, 0.6, 0.7)).toDF("A", "col_1", "col_2")def toLong(df: DataFrame, by: Seq[String]): DataFrame = {  val (cols, types) = df.dtypes.filter{ case (c, _) => !by.contains(c)}.unzip  require(types.distinct.size == 1, s"${types.distinct.toString}.length != 1")        val kvs = explode(array(    cols.map(c => struct(lit(c).alias("key"), col(c).alias("val"))): _*  ))  val byExprs = by.map(col(_))  df    .select(byExprs :+ kvs.alias("_kvs"): _*)    .select(byExprs ++ Seq($"_kvs.key", $"_kvs.val"): _*)}toLong(df, Seq("A"))

Javascript将Json数据从列转置为行

Javascript将Json数据从列转置为行

如何解决Javascript将Json数据从列转置为行?

我正尝试与此问题相反:Transposing JSON data

我想将当前按列拆分的Json对象转换为按行拆分的Json对象。

我从每列的Json拆分开始:

    d : 
       [
          0: {
             Mon01: "03/2015",Mon02: "1,0",Mon03: "2,Mon04: "3,Mon05: "",Mon06: "",},1: {
             Mon01: "04/2015",Mon02: "3,Mon03: "4,Mon04: "11,2: {
             Mon01: "05/2015",Mon02: "5,Mon03: "6,Mon04: "12,}
       ]

我试图将其转置为一行:

d : 
   [
      0: {
         Mon01: "03/2015",Mon02: "04/2015",Mon03: "05/2015",1: {
         Mon01: "1,Mon03: "5,2: {
         Mon01: "2,Mon02: "4,Mon04: "",3: {
         Mon01: "10,Mon02: "11,Mon03: "12,}

预先感谢您的帮助。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

PySpark - 使用 Python 在 Spark 上编程

PySpark - 使用 Python 在 Spark 上编程

Python Programming Guide

The Spark Python API (PySpark) exposes the Spark programming model to Python. To learn the basics of Spark, we recommend reading through theScala programming guide first; it should be easy to follow even if you don’t know Scala. This guide will show how to use the Spark features described there in Python.

Key Differences in the Python API

There are a few key differences between the Python and Scala APIs:

  • Python is dynamically typed, so RDDs can hold objects of multiple types.

  • PySpark does not yet support a few API calls, such as lookup and non-text input files, though these will be added in future releases.

In PySpark, RDDs support the same methods as their Scala counterparts but take Python functions and return Python collection types. Short functions can be passed to RDD methods using Python’s lambda syntax:

logData = sc.textFile(logFile).cache()
errors = logData.filter(lambda line: "ERROR" in line)

You can also pass functions that are defined with the def keyword; this is useful for longer functions that can’t be expressed using lambda:

def is_error(line):
    return "ERROR" in lineerrors = logData.filter(is_error)

Functions can access objects in enclosing scopes, although modifications to those objects within RDD methods will not be propagated back:

error_keywords = ["Exception""Error"]

def is_error(line):
    return any(keyword in line for keyword in error_keywords)

errors = logData.filter(is_error)

PySpark will automatically ship these functions to workers, along with any objects that they reference. Instances of classes will be serialized and shipped to workers by PySpark, but classes themselves cannot be automatically distributed to workers. The Standalone Use section describes how to ship code dependencies to workers.

In addition, PySpark fully supports interactive use—simply run ./bin/pyspark to launch an interactive shell.

Installing and Configuring PySpark

PySpark requires Python 2.6 or higher. PySpark applications are executed using a standard CPython interpreter in order to support Python modules that use C extensions. We have not tested PySpark with Python 3 or with alternative Python interpreters, such as PyPy or Jython.

By default, PySpark requires python to be available on the system PATH and use it to run programs; an alternate Python executable may be specified by setting the PYSPARK_PYTHON environment variable in conf/spark-env.sh (or .cmd on Windows).

All of PySpark’s library dependencies, including Py4J, are bundled with PySpark and automatically imported.

Standalone PySpark applications should be run using the bin/pyspark script, which automatically configures the Java and Python environment using the settings in conf/spark-env.sh or .cmd. The script automatically adds the bin/pyspark package to the PYTHONPATH.

Interactive Use

The bin/pyspark script launches a Python interpreter that is configured to run PySpark applications. To use pyspark interactively, first build Spark, then launch it directly from the command line without any options:

$ sbt/sbt assembly
$ ./bin/pyspark

The Python shell can be used explore data interactively and is a simple way to learn the API:

>>> words = sc.textFile("/usr/share/dict/words")

>>> words.filter(lambda w: w.startswith("spar")).take(5)
[u''spar''u''sparable''u''sparada''u''sparadrap''u''sparagrass'']

>>> help(pyspark) # Show all pyspark functions

By default, the bin/pyspark shell creates SparkContext that runs applications locally on a single core. To connect to a non-local cluster, or use multiple cores, set the MASTER environment variable. For example, to use the bin/pyspark shell with a standalone Spark cluster:

$ MASTER=spark://IP:PORT ./bin/pyspark

Or, to use four cores on the local machine:

$ MASTER=local[4] ./bin/pyspark

IPython

注意:此部分已过时,请参考 http://www.jupyter.org/

It is also possible to launch PySpark in IPython, the enhanced Python interpreter. PySpark works with IPython 1.0.0 and later. To use IPython, set the IPYTHON variable to 1 when running bin/pyspark:

$ IPYTHON=1 ./bin/pyspark

Alternatively, you can customize the ipython command by setting IPYTHON_OPTS. For example, to launch the IPython Notebook with PyLab graphing support:

$ IPYTHON_OPTS="notebook --pylab inline" ./bin/pyspark

IPython also works on a cluster or on multiple cores if you set the MASTER environment variable.

Standalone Programs

PySpark can also be used from standalone Python scripts by creating a SparkContext in your script and running the script using bin/pyspark. The Quick Start guide includes a complete example of a standalone Python application.

Code dependencies can be deployed by listing them in the pyFiles option in the SparkContext constructor:

from pyspark import SparkContext
sc = SparkContext("local""App Name", pyFiles=[''MyFile.py''''lib.zip''''app.egg''])

Files listed here will be added to the PYTHONPATH and shipped to remote worker machines. Code dependencies can be added to an existing SparkContext using its addPyFile() method.

You can set configuration properties by passing aSparkConf object to SparkContext:

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
         .setMaster("local")
         .setAppName("My app")
         .set("spark.executor.memory""1g"))
sc = SparkContext(conf = conf)

API Docs

API documentation for PySpark is available as Epydoc. Many of the methods also contain doctests that provide additional usage examples.

Libraries

MLlib is also available in PySpark. To use it, you’ll needNumPy version 1.7 or newer, and Python 2.7. The MLlib guide contains some example applications.

Where to Go from Here

PySpark also includes several sample programs in the python/examples folder. You can run them by passing the files to pyspark; e.g.:

./bin/pyspark python/examples/wordcount.py

Each program prints usage help when run without arguments.


PySpark Dataframe 将列转换为行

PySpark Dataframe 将列转换为行

如何解决PySpark Dataframe 将列转换为行?

我有以下数据框

REC_DATA = spark.createDataFrame(
    [
      (''exercise'',''fiber'',''rice'',''male''),(''exercise'',''female''),''water'',''exercise'',],StructType(
        [
            StructField("1_rec",StringType(),False),StructField("2_rec",StructField("3_rec",StructField("sex",True),]
    )
)
1_rec 2_rec 3_rec 性别
练习 光纤 米饭 男性
练习 米饭 光纤 女性
练习 光纤 男性
米饭 练习 女性

我试图将这些行分组到一个新列中,将列 1_rec、2_rec、3_rec 转换为行,并添加一个带有数量的新列,输出应该是这样的:

位置 名称 计数
1_rec 练习 3
1_rec 1
2_rec 1
2_rec 米饭 2
2_rec 光纤 1
3_rec 米饭 1
3_rec 光纤 2
3_rec 练习 1

我曾尝试做一个交叉表,但它没有正常工作。

解决方法

使用 stack 对列进行分类,然后按 positionname

将它们分组
import pyspark.sql.functions as F

REC_DATA = (REC_DATA
            .selectExpr("stack(3,''1_rec'',1_rec,''2_rec'',2_rec,''3_rec'',3_rec) (position,name)")
            .groupBy(''position'',''name'')
            .agg(F.count("*").alias(''count'')))
REC_DATA.show()

+--------+--------+-----+
|position|    name|count|
+--------+--------+-----+
|   1_rec|   water|    1|
|   2_rec|    rice|    2|
|   3_rec|exercise|    1|
|   3_rec|   fiber|    2|
|   2_rec|   water|    1|
|   3_rec|    rice|    1|
|   1_rec|exercise|    3|
|   2_rec|   fiber|    1|
+--------+--------+-----+

Pyspark将列的类型从日期更改为字符串

Pyspark将列的类型从日期更改为字符串

我有以下数据框:

corr_temp_df[(''vacationdate'', ''date''), (''valueE'', ''string''), (''valueD'', ''string''), (''valueC'', ''string''), (''valueB'', ''string''), (''valueA'', ''string'')]

现在,我想将Vacationdate列的数据类型更改为String,以便数据框也采用这种新类型并覆盖所有条目的数据类型数据。例如写后:

corr_temp_df.dtypes

Vacationdate的数据类型应被覆盖。

我已经使用过诸如cast,StringType或astype之类的函数,但是我没有成功。你知道怎么做吗?

答案1

小编典典

让我们创建一些虚拟数据:

import datetimefrom pyspark.sql import Rowfrom pyspark.sql.functions import colrow = Row("vacationdate")df = sc.parallelize([    row(datetime.date(2015, 10, 07)),    row(datetime.date(1971, 01, 01))]).toDF()

如果Spark> = 1.5.0,则可以使用以下date_format功能:

from pyspark.sql.functions import date_format(df   .select(date_format(col("vacationdate"), "dd-MM-YYYY")   .alias("date_string"))   .show())

在Spark <1.5.0中,可以使用Hive UDF完成:

df.registerTempTable("df")sqlContext.sql(    "SELECT date_format(vacationdate, ''dd-MM-YYYY'') AS date_string FROM df")

当然,在Spark> = 1.5.0中仍然可用。

如果不使用HiveContext,则可以date_format使用UDF进行模拟:

from pyspark.sql.functions import udf, litmy_date_format = udf(lambda d, fmt: d.strftime(fmt))df.select(    my_date_format(col("vacationdate"), lit("%d-%m-%Y")).alias("date_string")).show()

请注意,它使用的是C标准格式,而不是Java简单的日期格式

关于Python-使用Spark将列转置为行python将一列数据转换成向量的问题我们已经讲解完毕,感谢您的阅读,如果还想了解更多关于Javascript将Json数据从列转置为行、PySpark - 使用 Python 在 Spark 上编程、PySpark Dataframe 将列转换为行、Pyspark将列的类型从日期更改为字符串等相关内容,可以在本站寻找。

本文标签: