From c790ac64d6c258d99db3787ab6095663a202b8f9 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Fri, 10 Jun 2022 15:54:20 +0800 Subject: [PATCH] feat(stream): distribute stream interval --- source/common/src/tdatablock.c | 3 +++ source/libs/executor/src/executorimpl.c | 6 ++++-- source/libs/executor/src/scanoperator.c | 15 ++++++++++++--- source/libs/executor/src/timewindowoperator.c | 6 ++---- 4 files changed, 21 insertions(+), 9 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 66b12678ab..f8ce86a30c 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1219,6 +1219,8 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { pBlock->info.hasVarCol = pDataBlock->info.hasVarCol; pBlock->info.rowSize = pDataBlock->info.rowSize; pBlock->info.groupId = pDataBlock->info.groupId; + pBlock->info.childId = pDataBlock->info.childId; + pBlock->info.type = pDataBlock->info.type; for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData colInfo = {0}; @@ -1499,6 +1501,7 @@ void blockDebugShowData(const SArray* dataBlocks, const char* flag) { SSDataBlock* pDataBlock = taosArrayGet(dataBlocks, i); int32_t colNum = pDataBlock->info.numOfCols; int32_t rows = pDataBlock->info.rows; + printf("%s |block type %d |child id %d|\n", flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId); for (int32_t j = 0; j < rows; j++) { printf("%s |", flag); for (int32_t k = 0; k < colNum; k++) { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 13f1b976dc..e11d220fe2 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4689,10 +4689,12 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo, isStream); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) { - int32_t children = 8; + qDebug("[******]create Semi"); + int32_t children = 0; pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == type) { - int32_t children = 0; + qDebug("[******]create Final"); + int32_t children = 1; pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children); } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) { SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 6c513a05b7..6387802303 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -809,11 +809,18 @@ static void getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool invertible, SSD // return p; SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pUpdateBlock->pDataBlock, pInfo->primaryTsIndex); ASSERT(pCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); - colInfoDataEnsureCapacity(pCol, 0, size); + blockDataEnsureCapacity(pUpdateBlock, size); for (int32_t i = 0; i < size; i++) { TSKEY* pTs = (TSKEY*)taosArrayGet(pInfo->tsArray, i); colDataAppend(pCol, i, (char*)pTs, false); } + for (int32_t i = 0; i < pUpdateBlock->info.numOfCols; i++) { + if (i == pInfo->primaryTsIndex) { + continue; + } + SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pUpdateBlock->pDataBlock, i); + colDataAppendNNULL(pCol, 0, size); + } pUpdateBlock->info.rows = size; pUpdateBlock->info.type = STREAM_REPROCESS; blockDataUpdateTsWindow(pUpdateBlock, 0); @@ -841,7 +848,9 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { } int32_t current = pInfo->validBlockIndex++; - return taosArrayGetP(pInfo->pBlockLists, current); + SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current); + blockDataUpdateTsWindow(pBlock, 0); + return pBlock; } else { if (pInfo->scanMode == STREAM_SCAN_FROM_RES) { blockDataDestroy(pInfo->pUpdateRes); @@ -940,7 +949,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { } doFilter(pInfo->pCondition, pInfo->pRes, false); - blockDataUpdateTsWindow(pInfo->pRes, 0); + blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); break; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 29e5630a50..d981b5034c 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1925,6 +1925,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); int32_t prevEndPos = (forwardRows - 1) * step + startPos; + ASSERT(pSDataBlock->info.window.skey > 0 && pSDataBlock->info.window.ekey > 0); startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, pInfo->order); if (startPos < 0) { break; @@ -2003,10 +2004,7 @@ static void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_ } static int32_t getChildIndex(SSDataBlock* pBlock) { - // if (pBlock->info.type != STREAM_INVALID && pBlock->info.rows < 4) { // for test - // return pBlock->info.rows - 1; - // } - return 0; + return pBlock->info.childId; } static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {