Merge pull request #14888 from taosdata/feat/agg_client_api

feat(query): add sum/count/max/min function scalar version
This commit is contained in:
Ganlin Zhao 2022-07-14 19:09:30 +08:00 committed by GitHub
commit 98e542f884
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 292 additions and 34 deletions

View File

@ -143,6 +143,86 @@ typedef struct {
} \
} while (0)
#define SET_TYPED_DATA_MIN(_v, _type) \
do { \
switch (_type) { \
case TSDB_DATA_TYPE_BOOL: \
case TSDB_DATA_TYPE_TINYINT: \
*(int8_t *)(_v) = INT8_MIN; \
break; \
case TSDB_DATA_TYPE_SMALLINT: \
*(int16_t *)(_v) = INT16_MIN; \
break; \
case TSDB_DATA_TYPE_INT: \
*(int32_t *)(_v) = INT32_MIN; \
break; \
case TSDB_DATA_TYPE_BIGINT: \
case TSDB_DATA_TYPE_TIMESTAMP: \
*(int64_t *)(_v) = INT64_MIN; \
break; \
case TSDB_DATA_TYPE_FLOAT: \
*(float *)(_v) = FLT_MIN; \
break; \
case TSDB_DATA_TYPE_DOUBLE: \
*(double *)(_v) = DBL_MIN; \
break; \
case TSDB_DATA_TYPE_UTINYINT: \
*(uint8_t *)(_v) = 0; \
break; \
case TSDB_DATA_TYPE_USMALLINT: \
*(uint16_t *)(_v) = 0; \
break; \
case TSDB_DATA_TYPE_UBIGINT: \
*(uint64_t *)(_v) = 0; \
break; \
case TSDB_DATA_TYPE_UINT: \
*(uint32_t *)(_v) = 0; \
break; \
default: \
break; \
} \
} while (0)
#define SET_TYPED_DATA_MAX(_v, _type) \
do { \
switch (_type) { \
case TSDB_DATA_TYPE_BOOL: \
case TSDB_DATA_TYPE_TINYINT: \
*(int8_t *)(_v) = INT8_MAX; \
break; \
case TSDB_DATA_TYPE_SMALLINT: \
*(int16_t *)(_v) = INT16_MAX; \
break; \
case TSDB_DATA_TYPE_INT: \
*(int32_t *)(_v) = INT32_MAX; \
break; \
case TSDB_DATA_TYPE_BIGINT: \
case TSDB_DATA_TYPE_TIMESTAMP: \
*(int64_t *)(_v) = INT64_MAX; \
break; \
case TSDB_DATA_TYPE_FLOAT: \
*(float *)(_v) = FLT_MAX; \
break; \
case TSDB_DATA_TYPE_DOUBLE: \
*(double *)(_v) = DBL_MAX; \
break; \
case TSDB_DATA_TYPE_UTINYINT: \
*(uint8_t *)(_v) = UINT8_MAX; \
break; \
case TSDB_DATA_TYPE_USMALLINT: \
*(uint16_t *)(_v) = UINT16_MAX; \
break; \
case TSDB_DATA_TYPE_UINT: \
*(uint32_t *)(_v) = UINT32_MAX; \
break; \
case TSDB_DATA_TYPE_UBIGINT: \
*(uint64_t *)(_v) = UINT64_MAX; \
break; \
default: \
break; \
} \
} while (0)
#define NUM_TO_STRING(_inputType, _input, _outputBytes, _output) \
do { \
switch (_inputType) { \

View File

@ -96,6 +96,12 @@ int32_t qEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
/* Aggregation functions */
int32_t countScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t sumScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t minScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t maxScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
#ifdef __cplusplus
}
#endif

View File

@ -1886,6 +1886,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getCountFuncEnv,
.initFunc = functionSetup,
.processFunc = countFunction,
.sprocessFunc = countScalarFunction,
.finalizeFunc = functionFinalize,
.invertFunc = countInvertFunction,
.combineFunc = combineFunction,
@ -1901,6 +1902,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getSumFuncEnv,
.initFunc = functionSetup,
.processFunc = sumFunction,
.sprocessFunc = sumScalarFunction,
.finalizeFunc = functionFinalize,
.invertFunc = sumInvertFunction,
.combineFunc = sumCombine,
@ -1916,6 +1918,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getMinmaxFuncEnv,
.initFunc = minmaxFunctionSetup,
.processFunc = minFunction,
.sprocessFunc = minScalarFunction,
.finalizeFunc = minmaxFunctionFinalize,
.combineFunc = minCombine,
.pPartialFunc = "min",
@ -1930,6 +1933,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getMinmaxFuncEnv,
.initFunc = minmaxFunctionSetup,
.processFunc = maxFunction,
.sprocessFunc = maxScalarFunction,
.finalizeFunc = minmaxFunctionFinalize,
.combineFunc = maxCombine,
.pPartialFunc = "max",

View File

@ -728,8 +728,9 @@ EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) {
res->translate = true;
if (colDataIsNull_s(output.columnData, 0)) {
res->node.resType.type = TSDB_DATA_TYPE_NULL;
res->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_NULL].bytes;
res->isNull = true;
//res->node.resType.type = TSDB_DATA_TYPE_NULL;
//res->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_NULL].bytes;
} else {
res->node.resType.type = output.columnData->info.type;
res->node.resType.bytes = output.columnData->info.bytes;

View File

@ -702,6 +702,7 @@ int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
return TSDB_CODE_SUCCESS;
}
/** Conversion functions **/
int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
int16_t inputType = GET_PARAM_TYPE(&pInput[0]);
int16_t inputLen = GET_PARAM_BYTES(&pInput[0]);
@ -1164,6 +1165,7 @@ int32_t toJsonFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
return TSDB_CODE_SUCCESS;
}
/** Time functions **/
int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
int32_t type = GET_PARAM_TYPE(&pInput[0]);
@ -1736,3 +1738,168 @@ int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pO
pOutput->numOfRows += pInput->numOfRows;
return TSDB_CODE_SUCCESS;
}
/** Aggregation functions **/
int32_t countScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
SColumnInfoData *pInputData = pInput->columnData;
SColumnInfoData *pOutputData = pOutput->columnData;
int64_t *out = (int64_t *)pOutputData->pData;
*out = 0;
for (int32_t i = 0; i < pInput->numOfRows; ++i) {
if (colDataIsNull_s(pInputData, i)) {
colDataAppendNULL(pOutputData, i);
break;
}
(*out)++;
}
pOutput->numOfRows = pInput->numOfRows;
return TSDB_CODE_SUCCESS;
}
int32_t sumScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
SColumnInfoData *pInputData = pInput->columnData;
SColumnInfoData *pOutputData = pOutput->columnData;
int32_t type = GET_PARAM_TYPE(pInput);
for (int32_t i = 0; i < pInput->numOfRows; ++i) {
if (colDataIsNull_s(pInputData, i)) {
colDataAppendNULL(pOutputData, i);
break;
}
if (IS_SIGNED_NUMERIC_TYPE(type)) {
int64_t *in = (int64_t *)pInputData->pData;
int64_t *out = (int64_t *)pOutputData->pData;
*out += in[i];
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
uint64_t *in = (uint64_t *)pInputData->pData;
uint64_t *out = (uint64_t *)pOutputData->pData;
*out += in[i];
} else if (IS_FLOAT_TYPE(type)) {
double *in = (double *)pInputData->pData;
double *out = (double *)pOutputData->pData;
*out += in[i];
}
}
pOutput->numOfRows = pInput->numOfRows;
return TSDB_CODE_SUCCESS;
}
static int32_t doMinMaxScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, bool isMinFunc) {
SColumnInfoData *pInputData = pInput->columnData;
SColumnInfoData *pOutputData = pOutput->columnData;
int32_t type = GET_PARAM_TYPE(pInput);
if (isMinFunc) {
SET_TYPED_DATA_MAX(pOutputData->pData, type);
} else {
SET_TYPED_DATA_MIN(pOutputData->pData, type);
}
for (int32_t i = 0; i < pInput->numOfRows; ++i) {
if (colDataIsNull_s(pInputData, i)) {
colDataAppendNULL(pOutputData, i);
break;
}
switch(type) {
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT: {
int8_t *in = (int8_t *)pInputData->pData;
int8_t *out = (int8_t *)pOutputData->pData;
if((in[i] > *out) ^ isMinFunc) {
*out = in[i];
}
break;
}
case TSDB_DATA_TYPE_SMALLINT: {
int16_t *in = (int16_t *)pInputData->pData;
int16_t *out = (int16_t *)pOutputData->pData;
if((in[i] > *out) ^ isMinFunc) {
*out = in[i];
}
break;
}
case TSDB_DATA_TYPE_INT: {
int32_t *in = (int32_t *)pInputData->pData;
int32_t *out = (int32_t *)pOutputData->pData;
if((in[i] > *out) ^ isMinFunc) {
*out = in[i];
}
break;
}
case TSDB_DATA_TYPE_BIGINT: {
int64_t *in = (int64_t *)pInputData->pData;
int64_t *out = (int64_t *)pOutputData->pData;
if((in[i] > *out) ^ isMinFunc) {
*out = in[i];
}
break;
}
case TSDB_DATA_TYPE_UTINYINT: {
uint8_t *in = (uint8_t *)pInputData->pData;
uint8_t *out = (uint8_t *)pOutputData->pData;
if((in[i] > *out) ^ isMinFunc) {
*out = in[i];
}
break;
}
case TSDB_DATA_TYPE_USMALLINT: {
uint16_t *in = (uint16_t *)pInputData->pData;
uint16_t *out = (uint16_t *)pOutputData->pData;
if((in[i] > *out) ^ isMinFunc) {
*out = in[i];
}
break;
}
case TSDB_DATA_TYPE_UINT: {
uint32_t *in = (uint32_t *)pInputData->pData;
uint32_t *out = (uint32_t *)pOutputData->pData;
if((in[i] > *out) ^ isMinFunc) {
*out = in[i];
}
break;
}
case TSDB_DATA_TYPE_UBIGINT: {
uint64_t *in = (uint64_t *)pInputData->pData;
uint64_t *out = (uint64_t *)pOutputData->pData;
if((in[i] > *out) ^ isMinFunc) {
*out = in[i];
}
break;
}
case TSDB_DATA_TYPE_FLOAT: {
float *in = (float *)pInputData->pData;
float *out = (float *)pOutputData->pData;
if((in[i] > *out) ^ isMinFunc) {
*out = in[i];
}
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
double *in = (double *)pInputData->pData;
double *out = (double *)pOutputData->pData;
if((in[i] > *out) ^ isMinFunc) {
*out = in[i];
}
break;
}
}
}
pOutput->numOfRows = pInput->numOfRows;
return TSDB_CODE_SUCCESS;
}
int32_t minScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
return doMinMaxScalarFunction(pInput, inputNum, pOutput, true);
}
int32_t maxScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
return doMinMaxScalarFunction(pInput, inputNum, pOutput, false);
}