Merge pull request #13298 from taosdata/feature/TD-16073
feat(sma):files factor
This commit is contained in:
commit
adf77f43d9
|
@ -1439,8 +1439,10 @@ typedef struct {
|
|||
int32_t code;
|
||||
} STaskDropRsp;
|
||||
|
||||
#define STREAM_TRIGGER_AT_ONCE 1
|
||||
#define STREAM_TRIGGER_WINDOW_CLOSE 2
|
||||
#define STREAM_TRIGGER_AT_ONCE_SMA 0
|
||||
#define STREAM_TRIGGER_AT_ONCE 1
|
||||
#define STREAM_TRIGGER_WINDOW_CLOSE 2
|
||||
#define STREAM_TRIGGER_WINDOW_CLOSE_SMA 3
|
||||
|
||||
typedef struct {
|
||||
char name[TSDB_TABLE_FNAME_LEN];
|
||||
|
|
|
@ -80,7 +80,7 @@ typedef struct SAlterDatabaseStmt {
|
|||
typedef struct STableOptions {
|
||||
ENodeType type;
|
||||
char comment[TSDB_TB_COMMENT_LEN];
|
||||
float filesFactor;
|
||||
double filesFactor;
|
||||
SNodeList* pRollupFuncs;
|
||||
int32_t ttl;
|
||||
SNodeList* pSma;
|
||||
|
|
|
@ -59,6 +59,7 @@ typedef struct SScanLogicNode {
|
|||
int8_t triggerType;
|
||||
int64_t watermark;
|
||||
int16_t tsColId;
|
||||
double filesFactor;
|
||||
} SScanLogicNode;
|
||||
|
||||
typedef struct SJoinLogicNode {
|
||||
|
@ -113,6 +114,7 @@ typedef struct SWindowLogicNode {
|
|||
SNode* pStateExpr;
|
||||
int8_t triggerType;
|
||||
int64_t watermark;
|
||||
double filesFactor;
|
||||
} SWindowLogicNode;
|
||||
|
||||
typedef struct SFillLogicNode {
|
||||
|
@ -222,6 +224,7 @@ typedef struct STableScanPhysiNode {
|
|||
int8_t triggerType;
|
||||
int64_t watermark;
|
||||
int16_t tsColId;
|
||||
double filesFactor;
|
||||
} STableScanPhysiNode;
|
||||
|
||||
typedef STableScanPhysiNode STableSeqScanPhysiNode;
|
||||
|
@ -272,6 +275,7 @@ typedef struct SWinodwPhysiNode {
|
|||
SNode* pTspk; // timestamp primary key
|
||||
int8_t triggerType;
|
||||
int64_t watermark;
|
||||
double filesFactor;
|
||||
} SWinodwPhysiNode;
|
||||
|
||||
typedef struct SIntervalPhysiNode {
|
||||
|
|
|
@ -36,6 +36,7 @@ typedef struct SPlanContext {
|
|||
int64_t watermark;
|
||||
char* pMsg;
|
||||
int32_t msgLen;
|
||||
double filesFactor;
|
||||
} SPlanContext;
|
||||
|
||||
// Create the physical plan for the query, according to the AST.
|
||||
|
|
|
@ -30,7 +30,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
|
|||
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream);
|
||||
|
||||
int32_t mndConvertRSmaTask(const char* ast, int64_t uid, int8_t triggerType, int64_t watermark, char** pStr,
|
||||
int32_t* pLen);
|
||||
int32_t* pLen, double filesFactor);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -36,7 +36,7 @@
|
|||
extern bool tsStreamSchedV;
|
||||
|
||||
int32_t mndConvertRSmaTask(const char* ast, int64_t uid, int8_t triggerType, int64_t watermark, char** pStr,
|
||||
int32_t* pLen) {
|
||||
int32_t* pLen, double filesFactor) {
|
||||
SNode* pAst = NULL;
|
||||
SQueryPlan* pPlan = NULL;
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
|
@ -58,6 +58,7 @@ int32_t mndConvertRSmaTask(const char* ast, int64_t uid, int8_t triggerType, int
|
|||
.rSmaQuery = true,
|
||||
.triggerType = triggerType,
|
||||
.watermark = watermark,
|
||||
.filesFactor = filesFactor,
|
||||
};
|
||||
if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
|
||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||
|
|
|
@ -397,13 +397,13 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
|
|||
req.pRSmaParam.xFilesFactor = pStb->xFilesFactor;
|
||||
req.pRSmaParam.delay = pStb->delay;
|
||||
if (pStb->ast1Len > 0) {
|
||||
if (mndConvertRSmaTask(pStb->pAst1, pStb->uid, 0, 0, &req.pRSmaParam.qmsg1, &req.pRSmaParam.qmsg1Len) !=
|
||||
if (mndConvertRSmaTask(pStb->pAst1, pStb->uid, 0, 0, &req.pRSmaParam.qmsg1, &req.pRSmaParam.qmsg1Len, req.pRSmaParam.xFilesFactor) !=
|
||||
TSDB_CODE_SUCCESS) {
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
if (pStb->ast2Len > 0) {
|
||||
if (mndConvertRSmaTask(pStb->pAst2, pStb->uid, 0, 0, &req.pRSmaParam.qmsg2, &req.pRSmaParam.qmsg2Len) !=
|
||||
if (mndConvertRSmaTask(pStb->pAst2, pStb->uid, 0, 0, &req.pRSmaParam.qmsg2, &req.pRSmaParam.qmsg2Len, req.pRSmaParam.xFilesFactor) !=
|
||||
TSDB_CODE_SUCCESS) {
|
||||
return NULL;
|
||||
}
|
||||
|
|
|
@ -149,7 +149,7 @@ int32_t tdUpdateExpireWindow(SSma* pSma, SSubmitReq* pMsg, int64_t version);
|
|||
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
|
||||
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
|
||||
|
||||
int32_t tdProcessRSmaCreate(SSma* pSma, SMeta* pMeta, SVCreateStbReq* pReq, SMsgCb* pMsgCb);
|
||||
int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq* pReq);
|
||||
int32_t tdProcessRSmaSubmit(SSma* pSma, void* pMsg, int32_t inputType);
|
||||
int32_t tdFetchTbUidList(SSma* pSma, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid);
|
||||
int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore);
|
||||
|
|
|
@ -165,7 +165,10 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui
|
|||
* @param pReq
|
||||
* @return int32_t
|
||||
*/
|
||||
int32_t tdProcessRSmaCreate(SSma *pSma, SMeta *pMeta, SVCreateStbReq *pReq, SMsgCb *pMsgCb) {
|
||||
int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) {
|
||||
SSma *pSma = pVnode->pSma;
|
||||
SMeta *pMeta = pVnode->pMeta;
|
||||
SMsgCb *pMsgCb = &pVnode->msgCb;
|
||||
if (!pReq->rollup) {
|
||||
smaTrace("vgId:%d return directly since no rollup for stable %s %" PRIi64, SMA_VID(pSma), pReq->name, pReq->suid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -210,6 +213,7 @@ int32_t tdProcessRSmaCreate(SSma *pSma, SMeta *pMeta, SVCreateStbReq *pReq, SMsg
|
|||
.reader = pReadHandle,
|
||||
.meta = pMeta,
|
||||
.pMsgCb = pMsgCb,
|
||||
.vnode = pVnode,
|
||||
};
|
||||
|
||||
if (param->qmsg1) {
|
||||
|
|
|
@ -360,7 +360,7 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq,
|
|||
goto _err;
|
||||
}
|
||||
|
||||
tdProcessRSmaCreate(pVnode->pSma, pVnode->pMeta, &req, &pVnode->msgCb);
|
||||
tdProcessRSmaCreate(pVnode, &req);
|
||||
|
||||
tDecoderClear(&coder);
|
||||
return 0;
|
||||
|
|
|
@ -440,6 +440,7 @@ typedef struct STimeWindowSupp {
|
|||
int64_t waterMark;
|
||||
TSKEY maxTs;
|
||||
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
|
||||
SHashObj *winMap;
|
||||
} STimeWindowAggSupp;
|
||||
|
||||
typedef struct SIntervalAggOperatorInfo {
|
||||
|
@ -758,7 +759,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo*
|
|||
|
||||
SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle,
|
||||
SArray* pTableIdList, STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo,
|
||||
STimeWindowAggSupp* pTwSup, int16_t tsColId);
|
||||
STimeWindowAggSupp* pTwSup);
|
||||
|
||||
|
||||
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
|
||||
|
@ -837,6 +838,8 @@ SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap
|
|||
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t rows,
|
||||
int32_t start, int64_t gap, SHashObj* pStDeleted);
|
||||
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
|
||||
int64_t getSmaWaterMark(int64_t interval, double filesFactor);
|
||||
bool isSmaStream(int8_t triggerType);
|
||||
|
||||
int32_t compareTimeWindow(const void* p1, const void* p2, const void* param);
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -4529,8 +4529,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
qDebug("%s pDataReader is not NULL", GET_TASKID(pTaskInfo));
|
||||
}
|
||||
SArray* tableIdList = extractTableIdList(pTableListInfo);
|
||||
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pDataReader, pHandle, tableIdList, pTableScanNode,
|
||||
pTaskInfo, &twSup, pTableScanNode->tsColId);
|
||||
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pDataReader, pHandle,
|
||||
tableIdList, pTableScanNode, pTaskInfo, &twSup);
|
||||
|
||||
taosArrayDestroy(tableIdList);
|
||||
return pOperator;
|
||||
|
@ -4631,7 +4631,19 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
|
||||
STimeWindowAggSupp as = {.waterMark = pIntervalPhyNode->window.watermark,
|
||||
.calTrigger = pIntervalPhyNode->window.triggerType,
|
||||
.maxTs = INT64_MIN};
|
||||
.maxTs = INT64_MIN,
|
||||
.winMap = NULL,};
|
||||
if (isSmaStream(pIntervalPhyNode->window.triggerType)) {
|
||||
if (FLT_LESS(pIntervalPhyNode->window.filesFactor, 1.000000)) {
|
||||
as.calTrigger = STREAM_TRIGGER_AT_ONCE_SMA;
|
||||
} else {
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP);
|
||||
as.winMap = taosHashInit(64, hashFn, true, HASH_NO_LOCK);
|
||||
as.waterMark = getSmaWaterMark(interval.interval,
|
||||
pIntervalPhyNode->window.filesFactor);
|
||||
as.calTrigger = STREAM_TRIGGER_WINDOW_CLOSE_SMA;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
|
||||
pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo);
|
||||
|
@ -5300,3 +5312,18 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey) {
|
|||
}
|
||||
return createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, pKey, TD_TMP_DIR_PATH);
|
||||
}
|
||||
|
||||
int64_t getSmaWaterMark(int64_t interval, double filesFactor) {
|
||||
int64_t waterMark = 0;
|
||||
ASSERT(FLT_GREATEREQUAL(filesFactor,0.000000));
|
||||
waterMark = -1 * filesFactor;
|
||||
return waterMark;
|
||||
}
|
||||
|
||||
bool isSmaStream(int8_t triggerType) {
|
||||
if (triggerType == STREAM_TRIGGER_AT_ONCE ||
|
||||
triggerType == STREAM_TRIGGER_WINDOW_CLOSE) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -875,7 +875,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
|||
if (rows == 0) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
} else if (pInfo->pUpdateInfo) {
|
||||
SSDataBlock* upRes = getUpdateDataBlock(pInfo, true); // TODO(liuyao) get invertible from plan
|
||||
SSDataBlock* upRes = getUpdateDataBlock(pInfo, true);
|
||||
if (upRes) {
|
||||
pInfo->pUpdateRes = upRes;
|
||||
if (upRes->info.type == STREAM_REPROCESS) {
|
||||
|
@ -894,7 +894,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
|||
|
||||
SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle,
|
||||
SArray* pTableIdList, STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo,
|
||||
STimeWindowAggSupp* pTwSup, int16_t tsColId) {
|
||||
STimeWindowAggSupp* pTwSup) {
|
||||
SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
|
@ -939,8 +939,12 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
|
|||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->primaryTsIndex = tsColId;
|
||||
if (pSTInfo->interval.interval > 0) {
|
||||
if (isSmaStream(pTableScanNode->triggerType)) {
|
||||
pTwSup->waterMark = getSmaWaterMark(pSTInfo->interval.interval,
|
||||
pTableScanNode->filesFactor);
|
||||
}
|
||||
pInfo->primaryTsIndex = 0; // pTableScanNode->tsColId;
|
||||
if (pSTInfo->interval.interval > 0 && pDataReader) {
|
||||
pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, pTwSup->waterMark);
|
||||
} else {
|
||||
pInfo->pUpdateInfo = NULL;
|
||||
|
|
|
@ -748,10 +748,14 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
|||
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM &&
|
||||
(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
|
||||
pInfo->twAggSup.calTrigger == 0) ) {
|
||||
saveResult(pResult, tableGroupId, pUpdated);
|
||||
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
|
||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
|
||||
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE_SMA) {
|
||||
saveResult(pResult, tableGroupId, pUpdated);
|
||||
}
|
||||
if (pInfo->twAggSup.winMap) {
|
||||
taosHashRemove(pInfo->twAggSup.winMap, &win.skey, sizeof(TSKEY));
|
||||
}
|
||||
}
|
||||
|
||||
int32_t forwardStep = 0;
|
||||
|
@ -824,10 +828,14 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
|||
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM &&
|
||||
(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
|
||||
pInfo->twAggSup.calTrigger == 0) ) {
|
||||
saveResult(pResult, tableGroupId, pUpdated);
|
||||
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
|
||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
|
||||
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE_SMA) {
|
||||
saveResult(pResult, tableGroupId, pUpdated);
|
||||
}
|
||||
if (pInfo->twAggSup.winMap) {
|
||||
taosHashRemove(pInfo->twAggSup.winMap, &win.skey, sizeof(TSKEY));
|
||||
}
|
||||
}
|
||||
|
||||
ekey = ascScan? nextWin.ekey:nextWin.skey;
|
||||
|
@ -1172,15 +1180,23 @@ static int32_t closeIntervalWindow(SHashObj *pHashMap, STimeWindowAggSupp *pSup,
|
|||
void* key = taosHashGetKey(pIte, &keyLen);
|
||||
uint64_t groupId = *(uint64_t*) key;
|
||||
ASSERT(keyLen == GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY)));
|
||||
TSKEY ts = *(uint64_t*) ((char*)key + sizeof(uint64_t));
|
||||
TSKEY ts = *(int64_t*) ((char*)key + sizeof(uint64_t));
|
||||
SResultRowInfo dumyInfo;
|
||||
dumyInfo.cur.pageId = -1;
|
||||
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, ts, pInterval,
|
||||
pInterval->precision, NULL);
|
||||
if (win.ekey < pSup->maxTs - pSup->waterMark) {
|
||||
if (pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE_SMA) {
|
||||
if (taosHashGet(pSup->winMap, &win.skey, sizeof(TSKEY))) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
char keyBuf[GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))];
|
||||
SET_RES_WINDOW_KEY(keyBuf, &ts, sizeof(TSKEY), groupId);
|
||||
taosHashRemove(pHashMap, keyBuf, keyLen);
|
||||
if (pSup->calTrigger != STREAM_TRIGGER_AT_ONCE_SMA &&
|
||||
pSup->calTrigger != STREAM_TRIGGER_WINDOW_CLOSE_SMA) {
|
||||
taosHashRemove(pHashMap, keyBuf, keyLen);
|
||||
}
|
||||
SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
|
||||
if (pos == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -1192,6 +1208,7 @@ static int32_t closeIntervalWindow(SHashObj *pHashMap, STimeWindowAggSupp *pSup,
|
|||
taosMemoryFree(pos);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
taosHashPut(pSup->winMap, &win.skey, sizeof(TSKEY), NULL, 0);
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -1248,7 +1265,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
&pInfo->interval, pClosed);
|
||||
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pClosed,
|
||||
pInfo->binfo.rowCellInfoOffset);
|
||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER__WINDOW_CLOSE) {
|
||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE ||
|
||||
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE_SMA) {
|
||||
taosArrayAddAll(pUpdated, pClosed);
|
||||
}
|
||||
taosArrayDestroy(pClosed);
|
||||
|
@ -2412,7 +2430,7 @@ int32_t closeSessionWindow(SArray *pWins, STimeWindowAggSupp *pTwSup, SArray *pC
|
|||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pSeWin->isClosed = true;
|
||||
if (calTrigger == STREAM_TRIGGER__WINDOW_CLOSE) {
|
||||
if (calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
||||
pSeWin->isOutput = true;
|
||||
}
|
||||
}
|
||||
|
@ -2486,7 +2504,7 @@ static SSDataBlock* doStreamSessionWindowAgg(SOperatorInfo* pOperator) {
|
|||
SArray* pUpdated = taosArrayInit(16, POINTER_BYTES);
|
||||
copyUpdateResult(pStUpdated, pUpdated, pBInfo->pRes->info.groupId);
|
||||
taosHashCleanup(pStUpdated);
|
||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER__WINDOW_CLOSE) {
|
||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
||||
taosArrayAddAll(pUpdated, pClosed);
|
||||
}
|
||||
|
||||
|
|
|
@ -329,6 +329,10 @@ static SNode* logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
|
|||
COPY_SCALAR_FIELD(intervalUnit);
|
||||
COPY_SCALAR_FIELD(slidingUnit);
|
||||
CLONE_NODE_FIELD(pTagCond);
|
||||
COPY_SCALAR_FIELD(triggerType);
|
||||
COPY_SCALAR_FIELD(watermark);
|
||||
COPY_SCALAR_FIELD(tsColId);
|
||||
COPY_SCALAR_FIELD(filesFactor);
|
||||
return (SNode*)pDst;
|
||||
}
|
||||
|
||||
|
@ -385,6 +389,7 @@ static SNode* logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* pD
|
|||
CLONE_NODE_FIELD(pStateExpr);
|
||||
COPY_SCALAR_FIELD(triggerType);
|
||||
COPY_SCALAR_FIELD(watermark);
|
||||
COPY_SCALAR_FIELD(filesFactor);
|
||||
return (SNode*)pDst;
|
||||
}
|
||||
|
||||
|
|
|
@ -1133,6 +1133,7 @@ static const char* jkTableScanPhysiPlanSlidingUnit = "slidingUnit";
|
|||
static const char* jkTableScanPhysiPlanTriggerType = "triggerType";
|
||||
static const char* jkTableScanPhysiPlanWatermark = "watermark";
|
||||
static const char* jkTableScanPhysiPlanTsColId = "tsColId";
|
||||
static const char* jkTableScanPhysiPlanFilesFactor = "FilesFactor";
|
||||
|
||||
static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj;
|
||||
|
@ -1183,6 +1184,9 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanTsColId, pNode->tsColId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddDoubleToObject(pJson, jkTableScanPhysiPlanFilesFactor, pNode->filesFactor);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -1242,7 +1246,9 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanTsColId, pNode->tsColId, code);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetDoubleValue(pJson, jkTableScanPhysiPlanFilesFactor, &pNode->filesFactor);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1496,6 +1502,7 @@ static const char* jkWindowPhysiPlanFuncs = "Funcs";
|
|||
static const char* jkWindowPhysiPlanTsPk = "TsPk";
|
||||
static const char* jkWindowPhysiPlanTriggerType = "TriggerType";
|
||||
static const char* jkWindowPhysiPlanWatermark = "Watermark";
|
||||
static const char* jkWindowPhysiPlanFilesFactor = "FilesFactor";
|
||||
|
||||
static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SWinodwPhysiNode* pNode = (const SWinodwPhysiNode*)pObj;
|
||||
|
@ -1516,6 +1523,9 @@ static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkWindowPhysiPlanWatermark, pNode->watermark);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddDoubleToObject(pJson, jkWindowPhysiPlanFilesFactor, pNode->filesFactor);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -1541,6 +1551,9 @@ static int32_t jsonToPhysiWindowNode(const SJson* pJson, void* pObj) {
|
|||
tjsonGetNumberValue(pJson, jkWindowPhysiPlanWatermark, pNode->watermark, code);
|
||||
;
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetDoubleValue(pJson, jkWindowPhysiPlanFilesFactor, &pNode->filesFactor);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -881,7 +881,7 @@ SNode* setTableOption(SAstCreateContext* pCxt, SNode* pOptions, ETableOptionType
|
|||
}
|
||||
break;
|
||||
case TABLE_OPTION_FILE_FACTOR:
|
||||
((STableOptions*)pOptions)->filesFactor = taosStr2Float(((SToken*)pVal)->z, NULL);
|
||||
((STableOptions*)pOptions)->filesFactor = taosStr2Double(((SToken*)pVal)->z, NULL);
|
||||
break;
|
||||
case TABLE_OPTION_ROLLUP:
|
||||
((STableOptions*)pOptions)->pRollupFuncs = pVal;
|
||||
|
|
|
@ -487,6 +487,10 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm
|
|||
pWindow->watermark = pCxt->pPlanCxt->watermark;
|
||||
}
|
||||
|
||||
if (pCxt->pPlanCxt->rSmaQuery) {
|
||||
pWindow->filesFactor = pCxt->pPlanCxt->filesFactor;
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = rewriteExprForSelect(pWindow->pFuncs, pSelect, SQL_CLAUSE_WINDOW);
|
||||
}
|
||||
|
|
|
@ -99,7 +99,8 @@ static bool osdMayBeOptimized(SLogicNode* pNode) {
|
|||
return false;
|
||||
}
|
||||
// todo: release after function splitting
|
||||
if (TSDB_SUPER_TABLE == ((SScanLogicNode*)pNode)->pMeta->tableType) {
|
||||
if (TSDB_SUPER_TABLE == ((SScanLogicNode*)pNode)->pMeta->tableType &&
|
||||
SCAN_TYPE_STREAM != ((SScanLogicNode*)pNode)->scanType) {
|
||||
return false;
|
||||
}
|
||||
if (NULL == pNode->pParent || (QUERY_NODE_LOGIC_PLAN_WINDOW != nodeType(pNode->pParent) &&
|
||||
|
@ -226,6 +227,7 @@ static void setScanWindowInfo(SScanLogicNode* pScan) {
|
|||
pScan->triggerType = ((SWindowLogicNode*)pScan->node.pParent)->triggerType;
|
||||
pScan->watermark = ((SWindowLogicNode*)pScan->node.pParent)->watermark;
|
||||
pScan->tsColId = ((SColumnNode*)((SWindowLogicNode*)pScan->node.pParent)->pTspk)->colId;
|
||||
pScan->filesFactor = ((SWindowLogicNode*)pScan->node.pParent)->filesFactor;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -506,6 +506,7 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
|
|||
pTableScan->triggerType = pScanLogicNode->triggerType;
|
||||
pTableScan->watermark = pScanLogicNode->watermark;
|
||||
pTableScan->tsColId = pScanLogicNode->tsColId;
|
||||
pTableScan->filesFactor = pScanLogicNode->filesFactor;
|
||||
|
||||
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode);
|
||||
}
|
||||
|
@ -917,6 +918,7 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList*
|
|||
|
||||
pWindow->triggerType = pWindowLogicNode->triggerType;
|
||||
pWindow->watermark = pWindowLogicNode->watermark;
|
||||
pWindow->filesFactor = pWindowLogicNode->filesFactor;
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pPhyNode = (SPhysiNode*)pWindow;
|
||||
|
|
|
@ -73,8 +73,8 @@ static int64_t adjustInterval(int64_t interval, int32_t precision) {
|
|||
}
|
||||
|
||||
static int64_t adjustWatermark(int64_t adjInterval, int64_t originInt, int64_t watermark) {
|
||||
if (watermark <= 0) {
|
||||
watermark = TMIN(originInt/adjInterval, 1) * adjInterval;
|
||||
if (watermark <= adjInterval) {
|
||||
watermark = TMAX(originInt/adjInterval, 1) * adjInterval;
|
||||
} else if (watermark > MAX_NUM_SCALABLE_BF * adjInterval) {
|
||||
watermark = MAX_NUM_SCALABLE_BF * adjInterval;
|
||||
}/* else if (watermark < MIN_NUM_SCALABLE_BF * adjInterval) {
|
||||
|
|
|
@ -5,7 +5,7 @@ sleep 50
|
|||
sql connect
|
||||
|
||||
print =============== create database
|
||||
sql create database d1
|
||||
sql create database d1 vgroups 1
|
||||
sql use d1
|
||||
|
||||
print =============== create super table, include column type for count/sum/min/max/first
|
||||
|
|
|
@ -23,7 +23,7 @@ sql insert into t1 values(1648791223001,10,2,3,1.1,2);
|
|||
sql insert into t1 values(1648791233002,3,2,3,2.1,3);
|
||||
sql insert into t1 values(1648791243003,NULL,NULL,NULL,NULL,4);
|
||||
sql insert into t1 values(1648791213002,NULL,NULL,NULL,NULL,5) (1648791233012,NULL,NULL,NULL,NULL,6);
|
||||
|
||||
sleep 300
|
||||
sql select * from streamt order by s desc;
|
||||
|
||||
# row 0
|
||||
|
@ -115,7 +115,7 @@ sql insert into t1 values(1648791233002,3,2,3,2.1,9);
|
|||
sql insert into t1 values(1648791243003,4,2,3,3.1,10);
|
||||
sql insert into t1 values(1648791213002,4,2,3,4.1,11) ;
|
||||
sql insert into t1 values(1648791213002,4,2,3,4.1,12) (1648791223009,4,2,3,4.1,13);
|
||||
|
||||
sleep 300
|
||||
sql select * from streamt order by s desc ;
|
||||
|
||||
# row 0
|
||||
|
|
|
@ -22,7 +22,7 @@ sql insert into t1 values(1648791210000,1,1,1,1.1,1);
|
|||
sql insert into t1 values(1648791220000,2,2,2,2.1,2);
|
||||
sql insert into t1 values(1648791230000,3,3,3,3.1,3);
|
||||
sql insert into t1 values(1648791240000,4,4,4,4.1,4);
|
||||
|
||||
sleep 300
|
||||
sql select * from streamt order by s desc;
|
||||
|
||||
# row 0
|
||||
|
@ -50,7 +50,7 @@ sql insert into t1 values(1648791250005,5,5,5,5.1,5);
|
|||
sql insert into t1 values(1648791260006,6,6,6,6.1,6);
|
||||
sql insert into t1 values(1648791270007,7,7,7,7.1,7);
|
||||
sql insert into t1 values(1648791240005,5,5,5,5.1,8) (1648791250006,6,6,6,6.1,9);
|
||||
|
||||
sleep 300
|
||||
sql select * from streamt order by s desc;
|
||||
|
||||
# row 0
|
||||
|
@ -100,7 +100,7 @@ sql insert into t1 values(1648791260007,7,7,7,7.1,12) (1648791290008,7,7,7,7.1,1
|
|||
sql insert into t1 values(1648791500000,7,7,7,7.1,15) (1648791520000,8,8,8,8.1,16) (1648791540000,8,8,8,8.1,17);
|
||||
sql insert into t1 values(1648791530000,8,8,8,8.1,18);
|
||||
sql insert into t1 values(1648791220000,10,10,10,10.1,19) (1648791290008,2,2,2,2.1,20) (1648791540000,17,17,17,17.1,21) (1648791500001,22,22,22,22.1,22);
|
||||
|
||||
sleep 300
|
||||
sql select * from streamt order by s desc;
|
||||
|
||||
# row 0
|
||||
|
|
|
@ -94,92 +94,4 @@ if $data11 != 5 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
sql create table t2(ts timestamp, a int, b int , c int, d double);
|
||||
sql create stream streams2 trigger window_close watermark 20s into streamt2 as select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from t2 interval(10s);
|
||||
sql insert into t2 values(1648791213000,1,2,3,1.0);
|
||||
sql insert into t2 values(1648791239999,1,2,3,1.0);
|
||||
sleep 300
|
||||
sql select * from streamt2;
|
||||
if $rows != 0 then
|
||||
print ======$rows
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql insert into t2 values(1648791240000,1,2,3,1.0);
|
||||
sleep 300
|
||||
sql select * from streamt2;
|
||||
if $rows != 1 then
|
||||
print ======$rows
|
||||
return -1
|
||||
endi
|
||||
if $data01 != 1 then
|
||||
print ======$data01
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql insert into t2 values(1648791250001,1,2,3,1.0) (1648791250002,1,2,3,1.0) (1648791250003,1,2,3,1.0) (1648791240000,1,2,3,1.0);
|
||||
sleep 300
|
||||
sql select * from streamt2;
|
||||
if $rows != 1 then
|
||||
print ======$rows
|
||||
return -1
|
||||
endi
|
||||
if $data01 != 1 then
|
||||
print ======$data01
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql insert into t2 values(1648791280000,1,2,3,1.0);
|
||||
sleep 300
|
||||
sql select * from streamt2;
|
||||
if $rows != 4 then
|
||||
print ======$rows
|
||||
return -1
|
||||
endi
|
||||
if $data01 != 1 then
|
||||
print ======$data01
|
||||
return -1
|
||||
endi
|
||||
if $data11 != 1 then
|
||||
print ======$data11
|
||||
return -1
|
||||
endi
|
||||
if $data21 != 1 then
|
||||
print ======$data21
|
||||
return -1
|
||||
endi
|
||||
if $data31 != 3 then
|
||||
print ======$data31
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql insert into t2 values(1648791250001,1,2,3,1.0) (1648791250002,1,2,3,1.0) (1648791250003,1,2,3,1.0) (1648791280000,1,2,3,1.0) (1648791280001,1,2,3,1.0) (1648791280002,1,2,3,1.0) (1648791310000,1,2,3,1.0) (1648791280001,1,2,3,1.0);
|
||||
sleep 300
|
||||
sql select * from streamt2;
|
||||
|
||||
if $rows != 5 then
|
||||
print ======$rows
|
||||
return -1
|
||||
endi
|
||||
if $data01 != 1 then
|
||||
print ======$data01
|
||||
return -1
|
||||
endi
|
||||
if $data11 != 1 then
|
||||
print ======$data11
|
||||
return -1
|
||||
endi
|
||||
if $data21 != 1 then
|
||||
print ======$data21
|
||||
return -1
|
||||
endi
|
||||
if $data31 != 3 then
|
||||
print ======$data31
|
||||
return -1
|
||||
endi
|
||||
if $data41 != 3 then
|
||||
print ======$data31
|
||||
return -1
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
Loading…
Reference in New Issue