From 2dd7abd47e6a6b48bf86bd65affa555ba72505dd Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 11 Jul 2022 10:34:02 +0800 Subject: [PATCH 1/2] refactor(sync): add vnode snapshot case --- source/dnode/vnode/src/vnd/vnodeCommit.c | 4 +- source/dnode/vnode/src/vnd/vnodeOpen.c | 7 + source/libs/sync/src/syncCommit.c | 3 +- source/libs/sync/src/syncMain.c | 35 ++-- source/libs/sync/src/syncRaftCfg.c | 8 +- source/libs/sync/src/syncRaftLog.c | 16 +- source/libs/sync/src/syncReplication.c | 17 +- source/libs/sync/src/syncRequestVote.c | 16 +- source/libs/sync/src/syncRequestVoteReply.c | 9 +- source/libs/sync/src/syncSnapshot.c | 12 +- .../test/syncConfigChangeSnapshotTest.cpp | 22 ++- .../libs/sync/test/syncConfigChangeTest.cpp | 24 ++- source/libs/sync/test/syncEntryCacheTest.cpp | 63 ++++++- source/libs/sync/test/syncReplicateTest.cpp | 17 +- source/libs/sync/test/syncRespMgrTest.cpp | 4 +- source/libs/sync/test/syncSnapshotTest.cpp | 14 +- source/libs/sync/test/syncTestTool.cpp | 25 ++- source/libs/sync/test/syncWriteTest.cpp | 14 +- source/libs/wal/src/walWrite.c | 2 +- tests/script/tsim/sync/vnodesnapshot-test.sim | 178 ++++++++++++++++++ 20 files changed, 391 insertions(+), 99 deletions(-) create mode 100644 tests/script/tsim/sync/vnodesnapshot-test.sim diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index ed829666cd..ebbb691e28 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -15,7 +15,7 @@ #include "vnd.h" -#define VND_INFO_FNAME "vnode.json" +#define VND_INFO_FNAME "vnode.json" #define VND_INFO_FNAME_TMP "vnode_tmp.json" static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData); @@ -230,6 +230,7 @@ int vnodeCommit(SVnode *pVnode) { ASSERT(0); return -1; } + walBeginSnapshot(pVnode->pWal, pVnode->state.applied); // preCommit smaPreCommit(pVnode->pSma); @@ -278,6 +279,7 @@ int vnodeCommit(SVnode *pVnode) { smaPostCommit(pVnode->pSma); // apply the commit (TODO) + walEndSnapshot(pVnode->pWal); vnodeBufPoolReset(pVnode->onCommit); pVnode->onCommit->next = pVnode->pPool; pVnode->pPool = pVnode->onCommit; diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index e59f8ae558..fe26bd1090 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -117,6 +117,13 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { // open wal sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_WAL_DIR); taosRealPath(tdir, NULL, sizeof(tdir)); + +// for test tsdb snapshot +#if 0 + pVnode->config.walCfg.segSize = 200; + pVnode->config.walCfg.retentionSize = 2000; +#endif + pVnode->pWal = walOpen(tdir, &(pVnode->config.walCfg)); if (pVnode->pWal == NULL) { vError("vgId:%d, failed to open vnode wal since %s", TD_VID(pVnode), tstrerror(terrno)); diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index f7bee01030..b3cdd079a4 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -83,7 +83,8 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { newCommitIndex = index; if (gRaftDetailLog) { - sTrace("syncMaybeAdvanceCommitIndex maybe to update, newCommitIndex:%" PRId64 " commit, pSyncNode->commitIndex:%" PRId64, + sTrace("syncMaybeAdvanceCommitIndex maybe to update, newCommitIndex:%" PRId64 + " commit, pSyncNode->commitIndex:%" PRId64, newCommitIndex, pSyncNode->commitIndex); } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 1db60495c2..50e2588e19 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -824,8 +824,8 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { } else { ret = -1; terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - sError("vgId:%d optimized index:%" PRId64 " error, msgtype:%s,%d", pSyncNode->vgId, retIndex, TMSG_INFO(pMsg->msgType), - pMsg->msgType); + sError("vgId:%d optimized index:%" PRId64 " error, msgtype:%s,%d", pSyncNode->vgId, retIndex, + TMSG_INFO(pMsg->msgType), pMsg->msgType); } } else { @@ -1527,7 +1527,9 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { char logBuf[256 + 256]; if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) { snprintf(logBuf, sizeof(logBuf), - "vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", beginlog:%" PRId64 ", lastlog:%" PRId64 ", lastsnapshot:%" PRId64 ", standby:%d, " + "vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", beginlog:%" PRId64 ", lastlog:%" PRId64 + ", lastsnapshot:%" PRId64 + ", standby:%d, " "strategy:%d, batch:%d, " "replica-num:%d, " "lconfig:%" PRId64 ", changing:%d, restore:%d, %s", @@ -1546,7 +1548,9 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { char* s = (char*)taosMemoryMalloc(len); if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) { snprintf(s, len, - "vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", beginlog:%" PRId64 ", lastlog:%" PRId64 ", lastsnapshot:%" PRId64 ", standby:%d, " + "vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", beginlog:%" PRId64 ", lastlog:%" PRId64 + ", lastsnapshot:%" PRId64 + ", standby:%d, " "strategy:%d, batch:%d, " "replica-num:%d, " "lconfig:%" PRId64 ", changing:%d, restore:%d, %s", @@ -1590,7 +1594,9 @@ void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) { char logBuf[256 + 256]; if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) { snprintf(logBuf, sizeof(logBuf), - "vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", beginlog:%" PRId64 ", lastlog:%" PRId64 ", lastsnapshot:%" PRId64 ", standby:%d, " + "vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", beginlog:%" PRId64 ", lastlog:%" PRId64 + ", lastsnapshot:%" PRId64 + ", standby:%d, " "replica-num:%d, " "lconfig:%" PRId64 ", changing:%d, restore:%d, %s", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, @@ -1607,7 +1613,9 @@ void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) { char* s = (char*)taosMemoryMalloc(len); if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) { snprintf(s, len, - "vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", beginlog:%" PRId64 ", lastlog:%" PRId64 ", lastsnapshot:%" PRId64 ", standby:%d, " + "vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", beginlog:%" PRId64 ", lastlog:%" PRId64 + ", lastsnapshot:%" PRId64 + ", standby:%d, " "replica-num:%d, " "lconfig:%" PRId64 ", changing:%d, restore:%d, %s", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, @@ -1636,7 +1644,9 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { SyncIndex logBeginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore); snprintf(s, len, - "vgId:%d, sync %s, term:%" PRIu64 ", commit:%" PRId64 ", beginlog:%" PRId64 ", lastlog:%" PRId64 ", lastsnapshot:%" PRId64 ", standby:%d, " + "vgId:%d, sync %s, term:%" PRIu64 ", commit:%" PRId64 ", beginlog:%" PRId64 ", lastlog:%" PRId64 + ", lastsnapshot:%" PRId64 + ", standby:%d, " "replica-num:%d, " "lconfig:%" PRId64 ", changing:%d, restore:%d", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, @@ -1839,8 +1849,8 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde char tmpbuf[512]; char* oldStr = syncCfg2SimpleStr(&oldConfig); char* newStr = syncCfg2SimpleStr(pNewConfig); - snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, index:%" PRId64 ", %s --> %s", oldConfig.replicaNum, - pNewConfig->replicaNum, lastConfigChangeIndex, oldStr, newStr); + snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, index:%" PRId64 ", %s --> %s", + oldConfig.replicaNum, pNewConfig->replicaNum, lastConfigChangeIndex, oldStr, newStr); taosMemoryFree(oldStr); taosMemoryFree(newStr); @@ -1863,8 +1873,8 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde char tmpbuf[512]; char* oldStr = syncCfg2SimpleStr(&oldConfig); char* newStr = syncCfg2SimpleStr(pNewConfig); - snprintf(tmpbuf, sizeof(tmpbuf), "do not config change from %d to %d, index:%" PRId64 ", %s --> %s", oldConfig.replicaNum, - pNewConfig->replicaNum, lastConfigChangeIndex, oldStr, newStr); + snprintf(tmpbuf, sizeof(tmpbuf), "do not config change from %d to %d, index:%" PRId64 ", %s --> %s", + oldConfig.replicaNum, pNewConfig->replicaNum, lastConfigChangeIndex, oldStr, newStr); taosMemoryFree(oldStr); taosMemoryFree(newStr); syncNodeEventLog(pSyncNode, tmpbuf); @@ -2399,7 +2409,8 @@ int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) { // log state char logBuf[1024] = {0}; snprintf(logBuf, sizeof(logBuf), - "==syncNodeOnPingCb== vgId:%d, state: %d, %s, term:%" PRIu64 " electTimerLogicClock:%" PRIu64 ", " + "==syncNodeOnPingCb== vgId:%d, state: %d, %s, term:%" PRIu64 " electTimerLogicClock:%" PRIu64 + ", " "electTimerLogicClockUser:%" PRIu64 ", electTimerMS:%d", ths->vgId, ths->state, syncUtilState2String(ths->state), ths->pRaftStore->currentTerm, ths->electTimerLogicClock, ths->electTimerLogicClockUser, ths->electTimerMS); diff --git a/source/libs/sync/src/syncRaftCfg.c b/source/libs/sync/src/syncRaftCfg.c index c06bd2338d..0bbeaaf5b0 100644 --- a/source/libs/sync/src/syncRaftCfg.c +++ b/source/libs/sync/src/syncRaftCfg.c @@ -101,7 +101,7 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) { char *syncCfg2Str(SSyncCfg *pSyncCfg) { cJSON *pJson = syncCfg2Json(pSyncCfg); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } @@ -109,7 +109,7 @@ char *syncCfg2Str(SSyncCfg *pSyncCfg) { char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) { if (pSyncCfg != NULL) { int32_t len = 512; - char *s = taosMemoryMalloc(len); + char * s = taosMemoryMalloc(len); memset(s, 0, len); snprintf(s, len, "{replica-num:%d, my-index:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex); @@ -206,7 +206,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) { char *raftCfg2Str(SRaftCfg *pRaftCfg) { cJSON *pJson = raftCfg2Json(pRaftCfg); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } @@ -285,7 +285,7 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) { (pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring); } - cJSON *pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg"); + cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg"); int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg)); ASSERT(code == 0); diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index a135002f44..918a94aa25 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -122,8 +122,8 @@ static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncI char logBuf[128]; snprintf(logBuf, sizeof(logBuf), - "wal restore from snapshot error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", snapshotIndex, err, - err, errStr, sysErr, sysErrStr); + "wal restore from snapshot error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", + snapshotIndex, err, err, errStr, sysErr, sysErrStr); syncNodeErrorLog(pData->pSyncNode, logBuf); return -1; @@ -207,8 +207,8 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr SyncIndex writeIndex = raftLogWriteIndex(pLogStore); if (pEntry->index != writeIndex) { - sError("vgId:%d wal write index error, entry-index:%" PRId64 " update to %" PRId64, pData->pSyncNode->vgId, pEntry->index, - writeIndex); + sError("vgId:%d wal write index error, entry-index:%" PRId64 " update to %" PRId64, pData->pSyncNode->vgId, + pEntry->index, writeIndex); pEntry->index = writeIndex; } @@ -272,8 +272,8 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, do { char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "wal read error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", index, err, - err, errStr, sysErr, sysErrStr); + snprintf(logBuf, sizeof(logBuf), "wal read error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", + index, err, err, errStr, sysErr, sysErrStr); if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) { syncNodeEventLog(pData->pSyncNode, logBuf); } else { @@ -418,8 +418,8 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { do { char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "wal read error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", index, - err, err, errStr, sysErr, sysErrStr); + snprintf(logBuf, sizeof(logBuf), "wal read error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", + index, err, err, errStr, sysErr, sysErrStr); if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) { syncNodeEventLog(pData->pSyncNode, logBuf); } else { diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 18e94e0523..b6bc4bc816 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -135,7 +135,8 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) { SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1; syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, newNextIndex); syncIndexMgrSetIndex(pSyncNode->pMatchIndex, pDestId, SYNC_INDEX_INVALID); - sError("vgId:%d sync get pre term error, nextIndex:%" PRId64 ", update next-index:%" PRId64 ", match-index:%d, raftid:%" PRId64, + sError("vgId:%d sync get pre term error, nextIndex:%" PRId64 ", update next-index:%" PRId64 + ", match-index:%d, raftid:%" PRId64, pSyncNode->vgId, nextIndex, newNextIndex, SYNC_INDEX_INVALID, pDestId->addr); return -1; @@ -224,7 +225,8 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1; syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, newNextIndex); syncIndexMgrSetIndex(pSyncNode->pMatchIndex, pDestId, SYNC_INDEX_INVALID); - sError("vgId:%d sync get pre term error, nextIndex:%" PRId64 ", update next-index:%" PRId64 ", match-index:%d, raftid:%" PRId64, + sError("vgId:%d sync get pre term error, nextIndex:%" PRId64 ", update next-index:%" PRId64 + ", match-index:%d, raftid:%" PRId64, pSyncNode->vgId, nextIndex, newNextIndex, SYNC_INDEX_INVALID, pDestId->addr); return -1; @@ -314,11 +316,12 @@ int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, c char host[128]; uint16_t port; syncUtilU642Addr(destRaftId->addr, host, sizeof(host), &port); - sDebug( - "vgId:%d, send sync-append-entries to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64 ", pterm:%" PRIu64 ", commit:%" PRId64 ", " - "datalen:%d}", - pSyncNode->vgId, host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, - pMsg->commitIndex, pMsg->dataLen); + sDebug("vgId:%d, send sync-append-entries to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64 + ", pterm:%" PRIu64 ", commit:%" PRId64 + ", " + "datalen:%d}", + pSyncNode->vgId, host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, + pMsg->commitIndex, pMsg->dataLen); } while (0); SRpcMsg rpcMsg; diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index db1a33c28b..a6ca0e6d78 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -55,7 +55,8 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); snprintf(logBuf, sizeof(logBuf), - "recv sync-request-vote from %s:%d, term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 ", maybe replica already dropped", + "recv sync-request-vote from %s:%d, term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 + ", maybe replica already dropped", host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm); syncNodeEventLog(ths, logBuf); } while (0); @@ -97,8 +98,9 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); snprintf(logBuf, sizeof(logBuf), - "recv sync-request-vote from %s:%d, term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 ", reply-grant:%d", host, port, - pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, pReply->voteGranted); + "recv sync-request-vote from %s:%d, term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 + ", reply-grant:%d", + host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, pReply->voteGranted); syncNodeEventLog(ths, logBuf); } while (0); @@ -181,7 +183,8 @@ int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) { uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); snprintf(logBuf, sizeof(logBuf), - "recv sync-request-vote from %s:%d, term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 ", maybe replica already dropped", + "recv sync-request-vote from %s:%d, term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 + ", maybe replica already dropped", host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm); syncNodeEventLog(ths, logBuf); } while (0); @@ -221,8 +224,9 @@ int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) { uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); snprintf(logBuf, sizeof(logBuf), - "recv sync-request-vote from %s:%d, term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 ", reply-grant:%d", host, port, - pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, pReply->voteGranted); + "recv sync-request-vote from %s:%d, term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 + ", reply-grant:%d", + host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, pReply->voteGranted); syncNodeEventLog(ths, logBuf); } while (0); diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index 12af7cf531..8ab4f75c5c 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -66,8 +66,8 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) if (pMsg->term > ths->pRaftStore->currentTerm) { char logBuf[128] = {0}; - snprintf(logBuf, sizeof(logBuf), "syncNodeOnRequestVoteReplyCb error term, receive:%" PRIu64 " current:%" PRIu64, pMsg->term, - ths->pRaftStore->currentTerm); + snprintf(logBuf, sizeof(logBuf), "syncNodeOnRequestVoteReplyCb error term, receive:%" PRIu64 " current:%" PRIu64, + pMsg->term, ths->pRaftStore->currentTerm); syncNodePrint2(logBuf, ths); sError("%s", logBuf); return ret; @@ -190,8 +190,9 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl if (pMsg->term > ths->pRaftStore->currentTerm) { char logBuf[128] = {0}; - snprintf(logBuf, sizeof(logBuf), "recv SyncRequestVoteReply, error term, receive_term:%" PRIu64 " current_term:%" PRIu64, - pMsg->term, ths->pRaftStore->currentTerm); + snprintf(logBuf, sizeof(logBuf), + "recv SyncRequestVoteReply, error term, receive_term:%" PRIu64 " current_term:%" PRIu64, pMsg->term, + ths->pRaftStore->currentTerm); syncNodePrint2(logBuf, ths); sError("%s", logBuf); return ret; diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 3079aa17ca..87cc5685f3 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -153,8 +153,8 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshotParam snapsho // event log do { char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "snapshot sender update lcindex from %" PRId64 " to %" PRId64, oldLastConfigIndex, - newLastConfigIndex); + snprintf(logBuf, sizeof(logBuf), "snapshot sender update lcindex from %" PRId64 " to %" PRId64, + oldLastConfigIndex, newLastConfigIndex); char *eventLog = snapshotSender2SimpleStr(pSender, logBuf); syncNodeEventLog(pSender->pSyncNode, eventLog); taosMemoryFree(eventLog); @@ -389,7 +389,9 @@ char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) { syncUtilU642Addr(destId.addr, host, sizeof(host), &port); snprintf(s, len, - "%s {%p s-param:%" PRId64 " e-param:%" PRId64 " laindex:%" PRId64 " laterm:%" PRIu64 " lcindex:%" PRId64 " seq:%d ack:%d finish:%d pterm:%" PRIu64 " " + "%s {%p s-param:%" PRId64 " e-param:%" PRId64 " laindex:%" PRId64 " laterm:%" PRIu64 " lcindex:%" PRId64 + " seq:%d ack:%d finish:%d pterm:%" PRIu64 + " " "replica-index:%d %s:%d}", event, pSender, pSender->snapshotParam.start, pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack, @@ -692,7 +694,9 @@ char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) syncUtilU642Addr(fromId.addr, host, sizeof(host), &port); snprintf(s, len, - "%s {%p start:%d ack:%d term:%" PRIu64 " pterm:%" PRIu64 " from:%s:%d s-param:%" PRId64 " e-param:%" PRId64 " laindex:%" PRId64 " laterm:%" PRIu64 " " + "%s {%p start:%d ack:%d term:%" PRIu64 " pterm:%" PRIu64 " from:%s:%d s-param:%" PRId64 " e-param:%" PRId64 + " laindex:%" PRId64 " laterm:%" PRIu64 + " " "lcindex:%" PRId64 "}", event, pReceiver, pReceiver->start, pReceiver->ack, pReceiver->term, pReceiver->privateTerm, host, port, pReceiver->snapshotParam.start, pReceiver->snapshotParam.end, pReceiver->snapshot.lastApplyIndex, diff --git a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp index de82df3fbd..339ebe90e7 100644 --- a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp +++ b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp @@ -45,7 +45,8 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { if (cbMeta.index > beginIndex) { char logBuf[256] = {0}; snprintf(logBuf, sizeof(logBuf), - "==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, flag:%" PRIu64 ", term:%" PRIu64 " \n", + "==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, flag:%" PRIu64 + ", term:%" PRIu64 " \n", pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag, cbMeta.term); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); @@ -56,17 +57,19 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { void PreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { char logBuf[256] = {0}; - snprintf(logBuf, sizeof(logBuf), - "==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s flag:%" PRIu64 "\n", pFsm, - cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag); + snprintf( + logBuf, sizeof(logBuf), + "==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s flag:%" PRIu64 "\n", + pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); } void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { char logBuf[256]; snprintf(logBuf, sizeof(logBuf), - "==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s flag:%" PRIu64 "\n", pFsm, - cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag); + "==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s flag:%" PRIu64 "\n", + pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), + cbMeta.flag); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); } @@ -147,8 +150,8 @@ int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_ void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); } void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta) { - sTrace("==callback== ==ReConfigCb== flag:0x%lX, index:%" PRId64 ", code:%d, currentTerm:%" PRIu64 ", term:%" PRIu64, cbMeta.flag, - cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term); + sTrace("==callback== ==ReConfigCb== flag:0x%lX, index:%" PRId64 ", code:%d, currentTerm:%" PRIu64 ", term:%" PRIu64, + cbMeta.flag, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term); } SSyncFSM* createFsm() { @@ -267,7 +270,8 @@ SRpcMsg* createRpcMsg(int i, int count, int myIndex) { pMsg->msgType = 9999; pMsg->contLen = 256; pMsg->pCont = rpcMallocCont(pMsg->contLen); - snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-" PRId64, myIndex, i, count, taosGetTimestampMs()); + snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-" PRId64, myIndex, i, count, + taosGetTimestampMs()); return pMsg; } diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp index 80a5e65274..ba3fc77650 100644 --- a/source/libs/sync/test/syncConfigChangeTest.cpp +++ b/source/libs/sync/test/syncConfigChangeTest.cpp @@ -44,8 +44,9 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { if (cbMeta.index > beginIndex) { char logBuf[256] = {0}; snprintf(logBuf, sizeof(logBuf), - "==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s flag:%" PRIu64 "\n", pFsm, - cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag); + "==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s flag:%" PRIu64 "\n", + pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), + cbMeta.flag); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); } else { sTrace("==callback== ==CommitCb== do not apply again %" PRId64, cbMeta.index); @@ -54,17 +55,19 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { void PreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { char logBuf[256] = {0}; - snprintf(logBuf, sizeof(logBuf), - "==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s flag:%" PRIu64 "\n", pFsm, - cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag); + snprintf( + logBuf, sizeof(logBuf), + "==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s flag:%" PRIu64 "\n", + pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); } void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { char logBuf[256]; snprintf(logBuf, sizeof(logBuf), - "==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s flag:%" PRIu64 "\n", pFsm, - cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag); + "==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s flag:%" PRIu64 "\n", + pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), + cbMeta.flag); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); } @@ -78,8 +81,8 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); } void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta) { - sTrace("==callback== ==ReConfigCb== flag:0x%lX, index:%" PRId64 ", code:%d, currentTerm:%" PRIu64 ", term:%" PRIu64, cbMeta.flag, - cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term); + sTrace("==callback== ==ReConfigCb== flag:0x%lX, index:%" PRId64 ", code:%d, currentTerm:%" PRIu64 ", term:%" PRIu64, + cbMeta.flag, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term); } SSyncFSM* createFsm() { @@ -188,7 +191,8 @@ SRpcMsg* createRpcMsg(int i, int count, int myIndex) { pMsg->msgType = 9999; pMsg->contLen = 256; pMsg->pCont = rpcMallocCont(pMsg->contLen); - snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-" PRId64, myIndex, i, count, taosGetTimestampMs()); + snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-" PRId64, myIndex, i, count, + taosGetTimestampMs()); return pMsg; } diff --git a/source/libs/sync/test/syncEntryCacheTest.cpp b/source/libs/sync/test/syncEntryCacheTest.cpp index 787c08e507..7b79b93bde 100644 --- a/source/libs/sync/test/syncEntryCacheTest.cpp +++ b/source/libs/sync/test/syncEntryCacheTest.cpp @@ -5,6 +5,7 @@ #include "syncRaftLog.h" #include "syncRaftStore.h" #include "syncUtil.h" +#include "tskiplist.h" void logTest() { sTrace("--- sync log test: trace"); @@ -148,15 +149,69 @@ void test4() { raftCacheLog2((char*)"==test4 after get-and-del entry 3==", pCache); } +static char* keyFn(const void* pData) { + SSyncRaftEntry* pEntry = (SSyncRaftEntry*)pData; + return (char*)(pEntry->index); +} + +static int cmpFn(const void* p1, const void* p2) { + SSyncRaftEntry* pEntry1 = (SSyncRaftEntry*)p1; + SSyncRaftEntry* pEntry2 = (SSyncRaftEntry*)p2; + + if (pEntry1->index == pEntry2->index) { + return 0; + } else { + return 1; + } +} + +void printSkipList(SSkipList* pSkipList) { + ASSERT(pSkipList != NULL); + + SSkipListIterator* pIter = tSkipListCreateIter(pSkipList); + while (tSkipListIterNext(pIter)) { + SSkipListNode* pNode = tSkipListIterGet(pIter); + ASSERT(pNode != NULL); + SSyncRaftEntry* pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode); + syncEntryPrint2((char*)"", pEntry); + } +} + +void test5() { + SSkipList* pSkipList = tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, sizeof(SSyncRaftEntry*), cmpFn, + SL_DISCARD_DUP_KEY, keyFn); + ASSERT(pSkipList != NULL); + + for (int i = 0; i <= 4; ++i) { + SSyncRaftEntry* pEntry = createEntry(i); + SyncIndex index = i; + SSkipListNode* pSkipListNode = tSkipListPut(pSkipList, pEntry); + } + + for (int i = 9; i >= 5; --i) { + SSyncRaftEntry* pEntry = createEntry(i); + SyncIndex index = i; + SSkipListNode* pSkipListNode = tSkipListPut(pSkipList, pEntry); + } + + printSkipList(pSkipList); + + tSkipListDestroy(pSkipList); +} + int main(int argc, char** argv) { gRaftDetailLog = true; tsAsyncLog = 0; sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE + DEBUG_DEBUG; - test1(); - test2(); - test3(); - test4(); + /* + test1(); + test2(); + test3(); + test4(); + */ + + test5(); return 0; } diff --git a/source/libs/sync/test/syncReplicateTest.cpp b/source/libs/sync/test/syncReplicateTest.cpp index 9148ab6195..d3ba4bc136 100644 --- a/source/libs/sync/test/syncReplicateTest.cpp +++ b/source/libs/sync/test/syncReplicateTest.cpp @@ -40,8 +40,9 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { if (cbMeta.index > beginIndex) { char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", - pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); + snprintf(logBuf, sizeof(logBuf), + "==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm, + cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); } else { sTrace("==callback== ==CommitCb== do not apply again %" PRId64, cbMeta.index); @@ -51,15 +52,16 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { void PreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { char logBuf[256]; snprintf(logBuf, sizeof(logBuf), - "==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index, - cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); + "==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm, + cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); } void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", - pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); + snprintf(logBuf, sizeof(logBuf), + "==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm, + cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); } @@ -143,7 +145,8 @@ SRpcMsg* createRpcMsg(int i, int count, int myIndex) { pMsg->msgType = 9999; pMsg->contLen = 256; pMsg->pCont = rpcMallocCont(pMsg->contLen); - snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-" PRId64, myIndex, i, count, taosGetTimestampMs()); + snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-" PRId64, myIndex, i, count, + taosGetTimestampMs()); return pMsg; } diff --git a/source/libs/sync/test/syncRespMgrTest.cpp b/source/libs/sync/test/syncRespMgrTest.cpp index 9e982e0a59..35daff796f 100644 --- a/source/libs/sync/test/syncRespMgrTest.cpp +++ b/source/libs/sync/test/syncRespMgrTest.cpp @@ -35,8 +35,8 @@ void syncRespMgrDelTest(uint64_t begin, uint64_t end) { } void printStub(SRespStub *p) { - printf("createTime:%" PRId64 ", rpcMsg.code:%d rpcMsg.ahandle:%" PRId64 " rpcMsg.handle:%" PRId64 " \n", p->createTime, p->rpcMsg.code, - (int64_t)(p->rpcMsg.info.ahandle), (int64_t)(p->rpcMsg.info.handle)); + printf("createTime:%" PRId64 ", rpcMsg.code:%d rpcMsg.ahandle:%" PRId64 " rpcMsg.handle:%" PRId64 " \n", + p->createTime, p->rpcMsg.code, (int64_t)(p->rpcMsg.info.ahandle), (int64_t)(p->rpcMsg.info.handle)); } void syncRespMgrPrint() { printf("\n----------------syncRespMgrPrint--------------\n"); diff --git a/source/libs/sync/test/syncSnapshotTest.cpp b/source/libs/sync/test/syncSnapshotTest.cpp index 9e50fa62ef..e0d33598b0 100644 --- a/source/libs/sync/test/syncSnapshotTest.cpp +++ b/source/libs/sync/test/syncSnapshotTest.cpp @@ -43,8 +43,9 @@ void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { if (cbMeta.index > beginIndex) { char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", - pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); + snprintf(logBuf, sizeof(logBuf), + "==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm, + cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); } else { sTrace("==callback== ==CommitCb== do not apply again %" PRId64, cbMeta.index); @@ -54,15 +55,16 @@ void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { char logBuf[256]; snprintf(logBuf, sizeof(logBuf), - "==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index, - cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); + "==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm, + cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); } void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", - pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); + snprintf(logBuf, sizeof(logBuf), + "==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm, + cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); } diff --git a/source/libs/sync/test/syncTestTool.cpp b/source/libs/sync/test/syncTestTool.cpp index 714b73a9e5..f35c6f8a2f 100644 --- a/source/libs/sync/test/syncTestTool.cpp +++ b/source/libs/sync/test/syncTestTool.cpp @@ -40,7 +40,9 @@ void cleanup() { walCleanUp(); } void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { char logBuf[256] = {0}; snprintf(logBuf, sizeof(logBuf), - "==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, flag:%" PRIu64 ", term:%" PRIu64 " " + "==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, flag:%" PRIu64 + ", term:%" PRIu64 + " " "currentTerm:%" PRIu64 " \n", pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag, cbMeta.term, cbMeta.currentTerm); @@ -50,7 +52,9 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { void PreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { char logBuf[256] = {0}; snprintf(logBuf, sizeof(logBuf), - "==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, flag:%" PRIu64 ", term:%" PRIu64 " " + "==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, flag:%" PRIu64 + ", term:%" PRIu64 + " " "currentTerm:%" PRIu64 " \n", pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag, cbMeta.term, cbMeta.currentTerm); @@ -60,7 +64,9 @@ void PreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { char logBuf[256] = {0}; snprintf(logBuf, sizeof(logBuf), - "==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, flag:%" PRIu64 ", term:%" PRIu64 " " + "==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, flag:%" PRIu64 + ", term:%" PRIu64 + " " "currentTerm:%" PRIu64 " \n", pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag, cbMeta.term, cbMeta.currentTerm); @@ -128,7 +134,8 @@ int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply) { char logBuf[256] = {0}; snprintf(logBuf, sizeof(logBuf), - "==callback== ==SnapshotStopWrite== pFsm:%p, pWriter:%p, isApply:%d, gSnapshotLastApplyIndex:%" PRId64 ", " + "==callback== ==SnapshotStopWrite== pFsm:%p, pWriter:%p, isApply:%d, gSnapshotLastApplyIndex:%" PRId64 + ", " "gSnapshotLastApplyTerm:%" PRId64, pFsm, pWriter, isApply, gSnapshotLastApplyIndex, gSnapshotLastApplyTerm); sTrace("%s", logBuf); @@ -148,7 +155,8 @@ void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFini void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta) { char* s = syncCfg2Str(&(cbMeta.newCfg)); - sTrace("==callback== ==ReConfigCb== flag:0x%lX, index:%" PRId64 ", code:%d, currentTerm:%" PRIu64 ", term:%" PRIu64 ", newCfg:%s", + sTrace("==callback== ==ReConfigCb== flag:0x%lX, index:%" PRId64 ", code:%d, currentTerm:%" PRIu64 ", term:%" PRIu64 + ", newCfg:%s", cbMeta.flag, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term, s); taosMemoryFree(s); } @@ -156,7 +164,9 @@ void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMe void LeaderTransferCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { char logBuf[256] = {0}; snprintf(logBuf, sizeof(logBuf), - "==callback== ==LeaderTransferCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, flag:%" PRIu64 ", term:%" PRIu64 " " + "==callback== ==LeaderTransferCb== pFsm:%p, index:%" PRId64 + ", isWeak:%d, code:%d, state:%d %s, flag:%" PRIu64 ", term:%" PRIu64 + " " "currentTerm:%" PRIu64 " \n", pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag, cbMeta.term, cbMeta.currentTerm); @@ -300,7 +310,8 @@ SRpcMsg* createRpcMsg(int i, int count, int myIndex) { pMsg->msgType = TDMT_VND_SUBMIT; pMsg->contLen = 256; pMsg->pCont = rpcMallocCont(pMsg->contLen); - snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-" PRId64, myIndex, i, count, taosGetTimestampMs()); + snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-" PRId64, myIndex, i, count, + taosGetTimestampMs()); return pMsg; } diff --git a/source/libs/sync/test/syncWriteTest.cpp b/source/libs/sync/test/syncWriteTest.cpp index d99923a8b9..3bf068e3c7 100644 --- a/source/libs/sync/test/syncWriteTest.cpp +++ b/source/libs/sync/test/syncWriteTest.cpp @@ -33,23 +33,25 @@ const char *pDir = "./syncWriteTest"; void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", - pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); + snprintf(logBuf, sizeof(logBuf), + "==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm, + cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); } void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { char logBuf[256]; snprintf(logBuf, sizeof(logBuf), - "==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index, - cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); + "==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm, + cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); } void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", - pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); + snprintf(logBuf, sizeof(logBuf), + "==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm, + cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); } diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 900d866a1d..c784e9f84f 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -229,7 +229,7 @@ int32_t walEndSnapshot(SWal *pWal) { } // iterate files, until the searched result for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) { - if ((pWal->cfg.retentionSize != -1 && pWal->totSize > pWal->cfg.retentionSize) || + if ((pWal->cfg.retentionSize != -1 && newTotSize > pWal->cfg.retentionSize) || (pWal->cfg.retentionPeriod != -1 && iter->closeTs + pWal->cfg.retentionPeriod > ts)) { // delete according to file size or close time deleteCnt++; diff --git a/tests/script/tsim/sync/vnodesnapshot-test.sim b/tests/script/tsim/sync/vnodesnapshot-test.sim new file mode 100644 index 0000000000..e4ef6739dd --- /dev/null +++ b/tests/script/tsim/sync/vnodesnapshot-test.sim @@ -0,0 +1,178 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 +system sh/deploy.sh -n dnode3 -i 3 +system sh/deploy.sh -n dnode4 -i 4 + +system sh/cfg.sh -n dnode1 -c supportVnodes -v 0 + +system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode2 -s start +system sh/exec.sh -n dnode3 -s start +system sh/exec.sh -n dnode4 -s start + +$loop_cnt = 0 +check_dnode_ready: + $loop_cnt = $loop_cnt + 1 + sleep 200 + if $loop_cnt == 10 then + print ====> dnode not ready! + return -1 + endi +sql show dnodes +print ===> $rows $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] +print ===> $rows $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6] +print ===> $rows $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6] +print ===> $rows $data[3][0] $data[3][1] $data[3][2] $data[3][3] $data[3][4] $data[3][5] $data[3][6] +if $data[0][0] != 1 then + return -1 +endi +if $data[0][4] != ready then + goto check_dnode_ready +endi + +sql connect +sql create dnode $hostname port 7200 +sql create dnode $hostname port 7300 +sql create dnode $hostname port 7400 + +$loop_cnt = 0 +check_dnode_ready_1: +$loop_cnt = $loop_cnt + 1 +sleep 200 +if $loop_cnt == 10 then + print ====> dnodes not ready! + return -1 +endi +sql show dnodes +print ===> $rows $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] +print ===> $rows $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6] +print ===> $rows $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6] +print ===> $rows $data[3][0] $data[3][1] $data[3][2] $data[3][3] $data[3][4] $data[3][5] $data[3][6] +if $data[0][4] != ready then + goto check_dnode_ready_1 +endi +if $data[1][4] != ready then + goto check_dnode_ready_1 +endi +if $data[2][4] != ready then + goto check_dnode_ready_1 +endi +if $data[3][4] != ready then + goto check_dnode_ready_1 +endi + +$replica = 3 +$vgroups = 1 + +print ============= create database +sql create database db replica $replica vgroups $vgroups + +$loop_cnt = 0 +check_db_ready: +$loop_cnt = $loop_cnt + 1 +sleep 200 +if $loop_cnt == 100 then + print ====> db not ready! + return -1 +endi +sql show databases +print ===> rows: $rows +print $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6] $data[2][7] $data[2][8] $data[2][9] $data[2][6] $data[2][11] $data[2][12] $data[2][13] $data[2][14] $data[2][15] $data[2][16] $data[2][17] $data[2][18] $data[2][19] +if $rows != 3 then + return -1 +endi +if $data[2][19] != ready then + goto check_db_ready +endi + +sql use db + +$loop_cnt = 0 +check_vg_ready: +$loop_cnt = $loop_cnt + 1 +sleep 200 +if $loop_cnt == 300 then + print ====> vgroups not ready! + return -1 +endi + +sql show vgroups +print ===> rows: $rows +print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] $data[0][7] $data[0][8] $data[0][9] $data[0][10] $data[0][11] + +if $rows != $vgroups then + return -1 +endi + +if $data[0][4] == leader then + if $data[0][6] == follower then + if $data[0][8] == follower then + print ---- vgroup $data[0][0] leader locate on dnode $data[0][3] + endi + endi +elif $data[0][6] == leader then + if $data[0][4] == follower then + if $data[0][8] == follower then + print ---- vgroup $data[0][0] leader locate on dnode $data[0][5] + endi + endi +elif $data[0][8] == leader then + if $data[0][4] == follower then + if $data[0][6] == follower then + print ---- vgroup $data[0][0] leader locate on dnode $data[0][7] + endi + endi +else + goto check_vg_ready +endi + + +vg_ready: +print ====> create stable/child table +sql create table stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int) + +sql show stables +if $rows != 1 then + return -1 +endi + +sql create table ct1 using stb tags(1000) + + +print ===> stop dnode4 +system sh/exec.sh -n dnode4 -s stop -x SIGINT +sleep 3000 + + +print ===> write 100 records +$N = 100 +$count = 0 +while $count < $N + $ms = 1591200000000 + $count + sql insert into ct1 values( $ms , $count , 2.1, 3.1) + $count = $count + 1 +endw + + +#sql flush database db; + + +sleep 3000 + + +print ===> stop dnode1 dnode2 dnode3 +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode2 -s stop -x SIGINT +system sh/exec.sh -n dnode3 -s stop -x SIGINT + + + + +print ===> start dnode1 dnode2 dnode3 dnode4 +system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode2 -s start +system sh/exec.sh -n dnode3 -s start +system sh/exec.sh -n dnode4 -s start + + From 9e1b9fe64c45dc2aeac02f6d7172af75544cd32a Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 11 Jul 2022 14:21:36 +0800 Subject: [PATCH 2/2] refactor(sync): add syncEntryCacheTest --- source/libs/sync/test/syncEntryCacheTest.cpp | 100 +++++++++++++++---- 1 file changed, 81 insertions(+), 19 deletions(-) diff --git a/source/libs/sync/test/syncEntryCacheTest.cpp b/source/libs/sync/test/syncEntryCacheTest.cpp index 7b79b93bde..f902d24489 100644 --- a/source/libs/sync/test/syncEntryCacheTest.cpp +++ b/source/libs/sync/test/syncEntryCacheTest.cpp @@ -151,19 +151,10 @@ void test4() { static char* keyFn(const void* pData) { SSyncRaftEntry* pEntry = (SSyncRaftEntry*)pData; - return (char*)(pEntry->index); + return (char*)(&(pEntry->index)); } -static int cmpFn(const void* p1, const void* p2) { - SSyncRaftEntry* pEntry1 = (SSyncRaftEntry*)p1; - SSyncRaftEntry* pEntry2 = (SSyncRaftEntry*)p2; - - if (pEntry1->index == pEntry2->index) { - return 0; - } else { - return 1; - } -} +static int cmpFn(const void* p1, const void* p2) { return memcmp(p1, p2, sizeof(SyncIndex)); } void printSkipList(SSkipList* pSkipList) { ASSERT(pSkipList != NULL); @@ -177,25 +168,96 @@ void printSkipList(SSkipList* pSkipList) { } } -void test5() { - SSkipList* pSkipList = tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, sizeof(SSyncRaftEntry*), cmpFn, - SL_DISCARD_DUP_KEY, keyFn); +void delSkipListFirst(SSkipList* pSkipList, int n) { ASSERT(pSkipList != NULL); - for (int i = 0; i <= 4; ++i) { - SSyncRaftEntry* pEntry = createEntry(i); - SyncIndex index = i; - SSkipListNode* pSkipListNode = tSkipListPut(pSkipList, pEntry); + sTrace("delete first %d -------------", n); + SSkipListIterator* pIter = tSkipListCreateIter(pSkipList); + for (int i = 0; i < n; ++i) { + tSkipListIterNext(pIter); + SSkipListNode* pNode = tSkipListIterGet(pIter); + tSkipListRemoveNode(pSkipList, pNode); + } +} + + +SSyncRaftEntry* getLogEntry2(SSkipList* pSkipList, SyncIndex index) { + sTrace("get index: %ld -------------", index); + SyncIndex index2 = index; + SSyncRaftEntry *pEntry = NULL; + + SArray* nodes = tSkipListGet(pSkipList, (char*)(&index2)); + if (taosArrayGetSize(nodes) > 0) { + + } + taosArrayDestroy(nodes); + + + + SSkipListIterator* pIter = tSkipListCreateIterFromVal(pSkipList, (const char *)&index2, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC); + if (tSkipListIterNext(pIter)) { + SSkipListNode* pNode = tSkipListIterGet(pIter); + ASSERT(pNode != NULL); + pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode); } + syncEntryLog2((char*)"", pEntry); + return pEntry; +} + + +SSyncRaftEntry* getLogEntry(SSkipList* pSkipList, SyncIndex index) { + sTrace("get index: %ld -------------", index); + SyncIndex index2 = index; + SSyncRaftEntry *pEntry = NULL; + SSkipListIterator* pIter = tSkipListCreateIterFromVal(pSkipList, (const char *)&index2, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC); + if (tSkipListIterNext(pIter)) { + SSkipListNode* pNode = tSkipListIterGet(pIter); + ASSERT(pNode != NULL); + pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode); + } + + syncEntryLog2((char*)"", pEntry); + return pEntry; +} + +void test5() { + SSkipList* pSkipList = + tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, sizeof(SyncIndex), cmpFn, SL_ALLOW_DUP_KEY, keyFn); + ASSERT(pSkipList != NULL); + + sTrace("insert 9 - 5"); for (int i = 9; i >= 5; --i) { SSyncRaftEntry* pEntry = createEntry(i); - SyncIndex index = i; SSkipListNode* pSkipListNode = tSkipListPut(pSkipList, pEntry); } + sTrace("insert 0 - 4"); + for (int i = 0; i <= 4; ++i) { + SSyncRaftEntry* pEntry = createEntry(i); + SSkipListNode* pSkipListNode = tSkipListPut(pSkipList, pEntry); + } + + sTrace("insert 7 7 7 7 7"); + for (int i = 0; i <= 4; ++i) { + SSyncRaftEntry* pEntry = createEntry(7); + SSkipListNode* pSkipListNode = tSkipListPut(pSkipList, pEntry); + } + + sTrace("print: -------------"); printSkipList(pSkipList); + delSkipListFirst(pSkipList, 3); + + sTrace("print: -------------"); + printSkipList(pSkipList); + + getLogEntry(pSkipList, 2); + getLogEntry(pSkipList, 5); + getLogEntry(pSkipList, 7); + getLogEntry(pSkipList, 7); + + tSkipListDestroy(pSkipList); }