fix issue
This commit is contained in:
parent
f3b42fd9d1
commit
a0e7791e49
|
@ -3396,6 +3396,7 @@ static bool isStreamWindow(SStreamScanInfo* pInfo) {
|
||||||
static int32_t copyGetResultBlock(SSDataBlock* dest, const SSDataBlock* src) {
|
static int32_t copyGetResultBlock(SSDataBlock* dest, const SSDataBlock* src) {
|
||||||
TSKEY start = src->info.window.skey;
|
TSKEY start = src->info.window.skey;
|
||||||
TSKEY end = src->info.window.ekey;
|
TSKEY end = src->info.window.ekey;
|
||||||
|
blockDataEnsureCapacity(dest, 1);
|
||||||
return appendDataToSpecialBlock(dest, &start, &end, NULL, NULL, NULL);
|
return appendDataToSpecialBlock(dest, &start, &end, NULL, NULL, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1687,18 +1687,24 @@ static int32_t setAllResultKey(SStreamAggSupporter* pAggSup, TSKEY ts, SSHashObj
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
int64_t groupId = 0;
|
int64_t groupId = 0;
|
||||||
SStreamStateCur* pCur = pAggSup->stateStore.streamStateGroupGetCur(pAggSup->pState);
|
SStreamStateCur* pCur = pAggSup->stateStore.streamStateGroupGetCur(pAggSup->pState);
|
||||||
int32_t winCode = pAggSup->stateStore.streamStateGroupGetKVByCur(pCur, &groupId, NULL, NULL);
|
while (1) {
|
||||||
if (winCode != TSDB_CODE_SUCCESS) {
|
int32_t winCode = pAggSup->stateStore.streamStateGroupGetKVByCur(pCur, &groupId, NULL, NULL);
|
||||||
goto _end;
|
if (winCode != TSDB_CODE_SUCCESS) {
|
||||||
}
|
break;
|
||||||
SWinKey key = {.ts = ts, .groupId = groupId};
|
}
|
||||||
code = saveTimeSliceWinResult(&key, pUpdatedMap);
|
SWinKey key = {.ts = ts, .groupId = groupId};
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
code = saveTimeSliceWinResult(&key, pUpdatedMap);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
pAggSup->stateStore.streamStateGroupCurNext(pCur);
|
pAggSup->stateStore.streamStateGroupCurNext(pCur);
|
||||||
|
}
|
||||||
|
pAggSup->stateStore.streamStateFreeCur(pCur);
|
||||||
|
pCur = NULL;
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
pAggSup->stateStore.streamStateFreeCur(pCur);
|
||||||
|
pCur = NULL;
|
||||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -569,7 +569,7 @@ void streamStateGroupCurNext(SStreamStateCur* pCur) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void** pVal, int32_t* pVLen) {
|
int32_t streamStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void** pVal, int32_t* pVLen) {
|
||||||
if (pVal != NULL) {
|
if (pVal == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
return streamFileStateGroupGetKVByCur(pCur, pKey, pVal, pVLen);
|
return streamFileStateGroupGetKVByCur(pCur, pKey, pVal, pVLen);
|
||||||
|
|
Loading…
Reference in New Issue