From a75c67efc8d7d98ad5bfe8b63c3e9f7d911b5f3b Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Fri, 23 Feb 2024 13:09:54 +0800 Subject: [PATCH] rebase with stream sub table name no group id --- include/common/tmsg.h | 1 + source/common/src/tmsg.c | 4 ++++ source/dnode/mnode/impl/src/mndScheduler.c | 2 +- source/dnode/mnode/impl/src/mndSma.c | 3 ++- source/dnode/mnode/impl/src/mndStream.c | 5 ++++- 5 files changed, 12 insertions(+), 3 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index f7a66bc50b..a88d6287ed 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2604,6 +2604,7 @@ typedef struct { SArray* pVgroupVerList; // 3.3.0.0 SArray* pCols; // array of SField + int64_t smaId; } SCMCreateStreamReq; typedef struct { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 0992e74c66..95bfdc3e44 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -7798,6 +7798,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS if (tEncodeCStr(&encoder, pField->name) < 0) return -1; } + if (tEncodeI64(&encoder, pReq->smaId) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -7924,6 +7925,9 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea } } } + if (!tDecodeIsEnd(&decoder)) { + if (tDecodeI64(&decoder, &pReq->smaId)< 0) return -1; + } tEndDecode(&decoder); tDecoderClear(&decoder); diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 9aba428ff6..fc2e3b4619 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -109,7 +109,7 @@ int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) { mDebug("mndSetSinkTaskInfo to sma or table, taskId:%s", pTask->id.idStr); - if (pStream->smaId != 0) { + if (pStream->smaId != 0 && pStream->subTableWithoutMd5 != 1) { pInfo->type = TASK_OUTPUT__SMA; pInfo->smaSink.smaId = pStream->smaId; } else { diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 1a211df4b2..54fed4b3e4 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -612,7 +612,6 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea streamObj.conf.triggerParam = pCreate->maxDelay; streamObj.ast = taosStrdup(smaObj.ast); streamObj.indexForMultiAggBalance = -1; - streamObj.subTableWithoutMd5 = 1; // check the maxDelay if (streamObj.conf.triggerParam < TSDB_MIN_ROLLUP_MAX_DELAY) { @@ -1452,6 +1451,8 @@ static void mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) { pCxt->pCreateStreamReq->fillNullCols = NULL; pCxt->pCreateStreamReq->igUpdate = 0; pCxt->pCreateStreamReq->lastTs = pCxt->pCreateSmaReq->lastTs; + pCxt->pCreateStreamReq->smaId = pCxt->pSma->uid; + //TODO remove this log mDebug("tsma create stream with last ts: %" PRId64 "vgversion size: %d", pCxt->pCreateSmaReq->lastTs, pCxt->pCreateStreamReq->pVgroupVerList ? pCxt->pCreateStreamReq->pVgroupVerList->size : 0); pCxt->pCreateStreamReq->ast = strdup(pCxt->pCreateSmaReq->ast); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 731c316c95..0e2bc83f7c 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -346,7 +346,10 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, pObj->createTime = taosGetTimestampMs(); pObj->updateTime = pObj->createTime; pObj->version = 1; - pObj->smaId = 0; + if (pCreate->smaId > 0) { + pObj->subTableWithoutMd5 = 1; + } + pObj->smaId = pCreate->smaId; pObj->indexForMultiAggBalance = -1; pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name));