feat: add ignore update option for create stream

This commit is contained in:
Xiaoyu Wang 2023-02-08 18:46:12 +08:00
parent bafe5a577e
commit 98e2fa2d7b
11 changed files with 26 additions and 1 deletions

View File

@ -93,6 +93,7 @@ typedef struct SScanLogicNode {
int64_t watermark; int64_t watermark;
int64_t deleteMark; int64_t deleteMark;
int8_t igExpired; int8_t igExpired;
int8_t igCheckUpdate;
SArray* pSmaIndexes; SArray* pSmaIndexes;
SNodeList* pGroupTags; SNodeList* pGroupTags;
bool groupSort; bool groupSort;
@ -217,6 +218,7 @@ typedef struct SWindowLogicNode {
int64_t watermark; int64_t watermark;
int64_t deleteMark; int64_t deleteMark;
int8_t igExpired; int8_t igExpired;
int8_t igCheckUpdate;
EWindowAlgorithm windowAlgo; EWindowAlgorithm windowAlgo;
EOrder inputTsOrder; EOrder inputTsOrder;
EOrder outputTsOrder; EOrder outputTsOrder;

View File

@ -36,6 +36,7 @@ typedef struct SPlanContext {
int64_t watermark; int64_t watermark;
int64_t deleteMark; int64_t deleteMark;
int8_t igExpired; int8_t igExpired;
int8_t igCheckUpdate;
char* pMsg; char* pMsg;
int32_t msgLen; int32_t msgLen;
const char* pUser; const char* pUser;

View File

@ -648,6 +648,7 @@ typedef struct {
int64_t checkpointFreq; // ms int64_t checkpointFreq; // ms
int64_t currentTick; // do not serialize int64_t currentTick; // do not serialize
int64_t deleteMark; int64_t deleteMark;
int8_t igCheckUpdate;
} SStreamObj; } SStreamObj;
int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj); int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj);

View File

@ -78,6 +78,7 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
// 3.0.20 // 3.0.20
if (tEncodeI64(pEncoder, pObj->checkpointFreq) < 0) return -1; if (tEncodeI64(pEncoder, pObj->checkpointFreq) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->igCheckUpdate) < 0) return -1;
tEndEncode(pEncoder); tEndEncode(pEncoder);
return pEncoder->pos; return pEncoder->pos;
@ -145,6 +146,7 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
// 3.0.20 // 3.0.20
if (sver >= 2) { if (sver >= 2) {
if (tDecodeI64(pDecoder, &pObj->checkpointFreq) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->checkpointFreq) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->igCheckUpdate) < 0) return -1;
} }
tEndDecode(pDecoder); tEndDecode(pDecoder);
return 0; return 0;
@ -489,7 +491,7 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
tlen += tEncodeSMqConsumerEp(buf, pConsumerEp); tlen += tEncodeSMqConsumerEp(buf, pConsumerEp);
cnt++; cnt++;
} }
if(cnt != sz) return -1; if (cnt != sz) return -1;
tlen += taosEncodeArray(buf, pSub->unassignedVgs, (FEncode)tEncodeSMqVgEp); tlen += taosEncodeArray(buf, pSub->unassignedVgs, (FEncode)tEncodeSMqVgEp);
tlen += taosEncodeString(buf, pSub->dbName); tlen += taosEncodeString(buf, pSub->dbName);
return tlen; return tlen;

View File

@ -297,6 +297,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
pObj->triggerParam = pCreate->maxDelay; pObj->triggerParam = pCreate->maxDelay;
pObj->watermark = pCreate->watermark; pObj->watermark = pCreate->watermark;
pObj->fillHistory = pCreate->fillHistory; pObj->fillHistory = pCreate->fillHistory;
pObj->igCheckUpdate = pCreate->igUpdate;
memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN); memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN);
SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB); SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB);
@ -345,6 +346,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
.triggerType = pObj->trigger == STREAM_TRIGGER_MAX_DELAY ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->trigger, .triggerType = pObj->trigger == STREAM_TRIGGER_MAX_DELAY ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->trigger,
.watermark = pObj->watermark, .watermark = pObj->watermark,
.igExpired = pObj->igExpired, .igExpired = pObj->igExpired,
.igCheckUpdate = pObj->igCheckUpdate,
}; };
// using ast and param to build physical plan // using ast and param to build physical plan

View File

@ -467,6 +467,7 @@ static int32_t logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* p
COPY_SCALAR_FIELD(watermark); COPY_SCALAR_FIELD(watermark);
COPY_SCALAR_FIELD(deleteMark); COPY_SCALAR_FIELD(deleteMark);
COPY_SCALAR_FIELD(igExpired); COPY_SCALAR_FIELD(igExpired);
COPY_SCALAR_FIELD(igCheckUpdate);
COPY_SCALAR_FIELD(windowAlgo); COPY_SCALAR_FIELD(windowAlgo);
COPY_SCALAR_FIELD(inputTsOrder); COPY_SCALAR_FIELD(inputTsOrder);
COPY_SCALAR_FIELD(outputTsOrder); COPY_SCALAR_FIELD(outputTsOrder);

View File

@ -1553,6 +1553,7 @@ static const char* jkTableScanPhysiPlanGroupSort = "GroupSort";
static const char* jkTableScanPhysiPlanTags = "Tags"; static const char* jkTableScanPhysiPlanTags = "Tags";
static const char* jkTableScanPhysiPlanSubtable = "Subtable"; static const char* jkTableScanPhysiPlanSubtable = "Subtable";
static const char* jkTableScanPhysiPlanAssignBlockUid = "AssignBlockUid"; static const char* jkTableScanPhysiPlanAssignBlockUid = "AssignBlockUid";
static const char* jkTableScanPhysiPlanIgnoreUpdate = "IgnoreUpdate";
static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) { static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj; const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj;
@ -1618,6 +1619,9 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanAssignBlockUid, pNode->assignBlockUid); code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanAssignBlockUid, pNode->assignBlockUid);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanIgnoreUpdate, pNode->igCheckUpdate);
}
return code; return code;
} }
@ -1686,6 +1690,9 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanAssignBlockUid, &pNode->assignBlockUid); code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanAssignBlockUid, &pNode->assignBlockUid);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkTableScanPhysiPlanIgnoreUpdate, &pNode->igCheckUpdate);
}
return code; return code;
} }

View File

@ -2078,6 +2078,9 @@ static int32_t physiTableScanNodeInlineToMsg(const void* pObj, STlvEncoder* pEnc
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueBool(pEncoder, pNode->assignBlockUid); code = tlvEncodeValueBool(pEncoder, pNode->assignBlockUid);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueI8(pEncoder, pNode->igCheckUpdate);
}
return code; return code;
} }
@ -2154,6 +2157,9 @@ static int32_t msgToPhysiTableScanNodeInline(STlvDecoder* pDecoder, void* pObj)
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueBool(pDecoder, &pNode->assignBlockUid); code = tlvDecodeValueBool(pDecoder, &pNode->assignBlockUid);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueI8(pDecoder, &pNode->igCheckUpdate);
}
return code; return code;
} }

View File

@ -705,6 +705,7 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm
pWindow->watermark = pCxt->pPlanCxt->watermark; pWindow->watermark = pCxt->pPlanCxt->watermark;
pWindow->deleteMark = pCxt->pPlanCxt->deleteMark; pWindow->deleteMark = pCxt->pPlanCxt->deleteMark;
pWindow->igExpired = pCxt->pPlanCxt->igExpired; pWindow->igExpired = pCxt->pPlanCxt->igExpired;
pWindow->igCheckUpdate = pCxt->pPlanCxt->igCheckUpdate;
} }
pWindow->inputTsOrder = ORDER_ASC; pWindow->inputTsOrder = ORDER_ASC;
pWindow->outputTsOrder = ORDER_ASC; pWindow->outputTsOrder = ORDER_ASC;

View File

@ -332,6 +332,7 @@ static void scanPathOptSetScanWin(SScanLogicNode* pScan) {
pScan->watermark = ((SWindowLogicNode*)pParent)->watermark; pScan->watermark = ((SWindowLogicNode*)pParent)->watermark;
pScan->deleteMark = ((SWindowLogicNode*)pParent)->deleteMark; pScan->deleteMark = ((SWindowLogicNode*)pParent)->deleteMark;
pScan->igExpired = ((SWindowLogicNode*)pParent)->igExpired; pScan->igExpired = ((SWindowLogicNode*)pParent)->igExpired;
pScan->igCheckUpdate = ((SWindowLogicNode*)pParent)->igCheckUpdate;
} }
} }

View File

@ -582,6 +582,7 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
pTableScan->triggerType = pScanLogicNode->triggerType; pTableScan->triggerType = pScanLogicNode->triggerType;
pTableScan->watermark = pScanLogicNode->watermark; pTableScan->watermark = pScanLogicNode->watermark;
pTableScan->igExpired = pScanLogicNode->igExpired; pTableScan->igExpired = pScanLogicNode->igExpired;
pTableScan->igCheckUpdate = pScanLogicNode->igCheckUpdate;
pTableScan->assignBlockUid = pCxt->pPlanCxt->rSmaQuery ? true : false; pTableScan->assignBlockUid = pCxt->pPlanCxt->rSmaQuery ? true : false;
int32_t code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode); int32_t code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode);