Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kucoinfutures): add watchPositions and upgrade ast-transpiler to v0.0.46 #22169

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
15 changes: 13 additions & 2 deletions cs/ccxt/ws/ArrayCache.cs
Expand Up @@ -297,9 +297,11 @@ public class ArrayCacheBySymbolBySide : ArrayCache
{
// tbd incomplete
public Dictionary<string, object> hashmap = new Dictionary<string, object>();
public ArrayCacheBySymbolBySide(int? maxSixe = null) : base(maxSixe)
private bool hedged = true;
public ArrayCacheBySymbolBySide(int? maxSixe = null, bool hedged = true) : base(maxSixe)
{
this.nestedNewUpdatesBySymbol = true;
this.hedged = hedged;
}

public void append(object item)
Expand All @@ -316,6 +318,15 @@ private void _append(object item)
var itemSide = Exchange.SafeString(item, "side");
var bySide = (this.hashmap.ContainsKey(itemSymbol)) ? this.hashmap[itemSide] as Dictionary<string, object> : new Dictionary<string, object>();

if (!this.hedged) {
var sideToReset = itemSide == "long" ? "short" : "long";
if (bySide.ContainsKey(sideToReset)) {
bySide.Remove(sideToReset);
var value = this.Find(x => Exchange.SafeString (x, "symbol") == itemSymbol && Exchange.SafeString (x, "side") == sideToReset);
var indexInt = this.IndexOf(value);
this.RemoveAt(indexInt);
}
}
if (bySide.ContainsKey(itemSide))
{
var reference = bySide[itemSide];
Expand Down Expand Up @@ -365,4 +376,4 @@ private void _append(object item)
this.allNewUpdates = defaultAllNewUpdates + (afterLength - beforeLength);
}
}
// }
// }
19 changes: 19 additions & 0 deletions cs/tests/Generated/Base/Cache.cs
Expand Up @@ -614,5 +614,24 @@ public void CacheTests()
{ "contracts", 3 },
}); // update second position
Assert(isEqual(cacheSymbolSide3.getLimit(null, outsideLimit), 1)); // watch all positions

// ----------------------------------------------------------------------------

// test ArrayCacheBySymbolBySide, watch all positions, same symbol when not positionMode is not hedged reset other side
var oneWayCacheSymbolSide = new ArrayCacheBySymbolBySide(null, false);
symbol = "BTC/USDT";
outsideLimit = 5;
oneWayCacheSymbolSide.append(new Dictionary<string, object>() {
{ "symbol", symbol },
{ "side", "short" },
{ "contracts", 1 },
}); // create first position
oneWayCacheSymbolSide.append(new Dictionary<string, object>() {
{ "symbol", symbol },
{ "side", "long" },
{ "contracts", 2 },
}); // create oposite side position
object arrayLength = getArrayLength(oneWayCacheSymbolSide);
Assert(isEqual(arrayLength, 1));
}
}
8 changes: 4 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Expand Up @@ -154,7 +154,7 @@
"as-table": "1.0.37",
"asciichart": "^1.5.25",
"assert": "^2.0.0",
"ast-transpiler": "^0.0.44",
"ast-transpiler": "^0.0.46",
"docsify": "^4.11.4",
"eslint": "8.22.0",
"eslint-config-airbnb-base": "15.0.0",
Expand Down
12 changes: 11 additions & 1 deletion php/pro/ArrayCacheBySymbolBySide.php

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion python/ccxt/async_support/base/ws/cache.py
Expand Up @@ -171,14 +171,22 @@ def append(self, item):


class ArrayCacheBySymbolBySide(ArrayCache):
def __init__(self, max_size=None):
def __init__(self, max_size=None, hedged=True):
super(ArrayCacheBySymbolBySide, self).__init__(max_size)
self._nested_new_updates_by_symbol = True
self._hedged = hedged
self.hashmap = {}
self._index = collections.deque([], max_size)

def append(self, item):
by_side = self.hashmap.setdefault(item['symbol'], {})
if not self._hedged:
side_to_reset = 'long' if item['side'] == 'short' else 'short'
if side_to_reset in by_side:
del by_side[side_to_reset]
index = self._index.index(side_to_reset)
del self._deque[index]
del self._index[index]
if item['side'] in by_side:
reference = by_side[item['side']]
if reference != item:
Expand Down
20 changes: 18 additions & 2 deletions ts/src/base/ws/Cache.ts
Expand Up @@ -221,9 +221,10 @@ class ArrayCacheBySymbolById extends ArrayCache {

class ArrayCacheBySymbolBySide extends ArrayCache {

constructor () {
super ()
constructor (maxSize = undefined, hedged = true) {
carlosmiei marked this conversation as resolved.
Show resolved Hide resolved
super (maxSize)
this.nestedNewUpdatesBySymbol = true
this.hedged = hedged
Object.defineProperty (this, 'hashmap', {
__proto__: null, // make it invisible
value: {},
Expand All @@ -233,6 +234,17 @@ class ArrayCacheBySymbolBySide extends ArrayCache {

append (item) {
const bySide = this.hashmap[item.symbol] = this.hashmap[item.symbol] || {}
if (!this.hedged) {
// if one-way mode reset other side and remove from array
const sideToReset = item.side === 'long' ? 'short' : 'long'
if (sideToReset in bySide) {
delete bySide[sideToReset]
const index = this.findIndex ((x) => x.symbol === item.symbol && x.side === sideToReset)
if (index >= 0) {
this.splice (index, 1)
}
}
}
if (item.side in bySide) {
const reference = bySide[item.side]
if (reference !== item) {
Expand All @@ -247,6 +259,10 @@ class ArrayCacheBySymbolBySide extends ArrayCache {
} else {
bySide[item.side] = item
}
if (this.maxSize && (this.length === this.maxSize)) {
const deleteReference = this.shift ()
delete this.hashmap[deleteReference.symbol][deleteReference.side]
}
this.push (item)
if (this.clearAllUpdates) {
this.clearAllUpdates = false
Expand Down
107 changes: 92 additions & 15 deletions ts/src/pro/kucoinfutures.ts
Expand Up @@ -2,7 +2,7 @@

import kucoinfuturesRest from '../kucoinfutures.js';
import { ExchangeError, ArgumentsRequired } from '../base/errors.js';
import { ArrayCache, ArrayCacheBySymbolById, ArrayCacheByTimestamp } from '../base/ws/Cache.js';
import { ArrayCache, ArrayCacheBySymbolById, ArrayCacheByTimestamp, ArrayCacheBySymbolBySide } from '../base/ws/Cache.js';
import type { Int, Str, OrderBook, Order, Trade, Ticker, Balances, Position, Strings, Tickers, OHLCV } from '../base/types.js';
import Client from '../base/ws/Client.js';

Expand All @@ -22,8 +22,8 @@ export default class kucoinfutures extends kucoinfuturesRest {
'watchOrders': true,
'watchBalance': true,
'watchPosition': true,
'watchPositions': false,
'watchPositionForSymbols': false,
'watchPositions': true,
'watchPositionForSymbols': true,
'watchTradesForSymbols': true,
'watchOrderBookForSymbols': true,
},
Expand Down Expand Up @@ -65,6 +65,10 @@ export default class kucoinfutures extends kucoinfuturesRest {
'fetchPositionSnapshot': true, // or false
'awaitPositionSnapshot': true, // whether to wait for the position snapshot before providing updates
},
'watchPositions': {
'fetchPositionsSnapshot': true, // or false
'awaitPositionsSnapshot': true, // whether to wait for the position snapshot before providing updates
},
},
'streaming': {
// kucoin does not support built-in ws protocol-level ping-pong
Expand Down Expand Up @@ -355,11 +359,11 @@ export default class kucoinfutures extends kucoinfuturesRest {
const request = {
'privateChannel': true,
};
const messageHash = 'position:' + market['symbol'];
const messageHash = 'position::' + market['symbol'];
const client = this.client (url);
this.setPositionCache (client, symbol);
const fetchPositionSnapshot = this.handleOption ('watchPosition', 'fetchPositionSnapshot', true);
const awaitPositionSnapshot = this.safeBool ('watchPosition', 'awaitPositionSnapshot', true);
const fetchPositionSnapshot = this.handleOptionAndParams (params, 'watchPosition', 'fetchPositionSnapshot', true);
const awaitPositionSnapshot = this.handleOptionAndParams (params, 'watchPosition', 'awaitPositionSnapshot', true);
const currentPosition = this.getCurrentPosition (symbol);
if (fetchPositionSnapshot && awaitPositionSnapshot && currentPosition === undefined) {
const snapshot = await client.future ('fetchPositionSnapshot:' + symbol);
Expand All @@ -372,8 +376,9 @@ export default class kucoinfutures extends kucoinfuturesRest {
if (this.positions === undefined) {
return undefined;
}
const cache = this.positions.hashmap;
const symbolCache = this.safeValue (cache, symbol, {});
const cache = this.positions;
const positionsHashMap = cache.hashmap;
const symbolCache = this.safeValue (positionsHashMap, symbol, {});
const values = Object.values (symbolCache);
return this.safeValue (values, 0);
}
Expand All @@ -391,13 +396,79 @@ export default class kucoinfutures extends kucoinfuturesRest {

async loadPositionSnapshot (client, messageHash, symbol) {
const position = await this.fetchPosition (symbol);
this.positions = new ArrayCacheBySymbolById ();
if (this.positions === undefined) {
this.positions = new ArrayCacheBySymbolBySide (undefined, false);
}
const cache = this.positions;
cache.append (position);
// don't remove the future from the .futures cache
const future = client.futures[messageHash];
future.resolve (cache);
client.resolve (position, 'position:' + symbol);
client.resolve (position, 'position::' + symbol);
}

async watchPositions (symbols: Strings = undefined, since: Int = undefined, limit: Int = undefined, params = {}): Promise<Position[]> {
/**
* @method
* @name kucoinfutures#watchPositions
* @description watch open positions for all symbols or a set of symbols
* @see https://www.kucoin.com/docs/websocket/futures-trading/private-channels/all-position-change-events
* @param {string[]|undefined} symbols list of unified market symbols
* @param {object} params extra parameters specific to the exchange API endpoint
* @returns {object} a [position structure]{@link https://docs.ccxt.com/en/latest/manual.html#position-structure}
*/
await this.loadMarkets ();
const url = await this.negotiate (true);
symbols = this.marketSymbols (symbols, undefined, true, true, false);
const topic = '/contract/positionAll';
let messageHash = 'position';
const request = {
'privateChannel': true,
};
if (!this.isEmpty (symbols)) {
messageHash = '::' + symbols.join (',');
}
const client = this.client (url);
this.setPositionsCache (client);
const fetchPositionsSnapshot = this.handleOptionAndParams (params, 'watchPositions', 'fetchPositionsSnapshot', true);
const awaitPositionsSnapshot = this.handleOptionAndParams (params, 'watchPositions', 'awaitPositionsSnapshot', true);
const hasReturnedPositionsSnapshot = this.safeBool (this.options, 'hasReturnedPositionsSnapshot', false);
if (fetchPositionsSnapshot && awaitPositionsSnapshot && !hasReturnedPositionsSnapshot) {
const snapshot = await client.future ('fetchPositionsSnapshot');
this.options['hasReturnedPositionsSnapshot'] = true;
return this.filterBySymbolsSinceLimit (snapshot, symbols, since, limit, true);
}
const newPosition = await this.subscribe (url, messageHash, topic, undefined, this.extend (request, params));
if (this.newUpdates) {
return this.filterBySymbolsSinceLimit ([ newPosition ], symbols, since, limit, true);
}
return this.filterBySymbolsSinceLimit (this.positions, symbols, since, limit, true);
}

setPositionsCache (client: Client) {
const fetchPositionsSnapshot = this.handleOption ('watchPositions', 'fetchPositionsSnapshot', false);
if (fetchPositionsSnapshot) {
const messageHash = 'fetchPositionsSnapshot';
if (!(messageHash in client.futures)) {
client.future (messageHash);
this.spawn (this.loadPositionsSnapshot, client, messageHash);
}
}
}

async loadPositionsSnapshot (client, messageHash) {
const positions = await this.fetchPositions ();
if (this.positions === undefined) {
this.positions = new ArrayCacheBySymbolBySide (undefined, false);
}
const cache = this.positions;
for (let i = 0; i < positions.length; i++) {
cache.append (positions[i]);
}
// don't remove the future from the .futures cache
const future = client.futures[messageHash];
future.resolve (cache);
client.resolve (this.positions, 'position');
}

handlePosition (client: Client, message) {
Expand Down Expand Up @@ -495,13 +566,17 @@ export default class kucoinfutures extends kucoinfuturesRest {
//
const topic = this.safeString (message, 'topic', '');
const parts = topic.split (':');
const marketId = this.safeString (parts, 1);
const symbol = this.safeSymbol (marketId, undefined, '');
const marketIdFromTopic = this.safeString (parts, 1);
const cache = this.positions;
const currentPosition = this.getCurrentPosition (symbol);
const messageHash = 'position:' + symbol;
const data = this.safeValue (message, 'data', {});
const newPosition = this.parsePosition (data);
const marketId = this.safeString (data, 'symbol', marketIdFromTopic);
const symbol = this.safeSymbol (marketId, undefined, '');
const market = this.safeMarket (marketId);
let currentPosition = this.getCurrentPosition (symbol);
if (currentPosition === undefined) {
currentPosition = {};
}
const newPosition = this.parsePosition (data, market);
const keys = Object.keys (newPosition);
for (let i = 0; i < keys.length; i++) {
const key = keys[i];
Expand All @@ -510,8 +585,10 @@ export default class kucoinfutures extends kucoinfuturesRest {
}
}
const position = this.extend (currentPosition, newPosition);
const messageHash = 'position::' + symbol;
cache.append (position);
client.resolve (position, messageHash);
client.resolve (position, 'position');
}

async watchTrades (symbol: string, since: Int = undefined, limit: Int = undefined, params = {}): Promise<Trade[]> {
Expand Down
11 changes: 11 additions & 0 deletions ts/src/pro/test/base/test.Cache.ts
Expand Up @@ -405,3 +405,14 @@ assert (cacheSymbolSide3.getLimit (symbol, outsideLimit) === 1); // watch by sym
assert (cacheSymbolSide3.getLimit (undefined, outsideLimit) === 2); // watch all positions
cacheSymbolSide3.append ({ 'symbol': symbol2, 'side': 'long', 'contracts': 3 }); // update second position
assert (cacheSymbolSide3.getLimit (undefined, outsideLimit) === 1); // watch all positions

// ----------------------------------------------------------------------------
// test ArrayCacheBySymbolBySide, watch all positions, same symbol when not positionMode is not hedged reset other side

const oneWayCacheSymbolSide = new ArrayCacheBySymbolBySide (undefined, false);
symbol = 'BTC/USDT';
outsideLimit = 5;
oneWayCacheSymbolSide.append ({ 'symbol': symbol, 'side': 'short', 'contracts': 1 }); // create first position
oneWayCacheSymbolSide.append ({ 'symbol': symbol, 'side': 'long', 'contracts': 2 }); // create oposite side position
const arrayLength = oneWayCacheSymbolSide.length;
assert (arrayLength === 1);