diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 346faed..dc3c4a0 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,17 +1,18 @@ repos: - # - repo: https://github.com/psf/black - # rev: 22.8.0 - # hooks: - # - id: black - # - repo: https://github.com/charliermarsh/ruff-pre-commit - # rev: v0.0.238 - # hooks: - # - id: ruff - repo: https://github.com/pre-commit/pre-commit-hooks rev: v4.4.0 hooks: - id: end-of-file-fixer - id: trailing-whitespace + - repo: https://github.com/astral-sh/ruff-pre-commit + # Ruff version. + rev: v0.9.0 + hooks: + # Run the linter. + - id: ruff + args: [ --fix ] + # Run the formatter. + - id: ruff-format #- repo: https://github.com/pre-commit/mirrors-clang-format # rev: v15.0.7 # hooks: diff --git a/README.md b/README.md index 0b289f2..8a7a206 100644 --- a/README.md +++ b/README.md @@ -2,8 +2,8 @@ RecSync ======= The record synchronizer project includes two parts. -A client (RecCaster) which runing as part of an EPICS -IOC, and a server (RecCeiver) which is a stand alone +A client ([RecCaster](./client/README.md)) which running as part of an EPICS +IOC, and a server ([RecCeiver](./server/README.md)) which is a stand alone daemon. Together they work to ensure the the server(s) have a complete list of all records currently provided by the client IOCs. 
diff --git a/client/README.md b/client/README.md new file mode 100644 index 0000000..4cc8d2e --- /dev/null +++ b/client/README.md @@ -0,0 +1 @@ +# Reccaster diff --git a/server/README b/server/README deleted file mode 100644 index 0441033..0000000 --- a/server/README +++ /dev/null @@ -1,28 +0,0 @@ -Server testing - -Setup - -$ sqlite3 test.db -init recceiver.sqlite3 .exit - -Run (for twistd <= 16.0.3) - -$ twistd -n recceiver -f demo.conf - -or (see below for discussion) - -$ twistd -r poll -n recceiver -f demo.conf - -Run (for twistd >= 16.0.4) - -$ PYTHONPATH=$PWD twistd -r poll -n recceiver -f demo.conf - -At some point 'twistd' stopped implicitly searching the working directory. - -May need to uncomment 'addrlist = 127.255.255.255:5049' in demo.conf -when doing local testing on a computer w/ a firewall. - -Twisted 14.0.2 seems to have a problem with the epoll() reactor -which raises 'IOError: [Errno 2] No such file or directory' -during startup. Try with the poll() reactor. - -$ twistd -r poll -n recceiver -f demo.conf diff --git a/server/README.md b/server/README.md new file mode 100644 index 0000000..d513055 --- /dev/null +++ b/server/README.md @@ -0,0 +1,63 @@ +# Recceiver + +Application for talking between IOCs (via [reccaster](../client)) and ChannelFinder (via [pyCFClient](https://github.com/ChannelFinder/pyCFClient)). + +Written using [twistd](https://twisted.org/). + +## Formatting and Linting + +Recceiver uses [ruff](https://docs.astral.sh/ruff/) for formatting and linting. See website for installation instructions. 
+ + +```bash +ruff check +``` + +```bash +ruff check --fix +``` + +```bash +ruff format +``` + + +## Server testing + +Setup + +```bash + sqlite3 test.db -init recceiver.sqlite3 .exit +``` + +Run (for twistd <= 16.0.3) + +```bash +twistd -n recceiver -f demo.conf +``` + +or (see below for discussion) + +```bash +twistd -r poll -n recceiver -f demo.conf +``` + + +Run (for twistd >= 16.0.4) + +```bash +PYTHONPATH=$PWD twistd -r poll -n recceiver -f demo.conf +``` + +At some point 'twistd' stopped implicitly searching the working directory. + +May need to uncomment `addrlist = 127.255.255.255:5049` in demo.conf +when doing local testing on a computer w/ a firewall. + +Twisted 14.0.2 seems to have a problem with the epoll() reactor +which raises 'IOError: [Errno 2] No such file or directory' +during startup. Try with the poll() reactor. + +```bash +twistd -r poll -n recceiver -f demo.conf +``` diff --git a/server/recceiver/announce.py b/server/recceiver/announce.py index 25a8cc9..44ce82b 100644 --- a/server/recceiver/announce.py +++ b/server/recceiver/announce.py @@ -4,20 +4,26 @@ import struct from twisted.internet import protocol +from twisted.internet.error import MessageLengthError import logging _log = logging.getLogger(__name__) -_Ann = struct.Struct('>HH4sHHI') +_Ann = struct.Struct(">HH4sHHI") + +__all__ = ["Announcer"] -__all__ = ['Announcer'] class Announcer(protocol.DatagramProtocol): - def __init__(self, tcpport, key=0, - tcpaddr='\xff\xff\xff\xff', - udpaddrs=[('',5049)], - period=15.0): + def __init__( + self, + tcpport, + key=0, + tcpaddr="\xff\xff\xff\xff", + udpaddrs=[("", 5049)], + period=15.0, + ): from twisted.internet import reactor self.reactor = reactor @@ -25,42 +31,42 @@ def __init__(self, tcpport, key=0, if sys.version_info[0] < 3: self.msg = _Ann.pack(0x5243, 0, tcpaddr, tcpport, 0, key) else: - self.msg = _Ann.pack(0x5243, 0, tcpaddr.encode('latin-1'), tcpport, 0, key) + self.msg = _Ann.pack(0x5243, 0, tcpaddr.encode("latin-1"), tcpport, 0, 
key) self.delay = period self.udps = udpaddrs self.udpErr = set() self.D = None - if len(self.udps)==0: - raise RuntimeError('Announce list is empty at start time...') + if len(self.udps) == 0: + raise RuntimeError("Announce list is empty at start time...") def startProtocol(self): - _log.info('Setup Announcer') + _log.info("Setup Announcer") self.D = self.reactor.callLater(0, self.sendOne) # we won't process any receieved traffic, so no reason to wake # up for it... self.transport.pauseProducing() def stopProtocol(self): - _log.info('Stop Announcer') + _log.info("Stop Announcer") self.D.cancel() del self.D def datagramReceived(self, src): - pass # ignore + pass # ignore def sendOne(self): self.D = self.reactor.callLater(self.delay, self.sendOne) for A in self.udps: try: - _log.debug('announce to {s}'.format(s=A)) + _log.debug("announce to {s}".format(s=A)) self.transport.write(self.msg, A) try: self.udpErr.remove(A) - _log.warning('announce OK to {s}'.format(s=A)) + _log.warning("announce OK to {s}".format(s=A)) except KeyError: pass - except: + except MessageLengthError: if A not in self.udpErr: self.udpErr.add(A) - _log.exception('announce Error to {s}'.format(s=A)) + _log.exception("announce Error to {s}".format(s=A)) diff --git a/server/recceiver/application.py b/server/recceiver/application.py index 857dd09..4d768d0 100644 --- a/server/recceiver/application.py +++ b/server/recceiver/application.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- -import sys -import random, logging +import random +import logging from zope.interface import implementer @@ -18,56 +18,58 @@ _log = logging.getLogger(__name__) + class Log2Twisted(logging.StreamHandler): - """Print logging module stream to the twisted log - """ + """Print logging module stream to the twisted log""" + def __init__(self): - super(Log2Twisted,self).__init__(stream=self) + super(Log2Twisted, self).__init__(stream=self) # The Twisted log publisher adds a newline, so strip the newline added by the Python log 
handler. self.terminator = "" self.write = log.msg + def flush(self): pass -class RecService(service.MultiService): +class RecService(service.MultiService): def __init__(self, config): from twisted.internet import reactor self.reactor = reactor service.MultiService.__init__(self) - self.annperiod = float(config.get('announceInterval', '15.0')) - self.tcptimeout = float(config.get('tcptimeout', '15.0')) - self.commitperiod = float(config.get('commitInterval', '5.0')) - self.commitSizeLimit = int(config.get('commitSizeLimit', '0')) - self.maxActive = int(config.get('maxActive', '20')) - self.bind, _sep, portn = config.get('bind', '').strip().partition(':') + self.annperiod = float(config.get("announceInterval", "15.0")) + self.tcptimeout = float(config.get("tcptimeout", "15.0")) + self.commitperiod = float(config.get("commitInterval", "5.0")) + self.commitSizeLimit = int(config.get("commitSizeLimit", "0")) + self.maxActive = int(config.get("maxActive", "20")) + self.bind, _sep, portn = config.get("bind", "").strip().partition(":") self.addrlist = [] - self.port = int(portn or '0') + self.port = int(portn or "0") - for addr in config.get('addrlist', '').split(','): + for addr in config.get("addrlist", "").split(","): if not addr: continue - addr,_,port = addr.strip().partition(':') + addr, _, port = addr.strip().partition(":") if port: port = int(port) - if port<=0 or port>0xffff: - raise usage.UsageError('Port numbers must be in the range [1,65535]') + if port <= 0 or port > 0xFFFF: + raise usage.UsageError( + "Port numbers must be in the range [1,65535]" + ) else: port = 5049 self.addrlist.append((addr, port)) - if len(self.addrlist)==0: - self.addrlist = [('',5049)] - + if len(self.addrlist) == 0: + self.addrlist = [("", 5049)] def privilegedStartService(self): - - _log.info('Starting RecService') + _log.info("Starting RecService") # Start TCP server on random port self.tcpFactory = CastFactory() @@ -79,68 +81,75 @@ def privilegedStartService(self): # Attaching 
CastFactory to ProcessorController self.tcpFactory.commit = self.ctrl.commit - self.tcp = self.reactor.listenTCP(self.port, self.tcpFactory, - interface=self.bind) + self.tcp = self.reactor.listenTCP( + self.port, self.tcpFactory, interface=self.bind + ) try: self.tcp.startListening() except CannotListenError: - pass # older Twisted required this. - # newer Twisted errors. sigh... + pass # older Twisted required this. + # newer Twisted errors. sigh... # Find out which port is in use addr = self.tcp.getHost() - _log.info('RecService listening on {addr}'.format(addr=addr)) + _log.info("RecService listening on {addr}".format(addr=addr)) - self.key = random.randint(0,0xffffffff) + self.key = random.randint(0, 0xFFFFFFFF) # start up the UDP announcer - self.udpProto = Announcer(tcpport=addr.port, key=self.key, - udpaddrs=self.addrlist, - period=self.annperiod) - - self.udp = SharedUDP(self.port, self.udpProto, reactor=self.reactor, - interface=self.bind) + self.udpProto = Announcer( + tcpport=addr.port, + key=self.key, + udpaddrs=self.addrlist, + period=self.annperiod, + ) + + self.udp = SharedUDP( + self.port, self.udpProto, reactor=self.reactor, interface=self.bind + ) self.udp.startListening() # This will start up plugin Processors service.MultiService.privilegedStartService(self) def stopService(self): - _log.info('Stopping RecService') + _log.info("Stopping RecService") # This will stop plugin Processors D2 = defer.maybeDeferred(service.MultiService.stopService, self) U = defer.maybeDeferred(self.udp.stopListening) T = defer.maybeDeferred(self.tcp.stopListening) - return defer.DeferredList([U,T,D2], consumeErrors=True) + return defer.DeferredList([U, T, D2], consumeErrors=True) + class Options(usage.Options): - optParameters =[ - ("config","f",None,"Configuration file"), + optParameters = [ + ("config", "f", None, "Configuration file"), ] + @implementer(service.IServiceMaker, plugin.IPlugin) class Maker(object): - tapname = 'recceiver' - description = 'RecCaster 
receiver server' + tapname = "recceiver" + description = "RecCaster receiver server" options = Options def makeService(self, opts): - ctrl = ProcessorController(cfile=opts['config']) - conf = ctrl.config('recceiver') + ctrl = ProcessorController(cfile=opts["config"]) + conf = ctrl.config("recceiver") S = RecService(conf) S.addService(ctrl) S.ctrl = ctrl - lvlname = conf.get('loglevel', 'WARN') + lvlname = conf.get("loglevel", "WARN") lvl = logging.getLevelName(lvlname) - if not isinstance(lvl, (int, )): + if not isinstance(lvl, (int,)): print("Invalid loglevel {}. Setting to WARN level instead.".format(lvlname)) lvl = logging.WARN - fmt = conf.get('logformat', "%(levelname)s:%(name)s %(message)s") + fmt = conf.get("logformat", "%(levelname)s:%(name)s %(message)s") handle = Log2Twisted() handle.setFormatter(logging.Formatter(fmt)) diff --git a/server/recceiver/cfstore.py b/server/recceiver/cfstore.py index 1c8e2c7..7893b52 100755 --- a/server/recceiver/cfstore.py +++ b/server/recceiver/cfstore.py @@ -2,7 +2,6 @@ import logging import socket -_log = logging.getLogger(__name__) from zope.interface import implementer @@ -20,6 +19,7 @@ import json from channelfinder import ChannelFinderClient +_log = logging.getLogger(__name__) # ITRANSACTION FORMAT: # @@ -31,11 +31,12 @@ # "recid: {key:value}" # -__all__ = ['CFProcessor'] +__all__ = ["CFProcessor"] -RECCEIVERID_KEY = 'recceiverID' +RECCEIVERID_KEY = "recceiverID" RECCEIVERID_DEFAULT = socket.gethostname() + @implementer(interfaces.IProcessor) class CFProcessor(service.Service): def __init__(self, name, conf): @@ -55,7 +56,7 @@ def startService(self): if not d.called: d.cancel() service.Service.stopService(self) - raise RuntimeError('Failed to acquired CF Processor lock for service start') + raise RuntimeError("Failed to acquired CF Processor lock for service start") try: self._startServiceWithLock() @@ -75,61 +76,73 @@ def _startServiceWithLock(self): """ self.client = ChannelFinderClient() try: - cf_props = 
[prop['name'] for prop in self.client.getAllProperties()] - reqd_props = {'hostName', 'iocName', 'pvStatus', 'time', 'iocid', 'iocIP', RECCEIVERID_KEY} - - if (self.conf.get('alias', 'default') == 'on'): - reqd_props.add('alias') - if (self.conf.get('recordType', 'default') == 'on'): - reqd_props.add('recordType') - env_vars_setting = self.conf.get('environment_vars') + cf_props = [prop["name"] for prop in self.client.getAllProperties()] + reqd_props = { + "hostName", + "iocName", + "pvStatus", + "time", + "iocid", + "iocIP", + RECCEIVERID_KEY, + } + + if self.conf.get("alias", "default") == "on": + reqd_props.add("alias") + if self.conf.get("recordType", "default") == "on": + reqd_props.add("recordType") + env_vars_setting = self.conf.get("environment_vars") self.env_vars = {} if env_vars_setting != "" and env_vars_setting is not None: - env_vars_dict = dict(item.strip().split(":") for item in env_vars_setting.split(",")) - self.env_vars = { k.strip():v.strip() for k, v in env_vars_dict.items()} + env_vars_dict = dict( + item.strip().split(":") for item in env_vars_setting.split(",") + ) + self.env_vars = { + k.strip(): v.strip() for k, v in env_vars_dict.items() + } for epics_env_var_name, cf_prop_name in self.env_vars.items(): reqd_props.add(cf_prop_name) # Standard property names for CA/PVA name server connections. 
These are # environment variables from reccaster so take advantage of env_vars - if self.conf.get('iocConnectionInfo', 'default') != 'off': + if self.conf.get("iocConnectionInfo", "default") != "off": self.env_vars["RSRV_SERVER_PORT"] = "caPort" self.env_vars["PVAS_SERVER_PORT"] = "pvaPort" reqd_props.add("caPort") reqd_props.add("pvaPort") - wl = self.conf.get('infotags', list()) + wl = self.conf.get("infotags", list()) if wl: - whitelist = [s.strip(', ') for s in wl.split()] + whitelist = [s.strip(", ") for s in wl.split()] else: whitelist = [] - if (self.conf.get('recordDesc', 'default') == 'on'): - whitelist.append('recordDesc') + if self.conf.get("recordDesc", "default") == "on": + whitelist.append("recordDesc") # Are any required properties not already present on CF? properties = reqd_props - set(cf_props) # Are any whitelisted properties not already present on CF? # If so, add them too. properties.update(set(whitelist) - set(cf_props)) - owner = self.conf.get('username', 'cfstore') + owner = self.conf.get("username", "cfstore") for prop in properties: - self.client.set(property={u'name': prop, u'owner': owner}) + self.client.set(property={"name": prop, "owner": owner}) self.whitelist = set(whitelist) - _log.debug('WHITELIST = {}'.format(self.whitelist)) + _log.debug("WHITELIST = {}".format(self.whitelist)) except ConnectionError: _log.exception("Cannot connect to Channelfinder service") raise else: - if self.conf.getboolean('cleanOnStart', True): + if self.conf.getboolean("cleanOnStart", True): self.clean_service() def stopService(self): - _log.info('CF_STOP') + _log.info("CF_STOP") service.Service.stopService(self) return self.lock.run(self._stopServiceWithLock) def _stopServiceWithLock(self): # Set channels to inactive and close connection to client - if self.conf.getboolean('cleanOnStop', True): + if self.conf.getboolean("cleanOnStop", True): self.clean_service() _log.info("CF_STOP with lock") @@ -175,7 +188,11 @@ def chainResult(_ignored): def 
_commitWithThread(self, TR): if not self.running: - raise defer.CancelledError('CF Processor is not running (TR: {host}:{port})', host=TR.src.host, port=TR.src.port) + raise defer.CancelledError( + "CF Processor is not running (TR: {host}:{port})", + host=TR.src.host, + port=TR.src.port, + ) _log.info("CF_COMMIT: {TR}".format(TR=TR)) """ @@ -187,10 +204,14 @@ def _commitWithThread(self, TR): host = TR.src.host port = TR.src.port - iocName = TR.infos.get('IOCNAME') or TR.src.port - hostName = TR.infos.get('HOSTNAME') or TR.src.host - owner = TR.infos.get('ENGINEER') or TR.infos.get('CF_USERNAME') or self.conf.get('username', 'cfstore') - time = self.currentTime(timezone=self.conf.get('timezone', None)) + iocName = TR.infos.get("IOCNAME") or TR.src.port + hostName = TR.infos.get("HOSTNAME") or TR.src.host + owner = ( + TR.infos.get("ENGINEER") + or TR.infos.get("CF_USERNAME") + or self.conf.get("username", "cfstore") + ) + time = self.currentTime(timezone=self.conf.get("timezone", None)) """The unique identifier for a particular IOC""" iocid = host + ":" + str(port) @@ -198,83 +219,123 @@ def _commitWithThread(self, TR): pvInfo = {} for rid, (rname, rtype) in TR.addrec.items(): pvInfo[rid] = {"pvName": rname} - if (self.conf.get('recordType', 'default' == 'on')): - pvInfo[rid]['recordType'] = rtype + if self.conf.get("recordType", "default" == "on"): + pvInfo[rid]["recordType"] = rtype for rid, (recinfos) in TR.recinfos.items(): # find intersection of these sets if rid not in pvInfo: - _log.warning('IOC: {iocid}: PV not found for recinfo with RID: {rid}'.format(iocid=iocid, rid=rid)) + _log.warning( + "IOC: {iocid}: PV not found for recinfo with RID: {rid}".format( + iocid=iocid, rid=rid + ) + ) continue recinfo_wl = [p for p in self.whitelist if p in recinfos.keys()] if recinfo_wl: - pvInfo[rid]['infoProperties'] = list() + pvInfo[rid]["infoProperties"] = list() for infotag in recinfo_wl: - property = {u'name': infotag, u'owner': owner, - u'value': 
recinfos[infotag]} - pvInfo[rid]['infoProperties'].append(property) + property = { + "name": infotag, + "owner": owner, + "value": recinfos[infotag], + } + pvInfo[rid]["infoProperties"].append(property) for rid, alias in TR.aliases.items(): if rid not in pvInfo: - _log.warning('IOC: {iocid}: PV not found for alias with RID: {rid}'.format(iocid=iocid, rid=rid)) + _log.warning( + "IOC: {iocid}: PV not found for alias with RID: {rid}".format( + iocid=iocid, rid=rid + ) + ) continue - pvInfo[rid]['aliases'] = alias + pvInfo[rid]["aliases"] = alias for rid in pvInfo: for epics_env_var_name, cf_prop_name in self.env_vars.items(): if TR.infos.get(epics_env_var_name) is not None: - property = {u'name': cf_prop_name, u'owner': owner, - u'value': TR.infos.get(epics_env_var_name)} + property = { + "name": cf_prop_name, + "owner": owner, + "value": TR.infos.get(epics_env_var_name), + } if "infoProperties" not in pvInfo[rid]: - pvInfo[rid]['infoProperties'] = list() - pvInfo[rid]['infoProperties'].append(property) + pvInfo[rid]["infoProperties"] = list() + pvInfo[rid]["infoProperties"].append(property) else: - _log.debug('EPICS environment var {env_var} listed in environment_vars setting list not found in this IOC: {iocName}'.format(env_var=epics_env_var_name, iocName=iocName)) + _log.debug( + "EPICS environment var {env_var} listed in environment_vars setting list not found in this IOC: {iocName}".format( + env_var=epics_env_var_name, iocName=iocName + ) + ) delrec = list(TR.delrec) _log.debug("Delete records: {s}".format(s=delrec)) - pvInfoByName = {} for rid, (info) in pvInfo.items(): if info["pvName"] in pvInfoByName: - _log.warning("Commit contains multiple records with PV name: {pv} ({iocid})".format(pv=info["pvName"], iocid=iocid)) + _log.warning( + "Commit contains multiple records with PV name: {pv} ({iocid})".format( + pv=info["pvName"], iocid=iocid + ) + ) continue pvInfoByName[info["pvName"]] = info _log.debug("Add record: {rid}: {info}".format(rid=rid, info=info)) 
if TR.initial: """Add IOC to source list """ - self.iocs[iocid] = {"iocname": iocName, "hostname": hostName, "iocIP": host, "owner": owner, "time": time, - "channelcount": 0} + self.iocs[iocid] = { + "iocname": iocName, + "hostname": hostName, + "iocIP": host, + "owner": owner, + "time": time, + "channelcount": 0, + } if not TR.connected: delrec.extend(self.channel_dict.keys()) for pv in pvInfoByName.keys(): self.channel_dict[pv].append(iocid) # add iocname to pvName in dict self.iocs[iocid]["channelcount"] += 1 """In case, alias exists""" - if (self.conf.get('alias', 'default' == 'on')): + if self.conf.get("alias", "default" == "on"): if pv in pvInfoByName and "aliases" in pvInfoByName[pv]: for a in pvInfoByName[pv]["aliases"]: - self.channel_dict[a].append(iocid) # add iocname to pvName in dict + self.channel_dict[a].append( + iocid + ) # add iocname to pvName in dict self.iocs[iocid]["channelcount"] += 1 for pv in delrec: if iocid in self.channel_dict[pv]: self.remove_channel(pv, iocid) """In case, alias exists""" - if (self.conf.get('alias', 'default' == 'on')): + if self.conf.get("alias", "default" == "on"): if pv in pvInfoByName and "aliases" in pvInfoByName[pv]: for a in pvInfoByName[pv]["aliases"]: self.remove_channel(a, iocid) - poll(__updateCF__, self, pvInfoByName, delrec, hostName, iocName, host, iocid, owner, time) + poll( + __updateCF__, + self, + pvInfoByName, + delrec, + hostName, + iocName, + host, + iocid, + owner, + time, + ) dict_to_file(self.channel_dict, self.iocs, self.conf) def remove_channel(self, a, iocid): self.channel_dict[a].remove(iocid) if iocid in self.iocs: self.iocs[iocid]["channelcount"] -= 1 - if self.iocs[iocid]['channelcount'] == 0: + if self.iocs[iocid]["channelcount"] == 0: self.iocs.pop(iocid, None) - elif self.iocs[iocid]['channelcount'] < 0: + elif self.iocs[iocid]["channelcount"] < 0: _log.error("Channel count negative: {s}", s=iocid) if len(self.channel_dict[a]) <= 0: # case: channel has no more iocs del 
self.channel_dict[a] @@ -285,7 +346,7 @@ def clean_service(self): """ sleep = 1 retry_limit = 5 - owner = self.conf.get('username', 'cfstore') + owner = self.conf.get("username", "cfstore") recceiverid = self.conf.get(RECCEIVERID_KEY, RECCEIVERID_DEFAULT) while 1: try: @@ -303,42 +364,66 @@ def clean_service(self): except RequestException as e: _log.error("Clean service failed: {s}".format(s=e)) retry_seconds = min(60, sleep) - _log.info("Clean service retry in {retry_seconds} seconds".format(retry_seconds=retry_seconds)) + _log.info( + "Clean service retry in {retry_seconds} seconds".format( + retry_seconds=retry_seconds + ) + ) time.sleep(retry_seconds) sleep *= 1.5 if self.running == 0 and sleep >= retry_limit: - _log.info("Abandoning clean after {retry_limit} seconds".format(retry_limit=retry_limit)) + _log.info( + "Abandoning clean after {retry_limit} seconds".format( + retry_limit=retry_limit + ) + ) return def get_active_channels(self, recceiverid): - return self.client.findByArgs(prepareFindArgs(self.conf, [('pvStatus', 'Active'), (RECCEIVERID_KEY, recceiverid)])) + return self.client.findByArgs( + prepareFindArgs( + self.conf, [("pvStatus", "Active"), (RECCEIVERID_KEY, recceiverid)] + ) + ) def clean_channels(self, owner, channels): new_channels = [] for ch in channels or []: - new_channels.append(ch[u'name']) - _log.info("Total channels to update: {nChannels}".format(nChannels=len(new_channels))) - _log.debug('Update "pvStatus" property to "Inactive" for {n_channels} channels'.format(n_channels=len(new_channels))) - self.client.update(property={u'name': 'pvStatus', u'owner': owner, u'value': "Inactive"}, - channelNames=new_channels) + new_channels.append(ch["name"]) + _log.info( + "Total channels to update: {nChannels}".format(nChannels=len(new_channels)) + ) + _log.debug( + 'Update "pvStatus" property to "Inactive" for {n_channels} channels'.format( + n_channels=len(new_channels) + ) + ) + self.client.update( + property={"name": "pvStatus", "owner": 
owner, "value": "Inactive"}, + channelNames=new_channels, + ) def dict_to_file(dict, iocs, conf): - filename = conf.get('debug_file_loc', None) + filename = conf.get("debug_file_loc", None) if filename: if os.path.isfile(filename): os.remove(filename) list = [] for key in dict: - list.append([key, iocs[dict[key][-1]]['hostname'], iocs[dict[key][-1]]['iocname']]) + list.append( + [key, iocs[dict[key][-1]]["hostname"], iocs[dict[key][-1]]["iocname"]] + ) list.sort(key=itemgetter(0)) - with open(filename, 'w+') as f: + with open(filename, "w+") as f: json.dump(list, f) -def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocIP, iocid, owner, iocTime): +def __updateCF__( + proc, pvInfoByName, delrec, hostName, iocName, iocIP, iocid, owner, iocTime +): _log.info("CF Update IOC: {iocid}".format(iocid=iocid)) # Consider making this function a class methed then 'proc' simply becomes 'self' @@ -356,96 +441,216 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocIP, iocid, ow iocTime = iocs[iocid]["time"] iocIP = iocs[iocid]["iocIP"] else: - _log.warning('IOC Env Info not found: {iocid}'.format(iocid=iocid)) + _log.warning("IOC Env Info not found: {iocid}".format(iocid=iocid)) if hostName is None or iocName is None: - raise Exception('missing hostName or iocName') + raise Exception("missing hostName or iocName") if proc.cancelled: raise defer.CancelledError() channels = [] """A list of channels in channelfinder with the associated hostName and iocName""" - _log.debug('Find existing channels by IOCID: {iocid}'.format(iocid=iocid)) - old = client.findByArgs(prepareFindArgs(conf, [('iocid', iocid)])) + _log.debug("Find existing channels by IOCID: {iocid}".format(iocid=iocid)) + old = client.findByArgs(prepareFindArgs(conf, [("iocid", iocid)])) if proc.cancelled: raise defer.CancelledError() if old is not None: for ch in old: - if len(new) == 0 or ch[u'name'] in delrec: # case: empty commit/del, remove all reference to ioc - if ch[u'name'] in 
channels_dict: - ch[u'owner'] = iocs[channels_dict[ch[u'name']][-1]]["owner"] - ch[u'properties'] = __merge_property_lists(ch_create_properties(owner, iocTime, recceiverid, channels_dict, iocs, ch), - ch[u'properties']) - if (conf.get('recordType', 'default') == 'on'): - ch[u'properties'] = __merge_property_lists(ch[u'properties'].append({u'name': 'recordType', u'owner': owner, u'value': iocs[channels_dict[ch[u'name']][-1]]["recordType"]}), ch[u'properties']) + if ( + len(new) == 0 or ch["name"] in delrec + ): # case: empty commit/del, remove all reference to ioc + if ch["name"] in channels_dict: + ch["owner"] = iocs[channels_dict[ch["name"]][-1]]["owner"] + ch["properties"] = __merge_property_lists( + ch_create_properties( + owner, iocTime, recceiverid, channels_dict, iocs, ch + ), + ch["properties"], + ) + if conf.get("recordType", "default") == "on": + ch["properties"] = __merge_property_lists( + ch["properties"].append( + { + "name": "recordType", + "owner": owner, + "value": iocs[channels_dict[ch["name"]][-1]][ + "recordType" + ], + } + ), + ch["properties"], + ) channels.append(ch) - _log.debug("Add existing channel to previous IOC: {s}".format(s=channels[-1])) + _log.debug( + "Add existing channel to previous IOC: {s}".format( + s=channels[-1] + ) + ) """In case alias exist, also delete them""" - if (conf.get('alias', 'default') == 'on'): - if ch[u'name'] in pvInfoByName and "aliases" in pvInfoByName[ch[u'name']]: - for a in pvInfoByName[ch[u'name']]["aliases"]: - if a[u'name'] in channels_dict: - a[u'owner'] = iocs[channels_dict[a[u'name']][-1]]["owner"] - a[u'properties'] = __merge_property_lists(ch_create_properties(owner, iocTime, recceiverid, channels_dict, iocs, ch), - a[u'properties']) - if (conf.get('recordType', 'default') == 'on'): - ch[u'properties'] = __merge_property_lists(ch[u'properties'].append({u'name': 'recordType', u'owner': owner, u'value': iocs[channels_dict[a[u'name']][-1]]["recordType"]}), ch[u'properties']) + if conf.get("alias", 
"default") == "on": + if ( + ch["name"] in pvInfoByName + and "aliases" in pvInfoByName[ch["name"]] + ): + for a in pvInfoByName[ch["name"]]["aliases"]: + if a["name"] in channels_dict: + a["owner"] = iocs[channels_dict[a["name"]][-1]][ + "owner" + ] + a["properties"] = __merge_property_lists( + ch_create_properties( + owner, + iocTime, + recceiverid, + channels_dict, + iocs, + ch, + ), + a["properties"], + ) + if conf.get("recordType", "default") == "on": + ch["properties"] = __merge_property_lists( + ch["properties"].append( + { + "name": "recordType", + "owner": owner, + "value": iocs[ + channels_dict[a["name"]][-1] + ]["recordType"], + } + ), + ch["properties"], + ) channels.append(a) - _log.debug("Add existing alias to previous IOC: {s}".format(s=channels[-1])) + _log.debug( + "Add existing alias to previous IOC: {s}".format( + s=channels[-1] + ) + ) else: """Orphan the channel : mark as inactive, keep the old hostName and iocName""" - ch[u'properties'] = __merge_property_lists([{u'name': 'pvStatus', u'owner': owner, u'value': 'Inactive'}, - {u'name': 'time', u'owner': owner, u'value': iocTime}], - ch[u'properties']) + ch["properties"] = __merge_property_lists( + [ + {"name": "pvStatus", "owner": owner, "value": "Inactive"}, + {"name": "time", "owner": owner, "value": iocTime}, + ], + ch["properties"], + ) channels.append(ch) - _log.debug("Add orphaned channel with no IOC: {s}".format(s=channels[-1])) + _log.debug( + "Add orphaned channel with no IOC: {s}".format(s=channels[-1]) + ) """Also orphan any alias""" - if (conf.get('alias', 'default') == 'on'): - if ch[u'name'] in pvInfoByName and "aliases" in pvInfoByName[ch[u'name']]: - for a in pvInfoByName[ch[u'name']]["aliases"]: - a[u'properties'] = __merge_property_lists([{u'name': 'pvStatus', u'owner': owner, u'value': 'Inactive'}, - {u'name': 'time', u'owner': owner, u'value': iocTime}], - a[u'properties']) + if conf.get("alias", "default") == "on": + if ( + ch["name"] in pvInfoByName + and "aliases" in 
pvInfoByName[ch["name"]] + ): + for a in pvInfoByName[ch["name"]]["aliases"]: + a["properties"] = __merge_property_lists( + [ + { + "name": "pvStatus", + "owner": owner, + "value": "Inactive", + }, + { + "name": "time", + "owner": owner, + "value": iocTime, + }, + ], + a["properties"], + ) channels.append(a) - _log.debug("Add orphaned alias with no IOC: {s}".format(s=channels[-1])) + _log.debug( + "Add orphaned alias with no IOC: {s}".format( + s=channels[-1] + ) + ) else: - if ch[u'name'] in new: # case: channel in old and new + if ch["name"] in new: # case: channel in old and new """ Channel exists in Channelfinder with same hostname and iocname. Update the status to ensure it is marked active and update the time. """ - ch[u'properties'] = __merge_property_lists([{u'name': 'pvStatus', u'owner': owner, u'value': 'Active'}, - {u'name': 'time', u'owner': owner, u'value': iocTime}], - ch[u'properties']) + ch["properties"] = __merge_property_lists( + [ + {"name": "pvStatus", "owner": owner, "value": "Active"}, + {"name": "time", "owner": owner, "value": iocTime}, + ], + ch["properties"], + ) channels.append(ch) - _log.debug("Add existing channel with same IOC: {s}".format(s=channels[-1])) - new.remove(ch[u'name']) + _log.debug( + "Add existing channel with same IOC: {s}".format(s=channels[-1]) + ) + new.remove(ch["name"]) """In case, alias exist""" - if (conf.get('alias', 'default') == 'on'): - if ch[u'name'] in pvInfoByName and "aliases" in pvInfoByName[ch[u'name']]: - for a in pvInfoByName[ch[u'name']]["aliases"]: + if conf.get("alias", "default") == "on": + if ( + ch["name"] in pvInfoByName + and "aliases" in pvInfoByName[ch["name"]] + ): + for a in pvInfoByName[ch["name"]]["aliases"]: if a in old: """alias exists in old list""" - a[u'properties'] = __merge_property_lists([{u'name': 'pvStatus', u'owner': owner, u'value': 'Active'}, - {u'name': 'time', u'owner': owner, u'value': iocTime}], - a[u'properties']) + a["properties"] = __merge_property_lists( + [ + { + 
"name": "pvStatus", + "owner": owner, + "value": "Active", + }, + { + "name": "time", + "owner": owner, + "value": iocTime, + }, + ], + a["properties"], + ) channels.append(a) - new.remove(a[u'name']) + new.remove(a["name"]) else: """alias exists but not part of old list""" - aprops = __merge_property_lists([{u'name': 'pvStatus', u'owner': owner, u'value': 'Active'}, - {u'name': 'time', u'owner': owner, u'value': iocTime}, - {u'name': 'alias', u'owner': owner, u'value': ch[u'name']}], - ch[u'properties']) - channels.append({u'name': a[u'name'], - u'owner': owner, - u'properties': aprops}) - new.remove(a[u'name']) - _log.debug("Add existing alias with same IOC: {s}".format(s=channels[-1])) + aprops = __merge_property_lists( + [ + { + "name": "pvStatus", + "owner": owner, + "value": "Active", + }, + { + "name": "time", + "owner": owner, + "value": iocTime, + }, + { + "name": "alias", + "owner": owner, + "value": ch["name"], + }, + ], + ch["properties"], + ) + channels.append( + { + "name": a["name"], + "owner": owner, + "properties": aprops, + } + ) + new.remove(a["name"]) + _log.debug( + "Add existing alias with same IOC: {s}".format( + s=channels[-1] + ) + ) # now pvNames contains a list of pv's new on this host/ioc """A dictionary representing the current channelfinder information associated with the pvNames""" existingChannels = {} @@ -455,72 +660,96 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocIP, iocid, ow The search is split into groups to ensure that the size does not exceed 600 characters """ searchStrings = [] - searchString = '' + searchString = "" for pv in new: if not searchString: searchString = pv - elif (len(searchString) + len(pv) < 600): - searchString = searchString + '|' + pv + elif len(searchString) + len(pv) < 600: + searchString = searchString + "|" + pv else: searchStrings.append(searchString) - searchString=pv + searchString = pv if searchString: searchStrings.append(searchString) for eachSearchString in 
searchStrings: - _log.debug('Find existing channels by name: {search}'.format(search=eachSearchString)) - for ch in client.findByArgs(prepareFindArgs(conf, [('~name', eachSearchString)])): + _log.debug( + "Find existing channels by name: {search}".format(search=eachSearchString) + ) + for ch in client.findByArgs( + prepareFindArgs(conf, [("~name", eachSearchString)]) + ): existingChannels[ch["name"]] = ch if proc.cancelled: raise defer.CancelledError() for pv in new: - newProps = create_properties(owner, iocTime, recceiverid, hostName, iocName, iocIP, iocid) - if (conf.get('recordType', 'default') == 'on'): - newProps.append({u'name': 'recordType', u'owner': owner, u'value': pvInfoByName[pv]['recordType']}) + newProps = create_properties( + owner, iocTime, recceiverid, hostName, iocName, iocIP, iocid + ) + if conf.get("recordType", "default") == "on": + newProps.append( + { + "name": "recordType", + "owner": owner, + "value": pvInfoByName[pv]["recordType"], + } + ) if pv in pvInfoByName and "infoProperties" in pvInfoByName[pv]: newProps = newProps + pvInfoByName[pv]["infoProperties"] if pv in existingChannels: """update existing channel: exists but with a different hostName and/or iocName""" existingChannel = existingChannels[pv] - existingChannel["properties"] = __merge_property_lists(newProps, existingChannel["properties"]) + existingChannel["properties"] = __merge_property_lists( + newProps, existingChannel["properties"] + ) channels.append(existingChannel) - _log.debug("Add existing channel with different IOC: {s}".format(s=channels[-1])) + _log.debug( + "Add existing channel with different IOC: {s}".format(s=channels[-1]) + ) """in case, alias exists, update their properties too""" - if (conf.get('alias', 'default') == 'on'): + if conf.get("alias", "default") == "on": if pv in pvInfoByName and "aliases" in pvInfoByName[pv]: - alProps = [{u'name': 'alias', u'owner': owner, u'value': pv}] + alProps = [{"name": "alias", "owner": owner, "value": pv}] for p in 
newProps: alProps.append(p) for a in pvInfoByName[pv]["aliases"]: if a in existingChannels: ach = existingChannels[a] - ach["properties"] = __merge_property_lists(alProps, ach["properties"]) + ach["properties"] = __merge_property_lists( + alProps, ach["properties"] + ) channels.append(ach) else: - channels.append({u'name': a, - u'owner': owner, - u'properties': alProps}) - _log.debug("Add existing alias with different IOC: {s}".format(s=channels[-1])) + channels.append( + {"name": a, "owner": owner, "properties": alProps} + ) + _log.debug( + "Add existing alias with different IOC: {s}".format( + s=channels[-1] + ) + ) else: """New channel""" - channels.append({u'name': pv, - u'owner': owner, - u'properties': newProps}) + channels.append({"name": pv, "owner": owner, "properties": newProps}) _log.debug("Add new channel: {s}".format(s=channels[-1])) - if (conf.get('alias', 'default') == 'on'): + if conf.get("alias", "default") == "on": if pv in pvInfoByName and "aliases" in pvInfoByName[pv]: - alProps = [{u'name': 'alias', u'owner': owner, u'value': pv}] + alProps = [{"name": "alias", "owner": owner, "value": pv}] for p in newProps: alProps.append(p) for a in pvInfoByName[pv]["aliases"]: - channels.append({u'name': a, - u'owner': owner, - u'properties': alProps}) + channels.append( + {"name": a, "owner": owner, "properties": alProps} + ) _log.debug("Add new alias: {s}".format(s=channels[-1])) - _log.info("Total channels to update: {nChannels} {iocName}".format(nChannels=len(channels), iocName=iocName)) + _log.info( + "Total channels to update: {nChannels} {iocName}".format( + nChannels=len(channels), iocName=iocName + ) + ) if len(channels) != 0: client.set(channels=channels) else: @@ -529,22 +758,29 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocIP, iocid, ow if proc.cancelled: raise defer.CancelledError() + def create_properties(owner, iocTime, recceiverid, hostName, iocName, iocIP, iocid): return [ - {u'name': 'hostName', u'owner': owner, 
u'value': hostName}, - {u'name': 'iocName', u'owner': owner, u'value': iocName}, - {u'name': 'iocid', u'owner': owner, u'value': iocid}, - {u'name': 'iocIP', u'owner': owner, u'value': iocIP}, - {u'name': 'pvStatus', u'owner': owner, u'value': 'Active'}, - {u'name': 'time', u'owner': owner, u'value': iocTime}, - {u'name': RECCEIVERID_KEY, u'owner': owner, u'value': recceiverid}] + {"name": "hostName", "owner": owner, "value": hostName}, + {"name": "iocName", "owner": owner, "value": iocName}, + {"name": "iocid", "owner": owner, "value": iocid}, + {"name": "iocIP", "owner": owner, "value": iocIP}, + {"name": "pvStatus", "owner": owner, "value": "Active"}, + {"name": "time", "owner": owner, "value": iocTime}, + {"name": RECCEIVERID_KEY, "owner": owner, "value": recceiverid}, + ] + def ch_create_properties(owner, iocTime, recceiverid, channels_dict, iocs, ch): - return create_properties(owner, iocTime, recceiverid, - iocs[channels_dict[ch[u'name']][-1]]["hostname"], - iocs[channels_dict[ch[u'name']][-1]]["iocname"], - iocs[channels_dict[ch[u'name']][-1]]["iocIP"], - channels_dict[ch[u'name']][-1]) + return create_properties( + owner, + iocTime, + recceiverid, + iocs[channels_dict[ch["name"]][-1]]["hostname"], + iocs[channels_dict[ch["name"]][-1]]["iocname"], + iocs[channels_dict[ch["name"]][-1]]["iocIP"], + channels_dict[ch["name"]][-1], + ) def __merge_property_lists(newProperties, oldProperties): @@ -553,9 +789,9 @@ def __merge_property_lists(newProperties, oldProperties): the same name In case of overlap between the new and old property lists the new property list wins out """ - newPropNames = [ p[u'name'] for p in newProperties ] + newPropNames = [p["name"] for p in newProperties] for oldProperty in oldProperties: - if oldProperty[u'name'] not in newPropNames: + if oldProperty["name"] not in newPropNames: newProperties = newProperties + [oldProperty] return newProperties @@ -567,26 +803,42 @@ def getCurrentTime(timezone=False): def prepareFindArgs(conf, args, 
size=0): - size_limit = conf.get('findSizeLimit', size) + size_limit = conf.get("findSizeLimit", size) if size_limit > 0: - args.append(('~size', size_limit)) + args.append(("~size", size_limit)) return args -def poll(update, proc, pvInfoByName, delrec, hostName, iocName, iocIP, iocid, owner, iocTime): +def poll( + update, proc, pvInfoByName, delrec, hostName, iocName, iocIP, iocid, owner, iocTime +): _log.info("Polling {iocName} begins...".format(iocName=iocName)) sleep = 1 success = False while not success: try: - update(proc, pvInfoByName, delrec, hostName, iocName, iocIP, iocid, owner, iocTime) + update( + proc, + pvInfoByName, + delrec, + hostName, + iocName, + iocIP, + iocid, + owner, + iocTime, + ) success = True return success except RequestException as e: _log.error("ChannelFinder update failed: {s}".format(s=e)) retry_seconds = min(60, sleep) - _log.info("ChannelFinder update retry in {retry_seconds} seconds".format(retry_seconds=retry_seconds)) - #_log.debug(str(channels_dict)) + _log.info( + "ChannelFinder update retry in {retry_seconds} seconds".format( + retry_seconds=retry_seconds + ) + ) + # _log.debug(str(channels_dict)) time.sleep(retry_seconds) sleep *= 1.5 _log.info("Polling {iocName} complete".format(iocName=iocName)) diff --git a/server/recceiver/dbstore.py b/server/recceiver/dbstore.py index 189b7f6..6ac337d 100644 --- a/server/recceiver/dbstore.py +++ b/server/recceiver/dbstore.py @@ -13,7 +13,8 @@ _log = logging.getLogger(__name__) -__all__ = ['DBProcessor'] +__all__ = ["DBProcessor"] + @implementer(interfaces.IProcessor) class DBProcessor(service.Service): @@ -21,12 +22,12 @@ def __init__(self, name, conf): self.name, self.conf = name, conf self.Ds = set() self.done = False - self.tserver = self.conf.get('table.server', 'server') - self.tinfo = self.conf.get('table.info', 'servinfo') - self.trecord = self.conf.get('table.record', 'record') - self.tname = self.conf.get('table.record_name', 'record_name') - self.trecinfo = 
self.conf.get('table.recinfo', 'recinfo') - self.mykey = int(self.conf['idkey']) + self.tserver = self.conf.get("table.server", "server") + self.tinfo = self.conf.get("table.info", "servinfo") + self.trecord = self.conf.get("table.record", "record") + self.tname = self.conf.get("table.record_name", "record_name") + self.trecinfo = self.conf.get("table.recinfo", "recinfo") + self.mykey = int(self.conf["idkey"]) def decCount(self, X, D): assert len(self.Ds) > 0 @@ -47,23 +48,23 @@ def startService(self): self.sources = {} dbargs = {} - for arg in self.conf.get('dbargs', '').split(','): - key, _, val = arg.partition('=') + for arg in self.conf.get("dbargs", "").split(","): + key, _, val = arg.partition("=") key, val = key.strip(), val.strip() if not key or not val: continue dbargs[key] = val - if self.conf['dbtype'] == 'sqlite3': - if 'isolation_level' not in dbargs: - dbargs['isolation_level'] = 'IMMEDIATE' + if self.conf["dbtype"] == "sqlite3": + if "isolation_level" not in dbargs: + dbargs["isolation_level"] = "IMMEDIATE" # workaround twisted bug #3629 - dbargs['check_same_thread'] = False + dbargs["check_same_thread"] = False - self.pool = db.ConnectionPool(self.conf['dbtype'], - self.conf['dbname'], - **dbargs) + self.pool = db.ConnectionPool( + self.conf["dbtype"], self.conf["dbname"], **dbargs + ) self.waitFor(self.pool.runInteraction(self.cleanupDB)) @@ -82,68 +83,91 @@ def cleanupDB(self, cur): _log.info("Cleanup DBService") assert self.mykey != 0 - cur.execute('PRAGMA foreign_keys = ON;') - cur.execute('DELETE FROM %s WHERE owner=?' % self.tserver, - self.mykey) + cur.execute("PRAGMA foreign_keys = ON;") + cur.execute("DELETE FROM %s WHERE owner=?" 
% self.tserver, self.mykey) def commit(self, TR): return self.pool.runInteraction(self._commit, TR) def _commit(self, cur, TR): - cur.execute('PRAGMA foreign_keys = ON;') + cur.execute("PRAGMA foreign_keys = ON;") if not TR.initial: srvid = self.sources[TR.srcid] else: - cur.execute('INSERT INTO %s (hostname,port,owner) VALUES (?,?,?)' % self.tserver, - (TR.src.host, TR.src.port, self.mykey)) - cur.execute('SELECT id FROM %s WHERE hostname=? AND port=? AND owner=?' % self.tserver, - (TR.src.host, TR.src.port, self.mykey)) + cur.execute( + "INSERT INTO %s (hostname,port,owner) VALUES (?,?,?)" % self.tserver, + (TR.src.host, TR.src.port, self.mykey), + ) + cur.execute( + "SELECT id FROM %s WHERE hostname=? AND port=? AND owner=?" + % self.tserver, + (TR.src.host, TR.src.port, self.mykey), + ) R = cur.fetchone() srvid = R[0] self.sources[TR.srcid] = srvid if not TR.connected: - cur.execute('DELETE FROM %s where id=? AND owner=?' % self.tserver, - (srvid, self.mykey)) + cur.execute( + "DELETE FROM %s where id=? AND owner=?" % self.tserver, + (srvid, self.mykey), + ) del self.sources[TR.srcid] return # update client-wide infos - cur.executemany('INSERT OR REPLACE INTO %s (host,key,value) VALUES (?,?,?)' % self.tinfo, - [(srvid, K, V) for K, V in TR.infos.items()]) + cur.executemany( + "INSERT OR REPLACE INTO %s (host,key,value) VALUES (?,?,?)" % self.tinfo, + [(srvid, K, V) for K, V in TR.infos.items()], + ) # Remove all records, including those which will be re-created - cur.executemany('DELETE FROM %s WHERE host=? AND id=?' % self.trecord, - itertools.chain( - [(srvid, recid) for recid in TR.addrec], - [(srvid, recid) for recid in TR.delrec] - )) + cur.executemany( + "DELETE FROM %s WHERE host=? AND id=?" 
% self.trecord, + itertools.chain( + [(srvid, recid) for recid in TR.addrec], + [(srvid, recid) for recid in TR.delrec], + ), + ) # Start new records - cur.executemany('INSERT INTO %s (host, id, rtype) VALUES (?,?,?)' % self.trecord, - [(srvid, recid, rtype) for recid, (rname, rtype) in TR.addrec.items()]) + cur.executemany( + "INSERT INTO %s (host, id, rtype) VALUES (?,?,?)" % self.trecord, + [(srvid, recid, rtype) for recid, (rname, rtype) in TR.addrec.items()], + ) # Add primary record names - cur.executemany("""INSERT INTO %s (rec, rname, prim) VALUES ( + cur.executemany( + """INSERT INTO %s (rec, rname, prim) VALUES ( (SELECT pkey FROM %s WHERE id=? AND host=?) - ,?,1)""" % (self.tname, self.trecord), - [(recid, srvid, rname) for recid, (rname, rtype) in TR.addrec.items()]) + ,?,1)""" + % (self.tname, self.trecord), + [(recid, srvid, rname) for recid, (rname, rtype) in TR.addrec.items()], + ) # Add new record aliases - cur.executemany("""INSERT INTO %(name)s (rec, rname, prim) VALUES ( + cur.executemany( + """INSERT INTO %(name)s (rec, rname, prim) VALUES ( (SELECT pkey FROM %(rec)s WHERE id=? AND host=?) - ,?,0)""" % {'name': self.tname, 'rec': self.trecord}, - [(recid, srvid, rname) - for recid, names in TR.aliases.items() - for rname in names - ]) + ,?,0)""" + % {"name": self.tname, "rec": self.trecord}, + [ + (recid, srvid, rname) + for recid, names in TR.aliases.items() + for rname in names + ], + ) # add record infos - cur.executemany("""INSERT OR REPLACE INTO %s (rec,key,value) VALUES ( + cur.executemany( + """INSERT OR REPLACE INTO %s (rec,key,value) VALUES ( (SELECT pkey FROM %s WHERE id=? AND host=?) 
- ,?,?)""" % (self.trecinfo, self.trecord), - [(recid, srvid, K, V) - for recid, infos in TR.recinfos.items() - for K, V in infos.items() - ]) + ,?,?)""" + % (self.trecinfo, self.trecord), + [ + (recid, srvid, K, V) + for recid, infos in TR.recinfos.items() + for K, V in infos.items() + ], + ) diff --git a/server/recceiver/interfaces.py b/server/recceiver/interfaces.py index af1bb4f..e6af176 100644 --- a/server/recceiver/interfaces.py +++ b/server/recceiver/interfaces.py @@ -4,24 +4,24 @@ from twisted.application import service -class ITransaction(Interface): - src = Attribute('Source Address.') +class ITransaction(Interface): + src = Attribute("Source Address.") addrec = Attribute("""Records being added {recid: ('recname', 'rectype', {'key':'val'})} """) - delrec = Attribute('A set() of recids which are being removed') + delrec = Attribute("A set() of recids which are being removed") - infos = Attribute('A dictionary of new client wide infos') + infos = Attribute("A dictionary of new client wide infos") recinfos = Attribute("""Additional infos being added to existing records recid: {'key':'val'} """) -class IProcessor(service.IService): +class IProcessor(service.IService): def commit(transaction): """Consume and process the provided ITransaction. @@ -31,6 +31,7 @@ def commit(transaction): will be committed until it completes. 
""" + class IProcessorFactory(Interface): name = Attribute("A unique name identifying this factory") diff --git a/server/recceiver/mock_client.py b/server/recceiver/mock_client.py index 51ede44..73e5099 100644 --- a/server/recceiver/mock_client.py +++ b/server/recceiver/mock_client.py @@ -1,6 +1,8 @@ from twisted.internet.address import IPv4Address from requests import HTTPError -class mock_client(): + + +class mock_client: def __init__(self): self.cf = {} self.connected = True @@ -16,22 +18,22 @@ def findByArgs(self, args): if args[0][0] == "iocid": # returning old for ch in self.cf: name_flag = False - for props in self.cf[ch][u'properties']: - if props[u'name'] == args[0][0]: - if props[u'value'] == args[0][1]: + for props in self.cf[ch]["properties"]: + if props["name"] == args[0][0]: + if props["value"] == args[0][1]: name_flag = True if name_flag: result.append(self.cf[ch]) return result else: - if args[0][0] == '~name': + if args[0][0] == "~name": names = str(args[0][1]).split("|") - return [ self.cf[name] for name in names if name in self.cf ] - if args[0][0] == 'pvStatus' and args[0][1] == 'Active': + return [self.cf[name] for name in names if name in self.cf] + if args[0][0] == "pvStatus" and args[0][1] == "Active": for ch in self.cf: - for prop in self.cf[ch]['properties']: - if prop['name'] == 'pvStatus': - if prop['value'] == 'Active': + for prop in self.cf[ch]["properties"]: + if prop["name"] == "pvStatus": + if prop["value"] == "Active": result.append(self.cf[ch]) return result @@ -39,7 +41,7 @@ def findProperty(self, prop_name): if not self.connected: raise HTTPError("Mock Channelfinder Client HTTPError", response=self) else: - if prop_name in ['hostName', 'iocName', 'pvStatus', 'time', "iocid"]: + if prop_name in ["hostName", "iocName", "pvStatus", "time", "iocid"]: return prop_name def set(self, channels): @@ -57,29 +59,33 @@ def update(self, property, channelNames): self.__updateChannelWithProp(property, channel) def addChannel(self, channel): - 
self.cf[channel[u'name']] = channel + self.cf[channel["name"]] = channel def __updateChannelWithProp(self, property, channel): if channel in self.cf: - for prop in self.cf[channel]['properties']: - if prop['name'] == property['name']: - prop['value'] = property['value'] - prop['owner'] = property['owner'] # also overwriting owner because that's what CF does + for prop in self.cf[channel]["properties"]: + if prop["name"] == property["name"]: + prop["value"] = property["value"] + prop["owner"] = property[ + "owner" + ] # also overwriting owner because that's what CF does return -class mock_conf(): + +class mock_conf: def __init__(self): pass def get(self, name, target): return "cf-engi" -class mock_TR(): + +class mock_TR: def __init__(self): self.addrec = {} - self.src = IPv4Address('TCP', 'testhosta', 1111) + self.src = IPv4Address("TCP", "testhosta", 1111) self.delrec = () - self.infos = {'CF_USERNAME': 'cf-update', 'ENGINEER': 'cf-engi'} + self.infos = {"CF_USERNAME": "cf-update", "ENGINEER": "cf-engi"} self.initial = True self.connected = True self.fail_set = False diff --git a/server/recceiver/processors.py b/server/recceiver/processors.py index c633ff5..2cf85ae 100644 --- a/server/recceiver/processors.py +++ b/server/recceiver/processors.py @@ -1,10 +1,8 @@ # -*- coding: utf-8 -*- import logging -import sys from zope.interface import implementer -from zope.interface import provider from configparser import ConfigParser as Parser import configparser as ConfigParser @@ -17,13 +15,15 @@ from twisted.application import service from . 
import interfaces + _log = logging.getLogger(__name__) __all__ = [ - 'ShowProcessor', - 'ProcessorFactory', + "ShowProcessor", + "ProcessorFactory", ] + class ConfigAdapter(object): def __init__(self, conf, section): self._C, self._S = conf, section @@ -50,12 +50,13 @@ def __getitem__(self, key): try: return self._C.get(self._S, key) except ConfigParser.NoOptionError: - raise KeyError('No option value') + raise KeyError("No option value") -class ProcessorController(service.MultiService): +class ProcessorController(service.MultiService): defaults = {} - paths = ['/etc/recceiver.conf', '~/.recceiver.conf'] + paths = ["/etc/recceiver.conf", "~/.recceiver.conf"] + def __init__(self, cfile=None): service.MultiService.__init__(self) parser = Parser(self.defaults) @@ -63,28 +64,28 @@ def __init__(self, cfile=None): read = parser.read(map(expanduser, self.paths)) if cfile: - parser.read_file(open(cfile,'r')) + parser.read_file(open(cfile, "r")) - if not cfile and len(read)==0: + if not cfile and len(read) == 0: # no user configuration given so load some defaults - parser.add_section('recceiver') - parser.set('recceiver', 'procs', 'show') - elif not parser.has_section('recceiver'): - parser.add_section('recceiver') + parser.add_section("recceiver") + parser.set("recceiver", "procs", "show") + elif not parser.has_section("recceiver"): + parser.add_section("recceiver") - pnames = parser.get('recceiver', 'procs').split(',') + pnames = parser.get("recceiver", "procs").split(",") plugs = {} for plug in plugin.getPlugins(interfaces.IProcessorFactory): - _log.debug('Available plugin: {name}'.format(name=plug.name)) + _log.debug("Available plugin: {name}".format(name=plug.name)) plugs[plug.name] = plug self.procs = [] for P in pnames: P = P.strip() - plugname, _, instname = P.partition(':') + plugname, _, instname = P.partition(":") if not instname: instname = plugname @@ -102,30 +103,42 @@ def __init__(self, cfile=None): def config(self, section): if not 
self._C.has_section(section): - raise KeyError('No section') + raise KeyError("No section") return ConfigAdapter(self._C, section) def commit(self, trans): - def punish(err, B): if err.check(defer.CancelledError): - _log.debug('Cancel processing: {name}: {trans}'.format(name=B.name, trans=trans)) + _log.debug( + "Cancel processing: {name}: {trans}".format( + name=B.name, trans=trans + ) + ) return err try: self.procs.remove(B) - _log.error('Remove processor: {name}: {err}'.format(name=B.name, err=err)) - except: - _log.debug('Remove processor: {name}: aleady removed'.format(name=B.name)) + _log.error( + "Remove processor: {name}: {err}".format(name=B.name, err=err) + ) + except ValueError: + _log.debug( + "Remove processor: {name}: aleady removed".format(name=B.name) + ) return err - defers = [ defer.maybeDeferred(P.commit, trans).addErrback(punish, P) for P in self.procs ] + defers = [ + defer.maybeDeferred(P.commit, trans).addErrback(punish, P) + for P in self.procs + ] def findFirstError(result_list): for success, result in result_list: if not success: return result - return defer.DeferredList(defers, consumeErrors=True).addCallback(findFirstError) + return defer.DeferredList(defers, consumeErrors=True).addCallback( + findFirstError + ) @implementer(interfaces.IProcessor) @@ -139,7 +152,6 @@ def startService(self): _log.info("Show processor '{processor}' starting".format(processor=self.name)) def commit(self, transaction): - def withLock(_ignored): # Why doesn't coiterate() just handle cancellation!? 
t = task.cooperate(self._commit(transaction)) @@ -160,20 +172,23 @@ def releaseLock(result): return self.lock.acquire().addCallback(withLock) - def _commit(self, trans): _log.debug("# Show processor '{name}' commit".format(name=self.name)) - _log.info("# From {host}:{port}".format(host=trans.src.host,port=trans.src.port)) + _log.info( + "# From {host}:{port}".format(host=trans.src.host, port=trans.src.port) + ) if not trans.connected: _log.info("# connection lost") for item in trans.infos.items(): - _log.info(" epicsEnvSet('{name}','{value}')".format(name=item[0], value=item[1])) + _log.info( + " epicsEnvSet('{name}','{value}')".format(name=item[0], value=item[1]) + ) for rid, (rname, rtype) in trans.addrec.items(): - _log.info(" record({rtype}, \"{rname}\") {{".format(rtype=rtype, rname=rname)) + _log.info(' record({rtype}, "{rname}") {{'.format(rtype=rtype, rname=rname)) for alias in trans.aliases.get(rid, []): - _log.info(" alias(\"{alias}\")".format(alias=alias)) + _log.info(' alias("{alias}")'.format(alias=alias)) for item in trans.recinfos.get(rid, {}).items(): - _log.info(" info({name},\"{value}\")".format(name=item[0], value=[1])) + _log.info(' info({name},"{value}")'.format(name=item[0], value=[1])) _log.info(" }") yield _log.info("# End") diff --git a/server/recceiver/recast.py b/server/recceiver/recast.py index 1dcd858..aebf39f 100644 --- a/server/recceiver/recast.py +++ b/server/recceiver/recast.py @@ -1,13 +1,15 @@ # -*- coding: utf-8 -*- import logging -_log = logging.getLogger(__name__) -import sys, time +import sys +import time from zope.interface import implementer -import struct, collections, random, sys +import struct +import collections +import random from twisted.protocols import stateful from twisted.internet import defer @@ -15,28 +17,30 @@ from .interfaces import ITransaction +_log = logging.getLogger(__name__) + _M = 0x5243 -_Head = struct.Struct('>HHI') -assert _Head.size==8 +_Head = struct.Struct(">HHI") +assert _Head.size == 8 -_ping = 
struct.Struct('>I') -assert _ping.size==4 +_ping = struct.Struct(">I") +assert _ping.size == 4 -_s_greet = struct.Struct('>B') -assert _s_greet.size==1 +_s_greet = struct.Struct(">B") +assert _s_greet.size == 1 -_c_greet = struct.Struct('>BBxxI') -assert _c_greet.size==8 +_c_greet = struct.Struct(">BBxxI") +assert _c_greet.size == 8 -_c_info = struct.Struct('>IBxH') -assert _c_info.size==8 +_c_info = struct.Struct(">IBxH") +assert _c_info.size == 8 -_c_rec = struct.Struct('>IBBH') -assert _c_rec.size==8 +_c_rec = struct.Struct(">IBBH") +assert _c_rec.size == 8 -class CastReceiver(stateful.StatefulProtocol): +class CastReceiver(stateful.StatefulProtocol): timeout = 3.0 version = 0 @@ -59,7 +63,7 @@ def __init__(self, active=True): def writeMsg(self, msgid, body): head = _Head.pack(_M, msgid, len(body)) - msg = b''.join((head, body)) + msg = b"".join((head, body)) self.transport.write(msg) def dataReceived(self, data): @@ -98,7 +102,7 @@ def writePing(self): else: self.restartPingTimer() self.phase = 2 - self.nonce = random.randint(0,0xffffffff) + self.nonce = random.randint(0, 0xFFFFFFFF) self.writeMsg(0x8002, _ping.pack(self.nonce)) _log.debug("ping nonce: " + str(self.nonce)) @@ -108,21 +112,21 @@ def getInitialState(self): def recvHeader(self, data): self.restartPingTimer() magic, msgid, blen = _Head.unpack(data) - if magic!=_M: - _log.error('Protocol error! Bad magic {magic}'.format(magic=magic)) + if magic != _M: + _log.error("Protocol error! Bad magic {magic}".format(magic=magic)) self.transport.loseConnection() return self.msgid = msgid fn, minlen = self.rxfn[self.msgid] - if minlen>=0 and blen < minlen: + if minlen >= 0 and blen < minlen: return (self.ignoreBody, blen) else: return (fn, blen) # 0x0001 def recvClientGreeting(self, body): - cver, ctype, skey = _c_greet.unpack(body[:_c_greet.size]) - if ctype!=0: + cver, ctype, skey = _c_greet.unpack(body[: _c_greet.size]) + if ctype != 0: _log.error("I don't understand you! 
{s}".format(s=ctype)) self.transport.loseConnection() return @@ -133,25 +137,29 @@ def recvClientGreeting(self, body): # 0x0002 def recvPong(self, body): - nonce, = _ping.unpack(body[:_ping.size]) + (nonce,) = _ping.unpack(body[: _ping.size]) if nonce != self.nonce: - _log.error('pong nonce does not match! {pong_nonce}!={nonce}'.format(pong_nonce=nonce,nonce=self.nonce)) + _log.error( + "pong nonce does not match! {pong_nonce}!={nonce}".format( + pong_nonce=nonce, nonce=self.nonce + ) + ) self.transport.loseConnection() else: - _log.debug('pong nonce match') + _log.debug("pong nonce match") self.phase = 1 return self.getInitialState() # 0x0006 def recvInfo(self, body): - rid, klen, vlen = _c_info.unpack(body[:_c_info.size]) - text = body[_c_info.size:] + rid, klen, vlen = _c_info.unpack(body[: _c_info.size]) + text = body[_c_info.size :] text = text.decode() - if klen==0 or klen+vlen < len(text): - _log.error('Ignoring info update') + if klen == 0 or klen + vlen < len(text): + _log.error("Ignoring info update") return self.getInitialState() key = text[:klen] - val = text[klen:klen+vlen] + val = text[klen : klen + vlen] if rid: self.sess.recInfo(rid, key, val) else: @@ -160,26 +168,26 @@ def recvInfo(self, body): # 0x0003 def recvAddRec(self, body): - rid, rtype, rtlen, rnlen = _c_rec.unpack(body[:_c_rec.size]) - text = body[_c_rec.size:] + rid, rtype, rtlen, rnlen = _c_rec.unpack(body[: _c_rec.size]) + text = body[_c_rec.size :] text = text.decode() - if rnlen==0 or rtlen+rnlen0 and rtype==0:# new record + elif rtlen > 0 and rtype == 0: # new record rectype = text[:rtlen] - recname = text[rtlen:rtlen+rnlen] + recname = text[rtlen : rtlen + rnlen] self.sess.addRecord(rid, rectype, recname) - elif rtype==1: # record alias - recname = text[rtlen:rtlen+rnlen] + elif rtype == 1: # record alias + recname = text[rtlen : rtlen + rnlen] self.sess.addAlias(rid, recname) return self.getInitialState() # 0x0004 def recvDelRec(self, body): - rid = _ping.unpack(body[:_ping.size]) 
+ rid = _ping.unpack(body[: _ping.size]) self.sess.delRecord(rid) return self.getInitialState() @@ -194,7 +202,11 @@ def recvDone(self, body): size_kb = self.uploadSize / 1024 rate_kbs = size_kb / elapsed_s src = "{}:{}".format(self.sess.ep.host, self.sess.ep.port) - _log.info('Done message from {src}: uploaded {size_kb}kB in {elapsed_s}s ({rate_kbs}kB/s)'.format(src=src, size_kb=size_kb, elapsed_s=elapsed_s, rate_kbs=rate_kbs)) + _log.info( + "Done message from {src}: uploaded {size_kb}kB in {elapsed_s}s ({rate_kbs}kB/s)".format( + src=src, size_kb=size_kb, elapsed_s=elapsed_s, rate_kbs=rate_kbs + ) + ) return self.getInitialState() @@ -229,7 +241,10 @@ def __str__(self): ndel = len(self.delrec) ninfo = len(self.recinfos) nalias = len(self.aliases) - return "Transaction(Src:{}, Init:{}, Conn:{}, Env:{}, Rec:{}, Alias:{}, Info:{}, Del:{})".format(src, init, conn, nenv, nadd, nalias, ninfo, ndel) + return "Transaction(Src:{}, Init:{}, Conn:{}, Env:{}, Rec:{}, Alias:{}, Info:{}, Del:{})".format( + src, init, conn, nenv, nadd, nalias, ninfo, ndel + ) + class CollectionSession(object): timeout = 5.0 @@ -253,7 +268,7 @@ def close(self): def suppressCancelled(err): if not err.check(defer.CancelledError): return err - _log.debug('Suppress the expected CancelledError') + _log.debug("Suppress the expected CancelledError") self.C.addErrback(suppressCancelled).cancel() @@ -274,15 +289,15 @@ def flush(self, connected=True): self.dirty = False def commit(_ignored): - _log.info('Commit: {TR}'.format(TR=TR)) + _log.info("Commit: {TR}".format(TR=TR)) return defer.maybeDeferred(self.factory.commit, TR) def abort(err): if err.check(defer.CancelledError): - _log.info('Commit cancelled: {TR}'.format(TR=TR)) + _log.info("Commit cancelled: {TR}".format(TR=TR)) return err else: - _log.error('Commit failure: {err}'.format(err=err)) + _log.error("Commit failure: {err}".format(err=err)) self.proto.transport.loseConnection() raise defer.CancelledError() @@ -294,7 +309,9 @@ def abort(err): 
def flushSafely(self): if self.T and self.T <= time.time(): self.flush() - elif self.trlimit and self.trlimit <= (len(self.TR.addrec) + len(self.TR.delrec)): + elif self.trlimit and self.trlimit <= ( + len(self.TR.addrec) + len(self.TR.delrec) + ): self.flush() def markDirty(self): @@ -334,6 +351,7 @@ def recInfo(self, rid, key, val): infos[key] = val self.markDirty() + class CastFactory(protocol.ServerFactory): protocol = CastReceiver session = CollectionSession @@ -351,7 +369,7 @@ def isDone(self, P, active): if not active: # connection closed before activation self.Wait.remove(P) - elif len(self.Wait)>0: + elif len(self.Wait) > 0: # Others are waiting P2 = self.Wait.pop(0) P2.active = True @@ -373,6 +391,6 @@ def addClient(self, proto, address): S.factory = self return S - #Note: this method replaced by RecService + # Note: this method replaced by RecService def commit(self, transaction): transaction.show() diff --git a/server/recceiver/scripts/add_extra_properties.py b/server/recceiver/scripts/add_extra_properties.py index 1cb0e11..a02cf70 100644 --- a/server/recceiver/scripts/add_extra_properties.py +++ b/server/recceiver/scripts/add_extra_properties.py @@ -1,23 +1,32 @@ from channelfinder import ChannelFinderClient -''' +""" Simple script for adding active channels to Channel Finder Service for testing cf-store clean If it gives a 500 error, run it again. Glassfish and CFS must be set up and running. 
-''' +""" def abbr(name, hostname, iocname, status): - return {u'owner': 'cf-update', - u'name': name, - u'properties': [ - {u'owner': 'cf-update', u'name': 'hostName', - u'value': hostname}, - {u'owner': 'cf-update', u'name': 'iocName', - u'value': iocname}, - {u'owner': 'cf-update', u'name': 'pvStatus', - u'value': status}, - {u'owner': 'cf-update', u'name': 'time', - u'value': '2016-08-18 12:33:09.953985'}]} + return { + "owner": "cf-update", + "name": name, + "properties": [ + {"owner": "cf-update", "name": "hostName", "value": hostname}, + {"owner": "cf-update", "name": "iocName", "value": iocname}, + {"owner": "cf-update", "name": "pvStatus", "value": status}, + { + "owner": "cf-update", + "name": "time", + "value": "2016-08-18 12:33:09.953985", + }, + ], + } + client = ChannelFinderClient() -client.set(channels=[abbr(u'ch1', 'testhosta', 1111, 'Active'), abbr(u'test_channel', 'testhosta', 1111, 'Active')]) +client.set( + channels=[ + abbr("ch1", "testhosta", 1111, "Active"), + abbr("test_channel", "testhosta", 1111, "Active"), + ] +) diff --git a/server/recceiver/scripts/print_cf_data.py b/server/recceiver/scripts/print_cf_data.py index 686a620..30a5a78 100644 --- a/server/recceiver/scripts/print_cf_data.py +++ b/server/recceiver/scripts/print_cf_data.py @@ -2,24 +2,26 @@ import json import os from operator import itemgetter + filename = "/home/devuser/cfdata" # change this to output file name client = ChannelFinderClient() def get_cf_data(client): - channels = client.findByArgs([('pvStatus', 'Active')]) + channels = client.findByArgs([("pvStatus", "Active")]) for ch in channels: - ch.pop('owner', None) - ch.pop('tags', None) - for prop in ch['properties']: - if prop['name'] == 'hostName': - ch['hostName'] = prop['value'] - if prop['name'] == 'iocName': - ch['iocName'] = prop['value'] - ch.pop('properties', None) + ch.pop("owner", None) + ch.pop("tags", None) + for prop in ch["properties"]: + if prop["name"] == "hostName": + ch["hostName"] = prop["value"] + 
if prop["name"] == "iocName": + ch["iocName"] = prop["value"] + ch.pop("properties", None) return channels + channels = get_cf_data(client) if os.path.isfile(filename): @@ -28,9 +30,9 @@ def get_cf_data(client): new_list = [] for channel in channels: - new_list.append([channel['name'], channel['hostName'], int(channel['iocName'])]) + new_list.append([channel["name"], channel["hostName"], int(channel["iocName"])]) new_list.sort(key=itemgetter(0)) -with open(filename, 'wrx') as f: +with open(filename, "x") as f: json.dump(new_list, f) diff --git a/server/recceiver/scripts/test_mock_iocs.py b/server/recceiver/scripts/test_mock_iocs.py index e389fbc..5cf3c8b 100644 --- a/server/recceiver/scripts/test_mock_iocs.py +++ b/server/recceiver/scripts/test_mock_iocs.py @@ -12,14 +12,14 @@ def startIOC(): pid, fd = os.forkpty() if pid == 0: os.chdir("../../../client/iocBoot/iocdemo") - print os.curdir - os.execv("st_test.cmd", ['']) + print(os.curdir) + os.execv("st_test.cmd", [""]) return pid, fd def readfd(fd): while 1: - empt = str(os.read(fd, 16384).strip("\n")) + _ = str(os.read(fd, 16384).strip("\n")) def handler(signum, frame): @@ -33,13 +33,17 @@ def main(): global pids pids = [] signal.signal(signal.SIGTERM, handler) - os.chdir(os.path.dirname(os.path.abspath(sys.argv[0]))) # Uses a filename, not good, also only works on linux? + os.chdir( + os.path.dirname(os.path.abspath(sys.argv[0])) + ) # Uses a filename, not good, also only works on linux? 
threads = [] for i in range(1, 100): iocpid, iocfd = startIOC() pids.append(iocpid) - print "len pids: ", len(pids) - iocthread = threading.Thread(group=None, target=readfd, args=(iocfd,), name="iocthread", kwargs={}) + print("len pids: ", len(pids)) + iocthread = threading.Thread( + group=None, target=readfd, args=(iocfd,), name="iocthread", kwargs={} + ) threads.append(iocthread) iocthread.start() try: @@ -48,5 +52,6 @@ def main(): except KeyboardInterrupt: sys.exit() -if __name__ == '__main__': + +if __name__ == "__main__": main() diff --git a/server/recceiver/test_cfstore.py b/server/recceiver/test_cfstore.py deleted file mode 100644 index d14c3a7..0000000 --- a/server/recceiver/test_cfstore.py +++ /dev/null @@ -1,267 +0,0 @@ -from twisted.trial.unittest import TestCase -from twisted.internet import defer -from mock_client import mock_conf -from mock_client import mock_TR -from mock_client import mock_client -from cfstore import CFProcessor - -import threading -import time - - -class MyTestCase(TestCase): - def setUp(self): - conf = mock_conf() - self.cfstore = CFProcessor("cf", conf) - self.cfstore.currentTime = getTime - self.maxDiff = None - - @defer.inlineCallbacks - def test_3_iocs(self): - self.cfclient = mock_client() - self.cfstore.client = self.cfclient - self.cfstore.startService() - TR1 = mock_TR() - TR1.src.host = 'testhosta' - TR1.src.port = 1111 - TR1.addrec = {1: ('ch1', 'longin'), 5: ('ch5', 'longin'), 6: ('ch6', 'stringin'), 7: ('ch7', 'longout')} - deferred = yield self.cfstore.commit(TR1) - self.assertDictEqual(self.cfstore.client.cf, - {u'ch1': abbr('cf-engi', u'ch1', 'testhosta', 1111, 'Active'), - u'ch5': abbr('cf-engi', u'ch5', 'testhosta', 1111, 'Active'), - u'ch6': abbr('cf-engi', u'ch6', 'testhosta', 1111, 'Active'), - u'ch7': abbr('cf-engi', u'ch7', 'testhosta', 1111, 'Active')}) - - TR2 = mock_TR() - TR2.src.host = 'testhostb' - TR2.src.port = 2222 - TR2.addrec = {1: ('ch1', 'longin'), 2: ('ch2', 'longout'), 3: ('ch3', 'stringout'), 
7: ('ch7', 'longout')} - deferred = yield self.cfstore.commit(TR2) - self.assertDictEqual(self.cfstore.client.cf, - {u'ch1': abbr('cf-engi', u'ch1', 'testhostb', 2222, 'Active'), - u'ch2': abbr('cf-engi', u'ch2', 'testhostb', 2222, 'Active'), - u'ch3': abbr('cf-engi', u'ch3', 'testhostb', 2222, 'Active'), - u'ch5': abbr('cf-engi', u'ch5', 'testhosta', 1111, 'Active'), - u'ch6': abbr('cf-engi', u'ch6', 'testhosta', 1111, 'Active'), - u'ch7': abbr('cf-engi', u'ch7', 'testhostb', 2222, 'Active')}) - - TR3 = mock_TR() - TR3.src.host = 'testhostc' - TR3.src.port = 3333 - TR3.addrec = {1: ('ch1', 'longin'), 2: ('ch2', 'longout'), 4: ('ch4', 'stringin'), 6: ('ch6', 'stringin')} - deferred = yield self.cfstore.commit(TR3) - self.assertDictEqual(self.cfstore.client.cf, - {u'ch1': abbr('cf-engi', u'ch1', 'testhostc', 3333, 'Active'), - u'ch2': abbr('cf-engi', u'ch2', 'testhostc', 3333, 'Active'), - u'ch3': abbr('cf-engi', u'ch3', 'testhostb', 2222, 'Active'), - u'ch4': abbr('cf-engi', u'ch4', 'testhostc', 3333, 'Active'), - u'ch5': abbr('cf-engi', u'ch5', 'testhosta', 1111, 'Active'), - u'ch6': abbr('cf-engi', u'ch6', 'testhostc', 3333, 'Active'), - u'ch7': abbr('cf-engi', u'ch7', 'testhostb', 2222, 'Active')}) - - TR4 = mock_TR() - TR4.initial = False - TR4.addrec = {} - TR4.connected = False # simulated IOC Disconnect - TR4.src.host = 'testhostc' - TR4.src.port = 3333 - deferred = yield self.cfstore.commit(TR4) - assertChannelsDictEqual(self, self.cfstore.client.cf, - {u'ch1': abbr('cf-engi', u'ch1', 'testhostb', 2222, 'Active'), - u'ch2': abbr('cf-engi', u'ch2', 'testhostb', 2222, 'Active'), - u'ch3': abbr('cf-engi', u'ch3', 'testhostb', 2222, 'Active'), - u'ch4': abbr('cf-engi', u'ch4', 'testhostc', 3333, 'Inactive'), - u'ch5': abbr('cf-engi', u'ch5', 'testhosta', 1111, 'Active'), - u'ch6': abbr('cf-engi', u'ch6', 'testhosta', 1111, 'Active'), - u'ch7': abbr('cf-engi', u'ch7', 'testhostb', 2222, 'Active')}) - - TR5 = mock_TR() - TR5.src.host = 'testhostc' - TR5.src.port 
= 3333 - TR5.addrec = {1: ('ch1', 'longin'), 2: ('ch2', 'longout'), 4: ('ch4', 'stringin'), 6: ('ch6', 'stringin')} - deferred = yield self.cfstore.commit(TR5) - assertChannelsDictEqual(self, self.cfstore.client.cf, - {u'ch1': abbr('cf-engi', u'ch1', 'testhostc', 3333, 'Active'), - u'ch2': abbr('cf-engi', u'ch2', 'testhostc', 3333, 'Active'), - u'ch3': abbr('cf-engi', u'ch3', 'testhostb', 2222, 'Active'), - u'ch4': abbr('cf-engi', u'ch4', 'testhostc', 3333, 'Active'), - u'ch5': abbr('cf-engi', u'ch5', 'testhosta', 1111, 'Active'), - u'ch6': abbr('cf-engi', u'ch6', 'testhostc', 3333, 'Active'), - u'ch7': abbr('cf-engi', u'ch7', 'testhostb', 2222, 'Active')}) - - TR6 = mock_TR() - TR6.initial = False - TR6.addrec = {} - TR6.connected = False # simulated IOC Disconnect - TR6.src.host = 'testhostb' - TR6.src.port = 2222 - deferred = yield self.cfstore.commit(TR6) - assertChannelsDictEqual(self, self.cfstore.client.cf, - {u'ch1': abbr('cf-engi', u'ch1', 'testhostc', 3333, 'Active'), - u'ch2': abbr('cf-engi', u'ch2', 'testhostc', 3333, 'Active'), - u'ch3': abbr('cf-engi', u'ch3', 'testhostb', 2222, 'Inactive'), - u'ch4': abbr('cf-engi', u'ch4', 'testhostc', 3333, 'Active'), - u'ch5': abbr('cf-engi', u'ch5', 'testhosta', 1111, 'Active'), - u'ch6': abbr('cf-engi', u'ch6', 'testhostc', 3333, 'Active'), - u'ch7': abbr('cf-engi', u'ch7', 'testhosta', 1111, 'Active')}) - - TR7 = mock_TR() - TR7.initial = False - TR7.addrec = {} - TR7.connected = False # simulated IOC Disconnect - TR7.src.host = 'testhosta' - TR7.src.port = 1111 - deferred = yield self.cfstore.commit(TR7) - assertChannelsDictEqual(self, self.cfstore.client.cf, - {u'ch1': abbr('cf-engi', u'ch1', 'testhostc', 3333, 'Active'), - u'ch2': abbr('cf-engi', u'ch2', 'testhostc', 3333, 'Active'), - u'ch3': abbr('cf-engi', u'ch3', 'testhostb', 2222, 'Inactive'), - u'ch4': abbr('cf-engi', u'ch4', 'testhostc', 3333, 'Active'), - u'ch5': abbr('cf-engi', u'ch5', 'testhosta', 1111, 'Inactive'), - u'ch6': abbr('cf-engi', 
u'ch6', 'testhostc', 3333, 'Active'), - u'ch7': abbr('cf-engi', u'ch7', 'testhosta', 1111, 'Inactive')}) - - TR8 = mock_TR() - TR8.initial = False - TR8.addrec = {} - TR8.connected = False # simulated IOC Disconnect - TR8.src.host = 'testhostc' - TR8.src.port = 3333 - deferred = yield self.cfstore.commit(TR8) - assertChannelsDictEqual(self, self.cfstore.client.cf, - {u'ch1': abbr('cf-engi', u'ch1', 'testhostc', 3333, 'Inactive'), - u'ch2': abbr('cf-engi', u'ch2', 'testhostc', 3333, 'Inactive'), - u'ch3': abbr('cf-engi', u'ch3', 'testhostb', 2222, 'Inactive'), - u'ch4': abbr('cf-engi', u'ch4', 'testhostc', 3333, 'Inactive'), - u'ch5': abbr('cf-engi', u'ch5', 'testhosta', 1111, 'Inactive'), - u'ch6': abbr('cf-engi', u'ch6', 'testhostc', 3333, 'Inactive'), - u'ch7': abbr('cf-engi', u'ch7', 'testhosta', 1111, 'Inactive')}) - - TR9 = mock_TR() - TR9.src.host = 'testhostb' - TR9.src.port = 2222 - TR9.addrec = {1: ('ch1', 'longin'), 2: ('ch2', 'longout'), 3: ('ch3', 'stringout'), 7: ('ch7', 'longout')} - deferred = yield self.cfstore.commit(TR9) - self.assertDictEqual(self.cfstore.client.cf, - {u'ch1': abbr('cf-engi', u'ch1', 'testhostb', 2222, 'Active'), - u'ch2': abbr('cf-engi', u'ch2', 'testhostb', 2222, 'Active'), - u'ch3': abbr('cf-engi', u'ch3', 'testhostb', 2222, 'Active'), - u'ch4': abbr('cf-engi', u'ch4', 'testhostc', 3333, 'Inactive'), - u'ch5': abbr('cf-engi', u'ch5', 'testhosta', 1111, 'Inactive'), - u'ch6': abbr('cf-engi', u'ch6', 'testhostc', 3333, 'Inactive'), - u'ch7': abbr('cf-engi', u'ch7', 'testhostb', 2222, 'Active')}) - - TR10 = mock_TR() - TR10.src.host = 'testhosta' - TR10.src.port = 1111 - TR10.addrec = {1: ('ch1', 'longin'), 2: ('ch2', 'longout'), 3: ('ch3', 'stringout'), 4: ('ch4', 'stringin'), 5: ('ch5', 'longin'), 6: ('ch6', 'stringin'), 7: ('ch7', 'longout')} - deferred = yield self.cfstore.commit(TR10) - self.assertDictEqual(self.cfstore.client.cf, - {u'ch1': abbr('cf-engi', u'ch1', 'testhosta', 1111, 'Active'), - u'ch2': abbr('cf-engi', 
u'ch2', 'testhosta', 1111, 'Active'), - u'ch3': abbr('cf-engi', u'ch3', 'testhosta', 1111, 'Active'), - u'ch4': abbr('cf-engi', u'ch4', 'testhosta', 1111, 'Active'), - u'ch5': abbr('cf-engi', u'ch5', 'testhosta', 1111, 'Active'), - u'ch6': abbr('cf-engi', u'ch6', 'testhosta', 1111, 'Active'), - u'ch7': abbr('cf-engi', u'ch7', 'testhosta', 1111, 'Active')}) - - @defer.inlineCallbacks - def test_no_CFS(self): - self.cfclient = mock_client() - self.cfclient.connected = False - self.cfstore.client = self.cfclient - rcon_thread = threading.Timer(2, self.simulate_reconnect) - rcon_thread.start() - self.cfstore.startService() - TR1 = mock_TR() - TR1.src.host = 'testhosta' - TR1.src.port = 1111 - TR1.addrec = {1: ('ch1', 'longin'), 5: ('ch5', 'longin'), 6: ('ch6', 'stringin'), 7: ('ch7', 'longout')} - deferred = yield self.cfstore.commit(TR1) - self.assertDictEqual(self.cfstore.client.cf, - {u'ch1': abbr('cf-engi', u'ch1', 'testhosta', 1111, 'Active'), - u'ch5': abbr('cf-engi', u'ch5', 'testhosta', 1111, 'Active'), - u'ch6': abbr('cf-engi', u'ch6', 'testhosta', 1111, 'Active'), - u'ch7': abbr('cf-engi', u'ch7', 'testhosta', 1111, 'Active')}) - - @defer.inlineCallbacks - def test_set_fail(self): - self.cfclient = mock_client() - self.cfclient.fail_set = True - self.cfstore.client = self.cfclient - self.cfstore.startService() - TR1 = mock_TR() - TR1.src.host = 'testhosta' - TR1.src.port = 1111 - TR1.addrec = {1: ('ch1', 'longin'), 5: ('ch5', 'longin'), 6: ('ch6', 'stringin'), 7: ('ch7', 'longout')} - rcon_thread = threading.Timer(2, self.simulate_reconnect) - rcon_thread.start() - deferred = yield self.cfstore.commit(TR1) - self.assertDictEqual(self.cfstore.client.cf, - {u'ch1': abbr('cf-engi', u'ch1', 'testhosta', 1111, 'Active'), - u'ch5': abbr('cf-engi', u'ch5', 'testhosta', 1111, 'Active'), - u'ch6': abbr('cf-engi', u'ch6', 'testhosta', 1111, 'Active'), - u'ch7': abbr('cf-engi', u'ch7', 'testhosta', 1111, 'Active')}) - - @defer.inlineCallbacks - def 
test_clean_service(self): - self.cfclient = mock_client() - self.cfclient.fail_find = True - self.cfclient.cf = {u'ch1': abbr('cf-engi', u'ch1', 'testhostb', 2222, 'Active'), - u'ch5': abbr('cf-engi', u'ch5', 'testhostb', 2222, 'Active'), - u'ch6': abbr('cf-engi', u'ch6', 'testhostb', 2222, 'Active'), - u'ch7': abbr('cf-engi', u'ch7', 'testhostb', 2222, 'Active')} - self.cfstore.client = self.cfclient - rcon_thread = threading.Timer(2, self.simulate_reconnect) - rcon_thread.start() - self.cfstore.startService() - self.assertDictEqual(self.cfstore.client.cf, - {u'ch1': abbr('cf-engi', u'ch1', 'testhostb', 2222, 'Inactive'), - u'ch5': abbr('cf-engi', u'ch5', 'testhostb', 2222, 'Inactive'), - u'ch6': abbr('cf-engi', u'ch6', 'testhostb', 2222, 'Inactive'), - u'ch7': abbr('cf-engi', u'ch7', 'testhostb', 2222, 'Inactive')}) - TR1 = mock_TR() - TR1.src.host = 'testhosta' - TR1.src.port = 1111 - TR1.addrec = {1: ('ch1', 'longin'), 5: ('ch5', 'longin'), 6: ('ch6', 'stringin'), 7: ('ch7', 'longout')} - deferred = yield self.cfstore.commit(TR1) - self.assertDictEqual(self.cfstore.client.cf, - {u'ch1': abbr('cf-engi', u'ch1', 'testhosta', 1111, 'Active'), - u'ch5': abbr('cf-engi', u'ch5', 'testhosta', 1111, 'Active'), - u'ch6': abbr('cf-engi', u'ch6', 'testhosta', 1111, 'Active'), - u'ch7': abbr('cf-engi', u'ch7', 'testhosta', 1111, 'Active')}) - - def assertCommit(self, dict): - self.assertDictEqual(self.cfstore.client.cf, dict) - - def simulate_reconnect(self): - self.cfclient.connected = True # There is the potential for a race condition here - self.cfclient.fail_set = False # This would cause a connection failure and re-polling at a different point - self.cfclient.fail_find = False - -def assertChannelsDictEqual(self, chs1, chs2, msg=None): - """ - check equality ignoring the order of the properties - """ - for ch in chs1: - chs1[ch][u'properties'].sort() - for ch in chs2: - chs2[ch][u'properties'].sort() - self.assertDictEqual(chs1, chs2, msg=msg) - -def abbr(owner, name, 
hostname, iocname, status): - return {u'owner': owner, - u'name': name, - u'properties': [ - {u'owner': 'cf-engi', u'name': 'hostName', u'value': hostname}, - {u'owner': 'cf-engi', u'name': 'iocName', u'value': iocname}, - {u'owner': 'cf-engi', u'name': 'iocid', u'value': hostname+":"+str(iocname)}, - {u'owner': 'cf-engi', u'name': 'pvStatus', u'value': status}, - {u'owner': 'cf-engi', u'name': 'time', u'value': '2016-08-18 12:33:09.953985'}]} - - -def getTime(): - return '2016-08-18 12:33:09.953985' - -# if __name__ == '__main__': -# unittest.main() diff --git a/server/recceiver/udpbcast.py b/server/recceiver/udpbcast.py index 1b28a61..e79a05b 100644 --- a/server/recceiver/udpbcast.py +++ b/server/recceiver/udpbcast.py @@ -3,7 +3,8 @@ from twisted.internet import udp from twisted.application import internet -__all__ = ['SharedUDP','SharedUDPServer'] +__all__ = ["SharedUDP", "SharedUDPServer"] + class SharedUDP(udp.Port): """A UDP socket which can share @@ -17,21 +18,20 @@ class SharedUDP(udp.Port): def createInternetSocket(self): import socket - sock=udp.Port.createInternetSocket(self) - sock.setsockopt(socket.SOL_SOCKET, - socket.SO_REUSEADDR, 1) - sock.setsockopt(socket.SOL_SOCKET, - socket.SO_BROADCAST, 1) + + sock = udp.Port.createInternetSocket(self) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) return sock class SharedUDPServer(internet.UDPServer): - """A UDP server using SharedUDP - """ + """A UDP server using SharedUDP""" + def _getPort(self): from twisted.internet import reactor - R = getattr(self, 'reactor', reactor) + R = getattr(self, "reactor", reactor) port = SharedUDP(reactor=R, *self.args, **self.kwargs) port.startListening() return port diff --git a/server/twisted/plugins/recceiver_plugin.py b/server/twisted/plugins/recceiver_plugin.py index 3c0a374..f1b1d96 100644 --- a/server/twisted/plugins/recceiver_plugin.py +++ b/server/twisted/plugins/recceiver_plugin.py @@ -6,6 
+6,6 @@ serviceMaker = Maker() -showfactory = processors.ProcessorFactory('show', processors.ShowProcessor) -dbfactory = processors.ProcessorFactory('db', dbstore.DBProcessor) -cffactory = processors.ProcessorFactory('cf', cfstore.CFProcessor) +showfactory = processors.ProcessorFactory("show", processors.ShowProcessor) +dbfactory = processors.ProcessorFactory("db", dbstore.DBProcessor) +cffactory = processors.ProcessorFactory("cf", cfstore.CFProcessor) diff --git a/test-client.py b/test-client.py deleted file mode 100644 index 5fca66a..0000000 --- a/test-client.py +++ /dev/null @@ -1,13 +0,0 @@ -#!/usr/bin/env python -'''Listen for recceiver Announcements -''' - -import sys, socket - -S = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) - -S.bind(('',5049)) - -print S, S.fileno() -while True: - print '>>', S.recvfrom(1024)