add elapsed function partial/merge translate
This commit is contained in:
parent
dbaa4a5815
commit
62506bac33
|
@ -131,6 +131,8 @@ typedef enum EFunctionType {
|
|||
FUNCTION_TYPE_HISTOGRAM_MERGE,
|
||||
FUNCTION_TYPE_HYPERLOGLOG_PARTIAL,
|
||||
FUNCTION_TYPE_HYPERLOGLOG_MERGE,
|
||||
FUNCTION_TYPE_ELAPSED_PARTIAL,
|
||||
FUNCTION_TYPE_ELAPSED_MERGE,
|
||||
|
||||
// user defined funcion
|
||||
FUNCTION_TYPE_UDF = 10000
|
||||
|
|
|
@ -115,6 +115,7 @@ bool getElapsedFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
|||
bool elapsedFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||
int32_t elapsedFunction(SqlFunctionCtx* pCtx);
|
||||
int32_t elapsedFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
int32_t getElapsedInfoSize();
|
||||
|
||||
bool getHistogramFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
bool histogramFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||
|
|
|
@ -444,6 +444,64 @@ static int32_t translateElapsed(SFunctionNode* pFunc, char* pErrBuf, int32_t len
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateElapsedImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) {
|
||||
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
|
||||
|
||||
if (isPartial) {
|
||||
if (1 != numOfParams && 2 != numOfParams) {
|
||||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
||||
if (TSDB_DATA_TYPE_TIMESTAMP != paraType) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
// param1
|
||||
if (2 == numOfParams) {
|
||||
SNode* pParamNode1 = nodesListGetNode(pFunc->pParameterList, 1);
|
||||
if (QUERY_NODE_VALUE != nodeType(pParamNode1)) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
SValueNode* pValue = (SValueNode*)pParamNode1;
|
||||
|
||||
pValue->notReserved = true;
|
||||
|
||||
paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type;
|
||||
if (!IS_INTEGER_TYPE(paraType)) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
if (pValue->datum.i == 0) {
|
||||
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||
"ELAPSED function time unit parameter should be greater than db precision");
|
||||
}
|
||||
}
|
||||
|
||||
pFunc->node.resType = (SDataType){.bytes = getElapsedInfoSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
|
||||
} else {
|
||||
if (1 != numOfParams) {
|
||||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
||||
if (TSDB_DATA_TYPE_BINARY != paraType) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE};
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateElapsedPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
return translateElapsedImpl(pFunc, pErrBuf, len, true);
|
||||
}
|
||||
|
||||
static int32_t translateElapsedMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
return translateElapsedImpl(pFunc, pErrBuf, len, false);
|
||||
}
|
||||
|
||||
static int32_t translateLeastSQR(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
|
||||
if (3 != numOfParams) {
|
||||
|
@ -1421,6 +1479,28 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.processFunc = elapsedFunction,
|
||||
.finalizeFunc = elapsedFinalize
|
||||
},
|
||||
{
|
||||
.name = "_elapsed_partial",
|
||||
.type = FUNCTION_TYPE_ELAPSED,
|
||||
.classification = FUNC_MGT_AGG_FUNC,
|
||||
.dataRequiredFunc = statisDataRequired,
|
||||
.translateFunc = translateElapsedPartial,
|
||||
.getEnvFunc = getElapsedFuncEnv,
|
||||
.initFunc = elapsedFunctionSetup,
|
||||
.processFunc = elapsedFunction,
|
||||
.finalizeFunc = elapsedFinalize
|
||||
},
|
||||
{
|
||||
.name = "_elapsed_merge",
|
||||
.type = FUNCTION_TYPE_ELAPSED,
|
||||
.classification = FUNC_MGT_AGG_FUNC,
|
||||
.dataRequiredFunc = statisDataRequired,
|
||||
.translateFunc = translateElapsedMerge,
|
||||
.getEnvFunc = getElapsedFuncEnv,
|
||||
.initFunc = elapsedFunctionSetup,
|
||||
.processFunc = elapsedFunction,
|
||||
.finalizeFunc = elapsedFinalize
|
||||
},
|
||||
{
|
||||
.name = "last_row",
|
||||
.type = FUNCTION_TYPE_LAST_ROW,
|
||||
|
|
|
@ -3027,6 +3027,10 @@ int32_t spreadPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
return pResInfo->numOfRes;
|
||||
}
|
||||
|
||||
int32_t getElapsedInfoSize() {
|
||||
return (int32_t)sizeof(SElapsedInfo);
|
||||
}
|
||||
|
||||
bool getElapsedFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||
pEnv->calcMemSize = sizeof(SElapsedInfo);
|
||||
return true;
|
||||
|
|
Loading…
Reference in New Issue