Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion source/dnode/vnode/src/meta/metaCache.c
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,9 @@ static void freeUidCachePayload(const void* key, size_t keyLen, void* value, voi
}
}

metaDebug("free uid cache payload in lru, suid: %" PRIu64
" origin key:%" PRIu64 ",%" PRIu64,
p[1], p[2], p[3]);
taosMemoryFree(value);
}

Expand Down Expand Up @@ -800,7 +803,7 @@ int32_t metaUidFilterCachePut(void* pVnode, uint64_t suid, const void* pKey, int
if (code == TSDB_CODE_DUP_KEY) {
// we have already found the existed items, no need to added to cache anymore.
(void)taosThreadMutexUnlock(pLock);
return TSDB_CODE_SUCCESS;
return TSDB_CODE_DUP_KEY;
}
if (code != TSDB_CODE_SUCCESS) {
goto _end;
Expand Down
55 changes: 38 additions & 17 deletions source/libs/executor/src/executil.c
Original file line number Diff line number Diff line change
Expand Up @@ -2094,6 +2094,7 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S
char* pTagCondKey = NULL;
int32_t tagCondKeyLen;
SArray* pTagColIds = NULL;
char* pPayload = NULL;
qTrace("getTableList called, suid:%" PRIu64
", tagCond:%p, tagIndexCond:%p, %d %d", pScanNode->suid, pTagCond,
pTagIndexCond, pScanNode->tableType, pScanNode->virtualStableScan);
Expand Down Expand Up @@ -2123,7 +2124,7 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S
(void*)&canCacheTagEqCondFilter);
}
if (canCacheTagEqCondFilter) {
qDebug("stable tag filter condition can be optimized");
qDebug("%s, stable tag filter condition can be optimized", idstr);
if (((SStreamRuntimeFuncInfo*)pStreamInfo)->hasPlaceHolder) {
SNode* tmp = NULL;
code = nodesCloneNode((SNode*)pTagCond, &tmp);
Expand All @@ -2148,12 +2149,11 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S
QUERY_CHECK_CODE(code, lino, _error);
code = pStorageAPI->metaFn.getStableCachedTableList(
pVnode, pScanNode->suid, pTagCondKey, tagCondKeyLen,
context.digest, tListLen(context.digest),
pUidList, &acquired);
context.digest, tListLen(context.digest), pUidList, &acquired);
QUERY_CHECK_CODE(code, lino, _error);
} else if (tsTagFilterCache) {
// second, try to use normal tag filter cache
qDebug("using normal tag filter cache");
qDebug("%s using normal tag filter cache", idstr);
if (pStreamInfo != NULL && ((SStreamRuntimeFuncInfo*)pStreamInfo)->hasPlaceHolder) {
SNode* tmp = NULL;
code = nodesCloneNode((SNode*)pTagCond, &tmp);
Expand Down Expand Up @@ -2221,42 +2221,62 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S
code = doFilterByTagCond(pListInfo, pUidList, pTagCond, pVnode, status,
pStorageAPI, tsTagFilterCache || tsStableTagFilterCache,
&listAdded, pStreamInfo);
QUERY_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _error);

// let's add the filter results into meta-cache
numOfTables = taosArrayGetSize(pUidList);

if (canCacheTagEqCondFilter) {
qDebug("suid:%" PRIu64 ", %s add uid list to stable tag filter cache, "
"uidListSize:%d", pScanNode->suid, idstr,
(int32_t)taosArrayGetSize(pUidList));
qInfo("%s, suid:%" PRIu64 ", add uid list to stable tag filter cache, "
"uidListSize:%d, origin key:%" PRIu64 ",%" PRIu64,
idstr, pScanNode->suid, (int32_t)numOfTables,
*(uint64_t*)context.digest, *(uint64_t*)(context.digest + 8));

code = pStorageAPI->metaFn.putStableCachedTableList(
pVnode, pScanNode->suid, pTagCondKey, tagCondKeyLen,
context.digest, tListLen(context.digest),
pUidList, &pTagColIds);
QUERY_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _end);

digest[0] = 1;
memcpy(digest + 1, context.digest, tListLen(context.digest));
} else if (tsTagFilterCache) {
qInfo("suid:%" PRIu64 ", %s add uid list to normal tag filter cache, "
"uidListSize:%d", pScanNode->suid, idstr,
(int32_t)taosArrayGetSize(pUidList));
qInfo("%s, suid:%" PRIu64 ", add uid list to normal tag filter cache, "
"uidListSize:%d, origin key:%" PRIu64 ",%" PRIu64,
idstr, pScanNode->suid, (int32_t)numOfTables,
*(uint64_t*)context.digest, *(uint64_t*)(context.digest + 8));
size_t size = numOfTables * sizeof(uint64_t) + sizeof(int32_t);
char* pPayload = taosMemoryMalloc(size);
pPayload = taosMemoryMalloc(size);
QUERY_CHECK_NULL(pPayload, code, lino, _end, terrno);

*(int32_t*)pPayload = numOfTables;
*(int32_t*)pPayload = (int32_t)numOfTables;
if (numOfTables > 0) {
void* tmp = taosArrayGet(pUidList, 0);
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
memcpy(pPayload + sizeof(int32_t), tmp, numOfTables * sizeof(uint64_t));
}

code = pStorageAPI->metaFn.putCachedTableList(pVnode, pScanNode->suid, context.digest, tListLen(context.digest),
code = pStorageAPI->metaFn.putCachedTableList(pVnode, pScanNode->suid,
context.digest,
tListLen(context.digest),
pPayload, size, 1);
QUERY_CHECK_CODE(code, lino, _error);
if (TSDB_CODE_SUCCESS == code) {
/*
data referenced by pPayload is used in lru cache,
reset pPayload to NULL to avoid being freed in _error block
*/
pPayload = NULL;
} else {
if (TSDB_CODE_DUP_KEY == code) {
/*
another thread has already put the same key into cache,
we can just ignore this error
*/
code = TSDB_CODE_SUCCESS;
}
QUERY_CHECK_CODE(code, lino, _end);
}


digest[0] = 1;
memcpy(digest + 1, context.digest, tListLen(context.digest));
Expand All @@ -2281,12 +2301,13 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S
}
}

qDebug("table list with %d uids built", (int32_t)taosArrayGetSize(pListInfo->pTableList));
qDebug("%s, table list with %d uids built", idstr, (int32_t)numOfTables);

_error:
taosArrayDestroy(pUidList);
taosArrayDestroy(pTagColIds);
taosMemFreeClear(pTagCondKey);
taosMemFreeClear(pPayload);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def test_stream_tag_cache(self):
"""
tdSql.execute("alter dnode 1 'stableTagFilterCache' '1'")
tdSql.execute("alter dnode 1 'tagFilterCache' '1'")
tdSql.execute("alter dnode 1 'metaDebugFlag' '135'")
tdStream.createSnode()
self.check_all_types_basic()
self.check_all_types_alter()
Expand All @@ -46,13 +47,13 @@ def stable_tag_cache_log_counter(self, db_name, tb_name, retry=5) -> int:
tdSql.query(f"""select uid from information_schema.ins_stables
where stable_name = '{tb_name}' and db_name = '{db_name}'""")
suid = tdSql.getColData(0)[0]
return findTaosdLog(f"suid:{suid}.*stable tag filter cache", retry=retry)
return findTaosdLog(f"suid:{suid}.*add uid list to stable tag filter cache", retry=retry)

def normal_tag_cache_log_counter(self, db_name, tb_name, retry=5) -> int:
tdSql.query(f"""select uid from information_schema.ins_stables
where stable_name = '{tb_name}' and db_name = '{db_name}'""")
suid = tdSql.getColData(0)[0]
return findTaosdLog(f"suid:{suid}.*normal tag filter cache", retry=retry)
return findTaosdLog(f"suid:{suid}.*add uid list to normal tag filter cache", retry=retry)

def check_all_types_basic(self):
db_name = "test_basic"
Expand Down
Loading