fix(stream):fill dresultim mediately after delete data

This commit is contained in:
54liuyao 2024-06-24 15:52:27 +08:00
parent 2d7d144ca2
commit 4f8dfe7522
1 changed files with 24 additions and 0 deletions

View File

@ -763,6 +763,27 @@ static void doDeleteFillResultImpl(SOperatorInfo* pOperator, TSKEY startTs, TSKE
}
}
static void getWindowInfoByKey(SStorageAPI* pAPI, void* pState, TSKEY ts, int64_t groupId, SResultRowData* pWinData) {
SWinKey key = {.ts = ts, .groupId = groupId};
void* val = NULL;
int32_t len = 0;
int32_t code = pAPI->stateStore.streamStateFillGet(pState, &key, (void**)&val, &len);
if (code != TSDB_CODE_SUCCESS) {
qDebug("get window info by key failed, Data may be deleted, try next window. ts:%" PRId64 ", groupId:%" PRId64, ts,
groupId);
SStreamStateCur* pCur = pAPI->stateStore.streamStateFillSeekKeyNext(pState, &key);
code = pAPI->stateStore.streamStateGetGroupKVByCur(pCur, &key, (const void**)&val, &len);
pAPI->stateStore.streamStateFreeCur(pCur);
qDebug("get window info by key ts:%" PRId64 ", groupId:%" PRId64 ", res%d", ts, groupId, code);
}
if (code == TSDB_CODE_SUCCESS) {
resetFillWindow(pWinData);
pWinData->key = key.ts;
pWinData->pRowVal = val;
}
}
static void doDeleteFillFinalize(SOperatorInfo* pOperator) {
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
@ -775,6 +796,9 @@ static void doDeleteFillFinalize(SOperatorInfo* pOperator) {
return;
}
getWindowFromDiscBuf(pOperator, range->skey, range->groupId, pInfo->pFillSup);
if (pInfo->pFillInfo->type == TSDB_FILL_NEXT && pInfo->pFillSup->next.key != range->ekey) {
getWindowInfoByKey(pAPI, pOperator->pTaskInfo->streamInfo.pState, range->ekey, range->groupId, &pInfo->pFillSup->next);
}
setDeleteFillValueInfo(range->skey, range->ekey, pInfo->pFillSup, pInfo->pFillInfo);
if (pInfo->pFillInfo->needFill) {
doStreamFillRange(pInfo->pFillInfo, pInfo->pFillSup, pInfo->pRes);