From 97498e1844da5884eb2bba8148a9c63b541943c8 Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Sun, 7 Apr 2024 11:20:43 +0800 Subject: [PATCH] fix create tsma txn and fix fetch wrong stream progress --- source/dnode/mnode/impl/src/mndSma.c | 2 +- source/libs/catalog/src/ctgAsync.c | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 0753114e13..e19a4c6382 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -1523,7 +1523,7 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) { mndTransSetDbName(pTrans, pCxt->pDb->name, NULL); if (mndTransCheckConflict(pCxt->pMnode, pTrans) != 0) goto _OVER; - mndTransSetParallel(pTrans); + mndTransSetSerial(pTrans); mInfo("trans:%d, used to create tsma:%s stream:%s", pTrans->id, pCxt->pCreateSmaReq->name, pCxt->pCreateStreamReq->name); diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 9f20755313..601e01f7e9 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -2845,7 +2845,8 @@ int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf STablesReq* pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx); SName* pTbName = taosArrayGet(pTbReq->pTables, pFetch->tbIdx); - CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target)); + if (reqType != TDMT_VND_GET_STREAM_PROGRESS) + CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target)); switch (reqType) { case TDMT_MND_GET_TABLE_TSMA: { @@ -2878,10 +2879,12 @@ int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf } } break; case TDMT_VND_GET_STREAM_PROGRESS: { + SStreamProgressRsp rsp = {0}; + CTG_ERR_JRET(ctgProcessRspMsg(&rsp, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target)); // update progress into res STableTSMAInfoRsp* pTsmasRsp = pRes->pRes; SArray* pTsmas = pTsmasRsp->pTsmas; - SStreamProgressRsp* pRsp = pMsgCtx->out; + SStreamProgressRsp* pRsp = &rsp; int32_t tsmaIdx = pRsp->subFetchIdx / pFetch->vgNum; STableTSMAInfo* pTsmaInfo = taosArrayGetP(pTsmas, tsmaIdx); if (pTsmaInfo->rspTs == 0) pTsmaInfo->fillHistoryFinished = true;