feat:stream ignore check update data
This commit is contained in:
parent
1e0e417275
commit
e89362fb36
|
@ -357,6 +357,7 @@ typedef struct STableScanPhysiNode {
|
||||||
int64_t watermark;
|
int64_t watermark;
|
||||||
int8_t igExpired;
|
int8_t igExpired;
|
||||||
bool assignBlockUid;
|
bool assignBlockUid;
|
||||||
|
int8_t igCheckUpdate;
|
||||||
} STableScanPhysiNode;
|
} STableScanPhysiNode;
|
||||||
|
|
||||||
typedef STableScanPhysiNode STableSeqScanPhysiNode;
|
typedef STableScanPhysiNode STableSeqScanPhysiNode;
|
||||||
|
|
|
@ -474,6 +474,8 @@ typedef struct SStreamScanInfo {
|
||||||
int32_t blockRecoverContiCnt;
|
int32_t blockRecoverContiCnt;
|
||||||
int32_t blockRecoverTotCnt;
|
int32_t blockRecoverTotCnt;
|
||||||
|
|
||||||
|
int8_t igCheckUpdate;
|
||||||
|
int8_t igExpired;
|
||||||
} SStreamScanInfo;
|
} SStreamScanInfo;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -1109,7 +1109,7 @@ void initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup
|
||||||
SStreamScanInfo* pScanInfo = downstream->info;
|
SStreamScanInfo* pScanInfo = downstream->info;
|
||||||
pScanInfo->partitionSup = *pParSup;
|
pScanInfo->partitionSup = *pParSup;
|
||||||
pScanInfo->pPartScalarSup = pExpr;
|
pScanInfo->pPartScalarSup = pExpr;
|
||||||
if (!pScanInfo->pUpdateInfo) {
|
if (!pScanInfo->igCheckUpdate && !pScanInfo->pUpdateInfo) {
|
||||||
pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, 0);
|
pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2368,6 +2368,10 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
||||||
pInfo->pUpdateDataRes = createSpecialDataBlock(STREAM_CLEAR);
|
pInfo->pUpdateDataRes = createSpecialDataBlock(STREAM_CLEAR);
|
||||||
pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
|
pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
|
||||||
pInfo->partitionSup.needCalc = false;
|
pInfo->partitionSup.needCalc = false;
|
||||||
|
//todo(liuyao) for test
|
||||||
|
pTableScanNode->igCheckUpdate = true;
|
||||||
|
pInfo->igCheckUpdate = pTableScanNode->igCheckUpdate;
|
||||||
|
pInfo->igExpired = pTableScanNode->igExpired;
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "StreamScanOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo,
|
setOperatorInfo(pOperator, "StreamScanOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo,
|
||||||
pTaskInfo);
|
pTaskInfo);
|
||||||
|
|
|
@ -1726,7 +1726,7 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SAggSuppor
|
||||||
SStreamScanInfo* pScanInfo = downstream->info;
|
SStreamScanInfo* pScanInfo = downstream->info;
|
||||||
pScanInfo->windowSup.parentType = type;
|
pScanInfo->windowSup.parentType = type;
|
||||||
pScanInfo->windowSup.pIntervalAggSup = pSup;
|
pScanInfo->windowSup.pIntervalAggSup = pSup;
|
||||||
if (!pScanInfo->pUpdateInfo) {
|
if (!pScanInfo->igCheckUpdate && !pScanInfo->pUpdateInfo) {
|
||||||
pScanInfo->pUpdateInfo = updateInfoInitP(pInterval, pTwSup->waterMark);
|
pScanInfo->pUpdateInfo = updateInfoInitP(pInterval, pTwSup->waterMark);
|
||||||
}
|
}
|
||||||
pScanInfo->interval = *pInterval;
|
pScanInfo->interval = *pInterval;
|
||||||
|
@ -2893,7 +2893,7 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin
|
||||||
}
|
}
|
||||||
SStreamScanInfo* pScanInfo = downstream->info;
|
SStreamScanInfo* pScanInfo = downstream->info;
|
||||||
pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type};
|
pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type};
|
||||||
if (!pScanInfo->pUpdateInfo) {
|
if (!pScanInfo->igCheckUpdate && !pScanInfo->pUpdateInfo) {
|
||||||
pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark);
|
pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark);
|
||||||
}
|
}
|
||||||
pScanInfo->twAggSup = *pTwSup;
|
pScanInfo->twAggSup = *pTwSup;
|
||||||
|
|
|
@ -0,0 +1,62 @@
|
||||||
|
system sh/stop_dnodes.sh
|
||||||
|
system sh/deploy.sh -n dnode1 -i 1
|
||||||
|
system sh/exec.sh -n dnode1 -s start
|
||||||
|
sleep 50
|
||||||
|
sql connect
|
||||||
|
|
||||||
|
sql drop stream if exists streams0;
|
||||||
|
sql drop database if exists test;
|
||||||
|
sql create database test vgroups 1;
|
||||||
|
sql use test;
|
||||||
|
sql create table t1(ts timestamp, a int, b int , c int);
|
||||||
|
sql create stream streams0 trigger at_once ignore update 1 into streamt as select _wstart c1, count(*) c2, max(b) c3 from t1 interval(10s);
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791213000,1,1,1);
|
||||||
|
sql insert into t1 values(1648791213000,2,2,2);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loop0:
|
||||||
|
sleep 300
|
||||||
|
sql select * from streamt order by 1,2,3;
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 2 then
|
||||||
|
print =====data01=$data01
|
||||||
|
goto loop0
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data02 != 1 then
|
||||||
|
print =====data02=$data02
|
||||||
|
goto loop0
|
||||||
|
endi
|
||||||
|
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791213000,3,3,3);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loop1:
|
||||||
|
sleep 300
|
||||||
|
sql select * from streamt order by 1,2,3;
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 3 then
|
||||||
|
print =====data01=$data01
|
||||||
|
goto loop1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data02 != 1 then
|
||||||
|
print =====data02=$data02
|
||||||
|
goto loop1
|
||||||
|
endi
|
||||||
|
|
||||||
|
system sh/stop_dnodes.sh
|
Loading…
Reference in New Issue