GVKun编程网logo

适用于Python和Scala的自定义Spark软件包(spark 自定义rdd)

25

本文将为您提供关于适用于Python和Scala的自定义Spark软件包的详细介绍,我们还将为您解释spark自定义rdd的相关知识,同时,我们还将为您提供关于Cloudera的CCA175对pyth

本文将为您提供关于适用于Python和Scala的自定义Spark软件包的详细介绍,我们还将为您解释spark 自定义rdd的相关知识,同时,我们还将为您提供关于Cloudera的CCA175 对python和scala的要求、GQL仅适用于Python项目而不适用于Java?、java,scala,python,spark,hadoop,local模式测试代码,idea,jdk1.8,scla2.11,python2.7,spark-2.4.3-bin-hadoop2.7、Jupyter Notebook Python, Scala, R, Spark, Mesos的实用信息。

本文目录一览:

适用于Python和Scala的自定义Spark软件包(spark 自定义rdd)

适用于Python和Scala的自定义Spark软件包(spark 自定义rdd)

如何解决适用于Python和Scala的自定义Spark软件包?

我可以为Python和Scala(wheel和Jar)成功地独立创建软件包,然后将其上传到Databricks。但是,我想创建一个可同时用于Python和Scala的程序包。我注意到Databricks中的某些Maven软件包可以立即执行此操作,这怎么可能?

我假设软件包具有标准的结构,并且将Python和Scala的代码风格分开了?

我尝试过在线查找示例,但并不高兴

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

Cloudera的CCA175 对python和scala的要求

Cloudera的CCA175 对python和scala的要求

准备考一个cca175,目前只是做java的,不知道考试对python和scala的掌握程度是什么样?

GQL仅适用于Python项目而不适用于Java?

GQL仅适用于Python项目而不适用于Java?

在Google App Engine中,GQL(类似sql的数据存储查询机制)仅适用于Python项目,而不适用于Java项目.为什么会这样?

还有它在那里克服这个并在Java项目中使用GQL也?

最佳答案
GQL不在Java实现中,作为替代,您将使用Query对象来构建数据存储区查询.

如果您正在寻找Java的GQL实现,gql4j运行良好.我在一个小数据集上测试它以确保它可以工作,但我还没有在项目中使用它.

http://code.google.com/p/gql4j/

java,scala,python,spark,hadoop,local模式测试代码,idea,jdk1.8,scla2.11,python2.7,spark-2.4.3-bin-hadoop2.7

java,scala,python,spark,hadoop,local模式测试代码,idea,jdk1.8,scla2.11,python2.7,spark-2.4.3-bin-hadoop2.7

/*scala test*/
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object WordCount {
  def main(args: Array[String]): Unit = {
    println("start...")
    /**
      * 第一步:创建Spark的配置对象SparkConf,设置Spark程序运行时的配置信息,
      * 例如说通过设置setMaster来设置程序要链接的Spark集群的Master的URL,
      * 如果设置为local,则代表Spark程序在本地运行。
      */
    val conf = new SparkConf //创建SparkConf对象
    conf.setAppName("wordCount") //设置应用程序的名称,在程序运行的监控界面可以看到名称
    conf.setMaster("local") //此时,程序在本地运行,不需要安装Spark集群

    /**
      * 第二步:创建SparkContext对象
      * SparkContext是Spark程序所有功能的唯一入口,无论是采用scala、java、Python,R等都
      * 必须有一个SparkContext。SparkContext核心作用:初始化Spark应用程序运行所需要的核心组件,包括
      * DAGScheduler,TaskScheduler、SchedulerBackend同时还会负责Spark程序往Master注册程序等。
      * SparkContext是这个Spark程序中最为至关重要的一个对象。
      */
    val sc = new SparkContext(conf)

    /**
      * 第三步:根据具体的数据源(HDFS、HBase、Local FS、DB、S3等)通过SparkContext创建RDD。
      * RDD的创建方式有三种:根据外部的数据源(HDFS)、根据Scala集合、其他的RDD操作。数据会被RDD划分成一系列的
      * Partitions,分配到每个Partition的数据属于一个Task的处理范畴
      */
    val lines = sc.textFile("D://data//2.txt", 1)//

    /**
      * 第四步:对初始化的RDD进行Transformation级别处理,例如map、filter等高阶函数等的编程,来进行具体的数据计算。
      */
    /**
      * 4.1、对每一行的字符串拆分成单个的单词
      */
    val words = lines.flatMap { line => line.split(" ") } //对每一行的字符串进行单词拆分并把所有行的拆分结果通过flat合并成为一

    /**
      * 4.2、在单词拆分的基础上对每个单词实例计数为1,也就是word => (word,1)
      */
    val pairs = words.map { word => (word, 1) }

    /**
      * 4.3、在每个单词实例计数为1基础之上统计每个单词在文件中出现的总次数
      */
    val wordCounts = pairs.reduceByKey(_+_) //对相同的key,进行value的累计

    wordCounts.foreach(map => println(map._1 +":"+ map._2))

    sc.stop()

    println("end...")
  }
}
/*python test*/
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf = conf)

lines = sc.parallelize(["pandas", "cat", "i like pandas"])
word = lines.filter(lambda s: "pandas" in s)

print(word.count())

 

/*java test*/
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/**
 * 用java语言开发spark程序
 * 第一个学习程序 wordcount
 * @author 18521
 *
 */
public class wordCountLocal {

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        // 1 创建一个sparkconf 对象并配置
        // 使用setMaster 可以设置spark集群可以链接集群的URL,如果设置local 代表在本地运行而不是在集群运行
        SparkConf conf = new SparkConf()
                .setAppName("wordCountLocal")
                .setMaster("local");

        // 2 创建javasparkContext对象
        // sparkcontext 是一个入口,主要作用就是初始化spark应用程序所需的一些核心组件,例如调度器,task,
        // 还会注册spark,sparkMaster结点上注册。反正就是spake应用中最重要的对象
        JavaSparkContext sc = new JavaSparkContext(conf);
        // 3 对输入源创建一个出事RDD
        // 元素就是输入源文件中的一行
        JavaRDD<String> lines = sc.textFile("D://data/2.txt");
        // 4 把输入源拆分成一个一个的单词
        // 引用一个RDD 都会创建一个function 类(比较简单的话就是一个匿名内部类)
        // FlatMapFunction 有连个参数输入和输出
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;


            public Iterable<String> call(String arg0) throws Exception {
                // TODO Auto-generated method stub
                return Arrays.asList(arg0.split(" "));
            }
        });
        // 5 需要将每一个单词映射为(单词,1) 后面才可以更具单词key 对后面value 1 进行累加从而达到计数的功能
        JavaPairRDD<String, Integer> parirs = words.mapToPair(new PairFunction<String, String, Integer>() {

            /**
             * 每一个单词都映射成(单词,1)
             */
            private static final long serialVersionUID = 1L;


            public Tuple2<String, Integer> call(String arg0) throws Exception {
                // TODO Auto-generated method stub
                return new Tuple2<String, Integer>(arg0, 1);
            }
        });
        // 6 以单词做为key 统计单词出现的次数,用reducebykey 算子,对每一个key对于的value进行操作
        JavaPairRDD<String, Integer> wordcount = parirs.reduceByKey(new Function2<Integer, Integer, Integer>() {


            public Integer call(Integer arg0, Integer arg1) throws Exception {
                // TODO Auto-generated method stub
                return arg0 + arg1;
            }
        });

        // 7 已经通过spark 的几个算子 flatMap,mapToPair,reduceByKey 已经统计出每一个结点中的单词出现的次数
        // 这中操作叫做transformation,但是在一开始的RDD是把文件拆分打散到不同的结点中的,所以后面还需要操作action 进行集合
        // 9 action 操作通过foreach 来遍历所有最后一个RDD生成的元素
        wordcount.foreach(new VoidFunction<Tuple2<String, Integer>>() {

            @Override
            public void call(Tuple2<String, Integer> arg0) throws Exception {
                // TODO Auto-generated method stub
                System.out.println(arg0._1 + " 出现了:" + arg0._2 + "次");
            }
        });
        sc.close();
    }
}

Jupyter Notebook Python, Scala, R, Spark, Mesos

Jupyter Notebook Python, Scala, R, Spark, Mesos

在Docker中运行Jupyter/Spark/Mesos服务。

来源[英]:https://github.com/jupyter/docker-stacks/tree/master/all-spark-notebook

Spark on Docker,基于Jupyter Notebook Python, Scala, R, Spark, Mesos技术栈,提供一个远程操作的模型和任务编写Web界面,采用Python界著名的Ipython Notebook格式,非常简洁、友好。

集成的软件

  • Jupyter Notebook 4.2.x
  • Conda Python 3.x 和 Python 2.7.x 环境
  • Conda R 3.2.x 环境
  • Scala 2.10.x
  • pyspark, pandas, matplotlib, scipy, seaborn, scikit-learn 预先安装在Python环境
  • ggplot2, rcurl 原装在 R 环境
  • Spark 1.6.0,运行在local模式,或者连接到 Spark workers的集群
  • Mesos client 0.22 binary that can communicate with a Mesos master
  • 非私有用户名 jovyan (uid=1000, 可配置, 参见相应的选项) 在组 users (gid=100) ,拥有控制权在目录 /home/jovyan/opt/conda
  • tini 作为容器入口点, start-notebook.sh 作为启动命令
  • 脚本 start-singleuser.sh 作为可选命令,运行Notebook server的single-user实例  , 是 JupyterHub 要求的
  • Options for HTTPS, password auth, and passwordless sudo

使用方法

使用下面的命令启动一个容器,Web服务在端口 8888,为配置授权(仅限私网内使用,不要配置在互联网和其他公共网上)。

docker run -d -p 8888:8888 jupyter/all-spark-notebook

一般情况下,需要访问宿主机中的数据资源,使用-v host-path:docker-path方式映射。

启动后在浏览器输入: http://127.0.0.1:8888即可访问。

启动Spark:Local Mode

使用Spark在小型的本地数据环境下的配置。

在Python Notebook:

  1. 运行一个容器,像上面那样。
  2. 打开一个Python 2 或 3 notebook。
  3. 创建 SparkContext在 local 模式。

例如,在notebook的第一个cell中,如下:

import pyspark
sc = pyspark.SparkContext(''local[*]'')

# do something to prove it works
rdd = sc.parallelize(range(1000))
rdd.takeSample(False, 5)

在 R Notebook:

  1. 运行一个容器,像上面那样。
  2. 打开一个 R notebook。
  3. 初始化 sparkR,在local模式。
  4. 初始化 sparkRSQL

例如,在 R notebook的第一个cell中,如下:

library(SparkR)

sc <- sparkR.init("local[*]")
sqlContext <- sparkRSQL.init(sc)

# do something to prove it works
data(iris)
df <- createDataFrame(sqlContext, iris)
head(filter(df, df$Petal_Width > 0.2))

在Apache Toree (Scala) Notebook:

  1. 运行一个容器,像上面那样。
  2. 打开一个 Apache Toree (Scala) notebook。
  3. 使用预先配置的SparkContext,引用变量 sc

例如:

val rdd = sc.parallelize(0 to 999)
rdd.takeSample(false, 5)

连接到Mesos上的 Spark 集群

这里的配置允许你的计算集群和数据一起伸缩。

  1. 部署 Spark 到 Mesos。
  2. 配置每一个工作节点  the --no-switch_user flag 或者创建jovyan用户在每一个 slave节点上。
  3. 运行Docker容器,带参数 --net=host 在所有的Spark Workers都能访问的网络位置(查看 Spark networking requirement.)
    • 注意: When using --net=host, you must also use the flags --pid=host -e TINI_SUBREAPER=true. See https://github.com/jupyter/docker-stacks/issues/64 for details.
  4. Follow the language specific instructions below.

In a Python Notebook

  1. 打开 Python 2 或 3 notebook.
  2. 创建 SparkConf 实例,指向 Mesos master node (or Zookeeper instance) 和 Spark 二进制包的位置。
  3. 创建 SparkContext 采用上面的配置变量。

示例, Python 3 notebook的第一个Cell像下面这样:

import os
# make sure pyspark tells workers to use python3 not 2 if both are installed
os.environ[''PYSPARK_PYTHON''] = ''/usr/bin/python3''

import pyspark
conf = pyspark.SparkConf()

# point to mesos master or zookeeper entry (e.g., zk://10.10.10.10:2181/mesos)
conf.setMaster("mesos://10.10.10.10:5050")
# point to spark binary package in HDFS or on local filesystem on all slave
# nodes (e.g., file:///opt/spark/spark-1.6.0-bin-hadoop2.6.tgz)
conf.set("spark.executor.uri", "hdfs://10.10.10.10/spark/spark-1.6.0-bin-hadoop2.6.tgz")
# set other options as desired
conf.set("spark.executor.memory", "8g")
conf.set("spark.core.connection.ack.wait.timeout", "1200")

# create the context
sc = pyspark.SparkContext(conf=conf)

# do something to prove it works
rdd = sc.parallelize(range(100000000))
rdd.sumApprox(3)

如果使用在notebook和workers中使用Python 2, 修改环境变量PYSPARK_PYTHON 指向Python 2.x 解释器二进制包的位置。如果不设置, 缺省值为 python

当然, 所有的可以被隐藏在 IPython kernel startup script, 但是 "explicit is better than implicit." :)

在 R Notebook

  1. 如上的方法运行一个容器实例。
  2. 打开一个 R notebook。
  3. 初始化 sparkR ,指向Mesos master node (or Zookeeper instance) , Spark 二进制包位置。
  4. 初始化 sparkRSQL.

示例, 在 R notebook的第一个Cell:

library(SparkR)

# point to mesos master or zookeeper entry (e.g., zk://10.10.10.10:2181/mesos)\
# as the first argument
# point to spark binary package in HDFS or on local filesystem on all slave
# nodes (e.g., file:///opt/spark/spark-1.6.0-bin-hadoop2.6.tgz) in sparkEnvir
# set other options in sparkEnvir
sc <- sparkR.init("mesos://10.10.10.10:5050", sparkEnvir=list(
    spark.executor.uri="hdfs://10.10.10.10/spark/spark-1.6.0-bin-hadoop2.6.tgz",
    spark.executor.memory="8g"
    )
)
sqlContext <- sparkRSQL.init(sc)

# do something to prove it works
data(iris)
df <- createDataFrame(sqlContext, iris)
head(filter(df, df$Petal_Width > 0.2))

在Apache Toree (Scala) Notebook

  1. 打开一个终端,通过 New -> Terminal 在notebook 界面上。
  2. 添加关于集群的信息到 SPARK_OPTS 环境变量,当运行容器时.
  3. 打开一个Apache Toree (Scala) notebook。
  4. 使用预先配置的SparkContext,在变量名 sc中。

Apache Toree 内核自动创建了SparkContext,在启动时按照命令行参数和环境变量创建。 您可以传递关于你的 Mesos cluster的信息,当启动容器时通过 SPARK_OPTS 环境变量来实现。

例如, 传递的信息:Mesos master, Spark binary location in HDFS, and an executor options, 像下面这样启动容器:

docker run -d -p 8888:8888 -e SPARK_OPTS ''--master=mesos://10.10.10.10:5050 \ --spark.executor.uri=hdfs://10.10.10.10/spark/spark-1.6.0-bin-hadoop2.6.tgz \ --spark.executor.memory=8g'' jupyter/all-spark-notebook

注意,这跟上面在Python notebook的信息时一样的。 一旦内核得到集群的信息, 你可以在Apache Toree notebook测试集群,像下面这样:

// should print the value of --master in the kernel spec
println(sc.master)

// do something to prove it works
val rdd = sc.parallelize(0 to 99999999)
rdd.sum()

Standalone Mode连接到Spark Cluster

通过Standalone Mode连接到 Spark Cluster要求的设置如下:

  1. 确认docker image (检查 Dockerfile) 和Spark Cluster被部署、运行的是Spark的同一个版本。
  2. Deploy Spark on Standalone Mode.
  3. 运行Docker container 带参数 --net=host 在Spark workers都能访问到的网络位置. (查看 Spark networking requirement.)
    • 注意: 当使用 --net=host, 必须同时使用 --pid=host -e TINI_SUBREAPER=true. 查看详情: https://github.com/jupyter/docker-stacks/issues/64 。
  4. 特殊语言的指令与上面Mesos里提到的完全一样, 只是这里的master url 变成类似于这样: spark://10.10.10.10:7077

Notebook 选项

你可以传入 Jupyter command line options ,通过 start-notebook.sh command,在容器启动时设置参数。例如,设置notebook server 基础URL,想下面这样:

docker run -d -p 8888:8888 jupyter/all-spark-notebook start-notebook.sh --NotebookApp.base_url=/some/path

你可以绕开 start-notebook.sh脚本,直接在命令中指定。如果这样, 下面提到的NB_UIDGRANT_SUDO 特征将不能工作。具体细节查看Docker Options一节。

Docker Options

你可以定制Docker容器和Notebook Server的执行,通过制定下面的参数:

  • -e PASSWORD="YOURPASS" - 配置 Jupyter Notebook 要求 password,在非信任的网络上可以组合 USE_HTTPS 使用加密的连接。
  • -e USE_HTTPS=yes - 配置 Jupyter Notebook接受加密连接。如果 pem 文件(包含 SSL certificate 和 key)未被提供(参见下面), 容器将创建一个self-signed certificate。
  • -e NB_UID=1000 - 指定jovyan user的uid。 对于装载宿主机卷标并制定文件属有权是有用。为了该选项发挥作用, 必须运行容器时带上 --user root. ( 脚本start-notebook.sh 将在调整user id后运行 su jovyan。)
  • -e GRANT_SUDO=yes - 给予jovyan 用户帐号无密码执行 sudo 的权限。在安装操作系统软件包是有用。为了该选项发挥作用, 运行容器时必须使用--user root。(脚本start-notebook.sh 将在添加jovyan 到 sudoers 后运行su jovyan 。) 你应该只在信任该用户或者容器运行在隔离的宿主环境下时才打开这个sudo选项。
  • -v /some/host/folder/for/work:/home/jovyan/work - 宿主机加载缺省工作目录到宿主机,从而当容器终止或重建时能够保存工作的结果在宿主机中。
  • -v /some/host/folder/for/server.pem:/home/jovyan/.local/share/jupyter/notebook.pem - 加载SSL certificate plus key为 USE_HTTPS所用。当有一个域的证书并且Notebook Server运行在下面时有用。
  • -p 4040:4040 - 打开端口用于Spark的运行状态监视,参见 Spark Monitoring and Instrumentation UI. 注意,每一个新的spark context创建时赋予一个增量的端口号 (ie. 4040, 4041, 4042, etc.), 并且可能需要打开多个端口。docker run -d -p 8888:8888 -p 4040:4040 -p 4041:4041 jupyter/all-spark-notebook

SSL 证书

在这个Docker镜像中notebook server的配置需要一个 notebook.pem 文件,该文件包含base64编码的SSL key和SSL 证书。 该文件还包含其他的证书 (e.g., intermediate 和 root certificates)。

如果你的 key 和 certificate(s) 作为独立的文件, 你需要将它们合并成一个 PEM 文件。 作为可选的方式, 你可以创建自己的配置和 Docker镜像,可以使用分开的 key 和 certificate 文件。

更多的使用SSL的信息, 参见下面:

  • 例子 docker-stacks/examples 可以得到信息,关于在公共域使用本文的技术栈时如何使用加密证书 Let''s Encrypt 。
  • 文件 jupyter_notebook_config.py 包含了本Docker镜像如何创建self-signed certificate的信息。
  • 文档 Jupyter Notebook documentation 包含了一些最佳实践,包括如何运行一个公共 notebook server,大部分内容已经包含在了本镜像中。

Conda 环境设置

缺省的Python 3.x Conda 运行环境 安装在 /opt/conda目录下。第二个Python 2.x Conda 环境安装在 /opt/conda/envs/python2目录下。你可以切换到 python2 环境 ,在shell里面键入命令(这是通用的conda环境切换方法,使用conda create可以创建更多的环境):

source activate python2

你可以回到缺省的环境,在shell里键入下面的命令:

source deactivate

命令 jupyter, ipython, python, pip, easy_install, 和 conda (以及其它) 在两个环境下都是可用的。通常,你可以安装软件到两个环境中,无论哪一个环境是激活的,像下面这样(注意:conda install使用了-n参数指定环境的名称):

# install a package into the python2 environment
pip2 install some-package
conda install -n python2 some-package

# install a package into the default (python 3.x) environment
pip3 install some-package
conda install -n python3 some-package

JupyterHub

JupyterHub 要求每一个用户有一个Jupyter Notebook server的single-user实例。为了使用 JupyterHub 和 DockerSpawner,在本技术栈中,你需要指定容器镜像名称和覆盖缺省的容器run命令,在 jupyterhub_config.py 文件中指定:

# Spawn user containers from this image
c.DockerSpawner.container_image = ''jupyter/all-spark-notebook''

# Have the Spawner override the Docker run command
c.DockerSpawner.extra_create_kwargs.update({
    ''command'': ''/usr/local/bin/start-singleuser.sh''
})

关于适用于Python和Scala的自定义Spark软件包spark 自定义rdd的问题就给大家分享到这里,感谢你花时间阅读本站内容,更多关于Cloudera的CCA175 对python和scala的要求、GQL仅适用于Python项目而不适用于Java?、java,scala,python,spark,hadoop,local模式测试代码,idea,jdk1.8,scla2.11,python2.7,spark-2.4.3-bin-hadoop2.7、Jupyter Notebook Python, Scala, R, Spark, Mesos等相关知识的信息别忘了在本站进行查找喔。

本文标签: