Merge pull request #13820 from taosdata/enh/avg_stddev_split
enh(query): enable stddev function distributed splitting
This commit is contained in:
commit
2830d238ae
|
@ -144,6 +144,8 @@ typedef enum EFunctionType {
|
||||||
FUNCTION_TYPE_LAST_MERGE,
|
FUNCTION_TYPE_LAST_MERGE,
|
||||||
FUNCTION_TYPE_AVG_PARTIAL,
|
FUNCTION_TYPE_AVG_PARTIAL,
|
||||||
FUNCTION_TYPE_AVG_MERGE,
|
FUNCTION_TYPE_AVG_MERGE,
|
||||||
|
FUNCTION_TYPE_STDDEV_PARTIAL,
|
||||||
|
FUNCTION_TYPE_STDDEV_MERGE,
|
||||||
|
|
||||||
// user defined function
|
// user defined function
|
||||||
FUNCTION_TYPE_UDF = 10000
|
FUNCTION_TYPE_UDF = 10000
|
||||||
|
|
|
@ -65,9 +65,12 @@ int32_t getAvgInfoSize();
|
||||||
bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
int32_t stddevFunction(SqlFunctionCtx* pCtx);
|
int32_t stddevFunction(SqlFunctionCtx* pCtx);
|
||||||
|
int32_t stddevFunctionMerge(SqlFunctionCtx* pCtx);
|
||||||
int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
int32_t stddevPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
int32_t stddevInvertFunction(SqlFunctionCtx* pCtx);
|
int32_t stddevInvertFunction(SqlFunctionCtx* pCtx);
|
||||||
int32_t stddevCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
int32_t stddevCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||||
|
int32_t getStddevInfoSize();
|
||||||
|
|
||||||
bool getLeastSQRFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getLeastSQRFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
bool leastSQRFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
bool leastSQRFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
|
|
|
@ -184,6 +184,35 @@ static int32_t translateAvgMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t le
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
 * Translate (type-check) the internal "_stddev_partial" aggregate.
 *
 * Expects exactly one parameter whose result type is numeric or NULL.
 * On success the function's result type is set to BINARY, sized to hold
 * one stddev intermediate state (getStddevInfoSize()) plus the var-string
 * length header.
 *
 * Returns TSDB_CODE_SUCCESS, or the code produced by the parameter
 * number/type error-message helpers (which also fill pErrBuf).
 */
static int32_t translateStddevPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
  if (LIST_LENGTH(pFunc->pParameterList) != 1) {
    return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
  }

  SExprNode* pArg = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0);
  uint8_t    argType = pArg->resType.type;
  if (!(IS_NUMERIC_TYPE(argType) || IS_NULL_TYPE(argType))) {
    return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
  }

  // The partial result travels as an opaque var-length binary blob.
  pFunc->node.resType = (SDataType){.type = TSDB_DATA_TYPE_BINARY, .bytes = getStddevInfoSize() + VARSTR_HEADER_SIZE};
  return TSDB_CODE_SUCCESS;
}
|
||||||
|
|
||||||
|
/*
 * Translate (type-check) the internal "_stddev_merge" aggregate.
 *
 * Expects exactly one parameter of type BINARY (the blob emitted by the
 * partial stage). On success the function's result type is set to DOUBLE.
 *
 * Returns TSDB_CODE_SUCCESS, or the code produced by the parameter
 * number/type error-message helpers (which also fill pErrBuf).
 */
static int32_t translateStddevMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
  if (LIST_LENGTH(pFunc->pParameterList) != 1) {
    return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
  }

  SExprNode* pArg = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0);
  uint8_t    argType = pArg->resType.type;
  if (argType != TSDB_DATA_TYPE_BINARY) {
    return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
  }

  // The merged stage yields the final standard deviation as a double.
  pFunc->node.resType = (SDataType){.type = TSDB_DATA_TYPE_DOUBLE, .bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes};
  return TSDB_CODE_SUCCESS;
}
|
||||||
|
|
||||||
static int32_t translateWduration(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
static int32_t translateWduration(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
// pseudo column do not need to check parameters
|
// pseudo column do not need to check parameters
|
||||||
pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT};
|
pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT};
|
||||||
|
@ -1522,6 +1551,32 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.finalizeFunc = stddevFinalize,
|
.finalizeFunc = stddevFinalize,
|
||||||
.invertFunc = stddevInvertFunction,
|
.invertFunc = stddevInvertFunction,
|
||||||
.combineFunc = stddevCombine,
|
.combineFunc = stddevCombine,
|
||||||
|
.pPartialFunc = "_stddev_partial",
|
||||||
|
.pMergeFunc = "_stddev_merge"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
.name = "_stddev_partial",
|
||||||
|
.type = FUNCTION_TYPE_STDDEV_PARTIAL,
|
||||||
|
.classification = FUNC_MGT_AGG_FUNC,
|
||||||
|
.translateFunc = translateStddevPartial,
|
||||||
|
.getEnvFunc = getStddevFuncEnv,
|
||||||
|
.initFunc = stddevFunctionSetup,
|
||||||
|
.processFunc = stddevFunction,
|
||||||
|
.finalizeFunc = stddevPartialFinalize,
|
||||||
|
.invertFunc = stddevInvertFunction,
|
||||||
|
.combineFunc = stddevCombine,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
.name = "_stddev_merge",
|
||||||
|
.type = FUNCTION_TYPE_STDDEV_MERGE,
|
||||||
|
.classification = FUNC_MGT_AGG_FUNC,
|
||||||
|
.translateFunc = translateStddevMerge,
|
||||||
|
.getEnvFunc = getStddevFuncEnv,
|
||||||
|
.initFunc = stddevFunctionSetup,
|
||||||
|
.processFunc = stddevFunctionMerge,
|
||||||
|
.finalizeFunc = stddevFinalize,
|
||||||
|
.invertFunc = stddevInvertFunction,
|
||||||
|
.combineFunc = stddevCombine,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
.name = "leastsquares",
|
.name = "leastsquares",
|
||||||
|
|
|
@ -89,6 +89,7 @@ typedef struct SStddevRes {
|
||||||
double dsum;
|
double dsum;
|
||||||
int64_t isum;
|
int64_t isum;
|
||||||
};
|
};
|
||||||
|
int16_t type;
|
||||||
} SStddevRes;
|
} SStddevRes;
|
||||||
|
|
||||||
typedef struct SLeastSQRInfo {
|
typedef struct SLeastSQRInfo {
|
||||||
|
@ -1488,6 +1489,8 @@ int32_t maxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||||
return minMaxCombine(pDestCtx, pSourceCtx, 0);
|
return minMaxCombine(pDestCtx, pSourceCtx, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Size in bytes of the stddev intermediate state (SStddevRes), as an int32_t. */
int32_t getStddevInfoSize() {
  int32_t stddevInfoBytes = (int32_t)sizeof(SStddevRes);
  return stddevInfoBytes;
}
|
||||||
|
|
||||||
bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||||
pEnv->calcMemSize = sizeof(SStddevRes);
|
pEnv->calcMemSize = sizeof(SStddevRes);
|
||||||
return true;
|
return true;
|
||||||
|
@ -1511,6 +1514,7 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) {
|
||||||
int32_t type = pInput->pData[0]->info.type;
|
int32_t type = pInput->pData[0]->info.type;
|
||||||
|
|
||||||
SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
pStddevRes->type = type;
|
||||||
|
|
||||||
// computing based on the true data block
|
// computing based on the true data block
|
||||||
SColumnInfoData* pCol = pInput->pData[0];
|
SColumnInfoData* pCol = pInput->pData[0];
|
||||||
|
@ -1627,6 +1631,39 @@ _stddev_over:
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
 * Fold one stddev intermediate state into another.
 *
 * pOutput takes over pInput's value type, then pInput's row count, running
 * sum and quadratic sum are added onto pOutput. The integer members
 * (isum / quadraticISum) are used when the type is an integer type,
 * otherwise the double members (dsum / quadraticDSum).
 */
static void stddevTransferInfo(SStddevRes* pInput, SStddevRes* pOutput) {
  pOutput->type = pInput->type;
  pOutput->count = pOutput->count + pInput->count;

  if (!IS_INTEGER_TYPE(pOutput->type)) {
    pOutput->quadraticDSum = pOutput->quadraticDSum + pInput->quadraticDSum;
    pOutput->dsum = pOutput->dsum + pInput->dsum;
  } else {
    pOutput->quadraticISum = pOutput->quadraticISum + pInput->quadraticISum;
    pOutput->isum = pOutput->isum + pInput->isum;
  }
}
|
||||||
|
|
||||||
|
/*
 * Process function of the "_stddev_merge" aggregate.
 *
 * The single input column must be BINARY; each row carries one serialized
 * SStddevRes (after the var-string header) produced by the partial stage.
 * Every row of the current input range is folded into this context's
 * intermediate buffer via stddevTransferInfo().
 *
 * The previous implementation merged only the row at startRowIndex, so any
 * further partial states in the same input range were silently dropped.
 *
 * NOTE(review): rows are assumed non-NULL, as in the original single-row
 * code path — confirm against the partial finalizer / executor.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t stddevFunctionMerge(SqlFunctionCtx* pCtx) {
  SInputColumnInfoData* pInput = &pCtx->input;
  SColumnInfoData*      pCol = pInput->pData[0];
  ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY);

  SStddevRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));

  int32_t start = pInput->startRowIndex;
  int32_t end = start + pInput->numOfRows;
  for (int32_t i = start; i < end; ++i) {
    char* data = colDataGetData(pCol, i);

    // The payload sits right after the var-string header and need not be
    // suitably aligned for SStddevRes, so copy it out before reading fields.
    SStddevRes inputInfo;
    memcpy(&inputInfo, varDataVal(data), sizeof(inputInfo));

    stddevTransferInfo(&inputInfo, pInfo);
  }

  SET_VAL(GET_RES_INFO(pCtx), 1, 1);

  return TSDB_CODE_SUCCESS;
}
|
||||||
|
|
||||||
#define LIST_STDDEV_SUB_N(sumT, T) \
|
#define LIST_STDDEV_SUB_N(sumT, T) \
|
||||||
do { \
|
do { \
|
||||||
T* plist = (T*)pCol->pData; \
|
T* plist = (T*)pCol->pData; \
|
||||||
|
@ -1692,9 +1729,10 @@ int32_t stddevInvertFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
|
||||||
int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
SInputColumnInfoData* pInput = &pCtx->input;
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
int32_t type = pInput->pData[0]->info.type;
|
|
||||||
SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
int32_t type = pStddevRes->type;
|
||||||
double avg;
|
double avg;
|
||||||
|
|
||||||
if (IS_INTEGER_TYPE(type)) {
|
if (IS_INTEGER_TYPE(type)) {
|
||||||
avg = pStddevRes->isum / ((double)pStddevRes->count);
|
avg = pStddevRes->isum / ((double)pStddevRes->count);
|
||||||
pStddevRes->result = sqrt(pStddevRes->quadraticISum / ((double)pStddevRes->count) - avg * avg);
|
pStddevRes->result = sqrt(pStddevRes->quadraticISum / ((double)pStddevRes->count) - avg * avg);
|
||||||
|
@ -1706,6 +1744,24 @@ int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
return functionFinalize(pCtx, pBlock);
|
return functionFinalize(pCtx, pBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
 * Finalizer of the "_stddev_partial" aggregate.
 *
 * Serializes this context's SStddevRes intermediate state as a var-length
 * binary value (length header followed by the raw struct bytes) and appends
 * it as a non-NULL value at row pBlock->info.rows of the result column
 * selected by the expression's result slot id.
 *
 * The staging buffer lives on the stack: its size is a small compile-time
 * constant, and the previous heap allocation was used without a NULL check,
 * so an allocation failure would have been dereferenced by memcpy.
 *
 * Returns the number of results recorded in the result-row entry info.
 */
int32_t stddevPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
  SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
  SStddevRes*          pInfo = GET_ROWCELL_INTERBUF(pResInfo);

  int32_t resultBytes = getStddevInfoSize();
  char    res[sizeof(SStddevRes) + VARSTR_HEADER_SIZE] = {0};

  memcpy(varDataVal(res), pInfo, resultBytes);
  varDataSetLen(res, resultBytes);

  int32_t          slotId = pCtx->pExpr->base.resSchema.slotId;
  SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);

  colDataAppend(pCol, pBlock->info.rows, res, false);

  return pResInfo->numOfRes;
}
|
||||||
|
|
||||||
int32_t stddevCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
int32_t stddevCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||||
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
||||||
SStddevRes* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
|
SStddevRes* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
|
||||||
|
|
Loading…
Reference in New Issue