fix(stream):fill dresultimmediately after delete data

This commit is contained in:
54liuyao 2024-06-24 10:56:26 +08:00
parent 0471afb840
commit 2d7d144ca2
3 changed files with 39 additions and 26 deletions

View File

@ -172,10 +172,17 @@ void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId,
SWinKey key = {.ts = ts, .groupId = groupId};
void* curVal = NULL;
int32_t curVLen = 0;
bool hasCurKey = true;
int32_t code = pAPI->stateStore.streamStateFillGet(pState, &key, (void**)&curVal, &curVLen);
ASSERT(code == TSDB_CODE_SUCCESS);
pFillSup->cur.key = key.ts;
pFillSup->cur.pRowVal = curVal;
if (code == TSDB_CODE_SUCCESS) {
pFillSup->cur.key = key.ts;
pFillSup->cur.pRowVal = curVal;
} else {
qDebug("streamStateFillGet key failed, Data may be deleted. ts:%" PRId64 ", groupId:%" PRId64, ts, groupId);
pFillSup->cur.key = ts;
pFillSup->cur.pRowVal = NULL;
hasCurKey = false;
}
SStreamStateCur* pCur = pAPI->stateStore.streamStateFillSeekKeyPrev(pState, &key);
SWinKey preKey = {.ts = INT64_MIN, .groupId = groupId};
@ -187,8 +194,10 @@ void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId,
pFillSup->prev.key = preKey.ts;
pFillSup->prev.pRowVal = preVal;
code = pAPI->stateStore.streamStateCurNext(pState, pCur);
ASSERT(code == TSDB_CODE_SUCCESS);
if (hasCurKey) {
code = pAPI->stateStore.streamStateCurNext(pState, pCur);
ASSERT(code == TSDB_CODE_SUCCESS);
}
code = pAPI->stateStore.streamStateCurNext(pState, pCur);
if (code != TSDB_CODE_SUCCESS) {
@ -741,8 +750,8 @@ static void doDeleteFillResultImpl(SOperatorInfo* pOperator, TSKEY startTs, TSKE
getWindowFromDiscBuf(pOperator, startTs, groupId, pInfo->pFillSup);
setDeleteFillValueInfo(startTs, endTs, pInfo->pFillSup, pInfo->pFillInfo);
SWinKey key = {.ts = startTs, .groupId = groupId};
pAPI->stateStore.streamStateFillDel(pOperator->pTaskInfo->streamInfo.pState, &key);
if (!pInfo->pFillInfo->needFill) {
pAPI->stateStore.streamStateFillDel(pOperator->pTaskInfo->streamInfo.pState, &key);
buildDeleteResult(pOperator, startTs, endTs, groupId, pInfo->pDelRes);
} else {
STimeRange tw = {
@ -751,11 +760,6 @@ static void doDeleteFillResultImpl(SOperatorInfo* pOperator, TSKEY startTs, TSKE
.groupId = groupId,
};
taosArrayPush(pInfo->pFillInfo->delRanges, &tw);
while (key.ts <= endTs) {
key.ts = taosTimeAdd(key.ts, pInfo->pFillSup->interval.sliding, pInfo->pFillSup->interval.slidingUnit,
pInfo->pFillSup->interval.precision);
tSimpleHashPut(pInfo->pFillSup->pResMap, &key, sizeof(SWinKey), NULL, 0);
}
}
}
@ -765,7 +769,6 @@ static void doDeleteFillFinalize(SOperatorInfo* pOperator) {
SStreamFillOperatorInfo* pInfo = pOperator->info;
SStreamFillInfo* pFillInfo = pInfo->pFillInfo;
int32_t size = taosArrayGetSize(pFillInfo->delRanges);
tSimpleHashClear(pInfo->pFillSup->pResMap);
for (; pFillInfo->delIndex < size; pFillInfo->delIndex++) {
STimeRange* range = taosArrayGet(pFillInfo->delRanges, pFillInfo->delIndex);
if (pInfo->pRes->info.id.groupId != 0 && pInfo->pRes->info.id.groupId != range->groupId) {
@ -777,8 +780,6 @@ static void doDeleteFillFinalize(SOperatorInfo* pOperator) {
doStreamFillRange(pInfo->pFillInfo, pInfo->pFillSup, pInfo->pRes);
pInfo->pRes->info.id.groupId = range->groupId;
}
SWinKey key = {.ts = range->skey, .groupId = range->groupId};
pAPI->stateStore.streamStateFillDel(pOperator->pTaskInfo->streamInfo.pState, &key);
}
}

View File

@ -2632,7 +2632,7 @@ int32_t doStreamSessionEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOp
}
// 4.dataVersion
tlen += taosEncodeFixedI32(buf, pInfo->dataVersion);
tlen += taosEncodeFixedI64(buf, pInfo->dataVersion);
// 5.checksum
if (isParent) {
@ -3086,15 +3086,17 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true,
OP_NOT_OPENED, pInfo, pTaskInfo);
// for stream
void* buff = NULL;
int32_t len = 0;
int32_t res =
pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME,
strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), &buff, &len);
if (res == TSDB_CODE_SUCCESS) {
doStreamSessionDecodeOpState(buff, len, pOperator, true);
taosMemoryFree(buff);
if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
// for stream
void* buff = NULL;
int32_t len = 0;
int32_t res =
pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME,
strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), &buff, &len);
if (res == TSDB_CODE_SUCCESS) {
doStreamSessionDecodeOpState(buff, len, pOperator, true);
taosMemoryFree(buff);
}
}
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo,
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
@ -3279,6 +3281,16 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
pAPI->stateStore.streamStateSetNumber(pChInfo->streamAggSup.pState, i, pInfo->primaryTsIndex);
taosArrayPush(pInfo->pChildren, &pChildOp);
}
void* buff = NULL;
int32_t len = 0;
int32_t res =
pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME,
strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), &buff, &len);
if (res == TSDB_CODE_SUCCESS) {
doStreamSessionDecodeOpState(buff, len, pOperator, true);
taosMemoryFree(buff);
}
}
if (!IS_FINAL_SESSION_OP(pOperator) || numOfChild == 0) {
@ -3621,7 +3633,7 @@ int32_t doStreamStateEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOper
}
// 4.dataVersion
tlen += taosEncodeFixedI32(buf, pInfo->dataVersion);
tlen += taosEncodeFixedI64(buf, pInfo->dataVersion);
// 5.checksum
if (isParent) {

View File

@ -3461,7 +3461,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
size_t kLen = 0;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
winKeyDecode((void*)&curKey, keyStr);
if (winKeyCmpr(key, sizeof(*key), &curKey, sizeof(curKey)) < 0) {
if (winKeyCmpr(key, sizeof(*key), &curKey, sizeof(curKey)) > 0) {
return pCur;
}
rocksdb_iter_prev(pCur->iter);