From d7caed1c44b46292a637c9a02ade78bde551ea56 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Wed, 6 Sep 2023 16:34:25 +0800 Subject: [PATCH] opt stream state --- include/libs/stream/streamState.h | 48 ------------------- .../executor/src/streamtimewindowoperator.c | 8 +++- source/libs/stream/src/streamState.c | 2 +- source/libs/stream/src/tstreamFileState.c | 12 ++++- 4 files changed, 18 insertions(+), 52 deletions(-) diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 7747df8595..4312da6f2c 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -29,54 +29,6 @@ extern "C" { #include "storageapi.h" -// void* streamBackendInit(const char* path); -// void streamBackendCleanup(void* arg); -// SListNode* streamBackendAddCompare(void* backend, void* arg); -// void streamBackendDelCompare(void* backend, void* arg); - -// <<<<<<< HEAD -// typedef struct STdbState { -// rocksdb_t* rocksdb; -// rocksdb_column_family_handle_t** pHandle; -// rocksdb_writeoptions_t* writeOpts; -// rocksdb_readoptions_t* readOpts; -// rocksdb_options_t** cfOpts; -// rocksdb_options_t* dbOpt; -// struct SStreamTask* pOwner; -// void* param; -// void* env; -// SListNode* pComparNode; -// void* pBackend; -// char idstr[64]; -// void* compactFactory; -// TdThreadRwlock rwLock; -// ======= -// typedef struct STdbState { -// rocksdb_t* rocksdb; -// rocksdb_column_family_handle_t** pHandle; -// rocksdb_writeoptions_t* writeOpts; -// rocksdb_readoptions_t* readOpts; -// rocksdb_options_t** cfOpts; -// rocksdb_options_t* dbOpt; -// struct SStreamTask* pOwner; -// void* param; -// void* env; -// SListNode* pComparNode; -// void* pBackendHandle; -// char idstr[64]; -// void* compactFactory; -// -// TDB* db; -// TTB* pStateDb; -// TTB* pFuncStateDb; -// TTB* pFillStateDb; // todo refactor -// TTB* pSessionStateDb; -// TTB* pParNameDb; -// TTB* pParTagDb; -// TXN* txn; -//} STdbState; -//>>>>>>> enh/dev3.0 - SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages); void streamStateClose(SStreamState* pState, bool remove); int32_t streamStateBegin(SStreamState* pState); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index c0e2a44153..1c909cb47d 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -786,6 +786,8 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat if (startPos < 0) { break; } + qDebug("===stream===ignore expired data, window end ts:%" PRId64 ", maxts - wartermak:%" PRId64, nextWin.ekey, + pInfo->twAggSup.maxTs - pInfo->twAggSup.waterMark); continue; } @@ -1552,6 +1554,7 @@ void destroyStreamSessionAggOperatorInfo(void* param) { tSimpleHashCleanup(pInfo->pStUpdated); tSimpleHashCleanup(pInfo->pStDeleted); pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated); + cleanupGroupResInfo(&pInfo->groupResInfo); taosArrayDestroy(pInfo->historyWins); blockDataDestroy(pInfo->pCheckpointRes); @@ -2197,6 +2200,7 @@ void initGroupResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayL pGroupResInfo->pRows = pArrayList; pGroupResInfo->index = 0; pGroupResInfo->pBuf = NULL; + pGroupResInfo->freeItem = false; } void doBuildSessionResult(SOperatorInfo* pOperator, void* pState, SGroupResInfo* pGroupResInfo, SSDataBlock* pBlock) { @@ -2874,10 +2878,12 @@ void destroyStreamStateOperatorInfo(void* param) { } colDataDestroy(&pInfo->twAggSup.timeWindowData); blockDataDestroy(pInfo->pDelRes); - taosArrayDestroy(pInfo->historyWins); tSimpleHashCleanup(pInfo->pSeUpdated); tSimpleHashCleanup(pInfo->pSeDeleted); pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated); + cleanupGroupResInfo(&pInfo->groupResInfo); + + taosArrayDestroy(pInfo->historyWins); blockDataDestroy(pInfo->pCheckpointRes); taosMemoryFreeClear(param); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 83aed42fe2..44c7b4f2e0 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -345,7 +345,7 @@ bool streamStateCheck(SStreamState* pState, const SWinKey* key) { return hasRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey)); #else SStateKey sKey = {.key = *key, .opNum = pState->number}; - return tdbTbGet(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pVal, pVLen); + return tdbTbGet(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), NULL, NULL); #endif } diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index bca9dcabda..be3ad73472 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -25,7 +25,8 @@ #define FLUSH_RATIO 0.5 #define FLUSH_NUM 4 -#define DEFAULT_MAX_STREAM_BUFFER_SIZE (128 * 1024 * 1024); +#define DEFAULT_MAX_STREAM_BUFFER_SIZE (128 * 1024 * 1024) +#define MIN_NUM_OF_ROW_BUFF 10240 struct SStreamFileState { SList* usedBuffs; @@ -67,7 +68,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ pFileState->usedBuffs = tdListNew(POINTER_BYTES); pFileState->freeBuffs = tdListNew(POINTER_BYTES); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); - int32_t cap = TMIN(10240, pFileState->maxRowCount); + int32_t cap = TMIN(MIN_NUM_OF_ROW_BUFF, pFileState->maxRowCount); pFileState->rowBuffMap = tSimpleHashInit(cap, hashFn); if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowBuffMap) { goto _error; @@ -272,10 +273,12 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi *pVLen = pFileState->rowSize; *pVal = *pos; (*pos)->beUsed = true; + (*pos)->beFlushed = false; return TSDB_CODE_SUCCESS; } SRowBuffPos* pNewPos = getNewRowPos(pFileState); pNewPos->beUsed = true; + pNewPos->beFlushed = false; ASSERT(pNewPos->pRowBuff); memcpy(pNewPos->pKey, pKey, keyLen); @@ -375,6 +378,10 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, while ((pNode = tdListNext(&iter)) != NULL && code == TSDB_CODE_SUCCESS) { SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data; ASSERT(pPos->pRowBuff && pFileState->rowSize > 0); + if (pPos->beFlushed) { + continue; + } + pPos->beFlushed = true; if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) { streamStatePutBatch_rocksdb(pFileState->pFileStore, batch); @@ -513,6 +520,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { ASSERT(pVLen == pFileState->rowSize); memcpy(pNewPos->pRowBuff, pVal, pVLen); taosMemoryFreeClear(pVal); + pNewPos->beFlushed = true; code = tSimpleHashPut(pFileState->rowBuffMap, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES); if (code != TSDB_CODE_SUCCESS) { destroyRowBuffPos(pNewPos);