From 0335799e1cc9b813aff10d065cf71b1e522123db Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 12 Sep 2024 21:48:54 +0800 Subject: [PATCH 1/2] refactor: remove void. --- source/dnode/mnode/impl/src/mndStream.c | 12 ++++---- source/dnode/mnode/impl/src/mndStreamHb.c | 11 +++++--- source/dnode/mnode/impl/src/mndStreamUtil.c | 31 +++++++++++++++------ 3 files changed, 37 insertions(+), 17 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index cbe631912c..e0b8caa938 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2221,7 +2221,11 @@ static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeLi } char buf[256] = {0}; - (void) epsetToStr(&pEntry->epset, buf, tListLen(buf)); // ignore this error since it is only for log file + int32_t ret = epsetToStr(&pEntry->epset, buf, tListLen(buf)); // ignore this error since it is only for log file + if (ret != 0) { // print error and continue + mError("failed to convert epset to str, code:%s", tstrerror(ret)); + } + mDebug("extract nodeInfo from stream obj, nodeId:%d, %s", pEntry->nodeId, buf); } @@ -2231,7 +2235,7 @@ static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeLi return code; } -static int32_t addAllDbsIntoHashmap(SHashObj *pDBMap, SSdb *pSdb) { +static void addAllDbsIntoHashmap(SHashObj *pDBMap, SSdb *pSdb) { void *pIter = NULL; int32_t code = 0; while (1) { @@ -2249,8 +2253,6 @@ static int32_t addAllDbsIntoHashmap(SHashObj *pDBMap, SSdb *pSdb) { mDebug("add Db:%s into Dbs list (total:%d) for kill checkpoint trans", pVgroup->dbName, size); } } - - return code; } // this function runs by only one thread, so it is not multi-thread safe @@ -2311,7 +2313,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { mInfo("rollback all stream due to mnode leader/follower switch by using nodeUpdate trans"); updateAllVgroups = true; execInfo.switchFromFollower = false; // reset the flag - (void) addAllDbsIntoHashmap(changeInfo.pDBMap, pMnode->pSdb); + addAllDbsIntoHashmap(changeInfo.pDBMap, pMnode->pSdb); } } diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 43f9d8d055..941956ae2b 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -194,10 +194,13 @@ int32_t mndSendDropOrphanTasksMsg(SMnode *pMnode, SArray *pList) { return terrno; } - (void)tSerializeDropOrphanTaskMsg(pReq, contLen, &msg); + int32_t code = tSerializeDropOrphanTaskMsg(pReq, contLen, &msg); + if (code <= 0) { + mError("failed to serialize the drop orphan task msg, code:%s", tstrerror(code)); + } SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_DROP_ORPHANTASKS, .pCont = pReq, .contLen = contLen}; - int32_t code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); if (code) { mError("failed to put drop-orphan task msg into write queue, code:%s", tstrerror(code)); } else { @@ -216,7 +219,7 @@ int32_t mndProcessResetStatusReq(SRpcMsg *pReq) { mndKillTransImpl(pMnode, pMsg->transId, ""); streamMutexLock(&execInfo.lock); - (void) mndResetChkptReportInfo(execInfo.pChkptStreams, pMsg->streamId); + code = mndResetChkptReportInfo(execInfo.pChkptStreams, pMsg->streamId); // do thing if failed streamMutexUnlock(&execInfo.lock); code = mndGetStreamObj(pMnode, pMsg->streamId, &pStream); @@ -393,7 +396,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes); if (numOfUpdated > 0) { mDebug("%d stream node(s) need updated from hbMsg(vgId:%d)", numOfUpdated, req.vgId); - (void) setNodeEpsetExpiredFlag(req.pUpdateNodes); + int32_t unused = setNodeEpsetExpiredFlag(req.pUpdateNodes); } bool snodeChanged = false; diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index b2e35827af..bad44a8687 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -165,7 +165,10 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) { } char buf[256] = {0}; - (void)epsetToStr(&entry.epset, buf, tListLen(buf)); + code = epsetToStr(&entry.epset, buf, tListLen(buf)); + if (code != 0) { // print error and continue + mError("failed to convert epset to str, code:%s", tstrerror(code)); + } void *p = taosArrayPush(pVgroupList, &entry); if (p == NULL) { @@ -198,7 +201,10 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) { } char buf[256] = {0}; - (void)epsetToStr(&entry.epset, buf, tListLen(buf)); + code = epsetToStr(&entry.epset, buf, tListLen(buf)); + if (code != 0) { // print error and continue + mError("failed to convert epset to str, code:%s", tstrerror(code)); + } void *p = taosArrayPush(pVgroupList, &entry); if (p == NULL) { @@ -424,9 +430,12 @@ static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa } char buf[256] = {0}; - (void) epsetToStr(&epset, buf, tListLen(buf)); - mDebug("pause stream task in node:%d, epset:%s", pTask->info.nodeId, buf); + code = epsetToStr(&epset, buf, tListLen(buf)); + if (code != 0) { // print error and continue + mError("failed to convert epset to str, code:%s", tstrerror(code)); + } + mDebug("pause stream task in node:%d, epset:%s", pTask->info.nodeId, buf); code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID); if (code != 0) { taosMemoryFree(pReq); @@ -639,8 +648,7 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha static int32_t doSetUpdateTaskAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, SVgroupChangeInfo *pInfo) { void *pBuf = NULL; int32_t len = 0; - (void)streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList); - + bool unusedRet = streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList); int32_t code = doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id); if (code) { return code; @@ -914,8 +922,15 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { } // 2. remove stream entry in consensus hash table and checkpoint-report hash table - (void) mndClearConsensusCheckpointId(execInfo.pStreamConsensus, pStream->uid); - (void) mndClearChkptReportInfo(execInfo.pChkptStreams, pStream->uid); + code = mndClearConsensusCheckpointId(execInfo.pStreamConsensus, pStream->uid); + if (code) { + mError("failed to clear consensus checkpointId, code:%s", tstrerror(code)); + } + + code = mndClearChkptReportInfo(execInfo.pChkptStreams, pStream->uid); + if (code) { + mError("failed to clear the checkpoint report info, code:%s", tstrerror(code)); + } streamMutexUnlock(&pExecNode->lock); destroyStreamTaskIter(pIter); From 0cb4c927ec38d9a02e0511b34c5a919ecc5160db Mon Sep 17 00:00:00 2001 From: Jinqing Kuang Date: Wed, 11 Sep 2024 15:49:44 +0800 Subject: [PATCH 2/2] enh(query)[TD-31903]. Handle return values for function calls --- source/common/src/tdatablock.c | 52 ++++++++++++++---------- source/libs/executor/src/mergeoperator.c | 2 +- source/libs/executor/src/tlinearhash.c | 27 ++++++++---- source/libs/executor/src/tsort.c | 3 +- source/os/src/osFile.c | 3 +- source/os/src/osString.c | 3 +- source/util/src/tpagedbuf.c | 11 +++-- 7 files changed, 63 insertions(+), 38 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 08f6842c4b..22c4be5cc9 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2517,24 +2517,25 @@ int32_t dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf pDataBlock->info.id.groupId, pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version, pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey, pDataBlock->info.parTbName); if (len >= size - 1) { - return code; + goto _exit; } for (int32_t j = 0; j < rows; j++) { len += snprintf(dumpBuf + len, size - len, "%s|", flag); if (len >= size - 1) { - return code; + goto _exit; } for (int32_t k = 0; k < colNum; k++) { SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k); if (pColInfoData == NULL) { - return terrno; + code = terrno; + goto _exit; } if (colDataIsNull(pColInfoData, rows, j, NULL) || !pColInfoData->pData) { len += snprintf(dumpBuf + len, size - len, " %15s |", "NULL"); - if (len >= size - 1) return 0; + if (len >= size - 1) goto _exit; continue; } @@ -2542,53 +2543,53 @@ int32_t dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf switch (pColInfoData->info.type) { case TSDB_DATA_TYPE_TIMESTAMP: memset(pBuf, 0, sizeof(pBuf)); - (void) formatTimestamp(pBuf, *(uint64_t*)var, pColInfoData->info.precision); + (void)formatTimestamp(pBuf, *(uint64_t*)var, pColInfoData->info.precision); len += snprintf(dumpBuf + len, size - len, " %25s |", pBuf); - if (len >= size - 1) return 0; + if (len >= size - 1) goto _exit; break; case TSDB_DATA_TYPE_TINYINT: len += snprintf(dumpBuf + len, size - len, " %15d |", *(int8_t*)var); - if (len >= size - 1) return 0; + if (len >= size - 1) goto _exit; break; case TSDB_DATA_TYPE_UTINYINT: len += snprintf(dumpBuf + len, size - len, " %15d |", *(uint8_t*)var); - if (len >= size - 1) return 0; + if (len >= size - 1) goto _exit; break; case TSDB_DATA_TYPE_SMALLINT: len += snprintf(dumpBuf + len, size - len, " %15d |", *(int16_t*)var); - if (len >= size - 1) return 0; + if (len >= size - 1) goto _exit; break; case TSDB_DATA_TYPE_USMALLINT: len += snprintf(dumpBuf + len, size - len, " %15d |", *(uint16_t*)var); - if (len >= size - 1) return 0; + if (len >= size - 1) goto _exit; break; case TSDB_DATA_TYPE_INT: len += snprintf(dumpBuf + len, size - len, " %15d |", *(int32_t*)var); - if (len >= size - 1) return 0; + if (len >= size - 1) goto _exit; break; case TSDB_DATA_TYPE_UINT: len += snprintf(dumpBuf + len, size - len, " %15u |", *(uint32_t*)var); - if (len >= size - 1) return 0; + if (len >= size - 1) goto _exit; break; case TSDB_DATA_TYPE_BIGINT: len += snprintf(dumpBuf + len, size - len, " %15" PRId64 " |", *(int64_t*)var); - if (len >= size - 1) return 0; + if (len >= size - 1) goto _exit; break; case TSDB_DATA_TYPE_UBIGINT: len += snprintf(dumpBuf + len, size - len, " %15" PRIu64 " |", *(uint64_t*)var); - if (len >= size - 1) return 0; + if (len >= size - 1) goto _exit; break; case TSDB_DATA_TYPE_FLOAT: len += snprintf(dumpBuf + len, size - len, " %15f |", *(float*)var); - if (len >= size - 1) return 0; + if (len >= size - 1) goto _exit; break; case TSDB_DATA_TYPE_DOUBLE: len += snprintf(dumpBuf + len, size - len, " %15f |", *(double*)var); - if (len >= size - 1) return 0; + if (len >= size - 1) goto _exit; break; case TSDB_DATA_TYPE_BOOL: len += snprintf(dumpBuf + len, size - len, " %15d |", *(bool*)var); - if (len >= size - 1) return 0; + if (len >= size - 1) goto _exit; break; case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARBINARY: @@ -2599,24 +2600,33 @@ int32_t dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf dataSize = TMIN(dataSize, 50); memcpy(pBuf, varDataVal(pData), dataSize); len += snprintf(dumpBuf + len, size - len, " %15s |", pBuf); - if (len >= size - 1) return 0; + if (len >= size - 1) goto _exit; } break; case TSDB_DATA_TYPE_NCHAR: { char* pData = colDataGetVarData(pColInfoData, j); int32_t dataSize = TMIN(sizeof(pBuf), varDataLen(pData)); memset(pBuf, 0, sizeof(pBuf)); - (void)taosUcs4ToMbs((TdUcs4*)varDataVal(pData), dataSize, pBuf); + code = taosUcs4ToMbs((TdUcs4*)varDataVal(pData), dataSize, pBuf); + if (code < 0) { + uError("func %s failed to convert to ucs charset since %s", __func__, tstrerror(code)); + goto _exit; + } len += snprintf(dumpBuf + len, size - len, " %15s |", pBuf); - if (len >= size - 1) return 0; + if (len >= size - 1) goto _exit; } break; } } len += snprintf(dumpBuf + len, size - len, "%d\n", j); - if (len >= size - 1) return code; + if (len >= size - 1) goto _exit; } len += snprintf(dumpBuf + len, size - len, "%s |end\n", flag); *pDataBuf = dumpBuf; + dumpBuf = NULL; +_exit: + if (dumpBuf) { + taosMemoryFree(dumpBuf); + } return code; } diff --git a/source/libs/executor/src/mergeoperator.c b/source/libs/executor/src/mergeoperator.c index c94a330dbc..49973ac373 100644 --- a/source/libs/executor/src/mergeoperator.c +++ b/source/libs/executor/src/mergeoperator.c @@ -229,7 +229,7 @@ int32_t doSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { resetLimitInfoForNextGroup(&pInfo->limitInfo); } - (void)applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo); + bool limitReached = applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo); if (p->info.rows > 0) { break; diff --git a/source/libs/executor/src/tlinearhash.c b/source/libs/executor/src/tlinearhash.c index 763fc6a412..69ce50e150 100644 --- a/source/libs/executor/src/tlinearhash.c +++ b/source/libs/executor/src/tlinearhash.c @@ -150,10 +150,11 @@ static void doRemoveFromBucket(SFilePage* pPage, SLHashNode* pNode, SLHashBucket pBucket->size -= 1; } -static void doTrimBucketPages(SLHashObj* pHashObj, SLHashBucket* pBucket) { +static int32_t doTrimBucketPages(SLHashObj* pHashObj, SLHashBucket* pBucket) { + int32_t code = 0; size_t numOfPages = taosArrayGetSize(pBucket->pPageIdList); if (numOfPages <= 1) { - return; + return code; } int32_t* firstPage = taosArrayGet(pBucket->pPageIdList, 0); @@ -164,11 +165,14 @@ static void doTrimBucketPages(SLHashObj* pHashObj, SLHashBucket* pBucket) { if (pLast->num <= sizeof(SFilePage)) { // this is empty - // TODO check ret - (void)dBufSetBufPageRecycled(pHashObj->pBuf, pLast); + code = dBufSetBufPageRecycled(pHashObj->pBuf, pLast); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed to recycle buf page since %s", __func__, tstrerror(code)); + return code; + } releaseBufPage(pHashObj->pBuf, pFirst); taosArrayRemove(pBucket->pPageIdList, numOfPages - 1); - return; + return code; } char* pStart = pLast->data; @@ -191,8 +195,11 @@ static void doTrimBucketPages(SLHashObj* pHashObj, SLHashBucket* pBucket) { pStart += nodeSize; if (pLast->num <= sizeof(SFilePage)) { // this is empty - // TODO check ret - (void)dBufSetBufPageRecycled(pHashObj->pBuf, pLast); + code = dBufSetBufPageRecycled(pHashObj->pBuf, pLast); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed to recycle buf page since %s", __func__, tstrerror(code)); + return code; + } releaseBufPage(pHashObj->pBuf, pFirst); taosArrayRemove(pBucket->pPageIdList, numOfPages - 1); break; @@ -210,6 +217,7 @@ static void doTrimBucketPages(SLHashObj* pHashObj, SLHashBucket* pBucket) { break; } } + return code; } static int32_t doAddNewBucket(SLHashObj* pHashObj) { @@ -403,7 +411,10 @@ int32_t tHashPut(SLHashObj* pHashObj, const void* key, size_t keyLen, void* data releaseBufPage(pHashObj->pBuf, p); } - doTrimBucketPages(pHashObj, pBucket); + code = doTrimBucketPages(pHashObj, pBucket); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } return code; diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 796ebbeb84..19b825b0ca 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1308,8 +1308,7 @@ static int32_t getRowBufFromExtMemFile(SSortHandle* pHandle, int32_t regionId, i return terrno; } - // todo - (void)taosSeekCFile(pMemFile->pTdFile, pRegion->fileOffset, SEEK_SET); + TAOS_CHECK_RETURN(taosSeekCFile(pMemFile->pTdFile, pRegion->fileOffset, SEEK_SET)); int32_t readBytes = TMIN(pMemFile->blockSize, pRegion->regionSize); int32_t ret = taosReadFromCFile(pRegion->buf, readBytes, 1, pMemFile->pTdFile); diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index 0e5b6b71a1..40f48af266 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -1535,8 +1535,9 @@ int taosSeekCFile(FILE *file, int64_t offset, int whence) { int code = fseeko(file, offset, whence); if (-1 == code) { terrno = TAOS_SYSTEM_ERROR(errno); + code = terrno; } - return terrno; + return code; #endif } diff --git a/source/os/src/osString.c b/source/os/src/osString.c index ffc64f3493..b1732a2ae1 100644 --- a/source/os/src/osString.c +++ b/source/os/src/osString.c @@ -351,7 +351,8 @@ int32_t taosUcs4ToMbs(TdUcs4 *ucs4, int32_t ucs4_max_len, char *mbs) { int32_t code = 0; iconv_t conv = taosAcquireConv(&idx, C2M); if ((iconv_t)-1 == conv || (iconv_t)0 == conv) { - return TSDB_CODE_APP_ERROR; + code = TAOS_SYSTEM_ERROR(errno);; + return code; } size_t ucs4_input_len = ucs4_max_len; diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index f22233a757..e8303b563e 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -316,7 +316,7 @@ static char* evictBufPage(SDiskbasedBuf* pBuf) { } terrno = 0; - (void)tdListPopNode(pBuf->lruList, pn); + pn = tdListPopNode(pBuf->lruList, pn); SPageInfo* d = *(SPageInfo**)pn->data; @@ -337,7 +337,7 @@ static int32_t lruListPushFront(SList* pList, SPageInfo* pi) { } static void lruListMoveToFront(SList* pList, SPageInfo* pi) { - (void)tdListPopNode(pList, pi->pn); + pi->pn = tdListPopNode(pList, pi->pn); tdListPrependNode(pList, pi->pn); } @@ -474,8 +474,11 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) { pBuf->totalBufSize += pBuf->pageSize; } else { taosMemoryFree(availablePage); - (void)taosArrayPop(pBuf->pIdList); - (void)tSimpleHashRemove(pBuf->all, pageId, sizeof(int32_t)); + SPageInfo **pLast = taosArrayPop(pBuf->pIdList); + int32_t ret = tSimpleHashRemove(pBuf->all, pageId, sizeof(int32_t)); + if (ret != TSDB_CODE_SUCCESS) { + uError("%s failed to clear pageId %d from buf hash-set since %s", __func__, *pageId, tstrerror(ret)); + } taosMemoryFree(pi); terrno = code; return NULL;