Skip to content

Commit 202fb1b

Browse files
FIX: More generic way of using different (de)-serializers
Signed-off-by: Sebastian Waldbauer <[email protected]>
1 parent 5697faf commit 202fb1b

File tree

24 files changed

+205
-125
lines changed

24 files changed

+205
-125
lines changed

intelmq/bin/intelmqdump.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,7 @@ def main():
349349
if queue_name in pipeline_pipes:
350350
if runtime_config[pipeline_pipes[queue_name]]['group'] == 'Parser' and json.loads(msg)['__type'] == 'Event':
351351
print('Event converted to Report automatically.')
352-
msg = message.Report(message.MessageFactory.unserialize(msg)).serialize()
352+
msg = message.Report(message.MessageFactory.deserialize(msg)).serialize()
353353
else:
354354
print(red("The given queue '{}' is not configured. Please retry with a valid queue.".format(queue_name)))
355355
break

intelmq/bots/collectors/amqp/collector_amqp.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ def process(self):
7979
self.logger.exception('Error receiving messages.')
8080
else:
8181
if self.expect_intelmq_message:
82-
message = MessageFactory.unserialize(body.decode())
82+
message = MessageFactory.deserialize(body.decode())
8383
self.send_message(message, auto_add=False)
8484
else:
8585
report = self.new_report()

intelmq/bots/parsers/json/parser.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ def process(self):
2626
lines = [base64_decode(report['raw'])]
2727

2828
for line in lines:
29-
new_event = MessageFactory.unserialize(line,
29+
new_event = MessageFactory.deserialize(line,
3030
harmonization=self.harmonization,
3131
default_type='Event',
3232
use_packer="json")

intelmq/lib/bot.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import inspect
1818
import io
1919
import json
20-
import msgpack
2120
import logging
2221
import os
2322
import re
@@ -99,6 +98,7 @@ class Bot(object):
9998
statistics_host: str = "127.0.0.1"
10099
statistics_password: Optional[str] = None
101100
statistics_port: int = 6379
101+
pipeline_use_packer: str = os.environ.get('INTELMQ_USE_PACKER', 'MsgPack')
102102

103103
_message_processed_verb: str = 'Processed'
104104

@@ -321,8 +321,8 @@ def start(self, starting: bool = True, error_on_pipeline: bool = True,
321321
self.logger.error('Pipeline failed.')
322322
self.__disconnect_pipelines()
323323

324-
except exceptions.UnserializationError as exc:
325-
self.logger.exception('Could not unserialize message from pipeline. No retries useful.')
324+
except exceptions.DeserializationError as exc:
325+
self.logger.exception('Could not deserialize message from pipeline. No retries useful.')
326326

327327
# ensure that we do not re-process the faulty message
328328
self.__error_retries_counter = self.error_max_retries + 1
@@ -657,7 +657,7 @@ def receive_message(self):
657657
return self.receive_message()
658658

659659
try:
660-
self.__current_message = libmessage.MessageFactory.unserialize(message,
660+
self.__current_message = libmessage.MessageFactory.deserialize(message,
661661
harmonization=self.harmonization)
662662
except exceptions.InvalidKey as exc:
663663
# In case a incoming message is malformed an does not conform with the currently
@@ -808,7 +808,7 @@ def __init_logger(self):
808808

809809
def __log_configuration_parameter(self, config_name: str, option: str, value: Any):
810810
if "password" in option or "token" in option:
811-
value = "HIDDEN"
811+
value = "<redacted>"
812812

813813
message = "{} configuration: parameter {!r} loaded with value {!r}." \
814814
.format(config_name.title(), option, value)
@@ -1369,9 +1369,8 @@ def export_event(self, event: libmessage.Event,
13691369
if 'raw' in event:
13701370
del event['raw']
13711371
if return_type is str:
1372-
return event.to_json(hierarchical=self.hierarchical,
1373-
with_type=self.with_type,
1374-
jsondict_as_string=self.jsondict_as_string)
1372+
return event.to_pack("JSON", hierarchical=self.hierarchical,
1373+
with_type=self.with_type)
13751374
else:
13761375
retval = event.to_dict(hierarchical=self.hierarchical,
13771376
with_type=self.with_type,

intelmq/lib/bot_debugger.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ def outputappend(self, msg):
169169
def arg2msg(self, msg):
170170
default_type = "Report" if (self.runtime_configuration.get("group", None) == "Parser" or isinstance(self.instance, ParserBot)) else "Event"
171171
try:
172-
msg = MessageFactory.unserialize(msg, default_type=default_type)
172+
msg = MessageFactory.deserialize(msg, default_type=default_type)
173173
except (Exception, KeyError, TypeError, ValueError) as exc:
174174
if exists(msg):
175175
with open(msg, "r") as f:

intelmq/lib/exceptions.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -174,10 +174,24 @@ def __init__(self, encodings=None, exception: UnicodeDecodeError = None,
174174
super().__init__("Could not decode string%s." % suffix)
175175

176176

177-
class UnserializationError(IntelMQException, ValueError):
177+
class DeserializationError(IntelMQException, ValueError):
178178
"""
179-
Unrecoverable error during message unserialization
179+
Unrecoverable error during message deserialization
180180
"""
181181
def __init__(self, exception: Exception = None, object: bytes = None):
182182
self.object = object
183-
super().__init__("Could not unserialize message%s." % exception)
183+
super().__init__("Could not deserialize message, %s." % exception)
184+
185+
186+
class SerializationError(IntelMQException, ValueError):
187+
"""
188+
Unrecoverable error during message serialization
189+
"""
190+
def __init__(self, exception: Exception = None, object: bytes = None):
191+
self.object = object
192+
super().__init__("Could not serialize message, %s." % exception)
193+
194+
195+
class MissingPackerError(IntelMQException):
196+
def __init__(self, packer: str):
197+
super().__init__(f"Could not load '{packer}' as packer, please check intelmq.lib.packers.{packer.lower()} and documentation")

intelmq/lib/message.py

Lines changed: 40 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -9,25 +9,27 @@
99
Use MessageFactory to get a Message object (types Report and Event).
1010
"""
1111
import hashlib
12+
import importlib
13+
import inspect
1214
import json
1315
import re
1416
import warnings
1517
from collections import defaultdict
1618
from typing import Any, Dict, Iterable, Optional, Sequence, Union
17-
import msgpack
1819

1920
import intelmq.lib.exceptions as exceptions
2021
import intelmq.lib.harmonization
2122
from intelmq import HARMONIZATION_CONF_FILE
2223
from intelmq.lib import utils
24+
from intelmq.lib.packers.packer import Packer
2325

2426
__all__ = ['Event', 'Message', 'MessageFactory', 'Report']
2527
VALID_MESSSAGE_TYPES = ('Event', 'Message', 'Report')
2628

2729

2830
class MessageFactory(object):
2931
"""
30-
unserialize: JSON encoded message to object
32+
deserialize: JSON encoded message to object
3133
serialize: object to JSON encoded object
3234
"""
3335

@@ -43,7 +45,7 @@ def from_dict(message: dict, harmonization=None,
4345
default_type: If '__type' is not present in message, the given type will be used
4446
4547
See also:
46-
MessageFactory.unserialize
48+
MessageFactory.deserialize
4749
MessageFactory.serialize
4850
"""
4951
if default_type and "__type" not in message:
@@ -59,8 +61,8 @@ def from_dict(message: dict, harmonization=None,
5961
return class_reference(message, auto=True, harmonization=harmonization)
6062

6163
@staticmethod
62-
def unserialize(raw_message: bytes, harmonization: dict = None,
63-
default_type: Optional[str] = None, use_packer: str = "msgpack") -> dict:
64+
def deserialize(raw_message: bytes, harmonization: dict = None,
65+
default_type: Optional[str] = None, use_packer: str = "MsgPack") -> dict:
6466
"""
6567
Takes JSON-encoded Message object, returns instance of correct class.
6668
@@ -73,19 +75,18 @@ def unserialize(raw_message: bytes, harmonization: dict = None,
7375
MessageFactory.from_dict
7476
MessageFactory.serialize
7577
"""
76-
message = Message.unserialize(raw_message, use_packer=use_packer)
78+
message = Message.deserialize(raw_message, use_packer=use_packer)
7779
return MessageFactory.from_dict(message, harmonization=harmonization,
7880
default_type=default_type)
7981

8082
@staticmethod
81-
def serialize(message) -> bytes:
83+
def serialize(message, use_packer: str = 'MsgPack') -> bytes:
8284
"""
8385
Takes instance of message-derived class and makes JSON-encoded Message.
8486
8587
The class is saved in __type attribute.
8688
"""
87-
raw_message = Message.serialize(message)
88-
return raw_message
89+
return Message.serialize(message, use_packer=use_packer)
8990

9091

9192
class Message(dict):
@@ -305,36 +306,43 @@ def copy(self):
305306
return retval
306307

307308
def deep_copy(self):
308-
return MessageFactory.unserialize(MessageFactory.serialize(self),
309+
return MessageFactory.deserialize(MessageFactory.serialize(self),
309310
harmonization={self.__class__.__name__.lower(): self.harmonization_config})
310311

311312
def __str__(self):
312-
return self.serialize(use_packer="json")
313+
return self.serialize(use_packer="JSON")
313314

314-
def serialize(self, use_packer: str = "msgpack"):
315+
def serialize(self, use_packer: str = "MsgPack"):
315316
delete_type = False
316317
if '__type' not in self:
317318
delete_type = True
318319
self['__type'] = self.__class__.__name__
319320

320-
if use_packer == "json":
321-
packed = json.dumps(self)
322-
else:
323-
packed = msgpack.packb(self)
321+
try:
322+
packer: Packer = inspect.getmembers(importlib.import_module(f'intelmq.lib.packers.{use_packer.lower()}.packer'))[0][1]()
323+
except:
324+
raise exceptions.MissingPackerError(packer=use_packer)
325+
326+
try:
327+
packed = packer.serialize(data=self)
328+
except Exception as exc:
329+
raise exceptions.SerializationError(exception=exc, object=self)
324330

325331
if delete_type:
326332
del self['__type']
327333
return packed
328334

329335
@staticmethod
330-
def unserialize(message: bytes, use_packer: str = "msgpack"):
336+
def deserialize(message: bytes, use_packer: str = "MsgPack"):
331337
try:
332-
if use_packer == "json":
333-
return json.loads(message)
334-
else:
335-
return msgpack.unpackb(message, raw=False)
338+
packer: Packer = inspect.getmembers(importlib.import_module(f'intelmq.lib.packers.{use_packer.lower()}.packer'))[0][1]()
339+
except:
340+
raise exceptions.MissingPackerError(packer=use_packer)
341+
342+
try:
343+
return packer.deserialize(data=message)
336344
except Exception as exc:
337-
raise exceptions.UnserializationError(exception=exc, object=message)
345+
raise exceptions.DeserializationError(exception=exc, object=message)
338346

339347
def __is_valid_key(self, key: str):
340348
try:
@@ -485,13 +493,17 @@ def to_dict(self, hierarchical: bool = False, with_type: bool = False,
485493

486494
return new_dict
487495

488-
def to_json(self, hierarchical=False, with_type=False, jsondict_as_string=False):
489-
json_dict = self.to_dict(hierarchical=hierarchical, with_type=with_type)
490-
return json.dumps(json_dict, ensure_ascii=False, sort_keys=True)
496+
def to_pack(self, use_packer="MsgPack", hierarchical=False, with_type=False, **kwargs):
497+
try:
498+
packer: Packer = inspect.getmembers(importlib.import_module(f'intelmq.lib.packers.{use_packer.lower()}.packer'))[0][1]()
499+
except:
500+
raise exceptions.MissingPackerError(packer=use_packer)
491501

492-
def to_msgpack(self, hierarchical=False, with_type=False):
493-
msgpack_dict = self.to_dict(hierarchical=hierarchical, with_type=with_type)
494-
return msgpack.packb(msgpack_dict)
502+
try:
503+
data = self.to_dict(hierarchical=hierarchical, with_type=with_type)
504+
return packer.serialize(data, **kwargs)
505+
except Exception as exc:
506+
raise exceptions.SerializationError(exception=exc, object=self)
495507

496508
def __eq__(self, other: dict) -> bool:
497509
"""

intelmq/lib/packers/__init__.py

Whitespace-only changes.

intelmq/lib/packers/json/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# SPDX-FileCopyrightText: 2022 CERT.at GmbH <[email protected]>
2+
#
3+
# SPDX-License-Identifier: AGPL-3.0-or-later
4+
5+
# -*- coding: utf-8 -*-

intelmq/lib/packers/json/packer.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
# SPDX-FileCopyrightText: 2022 CERT.at GmbH <[email protected]>
2+
#
3+
# SPDX-License-Identifier: AGPL-3.0-or-later
4+
5+
# -*- coding: utf-8 -*-
6+
7+
from intelmq.lib.packers.packer import Packer
8+
import json
9+
10+
11+
class JSON(Packer):
12+
def __init__(self) -> None:
13+
super().__init__()
14+
15+
def serialize(self, data, **kwargs) -> bytes:
16+
return json.dumps(data, **kwargs)
17+
18+
def deserialize(self, data, **kwargs) -> object:
19+
return json.loads(data, **kwargs)

0 commit comments

Comments
 (0)