diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index 61f1339c82..d9af279813 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -51,7 +51,7 @@ typedef struct SAggOperatorInfo { } SAggOperatorInfo; static void destroyAggOperatorInfo(void* param); -static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId); +static int32_t setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId); static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock); static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock); @@ -63,7 +63,7 @@ static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, in static int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size); -static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId); +static int32_t doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId); static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus); static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus); @@ -184,7 +184,8 @@ static bool nextGroupedResult(SOperatorInfo* pOperator) { if (pBlock) { pAggInfo->pNewGroupBlock = NULL; tSimpleHashClear(pAggInfo->aggSup.pResultRowHashTable); - setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId); + code = setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId); + QUERY_CHECK_CODE(code, lino, _end); code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true); QUERY_CHECK_CODE(code, lino, _end); @@ -225,12 +226,19 @@ static bool nextGroupedResult(SOperatorInfo* pOperator) { break; } // the pDataBlock are always the same one, no need to call this again - setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId); + code = setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId); + if (code != TSDB_CODE_SUCCESS) { + destroyDataBlockForEmptyInput(blockAllocated, &pBlock); + T_LONG_JMP(pTaskInfo->env, code); + } code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true); - QUERY_CHECK_CODE(code, lino, _end); + if (code != TSDB_CODE_SUCCESS) { + destroyDataBlockForEmptyInput(blockAllocated, &pBlock); + T_LONG_JMP(pTaskInfo->env, code); + } code = doAggregateImpl(pOperator, pSup->pCtx); - if (code != 0) { + if (code != TSDB_CODE_SUCCESS) { destroyDataBlockForEmptyInput(blockAllocated, &pBlock); T_LONG_JMP(pTaskInfo->env, code); } @@ -427,20 +435,24 @@ void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock) { *ppBlock = NULL; } -void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) { +int32_t setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) { + int32_t code = TSDB_CODE_SUCCESS; SAggOperatorInfo* pAggInfo = pOperator->info; if (pAggInfo->groupId != UINT64_MAX && pAggInfo->groupId == groupId) { - return; + return code; } - doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId); + code = doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId); // record the current active group id pAggInfo->groupId = groupId; + return code; } -void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) { +int32_t doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) { // for simple group by query without interval, all the tables belong to one group result. + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SAggOperatorInfo* pAggInfo = pOperator->info; @@ -452,23 +464,27 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uin doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId, sizeof(groupId), true, groupId, pTaskInfo, false, &pAggInfo->aggSup, true); if (pResultRow == NULL || pTaskInfo->code != 0) { - T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); + code = pTaskInfo->code; + lino = __LINE__; + goto _end; } /* * not assign result buffer yet, add new result buffer * all group belong to one result set, and each group result has different group id so set the id to be one */ if (pResultRow->pageId == -1) { - int32_t ret = addNewResultRowBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize); - if (ret != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, terrno); - } + code = addNewResultRowBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize); + QUERY_CHECK_CODE(code, lino, _end); } - int32_t ret = setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset); - if (ret != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, ret); + code = setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset); + QUERY_CHECK_CODE(code, lino, _end); + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } + return code; } // a new buffer page for each table. Needs to opt this design diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index f6f3570804..d43f37ca9e 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -464,7 +464,10 @@ _error: void destroyExchangeOperatorInfo(void* param) { SExchangeInfo* pExInfo = (SExchangeInfo*)param; - (void)taosRemoveRef(exchangeObjRefPool, pExInfo->self); + int32_t code = taosRemoveRef(exchangeObjRefPool, pExInfo->self); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + } } void freeBlock(void* pParam) { @@ -505,7 +508,10 @@ void doDestroyExchangeOperatorInfo(void* param) { blockDataDestroy(pExInfo->pDummyBlock); tSimpleHashCleanup(pExInfo->pHashSources); - (void)tsem_destroy(&pExInfo->ready); + int32_t code = tsem_destroy(&pExInfo->ready); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + } taosMemoryFreeClear(pExInfo->pTaskId); taosMemoryFreeClear(param); @@ -561,9 +567,13 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { if (code != TSDB_CODE_SUCCESS) { code = TAOS_SYSTEM_ERROR(code); qError("failed to invoke post when fetch rsp is ready, code:%s, %p", tstrerror(code), pExchangeInfo); + return code; } - (void)taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId); + code = taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + } return code; } @@ -1190,7 +1200,14 @@ static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeIn return pTask->code; } } - (void)tsem_wait(&pExchangeInfo->ready); + + code = tsem_wait(&pExchangeInfo->ready); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + pTask->code = code; + return pTask->code; + } + if (pTask->pWorkerCb) { code = pTask->pWorkerCb->afterRecoverFromBlocking(pTask->pWorkerCb->pPool); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index c8512636d9..62932513f4 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -904,8 +904,14 @@ void qStopTaskOperators(SExecTaskInfo* pTaskInfo) { } SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pStop->refId); if (pExchangeInfo) { - (void)tsem_post(&pExchangeInfo->ready); - (void)taosReleaseRef(exchangeObjRefPool, pStop->refId); + int32_t code = tsem_post(&pExchangeInfo->ready); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + } + code = taosReleaseRef(exchangeObjRefPool, pStop->refId); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + } } } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 3cc1d41924..5f137e46f1 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -671,7 +671,8 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int STableCachedVal* pVal = taosLRUCacheValue(pCache->pTableMetaEntryCache, h); val = *pVal; - (void)taosLRUCacheRelease(pCache->pTableMetaEntryCache, h, false); + bool bRes = taosLRUCacheRelease(pCache->pTableMetaEntryCache, h, false); + qTrace("release LRU cache, res %d", bRes); } qDebug("retrieve table meta from cache:%" PRIu64 ", hit:%" PRIu64 " miss:%" PRIu64 ", %s", pCache->metaFetch, @@ -893,7 +894,10 @@ void markGroupProcessed(STableScanInfo* pInfo, uint64_t groupId) { if (pInfo->base.pTableListInfo->groupOffset) { pInfo->countState = TABLE_COUNT_STATE_PROCESSED; } else { - (void)taosHashRemove(pInfo->base.pTableListInfo->remainGroups, &groupId, sizeof(groupId)); + int32_t code = taosHashRemove(pInfo->base.pTableListInfo->remainGroups, &groupId, sizeof(groupId)); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + } } } @@ -4529,6 +4533,7 @@ static int32_t tagScanFillResultBlock(SOperatorInfo* pOperator, SSDataBlock* pRe SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId); QUERY_CHECK_NULL(pDst, code, lino, _end, terrno); code = tagScanFillOneCellWithTag(pOperator, pUidTagInfo, &pExprInfo[j], pDst, i, pAPI, pInfo->readHandle.vnode); + QUERY_CHECK_CODE(code, lino, _end); } } } else { diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 85a8b035da..42fc5b7f7d 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -2177,7 +2177,12 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca T_LONG_JMP(pTaskInfo->env, code); } - (void)tsem_wait(&pInfo->ready); + code = tsem_wait(&pInfo->ready); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); + } if (pTaskInfo->code) { qError("%s load meta data from mnode failed, totalRows:%" PRIu64 ", code:%s", GET_TASKID(pTaskInfo), @@ -2327,7 +2332,10 @@ void extractTbnameSlotId(SSysTableScanInfo* pInfo, const SScanPhysiNode* pScanNo void destroySysScanOperator(void* param) { SSysTableScanInfo* pInfo = (SSysTableScanInfo*)param; - (void)tsem_destroy(&pInfo->ready); + int32_t code = tsem_destroy(&pInfo->ready); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + } blockDataDestroy(pInfo->pRes); if (pInfo->name.type == TSDB_TABLE_NAME_T) { @@ -2383,7 +2391,10 @@ int32_t loadSysTableCallback(void* param, SDataBuf* pMsg, int32_t code) { } } - (void)tsem_post(&pScanResInfo->ready); + int32_t res = tsem_post(&pScanResInfo->ready); + if (res != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(res)); + } return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index fc91877b66..1999694057 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1231,7 +1231,7 @@ void destroyIntervalOperatorInfo(void* param) { cleanupAggSup(&pInfo->aggSup); cleanupExprSupp(&pInfo->scalarSupp); - (void)tdListFree(pInfo->binfo.resultRowInfo.openWindow); + pInfo->binfo.resultRowInfo.openWindow = tdListFree(pInfo->binfo.resultRowInfo.openWindow); taosArrayDestroy(pInfo->pInterpCols); pInfo->pInterpCols = NULL; @@ -2132,7 +2132,7 @@ typedef struct SGroupTimeWindow { void destroyMergeIntervalOperatorInfo(void* param) { SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param; - (void)tdListFree(miaInfo->groupIntervals); + miaInfo->groupIntervals = tdListFree(miaInfo->groupIntervals); destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo); taosMemoryFreeClear(param); @@ -2162,7 +2162,8 @@ static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t t STimeWindow* prevWin = &prevGrpWin->window; if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) { - (void)tdListPopNode(miaInfo->groupIntervals, listNode); + SListNode* tmp = tdListPopNode(miaInfo->groupIntervals, listNode); + taosMemoryFreeClear(tmp); } }