RabbitMQ

Материал из noname.com.ua
Версия от 16:43, 28 февраля 2013; Sirmax (обсуждение | вклад) (→‎RabbitMQ)
(разн.) ← Предыдущая | Текущая версия (разн.) | Следующая → (разн.)
Перейти к навигацииПерейти к поиску

RabbitMQ

Это такой брокер очередей.
Тут будет компиляция из статей на русском и английском в переводе.

Постановка Задачи

Настроить кластер RabbitMQ


Установка RabbitMQ (отдельно стоящий)

Под CentOs 6.3 - ставлю из epel:

wget http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-7.noarch.rpm
rpm -ivh /epel-release-6-7.noarch.rpm
yum install rabbitmq-server.noarch librabbitmq.x86_64  librabbitmq-devel.x86_64

Стартую с конфигом по-умолчанию:

ps -auxfw
....
rabbitmq 26833  0.0  0.0  10820   464 ?        S    11:05   0:00 /usr/lib64/erlang/erts-5.8.5/bin/epmd -daemon
root     26836  0.0  0.0 106052  1276 ?        Ss   11:05   0:00 sh -c RABBITMQ_PID_FILE=/var/run/rabbitmq/pid /usr/sbin/rabbitmq-server >             /var/log/rabbitmq/startup_log 2> /var/log/rabbitmq/startup_err
root     26839  0.0  0.0 106052  1356 ?        S    11:05   0:00  \_ /bin/sh /usr/sbin/rabbitmq-server
root     26846  0.0  0.0 145340  1544 ?        S    11:05   0:00      \_ su rabbitmq -s /bin/sh -c /usr/lib/rabbitmq/bin/rabbitmq-server 
rabbitmq 26849  8.4  1.4 579972 27304 ?        Ssl  11:05   0:00          \_ /usr/lib64/erlang/erts-5.8.5/bin/beam -W w -K true -A30 -P 1048576 -- -root /usr/lib64/erlang -progname erl -- -home /var/lib/rabbitmq -- -noshell -noinput -sname
rabbitmq 26918  0.0  0.0   4048   420 ?        Ss   11:05   0:00              \_ /usr/lib64/erlang/lib/os_mon-2.2.7/priv/bin/cpu_sup
rabbitmq 26923  0.0  0.0  10784   520 ?        Ss   11:05   0:00              \_ inet_gethost 4
rabbitmq 26924  0.0  0.0  12888   636 ?        S    11:05   0:00                  \_ inet_gethost 4

Проверка работы

Получение сообщения

Скрипт получения сообщений (с хабра) который обрабатывает "таск" столько секунд сколько точек в сообщении

#!/usr/bin/env python

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

Отправка сообщения

#!/usr/bin/env python

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
print " [x] Sent %r" % (message,)
connection.close()

Тестирую отдельно-стоящего кроля

[root@localhost ~]# ./new_task.py 23421412......
 [x] Sent '23421412......'
[root@localhost ~]# ./worket.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received '23421412......'
 [x] Done

Т.е. похоже, работает =)

Кластеризация

Действую согласно документации (ноды rabbit1 и rabbit2 прописаны в /etc/hosts)

rabbitmqctl stop_app
rabbitmqctl join_cluster --ram rabbit@rabbit1
rabbitmqctl start_app

Если что-то не так - есть такие методы диагностики

rabbitmqctl eval 'net_adm:ping(rabbit@rabbit1).'
pong
...done.
rabbitmqctl -n rabbit@rabbit1 status

Версии должны быть одинаковые

rabbitmqctl -n rabbit@rabbit2 status
Status of node rabbit@rabbit2 ...
[{pid,19804},
 {running_applications,[{rabbit,"RabbitMQ","3.0.0"},
                        {os_mon,"CPO  CXC 138 46","2.2.7"},
                        {mnesia,"MNESIA  CXC 138 12","4.5"},
                        {sasl,"SASL  CXC 138 11","2.1.10"},
                        {stdlib,"ERTS  CXC 138 10","1.17.5"},
                        {kernel,"ERTS  CXC 138 10","2.14.5"}]},
 {os,{unix,linux}},
 {erlang_version,"Erlang R14B04 (erts-5.8.5) [source] [64-bit] [rq:1] [async-threads:30] [kernel-poll:true]\n"},
 {memory,[{total,24091352},
          {connection_procs,2648},
          {queue_procs,5296},
          {plugins,0},
          {other_proc,9088480},
          {mnesia,56112},
          {mgmt_db,0},
          {msg_index,22192},
          {other_ets,759744},
          {binary,1896},
          {code,11668609},
          {atom,1199337},
          {other_system,1287038}]},
 {vm_memory_high_watermark,0.4},
 {vm_memory_limit,787611648},
 {disk_free_limit,1000000000},
 {disk_free,11497783296},
 {file_descriptors,[{total_limit,924},
                    {total_used,3},
                    {sockets_limit,829},
                    {sockets_used,1}]},
 {processes,[{limit,1048576},{used,121}]},
 {run_queue,0},
 {uptime,722}]

Проверяю статус кластера:

[root@rabbit1 etc]# rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit1 ...
[{nodes,[{disc,[rabbit@rabbit1]},{ram,[rabbit@rabbit2]}]},
 {running_nodes,[rabbit@rabbit2,rabbit@rabbit1]},
 {partitions,[]}]
...done.

Проверка (базовая)

Отправить на одну ноду, получить с другой:

[root@rabbit1 ~]# ./new_task.py Test1234....
 [x] Sent 'Test1234....'

Естественно в ./new_task.py поправлено

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='rabbit2'))
[root@rabbit1 ~]# ./worket.py 
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Test1234....'
 [x] Done

Ссылки