#!/usr/bin/env python

"""
@author Michael Meisinger
@author David Stuebe
@author Luke Campbell
@brief Run using:
    bin/pycc --rel res/deploy/examples/stream.yml

To start the producer in the pycc shell:
    id_p = cc.spawn_process('myproducer', 'examples.stream.stream_producer', 'StreamProducer', {'stream_producer': {'interval': 4000, 'routing_key': 'glider_data'}})
"""

__author__ = 'Michael Meisinger'

import time

from gevent.greenlet import Greenlet

from pyon.core import bootstrap
from pyon.public import log, ProcessPublisher, SimpleProcess


class StreamProducer(SimpleProcess):
    """
    StreamProducer is not a stream process: a stream process is defined by having an
    input stream that it processes. The StreamProducer instead plays the part of an
    agent pushing data into the system.
    """

    def on_init(self):
        log.debug("StreamProducer init. Self.id=%s", self.id)

    def on_start(self):
        log.debug("StreamProducer start")
        # Run the publish loop in its own greenlet so on_start can return immediately.
        self.producer_proc = Greenlet(self._trigger_func)
        self.producer_proc.start()

    def on_quit(self):
        log.debug("StreamProducer quit")
        # Stop the publisher greenlet started in on_start.
        self.producer_proc.kill()
        super(StreamProducer, self).on_quit()
    def _trigger_func(self):
        # Publisher settings come from the 'stream_producer' section of the spawn
        # config; 'interval' is the publish period in milliseconds.
        interval = self.CFG.get('stream_producer').get('interval')
        routing_key = self.CFG.get('stream_producer').get('routing_key')

        # Create the scoped exchange point name for science data
        XP = '.'.join([bootstrap.get_sys_name(), 'science_data'])

        pub = ProcessPublisher(node=self.container.node, name=(XP, routing_key), process=self)

        num = 1
        while True:
            msg = dict(num=str(num))
            pub.publish(msg)
            log.debug("Message %s published", num)
            num += 1
            time.sleep(interval / 1000.0)
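
# Each message published above is a plain dict such as {'num': '1'}, {'num': '2'}, ...,
# sent to the '<sys_name>.science_data' exchange point under the configured routing key.
# To stop the producer spawned per the module docstring, the container's process-management
# call can be used from the pycc shell; a sketch, assuming cc exposes terminate_process:
#
#   cc.terminate_process(id_p)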