-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
97 lines (71 loc) · 2.67 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import os
import sys
import time
import traceback
import json
from consumer import KafkaConsumer
from processor import BagheeraMessageProcessor
import Queue
import threading
import config
import codecs
# Append the literal entry 'log4j.properties' to the module search path.
# NOTE(review): presumably this lets log4j locate its configuration when the
# script runs under Jython -- confirm against the deployment setup.
sys.path.extend(['log4j.properties'])
# Rebind sys.stdout to a UTF-8 encoding writer wrapped around the original stream.
sys.stdout = codecs.getwriter('utf-8')(sys.stdout)
from java.lang import System
import com.alibaba.fastjson.JSON as JSON
def runner(offsets):
    """Start one consumer thread per (host, topic, partition) and print their output.

    For every combination of ``config.bagheera_nodes``, ``config.topics`` and
    ``config.partitions`` this creates a bounded queue (capacity 256), a
    ``BagheeraMessageProcessor`` constructed with that queue, and a
    ``KafkaConsumer`` whose ``process_messages_forever`` method is run on its
    own thread.  The calling thread then polls all queues forever and writes
    one line per dequeued item to ``System.out``.

    Items taken from a queue are tuples whose second element is the operation:

    * ``'PUT'``    -- (pid, op, ts, ipaddr, doc_id, payload)
    * ``'DELETE'`` -- (pid, op, ts, ipaddr, doc_id)

    Items with any other operation are dropped without output.

    offsets -- dict keyed by (host, topic, partition) giving the offset passed
               to each consumer; a missing key raises ``KeyError``.

    This function does not return.
    """
    queues = {}
    offset_update_freq = config.offset_update_freq

    for host in config.bagheera_nodes:
        for topic in config.topics:
            for partition in config.partitions:
                key = (host, topic, partition)
                queue = Queue.Queue(256)
                queues[key] = queue
                bmp = BagheeraMessageProcessor(queue)
                kc = KafkaConsumer(host, {}, topic, partition, bmp.processor,
                                   offsets[key], offset_update_freq)
                t = threading.Thread(target=kc.process_messages_forever)
                t.start()

    # How long to back off when a full pass over the queues found nothing.
    # Without this pause the non-blocking polling loop below would spin at
    # 100% CPU whenever every queue is empty.
    idle_sleep_secs = 0.01

    while True:
        got_message = False
        # Take at most one item per queue per pass so that no single
        # partition can starve the others.
        for htp, q in queues.iteritems():
            try:
                v = q.get(False)
            except Queue.Empty:
                continue
            got_message = True
            if v[1] == 'PUT':
                pid, op, ts, ipaddr, doc_id, payload = v
                System.out.println('%s %s %d %s %s %s' % (htp[1], op, ts, ipaddr, doc_id, JSON.toJSONString(payload)))
            elif v[1] == 'DELETE':
                pid, op, ts, ipaddr, doc_id = v
                System.out.println('%s %s %d %s %s' % (htp[1], op, ts, ipaddr, doc_id))
        if not got_message:
            time.sleep(idle_sleep_secs)
def parse_offsets(filex):
    """Load the initial consumer offsets from the file at path *filex*.

    The file holds one serialized (JSON) entry per line with the fields
    ``time_millis``, ``hostname``, ``topic``, ``partition`` and ``offset``
    (``time_millis`` is not read here).  Lines that are not valid JSON, are
    not JSON objects, or lack a required field are skipped.  If several lines
    name the same (hostname, topic, partition), the last one wins.

    Returns a dict mapping (hostname, topic, partition) -> offset.

    Prints an error to ``System.err`` and exits with status 1 unless the
    parsed offsets contain exactly one entry for every combination of
    ``config.bagheera_nodes``, ``config.topics`` and ``config.partitions``.
    Errors opening or reading the file propagate to the caller.
    """
    offsets = {}
    # "with" guarantees the file is closed even if reading fails part-way.
    with open(filex, "r") as offsets_file:
        for line in offsets_file:
            try:
                entry = json.loads(line)
                key = (entry['hostname'], entry['topic'], entry['partition'])
                offsets[key] = entry['offset']
            except (ValueError, KeyError, TypeError):
                # ValueError: not JSON; KeyError: missing field;
                # TypeError: not a JSON object, or an unhashable field value.
                continue

    expected = [(host, topic, partition)
                for host in config.bagheera_nodes
                for topic in config.topics
                for partition in config.partitions]
    # A matching count alone is not enough: every configured triple must be
    # present, otherwise the later per-triple lookup would fail with KeyError.
    missing = [key for key in expected if key not in offsets]
    if (not offsets) or len(offsets) != len(expected) or missing:
        System.err.println("ERROR: could not find valid initial offsets for given configuration")
        sys.exit(1)
    return offsets
if __name__ == '__main__':
    # Exactly one argument is required: the path of the offsets file.
    if len(sys.argv) != 2:
        System.err.println("Needs file containing offsets as first argument")
        sys.exit(1)

    try:
        runner(parse_offsets(sys.argv[1]))
    except SystemExit:
        # parse_offsets() prints its own message before calling sys.exit();
        # let that through instead of reporting it as a failure with a
        # traceback.  The finally clause below still sets the exit status.
        raise
    except:
        # Deliberately a catch-all: anything that escapes is reported here.
        # NOTE(review): under Jython this presumably also covers Java
        # exceptions raised by the consumer code -- confirm before narrowing.
        System.err.println("ERROR: " + traceback.format_exc())
    finally:
        # runner() never returns normally, so reaching this point always
        # means failure; System.exit also stops the consumer threads.
        System.exit(1)