Skip to content

Commit d2aef58

Browse files
committed
Process ndjson from pathfinder responses
1 parent 32bf802 commit d2aef58

File tree

2 files changed

+83
-34
lines changed

2 files changed

+83
-34
lines changed

src/token.js

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import { getVersion } from '~/safe';
1818
* @param {Object} userOptions - search arguments
1919
* @param {string} pathfinderType - pathfinder execution type
2020
* @param {number} pathfinderMaxTransferSteps - default pathfinder server max transfer steps
21+
* @param {Boolean} returnIterativeFirstMatch - if true, the pathfinder service iteratively optimizes the result and returns the first match with 'value'. Only available when pathfinderType is 'cli'
2122
*
2223
* @return {Object[]} - transaction steps
2324
*/
@@ -27,6 +28,7 @@ export async function findTransitiveTransfer(
2728
userOptions,
2829
pathfinderType,
2930
pathfinderMaxTransferSteps,
31+
returnIterativeFirstMatch = false,
3032
) {
3133
let result;
3234
if (pathfinderType == 'cli') {
@@ -39,6 +41,7 @@ export async function findTransitiveTransfer(
3941
utils,
4042
userOptions,
4143
pathfinderMaxTransferSteps,
44+
returnIterativeFirstMatch,
4245
);
4346
}
4447
return result;
@@ -109,6 +112,7 @@ async function findTransitiveTransferCli(web3, utils, userOptions) {
109112
* @param {string} userOptions.to - receiver Safe address
110113
* @param {BN} userOptions.value - value of Circles tokens
111114
* @param {number} userOptions.maxTransfers - limit of steps returned by the pathfinder service
115+
* @param {Boolean} returnIterativeFirstMatch - if true, the pathfinder service iteratively optimizes the result and returns the first match with 'value'. Only available when pathfinderType is 'cli'
112116
*
113117
* @return {Object[]} - transaction steps
114118
*/
@@ -117,6 +121,7 @@ async function findTransitiveTransferServer(
117121
utils,
118122
userOptions,
119123
pathfinderMaxTransferSteps,
124+
returnIterativeFirstMatch,
120125
) {
121126
const options = checkOptions(userOptions, {
122127
from: {
@@ -136,7 +141,6 @@ async function findTransitiveTransferServer(
136141

137142
try {
138143
const response = await utils.requestPathfinderAPI({
139-
method: 'POST',
140144
data: {
141145
id: crypto.randomUUID(),
142146
method: 'compute_transfer',
@@ -145,9 +149,9 @@ async function findTransitiveTransferServer(
145149
to: options.to,
146150
value: options.value.toString(),
147151
max_transfers: options.maxTransfers,
152+
iterative: returnIterativeFirstMatch,
148153
},
149154
},
150-
isTrailingSlash: false,
151155
});
152156
return response.result;
153157
} catch (error) {
@@ -586,6 +590,7 @@ export default function createTokenModule(
586590
options,
587591
pathfinderType,
588592
pathfinderMaxTransferSteps,
593+
true,
589594
);
590595
if (web3.utils.toBN(response.maxFlowValue).lt(options.value)) {
591596
throw new TransferError(

src/utils.js

Lines changed: 76 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,79 @@ import { getTokenContract, getSafeContract } from '~/common/getContracts';
1717
/** @access private */
1818
const transactionQueue = new TransactionQueue();
1919

20-
async function request(endpoint, userOptions) {
20+
async function processResponseJson(response) {
21+
return new Promise((resolve, reject) => {
22+
const getJson = (response) => {
23+
return response.json().then((json) => {
24+
if (response.status >= 400) {
25+
throw new RequestError(response.url, json, response.status);
26+
}
27+
return json;
28+
});
29+
};
30+
const contentType = response.headers.get('Content-Type');
31+
if (contentType && contentType.includes('application/json')) {
32+
getJson(response).then(resolve).catch(reject);
33+
} else {
34+
if (response.status >= 400) {
35+
reject(new RequestError(response.url, response.body, response.status));
36+
}
37+
resolve(response.body);
38+
}
39+
})
40+
}
41+
42+
async function processResponseNdjson(response, data) {
43+
let buffer = ''
44+
let jsons = []
45+
let final
46+
return new Promise((resolve, reject) => {
47+
resolve(response.body)
48+
})
49+
.then(res => {
50+
return new Promise((resolve, reject) => {
51+
res.on('readable', () => {
52+
console.log("readable...*");
53+
let result
54+
const decoder = new TextDecoder()
55+
while (null !== (result = res.read())) {
56+
buffer += decoder.decode(result)
57+
let idx = buffer.indexOf("\n")
58+
while(idx !== -1) {
59+
const text = buffer.substring(0, idx)
60+
try {
61+
const jsonText = JSON.parse(text)
62+
console.log(jsonText)
63+
jsons.push(jsonText)
64+
if (jsonText.result.maxFlowValue === data.params.value) {
65+
final = jsonText
66+
res.destroy()
67+
}
68+
} catch(error) {
69+
console.warn(text)
70+
}
71+
buffer = buffer.substring(idx + 1)
72+
idx = buffer.indexOf("\n")
73+
}
74+
}
75+
})
76+
res.on('end', () => { // If haven't received a matching result yet, then return the last result
77+
console.log("END!");
78+
console.log({final});
79+
console.log({jsons});
80+
resolve(jsons.pop())
81+
});
82+
res.on("close", function (err) {
83+
console.log("Stream has been destroyed and file has been closed");
84+
console.log({final});
85+
console.log({jsons});
86+
resolve(final ? final : jsons.pop())
87+
})
88+
})
89+
})
90+
}
91+
92+
async function request(endpoint, userOptions, processResponse = processResponseJson) {
2193
const options = checkOptions(userOptions, {
2294
path: {
2395
type: 'array',
@@ -60,30 +132,7 @@ async function request(endpoint, userOptions) {
60132
const url = `${endpoint}/${path.join('/')}${slash}${paramsStr}`;
61133

62134
try {
63-
return fetch(url, request).then((response) => {
64-
const contentType = response.headers.get('Content-Type');
65-
const transferEncoding = response.headers.get('Transfer-Encoding');
66-
if (contentType && contentType.includes('application/json')) {
67-
return response.json().then((json) => {
68-
if (response.status >= 400) {
69-
throw new RequestError(url, json, response.status);
70-
}
71-
return json;
72-
});
73-
} else {
74-
if (response.status >= 400) {
75-
throw new RequestError(url, response.body, response.status);
76-
}
77-
if (transferEncoding && transferEncoding.includes('chunked')) {
78-
console.log("chunked");
79-
return response.text().then((text) => {
80-
console.log(text.toString());
81-
return JSON.parse(text.toString());
82-
});
83-
}
84-
return response.body;
85-
}
86-
});
135+
return fetch(url, request).then(response => processResponse(response, data));
87136
} catch (err) {
88137
throw new RequestError(url, err.message);
89138
}
@@ -1125,28 +1174,23 @@ export default function createUtilsModule(web3, contracts, globalOptions) {
11251174
* @namespace core.utils.requestPathfinderAPI
11261175
*
11271176
* @param {Object} userOptions - Pathfinder API query options
1128-
* @param {string} userOptions.method - HTTP method
11291177
* @param {Object} userOptions.data - Request body (JSON)
11301178
*
11311179
* @return {Object} - API response
11321180
*/
11331181
requestPathfinderAPI: async (userOptions) => {
11341182
const options = checkOptions(userOptions, {
1135-
method: {
1136-
type: 'string',
1137-
default: 'GET',
1138-
},
11391183
data: {
11401184
type: 'object',
11411185
default: {},
11421186
},
11431187
});
11441188
return request(pathfinderServiceEndpoint, {
11451189
data: options.data,
1146-
method: options.method,
1190+
method: 'POST',
11471191
path: [],
11481192
isTrailingSlash: false,
1149-
});
1193+
}, processResponseNdjson);
11501194
},
11511195

11521196
/**

0 commit comments

Comments
 (0)