diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 38a2dd3ab2..af5b3523e8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -355,7 +355,7 @@ void tsdbCacherowsReaderClose(void* pReader) { return; } - if (p->pSchema != NULL) { + if (p->pSchema != NULL && p->transferBuf != NULL) { for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) { taosMemoryFreeClear(p->transferBuf[i]); } @@ -450,23 +450,27 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 return TSDB_CODE_INVALID_PARA; } + int32_t code = TSDB_CODE_SUCCESS; + bool hasRes = false; + SArray* pRow = NULL; + void** pRes = NULL; SCacheRowsReader* pr = pReader; + int32_t pkBufLen = 0; - int32_t code = TSDB_CODE_SUCCESS; - bool hasRes = false; - SArray* pRow = taosArrayInit(TARRAY_SIZE(pr->pCidList), sizeof(SLastCol)); + pr->pReadSnap = NULL; + pRow = taosArrayInit(TARRAY_SIZE(pr->pCidList), sizeof(SLastCol)); if (pRow == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _end; } - void** pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES); + pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES); if (pRes == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _end; } - int32_t pkBufLen = (pr->rowKey.numOfPKs > 0) ? pr->pkColumn.bytes : 0; + pkBufLen = (pr->rowKey.numOfPKs > 0) ? pr->pkColumn.bytes : 0; for (int32_t j = 0; j < pr->numOfCols; ++j) { int32_t bytes = (slotIds[j] == -1) ? 1 : pr->pSchema->columns[slotIds[j]].bytes; @@ -690,6 +694,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 _end: tsdbUntakeReadSnap2((STsdbReader*)pr, pr->pReadSnap, true); + pr->pReadSnap = NULL; + if (pr->pCurFileSet) { pr->pCurFileSet = NULL; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 60568b1380..09ca1cdc84 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -2434,21 +2434,25 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan SSttDataInfoForTable info = {.pKeyRangeList = taosArrayInit(4, sizeof(SSttKeyRange))}; if (info.pKeyRangeList == NULL) { + pReader->code = terrno; return false; } int32_t code = tMergeTreeOpen2(&pSttBlockReader->mergeTree, &conf, &info); if (code != TSDB_CODE_SUCCESS) { + pReader->code = code; return false; } code = initMemDataIterator(pScanInfo, pReader); if (code != TSDB_CODE_SUCCESS) { + pReader->code = code; return false; } code = initDelSkylineIterator(pScanInfo, pReader->info.order, &pReader->cost); if (code != TSDB_CODE_SUCCESS) { + pReader->code = code; return code; } @@ -2461,7 +2465,7 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan for (int32_t i = 0; i < taosArrayGetSize(info.pKeyRangeList); ++i) { SSttKeyRange* pKeyRange = taosArrayGet(info.pKeyRangeList, i); if (pKeyRange == NULL) { - return TSDB_CODE_INVALID_PARA; + continue; } if (pkCompEx(&pScanInfo->sttRange.skey, &pKeyRange->skey) > 0) { @@ -2766,6 +2770,10 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { SBlockData* pBlockData = &pReader->status.fileBlockData; (void) initSttBlockReader(pSttBlockReader, pBlockScanInfo, pReader); + if (pReader->code != 0) { + code = pReader->code; + goto _end; + } while (1) { bool hasBlockData = false; @@ -3180,6 +3188,10 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) { } bool hasDataInSttFile = initSttBlockReader(pSttBlockReader, pScanInfo, pReader); + if (pReader->code != TSDB_CODE_SUCCESS) { + return pReader->code; + } + if (!hasDataInSttFile) { bool hasNexTable = moveToNextTable(pUidList, pStatus); if (!hasNexTable) { @@ -3273,6 +3285,9 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { if (pScanInfo->sttKeyInfo.status == STT_FILE_READER_UNINIT) { (void) initSttBlockReader(pSttBlockReader, pScanInfo, pReader); + if (pReader->code != 0) { + return pReader->code; + } } TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader); @@ -3314,6 +3329,9 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { // let's load data from stt files, make sure clear the cleanStt block flag before load the data from stt files (void) initSttBlockReader(pSttBlockReader, pScanInfo, pReader); + if (pReader->code != 0) { + return pReader->code; + } // no data in stt block, no need to proceed. while (hasDataInSttBlock(pScanInfo)) { @@ -4795,6 +4813,7 @@ void tsdbReaderClose2(STsdbReader* pReader) { void* p = pReader->pReadSnap; if ((p == atomic_val_compare_exchange_ptr((void**)&pReader->pReadSnap, p, NULL)) && (p != NULL)) { tsdbUntakeReadSnap2(pReader, p, true); + pReader->pReadSnap = NULL; } (void) tsem_destroy(&pReader->resumeAfterSuspend); @@ -4877,6 +4896,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) { void* p = pReader->pReadSnap; if ((p == atomic_val_compare_exchange_ptr((void**)&pReader->pReadSnap, p, NULL)) && (p != NULL)) { tsdbUntakeReadSnap2(pReader, p, false); + pReader->pReadSnap = NULL; } if (pReader->bFilesetDelimited) { diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index ca79f3f285..46fc618f76 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -26,7 +26,7 @@ #define T_LONG_JMP(_obj, _c) \ do { \ - ASSERT((_c) != -1); \ + ASSERT(1); \ longjmp((_obj), (_c)); \ } while (0) diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index 78b18e24cc..198ff2e920 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -180,11 +180,11 @@ static bool nextGroupedResult(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SAggOperatorInfo* pAggInfo = pOperator->info; - if (pOperator->blocking && pAggInfo->hasValidBlock) return false; - - SExprSupp* pSup = &pOperator->exprSupp; - SOperatorInfo* downstream = pOperator->pDownstream[0]; + if (pOperator->blocking && pAggInfo->hasValidBlock) { + return false; + } + SExprSupp* pSup = &pOperator->exprSupp; int64_t st = taosGetTimestampUs(); int32_t order = pAggInfo->binfo.inputTsOrder; SSDataBlock* pBlock = pAggInfo->pNewGroupBlock; @@ -458,7 +458,7 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uin * not assign result buffer yet, add new result buffer * all group belong to one result set, and each group result has different group id so set the id to be one */ - if (pResultRow->pageId == -1) { + if (pResultRow == NULL || pResultRow->pageId == -1) { int32_t ret = addNewResultRowBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize); if (ret != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, terrno); diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index 8a7f1ad77d..cbb124b9e0 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -158,8 +158,9 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR if (isIntervalQuery) { if (p1 != NULL) { // the *p1 may be NULL in case of sliding+offset exists. pResult = getResultRowByPos(pResultBuf, p1, true); - if (NULL == pResult) { - T_LONG_JMP(pTaskInfo->env, terrno); + if (pResult == NULL) { + pTaskInfo->code = terrno; + return NULL; } ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset); @@ -171,7 +172,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR // todo pResult = getResultRowByPos(pResultBuf, p1, true); if (NULL == pResult) { - T_LONG_JMP(pTaskInfo->env, terrno); + pTaskInfo->code = terrno; + return NULL; } ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset); @@ -184,7 +186,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR SFilePage* pPage = getBufPage(pResultBuf, pos.pageId); if (pPage == NULL) { qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo)); - T_LONG_JMP(pTaskInfo->env, terrno); + pTaskInfo->code = terrno; + return NULL; } releaseBufPage(pResultBuf, pPage); } @@ -193,7 +196,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR if (pResult == NULL) { pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize); if (pResult == NULL) { - T_LONG_JMP(pTaskInfo->env, terrno); + pTaskInfo->code = terrno; + return NULL; } // add a new result set for a new group @@ -202,7 +206,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR sizeof(SResultRowPosition)); if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); - T_LONG_JMP(pTaskInfo->env, code); + pTaskInfo->code = code; + return NULL; } } @@ -212,7 +217,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR // too many time window in query if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH && tSimpleHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) { - T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW); + pTaskInfo->code = TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW; + return NULL; } return pResult; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 84a83b4709..60d065a241 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -615,8 +615,12 @@ _error: if (pInfo != NULL) { destroyGroupOperatorInfo(pInfo); } - destroyOperator(pOperator); - taosMemoryFreeClear(pOperator); + + if (pOperator) { + pOperator->info = NULL; + destroyOperator(pOperator); + } + return code; } @@ -1254,6 +1258,9 @@ int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, SResultRow* pResultRow = doSetResultOutBufByKey(pBuf, pResultRowInfo, (char*)pData, bytes, true, groupId, pTaskInfo, false, pAggSup, false); + if (pResultRow == NULL || pTaskInfo->code != 0) { + return pTaskInfo->code; + } return setResultRowInitCtx(pResultRow, pCtx, numOfCols, pOperator->exprSupp.rowEntryInfoOffset); } diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 3f86291d6a..d6fb57e335 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -703,6 +703,9 @@ int32_t setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, int64_t groupId = 0; SResultRow* pRow = doSetResultOutBufByKey(pSup->pResultBuf, pResultRowInfo, (char*)&tid, sizeof(tid), true, groupId, pTaskInfo, false, pSup, true); + if (pRow == NULL || pTaskInfo->code != 0) { + return pTaskInfo->code; + } for (int32_t i = 0; i < numOfExprs; ++i) { struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, rowEntryInfoOffset); diff --git a/source/libs/executor/src/querytask.c b/source/libs/executor/src/querytask.c index 5c4d3fe009..7100e10276 100644 --- a/source/libs/executor/src/querytask.c +++ b/source/libs/executor/src/querytask.c @@ -155,6 +155,11 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo schemaInfo.tablename = taosStrdup(mr.me.name); schemaInfo.dbname = taosStrdup(dbName); + if (schemaInfo.tablename == NULL || schemaInfo.dbname == NULL) { + pAPI->metaReaderFn.clearReader(&mr); + cleanupQueriedTableScanInfo(&schemaInfo); + return terrno; + } if (mr.me.type == TSDB_SUPER_TABLE) { schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); @@ -166,8 +171,7 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo code = pAPI->metaReaderFn.getEntryGetUidCache(&mr, suid); if (code != TSDB_CODE_SUCCESS) { pAPI->metaReaderFn.clearReader(&mr); - taosMemoryFree(schemaInfo.tablename); - taosMemoryFree(schemaInfo.dbname); + cleanupQueriedTableScanInfo(&schemaInfo); return code; } @@ -177,18 +181,26 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow); } + pAPI->metaReaderFn.clearReader(&mr); + if (schemaInfo.sw == NULL) { + cleanupQueriedTableScanInfo(&schemaInfo); return terrno; } - pAPI->metaReaderFn.clearReader(&mr); schemaInfo.qsw = extractQueriedColumnSchema(pScanNode); if (schemaInfo.qsw == NULL) { + cleanupQueriedTableScanInfo(&schemaInfo); return terrno; } void* p = taosArrayPush(pTaskInfo->schemaInfos, &schemaInfo); - return (p != NULL)? TSDB_CODE_SUCCESS:TSDB_CODE_OUT_OF_MEMORY; + if (p == NULL) { + cleanupQueriedTableScanInfo(&schemaInfo); + return terrno; + } + + return code; } SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) { diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 8fb0646495..394b4f7741 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -130,7 +130,7 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN pGroupIdCalc->lastKeysLen = 0; pGroupIdCalc->keyBuf = taosMemoryCalloc(1, keyLen); if (!pGroupIdCalc->keyBuf) { - code = TSDB_CODE_OUT_OF_MEMORY; + code = terrno; } } } @@ -370,8 +370,13 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) { tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator); SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); + if (ps == NULL) { + return terrno; + } + ps->param = pOperator->pDownstream[0]; ps->onlyRef = true; + code = tsortAddSource(pInfo->pSortHandle, ps); if (code) { taosMemoryFree(ps); @@ -464,6 +469,9 @@ void destroySortOperatorInfo(void* param) { int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { SSortExecInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo)); + if (pInfo == NULL) { + return terrno; + } SSortOperatorInfo* pOperatorInfo = (SSortOperatorInfo*)pOptr->info; @@ -638,6 +646,10 @@ int32_t beginSortGroup(SOperatorInfo* pOperator) { SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); SGroupSortSourceParam* param = taosMemoryCalloc(1, sizeof(SGroupSortSourceParam)); + if (ps == NULL || param == NULL) { + T_LONG_JMP(pTaskInfo->env, terrno); + } + param->childOpInfo = pOperator->pDownstream[0]; param->grpSortOpInfo = pInfo; ps->param = param; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 2383c09dac..9bbc81e4f6 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -75,9 +75,9 @@ static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindo SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE, masterscan, tableGroupId, pTaskInfo, true, pAggSup, true); - if (pResultRow == NULL) { + if (pResultRow == NULL || pTaskInfo->code != 0) { *pResult = NULL; - return TSDB_CODE_SUCCESS; + return pTaskInfo->code; } // set time window for current result diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 5e538c1e42..752101afbd 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -235,7 +235,7 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte void* newRet = NULL; int32_t code = streamQueueMergeQueueItem(*pInput, qItem, (SStreamQueueItem**)&newRet); if (newRet == NULL) { - if (code) { + if (code != -1) { stError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks, tstrerror(code)); }