From 4f8dfe7522dd125064056de3d7bfabeb0b5fce6c Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Mon, 24 Jun 2024 15:52:27 +0800 Subject: [PATCH] fix(stream):fill dresultim mediately after delete data --- source/libs/executor/src/streamfilloperator.c | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index 7d84bd1d66..e61f80f6ce 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -763,6 +763,27 @@ static void doDeleteFillResultImpl(SOperatorInfo* pOperator, TSKEY startTs, TSKE } } +static void getWindowInfoByKey(SStorageAPI* pAPI, void* pState, TSKEY ts, int64_t groupId, SResultRowData* pWinData) { + SWinKey key = {.ts = ts, .groupId = groupId}; + void* val = NULL; + int32_t len = 0; + int32_t code = pAPI->stateStore.streamStateFillGet(pState, &key, (void**)&val, &len); + if (code != TSDB_CODE_SUCCESS) { + qDebug("get window info by key failed, Data may be deleted, try next window. ts:%" PRId64 ", groupId:%" PRId64, ts, + groupId); + SStreamStateCur* pCur = pAPI->stateStore.streamStateFillSeekKeyNext(pState, &key); + code = pAPI->stateStore.streamStateGetGroupKVByCur(pCur, &key, (const void**)&val, &len); + pAPI->stateStore.streamStateFreeCur(pCur); + qDebug("get window info by key ts:%" PRId64 ", groupId:%" PRId64 ", res%d", ts, groupId, code); + } + + if (code == TSDB_CODE_SUCCESS) { + resetFillWindow(pWinData); + pWinData->key = key.ts; + pWinData->pRowVal = val; + } +} + static void doDeleteFillFinalize(SOperatorInfo* pOperator) { SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; @@ -775,6 +796,9 @@ static void doDeleteFillFinalize(SOperatorInfo* pOperator) { return; } getWindowFromDiscBuf(pOperator, range->skey, range->groupId, pInfo->pFillSup); + if (pInfo->pFillInfo->type == TSDB_FILL_NEXT && pInfo->pFillSup->next.key != range->ekey) { + getWindowInfoByKey(pAPI, pOperator->pTaskInfo->streamInfo.pState, range->ekey, range->groupId, &pInfo->pFillSup->next); + } setDeleteFillValueInfo(range->skey, range->ekey, pInfo->pFillSup, pInfo->pFillInfo); if (pInfo->pFillInfo->needFill) { doStreamFillRange(pInfo->pFillInfo, pInfo->pFillSup, pInfo->pRes);