From b8a4878b0005c6aee9baa61724abfe038d9a1daf Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Fri, 15 Jul 2022 14:34:40 +0800 Subject: [PATCH] fix(query): add leastsquares function scalar version --- include/libs/scalar/scalar.h | 1 + source/libs/function/src/builtins.c | 1 + source/libs/scalar/src/sclfunc.c | 168 ++++++++++++++++++++++++++++ 3 files changed, 170 insertions(+) diff --git a/include/libs/scalar/scalar.h b/include/libs/scalar/scalar.h index 7c23184d93..d5e9a2e625 100644 --- a/include/libs/scalar/scalar.h +++ b/include/libs/scalar/scalar.h @@ -103,6 +103,7 @@ int32_t minScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam * int32_t maxScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t avgScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t stddevScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t leastSQRScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); #ifdef __cplusplus } diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 747b1c8d9f..6fb808a2e8 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1981,6 +1981,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getLeastSQRFuncEnv, .initFunc = leastSQRFunctionSetup, .processFunc = leastSQRFunction, + .sprocessFunc = leastSQRScalarFunction, .finalizeFunc = leastSQRFinalize, .invertFunc = NULL, .combineFunc = leastSQRCombine, diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index c64b1d79ba..ba35356a9c 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -2139,3 +2139,171 @@ int32_t stddevScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara return TSDB_CODE_SUCCESS; } +#define LEASTSQR_CAL(p, x, y, index, step) \ + do { \ + (p)[0][0] += (double)(x) * (x); \ + (p)[0][1] += (double)(x); \ + (p)[0][2] += (double)(x) * (y)[index]; \ + (p)[1][2] += (y)[index]; \ + (x) += step; \ + } while (0) + +int32_t leastSQRScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + SColumnInfoData *pInputData = pInput->columnData; + SColumnInfoData *pOutputData = pOutput->columnData; + + double startVal, stepVal; + double matrix[2][3] = {0}; + GET_TYPED_DATA(startVal, double, GET_PARAM_TYPE(&pInput[1]), pInput[1].columnData->pData); + GET_TYPED_DATA(stepVal, double, GET_PARAM_TYPE(&pInput[2]), pInput[2].columnData->pData); + + int32_t type = GET_PARAM_TYPE(pInput); + int64_t count = 0; + + switch(type) { + case TSDB_DATA_TYPE_TINYINT: { + int8_t *in = (int8_t *)pInputData->pData; + for (int32_t i = 0; i < pInput->numOfRows; ++i) { + if (colDataIsNull_s(pInputData, i)) { + continue; + } + + count++; + LEASTSQR_CAL(matrix, startVal, in, i, stepVal); + } + break; + } + case TSDB_DATA_TYPE_SMALLINT: { + int16_t *in = (int16_t *)pInputData->pData; + for (int32_t i = 0; i < pInput->numOfRows; ++i) { + if (colDataIsNull_s(pInputData, i)) { + continue; + } + + count++; + LEASTSQR_CAL(matrix, startVal, in, i, stepVal); + } + break; + } + case TSDB_DATA_TYPE_INT: { + int32_t *in = (int32_t *)pInputData->pData; + for (int32_t i = 0; i < pInput->numOfRows; ++i) { + if (colDataIsNull_s(pInputData, i)) { + continue; + } + + count++; + LEASTSQR_CAL(matrix, startVal, in, i, stepVal); + } + break; + } + case TSDB_DATA_TYPE_BIGINT: { + int64_t *in = (int64_t *)pInputData->pData; + for (int32_t i = 0; i < pInput->numOfRows; ++i) { + if (colDataIsNull_s(pInputData, i)) { + continue; + } + + count++; + LEASTSQR_CAL(matrix, startVal, in, i, stepVal); + } + break; + } + case TSDB_DATA_TYPE_UTINYINT: { + uint8_t *in = (uint8_t *)pInputData->pData; + for (int32_t i = 0; i < pInput->numOfRows; ++i) { + if (colDataIsNull_s(pInputData, i)) { + continue; + } + + count++; + LEASTSQR_CAL(matrix, startVal, in, i, stepVal); + } + break; + } + case TSDB_DATA_TYPE_USMALLINT: { + uint16_t *in = (uint16_t *)pInputData->pData; + for (int32_t i = 0; i < pInput->numOfRows; ++i) { + if (colDataIsNull_s(pInputData, i)) { + continue; + } + + count++; + LEASTSQR_CAL(matrix, startVal, in, i, stepVal); + } + break; + } + case TSDB_DATA_TYPE_UINT: { + uint32_t *in = (uint32_t *)pInputData->pData; + for (int32_t i = 0; i < pInput->numOfRows; ++i) { + if (colDataIsNull_s(pInputData, i)) { + continue; + } + + count++; + LEASTSQR_CAL(matrix, startVal, in, i, stepVal); + } + break; + } + case TSDB_DATA_TYPE_UBIGINT: { + uint64_t *in = (uint64_t *)pInputData->pData; + for (int32_t i = 0; i < pInput->numOfRows; ++i) { + if (colDataIsNull_s(pInputData, i)) { + continue; + } + + count++; + LEASTSQR_CAL(matrix, startVal, in, i, stepVal); + } + break; + } + case TSDB_DATA_TYPE_FLOAT: { + float *in = (float *)pInputData->pData; + for (int32_t i = 0; i < pInput->numOfRows; ++i) { + if (colDataIsNull_s(pInputData, i)) { + continue; + } + + count++; + LEASTSQR_CAL(matrix, startVal, in, i, stepVal); + } + break; + } + case TSDB_DATA_TYPE_DOUBLE: { + double *in = (double *)pInputData->pData; + for (int32_t i = 0; i < pInput->numOfRows; ++i) { + if (colDataIsNull_s(pInputData, i)) { + continue; + } + + count++; + LEASTSQR_CAL(matrix, startVal, in, i, stepVal); + } + break; + } + } + + if (count == 0) { + colDataAppendNULL(pOutputData, 0); + } else { + matrix[1][1] = (double)count; + matrix[1][0] = matrix[0][1]; + + double matrix00 = matrix[0][0] - matrix[1][0] * (matrix[0][1] / matrix[1][1]); + double matrix02 = matrix[0][2] - matrix[1][2] * (matrix[0][1] / matrix[1][1]); + double matrix12 = matrix[1][2] - matrix02 * (matrix[1][0] / matrix00); + matrix02 /= matrix00; + + matrix12 /= matrix[1][1]; + + char buf[64] = {0}; + size_t len = + snprintf(varDataVal(buf), sizeof(buf) - VARSTR_HEADER_SIZE, "{slop:%.6lf, intercept:%.6lf}", matrix02, matrix12); + varDataSetLen(buf, len); + colDataAppend(pOutputData, 0, buf, false); + + } + + pOutput->numOfRows = 1; + return TSDB_CODE_SUCCESS; +}