Skip to content
This repository was archived by the owner on Mar 26, 2021. It is now read-only.

Commit

Permalink
Поправить обработку данных #7
Browse files Browse the repository at this point in the history
wolf1996 committed Jul 1, 2018
1 parent 088e9cd commit 8f8a1d5
Showing 26 changed files with 992 additions and 109 deletions.
4 changes: 0 additions & 4 deletions .vscode/settings.json

This file was deleted.

20 changes: 12 additions & 8 deletions depends/python/data_model/sensor_data.py
Original file line number Diff line number Diff line change
@@ -17,23 +17,27 @@ def __init__(self, mongo):
self.__mgocli = mongo
self.schema = SensorDataModel.Schema()

def get_data_by_period(self, sensor_id, frm, to):
def get_data_by_period(self, sensor_id, frm=None, to=None):
low = frm
hight = to
sen_id = self.get_sensor_id(sensor_id)
coll = self.__mgocli['sensors_data'][sen_id]
for i in coll.find({'timestamp':{'$lt':hight, '$gt':low}}).sort('timestamp'):
timestamp_args = {}
if low is not None:
timestamp_args["$gt"] = low
if hight is not None:
timestamp_args["$lt"] = hight
for i in coll.find({'timestamp':timestamp_args}).sort('timestamp'):
yield i

def get_data_from(self, sensor, frm, limit):
def get_data_from(self, sensor, frm=None, limit=0):
hight = to
sen_id = self.get_sensor_id(sensor['sensor_id'])
coll = self.__mgocli['sensors_data'][sen_id]
ind = 0
for i in coll.find({'timestamp':{'$lt':hight}}).sort('timestamp'):
ind += 1
if ind >= limit:
break
timestamp_args = {}
if hight is not None:
timestamp_args["$lt"] = hight
for i in coll.find({'timestamp':timestamp_args}).sort('timestamp').limit(limit):
yield i

def insert_data(self, data):
117 changes: 114 additions & 3 deletions depends/python/proto/data_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions depends/python/proto/data_pb2_grpc.py
Original file line number Diff line number Diff line change
@@ -19,6 +19,11 @@ def __init__(self, channel):
request_serializer=proto_dot_data__pb2.MeterQuery.SerializeToString,
response_deserializer=proto_dot_data__pb2.MeterData.FromString,
)
self.GetLimitedData = channel.unary_stream(
'/DataService/GetLimitedData',
request_serializer=proto_dot_data__pb2.TimeLimitedQuery.SerializeToString,
response_deserializer=proto_dot_data__pb2.MeterData.FromString,
)


class DataServiceServicer(object):
@@ -32,6 +37,13 @@ def GetSensorData(self, request, context):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def GetLimitedData(self, request, context):
# missing associated documentation comment in .proto file
pass
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')


def add_DataServiceServicer_to_server(servicer, server):
rpc_method_handlers = {
@@ -40,6 +52,11 @@ def add_DataServiceServicer_to_server(servicer, server):
request_deserializer=proto_dot_data__pb2.MeterQuery.FromString,
response_serializer=proto_dot_data__pb2.MeterData.SerializeToString,
),
'GetLimitedData': grpc.unary_stream_rpc_method_handler(
servicer.GetLimitedData,
request_deserializer=proto_dot_data__pb2.TimeLimitedQuery.FromString,
response_serializer=proto_dot_data__pb2.MeterData.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'DataService', rpc_method_handlers)
12 changes: 12 additions & 0 deletions protobuf/proto/data.proto
Original file line number Diff line number Diff line change
@@ -19,8 +19,20 @@ message MeterQuery {
SensorId sensor_id = 3;
}

message LimitQuery {
bool set = 1;
uint64 limit = 2;
}

message TimeLimitedQuery{
SensorId sensor_id = 1;
TimeQuery start = 2;
LimitQuery limit = 3;
}

service DataService{
rpc GetSensorData(MeterQuery) returns (stream MeterData){};
rpc GetLimitedData(TimeLimitedQuery) returns (stream MeterData){}
}

// python3 -m grpc_tools.protoc -Iprotobuf/data protobuf/data/stats.proto --python_out=./python/data/proto
2 changes: 1 addition & 1 deletion python/data/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
24
38
24 changes: 22 additions & 2 deletions python/data/data/app.py
Original file line number Diff line number Diff line change
@@ -11,20 +11,40 @@
import datetime
from data_model.sensor_data import SensorDataModel


# TODO: решить что-нибудь с дефолтными аргументами
class DataServiceServ(data_pb2_grpc.DataServiceServicer):
def __init__(self, model):
self.__model = model

def GetSensorData(self, request, context):
global client
low = datetime.datetime.fromtimestamp(request.low.timestamp)
hight = datetime.datetime.fromtimestamp(request.hight.timestamp)
low = None
if request.low.set:
low = datetime.datetime.fromtimestamp(request.low.timestamp)
hight = None
if request.hight.set:
hight = datetime.datetime.fromtimestamp(request.hight.timestamp)
logging.debug("Got sensor data request {}".format(str(request)))
for i in self.__model.get_data_by_period(request.sensor_id.sensor_id, low, hight):
logging.debug("Sending data{}".format(str(i)))
tss = int(time.mktime(i['timestamp'].timetuple()))
yield data_pb2.MeterData(value=i['value'], timestamp=tss, hash=i['hash'].encode())

def GetLimitedData(self, request, context):
global client
start = None
if request.start.set:
start = datetime.datetime.fromtimestamp(request.start.timestamp)
limit = 0
if request.limit.set:
limit = request.limit.limit
logging.debug("Got sensor data request {}".format(str(request)))
for i in self.__model.get_data_from(request.sensor_id.sensor_id, start, limit):
logging.debug("Sending data{}".format(str(i)))
tss = int(time.mktime(i['timestamp'].timetuple()))
yield data_pb2.MeterData(value=i['value'], timestamp=tss, hash=i['hash'].encode())


def run_consumer(mgocli, rabbitconf):
client = mgocli
39 changes: 25 additions & 14 deletions python/data/depends/data_model/sensor_data.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from marshmallow import Schema, fields
import datetime
import pymongo

class SensorDataModel(object):
@staticmethod
@@ -16,31 +17,41 @@ class Schema(Schema):
def __init__(self, mongo):
self.__mgocli = mongo
self.schema = SensorDataModel.Schema()
def get_data_by_period(self, sensor_id, frm, to):

def get_data_by_period(self, sensor_id, frm=None, to=None):
low = frm
hight = to
sen_id = self.get_sensor_id(sensor_id)
coll = self.__mgocli['sensors_data'][sen_id]
for i in coll.find({'timestamp':{'$lt':hight, '$gt':low}}).sort('timestamp'):
yield i
timestamp_args = {}
filt = {}
if low is not None:
timestamp_args["$gt"] = low
if hight is not None:
timestamp_args["$lt"] = hight
if timestamp_args:
filt['timestamp'] = timestamp_args
for i in coll.find(filt).sort('timestamp', pymongo.DESCENDING):
yield i

def get_data_from(self, sensor, frm, limit):
hight = to
sen_id = self.get_sensor_id(sensor['sensor_id'])
def get_data_from(self, sensor, frm=None, limit=0):
hight = frm
sen_id = self.get_sensor_id(sensor)
coll = self.__mgocli['sensors_data'][sen_id]
ind = 0
for i in coll.find({'timestamp':{'$lt':hight}}).sort('timestamp'):
ind += 1
if ind >= limit:
break
filt = {}
timestamp_args = {}
if hight is not None:
timestamp_args["$lt"] = hight
if timestamp_args:
filt['timestamp'] = timestamp_args
for i in coll.find(filt).sort('timestamp', pymongo.DESCENDING).limit(limit):
yield i

def insert_data(self, data):
errs = self.schema.validate(data)
if errs:
return errs
coll_id = self.get_sensor_id(data['sensor_id'])
data['timestamp'] = datetime.datetime.strptime( data['timestamp'], '%Y-%m-%dT%H:%M:%S')
data['timestamp'] = datetime.datetime.strptime(data['timestamp'], '%Y-%m-%dT%H:%M:%S')
self.__mgocli['sensors_data'][coll_id].insert_one(data)
return errs
117 changes: 114 additions & 3 deletions python/data/depends/proto/data_pb2.py
17 changes: 17 additions & 0 deletions python/data/depends/proto/data_pb2_grpc.py
Original file line number Diff line number Diff line change
@@ -19,6 +19,11 @@ def __init__(self, channel):
request_serializer=proto_dot_data__pb2.MeterQuery.SerializeToString,
response_deserializer=proto_dot_data__pb2.MeterData.FromString,
)
self.GetLimitedData = channel.unary_stream(
'/DataService/GetLimitedData',
request_serializer=proto_dot_data__pb2.TimeLimitedQuery.SerializeToString,
response_deserializer=proto_dot_data__pb2.MeterData.FromString,
)


class DataServiceServicer(object):
@@ -32,6 +37,13 @@ def GetSensorData(self, request, context):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def GetLimitedData(self, request, context):
# missing associated documentation comment in .proto file
pass
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')


def add_DataServiceServicer_to_server(servicer, server):
rpc_method_handlers = {
@@ -40,6 +52,11 @@ def add_DataServiceServicer_to_server(servicer, server):
request_deserializer=proto_dot_data__pb2.MeterQuery.FromString,
response_serializer=proto_dot_data__pb2.MeterData.SerializeToString,
),
'GetLimitedData': grpc.unary_stream_rpc_method_handler(
servicer.GetLimitedData,
request_deserializer=proto_dot_data__pb2.TimeLimitedQuery.FromString,
response_serializer=proto_dot_data__pb2.MeterData.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'DataService', rpc_method_handlers)
2 changes: 1 addition & 1 deletion python/gateway/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
115
124
20 changes: 12 additions & 8 deletions python/gateway/depends/data_model/sensor_data.py
Original file line number Diff line number Diff line change
@@ -17,23 +17,27 @@ def __init__(self, mongo):
self.__mgocli = mongo
self.schema = SensorDataModel.Schema()

def get_data_by_period(self, sensor_id, frm, to):
def get_data_by_period(self, sensor_id, frm=None, to=None):
low = frm
hight = to
sen_id = self.get_sensor_id(sensor_id)
coll = self.__mgocli['sensors_data'][sen_id]
for i in coll.find({'timestamp':{'$lt':hight, '$gt':low}}).sort('timestamp'):
timestamp_args = {}
if low is not None:
timestamp_args["$gt"] = low
if hight is not None:
timestamp_args["$lt"] = hight
for i in coll.find({'timestamp':timestamp_args}).sort('timestamp'):
yield i

def get_data_from(self, sensor, frm, limit):
def get_data_from(self, sensor, frm=None, limit=0):
hight = to
sen_id = self.get_sensor_id(sensor['sensor_id'])
coll = self.__mgocli['sensors_data'][sen_id]
ind = 0
for i in coll.find({'timestamp':{'$lt':hight}}).sort('timestamp'):
ind += 1
if ind >= limit:
break
timestamp_args = {}
if hight is not None:
timestamp_args["$lt"] = hight
for i in coll.find({'timestamp':timestamp_args}).sort('timestamp').limit(limit):
yield i

def insert_data(self, data):
117 changes: 114 additions & 3 deletions python/gateway/depends/proto/data_pb2.py
17 changes: 17 additions & 0 deletions python/gateway/depends/proto/data_pb2_grpc.py
Original file line number Diff line number Diff line change
@@ -19,6 +19,11 @@ def __init__(self, channel):
request_serializer=proto_dot_data__pb2.MeterQuery.SerializeToString,
response_deserializer=proto_dot_data__pb2.MeterData.FromString,
)
self.GetLimitedData = channel.unary_stream(
'/DataService/GetLimitedData',
request_serializer=proto_dot_data__pb2.TimeLimitedQuery.SerializeToString,
response_deserializer=proto_dot_data__pb2.MeterData.FromString,
)


class DataServiceServicer(object):
@@ -32,6 +37,13 @@ def GetSensorData(self, request, context):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def GetLimitedData(self, request, context):
# missing associated documentation comment in .proto file
pass
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')


def add_DataServiceServicer_to_server(servicer, server):
rpc_method_handlers = {
@@ -40,6 +52,11 @@ def add_DataServiceServicer_to_server(servicer, server):
request_deserializer=proto_dot_data__pb2.MeterQuery.FromString,
response_serializer=proto_dot_data__pb2.MeterData.SerializeToString,
),
'GetLimitedData': grpc.unary_stream_rpc_method_handler(
servicer.GetLimitedData,
request_deserializer=proto_dot_data__pb2.TimeLimitedQuery.FromString,
response_serializer=proto_dot_data__pb2.MeterData.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'DataService', rpc_method_handlers)
21 changes: 11 additions & 10 deletions python/gateway/gateway/app.py
Original file line number Diff line number Diff line change
@@ -8,10 +8,10 @@
import gateway.resources_v2.controller as controllerv2
import gateway.resources_v2.user as userv2
# для того чтобы хорошо генерировались импорты сгенерированных файлов делается
import proto.data_pb2_grpc
import proto.data_pb2
import proto.stats_pb2_grpc
import proto.stats_pb2
from proto import data_pb2_grpc
from proto import data_pb2
from proto import stats_pb2_grpc
from proto import stats_pb2
import grpc

app = Flask(__name__)
@@ -22,11 +22,11 @@
objs = grpc.insecure_channel('object-service:5000')
userch = grpc.insecure_channel('users-service:5000')

args = {'stats': stats, 'data': data, 'object':objs, 'user':userch}
args = {'stats': stats, 'data': data, 'object': objs, 'user': userch}

# Да, я делаю это неправильно
api.add_resource(user.SignIn, '/user/sign_in', resource_class_kwargs=args)
#api.add_resource(UserInfo, '/user/sign_up', resource_class_kwargs=args)
# api.add_resource(UserInfo, '/user/sign_up', resource_class_kwargs=args)
api.add_resource(objects.AddObject, '/object/register', resource_class_kwargs=args)
api.add_resource(controller.AddController, '/controller/register', resource_class_kwargs=args)
api.add_resource(sensor.AddSensor, '/sensor/register', resource_class_kwargs=args)
@@ -38,10 +38,11 @@
api.add_resource(controller.GetControllerSensors, '/controller/<int:controller_id>/get_sensors', resource_class_kwargs=args)
api.add_resource(controller.GetControllerStats, '/controller/<int:controller_id>/get_controller_stats', resource_class_kwargs=args)
api.add_resource(sensor.GetSensorStats, '/sensor/<int:sensor_id>/view_stats', resource_class_kwargs=args)
api.add_resource(sensor.GetSensorData, '/sensor/<int:sensor_id>/get_data', resource_class_kwargs=args)
api.add_resource(sensor.GetSensorDataLimited, '/sensor/<int:sensor_id>/get_data', resource_class_kwargs=args)
api.add_resource(sensor.GetUserSensors, '/sensor/get_user_sensors', resource_class_kwargs=args)

# V2 мать его (лучшеб в отдельном приложении, потом надо переделать вместе с v2 аpi впринципе)
api.add_resource(controllerv2.Relations ,'/v2/controller/<int:_id>/relations', endpoint='contrRelations', resource_class_kwargs=args)
api.add_resource(objectsv2.Relations ,'/v2/object/<int:_id>/relations', endpoint='objectRelations',resource_class_kwargs=args)
api.add_resource(userv2.Relations ,'/v2/user/relations', endpoint='userRelations',resource_class_kwargs=args)
api.add_resource(controllerv2.Relations, '/v2/controller/<int:_id>/relations', endpoint='contrRelations', resource_class_kwargs=args)
api.add_resource(objectsv2.Relations, '/v2/object/<int:_id>/relations', endpoint='objectRelations',resource_class_kwargs=args)
api.add_resource(userv2.Relations, '/v2/user/relations', endpoint='userRelations', resource_class_kwargs=args)
api.add_resource(sensor.GetSensorDataPeriod, '/v2/sensor/<int:sensor_id>/get_data_period', resource_class_kwargs=args)
92 changes: 74 additions & 18 deletions python/gateway/gateway/resources/sensor.py
Original file line number Diff line number Diff line change
@@ -48,20 +48,74 @@ def get(self, sensor_id):
stats["msg"] = stats_resp
return stats, 200

class GetSensorData(Resource):
class GetSensorDataLimited(Resource):
def __init__(self, **kwargs):
self.data_chan = kwargs['data']
self.stats_chan = kwargs['stats']


def get(self, sensor_id):
#
data = {
"code": 0,
"msg": []
"code": 0,
"msg": []
}
stub = data_pb2_grpc.DataServiceStub(self.data_chan)
id = utils_pb2.SensorId(sensor_id=sensor_id)
parser = reqparse.RequestParser()
parser = reqparse.RequestParser()
parser.add_argument("from")
parser.add_argument("limit")
args = parser.parse_args()
frm = args["from"]
limit = args["limit"]
lim = None
if limit:
if isinstance(limit, list) and len(limit) > 1:
return InvalidRequest("Too many values to limit").get_message()
try:
lim = data_pb2.LimitQuery(set=True, limit=int(limit))
except ValueError:
return InvalidRequest("Failed to parse \"limit\"").get_message()
else:
lim = data_pb2.LimitQuery(set=False, limit=0)
if frm:
if isinstance(frm, list) and len(frm) > 1:
return InvalidRequest("too many values to \"from\"").get_message()
try:
dt = datetime.datetime.strptime(frm, "%Y-%m-%dT%H:%M:%S")
frm = data_pb2.TimeQuery(set=True, timestamp=int(time.mktime(dt.timetuple())))
except ValueError:
return InvalidRequest("Failed to parse \"from\" date time").get_message()
else:
frm = data_pb2.TimeQuery(set=False, timestamp=0)
log.debug("from is {}".format(str(frm)))
mq = data_pb2.TimeLimitedQuery(start=frm, limit=lim, sensor_id=id)
data_resp = []
for i in stub.GetLimitedData(mq):
dt = {
"sensor_id": sensor_id,
"date": datetime.datetime.fromtimestamp(i.timestamp).strftime('%Y-%m-%d %H:%M:%S'),
"value": i.value,
"hash": base64.b64encode(i.hash).decode('UTF-8')
}
data_resp.append(dt)
data["msg"] = data_resp
return data, 200


class GetSensorDataPeriod(Resource):
def __init__(self, **kwargs):
self.data_chan = kwargs['data']
self.stats_chan = kwargs['stats']

def get(self, sensor_id):
data = {
"code": 0,
"msg": []
}
stub = data_pb2_grpc.DataServiceStub(self.data_chan)
id = utils_pb2.SensorId(sensor_id=sensor_id)
parser = reqparse.RequestParser()
parser.add_argument("from")
parser.add_argument("to")
args = parser.parse_args()
@@ -71,30 +125,30 @@ def get(self, sensor_id):
if isinstance(to, list) and len(to) > 1:
return InvalidRequest("too many values to \"to\"").get_message()
try:
h = datetime.datetime.strptime( to, "%Y-%m-%dT%H:%M:%S")
to = datetime.datetime.strptime(to, "%Y-%m-%dT%H:%M:%S")
to = data_pb2.TimeQuery(set=True, timestamp=int(time.mktime(to.timetuple())))
except ValueError:
return InvalidRequest("Failed to parse \"to\" date time").get_message()
else:
h = datetime.datetime.now()
to = data_pb2.TimeQuery(set=False, timestamp=0)
if frm:
if isinstance(frm, list) and len(frm) > 1:
return InvalidRequest("too many values to \"from\"").get_message()
try:
l = datetime.datetime.strptime( frm, "%Y-%m-%dT%H:%M:%S")
frm = datetime.datetime.strptime(frm, "%Y-%m-%dT%H:%M:%S")
frm = data_pb2.TimeQuery(set=True, timestamp=int(time.mktime(frm.timetuple())))
except ValueError:
return InvalidRequest("Failed to parse \"from\" date time").get_message()
else:
l = h - datetime.timedelta(days=7)
low = data_pb2.TimeQuery(timestamp= int(time.mktime(l.timetuple())))
hight = data_pb2.TimeQuery(timestamp=int(time.mktime(h.timetuple())))
mq = data_pb2.MeterQuery(low=low, hight=hight, sensor_id=id)
frm = data_pb2.TimeQuery(set=False, timestamp=0)
mq = data_pb2.MeterQuery(low=frm, hight=to, sensor_id=id)
data_resp = []
for i in stub.GetSensorData(mq):
for i in stub.GetSensorData(mq):
dt = {
"sensor_id": sensor_id,
"date": datetime.datetime.fromtimestamp(i.timestamp).strftime('%Y-%m-%d %H:%M:%S'),
"value": i.value,
"hash": base64.b64encode(i.hash).decode('UTF-8')
"sensor_id": sensor_id,
"date": datetime.datetime.fromtimestamp(i.timestamp).strftime('%Y-%m-%d %H:%M:%S'),
"value": i.value,
"hash": base64.b64encode(i.hash).decode('UTF-8')
}
data_resp.append(dt)
data["msg"] = data_resp
@@ -113,6 +167,7 @@ def post(self):
company = body['company']
return Posted().get_message()


class GetUserSensors(Resource):
def __init__(self, **kwargs):
self.data_chan = kwargs['data']
@@ -150,8 +205,9 @@ def sensor(ssr):
None,
[sensor(i) for i in cntrlr.sensors])
if cntrlr.HasField("deactivation_date_val"):
ctr.deactivation_date = ssr.deactivation_date_val
ctr.deactivation_date = cntrlr.deactivation_date_val
return ctr

def obct(rsp):
uo = ObjectInfo(
rsp.id,
@@ -164,4 +220,4 @@ def obct(rsp):
rsp.id,
[obct(i) for i in rsp.objects]
)
return uo.get_message()
return uo.get_message()
1 change: 0 additions & 1 deletion python/gateway/gateway/resources_v2/user.py
Original file line number Diff line number Diff line change
@@ -24,7 +24,6 @@
log = logging.getLogger("flask.app")


#TODO: отрефакторить это чтобы избавиться от копипасты
class Relations(Resource):
def __init__(self, **kwargs):
self.data_chan = kwargs['data']
20 changes: 12 additions & 8 deletions python/object/depends/data_model/sensor_data.py
Original file line number Diff line number Diff line change
@@ -17,23 +17,27 @@ def __init__(self, mongo):
self.__mgocli = mongo
self.schema = SensorDataModel.Schema()

def get_data_by_period(self, sensor_id, frm, to):
def get_data_by_period(self, sensor_id, frm=None, to=None):
low = frm
hight = to
sen_id = self.get_sensor_id(sensor_id)
coll = self.__mgocli['sensors_data'][sen_id]
for i in coll.find({'timestamp':{'$lt':hight, '$gt':low}}).sort('timestamp'):
timestamp_args = {}
if low is not None:
timestamp_args["$gt"] = low
if hight is not None:
timestamp_args["$lt"] = hight
for i in coll.find({'timestamp':timestamp_args}).sort('timestamp'):
yield i

def get_data_from(self, sensor, frm, limit):
def get_data_from(self, sensor, frm=None, limit=0):
hight = to
sen_id = self.get_sensor_id(sensor['sensor_id'])
coll = self.__mgocli['sensors_data'][sen_id]
ind = 0
for i in coll.find({'timestamp':{'$lt':hight}}).sort('timestamp'):
ind += 1
if ind >= limit:
break
timestamp_args = {}
if hight is not None:
timestamp_args["$lt"] = hight
for i in coll.find({'timestamp':timestamp_args}).sort('timestamp').limit(limit):
yield i

def insert_data(self, data):
117 changes: 114 additions & 3 deletions python/object/depends/proto/data_pb2.py
17 changes: 17 additions & 0 deletions python/object/depends/proto/data_pb2_grpc.py
Original file line number Diff line number Diff line change
@@ -19,6 +19,11 @@ def __init__(self, channel):
request_serializer=proto_dot_data__pb2.MeterQuery.SerializeToString,
response_deserializer=proto_dot_data__pb2.MeterData.FromString,
)
self.GetLimitedData = channel.unary_stream(
'/DataService/GetLimitedData',
request_serializer=proto_dot_data__pb2.TimeLimitedQuery.SerializeToString,
response_deserializer=proto_dot_data__pb2.MeterData.FromString,
)


class DataServiceServicer(object):
@@ -32,6 +37,13 @@ def GetSensorData(self, request, context):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def GetLimitedData(self, request, context):
# missing associated documentation comment in .proto file
pass
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')


def add_DataServiceServicer_to_server(servicer, server):
rpc_method_handlers = {
@@ -40,6 +52,11 @@ def add_DataServiceServicer_to_server(servicer, server):
request_deserializer=proto_dot_data__pb2.MeterQuery.FromString,
response_serializer=proto_dot_data__pb2.MeterData.SerializeToString,
),
'GetLimitedData': grpc.unary_stream_rpc_method_handler(
servicer.GetLimitedData,
request_deserializer=proto_dot_data__pb2.TimeLimitedQuery.FromString,
response_serializer=proto_dot_data__pb2.MeterData.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'DataService', rpc_method_handlers)
20 changes: 12 additions & 8 deletions python/stats/depends/data_model/sensor_data.py
Original file line number Diff line number Diff line change
@@ -17,23 +17,27 @@ def __init__(self, mongo):
self.__mgocli = mongo
self.schema = SensorDataModel.Schema()

def get_data_by_period(self, sensor_id, frm, to):
def get_data_by_period(self, sensor_id, frm=None, to=None):
low = frm
hight = to
sen_id = self.get_sensor_id(sensor_id)
coll = self.__mgocli['sensors_data'][sen_id]
for i in coll.find({'timestamp':{'$lt':hight, '$gt':low}}).sort('timestamp'):
timestamp_args = {}
if low is not None:
timestamp_args["$gt"] = low
if hight is not None:
timestamp_args["$lt"] = hight
for i in coll.find({'timestamp':timestamp_args}).sort('timestamp'):
yield i

def get_data_from(self, sensor, frm, limit):
def get_data_from(self, sensor, frm=None, limit=0):
hight = to
sen_id = self.get_sensor_id(sensor['sensor_id'])
coll = self.__mgocli['sensors_data'][sen_id]
ind = 0
for i in coll.find({'timestamp':{'$lt':hight}}).sort('timestamp'):
ind += 1
if ind >= limit:
break
timestamp_args = {}
if hight is not None:
timestamp_args["$lt"] = hight
for i in coll.find({'timestamp':timestamp_args}).sort('timestamp').limit(limit):
yield i

def insert_data(self, data):
117 changes: 114 additions & 3 deletions python/stats/depends/proto/data_pb2.py
17 changes: 17 additions & 0 deletions python/stats/depends/proto/data_pb2_grpc.py
Original file line number Diff line number Diff line change
@@ -19,6 +19,11 @@ def __init__(self, channel):
request_serializer=proto_dot_data__pb2.MeterQuery.SerializeToString,
response_deserializer=proto_dot_data__pb2.MeterData.FromString,
)
self.GetLimitedData = channel.unary_stream(
'/DataService/GetLimitedData',
request_serializer=proto_dot_data__pb2.TimeLimitedQuery.SerializeToString,
response_deserializer=proto_dot_data__pb2.MeterData.FromString,
)


class DataServiceServicer(object):
@@ -32,6 +37,13 @@ def GetSensorData(self, request, context):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def GetLimitedData(self, request, context):
# missing associated documentation comment in .proto file
pass
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')


def add_DataServiceServicer_to_server(servicer, server):
rpc_method_handlers = {
@@ -40,6 +52,11 @@ def add_DataServiceServicer_to_server(servicer, server):
request_deserializer=proto_dot_data__pb2.MeterQuery.FromString,
response_serializer=proto_dot_data__pb2.MeterData.SerializeToString,
),
'GetLimitedData': grpc.unary_stream_rpc_method_handler(
servicer.GetLimitedData,
request_deserializer=proto_dot_data__pb2.TimeLimitedQuery.FromString,
response_serializer=proto_dot_data__pb2.MeterData.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'DataService', rpc_method_handlers)
20 changes: 12 additions & 8 deletions python/users/depends/data_model/sensor_data.py
Original file line number Diff line number Diff line change
@@ -17,23 +17,27 @@ def __init__(self, mongo):
self.__mgocli = mongo
self.schema = SensorDataModel.Schema()

def get_data_by_period(self, sensor_id, frm, to):
def get_data_by_period(self, sensor_id, frm=None, to=None):
low = frm
hight = to
sen_id = self.get_sensor_id(sensor_id)
coll = self.__mgocli['sensors_data'][sen_id]
for i in coll.find({'timestamp':{'$lt':hight, '$gt':low}}).sort('timestamp'):
timestamp_args = {}
if low is not None:
timestamp_args["$gt"] = low
if hight is not None:
timestamp_args["$lt"] = hight
for i in coll.find({'timestamp':timestamp_args}).sort('timestamp'):
yield i

def get_data_from(self, sensor, frm, limit):
def get_data_from(self, sensor, frm=None, limit=0):
hight = to
sen_id = self.get_sensor_id(sensor['sensor_id'])
coll = self.__mgocli['sensors_data'][sen_id]
ind = 0
for i in coll.find({'timestamp':{'$lt':hight}}).sort('timestamp'):
ind += 1
if ind >= limit:
break
timestamp_args = {}
if hight is not None:
timestamp_args["$lt"] = hight
for i in coll.find({'timestamp':timestamp_args}).sort('timestamp').limit(limit):
yield i

def insert_data(self, data):
117 changes: 114 additions & 3 deletions python/users/depends/proto/data_pb2.py
17 changes: 17 additions & 0 deletions python/users/depends/proto/data_pb2_grpc.py
Original file line number Diff line number Diff line change
@@ -19,6 +19,11 @@ def __init__(self, channel):
request_serializer=proto_dot_data__pb2.MeterQuery.SerializeToString,
response_deserializer=proto_dot_data__pb2.MeterData.FromString,
)
self.GetLimitedData = channel.unary_stream(
'/DataService/GetLimitedData',
request_serializer=proto_dot_data__pb2.TimeLimitedQuery.SerializeToString,
response_deserializer=proto_dot_data__pb2.MeterData.FromString,
)


class DataServiceServicer(object):
@@ -32,6 +37,13 @@ def GetSensorData(self, request, context):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def GetLimitedData(self, request, context):
# missing associated documentation comment in .proto file
pass
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')


def add_DataServiceServicer_to_server(servicer, server):
rpc_method_handlers = {
@@ -40,6 +52,11 @@ def add_DataServiceServicer_to_server(servicer, server):
request_deserializer=proto_dot_data__pb2.MeterQuery.FromString,
response_serializer=proto_dot_data__pb2.MeterData.SerializeToString,
),
'GetLimitedData': grpc.unary_stream_rpc_method_handler(
servicer.GetLimitedData,
request_deserializer=proto_dot_data__pb2.TimeLimitedQuery.FromString,
response_serializer=proto_dot_data__pb2.MeterData.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'DataService', rpc_method_handlers)

0 comments on commit 8f8a1d5

Please sign in to comment.