
Exception in thread "main" org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243)



Many readers have recently been asking about Exception in thread "main" org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). This article answers that question in detail, and along the way also covers: 3 pyspark learning --- SparkContext overview; Exception in thread "main" org.apache.spark.SparkException: Could not parse Master URL: 'yarn'; example source code for org.apache.spark.api.java.JavaSparkContext; and org.apache.spark.SparkException: A master URL must be set in your configuration. Let's get started!

Contents:

Exception in thread "main" org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243)
3 pyspark learning --- SparkContext overview
Exception in thread "main" org.apache.spark.SparkException: Could not parse Master URL: 'yarn'
Example source code for org.apache.spark.api.java.JavaSparkContext
org.apache.spark.SparkException: A master URL must be set in your configuration

Exception in thread "main" org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243)

I get an error when trying to run a Spark application together with Cassandra:

Exception in thread "main" org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243).

I am using Spark version 1.2.0, and clearly I am only using a single Spark context in my application. But whenever I try to add the following code for streaming, I get this error.

JavaStreamingContext activitySummaryScheduler = new JavaStreamingContext(sparkConf, new Duration(1000));

Answer 1


You can only have one SparkContext at a time, and since a StreamingContext already contains a SparkContext, you cannot have a separate streaming context and Spark context in the same code. What you can do is build the StreamingContext on top of your SparkContext; that way you have access to both, should you really need it.

Use this constructor: JavaStreamingContext(sparkContext: JavaSparkContext, batchDuration: Duration)
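Putting the answer together, here is a minimal sketch of the suggested fix (the class name, app name, master and batch interval are illustrative, not from the original question):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class SingleContextApp {

    public static void main(String[] args) throws InterruptedException {
        SparkConf sparkConf = new SparkConf()
                .setAppName("SingleContextApp")
                .setMaster("local[2]"); // local streaming needs at least 2 threads

        // The one and only SparkContext in this JVM
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        // Build the StreamingContext on top of the existing SparkContext,
        // instead of new JavaStreamingContext(sparkConf, ...), which would
        // try to create a second SparkContext
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000));

        // ... define DStreams on ssc (and batch jobs on sc) here ...

        ssc.start();            // start the streaming computation
        ssc.awaitTermination(); // block until it is stopped
    }
}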

3 pyspark learning --- SparkContext overview

1 Tutorial

Spark itself is written in Scala; pyspark was created to support using Spark from Python. Through Py4J it can still perform operations on RDDs and the like.

2 SparkContext

(1) SparkContext is the entry point of a Spark application. When we run Spark, the driver starts up and at the same time the context begins to initialize.

(2) SparkContext uses Py4J to call into the JVM and create a JavaSparkContext. In the shell one is created for you by default under the name sc, so you can call methods on sc directly. If you create a second context, you will get the error "cannot run multiple SparkContexts at once".

(3) So what goes into a SparkContext, i.e. which parameters does the class take when the context is initialized?

class pyspark.SparkContext (
   master = None,                    # URL of the cluster to connect to
   appName = None,                   # name of your application
   sparkHome = None,                 # Spark installation directory
   pyFiles = None,                   # .zip or .py files to send to the cluster
   environment = None,               # environment variables for worker nodes
   batchSize = 0,
   serializer = PickleSerializer(),  # RDD serializer
   conf = None,
   gateway = None,                   # use an existing gateway and JVM, or initialize a new one
   jsc = None,                       # a JavaSparkContext instance
   profiler_cls = <class 'pyspark.profiler.BasicProfiler'>
)

Let's try an example (run in PyCharm):

# coding:utf-8
from pyspark import SparkContext, SparkConf

logFile = "./files/test.txt"
sc = SparkContext()
logData = sc.textFile(logFile).cache()
numA = logData.filter(lambda s: 'a' in s).count()
numB = logData.filter(lambda s: 'b' in s).count()
print("Lines with a: %i, lines with b: %i" % (numA, numB))

Keep at it!


Exception in thread "main" org.apache.spark.SparkException: Could not parse Master URL: 'yarn'


https://stackoverflow.com/questions/41054700/could-not-parse-master-url

The fix is to add the missing jar:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-yarn_2.12</artifactId>
    <version>${spark.version}</version>
</dependency>
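For context: a master URL of "yarn" is only understood when the spark-yarn module is on the classpath, so a program like the following sketch fails with exactly this exception when the dependency above is missing (the class and app name are illustrative; an actual YARN run additionally needs HADOOP_CONF_DIR or YARN_CONF_DIR pointing at the cluster configuration):

import org.apache.spark.sql.SparkSession;

public class YarnMasterExample {

    public static void main(String[] args) {
        // Without spark-yarn on the classpath this throws
        // org.apache.spark.SparkException: Could not parse Master URL: 'yarn'
        SparkSession spark = SparkSession.builder()
                .appName("YarnMasterExample")
                .master("yarn")
                .getOrCreate();

        spark.range(10).count();
        spark.stop();
    }
}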

Example source code for org.apache.spark.api.java.JavaSparkContext

Project: big-data-benchmark    File: SparkWordCount.java
public static void main(String[] args) {
    if (args.length != 2) {
        System.err.println("Usage:");
        System.err.println("  SparkWordCount <sourceFile> <targetFile>");
        System.exit(1);
    }

    SparkConf conf = new SparkConf()
            .setAppName("Word Count");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> textFile = sc.textFile(args[0]);
    JavaRDD<String> words = textFile.flatMap(LineIterator::new);
    JavaPairRDD<String,Long> pairs =
            words.mapToPair(s -> new Tuple2<>(s,1L));
    JavaPairRDD<String,Long> counts =
            pairs.reduceByKey((Function2<Long,Long,Long>) (a,b) -> a + b);

    System.out.println("Starting task..");
    long t = System.currentTimeMillis();
    counts.saveAsTextFile(args[1] + "_" + t);
    System.out.println("Time=" + (System.currentTimeMillis() - t));
}
Project: Sempala    File: Spark.java
/**
 * Initializes a Spark connection. Use it afterwards for execution of Spark
 * sql queries.
 * 
 * @param appName
 *            the name of the app that will be used with this Spark
 *            connection
 * @param database
 *            name of the database that will be used with this Spark
 *            connection
 */
public Spark(String appName, String database) {

    // TODO check what will happen if there is already the same app name in use
    this.sparkConfiguration = new SparkConf().setAppName(appName);
    this.javaContext = new JavaSparkContext(sparkConfiguration);
    this.hiveContext = new HiveContext(javaContext);
    // TODO check what kind of exception can be thrown here if there is a
    // problem with the spark connection

    this.hiveContext.sql(String.format("CREATE DATABASE %s", database));
    // TODO check what kind of exception is thrown if the database already exists

    // use the created database
    this.hiveContext.sql(String.format("USE %s", database));
}
Project: oryx2    File: KMeansUpdate.java
/**
 * @param sparkContext    active Spark Context
 * @param trainData       training data on which to build a model
 * @param hyperParameters ordered list of hyper parameter values to use in building model
 * @param candidatePath   directory where additional model files can be written
 * @return a {@link PMML} representation of a model trained on the given data
 */
@Override
public PMML buildModel(JavaSparkContext sparkContext,JavaRDD<String> trainData,List<?> hyperParameters,Path candidatePath) {
  int numClusters = (Integer) hyperParameters.get(0);
  Preconditions.checkArgument(numClusters > 1);
  log.info("Building KMeans Model with {} clusters",numClusters);

  JavaRDD<Vector> trainingData = parsedToVectorRDD(trainData.map(MLFunctions.PARSE_FN));
  KMeansModel kMeansModel = KMeans.train(trainingData.rdd(),numClusters,maxIterations,numberOfRuns,initializationStrategy);

  return kMeansModelToPMML(kMeansModel,fetchClusterCountsFromModel(trainingData,kMeansModel));
}
Project: BLASpark    File: OtherOperations.java
public static DistributedMatrix GetLU(DistributedMatrix A, JavaSparkContext jsc) {

        DistributedMatrix returnedMatrix;

        if( A.getClass() == IndexedRowMatrix.class) {
            returnedMatrix = OtherOperations.GetLU_IRW((IndexedRowMatrix) A);
        }
        else if (A.getClass() == CoordinateMatrix.class) {
            returnedMatrix = OtherOperations.GetLU_COORD((CoordinateMatrix) A);
        }
        else if (A.getClass() == BlockMatrix.class){
            // TODO: Implement this operation
            //returnedMatrices = OtherOperations.GetLU_BCK((BlockMatrix) A, diagonalInL, diagonalInU, jsc);
            returnedMatrix = null;
        }
        else {
            returnedMatrix = null;
        }

        return returnedMatrix;
    }
Project: BLASpark    File: OtherOperations.java
public static DistributedMatrix GetD(DistributedMatrix A, boolean inverseValues, JavaSparkContext jsc) {

        DistributedMatrix returnedMatrix;

        if( A.getClass() == IndexedRowMatrix.class) {
            returnedMatrix = OtherOperations.GetD_IRW((IndexedRowMatrix) A, inverseValues, jsc);
        }
        else if (A.getClass() == CoordinateMatrix.class) {
            returnedMatrix = OtherOperations.GetD_COORD((CoordinateMatrix) A, inverseValues, jsc);
        }
        else if (A.getClass() == BlockMatrix.class){
            // TODO: Implement this operation
            //returnedMatrices = OtherOperations.GetLU_BCK((BlockMatrix) A, diagonalInL, diagonalInU, jsc);
            returnedMatrix = null;
        }
        else {
            returnedMatrix = null;
        }

        return returnedMatrix;
    }
Project: ViraPipe    File: InterleaveMulti.java
private static void splitFastq(FileStatus fst,String fqPath,String splitDir,int splitlen,JavaSparkContext sc) throws IOException {
  Path fqpath = new Path(fqPath);
  String fqname = fqpath.getName();
  String[] ns = fqname.split("\\.");
  // TODO: Handle also compressed files
  List<FileSplit> nlif = NLineInputFormat.getSplitsForFile(fst,sc.hadoopConfiguration(),splitlen);

  JavaRDD<FileSplit> splitRDD = sc.parallelize(nlif);

  splitRDD.foreach( split ->  {

    FastqRecordReader fqreader = new FastqRecordReader(new Configuration(),split);
    writeFastqFile(fqreader,new Configuration(),splitDir + "/split_" + split.getStart() + "." + ns[1]);

   });
}
Project: ViraPipe    File: Decompress.java
public static void interleaveSplitFastq(FileStatus fst, FileStatus fst2, String splitDir, int splitlen, JavaSparkContext sc) throws IOException {

    List<FileSplit> nlif = NLineInputFormat.getSplitsForFile(fst, sc.hadoopConfiguration(), splitlen);
    List<FileSplit> nlif2 = NLineInputFormat.getSplitsForFile(fst2, sc.hadoopConfiguration(), splitlen);

    JavaRDD<FileSplit> splitRDD = sc.parallelize(nlif);
    JavaRDD<FileSplit> splitRDD2 = sc.parallelize(nlif2);
    JavaPairRDD<FileSplit,FileSplit> zips = splitRDD.zip(splitRDD2);

    zips.foreach( splits ->  {
      Path path = splits._1.getPath();
      FastqRecordReader fqreader = new FastqRecordReader(new Configuration(),splits._1);
      FastqRecordReader fqreader2 = new FastqRecordReader(new Configuration(),splits._2);

      writeInterleavedSplits(fqreader,fqreader2,splitDir+"/"+path.getParent().getName()+"_"+splits._1.getStart()+".fq");
    });
  }
Project: Apache-Spark-2x-for-Java-Developers    File: WordCount.java
public static void wordCountJava8( String filename )
{
    // Define a configuration to use to interact with Spark
    SparkConf conf = new SparkConf().setMaster("local").setAppName("Work Count App");

    // Create a Java version of the Spark Context from the configuration
    JavaSparkContext sc = new JavaSparkContext(conf);

    // Load the input data,which is a text file read from the command line
    JavaRDD<String> input = sc.textFile( filename );

    // Java 8 with lambdas: split the input string into words
   // TODO here a change has happened
    JavaRDD<String> words = input.flatMap( s -> Arrays.asList( s.split( " " ) ).iterator() );

    // Java 8 with lambdas: transform the collection of words into pairs (word and 1) and then count them
    JavaPairRDD<Object,Object> counts = words.mapToPair( t -> new Tuple2( t,1 ) ).reduceByKey( (x,y) -> (int)x + (int)y );

    // Save the word count back out to a text file,causing evaluation.
    counts.saveAsTextFile( "output" );
}
Project: Apache-Spark-2x-for-Java-Developers    File: SparkWordCount.java
public static void main(String[] args) throws Exception {
    System.out.println(System.getProperty("hadoop.home.dir"));
    String inputPath = args[0];
    String outputPath = args[1];
    FileUtils.deleteQuietly(new File(outputPath));

    JavaSparkContext sc = new JavaSparkContext("local","sparkwordcount");

    JavaRDD<String> rdd = sc.textFile(inputPath);

    JavaPairRDD<String,Integer> counts = rdd
            .flatMap(x -> Arrays.asList(x.split(" ")).iterator())
            .mapToPair(x -> new Tuple2<String,Integer>((String) x,1))
            .reduceByKey((x,y) -> x + y);

    counts.saveAsTextFile(outputPath);
    sc.close();
}
Project: incubator-sdap-mudrod    File: SessionExtractor.java
/**
 * loadClickStremFromTxt: Load click stream from txt file
 *
 * @param clickthroughFile
 *          txt file
 * @param sc
 *          the spark context
 * @return clickstream list in JavaRDD format {@link ClickStream}
 */
public JavaRDD<ClickStream> loadClickStremFromTxt(String clickthroughFile,JavaSparkContext sc) {
  return sc.textFile(clickthroughFile).flatMap(new FlatMapFunction<String,ClickStream>() {
    /**
     *
     */
    private static final long serialVersionUID = 1L;

    @SuppressWarnings("unchecked")
    @Override
    public Iterator<ClickStream> call(String line) throws Exception {
      List<ClickStream> clickthroughs = (List<ClickStream>) ClickStream.parseFromTextLine(line);
      return (Iterator<ClickStream>) clickthroughs;
    }
  });
}
Project: incubator-sdap-mudrod    File: SparkDriver.java
public SparkDriver(Properties props) {
  SparkConf conf = new SparkConf().setAppName(props.getProperty(MudrodConstants.SPARK_APP_NAME,"MudrodSparkApp")).setIfMissing("spark.master",props.getProperty(MudrodConstants.SPARK_MASTER))
      .set("spark.hadoop.validateOutputSpecs","false").set("spark.files.overwrite","true");

  String esHost = props.getProperty(MudrodConstants.ES_UNICAST_HOSTS);
  String esPort = props.getProperty(MudrodConstants.ES_HTTP_PORT);

  if (!"".equals(esHost)) {
    conf.set("es.nodes",esHost);
  }

  if (!"".equals(esPort)) {
    conf.set("es.port",esPort);
  }

  conf.set("spark.serializer",KryoSerializer.class.getName());
  conf.set("es.batch.size.entries","1500");

  sc = new JavaSparkContext(conf);
  sqlContext = new SQLContext(sc);
}
Project: betleopard    File: LiveBetMain.java
private void init() throws IOException {
    final ClientConfig config = new ClientConfig();
    client = HazelcastClient.newHazelcastClient(config);

    final SparkConf conf = new SparkConf()
            .set("hazelcast.server.addresses","127.0.0.1:5701")
            .set("hazelcast.server.groupName","dev")
            .set("hazelcast.server.groupPass","dev-pass")
            .set("hazelcast.spark.valueBatchingEnabled","true")
            .set("hazelcast.spark.readBatchSize","5000")
            .set("hazelcast.spark.writeBatchSize","5000");

    sc = new JavaSparkContext("local","appname",conf);

    loadHistoricalRaces();
    createRandomUsers();
    createFutureEvent();
}
Project: MinoanER    File: CNPNeighborsUnnormalized.java
/**
 * 
 * @param topKvalueCandidates the topK results per entity,acquired from value similarity
 * @param rawTriples1 the rdf triples of the first entity collection
 * @param rawTriples2 the rdf triples of the second entity collection
 * @param SEPARATOR the delimiter that separates subjects, predicates and objects in the rawTriples1 and rawTriples2 files
 * @param entityIds1 the mapping of entity urls to entity ids,as it was used in blocking
 * @param entityIds2
 * @param MIN_SUPPORT_THRESHOLD the minimum support threshold,below which,relations are discarded from top relations
 * @param K the K for topK candidate matches
 * @param N the N for topN rdf relations (and neighbors)
 * @param jsc the java spark context used to load files and broadcast variables
 * @return topK neighbor candidates per entity
 */
public JavaPairRDD<Integer,IntArrayList> run(JavaPairRDD<Integer,Int2FloatLinkedOpenHashMap> topKvalueCandidates, JavaRDD<String> rawTriples1, JavaRDD<String> rawTriples2, String SEPARATOR, JavaRDD<String> entityIds1, JavaRDD<String> entityIds2, float MIN_SUPPORT_THRESHOLD, int K, int N, JavaSparkContext jsc) {

    Map<Integer,IntArrayList> inNeighbors = new HashMap<>(new RelationsRank().run(rawTriples1, SEPARATOR, entityIds1, MIN_SUPPORT_THRESHOLD, N, true, jsc));
    inNeighbors.putAll(new RelationsRank().run(rawTriples2, SEPARATOR, entityIds2, MIN_SUPPORT_THRESHOLD, N, false, jsc));

    Broadcast<Map<Integer,IntArrayList>> inNeighbors_BV = jsc.broadcast(inNeighbors);

    //JavaPairRDD<Integer,IntArrayList> topKneighborCandidates = getTopKNeighborSims(topKvalueCandidates, inNeighbors_BV, K);
    JavaPairRDD<Integer,IntArrayList> topKneighborCandidates = getTopKNeighborSimsSUM(topKvalueCandidates, inNeighbors_BV, K);
    return topKneighborCandidates;
}
}
Project: MinoanER    File: CNPARCS.java
/**
 *
 * @param topKvalueCandidates the topK results per entity, acquired from value similarity
 */
public JavaPairRDD<Integer,Int2FloatLinkedOpenHashMap> run2(JavaPairRDD<Integer,Int2FloatLinkedOpenHashMap> topKvalueCandidates, JavaRDD<String> rawTriples1, JavaRDD<String> rawTriples2, String SEPARATOR, JavaRDD<String> entityIds1, JavaRDD<String> entityIds2, float MIN_SUPPORT_THRESHOLD, int K, int N, JavaSparkContext jsc) {
    Map<Integer,IntArrayList> inNeighbors = new HashMap<>(new RelationsRank().run(rawTriples1, SEPARATOR, entityIds1, MIN_SUPPORT_THRESHOLD, N, true, jsc));
    inNeighbors.putAll(new RelationsRank().run(rawTriples2, SEPARATOR, entityIds2, MIN_SUPPORT_THRESHOLD, N, false, jsc));

    Broadcast<Map<Integer,IntArrayList>> inNeighbors_BV = jsc.broadcast(inNeighbors);
    JavaPairRDD<Integer,Int2FloatLinkedOpenHashMap> topKneighborCandidates = getTopKNeighborSimsSUMWithScores(topKvalueCandidates, inNeighbors_BV, K);
    return topKneighborCandidates;
}
Project: MinoanER    File: CNPNeighbors.java
/**
 *
 * @param topKvalueCandidates the topK results per entity, acquired from value similarity
 */
public JavaPairRDD<Integer,IntArrayList> run(JavaPairRDD<Integer,Int2FloatLinkedOpenHashMap> topKvalueCandidates, JavaRDD<String> rawTriples1, JavaRDD<String> rawTriples2, String SEPARATOR, JavaRDD<String> entityIds1, JavaRDD<String> entityIds2, float MIN_SUPPORT_THRESHOLD, int K, int N, JavaSparkContext jsc) {
    Map<Integer,IntArrayList> inNeighbors = new HashMap<>(new RelationsRank().run(rawTriples1, SEPARATOR, entityIds1, MIN_SUPPORT_THRESHOLD, N, true, jsc));
    inNeighbors.putAll(new RelationsRank().run(rawTriples2, SEPARATOR, entityIds2, MIN_SUPPORT_THRESHOLD, N, false, jsc));

    Broadcast<Map<Integer,IntArrayList>> inNeighbors_BV = jsc.broadcast(inNeighbors);

    //JavaPairRDD<Tuple2<Integer,Integer>,Float> neighborSims = getNeighborSims(topKvalueCandidates, inNeighbors_BV);
    //JavaPairRDD<Integer,IntArrayList> topKneighborCandidates = getTopKNeighborSimsOld(neighborSims, K);
    JavaPairRDD<Integer,IntArrayList> topKneighborCandidates = getTopKNeighborSimsSUM(topKvalueCandidates, inNeighbors_BV, K);
    return topKneighborCandidates;
}
Project: oryx2    File: ExampleBatchLayerUpdate.java
@Override
public void runUpdate(JavaSparkContext sparkContext, long timestamp, JavaPairRDD<String,String> newData, JavaPairRDD<String,String> pastData, String modelDirString, TopicProducer<String,String> modelUpdateTopic) throws IOException {
  JavaPairRDD<String,String> allData = pastData == null ? newData : newData.union(pastData);
  String modelString;
  try {
    modelString = new ObjectMapper().writeValueAsString(countDistinctOtherWords(allData));
  } catch (JsonProcessingException jpe) {
    throw new IOException(jpe);
  }
  modelUpdateTopic.send("MODEL", modelString);
}
Project: MinoanER    File: BlocksFromEntityIndexTest.java
@Before
public void setUp() {
    System.setProperty("hadoop.home.dir","C:\\Users\\VASILIS\\Documents\\hadoop_home"); //only for local mode

    spark = SparkSession.builder()
        .appName("test")
        .config("spark.sql.warehouse.dir","/file:/tmp")
        .config("spark.executor.instances",1)
        .config("spark.executor.cores",1)
        .config("spark.executor.memory","1G")
        .config("spark.driver.maxResultSize","1g")
        .config("spark.master","local")
        .getOrCreate();

    jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
}
Project: MinoanER    File: BlockFilteringAdvancedTest.java
@Before
public void setUp() {
    System.setProperty("hadoop.home.dir","C:\\Users\\VASILIS\\Documents\\hadoop_home"); //only for local mode

    spark = SparkSession.builder()
        .appName("test")
        .config("spark.sql.warehouse.dir","/file:/tmp")
        .config("spark.executor.instances",1)
        .config("spark.executor.cores",1)
        .config("spark.executor.memory","1G")
        .config("spark.driver.maxResultSize","1g")
        .config("spark.master","local")
        .getOrCreate();

    jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
}
Project: ParquetUtils    File: ParquetRepartTest.java
@BeforeClass
public static void createContext() throws IOException {

    Configuration hdfsConfig = HdfsUtils.getConfiguration();
    SparkConf config = new SparkConf();
    config.setMaster("local[*]");
    config.setAppName("my JUnit running Spark");
    sc = new JavaSparkContext(config);
    fileSystem = FileSystem.get(hdfsConfig);
    sqlContext = new SQLContext(sc);
    engine = new ParquetRepartEngine(fileSystem, sqlContext);
}
Project: oryx2    File: ALSUpdate.java
private static MatrixFactorizationModel pmmlToMFModel(JavaSparkContext sparkContext, PMML pmml, Path modelParentPath, Broadcast<Map<String,Integer>> bUserIDToIndex, Broadcast<Map<String,Integer>> bItemIDToIndex) {
  String xPathString = AppPMMLUtils.getExtensionValue(pmml, "X");
  String yPathString = AppPMMLUtils.getExtensionValue(pmml, "Y");
  JavaPairRDD<String,float[]> userRDD = readFeaturesRDD(sparkContext, new Path(modelParentPath, xPathString));
  JavaPairRDD<String,float[]> productRDD = readFeaturesRDD(sparkContext, new Path(modelParentPath, yPathString));
  int rank = userRDD.first()._2().length;
  return new MatrixFactorizationModel(
      rank, readAndConvertFeatureRDD(userRDD, bUserIDToIndex), readAndConvertFeatureRDD(productRDD, bItemIDToIndex));
}
Project: mutantpdb    File: App.java
public static void main( String[] args )
{
    Dataset<Row> mutations = DataProvider.getMutationsToStructures();
    List<String> pdbIds = mutations.select(col("pdbId"))
            .distinct().toJavaRDD().map(t -> t.getString(0)).collect();

    List<Row> broadcasted = mutations.select("pdbId", "chainId", "pdbAtomPos").collectAsList();
    SaprkUtils.stopSparkSession();

    JavaSparkContext sc = SaprkUtils.getSparkContext();
    Broadcast<List<Row>> bcmut = sc.broadcast(broadcasted);

    MmtfReader//.readSequenceFile("/pdb/2017/full", pdbIds, sc)
            .downloadMmtfFiles(Arrays.asList("5IRC"), sc)
            .flatMapToPair(new StructureToPolymerChains())
            .flatMapToPair(new AddResidueToKey(bcmut))
            .mapValues(new StructureToBioJava())
            .mapToPair(new FilterResidue())
            .filter(t -> t._2!=null).keys()
            .map(t -> t.replace(".", ","))
            .saveAsTextFile("/Users/yana/git/mutantpdb/src/main/resources/pdb_residues");
    sc.close();
}
Project: BLASpark    File: OtherOperations.java
public static DistributedMatrix[] GetLU(DistributedMatrix A, boolean diagonalInL, boolean diagonalInU, JavaSparkContext jsc) {

        if((diagonalInL && diagonalInU) || (!diagonalInL && !diagonalInU)) {
            LOG.error("Diagonal values must be in either upper or lower matrices");
            System.exit(-1);
        }

        DistributedMatrix[] returnedMatrices;

        if( A.getClass() == IndexedRowMatrix.class) {
            returnedMatrices = OtherOperations.GetLU_IRW((IndexedRowMatrix) A, diagonalInL, diagonalInU, jsc);
        }
        else if (A.getClass() == CoordinateMatrix.class) {
            returnedMatrices = OtherOperations.GetLU_COORD((CoordinateMatrix) A, diagonalInL, diagonalInU, jsc);
        }
        else if (A.getClass() == BlockMatrix.class){
            // TODO: Implement this operation
            returnedMatrices = null;
        }
        else {
            returnedMatrices = null;
        }

        return returnedMatrices;
    }
Project: BLASpark    File: L2.java
public static DenseVector DGEMV(double alpha, DistributedMatrix A, DenseVector x, double beta, DenseVector y, JavaSparkContext jsc){

        // First form  y := beta*y.
        if (beta != 1.0) {
            if (beta == 0.0) {
                y = Vectors.zeros(y.size()).toDense();
            }
            else {
                BLAS.scal(beta,y);
            }
        }

        if (alpha == 0.0) {
            return y;
        }

        DenseVector tmpVector = Vectors.zeros(y.size()).toDense();

        // Form  y := alpha*A*x + y.
        // Case of IndexedRowMatrix
        if( A.getClass() == IndexedRowMatrix.class) {
            tmpVector = L2.DGEMV_IRW((IndexedRowMatrix) A,alpha,x,jsc);
        }
        else if (A.getClass() == CoordinateMatrix.class) {
            tmpVector = L2.DGEMV_COORD((CoordinateMatrix) A,jsc);
        }
        else if (A.getClass() == BlockMatrix.class){
            tmpVector = L2.DGEMV_BCK((BlockMatrix) A,jsc);
        }
        else {
            tmpVector = null;
        }

        BLAS.axpy(1.0,tmpVector,y);


        return y;

    }
Project: BLASpark    File: L2.java
private static DenseVector DGEMV_COORD(CoordinateMatrix matrix,double alpha,DenseVector vector,JavaSparkContext jsc) {

        JavaRDD<MatrixEntry> items = matrix.entries().toJavaRDD();
        DenseVector result = items.mapPartitions(new MatrixEntriesMultiplication(vector,alpha))
                .reduce(new MatrixEntriesMultiplicationReducer());

        return result;
    }
Project: gcp    File: Spark1Batch.java
public static void main(String[] args) {
  SparkConf sc = new SparkConf().setAppName("POC-Batch");
  try(JavaSparkContext jsc = new JavaSparkContext(sc)) {

    JavaRDD<ExampleXML> records = jsc.wholeTextFiles("input/")
        .map(t -> t._2())
        .map(new ParseXML());

    System.out.printf("Amount of XMLs: %d\n",records.count());
  }
}
Project: athena    File: MachineLearningManagerImpl.java
public DecisionTreeValidationSummary validateDecisionTreeAthenaFeatures(JavaSparkContext sc,FeatureConstraint featureConstraint,AthenaMLFeatureConfiguration athenaMLFeatureConfiguration,DecisionTreeDetectionModel decisionTreeDetectionModel,Indexing indexing,Marking marking) {
    long start = System.nanoTime(); // <-- start

    JavaPairRDD<Object,BSONObject> mongoRDD;
    mongoRDD = sc.newAPIHadoopRDD(
            mongodbConfig,// Configuration
            MongoInputFormat.class,// InputFormat: read from a live cluster.
            Object.class,// Key class
            BSONObject.class          // Value class
    );

    DecisionTreeDetectionAlgorithm decisionTreeDetectionAlgorithm = (DecisionTreeDetectionAlgorithm) decisionTreeDetectionModel.getDetectionAlgorithm();

    DecisionTreeValidationSummary decisionTreeValidationSummary =
            new DecisionTreeValidationSummary(sc.sc(),decisionTreeDetectionAlgorithm.getNumClasses(),indexing,marking);

    DecisionTreeDistJob decisionTreeDistJob = new DecisionTreeDistJob();

    decisionTreeDistJob.validate(mongoRDD, athenaMLFeatureConfiguration, decisionTreeDetectionModel, decisionTreeValidationSummary);


    long end = System.nanoTime(); // <-- start
    long time = end - start;

    decisionTreeValidationSummary.setTotalValidationTime(time);
    return decisionTreeValidationSummary;
}
Project: athena    File: MachineLearningManagerImpl.java
public GaussianMixtureValidationSummary validateGaussianMixtureAthenaFeatures(JavaSparkContext sc, FeatureConstraint featureConstraint, AthenaMLFeatureConfiguration athenaMLFeatureConfiguration, GaussianMixtureDetectionModel gaussianMixtureDetectionModel, Indexing indexing, Marking marking) {
    long start = System.nanoTime(); // <-- start

    JavaPairRDD<Object,BSONObject> mongoRDD;
    mongoRDD = sc.newAPIHadoopRDD(
            mongodbConfig,            // Configuration
            MongoInputFormat.class,   // InputFormat: read from a live cluster.
            Object.class,             // Key class
            BSONObject.class          // Value class
    );

    GaussianMixtureDetectionAlgorithm gaussianMixtureDetectionAlgorithm = (GaussianMixtureDetectionAlgorithm) gaussianMixtureDetectionModel.getDetectionAlgorithm();
    GaussianMixtureValidationSummary gaussianMixtureValidationSummary =
            new GaussianMixtureValidationSummary(sc.sc(), gaussianMixtureDetectionAlgorithm.getK(), indexing, marking);

    GaussianMixtureDistJob gaussianMixtureDistJob = new GaussianMixtureDistJob();

    gaussianMixtureDistJob.validate(mongoRDD, athenaMLFeatureConfiguration, gaussianMixtureDetectionModel, gaussianMixtureValidationSummary);

    long end = System.nanoTime(); // <-- start
    long time = end - start;

    gaussianMixtureValidationSummary.setTotalValidationTime(time);
    return gaussianMixtureValidationSummary;
}
Project: Java-data-science-Cookbook    File: ScalaTest.java
public static void main( String[] args ){
    String inputFile = "data/dummy.txt";
    SparkConf configuration = new SparkConf().setMaster("local[4]").setAppName("My App");
    JavaSparkContext sparkContext = new JavaSparkContext(configuration);
    JavaRDD<String> logData = sparkContext.textFile(inputFile).cache();

    long numberA = logData.filter(new Function<String,Boolean>(){
        private static final long serialVersionUID = 1L;
        public Boolean call(String s){
            return s.length() == 0;
        }
    }).count();
    sparkContext.close();
    System.out.println("Empty Lines: " + numberA);
}
Project: athena    File: MachineLearningManagerImpl.java
public LassoValidationSummary validateLassoAthenaFeatures(JavaSparkContext sc, FeatureConstraint featureConstraint, AthenaMLFeatureConfiguration athenaMLFeatureConfiguration, LassoDetectionModel lassoDetectionModel, Indexing indexing, Marking marking) {
    long start = System.nanoTime(); // <-- start

    JavaPairRDD<Object,BSONObject> mongoRDD;
    mongoRDD = sc.newAPIHadoopRDD(
            mongodbConfig,            // Configuration
            MongoInputFormat.class,   // InputFormat: read from a live cluster.
            Object.class,             // Key class
            BSONObject.class          // Value class
    );

    LassoDetectionAlgorithm lassoDetectionAlgorithm =
            (LassoDetectionAlgorithm) lassoDetectionModel.getDetectionAlgorithm();

    LassoValidationSummary lassoValidationSummary = new LassoValidationSummary();
    lassoValidationSummary.setLassoDetectionAlgorithm(lassoDetectionAlgorithm);
    LassoDistJob lassoDistJob = new LassoDistJob();

    lassoDistJob.validate(mongoRDD, athenaMLFeatureConfiguration, lassoDetectionModel, lassoValidationSummary);

    long end = System.nanoTime(); // <-- start
    long time = end - start;
    lassoValidationSummary.setValidationTime(time);

    return lassoValidationSummary;
}
Project: athena    File: MachineLearningManagerImpl.java
public RidgeRegressionValidationSummary validateRidgeRegressionAthenaFeatures(JavaSparkContext sc, FeatureConstraint featureConstraint, AthenaMLFeatureConfiguration athenaMLFeatureConfiguration, RidgeRegressionDetectionModel ridgeRegressionDetectionModel, Indexing indexing, Marking marking) {
    long start = System.nanoTime(); // <-- start

    JavaPairRDD<Object,BSONObject> mongoRDD;
    mongoRDD = sc.newAPIHadoopRDD(
            mongodbConfig,            // Configuration
            MongoInputFormat.class,   // InputFormat: read from a live cluster.
            Object.class,             // Key class
            BSONObject.class          // Value class
    );

    RidgeRegressionDetectionAlgorithm ridgeRegressionDetectionAlgorithm =
            (RidgeRegressionDetectionAlgorithm) ridgeRegressionDetectionModel.getDetectionAlgorithm();

    RidgeRegressionValidationSummary ridgeRegressionValidationSummary = new RidgeRegressionValidationSummary();
    ridgeRegressionValidationSummary.setRidgeRegressionDetectionAlgorithm(ridgeRegressionDetectionAlgorithm);
    RidgeRegressionDistJob ridgeRegressionDistJob = new RidgeRegressionDistJob();

    ridgeRegressionDistJob.validate(mongoRDD, athenaMLFeatureConfiguration, ridgeRegressionDetectionModel, ridgeRegressionValidationSummary);

    long end = System.nanoTime(); // <-- start
    long time = end - start;
    ridgeRegressionValidationSummary.setValidationTime(time);
    return ridgeRegressionValidationSummary;
}
Project: ViraPipe    File: Interleave.java
private static void splitFastq(FileStatus fst, String fqPath, String splitDir, int splitlen, JavaSparkContext sc) throws IOException {
  Path fqpath = new Path(fqPath);
  String fqname = fqpath.getName();
  String[] ns = fqname.split("\\.");
  List<FileSplit> nlif = NLineInputFormat.getSplitsForFile(fst, sc.hadoopConfiguration(), splitlen);

  JavaRDD<FileSplit> splitRDD = sc.parallelize(nlif);

  splitRDD.foreach( split -> {
    FastqRecordReader fqreader = new FastqRecordReader(new Configuration(), split);
    writeFastqFile(fqreader, new Configuration(), splitDir + "/split_" + split.getStart() + "." + ns[1]);
   });
}
Project: ViraPipe    File: Interleave.java
public static void interleaveSplitFastq(FileStatus fst, FileStatus fst2, String splitDir, int splitlen, JavaSparkContext sc) throws IOException {

    List<FileSplit> nlif = NLineInputFormat.getSplitsForFile(fst, sc.hadoopConfiguration(), splitlen);
    List<FileSplit> nlif2 = NLineInputFormat.getSplitsForFile(fst2, sc.hadoopConfiguration(), splitlen);

    JavaRDD<FileSplit> splitRDD = sc.parallelize(nlif);
    JavaRDD<FileSplit> splitRDD2 = sc.parallelize(nlif2);
    JavaPairRDD<FileSplit,FileSplit> zips = splitRDD.zip(splitRDD2);

    zips.foreach( splits -> {
      Path path = splits._1.getPath();
      FastqRecordReader fqreader = new FastqRecordReader(new Configuration(), splits._1);
      FastqRecordReader fqreader2 = new FastqRecordReader(new Configuration(), splits._2);
      writeInterleavedSplits(fqreader, fqreader2, splitDir+"/"+path.getParent().getName()+"_"+splits._1.getStart()+".fq");
    });
  }
Project: Explainer    File: SparkUtils.java
public static SparkUtils getInstance() {
  SparkUtils result = instance;
  if (result == null) {
    synchronized (SparkUtils.class) {
      result = instance;
      if (result == null) {
        instance = result = new SparkUtils();
        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("Explainer");
        jsc = new JavaSparkContext(sparkConf);
        sqlContext = new SQLContext(jsc);
      }
    }
  }
  return result;
}
Project: oryx2    File: ALSUpdate.java
@Override
public void publishAdditionalModelData(JavaSparkContext sparkContext, PMML pmml, JavaRDD<String> newData, JavaRDD<String> pastData, Path modelParentPath, TopicProducer<String,String> modelUpdateTopic) {
  // Send item updates first, before users. That way, user-based endpoints like /recommend
  // may take longer to not return 404, but when they do, the result will be more complete.
  log.info("Sending item / Y data as model updates");
  String yPathString = AppPMMLUtils.getExtensionValue(pmml, "Y");
  JavaPairRDD<String,float[]> productRDD = readFeaturesRDD(sparkContext, new Path(modelParentPath, yPathString));

  String updateBroker = modelUpdateTopic.getUpdateBroker();
  String topic = modelUpdateTopic.getTopic();

  // For now, there is no use in sending known users for each item
  productRDD.foreachPartition(new EnqueueFeatureVecsFn("Y", updateBroker, topic));

  log.info("Sending user / X data as model updates");
  String xPathString = AppPMMLUtils.getExtensionValue(pmml, "X");
  JavaPairRDD<String,float[]> userRDD = readFeaturesRDD(sparkContext, new Path(modelParentPath, xPathString));

  if (noKnownItems) {
    userRDD.foreachPartition(new EnqueueFeatureVecsFn("X", updateBroker, topic));
  } else {
    log.info("Sending known item data with model updates");
    JavaRDD<String[]> allData =
        (pastData == null ? newData : newData.union(pastData)).map(MLFunctions.PARSE_FN);
    JavaPairRDD<String,Collection<String>> knownItems = knownsRDD(allData, true);
    userRDD.join(knownItems).foreachPartition(
        new EnqueueFeatureVecsAndKnownItemsFn("X", updateBroker, topic));
  }
}
Project: athena    File: MachineLearningManagerImpl.java
public NaiveBayesValidationSummary validateNaiveBayesAthenaFeatures(JavaSparkContext sc, FeatureConstraint featureConstraint, AthenaMLFeatureConfiguration athenaMLFeatureConfiguration, NaiveBayesDetectionModel naiveBayesDetectionModel, Indexing indexing, Marking marking) {
    long start = System.nanoTime(); // <-- start

    JavaPairRDD<Object,BSONObject> mongoRDD;
    mongoRDD = sc.newAPIHadoopRDD(
            mongodbConfig,            // Configuration
            MongoInputFormat.class,   // InputFormat: read from a live cluster.
            Object.class,             // Key class
            BSONObject.class          // Value class
    );

    NaiveBayesDetectionAlgorithm naiveBayesDetectionAlgorithm = (NaiveBayesDetectionAlgorithm) naiveBayesDetectionModel.getDetectionAlgorithm();

    NaiveBayesValidationSummary naiveBayesValidationSummary =
            new NaiveBayesValidationSummary(sc.sc(), naiveBayesDetectionAlgorithm.getNumClasses(), indexing, marking);

    NaiveBayesDistJob naiveBayesDistJob = new NaiveBayesDistJob();


    naiveBayesDistJob.validate(mongoRDD, athenaMLFeatureConfiguration, naiveBayesDetectionModel, naiveBayesValidationSummary);


    long end = System.nanoTime(); // <-- start
    long time = end - start;

    naiveBayesValidationSummary.setTotalValidationTime(time);
    return naiveBayesValidationSummary;
}
Project: athena    File: MachineLearningManagerImpl.java
public GradientBoostedTreesValidationSummary validateGradientBoostedTreesAthenaFeatures(JavaSparkContext sc, FeatureConstraint featureConstraint, AthenaMLFeatureConfiguration athenaMLFeatureConfiguration, GradientBoostedTreesDetectionModel gradientBoostedTreesDetectionModel, Indexing indexing, Marking marking) {
    long start = System.nanoTime(); // <-- start

    JavaPairRDD<Object,BSONObject> mongoRDD;
    mongoRDD = sc.newAPIHadoopRDD(
            mongodbConfig,            // Configuration
            MongoInputFormat.class,   // InputFormat: read from a live cluster.
            Object.class,             // Key class
            BSONObject.class          // Value class
    );

    GradientBoostedTreesDetectionAlgorithm gradientBoostedTreesDetectionAlgorithm =
            (GradientBoostedTreesDetectionAlgorithm) gradientBoostedTreesDetectionModel.getDetectionAlgorithm();

    GradientBoostedTreesValidationSummary gradientBoostedTreesValidationSummary =
            new GradientBoostedTreesValidationSummary(sc.sc(), gradientBoostedTreesDetectionAlgorithm.getNumClasses(), indexing, marking);

    GradientBoostedTreesDistJob gradientBoostedTreesDistJob = new GradientBoostedTreesDistJob();

    gradientBoostedTreesDistJob.validate(mongoRDD, athenaMLFeatureConfiguration, gradientBoostedTreesDetectionModel, gradientBoostedTreesValidationSummary);


    long end = System.nanoTime(); // <-- start
    long time = end - start;

    gradientBoostedTreesValidationSummary.setTotalValidationTime(time);
    return gradientBoostedTreesValidationSummary;
}
Project: movie-recommender    File: SparkModule.java
@Provides
CassandraIo<RawRating> providesCassandraRatingIO(JavaSparkContext sparkContext) {
    if (ratingCassandraIo != null) {
        return ratingCassandraIo;
    }
    ratingCassandraIo = new CassandraIo<>(RawRating.class, "dev", "ratings");
    ratingCassandraIo.setSparkContext(sparkContext);


    return ratingCassandraIo;
}
Project: Metadata-qa-marc    File: ParallelValidator.java
public static void main(String[] args) throws ParseException {

        final Validator validator = new Validator(args);
        ValidatorParameters params = validator.getParameters();
        validator.setDoPrintInProcessRecord(false);

        logger.info("Input file is " + params.getArgs());
        SparkConf conf = new SparkConf().setAppName("marcCompletenessCount");
        JavaSparkContext context = new JavaSparkContext(conf);

        System.err.println(validator.getParameters().formatParameters());

        JavaRDD<String> inputFile = context.textFile(validator.getParameters().getArgs()[0]);

        JavaRDD<String> baseCountsRDD = inputFile
            .flatMap(content -> {
                MarcReader reader = ReadMarc.getMarcStringReader(content);
                Record marc4jRecord = reader.next();
                MarcRecord marcRecord = MarcFactory.createFromMarc4j(
                    marc4jRecord, params.getDefaultRecordType(), params.getMarcVersion(), params.fixAlephseq());
                validator.processRecord(marcRecord, 1);
                return ValidationErrorFormatter
                    .formatForSummary(marcRecord.getValidationErrors(), params.getFormat())
                    .iterator();
            }
        );
        baseCountsRDD.saveAsTextFile(validator.getParameters().getFileName());
    }
Project: athena    File: MachineLearningManagerImpl.java
public LinearRegressionValidationSummary validateLinearRegressionAthenaFeatures(JavaSparkContext sc, FeatureConstraint featureConstraint, AthenaMLFeatureConfiguration athenaMLFeatureConfiguration, LinearRegressionDetectionModel linearRegressionDetectionModel, Indexing indexing, Marking marking) {
    long start = System.nanoTime(); // <-- start

    JavaPairRDD<Object,BSONObject> mongoRDD;
    mongoRDD = sc.newAPIHadoopRDD(
            mongodbConfig,            // Configuration
            MongoInputFormat.class,   // InputFormat: read from a live cluster.
            Object.class,             // Key class
            BSONObject.class          // Value class
    );

    LinearRegressionDetectionAlgorithm linearRegressionDetectionAlgorithm =
            (LinearRegressionDetectionAlgorithm) linearRegressionDetectionModel.getDetectionAlgorithm();

    LinearRegressionValidationSummary linearRegressionValidationSummary =
            new LinearRegressionValidationSummary();
    linearRegressionValidationSummary.setLinearRegressionDetectionAlgorithm(linearRegressionDetectionAlgorithm);

    LinearRegressionDistJob linearRegressionDistJob = new LinearRegressionDistJob();

    linearRegressionDistJob.validate(mongoRDD, athenaMLFeatureConfiguration, linearRegressionDetectionModel, linearRegressionValidationSummary);


    long end = System.nanoTime(); // <-- start
    long time = end - start;
    linearRegressionValidationSummary.setValidationTime(time);


    return linearRegressionValidationSummary;
}
Project: rdf2x    File: TestUtils.java
/**
 * Parse RDF file from resources folder
 * @param sc spark context to use for parsing
 * @param fileName name of the file to parse
 * @return RDD of quads from the requested file
 */
public static JavaRDD<Quad> getQuadsRDD(JavaSparkContext sc, String fileName) {
    QuadParser parser = new ElephasQuadParser(
            new QuadParserConfig()
                    .setBatchSize(2),
            sc
    );
    String path = TestUtils.getDatasetPath(fileName);
    return parser.parseQuads(path);
}

org.apache.spark.SparkException: A master URL must be set in your configuration

Running one of the Spark examples fails with:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
18/05/25 11:09:45 INFO SparkContext: Running Spark version 2.1.1
18/05/25 11:09:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/05/25 11:09:45 ERROR SparkContext: Error initializing SparkContext.
org.apache.spark.SparkException: A master URL must be set in your configuration
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:379)
	at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2320)
	at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:868)
	at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:860)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:860)
	at org.apache.spark.examples.graphx.AggregateMessagesExample$.main(AggregateMessagesExample.scala:42)
	at org.apache.spark.examples.graphx.AggregateMessagesExample.main(AggregateMessagesExample.scala)
18/05/25 11:09:45 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" org.apache.spark.SparkException: A master URL must be set in your configuration
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:379)
	at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2320)
	at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:868)
	at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:860)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:860)
	at org.apache.spark.examples.graphx.AggregateMessagesExample$.main(AggregateMessagesExample.scala:42)
	at org.apache.spark.examples.graphx.AggregateMessagesExample.main(AggregateMessagesExample.scala)

Process finished with exit code 1

Solution: add the following parameter to the VM options at launch:

-Dspark.master=local
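Alternatively, the master can be set in code rather than through a VM option; a minimal sketch of that variant (class and app name are illustrative; hard-coding the master this way is usually reserved for local runs, since it takes precedence over whatever spark-submit --master would pass in):

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class LocalMasterExample {

    public static void main(String[] args) {
        // Setting the master explicitly avoids
        // "A master URL must be set in your configuration"
        SparkConf conf = new SparkConf()
                .setAppName("LocalMasterExample")
                .setMaster("local[*]"); // run locally using all cores

        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            System.out.println(sc.parallelize(Arrays.asList(1, 2, 3)).count());
        }
    }
}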

This concludes the introduction to Exception in thread "main" org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). Thank you for your patient reading. If you want to learn more about the pyspark SparkContext overview, Exception in thread "main" org.apache.spark.SparkException: Could not parse Master URL: 'yarn', example source code for org.apache.spark.api.java.JavaSparkContext, or org.apache.spark.SparkException: A master URL must be set in your configuration, you can find it on this site.
