Skip to content

Commit

Permalink
Add AM AHL headers and put bulletin contents in message (#900)
Browse files Browse the repository at this point in the history
* Add verbosity to amserver

* Add logic to add AHL headers

* Add bulletin contents to message

* Add finishing touches (identity, content specifications)

* Add option for AM header completion & make condition more precise

* Add binary bulletin compatability + correct bulletin size/checksum

* Add statement to handle binary data + improve description

* Add option for specifying binary types and decode binary in iso-8859-1

* Fix binary bulletins, fix missing_ahl, add generalized encoding for content reception
  • Loading branch information
andreleblanc11 authored Jan 29, 2024
1 parent d13f5c7 commit 6e58d65
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 21 deletions.
2 changes: 1 addition & 1 deletion sarracenia/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1136,7 +1136,7 @@ def write_inline_file(self, msg) -> bool:
if msg['content']['encoding'] == 'base64':
data = b64decode(msg['content']['value'])
else:
data = msg['content']['value'].encode('utf-8')
data = msg['content']['value'].encode(msg['content']['encoding'])

if self.o.identity_method.startswith('cod,'):
algo_method = self.o.identity_method[4:]
Expand Down
132 changes: 112 additions & 20 deletions sarracenia/flowcb/amserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,26 @@
To start, when on_start is called, the socket binds, listens and afterwards accepts a connection when one is received from a remote host.
The receiver acts like an amtcp server. Instances are split into child/parent once an outbound connection is accepted.
The child proceeds to the receiver service while the parent stays and keeps accepting connections.
In the service, bulletins are received and are stored locally inside of a given filepath.
In the service, bulletins are received and are stored in a newly built sarracenia message from the contents field.
With the `download` option set to true, the file contents can then be ingested through the sarracenia main flow.
Options:
AllowIPs (list): Filters outbound connections with provided IPs. All other IPs won't be accepted.
If unselected, accept all IPs.
AllowIPs (list):
Filters outbound connections with provided IPs. All other IPs won't be accepted.
If unselected, accept all IPs.
directory (string): Specifies the directory where the bulletin files are to be stored.
MissingAMHeaders (string):
Specify headers to be added inside of the file contents.
binaryInitialCharacters (list):
Binary bulletins are characterised by having certain sets of characters on its second line.
This option allows to customise which binary strings to look for to determine if a bulletin is binary or not.
directory (string):
Specifies the directory where the bulletin files are to be stored.
NOTE: AM protocol cannot correct data corruption and cannot be stopped without data loss.
Precautions should be taken during maintenance interventions.
Precautions should be taken during maintenance interventions
Platform:
Linux/Mac: because of process forking, this will not work on Windows.
Expand All @@ -39,6 +49,7 @@
"""

import logging, socket, struct, time, sys, os, signal, ipaddress
from base64 import b64encode
import urllib.parse
import sarracenia
import sarracenia.config
Expand All @@ -59,7 +70,11 @@ def __init__(self, options):
self.limit = 32678
self.patternAM = '80sII4siIII20s'
self.sizeAM = struct.calcsize(self.patternAM)

self.o.add_option('AllowIPs', 'list', [])
self.o.add_option('MissingAMHeaders', 'str', 'CN00 CWAO')
self.o.add_option('binaryInitialCharacters', 'list', [b'BUFR' , b'GRIB', b'\211PNG'])

self.host = self.url.netloc.split(':')[0]
self.port = int(self.url.netloc.split(':')[1])
self.minnum = 00000
Expand Down Expand Up @@ -266,27 +281,104 @@ def gather(self):
logger.debug(f"Bulletin contents: {bulletin}")

parse = self.header.split(b'\0',1)

# We only want the first two letters of the bulletin.
bulletinHeader = parse[0].decode('iso-8859-1').replace(' ', '_')
firstchars = bulletinHeader[0:2]

# Create a file for new messages and let sarracenia format the data
# Treat bulletin contents and compose file name
try:
## Filenames have the following naming scheme:
## 1. Bulletin header
## 2. Counter (makes filename unique for each bulletin)
## IF A* or R* present in header, include in filename
## NOTE: Bulletin filenames have the following naming scheme
## 1. Bulletin header (composed of bulletin type, Issuing office, timestamp)
## 2. BBB, for amendments
## 3. Station (sometimes omitted, depending on the bulletin)
## 4. Counter (makes filename unique for each bulletin)
##
## Example: SXVX65_KWNB_181800_RRB__99705
filepath = self.o.directory + os.sep + bulletinHeader + '__' + f"{randint(self.minnum, self.maxnum)}".zfill(len(str(self.maxnum)))
## Example: SXVX65_KWNB_181800_RRB_WVR_99705
## Type | | | | |
## Issuing office | |
## Timestamps | |
## Amendment
## Station
## Random Integer

binary = 0
missing_ahl = self.o.MissingAMHeaders


filename = bulletinHeader + '__' + f"{randint(self.minnum, self.maxnum)}".zfill(len(str(self.maxnum)))
filepath = self.o.directory + os.sep + filename

lines = bulletin.splitlines()

# Determine if bulletin is binary or not
# From sundew source code
if lines[1][:4] in self.o.binaryInitialCharacters:
binary = 1

# Ported from Sundew. Complete missing headers from bulletins starting with the first characters below.
if firstchars in [ "CA", "RA", "MA" ]:

logger.debug("Adding missing headers in file contents")

lines[0] += missing_ahl.encode('iso-8859-1')

# Reconstruct the bulletin
new_bulletin = b''
for i in lines:
new_bulletin += i + b'\n'
bulletin = new_bulletin

logger.debug("Missing contents added")

except Exception as e:
logger.error(f"Unable to add AHL headers. Error message: {e}")

# Create sarracenia message
try:

msg = sarracenia.Message.fromFileInfo(filepath, self.o)

# Store the bulletin contents inside of the message.
if not binary:

# Data can't be binary. Post method fails with binary data, with JSON parser.
decoded_bulletin = bulletin.decode('iso-8859-1')

msg['content'] = {
"encoding":"iso-8859-1",
"value":decoded_bulletin
}

else:

decoded_bulletin = b64encode(bulletin).decode('ascii')

msg['content'] = {
"encoding":"base64",
"value":decoded_bulletin
}

# Receiver is looking for raw message.
msg['size'] = len(bulletin)

msg['new_file'] = filename
msg['new_dir'] = self.o.directory
msg.updatePaths(self.o, msg['new_dir'], msg['new_file'])

# Calculate the checksum
# There is always a default value given
ident = sarracenia.identity.Identity.factory(method=self.o.identity_method)
ident.set_path("") # the argument isn't used
ident.update(bulletin)
msg['identity'] = {'method':self.o.identity_method, 'value':ident.value}

logger.debug(f"New sarracenia message: {msg}")

file = open(filepath, 'wb')
file.write(bulletin)
file.close()
st = os.stat(filepath)
newmsg.append(msg)

sarramsg = sarracenia.Message.fromFileData(filepath, self.o, lstat=st)
newmsg.append(sarramsg)

except:
logger.error("Unable to generate bulletin file.")
except Exception as e:
logger.error(f"Unable to generate bulletin file. Error message: {e}")

return newmsg

0 comments on commit 6e58d65

Please sign in to comment.