From a5db0b4de5ff746194f5ecdda1f84366a61c2406 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 5 Jun 2024 10:41:22 +0800 Subject: [PATCH 1/8] fix:[TD-30365] test case error --- tests/system-test/7-tmq/subscribeDb3.py | 2 +- tests/system-test/7-tmq/tmqDropStbCtb.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/system-test/7-tmq/subscribeDb3.py b/tests/system-test/7-tmq/subscribeDb3.py index 37e3a17100..185d1b01cf 100644 --- a/tests/system-test/7-tmq/subscribeDb3.py +++ b/tests/system-test/7-tmq/subscribeDb3.py @@ -219,7 +219,7 @@ class TDTestCase: expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] topicList = topicName1 ifcheckdata = 0 - ifManualCommit = 1 + ifManualCommit = 0 keyList = 'group.id:cgrp1,\ enable.auto.commit:false,\ auto.commit.interval.ms:6000,\ diff --git a/tests/system-test/7-tmq/tmqDropStbCtb.py b/tests/system-test/7-tmq/tmqDropStbCtb.py index eacacd913b..a1929237b7 100644 --- a/tests/system-test/7-tmq/tmqDropStbCtb.py +++ b/tests/system-test/7-tmq/tmqDropStbCtb.py @@ -157,7 +157,7 @@ class TDTestCase: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) if self.snapshot == 0: - if not ((totalConsumeRows > expectrowcnt / 2) and (totalConsumeRows < expectrowcnt)): + if not ((totalConsumeRows > expectrowcnt / 2) and (totalConsumeRows <= expectrowcnt)): tdLog.exit("tmq consume rows error with snapshot = 0!") tdLog.info("wait subscriptions exit ....") @@ -249,7 +249,7 @@ class TDTestCase: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) if self.snapshot == 0: - if not ((totalConsumeRows > expectrowcnt / 2) and (totalConsumeRows < expectrowcnt)): + if not ((totalConsumeRows > expectrowcnt / 2) and (totalConsumeRows <= expectrowcnt)): tdLog.exit("tmq consume rows error with snapshot = 0!") tdLog.info("wait subscriptions exit ....") From 48ecba95da8d7380f8962079fa373e5f8403b2ca Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 5 Jun 2024 18:02:57 +0800 Subject: [PATCH 2/8] fix:[TD-30365] ci case error & drop topic error if vnode is splitted --- source/dnode/mnode/impl/src/mndSubscribe.c | 12 ++++---- source/dnode/vnode/src/tq/tq.c | 29 +++++++------------ source/dnode/vnode/src/tq/tqMeta.c | 1 - source/dnode/vnode/src/tq/tqPush.c | 2 +- .../6-cluster/clusterCommonCheck.py | 18 +++++------- .../7-tmq/tmqVnodeSplit-stb-select-false.py | 5 ++-- .../7-tmq/tmqVnodeSplit-stb-select.py | 6 ++-- 7 files changed, 29 insertions(+), 44 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 0068b582cf..9f84e25c9f 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -912,6 +912,11 @@ static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STran int32_t sz = taosArrayGetSize(pSub->unassignedVgs); for (int32_t i = 0; i < sz; i++) { SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i); + SVgObj *pVgObj = mndAcquireVgroup(pMnode, pVgEp->vgId); + if (pVgObj == NULL) { + mError("sendDeleteSubToVnode %s failed since vg %d doesn't exist", pSub->key, pVgEp->vgId); + continue; + } SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq)); if(pReq == NULL){ terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -922,17 +927,12 @@ static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STran pReq->consumerId = -1; memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN); - SVgObj *pVgObj = mndAcquireVgroup(pMnode, pVgEp->vgId); - if (pVgObj == NULL) { - taosMemoryFree(pReq); - terrno = TSDB_CODE_MND_VGROUP_NOT_EXIST; - return -1; - } STransAction action = {0}; action.epSet = mndGetVgroupEpset(pMnode, pVgObj);; action.pCont = pReq; action.contLen = sizeof(SMqVDeleteReq); action.msgType = TDMT_VND_TMQ_DELETE_SUB; + action.acceptableCode = TSDB_CODE_MND_VGROUP_NOT_EXIST; mndReleaseVgroup(pMnode, pVgObj); if (mndTransAppendRedoAction(pTrans, &action) != 0) { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 79f53e6dec..18442f182b 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -649,21 +649,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg tqInfo("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey, req.oldConsumerId, req.newConsumerId); - STqHandle* pHandle = NULL; - while (1) { - pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); - if (pHandle) { - break; - } - taosRLockLatch(&pTq->lock); - ret = tqMetaGetHandle(pTq, req.subKey); - taosRUnLockLatch(&pTq->lock); - - if (ret < 0) { - break; - } - } - + STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); if (pHandle == NULL) { if (req.oldConsumerId != -1) { tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64, @@ -698,10 +684,17 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } else { tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId); - atomic_store_64(&pHandle->consumerId, req.newConsumerId); - atomic_store_32(&pHandle->epoch, 0); tqUnregisterPushHandle(pTq, pHandle); - ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); + taosHashRemove(pTq->pHandle, pHandle->subKey, strlen(pHandle->subKey)); + + // update handle to avoid req->qmsg changed if spilt vnode is failed + STqHandle handle = {0}; + ret = tqCreateHandle(pTq, &req, &handle); + if (ret < 0) { + tqDestroyTqHandle(&handle); + goto end; + } + ret = tqMetaSaveHandle(pTq, req.subKey, &handle); } taosWUnLockLatch(&pTq->lock); break; diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 76322c527f..cb64c9033a 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -351,7 +351,6 @@ int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){ memcpy(handle->subKey, req->subKey, TSDB_SUBSCRIBE_KEY_LEN); handle->consumerId = req->newConsumerId; - handle->epoch = -1; handle->execHandle.subType = req->subType; handle->fetchMeta = req->withMeta; diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 71e6771370..7375478e61 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -87,7 +87,7 @@ int tqUnregisterPushHandle(STQ* pTq, void *handle) { int32_t ret = taosHashRemove(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey)); tqInfo("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId); - if(pHandle->msg != NULL) { + if(ret == 0 && pHandle->msg != NULL) { // tqPushDataRsp(pHandle, vgId); tqPushEmptyDataRsp(pHandle, vgId); diff --git a/tests/system-test/6-cluster/clusterCommonCheck.py b/tests/system-test/6-cluster/clusterCommonCheck.py index 694227cea7..be99d01a5c 100644 --- a/tests/system-test/6-cluster/clusterCommonCheck.py +++ b/tests/system-test/6-cluster/clusterCommonCheck.py @@ -237,21 +237,19 @@ class ClusterComCheck: last_number=vgroup_numbers-1 while count < count_number: time.sleep(1) + count+=1 + print("check vgroup count :", count) tdSql.query(f"show {db_name}.vgroups;") - if count == 0 : - if tdSql.checkRows(vgroup_numbers) : - tdLog.success(f"{db_name} has {vgroup_numbers} vgroups" ) - else: - tdLog.exit(f"vgroup number of {db_name} is not correct") + if tdSql.getRows() != vgroup_numbers : + continue if self.db_replica == 1 : if tdSql.queryResult[0][4] == 'leader' and tdSql.queryResult[last_number][4] == 'leader': tdSql.query(f"select `replica` from information_schema.ins_databases where `name`='{db_name}';") print("db replica :",tdSql.queryResult[0][0]) if tdSql.queryResult[0][0] == db_replica: - ready_time= (count + 1) - tdLog.success(f"all vgroups with replica {self.db_replica} of {db_name} are leaders in {count + 1} s") + tdLog.success(f"all vgroups with replica {self.db_replica} of {db_name} are leaders in {count} s") return True - count+=1 + elif self.db_replica == 3 : vgroup_status_first=[tdSql.queryResult[0][4],tdSql.queryResult[0][6],tdSql.queryResult[0][8]] @@ -261,10 +259,8 @@ class ClusterComCheck: tdSql.query(f"select `replica` from information_schema.ins_databases where `name`='{db_name}';") print("db replica :",tdSql.queryResult[0][0]) if tdSql.queryResult[0][0] == db_replica: - ready_time= (count + 1) - tdLog.success(f"elections of {db_name}.vgroups with replica {self.db_replica} are ready in {ready_time} s") + tdLog.success(f"elections of {db_name}.vgroups with replica {self.db_replica} are ready in {count} s") return True - count+=1 else: tdLog.debug(tdSql.queryResult) tdLog.notice(f"elections of {db_name} all vgroups with replica {self.db_replica} are failed in {count} s ") diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-false.py b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-false.py index f01bf2558c..a5e61adc8d 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-false.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-false.py @@ -200,12 +200,11 @@ class TDTestCase: # tmqCom.checkFileContent(consumerId, queryString) - time.sleep(2) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) - if deleteWal == True: - clusterComCheck.check_vgroups_status(vgroup_numbers=2,db_replica=self.replicaVar,db_name="dbt",count_number=240) + clusterComCheck.check_vgroups_status(vgroup_numbers=2,db_replica=self.replicaVar,db_name="dbt",count_number=240) + tdLog.printNoPrefix("======== test case 1 end ...... ") def run(self): diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-stb-select.py b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select.py index 5e11de04cb..eb35ebc718 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit-stb-select.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select.py @@ -199,13 +199,11 @@ class TDTestCase: tdLog.exit("%d tmq consume rows error!"%consumerId) # tmqCom.checkFileContent(consumerId, queryString) + clusterComCheck.check_vgroups_status(vgroup_numbers=2,db_replica=self.replicaVar,db_name="dbt",count_number=240) - time.sleep(2) + time.sleep(3) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) - - if deleteWal == True: - clusterComCheck.check_vgroups_status(vgroup_numbers=2,db_replica=self.replicaVar,db_name="dbt",count_number=240) tdLog.printNoPrefix("======== test case 1 end ...... ") def run(self): From 697b81aa8ade6241520e44001380df7e35f6f541 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 6 Jun 2024 11:15:23 +0800 Subject: [PATCH 3/8] fix:[TD-30365] ci case error & drop topic error if vnode is splitted --- source/dnode/vnode/src/inc/tq.h | 2 +- source/dnode/vnode/src/tq/tq.c | 12 ++++++------ source/dnode/vnode/src/tq/tqMeta.c | 4 ++-- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 08d32b2b81..68d966add6 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -133,7 +133,7 @@ int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_ int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key); int32_t tqMetaRestoreCheckInfo(STQ* pTq); int32_t tqMetaGetHandle(STQ* pTq, const char* key); -int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle); +int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle, int64_t snapshotVer); STqOffsetStore* tqOffsetOpen(STQ* pTq); int32_t tqMetaTransform(STQ* pTq); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 18442f182b..925f8feb05 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -50,6 +50,9 @@ void tqDestroyTqHandle(void* data) { if (pData->block != NULL) { blockDataDestroy(pData->block); } + if (pData->pRef) { + walCloseRef(pData->pRef->pWal, pData->pRef->refId); + } } static bool tqOffsetEqual(const STqOffset* pLeft, const STqOffset* pRight) { @@ -571,9 +574,6 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg taosMsleep(10); continue; } - if (pHandle->pRef) { - walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId); - } tqUnregisterPushHandle(pTq, pHandle); @@ -660,7 +660,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg goto end; } STqHandle handle = {0}; - ret = tqCreateHandle(pTq, &req, &handle); + ret = tqCreateHandle(pTq, &req, &handle, walGetCommittedVer(pTq->pVnode->pWal)); if (ret < 0) { tqDestroyTqHandle(&handle); goto end; @@ -689,7 +689,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg // update handle to avoid req->qmsg changed if spilt vnode is failed STqHandle handle = {0}; - ret = tqCreateHandle(pTq, &req, &handle); + ret = tqCreateHandle(pTq, &req, &handle, pHandle->snapshotVer); if (ret < 0) { tqDestroyTqHandle(&handle); goto end; @@ -701,7 +701,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } } -end: + end: tDecoderClear(&dc); return ret; } diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index cb64c9033a..5162136591 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -346,7 +346,7 @@ end: return code; } -int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){ +int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle, int64_t snapshotVer){ int32_t vgId = TD_VID(pTq->pVnode); memcpy(handle->subKey, req->subKey, TSDB_SUBSCRIBE_KEY_LEN); @@ -364,7 +364,7 @@ int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){ handle->execHandle.execTb.qmsg = taosStrdup(req->qmsg); } - handle->snapshotVer = walGetCommittedVer(pTq->pVnode->pWal); + handle->snapshotVer = snapshotVer; if(buildHandle(pTq, handle) < 0){ return -1; From 0e337d34186717aae935cd6278588710659e9794 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 6 Jun 2024 17:26:20 +0800 Subject: [PATCH 4/8] fix:[TD-30365] ci case error & drop topic error if vnode is splitted --- source/dnode/vnode/src/tq/tq.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 925f8feb05..58085e2cc2 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -685,13 +685,13 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId); tqUnregisterPushHandle(pTq, pHandle); - taosHashRemove(pTq->pHandle, pHandle->subKey, strlen(pHandle->subKey)); // update handle to avoid req->qmsg changed if spilt vnode is failed STqHandle handle = {0}; ret = tqCreateHandle(pTq, &req, &handle, pHandle->snapshotVer); if (ret < 0) { tqDestroyTqHandle(&handle); + taosWUnLockLatch(&pTq->lock); goto end; } ret = tqMetaSaveHandle(pTq, req.subKey, &handle); From 70db803aecef56c68094f185f48af4993ee65eab Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 11 Jun 2024 17:24:29 +0800 Subject: [PATCH 5/8] fix:[TD-30365] ci case error & drop topic error if vnode is splitted --- source/dnode/mnode/impl/inc/mndDef.h | 4 +-- source/dnode/mnode/impl/src/mndConsumer.c | 15 ++++------ source/dnode/mnode/impl/src/mndSubscribe.c | 32 +++++++++++--------- source/dnode/mnode/impl/src/mndTopic.c | 8 ++--- source/dnode/mnode/impl/src/mndTrans.c | 34 +++++++++++----------- source/dnode/vnode/src/inc/tq.h | 2 +- source/dnode/vnode/src/tq/tq.c | 29 ++++++++++-------- source/dnode/vnode/src/tq/tqMeta.c | 6 ++-- 8 files changed, 67 insertions(+), 63 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 5c21e9b22b..81772635fc 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -102,8 +102,8 @@ typedef enum { TRN_CONFLICT_GLOBAL = 1, TRN_CONFLICT_DB = 2, TRN_CONFLICT_DB_INSIDE = 3, - TRN_CONFLICT_TOPIC = 4, - TRN_CONFLICT_TOPIC_INSIDE = 5, +// TRN_CONFLICT_TOPIC = 4, +// TRN_CONFLICT_TOPIC_INSIDE = 5, TRN_CONFLICT_ARBGROUP = 6, } ETrnConflct; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 3eef2afcc1..9a7a8155ec 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -91,7 +91,7 @@ void mndSendConsumerMsg(SMnode *pMnode, int64_t consumerId, uint16_t msgType, SR } } -static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser, +static int32_t validateTopics(const SArray *pTopicList, SMnode *pMnode, const char *pUser, bool enableReplay) { SMqTopicObj *pTopic = NULL; int32_t code = 0; @@ -135,11 +135,6 @@ static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode * } } - mndTransSetDbName(pTrans, pOneTopic, NULL); - if (mndTransCheckConflict(pMnode, pTrans) != 0) { - code = -1; - goto FAILED; - } mndReleaseTopic(pMnode, pTopic); } @@ -177,12 +172,12 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { goto END; } - pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC, pMsg, "recover-csm"); + pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "recover-csm"); if (pTrans == NULL) { code = -1; goto END; } - code = validateTopics(pTrans, pConsumer->assignedTopics, pMnode, pMsg->info.conn.user, false); + code = validateTopics(pConsumer->assignedTopics, pMnode, pMsg->info.conn.user, false); if (code != 0) { goto END; } @@ -675,13 +670,13 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { goto _over; } - pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC_INSIDE, pMsg, "subscribe"); + pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe"); if (pTrans == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _over; } - code = validateTopics(pTrans, subscribe.topicNames, pMnode, pMsg->info.conn.user, subscribe.enableReplay); + code = validateTopics(subscribe.topicNames, pMnode, pMsg->info.conn.user, subscribe.enableReplay); if (code != TSDB_CODE_SUCCESS) { goto _over; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 9f84e25c9f..ffb723756c 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -618,13 +618,13 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu char cgroup[TSDB_CGROUP_LEN] = {0}; mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true); - pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC_INSIDE, pMsg, "tmq-reb"); + pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb"); if (pTrans == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto END; } - mndTransSetDbName(pTrans, topic, cgroup); + mndTransSetDbName(pTrans, pOutput->pSub->dbName, cgroup); code = mndTransCheckConflict(pMnode, pTrans); if (code != 0) { goto END; @@ -908,22 +908,26 @@ END: } static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STrans *pTrans){ - // iter all vnode to delete handle - int32_t sz = taosArrayGetSize(pSub->unassignedVgs); - for (int32_t i = 0; i < sz; i++) { - SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i); - SVgObj *pVgObj = mndAcquireVgroup(pMnode, pVgEp->vgId); - if (pVgObj == NULL) { - mError("sendDeleteSubToVnode %s failed since vg %d doesn't exist", pSub->key, pVgEp->vgId); + void* pIter = NULL; + SVgObj* pVgObj = NULL; + while (1) { + pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgObj); + if (pIter == NULL) { + break; + } + + if (!mndVgroupInDb(pVgObj, pSub->dbUid)) { + sdbRelease(pMnode->pSdb, pVgObj); continue; } SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq)); if(pReq == NULL){ terrno = TSDB_CODE_OUT_OF_MEMORY; + sdbRelease(pMnode->pSdb, pVgObj); return -1; } - pReq->head.vgId = htonl(pVgEp->vgId); - pReq->vgId = pVgEp->vgId; + pReq->head.vgId = htonl(pVgObj->vgId); + pReq->vgId = pVgObj->vgId; pReq->consumerId = -1; memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN); @@ -934,7 +938,7 @@ static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STran action.msgType = TDMT_VND_TMQ_DELETE_SUB; action.acceptableCode = TSDB_CODE_MND_VGROUP_NOT_EXIST; - mndReleaseVgroup(pMnode, pVgObj); + sdbRelease(pMnode->pSdb, pVgObj); if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq); return -1; @@ -996,7 +1000,7 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { goto end; } - pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC_INSIDE, pMsg, "drop-cgroup"); + pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "drop-cgroup"); if (pTrans == NULL) { mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr()); code = -1; @@ -1004,7 +1008,7 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { } mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic); - mndTransSetDbName(pTrans, dropReq.topic, dropReq.cgroup); + mndTransSetDbName(pTrans, pSub->dbName, dropReq.cgroup); code = mndTransCheckConflict(pMnode, pTrans); if (code != 0) { goto end; diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 8a06b4a613..bcb38a3902 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -422,14 +422,14 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * SQueryPlan *pPlan = NULL; SMqTopicObj topicObj = {0}; - pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC, pReq, "create-topic"); + pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "create-topic"); if (pTrans == NULL) { mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); code = -1; goto _OUT; } - mndTransSetDbName(pTrans, pCreate->name, NULL); + mndTransSetDbName(pTrans, pDb->name, NULL); code = mndTransCheckConflict(pMnode, pTrans); if (code != 0) { goto _OUT; @@ -779,14 +779,14 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { } } - pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC, pReq, "drop-topic"); + pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "drop-topic"); if (pTrans == NULL) { mError("topic:%s, failed to drop since %s", pTopic->name, terrstr()); code = -1; goto end; } - mndTransSetDbName(pTrans, pTopic->name, NULL); + mndTransSetDbName(pTrans, pTopic->db, NULL); code = mndTransCheckConflict(pMnode, pTrans); if (code != 0) { goto end; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 8b01d296a3..d80164bcad 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -830,26 +830,26 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) { } } - if (pNew->conflict == TRN_CONFLICT_TOPIC) { - if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true; - if (pTrans->conflict == TRN_CONFLICT_TOPIC || pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) { - if (strcasecmp(pNew->dbname, pTrans->dbname) == 0) conflict = true; - } - } - if (pNew->conflict == TRN_CONFLICT_TOPIC_INSIDE) { - if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true; - if (pTrans->conflict == TRN_CONFLICT_TOPIC) { - if (strcasecmp(pNew->dbname, pTrans->dbname) == 0) conflict = true; - } - if (pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) { - if (strcasecmp(pNew->dbname, pTrans->dbname) == 0 && strcasecmp(pNew->stbname, pTrans->stbname) == 0) - conflict = true; - } - } +// if (pNew->conflict == TRN_CONFLICT_TOPIC) { +// if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true; +// if (pTrans->conflict == TRN_CONFLICT_TOPIC || pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) { +// if (strcasecmp(pNew->dbname, pTrans->dbname) == 0) conflict = true; +// } +// } +// if (pNew->conflict == TRN_CONFLICT_TOPIC_INSIDE) { +// if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true; +// if (pTrans->conflict == TRN_CONFLICT_TOPIC) { +// if (strcasecmp(pNew->dbname, pTrans->dbname) == 0) conflict = true; +// } +// if (pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) { +// if (strcasecmp(pNew->dbname, pTrans->dbname) == 0 && strcasecmp(pNew->stbname, pTrans->stbname) == 0) +// conflict = true; +// } +// } if (pNew->conflict == TRN_CONFLICT_ARBGROUP) { if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true; if (pTrans->conflict == TRN_CONFLICT_ARBGROUP) { - void *pIter = taosHashIterate(pNew->arbGroupIds, NULL); + pIter = taosHashIterate(pNew->arbGroupIds, NULL); while (pIter != NULL) { int32_t groupId = *(int32_t *)pIter; if (taosHashGet(pTrans->arbGroupIds, &groupId, sizeof(int32_t)) != NULL) { diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 68d966add6..08d32b2b81 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -133,7 +133,7 @@ int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_ int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key); int32_t tqMetaRestoreCheckInfo(STQ* pTq); int32_t tqMetaGetHandle(STQ* pTq, const char* key); -int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle, int64_t snapshotVer); +int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle); STqOffsetStore* tqOffsetOpen(STQ* pTq); int32_t tqMetaTransform(STQ* pTq); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 58085e2cc2..083e0b28f3 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -649,7 +649,19 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg tqInfo("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey, req.oldConsumerId, req.newConsumerId); - STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); + STqHandle* pHandle = NULL; + while (1) { + pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); + if (pHandle) { + break; + } + taosRLockLatch(&pTq->lock); + ret = tqMetaGetHandle(pTq, req.subKey); + taosRUnLockLatch(&pTq->lock); + if (ret < 0) { + break; + } + } if (pHandle == NULL) { if (req.oldConsumerId != -1) { tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64, @@ -660,7 +672,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg goto end; } STqHandle handle = {0}; - ret = tqCreateHandle(pTq, &req, &handle, walGetCommittedVer(pTq->pVnode->pWal)); + ret = tqCreateHandle(pTq, &req, &handle); if (ret < 0) { tqDestroyTqHandle(&handle); goto end; @@ -684,17 +696,10 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } else { tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId); + atomic_store_64(&pHandle->consumerId, req.newConsumerId); + atomic_store_32(&pHandle->epoch, 0); tqUnregisterPushHandle(pTq, pHandle); - - // update handle to avoid req->qmsg changed if spilt vnode is failed - STqHandle handle = {0}; - ret = tqCreateHandle(pTq, &req, &handle, pHandle->snapshotVer); - if (ret < 0) { - tqDestroyTqHandle(&handle); - taosWUnLockLatch(&pTq->lock); - goto end; - } - ret = tqMetaSaveHandle(pTq, req.subKey, &handle); + ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); } taosWUnLockLatch(&pTq->lock); break; diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 5162136591..ce3308f0ac 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -346,7 +346,7 @@ end: return code; } -int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle, int64_t snapshotVer){ +int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){ int32_t vgId = TD_VID(pTq->pVnode); memcpy(handle->subKey, req->subKey, TSDB_SUBSCRIBE_KEY_LEN); @@ -364,12 +364,12 @@ int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle, int64_t sn handle->execHandle.execTb.qmsg = taosStrdup(req->qmsg); } - handle->snapshotVer = snapshotVer; + handle->snapshotVer = walGetCommittedVer(pTq->pVnode->pWal); if(buildHandle(pTq, handle) < 0){ return -1; } - tqInfo("tqCreateHandle %s consumer 0x%" PRIx64 " vgId:%d", handle->subKey, handle->consumerId, vgId); + tqInfo("tqCreateHandle %s consumer 0x%" PRIx64 " vgId:%d, snapshotVer:%" PRId64, handle->subKey, handle->consumerId, vgId, handle->snapshotVer); return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle)); } From e17cdf84c29aee1048fc37930ed7c39d13ccf9b8 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 11 Jun 2024 18:08:34 +0800 Subject: [PATCH 6/8] fix: count always true for group by tbname performance issue --- source/libs/executor/src/executil.c | 24 +++++++++++++++++++----- source/libs/executor/src/scanoperator.c | 4 ++-- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index d06beebd6b..eb549db6d2 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -2178,9 +2178,25 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* return code; } if (group == NULL || groupByTbname) { - for (int32_t i = 0; i < numOfTables; i++) { - STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); - info->groupId = groupByTbname ? info->uid : 0; + if (groupByTbname && tsCountAlwaysReturnValue && ((STableScanPhysiNode*)pScanNode)->needCountEmptyTable) { + pTableListInfo->remainGroups = + taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + if (pTableListInfo->remainGroups == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + for (int i = 0; i < numOfTables; i++) { + STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); + info->groupId = info->uid; + + taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId), &(info->uid), + sizeof(info->uid)); + } + } else { + for (int32_t i = 0; i < numOfTables; i++) { + STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); + info->groupId = groupByTbname ? info->uid : 0; + } } pTableListInfo->oneTableForEachGroup = groupByTbname; @@ -2193,8 +2209,6 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pTableListInfo->numOfOuputGroups = numOfTables; } else if (groupByTbname && pScanNode->groupOrderScan) { pTableListInfo->numOfOuputGroups = numOfTables; - } else if (groupByTbname && tsCountAlwaysReturnValue && ((STableScanPhysiNode*)pScanNode)->needCountEmptyTable) { - pTableListInfo->numOfOuputGroups = numOfTables; } else { pTableListInfo->numOfOuputGroups = 1; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index eef8b06ac5..ec40bceb5e 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -725,7 +725,7 @@ void markGroupProcessed(STableScanInfo* pInfo, uint64_t groupId) { if (pInfo->countState == TABLE_COUNT_STATE_END) { return; } - if (pInfo->base.pTableListInfo->oneTableForEachGroup || pInfo->base.pTableListInfo->groupOffset) { + if (pInfo->base.pTableListInfo->groupOffset) { pInfo->countState = TABLE_COUNT_STATE_PROCESSED; } else { taosHashRemove(pInfo->base.pTableListInfo->remainGroups, &groupId, sizeof(groupId)); @@ -890,7 +890,7 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) { if (pTableScanInfo->countState < TABLE_COUNT_STATE_END) { STableListInfo* pTableListInfo = pTableScanInfo->base.pTableListInfo; - if (pTableListInfo->oneTableForEachGroup || pTableListInfo->groupOffset) { // group by tbname, group by tag + sort + if (pTableListInfo->groupOffset) { // group by tbname, group by tag + sort if (pTableScanInfo->countState < TABLE_COUNT_STATE_PROCESSED) { pTableScanInfo->countState = TABLE_COUNT_STATE_PROCESSED; STableKeyInfo* pStart = From 13a0bf3fdf5b480da5dd87624279729dd6afca57 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 12 Jun 2024 16:02:05 +0800 Subject: [PATCH 7/8] fix: count empty table with group by issue --- source/common/src/tdatablock.c | 2 ++ source/libs/executor/src/executil.c | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index ac4811fb1b..8e8c9d9a85 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1362,6 +1362,8 @@ void blockDataEmpty(SSDataBlock* pDataBlock) { return; } + taosMemoryFreeClear(pDataBlock->pBlockAgg); + size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index eb549db6d2..9ca681779d 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -2178,7 +2178,7 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* return code; } if (group == NULL || groupByTbname) { - if (groupByTbname && tsCountAlwaysReturnValue && ((STableScanPhysiNode*)pScanNode)->needCountEmptyTable) { + if (tsCountAlwaysReturnValue && QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanNode) && ((STableScanPhysiNode*)pScanNode)->needCountEmptyTable) { pTableListInfo->remainGroups = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); if (pTableListInfo->remainGroups == NULL) { From e060535c1342b69ae393f17a04f49a6132ba9f88 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 12 Jun 2024 17:23:57 +0800 Subject: [PATCH 8/8] feat: data migration speed limit --- docs/en/14-reference/12-config/index.md | 2 +- docs/zh/14-reference/12-config/index.md | 2 +- include/common/tglobal.h | 1 + source/common/src/tglobal.c | 3 ++ source/dnode/vnode/src/tsdb/tsdbRetention.c | 31 ++++++++++++++++++++- 5 files changed, 36 insertions(+), 3 deletions(-) diff --git a/docs/en/14-reference/12-config/index.md b/docs/en/14-reference/12-config/index.md index 5e4eadcceb..f50551b5de 100755 --- a/docs/en/14-reference/12-config/index.md +++ b/docs/en/14-reference/12-config/index.md @@ -432,7 +432,7 @@ The charset that takes effect is UTF-8. | Applicable | Server Only | | Meaning | Maximum number of threads to commit | | Value Range | 0-1024 | -| Default Value | | +| Default Value | 4 | ## Log Parameters diff --git a/docs/zh/14-reference/12-config/index.md b/docs/zh/14-reference/12-config/index.md index 6fce985927..effa72099a 100755 --- a/docs/zh/14-reference/12-config/index.md +++ b/docs/zh/14-reference/12-config/index.md @@ -430,7 +430,7 @@ charset 的有效值是 UTF-8。 | 适用范围 | 仅服务端适用 | | 含义 | 设置写入线程的最大数量 | | 取值范围 | 0-1024 | -| 缺省值 | | +| 缺省值 | 4 | ## 日志相关 diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 90ee6f7cc0..e7035fe297 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -86,6 +86,7 @@ extern int32_t tsNumOfQnodeFetchThreads; extern int32_t tsNumOfSnodeStreamThreads; extern int32_t tsNumOfSnodeWriteThreads; extern int64_t tsRpcQueueMemoryAllowed; +extern int32_t tsRetentionSpeedLimitMB; // sync raft extern int32_t tsElectInterval; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index c68dc85c29..da692e78fd 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -74,6 +74,7 @@ int32_t tsNumOfSnodeStreamThreads = 4; int32_t tsNumOfSnodeWriteThreads = 1; int32_t tsMaxStreamBackendCache = 128; // M int32_t tsPQSortMemThreshold = 16; // M +int32_t tsRetentionSpeedLimitMB = 0; // unlimited // sync raft int32_t tsElectInterval = 25 * 1000; @@ -667,6 +668,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "queryBufferSize", tsQueryBufferSize, -1, 500000000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "queryRspPolicy", tsQueryRspPolicy, 0, 1, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddInt32(pCfg, "retentionSpeedLimitMB", tsRetentionSpeedLimitMB, 0, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; @@ -1117,6 +1119,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsTimeToGetAvailableConn = cfgGetItem(pCfg, "timeToGetAvailableConn")->i32; tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32; + tsRetentionSpeedLimitMB = cfgGetItem(pCfg, "retentionSpeedLimitMB")->i32; tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32; tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32; tsRatioOfVnodeStreamThreads = cfgGetItem(pCfg, "ratioOfVnodeStreamThreads")->fval; diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index 3d53d1ada3..0d3994d78e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -38,6 +38,34 @@ static int32_t tsdbDoRemoveFileObject(SRTNer *rtner, const STFileObj *fobj) { return TARRAY2_APPEND(&rtner->fopArr, op); } +static int64_t tsdbCopyFileWithLimitedSpeed(TdFilePtr from, TdFilePtr to, int64_t size, uint32_t limitMB) { + int64_t total = 0; + int64_t interval = 1000; // 1s + int64_t limit = limitMB ? limitMB * 1024 * 1024 : INT64_MAX; + int64_t offset = 0; + int64_t remain = size; + + while (remain > 0) { + int64_t n; + int64_t last = taosGetTimestampMs(); + if ((n = taosFSendFile(to, from, &offset, TMIN(limit, remain))) < 0) { + return -1; + } + + total += n; + remain -= n; + + if (remain > 0) { + int64_t elapsed = taosGetTimestampMs() - last; + if (elapsed < interval) { + taosMsleep(interval - elapsed); + } + } + } + + return total; +} + static int32_t tsdbDoCopyFileLC(SRTNer *rtner, const STFileObj *from, const STFile *to) { int32_t code = 0; int32_t lino = 0; @@ -98,7 +126,8 @@ static int32_t tsdbDoCopyFile(SRTNer *rtner, const STFileObj *from, const STFile if (fdTo == NULL) code = terrno; TSDB_CHECK_CODE(code, lino, _exit); - int64_t n = taosFSendFile(fdTo, fdFrom, 0, tsdbLogicToFileSize(from->f->size, rtner->szPage)); + int64_t n = tsdbCopyFileWithLimitedSpeed(fdFrom, fdTo, tsdbLogicToFileSize(from->f->size, rtner->szPage), + tsRetentionSpeedLimitMB); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); TSDB_CHECK_CODE(code, lino, _exit);