Skip to content

Commit a178c80

Browse files
committed
POC: bom link
1 parent 7c179ed commit a178c80

File tree

9 files changed

+188
-198
lines changed

9 files changed

+188
-198
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ dist/
1313
/examples/
1414
venv
1515
.obsidian
16+
*.ipynb
1617
old
1718
offline_experiments
1819
testing/features/cat.py

cat/node.py

Lines changed: 73 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
1+
import json
2+
import pickle
3+
from copy import deepcopy
4+
from pprint import pprint
5+
16
from flask import Flask, request, jsonify
27
cat = Flask(__name__)
38

49
from cats.network import ipfsApi, MeshClient
510
from cats.service import Service
6-
from cats.executor import Executor
711
from cats.factory import Factory
812

913
service = Service(
@@ -12,58 +16,93 @@
1216
)
1317
)
1418

19+
def initFactory(order_request, ipfs_uri):
20+
# if cod_out is False:
21+
# ipfs_uri = f'ipfs://{order_request["invoice"]["data_cid"]}/*csv'
22+
# elif cod_out is True:
23+
# ipfs_uri = f'ipfs://{order_request["invoice"]["data_cid"]}/output/*csv'
24+
service.initBOMcar(
25+
structure_cid=order_request['order']['structure_cid'],
26+
structure_filepath=order_request['order']['structure_filepath'],
27+
function_cid=order_request['order']['function_cid'],
28+
init_data_cid=ipfs_uri
29+
)
30+
catFactory = Factory(service)
31+
return catFactory, order_request
32+
33+
def execute(catFactory, order_request):
34+
enhanced_bom = catFactory.execute()
35+
36+
invoice = {}
37+
enhanced_bom['invoice']['order_cid'] = service.ipfsClient.add_str(
38+
json.dumps(order_request['order'])
39+
)
40+
invoice['invoice_cid'] = service.ipfsClient.add_str(
41+
json.dumps(enhanced_bom['invoice'])
42+
)
43+
invoice['invoice'] = enhanced_bom['invoice']
44+
45+
bom = {
46+
'log_cid': enhanced_bom['log_cid'],
47+
'invoice_cid': invoice['invoice_cid']
48+
}
49+
bom_response = {
50+
'bom': bom,
51+
'bom_cid': service.ipfsClient.add_str(json.dumps(bom))
52+
}
53+
return bom_response
54+
1555

16-
@cat.route('/cat/node/preproc', methods=['POST'])
17-
def initExecute():
56+
@cat.route('/cat/node/init', methods=['POST'])
57+
def execute_init_cat():
1858
try:
1959
# Get JSON data from the request
20-
bom = request.get_json()
60+
order_request = request.get_json()
61+
order_request["order"] = json.loads(service.meshClient.cat(order_request["order_cid"]))
62+
order_request['invoice'] = json.loads(service.meshClient.cat(order_request['order']['invoice_cid']))
63+
pprint(order_request["order"])
64+
pprint(order_request['invoice']['data_cid'])
65+
66+
67+
68+
# bom['invoice']['data_cid'] = service.meshClient.linkData(bom['invoice']['data_cid'])
2169

2270
# IPFS checks
23-
# if 'init_data_cid' not in bom:
71+
# if 'bom_cid' not in bom:
2472
# return jsonify({'error': 'CID not provided'}), 400
2573

26-
data_cid = bom['invoice']['data_cid']
27-
ipfs_uri = f'ipfs://{data_cid}/*csv'
28-
service.initBOMcar(
29-
structure_cid=bom['order']['structure_cid'],
30-
structure_filepath=bom['order']['structure_filepath'],
31-
function_cid=bom['order']['function_cid'],
32-
init_data_cid=ipfs_uri
33-
)
34-
catFactory = Factory(service)
35-
enhanced_bom = catFactory.execute()
74+
75+
ipfs_uri = f'ipfs://{order_request["invoice"]["data_cid"]}/*csv'
76+
catFactory, updated_order_request = initFactory(order_request, ipfs_uri)
77+
bom_response = execute(catFactory, updated_order_request)
3678

3779
# Return BOM
38-
return jsonify(enhanced_bom)
80+
return jsonify(bom_response)
3981

4082
except Exception as e:
4183
return jsonify({'error': str(e)})
4284

43-
@cat.route('/cat/node/postproc', methods=['POST'])
44-
def execute():
85+
@cat.route('/cat/node/link', methods=['POST'])
86+
def execute_link_cat():
4587
try:
4688
# Get JSON data from the request
47-
bom = request.get_json()
89+
order_request = request.get_json()
90+
order_request["order"] = json.loads(service.meshClient.cat(order_request["order_cid"]))
91+
order_request['invoice'] = json.loads(service.meshClient.cat(order_request['order']['invoice_cid']))
92+
pprint(order_request["order"])
93+
pprint(order_request['invoice']['data_cid'])
4894

49-
50-
# IPFS checks
51-
# if 'bom_json_cid' not in bom:
52-
# return jsonify({'error': 'CID not provided'}), 400
53-
54-
data_cid = service.meshClient.linkData(bom['invoice']['data_cid'])
95+
prev_data_cid = order_request['invoice']['data_cid']
96+
data_cid = service.meshClient.linkData(prev_data_cid)
5597
ipfs_uri = f'ipfs://{data_cid}/*csv'
56-
service.initBOMcar(
57-
structure_cid=bom['order']['structure_cid'],
58-
structure_filepath=bom['order']['structure_filepath'],
59-
function_cid=bom['order']['function_cid'],
60-
init_data_cid=ipfs_uri
61-
)
62-
catFactory = Factory(service)
63-
enhanced_bom = catFactory.execute()
98+
catFactory, updated_order_request = initFactory(order_request, ipfs_uri)
99+
bom_response = execute(catFactory, updated_order_request)
100+
101+
# Return BOM
102+
return jsonify(bom_response)
64103

65104
# Return BOM
66-
return jsonify(enhanced_bom)
105+
return jsonify(bom_response)
67106

68107
except Exception as e:
69108
return jsonify({'error': str(e)})

cats/executor/__init__.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -70,13 +70,14 @@ def execute(self, enhanced_bom=None):
7070
self.enhanced_bom['invoice']['data_cid'] = self.service.meshClient.getEgressOutput(job_id=self.egress_job_id)
7171
self.enhanced_bom['log_cid'] = self.service.ipfsClient.add_json(self.enhanced_bom['log'])
7272

73-
# del self.enhanced_bom['bom_json_cid']
74-
# del self.enhanced_bom['init_data_cid']
75-
# os.remove("bom.json")
76-
# os.remove("invoice.json")
77-
# os.remove("order.json")
78-
# os.remove("bom.car")
79-
# os.remove("cat-action-plane-config")
73+
del self.enhanced_bom['bom_json_cid']
74+
del self.enhanced_bom['init_data_cid']
75+
76+
os.remove("bom.json")
77+
os.remove("invoice.json")
78+
os.remove("order.json")
79+
os.remove("bom.car")
80+
os.remove("cat-action-plane-config")
8081
return self.enhanced_bom, None
8182
# return self.invoiceCID
8283

cats/service/__init__.py

Lines changed: 101 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import json
22
import pickle
33
import subprocess
4-
from copy import copy, deepcopy
4+
from copy import deepcopy
55
from pprint import pprint
66

77
from cats.service.utils import executeCMD
@@ -64,12 +64,15 @@ def initBOMcar(self, function_cid, init_data_cid, init_bom_filename='bom.car', s
6464
return self.init_bom_car_cid, self.init_bom_json_cid
6565

6666
def catSubmit(self, bom):
67+
order = json.loads(self.meshClient.cat(bom["order_cid"]))
68+
pprint(order)
6769
ppost = lambda args, endpoint: \
6870
f'curl -X POST -H "Content-Type: application/json" -d \\\n\'{json.dumps(**args)}\' {endpoint}'
6971
post = lambda args, endpoint: \
7072
'curl -X POST -H "Content-Type: application/json" -d \'' + json.dumps(**args) + f'\' {endpoint}'
71-
post_cmd = post({'obj': bom}, bom["order"]["endpoint"])
72-
print(ppost({'obj': bom, 'indent': 4}, bom["order"]["endpoint"]))
73+
74+
post_cmd = post({'obj': bom}, order["endpoint"])
75+
print(ppost({'obj': bom, 'indent': 4}, order["endpoint"]))
7376
response_str = subprocess.check_output(post_cmd, shell=True)
7477
output_bom = json.loads(response_str)
7578
# pprint(output_bom)
@@ -78,8 +81,27 @@ def catSubmit(self, bom):
7881
output_bom['POST'] = post_cmd
7982
return output_bom
8083

84+
def flatten_bom(self, bom_response):
85+
invoice = json.loads(
86+
self.meshClient.cat(bom_response["bom"]["invoice_cid"])
87+
)
88+
invoice['order'] = json.loads(
89+
self.meshClient.cat(invoice['order_cid']),
90+
)
91+
invoice['order']['flat'] = {
92+
'function': json.loads(self.meshClient.cat(invoice['order']["function_cid"])),
93+
'invoice': json.loads(self.meshClient.cat(invoice['order']["invoice_cid"]))
94+
}
95+
bom_response["flat_bom"] = {
96+
'invoice': invoice,
97+
'log': json.loads(
98+
self.meshClient.cat(bom_response["bom"]["log_cid"])
99+
)
100+
}
101+
return bom_response
102+
81103
def resubmit_bom(self,
82-
bom, endpoint='http://127.0.0.1:5000/cat/node/postproc'
104+
bom, endpoint='http://127.0.0.1:5000/cat/node/process'
83105
):
84106
def f(
85107
process_obj, invoice_bom=bom,
@@ -94,56 +116,97 @@ def f(
94116

95117
return f
96118

97-
def create_order(self,
119+
def create_order_request(self,
98120
process_obj, data_dirpath, structure_filepath,
99-
endpoint='http://127.0.0.1:5000/cat/node/preproc'
121+
endpoint='http://127.0.0.1:5000/cat/node/execute'
100122
):
101123
structure_cid, structure_name = self.meshClient.cidFile(structure_filepath)
102124
function = {
103125
'process_cid': self.ipfsClient.add_pyobj(process_obj),
104126
'infrafunction_cid': None
105127
}
106-
order_bom = {
107-
"invoice": {
108-
"data_cid": self.meshClient.cidDir(data_dirpath)
109-
},
110-
"order": {
111-
"function_cid": self.ipfsClient.add_str(json.dumps(function)),
112-
"structure_cid": structure_cid,
113-
"structure_filepath": structure_name,
114-
"endpoint": endpoint
115-
},
116-
"function": function
128+
invoice = {
129+
"data_cid": self.meshClient.cidDir(data_dirpath)
130+
}
131+
order = {
132+
"function_cid": self.ipfsClient.add_str(json.dumps(function)),
133+
"structure_cid": structure_cid,
134+
"invoice_cid": self.ipfsClient.add_str(json.dumps(invoice)),
135+
"structure_filepath": structure_name,
136+
"endpoint": endpoint
137+
}
138+
self.order = {
139+
'order_cid': self.ipfsClient.add_str(json.dumps(order))
117140
}
118-
self.order = order_bom
119141
return self.order
120142

143+
def process_loader(self, process_obj, invoice_bom, endpoint):
144+
new_order_bom = {}
145+
new_order_bom["order"] = json.loads(self.meshClient.cat(invoice_bom["order_cid"]))
146+
new_order_bom["order"]["endpoint"] = endpoint
147+
new_order_bom["function"] = json.loads(self.meshClient.cat(new_order_bom["order"]["function_cid"]))
148+
new_order_bom["function"]['process_cid'] = self.ipfsClient.add_pyobj(process_obj)
149+
new_order_bom["order"]["function_cid"] = self.ipfsClient.add_str(json.dumps(new_order_bom["function"]))
150+
new_order_bom["order_cid"] = self.ipfsClient.add_str(json.dumps(new_order_bom["order"]))
151+
# new_order_bom["order"] = json.loads(self.meshClient.cat(new_order_bom["order_cid"]))
152+
del new_order_bom["function"]
153+
# next_invoice = {
154+
# 'data_cid': invoice_bom['data_cid']
155+
# }
156+
#
157+
# new_order_bom["order"]['invoice'] = next_invoice
158+
# new_order_bom["order"]['invoice']['invoice_cid'] = self.ipfsClient.add_str(json.dumps(next_invoice))
159+
new_order_bom["invoice"] = invoice_bom
160+
del new_order_bom['invoice']['order_cid']
161+
del new_order_bom['invoice']['seed_cid']
162+
new_order_bom['order']['invoice_cid'] = self.ipfsClient.add_str(json.dumps(new_order_bom['invoice']))
163+
new_order_bom['invoice'] = new_order_bom['invoice']
164+
return new_order_bom
165+
121166
def catJob_repl(self,
122-
previous_invoice_bom, modify_bom,
123-
endpoint='http://127.0.0.1:5000/cat/node/postproc'
124-
):
125-
def resubmit_catJob(
167+
previous_invoice, structured_function,
168+
endpoint='http://127.0.0.1:5000/cat/node/process'
169+
):
170+
def resubmit_order(
126171
process_obj
127172
):
128-
def resubmit_order(process_obj=process_obj):
129-
order_bom = modify_bom(process_obj, previous_invoice_bom, endpoint)
130-
return order_bom, self.catSubmit(order_bom)
131-
132-
order_bom, invoice_bom = resubmit_order(process_obj)
133-
return order_bom, invoice_bom, resubmit_order
134-
135-
return resubmit_catJob
173+
def f(process_obj=process_obj):
174+
order_request = structured_function(
175+
service=self,
176+
process_obj=process_obj,
177+
invoice=previous_invoice,
178+
endpoint=endpoint
179+
)
180+
# del order_request['order']
181+
# del order_request['invoice']
182+
pprint(order_request)
183+
print()
184+
cat_response = self.catSubmit(order_request)
185+
pprint(cat_response)
186+
# exit()
187+
return self.flatten_bom(cat_response)
188+
flat_cat_response = f(process_obj)
189+
flat_cat_response['cat_processor'] = f
190+
return flat_cat_response
191+
192+
return resubmit_order
136193

137194
def cat_repl(self,
138-
order_bom, bom_function,
195+
order_bom, structured_function,
139196
endpoint='http://127.0.0.1:5000/cat/node'
140197
):
141-
preproc_endpoint = f'{endpoint}/preproc'
198+
# preproc_endpoint = f'{endpoint}/execute'
142199
# order_bom["order"]['endpoint'] = preproc_endpoint
143-
pprint(order_bom)
144-
invoice_bom = self.catSubmit(order_bom)
145-
146-
postproc_endpoint = f'{endpoint}/postproc'
147-
catJobRepl = self.catJob_repl(invoice_bom, bom_function, postproc_endpoint)
148-
return order_bom, invoice_bom, catJobRepl
200+
cat_response = self.catSubmit(order_bom)
201+
flat_cat_response = self.flatten_bom(cat_response)
202+
203+
postproc_endpoint = f'{endpoint}/process'
204+
# invoice = flat_cat_response['flat_bom']['invoice']
205+
invoice = json.loads(self.meshClient.cat(flat_cat_response['bom']["invoice_cid"]))
206+
# pprint(invoice)
207+
# print()
208+
# exit()
209+
catJobRepl = self.catJob_repl(invoice, structured_function, postproc_endpoint)
210+
flat_cat_response['cat_processor'] = catJobRepl
211+
return flat_cat_response
149212

0 commit comments

Comments
 (0)