diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 929248f5cb..cd426a3a3a 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -378,6 +378,7 @@ typedef struct SStateStore { int32_t (*streamStateFillGetGroupKVByCur)(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); int32_t (*streamStateGetKVByCur)(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); + void (*streamStateSetFillInfo)(SStreamState* pState); void (*streamStateClearExpiredState)(SStreamState* pState); int32_t (*streamStateSessionAddIfNotExist)(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 3b05e3abf2..a50451c3eb 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -108,6 +108,7 @@ int32_t streamStateFillGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, con int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); // twa +void streamStateSetFillInfo(SStreamState* pState); void streamStateClearExpiredState(SStreamState* pState); void streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur); diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index a6f55fca76..4a696d9798 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -145,9 +145,12 @@ SSHashObj* getGroupIdCache(SStreamFileState* pFileState); int fillStateKeyCompare(const void* pWin1, const void* pDatas, int pos); int32_t getRowStatePrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** ppVal, int32_t* pVLen, int32_t* pWinCode); +int32_t addSearchItem(SStreamFileState* pFileState, SArray* pWinStates, const SWinKey* pKey); //twa +void setFillInfo(SStreamFileState* pFileState); void clearExpiredState(SStreamFileState* pFileState); +int32_t addArrayBuffIfNotExist(SSHashObj* pSearchBuff, uint64_t groupId, SArray** ppResStates); #ifdef __cplusplus } diff --git a/source/dnode/snode/src/snodeInitApi.c b/source/dnode/snode/src/snodeInitApi.c index 628b4b392d..680a2fd83c 100644 --- a/source/dnode/snode/src/snodeInitApi.c +++ b/source/dnode/snode/src/snodeInitApi.c @@ -67,6 +67,7 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamStateFillGetGroupKVByCur = streamStateFillGetGroupKVByCur; pStore->streamStateGetKVByCur = streamStateGetKVByCur; + pStore->streamStateSetFillInfo = streamStateSetFillInfo; pStore->streamStateClearExpiredState = streamStateClearExpiredState; pStore->streamStateSessionAddIfNotExist = streamStateSessionAddIfNotExist; diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index 3a36dacbf0..d688d1323d 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -183,6 +183,7 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamStateFillGetGroupKVByCur = streamStateFillGetGroupKVByCur; pStore->streamStateGetKVByCur = streamStateGetKVByCur; + pStore->streamStateSetFillInfo = streamStateSetFillInfo; pStore->streamStateClearExpiredState = streamStateClearExpiredState; pStore->streamStateSessionAddIfNotExist = streamStateSessionAddIfNotExist; diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index a36c2a65e0..81832cac8f 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -1636,6 +1636,7 @@ int32_t getDownStreamInfo(SOperatorInfo* downstream, int8_t* triggerType, SInter *pInterval = pInfo->interval; pInfo->hasFill = true; (*ppAggSup) = &pInfo->streamAggSup; + pInfo->streamAggSup.stateStore.streamStateSetFillInfo(pInfo->streamAggSup.pState); } else { code = TSDB_CODE_STREAM_INTERNAL_ERROR; } diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c index 289c641c43..8becadaa1a 100644 --- a/source/libs/executor/src/streamtimesliceoperator.c +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -1751,6 +1751,9 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR goto _end; } + if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { + pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState); + } setStreamOperatorCompleted(pOperator); resetStreamFillSup(pInfo->pFillSup); (*ppRes) = NULL; @@ -1854,6 +1857,9 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR QUERY_CHECK_CODE(code, lino, _end); if (!(*ppRes)) { + if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { + pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState); + } setStreamOperatorCompleted(pOperator); resetStreamFillSup(pInfo->pFillSup); } diff --git a/source/libs/function/inc/functionResInfoInt.h b/source/libs/function/inc/functionResInfoInt.h index 9ee1e884b3..f97d2e8024 100644 --- a/source/libs/function/inc/functionResInfoInt.h +++ b/source/libs/function/inc/functionResInfoInt.h @@ -237,6 +237,7 @@ typedef struct SElapsedInfo { } SElapsedInfo; typedef struct STwaInfo { + double dTwaRes; double dOutput; int64_t numOfElems; SPoint1 p; diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 9f50e705ca..78e1166a0b 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -6201,11 +6201,11 @@ int32_t twaFinalize(struct SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { pResInfo->numOfRes = 0; } else { if (pInfo->win.ekey == pInfo->win.skey) { - pInfo->dOutput = pInfo->p.val; + pInfo->dTwaRes = pInfo->p.val; } else if (pInfo->win.ekey == INT64_MAX || pInfo->win.skey == INT64_MIN) { // no data in timewindow - pInfo->dOutput = 0; + pInfo->dTwaRes = 0; } else { - pInfo->dOutput = pInfo->dOutput / (pInfo->win.ekey - pInfo->win.skey); + pInfo->dTwaRes = pInfo->dOutput / (pInfo->win.ekey - pInfo->win.skey); } pResInfo->numOfRes = 1; diff --git a/source/libs/stream/src/streamSliceState.c b/source/libs/stream/src/streamSliceState.c index 0660a68ed2..a80c42a881 100644 --- a/source/libs/stream/src/streamSliceState.c +++ b/source/libs/stream/src/streamSliceState.c @@ -33,16 +33,7 @@ int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, vo SArray* pWinStates = NULL; SSHashObj* pSearchBuff = getSearchBuff(pFileState); - void** ppBuff = tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t)); - if (ppBuff) { - pWinStates = (SArray*)(*ppBuff); - } else { - pWinStates = taosArrayInit(16, sizeof(SWinKey)); - QUERY_CHECK_NULL(pWinStates, code, lino, _end, terrno); - - code = tSimpleHashPut(pSearchBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES); - QUERY_CHECK_CODE(code, lino, _end); - } + addArrayBuffIfNotExist(pSearchBuff, pKey->groupId, &pWinStates); // recover if (taosArrayGetSize(pWinStates) == 0 && needClearDiskBuff(pFileState)) { @@ -64,25 +55,7 @@ int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, vo streamStateFreeCur(pCur); } - int32_t size = taosArrayGetSize(pWinStates); - int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare); - if (!isFlushedState(pFileState, pKey->ts, 0)|| index >= 0 || size == 0) { - // find the first position which is smaller than the pKey - if (index >= 0) { - SWinKey* pTmpKey = taosArrayGet(pWinStates, index); - if (winKeyCmprImpl(pTmpKey, pKey) == 0) { - goto _end; - } - } - index++; - void* tmp = taosArrayInsert(pWinStates, index, pKey); - QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); - } - - if (size >= MAX_NUM_OF_CACHE_WIN) { - int32_t num = size - NUM_OF_CACHE_WIN; - taosArrayRemoveBatch(pWinStates, 0, num, NULL); - } + addSearchItem(pFileState, pWinStates, pKey); _end: if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 99801ab0eb..1266208d2c 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -229,7 +229,6 @@ int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen, int32_t* pWinCode) { if (pState->pFileState) { - // todo(liuyao) 改这里 return getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), pVal, pVLen, pWinCode); } return streamStateFillGet_rocksdb(pState, key, pVal, pVLen); @@ -289,7 +288,24 @@ int32_t streamStateGetInfo(SStreamState* pState, void* pKey, int32_t keyLen, voi int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen, int32_t* pWinCode) { - return streamStateGet(pState, key, pVal, pVLen, pWinCode); + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + code = streamStateGet(pState, key, pVal, pVLen, pWinCode); + QUERY_CHECK_CODE(code, lino, _end); + + SSHashObj* pSearchBuff = getSearchBuff(pState->pFileState); + if (pSearchBuff != NULL) { + SArray* pWinStates = NULL; + code = addArrayBuffIfNotExist(pSearchBuff, key->groupId, &pWinStates); + QUERY_CHECK_CODE(code, lino, _end); + code = addSearchItem(pState->pFileState, pWinStates, key); + QUERY_CHECK_CODE(code, lino, _end); + } +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } void streamStateReleaseBuf(SStreamState* pState, void* pVal, bool used) { @@ -588,6 +604,10 @@ void streamStateClearExpiredState(SStreamState* pState) { clearExpiredState(pState->pFileState); } +void streamStateSetFillInfo(SStreamState* pState) { + setFillInfo(pState->pFileState); +} + int32_t streamStateGetPrev(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen, int32_t* pWinCode) { return getRowStatePrevRow(pState->pFileState, pKey, pResKey, pVal, pVLen, pWinCode); diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index d31665de90..7a646e93b5 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -56,6 +56,7 @@ struct SStreamFileState { char* cfName; void* searchBuff; SSHashObj* pGroupIdMap; + bool hasFillCatch; _state_buff_cleanup_fn stateBuffCleanupFn; _state_buff_remove_fn stateBuffRemoveFn; @@ -253,6 +254,8 @@ int32_t streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, pFileState->pGroupIdMap = tSimpleHashInit(1024, hashFn); QUERY_CHECK_NULL(pFileState->pGroupIdMap, code, lino, _end, terrno); + pFileState->hasFillCatch = true; + if (type == STREAM_STATE_BUFF_HASH || type == STREAM_STATE_BUFF_HASH_SEARCH) { code = recoverSnapshot(pFileState, checkpointId); } else if (type == STREAM_STATE_BUFF_SORT) { @@ -997,6 +1000,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { } if (vlen != pFileState->rowSize) { code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + taosMemoryFreeClear(pVal); QUERY_CHECK_CODE(code, lino, _end); } memcpy(pNewPos->pRowBuff, pVal, vlen); @@ -1207,6 +1211,10 @@ SSHashObj* getGroupIdCache(SStreamFileState* pFileState) { return pFileState->pGroupIdMap; } +void setFillInfo(SStreamFileState* pFileState) { + pFileState->hasFillCatch = false; +} + void clearExpiredState(SStreamFileState* pFileState) { SSHashObj* pSearchBuff = pFileState->searchBuff; void* pIte = NULL; @@ -1222,8 +1230,10 @@ void clearExpiredState(SStreamFileState* pFileState) { int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey); qTrace("%s at line %d res:%d", __func__, __LINE__, code_file); - code_file = streamStateFillDel_rocksdb(pFileState->pFileStore, pKey); - qTrace("%s at line %d res %d", __func__, __LINE__, code_file); + if (pFileState->hasFillCatch == false || isFlushedState(pFileState, pKey->ts, 0)) { + code_file = streamStateFillDel_rocksdb(pFileState->pFileStore, pKey); + qTrace("state fill delete.%s at line %d res %d", __func__, __LINE__, code_file); + } } taosArrayRemoveBatch(pWinStates, 0, size - 1, NULL); } @@ -1368,3 +1378,55 @@ _end: } return code; } + +int32_t addSearchItem(SStreamFileState* pFileState, SArray* pWinStates, const SWinKey* pKey) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + int32_t size = taosArrayGetSize(pWinStates); + int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare); + if (!isFlushedState(pFileState, pKey->ts, 0) || index >= 0 || size == 0) { + if (index >= 0) { + SWinKey* pTmpKey = taosArrayGet(pWinStates, index); + if (winKeyCmprImpl(pTmpKey, pKey) == 0) { + goto _end; + } + } + index++; + void* tmp = taosArrayInsert(pWinStates, index, pKey); + QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); + } + + if (size >= MAX_NUM_OF_CACHE_WIN) { + int32_t num = size - NUM_OF_CACHE_WIN; + taosArrayRemoveBatch(pWinStates, 0, num, NULL); + } +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + +int32_t addArrayBuffIfNotExist(SSHashObj* pSearchBuff, uint64_t groupId, SArray** ppResStates) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SArray* pWinStates = NULL; + void** ppBuff = tSimpleHashGet(pSearchBuff, &groupId, sizeof(uint64_t)); + if (ppBuff) { + pWinStates = (SArray*)(*ppBuff); + } else { + pWinStates = taosArrayInit(16, sizeof(SWinKey)); + QUERY_CHECK_NULL(pWinStates, code, lino, _end, terrno); + + code = tSimpleHashPut(pSearchBuff, &groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES); + QUERY_CHECK_CODE(code, lino, _end); + } + + (*ppResStates) = pWinStates; + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +}