GVKun编程网logo

redis订阅者与发布者(redis订阅和发布)

9

关于redis订阅者与发布者和redis订阅和发布的问题就给大家分享到这里,感谢你花时间阅读本站内容,更多关于Flask-SocketIORedis订阅、ios–在没有订阅者时停止发布,在有订阅者时自

关于redis订阅者与发布者redis订阅和发布的问题就给大家分享到这里,感谢你花时间阅读本站内容,更多关于Flask-SocketIO Redis订阅、ios – 在没有订阅者时停止发布,在有订阅者时自动启动、js发布者订阅者模式使用详解、JS模式之简单的订阅者和发布者模式完整实例等相关知识的信息别忘了在本站进行查找喔。

本文目录一览:

redis订阅者与发布者(redis订阅和发布)

redis订阅者与发布者(redis订阅和发布)

#conding=utf-8

#一、创建redis类 文件名 RedisHelper
import redis

# conn=redis.Redis(host=''127.0.0.1'')
# import redis


class RedisHelper:
def __init__(self):
# self.__conn = redis.Redis(host=''47.94.18.xxx'')
self.__conn = redis.Redis(host=''127.0.0.1'')
self.chan_sub = ''104.5'' # 接收频道
self.chan_pub = ''104.5'' # 发送频道
#发布者
def public(self, msg):
self.__conn.publish(self.chan_pub, msg)
return True

#订阅者
def subscribe(self):
pub = self.__conn.pubsub()#将频道调到所需的频道
pub.subscribe(self.chan_sub) #开始监听
pub.parse_response() #测试是否开始监听
return pub
#二、Public
#conding=utf-8
from RedisHelper import RedisHelper
obj = RedisHelper()
obj.public(''Hello I am public'')

#三、subscribe
#conding=utf-8

from RedisHelper import RedisHelper
obj = RedisHelper()

redis_sub = obj.subscribe()

while True:
msg = redis_sub.parse_response()
print(msg)





Flask-SocketIO Redis订阅

Flask-SocketIO Redis订阅

如何解决Flask-SocketIO Redis订阅?

我解决了将应用程序作为参数传递给类并按照错误描述的建议使用它的上下文,但是名称空间也是必需的:

class Listener(threading.Thread):
    def __init__(self, r, channels, app):
    threading.Thread.__init__(self)
    self.daemon = True
    self.redis = r
    self.pubsub = self.redis.pubsub()
    self.pubsub.psubscribe(channels)
    self.app = app

    def work(self, item):
        with app.app_context():
            if isinstance(item[''data''], bytes):
                try:
                    msg = item[''data''].decode(''utf-8'')
                    decode_msg = json.loads(msg)                
                    if decode_msg[''type''] == ''UPDATE_TASK'':
                        send(json.dumps({"type":"UPDATE_TASK"}), room=''home'', namespace=''/'')
                    #_send_task_message()
                except ValueError as e:
                    log.error("Error decoding msg to microservice: %s", str(e))

    def run(self):
        for item in self.pubsub.listen():
            self.work(item)

if __name__ == ''__main__'':

    r = redis.Redis()
    client = Listener(r, [''/bobguarana/socketio''], app)
    client.start()

    socketio.run(debug=True, app=app, port=8080)

解决方法

我正在使用https://github.com/miguelgrinberg/Flask-
SocketIO来实现WebSocket服务器。

我需要从另一个进程(仅订阅)接收消息,并为特定房间中的客户端发出消息。

但是,当我尝试发送消息时,出现此错误:

无法将消息发送到家庭会议室:在请求上下文之外工作。

这是我的代码:

from flask import Flask,request
from flask_socketio import SocketIO,join_room,leave_room,send,rooms
import json
import eventlet
import logging
import redis
import threading

FORMAT = ''%(asctime)-15s - %(message)s''
logging.basicConfig(format=FORMAT)
log = logging.getLogger(__name__)

app = Flask(__name__)
app.config[''SECRET_KEY''] = ''secret!''
socketio = SocketIO(app,async_mode=''eventlet'')

.
.
.

def _send_task_message():
    try:
        send(json.dumps({"type":"UPDATE_TASK"}),room=''home'')
    except Exception as e:
        log.error(''Could not send message to home room: %s'' % str(e))

class Listener(threading.Thread):
    def __init__(self,r,channels):
        threading.Thread.__init__(self)
        self.daemon = True
        self.redis = r
        self.pubsub = self.redis.pubsub()
        self.pubsub.psubscribe(channels)

    def work(self,item):
        if isinstance(item[''data''],bytes):
            try:
                msg = item[''data''].decode(''utf-8'')
                decode_msg = json.loads(msg)                
                if decode_msg[''type''] == ''UPDATE_TASK'':
                    _send_task_message()
            except ValueError as e:
                log.error("Error decoding msg to microservice: %s",str(e))

    def run(self):
        for item in self.pubsub.listen():
            self.work(item)


if __name__ == ''__main__'':

    r = redis.Redis()
    client = Listener(r,[''/bobguarana/socketio''])
    client.start()

    socketio.run(debug=True,app=app,port=8080)

ios – 在没有订阅者时停止发布,在有订阅者时自动启动

ios – 在没有订阅者时停止发布,在有订阅者时自动启动

我如何实现一个RACSignal,它会在没有订阅者的情况下停止发布,并在有订阅者时自动启动?

这是一个场景:

我们假设我在AppDelegate中有一个currentLocationSignal.
我的LocationViewController将在视图加载时订阅currentLocationSignal,并在视图卸载时取消订阅(dispose).由于获取当前位置需要几秒钟,我想在应用程序打开时始终订阅currentLocationSignal(并在几秒后自动取消订阅),所以当我到达LocationViewController时,我会得到一个准确的位置.
因此,信号可能有多个订户.当第一个订阅者侦听时,它需要开始调用startUpdatingLocation,当没有订阅者时,它需要调用stopUpdatingLocation.

解决方法

好问题!通常,您会将 RACMulticastConnection用于此类用例,但是,由于您希望信号能够在以后重新激活,因此连接本身并不合适.

最简单的答案可能是模仿连接的工作方式,但可以模拟您想要的特定行为.基本上,我们将跟踪在任何给定时间有多少subscribers,并根据该数字开始/停止更新位置.

我们首先添加一个locationSubject属性.主题需要是RACReplaySubject,因为我们总是希望新订阅者立即获得最近发送的位置.使用该主题实现更新很容易:

- (void)locationManager:(CLLocationManager *)manager didUpdateLocations:(NSArray *)locations {
    [self.locationSubject sendNext:locations.lastObject];
}

然后,我们希望实现跟踪和递增/递减订户计数的信号.这通过使用numberOfLocationSubscribers整数属性来工作:

- (RACSignal *)currentLocationSignal {
    return [RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        @synchronized (self) {
            if (self.numberOfLocationSubscribers == 0) {
                [self.locationManager startUpdatingLocation];
            }

            ++self.numberOfLocationSubscribers;
        }

        [self.locationSubject subscribe:subscriber];

        return [RACdisposable disposableWithBlock:^{
            @synchronized (self) {
                --self.numberOfLocationSubscribers;
                if (self.numberOfLocationSubscribers == 0) {
                    [self.locationManager stopUpdatingLocation];
                }
            }
        }];
    }];
}

在上面的代码中,每次将新订户添加到返回的信号时,都会调用createSignal:block.发生这种情况时:

>我们检查订户数目前是否为零.如果是这样,刚刚添加的订户是第一个,因此我们需要启用(或重新启用)位置更新.
>我们将订户直接挂到我们的locationSubject,因此后者的值会自动输入前者.
>然后,在将来的某个时间,当订阅是disposed of时,我们会减少计数并在适当时停止位置更新.

现在,剩下的就是在启动时订阅currentLocationSignal,并在几秒后自动取消订阅:

- (BOOL)application:(UIApplication *)application didFinishLaunchingWithOptions:(NSDictionary *)launchOptions {
    // Use a capacity of 1 because we only ever care about the latest
    // location.
    self.locationSubject = [RACReplaySubject replaySubjectWithCapacity:1];

    [[self.currentLocationSignal
        takeuntil:[RACSignal interval:3]]
        subscribeCompleted:^{
            // We don't actually need to do anything here,but we need
            // a subscription to keep the location updating going for the
            // time specified.
        }];

    return YES;
}

这会立即订阅self.currentLocationSignal,然后在interval:signal发送其第一个值时自动处理该订阅.

有趣的是,– [RACMulticastConnection autoconnect]曾经表现得像上面的-currentLocationSignal,但that behavior was changed因为它使副作用变得非常不可预测.这个用例应该是安全的,但是当自动重新连接可怕时,还有其他时候(比如在发出网络请求或运行shell命令时).

js发布者订阅者模式使用详解

js发布者订阅者模式使用详解

这次给大家带来js发布者订阅者模式使用详解,js发布者订阅者模式使用的注意事项有哪些,下面就是实战案例,一起来看一下。

发布者订阅者模式,是一种很常见的模式,比如:

一、买卖房子

生活中的买房,卖房,中介就构成了一个发布订阅者模式,买房的人,一般需要的是房源,价格,使用面积等信息,他充当了订阅者的角色

中介拿到卖主的房源信息,根据手头上掌握的客户联系信息(买房的人的手机号),通知买房的人,他充当了发布者的角色

卖主想卖掉自己的房子,就需要告诉中介,把信息交给中介发布

二,网站订阅信息的用户

订阅者角色:需要订阅某类信息的网民,如某个网站的javascript类型文章

发布者角色:邮箱服务器,根据网站收集到的用户订阅邮箱,通知用户.

网站主想把信息告诉订阅者,需要把文章相关内容告诉邮箱服务器去发送

等等非常多的例子,不一一列举

本文用网站订阅的方式,推导发布者-订阅者框架,然后用发布者-订阅者框架来重构一个简单的购物车

var Site = {};
    Site.userList = [];
    Site.subscribe = function( fn ){
      this.userList.push( fn );
    }
    Site.publish = function(){
      for( var i = 0, len = this.userList.length; i <p>
	Site.userList就是用来保存订阅者</p><p>
	Site.subscribe就是具体的订阅者,把每一个订阅者订阅的具体信息保存在Site.userList</p><p>
	Site.publish就是发布者:根据保存的userList,一个个遍历(通知),执行里面的业务逻辑</p><p>
	但是这个,发布订阅者模式,有个问题,不能订阅想要的类型,上例我加了2个订阅者(第11行,第14行),只要网站发了信息,全部能收到,但是有些用户可能只想收到javascript或者html5的,所以,接下来,我们需要继续完善,希望能够接收到具体的信息,不是某人订阅的类型,就不接收</p><pre>var Site = {};
    Site.userList = {};
    Site.subscribe = function (key, fn) {
      if (!this.userList[key]) {
        this.userList[key] = [];
      }
      this.userList[key].push(fn);
    }
    Site.publish = function () {
      var key = Array.prototype.shift.apply(arguments),
        fns = this.userList[key];
      if ( !fns || fns.length === 0) {
        console.log( ''没有人订阅'' + key + "这个分类的文章" );
        return false;
      }
      for (var i = 0, len = fns.length; i <p>
	输出结果:</p><p>
	[js高手之路]寄生组合式继承的优势</p><p>
	[js高手之路]es6系列教程 - var, let, const详解</p><p>
	没有人订阅html5这个分类的文章</p><p>
	我们可以看到,只有订阅了javascript类型文章的人,才能收到 ”寄生组合式继承的优势” 这篇文章,发布html5类型的时候,没有任何人会收到.</p><p>
	es6类型的,只有订阅es6的人,才能收到</p><p>
	我们已经有了一个基本的发布订阅者框架,接下来,把他完善成一个框架,便于其他功能或者其他网站系统的相同功能可以重用他</p><pre>var Event = {
      userList : {},
      subscribe : function (key, fn) {
        if (!this.userList[key]) {
          this.userList[key] = [];
        }
        this.userList[key].push(fn);
      },
      publish : function () {
        var key = Array.prototype.shift.apply(arguments),
          fns = this.userList[key];
        if (!fns || fns.length === 0) {
          console.log(''没有人订阅'' + key + "这个分类的文章");
          return false;
        }
        for (var i = 0, len = fns.length; i <p>
	然后,我们来重构一个购物车实例,没有重构之前,我的购物车用的是面向过程:</p><pre>nbsp;html&gt;


  <meta>
  <title>Title</title>
  <script></script><p>
  </p>
登录后复制
        
  •              0              单价:       15元;       小计:       0元     
  •     
  •              0              单价:       10元;       小计:       0元     
  •     
  •              0              单价:       5元;       小计:       0元     
  •     
  •              0              单价:       2元;       小计:       0元     
  •     
  •              0              单价:       1元;       小计:       0元     
  •   
  

    商品一共     0     件;     一共花费     0     元;     其中最贵的商品单价是0元   

cart.js文件:

function getByClass(cName, obj) {
  var o = null;
  if (arguments.length == 2) {
    o = obj;
  } else {
    o = document;
  }
  var allNode = o.getElementsByTagName("*");
  var aNode = [];
  for( var i = 0 ; i <p>
	耦合度太高,可维护性很差.</p><p>
	重构之后的购物车:</p><pre>window.onload = function () {
  var Event = {
    userList: {},
    subscribe: function (key, fn) {
      if (!this.userList[key]) {
        this.userList[key] = [];
      }
      this.userList[key].push(fn);
    },
    publish: function () {
      var key = Array.prototype.shift.apply(arguments),
        fns = this.userList[key];
      if (!fns || fns.length === 0) {
        return false;
      }
      for (var i = 0, len = fns.length; i input:first-child"),
      aBtnPlus = document.querySelectorAll( "#box li&gt;input:nth-of-type(2)"),
      curNum = 0, curUnitPrice = 0;
    for( var i = 0, len = aBtnMinus.length; i  0) &amp;&amp; Event.publish( "total-goods-num-minus" );
        --this.parentNode.children[1].innerHTML = 0) &amp;&amp; Event.publish( "total-goods-num-plus" );
        this.parentNode.children[1].innerHTML++;
        curUnitPrice = this.parentNode.children[4].innerHTML;
        Event.publish( "plus-num" + this.index, 
          parseInt( curUnitPrice ),
          parseInt( this.parentNode.children[1].innerHTML )
        );
      }
    }
  })();
  (function(){
    var aSubtotal = document.querySelectorAll("#box .subtotal"),
      oGoodsNum = document.querySelector("#goods-num"),
      oTotalPrice = document.querySelector("#total-price");
      Event.subscribe( ''total-goods-num-plus'', function(){
        ++oGoodsNum.innerHTML;
      });
      Event.subscribe( ''total-goods-num-minus'', function(){
        --oGoodsNum.innerHTML;
      });
    for( let i = 0, len = aSubtotal.length; i <p>相信看了本文案例你已经掌握了方法,更多精彩请关注php中文网其它相关文章!</p><p>推荐阅读:</p><p><a href="http://www.php.cn/js-tutorial-393280.html" target="_blank">es6解构有哪些方法</a><br></p><p><a href="http://www.php.cn/js-tutorial-393281.html" target="_blank">Node.js文件系统操作</a><br></p><!--content end-->
登录后复制

以上就是js发布者订阅者模式使用详解的详细内容,更多请关注php中文网其它相关文章!

JS模式之简单的订阅者和发布者模式完整实例

JS模式之简单的订阅者和发布者模式完整实例

本文实例讲述了JS模式之简单的订阅者和发布者模式。分享给大家供大家参考。具体如下:

rush:js;"> parten

希望本文所述对大家的javascript程序设计有所帮助。

关于redis订阅者与发布者redis订阅和发布的介绍现已完结,谢谢您的耐心阅读,如果想了解更多关于Flask-SocketIO Redis订阅、ios – 在没有订阅者时停止发布,在有订阅者时自动启动、js发布者订阅者模式使用详解、JS模式之简单的订阅者和发布者模式完整实例的相关知识,请在本站寻找。

本文标签:

上一篇node---路由写登录注册(node的路由)

下一篇如何在python中读取管道分隔文件中的所有行?(如何在python中读取管道分隔文件中的所有行数)