GVKun编程网logo

Kibana 5.2 中文文档 | 片刻 ApacheCN(apache中文网)(片刻网怎么了)

13

本文的目的是介绍Kibana5.2中文文档|片刻ApacheCN(apache中文网)的详细情况,特别关注片刻网怎么了的相关信息。我们将通过专业的研究、有关数据的分析等多种方式,为您呈现一个全面的了解

本文的目的是介绍Kibana 5.2 中文文档 | 片刻 ApacheCN(apache中文网)的详细情况,特别关注片刻网怎么了的相关信息。我们将通过专业的研究、有关数据的分析等多种方式,为您呈现一个全面的了解Kibana 5.2 中文文档 | 片刻 ApacheCN(apache中文网)的机会,同时也不会遗漏关于004.hive 命令的 3 种调用方式 | ApacheCN (apache 中文网)、005.hive: str_to_map使用案例 | ApacheCN(apache中文网)、2016 杭州·云栖大会 PDF 下载 | ApacheCN(apache中文网)、Apache Spark 2.2.0 中文文档 - GraphX Programming Guide | ApacheCN的知识。

本文目录一览:

Kibana 5.2 中文文档 | 片刻 ApacheCN(apache中文网)(片刻网怎么了)

Kibana 5.2 中文文档 | 片刻 ApacheCN(apache中文网)(片刻网怎么了)

 

 

  • ApacheCN | apache中文网
  • 官方文档

    • Kibana 5.2 中文文档

      • 介绍

      • 设置

        • 安装 Kibana

        • 配置 Kibana

        • Docker 上运行 Kibana

        • 访问 Kibana

        • 关联 Kibana 与 Elasticsearch

        • 与 Tribe nodes 一起使用 Kibana

        • 在生产环境中使用 Kibana

        • 升级 Kibana

      • 重大改变

        • 5.0 中的重大改变

      • 入门指南

        • 加载样本数据

        • 定义 index pattern(索引模式)

        • Discover(发现)数据

        • Visualizing(可视化)数据

        • 将它们全部放入 Dashboard(仪表盘)

        • 总结

      • Discover(发现)

        • 设置时间过滤器

        • 搜索数据

        • 字段过滤器

        • 查看文档数据

        • 查看字段数据的统计信息

      • Visualize(可视化)

        • 创建可视化

        • Area Charts(面积图)

        • Data Table(数据表)

        • Line Charts(折线图)

        • Markdown Widget(小部件)

        • Metric(度量)

        • Pie Charts(饼图)

        • Tile Maps(平铺地图)

        • Vertical Bar Charts(垂直条形图)

        • Tag Clouds(标签云)

        • Heatmap Chart(热力图)

      • Dashboard(仪表盘)

        • 构建仪表盘

        • 加载仪表板

        • 分享仪表盘

      • Timelion

        • 创建时间序列可视化

      • Console(控制台)

        • 多请求支持

        • 自动格式化

        • 键盘快捷键

        • 历史

        • 控制台设置

        • 配置控制台

      • Management(管理)

        • Index Pattern(索引模式)

        • Managing Fields(管理字段)

        • 设置高级选项

        • 管理保存的搜索,可视化和仪表盘

      • Kibana 插件

        • 安装插件

        • 更新和删除插件

        • 禁用插件

        • 配置插件管理器

        • 已知插件

      • 版本说明

        • 5.2.2 版本说明

        • 5.2.1 版本说明

        • 5.2.0 版本说明

        • 5.1.2 版本说明

        • 5.1.1 版本说明

        • 5.1.0 版本说明

        • 5.0.2 版本说明

        • 5.0.1 版本说明

        • 5.0.0 版本说明

  • 资源下载

    • 文档下载

  • 最近更新

  • 贡献者

  • 关于我们

  • 加入我们

 

004.hive 命令的 3 种调用方式 | ApacheCN (apache 中文网)

004.hive 命令的 3 种调用方式 | ApacheCN (apache 中文网)

ApacheCN | apache 中文网


hive 命令的 3 种调用方式 

官网地址:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Cli(可参考)

方式 1:hive –f  /root/shell/hive-script.sql(适合多语句)

 

hive-script.sql 类似于 script 一样,直接写查询命令就行

 

例如:

[root@cloud4 shell]# vi hive_script3.sql

select * from t1;

select count(*) from t1;

不进入交互模式,执行一个 hive script 

这里可以和静音模式 -S 联合使用 , 通过第三方程序调用,第三方程序通过 hive 的标准输出获取结果集。

 $HIVE_HOME/bin/hive -S -f /home/my/hive-script.sql (不会显示 mapreduct 的操作过程)

那么问题来了:如何传递参数呢?

demo 如下:

start_hql.sh  内容:

#!/bin/bash
# -S 打印输出 mapreduce 日志
hive \
  -hivevar id=1  \
  -hivevar col2=2  \
  -S -f test.sql

test.sql 内容:
-- 数据库
use tmp; 
-- 表名
select * 
from tmp_jzl_20140725_test11
where 
id=''${hivevar:id}'' and col2=''${hivevar:col2}'';
 

方式 2:hive -e  ''sql 语句 ''(适合短语句)

直接执行 sql 语句 

例如:
[root@cloud4 shell]# hive -e ''select * from t1''
静音模式:

[root@cloud4 shell]# hive -S -e''select * from t1''  (用法与第一种方式的静音模式一样,不会显示 mapreduce 的操作过程)
此处还有一亮点,用于导出数据到 linux 本地目录下

例如:

[root@cloud4 shell]# hive -e ''select * from t1''  > test.txt
有点类似 pig 导出分析结果一样,都挺方便的

方式 3:hive (直接使用 hive 交互式模式)

都挺方便的
介绍一种有意思的用法:
1.sql 的语法

 

#hive     启动

 

hive>quit;     退出 hive

hive> show databases;   查看数据库

hive> create database test;  创建数据库

hive> use default;    使用哪个数据库

hive>create table t1 (key string); 创建表
对于创建表我们可以选择读取文件字段按照什么字符进行分割
例如:
hive>create table t1(id ) ''/wlan'' 
partitioned by (log_date string)  表示通过 log_date 进行分区
row format delimited fields terminated by ''\t''   表示代表用‘\t’进行分割来读取字段
stored as textfile/sequencefile/rcfile/;  表示文件的存储的格式

存储格式的参考地址:http://blog.csdn.net/yfkiss/article/details/7787742
textfile 默认格式,数据不做压缩,磁盘开销大,数据解析开销大。
可结合 Gzip、Bzip2 使用(系统自动检查,执行查询时自动解压),但使用这种方式,hive 不会对数据进行切分,从而无法对数据进行并行操作。
实例:

[plain] view plaincopy

  1. > create table test1(str STRING)  

  2. > STORED AS TEXTFILE;   

  3. OK  

  4. Time taken: 0.786 seconds  

  5. #写脚本生成一个随机字符串文件,导入文件:  

  6. > LOAD DATA LOCAL INPATH ''/home/work/data/test.txt'' INTO TABLE test1;  

  7. Copying data from file:/home/work/data/test.txt  

  8. Copying file: file:/home/work/data/test.txt  

  9. Loading data to table default.test1  

  10. OK  

  11. Time taken: 0.243 seconds  

SequenceFile 是 Hadoop API 提供的一种二进制文件支持,其具有使用方便、可分割、可压缩的特点。
SequenceFile 支持三种压缩选择:NONE, RECORD, BLOCK。 Record 压缩率低,一般建议使用 BLOCK 压缩。
示例:

[plain] view plaincopy

  1. > create table test2(str STRING)  

  2. > STORED AS SEQUENCEFILE;  

  3. OK  

  4. Time taken: 5.526 seconds  

  5. hive> SET hive.exec.compress.output=true;  

  6. hive> SET io.seqfile.compression.type=BLOCK;  

  7. hive> INSERT OVERWRITE TABLE test2 SELECT * FROM test1;   

rcfile 是一种行列存储相结合的存储方式。首先,其将数据按行分块,保证同一个 record 在一个块上,避免读一个记录需要读取多个 block。其次,块数据列式存储,有利于数据压缩和快速的列存取。RCFILE 文件示例:

 

 

实例:

[plain] view plaincopy

  1. > create table test3(str STRING)  

  2. > STORED AS RCFILE;  

  3. OK  

  4. Time taken: 0.184 seconds  

  5. >  INSERT OVERWRITE TABLE test3 SELECT * FROM test1;  

 

总结:
相比 textfile 和 SequenceFile,rcfile 由于列式存储方式,数据加载时性能消耗较大,但是具有较好的压缩比和查询响应。数据仓库的特点是一次写入、多次读取,因此,整体来看,rcfile 相比其余两种格式具有较明显的优势。
 

 

hive>show tables;  查看该数据库中的所有表

hive>show tables  ‘*t*’;    // 支持模糊查询

 

hive>show partitions t1;  // 查看表有哪些分区 

hive>drop table t1 ; 删除表

hive 不支持修改表中数据,但是可以修改表结构,而不影响数据

 

有 local 的速度明显比没有 local 慢:

 

hive>load data inpath ''/root/inner_table.dat'' into table t1;   移动 hdfs 中数据到 t1 表中

hive>load  data local inpath ''/root/inner_table.dat'' into table t1;  上传本地数据到 hdfs 中

hive> !ls;  查询当前 linux 文件夹下的文件
hive> dfs -ls /; 查询当前 hdfs 文件系统下  ''/'' 目录下的文件

希望通过共享自己的笔记,来找到一群和我一样愿意分享笔记和心得的朋友,让大家一起进步

我的 QQ:529815144,外号:小头

 

005.hive: str_to_map使用案例 | ApacheCN(apache中文网)

005.hive: str_to_map使用案例 | ApacheCN(apache中文网)

ApacheCN | apache中文网

使用说明:

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF

map<string,string>

str_to_map(text[, delimiter1, delimiter2])

Splits text into key-value pairs using two delimiters. Delimiter1 separates text into K-V pairs, and Delimiter2 splits each K-V pair. Default delimiters are '','' for delimiter1 and ''='' for delimiter2.

案例1:

hive> 

    > select str_to_map(''aaa:11&bbb:22'', ''&'', '':'')
    > from tmp.tmp_jzl_20140725_test11;
OK

{"bbb":"22","aaa":"11"}

案例2:

hive> select str_to_map(''aaa:11&bbb:22'', ''&'', '':'')[''aaa'']
    > from tmp.tmp_jzl_20140725_test11;

OK
11

2016 杭州·云栖大会 PDF 下载 | ApacheCN(apache中文网)

2016 杭州·云栖大会 PDF 下载 | ApacheCN(apache中文网)

ApacheCN | apache中文网

2016 杭州·云栖大会 PDF 下载:

http://www.apache.wiki/pages/viewpage.action?pageId=2888842

预览图片如下,请点击链接跳转过去,全面支持下载,不受限制

(草。有几个装逼仔,一直在下载,把带宽跑满了,现在大家都访问不了,被迫下线处理 )

本来想做个好事,没想到。。

有兴趣的朋友在群共享里面下载吧,sorry .. QQ 群 : 214293307(ApacheCN 技术交流装逼专用)

进入文件夹,点击下载就行

(还是腾讯的网速给你,如果大家觉得满意,点个赞,让更多的人能下载到这些资源)

 

资料下载规则说明 : 

  • Apache CN(apache中文网)(http://www.apache.wiki)仅提供资源整理分享,资源所有权归原作者所有,与本网站立场无关,如有侵犯,请联系我们立即删除。
  • 每位用户支持最多下载5份演讲资料;
  • 所有下载资料未经阿里云授权,禁止私自传播转发或用作商业用途。阿里云一经发现有权要求下线并保留进一步追责权利。

 

大会日期

10月13日

10月14日

10月15日

10月16日

源网址 : https://yunqi.aliyun.com/2016/hangzhou/download?spm=5176.7944664.539453.1.StkkrI

打包下载地址 : http://download.apache.wiki/share/

Note : 支持在线浏览

主论坛

“电子商务”将消失,新五通一平将引领未来.pdf

马云 阿里巴巴集团董事局主席

飞天进化.pdf

王坚 阿里巴巴集团技术委员会主席

计算 探索未知的价值.pdf

胡晓明 阿里云总裁

飞天操作系统,中国科技的创新力量.pdf

唐洪 阿里云首席架构师

人工智能与产业变革.pdf

周靖人 阿里云首席科学家

计算,让想象力无限.pdf

张建锋 阿里巴巴集团首席技术官

阿里云重磅产品发布.pdf

李津 阿里云资深总监

分会场

阿里云生态峰会

云合2.0发布——阿里云生态云图.pdf

阿里云研究院人工智能报告.pdf

数据市场-大数据生态的滋养地.pdf

蚂蚁金蝶动技峰会

新技术在数字普惠金融的创新应用.pdf

人工智能驱动金融生活.pdf

云服务助力蚂蚁开放生态.pdf

基于DT、生物识别技术的金融安全实践.pdf

支付宝支付专场

变革:支付宝的支付进化论.pdf

核心 线下支付技术集成关键点.pdf

实话实说:支付商业变现解决方案.pdf

域名专场

“域”见更美好的未来:域名筑梦互联网+.pdf

“域”罢不能:创新改变行业.pdf

“域”感:域名投资从入门到精通.pdf

“域”数临风:域名大数据的应用.pdf

网站安全从云解析开始.pdf

诚.xin赢天下.pdf

创业公司的域名想象.pdf

数字营销专场

U-DMP:【友盟+】数据能力开放新篇章.pdf

数据营销大变革.pdf

阿里云人工智能实战之机器学习优化下的营销策略.pdf

阿里云数字营销服务的加速器.pdf

当大数据遇到营销.pdf

品友-撮合经济-人工智能-大数据与开放生态.pdf

视频云专场

构建智能化的视频系统.pdf

玩转海量音视频云存储.pdf

云盾为视频系统筑建安全堡垒.pdf

视频服务特色解决方案助力开发者0编码整合视频能力.pdf

画质重生,来一场视觉盛宴.pdf

智能监控:乐橙,智能监控玩转直播场景化.pdf

让每个APP都拥有直播.pdf

电商专场

 阿里电商云介绍与实践.pdf

 同程带您云端翱翔.pdf

 电商端到端数据精细化运营.pdf

数字营销助力传统企业通往电商之路.pdf

 数据驱动的互联网客服解决方案.pdf

 进口品电商的上云之路.pdf

 电商安全的攻防实战拆解.PDF

智能工业专场

工业互联网发展与工业互联网联盟展望.pdf

基于SAP HANA内存计算的工业物联网云计算方案.pdf

徐工集团的工业大数据之路.pdf

从德国工业4.0看传统机械装备智能化升级.pdf

2016杭州·云栖大会 - 工业大数据与云制造.pdf

从制造走向服务:制造业服务化转型之路.pdf

教育专场

DT时代互联网+教育生态.pdf

国家开放大学“六网融通”教学模式的实践与探索.pdf

回归职业教育的本质.pdf

云计算_大数据人才培养实践.pdf

校企融合的高校云计算人才培养方案.pdf

阿里云在线教育解决方案.pdf

位置服务专场

高德Inside:位置·共享.pdf

数据共享解码真实世界.pdf

位置服务在蚂蚁金服的个性化应用.pdf

咕咚:位置服务,数据如何让运动更好玩有趣.pdf

大数据专场

阿里云数加大事记.pdf

如何打造智能零售O2O.pdf

工业云促进智慧水务发展.pdf

大数据和人工智能技术在旅游行业的应用.pdf

基于数加MaxCompute的极速基因组数据分析.pdf

基于数加平台的城市停车公有云解决方案.pdf

基于阿里云与数加平台的全渠道电商中台搭建.pdf

基于数加的大数据仓库解决方案.pdf

基于数加的B2B客户营销解决方案.pdf

基于数加平台的云征信应用.pdf

云通信专场

通信的过去现在未来.pdf

智能语音交互开启服务智能化.pdf

南京昆石_构建简单安全_应用丰富的云通信平台.pdf

天天快递:直达客户,贴身服务.pdf

亿级场景化的实时音视频通讯.pdf

国际专场:创新&安全&大数据

 Cloud Industry Ecosystem of Korea.pdf

 数据驱动创新——印尼最大电商平台.pdf

 阿里云安全产品重磅发布&演讲.pdf

云栖社区开发者技术峰会——开源技术专场

阿里开源发展与生态.pdf

Open Source Docker.pdf

Weex项目&阿里移动技术开源方向.pdf

How to create a successful open source project.pdf

中国人的数据库分支:ApsaraDB AliSQL开源思路.pdf

企业互联网转型峰会

企业互联网转型中大数据建构.pdf

中石化互联网转型思考与实践.pdf

英特尔专场

英特尔支持的创客运动&VR第四维度触觉设计.pdf

英特尔实感技术让您的创新如虎添翼.pdf

昊翔台风H Realsense.pdf

FPGA 在数据中心的应用.pdf

Cloud Storage Innovations.pdf

在中国实现24小时精准医疗.pdf

基于 Apache Spark的大规模分布式机器学习实践.pdf

下一代服务机器人的应用和创新.pdf

机器人的小脑SLAMWARE.pdf

云安全专场

云上的世界为什么更安全.pdf

基于云等保的安全责任分担模型及解读.pdf

你的数据你做主.pdf

女性移动app的安全攻防战.pdf

Check Point云安全视角.pdf

让云和安全更简单.pdf

千寻峰会

触摸时空的力量.pdf

驾驭时空:中国首辆自动驾驶低速电动车发布.pdf

基因计算专场

进化:构建基因产业的基础设施.pdf

服务:数据驱动的基因组分析与解读.pdf

抗击:大数据分析助力遗传病肿瘤精准医疗.pdf

计算:精准医疗的变革力量和创新阵地.pdf

阿里云生态合作伙伴专场

时代巨变,初心不改 小微企服正当时.pdf

一个传统集成商的转型尝试.pdf

能源云.企业的下一站.pdf

共建懂政企的云服务生态.pdf

云生态折叠 空间转换的企业级混合云数据中心.pdf

投资即服务——立于云端 把握风口.pdf

DeepStream:GPU加速海量视频数据智能处理.pdf

口碑O2O生态服务专场

本地生活服务电商市场的精准营销.pdf

线下商业+互联网逻辑下的快消行业变革.pdf

DT时代大型企业智能转型专场

DT开启企业智能转型.pdf

DT 新能源——新能源时代的大数据应用.pdf

基于大数据的企业智能采购平台.pdf

智能语音客服——大型企业的客服智能化探.pdf

跑在“云端”的智能工业服务平台.pdf

工业品大数据智能电商——中国化工电商平台.pdf

游戏云引擎专场

云时代游戏的无限精彩.pdf

云引擎+——一键部署 动态调配.pdf

跨界三端 助力H5腾飞.pdf

如何高效的制作主机次时代游戏.pdf

大数据让游戏厂商精准投放有理有据.pdf

商用虚拟现实内容的突破口.pdf

人工智能专场

人工智能启示录.pdf

配送智能化之路.pdf

海洋大数据及其在海洋渔业中的应用.pdf

基于物流透明的大数据应用.pdf

智能语音交互:大众身边的AI.pdf

智变:人工智能革新客服行业的实践.pdf

视觉大数据智能计算实践——从实验室到真实世界.pdf

飞天服务专场专

专 新 智 惠.pdf

共赢——阿里云一站式企业管理服务.pdf

企业上云——面向P2P金融、物流行业的上云实战.pdf

数据库上云经典案例分析.pdf

EasyDB for Oracle——基于阿里云的Oracle最佳实践.pdf

云时代的安全服务.pdf

电商平台优化纪实.pdf

B+时代专场

B+时代:DT时代B2B技术演进.pdf

云中行走,垂直电商助推器——云电商平台介绍.pdf

B2B开放平台——构建企业供应链网络.pdf

阿里巴巴企业诚信体系——从大数据到场景应用.pdf

大数据+算法”助力B2B未来商业.pdf

全球化下语言服务平台生态体系下的智能服务.pdf

B+时代,移动办公的创新与应用.pdf

智慧物流专场

菜鸟网络系统设计及优化.pdf

描绘未来——物流云2.0.pdf

物流平台的实时优化算法.pdf

大规模云上调度实践.pdf

跨境多地域云仓储整合方案.pdf

容器技术专场(上午)

阿里云容器服务重要发布.pdf

Docker技术趋势解读.pdf

蚂蚁金服在Docker网络技术领域的探索和实践.pdf

淘宝移动技术实践&开放专场 (上午)

后APP时代-淘宝移动中台技术开源开放探索.pdf

淘宝直播的电商互动之路.pdf

移动网络体验升级—手淘海量移动网络服务的探索.pdf

手淘Native容器化框架发展和思考.pdf 

手机淘宝 H5 和 Weex 容器的构建实践.pdf

蚂蚁开放平台技术专场

蚂蚁开放技术路线及行业实践.pdf

开放技术保障的潘多拉魔盒.pdf

蚂蚁聚宝Android秒级编译——Freeline.pdf

蚂蚁聚宝虫洞项目技术方案介绍.pdf

余额宝超大规模基金文件处理.pdf

深信服专场

云计算等保标准解读.pdf

如何保障云上业务的安全.pdf

云平台及租户业务安全性研究思考.pdf

政务云安全建设实践.pdf

安踏安全建设探索之路.pdf

云数据中心建设实践.pdf

千里之外,洞悉风险.pdf

构建甲方视角的威胁情报体系.pdf

气象专场

中国天气风险管理的现在与未来.pdf

机器能预测多久之后的天气?.pdf

智慧出行的商业气象服务.pdf

区块链专场

区块链应用的法律视角和发展趋势.PDF

法大大存证邮——产品服务介绍.PDF

区块链下的金融新生态.PDF

区块链电子存证——打造 中国LawTech创新格局.PDF

法链背后的秘密—— onchain 分布式账本框架.PDF

游戏专场

UC国际如何构建国际共赢生态-帮助开发者出海.pdf

资本下的移动出海.pdf

手游出海的机遇与挑战.pdf

开源数据库之Greenplum专场

Greenplum开源的这一年.pdf

结合Greenplum实现混合BI分析.pdf

Greenplum深度优化技巧.pdf

让数据从PostgreSQL流动到Greenplum.pdf

MaxCompute 2.0专场

MaxCompute 2.0 overview.pdf

MaxCompute SQL 2.0:运算引擎及其实现原理.pdf

非结构化数据在MaxCompute上的处理.pdf

城市大脑在MaxCompute上的海量视频分析.pdf

PAI分布式机器学习平台.pdf

MaxCompute大数据生态集成和开发工具.pdf

MaxCompute Going forward.pdf

开源数据库之Redis专场

阿里云Redis技术架构简介及后续规划.pdf

高德经典数据库实践案例分享.pdf

新浪微博的Redis定制化之路.pdf

Redis在唯品会的应用实践.pdf

Redis集群演化的心路历程.pdf

聚安全专场

APP加固的新方向——全量混淆和瘦身.pdf

互联网企业通用业务安全防控体系建设.pdf

浅谈新形势下的网络安全创业.pdf

基于深度学习的内容安全检测与管控.pdf

YunOS IoT峰会

 主题演讲:连接万物·无处不在.pdf 

芯发现·新开始:云上芯片服务平台发布.pdf

你的专注·我的坚持:YunOS for Phone 新品发布01.pdf

你的专注·我的坚持:YunOS for Phone 新品发布02.pdf

创造·改变:新品类合作发布.pdf

这是现在·亦是未来:互联生态硬件重磅发布01.pdf

这是现在·亦是未来:互联生态硬件重磅发布02.pdf

探索·未知:人工智能产品重磅发布.pdf

YunOS Developer 专场

APICloud:服务CAF开发者.pdf

案例:从Web到Cloud App.pdf

案例:打造超越用户期待的服务体验.pdf

案例:掌阅+YunOS=悦读+.pdf

案例:如何打造千万流水的游戏.pdf

案例:厂商如何拥抱移动互联网运营.pdf

容器技术专场(下午)

Docker@Alibaba.pdf

构建基于docker的基因数据分析应用生态系统.pdf

NetViz_ 重新定义对微服务的网络分析.pdf

学霸君的微服务探索——基于阿里云容器的架构设计.pdf

阿里金融云Fintech峰会

金融云时代:计算 信任 连接 .pdf

对话:大数据机器人在金融智能客服的应用.pdf

金融机构互联网转型的平台支持.pdf

YunOS Intelligence专场

协同智能使不可能成为可能.pdf

The Sixth Sense of Self-Driving Cars.pdf

YunOS让机器感知和理解世界.pdf

时空感知 下一代YunOS位置服务.pdf

YunOS让机器跟人进行口语交互.pdf

From Passive to Proactive Deep Push on YunOS.pdf

法律之光专场

企业大数据如何助力司法提效.pdf

大数据风控在互联网和金融领域的探索和实践.pdf

人工智能开启法律服务新篇章.pdf

融合性司法行政新体系.pdf

YunOS@Home专场

阿里智能赋能智能家电.pdf

阿里智能助力惠而浦 打造智能新生活.pdf

开放互联,阿里智能万物互联解决方案.pdf

你好,Pal——阿里智能打造全新语音交互解决方案.pdf

统一标准,共建生态——智能物联标准白皮书发布.pdf

YunOS on Chip专场

“芯”之所向:展讯布局YunOS芯片基线.pdf

云市场专场

云市场,汇聚云计算生态核能.pdf

淘宝移动技术实践&开放专场(下午)

云时代下的性能优化&运维实践之路.pdf

从趋势、工具或相关的技术点谈APP的优化.pdf

阿里百川移动电商业务开放体系.pdf

APP自媒体商业化演进之路.pdf

助力移动App从0到N— 解读阿里百川移动开放平台.pdf

开源数据库之MongoDB专场

阿里云MongoDB.pdf

MongoDB多数据中心的方案选型之路.pdf

MongoDB疑难杂症的分析和优化.pdf

Terark—重新定义数据技术.pdf

MongoDB高级设计模式:数据即服务.pdf

开源数据库之Mysql专场

MariaDB ColumnStore.pdf

The State of the Art of MySQL Replication.pdf

如何让MySQL保持高效的N个好习惯.pdf

AliSQL行业解决方案和上云案例.pdf

AliSQL内核定制方案.pdf

大型企业管理创新专场

利用新技术助力企业商业重构与IT转型.pdf

国际专场:阿里云助力中国客户出海

如何走向MENA(中东北非).pdf

小影海外探索之路.pdf

基于云的实时音视频通信系统.pdf

Video++ 直播+专场

直播对视频直播产业的破局解构.pdf

用视频创新模式触摸科技测评的边界.pdf

直播场景的新运用.pdf

开发者技术峰会-架构

我看分布式系统发展和阿里实践.pdf

微博混合云,极端流量下的峰值应对与架构挑战.pdf

千万级用户直播App服务端架构设计和思考.pdf

阿里智能助理在电商领域的架构构建与实践.pdf

智能硬件专场

聊聊涂鸦、云、硬件和平台那些事.pdf

智能硬件的底层服务.pdf

智能硬件“出海”之路.pdf

2016智能家居生态市场分析.pdf

企业服务专场

互联网时代,中大型企业云端文件管理最佳实践.pdf

服务中大型企业:基于PaaS的一体化HR SaaS软件.pdf

通过SaaS CRM实现中大型企业销售全流程管理.pdf

GrowingIO 用数据驱动增长的秘密.pdf

YunOS for Work

YunOS for Work.pdf

YunOS 产品方案.pdf

YunOS开发生态介绍.pdf

有效的互联网+教育.pdf

让学习有效发生.pdf

医疗云专场

阿里云在医疗的这一年.pdf

互联网+时代大型公立医院分级诊疗探索与实践.pdf

人工智能破局点:医疗数据的结构化之路.pdf

医疗云体系架构.pdf

基于影像的计算机辅助诊断在医疗的初探.pdf

云端医疗,用数据智能升级产业价值.pdf

梅奥百年医疗数据库在华实践.pdf

云效专场

 云效Docker技术实践-我们的CI与CD.pdf

五矿电商云效实践之路.pdf

挖掘效能之矿-众安保险云效实践分享.pdf

研发效能·进化-云效新品发布.pdf

How To Make Your Organization Agile.pdf

SaaS增长专场

增长和留存的秘密.pdf

如何由传统增长走向SaaS增长.pdf

SaaS产品的客户成功之道.pdf

SaaS企业的三种成长个性.pdf

开源无线和前端专场

企业级 Node 基础框架——EGG.pdf

Weex开源经验谈.pdf

南方航空Macaca技术实践.pdf

Hilo——电商互动游戏引擎.pdf

基础设施专场

软硬结合的云硬件优化实践.pdf

阿里绿色数据中心之路.pdf

复杂网络故障智能处理.pdf

阿里基础设施智能化实践.pdf

企业SaaS专场

中国SaaS市场现状和最值得关注的创投趋势.pdf

数据智能驱动的企业增长之道.pdf

如何从市场中获取高值客户.pdf

中国需要什么样的协作文档.pdf

够快云库——文件协作从这里开始.pdf

电子签约如何驱动企业高效运营.pdf

数空科技专场

五叶草大数据私董会 五叶草大数据指数.pdf

云开雾散,大数据行业精准发力案例分享.pdf

大数据是水,逐水而居.pdf

阿里云大数据技术开发.pdf

大数据可视化与自然之美.pdf

数据市场 大数据生态的滋养地.pdf

阿里云大数据职业认证培训.pdf

Serverless专场

阿里云Serverless的解决方案 .pdf

函数计算-事件驱动的serverless计算平台.pdf

使用API网关快速开放Serverless服务.pdf

基于阿里云容器服务实现Serverless服务.pdf

Serverless日志处理挑战与方案.pdf

消息服务在Serverless中的应用.pdf

基于数加平台的大数据Serverless 实践.pdf

阿里汽车专场

阿里车主数据的探索和实践.pdf

开源大数据技术专场(下午)

Deep Dive into Catalyst.pdf

Apache HDFS之最新进展.pdf

梨视频基于E-MapReduce大数据推荐系统.pdf

分布式流处理框架功能对比及性能评估.pdf

Hadoop存储与计算分离实践.pdf

第一财经专场

消费升级-母婴商家达人的新机遇.pdf

借势大促数据驱动运营.pdf

数据电商.pdf

数据化精准营销.pdf

开源大数据技术专场(上午)

Hadoop的过去现在和未来.pdf

阿里巴巴Spark实践与探索.pdf

阿里巴巴HBase的一些实践与探索.pdf

阿里巴巴实时计算平台_JStorm_Turbo.pdf

蚂蚁对云数据库的实践分享专场

OceanBase 1.0:云服务的新一代通用关系型数据库.pdf

OceanBase 1.0:分布式技术架构.pdf

OceanBase 1.0:蚂蚁业务架构及运维实践.pdf

开放医疗与健康专场

高血压云服务生态的健康梦.pdf

创客专场

科技之巅,云栖之路.pdf

strikingly的创业启示.pdf

开源数据库之 PostgreSQL专场

PostgreSQL 9.6最新特性及未来展望.pdf

PostgreSQL助力 小微企业管理系统变革.pdf

PostgreSQL 物联网六脉神剑.pdf

金融级PostgreSQL数据库全方位监控及优化.pdf

从Uber切换Potgres说起.pdf

 

Apache Spark 2.2.0 中文文档 - GraphX Programming Guide | ApacheCN

Apache Spark 2.2.0 中文文档 - GraphX Programming Guide | ApacheCN

GraphX Programming Guide

  • 概述
  • 入门
  • 属性 Graph
    • 示例属性 Graph
  • Graph 运算符
    • 运算符的汇总表
    • Property 运算符
    • Structural 运算符
    • Join 运算符
    • 邻域聚合
      • 聚合消息 (aggregateMessages)
      • Map Reduce Triplets Transition Guide (Legacy)
      • 计算级别信息
      • 收集相邻点
    • Caching and Uncaching
  • Pregel API
  • Graph 建造者
  • Vertex and Edge RDDs
    • VertexRDDs
    • EdgeRDDs
  • 优化表示
  • Graph 算法
    • PageRank
    • 连接组件
    • Triangle 计数
  • 示例

GraphX

概述

GraphX 是 Spark 中用于图形和图形并行计算的新组件。在高层次上, GraphX 通过引入一个新的图形抽象来扩展 Spark RDD :一种具有附加到每个顶点和边缘的属性的定向多重图形。为了支持图形计算,GraphX 公开了一组基本运算符(例如: subgraph ,joinVertices 和 aggregateMessages)以及 Pregel API 的优化变体。此外,GraphX 还包括越来越多的图形算法 和 构建器,以简化图形分析任务。

入门

首先需要将 Spark 和 GraphX 导入到项目中,如下所示:

import org.apache.spark._
import org.apache.spark.graphx._
// To make some of the examples work we will also need RDD
import org.apache.spark.rdd.RDD

如果您不使用 Spark 外壳,您还需要一个 SparkContext。要了解有关 Spark 入门的更多信息,请参考 Spark 快速入门指南。

 

属性 Graph

属性 Graph 是一个定向多重图形,用户定义的对象附加到每个顶点和边缘。定向多图是具有共享相同源和目标顶点的潜在多个平行边缘的有向图。支持平行边缘的能力简化了在相同顶点之间可以有多个关系(例如: 同事和朋友)的建模场景。每个顶点都由唯一的 64 位长标识符( VertexId )键入。 GraphX 不对顶点标识符施加任何排序约束。类似地,边缘具有对应的源和目标顶点标识符。

属性图是通过 vertex (VD) 和 edge (ED) 类型进行参数化的。这些是分别与每个顶点和边缘相关联的对象的类型。

当它们是原始数据类型(例如: int ,double 等等)时,GraphX 优化顶点和边缘类型的表示,通过将其存储在专门的数组中来减少内存占用。

在某些情况下,可能希望在同一个图形中具有不同属性类型的顶点。这可以通过继承来实现。例如,将用户和产品建模为二分图,我们可能会执行以下操作:

class VertexProperty()
case class UserProperty(val name: String) extends VertexProperty
case class ProductProperty(val name: String, val price: Double) extends VertexProperty
// The graph might then have the type:
var graph: Graph[VertexProperty, String] = null

像 RDD 一样,属性图是不可变的,分布式的和容错的。通过生成具有所需更改的新图形来完成对图表的值或结构的更改。请注意,原始图形的大部分(即,未受影响的结构,属性和索引)在新图表中重复使用,可降低此内在功能数据结构的成本。使用一系列顶点分割启发式方法,在执行器之间划分图形。与 RDD 一样,在发生故障的情况下,可以在不同的机器上重新创建图形的每个分区。

逻辑上,属性图对应于一对编码每个顶点和边缘的属性的类型集合 (RDD)。因此,图类包含访问图形顶点和边的成员:

class Graph[VD, ED] {
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
}

VertexRDD[VD] 和 EdgeRDD[ED] 分别扩展了 RDD[(VertexId, VD)] 和 RDD[Edge[ED]] 的优化版本。 VertexRDD[VD] 和 EdgeRDD[ED] 都提供了围绕图形计算和利用内部优化的附加功能。 我们在顶点和边缘 RDD 部分更详细地讨论了 VertexRDD 和 EdgeRDD API,但现在它们可以被认为是 RDD[(VertexId, VD)] 和 RDD[Edge[ED]] 的简单 RDD。

示例属性 Graph

假设我们要构建一个由 GraphX 项目中的各种协作者组成的属性图。顶点属性可能包含用户名和职业。我们可以用描述协作者之间关系的字符串来注释边:

The Property Graph

生成的图形将具有类型签名:

val userGraph: Graph[(String, String), String]

从原始文件, RDD 甚至合成生成器构建属性图有许多方法,这些在图形构建器的一节中有更详细的讨论 。最普遍的方法是使用 Graph 对象。例如,以下代码从 RDD 集合中构建一个图:

// Assume the SparkContext has already been constructed
val sc: SparkContext
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)

在上面的例子中,我们使用了 Edge case 类。边缘具有 srcId 和 dstId 对应于源和目标顶点标识符。此外, Edge 该类有一个 attr 存储边缘属性的成员。

我们可以分别使用 graph.vertices 和 graph.edges 成员将图形解构成相应的顶点和边缘视图。

val graph: Graph[(String, String), String] // Constructed from above
// Count all users which are postdocs
graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count
// Count all the edges where src > dst
graph.edges.filter(e => e.srcId > e.dstId).count

注意, graph.vertices 返回一个 VertexRDD[(String, String)] 扩展 RDD[(VertexId, (String, String))] ,所以我们使用 scala case 表达式来解构元组。另一方面, graph.edges 返回一个 EdgeRDD 包含 Edge[String] 对象。我们也可以使用 case 类型构造函数,如下所示:

graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count

除了属性图的顶点和边缘视图之外, GraphX 还暴露了三元组视图。三元组视图逻辑上连接顶点和边缘属性,生成 RDD[EdgeTriplet[VD, ED]] 包含 EdgeTriplet 该类的实例。此 连接可以用以下 SQL 表达式表示:

SELECT src.id, dst.id, src.attr, e.attr, dst.attr
FROM edges AS e LEFT JOIN vertices AS src, vertices AS dst
ON e.srcId = src.Id AND e.dstId = dst.Id

或图形为:

Edge Triplet

EdgeTriplet 类通过分别添加包含源和目标属性的 srcAttr 和 dstAttr 成员来扩展 Edge 类。 我们可以使用图形的三元组视图来渲染描述用户之间关系的字符串集合。

val graph: Graph[(String, String), String] // Constructed from above
// Use the triplets view to create an RDD of facts.
val facts: RDD[String] =
  graph.triplets.map(triplet =>
    triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1)
facts.collect.foreach(println(_))

Graph 运算符

正如 RDDs 有这样的基本操作 map, filter, 以及 reduceByKey,性能图表也有采取用户定义的函数基本运算符的集合,产生具有转化特性和结构的新图。定义了优化实现的核心运算符,并定义了 Graph 表示为核心运算符组合的方便运算符 GraphOps 。不过,由于 Scala 的含义,操作员 GraphOps 可自动作为成员使用 Graph 。例如,我们可以通过以下方法计算每个顶点的入度(定义 GraphOps ):

val graph: Graph[(String, String), String]
// Use the implicit GraphOps.inDegrees operator
val inDegrees: VertexRDD[Int] = graph.inDegrees

区分核心图形操作的原因 GraphOps 是能够在将来支持不同的图形表示。每个图形表示必须提供核心操作的实现,并重用许多有用的操作 GraphOps

运算符的汇总表

以下是两个定义的功能的简要摘要,但为简单起见 Graph, GraphOps 它作为 Graph 的成员呈现。请注意,已经简化了一些功能签名(例如,删除了默认参数和类型约束),并且已经删除了一些更高级的功能,因此请参阅 API 文档以获取正式的操作列表。

/** Summary of the functionality in the property graph */
class Graph[VD, ED] {
  // Information about the Graph ===================================================================
  val numEdges: Long
  val numVertices: Long
  val inDegrees: VertexRDD[Int]
  val outDegrees: VertexRDD[Int]
  val degrees: VertexRDD[Int]
  // Views of the graph as collections =============================================================
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
  val triplets: RDD[EdgeTriplet[VD, ED]]
  // Functions for caching graphs ==================================================================
  def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
  def cache(): Graph[VD, ED]
  def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
  // Change the partitioning heuristic  ============================================================
  def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
  // Transform vertex and edge attributes ==========================================================
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
    : Graph[VD, ED2]
  // Modify the graph structure ====================================================================
  def reverse: Graph[VD, ED]
  def subgraph(
      epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
      vpred: (VertexId, VD) => Boolean = ((v, d) => true))
    : Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
  // Join RDDs with the graph ======================================================================
  def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
  def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])
      (mapFunc: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
  // Aggregate information about adjacent triplets =================================================
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[A]
  // Iterative graph-parallel computation ==========================================================
  def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId,A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED]
  // Basic graph algorithms ========================================================================
  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
  def connectedComponents(): Graph[VertexId, ED]
  def triangleCount(): Graph[Int, ED]
  def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
}

Property 运算符

与 RDD map 运算符一样,属性图包含以下内容:

class Graph[VD, ED] {
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}

这些运算符中的每一个产生一个新的图形,其中顶点或边缘属性被用户定义的 map 函数修改。

请注意,在每种情况下,图形结构都不受影响。这是这些运算符的一个关键特征,它允许生成的图形重用原始图形的结构索引。以下代码段在逻辑上是等效的,但是第一个代码片段不保留结构索引,并且不会从 GraphX 系统优化中受益:

val newVertices = graph.vertices.map { case (id, attr) => (id, mapUdf(id, attr)) }
val newGraph = Graph(newVertices, graph.edges)

而是 mapVertices 用来保存索引:

val newGraph = graph.mapVertices((id, attr) => mapUdf(id, attr))

这些运算符通常用于初始化特定计算或项目的图形以避免不必要的属性。例如,给出一个以度为顶点属性的图(我们稍后将描述如何构建这样一个图),我们为 PageRank 初始化它:

// Given a graph where the vertex property is the out degree
val inputGraph: Graph[Int, String] =
  graph.outerJoinVertices(graph.outDegrees)((vid, _, degOpt) => degOpt.getOrElse(0))
// Construct a graph where each edge contains the weight
// and each vertex is the initial PageRank
val outputGraph: Graph[Double, Double] =
  inputGraph.mapTriplets(triplet => 1.0 / triplet.srcAttr).mapVertices((id, _) => 1.0)

 

Structural 运算符

目前 GraphX 只支持一套简单的常用结构运算符,我们预计将来会增加更多。以下是基本结构运算符的列表。

class Graph[VD, ED] {
  def reverse: Graph[VD, ED]
  def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
               vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
}

该 reverse 运算符将返回逆转的所有边缘方向上的新图。这在例如尝试计算逆 PageRank 时是有用的。由于反向操作不会修改顶点或边缘属性或更改边缘数量,因此可以在没有数据移动或重复的情况下高效地实现。

在 subgraph 操作者需要的顶点和边缘的谓词,并返回包含只有满足谓词顶点的顶点的曲线图(评估为真),并且满足谓词边缘边缘并连接满足顶点谓词顶点。所述 subgraph 操作员可在情况编号被用来限制图形以顶点和感兴趣的边缘或消除断开的链接。例如,在以下代码中,我们删除了断开的链接:

// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
                       (4L, ("peter", "student"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
                       Edge(4L, 0L, "student"),   Edge(5L, 0L, "colleague")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
// Notice that there is a user 0 (for which we have no information) connected to users
// 4 (peter) and 5 (franklin).
graph.triplets.map(
  triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println(_))
// Remove missing vertices as well as the edges to connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// The valid subgraph will disconnect users 4 and 5 by removing user 0
validGraph.vertices.collect.foreach(println(_))
validGraph.triplets.map(
  triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println(_))

注意在上面的例子中只提供了顶点谓词。 如果未提供顶点或边缘谓词,则 subgraph 运算符默认为 true

在 mask 操作者通过返回包含该顶点和边,它们也在输入图形中发现的曲线构造一个子图。这可以与 subgraph 运算符一起使用, 以便根据另一个相关图中的属性限制图形。例如,我们可以使用缺少顶点的图运行连接的组件,然后将答案限制为有效的子图。

// Run Connected Components
val ccGraph = graph.connectedComponents() // No longer contains missing field
// Remove missing vertices as well as the edges to connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// Restrict the answer to the valid subgraph
val validCCGraph = ccGraph.mask(validGraph)

groupEdges 操作符将多边形中的平行边(即,顶点对之间的重复边)合并。 在许多数值应用中,可以将平行边缘(它们的权重组合)合并成单个边缘,从而减小图形的大小。

 

Join 运算符

在许多情况下,有必要使用图形连接来自外部收集( RDD )的数据。例如,我们可能有额外的用户属性,我们要与现有的图形合并,或者我们可能希望将顶点属性从一个图形拉到另一个。这些任务可以使用 join 运算符完成。下面我们列出关键 join 运算符:

class Graph[VD, ED] {
  def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
    : Graph[VD, ED]
  def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
}

joinVertices 操作符将顶点与输入 RDD 相连,并返回一个新的图形,其中通过将用户定义的 map 函数应用于已连接顶点的结果而获得的顶点属性。 RDD 中没有匹配值的顶点保留其原始值。

请注意,如果 RDD 包含给定顶点的多个值,则只能使用一个值。因此,建议使用以下命令使输入 RDD 变得独一无二,这也将对结果值进行 pre-index ,以显着加速后续连接。

val nonUniqueCosts: RDD[(VertexId, Double)]
val uniqueCosts: VertexRDD[Double] =
  graph.vertices.aggregateUsingIndex(nonUnique, (a,b) => a + b)
val joinedGraph = graph.joinVertices(uniqueCosts)(
  (id, oldCost, extraCost) => oldCost + extraCost)

除了将用户定义的 map 函数应用于所有顶点并且可以更改顶点属性类型之外,更一般的 outerJoinVertices 的行为类似于 joinVertices 。 因为不是所有的顶点都可能在输入 RDD 中具有匹配的值,所以 map 函数采用 Option 类型。 例如,我们可以通过使用 outDegree 初始化顶点属性来为 PageRank 设置一个图。

val outDegrees: VertexRDD[Int] = graph.outDegrees
val degreeGraph = graph.outerJoinVertices(outDegrees) { (id, oldAttr, outDegOpt) =>
  outDegOpt match {
    case Some(outDeg) => outDeg
    case None => 0 // No outDegree means zero outDegree
  }
}

您可能已经注意到上述示例中使用的多个参数列表(例如: f(a)(b) curried 函数模式。 虽然我们可以将 f(a)(b) 同样地写成 f(a,b) ,这意味着 b 上的类型推断不依赖于 a 。 因此,用户需要为用户定义的函数提供类型注释:

val joinedGraph = graph.joinVertices(uniqueCosts,
  (id: VertexId, oldCost: Double, extraCost: Double) => oldCost + extraCost)

 

邻域聚合

许多图形分析任务的关键步骤是聚合关于每个顶点邻域的信息。 例如,我们可能想知道每个用户拥有的关注者数量或每个用户的追随者的平均年龄。 许多迭代图表算法(例如:网页级别,最短路径,以及连接成分)相邻顶点(例如:电流值的 PageRank ,最短到源路径,和最小可达顶点 ID )的重复聚合性质。

为了提高性能,主聚合操作员 graph.mapReduceTriplets 从新的更改 graph.AggregateMessages 。虽然 API 的变化相对较小,但我们在下面提供了一个转换指南。

 

聚合消息 (aggregateMessages)

GraphX 中的核心聚合操作是 aggregateMessages 。该运算符将用户定义的 sendMsg 函数应用于图中的每个边缘三元组,然后使用该 mergeMsg 函数在其目标顶点聚合这些消息。

class Graph[VD, ED] {
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[Msg]
}

用户定义的 sendMsg 函数接受一个 EdgeContext ,它将源和目标属性以及 edge 属性和函数 (sendToSrc, 和 sendToDst) 一起发送到源和目标属性。 在 map-reduce 中,将 sendMsg 作为 map 函数。 用户定义的 mergeMsg 函数需要两个发往同一顶点的消息,并产生一条消息。 想想 mergeMsg 是 map-reduce 中的 reduce 函数。 aggregateMessages 运算符返回一个 VertexRDD[Msg] ,其中包含去往每个顶点的聚合消息(Msg 类型)。 没有收到消息的顶点不包括在返回的 VertexRDDVertexRDD 中。

另外,aggregateMessages 采用一个可选的 tripletsFields ,它们指示在 EdgeContext 中访问哪些数据(即源顶点属性,而不是目标顶点属性)。tripletsFields 定义的可能选项, TripletFields 默认值是 TripletFields.All 指示用户定义的 sendMsg 函数可以访问的任何字段 EdgeContext 。该 tripletFields 参数可用于通知 GraphX ,只有部分 EdgeContext 需要允许 GraphX 选择优化的连接策略。例如,如果我们计算每个用户的追随者的平均年龄,我们只需要源字段,因此我们将用于 TripletFields.Src 表示我们只需要源字段。

在早期版本的 GraphX 中,我们使用字节码检测来推断, TripletFields 但是我们发现字节码检查稍微不可靠,而是选择了更明确的用户控制。

在下面的例子中,我们使用 aggregateMessages 运算符来计算每个用户的资深追踪者的平均年龄。

import org.apache.spark.graphx.{Graph, VertexRDD}
import org.apache.spark.graphx.util.GraphGenerators

// Create a graph with "age" as the vertex property.
// Here we use a random graph for simplicity.
val graph: Graph[Double, Int] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
// Compute the number of older followers and their total age
val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
  triplet => { // Map Function
    if (triplet.srcAttr > triplet.dstAttr) {
      // Send message to destination vertex containing counter and age
      triplet.sendToDst(1, triplet.srcAttr)
    }
  },
  // Add counter and age
  (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
)
// Divide total age by number of older followers to get average age of older followers
val avgAgeOfOlderFollowers: VertexRDD[Double] =
  olderFollowers.mapValues( (id, value) =>
    value match { case (count, totalAge) => totalAge / count } )
// Display the results
avgAgeOfOlderFollowers.collect.foreach(println(_))

Find full example code at "examples/src/main/scala/org/apache/spark/examples/graphx/AggregateMessagesExample.scala" in the Spark repo.

aggregateMessages 当消息(和消息的总和)是恒定大小(例如:浮动和加法而不是列表和级联)时,该操作最佳地执行。

 

Map Reduce Triplets Transition Guide (Legacy)

在早期版本的 GraphX 中,邻域聚合是使用 mapReduceTriplets 运算符完成的 :

class Graph[VD, ED] {
  def mapReduceTriplets[Msg](
      map: EdgeTriplet[VD, ED] => Iterator[(VertexId, Msg)],
      reduce: (Msg, Msg) => Msg)
    : VertexRDD[Msg]
}

mapReduceTriplets 操作符接受用户定义的映射函数,该函数应用于每个三元组,并且可以使用用户定义的缩减函数来生成聚合的消息。 然而,我们发现返回的迭代器的用户是昂贵的,并且它阻止了我们应用其他优化(例如:局部顶点重新编号)的能力。 在 aggregateMessages 中,我们引入了 EdgeContext ,它暴露了三元组字段,并且还显示了向源和目标顶点发送消息的功能。 此外,我们删除了字节码检查,而是要求用户指出三元组中实际需要哪些字段。

以下代码块使用 mapReduceTriplets :

val graph: Graph[Int, Float] = ...
def msgFun(triplet: Triplet[Int, Float]): Iterator[(Int, String)] = {
  Iterator((triplet.dstId, "Hi"))
}
def reduceFun(a: String, b: String): String = a + " " + b
val result = graph.mapReduceTriplets[String](msgFun, reduceFun)

可以使用 aggregateMessages :å

val graph: Graph[Int, Float] = ...
def msgFun(triplet: EdgeContext[Int, Float, String]) {
  triplet.sendToDst("Hi")
}
def reduceFun(a: String, b: String): String = a + " " + b
val result = graph.aggregateMessages[String](msgFun, reduceFun)

计算级别信息

常见的聚合任务是计算每个顶点的程度:与每个顶点相邻的边数。在有向图的上下文中,通常需要知道每个顶点的度数,外部程度和总程度。本 GraphOps 类包含运营商计算度数每个顶点的集合。例如,在下面我们将计算最大值,最大和最大级别:

// Define a reduce operation to compute the highest degree vertex
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
  if (a._2 > b._2) a else b
}
// Compute the max degrees
val maxInDegree: (VertexId, Int)  = graph.inDegrees.reduce(max)
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
val maxDegrees: (VertexId, Int)   = graph.degrees.reduce(max)

收集相邻点

在某些情况下,通过在每个顶点处收集相邻顶点及其属性可以更容易地表达计算。这可以使用 collectNeighborIds 和 collectNeighbors 运算符轻松实现 。

class GraphOps[VD, ED] {
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexId, VD)] ]
}

这些操作可能相当昂贵,因为它们重复信息并需要大量通信。 如果可能,请直接使用 aggregateMessages 操作来表达相同的计算。

Caching and Uncaching

在 Spark 中,默认情况下,RDD 不会保留在内存中。为了避免重新计算,在多次使用它们时,必须明确缓存它们(参见 Spark Programming Guide)。GraphX 中的图形表现方式相同。当多次使用图表时,请务必先调用 Graph.cache()

在迭代计算中,uncaching 也可能是最佳性能所必需的。默认情况下,缓存的 RDD 和图形将保留在内存中,直到内存压力迫使它们以 LRU 顺序逐出。对于迭代计算,来自先前迭代的中间结果将填满缓存。虽然它们最终被驱逐出来,但存储在内存中的不必要的数据会减慢垃圾收集速度。一旦不再需要中间结果,就会更有效率。这涉及每次迭代实现(缓存和强制)图形或 RDD ,取消所有其他数据集,并且仅在将来的迭代中使用实例化数据集。然而,由于图形由多个 RDD 组成,所以很难将它们正确地分开。对于迭代计算,我们建议使用 Pregel API,它可以正确地解析中间结果。

 

Pregel API

图形是固有的递归数据结构,因为顶点的属性取决于其邻居的属性,而邻居的属性又依赖于其邻居的属性。因此,许多重要的图算法迭代地重新计算每个顶点的属性,直到达到一个固定点条件。已经提出了一系列图并行抽象来表达这些迭代算法。GraphX 公开了 Pregel API 的变体。

在高层次上,GraphX 中的 Pregel 运算符是限制到图形拓扑的批量同步并行消息抽象。 Pregel 操作符在一系列超级步骤中执行,其中顶点接收来自先前超级步骤的入站消息的总和,计算顶点属性的新值,然后在下一个超级步骤中将消息发送到相邻顶点。与 Pregel 不同,消息作为边缘三元组的函数并行计算,消息计算可以访问源和目标顶点属性。在超级步骤中跳过不接收消息的顶点。 Pregel 运算符终止迭代,并在没有剩余的消息时返回最终的图。

注意,与更多的标准 Pregel 实现不同,GraphX 中的顶点只能将消息发送到相邻顶点,并且使用用户定义的消息传递功能并行完成消息构造。这些约束允许在 GraphX 中进行额外优化。

以下是 Pregel 运算符 的类型签名以及 其实现的草图(注意:为了避免由于长谱系链引起的 stackOverflowError , pregel 支持周期性检查点图和消息,将 “spark.graphx.pregel.checkpointInterval” 设置为正数,说 10。并使用 SparkContext.setCheckpointDir (directory: String)) 设置 checkpoint 目录):

class GraphOps[VD, ED] {
  def pregel[A]
      (initialMsg: A,
       maxIter: Int = Int.MaxValue,
       activeDir: EdgeDirection = EdgeDirection.Out)
      (vprog: (VertexId, VD, A) => VD,
       sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
       mergeMsg: (A, A) => A)
    : Graph[VD, ED] = {
    // Receive the initial message at each vertex
    var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()

    // compute the messages
    var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
    var activeMessages = messages.count()
    // Loop until no messages remain or maxIterations is achieved
    var i = 0
    while (activeMessages > 0 && i < maxIterations) {
      // Receive the messages and update the vertices.
      g = g.joinVertices(messages)(vprog).cache()
      val oldMessages = messages
      // Send new messages, skipping edges where neither side received a message. We must cache
      // messages so it can be materialized on the next line, allowing us to uncache the previous
      // iteration.
      messages = GraphXUtils.mapReduceTriplets(
        g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
      activeMessages = messages.count()
      i += 1
    }
    g
  }
}

请注意,Pregel 需要两个参数列表(即:graph.pregel(list1)(list2)。第一个参数列表包含配置参数,包括初始消息,最大迭代次数以及发送消息的边缘方向(默认情况下为边缘)。第二个参数列表包含用于接收消息(顶点程序 vprog),计算消息( sendMsg )和组合消息的用户定义函数 mergeMsg

在以下示例中,我们可以使用 Pregel 运算符来表达单源最短路径的计算。

import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.graphx.util.GraphGenerators

// A graph with edge attributes containing distances
val graph: Graph[Long, Double] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
val sourceId: VertexId = 42 // The ultimate source
// Initialize the graph such that all vertices except the root have distance infinity.
val initialGraph = graph.mapVertices((id, _) =>
    if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
  triplet => {  // Send Message
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    }
  },
  (a, b) => math.min(a, b) // Merge Message
)
println(sssp.vertices.collect.mkString("\n"))

Find full example code at "examples/src/main/scala/org/apache/spark/examples/graphx/SSSPExample.scala" in the Spark repo.

 

Graph 建造者

GraphX 提供了从 RDD 或磁盘上的顶点和边的集合构建图形的几种方法。默认情况下,图形构建器都不会重新分配图形边;相反,边缘保留在其默认分区(例如 HDFS 中的原始块)中。Graph.groupEdges 需要重新分区图,因为它假定相同的边将被共同定位在同一分区上,因此您必须在调用 Graph.partitionBy 之前调用 groupEdges

object GraphLoader {
  def edgeListFile(
      sc: SparkContext,
      path: String,
      canonicalOrientation: Boolean = false,
      minEdgePartitions: Int = 1)
    : Graph[Int, Int]
}

GraphLoader.edgeListFile 提供了从磁盘边缘列表中加载图形的方法。它解析以下形式的(源顶点 ID ,目标顶点 ID )对的邻接列表,跳过以下开始的注释行 #

# This is a comment
2 1
4 1
1 2

它 Graph 从指定的边缘创建一个,自动创建边缘提到的任何顶点。所有顶点和边缘属性默认为 1. canonicalOrientation 参数允许在正方向 (srcId < dstId) 重新定向边,这是连接的组件算法所要求的。该 minEdgePartitions 参数指定要生成的边缘分区的最小数量;如果例如 HDFS 文件具有更多块,则可能存在比指定更多的边缘分区。

object Graph {
  def apply[VD, ED](
      vertices: RDD[(VertexId, VD)],
      edges: RDD[Edge[ED]],
      defaultVertexAttr: VD = null)
    : Graph[VD, ED]

  def fromEdges[VD, ED](
      edges: RDD[Edge[ED]],
      defaultValue: VD): Graph[VD, ED]

  def fromEdgeTuples[VD](
      rawEdges: RDD[(VertexId, VertexId)],
      defaultValue: VD,
      uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int]

}

Graph.apply 允许从顶点和边缘的 RDD 创建图形。重复的顶点被任意挑选,并且边缘 RDD 中找到的顶点,而不是顶点 RDD 被分配了默认属性。

Graph.fromEdges 允许仅从 RDD 的边缘创建图形,自动创建边缘提到的任何顶点并将其分配给默认值。

Graph.fromEdgeTuples 允许仅从边缘元组的 RDD 创建图形,将边缘分配为值 1,并自动创建边缘提到的任何顶点并将其分配给默认值。 它还支持重复数据删除边缘;重复数据删除,将某些 PartitionStrategy 作为 uniqueEdges 参数传递(例如:uniqueEdges = Some(PartitionStrategy.RandomVertexCut))。 分区策略是必须的,以便在相同的分区上共同使用相同的边,以便可以进行重复数据删除。

 

Vertex and Edge RDDs

GraphX 公开 RDD 了图中存储的顶点和边的视图。然而,由于 GraphX 在优化的数据结构中维护顶点和边,并且这些数据结构提供了附加功能,所以顶点和边分别作为 VertexRDD 和 EdgeRDD 返回 。在本节中,我们将回顾一些这些类型中的其他有用功能。请注意,这只是一个不完整的列表,请参阅 API 文档中的正式操作列表。

VertexRDDs

该 VertexRDD[A] 扩展 RDD[(VertexId, A)] 并增加了额外的限制,每个 VertexId 只发生一次。此外, VertexRDD[A] 表示一组顶点,每个顶点的属性类型 A。在内部,这是通过将顶点属性存储在可重用的散列图数据结构中来实现的。因此,如果两个 VertexRDD 派生自相同的基础 VertexRDD(例如:filter 或 mapValues),则可以在不使用散列评估的情况下连续连接。为了利用这个索引的数据结构,VertexRDD 公开了以下附加功能:

class VertexRDD[VD] extends RDD[(VertexId, VD)] {
  // Filter the vertex set but preserves the internal index
  def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD]
  // Transform the values without changing the ids (preserves the internal index)
  def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
  def mapValues[VD2](map: (VertexId, VD) => VD2): VertexRDD[VD2]
  // Show only vertices unique to this set based on their VertexId''s
  def minus(other: RDD[(VertexId, VD)])
  // Remove vertices from this set that appear in the other set
  def diff(other: VertexRDD[VD]): VertexRDD[VD]
  // Join operators that take advantage of the internal indexing to accelerate joins (substantially)
  def leftJoin[VD2, VD3](other: RDD[(VertexId, VD2)])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
  def innerJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
  // Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.
  def aggregateUsingIndex[VD2](other: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
}

请注意,例如,filter 运算符 如何返回 VertexRDD。过滤器实际上是通过 BitSet 使用索引重新实现的,并保留与其他 VertexRDD 进行快速连接的能力。同样,mapValues 运算符不允许 map 功能改变, VertexId 从而使相同的 HashMap 数据结构能够被重用。无论是 leftJoin 和 innerJoin 能够连接两个时识别 VertexRDD 来自同一来源的小号 HashMap 和落实线性扫描,而不是昂贵的点查找的加入。

aggregateUsingIndex 运算符对于从 RDD[(VertexId, A)] 有效构建新的 VertexRDD 非常有用。 在概念上,如果我在一组顶点上构造了一个 VertexRDD[B],这是一些 RDD[(VertexId, A)] 中的顶点的超集,那么我可以重用索引来聚合然后再索引 RDD[(VertexId, A)]。 例如:

val setA: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 100L).map(id => (id, 1)))
val rddB: RDD[(VertexId, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0)))
// There should be 200 entries in rddB
rddB.count
val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _)
// There should be 100 entries in setB
setB.count
// Joining A and B should now be fast!
val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)

EdgeRDDs

该 EdgeRDD[ED] 扩展 RDD[Edge[ED]] 使用其中定义的各种分区策略之一来组织块中的边 PartitionStrategy。在每个分区中,边缘属性和邻接结构分别存储,可以在更改属性值时进行最大限度的重用。

EdgeRDDEdgeRDD 公开的三个附加功能是:

// Transform the edge attributes while preserving the structure
def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2]
// Reverse the edges reusing both attributes and structure
def reverse: EdgeRDD[ED]
// Join two `EdgeRDD`s partitioned using the same partitioning strategy.
def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]

在大多数应用中,我们发现在 EdgeRDDEdgeRDD 上的操作是通过图形运算符完成的,或者依赖基 RDD 类中定义的操作。

优化表示

虽然在分布式图形的 GraphX 表示中使用的优化的详细描述超出了本指南的范围,但一些高级理解可能有助于可扩展算法的设计以及 API 的最佳使用。GraphX 采用顶点切分方式进行分布式图分割:

Edge Cut vs. Vertex Cut

GraphX 不是沿着边沿分割图形,而是沿着顶点分割图形,这可以减少通信和存储开销。在逻辑上,这对应于将边缘分配给机器并允许顶点跨越多台机器。分配边缘的确切方法取决于 PartitionStrategy 各种启发式的几种折衷。用户可以通过与 Graph.partitionBy 运算符重新分区图来选择不同的策略。默认分区策略是使用图形构建中提供的边的初始分区。然而,用户可以轻松切换到 GraphX 中包含的 2D 划分或其他启发式算法。

RDD Graph Representation

一旦边缘被划分,高效的图形并行计算的关键挑战就是有效地将顶点属性与边缘连接起来。因为真实世界的图形通常具有比顶点更多的边缘,所以我们将顶点属性移动到边缘。因为不是所有的分区都将包含邻近的所有顶点的边缘,我们内部维护标识在哪里执行所需的连接像操作时,广播顶点的路由表 triplets 和 aggregateMessages

 

Graph 算法

GraphX 包括一组简化分析任务的图算法。该算法被包含在 org.apache.spark.graphx.lib 包可直接作为方法来访问 Graph 通过 GraphOps 。本节介绍算法及其使用方法。

 

PageRank

PageRank 测量在图中每个顶点的重要性,假设从边缘 u 到 v 表示的认可 v 通过的重要性 u 。例如,如果 Twitter 用户遵循许多其他用户,则用户将被高度排名。

GraphX 附带了 PageRank 的静态和动态实现方法作 PageRank 对象上的方法。静态 PageRank 运行固定次数的迭代,而动态 PageRank 运行直到排列收敛(即,停止改变超过指定的公差)。GraphOps 允许直接调用这些算法作为方法 Graph 。

GraphX 还包括一个可以运行 PageRank 的社交网络数据集示例。给出了一组用户 data/graphx/users.txt ,并给出了一组用户之间的关系 data/graphx/followers.txt 。我们计算每个用户的 PageRank 如下:

import org.apache.spark.graphx.GraphLoader

// Load the edges as a graph
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Run PageRank
val ranks = graph.pageRank(0.0001).vertices
// Join the ranks with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
  val fields = line.split(",")
  (fields(0).toLong, fields(1))
}
val ranksByUsername = users.join(ranks).map {
  case (id, (username, rank)) => (username, rank)
}
// Print the result
println(ranksByUsername.collect().mkString("\n"))

Find full example code at "examples/src/main/scala/org/apache/spark/examples/graphx/PageRankExample.scala" in the Spark repo.

连接组件

连接的组件算法将图中每个连接的组件与其最低编号顶点的 ID 进行标记。例如,在社交网络中,连接的组件可以近似群集。GraphX 包含 ConnectedComponents object 中算法的实现,我们从 PageRank 部分 计算示例社交网络数据集的连接组件如下:

import org.apache.spark.graphx.GraphLoader

// Load the graph as in the PageRank example
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Find the connected components
val cc = graph.connectedComponents().vertices
// Join the connected components with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
  val fields = line.split(",")
  (fields(0).toLong, fields(1))
}
val ccByUsername = users.join(cc).map {
  case (id, (username, cc)) => (username, cc)
}
// Print the result
println(ccByUsername.collect().mkString("\n"))

Find full example code at "examples/src/main/scala/org/apache/spark/examples/graphx/ConnectedComponentsExample.scala" in the Spark repo.

Triangle 计数

顶点是三角形的一部分,当它有两个相邻的顶点之间有一个边。GraphX 在 TriangleCount 对象 中实现一个三角计数算法,用于确定通过每个顶点的三角形数量,提供聚类度量。我们从 PageRank 部分 计算社交网络数据集的三角形数。需要注意的是 TriangleCount 边缘要处于规范方向 (srcId < dstId),而图形要使用 Graph.partitionBy

import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}

// Load the edges in canonical order and partition the graph for triangle count
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt", true)
  .partitionBy(PartitionStrategy.RandomVertexCut)
// Find the triangle count for each vertex
val triCounts = graph.triangleCount().vertices
// Join the triangle counts with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
  val fields = line.split(",")
  (fields(0).toLong, fields(1))
}
val triCountByUsername = users.join(triCounts).map { case (id, (username, tc)) =>
  (username, tc)
}
// Print the result
println(triCountByUsername.collect().mkString("\n"))

Find full example code at "examples/src/main/scala/org/apache/spark/examples/graphx/TriangleCountingExample.scala" in the Spark repo.

示例

假设我想从一些文本文件中构建图形,将图形限制为重要的关系和用户,在 sub-graph 上运行 page-rank ,然后返回与顶级用户关联的属性。我可以用 GraphX 在几行内完成所有这些:

import org.apache.spark.graphx.GraphLoader

// Load my user data and parse into tuples of user id and attribute list
val users = (sc.textFile("data/graphx/users.txt")
  .map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))

// Parse the edge data which is already in userId -> userId format
val followerGraph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")

// Attach the user attributes
val graph = followerGraph.outerJoinVertices(users) {
  case (uid, deg, Some(attrList)) => attrList
  // Some users may not have attributes so we set them as empty
  case (uid, deg, None) => Array.empty[String]
}

// Restrict the graph to users with usernames and names
val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2)

// Compute the PageRank
val pagerankGraph = subgraph.pageRank(0.001)

// Get the attributes of the top pagerank users
val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) {
  case (uid, attrList, Some(pr)) => (pr, attrList.toList)
  case (uid, attrList, None) => (0.0, attrList.toList)
}

println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))

Find full example code at "examples/src/main/scala/org/apache/spark/examples/graphx/ComprehensiveExample.scala" in the Spark repo.

 

我们一直在努力

 

apachecn/spark-doc-zh

   云计算之嫣然伊笑

 

原文地址: http://spark.apachecn.org/docs/cn/2.2.0/graphx-programming-guide.html
网页地址: http://spark.apachecn.org/
github: https://github.com/apachecn/spark-doc-zh(觉得不错麻烦给个 Star,谢谢!~)

今天关于Kibana 5.2 中文文档 | 片刻 ApacheCN(apache中文网)片刻网怎么了的讲解已经结束,谢谢您的阅读,如果想了解更多关于004.hive 命令的 3 种调用方式 | ApacheCN (apache 中文网)、005.hive: str_to_map使用案例 | ApacheCN(apache中文网)、2016 杭州·云栖大会 PDF 下载 | ApacheCN(apache中文网)、Apache Spark 2.2.0 中文文档 - GraphX Programming Guide | ApacheCN的相关知识,请在本站搜索。

本文标签: