refactor(sync): add syncBeginSnapshot, syncEndSnapshot
This commit is contained in:
parent
8a6eaed6f6
commit
45bce6ad49
|
@ -297,10 +297,30 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
|
||||||
ASSERT(rid == pSyncNode->rid);
|
ASSERT(rid == pSyncNode->rid);
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
|
||||||
|
int64_t matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
|
||||||
|
if (lastApplyIndex > matchIndex) {
|
||||||
|
do {
|
||||||
|
char host[64];
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(pSyncNode->peersId[i].addr, host, sizeof(host), &port);
|
||||||
|
char logBuf[256];
|
||||||
|
snprintf(logBuf, sizeof(logBuf),
|
||||||
|
"new-snapshot-index:%ld is greater than match-index:%ld of %s:%d, do not delete wal", lastApplyIndex,
|
||||||
|
matchIndex, host, port);
|
||||||
|
syncNodeEventLog(pSyncNode, logBuf);
|
||||||
|
} while (0);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (pSyncNode->replicaNum == 1) {
|
if (pSyncNode->replicaNum == 1) {
|
||||||
SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
|
SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
|
||||||
code = walBeginSnapshot(pData->pWal, lastApplyIndex);
|
code = walBeginSnapshot(pData->pWal, lastApplyIndex);
|
||||||
} else {
|
} else {
|
||||||
|
// calculate snapshot index
|
||||||
|
|
||||||
SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);
|
SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);
|
||||||
|
|
||||||
if (snapshottingIndex == SYNC_INDEX_INVALID) {
|
if (snapshottingIndex == SYNC_INDEX_INVALID) {
|
||||||
|
@ -310,7 +330,10 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
|
||||||
code = walBeginSnapshot(pData->pWal, lastApplyIndex);
|
code = walBeginSnapshot(pData->pWal, lastApplyIndex);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
sError("vgId:%d snapshotting index:%ld, lastApplyIndex:%ld", snapshottingIndex, lastApplyIndex);
|
char logBuf[256];
|
||||||
|
snprintf(logBuf, sizeof(logBuf), "snapshotting for %ld, do not delete wal for new-snapshot-index:%ld",
|
||||||
|
snapshottingIndex, lastApplyIndex);
|
||||||
|
syncNodeEventLog(pSyncNode, logBuf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -326,8 +349,13 @@ int32_t syncEndSnapshot(int64_t rid) {
|
||||||
}
|
}
|
||||||
ASSERT(rid == pSyncNode->rid);
|
ASSERT(rid == pSyncNode->rid);
|
||||||
|
|
||||||
|
int32_t code = 0;
|
||||||
|
if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
|
||||||
SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
|
SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
|
||||||
int32_t code = walEndSnapshot(pData->pWal);
|
code = walEndSnapshot(pData->pWal);
|
||||||
|
|
||||||
|
atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
|
||||||
|
}
|
||||||
|
|
||||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
return code;
|
return code;
|
||||||
|
|
Loading…
Reference in New Issue