RabbitMQ
Материал из noname.com.ua
RabbitMQ
Это такой брокер очередей.
Тут будет компиляция из статей на русском и английском в переводе.
Постановка Задачи
Настроить кластер RabbitMQ
Установка RabbitMQ (отдельно стоящий)
Под CentOs 6.3 - ставлю из epel:
wget http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-7.noarch.rpm rpm -ivh /epel-release-6-7.noarch.rpm
yum install rabbitmq-server.noarch librabbitmq.x86_64 librabbitmq-devel.x86_64
Стартую с конфигом по-умолчанию:
ps -auxfw .... rabbitmq 26833 0.0 0.0 10820 464 ? S 11:05 0:00 /usr/lib64/erlang/erts-5.8.5/bin/epmd -daemon root 26836 0.0 0.0 106052 1276 ? Ss 11:05 0:00 sh -c RABBITMQ_PID_FILE=/var/run/rabbitmq/pid /usr/sbin/rabbitmq-server > /var/log/rabbitmq/startup_log 2> /var/log/rabbitmq/startup_err root 26839 0.0 0.0 106052 1356 ? S 11:05 0:00 \_ /bin/sh /usr/sbin/rabbitmq-server root 26846 0.0 0.0 145340 1544 ? S 11:05 0:00 \_ su rabbitmq -s /bin/sh -c /usr/lib/rabbitmq/bin/rabbitmq-server rabbitmq 26849 8.4 1.4 579972 27304 ? Ssl 11:05 0:00 \_ /usr/lib64/erlang/erts-5.8.5/bin/beam -W w -K true -A30 -P 1048576 -- -root /usr/lib64/erlang -progname erl -- -home /var/lib/rabbitmq -- -noshell -noinput -sname rabbitmq 26918 0.0 0.0 4048 420 ? Ss 11:05 0:00 \_ /usr/lib64/erlang/lib/os_mon-2.2.7/priv/bin/cpu_sup rabbitmq 26923 0.0 0.0 10784 520 ? Ss 11:05 0:00 \_ inet_gethost 4 rabbitmq 26924 0.0 0.0 12888 636 ? S 11:05 0:00 \_ inet_gethost 4
Проверка работы
Получение сообщения
Скрипт получения сообщений (с хабра) который обрабатывает "таск" столько секунд сколько точек в сообщении
#!/usr/bin/env python
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
time.sleep( body.count('.') )
print " [x] Done"
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue')
channel.start_consuming()
Отправка сообщения
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
print " [x] Sent %r" % (message,)
connection.close()
Тестирую отдельно-стоящего кроля
[root@localhost ~]# ./new_task.py 23421412...... [x] Sent '23421412......'
[root@localhost ~]# ./worket.py [*] Waiting for messages. To exit press CTRL+C [x] Received '23421412......' [x] Done
Т.е. похоже, работает =)
Кластеризация
Действую согласно документации (ноды rabbit1 и rabbit2 прописаны в /etc/hosts)
rabbitmqctl stop_app rabbitmqctl join_cluster --ram rabbit@rabbit1 rabbitmqctl start_app
Если что-то не так - есть такие методы диагностики
rabbitmqctl eval 'net_adm:ping(rabbit@rabbit1).' pong ...done.
rabbitmqctl -n rabbit@rabbit1 status
Версии должны быть одинаковые
rabbitmqctl -n rabbit@rabbit2 status
Status of node rabbit@rabbit2 ...
[{pid,19804},
{running_applications,[{rabbit,"RabbitMQ","3.0.0"},
{os_mon,"CPO CXC 138 46","2.2.7"},
{mnesia,"MNESIA CXC 138 12","4.5"},
{sasl,"SASL CXC 138 11","2.1.10"},
{stdlib,"ERTS CXC 138 10","1.17.5"},
{kernel,"ERTS CXC 138 10","2.14.5"}]},
{os,{unix,linux}},
{erlang_version,"Erlang R14B04 (erts-5.8.5) [source] [64-bit] [rq:1] [async-threads:30] [kernel-poll:true]\n"},
{memory,[{total,24091352},
{connection_procs,2648},
{queue_procs,5296},
{plugins,0},
{other_proc,9088480},
{mnesia,56112},
{mgmt_db,0},
{msg_index,22192},
{other_ets,759744},
{binary,1896},
{code,11668609},
{atom,1199337},
{other_system,1287038}]},
{vm_memory_high_watermark,0.4},
{vm_memory_limit,787611648},
{disk_free_limit,1000000000},
{disk_free,11497783296},
{file_descriptors,[{total_limit,924},
{total_used,3},
{sockets_limit,829},
{sockets_used,1}]},
{processes,[{limit,1048576},{used,121}]},
{run_queue,0},
{uptime,722}]
Проверяю статус кластера:
[root@rabbit1 etc]# rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit1 ...
[{nodes,[{disc,[rabbit@rabbit1]},{ram,[rabbit@rabbit2]}]},
{running_nodes,[rabbit@rabbit2,rabbit@rabbit1]},
{partitions,[]}]
...done.