From 78efbaabe151856ce3659d14c5ec4eb88b111219 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 6 Jul 2022 15:16:52 +0800 Subject: [PATCH] refactor(sync): add syncGetSnapshotByIndex --- include/libs/sync/sync.h | 1 + source/libs/sync/inc/syncRaftCfg.h | 10 +++--- source/libs/sync/inc/syncSnapshot.h | 18 +++++------ source/libs/sync/src/syncMain.c | 32 +++++++++++++++++++ source/libs/sync/src/syncRaftCfg.c | 8 ++--- source/libs/sync/src/syncRespMgr.c | 2 +- source/libs/sync/src/syncSnapshot.c | 10 +++--- .../sync/test/syncAppendEntriesBatchTest.cpp | 18 +++++------ .../test/syncConfigChangeSnapshotTest.cpp | 2 +- source/libs/sync/test/syncIndexTest.cpp | 5 ++- source/libs/sync/test/syncRespMgrTest.cpp | 2 +- .../sync/test/syncSnapshotReceiverTest.cpp | 2 +- source/libs/sync/test/syncTestTool.cpp | 26 +++++++-------- 13 files changed, 84 insertions(+), 52 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 5c539f0ef3..8a6d6b7722 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -215,6 +215,7 @@ int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, in bool syncEnvIsStart(); const char* syncStr(ESyncState state); bool syncIsRestoreFinish(int64_t rid); +int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot); int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg); diff --git a/source/libs/sync/inc/syncRaftCfg.h b/source/libs/sync/inc/syncRaftCfg.h index fecd66e4da..fead7cdc76 100644 --- a/source/libs/sync/inc/syncRaftCfg.h +++ b/source/libs/sync/inc/syncRaftCfg.h @@ -50,14 +50,14 @@ int32_t raftCfgClose(SRaftCfg *pRaftCfg); int32_t raftCfgPersist(SRaftCfg *pRaftCfg); int32_t raftCfgAddConfigIndex(SRaftCfg *pRaftCfg, SyncIndex configIndex); -cJSON *syncCfg2Json(SSyncCfg *pSyncCfg); -char *syncCfg2Str(SSyncCfg *pSyncCfg); -char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg); +cJSON * syncCfg2Json(SSyncCfg *pSyncCfg); +char * syncCfg2Str(SSyncCfg *pSyncCfg); +char * syncCfg2SimpleStr(SSyncCfg *pSyncCfg); int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg); int32_t syncCfgFromStr(const char *s, SSyncCfg *pSyncCfg); -cJSON *raftCfg2Json(SRaftCfg *pRaftCfg); -char *raftCfg2Str(SRaftCfg *pRaftCfg); +cJSON * raftCfg2Json(SRaftCfg *pRaftCfg); +char * raftCfg2Str(SRaftCfg *pRaftCfg); int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg); int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg); diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 3b1e4f4560..0dc67cf150 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -40,14 +40,14 @@ typedef struct SSyncSnapshotSender { bool start; int32_t seq; int32_t ack; - void *pReader; - void *pCurrentBlock; + void * pReader; + void * pCurrentBlock; int32_t blockLen; SSnapshotParam snapshotParam; SSnapshot snapshot; SSyncCfg lastConfig; int64_t sendingMS; - SSyncNode *pSyncNode; + SSyncNode * pSyncNode; int32_t replicaIndex; SyncTerm term; SyncTerm privateTerm; @@ -64,20 +64,20 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender); int32_t snapshotReSend(SSyncSnapshotSender *pSender); cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender); -char *snapshotSender2Str(SSyncSnapshotSender *pSender); -char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event); +char * snapshotSender2Str(SSyncSnapshotSender *pSender); +char * snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event); //--------------------------------------------------- typedef struct SSyncSnapshotReceiver { bool start; int32_t ack; - void *pWriter; + void * pWriter; SyncTerm term; SyncTerm privateTerm; SSnapshotParam snapshotParam; SSnapshot snapshot; SRaftId fromId; - SSyncNode *pSyncNode; + SSyncNode * pSyncNode; } SSyncSnapshotReceiver; @@ -88,8 +88,8 @@ int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver); bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver); cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); -char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); -char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event); +char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); +char * snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event); //--------------------------------------------------- // on message diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 814fad5858..8532cec027 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -397,6 +397,38 @@ bool syncIsRestoreFinish(int64_t rid) { return b; } +int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot) { + if (index < SYNC_INDEX_BEGIN) { + return -1; + } + + SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + if (pSyncNode == NULL) { + return -1; + } + ASSERT(rid == pSyncNode->rid); + + SSyncRaftEntry* pEntry = NULL; + int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index, &pEntry); + if (code != 0) { + if (pEntry != NULL) { + syncEntryDestory(pEntry); + } + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return -1; + } + ASSERT(pEntry != NULL); + + pSnapshot->data = NULL; + pSnapshot->lastApplyIndex = index; + pSnapshot->lastApplyTerm = pEntry->term; + pSnapshot->lastConfigIndex = syncNodeGetSnapshotConfigIndex(pSyncNode, index); + + syncEntryDestory(pEntry); + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return 0; +} + int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { diff --git a/source/libs/sync/src/syncRaftCfg.c b/source/libs/sync/src/syncRaftCfg.c index 6c381f6e7d..ead1168632 100644 --- a/source/libs/sync/src/syncRaftCfg.c +++ b/source/libs/sync/src/syncRaftCfg.c @@ -101,7 +101,7 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) { char *syncCfg2Str(SSyncCfg *pSyncCfg) { cJSON *pJson = syncCfg2Json(pSyncCfg); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } @@ -109,7 +109,7 @@ char *syncCfg2Str(SSyncCfg *pSyncCfg) { char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) { if (pSyncCfg != NULL) { int32_t len = 512; - char *s = taosMemoryMalloc(len); + char * s = taosMemoryMalloc(len); memset(s, 0, len); snprintf(s, len, "{replica-num:%d, my-index:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex); @@ -206,7 +206,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) { char *raftCfg2Str(SRaftCfg *pRaftCfg) { cJSON *pJson = raftCfg2Json(pRaftCfg); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } @@ -285,7 +285,7 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) { (pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring); } - cJSON *pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg"); + cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg"); int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg)); ASSERT(code == 0); diff --git a/source/libs/sync/src/syncRespMgr.c b/source/libs/sync/src/syncRespMgr.c index 990a92aad7..8dd1349edb 100644 --- a/source/libs/sync/src/syncRespMgr.c +++ b/source/libs/sync/src/syncRespMgr.c @@ -127,7 +127,7 @@ void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl) { while (pStub) { size_t len; - void *key = taosHashGetKey(pStub, &len); + void * key = taosHashGetKey(pStub, &len); SyncIndex *pIndex = (SyncIndex *)key; int64_t nowMS = taosGetTimestampMs(); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 5cdfec72c5..a33f66733b 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -374,14 +374,14 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { char *snapshotSender2Str(SSyncSnapshotSender *pSender) { cJSON *pJson = snapshotSender2Json(pSender); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) { int32_t len = 256; - char *s = taosMemoryMalloc(len); + char * s = taosMemoryMalloc(len); SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; char host[64]; @@ -644,7 +644,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { cJSON_AddStringToObject(pFromId, "addr", u64buf); { uint64_t u64 = pReceiver->fromId.addr; - cJSON *pTmp = pFromId; + cJSON * pTmp = pFromId; char host[128] = {0}; uint16_t port; syncUtilU642Addr(u64, host, sizeof(host), &port); @@ -677,14 +677,14 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { cJSON *pJson = snapshotReceiver2Json(pReceiver); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) { int32_t len = 256; - char *s = taosMemoryMalloc(len); + char * s = taosMemoryMalloc(len); SRaftId fromId = pReceiver->fromId; char host[128]; diff --git a/source/libs/sync/test/syncAppendEntriesBatchTest.cpp b/source/libs/sync/test/syncAppendEntriesBatchTest.cpp index 515d580b35..f2544d8fec 100644 --- a/source/libs/sync/test/syncAppendEntriesBatchTest.cpp +++ b/source/libs/sync/test/syncAppendEntriesBatchTest.cpp @@ -54,15 +54,15 @@ void test1() { SyncAppendEntriesBatch *pMsg = createMsg(); syncAppendEntriesBatchLog2((char *)"==test1==", pMsg); -/* - SOffsetAndContLen *metaArr = syncAppendEntriesBatchMetaTableArray(pMsg); - int32_t retArrSize = pMsg->dataCount; - for (int i = 0; i < retArrSize; ++i) { - SSyncRaftEntry *pEntry = (SSyncRaftEntry*)(pMsg->data + metaArr[i].offset); - ASSERT(pEntry->bytes == metaArr[i].contLen); - syncEntryPrint(pEntry); - } -*/ + /* + SOffsetAndContLen *metaArr = syncAppendEntriesBatchMetaTableArray(pMsg); + int32_t retArrSize = pMsg->dataCount; + for (int i = 0; i < retArrSize; ++i) { + SSyncRaftEntry *pEntry = (SSyncRaftEntry*)(pMsg->data + metaArr[i].offset); + ASSERT(pEntry->bytes == metaArr[i].contLen); + syncEntryPrint(pEntry); + } + */ syncAppendEntriesBatchDestroy(pMsg); } diff --git a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp index 968baff952..7cd97695f9 100644 --- a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp +++ b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp @@ -114,7 +114,7 @@ int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32 return 0; } -int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void *pParam, void** ppWriter) { +int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void* pParam, void** ppWriter) { *ppWriter = (void*)0xCDEF; char logBuf[256] = {0}; snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStartWrite== pFsm:%p, *ppWriter:%p", pFsm, *ppWriter); diff --git a/source/libs/sync/test/syncIndexTest.cpp b/source/libs/sync/test/syncIndexTest.cpp index 8627a6c174..07a05d437e 100644 --- a/source/libs/sync/test/syncIndexTest.cpp +++ b/source/libs/sync/test/syncIndexTest.cpp @@ -8,11 +8,10 @@ void print(SHashObj *pNextIndex) { printf("----------------\n"); uint64_t *p = (uint64_t *)taosHashIterate(pNextIndex, NULL); while (p) { - size_t len; - void* key = taosHashGetKey(p, &len); + void * key = taosHashGetKey(p, &len); - SRaftId *pRaftId = (SRaftId*)key; + SRaftId *pRaftId = (SRaftId *)key; printf("key:<%lu, %d>, value:%lu \n", pRaftId->addr, pRaftId->vgId, *p); p = (uint64_t *)taosHashIterate(pNextIndex, p); diff --git a/source/libs/sync/test/syncRespMgrTest.cpp b/source/libs/sync/test/syncRespMgrTest.cpp index fd18109280..93a7ce430f 100644 --- a/source/libs/sync/test/syncRespMgrTest.cpp +++ b/source/libs/sync/test/syncRespMgrTest.cpp @@ -74,7 +74,7 @@ void syncRespMgrGetAndDelTest(uint64_t i) { } SSyncNode *createSyncNode() { - SSyncNode *pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode)); + SSyncNode *pSyncNode = (SSyncNode *)taosMemoryMalloc(sizeof(SSyncNode)); memset(pSyncNode, 0, sizeof(SSyncNode)); return pSyncNode; } diff --git a/source/libs/sync/test/syncSnapshotReceiverTest.cpp b/source/libs/sync/test/syncSnapshotReceiverTest.cpp index b4bf08dd40..e5d93ddff4 100644 --- a/source/libs/sync/test/syncSnapshotReceiverTest.cpp +++ b/source/libs/sync/test/syncSnapshotReceiverTest.cpp @@ -29,7 +29,7 @@ int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void** ppReader) { return 0; } int32_t SnapshotStopRead(struct SSyncFSM* pFsm, void* pReader) { return 0; } int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len) { return 0; } -int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void *pParam, void** ppWriter) { return 0; } +int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void* pParam, void** ppWriter) { return 0; } int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply) { return 0; } int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len) { return 0; } diff --git a/source/libs/sync/test/syncTestTool.cpp b/source/libs/sync/test/syncTestTool.cpp index c6d3a3e4af..2c08910aa8 100644 --- a/source/libs/sync/test/syncTestTool.cpp +++ b/source/libs/sync/test/syncTestTool.cpp @@ -111,7 +111,7 @@ int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32 return 0; } -int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void *pParam, void** ppWriter) { +int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void* pParam, void** ppWriter) { *ppWriter = (void*)0xCDEF; char logBuf[256] = {0}; @@ -314,18 +314,18 @@ int main(int argc, char** argv) { exit(-1); } - int32_t replicaNum = atoi(argv[1]); - int32_t myIndex = atoi(argv[2]); - ESyncStrategy enableSnapshot = (ESyncStrategy)atoi(argv[3]); - int32_t lastApplyIndex = atoi(argv[4]); - int32_t lastApplyTerm = atoi(argv[5]); - int32_t writeRecordNum = atoi(argv[6]); - bool isStandBy = atoi(argv[7]); - int32_t isConfigChange = atoi(argv[8]); - int32_t iterTimes = atoi(argv[9]); - int32_t finishLastApplyIndex = atoi(argv[10]); - int32_t finishLastApplyTerm = atoi(argv[11]); - int32_t leaderTransfer = atoi(argv[12]); + int32_t replicaNum = atoi(argv[1]); + int32_t myIndex = atoi(argv[2]); + ESyncStrategy enableSnapshot = (ESyncStrategy)atoi(argv[3]); + int32_t lastApplyIndex = atoi(argv[4]); + int32_t lastApplyTerm = atoi(argv[5]); + int32_t writeRecordNum = atoi(argv[6]); + bool isStandBy = atoi(argv[7]); + int32_t isConfigChange = atoi(argv[8]); + int32_t iterTimes = atoi(argv[9]); + int32_t finishLastApplyIndex = atoi(argv[10]); + int32_t finishLastApplyTerm = atoi(argv[11]); + int32_t leaderTransfer = atoi(argv[12]); sInfo( "args: replicaNum:%d, myIndex:%d, enableSnapshot:%d, lastApplyIndex:%d, lastApplyTerm:%d, writeRecordNum:%d, "