feat:do not check whether the expired data has been update
This commit is contained in:
parent
0c9b2c0388
commit
40987fa507
|
@ -1435,7 +1435,12 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
|
||||||
dumyInfo.cur.pageId = -1;
|
dumyInfo.cur.pageId = -1;
|
||||||
bool isClosed = false;
|
bool isClosed = false;
|
||||||
STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX};
|
STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX};
|
||||||
if (tableInserted && isOverdue(tsCol[rowId], &pInfo->twAggSup)) {
|
bool overDue = isOverdue(tsCol[rowId], &pInfo->twAggSup);
|
||||||
|
if (pInfo->igExpired && overDue) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tableInserted && overDue) {
|
||||||
win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC);
|
win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC);
|
||||||
isClosed = isCloseWindow(&win, &pInfo->twAggSup);
|
isClosed = isCloseWindow(&win, &pInfo->twAggSup);
|
||||||
}
|
}
|
||||||
|
@ -1701,41 +1706,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
SStreamScanInfo* pInfo = pOperator->info;
|
SStreamScanInfo* pInfo = pOperator->info;
|
||||||
|
|
||||||
qDebug("stream scan called");
|
qDebug("stream scan called");
|
||||||
#if 0
|
|
||||||
SStreamState* pState = pTaskInfo->streamInfo.pState;
|
|
||||||
if (pState) {
|
|
||||||
printf(">>>>>>>> stream write backend\n");
|
|
||||||
SWinKey key = {
|
|
||||||
.ts = 1,
|
|
||||||
.groupId = 2,
|
|
||||||
};
|
|
||||||
char tmp[100] = "abcdefg1";
|
|
||||||
if (streamStatePut(pState, &key, &tmp, strlen(tmp) + 1) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
key.ts = 2;
|
|
||||||
char tmp2[100] = "abcdefg2";
|
|
||||||
if (streamStatePut(pState, &key, &tmp2, strlen(tmp2) + 1) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
key.groupId = 5;
|
|
||||||
key.ts = 1;
|
|
||||||
char tmp3[100] = "abcdefg3";
|
|
||||||
if (streamStatePut(pState, &key, &tmp3, strlen(tmp3) + 1) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
char* val2 = NULL;
|
|
||||||
int32_t sz;
|
|
||||||
if (streamStateGet(pState, &key, (void**)&val2, &sz) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
printf("stream read %s %d\n", val2, sz);
|
|
||||||
streamFreeVal(val2);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 ||
|
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 ||
|
||||||
pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) {
|
pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) {
|
||||||
|
@ -2370,6 +2340,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
||||||
pInfo->partitionSup.needCalc = false;
|
pInfo->partitionSup.needCalc = false;
|
||||||
pInfo->igCheckUpdate = pTableScanNode->igCheckUpdate;
|
pInfo->igCheckUpdate = pTableScanNode->igCheckUpdate;
|
||||||
pInfo->igExpired = pTableScanNode->igExpired;
|
pInfo->igExpired = pTableScanNode->igExpired;
|
||||||
|
pInfo->twAggSup.maxTs = INT64_MIN;
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "StreamScanOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo,
|
setOperatorInfo(pOperator, "StreamScanOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo,
|
||||||
pTaskInfo);
|
pTaskInfo);
|
||||||
|
|
|
@ -343,6 +343,13 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
|
||||||
|
|
||||||
pScan->node.groupAction = GROUP_ACTION_NONE;
|
pScan->node.groupAction = GROUP_ACTION_NONE;
|
||||||
pScan->node.resultDataOrder = DATA_ORDER_LEVEL_IN_BLOCK;
|
pScan->node.resultDataOrder = DATA_ORDER_LEVEL_IN_BLOCK;
|
||||||
|
if (pCxt->pPlanCxt->streamQuery) {
|
||||||
|
pScan->triggerType = pCxt->pPlanCxt->triggerType;
|
||||||
|
pScan->watermark = pCxt->pPlanCxt->watermark;
|
||||||
|
pScan->deleteMark = pCxt->pPlanCxt->deleteMark;
|
||||||
|
pScan->igExpired = pCxt->pPlanCxt->igExpired;
|
||||||
|
pScan->igCheckUpdate = pCxt->pPlanCxt->igCheckUpdate;
|
||||||
|
}
|
||||||
|
|
||||||
// set columns to scan
|
// set columns to scan
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
|
|
@ -328,11 +328,6 @@ static void scanPathOptSetScanWin(SScanLogicNode* pScan) {
|
||||||
pScan->sliding = ((SWindowLogicNode*)pParent)->sliding;
|
pScan->sliding = ((SWindowLogicNode*)pParent)->sliding;
|
||||||
pScan->intervalUnit = ((SWindowLogicNode*)pParent)->intervalUnit;
|
pScan->intervalUnit = ((SWindowLogicNode*)pParent)->intervalUnit;
|
||||||
pScan->slidingUnit = ((SWindowLogicNode*)pParent)->slidingUnit;
|
pScan->slidingUnit = ((SWindowLogicNode*)pParent)->slidingUnit;
|
||||||
pScan->triggerType = ((SWindowLogicNode*)pParent)->triggerType;
|
|
||||||
pScan->watermark = ((SWindowLogicNode*)pParent)->watermark;
|
|
||||||
pScan->deleteMark = ((SWindowLogicNode*)pParent)->deleteMark;
|
|
||||||
pScan->igExpired = ((SWindowLogicNode*)pParent)->igExpired;
|
|
||||||
pScan->igCheckUpdate = ((SWindowLogicNode*)pParent)->igCheckUpdate;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -52,6 +52,7 @@ sql insert into t1 values(1648791213000,1,2,3,1.0);
|
||||||
sql insert into t1 values(1648791223001,1,2,3,1.1);
|
sql insert into t1 values(1648791223001,1,2,3,1.1);
|
||||||
sql insert into t1 values(1648791233002,2,2,3,2.1);
|
sql insert into t1 values(1648791233002,2,2,3,2.1);
|
||||||
sql insert into t1 values(1648791243003,2,2,3,3.1);
|
sql insert into t1 values(1648791243003,2,2,3,3.1);
|
||||||
|
sleep 300
|
||||||
sql insert into t1 values(1648791200000,4,2,3,4.1);
|
sql insert into t1 values(1648791200000,4,2,3,4.1);
|
||||||
|
|
||||||
$loop_count = 0
|
$loop_count = 0
|
||||||
|
@ -115,6 +116,7 @@ sql create stream stream_t1 trigger at_once IGNORE EXPIRED 1 into streamtST1 as
|
||||||
sql create stream stream_t2 trigger at_once IGNORE EXPIRED 1 into streamtST2 as select _wstart, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6 from st session(ts, 10s) ;
|
sql create stream stream_t2 trigger at_once IGNORE EXPIRED 1 into streamtST2 as select _wstart, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6 from st session(ts, 10s) ;
|
||||||
sql insert into ts1 values(1648791211000,1,2,3);
|
sql insert into ts1 values(1648791211000,1,2,3);
|
||||||
sql insert into ts1 values(1648791222001,2,2,3);
|
sql insert into ts1 values(1648791222001,2,2,3);
|
||||||
|
sleep 300
|
||||||
sql insert into ts2 values(1648791211000,1,2,3);
|
sql insert into ts2 values(1648791211000,1,2,3);
|
||||||
sql insert into ts2 values(1648791222001,2,2,3);
|
sql insert into ts2 values(1648791222001,2,2,3);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue