diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 89ab8ead21..9c2df5de2c 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -823,10 +823,11 @@ typedef struct SStreamFillOperatorInfo { int32_t primaryTsCol; int32_t primarySrcSlotId; SStreamFillInfo* pFillInfo; - SStreamAggSupporter* pStreamAggSup; SArray* pCloseTs; SArray* pUpdated; SGroupResInfo groupResInfo; + SStreamState* pState; + SStateStore stateStore; } SStreamFillOperatorInfo; typedef struct SStreamTimeSliceOperatorInfo { diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h index 91aef93452..2cf2959924 100644 --- a/source/libs/executor/inc/operator.h +++ b/source/libs/executor/inc/operator.h @@ -149,7 +149,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** pInfo); -int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo); +int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** pInfo); int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** pInfo); diff --git a/source/libs/executor/inc/streamexecutorInt.h b/source/libs/executor/inc/streamexecutorInt.h index 4bd931c567..0a69080314 100644 --- a/source/libs/executor/inc/streamexecutorInt.h +++ b/source/libs/executor/inc/streamexecutorInt.h @@ -102,8 +102,9 @@ int32_t getQualifiedRowNumDesc(SExprSupp* pExprSup, SSDataBlock* pBlock, TSKEY* int32_t createStreamIntervalSliceOperatorInfo(struct SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, struct SOperatorInfo** ppOptInfo); -int32_t buildAllResultKey(SStreamAggSupporter* pAggSup, TSKEY ts, SArray* pUpdated); +int32_t buildAllResultKey(SStateStore* pStateStore, SStreamState* pState, TSKEY ts, SArray* pUpdated); int32_t initOffsetInfo(int32_t** ppOffset, SSDataBlock* pRes); +TSKEY compareTs(void* pKey); #ifdef __cplusplus } diff --git a/source/libs/executor/inc/tfill.h b/source/libs/executor/inc/tfill.h index 6072063bbf..d20742bf45 100644 --- a/source/libs/executor/inc/tfill.h +++ b/source/libs/executor/inc/tfill.h @@ -119,7 +119,8 @@ typedef struct SStreamFillInfo { int32_t delIndex; uint64_t curGroupId; bool hasNext; - SResultRowData* pNonFillRow; + SResultRowData* pNonFillRow; + void* pTempBuff; } SStreamFillInfo; int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows); diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 3b10dce63f..a31d3d50e1 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -614,7 +614,7 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) { code = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, pTaskInfo, &pOptr); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL == type) { - code = createStreamFillOperatorInfo(ops[0], (SStreamFillPhysiNode*)pPhyNode, pTaskInfo, &pOptr); + code = createStreamFillOperatorInfo(ops[0], (SStreamFillPhysiNode*)pPhyNode, pTaskInfo, pHandle, &pOptr); } else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) { code = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr); } else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) { diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index d00f95e5a9..b90729e3b5 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -131,6 +131,7 @@ void destroyStreamFillInfo(SStreamFillInfo* pFillInfo) { pFillInfo->pLinearInfo = NULL; taosArrayDestroy(pFillInfo->delRanges); + taosMemoryFreeClear(pFillInfo->pTempBuff); taosMemoryFree(pFillInfo); } @@ -150,6 +151,14 @@ static void destroyStreamFillOperatorInfo(void* param) { clearGroupResInfo(&pInfo->groupResInfo); taosArrayDestroy(pInfo->pCloseTs); + if (pInfo->stateStore.streamFileStateDestroy != NULL) { + pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState); + } + + if (pInfo->pState != NULL) { + taosMemoryFreeClear(pInfo->pState); + } + taosMemoryFree(pInfo); } @@ -1159,14 +1168,19 @@ _end: return code; } +static void resetForceFillWindow(SResultRowData* pRowData) { + pRowData->key = INT64_MIN; + pRowData->pRowVal = NULL; +} + void doBuildForceFillResultImpl(SOperatorInfo* pOperator, SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; - SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; - void* pState = pOperator->pTaskInfo->streamInfo.pState; - bool res = false; - int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); + + SStreamFillOperatorInfo* pInfo = pOperator->info; + bool res = false; + int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); for (; pGroupResInfo->index < numOfRows; pGroupResInfo->index++) { SWinKey* pKey = (SWinKey*)taosArrayGet(pGroupResInfo->pRows, pGroupResInfo->index); if (pBlock->info.id.groupId == 0) { @@ -1174,25 +1188,30 @@ void doBuildForceFillResultImpl(SOperatorInfo* pOperator, SStreamFillSupporter* } else if (pBlock->info.id.groupId != pKey->groupId) { break; } - void* val = NULL; - int32_t len = 0; - int32_t winCode = pAPI->stateStore.streamStateFillGet(pOperator->pTaskInfo->streamInfo.pState, pKey, (void**)&val, &len, NULL); + + SRowBuffPos* pValPos = NULL; + int32_t len = 0; + int32_t winCode = TSDB_CODE_SUCCESS; + code = pInfo->stateStore.streamStateFillGet(pInfo->pState, pKey, (void**)&pValPos, &len, &winCode); + QUERY_CHECK_CODE(code, lino, _end); qDebug("===stream=== build force fill res. key:%" PRId64 ",groupId:%" PRId64".res:%d", pKey->ts, pKey->groupId, winCode); if (winCode == TSDB_CODE_SUCCESS) { pFillSup->cur.key = pKey->ts; - pFillSup->cur.pRowVal = val; + pFillSup->cur.pRowVal = pValPos->pRowBuff; code = buildFillResult(&pFillSup->cur, pFillSup, pKey->ts, pBlock, &res); QUERY_CHECK_CODE(code, lino, _end); - resetFillWindow(&pFillSup->cur); + resetForceFillWindow(&pFillSup->cur); + releaseOutputBuf(pInfo->pState, pValPos, &pInfo->stateStore); } else { - SStreamStateCur* pCur = pAPI->stateStore.streamStateFillSeekKeyPrev(pState, pKey); - SWinKey preKey = {.ts = INT64_MIN, .groupId = pKey->groupId}; - void* preVal = NULL; - int32_t preVLen = 0; - winCode = pAPI->stateStore.streamStateFillGetGroupKVByCur(pCur, &preKey, (const void**)&preVal, &preVLen); + SWinKey preKey = {.ts = INT64_MIN, .groupId = pKey->groupId}; + SRowBuffPos* prePos = NULL; + int32_t preVLen = 0; + code = pInfo->stateStore.streamStateFillGetPrev(pInfo->pState, pKey, &preKey, + (void**)&prePos, &preVLen, &winCode); + QUERY_CHECK_CODE(code, lino, _end); if (winCode == TSDB_CODE_SUCCESS) { pFillSup->cur.key = pKey->ts; - pFillSup->cur.pRowVal = preVal; + pFillSup->cur.pRowVal = prePos->pRowBuff; if (pFillInfo->type == TSDB_FILL_PREV) { code = buildFillResult(&pFillSup->cur, pFillSup, pKey->ts, pBlock, &res); QUERY_CHECK_CODE(code, lino, _end); @@ -1202,9 +1221,9 @@ void doBuildForceFillResultImpl(SOperatorInfo* pOperator, SStreamFillSupporter* code = buildFillResult(pFillInfo->pResRow, pFillSup, pKey->ts, pBlock, &res); QUERY_CHECK_CODE(code, lino, _end); } - resetFillWindow(&pFillSup->cur); + resetForceFillWindow(&pFillSup->cur); } - pAPI->stateStore.streamStateFreeCur(pCur); + releaseOutputBuf(pInfo->pState, prePos, &pInfo->stateStore); } } @@ -1249,6 +1268,45 @@ _end: return code; } +static void keepResultInStateBuf(SStreamFillOperatorInfo* pInfo, uint64_t groupId, SResultRowData* pRow) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + + SWinKey key = {.groupId = groupId, .ts = pRow->key}; + int32_t curVLen = 0; + SRowBuffPos* pStatePos = NULL; + int32_t winCode = TSDB_CODE_SUCCESS; + code = pInfo->stateStore.streamStateFillAddIfNotExist(pInfo->pState, &key, (void**)&pStatePos, + &curVLen, &winCode); + QUERY_CHECK_CODE(code, lino, _end); + memcpy(pStatePos->pRowBuff, pRow->pRowVal, pInfo->pFillSup->rowSize); + qDebug("===stream===fill operator save key ts:%" PRId64 " group id:%" PRIu64 " code:%d", key.ts, key.groupId, code); + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + } +} + +int32_t keepBlockRowInStateBuf(SStreamFillOperatorInfo* pInfo, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, TSKEY* tsCol, + int32_t rowId, uint64_t groupId, int32_t rowSize) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + TSKEY ts = tsCol[rowId]; + pFillInfo->nextRowKey = ts; + TAOS_MEMSET(pFillInfo->pTempBuff, 0, rowSize); + SResultRowData tmpNextRow = {.key = ts, .pRowVal = pFillInfo->pTempBuff}; + + transBlockToResultRow(pBlock, rowId, ts, &tmpNextRow); + keepResultInStateBuf(pInfo, groupId, &tmpNextRow); + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + // force window close impl static int32_t doStreamForceFillImpl(SOperatorInfo* pOperator) { int32_t code = TSDB_CODE_SUCCESS; @@ -1259,11 +1317,10 @@ static int32_t doStreamForceFillImpl(SOperatorInfo* pOperator) { SStreamFillInfo* pFillInfo = pInfo->pFillInfo; SSDataBlock* pBlock = pInfo->pSrcBlock; uint64_t groupId = pBlock->info.id.groupId; - SStreamAggSupporter* pAggSup = pInfo->pStreamAggSup; SColumnInfoData* pTsCol = taosArrayGet(pInfo->pSrcBlock->pDataBlock, pInfo->primaryTsCol); TSKEY* tsCol = (TSKEY*)pTsCol->pData; for (int32_t i = 0; i < pBlock->info.rows; i++){ - code = keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, i, groupId, pFillSup->rowSize); + code = keepBlockRowInStateBuf(pInfo, pFillInfo, pBlock, tsCol, i, groupId, pFillSup->rowSize); QUERY_CHECK_CODE(code, lino, _end); int32_t size = taosArrayGetSize(pInfo->pCloseTs); @@ -1283,7 +1340,7 @@ static int32_t doStreamForceFillImpl(SOperatorInfo* pOperator) { } } } - code = pAggSup->stateStore.streamStateGroupPut(pAggSup->pState, groupId, NULL, 0); + code = pInfo->stateStore.streamStateGroupPut(pInfo->pState, groupId, NULL, 0); QUERY_CHECK_CODE(code, lino, _end); _end: @@ -1293,13 +1350,13 @@ _end: return code; } -int32_t buildAllResultKey(SStreamAggSupporter* pAggSup, TSKEY ts, SArray* pUpdated) { +int32_t buildAllResultKey(SStateStore* pStateStore, SStreamState* pState, TSKEY ts, SArray* pUpdated) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; int64_t groupId = 0; - SStreamStateCur* pCur = pAggSup->stateStore.streamStateGroupGetCur(pAggSup->pState); + SStreamStateCur* pCur = pStateStore->streamStateGroupGetCur(pState); while (1) { - int32_t winCode = pAggSup->stateStore.streamStateGroupGetKVByCur(pCur, &groupId, NULL, NULL); + int32_t winCode = pStateStore->streamStateGroupGetKVByCur(pCur, &groupId, NULL, NULL); if (winCode != TSDB_CODE_SUCCESS) { break; } @@ -1307,14 +1364,14 @@ int32_t buildAllResultKey(SStreamAggSupporter* pAggSup, TSKEY ts, SArray* pUpdat void* pPushRes = taosArrayPush(pUpdated, &key); QUERY_CHECK_NULL(pPushRes, code, lino, _end, terrno); - pAggSup->stateStore.streamStateGroupCurNext(pCur); + pStateStore->streamStateGroupCurNext(pCur); } - pAggSup->stateStore.streamStateFreeCur(pCur); + pStateStore->streamStateFreeCur(pCur); pCur = NULL; _end: if (code != TSDB_CODE_SUCCESS) { - pAggSup->stateStore.streamStateFreeCur(pCur); + pStateStore->streamStateFreeCur(pCur); pCur = NULL; qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } @@ -1347,7 +1404,8 @@ static int32_t doStreamForceFillNext(SOperatorInfo* pOperator, SSDataBlock** ppR (*ppRes) = resBlock; goto _end; } - pInfo->pStreamAggSup->stateStore.streamStateClearExpiredState(pInfo->pStreamAggSup->pState); + + pInfo->stateStore.streamStateClearExpiredState(pInfo->pState); setStreamOperatorCompleted(pOperator); (*ppRes) = NULL; goto _end; @@ -1395,7 +1453,7 @@ static int32_t doStreamForceFillNext(SOperatorInfo* pOperator, SSDataBlock** ppR for (int32_t i = 0; i < taosArrayGetSize(pInfo->pCloseTs); i++) { TSKEY ts = *(TSKEY*) taosArrayGet(pInfo->pCloseTs, i); - code = buildAllResultKey(pInfo->pStreamAggSup, ts, pInfo->pUpdated); + code = buildAllResultKey(&pInfo->stateStore, pInfo->pState, ts, pInfo->pUpdated); QUERY_CHECK_CODE(code, lino, _end); } taosArrayClear(pInfo->pCloseTs); @@ -1414,7 +1472,7 @@ static int32_t doStreamForceFillNext(SOperatorInfo* pOperator, SSDataBlock** ppR QUERY_CHECK_CODE(code, lino, _end); if ((*ppRes) == NULL) { - pInfo->pStreamAggSup->stateStore.streamStateClearExpiredState(pInfo->pStreamAggSup->pState); + pInfo->stateStore.streamStateClearExpiredState(pInfo->pState); setStreamOperatorCompleted(pOperator); } @@ -1621,6 +1679,7 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock* pFillInfo->delIndex = 0; pFillInfo->curGroupId = 0; pFillInfo->hasNext = false; + pFillInfo->pTempBuff = taosMemoryCalloc(1, pFillSup->rowSize); return pFillInfo; _end: @@ -1664,21 +1723,18 @@ static void setValueForFillInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo* } } -int32_t getDownStreamInfo(SOperatorInfo* downstream, int8_t* triggerType, SInterval* pInterval, SStreamAggSupporter** ppAggSup) { +int32_t getDownStreamInfo(SOperatorInfo* downstream, int8_t* triggerType, SInterval* pInterval) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; if (IS_NORMAL_INTERVAL_OP(downstream)) { SStreamIntervalOperatorInfo* pInfo = downstream->info; *triggerType = pInfo->twAggSup.calTrigger; *pInterval = pInfo->interval; - (*ppAggSup) = NULL; } else if (IS_CONTINUE_INTERVAL_OP(downstream)) { SStreamIntervalSliceOperatorInfo* pInfo = downstream->info; *triggerType = pInfo->twAggSup.calTrigger; *pInterval = pInfo->interval; pInfo->hasFill = true; - (*ppAggSup) = &pInfo->streamAggSup; - pInfo->streamAggSup.stateStore.streamStateSetFillInfo(pInfo->streamAggSup.pState); } else { code = TSDB_CODE_STREAM_INTERNAL_ERROR; } @@ -1691,8 +1747,31 @@ _end: return code; } +int32_t initFillOperatorStateBuff(SStreamFillOperatorInfo* pInfo, SStreamState* pState, SStateStore* pStore, + SReadHandle* pHandle, const char* taskIdStr, SStorageAPI* pApi) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + + pInfo->stateStore = *pStore; + pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState)); + QUERY_CHECK_NULL(pInfo->pState, code, lino, _end, terrno); + + *(pInfo->pState) = *pState; + pInfo->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsCol); + code = pInfo->stateStore.streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->pFillSup->rowSize, 0, compareTs, + pInfo->pState, INT64_MAX, taskIdStr, pHandle->checkpointId, + STREAM_STATE_BUFF_HASH_SORT, &pInfo->pState->pFileState); + QUERY_CHECK_CODE(code, lino, _end); + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode, - SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { + SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** pOptrInfo) { QRY_PARAM_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; @@ -1718,7 +1797,7 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi int8_t triggerType = 0; SInterval interval = {0}; - code = getDownStreamInfo(downstream, &triggerType, &interval, &pInfo->pStreamAggSup); + code = getDownStreamInfo(downstream, &triggerType, &interval); QUERY_CHECK_CODE(code, lino, _error); pInfo->pFillSup = initStreamFillSup(pPhyFillNode, &interval, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI, @@ -1773,9 +1852,12 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi pTaskInfo); if (triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { + initFillOperatorStateBuff(pInfo, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.stateStore, pHandle, + GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamForceFillNext, NULL, destroyStreamFillOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); } else { + pInfo->pState != NULL; pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamFillNext, NULL, destroyStreamFillOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); } diff --git a/source/libs/executor/src/streamintervalsliceoperator.c b/source/libs/executor/src/streamintervalsliceoperator.c index f26cbce119..fcf7d6ef10 100644 --- a/source/libs/executor/src/streamintervalsliceoperator.c +++ b/source/libs/executor/src/streamintervalsliceoperator.c @@ -360,9 +360,7 @@ static int32_t doStreamIntervalSliceNext(SOperatorInfo* pOperator, SSDataBlock** return code; } - if (pInfo->hasFill == false) { - pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState); - } + pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState); setStreamOperatorCompleted(pOperator); (*ppRes) = NULL; return code; @@ -452,9 +450,7 @@ static int32_t doStreamIntervalSliceNext(SOperatorInfo* pOperator, SSDataBlock** QUERY_CHECK_CODE(code, lino, _end); if ((*ppRes) == NULL) { - if (pInfo->hasFill == false) { - pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState); - } + pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState); setStreamOperatorCompleted(pOperator); } diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c index 66793ca5a0..8611678e5a 100644 --- a/source/libs/executor/src/streamtimesliceoperator.c +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -1909,7 +1909,7 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR qDebug("===stream===build stream result, ts count:%d", size); for (int32_t i = 0; i < size; i++) { TSKEY ts = *(TSKEY*) taosArrayGet(pInfo->pCloseTs, i); - code = buildAllResultKey(&pInfo->streamAggSup, ts, pInfo->pUpdated); + code = buildAllResultKey(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pState, ts, pInfo->pUpdated); QUERY_CHECK_CODE(code, lino, _end); } qDebug("===stream===build stream result, res count:%ld", taosArrayGetSize(pInfo->pUpdated)); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 8fd00e9313..8bf7323d63 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1834,7 +1834,7 @@ int64_t getDeleteMarkFromOption(SStreamNodeOption* pOption) { return deleteMark; } -static TSKEY compareTs(void* pKey) { +TSKEY compareTs(void* pKey) { SWinKey* pWinKey = (SWinKey*)pKey; return pWinKey->ts; } diff --git a/source/libs/executor/test/queryPlanTests.cpp b/source/libs/executor/test/queryPlanTests.cpp index 6710435aba..69097ce755 100755 --- a/source/libs/executor/test/queryPlanTests.cpp +++ b/source/libs/executor/test/queryPlanTests.cpp @@ -3094,7 +3094,7 @@ void qptExecPlan(SReadHandle* pReadHandle, SNode* pNode, SExecTaskInfo* pTaskInf qptCtx.result.code = createFillOperatorInfo(NULL, (SFillPhysiNode*)pNode, pTaskInfo, ppOperaotr); break; case QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL: - qptCtx.result.code = createStreamFillOperatorInfo(NULL, (SStreamFillPhysiNode*)pNode, pTaskInfo, ppOperaotr); + qptCtx.result.code = createStreamFillOperatorInfo(NULL, (SStreamFillPhysiNode*)pNode, pTaskInfo, pReadHandle, ppOperaotr); break; case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION: qptCtx.result.code = createSessionAggOperatorInfo(NULL, (SSessionWinodwPhysiNode*)pNode, pTaskInfo, ppOperaotr); diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index dc4ca7c0e5..564f0d3cb0 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -1232,11 +1232,6 @@ void clearExpiredState(SStreamFileState* pFileState) { int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey); qTrace("clear expired file, ts:%" PRId64 ". %s at line %d res:%d", pKey->ts, __func__, __LINE__, code_file); } - - if (pFileState->hasFillCatch == false) { - int32_t code_file = streamStateFillDel_rocksdb(pFileState->pFileStore, pKey); - qTrace("force clear expired file, ts:%" PRId64 ". %s at line %d res %d", pKey->ts, __func__, __LINE__, code_file); - } } taosArrayRemoveBatch(pWinStates, 0, size - 1, NULL); }