Merge pull request #13655 from taosdata/feature/TD-16390
feat(stream): stream state window support top and bottom
This commit is contained in:
commit
7a5c99e9eb
|
@ -146,6 +146,7 @@ typedef struct SqlFunctionCtx {
|
|||
struct SDiskbasedBuf *pBuf;
|
||||
struct SSDataBlock *pSrcBlock;
|
||||
int32_t curBufPage;
|
||||
bool increase;
|
||||
|
||||
char udfName[TSDB_FUNC_NAME_LEN];
|
||||
} SqlFunctionCtx;
|
||||
|
|
|
@ -804,7 +804,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
|
|||
|
||||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||
STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
|
||||
STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo, bool isStream);
|
||||
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
||||
SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild);
|
||||
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
|
@ -892,8 +892,8 @@ STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowI
|
|||
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
|
||||
__block_search_fn_t searchFn, STableQueryInfo* item, int32_t order);
|
||||
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
|
||||
int32_t initSessionAggSupporter(SStreamAggSupporter* pSup, const char* pKey);
|
||||
int32_t initStateAggSupporter(SStreamAggSupporter* pSup, const char* pKey);
|
||||
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey,
|
||||
SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t size);
|
||||
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize);
|
||||
SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap, int32_t* pIndex);
|
||||
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t rows,
|
||||
|
|
|
@ -1130,6 +1130,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
|
|||
pCtx->start.key = INT64_MIN;
|
||||
pCtx->end.key = INT64_MIN;
|
||||
pCtx->numOfParams = pExpr->base.numOfParams;
|
||||
pCtx->increase = false;
|
||||
|
||||
pCtx->param = pFunct->pParam;
|
||||
// for (int32_t j = 0; j < pCtx->numOfParams; ++j) {
|
||||
|
@ -2008,8 +2009,16 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI
|
|||
// the _wstartts needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
|
||||
for (int32_t k = 0; k < pRow->numOfRows; ++k) {
|
||||
colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
|
||||
if (pCtx[j].increase) {
|
||||
int64_t ts = *(int64_t*) in;
|
||||
for (int32_t k = 0; k < pRow->numOfRows; ++k) {
|
||||
colDataAppend(pColInfoData, pBlock->info.rows + k, (const char *)&ts, pCtx[j].resultInfo->isNullRes);
|
||||
ts++;
|
||||
}
|
||||
} else {
|
||||
for (int32_t k = 0; k < pRow->numOfRows; ++k) {
|
||||
colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -4676,7 +4685,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
}
|
||||
|
||||
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
|
||||
pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo);
|
||||
bool isStream = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type);
|
||||
pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo, isStream);
|
||||
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
|
||||
int32_t children = 8;
|
||||
|
@ -5339,7 +5349,8 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, size_t size) {
|
||||
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t size) {
|
||||
pSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
|
||||
pSup->keySize = sizeof(int64_t) + sizeof(TSKEY);
|
||||
pSup->pKeyBuf = taosMemoryCalloc(1, pSup->keySize);
|
||||
pSup->pResultRows = taosArrayInit(1024, size);
|
||||
|
@ -5358,15 +5369,11 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, size
|
|||
if (bufSize <= pageSize) {
|
||||
bufSize = pageSize * 4;
|
||||
}
|
||||
return createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, pKey, TD_TMP_DIR_PATH);
|
||||
}
|
||||
|
||||
int32_t initSessionAggSupporter(SStreamAggSupporter* pSup, const char* pKey) {
|
||||
return initStreamAggSupporter(pSup, pKey, sizeof(SResultWindowInfo));
|
||||
}
|
||||
|
||||
int32_t initStateAggSupporter(SStreamAggSupporter* pSup, const char* pKey) {
|
||||
return initStreamAggSupporter(pSup, pKey, sizeof(SStateWindowInfo));
|
||||
int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, pKey, TD_TMP_DIR_PATH);
|
||||
for(int32_t i = 0; i < numOfOutput; ++i) {
|
||||
pCtx[i].pBuf = pSup->pResultBuf;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int64_t getSmaWaterMark(int64_t interval, double filesFactor) {
|
||||
|
|
|
@ -1438,9 +1438,15 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt
|
|||
return needed;
|
||||
}
|
||||
|
||||
void increaseTs(SqlFunctionCtx* pCtx) {
|
||||
if (pCtx[0].pExpr->pExpr->_function.pFunctNode->funcType == FUNCTION_TYPE_WSTARTTS) {
|
||||
pCtx[0].increase = true;
|
||||
}
|
||||
}
|
||||
|
||||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||
STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) {
|
||||
STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo, bool isStream) {
|
||||
SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
|
@ -1460,6 +1466,11 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
|
||||
int32_t code =
|
||||
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
|
||||
|
||||
if (isStream) {
|
||||
ASSERT(numOfCols > 0);
|
||||
increaseTs(pInfo->binfo.pCtx);
|
||||
}
|
||||
|
||||
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
|
||||
|
||||
|
@ -2128,6 +2139,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
|||
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols,
|
||||
pResBlock, keyBufSize, pTaskInfo->id.str);
|
||||
ASSERT(numOfCols > 0);
|
||||
increaseTs(pInfo->binfo.pCtx);
|
||||
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
|
@ -2212,6 +2225,8 @@ int32_t initBiasicInfo(SOptrBasicInfo* pBasicInfo, SExprInfo* pExprInfo,
|
|||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
pBasicInfo->pCtx[i].pBuf = NULL;
|
||||
}
|
||||
ASSERT(numOfCols > 0);
|
||||
increaseTs(pBasicInfo->pCtx);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -2228,6 +2243,10 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, int
|
|||
pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, waterMark);
|
||||
}
|
||||
|
||||
int32_t initSessionAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
||||
return initStreamAggSupporter(pSup, pKey, pCtx, numOfOutput, sizeof(SResultWindowInfo));
|
||||
}
|
||||
|
||||
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId,
|
||||
STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) {
|
||||
|
@ -2244,8 +2263,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SEx
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
pInfo->streamAggSup.resultRowSize = getResultRowSize(pInfo->binfo.pCtx, numOfCols);
|
||||
code = initSessionAggSupporter(&pInfo->streamAggSup, "StreamSessionAggOperatorInfo");
|
||||
|
||||
code = initSessionAggSupporter(&pInfo->streamAggSup, "StreamSessionAggOperatorInfo", pInfo->binfo.pCtx, numOfCols);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
@ -3097,6 +3116,10 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
|||
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
|
||||
}
|
||||
|
||||
int32_t initStateAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
||||
return initStreamAggSupporter(pSup, pKey, pCtx, numOfOutput, sizeof(SStateWindowInfo));
|
||||
}
|
||||
|
||||
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
SStreamStateWinodwPhysiNode* pStateNode = (SStreamStateWinodwPhysiNode*)pPhyNode;
|
||||
|
@ -3130,8 +3153,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->streamAggSup.resultRowSize = getResultRowSize(pInfo->binfo.pCtx, numOfCols);
|
||||
code = initStateAggSupporter(&pInfo->streamAggSup, "StreamStateAggOperatorInfo");
|
||||
code = initStateAggSupporter(&pInfo->streamAggSup, "StreamStateAggOperatorInfo", pInfo->binfo.pCtx, numOfCols);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
|
|
@ -102,6 +102,8 @@ bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
|
|||
int32_t topFunction(SqlFunctionCtx *pCtx);
|
||||
int32_t bottomFunction(SqlFunctionCtx *pCtx);
|
||||
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
int32_t topCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||
int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||
|
||||
bool getSpreadFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
bool spreadFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||
|
|
|
@ -1364,6 +1364,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.initFunc = functionSetup,
|
||||
.processFunc = topFunction,
|
||||
.finalizeFunc = topBotFinalize,
|
||||
.combineFunc = topCombine,
|
||||
},
|
||||
{
|
||||
.name = "bottom",
|
||||
|
@ -1373,7 +1374,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.getEnvFunc = getTopBotFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
.processFunc = bottomFunction,
|
||||
.finalizeFunc = topBotFinalize
|
||||
.finalizeFunc = topBotFinalize,
|
||||
.combineFunc = bottomCombine,
|
||||
},
|
||||
{
|
||||
.name = "spread",
|
||||
|
|
|
@ -1389,6 +1389,18 @@ void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuple
|
|||
}
|
||||
}
|
||||
|
||||
void releaseSource(STuplePos* pPos) {
|
||||
if (pPos->pageId == -1) {
|
||||
return ;
|
||||
}
|
||||
// Todo(liuyao) relase row
|
||||
}
|
||||
|
||||
void replaceTupleData(STuplePos* pDestPos, STuplePos* pSourcePos) {
|
||||
releaseSource(pDestPos);
|
||||
*pDestPos = *pSourcePos;
|
||||
}
|
||||
|
||||
int32_t minMaxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int32_t isMinFunc) {
|
||||
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
||||
SMinmaxResInfo* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
|
||||
|
@ -1400,10 +1412,12 @@ int32_t minMaxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3
|
|||
if (pSBuf->assign &&
|
||||
( (((*(double*)&pDBuf->v) < (*(double*)&pSBuf->v)) ^ isMinFunc) || !pDBuf->assign ) ) {
|
||||
*(double*) &pDBuf->v = *(double*) &pSBuf->v;
|
||||
replaceTupleData(&pDBuf->tuplePos, &pSBuf->tuplePos);
|
||||
}
|
||||
} else {
|
||||
if ( pSBuf->assign && ( ((pDBuf->v < pSBuf->v) ^ isMinFunc) || !pDBuf->assign ) ) {
|
||||
pDBuf->v = pSBuf->v;
|
||||
replaceTupleData(&pDBuf->tuplePos, &pSBuf->tuplePos);
|
||||
}
|
||||
}
|
||||
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
|
||||
|
@ -2856,7 +2870,6 @@ void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
|
|||
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
||||
STopBotRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
|
||||
pEntryInfo->complete = true;
|
||||
|
||||
int32_t type = pCtx->input.pData[0]->info.type;
|
||||
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||
|
@ -2881,6 +2894,67 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
return pEntryInfo->numOfRes;
|
||||
}
|
||||
|
||||
void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type,
|
||||
bool isTopQuery) {
|
||||
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
||||
STopBotRes* pRes = getTopBotOutputInfo(pCtx);
|
||||
int32_t maxSize = pCtx->param[1].param.i;
|
||||
STopBotResItem* pItems = pRes->pItems;
|
||||
assert(pItems != NULL);
|
||||
|
||||
// not full yet
|
||||
if (pEntryInfo->numOfRes < maxSize) {
|
||||
STopBotResItem* pItem = &pItems[pEntryInfo->numOfRes];
|
||||
pItem->v = pSourceItem->v;
|
||||
pItem->uid = pSourceItem->uid;
|
||||
pItem->tuplePos.pageId = -1;
|
||||
replaceTupleData(&pItem->tuplePos, &pSourceItem->tuplePos);
|
||||
pEntryInfo->numOfRes++;
|
||||
taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn,
|
||||
!isTopQuery);
|
||||
} else { // replace the minimum value in the result
|
||||
if ((isTopQuery && (
|
||||
(IS_SIGNED_NUMERIC_TYPE(type) && pSourceItem->v.i > pItems[0].v.i) ||
|
||||
(IS_UNSIGNED_NUMERIC_TYPE(type) && pSourceItem->v.u > pItems[0].v.u) ||
|
||||
(IS_FLOAT_TYPE(type) && pSourceItem->v.d > pItems[0].v.d)))
|
||||
|| (!isTopQuery && (
|
||||
(IS_SIGNED_NUMERIC_TYPE(type) && pSourceItem->v.i < pItems[0].v.i) ||
|
||||
(IS_UNSIGNED_NUMERIC_TYPE(type) && pSourceItem->v.u < pItems[0].v.u) ||
|
||||
(IS_FLOAT_TYPE(type) && pSourceItem->v.d < pItems[0].v.d))
|
||||
)) {
|
||||
// replace the old data and the coresponding tuple data
|
||||
STopBotResItem* pItem = &pItems[0];
|
||||
pItem->v = pSourceItem->v;
|
||||
pItem->uid = pSourceItem->uid;
|
||||
|
||||
// save the data of this tuple by over writing the old data
|
||||
replaceTupleData(&pItem->tuplePos, &pSourceItem->tuplePos);
|
||||
taosheapadjust((void*)pItems, sizeof(STopBotResItem), 0, pEntryInfo->numOfRes - 1, (const void*)&type,
|
||||
topBotResComparFn, NULL, !isTopQuery);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int32_t topCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||
int32_t type = pDestCtx->input.pData[0]->info.type;
|
||||
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
||||
STopBotRes* pSBuf = getTopBotOutputInfo(pSourceCtx);
|
||||
for (int32_t i = 0; i < pSResInfo->numOfRes; i++) {
|
||||
addResult(pDestCtx, pSBuf->pItems + i, type, true);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||
int32_t type = pDestCtx->input.pData[0]->info.type;
|
||||
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
||||
STopBotRes* pSBuf = getTopBotOutputInfo(pSourceCtx);
|
||||
for (int32_t i = 0; i < pSResInfo->numOfRes; i++) {
|
||||
addResult(pDestCtx, pSBuf->pItems + i, type, false);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
bool getSpreadFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||
pEnv->calcMemSize = sizeof(SSpreadInfo);
|
||||
return true;
|
||||
|
|
|
@ -17,7 +17,7 @@ sql use test
|
|||
|
||||
|
||||
sql create table t1(ts timestamp, a int, b int , c int, d double,id int);
|
||||
sql create stream streams2 trigger at_once into streamt as select _wstartts, count(*) c1, sum(a), max(a), min(d), stddev(a), last(a), first(d), max(id) s from t1 session(ts,10s);
|
||||
sql create stream streams1 trigger at_once into streamt as select _wstartts, count(*) c1, sum(a), max(a), min(d), stddev(a), last(a), first(d), max(id) s from t1 session(ts,10s);
|
||||
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL,1);
|
||||
sql insert into t1 values(1648791223001,10,2,3,1.1,2);
|
||||
sql insert into t1 values(1648791233002,3,2,3,2.1,3);
|
||||
|
@ -176,4 +176,107 @@ if $data08 != 13 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
sql create database test2 vgroups 1;
|
||||
sql use test2;
|
||||
sql create table t2(ts timestamp, a int, b int , c int, d double, id int);
|
||||
sql create stream streams2 trigger at_once watermark 1d into streamt2 as select _wstartts,apercentile(a,30) c1, apercentile(a,70), apercentile(a,20,"t-digest") c2, apercentile(a,60,"t-digest") c3, max(id) c4 from t2 session(ts,10s);
|
||||
sql insert into t2 values(1648791213001,1,1,3,1.0,1);
|
||||
sql insert into t2 values(1648791213002,2,2,6,3.4,2);
|
||||
sql insert into t2 values(1648791213003,4,9,3,4.8,3);
|
||||
sql insert into t2 values(1648791233003,3,4,3,2.1,4);
|
||||
sql insert into t2 values(1648791233004,3,5,3,3.4,5);
|
||||
sql insert into t2 values(1648791233005,3,6,3,7.6,6);
|
||||
|
||||
#
|
||||
sql insert into t2 values(1648791223003,20,7,3,10.1,7);
|
||||
|
||||
$loop_count = 0
|
||||
loop2:
|
||||
sleep 300
|
||||
sql select * from streamt2 where c4=7;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $rows != 1 then
|
||||
print ======$rows
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
# row 0
|
||||
if $data01 != 2.091607978 then
|
||||
print =====data01=$data01
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
if $data02 != 3.274823935 then
|
||||
print =====data02=$data02
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
if $data03 != 1.800000000 then
|
||||
print ======$data03
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data04 != 3.350000000 then
|
||||
print ======$data04
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql create database test3 vgroups 1;
|
||||
sql use test3;
|
||||
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||
sql create stream streams3 trigger at_once watermark 1d into streamt3 as select _wstartts, min(b), a,c from t1 session(ts,10s);
|
||||
sql create stream streams4 trigger at_once watermark 1d into streamt4 as select _wstartts, max(b), a,c from t1 session(ts,10s);
|
||||
sql create stream streams5 trigger at_once watermark 1d into streamt5 as select _wstartts, max(b), a,c from t1 session(ts,10s);
|
||||
sql create stream streams6 trigger at_once watermark 1d into streamt6 as select _wstartts, max(b), a,c from t1 session(ts,10s);
|
||||
sql insert into t1 values(1648791213001,1,1,1,1.0);
|
||||
sql insert into t1 values(1648791213002,2,3,2,3.4);
|
||||
sql insert into t1 values(1648791213003,4,9,3,4.8);
|
||||
sql insert into t1 values(1648791213004,4,5,4,4.8);
|
||||
|
||||
sql insert into t1 values(1648791233004,3,4,0,2.1);
|
||||
sql insert into t1 values(1648791233005,3,0,6,3.4);
|
||||
sql insert into t1 values(1648791233006,3,6,7,7.6);
|
||||
sql insert into t1 values(1648791233007,3,13,8,7.6);
|
||||
|
||||
|
||||
sql insert into t1 values(1648791223004,20,7,9,10.1);
|
||||
|
||||
$loop_count = 0
|
||||
loop3:
|
||||
sleep 300
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from streamt3;
|
||||
if $rows == 0 then
|
||||
print ======$rows
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
sql select * from streamt4;
|
||||
if $rows == 0 then
|
||||
print ======$rows
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
sql select * from streamt5;
|
||||
if $rows == 0 then
|
||||
print ======$rows
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
sql select * from streamt6;
|
||||
if $rows == 0 then
|
||||
print ======$rows
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
Loading…
Reference in New Issue