From 7836b581a7d36ee48b7e5cac0dbbeeac96e2a8b8 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Thu, 11 Aug 2022 14:25:16 +0800 Subject: [PATCH] feat(stream): delete result --- source/libs/executor/inc/executorimpl.h | 3 +-- source/libs/executor/src/timewindowoperator.c | 27 ++++++++++--------- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 75db151f85..839c9e61ed 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -694,6 +694,7 @@ typedef struct SSessionAggOperatorInfo { typedef struct SResultWindowInfo { SResultRowPosition pos; STimeWindow win; + uint64_t groupId; bool isOutput; bool isClosed; } SResultWindowInfo; @@ -1008,8 +1009,6 @@ SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY star SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId, int64_t gap, int32_t* pIndex); bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap); -int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, - TSKEY* pEndTs, int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted); bool functionNeedToExecute(SqlFunctionCtx* pCtx); bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup); bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 2b67f2bf01..6020794429 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3734,7 +3734,7 @@ SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY star return insertNewSessionWindow(pWinInfos, startTs, index + 1); } -int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TSKEY* pEndTs, int32_t rows, +int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t groupId,int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted) { for (int32_t i = start; i < rows; ++i) { if (!isInWindow(pWinInfo, pStartTs[i], gap) && (!pEndTs || !isInWindow(pWinInfo, pEndTs[i], gap))) { @@ -3742,7 +3742,8 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TS } if (pWinInfo->win.skey > pStartTs[i]) { if (pStDeleted && pWinInfo->isOutput) { - taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &pWinInfo->win.skey, sizeof(TSKEY)); + SWinRes res = {.ts = pWinInfo->win.skey, .groupId = groupId}; + taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinRes)); pWinInfo->isOutput = false; } pWinInfo->win.skey = pStartTs[i]; @@ -3861,7 +3862,8 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex, compactFunctions(pSup->pCtx, pInfo->pDummyCtx, numOfOutput, pTaskInfo); taosHashRemove(pStUpdated, &pWinInfo->pos, sizeof(SResultRowPosition)); if (pWinInfo->isOutput) { - taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &pWinInfo->win.skey, sizeof(TSKEY)); + SWinRes res = {.ts = pWinInfo->win.skey, .groupId = groupId}; + taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinRes)); pWinInfo->isOutput = false; } taosArrayRemove(pInfo->streamAggSup.pCurWins, i); @@ -3911,7 +3913,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData int32_t winIndex = 0; SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup, startTsCols[i], endTsCols[i], groupId, gap, &winIndex); winRows = - updateSessionWindowInfo(pCurWin, startTsCols, endTsCols, pSDataBlock->info.rows, i, pInfo->gap, pStDeleted); + updateSessionWindowInfo(pCurWin, startTsCols, endTsCols, groupId, pSDataBlock->info.rows, i, pInfo->gap, pStDeleted); code = doOneWindowAgg(pInfo, pSDataBlock, pCurWin, &pResult, i, winRows, numOfOutput, pOperator); if (code != TSDB_CODE_SUCCESS || pResult == NULL) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -3960,6 +3962,7 @@ static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBloc } deleteWindow(pAggSup->pCurWins, winIndex, fp); if (result) { + pCurWin->groupId = gpDatas[i]; taosArrayPush(result, pCurWin); } } @@ -3980,7 +3983,7 @@ static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SExprSupp* pSup, step = 1; continue; } - step = updateSessionWindowInfo(pCurWin, tsCols, NULL, pBlock->info.rows, i, gap, NULL); + step = updateSessionWindowInfo(pCurWin, tsCols, NULL, 0, pBlock->info.rows, i, gap, NULL); ASSERT(isInWindow(pCurWin, tsCols[i], gap)); doClearWindowImpl(&pCurWin->pos, pAggSup->pResultBuf, pSup, numOfOutput); if (result) { @@ -4017,12 +4020,11 @@ void doBuildDeleteDataBlock(SHashObj* pStDeleted, SSDataBlock* pBlock, void** It blockDataEnsureCapacity(pBlock, size); size_t keyLen = 0; while (((*Ite) = taosHashIterate(pStDeleted, *Ite)) != NULL) { - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); - colDataAppend(pColInfoData, pBlock->info.rows, *Ite, false); - for (int32_t i = 1; i < taosArrayGetSize(pBlock->pDataBlock); i++) { - pColInfoData = taosArrayGet(pBlock->pDataBlock, i); - colDataAppendNULL(pColInfoData, pBlock->info.rows); - } + SWinRes* res = *Ite; + SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); + colDataAppend(pTsCol, pBlock->info.rows, (const char*)&res->ts, false); + SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); + colDataAppend(pGpCol, pBlock->info.rows, (const char*)&res->groupId, false); pBlock->info.rows += 1; if (pBlock->info.rows + 1 >= pBlock->info.capacity) { break; @@ -4149,7 +4151,8 @@ static void copyDeleteWindowInfo(SArray* pResWins, SHashObj* pStDeleted) { int32_t size = taosArrayGetSize(pResWins); for (int32_t i = 0; i < size; i++) { SResultWindowInfo* pWinInfo = taosArrayGet(pResWins, i); - taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &pWinInfo->win.skey, sizeof(TSKEY)); + SWinRes res = {.ts = pWinInfo->win.skey, .groupId = pWinInfo->groupId}; + taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinRes)); } }