
Kafka + Spark Streaming Integration (integrating Kafka with Spark Streaming)



This article covers Kafka + Spark Streaming integration and what integrating Kafka with Spark Streaming actually involves. It also includes: Flume 1.7 + Kafka + Streaming integration development steps; Spark Streaming with Kafka in Java 8 (Spark 2.0 & Kafka 0.10); integrating Spark Streaming with Kafka for stream processing in Java; and Kafka + Spark Streaming study notes (part 1).


Kafka + Spark Streaming Integration (integrating Kafka with Spark Streaming)

Key pom.xml dependencies:

        <kafka.version>0.10.2.0</kafka.version>
        <slf4j.version>1.7.10</slf4j.version>
        <spark.version>2.1.0</spark.version>        
...
        <!-- kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <!-- spark -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- spark streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- spark streaming kafka-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

 

Main functionality: print log lines at ERROR or WARN level.

import com.boyoi.kafka.KafkaParams;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collection;

/**
 * Basic test: consume log lines from Kafka and print ERROR/WARN entries.
 */
public class Kafka2Error {

    public static void main(String[] args) throws URISyntaxException, IOException, InterruptedException {

        Collection<String> topics = Arrays.asList("test");
        JavaSparkContext sc = new JavaSparkContext("local","error");

        JavaStreamingContext jssc = new JavaStreamingContext(sc, Duration.apply(1000));
        final JavaInputDStream<ConsumerRecord<String, String>> directStream = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, KafkaParams.getSer9Params())
        );

        JavaDStream<String> map = directStream.map(a -> a.value());

        // keep only lines whose level field (characters 24-30) is "[ WARN]" or "[ERROR]"
        JavaDStream<String> filter = map.filter(s ->
                s.length() > 30 &&
                        (s.substring(24, 31).equals("[ WARN]") || s.substring(24, 31).equals("[ERROR]"))
        );

        filter.foreachRDD(each -> each.foreach(each2-> System.out.println(each2)));

        jssc.start();
        jssc.awaitTermination();
    }

}
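To exercise this job locally, a few synthetic log lines can be pushed into the test topic. The sketch below is not part of the original post; it assumes the broker address from KafkaParams (192.168.1.9:9092) and log lines whose level field starts at character 24, matching the filter above (the class name LogLineProducer is hypothetical):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

// hypothetical helper for local testing of the Kafka2Error job
public class LogLineProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.9:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // 24 characters of timestamp prefix, then the level field the filter inspects
            producer.send(new ProducerRecord<>("test", "2017-01-01 00:00:00,000 [ERROR] something failed"));
            producer.send(new ProducerRecord<>("test", "2017-01-01 00:00:00,000 [ WARN] something odd"));
            producer.send(new ProducerRecord<>("test", "2017-01-01 00:00:00,000 [ INFO] business as usual"));
        }
    }
}

Only the first two lines should be printed by the streaming job; the INFO line is filtered out.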

 

The default Kafka parameters are shown below. value.deserializer points to a custom deserializer class, because the source data is encoded in GBK while Kafka's standard StringDeserializer assumes UTF-8.

import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.HashMap;
import java.util.Map;

/**
 * Default Kafka consumer parameters.
 */
public class KafkaParams {
    private static Map<String, Object> kafkaParams = new HashMap<String, Object>();
    static {
        kafkaParams.put("bootstrap.servers", "192.168.1.9:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializerGBK.class);
        kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);
    }

    public static Map<String, Object> getSer9Params(){
        return kafkaParams;
    }


}

The custom deserializer class:

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.UnsupportedEncodingException;

/**
 * Custom string deserializer that decodes bytes as GBK instead of UTF-8.
 */
public class StringDeserializerGBK extends StringDeserializer {

    @Override
    public String deserialize(String topic, byte[] data) {
        try {
            return data == null?null:new String(data, "GBK");
        } catch (UnsupportedEncodingException var4) {
            throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding gbk");
        }
    }
}
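A quick way to sanity-check the deserializer is to round-trip a GBK-encoded string through it. The small test class below (hypothetical name StringDeserializerGBKTest) is only a sketch, assuming the classes above are on the classpath:

import java.nio.charset.Charset;

public class StringDeserializerGBKTest {
    public static void main(String[] args) {
        StringDeserializerGBK deserializer = new StringDeserializerGBK();
        String original = "错误日志示例";                            // sample GBK-encodable text
        byte[] gbkBytes = original.getBytes(Charset.forName("GBK")); // encode as the source system would
        String decoded = deserializer.deserialize("test", gbkBytes);
        System.out.println(original.equals(decoded));                // expected: true
    }
}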

 

Flume 1.7 + Kafka + Streaming Integration: Development Steps

1. Installation

  • Download: apache-flume-1.7.0
  • After downloading, upload and extract it under /opt/ebohailife/
[ebohailife@e-bohailife-dat002 ~]$ tar -zxvf apache-flume-1.7.0-bin.tar.gz
  • Verify the installation: /opt/ebohailife/apache-flume-1.7.0-bin/bin/flume-ng version

If the following information is printed, the installation succeeded:

[ebohailife@e-bohailife-dat002 conf]$ ../bin/flume-ng version
Flume 1.7.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 511d868555dd4d16e6ce4fedc72c2d1454546707
Compiled by bessbd on Wed Oct 12 20:51:10 CEST 2016
From source with checksum 0d21b3ffdc55a07e1d08875872c00523

2. Development

- Update the Flume configuration file

[ebohailife@e-bohailife-dat002 conf]$ echo $JAVA_HOME
/opt/ebohailife/jdk1.7.0_80

[ebohailife@e-bohailife-dat002 conf]$ cp flume-env.sh.template flume-env.sh
vi flume-env.sh # set the JAVA_HOME variable in flume-env.sh

- Create the Flume agent configuration file taildir_behavior.conf

[ebohailife@e-bohailife-uat002 conf]$ vi taildir_behavior.conf 
# name the agent a1
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/ebohailife/logs/rest/.*behavior.*
a1.sources.r1.positionFile = /tmp/flume/taildir_behavior_position.json
a1.sources.r1.fileHeader = false

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = 10.104.0.226:9092,10.104.0.227:9092,10.104.0.228:9092
a1.sinks.k1.kafka.topic = behaviorlog_r1p3
a1.sinks.k1.kafka.producer.acks= 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.flumeBatchSize = 100

# a1.sinks.k1.topic = behaviorlog_r1p3
# Kafka broker list; the properties below are deprecated in Flume 1.7 and later
# a1.sinks.k1.brokerList = 10.104.0.226:9092,10.104.0.227:9092,10.104.0.228:9092
# a1.sinks.k1.requiredAcks = 1
# a1.sinks.k1.batchSize = 100

# Use a channel which buffers events in file
a1.channels.c1.type = file
# checkpoint file directory
a1.channels.c1.checkpointDir = /opt/ebohailife/apache-flume-1.7.0-bin/checkpoint
# event data directory
a1.channels.c1.dataDirs = /opt/ebohailife/apache-flume-1.7.0-bin/data

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

- Create the Flume agent configuration file taildir_phoneinfo.conf

[ebohailife@e-bohailife-uat002 conf]$ vi taildir_phoneinfo.conf
# name the agent a2
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
a2.sources.r2.type = TAILDIR
a2.sources.r2.filegroups = f1
a2.sources.r2.filegroups.f1 = /opt/ebohailife/logs/rest/.*phoneinfo.*
a2.sources.r2.positionFile = /tmp/flume/taildir_phoneinfo_position.json
a2.sources.r2.fileHeader = false

# Describe the sink
a2.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a2.sinks.k2.kafka.bootstrap.servers = 10.104.0.226:9092,10.104.0.227:9092,10.104.0.228:9092
a2.sinks.k2.kafka.topic = phoneinfolog_r1p3
a2.sinks.k2.kafka.producer.acks= 1
a2.sinks.k2.kafka.producer.linger.ms = 1
a2.sinks.k2.flumeBatchSize = 100

# a2.sinks.k2.topic = behaviorlog_r1p3
# Kafka broker list; the properties below are deprecated in Flume 1.7 and later
# a2.sinks.k2.brokerList = 10.104.0.226:9092,10.104.0.227:9092,10.104.0.228:9092
# a2.sinks.k2.requiredAcks = 1
# a2.sinks.k2.batchSize = 100

# Use a channel which buffers events in file
a2.channels.c2.type = file
# checkpoint file directory
a2.channels.c2.checkpointDir = /opt/ebohailife/apache-flume-1.7.0-bin/checkpoint
# event data directory
a2.channels.c2.dataDirs = /opt/ebohailife/apache-flume-1.7.0-bin/data

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

- Create the Kafka topics

# create topic behaviorlog_r1p3
./kafka-topics.sh --zookeeper 10.104.0.227:2181 --create --topic behaviorlog_r1p3 --partitions 3 --replication-factor 1
# create topic phoneinfolog_r1p3
./kafka-topics.sh --zookeeper 10.104.0.227:2181 --create --topic phoneinfolog_r1p3 --partitions 3 --replication-factor 1

- Verify that the topics were created

./kafka-topics.sh  --list --zookeeper 10.104.0.227:2181

- Start the Flume NG agents in the background

./flume-ng agent -c /opt/ebohailife/apache-flume-1.7.0-bin/conf -f /opt/ebohailife/apache-flume-1.7.0-bin/conf/taildir_behavior.conf  -n a1  >/dev/null 2>&1 &

./flume-ng agent -c /opt/ebohailife/apache-flume-1.7.0-bin/conf -f /opt/ebohailife/apache-flume-1.7.0-bin/conf/taildir_phoneinfo.conf  -n a2  >/dev/null 2>&1 &

# append -Dflume.root.logger=INFO,console to the command above to log to the console while debugging

- Start the Kafka console consumers in the background

# consume behaviorlog_r1p3
./kafka-console-consumer.sh --topic behaviorlog_r1p3 --bootstrap-server 10.104.0.226:9092 >/dev/null 2>&1 &
# consume phoneinfolog_r1p3
./kafka-console-consumer.sh --topic phoneinfolog_r1p3 --bootstrap-server 10.104.0.226:9092 >/dev/null 2>&1 &

- Create the log collection streams

# create the phoneinfo_log stream
CREATE STREAM phoneinfo_log_stream(phoneinfo STRING, tmp1 STRING, ip STRING, tmp2 STRING, phone_model STRING, tmp3 STRING,  phone_version STRING, tmp4 STRING,  area STRING, tmp5 STRING, start_time TIMESTAMP, tmp6 STRING, KDDI STRING, tmp7 STRING, app_source STRING, tmp8 STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' TBLPROPERTIES("topic"="phoneinfolog_r1p3" ,"kafka.zookeeper"="10.104.0.227:2181","kafka.broker.list"="10.104.0.226:9092,10.104.0.227:9092,10.104.0.228:9092");
# create the behavior_log stream
CREATE STREAM behavior_log_stream(eventid STRING, tmp1 STRING, ip STRING, tmp2 STRING, user_id STRING, tmp3 STRING,  user_name STRING, tmp4 STRING,  in_time TIMESTAMP, tmp5 STRING, operate_time TIMESTAMP, tmp6 STRING, phone_unicode STRING, tmp7 STRING, trigger_count STRING, tmp8 STRING, diff_in_oper INT, tmp9 STRING, tel_no STRING, tmp10 STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' TBLPROPERTIES("topic"="behaviorlog_r1p3" ,"kafka.zookeeper"="10.104.0.227:2181","kafka.broker.list"="10.104.0.226:9092,10.104.0.227:9092,10.104.0.228:9092");

- Create the log tables

# create the phoneinfo_log table
CREATE TABLE phoneinfo_log_tab(phone STRING, ip STRING, phone_model STRING, phone_version STRING, area STRING, start_time TIMESTAMP, KDDI STRING, app_source STRING);
# create the behavior_log table
CREATE TABLE behavior_log_tab(eventid STRING, ip STRING, user_id STRING, user_name STRING,  in_time TIMESTAMP,  operate_time TIMESTAMP,  
phone_unicode STRING, trigger_count STRING, diff_in_oper INT, tel_no STRING);
To avoid producing too many small files, apply the following settings:
set streamsql.enable.hdfs.batchflush = true # enable batched flushing
set streamsql.hdfs.batchflush.size = <num> # flush once this many messages have accumulated
set streamsql.hdfs.batchflush.interval.ms = <num> # flush at least once every this many milliseconds
# a flush is triggered when either batchflush.size or batchflush.interval.ms is satisfied

- Start the log streams

# trigger the phoneinfo_log_stream computation
INSERT INTO phoneinfo_log_tab SELECT phoneinfo, ip, phone_model, phone_version, area, start_time, KDDI, app_source FROM phoneinfo_log_stream;
# trigger the behavior_log_stream computation
INSERT INTO behavior_log_tab SELECT eventid, ip, user_id, user_name,  in_time,  operate_time, phone_unicode, trigger_count, diff_in_oper, tel_no FROM behavior_log_stream;

Spark Streaming with Kafka in Java 8 (Spark 2.0 & Kafka 0.10)

An earlier post covered a simple Spark Streaming demo, and another showed Kafka running end to end; this post combines the two, which is one of the most common setups.

1. Component versions
First, confirm the versions: they differ somewhat from earlier posts, which is why they are worth recording. Scala is still not used here; the stack is Java 8, Spark 2.0.0, and Kafka 0.10.

2. Maven dependency
The examples found online target different versions and simply would not work here, so after some digging, the dependency to add is listed below.

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>2.0.0</version>
</dependency>

The newest artifact without a Kafka version in its name is 1.6.3, which no longer runs on Spark 2, so use the artifact built for Kafka 0.10. Also note that Spark 2.0 is built against Scala 2.11, so the artifact id must end with _2.11 to indicate the Scala version.

3. The SparkSteamingKafka class
Note that the relevant classes are imported from org.apache.spark.streaming.kafka010.*, so the imports are included below. The rest is explained in the comments.

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import scala.Tuple2;

public class SparkSteamingKafka {
    public static void main(String[] args) throws InterruptedException {
        String brokers = "master2:6667";
        String topics = "topic1";
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("streaming word count");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");
        JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));

        Collection<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
        // Kafka parameters; required, the job fails without them
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", brokers) ;
        kafkaParams.put("bootstrap.servers", brokers);
        kafkaParams.put("group.id", "group1");
        kafkaParams.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // starting offsets per topic partition
        Map<TopicPartition, Long> offsets = new HashMap<>();
        offsets.put(new TopicPartition("topic1", 0), 2L); 
        // obtain the Kafka data via KafkaUtils.createDirectStream(...); the Kafka settings come from kafkaParams
        JavaInputDStream<ConsumerRecord<Object,Object>> lines = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topicsSet, kafkaParams, offsets)
            );
        // from here on it is the same as the earlier demo; note that each element of lines is a ConsumerRecord
        JavaPairDStream<String, Integer> counts = 
                lines.flatMap(x -> Arrays.asList(x.value().toString().split(" ")).iterator())
                .mapToPair(x -> new Tuple2<String, Integer>(x, 1))
                .reduceByKey((x, y) -> x + y);  
        counts.print();
//  print the full records to inspect the ConsumerRecord structure
//      lines.foreachRDD(rdd -> {
//          rdd.foreach(x -> {
//            System.out.println(x);
//          });
//        });
        ssc.start();
        ssc.awaitTermination();
        ssc.close();
    }
}

4. Running the test
Here the producer class from the earlier "first look at Kafka" post is used to put data to the Kafka broker; Kafka is deployed on the master2 node, and Spark 2 runs locally for this test.


UserKafkaProducer producerThread = new UserKafkaProducer(KafkaProperties.topic);
producerThread.start();
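UserKafkaProducer and KafkaProperties come from that earlier Kafka post and are not reproduced here; roughly, the producer is a thread that keeps sending messages to the topic. A minimal hypothetical stand-in under those assumptions (broker master2:6667, plain string messages) might look like this:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

// hypothetical stand-in for the UserKafkaProducer referenced above
public class UserKafkaProducer extends Thread {
    private final String topic;

    public UserKafkaProducer(String topic) {
        this.topic = topic;
    }

    @Override
    public void run() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "master2:6667");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send a batch of test messages for the word count to consume
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<>(topic, "hello spark streaming " + i));
            }
        }
    }
}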

Then run the SparkSteamingKafka class from step 3 and the word counts appear as expected.


That concludes Spark Streaming with Kafka in Java 8 (Spark 2.0 & Kafka 0.10).

Integrating Spark Streaming with Kafka for Stream Processing in Java

  • Update 2017/6/26: I have since taken over the search system and gained a lot of new insight over the past half year. I am too lazy to rework this rough post, so please read it together with this newer article to make sense of the crude code below: http://blog.csdn.net/yujishi2/article/details/73849237.
  • Background: there are plenty of articles about Spark Streaming online, but most are written in Scala. Our real-time e-commerce recommendation project is mainly Java, so I hit a few pitfalls and wrote a Java implementation. The code is fairly stream-of-consciousness; go easy on it, and discussion is welcome.
  • Pipeline: Spark Streaming reads real-time user click data from Kafka, filters it, reads the item similarity matrix from Redis and the user's historical behavior from the DB, computes interest scores in real time, and writes the results both to Redis (for the API layer to read and display) and to HDFS (for offline precision/recall evaluation).
  • Note: as far as I know, in large real-time recommendation systems collaborative filtering is generally used only to generate the candidate set; the interest computation is replaced by a rerank driven by CTR and other strategies, with an online rerank service called from calculateInterest.
  • Update 12/13: recall is unchanged; ranking currently uses CTR estimation plus rules, with learning-to-rank to follow.

  • Enough talk, here is the code:

public class Main {
    static final String ZK_QUORUM = "*.*.*.*:2181,*.*.*.*:2181,*.*.*.*:2181/kafka";
    static final String GROUP = "test-consumer-group";
    static final String TOPICSS = "user_trace";
    static final String NUM_THREAD = "64";

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("main.java.computingCenter");
        // Create the context with 2 seconds batch size
        // read from Kafka every two seconds
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

        int numThreads = Integer.parseInt(NUM_THREAD);
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        String[] topics = TOPICSS.split(",");
        for (String topic: topics) {
            topicMap.put(topic, numThreads);
        }

        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, ZK_QUORUM, GROUP, topicMap);



        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String lines) {
            // Kafka message format: "{\"Topic\":\"user_trace\",\"PartitionKey\":\"0\",\"TimeStamp\":1471524044018,\"Data\":\"0=163670589171371918%3A196846178238302087\",\"LogId\":\"0\",\"ContentType\":\"application/x-www-form-urlencoded\"}";
                List<String> arr = new ArrayList<String>();
                for (String s : lines.split(" ")) {
                    Map j = JSON.parseObject(s);
                    String s1 = "";
                    String s2 = "";
                    try {
                        s1 = URLDecoder.decode(j.get("Data").toString(), "UTF-8");
                        s2 = s1.split("=")[1];
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                    arr.add(s2);
                }
                return arr;
            }
        });

        JavaPairDStream<String, String> goodsSimilarityLists = words.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String s) throws Exception {
                // filter out malformed records
                if (s.split(":").length == 2) {
                    return true;
                }
                return false;
            }
        }).mapPartitionsToPair(new PairFlatMapFunction<Iterator<String>, String, String>() {
            // process the pairs partition by partition here
            @Override
            public Iterable<Tuple2<String, String>> call(Iterator<String> s) throws Exception {
                ArrayList<Tuple2<String, String>> result = new ArrayList<Tuple2<String, String>>();
                while (s.hasNext()) {
                    String x = s.next();
                    String userId = x.split(":")[0];
                    String goodsId = x.split(":")[1];
                    System.out.println(x);
                    LinkedHashMap<Long, Double> recommendMap = null;
                    try {
                        // this service reads from Redis, computes real-time interest scores, and writes the recommendations back to Redis for the API layer
                        CalculateInterestService calculateInterestService = new CalculateInterestService();
                        try {
                            recommendMap = calculateInterestService.calculateInterest(userId, goodsId);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }

                        String text = "";
                        int count = 0;
                        for (Map.Entry<Long, Double> entry : recommendMap.entrySet()) {
                            text = text + entry.getKey();
                            if (count == recommendMap.size() - 1) {
                                break;
                            }
                            count = count + 1;
                            text = text + "{/c}";
                        }

                        text = System.currentTimeMillis() + ":" + text;
                        result.add(new Tuple2<String, String>(userId, text));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }

                return result;
            }
        });

        goodsSimilarityLists.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
            @Override
            public Void call(JavaPairRDD<String, String> rdd) throws Exception {
                // print the RDD for easier debugging
                System.out.println(rdd.collect());
                return null;
            }
        });

        JavaPairDStream<Text, Text> goodsSimilarityListsText = goodsSimilarityLists.mapToPair(new PairFunction<Tuple2<String, String>, Text, Text>(){
            @Override
            public Tuple2<Text, Text> call(Tuple2<String, String> ori) throws Exception {
                // convert the Tuple2 to org.apache.hadoop.io.Text so saveAsHadoopFiles can write it to HDFS
                return new Tuple2(new Text(ori._1), new Text(ori._2));
            }
        });

        // write to HDFS
        goodsSimilarityListsText.saveAsHadoopFiles("/user/hadoop/recommend_list/rl", "123", Text.class, Text.class, SequenceFileOutputFormat.class);

        jssc.start();
        jssc.awaitTermination();

    }
}
public class CalculateInterestService {

    private String dictKey = "greate_item_sim_2.0";
    private String recommendTable = "great_recommend_table_2.0";
    static final String HIGO_BASE_URL = "jdbc:mysql://*.*.*.*:3212/*";
    static final String HIGO_BASE_USER = "*";
    static final String HIGO_BASE_PASS = "*";

    public LinkedHashMap<Long, Double> calculateInterest(String userId, String traceGoodsId) {
        LinkedHashMap<Long, Double> sortedMap = new LinkedHashMap<Long, Double>();
        String[] simGoods = RedisHelper.getInstance().hget(dictKey, traceGoodsId).split(",");
        // user history; should be stored as action:goodsId:timestamp and written by BI into a dedicated table (needs refactoring)
        HashMap<Long, String> userTrace = null;
        try {
            userTrace = getUserTrace(userId);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            return sortedMap;
        }
        HashMap<Long, Double> recommendMap = new HashMap<Long, Double>();
        String[] simGoodsIds = new String[simGoods.length];
        for (int i = 0; i < simGoods.length; i++) {
            simGoodsIds[i] = simGoods[i].split(":")[0];
        }
        List<String> pSimGoodsIds = RedisHelper.getInstance().hmget(dictKey, simGoodsIds);
        HashMap<Long, String> predictSimGoodsIds = new HashMap<Long, String>();
        for (int i = 0; i < simGoodsIds.length; i++) {
            predictSimGoodsIds.put(Long.parseLong(simGoodsIds[i]), pSimGoodsIds.get(i));
        }
        for (String item : simGoods) {
            // needs optimising

            Double totalSum = 0.0;
            Double sum = 0.0;
            Long originGoodsId = Long.parseLong(item.split(":")[0]);
            for (String predictGoods : predictSimGoodsIds.get(originGoodsId).split(",")) {
                Long goodsId = Long.parseLong(predictGoods.split(":")[0].toString());
                Double sim = Double.valueOf(predictGoods.split(":")[1].toString());
                totalSum = totalSum + sim;
                Double score = 0.0;
                if (!userTrace.containsKey(goodsId)) {
                    // TODO: the user rating matrix is too sparse; ratings should be filled in via SVD. Unrated items currently default to a score of 0.1
                    userTrace.put(goodsId, "default");
                }
                String action = userTrace.get(goodsId);


                if (action.equals("click")) {
                    score = 0.2;
                } else if (action.equals("favorate")) {

                } else if (action.equals("add_cart")) {
                    score = 0.6;
                } else if (action.equals("order")) {
                    score = 0.8;

                } else if (action.equals("default")) {

                    score = 0.1;
                }
                // the similarity dictionary should store goodsid:sim pairs (needs refactoring)
                sum = sum + score * sim;
            }

            Double predictResult = sum / totalSum;
            recommendMap.put(originGoodsId, predictResult);
        }

        //sort recommend list
        List<Map.Entry<Long, Double>> list = new ArrayList<Map.Entry<Long, Double>>(recommendMap.entrySet());
        Collections.sort(list, new Comparator<Map.Entry<Long, Double>>() {
            @Override
            public int compare(Map.Entry<Long, Double> o1, Map.Entry<Long, Double> o2) {
                return o2.getValue().compareTo(o1.getValue());
            }
        });

        Map.Entry<Long, Double> tmpEntry = null;
        Iterator<Map.Entry<Long, Double>> iter = list.iterator();
        while (iter.hasNext()) {
            tmpEntry = iter.next();
            sortedMap.put(tmpEntry.getKey(), tmpEntry.getValue());
        }

        writeRecommendListToRedis(userId, sortedMap);

        return sortedMap;

    }

    private HashMap<Long, String> getUserTrace(String userId) throws ClassNotFoundException {
        //SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
        Class.forName("com.mysql.jdbc.Driver");
        PreparedStatement stmt = null;
        Connection conn = null;
        UserTrace userTrace = new UserTrace();
        try {
            conn = DriverManager.getConnection(HIGO_BASE_URL, HIGO_BASE_USER, HIGO_BASE_PASS);
            String sql = "select * from t_pandora_goods_record where account_id=" + userId;
            stmt = (PreparedStatement)conn.prepareStatement(sql);
            ResultSet rs = stmt.executeQuery();
            while(rs.next()) {
                userTrace.setId(Long.parseLong(rs.getString(1)));
                userTrace.setAccountId(Long.parseLong(rs.getString(2)));
                userTrace.setGoodsIds(rs.getString(3));
                userTrace.setMtime(rs.getString(4));
            }
            stmt.close();
            conn.close();
        } catch (Exception e) {
            e.printStackTrace();
        }

        String[] goodsActionTimestamp = userTrace.getGoodsIds().split(",");
        HashMap<Long, String> hm = new HashMap<Long, String>();
        for (String ac : goodsActionTimestamp) {
            Long goodsId = Long.parseLong(ac.split(":")[0]);
            //String action = ac.split(":")[1];
            //String timestamp = ac.split(":")[2];
            // hack: BI should next write user behavior history into a table in action:goodsId:timestamp format; the timestamp will later feed into the weighting
            String action = "click";
            hm.put(goodsId, action);
        }
        return hm;
    }

    private void writeRecommendListToRedis(String userId, LinkedHashMap<Long, Double> sortedMap) {
        String recommendList = "";
        int count = 0;
        for (Map.Entry<Long, Double> entry : sortedMap.entrySet()) {
            recommendList = recommendList + entry.getKey();
            if (count == sortedMap.size() - 1) {
                break;
            }
            count = count + 1;
            recommendList = recommendList + ",";
        }
        RedisHelper.getInstance().hset(recommendTable, userId, recommendList);
    }

}
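At its core, calculateInterest computes a similarity-weighted average of action scores for each candidate item: sum(score_i * sim_i) / sum(sim_i). The standalone snippet below (hypothetical class, made-up numbers) only illustrates that formula, using the click/add_cart/default weights from the code above:

public class InterestScoreExample {                 // illustrative only, not part of the original post
    public static void main(String[] args) {
        double[] sims   = {0.9, 0.5, 0.2};          // similarity of each related item
        double[] scores = {0.2, 0.6, 0.1};          // click, add_cart, no action (default)

        double weighted = 0.0, totalSim = 0.0;
        for (int i = 0; i < sims.length; i++) {
            weighted += scores[i] * sims[i];
            totalSim += sims[i];
        }
        // (0.2*0.9 + 0.6*0.5 + 0.1*0.2) / (0.9 + 0.5 + 0.2) = 0.5 / 1.6 = 0.3125
        System.out.println(weighted / totalSim);
    }
}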

Kafka + Spark Streaming: Study Notes 1


import java.util.*;

import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import scala.Tuple2;

/**
 */
public class KafkaSparkStreamingDemo {
    public static void main(String[] args) throws InterruptedException {

        SparkConf conf = new SparkConf();
        conf.setAppName("kafkaSpark");
        conf.setMaster("local[4]");
        // create the Spark Streaming context
        JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Seconds.apply(5));

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "s202:9092,s203:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "g6");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);

        Collection<String> topics = Arrays.asList("mytopic1");

        final JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        streamingContext,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
                );

        // flatten each record's value into words
        JavaDStream<String> wordsDS = stream.flatMap(new FlatMapFunction<ConsumerRecord<String,String>, String>() {
            public Iterator<String> call(ConsumerRecord<String, String> r) throws Exception {
                String value = r.value();
                List<String> list = new ArrayList<String>();
                String[] arr = value.split(" ");
                for (String s : arr) {
                    list.add(s);
                }
                return list.iterator();
            }
        });

        // map each word to a (word, 1) tuple
        JavaPairDStream<String, Integer> pairDS = wordsDS.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // aggregate counts by key
        JavaPairDStream<String, Integer> countDS = pairDS.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        // print the results
        countDS.print();

        streamingContext.start();

        streamingContext.awaitTermination();
    }
}
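Since enable.auto.commit is set to false above, this job never commits offsets anywhere by default. The kafka-0-10 integration exposes each batch's offset ranges through HasOffsetRanges, and they can be committed back to Kafka with CanCommitOffsets. A sketch of that pattern, assuming it is placed right after createDirectStream (before any transformations) and that OffsetRange, HasOffsetRanges, and CanCommitOffsets are imported from org.apache.spark.streaming.kafka010:

        stream.foreachRDD(rdd -> {
            // which topic/partition/offset range this micro-batch covers
            OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            for (OffsetRange r : ranges) {
                System.out.println(r.topic() + " partition " + r.partition()
                        + " offsets [" + r.fromOffset() + ", " + r.untilOffset() + ")");
            }
            // commit the offsets back to Kafka once the batch's work is done
            ((CanCommitOffsets) stream.inputDStream()).commitAsync(ranges);
        });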

The above is the Java version.


That concludes this overview of Kafka + Spark Streaming integration. Thank you for reading; for more on the Flume 1.7 + Kafka + Streaming integration steps, Spark Streaming with Kafka in Java 8 (Spark 2.0 & Kafka 0.10), integrating Spark Streaming with Kafka for stream processing in Java, or the Kafka + Spark Streaming study notes, please search this site.
