对于想了解Swift3.0【Swift3.0环境下使用AFNetworking封装网络请求业务类】的读者,本文将是一篇不可错过的文章,我们将详细介绍swiftframework,并且为您提供关于AFN
对于想了解Swift 3.0 【Swift 3.0 环境下使用 AFNetworking 封装网络请求业务类】的读者,本文将是一篇不可错过的文章,我们将详细介绍swift framework,并且为您提供关于AFNetworking-不缓存响应、Android 环境下使用 call_usermodehelper () 以及调试、C# .net 环境下使用 rabbitmq 消息队列、docker 环境下使用 gitlab,gitlab-runner 为 NetCore 持续集成的有价值信息。
本文目录一览:- Swift 3.0 【Swift 3.0 环境下使用 AFNetworking 封装网络请求业务类】(swift framework)
- AFNetworking-不缓存响应
- Android 环境下使用 call_usermodehelper () 以及调试
- C# .net 环境下使用 rabbitmq 消息队列
- docker 环境下使用 gitlab,gitlab-runner 为 NetCore 持续集成
Swift 3.0 【Swift 3.0 环境下使用 AFNetworking 封装网络请求业务类】(swift framework)
前面已经有过在Swift2.3中如何使用AFNetworking封装网络请求业务类,读者可以去对比参考,传送门:
点击打开链接
由于Swift3.0并没有兼容Swift2.3,所以在Swift2.3版本中封装的业务类现在已经不可用。
本文是在Swift3.0语法更新后推出的。可以供大家参考使用。
源码已经在GitHub中给出,另外附有Demo如何使用,大家可以去下载学习。
GitHub地址:
点击打开链接
AFNetworking-不缓存响应
我正在使用以下代码从服务器提取简单的JSON提要:
AFHTTPRequestOperationManager *manager = [AFHTTPRequestOperationManager manager]; manager.responseSerializer = [AFJSONResponseSerializer serializer];[manager GET:kDataUrl parameters:nil success:^(AFHTTPRequestOperation *operation, id responseObject) { NSLog(@"response: %@", responseObject); } failure:^(AFHTTPRequestOperation *operation, NSError *error) { NSLog(@"JSON DataError: %@", error); }];
有用。但是,在更改JSON文件kDataUrl
并验证是否在浏览器中进行了更改之后,当我再次运行该应用程序时,仍会得到先前的响应。
看来AFNetworking正在某种程度上缓存旧的响应。我不要这种行为。我想下载当前的提要。是否需要某种类型的设置或参数来关闭缓存?
答案1
小编典典简而言之,只需定义您的AFNetworking经理即可:
AFHTTPRequestOperationManager *manager = [AFHTTPRequestOperationManager manager];[manager.requestSerializer setCachePolicy:NSURLRequestReloadIgnoringLocalCacheData];
请享用!
Android 环境下使用 call_usermodehelper () 以及调试
有时候设备驱动需要做一些与其他的设备通信的操作,但是驱动本身又不可以去实作,那这个时候就可以通过调用用户态的软件,通过这个软件和其他的设备进行通信。
那在内核态如何去调用用户态的程序呢?call_usermodehelper () 可以做到。这个 linux kernel 提供的一个接口,并且这个程序具体 root 权限。
这个函数的调用的方法是非常简单的。如下:
char cmd_path[] = "/vendor/bin/XX";
char* cmd_argv_low[] = {cmd_path,"wlan0","write","xxxx",NULL};
char* cmd_envp[] = {"PATH=/sbin:/system/bin", NULL};
int touch_int = 0;
touch_int = call_usermodehelper(cmd_path, cmd_argv_low, cmd_envp, UMH_WAIT_EXEC);
printk(KERN_DEBUG "call_usermodehelper is %d \n", touch_int);
如果顺利执行,上面的代码相当于在 shell 中执行了 /vendor/bin/XX wlan0 write xxxx 这么一条指令。下面简单解释一下 各个参数的意思:
- cmd_path 就是这个用户态的程序存放的位置。
- cmd_argv_low 是传给用户态程序的参数列表,相当于 argv [0],argv [1], 最后要以 NULL 结尾
- cmd_envp 是传给程序的环境变量,同样也要以 NULL 结尾
- 最后如果执行成功,那么返回 0
但是当我执行这段程序的时候,并没有返回 0,而是返回了 - 13,搜索了一下 linux 内核关于错误码的定义,发现是 Permission denied ,程序是 root 权限为什么还是会是权限不通过呢?原来是 Android 有 selinux 机制挡掉了这次调用。
暂时关闭 selinux 机制就可以使用了,当然要彻底解决的话,就要通过添加 te 规则来通过 Android 的安全检查。
这里顺便也列一下 kernel 的常见的错误码:
#define EPERM 1 /* Operation not permitted */
#define ENOENT 2 /* No such file or directory */
#define ESRCH 3 /* No such process */
#define EINTR 4 /* Interrupted system call */
#define EIO 5 /* I/O error */
#define ENXIO 6 /* No such device or address */
#define E2BIG 7 /* Argument list too long */
#define ENOEXEC 8 /* Exec format error */
#define EBADF 9 /* Bad file number */
#define ECHILD 10 /* No child processes */
#define EAGAIN 11 /* Try again */
#define ENOMEM 12 /* Out of memory */
#define EACCES 13 /* Permission denied */
#define EFAULT 14 /* Bad address */
#define ENOTBLK 15 /* Block device required */
#define EBUSY 16 /* Device or resource busy */
#define EEXIST 17 /* File exists */
#define EXDEV 18 /* Cross-device link */
#define ENODEV 19 /* No such device */
#define ENOTDIR 20 /* Not a directory */
#define EISDIR 21 /* Is a directory */
#define EINVAL 22 /* Invalid argument */
#define ENFILE 23 /* File table overflow */
#define EMFILE 24 /* Too many open files */
#define ENOTTY 25 /* Not a typewriter */
#define ETXTBSY 26 /* Text file busy */
#define EFBIG 27 /* File too large */
#define ENOSPC 28 /* No space left on device */
#define ESPIPE 29 /* Illegal seek */
#define EROFS 30 /* Read-only file system */
#define EMLINK 31 /* Too many links */
#define EPIPE 32 /* Broken pipe */
#define EDOM 33 /* Math argument out of domain of func */
#define ERANGE 34 /* Math result not representable */
#define EDEADLK 35 /* Resource deadlock would occur */
#define ENAMETOOLONG 36 /* File name too long */
#define ENOLCK 37 /* No record locks available */
#define ENOSYS 38 /* Function not implemented */
#define ENOTEMPTY 39 /* Directory not empty */
#define ELOOP 40 /* Too many symbolic links encountered */
#define EWOULDBLOCK EAGAIN /* Operation would block */
#define ENOMSG 42 /* No message of desired type */
#define EIDRM 43 /* Identifier removed */
#define ECHRNG 44 /* Channel number out of range */
#define EL2NSYNC 45 /* Level 2 not synchronized */
#define EL3HLT 46 /* Level 3 halted */
#define EL3RST 47 /* Level 3 reset */
#define ELNRNG 48 /* Link number out of range */
#define EUNATCH 49 /* Protocol driver not attached */
#define ENOCSI 50 /* No CSI structure available */
#define EL2HLT 51 /* Level 2 halted */
#define EBADE 52 /* Invalid exchange */
#define EBADR 53 /* Invalid request descriptor */
#define EXFULL 54 /* Exchange full */
#define ENOANO 55 /* No anode */
#define EBADRQC 56 /* Invalid request code */
#define EBADSLT 57 /* Invalid slot */
#define EDEADLOCK EDEADLK
#define EBFONT 59 /* Bad font file format */
#define ENOSTR 60 /* Device not a stream */
#define ENODATA 61 /* No data available */
#define ETIME 62 /* Timer expired */
#define ENOSR 63 /* Out of streams resources */
#define ENONET 64 /* Machine is not on the network */
#define ENOPKG 65 /* Package not installed */
#define EREMOTE 66 /* Object is remote */
#define ENOLINK 67 /* Link has been severed */
#define EADV 68 /* Advertise error */
#define ESRMNT 69 /* Srmount error */
#define ECOMM 70 /* Communication error on send */
#define EPROTO 71 /* Protocol error */
#define EMULTIHOP 72 /* Multihop attempted */
#define EDOTDOT 73 /* RFS specific error */
#define EBADMSG 74 /* Not a data message */
#define EOVERFLOW 75 /* Value too large for defined data type */
#define ENOTUNIQ 76 /* Name not unique on network */
#define EBADFD 77 /* File descriptor in bad state */
#define EREMCHG 78 /* Remote address changed */
#define ELIBACC 79 /* Can not access a needed shared library */
#define ELIBBAD 80 /* Accessing a corrupted shared library */
#define ELIBSCN 81 /* .lib section in a.out corrupted */
#define ELIBMAX 82 /* Attempting to link in too many shared libraries */
#define ELIBEXEC 83 /* Cannot exec a shared library directly */
#define EILSEQ 84 /* Illegal byte sequence */
#define ERESTART 85 /* Interrupted system call should be restarted */
#define ESTRPIPE 86 /* Streams pipe error */
#define EUSERS 87 /* Too many users */
#define ENOTSOCK 88 /* Socket operation on non-socket */
#define EDESTADDRREQ 89 /* Destination address required */
#define EMSGSIZE 90 /* Message too long */
#define EPROTOTYPE 91 /* Protocol wrong type for socket */
#define ENOPROTOOPT 92 /* Protocol not available */
#define EPROTONOSUPPORT 93 /* Protocol not supported */
#define ESOCKTNOSUPPORT 94 /* Socket type not supported */
#define EOPNOTSUPP 95 /* Operation not supported on transport endpoint */
#define EPFNOSUPPORT 96 /* Protocol family not supported */
#define EAFNOSUPPORT 97 /* Address family not supported by protocol */
#define EADDRINUSE 98 /* Address already in use */
#define EADDRNOTAVAIL 99 /* Cannot assign requested address */
#define ENETDOWN 100 /* Network is down */
#define ENETUNREACH 101 /* Network is unreachable */
#define ENETRESET 102 /* Network dropped connection because of reset */
#define ECONNABORTED 103 /* Software caused connection abort */
#define ECONNRESET 104 /* Connection reset by peer */
#define ENOBUFS 105 /* No buffer space available */
#define EISCONN 106 /* Transport endpoint is already connected */
#define ENOTCONN 107 /* Transport endpoint is not connected */
#define ESHUTDOWN 108 /* Cannot send after transport endpoint shutdown */
#define ETOOMANYREFS 109 /* Too many references: cannot splice */
#define ETIMEDOUT 110 /* Connection timed out */
#define ECONNREFUSED 111 /* Connection refused */
#define EHOSTDOWN 112 /* Host is down */
#define EHOSTUNREACH 113 /* No route to host */
#define EALREADY 114 /* Operation already in progress */
#define EINPROGRESS 115 /* Operation now in progress */
#define ESTALE 116 /* Stale NFS file handle */
#define EUCLEAN 117 /* Structure needs cleaning */
#define ENOTNAM 118 /* Not a XENIX named type file */
#define ENAVAIL 119 /* No XENIX semaphores available */
#define EISNAM 120 /* Is a named type file */
#define EREMOTEIO 121 /* Remote I/O error */
#define EDQUOT 122 /* Quota exceeded */
#define ENOMEDIUM 123 /* No medium found */
#define EMEDIUMTYPE 124 /* Wrong medium type */
C# .net 环境下使用 rabbitmq 消息队列
消息队列的地位越来越重要,几乎是面试的必问问题了,不会使用几种消息队列都显得尴尬,正好本文使用 C# 来带你认识 rabbitmq 消息队列
首先,我们要安装 rabbitmq,当然,如果有现成的,也可以使用,不知道曾几何时,我喜欢将数据库等等软件安装在 linux 虚拟机,如果没现成的 rabbitmq,按照下面的来吧,嘿嘿
rabbitmq 安装:https://www.cnblogs.com/shanfeng1000/p/11951703.html
如果要实现 rabbitmq 集群,参考:https://www.cnblogs.com/shanfeng1000/p/12097054.html
我这里使用的是 rabbitmq 集群,但是没有比较,只是已经安装好了,就直接使用算了
虚拟机集群地址:192.168.209.133,192.168.209.134,192.168.209.135
端口使用的默认端口,都是 5672,也就是 AMQP 协议端口
Rabbitmq 的工作模式
先说说几个概念
生产者(producer):负责生产消息,可以有多个生产者,可以理解为生成消息的那部分逻辑
消费者(consumer):从队列中获取消息,对消息处理的那部分逻辑
队列(queue):用于存放消息,可以理解为先进先出的一个对象
交换机(exchange):顾名思义,就是个中介的角色,将接收到的消息按不同的规则转发到其他交换机或者队列中
路由(route):就是交换机分发消息的规则,交换机可以指定路由规则,生产者在发布消息时也可以指定消息路由,比如交换机中设置 A 路由表示将消息转发到队列 1,B 路由表示将消息转发到队列 2,那么当交换机接收到消息时,如果消息的路由满足 A 路由,则将消息转发到队列 1,如果满足 B 路由则将消息转发到队列 2
虚拟主机(virtual host):虚拟地址,用于进行逻辑隔离,一个虚拟主机里面可以有若干个 exchange 和 queue,但是里面不能有相同名称的 exchange 或 queue
再看看 rabbitmq 的几种工作模式,具体可参考 rabbitmq 官网给出的 Demo:https://www.rabbitmq.com/getstarted.html
其中,第 6 中类似我们常用的请求 - 响应模式,但是使用的 RPC 请求响应,用的比较少,这里就不过多解释,感兴趣的可以参考官网文档:https://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html。
总的来说,就是生产者将消息发布到 rabbitmq 上,然后消费者连接 rabbitmq,获取到消息就消费,但是有几点说明一下
1、rabbitmq 中的消息是可被多次消费的,因为 rabbitmq 提供了 ack 机制,当消费者在消费消息时,如果将自动 ack 设置成 false,那么需要手动提交 ack 才能告诉 rabbitmq 消息已被使用,否则当通道关闭时,消息会继续呆在队列中等待消费
2、当存在多个消费者时,默认情况下,一个消费者获取一个消息,处理完成后再获取下一个,但是 rabbitmq 消费一次性获取多个,当然后当这些消息消费完成后,再获取下一批,这也就是 rabbitmq 的 Qos 机制
C# 使用 rabbitmq
如果感兴趣的人多,到时候再单独开一篇博文,现在就介绍其中的 1-5 种,也可以分类成两种:不使用交换机和使用交换机,所以下面就分这两种来说明
首先,我们创建了两个 Demo 项目:RabbitMQ.PublishConsole 和 RabbitMQ.ConsumeConsole,分别使用使用 nuget 安装 RabbitMQ.Client:
其中 RabbitMQ.PublishConsole 是用来生产消息,RabbitMQ.ConsumeConsole 用来消费消息
这里我们安装的是最新版本,旧版本和新版本在使用上可能会有一些区别
不使用交换机情形
不使用交换机有两种模式:简单模式和工作模式
这里先贴上生产者生成消息的代码,简单模式和工作模式这部分测试代码是一样的:


using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
namespace RabbitMQ.PublishConsole
{
class Program
{
static void Main(string[] args)
{
string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
int port = 5672;
string userName = "admin";
string password = "123456";
string virtualHost = "/";
//创建一个连接工厂
var factory = new ConnectionFactory();
factory.UserName = userName;
factory.Password = password;
factory.Port = port;
factory.VirtualHost = virtualHost;
//创建一个连接,此时可以在rabbitmq后台Web管理页面中的Connections中看到一个连接生成
//一个连接可以创建多个通道
var connection = factory.CreateConnection(hosts);
string queue = "queue1";//队列名称
//创建一个通道
//此时可以在rabbitmq后台Web管理页面中的Channels中看到一个新通道生成
var channel = connection.CreateModel();
//给通道绑定一个队列,队列如果不存在,则会创建新队列,如果队列已存在,那么参数一定要正确,特别是arguments参数,否则会报错
var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
channel.QueueDeclare(queue: queue, durable: true, exclusive: false, autoDelete: false, arguments: arguments);
//发布10条消息
for (var i = 0; i < 10; i++)
{
var buffer = Encoding.UTF8.GetBytes(i.ToString());
channel.BasicPublish("", queue, null, buffer);
}
channel.Close();
Console.ReadKey();
}
}
}
上述代码执行完成后,队列 queue1 中就有了 10 条消息,可以在 rabbitmq 的后台管理中看到:
代码中提到,通道在申明队列时,如果队列已经存在,则申明的参数一定要对上,否则会抛出异常:The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text=''PRECONDITION_FAILED - inequivalent arg ''x-queue-type'' for queue ''queue1'' in vhost ''/'': received none but current is the value ''classic'' of type ''longstr'''', classId=50, methodId=10
比如这里,我实现在 rabbitmq 后台创建了队列,那么他们的对应关系如下图:
简单模式
这个模式很简单,其实就是只有一个消费者,简单的保证操作的顺序性
接着贴上消费者代码:


using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
namespace RabbitMQ.ConsumeConsole
{
class Program
{
static void Main(string[] args)
{
string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
int port = 5672;
string userName = "admin";
string password = "123456";
string virtualHost = "/";
//创建一个连接工厂
var factory = new ConnectionFactory();
factory.UserName = userName;
factory.Password = password;
factory.Port = port;
factory.VirtualHost = virtualHost;
//创建一个连接,此时可以在rabbitmq后台Web管理页面中的Connections中看到一个连接生成
//一个连接可以创建多个通道
var connection = factory.CreateConnection(hosts);
string queue = "queue1";//队列名称
//创建一个通道
//此时可以在rabbitmq后台Web管理页面中的Channels中看到一个新通道生成
var channel = connection.CreateModel();
//给通道绑定一个队列,队列如果不存在,则会创建新队列,如果队列已存在,那么参数一定要正确,特别是arguments参数,否则会报错
var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
channel.QueueDeclare(queue: queue, durable: true, exclusive: false, autoDelete: false, arguments: arguments);
//channel.BasicQos(2, 2, false);//设置QOS
//在通道中定义一个事件消费者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, e) =>
{
string message = Encoding.UTF8.GetString(e.Body);
Console.WriteLine($"接收到消息:{message}");
Thread.Sleep(500);//暂停一下
//通知消息已被处理,如果没有,那么消息将会被重复消费
channel.BasicAck(e.DeliveryTag, false);
};
//ack设置成false,表示不自动提交,那么就需要在消息被消费后,手动调用BasicAck去提交消息
channel.BasicConsume(queue, false, consumer);
Console.ReadKey();
}
}
}
上述代码执行完成后,在后台管理中可以看到消息被消费掉了
工作模式
工作模式是简单模式的拓展,如果业务简单,对消息的消费是一个耗时的过程,这个模式是一个好的选择。
接着调用生产者代码生产 10 条消息,下面是消费者的测试代码


using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
namespace RabbitMQ.ConsumeConsole
{
class Program
{
static void Main(string[] args)
{
string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
int port = 5672;
string userName = "admin";
string password = "123456";
string virtualHost = "/";
//创建一个连接工厂
var factory = new ConnectionFactory();
factory.UserName = userName;
factory.Password = password;
factory.Port = port;
factory.VirtualHost = virtualHost;
//创建一个连接,此时可以在rabbitmq后台Web管理页面中的Connections中看到一个连接生成
//一个连接可以创建多个通道
var connection = factory.CreateConnection(hosts);
Consumer(connection, 1);//消费者1
Consumer(connection, 2);//消费者2
Console.ReadKey();
}
static void Consumer(IConnection connection, ushort prefetch)
{
//使用多线程来执行,可以模拟多个消费者
new Thread(() =>
{
int threadId = Thread.CurrentThread.ManagedThreadId;//线程Id,用于区分消费者
string queue = "queue1";//队列名称
//创建一个通道
//此时可以在rabbitmq后台Web管理页面中的Channels中看到一个新通道生成
var channel = connection.CreateModel();
//给通道绑定一个队列,队列如果不存在,则会创建新队列,如果队列已存在,那么参数一定要正确,特别是arguments参数,否则会报错
var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
channel.QueueDeclare(queue: queue, durable: true, exclusive: false, autoDelete: false, arguments: arguments);
//设置消费者每次获取的消息数,可以用来设置消费者消息的权重
//必须等获取的消息都消费完成后才能重新获取
channel.BasicQos(0, prefetch, true);
//在通道中定义一个事件消费者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, e) =>
{
string message = Encoding.UTF8.GetString(e.Body);
Console.WriteLine($"ThreadId:【{threadId}】 接收到消息:{message}");
Thread.Sleep(500);
//通知消息已被处理,如果没有,那么消息将会被重复消费
channel.BasicAck(e.DeliveryTag, false);
};
//ack设置成false,表示不自动提交,那么就需要在消息被消费后,手动调用BasicAck去提交消息
channel.BasicConsume(queue, false, consumer);
}).Start();
}
}
}
另外说明一下,代码中提到 rabbitmq 的 QOS 机制,这里简单解释一下,当生产者将消息发布到 rabbitmq 之后,如果在未配置 QOS 的情况下,rabbitmq 尽可能快速地发送队列中的所有消息到消费者端,如果消息比较多,消费者来不及处理,就会缓存这些消息,当消息堆积过多,可能导致服务器内存不足而影响其他进程,rabbitmq 的 QOS 可以很好的解决这类问题,QOS 就是限制消费者一次性从 rabbitmq 中获取消息的个数,而不是获取所有消息。比如设置 rabbitmq 的 QOS 为 10,也就是 prefetch=10,就是说,哪怕 rabbitmq 中有 100 条消息,消费者也只是一次性获取 10 条,然后消费者消费这 10 条消息,剩下的交给其他消费者,当 10 条消息中的 unacked 个数少于 prefetch * 消费者数目时,会继续从 rabbitmq 获取消息,如果在工作模式中,不使用 QOS,你会发现,所有的消息都被一个消费者消费了
使用交换机情形
使用交换机的情形有 3 种:发布订阅模式,路由模式,主题模式
上面说了,交换机是一个中介的角色,当一个交换机创建后,可以将其他队列或者交换机与当前交换机绑定,绑定时需要指定绑定路由规则,这个和交换机类型有关。
当我们不使用交换机时,那么生产者是直接将消息发布到队列中去的,生产者只需要指定消息接收的队列即可,而使用交换机做中转时,生产者只需要将消息发布到交换机,然后交换机根据接收到的消息,按与交换机绑定的路由规则,将消息转发到其他交换机或者队列中,这个处理过程和交换机的类型有关,交换机一般分为 4 类:
direct:直连类型,就是将消息的路由和交换机的绑定路由作比较,当两者一致时,则匹配成功,然后消息就会被转发到这个绑定路由后的队列或者交换机
fanout:这种类型的交换机是不需要指定路由的,当交换机接收到消息时,会将消息广播到所有绑定到它的所有队列或交换机中
topic:主题类型,类似 direct 类型,只不过在将消息的路由和绑定路由做比较时,是通过特定表达式去比较的,其中# 匹配一个或多个,* 匹配一个
headers:头部交换机,允许使用消息头中的信息来做匹配规则,这个用的少,基本上不用,这里也就不过多介绍了
到这里,你应该发觉,使用交换机的三种情形,无非就是使用交换机的类型不一样,发布订阅模式 --direct,路由模式 --fanout,主题模式 --topic
现在我们先去 rabbitmq 的后台中,创建这几种交换机:
交换机的创建及绑定都可以在代码中实现,如 IModel 类的 QueueBind,ExchangeBind 等方法,用多了就自然熟了,这里为了方便截图,就到后台去创建了
然后我们创建两个队列,并按指定类型分别绑定到这 3 个交换机中:
队列:
demo.direct 绑定队列规则:
demo.fanout 绑定队列规则:
demo.topic 绑定队列规则:
上面所描述的,无非就是三种模式中发布消息方式的不一样,消费者当然还是从队列获取消息消费的,这里我们就先贴出消费者的代码:


using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
namespace RabbitMQ.ConsumeConsole
{
class Program
{
static void Main(string[] args)
{
string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
int port = 5672;
string userName = "admin";
string password = "123456";
string virtualHost = "/";
//创建一个连接工厂
var factory = new ConnectionFactory();
factory.UserName = userName;
factory.Password = password;
factory.Port = port;
factory.VirtualHost = virtualHost;
//创建一个连接,此时可以在rabbitmq后台Web管理页面中的Connections中看到一个连接生成
//一个连接可以创建多个通道
var connection = factory.CreateConnection(hosts);
Consumer(connection, "queue1");//消费者1
Consumer(connection, "queue2");//消费者2
Console.ReadKey();
}
static void Consumer(IConnection connection, string queue)
{
//使用多线程来执行,可以模拟多个消费者
new Thread(() =>
{
int threadId = Thread.CurrentThread.ManagedThreadId;//线程Id,用于区分消费者
//创建一个通道
//此时可以在rabbitmq后台Web管理页面中的Channels中看到一个新通道生成
var channel = connection.CreateModel();
//给通道绑定一个队列,队列如果不存在,则会创建新队列,如果队列已存在,那么参数一定要正确,特别是arguments参数,否则会报错
var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
channel.QueueDeclare(queue: queue, durable: true, exclusive: false, autoDelete: false, arguments: arguments);
//在通道中定义一个事件消费者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, e) =>
{
string message = Encoding.UTF8.GetString(e.Body);
Console.WriteLine($"ThreadId:【{threadId}】 接收到消息:{message}");
Thread.Sleep(500);
//通知消息已被处理,如果没有,那么消息将会被重复消费
channel.BasicAck(e.DeliveryTag, false);
};
//ack设置成false,表示不自动提交,那么就需要在消息被消费后,手动调用BasicAck去提交消息
channel.BasicConsume(queue, false, consumer);
}).Start();
}
}
}
这里我们使用了两个队列,每个队列我们这里只用了一个消费者,对于下面几种模式,这个消费者代码都能消费到
发布订阅模式
发布订阅模式使用的是 fanout 类型的交换机,这个类型无需指定路由,交换机会将消息广播到每个绑定到交换机的队列或者交换机


using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
namespace RabbitMQ.PublishConsole
{
class Program
{
static void Main(string[] args)
{
string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
int port = 5672;
string userName = "admin";
string password = "123456";
string virtualHost = "/";
//创建一个连接工厂
var factory = new ConnectionFactory();
factory.UserName = userName;
factory.Password = password;
factory.Port = port;
factory.VirtualHost = virtualHost;
//创建一个连接,此时可以在rabbitmq后台Web管理页面中的Connections中看到一个连接生成
//一个连接可以创建多个通道
var connection = factory.CreateConnection(hosts);
string exchange = "demo.fanout";//交换机名称
string exchangeType = "fanout";//交换机类型
//创建一个通道
//此时可以在rabbitmq后台Web管理页面中的Channels中看到一个新通道生成
var channel = connection.CreateModel();
//给通道绑定一个交换机,交换机如果不存在,则会创建新交换机,如果交换机已存在,那么参数一定要正确,特别是arguments参数,各参数类似队列
var arguments = new Dictionary<string, object>() { };
channel.ExchangeDeclare(exchange: exchange, type: exchangeType, durable: true, autoDelete: false, arguments: arguments);
//发布10条消息
for (var i = 0; i < 10; i++)
{
var buffer = Encoding.UTF8.GetBytes(i.ToString());
channel.BasicPublish(exchange, "", null, buffer);
}
channel.Close();
Console.ReadKey();
}
}
}
代码中,我们往交换机发布了 10 条消息,交换机接收到消息后,会将消息转发到 queue1 和 queue2,因此,queue1 和 queue2 都会收到 10 条消息:
路由模式
路由模式使用的是 direct 类型的交换机,也即在进行路由匹配时,需要匹配的路由一直才算匹配成功,我们把发布订阅模式的代码稍作修改即可,贴出生产者部分代码:


using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
namespace RabbitMQ.PublishConsole
{
class Program
{
static void Main(string[] args)
{
string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
int port = 5672;
string userName = "admin";
string password = "123456";
string virtualHost = "/";
//创建一个连接工厂
var factory = new ConnectionFactory();
factory.UserName = userName;
factory.Password = password;
factory.Port = port;
factory.VirtualHost = virtualHost;
//创建一个连接,此时可以在rabbitmq后台Web管理页面中的Connections中看到一个连接生成
//一个连接可以创建多个通道
var connection = factory.CreateConnection(hosts);
string exchange = "demo.direct";//交换机名称
string exchangeType = "direct";//交换机类型
//创建一个通道
//此时可以在rabbitmq后台Web管理页面中的Channels中看到一个新通道生成
var channel = connection.CreateModel();
//给通道绑定一个交换机,交换机如果不存在,则会创建新交换机,如果交换机已存在,那么参数一定要正确,特别是arguments参数,各参数类似队列
var arguments = new Dictionary<string, object>() { };
channel.ExchangeDeclare(exchange: exchange, type: exchangeType, durable: true, autoDelete: false, arguments: arguments);
string[] routes = new string[] { "apple", "banana" };
//发布10条消息
for (var i = 0; i < 10; i++)
{
var buffer = Encoding.UTF8.GetBytes(i.ToString());
channel.BasicPublish(exchange, routes[i % 2], null, buffer);
}
channel.Close();
Console.ReadKey();
}
}
}
代码中,我们往 demo.direct 交换机发布了 10 条消息,其中 5 条消息的路由是 apple,另外 5 条消息的路由是 banana,demo.direct 交换机绑定的两个队列中,queue1 的绑定路由是 apple,queue2 的绑定路由是 banana,那么 demo.direct 交换机会将路由是 apple 的消息转发到 queue1,将路由是 banana 的消息转发到 queue2,从后台可以看每个队列中已经有 5 个消息准备好了:
接下来可以使用消费者将它们消费掉
主题模式
主题模式使用的 topic 类型的交换机,在进行匹配时,是根据表达式去匹配,# 匹配一个或多个,* 匹配一个,我们将路由模式的代码稍作修改:


using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
namespace RabbitMQ.PublishConsole
{
class Program
{
static void Main(string[] args)
{
string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
int port = 5672;
string userName = "admin";
string password = "123456";
string virtualHost = "/";
//创建一个连接工厂
var factory = new ConnectionFactory();
factory.UserName = userName;
factory.Password = password;
factory.Port = port;
factory.VirtualHost = virtualHost;
//创建一个连接,此时可以在rabbitmq后台Web管理页面中的Connections中看到一个连接生成
//一个连接可以创建多个通道
var connection = factory.CreateConnection(hosts);
string exchange = "demo.topic";//交换机名称
string exchangeType = "topic";//交换机类型
//创建一个通道
//此时可以在rabbitmq后台Web管理页面中的Channels中看到一个新通道生成
var channel = connection.CreateModel();
//给通道绑定一个交换机,交换机如果不存在,则会创建新交换机,如果交换机已存在,那么参数一定要正确,特别是arguments参数,各参数类似队列
var arguments = new Dictionary<string, object>() { };
channel.ExchangeDeclare(exchange: exchange, type: exchangeType, durable: true, autoDelete: false, arguments: arguments);
string[] routes = new string[] { "apple.", "banana." };
//发布10条消息
for (var i = 0; i < 10; i++)
{
var buffer = Encoding.UTF8.GetBytes(i.ToString());
channel.BasicPublish(exchange, routes[i % 2] + i, null, buffer);
}
channel.Close();
Console.ReadKey();
}
}
}
代码中,我们往 demo.topic 交换机中发布了 10 条消息,其中 5 条消息的路由是以 apple 开头的,另外 5 条消息的路由是以 banana 开头的,demo.direct 交换机绑定的两个队列中,queue1 的绑定路由是 apple.#,就是匹配以 apple 开头的路由,queue2 的绑定路由是 banana.#,就是匹配以 banana 开头的路由,那么 demo.direct 交换机会将路由是以 apple 开头的的消息转发到 queue1,将路由是以 banana 开头的的消息转发到 queue2,从后台可以看每个队列中已经有 5 个消息准备好了:
封装
其实 rabbitmq 的使用还是比较简单的,只需要多谢谢代码尝试一下就能熟悉
一般的,像这种第三方插件的调用,我建议自己要做一层封装,最好是根据自己的需求去封装,然后项目中只需要调用自己封装的类就行了,下面贴出我自己封装的类:


using System;
using System.Collections.Generic;
using System.Text;
namespace RabbitMQ.ConsoleApp
{
public class QueueOptions
{
/// <summary>
/// 是否持久化
/// </summary>
public bool Durable { get; set; } = true;
/// <summary>
/// 是否自动删除
/// </summary>
public bool AutoDelete { get; set; } = false;
/// <summary>
/// 参数
/// </summary>
public IDictionary<string, object> Arguments { get; set; } = new Dictionary<string, object>();
}
public class ConsumeQueueOptions : QueueOptions
{
/// <summary>
/// 是否自动提交
/// </summary>
public bool AutoAck { get; set; } = false;
/// <summary>
/// 每次发送消息条数
/// </summary>
public ushort? FetchCount { get; set; }
}
public class ExchangeConsumeQueueOptions : ConsumeQueueOptions
{
/// <summary>
/// 路由值
/// </summary>
public string[] RoutingKeys { get; set; }
/// <summary>
/// 参数
/// </summary>
public IDictionary<string, object> BindArguments { get; set; } = new Dictionary<string, object>();
}
public class ExchangeQueueOptions : QueueOptions
{
/// <summary>
/// 交换机类型
/// </summary>
public string Type { get; set; }
/// <summary>
/// 队列及路由值
/// </summary>
public (string,string)[] QueueAndRoutingKey { get; set; }
/// <summary>
/// 参数
/// </summary>
public IDictionary<string, object> BindArguments { get; set; } = new Dictionary<string, object>();
}
}


using System;
using System.Collections.Generic;
using System.Text;
namespace RabbitMQ.ConsoleApp
{
public static class RabbitMQExchangeType
{
/// <summary>
/// 普通模式
/// </summary>
public const string Common = "";
/// <summary>
/// 路由模式
/// </summary>
public const string Direct = "direct";
/// <summary>
/// 发布/订阅模式
/// </summary>
public const string Fanout = "fanout";
/// <summary>
/// 匹配订阅模式
/// </summary>
public const string Topic = "topic";
}
}


using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace RabbitMQ.ConsoleApp
{
public abstract class RabbitBase : IDisposable
{
List<AmqpTcpEndpoint> amqpList;
IConnection connection;
protected RabbitBase(params string[] hosts)
{
if (hosts == null || hosts.Length == 0)
{
throw new ArgumentException("invalid hosts!", nameof(hosts));
}
this.amqpList = new List<AmqpTcpEndpoint>();
this.amqpList.AddRange(hosts.Select(host => new AmqpTcpEndpoint(host, Port)));
}
protected RabbitBase(params (string, int)[] hostAndPorts)
{
if (hostAndPorts == null || hostAndPorts.Length == 0)
{
throw new ArgumentException("invalid hosts!", nameof(hostAndPorts));
}
this.amqpList = new List<AmqpTcpEndpoint>();
this.amqpList.AddRange(hostAndPorts.Select(tuple => new AmqpTcpEndpoint(tuple.Item1, tuple.Item2)));
}
/// <summary>
/// 端口
/// </summary>
public int Port { get; set; } = 5672;
/// <summary>
/// 账号
/// </summary>
public string UserName { get; set; } = ConnectionFactory.DefaultUser;
/// <summary>
/// 密码
/// </summary>
public string Password { get; set; } = ConnectionFactory.DefaultPass;
/// <summary>
/// 虚拟机
/// </summary>
public string VirtualHost { get; set; } = ConnectionFactory.DefaultVHost;
/// <summary>
/// 释放
/// </summary>
public virtual void Dispose()
{
//connection?.Close();
//connection?.Dispose();
}
/// <summary>
/// 关闭连接
/// </summary>
public void Close()
{
connection?.Close();
connection?.Dispose();
}
#region Private
/// <summary>
/// 获取rabbitmq的连接
/// </summary>
/// <returns></returns>
protected IModel GetChannel()
{
if (connection == null)
{
lock (this)
{
if (connection == null)
{
var factory = new ConnectionFactory();
factory.Port = Port;
factory.UserName = UserName;
factory.VirtualHost = VirtualHost;
factory.Password = Password;
connection = factory.CreateConnection(this.amqpList);
}
}
}
return connection.CreateModel();
}
#endregion
}
}


using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace RabbitMQ.ConsoleApp
{
public class RabbitMQProducer : RabbitBase
{
public RabbitMQProducer(params string[] hosts) : base(hosts)
{
}
public RabbitMQProducer(params (string,int)[] hostAndPorts) : base(hostAndPorts)
{
}
#region 普通模式、Work模式
/// <summary>
/// 发布消息
/// </summary>
/// <param name="queue"></param>
/// <param name="message"></param>
/// <param name="options"></param>
public void Publish(string queue, string message, QueueOptions options = null)
{
options = options ?? new QueueOptions();
var channel = GetChannel();
channel.QueueDeclare(queue, options.Durable, false, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>());
var buffer = Encoding.UTF8.GetBytes(message);
channel.BasicPublish("", queue, null, buffer);
channel.Close();
}
/// <summary>
/// 发布消息
/// </summary>
/// <param name="queue"></param>
/// <param name="message"></param>
/// <param name="configure"></param>
public void Publish(string queue, string message, Action<QueueOptions> configure)
{
QueueOptions options = new QueueOptions();
configure?.Invoke(options);
Publish(queue, message, options);
}
#endregion
#region 订阅模式、路由模式、Topic模式
/// <summary>
/// 发布消息
/// </summary>
/// <param name="exchange"></param>
/// <param name="routingKey"></param>
/// <param name="message"></param>
/// <param name="options"></param>
public void Publish(string exchange, string routingKey, string message, ExchangeQueueOptions options = null)
{
options = options ?? new ExchangeQueueOptions();
var channel = GetChannel();
channel.ExchangeDeclare(exchange, string.IsNullOrEmpty(options.Type) ? RabbitMQExchangeType.Fanout : options.Type, options.Durable, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>());
if (options.QueueAndRoutingKey != null)
{
foreach (var t in options.QueueAndRoutingKey)
{
if (!string.IsNullOrEmpty(t.Item1))
{
channel.QueueBind(t.Item1, exchange, t.Item2 ?? "", options.BindArguments ?? new Dictionary<string, object>());
}
}
}
var buffer = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange, routingKey, null, buffer);
channel.Close();
}
/// <summary>
/// 发布消息
/// </summary>
/// <param name="exchange"></param>
/// <param name="routingKey"></param>
/// <param name="message"></param>
/// <param name="configure"></param>
public void Publish(string exchange, string routingKey, string message, Action<ExchangeQueueOptions> configure)
{
ExchangeQueueOptions options = new ExchangeQueueOptions();
configure?.Invoke(options);
Publish(exchange, routingKey, message, options);
}
#endregion
}
}


using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
namespace RabbitMQ.ConsoleApp
{
public class RabbitMQConsumer : RabbitBase
{
public RabbitMQConsumer(params string[] hosts) : base(hosts)
{
}
public RabbitMQConsumer(params (string, int)[] hostAndPorts) : base(hostAndPorts)
{
}
public event Action<RecieveResult> Received;
/// <summary>
/// 构造消费者
/// </summary>
/// <param name="channel"></param>
/// <param name="options"></param>
/// <returns></returns>
private IBasicConsumer ConsumeInternal(IModel channel, ConsumeQueueOptions options)
{
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, e) =>
{
try
{
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
if (!options.AutoAck)
{
cancellationTokenSource.Token.Register(() =>
{
channel.BasicAck(e.DeliveryTag, false);
});
}
Received?.Invoke(new RecieveResult(e, cancellationTokenSource));
}
catch { }
};
if (options.FetchCount != null)
{
channel.BasicQos(0, options.FetchCount.Value, false);
}
return consumer;
}
#region 普通模式、Work模式
/// <summary>
/// 消费消息
/// </summary>
/// <param name="queue"></param>
/// <param name="options"></param>
public ListenResult Listen(string queue, ConsumeQueueOptions options = null)
{
options = options ?? new ConsumeQueueOptions();
var channel = GetChannel();
channel.QueueDeclare(queue, options.Durable, false, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>());
var consumer = ConsumeInternal(channel, options);
channel.BasicConsume(queue, options.AutoAck, consumer);
ListenResult result = new ListenResult();
result.Token.Register(() =>
{
try
{
channel.Close();
channel.Dispose();
}
catch { }
});
return result;
}
/// <summary>
/// 消费消息
/// </summary>
/// <param name="queue"></param>
/// <param name="configure"></param>
public ListenResult Listen(string queue, Action<ConsumeQueueOptions> configure)
{
ConsumeQueueOptions options = new ConsumeQueueOptions();
configure?.Invoke(options);
return Listen(queue, options);
}
#endregion
#region 订阅模式、路由模式、Topic模式
/// <summary>
/// 消费消息
/// </summary>
/// <param name="exchange"></param>
/// <param name="queue"></param>
/// <param name="options"></param>
public ListenResult Listen(string exchange, string queue, ExchangeConsumeQueueOptions options = null)
{
options = options ?? new ExchangeConsumeQueueOptions();
var channel = GetChannel();
channel.QueueDeclare(queue, options.Durable, false, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>());
if (options.RoutingKeys != null && !string.IsNullOrEmpty(exchange))
{
foreach (var key in options.RoutingKeys)
{
channel.QueueBind(queue, exchange, key, options.BindArguments);
}
}
var consumer = ConsumeInternal(channel, options);
channel.BasicConsume(queue, options.AutoAck, consumer);
ListenResult result = new ListenResult();
result.Token.Register(() =>
{
try
{
channel.Close();
channel.Dispose();
}
catch { }
});
return result;
}
/// <summary>
/// 消费消息
/// </summary>
/// <param name="exchange"></param>
/// <param name="queue"></param>
/// <param name="configure"></param>
public ListenResult Listen(string exchange, string queue, Action<ExchangeConsumeQueueOptions> configure)
{
ExchangeConsumeQueueOptions options = new ExchangeConsumeQueueOptions();
configure?.Invoke(options);
return Listen(exchange, queue, options);
}
#endregion
}
public class RecieveResult
{
CancellationTokenSource cancellationTokenSource;
public RecieveResult(BasicDeliverEventArgs arg, CancellationTokenSource cancellationTokenSource)
{
this.Body = Encoding.UTF8.GetString(arg.Body);
this.ConsumerTag = arg.ConsumerTag;
this.DeliveryTag = arg.DeliveryTag;
this.Exchange = arg.Exchange;
this.Redelivered = arg.Redelivered;
this.RoutingKey = arg.RoutingKey;
this.cancellationTokenSource = cancellationTokenSource;
}
/// <summary>
/// 消息体
/// </summary>
public string Body { get; private set; }
/// <summary>
/// 消费者标签
/// </summary>
public string ConsumerTag { get; private set; }
/// <summary>
/// Ack标签
/// </summary>
public ulong DeliveryTag { get; private set; }
/// <summary>
/// 交换机
/// </summary>
public string Exchange { get; private set; }
/// <summary>
/// 是否Ack
/// </summary>
public bool Redelivered { get; private set; }
/// <summary>
/// 路由
/// </summary>
public string RoutingKey { get; private set; }
public void Commit()
{
if (cancellationTokenSource == null || cancellationTokenSource.IsCancellationRequested) return;
cancellationTokenSource.Cancel();
cancellationTokenSource.Dispose();
cancellationTokenSource = null;
}
}
public class ListenResult
{
CancellationTokenSource cancellationTokenSource;
/// <summary>
/// CancellationToken
/// </summary>
public CancellationToken Token { get { return cancellationTokenSource.Token; } }
/// <summary>
/// 是否已停止
/// </summary>
public bool Stoped { get { return cancellationTokenSource.IsCancellationRequested; } }
public ListenResult()
{
cancellationTokenSource = new CancellationTokenSource();
}
/// <summary>
/// 停止监听
/// </summary>
public void Stop()
{
cancellationTokenSource.Cancel();
}
}
}
测试 Demo


using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
namespace RabbitMQ.ConsoleApp
{
class Program
{
static void Main(string[] args)
{
string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
int port = 5672;
string userName = "admin";
string password = "123456";
string virtualHost = "/";
string queue = "queue1";
var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
//消费者
new Thread(() =>
{
using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts))
{
consumer.UserName = userName;
consumer.Password = password;
consumer.Port = port;
consumer.VirtualHost = virtualHost;
consumer.Received += result =>
{
Console.WriteLine($"接收到数据:{result.Body}");
result.Commit();//提交
};
consumer.Listen(queue, options =>
{
options.AutoAck = false;
options.Arguments = arguments;
});
}
}).Start();
//消息生产
using (RabbitMQProducer producer = new RabbitMQProducer(hosts))
{
producer.UserName = userName;
producer.Password = password;
producer.Port = port;
producer.VirtualHost = virtualHost;
string message = "";
do
{
message = Console.ReadLine();
if (string.IsNullOrEmpty(message))
{
break;
}
producer.Publish(queue, message, options => { options.Arguments = arguments; });
} while (true);
}
}
}
}


using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
namespace RabbitMQ.ConsoleApp
{
class Program
{
static void Main(string[] args)
{
string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
int port = 5672;
string userName = "admin";
string password = "123456";
string virtualHost = "/";
string queue = "queue1";
var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
//消费者1
new Thread(() =>
{
using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts))
{
consumer.UserName = userName;
consumer.Password = password;
consumer.Port = port;
consumer.VirtualHost = virtualHost;
consumer.Received += result =>
{
Console.WriteLine($"消费者1接收到数据:{result.Body}");
result.Commit();//提交
};
consumer.Listen(queue, options =>
{
options.AutoAck = false;
options.Arguments = arguments;
options.FetchCount = 1;
});
}
}).Start();
//消费者2
new Thread(() =>
{
using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts))
{
consumer.UserName = userName;
consumer.Password = password;
consumer.Port = port;
consumer.VirtualHost = virtualHost;
consumer.Received += result =>
{
Console.WriteLine($"消费者2接收到数据:{result.Body}");
result.Commit();//提交
};
consumer.Listen(queue, options =>
{
options.AutoAck = false;
options.Arguments = arguments;
options.FetchCount = 2;
});
}
}).Start();
//消息生产
using (RabbitMQProducer producer = new RabbitMQProducer(hosts))
{
producer.UserName = userName;
producer.Password = password;
producer.Port = port;
producer.VirtualHost = virtualHost;
string message = "";
do
{
message = Console.ReadLine();
if (string.IsNullOrEmpty(message))
{
break;
}
producer.Publish(queue, message, options => { options.Arguments = arguments; });
} while (true);
}
}
}
}


using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
namespace RabbitMQ.ConsoleApp
{
class Program
{
static void Main(string[] args)
{
string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
int port = 5672;
string userName = "admin";
string password = "123456";
string virtualHost = "/";
string queue1 = "queue1";
string queue2 = "queue2";
string exchange = "demo.fanout";
string exchangeType = RabbitMQExchangeType.Fanout;
var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
//消费者1
new Thread(() =>
{
using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts))
{
consumer.UserName = userName;
consumer.Password = password;
consumer.Port = port;
consumer.VirtualHost = virtualHost;
consumer.Received += result =>
{
Console.WriteLine($"消费者1接收到数据:{result.Body}");
result.Commit();//提交
};
consumer.Listen(queue1, options =>
{
options.AutoAck = false;
options.Arguments = arguments;
});
}
}).Start();
//消费者2
new Thread(() =>
{
using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts))
{
consumer.UserName = userName;
consumer.Password = password;
consumer.Port = port;
consumer.VirtualHost = virtualHost;
consumer.Received += result =>
{
Console.WriteLine($"消费者2接收到数据:{result.Body}");
result.Commit();//提交
};
consumer.Listen(queue2, options =>
{
options.AutoAck = false;
options.Arguments = arguments;
});
}
}).Start();
//消息生产
using (RabbitMQProducer producer = new RabbitMQProducer(hosts))
{
producer.UserName = userName;
producer.Password = password;
producer.Port = port;
producer.VirtualHost = virtualHost;
string message = "";
do
{
message = Console.ReadLine();
if (string.IsNullOrEmpty(message))
{
break;
}
producer.Publish(exchange, "", message, options => { options.Type = exchangeType; });
} while (true);
}
}
}
}


using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
namespace RabbitMQ.ConsoleApp
{
class Program
{
static void Main(string[] args)
{
string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
int port = 5672;
string userName = "admin";
string password = "123456";
string virtualHost = "/";
string queue1 = "queue1";
string queue2 = "queue2";
string exchange = "demo.direct";
string exchangeType = RabbitMQExchangeType.Direct;
var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
//消费者1
new Thread(() =>
{
using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts))
{
consumer.UserName = userName;
consumer.Password = password;
consumer.Port = port;
consumer.VirtualHost = virtualHost;
consumer.Received += result =>
{
Console.WriteLine($"消费者1接收到数据:{result.Body}");
result.Commit();//提交
};
consumer.Listen(queue1, options =>
{
options.AutoAck = false;
options.Arguments = arguments;
});
}
}).Start();
//消费者2
new Thread(() =>
{
using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts))
{
consumer.UserName = userName;
consumer.Password = password;
consumer.Port = port;
consumer.VirtualHost = virtualHost;
consumer.Received += result =>
{
Console.WriteLine($"消费者2接收到数据:{result.Body}");
result.Commit();//提交
};
consumer.Listen(queue2, options =>
{
options.AutoAck = false;
options.Arguments = arguments;
});
}
}).Start();
//消息生产
using (RabbitMQProducer producer = new RabbitMQProducer(hosts))
{
producer.UserName = userName;
producer.Password = password;
producer.Port = port;
producer.VirtualHost = virtualHost;
string message = "";
int index = 1;
string[] routes = new string[] { "apple", "banana" };
do
{
message = Console.ReadLine();
if (string.IsNullOrEmpty(message))
{
break;
}
var route = routes[index++ % 2];
producer.Publish(exchange, route, message, options => { options.Type = exchangeType; });
} while (true);
}
}
}
}


using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
namespace RabbitMQ.ConsoleApp
{
class Program
{
static void Main(string[] args)
{
string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
int port = 5672;
string userName = "admin";
string password = "123456";
string virtualHost = "/";
string queue1 = "queue1";
string queue2 = "queue2";
string exchange = "demo.topic";
string exchangeType = RabbitMQExchangeType.Topic;
var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
//消费者1
new Thread(() =>
{
using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts))
{
consumer.UserName = userName;
consumer.Password = password;
consumer.Port = port;
consumer.VirtualHost = virtualHost;
consumer.Received += result =>
{
Console.WriteLine($"消费者1接收到数据:{result.Body}");
result.Commit();//提交
};
consumer.Listen(queue1, options =>
{
options.AutoAck = false;
options.Arguments = arguments;
});
}
}).Start();
//消费者2
new Thread(() =>
{
using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts))
{
consumer.UserName = userName;
consumer.Password = password;
consumer.Port = port;
consumer.VirtualHost = virtualHost;
consumer.Received += result =>
{
Console.WriteLine($"消费者2接收到数据:{result.Body}");
result.Commit();//提交
};
consumer.Listen(queue2, options =>
{
options.AutoAck = false;
options.Arguments = arguments;
});
}
}).Start();
//消息生产
using (RabbitMQProducer producer = new RabbitMQProducer(hosts))
{
producer.UserName = userName;
producer.Password = password;
producer.Port = port;
producer.VirtualHost = virtualHost;
string message = "";
int index = 1;
string[] routes = new string[] { "apple.", "banana." };
do
{
message = Console.ReadLine();
if (string.IsNullOrEmpty(message))
{
break;
}
var route = routes[index % 2] + index++;
producer.Publish(exchange, route, message, options => { options.Type = exchangeType; });
} while (true);
}
}
}
}
上面是我自己做的封装,因为 RabbitMQ.Client 功能齐全,但是使用比较麻烦,需要编写的代码多一些,推荐一下第三方对 rabbitmq 的封装插件:EasyNetQ,它是建立在 RabbitMQ.Client 上的,多数时候可以直接通过 EasyNetQ 就可以完成消息发布与消费,感兴趣的可以了解一下
docker 环境下使用 gitlab,gitlab-runner 为 NetCore 持续集成
环境
Centos7.6 安装应用 docker,docker-compose (我的 Centos 是用 Hyper-V 跑的分了 8G 的内存,阿里云 2G 根本跑不起来 gitlab)
为了保证我的 Centos 环境干净所以我的 gitlab 与 gitlab-runner 都是采用 docker 服务运行,包括后续的 runner 的工作形式(executor)也是选的 docker。
准备工作:
拉取镜像:这步骤耗时挺长的,耐心等待吧(如果这个镜像没有了,你可以去 hub.docker.com 搜一下对应的镜像)
docker pull gitlab/gitlab-ce:latest
docker pull gitlab/gitlab-runner:latest
docker pull docker:stable
docker pull mcr.microsoft.com/dotnet/core/sdk
创建 gitlab 与 gitlabruner 服务
新建文件:docker-compose.yml ,
在 Centos 服务器上创建 docker-compose.yml 文件并运行
docker-compose run -d
复制代码
gitlab:
image: ''gitlab/gitlab-ce:latest''
restart: always
hostname: ''192.168.2.2''
environment:
GITLAB_OMNIBUS_CONFIG: |
external_url ''http://hts92.wicp.vip:8989''# 这里需要更换成你的固定 ip 或局域网 IP 地址(我个人做法是用的动态域名。做的端口映射,如果你是内网做 demo 无所谓)
ports:
- ''8989:8989''
volumes:
- ''/srv/gitlab/config:/etc/gitlab''
- ''/srv/gitlab/logs:/var/log/gitlab''
- ''/srv/gitlab/data:/var/opt/gitlab''
gitlab-runner:
image: ''gitlab/gitlab-runner:latest''
container_name: ''gitlab-runner''
restart: ''always''
volumes:
- ''/srv/gitlab-runner/confg:/etc/gitlab-runner''
- ''/var/run/docker.sock:/var/run/docker.sock''
复制代码
到此 gitlab 与 gitlab-runner 已经搭建好了。(第一次登陆时需要你设置 root 用户密码这里我就不截图了,因为我已经设置完了。)
接下来进入 gitlab 新建个项目。项目名随意,
进入刚建好的项目
在开发机新建 webapi 项目:
dotnet new webapi -n user.api --no-https
添加镜像检测脚本
添加镜像检测删除脚本到项目根目录(后续 ci 构建脚本会用到,每次从新编译 docker file 时 会帮你删除掉之前的实例跟镜像):保存为 check-images.sh 放到项目根目录
if [ $(docker ps -a --format {{.Names}} | grep user-api) ]
then
docker rm -f user-api
docker rmi user-api
fi
创建 .gitlab-ci.yml 文件 放到项目根目录
复制代码
stages:
- build
- deploy
# 构建
build-job:
stage: build
only:
- master
cache:
untracked: true
script:
- dotnet restore
- dotnet publish -www.renheeyuLe.com o ./out -c Release
artifacts:
# 可以缓存在 gitlab 的流水线记录中,供直接下载
expire_in: 30 days
paths:
- out/
tags:
- 01-user-api-builder
# 发布正式
deploy-job:
stage: deploy
only:
- master
dependencies:
- build-job # 这里一定要依赖 build-job,不然 dockerfile 里面的 out 目录无法使用
script:
- ls out/
- docker ps
- sh ./check-images.sh
- docker build -t user-api .
# 这里可以添加将生成好的 image 上传到 dockerhub 或者 docker 本地仓库
### 如果生成的镜像需要统一上传到仓库管理,则后面的逻辑可以分离到另外一个 runner 去执行
# 这里可以添加从 dockerhub 或本地仓库拉取指定镜像
- docker run -d -www.chaoyul.com -name user-api http://kunlunyule.com/-p 8080:80 user-api
tags:
- 01-user-api-deploy
复制代码
创建 Dockerfile 文件
创建 Dockerfile 文件 放到项目根目录 (这里值得注意的是 mcr.microsoft.com/dotnet/core/sdk 镜像名,要跟我们准备环境时候的镜像名保持一致,要不然 build 时还需要在拉取 浪费时间,当然你可可以换成 runtime 环境的。好处就是编译镜像小,用我这个编译镜像大)
FROM mcr.microsoft.com/dotnet/core/sdk
WORKDIR /app
COPY out/ /app
ENTRYPOINT [ "dotnet", www.chengmingdl.com"/app/user.api.dll" ]
以上内容一同传至 gitlab 刚建好的项目
gitlab 项目目录结构如下
注册 runner,
找到 rnner 信息
注册第一个 runner
记得替换吊对应信息。(--url,--registration-toke)
复制代码
docker exec -it gitlab-runner gitlab-runner register -n \
--url http://hts92.wicp.vip:8989/ \
--registration-token QJiAZYz3KSJyhWfsHKhC \
--executor docker \
--tag-list "01-user-api-builder" \
--description "01-user-api-builder" \
--docker-image "mcr.microsoft.com/dotnet/core/sdk" \
复制代码
注册第二个 runner
(值得注意的是: --docker-volumes /var/run/docker.sock:/var/run/docker.sock,当时没有这句话 我的 docker 实例无法跟 docker 容器(docker run docker)本身通讯 。这个问题让我找了进一天的时间)
复制代码
docker exec -it gitlab-runner gitlab-runner register -n \
--url http://hts92.wicp.vip:8989/ \
--registration-token QJiAZYz3KSJyhWfsHKhC \
--executor docker \
--tag-list "01-user-api-deploy" \
--description "01-user-api-deploy" \
--docker-image "docker:stable" \
--docker-volumes /var/run/docker.sock:/var/run/docker.sock
复制代码
如下代表 runner 已经开始工作了并且执行成功。
查看镜像
复制代码
[root@localhost ~]# docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
user-api latest 62eafc3e4bf6 About a minute ago 1.74GB
mcr.microsoft.com/dotnet/core/sdk 20190726 3af77ac73731 2 days ago 1.74GB
mcr.microsoft.com/dotnet/core/sdk latest 3af77ac73731 2 days ago 1.74GB
gitlab/gitlab-runner-helper x86_64-d0b76032 f8d183475601 2 days ago 52.4MB
docker stable c4154a2b47a1 4 days ago 216MB
mysql/mysql-server latest 12a8d88596c0 4 days ago 294MB
gitlab/gitlab-runner latest 4142c6fc05d4 2 weeks ago 410MB
gitlab/gitlab-ce latest 15563c211d40 3 weeks ago 1.8GB
microsoft/mssql-server-linux latest 314918ddaedf 7 months ago 1.35GB
registry 2.3 83139345d017 3 years ago 166MB
[root@localhost ~]#
复制代码
查看容器
复制代码
2ced458eea91 user-api "dotnet /app/User.Ap…" 21 seconds ago Up 20 seconds 0.0.0.0:8080->80/tcp user-api
cfed5894c526 microsoft/mssql-server-linux "/opt/mssql/bin/sqls…" 3 minutes ago Up 3 minutes 0.0.0.0:1433->1433/tcp sqlserver
d713e32ee388 gitlab/gitlab-ce:latest "/assets/wrapper" 3 days ago Up 39 minutes (healthy) 22/tcp, 80/tcp, 443/tcp, 0.0.0.0:8989->8989/tcp gitlab_gitlab_1
e0cf226629d3 registry:2.3 "/bin/registry /etc/…" 3 days ago Up 39 minutes 0.0.0.0:5000->5000/tcp gitlab_registry_1
eab855f64938 gitlab/gitlab-runner:latest "/usr/bin/dumb-init …" 3 days ago Up 39 minutes gitlab-runner
复制代码
以上容器已经运行成功
测试
(我的 Centos 虚拟机地址 192.168.2.2)
总结:
看着几行代码搞定,但是由于第一次做也耗时将近两天,随后在做就简单多了。整理出以上内容给大家分享。 以下为参考文章。有问题留言。
参考文章
我们今天的关于Swift 3.0 【Swift 3.0 环境下使用 AFNetworking 封装网络请求业务类】和swift framework的分享就到这里,谢谢您的阅读,如果想了解更多关于AFNetworking-不缓存响应、Android 环境下使用 call_usermodehelper () 以及调试、C# .net 环境下使用 rabbitmq 消息队列、docker 环境下使用 gitlab,gitlab-runner 为 NetCore 持续集成的相关信息,可以在本站进行搜索。
本文标签: