From 04e9996db61cde39fdc21c147e9dac8fdf84cb35 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Sat, 18 Jun 2022 20:52:42 +0800 Subject: [PATCH] feat: the function manager adds an interface for manually configuring the parameters of the merge function --- source/libs/function/inc/builtins.h | 30 +++--- source/libs/function/inc/functionMgtInt.h | 2 - source/libs/function/src/builtins.c | 11 ++- source/libs/function/src/builtinsimpl.c | 114 ++++++++++------------ source/libs/function/src/functionMgt.c | 50 +++++++--- 5 files changed, 117 insertions(+), 90 deletions(-) diff --git a/source/libs/function/inc/builtins.h b/source/libs/function/inc/builtins.h index bc91875006..256500ff8c 100644 --- a/source/libs/function/inc/builtins.h +++ b/source/libs/function/inc/builtins.h @@ -24,22 +24,24 @@ extern "C" { typedef int32_t (*FTranslateFunc)(SFunctionNode* pFunc, char* pErrBuf, int32_t len); typedef EFuncDataRequired (*FFuncDataRequired)(SFunctionNode* pFunc, STimeWindow* pTimeWindow); +typedef int32_t (*FCreateMergeFuncParameters)(SNodeList* pRawParameters, SNode* pPartialRes, SNodeList** pParameters); typedef struct SBuiltinFuncDefinition { - const char* name; - EFunctionType type; - uint64_t classification; - FTranslateFunc translateFunc; - FFuncDataRequired dataRequiredFunc; - FExecGetEnv getEnvFunc; - FExecInit initFunc; - FExecProcess processFunc; - FScalarExecProcess sprocessFunc; - FExecFinalize finalizeFunc; - FExecProcess invertFunc; - FExecCombine combineFunc; - const char* pPartialFunc; - const char* pMergeFunc; + const char* name; + EFunctionType type; + uint64_t classification; + FTranslateFunc translateFunc; + FFuncDataRequired dataRequiredFunc; + FExecGetEnv getEnvFunc; + FExecInit initFunc; + FExecProcess processFunc; + FScalarExecProcess sprocessFunc; + FExecFinalize finalizeFunc; + FExecProcess invertFunc; + FExecCombine combineFunc; + const char* pPartialFunc; + const char* pMergeFunc; + FCreateMergeFuncParameters createMergeParaFuc; } SBuiltinFuncDefinition; extern const SBuiltinFuncDefinition funcMgtBuiltins[]; diff --git a/source/libs/function/inc/functionMgtInt.h b/source/libs/function/inc/functionMgtInt.h index c02c09e18a..44e1e6f7b1 100644 --- a/source/libs/function/inc/functionMgtInt.h +++ b/source/libs/function/inc/functionMgtInt.h @@ -51,8 +51,6 @@ extern "C" { #define FUNC_UDF_ID_START 5000 -extern const int funcMgtUdfNum; - #ifdef __cplusplus } #endif diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index d2985a33a8..b5904b6b81 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -419,6 +419,14 @@ static int32_t translateTopBot(SFunctionNode* pFunc, char* pErrBuf, int32_t len) return TSDB_CODE_SUCCESS; } +int32_t topCreateMergePara(SNodeList* pRawParameters, SNode* pPartialRes, SNodeList** pParameters) { + int32_t code = nodesListMakeAppend(pParameters, pPartialRes); + if (TSDB_CODE_SUCCESS == code) { + code = nodesListStrictAppend(*pParameters, nodesCloneNode(nodesListGetNode(pRawParameters, 1))); + } + return TSDB_CODE_SUCCESS; +} + static int32_t translateTopBotImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) { int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList); @@ -1728,7 +1736,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .finalizeFunc = topBotFinalize, .combineFunc = topCombine, .pPartialFunc = "_top_partial", - .pMergeFunc = "_top_merge" + .pMergeFunc = "_top_merge", + .createMergeParaFuc = topCreateMergePara }, { .name = "_top_partial", diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 55900c7c7e..583e8bd300 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -794,8 +794,8 @@ int32_t avgFunctionMerge(SqlFunctionCtx* pCtx) { SAvgRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - int32_t start = pInput->startRowIndex; - char* data = colDataGetData(pCol, start); + int32_t start = pInput->startRowIndex; + char* data = colDataGetData(pCol, start); SAvgRes* pInputInfo = (SAvgRes*)varDataVal(data); avgTransferInfo(pInputInfo, pInfo); @@ -908,9 +908,9 @@ int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t avgPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); - SAvgRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - int32_t resultBytes = getAvgInfoSize(); - char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); + SAvgRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + int32_t resultBytes = getAvgInfoSize(); + char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); memcpy(varDataVal(res), pInfo, resultBytes); varDataSetLen(res, resultBytes); @@ -1418,7 +1418,7 @@ int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { void setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex) { for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) { SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j]; - int32_t dstSlotId = pc->pExpr->base.resSchema.slotId; + int32_t dstSlotId = pc->pExpr->base.resSchema.slotId; SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId); colDataAppendNULL(pDstCol, rowIndex); @@ -1663,8 +1663,8 @@ int32_t stddevFunctionMerge(SqlFunctionCtx* pCtx) { SStddevRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - int32_t start = pInput->startRowIndex; - char* data = colDataGetData(pCol, start); + int32_t start = pInput->startRowIndex; + char* data = colDataGetData(pCol, start); SStddevRes* pInputInfo = (SStddevRes*)varDataVal(data); stddevTransferInfo(pInputInfo, pInfo); @@ -1756,9 +1756,9 @@ int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t stddevPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); - SStddevRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - int32_t resultBytes = getStddevInfoSize(); - char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); + SStddevRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + int32_t resultBytes = getStddevInfoSize(); + char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); memcpy(varDataVal(res), pInfo, resultBytes); varDataSetLen(res, resultBytes); @@ -2360,12 +2360,10 @@ int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) return TSDB_CODE_SUCCESS; } -int32_t getFirstLastInfoSize(int32_t resBytes) { - return sizeof(SFirstLastRes) + resBytes + sizeof(int64_t); -} +int32_t getFirstLastInfoSize(int32_t resBytes) { return sizeof(SFirstLastRes) + resBytes + sizeof(int64_t); } bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { - SColumnNode* pNode = (SColumnNode *)nodesListGetNode(pFunc->pParameterList, 0); + SColumnNode* pNode = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0); pEnv->calcMemSize = sizeof(SFirstLastRes) + pNode->node.resType.bytes + sizeof(int64_t); return true; } @@ -2390,14 +2388,14 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { int32_t numOfElems = 0; SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); - SFirstLastRes *pInfo = GET_ROWCELL_INTERBUF(pResInfo); + SFirstLastRes* pInfo = GET_ROWCELL_INTERBUF(pResInfo); SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pInputCol = pInput->pData[0]; - int32_t type = pInputCol->info.type; + int32_t type = pInputCol->info.type; int32_t bytes = pInputCol->info.bytes; - pInfo->bytes = bytes; + pInfo->bytes = bytes; // All null data column, return directly. if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) { @@ -2488,14 +2486,14 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { int32_t numOfElems = 0; SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); - SFirstLastRes *pInfo = GET_ROWCELL_INTERBUF(pResInfo); + SFirstLastRes* pInfo = GET_ROWCELL_INTERBUF(pResInfo); SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pInputCol = pInput->pData[0]; - int32_t type = pInputCol->info.type; + int32_t type = pInputCol->info.type; int32_t bytes = pInputCol->info.bytes; - pInfo->bytes = bytes; + pInfo->bytes = bytes; // All null data column, return directly. if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) { @@ -2567,8 +2565,8 @@ static void firstLastTransferInfo(SFirstLastRes* pInput, SFirstLastRes* pOutput, return; } pOutput->bytes = pInput->bytes; - TSKEY *tsIn = (TSKEY*)(pInput->buf + pInput->bytes); - TSKEY *tsOut = (TSKEY*)(pOutput->buf + pInput->bytes); + TSKEY* tsIn = (TSKEY*)(pInput->buf + pInput->bytes); + TSKEY* tsOut = (TSKEY*)(pOutput->buf + pInput->bytes); if (pOutput->hasResult) { if (isFirst) { if (*tsIn > *tsOut) { @@ -2586,16 +2584,16 @@ static void firstLastTransferInfo(SFirstLastRes* pInput, SFirstLastRes* pOutput, return; } -static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx *pCtx, bool isFirstQuery) { +static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx* pCtx, bool isFirstQuery) { SInputColumnInfoData* pInput = &pCtx->input; - SColumnInfoData* pCol = pInput->pData[0]; + SColumnInfoData* pCol = pInput->pData[0]; ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY); SFirstLastRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - int32_t start = pInput->startRowIndex; - char* data = colDataGetData(pCol, start); - SFirstLastRes* pInputInfo = (SFirstLastRes *)varDataVal(data); + int32_t start = pInput->startRowIndex; + char* data = colDataGetData(pCol, start); + SFirstLastRes* pInputInfo = (SFirstLastRes*)varDataVal(data); firstLastTransferInfo(pInputInfo, pInfo, isFirstQuery); @@ -2604,13 +2602,9 @@ static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx *pCtx, bool isFirstQuer return TSDB_CODE_SUCCESS; } -int32_t firstFunctionMerge(SqlFunctionCtx *pCtx) { - return firstLastFunctionMergeImpl(pCtx, true); -} +int32_t firstFunctionMerge(SqlFunctionCtx* pCtx) { return firstLastFunctionMergeImpl(pCtx, true); } -int32_t lastFunctionMerge(SqlFunctionCtx *pCtx) { - return firstLastFunctionMergeImpl(pCtx, false); -} +int32_t lastFunctionMerge(SqlFunctionCtx* pCtx) { return firstLastFunctionMergeImpl(pCtx, false); } int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t slotId = pCtx->pExpr->base.resSchema.slotId; @@ -2627,9 +2621,9 @@ int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); - SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - int32_t resultBytes = getFirstLastInfoSize(pRes->bytes); - char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); + SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + int32_t resultBytes = getFirstLastInfoSize(pRes->bytes); + char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); memcpy(varDataVal(res), pRes, resultBytes); varDataSetLen(res, resultBytes); @@ -3369,8 +3363,8 @@ int32_t spreadFunctionMerge(SqlFunctionCtx* pCtx) { SSpreadInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - int32_t start = pInput->startRowIndex; - char* data = colDataGetData(pCol, start); + int32_t start = pInput->startRowIndex; + char* data = colDataGetData(pCol, start); SSpreadInfo* pInputInfo = (SSpreadInfo*)varDataVal(data); spreadTransferInfo(pInputInfo, pInfo); @@ -3390,9 +3384,9 @@ int32_t spreadFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t spreadPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); - SSpreadInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - int32_t resultBytes = getSpreadInfoSize(); - char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); + SSpreadInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + int32_t resultBytes = getSpreadInfoSize(); + char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); memcpy(varDataVal(res), pInfo, resultBytes); varDataSetLen(res, resultBytes); @@ -3484,8 +3478,8 @@ int32_t elapsedFunction(SqlFunctionCtx* pCtx) { SColumnInfoData* pCol = pInput->pData[0]; - int32_t start = pInput->startRowIndex; - TSKEY* ptsList = (int64_t*)colDataGetData(pCol, 0); + int32_t start = pInput->startRowIndex; + TSKEY* ptsList = (int64_t*)colDataGetData(pCol, 0); if (pCtx->order == TSDB_ORDER_DESC) { if (pCtx->start.key == INT64_MIN) { pInfo->max = @@ -5176,11 +5170,11 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { } typedef struct SDerivInfo { - double prevValue; // previous value - TSKEY prevTs; // previous timestamp - bool ignoreNegative;// ignore the negative value - int64_t tsWindow; // time window for derivative - bool valueSet; // the value has been set already + double prevValue; // previous value + TSKEY prevTs; // previous timestamp + bool ignoreNegative; // ignore the negative value + int64_t tsWindow; // time window for derivative + bool valueSet; // the value has been set already } SDerivInfo; bool getDerivativeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) { @@ -5188,7 +5182,7 @@ bool getDerivativeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) { return true; } -bool derivativeFuncSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo) { +bool derivativeFuncSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { if (!functionSetup(pCtx, pResInfo)) { return false; // not initialized since it has been initialized } @@ -5196,25 +5190,25 @@ bool derivativeFuncSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo) { SDerivInfo* pDerivInfo = GET_ROWCELL_INTERBUF(pResInfo); pDerivInfo->ignoreNegative = pCtx->param[2].param.i; - pDerivInfo->prevTs = -1; + pDerivInfo->prevTs = -1; pDerivInfo->tsWindow = pCtx->param[1].param.i; pDerivInfo->valueSet = false; return true; } -int32_t derivativeFunction(SqlFunctionCtx *pCtx) { - SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); - SDerivInfo* pDerivInfo = GET_ROWCELL_INTERBUF(pResInfo); +int32_t derivativeFunction(SqlFunctionCtx* pCtx) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SDerivInfo* pDerivInfo = GET_ROWCELL_INTERBUF(pResInfo); SInputColumnInfoData* pInput = &pCtx->input; - SColumnInfoData* pInputCol = pInput->pData[0]; + SColumnInfoData* pInputCol = pInput->pData[0]; - int32_t numOfElems = 0; + int32_t numOfElems = 0; SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; SColumnInfoData* pTsOutput = pCtx->pTsOutput; int32_t i = pInput->startRowIndex; - TSKEY* tsList = (int64_t*)pInput->pPTS->pData; + TSKEY* tsList = (int64_t*)pInput->pPTS->pData; double v = 0; @@ -5224,7 +5218,7 @@ int32_t derivativeFunction(SqlFunctionCtx *pCtx) { continue; } - char* d = (char*)pInputCol->pData + pInputCol->info.bytes * i; + char* d = (char*)pInputCol->pData + pInputCol->info.bytes * i; GET_TYPED_DATA(v, double, pInputCol->info.type, d); int32_t pos = pCtx->offset + numOfElems; @@ -5251,7 +5245,7 @@ int32_t derivativeFunction(SqlFunctionCtx *pCtx) { continue; } - char* d = (char*)pInputCol->pData + pInputCol->info.bytes * i; + char* d = (char*)pInputCol->pData + pInputCol->info.bytes * i; GET_TYPED_DATA(v, double, pInputCol->info.type, d); int32_t pos = pCtx->offset + numOfElems; @@ -5277,7 +5271,7 @@ int32_t derivativeFunction(SqlFunctionCtx *pCtx) { return numOfElems; } -int32_t interpFunction(SqlFunctionCtx *pCtx) { +int32_t interpFunction(SqlFunctionCtx* pCtx) { #if 0 int32_t fillType = (int32_t) pCtx->param[2].i64; //bool ascQuery = (pCtx->order == TSDB_ORDER_ASC); diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index 7dbe456d4a..afbb63114d 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -261,14 +261,14 @@ static SFunctionNode* createFunction(const char* pName, SNodeList* pParameterLis return pFunc; } -static SColumnNode* createColumnByFunc(const SFunctionNode* pFunc) { +static SNode* createColumnByFunc(const SFunctionNode* pFunc) { SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); if (NULL == pCol) { return NULL; } strcpy(pCol->colName, pFunc->node.aliasName); pCol->node.resType = pFunc->node.resType; - return pCol; + return (SNode*)pCol; } bool fmIsDistExecFunc(int32_t funcId) { @@ -296,21 +296,45 @@ static int32_t createPartialFunction(const SFunctionNode* pSrcFunc, SFunctionNod return TSDB_CODE_SUCCESS; } +static int32_t createMergeFuncPara(const SFunctionNode* pSrcFunc, const SFunctionNode* pPartialFunc, + SNodeList** pParameterList) { + SNode* pRes = createColumnByFunc(pPartialFunc); + if (NULL != funcMgtBuiltins[pSrcFunc->funcId].createMergeParaFuc) { + return funcMgtBuiltins[pSrcFunc->funcId].createMergeParaFuc(pSrcFunc->pParameterList, pRes, pParameterList); + } else { + return nodesListMakeStrictAppend(pParameterList, pRes); + } +} + static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctionNode* pPartialFunc, SFunctionNode** pMergeFunc) { - SNodeList* pParameterList = NULL; - nodesListMakeStrictAppend(&pParameterList, (SNode*)createColumnByFunc(pPartialFunc)); - *pMergeFunc = createFunction(funcMgtBuiltins[pSrcFunc->funcId].pMergeFunc, pParameterList); - if (NULL == *pMergeFunc) { + SNodeList* pParameterList = NULL; + SFunctionNode* pFunc = NULL; + + int32_t code = createMergeFuncPara(pSrcFunc, pPartialFunc, &pParameterList); + if (TSDB_CODE_SUCCESS == code) { + pFunc = createFunction(funcMgtBuiltins[pSrcFunc->funcId].pMergeFunc, pParameterList); + if (NULL == pFunc) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + if (TSDB_CODE_SUCCESS == code) { + // overwrite function restype set by translate function + if (fmIsSameInOutType(pSrcFunc->funcId)) { + (*pMergeFunc)->node.resType = pSrcFunc->node.resType; + } + strcpy((*pMergeFunc)->node.aliasName, pSrcFunc->node.aliasName); + } + + if (TSDB_CODE_SUCCESS == code) { + *pMergeFunc = pFunc; + } else { + pFunc->pParameterList = NULL; + nodesDestroyNode((SNode*)pFunc); nodesDestroyList(pParameterList); - return TSDB_CODE_OUT_OF_MEMORY; } - // overwrite function restype set by translate function - if (fmIsSameInOutType(pSrcFunc->funcId)) { - (*pMergeFunc)->node.resType = pSrcFunc->node.resType; - } - strcpy((*pMergeFunc)->node.aliasName, pSrcFunc->node.aliasName); - return TSDB_CODE_SUCCESS; + + return code; } int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc) {