This commit is contained in:
54liuyao 2024-10-17 16:43:47 +08:00
parent 4a34ae2570
commit 24b963d621
2 changed files with 147 additions and 4 deletions

View File

@ -26,6 +26,7 @@
typedef struct SInervalSlicePoint {
SSessionKey winKey;
bool *pFinished;
SSliceRowData* pLastRow;
SRowBuffPos* pResPos;
} SInervalSlicePoint;
@ -113,7 +114,8 @@ _end:
void initIntervalSlicePoint(SStreamAggSupporter* pAggSup, STimeWindow* pTWin, int64_t groupId, SInervalSlicePoint* pPoint) {
pPoint->winKey.groupId = groupId;
pPoint->winKey.win = *pTWin;
pPoint->pLastRow = POINTER_SHIFT(pPoint->pResPos->pRowBuff, pAggSup->resultRowSize - pAggSup->stateKeySize);
pPoint->pFinished = POINTER_SHIFT(pPoint->pResPos->pRowBuff, pAggSup->resultRowSize - pAggSup->stateKeySize);
pPoint->pLastRow = POINTER_SHIFT(pPoint->pFinished, sizeof(bool));
}
static int32_t getIntervalSliceCurStateBuf(SStreamAggSupporter* pAggSup, SInterval* pInterval, bool needPrev, STimeWindow* pTWin, int64_t groupId,
@ -217,6 +219,14 @@ _end:
return code;
}
static void setInterpoWindowFinished(SInervalSlicePoint* pPoint) {
(*pPoint->pFinished) = true;
}
static bool isInterpoWindowFinished(SInervalSlicePoint* pPoint) {
return *pPoint->pFinished;
}
static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock, SSHashObj* pUpdatedMap,
SSHashObj* pDeletedMap) {
int32_t code = TSDB_CODE_SUCCESS;
@ -249,7 +259,7 @@ static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBloc
code = getIntervalSliceCurStateBuf(&pInfo->streamAggSup, &pInfo->interval, pInfo->hasInterpoFunc, &curWin, groupId, &curPoint, &prevPoint, &winCode);
QUERY_CHECK_CODE(code, lino, _end);
if (pInfo->hasInterpoFunc && IS_VALID_WIN_KEY(prevPoint.winKey.win.skey) && prevPoint.pLastRow->key != prevPoint.winKey.win.ekey) {
if (pInfo->hasInterpoFunc && IS_VALID_WIN_KEY(prevPoint.winKey.win.skey) && isInterpoWindowFinished(&prevPoint) == false) {
code = setIntervalSliceOutputBuf(&prevPoint, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
QUERY_CHECK_CODE(code, lino, _end);
@ -262,7 +272,7 @@ static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBloc
SWinKey prevKey = {.ts = prevPoint.winKey.win.skey, .groupId = prevPoint.winKey.groupId};
code = saveWinResult(&prevKey, prevPoint.pResPos, pInfo->pUpdatedMap);
QUERY_CHECK_CODE(code, lino, _end);
prevPoint.pLastRow->key = prevPoint.winKey.win.ekey;
setInterpoWindowFinished(&prevPoint);
}
code = setIntervalSliceOutputBuf(&curPoint, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
@ -294,6 +304,10 @@ static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBloc
forwardRows, pBlock->info.rows, numOfOutput);
QUERY_CHECK_CODE(code, lino, _end);
if (curPoint.pLastRow->key == curPoint.winKey.win.ekey) {
setInterpoWindowFinished(&curPoint);
}
startPos = getNextQualifiedWindow(&pInfo->interval, &curWin, &pBlock->info, tsCols, prevEndPos, TSDB_ORDER_ASC);
if (startPos < 0) {
break;
@ -569,7 +583,7 @@ int32_t createStreamIntervalSliceOperatorInfo(SOperatorInfo* downstream, SPhysiN
QUERY_CHECK_CODE(code, lino, _error);
int32_t keyBytes = sizeof(TSKEY);
keyBytes += blockDataGetRowSize(pDownRes) + sizeof(SResultCellData) * taosArrayGetSize(pDownRes->pDataBlock);
keyBytes += blockDataGetRowSize(pDownRes) + sizeof(SResultCellData) * taosArrayGetSize(pDownRes->pDataBlock) + sizeof(bool);
if (pPkCol) {
keyBytes += pPkCol->bytes;
}

View File

@ -0,0 +1,129 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print step1
print =============== create database
sql create database test vgroups 1;
sql use test;
sql create stable st(ts timestamp, a int, b int , c int)tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams1 trigger force_window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, twa(a), ta from st partition by tbname,ta interval(2s);
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(now + 3000a,1,1,1) (now + 3100a,5,10,10) (now + 3200a,5,10,10) (now + 5100a,20,1,1) (now + 5200a,30,10,10) (now + 5300a,40,10,10);
sql insert into t2 values(now + 3000a,1,1,1) (now + 3100a,2,10,10) (now + 3200a,30,10,10) (now + 5100a,10,1,1) (now + 5200a,40,10,10) (now + 5300a,7,10,10);
print sql select _wstart, twa(a) from t1 interval(2s);
sql select _wstart, twa(a) from t1 interval(2s);
$query1_data01 = $data01
$query1_data11 = $data11
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
print sql select _wstart, twa(a) from t2 interval(2s);
sql select _wstart, twa(a) from t2 interval(2s);
$query2_data01 = $data01
$query2_data11 = $data11
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
$loop_count = 0
loop0:
sleep 2000
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
print 2 sql select * from streamt where ta == 1;
sql select * from streamt where ta == 1;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows < 2 then
print ======rows=$rows
goto loop0
endi
if $data01 != $query1_data01 then
print ======data01========$data01
print ======query1_data01=$query1_data01
return -1
endi
if $data11 != $query1_data11 then
print ======data11========$data11
print ======query1_data11=$query1_data11
return -1
endi
$loop_count = 0
loop1:
sleep 2000
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
print 2 sql select * from streamt where ta == 2;
sql select * from streamt where ta == 2;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows < 2 then
print ======rows=$rows
goto loop1
endi
if $data01 != $query2_data01 then
print ======data01======$data01
print ====query2_data01=$query2_data01
return -1
endi
if $data11 != $query2_data11 then
print ======data11======$data11
print ====query2_data11=$query2_data11
return -1
endi
print end
system sh/exec.sh -n dnode1 -s stop -x SIGINT