feat(query): add leastsquares function
This commit is contained in:
parent
b3df34ef08
commit
79570c93a3
|
@ -31,13 +31,13 @@ typedef enum EFunctionType {
|
|||
FUNCTION_TYPE_ELAPSED,
|
||||
FUNCTION_TYPE_IRATE,
|
||||
FUNCTION_TYPE_LAST_ROW,
|
||||
FUNCTION_TYPE_LEASTSQUARES,
|
||||
FUNCTION_TYPE_MAX,
|
||||
FUNCTION_TYPE_MIN,
|
||||
FUNCTION_TYPE_MODE,
|
||||
FUNCTION_TYPE_PERCENTILE,
|
||||
FUNCTION_TYPE_SPREAD,
|
||||
FUNCTION_TYPE_STDDEV,
|
||||
FUNCTION_TYPE_LEASTSQUARES,
|
||||
FUNCTION_TYPE_SUM,
|
||||
FUNCTION_TYPE_TWA,
|
||||
FUNCTION_TYPE_HISTOGRAM,
|
||||
|
|
|
@ -55,6 +55,12 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx);
|
|||
int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
int32_t stddevInvertFunction(SqlFunctionCtx* pCtx);
|
||||
|
||||
bool getLeastSQRFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
bool leastSQRFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||
int32_t leastSQRFunction(SqlFunctionCtx* pCtx);
|
||||
int32_t leastSQRFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
int32_t leastSQRInvertFunction(SqlFunctionCtx* pCtx);
|
||||
|
||||
bool getPercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||
int32_t percentileFunction(SqlFunctionCtx *pCtx);
|
||||
|
|
|
@ -225,6 +225,23 @@ static int32_t translateSpread(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateLeastSQR(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
|
||||
if (3 != numOfParams) {
|
||||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < numOfParams; ++i) {
|
||||
uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, i))->resType.type;
|
||||
if (!IS_NUMERIC_TYPE(colType)) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
}
|
||||
|
||||
pFunc->node.resType = (SDataType) { .bytes = 64, .type = TSDB_DATA_TYPE_BINARY };
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateHistogram(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
if (4 != LIST_LENGTH(pFunc->pParameterList)) {
|
||||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
|
@ -535,6 +552,17 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.finalizeFunc = stddevFinalize,
|
||||
.invertFunc = stddevInvertFunction
|
||||
},
|
||||
{
|
||||
.name = "leastsquares",
|
||||
.type = FUNCTION_TYPE_LEASTSQUARES,
|
||||
.classification = FUNC_MGT_AGG_FUNC,
|
||||
.translateFunc = translateLeastSQR,
|
||||
.getEnvFunc = getLeastSQRFuncEnv,
|
||||
.initFunc = leastSQRFunctionSetup,
|
||||
.processFunc = leastSQRFunction,
|
||||
.finalizeFunc = leastSQRFinalize,
|
||||
.invertFunc = leastSQRInvertFunction
|
||||
},
|
||||
{
|
||||
.name = "avg",
|
||||
.type = FUNCTION_TYPE_AVG,
|
||||
|
|
|
@ -63,6 +63,13 @@ typedef struct SStddevRes {
|
|||
};
|
||||
} SStddevRes;
|
||||
|
||||
typedef struct SLeastSQRInfo {
|
||||
double matrix[2][3];
|
||||
double startVal;
|
||||
double stepVal;
|
||||
int64_t num;
|
||||
} SLeastSQRInfo;
|
||||
|
||||
typedef struct SPercentileInfo {
|
||||
double result;
|
||||
tMemBucket* pMemBucket;
|
||||
|
@ -1112,6 +1119,179 @@ int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
return functionFinalize(pCtx, pBlock);
|
||||
}
|
||||
|
||||
bool getLeastSQRFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
pEnv->calcMemSize = sizeof(SLeastSQRInfo);
|
||||
return true;
|
||||
}
|
||||
|
||||
bool leastSQRFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||
if (!functionSetup(pCtx, pResultInfo)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
SLeastSQRInfo* pInfo = GET_ROWCELL_INTERBUF(pResultInfo);
|
||||
|
||||
pInfo->startVal = pCtx->param[1].param.d;
|
||||
pInfo->stepVal = pCtx->param[2].param.d;
|
||||
return true;
|
||||
}
|
||||
|
||||
#define LEASTSQR_CAL(p, x, y, index, step) \
|
||||
do { \
|
||||
(p)[0][0] += (double)(x) * (x); \
|
||||
(p)[0][1] += (double)(x); \
|
||||
(p)[0][2] += (double)(x) * (y)[index]; \
|
||||
(p)[1][2] += (y)[index]; \
|
||||
(x) += step; \
|
||||
} while (0)
|
||||
|
||||
int32_t leastSQRFunction(SqlFunctionCtx* pCtx) {
|
||||
int32_t numOfElem = 0;
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
int32_t type = pInput->pData[0]->info.type;
|
||||
|
||||
SLeastSQRInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
|
||||
SColumnInfoData* pCol = pInput->pData[0];
|
||||
|
||||
double(*param)[3] = pInfo->matrix;
|
||||
double x = pInfo->startVal;
|
||||
|
||||
int32_t start = pInput->startRowIndex;
|
||||
int32_t numOfRows = pInput->numOfRows;
|
||||
|
||||
switch (type) {
|
||||
case TSDB_DATA_TYPE_TINYINT: {
|
||||
int8_t* plist = (int8_t*)pCol->pData;
|
||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
numOfElem++;
|
||||
LEASTSQR_CAL(param, x, plist, i, pInfo->stepVal);
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
case TSDB_DATA_TYPE_SMALLINT: {
|
||||
int16_t* plist = (int16_t*)pCol->pData;
|
||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
numOfElem++;
|
||||
LEASTSQR_CAL(param, x, plist, i, pInfo->stepVal);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_INT: {
|
||||
int32_t* plist = (int32_t*)pCol->pData;
|
||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
numOfElem++;
|
||||
LEASTSQR_CAL(param, x, plist, i, pInfo->stepVal);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_BIGINT: {
|
||||
int64_t* plist = (int64_t*)pCol->pData;
|
||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
numOfElem++;
|
||||
LEASTSQR_CAL(param, x, plist, i, pInfo->stepVal);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_FLOAT: {
|
||||
float* plist = (float*)pCol->pData;
|
||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
numOfElem++;
|
||||
LEASTSQR_CAL(param, x, plist, i, pInfo->stepVal);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_DOUBLE: {
|
||||
double* plist = (double*)pCol->pData;
|
||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
numOfElem++;
|
||||
LEASTSQR_CAL(param, x, plist, i, pInfo->stepVal);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
pInfo->startVal = x;
|
||||
pInfo->num += numOfElem;
|
||||
|
||||
SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t leastSQRFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
SLeastSQRInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
|
||||
int32_t currentRow = pBlock->info.rows;
|
||||
|
||||
if (pInfo->num = 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
double(*param)[3] = pInfo->matrix;
|
||||
|
||||
param[1][1] = (double)pInfo->num;
|
||||
param[1][0] = param[0][1];
|
||||
|
||||
param[0][0] -= param[1][0] * (param[0][1] / param[1][1]);
|
||||
param[0][2] -= param[1][2] * (param[0][1] / param[1][1]);
|
||||
param[0][1] = 0;
|
||||
param[1][2] -= param[0][2] * (param[1][0] / param[0][0]);
|
||||
param[1][0] = 0;
|
||||
param[0][2] /= param[0][0];
|
||||
|
||||
param[1][2] /= param[1][1];
|
||||
|
||||
char buf[64] = {0};
|
||||
size_t len = snprintf(varDataVal(buf), sizeof(buf) - VARSTR_HEADER_SIZE, "{slop:%.6lf, intercept:%.6lf}", param[0][2], param[1][2]);
|
||||
varDataSetLen(buf, len);
|
||||
|
||||
colDataAppend(pCol, currentRow, buf, false);
|
||||
|
||||
return pResInfo->numOfRes;
|
||||
}
|
||||
|
||||
int32_t leastSQRInvertFunction(SqlFunctionCtx* pCtx) {
|
||||
//TODO
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
bool getPercentileFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
pEnv->calcMemSize = sizeof(SPercentileInfo);
|
||||
return true;
|
||||
|
@ -2184,10 +2364,10 @@ int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
int32_t len;
|
||||
char buf[512] = {0};
|
||||
if (!pInfo->normalized) {
|
||||
len = sprintf(buf + VARSTR_HEADER_SIZE, "{\"lower_bin\":%g, \"upper_bin\":%g, \"count\":%"PRId64"}",
|
||||
len = sprintf(varDataVal(buf), "{\"lower_bin\":%g, \"upper_bin\":%g, \"count\":%"PRId64"}",
|
||||
pInfo->bins[i].lower, pInfo->bins[i].upper, pInfo->bins[i].count);
|
||||
} else {
|
||||
len = sprintf(buf + VARSTR_HEADER_SIZE, "{\"lower_bin\":%g, \"upper_bin\":%g, \"count\":%lf}",
|
||||
len = sprintf(varDataVal(buf), "{\"lower_bin\":%g, \"upper_bin\":%g, \"count\":%lf}",
|
||||
pInfo->bins[i].lower, pInfo->bins[i].upper, pInfo->bins[i].percentage);
|
||||
}
|
||||
varDataSetLen(buf, len);
|
||||
|
|
Loading…
Reference in New Issue