From 2a6dfbb0d7185d53eb1519ce9e66d583346d775c Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 14 Jun 2022 13:48:33 +0800 Subject: [PATCH 1/5] enh(query): avg function use pre-agg results --- source/libs/function/src/builtinsimpl.c | 154 +++++++++++++----------- 1 file changed, 83 insertions(+), 71 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index b5009b7d41..3510a59254 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -642,8 +642,8 @@ bool avgFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { int32_t avgFunction(SqlFunctionCtx* pCtx) { int32_t numOfElem = 0; - // Only the pre-computing information loaded and actual data does not loaded SInputColumnInfoData* pInput = &pCtx->input; + SColumnDataAgg* pAgg = pInput->pColumnDataAgg[0]; int32_t type = pInput->pData[0]->info.type; SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); @@ -660,95 +660,107 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) { goto _avg_over; } - switch (type) { - case TSDB_DATA_TYPE_TINYINT: { - int8_t* plist = (int8_t*)pCol->pData; - for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { - if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { - continue; + if (pInput->colDataAggIsSet) { + numOfElem = numOfRows - pAgg->numOfNull; + ASSERT(numOfElem >= 0); + + pAvgRes->count += numOfElem; + if (IS_INTEGER_TYPE(type)) { + pAvgRes->sum.isum += pAgg->sum; + } else if (IS_FLOAT_TYPE(type)) { + pAvgRes->sum.dsum += GET_DOUBLE_VAL((const char*)&(pAgg->sum)); + } + } else { // computing based on the true data block + switch (type) { + case TSDB_DATA_TYPE_TINYINT: { + int8_t* plist = (int8_t*)pCol->pData; + for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + numOfElem += 1; + pAvgRes->count += 1; + pAvgRes->sum.isum += plist[i]; } - numOfElem += 1; - pAvgRes->count += 1; - pAvgRes->sum.isum += plist[i]; + break; } - break; - } + case TSDB_DATA_TYPE_SMALLINT: { + int16_t* plist = (int16_t*)pCol->pData; + for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } - case TSDB_DATA_TYPE_SMALLINT: { - int16_t* plist = (int16_t*)pCol->pData; - for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { - if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { - continue; + numOfElem += 1; + pAvgRes->count += 1; + pAvgRes->sum.isum += plist[i]; + } + break; + } + + case TSDB_DATA_TYPE_INT: { + int32_t* plist = (int32_t*)pCol->pData; + for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + numOfElem += 1; + pAvgRes->count += 1; + pAvgRes->sum.isum += plist[i]; } - numOfElem += 1; - pAvgRes->count += 1; - pAvgRes->sum.isum += plist[i]; + break; } - break; - } - case TSDB_DATA_TYPE_INT: { - int32_t* plist = (int32_t*)pCol->pData; - for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { - if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { - continue; + case TSDB_DATA_TYPE_BIGINT: { + int64_t* plist = (int64_t*)pCol->pData; + for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + numOfElem += 1; + pAvgRes->count += 1; + pAvgRes->sum.isum += plist[i]; } - - numOfElem += 1; - pAvgRes->count += 1; - pAvgRes->sum.isum += plist[i]; + break; } - break; - } + case TSDB_DATA_TYPE_FLOAT: { + float* plist = (float*)pCol->pData; + for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } - case TSDB_DATA_TYPE_BIGINT: { - int64_t* plist = (int64_t*)pCol->pData; - for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { - if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { - continue; + numOfElem += 1; + pAvgRes->count += 1; + pAvgRes->sum.dsum += plist[i]; } - - numOfElem += 1; - pAvgRes->count += 1; - pAvgRes->sum.isum += plist[i]; + break; } - break; - } - case TSDB_DATA_TYPE_FLOAT: { - float* plist = (float*)pCol->pData; - for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { - if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { - continue; + case TSDB_DATA_TYPE_DOUBLE: { + double* plist = (double*)pCol->pData; + for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + numOfElem += 1; + pAvgRes->count += 1; + pAvgRes->sum.dsum += plist[i]; } - - numOfElem += 1; - pAvgRes->count += 1; - pAvgRes->sum.dsum += plist[i]; + break; } - break; + + default: + break; } - - case TSDB_DATA_TYPE_DOUBLE: { - double* plist = (double*)pCol->pData; - for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { - if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { - continue; - } - - numOfElem += 1; - pAvgRes->count += 1; - pAvgRes->sum.dsum += plist[i]; - } - break; - } - - default: - break; } _avg_over: From 4113472e89c69c1bd55036b03016346305409351 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 14 Jun 2022 15:00:28 +0800 Subject: [PATCH 2/5] add avg function translate partial/merge --- include/libs/function/functionMgt.h | 2 + source/libs/function/inc/builtinsimpl.h | 1 + source/libs/function/src/builtins.c | 53 +++++++++++++++++++++++++ source/libs/function/src/builtinsimpl.c | 2 + 4 files changed, 58 insertions(+) diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index b5641f2d07..f324d8fed1 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -142,6 +142,8 @@ typedef enum EFunctionType { FUNCTION_TYPE_FIRST_MERGE, FUNCTION_TYPE_LAST_PARTIAL, FUNCTION_TYPE_LAST_MERGE, + FUNCTION_TYPE_AVG_PARTIAL, + FUNCTION_TYPE_AVG_MERGE, // user defined funcion FUNCTION_TYPE_UDF = 10000 diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 7db1396603..2f31d57907 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -58,6 +58,7 @@ int32_t avgFunction(SqlFunctionCtx* pCtx); int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t avgInvertFunction(SqlFunctionCtx* pCtx); int32_t avgCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); +int32_t getAvgInfoSize(); bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 1c3f274ee9..4fd0db4819 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -155,6 +155,35 @@ static int32_t translateSum(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { return TSDB_CODE_SUCCESS; } +static int32_t translateAvgPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + if (1 != LIST_LENGTH(pFunc->pParameterList)) { + return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); + } + + uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; + if (!IS_NUMERIC_TYPE(paraType) && !IS_NULL_TYPE(paraType)) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + pFunc->node.resType = (SDataType){.bytes = getAvgInfoSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}; + return TSDB_CODE_SUCCESS; +} + +static int32_t translateAvgMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + if (1 != LIST_LENGTH(pFunc->pParameterList)) { + return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); + } + + uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; + if (TSDB_DATA_TYPE_BINARY != paraType) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE}; + + return TSDB_CODE_SUCCESS; +} + static int32_t translateWduration(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { // pseudo column do not need to check parameters pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT}; @@ -1518,6 +1547,30 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .invertFunc = avgInvertFunction, .combineFunc = avgCombine, }, + { + .name = "_avg_partial", + .type = FUNCTION_TYPE_AVG_PARTIAL, + .classification = FUNC_MGT_AGG_FUNC, + .translateFunc = translateInNumOutDou, + .getEnvFunc = getAvgFuncEnv, + .initFunc = avgFunctionSetup, + .processFunc = avgFunction, + .finalizeFunc = avgFinalize, + .invertFunc = avgInvertFunction, + .combineFunc = avgCombine, + }, + { + .name = "_avg_merge", + .type = FUNCTION_TYPE_AVG_MERGE, + .classification = FUNC_MGT_AGG_FUNC, + .translateFunc = translateInNumOutDou, + .getEnvFunc = getAvgFuncEnv, + .initFunc = avgFunctionSetup, + .processFunc = avgFunction, + .finalizeFunc = avgFinalize, + .invertFunc = avgInvertFunction, + .combineFunc = avgCombine, + }, { .name = "percentile", .type = FUNCTION_TYPE_PERCENTILE, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 3510a59254..ee47b1aa7f 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -624,6 +624,8 @@ bool getSumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { return true; } +int32_t getAvgInfoSize() { return (int32_t)sizeof(SAvgRes); } + bool getAvgFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { pEnv->calcMemSize = sizeof(SAvgRes); return true; From e82289542df8f1cb0c546b3e8db229f0efedaf89 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 14 Jun 2022 15:15:37 +0800 Subject: [PATCH 3/5] avg function add partial/merge --- source/libs/function/inc/builtinsimpl.h | 2 + source/libs/function/src/builtinsimpl.c | 52 +++++++++++++++++++++++++ 2 files changed, 54 insertions(+) diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 2f31d57907..1bb30934c6 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -55,7 +55,9 @@ int32_t maxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); bool getAvgFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool avgFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t avgFunction(SqlFunctionCtx* pCtx); +int32_t avgFunctionMerge(SqlFunctionCtx* pCtx); int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +int32_t avgPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t avgInvertFunction(SqlFunctionCtx* pCtx); int32_t avgCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t getAvgInfoSize(); diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index ee47b1aa7f..fe9b4e1c3a 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -51,6 +51,7 @@ typedef struct SAvgRes { double result; SSumRes sum; int64_t count; + int16_t type; // store the original input type, used in merge function } SAvgRes; typedef struct STuplePos { @@ -649,6 +650,7 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) { int32_t type = pInput->pData[0]->info.type; SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + pAvgRes->type = type; // computing based on the true data block SColumnInfoData* pCol = pInput->pData[0]; @@ -771,6 +773,38 @@ _avg_over: return TSDB_CODE_SUCCESS; } +static void avgTransferInfo(SAvgRes* pInput, SAvgRes* pOutput) { + int16_t type = pInput->type; + if (IS_INTEGER_TYPE(type)) { + pOutput->sum.isum += pInput->sum.isum; + } else { + pOutput->sum.dsum += pInput->sum.dsum; + } + + pOutput->count += pInput->count; + + return; +} + +int32_t avgFunctionMerge(SqlFunctionCtx* pCtx) { + SInputColumnInfoData* pInput = &pCtx->input; + SColumnInfoData* pCol = pInput->pData[0]; + ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY); + + SAvgRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + SAvgRes* pInputInfo; + + int32_t start = pInput->startRowIndex; + char* data = colDataGetData(pCol, start); + pInputInfo = (SAvgRes*)varDataVal(data); + + avgTransferInfo(pInputInfo, pInfo); + + SET_VAL(GET_RES_INFO(pCtx), 1, 1); + + return TSDB_CODE_SUCCESS; +} + #define LIST_AVG_N(sumT, T) \ do { \ T* plist = (T*)pCol->pData; \ @@ -872,6 +906,24 @@ int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return functionFinalize(pCtx, pBlock); } +int32_t avgPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SAvgRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + int32_t resultBytes = getAvgInfoSize(); + char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); + + memcpy(varDataVal(res), pInfo, resultBytes); + varDataSetLen(res, resultBytes); + + int32_t slotId = pCtx->pExpr->base.resSchema.slotId; + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); + + colDataAppend(pCol, pBlock->info.rows, res, false); + + taosMemoryFree(res); + return pResInfo->numOfRes; +} + EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) { return FUNC_DATA_REQUIRED_STATIS_LOAD; } From 4ccbc4d85fe1caddfe2c171223afbb7fb06f69b0 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 14 Jun 2022 15:34:55 +0800 Subject: [PATCH 4/5] enable avg function partial/merge --- source/libs/function/src/builtins.c | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 4fd0db4819..4dae296f05 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1546,16 +1546,18 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .finalizeFunc = avgFinalize, .invertFunc = avgInvertFunction, .combineFunc = avgCombine, + .pPartialFunc = "_avg_partial", + .pMergeFunc = "_avg_merge" }, { .name = "_avg_partial", .type = FUNCTION_TYPE_AVG_PARTIAL, .classification = FUNC_MGT_AGG_FUNC, - .translateFunc = translateInNumOutDou, + .translateFunc = translateAvgPartial, .getEnvFunc = getAvgFuncEnv, .initFunc = avgFunctionSetup, .processFunc = avgFunction, - .finalizeFunc = avgFinalize, + .finalizeFunc = avgPartialFinalize, .invertFunc = avgInvertFunction, .combineFunc = avgCombine, }, @@ -1563,10 +1565,10 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "_avg_merge", .type = FUNCTION_TYPE_AVG_MERGE, .classification = FUNC_MGT_AGG_FUNC, - .translateFunc = translateInNumOutDou, + .translateFunc = translateAvgMerge, .getEnvFunc = getAvgFuncEnv, .initFunc = avgFunctionSetup, - .processFunc = avgFunction, + .processFunc = avgFunctionMerge, .finalizeFunc = avgFinalize, .invertFunc = avgInvertFunction, .combineFunc = avgCombine, From a17c6e3c33b895865e120ba146b86c7c208f20ae Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 14 Jun 2022 15:47:56 +0800 Subject: [PATCH 5/5] fix bugs --- source/libs/function/src/builtinsimpl.c | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index fe9b4e1c3a..5ee9dbdee6 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -774,8 +774,8 @@ _avg_over: } static void avgTransferInfo(SAvgRes* pInput, SAvgRes* pOutput) { - int16_t type = pInput->type; - if (IS_INTEGER_TYPE(type)) { + pOutput->type = pInput->type; + if (IS_INTEGER_TYPE(pOutput->type)) { pOutput->sum.isum += pInput->sum.isum; } else { pOutput->sum.dsum += pInput->sum.dsum; @@ -792,11 +792,10 @@ int32_t avgFunctionMerge(SqlFunctionCtx* pCtx) { ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY); SAvgRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - SAvgRes* pInputInfo; int32_t start = pInput->startRowIndex; char* data = colDataGetData(pCol, start); - pInputInfo = (SAvgRes*)varDataVal(data); + SAvgRes* pInputInfo = (SAvgRes*)varDataVal(data); avgTransferInfo(pInputInfo, pInfo); @@ -889,8 +888,8 @@ int32_t avgCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SInputColumnInfoData* pInput = &pCtx->input; - int32_t type = pInput->pData[0]->info.type; SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + int32_t type = pAvgRes->type; if (IS_INTEGER_TYPE(type)) { pAvgRes->result = pAvgRes->sum.isum / ((double)pAvgRes->count); @@ -3292,11 +3291,10 @@ int32_t spreadFunctionMerge(SqlFunctionCtx* pCtx) { ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY); SSpreadInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - SSpreadInfo* pInputInfo; int32_t start = pInput->startRowIndex; char* data = colDataGetData(pCol, start); - pInputInfo = (SSpreadInfo*)varDataVal(data); + SSpreadInfo* pInputInfo = (SSpreadInfo*)varDataVal(data); spreadTransferInfo(pInputInfo, pInfo);