diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index e303faf7de..c9d1c3d015 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -148,6 +148,7 @@ struct SRSmaInfoItem { }; struct SRSmaInfo { + SSma *pSma; STSchema *pTSchema; int64_t suid; int64_t lastRecv; // ms diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 6d3db47b18..fd300ef34a 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -188,13 +188,15 @@ static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) { for (int32_t i = 0; i < taosArrayGetSize(pFS->aQTaskInf);) { SQTaskFile *pTaskF = taosArrayGet(pFS->aQTaskInf, i); - if (atomic_sub_fetch_32(&pTaskF->nRef, 1) <= 0) { + int32_t oldVal = atomic_fetch_sub_32(&pTaskF->nRef, 1); + if ((oldVal <= 1) && (pTaskF->version < committed)) { tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->version, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName); if (taosRemoveFile(qTaskInfoFullName) < 0) { - smaWarn("vgId:%d, cleanup qinf, failed to remove %s since %s", TD_VID(pVnode), qTaskInfoFullName, - tstrerror(TAOS_SYSTEM_ERROR(errno))); + smaWarn("vgId:%d, cleanup qinf, committed %" PRIi64 ", failed to remove %s since %s", TD_VID(pVnode), committed, + qTaskInfoFullName, tstrerror(TAOS_SYSTEM_ERROR(errno))); } else { - smaDebug("vgId:%d, cleanup qinf, success to remove %s", TD_VID(pVnode), qTaskInfoFullName); + smaDebug("vgId:%d, cleanup qinf, committed %" PRIi64 ", success to remove %s", TD_VID(pVnode), committed, + qTaskInfoFullName); } taosArrayRemove(pFS->aQTaskInf, i); continue; @@ -364,7 +366,6 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { } SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); - SRSmaInfoItem *pItem = NULL; // step 1: merge qTaskInfo and iQTaskInfo // lock diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index 8bf79ca30e..2595a629ba 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -62,7 +62,7 @@ int32_t smaInit() { } int32_t type = (8 == POINTER_BYTES) ? TSDB_DATA_TYPE_UBIGINT : TSDB_DATA_TYPE_UINT; - smaMgmt.refHash = taosHashInit(1, taosGetDefaultHashFunction(type), true, HASH_ENTRY_LOCK); + smaMgmt.refHash = taosHashInit(64, taosGetDefaultHashFunction(type), true, HASH_ENTRY_LOCK); if (!smaMgmt.refHash) { taosCloseRef(smaMgmt.rsetId); atomic_store_8(&smaMgmt.inited, 0); @@ -107,6 +107,7 @@ void smaCleanUp() { if (old == 1) { taosCloseRef(smaMgmt.rsetId); taosHashCleanup(smaMgmt.refHash); + smaMgmt.refHash = NULL; taosTmrCleanUp(smaMgmt.tmrHandle); smaInfo("sma mgmt env is cleaned up, rsetId:%d, tmrHandle:%p", smaMgmt.rsetId, smaMgmt.tmrHandle); atomic_store_8(&smaMgmt.inited, 0); @@ -220,7 +221,7 @@ static void tRSmaInfoHashFreeNode(void *data) { if ((pItem = RSMA_INFO_ITEM((SRSmaInfo *)pRSmaInfo, 1)) && pItem->level) { taosHashRemove(smaMgmt.refHash, &pItem, POINTER_BYTES); } - tdFreeRSmaInfo(NULL, pRSmaInfo, true); + tdFreeRSmaInfo(pRSmaInfo->pSma, pRSmaInfo, true); } } diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 6732403846..fce244bfb8 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -121,27 +121,27 @@ static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t l */ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) { if (pInfo) { - int32_t vid = pSma ? SMA_VID(pSma) : -1; for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { SRSmaInfoItem *pItem = &pInfo->items[i]; if (isDeepFree && pItem->tmrId) { - smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", vid, pItem->tmrId, pInfo->suid, i + 1); + smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", SMA_VID(pSma), pItem->tmrId, + pInfo->suid, i + 1); taosTmrStopA(&pItem->tmrId); } if (isDeepFree && pInfo->taskInfo[i]) { - tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], vid, i + 1); + tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1); } else { - smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", vid, + smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", SMA_VID(pSma), pInfo->suid, i + 1); } if (pInfo->iTaskInfo[i]) { - tdRSmaQTaskInfoFree(&pInfo->iTaskInfo[i], vid, i + 1); + tdRSmaQTaskInfoFree(&pInfo->iTaskInfo[i], SMA_VID(pSma), i + 1); } else { - smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty iTaskInfo", vid, - pInfo->suid, i + 1); + smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty iTaskInfo", + SMA_VID(pSma), pInfo->suid, i + 1); } } if (isDeepFree) { @@ -377,7 +377,10 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; goto _err; } + pRSmaInfo->pSma = pSma; pRSmaInfo->pTSchema = pTSchema; + pRSmaInfo->suid = suid; + T_REF_INIT_VAL(pRSmaInfo, 1); if (!(pRSmaInfo->queue = taosOpenQueue())) { goto _err; } @@ -391,8 +394,6 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con if (!(pRSmaInfo->iQall = taosAllocateQall())) { goto _err; } - pRSmaInfo->suid = suid; - T_REF_INIT_VAL(pRSmaInfo, 1); if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 0) < 0) { goto _err; @@ -1586,8 +1587,8 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { if (!(pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pRSmaRef->refId))) { smaDebug("rsma fetch task not start since rsma stat already destroyed, rsetId:%" PRIi64 " refId:%d)", - smaMgmt.rsetId, pRSmaRef->refId); // pRSmaRef freed in taosHashRemove - taosHashRemove(smaMgmt.refHash, ¶m, POINTER_BYTES); + smaMgmt.rsetId, pRSmaRef->refId); // pRSmaRef freed in taosHashRemove + taosHashRemove(smaMgmt.refHash, ¶m, POINTER_BYTES); return; } @@ -1636,7 +1637,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { switch (fetchTriggerStat) { case TASK_TRIGGER_STAT_ACTIVE: { smaDebug("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64 " since stat is active", - SMA_VID(pSma), pItem->level, pRSmaInfo->suid); + SMA_VID(pSma), pItem->level, pRSmaInfo->suid); // async process pItem->fetchLevel = pItem->level; #if 0 diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 5da0f5c51a..95415e1113 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -97,6 +97,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu } } +static FORCE_INLINE void streamInputBlockDataDestory(void* pBlock) { blockDataDestroy((SSDataBlock*)pBlock); } + void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; if (!pTaskInfo || !pTaskInfo->pRoot || pTaskInfo->pRoot->numOfDownstream <= 0) { @@ -107,11 +109,7 @@ void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo) { if (pOptrInfo->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { SStreamScanInfo* pInfo = pOptrInfo->info; if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) { - for (int32_t i = 0; i < taosArrayGetSize(pInfo->pBlockLists); ++i) { - SSDataBlock* p = *(SSDataBlock**)taosArrayGet(pInfo->pBlockLists, i); - taosArrayDestroy(p->pDataBlock); - taosMemoryFreeClear(p); - } + taosArrayClearP(pInfo->pBlockLists, streamInputBlockDataDestory); } else { ASSERT(0); }