From 06f42443cb9f7eec382d6939e5b4e8d2f1bbce1e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 8 Sep 2024 22:49:05 +0800 Subject: [PATCH] refactor: remove void. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 25 ++++--- source/dnode/vnode/src/tsdb/tsdbMergeTree.c | 23 +++++-- source/dnode/vnode/src/tsdb/tsdbRead2.c | 37 ++++++---- source/dnode/vnode/src/tsdb/tsdbReadUtil.c | 75 ++++++++++++++------- source/dnode/vnode/src/tsdb/tsdbReadUtil.h | 2 +- 5 files changed, 109 insertions(+), 53 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 68f43d637b..c00d9a93bb 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -207,7 +207,10 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM updated = streamTaskUpdateEpsetInfo(pTask, req.pNodeList); // send the checkpoint-source-rsp for source task to end the checkpoint trans in mnode - (void)streamTaskSendCheckpointsourceRsp(pTask); + code = streamTaskSendCheckpointsourceRsp(pTask); + if (code) { + tqError("%s failed to send checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code)); + } streamTaskResetStatus(pTask); streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr); @@ -806,25 +809,26 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead int32_t type = pReq->reqType; int32_t vgId = pMeta->vgId; + int32_t code = 0; if (type == STREAM_EXEC_T_START_ONE_TASK) { - (void)streamMetaStartOneTask(pMeta, pReq->streamId, pReq->taskId); + code = streamMetaStartOneTask(pMeta, pReq->streamId, pReq->taskId); return 0; } else if (type == STREAM_EXEC_T_START_ALL_TASKS) { - (void)streamMetaStartAllTasks(pMeta); + code = streamMetaStartAllTasks(pMeta); return 0; } else if (type == STREAM_EXEC_T_RESTART_ALL_TASKS) { - (void)restartStreamTasks(pMeta, isLeader); + code = restartStreamTasks(pMeta, isLeader); return 0; } else if (type == STREAM_EXEC_T_STOP_ALL_TASKS) { - (void)streamMetaStopAllTasks(pMeta); + code = streamMetaStopAllTasks(pMeta); return 0; } else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) { - int32_t code = streamMetaAddFailedTask(pMeta, pReq->streamId, pReq->taskId); + code = streamMetaAddFailedTask(pMeta, pReq->streamId, pReq->taskId); return code; } else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while SStreamTask* pTask = NULL; - int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask); + code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask); if (pTask != NULL && (code == 0)) { char* pStatus = NULL; @@ -846,7 +850,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead } SStreamTask* pTask = NULL; - int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask); + code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask); if ((pTask != NULL) && (code == 0)) { // even in halt status, the data in inputQ must be processed char* p = NULL; if (streamTaskReadyToRun(pTask, &p)) { @@ -864,7 +868,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead } else { // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec. // todo add one function to handle this tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, pReq->taskId); - return -1; + return code; } } @@ -1229,7 +1233,8 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { tqError( "vgId:%d process set consensus checkpointId req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId, req.taskId); - (void)streamMetaAddFailedTask(pMeta, req.streamId, req.taskId); + // ignore this code to avoid error code over write + int32_t ret = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index fc2b873054..d4772bac35 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -379,6 +379,7 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl int32_t lino = 0; void *px = NULL; int32_t startIndex = 0; + double el = 0; int32_t numOfBlocks = TARRAY2_SIZE(pStatisBlkArray); if (numOfBlocks <= 0) { @@ -489,7 +490,9 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl } else { STbStatisRecord record = {0}; while (i < rows) { - (void)tStatisBlockGet(&block, i, &record); + code = tStatisBlockGet(&block, i, &record); + TSDB_CHECK_CODE(code, lino, _end); + if (record.suid != suid) { break; } @@ -536,12 +539,16 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl } _end: - (void)tStatisBlockDestroy(&block); - - double el = (taosGetTimestampUs() - st) / 1000.0; + el = (taosGetTimestampUs() - st) / 1000.0; pBlockLoadInfo->cost.statisElapsedTime += el; - tsdbDebug("%s load %d statis blocks into buf, elapsed time:%.2fms", id, num, el); + if (code != 0) { + tsdbError("%s failed to load block data statistics, %s at line:%d, code:%s", id, __func__, lino, tstrerror(code)); + } else { + tsdbDebug("%s load %d statis blocks into buf, elapsed time:%.2fms", id, num, el); + } + + int32_t ret = tStatisBlockDestroy(&block); return code; } @@ -677,7 +684,11 @@ int32_t tLDataIterOpen2(SLDataIter *pIter, SSttFileReader *pSttFileReader, int32 } void tLDataIterClose2(SLDataIter *pIter) { - (void)tsdbSttFileReaderClose(&pIter->pReader); // always return 0 + int32_t code = tsdbSttFileReaderClose(&pIter->pReader); // always return 0 + if (code != 0) { + tsdbError("%" PRId64 " failed to close tsdb file reader, code:%s", pIter->cid, tstrerror(code)); + } + pIter->pReader = NULL; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 0c61366513..035d13e5a2 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -449,10 +449,14 @@ static int32_t tsdbUninitReaderLock(STsdbReader* pReader) { static int32_t tsdbAcquireReader(STsdbReader* pReader) { int32_t code = -1; - tsdbTrace("tsdb/read: %p, pre-take read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); + tsdbTrace("tsdb/read: %s, pre-take read mutex: %p, code: %d", pReader->idStr, &pReader->readerMutex, code); code = taosThreadMutexLock(&pReader->readerMutex); - tsdbTrace("tsdb/read: %p, post-take read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); + if (code != 0) { + tsdbError("tsdb/read:%p, failed to lock reader mutex, code:%s", pReader->idStr, tstrerror(code)); + } else { + tsdbTrace("tsdb/read: %s, post-take read mutex: %p, code: %d", pReader->idStr, &pReader->readerMutex, code); + } return code; } @@ -4574,7 +4578,10 @@ int32_t tsdbSetTableList2(STsdbReader* pReader, const void* pTableList, int32_t STableBlockScanInfo** p = NULL; int32_t iter = 0; - (void)tsdbAcquireReader(pReader); + code = tsdbAcquireReader(pReader); + if (code) { + return code; + } while ((p = tSimpleHashIterate(pReader->status.pTableMap, p, &iter)) != NULL) { clearBlockScanInfo(*p); @@ -4805,7 +4812,10 @@ void tsdbReaderClose2(STsdbReader* pReader) { return; } - (void)tsdbAcquireReader(pReader); + int32_t code = tsdbAcquireReader(pReader); + if (code) { + return; + } { if (pReader->innerReader[0] != NULL || pReader->innerReader[1] != NULL) { @@ -5853,6 +5863,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs int32_t code = 0; STsdb* pTsdb = pReader->pTsdb; SVersionRange* pRange = &pReader->info.verRange; + int32_t lino = 0; *ppSnap = NULL; // lock @@ -5866,8 +5877,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap)); if (pSnap == NULL) { (void) taosThreadMutexUnlock(&pTsdb->mutex); - code = terrno; - goto _exit; + TSDB_CHECK_NULL(pSnap, code, lino, _exit, terrno); } // take snapshot @@ -5876,14 +5886,14 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs pSnap->pNode = taosMemoryMalloc(sizeof(*pSnap->pNode)); if (pSnap->pNode == NULL) { (void) taosThreadMutexUnlock(&pTsdb->mutex); - code = terrno; - goto _exit; + TSDB_CHECK_NULL(pSnap->pNode, code, lino, _exit, terrno); } pSnap->pNode->pQHandle = pReader; pSnap->pNode->reseek = reseek; - (void)tsdbRefMemTable(pTsdb->mem, pSnap->pNode); + code = tsdbRefMemTable(pTsdb->mem, pSnap->pNode); + TSDB_CHECK_CODE(code, lino, _exit); } if (pTsdb->imem && (pRange->minVer <= pTsdb->imem->maxVer && pRange->maxVer >= pTsdb->imem->minVer)) { @@ -5903,7 +5913,8 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs pSnap->pINode->pQHandle = pReader; pSnap->pINode->reseek = reseek; - (void)tsdbRefMemTable(pTsdb->imem, pSnap->pINode); + code = tsdbRefMemTable(pTsdb->imem, pSnap->pINode); + TSDB_CHECK_CODE(code, lino, _exit); } // fs @@ -5918,8 +5929,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs } (void) taosThreadMutexUnlock(&pTsdb->mutex); - goto _exit; - } + TSDB_CHECK_CODE(code, lino, _exit);} // unlock (void) taosThreadMutexUnlock(&pTsdb->mutex); @@ -5929,7 +5939,8 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs return code; _exit: - tsdbError("vgId:%d take read snapshot failed, code:%s", TD_VID(pTsdb->pVnode), tstrerror(code)); + tsdbError("%s vgId:%d take read snapshot failed, line:%d code:%s", pReader->idStr, TD_VID(pTsdb->pVnode), lino, + tstrerror(code)); if (pSnap) { if (pSnap->pNode) taosMemoryFree(pSnap->pNode); if (pSnap->pINode) taosMemoryFree(pSnap->pINode); diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index 9d58e2c7bd..274f4d5346 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -779,8 +779,9 @@ typedef enum { BLK_CHECK_QUIT = 0x2, } ETombBlkCheckEnum; -static void loadNextStatisticsBlock(SSttFileReader* pSttFileReader, STbStatisBlock* pStatisBlock, - const TStatisBlkArray* pStatisBlkArray, int32_t numOfRows, int32_t* i, int32_t* j); +static int32_t loadNextStatisticsBlock(SSttFileReader* pSttFileReader, STbStatisBlock* pStatisBlock, + const TStatisBlkArray* pStatisBlkArray, int32_t numOfRows, int32_t* i, + int32_t* j); static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_t numOfTables, int32_t* j, ETombBlkCheckEnum* pRet) { int32_t code = 0; @@ -912,7 +913,7 @@ static int32_t doLoadTombDataFromTombBlk(const TTombBlkArray* pTombBlkArray, STs ETombBlkCheckEnum ret = 0; code = doCheckTombBlock(&block, pReader, numOfTables, &j, &ret); - (void)tTombBlockDestroy(&block); + tTombBlockDestroy(&block); if (code != TSDB_CODE_SUCCESS || ret == BLK_CHECK_QUIT) { return code; } @@ -994,11 +995,17 @@ int32_t loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piM int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pBlockLoadInfo, TStatisBlkArray* pStatisBlkArray, uint64_t suid, const uint64_t* pUidList, - int32_t numOfTables) { + int32_t numOfTables, int32_t* pNumOfRows) { int32_t num = 0; + int32_t code = 0; + int32_t lino = 0; + + if (pNumOfRows != 0) { + *pNumOfRows = 0; + } if (TARRAY2_SIZE(pStatisBlkArray) <= 0) { - return 0; + return code; } int32_t i = 0; @@ -1007,18 +1014,19 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo } if (i >= TARRAY2_SIZE(pStatisBlkArray)) { - return 0; + return code; } SStatisBlk* p = &pStatisBlkArray->data[i]; STbStatisBlock* pStatisBlock = taosMemoryCalloc(1, sizeof(STbStatisBlock)); - (void)tStatisBlockInit(pStatisBlock); + TSDB_CHECK_NULL(pStatisBlock, code, lino, _err, terrno); + + code = tStatisBlockInit(pStatisBlock); + TSDB_CHECK_CODE(code, lino, _err); int64_t st = taosGetTimestampMs(); - int32_t code = tsdbSttFileReadStatisBlock(pSttFileReader, p, pStatisBlock); - if (code != TSDB_CODE_SUCCESS) { - return 0; - } + code = tsdbSttFileReadStatisBlock(pSttFileReader, p, pStatisBlock); + TSDB_CHECK_CODE(code, lino, _err); double el = (taosGetTimestampMs() - st) / 1000.0; pBlockLoadInfo->cost.loadStatisBlocks += 1; @@ -1030,9 +1038,10 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo } if (index >= pStatisBlock->numOfRecords) { - (void)tStatisBlockDestroy(pStatisBlock); + code = tStatisBlockDestroy(pStatisBlock); taosMemoryFreeClear(pStatisBlock); - return num; + *pNumOfRows = num; + return code; } int32_t j = index; @@ -1040,9 +1049,10 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo while (i < TARRAY2_SIZE(pStatisBlkArray) && uidIndex < numOfTables) { p = &pStatisBlkArray->data[i]; if (p->minTbid.suid > suid) { - (void)tStatisBlockDestroy(pStatisBlock); + code = tStatisBlockDestroy(pStatisBlock); taosMemoryFreeClear(pStatisBlock); - return num; + *pNumOfRows = num; + return code; } uint64_t uid = pUidList[uidIndex]; @@ -1051,30 +1061,44 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo num += ((int64_t*)pStatisBlock->counts.data)[j]; uidIndex += 1; j += 1; - loadNextStatisticsBlock(pSttFileReader, pStatisBlock, pStatisBlkArray, pStatisBlock->numOfRecords, &i, &j); + code = loadNextStatisticsBlock(pSttFileReader, pStatisBlock, pStatisBlkArray, pStatisBlock->numOfRecords, &i, &j); + TSDB_CHECK_CODE(code, lino, _err); } else if (((int64_t*)pStatisBlock->uids.data)[j] < uid) { j += 1; - loadNextStatisticsBlock(pSttFileReader, pStatisBlock, pStatisBlkArray, pStatisBlock->numOfRecords, &i, &j); + code = loadNextStatisticsBlock(pSttFileReader, pStatisBlock, pStatisBlkArray, pStatisBlock->numOfRecords, &i, &j); + TSDB_CHECK_CODE(code, lino, _err); } else { uidIndex += 1; } } - (void)tStatisBlockDestroy(pStatisBlock); + tStatisBlockDestroy(pStatisBlock); taosMemoryFreeClear(pStatisBlock); - return num; + *pNumOfRows = num; + return code; + +_err: + tsdbError("%p failed to get number of rows in stt block, %s at line:%d code:%s", pSttFileReader, __func__, lino, + tstrerror(code)); + return code; } // load next stt statistics block -static void loadNextStatisticsBlock(SSttFileReader* pSttFileReader, STbStatisBlock* pStatisBlock, +static int32_t loadNextStatisticsBlock(SSttFileReader* pSttFileReader, STbStatisBlock* pStatisBlock, const TStatisBlkArray* pStatisBlkArray, int32_t numOfRows, int32_t* i, int32_t* j) { if ((*j) >= numOfRows) { (*i) += 1; (*j) = 0; if ((*i) < TARRAY2_SIZE(pStatisBlkArray)) { - (void)tsdbSttFileReadStatisBlock(pSttFileReader, &pStatisBlkArray->data[(*i)], pStatisBlock); + int32_t code = tsdbSttFileReadStatisBlock(pSttFileReader, &pStatisBlkArray->data[(*i)], pStatisBlock); + if (code != 0) { + tsdbError("%p failed to read statisBlock, code:%s", pSttFileReader, tstrerror(code)); + return code; + } } } + + return 0; } int32_t doAdjustValidDataIters(SArray* pLDIterList, int32_t numOfFileObj) { @@ -1191,8 +1215,13 @@ int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArra STsdbReader* pReader = pConf->pReader; int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); uint64_t* pUidList = pReader->status.uidList.tableUidList; - numOfRows += getNumOfRowsInSttBlock(pIter->pReader, pIter->pBlockLoadInfo, pStatisBlkArray, pConf->suid, pUidList, - numOfTables); + int32_t n = 0; + code = getNumOfRowsInSttBlock(pIter->pReader, pIter->pBlockLoadInfo, pStatisBlkArray, pConf->suid, pUidList, + numOfTables, &n); + numOfRows += n; + if (code) { + tsdbError("%s failed to get rows in stt blocks, code:%s", pstr, tstrerror(code)); + } } } diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index ed895b7d27..cc37f20cf6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -343,7 +343,7 @@ int32_t loadDataFileTombDataForAll(STsdbReader* pReader); int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pLoadInfo); int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pBlockLoadInfo, TStatisBlkArray* pStatisBlkArray, uint64_t suid, const uint64_t* pUidList, - int32_t numOfTables); + int32_t numOfTables, int32_t* pNumOfRows); void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record); void destroyLDataIter(SLDataIter* pIter);