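This article explains how to asynchronously generate and display code coverage reports on the HttpRunnerManager platform (ant+jacoco+jenkins+HttpRunnerManager). It also gathers related material: installing and deploying HttpRunnerManager V2.0 on CentOS, example source code for com.amazonaws.mobileconnectors.s3.transfermanager.TransferManager, a source code analysis of FlowRunnerManager (Java scheduling of Linux commands), and httpclient4.5.5 PoolingHttpClientConnectionManager.
Contents:
- HttpRunnerManager platform: asynchronously generating and displaying code coverage reports (ant+jacoco+jenkins+HttpRunnerManager)
- Installing and deploying HttpRunnerManager V2.0 on CentOS
- Example source code for com.amazonaws.mobileconnectors.s3.transfermanager.TransferManager
- FlowRunnerManager: source code analysis of Java scheduling Linux commands
- httpclient4.5.5 PoolingHttpClientConnectionManager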
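HttpRunnerManager platform: asynchronously generating and displaying code coverage reports (ant+jacoco+jenkins+HttpRunnerManager)
Building a code coverage platform with ant+jacoco+jenkins+HttpRunnerManager
The approach: build with jenkins, and use HttpRunnerManager to refresh and display the report asynchronously.
Here is my implementation workflow:
I. Create a jenkins node and start it
1. Create the node in jenkins
2. Build the image, then create and run the node container
II. Create a jenkins job and run the program with jacoco
1. Configure jdk and maven
2. Create the job
3. Build with maven and start the program with an ant+jacoco script
4. Create build.xml and generate the report
III. Coverage report server
IV. Asynchronous report generation from httprunnermanager
1. Add the asynchronous task code on the host
2. Start the worker on the host
3. Add the task code to httprunnerManager
4. Add a link to the report in httprunnerManager
V. The result in use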
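I. Create a jenkins node and start it
Create a jenkins node and start it in a docker container.
1. Create the node in jenkins
a. Manage Jenkins - Manage Nodes - New Node
b. After the node is created, the following page appears
Click agent.jar to download it, then rename it to slave.jar
2. Build the image, then create and run the node container
a. Download the required packages
ant, maven, jacoco, jdk: download them from the official sites
slave.jar: the agent.jar downloaded above.
b. Write the dockerfile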


FROM python:3.6.6-stretch
MAINTAINER test@123.com
# Environment variables (comments sit on their own lines: a trailing "#..." would become part of the ENV value)
# IP and port of the jenkins server from step 1
ENV MASTER_DOMAIN 'jenkins_Ip:8080'
# jenkins node name
ENV AGENT_NAME 'docker-slave-jacoco-medical'
# SECRET of the jenkins node from step 1
ENV SECRET '1222223333344444444445556666777777666'
# Create directories
RUN mkdir /var/tmp/jdk /var/tmp/maven /var/tmp/slave /var/tmp/ant /var/tmp/jacoco
COPY jdk1.8.0_181.tar.gz /var/tmp/jdk/
COPY apache-maven-3.5.4-bin.tar.gz /var/tmp/maven/
COPY slave.jar /var/tmp/slave/
COPY apache-ant-1.10.5-bin.tar.gz /var/tmp/ant/
COPY jacoco-0.8.1.tar.gz /var/tmp/jacoco/
# Unpack the archives and delete them afterwards
RUN tar -xzvf /var/tmp/jdk/jdk1.8.0_181.tar.gz -C /var/tmp/jdk \
&& cd /var/tmp/jdk && rm -rf *.tar.gz
RUN tar -xzvf /var/tmp/maven/apache-maven-3.5.4-bin.tar.gz -C /var/tmp/maven \
&& cd /var/tmp/maven && rm -rf *.tar.gz
RUN tar -xzvf /var/tmp/ant/apache-ant-1.10.5-bin.tar.gz -C /var/tmp/ant \
&& cd /var/tmp/ant && rm -rf *.tar.gz
RUN tar -xzvf /var/tmp/jacoco/jacoco-0.8.1.tar.gz -C /var/tmp/jacoco \
&& cd /var/tmp/jacoco && rm -rf *.tar.gz
# EXPOSE: port the container listens on
EXPOSE 80
# Environment variables (CATALINA_HOME and CATALINA_BASE point at the maven directory here)
ENV ANT_HOME /var/tmp/ant/apache-ant-1.10.5
ENV JAVA_HOME /var/tmp/jdk/jdk1.8.0_181
ENV CLASSPATH $JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
ENV CATALINA_HOME /var/tmp/maven/apache-maven-3.5.4
ENV CATALINA_BASE /var/tmp/maven/apache-maven-3.5.4
ENV PATH $PATH:$JAVA_HOME/bin:$CATALINA_HOME/lib:$CATALINA_HOME/bin:$ANT_HOME/bin
# Make the environment variables take effect
#RUN source /etc/profile
# Start the jenkins agent
RUN cd /var/tmp/jdk/jdk1.8.0_181/bin && ln -s java java_1.8
CMD java_1.8 -jar /var/tmp/slave/slave.jar -jnlpUrl "http://${MASTER_DOMAIN}/computer/${AGENT_NAME}/slave-agent.jnlp" -secret "${SECRET}"
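c. Build the image
Run the following in the directory that contains the dockerfile
docker build -t docker.io/slave_jacoco_medical:latest .
d. Create and run the container
AGENT_NAME: jenkins node name; SECRET: the secret value shown on the node-created page
-e: set an environment variable; --name: container name
-v: volume mount; the working directory that holds the code and the report output directory must both be mounted to the host
Note: the container name here matches the name of the jenkins node's remote working directory. This makes asynchronous report generation easier later; the reason is given with ant.sh in section IV.
(Passing different AGENT_NAME and SECRET values creates containers for different projects.)
docker run -d --restart always -p 9001:80 -e AGENT_NAME='docker-slave-jacoco-medical' \
-e SECRET='1222223333344444444445556666777777666' \
-v /var/jenkins_jacoco_jobs/jenkins_slave_medical_jobs:/jenkins_slave_jacoco_medical \
-v /jenkins_root/jenkins_medical_root:/root \
-v /var/ant_reports/medical_reports:/var/antreports \
--name jenkins_slave_jacoco_medical docker.io/slave_jacoco_medical:latest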
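The remark about creating containers for different projects can be sketched as a small helper that assembles the docker run arguments. This is only an illustration under assumptions: the function name slave_run_args and its parameters are hypothetical, and the host paths and image name simply follow the naming pattern of the "medical" command above.

```python
from typing import List


def slave_run_args(project: str, secret: str, host_port: int = 9001) -> List[str]:
    """Assemble `docker run` arguments for one project's jenkins agent container.

    Hypothetical helper: the naming mirrors the "medical" example, where the
    container name equals the name of the node's remote working directory.
    """
    container = f"jenkins_slave_jacoco_{project}"
    agent_name = f"docker-slave-jacoco-{project}"
    return [
        "docker", "run", "-d", "--restart", "always",
        "-p", f"{host_port}:80",
        "-e", f"AGENT_NAME={agent_name}",
        "-e", f"SECRET={secret}",
        # code workspace, jenkins home and report output are mounted to the host
        "-v", f"/var/jenkins_jacoco_jobs/jenkins_slave_{project}_jobs:/{container}",
        "-v", f"/jenkins_root/jenkins_{project}_root:/root",
        "-v", f"/var/ant_reports/{project}_reports:/var/antreports",
        "--name", container,
        f"docker.io/slave_jacoco_{project}:latest",
    ]
```

The list can be handed to subprocess.run(slave_run_args("medical", secret), check=True); passing a list rather than one string keeps each value a single argument.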
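II. Create a jenkins job and run the program with jacoco
1. Configure jdk and maven
jenkins builds the jar with maven, so jdk and maven must be installed first
jenkins - Manage Jenkins - Global Tool Configuration
2. Create the job
Note: the job must be restricted to the node created above
Source code management:
3. Build with maven and start the program with an ant+jacoco script
Build with mvn -f <path>/pom.xml install, or configure it as shown in the figure
Start the program with jacoco: set the startup script under "Build - Execute shell"
Once the program is running, its API is reachable at http://<host IP>:9001/ (the container publishes port 9001)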


# If the program is already running, kill the process
count=`ps -ef | grep jacocoagent | grep -v "grep" | wc -l`
if [ $count -gt 0 ]; then
ps aux|grep jacocoagent|grep -v grep|awk '{print $2}'|xargs kill -9
fi
# Delete merged.exec so that coverage starts from 0 after each build, then start the program
# (no comment line may sit between the "\" continuation and "&&", or the shell reports a syntax error)
rm -rf /docker_slave_jacoco_medical/merged.exec \
&& java -javaagent:/var/tmp/jacoco/jacoco-0.8.1/lib/jacocoagent.jar=includes=*,output=tcpserver,port=8048,address=127.0.0.1 -jar /jenkins_slave_medical/workspace/jacoco_medical/target/mdc-service-0.0.1-SNAPSHOT.jar &
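The -javaagent option in the script packs several JaCoCo agent settings into one string, and its port and address have to match server_port and server_ip in build.xml. A minimal sketch of how that option is put together follows; the function name jacoco_agent_option is hypothetical, while includes, output, port and address are the agent options used in the script above.

```python
def jacoco_agent_option(agent_jar: str, port: int = 8048,
                        address: str = "127.0.0.1", includes: str = "*") -> str:
    """Return the -javaagent option that starts the JaCoCo agent in tcpserver mode.

    In tcpserver mode the agent listens on address:port, and `ant dump`
    connects to it later to fetch the execution data.
    """
    if not 0 < port < 65536:
        raise ValueError(f"invalid port: {port}")
    return (f"-javaagent:{agent_jar}=includes={includes},"
            f"output=tcpserver,port={port},address={address}")
```

Keeping the port in one place like this makes it harder for the startup script and build.xml to drift apart.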
4. Create build.xml and generate the report
Create build.xml


<?xml version="1.0" ?>
<project name="testExec" xmlns:jacoco="antlib:org.jacoco.ant" default="jacoco">
<property name="jacocoantPath" value="/var/tmp/jacoco/jacoco-0.8.1/lib/jacocoant.jar"/>
<property name="jacocoexecPath" value="./merged.exec"/>
<property name="workspacePath" value="."/>
<property name="reportfolderPath" value="/var/antreports/reports"/>
<property name="server_ip" value="127.0.0.1"/>
<property name="server_port" value="8048"/>
<!-- <property name="srcApiPath" value="/docker_slave_jacoco_medical/workspace/jacoco_medical/mdc-api/src/main/java">
<property name="classApiPPath" value="/docker_slave_jacoco_medical/workspace/jacoco_medical/mdc-api/target/classes"/> -->
<property name="srcServicePath" value="/docker_slave_jacoco_medical/workspace/jacoco_medical/mdc-service/src/main/java"/>
<property name="classServicePath" value="/docker_slave_jacoco_medical/workspace/jacoco_medical/mdc-service/target/classes/com"/>
<taskdef uri="antlib:org.jacoco.ant" resource="org/jacoco/ant/antlib.xml">
<classpath path="${jacocoantPath}" />
</taskdef>
<target name="merge">
<jacoco:merge destfile="merged.exec">
<fileset dir="${workspacePath}" includes="**/*.exec"/>
</jacoco:merge>
</target>
<target name="dump">
<jacoco:dump address="${server_ip}" reset="false" destfile="${jacocoexecPath}" port="${server_port}" append="true"/>
</target>
<target name="jacoco">
<delete dir="${reportfolderPath}" />
<mkdir dir="${reportfolderPath}" />
<jacoco:report>
<executiondata>
<file file="${jacocoexecPath}" />
</executiondata>
<structure name="JaCoCo Report">
<group name="order">
<classfiles>
<fileset dir="${classServicePath}" />
</classfiles>
<sourcefiles encoding="UTF-8">
<fileset dir="${srcServicePath}" />
</sourcefiles>
</group>
</structure>
<html destdir="${reportfolderPath}" encoding="utf-8" />
</jacoco:report>
</target>
</project>
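Run the following commands inside the container to generate the report: dump fetches the execution data from the running agent, and jacoco turns it into an HTML report
ant dump -buildfile <path>/build.xml
ant jacoco -buildfile <path>/build.xml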
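The two ant calls depend on each other: the report is only meaningful if the dump succeeded. A minimal sketch of that ordering is shown below, assuming ant is on the PATH inside the container; the function name generate_report and the injectable run parameter are hypothetical and exist only to make the sketch testable.

```python
import subprocess
from typing import Callable, List, Sequence


def generate_report(build_file: str,
                    run: Callable[[Sequence[str]], int] = subprocess.call) -> List[str]:
    """Run `ant dump` and then `ant jacoco`, stopping at the first failing target.

    Returns the names of the targets that finished with exit code 0.
    """
    done = []
    for target in ("dump", "jacoco"):
        if run(["ant", target, "-buildfile", build_file]) != 0:
            break  # do not build a report from a failed dump
        done.append(target)
    return done
```

This mirrors the "ant dump ... && ant jacoco ..." chaining that ant.sh uses later in the article.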
III. Coverage report server
# Search for and pull the image
docker search tomcat
docker pull tomcat
# Create and run the container, mounting the host report directory as the webapps directory
docker run -d --restart always -p 9090:8080 -v /var/ant_reports:/usr/local/tomcat/webapps --name tomcat_ant_reports docker.io/tomcat:latest
The code coverage report can then be viewed at http://<host IP>:9090/medical_reports/reports/
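The report URL follows from the mounts: the agent container writes the report to /var/antreports/reports, which is /var/ant_reports/medical_reports/reports on the host, and tomcat serves /var/ant_reports as its webapps directory. A small sketch of that mapping, with a hypothetical function name report_url:

```python
def report_url(host: str, project_dir: str, port: int = 9090) -> str:
    """Build the tomcat URL of a project's coverage report.

    project_dir is the per-project directory under /var/ant_reports on the
    host (for example "medical_reports"); "reports" is the folder that
    build.xml writes into.
    """
    return f"http://{host}:{port}/{project_dir.strip('/')}/reports/"
```

For another project, only project_dir changes, provided its container mounts /var/ant_reports/<project_dir> to /var/antreports in the same way.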
IV. Asynchronous report generation from httprunnermanager
1. Add the asynchronous task code on the host
celerycon.py :
from kombu import Exchange, Queue
from celery import platforms
import os

# Must be the same rabbitmq instance that httprunnerManager uses
BROKER_URL = 'amqp://user:password@<rabbitmq_ip>:<port>//'
CELERY_RESULT_BACKEND = "db+mysql://root:123456@10.8.154.123:3309/test"
#CELERY_RESULT_BACKEND ="amqp"
CELERY_QUEUES = (
    Queue("default", Exchange("default"), routing_key="default"),
    Queue("for_task_A", Exchange("for_task_A"), routing_key="for_task_A"),
    Queue("for_task_B", Exchange("for_task_B"), routing_key="for_task_B")
)
# Route each task name to its own queue
CELERY_ROUTES = {
    'ApiManager.tasks.taskA': {"queue": "for_task_A", "routing_key": "for_task_A"},
    'tasks.taskB': {"queue": "for_task_B", "routing_key": "for_task_B"}
}
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
# Allow the worker to run as root
platforms.C_FORCE_ROOT = True
CELERYD_MAX_TASKS_PER_CHILD = 40
tasks.py :
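import os
from celery import Celery

app = Celery()
app.config_from_object("celerycon")  # load the configuration module


@app.task(bind=True)
def taskA(self, tag, queue='for_task_A'):
    # Run the shell script on this host; os.system returns 0 on success.
    # tag is the argument passed in (the container name).
    # Note the trailing space after ant.sh, so that tag becomes a separate argument.
    lac = os.system('bash -v /opt/jacoco_work/workSc/ApiManager/ant.sh ' + tag)
    return "Success!" if lac == 0 else "Failure!"


@app.task
def taskB(x, y, z):
    # Not bound, so there is no self parameter
    return x + y + z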
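Because taskA builds its command by string concatenation, a missing space or a tag containing shell metacharacters changes what is executed. One alternative, shown here as a sketch rather than the author's method, is to pass an argument list to subprocess.run, which bypasses the shell entirely. The names ANT_SCRIPT, ant_command and run_ant are hypothetical; the script path is the one used in tasks.py.

```python
import subprocess
from typing import List

# Script path taken from tasks.py above
ANT_SCRIPT = "/opt/jacoco_work/workSc/ApiManager/ant.sh"


def ant_command(tag: str) -> List[str]:
    """Build the argv for ant.sh; each list element is passed as one argument."""
    return ["bash", ANT_SCRIPT, tag]


def run_ant(tag: str) -> str:
    """Run ant.sh for the given container name and map the exit code to a status."""
    result = subprocess.run(ant_command(tag))
    return "Success!" if result.returncode == 0 else "Failure!"
```

Inside the celery task, the body would then reduce to return run_ant(tag).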
ant.sh :
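#!/bin/bash
# docker_name is the first argument passed to this script.
docker_name=$1
sudo docker exec "$docker_name" /bin/bash -c "ant dump -buildfile /$docker_name/build.xml && ant jacoco -buildfile /$docker_name/build.xml"
# build.xml sits directly in the remote working directory of the jenkins node (the container).
# Because that directory has the same name as the container, the path can be parameterized,
# so the report for any project can be generated just by passing its container name.
# (This is why the container name was made identical to the node's remote working directory name.)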
2. Start the worker on the host
# Run from the workSc directory:
# Start the worker
celery multi start jacoco_work -A ApiManager.tasks -l info -n worker1.%h
# Stop the worker
celery multi stop jacoco_work -A ApiManager.tasks -l info -n worker1.%h
3. Add the task code to httprunnerManager
Add to views.py
from ApiManager.tasks import *
def refresh1(request):
    # model: container name (e.g. jenkins_slave_jacoco_medical)
    name = request.POST.get('model')
    taskA.delay(name)
    data = {
        'msg': name + ' started',
        'id': name
    }
    return render_to_response('refresh.html', data)
Add to tasks.py
@shared_task
def taskA(tag):
    # The code below is not executed here; presumably the task is picked up by the worker on the host,
    # which runs its own taskA
    lac = os.system('bash -v /opt/jacoco_work/workSc/ApiManager/ant.sh docker_slave_jacoco_medical')
    return lac
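The view passes the posted model value straight to the task, which ends up in a shell command on the host, and request.POST.get('model') returns None when the field is missing. A minimal sketch of a check that could run before taskA.delay(name) follows. The helper name is hypothetical, and the pattern is an assumption modeled on the characters Docker accepts in container names.

```python
import re

# Assumed pattern: an alphanumeric first character, followed by one or more
# alphanumerics, "_", "." or "-" (modeled on Docker's container name rule).
_CONTAINER_NAME = re.compile(r"[a-zA-Z0-9][a-zA-Z0-9_.-]+")


def is_valid_container_name(name) -> bool:
    """Return True only for strings that look like a plain container name."""
    return isinstance(name, str) and _CONTAINER_NAME.fullmatch(name) is not None
```

In the view, a request that fails this check could be answered with an error page instead of queuing the task.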
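4. Add a link to the report in httprunnerManager
Finally, the httprunnerManager platform shows the report page at http://<host ip>:9090/medical_reports/reports/ in an iframe
V. The result in use
Note: the build pulls code directly from gitlab, so check that the environment configured by the developers is the test environment before building.
1. On the httprunnerManager platform, select the environment set up with jacoco and run the test cases
2. When the API tests have finished, click "Submit" to generate the report asynchronously
3. View the report on the httprunnerManager platform:
If you spot any mistakes or have suggestions, please point them out in the comments. Thank you.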
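Installing and deploying HttpRunnerManager V2.0 on CentOS
HttpRunnerManager V2.0 is an API test automation platform built on HttpRunner. HttpRunner is a general-purpose testing framework for the HTTP(S) protocol: by writing and maintaining a single YAML/JSON script, you can cover automated testing, performance testing, production monitoring, continuous integration, and other testing tasks.
The new version mainly adds scheduled tasks, asynchronous execution, report persistence, log storage, and data type support. Questions are welcome in QQ group 628448476.
This article documents the steps of my own installation.
It draws on articles published by others and also summarizes some new problems I ran into.
I. Environment and packages
Main components: centos7.6, HttpRunnerManager V2.0, MySQL5.7.26, python3.6.4, Django2.1.2, rabbitmq-server-3.6.8 (to avoid compatibility problems, use the same versions where possible; if a version must differ, prefer a newer one.)
CentOS 6.X is not recommended: it causes many problems that are hard to solve, and the dependencies and matching versions of some packages are difficult to resolve. Use CentOS 7.X instead.
The packages are easy to find online; download links for some of them are given below.
CentOS:https://mirrors.edge.kernel.org/centos/7.6.1810/isos/x86_64/CentOS-7-x86_64-DVD-1810.iso (installation guide: https://www.cnblogs.com/clsn/p/8338099.html#auto_id_22)
HttpRunnerManager V2.0:https://github.com/samasword/HttpRunnerManager or https://github.com/HttpRunner/HttpRunnerManager
Download methods for the other components are given in the corresponding configuration steps.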
II. Edit the project settings:
1. Open the extracted source package, go into the HttpRunnerManager/HttpRunnerManager folder, and open settings.py.
Edit the DATABASES dictionary and the mail account settings in HttpRunnerManager/HttpRunnerManager/settings.py
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.mysql',
        'NAME': 'HttpRunner',  # database name; create a database called 'HttpRunner' yourself
        'USER': 'root',  # database login name
        'PASSWORD': 'lcc123456',  # database login password
        'HOST': '127.0.0.1',  # IP of the database server (for a VM, the VM's IP)
        'PORT': '3306',  # listening port; the default 3306 is fine
    }
}
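The settings above keep the database password in settings.py. As an optional variation, not something the original article does, the values could be read from environment variables so the password stays out of the source tree. The function name database_settings and the HRM_DB_* variable names are hypothetical.

```python
import os


def database_settings(env=os.environ) -> dict:
    """Build the Django DATABASES dict from environment variables.

    Only the password is mandatory; the other values fall back to the
    defaults used in this article.
    """
    return {
        "default": {
            "ENGINE": "django.db.backends.mysql",
            "NAME": env.get("HRM_DB_NAME", "HttpRunner"),
            "USER": env.get("HRM_DB_USER", "root"),
            "PASSWORD": env["HRM_DB_PASSWORD"],  # raises KeyError if unset
            "HOST": env.get("HRM_DB_HOST", "127.0.0.1"),
            "PORT": env.get("HRM_DB_PORT", "3306"),
        }
    }
```

In settings.py this would be used as DATABASES = database_settings().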
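Next, edit the mail settings:
EMAIL_SEND_USERNAME = '[email protected]'  # mail address used to send scheduled-task reports; 163, qq, sina, enterprise QQ mail and others are supported. SMTP must be enabled in the mailbox settings
EMAIL_SEND_PASSWORD = 'password'  # login password of the mailbox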
Finally, edit the worker settings:
djcelery.setup_loader()
CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = 'Asia/Shanghai'
BROKER_URL = 'amqp://guest:[email protected]:5672//'  # 127.0.0.1 is the IP of the server running rabbitmq-server (for example, my VM is 192.168.31.16, so I enter that IP; the port stays unchanged)
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_RESULT_EXPIRES = 7200  # how long celery task results are kept before they expire
CELERYD_CONCURRENCY = 10  # number of concurrent celery worker processes, the same as the -c command-line option; adjust to the server's capacity, default 10
CELERYD_MAX_TASKS_PER_CHILD = 100  # a worker process is replaced after this many tasks; a larger value is fine, default 100
Also edit requirements.txt in the HttpRunnerManager directory:
change Django == 2.0.3 to Django == 2.1.2, and delete mysqlclient == 1.3.12,
because Django 2.1.2 is installed later in this guide and mysqlclient has to be installed manually
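2. Install the database (I suggest running cd /usr/local first and installing the software under /usr/local)
In /usr/local/ on the linux machine you can download the repository package directly with: wget http://dev.mysql.com/get/mysql57-community-release-el7-8.noarch.rpm (if wget is missing, run yum install wget or look up how to install wget on linux)
For the detailed steps, see this blog post, which I found the most thorough and sensible: https://www.cnblogs.com/dengshihuang/p/8029372.html
3. Install python (again, I suggest installing under /usr/local)
In /usr/local/ on the linux machine, download with: wget https://www.python.org/ftp/python/3.6.4/Python-3.6.4.tgz , then extract and install
Run the following commands one by one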
tar -xvf Python-3.6.4.tgz
cd Python-3.6.4
./configure --with-ssl
yum install libffi-devel -y
make all && make install
make clean
make distclean
/usr/local/bin/python3 -V
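(If make install fails with zipimport.ZipImportError: can't decompress data; zlib not available
make: *** [install] Error 1)
fix it with the following command and then run make install again:
yum install zlib*
When prompted, enter y: Is this ok [y/d/N]: y
Once the installation succeeds, /usr/local/bin/python3 -V prints version 3.6.4
Next, point the system's default python command at the new version with a symbolic link. Run the following commands
mv /usr/bin/python /usr/bin/python2.7
ln -s /usr/local/bin/python3.6 /usr/bin/python
Then run python -V to check the default python version. Note that yum on CentOS 7 itself runs on the system Python 2, so after this change the first line of /usr/bin/yum and /usr/libexec/urlgrabber-ext-down must point to /usr/bin/python2.7, otherwise yum commands fail.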
4. Install Django with pip3:
pip3 install Django==2.1.2. To verify the installation, start python, import django and print its version.
If the installation fails, try the following:
wget https://www.djangoproject.com/download/2.1.2/tarball/ --no-check-certificate
tar xzvf django-2.1.2.tar.gz # extract the downloaded package
cd django-2.1.2 # enter the Django directory
python setup.py install # run the installation
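5. Install RabbitMQ
For details see https://www.cnblogs.com/fxcity/p/11041994.html or https://testerhome.com/notes/2164
6. Install mysqlclient by running
ln -s /usr/local/mysql/bin/mysql_config /usr/local/bin/mysql_config
pip install mysqlclient or yum install mysqlclient
Note: these instructions are not guaranteed to work in every case. If you run into problems, search online, leave a comment, or write to my mailbox [email protected]
When something fails, copy a line or two of the error message and search for it on Baidu or Google.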
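III. Build and run
1. After installing RabbitMQ, make sure that both the MySQL service and the RabbitMQ service are running.
2. On CentOS, go into the HttpRunnerManager source folder and make sure it contains requirements.txt.
Then run: pip install -r requirements.txt
3. Run: python manage.py makemigrations ApiManager to generate the migration scripts, then python manage.py migrate to create the tables in the database
4. Create a superuser with python manage.py createsuperuser for managing the database through the admin site, and enter the user name, password and email when prompted. Skip this step if you do not need it
5. Start the service with python manage.py runserver 0.0.0.0:8000 (the ip must be 0.0.0.0; port 8000 can be changed, and the chosen port must be included in the URL). A successful start looks like the figure below.
Note: binding to 0.0.0.0 makes the server listen on all interfaces so that other hosts on the LAN can reach it. The address to open is the CentOS machine's ip plus the port chosen when starting the service.
For example, my centos ip is 192.168.31.16
Relevant addresses: http://192.168.31.16:8000/api/login/ (login)
http://192.168.31.16:8000/api/register/ (registration)
http://192.168.31.16:8000/admin/ (admin site)
A caveat: after the service started normally, with the centos VM acting as the server, the browser on the host machine still could not reach it. I had to create a separate Windows VM and open the pages from there.
The cause is probably a LAN networking problem; I have not found a solution yet. I will post an update here or in the comments if I do, and I welcome corrections and hints on how to solve it.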
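The cause of the access problem is left open above. One way to start narrowing it down is to check, from the machine whose browser fails, whether the TCP port is reachable at all. If it is not, a firewall on the CentOS VM or the VM's network mode would be plausible suspects, though the article does not confirm either. A minimal sketch, with a hypothetical function name port_is_open:

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # refused, timed out, unreachable, or name resolution failed
        return False
```

For the setup in this article, the call would be port_is_open("192.168.31.16", 8000), run on the host machine.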
IV. Additional notes
1. For how to use the project afterwards, see the community post: https://testerhome.com/topics/13295 (hands-on walkthrough)
2. References:
1. https://testerhome.com/topics/18498
2. https://www.cnblogs.com/dengshihuang/p/8029372.html
3. https://testerhome.com/notes/2164
Example source code for com.amazonaws.mobileconnectors.s3.transfermanager.TransferManager
public static void uploadToS3(TransferManager transferManager,
        final File file,
        final FileProgressListener fileProgressListener) throws InterruptedException {
    PutObjectRequest putObjectRequest = new PutObjectRequest("car-classifieds", file.getName(), file)
            .withCannedAcl(CannedAccessControlList.PublicRead);
    final Upload upload = transferManager.upload(putObjectRequest);
    upload.addProgressListener(new ProgressListener() {
        @Override
        public void progressChanged(ProgressEvent progressEvent) {
            fileProgressListener.onProgressChanged(progressEvent.getBytesTransferred());
            if (progressEvent.getEventCode() == ProgressEvent.COMPLETED_EVENT_CODE) {
                fileProgressListener.onCompleteUpload();
            }
            if (progressEvent.getEventCode() == com.amazonaws.event.ProgressEvent.STARTED_EVENT_CODE) {
                fileProgressListener.onStartUpload();
            }
            if (progressEvent.getEventCode() == com.amazonaws.event.ProgressEvent.FAILED_EVENT_CODE) {
                fileProgressListener.onFailedUpload();
            }
        }
    });
}
public static void init(Context context) {
    provider = new CognitoCachingCredentialsProvider(context, AWS_ACCOUNT_ID, COGNITO_POOL_ID,
            COGNTIO_ROLE_UNAUTH, COGNITO_ROLE_AUTH, Regions.US_EAST_1);
    //initialize the clients
    cognitosync = new CognitoSyncManager(context, Regions.US_EAST_1, provider);
    manager = new TransferManager(provider);
    ddb = new AmazonDynamoDBClient(provider);
    //ddbmapper = new DynamoDBMapper(ddb);
    analytics = MobileAnalyticsManager.getOrCreateInstance(context, MOBILE_ANALYTICS_APP_ID,
            Regions.US_EAST_1, provider);
    kinesis = new KinesisRecorder(context.getDir(KINESIS_DIRECTORY_NAME, 0), Regions.US_EAST_1, provider);
    lambda = new LambdaInvokerFactory(context, Regions.US_WEST_2, provider);
}
public static void uploadToS3(TransferManager transferManager, final File file) throws InterruptedException {
    PutObjectRequest putObjectRequest = new PutObjectRequest("car-classifieds", file)
            .withCannedAcl(CannedAccessControlList.PublicRead);
    final Upload upload = transferManager.upload(putObjectRequest);
    upload.addProgressListener(new ProgressListener() {
        @Override
        public void progressChanged(ProgressEvent progressEvent) {
            if (progressEvent.getEventCode() == ProgressEvent.COMPLETED_EVENT_CODE) {
            }
        }
    });
}
public static TransferManager getTransferManager(Context context) {
    CognitoCachingCredentialsProvider credentialsProvider = new CognitoCachingCredentialsProvider(
            context, "us-east-1:c2432cb5-55e1-42ec-a12f-4be2bc8abd9a", Regions.US_EAST_1
    );
    TransferManager transferManager = new TransferManager(credentialsProvider);
    return transferManager;
}
public S3BroadcastManager(Broadcaster broadcaster, AWSCredentials creds) {
    // XXX - Need to determine what's going wrong with MD5 computation
    System.setProperty("com.amazonaws.services.s3.disableGetObjectMD5Validation", "true");
    mTransferManager = new TransferManager(creds);
    mBroadcaster = broadcaster;
    mQueue = new LinkedBlockingQueue<>();
    mInterceptors = new HashSet<>();
    new Thread(this).start();
}
public static void init(Context context) {
    provider = new CognitoCachingCredentialsProvider(context, Regions.US_EAST_1);
    //initialize the Cognito Sync Client
    //initialize the Other Clients
    manager = new TransferManager(provider);
    analytics = MobileAnalyticsManager.getOrCreateInstance(context, "App_ID_Here", provider);
}
public static TransferManager getManager() {
    if (manager == null) {
        throw new IllegalStateException("client not initialized yet");
    }
    return manager;
}
FlowRunnerManager: source code analysis of Java scheduling Linux commands
/*
 * Copyright 2012 LinkedIn Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package azkaban.execapp;

import static java.util.Objects.requireNonNull;

import azkaban.Constants;
import azkaban.Constants.ConfigurationKeys;
import azkaban.event.Event;
import azkaban.event.EventListener;
import azkaban.execapp.event.FlowWatcher;
import azkaban.execapp.event.LocalFlowWatcher;
import azkaban.execapp.event.RemoteFlowWatcher;
import azkaban.execapp.metric.NumFailedFlowMetric;
import azkaban.executor.AlerterHolder;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.Executor;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.Status;
import azkaban.jobtype.JobTypeManager;
import azkaban.jobtype.JobTypeManagerException;
import azkaban.metric.MetricReportManager;
import azkaban.metrics.CommonMetrics;
import azkaban.project.ProjectLoader;
import azkaban.project.ProjectWhitelist;
import azkaban.project.ProjectWhitelist.WhitelistType;
import azkaban.sla.SlaOption;
import azkaban.spi.AzkabanEventReporter;
import azkaban.spi.EventType;
import azkaban.storage.StorageManager;
import azkaban.utils.FileIOUtils;
import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;
import azkaban.utils.ThreadPoolExecutingListener;
import azkaban.utils.TrackingThreadPool;
import azkaban.utils.UndefinedPropertyException;
import com.codahale.metrics.Timer;
import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.lang.Thread.State;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;

/**
 * Execution manager for the server side execution.
 *
 * When a flow is submitted to FlowRunnerManager, it is the {@link Status#PREPARING} status. When a
 * flow is about to be executed by FlowRunner, its status is updated to {@link Status#RUNNING}
 *
 * Two main data structures are used in this class to maintain flows.
 *
 * runningFlows: this is used as a bookkeeping for submitted flows in FlowRunnerManager. It has
 * nothing to do with the executor service that is used to execute the flows. This bookkeeping is
 * used at the time of canceling or killing a flow. The flows in this data structure is removed in
 * the handleEvent method.
 *
 * submittedFlows: this is used to keep track the execution of the flows, so it has the mapping
 * between a Future<?> and an execution id. This would allow us to find out the execution ids of the
 * flows that are in the Status.PREPARING status. The entries in this map is removed once the flow
 * execution is completed.
 */
@Singleton
public class FlowRunnerManager implements EventListener,
    ThreadPoolExecutingListener {

  private static final Logger logger = Logger.getLogger(FlowRunnerManager.class);

  private static final String EXECUTOR_USE_BOUNDED_THREADPOOL_QUEUE = "executor.use.bounded.threadpool.queue";
  private static final String EXECUTOR_THREADPOOL_WORKQUEUE_SIZE = "executor.threadpool.workqueue.size";
  private static final String EXECUTOR_FLOW_THREADS = "executor.flow.threads";
  private static final String FLOW_NUM_JOB_THREADS = "flow.num.job.threads";

  // recently finished secs to clean up. 1 minute
  private static final int RECENTLY_FINISHED_TIME_TO_LIVE = 60 * 1000;

  private static final int DEFAULT_NUM_EXECUTING_FLOWS = 30;
  private static final int DEFAULT_FLOW_NUM_JOB_TREADS = 10;
  private static final int DEFAULT_POLLING_INTERVAL_MS = 1000;

  // this map is used to store the flows that have been submitted to
  // the executor service. Once a flow has been submitted, it is either
  // in the queue waiting to be executed or in executing state.
  private final Map<Future<?>, Integer> submittedFlows = new ConcurrentHashMap<>();
  private final Map<Integer, FlowRunner> runningFlows = new ConcurrentHashMap<>();
  // keep track of the number of flow being setup({@link createFlowRunner()})
  private final AtomicInteger preparingFlowCount = new AtomicInteger(0);
  private final Map<Integer, ExecutableFlow> recentlyFinishedFlows = new ConcurrentHashMap<>();
  private final TrackingThreadPool executorService;
  private final CleanerThread cleanerThread;
  private final ExecutorLoader executorLoader;
  private final ProjectLoader projectLoader;
  private final JobTypeManager jobtypeManager;
  private final FlowPreparer flowPreparer;
  private final TriggerManager triggerManager;
  private final AlerterHolder alerterHolder;
  private final AzkabanEventReporter azkabanEventReporter;
  private final Props azkabanProps;
  private final File executionDirectory;
  private final File projectDirectory;
  private final Object executionDirDeletionSync = new Object();
  private final CommonMetrics commonMetrics;
  private final ExecMetrics execMetrics;

  private final int numThreads;
  private final int numJobThreadPerFlow;
  // We want to limit the log sizes to about 20 megs
  private final String jobLogChunkSize;
  private final int jobLogNumFiles;
  // If true, jobs will validate proxy user against a list of valid proxy users.
  private final boolean validateProxyUser;
  private PollingService pollingService;
  private int threadPoolQueueSize = -1;
  private Props globalProps;
  private long lastCleanerThreadCheckTime = -1;
  private long executionDirRetention = 1 * 24 * 60 * 60 * 1000; // 1 Day
  // date time of the the last flow submitted.
  private long lastFlowSubmittedDate = 0;
  // Indicate if the executor is set to active.
  private volatile boolean active;

  @Inject
  public FlowRunnerManager(final Props props,
      final ExecutorLoader executorLoader,
      final ProjectLoader projectLoader,
      final StorageManager storageManager,
      final TriggerManager triggerManager,
      final AlerterHolder alerterHolder,
      final CommonMetrics commonMetrics,
      final ExecMetrics execMetrics,
      @Nullable final AzkabanEventReporter azkabanEventReporter) throws IOException {
    this.azkabanProps = props;

    this.executionDirRetention = props.getLong("execution.dir.retention",
        this.executionDirRetention);
    this.azkabanEventReporter = azkabanEventReporter;
    logger.info("Execution dir retention set to " + this.executionDirRetention + " ms");

    this.executionDirectory = new File(props.getString("azkaban.execution.dir", "executions"));
    if (!this.executionDirectory.exists()) {
      this.executionDirectory.mkdirs();
      setgidPermissionOnExecutionDirectory();
    }
    this.projectDirectory = new File(props.getString("azkaban.project.dir", "projects"));
    if (!this.projectDirectory.exists()) {
      this.projectDirectory.mkdirs();
    }

    // azkaban.temp.dir
    this.numThreads = props.getInt(EXECUTOR_FLOW_THREADS, DEFAULT_NUM_EXECUTING_FLOWS);
    this.numJobThreadPerFlow = props.getInt(FLOW_NUM_JOB_THREADS, DEFAULT_FLOW_NUM_JOB_TREADS);
    this.executorService = createExecutorService(this.numThreads);

    this.executorLoader = executorLoader;
    this.projectLoader = projectLoader;
    this.triggerManager = triggerManager;
    this.alerterHolder = alerterHolder;
    this.commonMetrics = commonMetrics;
    this.execMetrics = execMetrics;

    this.jobLogChunkSize = this.azkabanProps.getString("job.log.chunk.size", "5MB");
    this.jobLogNumFiles = this.azkabanProps.getInt("job.log.backup.index", 4);

    this.validateProxyUser = this.azkabanProps.getBoolean("proxy.user.lock.down", false);

    final String globalPropsPath = props.getString("executor.global.properties", null);
    if (globalPropsPath != null) {
      this.globalProps = new Props(null, globalPropsPath);
    }

    this.jobtypeManager =
        new JobTypeManager(props.getString(
            AzkabanExecutorServer.JOBTYPE_PLUGIN_DIR,
            JobTypeManager.DEFAULT_JOBTYPEPLUGINDIR), this.globalProps,
            getClass().getClassLoader());

    ProjectCacheCleaner cleaner = null;
    try {
      final double projectCacheSizePercentage =
          props.getDouble(ConfigurationKeys.PROJECT_CACHE_SIZE_PERCENTAGE);
      cleaner = new ProjectCacheCleaner(this.projectDirectory, projectCacheSizePercentage);
    } catch (final UndefinedPropertyException ex) {
    }

    // Create a flow preparer
    this.flowPreparer = new FlowPreparer(storageManager, this.executionDirectory,
        this.projectDirectory, cleaner, this.execMetrics.getProjectCacheHitRatio());

    this.execMetrics.addFlowRunnerManagerMetrics(this);

    this.cleanerThread = new CleanerThread();
    this.cleanerThread.start();

    if (this.azkabanProps.getBoolean(ConfigurationKeys.AZKABAN_POLL_MODEL, false)) {
      this.logger.info("Starting polling service.");
      this.pollingService = new PollingService(this.azkabanProps.getLong(
          ConfigurationKeys.AZKABAN_POLLING_INTERVAL_MS, DEFAULT_POLLING_INTERVAL_MS));
      this.pollingService.start();
    }
  }

  /**
   * Setting the gid bit on the execution directory forces all files/directories created within the
   * directory to be a part of the group associated with the azkaban process. Then, when users
   * create their own files, the azkaban cleanup thread can properly remove them.
   *
   * Java does not provide a standard library api for setting the gid bit because the gid bit is
   * system dependent, so the only way to set this bit is to start a new process and run the shell
   * command "chmod g+s " + execution directory name.
   *
   * Note that this should work on most Linux distributions and MacOS, but will not work on
   * Windows.
   */
  private void setgidPermissionOnExecutionDirectory() throws IOException {
    logger.info("Creating subprocess to run shell command: chmod g+s "
        + this.executionDirectory.toString());
    Runtime.getRuntime().exec("chmod g+s " + this.executionDirectory.toString());
  }

  private TrackingThreadPool createExecutorService(final int nThreads) {
    final boolean useNewThreadPool =
        this.azkabanProps.getBoolean(EXECUTOR_USE_BOUNDED_THREADPOOL_QUEUE, false);
    logger.info("useNewThreadPool: " + useNewThreadPool);

    if (useNewThreadPool) {
      this.threadPoolQueueSize =
          this.azkabanProps.getInt(EXECUTOR_THREADPOOL_WORKQUEUE_SIZE, nThreads);
      logger.info("workQueueSize: " + this.threadPoolQueueSize);

      // using a bounded queue for the work queue. The default rejection policy
      // {@ThreadPoolExecutor.AbortPolicy} is used
      final TrackingThreadPool executor =
          new TrackingThreadPool(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
              new LinkedBlockingQueue<>(this.threadPoolQueueSize), this);

      return executor;
    } else {
      // the old way of using unbounded task queue.
      // if the running tasks are taking a long time or stuck, this queue
      // will be very very long.
      return new TrackingThreadPool(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
          new LinkedBlockingQueue<>(), this);
    }
  }

  public void setExecutorActive(final boolean isActive, final String host, final int port)
      throws ExecutorManagerException, InterruptedException {
    final Executor executor = this.executorLoader.fetchExecutor(host, port);
    Preconditions.checkState(executor != null, "Unable to obtain self entry in DB");
    if (executor.isActive() != isActive) {
      executor.setActive(isActive);
      this.executorLoader.updateExecutor(executor);
    } else {
      logger.info(
          "Set active action ignored. Executor is already " + (isActive ? "active" : "inactive"));
    }
    this.active = isActive;
    if (!this.active) {
      // When deactivating this executor, this call will wait to return until every thread in {@link
      // #createFlowRunner} has finished. When deploying new executor, old running executor will be
      // deactivated before new one is activated and only one executor is allowed to
      // delete/hard-linking project dirs to avoid race condition described in {@link
      // FlowPreparer#setup}. So to make deactivation process block until flow preparation work
      // finishes guarantees the old executor won't access {@link FlowPreparer#setup} after
      // deactivation.
      waitUntilFlowPreparationFinish();
    }
  }

  /**
   * Wait until ongoing flow preparation work finishes.
   */
  private void waitUntilFlowPreparationFinish() throws InterruptedException {
    final Duration SLEEP_INTERVAL = Duration.ofSeconds(5);
    while (this.preparingFlowCount.intValue() != 0) {
      logger.info(this.preparingFlowCount + " flow(s) is/are still being setup before complete "
          + "deactivation.");
      Thread.sleep(SLEEP_INTERVAL.toMillis());
    }
  }

  public long getLastFlowSubmittedTime() {
    // Note: this is not thread safe and may result in providing dirty data.
    // we will provide this data as is for now and will revisit if there
    // is a string justification for change.
    return this.lastFlowSubmittedDate;
  }

  public Props getGlobalProps() {
    return this.globalProps;
  }

  public void setGlobalProps(final Props globalProps) {
    this.globalProps = globalProps;
  }

  public void submitFlow(final int execId) throws ExecutorManagerException {
    if (isAlreadyRunning(execId)) {
      return;
    }

    final FlowRunner runner = createFlowRunner(execId);

    // Check again.
    if (isAlreadyRunning(execId)) {
      return;
    }
    submitFlowRunner(runner);
  }

  private boolean isAlreadyRunning(final int execId) throws ExecutorManagerException {
    if (this.runningFlows.containsKey(execId)) {
      logger.info("Execution " + execId + " is already in running.");
      if (!this.submittedFlows.containsValue(execId)) {
        // Execution had been added to running flows but not submitted - something's wrong.
        // Return a response with error: this is a cue for the dispatcher to retry or finalize the
        // execution as failed.
        throw new ExecutorManagerException("Execution " + execId
            + " is in runningFlows but not in submittedFlows. Most likely submission had failed.");
      }
      // Already running, everything seems fine. Report as a successful submission.
      return true;
    }
    return false;
  }

  /**
   * return whether this execution has useExecutor defined. useExecutor is for running test
   * executions on inactive executor.
   */
  private boolean isExecutorSpecified(final ExecutableFlow flow) {
    return flow.getExecutionOptions().getFlowParameters()
        .containsKey(ExecutionOptions.USE_EXECUTOR);
  }

  private FlowRunner createFlowRunner(final int execId) throws ExecutorManagerException {
    final ExecutableFlow flow;
    flow = this.executorLoader.fetchExecutableFlow(execId);
    if (flow == null) {
      throw new ExecutorManagerException("Error loading flow with exec " + execId);
    }

    // Sets up the project files and execution directory.
    this.preparingFlowCount.incrementAndGet();
    // Record the time between submission, and when the flow preparation/execution starts.
    // Note that since submit time is recorded on the web server, while flow preparation is on
    // the executor, there could be some inaccuracies due to clock skew.
    this.commonMetrics.addQueueWait(System.currentTimeMillis()
        - flow.getExecutableFlow().getSubmitTime());

    final Timer.Context flowPrepTimerContext = this.execMetrics.getFlowSetupTimerContext();

    try {
      if (this.active || isExecutorSpecified(flow)) {
        this.flowPreparer.setup(flow);
      } else {
        // Unset the executor.
        this.executorLoader.unsetExecutorIdForExecution(execId);
        throw new ExecutorManagerException("executor became inactive before setting up the "
            + "flow " + execId);
      }
    } finally {
      this.preparingFlowCount.decrementAndGet();
      flowPrepTimerContext.stop();
    }

    // Setup flow runner
    FlowWatcher watcher = null;
    final ExecutionOptions options = flow.getExecutionOptions();
    if (options.getPipelineExecutionId() != null) {
      final Integer pipelineExecId = options.getPipelineExecutionId();
      final FlowRunner runner = this.runningFlows.get(pipelineExecId);

      if (runner != null) {
        watcher = new LocalFlowWatcher(runner);
      } else {
        // also ends up here if execute is called with pipelineExecId that's not running any more
        // (it could have just finished, for example)
        watcher = new RemoteFlowWatcher(pipelineExecId, this.executorLoader);
      }
    }

    int numJobThreads = this.numJobThreadPerFlow;
    if (options.getFlowParameters().containsKey(FLOW_NUM_JOB_THREADS)) {
      try {
        final int numJobs =
            Integer.valueOf(options.getFlowParameters().get(
                FLOW_NUM_JOB_THREADS));
        if (numJobs > 0 && (numJobs <= numJobThreads || ProjectWhitelist
            .isProjectWhitelisted(flow.getProjectId(),
                WhitelistType.NumJobPerFlow))) {
          numJobThreads = numJobs;
        }
      } catch (final Exception e) {
        throw new ExecutorManagerException(
            "Failed to set the number of job threads "
                + options.getFlowParameters().get(FLOW_NUM_JOB_THREADS)
                + " for flow " + execId, e);
      }
    }

    final FlowRunner runner =
        new FlowRunner(flow, this.executorLoader, this.projectLoader, this.jobtypeManager,
            this.azkabanProps, this.azkabanEventReporter, this.alerterHolder);
    runner.setFlowWatcher(watcher)
        .setJobLogSettings(this.jobLogChunkSize, this.jobLogNumFiles)
        .setValidateProxyUser(this.validateProxyUser)
        .setNumJobThreads(numJobThreads).addListener(this);

    configureFlowLevelMetrics(runner);
    return runner;
  }

  private void submitFlowRunner(final FlowRunner runner) throws ExecutorManagerException {
    this.runningFlows.put(runner.getExecutionId(), runner);
    try {
      // The executorService
already has a queue. // The submit method below actually returns an instance of FutureTask, // which implements interface RunnableFuture, which extends both // Runnable and Future interfaces final Future<?> future = this.executorService.submit(runner); // keep track of this future this.submittedFlows.put(future, runner.getExecutionId()); // update the last submitted time. this.lastFlowSubmittedDate = System.currentTimeMillis(); } catch (final RejectedExecutionException re) { this.runningFlows.remove(runner.getExecutionId()); final StringBuffer errorMsg = new StringBuffer( "Azkaban executor can''t execute any more flows. "); if (this.executorService.isShutdown()) { errorMsg.append("The executor is being shut down."); } throw new ExecutorManagerException(errorMsg.toString(), re); } } /** * Configure Azkaban metrics tracking for a new flowRunner instance */ private void configureFlowLevelMetrics(final FlowRunner flowRunner) { logger.info("Configuring Azkaban metrics tracking for flow runner object"); if (MetricReportManager.isAvailable()) { final MetricReportManager metricManager = MetricReportManager.getInstance(); // Adding NumFailedFlow Metric listener flowRunner.addListener((NumFailedFlowMetric) metricManager .getMetricFromName(NumFailedFlowMetric.NUM_FAILED_FLOW_METRIC_NAME)); } } public void cancelJobBySLA(final int execId, final String jobId) throws ExecutorManagerException { final FlowRunner flowRunner = this.runningFlows.get(execId); if (flowRunner == null) { throw new ExecutorManagerException("Execution " + execId + " is not running."); } for (final JobRunner jobRunner : flowRunner.getActiveJobRunners()) { if (jobRunner.getJobId().equals(jobId)) { logger.info("Killing job " + jobId + " in execution " + execId + " by SLA"); jobRunner.killBySLA(); break; } } } public void cancelFlow(final int execId, final String user) throws ExecutorManagerException { final FlowRunner runner = this.runningFlows.get(execId); if (runner == null) { throw new 
ExecutorManagerException("Execution " + execId + " is not running."); } runner.kill(user); } public void pauseFlow(final int execId, final String user) throws ExecutorManagerException { final FlowRunner runner = this.runningFlows.get(execId); if (runner == null) { throw new ExecutorManagerException("Execution " + execId + " is not running."); } runner.pause(user); } public void resumeFlow(final int execId, final String user) throws ExecutorManagerException { final FlowRunner runner = this.runningFlows.get(execId); if (runner == null) { throw new ExecutorManagerException("Execution " + execId + " is not running."); } runner.resume(user); } public void retryFailures(final int execId, final String user) throws ExecutorManagerException { final FlowRunner runner = this.runningFlows.get(execId); if (runner == null) { throw new ExecutorManagerException("Execution " + execId + " is not running."); } runner.retryFailures(user); } public ExecutableFlow getExecutableFlow(final int execId) { final FlowRunner runner = this.runningFlows.get(execId); if (runner == null) { return this.recentlyFinishedFlows.get(execId); } return runner.getExecutableFlow(); } @Override public void handleEvent(final Event event) { if (event.getType() == EventType.FLOW_FINISHED || event.getType() == EventType.FLOW_STARTED) { final FlowRunner flowRunner = (FlowRunner) event.getRunner(); final ExecutableFlow flow = flowRunner.getExecutableFlow(); if (event.getType() == EventType.FLOW_FINISHED) { this.recentlyFinishedFlows.put(flow.getExecutionId(), flow); logger.info("Flow " + flow.getExecutionId() + " is finished. 
Adding it to recently finished flows list."); this.runningFlows.remove(flow.getExecutionId()); } else if (event.getType() == EventType.FLOW_STARTED) { // add flow level SLA checker this.triggerManager .addTrigger(flow.getExecutionId(), SlaOption.getFlowLevelSLAOptions(flow)); } } } public LogData readFlowLogs(final int execId, final int startByte, final int length) throws ExecutorManagerException { final FlowRunner runner = this.runningFlows.get(execId); if (runner == null) { throw new ExecutorManagerException("Running flow " + execId + " not found."); } final File dir = runner.getExecutionDir(); if (dir != null && dir.exists()) { try { synchronized (this.executionDirDeletionSync) { if (!dir.exists()) { throw new ExecutorManagerException( "Execution dir file doesn''t exist. Probably has beend deleted"); } final File logFile = runner.getFlowLogFile(); if (logFile != null && logFile.exists()) { return FileIOUtils.readUtf8File(logFile, startByte, length); } else { throw new ExecutorManagerException("Flow log file doesn''t exist."); } } } catch (final IOException e) { throw new ExecutorManagerException(e); } } throw new ExecutorManagerException( "Error reading file. Log directory doesn''t exist."); } public LogData readJobLogs(final int execId, final String jobId, final int attempt, final int startByte, final int length) throws ExecutorManagerException { final FlowRunner runner = this.runningFlows.get(execId); if (runner == null) { throw new ExecutorManagerException("Running flow " + execId + " not found."); } final File dir = runner.getExecutionDir(); if (dir != null && dir.exists()) { try { synchronized (this.executionDirDeletionSync) { if (!dir.exists()) { throw new ExecutorManagerException( "Execution dir file doesn''t exist. 
Probably has beend deleted"); } final File logFile = runner.getJobLogFile(jobId, attempt); if (logFile != null && logFile.exists()) { return FileIOUtils.readUtf8File(logFile, startByte, length); } else { throw new ExecutorManagerException("Job log file doesn''t exist."); } } } catch (final IOException e) { throw new ExecutorManagerException(e); } } throw new ExecutorManagerException( "Error reading file. Log directory doesn''t exist."); } public List<Object> readJobAttachments(final int execId, final String jobId, final int attempt) throws ExecutorManagerException { final FlowRunner runner = this.runningFlows.get(execId); if (runner == null) { throw new ExecutorManagerException("Running flow " + execId + " not found."); } final File dir = runner.getExecutionDir(); if (dir == null || !dir.exists()) { throw new ExecutorManagerException( "Error reading file. Log directory doesn''t exist."); } try { synchronized (this.executionDirDeletionSync) { if (!dir.exists()) { throw new ExecutorManagerException( "Execution dir file doesn''t exist. Probably has beend deleted"); } final File attachmentFile = runner.getJobAttachmentFile(jobId, attempt); if (attachmentFile == null || !attachmentFile.exists()) { return null; } final List<Object> jobAttachments = (ArrayList<Object>) JSONUtils.parseJSONFromFile(attachmentFile); return jobAttachments; } } catch (final IOException e) { throw new ExecutorManagerException(e); } } public JobMetaData readJobMetaData(final int execId, final String jobId, final int attempt, final int startByte, final int length) throws ExecutorManagerException { final FlowRunner runner = this.runningFlows.get(execId); if (runner == null) { throw new ExecutorManagerException("Running flow " + execId + " not found."); } final File dir = runner.getExecutionDir(); if (dir != null && dir.exists()) { try { synchronized (this.executionDirDeletionSync) { if (!dir.exists()) { throw new ExecutorManagerException( "Execution dir file doesn''t exist. 
Probably has beend deleted"); } final File metaDataFile = runner.getJobMetaDataFile(jobId, attempt); if (metaDataFile != null && metaDataFile.exists()) { return FileIOUtils.readUtf8MetaDataFile(metaDataFile, startByte, length); } else { throw new ExecutorManagerException("Job log file doesn''t exist."); } } } catch (final IOException e) { throw new ExecutorManagerException(e); } } throw new ExecutorManagerException( "Error reading file. Log directory doesn''t exist."); } public long getLastCleanerThreadCheckTime() { return this.lastCleanerThreadCheckTime; } public boolean isCleanerThreadActive() { return this.cleanerThread.isAlive(); } public State getCleanerThreadState() { return this.cleanerThread.getState(); } public boolean isExecutorThreadPoolShutdown() { return this.executorService.isShutdown(); } public int getNumQueuedFlows() { return this.executorService.getQueue().size(); } public int getNumRunningFlows() { return this.executorService.getActiveCount(); } public String getRunningFlowIds() { // The in progress tasks are actually of type FutureTask final Set<Runnable> inProgressTasks = this.executorService.getInProgressTasks(); final List<Integer> runningFlowIds = new ArrayList<>(inProgressTasks.size()); for (final Runnable task : inProgressTasks) { // add casting here to ensure it matches the expected type in // submittedFlows final Integer execId = this.submittedFlows.get((Future<?>) task); if (execId != null) { runningFlowIds.add(execId); } else { logger.warn("getRunningFlowIds: got null execId for task: " + task); } } Collections.sort(runningFlowIds); return runningFlowIds.toString(); } public String getQueuedFlowIds() { final List<Integer> flowIdList = new ArrayList<>(this.executorService.getQueue().size()); for (final Runnable task : this.executorService.getQueue()) { final Integer execId = this.submittedFlows.get(task); if (execId != null) { flowIdList.add(execId); } else { logger .warn("getQueuedFlowIds: got null execId for queuedTask: " + task); } } 
Collections.sort(flowIdList); return flowIdList.toString(); } public int getMaxNumRunningFlows() { return this.numThreads; } public int getTheadPoolQueueSize() { return this.threadPoolQueueSize; } public void reloadJobTypePlugins() throws JobTypeManagerException { this.jobtypeManager.loadPlugins(); } public int getTotalNumExecutedFlows() { return this.executorService.getTotalTasks(); } @Override public void beforeExecute(final Runnable r) { } @Override public void afterExecute(final Runnable r) { this.submittedFlows.remove(r); } /** * This shuts down the flow runner. The call is blocking and awaits execution of all jobs. */ public void shutdown() { logger.warn("Shutting down FlowRunnerManager..."); if (this.azkabanProps.getBoolean(ConfigurationKeys.AZKABAN_POLL_MODEL, false)) { this.pollingService.shutdown(); } this.executorService.shutdown(); boolean result = false; while (!result) { logger.info("Awaiting Shutdown. # of executing flows: " + getNumRunningFlows()); try { result = this.executorService.awaitTermination(1, TimeUnit.MINUTES); } catch (final InterruptedException e) { logger.error(e); } } logger.warn("Shutdown FlowRunnerManager complete."); } /** * This attempts shuts down the flow runner immediately (unsafe). This doesn''t wait for jobs to * finish but interrupts all threads. */ public void shutdownNow() { logger.warn("Shutting down FlowRunnerManager now..."); if (this.azkabanProps.getBoolean(ConfigurationKeys.AZKABAN_POLL_MODEL, false)) { this.pollingService.shutdown(); } this.executorService.shutdownNow(); this.triggerManager.shutdown(); } /** * Deleting old execution directory to free disk space. 
*/ public void deleteExecutionDirectory() { logger.warn("Deleting execution dir: " + this.executionDirectory.getAbsolutePath()); try { FileUtils.deleteDirectory(this.executionDirectory); } catch (final IOException e) { logger.error(e); } } private Set<Pair<Integer, Integer>> getActiveProjectVersions() { final Set<Pair<Integer, Integer>> activeProjectVersions = new HashSet<>(); for (final FlowRunner runner : FlowRunnerManager.this.runningFlows.values()) { final ExecutableFlow flow = runner.getExecutableFlow(); activeProjectVersions.add(new Pair<>(flow .getProjectId(), flow.getVersion())); } return activeProjectVersions; } private class CleanerThread extends Thread { // Every hour, clean execution dir. private static final long EXECUTION_DIR_CLEAN_INTERVAL_MS = 60 * 60 * 1000; // Every 2 mins clean the recently finished list private static final long RECENTLY_FINISHED_INTERVAL_MS = 2 * 60 * 1000; // Every 5 mins kill flows running longer than allowed max running time private static final long LONG_RUNNING_FLOW_KILLING_INTERVAL_MS = 5 * 60 * 1000; private final long flowMaxRunningTimeInMins = FlowRunnerManager.this.azkabanProps.getInt( Constants.ConfigurationKeys.AZKABAN_MAX_FLOW_RUNNING_MINS, -1); private boolean shutdown = false; private long lastExecutionDirCleanTime = -1; private long lastRecentlyFinishedCleanTime = -1; private long lastLongRunningFlowCleanTime = -1; public CleanerThread() { this.setName("FlowRunnerManager-Cleaner-Thread"); setDaemon(true); } public void shutdown() { this.shutdown = true; this.interrupt(); } private boolean isFlowRunningLongerThan(final ExecutableFlow flow, final long flowMaxRunningTimeInMins) { final Set<Status> nonFinishingStatusAfterFlowStarts = new HashSet<>( Arrays.asList(Status.RUNNING, Status.QUEUED, Status.PAUSED, Status.FAILED_FINISHING)); return nonFinishingStatusAfterFlowStarts.contains(flow.getStatus()) && flow.getStartTime() > 0 && TimeUnit.MILLISECONDS.toMinutes(System.currentTimeMillis() - flow.getStartTime()) >= 
flowMaxRunningTimeInMins; } @Override public void run() { while (!this.shutdown) { synchronized (this) { try { FlowRunnerManager.this.lastCleanerThreadCheckTime = System.currentTimeMillis(); FlowRunnerManager.logger.info("# of executing flows: " + getNumRunningFlows()); // Cleanup old stuff. final long currentTime = System.currentTimeMillis(); if (currentTime - RECENTLY_FINISHED_INTERVAL_MS > this.lastRecentlyFinishedCleanTime) { FlowRunnerManager.logger.info("Cleaning recently finished"); cleanRecentlyFinished(); this.lastRecentlyFinishedCleanTime = currentTime; } if (currentTime - EXECUTION_DIR_CLEAN_INTERVAL_MS > this.lastExecutionDirCleanTime) { FlowRunnerManager.logger.info("Cleaning old execution dirs"); cleanOlderExecutionDirs(); this.lastExecutionDirCleanTime = currentTime; } if (this.flowMaxRunningTimeInMins > 0 && currentTime - LONG_RUNNING_FLOW_KILLING_INTERVAL_MS > this.lastLongRunningFlowCleanTime) { FlowRunnerManager.logger .info(String.format("Killing long jobs running longer than %s mins", this.flowMaxRunningTimeInMins)); for (final FlowRunner flowRunner : FlowRunnerManager.this.runningFlows.values()) { if (isFlowRunningLongerThan(flowRunner.getExecutableFlow(), this.flowMaxRunningTimeInMins)) { FlowRunnerManager.logger.info(String .format("Killing job [id: %s, status: %s]. It has been running for %s mins", flowRunner.getExecutableFlow().getId(), flowRunner.getExecutableFlow().getStatus(), TimeUnit.MILLISECONDS .toMinutes(System.currentTimeMillis() - flowRunner.getExecutableFlow() .getStartTime()))); flowRunner.kill(); } } this.lastLongRunningFlowCleanTime = currentTime; } wait(FlowRunnerManager.RECENTLY_FINISHED_TIME_TO_LIVE); } catch (final InterruptedException e) { FlowRunnerManager.logger.info("Interrupted. 
Probably to shut down."); } catch (final Throwable t) { FlowRunnerManager.logger.warn( "Uncaught throwable, please look into why it is not caught", t); } } } } private void cleanOlderExecutionDirs() { final File dir = FlowRunnerManager.this.executionDirectory; final long pastTimeThreshold = System.currentTimeMillis() - FlowRunnerManager.this.executionDirRetention; final File[] executionDirs = dir .listFiles(path -> path.isDirectory() && path.lastModified() < pastTimeThreshold); for (final File exDir : executionDirs) { try { final int execId = Integer.valueOf(exDir.getName()); if (FlowRunnerManager.this.runningFlows.containsKey(execId) || FlowRunnerManager.this.recentlyFinishedFlows.containsKey(execId)) { continue; } } catch (final NumberFormatException e) { FlowRunnerManager.logger.error("Can''t delete exec dir " + exDir.getName() + " it is not a number"); continue; } synchronized (FlowRunnerManager.this.executionDirDeletionSync) { try { FileUtils.deleteDirectory(exDir); } catch (final IOException e) { FlowRunnerManager.logger.error("Error cleaning execution dir " + exDir.getPath(), e); } } } } private void cleanRecentlyFinished() { final long cleanupThreshold = System.currentTimeMillis() - FlowRunnerManager.RECENTLY_FINISHED_TIME_TO_LIVE; final ArrayList<Integer> executionToKill = new ArrayList<>(); for (final ExecutableFlow flow : FlowRunnerManager.this.recentlyFinishedFlows.values()) { if (flow.getEndTime() < cleanupThreshold) { executionToKill.add(flow.getExecutionId()); } } for (final Integer id : executionToKill) { FlowRunnerManager.logger.info("Cleaning execution " + id + " from recently finished flows list."); FlowRunnerManager.this.recentlyFinishedFlows.remove(id); } } } /** * Polls new executions from DB periodically and submits the executions to run on the executor. 
   */
  @SuppressWarnings("FutureReturnValueIgnored")
  private class PollingService {

    private final long pollingIntervalMs;
    private final ScheduledExecutorService scheduler;
    private int executorId = -1;

    public PollingService(final long pollingIntervalMs) {
      this.pollingIntervalMs = pollingIntervalMs;
      this.scheduler = Executors.newSingleThreadScheduledExecutor();
    }

    public void start() {
      this.scheduler.scheduleAtFixedRate(() -> pollExecution(), 0L, this.pollingIntervalMs,
          TimeUnit.MILLISECONDS);
    }

    private void pollExecution() {
      if (this.executorId == -1) {
        if (AzkabanExecutorServer.getApp() != null) {
          try {
            final Executor executor = requireNonNull(FlowRunnerManager.this.executorLoader
                .fetchExecutor(AzkabanExecutorServer.getApp().getHost(),
                    AzkabanExecutorServer.getApp().getPort()), "The executor can not be null");
            this.executorId = executor.getId();
          } catch (final Exception e) {
            FlowRunnerManager.logger.error("Failed to fetch executor ", e);
          }
        }
      } else {
        try {
          // Todo jamiesjc: check executor capacity before polling from DB
          final int execId = FlowRunnerManager.this.executorLoader
              .selectAndUpdateExecution(this.executorId, FlowRunnerManager.this.active);
          if (execId != -1) {
            FlowRunnerManager.logger.info("Submitting flow " + execId);
            submitFlow(execId);
            FlowRunnerManager.this.commonMetrics.markDispatchSuccess();
          }
        } catch (final Exception e) {
          FlowRunnerManager.logger.error("Failed to submit flow ", e);
          FlowRunnerManager.this.commonMetrics.markDispatchFail();
        }
      }
    }

    public void shutdown() {
      this.scheduler.shutdown();
      this.scheduler.shutdownNow();
    }
  }
}
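The submission bookkeeping in the listing above (register the execution, hand it to the thread pool, and roll the registration back if the pool rejects it) can be sketched in isolation. This is only a minimal illustration using the JDK, not Azkaban code: the class SubmitTrackingSketch and its methods are hypothetical names, a plain Set stands in for runningFlows, and a duplicate id is simply ignored.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the register/submit/roll-back pattern; not Azkaban code.
class SubmitTrackingSketch {
  // Ids of executions that were accepted and have not finished yet.
  private final Set<Integer> running = ConcurrentHashMap.newKeySet();
  private final ExecutorService pool = Executors.newFixedThreadPool(2);

  /** Returns true if the task was accepted, false if execId is already tracked. */
  boolean submit(final int execId, final Runnable task) {
    if (!running.add(execId)) {
      return false; // duplicate submission: nothing to do
    }
    try {
      pool.submit(() -> {
        try {
          task.run();
        } finally {
          running.remove(execId); // stop tracking once the task ends
        }
      });
      return true;
    } catch (final RejectedExecutionException e) {
      running.remove(execId); // roll back so the id is not stuck as "running"
      throw new IllegalStateException("Pool cannot accept more work", e);
    }
  }

  boolean isRunning(final int execId) {
    return running.contains(execId);
  }

  /** Stops accepting work and waits up to 5 seconds; true if all tasks finished. */
  boolean shutdownAndWait() {
    pool.shutdown();
    try {
      return pool.awaitTermination(5, TimeUnit.SECONDS);
    } catch (final InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(final String[] args) {
    final SubmitTrackingSketch sketch = new SubmitTrackingSketch();
    final CountDownLatch latch = new CountDownLatch(1);
    final Runnable blocked = () -> {
      try {
        latch.await(); // keep the task "running" until released
      } catch (final InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };
    System.out.println("first submit accepted: " + sketch.submit(1, blocked));
    System.out.println("duplicate accepted: " + sketch.submit(1, () -> { }));
    latch.countDown();
    System.out.println("all finished: " + sketch.shutdownAndWait());
    try {
      sketch.submit(2, () -> { });
    } catch (final IllegalStateException e) {
      System.out.println("rejected after shutdown, still tracked: " + sketch.isRunning(2));
    }
  }
}
```

The rollback in the catch block is the important part: without it, an id whose submission was rejected would stay registered as running and block later retries.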
httpclient4.5.5 PoolingHttpClientConnectionManager
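Today, while integrating Tencent's URL safety check API, I used the org.apache.httpcomponents httpclient-4.5.5 package. When I tested it from a main method, a request took roughly 0.6 s from send to response. Inside a Spring Boot application, however, the first request took about 0.4 s and the second only about 0.1 s. That surprised me, and I suspected the connection pool was responsible. Here is how I checked:
1. First, locally, I started the application and, before sending any request, ran jmap -histo:live 19144 > d://a.txt (19144 is the process ID; see https://www.cnblogs.com/anjijiji/p/6239395.html) to write a histogram of all live objects in the JVM to a file. I then searched the file for classes whose package name starts with org.apache.http and found none.
2. I then sent one API request with Postman and ran jmap -histo:live 19144 > d://a.txt again to overwrite the file. This time the file contained several classes under org.apache.http.impl.conn. The ones related to connection pooling are: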
org.apache.http.impl.conn.CPool
org.apache.http.impl.conn.CPoolEntry
org.apache.http.impl.conn.PoolingHttpClientConnectionManager
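PoolingHttpClientConnectionManager is a connection pool wrapper around HttpClientConnection that can serve concurrent requests from multiple threads. Its main job is to lease and release connections; requests for the same route preferentially reuse an idle persistent connection from the pool. CPoolEntry wraps a ManagedHttpClientConnection and holds the information for one specific connection. CPool is a connection pool implementation that extends AbstractConnPool; when PoolingHttpClientConnectionManager leases or releases a connection, it actually delegates to CPool.
Only a few CPool properties are described here (for reference):
routeToPool: the connection pool for each individual route
leased: connections that are currently leased (in use)
available: idle connections
pending: the queue of requests waiting to obtain a connection
maxPerRoute: the maximum number of connections for a route (cannot exceed the pool's total)
defaultMaxPerRoute: the default maximum number of connections per route (cannot exceed the pool's total)
maxTotal: the maximum number of connections in the pool
validateAfterInactivity: the idle time (ms) after which a persistent connection is re-validated before it is leased again. There is more to this setting than can be covered here; the official recommendation is to use it to check that persistent connections are still usable, rather than checking on every request.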
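To make the relationship between leased, available, maxPerRoute and maxTotal concrete, here is a toy model written with the JDK only. It is a sketch under stated assumptions, not HttpClient's CPool: the class ToyRoutePool and its methods are hypothetical, connections are just string ids, and a request that hits a limit gets null back, where the real pool would queue it as pending (and may close an idle connection on another route to make room).

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Toy model of per-route pooling; hypothetical, not the HttpClient implementation.
class ToyRoutePool {
  private final int maxTotal;
  private final int maxPerRoute;
  // route -> idle connection ids ("available")
  private final Map<String, Deque<String>> available = new HashMap<>();
  // route -> number of connections currently in use ("leased")
  private final Map<String, Integer> leased = new HashMap<>();
  private int totalLeased = 0;
  private int totalAvailable = 0;
  private int created = 0;

  ToyRoutePool(final int maxTotal, final int maxPerRoute) {
    this.maxTotal = maxTotal;
    this.maxPerRoute = maxPerRoute;
  }

  /** Returns a connection id, or null when a limit is reached. */
  synchronized String lease(final String route) {
    final Deque<String> idle = available.computeIfAbsent(route, r -> new ArrayDeque<>());
    final int leasedForRoute = leased.getOrDefault(route, 0);
    final String conn;
    if (!idle.isEmpty()) {
      conn = idle.pop(); // reuse an idle connection for the same route
      totalAvailable--;
    } else if (leasedForRoute < maxPerRoute && totalLeased + totalAvailable < maxTotal) {
      conn = route + "#" + (++created); // open a new connection
    } else {
      return null; // limit reached; the real pool would park the caller in "pending"
    }
    leased.put(route, leasedForRoute + 1);
    totalLeased++;
    return conn;
  }

  /** Returns a leased connection to the idle list of its route. */
  synchronized void release(final String route, final String conn) {
    leased.merge(route, -1, Integer::sum);
    totalLeased--;
    available.computeIfAbsent(route, r -> new ArrayDeque<>()).push(conn);
    totalAvailable++;
  }

  public static void main(final String[] args) {
    final ToyRoutePool pool = new ToyRoutePool(3, 2);
    final String first = pool.lease("a");
    final String second = pool.lease("a");
    System.out.println("third lease on route a: " + pool.lease("a"));
    pool.release("a", first);
    System.out.println("lease after release: " + pool.lease("a"));
    System.out.println("lease on route b: " + pool.lease("b"));
    System.out.println("second lease on route b: " + pool.lease("b"));
    System.out.println("still leased on route a: " + second);
  }
}
```

The second request in the Spring Boot test above corresponds to the reuse branch: the connection released after the first request is handed out again, so no new connection has to be opened.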
3. With the above in mind, here is the actual configuration code (corrections and discussion are welcome!)
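import org.apache.http.HttpEntity;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;
// Assumption: the logger is SLF4J, as is usual in a Spring Boot project.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UrlUtil {
    public static final Logger logger = LoggerFactory.getLogger(UrlUtil.class);
    // One shared client, so every call draws from the same connection pool.
    private static final CloseableHttpClient httpClient;

    static {
        PoolingHttpClientConnectionManager clientConnectionManager = new PoolingHttpClientConnectionManager();
        // Re-validate a connection that has been idle for more than 2000 ms before reusing it.
        clientConnectionManager.setValidateAfterInactivity(2000);
        // Maximum number of connections in the pool.
        clientConnectionManager.setMaxTotal(50);
        // Default maximum per route (only one route is used here, so it equals the pool size).
        clientConnectionManager.setDefaultMaxPerRoute(50);
        httpClient = HttpClients.createMinimal(clientConnectionManager);
    }

    /**
     * Checks URL safety via the Tencent URL safety API (partial code).
     *
     * @param urlString the URL to check
     * @param params the JSON request body; the original omits how it is built,
     *               so it is passed in here as an assumption
     * @throws Exception if the request fails
     */
    public static void urlSafe(String urlString, String params) throws Exception {
        CloseableHttpResponse response = null;
        try {
            HttpPost httpPost = new HttpPost("http://www.cloud.urlsec.qq.com");
            RequestConfig requestConfig = RequestConfig.custom()
                    .setConnectionRequestTimeout(1000) // timeout for obtaining a connection from the pool
                    .setConnectTimeout(1000)           // timeout for connecting to the server
                    .setSocketTimeout(1000)            // timeout while waiting for data
                    .build();
            httpPost.setConfig(requestConfig);
            HttpEntity entity = new StringEntity(params, "utf-8");
            httpPost.setEntity(entity);
            httpPost.setHeader("Content-type", "application/json");
            response = httpClient.execute(httpPost);
            // Fully consume the body so the connection can go back to the pool for reuse.
            EntityUtils.consume(response.getEntity());
        } finally {
            if (null != response) {
                response.close();
            }
        }
    }
}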
Reference: https://www.cnblogs.com/shoren/p/httpclient-leaseConnection.html