Merge pull request #13660 from taosdata/enh/elapsed_split
enh(query): enable elapsed function distributed splitting
This commit is contained in:
commit
50beba99b2
|
@ -131,6 +131,8 @@ typedef enum EFunctionType {
|
|||
FUNCTION_TYPE_HISTOGRAM_MERGE,
|
||||
FUNCTION_TYPE_HYPERLOGLOG_PARTIAL,
|
||||
FUNCTION_TYPE_HYPERLOGLOG_MERGE,
|
||||
FUNCTION_TYPE_ELAPSED_PARTIAL,
|
||||
FUNCTION_TYPE_ELAPSED_MERGE,
|
||||
|
||||
// user defined funcion
|
||||
FUNCTION_TYPE_UDF = 10000
|
||||
|
|
|
@ -116,7 +116,10 @@ int32_t getSpreadInfoSize();
|
|||
bool getElapsedFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
bool elapsedFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||
int32_t elapsedFunction(SqlFunctionCtx* pCtx);
|
||||
int32_t elapsedFunctionMerge(SqlFunctionCtx* pCtx);
|
||||
int32_t elapsedFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
int32_t elapsedPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
int32_t getElapsedInfoSize();
|
||||
|
||||
bool getHistogramFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
bool histogramFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||
|
|
|
@ -444,6 +444,64 @@ static int32_t translateElapsed(SFunctionNode* pFunc, char* pErrBuf, int32_t len
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateElapsedImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) {
|
||||
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
|
||||
|
||||
if (isPartial) {
|
||||
if (1 != numOfParams && 2 != numOfParams) {
|
||||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
||||
if (TSDB_DATA_TYPE_TIMESTAMP != paraType) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
// param1
|
||||
if (2 == numOfParams) {
|
||||
SNode* pParamNode1 = nodesListGetNode(pFunc->pParameterList, 1);
|
||||
if (QUERY_NODE_VALUE != nodeType(pParamNode1)) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
SValueNode* pValue = (SValueNode*)pParamNode1;
|
||||
|
||||
pValue->notReserved = true;
|
||||
|
||||
paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type;
|
||||
if (!IS_INTEGER_TYPE(paraType)) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
if (pValue->datum.i == 0) {
|
||||
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||
"ELAPSED function time unit parameter should be greater than db precision");
|
||||
}
|
||||
}
|
||||
|
||||
pFunc->node.resType = (SDataType){.bytes = getElapsedInfoSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
|
||||
} else {
|
||||
if (1 != numOfParams) {
|
||||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
||||
if (TSDB_DATA_TYPE_BINARY != paraType) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE};
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateElapsedPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
return translateElapsedImpl(pFunc, pErrBuf, len, true);
|
||||
}
|
||||
|
||||
static int32_t translateElapsedMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
return translateElapsedImpl(pFunc, pErrBuf, len, false);
|
||||
}
|
||||
|
||||
static int32_t translateLeastSQR(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
|
||||
if (3 != numOfParams) {
|
||||
|
@ -1468,6 +1526,30 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.getEnvFunc = getElapsedFuncEnv,
|
||||
.initFunc = elapsedFunctionSetup,
|
||||
.processFunc = elapsedFunction,
|
||||
.finalizeFunc = elapsedFinalize,
|
||||
.pPartialFunc = "_elapsed_partial",
|
||||
.pMergeFunc = "_elapsed_merge"
|
||||
},
|
||||
{
|
||||
.name = "_elapsed_partial",
|
||||
.type = FUNCTION_TYPE_ELAPSED,
|
||||
.classification = FUNC_MGT_AGG_FUNC,
|
||||
.dataRequiredFunc = statisDataRequired,
|
||||
.translateFunc = translateElapsedPartial,
|
||||
.getEnvFunc = getElapsedFuncEnv,
|
||||
.initFunc = elapsedFunctionSetup,
|
||||
.processFunc = elapsedFunction,
|
||||
.finalizeFunc = elapsedPartialFinalize
|
||||
},
|
||||
{
|
||||
.name = "_elapsed_merge",
|
||||
.type = FUNCTION_TYPE_ELAPSED,
|
||||
.classification = FUNC_MGT_AGG_FUNC,
|
||||
.dataRequiredFunc = statisDataRequired,
|
||||
.translateFunc = translateElapsedMerge,
|
||||
.getEnvFunc = getElapsedFuncEnv,
|
||||
.initFunc = elapsedFunctionSetup,
|
||||
.processFunc = elapsedFunctionMerge,
|
||||
.finalizeFunc = elapsedFinalize
|
||||
},
|
||||
{
|
||||
|
|
|
@ -3101,6 +3101,10 @@ int32_t spreadPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
return pResInfo->numOfRes;
|
||||
}
|
||||
|
||||
int32_t getElapsedInfoSize() {
|
||||
return (int32_t)sizeof(SElapsedInfo);
|
||||
}
|
||||
|
||||
bool getElapsedFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||
pEnv->calcMemSize = sizeof(SElapsedInfo);
|
||||
return true;
|
||||
|
@ -3202,6 +3206,30 @@ _elapsed_over:
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t elapsedFunctionMerge(SqlFunctionCtx *pCtx) {
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
SColumnInfoData* pCol = pInput->pData[0];
|
||||
ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY);
|
||||
|
||||
SElapsedInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
|
||||
int32_t start = pInput->startRowIndex;
|
||||
char* data = colDataGetData(pCol, start);
|
||||
SElapsedInfo* pInputInfo = (SElapsedInfo *)varDataVal(data);
|
||||
|
||||
pInfo->timeUnit = pInputInfo->timeUnit;
|
||||
if (pInfo->min > pInputInfo->min) {
|
||||
pInfo->min = pInputInfo->min;
|
||||
}
|
||||
|
||||
if (pInfo->max < pInputInfo->max) {
|
||||
pInfo->max = pInputInfo->max;
|
||||
}
|
||||
|
||||
SET_VAL(GET_RES_INFO(pCtx), 1, 1);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t elapsedFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
SElapsedInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
double result = (double)pInfo->max - (double)pInfo->min;
|
||||
|
@ -3210,6 +3238,24 @@ int32_t elapsedFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
return functionFinalize(pCtx, pBlock);
|
||||
}
|
||||
|
||||
int32_t elapsedPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
SElapsedInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
int32_t resultBytes = getElapsedInfoSize();
|
||||
char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
|
||||
|
||||
memcpy(varDataVal(res), pInfo, resultBytes);
|
||||
varDataSetLen(res, resultBytes);
|
||||
|
||||
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
|
||||
colDataAppend(pCol, pBlock->info.rows, res, false);
|
||||
|
||||
taosMemoryFree(res);
|
||||
return pResInfo->numOfRes;
|
||||
}
|
||||
|
||||
int32_t getHistogramInfoSize() {
|
||||
return (int32_t)sizeof(SHistoFuncInfo) + HISTOGRAM_MAX_BINS_NUM * sizeof(SHistoFuncBin);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue