9
9
from .settings import POSTPROCESS_ERROR , CATALOG_DATA_READY
10
10
from .settings import REDUCTION_DATA_READY , REDUCTION_CATALOG_DATA_READY
11
11
from .database import transactions
12
+
13
+ import importlib
14
+ import inspect
12
15
import json
13
16
import logging
17
+ import re
14
18
15
19
16
20
class StateAction :
@@ -47,6 +51,30 @@ def _call_default_task(self, headers, message):
47
51
action_cls = globals ()[destination ]
48
52
action_cls (connection = self ._send_connection )(headers , message )
49
53
54
+ def _get_class_from_path (self , class_path : str ):
55
+ """
56
+ Returns the class given by the class path
57
+ :param class_path: the class, e.g. "module_name.ClassName"
58
+ :return: class or None
59
+ """
60
+ # check that the string is in the format "package_name.module_name.class_name"
61
+ pattern = r"^[a-zA-Z0-9_\.]+\.[a-zA-Z0-9_]+$"
62
+ if not re .match (pattern , class_path ):
63
+ logging .error (f"task_class { class_path } does not match pattern module_name.ClassName" )
64
+ return None
65
+ module_name , class_name = class_path .rsplit ("." , 1 )
66
+
67
+ # try importing the class
68
+ try :
69
+ module = importlib .import_module (module_name )
70
+ cls = getattr (module , class_name )
71
+ if not inspect .isclass (cls ):
72
+ raise ValueError
73
+ return cls
74
+ except (ModuleNotFoundError , AttributeError , ValueError ):
75
+ logging .error (f"task_class { class_path } cannot be imported" )
76
+ return None
77
+
50
78
def _call_db_task (self , task_data , headers , message ):
51
79
"""
52
80
:param task_data: JSON-encoded task definition
@@ -59,14 +87,12 @@ def _call_db_task(self, task_data, headers, message):
59
87
and (task_def ["task_class" ] is not None )
60
88
and len (task_def ["task_class" ].strip ()) > 0
61
89
):
62
- try :
63
- toks = task_def ["task_class" ].strip ().split ("." )
64
- module = "." .join (toks [: len (toks ) - 1 ])
65
- cls = toks [len (toks ) - 1 ]
66
- exec ("from %s import %s as action_cls" % (module , cls ))
67
- action_cls (connection = self ._send_connection )(headers , message ) # noqa: F821
68
- except : # noqa: E722
69
- logging .exception ("Task [%s] failed:" , headers ["destination" ])
90
+ action_cls = self ._get_class_from_path (task_def ["task_class" ])
91
+ if action_cls :
92
+ try :
93
+ action_cls (connection = self ._send_connection )(headers , message ) # noqa: F821
94
+ except : # noqa: E722
95
+ logging .exception ("Task [%s] failed:" , headers ["destination" ])
70
96
if "task_queues" in task_def :
71
97
for item in task_def ["task_queues" ]:
72
98
destination = "/queue/%s" % item
0 commit comments