From 57437312db5d2af564871ba24923904496a3af19 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Sun, 4 Sep 2022 16:54:07 +0800 Subject: [PATCH 1/4] other: rsma code optimization --- source/dnode/vnode/src/inc/sma.h | 1 + source/dnode/vnode/src/sma/smaCommit.c | 1 - source/dnode/vnode/src/sma/smaEnv.c | 2 +- source/dnode/vnode/src/sma/smaRollup.c | 25 +++++++++++++------------ 4 files changed, 15 insertions(+), 14 deletions(-) diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index e303faf7de..c9d1c3d015 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -148,6 +148,7 @@ struct SRSmaInfoItem { }; struct SRSmaInfo { + SSma *pSma; STSchema *pTSchema; int64_t suid; int64_t lastRecv; // ms diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 6d3db47b18..77bf5e9783 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -364,7 +364,6 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { } SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); - SRSmaInfoItem *pItem = NULL; // step 1: merge qTaskInfo and iQTaskInfo // lock diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index 8bf79ca30e..013481ab3d 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -220,7 +220,7 @@ static void tRSmaInfoHashFreeNode(void *data) { if ((pItem = RSMA_INFO_ITEM((SRSmaInfo *)pRSmaInfo, 1)) && pItem->level) { taosHashRemove(smaMgmt.refHash, &pItem, POINTER_BYTES); } - tdFreeRSmaInfo(NULL, pRSmaInfo, true); + tdFreeRSmaInfo(pRSmaInfo->pSma, pRSmaInfo, true); } } diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 6732403846..fce244bfb8 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -121,27 +121,27 @@ static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t l */ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) { if (pInfo) { - int32_t vid = pSma ? SMA_VID(pSma) : -1; for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { SRSmaInfoItem *pItem = &pInfo->items[i]; if (isDeepFree && pItem->tmrId) { - smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", vid, pItem->tmrId, pInfo->suid, i + 1); + smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", SMA_VID(pSma), pItem->tmrId, + pInfo->suid, i + 1); taosTmrStopA(&pItem->tmrId); } if (isDeepFree && pInfo->taskInfo[i]) { - tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], vid, i + 1); + tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1); } else { - smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", vid, + smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", SMA_VID(pSma), pInfo->suid, i + 1); } if (pInfo->iTaskInfo[i]) { - tdRSmaQTaskInfoFree(&pInfo->iTaskInfo[i], vid, i + 1); + tdRSmaQTaskInfoFree(&pInfo->iTaskInfo[i], SMA_VID(pSma), i + 1); } else { - smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty iTaskInfo", vid, - pInfo->suid, i + 1); + smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty iTaskInfo", + SMA_VID(pSma), pInfo->suid, i + 1); } } if (isDeepFree) { @@ -377,7 +377,10 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; goto _err; } + pRSmaInfo->pSma = pSma; pRSmaInfo->pTSchema = pTSchema; + pRSmaInfo->suid = suid; + T_REF_INIT_VAL(pRSmaInfo, 1); if (!(pRSmaInfo->queue = taosOpenQueue())) { goto _err; } @@ -391,8 +394,6 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con if (!(pRSmaInfo->iQall = taosAllocateQall())) { goto _err; } - pRSmaInfo->suid = suid; - T_REF_INIT_VAL(pRSmaInfo, 1); if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 0) < 0) { goto _err; @@ -1586,8 +1587,8 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { if (!(pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pRSmaRef->refId))) { smaDebug("rsma fetch task not start since rsma stat already destroyed, rsetId:%" PRIi64 " refId:%d)", - smaMgmt.rsetId, pRSmaRef->refId); // pRSmaRef freed in taosHashRemove - taosHashRemove(smaMgmt.refHash, ¶m, POINTER_BYTES); + smaMgmt.rsetId, pRSmaRef->refId); // pRSmaRef freed in taosHashRemove + taosHashRemove(smaMgmt.refHash, ¶m, POINTER_BYTES); return; } @@ -1636,7 +1637,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { switch (fetchTriggerStat) { case TASK_TRIGGER_STAT_ACTIVE: { smaDebug("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64 " since stat is active", - SMA_VID(pSma), pItem->level, pRSmaInfo->suid); + SMA_VID(pSma), pItem->level, pRSmaInfo->suid); // async process pItem->fetchLevel = pItem->level; #if 0 From 59912fedf116a28330d601a6d576f185fe9be4a0 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Sun, 4 Sep 2022 20:14:21 +0800 Subject: [PATCH 2/4] fix: stream input data block memory leak --- source/dnode/vnode/src/sma/smaEnv.c | 3 ++- source/dnode/vnode/src/tsdb/tsdbReaderWriter.c | 2 +- source/libs/executor/src/executor.c | 10 ++++------ 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index 013481ab3d..2595a629ba 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -62,7 +62,7 @@ int32_t smaInit() { } int32_t type = (8 == POINTER_BYTES) ? TSDB_DATA_TYPE_UBIGINT : TSDB_DATA_TYPE_UINT; - smaMgmt.refHash = taosHashInit(1, taosGetDefaultHashFunction(type), true, HASH_ENTRY_LOCK); + smaMgmt.refHash = taosHashInit(64, taosGetDefaultHashFunction(type), true, HASH_ENTRY_LOCK); if (!smaMgmt.refHash) { taosCloseRef(smaMgmt.rsetId); atomic_store_8(&smaMgmt.inited, 0); @@ -107,6 +107,7 @@ void smaCleanUp() { if (old == 1) { taosCloseRef(smaMgmt.rsetId); taosHashCleanup(smaMgmt.refHash); + smaMgmt.refHash = NULL; taosTmrCleanUp(smaMgmt.tmrHandle); smaInfo("sma mgmt env is cleaned up, rsetId:%d, tmrHandle:%p", smaMgmt.rsetId, smaMgmt.tmrHandle); atomic_store_8(&smaMgmt.inited, 0); diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index c8f3862071..a4b13e6a6b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -1303,7 +1303,7 @@ _err: int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL) { int32_t code = 0; SLastFile *pLastFile = &pWriter->fLast; - int64_t size; + int64_t size = 0; int64_t n; // check diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 037bf543b1..136b2de596 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -30,6 +30,8 @@ static void cleanupRefPool() { taosCloseRef(ref); } +static FORCE_INLINE void streamInputBlockDataDestory(void* pBlock) { blockDataDestroy((SSDataBlock*)pBlock); } + static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) { ASSERT(pOperator != NULL); if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { @@ -53,7 +55,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu // prevent setting a different type of block pInfo->validBlockIndex = 0; if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) { - taosArrayClearP(pInfo->pBlockLists, taosMemoryFree); + taosArrayClearP(pInfo->pBlockLists, streamInputBlockDataDestory); } else { taosArrayClear(pInfo->pBlockLists); } @@ -107,11 +109,7 @@ void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo) { if (pOptrInfo->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { SStreamScanInfo* pInfo = pOptrInfo->info; if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) { - for (int32_t i = 0; i < taosArrayGetSize(pInfo->pBlockLists); ++i) { - SSDataBlock* p = *(SSDataBlock**)taosArrayGet(pInfo->pBlockLists, i); - taosArrayDestroy(p->pDataBlock); - taosMemoryFreeClear(p); - } + taosArrayClearP(pInfo->pBlockLists, streamInputBlockDataDestory); } else { ASSERT(0); } From bbd8eeaabe12e7e4bea790340058bfbfe31757b5 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Sun, 4 Sep 2022 20:47:22 +0800 Subject: [PATCH 3/4] fix: qtaskinfo file cleanup logic --- source/dnode/vnode/src/sma/smaCommit.c | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 77bf5e9783..fd300ef34a 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -188,13 +188,15 @@ static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) { for (int32_t i = 0; i < taosArrayGetSize(pFS->aQTaskInf);) { SQTaskFile *pTaskF = taosArrayGet(pFS->aQTaskInf, i); - if (atomic_sub_fetch_32(&pTaskF->nRef, 1) <= 0) { + int32_t oldVal = atomic_fetch_sub_32(&pTaskF->nRef, 1); + if ((oldVal <= 1) && (pTaskF->version < committed)) { tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->version, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName); if (taosRemoveFile(qTaskInfoFullName) < 0) { - smaWarn("vgId:%d, cleanup qinf, failed to remove %s since %s", TD_VID(pVnode), qTaskInfoFullName, - tstrerror(TAOS_SYSTEM_ERROR(errno))); + smaWarn("vgId:%d, cleanup qinf, committed %" PRIi64 ", failed to remove %s since %s", TD_VID(pVnode), committed, + qTaskInfoFullName, tstrerror(TAOS_SYSTEM_ERROR(errno))); } else { - smaDebug("vgId:%d, cleanup qinf, success to remove %s", TD_VID(pVnode), qTaskInfoFullName); + smaDebug("vgId:%d, cleanup qinf, committed %" PRIi64 ", success to remove %s", TD_VID(pVnode), committed, + qTaskInfoFullName); } taosArrayRemove(pFS->aQTaskInf, i); continue; From 407773b67b0127413e45cddb6712d741e66d9289 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Mon, 5 Sep 2022 09:44:51 +0800 Subject: [PATCH 4/4] fix: double free for stream input block data --- source/libs/executor/src/executor.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 327242c4f7..95415e1113 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -30,8 +30,6 @@ static void cleanupRefPool() { taosCloseRef(ref); } -static FORCE_INLINE void streamInputBlockDataDestory(void* pBlock) { blockDataDestroy((SSDataBlock*)pBlock); } - static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) { ASSERT(pOperator != NULL); if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { @@ -55,7 +53,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu // prevent setting a different type of block pInfo->validBlockIndex = 0; if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) { - taosArrayClearP(pInfo->pBlockLists, streamInputBlockDataDestory); + taosArrayClearP(pInfo->pBlockLists, taosMemoryFree); } else { taosArrayClear(pInfo->pBlockLists); } @@ -99,6 +97,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu } } +static FORCE_INLINE void streamInputBlockDataDestory(void* pBlock) { blockDataDestroy((SSDataBlock*)pBlock); } + void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; if (!pTaskInfo || !pTaskInfo->pRoot || pTaskInfo->pRoot->numOfDownstream <= 0) {