Merge pull request #12376 from taosdata/feature/3.0_glzhao
feat(query): add csum function
This commit is contained in:
commit
aba7e4a6f6
|
@ -95,6 +95,9 @@ bool stateFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
|||
int32_t stateCountFunction(SqlFunctionCtx* pCtx);
|
||||
int32_t stateDurationFunction(SqlFunctionCtx* pCtx);
|
||||
|
||||
bool getCsumFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
int32_t csumFunction(SqlFunctionCtx* pCtx);
|
||||
|
||||
bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -308,6 +308,37 @@ static int32_t translateStateDuration(SFunctionNode* pFunc, char* pErrBuf, int32
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateCsum(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
|
||||
if (QUERY_NODE_COLUMN != nodeType(pPara)) {
|
||||
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||
"The input parameter of CSUM function can only be column");
|
||||
}
|
||||
|
||||
uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
||||
uint8_t resType;
|
||||
if (!IS_NUMERIC_TYPE(colType)) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
} else {
|
||||
if (IS_SIGNED_NUMERIC_TYPE(colType)) {
|
||||
resType = TSDB_DATA_TYPE_BIGINT;
|
||||
} else if (IS_UNSIGNED_NUMERIC_TYPE(colType)) {
|
||||
resType = TSDB_DATA_TYPE_UBIGINT;
|
||||
} else if (IS_FLOAT_TYPE(colType)) {
|
||||
resType = TSDB_DATA_TYPE_DOUBLE;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
|
||||
pFunc->node.resType = (SDataType) { .bytes = tDataTypes[resType].bytes, .type = resType};
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateLastRow(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
// todo
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -742,6 +773,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.processFunc = stateDurationFunction,
|
||||
.finalizeFunc = NULL
|
||||
},
|
||||
{
|
||||
.name = "csum",
|
||||
.type = FUNCTION_TYPE_CSUM,
|
||||
.classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
||||
.translateFunc = translateCsum,
|
||||
.getEnvFunc = getCsumFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
.processFunc = csumFunction,
|
||||
.finalizeFunc = NULL
|
||||
},
|
||||
{
|
||||
.name = "abs",
|
||||
.type = FUNCTION_TYPE_ABS,
|
||||
|
|
|
@ -2818,7 +2818,6 @@ int32_t stateCountFunction(SqlFunctionCtx* pCtx) {
|
|||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
|
||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||
SColumnInfoData* pTsOutput = pCtx->pTsOutput;
|
||||
|
||||
int32_t numOfElems = 0;
|
||||
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
|
||||
|
@ -2856,7 +2855,6 @@ int32_t stateDurationFunction(SqlFunctionCtx* pCtx) {
|
|||
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
|
||||
|
||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||
SColumnInfoData* pTsOutput = pCtx->pTsOutput;
|
||||
|
||||
int32_t numOfElems = 0;
|
||||
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
|
||||
|
@ -2896,3 +2894,58 @@ int32_t stateDurationFunction(SqlFunctionCtx* pCtx) {
|
|||
|
||||
return numOfElems;
|
||||
}
|
||||
|
||||
bool getCsumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||
pEnv->calcMemSize = sizeof(SSumRes);
|
||||
return true;
|
||||
}
|
||||
|
||||
int32_t csumFunction(SqlFunctionCtx* pCtx) {
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
SSumRes* pSumRes = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
|
||||
|
||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||
SColumnInfoData* pTsOutput = pCtx->pTsOutput;
|
||||
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
|
||||
|
||||
int32_t numOfElems = 0;
|
||||
int32_t type = pInputCol->info.type;
|
||||
int32_t startOffset = pCtx->offset;
|
||||
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
|
||||
int32_t pos = startOffset + numOfElems;
|
||||
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
||||
//colDataAppendNULL(pOutput, i);
|
||||
continue;
|
||||
}
|
||||
|
||||
char* data = colDataGetData(pInputCol, i);
|
||||
if (IS_SIGNED_NUMERIC_TYPE(type)) {
|
||||
int64_t v;
|
||||
GET_TYPED_DATA(v, int64_t, type, data);
|
||||
pSumRes->isum += v;
|
||||
colDataAppend(pOutput, pos, (char *)&pSumRes->isum, false);
|
||||
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
|
||||
uint64_t v;
|
||||
GET_TYPED_DATA(v, uint64_t, type, data);
|
||||
pSumRes->usum += v;
|
||||
colDataAppend(pOutput, pos, (char *)&pSumRes->usum, false);
|
||||
} else if (IS_FLOAT_TYPE(type)) {
|
||||
double v;
|
||||
GET_TYPED_DATA(v, double, type, data);
|
||||
pSumRes->dsum += v;
|
||||
colDataAppend(pOutput, pos, (char *)&pSumRes->dsum, false);
|
||||
}
|
||||
|
||||
//TODO: remove this after pTsOutput is handled
|
||||
if (pTsOutput != NULL) {
|
||||
colDataAppendInt64(pTsOutput, pos, &tsList[i]);
|
||||
}
|
||||
|
||||
numOfElems++;
|
||||
}
|
||||
|
||||
return numOfElems;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue