From e527f2fd0b3d5226c9f5d102d2e4d6de71f4afcd Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 12 Jul 2024 15:22:39 +0800 Subject: [PATCH] fix: validate function return value --- include/util/tarray.h | 2 +- source/client/src/clientHb.c | 3 +- source/common/src/tdatablock.c | 4 +- source/common/src/tmsg.c | 6 +- source/dnode/mnode/impl/src/mndDef.c | 4 +- source/dnode/mnode/impl/src/mndStreamUtil.c | 3 +- source/dnode/vnode/src/tq/tqSink.c | 9 +- source/dnode/vnode/src/tq/tqUtil.c | 12 +- source/dnode/vnode/src/tsdb/tsdbCache.c | 3 +- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 3 +- source/dnode/vnode/src/tsdb/tsdbReadUtil.c | 24 ++- source/libs/executor/src/executil.c | 6 +- source/libs/executor/src/streamfilloperator.c | 3 +- .../executor/src/streamtimewindowoperator.c | 3 +- source/libs/executor/src/timewindowoperator.c | 3 +- source/libs/qworker/inc/qwInt.h | 2 +- source/libs/qworker/src/qwMsg.c | 4 +- source/libs/qworker/src/qwUtil.c | 15 +- source/libs/qworker/src/qworker.c | 140 +++++++++++------- source/libs/stream/src/streamCheckStatus.c | 4 +- source/libs/stream/src/streamMsg.c | 6 +- source/libs/stream/src/streamTask.c | 15 +- source/util/src/tarray.c | 6 +- source/util/src/tconfig.c | 3 +- 24 files changed, 184 insertions(+), 99 deletions(-) diff --git a/include/util/tarray.h b/include/util/tarray.h index e494f78f48..cef09e40f8 100644 --- a/include/util/tarray.h +++ b/include/util/tarray.h @@ -205,7 +205,7 @@ void taosArrayClearEx(SArray* pArray, void (*fp)(void*)); void taosArrayClearP(SArray* pArray, void (*fp)(void*)); -void* taosArrayDestroy(SArray* pArray); +void taosArrayDestroy(SArray* pArray); void taosArrayDestroyP(SArray* pArray, FDelete fp); diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 1b7b9263a5..a7d459ff31 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -1398,7 +1398,8 @@ void hbMgrCleanUp() { taosThreadMutexLock(&clientHbMgr.lock); appHbMgrCleanup(); - clientHbMgr.appHbMgrs = taosArrayDestroy(clientHbMgr.appHbMgrs); + taosArrayDestroy(clientHbMgr.appHbMgrs); + clientHbMgr.appHbMgrs = NULL; taosThreadMutexUnlock(&clientHbMgr.lock); } diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 809721d606..7710dbdb3f 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1512,7 +1512,9 @@ void blockDataFreeRes(SSDataBlock* pBlock) { colDataDestroy(pColInfoData); } - pBlock->pDataBlock = taosArrayDestroy(pBlock->pDataBlock); + taosArrayDestroy(pBlock->pDataBlock); + pBlock->pDataBlock = NULL; + taosMemoryFreeClear(pBlock->pBlockAgg); memset(&pBlock->info, 0, sizeof(SDataBlockInfo)); } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 7e89753241..fa79d16970 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -9505,7 +9505,8 @@ int32_t tDecodeMqDataRsp(SDecoder *pDecoder, void *pRsp) { static void tDeleteMqDataRspCommon(void *rsp) { SMqDataRspCommon *pRsp = rsp; - pRsp->blockDataLen = taosArrayDestroy(pRsp->blockDataLen); + taosArrayDestroy(pRsp->blockDataLen); + pRsp->blockDataLen = NULL; taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree); pRsp->blockData = NULL; taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSchemaWrapper); @@ -9558,7 +9559,8 @@ void tDeleteSTaosxRsp(void *rsp) { tDeleteMqDataRspCommon(rsp); STaosxRsp *pRsp = (STaosxRsp *)rsp; - pRsp->createTableLen = taosArrayDestroy(pRsp->createTableLen); + taosArrayDestroy(pRsp->createTableLen); + pRsp->createTableLen = NULL; taosArrayDestroyP(pRsp->createTableReq, (FDelete)taosMemoryFree); pRsp->createTableReq = NULL; } diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 5164557184..9dc5f920ad 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -195,7 +195,9 @@ void *freeStreamTasks(SArray *pTaskLevel) { taosArrayDestroy(pLevel); } - return taosArrayDestroy(pTaskLevel); + taosArrayDestroy(pTaskLevel); + + return NULL; } void tFreeStreamObj(SStreamObj *pStream) { diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index c4adbd0fc3..e4e30bdf10 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -978,7 +978,8 @@ void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpo } void mndClearConsensusRspEntry(SCheckpointConsensusInfo* pInfo) { - pInfo->pTaskList = taosArrayDestroy(pInfo->pTaskList); + taosArrayDestroy(pInfo->pTaskList); + pInfo->pTaskList = NULL; } int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId) { diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 5f3e1e3d14..1c8f102725 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -249,7 +249,8 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S } tTagNew(tagArray, 1, false, (STag**)&pCreateTbReq->ctb.pTag); - tagArray = taosArrayDestroy(tagArray); + taosArrayDestroy(tagArray); + tagArray = NULL; if (pCreateTbReq->ctb.pTag == NULL) { tdDestroySVCreateTbReq(pCreateTbReq); code = TSDB_CODE_OUT_OF_MEMORY; @@ -505,7 +506,8 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat pTableData->aRowP = taosArrayInit(numOfRows, sizeof(SRow*)); if (pTableData->aRowP == NULL || pVals == NULL) { - pTableData->aRowP = taosArrayDestroy(pTableData->aRowP); + taosArrayDestroy(pTableData->aRowP); + pTableData->aRowP = NULL; taosArrayDestroy(pVals); code = TSDB_CODE_OUT_OF_MEMORY; tqError("s-task:%s failed to prepare write stream res blocks, code:%s", id, tstrerror(code)); @@ -530,7 +532,8 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat if (ts < earlyTs) { tqError("s-task:%s ts:%" PRId64 " of generated results out of valid time range %" PRId64 " , discarded", id, ts, earlyTs); - pTableData->aRowP = taosArrayDestroy(pTableData->aRowP); + taosArrayDestroy(pTableData->aRowP); + pTableData->aRowP = NULL; taosArrayDestroy(pVals); return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 5290c39d42..eb1da51b45 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -54,19 +54,23 @@ static int32_t tqInitTaosxRsp(SMqDataRspCommon* pRsp, STqOffsetVal pOffset) { if (pRsp->blockData == NULL || pRsp->blockDataLen == NULL || pRsp->blockTbName == NULL || pRsp->blockSchema == NULL) { if (pRsp->blockData != NULL) { - pRsp->blockData = taosArrayDestroy(pRsp->blockData); + taosArrayDestroy(pRsp->blockData); + pRsp->blockData = NULL; } if (pRsp->blockDataLen != NULL) { - pRsp->blockDataLen = taosArrayDestroy(pRsp->blockDataLen); + taosArrayDestroy(pRsp->blockDataLen); + pRsp->blockDataLen = NULL; } if (pRsp->blockTbName != NULL) { - pRsp->blockTbName = taosArrayDestroy(pRsp->blockTbName); + taosArrayDestroy(pRsp->blockTbName); + pRsp->blockTbName = NULL; } if (pRsp->blockSchema != NULL) { - pRsp->blockSchema = taosArrayDestroy(pRsp->blockSchema); + taosArrayDestroy(pRsp->blockSchema); + pRsp->blockSchema = NULL; } return -1; } diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 4597a44845..38dd945b28 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -3187,7 +3187,8 @@ static void clearLastFileSet(SFSNextRowIter *state) { int32_t iter = 0; while ((pe = tSimpleHashIterate(state->pr->pTableMap, pe, &iter)) != NULL) { STableLoadInfo *pInfo = *(STableLoadInfo **)pe; - pInfo->pTombData = taosArrayDestroy(pInfo->pTombData); + taosArrayDestroy(pInfo->pTombData); + pInfo->pTombData = NULL; } } } diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 4d6ebf721e..817468c6b6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -311,7 +311,8 @@ void* tsdbCacherowsReaderClose(void* pReader) { int32_t iter = 0; while ((pe = tSimpleHashIterate(p->pTableMap, pe, &iter)) != NULL) { STableLoadInfo* pInfo = *(STableLoadInfo**)pe; - pInfo->pTombData = taosArrayDestroy(pInfo->pTombData); + taosArrayDestroy(pInfo->pTombData); + pInfo->pTombData = NULL; } tSimpleHashCleanup(p->pTableMap); diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index 273ec6d797..4bd9bee050 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -308,7 +308,8 @@ void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t step) { pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter); } - pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline); + taosArrayDestroy(pInfo->delSkyline); + pInfo->delSkyline = NULL; pInfo->lastProcKey.ts = ts; // todo check the nextProcKey info pInfo->sttKeyInfo.nextProcKey.ts = ts + step; @@ -329,11 +330,16 @@ void clearBlockScanInfo(STableBlockScanInfo* p) { p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter); } - p->delSkyline = taosArrayDestroy(p->delSkyline); - p->pBlockList = taosArrayDestroy(p->pBlockList); - p->pBlockIdxList = taosArrayDestroy(p->pBlockIdxList); - p->pMemDelData = taosArrayDestroy(p->pMemDelData); - p->pFileDelData = taosArrayDestroy(p->pFileDelData); + taosArrayDestroy(p->delSkyline); + p->delSkyline = NULL; + taosArrayDestroy(p->pBlockList); + p->pBlockList = NULL; + taosArrayDestroy(p->pBlockIdxList); + p->pBlockIdxList = NULL; + taosArrayDestroy(p->pMemDelData); + p->pMemDelData = NULL; + taosArrayDestroy(p->pFileDelData); + p->pFileDelData = NULL; clearRowKey(&p->lastProcKey); clearRowKey(&p->sttRange.skey); @@ -579,7 +585,8 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3 } taosArrayAddAll(pBlockIter->blockList, pTableScanInfo->pBlockList); - pTableScanInfo->pBlockList = taosArrayDestroy(pTableScanInfo->pBlockList); + taosArrayDestroy(pTableScanInfo->pBlockList); + pTableScanInfo->pBlockList = NULL; int64_t et = taosGetTimestampUs(); tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s", @@ -624,7 +631,8 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3 for (int32_t i = 0; i < numOfTables; ++i) { STableBlockScanInfo* pTableScanInfo = taosArrayGetP(pTableList, i); - pTableScanInfo->pBlockList = taosArrayDestroy(pTableScanInfo->pBlockList); + taosArrayDestroy(pTableScanInfo->pBlockList); + pTableScanInfo->pBlockList = NULL; } int64_t et = taosGetTimestampUs(); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 173ef0b829..c623e94a12 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -96,7 +96,8 @@ void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo) { pGroupResInfo->freeItem = false; pGroupResInfo->pRows = NULL; } else { - pGroupResInfo->pRows = taosArrayDestroy(pGroupResInfo->pRows); + taosArrayDestroy(pGroupResInfo->pRows); + pGroupResInfo->pRows = NULL; } pGroupResInfo->index = 0; } @@ -2102,7 +2103,8 @@ void* tableListDestroy(STableListInfo* pTableListInfo) { return NULL; } - pTableListInfo->pTableList = taosArrayDestroy(pTableListInfo->pTableList); + taosArrayDestroy(pTableListInfo->pTableList); + pTableListInfo->pTableList = NULL; taosMemoryFreeClear(pTableListInfo->groupOffset); taosHashCleanup(pTableListInfo->map); diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index c1a38b66ba..5f188c2c8a 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -129,7 +129,8 @@ static void destroyStreamFillOperatorInfo(void* param) { pInfo->pRes = blockDataDestroy(pInfo->pRes); pInfo->pSrcBlock = blockDataDestroy(pInfo->pSrcBlock); pInfo->pDelRes = blockDataDestroy(pInfo->pDelRes); - pInfo->matchInfo.pList = taosArrayDestroy(pInfo->matchInfo.pList); + taosArrayDestroy(pInfo->matchInfo.pList); + pInfo->matchInfo.pList = NULL; taosMemoryFree(pInfo); } diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index d68108b9a5..d3f90c3134 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -406,7 +406,8 @@ void clearGroupResInfo(SGroupResInfo* pGroupResInfo) { } pGroupResInfo->freeItem = false; } - pGroupResInfo->pRows = taosArrayDestroy(pGroupResInfo->pRows); + taosArrayDestroy(pGroupResInfo->pRows); + pGroupResInfo->pRows = NULL; pGroupResInfo->index = 0; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index bad15d1834..23bd2b87a1 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1136,7 +1136,8 @@ void destroyIntervalOperatorInfo(void* param) { tdListFree(pInfo->binfo.resultRowInfo.openWindow); - pInfo->pInterpCols = taosArrayDestroy(pInfo->pInterpCols); + taosArrayDestroy(pInfo->pInterpCols); + pInfo->pInterpCols = NULL; taosArrayDestroyEx(pInfo->pPrevValues, freeItem); pInfo->pPrevValues = NULL; diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 2222f9cb31..dfd00dc5a7 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -391,7 +391,7 @@ void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx); int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode); int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status, bool dynamicTask); int32_t qwDropTask(QW_FPARAMS_DEF); -void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx); +int32_t qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx); int32_t qwOpenRef(void); void qwSetHbParam(int64_t refId, SQWHbParam **pParam); int32_t qwUpdateTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type); diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index d30b100147..fad4b1efa8 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -495,9 +495,9 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in QW_SCH_TASK_DLOG("processCQuery start, node:%p, handle:%p", node, pMsg->info.handle); - QW_ERR_RET(qwProcessCQuery(QW_FPARAMS(), &qwMsg)); + code = qwProcessCQuery(QW_FPARAMS(), &qwMsg); - QW_SCH_TASK_DLOG("processCQuery end, node:%p", node); + QW_SCH_TASK_DLOG("processCQuery end, node:%p, code:0x%x", node, code); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index f00c4aef30..619e2576cd 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -503,14 +503,16 @@ void qwSetHbParam(int64_t refId, SQWHbParam **pParam) { *pParam = &gQwMgmt.param[paramIdx]; } -void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) { +int32_t qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) { char dbFName[TSDB_DB_FNAME_LEN]; char tbName[TSDB_TABLE_NAME_LEN]; STbVerInfo tbInfo; int32_t i = 0; + int32_t code = TSDB_CODE_SUCCESS; while (true) { - if (qGetQueryTableSchemaVersion(pTaskInfo, dbFName, tbName, &tbInfo.sversion, &tbInfo.tversion, i) < 0) { + code = qGetQueryTableSchemaVersion(pTaskInfo, dbFName, tbName, &tbInfo.sversion, &tbInfo.tversion, i); + if (TSDB_CODE_SUCCESS != code) { break; } @@ -522,12 +524,19 @@ void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) { if (NULL == ctx->tbInfo) { ctx->tbInfo = taosArrayInit(1, sizeof(tbInfo)); + if (NULL == ctx->tbInfo) { + QW_ERR_RET(terrno); + } } - taosArrayPush(ctx->tbInfo, &tbInfo); + if (NULL == taosArrayPush(ctx->tbInfo, &tbInfo)) { + QW_ERR_RET(terrno); + } i++; } + + QW_RET(code); } void qwCloseRef(void) { diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index c2cd07e9ed..8ab095bc32 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -18,10 +18,11 @@ SQWorkerMgmt gQwMgmt = { .qwNum = 0, }; -int32_t qwStopAllTasks(SQWorker *mgmt) { +void qwStopAllTasks(SQWorker *mgmt) { uint64_t qId, tId, sId; int32_t eId; int64_t rId = 0; + int32_t code = TSDB_CODE_SUCCESS; void *pIter = taosHashIterate(mgmt->ctxHash, NULL); while (pIter) { @@ -44,22 +45,29 @@ int32_t qwStopAllTasks(SQWorker *mgmt) { } if (QW_QUERY_RUNNING(ctx)) { - qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED); - QW_TASK_DLOG_E("task running, async killed"); + code = qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED); + if (TSDB_CODE_SUCCESS != code) { + QW_TASK_ELOG("task running, async kill failed, error: %x", code); + } else { + QW_TASK_DLOG_E("task running, async killed"); + } } else if (QW_FETCH_RUNNING(ctx)) { QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_VND_STOPPED); QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP); QW_TASK_DLOG_E("task fetching, update drop received"); } else { - qwDropTask(QW_FPARAMS()); + code = qwDropTask(QW_FPARAMS()); + if (TSDB_CODE_SUCCESS != code) { + QW_TASK_ELOG("task drop failed, error: %x", code); + } else { + QW_TASK_DLOG_E("task dropped"); + } } QW_UNLOCK(QW_WRITE, &ctx->lock); pIter = taosHashIterate(mgmt->ctxHash, pIter); } - - return TSDB_CODE_SUCCESS; } int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { @@ -111,7 +119,7 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { int32_t qwSendQueryRsp(QW_FPARAMS_DEF, int32_t msgType, SQWTaskCtx *ctx, int32_t rspCode, bool quickRsp) { if ((!quickRsp) || QUERY_RSP_POLICY_QUICK == tsQueryRspPolicy) { if (!ctx->localExec) { - qwBuildAndSendQueryRsp(msgType, &ctx->ctrlConnInfo, rspCode, ctx); + QW_ERR_RET(qwBuildAndSendQueryRsp(msgType, &ctx->ctrlConnInfo, rspCode, ctx)); QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, rspCode, tstrerror(rspCode)); } @@ -140,6 +148,10 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { } SArray *pResList = taosArrayInit(4, POINTER_BYTES); + if (NULL == pResList) { + QW_ERR_RET(terrno); + } + while (true) { QW_TASK_DLOG("start to execTask, loopIdx:%d", i++); @@ -165,6 +177,9 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { size_t numOfResBlock = taosArrayGetSize(pResList); for (int32_t j = 0; j < numOfResBlock; ++j) { SSDataBlock *pRes = taosArrayGetP(pResList, j); + if (NULL == pRes) { + QW_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } SInputData inputData = {.pData = pRes}; code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue); @@ -226,7 +241,9 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { } _return: + taosArrayDestroy(pResList); + QW_RET(code); } @@ -241,7 +258,8 @@ bool qwTaskNotInExec(SQWTaskCtx *ctx) { int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) { int32_t taskNum = 0; - + int32_t code = TSDB_CODE_SUCCESS; + hbInfo->connInfo = sch->hbConnInfo; hbInfo->rsp.epId = sch->hbEpId; @@ -272,7 +290,11 @@ int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) status.status = taskStatus->status; status.refId = taskStatus->refId; - taosArrayPush(hbInfo->rsp.taskStatus, &status); + if (NULL == taosArrayPush(hbInfo->rsp.taskStatus, &status)) { + taosHashCancelIterate(sch->tasksHash, pIter); + code = terrno; + break; + } ++i; pIter = taosHashIterate(sch->tasksHash, pIter); @@ -280,7 +302,7 @@ int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) QW_UNLOCK(QW_READ, &sch->tasksLock); - return TSDB_CODE_SUCCESS; + return code; } int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, int32_t *pRawDataLen, void **rspMsg, @@ -320,7 +342,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, pOutput->numOfRows); if (!ctx->dynamicTask) { - qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask); + QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask)); } if (NULL == pRsp) { @@ -375,7 +397,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) { QW_TASK_DLOG("task all data fetched and done, fetched blocks %d rows %" PRId64, pOutput->numOfBlocks, pOutput->numOfRows); - qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask); + QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask)); break; } @@ -464,7 +486,7 @@ int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg, int32 qwMsg->connInfo = ctx->dataConnInfo; QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); - qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, rsp, dataLen, code); + QW_ERR_RET(qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, rsp, dataLen, code)); rsp = NULL; QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), @@ -650,7 +672,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp _return: if (TSDB_CODE_SUCCESS == code && QW_PHASE_POST_QUERY == phase) { - qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PART_SUCC, ctx->dynamicTask); + code = qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PART_SUCC, ctx->dynamicTask); ctx->queryGotData = true; } @@ -660,7 +682,7 @@ _return: qwDbgSimulateRedirect(&qwMsg, ctx, &rsped); qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped); if (!rsped) { - qwSendQueryRsp(QW_FPARAMS(), input->msgType + 1, ctx, code, false); + code = qwSendQueryRsp(QW_FPARAMS(), input->msgType + 1, ctx, code, false); } } @@ -672,7 +694,7 @@ _return: } if (code) { - qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask); + code = qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask); } QW_UNLOCK(QW_WRITE, &ctx->lock); @@ -687,11 +709,11 @@ _return: int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF) { QW_ERR_RET(qwDropTask(QW_FPARAMS())); - QW_RET(TSDB_CODE_SUCCESS); + return TSDB_CODE_SUCCESS; } int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { - int32_t code = 0; + int32_t code = TSDB_CODE_SUCCESS; SQWTaskCtx *ctx = NULL; QW_ERR_JRET(qwRegisterQueryBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo)); @@ -706,7 +728,7 @@ int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT)); - qwSendQueryRsp(QW_FPARAMS(), qwMsg->msgType + 1, ctx, code, true); + QW_ERR_JRET(qwSendQueryRsp(QW_FPARAMS(), qwMsg->msgType + 1, ctx, code, true)); _return: @@ -715,7 +737,7 @@ _return: qwReleaseTaskCtx(mgmt, ctx); } - QW_RET(TSDB_CODE_SUCCESS); + return TSDB_CODE_SUCCESS; } int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { @@ -761,7 +783,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { atomic_store_ptr(&ctx->taskHandle, pTaskInfo); atomic_store_ptr(&ctx->sinkHandle, sinkHandle); - qwSaveTbVersionInfo(pTaskInfo, ctx); + QW_ERR_JRET(qwSaveTbVersionInfo(pTaskInfo, ctx)); if (!ctx->dynamicTask) { QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL)); @@ -778,7 +800,7 @@ _return: input.msgType = qwMsg->msgType; code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL); - qwQuickRspFetchReq(QW_FPARAMS(), ctx, qwMsg, code); + QW_ERR_RET(qwQuickRspFetchReq(QW_FPARAMS(), ctx, qwMsg, code)); QW_RET(TSDB_CODE_SUCCESS); } @@ -829,7 +851,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { qwMsg->connInfo = ctx->dataConnInfo; QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); - qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, rsp, dataLen, code); + QW_ERR_JRET(qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, rsp, dataLen, code)); rsp = NULL; QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, @@ -851,9 +873,14 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { rsp = NULL; qwMsg->connInfo = ctx->dataConnInfo; - qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, NULL, 0, code); - QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), - 0); + code = qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, NULL, 0, code); + if (TSDB_CODE_SUCCESS != code) { + QW_TASK_ELOG("fetch rsp send fail, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), + 0); + } else { + QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), + 0); + } } QW_LOCK(QW_WRITE, &ctx->lock); @@ -869,7 +896,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { } while (true); input.code = code; - qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, NULL); + QW_ERR_RET(qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, NULL)); QW_RET(TSDB_CODE_SUCCESS); } @@ -922,7 +949,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { } else if (QW_QUERY_RUNNING(ctx)) { atomic_store_8((int8_t *)&ctx->queryContinue, 1); } else if (0 == atomic_load_8((int8_t *)&ctx->queryInQueue)) { - qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC, ctx->dynamicTask); + QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC, ctx->dynamicTask)); atomic_store_8((int8_t *)&ctx->queryInQueue, 1); QW_ERR_JRET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), &qwMsg->connInfo)); @@ -952,9 +979,14 @@ _return: } if (!rsped) { - qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code); - QW_TASK_DLOG("fetch rsp send, msgType:%s, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1), - qwMsg->connInfo.handle, code, tstrerror(code), dataLen); + code = qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code); + if (TSDB_CODE_SUCCESS != code) { + QW_TASK_ELOG("fetch rsp send fail, msgType:%s, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1), + qwMsg->connInfo.handle, code, tstrerror(code), dataLen); + } else { + QW_TASK_DLOG("fetch rsp send, msgType:%s, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1), + qwMsg->connInfo.handle, code, tstrerror(code), dataLen); + } } else { qwFreeFetchRsp(rsp); rsp = NULL; @@ -985,7 +1017,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { if (QW_QUERY_RUNNING(ctx)) { QW_ERR_JRET(qwKillTaskHandle(ctx, TSDB_CODE_TSC_QUERY_CANCELLED)); - qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP, ctx->dynamicTask); + QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP, ctx->dynamicTask)); } else { QW_ERR_JRET(qwDropTask(QW_FPARAMS())); dropped = true; @@ -1001,7 +1033,7 @@ _return: if (code) { if (ctx) { QW_UPDATE_RSP_CODE(ctx, code); - qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask); + (void)qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask); // task already failed, no more error handling } else { tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER); } @@ -1035,7 +1067,7 @@ int32_t qwProcessNotify(QW_FPARAMS_DEF, SQWMsg *qwMsg) { if (QW_QUERY_RUNNING(ctx)) { QW_ERR_JRET(qwKillTaskHandle(ctx, TSDB_CODE_TSC_QUERY_CANCELLED)); - qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask); + QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask)); } switch (qwMsg->msgType) { @@ -1055,7 +1087,7 @@ _return: if (code) { if (ctx) { QW_UPDATE_RSP_CODE(ctx, code); - qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask); + (void)qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask); // task already failed, no more error handling } } @@ -1104,7 +1136,7 @@ int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { _return: memcpy(&rsp.epId, &req->epId, sizeof(req->epId)); - qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code); + code = qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code); if (code) { tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER); @@ -1125,7 +1157,7 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { int64_t refId = hbParam->refId; SQWorker *mgmt = qwAcquire(refId); if (NULL == mgmt) { - QW_DLOG("qwAcquire %" PRIx64 "failed", refId); + QW_DLOG("qwAcquire %" PRIx64 "failed, code:0x%x", refId, terrno); return; } @@ -1137,7 +1169,7 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { qwDbgDumpMgmtInfo(mgmt); if (gQWDebug.forceStop) { - (void)qwStopAllTasks(mgmt); + qwStopAllTasks(mgmt); } QW_LOCK(QW_READ, &mgmt->schLock); @@ -1145,8 +1177,8 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { int32_t schNum = taosHashGetSize(mgmt->schHash); if (schNum <= 0) { QW_UNLOCK(QW_READ, &mgmt->schLock); - taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); - qwRelease(refId); + (void)taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); // ignore error + (void)qwRelease(refId); // ignore error return; } @@ -1156,9 +1188,9 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { QW_UNLOCK(QW_READ, &mgmt->schLock); taosMemoryFree(rspList); taosArrayDestroy(pExpiredSch); - QW_ELOG("calloc %d SQWHbInfo failed", schNum); - taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); - qwRelease(refId); + QW_ELOG("calloc %d SQWHbInfo failed, code:%x", schNum, terrno); + (void)taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); // ignore error + (void)qwRelease(refId); // ignore error return; } @@ -1174,7 +1206,11 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { if (sch1->hbBrokenTs > 0 && ((currentMs - sch1->hbBrokenTs) > QW_SCH_TIMEOUT_MSEC) && taosHashGetSize(sch1->tasksHash) <= 0) { - taosArrayPush(pExpiredSch, sId); + if (NULL == taosArrayPush(pExpiredSch, sId)) { + QW_ELOG("add sId 0x%" PRIx64 " to expiredSch failed, code:%x", *sId, terrno); + taosHashCancelIterate(mgmt->schHash, pIter); + break; + } } pIter = taosHashIterate(mgmt->schHash, pIter); @@ -1196,7 +1232,7 @@ _return: QW_UNLOCK(QW_READ, &mgmt->schLock); for (int32_t j = 0; j < i; ++j) { - qwBuildAndSendHbRsp(&rspList[j].connInfo, &rspList[j].rsp, code); + (void)qwBuildAndSendHbRsp(&rspList[j].connInfo, &rspList[j].rsp, code); // ignore error /*QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code),*/ /*(rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0));*/ tFreeSSchedulerHbRsp(&rspList[j].rsp); @@ -1209,8 +1245,8 @@ _return: taosMemoryFreeClear(rspList); taosArrayDestroy(pExpiredSch); - taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); - qwRelease(refId); + (void)taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); // ignore error + (void)qwRelease(refId); // ignore error } int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) { @@ -1333,7 +1369,7 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const S _return: if (mgmt->refId >= 0) { - qwRelease(mgmt->refId); + qwRelease(mgmt->refId); // ignore error } else { taosHashCleanup(mgmt->schHash); taosHashCleanup(mgmt->ctxHash); @@ -1353,7 +1389,7 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) { atomic_store_8(&mgmt->nodeStopped, 1); - (void)qwStopAllTasks(mgmt); + qwStopAllTasks(mgmt); } void qWorkerDestroy(void **qWorkerMgmt) { @@ -1383,7 +1419,7 @@ int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pSt SQWorker *mgmt = (SQWorker *)qWorkerMgmt; SDataSinkStat sinkStat = {0}; - dsDataSinkGetCacheSize(&sinkStat); + QW_ERR_RET(dsDataSinkGetCacheSize(&sinkStat)); pStat->cacheDataSize = sinkStat.cachedSize; pStat->queryProcessed = QW_STAT_GET(mgmt->stat.msgStat.queryProcessed); @@ -1427,6 +1463,10 @@ int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64 ctx->explainRes = explainRes; rHandle.pMsgCb = taosMemoryCalloc(1, sizeof(SMsgCb)); + if (NULL == rHandle.pMsgCb) { + QW_ERR_JRET(terrno); + } + rHandle.pMsgCb->clientRpc = qwMsg->connInfo.handle; code = qCreateExecTask(&rHandle, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, 0, NULL, OPTR_EXEC_MODEL_BATCH); diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 226a06be7e..620f5f1248 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -308,7 +308,9 @@ int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) { void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo) { ASSERT(pInfo->inCheckProcess == 0); - pInfo->pList = taosArrayDestroy(pInfo->pList); + taosArrayDestroy(pInfo->pList); + pInfo->pList = NULL; + if (pInfo->checkRspTmr != NULL) { /*bool ret = */ taosTmrStop(pInfo->checkRspTmr); pInfo->checkRspTmr = NULL; diff --git a/source/libs/stream/src/streamMsg.c b/source/libs/stream/src/streamMsg.c index 40582b5144..1cc48f02b6 100644 --- a/source/libs/stream/src/streamMsg.c +++ b/source/libs/stream/src/streamMsg.c @@ -438,11 +438,13 @@ void tCleanupStreamHbMsg(SStreamHbMsg* pMsg) { } if (pMsg->pUpdateNodes != NULL) { - pMsg->pUpdateNodes = taosArrayDestroy(pMsg->pUpdateNodes); + taosArrayDestroy(pMsg->pUpdateNodes); + pMsg->pUpdateNodes = NULL; } if (pMsg->pTaskStatus != NULL) { - pMsg->pTaskStatus = taosArrayDestroy(pMsg->pTaskStatus); + taosArrayDestroy(pMsg->pTaskStatus); + pMsg->pTaskStatus = NULL; } pMsg->msgId = -1; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 9bcad87264..4cbe0cb136 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -280,10 +280,12 @@ void tFreeStreamTask(SStreamTask* pTask) { taosMemoryFree(pTask->outputInfo.pTokenBucket); taosThreadMutexDestroy(&pTask->lock); - pTask->msgInfo.pSendInfo = taosArrayDestroy(pTask->msgInfo.pSendInfo); + taosArrayDestroy(pTask->msgInfo.pSendInfo); + pTask->msgInfo.pSendInfo = NULL; taosThreadMutexDestroy(&pTask->msgInfo.lock); - pTask->outputInfo.pNodeEpsetUpdateList = taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList); + taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList); + pTask->outputInfo.pNodeEpsetUpdateList = NULL; if ((pTask->status.removeBackendFiles) && (pTask->pMeta != NULL)) { char* path = taosMemoryCalloc(1, strlen(pTask->pMeta->path) + 128); @@ -1055,9 +1057,12 @@ void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) { } taosThreadMutexDestroy(&pInfo->lock); - pInfo->pDispatchTriggerList = taosArrayDestroy(pInfo->pDispatchTriggerList); - pInfo->pReadyMsgList = taosArrayDestroy(pInfo->pReadyMsgList); - pInfo->pCheckpointReadyRecvList = taosArrayDestroy(pInfo->pCheckpointReadyRecvList); + taosArrayDestroy(pInfo->pDispatchTriggerList); + pInfo->pDispatchTriggerList = NULL; + taosArrayDestroy(pInfo->pReadyMsgList); + pInfo->pReadyMsgList = NULL; + taosArrayDestroy(pInfo->pCheckpointReadyRecvList); + pInfo->pCheckpointReadyRecvList = NULL; if (pInfo->pChkptTriggerTmr != NULL) { taosTmrStop(pInfo->pChkptTriggerTmr); diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index d9686d77f8..4c056178c2 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -34,14 +34,12 @@ SArray* taosArrayInit(size_t size, size_t elemSize) { SArray* pArray = taosMemoryMalloc(sizeof(SArray)); if (pArray == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } pArray->size = 0; pArray->pData = taosMemoryCalloc(size, elemSize); if (pArray->pData == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFree(pArray); return NULL; } @@ -387,13 +385,11 @@ void taosArrayClearP(SArray* pArray, void (*fp)(void*)) { taosArrayClear(pArray); } -void* taosArrayDestroy(SArray* pArray) { +void taosArrayDestroy(SArray* pArray) { if (pArray) { taosMemoryFree(pArray->pData); taosMemoryFree(pArray); } - - return NULL; } void taosArrayDestroyP(SArray* pArray, FDelete fp) { diff --git a/source/util/src/tconfig.c b/source/util/src/tconfig.c index adde3a3331..3ab0f83238 100644 --- a/source/util/src/tconfig.c +++ b/source/util/src/tconfig.c @@ -96,7 +96,8 @@ void cfgItemFreeVal(SConfigItem *pItem) { } if (pItem->array) { - pItem->array = taosArrayDestroy(pItem->array); + taosArrayDestroy(pItem->array); + pItem->array = NULL; } }