feat(stream): stream state window support top and bottom
This commit is contained in:
parent
dbaa4a5815
commit
016a7b698f
|
@ -146,6 +146,7 @@ typedef struct SqlFunctionCtx {
|
||||||
struct SDiskbasedBuf *pBuf;
|
struct SDiskbasedBuf *pBuf;
|
||||||
struct SSDataBlock *pSrcBlock;
|
struct SSDataBlock *pSrcBlock;
|
||||||
int32_t curBufPage;
|
int32_t curBufPage;
|
||||||
|
bool increase;
|
||||||
|
|
||||||
char udfName[TSDB_FUNC_NAME_LEN];
|
char udfName[TSDB_FUNC_NAME_LEN];
|
||||||
} SqlFunctionCtx;
|
} SqlFunctionCtx;
|
||||||
|
|
|
@ -804,7 +804,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
|
||||||
|
|
||||||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||||
STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
|
STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo, bool isStream);
|
||||||
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
||||||
SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild);
|
SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild);
|
||||||
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
|
@ -892,8 +892,8 @@ STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowI
|
||||||
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
|
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
|
||||||
__block_search_fn_t searchFn, STableQueryInfo* item, int32_t order);
|
__block_search_fn_t searchFn, STableQueryInfo* item, int32_t order);
|
||||||
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
|
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
|
||||||
int32_t initSessionAggSupporter(SStreamAggSupporter* pSup, const char* pKey);
|
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey,
|
||||||
int32_t initStateAggSupporter(SStreamAggSupporter* pSup, const char* pKey);
|
SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t size);
|
||||||
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize);
|
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize);
|
||||||
SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap, int32_t* pIndex);
|
SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap, int32_t* pIndex);
|
||||||
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t rows,
|
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t rows,
|
||||||
|
|
|
@ -1130,6 +1130,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
|
||||||
pCtx->start.key = INT64_MIN;
|
pCtx->start.key = INT64_MIN;
|
||||||
pCtx->end.key = INT64_MIN;
|
pCtx->end.key = INT64_MIN;
|
||||||
pCtx->numOfParams = pExpr->base.numOfParams;
|
pCtx->numOfParams = pExpr->base.numOfParams;
|
||||||
|
pCtx->increase = false;
|
||||||
|
|
||||||
pCtx->param = pFunct->pParam;
|
pCtx->param = pFunct->pParam;
|
||||||
// for (int32_t j = 0; j < pCtx->numOfParams; ++j) {
|
// for (int32_t j = 0; j < pCtx->numOfParams; ++j) {
|
||||||
|
@ -2008,11 +2009,19 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI
|
||||||
// the _wstartts needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
|
// the _wstartts needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
|
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||||
char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
|
char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
|
||||||
|
if (pCtx[j].increase) {
|
||||||
|
int64_t ts = *(int64_t*) in;
|
||||||
|
for (int32_t k = 0; k < pRow->numOfRows; ++k) {
|
||||||
|
colDataAppend(pColInfoData, pBlock->info.rows + k, (const char *)&ts, pCtx[j].resultInfo->isNullRes);
|
||||||
|
ts++;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
for (int32_t k = 0; k < pRow->numOfRows; ++k) {
|
for (int32_t k = 0; k < pRow->numOfRows; ++k) {
|
||||||
colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
|
colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
releaseBufPage(pBuf, page);
|
releaseBufPage(pBuf, page);
|
||||||
pBlock->info.rows += pRow->numOfRows;
|
pBlock->info.rows += pRow->numOfRows;
|
||||||
|
@ -4676,7 +4685,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
|
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
|
||||||
pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo);
|
bool isStream = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type);
|
||||||
|
pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo, isStream);
|
||||||
|
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
|
||||||
int32_t children = 8;
|
int32_t children = 8;
|
||||||
|
@ -5339,7 +5349,8 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, size_t size) {
|
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t size) {
|
||||||
|
pSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
|
||||||
pSup->keySize = sizeof(int64_t) + sizeof(TSKEY);
|
pSup->keySize = sizeof(int64_t) + sizeof(TSKEY);
|
||||||
pSup->pKeyBuf = taosMemoryCalloc(1, pSup->keySize);
|
pSup->pKeyBuf = taosMemoryCalloc(1, pSup->keySize);
|
||||||
pSup->pResultRows = taosArrayInit(1024, size);
|
pSup->pResultRows = taosArrayInit(1024, size);
|
||||||
|
@ -5358,15 +5369,11 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, size
|
||||||
if (bufSize <= pageSize) {
|
if (bufSize <= pageSize) {
|
||||||
bufSize = pageSize * 4;
|
bufSize = pageSize * 4;
|
||||||
}
|
}
|
||||||
return createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, pKey, TD_TMP_DIR_PATH);
|
int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, pKey, TD_TMP_DIR_PATH);
|
||||||
}
|
for(int32_t i = 0; i < numOfOutput; ++i) {
|
||||||
|
pCtx[i].pBuf = pSup->pResultBuf;
|
||||||
int32_t initSessionAggSupporter(SStreamAggSupporter* pSup, const char* pKey) {
|
}
|
||||||
return initStreamAggSupporter(pSup, pKey, sizeof(SResultWindowInfo));
|
return code;
|
||||||
}
|
|
||||||
|
|
||||||
int32_t initStateAggSupporter(SStreamAggSupporter* pSup, const char* pKey) {
|
|
||||||
return initStreamAggSupporter(pSup, pKey, sizeof(SStateWindowInfo));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t getSmaWaterMark(int64_t interval, double filesFactor) {
|
int64_t getSmaWaterMark(int64_t interval, double filesFactor) {
|
||||||
|
|
|
@ -1438,9 +1438,15 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt
|
||||||
return needed;
|
return needed;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void increaseTs(SqlFunctionCtx* pCtx) {
|
||||||
|
if (pCtx[0].pExpr->pExpr->_function.pFunctNode->funcType == FUNCTION_TYPE_WSTARTTS) {
|
||||||
|
pCtx[0].increase = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||||
STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) {
|
STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo, bool isStream) {
|
||||||
SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
|
SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
if (pInfo == NULL || pOperator == NULL) {
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
|
@ -1461,6 +1467,11 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
||||||
int32_t code =
|
int32_t code =
|
||||||
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
|
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
|
||||||
|
|
||||||
|
if (isStream) {
|
||||||
|
ASSERT(numOfCols > 0);
|
||||||
|
increaseTs(pInfo->binfo.pCtx);
|
||||||
|
}
|
||||||
|
|
||||||
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
|
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
|
||||||
|
|
||||||
pInfo->invertible = allInvertible(pInfo->binfo.pCtx, numOfCols);
|
pInfo->invertible = allInvertible(pInfo->binfo.pCtx, numOfCols);
|
||||||
|
@ -2128,6 +2139,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||||
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols,
|
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols,
|
||||||
pResBlock, keyBufSize, pTaskInfo->id.str);
|
pResBlock, keyBufSize, pTaskInfo->id.str);
|
||||||
|
ASSERT(numOfCols > 0);
|
||||||
|
increaseTs(pInfo->binfo.pCtx);
|
||||||
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
|
@ -2212,6 +2225,8 @@ int32_t initBiasicInfo(SOptrBasicInfo* pBasicInfo, SExprInfo* pExprInfo,
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
pBasicInfo->pCtx[i].pBuf = NULL;
|
pBasicInfo->pCtx[i].pBuf = NULL;
|
||||||
}
|
}
|
||||||
|
ASSERT(numOfCols > 0);
|
||||||
|
increaseTs(pBasicInfo->pCtx);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2228,6 +2243,10 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, int
|
||||||
pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, waterMark);
|
pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, waterMark);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t initSessionAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
||||||
|
return initStreamAggSupporter(pSup, pKey, pCtx, numOfOutput, sizeof(SResultWindowInfo));
|
||||||
|
}
|
||||||
|
|
||||||
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId,
|
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId,
|
||||||
STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) {
|
STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) {
|
||||||
|
@ -2244,8 +2263,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SEx
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
pInfo->streamAggSup.resultRowSize = getResultRowSize(pInfo->binfo.pCtx, numOfCols);
|
|
||||||
code = initSessionAggSupporter(&pInfo->streamAggSup, "StreamSessionAggOperatorInfo");
|
code = initSessionAggSupporter(&pInfo->streamAggSup, "StreamSessionAggOperatorInfo", pInfo->binfo.pCtx, numOfCols);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
@ -3097,6 +3116,10 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
||||||
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
|
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t initStateAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
||||||
|
return initStreamAggSupporter(pSup, pKey, pCtx, numOfOutput, sizeof(SStateWindowInfo));
|
||||||
|
}
|
||||||
|
|
||||||
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
||||||
SExecTaskInfo* pTaskInfo) {
|
SExecTaskInfo* pTaskInfo) {
|
||||||
SStreamStateWinodwPhysiNode* pStateNode = (SStreamStateWinodwPhysiNode*)pPhyNode;
|
SStreamStateWinodwPhysiNode* pStateNode = (SStreamStateWinodwPhysiNode*)pPhyNode;
|
||||||
|
@ -3130,8 +3153,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->streamAggSup.resultRowSize = getResultRowSize(pInfo->binfo.pCtx, numOfCols);
|
code = initStateAggSupporter(&pInfo->streamAggSup, "StreamStateAggOperatorInfo", pInfo->binfo.pCtx, numOfCols);
|
||||||
code = initStateAggSupporter(&pInfo->streamAggSup, "StreamStateAggOperatorInfo");
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
|
@ -102,6 +102,8 @@ bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
|
||||||
int32_t topFunction(SqlFunctionCtx *pCtx);
|
int32_t topFunction(SqlFunctionCtx *pCtx);
|
||||||
int32_t bottomFunction(SqlFunctionCtx *pCtx);
|
int32_t bottomFunction(SqlFunctionCtx *pCtx);
|
||||||
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
int32_t topCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||||
|
int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||||
|
|
||||||
bool getSpreadFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getSpreadFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
bool spreadFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
bool spreadFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
|
|
|
@ -1364,6 +1364,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.initFunc = functionSetup,
|
.initFunc = functionSetup,
|
||||||
.processFunc = topFunction,
|
.processFunc = topFunction,
|
||||||
.finalizeFunc = topBotFinalize,
|
.finalizeFunc = topBotFinalize,
|
||||||
|
.combineFunc = topCombine,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
.name = "bottom",
|
.name = "bottom",
|
||||||
|
@ -1373,7 +1374,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.getEnvFunc = getTopBotFuncEnv,
|
.getEnvFunc = getTopBotFuncEnv,
|
||||||
.initFunc = functionSetup,
|
.initFunc = functionSetup,
|
||||||
.processFunc = bottomFunction,
|
.processFunc = bottomFunction,
|
||||||
.finalizeFunc = topBotFinalize
|
.finalizeFunc = topBotFinalize,
|
||||||
|
.combineFunc = bottomCombine,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
.name = "spread",
|
.name = "spread",
|
||||||
|
|
|
@ -1389,6 +1389,18 @@ void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuple
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void releaseSource(STuplePos* pPos) {
|
||||||
|
if (pPos->pageId == -1) {
|
||||||
|
return ;
|
||||||
|
}
|
||||||
|
// Todo(liuyao) relase row
|
||||||
|
}
|
||||||
|
|
||||||
|
void replaceTupleData(STuplePos* pDestPos, STuplePos* pSourcePos) {
|
||||||
|
releaseSource(pDestPos);
|
||||||
|
*pDestPos = *pSourcePos;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t minMaxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int32_t isMinFunc) {
|
int32_t minMaxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int32_t isMinFunc) {
|
||||||
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
||||||
SMinmaxResInfo* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
|
SMinmaxResInfo* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
|
||||||
|
@ -1400,10 +1412,12 @@ int32_t minMaxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3
|
||||||
if (pSBuf->assign &&
|
if (pSBuf->assign &&
|
||||||
( (((*(double*)&pDBuf->v) < (*(double*)&pSBuf->v)) ^ isMinFunc) || !pDBuf->assign ) ) {
|
( (((*(double*)&pDBuf->v) < (*(double*)&pSBuf->v)) ^ isMinFunc) || !pDBuf->assign ) ) {
|
||||||
*(double*) &pDBuf->v = *(double*) &pSBuf->v;
|
*(double*) &pDBuf->v = *(double*) &pSBuf->v;
|
||||||
|
replaceTupleData(&pDBuf->tuplePos, &pSBuf->tuplePos);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if ( pSBuf->assign && ( ((pDBuf->v < pSBuf->v) ^ isMinFunc) || !pDBuf->assign ) ) {
|
if ( pSBuf->assign && ( ((pDBuf->v < pSBuf->v) ^ isMinFunc) || !pDBuf->assign ) ) {
|
||||||
pDBuf->v = pSBuf->v;
|
pDBuf->v = pSBuf->v;
|
||||||
|
replaceTupleData(&pDBuf->tuplePos, &pSBuf->tuplePos);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
|
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
|
||||||
|
@ -2856,7 +2870,6 @@ void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
|
||||||
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
||||||
STopBotRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
|
STopBotRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
|
||||||
pEntryInfo->complete = true;
|
|
||||||
|
|
||||||
int32_t type = pCtx->input.pData[0]->info.type;
|
int32_t type = pCtx->input.pData[0]->info.type;
|
||||||
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||||
|
@ -2881,6 +2894,67 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
return pEntryInfo->numOfRes;
|
return pEntryInfo->numOfRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type,
|
||||||
|
bool isTopQuery) {
|
||||||
|
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
||||||
|
STopBotRes* pRes = getTopBotOutputInfo(pCtx);
|
||||||
|
int32_t maxSize = pCtx->param[1].param.i;
|
||||||
|
STopBotResItem* pItems = pRes->pItems;
|
||||||
|
assert(pItems != NULL);
|
||||||
|
|
||||||
|
// not full yet
|
||||||
|
if (pEntryInfo->numOfRes < maxSize) {
|
||||||
|
STopBotResItem* pItem = &pItems[pEntryInfo->numOfRes];
|
||||||
|
pItem->v = pSourceItem->v;
|
||||||
|
pItem->uid = pSourceItem->uid;
|
||||||
|
pItem->tuplePos.pageId = -1;
|
||||||
|
replaceTupleData(&pItem->tuplePos, &pSourceItem->tuplePos);
|
||||||
|
pEntryInfo->numOfRes++;
|
||||||
|
taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn,
|
||||||
|
!isTopQuery);
|
||||||
|
} else { // replace the minimum value in the result
|
||||||
|
if ((isTopQuery && (
|
||||||
|
(IS_SIGNED_NUMERIC_TYPE(type) && pSourceItem->v.i > pItems[0].v.i) ||
|
||||||
|
(IS_UNSIGNED_NUMERIC_TYPE(type) && pSourceItem->v.u > pItems[0].v.u) ||
|
||||||
|
(IS_FLOAT_TYPE(type) && pSourceItem->v.d > pItems[0].v.d)))
|
||||||
|
|| (!isTopQuery && (
|
||||||
|
(IS_SIGNED_NUMERIC_TYPE(type) && pSourceItem->v.i < pItems[0].v.i) ||
|
||||||
|
(IS_UNSIGNED_NUMERIC_TYPE(type) && pSourceItem->v.u < pItems[0].v.u) ||
|
||||||
|
(IS_FLOAT_TYPE(type) && pSourceItem->v.d < pItems[0].v.d))
|
||||||
|
)) {
|
||||||
|
// replace the old data and the coresponding tuple data
|
||||||
|
STopBotResItem* pItem = &pItems[0];
|
||||||
|
pItem->v = pSourceItem->v;
|
||||||
|
pItem->uid = pSourceItem->uid;
|
||||||
|
|
||||||
|
// save the data of this tuple by over writing the old data
|
||||||
|
replaceTupleData(&pItem->tuplePos, &pSourceItem->tuplePos);
|
||||||
|
taosheapadjust((void*)pItems, sizeof(STopBotResItem), 0, pEntryInfo->numOfRes - 1, (const void*)&type,
|
||||||
|
topBotResComparFn, NULL, !isTopQuery);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t topCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||||
|
int32_t type = pDestCtx->input.pData[0]->info.type;
|
||||||
|
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
||||||
|
STopBotRes* pSBuf = getTopBotOutputInfo(pSourceCtx);
|
||||||
|
for (int32_t i = 0; i < pSResInfo->numOfRes; i++) {
|
||||||
|
addResult(pDestCtx, pSBuf->pItems + i, type, true);
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||||
|
int32_t type = pDestCtx->input.pData[0]->info.type;
|
||||||
|
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
||||||
|
STopBotRes* pSBuf = getTopBotOutputInfo(pSourceCtx);
|
||||||
|
for (int32_t i = 0; i < pSResInfo->numOfRes; i++) {
|
||||||
|
addResult(pDestCtx, pSBuf->pItems + i, type, false);
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
bool getSpreadFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
bool getSpreadFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||||
pEnv->calcMemSize = sizeof(SSpreadInfo);
|
pEnv->calcMemSize = sizeof(SSpreadInfo);
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -17,7 +17,7 @@ sql use test
|
||||||
|
|
||||||
|
|
||||||
sql create table t1(ts timestamp, a int, b int , c int, d double,id int);
|
sql create table t1(ts timestamp, a int, b int , c int, d double,id int);
|
||||||
sql create stream streams2 trigger at_once into streamt as select _wstartts, count(*) c1, sum(a), max(a), min(d), stddev(a), last(a), first(d), max(id) s from t1 session(ts,10s);
|
sql create stream streams1 trigger at_once into streamt as select _wstartts, count(*) c1, sum(a), max(a), min(d), stddev(a), last(a), first(d), max(id) s from t1 session(ts,10s);
|
||||||
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL,1);
|
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL,1);
|
||||||
sql insert into t1 values(1648791223001,10,2,3,1.1,2);
|
sql insert into t1 values(1648791223001,10,2,3,1.1,2);
|
||||||
sql insert into t1 values(1648791233002,3,2,3,2.1,3);
|
sql insert into t1 values(1648791233002,3,2,3,2.1,3);
|
||||||
|
@ -176,4 +176,107 @@ if $data08 != 13 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
sql create database test2 vgroups 1;
|
||||||
|
sql use test2;
|
||||||
|
sql create table t2(ts timestamp, a int, b int , c int, d double, id int);
|
||||||
|
sql create stream streams2 trigger at_once watermark 1d into streamt2 as select _wstartts,apercentile(a,30) c1, apercentile(a,70), apercentile(a,20,"t-digest") c2, apercentile(a,60,"t-digest") c3, max(id) c4 from t2 session(ts,10s);
|
||||||
|
sql insert into t2 values(1648791213001,1,1,3,1.0,1);
|
||||||
|
sql insert into t2 values(1648791213002,2,2,6,3.4,2);
|
||||||
|
sql insert into t2 values(1648791213003,4,9,3,4.8,3);
|
||||||
|
sql insert into t2 values(1648791233003,3,4,3,2.1,4);
|
||||||
|
sql insert into t2 values(1648791233004,3,5,3,3.4,5);
|
||||||
|
sql insert into t2 values(1648791233005,3,6,3,7.6,6);
|
||||||
|
|
||||||
|
#
|
||||||
|
sql insert into t2 values(1648791223003,20,7,3,10.1,7);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
loop2:
|
||||||
|
sleep 300
|
||||||
|
sql select * from streamt2 where c4=7;
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $rows != 1 then
|
||||||
|
print ======$rows
|
||||||
|
goto loop2
|
||||||
|
endi
|
||||||
|
|
||||||
|
# row 0
|
||||||
|
if $data01 != 2.091607978 then
|
||||||
|
print =====data01=$data01
|
||||||
|
goto loop2
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data02 != 3.274823935 then
|
||||||
|
print =====data02=$data02
|
||||||
|
goto loop2
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data03 != 1.800000000 then
|
||||||
|
print ======$data03
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data04 != 3.350000000 then
|
||||||
|
print ======$data04
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql create database test3 vgroups 1;
|
||||||
|
sql use test3;
|
||||||
|
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||||
|
sql create stream streams3 trigger at_once watermark 1d into streamt3 as select _wstartts, min(b), a,c from t1 session(ts,10s);
|
||||||
|
sql create stream streams4 trigger at_once watermark 1d into streamt4 as select _wstartts, max(b), a,c from t1 session(ts,10s);
|
||||||
|
sql create stream streams5 trigger at_once watermark 1d into streamt5 as select _wstartts, max(b), a,c from t1 session(ts,10s);
|
||||||
|
sql create stream streams6 trigger at_once watermark 1d into streamt6 as select _wstartts, max(b), a,c from t1 session(ts,10s);
|
||||||
|
sql insert into t1 values(1648791213001,1,1,1,1.0);
|
||||||
|
sql insert into t1 values(1648791213002,2,3,2,3.4);
|
||||||
|
sql insert into t1 values(1648791213003,4,9,3,4.8);
|
||||||
|
sql insert into t1 values(1648791213004,4,5,4,4.8);
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791233004,3,4,0,2.1);
|
||||||
|
sql insert into t1 values(1648791233005,3,0,6,3.4);
|
||||||
|
sql insert into t1 values(1648791233006,3,6,7,7.6);
|
||||||
|
sql insert into t1 values(1648791233007,3,13,8,7.6);
|
||||||
|
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791223004,20,7,9,10.1);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
loop3:
|
||||||
|
sleep 300
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select * from streamt3;
|
||||||
|
if $rows == 0 then
|
||||||
|
print ======$rows
|
||||||
|
goto loop3
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select * from streamt4;
|
||||||
|
if $rows == 0 then
|
||||||
|
print ======$rows
|
||||||
|
goto loop3
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select * from streamt5;
|
||||||
|
if $rows == 0 then
|
||||||
|
print ======$rows
|
||||||
|
goto loop3
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select * from streamt6;
|
||||||
|
if $rows == 0 then
|
||||||
|
print ======$rows
|
||||||
|
goto loop3
|
||||||
|
endi
|
||||||
|
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
|
|
Loading…
Reference in New Issue