From 4ddd25a29ca2c69ea5b7ce04b4975062899050be Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 7 Nov 2022 20:31:26 +0800 Subject: [PATCH] enh: adjust sync propose --- source/libs/sync/inc/syncTools.h | 5 ++--- source/libs/sync/src/syncMain.c | 19 +++++++------------ source/libs/sync/src/syncMessage.c | 10 ++++------ source/libs/sync/test/syncApplyMsgTest.cpp | 2 +- .../sync/test/syncClientRequestBatchTest.cpp | 2 +- .../libs/sync/test/syncClientRequestTest.cpp | 8 ++++---- source/libs/sync/test/syncEncodeTest.cpp | 4 ++-- source/libs/sync/test/syncEntryTest.cpp | 4 ++-- source/libs/sync/test/syncRpcMsgTest.cpp | 4 ++-- source/libs/sync/test/syncSnapshotTest.cpp | 4 ++-- source/libs/sync/test/syncWriteTest.cpp | 4 ++-- 11 files changed, 29 insertions(+), 37 deletions(-) diff --git a/source/libs/sync/inc/syncTools.h b/source/libs/sync/inc/syncTools.h index f7a9c404c1..932432d1f8 100644 --- a/source/libs/sync/inc/syncTools.h +++ b/source/libs/sync/inc/syncTools.h @@ -182,9 +182,8 @@ typedef struct SyncClientRequest { char data[]; // origin RpcMsg.pCont } SyncClientRequest; -SyncClientRequest* syncClientRequestBuild(uint32_t dataLen); -SyncClientRequest* syncClientRequestBuild2(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak, - int32_t vgId); // step 1 +SyncClientRequest* syncClientRequestAlloc(uint32_t dataLen); +SyncClientRequest* syncClientRequestBuild(const SRpcMsg* pMsg, uint64_t seqNum, bool isWeak, int32_t vgId); // step 1 void syncClientRequestDestroy(SyncClientRequest* pMsg); void syncClientRequestSerialize(const SyncClientRequest* pMsg, char* buf, uint32_t bufLen); void syncClientRequestDeserialize(const char* buf, uint32_t len, SyncClientRequest* pMsg); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index e9facfb381..cb1b19ca5e 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -669,13 +669,11 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { goto _END; } - SRespStub stub; - stub.createTime = taosGetTimestampMs(); - stub.rpcMsg = *pMsg; - uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub); + SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg}; + uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub); - SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, seqNum, isWeak, pSyncNode->vgId); - SRpcMsg rpcMsg; + SyncClientRequest* pSyncMsg = syncClientRequestBuild(pMsg, seqNum, isWeak, pSyncNode->vgId); + SRpcMsg rpcMsg = {0}; syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg); // optimized one replica @@ -696,12 +694,9 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { sError("vgId:%d, failed to sync optimize index:%" PRId64 ", type:%s", pSyncNode->vgId, retIndex, TMSG_INFO(pMsg->msgType)); } - } else { - if (pSyncNode->syncEqMsg != NULL && (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) { - ret = 0; - } else { - ret = -1; + ret = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg); + if (ret != 0) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; sError("vgId:%d, failed to enqueue msg since its null", pSyncNode->vgId); } @@ -2322,7 +2317,7 @@ static int32_t syncNodeEqNoop(SSyncNode* ths) { uint32_t entryLen; char* serialized = syncEntrySerialize(pEntry, &entryLen); - SyncClientRequest* pSyncMsg = syncClientRequestBuild(entryLen); + SyncClientRequest* pSyncMsg = syncClientRequestAlloc(entryLen); ASSERT(pSyncMsg->dataLen == entryLen); memcpy(pSyncMsg->data, serialized, entryLen); diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index f4e9d83503..e052a7a4fa 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -831,10 +831,9 @@ void syncPingReplyLog2(char* s, const SyncPingReply* pMsg) { } // ---- message process SyncClientRequest---- -SyncClientRequest* syncClientRequestBuild(uint32_t dataLen) { +SyncClientRequest* syncClientRequestAlloc(uint32_t dataLen) { uint32_t bytes = sizeof(SyncClientRequest) + dataLen; - SyncClientRequest* pMsg = taosMemoryMalloc(bytes); - memset(pMsg, 0, bytes); + SyncClientRequest* pMsg = taosMemoryCalloc(1, bytes); pMsg->bytes = bytes; pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST; pMsg->seqNum = 0; @@ -844,8 +843,8 @@ SyncClientRequest* syncClientRequestBuild(uint32_t dataLen) { } // step 1. original SRpcMsg => SyncClientRequest, add seqNum, isWeak -SyncClientRequest* syncClientRequestBuild2(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak, int32_t vgId) { - SyncClientRequest* pMsg = syncClientRequestBuild(pOriginalRpcMsg->contLen); +SyncClientRequest* syncClientRequestBuild(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak, int32_t vgId) { + SyncClientRequest* pMsg = syncClientRequestAlloc(pOriginalRpcMsg->contLen); pMsg->vgId = vgId; pMsg->originalRpcType = pOriginalRpcMsg->msgType; pMsg->seqNum = seqNum; @@ -891,7 +890,6 @@ SyncClientRequest* syncClientRequestDeserialize2(const char* buf, uint32_t len) // step 2. SyncClientRequest => RpcMsg, to queue void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg) { - memset(pRpcMsg, 0, sizeof(*pRpcMsg)); pRpcMsg->msgType = pMsg->msgType; pRpcMsg->contLen = pMsg->bytes; pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); diff --git a/source/libs/sync/test/syncApplyMsgTest.cpp b/source/libs/sync/test/syncApplyMsgTest.cpp index 9d5e7dd8e1..ce129015c6 100644 --- a/source/libs/sync/test/syncApplyMsgTest.cpp +++ b/source/libs/sync/test/syncApplyMsgTest.cpp @@ -81,7 +81,7 @@ void test4() { void test5() { SyncApplyMsg *pMsg = createMsg(); - SRpcMsg rpcMsg; + SRpcMsg rpcMsg = {0}; syncApplyMsg2RpcMsg(pMsg, &rpcMsg); SyncApplyMsg *pMsg2 = syncApplyMsgFromRpcMsg2(&rpcMsg); syncApplyMsgLog2((char *)"test5: syncClientRequest2RpcMsg -> syncApplyMsgFromRpcMsg2 ", pMsg2); diff --git a/source/libs/sync/test/syncClientRequestBatchTest.cpp b/source/libs/sync/test/syncClientRequestBatchTest.cpp index 5586b7a6ce..f07ee08b2b 100644 --- a/source/libs/sync/test/syncClientRequestBatchTest.cpp +++ b/source/libs/sync/test/syncClientRequestBatchTest.cpp @@ -59,7 +59,7 @@ void test2() { uint32_t len = pMsg->bytes; char * serialized = (char *)taosMemoryMalloc(len); syncClientRequestSerialize(pMsg, serialized, len); - SyncClientRequest *pMsg2 = syncClientRequestBuild(pMsg->dataLen); + SyncClientRequest *pMsg2 = syncClientRequestAlloc(pMsg->dataLen); syncClientRequestDeserialize(serialized, len, pMsg2); syncClientRequestLog2((char *)"test2: syncClientRequestSerialize -> syncClientRequestDeserialize ", pMsg2); diff --git a/source/libs/sync/test/syncClientRequestTest.cpp b/source/libs/sync/test/syncClientRequestTest.cpp index 56e53cc1c9..b6bfcc2da5 100644 --- a/source/libs/sync/test/syncClientRequestTest.cpp +++ b/source/libs/sync/test/syncClientRequestTest.cpp @@ -21,7 +21,7 @@ SyncClientRequest *createMsg() { rpcMsg.contLen = 20; rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); strcpy((char *)rpcMsg.pCont, "hello rpc"); - SyncClientRequest *pMsg = syncClientRequestBuild2(&rpcMsg, 123, true, 1000); + SyncClientRequest *pMsg = syncClientRequestBuild(&rpcMsg, 123, true, 1000); rpcFreeCont(rpcMsg.pCont); return pMsg; } @@ -37,7 +37,7 @@ void test2() { uint32_t len = pMsg->bytes; char *serialized = (char *)taosMemoryMalloc(len); syncClientRequestSerialize(pMsg, serialized, len); - SyncClientRequest *pMsg2 = syncClientRequestBuild(pMsg->dataLen); + SyncClientRequest *pMsg2 = syncClientRequestAlloc(pMsg->dataLen); syncClientRequestDeserialize(serialized, len, pMsg2); syncClientRequestLog2((char *)"test2: syncClientRequestSerialize -> syncClientRequestDeserialize ", pMsg2); @@ -60,7 +60,7 @@ void test3() { void test4() { SyncClientRequest *pMsg = createMsg(); - SRpcMsg rpcMsg; + SRpcMsg rpcMsg = {0}; syncClientRequest2RpcMsg(pMsg, &rpcMsg); SyncClientRequest *pMsg2 = (SyncClientRequest *)taosMemoryMalloc(rpcMsg.contLen); syncClientRequestFromRpcMsg(&rpcMsg, pMsg2); @@ -73,7 +73,7 @@ void test4() { void test5() { SyncClientRequest *pMsg = createMsg(); - SRpcMsg rpcMsg; + SRpcMsg rpcMsg = {0}; syncClientRequest2RpcMsg(pMsg, &rpcMsg); SyncClientRequest *pMsg2 = syncClientRequestFromRpcMsg2(&rpcMsg); syncClientRequestLog2((char *)"test5: syncClientRequest2RpcMsg -> syncClientRequestFromRpcMsg2 ", pMsg2); diff --git a/source/libs/sync/test/syncEncodeTest.cpp b/source/libs/sync/test/syncEncodeTest.cpp index 092442bab1..c60176fbf8 100644 --- a/source/libs/sync/test/syncEncodeTest.cpp +++ b/source/libs/sync/test/syncEncodeTest.cpp @@ -102,12 +102,12 @@ SRpcMsg *step0() { } SyncClientRequest *step1(const SRpcMsg *pMsg) { - SyncClientRequest *pRetMsg = syncClientRequestBuild2(pMsg, 123, true, 1000); + SyncClientRequest *pRetMsg = syncClientRequestBuild(pMsg, 123, true, 1000); return pRetMsg; } SRpcMsg *step2(const SyncClientRequest *pMsg) { - SRpcMsg *pRetMsg = (SRpcMsg *)taosMemoryMalloc(sizeof(SRpcMsg)); + SRpcMsg *pRetMsg = (SRpcMsg *)taosMemoryCalloc(sizeof(SRpcMsg), 1); syncClientRequest2RpcMsg(pMsg, pRetMsg); return pRetMsg; } diff --git a/source/libs/sync/test/syncEntryTest.cpp b/source/libs/sync/test/syncEntryTest.cpp index 53ae91fc60..b274408c01 100644 --- a/source/libs/sync/test/syncEntryTest.cpp +++ b/source/libs/sync/test/syncEntryTest.cpp @@ -32,7 +32,7 @@ void test1() { } void test2() { - SyncClientRequest* pSyncMsg = syncClientRequestBuild(10); + SyncClientRequest* pSyncMsg = syncClientRequestAlloc(10); pSyncMsg->originalRpcType = 33; pSyncMsg->seqNum = 11; pSyncMsg->isWeak = 1; @@ -46,7 +46,7 @@ void test2() { } void test3() { - SyncClientRequest* pSyncMsg = syncClientRequestBuild(10); + SyncClientRequest* pSyncMsg = syncClientRequestAlloc(10); pSyncMsg->originalRpcType = 33; pSyncMsg->seqNum = 11; pSyncMsg->isWeak = 1; diff --git a/source/libs/sync/test/syncRpcMsgTest.cpp b/source/libs/sync/test/syncRpcMsgTest.cpp index 0b21f41080..941fa7eab5 100644 --- a/source/libs/sync/test/syncRpcMsgTest.cpp +++ b/source/libs/sync/test/syncRpcMsgTest.cpp @@ -47,7 +47,7 @@ SyncClientRequest *createSyncClientRequest() { rpcMsg.contLen = 20; rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); strcpy((char *)rpcMsg.pCont, "hello rpc"); - SyncClientRequest *pMsg = syncClientRequestBuild2(&rpcMsg, 123, true, 1000); + SyncClientRequest *pMsg = syncClientRequestBuild(&rpcMsg, 123, true, 1000); return pMsg; } @@ -156,7 +156,7 @@ void test7() { void test8() { SyncClientRequest *pMsg = createSyncClientRequest(); - SRpcMsg rpcMsg; + SRpcMsg rpcMsg = {0}; syncClientRequest2RpcMsg(pMsg, &rpcMsg); syncRpcMsgLog2((char *)"test8", &rpcMsg); syncClientRequestDestroy(pMsg); diff --git a/source/libs/sync/test/syncSnapshotTest.cpp b/source/libs/sync/test/syncSnapshotTest.cpp index 5b5c902a01..a1cedd624a 100644 --- a/source/libs/sync/test/syncSnapshotTest.cpp +++ b/source/libs/sync/test/syncSnapshotTest.cpp @@ -162,7 +162,7 @@ SRpcMsg *step0() { } SyncClientRequest *step1(const SRpcMsg *pMsg) { - SyncClientRequest *pRetMsg = syncClientRequestBuild2(pMsg, 123, true, 1000); + SyncClientRequest *pRetMsg = syncClientRequestBuild(pMsg, 123, true, 1000); return pRetMsg; } @@ -206,7 +206,7 @@ int main(int argc, char **argv) { for (int i = 0; i < 10; ++i) { SyncClientRequest *pSyncClientRequest = pMsg1; - SRpcMsg rpcMsg; + SRpcMsg rpcMsg = {0}; syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg); gSyncNode->syncEqMsg(gSyncNode->msgcb, &rpcMsg); diff --git a/source/libs/sync/test/syncWriteTest.cpp b/source/libs/sync/test/syncWriteTest.cpp index 570b614f3b..7c5334f668 100644 --- a/source/libs/sync/test/syncWriteTest.cpp +++ b/source/libs/sync/test/syncWriteTest.cpp @@ -140,7 +140,7 @@ SRpcMsg *step0() { } SyncClientRequest *step1(const SRpcMsg *pMsg) { - SyncClientRequest *pRetMsg = syncClientRequestBuild2(pMsg, 123, true, 1000); + SyncClientRequest *pRetMsg = syncClientRequestBuild(pMsg, 123, true, 1000); return pRetMsg; } @@ -181,7 +181,7 @@ int main(int argc, char **argv) { for (int i = 0; i < 10; ++i) { SyncClientRequest *pSyncClientRequest = pMsg1; - SRpcMsg rpcMsg; + SRpcMsg rpcMsg = {0}; syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg); gSyncNode->syncEqMsg(gSyncNode->msgcb, &rpcMsg);