diff --git a/include/libs/executor/dataSinkMgt.h b/include/libs/executor/dataSinkMgt.h index 6aa9f1bb5e..3ca7b79a6b 100644 --- a/include/libs/executor/dataSinkMgt.h +++ b/include/libs/executor/dataSinkMgt.h @@ -87,7 +87,7 @@ typedef struct SOutputData { * @param pHandle output * @return error code */ -int32_t dsCreateDataSinker(void* pSinkManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id); +int32_t dsCreateDataSinker(void* pSinkManager, SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id); int32_t dsDataSinkGetCacheSize(SDataSinkStat* pStat); diff --git a/source/libs/executor/inc/dataSinkInt.h b/source/libs/executor/inc/dataSinkInt.h index 805a20d85e..97bb7a29b8 100644 --- a/source/libs/executor/inc/dataSinkInt.h +++ b/source/libs/executor/inc/dataSinkInt.h @@ -52,7 +52,7 @@ typedef struct SDataSinkHandle { FGetSinkFlags fGetFlags; } SDataSinkHandle; -int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle); +int32_t createDataDispatcher(SDataSinkManager* pManager, SDataSinkNode* pDataSink, DataSinkHandle* pHandle); int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam); int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index ed0d07b23c..2e40f39b3d 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -356,6 +356,7 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; (void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pDispatcher->cachedSize); taosMemoryFreeClear(pDispatcher->nextOutput.pData); + nodesDestroyNode((SNode*)pDispatcher->pSchema); while (!taosQueueEmpty(pDispatcher->pDataBlocks)) { SDataDispatchBuf* pBuf = NULL; @@ -436,7 +437,7 @@ int32_t getOutputColCounts(SDataBlockDescNode* pInputDataBlockDesc) { return numOfCols; } -int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle) { +int32_t createDataDispatcher(SDataSinkManager* pManager, SDataSinkNode* pDataSink, DataSinkHandle* pHandle) { int32_t code; code = blockDescNodeCheck(pDataSink->pInputDataBlockDesc); if (code) { @@ -460,6 +461,7 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD dispatcher->pManager = pManager; pManager = NULL; dispatcher->pSchema = pDataSink->pInputDataBlockDesc; + pDataSink->pInputDataBlockDesc = NULL; dispatcher->outPutColCounts = getOutputColCounts(dispatcher->pSchema); dispatcher->status = DS_BUF_EMPTY; dispatcher->queryEnd = false; diff --git a/source/libs/executor/src/dataSinkMgt.c b/source/libs/executor/src/dataSinkMgt.c index 007420c927..06f5b5cce6 100644 --- a/source/libs/executor/src/dataSinkMgt.c +++ b/source/libs/executor/src/dataSinkMgt.c @@ -39,7 +39,7 @@ int32_t dsDataSinkGetCacheSize(SDataSinkStat* pStat) { return TSDB_CODE_SUCCESS; } -int32_t dsCreateDataSinker(void* pSinkManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id) { +int32_t dsCreateDataSinker(void* pSinkManager, SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id) { SDataSinkManager* pManager = pSinkManager; switch ((int)nodeType(pDataSink)) { case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 4f92398b75..822ff8eb29 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -534,6 +534,8 @@ void qwStopTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx); void qwRetireJob(SQWJobInfo* pJob); void qwDestroySession(QW_FPARAMS_DEF, SQWJobInfo *pJobInfo, void* session); int32_t qwInitSession(QW_FPARAMS_DEF, SQWTaskCtx *ctx, void** ppSession); +void qwFreeTaskHandle(SQWTaskCtx *ctx); +void qwFreeSinkHandle(SQWTaskCtx *ctx); #ifdef __cplusplus } diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index 99a0c3a3d3..0b5092ab0f 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -272,10 +272,10 @@ int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { return qwAddTask void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx) { taosHashRelease(mgmt->ctxHash, ctx); } -void qwFreeTaskHandle(SQWTaskCtx *ctx, qTaskInfo_t *taskHandle) { +void qwFreeTaskHandle(SQWTaskCtx *ctx) { // Note: free/kill may in RC - qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle); - if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) { + qTaskInfo_t otaskHandle = atomic_load_ptr(&ctx->taskHandle); + if (otaskHandle && otaskHandle == atomic_val_compare_exchange_ptr(&ctx->taskHandle, otaskHandle, NULL)) { taosEnableFullMemPoolUsage(ctx->memPoolSession); qDestroyTask(otaskHandle); taosDisableFullMemPoolUsage(); @@ -284,6 +284,18 @@ void qwFreeTaskHandle(SQWTaskCtx *ctx, qTaskInfo_t *taskHandle) { } } +void qwFreeSinkHandle(SQWTaskCtx *ctx) { + // Note: free/kill may in RC + void* osinkHandle = atomic_load_ptr(&ctx->sinkHandle); + if (osinkHandle && osinkHandle == atomic_val_compare_exchange_ptr(&ctx->sinkHandle, osinkHandle, NULL)) { + QW_SINK_ENABLE_MEMPOOL(ctx); + dsDestroyDataSinker(osinkHandle); + QW_SINK_DISABLE_MEMPOOL(); + + qDebug("sink handle destroyed"); + } +} + int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode) { int32_t code = 0; @@ -308,16 +320,9 @@ void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { // NO need to release dataConnInfo - qwFreeTaskHandle(ctx, &ctx->taskHandle); + qwFreeTaskHandle(ctx); - if (ctx->sinkHandle) { - QW_SINK_ENABLE_MEMPOOL(ctx); - dsDestroyDataSinker(ctx->sinkHandle); - QW_SINK_DISABLE_MEMPOOL(); - - ctx->sinkHandle = NULL; - qDebug("sink handle destroyed"); - } + qwFreeSinkHandle(ctx); taosArrayDestroy(ctx->tbInfo); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index a100a9afe8..d6db77990d 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -71,12 +71,13 @@ int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *re int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { qTaskInfo_t taskHandle = ctx->taskHandle; - + int32_t code = TSDB_CODE_SUCCESS; + ctx->queryExecDone = true; if (TASK_TYPE_TEMP == ctx->taskType && taskHandle) { if (ctx->explain && !ctx->explainRsped) { - QW_ERR_RET(qwSendExplainResponse(QW_FPARAMS(), ctx)); + QW_ERR_JRET(qwSendExplainResponse(QW_FPARAMS(), ctx)); } if (!ctx->needFetch) { @@ -86,7 +87,13 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { } } - return TSDB_CODE_SUCCESS; +_return: + + if (!ctx->explain || ctx->explainRsped) { + qwFreeTaskHandle(ctx); + } + + return code; } int32_t qwSendQueryRsp(QW_FPARAMS_DEF, int32_t msgType, SQWTaskCtx *ctx, int32_t rspCode, bool quickRsp) { @@ -483,6 +490,7 @@ int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg, int32 qwBuildFetchRsp(rsp, &sOutput, dataLen, rawLen, qComplete); if (qComplete) { atomic_store_8((int8_t *)&ctx->queryEnd, true); + qwFreeSinkHandle(ctx); } } @@ -868,6 +876,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { if (qComplete) { atomic_store_8((int8_t *)&ctx->queryEnd, true); atomic_store_8((int8_t *)&ctx->queryContinue, 0); + qwFreeSinkHandle(ctx); } qwMsg->connInfo = ctx->dataConnInfo; @@ -957,6 +966,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { qwBuildFetchRsp(rsp, &sOutput, dataLen, rawDataLen, qComplete); if (qComplete) { atomic_store_8((int8_t *)&ctx->queryEnd, true); + qwFreeSinkHandle(ctx); } } @@ -1582,6 +1592,7 @@ int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64 qwBuildFetchRsp(rsp, &sOutput, dataLen, rawLen, qComplete); if (qComplete) { atomic_store_8((int8_t *)&ctx->queryEnd, true); + qwFreeSinkHandle(ctx); } break; diff --git a/source/util/src/thash.c b/source/util/src/thash.c index 6e759c98ca..562fef2027 100644 --- a/source/util/src/thash.c +++ b/source/util/src/thash.c @@ -363,7 +363,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const vo if (pHashObj->enableUpdate) { doUpdateHashNode(pHashObj, pe, prev, pNode, pNewNode); } else { - FREE_HASH_NODE(pHashObj->freeFp, pNewNode); + taosMemoryFreeClear(pNewNode); terrno = TSDB_CODE_DUP_KEY; code = terrno; goto _exit;