diff --git a/source/libs/sync/inc/syncRespMgr.h b/source/libs/sync/inc/syncRespMgr.h index 1d41c0a85f..611c6575c1 100644 --- a/source/libs/sync/inc/syncRespMgr.h +++ b/source/libs/sync/inc/syncRespMgr.h @@ -35,14 +35,14 @@ typedef struct SSyncRespMgr { uint64_t seqNum; } SSyncRespMgr; -SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl); -void syncRespMgrDestroy(SSyncRespMgr *pObj); -uint64_t syncRespMgrAdd(SSyncRespMgr *pObj, const SRespStub *pStub); -int32_t syncRespMgrDel(SSyncRespMgr *pObj, uint64_t seq); -int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t seq, SRespStub *pStub); -int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t seq, SRpcHandleInfo *pInfo); -void syncRespClean(SSyncRespMgr *pObj); -void syncRespCleanRsp(SSyncRespMgr *pObj); +int32_t syncRespMgrCreate(void *data, int64_t ttl, SSyncRespMgr **ppObj); +void syncRespMgrDestroy(SSyncRespMgr *pObj); +uint64_t syncRespMgrAdd(SSyncRespMgr *pObj, const SRespStub *pStub); +int32_t syncRespMgrDel(SSyncRespMgr *pObj, uint64_t seq); +int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t seq, SRespStub *pStub); +int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t seq, SRpcHandleInfo *pInfo); +void syncRespClean(SSyncRespMgr *pObj); +void syncRespCleanRsp(SSyncRespMgr *pObj); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 2c43ba67d6..3033c50984 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -949,7 +949,6 @@ int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) { // open/close -------------- SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { - int32_t code = 0; SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode)); if (pSyncNode == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -1042,9 +1041,9 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg; // create raft log ring buffer - code = syncLogBufferCreate(&pSyncNode->pLogBuf); + (void)syncLogBufferCreate(&pSyncNode->pLogBuf); // TODO: check return value if (pSyncNode->pLogBuf == NULL) { - sError("failed to init sync log buffer since %s. vgId:%d", tstrerror(code), pSyncNode->vgId); + sError("failed to init sync log buffer since %s. vgId:%d", terrstr(), pSyncNode->vgId); goto _error; } @@ -1219,7 +1218,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { } // tools - pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS); + (void)syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS, &pSyncNode->pSyncRespMgr); // TODO: check return value if (pSyncNode->pSyncRespMgr == NULL) { sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId); goto _error; diff --git a/source/libs/sync/src/syncRespMgr.c b/source/libs/sync/src/syncRespMgr.c index 3506d477d3..d161fb885b 100644 --- a/source/libs/sync/src/syncRespMgr.c +++ b/source/libs/sync/src/syncRespMgr.c @@ -19,16 +19,21 @@ #include "syncRaftStore.h" #include "syncUtil.h" -SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) { - SSyncRespMgr *pObj = taosMemoryCalloc(1, sizeof(SSyncRespMgr)); - if (pObj == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; +int32_t syncRespMgrCreate(void *data, int64_t ttl, SSyncRespMgr **ppObj) { + SSyncRespMgr *pObj = NULL; + + *ppObj = NULL; + + if ((pObj = taosMemoryCalloc(1, sizeof(SSyncRespMgr))) == NULL) { + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } pObj->pRespHash = taosHashInit(sizeof(uint64_t), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); - if (pObj->pRespHash == NULL) return NULL; + if (pObj->pRespHash == NULL) { + taosMemoryFree(pObj); + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + } pObj->ttl = ttl; pObj->data = data; @@ -37,7 +42,10 @@ SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) { SSyncNode *pNode = pObj->data; sDebug("vgId:%d, resp manager create", pNode->vgId); - return pObj; + + *ppObj = pObj; + + TAOS_RETURN(0); } void syncRespMgrDestroy(SSyncRespMgr *pObj) { @@ -80,7 +88,7 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t seq, SRespStub *pStub) { SRespStub *pTmp = taosHashGet(pObj->pRespHash, &seq, sizeof(uint64_t)); if (pTmp != NULL) { - memcpy(pStub, pTmp, sizeof(SRespStub)); + (void)memcpy(pStub, pTmp, sizeof(SRespStub)); sNTrace(pObj->data, "get message handle, type:%s seq:%" PRIu64 " handle:%p", TMSG_INFO(pStub->rpcMsg.msgType), seq, pStub->rpcMsg.info.handle); @@ -102,7 +110,7 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t seq, SRpcHandleInfo *p *pInfo = pStub->rpcMsg.info; sNTrace(pObj->data, "get-and-del message handle:%p, type:%s seq:%" PRIu64, pStub->rpcMsg.info.handle, TMSG_INFO(pStub->rpcMsg.msgType), seq); - taosHashRemove(pObj->pRespHash, &seq, sizeof(uint64_t)); + (void)taosHashRemove(pObj->pRespHash, &seq, sizeof(uint64_t)); taosThreadMutexUnlock(&pObj->mutex); return 1; // get one object @@ -114,14 +122,16 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t seq, SRpcHandleInfo *p return 0; // get none object } -static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) { +static int32_t syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) { SRespStub *pStub = (SRespStub *)taosHashIterate(pObj->pRespHash, NULL); int cnt = 0; int sum = 0; SSyncNode *pNode = pObj->data; SArray *delIndexArray = taosArrayInit(4, sizeof(uint64_t)); - if (delIndexArray == NULL) return; + if (delIndexArray == NULL) { + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + } sDebug("vgId:%d, resp manager begin clean by ttl", pNode->vgId); while (pStub) { @@ -153,7 +163,7 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) { SRpcMsg rpcMsg = {.info = pStub->rpcMsg.info, .code = TSDB_CODE_SYN_TIMEOUT}; sInfo("vgId:%d, message handle:%p expired, type:%s ahandle:%p", pNode->vgId, rpcMsg.info.handle, TMSG_INFO(pStub->rpcMsg.msgType), rpcMsg.info.ahandle); - rpcSendResponse(&rpcMsg); + (void)rpcSendResponse(&rpcMsg); } pStub = taosHashIterate(pObj->pRespHash, pStub); @@ -164,10 +174,12 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) { for (int32_t i = 0; i < arraySize; ++i) { uint64_t *pSeqNum = taosArrayGet(delIndexArray, i); - taosHashRemove(pObj->pRespHash, pSeqNum, sizeof(uint64_t)); + (void)taosHashRemove(pObj->pRespHash, pSeqNum, sizeof(uint64_t)); sDebug("vgId:%d, resp manager clean by ttl, seq:%" PRId64, pNode->vgId, *pSeqNum); } taosArrayDestroy(delIndexArray); + + return 0; } void syncRespCleanRsp(SSyncRespMgr *pObj) { @@ -177,7 +189,7 @@ void syncRespCleanRsp(SSyncRespMgr *pObj) { sTrace("vgId:%d, clean all resp", pNode->vgId); taosThreadMutexLock(&pObj->mutex); - syncRespCleanByTTL(pObj, -1, true); + (void)syncRespCleanByTTL(pObj, -1, true); taosThreadMutexUnlock(&pObj->mutex); } @@ -186,6 +198,6 @@ void syncRespClean(SSyncRespMgr *pObj) { sTrace("vgId:%d, clean resp by ttl", pNode->vgId); taosThreadMutexLock(&pObj->mutex); - syncRespCleanByTTL(pObj, pObj->ttl, false); + (void)syncRespCleanByTTL(pObj, pObj->ttl, false); taosThreadMutexUnlock(&pObj->mutex); } diff --git a/source/libs/sync/test/syncRespMgrTest.cpp b/source/libs/sync/test/syncRespMgrTest.cpp index 74246abf62..714f42a836 100644 --- a/source/libs/sync/test/syncRespMgrTest.cpp +++ b/source/libs/sync/test/syncRespMgrTest.cpp @@ -77,7 +77,7 @@ SSyncNode *createSyncNode() { void test1() { printf("------- test1 ---------\n"); - pMgr = syncRespMgrCreate(createSyncNode(), 0); + (void)syncRespMgrCreate(createSyncNode(), 0, &pMgr); assert(pMgr != NULL); syncRespMgrInsert(10); @@ -102,7 +102,7 @@ void test1() { void test2() { printf("------- test2 ---------\n"); - pMgr = syncRespMgrCreate(createSyncNode(), 0); + (void)syncRespMgrCreate(createSyncNode(), 0, &pMgr); assert(pMgr != NULL); syncRespMgrInsert(10); @@ -119,7 +119,7 @@ void test2() { void test3() { printf("------- test3 ---------\n"); - pMgr = syncRespMgrCreate(createSyncNode(), 0); + (void)syncRespMgrCreate(createSyncNode(), 0, &pMgr); assert(pMgr != NULL); syncRespMgrInsert(10); @@ -136,7 +136,7 @@ void test3() { void test4() { printf("------- test4 ---------\n"); - pMgr = syncRespMgrCreate(createSyncNode(), 2); + (void)syncRespMgrCreate(createSyncNode(), 2, &pMgr); assert(pMgr != NULL); syncRespMgrInsert(5);