Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion source/dnode/vnode/src/meta/metaCache.c
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,9 @@ static void freeUidCachePayload(const void* key, size_t keyLen, void* value, voi
}
}

metaDebug("free uid cache payload in lru, suid: %" PRIu64
" origin key:%" PRIu64 ",%" PRIu64,
p[1], p[2], p[3]);
taosMemoryFree(value);
}

Expand Down Expand Up @@ -800,7 +803,7 @@ int32_t metaUidFilterCachePut(void* pVnode, uint64_t suid, const void* pKey, int
if (code == TSDB_CODE_DUP_KEY) {
// we have already found the existed items, no need to added to cache anymore.
(void)taosThreadMutexUnlock(pLock);
return TSDB_CODE_SUCCESS;
return TSDB_CODE_DUP_KEY;
}
if (code != TSDB_CODE_SUCCESS) {
goto _end;
Expand Down
48 changes: 31 additions & 17 deletions source/libs/executor/src/executil.c
Original file line number Diff line number Diff line change
Expand Up @@ -2094,6 +2094,7 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S
char* pTagCondKey = NULL;
int32_t tagCondKeyLen;
SArray* pTagColIds = NULL;
char* pPayload = NULL;
qTrace("getTableList called, suid:%" PRIu64
", tagCond:%p, tagIndexCond:%p, %d %d", pScanNode->suid, pTagCond,
pTagIndexCond, pScanNode->tableType, pScanNode->virtualStableScan);
Expand Down Expand Up @@ -2123,7 +2124,7 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S
(void*)&canCacheTagEqCondFilter);
}
if (canCacheTagEqCondFilter) {
qDebug("stable tag filter condition can be optimized");
qDebug("%s, stable tag filter condition can be optimized", idstr);
if (((SStreamRuntimeFuncInfo*)pStreamInfo)->hasPlaceHolder) {
SNode* tmp = NULL;
code = nodesCloneNode((SNode*)pTagCond, &tmp);
Expand All @@ -2148,12 +2149,11 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S
QUERY_CHECK_CODE(code, lino, _error);
code = pStorageAPI->metaFn.getStableCachedTableList(
pVnode, pScanNode->suid, pTagCondKey, tagCondKeyLen,
context.digest, tListLen(context.digest),
pUidList, &acquired);
context.digest, tListLen(context.digest), pUidList, &acquired);
QUERY_CHECK_CODE(code, lino, _error);
} else if (tsTagFilterCache) {
// second, try to use normal tag filter cache
qDebug("using normal tag filter cache");
qDebug("%s using normal tag filter cache", idstr);
if (pStreamInfo != NULL && ((SStreamRuntimeFuncInfo*)pStreamInfo)->hasPlaceHolder) {
SNode* tmp = NULL;
code = nodesCloneNode((SNode*)pTagCond, &tmp);
Expand Down Expand Up @@ -2221,42 +2221,55 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S
code = doFilterByTagCond(pListInfo, pUidList, pTagCond, pVnode, status,
pStorageAPI, tsTagFilterCache || tsStableTagFilterCache,
&listAdded, pStreamInfo);
QUERY_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _error);

// let's add the filter results into meta-cache
numOfTables = taosArrayGetSize(pUidList);

if (canCacheTagEqCondFilter) {
qDebug("suid:%" PRIu64 ", %s add uid list to stable tag filter cache, "
"uidListSize:%d", pScanNode->suid, idstr,
(int32_t)taosArrayGetSize(pUidList));
qInfo("%s, suid:%" PRIu64 ", add uid list to stable tag filter cache, "
"uidListSize:%d, origin key:%" PRIu64 ",%" PRIu64,
idstr, pScanNode->suid, (int32_t)numOfTables,
*(uint64_t*)context.digest, *(uint64_t*)(context.digest + 8));

code = pStorageAPI->metaFn.putStableCachedTableList(
pVnode, pScanNode->suid, pTagCondKey, tagCondKeyLen,
context.digest, tListLen(context.digest),
pUidList, &pTagColIds);
QUERY_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _end);

digest[0] = 1;
memcpy(digest + 1, context.digest, tListLen(context.digest));
} else if (tsTagFilterCache) {
qInfo("suid:%" PRIu64 ", %s add uid list to normal tag filter cache, "
"uidListSize:%d", pScanNode->suid, idstr,
(int32_t)taosArrayGetSize(pUidList));
qInfo("%s, suid:%" PRIu64 ", add uid list to normal tag filter cache, "
"uidListSize:%d, origin key:%" PRIu64 ",%" PRIu64,
idstr, pScanNode->suid, (int32_t)numOfTables,
*(uint64_t*)context.digest, *(uint64_t*)(context.digest + 8));
size_t size = numOfTables * sizeof(uint64_t) + sizeof(int32_t);
char* pPayload = taosMemoryMalloc(size);
pPayload = taosMemoryMalloc(size);
QUERY_CHECK_NULL(pPayload, code, lino, _end, terrno);

*(int32_t*)pPayload = numOfTables;
*(int32_t*)pPayload = (int32_t)numOfTables;
if (numOfTables > 0) {
void* tmp = taosArrayGet(pUidList, 0);
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
memcpy(pPayload + sizeof(int32_t), tmp, numOfTables * sizeof(uint64_t));
}

code = pStorageAPI->metaFn.putCachedTableList(pVnode, pScanNode->suid, context.digest, tListLen(context.digest),
code = pStorageAPI->metaFn.putCachedTableList(pVnode, pScanNode->suid,
context.digest,
tListLen(context.digest),
pPayload, size, 1);
QUERY_CHECK_CODE(code, lino, _error);
if (TSDB_CODE_DUP_KEY == code) {
code = TSDB_CODE_SUCCESS; // ignore duplicate key error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DUP_KEY时没有释放payload

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

已修复

}
QUERY_CHECK_CODE(code, lino, _end);

/*
data referenced by pPayload is used in lru cache,
reset pPayload to NULL to avoid being freed in _error block
*/
pPayload = NULL;

digest[0] = 1;
memcpy(digest + 1, context.digest, tListLen(context.digest));
Expand All @@ -2281,12 +2294,13 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S
}
}

qDebug("table list with %d uids built", (int32_t)taosArrayGetSize(pListInfo->pTableList));
qDebug("%s, table list with %d uids built", idstr, (int32_t)numOfTables);

_error:
taosArrayDestroy(pUidList);
taosArrayDestroy(pTagColIds);
taosMemFreeClear(pTagCondKey);
taosMemFreeClear(pPayload);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def test_stream_tag_cache(self):
"""
tdSql.execute("alter dnode 1 'stableTagFilterCache' '1'")
tdSql.execute("alter dnode 1 'tagFilterCache' '1'")
tdSql.execute("alter dnode 1 'metaDebugFlag' '135'")
tdStream.createSnode()
self.check_all_types_basic()
self.check_all_types_alter()
Expand All @@ -46,13 +47,13 @@ def stable_tag_cache_log_counter(self, db_name, tb_name, retry=5) -> int:
tdSql.query(f"""select uid from information_schema.ins_stables
where stable_name = '{tb_name}' and db_name = '{db_name}'""")
suid = tdSql.getColData(0)[0]
return findTaosdLog(f"suid:{suid}.*stable tag filter cache", retry=retry)
return findTaosdLog(f"suid:{suid}.*add uid list to stable tag filter cache", retry=retry)

def normal_tag_cache_log_counter(self, db_name, tb_name, retry=5) -> int:
tdSql.query(f"""select uid from information_schema.ins_stables
where stable_name = '{tb_name}' and db_name = '{db_name}'""")
suid = tdSql.getColData(0)[0]
return findTaosdLog(f"suid:{suid}.*normal tag filter cache", retry=retry)
return findTaosdLog(f"suid:{suid}.*add uid list to normal tag filter cache", retry=retry)

def check_all_types_basic(self):
db_name = "test_basic"
Expand Down
Loading