diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index 1cdd7d2d87..7d84bd1d66 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -172,10 +172,17 @@ void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, SWinKey key = {.ts = ts, .groupId = groupId}; void* curVal = NULL; int32_t curVLen = 0; + bool hasCurKey = true; int32_t code = pAPI->stateStore.streamStateFillGet(pState, &key, (void**)&curVal, &curVLen); - ASSERT(code == TSDB_CODE_SUCCESS); - pFillSup->cur.key = key.ts; - pFillSup->cur.pRowVal = curVal; + if (code == TSDB_CODE_SUCCESS) { + pFillSup->cur.key = key.ts; + pFillSup->cur.pRowVal = curVal; + } else { + qDebug("streamStateFillGet key failed, Data may be deleted. ts:%" PRId64 ", groupId:%" PRId64, ts, groupId); + pFillSup->cur.key = ts; + pFillSup->cur.pRowVal = NULL; + hasCurKey = false; + } SStreamStateCur* pCur = pAPI->stateStore.streamStateFillSeekKeyPrev(pState, &key); SWinKey preKey = {.ts = INT64_MIN, .groupId = groupId}; @@ -187,8 +194,10 @@ void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, pFillSup->prev.key = preKey.ts; pFillSup->prev.pRowVal = preVal; - code = pAPI->stateStore.streamStateCurNext(pState, pCur); - ASSERT(code == TSDB_CODE_SUCCESS); + if (hasCurKey) { + code = pAPI->stateStore.streamStateCurNext(pState, pCur); + ASSERT(code == TSDB_CODE_SUCCESS); + } code = pAPI->stateStore.streamStateCurNext(pState, pCur); if (code != TSDB_CODE_SUCCESS) { @@ -741,8 +750,8 @@ static void doDeleteFillResultImpl(SOperatorInfo* pOperator, TSKEY startTs, TSKE getWindowFromDiscBuf(pOperator, startTs, groupId, pInfo->pFillSup); setDeleteFillValueInfo(startTs, endTs, pInfo->pFillSup, pInfo->pFillInfo); SWinKey key = {.ts = startTs, .groupId = groupId}; + pAPI->stateStore.streamStateFillDel(pOperator->pTaskInfo->streamInfo.pState, &key); if (!pInfo->pFillInfo->needFill) { - pAPI->stateStore.streamStateFillDel(pOperator->pTaskInfo->streamInfo.pState, &key); buildDeleteResult(pOperator, startTs, endTs, groupId, pInfo->pDelRes); } else { STimeRange tw = { @@ -751,11 +760,6 @@ static void doDeleteFillResultImpl(SOperatorInfo* pOperator, TSKEY startTs, TSKE .groupId = groupId, }; taosArrayPush(pInfo->pFillInfo->delRanges, &tw); - while (key.ts <= endTs) { - key.ts = taosTimeAdd(key.ts, pInfo->pFillSup->interval.sliding, pInfo->pFillSup->interval.slidingUnit, - pInfo->pFillSup->interval.precision); - tSimpleHashPut(pInfo->pFillSup->pResMap, &key, sizeof(SWinKey), NULL, 0); - } } } @@ -765,7 +769,6 @@ static void doDeleteFillFinalize(SOperatorInfo* pOperator) { SStreamFillOperatorInfo* pInfo = pOperator->info; SStreamFillInfo* pFillInfo = pInfo->pFillInfo; int32_t size = taosArrayGetSize(pFillInfo->delRanges); - tSimpleHashClear(pInfo->pFillSup->pResMap); for (; pFillInfo->delIndex < size; pFillInfo->delIndex++) { STimeRange* range = taosArrayGet(pFillInfo->delRanges, pFillInfo->delIndex); if (pInfo->pRes->info.id.groupId != 0 && pInfo->pRes->info.id.groupId != range->groupId) { @@ -777,8 +780,6 @@ static void doDeleteFillFinalize(SOperatorInfo* pOperator) { doStreamFillRange(pInfo->pFillInfo, pInfo->pFillSup, pInfo->pRes); pInfo->pRes->info.id.groupId = range->groupId; } - SWinKey key = {.ts = range->skey, .groupId = range->groupId}; - pAPI->stateStore.streamStateFillDel(pOperator->pTaskInfo->streamInfo.pState, &key); } } diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 4d567f729e..5b9c018bba 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -2632,7 +2632,7 @@ int32_t doStreamSessionEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOp } // 4.dataVersion - tlen += taosEncodeFixedI32(buf, pInfo->dataVersion); + tlen += taosEncodeFixedI64(buf, pInfo->dataVersion); // 5.checksum if (isParent) { @@ -3086,15 +3086,17 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION; setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true, OP_NOT_OPENED, pInfo, pTaskInfo); - // for stream - void* buff = NULL; - int32_t len = 0; - int32_t res = - pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME, - strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), &buff, &len); - if (res == TSDB_CODE_SUCCESS) { - doStreamSessionDecodeOpState(buff, len, pOperator, true); - taosMemoryFree(buff); + if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { + // for stream + void* buff = NULL; + int32_t len = 0; + int32_t res = + pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME, + strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), &buff, &len); + if (res == TSDB_CODE_SUCCESS) { + doStreamSessionDecodeOpState(buff, len, pOperator, true); + taosMemoryFree(buff); + } } pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); @@ -3279,6 +3281,16 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream pAPI->stateStore.streamStateSetNumber(pChInfo->streamAggSup.pState, i, pInfo->primaryTsIndex); taosArrayPush(pInfo->pChildren, &pChildOp); } + + void* buff = NULL; + int32_t len = 0; + int32_t res = + pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME, + strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), &buff, &len); + if (res == TSDB_CODE_SUCCESS) { + doStreamSessionDecodeOpState(buff, len, pOperator, true); + taosMemoryFree(buff); + } } if (!IS_FINAL_SESSION_OP(pOperator) || numOfChild == 0) { @@ -3621,7 +3633,7 @@ int32_t doStreamStateEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOper } // 4.dataVersion - tlen += taosEncodeFixedI32(buf, pInfo->dataVersion); + tlen += taosEncodeFixedI64(buf, pInfo->dataVersion); // 5.checksum if (isParent) { diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 597d6035d6..6cc74e04f7 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -3461,7 +3461,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const size_t kLen = 0; char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); winKeyDecode((void*)&curKey, keyStr); - if (winKeyCmpr(key, sizeof(*key), &curKey, sizeof(curKey)) < 0) { + if (winKeyCmpr(key, sizeof(*key), &curKey, sizeof(curKey)) > 0) { return pCur; } rocksdb_iter_prev(pCur->iter);