From 7930260feda84ff59acb6459b504ccba88e08c8a Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Sat, 11 Jun 2022 16:44:01 +0800 Subject: [PATCH 01/16] other: add debug logs when load block info --- source/dnode/vnode/src/tsdb/tsdbReadImpl.c | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/source/dnode/vnode/src/tsdb/tsdbReadImpl.c b/source/dnode/vnode/src/tsdb/tsdbReadImpl.c index 1c2514d46f..7e3dc5b98f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadImpl.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadImpl.c @@ -233,6 +233,17 @@ int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget) { tsdbError("vgId:%d, SBlockInfo part in file %s is corrupted since wrong checksum, offset:%u len :%u", TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), pBlkIdx->offset, pBlkIdx->len); return -1; + } else { + SBlockInfo *pBlkInfo = pReadh->pBlkInfo; + SBlock *pBlk = NULL; + int nBlks = (pBlkIdx->len - sizeof(SBlockInfo)) / sizeof(SBlock); + for (int n = 0; n < nBlks; ++n) { + pBlk = &pBlkInfo->blocks[n]; + if (pBlk->numOfSubBlocks == 1) { + tsdbDebug("prop:vgId:%d, file %s , offset:%u len :%u has %d rows", TSDB_READ_REPO_ID(pReadh), + TSDB_FILE_FULL_NAME(pHeadf), pBlkIdx->offset, pBlkIdx->len, pBlk->numOfRows); + } + } } // ASSERT(pBlkIdx->tid == pReadh->pBlkInfo->tid && pBlkIdx->uid == pReadh->pBlkInfo->uid); From 9b8a7d11df194c003006ea000ab5c4bac781cff7 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Sat, 11 Jun 2022 18:50:28 +0800 Subject: [PATCH 02/16] other: add logs when read blk info --- source/dnode/vnode/src/tsdb/tsdbReadImpl.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbReadImpl.c b/source/dnode/vnode/src/tsdb/tsdbReadImpl.c index 7e3dc5b98f..e4c5ef8fc3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadImpl.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadImpl.c @@ -239,7 +239,7 @@ int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget) { int nBlks = (pBlkIdx->len - sizeof(SBlockInfo)) / sizeof(SBlock); for (int n = 0; n < nBlks; ++n) { pBlk = &pBlkInfo->blocks[n]; - if (pBlk->numOfSubBlocks == 1) { + if (pBlk->numOfSubBlocks >= 1) { tsdbDebug("prop:vgId:%d, file %s , offset:%u len :%u has %d rows", TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), pBlkIdx->offset, pBlkIdx->len, pBlk->numOfRows); } From 58051169d2102ad1b9c43f62673d40e34d7aab63 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 16 Jun 2022 10:54:13 +0800 Subject: [PATCH 03/16] refactor(sync): add trace log --- source/libs/sync/src/syncAppendEntriesReply.c | 8 +-- source/libs/sync/src/syncSnapshot.c | 70 ++++++++++--------- 2 files changed, 40 insertions(+), 38 deletions(-) diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 0e116e13ee..fbe3319675 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -191,17 +191,17 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries char* s = snapshotSender2Str(pSender); sDebug( "vgId:%d sync event snapshot send to %s:%d start sender first time, lastApplyIndex:%ld lastApplyTerm:%lu " - "lastConfigIndex:%ld" + "lastConfigIndex:%ld privateTerm:%lu " "sender:%s", ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, - pSender->snapshot.lastConfigIndex, s); + pSender->snapshot.lastConfigIndex, pSender->privateTerm, s); taosMemoryFree(s); } else { sDebug( "vgId:%d sync event snapshot send to %s:%d start sender first time, lastApplyIndex:%ld " - "lastApplyTerm:%lu lastConfigIndex:%ld", + "lastApplyTerm:%lu lastConfigIndex:%ld privateTerm:%lu", ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, - pSender->snapshot.lastConfigIndex); + pSender->snapshot.lastConfigIndex, pSender->privateTerm); } } diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index afebfa798e..c92920a936 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -142,17 +142,17 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( "vgId:%d sync event snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu " - "lastConfigIndex:%ld send " + "lastConfigIndex:%ld privateTerm:%lu send " "msg:%s", pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, - pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, msgStr); + pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm, msgStr); taosMemoryFree(msgStr); } else { sDebug( "vgId:%d sync event snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu " - "lastConfigIndex:%ld", + "lastConfigIndex:%ld privateTerm:%lu", pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, - pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex); + pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm); } syncSnapshotSendDestroy(pMsg); @@ -280,24 +280,24 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( "vgId:%d sync event snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu " - "lastConfigIndex:%ld send " + "lastConfigIndex:%ld privateTerm:%lu send " "msg:%s", pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, - pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, msgStr); + pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm, msgStr); taosMemoryFree(msgStr); } else { sDebug( "vgId:%d sync event snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu " - "lastConfigIndex:%ld", + "lastConfigIndex:%ld privateTerm:%lu", pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, - pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex); + pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm); } } else { sDebug( "vgId:%d sync event snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu " - "lastConfigIndex:%ld", + "lastConfigIndex:%ld privateTerm:%lu", pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, - pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex); + pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm); } syncSnapshotSendDestroy(pMsg); @@ -328,12 +328,12 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); - sDebug("vgId:%d sync event snapshot send to %s:%d resend seq:%d ack:%d send msg:%s", pSender->pSyncNode->vgId, - host, port, pSender->seq, pSender->ack, msgStr); + sDebug("vgId:%d sync event snapshot send to %s:%d resend seq:%d ack:%d privateTerm:%lu send msg:%s", + pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->privateTerm, msgStr); taosMemoryFree(msgStr); } else { - sDebug("vgId:%d sync event snapshot send to %s:%d resend seq:%d ack:%d", pSender->pSyncNode->vgId, host, port, - pSender->seq, pSender->ack); + sDebug("vgId:%d sync event snapshot send to %s:%d resend seq:%d ack:%d privateTerm:%lu", pSender->pSyncNode->vgId, + host, port, pSender->seq, pSender->ack, pSender->privateTerm); } syncSnapshotSendDestroy(pMsg); @@ -567,15 +567,16 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( "vgId:%d sync event snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, " - "lastConfigIndex:%ld, recv msg:%s", + "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s", pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, - msgStr); + pReceiver->privateTerm, msgStr); taosMemoryFree(msgStr); } else { sDebug( "vgId:%d sync event snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, " - "lastConfigIndex:%ld", - pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex); + "lastConfigIndex:%ld privateTerm:%lu", + pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, + pReceiver->privateTerm); } } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) { @@ -638,17 +639,17 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { sDebug( "vgId:%d sync event snapshot recv from %s:%d finish, update log begin index:%ld, " "snapshot.lastApplyIndex:%ld, " - "snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, raft log:%s", + "snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu, raft log:%s", pSyncNode->vgId, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, - snapshot.lastConfigIndex, logSimpleStr); + snapshot.lastConfigIndex, pReceiver->privateTerm, logSimpleStr); taosMemoryFree(logSimpleStr); } else { sDebug( "vgId:%d sync event snapshot recv from %s:%d finish, update log begin index:%ld, " "snapshot.lastApplyIndex:%ld, " - "snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld", + "snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu", pSyncNode->vgId, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, - snapshot.lastConfigIndex); + snapshot.lastConfigIndex, pReceiver->privateTerm); } pReceiver->pWriter = NULL; @@ -660,16 +661,16 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( "vgId:%d sync event snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, " - "lastConfigIndex:%ld, recv msg:%s", + "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s", pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, - pMsg->lastConfigIndex, msgStr); + pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); taosMemoryFree(msgStr); } else { sDebug( "vgId:%d sync event snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, " - "lastConfigIndex:%ld", + "lastConfigIndex:%ld, privateTerm:%lu", pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, - pMsg->lastConfigIndex); + pMsg->lastConfigIndex, pReceiver->privateTerm); } } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) { @@ -685,17 +686,17 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( "vgId:%d sync event snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu, " - "lastConfigIndex:%ld, recv " + "lastConfigIndex:%ld, privateTerm:%lu, recv " "msg:%s", pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, - pMsg->lastConfigIndex, msgStr); + pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); taosMemoryFree(msgStr); } else { sDebug( "vgId:%d sync event snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu, " - "lastConfigIndex:%ld", + "lastConfigIndex:%ld, privateTerm:%lu", pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, - pMsg->lastConfigIndex); + pMsg->lastConfigIndex, pReceiver->privateTerm); } } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) { @@ -716,15 +717,16 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( "vgId:%d sync event snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu, " - "lastConfigIndex:%ld, recv msg:%s", + "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s", pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, - msgStr); + pReceiver->privateTerm, msgStr); taosMemoryFree(msgStr); } else { sDebug( "vgId:%d sync event snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu, " - "lastConfigIndex:%ld", - pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex); + "lastConfigIndex:%ld, privateTerm:%lu", + pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, + pReceiver->privateTerm); } } else { From 77b03ac821c8c03d8f583d2e52203dd7021e605a Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 16 Jun 2022 14:40:42 +0800 Subject: [PATCH 04/16] refactor(sync): add restore finish when become leader again --- include/libs/sync/sync.h | 2 +- source/dnode/mnode/impl/src/mndSync.c | 12 +-- source/libs/sync/inc/syncInt.h | 3 + source/libs/sync/src/syncMain.c | 129 +++++++++++++++++++------ source/libs/sync/src/syncRaftLog.c | 7 +- source/libs/sync/src/syncRespMgr.c | 15 +-- source/libs/sync/src/syncSnapshot.c | 7 ++ source/libs/sync/test/syncTestTool.cpp | 8 +- 8 files changed, 134 insertions(+), 49 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 2d21b3531a..e694203563 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -24,7 +24,7 @@ extern "C" { #include "tdef.h" #include "tmsgcb.h" -#define SYNC_INDEX_BEGIN 0 +#define SYNC_INDEX_BEGIN 0 #define SYNC_INDEX_INVALID -1 typedef uint64_t SyncNodeId; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 63708aef8a..3a243e36d3 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -221,17 +221,17 @@ void mndCleanupSync(SMnode *pMnode) { int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; - SRpcMsg rsp = {.code = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)}; - rsp.pCont = rpcMallocCont(rsp.contLen); - if (rsp.pCont == NULL) return -1; - memcpy(rsp.pCont, pRaw, rsp.contLen); + SRpcMsg req = {.msgType = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)}; + req.pCont = rpcMallocCont(req.contLen); + if (req.pCont == NULL) return -1; + memcpy(req.pCont, pRaw, req.contLen); pMgmt->errCode = 0; pMgmt->transId = transId; mTrace("trans:%d, will be proposed", pMgmt->transId); const bool isWeak = false; - int32_t code = syncPropose(pMgmt->sync, &rsp, isWeak); + int32_t code = syncPropose(pMgmt->sync, &req, isWeak); if (code == 0) { tsem_wait(&pMgmt->syncSem); } else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) { @@ -242,7 +242,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { terrno = TSDB_CODE_APP_ERROR; } - rpcFreeCont(rsp.pCont); + rpcFreeCont(req.pCont); if (code != 0) { mError("trans:%d, failed to propose, code:0x%x", pMgmt->transId, code); return code; diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 8a44dcd9bc..b00c7cbda1 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -195,6 +195,7 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S cJSON* syncNode2Json(const SSyncNode* pSyncNode); char* syncNode2Str(const SSyncNode* pSyncNode); char* syncNode2SimpleStr(const SSyncNode* pSyncNode); +bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config); void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex, bool* isDrop); SSyncNode* syncNodeAcquire(int64_t rid); @@ -230,6 +231,8 @@ int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncInd int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag); +int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg); + bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId); SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index a0d8487b63..5e3c1adf32 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -185,7 +185,9 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg ASSERT(rid == pSyncNode->rid); int32_t ret = 0; - bool IamInNew = false; + bool IamInNew = syncNodeInConfig(pSyncNode, pNewCfg); + +#if 0 for (int i = 0; i < pNewCfg->replicaNum; ++i) { if (strcmp((pNewCfg->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 && (pNewCfg->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) { @@ -201,6 +203,7 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg } */ } +#endif if (!IamInNew) { taosReleaseRef(tsNodeRefId, pSyncNode->rid); @@ -228,7 +231,9 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) { } ASSERT(rid == pSyncNode->rid); - bool IamInNew = false; + bool IamInNew = syncNodeInConfig(pSyncNode, pNewCfg); + +#if 0 for (int i = 0; i < pNewCfg->replicaNum; ++i) { if (strcmp((pNewCfg->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 && (pNewCfg->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) { @@ -247,6 +252,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) { } */ } +#endif if (!IamInNew) { sError("sync reconfig error, not in new config"); @@ -556,7 +562,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { return -1; } assert(rid == pSyncNode->rid); - sDebug("vgId:%d sync event propose msgType:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType)); + sDebug("vgId:%d sync event propose msgType:%s,%d", pSyncNode->vgId, TMSG_INFO(pMsg->msgType), pMsg->msgType); ret = syncNodePropose(pSyncNode, pMsg, isWeak); taosReleaseRef(tsNodeRefId, pSyncNode->rid); @@ -565,7 +571,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) { int32_t ret = 0; - sDebug("vgId:%d sync event propose msgType:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType)); + sDebug("vgId:%d sync event propose msgType:%s,%d", pSyncNode->vgId, TMSG_INFO(pMsg->msgType), pMsg->msgType); if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { SRespStub stub; @@ -1256,9 +1262,36 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { return s; } -void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex, bool* isDrop) { +bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) { + bool b1 = false; + bool b2 = false; + + for (int i = 0; i < config->replicaNum; ++i) { + if (strcmp((config->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 && + (config->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) { + b1 = true; + break; + } + } + + for (int i = 0; i < config->replicaNum; ++i) { + SRaftId raftId; + raftId.addr = syncUtilAddr2U64((config->nodeInfo)[i].nodeFqdn, (config->nodeInfo)[i].nodePort); + raftId.vgId = pSyncNode->vgId; + + if (syncUtilSameId(&raftId, &(pSyncNode->myRaftId))) { + b2 = true; + break; + } + } + + ASSERT(b1 == b2); + return b1; +} + +void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex, bool* isDrop) { SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg; - pSyncNode->pRaftCfg->cfg = *newConfig; + pSyncNode->pRaftCfg->cfg = *pNewConfig; pSyncNode->pRaftCfg->lastConfigIndex = lastConfigChangeIndex; int32_t ret = 0; @@ -1270,7 +1303,8 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA]; for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { oldSenders[i] = (pSyncNode->senders)[i]; - sDebug("vgId:%d sync event save senders %d, %p", pSyncNode->vgId, i, oldSenders[i]); + sDebug("vgId:%d sync event save senders %d, %p, privateTerm:%lu", pSyncNode->vgId, i, oldSenders[i], + oldSenders[i]->privateTerm); if (gRaftDetailLog) { ; } @@ -1322,8 +1356,8 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l char host[128]; uint16_t port; syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port); - sDebug("vgId:%d sync event reset sender for %lu, newIndex:%d, %s:%d, %p", pSyncNode->vgId, - (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j]); + sDebug("vgId:%d sync event reset sender for %lu, newIndex:%d, %s:%d, %p, privateTerm:%lu", pSyncNode->vgId, + (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j], oldSenders[j]->privateTerm); (pSyncNode->senders)[i] = oldSenders[j]; oldSenders[j] = NULL; reset = true; @@ -1341,7 +1375,8 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { if ((pSyncNode->senders)[i] == NULL) { (pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i); - sDebug("vgId:%d sync event create new sender %p replicaIndex:%d", pSyncNode->vgId, (pSyncNode->senders)[i], i); + sDebug("vgId:%d sync event create new sender %p replicaIndex:%d, privateTerm:%lu", pSyncNode->vgId, + (pSyncNode->senders)[i], i, (pSyncNode->senders)[i]->privateTerm); } } @@ -1354,8 +1389,10 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l } } - bool IamInOld = false; - bool IamInNew = false; + bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig); + bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig); + +#if 0 for (int i = 0; i < oldConfig.replicaNum; ++i) { if (strcmp((oldConfig.nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 && (oldConfig.nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) { @@ -1371,6 +1408,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l break; } } +#endif *isDrop = true; if (IamInOld && !IamInNew) { @@ -1379,6 +1417,10 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l *isDrop = false; } + // may be add me to a new raft group + if (IamInOld && IamInNew && oldConfig.replicaNum == 1) { + } + if (IamInNew) { pSyncNode->pRaftCfg->isStandBy = 0; // change isStandBy to normal } @@ -1404,14 +1446,17 @@ void syncNodeRelease(SSyncNode* pNode) { taosReleaseRef(tsNodeRefId, pNode->rid) void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { if (term > pSyncNode->pRaftStore->currentTerm) { raftStoreSetTerm(pSyncNode->pRaftStore, term); - syncNodeBecomeFollower(pSyncNode, "update term"); + char tmpBuf[64]; + snprintf(tmpBuf, sizeof(tmpBuf), "update term to %lu", term); + syncNodeBecomeFollower(pSyncNode, tmpBuf); raftStoreClearVote(pSyncNode->pRaftStore); } } void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { - sDebug("vgId:%d sync event become follower, isStandBy:%d, replicaNum:%d, %s", pSyncNode->vgId, - pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, debugStr); + sDebug("vgId:%d sync event become follower, isStandBy:%d, currentTerm:%lu, replicaNum:%d, restoreFinish:%d, %s", + pSyncNode->vgId, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftStore->currentTerm, pSyncNode->replicaNum, + pSyncNode->restoreFinish, debugStr); // maybe clear leader cache if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { @@ -1445,8 +1490,12 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { // /\ UNCHANGED <> // void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { - sDebug("vgId:%d sync event become leader, isStandBy:%d, replicaNum:%d %s", pSyncNode->vgId, - pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, debugStr); + // reset restoreFinish + pSyncNode->restoreFinish = false; + + sDebug("vgId:%d sync event become leader, isStandBy:%d, currentTerm:%lu, replicaNum:%d, restoreFinish:%d, %s", + pSyncNode->vgId, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftStore->currentTerm, pSyncNode->replicaNum, + pSyncNode->restoreFinish, debugStr); // state change pSyncNode->state = TAOS_SYNC_STATE_LEADER; @@ -2028,11 +2077,11 @@ static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE pSyncLeaderTransfer->newLeaderId.addr); */ - sDebug("vgId:%d sync event, begin leader transfer", ths->vgId); + sDebug("vgId:%d sync event begin leader transfer", ths->vgId); if (strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 && pSyncLeaderTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort) { - sDebug("vgId:%d sync event, maybe leader transfer to %s:%d %lu", ths->vgId, + sDebug("vgId:%d sync event maybe leader transfer to %s:%d %lu", ths->vgId, pSyncLeaderTransfer->newNodeInfo.nodeFqdn, pSyncLeaderTransfer->newNodeInfo.nodePort, pSyncLeaderTransfer->newLeaderId.addr); @@ -2067,6 +2116,21 @@ static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE return 0; } +int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) { + for (int i = 0; i < pNewCfg->replicaNum; ++i) { + SRaftId raftId; + raftId.addr = syncUtilAddr2U64((pNewCfg->nodeInfo)[i].nodeFqdn, (pNewCfg->nodeInfo)[i].nodePort); + raftId.vgId = ths->vgId; + + if (syncUtilSameId(&(ths->myRaftId), &raftId)) { + pNewCfg->myIndex = i; + return 0; + } + } + + return -1; +} + static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) { SSyncCfg oldSyncCfg = ths->pRaftCfg->cfg; @@ -2075,19 +2139,23 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE ASSERT(ret == 0); // update new config myIndex - bool IamInNew = false; - for (int i = 0; i < newSyncCfg.replicaNum; ++i) { - if (strcmp(ths->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 && - ths->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) { - newSyncCfg.myIndex = i; - IamInNew = true; - break; - } - } + syncNodeUpdateNewConfigIndex(ths, &newSyncCfg); + + bool IamInNew = syncNodeInConfig(ths, &newSyncCfg); + + /* + for (int i = 0; i < newSyncCfg.replicaNum; ++i) { + if (strcmp(ths->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 && + ths->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) { + newSyncCfg.myIndex = i; + IamInNew = true; + break; + } + } + */ bool isDrop; - // if (IamInNew || (!IamInNew && ths->state != TAOS_SYNC_STATE_LEADER)) { if (IamInNew) { syncNodeUpdateConfig(ths, &newSyncCfg, pEntry->index, &isDrop); @@ -2186,7 +2254,8 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, ths->pFsm->FpRestoreFinishCb(ths->pFsm); } ths->restoreFinish = true; - sDebug("vgId:%d sync event restore finish, index:%ld", ths->vgId, pEntry->index); + sDebug("vgId:%d sync event restore finish, %s, index:%ld", ths->vgId, syncUtilState2String(ths->state), + pEntry->index); } } diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 996cd12e4a..79ce84abe9 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -162,9 +162,10 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr walFsync(pWal, true); - sDebug("vgId:%d sync event write index:%ld, %s, isStandBy:%d, msgType:%s, originalRpcType:%s", pData->pSyncNode->vgId, - pEntry->index, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->pRaftCfg->isStandBy, - TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType)); + sDebug("vgId:%d sync event write index:%ld, %s, isStandBy:%d, msgType:%s,%d, originalRpcType:%s,%d", + pData->pSyncNode->vgId, pEntry->index, syncUtilState2String(pData->pSyncNode->state), + pData->pSyncNode->pRaftCfg->isStandBy, TMSG_INFO(pEntry->msgType), pEntry->msgType, + TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType); return code; } diff --git a/source/libs/sync/src/syncRespMgr.c b/source/libs/sync/src/syncRespMgr.c index 5087cacd02..adc67b5d8d 100644 --- a/source/libs/sync/src/syncRespMgr.c +++ b/source/libs/sync/src/syncRespMgr.c @@ -45,8 +45,9 @@ int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) { taosHashPut(pObj->pRespHash, &keyCode, sizeof(keyCode), pStub, sizeof(SRespStub)); SSyncNode *pSyncNode = pObj->data; - sDebug("vgId:%d sync event resp mgr add, type:%s seq:%lu handle:%p", pSyncNode->vgId, - TMSG_INFO(pStub->rpcMsg.msgType), keyCode, pStub->rpcMsg.info.handle); + sDebug("vgId:%d sync event resp mgr add, msgType:%s,%d seq:%lu handle:%p ahandle:%p", pSyncNode->vgId, + TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, keyCode, pStub->rpcMsg.info.handle, + pStub->rpcMsg.info.ahandle); taosThreadMutexUnlock(&(pObj->mutex)); return keyCode; @@ -69,8 +70,9 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) { memcpy(pStub, pTmp, sizeof(SRespStub)); SSyncNode *pSyncNode = pObj->data; - sDebug("vgId:%d sync event resp mgr get, type:%s seq:%lu handle:%p", pSyncNode->vgId, - TMSG_INFO(pStub->rpcMsg.msgType), index, pStub->rpcMsg.info.handle); + sDebug("vgId:%d sync event resp mgr get, msgType:%s,%d seq:%lu handle:%p ahandle:%p", pSyncNode->vgId, + TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, pStub->rpcMsg.info.handle, + pStub->rpcMsg.info.ahandle); taosThreadMutexUnlock(&(pObj->mutex)); return 1; // get one object @@ -87,8 +89,9 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStu memcpy(pStub, pTmp, sizeof(SRespStub)); SSyncNode *pSyncNode = pObj->data; - sDebug("vgId:%d sync event resp mgr get and del, type:%s seq:%lu handle:%p", pSyncNode->vgId, - TMSG_INFO(pStub->rpcMsg.msgType), index, pStub->rpcMsg.info.handle); + sDebug("vgId:%d sync event resp mgr get and del, msgType:%s,%d seq:%lu handle:%p ahandle:%p", pSyncNode->vgId, + TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, pStub->rpcMsg.info.handle, + pStub->rpcMsg.info.ahandle); taosHashRemove(pObj->pRespHash, &index, sizeof(index)); taosThreadMutexUnlock(&(pObj->mutex)); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index c92920a936..545ea953f8 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -591,6 +591,12 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) { int32_t oldReplicaNum = pSyncNode->replicaNum; + // update new config myIndex + SSyncCfg newSyncCfg = pMsg->lastConfig; + syncNodeUpdateNewConfigIndex(pSyncNode, &newSyncCfg); + bool IamInNew = syncNodeInConfig(pSyncNode, &newSyncCfg); + +#if 0 // update new config myIndex bool IamInNew = false; SSyncCfg newSyncCfg = pMsg->lastConfig; @@ -602,6 +608,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { break; } } +#endif bool isDrop; if (IamInNew) { diff --git a/source/libs/sync/test/syncTestTool.cpp b/source/libs/sync/test/syncTestTool.cpp index 21049454f4..1eac372bd4 100644 --- a/source/libs/sync/test/syncTestTool.cpp +++ b/source/libs/sync/test/syncTestTool.cpp @@ -297,7 +297,7 @@ void usage(char* exe) { SRpcMsg* createRpcMsg(int i, int count, int myIndex) { SRpcMsg* pMsg = (SRpcMsg*)taosMemoryMalloc(sizeof(SRpcMsg)); memset(pMsg, 0, sizeof(SRpcMsg)); - pMsg->msgType = 9999; + pMsg->msgType = TDMT_VND_SUBMIT; pMsg->contLen = 256; pMsg->pCont = rpcMallocCont(pMsg->contLen); snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-%ld", myIndex, i, count, taosGetTimestampMs()); @@ -384,8 +384,10 @@ int main(int argc, char** argv) { leaderTransferWait++; if (leaderTransferWait == 7) { - sTrace("begin leader transfer ..."); - int32_t ret = syncLeaderTransfer(rid); + if (leaderTransfer) { + sTrace("begin leader transfer ..."); + int32_t ret = syncLeaderTransfer(rid); + } } if (alreadySend < writeRecordNum) { From dbef3dfc55d19e1d3e182ef61f23629fe7261eff Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Thu, 16 Jun 2022 15:34:33 +0800 Subject: [PATCH 05/16] fix: not submit for rsma without rollup --- source/common/src/tdatablock.c | 42 ++++++++++++++++---------- source/dnode/vnode/src/inc/vnodeInt.h | 1 + source/dnode/vnode/src/sma/smaRollup.c | 14 +++++++-- 3 files changed, 38 insertions(+), 19 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 5a2aaed74e..3ec1b98b16 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1583,6 +1583,11 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks int32_t rowSize = pDataBlock->info.rowSize; int64_t groupId = pDataBlock->info.groupId; + if (colNum <= 1) { + // invalid if only with TS col + continue; + } + if (rb.nCols != colNum) { tdSRowSetTpInfo(&rb, colNum, pTSchema->flen); } @@ -1679,23 +1684,28 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks msgLen += pSubmitBlk->dataLen; } - (*pReq)->length = msgLen; + if (numOfBlks > 0) { + (*pReq)->length = msgLen; - (*pReq)->header.vgId = htonl(vgId); - (*pReq)->header.contLen = htonl(msgLen); - (*pReq)->length = (*pReq)->header.contLen; - (*pReq)->numOfBlocks = htonl(numOfBlks); - SSubmitBlk* blk = (SSubmitBlk*)((*pReq) + 1); - while (numOfBlks--) { - int32_t dataLen = blk->dataLen; - blk->uid = htobe64(blk->uid); - blk->suid = htobe64(blk->suid); - blk->padding = htonl(blk->padding); - blk->sversion = htonl(blk->sversion); - blk->dataLen = htonl(blk->dataLen); - blk->schemaLen = htonl(blk->schemaLen); - blk->numOfRows = htons(blk->numOfRows); - blk = (SSubmitBlk*)(blk->data + dataLen); + (*pReq)->header.vgId = htonl(vgId); + (*pReq)->header.contLen = htonl(msgLen); + (*pReq)->length = (*pReq)->header.contLen; + (*pReq)->numOfBlocks = htonl(numOfBlks); + SSubmitBlk* blk = (SSubmitBlk*)((*pReq) + 1); + while (numOfBlks--) { + int32_t dataLen = blk->dataLen; + blk->uid = htobe64(blk->uid); + blk->suid = htobe64(blk->suid); + blk->padding = htonl(blk->padding); + blk->sversion = htonl(blk->sversion); + blk->dataLen = htonl(blk->dataLen); + blk->schemaLen = htonl(blk->schemaLen); + blk->numOfRows = htons(blk->numOfRows); + blk = (SSubmitBlk*)(blk->data + dataLen); + } + } else { + // no valid rows + taosMemoryFreeClear(*pReq); } return TSDB_CODE_SUCCESS; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 2fd0815181..553c6a40ab 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -260,6 +260,7 @@ struct SSma { #define SMA_CFG(s) (&(s)->pVnode->config) #define SMA_TSDB_CFG(s) (&(s)->pVnode->config.tsdbCfg) +#define SMA_RETENTION(s) ((SRetention *)&(s)->pVnode->config.tsdbCfg.retentions) #define SMA_LOCKED(s) ((s)->locked) #define SMA_META(s) ((s)->pVnode->pMeta) #define SMA_VID(s) TD_VID((s)->pVnode) diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index b2dcce8f4c..0c372dfa70 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -400,22 +400,24 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3 } if (taosArrayGetSize(pResult) > 0) { -#if 1 +#if 0 char flag[10] = {0}; snprintf(flag, 10, "level %" PRIi8, level); blockDebugShowData(pResult, flag); #endif STsdb *sinkTsdb = (level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb1 : pSma->pRSmaTsdb2); SSubmitReq *pReq = NULL; - if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, SMA_VID(pSma), suid) != 0) { + if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, SMA_VID(pSma), suid) < 0) { taosArrayDestroy(pResult); return TSDB_CODE_FAILED; } - if (tdProcessSubmitReq(sinkTsdb, INT64_MAX, pReq) != 0) { + + if (pReq && tdProcessSubmitReq(sinkTsdb, INT64_MAX, pReq) < 0) { taosArrayDestroy(pResult); taosMemoryFreeClear(pReq); return TSDB_CODE_FAILED; } + taosMemoryFreeClear(pReq); } else { smaDebug("vgId:%d, no rsma % " PRIi8 " data generated since %s", SMA_VID(pSma), level, tstrerror(terrno)); @@ -469,6 +471,12 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) { return TSDB_CODE_SUCCESS; } + SRetention *pRetention = SMA_RETENTION(pSma); + if (!RETENTION_VALID(pRetention + 1)) { + // return directly if retention level 1 is invalid + return TSDB_CODE_SUCCESS; + } + if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { STbUidStore uidStore = {0}; tdFetchSubmitReqSuids(pMsg, &uidStore); From 9a9200d28b93ae286f19b530688c3d28dfce94e6 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 16 Jun 2022 15:38:40 +0800 Subject: [PATCH 06/16] refactor(sync): add restore finish when become leader again --- source/libs/sync/src/syncAppendEntries.c | 7 +- source/libs/sync/src/syncAppendEntriesReply.c | 13 +- source/libs/sync/src/syncCommit.c | 5 +- source/libs/sync/src/syncMain.c | 70 +++++----- source/libs/sync/src/syncRaftLog.c | 16 ++- source/libs/sync/src/syncRespMgr.c | 19 +-- source/libs/sync/src/syncSnapshot.c | 131 ++++++++++-------- 7 files changed, 147 insertions(+), 114 deletions(-) diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 08a4081ad3..89b4761212 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -713,7 +713,8 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) { // delete confict entries code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin); ASSERT(code == 0); - sDebug("vgId:%d sync event log truncate, from %ld to %ld", ths->vgId, delBegin, delEnd); + sDebug("vgId:%d sync event currentTerm:%lu log truncate, from %ld to %ld", ths->vgId, ths->pRaftStore->currentTerm, + delBegin, delEnd); logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore); return code; @@ -994,8 +995,8 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs SyncIndex commitEnd = snapshot.lastApplyIndex; ths->commitIndex = snapshot.lastApplyIndex; - sDebug("vgId:%d sync event commit by snapshot from index:%ld to index:%ld, %s", ths->vgId, commitBegin, - commitEnd, syncUtilState2String(ths->state)); + sDebug("vgId:%d sync event currentTerm:%lu commit by snapshot from index:%ld to index:%ld, %s", ths->vgId, + ths->pRaftStore->currentTerm, commitBegin, commitEnd, syncUtilState2String(ths->state)); } SyncIndex beginIndex = ths->commitIndex + 1; diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index fbe3319675..5290d7d28e 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -190,18 +190,19 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries if (gRaftDetailLog) { char* s = snapshotSender2Str(pSender); sDebug( - "vgId:%d sync event snapshot send to %s:%d start sender first time, lastApplyIndex:%ld lastApplyTerm:%lu " + "vgId:%d sync event currentTerm:%lu snapshot send to %s:%d start sender first time, lastApplyIndex:%ld " + "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu " "sender:%s", - ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, - pSender->snapshot.lastConfigIndex, pSender->privateTerm, s); + ths->vgId, ths->pRaftStore->currentTerm, host, port, pSender->snapshot.lastApplyIndex, + pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm, s); taosMemoryFree(s); } else { sDebug( - "vgId:%d sync event snapshot send to %s:%d start sender first time, lastApplyIndex:%ld " + "vgId:%d sync event currentTerm:%lu snapshot send to %s:%d start sender first time, lastApplyIndex:%ld " "lastApplyTerm:%lu lastConfigIndex:%ld privateTerm:%lu", - ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, - pSender->snapshot.lastConfigIndex, pSender->privateTerm); + ths->vgId, ths->pRaftStore->currentTerm, host, port, pSender->snapshot.lastApplyIndex, + pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm); } } diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 3424fac5e7..efdb5b24ce 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -56,8 +56,9 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { SyncIndex commitEnd = snapshot.lastApplyIndex; pSyncNode->commitIndex = snapshot.lastApplyIndex; - sDebug("vgId:%d sync event commit by snapshot from index:%ld to index:%ld, %s", pSyncNode->vgId, - pSyncNode->commitIndex, snapshot.lastApplyIndex, syncUtilState2String(pSyncNode->state)); + sDebug("vgId:%d sync event currentTerm:%lu commit by snapshot from index:%ld to index:%ld, %s", pSyncNode->vgId, + pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, snapshot.lastApplyIndex, + syncUtilState2String(pSyncNode->state)); } // update commit index diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 5e3c1adf32..05a41daa9e 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -562,7 +562,8 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { return -1; } assert(rid == pSyncNode->rid); - sDebug("vgId:%d sync event propose msgType:%s,%d", pSyncNode->vgId, TMSG_INFO(pMsg->msgType), pMsg->msgType); + sDebug("vgId:%d sync event currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId, + pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pMsg->msgType), pMsg->msgType); ret = syncNodePropose(pSyncNode, pMsg, isWeak); taosReleaseRef(tsNodeRefId, pSyncNode->rid); @@ -571,7 +572,8 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) { int32_t ret = 0; - sDebug("vgId:%d sync event propose msgType:%s,%d", pSyncNode->vgId, TMSG_INFO(pMsg->msgType), pMsg->msgType); + sDebug("vgId:%d sync event currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId, + pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pMsg->msgType), pMsg->msgType); if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { SRespStub stub; @@ -604,8 +606,6 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { SSyncInfo* pSyncInfo = (SSyncInfo*)pOldSyncInfo; - sDebug("vgId:%d sync event sync open", pSyncInfo->vgId); - SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode)); assert(pSyncNode != NULL); memset(pSyncNode, 0, sizeof(SSyncNode)); @@ -819,6 +819,8 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { // snapshot meta // pSyncNode->sMeta.lastConfigIndex = -1; + sDebug("vgId:%d sync event currentTerm:%lu sync open", pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm); + return pSyncNode; } @@ -864,7 +866,7 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) { } void syncNodeClose(SSyncNode* pSyncNode) { - sDebug("vgId:%d sync event sync close", pSyncNode->vgId); + sDebug("vgId:%d sync event currentTerm:%lu sync close", pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm); int32_t ret; assert(pSyncNode != NULL); @@ -1303,8 +1305,8 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA]; for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { oldSenders[i] = (pSyncNode->senders)[i]; - sDebug("vgId:%d sync event save senders %d, %p, privateTerm:%lu", pSyncNode->vgId, i, oldSenders[i], - oldSenders[i]->privateTerm); + sDebug("vgId:%d sync event currentTerm:%lu save senders %d, %p, privateTerm:%lu", pSyncNode->vgId, + pSyncNode->pRaftStore->currentTerm, i, oldSenders[i], oldSenders[i]->privateTerm); if (gRaftDetailLog) { ; } @@ -1356,8 +1358,9 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex char host[128]; uint16_t port; syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port); - sDebug("vgId:%d sync event reset sender for %lu, newIndex:%d, %s:%d, %p, privateTerm:%lu", pSyncNode->vgId, - (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j], oldSenders[j]->privateTerm); + sDebug("vgId:%d sync event currentTerm:%lu reset sender for %lu, newIndex:%d, %s:%d, %p, privateTerm:%lu", + pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, (pSyncNode->replicasId)[i].addr, i, host, port, + oldSenders[j], oldSenders[j]->privateTerm); (pSyncNode->senders)[i] = oldSenders[j]; oldSenders[j] = NULL; reset = true; @@ -1365,8 +1368,9 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex // reset replicaIndex int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex; (pSyncNode->senders)[i]->replicaIndex = i; - sDebug("vgId:%d sync event udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", pSyncNode->vgId, - oldreplicaIndex, i, host, port, (pSyncNode->senders)[i], reset); + sDebug("vgId:%d sync event currentTerm:%lu udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", + pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, oldreplicaIndex, i, host, port, + (pSyncNode->senders)[i], reset); } } } @@ -1375,8 +1379,9 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { if ((pSyncNode->senders)[i] == NULL) { (pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i); - sDebug("vgId:%d sync event create new sender %p replicaIndex:%d, privateTerm:%lu", pSyncNode->vgId, - (pSyncNode->senders)[i], i, (pSyncNode->senders)[i]->privateTerm); + sDebug("vgId:%d sync event currentTerm:%lu create new sender %p replicaIndex:%d, privateTerm:%lu", + pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, (pSyncNode->senders)[i], i, + (pSyncNode->senders)[i]->privateTerm); } } @@ -1384,7 +1389,8 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { if (oldSenders[i] != NULL) { snapshotSenderDestroy(oldSenders[i]); - sDebug("vgId:%d sync event delete old sender %p replicaIndex:%d", pSyncNode->vgId, oldSenders[i], i); + sDebug("vgId:%d sync event currentTerm:%lu delete old sender %p replicaIndex:%d", pSyncNode->vgId, + pSyncNode->pRaftStore->currentTerm, oldSenders[i], i); oldSenders[i] = NULL; } } @@ -1454,9 +1460,11 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { } void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { - sDebug("vgId:%d sync event become follower, isStandBy:%d, currentTerm:%lu, replicaNum:%d, restoreFinish:%d, %s", - pSyncNode->vgId, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftStore->currentTerm, pSyncNode->replicaNum, - pSyncNode->restoreFinish, debugStr); + sDebug( + "vgId:%d sync event currentTerm:%lu become follower, isStandBy:%d, replicaNum:%d, " + "restoreFinish:%d, %s", + pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, + pSyncNode->restoreFinish, debugStr); // maybe clear leader cache if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { @@ -1493,8 +1501,8 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { // reset restoreFinish pSyncNode->restoreFinish = false; - sDebug("vgId:%d sync event become leader, isStandBy:%d, currentTerm:%lu, replicaNum:%d, restoreFinish:%d, %s", - pSyncNode->vgId, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftStore->currentTerm, pSyncNode->replicaNum, + sDebug("vgId:%d sync event currentTerm:%lu become leader, isStandBy:%d, replicaNum:%d, restoreFinish:%d, %s", + pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->restoreFinish, debugStr); // state change @@ -2069,21 +2077,13 @@ const char* syncStr(ESyncState state) { static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) { SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg); - /* - char host[128]; - uint16_t port; - syncUtilU642Addr(pSyncLeaderTransfer->newLeaderId.addr, host, sizeof(host), &port); - sDebug("vgId:%d sync event, maybe leader transfer to %s:%d %lu", ths->vgId, host, port, - pSyncLeaderTransfer->newLeaderId.addr); - */ - - sDebug("vgId:%d sync event begin leader transfer", ths->vgId); + sDebug("vgId:%d sync event currentTerm:%lu begin leader transfer", ths->vgId, ths->pRaftStore->currentTerm); if (strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 && pSyncLeaderTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort) { - sDebug("vgId:%d sync event maybe leader transfer to %s:%d %lu", ths->vgId, - pSyncLeaderTransfer->newNodeInfo.nodeFqdn, pSyncLeaderTransfer->newNodeInfo.nodePort, - pSyncLeaderTransfer->newLeaderId.addr); + sDebug("vgId:%d sync event currentTerm:%lu maybe leader transfer to %s:%d %lu", ths->vgId, + ths->pRaftStore->currentTerm, pSyncLeaderTransfer->newNodeInfo.nodeFqdn, + pSyncLeaderTransfer->newNodeInfo.nodePort, pSyncLeaderTransfer->newLeaderId.addr); // reset elect timer now! int32_t electMS = 1; @@ -2205,8 +2205,8 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) { int32_t code = 0; ESyncState state = flag; - sDebug("vgId:%d sync event commit by wal from index:%" PRId64 " to index:%" PRId64 ", %s", ths->vgId, beginIndex, - endIndex, syncUtilState2String(state)); + sDebug("vgId:%d sync event currentTerm:%lu commit by wal from index:%" PRId64 " to index:%" PRId64 ", %s", ths->vgId, + ths->pRaftStore->currentTerm, beginIndex, endIndex, syncUtilState2String(state)); // execute fsm if (ths->pFsm != NULL) { @@ -2254,8 +2254,8 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, ths->pFsm->FpRestoreFinishCb(ths->pFsm); } ths->restoreFinish = true; - sDebug("vgId:%d sync event restore finish, %s, index:%ld", ths->vgId, syncUtilState2String(ths->state), - pEntry->index); + sDebug("vgId:%d sync event currentTerm:%lu restore finish, %s, index:%ld", ths->vgId, + ths->pRaftStore->currentTerm, syncUtilState2String(ths->state), pEntry->index); } } diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 79ce84abe9..31ac44fa81 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -15,6 +15,7 @@ #include "syncRaftLog.h" #include "syncRaftCfg.h" +#include "syncRaftStore.h" #include "wal.h" // refactor, log[0 .. n] ==> log[m .. n] @@ -162,10 +163,10 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr walFsync(pWal, true); - sDebug("vgId:%d sync event write index:%ld, %s, isStandBy:%d, msgType:%s,%d, originalRpcType:%s,%d", - pData->pSyncNode->vgId, pEntry->index, syncUtilState2String(pData->pSyncNode->state), - pData->pSyncNode->pRaftCfg->isStandBy, TMSG_INFO(pEntry->msgType), pEntry->msgType, - TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType); + sDebug("vgId:%d sync event currentTerm:%lu write index:%ld, %s, isStandBy:%d, msgType:%s,%d, originalRpcType:%s,%d", + pData->pSyncNode->vgId, pData->pSyncNode->pRaftStore->currentTerm, pEntry->index, + syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->pRaftCfg->isStandBy, + TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType); return code; } @@ -321,7 +322,12 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { walFsync(pWal, true); - sDebug("sync event old write wal: %ld", pEntry->index); + sDebug( + "vgId:%d sync event currentTerm:%lu old write index:%ld, %s, isStandBy:%d, msgType:%s,%d, originalRpcType:%s,%d", + pData->pSyncNode->vgId, pData->pSyncNode->pRaftStore->currentTerm, pEntry->index, + syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->pRaftCfg->isStandBy, TMSG_INFO(pEntry->msgType), + pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType); + return code; } diff --git a/source/libs/sync/src/syncRespMgr.c b/source/libs/sync/src/syncRespMgr.c index adc67b5d8d..5b127793d6 100644 --- a/source/libs/sync/src/syncRespMgr.c +++ b/source/libs/sync/src/syncRespMgr.c @@ -14,6 +14,7 @@ */ #include "syncRespMgr.h" +#include "syncRaftStore.h" SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) { SSyncRespMgr *pObj = (SSyncRespMgr *)taosMemoryMalloc(sizeof(SSyncRespMgr)); @@ -45,9 +46,9 @@ int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) { taosHashPut(pObj->pRespHash, &keyCode, sizeof(keyCode), pStub, sizeof(SRespStub)); SSyncNode *pSyncNode = pObj->data; - sDebug("vgId:%d sync event resp mgr add, msgType:%s,%d seq:%lu handle:%p ahandle:%p", pSyncNode->vgId, - TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, keyCode, pStub->rpcMsg.info.handle, - pStub->rpcMsg.info.ahandle); + sDebug("vgId:%d sync event currentTerm:%lu resp mgr add, msgType:%s,%d seq:%lu handle:%p ahandle:%p", pSyncNode->vgId, + pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, keyCode, + pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle); taosThreadMutexUnlock(&(pObj->mutex)); return keyCode; @@ -70,9 +71,9 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) { memcpy(pStub, pTmp, sizeof(SRespStub)); SSyncNode *pSyncNode = pObj->data; - sDebug("vgId:%d sync event resp mgr get, msgType:%s,%d seq:%lu handle:%p ahandle:%p", pSyncNode->vgId, - TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, pStub->rpcMsg.info.handle, - pStub->rpcMsg.info.ahandle); + sDebug("vgId:%d sync event currentTerm:%lu resp mgr get, msgType:%s,%d seq:%lu handle:%p ahandle:%p", + pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, + index, pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle); taosThreadMutexUnlock(&(pObj->mutex)); return 1; // get one object @@ -89,9 +90,9 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStu memcpy(pStub, pTmp, sizeof(SRespStub)); SSyncNode *pSyncNode = pObj->data; - sDebug("vgId:%d sync event resp mgr get and del, msgType:%s,%d seq:%lu handle:%p ahandle:%p", pSyncNode->vgId, - TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, pStub->rpcMsg.info.handle, - pStub->rpcMsg.info.ahandle); + sDebug("vgId:%d sync event currentTerm:%lu resp mgr get and del, msgType:%s,%d seq:%lu handle:%p ahandle:%p", + pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, + index, pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle); taosHashRemove(pObj->pRespHash, &index, sizeof(index)); taosThreadMutexUnlock(&(pObj->mutex)); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 545ea953f8..d3785060d4 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -141,18 +141,22 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu " + "vgId:%d sync event currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld " + "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu send " "msg:%s", - pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, - pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm, msgStr); + pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, + pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, + pSender->privateTerm, msgStr); taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu " + "vgId:%d sync event currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld " + "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu", - pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, - pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm); + pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, + pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, + pSender->privateTerm); } syncSnapshotSendDestroy(pMsg); @@ -279,25 +283,31 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu " + "vgId:%d sync event currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld " + "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu send " "msg:%s", - pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, - pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm, msgStr); + pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, + pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, + pSender->privateTerm, msgStr); taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu " + "vgId:%d sync event currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld " + "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu", - pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, - pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm); + pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, + pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, + pSender->privateTerm); } } else { sDebug( - "vgId:%d sync event snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu " + "vgId:%d sync event currentTerm:%lu snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld " + "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu", - pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, - pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm); + pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, + pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, + pSender->privateTerm); } syncSnapshotSendDestroy(pMsg); @@ -328,12 +338,15 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); - sDebug("vgId:%d sync event snapshot send to %s:%d resend seq:%d ack:%d privateTerm:%lu send msg:%s", - pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->privateTerm, msgStr); + sDebug( + "vgId:%d sync event currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d privateTerm:%lu send msg:%s", + pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, + pSender->privateTerm, msgStr); taosMemoryFree(msgStr); } else { - sDebug("vgId:%d sync event snapshot send to %s:%d resend seq:%d ack:%d privateTerm:%lu", pSender->pSyncNode->vgId, - host, port, pSender->seq, pSender->ack, pSender->privateTerm); + sDebug("vgId:%d sync event currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d privateTerm:%lu", + pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, + pSender->ack, pSender->privateTerm); } syncSnapshotSendDestroy(pMsg); @@ -485,7 +498,7 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply) { pReceiver->start = false; if (apply) { - ++(pReceiver->privateTerm); + // ++(pReceiver->privateTerm); } if (gRaftDetailLog) { @@ -566,17 +579,17 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, " + "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s", - pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, - pReceiver->privateTerm, msgStr); + pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, + pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, " + "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, " "lastConfigIndex:%ld privateTerm:%lu", - pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, - pReceiver->privateTerm); + pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, + pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm); } } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) { @@ -612,14 +625,19 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { bool isDrop; if (IamInNew) { - sDebug("vgId:%d sync event update config by snapshot, lastIndex:%ld, lastTerm:%lu, lastConfigIndex:%ld ", - pSyncNode->vgId, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex); + sDebug( + "vgId:%d sync event currentTerm:%lu update config by snapshot, lastIndex:%ld, lastTerm:%lu, " + "lastConfigIndex:%ld ", + pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pMsg->lastIndex, pMsg->lastTerm, + pMsg->lastConfigIndex); syncNodeUpdateConfig(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex, &isDrop); } else { sDebug( - "vgId:%d sync event do not update config by snapshot, I am not in newCfg, lastIndex:%ld, lastTerm:%lu, " + "vgId:%d sync event currentTerm:%lu do not update config by snapshot, I am not in newCfg, " + "lastIndex:%ld, lastTerm:%lu, " "lastConfigIndex:%ld ", - pSyncNode->vgId, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex); + pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pMsg->lastIndex, pMsg->lastTerm, + pMsg->lastConfigIndex); } // change isStandBy to normal @@ -644,19 +662,20 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore); sDebug( - "vgId:%d sync event snapshot recv from %s:%d finish, update log begin index:%ld, " + "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d finish, update log begin index:%ld, " "snapshot.lastApplyIndex:%ld, " "snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu, raft log:%s", - pSyncNode->vgId, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, - snapshot.lastConfigIndex, pReceiver->privateTerm, logSimpleStr); + pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pMsg->lastIndex + 1, + snapshot.lastApplyIndex, snapshot.lastApplyTerm, snapshot.lastConfigIndex, pReceiver->privateTerm, + logSimpleStr); taosMemoryFree(logSimpleStr); } else { sDebug( - "vgId:%d sync event snapshot recv from %s:%d finish, update log begin index:%ld, " + "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d finish, update log begin index:%ld, " "snapshot.lastApplyIndex:%ld, " "snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu", - pSyncNode->vgId, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, - snapshot.lastConfigIndex, pReceiver->privateTerm); + pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pMsg->lastIndex + 1, + snapshot.lastApplyIndex, snapshot.lastApplyTerm, snapshot.lastConfigIndex, pReceiver->privateTerm); } pReceiver->pWriter = NULL; @@ -667,17 +686,17 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, " + "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s", - pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, - pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); + pReceiver->pSyncNode->vgId, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, + pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, " + "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu", - pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, - pMsg->lastConfigIndex, pReceiver->privateTerm); + pReceiver->pSyncNode->vgId, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, + pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm); } } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) { @@ -692,18 +711,20 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu, " + "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, " + "lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu, recv " "msg:%s", - pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, - pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); + pReceiver->pSyncNode->vgId, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, + pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu, " + "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, " + "lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu", - pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, - pMsg->lastConfigIndex, pReceiver->privateTerm); + pReceiver->pSyncNode->vgId, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, + pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm); } } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) { @@ -723,17 +744,19 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu, " + "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, " + "lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s", - pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, - pReceiver->privateTerm, msgStr); + pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, + pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu, " + "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, " + "lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu", - pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, - pReceiver->privateTerm); + pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, + pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm); } } else { From 528d7d7248cbafb734aec558368cacbcaa832551 Mon Sep 17 00:00:00 2001 From: afwerar <1296468573@qq.com> Date: Thu, 16 Jun 2022 15:46:51 +0800 Subject: [PATCH 07/16] os: add dll check --- include/os/osSysinfo.h | 1 + source/client/src/clientHb.c | 4 +++- source/libs/catalog/src/ctgCache.c | 4 +++- source/os/src/osSysinfo.c | 16 ++++++++++++++++ source/util/src/tcache.c | 6 +++++- 5 files changed, 28 insertions(+), 3 deletions(-) diff --git a/include/os/osSysinfo.h b/include/os/osSysinfo.h index c009bcf350..4ec2e2884e 100644 --- a/include/os/osSysinfo.h +++ b/include/os/osSysinfo.h @@ -68,6 +68,7 @@ typedef struct { } SysNameInfo; SysNameInfo taosGetSysNameInfo(); +bool taosCheckCurrentInDll(); #ifdef __cplusplus } diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 5db27ae438..c450e9351f 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -588,7 +588,9 @@ void hbThreadFuncUnexpectedStopped(void) { static void *hbThreadFunc(void *param) { setThreadName("hb"); #ifdef WINDOWS - atexit(hbThreadFuncUnexpectedStopped); + if (taosCheckCurrentInDll()) { + atexit(hbThreadFuncUnexpectedStopped); + } #endif while (1) { int8_t threadStop = atomic_val_compare_exchange_8(&clientHbMgr.threadStop, 1, 2); diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 277516686b..0948c01270 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -1938,7 +1938,9 @@ void ctgCleanupCacheQueue(void) { void* ctgUpdateThreadFunc(void* param) { setThreadName("catalog"); #ifdef WINDOWS - atexit(ctgUpdateThreadUnexpectedStopped); + if (taosCheckCurrentInDll()) { + atexit(ctgUpdateThreadUnexpectedStopped); + } #endif qInfo("catalog update thread started"); diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index ace870853f..4981f7dc26 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -945,3 +945,19 @@ SysNameInfo taosGetSysNameInfo() { return info; #endif } + + +bool taosCheckCurrentInDll() { +#ifdef WINDOWS + MEMORY_BASIC_INFORMATION mbi; + char path[PATH_MAX] = {0}; + GetModuleFileName(((VirtualQuery(taosCheckCurrentInDll,&mbi,sizeof(mbi)) != 0) ? (HMODULE)mbi.AllocationBase : NULL), path, PATH_MAX); + int strLastIndex = strlen(path); + if ((path[strLastIndex-3] == 'd' || path[strLastIndex-3] == 'D') && (path[strLastIndex-2] == 'l' || path[strLastIndex-2] == 'L') && (path[strLastIndex-1] == 'l' || path[strLastIndex-1] == 'L')) { + return true; + } + return false; +#else + return false; +#endif +} \ No newline at end of file diff --git a/source/util/src/tcache.c b/source/util/src/tcache.c index 10a5475555..1a716752a9 100644 --- a/source/util/src/tcache.c +++ b/source/util/src/tcache.c @@ -829,7 +829,11 @@ void *taosCacheTimedRefresh(void *handle) { const int32_t SLEEP_DURATION = 500; // 500 ms int64_t count = 0; - atexit(taosCacheRefreshWorkerUnexpectedStopped); +#ifdef WINDOWS + if (taosCheckCurrentInDll()) { + atexit(taosCacheRefreshWorkerUnexpectedStopped); + } +#endif while (1) { taosMsleep(SLEEP_DURATION); From 2f81a41f98ed0a49d7329eb561753971b6a4faba Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 16 Jun 2022 16:16:40 +0800 Subject: [PATCH 08/16] refactor(sync): add syncIsReady --- include/libs/sync/sync.h | 1 + source/libs/sync/src/syncMain.c | 11 +++++++++++ 2 files changed, 12 insertions(+) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index e694203563..fc9d419b1f 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -182,6 +182,7 @@ void syncStart(int64_t rid); void syncStop(int64_t rid); int32_t syncSetStandby(int64_t rid); ESyncState syncGetMyRole(int64_t rid); +bool syncIsReady(int64_t rid); const char* syncGetMyRoleStr(int64_t rid); SyncTerm syncGetMyTerm(int64_t rid); void syncGetEpSet(int64_t rid, SEpSet* pEpSet); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 05a41daa9e..cc2c82fa5f 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -381,6 +381,17 @@ ESyncState syncGetMyRole(int64_t rid) { return state; } +bool syncIsReady(int64_t rid) { + SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + if (pSyncNode == NULL) { + return false; + } + assert(rid == pSyncNode->rid); + bool b = (pSyncNode->state == TAOS_SYNC_STATE_LEADER) && pSyncNode->restoreFinish; + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return b; +} + bool syncIsRestoreFinish(int64_t rid) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { From 7b98430906e03208282be22ac084585172ac0a01 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Thu, 16 Jun 2022 17:26:52 +0800 Subject: [PATCH 09/16] feat(stream): stream partition by --- source/libs/executor/src/timewindowoperator.c | 170 ++++++++++-------- .../tsim/stream/distributeInterval0.sim | 37 +++- tests/script/tsim/stream/partitionby.sim | 62 ++++++- 3 files changed, 195 insertions(+), 74 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index f8972a02a1..d7ae823522 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1257,6 +1257,10 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) { return TSDB_CODE_SUCCESS; } +bool isCloseWindow(STimeWindow *pWin, STimeWindowAggSupp* pSup) { + return pWin->ekey < pSup->maxTs - pSup->waterMark; +} + static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval, SArray* closeWins) { void* pIte = NULL; @@ -1269,7 +1273,7 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SResultRowInfo dumyInfo; dumyInfo.cur.pageId = -1; STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, ts, pInterval, pInterval->precision, NULL); - if (win.ekey < pSup->maxTs - pSup->waterMark) { + if (isCloseWindow(&win, pSup)) { char keyBuf[GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))]; SET_RES_WINDOW_KEY(keyBuf, &ts, sizeof(TSKEY), groupId); taosHashRemove(pHashMap, keyBuf, keyLen); @@ -2036,59 +2040,6 @@ _error: return NULL; } -static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, int32_t tableGroupId, - SArray* pUpdated) { - SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info; - SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo); - SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; - int32_t numOfOutput = pOperatorInfo->numOfExprs; - int32_t step = 1; - bool ascScan = true; - TSKEY* tsCols = NULL; - SResultRow* pResult = NULL; - int32_t forwardRows = 0; - - if (pSDataBlock->pDataBlock != NULL) { - SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); - tsCols = (int64_t*)pColDataInfo->pData; - } else { - return; - } - - int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1); - TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols); - STimeWindow nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, - pInfo->interval.precision, NULL); - while (1) { - int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, true, &pResult, tableGroupId, pInfo->binfo.pCtx, - numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); - if (code != TSDB_CODE_SUCCESS || pResult == NULL) { - longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); - } - SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t)); - pos->groupId = tableGroupId; - pos->pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; - *(int64_t*)pos->key = pResult->win.skey; - forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL, - TSDB_ORDER_ASC); - if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) { - saveResultRow(pResult, tableGroupId, pUpdated); - } - // window start(end) key interpolation - // doWindowBorderInterpolation(pInfo, pSDataBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos, - // forwardRows); - updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); - doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, - tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); - int32_t prevEndPos = (forwardRows - 1) * step + startPos; - ASSERT(pSDataBlock->info.window.skey > 0 && pSDataBlock->info.window.ekey > 0); - startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, pInfo->order); - if (startPos < 0) { - break; - } - } -} - bool isFinalInterval(SStreamFinalIntervalOperatorInfo* pInfo) { return pInfo->pChildren != NULL; } void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int32_t numOfOutput, @@ -2130,6 +2081,74 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SArra } } +bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) { + SET_RES_WINDOW_KEY(pSup->keyBuf, &pWin->skey, sizeof(int64_t), groupId); + SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, + pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(int64_t))); + return p1 == NULL; +} + +static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t tableGroupId, + SArray* pUpdated) { + SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info; + SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo); + SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; + int32_t numOfOutput = pOperatorInfo->numOfExprs; + int32_t step = 1; + bool ascScan = true; + TSKEY* tsCols = NULL; + SResultRow* pResult = NULL; + int32_t forwardRows = 0; + + if (pSDataBlock->pDataBlock != NULL) { + SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); + tsCols = (int64_t*)pColDataInfo->pData; + } else { + return; + } + + int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1); + TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols); + STimeWindow nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, + pInfo->interval.precision, NULL); + while (1) { + if (isFinalInterval(pInfo) && isCloseWindow(&nextWin, &pInfo->twAggSup) && + isDeletedWindow(&nextWin, tableGroupId, &pInfo->aggSup)) { + SArray* pUpWins = taosArrayInit(8, sizeof(STimeWindow)); + taosArrayPush(pUpWins, &nextWin); + rebuildIntervalWindow(pInfo, pUpWins, pInfo->binfo.pRes->info.groupId, + pOperatorInfo->numOfExprs, pOperatorInfo->pTaskInfo); + taosArrayDestroy(pUpWins); + } + int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, true, &pResult, tableGroupId, pInfo->binfo.pCtx, + numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); + if (code != TSDB_CODE_SUCCESS || pResult == NULL) { + longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } + SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t)); + pos->groupId = tableGroupId; + pos->pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; + *(int64_t*)pos->key = pResult->win.skey; + forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL, + TSDB_ORDER_ASC); + if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) { + saveResultRow(pResult, tableGroupId, pUpdated); + } + // window start(end) key interpolation + // doWindowBorderInterpolation(pInfo, pSDataBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos, + // forwardRows); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); + doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, + tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + int32_t prevEndPos = (forwardRows - 1) * step + startPos; + ASSERT(pSDataBlock->info.window.skey > 0 && pSDataBlock->info.window.ekey > 0); + startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, pInfo->order); + if (startPos < 0) { + break; + } + } +} + static void clearStreamIntervalOperator(SStreamFinalIntervalOperatorInfo* pInfo) { taosHashClear(pInfo->aggSup.pResultRowHashTable); clearDiskbasedBuf(pInfo->aggSup.pResultBuf); @@ -2169,6 +2188,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info; SOperatorInfo* downstream = pOperator->pDownstream[0]; SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); + TSKEY maxTs = INT64_MIN; if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -2222,6 +2242,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, MAIN_SCAN, true); + doHashInterval(pOperator, pBlock, pBlock->info.groupId, pUpdated); if (isFinalInterval(pInfo)) { int32_t chIndex = getChildIndex(pBlock); int32_t size = taosArrayGetSize(pInfo->pChildren); @@ -2238,10 +2259,10 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { setInputDataBlock(pChildOp, pChInfo->binfo.pCtx, pBlock, pChInfo->order, MAIN_SCAN, true); doHashInterval(pChildOp, pBlock, pBlock->info.groupId, NULL); } - doHashInterval(pOperator, pBlock, pBlock->info.groupId, pUpdated); - pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); + maxTs = TMAX(maxTs, pBlock->info.window.ekey); } + pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); if (isFinalInterval(pInfo)) { closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pUpdated); } @@ -2564,7 +2585,7 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t } static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pResult, SqlFunctionCtx* pCtx, - int32_t groupId, int32_t numOfOutput, int32_t* rowCellInfoOffset, + uint64_t groupId, int32_t numOfOutput, int32_t* rowCellInfoOffset, SStreamAggSupporter* pAggSup, SExecTaskInfo* pTaskInfo) { assert(pWinInfo->win.skey <= pWinInfo->win.ekey); // too many time window in query @@ -2642,7 +2663,7 @@ int32_t getNumCompactWindow(SArray* pWinInfos, int32_t startIndex, int64_t gap) return size - startIndex - 1; } -void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex, int32_t num, int32_t groupId, +void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex, int32_t num, uint64_t groupId, int32_t numOfOutput, SExecTaskInfo* pTaskInfo, SHashObj* pStUpdated, SHashObj* pStDeleted) { SResultWindowInfo* pCurWin = taosArrayGet(pInfo->streamAggSup.pCurWins, startIndex); SResultRow* pCurResult = NULL; @@ -2667,13 +2688,18 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex, } } +typedef struct SWinRes { + TSKEY ts; + uint64_t groupId; +} SWinRes; + static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SHashObj* pStUpdated, SHashObj* pStDeleted) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamSessionAggOperatorInfo* pInfo = pOperator->info; bool masterScan = true; int32_t numOfOutput = pOperator->numOfExprs; - int64_t groupId = pSDataBlock->info.groupId; + uint64_t groupId = pSDataBlock->info.groupId; int64_t gap = pInfo->gap; int64_t code = TSDB_CODE_SUCCESS; @@ -2693,7 +2719,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; for (int32_t i = 0; i < pSDataBlock->info.rows;) { int32_t winIndex = 0; - SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup, tsCols[i], pSDataBlock->info.groupId, gap, &winIndex); + SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup, tsCols[i], groupId, gap, &winIndex); winRows = updateSessionWindowInfo(pCurWin, tsCols, pSDataBlock->info.rows, i, pInfo->gap, pStDeleted); code = doOneWindowAgg(pInfo, pSDataBlock, pCurWin, &pResult, i, winRows, numOfOutput, pTaskInfo); if (code != TSDB_CODE_SUCCESS || pResult == NULL) { @@ -2709,7 +2735,8 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData } pCurWin->isClosed = false; if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { - code = taosHashPut(pStUpdated, &pCurWin->pos, sizeof(SResultRowPosition), &(pCurWin->win.skey), sizeof(TSKEY)); + SWinRes value = {.ts = pCurWin->win.skey, .groupId = groupId}; + code = taosHashPut(pStUpdated, &pCurWin->pos, sizeof(SResultRowPosition), &value, sizeof(SWinRes)); if (code != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -2736,7 +2763,7 @@ static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SOptrBasicInfo* } } -static int32_t copyUpdateResult(SHashObj* pStUpdated, SArray* pUpdated, int32_t groupId) { +static int32_t copyUpdateResult(SHashObj* pStUpdated, SArray* pUpdated) { void* pData = NULL; size_t keyLen = 0; while ((pData = taosHashIterate(pStUpdated, pData)) != NULL) { @@ -2746,9 +2773,9 @@ static int32_t copyUpdateResult(SHashObj* pStUpdated, SArray* pUpdated, int32_t if (pos == NULL) { return TSDB_CODE_QRY_OUT_OF_MEMORY; } - pos->groupId = groupId; + pos->groupId = ((SWinRes*)pData)->groupId; pos->pos = *(SResultRowPosition*)key; - *(int64_t*)pos->key = *(uint64_t*)pData; + *(int64_t*)pos->key = ((SWinRes*)pData)->ts; taosArrayPush(pUpdated, &pos); } return TSDB_CODE_SUCCESS; @@ -2815,7 +2842,9 @@ int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArra __get_win_info_ fn) { // Todo(liuyao) save window to tdb void **pIte = NULL; + size_t keyLen = 0; while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) { + uint64_t* pGroupId = taosHashGetKey(pIte, &keyLen); SArray *pWins = (SArray *) (*pIte); int32_t size = taosArrayGetSize(pWins); for (int32_t i = 0; i < size; i++) { @@ -2825,7 +2854,7 @@ int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArra if (!pSeWin->isClosed) { pSeWin->isClosed = true; if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { - int32_t code = saveResult(pSeWin->win.skey, pSeWin->pos.pageId, pSeWin->pos.offset, 0, pClosed); + int32_t code = saveResult(pSeWin->win.skey, pSeWin->pos.pageId, pSeWin->pos.offset, *pGroupId, pClosed); pSeWin->isOutput = true; } } @@ -2892,7 +2921,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info; doClearSessionWindows(&pChildInfo->streamAggSup, &pChildInfo->binfo, pBlock, 0, pChildOp->numOfExprs, pChildInfo->gap, NULL); - rebuildTimeWindow(pInfo, pWins, pInfo->binfo.pRes->info.groupId, pOperator->numOfExprs, pOperator->pTaskInfo); + rebuildTimeWindow(pInfo, pWins, pBlock->info.groupId, pOperator->numOfExprs, pOperator->pTaskInfo); } taosArrayDestroy(pWins); continue; @@ -2916,7 +2945,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, getSessionWinInfo); - copyUpdateResult(pStUpdated, pUpdated, pBInfo->pRes->info.groupId); + copyUpdateResult(pStUpdated, pUpdated); taosHashCleanup(pStUpdated); finalizeUpdatedResult(pOperator->numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated, @@ -3216,8 +3245,9 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl } pCurWin->winInfo.isClosed = false; if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { - code = taosHashPut(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition), &(pCurWin->winInfo.win.skey), - sizeof(TSKEY)); + SWinRes value = {.ts = pCurWin->winInfo.win.skey, .groupId = groupId}; + code = taosHashPut(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition), + &value, sizeof(SWinRes)); if (code != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -3274,7 +3304,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, getStateWinInfo); - copyUpdateResult(pSeUpdated, pUpdated, pBInfo->pRes->info.groupId); + copyUpdateResult(pSeUpdated, pUpdated); taosHashCleanup(pSeUpdated); finalizeUpdatedResult(pOperator->numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated, diff --git a/tests/script/tsim/stream/distributeInterval0.sim b/tests/script/tsim/stream/distributeInterval0.sim index f4f3e04f0a..b720272116 100644 --- a/tests/script/tsim/stream/distributeInterval0.sim +++ b/tests/script/tsim/stream/distributeInterval0.sim @@ -173,4 +173,39 @@ endi sql select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5, avg(d) from st interval(10s); -system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file + +sql create database test1 vgroups 1; +sql use test1; +sql create stable st(ts timestamp, a int, b int , c int) tags(ta int,tb int,tc int); +sql create table ts1 using st tags(1,1,1); +sql create table ts2 using st tags(2,2,2); +sql create stream stream_t2 trigger at_once into streamtST1 as select _wstartts, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6 from st interval(10s) ; + +sql insert into ts1 values(1648791211000,1,2,3); +sql insert into ts1 values(1648791222001,2,2,3); +sql insert into ts2 values(1648791211000,1,2,3); +sql insert into ts2 values(1648791222001,2,2,3); + +$loop_count = 0 +loop2: +sql select * from streamtST1; + +sleep 300 +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 0 +if $data01 != 2 then + print =====data01=$data01 + goto loop2 +endi + +#rows 1 +if $data11 != 2 then + print =====data11=$data11 + goto loop2 +endi + +system sh/stop_dnodes.sh \ No newline at end of file diff --git a/tests/script/tsim/stream/partitionby.sim b/tests/script/tsim/stream/partitionby.sim index df1e096551..b84a01eb4a 100644 --- a/tests/script/tsim/stream/partitionby.sim +++ b/tests/script/tsim/stream/partitionby.sim @@ -34,6 +34,7 @@ print =====rows=$rows goto loop0 endi +print =====loop0 sql create database test1 vgroups 1; sql use test1; @@ -51,7 +52,7 @@ sql insert into ts2 values(1648791211000,1,2,3); $loop_count = 0 -loop0: +loop1: sleep 300 sql select * from streamt; @@ -62,7 +63,62 @@ endi if $rows != 2 then print =====rows=$rows -goto loop0 +goto loop1 endi -system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file +print =====loop1 + +sql create database test2 vgroups 1; +sql use test2; +sql create stable st(ts timestamp,a int,b int,c int,id int) tags(ta int,tb int,tc int); +sql create table ts1 using st tags(1,1,1); +sql create table ts2 using st tags(2,2,2); + +sql create stream stream_t2 trigger at_once into streamtST as select _wstartts, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6, max(id) c7 from st partition by ta interval(10s) ; +sql insert into ts1 values(1648791211000,1,2,3,1); +sql insert into ts1 values(1648791222001,2,2,3,2); +sql insert into ts2 values(1648791211000,1,2,3,3); +sql insert into ts2 values(1648791222001,2,2,3,4); + +sql insert into ts2 values(1648791222002,2,2,3,5); +sql insert into ts2 values(1648791222002,2,2,3,6); + +sql insert into ts1 values(1648791211000,1,2,3,1); +sql insert into ts1 values(1648791222001,2,2,3,2); +sql insert into ts2 values(1648791211000,1,2,3,3); +sql insert into ts2 values(1648791222001,2,2,3,4); + +$loop_count = 0 + +loop2: +sleep 300 +sql select * from streamtST; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $data01 != 1 then +print =====data01=$data01 +goto loop2 +endi + +if $data02 != 1 then +print =====data02=$data02 +goto loop2 +endi + +if $data03 != 1 then +print =====data03=$data03 +goto loop2 +endi + +if $data04 != 2 then +print =====data04=$data04 +goto loop2 +endi + +print =====loop2 + +system sh/stop_dnodes.sh \ No newline at end of file From c1de4df95bde0ec91dd59203d6403042fad52f20 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 16 Jun 2022 18:33:45 +0800 Subject: [PATCH 10/16] fix: fix page capacity calculation --- source/libs/tdb/src/db/tdbPage.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/source/libs/tdb/src/db/tdbPage.c b/source/libs/tdb/src/db/tdbPage.c index 92272fb438..78470b6256 100644 --- a/source/libs/tdb/src/db/tdbPage.c +++ b/source/libs/tdb/src/db/tdbPage.c @@ -246,14 +246,17 @@ void tdbPageCopy(SPage *pFromPage, SPage *pToPage) { int tdbPageCapacity(int pageSize, int amHdrSize) { int szPageHdr; + int minCellIndexSize; // at least one cell in cell index if (pageSize < 65536) { szPageHdr = pageMethods.szPageHdr; + minCellIndexSize = pageMethods.szOffset; } else { szPageHdr = pageLargeMethods.szPageHdr; + minCellIndexSize = pageLargeMethods.szOffset; } - return pageSize - szPageHdr - amHdrSize; + return pageSize - szPageHdr - amHdrSize - sizeof(SPageFtr) - minCellIndexSize; } static int tdbPageAllocate(SPage *pPage, int szCell, SCell **ppCell) { @@ -599,4 +602,4 @@ SPageMethods pageLargeMethods = { setLPageCellOffset, // setCellOffset getLPageFreeCellInfo, // getFreeCellInfo setLPageFreeCellInfo // setFreeCellInfo -}; \ No newline at end of file +}; From 67540f2504464927ff1f684f97481138bfc19baf Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Thu, 16 Jun 2022 19:49:42 +0800 Subject: [PATCH 11/16] fix: table merge scan sort buf size --- source/libs/executor/src/scanoperator.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ddebba1c9e..a8aad8fe68 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2310,14 +2310,14 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pInfo->pSortInfo = generateSortByTsInfo(pInfo->cond.order); pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false); int32_t rowSize = pInfo->pResBlock->info.rowSize; - pInfo->bufPageSize = rowSize < 1024 ? 1024 : rowSize * 2; + int32_t blockMetaSize = (int32_t)blockDataGetSerialMetaSize(pInfo->pResBlock->info.numOfCols); + pInfo->bufPageSize = (rowSize * 2 + blockMetaSize) < 1024 ? 1024 : (rowSize * 2 + blockMetaSize); pInfo->sortBufSize = pInfo->bufPageSize * 16; pInfo->hasGroupId = false; pInfo->prefetchedTuple = NULL; pOperator->name = "TableMergeScanOperator"; - // TODO : change it - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN; pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; From 98cbbc01975f78132c0ac90c0d0c1be6c73ef507 Mon Sep 17 00:00:00 2001 From: afwerar <1296468573@qq.com> Date: Thu, 16 Jun 2022 19:53:39 +0800 Subject: [PATCH 12/16] test: add auto exit case --- tests/system-test/test.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/system-test/test.py b/tests/system-test/test.py index 5a68f548da..f3e39fc9ce 100644 --- a/tests/system-test/test.py +++ b/tests/system-test/test.py @@ -13,6 +13,7 @@ # pip install src/connector/python/ # -*- coding: utf-8 -*- +from ast import If import sys import getopt import subprocess @@ -33,8 +34,15 @@ import taos def checkRunTimeError(): import win32gui + timeCount = 0 while 1: time.sleep(1) + timeCount = timeCount + 1 + if (timeCount>900): + os.system("TASKKILL /F /IM taosd.exe") + os.system("TASKKILL /F /IM taos.exe") + os.system("TASKKILL /F /IM tmq_sim.exe") + quit(0) hwnd = win32gui.FindWindow(None, "Microsoft Visual C++ Runtime Library") if hwnd: os.system("TASKKILL /F /IM taosd.exe") From c858b6952853677f3e2a25d5532b63a5114f7df6 Mon Sep 17 00:00:00 2001 From: afwerar <1296468573@qq.com> Date: Thu, 16 Jun 2022 19:56:51 +0800 Subject: [PATCH 13/16] test: add auto exit case --- tests/system-test/test.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/system-test/test.py b/tests/system-test/test.py index f3e39fc9ce..85f63a2868 100644 --- a/tests/system-test/test.py +++ b/tests/system-test/test.py @@ -13,7 +13,6 @@ # pip install src/connector/python/ # -*- coding: utf-8 -*- -from ast import If import sys import getopt import subprocess From 49d92079ca1005546eca0b87daba8fbdaee3e509 Mon Sep 17 00:00:00 2001 From: afwerar <1296468573@qq.com> Date: Thu, 16 Jun 2022 19:58:24 +0800 Subject: [PATCH 14/16] test: add auto exit case --- tests/system-test/test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/system-test/test.py b/tests/system-test/test.py index 85f63a2868..55bfd8c945 100644 --- a/tests/system-test/test.py +++ b/tests/system-test/test.py @@ -41,6 +41,7 @@ def checkRunTimeError(): os.system("TASKKILL /F /IM taosd.exe") os.system("TASKKILL /F /IM taos.exe") os.system("TASKKILL /F /IM tmq_sim.exe") + os.system("TASKKILL /F /IM mintty.exe") quit(0) hwnd = win32gui.FindWindow(None, "Microsoft Visual C++ Runtime Library") if hwnd: From bdf24b6ab06c991db5198835d18fff3720f793f1 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Thu, 16 Jun 2022 20:14:48 +0800 Subject: [PATCH 15/16] fix: uncomment the codes of dropping stable --- source/dnode/vnode/src/vnd/vnodeSvr.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index d170931e7e..da1a126a76 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -466,10 +466,10 @@ static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pRe } // process request - // if (metaDropSTable(pVnode->pMeta, version, &req) < 0) { - // rcode = terrno; - // goto _exit; - // } + if (metaDropSTable(pVnode->pMeta, version, &req) < 0) { + rcode = terrno; + goto _exit; + } // return rsp _exit: From 8353da6694408895df7f277aa69920968dc560ce Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Thu, 16 Jun 2022 20:17:36 +0800 Subject: [PATCH 16/16] other: revert the code --- source/dnode/vnode/src/tsdb/tsdbReadImpl.c | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbReadImpl.c b/source/dnode/vnode/src/tsdb/tsdbReadImpl.c index e4c5ef8fc3..1c2514d46f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadImpl.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadImpl.c @@ -233,17 +233,6 @@ int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget) { tsdbError("vgId:%d, SBlockInfo part in file %s is corrupted since wrong checksum, offset:%u len :%u", TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), pBlkIdx->offset, pBlkIdx->len); return -1; - } else { - SBlockInfo *pBlkInfo = pReadh->pBlkInfo; - SBlock *pBlk = NULL; - int nBlks = (pBlkIdx->len - sizeof(SBlockInfo)) / sizeof(SBlock); - for (int n = 0; n < nBlks; ++n) { - pBlk = &pBlkInfo->blocks[n]; - if (pBlk->numOfSubBlocks >= 1) { - tsdbDebug("prop:vgId:%d, file %s , offset:%u len :%u has %d rows", TSDB_READ_REPO_ID(pReadh), - TSDB_FILE_FULL_NAME(pHeadf), pBlkIdx->offset, pBlkIdx->len, pBlk->numOfRows); - } - } } // ASSERT(pBlkIdx->tid == pReadh->pBlkInfo->tid && pBlkIdx->uid == pReadh->pBlkInfo->uid);