From 25b735f0e3c101fedd7c140f4e7bff9ebe9410ba Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Mon, 22 Aug 2022 14:05:19 +0800 Subject: [PATCH] feat(stream): delete result --- include/libs/function/function.h | 1 + source/libs/executor/src/executil.c | 1 + source/libs/executor/src/timewindowoperator.c | 43 +++++++++++-------- tests/script/tsim/stream/state0.sim | 6 +-- 4 files changed, 31 insertions(+), 20 deletions(-) diff --git a/include/libs/function/function.h b/include/libs/function/function.h index e708a2c42d..d5da306fd2 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -142,6 +142,7 @@ typedef struct SqlFunctionCtx { struct SSDataBlock *pDstBlock; // used by indifinite rows function to set selectivity int32_t curBufPage; bool increase; + bool isStream; char udfName[TSDB_FUNC_NAME_LEN]; } SqlFunctionCtx; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index bf969bf2e4..f3b395cc7c 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -987,6 +987,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, pCtx->end.key = INT64_MIN; pCtx->numOfParams = pExpr->base.numOfParams; pCtx->increase = false; + pCtx->isStream = false; pCtx->param = pFunct->pParam; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 9eaab69633..0594a727fc 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1793,6 +1793,12 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SAggSuppor pScanInfo->sessionSup.pIntervalAggSup = pSup; } +void initStreamFunciton(SqlFunctionCtx* pCtx, int32_t numOfExpr) { + for (int32_t i = 0; i < numOfExpr; i++) { + pCtx[i].isStream = true; + } +} + SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, STimeWindowAggSupp* pTwAggSupp, SIntervalPhysiNode* pPhyNode, @@ -1835,6 +1841,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* if (isStream) { ASSERT(numOfCols > 0); increaseTs(pSup->pCtx); + initStreamFunciton(pSup->pCtx, pSup->numOfExprs); } initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win); @@ -3329,6 +3336,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); + initStreamFunciton(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs); initBasicInfo(&pInfo->binfo, pResBlock); ASSERT(numOfCols > 0); @@ -3470,6 +3478,7 @@ int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* if (code != TSDB_CODE_SUCCESS) { return code; } + initStreamFunciton(pSup->pCtx, pSup->numOfExprs); initBasicInfo(pBasicInfo, pResultBlock); @@ -4569,8 +4578,8 @@ SStateWindowInfo* getStateWindow(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_ return insertNewStateWindow(pWinInfos, ts, pKeyData, index + 1, pCol); } -int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, SColumnInfoData* pKeyCol, int32_t rows, - int32_t start, bool* allEqual, SHashObj* pSeDelete) { +int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, uint64_t groupId, + SColumnInfoData* pKeyCol, int32_t rows, int32_t start, bool* allEqual, SHashObj* pSeDeleted) { *allEqual = true; SStateWindowInfo* pWinInfo = taosArrayGet(pWinInfos, winIndex); for (int32_t i = start; i < rows; ++i) { @@ -4590,9 +4599,10 @@ int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, S } } if (pWinInfo->winInfo.win.skey > pTs[i]) { - if (pSeDelete && pWinInfo->winInfo.isOutput) { - taosHashPut(pSeDelete, &pWinInfo->winInfo.pos, sizeof(SResultRowPosition), &pWinInfo->winInfo.win.skey, - sizeof(TSKEY)); + if (pSeDeleted && pWinInfo->winInfo.isOutput) { + SWinRes res = {.ts = pWinInfo->winInfo.win.skey, .groupId = groupId}; + taosHashPut(pSeDeleted, &pWinInfo->winInfo.pos, sizeof(SResultRowPosition), &res, + sizeof(SWinRes)); pWinInfo->winInfo.isOutput = false; } pWinInfo->winInfo.win.skey = pTs[i]; @@ -4605,22 +4615,23 @@ int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, S return rows - start; } -static void doClearStateWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, int32_t tsIndex, SColumn* pCol, - int32_t keyIndex, SHashObj* pSeUpdated, SHashObj* pSeDeleted) { +static void doClearStateWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, + int32_t tsIndex, SColumn* pCol, int32_t keyIndex, SHashObj* pSeUpdated, SHashObj* pSeDeleted) { SColumnInfoData* pTsColInfo = taosArrayGet(pBlock->pDataBlock, tsIndex); SColumnInfoData* pKeyColInfo = taosArrayGet(pBlock->pDataBlock, keyIndex); TSKEY* tsCol = (TSKEY*)pTsColInfo->pData; bool allEqual = false; int32_t step = 1; + uint64_t groupId = pBlock->info.groupId; for (int32_t i = 0; i < pBlock->info.rows; i += step) { char* pKeyData = colDataGetData(pKeyColInfo, i); int32_t winIndex = 0; - SStateWindowInfo* pCurWin = getStateWindowByTs(pAggSup, tsCol[i], pBlock->info.groupId, &winIndex); + SStateWindowInfo* pCurWin = getStateWindowByTs(pAggSup, tsCol[i], groupId, &winIndex); if (!pCurWin) { continue; } - step = updateStateWindowInfo(pAggSup->pCurWins, winIndex, tsCol, pKeyColInfo, pBlock->info.rows, i, &allEqual, - pSeDeleted); + step = updateStateWindowInfo(pAggSup->pCurWins, winIndex, tsCol, groupId, pKeyColInfo, + pBlock->info.rows, i, &allEqual, pSeDeleted); ASSERT(isTsInWindow(pCurWin, tsCol[i]) || isEqualStateKey(pCurWin, pKeyData)); taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition)); deleteWindow(pAggSup->pCurWins, winIndex, destroyStateWinInfo); @@ -4659,12 +4670,12 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl int32_t winIndex = 0; bool allEqual = true; SStateWindowInfo* pCurWin = - getStateWindow(pAggSup, tsCols[i], pSDataBlock->info.groupId, pKeyData, &pInfo->stateCol, &winIndex); - winRows = updateStateWindowInfo(pAggSup->pCurWins, winIndex, tsCols, pKeyColInfo, pSDataBlock->info.rows, i, - &allEqual, pInfo->pSeDeleted); + getStateWindow(pAggSup, tsCols[i], groupId, pKeyData, &pInfo->stateCol, &winIndex); + winRows = updateStateWindowInfo(pAggSup->pCurWins, winIndex, tsCols, groupId, pKeyColInfo, + pSDataBlock->info.rows, i, &allEqual, pStDeleted); if (!allEqual) { appendOneRow(pAggSup->pScanBlock, &pCurWin->winInfo.win.skey, &pCurWin->winInfo.win.ekey, - &pSDataBlock->info.groupId); + &groupId); taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition)); deleteWindow(pAggSup->pCurWins, winIndex, destroyStateWinInfo); continue; @@ -4828,9 +4839,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeDeleted = taosHashInit(64, hashFn, true, HASH_NO_LOCK); pInfo->pDelIterator = NULL; - // pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); - pInfo->pDelRes = createOneDataBlock(pInfo->binfo.pRes, false); // todo(liuyao) for delete - pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; // todo(liuyao) for delete + pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); pInfo->pChildren = NULL; pInfo->ignoreExpiredData = pStateNode->window.igExpired; diff --git a/tests/script/tsim/stream/state0.sim b/tests/script/tsim/stream/state0.sim index 4fa883b813..877a2877b9 100644 --- a/tests/script/tsim/stream/state0.sim +++ b/tests/script/tsim/stream/state0.sim @@ -5,15 +5,15 @@ sleep 50 sql connect print =============== create database -sql create database test vgroups 1 -sql select * from information_schema.ins_databases +sql create database test vgroups 1; +sql select * from information_schema.ins_databases; if $rows != 3 then return -1 endi print $data00 $data01 $data02 -sql use test +sql use test; sql create table t1(ts timestamp, a int, b int , c int, d double, id int); sql create stream streams1 trigger at_once into streamt1 as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(a) c4, min(c) c5, max(id) c from t1 state_window(a);