support stddev and hyperloglog for tsma

This commit is contained in:
wangjiaming0909 2024-03-12 13:28:35 +08:00
parent 40a7262fcd
commit 34dbdfd4dc
3 changed files with 135 additions and 25 deletions

View File

@ -507,6 +507,35 @@ static int32_t translateStddevMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t
return TSDB_CODE_SUCCESS;
}
static int32_t translateStddevState(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
uint8_t paraType = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 0))->type;
if (!IS_NUMERIC_TYPE(paraType) && !IS_NULL_TYPE(paraType)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
pFunc->node.resType = (SDataType){.bytes = getStddevInfoSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
return TSDB_CODE_SUCCESS;
}
static int32_t translateStddevStateMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
uint8_t paraType = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 0))->type;
if (TSDB_DATA_TYPE_BINARY != paraType) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
pFunc->node.resType = (SDataType){.bytes = getStddevInfoSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
return TSDB_CODE_SUCCESS;
}
static int32_t translateWduration(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
// pseudo column do not need to check parameters
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT,
@ -1361,6 +1390,24 @@ static int32_t translateHLLMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t le
return translateHLLImpl(pFunc, pErrBuf, len, false);
}
static int32_t translateHLLState(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateHLLPartial(pFunc, pErrBuf, len);
}
static int32_t translateHLLStateMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
if (getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 0))->type != TSDB_DATA_TYPE_BINARY) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
pFunc->node.resType =
(SDataType){.bytes = getHistogramInfoSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
return TSDB_CODE_SUCCESS;
}
static bool validateStateOper(const SValueNode* pVal) {
if (TSDB_DATA_TYPE_BINARY != pVal->node.resType.type) {
return false;
@ -1865,8 +1912,7 @@ static int32_t translateFirstLastStateMerge(SFunctionNode* pFunc, char* pErrBuf,
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
pFunc->node.resType =
(SDataType){.bytes = getFirstLastInfoSize(paraBytes) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
pFunc->node.resType = (SDataType){.bytes = paraBytes, .type = TSDB_DATA_TYPE_BINARY};
return TSDB_CODE_SUCCESS;
}
@ -2535,7 +2581,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "stddev",
.type = FUNCTION_TYPE_STDDEV,
.classification = FUNC_MGT_AGG_FUNC,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TSMA_FUNC,
.translateFunc = translateInNumOutDou,
.getEnvFunc = getStddevFuncEnv,
.initFunc = stddevFunctionSetup,
@ -2547,6 +2593,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
#endif
.combineFunc = stddevCombine,
.pPartialFunc = "_stddev_partial",
.pStateFunc = "_stddev_state",
.pMergeFunc = "_stddev_merge"
},
{
@ -2576,6 +2623,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.invertFunc = stddevInvertFunction,
#endif
.combineFunc = stddevCombine,
.pPartialFunc = "_stddev_state_merge",
.pMergeFunc = "_stddev_merge",
},
{
.name = "leastsquares",
@ -3105,7 +3154,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "hyperloglog",
.type = FUNCTION_TYPE_HYPERLOGLOG,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_COUNT_LIKE_FUNC,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_COUNT_LIKE_FUNC | FUNC_MGT_TSMA_FUNC,
.translateFunc = translateHLL,
.getEnvFunc = getHLLFuncEnv,
.initFunc = functionSetup,
@ -3117,6 +3166,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
#endif
.combineFunc = hllCombine,
.pPartialFunc = "_hyperloglog_partial",
.pStateFunc = "_hyperloglog_state",
.pMergeFunc = "_hyperloglog_merge"
},
{
@ -3146,6 +3196,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.invertFunc = NULL,
#endif
.combineFunc = hllCombine,
.pPartialFunc = "_hyperloglog_state_merge",
.pMergeFunc = "_hyperloglog_merge",
},
{
.name = "diff",
@ -3906,6 +3958,28 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.sprocessFunc = qPseudoTagFunction,
.finalizeFunc = NULL
},
{
.name = "_stddev_state",
.type = FUNCTION_TYPE_STDDEV_STATE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TSMA_FUNC,
.translateFunc = translateStddevState,
.getEnvFunc = getStddevFuncEnv,
.initFunc = stddevFunctionSetup,
.processFunc = stddevFunction,
.finalizeFunc = stddevPartialFinalize,
.pPartialFunc = "_stddev_partial",
.pMergeFunc = "_stddev_state_merge",
},
{
.name = "_stddev_state_merge",
.type = FUNCTION_TYPE_STDDEV_STATE_MERGE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TSMA_FUNC,
.translateFunc = translateStddevStateMerge,
.getEnvFunc = getStddevFuncEnv,
.initFunc = stddevFunctionSetup,
.processFunc = stddevFunctionMerge,
.finalizeFunc = stddevPartialFinalize,
},
{
//TODO for outer use not only internal
.name = "_avg_state",
@ -3931,6 +4005,28 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.processFunc = avgFunctionMerge,
.finalizeFunc = avgPartialFinalize,
},
{
.name = "_spread_state",
.type = FUNCTION_TYPE_SPREAD_STATE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SPECIAL_DATA_REQUIRED | FUNC_MGT_TSMA_FUNC,
.translateFunc = translateSpreadState,
.getEnvFunc = getSpreadFuncEnv,
.initFunc = spreadFunctionSetup,
.processFunc = spreadFunction,
.finalizeFunc = spreadPartialFinalize,
.pPartialFunc = "_spread_partial",
.pMergeFunc = "_spread_state_merge"
},
{
.name = "_spread_state_merge",
.type = FUNCTION_TYPE_SPREAD_STATE_MERGE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TSMA_FUNC,
.translateFunc = translateSpreadStateMerge,
.getEnvFunc = getSpreadFuncEnv,
.initFunc = spreadFunctionSetup,
.processFunc = spreadFunctionMerge,
.finalizeFunc = spreadPartialFinalize,
},
{
.name = "_first_state",
.type = FUNCTION_TYPE_FIRST_STATE,
@ -3980,26 +4076,26 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.finalizeFunc = firstLastPartialFinalize,
},
{
.name = "_spread_state",
.type = FUNCTION_TYPE_SPREAD_STATE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SPECIAL_DATA_REQUIRED | FUNC_MGT_TSMA_FUNC,
.translateFunc = translateSpreadState,
.getEnvFunc = getSpreadFuncEnv,
.initFunc = spreadFunctionSetup,
.processFunc = spreadFunction,
.finalizeFunc = spreadPartialFinalize,
.pPartialFunc = "_spread_partial",
.pMergeFunc = "_spread_state_merge"
.name = "_hyperloglog_state",
.type = FUNCTION_TYPE_HYPERLOGLOG_STATE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_COUNT_LIKE_FUNC | FUNC_MGT_TSMA_FUNC,
.translateFunc = translateHLLState,
.getEnvFunc = getHLLFuncEnv,
.initFunc = functionSetup,
.processFunc = hllFunction,
.finalizeFunc = hllPartialFinalize,
.pPartialFunc = "_hyperloglog_partial",
.pMergeFunc = "_hyperloglog_state_merge",
},
{
.name = "_spread_state_merge",
.type = FUNCTION_TYPE_SPREAD_STATE_MERGE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TSMA_FUNC,
.translateFunc = translateSpreadStateMerge,
.getEnvFunc = getSpreadFuncEnv,
.initFunc = spreadFunctionSetup,
.processFunc = spreadFunctionMerge,
.finalizeFunc = spreadPartialFinalize,
.name = "_hyperloglog_state_merge",
.type = FUNCTION_TYPE_HYPERLOGLOG_STATE_MERGE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_COUNT_LIKE_FUNC | FUNC_MGT_TSMA_FUNC,
.translateFunc = translateHLLStateMerge,
.getEnvFunc = getHLLFuncEnv,
.initFunc = functionSetup,
.processFunc = hllFunctionMerge,
.finalizeFunc = hllPartialFinalize,
},
};
// clang-format on

View File

@ -1454,6 +1454,12 @@ static void stddevTransferInfo(SStddevRes* pInput, SStddevRes* pOutput) {
int32_t stddevFunctionMerge(SqlFunctionCtx* pCtx) {
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pCol = pInput->pData[0];
if (IS_NULL_TYPE(pCol->info.type)) {
SET_VAL(GET_RES_INFO(pCtx), 0, 1);
return TSDB_CODE_SUCCESS;
}
if (pCol->info.type != TSDB_DATA_TYPE_BINARY) {
return TSDB_CODE_FUNC_FUNTION_PARA_TYPE;
}
@ -1461,6 +1467,7 @@ int32_t stddevFunctionMerge(SqlFunctionCtx* pCtx) {
SStddevRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
for (int32_t i = pInput->startRowIndex; i < pInput->startRowIndex + pInput->numOfRows; ++i) {
if (colDataIsNull_s(pCol, i)) continue;
char* data = colDataGetData(pCol, i);
SStddevRes* pInputInfo = (SStddevRes*)varDataVal(data);
stddevTransferInfo(pInputInfo, pInfo);
@ -3866,6 +3873,7 @@ int32_t spreadFunctionMerge(SqlFunctionCtx* pCtx) {
SColumnInfoData* pCol = pInput->pData[0];
if (IS_NULL_TYPE(pCol->info.type)) {
SET_VAL(GET_RES_INFO(pCtx), 0, 1);
return TSDB_CODE_SUCCESS;
}
@ -4606,6 +4614,11 @@ int32_t hllFunctionMerge(SqlFunctionCtx* pCtx) {
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pCol = pInput->pData[0];
if (IS_NULL_TYPE(pCol->info.type)) {
SET_VAL(GET_RES_INFO(pCtx), 0, 1);
return TSDB_CODE_SUCCESS;
}
if (pCol->info.type != TSDB_DATA_TYPE_BINARY) {
return TSDB_CODE_SUCCESS;
}
@ -4615,6 +4628,7 @@ int32_t hllFunctionMerge(SqlFunctionCtx* pCtx) {
int32_t start = pInput->startRowIndex;
for (int32_t i = start; i < start + pInput->numOfRows; ++i) {
if (colDataIsNull_s(pCol, i)) continue;
char* data = colDataGetData(pCol, i);
SHLLInfo* pInputInfo = (SHLLInfo*)varDataVal(data);
hllTransferInfo(pInputInfo, pInfo);

View File

@ -771,7 +771,7 @@ class TDTestCase:
def test_recursive_tsma(self):
tdSql.execute('drop tsma tsma2')
tsma_func_list = ['avg(c2)', 'avg(c3)', 'min(c4)', 'max(c3)', 'sum(c2)', 'count(ts)', 'count(c2)', 'first(c5)', 'last(c5)', 'spread(c2)']
tsma_func_list = ['avg(c2)', 'avg(c3)', 'min(c4)', 'max(c3)', 'sum(c2)', 'count(ts)', 'count(c2)', 'first(c5)', 'last(c5)', 'spread(c2)', 'stddev(c3)', 'hyperloglog(c5)']
select_func_list: List[str] = tsma_func_list.copy()
select_func_list.append('count(*)')
self.create_tsma('tsma3', 'test', 'meters', tsma_func_list, '5m')
@ -858,7 +858,7 @@ class TDTestCase:
sql = "SELECT avg(c1), avg(c2),_wstart, _wend,t3,t4,t5,t2 FROM meters WHERE ts >= '2018-09-17 8:00:00' AND ts < '2018-09-17 09:03:18.334' PARTITION BY t3,t4,t5,t2 INTERVAL(1d);"
ctxs.append(TSMAQCBuilder().with_sql(sql)
.should_query_with_table('meters', '2018-09-17 8:00:00', '2018-09-17 09:03:18.333').get_qc())
#ctxs.extend(self.test_query_tsma_all())
ctxs.extend(self.test_query_tsma_all())
return ctxs
def test_query_with_tsma_interval_partition_by_col(self):
@ -1000,7 +1000,7 @@ class TDTestCase:
def run(self):
self.init_data()
# time.sleep(999999)
self.test_ddl()
#self.test_ddl()
self.test_query_with_tsma()
# time.sleep(999999)