diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 8c72742a2d..5f6144634f 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3396,6 +3396,7 @@ static bool isStreamWindow(SStreamScanInfo* pInfo) { static int32_t copyGetResultBlock(SSDataBlock* dest, const SSDataBlock* src) { TSKEY start = src->info.window.skey; TSKEY end = src->info.window.ekey; + blockDataEnsureCapacity(dest, 1); return appendDataToSpecialBlock(dest, &start, &end, NULL, NULL, NULL); } diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c index 20687964d4..d6cc178a7f 100644 --- a/source/libs/executor/src/streamtimesliceoperator.c +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -1687,18 +1687,24 @@ static int32_t setAllResultKey(SStreamAggSupporter* pAggSup, TSKEY ts, SSHashObj int32_t lino = 0; int64_t groupId = 0; SStreamStateCur* pCur = pAggSup->stateStore.streamStateGroupGetCur(pAggSup->pState); - int32_t winCode = pAggSup->stateStore.streamStateGroupGetKVByCur(pCur, &groupId, NULL, NULL); - if (winCode != TSDB_CODE_SUCCESS) { - goto _end; - } - SWinKey key = {.ts = ts, .groupId = groupId}; - code = saveTimeSliceWinResult(&key, pUpdatedMap); - QUERY_CHECK_CODE(code, lino, _end); + while (1) { + int32_t winCode = pAggSup->stateStore.streamStateGroupGetKVByCur(pCur, &groupId, NULL, NULL); + if (winCode != TSDB_CODE_SUCCESS) { + break; + } + SWinKey key = {.ts = ts, .groupId = groupId}; + code = saveTimeSliceWinResult(&key, pUpdatedMap); + QUERY_CHECK_CODE(code, lino, _end); - pAggSup->stateStore.streamStateGroupCurNext(pCur); + pAggSup->stateStore.streamStateGroupCurNext(pCur); + } + pAggSup->stateStore.streamStateFreeCur(pCur); + pCur = NULL; _end: if (code != TSDB_CODE_SUCCESS) { + pAggSup->stateStore.streamStateFreeCur(pCur); + pCur = NULL; qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } return code; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index ae19c8b42f..fd625958c4 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -569,7 +569,7 @@ void streamStateGroupCurNext(SStreamStateCur* pCur) { } int32_t streamStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void** pVal, int32_t* pVLen) { - if (pVal != NULL) { + if (pVal == NULL) { return -1; } return streamFileStateGroupGetKVByCur(pCur, pKey, pVal, pVLen);