diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index e2f9089730..0bd7d9a57b 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -17,7 +17,7 @@ #include "tmsg.h" #include "tq.h" -#define MAX_CATCH_NUM 10240 +#define MAX_CACHE_TABLE_INFO_NUM 10240 typedef struct STableSinkInfo { uint64_t uid; @@ -108,9 +108,10 @@ static int32_t tqGetTableInfo(SSHashObj* pTableInfoMap,uint64_t groupId, STableS } int32_t tqPutTableInfo(SSHashObj* tblInfo ,uint64_t groupId, STableSinkInfo* pTbl) { - if (tSimpleHashGetSize(tblInfo) > MAX_CATCH_NUM) { - return TSDB_CODE_SUCCESS; + if (tSimpleHashGetSize(tblInfo) > MAX_CACHE_TABLE_INFO_NUM) { + return TSDB_CODE_FAILED; } + return tSimpleHashPut(tblInfo, &groupId, sizeof(uint64_t), &pTbl, POINTER_BYTES); } @@ -387,7 +388,10 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d tbData.uid = mr.me.uid; pTableSinkInfo->uid = mr.me.uid; - tqPutTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, pTableSinkInfo); + int32_t code = tqPutTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, pTableSinkInfo); + if (code != TSDB_CODE_SUCCESS) { + taosMemoryFreeClear(pTableSinkInfo); + } metaReaderClear(&mr); } }