Skip to content

Commit 61b8049

Browse files
committed
Adds pusher
1 parent d9fc6a9 commit 61b8049

File tree

6 files changed

+180
-1
lines changed

6 files changed

+180
-1
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,5 @@ ullala is a demo that showcase using kafka as a data pipeline and log aggregatio
1414
6. Run Capturer: `KAFKA_HOSTS=KAFKA:9092 capturer/capturer.py`
1515
7. Install processor dependencies `cd processor && npm insall`
1616
8. Run processor: `cd processor && KAFKA_HOSTS=kafka:9092 node index.js`
17-
17+
9. Install pusher dependencies `cd pusher && npm install`
18+
10. Run pusher `cd pusher && KAFKA_HOSTS=kafka:9092 node index.js

pusher/index.js

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
const fs = require('fs');
2+
3+
const http = require('http');
4+
const express = require('express');
5+
const socketIO = require('socket.io');
6+
const kafka = require('no-kafka');
7+
const avro = require('avsc');
8+
9+
const app = express();
10+
const server = http.Server(app);
11+
const io = socketIO().listen(server);
12+
13+
const kafka_hosts = process.env.KAFKA_HOSTS;
14+
15+
const avroType = avro.parse({
16+
name: 'Image',
17+
type: 'record',
18+
fields: [
19+
{name: 'image', type: 'bytes'},
20+
{name: 'capture_timestamp', type: 'int'}
21+
]
22+
});
23+
24+
if (!kafka_hosts) {
25+
console.error('No $KAFKA_HOSTS environment variable set');
26+
process.exit(1);
27+
}
28+
29+
server.listen(8080);
30+
app.use(express.static('static'));
31+
32+
app.get('/', function (req, res) {
33+
res.sendfile(`${__dirname}/static/index.html`);
34+
});
35+
36+
function handleKafkaMessages(messageSet, topic, partition) {
37+
for (const message of messageSet) {
38+
const imageFileName = `${partition}_${message.offset}.jpg`;
39+
const packedImage = avroType.fromBuffer(message.message.value);
40+
41+
fs.writeFile(`static/images/${imageFileName}`, packedImage.image, (error) => {
42+
if (error) {
43+
error(`ERROR: Writing ${imageFileName}`);
44+
return;
45+
}
46+
const consumerLag = getConsumerLag(packedImage.capture_timestamp);
47+
logConsumerLag(`Got image ${imageFileName} with consumer lag: ${consumerLag}`, consumerLag);
48+
io.emit('image', {
49+
path: imageFileName,
50+
captureTimestamp: packedImage.capture_timestamp
51+
});
52+
});
53+
}
54+
}
55+
56+
function getConsumerLag(captureTimestamp) {
57+
const captureDate = new Date(0);
58+
captureDate.setUTCSeconds(captureTimestamp);
59+
return (new Date() - captureDate) / 1000;
60+
}
61+
62+
const consumer = new kafka.GroupConsumer({
63+
connectionString: kafka_hosts,
64+
groupId: 'IMAGE_PUSHER',
65+
logger: {
66+
logFunction: info
67+
}
68+
});
69+
70+
const producer = new kafka.Producer({
71+
connectionString: kafka_hosts
72+
});
73+
74+
consumer.init([{
75+
strategy: 'DEFAULT_STRATEGY',
76+
subscriptions: ['PROCESSED_FEED'],
77+
handler: handleKafkaMessages
78+
}]);
79+
80+
producer.init();
81+
82+
function info() {
83+
const argumentList = Array.prototype.slice.call(arguments);
84+
console.log.apply(console, argumentList);
85+
log('INFO', argumentList.join(' '));
86+
}
87+
88+
function error(string) {
89+
const argumentList = Array.prototype.slice.call(arguments);
90+
console.error.apply(console, argumentList);
91+
log('ERROR', argumentList.join(' '));
92+
}
93+
94+
function logConsumerLag(string, consumerLag) {
95+
console.log(string);
96+
log('INFO', string, consumerLag);
97+
}
98+
99+
function log(level, string, consumerLag) {
100+
const logPayload = {
101+
'@timestamp': new Date().toISOString(),
102+
'level': level,
103+
'message': string,
104+
'application': 'IMAGE_PUSHER'
105+
};
106+
if (consumerLag) {
107+
logPayload.consumerLag = consumerLag;
108+
}
109+
producer.send({
110+
topic: 'APPLICATION_LOGS',
111+
message: {
112+
value: JSON.stringify(logPayload)
113+
},
114+
partition: 0
115+
});
116+
}
117+

pusher/package.json

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
{
2+
"name": "pusher",
3+
"version": "0.0.1",
4+
"description": "Pushes processed images from kafka to browsers",
5+
"main": "index.js",
6+
"scripts": {
7+
"start": "node index.js"
8+
},
9+
"author": {
10+
"name": "Alberto Avila",
11+
"email": "[email protected]"
12+
},
13+
"license": "MIT",
14+
"private": false,
15+
"repository": {
16+
"type": "git",
17+
"url": "[email protected]:albertein/pictofun.git"
18+
},
19+
"devDependencies": {
20+
},
21+
"dependencies": {
22+
"no-kafka": "^2.5.6",
23+
"express": "^4.14.0",
24+
"socket.io": "^1.4.8",
25+
"avsc": "^4.1.6"
26+
}
27+
}

pusher/static/images/.gitkeep

Whitespace-only changes.

pusher/static/index.html

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
<!DOCTYPE html>
2+
<html>
3+
<head>
4+
<title>Fun stuff</title>
5+
<script src='/socket.io/socket.io.js'></script>
6+
<script src='script.js'></script>
7+
</head>
8+
<body>
9+
<img id='player'/>
10+
<p id='time' style='font-size: x-large'></p>
11+
</body>
12+
</html>
13+

pusher/static/script.js

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
'use strict';
2+
3+
function init() {
4+
var socket = window.io();
5+
socket.on('image', updateImage);
6+
}
7+
8+
function updateImage(data) {
9+
var image = document.getElementById('player');
10+
var time = document.getElementById('time');
11+
image.src = '/images/' + data.path;
12+
time.innerText = getConsumerLag(data.captureTimestamp) + ' seconds of consumer lag';
13+
}
14+
15+
function getConsumerLag(captureTimestamp) {
16+
const captureDate = new Date(0);
17+
captureDate.setUTCSeconds(captureTimestamp);
18+
return (new Date() - captureDate) / 1000;
19+
}
20+
21+
init();

0 commit comments

Comments
 (0)