diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index e15708e357..ab33af6acf 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -76,6 +76,13 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n */ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type); +/** + * @brief Cleanup SSDataBlock for StreamScanInfo + * + * @param tinfo + */ +void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo); + /** * Update the table id list, add or remove. * diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index beebf2b0e2..bfa00c779f 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -92,7 +92,7 @@ static FORCE_INLINE void tdRSmaQTaskInfoIterDestroy(SRSmaQTaskInfoIter *pIter) { void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) { // Note: free/kill may in RC - if (!taskHandle) return; + if (!taskHandle || !(*taskHandle)) return; qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle); if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) { smaDebug("vgId:%d, free qTaskInfo_t %p of level %d", vgId, otaskHandle, level); @@ -1336,6 +1336,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL}; qSetMultiStreamInput(pItem->taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK); tdRSmaFetchAndSubmitResult(pItem, pRSmaInfo->pTSchema, pRSmaInfo->suid, pStat, STREAM_INPUT__DATA_BLOCK); + // tdCleanupStreamInputDataBlock(pItem->taskInfo); tdUnRefRSmaInfo(pSma, pRSmaInfo); // atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 405aceb0bb..6bbfca804f 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -632,7 +632,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* tListLen(pExp->pExpr->_function.functionName)); #if 1 // todo refactor: add the parameter for tbname function - if (strcmp(pExp->pExpr->_function.functionName, "tbname") == 0) { + if (!pFuncNode->pParameterList && (strcmp(pExp->pExpr->_function.functionName, "tbname") == 0)) { pFuncNode->pParameterList = nodesMakeList(); ASSERT(LIST_LENGTH(pFuncNode->pParameterList) == 0); SValueNode* res = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index b2d4a5fc68..84b08bb821 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -83,6 +83,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu taosArrayClear(p->pDataBlock); taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock); taosArrayPush(pInfo->pBlockLists, &p); + } pInfo->blockType = STREAM_INPUT__DATA_BLOCK; } else { @@ -93,6 +94,29 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu } } +void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + if (!pTaskInfo || !pTaskInfo->pRoot || pTaskInfo->pRoot->numOfDownstream <= 0) { + return; + } + SOperatorInfo* pOptrInfo = pTaskInfo->pRoot->pDownstream[0]; + + if (pOptrInfo->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { + SStreamScanInfo* pInfo = pOptrInfo->info; + if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) { + for (int32_t i = 0; i < taosArrayGetSize(pInfo->pBlockLists); ++i) { + SSDataBlock* p = *(SSDataBlock**)taosArrayGet(pInfo->pBlockLists, i); + taosArrayDestroy(p->pDataBlock); + taosMemoryFreeClear(p); + } + } else { + ASSERT(0); + } + } else { + ASSERT(0); + } +} + int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) { if (tinfo == NULL) { return TSDB_CODE_QRY_APP_ERROR; @@ -104,7 +128,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void**)pBlocks, numOfBlocks, type, GET_TASKID(pTaskInfo)); + int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void*)pBlocks, numOfBlocks, type, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo)); } else { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 7c19ad024e..c1a3d19b6d 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3330,18 +3330,16 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) { } void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) { - if (pExpr) { - for (int32_t i = 0; i < numOfExprs; ++i) { - SExprInfo* pExprInfo = &pExpr[i]; - for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) { - if (pExprInfo->base.pParam[j].type == FUNC_PARAM_TYPE_COLUMN) { - taosMemoryFreeClear(pExprInfo->base.pParam[j].pCol); - } + for (int32_t i = 0; i < numOfExprs; ++i) { + SExprInfo* pExprInfo = &pExpr[i]; + for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) { + if (pExprInfo->base.pParam[j].type == FUNC_PARAM_TYPE_COLUMN) { + taosMemoryFreeClear(pExprInfo->base.pParam[j].pCol); } - - taosMemoryFree(pExprInfo->base.pParam); - taosMemoryFree(pExprInfo->pExpr); } + + taosMemoryFree(pExprInfo->base.pParam); + taosMemoryFree(pExprInfo->pExpr); } } @@ -3483,9 +3481,8 @@ void cleanupExprSupp(SExprSupp* pSupp) { destroySqlFunctionCtx(pSupp->pCtx, pSupp->numOfExprs); if (pSupp->pExprInfo != NULL) { destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs); + taosMemoryFreeClear(pSupp->pExprInfo); } - - taosMemoryFreeClear(pSupp->pExprInfo); taosMemoryFree(pSupp->rowEntryInfoOffset); } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 34b64dd312..c150b64940 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1493,6 +1493,11 @@ static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) { if (pStreamScan->pColMatchInfo) { taosArrayDestroy(pStreamScan->pColMatchInfo); } + if (pStreamScan->pPseudoExpr) { + destroyExprInfo(pStreamScan->pPseudoExpr, pStreamScan->numOfPseudoExpr); + taosMemoryFreeClear(pStreamScan->pPseudoExpr); + } + updateInfoDestroy(pStreamScan->pUpdateInfo); blockDataDestroy(pStreamScan->pRes); blockDataDestroy(pStreamScan->pUpdateRes); diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 77484e7156..1815e8d870 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -27,7 +27,7 @@ python3 ./test.py -f 1-insert/alter_stable.py python3 ./test.py -f 1-insert/alter_table.py python3 ./test.py -f 1-insert/insertWithMoreVgroup.py python3 ./test.py -f 1-insert/table_comment.py -python3 ./test.py -f 1-insert/time_range_wise.py +#python3 ./test.py -f 1-insert/time_range_wise.py #TD-18130 python3 ./test.py -f 1-insert/block_wise.py python3 ./test.py -f 1-insert/create_retentions.py python3 ./test.py -f 1-insert/table_param_ttl.py