refactor(sync): add syncGetSnapshotByIndex

This commit is contained in:
Minghao Li 2022-07-06 15:16:52 +08:00
parent 83b639ae8f
commit 78efbaabe1
13 changed files with 84 additions and 52 deletions

View File

@ -215,6 +215,7 @@ int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, in
bool syncEnvIsStart(); bool syncEnvIsStart();
const char* syncStr(ESyncState state); const char* syncStr(ESyncState state);
bool syncIsRestoreFinish(int64_t rid); bool syncIsRestoreFinish(int64_t rid);
int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot);
int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg); int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg);

View File

@ -50,14 +50,14 @@ int32_t raftCfgClose(SRaftCfg *pRaftCfg);
int32_t raftCfgPersist(SRaftCfg *pRaftCfg); int32_t raftCfgPersist(SRaftCfg *pRaftCfg);
int32_t raftCfgAddConfigIndex(SRaftCfg *pRaftCfg, SyncIndex configIndex); int32_t raftCfgAddConfigIndex(SRaftCfg *pRaftCfg, SyncIndex configIndex);
cJSON *syncCfg2Json(SSyncCfg *pSyncCfg); cJSON * syncCfg2Json(SSyncCfg *pSyncCfg);
char *syncCfg2Str(SSyncCfg *pSyncCfg); char * syncCfg2Str(SSyncCfg *pSyncCfg);
char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg); char * syncCfg2SimpleStr(SSyncCfg *pSyncCfg);
int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg); int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg);
int32_t syncCfgFromStr(const char *s, SSyncCfg *pSyncCfg); int32_t syncCfgFromStr(const char *s, SSyncCfg *pSyncCfg);
cJSON *raftCfg2Json(SRaftCfg *pRaftCfg); cJSON * raftCfg2Json(SRaftCfg *pRaftCfg);
char *raftCfg2Str(SRaftCfg *pRaftCfg); char * raftCfg2Str(SRaftCfg *pRaftCfg);
int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg); int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg);
int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg); int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg);

View File

@ -40,14 +40,14 @@ typedef struct SSyncSnapshotSender {
bool start; bool start;
int32_t seq; int32_t seq;
int32_t ack; int32_t ack;
void *pReader; void * pReader;
void *pCurrentBlock; void * pCurrentBlock;
int32_t blockLen; int32_t blockLen;
SSnapshotParam snapshotParam; SSnapshotParam snapshotParam;
SSnapshot snapshot; SSnapshot snapshot;
SSyncCfg lastConfig; SSyncCfg lastConfig;
int64_t sendingMS; int64_t sendingMS;
SSyncNode *pSyncNode; SSyncNode * pSyncNode;
int32_t replicaIndex; int32_t replicaIndex;
SyncTerm term; SyncTerm term;
SyncTerm privateTerm; SyncTerm privateTerm;
@ -64,20 +64,20 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender);
int32_t snapshotReSend(SSyncSnapshotSender *pSender); int32_t snapshotReSend(SSyncSnapshotSender *pSender);
cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender); cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender);
char *snapshotSender2Str(SSyncSnapshotSender *pSender); char * snapshotSender2Str(SSyncSnapshotSender *pSender);
char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event); char * snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event);
//--------------------------------------------------- //---------------------------------------------------
typedef struct SSyncSnapshotReceiver { typedef struct SSyncSnapshotReceiver {
bool start; bool start;
int32_t ack; int32_t ack;
void *pWriter; void * pWriter;
SyncTerm term; SyncTerm term;
SyncTerm privateTerm; SyncTerm privateTerm;
SSnapshotParam snapshotParam; SSnapshotParam snapshotParam;
SSnapshot snapshot; SSnapshot snapshot;
SRaftId fromId; SRaftId fromId;
SSyncNode *pSyncNode; SSyncNode * pSyncNode;
} SSyncSnapshotReceiver; } SSyncSnapshotReceiver;
@ -88,8 +88,8 @@ int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver); bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver);
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event); char * snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event);
//--------------------------------------------------- //---------------------------------------------------
// on message // on message

View File

@ -397,6 +397,38 @@ bool syncIsRestoreFinish(int64_t rid) {
return b; return b;
} }
int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot) {
if (index < SYNC_INDEX_BEGIN) {
return -1;
}
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return -1;
}
ASSERT(rid == pSyncNode->rid);
SSyncRaftEntry* pEntry = NULL;
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index, &pEntry);
if (code != 0) {
if (pEntry != NULL) {
syncEntryDestory(pEntry);
}
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return -1;
}
ASSERT(pEntry != NULL);
pSnapshot->data = NULL;
pSnapshot->lastApplyIndex = index;
pSnapshot->lastApplyTerm = pEntry->term;
pSnapshot->lastConfigIndex = syncNodeGetSnapshotConfigIndex(pSyncNode, index);
syncEntryDestory(pEntry);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return 0;
}
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) { int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {

View File

@ -101,7 +101,7 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) {
char *syncCfg2Str(SSyncCfg *pSyncCfg) { char *syncCfg2Str(SSyncCfg *pSyncCfg) {
cJSON *pJson = syncCfg2Json(pSyncCfg); cJSON *pJson = syncCfg2Json(pSyncCfg);
char *serialized = cJSON_Print(pJson); char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
@ -109,7 +109,7 @@ char *syncCfg2Str(SSyncCfg *pSyncCfg) {
char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) { char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) {
if (pSyncCfg != NULL) { if (pSyncCfg != NULL) {
int32_t len = 512; int32_t len = 512;
char *s = taosMemoryMalloc(len); char * s = taosMemoryMalloc(len);
memset(s, 0, len); memset(s, 0, len);
snprintf(s, len, "{replica-num:%d, my-index:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex); snprintf(s, len, "{replica-num:%d, my-index:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex);
@ -206,7 +206,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
char *raftCfg2Str(SRaftCfg *pRaftCfg) { char *raftCfg2Str(SRaftCfg *pRaftCfg) {
cJSON *pJson = raftCfg2Json(pRaftCfg); cJSON *pJson = raftCfg2Json(pRaftCfg);
char *serialized = cJSON_Print(pJson); char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
@ -285,7 +285,7 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
(pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring); (pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring);
} }
cJSON *pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg"); cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg)); int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg));
ASSERT(code == 0); ASSERT(code == 0);

View File

@ -127,7 +127,7 @@ void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl) {
while (pStub) { while (pStub) {
size_t len; size_t len;
void *key = taosHashGetKey(pStub, &len); void * key = taosHashGetKey(pStub, &len);
SyncIndex *pIndex = (SyncIndex *)key; SyncIndex *pIndex = (SyncIndex *)key;
int64_t nowMS = taosGetTimestampMs(); int64_t nowMS = taosGetTimestampMs();

View File

@ -374,14 +374,14 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
char *snapshotSender2Str(SSyncSnapshotSender *pSender) { char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
cJSON *pJson = snapshotSender2Json(pSender); cJSON *pJson = snapshotSender2Json(pSender);
char *serialized = cJSON_Print(pJson); char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) { char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) {
int32_t len = 256; int32_t len = 256;
char *s = taosMemoryMalloc(len); char * s = taosMemoryMalloc(len);
SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
char host[64]; char host[64];
@ -644,7 +644,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
cJSON_AddStringToObject(pFromId, "addr", u64buf); cJSON_AddStringToObject(pFromId, "addr", u64buf);
{ {
uint64_t u64 = pReceiver->fromId.addr; uint64_t u64 = pReceiver->fromId.addr;
cJSON *pTmp = pFromId; cJSON * pTmp = pFromId;
char host[128] = {0}; char host[128] = {0};
uint16_t port; uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port); syncUtilU642Addr(u64, host, sizeof(host), &port);
@ -677,14 +677,14 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
cJSON *pJson = snapshotReceiver2Json(pReceiver); cJSON *pJson = snapshotReceiver2Json(pReceiver);
char *serialized = cJSON_Print(pJson); char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) { char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) {
int32_t len = 256; int32_t len = 256;
char *s = taosMemoryMalloc(len); char * s = taosMemoryMalloc(len);
SRaftId fromId = pReceiver->fromId; SRaftId fromId = pReceiver->fromId;
char host[128]; char host[128];

View File

@ -54,15 +54,15 @@ void test1() {
SyncAppendEntriesBatch *pMsg = createMsg(); SyncAppendEntriesBatch *pMsg = createMsg();
syncAppendEntriesBatchLog2((char *)"==test1==", pMsg); syncAppendEntriesBatchLog2((char *)"==test1==", pMsg);
/* /*
SOffsetAndContLen *metaArr = syncAppendEntriesBatchMetaTableArray(pMsg); SOffsetAndContLen *metaArr = syncAppendEntriesBatchMetaTableArray(pMsg);
int32_t retArrSize = pMsg->dataCount; int32_t retArrSize = pMsg->dataCount;
for (int i = 0; i < retArrSize; ++i) { for (int i = 0; i < retArrSize; ++i) {
SSyncRaftEntry *pEntry = (SSyncRaftEntry*)(pMsg->data + metaArr[i].offset); SSyncRaftEntry *pEntry = (SSyncRaftEntry*)(pMsg->data + metaArr[i].offset);
ASSERT(pEntry->bytes == metaArr[i].contLen); ASSERT(pEntry->bytes == metaArr[i].contLen);
syncEntryPrint(pEntry); syncEntryPrint(pEntry);
} }
*/ */
syncAppendEntriesBatchDestroy(pMsg); syncAppendEntriesBatchDestroy(pMsg);
} }

View File

@ -114,7 +114,7 @@ int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32
return 0; return 0;
} }
int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void *pParam, void** ppWriter) { int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void* pParam, void** ppWriter) {
*ppWriter = (void*)0xCDEF; *ppWriter = (void*)0xCDEF;
char logBuf[256] = {0}; char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStartWrite== pFsm:%p, *ppWriter:%p", pFsm, *ppWriter); snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStartWrite== pFsm:%p, *ppWriter:%p", pFsm, *ppWriter);

View File

@ -8,11 +8,10 @@ void print(SHashObj *pNextIndex) {
printf("----------------\n"); printf("----------------\n");
uint64_t *p = (uint64_t *)taosHashIterate(pNextIndex, NULL); uint64_t *p = (uint64_t *)taosHashIterate(pNextIndex, NULL);
while (p) { while (p) {
size_t len; size_t len;
void* key = taosHashGetKey(p, &len); void * key = taosHashGetKey(p, &len);
SRaftId *pRaftId = (SRaftId*)key; SRaftId *pRaftId = (SRaftId *)key;
printf("key:<%lu, %d>, value:%lu \n", pRaftId->addr, pRaftId->vgId, *p); printf("key:<%lu, %d>, value:%lu \n", pRaftId->addr, pRaftId->vgId, *p);
p = (uint64_t *)taosHashIterate(pNextIndex, p); p = (uint64_t *)taosHashIterate(pNextIndex, p);

View File

@ -74,7 +74,7 @@ void syncRespMgrGetAndDelTest(uint64_t i) {
} }
SSyncNode *createSyncNode() { SSyncNode *createSyncNode() {
SSyncNode *pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode)); SSyncNode *pSyncNode = (SSyncNode *)taosMemoryMalloc(sizeof(SSyncNode));
memset(pSyncNode, 0, sizeof(SSyncNode)); memset(pSyncNode, 0, sizeof(SSyncNode));
return pSyncNode; return pSyncNode;
} }

View File

@ -29,7 +29,7 @@ int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void** ppReader) { return 0; }
int32_t SnapshotStopRead(struct SSyncFSM* pFsm, void* pReader) { return 0; } int32_t SnapshotStopRead(struct SSyncFSM* pFsm, void* pReader) { return 0; }
int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len) { return 0; } int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len) { return 0; }
int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void *pParam, void** ppWriter) { return 0; } int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void* pParam, void** ppWriter) { return 0; }
int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply) { return 0; } int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply) { return 0; }
int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len) { return 0; } int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len) { return 0; }

View File

@ -111,7 +111,7 @@ int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32
return 0; return 0;
} }
int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void *pParam, void** ppWriter) { int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void* pParam, void** ppWriter) {
*ppWriter = (void*)0xCDEF; *ppWriter = (void*)0xCDEF;
char logBuf[256] = {0}; char logBuf[256] = {0};
@ -314,18 +314,18 @@ int main(int argc, char** argv) {
exit(-1); exit(-1);
} }
int32_t replicaNum = atoi(argv[1]); int32_t replicaNum = atoi(argv[1]);
int32_t myIndex = atoi(argv[2]); int32_t myIndex = atoi(argv[2]);
ESyncStrategy enableSnapshot = (ESyncStrategy)atoi(argv[3]); ESyncStrategy enableSnapshot = (ESyncStrategy)atoi(argv[3]);
int32_t lastApplyIndex = atoi(argv[4]); int32_t lastApplyIndex = atoi(argv[4]);
int32_t lastApplyTerm = atoi(argv[5]); int32_t lastApplyTerm = atoi(argv[5]);
int32_t writeRecordNum = atoi(argv[6]); int32_t writeRecordNum = atoi(argv[6]);
bool isStandBy = atoi(argv[7]); bool isStandBy = atoi(argv[7]);
int32_t isConfigChange = atoi(argv[8]); int32_t isConfigChange = atoi(argv[8]);
int32_t iterTimes = atoi(argv[9]); int32_t iterTimes = atoi(argv[9]);
int32_t finishLastApplyIndex = atoi(argv[10]); int32_t finishLastApplyIndex = atoi(argv[10]);
int32_t finishLastApplyTerm = atoi(argv[11]); int32_t finishLastApplyTerm = atoi(argv[11]);
int32_t leaderTransfer = atoi(argv[12]); int32_t leaderTransfer = atoi(argv[12]);
sInfo( sInfo(
"args: replicaNum:%d, myIndex:%d, enableSnapshot:%d, lastApplyIndex:%d, lastApplyTerm:%d, writeRecordNum:%d, " "args: replicaNum:%d, myIndex:%d, enableSnapshot:%d, lastApplyIndex:%d, lastApplyTerm:%d, writeRecordNum:%d, "