diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c index 1c19894dff..cfebddf9a1 100644 --- a/source/libs/executor/src/streamtimesliceoperator.c +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -827,6 +827,8 @@ static int32_t getPointInfoFromStateRight(SStreamAggSupporter* pAggSup, SStreamF &curVLen, pWinCode); QUERY_CHECK_CODE(code, lino, _end); + qDebug("===stream=== set stream interp next point buf.ts:%" PRId64 ", groupId:%" PRId64 ", res:%d", pNextPoint->key.ts, pNextPoint->key.groupId, pWinCode); + setPointBuff(pNextPoint, pFillSup); if (*pWinCode != TSDB_CODE_SUCCESS) { @@ -844,6 +846,9 @@ static int32_t getPointInfoFromStateRight(SStreamAggSupporter* pAggSup, SStreamF code = pAggSup->stateStore.streamStateFillGetPrev(pState, &pNextPoint->key, &pCurPoint->key, (void**)&pCurPoint->pResPos, &nextVLen, &tmpRes); QUERY_CHECK_CODE(code, lino, _end); + + qDebug("===stream=== set stream interp cur point buf.ts:%" PRId64 ", groupId:%" PRId64 ", res:%d", pCurPoint->key.ts, pCurPoint->key.groupId, tmpRes); + if (tmpRes == TSDB_CODE_SUCCESS) { setPointBuff(pCurPoint, pFillSup); } @@ -872,6 +877,8 @@ static int32_t getPointInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillSu &curVLen, pWinCode); QUERY_CHECK_CODE(code, lino, _end); + qDebug("===stream=== set stream interp buf.ts:%" PRId64 ", groupId:%" PRId64, pCurPoint->key.ts, pCurPoint->key.groupId); + setPointBuff(pCurPoint, pFillSup); if (*pWinCode != TSDB_CODE_SUCCESS) { @@ -942,9 +949,8 @@ static void copyCalcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFi } static void setForceWindowCloseFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, TSKEY ts) { - TSKEY endTs = adustEndTsKey(ts, pFillSup->cur.key, &pFillSup->interval); - TSKEY startTs = adustPrevTsKey(ts, pFillSup->cur.key, &pFillSup->interval); - + qDebug("===stream=== set force window close rule.ts:%" PRId64 ",cur key:%" PRId64 ", has prev%d, has next:%d", ts, + pFillSup->cur.key, hasPrevWindow(pFillSup), hasNextWindow(pFillSup)); pFillInfo->needFill = true; pFillInfo->pos = FILL_POS_INVALID; switch (pFillInfo->type) { @@ -962,9 +968,9 @@ static void setForceWindowCloseFillRule(SStreamFillSupporter* pFillSup, SStreamF } } break; case TSDB_FILL_PREV: { - if (ts == pFillSup->cur.key) { - pFillInfo->pos = FILL_POS_START; - pFillInfo->needFill = false; + if (ts >= pFillSup->cur.key) { + setFillKeyInfo(ts, ts + 1, &pFillSup->interval, pFillInfo); + pFillInfo->pResRow = &pFillSup->cur; } else if (hasPrevWindow(pFillSup)) { pFillInfo->pos = FILL_POS_INVALID; setFillKeyInfo(ts, ts + 1, &pFillSup->interval, pFillInfo); @@ -1430,6 +1436,7 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, STimeWindowAggSup int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); for (; pGroupResInfo->index < numOfRows; pGroupResInfo->index++) { SWinKey* pKey = (SWinKey*)taosArrayGet(pGroupResInfo->pRows, pGroupResInfo->index); + qDebug("===stream=== build interp res. key:%" PRId64 ",groupId:%" PRId64, pKey->ts, pKey->groupId); if (pBlock->info.id.groupId == 0) { pBlock->info.id.groupId = pKey->groupId; } else if (pBlock->info.id.groupId != pKey->groupId) { diff --git a/source/libs/stream/src/streamSliceState.c b/source/libs/stream/src/streamSliceState.c index 6f4eea06ee..e535c2e7a3 100644 --- a/source/libs/stream/src/streamSliceState.c +++ b/source/libs/stream/src/streamSliceState.c @@ -276,14 +276,16 @@ int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SW streamStateFreeCur(pCur); return code; } else { - SWinKey* pNext = taosArrayGet(pWinStates, index - 1); - if (qDebugFlag & DEBUG_DEBUG) { - SWinKey* pTmp = taosArrayGet(pWinStates, index); - if (winKeyCmprImpl(pTmp, pKey) != 0) { - qError("%s failed at line %d since do not find cur SWinKey", __func__, lino); - } + SWinKey* pPrevKey = NULL; + SWinKey* pCurKey = taosArrayGet(pWinStates, index); + if (winKeyCmprImpl(pCurKey, pKey) == 0) { + pPrevKey = taosArrayGet(pWinStates, index - 1); + } else { + pPrevKey = taosArrayGet(pWinStates, index); + qDebug("%s failed at line %d since do not find cur SWinKey. trigger may be force window close", __func__, __LINE__); } - *pResKey = *pNext; + + *pResKey = *pPrevKey; return getHashSortRowBuff(pFileState, pResKey, ppVal, pVLen, pWinCode); } (*pWinCode) = TSDB_CODE_FAILED;