diff --git a/turbinia/api/models/request_status.py b/turbinia/api/models/request_status.py index 1c9b380a2..20fe1814d 100644 --- a/turbinia/api/models/request_status.py +++ b/turbinia/api/models/request_status.py @@ -54,11 +54,74 @@ def get_request_data( Returns: bool: True if the request has at least one task associated with it. """ - if not tasks: - _state_manager = state_manager.get_state_manager() - tasks = _state_manager.get_task_data( - instance=turbinia_config.INSTANCE_ID, request_id=request_id) + _state_manager = state_manager.get_state_manager() + + self.request_id = request_id + + # Gets the information from the request if it is stored in Redis + if _state_manager.key_exists(f'TurbiniaRequest:{request_id}'): + saved_request = _state_manager.get_request_data(request_id) + self.evidence_name = saved_request.get('original_evidence').get('name') + self.evidence_id = saved_request.get('original_evidence').get('id') + self.requester = saved_request.get('requester') + self.reason = saved_request.get('reason') + self.task_status = saved_request.get('status') + self.task_last_update = saved_request.get('last_update') + self.sucessful_tasks = len(saved_request.get('succesful_tasks')) + self.failed_tasks = len(saved_request.get('failed_tasks')) + self.queued_tasks = len(saved_request.get('queued_tasks')) + self.running_tasks = len(saved_request.get('running_tasks')) + # If the request is not stored in redis, uses legacy get_request_data + else: + if not tasks: + tasks = _state_manager.get_task_data( + instance=turbinia_config.INSTANCE_ID, request_id=request_id) + self.get_request_data_legacy(request_id, tasks, summary) + + if self.last_task_update_time: + if isinstance(self.last_task_update_time, float): + self.last_task_update_time = datetime.datetime.fromtimestamp( + self.last_task_update_time).strftime( + turbinia_config.DATETIME_FORMAT) + + completed_tasks = self.successful_tasks + self.failed_tasks + + if completed_tasks == self.task_count and self.failed_tasks > 0: + self.status = 'completed_with_errors' + elif self.failed_tasks == self.task_count: + self.status = 'failed' + elif self.successful_tasks == self.task_count: + self.status = 'successful' + else: + # TODO(leaniz): Add a 'pending' state to tasks for cases 2 and 3. + # ref: https://github.com/google/turbinia/issues/1239 + # + # A 'running' status for a request covers multiple cases: + # 1) One or more tasks are still in a running status. + # 2) Zero tasks are running, zero or more tasks are queued + # and none have failed/succeeded. + # (e.g. all tasks scheduled on the Turbinia server and none picked + # up by any worker yet.) + # 3) Zero tasks are running, one or more tasks are queued + # and some have failed/succeeded. + # (e.g. some tasks have completed, others are scheduled on the + # Turbinia server but not picked up by a worker yet.) + # + # Note that this method is concerned with a Turbiania request's status + # which is different than the status of an individual task. + self.status = 'running' + + return bool(self.tasks) + + def get_request_data_legacy(self, request_id: str, tasks: Optional[List[Dict]] = None, + summary: bool = False): + """Gets information about the original evidence for a specific Turbinia + request. + Args: + request_id (str): A Turbinia request identifier. + tasks (list): List of tasks. + """ if not summary: for task in tasks: current_request_id = task.get('request_id') @@ -73,7 +136,6 @@ def get_request_data( # evidence name. There is a small chance of the first task having a # different evidence_name, so getting it from arguments is prefered when # they exist. - # todo(igormr): Save request information in redis to get the evidence_name name_from_args = False if tasks: if tasks[0].get('all_args'): @@ -86,11 +148,12 @@ def get_request_data( initial_start_time = datetime.datetime.now().strftime( turbinia_config.DATETIME_FORMAT) + + self.taks_count = len(tasks) + for task in tasks: - self.request_id = task.get('request_id') self.requester = task.get('requester') self.reason = task.get('reason') - self.task_count = len(tasks) task_status = task.get('status') # Gets the evidence_name from the first started task. if name_from_args and task.get('evidence_name') == self.evidence_name: @@ -127,42 +190,6 @@ def get_request_data( task['last_update'] = task['last_update'].strftime( turbinia_config.DATETIME_FORMAT) - if self.last_task_update_time: - if isinstance(self.last_task_update_time, float): - self.last_task_update_time = datetime.datetime.fromtimestamp( - self.last_task_update_time).strftime( - turbinia_config.DATETIME_FORMAT) - - completed_tasks = self.successful_tasks + self.failed_tasks - - if completed_tasks == self.task_count and self.failed_tasks > 0: - self.status = 'completed_with_errors' - elif self.failed_tasks == self.task_count: - self.status = 'failed' - elif self.successful_tasks == self.task_count: - self.status = 'successful' - else: - # TODO(leaniz): Add a 'pending' state to tasks for cases 2 and 3. - # ref: https://github.com/google/turbinia/issues/1239 - # - # A 'running' status for a request covers multiple cases: - # 1) One or more tasks are still in a running status. - # 2) Zero tasks are running, zero or more tasks are queued - # and none have failed/succeeded. - # (e.g. all tasks scheduled on the Turbinia server and none picked - # up by any worker yet.) - # 3) Zero tasks are running, one or more tasks are queued - # and some have failed/succeeded. - # (e.g. some tasks have completed, others are scheduled on the - # Turbinia server but not picked up by a worker yet.) - # - # Note that this method is concerned with a Turbiania request's status - # which is different than the status of an individual task. - self.status = 'running' - - return bool(self.tasks) - - class RequestsSummary(BaseModel): """Represents a summary view of multiple Turbinia requests.""" requests_status: List[RequestStatus] = [] diff --git a/turbinia/message.py b/turbinia/message.py index 664457a31..7198b6023 100644 --- a/turbinia/message.py +++ b/turbinia/message.py @@ -28,7 +28,6 @@ log = logging.getLogger('turbinia') - class TurbiniaRequest: """An object to request evidence to be processed. @@ -54,6 +53,9 @@ def __init__( self.recipe = recipe if recipe else {'globals': {}} self.context = context if context else {} self.evidence = evidence if evidence else [] + self.original_evidence = {} + if evidence and len(evidence) > 0: + self.original_evidence = {'id': evidence[0].id, 'name': evidence[0].name} self.group_name = group_name if group_name else '' self.reason = reason if reason else '' self.all_args = all_args if all_args else '' @@ -70,13 +72,11 @@ def to_json(self, json_values=False): A JSON serialized object. """ serializable = copy.deepcopy(self.__dict__) - if json_values: if evidence_list := serializable.pop('evidence'): - serializable['original_evidence'] = { - 'name': evidence_list[0].name, - 'id': evidence_list[0].id - } + if not serializable.get('original_evidence') and len(evidence_list) > 0: + serializable['original_evidence'] = {'name': evidence_list[0].name, + 'id': evidence_list[0].id} serializable['evidence_ids'] = [ evidence.id for evidence in evidence_list ] diff --git a/turbinia/state_manager.py b/turbinia/state_manager.py index a40dcdd4e..9ad69f34a 100644 --- a/turbinia/state_manager.py +++ b/turbinia/state_manager.py @@ -114,8 +114,9 @@ def get_task_dict(self, task): task_dict[attr] = six.u(task_dict[attr]) # We'll store the run_time as seconds instead of a timedelta() - if task_dict.get('run_time'): - task_dict['run_time'] = task_dict['run_time'].total_seconds() + #DELETE + #if task_dict.get('run_time'): + # task_dict['run_time'] = task_dict['run_time'].total_seconds() #Importing these here to avoid circular dependencies. from turbinia.workers import TurbiniaTask @@ -277,7 +278,7 @@ def get_task(self, task_id: str) -> dict: Returns: task_dict (dict): Dict containing task attributes. """ - task_key = ':'.join(('TurbiniaEvidence', task_id)) + task_key = ':'.join(('TurbiniaTask', task_id)) if self.get_key_type(task_key) == 'string': task_dict = self.get_task_legacy(task_id) @@ -286,7 +287,6 @@ def get_task(self, task_id: str) -> dict: for attribute_name, attribute_value in self.iterate_attributes( task_key): task_dict[attribute_name] = attribute_value - if task_dict.get('last_update'): task_dict['last_update'] = datetime.strptime( task_dict.get('last_update'), DATETIME_FORMAT) @@ -294,6 +294,17 @@ def get_task(self, task_id: str) -> dict: task_dict['run_time'] = timedelta(seconds=task_dict['run_time']) return task_dict + + def validate_task(self, task, instance: str, days: int, group_id: str, user: str): + if days: + start_time = datetime.now() - timedelta(days=days) + valid_days = task.get('last_update') > start_time + else: + valid_days = True + valid_instance = not instance or task.get('instance') == instance + valid_group = not group_id or task.get('group_id') == group_id + valid_user = not user or task.get('requester') == user + return valid_days and valid_instance and valid_group and valid_user def get_task_data( self, instance: str, days: int=0, task_id: str=None, @@ -312,31 +323,30 @@ def get_task_data( Returns: List of Task dict objects. """ + # If task_id is passed, simply gets and validates the corresponding task if task_id: task = self.get_task(task_id) - tasks = [task] if not request_id or task.request_id == request_id else [] - elif request_id: - request_key = ':'.join(('TurbiniaRequest', request_id)) - if self.key_exists(request_key): - task_ids = self.get_attribute( - request_key, 'task_ids', decode_json = True) - tasks = [self.get_task(task_id) for task_id in task_ids] + valid_request = not request_id or task.get('request_id') == request_id + valid_task = self.validate_task(task, instance, days, group_id, user) + return [task] if valid_request and valid_task else [] + + request_key = f'TurbiniaRequest:{request_id}' if request_id else None + + # If request_id is passed, gets valid tasks from that request + if request_key and self.key_exists(request_key): + task_ids = self.get_attribute( + request_key, 'task_ids', decode_json = True) + # If no task_id or request_id is passed, gets all valid saved tasks else: - tasks = [ - self.get_data(task_key) for task_key in self.iterate_keys('Task')] + task_ids = [task_key.split(':')[1] for task_key in self.iterate_keys('Task')] - # pylint: disable=no-else-return - if instance: - tasks = [task for task in tasks if task.get('instance') == instance] - if days: - start_time = datetime.now() - timedelta(days=days) - # Redis only supports strings; we convert to/from datetime here and below - tasks = [task for task in tasks if task.get('last_update') > start_time] - if group_id: - tasks = [task for task in tasks if task.get('group_id') == group_id] - if user: - tasks = [task for task in tasks if task.get('requester') == user] + tasks = [] + for task_id in task_ids: + task = self.get_task(task_id) + if self.validate_task(task, instance, days, group_id, user): + tasks.append(task) + return tasks def format_task(self, task): @@ -358,6 +368,29 @@ def format_task(self, task): raise TurbiniaException(error_message) from exception return task_dict + def update_request_task(self, task): + request_key = ':'.join(('TurbiniaRequest', task.request_id)) + self.add_to_list(request_key, 'task_ids', task.id) + request_last_update = datetime.strptime(self.get_attribute( + request_key, 'last_update', decode_json=False).decode(), DATETIME_FORMAT) + last_update = max(request_last_update, task.last_update).strftime( + DATETIME_FORMAT) + self.set_attribute(request_key, 'last_update',last_update) + # 'successful' could be None or False, which means different things. + # If False, the task has failed, If None, could be queued or running. + if hasattr(task, 'succesful'): + if task.successful: + self.add_to_list(request_key, 'succesful_tasks', task.id) + if task.successful is False: + self.add_to_list(request_key, 'failed_tasks', task.id) + elif task.successful is None: + if task.status: + if 'running' in task.status: + self.add_to_list(request_key, 'running_tasks', task.id) + else: + # 'successful' is None and 'status' is None + self.add_to_list(request_key, 'running_tasks', task.id) + def write_new_task(self, task): """Writes task into redis. @@ -375,6 +408,7 @@ def write_new_task(self, task): """ log.info(f'Writing new task {task.name:s} into Redis') task_key = ':'.join(('TurbiniaTask', task.id)) + self.update_request_task(task) task_dict = self.format_task(task) self.write_hash_object(task_key, task_dict) task.state_key = task_key @@ -396,6 +430,7 @@ def update_task(self, task): log.info(f'Updating task {task.name:s} in Redis') task_dict = self.format_task(task) self.write_hash_object(task_key, task_dict) + return task_key def set_attribute( @@ -414,10 +449,7 @@ def set_attribute( TurbiniaException: When Redis fails in updating the attribute. """ try: - if not self.client.hset(redis_key, attribute_name, json_value): - log.error(f'Error setting {attribute_name} on {redis_key} in Redis') - return False - return True + self.client.hset(redis_key, attribute_name, json_value) except redis.RedisError as exception: error_message = ( f'Error setting {attribute_name} on {redis_key} in Redis') @@ -521,7 +553,7 @@ def key_exists(self, redis_key) -> bool: redis_key (str): The key to be checked. Returns: - exists (bool): Boolean indicating if evidence is saved. + exists (bool): Boolean indicating if key is saved. Raises: TurbiniaException: If Redis fails in checking the existence of the key. @@ -532,6 +564,28 @@ def key_exists(self, redis_key) -> bool: error_message = f'Error checking existence of {redis_key} in Redis' log.error(f'{error_message}: {exception}') raise TurbiniaException(error_message) from exception + + def attribute_exists(self, redis_key, attribute_name) -> bool: + """Checks if the attribute of the hashed key is saved in Redis. + + Args: + redis_key (str): The key to be checked. + attribute_name (str): The attribute to be checked. + + Returns: + exists (bool): Boolean indicating if attribute is saved. + + Raises: + TurbiniaException: If Redis fails in checking the existence. + """ + try: + return self.client.hexists(redis_key, attribute_name) + except redis.RedisError as exception: + error_message = ( + f'Error checking existence of attribute {attribute_name}' + f'in {redis_key} in Redis') + log.error(f'{error_message}: {exception}') + raise TurbiniaException(error_message) from exception def get_key_type(self, redis_key) -> bool: """Gets the type of the Redis key. @@ -552,7 +606,7 @@ def get_key_type(self, redis_key) -> bool: log.error(f'{error_message}: {exception}') raise TurbiniaException(error_message) from exception - def add_to_list(self, redis_key, list_name, new_item, repeated=False): + def add_to_list(self, redis_key, list_name, new_item, allow_repeated=False): """Appends new item to a list attribute in a hashed Redis object. Args: @@ -564,16 +618,19 @@ def add_to_list(self, redis_key, list_name, new_item, repeated=False): Returns: redis_key (str): The key corresponding to the object in Redis """ - try: - saved_list = self.get_attribute(redis_key, list_name) - if new_item not in saved_list and not repeated: - saved_list.append(new_item) - self.set_attribute(redis_key, list_name, json.dumps(saved_list)) - except (TypeError, ValueError) as exception: - error_message = ( - f'Error encoding list {saved_list} from {redis_key} in Redis') - log.error(f'{error_message}: {exception}') - raise TurbiniaException(error_message) from exception + if not self.attribute_exists(redis_key, list_name): + list_attribute = [new_item] + else: + try: + list_attribute = self.get_attribute(redis_key, list_name) + if new_item not in list_attribute and not allow_repeated: + list_attribute.append(new_item) + except (TypeError, ValueError) as exception: + error_message = ( + f'Error encoding list {list_attribute} from {redis_key} in Redis') + log.error(f'{error_message}: {exception}') + raise TurbiniaException(error_message) from exception + self.set_attribute(redis_key, list_name, json.dumps(list_attribute)) def write_hash_object(self, redis_key, object_dict): """Writes new hash object into redis. To save storage, the function does not @@ -751,7 +808,10 @@ def write_request(self, request_dict: dict, update=False): error_message = 'Error deserializing request attribute.' log.error(f'{error_message}: {exception}') raise TurbiniaException(error_message) from exception - + if not request_dict.get('last_update'): + request_dict['start_time'] = datetime.now().strftime(DATETIME_FORMAT) + if not request_dict.get('last_update'): + request_dict['last_update'] = datetime.now().strftime(DATETIME_FORMAT) # Either updates or write new key if update == self.key_exists(request_key): self.write_hash_object(request_key, request_dict) @@ -770,6 +830,10 @@ def get_request_data(self, request_id: str) -> dict: request_dict = {} for attribute_name, attribute_value in self.iterate_attributes(request_key): request_dict[attribute_name] = attribute_value + request_dict['last_update'] = datetime.strptime( + request_dict.get('last_update'), DATETIME_FORMAT) + request_dict['start_time'] = datetime.strptime( + request_dict.get('start_time'), DATETIME_FORMAT) return request_dict def query_requests(