From 2d4ff655099e6129f3a7241024c6af7c3f2c85a3 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 22 Aug 2024 15:59:42 +0800 Subject: [PATCH] adj state cache --- source/libs/executor/src/streamfilloperator.c | 14 -------------- source/libs/stream/src/streamSliceState.c | 14 ++++++++++---- source/libs/stream/src/tstreamFileState.c | 6 +++--- 3 files changed, 13 insertions(+), 21 deletions(-) diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index b35c1e7385..cb7d1cd20a 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -156,20 +156,6 @@ static void resetPrevAndNextWindow(SStreamFillSupporter* pFillSup) { resetFillWindow(&pFillSup->nextNext); } -void getCurWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, SStreamFillSupporter* pFillSup) { - SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; - - void* pState = pOperator->pTaskInfo->streamInfo.pState; - resetPrevAndNextWindow(pFillSup); - - SWinKey key = {.ts = ts, .groupId = groupId}; - int32_t curVLen = 0; - - int32_t code = pAPI->stateStore.streamStateFillGet(pState, &key, (void**)&pFillSup->cur.pRowVal, &curVLen, NULL); - ASSERT(code == TSDB_CODE_SUCCESS); - pFillSup->cur.key = key.ts; -} - void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, SStreamFillSupporter* pFillSup) { SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; void* pState = pOperator->pTaskInfo->streamInfo.pState; diff --git a/source/libs/stream/src/streamSliceState.c b/source/libs/stream/src/streamSliceState.c index 82d204c634..117d9621e1 100644 --- a/source/libs/stream/src/streamSliceState.c +++ b/source/libs/stream/src/streamSliceState.c @@ -20,7 +20,8 @@ #include "tcommon.h" #include "tsimplehash.h" -#define NUM_OF_FLUSED_WIN 64 +#define NUM_OF_CACHE_WIN 64 +#define MAX_NUM_OF_CACHE_WIN 128 int fillStateKeyCompare(const void* pWin1, const void* pDatas, int pos) { SWinKey* pWin2 = taosArrayGet(pDatas, pos); @@ -54,7 +55,7 @@ int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, vo SWinKey start = {.groupId = pKey->groupId, .ts = INT64_MAX}; void* pState = getStateFileStore(pFileState); SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, &start); - for (int32_t i = 0; i < NUM_OF_FLUSED_WIN; i++) { + for (int32_t i = 0; i < NUM_OF_CACHE_WIN; i++) { SWinKey tmpKey = {.groupId = pKey->groupId}; int32_t tmpRes = streamStateGetGroupKVByCur_rocksdb(pCur, &tmpKey, NULL, 0); if (tmpRes != TSDB_CODE_SUCCESS) { @@ -83,6 +84,11 @@ int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, vo QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); } + if (size >= MAX_NUM_OF_CACHE_WIN) { + int32_t num = size - NUM_OF_CACHE_WIN; + taosArrayRemoveBatch(pWinStates, 0, num, NULL); + } + _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); @@ -115,8 +121,8 @@ void clearSearchBuff(SStreamFileState* pFileState) { int64_t gpId = *(int64_t*)tSimpleHashGetKey(pIte, NULL); SWinKey key = {.ts = flushMark, .groupId = gpId}; int32_t num = binarySearch(pWinStates, size, &key, fillStateKeyCompare); - if (size > NUM_OF_FLUSED_WIN) { - num = TMIN(num, size - NUM_OF_FLUSED_WIN); + if (size > NUM_OF_CACHE_WIN) { + num = TMIN(num, size - NUM_OF_CACHE_WIN); taosArrayRemoveBatch(pWinStates, 0, num, NULL); } } diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index c97a93f50d..60be901ac9 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -151,10 +151,10 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ rowSize += selectRowSize; pFileState->maxRowCount = TMAX((uint64_t)memSize / rowSize, FLUSH_NUM * 2); pFileState->usedBuffs = tdListNew(POINTER_BYTES); - QUERY_CHECK_NULL(pFileState->usedBuffs, code, lino, _error, terrno); + QUERY_CHECK_NULL(pFileState->usedBuffs, code, lino, _end, terrno); pFileState->freeBuffs = tdListNew(POINTER_BYTES); - QUERY_CHECK_NULL(pFileState->freeBuffs, code, lino, _error, terrno); + QUERY_CHECK_NULL(pFileState->freeBuffs, code, lino, _end, terrno); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); int32_t cap = TMIN(MIN_NUM_OF_ROW_BUFF, pFileState->maxRowCount); @@ -230,7 +230,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ int32_t len = 0; int32_t tmpRes = streamDefaultGet_rocksdb(pFileState->pFileStore, STREAM_STATE_INFO_NAME, &valBuf, &len); if (tmpRes == TSDB_CODE_SUCCESS) { - QUERY_CHECK_CONDITION((len == sizeof(TSKEY)), code, lino, _error, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + QUERY_CHECK_CONDITION((len == sizeof(TSKEY)), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); streamFileStateDecode(&pFileState->flushMark, valBuf, len); qDebug("===stream===flushMark read:%" PRId64, pFileState->flushMark); }