
Asynchronously Generating and Displaying Code Coverage Reports on the HttpRunnerManager Platform (ant + jacoco + jenkins + HttpRunnerManager)



In this article we walk through asynchronously generating and displaying code coverage reports on the HttpRunnerManager platform (ant + jacoco + jenkins + HttpRunnerManager). Along the way, the page also collects some related material: installing and deploying HttpRunnerManager V2.0 on CentOS, example source code for com.amazonaws.mobileconnectors.s3.transfermanager.TransferManager, a source listing of FlowRunnerManager scheduling Linux commands from Java, and httpclient4.5.5 PoolingHttpClientConnectionManager

Contents of this article:

Asynchronously Generating and Displaying Code Coverage Reports on the HttpRunnerManager Platform (ant + jacoco + jenkins + HttpRunnerManager)

Asynchronously Generating and Displaying Code Coverage Reports on the HttpRunnerManager Platform (ant + jacoco + jenkins + HttpRunnerManager)

Building a code-coverage statistics platform with ant + jacoco + jenkins + HttpRunnerManager

The idea: build with Jenkins, and use HttpRunnerManager to asynchronously refresh and display the report.

Here is my implementation flow:

 

I. Create a Jenkins node and start it

1. Create the node in the Jenkins UI

2. Build an image, then create and run the node container

II. Create a Jenkins job and run the application under JaCoCo

1. Configure the JDK and Maven

2. Create the job

3. Build with Maven and start the application with an ant + jacoco script

4. Create a build.xml to generate the report

III. The coverage report server

IV. Generating the report asynchronously from HttpRunnerManager

1. Add the async task code on the host

2. Start the worker on the host

3. Add the task code in HttpRunnerManager

4. Add a report link in HttpRunnerManager

V. Putting it to use

 

I. Create a Jenkins node and start it

       Create the Jenkins node, then start it inside a Docker container.

1. Create the node in the Jenkins UI

     a. Manage Jenkins - Manage Nodes - New Node

     b. After the node is created, a page like the following appears

          Click agent.jar to download it, then rename it to slave.jar

 2. Build an image, then create and run the node container

     a. Download the required packages

           ant, maven, jacoco, jdk: download from their official sites

           slave.jar: the agent.jar you just downloaded.

     b. Write the Dockerfile

FROM python:3.6.6-stretch
MAINTAINER test@123.com
# Environment variables
# MASTER_DOMAIN: IP and port of the Jenkins server from step 1
ENV MASTER_DOMAIN jenkins_ip:8080
# AGENT_NAME: the Jenkins node name
ENV AGENT_NAME docker-slave-jacoco-medical
# SECRET: the secret of the Jenkins node created in step 1
ENV SECRET 1222223333344444444445556666777777666
# Create directories
RUN mkdir /var/tmp/jdk /var/tmp/maven /var/tmp/slave /var/tmp/ant /var/tmp/jacoco
COPY jdk1.8.0_181.tar.gz /var/tmp/jdk/
COPY apache-maven-3.5.4-bin.tar.gz /var/tmp/maven/
COPY slave.jar /var/tmp/slave/
COPY apache-ant-1.10.5-bin.tar.gz /var/tmp/ant/
COPY jacoco-0.8.1.tar.gz /var/tmp/jacoco/
# Unpack
RUN tar -xzvf /var/tmp/jdk/jdk1.8.0_181.tar.gz -C /var/tmp/jdk \
&& cd /var/tmp/jdk && rm -rf *.tar.gz
RUN tar -xzvf /var/tmp/maven/apache-maven-3.5.4-bin.tar.gz -C /var/tmp/maven \
&& cd /var/tmp/maven && rm -rf *.tar.gz
RUN tar -xzvf /var/tmp/ant/apache-ant-1.10.5-bin.tar.gz -C /var/tmp/ant \
&& cd /var/tmp/ant && rm -rf *.tar.gz
RUN tar -xzvf /var/tmp/jacoco/jacoco-0.8.1.tar.gz -C /var/tmp/jacoco \
&& cd /var/tmp/jacoco && rm -rf *.tar.gz
# Expose the mapped port
EXPOSE 80
# Environment variables
ENV ANT_HOME /var/tmp/ant/apache-ant-1.10.5
ENV JAVA_HOME /var/tmp/jdk/jdk1.8.0_181
ENV CLASSPATH $JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
ENV CATALINA_HOME /var/tmp/maven/apache-maven-3.5.4
ENV CATALINA_BASE /var/tmp/maven/apache-maven-3.5.4
ENV PATH $PATH:$JAVA_HOME/bin:$CATALINA_HOME/lib:$CATALINA_HOME/bin:$ANT_HOME/bin
#RUN source /etc/profile
# Run the agent
RUN cd /var/tmp/jdk/jdk1.8.0_181/bin && ln -s java java_1.8
CMD java_1.8 -jar /var/tmp/slave/slave.jar -jnlpUrl "http://${MASTER_DOMAIN}/computer/${AGENT_NAME}/slave-agent.jnlp" -secret "${SECRET}"
Dockerfile contents

 

     c. Build the image

       Run the following in the directory containing the Dockerfile

docker build -t docker.io/slave_jacoco_medical:latest .

   

     d. Create and run the container

        AGENT_NAME: the Jenkins node name        SECRET: the secret shown on the node page after creation

        -e: set an environment variable     --name: container name

        -v: mounts; mount the code workspace and the report output directory onto the host

        Note: the container name here is kept identical to the Jenkins node's remote working directory name. This makes the asynchronous report generation later much simpler; more on that below.

        (By passing different AGENT_NAME and SECRET values you can create containers for different projects.)

docker run -d --restart always -p 9001:80 -e AGENT_NAME='docker-slave-jacoco-medical'  \
-e SECRET='1222223333344444444445556666777777666'  \
-v /var/jenkins_jacoco_jobs/jenkins_slave_medical_jobs:/jenkins_slave_jacoco_medical \
-v /jenkins_root/jenkins_medical_root:/root \
-v /var/ant_reports/medical_reports:/var/antreports  \
--name jenkins_slave_jacoco_medical docker.io/slave_jacoco_medical:latest

  II. Create a Jenkins job and run the application under JaCoCo

     1. Configure the JDK and Maven

           Jenkins builds the jar with Maven, so the JDK and Maven must be installed first

           Jenkins - Manage Jenkins - Global Tool Configuration

          

             

     2. Create the job

          Note: restrict the job to the node name created above

          Source code management:

         

     3. Build with Maven and start the application with an ant + jacoco script

      Build with mvn -f path/pom.xml install, or configure it as shown

      Start the application under JaCoCo: set the startup script under "Build - Execute shell"

       Once the application has started, the API is reachable at http://host_ip:9001/ (the container maps port 9001 externally)

# If the application is already running, kill the process
count=`ps -ef | grep jacocoagent | grep -v "grep" | wc -l`
if [ $count -gt 0 ]; then
ps aux|grep jacocoagent|grep -v grep|awk '{print $2}'|xargs kill -9
fi
# Delete merged.exec so coverage is reset to 0 after the build, then start the application
rm -rf /docker_slave_jacoco_medical/merged.exec \
&& java -javaagent:/var/tmp/jacoco/jacoco-0.8.1/lib/jacocoagent.jar=includes=*,output=tcpserver,port=8048,address=127.0.0.1 -jar /jenkins_slave_medical/workspace/jacoco_medical/target/mdc-service-0.0.1-SNAPSHOT.jar &
Execute shell script
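Once the job has run, a quick reachability check of the service is useful. A minimal sketch in Python (host_ip is a placeholder for the Docker host's address; any HTTP client works):

import requests

# The container's port 80 is published on host port 9001
resp = requests.get('http://host_ip:9001/')
print(resp.status_code)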

 

     4. Create a build.xml to generate the report

        Create build.xml

<?xml version="1.0" ?>
      <project name="testExec" xmlns:jacoco="antlib:org.jacoco.ant" default="jacoco">      
      <property name="jacocoantPath" value="/var/tmp/jacoco/jacoco-0.8.1/lib/jacocoant.jar"/>
      <property name="jacocoexecPath" value="./merged.exec"/>
      <property name="workspacePath" value="."/>
      <property name="reportfolderPath" value="/var/antreports/reports"/>
      <property name="server_ip" value="127.0.0.1"/>
      <property name="server_port" value="8048"/>
    
      <!-- <property name="srcApiPath" value="/docker_slave_jacoco_medical/workspace/jacoco_medical/mdc-api/src/main/java">
      <property name="classApiPPath" value="/docker_slave_jacoco_medical/workspace/jacoco_medical/mdc-api/target/classes"/> -->

      <property name="srcServicePath" value="/docker_slave_jacoco_medical/workspace/jacoco_medical/mdc-service/src/main/java"/>
      <property name="classServicePath" value="/docker_slave_jacoco_medical/workspace/jacoco_medical/mdc-service/target/classes/com"/>

      <taskdef uri="antlib:org.jacoco.ant" resource="org/jacoco/ant/antlib.xml">
        <classpath path="${jacocoantPath}" />
      </taskdef>    
    
      <target name="merge">
        <jacoco:merge destfile="merged.exec">
          <fileset dir="${workspacePath}" includes="**/*.exec"/>
        </jacoco:merge>
      </target>    
    
      <target name="dump">
        <jacoco:dump address="${server_ip}" reset="false" destfile="${jacocoexecPath}" port="${server_port}" append="true"/>
      </target>
    
      <target name="jacoco">
        <delete dir="${reportfolderPath}" />
        <mkdir dir="${reportfolderPath}" />
    
        <jacoco:report>
          <executiondata>
            <file file="${jacocoexecPath}" />
          </executiondata>
    
          <structure name="JaCoCo Report">
          <group name="order">
            <classfiles>
              <fileset dir="${classServicePath}" />
            </classfiles>
            <sourcefiles encoding="UTF-8">
              <fileset dir="${srcServicePath}" />
            </sourcefiles>
          </group>
          </structure>
          <html destdir="${reportfolderPath}" encoding="utf-8" />
        </jacoco:report>
      </target>
    </project>
build.xml

        Run the following inside the container to generate the report

ant dump -buildfile   path/build.xml
ant jacoco -buildfile   path/build.xml

III. The coverage report server

# Search for and pull the image
docker search tomcat
docker pull tomcat
# Create and run the container, mounting the webapps directory onto the host
docker run -d --restart always -p 9090:8080 -v /var/ant_reports:/usr/local/tomcat/webapps --name tomcat_ant_reports docker.io/tomcat:latest

        The code coverage report can then be viewed at http://host_ip:9090/medical_reports/reports/

       

IV. Generating the report asynchronously from HttpRunnerManager

      1. Add the async task code on the host

        celerycon.py:

from kombu import Exchange,Queue
from celery import platforms
BROKER_URL = 'amqp://user:password@rabbitmq_ip:port//'  # the same RabbitMQ instance HttpRunnerManager uses
CELERY_RESULT_BACKEND = "db+mysql://root:123456@10.8.154.123:3309/test"
#CELERY_RESULT_BACKEND = "amqp"
CELERY_QUEUES = (
Queue("default",Exchange("default"),routing_key="default"),
Queue("for_task_A",Exchange("for_task_A"),routing_key="for_task_A"),
Queue("for_task_B",Exchange("for_task_B"),routing_key="for_task_B")
)
CELERY_ROUTES = {
'ApiManager.tasks.taskA':{"queue":"for_task_A","routing_key":"for_task_A"},
'tasks.taskB':{"queue":"for_task_B","routing_key":"for_task_B"}
}
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
platforms.C_FORCE_ROOT = True
CELERYD_MAX_TASKS_PER_CHILD = 40

       tasks.py:

import os
from celery import Celery
app = Celery()
app.config_from_object("celerycon")  # load the config file
@app.task(bind=True)
def taskA(self, tag, queue='for_task_A'):
    # Run the shell script on this host and return the exit status (0 on success)
    # tag is the argument passed in (the container name)
    lac = os.system('bash -v /opt/jacoco_work/workSc/ApiManager/ant.sh ' + tag)
    return "Success!" if lac == 0 else "Failure!"
@app.task(bind=True)
def taskB(self,x,y,z):
    return x + y + z

      ant.sh:

#!/bin/bash
docker_name=$1
sudo docker exec $docker_name /bin/bash -c "ant dump -buildfile /$docker_name/build.xml && ant jacoco -buildfile /$docker_name/build.xml"

docker_name is the first argument passed when the script is invoked. build.xml sits directly in the Jenkins node's (i.e., the container's) remote working directory; because that directory shares its name with the container, the working directory can be parameterized. Any project can then generate its own report simply by passing its container name. (This is why the container name was made identical to the Jenkins node's remote working directory earlier.)

     2. Start the worker on the host

# From the workSc directory:
# start the worker
celery multi start jacoco_work -A ApiManager.tasks -l info -n worker1.%h 
# stop the worker
celery multi stop jacoco_work -A ApiManager.tasks -l info -n worker1.%h

     3. Add the task code in HttpRunnerManager

        Add to views.py:

from ApiManager.tasks import *
def refresh1(request):
    # model: the container name (e.g. jenkins_slave_jacoco_medical)
    name = request.POST.get('model')
    taskA.delay(name)
    data = {
    'msg': name + '    started',
    'id': name
    }
    return render_to_response('refresh.html', data)
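For the view to be reachable it needs a route. A minimal wiring sketch (the URL pattern below is a hypothetical choice, not HttpRunnerManager's actual routing):

# urls.py (Django 1.x-style routing, as used by HttpRunnerManager)
from django.conf.urls import url
from ApiManager import views

urlpatterns = [
    url(r'^api/refresh/$', views.refresh1),  # the POST field 'model' carries the container name
]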

        Add to tasks.py:

import os
from celery import shared_task

@shared_task
def taskA(tag):
    # This body never runs: CELERY_ROUTES sends the message to the for_task_A queue,
    # where the host-side worker defined above consumes it. The stub only registers the task name.
    lac = os.system('bash -v  /opt/jacoco_work/workSc/ApiManager/ant.sh docker_slave_jacoco_medical')
    return lac

 

     4. Add a report link in HttpRunnerManager

          Finally, use an iframe in the HttpRunnerManager platform to display the report page at http://host_ip:9090/medical_reports/reports/, for example as sketched below.
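A minimal sketch of such an embedding, written here as a standalone Django view for illustration (the view name, host_ip, and returning raw HTML are assumptions, not HttpRunnerManager's actual code):

from django.http import HttpResponse

def coverage_report(request):
    # Embed the Tomcat-hosted JaCoCo report page in an iframe
    return HttpResponse(
        '<iframe src="http://host_ip:9090/medical_reports/reports/" '
        'width="100%" height="900" frameborder="0"></iframe>'
    )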

V. Putting it to use

       Note: the build pulls code straight from GitLab, so check before building that the environment the developers configured is the test environment.

       1. In the HttpRunnerManager platform, select the environment set up with JaCoCo and run the test cases

       2. When the API tests finish, click "Submit" to generate the report asynchronously

       3. View the report from the HttpRunnerManager platform:

 

If you find any mistakes or have good suggestions, please point them out in the comments. Thanks.

Installing and Deploying HttpRunnerManager V2.0 on CentOS

 

HttpRunnerManager V2.0 is an API test automation platform built on HttpRunner. HttpRunner itself is a general-purpose testing framework for the HTTP(S) protocol:

you write and maintain a single YAML/JSON script and get automated testing, performance testing, production monitoring, continuous integration, and more.

The new version mainly adds scheduled tasks, asynchronous execution, report persistence, log retention, and broader data-type support. Questions are welcome in QQ group 628448476.

This article is a write-up of the installation steps I went through myself.

It draws on articles others have published online and adds a few new problems I ran into.

 I. Prepare the environment and packages

Required: CentOS 7.6, HttpRunnerManager V2.0, MySQL 5.7.26, Python 3.6.4, Django 2.1.2, rabbitmq-server-3.6.8 (to avoid compatibility problems, keep versions as close to these as possible; where they must differ, prefer a higher version.)

Do not use CentOS 6.x: it produces all sorts of hard-to-solve problems, and some packages' dependencies and versions are very difficult to sort out. Use a CentOS 7.x release instead.

You can search for the downloads yourself; I provide the following links.

CentOS: https://mirrors.edge.kernel.org/centos/7.6.1810/isos/x86_64/CentOS-7-x86_64-DVD-1810.iso (install tutorial: https://www.cnblogs.com/clsn/p/8338099.html#auto_id_22)

HttpRunnerManager V2.0: https://github.com/samasword/HttpRunnerManager or https://github.com/HttpRunner/HttpRunnerManager

Download instructions for the rest are given where each item is configured.

II. Modify the project settings:

1. Unpack the downloaded source, go into the HttpRunnerManager/HttpRunnerManager folder, and open settings.py for editing.

  Modify the DATABASES dict and the mail-sending account settings in HttpRunnerManager/HttpRunnerManager/settings.py

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.mysql',
        'NAME': 'HttpRunner',  # name of the database you created yourself, e.g. 'HttpRunner'
        'USER': 'root',  # login user of the installed database
        'PASSWORD': 'lcc123456',  # login password of the installed database
        'HOST': '127.0.0.1',  # IP of the server hosting the database (for a VM, the VM's IP)
        'PORT': '3306',  # listening port, default 3306 is fine
    }
}

Then modify the email settings:

EMAIL_SEND_USERNAME = '[email protected]'  # mailbox used to send scheduled-task reports; 163, QQ, Sina, enterprise QQ mail, etc. are supported; SMTP must be enabled in the mailbox settings
EMAIL_SEND_PASSWORD = 'password'  # login password of that mailbox

Finally, modify the worker-related settings:

djcelery.setup_loader()
CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = 'Asia/Shanghai'
BROKER_URL = 'amqp://guest:guest@127.0.0.1:5672//'  # 127.0.0.1 is the IP of the server running rabbitmq-server (e.g. my VM's 192.168.31.16; the port stays unchanged)
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

CELERY_TASK_RESULT_EXPIRES = 7200  # expiry time for celery task results
CELERYD_CONCURRENCY = 10  # worker concurrency, the same number the -c command-line option sets; adjust to your server, default 10
CELERYD_MAX_TASKS_PER_CHILD = 100  # a worker child is recycled after this many tasks; I suggest a fairly large number, default 100

Also edit requirements.txt in the HttpRunnerManager directory:

change Django == 2.0.3 to Django == 2.1.2, and delete mysqlclient == 1.3.12,

because we will install Django 2.1.2 below, and mysqlclient has to be installed manually.

2. Install the database (recommended: cd to /usr/local and install software under /usr/local)

In /usr/local you can download it directly with: wget http://dev.mysql.com/get/mysql57-community-release-el7-8.noarch.rpm (if the wget command is missing, yum install wget, or look up how to install wget on Linux)

For the concrete steps, this blog post is in my view the most detailed and sensible reference: https://www.cnblogs.com/dengshihuang/p/8029372.html

3. Install Python (again, recommended under /usr/local)

  In /usr/local, download with: wget https://www.python.org/ftp/python/3.6.4/Python-3.6.4.tgz , then unpack and install

  Run the following commands step by step

tar -xvf Python-3.6.4.tgz
cd Python-3.6.4
./configure  --with-ssl
yum install libffi-devel -y
make all && make install



make clean
make distclean
/usr/local/bin/python3 -V

(If make install fails with zipimport.ZipImportError: can't decompress data; zlib not available,
make: *** [install] Error 1)

fix it with the following command:

yum install zlib*

answering y at the prompt: Is this ok [y/d/N]: y

Running the version command above should now print 3.6.4

Next, create symlinks so the system's default python command points at the new version; run:
mv /usr/bin/python /usr/bin/python2.7 
ln -s /usr/local/bin/python3.6 /usr/bin/python
Then run python -V to see the current default Python version

4. Install Django directly with pip3:

  pip3 install Django==2.1.2 ; afterwards you can verify the install by starting python, importing django, and printing the version, e.g. as below.
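A quick check in the interpreter (django.get_version() is Django's standard version helper):

import django

print(django.get_version())  # should print 2.1.2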

  


  If the installation fails, try the following:

wget https://www.djangoproject.com/download/2.1.2/tarball/ --no-check-certificate
tar xzvf django-2.1.2.tar.gz    # unpack the download
cd django-2.1.2               # enter the Django directory
python setup.py install       # run the install

  5. Install RabbitMQ

    See: https://www.cnblogs.com/fxcity/p/11041994.html or https://testerhome.com/notes/2164

  6. Install mysqlclient:

    ln -s /usr/local/mysql/bin/mysql_config /usr/local/bin/mysql_config

    pip install mysqlclient   or   yum install mysqlclient

    

  Note: the steps above are not guaranteed to be problem-free. If you run into an issue, search for it yourself, leave a comment, or email me at [email protected]

      Copy one or two lines of the error message into Baidu or Google to find a fix.

III. Build and run

  1. After installing RabbitMQ, confirm the service is started; both the MySQL service and the RabbitMQ service must be running.

  2. On CentOS, enter the HttpRunnerManager source folder and confirm that requirements.txt is present in the current folder.

    Then run: pip install -r requirements.txt

  3. Run: python manage.py makemigrations apimanager

  4. Create a superuser for managing the database in the admin backend: python manage.py createsuperuser, entering a username, password, and email at the prompts. Skip this step if you don't need it.

  5. Start the service: python manage.py runserver 0.0.0.0:8000 (the IP must be 0.0.0.0; port 8000 can be changed, and the port must be included in the URL). A successful start looks like the screenshot below.

  Note: the 0.0.0.0 IP is reportedly so that other hosts on the LAN can access the service; the address to visit is the CentOS IP plus the port set when starting the service.

  For example, my CentOS IP is 192.168.31.16


  Relevant URLs: http://192.168.31.16:8000/api/login/ (login)

  


       http://192.168.31.16:8000/api/register/ (register)

       http://192.168.31.16:8000/admin/ (admin backend)

  One caveat: even with the service started normally on the CentOS VM, the host machine's browser still could not reach it; I had to create a Windows VM and access the site from inside that VM.

  The cause is presumably a LAN networking issue; I have not found a solution yet and will post updates here or in the comments. Corrections and suggested fixes are welcome.

    

  

IV. Additional notes

1. For how to actually use this project, see the community post: https://testerhome.com/topics/13295 (hands-on)

2. Reference links:

1、https://testerhome.com/topics/18498

2、https://www.cnblogs.com/dengshihuang/p/8029372.html

3、https://testerhome.com/notes/2164

Example source code for com.amazonaws.mobileconnectors.s3.transfermanager.TransferManager

Project: upman    File: S3Uploader.java
public static void uploadToS3(TransferManager transferManager,final File file,final FileProgressListener fileProgressListener) throws InterruptedException {
    PutObjectRequest putObjectRequest = new PutObjectRequest("car-classifieds",file.getName(),file)
            .withCannedAcl(CannedAccessControlList.PublicRead);
    final Upload upload = transferManager.upload(putObjectRequest);
    upload.addProgressListener(new ProgressListener() {
        @Override
        public void progressChanged(ProgressEvent progressEvent) {
            fileProgressListener.onProgressChanged(progressEvent.getBytesTransferred());
            if (progressEvent.getEventCode() == ProgressEvent.COMPLETED_EVENT_CODE) {
                fileProgressListener.onCompleteUpload();
            }
            if (progressEvent.getEventCode() == com.amazonaws.event.ProgressEvent.STARTED_EVENT_CODE) {
                fileProgressListener.onStartUpload();
            }
            if (progressEvent.getEventCode() == com.amazonaws.event.ProgressEvent.FAILED_EVENT_CODE) {
                fileProgressListener.onFailedUpload();
            }
        }
    });
}
Project: snake-game-aws    File: AWSClientManager.java
public static void init(Context context)
{
       provider = new CognitoCachingCredentialsProvider(context,AWS_ACCOUNT_ID,COGNITO_POOL_ID,COGNITO_ROLE_UNAUTH,COGNITO_ROLE_AUTH,Regions.US_EAST_1);

       //initialize the clients
       cognitosync = new CognitoSyncManager(context,Regions.US_EAST_1,provider);        
    manager = new TransferManager(provider);
    ddb = new AmazonDynamoDBClient(provider);
    //ddbmapper = new DynamoDBMapper(ddb);
    analytics = MobileAnalyticsManager.getOrCreateInstance(context,MOBILE_ANALYTICS_APP_ID,provider);
    kinesis = new KinesisRecorder(context.getDir(KINESIS_DIRECTORY_NAME,0),provider);
    lambda = new LambdaInvokerFactory(context,Regions.US_WEST_2,provider);
}
Project: upman    File: S3Uploader.java
public static void uploadToS3(TransferManager transferManager,final File file) throws InterruptedException {
    PutObjectRequest putObjectRequest = new PutObjectRequest("car-classifieds",file)
            .withCannedAcl(CannedAccessControlList.PublicRead);
    final Upload upload = transferManager.upload(putObjectRequest);
    upload.addProgressListener(new ProgressListener() {
        @Override
        public void progressChanged(ProgressEvent progressEvent) {
            if (progressEvent.getEventCode() == ProgressEvent.COMPLETED_EVENT_CODE) {

            }
        }
    });
}
Project: upman    File: S3Uploader.java
public static TransferManager getTransferManager(Context context) {
    CognitoCachingCredentialsProvider credentialsProvider = new CognitoCachingCredentialsProvider(
            context,"us-east-1:c2432cb5-55e1-42ec-a12f-4be2bc8abd9a",Regions.US_EAST_1
    );
    TransferManager transferManager = new TransferManager(credentialsProvider);
    return transferManager;
}
Project: kickflip-android-sdk    File: S3BroadcastManager.java
public S3BroadcastManager(Broadcaster broadcaster,AWSCredentials creds) {

        // XXX - Need to determine what's going wrong with MD5 computation
        System.setProperty("com.amazonaws.services.s3.disableGetObjectMD5Validation","true");

        mTransferManager = new TransferManager(creds);
        mBroadcaster = broadcaster;
        mQueue = new LinkedBlockingQueue<>();
        mInterceptors = new HashSet<>();
        new Thread(this).start();
    }
Project: aws-mobile-self-paced-labs-samples    File: AWSClientManager.java
public static void init(Context context)
{
       provider = new CognitoCachingCredentialsProvider(context,Regions.US_EAST_1);

       //initialize the Cognito Sync Client

       //initialize the Other Clients
    manager = new TransferManager(provider);
    analytics = MobileAnalyticsManager.getOrCreateInstance(context,"App_ID_Here",provider);

}
Project: snake-game-aws    File: AWSClientManager.java
public static TransferManager getManager() {
    if (manager == null) {
           throw new IllegalStateException("client not initialized yet");
       }
    return manager;
}
Project: aws-mobile-self-paced-labs-samples    File: AWSClientManager.java
public static TransferManager getManager() {
    if (manager == null) {
           throw new IllegalStateException("client not initialized yet");
       }
    return manager;
}

FlowRunnerManager: Java source code analysis of scheduling Linux commands

/*
 * Copyright 2012 LinkedIn Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package azkaban.execapp;

import static java.util.Objects.requireNonNull;

import azkaban.Constants;
import azkaban.Constants.ConfigurationKeys;
import azkaban.event.Event;
import azkaban.event.EventListener;
import azkaban.execapp.event.FlowWatcher;
import azkaban.execapp.event.LocalFlowWatcher;
import azkaban.execapp.event.RemoteFlowWatcher;
import azkaban.execapp.metric.NumFailedFlowMetric;
import azkaban.executor.AlerterHolder;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.Executor;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.Status;
import azkaban.jobtype.JobTypeManager;
import azkaban.jobtype.JobTypeManagerException;
import azkaban.metric.MetricReportManager;
import azkaban.metrics.CommonMetrics;
import azkaban.project.ProjectLoader;
import azkaban.project.ProjectWhitelist;
import azkaban.project.ProjectWhitelist.WhitelistType;
import azkaban.sla.SlaOption;
import azkaban.spi.AzkabanEventReporter;
import azkaban.spi.EventType;
import azkaban.storage.StorageManager;
import azkaban.utils.FileIOUtils;
import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;
import azkaban.utils.ThreadPoolExecutingListener;
import azkaban.utils.TrackingThreadPool;
import azkaban.utils.UndefinedPropertyException;
import com.codahale.metrics.Timer;
import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.lang.Thread.State;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;

/**
 * Execution manager for the server side execution.
 *
 * When a flow is submitted to FlowRunnerManager, it is the {@link Status#PREPARING} status. When a
 * flow is about to be executed by FlowRunner, its status is updated to {@link Status#RUNNING}
 *
 * Two main data structures are used in this class to maintain flows.
 *
 * runningFlows: this is used as a bookkeeping for submitted flows in FlowRunnerManager. It has
 * nothing to do with the executor service that is used to execute the flows. This bookkeeping is
 * used at the time of canceling or killing a flow. The flows in this data structure is removed in
 * the handleEvent method.
 *
 * submittedFlows: this is used to keep track the execution of the flows, so it has the mapping
 * between a Future<?> and an execution id. This would allow us to find out the execution ids of the
 * flows that are in the Status.PREPARING status. The entries in this map is removed once the flow
 * execution is completed.
 */
@Singleton
public class FlowRunnerManager implements EventListener,
    ThreadPoolExecutingListener {

  private static final Logger logger = Logger.getLogger(FlowRunnerManager.class);

  private static final String EXECUTOR_USE_BOUNDED_THREADPOOL_QUEUE = "executor.use.bounded.threadpool.queue";
  private static final String EXECUTOR_THREADPOOL_WORKQUEUE_SIZE = "executor.threadpool.workqueue.size";
  private static final String EXECUTOR_FLOW_THREADS = "executor.flow.threads";
  private static final String FLOW_NUM_JOB_THREADS = "flow.num.job.threads";

  // recently finished secs to clean up. 1 minute
  private static final int RECENTLY_FINISHED_TIME_TO_LIVE = 60 * 1000;

  private static final int DEFAULT_NUM_EXECUTING_FLOWS = 30;
  private static final int DEFAULT_FLOW_NUM_JOB_TREADS = 10;
  private static final int DEFAULT_POLLING_INTERVAL_MS = 1000;

  // this map is used to store the flows that have been submitted to
  // the executor service. Once a flow has been submitted, it is either
  // in the queue waiting to be executed or in executing state.
  private final Map<Future<?>, Integer> submittedFlows = new ConcurrentHashMap<>();
  private final Map<Integer, FlowRunner> runningFlows = new ConcurrentHashMap<>();
  // keep track of the number of flow being setup({@link createFlowRunner()})
  private final AtomicInteger preparingFlowCount = new AtomicInteger(0);
  private final Map<Integer, ExecutableFlow> recentlyFinishedFlows = new ConcurrentHashMap<>();
  private final TrackingThreadPool executorService;
  private final CleanerThread cleanerThread;
  private final ExecutorLoader executorLoader;
  private final ProjectLoader projectLoader;
  private final JobTypeManager jobtypeManager;
  private final FlowPreparer flowPreparer;
  private final TriggerManager triggerManager;
  private final AlerterHolder alerterHolder;
  private final AzkabanEventReporter azkabanEventReporter;
  private final Props azkabanProps;
  private final File executionDirectory;
  private final File projectDirectory;
  private final Object executionDirDeletionSync = new Object();
  private final CommonMetrics commonMetrics;
  private final ExecMetrics execMetrics;

  private final int numThreads;
  private final int numJobThreadPerFlow;
  // We want to limit the log sizes to about 20 megs
  private final String jobLogChunkSize;
  private final int jobLogNumFiles;
  // If true, jobs will validate proxy user against a list of valid proxy users.
  private final boolean validateProxyUser;
  private PollingService pollingService;
  private int threadPoolQueueSize = -1;
  private Props globalProps;
  private long lastCleanerThreadCheckTime = -1;
  private long executionDirRetention = 1 * 24 * 60 * 60 * 1000; // 1 Day
  // date time of the last flow submitted.
  private long lastFlowSubmittedDate = 0;
  // Indicate if the executor is set to active.
  private volatile boolean active;

  @Inject
  public FlowRunnerManager(final Props props,
      final ExecutorLoader executorLoader,
      final ProjectLoader projectLoader,
      final StorageManager storageManager,
      final TriggerManager triggerManager,
      final AlerterHolder alerterHolder,
      final CommonMetrics commonMetrics,
      final ExecMetrics execMetrics,
      @Nullable final AzkabanEventReporter azkabanEventReporter) throws IOException {
    this.azkabanProps = props;

    this.executionDirRetention = props.getLong("execution.dir.retention",
        this.executionDirRetention);
    this.azkabanEventReporter = azkabanEventReporter;
    logger.info("Execution dir retention set to " + this.executionDirRetention + " ms");

    this.executionDirectory = new File(props.getString("azkaban.execution.dir", "executions"));
    if (!this.executionDirectory.exists()) {
      this.executionDirectory.mkdirs();
      setgidPermissionOnExecutionDirectory();
    }
    this.projectDirectory = new File(props.getString("azkaban.project.dir", "projects"));
    if (!this.projectDirectory.exists()) {
      this.projectDirectory.mkdirs();
    }

    // azkaban.temp.dir
    this.numThreads = props.getInt(EXECUTOR_FLOW_THREADS, DEFAULT_NUM_EXECUTING_FLOWS);
    this.numJobThreadPerFlow = props.getInt(FLOW_NUM_JOB_THREADS, DEFAULT_FLOW_NUM_JOB_TREADS);
    this.executorService = createExecutorService(this.numThreads);

    this.executorLoader = executorLoader;
    this.projectLoader = projectLoader;
    this.triggerManager = triggerManager;
    this.alerterHolder = alerterHolder;
    this.commonMetrics = commonMetrics;
    this.execMetrics = execMetrics;

    this.jobLogChunkSize = this.azkabanProps.getString("job.log.chunk.size", "5MB");
    this.jobLogNumFiles = this.azkabanProps.getInt("job.log.backup.index", 4);

    this.validateProxyUser = this.azkabanProps.getBoolean("proxy.user.lock.down", false);

    final String globalPropsPath = props.getString("executor.global.properties", null);
    if (globalPropsPath != null) {
      this.globalProps = new Props(null, globalPropsPath);
    }

    this.jobtypeManager =
        new JobTypeManager(props.getString(
            AzkabanExecutorServer.JOBTYPE_PLUGIN_DIR,
            JobTypeManager.DEFAULT_JOBTYPEPLUGINDIR), this.globalProps,
            getClass().getClassLoader());

    ProjectCacheCleaner cleaner = null;
    try {
      final double projectCacheSizePercentage =
          props.getDouble(ConfigurationKeys.PROJECT_CACHE_SIZE_PERCENTAGE);
      cleaner = new ProjectCacheCleaner(this.projectDirectory, projectCacheSizePercentage);
    } catch (final UndefinedPropertyException ex) {
    }

    // Create a flow preparer
    this.flowPreparer = new FlowPreparer(storageManager, this.executionDirectory,
        this.projectDirectory, cleaner, this.execMetrics.getProjectCacheHitRatio());

    this.execMetrics.addFlowRunnerManagerMetrics(this);

    this.cleanerThread = new CleanerThread();
    this.cleanerThread.start();

    if (this.azkabanProps.getBoolean(ConfigurationKeys.AZKABAN_POLL_MODEL, false)) {
      this.logger.info("Starting polling service.");
      this.pollingService = new PollingService(this.azkabanProps.getLong
          (ConfigurationKeys.AZKABAN_POLLING_INTERVAL_MS, DEFAULT_POLLING_INTERVAL_MS));
      this.pollingService.start();
    }
  }

  /**
   * Setting the gid bit on the execution directory forces all files/directories created within the
   * directory to be a part of the group associated with the azkaban process. Then, when users
   * create their own files, the azkaban cleanup thread can properly remove them.
   *
   * Java does not provide a standard library api for setting the gid bit because the gid bit is
   * system dependent, so the only way to set this bit is to start a new process and run the shell
   * command "chmod g+s " + execution directory name.
   *
   * Note that this should work on most Linux distributions and MacOS, but will not work on
   * Windows.
   */
  private void setgidPermissionOnExecutionDirectory() throws IOException {
    logger.info("Creating subprocess to run shell command: chmod g+s "
        + this.executionDirectory.toString());
    Runtime.getRuntime().exec("chmod g+s " + this.executionDirectory.toString());
  }

  private TrackingThreadPool createExecutorService(final int nThreads) {
    final boolean useNewThreadPool =
        this.azkabanProps.getBoolean(EXECUTOR_USE_BOUNDED_THREADPOOL_QUEUE, false);
    logger.info("useNewThreadPool: " + useNewThreadPool);

    if (useNewThreadPool) {
      this.threadPoolQueueSize =
          this.azkabanProps.getInt(EXECUTOR_THREADPOOL_WORKQUEUE_SIZE, nThreads);
      logger.info("workQueueSize: " + this.threadPoolQueueSize);

      // using a bounded queue for the work queue. The default rejection policy
      // {@ThreadPoolExecutor.AbortPolicy} is used
      final TrackingThreadPool executor =
          new TrackingThreadPool(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
              new LinkedBlockingQueue<>(this.threadPoolQueueSize), this);

      return executor;
    } else {
      // the old way of using unbounded task queue.
      // if the running tasks are taking a long time or stuck, this queue
      // will be very very long.
      return new TrackingThreadPool(nThreads, nThreads, 0L,
          TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), this);
    }
  }

  public void setExecutorActive(final boolean isActive, final String host, final int port)
      throws ExecutorManagerException, InterruptedException {
    final Executor executor = this.executorLoader.fetchExecutor(host, port);
    Preconditions.checkState(executor != null, "Unable to obtain self entry in DB");
    if (executor.isActive() != isActive) {
      executor.setActive(isActive);
      this.executorLoader.updateExecutor(executor);
    } else {
      logger.info(
          "Set active action ignored. Executor is already " + (isActive ? "active" : "inactive"));
    }
    this.active = isActive;
    if (!this.active) {
      // When deactivating this executor, this call will wait to return until every thread in {@link
      // #createFlowRunner} has finished. When deploying new executor, old running executor will be
      // deactivated before new one is activated and only one executor is allowed to
      // delete/hard-linking project dirs to avoid race condition described in {@link
      // FlowPreparer#setup}. So to make deactivation process block until flow preparation work
      // finishes guarantees the old executor won't access {@link FlowPreparer#setup} after
      // deactivation.
      waitUntilFlowPreparationFinish();
    }
  }

  /**
   * Wait until ongoing flow preparation work finishes.
   */
  private void waitUntilFlowPreparationFinish() throws InterruptedException {
    final Duration SLEEP_INTERVAL = Duration.ofSeconds(5);
    while (this.preparingFlowCount.intValue() != 0) {
      logger.info(this.preparingFlowCount + " flow(s) is/are still being setup before complete "
          + "deactivation.");
      Thread.sleep(SLEEP_INTERVAL.toMillis());
    }
  }

  public long getLastFlowSubmittedTime() {
    // Note: this is not thread safe and may result in providing dirty data.
    //       we will provide this data as is for now and will revisit if there
    //       is a strong justification for change.
    return this.lastFlowSubmittedDate;
  }

  public Props getGlobalProps() {
    return this.globalProps;
  }

  public void setGlobalProps(final Props globalProps) {
    this.globalProps = globalProps;
  }

  public void submitFlow(final int execId) throws ExecutorManagerException {
    if (isAlreadyRunning(execId)) {
      return;
    }

    final FlowRunner runner = createFlowRunner(execId);

    // Check again.
    if (isAlreadyRunning(execId)) {
      return;
    }
    submitFlowRunner(runner);
  }

  private boolean isAlreadyRunning(final int execId) throws ExecutorManagerException {
    if (this.runningFlows.containsKey(execId)) {
      logger.info("Execution " + execId + " is already in running.");
      if (!this.submittedFlows.containsValue(execId)) {
        // Execution had been added to running flows but not submitted - something's wrong.
        // Return a response with error: this is a cue for the dispatcher to retry or finalize the
        // execution as failed.
        throw new ExecutorManagerException("Execution " + execId +
            " is in runningFlows but not in submittedFlows. Most likely submission had failed.");
      }
      // Already running, everything seems fine. Report as a successful submission.
      return true;
    }
    return false;
  }

  /**
   * return whether this execution has useExecutor defined. useExecutor is for running test
   * executions on inactive executor.
   */
  private boolean isExecutorSpecified(final ExecutableFlow flow) {
    return flow.getExecutionOptions().getFlowParameters()
        .containsKey(ExecutionOptions.USE_EXECUTOR);
  }

  private FlowRunner createFlowRunner(final int execId) throws ExecutorManagerException {
    final ExecutableFlow flow;
    flow = this.executorLoader.fetchExecutableFlow(execId);
    if (flow == null) {
      throw new ExecutorManagerException("Error loading flow with exec "
          + execId);
    }

    // Sets up the project files and execution directory.
    this.preparingFlowCount.incrementAndGet();
    // Record the time between submission, and when the flow preparation/execution starts.
    // Note that since submit time is recorded on the web server, while flow preparation is on
    // the executor, there could be some inaccuracies due to clock skew.
    this.commonMetrics.addQueueWait(System.currentTimeMillis() -
        flow.getExecutableFlow().getSubmitTime());

    final Timer.Context flowPrepTimerContext = this.execMetrics.getFlowSetupTimerContext();

    try {
      if (this.active || isExecutorSpecified(flow)) {
        this.flowPreparer.setup(flow);
      } else {
        // Unset the executor.
        this.executorLoader.unsetExecutorIdForExecution(execId);
        throw new ExecutorManagerException("executor became inactive before setting up the "
            + "flow " + execId);
      }
    } finally {
      this.preparingFlowCount.decrementAndGet();
      flowPrepTimerContext.stop();
    }

    // Setup flow runner
    FlowWatcher watcher = null;
    final ExecutionOptions options = flow.getExecutionOptions();
    if (options.getPipelineExecutionId() != null) {
      final Integer pipelineExecId = options.getPipelineExecutionId();
      final FlowRunner runner = this.runningFlows.get(pipelineExecId);

      if (runner != null) {
        watcher = new LocalFlowWatcher(runner);
      } else {
        // also ends up here if execute is called with pipelineExecId that's not running any more
        // (it could have just finished, for example)
        watcher = new RemoteFlowWatcher(pipelineExecId, this.executorLoader);
      }
    }

    int numJobThreads = this.numJobThreadPerFlow;
    if (options.getFlowParameters().containsKey(FLOW_NUM_JOB_THREADS)) {
      try {
        final int numJobs =
            Integer.valueOf(options.getFlowParameters().get(
                FLOW_NUM_JOB_THREADS));
        if (numJobs > 0 && (numJobs <= numJobThreads || ProjectWhitelist
            .isProjectWhitelisted(flow.getProjectId(),
                WhitelistType.NumJobPerFlow))) {
          numJobThreads = numJobs;
        }
      } catch (final Exception e) {
        throw new ExecutorManagerException(
            "Failed to set the number of job threads "
                + options.getFlowParameters().get(FLOW_NUM_JOB_THREADS)
                + " for flow " + execId, e);
      }
    }

    final FlowRunner runner =
        new FlowRunner(flow, this.executorLoader, this.projectLoader, this.jobtypeManager,
            this.azkabanProps, this.azkabanEventReporter, this.alerterHolder);
    runner.setFlowWatcher(watcher)
        .setJobLogSettings(this.jobLogChunkSize, this.jobLogNumFiles)
        .setValidateProxyUser(this.validateProxyUser)
        .setNumJobThreads(numJobThreads).addListener(this);

    configureFlowLevelMetrics(runner);
    return runner;
  }

  private void submitFlowRunner(final FlowRunner runner) throws ExecutorManagerException {
    this.runningFlows.put(runner.getExecutionId(), runner);
    try {
      // The executorService already has a queue.
      // The submit method below actually returns an instance of FutureTask,
      // which implements interface RunnableFuture, which extends both
      // Runnable and Future interfaces
      final Future<?> future = this.executorService.submit(runner);
      // keep track of this future
      this.submittedFlows.put(future, runner.getExecutionId());
      // update the last submitted time.
      this.lastFlowSubmittedDate = System.currentTimeMillis();
    } catch (final RejectedExecutionException re) {
      this.runningFlows.remove(runner.getExecutionId());
      final StringBuffer errorMsg = new StringBuffer(
          "Azkaban executor can't execute any more flows. ");
      if (this.executorService.isShutdown()) {
        errorMsg.append("The executor is being shut down.");
      }
      throw new ExecutorManagerException(errorMsg.toString(), re);
    }
  }

  /**
   * Configure Azkaban metrics tracking for a new flowRunner instance
   */
  private void configureFlowLevelMetrics(final FlowRunner flowRunner) {
    logger.info("Configuring Azkaban metrics tracking for flow runner object");

    if (MetricReportManager.isAvailable()) {
      final MetricReportManager metricManager = MetricReportManager.getInstance();
      // Adding NumFailedFlow Metric listener
      flowRunner.addListener((NumFailedFlowMetric) metricManager
          .getMetricFromName(NumFailedFlowMetric.NUM_FAILED_FLOW_METRIC_NAME));
    }

  }

  public void cancelJobBySLA(final int execId, final String jobId)
      throws ExecutorManagerException {
    final FlowRunner flowRunner = this.runningFlows.get(execId);

    if (flowRunner == null) {
      throw new ExecutorManagerException("Execution " + execId
          + " is not running.");
    }

    for (final JobRunner jobRunner : flowRunner.getActiveJobRunners()) {
      if (jobRunner.getJobId().equals(jobId)) {
        logger.info("Killing job " + jobId + " in execution " + execId + " by SLA");
        jobRunner.killBySLA();
        break;
      }
    }
  }

  public void cancelFlow(final int execId, final String user)
      throws ExecutorManagerException {
    final FlowRunner runner = this.runningFlows.get(execId);

    if (runner == null) {
      throw new ExecutorManagerException("Execution " + execId
          + " is not running.");
    }

    runner.kill(user);
  }

  public void pauseFlow(final int execId, final String user)
      throws ExecutorManagerException {
    final FlowRunner runner = this.runningFlows.get(execId);

    if (runner == null) {
      throw new ExecutorManagerException("Execution " + execId
          + " is not running.");
    }

    runner.pause(user);
  }

  public void resumeFlow(final int execId, final String user)
      throws ExecutorManagerException {
    final FlowRunner runner = this.runningFlows.get(execId);

    if (runner == null) {
      throw new ExecutorManagerException("Execution " + execId
          + " is not running.");
    }

    runner.resume(user);
  }

  public void retryFailures(final int execId, final String user)
      throws ExecutorManagerException {
    final FlowRunner runner = this.runningFlows.get(execId);

    if (runner == null) {
      throw new ExecutorManagerException("Execution " + execId
          + " is not running.");
    }

    runner.retryFailures(user);
  }

  public ExecutableFlow getExecutableFlow(final int execId) {
    final FlowRunner runner = this.runningFlows.get(execId);
    if (runner == null) {
      return this.recentlyFinishedFlows.get(execId);
    }
    return runner.getExecutableFlow();
  }

  @Override
  public void handleEvent(final Event event) {
    if (event.getType() == EventType.FLOW_FINISHED || event.getType() == EventType.FLOW_STARTED) {
      final FlowRunner flowRunner = (FlowRunner) event.getRunner();
      final ExecutableFlow flow = flowRunner.getExecutableFlow();

      if (event.getType() == EventType.FLOW_FINISHED) {
        this.recentlyFinishedFlows.put(flow.getExecutionId(), flow);
        logger.info("Flow " + flow.getExecutionId()
            + " is finished. Adding it to recently finished flows list.");
        this.runningFlows.remove(flow.getExecutionId());
      } else if (event.getType() == EventType.FLOW_STARTED) {
        // add flow level SLA checker
        this.triggerManager
            .addTrigger(flow.getExecutionId(), SlaOption.getFlowLevelSLAOptions(flow));
      }
    }
  }

  public LogData readFlowLogs(final int execId, final int startByte, final int length)
      throws ExecutorManagerException {
    final FlowRunner runner = this.runningFlows.get(execId);
    if (runner == null) {
      throw new ExecutorManagerException("Running flow " + execId
          + " not found.");
    }

    final File dir = runner.getExecutionDir();
    if (dir != null && dir.exists()) {
      try {
        synchronized (this.executionDirDeletionSync) {
          if (!dir.exists()) {
            throw new ExecutorManagerException(
                "Execution dir file doesn't exist. Probably has been deleted");
          }

          final File logFile = runner.getFlowLogFile();
          if (logFile != null && logFile.exists()) {
            return FileIOUtils.readUtf8File(logFile, startByte, length);
          } else {
            throw new ExecutorManagerException("Flow log file doesn't exist.");
          }
        }
      } catch (final IOException e) {
        throw new ExecutorManagerException(e);
      }
    }

    throw new ExecutorManagerException(
        "Error reading file. Log directory doesn't exist.");
  }

  public LogData readJobLogs(final int execId, final String jobId, final int attempt,
      final int startByte, final int length) throws ExecutorManagerException {
    final FlowRunner runner = this.runningFlows.get(execId);
    if (runner == null) {
      throw new ExecutorManagerException("Running flow " + execId
          + " not found.");
    }

    final File dir = runner.getExecutionDir();
    if (dir != null && dir.exists()) {
      try {
        synchronized (this.executionDirDeletionSync) {
          if (!dir.exists()) {
            throw new ExecutorManagerException(
                "Execution dir file doesn't exist. Probably has been deleted");
          }
          final File logFile = runner.getJobLogFile(jobId, attempt);
          if (logFile != null && logFile.exists()) {
            return FileIOUtils.readUtf8File(logFile, startByte, length);
          } else {
            throw new ExecutorManagerException("Job log file doesn't exist.");
          }
        }
      } catch (final IOException e) {
        throw new ExecutorManagerException(e);
      }
    }

    throw new ExecutorManagerException(
        "Error reading file. Log directory doesn't exist.");
  }

  public List<Object> readJobAttachments(final int execId, final String jobId, final int attempt)
      throws ExecutorManagerException {
    final FlowRunner runner = this.runningFlows.get(execId);
    if (runner == null) {
      throw new ExecutorManagerException("Running flow " + execId
          + " not found.");
    }

    final File dir = runner.getExecutionDir();
    if (dir == null || !dir.exists()) {
      throw new ExecutorManagerException(
          "Error reading file. Log directory doesn't exist.");
    }

    try {
      synchronized (this.executionDirDeletionSync) {
        if (!dir.exists()) {
          throw new ExecutorManagerException(
              "Execution dir file doesn't exist. Probably has been deleted");
        }

        final File attachmentFile = runner.getJobAttachmentFile(jobId, attempt);
        if (attachmentFile == null || !attachmentFile.exists()) {
          return null;
        }

        final List<Object> jobAttachments =
            (ArrayList<Object>) JSONUtils.parseJSONFromFile(attachmentFile);

        return jobAttachments;
      }
    } catch (final IOException e) {
      throw new ExecutorManagerException(e);
    }
  }

  public JobMetaData readJobMetaData(final int execId, final String jobId, final int attempt,
      final int startByte, final int length) throws ExecutorManagerException {
    final FlowRunner runner = this.runningFlows.get(execId);
    if (runner == null) {
      throw new ExecutorManagerException("Running flow " + execId
          + " not found.");
    }

    final File dir = runner.getExecutionDir();
    if (dir != null && dir.exists()) {
      try {
        synchronized (this.executionDirDeletionSync) {
          if (!dir.exists()) {
            throw new ExecutorManagerException(
                "Execution dir file doesn't exist. Probably has been deleted");
          }
          final File metaDataFile = runner.getJobMetaDataFile(jobId, attempt);
          if (metaDataFile != null && metaDataFile.exists()) {
            return FileIOUtils.readUtf8MetaDataFile(metaDataFile, startByte,
                length);
          } else {
            throw new ExecutorManagerException("Job log file doesn't exist.");
          }
        }
      } catch (final IOException e) {
        throw new ExecutorManagerException(e);
      }
    }

    throw new ExecutorManagerException(
        "Error reading file. Log directory doesn't exist.");
  }

  public long getLastCleanerThreadCheckTime() {
    return this.lastCleanerThreadCheckTime;
  }

  public boolean isCleanerThreadActive() {
    return this.cleanerThread.isAlive();
  }

  public State getCleanerThreadState() {
    return this.cleanerThread.getState();
  }

  public boolean isExecutorThreadPoolShutdown() {
    return this.executorService.isShutdown();
  }

  public int getNumQueuedFlows() {
    return this.executorService.getQueue().size();
  }

  public int getNumRunningFlows() {
    return this.executorService.getActiveCount();
  }

  public String getRunningFlowIds() {
    // The in progress tasks are actually of type FutureTask
    final Set<Runnable> inProgressTasks = this.executorService.getInProgressTasks();

    final List<Integer> runningFlowIds =
        new ArrayList<>(inProgressTasks.size());

    for (final Runnable task : inProgressTasks) {
      // add casting here to ensure it matches the expected type in
      // submittedFlows
      final Integer execId = this.submittedFlows.get((Future<?>) task);
      if (execId != null) {
        runningFlowIds.add(execId);
      } else {
        logger.warn("getRunningFlowIds: got null execId for task: " + task);
      }
    }

    Collections.sort(runningFlowIds);
    return runningFlowIds.toString();
  }

  public String getQueuedFlowIds() {
    final List<Integer> flowIdList =
        new ArrayList<>(this.executorService.getQueue().size());

    for (final Runnable task : this.executorService.getQueue()) {
      final Integer execId = this.submittedFlows.get(task);
      if (execId != null) {
        flowIdList.add(execId);
      } else {
        logger
            .warn("getQueuedFlowIds: got null execId for queuedTask: " + task);
      }
    }
    Collections.sort(flowIdList);
    return flowIdList.toString();
  }

  public int getMaxNumRunningFlows() {
    return this.numThreads;
  }

  public int getTheadPoolQueueSize() {
    return this.threadPoolQueueSize;
  }

  public void reloadJobTypePlugins() throws JobTypeManagerException {
    this.jobtypeManager.loadPlugins();
  }

  public int getTotalNumExecutedFlows() {
    return this.executorService.getTotalTasks();
  }

  @Override
  public void beforeExecute(final Runnable r) {
  }

  @Override
  public void afterExecute(final Runnable r) {
    this.submittedFlows.remove(r);
  }

  /**
   * This shuts down the flow runner. The call is blocking and awaits execution of all jobs.
   */
  public void shutdown() {
    logger.warn("Shutting down FlowRunnerManager...");
    if (this.azkabanProps.getBoolean(ConfigurationKeys.AZKABAN_POLL_MODEL, false)) {
      this.pollingService.shutdown();
    }
    this.executorService.shutdown();
    boolean result = false;
    while (!result) {
      logger.info("Awaiting Shutdown. # of executing flows: " + getNumRunningFlows());
      try {
        result = this.executorService.awaitTermination(1, TimeUnit.MINUTES);
      } catch (final InterruptedException e) {
        logger.error(e);
      }
    }
    logger.warn("Shutdown FlowRunnerManager complete.");
  }

  /**
   * This attempts to shut down the flow runner immediately (unsafe). It doesn't wait for jobs to
   * finish but interrupts all threads.
   */
  public void shutdownNow() {
    logger.warn("Shutting down FlowRunnerManager now...");
    if (this.azkabanProps.getBoolean(ConfigurationKeys.AZKABAN_POLL_MODEL, false)) {
      this.pollingService.shutdown();
    }
    this.executorService.shutdownNow();
    this.triggerManager.shutdown();
  }

  /**
   * Deleting old execution directory to free disk space.
   */
  public void deleteExecutionDirectory() {
    logger.warn("Deleting execution dir: " + this.executionDirectory.getAbsolutePath());
    try {
      FileUtils.deleteDirectory(this.executionDirectory);
    } catch (final IOException e) {
      logger.error(e);
    }
  }

  private Set<Pair<Integer, Integer>> getActiveProjectVersions() {
    final Set<Pair<Integer, Integer>> activeProjectVersions = new HashSet<>();
    for (final FlowRunner runner : FlowRunnerManager.this.runningFlows.values()) {
      final ExecutableFlow flow = runner.getExecutableFlow();
      activeProjectVersions.add(new Pair<>(flow
          .getProjectId(), flow.getVersion()));
    }
    return activeProjectVersions;
  }


  private class CleanerThread extends Thread {

    // Every hour, clean execution dir.
    private static final long EXECUTION_DIR_CLEAN_INTERVAL_MS = 60 * 60 * 1000;
    // Every 2 mins clean the recently finished list
    private static final long RECENTLY_FINISHED_INTERVAL_MS = 2 * 60 * 1000;
    // Every 5 mins kill flows running longer than allowed max running time
    private static final long LONG_RUNNING_FLOW_KILLING_INTERVAL_MS = 5 * 60 * 1000;
    private final long flowMaxRunningTimeInMins = FlowRunnerManager.this.azkabanProps.getInt(
        Constants.ConfigurationKeys.AZKABAN_MAX_FLOW_RUNNING_MINS, -1);
    private boolean shutdown = false;
    private long lastExecutionDirCleanTime = -1;
    private long lastRecentlyFinishedCleanTime = -1;
    private long lastLongRunningFlowCleanTime = -1;

    public CleanerThread() {
      this.setName("FlowRunnerManager-Cleaner-Thread");
      setDaemon(true);
    }

    public void shutdown() {
      this.shutdown = true;
      this.interrupt();
    }

    private boolean isFlowRunningLongerThan(final ExecutableFlow flow,
        final long flowMaxRunningTimeInMins) {
      final Set<Status> nonFinishingStatusAfterFlowStarts = new HashSet<>(
          Arrays.asList(Status.RUNNING, Status.QUEUED, Status.PAUSED, Status.FAILED_FINISHING));
      return nonFinishingStatusAfterFlowStarts.contains(flow.getStatus()) && flow.getStartTime() > 0
          && TimeUnit.MILLISECONDS.toMinutes(System.currentTimeMillis() - flow.getStartTime())
          >= flowMaxRunningTimeInMins;
    }

    @Override
    public void run() {
      while (!this.shutdown) {
        synchronized (this) {
          try {
            FlowRunnerManager.this.lastCleanerThreadCheckTime = System.currentTimeMillis();
            FlowRunnerManager.logger.info("# of executing flows: " + getNumRunningFlows());

            // Cleanup old stuff.
            final long currentTime = System.currentTimeMillis();
            if (currentTime - RECENTLY_FINISHED_INTERVAL_MS > this.lastRecentlyFinishedCleanTime) {
              FlowRunnerManager.logger.info("Cleaning recently finished");
              cleanRecentlyFinished();
              this.lastRecentlyFinishedCleanTime = currentTime;
            }

            if (currentTime - EXECUTION_DIR_CLEAN_INTERVAL_MS > this.lastExecutionDirCleanTime) {
              FlowRunnerManager.logger.info("Cleaning old execution dirs");
              cleanOlderExecutionDirs();
              this.lastExecutionDirCleanTime = currentTime;
            }

            if (this.flowMaxRunningTimeInMins > 0
                && currentTime - LONG_RUNNING_FLOW_KILLING_INTERVAL_MS
                > this.lastLongRunningFlowCleanTime) {
              FlowRunnerManager.logger
                  .info(String.format("Killing long jobs running longer than %s mins",
                      this.flowMaxRunningTimeInMins));
              for (final FlowRunner flowRunner : FlowRunnerManager.this.runningFlows.values()) {
                if (isFlowRunningLongerThan(flowRunner.getExecutableFlow(),
                    this.flowMaxRunningTimeInMins)) {
                  FlowRunnerManager.logger.info(String
                      .format("Killing job [id: %s, status: %s]. It has been running for %s mins",
                          flowRunner.getExecutableFlow().getId(),
                          flowRunner.getExecutableFlow().getStatus(), TimeUnit.MILLISECONDS
                              .toMinutes(System.currentTimeMillis() - flowRunner.getExecutableFlow()
                                  .getStartTime())));
                  flowRunner.kill();
                }
              }
              this.lastLongRunningFlowCleanTime = currentTime;
            }

            wait(FlowRunnerManager.RECENTLY_FINISHED_TIME_TO_LIVE);
          } catch (final InterruptedException e) {
            FlowRunnerManager.logger.info("Interrupted. Probably to shut down.");
          } catch (final Throwable t) {
            FlowRunnerManager.logger.warn(
                "Uncaught throwable, please look into why it is not caught", t);
          }
        }
      }
    }

    private void cleanOlderExecutionDirs() {
      final File dir = FlowRunnerManager.this.executionDirectory;

      final long pastTimeThreshold =
          System.currentTimeMillis() - FlowRunnerManager.this.executionDirRetention;
      final File[] executionDirs = dir
          .listFiles(path -> path.isDirectory() && path.lastModified() < pastTimeThreshold);

      for (final File exDir : executionDirs) {
        try {
          final int execId = Integer.valueOf(exDir.getName());
          if (FlowRunnerManager.this.runningFlows.containsKey(execId)
              || FlowRunnerManager.this.recentlyFinishedFlows.containsKey(execId)) {
            continue;
          }
        } catch (final NumberFormatException e) {
          FlowRunnerManager.logger.error("Can''t delete exec dir " + exDir.getName()
              + " it is not a number");
          continue;
        }

        synchronized (FlowRunnerManager.this.executionDirDeletionSync) {
          try {
            FileUtils.deleteDirectory(exDir);
          } catch (final IOException e) {
            FlowRunnerManager.logger.error("Error cleaning execution dir " + exDir.getPath(), e);
          }
        }
      }
    }

    private void cleanRecentlyFinished() {
      final long cleanupThreshold =
          System.currentTimeMillis() - FlowRunnerManager.RECENTLY_FINISHED_TIME_TO_LIVE;
      final ArrayList<Integer> executionToKill = new ArrayList<>();
      for (final ExecutableFlow flow : FlowRunnerManager.this.recentlyFinishedFlows.values()) {
        if (flow.getEndTime() < cleanupThreshold) {
          executionToKill.add(flow.getExecutionId());
        }
      }

      for (final Integer id : executionToKill) {
        FlowRunnerManager.logger.info("Cleaning execution " + id
            + " from recently finished flows list.");
        FlowRunnerManager.this.recentlyFinishedFlows.remove(id);
      }
    }
  }

  /**
   * Polls new executions from DB periodically and submits the executions to run on the executor.
   */
  @SuppressWarnings("FutureReturnValueIgnored")
  private class PollingService {

    private final long pollingIntervalMs;
    private final ScheduledExecutorService scheduler;
    private int executorId = -1;

    public PollingService(final long pollingIntervalMs) {
      this.pollingIntervalMs = pollingIntervalMs;
      this.scheduler = Executors.newSingleThreadScheduledExecutor();
    }

    public void start() {
      this.scheduler.scheduleAtFixedRate(() -> pollExecution(), 0L, this.pollingIntervalMs,
          TimeUnit.MILLISECONDS);
    }

    private void pollExecution() {
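      // The executor id is resolved lazily on the first poll; once known, each poll
      // atomically claims at most one queued execution from the DB and submits it.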
      if (this.executorId == -1) {
        if (AzkabanExecutorServer.getApp() != null) {
          try {
            final Executor executor = requireNonNull(FlowRunnerManager.this.executorLoader
                .fetchExecutor(AzkabanExecutorServer.getApp().getHost(),
                    AzkabanExecutorServer.getApp().getPort()), "The executor can not be null");
            this.executorId = executor.getId();
          } catch (final Exception e) {
            FlowRunnerManager.logger.error("Failed to fetch executor ", e);
          }
        }
      } else {
        try {
          // Todo jamiesjc: check executor capacity before polling from DB
          final int execId = FlowRunnerManager.this.executorLoader
              .selectAndUpdateExecution(this.executorId, FlowRunnerManager.this.active);
          if (execId != -1) {
            FlowRunnerManager.logger.info("Submitting flow " + execId);
            submitFlow(execId);
            FlowRunnerManager.this.commonMetrics.markDispatchSuccess();
          }
        } catch (final Exception e) {
          FlowRunnerManager.logger.error("Failed to submit flow ", e);
          FlowRunnerManager.this.commonMetrics.markDispatchFail();
        }
      }
    }

    public void shutdown() {
      this.scheduler.shutdown();
      this.scheduler.shutdownNow();
    }
  }
}

httpclient4.5.5 PoolingHttpClientConnectionManager

    Today, while integrating the Tencent URL-safety API, I used org.apache.httpcomponents.httpclient-4.5.5. Testing in a plain main method, a request took about 0.6s from request to response; inside a Spring Boot application, however, the first request took about 0.4s and every subsequent one only about 0.1s. Suspecting that a connection pool was at work, I investigated as follows:

    1. First, locally, I started the application without issuing any request and ran jmap -histo:live 19144 > d://a.txt (see https://www.cnblogs.com/anjijiji/p/6239395.html) to dump all live objects in the JVM to a file. Searching that file for classes under the org.apache.http package turned up nothing.

    2. I then issued one API request via postman, reran jmap -histo:live 19144 > d://a.txt to overwrite the file, and this time found several classes under org.apache.http.impl.conn. The pool-related ones are:

    org.apache.http.impl.conn.CPool

    org.apache.http.impl.conn.CPoolEntry

    org.apache.http.impl.conn.PoolingHttpClientConnectionManager

 

    PoolingHttpClientConnectionManager is a pooling wrapper around HttpClientConnection that can serve concurrent requests from multiple threads. Its main job is leasing and releasing connections; requests for the same route preferentially reuse an idle persistent connection from the pool. CPoolEntry wraps a ManagedHttpClientConnection and holds the details of one concrete connection. CPool is a pool implementation extending AbstractConnPool; when PoolingHttpClientConnectionManager actually leases or releases a connection, it delegates to CPool.
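
    To make the route granularity concrete, here is a minimal sketch. The host and the limits are illustrative assumptions, not values from the original post; it simply caps one specific route below the pool-wide default:

import org.apache.http.HttpHost;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;

public class RouteLimitDemo {
    public static void main(String[] args) {
        PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
        cm.setMaxTotal(100);           // upper bound across all routes
        cm.setDefaultMaxPerRoute(20);  // default cap for any single route

        // Tighten the cap for one route (hypothetical host, for illustration only).
        HttpRoute route = new HttpRoute(new HttpHost("example.com", 80));
        cm.setMaxPerRoute(route, 10);
    }
}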

    

    Only a few CPool fields are covered here (for reference; see the sketch after this list):

        routeToPool: the per-route sub-pools, i.e. the pool at route granularity

        leased: connections currently leased out (in use)

        available: idle connections

        pending: requests queued waiting for a connection

        maxPerRoute: the per-route connection cap (cannot exceed the pool total)

        defaultMaxPerRoute: the default per-route connection cap (cannot exceed the pool total)

        maxTotal: the maximum number of connections in the pool

        validateAfterInactivity: idle time in ms after which a persistent connection is re-validated before reuse; the official recommendation is to rely on this check rather than validating the connection on every request
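
    The leased/available/pending/max counters above can be inspected at runtime through getTotalStats(). A minimal sketch (the pool size is an arbitrary illustration):

import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.pool.PoolStats;

public class PoolStatsDemo {
    public static void main(String[] args) {
        PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
        cm.setMaxTotal(50);

        // Snapshot of the counters described above: leased, pending, available, max.
        PoolStats stats = cm.getTotalStats();
        System.out.println("leased=" + stats.getLeased()
                + " pending=" + stats.getPending()
                + " available=" + stats.getAvailable()
                + " max=" + stats.getMax());
    }
}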

   3. With the above in mind, here is the concrete configuration code (corrections and discussion are welcome!):

import org.apache.http.HttpEntity;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UrlUtil {
    public static final Logger logger = LoggerFactory.getLogger(UrlUtil.class);

    private static final CloseableHttpClient httpClient;

    static {
        PoolingHttpClientConnectionManager clientConnectionManager = new PoolingHttpClientConnectionManager();
        clientConnectionManager.setValidateAfterInactivity(2000); // re-validate connections idle for 2s before reuse
        clientConnectionManager.setMaxTotal(50);                  // maximum connections in the whole pool
        clientConnectionManager.setDefaultMaxPerRoute(50);        // per-route maximum (only one route is used here, so it equals the pool size)
        httpClient = HttpClients.createMinimal(clientConnectionManager);
    }

    /**
     * URL safety check against the Tencent URL-safety API (partial code).
     * @param urlString the URL to check
     * @throws Exception on request failure
     */
    public static void urlSafe(String urlString) throws Exception {
        CloseableHttpResponse response = null;
        try {
            HttpPost httpPost = new HttpPost("http://www.cloud.urlsec.qq.com");
            RequestConfig requestConfig = RequestConfig.custom()
                    .setConnectionRequestTimeout(1000) // max wait for a free connection from the pool
                    .setConnectTimeout(1000)           // max wait to establish the TCP connection to the server
                    .setSocketTimeout(1000)            // max wait for response data
                    .build();
            httpPost.setConfig(requestConfig);
            // NOTE: the original post elided the payload ("partial code"); this JSON body is a
            // placeholder assumption, not the actual format expected by the Tencent API.
            String params = "{\"url\":\"" + urlString + "\"}";
            HttpEntity entity = new StringEntity(params, "utf-8");
            httpPost.setEntity(entity);
            httpPost.setHeader("Content-type", "application/json");
            response = httpClient.execute(httpPost);
            // Consume the body so the connection is released back to the pool for reuse.
            EntityUtils.consume(response.getEntity());
        } finally {
            if (null != response) {
                response.close();
            }
        }
    }
}
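
A hypothetical caller, for completeness (the URL is illustrative); repeated calls through the shared pooled client should show the latency drop described at the top of this section:

public class UrlUtilDemo {
    public static void main(String[] args) throws Exception {
        UrlUtil.urlSafe("https://example.com");
        UrlUtil.urlSafe("https://example.com"); // second call reuses the pooled connection
    }
}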

Reference: https://www.cnblogs.com/shoren/p/httpclient-leaseConnection.html
