opt stream build twa result
This commit is contained in:
parent
03c3efbf92
commit
b6aa299a3c
|
@ -120,6 +120,36 @@ void initIntervalSlicePoint(SStreamAggSupporter* pAggSup, STimeWindow* pTWin, in
|
|||
pPoint->pLastRow = POINTER_SHIFT(pPoint->pFinished, sizeof(bool));
|
||||
}
|
||||
|
||||
int32_t getIntervalSlicePrevStateBuf(SStreamAggSupporter* pAggSup, SInterval* pInterval, SWinKey* pCurKey,
|
||||
SInervalSlicePoint* pPrevPoint) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SWinKey prevKey = {.groupId = pCurKey->groupId};
|
||||
SET_WIN_KEY_INVALID(prevKey.ts);
|
||||
int32_t prevVLen = 0;
|
||||
int32_t prevWinCode = TSDB_CODE_SUCCESS;
|
||||
code = pAggSup->stateStore.streamStateGetPrev(pAggSup->pState, pCurKey, &prevKey, (void**)&pPrevPoint->pResPos,
|
||||
&prevVLen, &prevWinCode);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
if (prevWinCode == TSDB_CODE_SUCCESS) {
|
||||
STimeWindow prevSTW = {.skey = prevKey.ts};
|
||||
prevSTW.ekey = taosTimeGetIntervalEnd(prevSTW.skey, pInterval);
|
||||
initIntervalSlicePoint(pAggSup, &prevSTW, pCurKey->groupId, pPrevPoint);
|
||||
qDebug("===stream=== set stream twa prev point buf.ts:%" PRId64 ", groupId:%" PRIu64 ", res:%d",
|
||||
pPrevPoint->winKey.win.skey, pPrevPoint->winKey.groupId, prevWinCode);
|
||||
} else {
|
||||
SET_WIN_KEY_INVALID(pPrevPoint->winKey.win.skey);
|
||||
SET_WIN_KEY_INVALID(pPrevPoint->winKey.win.ekey);
|
||||
}
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t getIntervalSliceCurStateBuf(SStreamAggSupporter* pAggSup, SInterval* pInterval, bool needPrev, STimeWindow* pTWin, int64_t groupId,
|
||||
SInervalSlicePoint* pCurPoint, SInervalSlicePoint* pPrevPoint, int32_t* pWinCode) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
@ -136,24 +166,8 @@ static int32_t getIntervalSliceCurStateBuf(SStreamAggSupporter* pAggSup, SInterv
|
|||
initIntervalSlicePoint(pAggSup, pTWin, groupId, pCurPoint);
|
||||
|
||||
if (needPrev) {
|
||||
SWinKey prevKey = {.groupId = groupId};
|
||||
SET_WIN_KEY_INVALID(prevKey.ts);
|
||||
int32_t prevVLen = 0;
|
||||
int32_t prevWinCode = TSDB_CODE_SUCCESS;
|
||||
code = pAggSup->stateStore.streamStateGetPrev(pAggSup->pState, &curKey, &prevKey, (void**)&pPrevPoint->pResPos,
|
||||
&prevVLen, &prevWinCode);
|
||||
code = getIntervalSlicePrevStateBuf(pAggSup, pInterval, &curKey, pPrevPoint);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
if (prevWinCode == TSDB_CODE_SUCCESS) {
|
||||
STimeWindow prevSTW = {.skey = prevKey.ts};
|
||||
prevSTW.ekey = taosTimeGetIntervalEnd(prevSTW.skey, pInterval);
|
||||
initIntervalSlicePoint(pAggSup, &prevSTW, groupId, pPrevPoint);
|
||||
qDebug("===stream=== set stream twa prev point buf.ts:%" PRId64 ", groupId:%" PRIu64 ", res:%d", pPrevPoint->winKey.win.skey,
|
||||
pPrevPoint->winKey.groupId, prevWinCode);
|
||||
} else {
|
||||
SET_WIN_KEY_INVALID(pPrevPoint->winKey.win.skey);
|
||||
SET_WIN_KEY_INVALID(pPrevPoint->winKey.win.ekey);
|
||||
}
|
||||
}
|
||||
|
||||
_end:
|
||||
|
@ -265,13 +279,15 @@ static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBloc
|
|||
STimeWindow curWin =
|
||||
getActiveTimeWindow(NULL, pResultRowInfo, curTs, &pInfo->interval, TSDB_ORDER_ASC);
|
||||
while (1) {
|
||||
if (curTs > pInfo->endTs) {
|
||||
break;
|
||||
}
|
||||
|
||||
int32_t winCode = TSDB_CODE_SUCCESS;
|
||||
code = getIntervalSliceCurStateBuf(&pInfo->streamAggSup, &pInfo->interval, pInfo->hasInterpoFunc, &curWin, groupId, &curPoint, &prevPoint, &winCode);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
if (curTs <= pInfo->endTs) {
|
||||
code = getIntervalSliceCurStateBuf(&pInfo->streamAggSup, &pInfo->interval, pInfo->hasInterpoFunc, &curWin, groupId, &curPoint, &prevPoint, &winCode);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
} else if (pInfo->hasInterpoFunc) {
|
||||
SWinKey curKey = {.ts = curWin.skey, .groupId = groupId};
|
||||
code = getIntervalSlicePrevStateBuf(&pInfo->streamAggSup, &pInfo->interval, &curKey, &prevPoint);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
||||
if (pInfo->hasInterpoFunc && IS_VALID_WIN_KEY(prevPoint.winKey.win.skey) && isInterpoWindowFinished(&prevPoint) == false) {
|
||||
code = setIntervalSliceOutputBuf(&prevPoint, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
|
||||
|
@ -288,6 +304,12 @@ static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBloc
|
|||
code = saveWinResult(&prevKey, prevPoint.pResPos, pInfo->pUpdatedMap);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
setInterpoWindowFinished(&prevPoint);
|
||||
} else if (IS_VALID_WIN_KEY(prevPoint.winKey.win.skey)) {
|
||||
releaseOutputBuf(pInfo->streamAggSup.pState, prevPoint.pResPos, &pInfo->streamAggSup.stateStore);
|
||||
}
|
||||
|
||||
if (curTs > pInfo->endTs) {
|
||||
break;
|
||||
}
|
||||
|
||||
code = setIntervalSliceOutputBuf(&curPoint, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
|
||||
|
@ -300,7 +322,7 @@ static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBloc
|
|||
forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, curWin.ekey, binarySearchForKey, NULL,
|
||||
TSDB_ORDER_ASC);
|
||||
int32_t prevEndPos = (forwardRows - 1) + startPos;
|
||||
if (pInfo->hasInterpoFunc && winCode != TSDB_CODE_SUCCESS) {
|
||||
if (pInfo->hasInterpoFunc) {
|
||||
int32_t endRowId = getQualifiedRowNumDesc(pSup, pBlock, tsCols, prevEndPos, false);
|
||||
TSKEY endRowTs = tsCols[endRowId];
|
||||
transBlockToSliceResultRow(pBlock, endRowId, endRowTs, curPoint.pLastRow, 0, NULL, NULL, pInfo->pOffsetInfo);
|
||||
|
|
|
@ -947,6 +947,7 @@ int32_t recoverSession(SStreamFileState* pFileState, int64_t ckId) {
|
|||
}
|
||||
|
||||
SRowBuffPos* pPos = createSessionWinBuff(pFileState, &key, pVal, &vlen);
|
||||
pPos->beUsed = false;
|
||||
winRes = putSessionWinResultBuff(pFileState, pPos);
|
||||
if (winRes != TSDB_CODE_SUCCESS) {
|
||||
break;
|
||||
|
@ -1008,6 +1009,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
|
|||
memcpy(pNewPos->pRowBuff, pVal, vlen);
|
||||
taosMemoryFreeClear(pVal);
|
||||
pNewPos->beFlushed = true;
|
||||
pNewPos->beUsed = false;
|
||||
qDebug("===stream=== read checkpoint state from disc. %s", __func__);
|
||||
code = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -1090,6 +1092,7 @@ int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId) {
|
|||
|
||||
if (vlen != pFileState->rowSize) {
|
||||
qError("row size mismatch, expect:%d, actual:%d", pFileState->rowSize, vlen);
|
||||
destroyRowBuffPos(pNewPos);
|
||||
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||
taosMemoryFreeClear(pVal);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
@ -1098,6 +1101,7 @@ int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId) {
|
|||
memcpy(pNewPos->pRowBuff, pVal, vlen);
|
||||
taosMemoryFreeClear(pVal);
|
||||
pNewPos->beFlushed = true;
|
||||
pNewPos->beUsed = false;
|
||||
qDebug("===stream=== read checkpoint state from disc. %s", __func__);
|
||||
winRes = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
|
||||
if (winRes != TSDB_CODE_SUCCESS) {
|
||||
|
|
|
@ -289,6 +289,64 @@ if $data51 != $query1_data51 then
|
|||
goto loop3
|
||||
endi
|
||||
|
||||
print ======step3
|
||||
sql create database test3 vgroups 1;
|
||||
sql use test3;
|
||||
|
||||
sql create stable st(ts timestamp, a int, b int , c int)tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
|
||||
sql create stream streams3 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt3 as select _wstart, twa(a), ta from st partition by tbname,ta interval(10s);
|
||||
|
||||
run tsim/stream/checkTaskStatus.sim
|
||||
|
||||
sql insert into t1 values(now + 3000a,1,1,1);
|
||||
sql flush database test;
|
||||
sql insert into t1 values(now + 3001a,10,10,10);
|
||||
sql insert into t1 values(now + 13s,50,50,50);
|
||||
|
||||
sleep 1000
|
||||
|
||||
print sql select _wstart, twa(a), ta from st partition by tbname,ta interval(10s) order by 1;
|
||||
sql select _wstart, twa(a), ta from st partition by tbname,ta interval(10s) order by 1;
|
||||
|
||||
$query_data01 = $data01
|
||||
|
||||
print $data00 $data01 $data02 $data03 $data04
|
||||
print $data10 $data11 $data12 $data13 $data14
|
||||
print $data20 $data21 $data22 $data23 $data24
|
||||
print $data30 $data31 $data32 $data33 $data34
|
||||
print $data40 $data41 $data42 $data43 $data44
|
||||
print $data50 $data51 $data52 $data53 $data54
|
||||
|
||||
|
||||
$loop_count = 0
|
||||
loop4:
|
||||
|
||||
sleep 2000
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 20 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print 2 sql select * from streamt3 order by 1;
|
||||
sql select * from streamt3 order by 1;
|
||||
|
||||
print $data00 $data01 $data02 $data03 $data04
|
||||
print $data10 $data11 $data12 $data13 $data14
|
||||
print $data20 $data21 $data22 $data23 $data24
|
||||
print $data30 $data31 $data32 $data33 $data34
|
||||
print $data40 $data41 $data42 $data43 $data44
|
||||
print $data50 $data51 $data52 $data53 $data54
|
||||
|
||||
if $data01 != $query_data01 then
|
||||
print ======data01======$data01
|
||||
print ====query_data01=$query_data01
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
print end
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
Loading…
Reference in New Issue