GVKun编程网logo

docker快速安装启动Rocketmq消息队列(docker部署rocketmq)

12

最近很多小伙伴都在问docker快速安装启动Rocketmq消息队列和docker部署rocketmq这两个问题,那么本篇文章就来给大家详细解答一下,同时本文还将给你拓展Docker初探之运行Rabb

最近很多小伙伴都在问docker快速安装启动Rocketmq消息队列docker部署rocketmq这两个问题,那么本篇文章就来给大家详细解答一下,同时本文还将给你拓展Docker初探之运行RabbitMQ消息队列服务、docker安装启动rabbitmq、Docker快速安装、docker快速安装rabbitmq等相关知识,下面开始了哦!

本文目录一览:

docker快速安装启动Rocketmq消息队列(docker部署rocketmq)

docker快速安装启动Rocketmq消息队列(docker部署rocketmq)

安装docker

网上很多教程了,简单的可以用宝塔直接安装docker。

搜索镜像

docker search rocketmq

看到还是有很多镜像:

在这里插入图片描述

选择第一个吧,rocketmqinc/rocketmq

拉取镜像

docker pull rocketmqinc/rocketmq

启动服务

rocketmq 有2个服务需要启动:namesrv 和 broker

1 启动namesrv服务
docker run -d -p 9876:9876 -v `pwd`/data/namesrv/logs:/root/logs -v `pwd`/data/namesrv/store:/root/store --name rmqnamesrv -e "MAX_POSSIBLE_HEAP=100000000" rocketmqinc/rocketmq:4.4.0 sh mqnamesrv
2 启动broker服务
docker run -d -p 10911:10911 -p 10909:10909 -v `pwd`/data/broker/logs:/root/logs -v `pwd`/data/broker/store:/root/store --name rmqbroker --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" rocketmqinc/rocketmq:4.4.0 sh mqbroker

注:pwd 为执行命令的当前目录,可以换成一个绝对路径。

如果没有错误,执行 docker ps,可以看到两个容器都在运行中: 在这里插入图片描述

安装Rocketmq控制台

可以安装一个控制台,查看和管理rocketmq中的消息等。 运行如下命令获取可用控制台

docker search rocketmq-console

搜索出来的结果:

在这里插入图片描述

安装第一个stars最多的styletang/rokcetmq-console-ng

docker pull styletang/rocketmq-console-ng  

启动容器

docker run -e "JAVA_OPTS=-Drocketmq.namesrv.addr=127.0.0.1:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8080:8080 -t styletang/rocketmq-console-ng

注意:这里因为我们用了docker,所以namesrv.addr需要换成docker容器的ip

获取docker容器的ip,进入到rmqnamesrv容器中,执行下面命令:

docker exec -it rmqnamesrv bash
cat /etc/hosts

在这里插入图片描述

这里换成:172.17.0.2

替换 namesrv.addr 后,启动控制台,访问 {你的IP}:8080

在这里插入图片描述

Docker初探之运行RabbitMQ消息队列服务

Docker初探之运行RabbitMQ消息队列服务

  我们平时在使用RabbitMQ是基于Windows操作系统的,在使用前需要安装Er-Lang和RabbitMQ服务程序,如果版本不对RabbitMQ就启动失败,安装流程也比较麻烦。

  但如果在Docker中则变得简单。

  在使用RabbitMQ之前,我们首先需要拉取RabbitMQ镜像,然后在进行运行RabbitMQ容器。

  一、拉取RabbitMQ镜像

  命令:

  docker pull rabbitmq:management

  这个镜像自带Web管理界面。

  如图:

  

  二、运行RabbitMQ容器

  在运行RabbitMQ容器的时候,我们需要设置相关的参数,比如映射端口和RabbitMQ的管理账号以及密码。

  如果我们想使用默认账号(账号:guest密码:guest)可以使用以下命令启动容器:

  docker run  --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:management

  端口:15672表示Web界面管理访问地址端口

  端口:5672表示服务地址,如果我们需要使用RabbitMQ则用这个端口。

  启动后,我们使用Web管理界面登录试试,如图:

  

  用户名和密码输入guest登录成功!

  如图:

  

  如果我们在运行RabbitMQ容器的时候想指定用户的话可以这么做:

  docker run  --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -p 15672:15672 -p 5672:5672 rabbitmq:management

  -e表示设置环境变量,这里添加了两个,一个是默认登录名,一个是默认登录密码。这里使用的是用户:admin,密码:123456。

  启动后我们打开RabbitMQ的Web界面地址,输入用户名admin,密码123456登录成功!

  

  今天介绍到这,后续我们接着介绍如何在容器里数据持久化。

docker安装启动rabbitmq

docker安装启动rabbitmq

docker pull rabbitmq:management
docker run -d --hostname rabbit-node1 --name rabbit-node1 -p 5672:5672 -p 15672:15672 -v D:\docker\data\rabbitmq:/var/lib/rabbitmq rabbitmq:management

说明

  • 启动后等待完成,大约要3分钟启动完成后访问http://localhost:15672

账号密码 guest guest

  • hostname必须指定,因为rabbitmq要求

One of the important things to note about RabbitMQ is that it stores data based on what it calls the "Node Name", which defaults to the hostname. What this means for usage in Docker is that we should specify -h/--hostname explicitly for each daemon so that we don''t get a random hostname and can keep track of our data

  • management是带管理界面的,如果用了不带管理界面的,那就进入容器安装插件

  • 端口和挂载就没什么说的了

rabbitmq简介 rabbitmq安装

Docker快速安装

Docker快速安装

目前装Docker得最简单方式就是脚本安装了,方法如下:

curl -fsSL https://get.docker.com -o get-docker.sh
sh get-docker.sh

安装后,需要将自己的用户加到docker组里

sudo usermod -aG docker <USER>

断开ssh连接重登/注销一次,即可生效。

docker快速安装rabbitmq

docker快速安装rabbitmq

一、获取镜像

#指定版本,该版本包含了web控制页面
docker pull rabbitmq:management

二、运行镜像

#方式一:默认guest 用户,密码也是 guest
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management

#方式二:设置用户名和密码
docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management

三、访问ui页面

http://localhost:15672/

四、golang案例

#producer生产者代码
package main

import (
    "fmt"

    "log"

    "github.com/streadway/amqp"
)

const (
    //AMQP URI

    uri = "amqp://guest:guest@10.0.0.11:5672/" // 10.0.0.11为主机ip

    //Durable AMQP exchange name

    exchangeName = ""

    //Durable AMQP queue name

    queueName = "test-queues"

    //Body of message

    bodyMsg string = "hello angel"
)

//如果存在错误,则输出

func failOnError(err error, msg string) {

    if err != nil {

        log.Fatalf("%s: %s", msg, err)

        panic(fmt.Sprintf("%s: %s", msg, err))

    }

}

func main() {

    //调用发布消息函数

    publish(uri, exchangeName, queueName, bodyMsg)

    log.Printf("published %dB OK", len(bodyMsg))

}

//发布者的方法

//@amqpURI, amqp的地址

//@exchange, exchange的名称

//@queue, queue的名称

//@body, 主体内容

func publish(amqpURI string, exchange string, queue string, body string) {

    //建立连接

    log.Printf("dialing %q", amqpURI)

    connection, err := amqp.Dial(amqpURI)

    failOnError(err, "Failed to connect to RabbitMQ")

    defer connection.Close()

    //创建一个Channel

    log.Printf("got Connection, getting Channel")

    channel, err := connection.Channel()

    failOnError(err, "Failed to open a channel")

    defer channel.Close()

    log.Printf("got queue, declaring %q", queue)

    //创建一个queue

    q, err := channel.QueueDeclare(

        queueName, // name

        false, // durable

        false, // delete when unused

        false, // exclusive

        false, // no-wait

        nil, // arguments

    )

    failOnError(err, "Failed to declare a queue")

    log.Printf("declared queue, publishing %dB body (%q)", len(body), body)

    // Producer只能发送到exchange,它是不能直接发送到queue的

    // 现在我们使用默认的exchange(名字是空字符)这个默认的exchange允许我们发送给指定的queue

    // routing_key就是指定的queue名字

    err = channel.Publish(

        exchange, // exchange

        q.Name, // routing key

        false, // mandatory

        false, // immediate

        amqp.Publishing{

            Headers: amqp.Table{},

            ContentType: "text/plain",

            ContentEncoding: "",

            Body: []byte(body),
        })

    failOnError(err, "Failed to publish a message")

}
#consumer消费者代码
package main

import (
    "fmt"

    "log"

    "github.com/streadway/amqp"
)

const (
    //AMQP URI

    uri = "amqp://guest:guest@10.0.0.11:5672/"

    //Durable AMQP exchange nam

    exchangeName = ""

    //Durable AMQP queue name

    queueName = "test-queues"
)

//如果存在错误,则输出

func failOnError(err error, msg string) {

    if err != nil {

        log.Fatalf("%s: %s", msg, err)

        panic(fmt.Sprintf("%s: %s", msg, err))

    }

}

func main() {

    //调用消息接收者

    consumer(uri, exchangeName, queueName)

}

//接收者方法

//@amqpURI, amqp的地址

//@exchange, exchange的名称

//@queue, queue的名称

func consumer(amqpURI string, exchange string, queue string) {

    //建立连接

    log.Printf("dialing %q", amqpURI)

    connection, err := amqp.Dial(amqpURI)

    failOnError(err, "Failed to connect to RabbitMQ")

    defer connection.Close()

    //创建一个Channel

    log.Printf("got Connection, getting Channel")

    channel, err := connection.Channel()

    failOnError(err, "Failed to open a channel")

    defer channel.Close()

    log.Printf("got queue, declaring %q", queue)

    //创建一个queue

    q, err := channel.QueueDeclare(

        queueName, // name

        false, // durable

        false, // delete when unused

        false, // exclusive

        false, // no-wait

        nil, // arguments

    )

    failOnError(err, "Failed to declare a queue")

    log.Printf("Queue bound to Exchange, starting Consume")

    //订阅消息

    msgs, err := channel.Consume(

        q.Name, // queue

        "", // consumer

        true, // auto-ack

        false, // exclusive

        false, // no-local

        false, // no-wait

        nil, // args

    )

    failOnError(err, "Failed to register a consumer")

    //创建一个channel

    forever := make(chan bool)

    //调用gorountine

    go func() {

        for d := range msgs {

            log.Printf("Received a message: %s", d.Body)

        }

    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

    //没有写入数据,一直等待读,阻塞当前线程,目的是让线程不退出

    <-forever

}

 

五、拥有消息确认的代码

#producer
package main

import (
    "fmt"
    "github.com/streadway/amqp"
    "log"
    "os"
    "strings"
)

const (
    //AMQP URI
    uri = "amqp://guest:guest@10.0.0.11:5672/"
    //Durable AMQP exchange name
    exchangeName = ""
    //Durable AMQP queue name
    queueName = "test-queues-acknowledgments"
)

//如果存在错误,则输出
func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
}

func main() {
    bodyMsg := bodyFrom(os.Args)
    //调用发布消息函数
    publish(uri, exchangeName, queueName, bodyMsg)
    log.Printf("published %dB OK", len(bodyMsg))
}

func bodyFrom(args []string) string {
    var s string
    if (len(args) < 2) || os.Args[1] == "" {
        s = "hello angel"
    } else {
        s = strings.Join(args[1:], " ")
    }
    return s
}

//发布者的方法
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@queue, queue的名称
//@body, 主体内容
func publish(amqpURI string, exchange string, queue string, body string) {
    //建立连接
    log.Printf("dialing %q", amqpURI)
    connection, err := amqp.Dial(amqpURI)
    failOnError(err, "Failed to connect to RabbitMQ")
    defer connection.Close()

    //创建一个Channel
    log.Printf("got Connection, getting Channel")
    channel, err := connection.Channel()
    failOnError(err, "Failed to open a channel")
    defer channel.Close()

    log.Printf("got queue, declaring %q", queue)

    //创建一个queue
    q, err := channel.QueueDeclare(
        queueName, // name
        false,     // durable
        false,     // delete when unused
        false,     // exclusive
        false,     // no-wait
        nil,       // arguments
    )
    failOnError(err, "Failed to declare a queue")

    log.Printf("declared queue, publishing %dB body (%q)", len(body), body)

    // Producer只能发送到exchange,它是不能直接发送到queue的。
    // 现在我们使用默认的exchange(名字是空字符)。这个默认的exchange允许我们发送给指定的queue。
    // routing_key就是指定的queue名字。
    err = channel.Publish(
        exchange, // exchange
        q.Name,   // routing key
        false,    // mandatory
        false,    // immediate
        amqp.Publishing{
            Headers:         amqp.Table{},
            ContentType:     "text/plain",
            ContentEncoding: "",
            Body:            []byte(body),
        })
    failOnError(err, "Failed to publish a message")
}
#consumer
package main

import (
    "bytes"
    "fmt"
    "github.com/streadway/amqp"
    "log"
    "time"
)

const (
    //AMQP URI
    uri = "amqp://guest:guest@10.0.0.11:5672/"
    //Durable AMQP exchange nam
    exchangeName = ""
    //Durable AMQP queue name
    queueName = "test-queues-acknowledgments"
)

//如果存在错误,则输出
func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
}

func main() {
    //调用消息接收者
    consumer(uri, exchangeName, queueName)
}

//接收者方法
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@queue, queue的名称
func consumer(amqpURI string, exchange string, queue string) {
    //建立连接
    log.Printf("dialing %q", amqpURI)
    connection, err := amqp.Dial(amqpURI)
    failOnError(err, "Failed to connect to RabbitMQ")
    defer connection.Close()

    //创建一个Channel
    log.Printf("got Connection, getting Channel")
    channel, err := connection.Channel()
    failOnError(err, "Failed to open a channel")
    defer channel.Close()

    log.Printf("got queue, declaring %q", queue)

    //创建一个queue
    q, err := channel.QueueDeclare(
        queueName, // name
        false,     // durable
        false,     // delete when unused
        false,     // exclusive
        false,     // no-wait
        nil,       // arguments
    )
    failOnError(err, "Failed to declare a queue")

    log.Printf("Queue bound to Exchange, starting Consume")
    //订阅消息
    msgs, err := channel.Consume(
        q.Name, // queue
        "",     // consumer
        false,  // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    //创建一个channel
    forever := make(chan bool)

    //调用gorountine
    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
            dot_count := bytes.Count(d.Body, []byte("."))
            t := time.Duration(dot_count)
            time.Sleep(t * time.Second)
            log.Printf("Done")
            d.Ack(false)
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

    //没有写入数据,一直等待读,阻塞当前线程,目的是让线程不退出
    <-forever
}

 

我们今天的关于docker快速安装启动Rocketmq消息队列docker部署rocketmq的分享已经告一段落,感谢您的关注,如果您想了解更多关于Docker初探之运行RabbitMQ消息队列服务、docker安装启动rabbitmq、Docker快速安装、docker快速安装rabbitmq的相关信息,请在本站查询。

本文标签: