From 901ff9bf18bbf28b2b79a33be2bf513cf3016463 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Tue, 31 May 2022 15:31:28 +0800 Subject: [PATCH] feat(sma):files factor --- include/common/tmsg.h | 6 +- include/libs/nodes/cmdnodes.h | 2 +- include/libs/nodes/plannodes.h | 4 + include/libs/planner/planner.h | 1 + source/dnode/mnode/impl/inc/mndScheduler.h | 2 +- source/dnode/mnode/impl/src/mndScheduler.c | 3 +- source/dnode/mnode/impl/src/mndStb.c | 4 +- source/dnode/vnode/src/inc/vnodeInt.h | 2 +- source/dnode/vnode/src/sma/smaRollup.c | 6 +- source/dnode/vnode/src/vnd/vnodeSvr.c | 2 +- source/libs/executor/inc/executorimpl.h | 5 +- source/libs/executor/src/executorimpl.c | 33 ++++++- source/libs/executor/src/scanoperator.c | 12 ++- source/libs/executor/src/timewindowoperator.c | 44 +++++++--- source/libs/nodes/src/nodesCloneFuncs.c | 5 ++ source/libs/nodes/src/nodesCodeFuncs.c | 15 +++- source/libs/parser/src/parAstCreater.c | 2 +- source/libs/planner/src/planLogicCreater.c | 4 + source/libs/planner/src/planOptimizer.c | 4 +- source/libs/planner/src/planPhysiCreater.c | 2 + source/libs/stream/src/tstreamUpdate.c | 4 +- .../script/tsim/sma/tsmaCreateInsertData.sim | 2 +- tests/script/tsim/stream/session0.sim | 4 +- tests/script/tsim/stream/session1.sim | 6 +- tests/script/tsim/stream/triggerInterval0.sim | 88 ------------------- 25 files changed, 132 insertions(+), 130 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index faf4addb4b..12d1c85879 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1439,8 +1439,10 @@ typedef struct { int32_t code; } STaskDropRsp; -#define STREAM_TRIGGER_AT_ONCE 1 -#define STREAM_TRIGGER_WINDOW_CLOSE 2 +#define STREAM_TRIGGER_AT_ONCE_SMA 0 +#define STREAM_TRIGGER_AT_ONCE 1 +#define STREAM_TRIGGER_WINDOW_CLOSE 2 +#define STREAM_TRIGGER_WINDOW_CLOSE_SMA 3 typedef struct { char name[TSDB_TABLE_FNAME_LEN]; diff --git a/include/libs/nodes/cmdnodes.h b/include/libs/nodes/cmdnodes.h index 0bd6d73ce1..b437ad60b8 100644 --- a/include/libs/nodes/cmdnodes.h +++ b/include/libs/nodes/cmdnodes.h @@ -80,7 +80,7 @@ typedef struct SAlterDatabaseStmt { typedef struct STableOptions { ENodeType type; char comment[TSDB_TB_COMMENT_LEN]; - float filesFactor; + double filesFactor; SNodeList* pRollupFuncs; int32_t ttl; SNodeList* pSma; diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 2648a468dd..44e7295b69 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -59,6 +59,7 @@ typedef struct SScanLogicNode { int8_t triggerType; int64_t watermark; int16_t tsColId; + double filesFactor; } SScanLogicNode; typedef struct SJoinLogicNode { @@ -113,6 +114,7 @@ typedef struct SWindowLogicNode { SNode* pStateExpr; int8_t triggerType; int64_t watermark; + double filesFactor; } SWindowLogicNode; typedef struct SFillLogicNode { @@ -222,6 +224,7 @@ typedef struct STableScanPhysiNode { int8_t triggerType; int64_t watermark; int16_t tsColId; + double filesFactor; } STableScanPhysiNode; typedef STableScanPhysiNode STableSeqScanPhysiNode; @@ -272,6 +275,7 @@ typedef struct SWinodwPhysiNode { SNode* pTspk; // timestamp primary key int8_t triggerType; int64_t watermark; + double filesFactor; } SWinodwPhysiNode; typedef struct SIntervalPhysiNode { diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index c4f71e57a8..6e14445ac7 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -36,6 +36,7 @@ typedef struct SPlanContext { int64_t watermark; char* pMsg; int32_t msgLen; + double filesFactor; } SPlanContext; // Create the physical plan for the query, according to the AST. diff --git a/source/dnode/mnode/impl/inc/mndScheduler.h b/source/dnode/mnode/impl/inc/mndScheduler.h index 9f4e377dd1..05aea3f68c 100644 --- a/source/dnode/mnode/impl/inc/mndScheduler.h +++ b/source/dnode/mnode/impl/inc/mndScheduler.h @@ -30,7 +30,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream); int32_t mndConvertRSmaTask(const char* ast, int64_t uid, int8_t triggerType, int64_t watermark, char** pStr, - int32_t* pLen); + int32_t* pLen, double filesFactor); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index d1404b96fe..eb8d2f0752 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -36,7 +36,7 @@ extern bool tsStreamSchedV; int32_t mndConvertRSmaTask(const char* ast, int64_t uid, int8_t triggerType, int64_t watermark, char** pStr, - int32_t* pLen) { + int32_t* pLen, double filesFactor) { SNode* pAst = NULL; SQueryPlan* pPlan = NULL; terrno = TSDB_CODE_SUCCESS; @@ -58,6 +58,7 @@ int32_t mndConvertRSmaTask(const char* ast, int64_t uid, int8_t triggerType, int .rSmaQuery = true, .triggerType = triggerType, .watermark = watermark, + .filesFactor = filesFactor, }; if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) { terrno = TSDB_CODE_QRY_INVALID_INPUT; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index b33c09a0f9..d436b684ad 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -397,13 +397,13 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt req.pRSmaParam.xFilesFactor = pStb->xFilesFactor; req.pRSmaParam.delay = pStb->delay; if (pStb->ast1Len > 0) { - if (mndConvertRSmaTask(pStb->pAst1, pStb->uid, 0, 0, &req.pRSmaParam.qmsg1, &req.pRSmaParam.qmsg1Len) != + if (mndConvertRSmaTask(pStb->pAst1, pStb->uid, 0, 0, &req.pRSmaParam.qmsg1, &req.pRSmaParam.qmsg1Len, req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) { return NULL; } } if (pStb->ast2Len > 0) { - if (mndConvertRSmaTask(pStb->pAst2, pStb->uid, 0, 0, &req.pRSmaParam.qmsg2, &req.pRSmaParam.qmsg2Len) != + if (mndConvertRSmaTask(pStb->pAst2, pStb->uid, 0, 0, &req.pRSmaParam.qmsg2, &req.pRSmaParam.qmsg2Len, req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) { return NULL; } diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 0e67d9e426..e3a0c94ccc 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -149,7 +149,7 @@ int32_t tdUpdateExpireWindow(SSma* pSma, SSubmitReq* pMsg, int64_t version); int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg); int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg); -int32_t tdProcessRSmaCreate(SSma* pSma, SMeta* pMeta, SVCreateStbReq* pReq, SMsgCb* pMsgCb); +int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq* pReq); int32_t tdProcessRSmaSubmit(SSma* pSma, void* pMsg, int32_t inputType); int32_t tdFetchTbUidList(SSma* pSma, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid); int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 0769da12bc..a1e397f11d 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -165,7 +165,10 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui * @param pReq * @return int32_t */ -int32_t tdProcessRSmaCreate(SSma *pSma, SMeta *pMeta, SVCreateStbReq *pReq, SMsgCb *pMsgCb) { +int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) { + SSma *pSma = pVnode->pSma; + SMeta *pMeta = pVnode->pMeta; + SMsgCb *pMsgCb = &pVnode->msgCb; if (!pReq->rollup) { smaTrace("vgId:%d return directly since no rollup for stable %s %" PRIi64, SMA_VID(pSma), pReq->name, pReq->suid); return TSDB_CODE_SUCCESS; @@ -210,6 +213,7 @@ int32_t tdProcessRSmaCreate(SSma *pSma, SMeta *pMeta, SVCreateStbReq *pReq, SMsg .reader = pReadHandle, .meta = pMeta, .pMsgCb = pMsgCb, + .vnode = pVnode, }; if (param->qmsg1) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 74b6982008..4b237bc703 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -360,7 +360,7 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, goto _err; } - tdProcessRSmaCreate(pVnode->pSma, pVnode->pMeta, &req, &pVnode->msgCb); + tdProcessRSmaCreate(pVnode, &req); tDecoderClear(&coder); return 0; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index cb45fa9730..8139e71f63 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -440,6 +440,7 @@ typedef struct STimeWindowSupp { int64_t waterMark; TSKEY maxTs; SColumnInfoData timeWindowData; // query time window info for scalar function execution. + SHashObj *winMap; } STimeWindowAggSupp; typedef struct SIntervalAggOperatorInfo { @@ -758,7 +759,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle, SArray* pTableIdList, STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, - STimeWindowAggSupp* pTwSup, int16_t tsColId); + STimeWindowAggSupp* pTwSup); SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, @@ -837,6 +838,8 @@ SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted); bool functionNeedToExecute(SqlFunctionCtx* pCtx); +int64_t getSmaWaterMark(int64_t interval, double filesFactor); +bool isSmaStream(int8_t triggerType); int32_t compareTimeWindow(const void* p1, const void* p2, const void* param); #ifdef __cplusplus diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index e7de886b03..7786ae009a 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4529,8 +4529,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo qDebug("%s pDataReader is not NULL", GET_TASKID(pTaskInfo)); } SArray* tableIdList = extractTableIdList(pTableListInfo); - SOperatorInfo* pOperator = createStreamScanOperatorInfo(pDataReader, pHandle, tableIdList, pTableScanNode, - pTaskInfo, &twSup, pTableScanNode->tsColId); + SOperatorInfo* pOperator = createStreamScanOperatorInfo(pDataReader, pHandle, + tableIdList, pTableScanNode, pTaskInfo, &twSup); taosArrayDestroy(tableIdList); return pOperator; @@ -4631,7 +4631,19 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo STimeWindowAggSupp as = {.waterMark = pIntervalPhyNode->window.watermark, .calTrigger = pIntervalPhyNode->window.triggerType, - .maxTs = INT64_MIN}; + .maxTs = INT64_MIN, + .winMap = NULL,}; + if (isSmaStream(pIntervalPhyNode->window.triggerType)) { + if (FLT_LESS(pIntervalPhyNode->window.filesFactor, 1.000000)) { + as.calTrigger = STREAM_TRIGGER_AT_ONCE_SMA; + } else { + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP); + as.winMap = taosHashInit(64, hashFn, true, HASH_NO_LOCK); + as.waterMark = getSmaWaterMark(interval.interval, + pIntervalPhyNode->window.filesFactor); + as.calTrigger = STREAM_TRIGGER_WINDOW_CLOSE_SMA; + } + } int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo); @@ -5300,3 +5312,18 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey) { } return createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, pKey, TD_TMP_DIR_PATH); } + +int64_t getSmaWaterMark(int64_t interval, double filesFactor) { + int64_t waterMark = 0; + ASSERT(FLT_GREATEREQUAL(filesFactor,0.000000)); + waterMark = -1 * filesFactor; + return waterMark; +} + +bool isSmaStream(int8_t triggerType) { + if (triggerType == STREAM_TRIGGER_AT_ONCE || + triggerType == STREAM_TRIGGER_WINDOW_CLOSE) { + return false; + } + return true; +} diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b954eb3a22..3e14589faa 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -875,7 +875,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { if (rows == 0) { pOperator->status = OP_EXEC_DONE; } else if (pInfo->pUpdateInfo) { - SSDataBlock* upRes = getUpdateDataBlock(pInfo, true); // TODO(liuyao) get invertible from plan + SSDataBlock* upRes = getUpdateDataBlock(pInfo, true); if (upRes) { pInfo->pUpdateRes = upRes; if (upRes->info.type == STREAM_REPROCESS) { @@ -894,7 +894,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle, SArray* pTableIdList, STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, - STimeWindowAggSupp* pTwSup, int16_t tsColId) { + STimeWindowAggSupp* pTwSup) { SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -939,8 +939,12 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan goto _error; } - pInfo->primaryTsIndex = tsColId; - if (pSTInfo->interval.interval > 0) { + if (isSmaStream(pTableScanNode->triggerType)) { + pTwSup->waterMark = getSmaWaterMark(pSTInfo->interval.interval, + pTableScanNode->filesFactor); + } + pInfo->primaryTsIndex = 0; // pTableScanNode->tsColId; + if (pSTInfo->interval.interval > 0 && pDataReader) { pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, pTwSup->waterMark); } else { pInfo->pUpdateInfo = NULL; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 9b8c2f75ba..14344daf81 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -748,10 +748,14 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } - if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM && - (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE || - pInfo->twAggSup.calTrigger == 0) ) { - saveResult(pResult, tableGroupId, pUpdated); + if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { + if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE || + pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE_SMA) { + saveResult(pResult, tableGroupId, pUpdated); + } + if (pInfo->twAggSup.winMap) { + taosHashRemove(pInfo->twAggSup.winMap, &win.skey, sizeof(TSKEY)); + } } int32_t forwardStep = 0; @@ -824,10 +828,14 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } - if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM && - (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE || - pInfo->twAggSup.calTrigger == 0) ) { - saveResult(pResult, tableGroupId, pUpdated); + if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { + if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE || + pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE_SMA) { + saveResult(pResult, tableGroupId, pUpdated); + } + if (pInfo->twAggSup.winMap) { + taosHashRemove(pInfo->twAggSup.winMap, &win.skey, sizeof(TSKEY)); + } } ekey = ascScan? nextWin.ekey:nextWin.skey; @@ -1172,15 +1180,23 @@ static int32_t closeIntervalWindow(SHashObj *pHashMap, STimeWindowAggSupp *pSup, void* key = taosHashGetKey(pIte, &keyLen); uint64_t groupId = *(uint64_t*) key; ASSERT(keyLen == GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))); - TSKEY ts = *(uint64_t*) ((char*)key + sizeof(uint64_t)); + TSKEY ts = *(int64_t*) ((char*)key + sizeof(uint64_t)); SResultRowInfo dumyInfo; dumyInfo.cur.pageId = -1; STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, ts, pInterval, pInterval->precision, NULL); if (win.ekey < pSup->maxTs - pSup->waterMark) { + if (pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE_SMA) { + if (taosHashGet(pSup->winMap, &win.skey, sizeof(TSKEY))) { + continue; + } + } char keyBuf[GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))]; SET_RES_WINDOW_KEY(keyBuf, &ts, sizeof(TSKEY), groupId); - taosHashRemove(pHashMap, keyBuf, keyLen); + if (pSup->calTrigger != STREAM_TRIGGER_AT_ONCE_SMA && + pSup->calTrigger != STREAM_TRIGGER_WINDOW_CLOSE_SMA) { + taosHashRemove(pHashMap, keyBuf, keyLen); + } SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t)); if (pos == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -1192,6 +1208,7 @@ static int32_t closeIntervalWindow(SHashObj *pHashMap, STimeWindowAggSupp *pSup, taosMemoryFree(pos); return TSDB_CODE_OUT_OF_MEMORY; } + taosHashPut(pSup->winMap, &win.skey, sizeof(TSKEY), NULL, 0); } } return TSDB_CODE_SUCCESS; @@ -1248,7 +1265,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { &pInfo->interval, pClosed); finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pClosed, pInfo->binfo.rowCellInfoOffset); - if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER__WINDOW_CLOSE) { + if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE || + pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE_SMA) { taosArrayAddAll(pUpdated, pClosed); } taosArrayDestroy(pClosed); @@ -2412,7 +2430,7 @@ int32_t closeSessionWindow(SArray *pWins, STimeWindowAggSupp *pTwSup, SArray *pC return TSDB_CODE_OUT_OF_MEMORY; } pSeWin->isClosed = true; - if (calTrigger == STREAM_TRIGGER__WINDOW_CLOSE) { + if (calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { pSeWin->isOutput = true; } } @@ -2486,7 +2504,7 @@ static SSDataBlock* doStreamSessionWindowAgg(SOperatorInfo* pOperator) { SArray* pUpdated = taosArrayInit(16, POINTER_BYTES); copyUpdateResult(pStUpdated, pUpdated, pBInfo->pRes->info.groupId); taosHashCleanup(pStUpdated); - if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER__WINDOW_CLOSE) { + if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { taosArrayAddAll(pUpdated, pClosed); } diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 84b5e0a043..cb4a4f104c 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -329,6 +329,10 @@ static SNode* logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) { COPY_SCALAR_FIELD(intervalUnit); COPY_SCALAR_FIELD(slidingUnit); CLONE_NODE_FIELD(pTagCond); + COPY_SCALAR_FIELD(triggerType); + COPY_SCALAR_FIELD(watermark); + COPY_SCALAR_FIELD(tsColId); + COPY_SCALAR_FIELD(filesFactor); return (SNode*)pDst; } @@ -385,6 +389,7 @@ static SNode* logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* pD CLONE_NODE_FIELD(pStateExpr); COPY_SCALAR_FIELD(triggerType); COPY_SCALAR_FIELD(watermark); + COPY_SCALAR_FIELD(filesFactor); return (SNode*)pDst; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 78710569cb..4b62963ac6 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1133,6 +1133,7 @@ static const char* jkTableScanPhysiPlanSlidingUnit = "slidingUnit"; static const char* jkTableScanPhysiPlanTriggerType = "triggerType"; static const char* jkTableScanPhysiPlanWatermark = "watermark"; static const char* jkTableScanPhysiPlanTsColId = "tsColId"; +static const char* jkTableScanPhysiPlanFilesFactor = "FilesFactor"; static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) { const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj; @@ -1183,6 +1184,9 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanTsColId, pNode->tsColId); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddDoubleToObject(pJson, jkTableScanPhysiPlanFilesFactor, pNode->filesFactor); + } return code; } @@ -1242,7 +1246,9 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkTableScanPhysiPlanTsColId, pNode->tsColId, code); } - + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetDoubleValue(pJson, jkTableScanPhysiPlanFilesFactor, &pNode->filesFactor); + } return code; } @@ -1496,6 +1502,7 @@ static const char* jkWindowPhysiPlanFuncs = "Funcs"; static const char* jkWindowPhysiPlanTsPk = "TsPk"; static const char* jkWindowPhysiPlanTriggerType = "TriggerType"; static const char* jkWindowPhysiPlanWatermark = "Watermark"; +static const char* jkWindowPhysiPlanFilesFactor = "FilesFactor"; static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) { const SWinodwPhysiNode* pNode = (const SWinodwPhysiNode*)pObj; @@ -1516,6 +1523,9 @@ static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkWindowPhysiPlanWatermark, pNode->watermark); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddDoubleToObject(pJson, jkWindowPhysiPlanFilesFactor, pNode->filesFactor); + } return code; } @@ -1541,6 +1551,9 @@ static int32_t jsonToPhysiWindowNode(const SJson* pJson, void* pObj) { tjsonGetNumberValue(pJson, jkWindowPhysiPlanWatermark, pNode->watermark, code); ; } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetDoubleValue(pJson, jkWindowPhysiPlanFilesFactor, &pNode->filesFactor); + } return code; } diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index 4380e28727..eb51830d61 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -881,7 +881,7 @@ SNode* setTableOption(SAstCreateContext* pCxt, SNode* pOptions, ETableOptionType } break; case TABLE_OPTION_FILE_FACTOR: - ((STableOptions*)pOptions)->filesFactor = taosStr2Float(((SToken*)pVal)->z, NULL); + ((STableOptions*)pOptions)->filesFactor = taosStr2Double(((SToken*)pVal)->z, NULL); break; case TABLE_OPTION_ROLLUP: ((STableOptions*)pOptions)->pRollupFuncs = pVal; diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 2df248e53f..eae8799b04 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -487,6 +487,10 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm pWindow->watermark = pCxt->pPlanCxt->watermark; } + if (pCxt->pPlanCxt->rSmaQuery) { + pWindow->filesFactor = pCxt->pPlanCxt->filesFactor; + } + if (TSDB_CODE_SUCCESS == code) { code = rewriteExprForSelect(pWindow->pFuncs, pSelect, SQL_CLAUSE_WINDOW); } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index adc07fcd0d..5f88fc40e5 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -99,7 +99,8 @@ static bool osdMayBeOptimized(SLogicNode* pNode) { return false; } // todo: release after function splitting - if (TSDB_SUPER_TABLE == ((SScanLogicNode*)pNode)->pMeta->tableType) { + if (TSDB_SUPER_TABLE == ((SScanLogicNode*)pNode)->pMeta->tableType && + SCAN_TYPE_STREAM != ((SScanLogicNode*)pNode)->scanType) { return false; } if (NULL == pNode->pParent || (QUERY_NODE_LOGIC_PLAN_WINDOW != nodeType(pNode->pParent) && @@ -226,6 +227,7 @@ static void setScanWindowInfo(SScanLogicNode* pScan) { pScan->triggerType = ((SWindowLogicNode*)pScan->node.pParent)->triggerType; pScan->watermark = ((SWindowLogicNode*)pScan->node.pParent)->watermark; pScan->tsColId = ((SColumnNode*)((SWindowLogicNode*)pScan->node.pParent)->pTspk)->colId; + pScan->filesFactor = ((SWindowLogicNode*)pScan->node.pParent)->filesFactor; } } diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index a45eabefb9..62fde889da 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -506,6 +506,7 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp pTableScan->triggerType = pScanLogicNode->triggerType; pTableScan->watermark = pScanLogicNode->watermark; pTableScan->tsColId = pScanLogicNode->tsColId; + pTableScan->filesFactor = pScanLogicNode->filesFactor; return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode); } @@ -917,6 +918,7 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList* pWindow->triggerType = pWindowLogicNode->triggerType; pWindow->watermark = pWindowLogicNode->watermark; + pWindow->filesFactor = pWindowLogicNode->filesFactor; if (TSDB_CODE_SUCCESS == code) { *pPhyNode = (SPhysiNode*)pWindow; diff --git a/source/libs/stream/src/tstreamUpdate.c b/source/libs/stream/src/tstreamUpdate.c index 7587fcecc9..6935355a93 100644 --- a/source/libs/stream/src/tstreamUpdate.c +++ b/source/libs/stream/src/tstreamUpdate.c @@ -73,8 +73,8 @@ static int64_t adjustInterval(int64_t interval, int32_t precision) { } static int64_t adjustWatermark(int64_t adjInterval, int64_t originInt, int64_t watermark) { - if (watermark <= 0) { - watermark = TMIN(originInt/adjInterval, 1) * adjInterval; + if (watermark <= adjInterval) { + watermark = TMAX(originInt/adjInterval, 1) * adjInterval; } else if (watermark > MAX_NUM_SCALABLE_BF * adjInterval) { watermark = MAX_NUM_SCALABLE_BF * adjInterval; }/* else if (watermark < MIN_NUM_SCALABLE_BF * adjInterval) { diff --git a/tests/script/tsim/sma/tsmaCreateInsertData.sim b/tests/script/tsim/sma/tsmaCreateInsertData.sim index 07c5adef5d..0202c53800 100644 --- a/tests/script/tsim/sma/tsmaCreateInsertData.sim +++ b/tests/script/tsim/sma/tsmaCreateInsertData.sim @@ -5,7 +5,7 @@ sleep 50 sql connect print =============== create database -sql create database d1 +sql create database d1 vgroups 1 sql use d1 print =============== create super table, include column type for count/sum/min/max/first diff --git a/tests/script/tsim/stream/session0.sim b/tests/script/tsim/stream/session0.sim index 46b343632a..41a8b33710 100644 --- a/tests/script/tsim/stream/session0.sim +++ b/tests/script/tsim/stream/session0.sim @@ -23,7 +23,7 @@ sql insert into t1 values(1648791223001,10,2,3,1.1,2); sql insert into t1 values(1648791233002,3,2,3,2.1,3); sql insert into t1 values(1648791243003,NULL,NULL,NULL,NULL,4); sql insert into t1 values(1648791213002,NULL,NULL,NULL,NULL,5) (1648791233012,NULL,NULL,NULL,NULL,6); - +sleep 300 sql select * from streamt order by s desc; # row 0 @@ -115,7 +115,7 @@ sql insert into t1 values(1648791233002,3,2,3,2.1,9); sql insert into t1 values(1648791243003,4,2,3,3.1,10); sql insert into t1 values(1648791213002,4,2,3,4.1,11) ; sql insert into t1 values(1648791213002,4,2,3,4.1,12) (1648791223009,4,2,3,4.1,13); - +sleep 300 sql select * from streamt order by s desc ; # row 0 diff --git a/tests/script/tsim/stream/session1.sim b/tests/script/tsim/stream/session1.sim index a44639ba7a..fb31818f98 100644 --- a/tests/script/tsim/stream/session1.sim +++ b/tests/script/tsim/stream/session1.sim @@ -22,7 +22,7 @@ sql insert into t1 values(1648791210000,1,1,1,1.1,1); sql insert into t1 values(1648791220000,2,2,2,2.1,2); sql insert into t1 values(1648791230000,3,3,3,3.1,3); sql insert into t1 values(1648791240000,4,4,4,4.1,4); - +sleep 300 sql select * from streamt order by s desc; # row 0 @@ -50,7 +50,7 @@ sql insert into t1 values(1648791250005,5,5,5,5.1,5); sql insert into t1 values(1648791260006,6,6,6,6.1,6); sql insert into t1 values(1648791270007,7,7,7,7.1,7); sql insert into t1 values(1648791240005,5,5,5,5.1,8) (1648791250006,6,6,6,6.1,9); - +sleep 300 sql select * from streamt order by s desc; # row 0 @@ -100,7 +100,7 @@ sql insert into t1 values(1648791260007,7,7,7,7.1,12) (1648791290008,7,7,7,7.1,1 sql insert into t1 values(1648791500000,7,7,7,7.1,15) (1648791520000,8,8,8,8.1,16) (1648791540000,8,8,8,8.1,17); sql insert into t1 values(1648791530000,8,8,8,8.1,18); sql insert into t1 values(1648791220000,10,10,10,10.1,19) (1648791290008,2,2,2,2.1,20) (1648791540000,17,17,17,17.1,21) (1648791500001,22,22,22,22.1,22); - +sleep 300 sql select * from streamt order by s desc; # row 0 diff --git a/tests/script/tsim/stream/triggerInterval0.sim b/tests/script/tsim/stream/triggerInterval0.sim index 6f1d8f4b7b..756f591f3f 100644 --- a/tests/script/tsim/stream/triggerInterval0.sim +++ b/tests/script/tsim/stream/triggerInterval0.sim @@ -94,92 +94,4 @@ if $data11 != 5 then return -1 endi -sql create table t2(ts timestamp, a int, b int , c int, d double); -sql create stream streams2 trigger window_close watermark 20s into streamt2 as select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from t2 interval(10s); -sql insert into t2 values(1648791213000,1,2,3,1.0); -sql insert into t2 values(1648791239999,1,2,3,1.0); -sleep 300 -sql select * from streamt2; -if $rows != 0 then - print ======$rows - return -1 -endi - -sql insert into t2 values(1648791240000,1,2,3,1.0); -sleep 300 -sql select * from streamt2; -if $rows != 1 then - print ======$rows - return -1 -endi -if $data01 != 1 then - print ======$data01 - return -1 -endi - -sql insert into t2 values(1648791250001,1,2,3,1.0) (1648791250002,1,2,3,1.0) (1648791250003,1,2,3,1.0) (1648791240000,1,2,3,1.0); -sleep 300 -sql select * from streamt2; -if $rows != 1 then - print ======$rows - return -1 -endi -if $data01 != 1 then - print ======$data01 - return -1 -endi - -sql insert into t2 values(1648791280000,1,2,3,1.0); -sleep 300 -sql select * from streamt2; -if $rows != 4 then - print ======$rows - return -1 -endi -if $data01 != 1 then - print ======$data01 - return -1 -endi -if $data11 != 1 then - print ======$data11 - return -1 -endi -if $data21 != 1 then - print ======$data21 - return -1 -endi -if $data31 != 3 then - print ======$data31 - return -1 -endi - -sql insert into t2 values(1648791250001,1,2,3,1.0) (1648791250002,1,2,3,1.0) (1648791250003,1,2,3,1.0) (1648791280000,1,2,3,1.0) (1648791280001,1,2,3,1.0) (1648791280002,1,2,3,1.0) (1648791310000,1,2,3,1.0) (1648791280001,1,2,3,1.0); -sleep 300 -sql select * from streamt2; - -if $rows != 5 then - print ======$rows - return -1 -endi -if $data01 != 1 then - print ======$data01 - return -1 -endi -if $data11 != 1 then - print ======$data11 - return -1 -endi -if $data21 != 1 then - print ======$data21 - return -1 -endi -if $data31 != 3 then - print ======$data31 - return -1 -endi -if $data41 != 3 then - print ======$data31 - return -1 -endi - system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file