Skip to content
This repository was archived by the owner on Jan 27, 2022. It is now read-only.

Restart WPE on KME restart #653

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions common/python/exception/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Copyright 2020 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

all = []
27 changes: 27 additions & 0 deletions common/python/exception/avalon_exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Copyright 2020 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


class AvalonException(Exception):
"""
Abstract exception class for Avalon exceptions.
"""
pass


class WorkerRestartException(AvalonException):
"""
Exception class which indicates that a worker has restarted.
"""
pass
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from abc import abstractmethod
from error_code.error_status import WorkOrderStatus
from avalon_enclave_manager.base_enclave_manager import EnclaveManager
from exception.avalon_exceptions import WorkerRestartException

logger = logging.getLogger(__name__)

Expand All @@ -36,6 +37,7 @@ class WOProcessorManager(EnclaveManager):
def __init__(self, config):
super().__init__(config)
self._identity = None
self._is_restart_required = False

# -------------------------------------------------------------------------

Expand Down Expand Up @@ -99,14 +101,24 @@ def _process_work_order_by_id(self, wo_id):
Returns :
wo_resp - A JSON response of the executed work order
"""

try:
if self._is_restart_required:
self._kv_helper.csv_prepend("wo-worker-scheduled",
self._worker_id, wo_id)
logger.info("Reinstating work order {} to ".format(wo_id)
+ "wo-worker-scheduled; This worker will restart.")
raise WorkerRestartException("This worker needs to restart.")

# Get JSON workorder request corresponding to wo_id
wo_json_req = self._kv_helper.get("wo-requests", wo_id)
if wo_json_req is None:
logger.error("Received empty work order corresponding " +
"to id %s from wo-requests table", wo_id)
return None

except WorkerRestartException:
raise
except Exception as e:
logger.error("Problem while reading the work order %s "
"from wo-requests table", wo_id)
Expand All @@ -126,9 +138,22 @@ def _process_work_order_by_id(self, wo_id):

# Execute work order request

logger.info("Execute workorder with id %s", wo_id)
wo_json_resp = self._execute_work_order(wo_json_req)
wo_resp = json.loads(wo_json_resp)
try:
logger.info("Execute workorder with id %s", wo_id)
wo_json_resp = self._execute_work_order(wo_json_req)
wo_resp = json.loads(wo_json_resp)
except WorkerRestartException:
self._kv_helper.csv_prepend("wo-worker-scheduled",
self._worker_id, wo_id)
logger.info("Reinstating work order {} to ".format(wo_id)
+ "wo-worker-scheduled as this worker will restart.")
raise
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is error message and code client will as a response for the WorkerRestartException?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not exposed to a client. It is an internal exception used as a signal to trigger restart.

except Exception as e:
logger.error("Exception while processing work order: {}".format(e))
self._persist_wo_response_to_db(
wo_id, WorkOrderStatus.FAILED, None,
"Exception during work order processing")
raise

logger.info("Update workorder receipt for workorder %s", wo_id)
self._wo_kv_delegate.update_receipt(wo_id, wo_resp)
Expand Down Expand Up @@ -175,21 +200,19 @@ def _execute_work_order(self, input_json_str):
"""
try:
wo_response = self._execute_wo_in_trusted_enclave(input_json_str)
try:
json_response = json.dumps(wo_response, indent=4)
except Exception as err:
wo_response["Response"] = dict()
logger.error("ERROR: Failed to serialize JSON; %s", str(err))
wo_response["Response"]["Status"] = WorkOrderStatus.FAILED
wo_response["Response"]["Message"] = "Failed to serialize JSON"
json_response = json.dumps(wo_response)

json_response = json.dumps(wo_response, indent=4)

except TypeError as err:
logger.error("Failed to serialize response JSON - %s", str(err))
return jrpc_utility.create_error_response(
WorkOrderStatus.FAILED, "0",
"Failed to serialize response JSON")
except WorkerRestartException:
raise
except Exception as e:
wo_response["Response"] = dict()
logger.error("failed to execute work order; %s", str(e))
wo_response["Response"]["Status"] = WorkOrderStatus.FAILED
wo_response["Response"]["Message"] = str(e)
json_response = json.dumps(wo_response)
logger.error("Failed to execute work order - %s", str(e))
return jrpc_utility.create_error_response(
WorkOrderStatus.FAILED, "0", "Failed to execute work order")

return json_response

Expand Down Expand Up @@ -297,6 +320,8 @@ def _start_polling_kvstore(self):
logger.info("Enclave manager sleeping for %d secs",
sleep_interval)
time.sleep(sleep_interval)
except WorkerRestartException:
raise
except Exception as inst:
logger.error("Error while processing work-order; " +
"shutting down enclave manager")
Expand Down Expand Up @@ -353,6 +378,8 @@ def _start_zmq_listener(self):
str(wo_id))
else:
socket.send_string("Work order processed: " + str(wo_id))
except WorkerRestartException:
raise
except Exception as inst:
logger.error("Error while processing work-order; " +
"shutting down enclave manager")
Expand Down
85 changes: 67 additions & 18 deletions enclave_manager/avalon_enclave_manager/wpe/wpe_enclave_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import os
import sys

import utility.jrpc_utility as jrpc_utility
import avalon_enclave_manager.sgx_work_order_request as work_order_request
import avalon_enclave_manager.wpe.wpe_enclave as enclave
import avalon_enclave_manager.wpe.wpe_enclave_info as enclave_info
Expand All @@ -29,6 +30,9 @@
from error_code.error_status import WorkOrderStatus
from avalon_enclave_manager.work_order_processor_manager \
import WOProcessorManager
from exception.avalon_exceptions import WorkerRestartException
from multiprocessing import Process


logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -128,25 +132,72 @@ def _execute_wo_in_trusted_enclave(self, input_json_str):
try:
pre_proc_output = self._wpe_requester\
.preprocess_work_order(input_json_str, self.encryption_key)
if "error" in pre_proc_output:
if pre_proc_output is None or "error" in pre_proc_output:
# If error in preprocessing response, skip workorder processing
logger.error("Failed to preprocess at WPE enclave manager.")
logger.error("Failed to preprocess work order request.")
self._check_for_re_register()
if self._is_restart_required:
raise WorkerRestartException(
"This worker needs to restart.")
return pre_proc_output

wo_request = work_order_request.SgxWorkOrderRequest(
"WPE",
input_json_str,
pre_proc_output)
wo_response = wo_request.execute()
except WorkerRestartException:
raise
except Exception as e:
logger.error("failed to execute work order; %s", str(e))
wo_response = dict()
wo_response["error"] = dict()
wo_response["error"]["code"] = WorkOrderStatus.FAILED
wo_response["error"]["message"] = str(e)
logger.info("unknown enclave type response = %s", wo_response)
logger.error("Failed to execute work order; %s", str(e))
wo_response = jrpc_utility.create_error_response(
WorkOrderStatus.FAILED, "0",
"Failed to execute work order : {}".format(str(e)))
return wo_response

# -------------------------------------------------------------------------

def _check_for_re_register(self):
"""
This function verifies worker details in this instance against that
in database to see if a worker update has taken place. If so, it
enables a flag to re-register this WPE with the KME before next work
order is picked up for processing.
"""
wpes_csv = self._kv_helper.get("worker-pool", self._worker_id)
wpes = [] if wpes_csv is None else wpes_csv.split(",")
if self._identity not in wpes:
# If identity is not found in worker-pool, the KME might have
# restarted in which case the WPEs need to register again.
logger.info("WPE needs to restart and register with KME again.")
self._is_restart_required = True

# -------------------------------------------------------------------------


def run_wpe(config):
"""
Delegate method that spawns up as a new process and runs the WPE Enclave
Manager.

Parameters:
@param config - A map of configurations read from file/command line
"""
try:
logger.info("Initialize WorkOrderProcessor enclave_manager")
enclave_manager = WorkOrderProcessorEnclaveManager(config)
logger.info("About to start WorkOrderProcessor Enclave manager")
enclave_manager.start_enclave_manager()
except WorkerRestartException as ex:
logger.error("Exception occurred while processing work orders.")
logger.error(ex)
logger.info("Will try to restart WPE.")
except Exception as e:
logger.error(e)
logger.error("Exception occurred while running WPE, " +
"exiting WPE enclave manager")
sys.exit(1)

# -------------------------------------------------------------------------


Expand Down Expand Up @@ -195,16 +246,14 @@ def main(args=None):
sys.stderr = plogger.stream_to_logger(
logging.getLogger("STDERR"), logging.WARN)

try:
EnclaveManager.parse_command_line(config, remainder)
logger.info("Initialize WorkOrderProcessor enclave_manager")
enclave_manager = WorkOrderProcessorEnclaveManager(config)
logger.info("About to start WorkOrderProcessor Enclave manager")
enclave_manager.start_enclave_manager()
except Exception as e:
logger.error("Exception occurred while running WPE, " +
"exiting WPE enclave manager")
exit(1)
EnclaveManager.parse_command_line(config, remainder)

while True:
wpe_manager = Process(target=run_wpe, args=(config,))
wpe_manager.start()
wpe_manager.join()
if wpe_manager.exitcode == 1:
sys.exit(1)


main()
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def preprocess_work_order(self, wo_request, encryption_key):
@param encryption_key - WPE's public encryption key
Returns :
@returns result - Result from KME that includes the workorder
key info. error response, in case of failure.
key info. Error response, in case of failure.
"""
workload_id = "kme-preprocess"
in_data = [wo_request, encryption_key]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ enum KmeRegistrationStatus {
ERR_MRSIGNER_NOT_MATCH = 4, /// WPE MRSIGNER value not matched
ERR_WPE_VERIFICATION_FAILED = 5, /// WPE attestation report verification failed
ERR_ENCRYPTION_KEY_NOT_MATCH = 6, /// WPE encryption hash value didn't matched
ERR_UNIQUE_ID_NOT_MATCH = 7 /// WPE unique id didn't match
ERR_UNIQUE_ID_NOT_MATCH = 7, /// WPE unique id didn't match
ERR_WPE_KEY_INFO_CREATION_FAILED = 8 /// CreateWorkOrderKeyInfo for WPE failed
};

enum KmePreProcessStatus {
ERR_WPE_MAX_WO_COUNT_REACHED = 1
ERR_WPE_MAX_WO_COUNT_REACHED = 10
};

class ExtWorkOrderInfoKME : public ExtWorkOrderInfoImpl {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,12 @@ void KMEWorkloadProcessor::PreprocessWorkorder(
}
AddOutput(0, wo_key_data, out_work_order_data);
} else {
SetStatus(ERR_WPE_KEY_NOT_FOUND, out_work_order_data);
SetStatus(ERR_WPE_KEY_INFO_CREATION_FAILED, out_work_order_data);
ThrowIf<ValueError>(true, "WPE key info creation failed");
}
} else {
SetStatus(ERR_WPE_KEY_NOT_FOUND, out_work_order_data);
ThrowIf<ValueError>(true, "WPE encryption key not found");
}
} // KMEWorkloadProcessor::PreprocessWorkorder

Expand Down