diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index cbe631912c..e0b8caa938 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2221,7 +2221,11 @@ static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeLi } char buf[256] = {0}; - (void) epsetToStr(&pEntry->epset, buf, tListLen(buf)); // ignore this error since it is only for log file + int32_t ret = epsetToStr(&pEntry->epset, buf, tListLen(buf)); // ignore this error since it is only for log file + if (ret != 0) { // print error and continue + mError("failed to convert epset to str, code:%s", tstrerror(ret)); + } + mDebug("extract nodeInfo from stream obj, nodeId:%d, %s", pEntry->nodeId, buf); } @@ -2231,7 +2235,7 @@ static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeLi return code; } -static int32_t addAllDbsIntoHashmap(SHashObj *pDBMap, SSdb *pSdb) { +static void addAllDbsIntoHashmap(SHashObj *pDBMap, SSdb *pSdb) { void *pIter = NULL; int32_t code = 0; while (1) { @@ -2249,8 +2253,6 @@ static int32_t addAllDbsIntoHashmap(SHashObj *pDBMap, SSdb *pSdb) { mDebug("add Db:%s into Dbs list (total:%d) for kill checkpoint trans", pVgroup->dbName, size); } } - - return code; } // this function runs by only one thread, so it is not multi-thread safe @@ -2311,7 +2313,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { mInfo("rollback all stream due to mnode leader/follower switch by using nodeUpdate trans"); updateAllVgroups = true; execInfo.switchFromFollower = false; // reset the flag - (void) addAllDbsIntoHashmap(changeInfo.pDBMap, pMnode->pSdb); + addAllDbsIntoHashmap(changeInfo.pDBMap, pMnode->pSdb); } } diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 43f9d8d055..941956ae2b 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -194,10 +194,13 @@ int32_t mndSendDropOrphanTasksMsg(SMnode *pMnode, SArray *pList) { return terrno; } - (void)tSerializeDropOrphanTaskMsg(pReq, contLen, &msg); + int32_t code = tSerializeDropOrphanTaskMsg(pReq, contLen, &msg); + if (code <= 0) { + mError("failed to serialize the drop orphan task msg, code:%s", tstrerror(code)); + } SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_DROP_ORPHANTASKS, .pCont = pReq, .contLen = contLen}; - int32_t code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); if (code) { mError("failed to put drop-orphan task msg into write queue, code:%s", tstrerror(code)); } else { @@ -216,7 +219,7 @@ int32_t mndProcessResetStatusReq(SRpcMsg *pReq) { mndKillTransImpl(pMnode, pMsg->transId, ""); streamMutexLock(&execInfo.lock); - (void) mndResetChkptReportInfo(execInfo.pChkptStreams, pMsg->streamId); + code = mndResetChkptReportInfo(execInfo.pChkptStreams, pMsg->streamId); // do thing if failed streamMutexUnlock(&execInfo.lock); code = mndGetStreamObj(pMnode, pMsg->streamId, &pStream); @@ -393,7 +396,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes); if (numOfUpdated > 0) { mDebug("%d stream node(s) need updated from hbMsg(vgId:%d)", numOfUpdated, req.vgId); - (void) setNodeEpsetExpiredFlag(req.pUpdateNodes); + int32_t unused = setNodeEpsetExpiredFlag(req.pUpdateNodes); } bool snodeChanged = false; diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index b2e35827af..bad44a8687 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -165,7 +165,10 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) { } char buf[256] = {0}; - (void)epsetToStr(&entry.epset, buf, tListLen(buf)); + code = epsetToStr(&entry.epset, buf, tListLen(buf)); + if (code != 0) { // print error and continue + mError("failed to convert epset to str, code:%s", tstrerror(code)); + } void *p = taosArrayPush(pVgroupList, &entry); if (p == NULL) { @@ -198,7 +201,10 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) { } char buf[256] = {0}; - (void)epsetToStr(&entry.epset, buf, tListLen(buf)); + code = epsetToStr(&entry.epset, buf, tListLen(buf)); + if (code != 0) { // print error and continue + mError("failed to convert epset to str, code:%s", tstrerror(code)); + } void *p = taosArrayPush(pVgroupList, &entry); if (p == NULL) { @@ -424,9 +430,12 @@ static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa } char buf[256] = {0}; - (void) epsetToStr(&epset, buf, tListLen(buf)); - mDebug("pause stream task in node:%d, epset:%s", pTask->info.nodeId, buf); + code = epsetToStr(&epset, buf, tListLen(buf)); + if (code != 0) { // print error and continue + mError("failed to convert epset to str, code:%s", tstrerror(code)); + } + mDebug("pause stream task in node:%d, epset:%s", pTask->info.nodeId, buf); code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID); if (code != 0) { taosMemoryFree(pReq); @@ -639,8 +648,7 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha static int32_t doSetUpdateTaskAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, SVgroupChangeInfo *pInfo) { void *pBuf = NULL; int32_t len = 0; - (void)streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList); - + bool unusedRet = streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList); int32_t code = doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id); if (code) { return code; @@ -914,8 +922,15 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { } // 2. remove stream entry in consensus hash table and checkpoint-report hash table - (void) mndClearConsensusCheckpointId(execInfo.pStreamConsensus, pStream->uid); - (void) mndClearChkptReportInfo(execInfo.pChkptStreams, pStream->uid); + code = mndClearConsensusCheckpointId(execInfo.pStreamConsensus, pStream->uid); + if (code) { + mError("failed to clear consensus checkpointId, code:%s", tstrerror(code)); + } + + code = mndClearChkptReportInfo(execInfo.pChkptStreams, pStream->uid); + if (code) { + mError("failed to clear the checkpoint report info, code:%s", tstrerror(code)); + } streamMutexUnlock(&pExecNode->lock); destroyStreamTaskIter(pIter);