-
Notifications
You must be signed in to change notification settings - Fork 0
/
postgres_sub.py
39 lines (28 loc) · 1.05 KB
/
postgres_sub.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from pika.exceptions import ConnectionClosed
from log import tolog
from master_server import RabbitConsumer
from config import *
def callback(ch, method, properties, body):
    """Store one queue message as a file, then acknowledge it.

    The file name is read from the ``filename`` message header and the raw
    message body is written, unchanged, to that name under
    ``csv_store_path``.

    Args:
        ch: Channel the message was delivered on; used to acknowledge it.
        method: Delivery metadata; supplies ``delivery_tag``.
        properties: Message properties; ``headers['filename']`` names the
            target file.
        body: Payload written to the file in binary mode.

    Raises:
        OSError: If the target file cannot be opened or written. In that
            case the message is NOT acknowledged.
    """
    import os  # local import keeps this block self-contained

    headers = properties.headers or {}
    # Keep only the final path component so a header such as
    # "../../etc/passwd" cannot point outside csv_store_path.
    file_name = os.path.basename(str(headers.get('filename') or ''))
    if file_name in ('', '.', '..'):
        # There is no usable name to store the payload under. Acknowledge the
        # message anyway so it is not left pending, and record the drop.
        tolog('[Server] {} dropped message without a usable filename'
              .format(pg_queue_name), 'warn')
        ch.basic_ack(delivery_tag=method.delivery_tag)
        return

    with open('{}/{}'.format(csv_store_path, file_name), 'wb') as write_csv:
        write_csv.write(body)

    # Acknowledge only after the file has been written and closed: if the
    # write raises, the message stays unacknowledged instead of being lost.
    ch.basic_ack(delivery_tag=method.delivery_tag)
    tolog('[Server] {} {} got message...'.format(pg_queue_name, file_name))
def get_data(channel):
    """Consume messages on ``channel`` until interrupted or disconnected.

    Args:
        channel: Channel with a consumer already registered; its
            ``start_consuming()`` is called and blocks.

    Returns:
        bool: ``True`` when consuming ended because the connection was
        closed and the caller should reconnect; ``False`` when consuming
        ended normally or was stopped by ``KeyboardInterrupt``.
    """
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()
    except ConnectionClosed:
        print('[Server] Crashed. Reconnecting...')
        tolog('[Server] Crashed. Reconnecting...', 'warn')
        # Report the drop instead of calling main() from here: re-entering
        # main() nested two more stack frames on every reconnect.
        return True
    return False


def main():
    """Run the consumer for ``pg_queue_name``, reconnecting after drops.

    Builds a ``RabbitConsumer``, registers ``callback`` on its channel and
    consumes until ``get_data`` reports a clean stop. When ``get_data``
    reports a closed connection, a fresh consumer is built and consuming
    resumes.
    """
    while True:
        postgres_server = RabbitConsumer(pg_queue_name)
        channel = postgres_server.get_channel()
        tolog('[Server] {} starting...'.format(pg_queue_name))
        channel.basic_consume(callback, queue=pg_queue_name)
        if not get_data(channel):
            # Clean stop: release the connection and leave.
            postgres_server.close_connection()
            return
        # The connection was reported closed, so it is not closed again;
        # loop around and build a new consumer.
# Script entry point: start the consumer only when this file is executed
# directly, not when it is imported.
if __name__ == '__main__':
    main()