stream count window sliding

This commit is contained in:
54liuyao 2024-01-26 15:56:26 +08:00
parent bc4f3d1296
commit 9e86dfaee7
18 changed files with 1065 additions and 799 deletions

View File

@ -283,6 +283,7 @@ typedef struct SWindowLogicNode {
EWindowAlgorithm windowAlgo;
bool isPartTb;
int64_t windowCount;
int64_t windowSliding;
} SWindowLogicNode;
typedef struct SFillLogicNode {
@ -634,6 +635,7 @@ typedef SEventWinodwPhysiNode SStreamEventWinodwPhysiNode;
typedef struct SCountWinodwPhysiNode {
SWindowPhysiNode window;
int64_t windowCount;
int64_t windowSliding;
} SCountWinodwPhysiNode;
typedef SCountWinodwPhysiNode SStreamCountWinodwPhysiNode;

View File

@ -282,6 +282,7 @@ typedef struct SCountWindowNode {
ENodeType type; // QUERY_NODE_EVENT_WINDOW
SNode* pCol; // timestamp primary key
int64_t windowCount;
int64_t windowSliding;
} SCountWindowNode;
typedef enum EFillMode {

View File

@ -373,6 +373,7 @@ typedef struct SStreamAggSupporter {
SStorageAPI* pSessionAPI;
struct SUpdateInfo* pUpdateInfo;
int32_t windowCount;
int32_t windowSliding;
} SStreamAggSupporter;
typedef struct SWindowSupporter {

View File

@ -160,6 +160,8 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort
SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SGroupCachePhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo);

View File

@ -0,0 +1,205 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executorInt.h"
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
#include "operator.h"
#include "querytask.h"
#include "tcommon.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "ttime.h"
typedef struct SCountWindowOperatorInfo {
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SExprSupp scalarSup;
SWindowRowsSup winSup;
int32_t tsSlotId; // primary timestamp column slot id
STimeWindowAggSupp twAggSup;
uint64_t groupId; // current group id, used to identify the data block from different groups
SResultRow* pRow;
} SCountWindowOperatorInfo;
void destroyCountWindowOperatorInfo(void* param) {
SCountWindowOperatorInfo* pInfo = (SCountWindowOperatorInfo*)param;
if (pInfo == NULL) {
return;
}
if (pInfo->pRow != NULL) {
taosMemoryFree(pInfo->pRow);
}
cleanupBasicInfo(&pInfo->binfo);
colDataDestroy(&pInfo->twAggSup.timeWindowData);
cleanupAggSup(&pInfo->aggSup);
cleanupExprSupp(&pInfo->scalarSup);
taosMemoryFreeClear(param);
}
int32_t doCountWindowAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SExprSupp* pSup = &pOperator->exprSupp;
SCountWindowOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pRes = pInfo->binfo.pRes;
int64_t groupId = pBlock->info.id.groupId;
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
TSKEY* tsCols = (TSKEY*)pColInfoData->pData;
SWindowRowsSup* pRowSup = &pInfo->winSup;
int32_t rowIndex = 0;
int32_t code = TSDB_CODE_SUCCESS;
for (int32_t i = 0; i < pBlock->info.rows; i++) {
// 1.如果group id发生变化获取新group id上一次的window的缓存并把旧group id的信息存入缓存。
// 2.计算 当前需要合并的行数
// 3.做聚集计算。
// 4.达到行数将结果存入pInfo->res中。
}
return code;
}
static SSDataBlock* countWindowAggregate(SOperatorInfo* pOperator) {
SCountWindowOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SExprSupp* pSup = &pOperator->exprSupp;
int32_t order = pInfo->binfo.inputTsOrder;
SSDataBlock* pRes = pInfo->binfo.pRes;
SOperatorInfo* downstream = pOperator->pDownstream[0];
blockDataCleanup(pRes);
while (1) {
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
if (pBlock == NULL) {
break;
}
pRes->info.scanFlag = pBlock->info.scanFlag;
setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true);
blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
if (pInfo->scalarSup.pExprInfo != NULL) {
pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
pInfo->scalarSup.numOfExprs, NULL);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
}
doCountWindowAggImpl(pOperator, pBlock);
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
return pRes;
}
}
return pRes->info.rows == 0 ? NULL : pRes;
}
SOperatorInfo* createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode,
SExecTaskInfo* pTaskInfo) {
SCountWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SCountWindowOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
int32_t code = TSDB_CODE_SUCCESS;
SCountWinodwPhysiNode* pCountWindowNode = (SCountWinodwPhysiNode*)physiNode;
pInfo->tsSlotId = ((SColumnNode*)pCountWindowNode->window.pTspk)->slotId;
if (pCountWindowNode->window.pExprs != NULL) {
int32_t numOfScalarExpr = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pCountWindowNode->window.pExprs, NULL, &numOfScalarExpr);
code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
}
code = filterInitFromNode((SNode*)pCountWindowNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pCountWindowNode->window.pFuncs, NULL, &num);
initResultSizeInfo(&pOperator->resultInfo, 4096);
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
SSDataBlock* pResBlock = createDataBlockFromDescNode(pCountWindowNode->window.node.pOutputDataBlockDesc);
blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity);
initBasicInfo(&pInfo->binfo, pResBlock);
initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->binfo.inputTsOrder = physiNode->inputTsOrder;
pInfo->binfo.outputTsOrder = physiNode->outputTsOrder;
pInfo->twAggSup = (STimeWindowAggSupp){.waterMark = pCountWindowNode->window.watermark,
.calTrigger = pCountWindowNode->window.triggerType};
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
setOperatorInfo(pOperator, "CountWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT, true, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, countWindowAggregate, NULL, destroyCountWindowOperatorInfo,
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
return pOperator;
_error:
if (pInfo != NULL) {
destroyEWindowOperatorInfo(pInfo);
}
taosMemoryFreeClear(pOperator);
pTaskInfo->code = code;
return NULL;
}
static int32_t setSingleOutputTupleBufv1(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult,
SExprSupp* pExprSup, SAggSupporter* pAggSup) {
if (*pResult == NULL) {
SResultRow* p = taosMemoryCalloc(1, pAggSup->resultRowSize);
pResultRowInfo->cur = (SResultRowPosition){.pageId = p->pageId, .offset = p->offset};
*pResult = p;
}
(*pResult)->win = *win;
clearResultRowInitFlag(pExprSup->pCtx, pExprSup->numOfExprs);
setResultRowInitCtx(*pResult, pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
return TSDB_CODE_SUCCESS;
}

View File

@ -1546,7 +1546,7 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
return TSDB_CODE_SUCCESS;
}
static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock, EStreamScanMode mode) {
static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock, EStreamType mode) {
blockDataCleanup(pDestBlock);
if (pSrcBlock->info.rows == 0) {
return TSDB_CODE_SUCCESS;

View File

@ -498,6 +498,7 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
goto _error;
}
pInfo->streamAggSup.windowCount = pCountNode->windowCount;
pInfo->streamAggSup.windowSliding = pCountNode->windowSliding;
pInfo->twAggSup = (STimeWindowAggSupp){
.waterMark = pCountNode->window.watermark,

View File

@ -332,6 +332,7 @@ static int32_t eventWindowNodeCopy(const SEventWindowNode* pSrc, SEventWindowNod
static int32_t countWindowNodeCopy(const SCountWindowNode* pSrc, SCountWindowNode* pDst) {
CLONE_NODE_FIELD(pCol);
COPY_SCALAR_FIELD(windowCount);
COPY_SCALAR_FIELD(windowSliding);
return TSDB_CODE_SUCCESS;
}
@ -558,6 +559,7 @@ static int32_t logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* p
COPY_SCALAR_FIELD(igCheckUpdate);
COPY_SCALAR_FIELD(windowAlgo);
COPY_SCALAR_FIELD(windowCount);
COPY_SCALAR_FIELD(windowSliding);
return TSDB_CODE_SUCCESS;
}

View File

@ -2742,6 +2742,7 @@ static int32_t jsonToPhysiEventWindowNode(const SJson* pJson, void* pObj) {
}
static const char* jkCountWindowPhysiPlanWindowCount = "WindowCount";
static const char* jkCountWindowPhysiPlanWindowSliding = "WindowSliding";
static int32_t physiCountWindowNodeToJson(const void* pObj, SJson* pJson) {
const SCountWinodwPhysiNode* pNode = (const SCountWinodwPhysiNode*)pObj;
@ -2750,6 +2751,9 @@ static int32_t physiCountWindowNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkCountWindowPhysiPlanWindowCount, pNode->windowCount);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkCountWindowPhysiPlanWindowSliding, pNode->windowSliding);
}
return code;
}
@ -2760,6 +2764,9 @@ static int32_t jsonToPhysiCountWindowNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkCountWindowPhysiPlanWindowCount, &pNode->windowCount);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkCountWindowPhysiPlanWindowSliding, &pNode->windowSliding);
}
return code;
}
@ -4415,6 +4422,7 @@ static int32_t jsonToEventWindowNode(const SJson* pJson, void* pObj) {
static const char* jkCountWindowTsPrimaryKey = "CountTsPrimaryKey";
static const char* jkCountWindowCount = "CountWindowCount";
static const char* jkCountWindowSliding = "CountWindowSliding";
static int32_t countWindowNodeToJson(const void* pObj, SJson* pJson) {
const SCountWindowNode* pNode = (const SCountWindowNode*)pObj;
@ -4423,6 +4431,9 @@ static int32_t countWindowNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkCountWindowCount, pNode->windowCount);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkCountWindowSliding, pNode->windowSliding);
}
return code;
}
@ -4433,6 +4444,9 @@ static int32_t jsonToCountWindowNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkCountWindowCount, &pNode->windowCount);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkCountWindowSliding, &pNode->windowSliding);
}
return code;
}

View File

@ -3210,7 +3210,7 @@ static int32_t msgToPhysiEventWindowNode(STlvDecoder* pDecoder, void* pObj) {
return code;
}
enum { PHY_COUNT_CODE_WINDOW = 1, PHY_COUNT_CODE_WINDOW_COUNT };
enum { PHY_COUNT_CODE_WINDOW = 1, PHY_COUNT_CODE_WINDOW_COUNT, PHY_COUNT_CODE_WINDOW_SLIDING };
static int32_t physiCountWindowNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SCountWinodwPhysiNode* pNode = (const SCountWinodwPhysiNode*)pObj;
@ -3219,6 +3219,9 @@ static int32_t physiCountWindowNodeToMsg(const void* pObj, STlvEncoder* pEncoder
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI64(pEncoder, PHY_COUNT_CODE_WINDOW_COUNT, pNode->windowCount);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI64(pEncoder, PHY_COUNT_CODE_WINDOW_SLIDING, pNode->windowSliding);
}
return code;
}
@ -3236,6 +3239,9 @@ static int32_t msgToPhysiCountWindowNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_COUNT_CODE_WINDOW_COUNT:
code = tlvDecodeI64(pTlv, &pNode->windowCount);
break;
case PHY_COUNT_CODE_WINDOW_SLIDING:
code = tlvDecodeI64(pTlv, &pNode->windowSliding);
break;
default:
break;
}

View File

@ -131,7 +131,7 @@ SNode* createOrderByExprNode(SAstCreateContext* pCxt, SNode* pExpr, EOrder order
SNode* createSessionWindowNode(SAstCreateContext* pCxt, SNode* pCol, SNode* pGap);
SNode* createStateWindowNode(SAstCreateContext* pCxt, SNode* pExpr);
SNode* createEventWindowNode(SAstCreateContext* pCxt, SNode* pStartCond, SNode* pEndCond);
SNode* createCountWindowNode(SAstCreateContext* pCxt, const SToken* pToken);
SNode* createCountWindowNode(SAstCreateContext* pCxt, const SToken* pCountToken, const SToken* pSlidingToken);
SNode* createIntervalWindowNode(SAstCreateContext* pCxt, SNode* pInterval, SNode* pOffset, SNode* pSliding,
SNode* pFill);
SNode* createFillNode(SAstCreateContext* pCxt, EFillMode mode, SNode* pValues);

View File

@ -1153,7 +1153,9 @@ twindow_clause_opt(A) ::=
twindow_clause_opt(A) ::=
EVENT_WINDOW START WITH search_condition(B) END WITH search_condition(C). { A = createEventWindowNode(pCxt, B, C); }
twindow_clause_opt(A) ::=
COUNT_WINDOW NK_LP NK_INTEGER(B) NK_RP. { A = createCountWindowNode(pCxt, &B); }
COUNT_WINDOW NK_LP NK_INTEGER(B) NK_RP. { A = createCountWindowNode(pCxt, &B, &B); }
twindow_clause_opt(A) ::=
COUNT_WINDOW NK_LP NK_INTEGER(B) NK_COMMA NK_INTEGER(C) NK_RP. { A = createCountWindowNode(pCxt, &B, &C); }
sliding_opt(A) ::= . { A = NULL; }
sliding_opt(A) ::= SLIDING NK_LP interval_sliding_duration_literal(B) NK_RP. { A = releaseRawExprNode(pCxt, B); }

View File

@ -883,7 +883,7 @@ SNode* createEventWindowNode(SAstCreateContext* pCxt, SNode* pStartCond, SNode*
return (SNode*)pEvent;
}
SNode* createCountWindowNode(SAstCreateContext* pCxt, const SToken* pToken) {
SNode* createCountWindowNode(SAstCreateContext* pCxt, const SToken* pCountToken, const SToken* pSlidingToken) {
CHECK_PARSER_STATUS(pCxt);
SCountWindowNode* pCount = (SCountWindowNode*)nodesMakeNode(QUERY_NODE_COUNT_WINDOW);
CHECK_OUT_OF_MEM(pCount);
@ -892,7 +892,8 @@ SNode* createCountWindowNode(SAstCreateContext* pCxt, const SToken* pToken) {
nodesDestroyNode((SNode*)pCount);
CHECK_OUT_OF_MEM(NULL);
}
pCount->windowCount = taosStr2Int64(pToken->z, NULL, 10);
pCount->windowCount = taosStr2Int64(pCountToken->z, NULL, 10);
pCount->windowSliding = taosStr2Int64(pSlidingToken->z, NULL, 10);
return (SNode*)pCount;
}

View File

@ -7772,6 +7772,22 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm
"Ignore expired data of Count window must be 1.");
}
SCountWindowNode* pCountWin = (SCountWindowNode*)pSelect->pWindow;
if (pCountWin->windowCount <= 1) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"Size of Count window must exceed 1.");
}
if (pCountWin->windowSliding <= 1) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"Size of Count window must exceed 1.");
}
if (pCountWin->windowCount > INT32_MAX) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"Size of Count window must less than 2147483647(INT32_MAX).");
}
}
return TSDB_CODE_SUCCESS;

File diff suppressed because it is too large Load Diff

View File

@ -1022,6 +1022,7 @@ static int32_t createWindowLogicNodeByCount(SLogicPlanContext* pCxt, SCountWindo
pWindow->node.resultDataOrder =
pCxt->pPlanCxt->streamQuery ? DATA_ORDER_LEVEL_GLOBAL : pWindow->node.requireDataOrder;
pWindow->windowCount = pCount->windowCount;
pWindow->windowSliding = pCount->windowSliding;
pWindow->pTspk = nodesCloneNode(pCount->pCol);
if (NULL == pWindow->pTspk) {
nodesDestroyNode((SNode*)pWindow);

View File

@ -1758,6 +1758,7 @@ static int32_t createCountWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pC
return TSDB_CODE_OUT_OF_MEMORY;
}
pCount->windowCount = pWindowLogicNode->windowCount;
pCount->windowSliding = pWindowLogicNode->windowSliding;
int32_t code = createWindowPhysiNodeFinalize(pCxt, pChildren, &pCount->window, pWindowLogicNode);
if (TSDB_CODE_SUCCESS == code) {

View File

@ -25,7 +25,12 @@ sql_error create stream streams1 trigger at_once IGNORE EXPIRED 1 IGNORE UPDATE
# All
sql_error create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart as s, count(*) c1, sum(b), max(c) from st count_window(3);
#2~INT32_MAX
sql_error create stream streams1 trigger at_once IGNORE EXPIRED 1 IGNORE UPDATE 0 into streamt as select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(1);
sql_error create stream streams1 trigger at_once IGNORE EXPIRED 1 IGNORE UPDATE 0 into streamt as select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(2147483648);
sql create stream streams2 trigger at_once IGNORE EXPIRED 1 IGNORE UPDATE 0 WATERMARK 10s into streamt2 as select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(2);
sql create stream streams3 trigger at_once IGNORE EXPIRED 1 IGNORE UPDATE 0 WATERMARK 10s into streamt3 as select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(2147483647);
print count1 end
system sh/exec.sh -n dnode1 -s stop -x SIGINT