fix issue

This commit is contained in:
54liuyao 2024-08-15 13:20:42 +08:00
parent 345d852de7
commit 6ab21a0d17
5 changed files with 123 additions and 69 deletions

View File

@ -841,7 +841,6 @@ typedef struct SStreamTimeSliceOperatorInfo {
SGroupResInfo groupResInfo;
bool ignoreNull;
bool isHistoryOp;
bool isReloadState;
} SStreamTimeSliceOperatorInfo;
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)

View File

@ -106,6 +106,7 @@ typedef struct SStreamFillInfo {
TSKEY end; // endKey for fill
TSKEY current; // current Key for fill
TSKEY preRowKey;
TSKEY prePointKey;
TSKEY nextRowKey;
TSKEY nextPointKey;
TSKEY nextNextRowKey;
@ -118,6 +119,7 @@ typedef struct SStreamFillInfo {
SArray* delRanges;
int32_t delIndex;
uint64_t curGroupId;
bool hasNext;
} SStreamFillInfo;
int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows);

View File

@ -1335,6 +1335,7 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock*
pFillInfo->delIndex = 0;
pFillInfo->curGroupId = 0;
pFillInfo->hasNext = false;
return pFillInfo;
_end:

View File

@ -88,6 +88,7 @@ void streamTimeSliceReloadState(SOperatorInfo* pOperator) {
SStreamTimeSliceOperatorInfo* pInfo = pOperator->info;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamFillSupporter* pFillSup = pInfo->pFillSup;
resetWinRange(&pAggSup->winRange);
int32_t size = 0;
@ -111,11 +112,23 @@ void streamTimeSliceReloadState(SOperatorInfo* pOperator) {
QUERY_CHECK_NULL(pInfo->pUpdatedMap, code, lino, _end, terrno);
}
int32_t tmpRes = TSDB_CODE_SUCCESS;
for (int32_t i = 0; i < num; i++) {
SWinKey* pKey = pKeyBuf + i;
qDebug("===stream=== reload state. try process result %" PRId64 ", %" PRIu64 ", index:%d", pKey->ts, pKey->groupId,
i);
code = saveTimeSliceWinResult(pKey, pInfo->pUpdatedMap);
SWinKey resKey = {.groupId = pKey->groupId};
if (pFillSup->type != TSDB_FILL_PREV && pFillSup->type != TSDB_FILL_LINEAR) {
code = pAggSup->stateStore.streamStateFillGetNext(pAggSup->pState, pKey, &resKey, NULL, NULL, &tmpRes);
QUERY_CHECK_CODE(code, lino, _end);
if (tmpRes != TSDB_CODE_SUCCESS) {
continue;
}
} else {
resKey = *pKey;
}
qDebug("===stream=== reload state. try process result %" PRId64 ", %" PRIu64 ", index:%d", resKey.ts,
resKey.groupId, i);
code = saveTimeSliceWinResult(&resKey, pInfo->pUpdatedMap);
QUERY_CHECK_CODE(code, lino, _end);
}
taosMemoryFree(pBuf);
@ -125,7 +138,6 @@ void streamTimeSliceReloadState(SOperatorInfo* pOperator) {
downstream->fpSet.reloadStreamStateFn(downstream);
}
reloadAggSupFromDownStream(downstream, &pInfo->streamAggSup);
pInfo->isReloadState = true;
_end:
if (code != TSDB_CODE_SUCCESS) {
@ -494,6 +506,20 @@ static void setFillKeyInfo(TSKEY start, TSKEY end, SInterval* pInterval, SStream
pFillInfo->end = end;
}
static TSKEY adustPrevTsKey(TSKEY pointTs, TSKEY rowTs, SInterval* pInterval) {
if (rowTs >= pointTs) {
pointTs = taosTimeAdd(pointTs, pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
}
return pointTs;
}
static TSKEY adustEndTsKey(TSKEY pointTs, TSKEY rowTs, SInterval* pInterval) {
if (rowTs <= pointTs) {
pointTs = taosTimeAdd(pointTs, pInterval->sliding * -1, pInterval->slidingUnit, pInterval->precision);
}
return pointTs;
}
static void doStreamFillRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -513,6 +539,22 @@ static void doStreamFillRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* p
}
if (pFillInfo->type != TSDB_FILL_LINEAR) {
fillNormalRange(pFillSup, pFillInfo, pRes);
if (pFillInfo->pos == FILL_POS_MID) {
code = fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes, &res, false);
QUERY_CHECK_CODE(code, lino, _end);
if (res) {
pFillInfo->pos = FILL_POS_INVALID;
}
}
if (pFillInfo->current > pFillInfo->end && pFillInfo->hasNext) {
pFillInfo->hasNext = false;
TSKEY startTs = adustPrevTsKey(pFillInfo->current, pFillSup->cur.key, &pFillSup->interval);
setFillKeyInfo(startTs, pFillSup->next.key, &pFillSup->interval, pFillInfo);
pFillInfo->pResRow = &pFillSup->cur;
fillNormalRange(pFillSup, pFillInfo, pRes);
}
} else {
fillLinearRange(pFillSup, pFillInfo, pRes);
@ -597,19 +639,6 @@ static void setPointBuff(SSlicePoint* pPoint, SStreamFillSupporter* pFillSup) {
}
}
static TSKEY adustPrevTsKey(TSKEY pointTs, TSKEY rowTs, SInterval* pInterval) {
if (rowTs >= pointTs) {
pointTs = taosTimeAdd(pointTs, pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
}
return pointTs;
}
static TSKEY adustEndTsKey(TSKEY pointTs, TSKEY rowTs, SInterval* pInterval) {
if (rowTs <= pointTs) {
pointTs = taosTimeAdd(pointTs, pInterval->sliding * -1, pInterval->slidingUnit, pInterval->precision);
}
return pointTs;
}
static int32_t getLinearResultInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, TSKEY ts,
int64_t groupId, SSlicePoint* pCurPoint, SSlicePoint* pPrevPoint,
SSlicePoint* pNextPoint) {
@ -755,15 +784,17 @@ static int32_t getResultInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillS
}
pFillSup->next.key = adustEndTsKey(pNextPoint->key.ts, pFillSup->next.key, &pFillSup->interval);
int32_t nextNextVLen = 0;
int32_t tmpWinCode = TSDB_CODE_SUCCESS;
SSlicePoint nextNextPoint = {.key.groupId = pNextPoint->key.groupId};
code =
pAggSup->stateStore.streamStateFillGetNext(pState, &pNextPoint->key, &nextNextPoint.key, NULL, NULL, &tmpWinCode);
if (tmpWinCode == TSDB_CODE_SUCCESS) {
pFillSup->nextNext.key = nextNextPoint.key.ts;
if (pFillSup->type == TSDB_FILL_PREV) {
int32_t nextNextVLen = 0;
int32_t tmpWinCode = TSDB_CODE_SUCCESS;
SSlicePoint nextNextPoint = {.key.groupId = pNextPoint->key.groupId};
code = pAggSup->stateStore.streamStateFillGetNext(pState, &pNextPoint->key, &nextNextPoint.key, NULL, NULL,
&tmpWinCode);
QUERY_CHECK_CODE(code, lino, _end);
if (tmpWinCode == TSDB_CODE_SUCCESS) {
pFillSup->nextNext.key = nextNextPoint.key.ts;
}
}
QUERY_CHECK_CODE(code, lino, _end);
}
_end:
@ -907,9 +938,8 @@ static void copyCalcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFi
}
}
static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, TSKEY ts, bool isReloadState) {
qDebug("set stream interp fill rule, isReloadState:%d", isReloadState);
if (!hasNextWindow(pFillSup) && (!hasPrevWindow(pFillSup) || isReloadState) ) {
static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, TSKEY ts) {
if (!hasNextWindow(pFillSup) && !hasPrevWindow(pFillSup)) {
pFillInfo->needFill = false;
pFillInfo->pos = FILL_POS_START;
goto _end;
@ -933,7 +963,7 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo
case TSDB_FILL_NULL_F:
case TSDB_FILL_SET_VALUE:
case TSDB_FILL_SET_VALUE_F: {
if (hasPrevWindow(pFillSup) && !isReloadState) {
if (hasPrevWindow(pFillSup)) {
setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_END;
} else {
@ -944,10 +974,11 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo
pFillInfo->pResRow->key = ts;
} break;
case TSDB_FILL_PREV: {
if (hasPrevWindow(pFillSup) && hasNextWindow(pFillSup) && pFillInfo->nextRowKey == pFillInfo->nextPointKey &&
(!hasNextNextWindow(pFillSup) || pFillInfo->nextNextRowKey == pFillInfo->nextNextPointKey) && !isReloadState ) {
if (hasPrevWindow(pFillSup) && hasNextWindow(pFillSup) && pFillInfo->preRowKey != pFillInfo->prePointKey &&
pFillInfo->nextRowKey == pFillInfo->nextPointKey) {
setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_END;
pFillInfo->pos = FILL_POS_MID;
pFillInfo->hasNext = true;
} else if (hasNextWindow(pFillSup)) {
setFillKeyInfo(startTs, nextWKey, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_START;
@ -962,7 +993,7 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo
pFillInfo->pResRow = &pFillSup->prev;
} break;
case TSDB_FILL_NEXT: {
if (hasPrevWindow(pFillSup) && !isReloadState) {
if (hasPrevWindow(pFillSup)) {
setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_END;
resetFillWindow(&pFillSup->next);
@ -1233,8 +1264,20 @@ void getNextResKey(int64_t curGroupId, SArray* pKeyArray, int32_t curIndex, TSKE
*pNextKey = INT64_MIN;
}
void getPrevResKey(int64_t curGroupId, SArray* pKeyArray, int32_t curIndex, TSKEY* pNextKey) {
int32_t prevIndex = curIndex - 1;
if (prevIndex >= 0) {
SWinKey* pKey = (SWinKey*)taosArrayGet(pKeyArray, prevIndex);
if (pKey->groupId == curGroupId) {
*pNextKey = pKey->ts;
return;
}
}
*pNextKey = INT64_MIN;
}
void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup,
SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo, bool isReloadState) {
SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
blockDataCleanup(pBlock);
@ -1263,15 +1306,24 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, SStreamFillSuppor
}
QUERY_CHECK_CODE(code, lino, _end);
getNextResKey(pKey->groupId, pGroupResInfo->pRows, pGroupResInfo->index, &pFillInfo->nextRowKey);
if (hasNextWindow(pFillSup)) {
pFillInfo->nextPointKey = nextPoint.key.ts;
if (pFillSup->type == TSDB_FILL_PREV) {
getPrevResKey(pKey->groupId, pGroupResInfo->pRows, pGroupResInfo->index, &pFillInfo->preRowKey);
if (hasPrevWindow(pFillSup)) {
pFillInfo->prePointKey = prevPoint.key.ts;
}
getNextResKey(pKey->groupId, pGroupResInfo->pRows, pGroupResInfo->index, &pFillInfo->nextRowKey);
if (hasNextWindow(pFillSup)) {
pFillInfo->nextPointKey = nextPoint.key.ts;
}
getNextResKey(pKey->groupId, pGroupResInfo->pRows, pGroupResInfo->index + 1, &pFillInfo->nextNextRowKey);
if (hasNextNextWindow(pFillSup)) {
pFillInfo->nextNextPointKey = pFillSup->nextNext.key;
}
}
getNextResKey(pKey->groupId, pGroupResInfo->pRows, pGroupResInfo->index + 1, &pFillInfo->nextNextRowKey);
if (hasNextNextWindow(pFillSup)) {
pFillInfo->nextNextPointKey = pFillSup->nextNext.key;
}
setTimeSliceFillRule(pFillSup, pFillInfo, pKey->ts, isReloadState);
setTimeSliceFillRule(pFillSup, pFillInfo, pKey->ts);
doStreamFillRange(pFillSup, pFillInfo, pBlock);
releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore);
releaseOutputBuf(pAggSup->pState, prevPoint.pResPos, &pAggSup->stateStore);
@ -1300,7 +1352,7 @@ static int32_t buildTimeSliceResult(SOperatorInfo* pOperator, SSDataBlock** ppRe
goto _end;
}
doBuildTimeSlicePointResult(pAggSup, pInfo->pFillSup, pInfo->pFillInfo, pInfo->pRes, &pInfo->groupResInfo, pInfo->isReloadState);
doBuildTimeSlicePointResult(pAggSup, pInfo->pFillSup, pInfo->pFillInfo, pInfo->pRes, &pInfo->groupResInfo);
if (pInfo->pRes->info.rows != 0) {
printDataBlock(pInfo->pRes, getStreamOpName(opType), GET_TASKID(pTaskInfo));
(*ppRes) = pInfo->pRes;
@ -1452,7 +1504,6 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR
goto _end;
}
pInfo->isReloadState = false;
setStreamOperatorCompleted(pOperator);
resetStreamFillSup(pInfo->pFillSup);
(*ppRes) = NULL;
@ -1544,7 +1595,6 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR
QUERY_CHECK_CODE(code, lino, _end);
if (!(*ppRes)) {
pInfo->isReloadState = false;
setStreamOperatorCompleted(pOperator);
resetStreamFillSup(pInfo->pFillSup);
}
@ -1719,7 +1769,6 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
if (pHandle) {
pInfo->isHistoryOp = pHandle->fillHistory;
}
pInfo->isReloadState = false;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC;
setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC,

View File

@ -183,32 +183,35 @@ int32_t getHashSortNextRow(SStreamFileState* pFileState, const SWinKey* pKey, SW
void* tmpVal = NULL;
int32_t len = 0;
(*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len);
if ((*pWinCode) == TSDB_CODE_SUCCESS && ppVal != NULL) {
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
if (!pNewPos || !pNewPos->pRowBuff) {
code = TSDB_CODE_OUT_OF_MEMORY;
QUERY_CHECK_CODE(code, lino, _end);
if ((*pWinCode) == TSDB_CODE_SUCCESS) {
if (ppVal != NULL) {
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
if (!pNewPos || !pNewPos->pRowBuff) {
code = TSDB_CODE_OUT_OF_MEMORY;
QUERY_CHECK_CODE(code, lino, _end);
}
memcpy(pNewPos->pRowBuff, tmpVal, len);
*pVLen = getRowStateRowSize(pFileState);
(*ppVal) = pNewPos;
}
memcpy(pNewPos->pRowBuff, tmpVal, len);
*pVLen = getRowStateRowSize(pFileState);
(*ppVal) = pNewPos;
taosMemoryFreeClear(tmpVal);
streamStateFreeCur(pCur);
return code;
}
taosMemoryFreeClear(tmpVal);
streamStateFreeCur(pCur);
return code;
} else {
if (index == size - 1) {
(*pWinCode) = TSDB_CODE_FAILED;
return code;
}
SWinKey* pNext = taosArrayGet(pWinStates, index + 1);
*pResKey = *pNext;
if (ppVal == NULL) {
return code;
}
return getHashSortRowBuff(pFileState, pResKey, ppVal, pVLen, pWinCode);
}
(*pWinCode) = TSDB_CODE_FAILED;
if (index == size - 1) {
(*pWinCode) = TSDB_CODE_FAILED;
return code;
}
SWinKey* pNext = taosArrayGet(pWinStates, index + 1);
*pResKey = *pNext;
if (ppVal == NULL) {
(*pWinCode) = TSDB_CODE_SUCCESS;
return code;
}
return getHashSortRowBuff(pFileState, pResKey, ppVal, pVLen, pWinCode);
_end:
if (code != TSDB_CODE_SUCCESS) {