adj state cache

This commit is contained in:
54liuyao 2024-08-22 15:59:42 +08:00
parent af38c14069
commit 2d4ff65509
3 changed files with 13 additions and 21 deletions

View File

@ -156,20 +156,6 @@ static void resetPrevAndNextWindow(SStreamFillSupporter* pFillSup) {
resetFillWindow(&pFillSup->nextNext);
}
void getCurWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, SStreamFillSupporter* pFillSup) {
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
void* pState = pOperator->pTaskInfo->streamInfo.pState;
resetPrevAndNextWindow(pFillSup);
SWinKey key = {.ts = ts, .groupId = groupId};
int32_t curVLen = 0;
int32_t code = pAPI->stateStore.streamStateFillGet(pState, &key, (void**)&pFillSup->cur.pRowVal, &curVLen, NULL);
ASSERT(code == TSDB_CODE_SUCCESS);
pFillSup->cur.key = key.ts;
}
void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, SStreamFillSupporter* pFillSup) {
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
void* pState = pOperator->pTaskInfo->streamInfo.pState;

View File

@ -20,7 +20,8 @@
#include "tcommon.h"
#include "tsimplehash.h"
#define NUM_OF_FLUSED_WIN 64
#define NUM_OF_CACHE_WIN 64
#define MAX_NUM_OF_CACHE_WIN 128
int fillStateKeyCompare(const void* pWin1, const void* pDatas, int pos) {
SWinKey* pWin2 = taosArrayGet(pDatas, pos);
@ -54,7 +55,7 @@ int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, vo
SWinKey start = {.groupId = pKey->groupId, .ts = INT64_MAX};
void* pState = getStateFileStore(pFileState);
SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, &start);
for (int32_t i = 0; i < NUM_OF_FLUSED_WIN; i++) {
for (int32_t i = 0; i < NUM_OF_CACHE_WIN; i++) {
SWinKey tmpKey = {.groupId = pKey->groupId};
int32_t tmpRes = streamStateGetGroupKVByCur_rocksdb(pCur, &tmpKey, NULL, 0);
if (tmpRes != TSDB_CODE_SUCCESS) {
@ -83,6 +84,11 @@ int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, vo
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
}
if (size >= MAX_NUM_OF_CACHE_WIN) {
int32_t num = size - NUM_OF_CACHE_WIN;
taosArrayRemoveBatch(pWinStates, 0, num, NULL);
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
@ -115,8 +121,8 @@ void clearSearchBuff(SStreamFileState* pFileState) {
int64_t gpId = *(int64_t*)tSimpleHashGetKey(pIte, NULL);
SWinKey key = {.ts = flushMark, .groupId = gpId};
int32_t num = binarySearch(pWinStates, size, &key, fillStateKeyCompare);
if (size > NUM_OF_FLUSED_WIN) {
num = TMIN(num, size - NUM_OF_FLUSED_WIN);
if (size > NUM_OF_CACHE_WIN) {
num = TMIN(num, size - NUM_OF_CACHE_WIN);
taosArrayRemoveBatch(pWinStates, 0, num, NULL);
}
}

View File

@ -151,10 +151,10 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
rowSize += selectRowSize;
pFileState->maxRowCount = TMAX((uint64_t)memSize / rowSize, FLUSH_NUM * 2);
pFileState->usedBuffs = tdListNew(POINTER_BYTES);
QUERY_CHECK_NULL(pFileState->usedBuffs, code, lino, _error, terrno);
QUERY_CHECK_NULL(pFileState->usedBuffs, code, lino, _end, terrno);
pFileState->freeBuffs = tdListNew(POINTER_BYTES);
QUERY_CHECK_NULL(pFileState->freeBuffs, code, lino, _error, terrno);
QUERY_CHECK_NULL(pFileState->freeBuffs, code, lino, _end, terrno);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
int32_t cap = TMIN(MIN_NUM_OF_ROW_BUFF, pFileState->maxRowCount);
@ -230,7 +230,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
int32_t len = 0;
int32_t tmpRes = streamDefaultGet_rocksdb(pFileState->pFileStore, STREAM_STATE_INFO_NAME, &valBuf, &len);
if (tmpRes == TSDB_CODE_SUCCESS) {
QUERY_CHECK_CONDITION((len == sizeof(TSKEY)), code, lino, _error, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
QUERY_CHECK_CONDITION((len == sizeof(TSKEY)), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
streamFileStateDecode(&pFileState->flushMark, valBuf, len);
qDebug("===stream===flushMark read:%" PRId64, pFileState->flushMark);
}