From 533e3f0da61eb9450752aa8c18621532bda34241 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Tue, 23 Jul 2024 07:40:00 +0000 Subject: [PATCH 1/7] refactor tq backend --- source/dnode/vnode/src/tq/tqStreamTaskSnap.c | 38 +++--- source/libs/stream/inc/streamBackendRocksdb.h | 8 +- source/libs/stream/src/streamBackendRocksdb.c | 83 ++++++------ source/libs/stream/src/streamMeta.c | 119 +++++++++--------- source/libs/stream/test/backendTest.cpp | 6 +- 5 files changed, 130 insertions(+), 124 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c index dda5173ad9..c7beee6e8a 100644 --- a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c @@ -39,13 +39,15 @@ int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa // alloc pReader = (SStreamTaskReader*)taosMemoryCalloc(1, sizeof(SStreamTaskReader)); if (pReader == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TAOS_CHECK_RETURN(TSDB_CODE_OUT_OF_MEMORY); } pReader->pTq = pTq; pReader->sver = sver; pReader->ever = ever; pReader->tdbTbList = taosArrayInit(4, sizeof(STablePair)); + if (pReader->tdbTbList == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _err); + } STablePair pair1 = {.tbl = pTq->pStreamMeta->pTaskDb, .type = SNAP_DATA_STREAM_TASK}; taosArrayPush(pReader->tdbTbList, &pair1); @@ -60,16 +62,14 @@ int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa if (code) { tqInfo("vgId:%d, vnode stream-task snapshot reader failed to open, reason: %s", TD_VID(pTq->pVnode), tstrerror(code)); - taosMemoryFree(pReader); - goto _err; + TAOS_CHECK_GOTO(code, NULL, _err); } code = tdbTbcMoveToFirst(pReader->pCur); if (code) { tqInfo("vgId:%d, vnode stream-task snapshot reader failed to iterate, reason: %s", TD_VID(pTq->pVnode), tstrerror(code)); - taosMemoryFree(pReader); - goto _err; + TAOS_CHECK_GOTO(code, NULL, _err); } tqDebug("vgId:%d, vnode stream-task snapshot reader opened", TD_VID(pTq->pVnode)); @@ -79,11 +79,14 @@ int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa _err: tqError("vgId:%d, vnode stream-task snapshot reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); + streamTaskSnapReaderClose(pReader); *ppReader = NULL; return code; } int32_t streamTaskSnapReaderClose(SStreamTaskReader* pReader) { + if (pReader == NULL) return 0; + int32_t code = 0; tqInfo("vgId:%d, vnode stream-task snapshot reader closed", TD_VID(pReader->pTq->pVnode)); taosArrayDestroy(pReader->tdbTbList); @@ -116,6 +119,10 @@ NextTbl: break; } else { pVal = taosMemoryCalloc(1, tLen); + if (pVal == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } memcpy(pVal, tVal, tLen); vLen = tLen; } @@ -174,8 +181,7 @@ int32_t streamTaskSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa // alloc pWriter = (SStreamTaskWriter*)taosMemoryCalloc(1, sizeof(*pWriter)); if (pWriter == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TAOS_CHECK_RETURN(TSDB_CODE_OUT_OF_MEMORY); } pWriter->pTq = pTq; pWriter->sver = sver; @@ -184,12 +190,6 @@ int32_t streamTaskSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa *ppWriter = pWriter; tqDebug("vgId:%d, vnode stream-task snapshot writer opened", TD_VID(pTq->pVnode)); return code; - -_err: - tqError("vgId:%d, vnode stream-task snapshot writer failed to write since %s", TD_VID(pTq->pVnode), tstrerror(code)); - *ppWriter = NULL; - return code; - return 0; } int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback) { @@ -207,8 +207,7 @@ int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback) { if (code) goto _err; } - if (tdbBegin(pTq->pStreamMeta->db, &pTq->pStreamMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { - code = -1; + if ((code = tdbBegin(pTq->pStreamMeta->db, &pTq->pStreamMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0)) < 0) { taosMemoryFree(pWriter); goto _err; } @@ -241,10 +240,11 @@ int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t int64_t key[2] = {taskId.streamId, taskId.taskId}; streamMetaWLock(pTq->pStreamMeta); - if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, key, sizeof(int64_t) << 1, (uint8_t*)pData + sizeof(SSnapDataHdr), - nData - sizeof(SSnapDataHdr), pTq->pStreamMeta->txn) < 0) { + if ((code = + tdbTbUpsert(pTq->pStreamMeta->pTaskDb, key, sizeof(int64_t) << 1, (uint8_t*)pData + sizeof(SSnapDataHdr), + nData - sizeof(SSnapDataHdr), pTq->pStreamMeta->txn)) < 0) { streamMetaWUnLock(pTq->pStreamMeta); - return -1; + return code; } streamMetaWUnLock(pTq->pStreamMeta); } else if (pHdr->type == SNAP_DATA_STREAM_TASK_CHECKPOINT) { diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index c3eeb12209..0f158591b4 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -143,9 +143,9 @@ SListNode* streamBackendAddCompare(void* backend, void* arg); void streamBackendDelCompare(void* backend, void* arg); int32_t streamStateCvtDataFormat(char* path, char* key, void* cfInst); -STaskDbWrapper* taskDbOpen(const char* path, const char* key, int64_t chkptId, int64_t* processVer); -void taskDbDestroy(void* pBackend, bool flush); -void taskDbDestroy2(void* pBackend); +int32_t taskDbOpen(const char* path, const char* key, int64_t chkptId, int64_t* processVer, STaskDbWrapper** ppTaskDb); +void taskDbDestroy(void* pBackend, bool flush); +void taskDbDestroy2(void* pBackend); void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId); @@ -252,7 +252,7 @@ int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo); int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId, int64_t processId); -SBkdMgt* bkdMgtCreate(char* path); +int32_t bkdMgtCreate(char* path, SBkdMgt **bm); int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path); int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, char* name); int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 869877c9a8..21b71add82 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -2525,16 +2525,14 @@ _EXIT: return NULL; } -STaskDbWrapper* taskDbOpen(const char* path, const char* key, int64_t chkptId, int64_t* processVer) { +int32_t taskDbOpen(const char* path, const char* key, int64_t chkptId, int64_t* processVer, STaskDbWrapper** ppTaskDb) { char* statePath = NULL; char* dbPath = NULL; int code = 0; - terrno = 0; if ((code = restoreCheckpointData(path, key, chkptId, &statePath, &dbPath, processVer)) < 0) { - terrno = code; stError("failed to restore checkpoint data, path:%s, key:%s, checkpointId: %" PRId64 "reason:%s", path, key, - chkptId, tstrerror(terrno)); - return NULL; + chkptId, tstrerror(code)); + return code; } STaskDbWrapper* pTaskDb = taskDbOpenImpl(key, statePath, dbPath); @@ -2543,17 +2541,17 @@ STaskDbWrapper* taskDbOpen(const char* path, const char* key, int64_t chkptId, i if ((code = chkpLoadExtraInfo(dbPath, &chkpId, &ver) == 0)) { *processVer = ver; } else { - terrno = code; stError("failed to load extra info, path:%s, key:%s, checkpointId: %" PRId64 "reason:%s", path, key, chkptId, - tstrerror(terrno)); + tstrerror(code)); taskDbDestroy(pTaskDb, false); - return NULL; + return code; } } taosMemoryFree(dbPath); taosMemoryFree(statePath); - return pTaskDb; + *ppTaskDb = pTaskDb; + return code; } void taskDbDestroy(void* pDb, bool flush) { @@ -2794,8 +2792,10 @@ int32_t streamStateCvtDataFormat(char* path, char* key, void* pCfInst) { int32_t code = 0; int64_t processVer = -1; - STaskDbWrapper* pTaskDb = taskDbOpen(path, key, 0, &processVer); - RocksdbCfInst* pSrcBackend = pCfInst; + STaskDbWrapper* pTaskDb = NULL; + + code = taskDbOpen(path, key, 0, &processVer, &pTaskDb); + RocksdbCfInst* pSrcBackend = pCfInst; for (int i = 0; i < nCf; i++) { rocksdb_column_family_handle_t* pSrcCf = pSrcBackend->pHandle[i]; @@ -4626,10 +4626,11 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { void dbChkpDestroy(SDbChkp* pChkp); -SDbChkp* dbChkpCreate(char* path, int64_t initChkpId) { +int32_t dbChkpCreate(char* path, int64_t initChkpId, SDbChkp** ppChkp) { + int32_t code = 0; SDbChkp* p = taosMemoryCalloc(1, sizeof(SDbChkp)); if (p == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; goto _EXIT; } @@ -4637,41 +4638,41 @@ SDbChkp* dbChkpCreate(char* path, int64_t initChkpId) { p->preCkptId = -1; p->pSST = taosArrayInit(64, sizeof(void*)); if (p->pSST == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; dbChkpDestroy(p); - return NULL; + return code; } p->path = path; p->len = strlen(path) + 128; p->buf = taosMemoryCalloc(1, p->len); if (p->buf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; goto _EXIT; } p->idx = 0; p->pSstTbl[0] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); if (p->pSstTbl[0] == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; goto _EXIT; } p->pSstTbl[1] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); if (p->pSstTbl[1] == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; goto _EXIT; } p->pAdd = taosArrayInit(64, sizeof(void*)); if (p->pAdd == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; goto _EXIT; } p->pDel = taosArrayInit(64, sizeof(void*)); if (p->pDel == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; goto _EXIT; } @@ -4679,15 +4680,15 @@ SDbChkp* dbChkpCreate(char* path, int64_t initChkpId) { taosThreadRwlockInit(&p->rwLock, NULL); SArray* list = NULL; - int32_t code = dbChkpGetDelta(p, initChkpId, list); + code = dbChkpGetDelta(p, initChkpId, list); if (code != 0) { goto _EXIT; } - - return p; + *ppChkp = p; + return code; _EXIT: dbChkpDestroy(p); - return NULL; + return code; } void dbChkpDestroy(SDbChkp* pChkp) { @@ -4880,35 +4881,36 @@ _ERROR: return code; } -SBkdMgt* bkdMgtCreate(char* path) { - terrno = 0; +int32_t bkdMgtCreate(char* path, SBkdMgt** mgt) { + int32_t code = 0; SBkdMgt* p = taosMemoryCalloc(1, sizeof(SBkdMgt)); if (p == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + code = TSDB_CODE_OUT_OF_MEMORY; + return code; } p->pDbChkpTbl = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (p->pDbChkpTbl == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; bkdMgtDestroy(p); - return NULL; + return code; } p->path = taosStrdup(path); if (p->path == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; bkdMgtDestroy(p); - return NULL; + return code; } if (taosThreadRwlockInit(&p->rwLock, NULL) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); + code = TAOS_SYSTEM_ERROR(errno); bkdMgtDestroy(p); - return NULL; + return code; } + *mgt = p; - return p; + return code; } void bkdMgtDestroy(SBkdMgt* bm) { @@ -4949,11 +4951,11 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, return code; } - SDbChkp* p = dbChkpCreate(path, chkpId); - if (p == NULL) { + SDbChkp* p = NULL; + code = dbChkpCreate(path, chkpId, &p); + if (code != 0) { taosMemoryFree(path); taosThreadRwlockUnlock(&bm->rwLock); - code = terrno; return code; } @@ -4986,8 +4988,9 @@ int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path) { taosThreadRwlockWrlock(&bm->rwLock); SDbChkp** pp = taosHashGet(bm->pDbChkpTbl, task, strlen(task)); if (pp == NULL) { - SDbChkp* p = dbChkpCreate(path, 0); - if (p != NULL) { + SDbChkp* p = NULL; + code = dbChkpCreate(path, 0, &p); + if (code != 0) { taosHashPut(bm->pDbChkpTbl, task, strlen(task), &p, sizeof(void*)); code = 0; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 1283f8e20b..69a9d12f79 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -68,12 +68,12 @@ static void streamMetaEnvInit() { } } -void streamMetaInit() { (void) taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); } +void streamMetaInit() { (void)taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); } void streamMetaCleanup() { - (void) taosCloseRef(streamBackendId); - (void) taosCloseRef(streamBackendCfWrapperId); - (void) taosCloseRef(streamMetaId); + (void)taosCloseRef(streamBackendId); + (void)taosCloseRef(streamBackendCfWrapperId); + (void)taosCloseRef(streamMetaId); metaRefMgtCleanup(); streamTimerCleanUp(); @@ -128,12 +128,12 @@ int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid) { code = taosHashPut(gMetaRefMgt.pTable, &vgId, sizeof(vgId), &list, sizeof(void*)); if (code) { - stError("vgId:%d failed to put into metaRef table, rid:%" PRId64, (int32_t) vgId, *rid); + stError("vgId:%d failed to put into metaRef table, rid:%" PRId64, (int32_t)vgId, *rid); return code; } } else { SArray* list = *(SArray**)p; - void* px = taosArrayPush(list, &rid); + void* px = taosArrayPush(list, &rid); if (px == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; } @@ -186,7 +186,7 @@ int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) { code = tdbTbcMoveToFirst(pCur); if (code) { - (void) tdbTbcClose(pCur); + (void)tdbTbcClose(pCur); stError("vgId:%d failed to open stream meta file cursor, not perform compatible check", pMeta->vgId); return ret; } @@ -215,7 +215,7 @@ int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) { tdbFree(pKey); tdbFree(pVal); - (void) tdbTbcClose(pCur); + (void)tdbTbcClose(pCur); return ret; } @@ -276,6 +276,7 @@ int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) { } int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key) { + int32_t code = 0; int64_t chkpId = pTask->chkInfo.checkpointId; streamMutexLock(&pMeta->backendMutex); @@ -299,8 +300,8 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key) STaskDbWrapper* pBackend = NULL; int64_t processVer = -1; while (1) { - pBackend = taskDbOpen(pMeta->path, key, chkpId, &processVer); - if (pBackend != NULL) { + code = taskDbOpen(pMeta->path, key, chkpId, &processVer, &pBackend); + if (code == 0) { break; } @@ -319,7 +320,7 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key) if (processVer != -1) pTask->chkInfo.processedVer = processVer; - int32_t code = taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pBackend, sizeof(void*)); + code = taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pBackend, sizeof(void*)); if (code) { stError("s-task:0x%x failed to put taskDb backend, code:out of memory", pTask->id.taskId); } @@ -469,8 +470,8 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL); - pMeta->bkdChkptMgt = bkdMgtCreate(tpath); - if (pMeta->bkdChkptMgt == NULL) { + code = bkdMgtCreate(tpath, (SBkdMgt**)&pMeta->bkdChkptMgt); + if (code != 0) { goto _err; } @@ -485,7 +486,7 @@ _err: if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList); if (pMeta->pTaskDb) (void)tdbTbClose(pMeta->pTaskDb); if (pMeta->pCheckpointDb) (void)tdbTbClose(pMeta->pCheckpointDb); - if (pMeta->db) (void) tdbClose(pMeta->db); + if (pMeta->db) (void)tdbClose(pMeta->db); if (pMeta->pHbInfo) taosMemoryFreeClear(pMeta->pHbInfo); if (pMeta->updateInfo.pTasks) taosHashCleanup(pMeta->updateInfo.pTasks); if (pMeta->startInfo.pReadyTaskSet) taosHashCleanup(pMeta->startInfo.pReadyTaskSet); @@ -531,7 +532,7 @@ void streamMetaClear(SStreamMeta* pMeta) { // release the ref by timer if (p->info.delaySchedParam != 0 && p->info.fillHistory == 0) { // one more ref in timer stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", p->id.idStr, p->refCnt); - (void) taosTmrStop(p->schedInfo.pDelayTimer); + (void)taosTmrStop(p->schedInfo.pDelayTimer); p->info.delaySchedParam = 0; streamMetaReleaseTask(pMeta, p); } @@ -541,7 +542,7 @@ void streamMetaClear(SStreamMeta* pMeta) { int32_t code = taosRemoveRef(streamBackendId, pMeta->streamBackendRid); if (code) { - stError("vgId:%d remove stream backend Ref failed, rid:%"PRId64, pMeta->vgId, pMeta->streamBackendRid); + stError("vgId:%d remove stream backend Ref failed, rid:%" PRId64, pMeta->vgId, pMeta->streamBackendRid); } taosHashClear(pMeta->pTasksMap); @@ -564,7 +565,7 @@ void streamMetaClose(SStreamMeta* pMeta) { if (pMeta == NULL) { return; } - (void) taosRemoveRef(streamMetaId, pMeta->rid); + (void)taosRemoveRef(streamMetaId, pMeta->rid); } void streamMetaCloseImpl(void* arg) { @@ -581,10 +582,10 @@ void streamMetaCloseImpl(void* arg) { streamMetaWUnLock(pMeta); // already log the error, ignore here - (void) tdbAbort(pMeta->db, pMeta->txn); - (void) tdbTbClose(pMeta->pTaskDb); - (void) tdbTbClose(pMeta->pCheckpointDb); - (void) tdbClose(pMeta->db); + (void)tdbAbort(pMeta->db, pMeta->txn); + (void)tdbTbClose(pMeta->pTaskDb); + (void)tdbTbClose(pMeta->pCheckpointDb); + (void)tdbClose(pMeta->db); taosArrayDestroy(pMeta->pTaskList); taosArrayDestroy(pMeta->chkpSaved); @@ -608,7 +609,7 @@ void streamMetaCloseImpl(void* arg) { bkdMgtDestroy(pMeta->bkdChkptMgt); pMeta->role = NODE_ROLE_UNINIT; - (void) taosThreadRwlockDestroy(&pMeta->lock); + (void)taosThreadRwlockDestroy(&pMeta->lock); taosMemoryFree(pMeta); stDebug("vgId:%d end to close stream meta", vgId); @@ -689,13 +690,13 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa p = taosArrayPush(pMeta->pTaskList, &pTask->id); if (p == NULL) { - stError("s-task:0x%"PRIx64" failed to register task into meta-list, code: out of memory", id.taskId); + stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", id.taskId); return TSDB_CODE_OUT_OF_MEMORY; } code = taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES); if (code) { - stError("s-task:0x%"PRIx64" failed to register task into meta-list, code: out of memory", id.taskId); + stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", id.taskId); return code; } @@ -708,7 +709,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa } if (pTask->info.fillHistory == 0) { - (void) atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1); + (void)atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1); } *pAdded = true; @@ -777,7 +778,7 @@ static void doRemoveIdFromList(SArray* pTaskList, int32_t num, SStreamTaskId* id static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - (void) streamTaskSendCheckpointSourceRsp(pTask); + (void)streamTaskSendCheckpointSourceRsp(pTask); } return 0; } @@ -800,7 +801,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t } // handle the dropping event - (void) streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL); + (void)streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL); } else { stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId); streamMetaWUnLock(pMeta); @@ -839,12 +840,12 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t pTask = *ppTask; // it is an fill-history task, remove the related stream task's id that points to it if (pTask->info.fillHistory == 0) { - (void) atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1); + (void)atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1); } - (void) taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); + (void)taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); doRemoveIdFromList(pMeta->pTaskList, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id); - (void) streamMetaRemoveTask(pMeta, &id); + (void)streamMetaRemoveTask(pMeta, &id); ASSERT(taosHashGetSize(pMeta->pTasksMap) == taosArrayGetSize(pMeta->pTaskList)); streamMetaWUnLock(pMeta); @@ -852,7 +853,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t ASSERT(pTask->status.timerActive == 0); if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) { stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", pTask->id.idStr, pTask->refCnt); - (void) taosTmrStop(pTask->schedInfo.pDelayTimer); + (void)taosTmrStop(pTask->schedInfo.pDelayTimer); pTask->info.delaySchedParam = 0; streamMetaReleaseTask(pMeta, pTask); } @@ -913,7 +914,7 @@ int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) { code = tdbTbcMoveToFirst(pCur); if (code) { - (void) tdbTbcClose(pCur); + (void)tdbTbcClose(pCur); stError("failed to open stream meta file cursor, the latest checkpointId is 0, vgId:%d", pMeta->vgId); return checkpointId; } @@ -951,7 +952,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { SDecoder decoder; int32_t vgId = 0; int32_t code = 0; - SArray* pRecycleList = NULL; + SArray* pRecycleList = NULL; if (pMeta == NULL) { return; @@ -973,7 +974,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { if (code) { stError("vgId:%d failed to open stream meta cursor, code:%s, not load any stream tasks", vgId, tstrerror(terrno)); taosArrayDestroy(pRecycleList); - (void) tdbTbcClose(pCur); + (void)tdbTbcClose(pCur); return; } @@ -1006,7 +1007,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { tFreeStreamTask(pTask); STaskId id = streamTaskGetTaskId(pTask); - (void) taosArrayPush(pRecycleList, &id); + (void)taosArrayPush(pRecycleList, &id); int32_t total = taosArrayGetSize(pRecycleList); stDebug("s-task:0x%x is already dropped, add into recycle list, total:%d", taskId, total); @@ -1027,7 +1028,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { continue; } - (void) taosArrayPush(pMeta->pTaskList, &pTask->id); + (void)taosArrayPush(pMeta->pTaskList, &pTask->id); } else { // todo this should replace the existed object put by replay creating stream task msg from mnode stError("s-task:0x%x already added into table meta by replaying WAL, need check", pTask->id.taskId); @@ -1037,17 +1038,17 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { if (taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES) != 0) { stError("s-task:0x%x failed to put into hashTable, code:%s, continue", pTask->id.taskId, tstrerror(terrno)); - (void) taosArrayPop(pMeta->pTaskList); + (void)taosArrayPop(pMeta->pTaskList); tFreeStreamTask(pTask); continue; } if (pTask->info.fillHistory == 0) { - (void) atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1); + (void)atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1); } if (streamTaskShouldPause(pTask)) { - (void) atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1); + (void)atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1); } ASSERT(pTask->status.downstreamReady == 0); @@ -1063,7 +1064,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { if (taosArrayGetSize(pRecycleList) > 0) { for (int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) { STaskId* pId = taosArrayGet(pRecycleList, i); - (void) streamMetaRemoveTask(pMeta, pId); + (void)streamMetaRemoveTask(pMeta, pId); } } @@ -1091,7 +1092,7 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta) { SStreamTask* pTask = *(SStreamTask**)pIter; if (pTask->status.timerActive >= 1) { stDebug("s-task:%s in timer, blocking tasks in vgId:%d restart, set closing again", pTask->id.idStr, pMeta->vgId); - (void) streamTaskStop(pTask); + (void)streamTaskStop(pTask); inTimer = true; } } @@ -1124,7 +1125,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { SStreamTask* pTask = *(SStreamTask**)pIter; stDebug("vgId:%d s-task:%s set task closing flag", vgId, pTask->id.idStr); - (void) streamTaskStop(pTask); + (void)streamTaskStop(pTask); } streamMetaWUnLock(pMeta); @@ -1171,7 +1172,7 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo, int32_t vgId) { void streamMetaRLock(SStreamMeta* pMeta) { // stTrace("vgId:%d meta-rlock", pMeta->vgId); - (void) taosThreadRwlockRdlock(&pMeta->lock); + (void)taosThreadRwlockRdlock(&pMeta->lock); } void streamMetaRUnLock(SStreamMeta* pMeta) { @@ -1186,13 +1187,13 @@ void streamMetaRUnLock(SStreamMeta* pMeta) { void streamMetaWLock(SStreamMeta* pMeta) { // stTrace("vgId:%d meta-wlock", pMeta->vgId); - (void) taosThreadRwlockWrlock(&pMeta->lock); + (void)taosThreadRwlockWrlock(&pMeta->lock); // stTrace("vgId:%d meta-wlock completed", pMeta->vgId); } void streamMetaWUnLock(SStreamMeta* pMeta) { // stTrace("vgId:%d meta-wunlock", pMeta->vgId); - (void) taosThreadRwlockUnlock(&pMeta->lock); + (void)taosThreadRwlockUnlock(&pMeta->lock); } int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) { @@ -1318,7 +1319,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask); if (pTask == NULL) { stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId); - (void) streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId); + (void)streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId); continue; } @@ -1341,7 +1342,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask); if (pTask == NULL) { stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId); - (void) streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId); + (void)streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId); continue; } @@ -1359,10 +1360,11 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { stDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task", pTask->id.idStr); - (void) streamLaunchFillHistoryTask(pTask); // todo: how about retry launch fill-history task? + (void)streamLaunchFillHistoryTask(pTask); // todo: how about retry launch fill-history task? } - (void) streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs, pInfo->readyTs, true); + (void)streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs, pInfo->readyTs, + true); streamMetaReleaseTask(pMeta, pTask); continue; } @@ -1418,14 +1420,14 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) { for (int32_t i = 0; i < numOfTasks; ++i) { SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); - SStreamTask* pTask = NULL; + SStreamTask* pTask = NULL; code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask); if (code != TSDB_CODE_SUCCESS) { continue; } - (void) streamTaskStop(pTask); + (void)streamTaskStop(pTask); streamMetaReleaseTask(pMeta, pTask); } @@ -1465,7 +1467,7 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask); if (pTask == NULL) { stError("vgId:%d failed to acquire task:0x%x when starting task", pMeta->vgId, taskId); - (void) streamMetaAddFailedTask(pMeta, streamId, taskId); + (void)streamMetaAddFailedTask(pMeta, streamId, taskId); return TSDB_CODE_STREAM_TASK_IVLD_STATUS; } @@ -1556,9 +1558,8 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3 SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet; STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready}; - int32_t code = taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs)); + int32_t code = taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs)); if (code) { - } int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta); @@ -1630,9 +1631,9 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta streamMetaRUnLock(pMeta); // add the failed task info, along with the related fill-history task info into tasks list. - (void) streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false); + (void)streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false); if (hasFillhistoryTask) { - (void) streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false); + (void)streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false); } } else { streamMetaRUnLock(pMeta); @@ -1647,12 +1648,12 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs) { int32_t startTs = pTask->execInfo.checkTs; - (void) streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs, false); + (void)streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs, false); // automatically set the related fill-history task to be failed. if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { STaskId* pId = &pTask->hTaskInfo.id; - (void) streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false); + (void)streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false); } } @@ -1660,7 +1661,7 @@ void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SSt int64_t startTs) { const char* id = pTask->id.idStr; int32_t vgId = pTask->pMeta->vgId; - int32_t code = 0; + int32_t code = 0; // keep the already updated info STaskUpdateEntry entry = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .transId = transId}; diff --git a/source/libs/stream/test/backendTest.cpp b/source/libs/stream/test/backendTest.cpp index 104b1c27d8..9e5d388e9f 100644 --- a/source/libs/stream/test/backendTest.cpp +++ b/source/libs/stream/test/backendTest.cpp @@ -408,8 +408,10 @@ TEST_F(BackendEnv, checkOpen) { const char *dump = "/tmp/backend/stream/dump"; // taosMkDir(dump); taosMulMkDir(dump); - SBkdMgt *mgt = bkdMgtCreate((char *)path); - SArray *result = taosArrayInit(4, sizeof(void *)); + SBkdMgt *mgt = NULL; + + int32_t code = bkdMgtCreate((char *)path, &mgt); + SArray *result = taosArrayInit(4, sizeof(void *)); bkdMgtGetDelta(mgt, p->pTdbState->idstr, 3, result, (char *)dump); taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 4, 0); From 1bfb8692eb0cd346ff8ec557f3c50588b8b3a41d Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Tue, 23 Jul 2024 11:36:11 +0000 Subject: [PATCH 2/7] refactor tq backend --- source/libs/stream/test/tstreamUpdateTest.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/test/tstreamUpdateTest.cpp b/source/libs/stream/test/tstreamUpdateTest.cpp index 59171876ff..4360fc7d54 100644 --- a/source/libs/stream/test/tstreamUpdateTest.cpp +++ b/source/libs/stream/test/tstreamUpdateTest.cpp @@ -53,7 +53,7 @@ TEST(TD_STREAM_UPDATE_TEST, update) { void *p = NULL; // SBackendWrapper *p = streamBackendInit(streamPath, -1, 2); // p = taskDbOpen((char *)streamPath, (char *)"test", -1); - p = bkdMgtCreate((char *)streamPath); + int32_t code = bkdMgtCreate((char *)streamPath, (SBkdMgt **)&p); // const int64_t interval = 20 * 1000; // const int64_t watermark = 10 * 60 * 1000; From 53003ce30b5588b4ffbdf9b758b66348120b6504 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Wed, 24 Jul 2024 01:38:43 +0000 Subject: [PATCH 3/7] refactor tq backend --- source/libs/stream/src/streamBackendRocksdb.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 21b71add82..09f6573052 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -2538,7 +2538,7 @@ int32_t taskDbOpen(const char* path, const char* key, int64_t chkptId, int64_t* STaskDbWrapper* pTaskDb = taskDbOpenImpl(key, statePath, dbPath); if (pTaskDb != NULL) { int64_t chkpId = -1, ver = -1; - if ((code = chkpLoadExtraInfo(dbPath, &chkpId, &ver) == 0)) { + if ((code = chkpLoadExtraInfo(dbPath, &chkpId, &ver)) == 0) { *processVer = ver; } else { stError("failed to load extra info, path:%s, key:%s, checkpointId: %" PRId64 "reason:%s", path, key, chkptId, @@ -2546,6 +2546,8 @@ int32_t taskDbOpen(const char* path, const char* key, int64_t chkptId, int64_t* taskDbDestroy(pTaskDb, false); return code; } + } else { + code = TSDB_CODE_INVALID_PARA; } taosMemoryFree(dbPath); @@ -4906,7 +4908,7 @@ int32_t bkdMgtCreate(char* path, SBkdMgt** mgt) { if (taosThreadRwlockInit(&p->rwLock, NULL) != 0) { code = TAOS_SYSTEM_ERROR(errno); bkdMgtDestroy(p); - return code; + return code; } *mgt = p; From 073d7f7117238b8321383763522c9f9794f5fd53 Mon Sep 17 00:00:00 2001 From: dmchen Date: Thu, 25 Jul 2024 11:30:56 +0000 Subject: [PATCH 4/7] fix/TD-30989-scan1-4-2 --- source/dnode/mnode/impl/src/mndQnode.c | 18 ++++++------- source/dnode/mnode/impl/src/mndQuery.c | 7 +++-- source/dnode/mnode/impl/src/mndScheduler.c | 31 ++++++++++++++++------ source/dnode/mnode/impl/src/mndShow.c | 10 +++---- 4 files changed, 42 insertions(+), 24 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndQnode.c b/source/dnode/mnode/impl/src/mndQnode.c index d6647f88f2..2e352915d8 100644 --- a/source/dnode/mnode/impl/src/mndQnode.c +++ b/source/dnode/mnode/impl/src/mndQnode.c @@ -224,7 +224,7 @@ int32_t mndSetCreateQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQnodeOb terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - tSerializeSCreateDropMQSNodeReq(pReq, contLen, &createReq); + (void)tSerializeSCreateDropMQSNodeReq(pReq, contLen, &createReq); STransAction action = {0}; action.epSet = mndGetDnodeEpset(pDnode); @@ -252,7 +252,7 @@ static int32_t mndSetCreateQnodeUndoActions(STrans *pTrans, SDnodeObj *pDnode, S code = TSDB_CODE_OUT_OF_MEMORY; TAOS_RETURN(code); } - tSerializeSCreateDropMQSNodeReq(pReq, contLen, &dropReq); + (void)tSerializeSCreateDropMQSNodeReq(pReq, contLen, &dropReq); STransAction action = {0}; action.epSet = mndGetDnodeEpset(pDnode); @@ -330,7 +330,7 @@ static int32_t mndProcessCreateQnodeReq(SRpcMsg *pReq) { if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; char obj[33] = {0}; - sprintf(obj, "%d", createReq.dnodeId); + (void)sprintf(obj, "%d", createReq.dnodeId); auditRecord(pReq, pMnode->clusterId, "createQnode", "", obj, createReq.sql, createReq.sqlLen); _OVER: @@ -383,7 +383,7 @@ static int32_t mndSetDropQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQn code = TSDB_CODE_OUT_OF_MEMORY; TAOS_RETURN(code); } - tSerializeSCreateDropMQSNodeReq(pReq, contLen, &dropReq); + (void)tSerializeSCreateDropMQSNodeReq(pReq, contLen, &dropReq); STransAction action = {0}; action.epSet = mndGetDnodeEpset(pDnode); @@ -459,7 +459,7 @@ static int32_t mndProcessDropQnodeReq(SRpcMsg *pReq) { if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; char obj[33] = {0}; - sprintf(obj, "%d", dropReq.dnodeId); + (void)sprintf(obj, "%d", dropReq.dnodeId); auditRecord(pReq, pMnode->clusterId, "dropQnode", "", obj, dropReq.sql, dropReq.sqlLen); @@ -531,7 +531,7 @@ static int32_t mndProcessQnodeListReq(SRpcMsg *pReq) { goto _OVER; } - tSerializeSQnodeListRsp(pRsp, rspLen, &qlistRsp); + (void)tSerializeSQnodeListRsp(pRsp, rspLen, &qlistRsp); pReq->info.rspLen = rspLen; pReq->info.rsp = pRsp; @@ -556,15 +556,15 @@ static int32_t mndRetrieveQnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB cols = 0; SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->id, false); + (void)colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->id, false); char ep[TSDB_EP_LEN + VARSTR_HEADER_SIZE] = {0}; STR_WITH_MAXSIZE_TO_VARSTR(ep, pObj->pDnode->ep, pShow->pMeta->pSchemas[cols].bytes); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)ep, false); + (void)colDataSetVal(pColInfo, numOfRows, (const char *)ep, false); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->createdTime, false); + (void)colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->createdTime, false); numOfRows++; sdbRelease(pSdb, pObj); diff --git a/source/dnode/mnode/impl/src/mndQuery.c b/source/dnode/mnode/impl/src/mndQuery.c index 7c86b9aa74..c743aafd13 100644 --- a/source/dnode/mnode/impl/src/mndQuery.c +++ b/source/dnode/mnode/impl/src/mndQuery.c @@ -27,7 +27,7 @@ int32_t mndPreProcessQueryMsg(SRpcMsg *pMsg) { void mndPostProcessQueryMsg(SRpcMsg *pMsg) { if (TDMT_SCH_QUERY != pMsg->msgType && TDMT_SCH_MERGE_QUERY != pMsg->msgType) return; SMnode *pMnode = pMsg->info.node; - qWorkerAbortPreprocessQueryMsg(pMnode->pQuery, pMsg); + (void)qWorkerAbortPreprocessQueryMsg(pMnode->pQuery, pMsg); } int32_t mndProcessQueryMsg(SRpcMsg *pMsg, SQueueInfo* pInfo) { @@ -134,7 +134,10 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) { rsp.msgLen = reqMsg.info.rspLen; rsp.msg = reqMsg.info.rsp; - taosArrayPush(batchRsp.pRsps, &rsp); + if (taosArrayPush(batchRsp.pRsps, &rsp) == NULL) { + mError("msg:%p, failed to put array since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle, + TMSG_INFO(pMsg->msgType)); + } } rspSize = tSerializeSBatchRsp(NULL, 0, &batchRsp); diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 3593e21e11..3f03102a7a 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -115,7 +115,7 @@ int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) { } else { pInfo->type = TASK_OUTPUT__TABLE; pInfo->tbSink.stbUid = pStream->targetStbUid; - memcpy(pInfo->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); + (void)memcpy(pInfo->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); pInfo->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); if (pInfo->tbSink.pSchemaWrapper == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -145,7 +145,7 @@ int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SAr int32_t numOfSinkNodes = taosArrayGetSize(pSinkNodeList); if (isShuffle) { - memcpy(pTask->outputInfo.shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); + (void)memcpy(pTask->outputInfo.shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); SArray* pVgs = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; int32_t numOfVgroups = taosArrayGetSize(pVgs); @@ -363,10 +363,14 @@ static int32_t buildSourceTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillh static void addNewTaskList(SStreamObj* pStream) { SArray* pTaskList = taosArrayInit(0, POINTER_BYTES); - taosArrayPush(pStream->tasks, &pTaskList); + if (taosArrayPush(pStream->tasks, &pTaskList) == NULL) { + mError("failed to put array"); + } if (pStream->conf.fillHistory) { pTaskList = taosArrayInit(0, POINTER_BYTES); - taosArrayPush(pStream->pHTasksList, &pTaskList); + if (taosArrayPush(pStream->pHTasksList, &pTaskList) == NULL) { + mError("failed to put array"); + } } } @@ -584,10 +588,15 @@ static int32_t addSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset) } static void bindTaskToSinkTask(SStreamObj* pStream, SMnode* pMnode, SArray* pSinkTaskList, SStreamTask* task) { - mndAddDispatcherForInternalTask(pMnode, pStream, pSinkTaskList, task); + int32_t code = 0; + if ((code = mndAddDispatcherForInternalTask(pMnode, pStream, pSinkTaskList, task)) != 0) { + mError("failed bind task to sink task since %s", tstrerror(code)); + } for (int32_t k = 0; k < taosArrayGetSize(pSinkTaskList); k++) { SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, k); - streamTaskSetUpstreamInfo(pSinkTask, task); + if ((code = streamTaskSetUpstreamInfo(pSinkTask, task)) != 0) { + mError("failed bind task to sink task since %s", tstrerror(code)); + } } mDebug("bindTaskToSinkTask taskId:%s to sink task list", task->id.idStr); } @@ -604,6 +613,7 @@ static void bindAggSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks) { } static void bindSourceSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks, bool hasExtraSink) { + int32_t code = 0; SArray* pSinkTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL); SArray* pSourceTaskList = taosArrayGetP(tasks, hasExtraSink ? SINK_NODE_LEVEL + 1 : SINK_NODE_LEVEL); @@ -614,12 +624,15 @@ static void bindSourceSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks, b if (hasExtraSink) { bindTaskToSinkTask(pStream, pMnode, pSinkTaskList, pSourceTask); } else { - mndSetSinkTaskInfo(pStream, pSourceTask); + if ((code = mndSetSinkTaskInfo(pStream, pSourceTask)) != 0) { + mError("failed bind task to sink task since %s", tstrerror(code)); + } } } } static void bindTwoLevel(SArray* tasks, int32_t begin, int32_t end) { + int32_t code = 0; size_t size = taosArrayGetSize(tasks); ASSERT(size >= 2); SArray* pDownTaskList = taosArrayGetP(tasks, size - 1); @@ -631,7 +644,9 @@ static void bindTwoLevel(SArray* tasks, int32_t begin, int32_t end) { SStreamTask* pUpTask = taosArrayGetP(pUpTaskList, i); pUpTask->info.selfChildId = i - begin; streamTaskSetFixedDownstreamInfo(pUpTask, *pDownTask); - streamTaskSetUpstreamInfo(*pDownTask, pUpTask); + if ((code = streamTaskSetUpstreamInfo(*pDownTask, pUpTask)) != 0) { + mError("failed bind task to sink task since %s", tstrerror(code)); + } } mDebug("bindTwoLevel task list(%d-%d) to taskId:%s", begin, end - 1, (*(pDownTask))->id.idStr); } diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index 894d888e2d..a07f0e1f4d 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -158,7 +158,7 @@ static SShowObj *mndCreateShowObj(SMnode *pMnode, SRetrieveTableReq *pReq) { showObj.id = showId; showObj.pMnode = pMnode; showObj.type = convertToRetrieveType(pReq->tb, tListLen(pReq->tb)); - memcpy(showObj.db, pReq->db, TSDB_DB_FNAME_LEN); + (void)memcpy(showObj.db, pReq->db, TSDB_DB_FNAME_LEN); tstrncpy(showObj.filterTb, pReq->filterTb, TSDB_TABLE_NAME_LEN); int32_t keepTime = tsShellActivityTimer * 6 * 1000; @@ -270,9 +270,9 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) { mDebug("show:0x%" PRIx64 ", start retrieve data, type:%d", pShow->id, pShow->type); if (retrieveReq.user[0] != 0) { - memcpy(pReq->info.conn.user, retrieveReq.user, TSDB_USER_LEN); + (void)memcpy(pReq->info.conn.user, retrieveReq.user, TSDB_USER_LEN); } else { - memcpy(pReq->info.conn.user, TSDB_DEFAULT_USER, strlen(TSDB_DEFAULT_USER) + 1); + (void)memcpy(pReq->info.conn.user, TSDB_DEFAULT_USER, strlen(TSDB_DEFAULT_USER) + 1); } code = -1; if (retrieveReq.db[0] && @@ -298,10 +298,10 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) { idata.info.bytes = p->bytes; idata.info.type = p->type; idata.info.colId = p->colId; - blockDataAppendColInfo(pBlock, &idata); + TAOS_CHECK_RETURN(blockDataAppendColInfo(pBlock, &idata)); } - blockDataEnsureCapacity(pBlock, rowsToRead); + TAOS_CHECK_RETURN(blockDataEnsureCapacity(pBlock, rowsToRead)); if (mndCheckRetrieveFinished(pShow)) { mDebug("show:0x%" PRIx64 ", read finished, numOfRows:%d", pShow->id, pShow->numOfRows); From 334dcd381bcb7de7aedaa4fc21f594ad8752e8ef Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Mon, 29 Jul 2024 09:34:20 +0800 Subject: [PATCH 5/7] void unused return values --- source/dnode/vnode/src/tsdb/tsdbCache.c | 154 +++++++++++------------- source/libs/sync/src/syncRaftStore.c | 2 +- source/libs/sync/src/syncReplication.c | 4 +- source/libs/sync/src/syncRequestVote.c | 2 +- source/util/src/tlrucache.c | 6 +- 5 files changed, 75 insertions(+), 93 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 7b785e0844..51de400305 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -117,7 +117,7 @@ typedef struct { static void tsdbGetRocksPath(STsdb *pTsdb, char *path) { SVnode *pVnode = pTsdb->pVnode; - vnodeGetPrimaryDir(pTsdb->path, pVnode->diskPrimary, pVnode->pTfs, path, TSDB_FILENAME_LEN); + (void)vnodeGetPrimaryDir(pTsdb->path, pVnode->diskPrimary, pVnode->pTfs, path, TSDB_FILENAME_LEN); int32_t offset = strlen(path); snprintf(path + offset, TSDB_FILENAME_LEN - offset - 1, "%scache.rdb", TD_DIRSEP); @@ -722,20 +722,14 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; { SLastCol *pLastCol = NULL; - code = tsdbCacheDeserialize(values_list[0], values_list_sizes[0], &pLastCol); - if (code) { - tsdbError("tsdb/cache: vgId:%d, deserialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code)); - } + (void)tsdbCacheDeserialize(values_list[0], values_list_sizes[0], &pLastCol); if (NULL != pLastCol) { rocksdb_writebatch_delete(wb, keys_list[0], klen); } taosMemoryFreeClear(pLastCol); pLastCol = NULL; - code = tsdbCacheDeserialize(values_list[1], values_list_sizes[1], &pLastCol); - if (code) { - tsdbError("tsdb/cache: vgId:%d, deserialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code)); - } + (void)tsdbCacheDeserialize(values_list[1], values_list_sizes[1], &pLastCol); if (NULL != pLastCol) { rocksdb_writebatch_delete(wb, keys_list[1], klen); } @@ -748,7 +742,7 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[0], klen); if (h) { erase = true; - taosLRUCacheRelease(pTsdb->lruCache, h, erase); + (void)taosLRUCacheRelease(pTsdb->lruCache, h, erase); } if (erase) { taosLRUCacheErase(pTsdb->lruCache, keys_list[0], klen); @@ -758,7 +752,7 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[1], klen); if (h) { erase = true; - taosLRUCacheRelease(pTsdb->lruCache, h, erase); + (void)taosLRUCacheRelease(pTsdb->lruCache, h, erase); } if (erase) { taosLRUCacheErase(pTsdb->lruCache, keys_list[1], klen); @@ -1066,12 +1060,12 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) { tsdbCacheUpdateLastCol(pLastCol, pRowKey, pColVal); } - taosLRUCacheRelease(pCache, h, false); + (void)taosLRUCacheRelease(pCache, h, false); } else { if (!remainCols) { remainCols = taosArrayInit(num_keys * 2, sizeof(SIdxKey)); } - taosArrayPush(remainCols, &(SIdxKey){i, *key}); + (void)taosArrayPush(remainCols, &(SIdxKey){i, *key}); } } @@ -1123,10 +1117,12 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray SColVal *pColVal = &updCtx->colVal; SLastCol *pLastCol = NULL; - code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol); + (void)tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol); + /* if (code) { tsdbError("tsdb/cache: vgId:%d, deserialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code)); } + */ SLastCol *PToFree = pLastCol; if (IS_LAST_KEY(idxKey->key) && !COL_VAL_IS_VALUE(pColVal)) { @@ -1242,18 +1238,18 @@ int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int6 tsdbRowGetKey(&lRow, &tsdbRowKey); STSDBRowIter iter = {0}; - tsdbRowIterOpen(&iter, &lRow, pTSchema); + (void)tsdbRowIterOpen(&iter, &lRow, pTSchema); int32_t iCol = 0; for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) { SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal}; - taosArrayPush(ctxArray, &updateCtx); + (void)taosArrayPush(ctxArray, &updateCtx); if (!COL_VAL_IS_VALUE(pColVal)) { - tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0); + (void)tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0); continue; } updateCtx.lflag = LFLAG_LAST; - taosArrayPush(ctxArray, &updateCtx); + (void)taosArrayPush(ctxArray, &updateCtx); } tsdbRowClose(&iter); @@ -1277,14 +1273,14 @@ int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int6 if (COL_VAL_IS_VALUE(&colVal)) { SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal}; - taosArrayPush(ctxArray, &updateCtx); - tSimpleHashIterateRemove(iColHash, &iCol, sizeof(iCol), &pIte, &iter); + (void)taosArrayPush(ctxArray, &updateCtx); + (void)tSimpleHashIterateRemove(iColHash, &iCol, sizeof(iCol), &pIte, &iter); } } } // 3. do update - tsdbCacheUpdate(pTsdb, suid, uid, ctxArray); + (void)tsdbCacheUpdate(pTsdb, suid, uid, ctxArray); _exit: taosMemoryFreeClear(pTSchema); @@ -1317,7 +1313,7 @@ int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlo .tsdbRowKey = tsdbRowKey, .colVal = COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID, ((SValue){.type = TSDB_DATA_TYPE_TIMESTAMP, .val = lRow.pBlockData->aTSKEY[lRow.iRow]}))}; - taosArrayPush(ctxArray, &updateCtx); + (void)taosArrayPush(ctxArray, &updateCtx); } TSDBROW tRow = tsdbRowFromBlockData(pBlockData, 0); @@ -1338,7 +1334,7 @@ int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlo tColDataGetValue(pColData, tRow.iRow, &colVal); SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal}; - taosArrayPush(ctxArray, &updateCtx); + (void)taosArrayPush(ctxArray, &updateCtx); break; } } @@ -1346,15 +1342,15 @@ int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlo // 2. prepare last row STSDBRowIter iter = {0}; - tsdbRowIterOpen(&iter, &lRow, pTSchema); + (void)tsdbRowIterOpen(&iter, &lRow, pTSchema); for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) { SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal}; - taosArrayPush(ctxArray, &updateCtx); + (void)taosArrayPush(ctxArray, &updateCtx); } tsdbRowClose(&iter); // 3. do update - tsdbCacheUpdate(pTsdb, suid, uid, ctxArray); + (void)tsdbCacheUpdate(pTsdb, suid, uid, ctxArray); _exit: taosMemoryFreeClear(pTSchema); @@ -1379,7 +1375,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr if (idxKey->key.cid != PRIMARYKEY_TIMESTAMP_COL_ID) { SLastKey *key = &(SLastKey){.lflag = ltype, .uid = uid, .cid = PRIMARYKEY_TIMESTAMP_COL_ID}; - taosArrayInsert(remainCols, 0, &(SIdxKey){0, *key}); + (void)taosArrayInsert(remainCols, 0, &(SIdxKey){0, *key}); } int num_keys = TARRAY_SIZE(remainCols); @@ -1416,7 +1412,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr if (NULL == lastTmpIndexArray) { lastTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t)); } - taosArrayPush(lastTmpIndexArray, &(i)); + (void)taosArrayPush(lastTmpIndexArray, &(i)); lastColIds[lastIndex] = idxKey->key.cid; lastSlotIds[lastIndex] = pr->pSlotIds[idxKey->idx]; lastIndex++; @@ -1424,7 +1420,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr if (NULL == lastrowTmpIndexArray) { lastrowTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t)); } - taosArrayPush(lastrowTmpIndexArray, &(i)); + (void)taosArrayPush(lastrowTmpIndexArray, &(i)); lastrowColIds[lastrowIndex] = idxKey->key.cid; lastrowSlotIds[lastrowIndex] = pr->pSlotIds[idxKey->idx]; lastrowIndex++; @@ -1434,17 +1430,18 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr pTmpColArray = taosArrayInit(lastIndex + lastrowIndex, sizeof(SLastCol)); if (lastTmpIndexArray != NULL) { - mergeLastCid(uid, pTsdb, &lastTmpColArray, pr, lastColIds, lastIndex, lastSlotIds); + (void)mergeLastCid(uid, pTsdb, &lastTmpColArray, pr, lastColIds, lastIndex, lastSlotIds); for (int i = 0; i < taosArrayGetSize(lastTmpColArray); i++) { - taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastTmpIndexArray, i), taosArrayGet(lastTmpColArray, i)); + (void)taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastTmpIndexArray, i), + taosArrayGet(lastTmpColArray, i)); } } if (lastrowTmpIndexArray != NULL) { - mergeLastRowCid(uid, pTsdb, &lastrowTmpColArray, pr, lastrowColIds, lastrowIndex, lastrowSlotIds); + (void)mergeLastRowCid(uid, pTsdb, &lastrowTmpColArray, pr, lastrowColIds, lastrowIndex, lastrowSlotIds); for (int i = 0; i < taosArrayGetSize(lastrowTmpColArray); i++) { - taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastrowTmpIndexArray, i), - taosArrayGet(lastrowTmpColArray, i)); + (void)taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastrowTmpIndexArray, i), + taosArrayGet(lastrowTmpColArray, i)); } } @@ -1586,10 +1583,7 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA SLRUCache *pCache = pTsdb->lruCache; for (int i = 0, j = 0; i < num_keys && j < TARRAY_SIZE(remainCols); ++i) { SLastCol *pLastCol = NULL; - code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol); - if (code) { - tsdbError("tsdb/cache: vgId:%d, deserialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code)); - } + (void)tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol); SLastCol *PToFree = pLastCol; SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[j]; if (pLastCol) { @@ -1682,19 +1676,19 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache TAOS_CHECK_RETURN(reallocVarDataVal(&lastCol.rowKey.pks[j])); } TAOS_CHECK_RETURN(reallocVarData(&lastCol.colVal)); - taosArrayPush(pLastArray, &lastCol); + (void)taosArrayPush(pLastArray, &lastCol); - taosLRUCacheRelease(pCache, h, false); + (void)taosLRUCacheRelease(pCache, h, false); } else { SLastCol noneCol = {.rowKey.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)}; - taosArrayPush(pLastArray, &noneCol); + (void)taosArrayPush(pLastArray, &noneCol); if (!remainCols) { remainCols = taosArrayInit(num_keys, sizeof(SIdxKey)); } - taosArrayPush(remainCols, &(SIdxKey){i, key}); + (void)taosArrayPush(remainCols, &(SIdxKey){i, key}); } } @@ -1721,7 +1715,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache } taosArraySet(pLastArray, idxKey->idx, &lastCol); - taosLRUCacheRelease(pCache, h, false); + (void)taosLRUCacheRelease(pCache, h, false); taosArrayRemove(remainCols, i); } else { @@ -1810,10 +1804,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; for (int i = 0; i < num_keys; ++i) { SLastCol *pLastCol = NULL; - code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol); - if (code) { - tsdbError("tsdb/cache: vgId:%d, deserialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code)); - } + (void)tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol); (void)taosThreadMutexLock(&pTsdb->rCache.rMutex); if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) { rocksdb_writebatch_delete(wb, keys_list[i], klen); @@ -1821,10 +1812,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE taosMemoryFreeClear(pLastCol); pLastCol = NULL; - code = tsdbCacheDeserialize(values_list[i + num_keys], values_list_sizes[i + num_keys], &pLastCol); - if (code) { - tsdbError("tsdb/cache: vgId:%d, deserialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code)); - } + (void)tsdbCacheDeserialize(values_list[i + num_keys], values_list_sizes[i + num_keys], &pLastCol); if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) { rocksdb_writebatch_delete(wb, keys_list[num_keys + i], klen); } @@ -1846,7 +1834,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE if (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey) { erase = true; } - taosLRUCacheRelease(pTsdb->lruCache, h, erase); + (void)taosLRUCacheRelease(pTsdb->lruCache, h, erase); } if (erase) { taosLRUCacheErase(pTsdb->lruCache, keys_list[i], klen); @@ -1862,7 +1850,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE if (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey) { erase = true; } - taosLRUCacheRelease(pTsdb->lruCache, h, erase); + (void)taosLRUCacheRelease(pTsdb->lruCache, h, erase); } if (erase) { taosLRUCacheErase(pTsdb->lruCache, keys_list[num_keys + i], klen); @@ -1982,7 +1970,7 @@ static int32_t getTableDelDataFromTbData(STbData *pTbData, SArray *aDelData) { SDelData *pDelData = pTbData ? pTbData->pHead : NULL; for (; pDelData; pDelData = pDelData->pNext) { - taosArrayPush(aDelData, pDelData); + (void)taosArrayPush(aDelData, pDelData); } return code; @@ -2005,7 +1993,7 @@ static STableLoadInfo *getTableLoadInfo(SCacheRowsReader *pReader, uint64_t uid) if (!ppInfo) { pInfo = taosMemoryCalloc(1, sizeof(STableLoadInfo)); if (pInfo) { - tSimpleHashPut(pReader->pTableMap, &uid, sizeof(uint64_t), &pInfo, POINTER_BYTES); + (void)tSimpleHashPut(pReader->pTableMap, &uid, sizeof(uint64_t), &pInfo, POINTER_BYTES); } return pInfo; @@ -2117,11 +2105,11 @@ static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsRea TD_VID(pReader->pTsdb->pVnode), pReader->pCurFileSet->fid, record.skey, record.ekey, uid);*/ SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey}; - taosArrayPush(pInfo->pTombData, &delData); + (void)taosArrayPush(pInfo->pTombData, &delData); } } - tTombBlockDestroy(&block); + (void)tTombBlockDestroy(&block); if (finished) { TAOS_RETURN(code); @@ -2312,7 +2300,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie state->pr->pCurFileSet = state->pFileSet; - loadDataTomb(state->pr, state->pr->pFileReader); + (void)loadDataTomb(state->pr, state->pr->pFileReader); TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlk(state->pr->pFileReader, &state->pr->pBlkArray), &lino, _err); } @@ -2329,7 +2317,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie SBrinBlk *pBrinBlk = &pBlkArray->data[i]; if (state->suid >= pBrinBlk->minTbid.suid && state->suid <= pBrinBlk->maxTbid.suid) { if (state->uid >= pBrinBlk->minTbid.uid && state->uid <= pBrinBlk->maxTbid.uid) { - taosArrayPush(state->pIndexList, pBrinBlk); + (void)taosArrayPush(state->pIndexList, pBrinBlk); } } else if (state->suid > pBrinBlk->maxTbid.suid || (state->suid == pBrinBlk->maxTbid.suid && state->uid > pBrinBlk->maxTbid.uid)) { @@ -2385,7 +2373,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie if (!state->pLastRow) { if (state->pLastIter) { - lastIterClose(&state->pLastIter); + (void)lastIterClose(&state->pLastIter); } clearLastFileSet(state); @@ -2419,7 +2407,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie if (!state->pBrinBlock) { state->pBrinBlock = &state->brinBlock; } else { - tBrinBlockClear(&state->brinBlock); + (void)tBrinBlockClear(&state->brinBlock); } TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlock(state->pr->pFileReader, pBrinBlk, &state->brinBlock), &lino, _err); @@ -2431,7 +2419,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie if (SFSNEXTROW_BRINBLOCK == state->state) { _next_brinrecord: if (state->iBrinRecord < 0) { // empty brin block, goto _next_brinindex - tBrinBlockClear(&state->brinBlock); + (void)tBrinBlockClear(&state->brinBlock); goto _next_brinindex; } @@ -2493,7 +2481,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie if (!state->pLastRow) { if (state->pLastIter) { - lastIterClose(&state->pLastIter); + (void)lastIterClose(&state->pLastIter); } *ppRow = &state->row; @@ -2517,7 +2505,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie } else { // TODO: merge rows and *ppRow = mergedRow SRowMerger *pMerger = &state->rowMerger; - tsdbRowMergerInit(pMerger, state->pTSchema); + (void)tsdbRowMergerInit(pMerger, state->pTSchema); TAOS_CHECK_GOTO(tsdbRowMergerAdd(pMerger, &state->row, state->pTSchema), &lino, _err); TAOS_CHECK_GOTO(tsdbRowMergerAdd(pMerger, state->pLastRow, state->pTSchema), &lino, _err); @@ -2682,7 +2670,7 @@ int32_t clearNextRowFromFS(void *iter) { } if (state->pLastIter) { - lastIterClose(&state->pLastIter); + (void)lastIterClose(&state->pLastIter); } if (state->pBlockData) { @@ -2691,7 +2679,7 @@ int32_t clearNextRowFromFS(void *iter) { } if (state->pBrinBlock) { - tBrinBlockDestroy(state->pBrinBlock); + (void)tBrinBlockDestroy(state->pBrinBlock); state->pBrinBlock = NULL; } @@ -2715,7 +2703,7 @@ int32_t clearNextRowFromFS(void *iter) { static void clearLastFileSet(SFSNextRowIter *state) { if (state->pLastIter) { - lastIterClose(&state->pLastIter); + (void)lastIterClose(&state->pLastIter); } if (state->pBlockData) { @@ -2724,7 +2712,7 @@ static void clearLastFileSet(SFSNextRowIter *state) { } if (state->pr->pFileReader) { - tsdbDataFileReaderClose(&state->pr->pFileReader); + (void)tsdbDataFileReaderClose(&state->pr->pFileReader); state->pr->pFileReader = NULL; state->pr->pCurFileSet = NULL; @@ -2814,7 +2802,7 @@ static int32_t nextRowIterClose(CacheNextRowIter *pIter) { for (int i = 0; i < 3; ++i) { if (pIter->input[i].nextRowClearFn) { - pIter->input[i].nextRowClearFn(pIter->input[i].iter); + (void)pIter->input[i].nextRowClearFn(pIter->input[i].iter); } } @@ -2898,7 +2886,7 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pI pInfo->pTombData = taosArrayInit(4, sizeof(SDelData)); } - taosArrayAddAll(pInfo->pTombData, pIter->pMemDelData); + (void)taosArrayAddAll(pInfo->pTombData, pIter->pMemDelData); size_t delSize = TARRAY_SIZE(pInfo->pTombData); if (delSize > 0) { @@ -2944,7 +2932,7 @@ static int32_t initLastColArrayPartial(STSchema *pTSchema, SArray **ppColArray, int16_t slotId = slotIds[i]; SLastCol col = {.rowKey.ts = 0, .colVal = COL_VAL_NULL(pTSchema->columns[slotId].colId, pTSchema->columns[slotId].type)}; - taosArrayPush(pColArray, &col); + (void)taosArrayPush(pColArray, &col); } *ppColArray = pColArray; @@ -2998,18 +2986,18 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC } for (int i = 0; i < nCols; ++i) { - taosArrayPush(aColArray, &aCols[i]); + (void)taosArrayPush(aColArray, &aCols[i]); } STsdbRowKey lastRowKey = {.key.ts = TSKEY_MAX}; // inverse iterator CacheNextRowIter iter = {0}; - nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr); + (void)nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr); do { TSDBROW *pRow = NULL; - nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, true, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray)); + (void)nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, true, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray)); if (!pRow) { break; @@ -3142,7 +3130,7 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC } *ppLastArray = pColArray; - nextRowIterClose(&iter); + (void)nextRowIterClose(&iter); taosArrayDestroy(aColArray); TAOS_RETURN(code); @@ -3184,16 +3172,16 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, } for (int i = 0; i < nCols; ++i) { - taosArrayPush(aColArray, &aCols[i]); + (void)taosArrayPush(aColArray, &aCols[i]); } // inverse iterator CacheNextRowIter iter = {0}; - nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr); + (void)nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr); do { TSDBROW *pRow = NULL; - nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, false, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray)); + (void)nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, false, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray)); if (!pRow) { break; @@ -3266,7 +3254,7 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, } *ppLastArray = pColArray; - nextRowIterClose(&iter); + (void)nextRowIterClose(&iter); taosArrayDestroy(aColArray); TAOS_RETURN(code); @@ -3286,13 +3274,7 @@ _err: TAOS_RETURN(code); } -int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h) { - int32_t code = 0; - - taosLRUCacheRelease(pCache, h, false); - - return code; -} +void tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h) { taosLRUCacheRelease(pCache, h, false); } void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity) { taosLRUCacheSetCapacity(pVnode->pTsdb->lruCache, capacity); diff --git a/source/libs/sync/src/syncRaftStore.c b/source/libs/sync/src/syncRaftStore.c index cb07a63eed..cc2aa7d91e 100644 --- a/source/libs/sync/src/syncRaftStore.c +++ b/source/libs/sync/src/syncRaftStore.c @@ -134,7 +134,7 @@ int32_t raftStoreWriteFile(SSyncNode *pNode) { if (taosFsyncFile(pFile) < 0) TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _OVER); - taosCloseFile(&pFile); + (void)taosCloseFile(&pFile); if (taosRenameFile(file, realfile) != 0) TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _OVER); code = 0; diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 40a359379a..8644ba6bb9 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -84,7 +84,7 @@ int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode) { int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg) { SyncAppendEntries* pMsg = pRpcMsg->pCont; pMsg->destId = *destRaftId; - syncNodeSendMsgById(destRaftId, pSyncNode, pRpcMsg); + (void)syncNodeSendMsgById(destRaftId, pSyncNode, pRpcMsg); TAOS_RETURN(TSDB_CODE_SUCCESS); } @@ -113,7 +113,7 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) { // send msg syncLogSendHeartbeat(pSyncNode, pSyncMsg, true, 0, 0); - syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg); + (void)syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg); } TAOS_RETURN(TSDB_CODE_SUCCESS); diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index 0e50cca94c..a30b9a4930 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -135,7 +135,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { // trace log syncLogRecvRequestVote(ths, pMsg, pReply->voteGranted, ""); syncLogSendRequestVoteReply(ths, pReply, ""); - syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); + (void)syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); if (resetElect) syncNodeResetElectTimer(ths); diff --git a/source/util/src/tlrucache.c b/source/util/src/tlrucache.c index 7e165a12d5..9e74a3543b 100644 --- a/source/util/src/tlrucache.c +++ b/source/util/src/tlrucache.c @@ -320,7 +320,7 @@ static void taosLRUCacheShardEvictLRU(SLRUCacheShard *shard, size_t charge, SArr ASSERT(TAOS_LRU_ENTRY_IN_CACHE(old) && !TAOS_LRU_ENTRY_HAS_REFS(old)); taosLRUCacheShardLRURemove(shard, old); - taosLRUEntryTableRemove(&shard->table, old->keyData, old->keyLength, old->hash); + (void)taosLRUEntryTableRemove(&shard->table, old->keyData, old->keyLength, old->hash); TAOS_LRU_ENTRY_SET_IN_CACHE(old, false); ASSERT(shard->usage >= old->totalCharge); @@ -531,7 +531,7 @@ static void taosLRUCacheShardEraseUnrefEntries(SLRUCacheShard *shard) { SLRUEntry *old = shard->lru.next; ASSERT(TAOS_LRU_ENTRY_IN_CACHE(old) && !TAOS_LRU_ENTRY_HAS_REFS(old)); taosLRUCacheShardLRURemove(shard, old); - taosLRUEntryTableRemove(&shard->table, old->keyData, old->keyLength, old->hash); + (void)taosLRUEntryTableRemove(&shard->table, old->keyData, old->keyLength, old->hash); TAOS_LRU_ENTRY_SET_IN_CACHE(old, false); ASSERT(shard->usage >= old->totalCharge); shard->usage -= old->totalCharge; @@ -577,7 +577,7 @@ static bool taosLRUCacheShardRelease(SLRUCacheShard *shard, LRUHandle *handle, b if (shard->usage > shard->capacity || eraseIfLastRef) { ASSERT(shard->lru.next == &shard->lru || eraseIfLastRef); - taosLRUEntryTableRemove(&shard->table, e->keyData, e->keyLength, e->hash); + (void)taosLRUEntryTableRemove(&shard->table, e->keyData, e->keyLength, e->hash); TAOS_LRU_ENTRY_SET_IN_CACHE(e, false); } else { taosLRUCacheShardLRUInsert(shard, e); From 6036943b1c965cace205bfe22cd888653851280c Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Mon, 29 Jul 2024 09:49:54 +0800 Subject: [PATCH 6/7] remove duplicate fun declaration --- source/dnode/vnode/src/inc/tsdb.h | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 978821f890..f6f86850a4 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -378,7 +378,7 @@ struct STsdb { struct { SVHashTable *ht; SArray *arr; - } *commitInfo; + } * commitInfo; }; struct TSDBKEY { @@ -937,7 +937,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, TSDBROW *row, STsdb *pTsdb); int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, TSDBROW *row, bool dup); -int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h); +void tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h); int32_t tsdbCacheGetBlockIdx(SLRUCache *pCache, SDataFReader *pFileReader, LRUHandle **handle); int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h); @@ -945,7 +945,6 @@ int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h); int32_t tsdbCacheGetBlockS3(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle); int32_t tsdbCacheGetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, LRUHandle **handle); int32_t tsdbCacheSetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, uint8_t *pPage); -int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h); int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); From 43524f394d8063ca08544b4e52ed5ca1b2af1f74 Mon Sep 17 00:00:00 2001 From: dmchen Date: Mon, 29 Jul 2024 03:54:14 +0000 Subject: [PATCH 7/7] fix/TD-30989 --- source/dnode/mnode/impl/src/mndSync.c | 66 +++++++++++++-------------- 1 file changed, 33 insertions(+), 33 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 70d0b858f6..89f3c6e253 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -212,8 +212,7 @@ int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, SFsmCbMeta *pMeta) { mndTransRefresh(pMnode, pTrans); sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex); - sdbWriteFile(pMnode->pSdb, tsMndSdbWriteDelta); - code = 0; + code = sdbWriteFile(pMnode->pSdb, tsMndSdbWriteDelta); _OUT: if (pTrans) mndReleaseTrans(pMnode, pTrans); @@ -222,7 +221,7 @@ _OUT: static int32_t mndPostMgmtCode(SMnode *pMnode, int32_t code) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; - taosThreadMutexLock(&pMgmt->lock); + (void)taosThreadMutexLock(&pMgmt->lock); if (pMgmt->transId == 0) { goto _OUT; } @@ -232,7 +231,7 @@ static int32_t mndPostMgmtCode(SMnode *pMnode, int32_t code) { pMgmt->transSec = 0; pMgmt->transSeq = 0; pMgmt->errCode = code; - tsem_post(&pMgmt->syncSem); + (void)tsem_post(&pMgmt->syncSem); if (pMgmt->errCode != 0) { mError("trans:%d, failed to propose since %s, post sem", transId, tstrerror(pMgmt->errCode)); @@ -241,7 +240,7 @@ static int32_t mndPostMgmtCode(SMnode *pMnode, int32_t code) { } _OUT: - taosThreadMutexUnlock(&pMgmt->lock); + (void)taosThreadMutexUnlock(&pMgmt->lock); return 0; } @@ -304,7 +303,7 @@ void mndRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) { } else { mInfo("vgId:1, sync restore finished"); } - mndRefreshUserIpWhiteList(pMnode); + (void)mndRefreshUserIpWhiteList(pMnode); ASSERT(commitIdx == mndSyncAppliedIndex(pFsm)); } @@ -350,16 +349,16 @@ static void mndBecomeFollower(const SSyncFSM *pFsm) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; mInfo("vgId:1, become follower"); - taosThreadMutexLock(&pMgmt->lock); + (void)taosThreadMutexLock(&pMgmt->lock); if (pMgmt->transId != 0) { mInfo("vgId:1, become follower and post sem, trans:%d, failed to propose since not leader", pMgmt->transId); pMgmt->transId = 0; pMgmt->transSec = 0; pMgmt->transSeq = 0; pMgmt->errCode = TSDB_CODE_SYN_NOT_LEADER; - tsem_post(&pMgmt->syncSem); + (void)tsem_post(&pMgmt->syncSem); } - taosThreadMutexUnlock(&pMgmt->lock); + (void)taosThreadMutexUnlock(&pMgmt->lock); } static void mndBecomeLearner(const SSyncFSM *pFsm) { @@ -367,16 +366,16 @@ static void mndBecomeLearner(const SSyncFSM *pFsm) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; mInfo("vgId:1, become learner"); - taosThreadMutexLock(&pMgmt->lock); + (void)taosThreadMutexLock(&pMgmt->lock); if (pMgmt->transId != 0) { mInfo("vgId:1, become learner and post sem, trans:%d, failed to propose since not leader", pMgmt->transId); pMgmt->transId = 0; pMgmt->transSec = 0; pMgmt->transSeq = 0; pMgmt->errCode = TSDB_CODE_SYN_NOT_LEADER; - tsem_post(&pMgmt->syncSem); + (void)tsem_post(&pMgmt->syncSem); } - taosThreadMutexUnlock(&pMgmt->lock); + (void)taosThreadMutexUnlock(&pMgmt->lock); } static void mndBecomeLeader(const SSyncFSM *pFsm) { @@ -435,12 +434,12 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { int32_t mndInitSync(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; - taosThreadMutexInit(&pMgmt->lock, NULL); - taosThreadMutexLock(&pMgmt->lock); + (void)taosThreadMutexInit(&pMgmt->lock, NULL); + (void)taosThreadMutexLock(&pMgmt->lock); pMgmt->transId = 0; pMgmt->transSec = 0; pMgmt->transSeq = 0; - taosThreadMutexUnlock(&pMgmt->lock); + (void)taosThreadMutexUnlock(&pMgmt->lock); SSyncInfo syncInfo = { .snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT, @@ -477,7 +476,7 @@ int32_t mndInitSync(SMnode *pMnode) { } int32_t code = 0; - tsem_init(&pMgmt->syncSem, 0, 0); + (void)tsem_init(&pMgmt->syncSem, 0, 0); pMgmt->sync = syncOpen(&syncInfo, true); if (pMgmt->sync <= 0) { if (terrno != 0) code = terrno; @@ -495,15 +494,15 @@ void mndCleanupSync(SMnode *pMnode) { syncStop(pMgmt->sync); mInfo("mnode-sync is stopped, id:%" PRId64, pMgmt->sync); - tsem_destroy(&pMgmt->syncSem); - taosThreadMutexDestroy(&pMgmt->lock); + (void)tsem_destroy(&pMgmt->syncSem); + (void)taosThreadMutexDestroy(&pMgmt->lock); memset(pMgmt, 0, sizeof(SSyncMgmt)); } void mndSyncCheckTimeout(SMnode *pMnode) { mTrace("check sync timeout"); SSyncMgmt *pMgmt = &pMnode->syncMgmt; - taosThreadMutexLock(&pMgmt->lock); + (void)taosThreadMutexLock(&pMgmt->lock); if (pMgmt->transId != 0) { int32_t curSec = taosGetTimestampSec(); int32_t delta = curSec - pMgmt->transSec; @@ -515,7 +514,7 @@ void mndSyncCheckTimeout(SMnode *pMnode) { pMgmt->transSeq = 0; terrno = TSDB_CODE_SYN_TIMEOUT; pMgmt->errCode = TSDB_CODE_SYN_TIMEOUT; - tsem_post(&pMgmt->syncSem); + (void)tsem_post(&pMgmt->syncSem); } else { mDebug("trans:%d, waiting for sync confirm, start:%d cur:%d delta:%d seq:%" PRId64, pMgmt->transId, pMgmt->transSec, curSec, curSec - pMgmt->transSec, pMgmt->transSeq); @@ -523,7 +522,7 @@ void mndSyncCheckTimeout(SMnode *pMnode) { } else { // mTrace("check sync timeout msg, no trans waiting for confirm"); } - taosThreadMutexUnlock(&pMgmt->lock); + (void)taosThreadMutexUnlock(&pMgmt->lock); } int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { @@ -536,12 +535,12 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { if (req.pCont == NULL) return TSDB_CODE_OUT_OF_MEMORY; memcpy(req.pCont, pRaw, req.contLen); - taosThreadMutexLock(&pMgmt->lock); + (void)taosThreadMutexLock(&pMgmt->lock); pMgmt->errCode = 0; if (pMgmt->transId != 0) { mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId); - taosThreadMutexUnlock(&pMgmt->lock); + (void)taosThreadMutexUnlock(&pMgmt->lock); rpcFreeCont(req.pCont); TAOS_RETURN(TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED); } @@ -555,23 +554,24 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { if (code == 0) { mInfo("trans:%d, is proposing and wait sem, seq:%" PRId64, transId, seq); pMgmt->transSeq = seq; - taosThreadMutexUnlock(&pMgmt->lock); - tsem_wait(&pMgmt->syncSem); + (void)taosThreadMutexUnlock(&pMgmt->lock); + (void)tsem_wait(&pMgmt->syncSem); } else if (code > 0) { mInfo("trans:%d, confirm at once since replica is 1, continue execute", transId); pMgmt->transId = 0; pMgmt->transSec = 0; pMgmt->transSeq = 0; - taosThreadMutexUnlock(&pMgmt->lock); - sdbWriteWithoutFree(pMnode->pSdb, pRaw); - sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID); - code = 0; + (void)taosThreadMutexUnlock(&pMgmt->lock); + code = sdbWriteWithoutFree(pMnode->pSdb, pRaw); + if (code == 0) { + sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID); + } } else { mError("trans:%d, failed to proposed since %s", transId, terrstr()); pMgmt->transId = 0; pMgmt->transSec = 0; pMgmt->transSeq = 0; - taosThreadMutexUnlock(&pMgmt->lock); + (void)taosThreadMutexUnlock(&pMgmt->lock); if (terrno == 0) { terrno = TSDB_CODE_APP_ERROR; } @@ -600,15 +600,15 @@ void mndSyncStart(SMnode *pMnode) { void mndSyncStop(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; - taosThreadMutexLock(&pMgmt->lock); + (void)taosThreadMutexLock(&pMgmt->lock); if (pMgmt->transId != 0) { mInfo("vgId:1, is stopped and post sem, trans:%d", pMgmt->transId); pMgmt->transId = 0; pMgmt->transSec = 0; pMgmt->errCode = TSDB_CODE_APP_IS_STOPPING; - tsem_post(&pMgmt->syncSem); + (void)tsem_post(&pMgmt->syncSem); } - taosThreadMutexUnlock(&pMgmt->lock); + (void)taosThreadMutexUnlock(&pMgmt->lock); } bool mndIsLeader(SMnode *pMnode) {