diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 799f4e368b..80fa0d3e78 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -93,6 +93,7 @@ typedef struct SScanLogicNode { int64_t watermark; int64_t deleteMark; int8_t igExpired; + int8_t igCheckUpdate; SArray* pSmaIndexes; SNodeList* pGroupTags; bool groupSort; @@ -217,6 +218,7 @@ typedef struct SWindowLogicNode { int64_t watermark; int64_t deleteMark; int8_t igExpired; + int8_t igCheckUpdate; EWindowAlgorithm windowAlgo; EOrder inputTsOrder; EOrder outputTsOrder; diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index f7bd68393e..41c0e98084 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -36,6 +36,7 @@ typedef struct SPlanContext { int64_t watermark; int64_t deleteMark; int8_t igExpired; + int8_t igCheckUpdate; char* pMsg; int32_t msgLen; const char* pUser; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 2f824b48b4..6f6f801c39 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -648,6 +648,7 @@ typedef struct { int64_t checkpointFreq; // ms int64_t currentTick; // do not serialize int64_t deleteMark; + int8_t igCheckUpdate; } SStreamObj; int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj); diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index a5f77513de..75177f4158 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -78,6 +78,7 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { // 3.0.20 if (tEncodeI64(pEncoder, pObj->checkpointFreq) < 0) return -1; + if (tEncodeI8(pEncoder, pObj->igCheckUpdate) < 0) return -1; tEndEncode(pEncoder); return pEncoder->pos; @@ -145,6 +146,7 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) { // 3.0.20 if (sver >= 2) { if (tDecodeI64(pDecoder, &pObj->checkpointFreq) < 0) return -1; + if (tDecodeI8(pDecoder, &pObj->igCheckUpdate) < 0) return -1; } tEndDecode(pDecoder); return 0; @@ -489,7 +491,7 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) { tlen += tEncodeSMqConsumerEp(buf, pConsumerEp); cnt++; } - if(cnt != sz) return -1; + if (cnt != sz) return -1; tlen += taosEncodeArray(buf, pSub->unassignedVgs, (FEncode)tEncodeSMqVgEp); tlen += taosEncodeString(buf, pSub->dbName); return tlen; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 8f6c7e19ed..61e9fc5366 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -297,6 +297,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, pObj->triggerParam = pCreate->maxDelay; pObj->watermark = pCreate->watermark; pObj->fillHistory = pCreate->fillHistory; + pObj->igCheckUpdate = pCreate->igUpdate; memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN); SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB); @@ -345,6 +346,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, .triggerType = pObj->trigger == STREAM_TRIGGER_MAX_DELAY ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->trigger, .watermark = pObj->watermark, .igExpired = pObj->igExpired, + .igCheckUpdate = pObj->igCheckUpdate, }; // using ast and param to build physical plan diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 7b9c54ab2c..9da52edf32 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -467,6 +467,7 @@ static int32_t logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* p COPY_SCALAR_FIELD(watermark); COPY_SCALAR_FIELD(deleteMark); COPY_SCALAR_FIELD(igExpired); + COPY_SCALAR_FIELD(igCheckUpdate); COPY_SCALAR_FIELD(windowAlgo); COPY_SCALAR_FIELD(inputTsOrder); COPY_SCALAR_FIELD(outputTsOrder); diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 1a4ba8e43e..c19f64bb3a 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1553,6 +1553,7 @@ static const char* jkTableScanPhysiPlanGroupSort = "GroupSort"; static const char* jkTableScanPhysiPlanTags = "Tags"; static const char* jkTableScanPhysiPlanSubtable = "Subtable"; static const char* jkTableScanPhysiPlanAssignBlockUid = "AssignBlockUid"; +static const char* jkTableScanPhysiPlanIgnoreUpdate = "IgnoreUpdate"; static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) { const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj; @@ -1618,6 +1619,9 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanAssignBlockUid, pNode->assignBlockUid); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanIgnoreUpdate, pNode->igCheckUpdate); + } return code; } @@ -1686,6 +1690,9 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanAssignBlockUid, &pNode->assignBlockUid); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetTinyIntValue(pJson, jkTableScanPhysiPlanIgnoreUpdate, &pNode->igCheckUpdate); + } return code; } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index dfe2f6c8b9..274cac405d 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -2078,6 +2078,9 @@ static int32_t physiTableScanNodeInlineToMsg(const void* pObj, STlvEncoder* pEnc if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeValueBool(pEncoder, pNode->assignBlockUid); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeValueI8(pEncoder, pNode->igCheckUpdate); + } return code; } @@ -2154,6 +2157,9 @@ static int32_t msgToPhysiTableScanNodeInline(STlvDecoder* pDecoder, void* pObj) if (TSDB_CODE_SUCCESS == code) { code = tlvDecodeValueBool(pDecoder, &pNode->assignBlockUid); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvDecodeValueI8(pDecoder, &pNode->igCheckUpdate); + } return code; } diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index bd1823a770..a384babb94 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -705,6 +705,7 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm pWindow->watermark = pCxt->pPlanCxt->watermark; pWindow->deleteMark = pCxt->pPlanCxt->deleteMark; pWindow->igExpired = pCxt->pPlanCxt->igExpired; + pWindow->igCheckUpdate = pCxt->pPlanCxt->igCheckUpdate; } pWindow->inputTsOrder = ORDER_ASC; pWindow->outputTsOrder = ORDER_ASC; diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index e901297f4d..ff59087e02 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -332,6 +332,7 @@ static void scanPathOptSetScanWin(SScanLogicNode* pScan) { pScan->watermark = ((SWindowLogicNode*)pParent)->watermark; pScan->deleteMark = ((SWindowLogicNode*)pParent)->deleteMark; pScan->igExpired = ((SWindowLogicNode*)pParent)->igExpired; + pScan->igCheckUpdate = ((SWindowLogicNode*)pParent)->igCheckUpdate; } } diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index df10fe8ee3..f016ca165b 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -582,6 +582,7 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp pTableScan->triggerType = pScanLogicNode->triggerType; pTableScan->watermark = pScanLogicNode->watermark; pTableScan->igExpired = pScanLogicNode->igExpired; + pTableScan->igCheckUpdate = pScanLogicNode->igCheckUpdate; pTableScan->assignBlockUid = pCxt->pPlanCxt->rSmaQuery ? true : false; int32_t code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode);