opt stream state

This commit is contained in:
liuyao 2023-09-06 16:34:25 +08:00
parent ed9085281d
commit d7caed1c44
4 changed files with 18 additions and 52 deletions

View File

@ -29,54 +29,6 @@ extern "C" {
#include "storageapi.h"
// void* streamBackendInit(const char* path);
// void streamBackendCleanup(void* arg);
// SListNode* streamBackendAddCompare(void* backend, void* arg);
// void streamBackendDelCompare(void* backend, void* arg);
// <<<<<<< HEAD
// typedef struct STdbState {
// rocksdb_t* rocksdb;
// rocksdb_column_family_handle_t** pHandle;
// rocksdb_writeoptions_t* writeOpts;
// rocksdb_readoptions_t* readOpts;
// rocksdb_options_t** cfOpts;
// rocksdb_options_t* dbOpt;
// struct SStreamTask* pOwner;
// void* param;
// void* env;
// SListNode* pComparNode;
// void* pBackend;
// char idstr[64];
// void* compactFactory;
// TdThreadRwlock rwLock;
// =======
// typedef struct STdbState {
// rocksdb_t* rocksdb;
// rocksdb_column_family_handle_t** pHandle;
// rocksdb_writeoptions_t* writeOpts;
// rocksdb_readoptions_t* readOpts;
// rocksdb_options_t** cfOpts;
// rocksdb_options_t* dbOpt;
// struct SStreamTask* pOwner;
// void* param;
// void* env;
// SListNode* pComparNode;
// void* pBackendHandle;
// char idstr[64];
// void* compactFactory;
//
// TDB* db;
// TTB* pStateDb;
// TTB* pFuncStateDb;
// TTB* pFillStateDb; // todo refactor
// TTB* pSessionStateDb;
// TTB* pParNameDb;
// TTB* pParTagDb;
// TXN* txn;
//} STdbState;
//>>>>>>> enh/dev3.0
SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages);
void streamStateClose(SStreamState* pState, bool remove);
int32_t streamStateBegin(SStreamState* pState);

View File

@ -786,6 +786,8 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat
if (startPos < 0) {
break;
}
qDebug("===stream===ignore expired data, window end ts:%" PRId64 ", maxts - wartermak:%" PRId64, nextWin.ekey,
pInfo->twAggSup.maxTs - pInfo->twAggSup.waterMark);
continue;
}
@ -1552,6 +1554,7 @@ void destroyStreamSessionAggOperatorInfo(void* param) {
tSimpleHashCleanup(pInfo->pStUpdated);
tSimpleHashCleanup(pInfo->pStDeleted);
pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated);
cleanupGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroy(pInfo->historyWins);
blockDataDestroy(pInfo->pCheckpointRes);
@ -2197,6 +2200,7 @@ void initGroupResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayL
pGroupResInfo->pRows = pArrayList;
pGroupResInfo->index = 0;
pGroupResInfo->pBuf = NULL;
pGroupResInfo->freeItem = false;
}
void doBuildSessionResult(SOperatorInfo* pOperator, void* pState, SGroupResInfo* pGroupResInfo, SSDataBlock* pBlock) {
@ -2874,10 +2878,12 @@ void destroyStreamStateOperatorInfo(void* param) {
}
colDataDestroy(&pInfo->twAggSup.timeWindowData);
blockDataDestroy(pInfo->pDelRes);
taosArrayDestroy(pInfo->historyWins);
tSimpleHashCleanup(pInfo->pSeUpdated);
tSimpleHashCleanup(pInfo->pSeDeleted);
pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated);
cleanupGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroy(pInfo->historyWins);
blockDataDestroy(pInfo->pCheckpointRes);
taosMemoryFreeClear(param);

View File

@ -345,7 +345,7 @@ bool streamStateCheck(SStreamState* pState, const SWinKey* key) {
return hasRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey));
#else
SStateKey sKey = {.key = *key, .opNum = pState->number};
return tdbTbGet(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pVal, pVLen);
return tdbTbGet(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), NULL, NULL);
#endif
}

View File

@ -25,7 +25,8 @@
#define FLUSH_RATIO 0.5
#define FLUSH_NUM 4
#define DEFAULT_MAX_STREAM_BUFFER_SIZE (128 * 1024 * 1024);
#define DEFAULT_MAX_STREAM_BUFFER_SIZE (128 * 1024 * 1024)
#define MIN_NUM_OF_ROW_BUFF 10240
struct SStreamFileState {
SList* usedBuffs;
@ -67,7 +68,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
pFileState->usedBuffs = tdListNew(POINTER_BYTES);
pFileState->freeBuffs = tdListNew(POINTER_BYTES);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
int32_t cap = TMIN(10240, pFileState->maxRowCount);
int32_t cap = TMIN(MIN_NUM_OF_ROW_BUFF, pFileState->maxRowCount);
pFileState->rowBuffMap = tSimpleHashInit(cap, hashFn);
if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowBuffMap) {
goto _error;
@ -272,10 +273,12 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi
*pVLen = pFileState->rowSize;
*pVal = *pos;
(*pos)->beUsed = true;
(*pos)->beFlushed = false;
return TSDB_CODE_SUCCESS;
}
SRowBuffPos* pNewPos = getNewRowPos(pFileState);
pNewPos->beUsed = true;
pNewPos->beFlushed = false;
ASSERT(pNewPos->pRowBuff);
memcpy(pNewPos->pKey, pKey, keyLen);
@ -375,6 +378,10 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
while ((pNode = tdListNext(&iter)) != NULL && code == TSDB_CODE_SUCCESS) {
SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
ASSERT(pPos->pRowBuff && pFileState->rowSize > 0);
if (pPos->beFlushed) {
continue;
}
pPos->beFlushed = true;
if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) {
streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
@ -513,6 +520,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
ASSERT(pVLen == pFileState->rowSize);
memcpy(pNewPos->pRowBuff, pVal, pVLen);
taosMemoryFreeClear(pVal);
pNewPos->beFlushed = true;
code = tSimpleHashPut(pFileState->rowBuffMap, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
if (code != TSDB_CODE_SUCCESS) {
destroyRowBuffPos(pNewPos);