From 94f596c4ced8484f1b254669dec28b19a29c4a8b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 16 Jul 2024 18:52:01 +0800 Subject: [PATCH] refactor: remove the invalid return value. --- include/common/tmsgcb.h | 4 +- include/libs/executor/storageapi.h | 6 +- include/libs/sync/sync.h | 3 +- source/dnode/vnode/inc/vnode.h | 2 +- source/dnode/vnode/src/inc/vnodeInt.h | 2 - source/dnode/vnode/src/tsdb/tsdbCache.c | 5 +- source/dnode/vnode/src/tsdb/tsdbRead2.c | 466 ++++++++++++++------- source/dnode/vnode/src/tsdb/tsdbReadUtil.c | 7 +- source/dnode/vnode/src/tsdb/tsdbReadUtil.h | 2 +- source/libs/executor/src/scanoperator.c | 19 +- source/libs/sync/inc/syncInt.h | 14 +- source/libs/transport/inc/transComm.h | 2 +- 12 files changed, 344 insertions(+), 188 deletions(-) diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index 03bf8da707..0c61aa5a51 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -17,13 +17,13 @@ #define _TD_COMMON_MSG_CB_H_ #include "os.h" +#include "tmsg.h" #ifdef __cplusplus extern "C" { #endif typedef struct SRpcMsg SRpcMsg; -typedef struct SEpSet SEpSet; typedef struct SMgmtWrapper SMgmtWrapper; typedef struct SRpcHandleInfo SRpcHandleInfo; @@ -46,7 +46,7 @@ typedef int32_t (*PutToQueueFp)(void* pMgmt, EQueueType qtype, SRpcMsg* pMsg); typedef int32_t (*GetQueueSizeFp)(void* pMgmt, int32_t vgId, EQueueType qtype); typedef int32_t (*SendReqFp)(const SEpSet* pEpSet, SRpcMsg* pMsg); typedef void (*SendRspFp)(SRpcMsg* pMsg); -typedef void (*RegisterBrokenLinkArgFp)(SRpcMsg* pMsg); +typedef void (*RegisterBrokenLinkArgFp)(struct SRpcMsg* pMsg); typedef void (*ReleaseHandleFp)(SRpcHandleInfo* pHandle, int8_t type); typedef void (*ReportStartup)(const char* name, const char* desc); diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 45f9f73fb1..edb6ec9cbf 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -105,7 +105,7 @@ typedef struct SMTbCursor { } SMTbCursor; typedef struct SMCtbCursor { - SMeta* pMeta; + struct SMeta* pMeta; void* pCur; tb_uid_t suid; void* pKey; @@ -134,7 +134,7 @@ typedef struct SMetaTableInfo { } SMetaTableInfo; typedef struct SSnapContext { - SMeta* pMeta; + struct SMeta* pMeta; int64_t snapVersion; void* pCur; int64_t suid; @@ -178,7 +178,7 @@ typedef struct TsdReader { int32_t (*tsdNextDataBlock)(); int32_t (*tsdReaderRetrieveBlockSMAInfo)(); - SSDataBlock *(*tsdReaderRetrieveDataBlock)(); + int32_t (*tsdReaderRetrieveDataBlock)(); void (*tsdReaderReleaseDataBlock)(); diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 0be03b82f3..1fb077e3ca 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -70,7 +70,6 @@ typedef int64_t SyncIndex; typedef int64_t SyncTerm; typedef struct SSyncNode SSyncNode; -typedef struct SWal SWal; typedef struct SSyncRaftEntry SSyncRaftEntry; typedef enum { @@ -238,7 +237,7 @@ typedef struct SSyncInfo { int32_t batchSize; SSyncCfg syncCfg; char path[TSDB_FILENAME_LEN]; - SWal* pWal; + struct SWal* pWal; SSyncFSM* pFsm; SMsgCb* msgcb; int32_t pingMs; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index e35c152e9b..990d03f940 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -167,7 +167,7 @@ void tsdbReaderClose2(STsdbReader *pReader); int32_t tsdbNextDataBlock2(STsdbReader *pReader, bool *hasNext); int32_t tsdbRetrieveDatablockSMA2(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave, bool *hasNullSMA); void tsdbReleaseDataBlock2(STsdbReader *pReader); -SSDataBlock *tsdbRetrieveDataBlock2(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList); +int32_t tsdbRetrieveDataBlock2(STsdbReader *pReader, SSDataBlock **pBlock, SArray *pIdList); int32_t tsdbReaderReset2(STsdbReader *pReader, SQueryTableDataCond *pCond); int32_t tsdbGetFileBlocksDistInfo2(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo); int64_t tsdbGetNumOfRowsInMemTable2(STsdbReader *pHandle); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 4a47e08730..70aa55915a 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -54,7 +54,6 @@ extern "C" { #endif typedef struct SVnodeInfo SVnodeInfo; -typedef struct SMeta SMeta; typedef struct SSma SSma; typedef struct STsdb STsdb; typedef struct STQ STQ; @@ -153,7 +152,6 @@ int32_t vnodeBufPoolRegisterQuery(SVBufPool* pPool, SQueryNode* pQNode); void vnodeBufPoolDeregisterQuery(SVBufPool* pPool, SQueryNode* pQNode, bool proactive); // meta -typedef struct SMCtbCursor SMCtbCursor; typedef struct SMStbCursor SMStbCursor; typedef struct STbUidStore STbUidStore; diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index ccdf4a14d6..ed09ca821b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -3212,7 +3212,10 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs pIter->pMemDelData = NULL; - loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer); + code = loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer); + if (code != TSDB_CODE_SUCCESS) { + goto _err; + } pIter->idx = (SBlockIdx){.suid = suid, .uid = uid}; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index d1702a94fe..7d1e65ba81 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -398,75 +398,85 @@ static void initReaderStatus(SReaderStatus* pStatus) { pStatus->loadFromFile = true; } -static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) { - SSDataBlock* pResBlock = createDataBlock(); - if (pResBlock == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; +static int32_t createResBlock(SQueryTableDataCond* pCond, int32_t capacity, SSDataBlock** pResBlock) { + *pResBlock = createDataBlock(); + if (*pResBlock == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; } for (int32_t i = 0; i < pCond->numOfCols; ++i) { SColumnInfoData colInfo = {0}; colInfo.info = pCond->colList[i]; - blockDataAppendColInfo(pResBlock, &colInfo); + int32_t code = blockDataAppendColInfo(*pResBlock, &colInfo); + if (code != TSDB_CODE_SUCCESS) { + taosMemoryFree(*pResBlock); + *pResBlock = NULL; + return code; + } } - int32_t code = blockDataEnsureCapacity(pResBlock, capacity); + int32_t code = blockDataEnsureCapacity(*pResBlock, capacity); if (code != TSDB_CODE_SUCCESS) { - terrno = code; - taosMemoryFree(pResBlock); - return NULL; + taosMemoryFree(*pResBlock); + *pResBlock = NULL; } - return pResBlock; + + return code; } static int32_t tsdbInitReaderLock(STsdbReader* pReader) { int32_t code = taosThreadMutexInit(&pReader->readerMutex, NULL); - qTrace("tsdb/read: %p, post-init read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); + tsdbTrace("tsdb/read: %p, post-init read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); return code; } static int32_t tsdbUninitReaderLock(STsdbReader* pReader) { - int32_t code = -1; - qTrace("tsdb/read: %p, pre-uninit read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); + int32_t code = TSDB_CODE_SUCCESS; + tsdbTrace("tsdb/read: %p, pre-uninit read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); code = taosThreadMutexDestroy(&pReader->readerMutex); - qTrace("tsdb/read: %p, post-uninit read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); + tsdbTrace("tsdb/read: %p, post-uninit read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); return code; } static int32_t tsdbAcquireReader(STsdbReader* pReader) { int32_t code = -1; - qTrace("tsdb/read: %p, pre-take read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); + tsdbTrace("tsdb/read: %p, pre-take read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); code = taosThreadMutexLock(&pReader->readerMutex); - qTrace("tsdb/read: %p, post-take read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); + tsdbTrace("tsdb/read: %p, post-take read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); return code; } static int32_t tsdbTryAcquireReader(STsdbReader* pReader) { int32_t code = taosThreadMutexTryLock(&pReader->readerMutex); - qTrace("tsdb/read: %p, post-trytake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); - + if (code != TSDB_CODE_SUCCESS) { + tsdbError("tsdb/read: %p, post-trytake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); + } else { + tsdbTrace("tsdb/read: %p, post-trytask read mutex: %p", pReader, &pReader->readerMutex); + } return code; } static int32_t tsdbReleaseReader(STsdbReader* pReader) { int32_t code = taosThreadMutexUnlock(&pReader->readerMutex); - qTrace("tsdb/read: %p, post-untake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); - + if (code != TSDB_CODE_SUCCESS) { + tsdbError("tsdb/read: %p post-untake read mutex:%p failed, code:%d", pReader, &pReader->readerMutex, code); + } else { + tsdbTrace("tsdb/read: %p, post-untake read mutex: %p", pReader, &pReader->readerMutex); + } return code; } void tsdbReleaseDataBlock2(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; if (!pStatus->composedDataBlock) { - tsdbReleaseReader(pReader); + (void) tsdbReleaseReader(pReader); } } @@ -474,11 +484,16 @@ static int32_t initResBlockInfo(SResultBlockInfo* pResBlockInfo, int64_t capacit SQueryTableDataCond* pCond, SBlockLoadSuppInfo* pSup) { pResBlockInfo->capacity = capacity; pResBlockInfo->pResBlock = pResBlock; - terrno = 0; + int32_t code = 0; if (pResBlockInfo->pResBlock == NULL) { pResBlockInfo->freeBlock = true; - pResBlockInfo->pResBlock = createResBlock(pCond, pResBlockInfo->capacity); + pResBlockInfo->pResBlock = NULL; + + code = createResBlock(pCond, pResBlockInfo->capacity, &pResBlockInfo->pResBlock); + if (code != TSDB_CODE_SUCCESS) { + return code; + } if (pSup->numOfPks > 0) { SSDataBlock* p = pResBlockInfo->pResBlock; @@ -505,7 +520,7 @@ static int32_t initResBlockInfo(SResultBlockInfo* pResBlockInfo, int64_t capacit pResBlockInfo->freeBlock = false; } - return terrno; + return code; } static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void** ppReader, int32_t capacity, @@ -576,8 +591,15 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void goto _end; } - tsdbInitReaderLock(pReader); - tsem_init(&pReader->resumeAfterSuspend, 0, 0); + code = tsdbInitReaderLock(pReader); + if (code != TSDB_CODE_SUCCESS) { + goto _end; + } + + code = tsem_init(&pReader->resumeAfterSuspend, 0, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _end; + } *ppReader = pReader; return code; @@ -648,7 +670,11 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFileReader* pFileRead break; } - taosArrayPush(pIndexList, pBrinBlk); + void* p1 = taosArrayPush(pIndexList, pBrinBlk); + if (p1 == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + i += 1; } @@ -788,13 +814,17 @@ static int32_t loadFileBlockBrinInfo(STsdbReader* pReader, SArray* pIndexList, S pBlockNum->numOfBlocks += 1; if (taosArrayGetSize(pTableScanInfoList) == 0) { - taosArrayPush(pTableScanInfoList, &pScanInfo); + p1 = taosArrayPush(pTableScanInfoList, &pScanInfo); } else { STableBlockScanInfo** p = taosArrayGetLast(pTableScanInfoList); if ((*p)->uid != uid) { - taosArrayPush(pTableScanInfoList, &pScanInfo); + p1 = taosArrayPush(pTableScanInfoList, &pScanInfo); } } + + if (p1 == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } } clearBrinBlockIter(&iter); @@ -838,6 +868,8 @@ static void updateLastKeyInfo(SRowKey* pKey, SFileDataBlockInfo* pBlockInfo, SDa static int32_t doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal, SBlockLoadSuppInfo* pSup) { + int32_t code = 0; + if (IS_VAR_DATA_TYPE(pColVal->value.type)) { if (!COL_VAL_IS_VALUE(pColVal)) { colDataSetNULL(pColInfoData, rowIndex); @@ -853,13 +885,13 @@ static int32_t doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int (void)memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData); } - colDataSetVal(pColInfoData, rowIndex, pSup->buildBuf[colIndex], false); + code = colDataSetVal(pColInfoData, rowIndex, pSup->buildBuf[colIndex], false); } } else { - colDataSetVal(pColInfoData, rowIndex, (const char*)&pColVal->value.val, !COL_VAL_IS_VALUE(pColVal)); + code = colDataSetVal(pColInfoData, rowIndex, (const char*)&pColVal->value.val, !COL_VAL_IS_VALUE(pColVal)); } - return TSDB_CODE_SUCCESS; + return code; } static int32_t getCurrentBlockInfo(SDataBlockIter* pBlockIter, SFileDataBlockInfo** pInfo) { @@ -1420,8 +1452,7 @@ static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIte int32_t step) { int32_t code = TSDB_CODE_SUCCESS; if (index < 0 || index >= pBlockIter->numOfBlocks) { - code = -1; - return code; + return TSDB_CODE_FAILED; } SFileDataBlockInfo fblock = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index); @@ -1813,7 +1844,10 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* if (ps == NULL) { return terrno; } - tsdbRowMergerInit(pMerger, ps); + int32_t code = tsdbRowMergerInit(pMerger, ps); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } SRowKey minKey = k; @@ -1846,7 +1880,10 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* return code; } - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pfKey, pReader); + code = doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pfKey, pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } if (pkCompEx(&minKey, pSttKey) == 0) { @@ -1855,7 +1892,10 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* if (code != TSDB_CODE_SUCCESS) { return code; } - doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr); + code = doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } if (pkCompEx(&minKey, &k) == 0) { @@ -1897,7 +1937,10 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* if (ps == NULL) { return terrno; } - tsdbRowMergerInit(pMerger, ps); + code = tsdbRowMergerInit(pMerger, ps); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } bool dataInDataFile = hasDataInFileBlock(pBlockData, pDumpInfo); @@ -1944,7 +1987,10 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* return code; } - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pKey, pReader); + code = doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pKey, pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); code = tsdbRowMergerAdd(pMerger, pRow1, NULL); @@ -1953,7 +1999,10 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* } // pSttKey will be changed when sttBlockReader iterates to the next row, so use pKey instead. - doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr); + code = doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr); + if (code != TSDB_CODE_SUCCESS) { + return code; + } code = tsdbRowMergerGetRow(pMerger, &pTSRow); if (code != TSDB_CODE_SUCCESS) { @@ -2019,7 +2068,11 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* if (ps == NULL) { return terrno; } - tsdbRowMergerInit(pMerger, ps); + + code = tsdbRowMergerInit(pMerger, ps); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } SRowKey minKey = k; @@ -2059,7 +2112,10 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* return code; } - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pfKey, pReader); + code = doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pfKey, pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } if (pkCompEx(&minKey, pSttKey) == 0) { @@ -2069,7 +2125,10 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* return code; } - doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr); + code = doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } if (pkCompEx(&minKey, &ik) == 0) { @@ -2160,10 +2219,9 @@ static void doForwardDataIter(SRowKey* pKey, SIterInfo* pIter, STableBlockScanIn } // handle the open interval issue. Find the first row key that is greater than the given one. -static int32_t forwardDataIter(SRowKey* pKey, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) { +static void forwardDataIter(SRowKey* pKey, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) { doForwardDataIter(pKey, &pBlockScanInfo->iter, pBlockScanInfo, pReader); doForwardDataIter(pKey, &pBlockScanInfo->iiter, pBlockScanInfo, pReader); - return TSDB_CODE_SUCCESS; } static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) { @@ -2196,7 +2254,10 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea return code; } - loadMemTombData(&pBlockScanInfo->pMemDelData, d, di, pReader->info.verRange.maxVer); + code = loadMemTombData(&pBlockScanInfo->pMemDelData, d, di, pReader->info.verRange.maxVer); + if (code != TSDB_CODE_SUCCESS) { + return code; + } if (forward) { forwardDataIter(&startKey.key, pBlockScanInfo, pReader); @@ -2309,8 +2370,15 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan return false; } - initMemDataIterator(pScanInfo, pReader); - initDelSkylineIterator(pScanInfo, pReader->info.order, &pReader->cost); + code = initMemDataIterator(pScanInfo, pReader); + if (code != TSDB_CODE_SUCCESS) { + return false; + } + + code = initDelSkylineIterator(pScanInfo, pReader->info.order, &pReader->cost); + if (code != TSDB_CODE_SUCCESS) { + return code; + } if (conf.rspRows) { pScanInfo->cleanSttBlocks = isCleanSttBlock(info.pKeyRangeList, &pReader->info.window, pScanInfo, order); @@ -2383,7 +2451,11 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc if (ps == NULL) { return terrno; } - tsdbRowMergerInit(pMerger, ps); + + code = tsdbRowMergerInit(pMerger, ps); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } tRowKeyAssign(&pBlockScanInfo->lastProcKey, pKey); @@ -2399,7 +2471,11 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc return code; } - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pKey, pReader); + code = doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pKey, pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + code = tsdbRowMergerGetRow(pMerger, &pTSRow); if (code != TSDB_CODE_SUCCESS) { return code; @@ -2446,8 +2522,16 @@ int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanIn } TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); - tsdbRowMergerAdd(pMerger, pRow1, NULL); - doMergeRowsInSttBlock(pSttBlockReader, pScanInfo, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr); + code = tsdbRowMergerAdd(pMerger, pRow1, NULL); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + code = doMergeRowsInSttBlock(pSttBlockReader, pScanInfo, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + code = tsdbRowMergerGetRow(pMerger, &pTSRow); if (code != TSDB_CODE_SUCCESS) { return code; @@ -2527,7 +2611,10 @@ static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlock // 2. remove it from the scan block list int32_t neighborIndex = tableDataBlockIdx->globalIndex; - setFileBlockActiveInBlockIter(pReader, pBlockIter, neighborIndex, step); + code = setFileBlockActiveInBlockIter(pReader, pBlockIter, neighborIndex, step); + if (code != TSDB_CODE_SUCCESS) { + return code; + } // 3. load the neighbor block, and set it to be the currently accessed file data block code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pBlockInfo->uid); @@ -2599,7 +2686,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { } SBlockData* pBlockData = &pReader->status.fileBlockData; - initSttBlockReader(pSttBlockReader, pBlockScanInfo, pReader); + (void) initSttBlockReader(pSttBlockReader, pBlockScanInfo, pReader); while (1) { bool hasBlockData = false; @@ -2696,7 +2783,10 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t orde if (pSource == NULL) { pSource = pBlockScanInfo->pMemDelData; } else { - taosArrayAddAll(pSource, pBlockScanInfo->pMemDelData); + void* p1 = taosArrayAddAll(pSource, pBlockScanInfo->pMemDelData); + if (p1 == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } } code = tsdbBuildDeleteSkyline(pSource, 0, taosArrayGetSize(pSource) - 1, pBlockScanInfo->delSkyline); @@ -3094,7 +3184,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { } if (pScanInfo->sttKeyInfo.status == STT_FILE_READER_UNINIT) { - initSttBlockReader(pSttBlockReader, pScanInfo, pReader); + (void) initSttBlockReader(pSttBlockReader, pScanInfo, pReader); } TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader); @@ -3148,7 +3238,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { int64_t st = taosGetTimestampUs(); // let's load data from stt files, make sure clear the cleanStt block flag before load the data from stt files - initSttBlockReader(pSttBlockReader, pScanInfo, pReader); + (void) initSttBlockReader(pSttBlockReader, pScanInfo, pReader); // no data in stt block, no need to proceed. while (hasDataInSttBlock(pScanInfo)) { @@ -3209,10 +3299,17 @@ static int32_t buildBlockFromBufferSeqForPreFileset(STsdbReader* pReader, int64_ continue; } - initMemDataIterator(*pBlockScanInfo, pReader); - initDelSkylineIterator(*pBlockScanInfo, pReader->info.order, &pReader->cost); + int32_t code = initMemDataIterator(*pBlockScanInfo, pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } - int32_t code = buildDataBlockFromBuf(pReader, *pBlockScanInfo, endKey); + code = initDelSkylineIterator(*pBlockScanInfo, pReader->info.order, &pReader->cost); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + code = buildDataBlockFromBuf(pReader, *pBlockScanInfo, endKey); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3251,10 +3348,17 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader, int64_t en continue; } - initMemDataIterator(*pBlockScanInfo, pReader); - initDelSkylineIterator(*pBlockScanInfo, pReader->info.order, &pReader->cost); + int32_t code = initMemDataIterator(*pBlockScanInfo, pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } - int32_t code = buildDataBlockFromBuf(pReader, *pBlockScanInfo, endKey); + code = initDelSkylineIterator(*pBlockScanInfo, pReader->info.order, &pReader->cost); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + code = buildDataBlockFromBuf(pReader, *pBlockScanInfo, endKey); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3693,6 +3797,7 @@ FORCE_INLINE TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, S int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, SRowKey* pCurKey, SArray* pDelList, STsdbReader* pReader) { SRowMerger* pMerger = &pReader->status.merger; + int32_t code = 0; while (1) { pIter->hasVal = tsdbTbDataIterNext(pIter->iter); @@ -3727,10 +3832,13 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, SRowKey* pCurKey, SArra } } - tsdbRowMergerAdd(pMerger, pRow, pTSchema); + code = tsdbRowMergerAdd(pMerger, pRow, pTSchema); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } - return TSDB_CODE_SUCCESS; + return code; } static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, SRowKey* pKey, SRowMerger* pMerger, @@ -3748,7 +3856,7 @@ static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowInd } TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex); - tsdbRowMergerAdd(pMerger, &fRow, NULL); + int32_t code = tsdbRowMergerAdd(pMerger, &fRow, NULL); rowIndex += step; } @@ -3790,6 +3898,7 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc bool asc = ASCENDING_TRAVERSE(pReader->info.order); int32_t step = asc ? 1 : -1; SVersionRange* pRange = &pReader->info.verRange; + int32_t code = 0; pDumpInfo->rowIndex += step; if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) || (pDumpInfo->rowIndex >= 0 && !asc)) { @@ -3802,7 +3911,7 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc CHECK_FILEBLOCK_STATE st; SFileDataBlockInfo* pFileBlockInfo = NULL; - int32_t code = getCurrentBlockInfo(&pReader->status.blockIter, &pFileBlockInfo); + code = getCurrentBlockInfo(&pReader->status.blockIter, &pFileBlockInfo); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3812,19 +3921,20 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc break; } - checkForNeighborFileBlock(pReader, pScanInfo, pFileBlockInfo, pMerger, pKey, &st); - if (st == CHECK_FILEBLOCK_QUIT) { + code = checkForNeighborFileBlock(pReader, pScanInfo, pFileBlockInfo, pMerger, pKey, &st); + if (st == CHECK_FILEBLOCK_QUIT || code != TSDB_CODE_SUCCESS) { break; } } } - return TSDB_CODE_SUCCESS; + return code; } int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, SRowMerger* pMerger, int32_t pkSrcSlot, SVersionRange* pVerRange, const char* idStr) { SRowKey* pRowKey = &pScanInfo->lastProcKey; + int32_t code = TSDB_CODE_SUCCESS; while (nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pkSrcSlot, pVerRange)) { SRowKey* pNextKey = getCurrentKeyInSttBlock(pSttBlockReader); @@ -3832,7 +3942,10 @@ int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanI int32_t ret = pkCompEx(pRowKey, pNextKey); if (ret == 0) { TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); - tsdbRowMergerAdd(pMerger, pRow1, NULL); + code = tsdbRowMergerAdd(pMerger, pRow1, NULL); + if (code != TSDB_CODE_SUCCESS) { + break; + } } else { tsdbTrace("uid:%" PRIu64 " last del index:%d, del range:%d, lastKeyInStt:%" PRId64 ", %s", pScanInfo->uid, pScanInfo->sttBlockDelIndex, (int32_t)taosArrayGetSize(pScanInfo->delSkyline), @@ -3841,7 +3954,7 @@ int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanI } } - return TSDB_CODE_SUCCESS; + return code; } int32_t doMergeMemTableMultiRows(TSDBROW* pRow, SRowKey* pKey, uint64_t uid, SIterInfo* pIter, SArray* pDelList, @@ -4100,7 +4213,11 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pT if (colId == pSchema->columns[j].colId) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]); - tRowGet(pTSRow, pSchema, j, &colVal); + code = tRowGet(pTSRow, pSchema, j, &colVal); + if (code) { + return code; + } + code = doCopyColVal(pColInfoData, outputRowIndex, i, &colVal, pSupInfo); if (code) { return code; @@ -4185,7 +4302,11 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e do { TSDBROW row = {.type = -1}; bool freeTSRow = false; - tsdbGetNextRowInMem(pBlockScanInfo, pReader, &row, endKey, &freeTSRow); + code = tsdbGetNextRowInMem(pBlockScanInfo, pReader, &row, endKey, &freeTSRow); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + if (row.type == -1) { break; } @@ -4278,20 +4399,6 @@ int32_t tsdbSetTableList2(STsdbReader* pReader, const void* pTableList, int32_t return TDB_CODE_SUCCESS; } -void* tsdbGetIdx2(SMeta* pMeta) { - if (pMeta == NULL) { - return NULL; - } - return metaGetIdx(pMeta); -} - -void* tsdbGetIvtIdx2(SMeta* pMeta) { - if (pMeta == NULL) { - return NULL; - } - return metaGetIvtIdx(pMeta); -} - uint64_t tsdbGetReaderMaxVersion2(STsdbReader* pReader) { return pReader->info.verRange.maxVer; } static int32_t doOpenReaderImpl(STsdbReader* pReader) { @@ -4304,7 +4411,11 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) { pReader->status.bProcMemFirstFileset = true; } - initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader); + code = initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + code = resetDataBlockIterator(&pStatus->blockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo)); if (code != TSDB_CODE_SUCCESS) { return code; @@ -4336,7 +4447,7 @@ static void clearSharedPtr(STsdbReader* p) { p->pSchemaMap = NULL; } -static void setSharedPtr(STsdbReader* pDst, const STsdbReader* pSrc) { +static int32_t setSharedPtr(STsdbReader* pDst, const STsdbReader* pSrc) { pDst->status.pTableMap = pSrc->status.pTableMap; pDst->status.uidList = pSrc->status.uidList; pDst->info.pSchema = pSrc->info.pSchema; @@ -4345,8 +4456,10 @@ static void setSharedPtr(STsdbReader* pDst, const STsdbReader* pSrc) { pDst->pReadSnap->pfSetArray = pSrc->pReadSnap->pfSetArray; if (pDst->info.pSchema) { - tsdbRowMergerInit(&pDst->status.merger, pDst->info.pSchema); + return tsdbRowMergerInit(&pDst->status.merger, pDst->info.pSchema); } + + return TSDB_CODE_SUCCESS; } // ====================================== EXPOSED APIs ====================================== @@ -4427,7 +4540,10 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi } if (pReader->info.pSchema != NULL) { - tsdbRowMergerInit(&pReader->status.merger, pReader->info.pSchema); + code = tsdbRowMergerInit(&pReader->status.merger, pReader->info.pSchema); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } pReader->pSchemaMap = tSimpleHashInit(8, taosFastHash); @@ -4482,7 +4598,7 @@ void tsdbReaderClose2(STsdbReader* pReader) { return; } - tsdbAcquireReader(pReader); + (void)tsdbAcquireReader(pReader); { if (pReader->innerReader[0] != NULL || pReader->innerReader[1] != NULL) { @@ -4521,7 +4637,7 @@ void tsdbReaderClose2(STsdbReader* pReader) { clearBlockScanInfoBuf(&pReader->blockInfoBuf); if (pReader->pFileReader != NULL) { - tsdbDataFileReaderClose(&pReader->pFileReader); + (void) tsdbDataFileReaderClose(&pReader->pFileReader); } SReadCostSummary* pCost = &pReader->cost; @@ -4537,15 +4653,15 @@ void tsdbReaderClose2(STsdbReader* pReader) { destroySttBlockReader(pReader->status.pLDataIterArray, &pCost->sttCost); taosMemoryFreeClear(pReader->status.uidList.tableUidList); - qTrace("tsdb/reader-close: %p, untake snapshot", pReader); + tsdbTrace("tsdb/reader-close: %p, untake snapshot", pReader); void* p = pReader->pReadSnap; if ((p == atomic_val_compare_exchange_ptr((void**)&pReader->pReadSnap, p, NULL)) && (p != NULL)) { tsdbUntakeReadSnap2(pReader, p, true); } - tsem_destroy(&pReader->resumeAfterSuspend); - tsdbReleaseReader(pReader); - tsdbUninitReaderLock(pReader); + (void) tsem_destroy(&pReader->resumeAfterSuspend); + (void) tsdbReleaseReader(pReader); + (void) tsdbUninitReaderLock(pReader); tsdbDebug( "%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64 @@ -4574,11 +4690,14 @@ static int32_t doSuspendCurrentReader(STsdbReader* pCurrentReader) { SReaderStatus* pStatus = &pCurrentReader->status; if (pStatus->loadFromFile) { - tsdbDataFileReaderClose(&pCurrentReader->pFileReader); + (void) tsdbDataFileReaderClose(&pCurrentReader->pFileReader); SReadCostSummary* pCost = &pCurrentReader->cost; destroySttBlockReader(pStatus->pLDataIterArray, &pCost->sttCost); pStatus->pLDataIterArray = taosArrayInit(4, POINTER_BYTES); + if (pStatus->pLDataIterArray == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } } // resetDataBlockScanInfo excluding lastKey @@ -4606,14 +4725,14 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) { if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) { if (pReader->step == EXTERNAL_ROWS_PREV) { - doSuspendCurrentReader(pReader->innerReader[0]); + code = doSuspendCurrentReader(pReader->innerReader[0]); } else if (pReader->step == EXTERNAL_ROWS_MAIN) { - doSuspendCurrentReader(pReader); + code = doSuspendCurrentReader(pReader); } else { - doSuspendCurrentReader(pReader->innerReader[1]); + code = doSuspendCurrentReader(pReader->innerReader[1]); } } else { - doSuspendCurrentReader(pReader); + code = doSuspendCurrentReader(pReader); } // make sure only release once @@ -4648,13 +4767,12 @@ static int32_t tsdbSetQueryReseek(void* pQHandle) { code = tsdbTryAcquireReader(pReader); if (code == 0) { if (pReader->flag == READER_STATUS_SUSPEND) { - tsdbReleaseReader(pReader); + code = tsdbReleaseReader(pReader); return code; } - tsdbReaderSuspend2(pReader); - tsdbReleaseReader(pReader); - + code = tsdbReaderSuspend2(pReader); + (void) tsdbReleaseReader(pReader); return code; } else if (code == EBUSY) { return TSDB_CODE_VND_QUERY_BUSY; @@ -4671,7 +4789,7 @@ int32_t tsdbReaderResume2(STsdbReader* pReader) { // restore reader's state, task snapshot int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); if (numOfTables > 0) { - qTrace("tsdb/reader: %p, take snapshot", pReader); + tsdbTrace("tsdb/reader: %p, take snapshot", pReader); code = tsdbTakeReadSnap2(pReader, tsdbSetQueryReseek, &pReader->pReadSnap); if (code != TSDB_CODE_SUCCESS) { goto _err; @@ -4692,10 +4810,16 @@ int32_t tsdbReaderResume2(STsdbReader* pReader) { // we need only one row pPrevReader->resBlockInfo.capacity = 1; - setSharedPtr(pPrevReader, pReader); + code = setSharedPtr(pPrevReader, pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pNextReader->resBlockInfo.capacity = 1; - setSharedPtr(pNextReader, pReader); + code = setSharedPtr(pNextReader, pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } if (pReader->step == 0 || pReader->step == EXTERNAL_ROWS_PREV) { code = doOpenReaderImpl(pPrevReader); @@ -4864,12 +4988,17 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) { #endif code = tsdbAcquireReader(pReader); - qTrace("tsdb/read: %p, take read mutex, code: %d", pReader, code); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + tsdbTrace("tsdb/read: %p, take read mutex, code: %d", pReader, code); if (pReader->flag == READER_STATUS_SUSPEND) { code = tsdbReaderResume2(pReader); if (code != TSDB_CODE_SUCCESS) { - tsdbReleaseReader(pReader); + // release reader failure should be suppressed here, to avoid over-write the original error code + (void) tsdbReleaseReader(pReader); return code; } } @@ -4877,7 +5006,7 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) { if (pReader->innerReader[0] != NULL && pReader->step == 0) { code = doTsdbNextDataBlock2(pReader->innerReader[0], hasNext); if (code) { - tsdbReleaseReader(pReader); + (void) tsdbReleaseReader(pReader); return code; } @@ -4885,8 +5014,8 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) { if (*hasNext) { pStatus = &pReader->innerReader[0]->status; if (pStatus->composedDataBlock) { - qTrace("tsdb/read: %p, unlock read mutex", pReader); - tsdbReleaseReader(pReader); + tsdbTrace("tsdb/read: %p, unlock read mutex", pReader); + code = tsdbReleaseReader(pReader); } return code; @@ -4908,14 +5037,14 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) { code = doTsdbNextDataBlock2(pReader, hasNext); if (code != TSDB_CODE_SUCCESS) { - tsdbReleaseReader(pReader); + (void) tsdbReleaseReader(pReader); return code; } if (*hasNext) { if (pStatus->composedDataBlock) { - qTrace("tsdb/read: %p, unlock read mutex", pReader); - tsdbReleaseReader(pReader); + tsdbTrace("tsdb/read: %p, unlock read mutex", pReader); + code = tsdbReleaseReader(pReader); } return code; } @@ -4931,7 +5060,7 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) { code = doTsdbNextDataBlock2(pReader->innerReader[1], hasNext); if (code != TSDB_CODE_SUCCESS) { - tsdbReleaseReader(pReader); + (void) tsdbReleaseReader(pReader); return code; } @@ -4939,17 +5068,16 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) { if (*hasNext) { pStatus = &pReader->innerReader[1]->status; if (pStatus->composedDataBlock) { - qTrace("tsdb/read: %p, unlock read mutex", pReader); - tsdbReleaseReader(pReader); + tsdbTrace("tsdb/read: %p, unlock read mutex", pReader); + code = tsdbReleaseReader(pReader); } return code; } } - qTrace("tsdb/read: %p, unlock read mutex", pReader); - tsdbReleaseReader(pReader); - + tsdbTrace("tsdb/read: %p, unlock read mutex", pReader); + code = tsdbReleaseReader(pReader); return code; } @@ -5136,7 +5264,9 @@ static int32_t doRetrieveDataBlock(STsdbReader* pReader, SSDataBlock** pBlock) { return code; } -SSDataBlock* tsdbRetrieveDataBlock2(STsdbReader* pReader, SArray* pIdList) { +int32_t tsdbRetrieveDataBlock2(STsdbReader* pReader, SSDataBlock** pBlock, SArray* pIdList) { + *pBlock = NULL; + STsdbReader* pTReader = pReader; if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) { if (pReader->step == EXTERNAL_ROWS_PREV) { @@ -5151,39 +5281,39 @@ SSDataBlock* tsdbRetrieveDataBlock2(STsdbReader* pReader, SArray* pIdList) { // tsdbReaderSuspend2(pReader); // tsdbReaderResume2(pReader); - return pTReader->resBlockInfo.pResBlock; + *pBlock = pTReader->resBlockInfo.pResBlock; } - SSDataBlock* ret = NULL; - doRetrieveDataBlock(pTReader, &ret); + int32_t code = doRetrieveDataBlock(pTReader, pBlock); - qTrace("tsdb/read-retrieve: %p, unlock read mutex", pReader); - tsdbReleaseReader(pReader); + tsdbTrace("tsdb/read-retrieve: %p, unlock read mutex", pReader); + (void) tsdbReleaseReader(pReader); // tsdbReaderSuspend2(pReader); // tsdbReaderResume2(pReader); - - return ret; + return code; } int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) { int32_t code = TSDB_CODE_SUCCESS; - qTrace("tsdb/reader-reset: %p, take read mutex", pReader); - tsdbAcquireReader(pReader); + tsdbTrace("tsdb/reader-reset: %p, take read mutex", pReader); + code = tsdbAcquireReader(pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } if (pReader->flag == READER_STATUS_SUSPEND) { code = tsdbReaderResume2(pReader); if (code != TSDB_CODE_SUCCESS) { - tsdbReleaseReader(pReader); + (void) tsdbReleaseReader(pReader); return code; } } if (isEmptyQueryTimeWindow(&pReader->info.window) || pReader->pReadSnap == NULL) { tsdbDebug("tsdb reader reset return %p, %s", pReader->pReadSnap, pReader->idStr); - tsdbReleaseReader(pReader); - return TSDB_CODE_SUCCESS; + return tsdbReleaseReader(pReader); } SReaderStatus* pStatus = &pReader->status; @@ -5199,14 +5329,18 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) { memset(&pReader->suppInfo.tsColAgg, 0, sizeof(SColumnDataAgg)); pReader->suppInfo.tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID; - tsdbDataFileReaderClose(&pReader->pFileReader); + (void) tsdbDataFileReaderClose(&pReader->pFileReader); int32_t numOfTables = tSimpleHashGetSize(pStatus->pTableMap); - initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader); + code = initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + code = resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo)); if (code != TSDB_CODE_SUCCESS) { - tsdbReleaseReader(pReader); + (void) tsdbReleaseReader(pReader); return code; } @@ -5232,7 +5366,7 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) { tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader, numOfTables, pReader->info.window.skey, pReader->info.window.ekey, pReader->idStr); - tsdbReleaseReader(pReader); + (void) tsdbReleaseReader(pReader); return code; } } @@ -5242,8 +5376,7 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) { pReader, pReader->info.suid, numOfTables, pCond->twindows.skey, pReader->info.window.skey, pReader->info.window.ekey, pReader->idStr); - tsdbReleaseReader(pReader); - + code = tsdbReleaseReader(pReader); return code; } @@ -5267,12 +5400,15 @@ int32_t tsdbGetFileBlocksDistInfo2(STsdbReader* pReader, STableBlockDistInfo* pT pTableBlockInfo->numOfVgroups = 1; // find the start data block in file - tsdbAcquireReader(pReader); + code = tsdbAcquireReader(pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + if (pReader->flag == READER_STATUS_SUSPEND) { code = tsdbReaderResume2(pReader); if (code != TSDB_CODE_SUCCESS) { - tsdbReleaseReader(pReader); - return code; + return tsdbReleaseReader(pReader); } } @@ -5354,7 +5490,7 @@ int32_t tsdbGetFileBlocksDistInfo2(STsdbReader* pReader, STableBlockDistInfo* pT } // record the data in stt files - tsdbReleaseReader(pReader); + (void) tsdbReleaseReader(pReader); return code; } @@ -5411,11 +5547,15 @@ int64_t tsdbGetNumOfRowsInMemTable2(STsdbReader* pReader) { int64_t rows = 0; SReaderStatus* pStatus = &pReader->status; - tsdbAcquireReader(pReader); + code = tsdbAcquireReader(pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + if (pReader->flag == READER_STATUS_SUSPEND) { code = tsdbReaderResume2(pReader); if (code != TSDB_CODE_SUCCESS) { - tsdbReleaseReader(pReader); + (void) tsdbReleaseReader(pReader); return code; } } @@ -5446,7 +5586,7 @@ int64_t tsdbGetNumOfRowsInMemTable2(STsdbReader* pReader) { pStatus->pTableIter = tSimpleHashIterate(pStatus->pTableMap, pStatus->pTableIter, &iter); } - tsdbReleaseReader(pReader); + (void) tsdbReleaseReader(pReader); return rows; } @@ -5493,12 +5633,16 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs SVersionRange* pRange = &pReader->info.verRange; // lock - taosThreadMutexLock(&pTsdb->mutex); + code = taosThreadMutexLock(&pTsdb->mutex); + if (code != TSDB_CODE_SUCCESS) { + tsdbError("failed to lock tsdb, code:%d", tstrerror(code)); + return code; + } // alloc STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap)); if (pSnap == NULL) { - taosThreadMutexUnlock(&pTsdb->mutex); + (void) taosThreadMutexUnlock(&pTsdb->mutex); code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } @@ -5508,7 +5652,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs pSnap->pMem = pTsdb->mem; pSnap->pNode = taosMemoryMalloc(sizeof(*pSnap->pNode)); if (pSnap->pNode == NULL) { - taosThreadMutexUnlock(&pTsdb->mutex); + (void) taosThreadMutexUnlock(&pTsdb->mutex); code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } @@ -5516,14 +5660,14 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs pSnap->pNode->pQHandle = pReader; pSnap->pNode->reseek = reseek; - tsdbRefMemTable(pTsdb->mem, pSnap->pNode); + (void)tsdbRefMemTable(pTsdb->mem, pSnap->pNode); } if (pTsdb->imem && (pRange->minVer <= pTsdb->imem->maxVer && pRange->maxVer >= pTsdb->imem->minVer)) { pSnap->pIMem = pTsdb->imem; pSnap->pINode = taosMemoryMalloc(sizeof(*pSnap->pINode)); if (pSnap->pINode == NULL) { - taosThreadMutexUnlock(&pTsdb->mutex); + (void) taosThreadMutexUnlock(&pTsdb->mutex); code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } @@ -5531,14 +5675,14 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs pSnap->pINode->pQHandle = pReader; pSnap->pINode->reseek = reseek; - tsdbRefMemTable(pTsdb->imem, pSnap->pINode); + (void)tsdbRefMemTable(pTsdb->imem, pSnap->pINode); } // fs code = tsdbFSCreateRefSnapshotWithoutLock(pTsdb->pFS, &pSnap->pfSetArray); // unlock - taosThreadMutexUnlock(&pTsdb->mutex); + (void) taosThreadMutexUnlock(&pTsdb->mutex); if (code == TSDB_CODE_SUCCESS) { tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode)); @@ -5566,17 +5710,17 @@ void tsdbUntakeReadSnap2(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proact if (pSnap) { if (pSnap->pMem) { - tsdbUnrefMemTable(pSnap->pMem, pSnap->pNode, proactive); + (void) tsdbUnrefMemTable(pSnap->pMem, pSnap->pNode, proactive); } if (pSnap->pIMem) { - tsdbUnrefMemTable(pSnap->pIMem, pSnap->pINode, proactive); + (void) tsdbUnrefMemTable(pSnap->pIMem, pSnap->pINode, proactive); } if (pSnap->pNode) taosMemoryFree(pSnap->pNode); if (pSnap->pINode) taosMemoryFree(pSnap->pINode); - tsdbFSDestroyRefSnapshot(&pSnap->pfSetArray); + (void) tsdbFSDestroyRefSnapshot(&pSnap->pfSetArray); taosMemoryFree(pSnap); } diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index 4bca7f73f8..1d330d6deb 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -842,9 +842,12 @@ int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileRead return doLoadTombDataFromTombBlk(pBlkArray, pReader, pSttFileReader, false); } -void loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemTbData, int64_t ver) { +int32_t loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemTbData, int64_t ver) { if (*ppMemDelData == NULL) { *ppMemDelData = taosArrayInit(4, sizeof(SDelData)); + if (*ppMemDelData == NULL) { + return TSDB_CODE_SUCCESS; + } } SArray* pMemDelData = *ppMemDelData; @@ -872,6 +875,8 @@ void loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemT p = p->pNext; } } + + return TSDB_CODE_SUCCESS; } int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pBlockLoadInfo, diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index ba75cb77a8..45a2384b65 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -339,7 +339,7 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3 bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr); // load tomb data API (stt/mem only for one table each, tomb data from data files are load for all tables at one time) -void loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemTbData, int64_t ver); +int32_t loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemTbData, int64_t ver); int32_t loadDataFileTombDataForAll(STsdbReader* pReader); int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pLoadInfo); int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pBlockLoadInfo, diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ce4915ca4d..f6c5118c88 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -388,9 +388,10 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca pCost->totalCheckedRows += pBlock->info.rows; pCost->loadBlocks += 1; - SSDataBlock* p = pAPI->tsdReader.tsdReaderRetrieveDataBlock(pTableScanInfo->dataReader, NULL); - if (p == NULL) { - return terrno; + SSDataBlock* p = NULL; + int32_t code = pAPI->tsdReader.tsdReaderRetrieveDataBlock(pTableScanInfo->dataReader, &p, NULL); + if (p == NULL || code != TSDB_CODE_SUCCESS) { + return code; } ASSERT(p == pBlock); @@ -1351,7 +1352,12 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU } if (hasNext) { - /*SSDataBlock* p = */ pAPI->tsdReader.tsdReaderRetrieveDataBlock(pReader, NULL); + SSDataBlock* p = NULL; + code = pAPI->tsdReader.tsdReaderRetrieveDataBlock(pReader, &p, NULL); + if (code != TSDB_CODE_SUCCESS) { + return NULL; + } + doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows); pBlock->info.id.groupId = tableListGetTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid); } @@ -2929,8 +2935,9 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } - SSDataBlock* pBlock = pAPI->tsdReader.tsdReaderRetrieveDataBlock(pInfo->dataReader, NULL); - if (pBlock == NULL) { + SSDataBlock* pBlock = NULL; + code = pAPI->tsdReader.tsdReaderRetrieveDataBlock(pInfo->dataReader, &pBlock, NULL); + if (pBlock == NULL || code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, terrno); } diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 3d8588a55a..009854b45b 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -74,16 +74,16 @@ typedef struct SRaftStore { TdThreadMutex mutex; } SRaftStore; -typedef struct SSyncHbTimerData { +struct SSyncHbTimerData { int64_t syncNodeRid; SSyncTimer* pTimer; SRaftId destId; uint64_t logicClock; int64_t execTime; int64_t rid; -} SSyncHbTimerData; +}; -typedef struct SSyncTimer { +struct SSyncTimer { void* pTimer; TAOS_TMR_CALLBACK timerCb; uint64_t logicClock; @@ -92,7 +92,7 @@ typedef struct SSyncTimer { int64_t timeStamp; SRaftId destId; int64_t hbDataRid; -} SSyncTimer; +}; typedef struct SElectTimerParam { uint64_t logicClock; @@ -106,7 +106,7 @@ typedef struct SPeerState { int64_t lastSendTime; } SPeerState; -typedef struct SSyncNode { +struct SSyncNode { // init by SSyncInfo SyncGroupId vgId; SRaftCfg raftCfg; @@ -116,7 +116,7 @@ typedef struct SSyncNode { // sync io SSyncLogBuffer* pLogBuf; - SWal* pWal; + struct SWal* pWal; const SMsgCb* msgcb; int32_t (*syncSendMSg)(const SEpSet* pEpSet, SRpcMsg* pMsg); int32_t (*syncEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); @@ -234,7 +234,7 @@ typedef struct SSyncNode { bool isStart; -} SSyncNode; +}; // open/close -------------- SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index e62ca7d69a..acb9bd20f3 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -103,7 +103,7 @@ typedef void* queue[2]; #define TRANS_MAGIC_NUM 0x5f375a86 #define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0) -typedef SRpcMsg STransMsg; +typedef struct SRpcMsg STransMsg; typedef SRpcCtx STransCtx; typedef SRpcCtxVal STransCtxVal; typedef SRpcInfo STrans;