Skip to content
This repository was archived by the owner on Jan 27, 2022. It is now read-only.

[WIP] fix up bridge module bugs #65

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ examples/common/python/connectors/direct/tcs_listener/__pycache__/
examples/common/python/crypto/crypto.py
examples/common/python/crypto/crypto_wrap.cpp
examples/common/python/dist/
examples/common/python/kv_storage
examples/common/python/kv_storage-lock
examples/common/python/tcf_examples_common.egg-info/
examples/enclave_manager/build/
examples/enclave_manager/dist/
Expand All @@ -28,3 +30,6 @@ tools/build/_dev/

# Byte-compiled / optimized / DLL files
__pycache__/

kv_storage
kv_storage-lock
2 changes: 1 addition & 1 deletion config/tcs_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

[KvStorage]
StoragePath = "config/Kv_Shared_tmp"
StorageSize = "1 TB"
StorageSize = "1 GB"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 GByte is very small. The space for the file is not allocated until used, so 1 TByte allows room to grow.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 GB is only for local test, which won't be included in the final commit.

# the remote version is of higher priority if enabled
#remote_url = "http://localhost:9090"

Expand Down
109 changes: 67 additions & 42 deletions examples/common/python/connectors/ethereum/direct_registry_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,58 +14,67 @@

''' this Component acts as bridge between smart contract deployed in blockchain and KV storage'''

import os
import sys
from os import urandom
import argparse
import time
import json
import logging
from os.path import dirname, join, abspath

from shared_kv.shared_kv_interface import KvStorage
from connectors.ethereum.ethereum_worker_registry_list_impl import \
EthereumWorkerRegistryListImpl as registry
from utility.tcf_types import RegistryStatus

import toml
from os.path import dirname, join, abspath
import logging
import json
import time
import argparse
from os import urandom, environ
import sys
import os

sys.path.insert(0, abspath(join(dirname(__file__), '..')) + '/tcf_connector/')

import EthereumDirectRegistry as registry

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


def str_list_to_bytes_list(str_list):
bytes_list = []
for str in str_list:
bytes_list.append(str.encode())
bytes_list.append(bytes.fromhex(str))
return bytes_list


def bytes_list_to_str_list(bytes_list):
str_list = []
for byte in bytes_list:
str_list.append(byte.decode().rstrip(' \t\r\n\0'))
str_list.append(byte.hex())
return str_list


def create_json_object(registry_id, registry, status):
def create_json_object(org_id, registry, status):
logger.debug("create_json_object id: %s, registry: %s",
org_id.hex(), registry)
registry_info = dict()
# registry[0] contains registryType which symbolizes if registry present ( value equals to 1).
registry_info["orgId"] = (registry_id.decode()).rstrip(' \t\r\n\0') # Strip off null bytes added by decode
registry_info["uri"] = registry[1]
registry_info["scAddress"] = (registry[2].decode()).rstrip(' \t\r\n\0')
app_ids_bytes = registry[3] # list of application ids
registry_info["orgId"] = org_id.hex()
registry_info["uri"] = registry[0]
registry_info["scAddress"] = registry[1].hex()
app_ids_bytes = registry[2] # list of application ids
logger.debug("app_ids_bytes %s", app_ids_bytes)
app_ids_list = bytes_list_to_str_list(app_ids_bytes)
registry_info["appTypeIds"] = app_ids_list
registry_info["status"] = status
registry_info["status"] = status.value
logger.debug("registry_info %s", registry_info)

json_registry = json.dumps(registry_info)
logger.debug("JSON serialized registry is %s", json_registry)
logger.debug("JSON serialized registry %s is %s", org_id.hex(), json_registry)
return json_registry


def deserialize_json_object(json_reg_info):
reg_info = json.loads(json_reg_info)
uri = reg_info["uri"]
sc_address = reg_info["scAddress"].encode()
sc_address = bytes.fromhex(reg_info["scAddress"])
app_ids_str = reg_info["appTypeIds"]

# Convert list of appTypeIds of type string to bytes
Expand All @@ -76,32 +85,37 @@ def deserialize_json_object(json_reg_info):

def sync_contract_and_lmdb(eth_direct_registry, kv_storage):
# Check if all registries in smart contract are available in KvStorage, if not add them
eth_lookup_result = eth_direct_registry.RegistryLookUp()
eth_registry_list = eth_lookup_result[-1] # last value of lookup will be list of registry id's/org id's
eth_lookup_result = eth_direct_registry.registry_lookup()
# last value of lookup will be list of registry id's/org id's
org_id_list = eth_lookup_result[-1]

logger.info("Syncing registries from Contract to KvStorage")
if not eth_registry_list:
if not org_id_list:
logger.info("No registries available in Direct registry contract")

else:
for eth_registry_id in eth_registry_list:
eth_reg_info = eth_direct_registry.RegistryRetrieve(eth_registry_id)

for org_id in org_id_list:
eth_reg_info = eth_direct_registry.registry_retrieve(org_id)
logger.debug("eth_reg_info: %s", eth_reg_info)
# Check if registry entry present in KvStorage
logger.info("Check if registry with id %s present in KvStorage", eth_registry_id.decode())
kv_reg_info = kv_storage.get("registries", eth_registry_id.decode().rstrip(' \t\r\n\0'))
logger.info(
"Check if registry with id %s present in KvStorage", org_id.hex())
kv_reg_info = kv_storage.get("registries", org_id)
if not kv_reg_info:
logger.info("No matching registry found in KvStorage, ADDING it to KvStorage")
logger.info(
"No matching registry found in KvStorage, ADDING it to KvStorage")
# Create JSON registry object with status 0 equivalent to SUSPENDED
json_registry = create_json_object(eth_registry_id, eth_reg_info, 0)
kv_storage.set("registries", eth_registry_id.decode().rstrip(' \t\r\n\0'), json_registry)
json_registry = create_json_object(org_id, eth_reg_info, RegistryStatus.SUSPENDED)
kv_storage.set("registries", org_id.hex(), json_registry)

else:
logger.info("Matching registry found in KvStorage, hence ADDING")
logger.info(
"Matching registry found in KvStorage, hence ADDING")
# Set the status of registry to ACTIVE
kv_registry = json.loads(kv_reg_info)
kv_registry["status"] = 1 # status = 1 implies ACTIVE
kv_storage.set("registries", eth_registry_id.decode().rstrip(' \t\r\n\0'), json.dumps(kv_registry))
kv_storage.set("registries", org_id.hex(),
json.dumps(kv_registry))

logger.info("Syncing registries from KvStorage to Contract")
# Check if all registries present in KvStorage are available in Smart contract, if not add them
Expand All @@ -114,21 +128,30 @@ def sync_contract_and_lmdb(eth_direct_registry, kv_storage):
for kv_registry_id in kv_registry_list:
kv_reg_info = kv_storage.get("registries", kv_registry_id)
# Check if registry entry present in smart contract
retrieve_result = eth_direct_registry.RegistryRetrieve(kv_registry_id.encode())
logger.debug(
"found registry_retrieve: %s from kv store", kv_registry_id)
retrieve_result = eth_direct_registry.registry_retrieve(
bytes.fromhex(kv_registry_id))

if retrieve_result[0] == 0:
logger.info("Matching registry with id %s not found in Smart contract, hence ADDING", kv_registry_id)
logger.info(
"Matching registry with id %s not found in Smart contract, hence ADDING", kv_registry_id)
reg_info = deserialize_json_object(kv_reg_info)
# Add registry to smart contract and set status to ACTIVE
eth_direct_registry.RegistryAdd(kv_registry_id.encode(), reg_info[0], reg_info[1], reg_info[2])
eth_direct_registry.RegistrySetStatus(kv_registry_id.encode(), json.loads(kv_reg_info)["status"])
eth_direct_registry.registry_add(
bytes.fromhex(kv_registry_id), reg_info[0], reg_info[1], reg_info[2])
eth_direct_registry.registry_set_status(
bytes.fromhex(kv_registry_id), RegistryStatus.ACTIVE)

else:
# Set status of registry to registry status in KvStorage
logger.info("Matching registry %s found in Smart contract, CHANGING status", kv_registry_id)
eth_direct_registry.RegistrySetStatus(kv_registry_id.encode(), json.loads(kv_reg_info)["status"])
logger.info(
"Matching registry %s found in Smart contract, CHANGING status", kv_registry_id)
eth_direct_registry.registry_set_status(
bytes.fromhex(kv_registry_id), RegistryStatus.ACTIVE) # TODO

logger.info("-------------- Direct Registry bridge flow complete ------------------- ")
logger.info(
"-------------- Direct Registry bridge flow complete ------------------- ")
return


Expand All @@ -137,9 +160,11 @@ def main(args=None):

# Smart contract address is the address where smart contract is deployed.
# TODO: Add mechanism to read the address from config file.

eth_direct_registry = registry.EthereumDirectRegistry("0x8c99670a15047248403a3E5A38eb8FBE7a12533e", \
'../connectors/contracts/WorkerRegistryList.sol')
tcf_home = environ.get("TCF_HOME", "../../")
config_file = tcf_home + "/examples/common/python/connectors/" + "tcf_connector.toml"
with open(config_file) as fd:
config = toml.load(fd)
eth_direct_registry = registry(config)
kv_storage = KvStorage()
kv_storage.open("kv_storage")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,27 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from eth_utils.hexadecimal import is_hex
import binascii
from os import environ
import logging

from connectors.interfaces.worker_registry_list_interface import WorkerRegistryListInterface
from connectors.ethereum.ethereum_wrapper import EthereumWrapper
from utility.tcf_types import RegistryStatus
from connectors.utils import construct_message

logging.basicConfig(
format="%(asctime)s - %(levelname)s - %(message)s", level=logging.INFO)
logging.level = logging.DEBUG

logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s", level=logging.INFO)

class EthereumWorkerRegistryListImpl(WorkerRegistryListInterface):
"""
Implements WorkerRegistryListInterface interface
"""

def __init__(self, config):
if self.__validate(config):
self.__initialize(config)
Expand All @@ -50,8 +55,8 @@ def registry_add(self, org_id, uri, sc_addr, app_type_ids):
logging.info("Invalid application id {}".format(aid))
return construct_message("failed", "Invalid application id")

txn_hash = self.__contract_instance.functions.registryAdd(org_id, uri, org_id,
app_type_ids).buildTransaction(
txn_hash = self.__contract_instance.functions.registryAdd(org_id, uri, sc_addr,
app_type_ids).buildTransaction(
{
"chainId": self.__eth_client.get_channel_id(),
"gas": self.__eth_client.get_gas_limit(),
Expand All @@ -61,18 +66,19 @@ def registry_add(self, org_id, uri, sc_addr, app_type_ids):
tx = self.__eth_client.execute_transaction(txn_hash)
return tx
else:
logging.error("direct registry contract instance is not initialized")
logging.error(
"direct registry contract instance is not initialized")
return construct_message("failed", "direct registry contract instance is \
not initialized")


def registry_update(self, org_id, uri, sc_addr, app_type_ids):
if (self.__contract_instance != None):
if (is_hex(binascii.hexlify(org_id).decode("utf8")) == False):
logging.error("Invalid Org id {}".format(org_id))
return construct_message("failed", "Invalid Org id")
if (sc_addr is not None and is_hex(binascii.hexlify(sc_addr).decode("utf8")) == False):
logging.error("Invalid smart contract address {}".format(sc_addr))
logging.error(
"Invalid smart contract address {}".format(sc_addr))
return construct_message("failed", "Invalid smart contract address")
if (not uri):
logging.error("Empty uri {}".format(uri))
Expand All @@ -83,17 +89,18 @@ def registry_update(self, org_id, uri, sc_addr, app_type_ids):
return construct_message("failed", "Invalid application id")

txn_hash = self.__contract_instance.functions.registryUpdate(org_id, uri, sc_addr,
app_type_ids).buildTransaction(
app_type_ids).buildTransaction(
{
"chainId": self.__eth_client.get_channel_id(),
"gas": self.__eth_client.get_gas_limit(),
"gasPrice": self.__eth_client.get_gas_price(),
"nonce": self.__eth_client.get_txn_nonce()
"nonce": self.__eth_client.get_txn_nonce()
})
tx = self.__eth_client.execute_transaction(txn_hash)
return tx
else:
logging.error("direct registry contract instance is not initialized")
logging.error(
"direct registry contract instance is not initialized")
return construct_message("failed", "direct registry contract instance is \
not initialized")

Expand All @@ -106,59 +113,72 @@ def registry_set_status(self, org_id, status):
logging.info("Invalid registry status {}".format(status))
return construct_message("failed", "Invalid worker status {}".format(status))
txn_hash = self.__contract_instance.functions.registrySetStatus(org_id,
status.value).buildTransaction(
{
"chainId": self.__eth_client.get_channel_id(),
"gas": self.__eth_client.get_gas_limit(),
"gasPrice": self.__eth_client.get_gas_price(),
"nonce": self.__eth_client.get_txn_nonce()
})
status.value).buildTransaction(
{
"chainId": self.__eth_client.get_channel_id(),
"gas": self.__eth_client.get_gas_limit(),
"gasPrice": self.__eth_client.get_gas_price(),
"nonce": self.__eth_client.get_txn_nonce()
})
tx = self.__eth_client.execute_transaction(txn_hash)
return tx
else:
logging.error("direct registry contract instance is not initialized")
logging.error(
"direct registry contract instance is not initialized")
return construct_message("failed", "direct registry contract instance is \
not initialized")

def registry_lookup(self, app_type_id=None):
if (self.__contract_instance != None):
if app_type_id != None:
if is_hex(binascii.hexlify(app_type_id).decode("utf8")):
lookupResult = self.__contract_instance.functions.registryLookUp(
app_type_id).call()
else:
logging.info("Invalid application type id {}".format(app_type_id))
logging.info(
"Invalid application type id {}".format(app_type_id))
return construct_message("failed", "Invalid application type id")
else:
lookupResult = self.__contract_instance.functions.registryLookUp(b"").call()
lookupResult = self.__contract_instance.functions.registryLookUp(
b"").call()
return lookupResult
else:
logging.error("direct registry contract instance is not initialized")
logging.error(
"direct registry contract instance is not initialized")
return construct_message("failed", "direct registry contract instance is not initialized")

def registry_retrieve(self, org_id):
logging.info("registryRetrieve %s->%s from contract, len %s",
org_id.hex(), org_id, len(org_id))
if (self.__contract_instance != None):
if (is_hex(binascii.hexlify(org_id).decode("utf8")) == False):
logging.info("Invalid Org id {}".format(org_id))
return construct_message("failed", "Invalid Org id")
else:
registryDetails = self.__contract_instance.functions.registryRetrieve(org_id).call()
# TODO may cause panic: OverflowError: Python int too large to convert to C ssize_t
registryDetails = self.__contract_instance.functions.registryRetrieve(
org_id).call()
logging.info("registryRetrieve %s done", org_id.hex())
return registryDetails
else:
logging.error("direct registry contract instance is not initialized")
logging.error(
"direct registry contract instance is not initialized")
return construct_message("failed", "direct registry contract instance is \
not initialized")

def registry_lookup_next(self, app_type_id, lookup_tag):
if (self.__contract_instance != None):
if is_hex(binascii.hexlify(app_type_id).decode("utf8")):
lookupResult = self.__contract_instance.functions.registryLookUpNext(app_type_id, lookup_tag).call()
lookupResult = self.__contract_instance.functions.registryLookUpNext(
app_type_id, lookup_tag).call()
return lookupResult
else:
logging.info("Invalid application type id {}".format(app_type_id))
logging.info(
"Invalid application type id {}".format(app_type_id))
return construct_message("failed", "Invalid application type id")
else:
logging.error("direct registry contract instance is not initialized")
logging.error(
"direct registry contract instance is not initialized")
return construct_message("failed", "direct registry contract instance \
is not initialized")

Expand Down
Loading