From 955f8f9f076e7ca27b48bf790c32ed0ee1cb8bf9 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Fri, 7 Apr 2023 18:45:09 +0800 Subject: [PATCH] feat:optimize get patitionby name --- include/libs/stream/streamState.h | 6 +++-- source/libs/executor/inc/executorimpl.h | 3 --- source/libs/executor/src/executorimpl.c | 27 ------------------- source/libs/executor/src/scanoperator.c | 1 + source/libs/executor/src/timewindowoperator.c | 8 +++--- source/libs/stream/src/streamExec.c | 2 +- source/libs/stream/src/streamState.c | 24 +++++++++++++++-- source/libs/stream/src/tstreamFileState.c | 10 ++++--- 8 files changed, 39 insertions(+), 42 deletions(-) diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 8e142d3bca..4a63cfeb5f 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -17,6 +17,7 @@ #include "rocksdb/c.h" #include "tdbInt.h" +#include "tsimplehash.h" #include "tstreamFileState.h" #ifdef __cplusplus @@ -54,9 +55,10 @@ typedef struct STdbState { // incremental state storage typedef struct { - STdbState* pTdbState; + STdbState* pTdbState; SStreamFileState* pFileState; - int32_t number; + int32_t number; + SSHashObj* parNameMap; } SStreamState; SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 74f642271a..0544568c3b 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -714,9 +714,6 @@ int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo void cleanupAggSup(SAggSupporter* pAggSup); void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows); - -void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, - SDiskbasedBuf* pBuf); void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 606816d853..e41ec5c7c7 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1218,33 +1218,6 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS return 0; } -void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, - SDiskbasedBuf* pBuf) { - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SSDataBlock* pBlock = pbInfo->pRes; - - // set output datablock version - pBlock->info.version = pTaskInfo->version; - - blockDataCleanup(pBlock); - if (!hasRemainResults(pGroupResInfo)) { - return; - } - - // clear the existed group id - pBlock->info.id.groupId = 0; - ASSERT(!pbInfo->mergeResultBlock); - doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo); - - void* tbname = NULL; - if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) { - pBlock->info.parTbName[0] = 0; - } else { - memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); - } - streamFreeVal(tbname); -} - void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 2a69f12639..35448a9e07 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1130,6 +1130,7 @@ static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t } static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) { + qInfo("do stream range scan. windows index:%d", *pRowIndex); while (1) { SSDataBlock* pResult = NULL; pResult = doTableScan(pInfo->pTableScanOp); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 8b5750dbcf..5d7caabb74 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2627,11 +2627,11 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } if (!pInfo->pUpdated) { - pInfo->pUpdated = taosArrayInit(4, POINTER_BYTES); + pInfo->pUpdated = taosArrayInit(4096, POINTER_BYTES); } if (!pInfo->pUpdatedMap) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); - pInfo->pUpdatedMap = tSimpleHashInit(1024, hashFn); + pInfo->pUpdatedMap = tSimpleHashInit(4096, hashFn); } while (1) { @@ -4864,11 +4864,11 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; if (!pInfo->pUpdated) { - pInfo->pUpdated = taosArrayInit(4, POINTER_BYTES); + pInfo->pUpdated = taosArrayInit(4096, POINTER_BYTES); } if (!pInfo->pUpdatedMap) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); - pInfo->pUpdatedMap = tSimpleHashInit(1024, hashFn); + pInfo->pUpdatedMap = tSimpleHashInit(4096, hashFn); } while (1) { diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 06819a1fd4..207be792ed 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -15,7 +15,7 @@ #include "streamInc.h" -#define STREAM_EXEC_MAX_BATCH_NUM 100 +#define STREAM_EXEC_MAX_BATCH_NUM 1024 static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) { int32_t code; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index b11ef1b640..a68e16c0e6 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -26,6 +26,8 @@ #include "tcompare.h" #include "ttimer.h" +#define MAX_TABLE_NAME_NUM 100000 + int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) { if (pWin1->groupId > pWin2->groupId) { return 1; @@ -133,6 +135,8 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int qWarn("open stream state2, %s", statePath); pState->pTdbState->pOwner = pTask; pState->pFileState = NULL; + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT); + pState->parNameMap = tSimpleHashInit(1024, hashFn); return pState; #else @@ -1020,7 +1024,14 @@ _end: int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) { qWarn("try to write to cf parname"); #ifdef USE_ROCKSDB - return streamStatePutParName_rocksdb(pState, groupId, tbname); + if (tSimpleHashGetSize(pState->parNameMap) > MAX_TABLE_NAME_NUM) { + if (tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t)) == NULL) { + streamStatePutParName_rocksdb(pState, groupId, tbname); + } + return TSDB_CODE_SUCCESS; + } + tSimpleHashPut(pState->parNameMap, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN); + return TSDB_CODE_SUCCESS; #else return tdbTbUpsert(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN, pState->pTdbState->txn); @@ -1029,7 +1040,16 @@ int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal) { #ifdef USE_ROCKSDB - return streamStateGetParName_rocksdb(pState, groupId, pVal); + void* pStr = tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t)); + if (!pStr) { + if (tSimpleHashGetSize(pState->parNameMap) > MAX_TABLE_NAME_NUM) { + return streamStateGetParName_rocksdb(pState, groupId, pVal); + } + return TSDB_CODE_FAILED; + } + *pVal = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); + memcpy(*pVal, pStr, TSDB_TABLE_NAME_LEN); + return TSDB_CODE_SUCCESS; #else int32_t len; return tdbTbGet(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), pVal, &len); diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index b2621a32c5..9949f67321 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -56,10 +56,12 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ if (!pFileState) { goto _error; } + pFileState->maxRowCount = TMAX( (uint64_t)memSize / rowSize, FLUSH_NUM * 2); pFileState->usedBuffs = tdListNew(POINTER_BYTES); pFileState->freeBuffs = tdListNew(POINTER_BYTES); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); - pFileState->rowBuffMap = tSimpleHashInit(1024, hashFn); + int32_t cap = TMIN(10240, pFileState->maxRowCount); + pFileState->rowBuffMap = tSimpleHashInit(cap, hashFn); if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowBuffMap) { goto _error; } @@ -69,7 +71,6 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ pFileState->checkPointVersion = 1; pFileState->pFileStore = pFile; pFileState->getTs = fp; - pFileState->maxRowCount = TMAX( (uint64_t)memSize / rowSize, FLUSH_NUM * 2); pFileState->curRowCount = 0; pFileState->deleteMark = delMark; pFileState->flushMark = -1; @@ -125,7 +126,9 @@ void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) { ASSERT(pPos->pRowBuff != NULL); tdListAppend(pFileState->freeBuffs, &(pPos->pRowBuff)); pPos->pRowBuff = NULL; - tSimpleHashRemove(pFileState->rowBuffMap, pPos->pKey, pFileState->keyLen); + if (!all) { + tSimpleHashRemove(pFileState->rowBuffMap, pPos->pKey, pFileState->keyLen); + } destroyRowBuffPos(pPos); tdListPopNode(pFileState->usedBuffs, pNode); taosMemoryFreeClear(pNode); @@ -155,6 +158,7 @@ void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uin i++; } } + qInfo("do stream state flush %d rows to disck. is used: %d", listNEles(pFlushList), used); } int32_t flushRowBuff(SStreamFileState* pFileState) {