From 79ad7585eff0bce9c4d0ca1f7c78358703734ccb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 29 Jul 2024 10:29:40 +0800 Subject: [PATCH] fix(stream): check return value. --- include/libs/executor/executor.h | 2 +- source/dnode/vnode/src/tq/tqScan.c | 40 +++++++++++++------- source/dnode/vnode/src/tsdb/tsdbRead2.c | 20 +++++----- source/libs/executor/inc/executorInt.h | 2 +- source/libs/executor/src/executor.c | 11 +++--- source/libs/executor/src/executorInt.c | 12 ++++-- source/libs/executor/src/mergejoinoperator.c | 10 +---- 7 files changed, 54 insertions(+), 43 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 324804aa7c..09143dde29 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -201,7 +201,7 @@ void qStreamSetOpen(qTaskInfo_t tinfo); void qStreamSetSourceExcluded(qTaskInfo_t tinfo, int8_t sourceExcluded); -void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset); +int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset); SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo); diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 5df8c97962..d072d7199c 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -115,14 +115,15 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* if (pDataBlock == NULL) { break; } + STqOffsetVal offset = {0}; - qStreamExtractOffset(task, &offset); + code = qStreamExtractOffset(task, &offset); + TSDB_CHECK_CODE(code, line, END); + pHandle->block = NULL; code = createOneDataBlock(pDataBlock, true, &pHandle->block); - if (code) { - return code; - } + TSDB_CHECK_CODE(code, line, END); pHandle->blockTime = offset.ts; tOffsetDestroy(&offset); @@ -140,8 +141,11 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* } else { code = copyDataBlock(pHandle->block, pDataBlock); TSDB_CHECK_CODE(code, line, END); + STqOffsetVal offset = {0}; - qStreamExtractOffset(task, &offset); + code = qStreamExtractOffset(task, &offset); + TSDB_CHECK_CODE(code, line, END); + pRsp->sleepTime = offset.ts - pHandle->blockTime; pHandle->blockTime = offset.ts; tOffsetDestroy(&offset); @@ -164,10 +168,11 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq task executed finished, total blocks:%d, totalRows:%d", pHandle->consumerId, vgId, pRsp->common.blockNum, totalRows); - qStreamExtractOffset(task, &pRsp->common.rspOffset); + code = qStreamExtractOffset(task, &pRsp->common.rspOffset); END: - if ( code!= 0){ - tqError("consumer:0x%" PRIx64 " vgId:%d tmq task executed error, line:%d code:%d", pHandle->consumerId, vgId, line, code); + if (code != 0) { + tqError("consumer:0x%" PRIx64 " vgId:%d tmq task executed error, line:%d code:%d", pHandle->consumerId, vgId, line, + code); } return code; } @@ -241,31 +246,40 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqBatc // get meta SMqBatchMetaRsp* tmp = qStreamExtractMetaMsg(task); if (taosArrayGetSize(tmp->batchMetaReq) > 0) { - qStreamExtractOffset(task, &tmp->rspOffset); + code = qStreamExtractOffset(task, &tmp->rspOffset); + if (code) { + return code; + } + *pBatchMetaRsp = *tmp; tqDebug("tmqsnap task get meta"); break; } if (pDataBlock == NULL) { - qStreamExtractOffset(task, pOffset); + code = qStreamExtractOffset(task, pOffset); + if (code) { + break; + } + if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { continue; } + tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode), pHandle->snapshotVer + 1); - qStreamExtractOffset(task, &pRsp->common.rspOffset); + code = qStreamExtractOffset(task, &pRsp->common.rspOffset); break; } if (pRsp->common.blockNum > 0) { tqDebug("tmqsnap task exec exited, get data"); - qStreamExtractOffset(task, &pRsp->common.rspOffset); + code = qStreamExtractOffset(task, &pRsp->common.rspOffset); break; } } - return 0; + return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index a3210dbfd9..4fedb68a78 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -48,7 +48,7 @@ typedef struct { static int32_t getCurrentBlockInfo(SDataBlockIter* pBlockIter, SFileDataBlockInfo** pInfo); static int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity, STsdbReader* pReader); -static int32_t getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader, TSDBROW** pRes); +static void getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader, TSDBROW** pRes); static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, SRowKey* pKey, STsdbReader* pReader); static int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, @@ -3866,11 +3866,11 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t return false; } -FORCE_INLINE int32_t getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader, TSDBROW** pRes) { +FORCE_INLINE void getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader, TSDBROW** pRes) { *pRes = NULL; if (!pIter->hasVal) { - return TSDB_CODE_SUCCESS; + return; } int32_t order = pReader->info.order; @@ -3880,20 +3880,20 @@ FORCE_INLINE int32_t getValidMemRow(SIterInfo* pIter, const SArray* pDelList, ST TSDBROW_INIT_KEY(pRow, key); if (outOfTimeWindow(key.ts, &pReader->info.window)) { pIter->hasVal = false; - return TSDB_CODE_SUCCESS; + return; } // it is a valid data version if (key.version <= pReader->info.verRange.maxVer && key.version >= pReader->info.verRange.minVer) { if (pDelList == NULL || TARRAY_SIZE(pDelList) == 0) { *pRes = pRow; - return TSDB_CODE_SUCCESS; + return; } else { bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange, pReader->suppInfo.numOfPks > 0); if (!dropped) { *pRes = pRow; - return TSDB_CODE_SUCCESS; + return; } } } @@ -3901,7 +3901,7 @@ FORCE_INLINE int32_t getValidMemRow(SIterInfo* pIter, const SArray* pDelList, ST while (1) { pIter->hasVal = tsdbTbDataIterNext(pIter->iter); if (!pIter->hasVal) { - return TSDB_CODE_SUCCESS; + return; } pRow = tsdbTbDataIterGet(pIter->iter); @@ -3909,19 +3909,19 @@ FORCE_INLINE int32_t getValidMemRow(SIterInfo* pIter, const SArray* pDelList, ST TSDBROW_INIT_KEY(pRow, key); if (outOfTimeWindow(key.ts, &pReader->info.window)) { pIter->hasVal = false; - return TSDB_CODE_SUCCESS; + return; } if (key.version <= pReader->info.verRange.maxVer && key.version >= pReader->info.verRange.minVer) { if (pDelList == NULL || TARRAY_SIZE(pDelList) == 0) { *pRes = pRow; - return TSDB_CODE_SUCCESS; + return; } else { bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange, pReader->suppInfo.numOfPks > 0); if (!dropped) { *pRes = pRow; - return TSDB_CODE_SUCCESS; + return; } } } diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 03dd0c4581..668d40dd0b 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -1002,7 +1002,7 @@ int32_t doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, S int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo, TSKEY* primaryKeys, int32_t prevPosition, int32_t order); -void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status); +int32_t extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status); #ifdef __cplusplus } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index dc910888ad..bd82d8114d 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -295,7 +295,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3 return NULL; } - createRawScanOperatorInfo(pReaderHandle, pTaskInfo, &pTaskInfo->pRoot); + code = createRawScanOperatorInfo(pReaderHandle, pTaskInfo, &pTaskInfo->pRoot); if (NULL == pTaskInfo->pRoot || code != 0) { taosMemoryFree(pTaskInfo); return NULL; @@ -1158,9 +1158,9 @@ SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) { return &pTaskInfo->streamInfo.btMetaRsp; } -void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) { +int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - tOffsetCopy(pOffset, &pTaskInfo->streamInfo.currentOffset); + return tOffsetCopy(pOffset, &pTaskInfo->streamInfo.currentOffset); } int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* sContext, SMetaTableInfo* pMtInfo) { @@ -1231,7 +1231,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT } if (subType == TOPIC_SUB_TYPE__COLUMN) { - extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id, &pOperator); + code = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id, &pOperator); if (pOperator == NULL || code != 0) { return code; } @@ -1431,8 +1431,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT } end: - tOffsetCopy(&pTaskInfo->streamInfo.currentOffset, pOffset); - + (void) tOffsetCopy(&pTaskInfo->streamInfo.currentOffset, pOffset); return 0; } diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index d56b288129..2f87cd7fc1 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -567,7 +567,8 @@ int32_t doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* p code = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status); QUERY_CHECK_CODE(code, lino, _err); - extractQualifiedTupleByFilterResult(pBlock, p, status); + code = extractQualifiedTupleByFilterResult(pBlock, p, status); + QUERY_CHECK_CODE(code, lino, _err); if (pColMatchInfo != NULL) { size_t size = taosArrayGetSize(pColMatchInfo->pList); @@ -591,18 +592,21 @@ _err: return code; } -void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status) { +int32_t extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status) { + int32_t code = 0; int8_t* pIndicator = (int8_t*)p->pData; if (status == FILTER_RESULT_ALL_QUALIFIED) { // here nothing needs to be done } else if (status == FILTER_RESULT_NONE_QUALIFIED) { - trimDataBlock(pBlock, pBlock->info.rows, NULL); + code = trimDataBlock(pBlock, pBlock->info.rows, NULL); pBlock->info.rows = 0; } else if (status == FILTER_RESULT_PARTIAL_QUALIFIED) { - trimDataBlock(pBlock, pBlock->info.rows, (bool*)pIndicator); + code = trimDataBlock(pBlock, pBlock->info.rows, (bool*)pIndicator); } else { qError("unknown filter result type: %d", status); } + + return code; } void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset) { diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 542f161a80..2c485cdd1b 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -308,12 +308,9 @@ int32_t mJoinFilterAndMarkHashRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo } } - extractQualifiedTupleByFilterResult(pBlock, p, status); - - code = TSDB_CODE_SUCCESS; + code = extractQualifiedTupleByFilterResult(pBlock, p, status); _err: - colDataDestroy(p); taosMemoryFree(p); @@ -375,12 +372,9 @@ int32_t mJoinFilterAndMarkRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SM } } - extractQualifiedTupleByFilterResult(pBlock, p, status); - - code = TSDB_CODE_SUCCESS; + code = extractQualifiedTupleByFilterResult(pBlock, p, status); _return: - colDataDestroy(p); taosMemoryFree(p);