From c2ad0a6ec54ac8e2b83384b4b9735a3b7f3e37aa Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 25 Jun 2024 16:53:32 +0800 Subject: [PATCH] Revert "fix(stream):fill result immediately after delete data" --- source/libs/executor/src/streamfilloperator.c | 57 +++++-------------- .../executor/src/streamtimewindowoperator.c | 34 ++++------- source/libs/stream/src/streamBackendRocksdb.c | 4 +- 3 files changed, 28 insertions(+), 67 deletions(-) diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index c1a38b66ba..1cdd7d2d87 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -172,17 +172,10 @@ void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, SWinKey key = {.ts = ts, .groupId = groupId}; void* curVal = NULL; int32_t curVLen = 0; - bool hasCurKey = true; int32_t code = pAPI->stateStore.streamStateFillGet(pState, &key, (void**)&curVal, &curVLen); - if (code == TSDB_CODE_SUCCESS) { - pFillSup->cur.key = key.ts; - pFillSup->cur.pRowVal = curVal; - } else { - qDebug("streamStateFillGet key failed, Data may be deleted. ts:%" PRId64 ", groupId:%" PRId64, ts, groupId); - pFillSup->cur.key = ts; - pFillSup->cur.pRowVal = NULL; - hasCurKey = false; - } + ASSERT(code == TSDB_CODE_SUCCESS); + pFillSup->cur.key = key.ts; + pFillSup->cur.pRowVal = curVal; SStreamStateCur* pCur = pAPI->stateStore.streamStateFillSeekKeyPrev(pState, &key); SWinKey preKey = {.ts = INT64_MIN, .groupId = groupId}; @@ -194,10 +187,8 @@ void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, pFillSup->prev.key = preKey.ts; pFillSup->prev.pRowVal = preVal; - if (hasCurKey) { - code = pAPI->stateStore.streamStateCurNext(pState, pCur); - ASSERT(code == TSDB_CODE_SUCCESS); - } + code = pAPI->stateStore.streamStateCurNext(pState, pCur); + ASSERT(code == TSDB_CODE_SUCCESS); code = pAPI->stateStore.streamStateCurNext(pState, pCur); if (code != TSDB_CODE_SUCCESS) { @@ -750,8 +741,8 @@ static void doDeleteFillResultImpl(SOperatorInfo* pOperator, TSKEY startTs, TSKE getWindowFromDiscBuf(pOperator, startTs, groupId, pInfo->pFillSup); setDeleteFillValueInfo(startTs, endTs, pInfo->pFillSup, pInfo->pFillInfo); SWinKey key = {.ts = startTs, .groupId = groupId}; - pAPI->stateStore.streamStateFillDel(pOperator->pTaskInfo->streamInfo.pState, &key); if (!pInfo->pFillInfo->needFill) { + pAPI->stateStore.streamStateFillDel(pOperator->pTaskInfo->streamInfo.pState, &key); buildDeleteResult(pOperator, startTs, endTs, groupId, pInfo->pDelRes); } else { STimeRange tw = { @@ -760,27 +751,11 @@ static void doDeleteFillResultImpl(SOperatorInfo* pOperator, TSKEY startTs, TSKE .groupId = groupId, }; taosArrayPush(pInfo->pFillInfo->delRanges, &tw); - } -} - -static void getWindowInfoByKey(SStorageAPI* pAPI, void* pState, TSKEY ts, int64_t groupId, SResultRowData* pWinData) { - SWinKey key = {.ts = ts, .groupId = groupId}; - void* val = NULL; - int32_t len = 0; - int32_t code = pAPI->stateStore.streamStateFillGet(pState, &key, (void**)&val, &len); - if (code != TSDB_CODE_SUCCESS) { - qDebug("get window info by key failed, Data may be deleted, try next window. ts:%" PRId64 ", groupId:%" PRId64, ts, - groupId); - SStreamStateCur* pCur = pAPI->stateStore.streamStateFillSeekKeyNext(pState, &key); - code = pAPI->stateStore.streamStateGetGroupKVByCur(pCur, &key, (const void**)&val, &len); - pAPI->stateStore.streamStateFreeCur(pCur); - qDebug("get window info by key ts:%" PRId64 ", groupId:%" PRId64 ", res%d", ts, groupId, code); - } - - if (code == TSDB_CODE_SUCCESS) { - resetFillWindow(pWinData); - pWinData->key = key.ts; - pWinData->pRowVal = val; + while (key.ts <= endTs) { + key.ts = taosTimeAdd(key.ts, pInfo->pFillSup->interval.sliding, pInfo->pFillSup->interval.slidingUnit, + pInfo->pFillSup->interval.precision); + tSimpleHashPut(pInfo->pFillSup->pResMap, &key, sizeof(SWinKey), NULL, 0); + } } } @@ -790,22 +765,20 @@ static void doDeleteFillFinalize(SOperatorInfo* pOperator) { SStreamFillOperatorInfo* pInfo = pOperator->info; SStreamFillInfo* pFillInfo = pInfo->pFillInfo; int32_t size = taosArrayGetSize(pFillInfo->delRanges); - while (pFillInfo->delIndex < size) { + tSimpleHashClear(pInfo->pFillSup->pResMap); + for (; pFillInfo->delIndex < size; pFillInfo->delIndex++) { STimeRange* range = taosArrayGet(pFillInfo->delRanges, pFillInfo->delIndex); if (pInfo->pRes->info.id.groupId != 0 && pInfo->pRes->info.id.groupId != range->groupId) { return; } getWindowFromDiscBuf(pOperator, range->skey, range->groupId, pInfo->pFillSup); - TSKEY realEnd = range->ekey + 1; - if (pInfo->pFillInfo->type == TSDB_FILL_NEXT && pInfo->pFillSup->next.key != realEnd) { - getWindowInfoByKey(pAPI, pOperator->pTaskInfo->streamInfo.pState, realEnd, range->groupId, &pInfo->pFillSup->next); - } setDeleteFillValueInfo(range->skey, range->ekey, pInfo->pFillSup, pInfo->pFillInfo); - pFillInfo->delIndex++; if (pInfo->pFillInfo->needFill) { doStreamFillRange(pInfo->pFillInfo, pInfo->pFillSup, pInfo->pRes); pInfo->pRes->info.id.groupId = range->groupId; } + SWinKey key = {.ts = range->skey, .groupId = range->groupId}; + pAPI->stateStore.streamStateFillDel(pOperator->pTaskInfo->streamInfo.pState, &key); } } diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 5b9c018bba..4d567f729e 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -2632,7 +2632,7 @@ int32_t doStreamSessionEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOp } // 4.dataVersion - tlen += taosEncodeFixedI64(buf, pInfo->dataVersion); + tlen += taosEncodeFixedI32(buf, pInfo->dataVersion); // 5.checksum if (isParent) { @@ -3086,17 +3086,15 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION; setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true, OP_NOT_OPENED, pInfo, pTaskInfo); - if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { - // for stream - void* buff = NULL; - int32_t len = 0; - int32_t res = - pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME, - strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), &buff, &len); - if (res == TSDB_CODE_SUCCESS) { - doStreamSessionDecodeOpState(buff, len, pOperator, true); - taosMemoryFree(buff); - } + // for stream + void* buff = NULL; + int32_t len = 0; + int32_t res = + pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME, + strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), &buff, &len); + if (res == TSDB_CODE_SUCCESS) { + doStreamSessionDecodeOpState(buff, len, pOperator, true); + taosMemoryFree(buff); } pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); @@ -3281,16 +3279,6 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream pAPI->stateStore.streamStateSetNumber(pChInfo->streamAggSup.pState, i, pInfo->primaryTsIndex); taosArrayPush(pInfo->pChildren, &pChildOp); } - - void* buff = NULL; - int32_t len = 0; - int32_t res = - pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME, - strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), &buff, &len); - if (res == TSDB_CODE_SUCCESS) { - doStreamSessionDecodeOpState(buff, len, pOperator, true); - taosMemoryFree(buff); - } } if (!IS_FINAL_SESSION_OP(pOperator) || numOfChild == 0) { @@ -3633,7 +3621,7 @@ int32_t doStreamStateEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOper } // 4.dataVersion - tlen += taosEncodeFixedI64(buf, pInfo->dataVersion); + tlen += taosEncodeFixedI32(buf, pInfo->dataVersion); // 5.checksum if (isParent) { diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 1986cc43e9..c151193284 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -3422,7 +3422,7 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const size_t kLen = 0; char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); winKeyDecode((void*)&curKey, keyStr); - if (winKeyCmpr(key, sizeof(*key), &curKey, sizeof(curKey)) < 0) { + if (winKeyCmpr(key, sizeof(*key), &curKey, sizeof(curKey)) > 0) { return pCur; } rocksdb_iter_next(pCur->iter); @@ -3459,7 +3459,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const size_t kLen = 0; char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); winKeyDecode((void*)&curKey, keyStr); - if (winKeyCmpr(key, sizeof(*key), &curKey, sizeof(curKey)) > 0) { + if (winKeyCmpr(key, sizeof(*key), &curKey, sizeof(curKey)) < 0) { return pCur; } rocksdb_iter_prev(pCur->iter);