diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index af673f163e..62b555d437 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -17,6 +17,7 @@ #include "rocksdb/c.h" #include "tdbInt.h" +#include "tsimplehash.h" #include "tstreamFileState.h" #ifdef __cplusplus @@ -57,6 +58,7 @@ typedef struct { STdbState* pTdbState; SStreamFileState* pFileState; int32_t number; + SSHashObj* parNameMap; } SStreamState; SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 74f642271a..0544568c3b 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -714,9 +714,6 @@ int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo void cleanupAggSup(SAggSupporter* pAggSup); void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows); - -void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, - SDiskbasedBuf* pBuf); void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 606816d853..e41ec5c7c7 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1218,33 +1218,6 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS return 0; } -void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, - SDiskbasedBuf* pBuf) { - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SSDataBlock* pBlock = pbInfo->pRes; - - // set output datablock version - pBlock->info.version = pTaskInfo->version; - - blockDataCleanup(pBlock); - if (!hasRemainResults(pGroupResInfo)) { - return; - } - - // clear the existed group id - pBlock->info.id.groupId = 0; - ASSERT(!pbInfo->mergeResultBlock); - doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo); - - void* tbname = NULL; - if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) { - pBlock->info.parTbName[0] = 0; - } else { - memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); - } - streamFreeVal(tbname); -} - void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 2a69f12639..35448a9e07 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1130,6 +1130,7 @@ static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t } static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) { + qInfo("do stream range scan. windows index:%d", *pRowIndex); while (1) { SSDataBlock* pResult = NULL; pResult = doTableScan(pInfo->pTableScanOp); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 8b5750dbcf..5d7caabb74 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2627,11 +2627,11 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } if (!pInfo->pUpdated) { - pInfo->pUpdated = taosArrayInit(4, POINTER_BYTES); + pInfo->pUpdated = taosArrayInit(4096, POINTER_BYTES); } if (!pInfo->pUpdatedMap) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); - pInfo->pUpdatedMap = tSimpleHashInit(1024, hashFn); + pInfo->pUpdatedMap = tSimpleHashInit(4096, hashFn); } while (1) { @@ -4864,11 +4864,11 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; if (!pInfo->pUpdated) { - pInfo->pUpdated = taosArrayInit(4, POINTER_BYTES); + pInfo->pUpdated = taosArrayInit(4096, POINTER_BYTES); } if (!pInfo->pUpdatedMap) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); - pInfo->pUpdatedMap = tSimpleHashInit(1024, hashFn); + pInfo->pUpdatedMap = tSimpleHashInit(4096, hashFn); } while (1) { diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 06819a1fd4..207be792ed 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -15,7 +15,7 @@ #include "streamInc.h" -#define STREAM_EXEC_MAX_BATCH_NUM 100 +#define STREAM_EXEC_MAX_BATCH_NUM 1024 static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) { int32_t code; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index b11ef1b640..a68e16c0e6 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -26,6 +26,8 @@ #include "tcompare.h" #include "ttimer.h" +#define MAX_TABLE_NAME_NUM 100000 + int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) { if (pWin1->groupId > pWin2->groupId) { return 1; @@ -133,6 +135,8 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int qWarn("open stream state2, %s", statePath); pState->pTdbState->pOwner = pTask; pState->pFileState = NULL; + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT); + pState->parNameMap = tSimpleHashInit(1024, hashFn); return pState; #else @@ -1020,7 +1024,14 @@ _end: int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) { qWarn("try to write to cf parname"); #ifdef USE_ROCKSDB - return streamStatePutParName_rocksdb(pState, groupId, tbname); + if (tSimpleHashGetSize(pState->parNameMap) > MAX_TABLE_NAME_NUM) { + if (tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t)) == NULL) { + streamStatePutParName_rocksdb(pState, groupId, tbname); + } + return TSDB_CODE_SUCCESS; + } + tSimpleHashPut(pState->parNameMap, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN); + return TSDB_CODE_SUCCESS; #else return tdbTbUpsert(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN, pState->pTdbState->txn); @@ -1029,7 +1040,16 @@ int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal) { #ifdef USE_ROCKSDB - return streamStateGetParName_rocksdb(pState, groupId, pVal); + void* pStr = tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t)); + if (!pStr) { + if (tSimpleHashGetSize(pState->parNameMap) > MAX_TABLE_NAME_NUM) { + return streamStateGetParName_rocksdb(pState, groupId, pVal); + } + return TSDB_CODE_FAILED; + } + *pVal = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); + memcpy(*pVal, pStr, TSDB_TABLE_NAME_LEN); + return TSDB_CODE_SUCCESS; #else int32_t len; return tdbTbGet(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), pVal, &len); diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index e60a789e7b..34d4fa9d5f 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -57,10 +57,12 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ if (!pFileState) { goto _error; } + pFileState->maxRowCount = TMAX( (uint64_t)memSize / rowSize, FLUSH_NUM * 2); pFileState->usedBuffs = tdListNew(POINTER_BYTES); pFileState->freeBuffs = tdListNew(POINTER_BYTES); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); - pFileState->rowBuffMap = tSimpleHashInit(1024, hashFn); + int32_t cap = TMIN(10240, pFileState->maxRowCount); + pFileState->rowBuffMap = tSimpleHashInit(cap, hashFn); if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowBuffMap) { goto _error; } @@ -126,7 +128,9 @@ void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) { ASSERT(pPos->pRowBuff != NULL); tdListAppend(pFileState->freeBuffs, &(pPos->pRowBuff)); pPos->pRowBuff = NULL; - tSimpleHashRemove(pFileState->rowBuffMap, pPos->pKey, pFileState->keyLen); + if (!all) { + tSimpleHashRemove(pFileState->rowBuffMap, pPos->pKey, pFileState->keyLen); + } destroyRowBuffPos(pPos); tdListPopNode(pFileState->usedBuffs, pNode); taosMemoryFreeClear(pNode); @@ -156,6 +160,7 @@ void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uin i++; } } + qInfo("do stream state flush %d rows to disck. is used: %d", listNEles(pFlushList), used); } int32_t flushRowBuff(SStreamFileState* pFileState) {