如果您对Bilibili毛剑:Go业务基础库之Error和b站毛剑个人简历感兴趣,那么这篇文章一定是您不可错过的。我们将详细讲解Bilibili毛剑:Go业务基础库之Error的各种细节,并对b站毛剑
如果您对Bilibili 毛剑:Go 业务基础库之 Error和b站毛剑个人简历感兴趣,那么这篇文章一定是您不可错过的。我们将详细讲解Bilibili 毛剑:Go 业务基础库之 Error的各种细节,并对b站毛剑个人简历进行深入的分析,此外还有关于Android开发仿bilibili刷新按钮的实现代码、Apache Flink 在 bilibili 的多元化探索与实践、Apache Flink在 bilibili 的多元化探索与实践、bilibili 实时平台的架构与实践的实用技巧。
本文目录一览:- Bilibili 毛剑:Go 业务基础库之 Error(b站毛剑个人简历)
- Android开发仿bilibili刷新按钮的实现代码
- Apache Flink 在 bilibili 的多元化探索与实践
- Apache Flink在 bilibili 的多元化探索与实践
- bilibili 实时平台的架构与实践
Bilibili 毛剑:Go 业务基础库之 Error(b站毛剑个人简历)
前言
在 4 月 27 日举办的 Gopher China 2019 中,国内 Go 语言专家,Bilibili 架构师毛剑进行了题为《 Go 业务基础库之 Error & Context 》的演讲,主要探讨两个问题:
1. 在业务的基础库中,经常需要针对异常进行处理;
这次分享针对业务逻辑的异常处理,异常日志记录,异常信息关联,
业务错误码,以及基于 Go,error 的特点如何来使用解决这类问题;
2. 在 Go 引入 context 以后,我们如何改造自己的基础库。
利用 context 上下文解决元数据传递,超时传递,
在启动新的 goroutine 时候,如何保证上下文传递到位。
本文是他演讲的第一部分 ——Error 篇,以下为演讲实录。
毛剑:我之前讲过的一些 Topic 偏整体架构,比如微服务。这次讲的会比较详细一点。因为在整个微服务体系下,很多框架理念大家熟悉了,我也讲很多次了,所以想尝试一下讲比较细节的点,比如说 Go 里面很细节的点,或者在我们做业务的一些基础库或者公共库有一些什么需要注意的。我在听国外讲师分享的时候,发现他们非常非常细节的,比如说可能就讲讲 Go Test。
这次我大概会分两块 Go 里面两个重点,一个是 Error,一个是 Context。
No.1
Error
Error 有几个点是让我们日常开发觉得比较麻烦的。首先是错误检查和打印,其次是业务的错误码设计。我们经常在代码里面到处都是 return error 这个非常麻烦,每个地方都要处理。这个有利有弊。我自己理解的是每一行代码或者每一个函数调用结束以后,应该对它负责,所以尽早处理掉函数的错误也好,异常也好。
对于一个程序来说,这种错误我们应该进行保护,比如说你的程序因为遇到一些问题让你业务退出了,这个对于业务是有影响的。对于一个函数我们要尽快处理,对于一个程序我们要做保护编程。
首先业务开发会分层,Dao 或者 Service,然后开发同学在各个层都打日志,一个业务上线以后,有可能打几十个错误日志,这些日志散落在系统里面,你要查的时候要把整个上下文关联起来看非常麻烦。因为时间顺序是错乱的,非常难找。
其次即便看到日志以后还要猜,到底是哪一个地方代码出的问题,如果偏底层抛出来的错误会非常麻烦定位。还有根因的丢失,有一些错误如果要包装,要附带一些消息,原始的 error 就不见了。我们可能需要基于原始的 error 做等值的判断,就会比较麻烦。
另外业务里面 API 肯定有错误码,100、200,返回 - 001 都有可能,客户端同学要给予这些错误做逻辑调整。API 里面的错误 HINT 分两种,一种是面向终端用户,他可能想看到的是更友好的一些错误提示,而不是偏程序的消息展示。另外一种包含一些附带逻辑处理的数据(比如失败后的 Retry 策略等)。
针对这几个问题,我聊聊自己怎么考虑和解决的。
Handle Erreo-错误检查和打印
1. 追加上下文
首先上下文 error 堆栈不方便找,或者不方便定位,我当时找到一个库,找到 pkg/errors 这个库,这是 Dave 开发的,就是把上下文记录下来存到一个地方,之后就能够还原原始的 error,以及整个 error 堆栈,非常方便。
有一些错误,很典型的例子是一个文件打开报错了,或者是读他报错,
我想把具体的文件名一块儿带过去,如果报一个 readfailed 我根本不知道什么是东西报错了,如果把文件名信息或者原始 error 带过去。pkg/errors 具体实现相对来说比较简单,我们看一下这张图:
比如说 WithStack,这里就是原始 error 传递,,把这个 error 记录起来,内部使用一个 withStack 结构体,把堆栈信息存到 error 字段,这样返回另外一个 error 抛出去,上层一层一层传递就有信息了。
2. 根因追踪
第二个处理根因。这里提一个术语叫 sentinel errors ,这个是什么东西呢?就是定义了一个包级别的变量。比如 io.EOF= errros.New ("eof"),基础库这样代码不少。我们编程过程中不是非常推荐使用这种方式来,因为定义非常多包级别的错误,会导致包 API 面积会变大,这些在 Dave 文章也提到了。类似 IO.EOF,或者 Syscall.ENOENT 都是属于这种方式。但是我们通过 withStack 包装以后,是跟之前的 error 就不一样了,我们通过 Cause 的方法可以拿到原始 error,实现非常简单:
首先定义了一个内部私有的 Interface 的方法。具体怎么返回呢?当找到第一个不是实现这个接口的 Error 就返回。如果有人故意实现了这个 Cause 就返回它,如果没有就一直找到第一个我就退出。这个非常好理解。
Best Pratice-错误检查和打印
通过两种方式,两年前我们内部的基础库为后续完整记录调用栈,进行全仓支持 pkg/errors 的使用。
有一些原则:
首先在业务基础库,以前使用标准库的 errors,现在使用 pkg/errors 的 New/Errorf 可以返回,这个时候把当前的堆栈上下文已经保留了。
第二如果我调用的是我自己业务基础库里面的来自其他库的一个返回,我就不再二次处理直接透传,直接往上抛。比如说调了 bpackage 的方法,他返回一个 error,这个时候直接往上抛,我不进行包装了 (WithStack)。因为第一个人进行了 WithStack 或者 Errorf/New 包调了以后,已经把堆栈保存了,没有必要保存第二次,所以来自同包内的方法返回我就退出,因为可能被处理过。
第三当我们和 Go 的标准库或者第三方库交互的时候,我们需要 WithStack 把错误记录下来,我就知道是第三方库某个地方报了错。这里有一个小问题,如果第三方库也包了 error 异常的时候,其实会比较麻烦。
第四指当把这个错误抛给调用者的时候,我们以前的做法是每一个地方每一个层打日志,大家自己看一下实际业务很多人喜欢每个地方报一个错,记一条日志,记参数,每一个地方都有。而我们 HTTP 框架,在这个框架日志统一打印,这个相对来说是比较好的。我们看一下在 HTTP 网关代码我们会默认把这个 Log 的 Middleware 注入进去。在顶端打日志,不要每个地方打。这里面提稍微小一点,对于标准库返回的,像 sql.ErrNoRows 这种,建议不包的,如果包了就是破坏了以前业务代码,会导致判断不成立,因为我们不可能要求所有业务都用 Cause 方法还原根因再判断,这个对以前的有破坏。所以我们内部有一些小约定,对于特别特殊的 Error 我们不包的。对于其他错误,比某个网络的一些错误,觉得不需要处理就包装,再往上面抛。
我们再看一下实际做业务过程当中,比如做 HTTP 网关,我们有很多业务逻辑要并行调很多的服务。因为面向用户 API,用户 API 面向的是用户场景,一个用户场景有很多很多各种各样的数据源组成。这些数据源一定涉及多个服务和多个 RPC 组成。我们发现 Go 的 sub 标准库里面有一个非常好的 errorgroup 的包,可以很方便用它做并行的调用,在代码结尾可以调 Wait 方法可以获得第一个异常。
比如说上述四个请求,可以获取第一个产生异常。如果你想在上述四个请求中,想忽略这些错误做一些降级,可以把 Error 覆盖 nil 返回。
这个库在用的过程中,我们还是要做一些加工。我们当时做了什么处理呢?我们先看一下原来的一个白色的图就是 WithContext、Go、Wait 的用法。
我发现几个问题:第一经常有同学用错返回的 Context,我们看一下这个 WithContext 会返回 Context,我发现有同学把 Context 不小心把后面业务代码里面继续传递,经常有同学说线上总收到 Context Cancel 这种问题会出现。
第二扇出请求没有控制。我发现有一些同学写的代码,本来应该是聚合的发的请求,被写成 for 循环,并行发出很多次请求。比如说只是一个 QPS,对内放大是一百一千,这种情况下导致瞬时的 Goroutine 长了很多,内存也长了很多,这是不好的,我们不希望这种行为过多产生,所以我们提供另外一个功能就是控制扇出大小的能力。
第三写代码有各种各样的原因 panic,尤其业务是容易 panic 的。panic 一定要做保护。如果你是直接使用 errgroup 库,很有可能你没有做保护导致最后退出。所以我们做了改动:
第一加了 GOMAXPROCS,可以限制这个 errorgroup 最多并行开多少个 Goroutine;
第二我们不再返回 Context,我觉得大部分情况没有人会用,所以我们不用返回他,我们直接返回使用 Group。我们对他进行了整改,避免在我们业务开发过程中避免犯的一些错误。
我再提一下,坚持第一时间和现场处理 error。在程序部署以后程序尽可能恢复异常,避免程序终止。
Handle Erreo-业务错误处理
我们上面主要讲了怎么处理异常,这个面向偏业务内部的一些基础库的做法。实际在业务逻辑里面他的一些业务错误码这种东西怎么处理呢?
我们知道 error 实现了 Error 的方法,可以改造它。这种改造方法叫 ErrorType,可以实现接口可以自己定一个类型,所以我们也是一样的思路,实现了 Error 方法的 ErrorType,我们内部命名为 Codes (错误码的意思)。
第一个是 Error 接口,第二是获取到底报什么错误 Code。第三是 Message 这个是指面向开发者和程序员的错误,就是请求参数错误之类的。还有就是 Details,你有一些业务,比如说我被限流了,限流以后可能要返回,多久以后再重试,重试几次,这样的 Data 我们叫 Detail。
同时在 Codes 包里提供几个包级别的方法,第一就是 Cause,用于还原底层 error 转换成 Codes,方便业务使用。
第二个以前代码里面很多同学会和特定的 error 进行 Equal 操作(结构体判断),判断是不是这个错误码,这个非常不安全,因为有可能实现了 Cause 的结构体可能是两种不同东西,最终判断两个东西是不是相等,是用 Code 的 int(具体错误值)。所以我们高级别提供了 Equal 的方法,判断两个 Error 到底错误码是不是相等的,这是业务过程中我们想这些解决问题。
通过定义这样的 errorinterface,我们抽象两种模型,一种是固定模型的错误码,例如用户没有找到,或者什么东西没有权限,这是比较固定的跟逻辑相关的错误码。这种我们一般约定变量名,然后返回统一错误的 Message,这个 Message 可以在云端下发,这个 11001 是什么样的注释,可能文本从云端下发,这是固定模型的错误类型。
还有一种是自定义错误码类型。我们叫 Status 就是状态。为什么叫 Status 也是有原因,最早想把 Status 跟 HTTP 状态码一一适配,包括跟 gRPC 的状态一一适配,所以这样取名叫 Status。我们看到有一个 repeated 字段,这个用于上面我们提到的错误详情,通过定义这个 PB 以后,生成 HTTP 状态码或者其他状态码都可以。
最终我们怎么做的呢?
有一些定义好的错误码,消息内容比如说是 % s,要方便使用者自定义。所以我们参考标准库,提供了 Error 和 Errorf,这个地方传的是 Code,因为一定带 Code 码,而你具体自定义的是 Message 是什么,或者用 format 的方式传入进去。
咱们在 gRPC 内部还要处理这种错误,我们目前的做法不在 gRPC 的 Message 定义 ErrorCode,ErrorMessage,发现非常麻烦,需要在每个 Message 强行带上这个东西,而且有嵌套的 Message,发现每个地方都会冗余很多。所以我们没有把 Code 和 Message 这个东西定义进去,而是模仿的是 gRPStatus 挂载进去,因为 gRPC 有一些 Status 已经被官方征用。比如说 01,但是有一些没有用,第八个 (Unknown) 就没有用。实际上我们把自定义的 Codes 挂载进去,在框架里面解出来,报的具体是什么业务 Code,我们通过这样一个方式传递。
看一下传递过程,通过定义这样一个 Code,传递给 gRPCStatus,gRPC 通过网络传给 Server,解开就知道是什么错误码。这个拿到以后实际在写业务代码中,不会直接给 ecode Interface,实际上因为实现了 Error 方法,所以可以直接返回一个普通的标准库 error 来做。
Best Pratice-业务错误处理
我们接下来讲一个经常容易讨论的一个话题,如何定义错误码。这个东西内部讨论非常久,也争议很多次。后来我们找了非常有意思的文章,这个是谷歌的一篇文章讲如何设计 API,定义了这样一个 HTTP。我为什么要提这个事,因为我发现以前我们很多同学在返回 JSON 的时候,会把错误放到 JSON 里面,但是实际上有一些错误应该用 HTTP 状态码,比如说 500,504、503,如果放到 JSON 里面,HTTP 接口永远是 200,但是对于运维来说计算 SLI 的时候指标从 JSON 里面取不方便,我们期望公共错误码尽量收敛在 HTTP 状态里面返回,这是第一个要提议。
公共状态码还要提一下,我们用户不存在或者是某个物品不存在,视频不存在,等等各种不存在,我们希望这类错误尽可能收敛到一些统一的 Code,比如说 404,不要定义很多。客户端拿到一个接口会访问多少个各种不同类型的 “404” 他很麻烦处理,会导致面向错误编程。我们一个接口给对方的时候,返回哪一些错误码一定要能够写出来。
第二我们经常会扯接口错误码要不要统一,其实这个跟行业的几个朋友讨论交流过,我们想全公司所有 API 错误码唯一很难,要去一个地方统一注册很难。所以我们期望不是那么强烈的方式定义这个错误码,用业务的命名空间。这个参考微软定义蓝屏的错误码。OX1、OX2 多少开头的。我们这样定义,正数是业务提示,负数是异常,用命名空间,比如说某一个高位的 BIT 或者低位的,我们通过这种方式区分不同的业务。
我们做微服务,会 A 调 B,B 调 C,每一层调错误的时候,到底怎么处理的?到底从 C 传给 B,B 再透传给 A,A 传给用户呢?这种方法也不太好,因为最终 A 暴露给客户端,告诉他返回哪一些错误码,你是讲不清楚,因为调了很多人。在微服务传递错误的时候,应该立即消化并转化,我明确帮助你抛给我哪一些错误我转化掉,并且抛给我上游想知道的错误,如果他对这个错误不敏感或者不想处理,我就包成公共错误码(比如内部错误),这样相对来说比较好,这样我们 API 定义的错误数量是有限的。
讲完了 Error,Error 就是两个地方,实际一个是偏业务,一个是偏基础库。
后文预告:《 Go 业务基础库之 Context 》将于明日发布
对本期嘉宾分享的 Go 业务基础库之 Error
有何感想,请评论留言
24 小时内点赞前五名的同学将获得
由比原链提供的
《Go 语言公链开发实战》一本
重磅活动预告
Gopher Meetup 广州站即将开启。来自小鹏汽车、腾讯、早安科技、PingCAP 的大咖讲师带来 Go 开发领域的一线实践经验分享,尽在 10 月 26 日,小鹏汽车总部销售展厅!
报名请戳:阅读原文
Go 中国
扫码关注
国内最具规模和生命力的 Go 开发者社区
本文分享自微信公众号 - GoCN(golangchina)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与 “OSC 源创计划”,欢迎正在阅读的你也加入,一起分享。
Android开发仿bilibili刷新按钮的实现代码
一、简述
最近跟小伙伴一起讨论了一下,决定一起仿一个BiliBili的app(包括android端和iOS端),我们并没有打算把这个项目完全做完,毕竟我们的重点是掌握一些新框架的使用,并在实战过程中发现并弥补自身的不足。
本系列将记录我(android端)在开发过程中的一些我觉得有必要记录的功能实现而已,并不是完整的从0到1的完整教程,若个别看官大爷觉得不好请出门左拐谢谢。
以下是该项目将会完成的功能。
- 视频播放功能
- 直播功能
- 弹幕功能
- 换肤功能
- …
本系列文章,将会有记录以上功能的实现但不仅仅只有这些,还会有一些其他,比如自定义控件、利用fiddler抓包等,接下来就进入本篇的主题――《仿bilibili刷新按钮的实现》。
二、实战
1、分析
先来看看原版效果:
该按钮由3部分组成,分别是圆角矩形、文字、旋转图标。在点击按钮后,开始加载数据,旋转图标发生旋转,数据加载完成后,旋转图标复位并停止旋转。话不多说,开始敲代码。
2、绘制
这里,我们要绘制的部分有3个,分别是上面提到的圆角矩形、文字、旋转图标。那么这里就为这3部分分别声明了一些属性。
要注意的一点是,这个类中有3个构造函数,因为有部分属性需要在构造函数中初始化(也为之后自定义属性做准备),所以,将第1个与第2个构造函数中的super修改为this。
public class LQRRefreshButton extends View { // 圆角矩形属性 private int borderColor = Color.parseColor("#fb7299"); private float borderWidth = 0; private float borderRadius = 120; // 文字属性 private String text = "点击换一批"; private int textColor = Color.parseColor("#fb7299"); private float textSize = 28; // 旋转图标属性 private int iconSrc = R.mipmap.tag_center_refresh_icon; private float iconSize = 28; private Bitmap iconBitmap; private float space4TextAndIcon = 20; // 画笔 private Paint mPaint = new Paint(Paint.ANTI_ALIAS_FLAG); public LQRRefreshButton(Context context) { this(context,null); } public LQRRefreshButton(Context context,@Nullable AttributeSet attrs) { this(context,attrs,0); } public LQRRefreshButton(Context context,@Nullable AttributeSet attrs,int defStyleAttr) { super(context,defStyleAttr); // 将图标资源实例化为Bitmap iconBitmap = BitmapFactory.decodeResource(getResources(),R.mipmap.tag_center_refresh_icon); } @Override protected void onDraw(Canvas canvas) { super.onDraw(canvas); // 1、画圆角矩形 // 2、画字 // 3、画刷新图标 } }
接下来着重完成onDraw()方法的实现:
@Override protected void onDraw(Canvas canvas) { super.onDraw(canvas); // 1、画圆角矩形 mPaint.setStyle(Paint.Style.stroke); mPaint.setColor(borderColor); mPaint.setstrokeWidth(borderWidth); canvas.drawRoundRect(new RectF(0,getWidth(),getHeight()),borderRadius,mPaint); // 2、画字 mPaint.setTextSize(textSize); mPaint.setColor(textColor); mPaint.setStyle(Paint.Style.FILL); float measureText = mPaint.measureText(text); float measureAndIcon = measureText + space4TextAndIcon + iconSize; float textStartX = getWidth() / 2 - measureAndIcon / 2; float textBaseY = getHeight() / 2 + (Math.abs(mPaint.ascent()) - mPaint.descent()) / 2; canvas.drawText(text,textStartX,textBaseY,mPaint); // 3、画刷新图标 float iconStartX = textStartX + measureText + space4TextAndIcon; canvas.drawBitmap(iconBitmap,iconStartX,getHeight() / 2 - iconSize / 2,mPaint); }
先来看看效果:
我给该控件设置了宽为200dp,高为100dp。
可以看到效果还不错,但还是有一点点问题的,下面就分别说说这3部分是怎么画的,及存在的小问题。
1)画圆角矩形
其实画圆角矩形很简单,设置好画笔的样式、颜色、线粗,再调用canvas的drawRoundRect()方法即可实现。
因为我们要画的圆角矩形只需要画线,所以画笔的样式便设置为Paint.Style.stroke。
canvas的drawRoundRect()方法中,第一个参数是绘制范围,这里就直接按该控件的大小来设置即可。第二、三个参数是x轴和y轴的圆角半径,第三个参数是画笔(要画东西当然需要画笔~)。
但你有没有发现,此时的 线粗为0(borderWidth=0),矩形线怎么还有?这是因为画笔的样式为Paint.Style.stroke,当线粗为0时,还要画出1px的线,因为对画笔来说,最小的线粗就是1px。所以,上面的代码需要做如下改动:
// 1、画圆角矩形 if (borderWidth > 0) { mPaint.setStyle(Paint.Style.stroke); mPaint.setColor(borderColor); mPaint.setstrokeWidth(borderWidth); canvas.drawRoundRect(new RectF(0,mPaint); }
2)画字
画字的一般步骤是设置文字大小、文字颜色、画笔样式,绘制起点。其中后2个最为重要。
画笔样式对画出的字是有影响的,当画笔样式为Paint.Style.stroke时,画出来的字是镂空的(不信你可以试试),我们需要的是实心的字,所以需要修改画笔的样式为Paint.Style.FILL。
在安卓中,文字的绘制跟其它绘制是不同的,例如,圆角矩形和旋转图标的绘制起点是左上角,而文字则是按文字左下字为起点,也就是按基线(Baseline)来绘制,故需要得到基线起点的坐标。
如上图中,现在要获得的就是文字左下角的点,这要怎么求呢?
先说x,一般需要让文字居中显示(跟文字的对齐方式也有关系,这里以默认的左对齐为例),所以计算公式一般为: x = 控件宽度/2 - 文字长度/2。但我们这个控件有点不同,它还需要考虑到旋转图标的位置问题,所以x应该这么求: x = 控件宽度/2 - (文字长度+空隙+旋转图标宽度)/2。
// 得到文字长度 float measureText = mPaint.measureText(text); // 得到 文字长度+空隙+旋转图标宽度 float measureAndIcon = measureText + space4TextAndIcon + iconSize; // 得到文字绘制起点 float textStartX = getWidth() / 2 - measureAndIcon / 2;
再说y,如图所示:
如果直接用控件的高度的一半作为文字绘制的基线,那么绘制出来的文字肯定偏上,这是因为Ascent的高度比Descent的高度要高的多,我们在计算Baseline时,需要在Ascent中减去Descent的高度得到两者高度差,再让控件中心y坐标加上(下降)这个高度差的一半。故:
float textBaseY = getHeight() / 2 + (Math.abs(mPaint.ascent()) - mPaint.descent()) / 2;
3)画刷新图标
最后就是画刷新图标了,它是以左上角为起点的,通过canvas的drawBitmap()方法进行绘制即可。
但是,有一点需要注意,iconSize是我自己定的一个大小,并不是图标的实际大小,所以在往后做旋转动画时获取到的旋转中心会有误差,将导致图标旋转时不是按中心进行旋转。所以,这里需要对图标大小进行调整:
public class LQRRefreshButton extends View { ... public LQRRefreshButton(Context context,defStyleAttr); // icon iconBitmap = BitmapFactory.decodeResource(getResources(),iconSrc); iconBitmap = zoomImg(iconBitmap,iconSize,iconSize); } public Bitmap zoomImg(Bitmap bm,float newWidth,float newHeight) { // 获得图片的宽高 int width = bm.getWidth(); int height = bm.getHeight(); // 计算缩放比例 float scaleWidth = ((float) newWidth) / width; float scaleHeight = ((float) newHeight) / height; // 取得想要缩放的matrix参数 Matrix matrix = new Matrix(); matrix.postScale(scaleWidth,scaleHeight); // 得到新的图片 Bitmap newbm = Bitmap.createBitmap(bm,width,height,matrix,true); return newbm; } ... }
3、动画
现在,要实现旋转图标的旋转功能了。原理就是在canvas绘制图标时,将canvas进行旋转,canvas旋转着绘制图标也很简单,只需要4步:
canvas.save(); canvas.rotate(degress,centerX,centerY); canvas.drawBitmap(iconBitmap,mPaint); canvas.restore();
接下来要做的,就是计算出旋转中心,旋转角度,并不停止的去调用onDraw()编制图标,可以使用ValueAnimator或ObjectAnimator实现这个功能,这里选用ObjectAnimator。实现如下:
public class LQRRefreshButton extends View { ... private float degress = 0; private ObjectAnimator mAnimator; public LQRRefreshButton(Context context,defStyleAttr); // 旋转动画 mAnimator = ObjectAnimator.ofObject(this,"degress",new FloatEvaluator(),360,0); mAnimator.setDuration(2000); mAnimator.setRepeatMode(ObjectAnimator.RESTART); mAnimator.setInterpolator(new LinearInterpolator()); mAnimator.setRepeatCount(ObjectAnimator.INFINITE); } @Override protected void onDraw(Canvas canvas) { super.onDraw(canvas); ... // 3、画刷新图标 float iconStartX = textStartX + measureText + space4TextAndIcon; canvas.save(); float centerX = iconStartX + iconSize / 2; int centerY = getHeight() / 2; canvas.rotate(degress,centerY); canvas.drawBitmap(iconBitmap,mPaint); canvas.restore(); } public void start() { mAnimator.start(); } public void stop() { mAnimator.cancel(); setDegress(0); } public float getDegress() { return degress; } public void setDegress(float degress) { this.degress = degress; invalidate(); } }
使用ObjectAnimator可以对任意属性值进行修改,所以需要在该控件中声明一个旋转角度变量(degress),并编写getter和setter方法,还需要在setter方法中调用invalidate(),这样才能在角度值发生变换时,让控件回调onDraw()进行图标的旋转绘制。ObjectAnimator的使用也不复杂,这里就不详细介绍了。来看下动画效果吧:
4、自定义属性
一个自定义控件,是不能把属性值写死在控件里的,所以我们需要自定义属性,从外界获取这些属性值。
1)属性文件编写
在attrs.xml中编写如下代码:
<?xml version="1.0" encoding="utf-8"?> <resources> <declare-styleable name="LQRRefreshButton"> <attr name="refresh_btn_borderColor" format="color"/> <attr name="refresh_btn_borderWidth" format="dimension"/> <attr name="refresh_btn_borderRadius" format="dimension"/> <attr name="refresh_btn_text" format="string"/> <attr name="refresh_btn_textColor" format="color"/> <attr name="refresh_btn_textSize" format="dimension"/> <attr name="refresh_btn_iconSrc" format="reference"/> <attr name="refresh_btn_iconSize" format="dimension"/> <attr name="refresh_btn_space4TextAndIcon" format="dimension"/> </declare-styleable> </resources>
2)属性值获取
在控件的第三个构造函数中获取这些属性值:
public class LQRRefreshButton extends View { public LQRRefreshButton(Context context,defStyleAttr); // 获取自定义属性值 TypedArray ta = context.obtainStyledAttributes(attrs,R.styleable.LQRRefreshButton); borderColor = ta.getColor(R.styleable.LQRRefreshButton_refresh_btn_borderColor,Color.parseColor("#fb7299")); borderWidth = ta.getDimension(R.styleable.LQRRefreshButton_refresh_btn_borderWidth,dipToPx(0)); borderRadius = ta.getDimension(R.styleable.LQRRefreshButton_refresh_btn_borderRadius,dipToPx(60)); text = ta.getString(R.styleable.LQRRefreshButton_refresh_btn_text); if (text == null) text = ""; textColor = ta.getColor(R.styleable.LQRRefreshButton_refresh_btn_textColor,Color.parseColor("#fb7299")); textSize = ta.getDimension(R.styleable.LQRRefreshButton_refresh_btn_textSize,spToPx(14)); iconSrc = ta.getResourceId(R.styleable.LQRRefreshButton_refresh_btn_iconSrc,R.mipmap.tag_center_refresh_icon); iconSize = ta.getDimension(R.styleable.LQRRefreshButton_refresh_btn_iconSize,dipToPx(14)); space4TextAndIcon = ta.getDimension(R.styleable.LQRRefreshButton_refresh_btn_space4TextAndIcon,dipToPx(10)); ta.recycle(); ... } }
这里有一点需要留意:
ta.getDimension(属性id,默认值)
1
2
通过TypedArray对象可以从外界到的的值会根据单位(如:dp、sp)的不同自动转换成px,但默认值的单位是一定的,为px,所以为了符合安卓规范,不要直接使用px,所以需要手动做个转换。最后还需要调用recycle()方法回收TypedArray。
3)在布局文件中应用
<com.lqr.biliblili.mvp.ui.widget.LQRRefreshButton android:id="@+id/btn_refresh" android:layout_width="118dp" android:layout_height="32dp" android:layout_gravity="center" android:layout_marginBottom="3dp" android:layout_marginTop="8dp" app:refresh_btn_borderRadius="25dp" app:refresh_btn_borderWidth="1dp" app:refresh_btn_iconSize="16dp" app:refresh_btn_text="点击换一批" app:refresh_btn_textColor="@color/bottom_text_live" app:refresh_btn_textSize="14sp"/>
总结
以上所述是小编给大家介绍的Android 仿bilibili刷新按钮的实现,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对编程小技巧网站的支持!
Apache Flink 在 bilibili 的多元化探索与实践
B 站实时的前世与今生
Flink On Yarn 的增量化管道的方案
Flink 和 AI 方向的一些工程实践
未来的发展与思考


一、B 站实时的前世与今生
1. 生态场景辐射
说起实时计算的未来,关键词就在于数据的实效性。首先从整个大数据发展的生态上,来看它的核心场景辐射:在大数据发展的初期,核心是以面向天为粒度的离线计算的场景。那时候的数据实效性多数都是以运算以天为单位,它更加注重时间和成本的平衡。
随着数据应用,数据分析以及数据仓库的普及与完善,越来越多的人对数据的实效性提出了更高的要求。比如,当需要做一些数据的实时推荐时,数据的实效将决定它的价值。在这种情况下,整个实时计算的场景就普遍诞生。
但在实际的运作过程当中,也遇到了很多场景 ,其实并没有对数据有非常高的实时性要求,在这种情况下必然会存在数据从毫秒,秒或者天的新的一些场景,实时场景数据更多是以分钟为粒度的一些增量计算的场景。对于离线计算,它更加注重成本;对实时计算,它更加注重价值实效;而对于增量计算,它更加注重去平衡成本,以及综合的价值和时间。
2. B 站的时效性
在三个维度上,B 站的划分是怎样的?对于 B 站而言 ,目前有 75% 的数据是通过离线计算来进行支撑的,另外还有 20% 的场景是通过实时计算, 5% 是通过增量计算。
对于实时计算的场景, 主要是应用在整个实时的机器学习、实时推荐、广告搜索、数据应用、实时渠道分析投放、报表、olap、监控等;
对于离线计算,数据辐射面广,主要以数仓为主;
对于增量计算,今年才启动一些新的场景,比如说 binlog 的增量 Upsert 场景。
3. ETL 时效性差
对于实效性问题 ,其实早期遇到了很多痛点 ,核心集中在三个方面:
第一,传输管道缺乏计算能力。早期的方案,数据基本都是要按天落到 ODS ,DW 层是凌晨过后的第二天去扫描前一天所有 ODS 层的数据,也就是说,整体数据没办法前置清洗;
第二,含有大量作业的资源集中爆发在凌晨之后,整个资源编排的压力就会非常大;
第三、实时和离线的 gap 是比较难满足的,因为对于大部分的数据来说,纯实时的成本过高,纯离线的实效又太差。同时,MySQL 数据的入仓时效也不太够。举个例子,好比 B 站的弹幕数据 ,它的体量非常夸张,这种业务表的同步往往需要十几个小时,而且非常的不稳定。
4. AI 实时工程复杂
除了实效性的问题 早期还遇到了 AI 实时工程比较复杂的问题:
第一,是整个特征工程计算效率的问题。同样的实时特征的计算场景, 也需要在离线的场景上进行数据的回溯,计算逻辑就会重复开发;
第二,整个实时链路比较长。一个完整的实时推荐链路, 涵盖了 N 个实时和 M 个离线的十几个作业组成,有时候遇到问题排查,整个链路的运维和管控成本都非常高;
第三、随着 AI 人员的增多,算法人员的投入,实验迭代很难横向扩展。
5. Flink 做了生态化的实践
在这些关键痛点的背景下,我们集中针对 Flink 做了生态化的实践,核心包括了整个实时数仓的应用以及整个增量化的 ETL 管道,还有面向 AI 的机器学习的一些场景。本次的分享会更加侧重增量管道以及 AI 加 Flink 的方向上。下图展示了整体的规模,目前,整个传输和计算的体量,在万亿级的消息规模有 30000+ 计算核数,1000+ job 数以及 100 多个用户。
二、Flink On Yarn 的增量化管道的方案
1. 早期的架构
先来看一下整个管道早期的架构,从下图可以看出,数据其实主要是通过 Flume 来消费 Kafka 落到 HDFS。Flume 用它的事务机制,来确保数据从 Source 到 Channel, 再到 Sink 时候的一致性,最后数据落到 HDFS 之后,下游的 Scheduler 会通过扫描目录下有没有 tmp 文件,来判断数据是否 Ready,以此来调度拉起下游的 ETL 离线作业。
2. 痛点
在早期遇到了不少痛点:
第一个比较关键的是数据质量。
最先用的是 MemoryChannel,它会存在数据的丢失,之后也试过用 FileChannel 的模式,但性能上无法达到要求。此外在 HDFS 不太稳定的情况下,Flume 的事务机制就会导致数据会 rollback 回滚到 Channel,一定程度上会导致数据不断的重复。在 HDFS 极度不稳定的情况下,最高的重复率会达到百分位的概率;
Lzo 行存储,早期的整个传输是通过分隔符的形式,这种分隔符的 Schema 是比较弱约束的,而且也不支持嵌套的格式。
第二点是整个数据的时效,无法提供分钟级的查询,因为 Flume 不像 Flink 有 Checkpoint 斩断的机制,更多是通过 idle 机制来控制文件的关闭;
第三点是下游的 ETL 联动。前文有提到,我们更多是通过扫描 tmp 目录是否 ready 的方案,这种情况下 scheduler 会大量的和 NameNode 调用 hadoop list 的 api,这样会导致 NameNode 的压力比较大。
3. 稳定性相关的痛点
在稳定性上也遇到很多问题:
第一,Flume 是不带状态的,节点异常或者是重启之后,tmp 没法正常关闭;
第二,早期没有依附大数据的环境,是物理部署的模式,资源伸缩很难去把控,成本也会相对偏高;
第三,Flume 和 HDFS 在通信上有问题。比如说当写 HDFS 出现堵塞的情况,某一个节点的堵塞会反压到 Channel,就会导致 Source 不会去 Kafka 消费数据,停止拉动 offset,一定程度上就会引发 Kafka 的 Rebalance,最后会导致全局 offset 不往前推进,从而导致数据的堆积。
4. 万亿级的增量管道 DAG 视图
在如上的痛点下,核心方案基于 Flink 构建了一套万亿级的增量管道,下图是整个运行时的 DAG 视图。
首先,在 Flink 架构下,KafkaSource 杜绝了 rebalance 的雪崩问题,即便整个 DAG 视图中有某个并发度出现数据写 HDFS 的堵塞,也不会导致全局所有 Kafka 分区的堵塞。此外的话,整个方案本质是通过 Transform 的模块来实现可扩展的节点。
第一层节点是 Parser,它主要是做数据的解压反序列化等的解析操作;
第二层是引入提供给用户的定制化 ETL 模块,它可以实现数据在管道中的定制清洗;
第三层是 Exporter 模块,它支持将数据导出到不同的存储介质。比如写到 HDFS 时,会导出成 parquet;写到 Kafka,会导出成 pb 格式。同时,在整个 DAG 的链路上引入了 ConfigBroadcast 的模块来解决管道元数据实时更新、热加载的问题。此外,在整个链路当中,每分钟会进行一次 checkpoint,针对增量的实际数据进行 Append,这样就可以提供分钟级的查询。
5. 万亿级的增量管道整体视图
Flink On Yarn 的整体架构,可以看出其实整个管道视图是划分以 BU 为单位的。每个 Kafka 的 topic,都代表了某一种数据终端的分发,Flink 作业就会专门负责各种终端类型的写入处理。视图里面还可以看到,针对 blinlog 的数据,还实现了整个管道的组装,可以由多个节点来实现管道的运作。
6. 技术亮点
接下来来看一下整个架构方案核心的一些技术亮点,前三个是实时功能层面的一些特色,后三个主要是在一些非功能性层面的一些优化。
对于数据模型来说,主要是通过 parquet,利用 Protobuf 到 parquet 的映射来实现格式收敛;
分区通知主要是因为一条管道其实是处理多条流,核心解决的是多条流数据的分区 ready 的通知机制;
CDC 管道更多是利用 binlog 和 HUDI 来实现 upsert 问题的解决;
小文件主要是在运行时通过 DAG 拓扑的方式来解决文件合并的问题;
HDFS 通信实际是在万亿级规模下的很多种关键问题的优化;
最后是分区容错的一些优化。
■ 6.1 数据模型
业务的开发主要是通过拼装字符串,来组装数据的一条条记录的上报。后期则是通过了模型的定义和管理,以及它的开发来组织的,主要是通过在平台的入口提供给用户去录制每一条流、每个表,它的 Schema ,Schema 会将它生成 Protobuf 的文件,用户可以在平台上去下载 Protobuf 对应的 HDFS 模型文件,这样,client 端的开发完全就可以通过强 Schema 方式从 pb 来进行约束。
来看一下运行时的过程,首先 Kafka 的 Source 会去消费实际上游传过来的每一条 RawEvent 的记录,RawEvent 里面会有 PBEvent 的对象,PBEvent 其实是一条条的 Protobuf 的记录。数据从 Source 流到的 Parser 模块,解析后会形成 PBEvent,PBEvent 会将用户在平台录入的整个 Schema 模型,存储在 OSS 对象系统上,Exporter 模块会动态去加载模型的变更。然后通过 pb 文件去反射生成的具体事件对象,事件对象最后就可以映射落成 parquet 的格式。这里主要做了很多缓存反射的优化,使整个 pb 的动态解析性能达到六倍的提升。最后,我们会将数据会落地到 HDFS,形成 parquet 的格式。
■ 6.2 分区通知优化
前面提到管道会处理上百条流,早期 Flume 的架构,其实每个 Flume 节点,很难去感应它自己处理的进度。同时,Flume 也没办法做到全局进度的处理。但是基于 Flink,就可以通过 Watermark 的机制来解决。
首先在 Source 会基于消息当中的 Eventime 来生成 Watermark,Watermark 会经过每一层的处理传递到 Sink,最后会通过 Commiter 模块,以单线程的方式来汇总所有 Watermark 消息的进度。当它发现全局 Watermark 已经推进到下个小时的分区的时候,它会下发一条消息到 Hive MetStore,或者是写入到 Kafka, 来通知上小时分区数据 ready,从而可以让下游的调度可以更快的通过消息驱动的方式来拉起作业的运行。
■ 6.3 CDC管道上的优化
下图右侧其实是整个 cdc 管道完整的链路。要实现 MySQL 数据到 Hive 数据的完整映射,就需要解决流和批处理的问题。
首先是通过 Datax 将 MySQL 的数据全量一次性同步到的 HDFS。紧接着通过 spark 的 job,将数据初始化成 HUDI 的初始快照,接着通过 Canal 来实现将 Mysql 的 binlog 的数据拖到的 Kafka 的 topic,然后是通过 Flink 的 Job 将初始化快照的数据结合增量的数据进行增量更新,最后形成 HUDI 表。
整个链路是要解决数据的不丢不重,重点是针对 Canal 写 Kafka 这块,开了事务的机制,保证数据落 Kafka topic 的时候,可以做到数据在传输过程当中的不丢不重。另外,数据在传输的上层其实也有可能出现数据的重复和丢失,这时候更多是通过全局唯一 id 加毫秒级的时间戳。在整个流式 Job 中,针对全局 id 来做数据的去重,针对毫秒级时间来做数据的排序,这样能保证数据能够有序的更新到的 HUDI。
紧接着通过 Trace 的系统基于 Clickhouse 来做存储,来统计各个节点数据的进出条数来做到数据的精确对比。
■ 6.4 稳定性 - 小文件的合并
前面提到,改造成 Flink 之后,我们是做了每分钟的 Checkpoint,文件数的放大非常严重。主要是在整个 DAG 当中去引入 merge 的 operater 来实现文件的合并,merge 的合并方式主要是基于并发度横向合并,一个 writer 会对应一个 merge。这样每五分钟的 Checkpoint,1 小时的 12 个文件,都会进行合并。通过种方式的话,可以将文件数极大的控制在合理的范围内。
■ 6.5 HDFS 通信
实际运作过程当中经常会遇到整个作业堆积比较严重的问题,实际分析其实主是和 HDFS 通信有很大的关系。
其实 HDFS 通讯,梳理了四个关键的步骤:初始化 state、Invoke、Snapshot 以及 Notify Checkpoint complete。
核心问题主要发生在 Invoke 阶段,Invoke 会达到文件的滚动条件,这时候会触发 flush 和 close。close 实际和 NameNode 通信的时候,会经常出现堵塞的情况。
Snapshot 阶段同样会遇到一个问题,一个管道上百条流一旦触发 Snapshot,串行执行 flush 和 close 也会非常的慢。
核心优化集中在三个方面:
第一,减少了文件的斩断,也就是 close 的频次。在 Snapshot 阶段,不会去 close 关闭文件,而更多的是通过文件续写的方式。这样,在初始化 state 的阶段,就需要做文件的 Truncate 来做 Recovery 恢复。
第二,是异步化 close 的改进,可以说是 close 的动作不会去堵塞整个总链路的处理,针对 Invoke 和 Snapshot 的 close,会将状态管理到 state 当中,通过初始化 state 来进行文件的恢复。
第三,针对多条流,Snapshot 还做了并行化的处理,每 5 分钟的 Checkpoint, 多条流其实就是多个 bucket,会通过循环来进行串行的处理,那么通过多线程的方式来改造,就可以减少 Checkpoint timeout 的发生。
■ 6.6 分区容错的一些优化
实际在管道多条流的情况下,有些流的数据并不是每个小时都是连续的。
这种情况会带来分区,它的 Watermark 没有办法正常推进,引发空分区的问题。所以我们在管道的运行过程当中,引入 PartitionRecover 模块,它会根据 Watermark 来推进分区的通知。针对有些流的 Watermark,如果在 ideltimeout 还没有更新的情况下,Recover 模块来进行分区的追加。它会在每个分区的末尾到达的时候,加上 delay time 来扫描所有流的 Watermark,由此来进行兜底。
在传输过程当中,当 Flink 作业重启的时候,会遇到一波僵尸的文件,我们是通过在 DAG 的 commit 的节点,去做整个分区通知前的僵尸文件的清理删除,来实现整个僵尸文件的清理,这些都属于非功能性层面的一些优化。
三、Flink 和 AI 方向的一些工程实践
1. 架构演进时间表
下图是 AI 方向在实时架构完整的时间线。
早在 2018 年,很多算法人员的实验开发都是作坊式的。每个算法人员会根据自己熟悉的语言,比如说 Python,php 或 c++ 来选择不同的语言来开发不同的实验工程。它的维护成本非常大,而且容易出现故障;
2019 年上半年,主要是基于 Flink 提供了 jar 包的模式来面向整个算法做一些工程的支持,可以说在整个上半年的初期,其实更多是围绕稳定性,通用性来做一些支持;
2019 年的下半年,是通过自研的 BSQL,大大降低了模型训练的门槛,解决 label 以及 instance 的实时化来提高整个实验迭代的效率;
2020 年上半年,更多是围绕整个特征的计算,流批计算打通以及特征工程效率的提升,来做一些改进;
到2020 年的下半年,更多是围绕整个实验的流程化以及引入 AIFlow,方便的去做流批 DAG。
2. AI 工程架构回顾
回顾一下整个 AI 工程,它的早期的架构图其实体现的是整个 AI 在 2019 年初的架构视图,其本质是通过一些 single task 的方式,各种混合语言来组成的一些计算节点,来支撑着整个模型训练的链路拉起。经过 2019 年的迭代,将整个近线的训练完全的替换成用 BSQL 的模式来进行开发和迭代。
3. 现状痛点
在 2019 年底,其实又遇到了一些新的问题,这些问题主要集中在功能和非功能两个维度上。
在功能层面:
首先从 label 转到产生 instance 流,以及到模型训练,到线上预测,乃至真正的实验效果,整个链路非常的长且复杂;
第二,整个实时的特征、离线特征、以及流批的一体,涉及到非常多的作业组成,整个链路很复杂。同时实验和 online 都要做特征的计算,结果不一致会导致最终的效果出现问题。此外,特征存在哪里也不好找,没办法去追溯。
在非功能性层面,算法的同学经常会遇到,不知道 Checkpoint 是什么,要不要开,有啥配置。此外,线上出问题的时候也不好排查,整个链路都非常的长。
所以第三点就是,完整的实验进度需要涉及的资源是非常多的,但是对算法来说它根本就不知道这些资源是什么以及需要多少,这些问题其实都都对算法产生很大的困惑。
4. 痛点归结
归根结底,集中在三个方面:
第一是一致性的问题。从数据的预处理,到模型训练,再到预测,各个环节其实是断层的。当中包括数据的不一致,也包括计算逻辑的不一致;
第二,整个实验迭代非常慢。一个完整的实验链路,其实对算法同学来说,他需要掌握东西非常多。同时实验背后的物料没办法进行共享。比如说有些特征,每个实验背后都要重复开发;
第三,是运维和管控的成本比较高。
完整的实验链路,背后其实是包含实时的一条工程加离线的一条工程链路组成,线上的问题很难去排查。
5. 实时 AI 工程的雏形
在这样的一些痛点下,在 20 年主要是集中在 AI 方向上去打造实时工程的雏形。核心是通过下面三个方面来进行突破。
第一是在 BSQL 的一些能力上,对于算法,希望通过面向 SQL 来开发以此降低工程投入;
第二是特征工程,会通过核心解决特征计算的一些问题来满足特征的一些支持;
第三是整个实验的协作,算法的目的其实在于实验,希望去打造一套端到端的实验协作,最终希望做到面向算法能够“一键实验”。
6. 特征工程-难点
我们在特征工程中遇到了一些难点。
第一是在实时特征计算上,因为它需要将结果利用到整个线上的预测服务,所以它对延迟以及稳定性的要求都非常的高;
第二是整个实时和离线的计算逻辑一致,我们经常遇到一个实时特征,它需要去回溯过去 30 天到到 60 天的离线数据,怎么做到实时特征的计算逻辑能同样在离线特征的计算上去复用;
第三是整个离线特征的流批一体比较难打通。实时特征的计算逻辑经常会带有窗口时序等等一些流式的概念,但是离线特征是没有这些语义的。
7. 实时特征
这里看一下我们怎么去做实时特征,图中的右侧是最典型的一些场景。比如说我要实时统计用户最近一分钟、6 小时、12 小时、24 小时,对各个 UP 主相关视频的播放次数。针对这样场景,其实里面有两个点:
第一、它需要用到滑动窗口来做整个用户过去历史的计算。此外,数据在滑动计算过程当中,它还需要去关联 UP 主的一些基础的信息维表,来获取 UP 主的一些视频来统计他的播放次数。归根结底,其实遇到了两个比较大的痛。
用 Flink 原生的滑动窗口,分钟级的滑动,会导致窗口比较多,性能会损耗比较大。
同时细粒度的窗口也会导致定时器过多,清理效率比较差。
第二是维表查询,会遇到是多个 key 要去查询 HBASE 的多个对应的 value,这种情况需要去支持数组的并发查询。
在两个痛点下,针对滑动窗口,主要是改造成为 Group By 的模式,加上 agg 的 UDF 的模式,将整个一小时、六小时、十二小时、二十四小时的一些窗口数据,存放到整个 Rocksdb 当中。这样通过 UDF 模式,整个数据触发机制就可以基于 Group By 实现记录级的触发,整个语义、时效性都会提升的比较大。同时在整个 AGG 的 UDF 函数当中,通过 Rocksdb 来做 state,在 UDF 当中来维护数据的生命周期。此外还扩展了整个 SQL 实现了数组级别的维表查询。最后的整个效果其实可以在实时特征的方向上,通过超大窗口的模式来支持各种计算场景。
8. 特征-离线
接下来看一下离线,左侧视图上半部分是完整的实时特征的计算链路,可以看出要解决同样的一条 SQL,在离线的计算上也能够复用,那就需要去解决相应的一些计算的 IO 都能够复用的问题。比如在流式上是通过 Kafka 来进行数据的输入,在离线上需要通过 HDFS 来做数据的输入。在流式上是通过 KFC 或者 AVBase 等等的一些 kv 引擎来支持,在离线上就需要通过 hive 引擎来解决,归根结底,其实需要去解决三个方面的问题:
第一,需要去模拟整个流式消费的能力,能够支持在离线的场景下去消费 HDFS 数据;
第二,需要解决 HDFS 数据在消费过程当中的分区有序的问题,类似 Kafka 的分区消费;
第三,需要去模拟 kv 引擎维表化的消费,实现基于 hive 的维表消费。还需要解决一个问题,当从 HDFS 拉取的每一条记录,每一条记录其实消费 hive 表的时候都有对应的 Snapshot,就相当于是每一条数据的时间戳,要消费对应数据时间戳的分区。
9. 优化
■ 9.1 离线-分区有序
分区有序的方案其实主要是基于数据在落 HDFS 时候,前置做了一些改造。首先数据在落 HDFS 之前,是传输的管道,通过 Kafka 消费数据。在 Flink 的作业从 Kafka 拉取数据之后,通过 Eventtime 去提取数据的 watermark,每一个 Kafka Source 的并发度会将 watermark 汇报到 JobManager 当中的 GlobalWatermark 模块,GlobalAgg 会汇总来自每一个并发度 Watermark 推进的进度,从而去统计 GlobalWatermark 的进展。根据 GlobalWatermark 的进展来计算出当中有哪些并发度的 Watermark 计算过快的问题,从而通过 GlobalAgg 下发给 Kafka Source 控制信息,Kafka Source 有些并发度过快的情况下,它的整个分区推进就降低速度。这样,在 HDFS Sink 模块,在同时间片上收到的数据记录的整个 Event time 基本上有序的,最终落到 HDFS 还会在文件名上去标识它相应的分区以及相应的时间片范围。最后在 HDFS 分区目录下,就可以实现数据分区的有序目录。
■ 9.2 离线-分区增量消费
数据在 HDFS 增量有序之后,实现了 HDFStreamingSource,它会针对文件做 Fecher 分区,针对每个文件都有 Fecher 的线程,且每个 Fecher 线程会统计每一个文件。它 offset 处理了游标的进度,会将状态根据 Checkpoint 的过程,将它更新到的 State 当中。
这样就可以实现整个文件消费的有序推进。在回溯历史数据的时候,离线作业就会涉及到整个作业的停止。实际是在整个 FileFetcher 的模块当中去引入一个分区结束的标识,且会在每一个线程去统计每一个分区的时候,去感应它分区的结束,分区结束后的状态最后汇总到的 cancellationManager,并进一步会汇总到 Job Manager 去更新全局分区的进度,当全局所有的分区都到了末尾的游标时候,会将整个 Flink 作业进行 cancel 关闭掉。
■ 9.3 离线 - Snapshot 维表
前面讲到整个离线数据,其实数据都在 hive 上,hive 的 HDFS 表数据的整个表字段信息会非常的多,但实际做离线特征的时候,需要的信息其实是很少的,因此需要在 hive 的过程先做离线字段裁剪,将一张 ODS 的表清洗成 DW 的表,DW 的表会最后通过 Flink 运行 Job,内部会有个 reload 的 scheduler,它会定期的去根据数据当前推进的 Watermark 的分区,去拉取在 hive 当中每一个分区对应的表信息。通过去下载某 HDFS 的 hive 目录当中的一些数据,最后会在整个内存当中 reload 成 Rocksdb 的文件,Rocksdb 其实就是最后用来提供维表 KV 查询的组件。
组件里面会包含多个 Rocksdb 的 build 构建过程,主要是取决于整个数据流动的过程当中的 Eventtime,如果发现 Eventtime 推进已经快到小时分区结束的末尾时候,会通过懒加载的模式去主动 reload,构建下一个小时 Rocksdb 的分区,通过这种方式,来切换整个 Rocksdb 的读取。
10. 实验流批一体
在上面三个优化,也就是分区有序增量,类 Kafka 分区 Fetch 消费,以及维表 Snapshot 的基础下,最终是实现了实时特征和离线特征,共用一套 SQL 的方案,打通了特征的流批计算。紧接着来看一下整个实验,完整的流批一体的链路,从图中可以看出最上面的粒度是整个离线的完整的计算过程。第二是整个近线的过程,离线过程其实所用计算的语义都是和近线过程用实时消费的语义是完全一致的,都是用 Flink 来提供 SQL 计算的。
来看一下近线,其实 Label join 用的是 Kafka 的一条点击流以及展现流,到了整个离线的计算链路,则用的一条 HDFS 点击的目录和 HDFS 展现目录。特征数据处理也是一样的,实时用的是 Kafka 的播放数据,以及 Hbase 的一些稿件数据。对于离线来说,用的是 hive 的稿件数据,以及 hive 的播放数据。除了整个离线和近线的流批打通,还将整个近线产生的实时的数据效果汇总到 OLAP 引擎上,通过 superset 来提供整个实时的指标可视化。其实从图可以看出完整的复杂流批一体的计算链路,当中包含的计算节点是非常的复杂和庞多的。
11. 实验协作 - 挑战
下阶段挑战更多是在实验协作上,下图是将前面整个链路进行简化后的抽象。从图中可以看出,三个虚线的区域框内,分别是离线的链路加两个实时的链路,三个完整的链路构成作业的流批,实际上就是一个工作流最基本的过程。里面需要去完成工作流完整的抽象,包括了流批事件的驱动机制,以及,对于算法在 AI 领域上更多希望用 Python 来定义完整的 flow,此外还将整个输入,输出以及它的整个计算趋于模板化,这样可以做到方便整个实验的克隆。
12. 引入 AIFlow
整个工作流上在下半年更多是和社区合作,引入了 AIFlow 的整套方案。
右侧其实是整个 AIFlow 完整链路的 DAG 视图,可以看出整个节点,其实它支持的类型是没有任何限制的,可以是流式节点,也可以是离线节点。此外的话,整个节点与节点之间通信的边是可以支持数据驱动以及事件驱动的。引入 AIFlow 的好处主要在于,AIFlow 提供基于 Python 语义来方便去定义完整的 AIFlow 的工作流,同时还包括整个工作流的进度的调度。
在节点的边上,相比原生的业界的一些 Flow 方案,他还支持基于事件驱动的整个机制。好处是可以帮助在两个 Flink 作业之间,通过 Flink 当中 watermark 处理数据分区的进度去下发一条事件驱动的消息来拉起下一个离线或者实时的作业。
此外还支持周边的一些配套服务,包括通知的一些消息模块服务,还有元数据的服务,以及在 AI 领域一些模型中心的服务。
13. Python 定义 Flow
来看一下基于 AIFlow 是如何最终定义成 Python 的工作流。右边的视图是一个线上项目的完整工作流的定义。第一、是整个是 Spark job 的定义,当中通过配置 dependence 来描述整个下游的依赖关系,它会下发一条事件驱动的消息来拉起下面的 Flink 流式作业。流式作业也同样可以通过消息驱动的方式来拉起下面的 Spark 作业。整个语义的定义非常的简单,只需要四个步骤,配置每节点的 confg 的信息,以及定义每节点的 operation 的行为,还有它的 dependency 的依赖,最后去运行整个 flow 的拓扑视图。
14. 基于事件驱动流批
接下来看一下完整的流批调度的驱动机制,下图右侧是完整的三个工作节点的驱动视图。第一个是从 Source 到 SQL 到 Sink。引入的黄色方框是扩展的 supervisor,他可以收集全局的 watermark 进度。当整个流式作业发现 watermark 可以推进到下一个小时的分区的时候,它会下发一条消息,去给到 NotifyService。NotifyService 拿到这条消息之后,它会去下发给到下一个作业,下一个作业主要会在整个 Flink 的 DAG 当中去引入 flow 的 operator,operator 在没有收到上个作业下发了消息之前,它会堵塞整个作业的运行。直到收到消息驱动之后,就代表上游其实上一个小时分区已经完成了,这时下个 flow 节点就可以驱动拉起来运作。同样,下个工作流节点也引入了 GlobalWatermark Collector 的模块来汇总收集它的处理的进度。当上一个小时分区完成之后,它也会下发一条消息到 NotifyService,NotifyService 会将这条消息去驱动调用 AIScheduler 的模块,从而去拉起 spark 离线作业来做 spark 离线的收尾。从里你们可以看出,整个链路其实是支持批到批,批到流以及流到流,以及流到批的四个场景。
15. 实时 AI 全链路的雏形
在流和批的整个 flow 定义和调度的基础上,在 2020 年初步构建出来了实时 AI 全链路的雏形,核心是面向实验。算法同学也可以基于 SQL 来开发的 Node 的节点,Python 是可以定义完整的 DAG 工作流。监控,告警以及运维是一体化的。
同时,支持从离线到实时的打通,从数据处理到模型训练,从模型训练到实验效果的打通,以及面向端到端的打通。右侧是整个近线实验的链路。下面是将整个实验链路产出的物料数据提供给在线的预测训练的服务。整体会有三个方面的配套:
一是基础的一些平台功能,包括实验管理,模型管理,特征管理等等;
其次也包括整个 AIFlow 底层的一些 service 的服务;
再有是一些平台级的 metadata 的元数据服务。
四、未来的一些展望
在未来的一年,我们还会更加集中在两个方面的一些工作。
第一是数据湖的方向上,会集中在 ODS 到 DW 层的一些增量计算场景,以及 DW 到 ADS 层的一些场景的突破,核心会结合 Flink 加 Iceberg 以及 HUDI 来作为该方向的落地。
在实时 AI 平台上,会进一步去面向实验来提供一套实时的 AI 协作平台,核心是希望打造高效,能够提炼简化算法人员的工程平台。
更多 Flink 相关技术交流,可扫码加入社区钉钉大群~