GVKun编程网logo

Win下执行kettle的trans和job的批处理示例(kettle执行ktr)

4

本文的目的是介绍Win下执行kettle的trans和job的批处理示例的详细情况,特别关注kettle执行ktr的相关信息。我们将通过专业的研究、有关数据的分析等多种方式,为您呈现一个全面的了解Wi

本文的目的是介绍Win下执行kettle的trans和job的批处理示例的详细情况,特别关注kettle执行ktr的相关信息。我们将通过专业的研究、有关数据的分析等多种方式,为您呈现一个全面的了解Win下执行kettle的trans和job的批处理示例的机会,同时也不会遗漏关于.net中的CreateJobObject / SetInformationJobObject pinvoke的工作示例?、android view之getLeft(),getRight(),getTop(),getBottom(),getX(),getY(),getRawX(),getRawY(),getTrans...、ETL 技术 (Extract-Transform-Load) 数据仓库技术 - 比如 kettle、Flink1.7.2 sql 批处理示例的知识。

本文目录一览:

Win下执行kettle的trans和job的批处理示例(kettle执行ktr)

Win下执行kettle的trans和job的批处理示例(kettle执行ktr)

第一步:定义一个trans示例

第二步:做成bat批处理文件,如下:

set panpath=C:\pdi-ce-5.4.0.1-130\data-integration
set kpath=D:\03works\ZYWSPT\kettle

c:
cd  %panpath%
pan  /file %kpath%\test.ktr

注意:等号的两边不能有空格,不然,会出错。

第三步:定义计划任务

单独创建一个文件夹,存放个人的计划任务

按步骤提示完成。

收工!

.net中的CreateJobObject / SetInformationJobObject pinvoke的工作示例?

.net中的CreateJobObject / SetInformationJobObject pinvoke的工作示例?

我正在努力拼凑一个拼凑的CreateJobObject和SetInformationJobObject的工作示例。通过各种Google搜索(包括俄语和中文帖子!),我将以下代码拼凑在一起。我认为JOBOBJECT_BASIC_LIMIT_INFORMATION的定义根据平台(32/64位)而变化。CreateJobObject
/ AssignProcessToJobObject 似乎 可以工作。SetInformationJobObject失败-错误24或87。

Process myProcess // POPULATED SOMEWHERE ELSE// Create Job & assign this process and another process to the jobIntPtr jobHandle = CreateJobObject( null , null );AssignProcessToJobObject( jobHandle , myProcess.Handle );AssignProcessToJobObject( jobHandle , Process.GetCurrentProcess().Handle );// Ensure that killing one process kills the others                JOBOBJECT_BASIC_LIMIT_INFORMATION limits = new JOBOBJECT_BASIC_LIMIT_INFORMATION();limits.LimitFlags = (short)LimitFlags.JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;IntPtr pointerToJobLimitInfo = Marshal.AllocHGlobal( Marshal.SizeOf( limits ) );Marshal.StructureToPtr( limits , pointerToJobLimitInfo , false );   SetInformationJobObject( job , JOBOBJECTINFOCLASS.JobObjectBasicLimitInformation , pionterToJobLimitInfo ,  ( uint )Marshal.SizeOf( limits ) )...        [DllImport( "kernel32.dll" , EntryPoint = "CreateJobObjectW" , CharSet = CharSet.Unicode )]        public static extern IntPtr CreateJobObject( SecurityAttributes JobAttributes , string lpName );        public class SecurityAttributes        {            public int nLength; //Useless field = 0            public IntPtr pSecurityDescriptor; //хз))            public bool bInheritHandle; //Возможность наследования            public SecurityAttributes()            {                this.bInheritHandle = true;                this.nLength = 0;                this.pSecurityDescriptor = IntPtr.Zero;            }        }        [DllImport( "kernel32.dll" )]        static extern bool SetInformationJobObject( IntPtr hJob , JOBOBJECTINFOCLASS JobObjectInfoClass , IntPtr lpJobObjectInfo , uint cbJobObjectInfoLength );        public enum JOBOBJECTINFOCLASS        {            JobObjectAssociateCompletionPortInformation = 7 ,            JobObjectBasicLimitInformation = 2 ,            JobObjectBasicUIRestrictions = 4 ,            JobObjectEndOfJobTimeInformation = 6 ,            JobObjectExtendedLimitInformation = 9 ,            JobObjectSecurityLimitInformation = 5        }        [StructLayout( LayoutKind.Sequential )]        struct JOBOBJECT_BASIC_LIMIT_INFORMATION        {            public Int64 PerProcessUserTimeLimit;            public Int64 PerJobUserTimeLimit;            public Int16 LimitFlags;            public UIntPtr MinimumWorkingSetSize;            public UIntPtr MaximumWorkingSetSize;            public Int16 ActiveProcessLimit;            public Int64 Affinity;            public Int16 PriorityClass;            public Int16 SchedulingClass;        }        public enum LimitFlags        {            JOB_OBJECT_LIMIT_ACTIVE_PROCESS = 0x00000008 ,            JOB_OBJECT_LIMIT_AFFINITY = 0x00000010 ,            JOB_OBJECT_LIMIT_BREAKAWAY_OK = 0x00000800 ,            JOB_OBJECT_LIMIT_DIE_ON_UNHANDLED_EXCEPTION = 0x00000400 ,            JOB_OBJECT_LIMIT_JOB_MEMORY = 0x00000200 ,            JOB_OBJECT_LIMIT_JOB_TIME = 0x00000004 ,            JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE = 0x00002000 ,            JOB_OBJECT_LIMIT_PRESERVE_JOB_TIME = 0x00000040 ,            JOB_OBJECT_LIMIT_PRIORITY_CLASS = 0x00000020 ,            JOB_OBJECT_LIMIT_PROCESS_MEMORY = 0x00000100 ,            JOB_OBJECT_LIMIT_PROCESS_TIME = 0x00000002 ,            JOB_OBJECT_LIMIT_SCHEDULING_CLASS = 0x00000080 ,            JOB_OBJECT_LIMIT_SILENT_BREAKAWAY_OK = 0x00001000 ,            JOB_OBJECT_LIMIT_WORKINGSET = 0x00000001        }        [DllImport( "kernel32.dll" )]        [return: MarshalAs( UnmanagedType.Bool )]        static extern bool AssignProcessToJobObject( IntPtr hJob , IntPtr hProcess );        [StructLayout( LayoutKind.Sequential )]        public struct SECURITY_ATTRIBUTES        {            public int nLength;            public IntPtr lpSecurityDescriptor;            public int bInheritHandle;        }

答案1

小编典典

这可能会晚一点,但仍然如此。

我在这里尝试了所有示例,但是没有人同时在32位和64位模式下为我工作。最后,我需要亲自检查所有签名并创建相应的PInvoke例程。我认为,其他人可能会发现这很有帮助。

免责声明:解决方案基于 Matt Howells的答案。

using System;using System.Diagnostics;using System.Runtime.InteropServices;namespace JobManagement{    public class Job : IDisposable    {        [DllImport("kernel32.dll", CharSet = CharSet.Unicode)]        static extern IntPtr CreateJobObject(IntPtr a, string lpName);        [DllImport("kernel32.dll")]        static extern bool SetInformationJobObject(IntPtr hJob, JobObjectInfoType infoType, IntPtr lpJobObjectInfo, UInt32 cbJobObjectInfoLength);        [DllImport("kernel32.dll", SetLastError = true)]        static extern bool AssignProcessToJobObject(IntPtr job, IntPtr process);        [DllImport("kernel32.dll", SetLastError = true)]        [return: MarshalAs(UnmanagedType.Bool)]        static extern bool CloseHandle(IntPtr hObject);        private IntPtr handle;        private bool disposed;        public Job()        {            handle = CreateJobObject(IntPtr.Zero, null);            var info = new JOBOBJECT_BASIC_LIMIT_INFORMATION            {                LimitFlags = 0x2000            };            var extendedInfo = new JOBOBJECT_EXTENDED_LIMIT_INFORMATION            {                BasicLimitInformation = info            };            int length = Marshal.SizeOf(typeof(JOBOBJECT_EXTENDED_LIMIT_INFORMATION));            IntPtr extendedInfoPtr = Marshal.AllocHGlobal(length);            Marshal.StructureToPtr(extendedInfo, extendedInfoPtr, false);            if (!SetInformationJobObject(handle, JobObjectInfoType.ExtendedLimitInformation, extendedInfoPtr, (uint)length))                throw new Exception(string.Format("Unable to set information.  Error: {0}", Marshal.GetLastWin32Error()));        }        public void Dispose()        {            Dispose(true);            GC.SuppressFinalize(this);        }        private void Dispose(bool disposing)        {            if (disposed)                return;            if (disposing) { }            Close();            disposed = true;        }        public void Close()        {            CloseHandle(handle);            handle = IntPtr.Zero;        }        public bool AddProcess(IntPtr processHandle)        {            return AssignProcessToJobObject(handle, processHandle);        }        public bool AddProcess(int processId)        {            return AddProcess(Process.GetProcessById(processId).Handle);        }    }    #region Helper classes    [StructLayout(LayoutKind.Sequential)]    struct IO_COUNTERS    {        public UInt64 ReadOperationCount;        public UInt64 WriteOperationCount;        public UInt64 OtherOperationCount;        public UInt64 ReadTransferCount;        public UInt64 WriteTransferCount;        public UInt64 OtherTransferCount;    }    [StructLayout(LayoutKind.Sequential)]    struct JOBOBJECT_BASIC_LIMIT_INFORMATION    {        public Int64 PerProcessUserTimeLimit;        public Int64 PerJobUserTimeLimit;        public UInt32 LimitFlags;        public UIntPtr MinimumWorkingSetSize;        public UIntPtr MaximumWorkingSetSize;        public UInt32 ActiveProcessLimit;        public UIntPtr Affinity;        public UInt32 PriorityClass;        public UInt32 SchedulingClass;    }    [StructLayout(LayoutKind.Sequential)]    public struct SECURITY_ATTRIBUTES    {        public UInt32 nLength;        public IntPtr lpSecurityDescriptor;        public Int32 bInheritHandle;    }    [StructLayout(LayoutKind.Sequential)]    struct JOBOBJECT_EXTENDED_LIMIT_INFORMATION    {        public JOBOBJECT_BASIC_LIMIT_INFORMATION BasicLimitInformation;        public IO_COUNTERS IoInfo;        public UIntPtr ProcessMemoryLimit;        public UIntPtr JobMemoryLimit;        public UIntPtr PeakProcessMemoryUsed;        public UIntPtr PeakJobMemoryUsed;    }    public enum JobObjectInfoType    {        AssociateCompletionPortInformation = 7,        BasicLimitInformation = 2,        BasicUIRestrictions = 4,        EndOfJobTimeInformation = 6,        ExtendedLimitInformation = 9,        SecurityLimitInformation = 5,        GroupInformation = 11    }    #endregion}

android view之getLeft(),getRight(),getTop(),getBottom(),getX(),getY(),getRawX(),getRawY(),getTrans...

android view之getLeft(),getRight(),getTop(),getBottom(),getX(),getY(),getRawX(),getRawY(),getTrans...

自定义view时使经常使用到的view距离及坐标记录

如下

第一:view中一般用于获取view宽高

getLeft()        view本身左侧   距离父布局左侧   的距离

getRight()      view本身右侧   距离父布局左边侧  的距离

getTop()         view本身顶部   距离父布局顶部    的距离

getBottom()    view本身底部   距离父布局顶部   的距离

 

View的  Height = view.getBottom() - view.getTop();

View的  Width = view.getRight() - view.getLeft();

 

移动view时  可以通过变化  getLeft()   getRight()   getTop()  getBottom() 改变view的位置

view.layout(left,top,right,bottom)

 

第二:MotionEvent当中的方法    doTouchEvent   点击view

 

getY()               点击事件距离view本身左边的距离
getY()               点击事件距离view本身顶边的距离
getRawX()       点击事件距离整个屏幕左边的距离
getRawY()       点击事件距离整个屏幕顶边的距离

一般用于move  view时 通过移动的距离加getLeft等变更view的位置

 

第三:相对于view原始位置移动的偏移量

 

 getTranslationX()    设置view横向移动的偏移距离

 getTranslationY()    设置view竖向移动的偏移距离

 

ETL 技术 (Extract-Transform-Load) 数据仓库技术 - 比如 kettle

ETL 技术 (Extract-Transform-Load) 数据仓库技术 - 比如 kettle

每次面试,互联网的面试官,经常问我有没有用过 ETL, 每次我都懵逼,说没用过,觉得是多么高大上的东东,数据仓储

今天查了一下,我晕,自己天天用的 Kettle 就是最典型的 ETL,

可以实现不同数据库之间的数据抽取,转换,只需要你有相应的数据库 driver 即可

查了一下资料记录一下:

ETL,是英文 Extract-Transform-Load 的缩写,用来描述将数据从来源端经过抽取(extract)、转换(transform)、加载(load)至目的端的过程。ETL 一词较常用在 数据仓库,但其对象并不限于数据仓库。
ETL 是构建数据仓库的重要一环,用户从 数据源抽取出所需的数据,经过 数据清洗 , 最终按照预先定义好的数据仓库模型,将数据加载到数据仓库中去。
信息是现代企业的重要资源,是企业运用科学管理、决策分析的基础。目前,大多数企业花费大量的资金和时间来构建 联机事务处理 OLTP 的业务系统和 办公自动化系统,用来记录事务处理的各种相关数据。据统计,数据量每 2~3 年时间就会成倍增长,这些数据蕴含着巨大的商业价值,而企业所关注的通常只占在总数据量的 2%~4% 左右。因此,企业仍然没有最大化地利用已存在的数据资源,以至于浪费了更多的时间和资金,也失去制定关键商业决策的最佳契机。于是,企业如何通过各种技术手段,并把数据转换为信息、知识,已经成了提高其核心竞争力的主要瓶颈。而 ETL 则是主要的一个技术手段。

 

工具应用

编辑
ETL 工具的典型代表有:Informatica、Datastage、OWB、微软 DTS、 Beeload、 Kettle 等等……
开源的工具有 eclipse 的 etl 插件:cloveretl
ETL 的质量问题具体表现为正确性、完整性、一致性、完备性、有效性、时效性和可获取性等几个特性。而影响质量问题的原因有很多,由 系统集成和历史数据造成的原因主要包括:业务系统不同时期系统之间数据模型不一致;业务系统不同时期业务过程有变化;旧系统模块在运营、人事、财务、办公系统等相关信息的不一致;遗留系统和新业务、管理系统 数据集成不完备带来的不一致性。
实现 ETL,首先要实现 ETL 转换的过程。体现为以下几个方面:
1、空值处理:可捕获字段空值,进行加载或替换为其他含义数据,并可根据字段空值实现分流加载到不同目标库。
2、规范化数据格式:可实现字段格式约束定义,对于 数据源中时间、数值、字符等数据,可自定义加载格式。
3、拆分数据:依据业务需求对字段可进行分解。例,主叫号 861082585313-8148,可进行区域码和电话号码分解。
4、验证数据正确性:可利用 Lookup 及拆分功能进行 数据验证。例如,主叫号 861082585313-8148,进行区域码和电话号码分解后,可利用 Lookup 返回主叫网关或交换机记载的主叫地区,进行数据验证。
5、数据替换:对于因业务因素,可实现无效数据、缺失数据的替换。
6、Lookup:查获丢失数据 Lookup 实现 子查询,并返回用其他手段获取的缺失字段,保证字段完整性。
7、建立 ETL 过程的主 外键约束:对无依赖性的非法数据,可替换或导出到错误数据文件中,保证主键唯一记录的加载。

体系结构

编辑
ETL 工具目前有两种技术架构 —— ETL 架构和 ELT 架构,不仔细看好像没什么不同之处,那么这两种架构到底有什么区别呢?  [1] 

ETL 架构

在 ETL 架构中,数据的流向是从源数据流到 ETL 工具,ETL 工具是一个单独的数据处理引擎,一般会在单独的硬件服务器上,实现所有数据转化的工作,然后将数据加载到目标数据仓库中,如果要增加整个 ETL 过程的效率,则只能增强 ETL 工具服务器的配置,优化系统处理流程(一般可调的东西非常少)。IBM 的 datastage 和 Informatica 的 powercenter 原来都是采用的这种架构。
ETL 架构的优势:
  1. ETL 可以分担数据库系统的负载(采用单独的硬件服务器)
  2. ETL 相对于 EL-T 架构可以实现更为复杂的数据转化逻辑
  3. ETL 采用单独的硬件服务器。.
  4. ETL 与底层的数据库数据存储无关。

ELT 架构

在 ELT 架构中,ELT 只负责提供图形化的界面来设计业务规则,数据的整个加工过程都在目标和源的数据库之间流动,ELT 协调相关的数据库系统来执行相关的应用,数据加工过程既可以在源数据库端执行,也可以在目标数据仓库端执行(主要取决于系统的架构设计和数据属性)。当 ETL 过程需要提高效率,则可以通过对相关数据库进行调优,或者改变执行加工的服务器就可以达到。一般数据库厂商会力推该种架构,像 Oracle 和 Teradata 都极力宣传 ELT 架构。
ELT 架构的优势:
  1. ELT 主要通过数据库引擎来实现系统的可扩展性(尤其是当数据加工过程在晚上时,可以充分利用数据库引擎的资源)
  2. ELT 可以保持所有的数据始终在数据库当中,避免数据的加载和导出,从而保证效率,提高系统的可监控性。
  3. ELT 可以根据数据的分布情况进行并行处理优化,并可以利用数据库的固有功能优化磁盘 I/O。
  4. ELT 的可扩展性取决于数据库引擎和其硬件服务器的可扩展性。
  5. 通过对相关数据库进行性能调优,ETL 过程获得 3 到 4 倍的效率提升一般不是特别困难。

注意事项

编辑
为了能更好地实现 ETL,用户在实施 ETL 过程中应注意以下几点:
第一,如果条件允许,可利用数据中转区对运营数据进行预处理,保证集成与加载的高效性;
第二,如果 ETL 的过程是主动 “拉取”,而不是从内部 “推送”,其可控性将大为增强;
第三,ETL 之前应制定流程化的配置管理和标准协议;
第四,关键数据标准至关重要。ETL 面临的最大挑战是当接收数据时其各源数据的异构性和低质量。以电信为例,A 系统按照统计代码管理数据,B 系统按照账目数字管理,C 系统按照语音 ID 管理。当 ETL 需要对这三个系统进行集成以获得对客户的全面视角时,这一过程需要复杂的匹配规则、名称 / 地址正常化与标准化。而 ETL 在处理过程中会定义一个关键数据标准,并在此基础上,制定相应的数据接口标准。
第五,将数据加载到个体数据集时。在没有一个集中化的数据库情况下,拥有数据模板是非常重要的。它们是标准化的接口,每一个个体或者部门数据集市都能够填充。确保你的 ETL 工具有这样的功能,能够扩展到一个数据仓库平台,将信息从一个数据集市流动到下一个。  [2] 

特色功能

编辑
ETL 过程在很大程度上受企业对源数据的理解程度的影响,也就是说从业务的角度看 数据集成非常重要。一个优秀的 ETL 设计应该具有如下功能:

管理简单

采用 元数据方法,集中进行管理;接口、数据格式、传输有严格的规范;尽量不在 外部数据源安装软件; 数据抽取系统流程自动化,并有自动调度功能;抽取的数据及时、准确、完整;可以提供同各种数据系统的接口,系统适应性强;提供软件框架系统,系统功能改变时,应用程序很少改变便可适应变化;可扩展性强。

标准定义数据

合理的业务模型设计对 ETL 至关重要。 数据仓库是企业唯一、真实、可靠的 综合数据平台。数据仓库的设计建模一般都依照三范式、星型模型、雪花模型,无论哪种设计思想,都应该最大化地涵盖关键业务数据,把运营环境中杂乱无序的数据结构统一成为合理的、关联的、分析型的新结构,而 ETL 则会依照模型的定义去提取数据源,进行转换、清洗,并最终加载到目标数据仓库中。
模型的重要之处在于对数据做标准化定义,实现统一的编码、统一的分类和组织。标准化定义的内容包括:标准代码统一、业务 术语统一。ETL 依照模型进行初始加载、增量加载、缓慢增长维、慢速变化维、事实表加载等 数据集成,并根据业务需求制定相应的加载策略、刷新策略、汇总策略、维护策略。

拓展新型应用

对业务数据本身及其运行环境的描述与定义的数据,称之为 元数据(metadata)。元数据是描述数据的数据。从某种意义上说,业务数据主要用于支持业务系统应用的数据,而元数据则是 企业信息门户、客户关系管理、 数据仓库、决策支持和 B2B 等新型应用所不可或缺的内容。
元数据的典型表现为对象的描述,即对数据库、表、列、列属性(类型、格式、约束等)以及主键 / 外部键关联等等的描述。特别是现行应用的异构性与分布性越来越普遍的情况下,统一的元数据就愈发重要了。“信息孤岛” 曾经是很多企业对其应用现状的一种抱怨和概括,而合理的元数据则会有效地描绘出信息的关联性。
而 元数据对于 ETL 的集中表现为:定义数据源的位置及数据源的属性、确定从源数据到目标数据的对应规则、确定相关的业务逻辑、在数据实际加载前的其他必要的准备工作,等等,它一般贯穿整个 数据仓库项目,而 ETL 的所有过程必须最大化地参照元数据,这样才能快速实现 ETL。

 

 

ETL 是将业务系统的数据经过抽取、清洗转换之后加载到数据仓库的过程,目的是将企业中的分散、零乱、标准不统一的数据整合到一起,为企业的决策提供分析依据。 ETL 是 BI 项目重要的一个环节。 通常情况下,在 BI 项目中 ETL 会花掉整个项目至少 1/3 的时间,ETL 设计的好坏直接关接到 BI 项目的成败。       

  ETL 的设计分三部分:数据抽取、数据的清洗转换、数据的加载。在设计 ETL 的时候我们也是从这三部分出发。数据的抽取是从各个不同的数据源抽取到 ODS (Operational Data Store,操作型数据存储) 中 —— 这个过程也可以做一些数据的清洗和转换),在抽取的过程中需要挑选不同的抽取方法,尽可能的提高 ETL 的运行效率。ETL 三个部分中,花费时间最长的是 “T”(Transform,清洗、转换) 的部分,一般情况下这部分工作量是整个 ETL 的 2/3。数据的加载一般在数据清洗完了之后直接写入 DW (Data Warehousing,数据仓库) 中去。

  ETL 的实现有多种方法,常用的有三种。一种是借助 ETL 工具 (如 Oracle 的 OWB、SQL Server 2000 的 DTS、SQL Server2005 的 SSIS 服务、Informatic 等) 实现,一种是 SQL 方式实现,另外一种是 ETL 工具和 SQL 相结合。前两种方法各有各的优缺点,借助工具可以快速的建立起 ETL 工程,屏蔽了复杂的编码任务,提高了速度,降低了难度,但是缺少灵活性。SQL 的方法优点是灵活,提高 ETL 运行效率,但是编码复杂,对技术要求比较高。第三种是综合了前面二种的优点,会极大地提高 ETL 的开发速度和效率。

  一、 数据的抽取(Extract)

  这一部分需要在调研阶段做大量的工作,首先要搞清楚数据是从几个业务系统中来,各个业务系统的数据库服务器运行什么 DBMS, 是否存在手工数据,手工数据量有多大,是否存在非结构化的数据等等,当收集完这些信息之后才可以进行数据抽取的设计。

  1、对于与存放 DW 的数据库系统相同的数据源处理方法

  这一类数据源在设计上比较容易。一般情况下,DBMS (SQLServer、Oracle) 都会提供数据库链接功能,在 DW 数据库服务器和原业务系统之间建立直接的链接关系就可以写 Select 语句直接访问。

  2、对于与 DW 数据库系统不同的数据源的处理方法

  对于这一类数据源,一般情况下也可以通过 ODBC 的方式建立数据库链接 —— 如 SQL Server 和 Oracle 之间。如果不能建立数据库链接,可以有两种方式完成,一种是通过工具将源数据导出成.txt 或者是.xls 文件,然后再将这些源系统文件导入到 ODS 中。另外一种方法是通过程序接口来完成。

  3、对于文件类型数据源 (.txt,.xls),可以培训业务人员利用数据库工具将这些数据导入到指定的数据库,然后从指定的数据库中抽取。或者还可以借助工具实现。

  4、增量更新的问题

  对于数据量大的系统,必须考虑增量抽取。一般情况下,业务系统会记录业务发生的时间,我们可以用来做增量的标志,每次抽取之前首先判断 ODS 中记录最大的时间,然后根据这个时间去业务系统取大于这个时间所有的记录。利用业务系统的时间戳,一般情况下,业务系统没有或者部分有时间戳。

二、数据的清洗转换(Cleaning、Transform)

  一般情况下,数据仓库分为 ODS、DW 两部分。通常的做法是从业务系统到 ODS 做清洗,将脏数据和不完整数据过滤掉,在从 ODS 到 DW 的过程中转换,进行一些业务规则的计算和聚合。

  1、 数据清洗

  数据清洗的任务是过滤那些不符合要求的数据,将过滤的结果交给业务主管部门,确认是否过滤掉还是由业务单位修正之后再进行抽取。

不符合要求的数据主要是有不完整的数据、错误的数据、重复的数据三大类。

  (1) 不完整的数据:这一类数据主要是一些应该有的信息缺失,如供应商的名称、分公司的名称、客户的区域信息缺失、业务系统中主表与明细表不能匹配等。对于这一类数据过滤出来,按缺失的内容分别写入不同 Excel 文件向客户提交,要求在规定的时间内补全。补全后才写入数据仓库。

  (2) 错误的数据:这一类错误产生的原因是业务系统不够健全,在接收输入后没有进行判断直接写入后台数据库造成的,比如数值数据输成全角数字字符、字符串数据后面有一个回车操作、日期格式不正确、日期越界等。这一类数据也要分类,对于类似于全角字符、数据前后有不可见字符的问题,只能通过写 SQL 语句的方式找出来,然后要求客户在业务系统修正之后抽取。日期格式不正确的或者是日期越界的这一类错误会导致 ETL 运行失败,这一类错误需要去业务系统数据库用 SQL 的方式挑出来,交给业务主管部门要求限期修正,修正之后再抽取。

  (3) 重复的数据:对于这一类数据 —— 特别是维表中会出现这种情况 —— 将重复数据记录的所有字段导出来,让客户确认并整理。

  数据清洗是一个反复的过程,不可能在几天内完成,只有不断的发现问题,解决问题。对于是否过滤,是否修正一般要求客户确认,对于过滤掉的数据,写入 Excel 文件或者将过滤数据写入数据表,在 ETL 开发的初期可以每天向业务单位发送过滤数据的邮件,促使他们尽快地修正错误,同时也可以做为将来验证数据的依据。数据清洗需要注意的是不要将有用的数据过滤掉,对于每个过滤规则认真进行验证,并要用户确认。

  2、 数据转换

  数据转换的任务主要进行不一致的数据转换、数据粒度的转换,以及一些商务规则的计算。

  (1) 不一致数据转换:这个过程是一个整合的过程,将不同业务系统的相同类型的数据统一,比如同一个供应商在结算系统的编码是 XX0001, 而在 CRM 中编码是 YY0001,这样在抽取过来之后统一转换成一个编码。

  (2) 数据粒度的转换:业务系统一般存储非常明细的数据,而数据仓库中数据是用来分析的,不需要非常明细的数据。一般情况下,会将业务系统数据按照数据仓库粒度进行聚合。

  (3) 商务规则的计算:不同的企业有不同的业务规则、不同的数据指标,这些指标有的时候不是简单的加加减减就能完成,这个时候需要在 ETL 中将这些数据指标计算好了之后存储在数据仓库中,以供分析使用。

三、ETL 日志、警告发送

  1、 ETL 日志

  ETL 日志分为三类。

一类是执行过程日志,这一部分日志是在 ETL 执行过程中每执行一步的记录,记录每次运行每一步骤的起始时间,影响了多少行数据,流水账形式。

一类是错误日志,当某个模块出错的时候写错误日志,记录每次出错的时间、出错的模块以及出错的信息等。

第三类日志是总体日志,只记录 ETL 开始时间、结束时间是否成功信息。如果使用 ETL 工具,ETL 工具会自动产生一些日志,这一类日志也可以作为 ETL 日志的一部分。

记录日志的目的是随时可以知道 ETL 运行情况,如果出错了,可以知道哪里出错。

  2、 警告发送

  如果 ETL 出错了,不仅要形成 ETL 出错日志,而且要向系统管理员发送警告。发送警告的方式多种,一般常用的就是给系统管理员发送邮件,并附上出错的信息,方便管理员排查错误。

  ETL 是 BI 项目的关键部分,也是一个长期的过程,只有不断的发现问题并解决问题,才能使 ETL 运行效率更高,为 BI 项目后期开发提供准确与高效的数据。

后记

     做数据仓库系统,ETL 是关键的一环。说大了,ETL 是数据整合解决方案,说小了,就是倒数据的工具。回忆一下工作这么长时间以来,处理数据迁移、转换的工作倒还真的不少。但是那些工作基本上是一次性工作或者很小数据量。可是在数据仓库系统中,ETL 上升到了一定的理论高度,和原来小打小闹的工具使用不同了。究竟什么不同,从名字上就可以看到,人家已经将倒数据的过程分成 3 个步骤,E、T、L 分别代表抽取、转换和装载。

其实 ETL 过程就是数据流动的过程,从不同的数据源流向不同的目标数据。但在数据仓库中,

ETL 有几个特点,

一是数据同步,它不是一次性倒完数据就拉到,它是经常性的活动,按照固定周期运行的,甚至现在还有人提出了实时 ETL 的概念。

二是数据量,一般都是巨大的,值得你将数据流动的过程拆分成 E、T 和 L。

    现在有很多成熟的工具提供 ETL 功能,且不说他们的好坏。从应用角度来说,ETL 的过程其实不是非常复杂,这些工具给数据仓库工程带来和很大的便利性,特别是开发的便利和维护的便利。但另一方面,开发人员容易迷失在这些工具中。举个例子,VB 是一种非常简单的语言并且也是非常易用的编程工具,上手特别快,但是真正 VB 的高手有多少?微软设计的产品通常有个原则是 “将使用者当作傻瓜”,在这个原则下,微软的东西确实非常好用,但是对于开发者,如果你自己也将自己当作傻瓜,那就真的傻了。ETL 工具也是一样,这些工具为我们提供图形化界面,让我们将主要的精力放在规则上,以期提高开发效率。从使用效果来说,确实使用这些工具能够非常快速地构建一个 job 来处理某个数据,不过从整体来看,并不见得他的整体效率会高多少。问题主要不是出在工具上,而是在设计、开发人员上。他们迷失在工具中,没有去探求 ETL 的本质。可以说这些工具应用了这么长时间,在这么多项目、环境中应用,它必然有它成功之处,它必定体现了 ETL 的本质。如果我们不透过表面这些工具的简单使用去看它背后蕴涵的思想,最终我们作出来的东西也就是一个个独立的 job,将他们整合起来仍然有巨大的工作量。大家都知道 “理论与实践相结合”,如果在一个领域有所超越,必须要在理论水平上达到一定的高度.

 

下面是 ETL 工具 Kettle 的使用

参考:ETL 工具之 Kettle 的简单使用一 (不同数据库之间的数据抽取 - 转换 - 加载)

本文参考了:

参考:ETL(数据仓库技术)

参考:ETL 讲解(很详细!!!)

Flink1.7.2 sql 批处理示例

Flink1.7.2 sql 批处理示例

Flink1.7.2 sql 批处理示例

源码

  • https://github.com/opensourceteams/flink-maven-scala

概述

  • 本文为Flink sql Dataset 示例
  • 主要操作包括:Scan / Select,as (table),as (column),limit,Where / Filter,between and (where),Sum,min,max,avg, sum (group by ),group by having,distinct,INNER JOIN,left join,right join,full outer join,union,unionAll,INTERSECT in,EXCEPT,insert into

SELECT

Scan / Select

  • 功能描述: 查询一个表中的所有数据
  • scala 程序

package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.scan

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {



  def main(args: Array[String]): Unit = {


    //得到批环境
    val env = ExecutionEnvironment.getExecutionEnvironment


    val dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女"))

    //得到Table环境
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    //注册table
    tableEnv.registerDataSet("user1",dataSet,''name,''age,''sex)



    tableEnv.sqlQuery(s"select name,age FROM user1")
      .first(100).print()


    /**
      * 输出结果
      *
      * 小明,15
      * 小王,45
      * 小李,25
      * 小慧,35
      */
  }

}


  • 输出结果
小明,15
小王,45
小李,25
小慧,35

as (table)

  • 功能描述: 给表名取别称
  • scala 程序

package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.scan

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {



  def main(args: Array[String]): Unit = {


    //得到批环境
    val env = ExecutionEnvironment.getExecutionEnvironment


    val dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女"))

    //得到Table环境
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    //注册table
    tableEnv.registerDataSet("user1",dataSet,''name,''age,''sex)



    tableEnv.sqlQuery(s"select t1.name,t1.age FROM user1 as t1")
      .first(100).print()


    /**
      * 输出结果
      *
      * 小明,15
      * 小王,45
      * 小李,25
      * 小慧,35
      */
  }

}


  • 输出结果
小明,15
小王,45
小李,25
小慧,35

as (column)

  • 功能描述: 给表名取别称
  • scala 程序

package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.scan

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {



  def main(args: Array[String]): Unit = {


    //得到批环境
    val env = ExecutionEnvironment.getExecutionEnvironment


    val dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女"))

    //得到Table环境
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    //注册table
    tableEnv.registerDataSet("user1",dataSet,''name,''age,''sex)



    tableEnv.sqlQuery(s"select name a,age as b FROM user1 ")
      .first(100).print()


    /**
      * 输出结果
      *
      * 小明,15
      * 小王,45
      * 小李,25
      * 小慧,35
      */
  }

}


  • 输出结果
小明,15
小王,45
小李,25
小慧,35

limit

  • 功能描述:查询一个表的数据,只返回指定的前几行(争对并行度而言,所以并行度不一样,结果不一样)

  • scala 程序

package com.opensourceteams.mo`dule.bigdata.flink.example.sql.dataset.operations.limit

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {



  def main(args: Array[String]): Unit = {


    //得到批环境
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)


    val dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女"))

    //得到Table环境
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    //注册table
    tableEnv.registerDataSet("user1",dataSet,''name,''age,''sex)


    /**
      * 先排序,按age的降序排序,输出前100位结果,注意是按同一个并行度中的数据进行排序,也就是同一个分区
      */
    tableEnv.sqlQuery(s"select name,age FROM user1  ORDER BY age desc LIMIT 100  ")
      .first(100).print()


    /**
      * 输出结果 并行度设置为2
      *
      * 小明,15
      * 小王,45
      * 小慧,35
      * 小李,25
      */

    /**
      * 输出结果 并行度设置为1
      *
      * 小王,45
      * 小慧,35
      * 小李,25
      * 小明,15
      */



  }

}

  • 输出结果
小明,15
小王,45
小慧,35
小李,25

Where / Filter

  • 功能描述:列加条件过滤表中的数据
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.where

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {



  def main(args: Array[String]): Unit = {


    //得到批环境
    val env = ExecutionEnvironment.getExecutionEnvironment


    val dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女"))

    //得到Table环境
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    //注册table
    tableEnv.registerDataSet("user1",dataSet,''name,''age,''sex)



    tableEnv.sqlQuery(s"select name,age,sex FROM user1 where sex = ''女''")
      .first(100).print()


    /**
      * 输出结果
      * 
      * 小李,25,女
      * 小慧,35,女
      */
    
  }

}


  • 输出结果

小李,25,女
小慧,35,女

between and (where)

  • 功能描述: 过滤列中的数据, 开始数据 <= data <= 结束数据
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.whereBetweenAnd

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {



  def main(args: Array[String]): Unit = {


    //得到批环境
    val env = ExecutionEnvironment.getExecutionEnvironment


    val dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女"))

    //得到Table环境
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    //注册table
    tableEnv.registerDataSet("user1",dataSet,''name,''age,''sex)



    tableEnv.sqlQuery(s"select name,age,sex FROM user1 where age between 20 and  35")
      .first(100).print()


    /**
      * 结果
      *
      * 小李,25,女
      * 小慧,35,女
      */

  }

}


  • 输出结果
小李,25,女
小慧,35,女

Sum

  • 功能描述: 求和所有数据
  • scala 程序

package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.aggregations.sum

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {



  def main(args: Array[String]): Unit = {


    //得到批环境
    val env = ExecutionEnvironment.getExecutionEnvironment


    val dataSet = env.fromElements(("小明",15,"男",1500),("小王",45,"男",4000),("小李",25,"女",800),("小慧",35,"女",500))

    //得到Table环境
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    //注册table
    tableEnv.registerDataSet("user1",dataSet,''name,''age,''sex,''salary)



    //汇总所有数据
    tableEnv.sqlQuery(s"select sum(salary) FROM user1")
      .first(100).print()


    /**
      * 输出结果
      *
      * 6800
      */


  }

}


  • 输出结果
6800

max

  • 功能描述: 求最大值
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.aggregations.max

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {



  def main(args: Array[String]): Unit = {


    //得到批环境
    val env = ExecutionEnvironment.getExecutionEnvironment


    val dataSet = env.fromElements(("小明",15,"男",1500),("小王",45,"男",4000),("小李",25,"女",800),("小慧",35,"女",500))

    //得到Table环境
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    //注册table
    tableEnv.registerDataSet("user1",dataSet,''name,''age,''sex,''salary)



    //汇总所有数据
    tableEnv.sqlQuery(s"select max(salary) FROM user1 ")
      .first(100).print()


    /**
      * 输出结果
      *
      * 4000
      */


  }

}


  • 输出结果
4000

min

  • 功能描述: 求最小值
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.aggregations.min

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {



  def main(args: Array[String]): Unit = {


    //得到批环境
    val env = ExecutionEnvironment.getExecutionEnvironment


    val dataSet = env.fromElements(("小明",15,"男",1500),("小王",45,"男",4000),("小李",25,"女",800),("小慧",35,"女",500))

    //得到Table环境
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    //注册table
    tableEnv.registerDataSet("user1",dataSet,''name,''age,''sex,''salary)



    tableEnv.sqlQuery(s"select min(salary) FROM user1 ")
      .first(100).print()


    /**
      * 输出结果
      *
      * 500
      */


  }

}


  • 输出结果
500

sum (group by )

  • 功能描述: 按性别分组求和
  • scala 程序

package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.aggregations.group

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {



  def main(args: Array[String]): Unit = {


    //得到批环境
    val env = ExecutionEnvironment.getExecutionEnvironment


    val dataSet = env.fromElements(("小明",15,"男",1500),("小王",45,"男",4000),("小李",25,"女",800),("小慧",35,"女",500))

    //得到Table环境
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    //注册table
    tableEnv.registerDataSet("user1",dataSet,''name,''age,''sex,''salary)



    //汇总所有数据
    tableEnv.sqlQuery(s"select sex,sum(salary) FROM user1 group by sex")
      .first(100).print()

    /**
      * 输出结果
      * 
      * 女,1300
      * 男,5500
      */

  }

}


  • 输出结果
女,1300
男,5500

group by having

  • 功能描述:
  • scala 程序

package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.aggregations.group_having

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {



  def main(args: Array[String]): Unit = {


    //得到批环境
    val env = ExecutionEnvironment.getExecutionEnvironment


    val dataSet = env.fromElements(("小明",15,"男",1500),("小王",45,"男",4000),("小李",25,"女",800),("小慧",35,"女",500))

    //得到Table环境
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    //注册table
    tableEnv.registerDataSet("user1",dataSet,''name,''age,''sex,''salary)



    //分组统计,having是分组条件查询
    tableEnv.sqlQuery(s"select sex,sum(salary) FROM user1 group by sex having sum(salary) >1500")
      .first(100).print()

    /**
      * 输出结果
      * 
      * 
      */


  }

}


  • 输出结果
男,5500

distinct

  • 功能描述: 去重一列或多列
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.aggregations.distinct

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {



  def main(args: Array[String]): Unit = {


    val env = ExecutionEnvironment.getExecutionEnvironment


    val dataSet = env.fromElements(("a",15,"male"),("a",45,"female"),("d",25,"male"),("c",35,"female"))

    val tableEnv = TableEnvironment.getTableEnvironment(env)

    tableEnv.registerDataSet("user1",dataSet,''name,''age,''sex)


    /**
      * 对数据去重
      */
    tableEnv.sqlQuery("select distinct name  FROM user1   ")
      .first(100).print()


    /**
      * 输出结果
      *
      * a
      * c
      * d
      */

  }

}


  • 输出结果
a
c
d

join

INNER JOIN

  • 功能描述: 连接两个表,按指定的列,两列都存在值才输出
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.join.innerJoin

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {



  def main(args: Array[String]): Unit = {


    //得到批环境
    val env = ExecutionEnvironment.getExecutionEnvironment


    val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))
    val dataSetGrade = env.fromElements((1,"语文",100),(2,"数学",80),(1,"外语",50) )

    //得到Table环境
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    //注册table
    tableEnv.registerDataSet("user",dataSet,''id,''name,''age,''sex,''salary)
    tableEnv.registerDataSet("grade",dataSetGrade,''userId,''name,''fraction)



    //内连接,两个表
   // tableEnv.sqlQuery("select * FROM `user`  INNER JOIN  grade on  `user`.id = grade.userId ")
    tableEnv.sqlQuery("select `user`.*,grade.name,grade.fraction FROM `user`  INNER JOIN  grade on  `user`.id = grade.userId ")
      .first(100).print()


    /**
      * 输出结果
      * 2,小王,45,男,4000,数学,80
      * 1,小明,15,男,1500,语文,100
      * 1,小明,15,男,1500,外语,50
      */

  }

}


  • 输出结果
2,小王,45,男,4000,数学,80
1,小明,15,男,1500,语文,100
1,小明,15,男,1500,外语,50

left join

  • 功能描述:连接两个表,按指定的列,左表中存在值就一定输出,右表如果不存在,就显示为空
  • scala 程序

package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.join.leftJoin

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {



  def main(args: Array[String]): Unit = {


    //得到批环境
    val env = ExecutionEnvironment.getExecutionEnvironment


    val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))
    val dataSetGrade = env.fromElements((1,"语文",100),(2,"数学",80),(1,"外语",50) )

    //得到Table环境
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    //注册table
    tableEnv.registerDataSet("user",dataSet,''id,''name,''age,''sex,''salary)
    tableEnv.registerDataSet("grade",dataSetGrade,''userId,''name,''fraction)



  //左连接,拿左边的表中的每一行数据,去关联右边的数据,如果有相同的匹配数据,就都匹配出来,如果没有,就匹配一条,不过右边的数据为空
    tableEnv.sqlQuery("select `user`.*,grade.name,grade.fraction FROM `user`  LEFT JOIN  grade on  `user`.id = grade.userId ")
      .first(100).print()


    /**
      * 输出结果
      *
      * 1,小明,15,男,1500,语文,100
      * 1,小明,15,男,1500,外语,50
      * 2,小王,45,男,4000,数学,80
      * 4,小慧,35,女,500,null,null
      * 3,小李,25,女,800,null,null
      *
      *
      */

  }

}


  • 输出结果
1,小明,15,男,1500,语文,100
1,小明,15,男,1500,外语,50
2,小王,45,男,4000,数学,80
4,小慧,35,女,500,null,null
3,小李,25,女,800,null,null

right join

  • 功能描述:连接两个表,按指定的列,右表中存在值就一定输出,左表如果不存在,就显示为空
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.join.rightJoin

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {



  def main(args: Array[String]): Unit = {


    //得到批环境
    val env = ExecutionEnvironment.getExecutionEnvironment


    val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))
    val dataSetGrade = env.fromElements((1,"语文",100),(2,"数学",80),(1,"外语",50),(10,"外语",90) )

    //得到Table环境
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    //注册table
    tableEnv.registerDataSet("user",dataSet,''id,''name,''age,''sex,''salary)
    tableEnv.registerDataSet("grade",dataSetGrade,''userId,''name,''fraction)



  //左连接,拿左边的表中的每一行数据,去关联右边的数据,如果有相同的匹配数据,就都匹配出来,如果没有,就匹配一条,不过右边的数据为空
    tableEnv.sqlQuery("select `user`.*,grade.name,grade.fraction FROM `user`  RIGHT JOIN  grade on  `user`.id = grade.userId ")
      .first(100).print()


    /**
      * 输出结果
      *
      * 1,小明,15,男,1500,外语,50
      * 1,小明,15,男,1500,语文,100
      * 2,小王,45,男,4000,数学,80
      * null,null,null,null,null,外语,90
      *

      *
      */

  }

}


  • 输出结果
1,小明,15,男,1500,外语,50
1,小明,15,男,1500,语文,100
2,小王,45,男,4000,数学,80
null,null,null,null,null,外语,90

full outer join

  • 功能描述: 连接两个表,按指定的列,只要有一表中存在值就一定输出,另一表如果不存在就显示为空
  • scala 程序

package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.join.fullOuterJoin

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {



  def main(args: Array[String]): Unit = {


    //得到批环境
    val env = ExecutionEnvironment.getExecutionEnvironment


    val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))
    val dataSetGrade = env.fromElements((1,"语文",100),(2,"数学",80),(1,"外语",50),(10,"外语",90) )

    //得到Table环境
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    //注册table
    tableEnv.registerDataSet("user",dataSet,''id,''name,''age,''sex,''salary)
    tableEnv.registerDataSet("grade",dataSetGrade,''userId,''name,''fraction)



  //左,右,全匹配所有数据
    tableEnv.sqlQuery("select `user`.*,grade.name,grade.fraction FROM `user` FULL OUTER JOIN  grade on  `user`.id = grade.userId ")
      .first(100).print()


    /**
      * 输出结果
      *
      *
      * 3,小李,25,女,800,null,null
      * 1,小明,15,男,1500,外语,50
      * 1,小明,15,男,1500,语文,100
      * 2,小王,45,男,4000,数学,80
      * 4,小慧,35,女,500,null,null
      * null,null,null,null,null,外语,90
      *
      *
      *
      */

  }

}


  • 输出结果
3,小李,25,女,800,null,null
1,小明,15,男,1500,外语,50
1,小明,15,男,1500,语文,100
2,小王,45,男,4000,数学,80
4,小慧,35,女,500,null,null
null,null,null,null,null,外语,90

Set Operations

union

  • 功能描述: 连接两个表中的数据,会去重
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.setOperations.union

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {



  def main(args: Array[String]): Unit = {


    //得到批环境
    val env = ExecutionEnvironment.getExecutionEnvironment


    val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))
    val dataSet2 = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(30,"小李",25,"女",800),(40,"小慧",35,"女",500))

    //得到Table环境
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    //注册table
    tableEnv.registerDataSet("user",dataSet,''id,''name,''age,''sex,''salary)
    tableEnv.registerDataSet("t2",dataSet2,''id,''name,''age,''sex,''salary)


    /**
      *  union 连接两个表,会去重
      */
    tableEnv.sqlQuery(
      "select * from ("
                +"select t1.* FROM `user` as t1 ) " +
                + " UNION "
                + " ( select t2.* FROM t2 )"



       )
      .first(100).print()


    /**
      * 输出结果
      *
      * 30,小李,25,女,800
      * 40,小慧,35,女,500
      * 2,小王,45,男,4000
      * 4,小慧,35,女,500
      * 3,小李,25,女,800
      * 1,小明,15,男,1500
      *
      */

  }

}


  • 输出结果
30,小李,25,女,800
40,小慧,35,女,500
2,小王,45,男,4000
4,小慧,35,女,500
3,小李,25,女,800
1,小明,15,男,1500

unionAll

  • 功能描述: 连接两表中的数据,不会去重
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.setOperations.unionAll

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {



  def main(args: Array[String]): Unit = {


    //得到批环境
    val env = ExecutionEnvironment.getExecutionEnvironment


    val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))
    val dataSet2 = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(30,"小李",25,"女",800),(40,"小慧",35,"女",500))

    //得到Table环境
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    //注册table
    tableEnv.registerDataSet("user",dataSet,''id,''name,''age,''sex,''salary)
    tableEnv.registerDataSet("t2",dataSet2,''id,''name,''age,''sex,''salary)


    /**
      *  union 连接两个表,不会去重
      */
    tableEnv.sqlQuery(
      "select * from ("
                +"select t1.* FROM `user` as t1 ) " +
                + " UNION ALL "
                + " ( select t2.* FROM t2 )"



       )
      .first(100).print()


    /**
      * 输出结果
      *
      * 1,小明,15,男,1500
      * 2,小王,45,男,4000
      * 3,小李,25,女,800
      * 4,小慧,35,女,500
      * 1,小明,15,男,1500
      * 2,小王,45,男,4000
      * 30,小李,25,女,800
      * 40,小慧,35,女,500
      *
      */

  }

}


  • 输出结果
1,小明,15,男,1500
2,小王,45,男,4000
3,小李,25,女,800
4,小慧,35,女,500
1,小明,15,男,1500
2,小王,45,男,4000
30,小李,25,女,800
40,小慧,35,女,500

INTERSECT

  • 功能描述: INTERSECT 连接两个表,找相同的数据(相交的数据,重叠的数据)
  • scala 程序

package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.setOperations.intersect

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {



  def main(args: Array[String]): Unit = {


    //得到批环境
    val env = ExecutionEnvironment.getExecutionEnvironment


    val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))
    val dataSet2 = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(30,"小李",25,"女",800),(40,"小慧",35,"女",500))

    //得到Table环境
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    //注册table
    tableEnv.registerDataSet("user",dataSet,''id,''name,''age,''sex,''salary)
    tableEnv.registerDataSet("t2",dataSet2,''id,''name,''age,''sex,''salary)


    /**
      *  INTERSECT 连接两个表,找相同的数据(相交的数据,重叠的数据)
      */
    tableEnv.sqlQuery(
      "select * from ("
                +"select t1.* FROM `user` as t1 ) " +
                + " INTERSECT "
                + " ( select t2.* FROM t2 )"



       )
      .first(100).print()


    /**
      * 输出结果
      *
      * 1,小明,15,男,1500
      * 2,小王,45,男,4000
      *
      */

  }

}


  • 输出结果
 1,小明,15,男,1500
 2,小王,45,男,4000

in

  • 功能描述: 子查询
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.setOperations.in

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {



  def main(args: Array[String]): Unit = {


    //得到批环境
    val env = ExecutionEnvironment.getExecutionEnvironment


    val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))
    val dataSet2 = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(30,"小李",25,"女",800),(40,"小慧",35,"女",500))

    //得到Table环境
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    //注册table
    tableEnv.registerDataSet("user",dataSet,''id,''name,''age,''sex,''salary)
    tableEnv.registerDataSet("t2",dataSet2,''id,''name,''age,''sex,''salary)


    /**
      *  in ,子查询
      */
    tableEnv.sqlQuery(

                "select t1.* FROM `user` t1  where t1.id in " +
                        " (select t2.id from t2) "




       )
      .first(100).print()


    /**
      * 输出结果
      *
      * 1,小明,15,男,1500
      * 2,小王,45,男,4000
      *
      */

  }

}


  • 输出结果
 1,小明,15,男,1500
 2,小王,45,男,4000

EXCEPT

  • 功能描述: EXCEPT 连接两个表,找不相同的数据(不相交的数据,不重叠的数据)
  • scala 程序

package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.setOperations.except

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {



  def main(args: Array[String]): Unit = {


    //得到批环境
    val env = ExecutionEnvironment.getExecutionEnvironment


    val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))
    val dataSet2 = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(30,"小李",25,"女",800),(40,"小慧",35,"女",500))

    //得到Table环境
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    //注册table
    tableEnv.registerDataSet("user",dataSet,''id,''name,''age,''sex,''salary)
    tableEnv.registerDataSet("t2",dataSet2,''id,''name,''age,''sex,''salary)


    /**
      *  EXCEPT 连接两个表,找不相同的数据(不相交的数据,不重叠的数据)
      */
    tableEnv.sqlQuery(
      "select * from ("
                +"select t1.* FROM `user` as t1 ) " +
                + " EXCEPT "
                + " ( select t2.* FROM t2 )"



       )
      .first(100).print()


    /**
      * 输出结果
      *
      * 3,小李,25,女,800
      * 4,小慧,35,女,500
      *
      */

  }

}


  • 输出结果
 3,小李,25,女,800
 4,小慧,35,女,500

DML

insert into

  • 功能描述:将一个表中的数据(source),插入到 csv文件中(sink)
  • scala程序

package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.insert

import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.api.scala._
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.api.common.typeinfo.TypeInformation

object Run {



  def main(args: Array[String]): Unit = {


    //得到批环境
    val env = ExecutionEnvironment.getExecutionEnvironment


    val dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女"))


    //得到Table环境
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    //注册table
    tableEnv.registerDataSet("user1",dataSet,''name,''age,''sex)




    // create a TableSink
    val csvSink = new CsvTableSink("sink-data/csv/a.csv",",",1,WriteMode.OVERWRITE);
    val fieldNames = Array("name", "age", "sex")
    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.STRING)
    tableEnv.registerTableSink("t2",fieldNames,fieldTypes,csvSink)


    tableEnv.sqlUpdate(s" insert into  t2 select name,age,sex FROM user1  ")


    env.execute()


    /**
      * 输出结果
      * a.csv
      *
      * 小明,15,男
      * 小王,45,男
      * 小李,25,女
      * 小慧,35,女
      */





  }

}


  • 输出数据 a.csv
小明,15,男
小王,45,男
小李,25,女
小慧,35,女


Scan

  • 功能描述:
  • scala 程序

  • 输出结果


关于Win下执行kettle的trans和job的批处理示例kettle执行ktr的介绍现已完结,谢谢您的耐心阅读,如果想了解更多关于.net中的CreateJobObject / SetInformationJobObject pinvoke的工作示例?、android view之getLeft(),getRight(),getTop(),getBottom(),getX(),getY(),getRawX(),getRawY(),getTrans...、ETL 技术 (Extract-Transform-Load) 数据仓库技术 - 比如 kettle、Flink1.7.2 sql 批处理示例的相关知识,请在本站寻找。

本文标签: