diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 46a348479a..54c8d092d3 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -841,7 +841,6 @@ typedef struct SStreamTimeSliceOperatorInfo { SGroupResInfo groupResInfo; bool ignoreNull; bool isHistoryOp; - bool isReloadState; } SStreamTimeSliceOperatorInfo; #define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED) diff --git a/source/libs/executor/inc/tfill.h b/source/libs/executor/inc/tfill.h index a8ebc6bb17..d5284cd3fc 100644 --- a/source/libs/executor/inc/tfill.h +++ b/source/libs/executor/inc/tfill.h @@ -106,6 +106,7 @@ typedef struct SStreamFillInfo { TSKEY end; // endKey for fill TSKEY current; // current Key for fill TSKEY preRowKey; + TSKEY prePointKey; TSKEY nextRowKey; TSKEY nextPointKey; TSKEY nextNextRowKey; @@ -118,6 +119,7 @@ typedef struct SStreamFillInfo { SArray* delRanges; int32_t delIndex; uint64_t curGroupId; + bool hasNext; } SStreamFillInfo; int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows); diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index 75d072a828..8c0bfd0282 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -1335,6 +1335,7 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock* pFillInfo->delIndex = 0; pFillInfo->curGroupId = 0; + pFillInfo->hasNext = false; return pFillInfo; _end: diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c index e1d91de54e..ee7ca4d64d 100644 --- a/source/libs/executor/src/streamtimesliceoperator.c +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -88,6 +88,7 @@ void streamTimeSliceReloadState(SOperatorInfo* pOperator) { SStreamTimeSliceOperatorInfo* pInfo = pOperator->info; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStreamFillSupporter* pFillSup = pInfo->pFillSup; resetWinRange(&pAggSup->winRange); int32_t size = 0; @@ -111,11 +112,23 @@ void streamTimeSliceReloadState(SOperatorInfo* pOperator) { QUERY_CHECK_NULL(pInfo->pUpdatedMap, code, lino, _end, terrno); } + int32_t tmpRes = TSDB_CODE_SUCCESS; for (int32_t i = 0; i < num; i++) { SWinKey* pKey = pKeyBuf + i; - qDebug("===stream=== reload state. try process result %" PRId64 ", %" PRIu64 ", index:%d", pKey->ts, pKey->groupId, - i); - code = saveTimeSliceWinResult(pKey, pInfo->pUpdatedMap); + SWinKey resKey = {.groupId = pKey->groupId}; + if (pFillSup->type != TSDB_FILL_PREV && pFillSup->type != TSDB_FILL_LINEAR) { + code = pAggSup->stateStore.streamStateFillGetNext(pAggSup->pState, pKey, &resKey, NULL, NULL, &tmpRes); + QUERY_CHECK_CODE(code, lino, _end); + + if (tmpRes != TSDB_CODE_SUCCESS) { + continue; + } + } else { + resKey = *pKey; + } + qDebug("===stream=== reload state. try process result %" PRId64 ", %" PRIu64 ", index:%d", resKey.ts, + resKey.groupId, i); + code = saveTimeSliceWinResult(&resKey, pInfo->pUpdatedMap); QUERY_CHECK_CODE(code, lino, _end); } taosMemoryFree(pBuf); @@ -125,7 +138,6 @@ void streamTimeSliceReloadState(SOperatorInfo* pOperator) { downstream->fpSet.reloadStreamStateFn(downstream); } reloadAggSupFromDownStream(downstream, &pInfo->streamAggSup); - pInfo->isReloadState = true; _end: if (code != TSDB_CODE_SUCCESS) { @@ -494,6 +506,20 @@ static void setFillKeyInfo(TSKEY start, TSKEY end, SInterval* pInterval, SStream pFillInfo->end = end; } +static TSKEY adustPrevTsKey(TSKEY pointTs, TSKEY rowTs, SInterval* pInterval) { + if (rowTs >= pointTs) { + pointTs = taosTimeAdd(pointTs, pInterval->sliding, pInterval->slidingUnit, pInterval->precision); + } + return pointTs; +} + +static TSKEY adustEndTsKey(TSKEY pointTs, TSKEY rowTs, SInterval* pInterval) { + if (rowTs <= pointTs) { + pointTs = taosTimeAdd(pointTs, pInterval->sliding * -1, pInterval->slidingUnit, pInterval->precision); + } + return pointTs; +} + static void doStreamFillRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pRes) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -513,6 +539,22 @@ static void doStreamFillRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* p } if (pFillInfo->type != TSDB_FILL_LINEAR) { fillNormalRange(pFillSup, pFillInfo, pRes); + + if (pFillInfo->pos == FILL_POS_MID) { + code = fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes, &res, false); + QUERY_CHECK_CODE(code, lino, _end); + if (res) { + pFillInfo->pos = FILL_POS_INVALID; + } + } + if (pFillInfo->current > pFillInfo->end && pFillInfo->hasNext) { + pFillInfo->hasNext = false; + TSKEY startTs = adustPrevTsKey(pFillInfo->current, pFillSup->cur.key, &pFillSup->interval); + setFillKeyInfo(startTs, pFillSup->next.key, &pFillSup->interval, pFillInfo); + pFillInfo->pResRow = &pFillSup->cur; + fillNormalRange(pFillSup, pFillInfo, pRes); + } + } else { fillLinearRange(pFillSup, pFillInfo, pRes); @@ -597,19 +639,6 @@ static void setPointBuff(SSlicePoint* pPoint, SStreamFillSupporter* pFillSup) { } } -static TSKEY adustPrevTsKey(TSKEY pointTs, TSKEY rowTs, SInterval* pInterval) { - if (rowTs >= pointTs) { - pointTs = taosTimeAdd(pointTs, pInterval->sliding, pInterval->slidingUnit, pInterval->precision); - } - return pointTs; -} - -static TSKEY adustEndTsKey(TSKEY pointTs, TSKEY rowTs, SInterval* pInterval) { - if (rowTs <= pointTs) { - pointTs = taosTimeAdd(pointTs, pInterval->sliding * -1, pInterval->slidingUnit, pInterval->precision); - } - return pointTs; -} static int32_t getLinearResultInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, TSKEY ts, int64_t groupId, SSlicePoint* pCurPoint, SSlicePoint* pPrevPoint, SSlicePoint* pNextPoint) { @@ -755,15 +784,17 @@ static int32_t getResultInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillS } pFillSup->next.key = adustEndTsKey(pNextPoint->key.ts, pFillSup->next.key, &pFillSup->interval); - int32_t nextNextVLen = 0; - int32_t tmpWinCode = TSDB_CODE_SUCCESS; - SSlicePoint nextNextPoint = {.key.groupId = pNextPoint->key.groupId}; - code = - pAggSup->stateStore.streamStateFillGetNext(pState, &pNextPoint->key, &nextNextPoint.key, NULL, NULL, &tmpWinCode); - if (tmpWinCode == TSDB_CODE_SUCCESS) { - pFillSup->nextNext.key = nextNextPoint.key.ts; + if (pFillSup->type == TSDB_FILL_PREV) { + int32_t nextNextVLen = 0; + int32_t tmpWinCode = TSDB_CODE_SUCCESS; + SSlicePoint nextNextPoint = {.key.groupId = pNextPoint->key.groupId}; + code = pAggSup->stateStore.streamStateFillGetNext(pState, &pNextPoint->key, &nextNextPoint.key, NULL, NULL, + &tmpWinCode); + QUERY_CHECK_CODE(code, lino, _end); + if (tmpWinCode == TSDB_CODE_SUCCESS) { + pFillSup->nextNext.key = nextNextPoint.key.ts; + } } - QUERY_CHECK_CODE(code, lino, _end); } _end: @@ -907,9 +938,8 @@ static void copyCalcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFi } } -static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, TSKEY ts, bool isReloadState) { - qDebug("set stream interp fill rule, isReloadState:%d", isReloadState); - if (!hasNextWindow(pFillSup) && (!hasPrevWindow(pFillSup) || isReloadState) ) { +static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, TSKEY ts) { + if (!hasNextWindow(pFillSup) && !hasPrevWindow(pFillSup)) { pFillInfo->needFill = false; pFillInfo->pos = FILL_POS_START; goto _end; @@ -933,7 +963,7 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo case TSDB_FILL_NULL_F: case TSDB_FILL_SET_VALUE: case TSDB_FILL_SET_VALUE_F: { - if (hasPrevWindow(pFillSup) && !isReloadState) { + if (hasPrevWindow(pFillSup)) { setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_END; } else { @@ -944,10 +974,11 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo pFillInfo->pResRow->key = ts; } break; case TSDB_FILL_PREV: { - if (hasPrevWindow(pFillSup) && hasNextWindow(pFillSup) && pFillInfo->nextRowKey == pFillInfo->nextPointKey && - (!hasNextNextWindow(pFillSup) || pFillInfo->nextNextRowKey == pFillInfo->nextNextPointKey) && !isReloadState ) { + if (hasPrevWindow(pFillSup) && hasNextWindow(pFillSup) && pFillInfo->preRowKey != pFillInfo->prePointKey && + pFillInfo->nextRowKey == pFillInfo->nextPointKey) { setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo); - pFillInfo->pos = FILL_POS_END; + pFillInfo->pos = FILL_POS_MID; + pFillInfo->hasNext = true; } else if (hasNextWindow(pFillSup)) { setFillKeyInfo(startTs, nextWKey, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_START; @@ -962,7 +993,7 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo pFillInfo->pResRow = &pFillSup->prev; } break; case TSDB_FILL_NEXT: { - if (hasPrevWindow(pFillSup) && !isReloadState) { + if (hasPrevWindow(pFillSup)) { setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_END; resetFillWindow(&pFillSup->next); @@ -1233,8 +1264,20 @@ void getNextResKey(int64_t curGroupId, SArray* pKeyArray, int32_t curIndex, TSKE *pNextKey = INT64_MIN; } +void getPrevResKey(int64_t curGroupId, SArray* pKeyArray, int32_t curIndex, TSKEY* pNextKey) { + int32_t prevIndex = curIndex - 1; + if (prevIndex >= 0) { + SWinKey* pKey = (SWinKey*)taosArrayGet(pKeyArray, prevIndex); + if (pKey->groupId == curGroupId) { + *pNextKey = pKey->ts; + return; + } + } + *pNextKey = INT64_MIN; +} + void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, - SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo, bool isReloadState) { + SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; blockDataCleanup(pBlock); @@ -1263,15 +1306,24 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, SStreamFillSuppor } QUERY_CHECK_CODE(code, lino, _end); - getNextResKey(pKey->groupId, pGroupResInfo->pRows, pGroupResInfo->index, &pFillInfo->nextRowKey); - if (hasNextWindow(pFillSup)) { - pFillInfo->nextPointKey = nextPoint.key.ts; + if (pFillSup->type == TSDB_FILL_PREV) { + getPrevResKey(pKey->groupId, pGroupResInfo->pRows, pGroupResInfo->index, &pFillInfo->preRowKey); + if (hasPrevWindow(pFillSup)) { + pFillInfo->prePointKey = prevPoint.key.ts; + } + + getNextResKey(pKey->groupId, pGroupResInfo->pRows, pGroupResInfo->index, &pFillInfo->nextRowKey); + if (hasNextWindow(pFillSup)) { + pFillInfo->nextPointKey = nextPoint.key.ts; + } + + getNextResKey(pKey->groupId, pGroupResInfo->pRows, pGroupResInfo->index + 1, &pFillInfo->nextNextRowKey); + if (hasNextNextWindow(pFillSup)) { + pFillInfo->nextNextPointKey = pFillSup->nextNext.key; + } } - getNextResKey(pKey->groupId, pGroupResInfo->pRows, pGroupResInfo->index + 1, &pFillInfo->nextNextRowKey); - if (hasNextNextWindow(pFillSup)) { - pFillInfo->nextNextPointKey = pFillSup->nextNext.key; - } - setTimeSliceFillRule(pFillSup, pFillInfo, pKey->ts, isReloadState); + + setTimeSliceFillRule(pFillSup, pFillInfo, pKey->ts); doStreamFillRange(pFillSup, pFillInfo, pBlock); releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore); releaseOutputBuf(pAggSup->pState, prevPoint.pResPos, &pAggSup->stateStore); @@ -1300,7 +1352,7 @@ static int32_t buildTimeSliceResult(SOperatorInfo* pOperator, SSDataBlock** ppRe goto _end; } - doBuildTimeSlicePointResult(pAggSup, pInfo->pFillSup, pInfo->pFillInfo, pInfo->pRes, &pInfo->groupResInfo, pInfo->isReloadState); + doBuildTimeSlicePointResult(pAggSup, pInfo->pFillSup, pInfo->pFillInfo, pInfo->pRes, &pInfo->groupResInfo); if (pInfo->pRes->info.rows != 0) { printDataBlock(pInfo->pRes, getStreamOpName(opType), GET_TASKID(pTaskInfo)); (*ppRes) = pInfo->pRes; @@ -1452,7 +1504,6 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR goto _end; } - pInfo->isReloadState = false; setStreamOperatorCompleted(pOperator); resetStreamFillSup(pInfo->pFillSup); (*ppRes) = NULL; @@ -1544,7 +1595,6 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR QUERY_CHECK_CODE(code, lino, _end); if (!(*ppRes)) { - pInfo->isReloadState = false; setStreamOperatorCompleted(pOperator); resetStreamFillSup(pInfo->pFillSup); } @@ -1719,7 +1769,6 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* if (pHandle) { pInfo->isHistoryOp = pHandle->fillHistory; } - pInfo->isReloadState = false; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC; setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC, diff --git a/source/libs/stream/src/streamSliceState.c b/source/libs/stream/src/streamSliceState.c index f5589f53ae..4ad60daec8 100644 --- a/source/libs/stream/src/streamSliceState.c +++ b/source/libs/stream/src/streamSliceState.c @@ -183,32 +183,35 @@ int32_t getHashSortNextRow(SStreamFileState* pFileState, const SWinKey* pKey, SW void* tmpVal = NULL; int32_t len = 0; (*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len); - if ((*pWinCode) == TSDB_CODE_SUCCESS && ppVal != NULL) { - SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); - if (!pNewPos || !pNewPos->pRowBuff) { - code = TSDB_CODE_OUT_OF_MEMORY; - QUERY_CHECK_CODE(code, lino, _end); + if ((*pWinCode) == TSDB_CODE_SUCCESS) { + if (ppVal != NULL) { + SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); + if (!pNewPos || !pNewPos->pRowBuff) { + code = TSDB_CODE_OUT_OF_MEMORY; + QUERY_CHECK_CODE(code, lino, _end); + } + memcpy(pNewPos->pRowBuff, tmpVal, len); + *pVLen = getRowStateRowSize(pFileState); + (*ppVal) = pNewPos; } - memcpy(pNewPos->pRowBuff, tmpVal, len); - *pVLen = getRowStateRowSize(pFileState); - (*ppVal) = pNewPos; + taosMemoryFreeClear(tmpVal); + streamStateFreeCur(pCur); + return code; } - taosMemoryFreeClear(tmpVal); streamStateFreeCur(pCur); - return code; - } else { - if (index == size - 1) { - (*pWinCode) = TSDB_CODE_FAILED; - return code; - } - SWinKey* pNext = taosArrayGet(pWinStates, index + 1); - *pResKey = *pNext; - if (ppVal == NULL) { - return code; - } - return getHashSortRowBuff(pFileState, pResKey, ppVal, pVLen, pWinCode); } - (*pWinCode) = TSDB_CODE_FAILED; + + if (index == size - 1) { + (*pWinCode) = TSDB_CODE_FAILED; + return code; + } + SWinKey* pNext = taosArrayGet(pWinStates, index + 1); + *pResKey = *pNext; + if (ppVal == NULL) { + (*pWinCode) = TSDB_CODE_SUCCESS; + return code; + } + return getHashSortRowBuff(pFileState, pResKey, ppVal, pVLen, pWinCode); _end: if (code != TSDB_CODE_SUCCESS) {