Merge pull request #14654 from taosdata/feature/3.0_mhli
fix(sync): snapshot strategy wal first
This commit is contained in:
commit
1e1e373220
|
@ -238,10 +238,12 @@ SArray *mmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_BATCH, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
|
|
|
@ -377,10 +377,12 @@ SArray *vmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_VNODE_STANDBY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
|
|
|
@ -67,7 +67,6 @@ typedef struct SSyncNode {
|
|||
char path[TSDB_FILENAME_LEN];
|
||||
char raftStorePath[TSDB_FILENAME_LEN * 2];
|
||||
char configPath[TSDB_FILENAME_LEN * 2];
|
||||
int32_t batchSize;
|
||||
|
||||
// sync io
|
||||
SWal* pWal;
|
||||
|
|
|
@ -842,8 +842,8 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
|
|||
do {
|
||||
char logBuf[128];
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"recv sync-append-entries-batch, fake match2, pre-index:%ld, pre-term:%lu, datalen:%d",
|
||||
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
|
||||
"recv sync-append-entries-batch, fake match2, {pre-index:%ld, pre-term:%lu, datalen:%d, datacount:%d}",
|
||||
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen, pMsg->dataCount);
|
||||
syncNodeEventLog(ths, logBuf);
|
||||
} while (0);
|
||||
|
||||
|
@ -876,7 +876,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
|
|||
code = syncNodePreCommit(ths, pAppendEntry);
|
||||
ASSERT(code == 0);
|
||||
|
||||
syncEntryDestory(pAppendEntry);
|
||||
// syncEntryDestory(pAppendEntry);
|
||||
}
|
||||
|
||||
// fsync once
|
||||
|
@ -931,8 +931,8 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
|
|||
do {
|
||||
char logBuf[128];
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"recv sync-append-entries-batch, not match, pre-index:%ld, pre-term:%lu, datalen:%d",
|
||||
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
|
||||
"recv sync-append-entries-batch, not match, {pre-index:%ld, pre-term:%lu, datalen:%d, datacount:%d}",
|
||||
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen, pMsg->dataCount);
|
||||
syncNodeEventLog(ths, logBuf);
|
||||
} while (0);
|
||||
|
||||
|
@ -976,8 +976,9 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
|
|||
|
||||
do {
|
||||
char logBuf[128];
|
||||
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries, match, pre-index:%ld, pre-term:%lu, datalen:%d",
|
||||
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"recv sync-append-entries, match, {pre-index:%ld, pre-term:%lu, datalen:%d, datacount:%d}",
|
||||
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen, pMsg->dataCount);
|
||||
syncNodeEventLog(ths, logBuf);
|
||||
} while (0);
|
||||
|
||||
|
@ -999,7 +1000,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
|
|||
code = syncNodePreCommit(ths, pAppendEntry);
|
||||
ASSERT(code == 0);
|
||||
|
||||
syncEntryDestory(pAppendEntry);
|
||||
// syncEntryDestory(pAppendEntry);
|
||||
}
|
||||
|
||||
// fsync once
|
||||
|
|
|
@ -174,8 +174,12 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
|
|||
SyncIndex newNextIndex = pMsg->matchIndex + 1;
|
||||
SyncIndex newMatchIndex = pMsg->matchIndex;
|
||||
|
||||
if (ths->pLogStore->syncLogExist(ths->pLogStore, newNextIndex) &&
|
||||
ths->pLogStore->syncLogExist(ths->pLogStore, newNextIndex - 1)) {
|
||||
bool needStartSnapshot = false;
|
||||
if (newMatchIndex >= SYNC_INDEX_BEGIN && !ths->pLogStore->syncLogExist(ths->pLogStore, newMatchIndex)) {
|
||||
needStartSnapshot = true;
|
||||
}
|
||||
|
||||
if (!needStartSnapshot) {
|
||||
// update next-index, match-index
|
||||
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), newNextIndex);
|
||||
syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), newMatchIndex);
|
||||
|
@ -197,15 +201,36 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
|
|||
syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), newMatchIndex);
|
||||
}
|
||||
|
||||
// event log, update next-index
|
||||
do {
|
||||
char host[64];
|
||||
int16_t port;
|
||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||
|
||||
char logBuf[256];
|
||||
snprintf(logBuf, sizeof(logBuf), "reset next-index:%ld, match-index:%ld for %s:%d", newNextIndex, newMatchIndex,
|
||||
host, port);
|
||||
syncNodeEventLog(ths, logBuf);
|
||||
|
||||
} while (0);
|
||||
|
||||
} else {
|
||||
SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
|
||||
|
||||
if (nextIndex > SYNC_INDEX_BEGIN) {
|
||||
--nextIndex;
|
||||
|
||||
if (ths->pLogStore->syncLogExist(ths->pLogStore, nextIndex) &&
|
||||
ths->pLogStore->syncLogExist(ths->pLogStore, nextIndex - 1)) {
|
||||
bool needStartSnapshot = false;
|
||||
if (nextIndex >= SYNC_INDEX_BEGIN && !ths->pLogStore->syncLogExist(ths->pLogStore, nextIndex)) {
|
||||
needStartSnapshot = true;
|
||||
}
|
||||
if (nextIndex - 1 >= SYNC_INDEX_BEGIN && !ths->pLogStore->syncLogExist(ths->pLogStore, nextIndex - 1)) {
|
||||
needStartSnapshot = true;
|
||||
}
|
||||
|
||||
if (!needStartSnapshot) {
|
||||
// do nothing
|
||||
|
||||
} else {
|
||||
SSyncRaftEntry* pEntry;
|
||||
int32_t code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, nextIndex, &pEntry);
|
||||
|
@ -227,6 +252,21 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
|
|||
nextIndex = SYNC_INDEX_BEGIN;
|
||||
}
|
||||
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex);
|
||||
|
||||
// event log, update next-index
|
||||
do {
|
||||
char host[64];
|
||||
int16_t port;
|
||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||
|
||||
SyncIndex newNextIndex = nextIndex;
|
||||
SyncIndex newMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
|
||||
char logBuf[256];
|
||||
snprintf(logBuf, sizeof(logBuf), "reset2 next-index:%ld, match-index:%ld for %s:%d", newNextIndex, newMatchIndex,
|
||||
host, port);
|
||||
syncNodeEventLog(ths, logBuf);
|
||||
|
||||
} while (0);
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
|
|
@ -1526,12 +1526,14 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
|
|||
if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) {
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"vgId:%d, sync %s %s, term:%lu, commit:%ld, beginlog:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, "
|
||||
"strategy:%d, batch:%d, "
|
||||
"replica-num:%d, "
|
||||
"lconfig:%ld, changing:%d, restore:%d, %s",
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
|
||||
pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex,
|
||||
pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
|
||||
pSyncNode->changing, pSyncNode->restoreFinish, printStr);
|
||||
pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize,
|
||||
pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing,
|
||||
pSyncNode->restoreFinish, printStr);
|
||||
} else {
|
||||
snprintf(logBuf, sizeof(logBuf), "%s", str);
|
||||
}
|
||||
|
@ -1543,12 +1545,14 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
|
|||
if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) {
|
||||
snprintf(s, len,
|
||||
"vgId:%d, sync %s %s, term:%lu, commit:%ld, beginlog:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, "
|
||||
"strategy:%d, batch:%d, "
|
||||
"replica-num:%d, "
|
||||
"lconfig:%ld, changing:%d, restore:%d, %s",
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
|
||||
pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex,
|
||||
pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
|
||||
pSyncNode->changing, pSyncNode->restoreFinish, printStr);
|
||||
pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize,
|
||||
pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing,
|
||||
pSyncNode->restoreFinish, printStr);
|
||||
} else {
|
||||
snprintf(s, len, "%s", str);
|
||||
}
|
||||
|
|
|
@ -1605,7 +1605,7 @@ void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg) {
|
|||
|
||||
SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SSyncRaftEntry** entryPArr, int32_t arrSize, int32_t vgId) {
|
||||
ASSERT(entryPArr != NULL);
|
||||
ASSERT(arrSize > 0);
|
||||
ASSERT(arrSize >= 0);
|
||||
|
||||
int32_t dataLen = 0;
|
||||
int32_t metaArrayLen = sizeof(SOffsetAndContLen) * arrSize; // <offset, contLen>
|
||||
|
|
|
@ -101,7 +101,7 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) {
|
|||
|
||||
char *syncCfg2Str(SSyncCfg *pSyncCfg) {
|
||||
cJSON *pJson = syncCfg2Json(pSyncCfg);
|
||||
char * serialized = cJSON_Print(pJson);
|
||||
char *serialized = cJSON_Print(pJson);
|
||||
cJSON_Delete(pJson);
|
||||
return serialized;
|
||||
}
|
||||
|
@ -109,7 +109,7 @@ char *syncCfg2Str(SSyncCfg *pSyncCfg) {
|
|||
char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) {
|
||||
if (pSyncCfg != NULL) {
|
||||
int32_t len = 512;
|
||||
char * s = taosMemoryMalloc(len);
|
||||
char *s = taosMemoryMalloc(len);
|
||||
memset(s, 0, len);
|
||||
|
||||
snprintf(s, len, "{replica-num:%d, my-index:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex);
|
||||
|
@ -206,7 +206,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
|
|||
|
||||
char *raftCfg2Str(SRaftCfg *pRaftCfg) {
|
||||
cJSON *pJson = raftCfg2Json(pRaftCfg);
|
||||
char * serialized = cJSON_Print(pJson);
|
||||
char *serialized = cJSON_Print(pJson);
|
||||
cJSON_Delete(pJson);
|
||||
return serialized;
|
||||
}
|
||||
|
@ -285,7 +285,7 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
|
|||
(pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring);
|
||||
}
|
||||
|
||||
cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
|
||||
cJSON *pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
|
||||
int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg));
|
||||
ASSERT(code == 0);
|
||||
|
||||
|
|
|
@ -148,9 +148,17 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
|
|||
// get entry batch
|
||||
int32_t getCount = 0;
|
||||
SyncIndex getEntryIndex = nextIndex;
|
||||
for (int32_t i = 0; i < pSyncNode->batchSize; ++i) {
|
||||
for (int32_t i = 0; i < pSyncNode->pRaftCfg->batchSize; ++i) {
|
||||
SSyncRaftEntry* pEntry = NULL;
|
||||
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, getEntryIndex, &pEntry);
|
||||
|
||||
// event log
|
||||
do {
|
||||
char logBuf[128];
|
||||
snprintf(logBuf, sizeof(logBuf), "get index:%d, code:%d, %s", getEntryIndex, code, tstrerror(terrno));
|
||||
syncNodeEventLog(pSyncNode, logBuf);
|
||||
} while (0);
|
||||
|
||||
if (code == 0) {
|
||||
ASSERT(pEntry != NULL);
|
||||
entryPArr[i] = pEntry;
|
||||
|
@ -162,12 +170,19 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
|
|||
}
|
||||
}
|
||||
|
||||
// event log
|
||||
do {
|
||||
char logBuf[128];
|
||||
snprintf(logBuf, sizeof(logBuf), "build batch:%d", getCount);
|
||||
syncNodeEventLog(pSyncNode, logBuf);
|
||||
} while (0);
|
||||
|
||||
// build msg
|
||||
SyncAppendEntriesBatch* pMsg = syncAppendEntriesBatchBuild(entryPArr, getCount, pSyncNode->vgId);
|
||||
ASSERT(pMsg != NULL);
|
||||
|
||||
// free entries
|
||||
for (int32_t i = 0; i < pSyncNode->batchSize; ++i) {
|
||||
for (int32_t i = 0; i < pSyncNode->pRaftCfg->batchSize; ++i) {
|
||||
SSyncRaftEntry* pEntry = entryPArr[i];
|
||||
if (pEntry != NULL) {
|
||||
syncEntryDestory(pEntry);
|
||||
|
|
|
@ -145,9 +145,9 @@ system sh/exec.sh -n dnode4 -s stop -x SIGINT
|
|||
sql insert into ct1 values(now+0s, 10, 2.0, 3.0)
|
||||
sql insert into ct1 values(now+1s, 11, 2.1, 3.1)(now+2s, -12, -2.2, -3.2)(now+3s, -13, -2.3, -3.3)
|
||||
|
||||
sql flush database db;
|
||||
#sql flush database db;
|
||||
|
||||
system sh/exec.sh -n dnode4 -s start
|
||||
#system sh/exec.sh -n dnode4 -s start
|
||||
|
||||
sql insert into ct1 values(now+1s, 81, 8.1, 8.1)(now+2s, -92, -9.2, -9.2)(now+3s, -73, -7.3, -7.3)
|
||||
|
||||
|
|
Loading…
Reference in New Issue