From 6586f78599ad8a3aa536331889d7bbf3e2b62669 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Sat, 11 Jun 2022 12:44:58 +0800 Subject: [PATCH 1/7] refactor(sync): add last config index --- include/common/tmsgdef.h | 1 + include/libs/sync/sync.h | 4 + include/libs/sync/syncTools.h | 30 +++ source/dnode/mnode/impl/src/mndMain.c | 6 +- source/libs/sync/inc/syncInt.h | 4 +- source/libs/sync/inc/syncRaftCfg.h | 6 +- source/libs/sync/src/syncAppendEntries.c | 243 +++++++++++++++++- source/libs/sync/src/syncMain.c | 216 +++++++++++----- source/libs/sync/src/syncMessage.c | 167 ++++++++++++ source/libs/sync/src/syncRaftCfg.c | 8 + source/libs/sync/src/syncRaftLog.c | 16 +- source/libs/sync/src/syncSnapshot.c | 6 +- source/libs/sync/src/syncUtil.c | 8 +- source/libs/sync/test/CMakeLists.txt | 14 + .../libs/sync/test/syncLeaderTransferTest.cpp | 101 ++++++++ source/libs/sync/test/syncRaftCfgTest.cpp | 2 + 16 files changed, 746 insertions(+), 86 deletions(-) create mode 100644 source/libs/sync/test/syncLeaderTransferTest.cpp diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 40701640b7..ebebcb0f2a 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -236,6 +236,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_SYNC_CONFIG_CHANGE, "sync-config-change", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_SNAPSHOT_SEND, "sync-snapshot-send", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_SNAPSHOT_RSP, "sync-snapshot-rsp", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_LEADER_TRANSFER, "sync-leader-transfer", NULL, NULL) #if defined(TD_MSG_NUMBER_) TDMT_MAX diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 9d1385bff2..3a77cc1e19 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -48,6 +48,7 @@ typedef enum { TAOS_SYNC_PROPOSE_SUCCESS = 0, TAOS_SYNC_PROPOSE_NOT_LEADER = 1, TAOS_SYNC_PROPOSE_OTHER_ERROR = 2, + TAOS_SYNC_ONLY_ONE_REPLICA = 3, } ESyncProposeCode; typedef enum { @@ -200,6 +201,9 @@ int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta); int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg); int32_t syncReconfigRaw(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg); +int32_t syncLeaderTransfer(int64_t rid); +int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader); + // to be moved to static void syncStartNormal(int64_t rid); void syncStartStandBy(int64_t rid); diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index bb50fc141c..a6802fc915 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -456,6 +456,36 @@ void syncSnapshotRspPrint2(char* s, const SyncSnapshotRsp* pMsg); void syncSnapshotRspLog(const SyncSnapshotRsp* pMsg); void syncSnapshotRspLog2(char* s, const SyncSnapshotRsp* pMsg); +// --------------------------------------------- +typedef struct SyncLeaderTransfer { + uint32_t bytes; + int32_t vgId; + uint32_t msgType; + /* + SRaftId srcId; + SRaftId destId; + */ + SRaftId newLeaderId; +} SyncLeaderTransfer; + +SyncLeaderTransfer* syncLeaderTransferBuild(int32_t vgId); +void syncLeaderTransferDestroy(SyncLeaderTransfer* pMsg); +void syncLeaderTransferSerialize(const SyncLeaderTransfer* pMsg, char* buf, uint32_t bufLen); +void syncLeaderTransferDeserialize(const char* buf, uint32_t len, SyncLeaderTransfer* pMsg); +char* syncLeaderTransferSerialize2(const SyncLeaderTransfer* pMsg, uint32_t* len); +SyncLeaderTransfer* syncLeaderTransferDeserialize2(const char* buf, uint32_t len); +void syncLeaderTransfer2RpcMsg(const SyncLeaderTransfer* pMsg, SRpcMsg* pRpcMsg); +void syncLeaderTransferFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLeaderTransfer* pMsg); +SyncLeaderTransfer* syncLeaderTransferFromRpcMsg2(const SRpcMsg* pRpcMsg); +cJSON* syncLeaderTransfer2Json(const SyncLeaderTransfer* pMsg); +char* syncLeaderTransfer2Str(const SyncLeaderTransfer* pMsg); + +// for debug ---------------------- +void syncLeaderTransferPrint(const SyncLeaderTransfer* pMsg); +void syncLeaderTransferPrint2(char* s, const SyncLeaderTransfer* pMsg); +void syncLeaderTransferLog(const SyncLeaderTransfer* pMsg); +void syncLeaderTransferLog2(char* s, const SyncLeaderTransfer* pMsg); + // on message ---------------------- int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 813e4c30b5..070b2a9643 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -402,7 +402,11 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { char logBuf[512] = {0}; char *syncNodeStr = sync2SimpleStr(pMgmt->sync); - snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr); + snprintf(logBuf, sizeof(logBuf), "==mndProcessSyncMsg== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr); + static int64_t mndTick = 0; + if (++mndTick % 1000 == 1) { + mTrace("sync trace msg:%s, %s", TMSG_INFO(pMsg->msgType), syncNodeStr); + } syncRpcMsgLog2(logBuf, pMsg); taosMemoryFree(syncNodeStr); diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 83f0bd7dd8..e7777af749 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -159,7 +159,7 @@ typedef struct SSyncNode { SSyncSnapshotSender* senders[TSDB_MAX_REPLICA]; SSyncSnapshotReceiver* pNewNodeReceiver; - SSnapshotMeta sMeta; + // SSnapshotMeta sMeta; } SSyncNode; @@ -194,7 +194,7 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S cJSON* syncNode2Json(const SSyncNode* pSyncNode); char* syncNode2Str(const SSyncNode* pSyncNode); char* syncNode2SimpleStr(const SSyncNode* pSyncNode); -void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, bool* isDrop); +void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex, bool* isDrop); SSyncNode* syncNodeAcquire(int64_t rid); void syncNodeRelease(SSyncNode* pNode); diff --git a/source/libs/sync/inc/syncRaftCfg.h b/source/libs/sync/inc/syncRaftCfg.h index 86c5fab87c..e72e1e7be7 100644 --- a/source/libs/sync/inc/syncRaftCfg.h +++ b/source/libs/sync/inc/syncRaftCfg.h @@ -35,6 +35,7 @@ typedef struct SRaftCfg { char path[TSDB_FILENAME_LEN * 2]; int8_t isStandBy; int8_t snapshotEnable; + SyncIndex lastConfigIndex; } SRaftCfg; SRaftCfg *raftCfgOpen(const char *path); @@ -52,8 +53,9 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg); int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg); typedef struct SRaftCfgMeta { - int8_t isStandBy; - int8_t snapshotEnable; + int8_t isStandBy; + int8_t snapshotEnable; + SyncIndex lastConfigIndex; } SRaftCfgMeta; int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 01c95d8241..6b5a86ded9 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -88,6 +88,246 @@ // /\ UNCHANGED <> // /\ UNCHANGED <> // + +int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { + int32_t ret = 0; + + char logBuf[128] = {0}; + snprintf(logBuf, sizeof(logBuf), "==syncNodeOnAppendEntriesCb== term:%lu", ths->pRaftStore->currentTerm); + syncAppendEntriesLog2(logBuf, pMsg); + + if (pMsg->term > ths->pRaftStore->currentTerm) { + syncNodeUpdateTerm(ths, pMsg->term); + } + assert(pMsg->term <= ths->pRaftStore->currentTerm); + + // reset elect timer + if (pMsg->term == ths->pRaftStore->currentTerm) { + ths->leaderCache = pMsg->srcId; + syncNodeResetElectTimer(ths); + } + assert(pMsg->dataLen >= 0); + + SyncTerm localPreLogTerm = 0; + if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) { + SSyncRaftEntry* pEntry = ths->pLogStore->getEntry(ths->pLogStore, pMsg->prevLogIndex); + assert(pEntry != NULL); + localPreLogTerm = pEntry->term; + syncEntryDestory(pEntry); + } + + bool logOK = + (pMsg->prevLogIndex == SYNC_INDEX_INVALID) || + ((pMsg->prevLogIndex >= SYNC_INDEX_BEGIN) && + (pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) && (pMsg->prevLogTerm == localPreLogTerm)); + + // reject request + if ((pMsg->term < ths->pRaftStore->currentTerm) || + ((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) { + sTrace( + "syncNodeOnAppendEntriesCb --> reject, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, ths->state:%d, " + "logOK:%d", + pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK); + + SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); + pReply->srcId = ths->myRaftId; + pReply->destId = pMsg->srcId; + pReply->term = ths->pRaftStore->currentTerm; + pReply->success = false; + pReply->matchIndex = SYNC_INDEX_INVALID; + + SRpcMsg rpcMsg; + syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); + syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); + syncAppendEntriesReplyDestroy(pReply); + + return ret; + } + + // return to follower state + if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) { + sTrace( + "syncNodeOnAppendEntriesCb --> return to follower, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, " + "ths->state:%d, logOK:%d", + pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK); + + syncNodeBecomeFollower(ths, "from candidate by append entries"); + + // ret or reply? + return ret; + } + + // accept request + if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) { + // preIndex = -1, or has preIndex entry in local log + assert(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)); + + // has extra entries (> preIndex) in local log + bool hasExtraEntries = pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore); + + // has entries in SyncAppendEntries msg + bool hasAppendEntries = pMsg->dataLen > 0; + + sTrace( + "syncNodeOnAppendEntriesCb --> accept, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, ths->state:%d, " + "logOK:%d, hasExtraEntries:%d, hasAppendEntries:%d", + pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK, hasExtraEntries, hasAppendEntries); + + if (hasExtraEntries && hasAppendEntries) { + // not conflict by default + bool conflict = false; + + SyncIndex extraIndex = pMsg->prevLogIndex + 1; + SSyncRaftEntry* pExtraEntry = ths->pLogStore->getEntry(ths->pLogStore, extraIndex); + assert(pExtraEntry != NULL); + + SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); + assert(pAppendEntry != NULL); + + // log not match, conflict + assert(extraIndex == pAppendEntry->index); + if (pExtraEntry->term != pAppendEntry->term) { + conflict = true; + } + + if (conflict) { + // roll back + SyncIndex delBegin = ths->pLogStore->getLastIndex(ths->pLogStore); + SyncIndex delEnd = extraIndex; + + sTrace("syncNodeOnAppendEntriesCb --> conflict:%d, delBegin:%ld, delEnd:%ld", conflict, delBegin, delEnd); + + // notice! reverse roll back! + for (SyncIndex index = delEnd; index >= delBegin; --index) { + if (ths->pFsm->FpRollBackCb != NULL) { + SSyncRaftEntry* pRollBackEntry = ths->pLogStore->getEntry(ths->pLogStore, index); + assert(pRollBackEntry != NULL); + + // if (pRollBackEntry->msgType != TDMT_SYNC_NOOP) { + if (syncUtilUserRollback(pRollBackEntry->msgType)) { + SRpcMsg rpcMsg; + syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg); + + SFsmCbMeta cbMeta; + cbMeta.index = pRollBackEntry->index; + cbMeta.isWeak = pRollBackEntry->isWeak; + cbMeta.code = 0; + cbMeta.state = ths->state; + cbMeta.seqNum = pRollBackEntry->seqNum; + ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, cbMeta); + rpcFreeCont(rpcMsg.pCont); + } + + syncEntryDestory(pRollBackEntry); + } + } + + // delete confict entries + ths->pLogStore->truncate(ths->pLogStore, extraIndex); + + // append new entries + ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry); + + // pre commit + SRpcMsg rpcMsg; + syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); + if (ths->pFsm != NULL) { + // if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) { + if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) { + SFsmCbMeta cbMeta; + cbMeta.index = pAppendEntry->index; + cbMeta.isWeak = pAppendEntry->isWeak; + cbMeta.code = 2; + cbMeta.state = ths->state; + cbMeta.seqNum = pAppendEntry->seqNum; + ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta); + } + } + rpcFreeCont(rpcMsg.pCont); + } + + // free memory + syncEntryDestory(pExtraEntry); + syncEntryDestory(pAppendEntry); + + } else if (hasExtraEntries && !hasAppendEntries) { + // do nothing + + } else if (!hasExtraEntries && hasAppendEntries) { + SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); + assert(pAppendEntry != NULL); + + // append new entries + ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry); + + // pre commit + SRpcMsg rpcMsg; + syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); + if (ths->pFsm != NULL) { + // if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) { + if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) { + SFsmCbMeta cbMeta; + cbMeta.index = pAppendEntry->index; + cbMeta.isWeak = pAppendEntry->isWeak; + cbMeta.code = 3; + cbMeta.state = ths->state; + cbMeta.seqNum = pAppendEntry->seqNum; + ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta); + } + } + rpcFreeCont(rpcMsg.pCont); + + // free memory + syncEntryDestory(pAppendEntry); + + } else if (!hasExtraEntries && !hasAppendEntries) { + // do nothing + + } else { + assert(0); + } + + SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); + pReply->srcId = ths->myRaftId; + pReply->destId = pMsg->srcId; + pReply->term = ths->pRaftStore->currentTerm; + pReply->success = true; + + if (hasAppendEntries) { + pReply->matchIndex = pMsg->prevLogIndex + 1; + } else { + pReply->matchIndex = pMsg->prevLogIndex; + } + + SRpcMsg rpcMsg; + syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); + syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); + syncAppendEntriesReplyDestroy(pReply); + + // maybe update commit index from leader + if (pMsg->commitIndex > ths->commitIndex) { + // has commit entry in local + if (pMsg->commitIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) { + SyncIndex beginIndex = ths->commitIndex + 1; + SyncIndex endIndex = pMsg->commitIndex; + + // update commit index + ths->commitIndex = pMsg->commitIndex; + + // call back Wal + ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex); + + int32_t code = syncNodeCommit(ths, beginIndex, endIndex, ths->state); + ASSERT(code == 0); + } + } + } + + return ret; +} + + +#if 0 int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { int32_t ret = 0; @@ -375,7 +615,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { // I am in newConfig if (hit) { - syncNodeUpdateConfig(ths, &newSyncCfg, &isDrop); + syncNodeUpdateConfig(ths, &newSyncCfg, pEntry->index, &isDrop); // change isStandBy to normal if (!isDrop) { @@ -437,6 +677,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { return ret; } +#endif static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) { int32_t code; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 26dbf6c47a..ea6673e220 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -192,6 +192,40 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { return ret; } +int32_t syncLeaderTransfer(int64_t rid) { + int32_t ret = 0; + + return ret; +} + +int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) { + SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + if (pSyncNode == NULL) { + return false; + } + assert(rid == pSyncNode->rid); + int32_t ret = 0; + + if (pSyncNode->replicaNum == 1) { + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + sError("only one replica, cannot drop leader"); + return TAOS_SYNC_ONLY_ONE_REPLICA; + } + + SyncLeaderTransfer* pMsg = syncLeaderTransferBuild(pSyncNode->vgId); + pMsg->newLeaderId.addr = syncUtilAddr2U64(newLeader.nodeFqdn, newLeader.nodePort); + pMsg->newLeaderId.vgId = pSyncNode->vgId; + ASSERT(pMsg != NULL); + SRpcMsg rpcMsg = {0}; + syncLeaderTransfer2RpcMsg(pMsg, &rpcMsg); + syncLeaderTransferDestroy(pMsg); + + ret = syncPropose(rid, &rpcMsg, false); + + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return ret; +} + int32_t syncReconfigRaw(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg) { int32_t ret = 0; char* newconfig = syncCfg2Str((SSyncCfg*)pNewCfg); @@ -206,6 +240,40 @@ int32_t syncReconfigRaw(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg) return ret; } +bool syncCanLeaderTransfer(int64_t rid) { + SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + if (pSyncNode == NULL) { + return false; + } + assert(rid == pSyncNode->rid); + + if (pSyncNode->replicaNum == 1) { + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return false; + } + + if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) { + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return true; + } + + bool matchOK = true; + if (pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE || pSyncNode->state == TAOS_SYNC_STATE_LEADER) { + SyncIndex myCommitIndex = pSyncNode->commitIndex; + for (int i = 0; i < pSyncNode->peersNum; ++i) { + SyncIndex peerMatchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId)[i]); + if (peerMatchIndex < myCommitIndex) { + matchOK = false; + } + } + } + + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return matchOK; +} + +int32_t syncGiveUpLeader(int64_t rid) { return 0; } + int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { int32_t ret = syncPropose(rid, pMsg, isWeak); return ret; @@ -241,7 +309,7 @@ int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) { return -1; } assert(rid == pSyncNode->rid); - *sMeta = pSyncNode->sMeta; + sMeta->lastConfigIndex = pSyncNode->pRaftCfg->lastConfigIndex; taosReleaseRef(tsNodeRefId, pSyncNode->rid); return 0; @@ -643,7 +711,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { // syncNodeBecomeFollower(pSyncNode); // snapshot meta - pSyncNode->sMeta.lastConfigIndex = -1; + // pSyncNode->sMeta.lastConfigIndex = -1; return pSyncNode; } @@ -1076,9 +1144,11 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { return s; } -void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, bool* isDrop) { +void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex, bool* isDrop) { SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg; pSyncNode->pRaftCfg->cfg = *newConfig; + pSyncNode->pRaftCfg->lastConfigIndex = lastConfigChangeIndex; + int32_t ret = 0; // init internal @@ -1735,23 +1805,79 @@ const char* syncStr(ESyncState state) { } } +static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) { + SyncLeaderTransfer* pSyncLeaderTransfer; + if (syncUtilSameId(&(pSyncLeaderTransfer->newLeaderId), &(ths->myRaftId))) { + } + + return 0; +} + +static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) { + SSyncCfg oldSyncCfg = ths->pRaftCfg->cfg; + + SSyncCfg newSyncCfg; + int32_t ret = syncCfgFromStr(pRpcMsg->pCont, &newSyncCfg); + ASSERT(ret == 0); + + // update new config myIndex + bool hit = false; + for (int i = 0; i < newSyncCfg.replicaNum; ++i) { + if (strcmp(ths->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 && + ths->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) { + newSyncCfg.myIndex = i; + hit = true; + break; + } + } + + bool isDrop; + + if (hit) { // I am in newConfig + syncNodeUpdateConfig(ths, &newSyncCfg, pEntry->index, &isDrop); + + // change isStandBy to normal + if (!isDrop) { + if (ths->state == TAOS_SYNC_STATE_LEADER) { + syncNodeBecomeLeader(ths, "config change"); + } else { + syncNodeBecomeFollower(ths, "config change"); + } + } + + if (gRaftDetailLog) { + char* sOld = syncCfg2Str(&oldSyncCfg); + char* sNew = syncCfg2Str(&newSyncCfg); + sInfo("==config change== 0x11 old:%s new:%s isDrop:%d \n", sOld, sNew, isDrop); + taosMemoryFree(sOld); + taosMemoryFree(sNew); + } + } + + // always call FpReConfigCb + if (ths->pFsm->FpReConfigCb != NULL) { + SReConfigCbMeta cbMeta = {0}; + cbMeta.code = 0; + cbMeta.currentTerm = ths->pRaftStore->currentTerm; + cbMeta.index = pEntry->index; + cbMeta.term = pEntry->term; + cbMeta.newCfg = newSyncCfg; + cbMeta.oldCfg = oldSyncCfg; + cbMeta.seqNum = pEntry->seqNum; + cbMeta.flag = 0x11; + cbMeta.isDrop = isDrop; + ths->pFsm->FpReConfigCb(ths->pFsm, pRpcMsg, cbMeta); + } + + return 0; +} + int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) { int32_t code = 0; ESyncState state = flag; sInfo("sync event vgId:%d commit by wal from index:%" PRId64 " to index:%" PRId64 ", %s", ths->vgId, beginIndex, endIndex, syncUtilState2String(state)); - /* - // maybe execute by leader, skip snapshot - SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; - if (ths->pFsm->FpGetSnapshot != NULL) { - ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); - } - if (beginIndex <= snapshot.lastApplyIndex) { - beginIndex = snapshot.lastApplyIndex + 1; - } - */ - // execute fsm if (ths->pFsm != NULL) { for (SyncIndex i = beginIndex; i <= endIndex; ++i) { @@ -1764,6 +1890,7 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, SRpcMsg rpcMsg; syncEntry2OriginalRpc(pEntry, &rpcMsg); + // user commit if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pEntry->index; @@ -1780,61 +1907,14 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, // config change if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) { - SSyncCfg oldSyncCfg = ths->pRaftCfg->cfg; + code = syncNodeConfigChange(ths, &rpcMsg, pEntry); + ASSERT(code == 0); + } - SSyncCfg newSyncCfg; - int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg); - ASSERT(ret == 0); - - // update new config myIndex - bool hit = false; - for (int i = 0; i < newSyncCfg.replicaNum; ++i) { - if (strcmp(ths->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 && - ths->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) { - newSyncCfg.myIndex = i; - hit = true; - break; - } - } - - SReConfigCbMeta cbMeta = {0}; - bool isDrop; - - // I am in newConfig - if (hit) { - syncNodeUpdateConfig(ths, &newSyncCfg, &isDrop); - - // change isStandBy to normal - if (!isDrop) { - if (ths->state == TAOS_SYNC_STATE_LEADER) { - syncNodeBecomeLeader(ths, "config change"); - } else { - syncNodeBecomeFollower(ths, "config change"); - } - } - - if (gRaftDetailLog) { - char* sOld = syncCfg2Str(&oldSyncCfg); - char* sNew = syncCfg2Str(&newSyncCfg); - sInfo("==config change== 0x11 old:%s new:%s isDrop:%d \n", sOld, sNew, isDrop); - taosMemoryFree(sOld); - taosMemoryFree(sNew); - } - } - - // always call FpReConfigCb - if (ths->pFsm->FpReConfigCb != NULL) { - cbMeta.code = 0; - cbMeta.currentTerm = ths->pRaftStore->currentTerm; - cbMeta.index = pEntry->index; - cbMeta.term = pEntry->term; - cbMeta.newCfg = newSyncCfg; - cbMeta.oldCfg = oldSyncCfg; - cbMeta.seqNum = pEntry->seqNum; - cbMeta.flag = 0x11; - cbMeta.isDrop = isDrop; - ths->pFsm->FpReConfigCb(ths->pFsm, &rpcMsg, cbMeta); - } + // config change + if (pEntry->originalRpcType == TDMT_SYNC_LEADER_TRANSFER) { + code = syncDoLeaderTransfer(ths, &rpcMsg, pEntry); + ASSERT(code == 0); } // restore finish diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index af04a0f649..2f99a4c744 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -75,6 +75,11 @@ cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) { pRoot = syncSnapshotRsp2Json(pSyncMsg); syncSnapshotRspDestroy(pSyncMsg); + } else if (pRpcMsg->msgType == TDMT_SYNC_LEADER_TRANSFER) { + SyncLeaderTransfer* pSyncMsg = syncLeaderTransferDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); + pRoot = syncLeaderTransfer2Json(pSyncMsg); + syncLeaderTransferDestroy(pSyncMsg); + } else if (pRpcMsg->msgType == TDMT_SYNC_COMMON_RESPONSE) { pRoot = cJSON_CreateObject(); char* s; @@ -2055,4 +2060,166 @@ void syncSnapshotRspLog2(char* s, const SyncSnapshotRsp* pMsg) { sTrace("syncSnapshotRspLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); taosMemoryFree(serialized); } +} + +// --------------------------------------------- +SyncLeaderTransfer* syncLeaderTransferBuild(int32_t vgId) { + uint32_t bytes = sizeof(SyncLeaderTransfer); + SyncLeaderTransfer* pMsg = taosMemoryMalloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->vgId = vgId; + pMsg->msgType = TDMT_SYNC_LEADER_TRANSFER; + return pMsg; +} + +void syncLeaderTransferDestroy(SyncLeaderTransfer* pMsg) { + if (pMsg != NULL) { + taosMemoryFree(pMsg); + } +} + +void syncLeaderTransferSerialize(const SyncLeaderTransfer* pMsg, char* buf, uint32_t bufLen) { + assert(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncLeaderTransferDeserialize(const char* buf, uint32_t len, SyncLeaderTransfer* pMsg) { + memcpy(pMsg, buf, len); + assert(len == pMsg->bytes); +} + +char* syncLeaderTransferSerialize2(const SyncLeaderTransfer* pMsg, uint32_t* len) { + char* buf = taosMemoryMalloc(pMsg->bytes); + assert(buf != NULL); + syncLeaderTransferSerialize(pMsg, buf, pMsg->bytes); + if (len != NULL) { + *len = pMsg->bytes; + } + return buf; +} + +SyncLeaderTransfer* syncLeaderTransferDeserialize2(const char* buf, uint32_t len) { + uint32_t bytes = *((uint32_t*)buf); + SyncLeaderTransfer* pMsg = taosMemoryMalloc(bytes); + assert(pMsg != NULL); + syncLeaderTransferDeserialize(buf, len, pMsg); + assert(len == pMsg->bytes); + return pMsg; +} + +void syncLeaderTransfer2RpcMsg(const SyncLeaderTransfer* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncLeaderTransferSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncLeaderTransferFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLeaderTransfer* pMsg) { + syncLeaderTransferDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +SyncLeaderTransfer* syncLeaderTransferFromRpcMsg2(const SRpcMsg* pRpcMsg) { + SyncLeaderTransfer* pMsg = syncLeaderTransferDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); + assert(pMsg != NULL); + return pMsg; +} + +cJSON* syncLeaderTransfer2Json(const SyncLeaderTransfer* pMsg) { + char u64buf[128]; + cJSON* pRoot = cJSON_CreateObject(); + + if (pMsg != NULL) { + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + + /* + cJSON* pSrcId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr); + cJSON_AddStringToObject(pSrcId, "addr", u64buf); + { + uint64_t u64 = pMsg->srcId.addr; + cJSON* pTmp = pSrcId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); + cJSON_AddItemToObject(pRoot, "srcId", pSrcId); + + cJSON* pDestId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->destId.addr); + cJSON_AddStringToObject(pDestId, "addr", u64buf); + { + uint64_t u64 = pMsg->destId.addr; + cJSON* pTmp = pDestId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); + cJSON_AddItemToObject(pRoot, "destId", pDestId); + */ + + cJSON* pNewerId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->newLeaderId.addr); + cJSON_AddStringToObject(pNewerId, "addr", u64buf); + { + uint64_t u64 = pMsg->newLeaderId.addr; + cJSON* pTmp = pNewerId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pNewerId, "vgId", pMsg->newLeaderId.vgId); + cJSON_AddItemToObject(pRoot, "newLeaderId", pNewerId); + } + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncLeaderTransfer", pRoot); + return pJson; +} + +char* syncLeaderTransfer2Str(const SyncLeaderTransfer* pMsg) { + cJSON* pJson = syncLeaderTransfer2Json(pMsg); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +// for debug ---------------------- +void syncLeaderTransferPrint(const SyncLeaderTransfer* pMsg) { + char* serialized = syncLeaderTransfer2Str(pMsg); + printf("syncLeaderTransferPrint | len:%lu | %s \n", strlen(serialized), serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncLeaderTransferPrint2(char* s, const SyncLeaderTransfer* pMsg) { + char* serialized = syncLeaderTransfer2Str(pMsg); + printf("syncLeaderTransferPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncLeaderTransferLog(const SyncLeaderTransfer* pMsg) { + char* serialized = syncLeaderTransfer2Str(pMsg); + sTrace("syncLeaderTransferLog | len:%lu | %s", strlen(serialized), serialized); + taosMemoryFree(serialized); +} + +void syncLeaderTransferLog2(char* s, const SyncLeaderTransfer* pMsg) { + if (gRaftDetailLog) { + char* serialized = syncLeaderTransfer2Str(pMsg); + sTrace("syncLeaderTransferLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); + taosMemoryFree(serialized); + } } \ No newline at end of file diff --git a/source/libs/sync/src/syncRaftCfg.c b/source/libs/sync/src/syncRaftCfg.c index 95eec5d98f..45e00aca2c 100644 --- a/source/libs/sync/src/syncRaftCfg.c +++ b/source/libs/sync/src/syncRaftCfg.c @@ -150,6 +150,10 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) { cJSON_AddNumberToObject(pRoot, "isStandBy", pRaftCfg->isStandBy); cJSON_AddNumberToObject(pRoot, "snapshotEnable", pRaftCfg->snapshotEnable); + char buf64[128]; + snprintf(buf64, sizeof(buf64), "%ld", pRaftCfg->lastConfigIndex); + cJSON_AddStringToObject(pRoot, "lastConfigIndex", buf64); + cJSON *pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "RaftCfg", pRoot); return pJson; @@ -172,6 +176,7 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) { raftCfg.cfg = *pCfg; raftCfg.isStandBy = meta.isStandBy; raftCfg.snapshotEnable = meta.snapshotEnable; + raftCfg.lastConfigIndex = meta.lastConfigIndex; char *s = raftCfg2Str(&raftCfg); char buf[CONFIG_FILE_LEN] = {0}; @@ -199,6 +204,9 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) { cJSON *pJsonSnapshotEnable = cJSON_GetObjectItem(pJson, "snapshotEnable"); pRaftCfg->snapshotEnable = cJSON_GetNumberValue(pJsonSnapshotEnable); + cJSON *pJsonLastConfigIndex = cJSON_GetObjectItem(pJson, "lastConfigIndex"); + pRaftCfg->lastConfigIndex = atoll(cJSON_GetStringValue(pJsonLastConfigIndex)); + cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg"); int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg)); ASSERT(code == 0); diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index c53e5916ae..92699ab24d 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -553,15 +553,19 @@ void logStorePrint2(char* s, SSyncLogStore* pLogStore) { } void logStoreLog(SSyncLogStore* pLogStore) { - char* serialized = logStore2Str(pLogStore); - sTraceLong("logStoreLog | len:%lu | %s", strlen(serialized), serialized); - taosMemoryFree(serialized); + if (gRaftDetailLog) { + char* serialized = logStore2Str(pLogStore); + sTraceLong("logStoreLog | len:%lu | %s", strlen(serialized), serialized); + taosMemoryFree(serialized); + } } void logStoreLog2(char* s, SSyncLogStore* pLogStore) { - char* serialized = logStore2Str(pLogStore); - sTraceLong("logStoreLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); - taosMemoryFree(serialized); + if (gRaftDetailLog) { + char* serialized = logStore2Str(pLogStore); + sTraceLong("logStoreLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); + taosMemoryFree(serialized); + } } // for debug ----------------- diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index a23fe2c38a..ade4ed5d22 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -352,7 +352,7 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { char *snapshotSender2Str(SSyncSnapshotSender *pSender) { cJSON *pJson = snapshotSender2Json(pSender); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } @@ -473,7 +473,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { cJSON_AddStringToObject(pFromId, "addr", u64buf); { uint64_t u64 = pReceiver->fromId.addr; - cJSON *pTmp = pFromId; + cJSON * pTmp = pFromId; char host[128] = {0}; uint16_t port; syncUtilU642Addr(u64, host, sizeof(host), &port); @@ -497,7 +497,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { cJSON *pJson = snapshotReceiver2Json(pReceiver); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index f6ff521e01..d12c5058cc 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -214,29 +214,31 @@ void syncUtilMsgNtoH(void* msg) { pHead->vgId = ntohl(pHead->vgId); } +#if 0 bool syncUtilIsData(tmsg_t msgType) { if (msgType == TDMT_SYNC_NOOP || msgType == TDMT_SYNC_CONFIG_CHANGE) { return false; } return true; } +#endif bool syncUtilUserPreCommit(tmsg_t msgType) { - if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE) { + if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE && msgType != TDMT_SYNC_LEADER_TRANSFER) { return true; } return false; } bool syncUtilUserCommit(tmsg_t msgType) { - if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE) { + if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE && msgType != TDMT_SYNC_LEADER_TRANSFER) { return true; } return false; } bool syncUtilUserRollback(tmsg_t msgType) { - if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE) { + if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE && msgType != TDMT_SYNC_LEADER_TRANSFER) { return true; } return false; diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index c68c6349fb..d39035ba53 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -47,6 +47,7 @@ add_executable(syncTestTool "") add_executable(syncRaftLogTest "") add_executable(syncRaftLogTest2 "") add_executable(syncRaftLogTest3 "") +add_executable(syncLeaderTransferTest "") target_sources(syncTest @@ -245,6 +246,10 @@ target_sources(syncRaftLogTest3 PRIVATE "syncRaftLogTest3.cpp" ) +target_sources(syncLeaderTransferTest + PRIVATE + "syncLeaderTransferTest.cpp" +) target_include_directories(syncTest @@ -492,6 +497,11 @@ target_include_directories(syncRaftLogTest3 "${TD_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncLeaderTransferTest + PUBLIC + "${TD_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_link_libraries(syncTest @@ -690,6 +700,10 @@ target_link_libraries(syncRaftLogTest3 sync gtest_main ) +target_link_libraries(syncLeaderTransferTest + sync + gtest_main +) enable_testing() diff --git a/source/libs/sync/test/syncLeaderTransferTest.cpp b/source/libs/sync/test/syncLeaderTransferTest.cpp new file mode 100644 index 0000000000..1c3891d492 --- /dev/null +++ b/source/libs/sync/test/syncLeaderTransferTest.cpp @@ -0,0 +1,101 @@ +#include +#include +#include "syncIO.h" +#include "syncInt.h" +#include "syncMessage.h" +#include "syncUtil.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +SyncLeaderTransfer *createMsg() { + SyncLeaderTransfer *pMsg = syncLeaderTransferBuild(1000); + /* + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); + pMsg->destId.vgId = 100; + */ + pMsg->newLeaderId.addr = syncUtilAddr2U64("127.0.0.1", 9999); + pMsg->newLeaderId.vgId = 100; + return pMsg; +} + +void test1() { + SyncLeaderTransfer *pMsg = createMsg(); + syncLeaderTransferLog2((char *)"test1:", pMsg); + syncLeaderTransferDestroy(pMsg); +} + +void test2() { + SyncLeaderTransfer *pMsg = createMsg(); + uint32_t len = pMsg->bytes; + char * serialized = (char *)taosMemoryMalloc(len); + syncLeaderTransferSerialize(pMsg, serialized, len); + SyncLeaderTransfer *pMsg2 = syncLeaderTransferBuild(1000); + syncLeaderTransferDeserialize(serialized, len, pMsg2); + syncLeaderTransferLog2((char *)"test2: syncLeaderTransferSerialize -> syncLeaderTransferDeserialize ", pMsg2); + + taosMemoryFree(serialized); + syncLeaderTransferDestroy(pMsg); + syncLeaderTransferDestroy(pMsg2); +} + +void test3() { + SyncLeaderTransfer *pMsg = createMsg(); + uint32_t len; + char * serialized = syncLeaderTransferSerialize2(pMsg, &len); + SyncLeaderTransfer *pMsg2 = syncLeaderTransferDeserialize2(serialized, len); + syncLeaderTransferLog2((char *)"test3: syncLeaderTransferSerialize2 -> syncLeaderTransferDeserialize2 ", pMsg2); + + taosMemoryFree(serialized); + syncLeaderTransferDestroy(pMsg); + syncLeaderTransferDestroy(pMsg2); +} + +void test4() { + SyncLeaderTransfer *pMsg = createMsg(); + SRpcMsg rpcMsg; + syncLeaderTransfer2RpcMsg(pMsg, &rpcMsg); + SyncLeaderTransfer *pMsg2 = (SyncLeaderTransfer *)taosMemoryMalloc(rpcMsg.contLen); + syncLeaderTransferFromRpcMsg(&rpcMsg, pMsg2); + syncLeaderTransferLog2((char *)"test4: syncLeaderTransfer2RpcMsg -> syncLeaderTransferFromRpcMsg ", pMsg2); + + rpcFreeCont(rpcMsg.pCont); + syncLeaderTransferDestroy(pMsg); + syncLeaderTransferDestroy(pMsg2); +} + +void test5() { + SyncLeaderTransfer *pMsg = createMsg(); + SRpcMsg rpcMsg; + syncLeaderTransfer2RpcMsg(pMsg, &rpcMsg); + SyncLeaderTransfer *pMsg2 = syncLeaderTransferFromRpcMsg2(&rpcMsg); + syncLeaderTransferLog2((char *)"test5: syncLeaderTransfer2RpcMsg -> syncLeaderTransferFromRpcMsg2 ", pMsg2); + + rpcFreeCont(rpcMsg.pCont); + syncLeaderTransferDestroy(pMsg); + syncLeaderTransferDestroy(pMsg2); +} + +int main() { + gRaftDetailLog = true; + + tsAsyncLog = 0; + sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE; + logTest(); + + test1(); + test2(); + test3(); + test4(); + test5(); + + return 0; +} diff --git a/source/libs/sync/test/syncRaftCfgTest.cpp b/source/libs/sync/test/syncRaftCfgTest.cpp index 564cbdb69a..8c6a704e2d 100644 --- a/source/libs/sync/test/syncRaftCfgTest.cpp +++ b/source/libs/sync/test/syncRaftCfgTest.cpp @@ -74,6 +74,7 @@ void test3() { SRaftCfgMeta meta; meta.isStandBy = 7; meta.snapshotEnable = 9; + meta.lastConfigIndex = 789; raftCfgCreateFile(pCfg, meta, s); printf("%s create json file: %s \n", (char*)__FUNCTION__, s); } @@ -98,6 +99,7 @@ void test5() { pCfg->cfg.myIndex = taosGetTimestampSec(); pCfg->isStandBy += 2; pCfg->snapshotEnable += 3; + pCfg->lastConfigIndex += 1000; raftCfgPersist(pCfg); printf("%s update json file: %s myIndex->%d \n", (char*)__FUNCTION__, "./test3_raft_cfg.json", pCfg->cfg.myIndex); From 6adadc0b2606b5c560f64567833dbe8e528ec24e Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Sat, 11 Jun 2022 13:03:58 +0800 Subject: [PATCH 2/7] refactor(sync): add last config index --- source/libs/sync/src/syncMain.c | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index ea6673e220..0184203ee7 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1181,13 +1181,12 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum); - // isDrop - *isDrop = true; - bool IamInOld, IamInNew; + bool IamInOld = false; + bool IamInNew = false; for (int i = 0; i < oldConfig.replicaNum; ++i) { if (strcmp((oldConfig.nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 && (oldConfig.nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) { - *isDrop = false; + IamInOld = false; break; } } @@ -1195,16 +1194,21 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l for (int i = 0; i < newConfig->replicaNum; ++i) { if (strcmp((newConfig->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 && (newConfig->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) { - *isDrop = false; + IamInNew = false; break; } } - if (!(*isDrop)) { - // change isStandBy to normal - pSyncNode->pRaftCfg->isStandBy = 0; + *isDrop = true; + if (IamInOld && !IamInNew) { + *isDrop = true; + } else { + *isDrop = false; } + if (IamInNew) { + pSyncNode->pRaftCfg->isStandBy = 0; // change isStandBy to normal + } raftCfgPersist(pSyncNode->pRaftCfg); if (gRaftDetailLog) { @@ -1821,19 +1825,19 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE ASSERT(ret == 0); // update new config myIndex - bool hit = false; + bool IamInNew = false; for (int i = 0; i < newSyncCfg.replicaNum; ++i) { if (strcmp(ths->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 && ths->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) { newSyncCfg.myIndex = i; - hit = true; + IamInNew = true; break; } } bool isDrop; - if (hit) { // I am in newConfig + if (IamInNew || (!IamInNew && ths->state != TAOS_SYNC_STATE_LEADER)) { syncNodeUpdateConfig(ths, &newSyncCfg, pEntry->index, &isDrop); // change isStandBy to normal From fa54663871260650bab37a6d8338f82385d29136 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Sat, 11 Jun 2022 13:52:17 +0800 Subject: [PATCH 3/7] refactor(sync): add last config index --- include/libs/sync/syncTools.h | 2 + source/libs/sync/inc/syncSnapshot.h | 1 + source/libs/sync/src/syncAppendEntries.c | 1 - source/libs/sync/src/syncAppendEntriesReply.c | 8 +-- source/libs/sync/src/syncMain.c | 4 +- source/libs/sync/src/syncMessage.c | 5 ++ source/libs/sync/src/syncSnapshot.c | 53 +++++++++++++++---- .../libs/sync/test/syncSnapshotSendTest.cpp | 12 +++++ 8 files changed, 69 insertions(+), 17 deletions(-) diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index a6802fc915..68a33d48cb 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -398,6 +398,8 @@ typedef struct SyncSnapshotSend { SyncTerm term; SyncIndex lastIndex; // lastIndex of snapshot SyncTerm lastTerm; // lastTerm of snapshot + SyncIndex lastConfigIndex; + SSyncCfg lastConfig; SyncTerm privateTerm; int32_t seq; uint32_t dataLen; diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 9fbcdf138b..a6170a92e3 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -43,6 +43,7 @@ typedef struct SSyncSnapshotSender { void * pCurrentBlock; int32_t blockLen; SSnapshot snapshot; + SSyncCfg lastConfig; int64_t sendingMS; SSyncNode *pSyncNode; int32_t replicaIndex; diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 6b5a86ded9..b33f3481e7 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -326,7 +326,6 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { return ret; } - #if 0 int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { int32_t ret = 0; diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 7fc35afbb1..3d9565bdaf 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -190,15 +190,15 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries if (gRaftDetailLog) { char* s = snapshotSender2Str(pSender); sInfo( - "sync event vgId:%d snapshot send to %s:%d start sender first time, lastApplyIndex:%ld lastApplyTerm:%lu " + "sync event vgId:%d snapshot send to %s:%d start sender first time, lastApplyIndex:%ld lastApplyTerm:%lu lastConfigIndex:%ld" "sender:%s", - ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, s); + ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, s); taosMemoryFree(s); } else { sInfo( "sync event vgId:%d snapshot send to %s:%d start sender first time, lastApplyIndex:%ld " - "lastApplyTerm:%lu", - ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm); + "lastApplyTerm:%lu lastConfigIndex:%ld", + ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex); } } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 6722ed3703..ac96d933ed 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1201,13 +1201,13 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l *isDrop = true; if (IamInOld && !IamInNew) { - *isDrop = true; + *isDrop = true; } else { *isDrop = false; } if (IamInNew) { - pSyncNode->pRaftCfg->isStandBy = 0; // change isStandBy to normal + pSyncNode->pRaftCfg->isStandBy = 0; // change isStandBy to normal } raftCfgPersist(pSyncNode->pRaftCfg); diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 2f99a4c744..57d62d298e 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -16,6 +16,7 @@ #include "syncMessage.h" #include "syncUtil.h" #include "tcoding.h" +#include "syncRaftCfg.h" // --------------------------------------------- cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) { @@ -1846,6 +1847,10 @@ cJSON* syncSnapshotSend2Json(const SyncSnapshotSend* pMsg) { snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->lastIndex); cJSON_AddStringToObject(pRoot, "lastIndex", u64buf); + snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->lastConfigIndex); + cJSON_AddStringToObject(pRoot, "lastConfigIndex", u64buf); + cJSON_AddItemToObject(pRoot, "lastConfig", syncCfg2Json((SSyncCfg*)&(pMsg->lastConfig))); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->lastTerm); cJSON_AddStringToObject(pRoot, "lastTerm", u64buf); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index ade4ed5d22..39f2a83c7c 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -19,6 +19,7 @@ #include "syncRaftStore.h" #include "syncUtil.h" #include "wal.h" +#include "syncRaftCfg.h" static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId); @@ -83,6 +84,26 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) { // get current snapshot info pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot)); + if (pSender->snapshot.lastConfigIndex != SYNC_INDEX_INVALID) { + SSyncRaftEntry *pEntry = NULL; + int32_t code = pSender->pSyncNode->pLogStore->syncLogGetEntry(pSender->pSyncNode->pLogStore, pSender->snapshot.lastConfigIndex, &pEntry); + ASSERT(code == 0); + ASSERT(pEntry == NULL); + + SRpcMsg rpcMsg; + syncEntry2OriginalRpc(pEntry, &rpcMsg); + SSyncCfg lastConfig; + int32_t ret = syncCfgFromStr(rpcMsg.pCont, &lastConfig); + ASSERT(ret == 0); + pSender->lastConfig = lastConfig; + + rpcFreeCont(rpcMsg.pCont); + syncEntryDestory(pEntry); + + } else { + memset(&(pSender->lastConfig), 0, sizeof(SSyncCfg)); + } + pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS; pSender->term = pSender->pSyncNode->pRaftStore->currentTerm; @@ -97,6 +118,8 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) { pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm; pMsg->lastIndex = pSender->snapshot.lastApplyIndex; pMsg->lastTerm = pSender->snapshot.lastApplyTerm; + pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex; + pMsg->lastConfig = pSender->lastConfig; pMsg->seq = pSender->seq; // SYNC_SNAPSHOT_SEQ_BEGIN pMsg->privateTerm = pSender->privateTerm; @@ -112,15 +135,15 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sTrace( - "sync event vgId:%d snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send " + "sync event vgId:%d snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu lastConfigIndex:%ld send " "msg:%s", pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, - pSender->snapshot.lastApplyTerm, msgStr); + pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, msgStr); taosMemoryFree(msgStr); } else { - sTrace("sync event vgId:%d snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu", + sTrace("sync event vgId:%d snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu lastConfigIndex:%ld", pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, - pSender->snapshot.lastApplyTerm); + pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex); } syncSnapshotSendDestroy(pMsg); @@ -228,6 +251,8 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm; pMsg->lastIndex = pSender->snapshot.lastApplyIndex; pMsg->lastTerm = pSender->snapshot.lastApplyTerm; + pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex; + pMsg->lastConfig = pSender->lastConfig; pMsg->seq = pSender->seq; pMsg->privateTerm = pSender->privateTerm; memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen); @@ -245,20 +270,20 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sTrace( - "sync event vgId:%d snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send " + "sync event vgId:%d snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu lastConfigIndex:%ld send " "msg:%s", pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, - pSender->snapshot.lastApplyTerm, msgStr); + pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, msgStr); taosMemoryFree(msgStr); } else { - sTrace("sync event vgId:%d snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu", + sTrace("sync event vgId:%d snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu lastConfigIndex:%ld", pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, - pSender->snapshot.lastApplyTerm); + pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex); } } else { - sTrace("sync event vgId:%d snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu", + sTrace("sync event vgId:%d snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu lastConfigIndex:%ld", pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, - pSender->snapshot.lastApplyTerm); + pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex); } syncSnapshotSendDestroy(pMsg); @@ -274,6 +299,8 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm; pMsg->lastIndex = pSender->snapshot.lastApplyIndex; pMsg->lastTerm = pSender->snapshot.lastApplyTerm; + pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex; + pMsg->lastConfig = pSender->lastConfig; pMsg->seq = pSender->seq; memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen); @@ -540,6 +567,12 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true); pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, pMsg->lastIndex + 1); + // maybe update lastconfig + if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) { + bool isDrop; + syncNodeUpdateConfig(pSyncNode, &(pMsg->lastConfig), pMsg->lastConfigIndex, &isDrop); + } + SSnapshot snapshot; pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); diff --git a/source/libs/sync/test/syncSnapshotSendTest.cpp b/source/libs/sync/test/syncSnapshotSendTest.cpp index 01d3264693..d4ae4af654 100644 --- a/source/libs/sync/test/syncSnapshotSendTest.cpp +++ b/source/libs/sync/test/syncSnapshotSendTest.cpp @@ -24,6 +24,15 @@ SyncSnapshotSend *createMsg() { pMsg->privateTerm = 99; pMsg->lastIndex = 22; pMsg->lastTerm = 33; + + pMsg->lastConfigIndex = 99; + pMsg->lastConfig.replicaNum = 3; + pMsg->lastConfig.myIndex = 1; + for (int i = 0; i < pMsg->lastConfig.replicaNum; ++i) { + ((pMsg->lastConfig.nodeInfo)[i]).nodePort = i * 100; + snprintf(((pMsg->lastConfig.nodeInfo)[i]).nodeFqdn, sizeof(((pMsg->lastConfig.nodeInfo)[i]).nodeFqdn), "100.200.300.%d", i); + } + pMsg->seq = 44; strcpy(pMsg->data, "hello world"); return pMsg; @@ -87,6 +96,9 @@ void test5() { } int main() { + + gRaftDetailLog = true; + tsAsyncLog = 0; sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE; logTest(); From dffbec29c73033cec313f4ee3b199c2125b965fc Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Sat, 11 Jun 2022 15:31:49 +0800 Subject: [PATCH 4/7] refactor(sync): add last config index z --- source/libs/sync/src/syncAppendEntriesReply.c | 9 ++- source/libs/sync/src/syncMain.c | 5 +- source/libs/sync/src/syncMessage.c | 2 +- source/libs/sync/src/syncSnapshot.c | 65 ++++++++++++++----- .../libs/sync/test/syncSnapshotSendTest.cpp | 4 +- 5 files changed, 62 insertions(+), 23 deletions(-) diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 3d9565bdaf..5caf814cc5 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -190,15 +190,18 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries if (gRaftDetailLog) { char* s = snapshotSender2Str(pSender); sInfo( - "sync event vgId:%d snapshot send to %s:%d start sender first time, lastApplyIndex:%ld lastApplyTerm:%lu lastConfigIndex:%ld" + "sync event vgId:%d snapshot send to %s:%d start sender first time, lastApplyIndex:%ld lastApplyTerm:%lu " + "lastConfigIndex:%ld" "sender:%s", - ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, s); + ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, + pSender->snapshot.lastConfigIndex, s); taosMemoryFree(s); } else { sInfo( "sync event vgId:%d snapshot send to %s:%d start sender first time, lastApplyIndex:%ld " "lastApplyTerm:%lu lastConfigIndex:%ld", - ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex); + ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, + pSender->snapshot.lastConfigIndex); } } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index ac96d933ed..e1fbc0bac1 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -35,7 +35,7 @@ #include "syncVoteMgr.h" #include "tref.h" -bool gRaftDetailLog = false; +bool gRaftDetailLog = true; static int32_t tsNodeRefId = -1; @@ -311,6 +311,8 @@ int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) { assert(rid == pSyncNode->rid); sMeta->lastConfigIndex = pSyncNode->pRaftCfg->lastConfigIndex; + sTrace("sync get snapshot meta: lastConfigIndex:%ld", pSyncNode->pRaftCfg->lastConfigIndex); + taosReleaseRef(tsNodeRefId, pSyncNode->rid); return 0; } @@ -520,6 +522,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { SRaftCfgMeta meta; meta.isStandBy = pSyncInfo->isStandBy; meta.snapshotEnable = pSyncInfo->snapshotEnable; + meta.lastConfigIndex = SYNC_INDEX_INVALID; ret = raftCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), meta, pSyncNode->configPath); assert(ret == 0); diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 57d62d298e..23165f6790 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -14,9 +14,9 @@ */ #include "syncMessage.h" +#include "syncRaftCfg.h" #include "syncUtil.h" #include "tcoding.h" -#include "syncRaftCfg.h" // --------------------------------------------- cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) { diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 39f2a83c7c..1eff9c98f4 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -15,11 +15,11 @@ #include "syncSnapshot.h" #include "syncIndexMgr.h" +#include "syncRaftCfg.h" #include "syncRaftLog.h" #include "syncRaftStore.h" #include "syncUtil.h" #include "wal.h" -#include "syncRaftCfg.h" static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId); @@ -85,10 +85,17 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) { // get current snapshot info pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot)); if (pSender->snapshot.lastConfigIndex != SYNC_INDEX_INVALID) { + /* SSyncRaftEntry *pEntry = NULL; - int32_t code = pSender->pSyncNode->pLogStore->syncLogGetEntry(pSender->pSyncNode->pLogStore, pSender->snapshot.lastConfigIndex, &pEntry); + int32_t code = pSender->pSyncNode->pLogStore->syncLogGetEntry(pSender->pSyncNode->pLogStore, + pSender->snapshot.lastConfigIndex, &pEntry); ASSERT(code == 0); - ASSERT(pEntry == NULL); + ASSERT(pEntry != NULL); + */ + + SSyncRaftEntry *pEntry = + pSender->pSyncNode->pLogStore->getEntry(pSender->pSyncNode->pLogStore, pSender->snapshot.lastConfigIndex); + ASSERT(pEntry != NULL); SRpcMsg rpcMsg; syncEntry2OriginalRpc(pEntry, &rpcMsg); @@ -103,7 +110,6 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) { } else { memset(&(pSender->lastConfig), 0, sizeof(SSyncCfg)); } - pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS; pSender->term = pSender->pSyncNode->pRaftStore->currentTerm; @@ -135,15 +141,18 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sTrace( - "sync event vgId:%d snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu lastConfigIndex:%ld send " + "sync event vgId:%d snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu " + "lastConfigIndex:%ld send " "msg:%s", pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, msgStr); taosMemoryFree(msgStr); } else { - sTrace("sync event vgId:%d snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu lastConfigIndex:%ld", - pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, - pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex); + sTrace( + "sync event vgId:%d snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu " + "lastConfigIndex:%ld", + pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, + pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex); } syncSnapshotSendDestroy(pMsg); @@ -270,20 +279,25 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sTrace( - "sync event vgId:%d snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu lastConfigIndex:%ld send " + "sync event vgId:%d snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu " + "lastConfigIndex:%ld send " "msg:%s", pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, msgStr); taosMemoryFree(msgStr); } else { - sTrace("sync event vgId:%d snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu lastConfigIndex:%ld", - pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, - pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex); + sTrace( + "sync event vgId:%d snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu " + "lastConfigIndex:%ld", + pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, + pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex); } } else { - sTrace("sync event vgId:%d snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu lastConfigIndex:%ld", - pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, - pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex); + sTrace( + "sync event vgId:%d snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu " + "lastConfigIndex:%ld", + pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, + pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex); } syncSnapshotSendDestroy(pMsg); @@ -569,8 +583,27 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { // maybe update lastconfig if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) { + // update new config myIndex + bool IamInNew = false; + SSyncCfg newSyncCfg = pMsg->lastConfig; + for (int i = 0; i < newSyncCfg.replicaNum; ++i) { + if (strcmp(pSyncNode->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 && + pSyncNode->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) { + newSyncCfg.myIndex = i; + IamInNew = true; + break; + } + } + bool isDrop; - syncNodeUpdateConfig(pSyncNode, &(pMsg->lastConfig), pMsg->lastConfigIndex, &isDrop); + if (IamInNew) { + sTrace("sync event update config by snapshot, lastIndex:%ld, lastTerm:%lu, lastConfigIndex:%ld ", + pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex); + syncNodeUpdateConfig(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex, &isDrop); + } else { + sTrace("sync event do not update config by snapshot, I am not in newCfg, lastIndex:%ld, lastTerm:%lu, lastConfigIndex:%ld ", + pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex); + } } SSnapshot snapshot; diff --git a/source/libs/sync/test/syncSnapshotSendTest.cpp b/source/libs/sync/test/syncSnapshotSendTest.cpp index d4ae4af654..ca7916359e 100644 --- a/source/libs/sync/test/syncSnapshotSendTest.cpp +++ b/source/libs/sync/test/syncSnapshotSendTest.cpp @@ -30,7 +30,8 @@ SyncSnapshotSend *createMsg() { pMsg->lastConfig.myIndex = 1; for (int i = 0; i < pMsg->lastConfig.replicaNum; ++i) { ((pMsg->lastConfig.nodeInfo)[i]).nodePort = i * 100; - snprintf(((pMsg->lastConfig.nodeInfo)[i]).nodeFqdn, sizeof(((pMsg->lastConfig.nodeInfo)[i]).nodeFqdn), "100.200.300.%d", i); + snprintf(((pMsg->lastConfig.nodeInfo)[i]).nodeFqdn, sizeof(((pMsg->lastConfig.nodeInfo)[i]).nodeFqdn), + "100.200.300.%d", i); } pMsg->seq = 44; @@ -96,7 +97,6 @@ void test5() { } int main() { - gRaftDetailLog = true; tsAsyncLog = 0; From 104a1bb59bb2e149f3684cfbafe633b17b206cc3 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Sat, 11 Jun 2022 16:11:18 +0800 Subject: [PATCH 5/7] fix(sync): snapshot overwrite config change --- source/libs/sync/src/syncMain.c | 8 ++++---- source/libs/sync/src/syncSnapshot.c | 19 +++++++++++++++---- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index e1fbc0bac1..9899f36607 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1189,7 +1189,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l for (int i = 0; i < oldConfig.replicaNum; ++i) { if (strcmp((oldConfig.nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 && (oldConfig.nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) { - IamInOld = false; + IamInOld = true; break; } } @@ -1197,7 +1197,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l for (int i = 0; i < newConfig->replicaNum; ++i) { if (strcmp((newConfig->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 && (newConfig->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) { - IamInNew = false; + IamInNew = true; break; } } @@ -1240,7 +1240,7 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { } void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { - sInfo("sync event vgId:%d become follower, %s", pSyncNode->vgId, debugStr); + sInfo("sync event vgId:%d become follower, isStandBy:%d, %s", pSyncNode->vgId, pSyncNode->pRaftCfg->isStandBy, debugStr); // maybe clear leader cache if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { @@ -1274,7 +1274,7 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { // /\ UNCHANGED <> // void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { - sInfo("sync event vgId:%d become leader, %s", pSyncNode->vgId, debugStr); + sInfo("sync event vgId:%d become leader, isStandBy:%d, %s", pSyncNode->vgId, pSyncNode->pRaftCfg->isStandBy, debugStr); // state change pSyncNode->state = TAOS_SYNC_STATE_LEADER; diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 1eff9c98f4..36598cc2bd 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -597,12 +597,23 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { bool isDrop; if (IamInNew) { - sTrace("sync event update config by snapshot, lastIndex:%ld, lastTerm:%lu, lastConfigIndex:%ld ", - pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex); + sTrace("sync event vgId:%d update config by snapshot, lastIndex:%ld, lastTerm:%lu, lastConfigIndex:%ld ", + pSyncNode->vgId, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex); syncNodeUpdateConfig(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex, &isDrop); } else { - sTrace("sync event do not update config by snapshot, I am not in newCfg, lastIndex:%ld, lastTerm:%lu, lastConfigIndex:%ld ", - pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex); + sTrace( + "sync event vgId:%d do not update config by snapshot, I am not in newCfg, lastIndex:%ld, lastTerm:%lu, " + "lastConfigIndex:%ld ", + pSyncNode->vgId, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex); + } + + // change isStandBy to normal + if (!isDrop) { + if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { + syncNodeBecomeLeader(pSyncNode, "config change"); + } else { + syncNodeBecomeFollower(pSyncNode, "config change"); + } } } From f92f1bbcc07538f59322639d686a84980f90929c Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Sat, 11 Jun 2022 16:20:15 +0800 Subject: [PATCH 6/7] fix(sync): snapshot overwrite config change --- source/libs/sync/src/syncMain.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 9899f36607..c480df0ec0 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -35,7 +35,7 @@ #include "syncVoteMgr.h" #include "tref.h" -bool gRaftDetailLog = true; +bool gRaftDetailLog = false; static int32_t tsNodeRefId = -1; From c8af8daa611b88962a99926e1d282ac4200ca4ad Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Sat, 11 Jun 2022 16:03:37 +0800 Subject: [PATCH 7/7] feat(stream): state\session max delay --- source/libs/executor/src/timewindowoperator.c | 56 +++--- tests/script/jenkins/basic.txt | 1 + .../tsim/stream/distributeInterval0.sim | 176 ++++++++++++++++++ 3 files changed, 207 insertions(+), 26 deletions(-) create mode 100644 tests/script/tsim/stream/distributeInterval0.sim diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index ab595a3e34..e70a4c413c 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2649,8 +2649,8 @@ typedef SResultWindowInfo* (*__get_win_info_)(void*); SResultWindowInfo* getSessionWinInfo(void* pData) { return (SResultWindowInfo*)pData; } SResultWindowInfo* getStateWinInfo(void* pData) { return &((SStateWindowInfo*)pData)->winInfo; } -int32_t closeSessionWindow(SArray* pWins, STimeWindowAggSupp* pTwSup, SArray* pClosed, int8_t calTrigger, - __get_win_info_ fn) { +int32_t closeSessionWindow(SArray* pWins, STimeWindowAggSupp* pTwSup, SArray* pClosed, + __get_win_info_ fn) { // Todo(liuyao) save window to tdb int32_t size = taosArrayGetSize(pWins); for (int32_t i = 0; i < size; i++) { @@ -2658,19 +2658,9 @@ int32_t closeSessionWindow(SArray* pWins, STimeWindowAggSupp* pTwSup, SArray* pC SResultWindowInfo* pSeWin = fn(pWin); if (pSeWin->win.ekey < pTwSup->maxTs - pTwSup->waterMark) { if (!pSeWin->isClosed) { - SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t)); - if (pos == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - pos->groupId = 0; - pos->pos = pSeWin->pos; - *(int64_t*)pos->key = pSeWin->win.ekey; - if (!taosArrayPush(pClosed, &pos)) { - taosMemoryFree(pos); - return TSDB_CODE_OUT_OF_MEMORY; - } pSeWin->isClosed = true; - if (calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { + if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { + int32_t code = saveResult(pSeWin->win.skey, pSeWin->pos.pageId, pSeWin->pos.offset, 0, pClosed); pSeWin->isOutput = true; } } @@ -2681,6 +2671,19 @@ int32_t closeSessionWindow(SArray* pWins, STimeWindowAggSupp* pTwSup, SArray* pC return TSDB_CODE_SUCCESS; } +int32_t getAllSessionWindow(SArray* pWins, SArray* pClosed, __get_win_info_ fn) { + int32_t size = taosArrayGetSize(pWins); + for (int32_t i = 0; i < size; i++) { + void* pWin = taosArrayGet(pWins, i); + SResultWindowInfo* pSeWin = fn(pWin); + if (!pSeWin->isClosed) { + int32_t code = saveResult(pSeWin->win.skey, pSeWin->pos.pageId, pSeWin->pos.offset, 0, pClosed); + pSeWin->isOutput = true; + } + } + return TSDB_CODE_SUCCESS; +} + static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -2703,6 +2706,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); SHashObj* pStUpdated = taosHashInit(64, hashFn, true, HASH_NO_LOCK); SOperatorInfo* downstream = pOperator->pDownstream[0]; + SArray* pUpdated = taosArrayInit(16, POINTER_BYTES); while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { @@ -2723,7 +2727,12 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { } taosArrayDestroy(pWins); continue; + } else if (pBlock->info.type == STREAM_GET_ALL && + pInfo->twAggSup.calTrigger == STREAM_TRIGGER_MAX_DELAY) { + getAllSessionWindow(pInfo->streamAggSup.pResultRows, pUpdated, getSessionWinInfo); + continue; } + if (isFinalSession(pInfo)) { int32_t childIndex = 0; // Todo(liuyao) get child id from SSDataBlock SOptrBasicInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); @@ -2735,15 +2744,10 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { // restore the value pOperator->status = OP_RES_TO_RETURN; - SArray* pClosed = taosArrayInit(16, POINTER_BYTES); - closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pClosed, pInfo->twAggSup.calTrigger, + closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, getSessionWinInfo); - SArray* pUpdated = taosArrayInit(16, POINTER_BYTES); copyUpdateResult(pStUpdated, pUpdated, pBInfo->pRes->info.groupId); taosHashCleanup(pStUpdated); - if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { - taosArrayAddAll(pUpdated, pClosed); - } finalizeUpdatedResult(pOperator->numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset); @@ -3067,6 +3071,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); SHashObj* pSeUpdated = taosHashInit(64, hashFn, true, HASH_NO_LOCK); SOperatorInfo* downstream = pOperator->pDownstream[0]; + SArray* pUpdated = taosArrayInit(16, POINTER_BYTES); while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { @@ -3078,6 +3083,10 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { doClearStateWindows(&pInfo->streamAggSup, pBlock, pInfo->primaryTsIndex, &pInfo->stateCol, pInfo->stateCol.slotId, pSeUpdated, pInfo->pSeDeleted); continue; + } else if (pBlock->info.type == STREAM_GET_ALL && + pInfo->twAggSup.calTrigger == STREAM_TRIGGER_MAX_DELAY) { + getAllSessionWindow(pInfo->streamAggSup.pResultRows, pUpdated, getStateWinInfo); + continue; } doStreamStateAggImpl(pOperator, pBlock, pSeUpdated, pInfo->pSeDeleted); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); @@ -3085,15 +3094,10 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { // restore the value pOperator->status = OP_RES_TO_RETURN; - SArray* pClosed = taosArrayInit(16, POINTER_BYTES); - closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pClosed, pInfo->twAggSup.calTrigger, + closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, getStateWinInfo); - SArray* pUpdated = taosArrayInit(16, POINTER_BYTES); copyUpdateResult(pSeUpdated, pUpdated, pBInfo->pRes->info.groupId); taosHashCleanup(pSeUpdated); - if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { - taosArrayAddAll(pUpdated, pClosed); - } finalizeUpdatedResult(pOperator->numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset); diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 5a8cf562a0..c179763f9a 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -71,6 +71,7 @@ ./test.sh -f tsim/stream/basic0.sim ./test.sh -f tsim/stream/basic1.sim ./test.sh -f tsim/stream/basic2.sim +# ./test.sh -f tsim/stream/distributeInterval0.sim # ./test.sh -f tsim/stream/session0.sim # ./test.sh -f tsim/stream/session1.sim # ./test.sh -f tsim/stream/state0.sim diff --git a/tests/script/tsim/stream/distributeInterval0.sim b/tests/script/tsim/stream/distributeInterval0.sim new file mode 100644 index 0000000000..f4f3e04f0a --- /dev/null +++ b/tests/script/tsim/stream/distributeInterval0.sim @@ -0,0 +1,176 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 + +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +sql create dnode $hostname2 port 7200 + +system sh/exec.sh -n dnode2 -s start + +sql create database test vgroups 4; +sql use test; +sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int); +sql create table ts1 using st tags(1,1,1); +sql create table ts2 using st tags(2,2,2); +sql create table ts3 using st tags(3,2,2); +sql create table ts4 using st tags(4,2,2); +sql create stream stream_t1 trigger at_once into streamtST1 as select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s); + +sleep 1000 + +sql insert into ts1 values(1648791213001,1,12,3,1.0); +sql insert into ts2 values(1648791213001,1,12,3,1.0); + +sql insert into ts3 values(1648791213001,1,12,3,1.0); +sql insert into ts4 values(1648791213001,1,12,3,1.0); + +sql insert into ts1 values(1648791213002,NULL,NULL,NULL,NULL); +sql insert into ts2 values(1648791213002,NULL,NULL,NULL,NULL); + +sql insert into ts3 values(1648791213002,NULL,NULL,NULL,NULL); +sql insert into ts4 values(1648791213002,NULL,NULL,NULL,NULL); + +sql insert into ts1 values(1648791223002,2,2,3,1.1); +sql insert into ts1 values(1648791233003,3,2,3,2.1); +sql insert into ts2 values(1648791243004,4,2,43,73.1); +sql insert into ts1 values(1648791213002,24,22,23,4.1); +sql insert into ts1 values(1648791243005,4,20,3,3.1); +sql insert into ts2 values(1648791243006,4,2,3,3.1) (1648791243007,4,2,3,3.1) ; +sql insert into ts1 values(1648791243008,4,2,30,3.1) (1648791243009,4,2,3,3.1) (1648791243010,4,2,3,3.1) ; +sql insert into ts2 values(1648791243011,4,2,3,3.1) (1648791243012,34,32,33,3.1) (1648791243013,4,2,3,3.1) (1648791243014,4,2,13,3.1); +sql insert into ts1 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ; +sql insert into ts2 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) (1648791233004,13,12,13,2.1) ; +sql insert into ts1 values(1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ; + +sql insert into ts3 values(1648791223002,2,2,3,1.1); +sql insert into ts4 values(1648791233003,3,2,3,2.1); +sql insert into ts3 values(1648791243004,4,2,43,73.1); +sql insert into ts4 values(1648791213002,24,22,23,4.1); +sql insert into ts3 values(1648791243005,4,20,3,3.1); +sql insert into ts4 values(1648791243006,4,2,3,3.1) (1648791243007,4,2,3,3.1) ; +sql insert into ts3 values(1648791243008,4,2,30,3.1) (1648791243009,4,2,3,3.1) (1648791243010,4,2,3,3.1) ; +sql insert into ts4 values(1648791243011,4,2,3,3.1) (1648791243012,34,32,33,3.1) (1648791243013,4,2,3,3.1) (1648791243014,4,2,13,3.1); +sql insert into ts3 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ; +sql insert into ts4 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) (1648791233004,13,12,13,2.1) ; +sql insert into ts3 values(1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ; + +$loop_count = 0 +loop1: +sql select * from streamtST1; + +sleep 300 +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 0 +if $data01 != 8 then + print =====data01=$data01 + goto loop1 +endi + +if $data02 != 4 then + print =====data02=$data02 + goto loop1 +endi + +if $data03 != 4 then + print ======$data03 + return -1 +endi + +if $data04 != 52 then + print ======$data04 + return -1 +endi + +if $data05 != 13 then + print ======$data05 + return -1 +endi + +# row 1 +if $data11 != 6 then + print =====data11=$data11 + goto loop1 +endi + +if $data12 != 6 then + print =====data12=$data12 + goto loop1 +endi + +if $data13 != 92 then + print ======$data13 + return -1 +endi + +if $data14 != 22 then + print ======$data14 + return -1 +endi + +if $data15 != 3 then + print ======$data15 + return -1 +endi + +# row 2 +if $data21 != 4 then + print =====data21=$data21 + goto loop1 +endi + +if $data22 != 4 then + print =====data22=$data22 + goto loop1 +endi + +if $data23 != 32 then + print ======$data23 + return -1 +endi + +if $data24 != 12 then + print ======$data24 + return -1 +endi + +if $data25 != 3 then + print ======$data25 + return -1 +endi + +# row 3 +if $data31 != 30 then + print =====data31=$data31 + goto loop1 +endi + +if $data32 != 30 then + print =====data32=$data32 + goto loop1 +endi + +if $data33 != 180 then + print ======$data33 + return -1 +endi + +if $data34 != 42 then + print ======$data34 + return -1 +endi + +if $data35 != 3 then + print ======$data35 + return -1 +endi + +sql select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5, avg(d) from st interval(10s); + +system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file