Merge pull request #13702 from taosdata/feature/TD-16425
feat(stream): combine function spread\elapsed\histogram\hll
This commit is contained in:
commit
2dd1c31f0b
|
@ -1494,10 +1494,8 @@ typedef struct {
|
|||
int32_t code;
|
||||
} STaskDropRsp;
|
||||
|
||||
#define STREAM_TRIGGER_AT_ONCE_SMA 0
|
||||
#define STREAM_TRIGGER_AT_ONCE 1
|
||||
#define STREAM_TRIGGER_WINDOW_CLOSE 2
|
||||
#define STREAM_TRIGGER_WINDOW_CLOSE_SMA 3
|
||||
|
||||
typedef struct {
|
||||
char name[TSDB_TABLE_FNAME_LEN];
|
||||
|
|
|
@ -395,13 +395,13 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
|
|||
req.pRSmaParam.xFilesFactor = pStb->xFilesFactor;
|
||||
req.pRSmaParam.delay = pStb->delay;
|
||||
if (pStb->ast1Len > 0) {
|
||||
if (mndConvertRSmaTask(pStb->pAst1, pStb->uid, 0, 0, &req.pRSmaParam.qmsg1, &req.pRSmaParam.qmsg1Len,
|
||||
if (mndConvertRSmaTask(pStb->pAst1, pStb->uid, STREAM_TRIGGER_AT_ONCE, 0, &req.pRSmaParam.qmsg1, &req.pRSmaParam.qmsg1Len,
|
||||
req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) {
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
if (pStb->ast2Len > 0) {
|
||||
if (mndConvertRSmaTask(pStb->pAst2, pStb->uid, 0, 0, &req.pRSmaParam.qmsg2, &req.pRSmaParam.qmsg2Len,
|
||||
if (mndConvertRSmaTask(pStb->pAst2, pStb->uid, STREAM_TRIGGER_AT_ONCE, 0, &req.pRSmaParam.qmsg2, &req.pRSmaParam.qmsg2Len,
|
||||
req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) {
|
||||
return NULL;
|
||||
}
|
||||
|
|
|
@ -458,7 +458,6 @@ typedef struct STimeWindowSupp {
|
|||
int64_t waterMark;
|
||||
TSKEY maxTs;
|
||||
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
|
||||
SHashObj *winMap;
|
||||
} STimeWindowAggSupp;
|
||||
|
||||
typedef struct SIntervalAggOperatorInfo {
|
||||
|
@ -908,8 +907,6 @@ SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap
|
|||
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t rows,
|
||||
int32_t start, int64_t gap, SHashObj* pStDeleted);
|
||||
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
|
||||
int64_t getSmaWaterMark(int64_t interval, double filesFactor);
|
||||
bool isSmaStream(int8_t triggerType);
|
||||
|
||||
int32_t compareTimeWindow(const void* p1, const void* p2, const void* param);
|
||||
int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
|
||||
|
|
|
@ -4722,18 +4722,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
.waterMark = pIntervalPhyNode->window.watermark,
|
||||
.calTrigger = pIntervalPhyNode->window.triggerType,
|
||||
.maxTs = INT64_MIN,
|
||||
.winMap = NULL,
|
||||
};
|
||||
if (isSmaStream(pIntervalPhyNode->window.triggerType)) {
|
||||
if (FLT_LESS(pIntervalPhyNode->window.filesFactor, 1.000000)) {
|
||||
as.calTrigger = STREAM_TRIGGER_AT_ONCE_SMA;
|
||||
} else {
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP);
|
||||
as.winMap = taosHashInit(64, hashFn, true, HASH_NO_LOCK);
|
||||
as.waterMark = getSmaWaterMark(interval.interval, pIntervalPhyNode->window.filesFactor);
|
||||
as.calTrigger = STREAM_TRIGGER_WINDOW_CLOSE_SMA;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
|
||||
bool isStream = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type);
|
||||
|
@ -5446,16 +5435,3 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlF
|
|||
return code;
|
||||
}
|
||||
|
||||
int64_t getSmaWaterMark(int64_t interval, double filesFactor) {
|
||||
int64_t waterMark = 0;
|
||||
ASSERT(FLT_GREATEREQUAL(filesFactor, 0.000000));
|
||||
waterMark = -1 * filesFactor;
|
||||
return waterMark;
|
||||
}
|
||||
|
||||
bool isSmaStream(int8_t triggerType) {
|
||||
if (triggerType == STREAM_TRIGGER_AT_ONCE || triggerType == STREAM_TRIGGER_WINDOW_CLOSE) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -1028,10 +1028,6 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
|
|||
goto _error;
|
||||
}
|
||||
|
||||
if (isSmaStream(pTableScanNode->triggerType)) {
|
||||
pTwSup->waterMark = getSmaWaterMark(pSTInfo->interval.interval, pTableScanNode->filesFactor);
|
||||
}
|
||||
|
||||
if (pSTInfo->interval.interval > 0 && pDataReader) {
|
||||
pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, pTwSup->waterMark);
|
||||
} else {
|
||||
|
|
|
@ -818,13 +818,9 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
|||
}
|
||||
|
||||
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
|
||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
|
||||
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE_SMA) {
|
||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
|
||||
saveResult(pResult, tableGroupId, pUpdated);
|
||||
}
|
||||
if (pInfo->twAggSup.winMap) {
|
||||
taosHashRemove(pInfo->twAggSup.winMap, &win.skey, sizeof(TSKEY));
|
||||
}
|
||||
}
|
||||
|
||||
TSKEY ekey = ascScan ? win.ekey : win.skey;
|
||||
|
@ -872,13 +868,9 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
|||
}
|
||||
|
||||
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
|
||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
|
||||
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE_SMA) {
|
||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
|
||||
saveResult(pResult, tableGroupId, pUpdated);
|
||||
}
|
||||
if (pInfo->twAggSup.winMap) {
|
||||
taosHashRemove(pInfo->twAggSup.winMap, &win.skey, sizeof(TSKEY));
|
||||
}
|
||||
}
|
||||
|
||||
ekey = ascScan ? nextWin.ekey : nextWin.skey;
|
||||
|
@ -1264,16 +1256,9 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
|
|||
dumyInfo.cur.pageId = -1;
|
||||
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, ts, pInterval, pInterval->precision, NULL);
|
||||
if (win.ekey < pSup->maxTs - pSup->waterMark) {
|
||||
if (pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE_SMA) {
|
||||
if (taosHashGet(pSup->winMap, &win.skey, sizeof(TSKEY))) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
char keyBuf[GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))];
|
||||
SET_RES_WINDOW_KEY(keyBuf, &ts, sizeof(TSKEY), groupId);
|
||||
if (pSup->calTrigger != STREAM_TRIGGER_AT_ONCE_SMA && pSup->calTrigger != STREAM_TRIGGER_WINDOW_CLOSE_SMA) {
|
||||
taosHashRemove(pHashMap, keyBuf, keyLen);
|
||||
}
|
||||
taosHashRemove(pHashMap, keyBuf, keyLen);
|
||||
SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
|
||||
if (pos == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -1281,11 +1266,10 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
|
|||
pos->groupId = groupId;
|
||||
pos->pos = *(SResultRowPosition*)pIte;
|
||||
*(int64_t*)pos->key = ts;
|
||||
if (!taosArrayPush(closeWins, &pos)) {
|
||||
if (pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE && !taosArrayPush(closeWins, &pos)) {
|
||||
taosMemoryFree(pos);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
taosHashPut(pSup->winMap, &win.skey, sizeof(TSKEY), NULL, 0);
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -1340,10 +1324,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
|
||||
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pClosed);
|
||||
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pClosed, pInfo->binfo.rowCellInfoOffset);
|
||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE ||
|
||||
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE_SMA) {
|
||||
taosArrayAddAll(pUpdated, pClosed);
|
||||
}
|
||||
taosArrayAddAll(pUpdated, pClosed);
|
||||
|
||||
taosArrayDestroy(pClosed);
|
||||
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset);
|
||||
|
@ -2127,7 +2108,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
|||
.waterMark = pIntervalPhyNode->window.watermark,
|
||||
.calTrigger = pIntervalPhyNode->window.triggerType,
|
||||
.maxTs = INT64_MIN,
|
||||
.winMap = NULL,
|
||||
};
|
||||
pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
|
||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
|
@ -3141,7 +3121,6 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
.waterMark = pStateNode->window.watermark,
|
||||
.calTrigger = pStateNode->window.triggerType,
|
||||
.maxTs = INT64_MIN,
|
||||
.winMap = NULL,
|
||||
};
|
||||
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
||||
|
||||
|
|
|
@ -112,6 +112,7 @@ int32_t spreadFunctionMerge(SqlFunctionCtx* pCtx);
|
|||
int32_t spreadFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
int32_t spreadPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
int32_t getSpreadInfoSize();
|
||||
int32_t spreadCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||
|
||||
bool getElapsedFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
bool elapsedFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||
|
@ -120,6 +121,7 @@ int32_t elapsedFunctionMerge(SqlFunctionCtx* pCtx);
|
|||
int32_t elapsedFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
int32_t elapsedPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
int32_t getElapsedInfoSize();
|
||||
int32_t elapsedCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||
|
||||
bool getHistogramFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
bool histogramFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||
|
@ -128,6 +130,7 @@ int32_t histogramFunctionMerge(SqlFunctionCtx* pCtx);
|
|||
int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
int32_t histogramPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
int32_t getHistogramInfoSize();
|
||||
int32_t histogramCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||
|
||||
bool getHLLFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
int32_t hllFunction(SqlFunctionCtx* pCtx);
|
||||
|
@ -135,6 +138,7 @@ int32_t hllFunctionMerge(SqlFunctionCtx* pCtx);
|
|||
int32_t hllFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
int32_t hllPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
int32_t getHLLInfoSize();
|
||||
int32_t hllCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||
|
||||
bool getStateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
bool stateFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||
|
|
|
@ -1447,6 +1447,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.initFunc = apercentileFunctionSetup,
|
||||
.processFunc = apercentileFunction,
|
||||
.finalizeFunc = apercentileFinalize,
|
||||
.invertFunc = NULL,
|
||||
.combineFunc = apercentileCombine,
|
||||
.pPartialFunc = "_apercentile_partial",
|
||||
.pMergeFunc = "_apercentile_merge"
|
||||
|
@ -1459,7 +1460,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.getEnvFunc = getApercentileFuncEnv,
|
||||
.initFunc = apercentileFunctionSetup,
|
||||
.processFunc = apercentileFunction,
|
||||
.finalizeFunc = apercentilePartialFinalize
|
||||
.finalizeFunc = apercentilePartialFinalize,
|
||||
.invertFunc = NULL,
|
||||
.combineFunc = apercentileCombine,
|
||||
},
|
||||
{
|
||||
.name = "_apercentile_merge",
|
||||
|
@ -1469,7 +1472,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.getEnvFunc = getApercentileFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
.processFunc = apercentileFunctionMerge,
|
||||
.finalizeFunc = apercentileFinalize
|
||||
.finalizeFunc = apercentileFinalize,
|
||||
.invertFunc = NULL,
|
||||
.combineFunc = apercentileCombine,
|
||||
},
|
||||
{
|
||||
.name = "top",
|
||||
|
@ -1503,6 +1508,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.initFunc = spreadFunctionSetup,
|
||||
.processFunc = spreadFunction,
|
||||
.finalizeFunc = spreadFinalize,
|
||||
.invertFunc = NULL,
|
||||
.combineFunc = spreadCombine,
|
||||
.pPartialFunc = "_spread_partial",
|
||||
.pMergeFunc = "_spread_merge"
|
||||
},
|
||||
|
@ -1515,7 +1522,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.getEnvFunc = getSpreadFuncEnv,
|
||||
.initFunc = spreadFunctionSetup,
|
||||
.processFunc = spreadFunction,
|
||||
.finalizeFunc = spreadPartialFinalize
|
||||
.finalizeFunc = spreadPartialFinalize,
|
||||
.invertFunc = NULL,
|
||||
.combineFunc = spreadCombine,
|
||||
},
|
||||
{
|
||||
.name = "_spread_merge",
|
||||
|
@ -1526,7 +1535,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.getEnvFunc = getSpreadFuncEnv,
|
||||
.initFunc = spreadFunctionSetup,
|
||||
.processFunc = spreadFunctionMerge,
|
||||
.finalizeFunc = spreadFinalize
|
||||
.finalizeFunc = spreadFinalize,
|
||||
.invertFunc = NULL,
|
||||
.combineFunc = spreadCombine,
|
||||
},
|
||||
{
|
||||
.name = "elapsed",
|
||||
|
@ -1538,6 +1549,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.initFunc = elapsedFunctionSetup,
|
||||
.processFunc = elapsedFunction,
|
||||
.finalizeFunc = elapsedFinalize,
|
||||
.invertFunc = NULL,
|
||||
.combineFunc = elapsedCombine,
|
||||
.pPartialFunc = "_elapsed_partial",
|
||||
.pMergeFunc = "_elapsed_merge"
|
||||
},
|
||||
|
@ -1550,7 +1563,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.getEnvFunc = getElapsedFuncEnv,
|
||||
.initFunc = elapsedFunctionSetup,
|
||||
.processFunc = elapsedFunction,
|
||||
.finalizeFunc = elapsedPartialFinalize
|
||||
.finalizeFunc = elapsedPartialFinalize,
|
||||
.invertFunc = NULL,
|
||||
.combineFunc = elapsedCombine,
|
||||
},
|
||||
{
|
||||
.name = "_elapsed_merge",
|
||||
|
@ -1561,7 +1576,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.getEnvFunc = getElapsedFuncEnv,
|
||||
.initFunc = elapsedFunctionSetup,
|
||||
.processFunc = elapsedFunctionMerge,
|
||||
.finalizeFunc = elapsedFinalize
|
||||
.finalizeFunc = elapsedFinalize,
|
||||
.invertFunc = NULL,
|
||||
.combineFunc = elapsedCombine,
|
||||
},
|
||||
{
|
||||
.name = "last_row",
|
||||
|
@ -1614,8 +1631,10 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.initFunc = histogramFunctionSetup,
|
||||
.processFunc = histogramFunction,
|
||||
.finalizeFunc = histogramFinalize,
|
||||
.invertFunc = NULL,
|
||||
.combineFunc = histogramCombine,
|
||||
.pPartialFunc = "_histogram_partial",
|
||||
.pMergeFunc = "_histogram_merge"
|
||||
.pMergeFunc = "_histogram_merge",
|
||||
},
|
||||
{
|
||||
.name = "_histogram_partial",
|
||||
|
@ -1625,7 +1644,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.getEnvFunc = getHistogramFuncEnv,
|
||||
.initFunc = histogramFunctionSetup,
|
||||
.processFunc = histogramFunction,
|
||||
.finalizeFunc = histogramPartialFinalize
|
||||
.finalizeFunc = histogramPartialFinalize,
|
||||
.invertFunc = NULL,
|
||||
.combineFunc = histogramCombine,
|
||||
},
|
||||
{
|
||||
.name = "_histogram_merge",
|
||||
|
@ -1635,7 +1656,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.getEnvFunc = getHistogramFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
.processFunc = histogramFunctionMerge,
|
||||
.finalizeFunc = histogramFinalize
|
||||
.finalizeFunc = histogramFinalize,
|
||||
.invertFunc = NULL,
|
||||
.combineFunc = histogramCombine,
|
||||
},
|
||||
{
|
||||
.name = "hyperloglog",
|
||||
|
@ -1646,6 +1669,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.initFunc = functionSetup,
|
||||
.processFunc = hllFunction,
|
||||
.finalizeFunc = hllFinalize,
|
||||
.invertFunc = NULL,
|
||||
.combineFunc = hllCombine,
|
||||
.pPartialFunc = "_hyperloglog_partial",
|
||||
.pMergeFunc = "_hyperloglog_merge"
|
||||
},
|
||||
|
@ -1657,7 +1682,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.getEnvFunc = getHLLFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
.processFunc = hllFunction,
|
||||
.finalizeFunc = hllPartialFinalize
|
||||
.finalizeFunc = hllPartialFinalize,
|
||||
.invertFunc = NULL,
|
||||
.combineFunc = hllCombine,
|
||||
},
|
||||
{
|
||||
.name = "_hyperloglog_merge",
|
||||
|
@ -1667,7 +1694,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.getEnvFunc = getHLLFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
.processFunc = hllFunctionMerge,
|
||||
.finalizeFunc = hllFinalize
|
||||
.finalizeFunc = hllFinalize,
|
||||
.invertFunc = NULL,
|
||||
.combineFunc = hllCombine,
|
||||
},
|
||||
{
|
||||
.name = "diff",
|
||||
|
|
|
@ -2227,7 +2227,6 @@ int32_t apercentilePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
||||
SAPercentileInfo* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
|
||||
int32_t type = pDestCtx->input.pData[0]->info.type;
|
||||
|
||||
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
||||
SAPercentileInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
|
||||
|
@ -3100,6 +3099,17 @@ int32_t spreadPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
return pResInfo->numOfRes;
|
||||
}
|
||||
|
||||
int32_t spreadCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
||||
SSpreadInfo* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
|
||||
|
||||
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
||||
SSpreadInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
|
||||
spreadTransferInfo(pDBuf, pSBuf);
|
||||
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t getElapsedInfoSize() {
|
||||
return (int32_t)sizeof(SElapsedInfo);
|
||||
}
|
||||
|
@ -3259,6 +3269,18 @@ int32_t elapsedPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
return pResInfo->numOfRes;
|
||||
}
|
||||
|
||||
int32_t elapsedCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
||||
SElapsedInfo* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
|
||||
|
||||
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
||||
SElapsedInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
|
||||
|
||||
elapsedTransferInfo(pDBuf, pSBuf);
|
||||
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t getHistogramInfoSize() {
|
||||
return (int32_t)sizeof(SHistoFuncInfo) + HISTOGRAM_MAX_BINS_NUM * sizeof(SHistoFuncBin);
|
||||
}
|
||||
|
@ -3554,6 +3576,18 @@ int32_t histogramPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
return 1;
|
||||
}
|
||||
|
||||
int32_t histogramCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
||||
SHistoFuncInfo* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
|
||||
|
||||
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
||||
SHistoFuncInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
|
||||
|
||||
histogramTransferInfo(pDBuf, pSBuf);
|
||||
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t getHLLInfoSize() {
|
||||
return (int32_t)sizeof(SHLLInfo);
|
||||
}
|
||||
|
@ -3738,6 +3772,18 @@ int32_t hllPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
return pResInfo->numOfRes;
|
||||
}
|
||||
|
||||
int32_t hllCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
||||
SHLLInfo* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
|
||||
|
||||
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
||||
SHLLInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
|
||||
|
||||
hllTransferInfo(pDBuf, pSBuf);
|
||||
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
bool getStateFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||
pEnv->calcMemSize = sizeof(SStateInfo);
|
||||
return true;
|
||||
|
|
Loading…
Reference in New Issue