From 4f168b4b2dd579082db6d7b4b449fc400c97afed Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 29 Nov 2023 17:22:43 +0800 Subject: [PATCH] fix:tmqVnodeSplit-column.py failed in arm64 because of rebalance after unsubscribe while split vnode --- source/dnode/mnode/impl/src/mndStream.c | 31 +------------------ source/dnode/mnode/impl/src/mndSubscribe.c | 1 + source/dnode/vnode/src/tq/tq.c | 2 +- source/dnode/vnode/src/tq/tqPush.c | 2 +- .../system-test/7-tmq/tmqVnodeSplit-column.py | 2 +- 5 files changed, 5 insertions(+), 33 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index bc265db766..08c60e7afa 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -328,35 +328,6 @@ static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) { return 0; } -static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64_t watermark, char **pStr) { - if (NULL == ast) { - return TSDB_CODE_SUCCESS; - } - - SNode * pAst = NULL; - int32_t code = nodesStringToNode(ast, &pAst); - - SQueryPlan *pPlan = NULL; - if (TSDB_CODE_SUCCESS == code) { - SPlanContext cxt = { - .pAstRoot = pAst, - .topicQuery = false, - .streamQuery = true, - .triggerType = (triggerType == STREAM_TRIGGER_MAX_DELAY) ? STREAM_TRIGGER_WINDOW_CLOSE : triggerType, - .watermark = watermark, - }; - code = qCreateQueryPlan(&cxt, &pPlan, NULL); - } - - if (TSDB_CODE_SUCCESS == code) { - code = nodesNodeToString((SNode *)pPlan, false, pStr, NULL); - } - nodesDestroyNode(pAst); - nodesDestroyNode((SNode *)pPlan); - terrno = code; - return code; -} - static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) { SNode * pAst = NULL; SQueryPlan *pPlan = NULL; @@ -777,6 +748,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { SDbObj * pDb = NULL; SCMCreateStreamReq createStreamReq = {0}; SStreamObj streamObj = {0}; + char* sql = NULL; if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createStreamReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; @@ -808,7 +780,6 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { goto _OVER; } - char* sql = NULL; int32_t sqlLen = 0; if(createStreamReq.sql != NULL){ sqlLen = strlen(createStreamReq.sql); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 408b664e50..44ee804d22 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -161,6 +161,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub, const SMqRebOutputVg *pRebVg, SSubplan* pPlan) { if (pRebVg->oldConsumerId == pRebVg->newConsumerId) { + if(pRebVg->oldConsumerId == -1) return 0; //drop stream, no consumer, while split vnode,all consumerId is -1 terrno = TSDB_CODE_MND_INVALID_SUB_OPTION; return -1; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 9d16402ee6..f077caace7 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -337,7 +337,7 @@ int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg) { while (pIter) { STqHandle* pHandle = *(STqHandle**)pIter; - tqInfo("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId); + tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId); if (ASSERT(pHandle->msg != NULL)) { tqError("pHandle->msg should not be null"); diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index f367bc96f8..8fee1d5904 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -72,7 +72,7 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) { memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen); pHandle->msg->contLen = pMsg->contLen; int32_t ret = taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey), &pHandle, POINTER_BYTES); - tqInfo("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64 ", register to pHandle:%p, pCont:%p, len:%d", vgId, ret, + tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64 ", register to pHandle:%p, pCont:%p, len:%d", vgId, ret, pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen); return 0; } diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-column.py b/tests/system-test/7-tmq/tmqVnodeSplit-column.py index 54a43465e7..87a73e981e 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit-column.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit-column.py @@ -188,7 +188,7 @@ class TDTestCase: expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - if expectrowcnt / 2 >= resultList[0]: + if expectrowcnt / 2 > resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0])) tdLog.exit("%d tmq consume rows error!"%consumerId)