-
Notifications
You must be signed in to change notification settings - Fork 22
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Including __init__tests too, which seem to be pretty good.
- Loading branch information
Showing
2 changed files
with
140 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
""" | ||
This plugin delays processing of messages by *message_delay* seconds | ||
sarracenia.flowcb.msg.fdelay 30 | ||
import sarracenia.flowcb.filter.fdelay.Fdelay | ||
or more simply: | ||
fdelay 30 | ||
callback filter.fdelay | ||
every message will be at least 30 seconds old before it is forwarded by this plugin. | ||
in the meantime, the message is placed on the retry queue by marking it as failed. | ||
""" | ||
import logging | ||
|
||
from sarracenia.flowcb import FlowCB | ||
|
||
import json | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class GeoJSON(FlowCB): | ||
def __init__(self, options): | ||
|
||
super().__init__(options,logger) | ||
|
||
logging.basicConfig(format=self.o.logFormat, | ||
level=getattr(logging, self.o.logLevel.upper())) | ||
|
||
self.o.add_option('geometry', 'list', []) | ||
|
||
self.geometry_geojson = None | ||
if hasattr(self.o, 'geometry') and self.o.geometry != []: | ||
try: | ||
self.geometry_geojson = json.loads("\n".join(self.o.geometry)) | ||
except json.decoder.JSONDecodeError as err: | ||
logger.error(f"error parsing geometry from configuration file: {err}") | ||
raise | ||
|
||
def after_accept(self, worklist): | ||
outgoing = [] | ||
|
||
for m in worklist.incoming: | ||
|
||
#if geometry isn't configured, or the message doesn't have geometry | ||
#reject the message, and continue | ||
if 'geometry' not in m or self.geometry_geojson == None: | ||
logger.debug('No geometry found in message, or geometry not configured; rejecting') | ||
worklist.rejected.append(m) | ||
continue | ||
|
||
#Parse the message geometry field, and Json-ize it | ||
try: | ||
#We're just going to trust that geometry is a properly formatted GeoJSON object | ||
# Ultimately, if it's not, some of the logic in following sections will fail, and we'll have to catch those errors then | ||
message_geometry = json.loads(m['geometry']) | ||
except json.decoder.JSONDecodeError as err: | ||
logger.error(f"error parsing message geometry: {err}") | ||
|
||
geomotries_overlap = False | ||
try: | ||
# do the comparison, and figure out if the configured geometry contains the message's | ||
pass | ||
|
||
except err: | ||
# catch comparison errors, and add to "failed", logging a message | ||
worklist.failed.append(m) | ||
logger.error(f"error comparing: {err}") | ||
continue | ||
|
||
|
||
if geomotries_overlap: | ||
# If the message's GeoJSON point is in/intersects with the configured GeoJSON | ||
#add the message to outgoing | ||
outgoing.append(m) | ||
else: | ||
#add to rejected? | ||
worklist.rejected.append(m) | ||
|
||
worklist.incoming = outgoing |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
import pytest | ||
import types | ||
import json | ||
from tests.conftest import * | ||
#from unittest.mock import Mock | ||
|
||
import sarracenia.config | ||
import sarracenia.flowcb.filter.geojson | ||
|
||
from sarracenia import Message as SR3Message | ||
import sarracenia.config | ||
|
||
def make_worklist(): | ||
WorkList = types.SimpleNamespace() | ||
WorkList.ok = [] | ||
WorkList.incoming = [] | ||
WorkList.rejected = [] | ||
WorkList.failed = [] | ||
WorkList.directories_ok = [] | ||
return WorkList | ||
|
||
def make_message(): | ||
m = SR3Message() | ||
m['new_file'] = '/foo/bar/NewFile.txt' | ||
m['new_dir'] = '/foo/bar' | ||
|
||
return m | ||
|
||
def test___init__(): | ||
options = sarracenia.config.default_config() | ||
options.logLevel = 'DEBUG' | ||
|
||
# Basic, happy path, without configured geometry | ||
geojson = sarracenia.flowcb.filter.geojson.GeoJSON(options) | ||
assert geojson.geometry_geojson == None | ||
|
||
|
||
# happy path with configured geometry | ||
options.geometry = [ | ||
'{"type": "Polygon",', | ||
' "coordinates": [', | ||
' [', | ||
' [-10.0, -10.0],', | ||
' [10.0, -10.0],', | ||
' [10.0, 10.0],', | ||
' [-10.0, -10.0]', | ||
' ]', | ||
' ]', | ||
'}' | ||
] | ||
geojson = sarracenia.flowcb.filter.geojson.GeoJSON(options) | ||
assert geojson.geometry_geojson['type'] == "Polygon" | ||
|
||
#unhappy path, with garbage geometry | ||
options.geometry = ['lkjasdf'] | ||
with pytest.raises(json.decoder.JSONDecodeError): | ||
geojson = sarracenia.flowcb.filter.geojson.GeoJSON(options) |