From 7d699ae579088ce2f075417a6925a0a3eae260e8 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Tue, 30 May 2023 15:16:11 +0800 Subject: [PATCH 01/16] fix sliding window issue --- source/libs/executor/src/scanoperator.c | 2 +- source/libs/executor/src/timewindowoperator.c | 11 ++++++----- source/libs/stream/src/streamState.c | 2 +- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 2d2f377041..f11b083ad6 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1883,7 +1883,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { if (pInfo->pRecoverRes != NULL) { pInfo->blockRecoverContiCnt++; calBlockTbName(pInfo, pInfo->pRecoverRes); - if (pInfo->pUpdateInfo) { + if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) { if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN1) { TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index aa0aa9799b..2cdf9bb15c 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1623,7 +1623,7 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamInt SStreamScanInfo* pScanInfo = downstream->info; pScanInfo->windowSup.parentType = type; pScanInfo->windowSup.pIntervalAggSup = &pInfo->aggSup; - if (!pScanInfo->igCheckUpdate && !pScanInfo->pUpdateInfo) { + if (!pScanInfo->pUpdateInfo) { pScanInfo->pUpdateInfo = pAPI->updateInfoInitP(&pInfo->interval, pInfo->twAggSup.waterMark); } @@ -2150,28 +2150,29 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB } void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SInterval* pInterval) { - SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); + SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); TSKEY* tsData = (TSKEY*)pStartCol->pData; - SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); + SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); TSKEY* tsEndData = (TSKEY*)pEndCol->pData; SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); uint64_t* groupIdData = (uint64_t*)pGroupCol->pData; int32_t chId = getChildIndex(pBlock); for (int32_t i = 0; i < pBlock->info.rows; i++) { TSKEY winTs = tsData[i]; - while (winTs < tsEndData[i]) { + while (winTs <= tsEndData[i]) { SWinKey winRes = {.ts = winTs, .groupId = groupIdData[i]}; void* chIds = taosHashGet(pMap, &winRes, sizeof(SWinKey)); if (chIds) { SArray* chArray = *(SArray**)chIds; int32_t index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ); if (index != -1) { - qDebug("===stream===window %" PRId64 " delete child id %d", winRes.ts, chId); + qDebug("===stream===retrive window %" PRId64 " delete child id %d", winRes.ts, chId); taosArrayRemove(chArray, index); if (taosArrayGetSize(chArray) == 0) { // pull data is over taosArrayDestroy(chArray); taosHashRemove(pMap, &winRes, sizeof(SWinKey)); + qDebug("===stream===retrive pull data over.window %" PRId64 , winRes.ts); } } } diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index ba24a581e9..71a21ac150 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -1062,7 +1062,7 @@ _end: } int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) { - qWarn("try to write to cf parname"); + qDebug("try to write to cf parname"); #ifdef USE_ROCKSDB if (tSimpleHashGetSize(pState->parNameMap) > MAX_TABLE_NAME_NUM) { if (tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t)) == NULL) { From e96ed81ec0df8c6eacdaa0c38c44e8099c9c3346 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 May 2023 17:29:01 +0800 Subject: [PATCH 02/16] fix(stream): add input queue capacity check. --- source/libs/stream/src/stream.c | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index d3e4b23ad1..0d772247b4 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -132,8 +132,6 @@ int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pR int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock); // input queue is full, upstream is blocked now status = (code == TSDB_CODE_SUCCESS)? TASK_INPUT_STATUS__NORMAL:TASK_INPUT_STATUS__BLOCKED; - - } // rsp by input status @@ -304,10 +302,15 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { return -1; } - taosWriteQitem(pTask->inputQueue->queue, pItem); + int32_t code = taosWriteQitem(pTask->inputQueue->queue, pItem); + if (code != TSDB_CODE_SUCCESS) { + streamDataSubmitDestroy(px); + taosFreeQitem(pItem); + return code; + } } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { - if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) { + if (/*(pTask->taskLevel == TASK_LEVEL__SOURCE) && */(tInputQueueIsFull(pTask))) { qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, total, size); @@ -317,10 +320,16 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { } qDebug("s-task:%s data block enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); - taosWriteQitem(pTask->inputQueue->queue, pItem); + int32_t code = taosWriteQitem(pTask->inputQueue->queue, pItem); + if (code != TSDB_CODE_SUCCESS) { + destroyStreamDataBlock((SStreamDataBlock*) pItem); + taosFreeQitem(pItem); + return code; + } } else if (type == STREAM_INPUT__CHECKPOINT) { taosWriteQitem(pTask->inputQueue->queue, pItem); } else if (type == STREAM_INPUT__GET_RES) { + // use the default memory limit, refactor later. taosWriteQitem(pTask->inputQueue->queue, pItem); qDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); } From 937f67f07b0a89dc1df5a5e01c4dbfa2e31b76e5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 May 2023 18:02:42 +0800 Subject: [PATCH 03/16] fix(stream): fix invalid free. --- source/libs/stream/src/stream.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 0d772247b4..03013dbf3e 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -315,7 +315,6 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, total, size); destroyStreamDataBlock((SStreamDataBlock*) pItem); - taosFreeQitem(pItem); return -1; } From d9cc9f21574f4799e827c5792e85e4efa4ce3db1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 May 2023 18:03:21 +0800 Subject: [PATCH 04/16] fix(stream): remove invalid free. --- source/libs/stream/src/stream.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 03013dbf3e..6b69f19427 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -322,7 +322,6 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { int32_t code = taosWriteQitem(pTask->inputQueue->queue, pItem); if (code != TSDB_CODE_SUCCESS) { destroyStreamDataBlock((SStreamDataBlock*) pItem); - taosFreeQitem(pItem); return code; } } else if (type == STREAM_INPUT__CHECKPOINT) { From fc20ca8423dbe4b637d3465adea0f8fce692072e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 May 2023 18:42:23 +0800 Subject: [PATCH 05/16] fix(stream): set the correct rps flag as the dispatch rsp. --- source/libs/stream/src/stream.c | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 6b69f19427..8bca0e6ec2 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -233,8 +233,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S } int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) { - ASSERT(pRsp->inputStatus == TASK_OUTPUT_STATUS__NORMAL || pRsp->inputStatus == TASK_OUTPUT_STATUS__BLOCKED); - qDebug("s-task:%s receive dispatch rsp, code: %x", pTask->id.idStr, code); + qDebug("s-task:%s receive dispatch rsp, status:%d code:%d", pTask->id.idStr, pRsp->inputStatus, code); if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); @@ -246,13 +245,16 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus); ASSERT(old == TASK_OUTPUT_STATUS__WAIT); + + // the input queue of the (down stream) task that receive the output data is full, so the TASK_INPUT_STATUS_BLOCKED is rsp + // todo we need to send EMPTY PACKAGE to detect if the input queue is available for output of upstream task, every 50 ms. if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { // TODO: init recover timer - ASSERT(0); + qError("s-task:%s inputQ of downstream task:0x%x is full, need to block output", pTask->id.idStr, pRsp->downstreamTaskId); return 0; } - // continue dispatch one block to down stream in pipeline + // otherwise, continue dispatch the first block to down stream task in pipeline streamDispatchStreamBlock(pTask); return 0; } From 67becbd4d1562dba82fcbb7b31a16bd7a0c4446a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 31 May 2023 01:10:36 +0000 Subject: [PATCH 06/16] change db param --- source/libs/stream/src/streamBackendRocksdb.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index f7638a42ae..f7a8ec4564 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -92,6 +92,7 @@ void* streamBackendInit(const char* path) { rocksdb_options_set_recycle_log_file_num(opts, 6); rocksdb_options_set_max_write_buffer_number(opts, 2); rocksdb_options_set_info_log_level(opts, 0); + rocksdb_options_set_db_write_buffer_size(opts, 256 << 20); pHandle->env = env; pHandle->dbOpt = opts; From 3b0a6d7f0b0e5082ef074c536b0afd45dff3c36a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 31 May 2023 03:15:25 +0000 Subject: [PATCH 07/16] change db param --- include/common/tglobal.h | 2 +- source/common/src/tglobal.c | 16 ++++++++++------ source/libs/stream/src/streamBackendRocksdb.c | 12 +++++++++++- 3 files changed, 22 insertions(+), 8 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 157e37f080..ced94e0f11 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -29,7 +29,6 @@ extern "C" { #define SLOW_LOG_TYPE_OTHERS 0x4 #define SLOW_LOG_TYPE_ALL 0xFFFFFFFF - // cluster extern char tsFirst[]; extern char tsSecond[]; @@ -181,6 +180,7 @@ extern bool tsDisableStream; extern int64_t tsStreamBufferSize; extern int64_t tsCheckpointInterval; extern bool tsFilterScalarMode; +extern int32_t tsMaxStreamBackendCache; // #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index c648f8551a..cc368167f8 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -60,6 +60,7 @@ int32_t tsNumOfQnodeQueryThreads = 4; int32_t tsNumOfQnodeFetchThreads = 1; int32_t tsNumOfSnodeStreamThreads = 4; int32_t tsNumOfSnodeWriteThreads = 1; +int32_t tsMaxStreamBackendCache = 256; // M // sync raft int32_t tsElectInterval = 25 * 1000; @@ -105,7 +106,7 @@ int32_t tsQueryPolicy = 1; int32_t tsQueryRspPolicy = 0; int64_t tsQueryMaxConcurrentTables = 200; // unit is TSDB_TABLE_NUM_UNIT bool tsEnableQueryHb = false; -bool tsEnableScience = false; // on taos-cli show float and doulbe with scientific notation if true +bool tsEnableScience = false; // on taos-cli show float and doulbe with scientific notation if true int32_t tsQuerySmaOptimize = 0; int32_t tsQueryRsmaTolerance = 1000; // the tolerance time (ms) to judge from which level to query rsma data. bool tsQueryPlannerTrace = false; @@ -117,8 +118,8 @@ int32_t tsRedirectFactor = 2; int32_t tsRedirectMaxPeriod = 1000; int32_t tsMaxRetryWaitTime = 10000; bool tsUseAdapter = false; -int32_t tsMetaCacheMaxSize = -1; // MB -int32_t tsSlowLogThreshold = 3; // seconds +int32_t tsMetaCacheMaxSize = -1; // MB +int32_t tsSlowLogThreshold = 3; // seconds int32_t tsSlowLogScope = SLOW_LOG_TYPE_ALL; /* @@ -349,7 +350,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "maxRetryWaitTime", tsMaxRetryWaitTime, 0, 86400000, 0) != 0) return -1; if (cfgAddBool(pCfg, "useAdapter", tsUseAdapter, true) != 0) return -1; if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, true) != 0) return -1; - if (cfgAddInt64(pCfg, "queryMaxConcurrentTables", tsQueryMaxConcurrentTables, INT64_MIN, INT64_MAX, 1) != 0) return -1; + if (cfgAddInt64(pCfg, "queryMaxConcurrentTables", tsQueryMaxConcurrentTables, INT64_MIN, INT64_MAX, 1) != 0) + return -1; if (cfgAddInt32(pCfg, "metaCacheMaxSize", tsMetaCacheMaxSize, -1, INT32_MAX, 1) != 0) return -1; if (cfgAddInt32(pCfg, "slowLogThreshold", tsSlowLogThreshold, 0, INT32_MAX, true) != 0) return -1; if (cfgAddString(pCfg, "slowLogScope", "", true) != 0) return -1; @@ -524,6 +526,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, 0) != 0) return -1; if (cfgAddBool(pCfg, "filterScalarMode", tsFilterScalarMode, 0) != 0) return -1; + if (cfgAddInt32(pCfg, "maxStreamBackendCache", tsMaxStreamBackendCache, 16, 1024, 0) != 0) return -1; GRANT_CFG_ADD; return 0; @@ -781,7 +784,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { tsNumOfTaskQueueThreads = cfgGetItem(pCfg, "numOfTaskQueueThreads")->i32; tsQueryPolicy = cfgGetItem(pCfg, "queryPolicy")->i32; tsEnableQueryHb = cfgGetItem(pCfg, "enableQueryHb")->bval; - tsEnableScience = cfgGetItem(pCfg, "enableScience")->bval; + tsEnableScience = cfgGetItem(pCfg, "enableScience")->bval; tsQuerySmaOptimize = cfgGetItem(pCfg, "querySmaOptimize")->i32; tsQueryPlannerTrace = cfgGetItem(pCfg, "queryPlannerTrace")->bval; tsQueryNodeChunkSize = cfgGetItem(pCfg, "queryNodeChunkSize")->i32; @@ -902,7 +905,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsCheckpointInterval = cfgGetItem(pCfg, "checkpointInterval")->i64; tsFilterScalarMode = cfgGetItem(pCfg, "filterScalarMode")->bval; - + tsMaxStreamBackendCache = cfgGetItem(pCfg, "maxStreamBackendCache")->i32; + GRANT_CFG_GET; return 0; } diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index f7a8ec4564..c8a6597bad 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -38,6 +38,15 @@ typedef struct { rocksdb_comparator_t** pCompares; } RocksdbCfInst; +uint32_t nextPow2(uint32_t x) { + x = x - 1; + x = x | (x >> 1); + x = x | (x >> 2); + x = x | (x >> 4); + x = x | (x >> 8); + x = x | (x >> 16); + return x + 1; +} int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf); void destroyRocksdbCfInst(RocksdbCfInst* inst); @@ -92,7 +101,8 @@ void* streamBackendInit(const char* path) { rocksdb_options_set_recycle_log_file_num(opts, 6); rocksdb_options_set_max_write_buffer_number(opts, 2); rocksdb_options_set_info_log_level(opts, 0); - rocksdb_options_set_db_write_buffer_size(opts, 256 << 20); + uint32_t dbLimit = nextPow2(tsMaxStreamBackendCache); + rocksdb_options_set_db_write_buffer_size(opts, dbLimit << 20); pHandle->env = env; pHandle->dbOpt = opts; From ed851b522cc6a9da43a009cb09145bc05fd4183a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 31 May 2023 03:30:10 +0000 Subject: [PATCH 08/16] change db param --- source/common/src/tglobal.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index cc368167f8..c5646d92d1 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -60,7 +60,7 @@ int32_t tsNumOfQnodeQueryThreads = 4; int32_t tsNumOfQnodeFetchThreads = 1; int32_t tsNumOfSnodeStreamThreads = 4; int32_t tsNumOfSnodeWriteThreads = 1; -int32_t tsMaxStreamBackendCache = 256; // M +int32_t tsMaxStreamBackendCache = 128; // M // sync raft int32_t tsElectInterval = 25 * 1000; From 18b971c666873525f0f8fade035af51f5995d7eb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 31 May 2023 17:06:38 +0800 Subject: [PATCH 09/16] fix(query): check the version range when dump partial rows of file block to sdata block directly. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 54 ++++++++++++++++++++++++-- 1 file changed, 51 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 362934ec84..9ab8654bd9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1121,6 +1121,27 @@ static int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData endPos = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, key, pReader->order); } + if ((pReader->verRange.maxVer >= pBlock->minVer && pReader->verRange.maxVer < pBlock->maxVer)|| + (pReader->verRange.minVer <= pBlock->maxVer && pReader->verRange.minVer > pBlock->minVer)) { + int32_t i = endPos; + + if (asc) { + for(; i >= 0; --i) { + if (pBlockData->aVersion[i] <= pReader->verRange.maxVer) { + break; + } + } + } else { + for(; i < pBlock->nRow; ++i) { + if (pBlockData->aVersion[i] >= pReader->verRange.minVer) { + break; + } + } + } + + endPos = i; + } + return endPos; } @@ -1260,10 +1281,11 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) { return 0; } + // row index of dump info remain the initial position, let's find the appropriate start position. if ((pDumpInfo->rowIndex == 0 && asc) || (pDumpInfo->rowIndex == pBlock->nRow - 1 && (!asc))) { - if (asc && pReader->window.skey <= pBlock->minKey.ts) { + if (asc && pReader->window.skey <= pBlock->minKey.ts && pReader->verRange.minVer <= pBlock->minVer) { // pDumpInfo->rowIndex = 0; - } else if (!asc && pReader->window.ekey >= pBlock->maxKey.ts) { + } else if (!asc && pReader->window.ekey >= pBlock->maxKey.ts && pReader->verRange.maxVer >= pBlock->maxVer) { // pDumpInfo->rowIndex = pBlock->nRow - 1; } else { // find the appropriate the start position in current block, and set it to be the current rowIndex int32_t pos = asc ? pBlock->nRow - 1 : 0; @@ -1279,6 +1301,29 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) { pBlock->maxVer, pReader->idStr); return TSDB_CODE_INVALID_PARA; } + + ASSERT(pReader->verRange.minVer <= pBlock->maxVer && pReader->verRange.maxVer >= pBlock->minVer); + + // find the appropriate start position that satisfies the version requirement. + if ((pReader->verRange.maxVer >= pBlock->minVer && pReader->verRange.maxVer < pBlock->maxVer)|| + (pReader->verRange.minVer <= pBlock->maxVer && pReader->verRange.minVer > pBlock->minVer)) { + int32_t i = pDumpInfo->rowIndex; + if (asc) { + for(; i < pBlock->nRow; ++i) { + if (pBlockData->aVersion[i] >= pReader->verRange.minVer) { + break; + } + } + } else { + for(; i >= 0; --i) { + if (pBlockData->aVersion[i] <= pReader->verRange.maxVer) { + break; + } + } + } + + pDumpInfo->rowIndex = i; + } } } @@ -1293,6 +1338,9 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) { int32_t dumpedRows = asc ? (endIndex - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex - endIndex); if (dumpedRows > pReader->resBlockInfo.capacity) { // output buffer check dumpedRows = pReader->resBlockInfo.capacity; + } else if (dumpedRows <= 0) { // no qualified rows in current data block, abort directly. + setBlockAllDumped(pDumpInfo, pReader->window.ekey, pReader->order); + return TSDB_CODE_SUCCESS; } int32_t i = 0; @@ -1848,7 +1896,7 @@ static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBloc SDataBlockToLoadInfo info = {0}; getBlockToLoadInfo(&info, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader); bool isCleanFileBlock = !(info.overlapWithNeighborBlock || info.hasDupTs || info.overlapWithKeyInBuf || - info.overlapWithDelInfo || info.overlapWithLastBlock || info.partiallyRequired); + info.overlapWithDelInfo || info.overlapWithLastBlock); return isCleanFileBlock; } From fa5c024b3ee97914ae58e58f5507719eddf19977 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 31 May 2023 17:22:59 +0800 Subject: [PATCH 10/16] refactor: do some internal refactor. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 9ab8654bd9..7af0936bb5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -2857,7 +2857,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { // it is a clean block, load it directly if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader) && pBlock->nRow <= pReader->resBlockInfo.capacity) { - if (asc || ((!asc) && (!hasDataInLastBlock(pLastBlockReader)))) { + if (asc || (!hasDataInLastBlock(pLastBlockReader))) { code = copyBlockDataToSDataBlock(pReader); if (code) { goto _end; @@ -2876,7 +2876,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { } } - SBlockData* pBlockData = &pReader->status.fileBlockData; + SBlockData* pBlockData = &pReader->status.fileBlockData; while (1) { bool hasBlockData = false; @@ -2890,7 +2890,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { pDumpInfo->rowIndex += step; - SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter); + pBlock = getCurrentBlock(&pReader->status.blockIter); if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) { pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); // NOTE: get the new block info @@ -2918,7 +2918,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { // currently loaded file data block is consumed if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) { - SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter); + pBlock = getCurrentBlock(&pReader->status.blockIter); setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order); break; } From 65b81a803f7edda2ab3b0aa50c08c5c258c6c521 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 31 May 2023 18:05:53 +0800 Subject: [PATCH 11/16] refactor: do some internal refactor and add some logs. --- source/libs/stream/src/stream.c | 4 +++- source/libs/stream/src/streamDispatch.c | 8 ++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 8bca0e6ec2..72fd520498 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -237,7 +237,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); - qDebug("task %d is shuffle, left waiting rsp %d", pTask->id.taskId, leftRsp); + qDebug("s-task:%s is shuffle, left waiting rsp %d", pTask->id.idStr, leftRsp); if (leftRsp > 0) { return 0; } @@ -246,6 +246,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus); ASSERT(old == TASK_OUTPUT_STATUS__WAIT); + qDebug("s-task:%s receive dispatch rsp, output status:%d", pTask->id.idStr, pTask->outputStatus); + // the input queue of the (down stream) task that receive the output data is full, so the TASK_INPUT_STATUS_BLOCKED is rsp // todo we need to send EMPTY PACKAGE to detect if the input queue is available for output of upstream task, every 50 ms. if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 401a8b9e74..1e939cb071 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -510,14 +510,17 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { int8_t old = atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT); if (old != TASK_OUTPUT_STATUS__NORMAL) { - qDebug("s-task:%s task wait for dispatch rsp, not dispatch now", pTask->id.idStr); + qDebug("s-task:%s task wait for dispatch rsp, not dispatch now, output status:%d", pTask->id.idStr, old); return 0; } + qDebug("s-task:%s start to dispatch msg, output status:%d", pTask->id.idStr, pTask->outputStatus); + SStreamDataBlock* pDispatchedBlock = streamQueueNextItem(pTask->outputQueue); if (pDispatchedBlock == NULL) { - qDebug("s-task:%s stop dispatching since no output in output queue", pTask->id.idStr); atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); + qDebug("s-task:%s stop dispatching since no output in output queue, output status:%d", pTask->id.idStr, + pTask->outputStatus); return 0; } @@ -527,6 +530,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { if (code != TSDB_CODE_SUCCESS) { streamQueueProcessFail(pTask->outputQueue); atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); + qDebug("s-task:%s failed to dispatch msg to downstream, output status:%d", pTask->id.idStr, pTask->outputStatus); } // this block can be freed only when it has been pushed to down stream. From a3abe25539a07ca87fb051cdbf5fe4056aae8440 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 31 May 2023 19:03:37 +0800 Subject: [PATCH 12/16] refactor: do some internal refactor. --- source/dnode/vnode/src/tq/tq.c | 6 ++++-- source/dnode/vnode/src/tq/tqSink.c | 12 +++++------- source/libs/stream/src/stream.c | 10 ++++++---- source/libs/stream/src/streamExec.c | 2 +- 4 files changed, 16 insertions(+), 14 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index dcfc578ac7..917f06ae14 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1272,13 +1272,15 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t taskId = ntohl(pRsp->upstreamTaskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); - tqDebug("recv dispatch rsp, code:%x", pMsg->code); + + int32_t vgId = pTq->pStreamMeta->vgId; + tqDebug("vgId:%d recv dispatch rsp, code:%d", vgId, pMsg->code); if (pTask) { streamProcessDispatchRsp(pTask, pRsp, pMsg->code); streamMetaReleaseTask(pTq->pStreamMeta, pTask); return 0; } else { - return -1; + tqDebug("vgId:%d failed to find task:0x%x", vgId, taskId); } } diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 0bd7d9a57b..6e02d5e21b 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -137,7 +137,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d int32_t blockSz = taosArrayGetSize(pBlocks); - tqDebug("vgId:%d, s-task:%s write results blocks:%d into table", TD_VID(pVnode), pTask->id.idStr, blockSz); + tqDebug("vgId:%d, s-task:%s write results %d blocks into table", TD_VID(pVnode), pTask->id.idStr, blockSz); void* pBuf = NULL; SArray* tagArray = NULL; @@ -482,17 +482,15 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d tEncoderClear(&encoder); tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); - SRpcMsg msg = { - .msgType = TDMT_VND_SUBMIT, - .pCont = pBuf, - .contLen = len, - }; - + SRpcMsg msg = { .msgType = TDMT_VND_SUBMIT, .pCont = pBuf, .contLen = len }; if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { tqDebug("failed to put into write-queue since %s", terrstr()); } } } + + tqDebug("vgId:%d, s-task:%s write results completed", TD_VID(pVnode), pTask->id.idStr); + _end: taosArrayDestroy(tagArray); taosArrayDestroy(pVals); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 72fd520498..acc69c5a2b 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -126,7 +126,7 @@ int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pR if (pBlock == NULL) { streamTaskInputFail(pTask); status = TASK_INPUT_STATUS__FAILED; - qDebug("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId, + qError("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId, pTask->id.idStr); } else { int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock); @@ -233,7 +233,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S } int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) { - qDebug("s-task:%s receive dispatch rsp, status:%d code:%d", pTask->id.idStr, pRsp->inputStatus, code); + qDebug("s-task:%s receive dispatch rsp, output status:%d code:%d", pTask->id.idStr, pRsp->inputStatus, code); if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); @@ -246,13 +246,15 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus); ASSERT(old == TASK_OUTPUT_STATUS__WAIT); - qDebug("s-task:%s receive dispatch rsp, output status:%d", pTask->id.idStr, pTask->outputStatus); - // the input queue of the (down stream) task that receive the output data is full, so the TASK_INPUT_STATUS_BLOCKED is rsp // todo we need to send EMPTY PACKAGE to detect if the input queue is available for output of upstream task, every 50 ms. if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { // TODO: init recover timer qError("s-task:%s inputQ of downstream task:0x%x is full, need to block output", pTask->id.idStr, pRsp->downstreamTaskId); + + atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); + qError("s-task:%s ignore error, and reset task output status:%d", pTask->id.idStr, pTask->outputStatus); + return 0; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 95b97e080a..d6f5533db4 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -385,7 +385,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { if (pTask->taskLevel == TASK_LEVEL__SINK) { ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK); - qDebug("s-task:%s sink node start to sink result. numOfBlocks:%d", pTask->id.idStr, batchSize); + qDebug("s-task:%s sink task start to sink %d blocks", pTask->id.idStr, batchSize); streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput); continue; } From 9e021087a935c79bf56bef5e498e84427a7cdb45 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 31 May 2023 19:04:53 +0800 Subject: [PATCH 13/16] refactor: do some internal refactor. --- source/dnode/vnode/src/tq/tq.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 917f06ae14..6ec9020759 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1281,6 +1281,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { return 0; } else { tqDebug("vgId:%d failed to find task:0x%x", vgId, taskId); + return TSDB_CODE_INVALID_MSG; } } From 6d04c4e2fd3ab3463c422f73cb298a51a9b586c2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 31 May 2023 19:05:40 +0800 Subject: [PATCH 14/16] refactor: do some internal refactor. --- source/dnode/vnode/src/tq/tq.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 6ec9020759..8cc791d1a6 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1274,13 +1274,12 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); int32_t vgId = pTq->pStreamMeta->vgId; - tqDebug("vgId:%d recv dispatch rsp, code:%d", vgId, pMsg->code); if (pTask) { streamProcessDispatchRsp(pTask, pRsp, pMsg->code); streamMetaReleaseTask(pTq->pStreamMeta, pTask); return 0; } else { - tqDebug("vgId:%d failed to find task:0x%x", vgId, taskId); + tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, taskId); return TSDB_CODE_INVALID_MSG; } } From 87deb36f8a3a6fe9c903e5be0f775478a0d15149 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 31 May 2023 19:36:02 +0800 Subject: [PATCH 15/16] refactor: do some internal refactor. --- source/libs/executor/src/executor.c | 5 ++++- source/libs/executor/src/scanoperator.c | 10 ++++++---- source/libs/stream/src/streamExec.c | 4 ++-- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index fb35b211c9..ed8a4f3a3b 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -132,7 +132,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu pOperator->status = OP_NOT_OPENED; SStreamScanInfo* pInfo = pOperator->info; - qDebug("s-task:%s set source blocks:%d", id, (int32_t)numOfBlocks); + + qDebug("s-task:%s in this batch, all %d blocks need to be processed and dump results", id, (int32_t)numOfBlocks); ASSERT(pInfo->validBlockIndex == 0 && taosArrayGetSize(pInfo->pBlockLists) == 0); if (type == STREAM_INPUT__MERGED_SUBMIT) { @@ -140,6 +141,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData)); taosArrayPush(pInfo->pBlockLists, pReq); } + pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; } else if (type == STREAM_INPUT__DATA_SUBMIT) { taosArrayPush(pInfo->pBlockLists, input); @@ -150,6 +152,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu SPackedData tmp = { .pDataBlock = pDataBlock }; taosArrayPush(pInfo->pBlockLists, &tmp); } + pInfo->blockType = STREAM_INPUT__DATA_BLOCK; } else { ASSERT(0); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 2d2f377041..88f5642ef9 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1797,9 +1797,10 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo) static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { // NOTE: this operator does never check if current status is done or not - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStorageAPI* pAPI = &pTaskInfo->storageAPI; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + const char* id = GET_TASKID(pTaskInfo); + SStorageAPI* pAPI = &pTaskInfo->storageAPI; SStreamScanInfo* pInfo = pOperator->info; qDebug("stream scan started, %s", GET_TASKID(pTaskInfo)); @@ -1922,7 +1923,9 @@ FETCH_NEXT_BLOCK: return NULL; } - int32_t current = pInfo->validBlockIndex++; + int32_t current = pInfo->validBlockIndex++; + qDebug("process %d/%d input data blocks, %s", current, (int32_t) total, id); + SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current); SSDataBlock* pBlock = pPacked->pDataBlock; if (pBlock->info.parTbName[0]) { @@ -2057,7 +2060,6 @@ FETCH_NEXT_BLOCK: return pInfo->pUpdateRes; } - const char* id = GET_TASKID(pTaskInfo); SSDataBlock* pBlock = pInfo->pRes; SDataBlockInfo* pBlockInfo = &pBlock->info; int32_t totalBlocks = taosArrayGetSize(pInfo->pBlockLists); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index d6f5533db4..0970bbcf0c 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -16,8 +16,8 @@ #include "streamInc.h" // maximum allowed processed block batches. One block may include several submit blocks -#define MAX_STREAM_EXEC_BATCH_NUM 128 -#define MIN_STREAM_EXEC_BATCH_NUM 16 +#define MAX_STREAM_EXEC_BATCH_NUM 32 +#define MIN_STREAM_EXEC_BATCH_NUM 8 #define MAX_STREAM_RESULT_DUMP_THRESHOLD 1000 static int32_t updateCheckPointInfo (SStreamTask* pTask); From ccc86f991637e101a1544a5a409fca5049b39740 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 31 May 2023 19:36:34 +0800 Subject: [PATCH 16/16] refactor: do some internal refactor. --- source/libs/stream/src/streamExec.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 0970bbcf0c..716b939e5f 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -18,7 +18,7 @@ // maximum allowed processed block batches. One block may include several submit blocks #define MAX_STREAM_EXEC_BATCH_NUM 32 #define MIN_STREAM_EXEC_BATCH_NUM 8 -#define MAX_STREAM_RESULT_DUMP_THRESHOLD 1000 +#define MAX_STREAM_RESULT_DUMP_THRESHOLD 100 static int32_t updateCheckPointInfo (SStreamTask* pTask);