GVKun编程网logo

springboot 集成调用 Azkaban(springboot集成keycloak)

4

如果您对springboot集成调用Azkaban感兴趣,那么本文将是一篇不错的选择,我们将为您详在本文中,您将会了解到关于springboot集成调用Azkaban的详细内容,我们还将为您解答spr

如果您对springboot 集成调用 Azkaban感兴趣,那么本文将是一篇不错的选择,我们将为您详在本文中,您将会了解到关于springboot 集成调用 Azkaban的详细内容,我们还将为您解答springboot集成keycloak的相关问题,并且为您提供关于Java-Springboot - 集成 spring-security 简单示例 (Version-springboot-2-1-3-RELEASE、java版springcloud+springboot多租户社交电子商务-springboot集成apidoc、SpringBoot (九)_springboot 集成 MyBatis、SpringBoot + KafKa集群的集成的有价值信息。

本文目录一览:

springboot 集成调用 Azkaban(springboot集成keycloak)

springboot 集成调用 Azkaban(springboot集成keycloak)

springboot 集成调用 Azkaban

 

一、 说明

  1.Azkaban 是由 Linkedin 公司推出的一个批量工作流任务调度器,主要用于在一个工作流内以一个特定的顺序运行一组工作和流程,它的配置是通过简单的 key:value 对的方式,通过配置中的 dependencies 来设置依赖关系,这个依赖关系必须是无环的,否则会被视为无效的工作流。Azkaban 使用 job 配置文件建立任务之间的依赖关系,并提供一个易于使用的 web 用户界面维护和跟踪你的工作流。

        2.springboot 版本:2.0.5  azkaban 版本:3.59.0

 

二、maven 依赖

<dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore</artifactId>
            <version>4.4.9</version>
        </dependency>
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>2.9.7</version>
        </dependency>
View Code

 

三、代码

  1.azkaban 配置文件(注意需要在启动类 @PropertySource 标签中引入读取该配置文件)

monitor.azkaban.username=azkaban
monitor.azkaban.password=azkaban
monitor.azkaban.url=http://192.168.11.12:8081
monitor.azkaban.connectTimeout=60000
monitor.azkaban.readTimeout=120000
azkaban.properties

  2.azkaban 配置实体类

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;

@Configuration
public class AzkabanConfig {
    @Value("${monitor.azkaban.username}")
    private String azkUsername;
    @Value("${monitor.azkaban.password}")
    private String azkPassword;
    @Value("${monitor.azkaban.url}")
    private String azkUrl;
    @Value("${monitor.azkaban.connectTimeout}")
    private int connectTimeout;
    @Value("${monitor.azkaban.readTimeout}")
    private int readTimeout;

    @Bean
    public RestTemplate getRestTemplate() {
        SimpleClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory();
        requestFactory.setConnectTimeout(connectTimeout);
        requestFactory.setReadTimeout(readTimeout);
        RestTemplate restTemplate = new RestTemplate(requestFactory);
        return restTemplate;
    }

    public String getAzkUsername() {
        return azkUsername;
    }

    public void setAzkUsername(String azkUsername) {
        this.azkUsername = azkUsername;
    }

    public String getAzkPassword() {
        return azkPassword;
    }

    public void setAzkPassword(String azkPassword) {
        this.azkPassword = azkPassword;
    }

    public String getAzkUrl() {
        return azkUrl;
    }

    public void setAzkUrl(String azkUrl) {
        this.azkUrl = azkUrl;
    }

}
AzkabanConfig

  3.HttpClient 配置 SSL 绕过 https 证书 

import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.X509Certificate;

import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;

public class SSLUtil {

    private static final String PROTOCOL = "SSL";
    private static final TrustManager[] UNQUESTIONING_TRUST_MANAGER = new TrustManager[] { new X509TrustManager() {

        public java.security.cert.X509Certificate[] getAcceptedIssuers() {
            return null;
        }

        public void checkClientTrusted(X509Certificate[] certs, String authType) {
        }

        public void checkServerTrusted(X509Certificate[] certs, String authType) {
        }
    } };

    private SSLUtil() {
    }

    public static void turnOffSslChecking() throws NoSuchAlgorithmException, KeyManagementException {
        final SSLContext sc = SSLContext.getInstance(PROTOCOL);
        sc.init(null, UNQUESTIONING_TRUST_MANAGER, null);
        HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
    }

    public static void turnOnSslChecking() throws KeyManagementException, NoSuchAlgorithmException {
        SSLContext.getInstance(PROTOCOL).init(null, null, null);
    }
}
SSLUtil

  4. 常量类

public interface SysContants {
    
        /**azkaban成功状态**/
    String AZK_SUCCESS = "success";
    
}
SysContants

  5. 根据 azkaban 返回数据定制实体类 (注释少抱歉)

import java.util.List;

public class ExecNode {
    private String nestedId;
    private List<String> in;
    private String status;
    private String id;
    private String type;
    private Long updateTime;
    private Long startTime;
    private Long endTime;
    private Long attempt;

    public String getNestedId() {
        return nestedId;
    }

    public void setNestedId(String nestedId) {
        this.nestedId = nestedId;
    }

    public List<String> getIn() {
        return in;
    }

    public void setIn(List<String> in) {
        this.in = in;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public Long getUpdateTime() {
        return updateTime;
    }

    public void setUpdateTime(Long updateTime) {
        this.updateTime = updateTime;
    }

    public Long getStartTime() {
        return startTime;
    }

    public void setStartTime(Long startTime) {
        this.startTime = startTime;
    }

    public Long getEndTime() {
        return endTime;
    }

    public void setEndTime(Long endTime) {
        this.endTime = endTime;
    }

    public Long getAttempt() {
        return attempt;
    }

    public void setAttempt(Long attempt) {
        this.attempt = attempt;
    }
}
ExecNode
import java.util.List;

public class ExecNodeBean {
    private String nestedId;
    private List<String> dependencies;
    private String status;
    private String jobId;
    private String type;
    private String updateTime;
    private String startTime;
    private String endTime;
    private Long attempt;
    private String logs;
    private Long elapsed;

    public String getNestedId() {
        return nestedId;
    }

    public void setNestedId(String nestedId) {
        this.nestedId = nestedId;
    }

    public List<String> getDependencies() {
        return dependencies;
    }

    public void setDependencies(List<String> dependencies) {
        this.dependencies = dependencies;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public String getJobId() {
        return jobId;
    }

    public void setJobId(String jobId) {
        this.jobId = jobId;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getUpdateTime() {
        return updateTime;
    }

    public void setUpdateTime(String updateTime) {
        this.updateTime = updateTime;
    }

    public String getStartTime() {
        return startTime;
    }

    public void setStartTime(String startTime) {
        this.startTime = startTime;
    }

    public String getEndTime() {
        return endTime;
    }

    public void setEndTime(String endTime) {
        this.endTime = endTime;
    }

    public Long getAttempt() {
        return attempt;
    }

    public void setAttempt(Long attempt) {
        this.attempt = attempt;
    }

    public String getLogs() {
        return logs;
    }

    public void setLogs(String logs) {
        this.logs = logs;
    }

    public Long getElapsed() {
        return elapsed;
    }

    public void setElapsed(Long elapsed) {
        this.elapsed = elapsed;
    }
    
}
ExecNodeBean
public class Execution {
    private String submitUser;
    private String flowId;
    private String status;
    private Long submitTime;
    private Long startTime;
    private Long endTime;
    private Long projectId;
    private Long execId;

    public String getSubmitUser() {
        return submitUser;
    }

    public void setSubmitUser(String submitUser) {
        this.submitUser = submitUser;
    }

    public String getFlowId() {
        return flowId;
    }

    public void setFlowId(String flowId) {
        this.flowId = flowId;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public Long getSubmitTime() {
        return submitTime;
    }

    public void setSubmitTime(Long submitTime) {
        this.submitTime = submitTime;
    }

    public Long getStartTime() {
        return startTime;
    }

    public void setStartTime(Long startTime) {
        this.startTime = startTime;
    }

    public Long getEndTime() {
        return endTime;
    }

    public void setEndTime(Long endTime) {
        this.endTime = endTime;
    }

    public Long getProjectId() {
        return projectId;
    }

    public void setProjectId(Long projectId) {
        this.projectId = projectId;
    }

    public Long getExecId() {
        return execId;
    }

    public void setExecId(Long execId) {
        this.execId = execId;
    }
Execution
import java.util.List;

public class ExecutionInfo {
    private String project;
    private String type;
    private Long updateTime;
    private Long attempt;
    private Long execid;
    private Long submitTime;
    private Long startTime;
    private Long endTime;
    private Long projectId;
    private String nestedId;
    private String submitUser;
    private String id;
    private String flowId;
    private String flow;
    private String status;
    private List<ExecNode> nodes;

    public String getProject() {
        return project;
    }

    public void setProject(String project) {
        this.project = project;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public Long getUpdateTime() {
        return updateTime;
    }

    public void setUpdateTime(Long updateTime) {
        this.updateTime = updateTime;
    }

    public Long getAttempt() {
        return attempt;
    }

    public void setAttempt(Long attempt) {
        this.attempt = attempt;
    }

    public Long getExecid() {
        return execid;
    }

    public void setExecid(Long execid) {
        this.execid = execid;
    }

    public Long getSubmitTime() {
        return submitTime;
    }

    public void setSubmitTime(Long submitTime) {
        this.submitTime = submitTime;
    }

    public Long getStartTime() {
        return startTime;
    }

    public void setStartTime(Long startTime) {
        this.startTime = startTime;
    }

    public Long getEndTime() {
        return endTime;
    }

    public void setEndTime(Long endTime) {
        this.endTime = endTime;
    }

    public Long getProjectId() {
        return projectId;
    }

    public void setProjectId(Long projectId) {
        this.projectId = projectId;
    }

    public String getNestedId() {
        return nestedId;
    }

    public void setNestedId(String nestedId) {
        this.nestedId = nestedId;
    }

    public String getSubmitUser() {
        return submitUser;
    }

    public void setSubmitUser(String submitUser) {
        this.submitUser = submitUser;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getFlowId() {
        return flowId;
    }

    public void setFlowId(String flowId) {
        this.flowId = flowId;
    }

    public String getFlow() {
        return flow;
    }

    public void setFlow(String flow) {
        this.flow = flow;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public List<ExecNode> getNodes() {
        return nodes;
    }

    public void setNodes(List<ExecNode> nodes) {
        this.nodes = nodes;
    }

}
ExecutionInfo
import java.util.List;

public class ExecutionInfoBean{
        
    private String project;
    private String type;
    private String updateTime;
    private Long attempt;
    private Long execid;
    private String submitTime;
    private String startTime;
    private String endTime;
    private Long projectId;
    private String nestedId;
    private String submitUser;
    private String jobId;
    private String flowId;
    private String flow;
    private String status;
    private String flowLog;
    private Long elapsed;
    private List<ExecNodeBean> nodes;

    public String getProject() {
        return project;
    }

    public void setProject(String project) {
        this.project = project;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getUpdateTime() {
        return updateTime;
    }

    public void setUpdateTime(String updateTime) {
        this.updateTime = updateTime;
    }

    public Long getAttempt() {
        return attempt;
    }

    public void setAttempt(Long attempt) {
        this.attempt = attempt;
    }

    public Long getExecid() {
        return execid;
    }

    public void setExecid(Long execid) {
        this.execid = execid;
    }

    public String getSubmitTime() {
        return submitTime;
    }

    public void setSubmitTime(String submitTime) {
        this.submitTime = submitTime;
    }

    public String getStartTime() {
        return startTime;
    }

    public void setStartTime(String startTime) {
        this.startTime = startTime;
    }

    public String getEndTime() {
        return endTime;
    }

    public void setEndTime(String endTime) {
        this.endTime = endTime;
    }

    public Long getProjectId() {
        return projectId;
    }

    public void setProjectId(Long projectId) {
        this.projectId = projectId;
    }

    public String getNestedId() {
        return nestedId;
    }

    public void setNestedId(String nestedId) {
        this.nestedId = nestedId;
    }

    public String getSubmitUser() {
        return submitUser;
    }

    public void setSubmitUser(String submitUser) {
        this.submitUser = submitUser;
    }

    public String getJobId() {
        return jobId;
    }

    public void setJobId(String jobId) {
        this.jobId = jobId;
    }

    public String getFlowId() {
        return flowId;
    }

    public void setFlowId(String flowId) {
        this.flowId = flowId;
    }

    public String getFlow() {
        return flow;
    }

    public void setFlow(String flow) {
        this.flow = flow;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public String getFlowLog() {
        return flowLog;
    }

    public void setFlowLog(String flowLog) {
        this.flowLog = flowLog;
    }

    public Long getElapsed() {
        return elapsed;
    }

    public void setElapsed(Long elapsed) {
        this.elapsed = elapsed;
    }

    public List<ExecNodeBean> getNodes() {
        return nodes;
    }

    public void setNodes(List<ExecNodeBean> nodes) {
        this.nodes = nodes;
    }

}
ExecutionInfoBean
import java.util.List;

public class FlowExecution {
    private String project;
    private String flow;
    private Long total;
    private Long length;
    private Long from;
    private Long projectId;
    private List<Execution> executions;

    public String getProject() {
        return project;
    }

    public void setProject(String project) {
        this.project = project;
    }

    public String getFlow() {
        return flow;
    }

    public void setFlow(String flow) {
        this.flow = flow;
    }

    public Long getTotal() {
        return total;
    }

    public void setTotal(Long total) {
        this.total = total;
    }

    public Long getLength() {
        return length;
    }

    public void setLength(Long length) {
        this.length = length;
    }

    public Long getFrom() {
        return from;
    }

    public void setFrom(Long from) {
        this.from = from;
    }

    public Long getProjectId() {
        return projectId;
    }

    public void setProjectId(Long projectId) {
        this.projectId = projectId;
    }

    public List<Execution> getExecutions() {
        return executions;
    }

    public void setExecutions(List<Execution> executions) {
        this.executions = executions;
    }

}
FlowExecution
public class GovernTaskRecordBean extends PageEntity {

    private static final long serialVersionUID = 1L;
    private String createTime;
    private String status;
    private String owner;
    private String startTime;
    private String endTime;
    private String flowId;
    private Long projectId;
    private Long execId;
    private String projectPath;
    private Long elapsed;

    public String getCreateTime() {
        return createTime;
    }

    public void setCreateTime(String createTime) {
        this.createTime = createTime;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public String getOwner() {
        return owner;
    }

    public void setOwner(String owner) {
        this.owner = owner;
    }

    public String getStartTime() {
        return startTime;
    }

    public void setStartTime(String startTime) {
        this.startTime = startTime;
    }

    public String getEndTime() {
        return endTime;
    }

    public void setEndTime(String endTime) {
        this.endTime = endTime;
    }

    public String getFlowId() {
        return flowId;
    }

    public void setFlowId(String flowId) {
        this.flowId = flowId;
    }

    public Long getProjectId() {
        return projectId;
    }

    public void setProjectId(Long projectId) {
        this.projectId = projectId;
    }

    public Long getExecId() {
        return execId;
    }

    public void setExecId(Long execId) {
        this.execId = execId;
    }

    public String getProjectPath() {
        return projectPath;
    }

    public void setProjectPath(String projectPath) {
        this.projectPath = projectPath;
    }

    public Long getElapsed() {
        return elapsed;
    }

    public void setElapsed(Long elapsed) {
        this.elapsed = elapsed;
    }

}
GovernTaskRecordBean

  6. 调度执行状态枚举类

public enum ScheduleStatus {

    READY("READY","就绪"),SUCCEEDED("SUCCEEDED","成功"),KILLING("KILLING","停止中"),KILLED("KILLED","已中断"),FAILED("FAILED","失败"),
    SKIPPED("SKIPPED","跳过"),DISABLED("DISABLED","停用"),QUEUED("QUEUED","等待中"),CANCELLED("CANCELLED","取消执行"),
    RUNNING("RUNNING","运行中"),PAUSED("PAUSED","暂停");
    

    /**
     * 状态编码
     */
    private String code;

    /**
     * 状态描述
     */
    private String desc;

    public String getCode() {
        return code;
    }

    public void setCode(String code) {
        this.code = code;
    }

    public String getDesc() {
        return desc;
    }

    public void setDesc(String desc) {
        this.desc = desc;
    }

    ScheduleStatus(String code, String desc) {
        this.code = code;
        this.desc = desc;
    }

}
ScheduleStatus

  7.AzkabanService 接口类

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.FileSystemResource;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestTemplate;

import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;

import org.apache.http.HttpStatus;

/**
 * azkaban接口
 * @author hao
 *
 */
@Service
public class AzkabanService {
    
    private static final Logger logger = LoggerFactory.getLogger(AzkabanService.class);
    private static final String CONTENT_TYPE = "application/x-www-form-urlencoded; charset=utf-8";
    private static final String X_REQUESTED_WITH = "XMLHttpRequest";
    private static final DateTimeFormatter formatterTime = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
    
    @Autowired
    private RestTemplate restTemplate;
    
    @Autowired
    private AzkabanConfig azkabanConfig;
    
    /**
     * Azkaban登录接口,返回sessionId
     * @return
     * @throws Exception
     */
    public String login() throws Exception {
        SSLUtil.turnOffSslChecking();
        HttpHeaders hs = new HttpHeaders();
        hs.add("Content-Type", CONTENT_TYPE);
        hs.add("X-Requested-With", X_REQUESTED_WITH);
        LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<String, String>();
        linkedMultiValueMap.add("action", "login");
        linkedMultiValueMap.add("username", azkabanConfig.getAzkUsername());
        linkedMultiValueMap.add("password", azkabanConfig.getAzkPassword());

        HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);
        String result = restTemplate.postForObject(azkabanConfig.getAzkUrl(), httpEntity, String.class);
        if (!SysContants.AZK_SUCCESS.equals(new Gson().fromJson(result, JsonObject.class).get("status").getAsString())) {
            logger.error("Azkaban登录失败!返回错误信息:"+result);
            throw new Exception("Azkaban登录失败!");
        }
        return new Gson().fromJson(result, JsonObject.class).get("session.id").getAsString();
    }

    /**
     * Azkaban上传zip文件
     * @param projectName 项目名称
     * @param file 文件
     * @return projectId编号
     * @throws Exception
     */
    public String uploadZip(String projectName, File file) throws Exception {
        SSLUtil.turnOffSslChecking();
        FileSystemResource resource = new FileSystemResource(file);
        LinkedMultiValueMap<String, Object> linkedMultiValueMap = new LinkedMultiValueMap<String, Object>();
        linkedMultiValueMap.add("session.id", login());
        linkedMultiValueMap.add("ajax", "upload");
        linkedMultiValueMap.add("project", projectName);
        linkedMultiValueMap.add("file", resource);
        String result = restTemplate.postForObject(azkabanConfig.getAzkUrl() + "/manager", linkedMultiValueMap,
                String.class);
        if (StringUtils.isEmpty(new Gson().fromJson(result, JsonObject.class).get("projectId").getAsString())) {
            logger.error("上传文件至Azkaban失败:",projectName,file.getPath());
            logger.error("Azkaban上传文件失败!返回错误信息:"+result);
            return null;
        }

        return new Gson().fromJson(result, JsonObject.class).get("projectId").getAsString();
    }
    
    /**
     * Azkaban创建project
     * @param projectName,project名称
     * @param description,project描述
     * @return 是否成功
     * @throws Exception
     */
    public boolean createProject(String projectName, String description) throws Exception {
        SSLUtil.turnOffSslChecking();
        HttpHeaders hs = new HttpHeaders();
        hs.add("Content-Type", CONTENT_TYPE);
        hs.add("X-Requested-With", X_REQUESTED_WITH);
        LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<String, String>();
        linkedMultiValueMap.add("session.id", login());
        linkedMultiValueMap.add("action", "create");
        linkedMultiValueMap.add("name", projectName);
        linkedMultiValueMap.add("description", description);

        HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);
        logger.info("Azkaban请求信息:" + httpEntity.toString());
        String result = restTemplate.postForObject(azkabanConfig.getAzkUrl() + "/manager", httpEntity, String.class);
        logger.info("Azkaban返回创建Project信息:" + result);
        // 创建成功和已存在,都表示创建成功
        if (!SysContants.AZK_SUCCESS
                .equals(new Gson().fromJson(result, JsonObject.class).get("status").getAsString())) {
            if (!"Project already exists."
                    .equals(new Gson().fromJson(result, JsonObject.class).get("message").getAsString())) {
                logger.error("创建Azkaban Project失败:",projectName);
                logger.error("创建Azkaban Project失败!返回错误信息:"+result);
                return false;
            }
        }
        return true;
    }
    
    /**
     * Azkaban删除project
     * @param projectName 项目名称
     * @throws Exception 
     */
    public void deleteProject(String projectName) throws Exception {
        SSLUtil.turnOffSslChecking();

        HttpHeaders hs = new HttpHeaders();
        hs.add("Content-Type", CONTENT_TYPE);
        hs.add("X-Requested-With", X_REQUESTED_WITH);
        hs.add("Accept", "text/plain;charset=utf-8");

        Map<String, String> map = new HashMap<>();

        map.put("id", login());
        map.put("project", projectName);

        ResponseEntity<String> exchange = restTemplate.exchange(
                azkabanConfig.getAzkUrl() + "/manager?session.id={id}&delete=true&project={project}", HttpMethod.GET,
                new HttpEntity<String>(hs), String.class, map);

        if (HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
            logger.error("删除Azkaban Project失败!返回错误信息:"+exchange);
            throw new Exception("删除Azkaban Project失败");
        }
    }
    
        /**
         * 获取一个项目的所有流flows
         * @param projectName 项目名称
         * @return List<String> 项目的所有流
         * @throws Exception
         */
        public List<String> fetchFlowsProject(String projectName) throws Exception {
        SSLUtil.turnOffSslChecking();
        HttpHeaders hs = new HttpHeaders();
        hs.add("Content-Type", CONTENT_TYPE);
        hs.add("X-Requested-With", X_REQUESTED_WITH);
        hs.add("Accept", "text/plain;charset=utf-8");

        Map<String, String> map = new HashMap<>();

        map.put("id", login());
        map.put("project", projectName);

        ResponseEntity<String> exchange = restTemplate.exchange(
                azkabanConfig.getAzkUrl() + "/manager?&session.id={id}&project={project}&ajax=fetchprojectflows", HttpMethod.GET,
                new HttpEntity<String>(hs), String.class, map);
        if (exchange == null||HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
            logger.error("Azkaban获取一个项目的所有流信息失败:" + projectName);
            logger.error("Azkaban获取一个项目的所有流信息失败:!返回错误信息:"+exchange);
            return null;
        }
        JsonElement obj = new Gson().fromJson(exchange.getBody(), JsonObject.class).get("flows");
        if (obj == null) {
            logger.error("Azkaban获取一个项目的所有流信息失败:{}:{}", projectName);
            return null;
        }
        List<String> flows = new ArrayList<String>();
        for(JsonElement jobj:obj.getAsJsonArray()) {
            flows.add(jobj.getAsJsonObject().get("flowId").getAsString());
        }
        return flows;
    }
    
        /**
         * 获取一个流的所有作业
         * @param projectName 项目名
         * @param flowId 流id
         * @return 
         * @throws Exception
         */
        public String fetchJobsFlow(String projectName, String flowId) throws Exception {
        SSLUtil.turnOffSslChecking();

        HttpHeaders hs = new HttpHeaders();
        hs.add("Content-Type", CONTENT_TYPE);
        hs.add("X-Requested-With", X_REQUESTED_WITH);
        hs.add("Accept", "text/plain;charset=utf-8");

        Map<String, String> map = new HashMap<>();

        map.put("id", login());
        map.put("project", projectName);
        map.put("flow", flowId);

        ResponseEntity<String> exchange = restTemplate.exchange(
                azkabanConfig.getAzkUrl() + "/manager?&session.id={id}&project={project}&flow={flow}&ajax=fetchflowgraph", HttpMethod.GET,
                new HttpEntity<String>(hs), String.class, map);
        if (exchange == null) {
            logger.error("Azkaban获取一个项目的所有流信息失败:" + projectName);
            return null;
        }
        logger.debug("Azkaban获取一个项目的所有流信息:" + exchange);
        if (HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
            throw new Exception("Azkaban获取一个项目的所有流信息失败");
        }
        return exchange.toString();
    }
    
        /**
         * Flow 获取执行的project 列表
         * azkaban api 获取流的执行
         * @param projectName 项目名
         * @param flowId 流id
         * @param start
         * @param length
         * @return
         * @throws Exception
         */
         public FlowExecution fetchFlowExecutions(String projectName, String flowId, String start,String length) throws Exception {
            SSLUtil.turnOffSslChecking();
            HttpHeaders hs = new HttpHeaders();
            hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
            hs.add("X-Requested-With", "XMLHttpRequest");
            hs.add("Accept", "text/plain;charset=utf-8");

            Map<String, String> map = new HashMap<>();

            map.put("id", login());
            map.put("project", projectName);
            map.put("flow", flowId);
            map.put("start", String.valueOf(start));
            map.put("length", String.valueOf(length));

            ResponseEntity<String> exchange = restTemplate.exchange(
                    azkabanConfig.getAzkUrl() + "/manager?session.id={id}&ajax=fetchFlowExecutions&project={project}&flow={flow}&start={start}&length={length}",
                    HttpMethod.GET, new HttpEntity<String>(hs), String.class, map);
            if (exchange == null || HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
                logger.error("Azkaban获取一个项目运行记录信息失败:{}:{}", projectName, flowId);
                return null;
            }
            return new Gson().fromJson(exchange.getBody(), FlowExecution.class);
        }
     
     /**
      * Flow 获取正在执行的流id
      * @param projectName
      * @param flowId
      * @return
      * @throws Exception
      */
         public String getRunning(String projectName, String flowId) throws Exception {
            SSLUtil.turnOffSslChecking();
            HttpHeaders hs = new HttpHeaders();
            hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
            hs.add("X-Requested-With", "XMLHttpRequest");
            hs.add("Accept", "text/plain;charset=utf-8");

            Map<String, String> map = new HashMap<>();
            map.put("id", login());
            map.put("project", projectName);
            map.put("flow", flowId);
            ResponseEntity<String> exchange = restTemplate.exchange(
                    azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=getRunning&project={project}&flow={flow}", HttpMethod.GET,
                    new HttpEntity<String>(hs), String.class, map);
           return exchange.getBody();
        }
         
         /**
         * Execute a Flow 执行一个流 还有很多其他参数 具体参考azkabanConfig.getAzkUrl()
         * 
         * @throws KeyManagementException
         * @throws NoSuchAlgorithmException
         */
        public String executeFlow(String projectName, String flowId) throws Exception {
            SSLUtil.turnOffSslChecking();
            HttpHeaders hs = new HttpHeaders();
            hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
            hs.add("X-Requested-With", "XMLHttpRequest");
            hs.add("Accept", "text/plain;charset=utf-8");

            Map<String, String> map = new HashMap<>();
            map.put("id", login());
            map.put("project", projectName);
            map.put("flow", flowId);
            ResponseEntity<String> exchange = restTemplate.exchange(
                    azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=executeFlow&project={project}&flow={flow}", HttpMethod.GET,
                    new HttpEntity<String>(hs), String.class, map);
            if (exchange == null||HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
                logger.error("执行一个流请求失败:{}:{}", projectName, flowId);
                return null;
            }
            JsonElement obj = new Gson().fromJson(exchange.getBody(), JsonObject.class).get("execid");
            if (obj == null) {
                logger.error("执行一个流失败:{}:{}", projectName, flowId);
                return null;
            }
            return obj.getAsString();
        }
        
        /**
         * Cancel a Flow Execution 取消流程执行
         * azkaban api 取消流程执行
         * @throws KeyManagementException
         * @throws NoSuchAlgorithmException
         */
        public void cancelEXEaFlow(String projectName,String start,String size) throws Exception {
            int flag=0;
            List<String> flows = fetchFlowsProject(projectName);//获取所有流
            for (String flow : flows) {
                FlowExecution fe = fetchFlowExecutions(projectName, flow, start, size);
                if (fe == null) {
                    continue;
                }
                List<Execution> executions = fe.getExecutions();//获取执行id
                for (Execution execution : executions) {
                    if(null!=execution&&null!=execution.getExecId()&&"RUNNING".equals(execution.getStatus())){//运行中的
                        SSLUtil.turnOffSslChecking();
                        HttpHeaders hs = new HttpHeaders();
                        hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
                        hs.add("X-Requested-With", "XMLHttpRequest");
                        hs.add("Accept", "text/plain;charset=utf-8");

                        Map<String, String> map = new HashMap<>();
                        map.put("id", login());
                        map.put("execid", String.valueOf(execution.getExecId()));
                        ResponseEntity<String> exchange = restTemplate.exchange(
                                azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=cancelFlow&execid={execid}", HttpMethod.GET,
                                new HttpEntity<String>(hs), String.class, map);
                        System.out.println(exchange.getBody());
                        if (exchange == null||HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
                            logger.error("取消执行调度失败,请求azkaban接口异常:"+exchange);
                            throw new Exception("取消执行调度失败,请求azkaban接口异常:"+exchange);
                        }
                        JsonElement obj = new Gson().fromJson(exchange.getBody(), JsonObject.class).get("error");
                        if (obj != null) {
                            throw new Exception("取消执行调度失败,请刷新列表获取最新调度状态!");
                        }
                        flag++;
                    }
                }
            }
            if(0==flag){
                throw new Exception("该调度不是运行中状态,请刷新列表获取最新状态!");
            }
        }
        
        /**
         * 根据时间 创建调度任务
         * 
         * @param projectId
         * @param projectName
         * @param flow
         * @param flowName
         * @param recurring,是否循环,on循环
         * @param period,循环频率:M:Months,w:Weeks,d:Days,h:Hours,m:Minutes,s:Seconds;如60s,支持分钟的倍数
         * @param date,开始时间
         * @return 返回scheduleId
         * @throws Exception
         */
        public String scheduleEXEaFlow(String projectId, String projectName, String flow, String flowName, String recurring,
                String period, Date date) throws Exception {

            SSLUtil.turnOffSslChecking();
            HttpHeaders hs = new HttpHeaders();
            hs.add("Content-Type", CONTENT_TYPE);
            hs.add("X-Requested-With", X_REQUESTED_WITH);
            LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<String, String>();
            linkedMultiValueMap.add("session.id", login());
            linkedMultiValueMap.add("ajax", "scheduleFlow");
            linkedMultiValueMap.add("projectName", projectName);
            linkedMultiValueMap.add("projectId", projectId);
            linkedMultiValueMap.add("flow", flow);
            linkedMultiValueMap.add("flowName", flowName);
            linkedMultiValueMap.add("is_recurring", recurring);
            linkedMultiValueMap.add("period", period);
            scheduleTimeInit(linkedMultiValueMap, date);

            HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);
            String result = restTemplate.postForObject(azkabanConfig.getAzkUrl() + "/schedule", httpEntity, String.class);

            logger.info("Azkaban返回根据时间创建定时任务信息:" + result);

            if (!SysContants.AZK_SUCCESS.equals(new Gson().fromJson(result, JsonObject.class).get("status").getAsString())
                    || new Gson().fromJson(result, JsonObject.class).get("scheduleId").getAsInt() < 0) {
                logger.error("Azkaban返回根据时间创建定时任务信息失败:!返回错误信息:"+result);
                throw new Exception("根据时间创建定时任务失败");
            }

            return new Gson().fromJson(result, JsonObject.class).get("scheduleId").getAsString();
        }

        /**
         * 根据cron表达式 创建调度任务
         * @param projectName 项目名称
         * @param cron cron表达式
         * @param flow 流
         * @param flowName 流名称
         * @return 返回scheduleId
         * @throws Exception
         */
        public String scheduleByCronEXEaFlow(String projectName, String cron, String flowName)
                throws Exception {

            SSLUtil.turnOffSslChecking();
            HttpHeaders hs = new HttpHeaders();
            hs.add("Content-Type", CONTENT_TYPE);
            hs.add("X-Requested-With", X_REQUESTED_WITH);
            LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<String, String>();
            linkedMultiValueMap.add("session.id", login());
            linkedMultiValueMap.add("ajax", "scheduleCronFlow");
            linkedMultiValueMap.add("projectName", projectName);
            linkedMultiValueMap.add("cronExpression", cron);
            linkedMultiValueMap.add("flow", flowName);

            HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);
            String result = restTemplate.postForObject(azkabanConfig.getAzkUrl() + "/schedule", httpEntity, String.class);

            if (!SysContants.AZK_SUCCESS
                    .equals(new Gson().fromJson(result, JsonObject.class).get("status").getAsString())) {
                logger.error("Azkaban返回根据时间创建定时任务信息失败:!返回错误信息:"+result);
                return null;
            }

            return new Gson().fromJson(result, JsonObject.class).get("scheduleId").getAsString();
        }

        /**
         * 根据scheduleId取消一个流的调度
         * 暂停一次执行,输入为exec id。如果这个执行不是处于running状态,会返回错误信息。
         * @param scheduleId
         * @throws Exception
         */
        public boolean unscheduleFlow(String scheduleId) {
            try {
                SSLUtil.turnOffSslChecking();
                HttpHeaders hs = new HttpHeaders();
                hs.add("Content-Type", CONTENT_TYPE);
                hs.add("X-Requested-With", X_REQUESTED_WITH);
                LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<String, String>();
                linkedMultiValueMap.add("session.id", login());
                linkedMultiValueMap.add("action", "removeSched");
                linkedMultiValueMap.add("scheduleId", scheduleId);

                HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);
                String result = restTemplate.postForObject(azkabanConfig.getAzkUrl() + "/schedule", httpEntity,
                        String.class);
                if (StringUtils.isBlank(result)) {
                    return false;
                }
                if (!SysContants.AZK_SUCCESS
                        .equals(String.valueOf(new Gson().fromJson(result, JsonObject.class).get("status")))) {
                    logger.error("取消流调度信息失败:{}", scheduleId);
                    logger.error("Azkaban取消流调度信息失败失败:!返回错误信息:"+result);
                    return false;
                }
            } catch (Exception e) {
                logger.error("取消流调度信息失败:{}", scheduleId);
                return false;
            }
            return true;
        }

        /**
         * 下载Azkaban压缩文件
         * @param projectName 项目名称
         * @param zipPath 文件路径
         * @throws Exception 文件异常
         */
        public void downLoadZip(String projectName, String zipPath) throws Exception {
            OutputStream output = null;
            BufferedOutputStream bufferedOutput = null;

            try {
                URL url = new URL(azkabanConfig.getAzkUrl() + "/manager?session.id=" + login() + "&download=true&project="
                        + projectName);
                HttpURLConnection conn = (HttpURLConnection) url.openConnection();
                conn.setConnectTimeout(3 * 1000);
                InputStream inputStream = conn.getInputStream();
                File file = new File(zipPath);
                output = new FileOutputStream(file);
                bufferedOutput = new BufferedOutputStream(output);
                bufferedOutput.write(IOUtils.toByteArray(inputStream));
            } catch (Exception e) {
                logger.info("下载Azkaban压缩文件异常:" + e.getMessage(), e);
            } finally {
                if (bufferedOutput != null) {
                    try {
                        bufferedOutput.flush();
                        bufferedOutput.close();
                    } catch (IOException e) {
                        logger.info("关闭流异常:" + e.getMessage(), e);
                    }
                }

                if (output != null) {
                    try {
                        output.close();
                    } catch (IOException e) {
                        logger.info("关闭流异常:" + e.getMessage(), e);
                    }
                }
            }
        }


        /**
         * 获取一个调度器job的信息 根据project的id 和 flowId
         * @param projectId 项目名称
         * @param flowId 流编号
         * @return job的信息
         */
        public String fetchSchedule(String projectId, String flowId) {
            try {
                SSLUtil.turnOffSslChecking();
                HttpHeaders hs = new HttpHeaders();
                hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
                hs.add("X-Requested-With", "XMLHttpRequest");
                hs.add("Accept", "text/plain;charset=utf-8");

                Map<String, String> map = new HashMap<>();
                map.put("id", login());
                map.put("projectId", projectId);
                map.put("flowId", flowId);
                ResponseEntity<String> exchange = restTemplate.exchange(
                        azkabanConfig.getAzkUrl()
                                + "/schedule?session.id={id}&ajax=fetchSchedule&projectId={projectId}&flowId={flowId}",
                        HttpMethod.GET, new HttpEntity<String>(hs), String.class, map);
                if (HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
                    logger.error("获取一个调度器job的信息失败:{}:{}", projectId, flowId);
                    return null;
                }
                JsonElement obj = new Gson().fromJson(exchange.getBody(), JsonObject.class).get("schedule");
                if (obj == null) {
                    logger.error("获取一个调度器job的信息失败:{}:{}", projectId, flowId);
                    return null;
                }
                return obj.getAsJsonObject().get("scheduleId").getAsString();
            } catch (Exception e) {
                logger.error("获取一个调度器job的信息失败:{}:{}", projectId, flowId);
            }
            return null;
        }

        /**
         * SLA 设置调度任务 执行的时候 或者执行成功失败等等的规则匹配 发邮件或者...
         * @return
         * @throws Exception
         */
        public String setSla() throws Exception {
            SSLUtil.turnOffSslChecking();
            HttpHeaders hs = new HttpHeaders();
            hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
            hs.add("X-Requested-With", "XMLHttpRequest");
            LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<String, String>();
            linkedMultiValueMap.add("session.id", "ffad7355-4427-4770-9c14-3d19736fa73a");
            linkedMultiValueMap.add("ajax", "setSla");
            linkedMultiValueMap.add("scheduleId", "6");
            linkedMultiValueMap.add("slaEmails", "771177@qq.com");
            linkedMultiValueMap.add("settings[0]", "begin,SUCCESS,5:00,true,false");
            linkedMultiValueMap.add("settings[1]", "exe,SUCCESS,5:00,true,false");
            linkedMultiValueMap.add("settings[2]", "end,SUCCESS,5:00,true,false");
            // linkedMultiValueMap.add("settings[3]",
            // "xxx,SUCCESS,5:00,true,false");

            HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);
            String postForObject = restTemplate.postForObject(azkabanConfig.getAzkUrl() + "/schedule", httpEntity, String.class);
           return postForObject;
        }

        /**
         * SLA 获取调度的规则配置
         * @throws Exception
         */
        public void slaInfo() throws Exception {

            SSLUtil.turnOffSslChecking();
            HttpHeaders hs = new HttpHeaders();
            hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
            hs.add("X-Requested-With", "XMLHttpRequest");
            hs.add("Accept", "text/plain;charset=utf-8");

            Map<String, String> map = new HashMap<>();
            map.put("id", "c4adf192-dcf4-4e05-bd08-f6989dc544a7");
            map.put("scheduleId", "6");
            ResponseEntity<String> exchange = restTemplate.exchange(
                    azkabanConfig.getAzkUrl() + "/schedule?session.id={id}&ajax=slaInfo&scheduleId={scheduleId}", HttpMethod.GET,
                    new HttpEntity<String>(hs), String.class, map);
            System.out.println(exchange.getBody());
        }

        /**
         * Execution 暂停一个执行流
         * @throws Exception
         */
        public void pauseFlow() throws Exception {

            SSLUtil.turnOffSslChecking();
            HttpHeaders hs = new HttpHeaders();
            hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
            hs.add("X-Requested-With", "XMLHttpRequest");
            hs.add("Accept", "text/plain;charset=utf-8");

            Map<String, String> map = new HashMap<>();
            map.put("id", "c4adf192-dcf4-4e05-bd08-f6989dc544a7");
            map.put("execid", "12");
            ResponseEntity<String> exchange = restTemplate.exchange(
                    azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=pauseFlow&execid={execid}", HttpMethod.GET,
                    new HttpEntity<String>(hs), String.class, map);
            System.out.println(exchange.getBody());
        }

        /**
         * Flow Execution 重新执行一个执行流
         * @throws Exception
         */
        public void resumeFlow() throws Exception {

            SSLUtil.turnOffSslChecking();
            HttpHeaders hs = new HttpHeaders();
            hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
            hs.add("X-Requested-With", "XMLHttpRequest");
            hs.add("Accept", "text/plain;charset=utf-8");

            Map<String, String> map = new HashMap<>();
            map.put("id", "c4adf192-dcf4-4e05-bd08-f6989dc544a7");
            map.put("execid", "11");
            ResponseEntity<String> exchange = restTemplate.exchange(
                    azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=resumeFlow&execid={execid}", HttpMethod.GET,
                    new HttpEntity<String>(hs), String.class, map);
            System.out.println(exchange.getBody());
        }

        /**
         *  获取一个执行流的详细信息 这个流的每个节点的信息 成功或者失败等等
         * @param execid 执行id
         * @return
         * @throws Exception
         */
        public String fetchexecflow(String execid) throws Exception {

            SSLUtil.turnOffSslChecking();
            HttpHeaders hs = new HttpHeaders();
            hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
            hs.add("X-Requested-With", "XMLHttpRequest");
            hs.add("Accept", "text/plain;charset=utf-8");

            Map<String, String> map = new HashMap<>();
            map.put("id", login());
            map.put("execid", execid);
            ResponseEntity<String> exchange = restTemplate.exchange(
                    azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=fetchexecflow&execid={execid}", HttpMethod.GET,
                    new HttpEntity<String>(hs), String.class, map);
            if (exchange == null || HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
                logger.error("获取一个执行流的详细信息失败:" + execid);
                return null;
            }
            return exchange.getBody();
        }

        /**
         * 获取一个执行流的日志
         * @param execid 执行编号
         * @param jobId job编号
         * @param offset
         * @param length
         * @return
         * @throws Exception
         */
        public String fetchExecJobLogs(String execid,String jobId,String offset,String length) throws Exception {

            SSLUtil.turnOffSslChecking();
            HttpHeaders hs = new HttpHeaders();
            hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
            hs.add("X-Requested-With", "XMLHttpRequest");
            hs.add("Accept", "text/plain;charset=utf-8");

            Map<String, String> map = new HashMap<>();
            map.put("id", login());
            map.put("execid", execid);
            map.put("jobId", jobId);
            map.put("offset", offset);
            map.put("length", length);
            ResponseEntity<String> exchange = restTemplate.exchange(
                    azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=fetchExecJobLogs&execid={execid}&jobId={jobId}&offset={offset}&length={length}",
                    HttpMethod.GET, new HttpEntity<String>(hs), String.class, map);
            if (exchange == null || HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
                logger.error("获取一个执行流的详细信息失败:{}:{}", execid,jobId);
                return null;
            }
            
            JsonElement obj = new Gson().fromJson(exchange.getBody(), JsonObject.class).get("data");
            if (obj == null) {
                logger.error("获取一个执行流的详细信息为空:{}:{}", execid,jobId);
                return null;
            }
            return obj.getAsString();
        }
        
        /**
         * 获取一个执行流的日志概要
         * @param execid
         * @param offset
         * @param length
         * @return
         * @throws Exception
         */
        public String fetchExecFlowLogs(String execid,String offset,String length) throws Exception {

            SSLUtil.turnOffSslChecking();
            HttpHeaders hs = new HttpHeaders();
            hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
            hs.add("X-Requested-With", "XMLHttpRequest");
            hs.add("Accept", "text/plain;charset=utf-8");

            Map<String, String> map = new HashMap<>();
            map.put("id", login());
            map.put("execid", execid);
            map.put("offset", offset);
            map.put("length", length);
            ResponseEntity<String> exchange = restTemplate.exchange(
                    azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=fetchExecFlowLogs&execid={execid}&offset={offset}&length={length}",
                    HttpMethod.GET, new HttpEntity<String>(hs), String.class, map);
            if (exchange == null || HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
                logger.error("获取一个执行流的日志概要信息失败:{}", execid);
                return null;
            }
            
            JsonElement obj = new Gson().fromJson(exchange.getBody(), JsonObject.class).get("data");
            if (obj == null) {
                logger.error("获取一个执行流的日志概要信息为空:{}:{}", execid);
                return null;
            }
            return obj.getAsString();
        }
        
        /**
         * 获取执行流的信息状态
         * @throws Exception
         */
        public void fetchexecflowupdate() throws Exception {

            SSLUtil.turnOffSslChecking();
            HttpHeaders hs = new HttpHeaders();
            hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
            hs.add("X-Requested-With", "XMLHttpRequest");
            hs.add("Accept", "text/plain;charset=utf-8");

            Map<String, String> map = new HashMap<>();
            map.put("id", "c4adf192-dcf4-4e05-bd08-f6989dc544a7");
            map.put("execid", "11");
            map.put("lastUpdateTime", "-1");
            ResponseEntity<String> exchange = restTemplate.exchange(
                    azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=fetchexecflowupdate&execid={execid}&lastUpdateTime={lastUpdateTime}",
                    HttpMethod.GET, new HttpEntity<String>(hs), String.class, map);
            System.out.println(exchange.getBody());
        }
        
        private void scheduleTimeInit(LinkedMultiValueMap<String, String> linkedMultiValueMap, Date date) {
            Calendar calendar = Calendar.getInstance();
            calendar.setTime(date);
            Integer year = calendar.get(Calendar.YEAR);
            Integer month = calendar.get(Calendar.MONTH) + 1;
            Integer day = calendar.get(Calendar.DATE);
            Integer hour = calendar.get(Calendar.HOUR_OF_DAY);
            Integer minute = calendar.get(Calendar.MINUTE);

            linkedMultiValueMap.add("scheduleTime", hour + "," + minute + (hour > 11 ? ",pm,PDT" : ",am,EDT"));
            linkedMultiValueMap.add("scheduleDate", month + "/" + day + "/" + year);
        }

        /**
         * 获取azkaban调度  调度概况  (分页查询)
         * @param projectName  工程名称
         * @param start  分页参数
         * @param size   分页参数
         * @return
         * @throws Exception
         * @throws Exception
         */
        public List<GovernTaskRecordBean> getAzkabanExcutions(String projectName,String start,String size) throws Exception {

            List<GovernTaskRecordBean> excInfoList = Lists.newArrayList();
            List<String> flows = fetchFlowsProject(projectName);
            for (String flow : flows) {
                FlowExecution fe = fetchFlowExecutions(projectName, flow, start, size);
                if (fe == null) {
                    continue;
                }
                List<Execution> executions = fe.getExecutions();
                for (Execution execution : executions) {
                    GovernTaskRecordBean eInfo = new GovernTaskRecordBean();
                    eInfo.setCreateTime(new DateTime(execution.getSubmitTime()).toString(formatterTime));
                    if (execution.getEndTime() > 0) {
                        eInfo.setEndTime(new DateTime(execution.getEndTime()).toString(formatterTime));
                        eInfo.setElapsed((execution.getEndTime() - execution.getStartTime()) / 1000);
                    } else {
                        eInfo.setElapsed((DateTime.now().getMillis() - execution.getStartTime()) / 1000);
                    }
                    eInfo.setExecId(execution.getExecId());
                    eInfo.setFlowId(execution.getFlowId());
                    eInfo.setOwner(execution.getSubmitUser());
                    eInfo.setProjectId(execution.getProjectId());
                    eInfo.setProjectPath(projectName);
                    eInfo.setStartTime(new DateTime(execution.getStartTime()).toString(formatterTime));
                    if (execution.getStatus().equalsIgnoreCase(ScheduleStatus.FAILED.getDesc())) {
                        eInfo.setStatus(ScheduleStatus.FAILED.getCode());
                    } else if (execution.getStatus().equalsIgnoreCase(ScheduleStatus.CANCELLED.getDesc())) {
                        eInfo.setStatus(ScheduleStatus.CANCELLED.getCode());
                    } else if (execution.getStatus().equalsIgnoreCase(ScheduleStatus.KILLED.getDesc())) {
                        eInfo.setStatus(ScheduleStatus.KILLED.getCode());
                    } else if (execution.getStatus().equalsIgnoreCase(ScheduleStatus.SUCCEEDED.getDesc())) {
                        eInfo.setStatus(ScheduleStatus.SUCCEEDED.getCode());
                    } 
                    excInfoList.add(eInfo);
                }
            }
            return excInfoList;
        }

        /**
         * 获取azkaban调度 流的执行情况 (分页)
         * @param excuteId  调度执行id
         * @param start
         * @param size
         * @return
         * @throws Exception
         */
        public ExecutionInfoBean getAzkabanExcutionDetails(String excuteId, String start, String size) throws Exception {
            String result = fetchexecflow(excuteId);
            if (StringUtils.isBlank(result)) {
                throw new CommonException("查询任务流的执行详情失败!");
            }
            ExecutionInfo ei = new Gson().fromJson(result, ExecutionInfo.class);
            if(ei==null) {
                throw new CommonException("查询任务流的执行详情失败!");
            }
            List<ExecNode> nodes = ei.getNodes();
            if (nodes == null || nodes.size() == 0) {
                return null;
            }
            ExecutionInfoBean eib = new ExecutionInfoBean();
            eib.setAttempt(ei.getAttempt());
            eib.setStartTime(new DateTime(ei.getStartTime()).toString(formatterTime));
            if (ei.getEndTime() > 0) {
                eib.setEndTime(new DateTime(ei.getEndTime()).toString(formatterTime));
                eib.setElapsed((ei.getEndTime() - ei.getStartTime()) / 1000);
            } else {
                eib.setElapsed((DateTime.now().getMillis() - ei.getStartTime()) / 1000);
            }
            eib.setExecid(ei.getExecid());
            eib.setFlow(ei.getFlow());
            eib.setFlowId(ei.getFlowId());
            eib.setJobId(ei.getId());
            eib.setNestedId(ei.getNestedId());
            eib.setProject(ei.getProject());
            eib.setProjectId(ei.getProjectId());
            String stats = ScheduleStatus.getDescByCode(ei.getStatus());
            if(StringUtils.isNotBlank(stats)){
                eib.setStatus(stats);
            }
            eib.setSubmitTime(new DateTime(ei.getSubmitTime()).toString(formatterTime));
            eib.setSubmitUser(ei.getSubmitUser());
            eib.setType(ei.getType());
            eib.setUpdateTime(new DateTime(ei.getUpdateTime()).toString(formatterTime));

            String flowLog= fetchExecFlowLogs(excuteId, start, size);
            eib.setFlowLog(flowLog);
            List<ExecNodeBean> nodeBeanList = Lists.newArrayList();
            for(ExecNode node:nodes) {
                ExecNodeBean ebn = new ExecNodeBean();
                ebn.setAttempt(node.getAttempt());
                ebn.setJobId(node.getId());
                ebn.setDependencies(node.getIn());
                ebn.setNestedId(node.getNestedId());
                ebn.setStartTime(new DateTime(ei.getStartTime()).toString(formatterTime));
                if (node.getEndTime() > 0) {
                    ebn.setEndTime(new DateTime(node.getEndTime()).toString(formatterTime));
                    ebn.setElapsed((node.getEndTime() - node.getStartTime()) / 1000);
                } else {
                    ebn.setElapsed((DateTime.now().getMillis() - node.getStartTime()) / 1000);
                }
                String stats2 = ScheduleStatus.getDescByCode(node.getStatus());
                if(StringUtils.isNotBlank(stats2)){
                    ebn.setStatus(stats2);
                }
                ebn.setType(node.getType());
                ebn.setUpdateTime(new DateTime(ei.getUpdateTime()).toString(formatterTime));
                String logs = fetchExecJobLogs(excuteId,ebn.getJobId(),start, size);
                ebn.setLogs(logs);
                nodeBeanList.add(ebn);
            }
            eib.setNodes(nodeBeanList);
            return eib;
        }

        /**
         * 获取一个项目的projectId
         * @param projectName
         * @return
         * @throws Exception
         */
        public String fetchProjectId(String projectName) throws Exception {
            SSLUtil.turnOffSslChecking();
            HttpHeaders hs = new HttpHeaders();
            hs.add("Content-Type", CONTENT_TYPE);
            hs.add("X-Requested-With", X_REQUESTED_WITH);
            hs.add("Accept", "text/plain;charset=utf-8");

            Map<String, String> map = new HashMap<>();

            map.put("id", login());
            map.put("project", projectName);

            ResponseEntity<String> exchange = restTemplate.exchange(
                    azkabanConfig.getAzkUrl() + "/manager?&session.id={id}&project={project}&ajax=fetchprojectflows", HttpMethod.GET,
                    new HttpEntity<String>(hs), String.class, map);
            if (exchange == null||HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
                logger.error("Azkaban获取一个项目的所有流信息失败:" + projectName);
                return null;
            }
            JsonElement obj = new Gson().fromJson(exchange.getBody(), JsonObject.class).get("projectId");
            if (obj == null) {
                logger.error("Azkaban获取一个项目的所有流信息失败:{}:{}", projectName);
                return null;
            }
            String projectId = obj.getAsString();
            if(StringUtils.isBlank(projectId)){
                logger.error("获取Azkaban  projectId 异常");
            }
            return projectId;
        }

        /**
         * 获取
         * @param projectName
         * @return
         * @throws Exception
         */
        public String getLastScheduleStatus(String projectName) throws Exception {
            List<String> flows = fetchFlowsProject(projectName);
            if(CollectionUtils.isNotEmpty(flows)) {
                for (String flow : flows) {
                    FlowExecution fe = fetchFlowExecutions(projectName, flow, "0", "1000");
                    if (fe == null) {
                        continue;
                    }
                    List<Execution> executions = fe.getExecutions();
                    if (executions == null || executions.size() == 0) {
                        continue;
                    }
                    String status = executions.get(0).getStatus();
                    return status;
                }
            }
            return null;
        }
    
    

}
AzkabanService

  8. 调用 demo

/**
     * 根据flowId 立即执行任务
     * @param projectName 项目名称
     * @param flow 流id
     * @throws CommonException 阿兹卡班异常
     */
    private  void excuteFlowImmediately(String projectName, String flow) throws CommonException {
        try {
            azkabanService.executeFlow(projectName,flow);
        } catch (Exception e) {
            throw new CommonException("调度初始化完毕,立即执行任务异常",e);
        }
    }
demo

 

四、注意

  1. 如遇报错情况,请关注 azkaban 相关 log 日志。

  2. 如果 pom 文件 jar 包不全 请评论。

 

 

 

Java-Springboot - 集成 spring-security 简单示例 (Version-springboot-2-1-3-RELEASE

Java-Springboot - 集成 spring-security 简单示例 (Version-springboot-2-1-3-RELEASE

  • 使用 Idea 的 Spring Initializr 或者 SpringBoot 官网下载 quickstart
  • 添加依赖

    1
    2
    3
    4
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-security</artifactId>
    </dependency>
  • 新建控制器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    @RestController
    public class UserController {
    @GetMapping("/user")
    public String getUsers() {
    return "Hello Spring Security";
    }
    }
  • logback.xml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    <?xml version="1.0" encoding="UTF-8"?>

    <configuration>
    <appender name="FILE" class="ch.qos.logback.core.FileAppender">
    <file> 大专栏  Java-Springboot-集成spring-security简单示例(Version-springboot-2-1-3-RELEASE/data/www/file/logs/springboot.log</file>

    <encoder>
    <pattern>%date %d{HH: mm:ss.SSS} %level [%thread] %logger{10} [%file:%line] %msg%n</pattern>
    </encoder>
    </appender>

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
    <pattern>%date %d{HH: mm:ss.SSS} %level [%thread] %logger{10} [%file:%line] %msg%n</pattern>
    </encoder>
    </appender>

    <root level="debug">
    <appender-ref ref="FILE" />
    <appender-ref ref="STDOUT" />
    </root>
    </configuration>
  • application.properties

    1
    2
    3
    # Server Domain-Port
    server.address=127.0.0.1
    server.port=9090
  • 启动 SpringBootApplication,springboot 已经和 spring-security 集成了,如果直接访问 http://localhost:9090/user 会跳到登陆页面,这是 spring-security 自带的,但是我们并没有创建任何用户啊,spring-security 有个默认的用户名 user,密码在控制台登陆页面

  • 默认密码在控制信息里,在控制台信息里搜索 Using generated,当然你的程序生成的密码肯定和我的不一样

    1
    Using generated security password: 6ae529ee-2281-4b66-8f30-b1ba0e7fec97
  • 使用用户名和密码登陆后:
    正常访问

  • 源码

java版springcloud+springboot多租户社交电子商务-springboot集成apidoc

java版springcloud+springboot多租户社交电子商务-springboot集成apidoc

首先声明下,apidoc是基于注释来生成文档的,它不基于任何框架,而且支持大多数编程语言,为了springboot系列的完整性,所以标了个题。

一、apidoc简介
apidoc通过在你代码的注释来生成api文档的。它对代码没有侵入性,只需要你写好相关的注释即可,并且它仅通过写简单的配置就可以生成高颜值的api接口页面。它基于node.js,所以你需要安装node.js环境。node.js安装,点击这里。这里就不介绍。

二、准备工作
安装完node.js安装api.doc.

通过命令安装:

npm install apidoc -g

三、注释怎么写

@api
@api {method} path [title]

method:请求方法,
path:请求路径 
title(可选):标题
@apiDescription
@apiDescription text
text说明
复制代码
@apiError
@apiError [(group)] [{type}] field [description]

(group)(可选):参数将以这个名称分组,不设置的话,默认是Error 4xx 
{type}(可选):返回值类型,例如:{Boolean}, {Number}, {String}, {Object}, {String[]} 
field:返回值字段名称 
descriptionoptional(可选):返回值字段说明
复制代码
@apiGroup
@apiGroup name
name:组名称,也是导航的标题

更多注释,参见官方文档:http://apidocjs.com/#params

四、写给栗子
首先写配置文件
在项目的主目录新建一个apidoc.json文件:

{
  "name": "example",
  "version": "0.1.0",
  "description": "A basic apiDoc example"
}

更多配置参考:http://apidocjs.com/#configur...

写个注释:

/**
     * @api {POST} /register 注册用户
     * @apiGroup Users
     * @apiVersion 0.0.1
     * @apiDescription 用于注册用户
     * @apiParam {String} account 用户账户名
     * @apiParam {String} password 密码
     * @apiParam {String} mobile 手机号
     * @apiParam {int} vip = 0  是否注册Vip身份 0 普通用户 1 Vip用户
     * @apiParam {String} [recommend] 邀请码
     * @apiParamExample {json} 请求样例:
     *                ?account=sodlinken&password=11223344&mobile=13739554137&vip=0&recommend=
     * @apiSuccess (200) {String} msg 信息
     * @apiSuccess (200) {int} code 0 代表无错误 1代表有错误
     * @apiSuccessExample {json} 返回样例:
     *                {"code":"0","msg":"注册成功"}
     */

用apidoc命令生成文档界面
先cd到工程的外层目录,并在外层目建个输出文档的目录,我建的是docapi。

输命令:

apidoc -i chapter4/ -o apidoc/-i 输入目录 -o 输出目录

chapter4是我的工程名。

可以看到在apidoc目录生成了很多文件:

打开index.html,可以看到文档页面:

电子商务社交平台源码请加企鹅求求:一零三八七七四六二六

SpringBoot (九)_springboot 集成 MyBatis

SpringBoot (九)_springboot 集成 MyBatis

MyBatis 是一款标准的 ORM 框架,被广泛的应用于各企业开发中。具体细节这里就不在叙述,大家自行查找资料进行学习下。

加载依赖

<dependency>
    <groupId>org.mybatis.spring.boot</groupId>
    <artifactId>mybatis-spring-boot-starter</artifactId>
    <version>1.3.1</version>
</dependency>

application 配置 (application.yml)

mybatis:
  config-location: classpath:mybatis/mybatis-config.xml
  mapper-locations: classpath:mybatis/mapper/*.xml
  type-aliases-package: com.zhb.entity
spring:
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8
    password: root
    username: root

启动类

在启动类中添加对 Mapper 包扫描 @MapperScan,Spring Boot 启动的时候会自动加载包路径下的 Mapper。

@Spring BootApplication
@MapperScan("com.zhb.mapper")
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

或者直接在 Mapper 类上面添加注解 @Mapper,建议使用上面那种,不然每个 Mapper 加个注解会很麻烦。

代码展示

这里只说下,我们在 controller 中 尽量使用 Restful 风格

@RestController
public class UserController {

    @Resource
    private UserMapper userMapper;


    @GetMapping(value = "/users")
    public List<UserEntity> getUsers() {
        List<UserEntity> users=userMapper.getAll();
        return users;
    }



    @GetMapping(value = "/users/{id}")
    public UserEntity getUser(@PathVariable(value = "id") Long id) {
        UserEntity user=userMapper.getOne(id);
        return user;
    }

    @PostMapping("/users")
    public void save(UserEntity user) {
        userMapper.insert(user);
    }

    @PutMapping("/users")
    public void update(UserEntity user) {
        userMapper.update(user);
    }

    @DeleteMapping(value="/users/{id}")
    public void delete(@PathVariable("id") Long id) {
        userMapper.delete(id);
    }


}

实际开发常遇问题

在平常开发中,我们经常会遇到返回树结构,如下展示

[
  {
    "id": "1",
    "name": "山东",
    "pid": "0",
    "children": [
      {
        "id": "2",
        "name": "济南",
        "pid": "1",
        "children": [
          {
            "id": "3",
            "name": "高新区",
            "pid": "2",
            "children": null
          }
        ]
      }
    ]
  }
]
(1) java 中 通过对 list 数据进行拼接成树

如下面的方法(这里直接声明了 list,往里面添加数据,来演示查询出的 list 集合)

@GetMapping(value = "/area")
    public   List<AreaTreeEntity> get(){
        
        List<AreaTreeEntity> res= new ArrayList<>();
        Map<String,List<AreaTreeEntity>> childMap = new HashMap<>(16);

		//为了方便测试,构建list数据
		List<AreaTreeEntity> s =  new ArrayList<>();
        AreaTreeEntity a = new AreaTreeEntity();
        a.setId("1");
        a.setName("山东");
        a.setPid("0");
        s.add(a);

        AreaTreeEntity a1 = new AreaTreeEntity();
        a1.setId("2");
        a1.setName("济南");
        a1.setPid("1");
        s.add(a1);

        AreaTreeEntity a2 = new AreaTreeEntity();
        a2.setId("3");
        a2.setName("高新区");
        a2.setPid("2");
        s.add(a2);

        for(AreaTreeEntity entity : s){

            if ("0".equals(entity.getPid())) {

                res.add(entity);
            }
            else {

                List<AreaTreeEntity> childList = (childMap.containsKey(entity.getPid())) ? childMap.get(entity.getPid()) : new ArrayList<>();
                childList.add(entity);

                if (!childMap.containsKey(entity.getPid())){
                    childMap.put(entity.getPid(),childList);
                }
            }
        }

        for(AreaTreeEntity entity : res){
            findChild(entity,childMap);
        }

        return res;
    }

    public void findChild(AreaTreeEntity entity,Map<String,List<AreaTreeEntity>> childMap){

        if (childMap.containsKey(entity.getId())){
            List<AreaTreeEntity> chidList = childMap.get(entity.getId());
            for (AreaTreeEntity childEntity : chidList){
                findChild(childEntity,childMap);
            }
            entity.setChildren(chidList);
        }
    }

经过验证,返回数据如下

[
  {
    "id": "1",
    "name": "山东",
    "pid": "0",
    "children": [
      {
        "id": "2",
        "name": "济南",
        "pid": "1",
        "children": [
          {
            "id": "3",
            "name": "高新区",
            "pid": "2",
            "children": null
          }
        ]
      }
    ]
  }
]
(2) 使用 mabatis 的 collection 直接查询出树结构

<mapper namespace="com.zhb.mapper.AreaTreeMapper" >
    <resultMap id="TreeResultMap" type="com.zhb.entity.AreaTreeEntity" >
        <result column="id" property="id" jdbcType="VARCHAR" />
        <result column="name" property="name" jdbcType="VARCHAR" />
        <collection property="children" column="id" ofType="com.zhb.entity.AreaTreeEntity" javaType="ArrayList" select="selectChildrenById"/>
    </resultMap>


    <select id="getAreaTree" resultMap="TreeResultMap">
        select  id,name  from  area where parent_id  = ''0''
    </select>

    <select id="selectChildrenById" parameterType="java.lang.String" resultMap="TreeResultMap">
        select id,name from  area where parent_id  = #{id}
    </select>

</mapper>

完整代码下载:github

注:项目中加入 swagger ,方便大家测试,直接访问 http://localhost:8080/swagger-ui.html

SpringBoot + KafKa集群的集成

SpringBoot + KafKa集群的集成

简介

本文主要讲在springboot2中,如何通过自定义的配置来集成,并可以比较好的扩展性,同时集成多个kafka集群

引入依赖

引入kafka的依赖

        <!-- kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

配置文件

添加配置文件,默认添加一个kafka的集群,

topinfo:
     # kafka集群配置 ,bootstrap-servers 是必须的
   kafka:
      # 生产者的kafka集群地址
      bootstrap-servers:  192.168.90.225:9092,192.168.90.226:9092,192.168.90.227:9092 
      producer: 
         topic-name:  topinfo-01
         
      consumer:
         group-id:  ci-data
         

如果多个,则配置多个kafka的集群配置即可

添加属性配置类

添加对应的属性配置类,如果是多个kafka集群,则可以填多个即可,注意对应的@ConfigurationProperties。

package com.topinfo.ci.dataex.config;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import com.topinfo.ci.dataex.bean.Consumer;
import com.topinfo.ci.dataex.bean.Producer;

/**
 * @Description: kafka 属性配置
 * @Author:杨攀
 * @Since:2019年7月10日上午10:35:18
 */
@ConfigurationProperties(prefix = "topinfo.kafka")
@Component
public class KafKaConfiguration {

    /**
     * @Fields bootstrapServer : 集群的地址
     */
    private String bootstrapServers;

    private Producer producer;

    private Consumer consumer;

    public String getBootstrapServers() {
        return bootstrapServers;
    }

    public void setBootstrapServers(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
    }

    public Producer getProducer() {
        return producer;
    }

    public void setProducer(Producer producer) {
        this.producer = producer;
    }

    public Consumer getConsumer() {
        return consumer;
    }

    public void setConsumer(Consumer consumer) {
        this.consumer = consumer;
    }

}

添加kafka配置类

kafka的配置类中, 主要注意的方法:

生产者工厂方法: producerFactory()
生产者KafkaTemplate :kafkaTemplate()

消费者的工厂方法:consumerFactory()
消费者的监听容器工厂方法: kafkaListenerContainerFactory()

如果对应的是对个集群,需要多配置几个对应的这几个方法即可。

package com.topinfo.ci.dataex.config;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

/**
 * @Description: kafka配置类
 * @Author:杨攀
 * @Since:2019年7月10日下午3:06:58
 */
@Configuration
public class KafKaConfig {

    @Autowired
    private KafKaConfiguration configuration;

     
    
    /**
     * @Description: 生产者的配置
     * @Author:杨攀
     * @Since: 2019年7月10日下午1:41:06
     * @return
     */
    public Map<String, Object> producerConfigs() {

        Map<String, Object> props = new HashMap<String, Object>();
        // 集群的服务器地址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, configuration.getBootstrapServers());
        //  消息缓存
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
        // 生产者空间不足时,send()被阻塞的时间,默认60s
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000);
        // 生产者重试次数
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        // 指定ProducerBatch(消息累加器中BufferPool中的)可复用大小
        props.put(ProducerConfig.BATCH_SIZE_CONFIG,  4096);
        // 生产者会在ProducerBatch被填满或者等待超过LINGER_MS_CONFIG时发送
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // key 和 value 的序列化
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // 客户端id
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.topinfo");

        return props;
    }

    /**
     * @Description: 生产者工厂
     * @Author:杨攀
     * @Since: 2019年7月10日下午2:10:04
     * @return
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<String, String>(producerConfigs());
    }

    /**
     * @Description: KafkaTemplate
     * @Author:杨攀
     * @Since: 2019年7月10日下午2:10:47
     * @return
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }


    // ------------------------------------------------------------------------------------------------------------

    /**
     * @Description: 消费者配置
     * @Author:杨攀
     * @Since: 2019年7月10日下午1:48:36
     * @return
     */
    public Map<String, Object> consumerConfigs() {

        Map<String, Object> props = new HashMap<String, Object>();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configuration.getBootstrapServers());
        // 消费者组
        props.put(ConsumerConfig.GROUP_ID_CONFIG, configuration.getConsumer().getGroupId());
        // 自动位移提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // 自动位移提交间隔时间
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        // 消费组失效超时时间
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
        // 位移丢失和位移越界后的恢复起始位置
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        // key 和 value 的反序列化
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        return props;
    }

    /**
     * @Description: 消费者工厂
     * @Author:杨攀
     * @Since: 2019年7月10日下午2:14:13
     * @return
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * @Description: kafka 监听容器工厂
     * @Author:杨攀
     * @Since: 2019年7月10日下午2:50:44
     * @return
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        // 设置消费者工厂
        factory.setConsumerFactory(consumerFactory());
        // 要创建的消费者数量(10 个线程并发处理)
        factory.setConcurrency(10);

        return factory;
    }

}

主题配置类

主要是可以对主题进行管理。新增,修改,删除等

package com.topinfo.ci.dataex.config;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

/**
 * @Description: kafka 主题 配置类
 * @Author:杨攀
 * @Since:2019年7月10日下午3:06:58
 */
@Configuration
public class KafKaTopicConfig {

    @Autowired
    private KafKaConfiguration configuration;

    /**
     *@Description: kafka管理员,委派给AdminClient以创建在应用程序上下文中定义的主题的管理员。
     *@Author:杨攀
     *@Since: 2019年7月10日下午3:14:23
     *@return
     */
    @Bean
    public KafkaAdmin kafkaAdmin() {
        
        Map<String, Object> props = new HashMap<>();
        
        // 配置Kafka实例的连接地址
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, configuration.getBootstrapServers());
        KafkaAdmin admin = new KafkaAdmin(props);
        return admin;
    }

    /**
     *@Description: kafka的管理客户端,用于创建、修改、删除主题等
     *@Author:杨攀
     *@Since: 2019年7月10日下午3:15:01
     *@return
     */
    @Bean
    public AdminClient adminClient() {
        return AdminClient.create(kafkaAdmin().getConfig());
    }
    
    /**
     * @Description: 创建一个新的 topinfo 的Topic,如果kafka中topinfo 的topic已经存在,则忽略。
     * @Author:杨攀
     * @Since: 2019年7月10日上午11:13:28
     * @return
     */
    @Bean
    public NewTopic topinfo() {

        // 主题名称
        String topicName = configuration.getProducer().getTopicName();
        // 第二个参数是分区数, 第三个参数是副本数量,确保集群中配置的数目大于等于副本数量
        return new NewTopic(topicName, 2, (short) 2);
    }

}

生产者测试

生产者在发送消息的时候,使用对应的kafkaTemplate即可,如果是多个,需要注意导入的是对应的kafkaTemplate。

package com.topinfo.ci.dataex.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.topinfo.ci.dataex.config.KafKaConfig;

@RestController
@RequestMapping("kafka")
public class TestKafKaProducerController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    @RequestMapping("send")
    public String send(String name) {
        
        ListenableFuture<SendResult<String, String>>  future = kafkaTemplate.send("topinfo", name);
        
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onSuccess(SendResult<String, String> result) {
                System.out.println("生产者-发送消息成功:" + result.toString());
            }

            @Override
            public void onFailure(Throwable ex) {
                System.out.println("生产者-发送消息失败:" + ex.getMessage());
            }
        });
        
        
        return "test-ok";
    }
    
}

消费者测试

消费者需要在接收的方法上添加@KafkaListener,用于监听对应的topic,可以配置topic多个。

package com.topinfo.ci.dataex.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import com.topinfo.ci.dataex.config.KafKaConfig;

/**
 * @Description: kafka消费者
 * @Author:杨攀
 * @Since:2019年7月10日上午11:24:31
 */
@Component
public class KafKaConsumer {

    private final Logger logger = LoggerFactory.getLogger(KafKaConsumer.class);

    
    /**
     * @Description: 可以同时订阅多主题,只需按数组格式即可,也就是用“,”隔开
     * @Author:杨攀
     * @Since: 2019年7月10日上午11:26:16
     * @param record
     */
    @KafkaListener(topics = { "topinfo" })
    public void receive(ConsumerRecord<?, ?> record) {

        logger.info("消费得到的消息---key: " + record.key());
        logger.info("消费得到的消息---value: " + record.value().toString());
        
    }

}

如果多个集群的情况下,需要在KafkaListener监听注解上添加containerFactory,对应配置中的监听容器工厂。

/**
     * @Description: 可以同时订阅多主题,只需按数组格式即可,也就是用“,”隔开
     * @Author:杨攀
     * @Since: 2019年7月10日上午11:26:16
     * @param record
     */
    @KafkaListener(topics = { "topinfo" }, containerFactory = "kafkaListenerContainerFactory")
    public void receive(ConsumerRecord<?, ?> record) {

        logger.info("消费得到的消息---key: " + record.key());
        logger.info("消费得到的消息---value: " + record.value().toString());
        
    }

好了, 至此所有的配置就差不多了。

最后还有一项, 看到下面的绿色按钮没,来,点一下,乖! O(∩_∩)O哈哈~ ...

今天的关于springboot 集成调用 Azkabanspringboot集成keycloak的分享已经结束,谢谢您的关注,如果想了解更多关于Java-Springboot - 集成 spring-security 简单示例 (Version-springboot-2-1-3-RELEASE、java版springcloud+springboot多租户社交电子商务-springboot集成apidoc、SpringBoot (九)_springboot 集成 MyBatis、SpringBoot + KafKa集群的集成的相关知识,请在本站进行查询。

本文标签: