From 1a8cf049b7cd653c7e6fda116f314f7092359abd Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 30 Jun 2022 13:39:13 +0800 Subject: [PATCH 1/6] refactor(sync): add syncAppendEntriesBatch --- include/common/tmsgdef.h | 1 + include/libs/sync/syncTools.h | 27 ++ source/libs/sync/inc/syncSnapshot.h | 14 +- source/libs/sync/src/syncMessage.c | 275 +++++++++++++++++- source/libs/sync/src/syncRaftCfg.c | 8 +- source/libs/sync/src/syncSnapshot.c | 10 +- source/libs/sync/test/CMakeLists.txt | 14 + .../sync/test/syncAppendEntriesBatchTest.cpp | 139 +++++++++ 8 files changed, 471 insertions(+), 17 deletions(-) create mode 100644 source/libs/sync/test/syncAppendEntriesBatchTest.cpp diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index acf08bd47e..926d4eb51f 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -236,6 +236,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_SYNC_REQUEST_VOTE, "sync-request-vote", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_REQUEST_VOTE_REPLY, "sync-request-vote-reply", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_APPEND_ENTRIES, "sync-append-entries", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_APPEND_ENTRIES_BATCH, "sync-append-entries-batch", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_APPEND_ENTRIES_REPLY, "sync-append-entries-reply", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_NOOP, "sync-noop", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_UNKNOWN, "sync-unknown", NULL, NULL) diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index 46f279ed85..8975f930db 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -325,22 +325,49 @@ void syncAppendEntriesLog(const SyncAppendEntries* pMsg); void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg); // --------------------------------------------- +typedef struct SOffsetAndContLen { + int32_t offset; + int32_t contLen; +} SOffsetAndContLen; + typedef struct SyncAppendEntriesBatch { uint32_t bytes; int32_t vgId; uint32_t msgType; SRaftId srcId; SRaftId destId; + // private data SyncTerm term; SyncIndex prevLogIndex; SyncTerm prevLogTerm; SyncIndex commitIndex; SyncTerm privateTerm; + int32_t dataCount; uint32_t dataLen; char data[]; } SyncAppendEntriesBatch; +SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SRpcMsg* rpcMsgArr, int32_t arrSize, int32_t vgId); +void syncAppendEntriesBatchDestroy(SyncAppendEntriesBatch* pMsg); +void syncAppendEntriesBatchSerialize(const SyncAppendEntriesBatch* pMsg, char* buf, uint32_t bufLen); +void syncAppendEntriesBatchDeserialize(const char* buf, uint32_t len, SyncAppendEntriesBatch* pMsg); +char* syncAppendEntriesBatchSerialize2(const SyncAppendEntriesBatch* pMsg, uint32_t* len); +SyncAppendEntriesBatch* syncAppendEntriesBatchDeserialize2(const char* buf, uint32_t len); +void syncAppendEntriesBatch2RpcMsg(const SyncAppendEntriesBatch* pMsg, SRpcMsg* pRpcMsg); +void syncAppendEntriesBatchFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesBatch* pMsg); +SyncAppendEntriesBatch* syncAppendEntriesBatchFromRpcMsg2(const SRpcMsg* pRpcMsg); +cJSON* syncAppendEntriesBatch2Json(const SyncAppendEntriesBatch* pMsg); +char* syncAppendEntriesBatch2Str(const SyncAppendEntriesBatch* pMsg); +void syncAppendEntriesBatch2RpcMsgArray(SyncAppendEntriesBatch* pSyncMsg, SRpcMsg* rpcMsgArr, int32_t maxArrSize, + int32_t* pRetArrSize); + +// for debug ---------------------- +void syncAppendEntriesBatchPrint(const SyncAppendEntriesBatch* pMsg); +void syncAppendEntriesBatchPrint2(char* s, const SyncAppendEntriesBatch* pMsg); +void syncAppendEntriesBatchLog(const SyncAppendEntriesBatch* pMsg); +void syncAppendEntriesBatchLog2(char* s, const SyncAppendEntriesBatch* pMsg); + // --------------------------------------------- typedef struct SyncAppendEntriesReply { uint32_t bytes; diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 14688f8912..a9b4470b42 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -40,8 +40,8 @@ typedef struct SSyncSnapshotSender { bool start; int32_t seq; int32_t ack; - void *pReader; - void *pCurrentBlock; + void * pReader; + void * pCurrentBlock; int32_t blockLen; SSnapshot snapshot; SSyncCfg lastConfig; @@ -62,14 +62,14 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender); int32_t snapshotReSend(SSyncSnapshotSender *pSender); cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender); -char *snapshotSender2Str(SSyncSnapshotSender *pSender); -char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event); +char * snapshotSender2Str(SSyncSnapshotSender *pSender); +char * snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event); //--------------------------------------------------- typedef struct SSyncSnapshotReceiver { bool start; int32_t ack; - void *pWriter; + void * pWriter; SyncTerm term; SyncTerm privateTerm; SSnapshot snapshot; @@ -85,8 +85,8 @@ bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver); void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver); cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); -char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); -char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event); +char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); +char * snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event); //--------------------------------------------------- // on message diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 119a178893..acece6d48a 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -591,7 +591,7 @@ void syncPingReplySerialize(const SyncPingReply* pMsg, char* buf, uint32_t bufLe void syncPingReplyDeserialize(const char* buf, uint32_t len, SyncPingReply* pMsg) { memcpy(pMsg, buf, len); ASSERT(len == pMsg->bytes); - ASSERT(pMsg->bytes == sizeof(SyncPing) + pMsg->dataLen); + ASSERT(pMsg->bytes == sizeof(SyncPingReply) + pMsg->dataLen); } char* syncPingReplySerialize2(const SyncPingReply* pMsg, uint32_t* len) { @@ -1426,6 +1426,279 @@ void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg) { } } +// ---- message process SyncAppendEntriesBatch---- + +// block1: SOffsetAndContLen +// block2: SOffsetAndContLen Array +// block3: SRpcMsg Array +// block4: SRpcMsg pCont Array + +SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SRpcMsg* rpcMsgArr, int32_t arrSize, int32_t vgId) { + ASSERT(rpcMsgArr != NULL); + ASSERT(arrSize > 0); + + int32_t dataLen = 0; + int32_t metaArrayLen = sizeof(SOffsetAndContLen) * arrSize; // + int32_t rpcArrayLen = sizeof(SRpcMsg) * arrSize; // SRpcMsg + int32_t contArrayLen = 0; + for (int i = 0; i < arrSize; ++i) { // SRpcMsg pCont + contArrayLen += rpcMsgArr[i].contLen; + } + dataLen += (metaArrayLen + rpcArrayLen + contArrayLen); + + uint32_t bytes = sizeof(SyncAppendEntriesBatch) + dataLen; + SyncAppendEntriesBatch* pMsg = taosMemoryMalloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->vgId = vgId; + pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES_BATCH; + pMsg->dataCount = arrSize; + pMsg->dataLen = dataLen; + + SOffsetAndContLen* metaArr = (SOffsetAndContLen*)(pMsg->data); + SRpcMsg* msgArr = (SRpcMsg*)((char*)(pMsg->data) + metaArrayLen); + char* pData = pMsg->data; + + for (int i = 0; i < arrSize; ++i) { + // init + if (i == 0) { + metaArr[i].offset = metaArrayLen + rpcArrayLen; + metaArr[i].contLen = rpcMsgArr[i].contLen; + } else { + metaArr[i].offset = metaArr[i - 1].offset + metaArr[i - 1].contLen; + metaArr[i].contLen = rpcMsgArr[i].contLen; + } + + // init msgArr + msgArr[i] = rpcMsgArr[i]; + + // init data + ASSERT(rpcMsgArr[i].contLen == metaArr[i].contLen); + memcpy(pData + metaArr[i].offset, rpcMsgArr[i].pCont, rpcMsgArr[i].contLen); + } + + return pMsg; +} + +void syncAppendEntriesBatchDestroy(SyncAppendEntriesBatch* pMsg) { + if (pMsg != NULL) { + taosMemoryFree(pMsg); + } +} + +void syncAppendEntriesBatchSerialize(const SyncAppendEntriesBatch* pMsg, char* buf, uint32_t bufLen) { + ASSERT(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncAppendEntriesBatchDeserialize(const char* buf, uint32_t len, SyncAppendEntriesBatch* pMsg) { + memcpy(pMsg, buf, len); + ASSERT(len == pMsg->bytes); + ASSERT(pMsg->bytes == sizeof(SyncAppendEntriesBatch) + pMsg->dataLen); +} + +char* syncAppendEntriesBatchSerialize2(const SyncAppendEntriesBatch* pMsg, uint32_t* len) { + char* buf = taosMemoryMalloc(pMsg->bytes); + ASSERT(buf != NULL); + syncAppendEntriesBatchSerialize(pMsg, buf, pMsg->bytes); + if (len != NULL) { + *len = pMsg->bytes; + } + return buf; +} + +SyncAppendEntriesBatch* syncAppendEntriesBatchDeserialize2(const char* buf, uint32_t len) { + uint32_t bytes = *((uint32_t*)buf); + SyncAppendEntriesBatch* pMsg = taosMemoryMalloc(bytes); + ASSERT(pMsg != NULL); + syncAppendEntriesBatchDeserialize(buf, len, pMsg); + ASSERT(len == pMsg->bytes); + return pMsg; +} + +void syncAppendEntriesBatch2RpcMsg(const SyncAppendEntriesBatch* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncAppendEntriesBatchSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncAppendEntriesBatchFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesBatch* pMsg) { + syncAppendEntriesBatchDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +SyncAppendEntriesBatch* syncAppendEntriesBatchFromRpcMsg2(const SRpcMsg* pRpcMsg) { + SyncAppendEntriesBatch* pMsg = syncAppendEntriesBatchDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); + ASSERT(pMsg != NULL); + return pMsg; +} + +cJSON* syncAppendEntriesBatch2Json(const SyncAppendEntriesBatch* pMsg) { + char u64buf[128] = {0}; + cJSON* pRoot = cJSON_CreateObject(); + + if (pMsg != NULL) { + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + + cJSON* pSrcId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr); + cJSON_AddStringToObject(pSrcId, "addr", u64buf); + { + uint64_t u64 = pMsg->srcId.addr; + cJSON* pTmp = pSrcId; + char host[128] = {0}; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); + cJSON_AddItemToObject(pRoot, "srcId", pSrcId); + + cJSON* pDestId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->destId.addr); + cJSON_AddStringToObject(pDestId, "addr", u64buf); + { + uint64_t u64 = pMsg->destId.addr; + cJSON* pTmp = pDestId; + char host[128] = {0}; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); + cJSON_AddItemToObject(pRoot, "destId", pDestId); + + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term); + cJSON_AddStringToObject(pRoot, "term", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->prevLogIndex); + cJSON_AddStringToObject(pRoot, "prevLogIndex", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->prevLogTerm); + cJSON_AddStringToObject(pRoot, "prevLogTerm", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->commitIndex); + cJSON_AddStringToObject(pRoot, "commitIndex", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->privateTerm); + cJSON_AddStringToObject(pRoot, "privateTerm", u64buf); + + cJSON_AddNumberToObject(pRoot, "dataCount", pMsg->dataCount); + cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); + + int32_t metaArrayLen = sizeof(SOffsetAndContLen) * pMsg->dataCount; // + int32_t rpcArrayLen = sizeof(SRpcMsg) * pMsg->dataCount; // SRpcMsg + int32_t contArrayLen = pMsg->dataLen - metaArrayLen - rpcArrayLen; + + cJSON_AddNumberToObject(pRoot, "metaArrayLen", metaArrayLen); + cJSON_AddNumberToObject(pRoot, "rpcArrayLen", rpcArrayLen); + cJSON_AddNumberToObject(pRoot, "contArrayLen", contArrayLen); + + SOffsetAndContLen* metaArr = (SOffsetAndContLen*)(pMsg->data); + SRpcMsg* msgArr = (SRpcMsg*)(pMsg->data + metaArrayLen); + void* pData = (void*)(pMsg->data + metaArrayLen + rpcArrayLen); + + cJSON* pMetaArr = cJSON_CreateArray(); + cJSON_AddItemToObject(pRoot, "metaArr", pMetaArr); + for (int i = 0; i < pMsg->dataCount; ++i) { + cJSON* pMeta = cJSON_CreateObject(); + cJSON_AddNumberToObject(pMeta, "offset", metaArr[i].offset); + cJSON_AddNumberToObject(pMeta, "contLen", metaArr[i].contLen); + cJSON_AddItemToArray(pMetaArr, pMeta); + } + + cJSON* pMsgArr = cJSON_CreateArray(); + cJSON_AddItemToObject(pRoot, "msgArr", pMsgArr); + for (int i = 0; i < pMsg->dataCount; ++i) { + cJSON* pRpcMsgJson = cJSON_CreateObject(); + cJSON_AddNumberToObject(pRpcMsgJson, "code", msgArr[i].code); + cJSON_AddNumberToObject(pRpcMsgJson, "contLen", msgArr[i].contLen); + cJSON_AddNumberToObject(pRpcMsgJson, "msgType", msgArr[i].msgType); + cJSON_AddItemToArray(pMsgArr, pRpcMsgJson); + } + + char* s; + s = syncUtilprintBin((char*)(pMsg->data), pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data", s); + taosMemoryFree(s); + s = syncUtilprintBin2((char*)(pMsg->data), pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data2", s); + taosMemoryFree(s); + } + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncAppendEntriesBatch", pRoot); + return pJson; +} + +char* syncAppendEntriesBatch2Str(const SyncAppendEntriesBatch* pMsg) { + cJSON* pJson = syncAppendEntriesBatch2Json(pMsg); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +void syncAppendEntriesBatch2RpcMsgArray(SyncAppendEntriesBatch* pSyncMsg, SRpcMsg* rpcMsgArr, int32_t maxArrSize, + int32_t* pRetArrSize) { + if (pRetArrSize != NULL) { + *pRetArrSize = pSyncMsg->dataCount; + } + + int32_t arrSize = pSyncMsg->dataCount; + if (arrSize > maxArrSize) { + arrSize = maxArrSize; + } + + int32_t metaArrayLen = sizeof(SOffsetAndContLen) * pSyncMsg->dataCount; // + int32_t rpcArrayLen = sizeof(SRpcMsg) * pSyncMsg->dataCount; // SRpcMsg + int32_t contArrayLen = pSyncMsg->dataLen - metaArrayLen - rpcArrayLen; + + SOffsetAndContLen* metaArr = (SOffsetAndContLen*)(pSyncMsg->data); + SRpcMsg* msgArr = (SRpcMsg*)(pSyncMsg->data + metaArrayLen); + void* pData = pSyncMsg->data + metaArrayLen + rpcArrayLen; + + for (int i = 0; i < arrSize; ++i) { + rpcMsgArr[i] = msgArr[i]; + rpcMsgArr[i].pCont = rpcMallocCont(msgArr[i].contLen); + void* pRpcCont = pSyncMsg->data + metaArr[i].offset; + memcpy(rpcMsgArr[i].pCont, pRpcCont, rpcMsgArr[i].contLen); + } +} + +// for debug ---------------------- +void syncAppendEntriesBatchPrint(const SyncAppendEntriesBatch* pMsg) { + char* serialized = syncAppendEntriesBatch2Str(pMsg); + printf("syncAppendEntriesBatchPrint | len:%lu | %s \n", strlen(serialized), serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncAppendEntriesBatchPrint2(char* s, const SyncAppendEntriesBatch* pMsg) { + char* serialized = syncAppendEntriesBatch2Str(pMsg); + printf("syncAppendEntriesBatchPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncAppendEntriesBatchLog(const SyncAppendEntriesBatch* pMsg) { + char* serialized = syncAppendEntriesBatch2Str(pMsg); + sTrace("syncAppendEntriesBatchLog | len:%lu | %s", strlen(serialized), serialized); + taosMemoryFree(serialized); +} + +void syncAppendEntriesBatchLog2(char* s, const SyncAppendEntriesBatch* pMsg) { + if (gRaftDetailLog) { + char* serialized = syncAppendEntriesBatch2Str(pMsg); + sTraceLong("syncAppendEntriesBatchLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); + taosMemoryFree(serialized); + } +} + // ---- message process SyncAppendEntriesReply---- SyncAppendEntriesReply* syncAppendEntriesReplyBuild(int32_t vgId) { uint32_t bytes = sizeof(SyncAppendEntriesReply); diff --git a/source/libs/sync/src/syncRaftCfg.c b/source/libs/sync/src/syncRaftCfg.c index 43890c196c..ec3f18132d 100644 --- a/source/libs/sync/src/syncRaftCfg.c +++ b/source/libs/sync/src/syncRaftCfg.c @@ -101,7 +101,7 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) { char *syncCfg2Str(SSyncCfg *pSyncCfg) { cJSON *pJson = syncCfg2Json(pSyncCfg); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } @@ -109,7 +109,7 @@ char *syncCfg2Str(SSyncCfg *pSyncCfg) { char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) { if (pSyncCfg != NULL) { int32_t len = 512; - char *s = taosMemoryMalloc(len); + char * s = taosMemoryMalloc(len); memset(s, 0, len); snprintf(s, len, "{replica-num:%d, my-index:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex); @@ -205,7 +205,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) { char *raftCfg2Str(SRaftCfg *pRaftCfg) { cJSON *pJson = raftCfg2Json(pRaftCfg); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } @@ -280,7 +280,7 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) { (pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring); } - cJSON *pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg"); + cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg"); int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg)); ASSERT(code == 0); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 70f3801e61..56f5aaa239 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -349,14 +349,14 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { char *snapshotSender2Str(SSyncSnapshotSender *pSender) { cJSON *pJson = snapshotSender2Json(pSender); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) { int32_t len = 256; - char *s = taosMemoryMalloc(len); + char * s = taosMemoryMalloc(len); SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; char host[64]; @@ -604,7 +604,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { cJSON_AddStringToObject(pFromId, "addr", u64buf); { uint64_t u64 = pReceiver->fromId.addr; - cJSON *pTmp = pFromId; + cJSON * pTmp = pFromId; char host[128] = {0}; uint16_t port; syncUtilU642Addr(u64, host, sizeof(host), &port); @@ -637,14 +637,14 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { cJSON *pJson = snapshotReceiver2Json(pReceiver); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) { int32_t len = 256; - char *s = taosMemoryMalloc(len); + char * s = taosMemoryMalloc(len); SRaftId fromId = pReceiver->fromId; char host[128]; diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index bc63462a9e..c549b78399 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -21,6 +21,7 @@ add_executable(syncEntryCacheTest "") add_executable(syncRequestVoteTest "") add_executable(syncRequestVoteReplyTest "") add_executable(syncAppendEntriesTest "") +add_executable(syncAppendEntriesBatchTest "") add_executable(syncAppendEntriesReplyTest "") add_executable(syncClientRequestTest "") add_executable(syncTimeoutTest "") @@ -146,6 +147,10 @@ target_sources(syncAppendEntriesTest PRIVATE "syncAppendEntriesTest.cpp" ) +target_sources(syncAppendEntriesBatchTest + PRIVATE + "syncAppendEntriesBatchTest.cpp" +) target_sources(syncAppendEntriesReplyTest PRIVATE "syncAppendEntriesReplyTest.cpp" @@ -387,6 +392,11 @@ target_include_directories(syncAppendEntriesTest "${TD_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncAppendEntriesBatchTest + PUBLIC + "${TD_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_include_directories(syncAppendEntriesReplyTest PUBLIC "${TD_SOURCE_DIR}/include/libs/sync" @@ -636,6 +646,10 @@ target_link_libraries(syncAppendEntriesTest sync gtest_main ) +target_link_libraries(syncAppendEntriesBatchTest + sync + gtest_main +) target_link_libraries(syncAppendEntriesReplyTest sync gtest_main diff --git a/source/libs/sync/test/syncAppendEntriesBatchTest.cpp b/source/libs/sync/test/syncAppendEntriesBatchTest.cpp new file mode 100644 index 0000000000..8784ddd637 --- /dev/null +++ b/source/libs/sync/test/syncAppendEntriesBatchTest.cpp @@ -0,0 +1,139 @@ +//#include +#include +#include "syncIO.h" +#include "syncInt.h" +#include "syncMessage.h" +#include "syncUtil.h" +#include "trpc.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +SRpcMsg *createRpcMsg(int32_t i, int32_t dataLen) { + SRpcMsg *pRpcMsg = (SRpcMsg *)taosMemoryMalloc(sizeof(SRpcMsg)); + memset(pRpcMsg, 0, sizeof(SRpcMsg)); + + pRpcMsg->msgType = TDMT_SYNC_PING; + pRpcMsg->contLen = dataLen; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + pRpcMsg->code = 10 * i; + snprintf((char *)pRpcMsg->pCont, pRpcMsg->contLen, "value_%d", i); + + return pRpcMsg; +} + +SyncAppendEntriesBatch *createMsg() { + SRpcMsg rpcMsgArr[5]; + memset(rpcMsgArr, 0, sizeof(rpcMsgArr)); + + for (int32_t i = 0; i < 5; ++i) { + SRpcMsg *pRpcMsg = createRpcMsg(i, 20); + rpcMsgArr[i] = *pRpcMsg; + taosMemoryFree(pRpcMsg); + } + + SyncAppendEntriesBatch *pMsg = syncAppendEntriesBatchBuild(rpcMsgArr, 5, 1234); + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); + pMsg->destId.vgId = 100; + pMsg->prevLogIndex = 11; + pMsg->prevLogTerm = 22; + pMsg->commitIndex = 33; + pMsg->privateTerm = 44; + return pMsg; +} + +void test1() { + SyncAppendEntriesBatch *pMsg = createMsg(); + syncAppendEntriesBatchLog2((char *)"test1:", pMsg); + + SRpcMsg rpcMsgArr[5]; + int32_t retArrSize; + syncAppendEntriesBatch2RpcMsgArray(pMsg, rpcMsgArr, 5, &retArrSize); + for (int i = 0; i < retArrSize; ++i) { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "==test1 decode rpc msg %d: msgType:%d, code:%d, contLen:%d, pCont:%s \n", i, + rpcMsgArr[i].msgType, rpcMsgArr[i].code, rpcMsgArr[i].contLen, (char *)rpcMsgArr[i].pCont); + sTrace("%s", logBuf); + } + + syncAppendEntriesBatchDestroy(pMsg); +} + +/* +void test2() { + SyncAppendEntries *pMsg = createMsg(); + uint32_t len = pMsg->bytes; + char * serialized = (char *)taosMemoryMalloc(len); + syncAppendEntriesSerialize(pMsg, serialized, len); + SyncAppendEntries *pMsg2 = syncAppendEntriesBuild(pMsg->dataLen, 1000); + syncAppendEntriesDeserialize(serialized, len, pMsg2); + syncAppendEntriesLog2((char *)"test2: syncAppendEntriesSerialize -> syncAppendEntriesDeserialize ", pMsg2); + + taosMemoryFree(serialized); + syncAppendEntriesDestroy(pMsg); + syncAppendEntriesDestroy(pMsg2); +} + +void test3() { + SyncAppendEntries *pMsg = createMsg(); + uint32_t len; + char * serialized = syncAppendEntriesSerialize2(pMsg, &len); + SyncAppendEntries *pMsg2 = syncAppendEntriesDeserialize2(serialized, len); + syncAppendEntriesLog2((char *)"test3: syncAppendEntriesSerialize3 -> syncAppendEntriesDeserialize2 ", pMsg2); + + taosMemoryFree(serialized); + syncAppendEntriesDestroy(pMsg); + syncAppendEntriesDestroy(pMsg2); +} + +void test4() { + SyncAppendEntries *pMsg = createMsg(); + SRpcMsg rpcMsg; + syncAppendEntries2RpcMsg(pMsg, &rpcMsg); + SyncAppendEntries *pMsg2 = (SyncAppendEntries *)taosMemoryMalloc(rpcMsg.contLen); + syncAppendEntriesFromRpcMsg(&rpcMsg, pMsg2); + syncAppendEntriesLog2((char *)"test4: syncAppendEntries2RpcMsg -> syncAppendEntriesFromRpcMsg ", pMsg2); + + rpcFreeCont(rpcMsg.pCont); + syncAppendEntriesDestroy(pMsg); + syncAppendEntriesDestroy(pMsg2); +} + +void test5() { + SyncAppendEntries *pMsg = createMsg(); + SRpcMsg rpcMsg; + syncAppendEntries2RpcMsg(pMsg, &rpcMsg); + SyncAppendEntries *pMsg2 = syncAppendEntriesFromRpcMsg2(&rpcMsg); + syncAppendEntriesLog2((char *)"test5: syncAppendEntries2RpcMsg -> syncAppendEntriesFromRpcMsg2 ", pMsg2); + + rpcFreeCont(rpcMsg.pCont); + syncAppendEntriesDestroy(pMsg); + syncAppendEntriesDestroy(pMsg2); +} +*/ + +int main() { + gRaftDetailLog = true; + tsAsyncLog = 0; + sDebugFlag = DEBUG_DEBUG + DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE; + logTest(); + + test1(); + + /* + test2(); + test3(); + test4(); + test5(); + */ + + return 0; +} From 376bf46a64dd515d7cf043f2d055261f968b0409 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 30 Jun 2022 15:28:23 +0800 Subject: [PATCH 2/6] refactor(sync): add SyncClientRequestBatch --- include/common/tmsgdef.h | 1 + include/libs/sync/sync.h | 1 + include/libs/sync/syncTools.h | 31 ++++++++++++++++++++++++ source/libs/sync/src/syncMain.c | 2 ++ source/libs/sync/src/syncMessage.c | 39 ++++++++++++++++++++++++++++++ 5 files changed, 74 insertions(+) diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 926d4eb51f..c0c1482828 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -232,6 +232,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_SYNC_PING, "sync-ping", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_PING_REPLY, "sync-ping-reply", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_CLIENT_REQUEST, "sync-client-request", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_CLIENT_REQUEST_BATCH, "sync-client-request-batch", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_CLIENT_REQUEST_REPLY, "sync-client-request-reply", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_REQUEST_VOTE, "sync-request-vote", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_REQUEST_VOTE_REPLY, "sync-request-vote-reply", NULL, NULL) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 5fa7eed40c..dea6d35d4d 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -202,6 +202,7 @@ SyncGroupId syncGetVgId(int64_t rid); void syncGetEpSet(int64_t rid, SEpSet* pEpSet); void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet); int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak); +int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize); bool syncEnvIsStart(); const char* syncStr(ESyncState state); bool syncIsRestoreFinish(int64_t rid); diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index 8975f930db..c4ce9b9694 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -219,6 +219,33 @@ void syncClientRequestPrint2(char* s, const SyncClientRequest* pMsg); void syncClientRequestLog(const SyncClientRequest* pMsg); void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg); +// --------------------------------------------- +typedef struct SOffsetAndContLen { + int32_t offset; + int32_t contLen; +} SOffsetAndContLen; + +typedef struct SRaftMeta { + uint64_t seqNum; + bool isWeak; +} SRaftMeta; + +// block1: +// block2: SRaftMeta array +// block3: rpc msg array (with pCont) + +typedef struct SyncClientRequestBatch { + uint32_t bytes; + int32_t vgId; + uint32_t msgType; // SyncClientRequestBatch msgType + uint32_t dataCount; + uint32_t dataLen; // user RpcMsg.contLen + char data[]; // user RpcMsg.pCont +} SyncClientRequestBatch; + +SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg* rpcMsgArr, SRaftMeta* raftArr, int32_t arrSize, + int32_t vgId); + // --------------------------------------------- typedef struct SyncClientRequestReply { uint32_t bytes; @@ -325,10 +352,14 @@ void syncAppendEntriesLog(const SyncAppendEntries* pMsg); void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg); // --------------------------------------------- + +// define ahead +/* typedef struct SOffsetAndContLen { int32_t offset; int32_t contLen; } SOffsetAndContLen; +*/ typedef struct SyncAppendEntriesBatch { uint32_t bytes; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index d32153e5ed..c065f74663 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -627,6 +627,8 @@ int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) { return ret; } +int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize) { return 0; } + int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { int32_t ret = 0; diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index acece6d48a..38ecb5c9d1 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -956,6 +956,45 @@ void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg) { } } +// ---- message process SyncClientRequestBatch---- + +// block1: +// block2: SRaftMeta array +// block3: rpc msg array (with pCont) + +SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg* rpcMsgArr, SRaftMeta* raftArr, int32_t arrSize, + int32_t vgId) { + ASSERT(rpcMsgArr != NULL); + ASSERT(arrSize > 0); + + int32_t dataLen = 0; + int32_t raftMetaArrayLen = sizeof(SRpcMsg) * arrSize; + int32_t rpcArrayLen = sizeof(SRaftMeta) * arrSize; + + uint32_t bytes = sizeof(SyncClientRequestBatch) + dataLen; + SyncClientRequestBatch* pMsg = taosMemoryMalloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->vgId = vgId; + pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST_BATCH; + pMsg->dataCount = arrSize; + pMsg->dataLen = dataLen; + + SRaftMeta* raftMetaArr = (SRaftMeta*)(pMsg->data); + SRpcMsg* msgArr = (SRpcMsg*)((char*)(pMsg->data) + raftMetaArrayLen); + + for (int i = 0; i < arrSize; ++i) { + // init raftMetaArr + raftMetaArr[i].isWeak = raftArr[i].isWeak; + raftMetaArr[i].seqNum = raftArr[i].seqNum; + + // init msgArr + msgArr[i] = rpcMsgArr[i]; + } + + return pMsg; +} + // ---- message process SyncRequestVote---- SyncRequestVote* syncRequestVoteBuild(int32_t vgId) { uint32_t bytes = sizeof(SyncRequestVote); From 24a3c817a46baac28fc9081b94db5da098f1dd84 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 30 Jun 2022 16:44:49 +0800 Subject: [PATCH 3/6] refactor(sync): add batch propose --- include/libs/sync/sync.h | 2 + include/libs/sync/syncTools.h | 2 + include/util/taoserror.h | 1 + source/libs/sync/inc/syncInt.h | 1 + source/libs/sync/src/syncMain.c | 132 ++++++++++++++++++++++++++++- source/libs/sync/src/syncMessage.c | 7 +- source/util/src/terror.c | 1 + 7 files changed, 142 insertions(+), 4 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index dea6d35d4d..d5d6fa5706 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -26,6 +26,7 @@ extern "C" { extern bool gRaftDetailLog; +#define SYNC_MAX_BATCH_SIZE 100 #define SYNC_INDEX_BEGIN 0 #define SYNC_INDEX_INVALID -1 #define SYNC_TERM_INVALID 0xFFFFFFFFFFFFFFFF @@ -179,6 +180,7 @@ typedef struct SSyncInfo { bool isStandBy; bool snapshotEnable; SyncGroupId vgId; + int32_t batchSize; SSyncCfg syncCfg; char path[TSDB_FILENAME_LEN]; SWal* pWal; diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index c4ce9b9694..a180755728 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -245,6 +245,7 @@ typedef struct SyncClientRequestBatch { SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg* rpcMsgArr, SRaftMeta* raftArr, int32_t arrSize, int32_t vgId); +void syncClientRequestBatch2RpcMsg(const SyncClientRequestBatch* pSyncMsg, SRpcMsg* pRpcMsg); // --------------------------------------------- typedef struct SyncClientRequestReply { @@ -600,6 +601,7 @@ int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg); int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex); +int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* pMsg); int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg); int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg); int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 08e09dca2f..737c7abecb 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -422,6 +422,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_SYN_RECONFIG_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0910) #define TSDB_CODE_SYN_PROPOSE_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0911) #define TSDB_CODE_SYN_STANDBY_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0912) +#define TSDB_CODE_SYN_BATCH_ERROR TAOS_DEF_ERROR_CODE(0, 0x0913) #define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF) // tq diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index d58ae83dd3..7af752ff42 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -170,6 +170,7 @@ void syncNodeStart(SSyncNode* pSyncNode); void syncNodeStartStandBy(SSyncNode* pSyncNode); void syncNodeClose(SSyncNode* pSyncNode); int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak); +int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize); // option bool syncNodeSnapshotEnable(SSyncNode* pSyncNode); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index c065f74663..ddda60bc18 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -50,7 +50,6 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths); // process message ---- int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); -int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex); // life cycle static void syncFreeNode(void* param); @@ -627,7 +626,93 @@ int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) { return ret; } -int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize) { return 0; } +int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize) { + if (arrSize < 0) { + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; + } + + int32_t ret = 0; + SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); + if (pSyncNode == NULL) { + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; + } + ASSERT(rid == pSyncNode->rid); + ret = syncNodeProposeBatch(pSyncNode, pMsgArr, pIsWeakArr, arrSize); + + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return ret; +} + +static bool syncNodeBatchOK(SRpcMsg* pMsgArr, int32_t arrSize) { + for (int32_t i = 0; i < arrSize; ++i) { + if (pMsgArr[i].msgType == TDMT_SYNC_CONFIG_CHANGE) { + return false; + } + + if (pMsgArr[i].msgType == TDMT_SYNC_CONFIG_CHANGE_FINISH) { + return false; + } + } + + return true; +} + +int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize) { + if (!syncNodeBatchOK(pMsgArr, arrSize)) { + syncNodeErrorLog(pSyncNode, "sync propose batch error"); + terrno = TSDB_CODE_SYN_BATCH_ERROR; + return -1; + } + + if (arrSize > SYNC_MAX_BATCH_SIZE) { + syncNodeErrorLog(pSyncNode, "sync propose match batch error"); + terrno = TSDB_CODE_SYN_BATCH_ERROR; + return -1; + } + + if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { + syncNodeErrorLog(pSyncNode, "sync propose not leader"); + terrno = TSDB_CODE_SYN_NOT_LEADER; + return -1; + } + + if (pSyncNode->changing) { + syncNodeErrorLog(pSyncNode, "sync propose not ready"); + terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY; + return -1; + } + + SRaftMeta raftArr[SYNC_MAX_BATCH_SIZE]; + for (int i = 0; i < arrSize; ++i) { + SRespStub stub; + stub.createTime = taosGetTimestampMs(); + stub.rpcMsg = pMsgArr[i]; + uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub); + + raftArr[i].isWeak = pIsWeakArr[i]; + raftArr[i].seqNum = seqNum; + } + + SyncClientRequestBatch* pSyncMsg = syncClientRequestBatchBuild(pMsgArr, raftArr, arrSize, pSyncNode->vgId); + ASSERT(pSyncMsg != NULL); + + SRpcMsg rpcMsg; + syncClientRequestBatch2RpcMsg(pSyncMsg, &rpcMsg); + taosMemoryFree(pSyncMsg); // only free msg body, do not free rpc msg content + + if (pSyncNode->FpEqMsg != NULL && (*pSyncNode->FpEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) { + // enqueue msg ok + + } else { + sError("enqueue msg error, FpEqMsg is NULL"); + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; + } + + return 0; +} int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { int32_t ret = 0; @@ -2364,6 +2449,49 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI return ret; } +int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* pMsg) { + int32_t code = 0; + + if (ths->state != TAOS_SYNC_STATE_LEADER) { + // call FpCommitCb, delete resp mgr + return -1; + } + + SyncIndex index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore); + SyncTerm term = ths->pRaftStore->currentTerm; + + int32_t raftMetaArrayLen = sizeof(SRaftMeta) * pMsg->dataCount; + int32_t rpcArrayLen = sizeof(SRpcMsg) * pMsg->dataCount; + SRaftMeta* raftMetaArr = (SRaftMeta*)(pMsg->data); + SRpcMsg* msgArr = (SRpcMsg*)((char*)(pMsg->data) + raftMetaArrayLen); + for (int32_t i = 0; i < pMsg->dataCount; ++i) { + SSyncRaftEntry* pEntry = syncEntryBuild(msgArr[i].contLen); + ASSERT(pEntry != NULL); + + pEntry->originalRpcType = msgArr[i].msgType; + pEntry->seqNum = raftMetaArr[i].seqNum; + pEntry->isWeak = raftMetaArr[i].isWeak; + pEntry->term = term; + pEntry->index = index; + memcpy(pEntry->data, msgArr[i].pCont, msgArr[i].contLen); + ASSERT(msgArr[i].contLen == pEntry->dataLen); + + code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); + if (code != 0) { + // del resp mgr, call FpCommitCb + ASSERT(0); + return -1; + } + } + + // fsync once + SSyncLogStoreData* pData = ths->pLogStore->data; + SWal* pWal = pData->pWal; + walFsync(pWal, true); + + return 0; +} + static void syncFreeNode(void* param) { SSyncNode* pNode = param; // inner object already free diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 38ecb5c9d1..ad352df59f 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -968,8 +968,9 @@ SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg* rpcMsgArr, SRaftMet ASSERT(arrSize > 0); int32_t dataLen = 0; - int32_t raftMetaArrayLen = sizeof(SRpcMsg) * arrSize; - int32_t rpcArrayLen = sizeof(SRaftMeta) * arrSize; + int32_t raftMetaArrayLen = sizeof(SRaftMeta) * arrSize; + int32_t rpcArrayLen = sizeof(SRpcMsg) * arrSize; + dataLen += (raftMetaArrayLen + rpcArrayLen); uint32_t bytes = sizeof(SyncClientRequestBatch) + dataLen; SyncClientRequestBatch* pMsg = taosMemoryMalloc(bytes); @@ -995,6 +996,8 @@ SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg* rpcMsgArr, SRaftMet return pMsg; } +void syncClientRequestBatch2RpcMsg(const SyncClientRequestBatch* pSyncMsg, SRpcMsg* pRpcMsg) {} + // ---- message process SyncRequestVote---- SyncRequestVote* syncRequestVoteBuild(int32_t vgId) { uint32_t bytes = sizeof(SyncRequestVote); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 96f0fb21ca..bb20039666 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -429,6 +429,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NEW_CONFIG_ERROR, "Sync new config error TAOS_DEFINE_ERROR(TSDB_CODE_SYN_RECONFIG_NOT_READY, "Sync not ready for reconfig") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_PROPOSE_NOT_READY, "Sync not ready for propose") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_STANDBY_NOT_READY, "Sync not ready for standby") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BATCH_ERROR, "Sync batch error") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error") // wal From 4f39cd31b0527f285b64c91cb8c997e8c959bde6 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Fri, 1 Jul 2022 09:50:20 +0800 Subject: [PATCH 4/6] refactor(sync): add snapshot2 interface --- include/libs/sync/sync.h | 2 +- source/libs/sync/inc/syncAppendEntries.h | 1 + source/libs/sync/inc/syncAppendEntriesReply.h | 1 + source/libs/sync/inc/syncInt.h | 1 + source/libs/sync/inc/syncReplication.h | 7 ++ source/libs/sync/src/syncAppendEntries.c | 2 + source/libs/sync/src/syncAppendEntriesReply.c | 57 +++++++++ source/libs/sync/src/syncReplication.c | 111 ++++++++++++++++++ 8 files changed, 181 insertions(+), 1 deletion(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index d5d6fa5706..9a4a00251c 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -204,7 +204,7 @@ SyncGroupId syncGetVgId(int64_t rid); void syncGetEpSet(int64_t rid, SEpSet* pEpSet); void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet); int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak); -int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize); +// int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize); bool syncEnvIsStart(); const char* syncStr(ESyncState state); bool syncIsRestoreFinish(int64_t rid); diff --git a/source/libs/sync/inc/syncAppendEntries.h b/source/libs/sync/inc/syncAppendEntries.h index 98df22d51b..e15c85d73b 100644 --- a/source/libs/sync/inc/syncAppendEntries.h +++ b/source/libs/sync/inc/syncAppendEntries.h @@ -94,6 +94,7 @@ extern "C" { // int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg); int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMsg); +int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatch* pMsg); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncAppendEntriesReply.h b/source/libs/sync/inc/syncAppendEntriesReply.h index e509a50dc4..03148252fb 100644 --- a/source/libs/sync/inc/syncAppendEntriesReply.h +++ b/source/libs/sync/inc/syncAppendEntriesReply.h @@ -42,6 +42,7 @@ extern "C" { // int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); +int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 7af752ff42..66e5d28bdd 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -67,6 +67,7 @@ typedef struct SSyncNode { char path[TSDB_FILENAME_LEN]; char raftStorePath[TSDB_FILENAME_LEN * 2]; char configPath[TSDB_FILENAME_LEN * 2]; + int32_t batchSize; // sync io SWal* pWal; diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h index 4b1f5b4638..d6aa52523c 100644 --- a/source/libs/sync/inc/syncReplication.h +++ b/source/libs/sync/inc/syncReplication.h @@ -53,8 +53,15 @@ extern "C" { // int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode); int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode); +int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode); int32_t syncNodeReplicate(SSyncNode* pSyncNode); int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg); +int32_t syncNodeAppendEntriesBatch(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntriesBatch* pMsg); + +typedef struct SReaderParam { + SyncIndex start; + SyncIndex end; +} SReaderParam; #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index a10b81d165..029374368f 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -719,6 +719,8 @@ static bool syncNodeOnAppendEntriesLogOK(SSyncNode* pSyncNode, SyncAppendEntries return false; } +int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatch* pMsg) { return 0; } + int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMsg) { int32_t ret = 0; int32_t code = 0; diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 10f2745651..39803587e4 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -170,6 +170,63 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p } #endif +int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { + int32_t ret = 0; + + // if already drop replica, do not process + if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { + syncNodeEventLog(ths, "recv sync-append-entries-reply, maybe replica already dropped"); + return -1; + } + + // drop stale response + if (pMsg->term < ths->pRaftStore->currentTerm) { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, recv-term:%lu, drop stale response", pMsg->term); + syncNodeEventLog(ths, logBuf); + return -1; + } + + if (pMsg->term > ths->pRaftStore->currentTerm) { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, error term, recv-term:%lu", pMsg->term); + syncNodeErrorLog(ths, logBuf); + return -1; + } + + ASSERT(pMsg->term == ths->pRaftStore->currentTerm); + + if (pMsg->success) { + // nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] + syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1); + + if (gRaftDetailLog) { + sTrace("update next match, index:%ld, success:%d", pMsg->matchIndex + 1, pMsg->success); + } + + // matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex] + syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex); + + // maybe commit + if (ths->state == TAOS_SYNC_STATE_LEADER) { + syncMaybeAdvanceCommitIndex(ths); + } + + } else { + SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); + + // notice! int64, uint64 + if (nextIndex > SYNC_INDEX_BEGIN) { + --nextIndex; + } else { + nextIndex = SYNC_INDEX_BEGIN; + } + syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex); + } + + return 0; +} + int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { int32_t ret = 0; diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 46850758f1..bc834786c6 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -116,6 +116,97 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) { return ret; } +int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) { + if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { + return -1; + } + + int32_t ret = 0; + for (int i = 0; i < pSyncNode->peersNum; ++i) { + SRaftId* pDestId = &(pSyncNode->peersId[i]); + + // next index + SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId); + + // if pre-entry not exist, create snapshot + SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex); + SSyncRaftEntry* pPreEntry = NULL; + int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preLogIndex, &pPreEntry); + if (code == -1 && terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) { + SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1}; + if (pSyncNode->pFsm->FpGetSnapshot != NULL) { + void* pReader = NULL; + SReaderParam readerParam = {.start = 0, .end = nextIndex}; + pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot, &readerParam, &pReader); + + // get sender + SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId); + ASSERT(pSender != NULL); + + if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN && nextIndex <= snapshot.lastApplyIndex + 1 && + !snapshotSenderIsStart(pSender)) { + // start snapshot + snapshotSenderStart(pSender, snapshot, pReader); + } else { + // no snapshot + if (pReader != NULL) { + pSyncNode->pFsm->FpSnapshotStopRead(pSyncNode->pFsm, pReader); + } + } + } + } + + // pre index, pre term + preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex); + SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex); + if (preLogTerm == SYNC_TERM_INVALID) { + SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1; + syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, newNextIndex); + syncIndexMgrSetIndex(pSyncNode->pMatchIndex, pDestId, SYNC_INDEX_INVALID); + sError("vgId:%d sync get pre term error, nextIndex:%ld, update next-index:%ld, match-index:%d, raftid:%ld", + pSyncNode->vgId, nextIndex, newNextIndex, SYNC_INDEX_INVALID, pDestId->addr); + + return -1; + } + + SRpcMsg rpcMsgArr[SYNC_MAX_BATCH_SIZE]; + memset(rpcMsgArr, 0, sizeof(rpcMsgArr)); + + int32_t getCount = 0; + for (int32_t i = 0; i < pSyncNode->batchSize; ++i) { + SSyncRaftEntry* pEntry; + int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, nextIndex, &pEntry); + if (code == 0) { + ASSERT(pEntry != NULL); + // get rpc msg [i] from entry + syncEntryDestory(pEntry); + getCount++; + } else { + break; + } + } + + SyncAppendEntriesBatch* pMsg = syncAppendEntriesBatchBuild(rpcMsgArr, getCount, pSyncNode->vgId); + ASSERT(pMsg != NULL); + + // prepare msg + pMsg->srcId = pSyncNode->myRaftId; + pMsg->destId = *pDestId; + pMsg->term = pSyncNode->pRaftStore->currentTerm; + pMsg->prevLogIndex = preLogIndex; + pMsg->prevLogTerm = preLogTerm; + pMsg->commitIndex = pSyncNode->commitIndex; + pMsg->privateTerm = 0; + pMsg->dataCount = getCount; + + // send msg + syncNodeAppendEntriesBatch(pSyncNode, pDestId, pMsg); + syncAppendEntriesBatchDestroy(pMsg); + } + + return 0; +} + int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER); @@ -234,4 +325,24 @@ int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, c syncAppendEntries2RpcMsg(pMsg, &rpcMsg); syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg); return ret; +} + +int32_t syncNodeAppendEntriesBatch(SSyncNode* pSyncNode, const SRaftId* destRaftId, + const SyncAppendEntriesBatch* pMsg) { + do { + char host[128]; + uint16_t port; + syncUtilU642Addr(destRaftId->addr, host, sizeof(host), &port); + sDebug( + "vgId:%d, send sync-append-entries-batch to %s:%d, {term:%lu, pre-index:%ld, pre-term:%lu, pterm:%lu, " + "commit:%ld, " + "datalen:%d, dataCount:%d}", + pSyncNode->vgId, host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, + pMsg->commitIndex, pMsg->dataLen, pMsg->dataCount); + } while (0); + + SRpcMsg rpcMsg; + syncAppendEntriesBatch2RpcMsg(pMsg, &rpcMsg); + syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg); + return 0; } \ No newline at end of file From 91238e823ebaaa3fce08ec88988503f0741c4766 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Fri, 1 Jul 2022 14:22:14 +0800 Subject: [PATCH 5/6] refactor(sync): add snapshot2 interface --- include/libs/sync/sync.h | 1 + source/libs/sync/inc/syncAppendEntriesReply.h | 5 + source/libs/sync/inc/syncReplication.h | 5 - source/libs/sync/src/syncAppendEntriesReply.c | 122 +++++++++--------- source/libs/sync/src/syncReplication.c | 36 +----- 5 files changed, 70 insertions(+), 99 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 9a4a00251c..bd148fd158 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -165,6 +165,7 @@ typedef struct SSyncLogStore { bool (*syncLogIsEmpty)(struct SSyncLogStore* pLogStore); int32_t (*syncLogEntryCount)(struct SSyncLogStore* pLogStore); int32_t (*syncLogRestoreFromSnapshot)(struct SSyncLogStore* pLogStore, SyncIndex index); + bool (*syncLogExist)(struct SSyncLogStore* pLogStore, SyncIndex index); SyncIndex (*syncLogWriteIndex)(struct SSyncLogStore* pLogStore); SyncIndex (*syncLogLastIndex)(struct SSyncLogStore* pLogStore); diff --git a/source/libs/sync/inc/syncAppendEntriesReply.h b/source/libs/sync/inc/syncAppendEntriesReply.h index 03148252fb..70ce5a72c6 100644 --- a/source/libs/sync/inc/syncAppendEntriesReply.h +++ b/source/libs/sync/inc/syncAppendEntriesReply.h @@ -44,6 +44,11 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); +typedef struct SReaderParam { + SyncIndex start; + SyncIndex end; +} SReaderParam; + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h index d6aa52523c..07e12460da 100644 --- a/source/libs/sync/inc/syncReplication.h +++ b/source/libs/sync/inc/syncReplication.h @@ -58,11 +58,6 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode); int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg); int32_t syncNodeAppendEntriesBatch(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntriesBatch* pMsg); -typedef struct SReaderParam { - SyncIndex start; - SyncIndex end; -} SReaderParam; - #ifdef __cplusplus } #endif diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 39803587e4..827b2a59a5 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -108,67 +108,29 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p return ret; } -#if 0 -int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { - int32_t ret = 0; +// only start once +static void syncNodeStartSnapshot(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, SyncTerm lastApplyTerm, + SyncAppendEntriesReply* pMsg) { + // get sender + SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId)); + ASSERT(pSender != NULL); - char logBuf[128] = {0}; - snprintf(logBuf, sizeof(logBuf), "==syncNodeOnAppendEntriesReplyCb== term:%lu", ths->pRaftStore->currentTerm); - syncAppendEntriesReplyLog2(logBuf, pMsg); + SSnapshot snapshot = { + .data = NULL, .lastApplyIndex = endIndex, .lastApplyTerm = lastApplyTerm, .lastConfigIndex = SYNC_INDEX_INVALID}; - if (pMsg->term < ths->pRaftStore->currentTerm) { - sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term, - ths->pRaftStore->currentTerm); - return ret; - } - - syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== before pNextIndex", ths->pNextIndex); - syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== before pMatchIndex", ths->pMatchIndex); - - // no need this code, because if I receive reply.term, then I must have sent for that term. - // if (pMsg->term > ths->pRaftStore->currentTerm) { - // syncNodeUpdateTerm(ths, pMsg->term); - // } - - if (pMsg->term > ths->pRaftStore->currentTerm) { - char logBuf[128] = {0}; - snprintf(logBuf, sizeof(logBuf), "syncNodeOnAppendEntriesReplyCb error term, receive:%lu current:%lu", pMsg->term, - ths->pRaftStore->currentTerm); - syncNodeLog2(logBuf, ths); - sError("%s", logBuf); - return ret; - } - - ASSERT(pMsg->term == ths->pRaftStore->currentTerm); - - if (pMsg->success) { - // nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] - syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1); - - // matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex] - syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex); - - // maybe commit - syncMaybeAdvanceCommitIndex(ths); + void* pReader = NULL; + SReaderParam readerParam = {.start = beginIndex, .end = endIndex}; + ths->pFsm->FpSnapshotStartRead(ths->pFsm, &pReader); + if (!snapshotSenderIsStart(pSender) && pMsg->privateTerm < pSender->privateTerm) { + ASSERT(pReader != NULL); + snapshotSenderStart(pSender, snapshot, pReader); } else { - SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); - - // notice! int64, uint64 - if (nextIndex > SYNC_INDEX_BEGIN) { - --nextIndex; - } else { - nextIndex = SYNC_INDEX_BEGIN; + if (pReader != NULL) { + ths->pFsm->FpSnapshotStopRead(ths->pFsm, pReader); } - syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex); } - - syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== after pNextIndex", ths->pNextIndex); - syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== after pMatchIndex", ths->pMatchIndex); - - return ret; } -#endif int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { int32_t ret = 0; @@ -187,6 +149,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie return -1; } + // error term if (pMsg->term > ths->pRaftStore->currentTerm) { char logBuf[128]; snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, error term, recv-term:%lu", pMsg->term); @@ -197,19 +160,29 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie ASSERT(pMsg->term == ths->pRaftStore->currentTerm); if (pMsg->success) { - // nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] - syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1); + SyncIndex newNextIndex = pMsg->matchIndex + 1; + SyncIndex newMatchIndex = pMsg->matchIndex; - if (gRaftDetailLog) { - sTrace("update next match, index:%ld, success:%d", pMsg->matchIndex + 1, pMsg->success); - } + if (ths->pLogStore->syncLogExist(ths->pLogStore, newNextIndex) && + ths->pLogStore->syncLogExist(ths->pLogStore, newNextIndex - 1)) { + // nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] + syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), newNextIndex); - // matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex] - syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex); + // matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex] + syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), newMatchIndex); - // maybe commit - if (ths->state == TAOS_SYNC_STATE_LEADER) { - syncMaybeAdvanceCommitIndex(ths); + // maybe commit + if (ths->state == TAOS_SYNC_STATE_LEADER) { + syncMaybeAdvanceCommitIndex(ths); + } + } else { + // start snapshot + SSnapshot snapshot; + ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); + syncNodeStartSnapshot(ths, newMatchIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pMsg); + + syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), snapshot.lastApplyIndex + 1); + syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), newMatchIndex); } } else { @@ -218,6 +191,27 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie // notice! int64, uint64 if (nextIndex > SYNC_INDEX_BEGIN) { --nextIndex; + + if (ths->pLogStore->syncLogExist(ths->pLogStore, nextIndex) && + ths->pLogStore->syncLogExist(ths->pLogStore, nextIndex - 1)) { + // do nothing + } else { + SSyncRaftEntry* pEntry; + int32_t code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, nextIndex, &pEntry); + ASSERT(code == 0); + syncNodeStartSnapshot(ths, SYNC_INDEX_BEGIN, nextIndex, pEntry->term, pMsg); + + // get sender + SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId)); + ASSERT(pSender != NULL); + SyncIndex sentryIndex = pSender->snapshot.lastApplyIndex + 1; + + // update nextIndex to sentryIndex + if (nextIndex <= sentryIndex) { + nextIndex = sentryIndex; + } + } + } else { nextIndex = SYNC_INDEX_BEGIN; } diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index bc834786c6..4908822a3a 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -128,38 +128,14 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) { // next index SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId); - // if pre-entry not exist, create snapshot - SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex); - SSyncRaftEntry* pPreEntry = NULL; - int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preLogIndex, &pPreEntry); - if (code == -1 && terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) { - SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1}; - if (pSyncNode->pFsm->FpGetSnapshot != NULL) { - void* pReader = NULL; - SReaderParam readerParam = {.start = 0, .end = nextIndex}; - pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot, &readerParam, &pReader); - - // get sender - SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId); - ASSERT(pSender != NULL); - - if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN && nextIndex <= snapshot.lastApplyIndex + 1 && - !snapshotSenderIsStart(pSender)) { - // start snapshot - snapshotSenderStart(pSender, snapshot, pReader); - } else { - // no snapshot - if (pReader != NULL) { - pSyncNode->pFsm->FpSnapshotStopRead(pSyncNode->pFsm, pReader); - } - } - } - } - // pre index, pre term - preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex); - SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex); + SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex); + SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex); if (preLogTerm == SYNC_TERM_INVALID) { + SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId); + ASSERT(pSender != NULL); + ASSERT(!snapshotSenderIsStart(pSender)); + SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1; syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, newNextIndex); syncIndexMgrSetIndex(pSyncNode->pMatchIndex, pDestId, SYNC_INDEX_INVALID); From 0ffc6c46f328f3c886159def06ef7721833a4630 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Fri, 1 Jul 2022 14:47:12 +0800 Subject: [PATCH 6/6] refactor(sync): add snapshot2 interface --- include/libs/sync/sync.h | 2 +- source/dnode/mnode/impl/src/mndSync.c | 2 +- source/dnode/vnode/src/vnd/vnodeSync.c | 10 +++++----- source/libs/sync/src/syncAppendEntriesReply.c | 2 +- source/libs/sync/test/syncConfigChangeSnapshotTest.cpp | 2 +- source/libs/sync/test/syncSnapshotSenderTest.cpp | 2 +- source/libs/sync/test/syncTestTool.cpp | 2 +- 7 files changed, 11 insertions(+), 11 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index bd148fd158..d07cf0adf1 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -121,7 +121,7 @@ typedef struct SSyncFSM { int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot, void* pReaderParam, void** ppReader); int32_t (*FpGetSnapshotInfo)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); - int32_t (*FpSnapshotStartRead)(struct SSyncFSM* pFsm, void** ppReader); + int32_t (*FpSnapshotStartRead)(struct SSyncFSM* pFsm, void* pReaderParam, void** ppReader); int32_t (*FpSnapshotStopRead)(struct SSyncFSM* pFsm, void* pReader); int32_t (*FpSnapshotDoRead)(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index c58ce97588..693eed5222 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -117,7 +117,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM } } -int32_t mndSnapshotStartRead(struct SSyncFSM *pFsm, void **ppReader) { +int32_t mndSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) { mDebug("start to read snapshot from sdb"); SMnode *pMnode = pFsm->data; return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, NULL, NULL, NULL); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index f9e811fc32..80ba80e61a 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -119,7 +119,7 @@ static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) { } void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { - SVnode * pVnode = pInfo->ahandle; + SVnode *pVnode = pInfo->ahandle; int32_t vgId = pVnode->config.vgId; int32_t code = 0; SRpcMsg *pMsg = NULL; @@ -199,7 +199,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { } void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { - SVnode * pVnode = pInfo->ahandle; + SVnode *pVnode = pInfo->ahandle; int32_t vgId = pVnode->config.vgId; int32_t code = 0; SRpcMsg *pMsg = NULL; @@ -240,7 +240,7 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { STraceId *trace = &pMsg->info.traceId; do { - char * syncNodeStr = sync2SimpleStr(pVnode->sync); + char *syncNodeStr = sync2SimpleStr(pVnode->sync); static int64_t vndTick = 0; if (++vndTick % 10 == 1) { vGTrace("vgId:%d, sync trace msg:%s, %s", syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), syncNodeStr); @@ -375,7 +375,7 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReCon } static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { - SVnode * pVnode = pFsm->data; + SVnode *pVnode = pFsm->data; SSnapshot snapshot = {0}; SyncIndex beginIndex = SYNC_INDEX_INVALID; char logBuf[256] = {0}; @@ -409,7 +409,7 @@ static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); } -static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void **ppReader) { return 0; } +static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) { return 0; } static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { return 0; } diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 827b2a59a5..629d83eb51 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -120,7 +120,7 @@ static void syncNodeStartSnapshot(SSyncNode* ths, SyncIndex beginIndex, SyncInde void* pReader = NULL; SReaderParam readerParam = {.start = beginIndex, .end = endIndex}; - ths->pFsm->FpSnapshotStartRead(ths->pFsm, &pReader); + ths->pFsm->FpSnapshotStartRead(ths->pFsm, &readerParam, &pReader); if (!snapshotSenderIsStart(pSender) && pMsg->privateTerm < pSender->privateTerm) { ASSERT(pReader != NULL); snapshotSenderStart(pSender, snapshot, pReader); diff --git a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp index b4f173ff02..2908dd5907 100644 --- a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp +++ b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp @@ -77,7 +77,7 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { return 0; } -int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void** ppReader) { +int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void *pParam, void** ppReader) { *ppReader = (void*)0xABCD; char logBuf[256] = {0}; snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStartRead== pFsm:%p, *ppReader:%p", pFsm, *ppReader); diff --git a/source/libs/sync/test/syncSnapshotSenderTest.cpp b/source/libs/sync/test/syncSnapshotSenderTest.cpp index 04a02460af..dc38cd3df5 100644 --- a/source/libs/sync/test/syncSnapshotSenderTest.cpp +++ b/source/libs/sync/test/syncSnapshotSenderTest.cpp @@ -25,7 +25,7 @@ void ReConfigCb(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) int32_t GetSnapshot(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { return 0; } -int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void** ppReader) { return 0; } +int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void *pParam, void** ppReader) { return 0; } int32_t SnapshotStopRead(struct SSyncFSM* pFsm, void* pReader) { return 0; } int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len) { return 0; } diff --git a/source/libs/sync/test/syncTestTool.cpp b/source/libs/sync/test/syncTestTool.cpp index 91a16cc033..4e84ff0f7e 100644 --- a/source/libs/sync/test/syncTestTool.cpp +++ b/source/libs/sync/test/syncTestTool.cpp @@ -74,7 +74,7 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { return 0; } -int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void** ppReader) { +int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void *pParam, void** ppReader) { *ppReader = (void*)0xABCD; char logBuf[256] = {0}; snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStartRead== pFsm:%p, *ppReader:%p", pFsm, *ppReader);