From 5e0cbdf0b7b4632c6f3d284bd9d4207c4ddfeb13 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Wed, 29 Jun 2022 15:13:56 +0800 Subject: [PATCH] feat(stream): ignore close window --- source/common/src/tdatablock.c | 4 +- source/libs/executor/inc/executorimpl.h | 4 + source/libs/executor/src/timewindowoperator.c | 120 ++++++++++++------ 3 files changed, 89 insertions(+), 39 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 1ec298ee15..22999b38ee 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1662,8 +1662,8 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); int32_t rows = pDataBlock->info.rows; int32_t len = 0; - len += snprintf(dumpBuf + len, size - len, "\n%s |block type %d |child id %d|\n", flag, - (int32_t)pDataBlock->info.type, pDataBlock->info.childId); + len += snprintf(dumpBuf + len, size - len, "\n%s |block type %d |child id %d|group id %lu|\n", flag, + (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.groupId); for (int32_t j = 0; j < rows; j++) { len += snprintf(dumpBuf + len, size - len, "%s |", flag); for (int32_t k = 0; k < colNum; k++) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 1cc3f9b874..aeef0fae2a 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -416,6 +416,7 @@ typedef struct SIntervalAggOperatorInfo { STimeWindowAggSupp twAggSup; bool invertible; SArray* pPrevValues; // SArray used to keep the previous not null value for interpolation. + bool ignoreCloseWindow; } SIntervalAggOperatorInfo; typedef struct SStreamFinalIntervalOperatorInfo { @@ -437,6 +438,7 @@ typedef struct SStreamFinalIntervalOperatorInfo { SArray* pPullWins; // SPullWindowInfo int32_t pullIndex; SSDataBlock* pPullDataRes; + bool ignoreCloseWindow; } SStreamFinalIntervalOperatorInfo; typedef struct SAggOperatorInfo { @@ -574,6 +576,7 @@ typedef struct SStreamSessionAggOperatorInfo { SArray* pChildren; // cache for children's result; final stream operator SPhysiNode* pPhyNode; // create new child bool isFinal; + bool ignoreCloseWindow; } SStreamSessionAggOperatorInfo; typedef struct STimeSliceOperatorInfo { @@ -617,6 +620,7 @@ typedef struct SStreamStateAggOperatorInfo { void* pDelIterator; SArray* pScanWindow; SArray* pChildren; // cache for children's result; + bool ignoreCloseWindow; } SStreamStateAggOperatorInfo; typedef struct SSortedMergeOperatorInfo { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 963e714972..ea5c4a2c8e 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -813,6 +813,16 @@ static void removeResults(SArray* pWins, SArray* pUpdated) { } } + +bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup) { + ASSERT(pSup->maxTs == INT64_MIN || pSup->maxTs > 0); + return pSup->maxTs != INT64_MIN && ts < pSup->maxTs - pSup->waterMark; +} + +bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup) { + return isOverdue(pWin->ekey, pSup); +} + static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock, int32_t scanFlag, SArray* pUpdated) { SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info; @@ -830,15 +840,16 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->interval.precision, &pInfo->win); + int32_t ret = TSDB_CODE_SUCCESS; + if (!pInfo->ignoreCloseWindow || !isCloseWindow(&win, &pInfo->twAggSup)) { + ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, + pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); + if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { + longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } - int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, - pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); - if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { - longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { - if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { + if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM && + pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { saveResultRow(pResult, tableGroupId, pUpdated); } } @@ -864,9 +875,11 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup); } - updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true); - doApplyFunctions(pTaskInfo, pSup->pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, - pBlock->info.rows, numOfOutput, pInfo->order); + if (!pInfo->ignoreCloseWindow || !isCloseWindow(&win, &pInfo->twAggSup)) { + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true); + doApplyFunctions(pTaskInfo, pSup->pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, + pBlock->info.rows, numOfOutput, pInfo->order); + } doCloseWindow(pResultRowInfo, pInfo, pResult); @@ -877,6 +890,12 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul if (startPos < 0) { break; } + if (pInfo->ignoreCloseWindow && isCloseWindow(&nextWin, &pInfo->twAggSup)) { + ekey = ascScan ? nextWin.ekey : nextWin.skey; + forwardRows = + getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order); + continue; + } // null data, failed to allocate more memory buffer int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, @@ -885,10 +904,9 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } - if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { - if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { - saveResultRow(pResult, tableGroupId, pUpdated); - } + if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM && + pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { + saveResultRow(pResult, tableGroupId, pUpdated); } ekey = ascScan ? nextWin.ekey : nextWin.skey; @@ -1292,11 +1310,6 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) { return TSDB_CODE_SUCCESS; } -bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup) { - ASSERT(pSup->maxTs == INT64_MIN || pSup->maxTs > 0); - return pSup->maxTs != INT64_MIN && pWin->ekey < pSup->maxTs - pSup->waterMark; -} - static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval, SHashObj* pPullDataMap, SArray* closeWins) { void* pIte = NULL; @@ -1411,7 +1424,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); pOperator->status = OP_RES_TO_RETURN; - + printDataBlock(pInfo->binfo.pRes, "single interval"); return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; } @@ -1521,6 +1534,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pInfo->interval = *pInterval; pInfo->execModel = pTaskInfo->execModel; pInfo->twAggSup = *pTwAggSupp; + pInfo->ignoreCloseWindow = false; if (pPhyNode->window.pExprs != NULL) { int32_t numOfScalar = 0; @@ -2276,7 +2290,15 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc STimeWindow nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->interval.precision, NULL); while (1) { - if (IS_FINAL_OP(pInfo) && isCloseWindow(&nextWin, &pInfo->twAggSup) && pInfo->pChildren) { + bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup); + if (pInfo->ignoreCloseWindow && isClosed) { + startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin); + if (startPos < 0) { + break; + } + continue; + } + if (IS_FINAL_OP(pInfo) && isClosed && pInfo->pChildren) { bool ignore = true; SWinRes winRes = {.ts = nextWin.skey, .groupId = tableGroupId,}; void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinRes)); @@ -2684,6 +2706,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pPullDataMap = taosHashInit(64, hashFn, false, HASH_NO_LOCK); pInfo->pPullDataRes = createPullDataBlock(); + pInfo->ignoreCloseWindow = false; pOperator->operatorType = pPhyNode->type; pOperator->blocking = true; @@ -2830,6 +2853,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh pInfo->pChildren = NULL; pInfo->isFinal = false; pInfo->pPhyNode = pPhyNode; + pInfo->ignoreCloseWindow = false; pOperator->name = "StreamSessionWindowAggOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION; @@ -3007,6 +3031,9 @@ static int32_t doOneWindowAggImpl(int32_t tsColId, SOptrBasicInfo* pBinfo, SStre updateTimeWindowInfo(pTimeWindowData, &pCurWin->win, false); doApplyFunctions(pTaskInfo, pSup->pCtx, &pCurWin->win, pTimeWindowData, startIndex, winRows, tsCols, pSDataBlock->info.rows, numOutput, TSDB_ORDER_ASC); + SFilePage* bufPage = getBufPage(pAggSup->pResultBuf, pCurWin->pos.pageId); + setBufPageDirty(bufPage, true); + releaseBufPage(pAggSup->pResultBuf, bufPage); return TSDB_CODE_SUCCESS; } @@ -3063,7 +3090,13 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex, pWinInfo->isOutput = false; } taosArrayRemove(pInfo->streamAggSup.pCurWins, i); + SFilePage* tmpPage = getBufPage(pInfo->streamAggSup.pResultBuf, pWinInfo->pos.pageId); + releaseBufPage(pInfo->streamAggSup.pResultBuf, tmpPage); } + SFilePage* bufPage = getBufPage(pInfo->streamAggSup.pResultBuf, pCurWin->pos.pageId); + ASSERT(num > 0); + setBufPageDirty(bufPage, true); + releaseBufPage(pInfo->streamAggSup.pResultBuf, bufPage); } static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SHashObj* pStUpdated, @@ -3083,22 +3116,23 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData SResultRow* pResult = NULL; int32_t winRows = 0; - if (pSDataBlock->pDataBlock != NULL) { - SColumnInfoData* pStartTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); - startTsCols = (int64_t*)pStartTsCol->pData; - SColumnInfoData* pEndTsCol = NULL; - if (hasEndTs) { - pEndTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->endTsIndex); - } else { - pEndTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); - } - endTsCols = (int64_t*)pEndTsCol->pData; + ASSERT(pSDataBlock->pDataBlock); + SColumnInfoData* pStartTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); + startTsCols = (int64_t*)pStartTsCol->pData; + SColumnInfoData* pEndTsCol = NULL; + if (hasEndTs) { + pEndTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->endTsIndex); } else { - return; + pEndTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); } + endTsCols = (int64_t*)pEndTsCol->pData; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; for (int32_t i = 0; i < pSDataBlock->info.rows;) { + if (pInfo->ignoreCloseWindow && isOverdue(endTsCols[i], &pInfo->twAggSup)) { + i++; + continue; + } int32_t winIndex = 0; SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup, startTsCols[i], endTsCols[i], groupId, gap, &winIndex); winRows = @@ -3205,17 +3239,24 @@ static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWin index = 0; } for (int32_t k = index; k < chWinSize; k++) { - SResultWindowInfo* pcw = taosArrayGet(pChWins, k); - if (pParentWin->win.skey <= pcw->win.skey && pcw->win.ekey <= pParentWin->win.ekey) { + SResultWindowInfo* pChWin = taosArrayGet(pChWins, k); + if (pParentWin->win.skey <= pChWin->win.skey && pChWin->win.ekey <= pParentWin->win.ekey) { SResultRow* pChResult = NULL; - setWindowOutputBuf(pcw, &pChResult, pChild->exprSupp.pCtx, groupId, numOfOutput, + setWindowOutputBuf(pChWin, &pChResult, pChild->exprSupp.pCtx, groupId, numOfOutput, pChild->exprSupp.rowEntryInfoOffset, &pChInfo->streamAggSup, pTaskInfo); compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo); + SFilePage* bufPage = getBufPage(pInfo->streamAggSup.pResultBuf, pChWin->pos.pageId); + setBufPageDirty(bufPage, true); + releaseBufPage(pInfo->streamAggSup.pResultBuf, bufPage); continue; } break; } } + SFilePage* bufPage = getBufPage(pInfo->streamAggSup.pResultBuf, pParentWin->pos.pageId); + ASSERT(size > 0); + setBufPageDirty(bufPage, true); + releaseBufPage(pInfo->streamAggSup.pResultBuf, bufPage); } } @@ -3234,7 +3275,7 @@ int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArra for (int32_t i = 0; i < size; i++) { void* pWin = taosArrayGet(pWins, i); SResultWindowInfo* pSeWin = fn(pWin); - if (pSeWin->win.ekey < pTwSup->maxTs - pTwSup->waterMark) { + if (isCloseWindow(&pSeWin->win, pTwSup)) { if (!pSeWin->isClosed) { pSeWin->isClosed = true; if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { @@ -3745,6 +3786,10 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; SColumnInfoData* pKeyColInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->stateCol.slotId); for (int32_t i = 0; i < pSDataBlock->info.rows; i += winRows) { + if (pInfo->ignoreCloseWindow && isOverdue(tsCols[i], &pInfo->twAggSup)) { + i++; + continue; + } char* pKeyData = colDataGetData(pKeyColInfo, i); int32_t winIndex = 0; bool allEqual = true; @@ -3895,6 +3940,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->pDelRes->info.type = STREAM_DELETE; blockDataEnsureCapacity(pInfo->pDelRes, 64); pInfo->pChildren = NULL; + pInfo->ignoreCloseWindow = false; pOperator->name = "StreamStateAggOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE;