From 8deb3b83df7fde14815bd7290ad2703c040224ea Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 12 Nov 2022 10:23:59 +0800 Subject: [PATCH] refact: adjust timeout msg build --- source/libs/sync/inc/syncMessage.h | 14 ------ source/libs/sync/inc/syncTimeout.h | 2 +- source/libs/sync/src/syncMessage.c | 44 ------------------- source/libs/sync/src/syncTimeout.c | 2 +- .../sync/test/sync_test_lib/inc/syncTest.h | 10 +++++ .../test/sync_test_lib/src/syncMessageDebug.c | 44 +++++++++++++++++++ 6 files changed, 56 insertions(+), 60 deletions(-) diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 3656afd7ab..db3b01a053 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -54,20 +54,9 @@ typedef struct SyncClientRequest { char data[]; // origin RpcMsg.pCont } SyncClientRequest; -SyncClientRequest* syncClientRequestAlloc(uint32_t dataLen); int32_t syncClientRequestBuildFromRpcMsg(SRpcMsg* pClientRequestRpcMsg, const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak, int32_t vgId); int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const SSyncRaftEntry* pEntry, int32_t vgId); -void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg); // step 2 -void syncClientRequestFromRpcMsg(const SRpcMsg* pRpcMsg, SyncClientRequest* pMsg); -cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg); -char* syncClientRequest2Str(const SyncClientRequest* pMsg); - -// for debug ---------------------- -void syncClientRequestPrint(const SyncClientRequest* pMsg); -void syncClientRequestPrint2(char* s, const SyncClientRequest* pMsg); -void syncClientRequestLog(const SyncClientRequest* pMsg); -void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg); // --------------------------------------------- typedef struct SRaftMeta { @@ -609,9 +598,6 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg); bool syncNodeSnapshotEnable(SSyncNode* pSyncNode); ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode); -// --------------------------------------------- -SyncTimeout* syncTimeoutBuildX(); - #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncTimeout.h b/source/libs/sync/inc/syncTimeout.h index c6b87a1bca..85c8e8f58f 100644 --- a/source/libs/sync/inc/syncTimeout.h +++ b/source/libs/sync/inc/syncTimeout.h @@ -34,7 +34,7 @@ extern "C" { // /\ voterLog' = [voterLog EXCEPT ![i] = [j \in {} |-> <<>>]] // /\ UNCHANGED <> // -int32_t syncNodeOnTimer(SSyncNode* ths, SRpcMsg* pMsg); +int32_t syncNodeOnTimer(SSyncNode* ths, const SRpcMsg* pMsg); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 55f80b4cc1..c5d693853b 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -42,15 +42,6 @@ int32_t syncTimeoutBuild(SRpcMsg* pTimeoutRpcMsg, ESyncTimeoutType timeoutType, return 0; } -SyncClientRequest* syncClientRequestAlloc(uint32_t dataLen) { - uint32_t bytes = sizeof(SyncClientRequest) + dataLen; - SyncClientRequest* pMsg = taosMemoryCalloc(1, bytes); - pMsg->bytes = bytes; - pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST; - pMsg->dataLen = dataLen; - return pMsg; -} - int32_t syncClientRequestBuildFromRpcMsg(SRpcMsg* pClientRequestRpcMsg, const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak, int32_t vgId) { int32_t bytes = sizeof(SyncClientRequest) + pOriginalRpcMsg->contLen; @@ -97,41 +88,6 @@ int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const return 0; } -cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg) { - char u64buf[128] = {0}; - cJSON* pRoot = cJSON_CreateObject(); - - if (pMsg != NULL) { - cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); - cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); - cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); - cJSON_AddNumberToObject(pRoot, "originalRpcType", pMsg->originalRpcType); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->seqNum); - cJSON_AddStringToObject(pRoot, "seqNum", u64buf); - cJSON_AddNumberToObject(pRoot, "isWeak", pMsg->isWeak); - cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); - - char* s; - s = syncUtilPrintBin((char*)(pMsg->data), pMsg->dataLen); - cJSON_AddStringToObject(pRoot, "data", s); - taosMemoryFree(s); - s = syncUtilPrintBin2((char*)(pMsg->data), pMsg->dataLen); - cJSON_AddStringToObject(pRoot, "data2", s); - taosMemoryFree(s); - } - - cJSON* pJson = cJSON_CreateObject(); - cJSON_AddItemToObject(pJson, "SyncClientRequest", pRoot); - return pJson; -} - -char* syncClientRequest2Str(const SyncClientRequest* pMsg) { - cJSON* pJson = syncClientRequest2Json(pMsg); - char* serialized = cJSON_Print(pJson); - cJSON_Delete(pJson); - return serialized; -} - // ---- message process SyncClientRequestBatch---- // block1: diff --git a/source/libs/sync/src/syncTimeout.c b/source/libs/sync/src/syncTimeout.c index abb6f0c11a..9bf2bb5697 100644 --- a/source/libs/sync/src/syncTimeout.c +++ b/source/libs/sync/src/syncTimeout.c @@ -85,7 +85,7 @@ static int32_t syncNodeTimerRoutine(SSyncNode* ths) { return 0; } -int32_t syncNodeOnTimer(SSyncNode* ths, SRpcMsg* pRpc) { +int32_t syncNodeOnTimer(SSyncNode* ths, const SRpcMsg* pRpc) { int32_t ret = 0; SyncTimeout* pMsg = pRpc->pCont; diff --git a/source/libs/sync/test/sync_test_lib/inc/syncTest.h b/source/libs/sync/test/sync_test_lib/inc/syncTest.h index f84cc5d053..fef534e0a1 100644 --- a/source/libs/sync/test/sync_test_lib/inc/syncTest.h +++ b/source/libs/sync/test/sync_test_lib/inc/syncTest.h @@ -231,6 +231,16 @@ void syncTimeoutPrint2(char* s, const SyncTimeout* pMsg); void syncTimeoutLog(const SyncTimeout* pMsg); void syncTimeoutLog2(char* s, const SyncTimeout* pMsg); +SyncClientRequest* syncClientRequestAlloc(uint32_t dataLen); +void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg); // step 2 +void syncClientRequestFromRpcMsg(const SRpcMsg* pRpcMsg, SyncClientRequest* pMsg); +cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg); +char* syncClientRequest2Str(const SyncClientRequest* pMsg); +void syncClientRequestPrint(const SyncClientRequest* pMsg); +void syncClientRequestPrint2(char* s, const SyncClientRequest* pMsg); +void syncClientRequestLog(const SyncClientRequest* pMsg); +void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c b/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c index fcf2d072d2..56c35fcef8 100644 --- a/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c +++ b/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c @@ -938,6 +938,50 @@ void syncSnapshotSendLog2(char* s, const SyncSnapshotSend* pMsg) { } } +SyncClientRequest* syncClientRequestAlloc(uint32_t dataLen) { + uint32_t bytes = sizeof(SyncClientRequest) + dataLen; + SyncClientRequest* pMsg = taosMemoryCalloc(1, bytes); + pMsg->bytes = bytes; + pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST; + pMsg->dataLen = dataLen; + return pMsg; +} + +cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg) { + char u64buf[128] = {0}; + cJSON* pRoot = cJSON_CreateObject(); + + if (pMsg != NULL) { + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + cJSON_AddNumberToObject(pRoot, "originalRpcType", pMsg->originalRpcType); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->seqNum); + cJSON_AddStringToObject(pRoot, "seqNum", u64buf); + cJSON_AddNumberToObject(pRoot, "isWeak", pMsg->isWeak); + cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); + + char* s; + s = syncUtilPrintBin((char*)(pMsg->data), pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data", s); + taosMemoryFree(s); + s = syncUtilPrintBin2((char*)(pMsg->data), pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data2", s); + taosMemoryFree(s); + } + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncClientRequest", pRoot); + return pJson; +} + +char* syncClientRequest2Str(const SyncClientRequest* pMsg) { + cJSON* pJson = syncClientRequest2Json(pMsg); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + cJSON* syncClientRequestBatch2Json(const SyncClientRequestBatch* pMsg) { char u64buf[128] = {0}; cJSON* pRoot = cJSON_CreateObject();