diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 2f31d57907..1bb30934c6 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -55,7 +55,9 @@ int32_t maxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); bool getAvgFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool avgFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t avgFunction(SqlFunctionCtx* pCtx); +int32_t avgFunctionMerge(SqlFunctionCtx* pCtx); int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +int32_t avgPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t avgInvertFunction(SqlFunctionCtx* pCtx); int32_t avgCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t getAvgInfoSize(); diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index ee47b1aa7f..fe9b4e1c3a 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -51,6 +51,7 @@ typedef struct SAvgRes { double result; SSumRes sum; int64_t count; + int16_t type; // store the original input type, used in merge function } SAvgRes; typedef struct STuplePos { @@ -649,6 +650,7 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) { int32_t type = pInput->pData[0]->info.type; SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + pAvgRes->type = type; // computing based on the true data block SColumnInfoData* pCol = pInput->pData[0]; @@ -771,6 +773,38 @@ _avg_over: return TSDB_CODE_SUCCESS; } +static void avgTransferInfo(SAvgRes* pInput, SAvgRes* pOutput) { + int16_t type = pInput->type; + if (IS_INTEGER_TYPE(type)) { + pOutput->sum.isum += pInput->sum.isum; + } else { + pOutput->sum.dsum += pInput->sum.dsum; + } + + pOutput->count += pInput->count; + + return; +} + +int32_t avgFunctionMerge(SqlFunctionCtx* pCtx) { + SInputColumnInfoData* pInput = &pCtx->input; + SColumnInfoData* pCol = pInput->pData[0]; + ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY); + + SAvgRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + SAvgRes* pInputInfo; + + int32_t start = pInput->startRowIndex; + char* data = colDataGetData(pCol, start); + pInputInfo = (SAvgRes*)varDataVal(data); + + avgTransferInfo(pInputInfo, pInfo); + + SET_VAL(GET_RES_INFO(pCtx), 1, 1); + + return TSDB_CODE_SUCCESS; +} + #define LIST_AVG_N(sumT, T) \ do { \ T* plist = (T*)pCol->pData; \ @@ -872,6 +906,24 @@ int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return functionFinalize(pCtx, pBlock); } +int32_t avgPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SAvgRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + int32_t resultBytes = getAvgInfoSize(); + char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); + + memcpy(varDataVal(res), pInfo, resultBytes); + varDataSetLen(res, resultBytes); + + int32_t slotId = pCtx->pExpr->base.resSchema.slotId; + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); + + colDataAppend(pCol, pBlock->info.rows, res, false); + + taosMemoryFree(res); + return pResInfo->numOfRes; +} + EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) { return FUNC_DATA_REQUIRED_STATIS_LOAD; }