From b6aa299a3ce652bdf0428418846e01d94290a22c Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Thu, 5 Dec 2024 08:57:12 +0800 Subject: [PATCH] opt stream build twa result --- .../src/streamintervalsliceoperator.c | 70 ++++++++++++------- source/libs/stream/src/tstreamFileState.c | 4 ++ .../tsim/stream/streamTwaFwcInterval.sim | 58 +++++++++++++++ 3 files changed, 108 insertions(+), 24 deletions(-) diff --git a/source/libs/executor/src/streamintervalsliceoperator.c b/source/libs/executor/src/streamintervalsliceoperator.c index e7a9c58710..86dc7649c4 100644 --- a/source/libs/executor/src/streamintervalsliceoperator.c +++ b/source/libs/executor/src/streamintervalsliceoperator.c @@ -120,6 +120,36 @@ void initIntervalSlicePoint(SStreamAggSupporter* pAggSup, STimeWindow* pTWin, in pPoint->pLastRow = POINTER_SHIFT(pPoint->pFinished, sizeof(bool)); } +int32_t getIntervalSlicePrevStateBuf(SStreamAggSupporter* pAggSup, SInterval* pInterval, SWinKey* pCurKey, + SInervalSlicePoint* pPrevPoint) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SWinKey prevKey = {.groupId = pCurKey->groupId}; + SET_WIN_KEY_INVALID(prevKey.ts); + int32_t prevVLen = 0; + int32_t prevWinCode = TSDB_CODE_SUCCESS; + code = pAggSup->stateStore.streamStateGetPrev(pAggSup->pState, pCurKey, &prevKey, (void**)&pPrevPoint->pResPos, + &prevVLen, &prevWinCode); + QUERY_CHECK_CODE(code, lino, _end); + + if (prevWinCode == TSDB_CODE_SUCCESS) { + STimeWindow prevSTW = {.skey = prevKey.ts}; + prevSTW.ekey = taosTimeGetIntervalEnd(prevSTW.skey, pInterval); + initIntervalSlicePoint(pAggSup, &prevSTW, pCurKey->groupId, pPrevPoint); + qDebug("===stream=== set stream twa prev point buf.ts:%" PRId64 ", groupId:%" PRIu64 ", res:%d", + pPrevPoint->winKey.win.skey, pPrevPoint->winKey.groupId, prevWinCode); + } else { + SET_WIN_KEY_INVALID(pPrevPoint->winKey.win.skey); + SET_WIN_KEY_INVALID(pPrevPoint->winKey.win.ekey); + } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + static int32_t getIntervalSliceCurStateBuf(SStreamAggSupporter* pAggSup, SInterval* pInterval, bool needPrev, STimeWindow* pTWin, int64_t groupId, SInervalSlicePoint* pCurPoint, SInervalSlicePoint* pPrevPoint, int32_t* pWinCode) { int32_t code = TSDB_CODE_SUCCESS; @@ -136,24 +166,8 @@ static int32_t getIntervalSliceCurStateBuf(SStreamAggSupporter* pAggSup, SInterv initIntervalSlicePoint(pAggSup, pTWin, groupId, pCurPoint); if (needPrev) { - SWinKey prevKey = {.groupId = groupId}; - SET_WIN_KEY_INVALID(prevKey.ts); - int32_t prevVLen = 0; - int32_t prevWinCode = TSDB_CODE_SUCCESS; - code = pAggSup->stateStore.streamStateGetPrev(pAggSup->pState, &curKey, &prevKey, (void**)&pPrevPoint->pResPos, - &prevVLen, &prevWinCode); + code = getIntervalSlicePrevStateBuf(pAggSup, pInterval, &curKey, pPrevPoint); QUERY_CHECK_CODE(code, lino, _end); - - if (prevWinCode == TSDB_CODE_SUCCESS) { - STimeWindow prevSTW = {.skey = prevKey.ts}; - prevSTW.ekey = taosTimeGetIntervalEnd(prevSTW.skey, pInterval); - initIntervalSlicePoint(pAggSup, &prevSTW, groupId, pPrevPoint); - qDebug("===stream=== set stream twa prev point buf.ts:%" PRId64 ", groupId:%" PRIu64 ", res:%d", pPrevPoint->winKey.win.skey, - pPrevPoint->winKey.groupId, prevWinCode); - } else { - SET_WIN_KEY_INVALID(pPrevPoint->winKey.win.skey); - SET_WIN_KEY_INVALID(pPrevPoint->winKey.win.ekey); - } } _end: @@ -265,13 +279,15 @@ static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBloc STimeWindow curWin = getActiveTimeWindow(NULL, pResultRowInfo, curTs, &pInfo->interval, TSDB_ORDER_ASC); while (1) { - if (curTs > pInfo->endTs) { - break; - } - int32_t winCode = TSDB_CODE_SUCCESS; - code = getIntervalSliceCurStateBuf(&pInfo->streamAggSup, &pInfo->interval, pInfo->hasInterpoFunc, &curWin, groupId, &curPoint, &prevPoint, &winCode); - QUERY_CHECK_CODE(code, lino, _end); + if (curTs <= pInfo->endTs) { + code = getIntervalSliceCurStateBuf(&pInfo->streamAggSup, &pInfo->interval, pInfo->hasInterpoFunc, &curWin, groupId, &curPoint, &prevPoint, &winCode); + QUERY_CHECK_CODE(code, lino, _end); + } else if (pInfo->hasInterpoFunc) { + SWinKey curKey = {.ts = curWin.skey, .groupId = groupId}; + code = getIntervalSlicePrevStateBuf(&pInfo->streamAggSup, &pInfo->interval, &curKey, &prevPoint); + QUERY_CHECK_CODE(code, lino, _end); + } if (pInfo->hasInterpoFunc && IS_VALID_WIN_KEY(prevPoint.winKey.win.skey) && isInterpoWindowFinished(&prevPoint) == false) { code = setIntervalSliceOutputBuf(&prevPoint, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset); @@ -288,6 +304,12 @@ static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBloc code = saveWinResult(&prevKey, prevPoint.pResPos, pInfo->pUpdatedMap); QUERY_CHECK_CODE(code, lino, _end); setInterpoWindowFinished(&prevPoint); + } else if (IS_VALID_WIN_KEY(prevPoint.winKey.win.skey)) { + releaseOutputBuf(pInfo->streamAggSup.pState, prevPoint.pResPos, &pInfo->streamAggSup.stateStore); + } + + if (curTs > pInfo->endTs) { + break; } code = setIntervalSliceOutputBuf(&curPoint, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset); @@ -300,7 +322,7 @@ static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBloc forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, curWin.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); int32_t prevEndPos = (forwardRows - 1) + startPos; - if (pInfo->hasInterpoFunc && winCode != TSDB_CODE_SUCCESS) { + if (pInfo->hasInterpoFunc) { int32_t endRowId = getQualifiedRowNumDesc(pSup, pBlock, tsCols, prevEndPos, false); TSKEY endRowTs = tsCols[endRowId]; transBlockToSliceResultRow(pBlock, endRowId, endRowTs, curPoint.pLastRow, 0, NULL, NULL, pInfo->pOffsetInfo); diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 592523e70b..05edad0f5f 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -947,6 +947,7 @@ int32_t recoverSession(SStreamFileState* pFileState, int64_t ckId) { } SRowBuffPos* pPos = createSessionWinBuff(pFileState, &key, pVal, &vlen); + pPos->beUsed = false; winRes = putSessionWinResultBuff(pFileState, pPos); if (winRes != TSDB_CODE_SUCCESS) { break; @@ -1008,6 +1009,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { memcpy(pNewPos->pRowBuff, pVal, vlen); taosMemoryFreeClear(pVal); pNewPos->beFlushed = true; + pNewPos->beUsed = false; qDebug("===stream=== read checkpoint state from disc. %s", __func__); code = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES); if (code != TSDB_CODE_SUCCESS) { @@ -1090,6 +1092,7 @@ int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId) { if (vlen != pFileState->rowSize) { qError("row size mismatch, expect:%d, actual:%d", pFileState->rowSize, vlen); + destroyRowBuffPos(pNewPos); code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; taosMemoryFreeClear(pVal); QUERY_CHECK_CODE(code, lino, _end); @@ -1098,6 +1101,7 @@ int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId) { memcpy(pNewPos->pRowBuff, pVal, vlen); taosMemoryFreeClear(pVal); pNewPos->beFlushed = true; + pNewPos->beUsed = false; qDebug("===stream=== read checkpoint state from disc. %s", __func__); winRes = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES); if (winRes != TSDB_CODE_SUCCESS) { diff --git a/tests/script/tsim/stream/streamTwaFwcInterval.sim b/tests/script/tsim/stream/streamTwaFwcInterval.sim index 8640650310..5151d7caff 100644 --- a/tests/script/tsim/stream/streamTwaFwcInterval.sim +++ b/tests/script/tsim/stream/streamTwaFwcInterval.sim @@ -289,6 +289,64 @@ if $data51 != $query1_data51 then goto loop3 endi +print ======step3 +sql create database test3 vgroups 1; +sql use test3; + +sql create stable st(ts timestamp, a int, b int , c int)tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql create stream streams3 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt3 as select _wstart, twa(a), ta from st partition by tbname,ta interval(10s); + +run tsim/stream/checkTaskStatus.sim + +sql insert into t1 values(now + 3000a,1,1,1); +sql flush database test; +sql insert into t1 values(now + 3001a,10,10,10); +sql insert into t1 values(now + 13s,50,50,50); + +sleep 1000 + +print sql select _wstart, twa(a), ta from st partition by tbname,ta interval(10s) order by 1; +sql select _wstart, twa(a), ta from st partition by tbname,ta interval(10s) order by 1; + +$query_data01 = $data01 + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +$loop_count = 0 +loop4: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 2 sql select * from streamt3 order by 1; +sql select * from streamt3 order by 1; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $data01 != $query_data01 then + print ======data01======$data01 + print ====query_data01=$query_data01 + goto loop4 +endi + print end system sh/exec.sh -n dnode1 -s stop -x SIGINT