-
Notifications
You must be signed in to change notification settings - Fork 1
/
spark-flask.py
58 lines (52 loc) · 1.72 KB
/
spark-flask.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
from flask import stream_with_context, Flask, render_template, request, redirect, url_for, Response
import json
import os
import time
import pandas as pd
import numpy as np
from kafka import KafkaConsumer
import socket
import logging
import threading
import sys
# Validate the command line before any other start-up side effect (such as
# connecting to Kafka), so a missing argument yields a usage message instead
# of a bare IndexError traceback.
if len(sys.argv) < 2:
    sys.exit(f"usage: {sys.argv[0]} <url>")
# Consumer of the 'anomalies' topic on the local broker. It is shared by every
# /table-data stream and closed in the __main__ block.
consumer = KafkaConsumer('anomalies', bootstrap_servers=['localhost:9092'])
app = Flask(__name__)
# Passed to the landing-page template as ``data``.
url = sys.argv[1]
# Wall-clock times (time.time()) at which records were streamed to clients;
# written to the HSD_time*.csv files.
time_list = []
@app.route('/home')
def home():
    """Render the landing page, exposing the command-line ``url`` as ``data``."""
    template_context = {'data': url}
    return render_template('firstPage.html', **template_context)
@app.route('/table-data', methods=['GET', 'POST'])
def table_data():
    """Open a Server-Sent Events response fed by ``get_stream_data()``."""
    event_source = get_stream_data()
    return Response(event_source, mimetype="text/event-stream")
def get_stream_data():
    """Yield Kafka records from the module-level ``consumer`` as SSE events.

    Each message value is decoded as UTF-8 and parsed as JSON. A
    ``timestamp`` field, if present, is truncated to its first 19 characters
    (presumably ``YYYY-MM-DDTHH:MM:SS`` -- confirm against the producer).
    Only records with more than three keys are sent. For each sent record,
    the current ``time.time()`` is appended to the module-level ``time_list``.

    When the stream ends for any reason, all accumulated send times are
    written to ``HSD_time2.csv``. Ending reasons include the generator being
    closed after a client disconnect, or an exception.

    The shared ``consumer`` is deliberately NOT closed here. Every request
    iterates the same object, so closing it when one stream ends would break
    all later ``/table-data`` requests, including the browser's automatic
    EventSource reconnect. It is closed once, at process shutdown.

    Yields:
        str: ``"data:<json>\\n\\n"`` Server-Sent Events frames.
    """
    try:
        for msg in consumer:
            print('received')
            record = json.loads(msg.value.decode('utf-8'))
            if 'timestamp' in record:
                record['timestamp'] = record['timestamp'][:19]
            print(record)
            # Records with three or fewer keys are consumed but not streamed.
            if len(record) > 3:
                time_list.append(time.time())
                yield f"data:{json.dumps(record)}\n\n"
    finally:
        # Runs on normal exhaustion, on GeneratorExit (response closed), and
        # on any exception, so this one handler saves the timings on every
        # exit path without swallowing KeyboardInterrupt.
        pd.DataFrame({"time": time_list}).to_csv('HSD_time2.csv')
        print('stream closed')
if __name__ == "__main__":
    # debug=True normally also turns on Werkzeug's reloader. The reloader
    # re-runs this script in a child process that serves the requests, while
    # the original process only supervises it. The code after app.run() would
    # then also run in the supervisor, whose time_list is always empty, and
    # its HSD_time3.csv would replace the serving child's. Keep the debugger
    # but serve from this process so the timings and consumer live in one
    # place.
    app.run(debug=True, use_reloader=False)
    # Reached once the development server stops (e.g. Ctrl-C).
    pd.DataFrame({"time": time_list}).to_csv('HSD_time3.csv')
    consumer.close()
    print('Saved!')