feat: greatest func

This commit is contained in:
factosea 2025-03-05 18:28:48 +08:00
parent 38cbb5e08e
commit 36183aa36b
10 changed files with 222 additions and 4 deletions

View File

@ -299,6 +299,7 @@ extern bool tsStreamCoverage;
extern int8_t tsS3EpNum;
extern int32_t tsStreamNotifyMessageSize;
extern int32_t tsStreamNotifyFrameSize;
extern bool tsTransToStrWhenMixTypeInLeast;
extern bool tsExperimental;
// #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)

View File

@ -276,6 +276,9 @@ typedef struct {
#define IS_STR_DATA_TYPE(t) \
(((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_VARBINARY) || ((t) == TSDB_DATA_TYPE_NCHAR))
#define IS_COMPARE_STR_DATA_TYPE(t) \
(((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_NCHAR))
#define IS_VALID_TINYINT(_t) ((_t) >= INT8_MIN && (_t) <= INT8_MAX)
#define IS_VALID_SMALLINT(_t) ((_t) >= INT16_MIN && (_t) <= INT16_MAX)
#define IS_VALID_INT(_t) ((_t) >= INT32_MIN && (_t) <= INT32_MAX)

View File

@ -90,6 +90,8 @@ typedef enum EFunctionType {
FUNCTION_TYPE_DEGREES,
FUNCTION_TYPE_RADIANS,
FUNCTION_TYPE_TRUNCATE,
FUNCTION_TYPE_GREATEST,
FUNCTION_TYPE_LEAST,
// string function
FUNCTION_TYPE_LENGTH = 1500,

View File

@ -66,6 +66,8 @@ int32_t filterPartitionCond(SNode **pCondition, SNode **pPrimaryKeyCond, SNode *
SNode **pOtherCond);
int32_t filterIsMultiTableColsCond(SNode *pCond, bool *res);
EConditionType filterClassifyCondition(SNode *pNode);
int32_t filterGetCompFunc(__compar_fn_t *func, int32_t type, int32_t optr);
bool filterDoCompare(__compar_fn_t func, uint8_t optr, void *left, void *right);
#ifdef __cplusplus
}

View File

@ -44,6 +44,7 @@ int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type, int8_
int32_t vectorGetConvertType(int32_t type1, int32_t type2);
int32_t vectorConvertSingleColImpl(const SScalarParam *pIn, SScalarParam *pOut, int32_t *overflow, int32_t startIndex, int32_t numOfRows);
int32_t vectorConvertSingleCol(SScalarParam *input, SScalarParam *output, int32_t type, int32_t startIndex, int32_t numOfRows);
/* Math functions */
int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
@ -71,6 +72,8 @@ int32_t signFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
int32_t degreesFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t radiansFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t randFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t greatestFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t leastFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
/* String functions */
int32_t lengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);

View File

@ -130,6 +130,8 @@ uint32_t tsEncryptionKeyChksum = 0;
int8_t tsEncryptionKeyStat = ENCRYPT_KEY_STAT_UNSET;
int8_t tsGrant = 1;
bool tsTransToStrWhenMixTypeInLeast = true;
// monitor
bool tsEnableMonitor = true;
int32_t tsMonitorInterval = 30;
@ -746,6 +748,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
TAOS_CHECK_RETURN(
cfgAddBool(pCfg, "streamCoverage", tsStreamCoverage, CFG_DYN_CLIENT, CFG_DYN_CLIENT, CFG_CATEGORY_LOCAL));
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "transToStrWhenMixTypeInLeast", tsTransToStrWhenMixTypeInLeast, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT,CFG_CATEGORY_LOCAL));
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
@ -1480,6 +1484,9 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "streamCoverage");
tsStreamCoverage = pItem->bval;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "transToStrWhenMixTypeInLeast");
tsTransToStrWhenMixTypeInLeast = pItem->bval;
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
@ -2783,7 +2790,8 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) {
{"numOfRpcSessions", &tsNumOfRpcSessions},
{"bypassFlag", &tsBypassFlag},
{"safetyCheckLevel", &tsSafetyCheckLevel},
{"streamCoverage", &tsStreamCoverage}};
{"streamCoverage", &tsStreamCoverage},
{"transToStrWhenMixTypeInLeast", &tsTransToStrWhenMixTypeInLeast}};
if ((code = taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true)) != TSDB_CODE_SUCCESS) {
code = taosCfgSetOption(options, tListLen(options), pItem, false);

View File

@ -22,6 +22,9 @@
#include "tanalytics.h"
#include "taoserror.h"
#include "ttime.h"
#include "functionMgt.h"
#include "ttypes.h"
#include "tglobal.h"
static int32_t buildFuncErrMsg(char* pErrBuf, int32_t len, int32_t errCode, const char* pFormat, ...) {
va_list vArgList;
@ -1745,6 +1748,49 @@ static int32_t translateHistogramPartial(SFunctionNode* pFunc, char* pErrBuf, in
return TSDB_CODE_SUCCESS;
}
static int32_t translateGreatestleast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
if (LIST_LENGTH(pFunc->pParameterList) < 2) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
bool mixTypeToStrings = tsTransToStrWhenMixTypeInLeast;
SDataType res = {.type = 0};
for (int32_t i = 0; i < LIST_LENGTH(pFunc->pParameterList); i++) {
SDataType* para = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, i));
if (IS_NULL_TYPE(para->type)) {
res.type = TSDB_DATA_TYPE_NULL;
break;
} else if (IS_MATHABLE_TYPE(para->type)) {
if(res.type == 0) {
res.type = para->type;
res.bytes = para->bytes;
} else if(IS_MATHABLE_TYPE(res.type) || !mixTypeToStrings) {
int32_t resType = vectorGetConvertType(res.type, para->type);
res.type = resType == 0 ? res.type : resType;
res.bytes = tDataTypes[resType].bytes;
}
} else if (IS_COMPARE_STR_DATA_TYPE(para->type)) {
if(res.type == 0) {
res.type = para->type;
res.bytes = para->bytes;
} else if(IS_COMPARE_STR_DATA_TYPE(res.type)) {
int32_t resType = vectorGetConvertType(res.type, para->type);
res.type = resType == 0 ? res.type : resType;
res.bytes = TMAX(res.bytes, para->bytes);
} else if(mixTypeToStrings) { // res.type is mathable type
res.type = para->type;
res.bytes = para->bytes;
}
} else {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
}
pFunc->node.resType = res;
return TSDB_CODE_SUCCESS;
}
// clang-format off
const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
@ -5656,6 +5702,26 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.name = "cols",
.translateFunc = invalidColsFunction,
},
{
.name = "greatest",
.type = FUNCTION_TYPE_GREATEST,
.classification = FUNC_MGT_SCALAR_FUNC,
.translateFunc = translateGreatestleast,
.getEnvFunc = NULL,
.initFunc = NULL,
.sprocessFunc = greatestFunction,
.finalizeFunc = NULL
},
{
.name = "least",
.type = FUNCTION_TYPE_LEAST,
.classification = FUNC_MGT_SCALAR_FUNC,
.translateFunc = translateGreatestleast,
.getEnvFunc = NULL,
.initFunc = NULL,
.sprocessFunc = leastFunction,
.finalizeFunc = NULL
},
};
// clang-format on

View File

@ -1,11 +1,14 @@
#include <stdint.h>
#include "cJSON.h"
#include "function.h"
#include "scalar.h"
#include "sclInt.h"
#include "sclvector.h"
#include "tdatablock.h"
#include "tdef.h"
#include "tjson.h"
#include "ttime.h"
#include "filter.h"
typedef float (*_float_fn)(float);
typedef float (*_float_fn_2)(float, float);
@ -4403,3 +4406,133 @@ int32_t modeScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam
return selectScalarFunction(pInput, inputNum, pOutput);
}
typedef struct SCovertScarlarParam {
SScalarParam covertParam;
SScalarParam *param;
bool converted;
} SCovertScarlarParam;
void freeSCovertScarlarParams(SCovertScarlarParam *pCovertParams, int32_t num) {
if (pCovertParams == NULL) {
return;
}
for (int32_t i = 0; i < num; i++) {
if (pCovertParams[i].converted) {
sclFreeParam(pCovertParams[i].param);
}
}
taosMemoryFree(pCovertParams);
}
static int32_t vectorCompareAndSelect(SCovertScarlarParam *pParams, int32_t numOfRows, int numOfCols,
int32_t *resultColIndex, EOperatorType optr) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t type = GET_PARAM_TYPE(pParams[0].param);
__compar_fn_t fp = NULL;
code = filterGetCompFunc(&fp, type, optr);
if(code != TSDB_CODE_SUCCESS) {
qError("failed to get compare function, func:%s type:%d, optr:%d", __FUNCTION__, type, optr);
return code;
}
for (int32_t i = 0; i < numOfRows; i++) {
int selectIndex = 0;
if (colDataIsNull_s(pParams[selectIndex].param->columnData, i)) {
resultColIndex[i] = -1;
continue;
}
for (int32_t j = 1; j < numOfCols; j++) {
if (colDataIsNull_s(pParams[j].param->columnData, i)) {
resultColIndex[i] = -1;
break;
} else {
int32_t leftRowNo = pParams[selectIndex].param->numOfRows == 1 ? 0 : i;
int32_t rightRowNo = pParams[j].param->numOfRows == 1 ? 0 : i;
char *pLeftData = colDataGetData(pParams[selectIndex].param->columnData, leftRowNo);
char *pRightData = colDataGetData(pParams[j].param->columnData, rightRowNo);
bool pRes = filterDoCompare(fp, optr, pLeftData, pRightData);
if (!pRes) {
selectIndex = j;
}
}
resultColIndex[i] = selectIndex;
}
}
return code;
}
static int32_t greatestLeastImpl(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, EOperatorType order) {
int32_t code = TSDB_CODE_SUCCESS;
SColumnInfoData *pOutputData = pOutput[0].columnData;
int16_t outputType = GET_PARAM_TYPE(&pOutput[0]);
int64_t outputLen = GET_PARAM_BYTES(&pOutput[0]);
SCovertScarlarParam *pCovertParams = NULL;
int32_t *resultColIndex = NULL;
int32_t numOfRows = 0;
bool IsNullType = false;
// If any column is NULL type, the output is NULL type
for (int32_t i = 0; i < inputNum; i++) {
if (numOfRows != 0 && numOfRows != pInput[i].numOfRows && pInput[i].numOfRows != 1 && numOfRows != 1) {
qError("input rows not match, func:%s, rows:%d, %d", __FUNCTION__, numOfRows, pInput[i].numOfRows);
code = TSDB_CODE_TSC_INTERNAL_ERROR;
goto _return;
}
numOfRows = TMAX(numOfRows, pInput[i].numOfRows);
IsNullType |= IS_NULL_TYPE(GET_PARAM_TYPE(&pInput[i]));
}
if (IsNullType) {
colDataSetNNULL(pOutputData, 0, numOfRows);
pOutput->numOfRows = numOfRows;
return TSDB_CODE_SUCCESS;
}
pCovertParams = taosMemoryMalloc(inputNum * sizeof(SCovertScarlarParam));
for (int32_t j = 0; j < inputNum; j++) {
SScalarParam *pParam = &pInput[j];
int16_t oldType = GET_PARAM_TYPE(&pInput[j]);
if (oldType != outputType) {
pCovertParams[j].covertParam = (SScalarParam){0};
setTzCharset(&pCovertParams[j].covertParam, pParam->tz, pParam->charsetCxt);
SCL_ERR_JRET(vectorConvertSingleCol(pParam, &pCovertParams[j].covertParam, outputType, 0, pParam->numOfRows));
pCovertParams[j].param = &pCovertParams[j].covertParam;
pCovertParams[j].converted = true;
} else {
pCovertParams[j].param = pParam;
pCovertParams[j].converted = false;
}
}
resultColIndex = taosMemoryCalloc(numOfRows, sizeof(int32_t));
SCL_ERR_JRET(vectorCompareAndSelect(pCovertParams, numOfRows, inputNum, resultColIndex, order));
for (int32_t i = 0; i < numOfRows; i++) {
int32_t index = resultColIndex[i];
if (index == -1) {
colDataSetNULL(pOutputData, i);
continue;
}
int32_t rowNo = pCovertParams[index].param->numOfRows == 1 ? 0 : i;
char *data = colDataGetData(pCovertParams[index].param->columnData, rowNo);
SCL_ERR_JRET(colDataSetVal(pOutputData, i, data, false));
}
pOutput->numOfRows = numOfRows;
_return:
freeSCovertScarlarParams(pCovertParams, inputNum);
taosMemoryFree(resultColIndex);
return code;
}
int32_t greatestFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
return greatestLeastImpl(pInput, inputNum, pOutput, OP_TYPE_GREATER_THAN);
}
int32_t leastFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
return greatestLeastImpl(pInput, inputNum, pOutput, OP_TYPE_LOWER_THAN);
}

View File

@ -996,7 +996,7 @@ int32_t vectorConvertSingleColImpl(const SScalarParam *pIn, SScalarParam *pOut,
}
int8_t gConvertTypes[TSDB_DATA_TYPE_MAX][TSDB_DATA_TYPE_MAX] = {
/*NULL BOOL TINY SMAL INT BIG FLOA DOUB VARC TIME NCHA UTIN USMA UINT UBIG JSON VARB DECI BLOB MEDB GEOM*/
/* NULL BOOL TINY SMAL INT BIG FLOA DOUB VARC TIME NCHA UTIN USMA UINT UBIG JSON VARB DECI BLOB MEDB GEOM*/
/*NULL*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
/*BOOL*/ 0, 0, 2, 3, 4, 5, 6, 7, 5, 9, 5, 11, 12, 13, 14, 0, -1, 0, 0, 0, -1,
/*TINY*/ 0, 0, 0, 3, 4, 5, 6, 7, 5, 9, 5, 3, 4, 5, 7, 0, -1, 0, 0, 0, -1,
@ -1021,7 +1021,7 @@ int8_t gConvertTypes[TSDB_DATA_TYPE_MAX][TSDB_DATA_TYPE_MAX] = {
};
int8_t gDisplyTypes[TSDB_DATA_TYPE_MAX][TSDB_DATA_TYPE_MAX] = {
/*NULL BOOL TINY SMAL INT BIGI FLOA DOUB VARC TIM NCHA UTIN USMA UINT UBIG JSON VARB DECI BLOB MEDB GEOM*/
/* NULL BOOL TINY SMAL INT BIGI FLOA DOUB VARC TIM NCHA UTIN USMA UINT UBIG JSON VARB DECI BLOB MEDB GEOM*/
/*NULL*/ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, -1, -1, -1, 20,
/*BOOL*/ 0, 1, 2, 3, 4, 5, 6, 7, 8, 5, 10, 11, 12, 13, 14, -1, -1, -1, -1, -1, -1,
/*TINY*/ 0, 0, 2, 3, 4, 5, 8, 8, 8, 5, 10, 3, 4, 5, 8, -1, -1, -1, -1, -1, -1,

View File

@ -672,7 +672,7 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t
printf("%*.7e", width, GET_FLOAT_VAL(val));
} else {
n = snprintf(buf, LENGTH, "%*.*g", width, FLT_DIG, GET_FLOAT_VAL(val));
if (n > SHELL_FLOAT_WIDTH) {
if (n > width) {
printf("%*.7e", width, GET_FLOAT_VAL(val));
} else {
printf("%s", buf);