ci(stream): add ignore expired data
This commit is contained in:
parent
06514e13df
commit
c9f133a5e4
|
@ -428,7 +428,7 @@ typedef struct SIntervalAggOperatorInfo {
|
|||
STimeWindowAggSupp twAggSup;
|
||||
bool invertible;
|
||||
SArray* pPrevValues; // SArray<SGroupKeys> used to keep the previous not null value for interpolation.
|
||||
bool ignoreCloseWindow;
|
||||
bool ignoreExpiredData;
|
||||
} SIntervalAggOperatorInfo;
|
||||
|
||||
typedef struct SStreamFinalIntervalOperatorInfo {
|
||||
|
@ -450,7 +450,7 @@ typedef struct SStreamFinalIntervalOperatorInfo {
|
|||
SArray* pPullWins; // SPullWindowInfo
|
||||
int32_t pullIndex;
|
||||
SSDataBlock* pPullDataRes;
|
||||
bool ignoreCloseWindow;
|
||||
bool ignoreExpiredData;
|
||||
} SStreamFinalIntervalOperatorInfo;
|
||||
|
||||
typedef struct SAggOperatorInfo {
|
||||
|
@ -588,7 +588,7 @@ typedef struct SStreamSessionAggOperatorInfo {
|
|||
SArray* pChildren; // cache for children's result; final stream operator
|
||||
SPhysiNode* pPhyNode; // create new child
|
||||
bool isFinal;
|
||||
bool ignoreCloseWindow;
|
||||
bool ignoreExpiredData;
|
||||
} SStreamSessionAggOperatorInfo;
|
||||
|
||||
typedef struct STimeSliceOperatorInfo {
|
||||
|
@ -632,7 +632,7 @@ typedef struct SStreamStateAggOperatorInfo {
|
|||
void* pDelIterator;
|
||||
SArray* pScanWindow;
|
||||
SArray* pChildren; // cache for children's result;
|
||||
bool ignoreCloseWindow;
|
||||
bool ignoreExpiredData;
|
||||
} SStreamStateAggOperatorInfo;
|
||||
|
||||
typedef struct SSortedMergeOperatorInfo {
|
||||
|
|
|
@ -1031,10 +1031,12 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
|||
}
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
|
||||
} else {
|
||||
if (isStateWindow(pInfo) && taosArrayGetSize(pInfo->sessionSup.pStreamAggSup->pScanWindow) > 0) {
|
||||
if (isStateWindow(pInfo)) {
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
|
||||
pInfo->updateResIndex = pInfo->pUpdateRes->info.rows;
|
||||
prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
|
||||
if (!prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex)) {
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||
}
|
||||
}
|
||||
if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER) {
|
||||
SSDataBlock* pSDB = doDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
|
||||
|
|
|
@ -838,7 +838,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
|||
STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval,
|
||||
pInfo->interval.precision, &pInfo->win);
|
||||
int32_t ret = TSDB_CODE_SUCCESS;
|
||||
if (!pInfo->ignoreCloseWindow || !isCloseWindow(&win, &pInfo->twAggSup)) {
|
||||
if (!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) {
|
||||
ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
|
||||
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
|
||||
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
|
||||
|
@ -871,7 +871,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
|||
doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup);
|
||||
}
|
||||
|
||||
if (!pInfo->ignoreCloseWindow || !isCloseWindow(&win, &pInfo->twAggSup)) {
|
||||
if (!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) {
|
||||
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true);
|
||||
doApplyFunctions(pTaskInfo, pSup->pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols,
|
||||
pBlock->info.rows, numOfOutput, pInfo->order);
|
||||
|
@ -886,7 +886,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
|||
if (startPos < 0) {
|
||||
break;
|
||||
}
|
||||
if (pInfo->ignoreCloseWindow && isCloseWindow(&nextWin, &pInfo->twAggSup)) {
|
||||
if (pInfo->ignoreExpiredData && isCloseWindow(&nextWin, &pInfo->twAggSup)) {
|
||||
ekey = ascScan ? nextWin.ekey : nextWin.skey;
|
||||
forwardRows =
|
||||
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order);
|
||||
|
@ -1535,7 +1535,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
pInfo->interval = *pInterval;
|
||||
pInfo->execModel = pTaskInfo->execModel;
|
||||
pInfo->twAggSup = *pTwAggSupp;
|
||||
pInfo->ignoreCloseWindow = false;
|
||||
pInfo->ignoreExpiredData = pPhyNode->window.igExpired;
|
||||
|
||||
if (pPhyNode->window.pExprs != NULL) {
|
||||
int32_t numOfScalar = 0;
|
||||
|
@ -2292,7 +2292,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
|
|||
pInfo->interval.precision, NULL);
|
||||
while (1) {
|
||||
bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup);
|
||||
if (pInfo->ignoreCloseWindow && isClosed) {
|
||||
if (pInfo->ignoreExpiredData && isClosed) {
|
||||
startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin);
|
||||
if (startPos < 0) {
|
||||
break;
|
||||
|
@ -2710,7 +2710,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
|||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pPullDataMap = taosHashInit(64, hashFn, false, HASH_NO_LOCK);
|
||||
pInfo->pPullDataRes = createPullDataBlock();
|
||||
pInfo->ignoreCloseWindow = false;
|
||||
pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired;
|
||||
|
||||
pOperator->operatorType = pPhyNode->type;
|
||||
pOperator->blocking = true;
|
||||
|
@ -2857,7 +2857,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
|
|||
pInfo->pChildren = NULL;
|
||||
pInfo->isFinal = false;
|
||||
pInfo->pPhyNode = pPhyNode;
|
||||
pInfo->ignoreCloseWindow = false;
|
||||
pInfo->ignoreExpiredData = pSessionNode->window.igExpired;
|
||||
|
||||
pOperator->name = "StreamSessionWindowAggOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
|
||||
|
@ -3133,7 +3133,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
|
|||
|
||||
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||
for (int32_t i = 0; i < pSDataBlock->info.rows;) {
|
||||
if (pInfo->ignoreCloseWindow && isOverdue(endTsCols[i], &pInfo->twAggSup)) {
|
||||
if (pInfo->ignoreExpiredData && isOverdue(endTsCols[i], &pInfo->twAggSup)) {
|
||||
i++;
|
||||
continue;
|
||||
}
|
||||
|
@ -3413,8 +3413,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
|||
pOperator->status = OP_RES_TO_RETURN;
|
||||
|
||||
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated,
|
||||
getResWinForSession, pInfo->ignoreCloseWindow);
|
||||
closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreCloseWindow);
|
||||
getResWinForSession, pInfo->ignoreExpiredData);
|
||||
closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreExpiredData);
|
||||
copyUpdateResult(pStUpdated, pUpdated);
|
||||
taosHashCleanup(pStUpdated);
|
||||
|
||||
|
@ -3822,7 +3822,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
|
|||
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||
SColumnInfoData* pKeyColInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->stateCol.slotId);
|
||||
for (int32_t i = 0; i < pSDataBlock->info.rows; i += winRows) {
|
||||
if (pInfo->ignoreCloseWindow && isOverdue(tsCols[i], &pInfo->twAggSup)) {
|
||||
if (pInfo->ignoreExpiredData && isOverdue(tsCols[i], &pInfo->twAggSup)) {
|
||||
i++;
|
||||
continue;
|
||||
}
|
||||
|
@ -3866,12 +3866,14 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
|||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
doBuildDeleteDataBlock(pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
|
||||
if (pInfo->pDelRes->info.rows > 0) {
|
||||
printDataBlock(pInfo->pDelRes, "single state");
|
||||
return pInfo->pDelRes;
|
||||
}
|
||||
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
|
||||
if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
}
|
||||
printDataBlock(pBInfo->pRes, "single state");
|
||||
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
|
||||
}
|
||||
|
||||
|
@ -3884,6 +3886,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
|||
if (pBlock == NULL) {
|
||||
break;
|
||||
}
|
||||
printDataBlock(pBlock, "single state recv");
|
||||
|
||||
if (pBlock->info.type == STREAM_CLEAR) {
|
||||
doClearStateWindows(&pInfo->streamAggSup, pBlock, pInfo->primaryTsIndex, &pInfo->stateCol, pInfo->stateCol.slotId,
|
||||
|
@ -3903,8 +3906,8 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
|||
pOperator->status = OP_RES_TO_RETURN;
|
||||
|
||||
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated,
|
||||
getResWinForState, pInfo->ignoreCloseWindow);
|
||||
closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreCloseWindow);
|
||||
getResWinForState, pInfo->ignoreExpiredData);
|
||||
closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreExpiredData);
|
||||
copyUpdateResult(pSeUpdated, pUpdated);
|
||||
taosHashCleanup(pSeUpdated);
|
||||
|
||||
|
@ -3914,9 +3917,11 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
|||
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||
doBuildDeleteDataBlock(pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
|
||||
if (pInfo->pDelRes->info.rows > 0) {
|
||||
printDataBlock(pInfo->pDelRes, "single state");
|
||||
return pInfo->pDelRes;
|
||||
}
|
||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
|
||||
printDataBlock(pBInfo->pRes, "single state");
|
||||
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
|
||||
}
|
||||
|
||||
|
@ -3978,7 +3983,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
pInfo->pDelRes->info.type = STREAM_DELETE;
|
||||
blockDataEnsureCapacity(pInfo->pDelRes, 64);
|
||||
pInfo->pChildren = NULL;
|
||||
pInfo->ignoreCloseWindow = false;
|
||||
pInfo->ignoreExpiredData = pStateNode->window.igExpired;
|
||||
|
||||
pOperator->name = "StreamStateAggOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE;
|
||||
|
|
|
@ -83,9 +83,9 @@ sql insert into ts3 values(1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0)
|
|||
|
||||
$loop_count = 0
|
||||
loop1:
|
||||
sleep 300
|
||||
sql select * from streamtST1;
|
||||
|
||||
sleep 300
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
|
|
|
@ -10,6 +10,30 @@ sql create dnode $hostname2 port 7200
|
|||
|
||||
system sh/exec.sh -n dnode2 -s start
|
||||
|
||||
print ===== step1
|
||||
$x = 0
|
||||
step1:
|
||||
$x = $x + 1
|
||||
sleep 1000
|
||||
if $x == 10 then
|
||||
print ====> dnode not ready!
|
||||
return -1
|
||||
endi
|
||||
sql show dnodes
|
||||
print ===> $data00 $data01 $data02 $data03 $data04 $data05
|
||||
print ===> $data10 $data11 $data12 $data13 $data14 $data15
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
if $data(1)[4] != ready then
|
||||
goto step1
|
||||
endi
|
||||
if $data(2)[4] != ready then
|
||||
goto step1
|
||||
endi
|
||||
|
||||
print ===== step2
|
||||
|
||||
sql create database test vgroups 4;
|
||||
sql use test;
|
||||
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
|
||||
|
|
|
@ -0,0 +1,161 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/deploy.sh -n dnode2 -i 2
|
||||
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sleep 50
|
||||
sql connect
|
||||
|
||||
sql create dnode $hostname2 port 7200
|
||||
|
||||
system sh/exec.sh -n dnode2 -s start
|
||||
|
||||
print ===== step1
|
||||
$x = 0
|
||||
step1:
|
||||
$x = $x + 1
|
||||
sleep 1000
|
||||
if $x == 10 then
|
||||
print ====> dnode not ready!
|
||||
return -1
|
||||
endi
|
||||
sql show dnodes
|
||||
print ===> $data00 $data01 $data02 $data03 $data04 $data05
|
||||
print ===> $data10 $data11 $data12 $data13 $data14 $data15
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
if $data(1)[4] != ready then
|
||||
goto step1
|
||||
endi
|
||||
if $data(2)[4] != ready then
|
||||
goto step1
|
||||
endi
|
||||
|
||||
print ===== step2
|
||||
|
||||
print =============== create database
|
||||
sql create database test vgroups 1
|
||||
sql show databases
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print $data00 $data01 $data02
|
||||
|
||||
sql use test
|
||||
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||
sql create stream streams1 trigger at_once IGNORE EXPIRED into streamt1 as select _wstartts, count(*) c1, sum(a) c3 from t1 interval(10s);
|
||||
sql create stream streams2 trigger at_once IGNORE EXPIRED into streamt2 as select _wstartts, count(*) c1, sum(a) c3 from t1 session(ts,10s);
|
||||
sql create stream streams3 trigger at_once IGNORE EXPIRED into streamt3 as select _wstartts, count(*) c1, sum(a) c3 from t1 state_window(a);
|
||||
sql insert into t1 values(1648791213000,1,2,3,1.0);
|
||||
sql insert into t1 values(1648791223001,1,2,3,1.1);
|
||||
sql insert into t1 values(1648791233002,2,2,3,2.1);
|
||||
sql insert into t1 values(1648791243003,2,2,3,3.1);
|
||||
sql insert into t1 values(1648791200000,4,2,3,4.1);
|
||||
|
||||
$loop_count = 0
|
||||
loop1:
|
||||
sleep 300
|
||||
sql select * from streamt1;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $rows != 4 then
|
||||
print =====rows=$rows
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
$loop_count = 0
|
||||
loop2:
|
||||
sleep 300
|
||||
sql select * from streamt2;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $rows != 4 then
|
||||
print =====rows=$rows
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
$loop_count = 0
|
||||
loop3:
|
||||
sleep 300
|
||||
sql select * from streamt3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $rows != 2 then
|
||||
print =====rows=$rows
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
|
||||
print =============== create database
|
||||
sql create database test1 vgroups 4
|
||||
sql show databases
|
||||
|
||||
print ======database=$rows
|
||||
|
||||
sql use test1
|
||||
|
||||
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
|
||||
sql create table ts1 using st tags(1,1,1);
|
||||
sql create table ts2 using st tags(2,2,2);
|
||||
sql create stream stream_t1 trigger at_once IGNORE EXPIRED into streamtST1 as select _wstartts, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6 from st interval(10s) ;
|
||||
sql create stream stream_t2 trigger at_once IGNORE EXPIRED into streamtST2 as select _wstartts, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6 from st session(ts, 10s) ;
|
||||
sql insert into ts1 values(1648791211000,1,2,3);
|
||||
sql insert into ts1 values(1648791222001,2,2,3);
|
||||
sql insert into ts2 values(1648791211000,1,2,3);
|
||||
sql insert into ts2 values(1648791222001,2,2,3);
|
||||
|
||||
$loop_count = 0
|
||||
loop4:
|
||||
sleep 300
|
||||
sql select * from streamtST1;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 1 then
|
||||
print =====data01=$data01
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data02 != 1 then
|
||||
print =====data02=$data02
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
$loop_count = 0
|
||||
loop5:
|
||||
sleep 300
|
||||
sql select * from streamtST2;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 1 then
|
||||
print =====data01=$data01
|
||||
goto loop5
|
||||
endi
|
||||
|
||||
if $data02 != 1 then
|
||||
print =====data02=$data02
|
||||
goto loop5
|
||||
endi
|
||||
|
||||
system sh/stop_dnodes.sh
|
Loading…
Reference in New Issue