diff --git a/autotest/autotest_services/processes/test02_identity_complex.py b/autotest/autotest_services/processes/test02_identity_complex.py
index 1a2b86b45..906d5c9ee 100644
--- a/autotest/autotest_services/processes/test02_identity_complex.py
+++ b/autotest/autotest_services/processes/test02_identity_complex.py
@@ -37,7 +37,7 @@ from eoxserver.services.ows.wps.parameters import (
     ComplexData, CDObject, CDTextBuffer, FormatText, FormatXML, FormatJSON,
 )
-from django.utils.encoding import smart_text
+from django.utils.encoding import smart_str


 class TestProcess02(Component):
     """ Test identity process (the outputs are copies of the inputs)
@@ -117,7 +117,7 @@ def execute(input00, output00, **kwarg):
                 filename=(output_filename_base + ".xml")
             )
             # text output also accepts Unicode strings
-            outputs['output01'] = smart_text(
+            outputs['output01'] = smart_str(
                 etree.tostring(
                     input00.data, encoding='utf-8', pretty_print=True
                 ), 'utf-8'
diff --git a/eoxserver/services/ows/wps/parameters/complexdata.py b/eoxserver/services/ows/wps/parameters/complexdata.py
index 49eb73e90..2bd9eb951 100644
--- a/eoxserver/services/ows/wps/parameters/complexdata.py
+++ b/eoxserver/services/ows/wps/parameters/complexdata.py
@@ -51,7 +51,7 @@ from lxml import etree

 from .base import Parameter
 from .formats import Format
-from django.utils.encoding import smart_text
+from django.utils.encoding import smart_str
 from django.utils.six import string_types, text_type, itervalues, binary_type

 #-------------------------------------------------------------------------------
@@ -172,7 +172,7 @@ class CDTextBuffer(StringIO, CDBase):
     """
     def __init__(self, data=u'', *args, **kwargs):
         # NOTE: StringIO is an old-style class and super cannot be used!
-        StringIO.__init__(self, smart_text(data))
+        StringIO.__init__(self, smart_str(data))
         CDBase.__init__(self, *args, **kwargs)
         self.text_encoding = kwargs.get('text_encoding', None)
@@ -183,9 +183,9 @@ def data(self):

     def write(self, data):
         if self.text_encoding is None:
-            return StringIO.write(self, smart_text(data))
+            return StringIO.write(self, smart_str(data))
         else:
-            return StringIO.write(self, smart_text(data, self.text_encoding))
+            return StringIO.write(self, smart_str(data, self.text_encoding))

     def read(self, size=None):
         if size is None:
@@ -532,7 +532,7 @@ def _unicode(data, encoding):
     if isinstance(data, text_type):
         return data
     elif isinstance(data, bytes):
-        return smart_text(data, encoding)
+        return smart_str(data, encoding)
     raise TypeError(
         "Byte or Unicode string expected, %s received!" % type(data)
     )
diff --git a/eoxserver/services/ows/wps/parameters/crs.py b/eoxserver/services/ows/wps/parameters/crs.py
index ff5ca7186..f485c5c00 100644
--- a/eoxserver/services/ows/wps/parameters/crs.py
+++ b/eoxserver/services/ows/wps/parameters/crs.py
@@ -31,7 +31,7 @@
     asURL, fromURL, fromURN, fromShortCode, validateEPSGCode, parseEPSGCode,
 )
 from .data_types import BaseType
-from django.utils.encoding import smart_text
+from django.utils.encoding import smart_str


 class CRSType(BaseType):
@@ -66,7 +66,7 @@ def encode(cls, value):
         if value == 0:
             return u'ImageCRS'
         elif validateEPSGCode(value):
-            return smart_text(asURL(value))
+            return smart_str(asURL(value))
         raise ValueError("Invalid CRS %r!" % value)

     @classmethod
diff --git a/eoxserver/services/ows/wps/parameters/data_types.py b/eoxserver/services/ows/wps/parameters/data_types.py
index ea0cc228f..376c1d99d 100644
--- a/eoxserver/services/ows/wps/parameters/data_types.py
+++ b/eoxserver/services/ows/wps/parameters/data_types.py
@@ -30,7 +30,7 @@
 from datetime import datetime, date, time, timedelta
 from django.utils.dateparse import parse_date, parse_datetime, parse_time, utc
 from django.utils.six import PY2, PY3, string_types
-from django.utils.encoding import smart_text
+from django.utils.encoding import smart_str
 from eoxserver.core.util.timetools import parse_duration

 try:
@@ -68,7 +68,7 @@ def parse(cls, raw_value):
     @classmethod
     def encode(cls, value):
         """ Encode value to a Unicode string."""
-        return smart_text(value)
+        return smart_str(value)

     @classmethod
     def get_diff_dtype(cls): # difference type - change if differs from the base
@@ -97,7 +97,7 @@ class Boolean(BaseType):
     def parse(cls, raw_value):
         if isinstance(raw_value, string_types):
-            raw_value = smart_text(raw_value.lower())
+            raw_value = smart_str(raw_value.lower())
         if raw_value in ('1', 'true'):
             return True
         elif raw_value in ('0', 'false'):
@@ -130,7 +130,7 @@ class Integer(BaseType):
     @classmethod
     def encode(cls, value):
         """ Encode value to a Unicode string."""
-        return smart_text(int(value))
+        return smart_str(int(value))

     @classmethod
     def as_number(cls, value):
@@ -176,9 +176,9 @@ class String(BaseType):
     def encode(cls, value):
         """ Encode value to a Unicode string."""
         try:
-            return smart_text(value)
+            return smart_str(value)
         except UnicodeDecodeError:
-            return smart_text(value, cls.encoding)
+            return smart_str(value, cls.encoding)

     @classmethod
     def parse(cls, raw_value):
@@ -228,7 +228,7 @@ def encode(cls, value):
         elif seconds != 0:
             items.append('%dS' % seconds)

-        return smart_text("".join(items))
+        return smart_str("".join(items))

     @classmethod
     def as_number(cls, value):
@@ -261,7 +261,7 @@ def parse(cls, raw_value):
     @classmethod
     def encode(cls, value):
         if isinstance(value, cls.dtype):
-            return smart_text(value.isoformat())
+            return smart_str(value.isoformat())
         raise ValueError("Invalid value type '%s'!" % type(value))

     @classmethod
@@ -292,7 +292,7 @@ def parse(cls, raw_value):
     @classmethod
     def encode(cls, value):
         if isinstance(value, cls.dtype):
-            return smart_text(value.isoformat())
+            return smart_str(value.isoformat())
         raise ValueError("Invalid value type '%s'!" % type(value))

     @classmethod
@@ -329,7 +329,7 @@ def parse(cls, raw_value):
     @classmethod
     def encode(cls, value):
         if isinstance(value, cls.dtype):
-            return smart_text(cls._isoformat(value))
+            return smart_str(cls._isoformat(value))
         raise ValueError("Invalid value type '%s'!" % type(value))

     @classmethod
diff --git a/eoxserver/services/ows/wps/parameters/literaldata.py b/eoxserver/services/ows/wps/parameters/literaldata.py
index c91120410..89f882cb6 100644
--- a/eoxserver/services/ows/wps/parameters/literaldata.py
+++ b/eoxserver/services/ows/wps/parameters/literaldata.py
@@ -38,7 +38,7 @@
 from .data_types import BaseType, String, DTYPES
 from .allowed_values import BaseAllowed, AllowedAny, AllowedEnum
 from .units import UnitOfMeasure, UnitLinear
-from django.utils.encoding import smart_text
+from django.utils.encoding import smart_str
 from django.utils.six import text_type


@@ -195,9 +195,9 @@ def parse(self, raw_value, uom=None, encoding="utf-8"):
         if isinstance(raw_value, text_type):
             _value = raw_value
         elif isinstance(raw_value, str):
-            _value = smart_text(raw_value, encoding)
+            _value = smart_str(raw_value, encoding)
         else:
-            _value = smart_text(raw_value)
+            _value = smart_str(raw_value)
         _value = self._dtype.parse(raw_value)
         _value = self.strip_uom(_value, uom or self.default_uom)
         _value = self._allowed_values.verify(_value)
diff --git a/tools/eoxserver-atpd.py b/tools/eoxserver-atpd.py
index e27ffb8be..20f9b11af 100755
--- a/tools/eoxserver-atpd.py
+++ b/tools/eoxserver-atpd.py
@@ -1,26 +1,26 @@
-#!/usr/bin/env python 
+#!/usr/bin/env python
 #-----------------------------------------------------------------------
 #
-# Description: 
+# Description:
 #
 #  asynchronous processing master daemon
 #
-# This is the master server which keeps track of the aynchronous tasks in the 
-# queue and distributes task to the workers 
-# 
+# This is the master server which keeps track of the aynchronous tasks in the
+# queue and distributes task to the workers
+#
 #-------------------------------------------------------------------------------
 #
 # Project: EOxServer
 # Authors: Martin Paces
 #
 #-------------------------------------------------------------------------------
-# Copyright (C) 2011 Iguassu Software Systems, a.s 
+# Copyright (C) 2011 Iguassu Software Systems, a.s
 #
 # Permission is hereby granted, free of charge, to any person obtaining a copy
 # of this software and associated documentation files (the "Software"), to deal
 # in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 
-# copies of the Software, and to permit persons to whom the Software is 
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
 # furnished to do so, subject to the following conditions:
 #
 # The above copyright notice and this permission notice shall be included in all
@@ -35,30 +35,30 @@
 # THE SOFTWARE.
 #-------------------------------------------------------------------------------

-# default django settings module 
+# default django settings module
 DJANGO_SETTINGS_DEFAULT = "settings"
 DJANGO_DB_DEFAULT = "default"

 #-------------------------------------------------------------------------------

-import os 
-import sys 
-import signal 
+import os
+import sys
+import signal
 import logging
 import traceback
-import os.path 
-import time 
+import os.path
+import time
 import struct
-import socket 
-from datetime import datetime, timedelta 
+import socket
+from datetime import datetime, timedelta

-from multiprocessing import Lock, Process, Queue , cpu_count 
+from multiprocessing import Lock, Process, Queue , cpu_count
 from multiprocessing.queues import Empty as MPQEmpty
 from multiprocessing.queues import Full as MPQFull

-from django.utils.encoding import smart_text
-try: import cPickle as pickle 
-except: import pickle 
+from django.utils.encoding import smart_str
+try: import cPickle as pickle
+except: import pickle

 try:
     # Python 2
     xrange
@@ -67,361 +67,361 @@ xrange = range
 #-------------------------------------------------------------------------------

-QUEUE_EMPTY_QUERY_DELAY=1.5 # time in seconds of next query to empty queue 
-QUEUE_PUT_TIMEOUT=1.0 # time out used by internal task queue put operation 
-QUEUE_CLEAN_UP_COUNT=300 
+QUEUE_EMPTY_QUERY_DELAY=1.5 # time in seconds of next query to empty queue
+QUEUE_PUT_TIMEOUT=1.0 # time out used by internal task queue put operation
+QUEUE_CLEAN_UP_COUNT=300

 #-------------------------------------------------------------------------------
-# generate unique server instance ID 
+# generate unique server instance ID

-SERVER_ID=0 
-while 0 == SERVER_ID : 
+SERVER_ID=0
+while 0 == SERVER_ID :
     tmp = os.urandom(8)
     SERVER_ID = struct.unpack( 'q' , tmp )[0]
-    SERVER_ID_STR = "0x%16.16X"%( struct.unpack( 'Q' , tmp )[0] ) 
+    SERVER_ID_STR = "0x%16.16X"%( struct.unpack( 'Q' , tmp )[0] )

 #-------------------------------------------------------------------------------

 dbLock = Lock()
 writeLock = Lock()

-def write(msg) : 
+def write(msg) :
     writeLock.acquire()
-    sys.stdout.write("[%s] %s"%(SERVER_ID_STR,msg) ) 
+    sys.stdout.write("[%s] %s"%(SERVER_ID_STR,msg) )
     writeLock.release()

-def debug( msg ) : write( ("DEBUG: %s\n"%(msg)).encode('UTF-8') ) 
-def info( msg ) : write( ("INFO: %s\n"%(msg)).encode('UTF-8') ) 
-def warn( msg ) : write( ("WARNINIG: %s\n"%(msg)).encode('UTF-8') ) 
-def error( msg ) : write( ("ERROR: %s\n"%(msg)).encode('UTF-8') ) 
+def debug( msg ) : write( ("DEBUG: %s\n"%(msg)).encode('UTF-8') )
+def info( msg ) : write( ("INFO: %s\n"%(msg)).encode('UTF-8') )
+def warn( msg ) : write( ("WARNINIG: %s\n"%(msg)).encode('UTF-8') )
+def error( msg ) : write( ("ERROR: %s\n"%(msg)).encode('UTF-8') )

 #-------------------------------------------------------------------------------
 # global worker pool

-global GWP 
+global GWP

-GWP = None 
+GWP = None

 #-------------------------------------------------------------------------------
 # iterrupt and terminate signal handlers

-class ExcTerminate( Exception ) : pass 
+class ExcTerminate( Exception ) : pass

-SS = { signal.SIGINT:"SIGINT" , signal.SIGTERM:"SIGTERM" } 
+SS = { signal.SIGINT:"SIGINT" , signal.SIGTERM:"SIGTERM" }

 def signal_handler_dummy(sig, frm): pass

-def signal_handler_sigint(sig, frm): 
-    global GWP 
-    if GWP is not None : 
-        GWP.terminate = True 
+def signal_handler_sigint(sig, frm):
+    global GWP
+    if GWP is not None :
+        GWP.terminate = True

-def signal_handler_sigterm(sig, frm): 
-    global GWP 
-    if GWP is not None : 
-        GWP.terminate = True 
-        GWP.killChild = True # force immediate termination 
+def signal_handler_sigterm(sig, frm):
+    global GWP
+    if GWP is not None :
+        GWP.terminate = True
+        GWP.killChild = True # force immediate termination

 #-------------------------------------------------------------------------------

-class Importer( object ) : 
-    """ smart importer of the handler subroutine 
-    
-        E.g. function 'd()' in module 'a.b.c' 
+class Importer( object ) :
+    """ smart importer of the handler subroutine
+
+        E.g. function 'd()' in module 'a.b.c'
         given by a path 'a.b.c.d' is imported """

-    def __init__( self , path ) : 
-        """Initialize class from the given handler 
+    def __init__( self , path ) :
+        """Initialize class from the given handler
            subroutine dot-path"""
-        self.path = path 
+        self.path = path
         self.module , _ , self.func = path.rpartition(".")
-        self.modules = [] 
-        self.handler = None 
+        self.modules = []
+        self.handler = None
+
+    def loadHandler( self ) :
+        """ load the handler subroutine """
+
+        if not self.handler :

-    def loadHandler( self ) : 
-        """ load the handler subroutine """ 
-
-        if not self.handler : 
+            # initial list of modules
+            ml0 = set( sys.modules )

-            # initial list of modules 
-            ml0 = set( sys.modules ) 
-
-            # new list of modules 
-            ml1 = set( sys.modules ) 
+            # new list of modules
+            ml1 = set( sys.modules )

-            self.handler = getattr( __import__( self.module , fromlist=[self.func] ) , self.func ) 
+            self.handler = getattr( __import__( self.module , fromlist=[self.func] ) , self.func )

-            # store list of loaded modules 
-            self.modules = ml1 - ml0 
+            # store list of loaded modules
+            self.modules = ml1 - ml0

-        return self.handler 
+        return self.handler

-    def unloadHandler( self ) : 
-        """ unload the handler subroutine """ 
+    def unloadHandler( self ) :
+        """ unload the handler subroutine """

-        if self.handler : 
+        if self.handler :

-            self.handler = None 
+            self.handler = None

-            # unload the loaded modules 
-            for m in self.modules : 
-                del( sys.modules[m] ) 
+            # unload the loaded modules
+            for m in self.modules :
+                del( sys.modules[m] )

-            # store list of loaded modules 
-            self.modules = [] 
+            # store list of loaded modules
+            self.modules = []


-def taskDispatch( taskID , threadID ) : 
-    """ 
-        task dispatcher 
+def taskDispatch( taskID , threadID ) :
+    """
+        task dispatcher

-        based on the request class the right request hadler is used 
-        to process the asynchronous requets 
-    """ 
-    # status logger 
-    pStatus = TaskStatus( taskID , dbLock ) 
+        based on the request class the right request hadler is used
+        to process the asynchronous requets
+    """
+    # status logger
+    pStatus = TaskStatus( taskID , dbLock )

-    try: 
+    try:

-        # get task parameters 
-        requestType , requestID , requestHandler , inputs = dbLocker( dbLock , startTask , taskID ) 
+        # get task parameters
+        requestType , requestID , requestHandler , inputs = dbLocker( dbLock , startTask , taskID )

-        info( "[%3.3i] PROCESS: %s %s is running ... " % ( threadID , requestType , requestID ) ) 
+        info( "[%3.3i] PROCESS: %s %s is running ... " % ( threadID , requestType , requestID ) )
" % ( threadID , requestType , requestID ) ) - # create importer object - imp = Importer( requestHandler ) + # create importer object + imp = Importer( requestHandler ) - # try to load the right module and handler - imp.loadHandler() + # try to load the right module and handler + imp.loadHandler() - # execute handler - proper status logging is duty of the callback - imp.handler( pStatus , inputs ) + # execute handler - proper status logging is duty of the callback + imp.handler( pStatus , inputs ) - # try to unload the handler - imp.unloadHandler() + # try to unload the handler + imp.unloadHandler() - # if no terminating status has been set do it right now + # if no terminating status has been set do it right now dbLocker( dbLock , stopTaskSuccessIfNotFinished , taskID ) - info( "[%3.3i] PROCESS: %s %s is finished ... " % ( threadID , requestType , requestID ) ) + info( "[%3.3i] PROCESS: %s %s is finished ... " % ( threadID , requestType , requestID ) ) - except (KeyboardInterrupt,SystemExit): raise - except Exception as e : + except (KeyboardInterrupt,SystemExit): raise + except Exception as e : - pStatus.setFailure( smart_text((e) ) + pStatus.setFailure( smart_str((e) ) - # finish the task - error( "[%3.3i] %s " % ( threadID , smart_text((e) ) ) + # finish the task + error( "[%3.3i] %s " % ( threadID , smart_str((e) ) ) #------------------------------------------------------------------------------- def worker( queue , id ) : - """ worker function executed by worker subprocesses """ + """ worker function executed by worker subprocesses """ #def signal_handler(sig, frm): raise KeyboardInterrupt #signal.signal( signal.SIGINT, signal_handler ) - # use the dummy hadlers + # use the dummy hadlers signal.signal( signal.SIGINT, signal_handler_dummy ) #signal.signal( signal.SIGTERM, signal_handler_dummy ) - try : + try : - while True : + while True : - try : - item = queue.get() - except IOError as e : - warn( str(e) ) - continue + try : + item = queue.get() + except IOError as e : + warn( str(e) ) + continue - # gracefull termination - if ( item is None ) : break + # gracefull termination + if ( item is None ) : break - # run the task - taskDispatch( item , id ) + # run the task + taskDispatch( item , id ) - except (KeyboardInterrupt,SystemExit): pass + except (KeyboardInterrupt,SystemExit): pass info( "[%3.3i] PROCESS: termination " % id ) -def cleanup() : - """ cleanup function performing reenquing of zombie tasks """ +def cleanup() : + """ cleanup function performing reenquing of zombie tasks """ - tasks = dbLocker( dbLock , reenqueueZombieTasks , "Reenqueued by ATPD after timeout." ) + tasks = dbLocker( dbLock , reenqueueZombieTasks , "Reenqueued by ATPD after timeout." 
+    tasks = dbLocker( dbLock , reenqueueZombieTasks , "Reenqueued by ATPD after timeout." )

-    for (id,task) in tasks : 
-        warn( "[MASTER] Task %i:%s renqueued after timeout!"%(id,task) ) 
+    for (id,task) in tasks :
+        warn( "[MASTER] Task %i:%s renqueued after timeout!"%(id,task) )

-    tasks = dbLocker( dbLock , deleteRetiredTasks ) 
+    tasks = dbLocker( dbLock , deleteRetiredTasks )

-    for (id,task) in tasks : 
-        info( "[MASTER] Task %i:%s deleted after expiration!"%(id,task) ) 
+    for (id,task) in tasks :
+        info( "[MASTER] Task %i:%s deleted after expiration!"%(id,task) )


-class WorkerPool( object ) : 
+class WorkerPool( object ) :

     def __init__( self , nthread ) :

         self.queue = Queue( nthread )
-        self.terminate = False 
-        self.killChild = False 
-        self.proces = [] 
-
-        # start subprocesses 
-        for i in xrange( nthread ) : 
-            p = Process( target=worker , args=( self.queue , i ) ) 
-            p.start() 
+        self.terminate = False
+        self.killChild = False
+        self.proces = []
+
+        # start subprocesses
+        for i in xrange( nthread ) :
+            p = Process( target=worker , args=( self.queue , i ) )
+            p.start()
             self.proces.append(p)
-
-    def __del__( self ) : 
-        # if possible process gracefull termination 
-        debug( "[MASTER]: enqueueing terminators ... " ) 
-        for p in self.proces : 
-            self.queue.put( None ) 
-
-        if not self.killChild : 
-            debug( "[MASTER]: joining subprocesses ... " ) 
-            for p in self.proces : 
-                p.join() 
-        else : 
-            debug( "[MASTER]: terminating subprocesses ... " ) 
-            for p in self.proces : 
-                p.terminate() 

+    def __del__( self ) :
+
+        # if possible process gracefull termination
+        debug( "[MASTER]: enqueueing terminators ... " )
+        for p in self.proces :
+            self.queue.put( None )
+
+        if not self.killChild :
+            debug( "[MASTER]: joining subprocesses ... " )
+            for p in self.proces :
+                p.join()
+        else :
+            debug( "[MASTER]: terminating subprocesses ... " )
+            for p in self.proces :
+                p.terminate()

-    def startLoop( self ) : 
+    def startLoop( self ) :

         # reenqueue hanging tasks
-        # TODO: reenqueuePendingTasks() 
+        # TODO: reenqueuePendingTasks()

-        cnt = 0 
-        taskIds = [] 
-        self.terminate = False 
+        cnt = 0
+        taskIds = []
+        self.terminate = False

-        while not self.terminate : 
+        while not self.terminate :

-            try: 
+            try:

-                # get a pending task from the queue 
-                taskIds = dbLocker( dbLock , dequeueTask , SERVER_ID ) 
+                # get a pending task from the queue
+                taskIds = dbLocker( dbLock , dequeueTask , SERVER_ID )

-            except QueueEmpty : # no task to be processed 
+            except QueueEmpty : # no task to be processed

-                # perform DB cleanup 
+                # perform DB cleanup
                 cleanup()

-                # wait some ammount of time 
-                time.sleep( QUEUE_EMPTY_QUERY_DELAY ) 
+                # wait some ammount of time
+                time.sleep( QUEUE_EMPTY_QUERY_DELAY )

-                # clear counter 
-                cnt = 0 
+                # clear counter
+                cnt = 0

-                continue 
+                continue

-            # send task to worker 
-            for taskId in list(taskIds) : 
-                while not self.terminate : 
-                    try : self.queue.put(taskId,True,QUEUE_PUT_TIMEOUT) 
-                    except MPQFull : continue 
-                    taskIds.remove(taskId) 
-                    break 
+            # send task to worker
+            for taskId in list(taskIds) :
+                while not self.terminate :
+                    try : self.queue.put(taskId,True,QUEUE_PUT_TIMEOUT)
+                    except MPQFull : continue
+                    taskIds.remove(taskId)
+                    break

-            # increment counter 
-            cnt += 1 
+            # increment counter
+            cnt += 1

-            # perform DB cleanup 
-            if ( cnt > QUEUE_CLEAN_UP_COUNT ) : 
+            # perform DB cleanup
+            if ( cnt > QUEUE_CLEAN_UP_COUNT ) :
                 cleanup()
-                cnt = 0 
+                cnt = 0

-        info( "[MASTER]: termination in progress ... " ) 
+        info( "[MASTER]: termination in progress ... " )

-        # try to reenequeue processes taken from the DB task queue 
-        for item in taskIds : 
-            debug( "[MASTER]: reenquing task ID=%i ... " % item ) 
" % item ) - dbLocker( dbLock , reenqueueTask , item , message = "Reenqued by ATPD." ) - try: - while True : + # try to reenequeue processes taken from the DB task queue + for item in taskIds : + debug( "[MASTER]: reenquing task ID=%i ... " % item ) + dbLocker( dbLock , reenqueueTask , item , message = "Reenqued by ATPD." ) + try: + while True : item = self.queue.get(False) - debug( "[MASTER]: reenquing task ID=%i ... " % item ) - dbLocker( dbLock , reenqueueTask , item , message = "Reenqued by ATPD." ) - except MPQEmpty : pass + debug( "[MASTER]: reenquing task ID=%i ... " % item ) + dbLocker( dbLock , reenqueueTask , item , message = "Reenqued by ATPD." ) + except MPQEmpty : pass #------------------------------------------------------------------------------- -def usage() : - """ print usage info """ - - s = [] - s.append( "USAGE: %s [-h][-p ][-s ][-d ][-n ] " % ( os.path.basename( sys.argv[0] ) ) ) - s.append( "" ) - s.append( "PARAMETERS: " ) - s.append( " -h print this info" ) - s.append( " -p append an addition Python search path (can be repeated)" ) - s.append( " -n number of worker instance to be started ( N >= 1 , number of CPUs used by default )" ) - s.append( " -s django settings module (default '%s')"%DJANGO_SETTINGS_DEFAULT ) - #s.append( " -d django DB name (default '%s')"%DJANGO_DB_DEFAULT ) - s.append( "" ) - +def usage() : + """ print usage info """ + + s = [] + s.append( "USAGE: %s [-h][-p ][-s ][-d ][-n ] " % ( os.path.basename( sys.argv[0] ) ) ) + s.append( "" ) + s.append( "PARAMETERS: " ) + s.append( " -h print this info" ) + s.append( " -p append an addition Python search path (can be repeated)" ) + s.append( " -n number of worker instance to be started ( N >= 1 , number of CPUs used by default )" ) + s.append( " -s django settings module (default '%s')"%DJANGO_SETTINGS_DEFAULT ) + #s.append( " -d django DB name (default '%s')"%DJANGO_DB_DEFAULT ) + s.append( "" ) + return "\n".join(s) - + #------------------------------------------------------------------------------- -if __name__ == "__main__" : +if __name__ == "__main__" : - # django settings module + # django settings module - DJANGO_SETTINGS = os.environ.get("DJANGO_SETTINGS_MODULE",DJANGO_SETTINGS_DEFAULT) + DJANGO_SETTINGS = os.environ.get("DJANGO_SETTINGS_MODULE",DJANGO_SETTINGS_DEFAULT) DJANGO_DB = DJANGO_DB_DEFAULT # try to get number of CPUs - try : + try : NTHREAD = cpu_count() - except NotImplementedError : - NTHREAD = 1 - warn( "Failed to get number of CPUs! Setting to 1 asynchronous execution thread." ) + except NotImplementedError : + NTHREAD = 1 + warn( "Failed to get number of CPUs! Setting to 1 asynchronous execution thread." ) - info( "Default number of working threads: %i" % NTHREAD ) + info( "Default number of working threads: %i" % NTHREAD ) - # parse commandline arguments + # parse commandline arguments - idx = 1 - while idx < len(sys.argv) : - arg = sys.argv[idx] ; idx +=1 ; - if arg == '-p' : - sys.path.append( sys.argv[idx] ) + idx = 1 + while idx < len(sys.argv) : + arg = sys.argv[idx] ; idx +=1 ; + if arg == '-p' : + sys.path.append( sys.argv[idx] ) info("'%s' ... adding to Python search path." 
-            idx += 1 
-        elif arg == '-s' : 
+            idx += 1
+        elif arg == '-s' :
             DJANGO_SETTINGS = sys.argv[idx]
-            idx += 1 
-        elif arg == '-d' : 
+            idx += 1
+        elif arg == '-d' :
             DJANGO_DB = sys.argv[idx]
-            idx += 1 
-        elif arg == '-n' : 
+            idx += 1
+        elif arg == '-n' :
             NTHREAD = max( 1 , int(sys.argv[idx]) )
-            info("Setting number of working threads to: %i" % NTHREAD ) 
+            info("Setting number of working threads to: %i" % NTHREAD )
             idx += 1
-        elif arg == '-h' : 
-            sys.stderr.write(usage()) ; sys.exit(0) 
-        else : 
-            sys.stderr.write(usage()) 
-            error( "Invalid commandline option '%s' !" % arg ) 
-            sys.exit(1) 
+        elif arg == '-h' :
+            sys.stderr.write(usage()) ; sys.exit(0)
+        else :
+            sys.stderr.write(usage())
+            error( "Invalid commandline option '%s' !" % arg )
+            sys.exit(1)

     #-------------------------------------------------------------------
-    # initialize the working enviroment 
+    # initialize the working enviroment

-    # django settings module 
-    os.environ["DJANGO_SETTINGS_MODULE"] = DJANGO_SETTINGS 
+    # django settings module
+    os.environ["DJANGO_SETTINGS_MODULE"] = DJANGO_SETTINGS

     info("'%s' ... is set as the Django settings module " % DJANGO_SETTINGS )
     info("'%s' ... is set as the Django database " % DJANGO_DB )
@@ -431,30 +431,30 @@ def usage() :
     from eoxserver.core.system import System
     from eoxserver.resources.processes.tracker import TaskStatus, QueueEmpty, \
         dequeueTask, startTask, reenqueueTask, stopTaskSuccessIfNotFinished, \
-        reenqueueZombieTasks, deleteRetiredTasks, dbLocker 
+        reenqueueZombieTasks, deleteRetiredTasks, dbLocker

-    # initialize the system 
+    # initialize the system
     System.init()

     #-------------------------------------------------------------------

-    info( "ATPD Asynchronous Task Processing Daemon has just been started!") 
-    info( "ATPD: id=%s (%i)" % ( SERVER_ID_STR , SERVER_ID ) ) 
-    info( "ATPD: hostname=%s" % socket.getfqdn() ) 
+    info( "ATPD Asynchronous Task Processing Daemon has just been started!")
+    info( "ATPD: id=%s (%i)" % ( SERVER_ID_STR , SERVER_ID ) )
+    info( "ATPD: hostname=%s" % socket.getfqdn() )
     info( "ATPD: pid=%i " % os.getpid() )

     #-------------------------------------------------------------------

-    # start the worker pool 
-    
-    GWP = WorkerPool( NTHREAD ) 
+    # start the worker pool
+
+    GWP = WorkerPool( NTHREAD )

-    # use the GWP terminating hadlers 
+    # use the GWP terminating hadlers
     signal.signal( signal.SIGINT, signal_handler_sigint )
     signal.signal( signal.SIGTERM, signal_handler_sigterm )

-    # start the main loop 
+    # start the main loop
     GWP.startLoop()

-    # use the dummy hadlers 
+    # use the dummy hadlers
     signal.signal( signal.SIGINT, signal_handler_dummy )
     signal.signal( signal.SIGTERM, signal_handler_dummy )