Merge pull request #16301 from taosdata/feature/3.0_mhli

refactor(sync): close receiver when become leader
This commit is contained in:
Shengliang Guan 2022-08-22 18:48:05 +08:00 committed by GitHub
commit 99c7556021
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 24 additions and 19 deletions

View File

@ -28,10 +28,10 @@ extern "C" {
#include "syncMessage.h" #include "syncMessage.h"
#include "taosdef.h" #include "taosdef.h"
#define SYNC_SNAPSHOT_SEQ_INVALID -1 #define SYNC_SNAPSHOT_SEQ_INVALID -1
#define SYNC_SNAPSHOT_SEQ_FORCE_CLOSE -2 #define SYNC_SNAPSHOT_SEQ_FORCE_CLOSE -2
#define SYNC_SNAPSHOT_SEQ_BEGIN 0 #define SYNC_SNAPSHOT_SEQ_BEGIN 0
#define SYNC_SNAPSHOT_SEQ_END 0x7FFFFFFF #define SYNC_SNAPSHOT_SEQ_END 0x7FFFFFFF
#define SYNC_SNAPSHOT_RETRY_MS 5000 #define SYNC_SNAPSHOT_RETRY_MS 5000
@ -40,14 +40,14 @@ typedef struct SSyncSnapshotSender {
bool start; bool start;
int32_t seq; int32_t seq;
int32_t ack; int32_t ack;
void * pReader; void *pReader;
void * pCurrentBlock; void *pCurrentBlock;
int32_t blockLen; int32_t blockLen;
SSnapshotParam snapshotParam; SSnapshotParam snapshotParam;
SSnapshot snapshot; SSnapshot snapshot;
SSyncCfg lastConfig; SSyncCfg lastConfig;
int64_t sendingMS; int64_t sendingMS;
SSyncNode * pSyncNode; SSyncNode *pSyncNode;
int32_t replicaIndex; int32_t replicaIndex;
SyncTerm term; SyncTerm term;
SyncTerm privateTerm; SyncTerm privateTerm;
@ -64,20 +64,20 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender);
int32_t snapshotReSend(SSyncSnapshotSender *pSender); int32_t snapshotReSend(SSyncSnapshotSender *pSender);
cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender); cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender);
char * snapshotSender2Str(SSyncSnapshotSender *pSender); char *snapshotSender2Str(SSyncSnapshotSender *pSender);
char * snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event); char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event);
//--------------------------------------------------- //---------------------------------------------------
typedef struct SSyncSnapshotReceiver { typedef struct SSyncSnapshotReceiver {
bool start; bool start;
int32_t ack; int32_t ack;
void * pWriter; void *pWriter;
SyncTerm term; SyncTerm term;
SyncTerm privateTerm; SyncTerm privateTerm;
SSnapshotParam snapshotParam; SSnapshotParam snapshotParam;
SSnapshot snapshot; SSnapshot snapshot;
SRaftId fromId; SRaftId fromId;
SSyncNode * pSyncNode; SSyncNode *pSyncNode;
} SSyncSnapshotReceiver; } SSyncSnapshotReceiver;
@ -86,10 +86,11 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver)
int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg); int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg);
int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver); int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver); bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver);
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
char * snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event); char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event);
//--------------------------------------------------- //---------------------------------------------------
// on message // on message

View File

@ -2181,6 +2181,11 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
(pMySender->privateTerm) += 100; (pMySender->privateTerm) += 100;
} }
// close receiver
if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver);
}
// stop elect timer // stop elect timer
syncNodeStopElectTimer(pSyncNode); syncNodeStopElectTimer(pSyncNode);

View File

@ -24,7 +24,6 @@
//---------------------------------- //----------------------------------
static void snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg); static void snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg);
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg); static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg);
static void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver);
static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg); static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg);
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg); static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg);
@ -374,14 +373,14 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
char *snapshotSender2Str(SSyncSnapshotSender *pSender) { char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
cJSON *pJson = snapshotSender2Json(pSender); cJSON *pJson = snapshotSender2Json(pSender);
char * serialized = cJSON_Print(pJson); char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) { char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) {
int32_t len = 256; int32_t len = 256;
char * s = taosMemoryMalloc(len); char *s = taosMemoryMalloc(len);
SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
char host[64]; char host[64];
@ -480,7 +479,7 @@ static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncSnapsh
} }
// force stop // force stop
static void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) { void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
// force close, abandon incomplete data // force close, abandon incomplete data
if (pReceiver->pWriter != NULL) { if (pReceiver->pWriter != NULL) {
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false, int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
@ -653,7 +652,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
cJSON_AddStringToObject(pFromId, "addr", u64buf); cJSON_AddStringToObject(pFromId, "addr", u64buf);
{ {
uint64_t u64 = pReceiver->fromId.addr; uint64_t u64 = pReceiver->fromId.addr;
cJSON * pTmp = pFromId; cJSON *pTmp = pFromId;
char host[128] = {0}; char host[128] = {0};
uint16_t port; uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port); syncUtilU642Addr(u64, host, sizeof(host), &port);
@ -686,14 +685,14 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
cJSON *pJson = snapshotReceiver2Json(pReceiver); cJSON *pJson = snapshotReceiver2Json(pReceiver);
char * serialized = cJSON_Print(pJson); char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) { char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) {
int32_t len = 256; int32_t len = 256;
char * s = taosMemoryMalloc(len); char *s = taosMemoryMalloc(len);
SRaftId fromId = pReceiver->fromId; SRaftId fromId = pReceiver->fromId;
char host[128]; char host[128];