diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index fa8fdb19d4..09f5d4b4b2 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -58,7 +58,7 @@ int32_t tsNumOfMnodeFetchThreads = 1; int32_t tsNumOfMnodeReadThreads = 1; int32_t tsNumOfVnodeQueryThreads = 4; int32_t tsNumOfVnodeStreamThreads = 2; -int32_t tsNumOfVnodeFetchThreads = 4; +int32_t tsNumOfVnodeFetchThreads = 1; int32_t tsNumOfVnodeWriteThreads = 2; int32_t tsNumOfVnodeSyncThreads = 2; int32_t tsNumOfVnodeRsmaThreads = 2; @@ -165,8 +165,8 @@ int32_t tsMqRebalanceInterval = 2; int32_t tsTtlUnit = 86400; int32_t tsTtlPushInterval = 86400; int32_t tsGrantHBInterval = 60; -int32_t tsUptimeInterval = 300; // seconds -char tsUdfdResFuncs[1024] = ""; // udfd resident funcs that teardown when udfd exits +int32_t tsUptimeInterval = 300; // seconds +char tsUdfdResFuncs[1024] = ""; // udfd resident funcs that teardown when udfd exits #ifndef _STORAGE int32_t taosSetTfsCfg(SConfig *pCfg) { @@ -371,9 +371,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfVnodeStreamThreads = TMAX(tsNumOfVnodeStreamThreads, 4); if (cfgAddInt32(pCfg, "numOfVnodeStreamThreads", tsNumOfVnodeStreamThreads, 4, 1024, 0) != 0) return -1; - tsNumOfVnodeFetchThreads = tsNumOfCores / 4; - tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4); - if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 4, 1024, 0) != 0) return -1; + tsNumOfVnodeFetchThreads = 1; + if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 1, 1024, 0) != 0) return -1; tsNumOfVnodeWriteThreads = tsNumOfCores; tsNumOfVnodeWriteThreads = TMAX(tsNumOfVnodeWriteThreads, 1); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 29217e29a4..76f3c1b12d 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -479,6 +479,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType); tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew); +#if 1 + +#endif if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { code = -1; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 45b88ec6a5..be8bdf7308 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -49,6 +49,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu SStreamScanInfo* pInfo = pOperator->info; +#if 0 // TODO: if a block was set but not consumed, // prevent setting a different type of block pInfo->validBlockIndex = 0; @@ -57,6 +58,10 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu } else { taosArrayClear(pInfo->pBlockLists); } +#endif + + ASSERT(pInfo->validBlockIndex == 0); + ASSERT(taosArrayGetSize(pInfo->pBlockLists) == 0); if (type == STREAM_INPUT__MERGED_SUBMIT) { // ASSERT(numOfBlocks > 1); @@ -79,7 +84,9 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu } else if (type == STREAM_INPUT__DATA_BLOCK) { for (int32_t i = 0; i < numOfBlocks; ++i) { SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; + taosArrayPush(pInfo->pBlockLists, &pDataBlock); +#if 0 // TODO optimize SSDataBlock* p = createOneDataBlock(pDataBlock, false); p->info = pDataBlock->info; @@ -87,6 +94,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu taosArrayClear(p->pDataBlock); taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock); taosArrayPush(pInfo->pBlockLists, &p); +#endif } pInfo->blockType = STREAM_INPUT__DATA_BLOCK; } else { @@ -100,6 +108,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu static FORCE_INLINE void streamInputBlockDataDestory(void* pBlock) { blockDataDestroy((SSDataBlock*)pBlock); } void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo) { +#if 0 SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; if (!pTaskInfo || !pTaskInfo->pRoot || pTaskInfo->pRoot->numOfDownstream <= 0) { return; @@ -116,6 +125,7 @@ void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo) { } else { ASSERT(0); } +#endif } int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 5293e23fa7..6c5c33ae29 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -920,6 +920,19 @@ _error: } static void doClearBufferedBlocks(SStreamScanInfo* pInfo) { +#if 0 + if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) { + size_t total = taosArrayGetSize(pInfo->pBlockLists); + for (int32_t i = 0; i < total; i++) { + SSDataBlock* p = taosArrayGetP(pInfo->pBlockLists, i); + taosArrayDestroy(p->pDataBlock); + taosMemoryFree(p); + } + } +#endif + taosArrayClear(pInfo->pBlockLists); + pInfo->validBlockIndex = 0; +#if 0 size_t total = taosArrayGetSize(pInfo->pBlockLists); pInfo->validBlockIndex = 0; @@ -928,6 +941,7 @@ static void doClearBufferedBlocks(SStreamScanInfo* pInfo) { blockDataDestroy(p); } taosArrayClear(pInfo->pBlockLists); +#endif } static bool isSessionWindow(SStreamScanInfo* pInfo) { @@ -1580,9 +1594,10 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { size_t total = taosArrayGetSize(pInfo->pBlockLists); // TODO: refactor +FETCH_NEXT_BLOCK: if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) { if (pInfo->validBlockIndex >= total) { - /*doClearBufferedBlocks(pInfo);*/ + doClearBufferedBlocks(pInfo); /*pOperator->status = OP_EXEC_DONE;*/ return NULL; } @@ -1606,27 +1621,40 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { } break; case STREAM_DELETE_DATA: { printDataBlock(pBlock, "stream scan delete recv"); + SSDataBlock* pDelBlock = NULL; if (pInfo->tqReader) { - SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA); + pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA); filterDelBlockByUid(pDelBlock, pBlock, pInfo); - pBlock = pDelBlock; + } else { + pDelBlock = pBlock; } printDataBlock(pBlock, "stream scan delete recv filtered"); if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) { - generateDeleteResultBlock(pInfo, pBlock, pInfo->pDeleteDataRes); + generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes); pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT; - printDataBlock(pBlock, "stream scan delete result"); - return pInfo->pDeleteDataRes; + printDataBlock(pDelBlock, "stream scan delete result"); + if (pInfo->pDeleteDataRes->info.rows > 0) { + return pInfo->pDeleteDataRes; + } else { + goto FETCH_NEXT_BLOCK; + } } else { pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; pInfo->updateResIndex = 0; - generateScanRange(pInfo, pBlock, pInfo->pUpdateRes); + generateScanRange(pInfo, pDelBlock, pInfo->pUpdateRes); prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes); pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA; pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; - printDataBlock(pBlock, "stream scan delete data"); - return pInfo->pDeleteDataRes; + printDataBlock(pDelBlock, "stream scan delete data"); + if (pInfo->tqReader) { + blockDataDestroy(pDelBlock); + } + if (pInfo->pDeleteDataRes->info.rows > 0) { + return pInfo->pDeleteDataRes; + } else { + goto FETCH_NEXT_BLOCK; + } } } break; default: @@ -1688,10 +1716,12 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { int32_t totBlockNum = taosArrayGetSize(pInfo->pBlockLists); + NEXT_SUBMIT_BLK: while (1) { if (pInfo->tqReader->pMsg == NULL) { if (pInfo->validBlockIndex >= totBlockNum) { updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo); + doClearBufferedBlocks(pInfo); return NULL; } @@ -1763,7 +1793,12 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { } qDebug("scan rows: %d", pBlockInfo->rows); - return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes; + if (pBlockInfo->rows > 0) { + return pInfo->pRes; + } else { + goto NEXT_SUBMIT_BLK; + } + /*return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;*/ } else { ASSERT(0); return NULL; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index b5ad3f9270..9e51f3638e 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -5658,7 +5658,6 @@ static void doStreamIntervalAggImpl2(SOperatorInfo* pOperatorInfo, SSDataBlock* TSKEY* tsCols = NULL; SResultRow* pResult = NULL; int32_t forwardRows = 0; - int32_t aa = 4; ASSERT(pSDataBlock->pDataBlock != NULL); SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);