enh: refactor return code
This commit is contained in:
parent
a7ad94a42c
commit
90ff2b34ba
|
@ -35,7 +35,7 @@ typedef struct SSyncRespMgr {
|
||||||
uint64_t seqNum;
|
uint64_t seqNum;
|
||||||
} SSyncRespMgr;
|
} SSyncRespMgr;
|
||||||
|
|
||||||
SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl);
|
int32_t syncRespMgrCreate(void *data, int64_t ttl, SSyncRespMgr **ppObj);
|
||||||
void syncRespMgrDestroy(SSyncRespMgr *pObj);
|
void syncRespMgrDestroy(SSyncRespMgr *pObj);
|
||||||
uint64_t syncRespMgrAdd(SSyncRespMgr *pObj, const SRespStub *pStub);
|
uint64_t syncRespMgrAdd(SSyncRespMgr *pObj, const SRespStub *pStub);
|
||||||
int32_t syncRespMgrDel(SSyncRespMgr *pObj, uint64_t seq);
|
int32_t syncRespMgrDel(SSyncRespMgr *pObj, uint64_t seq);
|
||||||
|
|
|
@ -949,7 +949,6 @@ int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
|
||||||
|
|
||||||
// open/close --------------
|
// open/close --------------
|
||||||
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
|
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
|
||||||
int32_t code = 0;
|
|
||||||
SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
|
SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
|
||||||
if (pSyncNode == NULL) {
|
if (pSyncNode == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -1042,9 +1041,9 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
|
||||||
pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;
|
pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;
|
||||||
|
|
||||||
// create raft log ring buffer
|
// create raft log ring buffer
|
||||||
code = syncLogBufferCreate(&pSyncNode->pLogBuf);
|
(void)syncLogBufferCreate(&pSyncNode->pLogBuf); // TODO: check return value
|
||||||
if (pSyncNode->pLogBuf == NULL) {
|
if (pSyncNode->pLogBuf == NULL) {
|
||||||
sError("failed to init sync log buffer since %s. vgId:%d", tstrerror(code), pSyncNode->vgId);
|
sError("failed to init sync log buffer since %s. vgId:%d", terrstr(), pSyncNode->vgId);
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1219,7 +1218,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// tools
|
// tools
|
||||||
pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS);
|
(void)syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS, &pSyncNode->pSyncRespMgr); // TODO: check return value
|
||||||
if (pSyncNode->pSyncRespMgr == NULL) {
|
if (pSyncNode->pSyncRespMgr == NULL) {
|
||||||
sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
|
sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
|
||||||
goto _error;
|
goto _error;
|
||||||
|
|
|
@ -19,16 +19,21 @@
|
||||||
#include "syncRaftStore.h"
|
#include "syncRaftStore.h"
|
||||||
#include "syncUtil.h"
|
#include "syncUtil.h"
|
||||||
|
|
||||||
SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) {
|
int32_t syncRespMgrCreate(void *data, int64_t ttl, SSyncRespMgr **ppObj) {
|
||||||
SSyncRespMgr *pObj = taosMemoryCalloc(1, sizeof(SSyncRespMgr));
|
SSyncRespMgr *pObj = NULL;
|
||||||
if (pObj == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
*ppObj = NULL;
|
||||||
return NULL;
|
|
||||||
|
if ((pObj = taosMemoryCalloc(1, sizeof(SSyncRespMgr))) == NULL) {
|
||||||
|
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
pObj->pRespHash =
|
pObj->pRespHash =
|
||||||
taosHashInit(sizeof(uint64_t), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
taosHashInit(sizeof(uint64_t), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||||
if (pObj->pRespHash == NULL) return NULL;
|
if (pObj->pRespHash == NULL) {
|
||||||
|
taosMemoryFree(pObj);
|
||||||
|
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
|
||||||
pObj->ttl = ttl;
|
pObj->ttl = ttl;
|
||||||
pObj->data = data;
|
pObj->data = data;
|
||||||
|
@ -37,7 +42,10 @@ SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) {
|
||||||
|
|
||||||
SSyncNode *pNode = pObj->data;
|
SSyncNode *pNode = pObj->data;
|
||||||
sDebug("vgId:%d, resp manager create", pNode->vgId);
|
sDebug("vgId:%d, resp manager create", pNode->vgId);
|
||||||
return pObj;
|
|
||||||
|
*ppObj = pObj;
|
||||||
|
|
||||||
|
TAOS_RETURN(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncRespMgrDestroy(SSyncRespMgr *pObj) {
|
void syncRespMgrDestroy(SSyncRespMgr *pObj) {
|
||||||
|
@ -80,7 +88,7 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t seq, SRespStub *pStub) {
|
||||||
|
|
||||||
SRespStub *pTmp = taosHashGet(pObj->pRespHash, &seq, sizeof(uint64_t));
|
SRespStub *pTmp = taosHashGet(pObj->pRespHash, &seq, sizeof(uint64_t));
|
||||||
if (pTmp != NULL) {
|
if (pTmp != NULL) {
|
||||||
memcpy(pStub, pTmp, sizeof(SRespStub));
|
(void)memcpy(pStub, pTmp, sizeof(SRespStub));
|
||||||
sNTrace(pObj->data, "get message handle, type:%s seq:%" PRIu64 " handle:%p", TMSG_INFO(pStub->rpcMsg.msgType), seq,
|
sNTrace(pObj->data, "get message handle, type:%s seq:%" PRIu64 " handle:%p", TMSG_INFO(pStub->rpcMsg.msgType), seq,
|
||||||
pStub->rpcMsg.info.handle);
|
pStub->rpcMsg.info.handle);
|
||||||
|
|
||||||
|
@ -102,7 +110,7 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t seq, SRpcHandleInfo *p
|
||||||
*pInfo = pStub->rpcMsg.info;
|
*pInfo = pStub->rpcMsg.info;
|
||||||
sNTrace(pObj->data, "get-and-del message handle:%p, type:%s seq:%" PRIu64, pStub->rpcMsg.info.handle,
|
sNTrace(pObj->data, "get-and-del message handle:%p, type:%s seq:%" PRIu64, pStub->rpcMsg.info.handle,
|
||||||
TMSG_INFO(pStub->rpcMsg.msgType), seq);
|
TMSG_INFO(pStub->rpcMsg.msgType), seq);
|
||||||
taosHashRemove(pObj->pRespHash, &seq, sizeof(uint64_t));
|
(void)taosHashRemove(pObj->pRespHash, &seq, sizeof(uint64_t));
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pObj->mutex);
|
taosThreadMutexUnlock(&pObj->mutex);
|
||||||
return 1; // get one object
|
return 1; // get one object
|
||||||
|
@ -114,14 +122,16 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t seq, SRpcHandleInfo *p
|
||||||
return 0; // get none object
|
return 0; // get none object
|
||||||
}
|
}
|
||||||
|
|
||||||
static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
|
static int32_t syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
|
||||||
SRespStub *pStub = (SRespStub *)taosHashIterate(pObj->pRespHash, NULL);
|
SRespStub *pStub = (SRespStub *)taosHashIterate(pObj->pRespHash, NULL);
|
||||||
int cnt = 0;
|
int cnt = 0;
|
||||||
int sum = 0;
|
int sum = 0;
|
||||||
SSyncNode *pNode = pObj->data;
|
SSyncNode *pNode = pObj->data;
|
||||||
|
|
||||||
SArray *delIndexArray = taosArrayInit(4, sizeof(uint64_t));
|
SArray *delIndexArray = taosArrayInit(4, sizeof(uint64_t));
|
||||||
if (delIndexArray == NULL) return;
|
if (delIndexArray == NULL) {
|
||||||
|
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
|
||||||
sDebug("vgId:%d, resp manager begin clean by ttl", pNode->vgId);
|
sDebug("vgId:%d, resp manager begin clean by ttl", pNode->vgId);
|
||||||
while (pStub) {
|
while (pStub) {
|
||||||
|
@ -153,7 +163,7 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
|
||||||
SRpcMsg rpcMsg = {.info = pStub->rpcMsg.info, .code = TSDB_CODE_SYN_TIMEOUT};
|
SRpcMsg rpcMsg = {.info = pStub->rpcMsg.info, .code = TSDB_CODE_SYN_TIMEOUT};
|
||||||
sInfo("vgId:%d, message handle:%p expired, type:%s ahandle:%p", pNode->vgId, rpcMsg.info.handle,
|
sInfo("vgId:%d, message handle:%p expired, type:%s ahandle:%p", pNode->vgId, rpcMsg.info.handle,
|
||||||
TMSG_INFO(pStub->rpcMsg.msgType), rpcMsg.info.ahandle);
|
TMSG_INFO(pStub->rpcMsg.msgType), rpcMsg.info.ahandle);
|
||||||
rpcSendResponse(&rpcMsg);
|
(void)rpcSendResponse(&rpcMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
pStub = taosHashIterate(pObj->pRespHash, pStub);
|
pStub = taosHashIterate(pObj->pRespHash, pStub);
|
||||||
|
@ -164,10 +174,12 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
|
||||||
|
|
||||||
for (int32_t i = 0; i < arraySize; ++i) {
|
for (int32_t i = 0; i < arraySize; ++i) {
|
||||||
uint64_t *pSeqNum = taosArrayGet(delIndexArray, i);
|
uint64_t *pSeqNum = taosArrayGet(delIndexArray, i);
|
||||||
taosHashRemove(pObj->pRespHash, pSeqNum, sizeof(uint64_t));
|
(void)taosHashRemove(pObj->pRespHash, pSeqNum, sizeof(uint64_t));
|
||||||
sDebug("vgId:%d, resp manager clean by ttl, seq:%" PRId64, pNode->vgId, *pSeqNum);
|
sDebug("vgId:%d, resp manager clean by ttl, seq:%" PRId64, pNode->vgId, *pSeqNum);
|
||||||
}
|
}
|
||||||
taosArrayDestroy(delIndexArray);
|
taosArrayDestroy(delIndexArray);
|
||||||
|
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncRespCleanRsp(SSyncRespMgr *pObj) {
|
void syncRespCleanRsp(SSyncRespMgr *pObj) {
|
||||||
|
@ -177,7 +189,7 @@ void syncRespCleanRsp(SSyncRespMgr *pObj) {
|
||||||
sTrace("vgId:%d, clean all resp", pNode->vgId);
|
sTrace("vgId:%d, clean all resp", pNode->vgId);
|
||||||
|
|
||||||
taosThreadMutexLock(&pObj->mutex);
|
taosThreadMutexLock(&pObj->mutex);
|
||||||
syncRespCleanByTTL(pObj, -1, true);
|
(void)syncRespCleanByTTL(pObj, -1, true);
|
||||||
taosThreadMutexUnlock(&pObj->mutex);
|
taosThreadMutexUnlock(&pObj->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -186,6 +198,6 @@ void syncRespClean(SSyncRespMgr *pObj) {
|
||||||
sTrace("vgId:%d, clean resp by ttl", pNode->vgId);
|
sTrace("vgId:%d, clean resp by ttl", pNode->vgId);
|
||||||
|
|
||||||
taosThreadMutexLock(&pObj->mutex);
|
taosThreadMutexLock(&pObj->mutex);
|
||||||
syncRespCleanByTTL(pObj, pObj->ttl, false);
|
(void)syncRespCleanByTTL(pObj, pObj->ttl, false);
|
||||||
taosThreadMutexUnlock(&pObj->mutex);
|
taosThreadMutexUnlock(&pObj->mutex);
|
||||||
}
|
}
|
||||||
|
|
|
@ -77,7 +77,7 @@ SSyncNode *createSyncNode() {
|
||||||
|
|
||||||
void test1() {
|
void test1() {
|
||||||
printf("------- test1 ---------\n");
|
printf("------- test1 ---------\n");
|
||||||
pMgr = syncRespMgrCreate(createSyncNode(), 0);
|
(void)syncRespMgrCreate(createSyncNode(), 0, &pMgr);
|
||||||
assert(pMgr != NULL);
|
assert(pMgr != NULL);
|
||||||
|
|
||||||
syncRespMgrInsert(10);
|
syncRespMgrInsert(10);
|
||||||
|
@ -102,7 +102,7 @@ void test1() {
|
||||||
|
|
||||||
void test2() {
|
void test2() {
|
||||||
printf("------- test2 ---------\n");
|
printf("------- test2 ---------\n");
|
||||||
pMgr = syncRespMgrCreate(createSyncNode(), 0);
|
(void)syncRespMgrCreate(createSyncNode(), 0, &pMgr);
|
||||||
assert(pMgr != NULL);
|
assert(pMgr != NULL);
|
||||||
|
|
||||||
syncRespMgrInsert(10);
|
syncRespMgrInsert(10);
|
||||||
|
@ -119,7 +119,7 @@ void test2() {
|
||||||
|
|
||||||
void test3() {
|
void test3() {
|
||||||
printf("------- test3 ---------\n");
|
printf("------- test3 ---------\n");
|
||||||
pMgr = syncRespMgrCreate(createSyncNode(), 0);
|
(void)syncRespMgrCreate(createSyncNode(), 0, &pMgr);
|
||||||
assert(pMgr != NULL);
|
assert(pMgr != NULL);
|
||||||
|
|
||||||
syncRespMgrInsert(10);
|
syncRespMgrInsert(10);
|
||||||
|
@ -136,7 +136,7 @@ void test3() {
|
||||||
|
|
||||||
void test4() {
|
void test4() {
|
||||||
printf("------- test4 ---------\n");
|
printf("------- test4 ---------\n");
|
||||||
pMgr = syncRespMgrCreate(createSyncNode(), 2);
|
(void)syncRespMgrCreate(createSyncNode(), 2, &pMgr);
|
||||||
assert(pMgr != NULL);
|
assert(pMgr != NULL);
|
||||||
|
|
||||||
syncRespMgrInsert(5);
|
syncRespMgrInsert(5);
|
||||||
|
|
Loading…
Reference in New Issue