From 21191ae2a82e45d78048b0102ee36200a1c88631 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 31 May 2022 16:48:47 +0800 Subject: [PATCH] enh(sync): add SSyncSnapshotReceiver --- source/libs/sync/inc/syncSnapshot.h | 16 +++-- source/libs/sync/src/syncSnapshot.c | 101 +++++++++++++++++++++++++--- 2 files changed, 99 insertions(+), 18 deletions(-) diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 5f4db4a0b2..99c92539f3 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -56,20 +56,22 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender); char *snapshotSender2Str(SSyncSnapshotSender *pSender); typedef struct SSyncSnapshotReceiver { - bool start; - int32_t ack; - void *pWriter; - void *pCurrentBlock; - int32_t len; + bool start; + + int32_t ack; + void *pWriter; + void *pCurrentBlock; + int32_t blockLen; + SyncTerm term; + SSyncNode *pSyncNode; int32_t replicaIndex; } SSyncSnapshotReceiver; -SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode); +SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, int32_t replicaIndex); void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver); void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver); void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver); -int32_t snapshotReceive(SSyncSnapshotReceiver *pReceiver); cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 04795b65f3..6b0fea022a 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -115,10 +115,6 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender) { taosMemoryFree(pSender->pCurrentBlock); pSender->blockLen = 0; } - - ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader, - &(pSender->pCurrentBlock), &(pSender->blockLen)); - ASSERT(ret == 0); } // send msg from seq, seq is already updated @@ -206,19 +202,102 @@ char *snapshotSender2Str(SSyncSnapshotSender *pSender) { } // ------------------------------------- -SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode) { return NULL; } +SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, int32_t replicaIndex) { + ASSERT(pSyncNode->pFsm->FpSnapshotStartWrite != NULL); + ASSERT(pSyncNode->pFsm->FpSnapshotStopWrite != NULL); + ASSERT(pSyncNode->pFsm->FpSnapshotDoWrite != NULL); -void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {} + SSyncSnapshotReceiver *pReceiver = taosMemoryMalloc(sizeof(SSyncSnapshotReceiver)); + ASSERT(pReceiver != NULL); + memset(pReceiver, 0, sizeof(*pReceiver)); -void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver) {} + pReceiver->start = false; + pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN; + pReceiver->pWriter = NULL; + pReceiver->pCurrentBlock = NULL; + pReceiver->blockLen = 0; + pReceiver->pSyncNode = pSyncNode; + pReceiver->replicaIndex = replicaIndex; + pReceiver->term = pSyncNode->pRaftStore->currentTerm; -void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {} + return pReceiver; +} -int32_t snapshotReceive(SSyncSnapshotReceiver *pReceiver) { return 0; } +void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) { + if (pReceiver != NULL) { + taosMemoryFree(pReceiver); + } +} -cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { return NULL; } +void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver) { + if (!(pReceiver->start)) { + pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN; + int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &(pReceiver->pWriter)); + ASSERT(ret == 0); + if (pReceiver->pCurrentBlock != NULL) { + taosMemoryFree(pReceiver->pCurrentBlock); + pReceiver->pCurrentBlock = NULL; + pReceiver->blockLen = 0; + } + pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm; + } else { + ASSERT(0); + } +} -char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { return NULL; } +void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { + int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true); + ASSERT(ret == 0); + + if (pReceiver->pCurrentBlock != NULL) { + taosMemoryFree(pReceiver->pCurrentBlock); + pReceiver->blockLen = 0; + } +} + +cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { + char u64buf[128]; + cJSON *pRoot = cJSON_CreateObject(); + + if (pReceiver != NULL) { + cJSON_AddNumberToObject(pRoot, "start", pReceiver->start); + cJSON_AddNumberToObject(pRoot, "ack", pReceiver->ack); + + snprintf(u64buf, sizeof(u64buf), "%p", pReceiver->pWriter); + cJSON_AddStringToObject(pRoot, "pWriter", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%p", pReceiver->pCurrentBlock); + cJSON_AddStringToObject(pRoot, "pCurrentBlock", u64buf); + cJSON_AddNumberToObject(pRoot, "blockLen", pReceiver->blockLen); + + if (pReceiver->pCurrentBlock != NULL) { + char *s; + s = syncUtilprintBin((char *)(pReceiver->pCurrentBlock), pReceiver->blockLen); + cJSON_AddStringToObject(pRoot, "pCurrentBlock", s); + taosMemoryFree(s); + s = syncUtilprintBin2((char *)(pReceiver->pCurrentBlock), pReceiver->blockLen); + cJSON_AddStringToObject(pRoot, "pCurrentBlock2", s); + taosMemoryFree(s); + } + + snprintf(u64buf, sizeof(u64buf), "%p", pReceiver->pSyncNode); + cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); + cJSON_AddNumberToObject(pRoot, "replicaIndex", pReceiver->replicaIndex); + snprintf(u64buf, sizeof(u64buf), "%lu", pReceiver->term); + cJSON_AddStringToObject(pRoot, "term", u64buf); + } + + cJSON *pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SSyncSnapshotReceiver", pRoot); + return pJson; +} + +char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { + cJSON *pJson = snapshotReceiver2Json(pReceiver); + char *serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { SSyncSnapshotReceiver *pReceiver = NULL;