Skip to content

Commit 637c956

Browse files
author
yahuarlocro
committed
add hello world and work queues examples
1 parent b82a3be commit 637c956

File tree

11 files changed

+308
-43
lines changed

11 files changed

+308
-43
lines changed

README.md

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,70 @@
11
# flask-rabbitmq
2+
3+
http://localhost:15672
4+
user: guest
5+
password: guest
6+
7+
8+
# DEFINITIONS
9+
10+
## Producer
11+
A program that sends messages
12+
13+
## Queue
14+
It is the post box. Messages can only be stored in a queue. It is bound by
15+
the host's memory and disks limits. It is just a large message buffer
16+
17+
## Consumer
18+
A program that wait to receive messages
19+
20+
21+
22+
# HELLO WORLD
23+
24+
RabittMQ is a message broker system. RabbitMq is a post box, a post office,
25+
and a letter carrier
26+
27+
A message must always be sent by a ***exchange***. Cannot be sent direct
28+
to a queue
29+
30+
31+
# WORK QUEUES
32+
33+
A work queue is used to distribute time-consuming tasks among workers
34+
35+
The idea of work queues is to avoid waiting for a task to complete. Instead
36+
tasks is scheduled to be completed later
37+
38+
We encapsulate the task as a message and send it to the queue. A worker
39+
running on the background (or more) will execute the job
40+
41+
This concept is especially useful in web applications where it's impossible
42+
to handle a complex task during a short HTTP request window.
43+
44+
## Message Acknowlegment
45+
46+
What happens if a workers dies or is stopped ? Messages dispatched and
47+
processed by this worker are lost
48+
49+
If a worker dies, we want the message to be delivered to another worker.
50+
51+
In order to be sure that messages are not lost, RabbitMQ supports ***message
52+
Acknowlegment***. An ack(nowledgement) is sent back by the consumer to tell
53+
RabbitMQ that a particular message had been received, processed and that
54+
RabbitMQ is free to delete it.
55+
56+
If a worker dies (its channel is closed, connection is closed, or TCP
57+
connection is lost) without sending an acknowledgment, RabbitMQ understands
58+
that message was not processed and it is delivered to another worker (consumer)
59+
60+
A timeout (30 minutes by default) is enforced on consumer delivery
61+
acknowledgement. This helps detect buggy (stuck) consumers that never
62+
acknowledge deliveries. You can increase this timeout
63+
64+
Manual message acknowledgments are turned on by default. In previous examples
65+
we explicitly turned them off via the ***auto_ack=True*** flag. It's time to
66+
remove this flag and send a proper acknowledgment from the worker, once we're
67+
done with a task.
68+
69+
Acknowlegments should be sent to the same channel, otherwise will result in a
70+
channel-level protocol exception

app.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
app = Flask(__name__)
66

77

8-
queue_name = 'new_task_queue'
8+
queue_name = 'my_task_queue'
99

1010
@app.route('/')
1111
def index():
@@ -25,4 +25,4 @@ def send_order(service):
2525

2626

2727
if __name__ == '__main__':
28-
app.run(debug=True, host='0.0.0.0', port=5000)
28+
app.run(debug=True, host='0.0.0.0', port=5000)

connection.py

Lines changed: 42 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,49 @@
11
import pika
22

3-
class RabbitMQConnection(object):
4-
# class RabbitMQConnection:
5-
# def __init__(self, host, user, password, queue_name, port=5627):
6-
def __init__(self, host, queue_name):
7-
self.host = host
8-
# self.user = user
9-
# self.password = password
10-
# self.port = port
11-
self.queue_name = queue_name
12-
13-
def __enter__(self):
143

15-
# credentials = pika.PlainCredentials(self.user, self.password)
16-
parameters = pika.ConnectionParameters(
17-
host=self.host,
18-
# port=self.port,
19-
# credentials=credentials
20-
)
21-
# connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.host))
22-
self.connection = pika.BlockingConnection(parameters)
23-
self.channel = self.connection.channel()
24-
self.channel.queue_declare(queue=self.queue_name)
25-
return self.channel
26-
27-
def __exit__(self, exc_type, exc_value, traceback):
28-
self.connection.close()
4+
# class RabbitMQConnection(object):
5+
# # class RabbitMQConnection:
6+
# # def __init__(self, host, user, password, queue_name, port=5627):
7+
# def __init__(self, host, queue_name):
8+
# self.host = host
9+
# # self.user = user
10+
# # self.password = password
11+
# # self.port = port
12+
# self.queue_name = queue_name
13+
#
14+
# def __enter__(self):
15+
#
16+
# # credentials = pika.PlainCredentials(self.user, self.password)
17+
# parameters = pika.ConnectionParameters(
18+
# host=self.host,
19+
# # port=self.port,
20+
# # credentials=credentials
21+
# )
22+
# # connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.host))
23+
# self.connection = pika.BlockingConnection(parameters)
24+
# self.channel = self.connection.channel()
25+
# self.channel.queue_declare(queue=self.queue_name)
26+
# return self.channel
27+
#
28+
# def __exit__(self, exc_type, exc_value, traceback):
29+
# self.connection.close()
2930

31+
# SENDING
32+
# stablich connection with rabbitmq server
33+
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
34+
channel = connection.channel()
3035

31-
# connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
32-
# channel = connection.channel()
33-
# channel.queue_declare(queue='hello')
34-
# channel.basic_publish(exchange='',
35-
# routing_key='hello',
36-
# body='Hello World!')
37-
# connection.close()
36+
# create a quere were the message will be delivered
37+
channel.queue_declare(queue='hello')
3838

39+
# declare exchange to specify exactly to which queue the message should go
40+
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
3941

40-
# def __repr__(self):
41-
# return f"userID {self.user_id}"
42+
# debug
43+
print(" [x] Sent 'Hello World!'")
44+
45+
# flush network buffers and confirm message was delivered
46+
connection.close()
47+
#
48+
# def __repr__(self):
49+
# return f"userID {self.user_id}"

hello_world/receive.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import pika
2+
import sys
3+
import os
4+
5+
6+
def main():
7+
connection = pika.BlockingConnection(
8+
pika.ConnectionParameters(host='localhost'))
9+
10+
channel = connection.channel()
11+
12+
# make sure that the queue already exists
13+
channel.queue_declare(queue='hello')
14+
15+
# meesages are received from the queue with a callback function, executed
16+
# by the pika library
17+
def callback(ch, method, properties, body):
18+
print(f" [x] Received {body}")
19+
20+
# tell rabbitmq that this callback function receives messages from the
21+
# queue
22+
channel.basic_consume(queue='hello',
23+
on_message_callback=callback, auto_ack=True)
24+
25+
# never-ending loop that waits for data and runs callbacks whenever
26+
# necessary
27+
print(' [*] Waiting for messages. To exit press CTRL+C')
28+
channel.start_consuming()
29+
30+
31+
if __name__ == '__main__':
32+
# run programm and catch KeyboardInterrupt during program shutdown.
33+
try:
34+
main()
35+
except KeyboardInterrupt:
36+
print('Interrupted')
37+
try:
38+
sys.exit(0)
39+
except SystemExit:
40+
os._exit(0)

hello_world/send.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import pika
2+
3+
# SENDING
4+
# stablich connection with rabbitmq server
5+
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
6+
channel = connection.channel()
7+
8+
# create a quere were the message will be delivered
9+
channel.queue_declare(queue='hello')
10+
11+
# declare exchange to specify exactly to which queue the message should go
12+
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
13+
14+
# debug
15+
print(" [x] Sent 'Hello World!'")
16+
17+
# flush network buffers and confirm message was delivered
18+
connection.close()
19+
#

jobs.sh

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
#!/bin/bash
2+
#
3+
#curl http://localhost:5000/order/ndvi.
4+
#curl http://localhost:5000/order/ndvi..
5+
#curl http://localhost:5000/order/ndvi...
6+
#curl http://localhost:5000/order/ndvi....
7+
#curl http://localhost:5000/order/ndvi.....
8+
#curl http://localhost:5000/order/ndvi......
9+
#curl http://localhost:5000/order/ndvi.......
10+
#curl http://localhost:5000/order/ndvi........
11+
#curl http://localhost:5000/order/ndvi.........
12+
#curl http://localhost:5000/order/ndvi..........

ndvi_worker.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,23 +4,22 @@
44
from connection import RabbitMQConnection
55
from app import queue_name
66

7-
def main():
87

8+
def main():
99
# Receiving messages from the queue is more complex. It works by
1010
# subscribing a callback function to a queue. Whenever we receive a
1111
# message, this callback function is called by the Pika library. In our
1212
# case this function will print on the screen the contents of the message.
1313
def callback(ch, method, properties, body):
1414
print(" [x] Received %r" % body.decode())
1515

16-
1716
# simulate a job that takes long. every point is one second
1817
time.sleep(body.count(b'.'))
1918

2019
print(" [x] Done")
2120

2221
# Message aknowledgement:
23-
# send a proper acknowledgment from the worker, once we're done with a
22+
# send a proper acknowledgment from the worker, once we're done with a
2423
# task
2524
ch.basic_ack(delivery_tag = method.delivery_tag)
2625

@@ -33,19 +32,19 @@ def callback(ch, method, properties, body):
3332
with RabbitMQConnection(host='localhost', queue_name=queue_name) as rc:
3433

3534
# fair dispatch
36-
# don't dispatch a new message to a worker until it has processed and
35+
# don't dispatch a new message to a worker until it has processed and
3736
# acknowledged the previous one. Instead, it will dispatch it to the
3837
# next worker that is not still busy.
3938
rc.basic_qos(prefetch_count=1)
4039

4140
# tell RabbitMQ that this particular callback function should receive
4241
# messages from our order queue
43-
# when auto_ack is set to true, then no message aknowledgement will be
42+
# when auto_ack is set to true, then no message aknowledgement will be
4443
# sent in case a worker dies
4544
rc.basic_consume(queue=queue_name,
4645
on_message_callback=callback)
4746
#auto_ack=True)
48-
47+
4948
print(' [*] Waiting for messages. To exit press CTRL+C')
5049

5150
# enter a never-ending loop that waits for data and runs callbacks
@@ -61,4 +60,4 @@ def callback(ch, method, properties, body):
6160
try:
6261
sys.exit(0)
6362
except SystemExit:
64-
os._exit(0)
63+
os._exit(0)

receive.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import pika
2+
import sys
3+
import os
4+
5+
6+
def main():
7+
connection = pika.BlockingConnection(
8+
pika.ConnectionParameters(host='localhost'))
9+
10+
channel = connection.channel()
11+
12+
# make sure that the queue already exists
13+
channel.queue_declare(queue='hello')
14+
15+
# meesages are received from the queue with a callback function, executed
16+
# by the pika library
17+
def callback(ch, method, properties, body):
18+
print(f" [x] Received {body}")
19+
20+
# tell rabbitmq that this callback function receives messages from the
21+
# queue
22+
channel.basic_consume(queue='hello',
23+
on_message_callback=callback, auto_ack=True)
24+
25+
# never-ending loop that waits for data and runs callbacks whenever
26+
# necessary
27+
print(' [*] Waiting for messages. To exit press CTRL+C')
28+
channel.start_consuming()
29+
30+
31+
if __name__ == '__main__':
32+
# run programm and catch KeyboardInterrupt during program shutdown.
33+
try:
34+
main()
35+
except KeyboardInterrupt:
36+
print('Interrupted')
37+
try:
38+
sys.exit(0)
39+
except SystemExit:
40+
os._exit(0)

send.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import pika
2+
3+
# SENDING
4+
# stablich connection with rabbitmq server
5+
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
6+
channel = connection.channel()
7+
8+
# create a quere were the message will be delivered
9+
channel.queue_declare(queue='hello')
10+
11+
# declare exchange to specify exactly to which queue the message should go
12+
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
13+
14+
# debug
15+
print(" [x] Sent 'Hello World!'")
16+
17+
# flush network buffers and confirm message was delivered
18+
connection.close()
19+
#

0 commit comments

Comments
 (0)