GVKun编程网logo

(转)ZeroMQ 试用笔记之 REQ & ROUTER(zeromq req rep)

4

这篇文章主要围绕和转ZeroMQ试用笔记之REQ&ROUTER展开,旨在为您提供一份详细的参考资料。我们将全面介绍的优缺点,解答转ZeroMQ试用笔记之REQ&ROUTER的相关问题,同时也会为您带来

这篇文章主要围绕转ZeroMQ 试用笔记之 REQ & ROUTER展开,旨在为您提供一份详细的参考资料。我们将全面介绍的优缺点,解答转ZeroMQ 试用笔记之 REQ & ROUTER的相关问题,同时也会为您带来(转) ZeroMQ 的模式 - Requset-Reply、(转) 小心使用 zeromq、./node_modules/react-router-dom/react-router-dom.js尝试导入错误:未从“ react-router”导出“ Navigate”、Multer 不使用 next-connect 返回 req.body 和 req.file的实用方法。

本文目录一览:

(转)ZeroMQ 试用笔记之 REQ & ROUTER(zeromq req rep)

(转)ZeroMQ 试用笔记之 REQ & ROUTER(zeromq req rep)

zeromq 的尝鲜笔记之一。内容包含 ROUTER socket 的理解介绍,一个小代码片段,以及 czmq 中处理消息帧的 api 的用法。

试用说白了就是用 zeromq 写套小东西。期间必然会遇到问题,笔记的目的无非就是记录问题,加深理解。而实际上我在记录的过程中也不断地修正了一些起初想当然的错误的理解。虽说已极力避免,但由于都是自己一人的理解,错误在所难免,希望有兴趣看的同学帮我指出。

背景

目标是依赖 Zeromq 实现一个支持并发的 server 端,接收多个 client 的请求,略作处理后返回响应。

client 端是 windows 上的 java 程序。server 端则是 CentOS 下的 cpp 程序。

实现

若直接用 REQ-REP 模式的话,需要注意到:REP 端必须严格遵循 recv,send,recv,send…. 的步骤,倘若 REP 端在 recv 后 需要一定时间的处理之后才能 send,那么接下来的下一个 REQ 就得被迫等着了。所以从外部看,我们的 REP 的并发只有 1。就跟下面这张图一样,步骤 4 的 消息必须等到步骤 3 之后才能被接收。

这可能是有些场景下必须的,但不是我想要的。因为我们需要并发。说白了就是我们要在步骤 2 的执行期间,把步骤 4 甚至接下来的 5、6 都做了。

ROUTER 正是基于这样的目的才被引入的。我们之前也有过介绍,但那个介绍在我看来更像是个 guide 翻译。这里再加上自己的理解细说一下。

REP 之所以要按部就班,因为它如果不按部就班,就不知道把响应发回给哪里,所以它必须要同步地,先 recv 再 send。

我们再来看 ROUTER。它之所以可以不按部就班,是因为它收到 REQ 的消息时,在消息头上加入来源地址,然后再交给客户端。发送时,取出消息第一帧作为目标地址,将空帧之后的帧进行发送。

举例来说,app1 通过 REQ 发送给通过 ROUTER 接收的 app2。若 app1 发送的是

1 ["hello"]

,经由 ROUTER 的处理,app2 应用层得到的消息将是

1 [app1''s address|empty|"hello"]

对于 app2,不能只关心业务数据”hello”,还需要将 app1′s address 缓存下来,用以响应的回复。比如,若 app2 要回复”world”,需要手动构造一个有三个 frame 的消息:

1 [app1''s address|empty|"world"]

再把这个消息交给 ROUTER socket 进行 send,这时 ROUTER 会将第一帧 address 取出作为目标地址 —— 也就是的 REQ 端 ——,再将空帧之后的数据发出。所以最终 REQ 端收到响应为:

1 ["world"]

这些步骤看起来复杂,看看 C 代码怎么写。zeromq 给我们提供了包装库 czmq,利用它我们可以很方便地做到上述的过程。
这是代码片段。

1 void*receiver = zsocket_new(ctx, ZMQ_ROUTER);
2 zsocket_bind(receiver,"ipc:///tmp/0");
3  
4 ....
5  
6 //recv message
7 zmsg_t *msg = zmsg_recv(receiver);
8 if(!msg){
9     //error handle..
10     return;
11 }
12 zframe_t *address = zmsg_unwrap (msg);
13 zframe_t *frame = zmsg_first (msg);
14 zmsg_destroy (&msg);
15  
16 //do something, it make time some times...
17  
18 //make response message
19 zmsg_t *message = zmsg_new();
20 zframe_t* body = zframe_new("world",sizeof("world"));
21  
22 zmsg_push (message, body);
23 zmsg_wrap (message, address);
24  
25 zmsg_send (&message, receiver);
26 zmsg_destroy(&message);

需要说明的是 czmq 里的几个函数,万万不可用错。虽然有注释,但要用对这些 api 首先得搞清楚里面提到的 first,front 在 message 里是怎么个位置,为此我画了个图:

1 //  Pop frame off front of message, caller now owns frame
2 //  If next frame is empty, pops and destroys that empty frame.
3 zframe_t *
4     zmsg_unwrap (zmsg_t *self);
1 //  Return first frame in message, or null
2 zframe_t *
3     zmsg_first (zmsg_t *self);
1 //  Push frame to front of message, before first frame
2 void
3     zmsg_push (zmsg_t *self, zframe_t *frame);
1 //  Push frame to front of message, before first frame
2 //  Pushes an empty frame in front of frame
3 void
4     zmsg_wrap (zmsg_t *self, zframe_t *frame);

所以回头看代码片段,我们在构造响应消息时先 push 了一个填充着 "world" 的帧。再 zmsg_wrap 了一个填充着 address 的帧。
zmsg_wrap 做了两个事情:1)在 "world" 之前放上 address 的帧; 2)在这个帧 front 放个空帧。

(转) ZeroMQ 的模式 - Requset-Reply

(转) ZeroMQ 的模式 - Requset-Reply

我们先来看看第一种模式:Request-Reply Pattern。 请求应答模式。

Request-Reply 这个名字很直白,口语点说就是一问一答。可以使同步的遵循请求序的一问一答,也可以 是异步的不按请求序的一问一答;其中也可以包含各种不同的路由策略 —— 让谁来回答。zeromq 定义的为这个模式服务的 socket 有:ZMQ_REQ, ZMQ_REP, ZMQ_ROUTER 以及 ZMQ_DEALER. 用他们进行合理的组合,就可以实现现实世界中各种不同的请求应答模式。

分别来看:

ZMQ_REQ

ZMQ_REQ 做的事情就是发问,然后收答。发、收必须是严格按序进行。请求时对对端进行 Round Robin,遇到异常则阻塞。官方对这个 socket 的总结如下:

Summary of ZMQ_REQ characteristics
Compatible peer sockets ZMQ_REP
Direction Bidirectional
Send/receive pattern Send, Receive, Send, Receive, …
Outgoing routing strategy Round-robin
Incoming routing strategy Last peer
ZMQ_HWM option action Block

ZMQ_REP

ZMQ_REP 做的事情是收问题然后回答。收、发严格按序调用。对收到的问题公平排队,逐一作答。回答发出时遇到异常则直接丢弃,不会阻塞。

Summary of ZMQ_REP characteristics
Compatible peer sockets ZMQ_REQ
Direction Bidirectional
Send/receive pattern Receive, Send, Receive, Send, …
Incoming routing strategy Fair-queued
Outgoing routing strategy Last peer
ZMQ_HWM option action Drop

可以发现,上述两种 socket 是纯同步的,连用在它们身上的 api 的调用顺序都有严格定义。而且没有办法动态决定请求的去向。如果要实现更复杂的请求应答模式,就要借助于下面两种 socket 了。

ZMQ_DEALER

其实 ZMQ_DEALER 也是同步的,ZMQ_DEALER 也叫作 ZMQ_XREQ,概念上是 request/reply socket 的扩展 (实现上刚好相反哦)。从名字上可知它是一个代理层。它对收到的消息公平排队,并以 RR 方式发送消息,在遇到异常时发送阻塞。它的主要 作用是将 come in 的请求 load balance 地发送给到对端们。

Summary of ZMQ_DEALER characteristics
Compatible peer sockets ZMQ_ROUTERZMQ_REQZMQ_REP
Direction Bidirectional
Send/receive pattern Unrestricted
Outgoing routing strategy Round-robin
Incoming routing strategy Fair-queued
ZMQ_HWM option action Block

ZMQ_ROUTER

ZMQ_ROUTER 是真正的异步。ZMQ_ROUTER socket 收到消息时会在消息栈上加一层包含消息来源地址的消息;发送消息时,会将这一层消息取出,将其作为发送的目的地。如果发送时遇到异常,则丢弃 消息。ZMQ_ROUTER 通过这种方式做到了不需要保存任何状态便可异步地转发消息,而这一切应用层是看不到的。

Summary of ZMQ_ROUTER characteristics
Compatible peer sockets ZMQ_DEALERZMQ_REQZMQ_REP
Direction Bidirectional
Send/receive pattern Unrestricted
Outgoing routing strategy See text
Incoming routing strategy Fair-queued
ZMQ_HWM option action Drop

总结

归纳总结一下,0mq 的 Request-Reply 模式下有四种 socket 类型:

  • DEALER: 给连接的对端 RR 地分发消息,对收到的消息公平排队。
  • REQ:在应用层外发的消息上加一层空消息再发送;在收到消息后去掉分隔空消息再返回应用层。它实质上是在 DEALER 上构建的,只是在此基础上强制加入了发、收循环。
  • ROUTER:针对每一个收到的消息:加一层来源地址段,然后再交给应用层;针对每一个要发出的消息:去掉最上层的地址段,并以该地址段为目的地进行发送。
  • REP:对收到的消息:储存所有的消息内容直到第一个分隔空消息段,把剩余的消息体传给应用层;对发出的消息:把之前储存的消息体加回来,并像 ROUTER 一样发送出去。它实质上实在 ROUTER 上构建的,只是在此基础上强制加入了收、发循环。

有了这几种 socket,便可以组合成各种 Request-Reply 模式了:

[REQ] <--> [REP] [REQ] <--> [ROUTER--DEALER] <--> [REP] [REQ] <--> [ROUTER--DEALER] <--> [ROUTER--DEALER] <--> [REP]...

举个例子,N 个 client 要给 M 个 worker 提交任务,worker 完成任务后返回给 client;worker 需要平衡负载以避免某一 worker 过于劳累,client 发出的请求也有先后之说,不能让后发的请求先于之前的被交给 worker。这番描述的 socket 组合便可如下图 (from the guide) 方式构建:

(转) 小心使用 zeromq

(转) 小心使用 zeromq

1. 关于介绍 zeromq 的就不说了,可以自己去看官方 guide 很详细
2. 主要说下在使用过程中需要注意的地方
1)使用如果使用 c++ 的接口的时候,在你自己的类中或者 apache 模块中
需要将 zmq::context_t 对象定义在 zmq::socket_t 对象的前面,这样可以保证销毁的顺序

2)使用 sub-pub 时候,如果 sub 没有调用 setsockopt 设置过滤项 (设置 NULL 则接受所有),那么将会接收不到任何的消息,默认会阻塞所有消息

3)如果是后台服务集群使用且 zmq 需要 bind 端口的,那么最好在 iptables 中进行下过滤,不要让外部链接
连接过来,否则会造成 cpu 空转;关于 cpu 空转将会在最后说

4)客户端连接服务器时,每个客户端中的 zmq,每个连接使用一个 socket,虽然可以一个 socket 可以调用 connect 多次,也就是连接多个 server。但是,记住,最好是一个 socket 只调用一次 connect

5)使用 zmq 的程序,运行一段时间后 (可能几天),如果你碰到异常退出,并且你使用 nohup 运行的 log
nohup.out 中显示 zeromq connection timeout,那么你可以 zeromq/src/tcp_socket.cpp 文件中的 203 行看是 write 的时候出错。
[cpp]   view plain copy
  1. int zmq::tcp_socket_t::write (const void *data, int size)  
  2. {  
  3.     ssize_t nbytes = send (s, data, size, 0);  
  4.   
  5.     //  Several errors are OK. When speculative write is being done we may not  
  6.     //  be able to write a single byte to the socket. Also, SIGSTOP issued  
  7.     //  by a debugging tool can result in EINTR error.  
  8.     if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK ||  
  9.           errno == EINTR))  
  10.         return 0;  
  11.   
  12.     //  Signalise peer failure.  
  13.     if (nbytes == -1 && (errno == ECONNRESET || errno == EPIPE))  
  14.         return -1;  
  15.   
  16.     errno_assert (nbytes != -1);  
  17.     return (size_t) nbytes;  
  18. }  


这是由于 errno_assert (nbytes != -1); 造成的,但是,真正的原因是 nbytes==-1 时,errno==ETIMEDOUT ,在前面一行中没有去添加对这个 timeout 的判断,另外,可以参考该文件中对 win 上的处理。因此,这里如果你碰到了 timeout 错误,那么请将 errno == ETIMEOUT 加入到 errno==EPIPE 后面,然后重新编译

6)关于 cpu 空转。首先你可以用 top 程序名,去看你的程序的 cpu 使用率是否是 100%,而且基本固定不变,那么这时候,你就需要怀疑是否你的程序的问题,然后通过 lsof -p 程序的进程号,去查看是否有文件描述符有 can''t identify protocol, 还要注意下,是否有外网的 ip 连接到你的 zmq 的监听的端口。
然后,你可以在 zeromq/src/epoll.cpp 中的 loop,rm_fd,add_fd 函数中打 log,看看在监听到 EPOLLERR|EPOLLHUP
的文件描述符是不是 lsof -p 查看到的 can''t identify protocol 的文件描述符一致。其实,只要做到前面 5 点,那么 cpu
空转基本是不会出现的。

ps: 最后,希望碰到问题,大家能自己思考为什么会出现这样的问题,该如何解决,对自己的提升会有很大的帮助

./node_modules/react-router-dom/react-router-dom.js尝试导入错误:未从“ react-router”导出“ Navigate”

如何解决./node_modules/react-router-dom/react-router-dom.js尝试导入错误:未从“ react-router”导出“ Navigate”

./node_modules/react-router-dom/react-router-dom.js Attempted import error: ''Navigate'' is not exported from ''react-router''.

react-router-dom的版本是6.0.0-alpha.2,而react-router是5.2.0。 两者均已正确安装。我不确定如何解决此错误。有人可以给我任何可能的解决方法吗?

我的代码中甚至没有<Navigate to=?>行。

解决方法

为什么只安装两个都需要,这可以工作

  1. 执行npm删除react-router
  2. 删除node_modules
  3. 纱线安装或npm安装和
  4. 启动纱线或启动npm

Multer 不使用 next-connect 返回 req.body 和 req.file

Multer 不使用 next-connect 返回 req.body 和 req.file

如何解决Multer 不使用 next-connect 返回 req.body 和 req.file

我正在使用 nextjs 并且我想上传一个文件,所以我使用了 next-connect 以使用 multer

import nc from "next-connect";
import multer from "multer";

export const config = {
    api: {
      bodyParser: false,},}

const upload = multer({ dest: `${__dirname}../../public` });

const handler = nc()
  .use(upload.single(''image''))
  .post(async (req,res)=>{
    console.log(req.body); // undefined
    console.log(req.file); // undefined
    res.status(200).send("yo");
  })


export default handler;

这是客户端代码:

function handleOnSubmit(e){
        e.preventDefault();

        const data = {};

        var formData = new FormData(e.target);
        for (const [name,value] of formData) {
            data[name] = value;
        }
        data.type = data.type[0].toupperCase();
        data.name = data.name[0].toupperCase();

        axios({
            method: "POST",url:"/api/manager",data,config: {
                headers: {
                    ''content-type'': ''multipart/form-data''
                }
            }
        })
        .then(res=>{
            console.log(res);
        })
        .catch(err=>{
            throw err
        });
    }
...
return(
   ...
   <Form onSubmit={(e)=>handleOnSubmit(e)}>
   ...
   </Form>
)

我进行了搜索,发现的所有内容都与 nodejs 和 expressjs 有关,但在 next.js 上没有任何内容。我不知道如何继续。

解决方法

当我切换到 Next.js 框架时,我首先尝试使用 ''multer'' 库上传一个文件,并倾向于放弃并使用 ''foridable''。原因是 Nextjs 和 multer 有一些可用的资源。

如果我没记错的话,multer 应该被添加为中间件,所以这意味着重写 server.js 页面。 您可以查看以下主题:

  • 使用 Nextjs 创建自定义服务器: https://nextjs.org/docs/advanced-features/custom-server
  • 并添加中间件:https://nextjs.org/docs/api-routes/api-middlewares#connectexpress-middleware-support

如果你不想处理这个问题,你可以查看一个关于使用这个强大的资源库的简单要点:https://gist.github.com/agmm/da47a027f3d73870020a5102388dd820

这是我创建的文件上传脚本:https://github.com/fatiiates/rest-w-nextjs/blob/main/src/assets/lib/user/file/upload.ts

,

遇到了同样的问题。不知道它是否与您的相同,但如果它可能对某人有所帮助.. 当我将 multer 转移到使用内存存储而不是桌面存储时,它工作正常.. 在我的情况下,我在 multer 之后使用另一个中间件上传到 aws s3 所以我不需要将文件永久存储在 ./public .. 我只需要下一个中间件中可用的 req.file 。 在你的情况下尝试改变

const upload = multer({ dest: `${__dirname}../../public` });

const storage = multer.memoryStorage()
const upload = multer({storage: storage});

今天关于转ZeroMQ 试用笔记之 REQ & ROUTER的讲解已经结束,谢谢您的阅读,如果想了解更多关于(转) ZeroMQ 的模式 - Requset-Reply、(转) 小心使用 zeromq、./node_modules/react-router-dom/react-router-dom.js尝试导入错误:未从“ react-router”导出“ Navigate”、Multer 不使用 next-connect 返回 req.body 和 req.file的相关知识,请在本站搜索。

本文标签: