RabbitMQ: различия между версиями
Материал из noname.com.ua
Перейти к навигацииПерейти к поискуSirmax (обсуждение | вклад) |
Sirmax (обсуждение | вклад) |
||
(не показано 11 промежуточных версий этого же участника) | |||
Строка 1: | Строка 1: | ||
+ | [[Категория:Linux]] |
||
+ | [[Категория:RabbitMQ]] |
||
+ | [[Категория:Python]] |
||
=RabbitMQ= |
=RabbitMQ= |
||
Это такой брокер очередей. <BR> |
Это такой брокер очередей. <BR> |
||
Строка 29: | Строка 32: | ||
rabbitmq 26923 0.0 0.0 10784 520 ? Ss 11:05 0:00 \_ inet_gethost 4 |
rabbitmq 26923 0.0 0.0 10784 520 ? Ss 11:05 0:00 \_ inet_gethost 4 |
||
rabbitmq 26924 0.0 0.0 12888 636 ? S 11:05 0:00 \_ inet_gethost 4 |
rabbitmq 26924 0.0 0.0 12888 636 ? S 11:05 0:00 \_ inet_gethost 4 |
||
+ | </PRE> |
||
+ | ===Проверка работы=== |
||
+ | ====Получение сообщения==== |
||
+ | Скрипт получения сообщений (с хабра) который обрабатывает "таск" столько секунд сколько точек в сообщении |
||
+ | <PRE> |
||
+ | #!/usr/bin/env python |
||
+ | |||
+ | import pika |
||
+ | import time |
||
+ | |||
+ | connection = pika.BlockingConnection(pika.ConnectionParameters( |
||
+ | host='localhost')) |
||
+ | channel = connection.channel() |
||
+ | |||
+ | channel.queue_declare(queue='task_queue', durable=True) |
||
+ | print ' [*] Waiting for messages. To exit press CTRL+C' |
||
+ | |||
+ | def callback(ch, method, properties, body): |
||
+ | print " [x] Received %r" % (body,) |
||
+ | time.sleep( body.count('.') ) |
||
+ | print " [x] Done" |
||
+ | ch.basic_ack(delivery_tag = method.delivery_tag) |
||
+ | |||
+ | channel.basic_qos(prefetch_count=1) |
||
+ | channel.basic_consume(callback, |
||
+ | queue='task_queue') |
||
+ | |||
+ | channel.start_consuming() |
||
+ | </PRE> |
||
+ | ====Отправка сообщения==== |
||
+ | <PRE> |
||
+ | #!/usr/bin/env python |
||
+ | |||
+ | import pika |
||
+ | import sys |
||
+ | |||
+ | connection = pika.BlockingConnection(pika.ConnectionParameters( |
||
+ | host='localhost')) |
||
+ | channel = connection.channel() |
||
+ | |||
+ | channel.queue_declare(queue='task_queue', durable=True) |
||
+ | |||
+ | message = ' '.join(sys.argv[1:]) or "Hello World!" |
||
+ | channel.basic_publish(exchange='', |
||
+ | routing_key='task_queue', |
||
+ | body=message, |
||
+ | properties=pika.BasicProperties( |
||
+ | delivery_mode = 2, # make message persistent |
||
+ | )) |
||
+ | print " [x] Sent %r" % (message,) |
||
+ | connection.close() |
||
+ | |||
+ | </PRE> |
||
+ | ====Тестирую отдельно-стоящего кроля ==== |
||
+ | <PRE> |
||
+ | [root@localhost ~]# ./new_task.py 23421412...... |
||
+ | [x] Sent '23421412......' |
||
+ | </PRE> |
||
+ | <PRE> |
||
+ | [root@localhost ~]# ./worket.py |
||
+ | [*] Waiting for messages. To exit press CTRL+C |
||
+ | [x] Received '23421412......' |
||
+ | [x] Done |
||
+ | </PRE> |
||
+ | Т.е. похоже, работает =) |
||
+ | |||
+ | ==Кластеризация== |
||
+ | Действую согласно документации (ноды rabbit1 и rabbit2 прописаны в /etc/hosts) |
||
+ | <PRE> |
||
+ | rabbitmqctl stop_app |
||
+ | rabbitmqctl join_cluster --ram rabbit@rabbit1 |
||
+ | rabbitmqctl start_app |
||
+ | </PRE> |
||
+ | Если что-то не так - есть такие методы диагностики |
||
+ | |||
+ | <PRE> |
||
+ | rabbitmqctl eval 'net_adm:ping(rabbit@rabbit1).' |
||
+ | pong |
||
+ | ...done. |
||
+ | </PRE> |
||
+ | |||
+ | <PRE> |
||
+ | rabbitmqctl -n rabbit@rabbit1 status |
||
+ | </PRE> |
||
+ | Версии должны быть одинаковые |
||
+ | <PRE> |
||
+ | rabbitmqctl -n rabbit@rabbit2 status |
||
+ | Status of node rabbit@rabbit2 ... |
||
+ | [{pid,19804}, |
||
+ | {running_applications,[{rabbit,"RabbitMQ","3.0.0"}, |
||
+ | {os_mon,"CPO CXC 138 46","2.2.7"}, |
||
+ | {mnesia,"MNESIA CXC 138 12","4.5"}, |
||
+ | {sasl,"SASL CXC 138 11","2.1.10"}, |
||
+ | {stdlib,"ERTS CXC 138 10","1.17.5"}, |
||
+ | {kernel,"ERTS CXC 138 10","2.14.5"}]}, |
||
+ | {os,{unix,linux}}, |
||
+ | {erlang_version,"Erlang R14B04 (erts-5.8.5) [source] [64-bit] [rq:1] [async-threads:30] [kernel-poll:true]\n"}, |
||
+ | {memory,[{total,24091352}, |
||
+ | {connection_procs,2648}, |
||
+ | {queue_procs,5296}, |
||
+ | {plugins,0}, |
||
+ | {other_proc,9088480}, |
||
+ | {mnesia,56112}, |
||
+ | {mgmt_db,0}, |
||
+ | {msg_index,22192}, |
||
+ | {other_ets,759744}, |
||
+ | {binary,1896}, |
||
+ | {code,11668609}, |
||
+ | {atom,1199337}, |
||
+ | {other_system,1287038}]}, |
||
+ | {vm_memory_high_watermark,0.4}, |
||
+ | {vm_memory_limit,787611648}, |
||
+ | {disk_free_limit,1000000000}, |
||
+ | {disk_free,11497783296}, |
||
+ | {file_descriptors,[{total_limit,924}, |
||
+ | {total_used,3}, |
||
+ | {sockets_limit,829}, |
||
+ | {sockets_used,1}]}, |
||
+ | {processes,[{limit,1048576},{used,121}]}, |
||
+ | {run_queue,0}, |
||
+ | {uptime,722}] |
||
+ | </PRE> |
||
+ | |||
+ | Проверяю статус кластера: |
||
+ | <PRE> |
||
+ | [root@rabbit1 etc]# rabbitmqctl cluster_status |
||
+ | Cluster status of node rabbit@rabbit1 ... |
||
+ | [{nodes,[{disc,[rabbit@rabbit1]},{ram,[rabbit@rabbit2]}]}, |
||
+ | {running_nodes,[rabbit@rabbit2,rabbit@rabbit1]}, |
||
+ | {partitions,[]}] |
||
+ | ...done. |
||
+ | |||
+ | </PRE> |
||
+ | ===Проверка (базовая)=== |
||
+ | Отправить на одну ноду, получить с другой: |
||
+ | |||
+ | <PRE> |
||
+ | [root@rabbit1 ~]# ./new_task.py Test1234.... |
||
+ | [x] Sent 'Test1234....' |
||
+ | </PRE> |
||
+ | Естественно в ./new_task.py поправлено |
||
+ | <PRE> |
||
+ | connection = pika.BlockingConnection(pika.ConnectionParameters( |
||
+ | host='rabbit2')) |
||
+ | </PRE> |
||
+ | <PRE> |
||
+ | [root@rabbit1 ~]# ./worket.py |
||
+ | [*] Waiting for messages. To exit press CTRL+C |
||
+ | [x] Received 'Test1234....' |
||
+ | [x] Done |
||
</PRE> |
</PRE> |
||
Текущая версия на 16:43, 28 февраля 2013
RabbitMQ
Это такой брокер очередей.
Тут будет компиляция из статей на русском и английском в переводе.
Постановка Задачи
Настроить кластер RabbitMQ
Установка RabbitMQ (отдельно стоящий)
Под CentOs 6.3 - ставлю из epel:
wget http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-7.noarch.rpm rpm -ivh /epel-release-6-7.noarch.rpm
yum install rabbitmq-server.noarch librabbitmq.x86_64 librabbitmq-devel.x86_64
Стартую с конфигом по-умолчанию:
ps -auxfw .... rabbitmq 26833 0.0 0.0 10820 464 ? S 11:05 0:00 /usr/lib64/erlang/erts-5.8.5/bin/epmd -daemon root 26836 0.0 0.0 106052 1276 ? Ss 11:05 0:00 sh -c RABBITMQ_PID_FILE=/var/run/rabbitmq/pid /usr/sbin/rabbitmq-server > /var/log/rabbitmq/startup_log 2> /var/log/rabbitmq/startup_err root 26839 0.0 0.0 106052 1356 ? S 11:05 0:00 \_ /bin/sh /usr/sbin/rabbitmq-server root 26846 0.0 0.0 145340 1544 ? S 11:05 0:00 \_ su rabbitmq -s /bin/sh -c /usr/lib/rabbitmq/bin/rabbitmq-server rabbitmq 26849 8.4 1.4 579972 27304 ? Ssl 11:05 0:00 \_ /usr/lib64/erlang/erts-5.8.5/bin/beam -W w -K true -A30 -P 1048576 -- -root /usr/lib64/erlang -progname erl -- -home /var/lib/rabbitmq -- -noshell -noinput -sname rabbitmq 26918 0.0 0.0 4048 420 ? Ss 11:05 0:00 \_ /usr/lib64/erlang/lib/os_mon-2.2.7/priv/bin/cpu_sup rabbitmq 26923 0.0 0.0 10784 520 ? Ss 11:05 0:00 \_ inet_gethost 4 rabbitmq 26924 0.0 0.0 12888 636 ? S 11:05 0:00 \_ inet_gethost 4
Проверка работы
Получение сообщения
Скрипт получения сообщений (с хабра) который обрабатывает "таск" столько секунд сколько точек в сообщении
#!/usr/bin/env python import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print ' [*] Waiting for messages. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done" ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming()
Отправка сообщения
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent )) print " [x] Sent %r" % (message,) connection.close()
Тестирую отдельно-стоящего кроля
[root@localhost ~]# ./new_task.py 23421412...... [x] Sent '23421412......'
[root@localhost ~]# ./worket.py [*] Waiting for messages. To exit press CTRL+C [x] Received '23421412......' [x] Done
Т.е. похоже, работает =)
Кластеризация
Действую согласно документации (ноды rabbit1 и rabbit2 прописаны в /etc/hosts)
rabbitmqctl stop_app rabbitmqctl join_cluster --ram rabbit@rabbit1 rabbitmqctl start_app
Если что-то не так - есть такие методы диагностики
rabbitmqctl eval 'net_adm:ping(rabbit@rabbit1).' pong ...done.
rabbitmqctl -n rabbit@rabbit1 status
Версии должны быть одинаковые
rabbitmqctl -n rabbit@rabbit2 status Status of node rabbit@rabbit2 ... [{pid,19804}, {running_applications,[{rabbit,"RabbitMQ","3.0.0"}, {os_mon,"CPO CXC 138 46","2.2.7"}, {mnesia,"MNESIA CXC 138 12","4.5"}, {sasl,"SASL CXC 138 11","2.1.10"}, {stdlib,"ERTS CXC 138 10","1.17.5"}, {kernel,"ERTS CXC 138 10","2.14.5"}]}, {os,{unix,linux}}, {erlang_version,"Erlang R14B04 (erts-5.8.5) [source] [64-bit] [rq:1] [async-threads:30] [kernel-poll:true]\n"}, {memory,[{total,24091352}, {connection_procs,2648}, {queue_procs,5296}, {plugins,0}, {other_proc,9088480}, {mnesia,56112}, {mgmt_db,0}, {msg_index,22192}, {other_ets,759744}, {binary,1896}, {code,11668609}, {atom,1199337}, {other_system,1287038}]}, {vm_memory_high_watermark,0.4}, {vm_memory_limit,787611648}, {disk_free_limit,1000000000}, {disk_free,11497783296}, {file_descriptors,[{total_limit,924}, {total_used,3}, {sockets_limit,829}, {sockets_used,1}]}, {processes,[{limit,1048576},{used,121}]}, {run_queue,0}, {uptime,722}]
Проверяю статус кластера:
[root@rabbit1 etc]# rabbitmqctl cluster_status Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1]},{ram,[rabbit@rabbit2]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit1]}, {partitions,[]}] ...done.
Проверка (базовая)
Отправить на одну ноду, получить с другой:
[root@rabbit1 ~]# ./new_task.py Test1234.... [x] Sent 'Test1234....'
Естественно в ./new_task.py поправлено
connection = pika.BlockingConnection(pika.ConnectionParameters( host='rabbit2'))
[root@rabbit1 ~]# ./worket.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'Test1234....' [x] Done