From ae294b98c2b58bffcdc7550cc579c279f073d57a Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 21 Sep 2022 15:15:01 +0800 Subject: [PATCH 1/3] enh(stream): blocklist do not copy --- source/common/src/tglobal.c | 11 +++++------ source/libs/executor/src/executor.c | 13 ++++++++----- source/libs/executor/src/scanoperator.c | 18 +++++++++++------- 3 files changed, 24 insertions(+), 18 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index a6e30ae0b5..80be8c6a35 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -58,7 +58,7 @@ int32_t tsNumOfMnodeFetchThreads = 1; int32_t tsNumOfMnodeReadThreads = 1; int32_t tsNumOfVnodeQueryThreads = 4; int32_t tsNumOfVnodeStreamThreads = 2; -int32_t tsNumOfVnodeFetchThreads = 4; +int32_t tsNumOfVnodeFetchThreads = 1; int32_t tsNumOfVnodeWriteThreads = 2; int32_t tsNumOfVnodeSyncThreads = 2; int32_t tsNumOfVnodeRsmaThreads = 2; @@ -162,8 +162,8 @@ int32_t tsMqRebalanceInterval = 2; int32_t tsTtlUnit = 86400; int32_t tsTtlPushInterval = 86400; int32_t tsGrantHBInterval = 60; -int32_t tsUptimeInterval = 300; // seconds -char tsUdfdResFuncs[1024] = ""; // udfd resident funcs that teardown when udfd exits +int32_t tsUptimeInterval = 300; // seconds +char tsUdfdResFuncs[1024] = ""; // udfd resident funcs that teardown when udfd exits #ifndef _STORAGE int32_t taosSetTfsCfg(SConfig *pCfg) { @@ -366,9 +366,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfVnodeStreamThreads = TMAX(tsNumOfVnodeStreamThreads, 4); if (cfgAddInt32(pCfg, "numOfVnodeStreamThreads", tsNumOfVnodeStreamThreads, 4, 1024, 0) != 0) return -1; - tsNumOfVnodeFetchThreads = tsNumOfCores / 4; - tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4); - if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 4, 1024, 0) != 0) return -1; + tsNumOfVnodeFetchThreads = 1; + if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 1, 1024, 0) != 0) return -1; tsNumOfVnodeWriteThreads = tsNumOfCores; tsNumOfVnodeWriteThreads = TMAX(tsNumOfVnodeWriteThreads, 1); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 45b88ec6a5..a36307de93 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -52,11 +52,11 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu // TODO: if a block was set but not consumed, // prevent setting a different type of block pInfo->validBlockIndex = 0; - if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) { - taosArrayClearP(pInfo->pBlockLists, taosMemoryFree); - } else { - taosArrayClear(pInfo->pBlockLists); - } + /*if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {*/ + /*taosArrayClearP(pInfo->pBlockLists, taosMemoryFree);*/ + /*} else {*/ + taosArrayClear(pInfo->pBlockLists); + /*}*/ if (type == STREAM_INPUT__MERGED_SUBMIT) { // ASSERT(numOfBlocks > 1); @@ -79,7 +79,9 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu } else if (type == STREAM_INPUT__DATA_BLOCK) { for (int32_t i = 0; i < numOfBlocks; ++i) { SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; + taosArrayPush(pInfo->pBlockLists, &pDataBlock); +#if 0 // TODO optimize SSDataBlock* p = createOneDataBlock(pDataBlock, false); p->info = pDataBlock->info; @@ -87,6 +89,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu taosArrayClear(p->pDataBlock); taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock); taosArrayPush(pInfo->pBlockLists, &p); +#endif } pInfo->blockType = STREAM_INPUT__DATA_BLOCK; } else { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 4d16b2e170..8dcc2cd39b 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1602,26 +1602,30 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { } break; case STREAM_DELETE_DATA: { printDataBlock(pBlock, "stream scan delete recv"); + SSDataBlock* pDelBlock = NULL; if (pInfo->tqReader) { - SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA); + pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA); filterDelBlockByUid(pDelBlock, pBlock, pInfo); - pBlock = pDelBlock; + } else { + pDelBlock = pBlock; } - printDataBlock(pBlock, "stream scan delete recv filtered"); if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) { - generateDeleteResultBlock(pInfo, pBlock, pInfo->pDeleteDataRes); + generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes); pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT; - printDataBlock(pBlock, "stream scan delete result"); + printDataBlock(pDelBlock, "stream scan delete result"); return pInfo->pDeleteDataRes; } else { pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; pInfo->updateResIndex = 0; - generateScanRange(pInfo, pBlock, pInfo->pUpdateRes); + generateScanRange(pInfo, pDelBlock, pInfo->pUpdateRes); prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes); pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA; pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; - printDataBlock(pBlock, "stream scan delete data"); + printDataBlock(pDelBlock, "stream scan delete data"); + if (pInfo->tqReader) { + blockDataDestroy(pDelBlock); + } return pInfo->pDeleteDataRes; } } break; From 7ade2b3b8d1d29ca73fbd68b2f2de384c2a75749 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 21 Sep 2022 16:00:05 +0800 Subject: [PATCH 2/3] fix double free in sma --- source/dnode/vnode/src/tq/tq.c | 3 ++ source/libs/executor/src/executor.c | 21 ++++++++------ source/libs/executor/src/scanoperator.c | 28 +++++++++++++++++-- source/libs/executor/src/timewindowoperator.c | 1 - 4 files changed, 41 insertions(+), 12 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 29217e29a4..76f3c1b12d 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -479,6 +479,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType); tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew); +#if 1 + +#endif if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { code = -1; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index a36307de93..8d2c66ff90 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -49,14 +49,19 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu SStreamScanInfo* pInfo = pOperator->info; +#if 0 // TODO: if a block was set but not consumed, // prevent setting a different type of block pInfo->validBlockIndex = 0; - /*if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {*/ - /*taosArrayClearP(pInfo->pBlockLists, taosMemoryFree);*/ - /*} else {*/ - taosArrayClear(pInfo->pBlockLists); - /*}*/ + if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) { + taosArrayClearP(pInfo->pBlockLists, taosMemoryFree); + } else { + taosArrayClear(pInfo->pBlockLists); + } +#endif + + ASSERT(pInfo->validBlockIndex == 0); + ASSERT(taosArrayGetSize(pInfo->pBlockLists) == 0); if (type == STREAM_INPUT__MERGED_SUBMIT) { // ASSERT(numOfBlocks > 1); @@ -79,17 +84,15 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu } else if (type == STREAM_INPUT__DATA_BLOCK) { for (int32_t i = 0; i < numOfBlocks; ++i) { SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; - taosArrayPush(pInfo->pBlockLists, &pDataBlock); -#if 0 // TODO optimize SSDataBlock* p = createOneDataBlock(pDataBlock, false); + /*qError("alloc p i, %d %p", i, p);*/ p->info = pDataBlock->info; taosArrayClear(p->pDataBlock); taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock); taosArrayPush(pInfo->pBlockLists, &p); -#endif } pInfo->blockType = STREAM_INPUT__DATA_BLOCK; } else { @@ -103,6 +106,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu static FORCE_INLINE void streamInputBlockDataDestory(void* pBlock) { blockDataDestroy((SSDataBlock*)pBlock); } void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo) { +#if 0 SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; if (!pTaskInfo || !pTaskInfo->pRoot || pTaskInfo->pRoot->numOfDownstream <= 0) { return; @@ -119,6 +123,7 @@ void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo) { } else { ASSERT(0); } +#endif } int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 8dcc2cd39b..dc9e0e8d45 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -920,6 +920,17 @@ _error: } static void doClearBufferedBlocks(SStreamScanInfo* pInfo) { + if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) { + size_t total = taosArrayGetSize(pInfo->pBlockLists); + for (int32_t i = 0; i < total; i++) { + SSDataBlock* p = taosArrayGetP(pInfo->pBlockLists, i); + taosArrayDestroy(p->pDataBlock); + taosMemoryFree(p); + } + } + taosArrayClear(pInfo->pBlockLists); + pInfo->validBlockIndex = 0; +#if 0 size_t total = taosArrayGetSize(pInfo->pBlockLists); pInfo->validBlockIndex = 0; @@ -928,6 +939,7 @@ static void doClearBufferedBlocks(SStreamScanInfo* pInfo) { blockDataDestroy(p); } taosArrayClear(pInfo->pBlockLists); +#endif } static bool isSessionWindow(SStreamScanInfo* pInfo) { @@ -1576,9 +1588,10 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { size_t total = taosArrayGetSize(pInfo->pBlockLists); // TODO: refactor +FETCH_NEXT_BLOCK: if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) { if (pInfo->validBlockIndex >= total) { - /*doClearBufferedBlocks(pInfo);*/ + doClearBufferedBlocks(pInfo); /*pOperator->status = OP_EXEC_DONE;*/ return NULL; } @@ -1613,7 +1626,11 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes); pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT; printDataBlock(pDelBlock, "stream scan delete result"); - return pInfo->pDeleteDataRes; + if (pInfo->pDeleteDataRes->info.rows > 0) { + return pInfo->pDeleteDataRes; + } else { + goto FETCH_NEXT_BLOCK; + } } else { pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; pInfo->updateResIndex = 0; @@ -1626,7 +1643,11 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { if (pInfo->tqReader) { blockDataDestroy(pDelBlock); } - return pInfo->pDeleteDataRes; + if (pInfo->pDeleteDataRes->info.rows > 0) { + return pInfo->pDeleteDataRes; + } else { + goto FETCH_NEXT_BLOCK; + } } } break; default: @@ -1691,6 +1712,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { while (1) { if (pInfo->tqReader->pMsg == NULL) { if (pInfo->validBlockIndex >= totBlockNum) { + doClearBufferedBlocks(pInfo); return NULL; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index aed4ae9b26..f10e5e33f1 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -5655,7 +5655,6 @@ static void doStreamIntervalAggImpl2(SOperatorInfo* pOperatorInfo, SSDataBlock* TSKEY* tsCols = NULL; SResultRow* pResult = NULL; int32_t forwardRows = 0; - int32_t aa = 4; ASSERT(pSDataBlock->pDataBlock != NULL); SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); From ab94dafed407e88309cd6d3edffbde5a8e0cc133 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 23 Sep 2022 11:37:50 +0800 Subject: [PATCH 3/3] optimize block cache --- source/libs/executor/src/executor.c | 4 +++- source/libs/executor/src/scanoperator.c | 2 ++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 8d2c66ff90..be8bdf7308 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -84,15 +84,17 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu } else if (type == STREAM_INPUT__DATA_BLOCK) { for (int32_t i = 0; i < numOfBlocks; ++i) { SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; + taosArrayPush(pInfo->pBlockLists, &pDataBlock); +#if 0 // TODO optimize SSDataBlock* p = createOneDataBlock(pDataBlock, false); - /*qError("alloc p i, %d %p", i, p);*/ p->info = pDataBlock->info; taosArrayClear(p->pDataBlock); taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock); taosArrayPush(pInfo->pBlockLists, &p); +#endif } pInfo->blockType = STREAM_INPUT__DATA_BLOCK; } else { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 56fbb21309..6c5c33ae29 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -920,6 +920,7 @@ _error: } static void doClearBufferedBlocks(SStreamScanInfo* pInfo) { +#if 0 if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) { size_t total = taosArrayGetSize(pInfo->pBlockLists); for (int32_t i = 0; i < total; i++) { @@ -928,6 +929,7 @@ static void doClearBufferedBlocks(SStreamScanInfo* pInfo) { taosMemoryFree(p); } } +#endif taosArrayClear(pInfo->pBlockLists); pInfo->validBlockIndex = 0; #if 0