rebase with stream sub table name no group id

This commit is contained in:
wangjiaming0909 2024-02-23 13:09:54 +08:00
parent c3e73d9168
commit a75c67efc8
5 changed files with 12 additions and 3 deletions

View File

@ -2604,6 +2604,7 @@ typedef struct {
SArray* pVgroupVerList;
// 3.3.0.0
SArray* pCols; // array of SField
int64_t smaId;
} SCMCreateStreamReq;
typedef struct {

View File

@ -7798,6 +7798,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
}
if (tEncodeI64(&encoder, pReq->smaId) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -7924,6 +7925,9 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
}
}
}
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI64(&decoder, &pReq->smaId)< 0) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);

View File

@ -109,7 +109,7 @@ int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) {
mDebug("mndSetSinkTaskInfo to sma or table, taskId:%s", pTask->id.idStr);
if (pStream->smaId != 0) {
if (pStream->smaId != 0 && pStream->subTableWithoutMd5 != 1) {
pInfo->type = TASK_OUTPUT__SMA;
pInfo->smaSink.smaId = pStream->smaId;
} else {

View File

@ -612,7 +612,6 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
streamObj.conf.triggerParam = pCreate->maxDelay;
streamObj.ast = taosStrdup(smaObj.ast);
streamObj.indexForMultiAggBalance = -1;
streamObj.subTableWithoutMd5 = 1;
// check the maxDelay
if (streamObj.conf.triggerParam < TSDB_MIN_ROLLUP_MAX_DELAY) {
@ -1452,6 +1451,8 @@ static void mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) {
pCxt->pCreateStreamReq->fillNullCols = NULL;
pCxt->pCreateStreamReq->igUpdate = 0;
pCxt->pCreateStreamReq->lastTs = pCxt->pCreateSmaReq->lastTs;
pCxt->pCreateStreamReq->smaId = pCxt->pSma->uid;
//TODO remove this log
mDebug("tsma create stream with last ts: %" PRId64 "vgversion size: %d", pCxt->pCreateSmaReq->lastTs,
pCxt->pCreateStreamReq->pVgroupVerList ? pCxt->pCreateStreamReq->pVgroupVerList->size : 0);
pCxt->pCreateStreamReq->ast = strdup(pCxt->pCreateSmaReq->ast);

View File

@ -346,7 +346,10 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
pObj->createTime = taosGetTimestampMs();
pObj->updateTime = pObj->createTime;
pObj->version = 1;
pObj->smaId = 0;
if (pCreate->smaId > 0) {
pObj->subTableWithoutMd5 = 1;
}
pObj->smaId = pCreate->smaId;
pObj->indexForMultiAggBalance = -1;
pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name));