decimal scalar comparision

This commit is contained in:
wangjiaming0909 2025-01-23 11:05:16 +08:00
parent 045f5b8c64
commit 38bd11d034
18 changed files with 713 additions and 179 deletions

View File

@ -412,11 +412,13 @@ void *getDataMin(int32_t type, void *value);
void *getDataMax(int32_t type, void *value);
STypeMod typeGetTypeMod(uint8_t type, uint8_t prec, uint8_t scale, int32_t bytes);
STypeMod typeGetTypeModFromDataType(const SDataType* pDataType);
uint8_t decimalTypeFromPrecision(uint8_t precision);
STypeMod decimalCalcTypeMod(uint8_t prec, uint8_t scale);
void decimalFromTypeMod(STypeMod typeMod, uint8_t *precision, uint8_t *scale);
// pType->type should has been set
void fillTypeFromTypeMod(SDataType *pType, STypeMod mod);
void fillTypeFromTypeMod(SDataType *pType, STypeMod mod);
uint8_t getScaleFromTypeMod(int32_t type, STypeMod mod);
#ifdef __cplusplus
}

View File

@ -22,8 +22,8 @@ extern "C" {
#include "tdef.h"
#include "ttypes.h"
typedef struct SValue SValue;
typedef void DecimalType;
typedef struct SValue SValue;
typedef void DecimalType;
typedef struct Decimal64 {
DecimalWord words[1]; // do not touch it directly, use DECIMAL64_GET_VALUE MACRO
@ -43,13 +43,19 @@ typedef struct Decimal128 {
#define decimalFromStr decimal128FromStr
#define makeDecimal makeDecimal128
typedef struct SDecimalCompareCtx {
void* pData;
int8_t type;
STypeMod typeMod;
} SDecimalCompareCtx;
// TODO wjm check if we need to expose these functions in decimal.h
void makeDecimal64(Decimal64* pDec64, int64_t w);
void makeDecimal128(Decimal128* pDec128, int64_t hi, uint64_t low);
#define DECIMAL_WORD_NUM(TYPE) sizeof(TYPE) / sizeof(DecimalWord)
void decimalFromTypeMod(STypeMod typeMod, uint8_t* precision, uint8_t* scale);
void decimalFromTypeMod(STypeMod typeMod, uint8_t* precision, uint8_t* scale);
int32_t decimal64FromStr(const char* str, int32_t len, uint8_t expectPrecision, uint8_t expectScale, Decimal64* result);
int32_t decimal128FromStr(const char* str, int32_t len, uint8_t expectPrecision, uint8_t expectScale,
@ -61,6 +67,7 @@ int32_t decimal128ToDataVal(Decimal128* dec, SValue* pVal);
int32_t decimalToStr(const DecimalType* pDec, int8_t type, int8_t precision, int8_t scale, char* pBuf, int32_t bufLen);
int32_t decimalGetRetType(const SDataType* pLeftT, const SDataType* pRightT, EOperatorType opType, SDataType* pOutType);
bool decimalCompare(EOperatorType op, const SDecimalCompareCtx* pLeft, const SDecimalCompareCtx* pRight);
int32_t decimalOp(EOperatorType op, const SDataType* pLeftT, const SDataType* pRightT, const SDataType* pOutT,
const void* pLeftData, const void* pRightData, void* pOutputData);
int32_t convertToDecimal(const void* pData, const SDataType* pInputType, void* pOut, const SDataType* pOutType);
@ -83,9 +90,19 @@ typedef struct SDecimalOps {
int32_t (*toStr)(const DecimalType* pInt, uint8_t scale, char* pBuf, int32_t bufLen);
} SDecimalOps;
// all these ops only used for comparing decimal types with same scale
SDecimalOps* getDecimalOps(int8_t dataType);
__int128 decimal128ToInt128(const Decimal128* pDec);
int32_t TEST_decimal64From_int64_t(Decimal64* pDec, uint8_t prec, uint8_t scale, int64_t v);
int32_t TEST_decimal64From_uint64_t(Decimal64* pDec, uint8_t prec, uint8_t scale, uint64_t v);
int32_t TEST_decimal64From_double(Decimal64* pDec, uint8_t prec, uint8_t scale, double v);
double TEST_decimal64ToDouble(Decimal64* pDec, uint8_t prec, uint8_t scale);
int32_t TEST_decimal128From_int64_t(Decimal128* pDec, uint8_t prec, uint8_t scale, int64_t v);
int32_t TEST_decimal128From_uint64_t(Decimal128* pDec, uint8_t prec, uint8_t scale, uint64_t v);
int32_t TEST_decimal128From_double(Decimal128* pDec, uint8_t prec, uint8_t scale, double v);
double TEST_decimal128ToDouble(Decimal128* pDec, uint8_t prec, uint8_t scale);
#ifdef __cplusplus
}

View File

@ -291,12 +291,14 @@ struct SScalarParam {
SColumnInfoData *columnData;
SHashObj *pHashFilter;
SHashObj *pHashFilterOthers;
int32_t hashValueType;
int32_t filterValueType;
void *param; // other parameter, such as meta handle from vnode, to extract table name/tag value
int32_t numOfRows;
int32_t numOfQualified; // number of qualified elements in the final results
timezone_t tz;
void *charsetCxt;
SArray *pFilterArr; // for types that can't filter with hash
STypeMod filterValueTypeMod;
};
static inline void setTzCharset(SScalarParam* param, timezone_t tz, void* charsetCxt){

View File

@ -40,11 +40,12 @@ pDst need to freed in caller
int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst);
int32_t scalarGetOperatorParamNum(EOperatorType type);
int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type, int8_t processType);
int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type, STypeMod typeMod, int8_t processType);
int32_t vectorGetConvertType(int32_t type1, int32_t type2);
int32_t vectorConvertSingleColImpl(const SScalarParam *pIn, SScalarParam *pOut, int32_t *overflow, int32_t startIndex, int32_t numOfRows);
int32_t vectorConvertSingleCol(SScalarParam *input, SScalarParam *output, int32_t type, int32_t startIndex, int32_t numOfRows);
STypeMod getConvertTypeMod(int32_t type, const SColumnInfo *pCol1, const SColumnInfo *pCol2);
/* Math functions */
int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);

View File

@ -60,11 +60,13 @@ int32_t setChkInBytes1(const void *pLeft, const void *pRight);
int32_t setChkInBytes2(const void *pLeft, const void *pRight);
int32_t setChkInBytes4(const void *pLeft, const void *pRight);
int32_t setChkInBytes8(const void *pLeft, const void *pRight);
int32_t setChkInDecimalHash(const void* pLeft, const void* pRight);
int32_t setChkNotInBytes1(const void *pLeft, const void *pRight);
int32_t setChkNotInBytes2(const void *pLeft, const void *pRight);
int32_t setChkNotInBytes4(const void *pLeft, const void *pRight);
int32_t setChkNotInBytes8(const void *pLeft, const void *pRight);
int32_t setChkNotInDecimalHash(const void* pLeft, const void* pRight);
int32_t compareChkInString(const void *pLeft, const void *pRight);
int32_t compareChkNotInString(const void *pLeft, const void *pRight);

View File

@ -692,6 +692,7 @@ typedef enum {
#define TSDB_DECIMAL_MAX_PRECISION TSDB_DECIMAL128_MAX_PRECISION
#define TSDB_DECIMAL_MIN_SCALE 0
#define TSDB_DECIMAL_MAX_SCALE TSDB_DECIMAL_MAX_PRECISION
#define GET_DEICMAL_MAX_PRECISION(type) (type) == TSDB_DATA_TYPE_DECIMAL64 ? TSDB_DECIMAL64_MAX_PRECISION : TSDB_DECIMAL_MAX_SCALE
typedef uint64_t DecimalWord;
#define WORD_NUM(TYPE) (sizeof(TYPE) / sizeof(DecimalWord))

View File

@ -3199,11 +3199,10 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, size_t dataBuflen, in
int32_t bytes = pColInfoData->info.bytes;
*((int32_t*)data) = bytes;
if (IS_DECIMAL_TYPE(pColInfoData->info.type)) {
bytes <<= 16;
bytes |= pColInfoData->info.precision;
bytes <<= 8;
bytes |= pColInfoData->info.scale;
*(int32_t*)data = bytes;
*(char*)data = bytes;
*((char*)data + 1) = 0;
*((char*)data + 2) = pColInfoData->info.precision;
*((char*)data + 3) = pColInfoData->info.scale;
}
data += sizeof(int32_t);
}
@ -3337,16 +3336,16 @@ int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos
pStart += sizeof(int8_t);
pColInfoData->info.bytes = *(int32_t*)pStart;
if (IS_DECIMAL_TYPE(pColInfoData->info.type)) {
pColInfoData->info.scale = *(char*)pStart;
pColInfoData->info.precision = *((char*)pStart + 2);
pColInfoData->info.bytes >>= *((char*)pStart + 3);
}
pStart += sizeof(int32_t);
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
pBlock->info.hasVarCol = true;
}
if (IS_DECIMAL_TYPE(pColInfoData->info.type)) {
pColInfoData->info.scale = pColInfoData->info.bytes & 0xFF;
pColInfoData->info.precision = pColInfoData->info.precision = (pColInfoData->info.bytes & 0xFF00) >> 8;
pColInfoData->info.bytes >>= 24;
}
}
int32_t code = blockDataEnsureCapacity(pBlock, numOfRows);
@ -3588,6 +3587,22 @@ int32_t trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolL
j += 1;
}
break;
case TSDB_DATA_TYPE_DECIMAL64:
case TSDB_DATA_TYPE_DECIMAL:
while (j < totalRows) {
if (pBoolList[j] == 0) {
j += 1;
continue;
}
if (colDataIsNull_f(pBitmap, j)) {
colDataSetNull_f(pDst->nullbitmap, numOfRows);
} else {
memcpy(pDst->pData + numOfRows * pDst->info.bytes, pDst->pData + j * pDst->info.bytes, pDst->info.bytes);
}
numOfRows += 1;
j += 1;
}
break;
}
}

View File

@ -247,6 +247,11 @@ void decimalFromTypeMod(STypeMod typeMod, uint8_t* precision, uint8_t* scale) {
if (scale) *scale = (uint8_t)(typeMod & 0xFF);
}
STypeMod typeGetTypeModFromDataType(const SDataType* pDataType) {
if (IS_DECIMAL_TYPE(pDataType->type)) return decimalCalcTypeMod(pDataType->precision, pDataType->scale);
return 0;
}
STypeMod typeGetTypeMod(uint8_t type, uint8_t prec, uint8_t scale, int32_t bytes) {
if (IS_DECIMAL_TYPE(type)) {
return decimalCalcTypeMod(prec, scale);
@ -269,3 +274,8 @@ void extractTypeFromTypeMod(uint8_t type, STypeMod typeMod, uint8_t *prec, uint8
}
if (bytes) *bytes = tDataTypes[type].bytes;
}
uint8_t getScaleFromTypeMod(int32_t type, STypeMod mod) {
if (IS_DECIMAL_TYPE(type)) return (uint8_t)(mod & 0xFF);
return 0;
}

View File

@ -23,6 +23,13 @@ typedef enum DecimalInternalType {
DECIMAL_128 = 1,
} DecimalInternalType;
typedef enum DecimalRoundType {
ROUND_TYPE_CEIL,
ROUND_TYPE_FLOOR,
ROUND_TYPE_TRUNC,
ROUND_TYPE_HALF_ROUND_UP, // TODO wjm use this for scaling down/up
} DecimalRoundType;
#define DECIMAL_GET_INTERNAL_TYPE(dataType) ((dataType) == TSDB_DATA_TYPE_DECIMAL ? DECIMAL_128 : DECIMAL_64)
#define DECIMAL_GET_WORD_NUM(decimalInternalType) \
((decimalInternalType) == DECIMAL_64 ? DECIMAL_WORD_NUM(Decimal64) : DECIMAL_WORD_NUM(Decimal128))
@ -208,25 +215,25 @@ int32_t decimal128ToDataVal(Decimal128* dec, SValue* pVal) {
}
// TODO wjm use uint64_t ???
static Decimal64 SCALE_MULTIPLIER_64[19] = {1LL,
10LL,
100LL,
1000LL,
10000LL,
100000LL,
1000000LL,
10000000LL,
100000000LL,
1000000000LL,
10000000000LL,
100000000000LL,
1000000000000LL,
10000000000000LL,
100000000000000LL,
1000000000000000LL,
10000000000000000LL,
100000000000000000LL,
1000000000000000000LL};
static Decimal64 SCALE_MULTIPLIER_64[TSDB_DECIMAL64_MAX_PRECISION + 1] = {1LL,
10LL,
100LL,
1000LL,
10000LL,
100000LL,
1000000LL,
10000000LL,
100000000LL,
1000000000LL,
10000000000LL,
100000000000LL,
1000000000000LL,
10000000000000LL,
100000000000000LL,
1000000000000000LL,
10000000000000000LL,
100000000000000000LL,
1000000000000000000LL};
static const Decimal64 decimal64Zero = {0};
#define DECIMAL64_ONE SCALE_MULTIPLIER_64[0]
@ -284,7 +291,7 @@ static bool decimal64Eq(const DecimalType* pLeft, const DecimalType* pRight,
static int32_t decimal64ToStr(const DecimalType* pInt, uint8_t scale, char* pBuf, int32_t bufLen);
static void decimal64ScaleDown(Decimal64* pDec, uint8_t scaleDown);
static void decimal64ScaleUp(Decimal64* pDec, uint8_t scaleUp);
void decimal64ScaleTo(Decimal64* pDec, uint8_t oldScale, uint8_t newScale);
static void decimal64ScaleTo(Decimal64* pDec, uint8_t oldScale, uint8_t newScale);
static void decimal128Negate(DecimalType* pInt);
static void decimal128Abs(DecimalType* pWord);
@ -298,10 +305,22 @@ static bool decimal128Lt(const DecimalType* pLeft, const DecimalType* pRight,
static bool decimal128Gt(const DecimalType* pLeft, const DecimalType* pRight, uint8_t rightWordNum);
static bool decimal128Eq(const DecimalType* pLeft, const DecimalType* pRight, uint8_t rightWordNum);
static int32_t decimal128ToStr(const DecimalType* pInt, uint8_t scale, char* pBuf, int32_t bufLen);
void decimal128ScaleTo(Decimal128* pDec, uint8_t oldScale, uint8_t newScale);
void decimal128ScaleDown(Decimal128* pDec, uint8_t scaleDown);
void decimal128ScaleUp(Decimal128* pDec, uint8_t scaleUp);
int32_t decimal128CountLeadingBinaryZeros(const Decimal128* pDec);
static void decimal128ScaleTo(Decimal128* pDec, uint8_t oldScale, uint8_t newScale);
static void decimal128ScaleDown(Decimal128* pDec, uint8_t scaleDown);
static void decimal128ScaleUp(Decimal128* pDec, uint8_t scaleUp);
static int32_t decimal128CountLeadingBinaryZeros(const Decimal128* pDec);
static int32_t decimal128FromInt64(DecimalType* pDec, uint8_t prec, uint8_t scale, int64_t val);
static int32_t decimal128FromUint64(DecimalType* pDec, uint8_t prec, uint8_t scale, uint64_t val);
//
// rounding functions
static void decimal128RoundWithPositiveScale(Decimal128* pDec, uint8_t prec, uint8_t scale, uint8_t toPrec,
uint8_t toScale, DecimalRoundType roundType, bool* overflow);
static void decimal128RoundWithNegativeScale(Decimal128* pDec, uint8_t prec, uint8_t scale, int8_t toScale,
DecimalRoundType roundType, bool* overflow);
static void decimal128ModifyScaleAndPrecision(Decimal128* pDec, uint8_t scale, uint8_t toPrec, int8_t toScale,
bool* overflow);
static int32_t decimal128CountRoundingDelta(const Decimal128* pDec, int8_t scale, int8_t toScale,
DecimalRoundType roundType);
SDecimalOps decimal64Ops = {decimal64Negate, decimal64Abs, decimal64Add, decimal64Subtract,
decimal64Multiply, decimal64divide, decimal64Mod, decimal64Lt,
@ -423,7 +442,7 @@ int32_t decimal64ToStr(const DecimalType* pInt, uint8_t scale, char* pBuf, int32
// TODO wjm handle endian problem
#define DEFINE_DECIMAL128(lo, hi) {lo, hi}
static const Decimal128 SCALE_MULTIPLIER_128[38 + 1] = {
static const Decimal128 SCALE_MULTIPLIER_128[TSDB_DECIMAL128_MAX_PRECISION + 1] = {
DEFINE_DECIMAL128(1LL, 0),
DEFINE_DECIMAL128(10LL, 0),
DEFINE_DECIMAL128(100LL, 0),
@ -465,7 +484,21 @@ static const Decimal128 SCALE_MULTIPLIER_128[38 + 1] = {
DEFINE_DECIMAL128(687399551400673280ULL, 5421010862427522170LL),
};
static double getDoubleScaleMultiplier(uint8_t scale) {
static double SCALE_MULTIPLIER_DOUBLE[TSDB_DECIMAL_MAX_PRECISION + 1] = {0};
static bool initialized = false;
if (!initialized) {
SCALE_MULTIPLIER_DOUBLE[0] = 1.0;
for (int32_t idx = 1; idx <= TSDB_DECIMAL_MAX_PRECISION; ++idx) {
SCALE_MULTIPLIER_DOUBLE[idx] = SCALE_MULTIPLIER_DOUBLE[idx - 1] * 10;
}
initialized = true;
}
return SCALE_MULTIPLIER_DOUBLE[scale];
};
static const Decimal128 decimal128Zero = DEFINE_DECIMAL128(0, 0);
static const Decimal128 decimal128Two = DEFINE_DECIMAL128(2, 0);
static const Decimal128 decimal128Max = DEFINE_DECIMAL128(687399551400673280ULL - 1, 5421010862427522170LL);
#define DECIMAL128_ZERO decimal128Zero
@ -598,7 +631,12 @@ static void decimal128Divide(DecimalType* pLeft, const DecimalType* pRight, uint
if (DECIMAL128_SIGN(pLeftDec) == -1 && pRemainder) decimal128Negate(pRemainderDec);
}
static void decimal128Mod(DecimalType* pLeft, const DecimalType* pRight, uint8_t rightWordNum) {}
static void decimal128Mod(DecimalType* pLeft, const DecimalType* pRight, uint8_t rightWordNum) {
Decimal128 pLeftDec = *(Decimal128*)pLeft, *pRightDec = (Decimal128*)pRight, right = {0};
DECIMAL128_CHECK_RIGHT_WORD_NUM(rightWordNum, pRightDec, right, pRight);
decimal128Divide(&pLeftDec, pRightDec, WORD_NUM(Decimal128), pLeft);
}
static bool decimal128Gt(const DecimalType* pLeft, const DecimalType* pRight, uint8_t rightWordNum) {
Decimal128 *pLeftDec = (Decimal128*)pLeft, *pRightDec = (Decimal128*)pRight;
@ -672,7 +710,9 @@ static int32_t decimal128ToStr(const DecimalType* pInt, uint8_t scale, char* pBu
TAOS_STRNCAT(pBuf, "0", 2);
}
if (scale > 0) {
static char format[64] = "0000000000000000000000000000000000000000";
TAOS_STRNCAT(pBuf, ".", 2);
if (wholeLen < 0) TAOS_STRNCAT(pBuf, format, TABS(wholeLen));
TAOS_STRNCAT(pBuf, buf + TMAX(0, wholeLen), scale);
}
return 0;
@ -824,7 +864,7 @@ int32_t decimalDivide(Decimal* pX, const SDataType* pXT, const Decimal* pY, cons
assert(deltaScale >= 0);
Decimal xTmp = *pX;
decimal128Abs(&xTmp);
decimal128Abs(&xTmp); // TODO wjm test decimal64 / decimal64
int32_t bitsOccupied = 128 - decimal128CountLeadingBinaryZeros(&xTmp);
if (bitsOccupied + bitsForNumDigits[deltaScale] <= 127) {
xTmp = *pX;
@ -832,12 +872,12 @@ int32_t decimalDivide(Decimal* pX, const SDataType* pXT, const Decimal* pY, cons
Decimal remainder = {0};
decimal128Divide(&xTmp, pY, WORD_NUM(Decimal), &remainder);
Decimal tmpY = *pY, two = DEFINE_DECIMAL128(2, 0);
decimal64Abs(&tmpY);
decimal128Multiply(&remainder, &two, WORD_NUM(Decimal));
Decimal tmpY = *pY;
decimal128Abs(&tmpY);
decimal128Multiply(&remainder, &decimal128Two, WORD_NUM(Decimal));
decimal128Abs(&remainder);
if (!decimal128Lt(&remainder, &tmpY, WORD_NUM(Decimal))) {
int64_t extra = (DECIMAL128_SIGN(pX) ^ DECIMAL128_SIGN(pY)) + 1;
Decimal64 extra = {(DECIMAL128_SIGN(pX) ^ DECIMAL128_SIGN(pY)) + 1};
decimal128Add(&xTmp, &extra, WORD_NUM(Decimal64));
}
} else {
@ -875,7 +915,6 @@ int32_t decimalOp(EOperatorType op, const SDataType* pLeftT, const SDataType* pR
right = *(Decimal*)pRightData;
}
SDecimalOps* pOps = getDecimalOps(TSDB_DATA_TYPE_DECIMAL);
switch (op) {
case OP_TYPE_ADD:
decimalAdd(&left, &lt, &right, &rt, pOutT);
@ -891,6 +930,7 @@ int32_t decimalOp(EOperatorType op, const SDataType* pLeftT, const SDataType* pR
code = decimalDivide(&left, &lt, &right, &rt, pOutT);
break;
default:
code = TSDB_CODE_TSC_INVALID_OPERATION;
break;
}
if (0 == code && pOutT->type != TSDB_DATA_TYPE_DECIMAL) {
@ -903,27 +943,52 @@ int32_t decimalOp(EOperatorType op, const SDataType* pLeftT, const SDataType* pR
return code;
}
// There is no need to do type conversions, we assume that pLeftT and pRightT are all decimal128 types.
bool decimalCompare(EOperatorType op, const SDecimalCompareCtx* pLeft, const SDecimalCompareCtx* pRight) {
bool ret = false;
uint8_t pLeftPrec = 0, pLeftScale = 0, pRightPrec = 0, pRightScale = 0;
decimalFromTypeMod(pLeft->typeMod, &pLeftPrec, &pLeftScale);
decimalFromTypeMod(pRight->typeMod, &pRightPrec, &pRightScale);
int32_t deltaScale = pLeftScale - pRightScale;
Decimal pLeftDec = *(Decimal*)pLeft->pData, pRightDec = *(Decimal*)pRight->pData;
if (deltaScale != 0) {
bool needInt256 = (deltaScale < 0 && pLeftPrec - deltaScale > TSDB_DECIMAL_MAX_PRECISION) ||
(pRightPrec + deltaScale > TSDB_DECIMAL_MAX_PRECISION);
if (needInt256) {
// TODO wjm impl it
return false;
} else {
if (deltaScale < 0) {
decimal128ScaleUp(&pLeftDec, -deltaScale);
} else {
decimal128ScaleUp(&pRightDec, deltaScale);
}
}
}
switch (op) {
case OP_TYPE_GREATER_THAN:
return decimal128Gt(&pLeftDec, &pRightDec, WORD_NUM(Decimal));
case OP_TYPE_GREATER_EQUAL:
return !decimal128Lt(&pLeftDec, &pRightDec, WORD_NUM(Decimal));
case OP_TYPE_LOWER_THAN:
return decimal128Lt(&pLeftDec, &pRightDec, WORD_NUM(Decimal));
case OP_TYPE_LOWER_EQUAL:
return !decimal128Gt(&pLeftDec, &pRightDec, WORD_NUM(Decimal));
case OP_TYPE_EQUAL:
return decimal128Eq(&pLeftDec, &pRightDec, WORD_NUM(Decimal));
case OP_TYPE_NOT_EQUAL:
return !decimal128Eq(&pLeftDec, &pRightDec, WORD_NUM(Decimal));
default:
break;
}
return ret;
}
#define ABS_INT64(v) (v) == INT64_MIN ? (uint64_t)INT64_MAX + 1 : (uint64_t)llabs(v)
#define ABS_UINT64(v) (v)
#define DECIMAL64_IS_OVERFLOW(v, max) decimal64Gt(&(v), &max, DECIMAL_WORD_NUM(Decimal64)) // TODO wjm DELETE it
#define DECIMAL128_IS_OVERFLOW(v, max) decimal128Gt(&(v), &max, DECIMAL_WORD_NUM(Decimal128))
#define CHECK_OVERFLOW_AND_MAKE_DECIMAL64(pDec, v, max, ABS) \
({ \
int32_t code = 0; \
Decimal64 dv = {ABS(v)}; \
if (DECIMAL64_IS_OVERFLOW(dv, max)) { \
code = TSDB_CODE_DECIMAL_OVERFLOW; \
} else { \
makeDecimal64(pDec, (int64_t)(v)); \
} \
code; \
})
#define MAKE_DECIMAL64_SIGNED(pDec, v, max) CHECK_OVERFLOW_AND_MAKE_DECIMAL64(pDec, v, max, ABS_INT64)
#define MAKE_DECIMAL64_UNSIGNED(pDec, v, max) CHECK_OVERFLOW_AND_MAKE_DECIMAL64(pDec, v, max, ABS_UINT64); // TODO wjm delete it
static int64_t int64FromDecimal64(const DecimalType* pDec, uint8_t prec, uint8_t scale) { return 0; }
static uint64_t uint64FromDecimal64(const DecimalType* pDec, uint8_t prec, uint8_t scale) { return 0; }
@ -986,9 +1051,39 @@ static int32_t decimal64FromDecimal64(DecimalType* pDec, uint8_t prec, uint8_t s
return 0;
}
static int64_t int64FromDecimal128(const DecimalType* pDec, uint8_t prec, uint8_t scale) { return 0; }
static int64_t int64FromDecimal128(const DecimalType* pDec, uint8_t prec, uint8_t scale) {
Decimal128 rounded = *(Decimal128*)pDec;
bool overflow = false; // TODO wjm pass out the overflow??
decimal128RoundWithPositiveScale(&rounded, prec, scale, prec, 0, ROUND_TYPE_HALF_ROUND_UP, &overflow);
if (overflow) {
return 0;
}
Decimal128 max = {0}, min = {0};
decimal128FromInt64(&max, TSDB_DECIMAL128_MAX_PRECISION, 0, INT64_MAX);
decimal128FromInt64(&min, TSDB_DECIMAL128_MAX_PRECISION, 0, INT64_MIN);
if (decimal128Gt(&rounded, &max, WORD_NUM(Decimal128)) || decimal128Lt(&rounded, &min, WORD_NUM(Decimal128))) {
overflow = true;
return 0;
}
static uint64_t uint64FromDecimal128(const DecimalType* pDec, uint8_t prec, uint8_t scale) { return 0; }
return (int64_t)DECIMAL128_LOW_WORD(&rounded);
}
static uint64_t uint64FromDecimal128(const DecimalType* pDec, uint8_t prec, uint8_t scale) {
Decimal128 rounded = *(Decimal128*)pDec;
bool overflow = false;
decimal128RoundWithPositiveScale(&rounded, prec, scale, prec, 0, ROUND_TYPE_HALF_ROUND_UP, &overflow);
if (overflow) return 0;
Decimal128 max = {0};
decimal128FromUint64(&max, TSDB_DECIMAL128_MAX_PRECISION, 0, UINT64_MAX);
if (decimal128Gt(&rounded, &max, WORD_NUM(Decimal128)) ||
decimal128Lt(&rounded, &decimal128Zero, WORD_NUM(Decimal128))) {
overflow = true;
return 0;
}
return DECIMAL128_LOW_WORD(&rounded);
}
static int32_t decimal128FromInt64(DecimalType* pDec, uint8_t prec, uint8_t scale, int64_t val) {
if (prec - scale <= 18) { // TODO wjm test int64 with 19 digits.
@ -1017,7 +1112,31 @@ static int32_t decimal128FromUint64(DecimalType* pDec, uint8_t prec, uint8_t sca
return 0;
}
static int32_t decimal128FromDouble(DecimalType* pDec, uint8_t prec, uint8_t scale, double val) { return 0; }
static int32_t decimal128FromDouble(DecimalType* pDec, uint8_t prec, uint8_t scale, double val) {
double unscaled = val * getDoubleScaleMultiplier(scale);
if (isnan(unscaled)) {
goto _OVERFLOW;
}
unscaled = round(unscaled);
bool negative = unscaled < 0 ? true : false;
double abs = TABS(unscaled);
if (abs > ldexp(1.0, 127) - 1) {
goto _OVERFLOW;
}
uint64_t hi = (uint64_t)ldexp(abs, -64), lo = (uint64_t)(abs - ldexp((double)hi, 64));
makeDecimal128(pDec, hi, lo);
Decimal128 max = {0};
DECIMAL128_GET_MAX(prec, &max);
if (decimal128Gt(pDec, &max, WORD_NUM(Decimal128))) goto _OVERFLOW;
if (negative) decimal128Negate(pDec);
return 0;
_OVERFLOW:
*(Decimal128*)pDec = decimal128Zero;
return TSDB_CODE_DECIMAL_OVERFLOW;
}
static int32_t decimal128FromDecimal64(DecimalType* pDec, uint8_t prec, uint8_t scale, const DecimalType* pVal,
uint8_t valPrec, uint8_t valScale) {
@ -1159,7 +1278,7 @@ void decimal64ScaleUp(Decimal64* pDec, uint8_t scaleUp) {
}
}
void decimal64ScaleTo(Decimal64* pDec, uint8_t oldScale, uint8_t newScale) {
static void decimal64ScaleTo(Decimal64* pDec, uint8_t oldScale, uint8_t newScale) {
if (newScale > oldScale)
decimal64ScaleUp(pDec, newScale - oldScale);
else if (newScale < oldScale)
@ -1181,21 +1300,22 @@ int32_t decimal64FromStr(const char* str, int32_t len, uint8_t expectPrecision,
return code;
}
void decimal128ScaleDown(Decimal128* pDec, uint8_t scaleDown) {
// TODO wjm add round param
static void decimal128ScaleDown(Decimal128* pDec, uint8_t scaleDown) {
if (scaleDown > 0) {
Decimal128 divisor = SCALE_MULTIPLIER_128[scaleDown];
decimal128Divide(pDec, &divisor, 2, NULL);
}
}
void decimal128ScaleUp(Decimal128* pDec, uint8_t scaleUp) {
static void decimal128ScaleUp(Decimal128* pDec, uint8_t scaleUp) {
if (scaleUp > 0) {
Decimal128 multiplier = SCALE_MULTIPLIER_128[scaleUp];
decimal128Multiply(pDec, &multiplier, WORD_NUM(Decimal128));
}
}
void decimal128ScaleTo(Decimal128* pDec, uint8_t oldScale, uint8_t newScale) {
static void decimal128ScaleTo(Decimal128* pDec, uint8_t oldScale, uint8_t newScale) {
if (newScale > oldScale)
decimal128ScaleUp(pDec, newScale - oldScale);
else if (newScale < oldScale)
@ -1227,7 +1347,7 @@ __int128 decimal128ToInt128(const Decimal128* pDec) {
return ret;
}
int32_t decimal128CountLeadingBinaryZeros(const Decimal128* pDec) {
static int32_t decimal128CountLeadingBinaryZeros(const Decimal128* pDec) {
if (DECIMAL128_HIGH_WORD(pDec) == 0) {
return 64 + countLeadingZeros(DECIMAL128_LOW_WORD(pDec));
} else {
@ -1254,7 +1374,12 @@ IMP_UNSIGNED_INTEGER_TYPE_FROM_DECIMAL_TYPE(uint16_t, Decimal64)
IMP_UNSIGNED_INTEGER_TYPE_FROM_DECIMAL_TYPE(uint32_t, Decimal64)
IMP_UNSIGNED_INTEGER_TYPE_FROM_DECIMAL_TYPE(uint64_t, Decimal64)
double doubleFromDecimal64(const void* pDec, uint8_t prec, uint8_t scale) { return 0; }
double doubleFromDecimal64(const void* pDec, uint8_t prec, uint8_t scale) {
int32_t sign = DECIMAL64_SIGN((Decimal64*)pDec);
Decimal64 abs = *(Decimal64*)pDec;
decimal64Abs(&abs);
return (double)DECIMAL64_GET_VALUE(&abs) * sign / getDoubleScaleMultiplier(scale);
}
bool boolFromDecimal64(const void* pDec, uint8_t prec, uint8_t scale) {
return !decimal64Eq(pDec, &decimal64Zero, WORD_NUM(Decimal64));
@ -1277,6 +1402,128 @@ IMP_UNSIGNED_INTEGER_TYPE_FROM_DECIMAL_TYPE(uint16_t, Decimal128)
IMP_UNSIGNED_INTEGER_TYPE_FROM_DECIMAL_TYPE(uint32_t, Decimal128)
IMP_UNSIGNED_INTEGER_TYPE_FROM_DECIMAL_TYPE(uint64_t, Decimal128)
bool boolFromDecimal128(const void* pDec, uint8_t prec, uint8_t scale) { return true; }
double doubleFromDecimal128(const void* pDec, uint8_t prec, uint8_t scale) { return 0; }
bool boolFromDecimal128(const void* pDec, uint8_t prec, uint8_t scale) {
return !decimal128Eq(pDec, &decimal128Zero, WORD_NUM(Decimal128));
}
double doubleFromDecimal128(const void* pDec, uint8_t prec, uint8_t scale) {
int32_t sign = DECIMAL128_SIGN((Decimal128*)pDec);
Decimal128 abs = *(Decimal128*)pDec;
decimal128Abs(&abs);
double unscaled = DECIMAL128_LOW_WORD(&abs);
unscaled += ldexp((double)DECIMAL128_HIGH_WORD(&abs), 64);
return (unscaled * sign) / getDoubleScaleMultiplier(scale);
}
IMPL_REAL_TYPE_FROM_DECIMAL_TYPE(float, Decimal128);
static void decimal128RoundWithPositiveScale(Decimal128* pDec, uint8_t prec, uint8_t scale, uint8_t toPrec,
uint8_t toScale, DecimalRoundType roundType, bool* overflow) {
Decimal128 scaled = *pDec;
bool overflowLocal = false;
decimal128ModifyScaleAndPrecision(&scaled, scale, toPrec, toScale, &overflowLocal);
if (overflowLocal) {
if (overflow) *overflow = true;
*pDec = decimal128Zero;
return;
}
int32_t delta = decimal128CountRoundingDelta(pDec, scale, toScale, roundType);
if (delta == 0) {
*pDec = scaled;
return;
}
Decimal64 deltaDec = {delta};
decimal128Add(&scaled, &deltaDec, WORD_NUM(Decimal64));
Decimal128 max = {0};
DECIMAL128_GET_MAX(toPrec, &max);
Decimal128 scaledAbs = scaled;
decimal128Abs(&scaledAbs);
if (toPrec < prec && decimal128Gt(&scaledAbs, &max, WORD_NUM(Decimal128))) {
if (overflow) *overflow = true;
*(Decimal128*)pDec = decimal128Zero;
} else {
*(Decimal128*)pDec = scaled;
}
}
static void decimal128ModifyScaleAndPrecision(Decimal128* pDec, uint8_t scale, uint8_t toPrec, int8_t toScale,
bool* overflow) {
int8_t deltaScale = toScale - scale;
if (deltaScale >= 0) {
Decimal128 max = {0};
DECIMAL128_GET_MAX(toPrec - deltaScale, &max); // TODO wjm test toPrec == 0
Decimal128 abs = *pDec;
decimal128Abs(&abs);
if (decimal128Gt(&abs, &max, WORD_NUM(Decimal128))) {
if (overflow) *overflow = true;
} else {
decimal128ScaleUp(pDec, deltaScale);
}
} else {
Decimal128 res = *pDec, max = {0};
decimal128ScaleDown(&res, -deltaScale);
DECIMAL128_GET_MAX(toPrec, &max);
if (decimal128Gt(&res, &max, WORD_NUM(Decimal128))) {
if (overflow) *overflow = true;
} else {
*(Decimal128*)pDec = res;
}
}
}
static int32_t decimal128CountRoundingDelta(const Decimal128* pDec, int8_t scale, int8_t toScale,
DecimalRoundType roundType) {
if (roundType == ROUND_TYPE_TRUNC || toScale >= scale) return 0;
Decimal128 dec128 = *pDec;
int32_t res = 0;
switch (roundType) {
case ROUND_TYPE_HALF_ROUND_UP: {
Decimal128 trailing = dec128;
decimal128Mod(&trailing, &SCALE_MULTIPLIER_128[scale - toScale], WORD_NUM(Decimal128));
if (decimal128Eq(&trailing, &decimal128Zero, WORD_NUM(Decimal128))) {
res = 0;
break;
}
Decimal128 tailingAbs = trailing, baseDiv2 = SCALE_MULTIPLIER_128[scale - toScale];
decimal128Abs(&tailingAbs);
decimal128Divide(&baseDiv2, &decimal128Two, WORD_NUM(Decimal128), NULL);
if (decimal128Lt(&tailingAbs, &baseDiv2, WORD_NUM(Decimal128))) {
res = 0;
break;
}
res = decimal128Lt(pDec, &decimal128Zero, WORD_NUM(Decimal128)) ? -1 : 1;
} break;
case ROUND_TYPE_TRUNC:
default:
break;
}
return res;
}
int32_t TEST_decimal64From_int64_t(Decimal64* pDec, uint8_t prec, uint8_t scale, int64_t v) {
return decimal64FromInt64(pDec, prec, scale, v);
}
int32_t TEST_decimal64From_uint64_t(Decimal64* pDec, uint8_t prec, uint8_t scale, uint64_t v) {
return decimal64FromUint64(pDec, prec, scale, v);
}
int32_t TEST_decimal64From_double(Decimal64* pDec, uint8_t prec, uint8_t scale, double v) {
return decimal64FromDouble(pDec, prec, scale, v);
}
double TEST_decimal64ToDouble(Decimal64* pDec, uint8_t prec, uint8_t scale) {
return doubleFromDecimal64(pDec, prec, scale);
}
int32_t TEST_decimal128From_int64_t(Decimal128* pDec, uint8_t prec, uint8_t scale, int64_t v) {
return decimal128FromInt64(pDec, prec, scale, v);
}
int32_t TEST_decimal128From_uint64_t(Decimal128* pDec, uint8_t prec, uint8_t scale, uint64_t v) {
return decimal128FromUint64(pDec, prec, scale, v);
}
int32_t TEST_decimal128From_double(Decimal128* pDec, uint8_t prec, uint8_t scale, double v) {
return decimal128FromDouble(pDec, prec, scale, v);
}
double TEST_decimal128ToDouble(Decimal128* pDec, uint8_t prec, uint8_t scale) {
return doubleFromDecimal128(pDec, prec, scale);
}

View File

@ -118,7 +118,7 @@ Numeric64& Numeric64::operator=(const Numeric64& r) {
return *this;
}
template <int ByteNum>
template <int BitNum>
struct NumericType {};
template <>
@ -143,7 +143,7 @@ struct TrivialTypeInfo {
#define DEFINE_TRIVIAL_TYPE_HELPER(type, tsdb_type) \
template <> \
struct TrivialTypeInfo<type> { \
static constexpr type dataType = tsdb_type; \
static constexpr int8_t dataType = tsdb_type; \
static constexpr int32_t bytes = sizeof(type); \
}
@ -159,16 +159,17 @@ DEFINE_TRIVIAL_TYPE_HELPER(float, TSDB_DATA_TYPE_FLOAT);
DEFINE_TRIVIAL_TYPE_HELPER(double, TSDB_DATA_TYPE_DOUBLE);
DEFINE_TRIVIAL_TYPE_HELPER(bool, TSDB_DATA_TYPE_BOOL);
template <int ByteNum>
template <int BitNum>
class Numeric {
using Type = typename NumericType<ByteNum>::Type;
static_assert(BitNum == 64 || BitNum == 128, "only support Numeric64 and Numeric128");
using Type = typename NumericType<BitNum>::Type;
Type dec_;
uint8_t prec_;
uint8_t scale_;
public:
Numeric(uint8_t prec, uint8_t scale, const std::string& str) : prec_(prec), scale_(scale) {
if (prec > NumericType<ByteNum>::maxPrec) throw std::string("prec too big") + std::to_string(prec);
if (prec > NumericType<BitNum>::maxPrec) throw std::string("prec too big") + std::to_string(prec);
int32_t code = dec_.fromStr(str, prec, scale) != 0;
if (code != 0) {
cout << "failed to init decimal from str: " << str << endl;
@ -187,41 +188,41 @@ class Numeric {
uint8_t scale() const { return scale_; }
const Type& dec() const { return dec_; }
template <int ByteNum2>
Numeric& binaryOp(const Numeric<ByteNum2>& r, EOperatorType op) {
auto out = binaryOp<ByteNum2, ByteNum>(r, op);
template <int BitNum2>
Numeric& binaryOp(const Numeric<BitNum2>& r, EOperatorType op) {
auto out = binaryOp<BitNum2, BitNum>(r, op);
return *this = out;
}
template <int ByteNum2, int ByteNumO>
Numeric<ByteNumO> binaryOp(const Numeric<ByteNum2>& r, EOperatorType op) {
SDataType lt{.type = NumericType<ByteNum>::dataType, .precision = prec_, .scale = scale_, .bytes = ByteNum};
SDataType rt{.type = NumericType<ByteNum2>::dataType, .precision = r.prec(), .scale = r.scale(), .bytes = ByteNum2};
template <int BitNum2, int BitNumO>
Numeric<BitNumO> binaryOp(const Numeric<BitNum2>& r, EOperatorType op) {
SDataType lt{.type = NumericType<BitNum>::dataType, .precision = prec_, .scale = scale_, .bytes = BitNum};
SDataType rt{.type = NumericType<BitNum2>::dataType, .precision = r.prec(), .scale = r.scale(), .bytes = BitNum2};
SDataType ot = getRetType(op, lt, rt);
Numeric<ByteNumO> out{ot.precision, ot.scale, "0"};
int32_t code = decimalOp(op, &lt, &rt, &ot, &dec_, &r.dec(), &out);
Numeric<BitNumO> out{ot.precision, ot.scale, "0"};
int32_t code = decimalOp(op, &lt, &rt, &ot, &dec_, &r.dec(), &out);
if (code != 0) throw std::overflow_error(tstrerror(code));
return out;
}
template <int ByteNumO, typename T>
Numeric<ByteNumO> binaryOp(const T& r, EOperatorType op) {
template <int BitNumO, typename T>
Numeric<BitNumO> binaryOp(const T& r, EOperatorType op) {
using TypeInfo = TrivialTypeInfo<T>;
SDataType lt{.type = NumericType<ByteNum>::dataType, .precision = prec_, .scale = scale_, .bytes = ByteNum};
SDataType rt{.type = TypeInfo::dataType, .precision = 0, .scale = 0, .bytes = TypeInfo::bytes};
SDataType ot = getRetType(op, lt, rt);
Numeric<ByteNumO> out{ot.precision, ot.scale, "0"};
int32_t code = decimalOp(op, &lt, &rt, &ot, &dec_, &r, &out);
SDataType lt{.type = NumericType<BitNum>::dataType, .precision = prec_, .scale = scale_, .bytes = BitNum};
SDataType rt{.type = TypeInfo::dataType, .precision = 0, .scale = 0, .bytes = TypeInfo::bytes};
SDataType ot = getRetType(op, lt, rt);
Numeric<BitNumO> out{ot.precision, ot.scale, "0"};
int32_t code = decimalOp(op, &lt, &rt, &ot, &dec_, &r, &out);
if (code != 0) throw std::overflow_error(tstrerror(code));
return out;
}
#define DEFINE_OPERATOR(op, op_type) \
template <int ByteNum2, int ByteNumO = 128> \
Numeric<ByteNumO> operator op(const Numeric<ByteNum2>& r) { \
cout << *this << " " #op " " << r << " = "; \
auto res = binaryOp<ByteNum2, ByteNumO>(r, op_type); \
cout << res << endl; \
return res; \
#define DEFINE_OPERATOR(op, op_type) \
template <int BitNum2, int BitNumO = 128> \
Numeric<BitNumO> operator op(const Numeric<BitNum2>& r) { \
cout << *this << " " #op " " << r << " = "; \
auto res = binaryOp<BitNum2, BitNumO>(r, op_type); \
cout << res << endl; \
return res; \
}
DEFINE_OPERATOR(+, OP_TYPE_ADD);
@ -229,31 +230,44 @@ class Numeric {
DEFINE_OPERATOR(*, OP_TYPE_MULTI);
DEFINE_OPERATOR(/, OP_TYPE_DIV);
#define DEFINE_OPERATOR_T(op, op_type) \
template <typename T, int ByteNumO = 128> \
Numeric<ByteNumO> operator op(const T & r) { \
cout << *this << " " #op " " << r << " = "; \
auto res = binaryOp<ByteNumO, T>(r, op_type); \
cout << res << endl; \
return res; \
#define DEFINE_TYPE_OP(op, op_type) \
template <typename T, int BitNumO = 128> \
Numeric<BitNumO> operator op(const T & r) { \
cout << *this << " " #op " " << r << "(" << typeid(T).name() << ")" << " = "; \
auto res = binaryOp<BitNumO, T>(r, op_type); \
cout << res << endl; \
return res; \
}
DEFINE_OPERATOR_T(+, OP_TYPE_ADD);
DEFINE_OPERATOR_T(-, OP_TYPE_SUB);
DEFINE_OPERATOR_T(*, OP_TYPE_MULTI);
DEFINE_OPERATOR_T(/, OP_TYPE_DIV);
DEFINE_TYPE_OP(+, OP_TYPE_ADD);
DEFINE_TYPE_OP(-, OP_TYPE_SUB);
DEFINE_TYPE_OP(*, OP_TYPE_MULTI);
DEFINE_TYPE_OP(/, OP_TYPE_DIV);
template <int ByteNum2>
Numeric& operator+=(const Numeric<ByteNum2>& r) {
#define DEFINE_REAL_OP(op) \
double operator op(double v) { \
if (BitNum == 128) \
return TEST_decimal128ToDouble((Decimal128*)&dec_, prec(), scale()) / v; \
else if (BitNum == 64) \
return TEST_decimal64ToDouble((Decimal64*)&dec_, prec(), scale()) / v; \
return 0; \
}
DEFINE_REAL_OP(+);
DEFINE_REAL_OP(-);
DEFINE_REAL_OP(*);
DEFINE_REAL_OP(/);
template <int BitNum2>
Numeric& operator+=(const Numeric<BitNum2>& r) {
return binaryOp(r, OP_TYPE_ADD);
}
template <int ByteNum2>
bool operator==(const Numeric<ByteNum2>& r) {
template <int BitNum2>
bool operator==(const Numeric<BitNum2>& r) {
return binaryOp(r, OP_TYPE_EQUAL);
}
std::string toString() const {
char buf[64] = {0};
int32_t code = decimalToStr(&dec_, NumericType<ByteNum>::dataType, prec(), scale(), buf, 64);
int32_t code = decimalToStr(&dec_, NumericType<BitNum>::dataType, prec(), scale(), buf, 64);
if (code != 0) throw std::string(tstrerror(code));
return {buf};
}
@ -272,10 +286,77 @@ class Numeric {
if (ret.size() - sizeToRemove > 0) ret.resize(ret.size() - sizeToRemove);
return ret;
}
#define DEFINE_OPERATOR_T(type) \
operator type() { \
if (BitNum == 64) { \
return type##FromDecimal64(&dec_, prec(), scale()); \
} else if (BitNum == 128) { \
return type##FromDecimal128(&dec_, prec(), scale()); \
} \
return 0; \
}
DEFINE_OPERATOR_T(bool);
DEFINE_OPERATOR_T(int8_t);
DEFINE_OPERATOR_T(uint8_t);
DEFINE_OPERATOR_T(int16_t);
DEFINE_OPERATOR_T(uint16_t);
DEFINE_OPERATOR_T(int32_t);
DEFINE_OPERATOR_T(uint32_t);
DEFINE_OPERATOR_T(int64_t);
DEFINE_OPERATOR_T(uint64_t);
DEFINE_OPERATOR_T(float);
DEFINE_OPERATOR_T(double);
Numeric operator=(const char* str) {
std::string s = str;
int32_t code = 0;
if (BitNum == 64) {
code = decimal64FromStr(s.c_str(), s.size(), prec(), scale(), (Decimal64*)&dec_);
} else if (BitNum == 128) {
code = decimal128FromStr(s.c_str(), s.size(), prec(), scale(), (Decimal128*)&dec_);
}
if (TSDB_CODE_SUCCESS != code) {
throw std::string("failed to convert str to decimal64: ") + s + " " + tstrerror(code);
}
return *this;
}
#define DEFINE_OPERATOR_FROM_FOR_BITNUM(type, BitNum) \
if (std::is_floating_point<type>::value) { \
code = TEST_decimal##BitNum##From_double((Decimal##BitNum*)&dec_, prec(), scale(), v); \
} else if (std::is_signed<type>::value) { \
code = TEST_decimal##BitNum##From_int64_t((Decimal##BitNum*)&dec_, prec(), scale(), v); \
} else if (std::is_unsigned<type>::value) { \
code = TEST_decimal##BitNum##From_uint64_t((Decimal##BitNum*)&dec_, prec(), scale(), v); \
}
#define DEFINE_OPERATOR_EQ_T(type) \
Numeric operator=(type v) { \
int32_t code = 0; \
if (BitNum == 64) { \
DEFINE_OPERATOR_FROM_FOR_BITNUM(type, 64); \
} else if (BitNum == 128) { \
DEFINE_OPERATOR_FROM_FOR_BITNUM(type, 128); \
} \
return *this; \
}
DEFINE_OPERATOR_EQ_T(int64_t);
DEFINE_OPERATOR_EQ_T(int32_t);
DEFINE_OPERATOR_EQ_T(int16_t);
DEFINE_OPERATOR_EQ_T(int8_t);
DEFINE_OPERATOR_EQ_T(uint64_t);
DEFINE_OPERATOR_EQ_T(uint32_t);
DEFINE_OPERATOR_EQ_T(uint16_t);
DEFINE_OPERATOR_EQ_T(uint8_t);
DEFINE_OPERATOR_EQ_T(bool);
DEFINE_OPERATOR_EQ_T(double);
DEFINE_OPERATOR_EQ_T(float);
};
template <int ByteNum>
ostream& operator<<(ostream& os, const Numeric<ByteNum>& n) {
template <int BitNum>
ostream& operator<<(ostream& os, const Numeric<BitNum>& n) {
os << n.toString() << "(" << (int32_t)n.prec() << ":" << (int32_t)n.scale() << ")";
return os;
}
@ -337,7 +418,70 @@ TEST(decimal, numeric) {
ASSERT_EQ(os.toStringTrimTailingZeros(), "0.61728");
os = dec4 / 123123123;
ASSERT_EQ(os.toStringTrimTailingZeros(), "0.0000000100270361");
os = dec4 / (int64_t)123123123;
ASSERT_EQ(os.toStringTrimTailingZeros(), "0.0000000100270361075880117");
double dv = dec4 / 123123.123;
}
TEST(decimal, decimalFromType) {
Numeric<128> dec1{20, 4, "0"};
dec1 = 123.123;
ASSERT_EQ(dec1.toString(), "123.1230");
dec1 = (float)123.123;
ASSERT_EQ(dec1.toString(), "123.1230");
dec1 = (int64_t)-9999999;
ASSERT_EQ(dec1.toString(), "-9999999.0000");
dec1 = "99.99999";
ASSERT_EQ(dec1.toString(), "99.9999");
}
TEST(decimal, typeFromDecimal) {
Numeric<128> dec1{18, 4, "1234"};
Numeric<64> dec2{18, 4, "1234"};
int64_t intv = dec1;
uint64_t uintv = dec1;
double doublev = dec1;
ASSERT_EQ(intv, 1234);
ASSERT_EQ(uintv, 1234);
ASSERT_EQ(doublev, 1234);
doublev = dec2;
ASSERT_EQ(doublev, 1234);
intv = dec1 = "123.43";
uintv = dec1;
doublev = dec1;
ASSERT_EQ(intv, 123);
ASSERT_EQ(uintv, 123);
ASSERT_EQ(doublev, 123.43);
doublev = dec2 = "123.54";
ASSERT_EQ(doublev, 123.54);
intv = dec1 = "123.66";
uintv = dec1;
doublev = dec1;
ASSERT_EQ(intv, 124);
ASSERT_EQ(uintv, 124);
ASSERT_EQ(doublev, 123.66);
intv = dec1 = "-123.44";
uintv = dec1;
doublev = dec1;
ASSERT_EQ(intv, -123);
ASSERT_EQ(uintv, 0);
ASSERT_EQ(doublev, -123.44);
intv = dec1 = "-123.99";
uintv = dec1;
doublev = dec1;
ASSERT_EQ(intv, -124);
ASSERT_EQ(uintv, 0);
ASSERT_EQ(doublev, -123.99);
bool boolv = false;
boolv = dec1;
ASSERT_TRUE(boolv);
boolv = dec1 = "0";
ASSERT_FALSE(boolv);
}
// TEST where decimal column in (...)
@ -394,7 +538,7 @@ TEST(decimal128, divide) {
printDecimal(&d, TSDB_DATA_TYPE_DECIMAL, out_precision, out_scale);
}
TEST(decimal, cpi_taos_fetch_rows) {
TEST(decimal, api_taos_fetch_rows) {
const char* host = "127.0.0.1";
const char* user = "root";
const char* passwd = "taosdata";
@ -459,18 +603,18 @@ TEST(decimal, cpi_taos_fetch_rows) {
int32_t type_mod = *(int32_t*)(p + 1);
ASSERT_EQ(t, TSDB_DATA_TYPE_DECIMAL64);
auto check_type_mod = [](int32_t type_mod, uint8_t prec, uint8_t scale, int32_t bytes) {
ASSERT_EQ(type_mod & 0xFF, scale);
ASSERT_EQ((type_mod & 0xFF00) >> 8, prec);
ASSERT_EQ(type_mod >> 24, bytes);
auto check_type_mod = [](char* pStart, uint8_t prec, uint8_t scale, int32_t bytes) {
ASSERT_EQ(*pStart, bytes);
ASSERT_EQ(*(pStart + 2), prec);
ASSERT_EQ(*(pStart + 3), scale);
};
check_type_mod(type_mod, 10, 2, 8);
check_type_mod(p + 1, 10, 2, 8);
// col2
p += 5;
t = *(int8_t*)p;
type_mod = *(int32_t*)(p + 1);
check_type_mod(type_mod, 38, 10, 16);
check_type_mod(p + 1, 38, 10, 16);
p = p + 5 + BitmapLen(numOfRows) + colNum * 4;
int64_t row1Val = *(int64_t*)p;

View File

@ -459,7 +459,7 @@ static int32_t doExtractVal(SColumnInfoData* pCol, int32_t i, int32_t end, SqlFu
__COMPARE_ACQUIRED_MAX(i, end, pCol->nullbitmap, pData, pCtx, *(double*)&(pBuf->v), &pBuf->tuplePos)
break;
}
case TSDB_DATA_TYPE_DECIMAL64: {
case TSDB_DATA_TYPE_DECIMAL64: {// TODO wjm merge decimal64 and decimal
const Decimal64* pData = (const Decimal64*)pCol->pData;
const SDecimalOps* pOps = getDecimalOps(TSDB_DATA_TYPE_DECIMAL64);
int32_t code = 0;

View File

@ -326,7 +326,7 @@ static int32_t sifInitParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
indexError("invalid length for node:%p, length: %d", node, LIST_LENGTH(nl->pNodeList));
SIF_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SIF_ERR_RET(scalarGenerateSetFromList((void **)&param->pFilter, node, nl->node.resType.type, 0));
SIF_ERR_RET(scalarGenerateSetFromList((void **)&param->pFilter, node, nl->node.resType.type, 0, 0));
if (taosHashPut(ctx->pRes, &node, POINTER_BYTES, param, sizeof(*param))) {
taosHashCleanup(param->pFilter);
param->pFilter = NULL;

View File

@ -25,9 +25,10 @@ extern "C" {
#include "function.h"
typedef struct SOperatorValueType {
int32_t opResType;
int32_t selfType;
int32_t peerType;
int32_t opResType;
int32_t selfType;
int32_t peerType;
STypeMod selfTypeMod;
} SOperatorValueType;
typedef struct SScalarCtx {
@ -138,7 +139,7 @@ int32_t sclConvertValueToSclParam(SValueNode* pValueNode, SScalarParam* out, int
int32_t sclCreateColumnInfoData(SDataType* pType, int32_t numOfRows, SScalarParam* pParam);
int32_t sclConvertToTsValueNode(int8_t precision, SValueNode* valueNode);
#define GET_PARAM_TYPE(_c) ((_c)->columnData ? (_c)->columnData->info.type : (_c)->hashValueType)
#define GET_PARAM_TYPE(_c) ((_c)->columnData ? (_c)->columnData->info.type : (_c)->filterValueType)
#define GET_PARAM_BYTES(_c) ((_c)->columnData->info.bytes)
#define GET_PARAM_PRECISON(_c) ((_c)->columnData->info.precision)

View File

@ -137,7 +137,7 @@ __compar_fn_t gDataCompare[] = {
comparestrRegexNMatch, setChkNotInBytes1, setChkNotInBytes2, setChkNotInBytes4,
setChkNotInBytes8, compareChkNotInString, comparestrPatternNMatch, comparewcsPatternNMatch,
comparewcsRegexMatch, comparewcsRegexNMatch, compareLenBinaryVal, compareDecimal64,
compareDecimal128,
compareDecimal128, setChkInDecimalHash, setChkNotInDecimalHash,
};
__compar_fn_t gInt8SignCompare[] = {compareInt8Val, compareInt8Int16, compareInt8Int32,
@ -210,6 +210,10 @@ int32_t filterGetCompFuncIdx(int32_t type, int32_t optr, int8_t *comparFn, bool
*comparFn = 0;
code = TSDB_CODE_QRY_JSON_IN_ERROR;
break;
case TSDB_DATA_TYPE_DECIMAL64:
case TSDB_DATA_TYPE_DECIMAL:
*comparFn = 33;
break;
default:
*comparFn = 0;
break;
@ -243,6 +247,10 @@ int32_t filterGetCompFuncIdx(int32_t type, int32_t optr, int8_t *comparFn, bool
*comparFn = 0;
code = TSDB_CODE_QRY_JSON_IN_ERROR;
break;
case TSDB_DATA_TYPE_DECIMAL64:
case TSDB_DATA_TYPE_DECIMAL:
*comparFn = 34;
break;
default:
*comparFn = 0;
break;
@ -356,7 +364,7 @@ int32_t filterGetCompFuncIdx(int32_t type, int32_t optr, int8_t *comparFn, bool
break;
case TSDB_DATA_TYPE_DECIMAL:
*comparFn = 32;
break;
default:
*comparFn = 0;
break;
@ -2231,7 +2239,7 @@ int32_t fltInitValFieldData(SFilterInfo *info) {
}
if (unit->compare.optr == OP_TYPE_IN) {
FLT_ERR_RET(scalarGenerateSetFromList((void **)&fi->data, fi->desc, type, 0));
FLT_ERR_RET(scalarGenerateSetFromList((void **)&fi->data, fi->desc, type, 0, 0));
if (fi->data == NULL) {
fltError("failed to convert in param");
FLT_ERR_RET(TSDB_CODE_APP_ERROR);

View File

@ -118,7 +118,7 @@ _return:
}
// processType = 0 means all type. 1 means number, 2 means var, 3 means float, 4 means var&integer
int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type, int8_t processType) {
int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type, STypeMod typeMod, int8_t processType) {
SHashObj *pObj = taosHashInit(256, taosGetDefaultHashFunction(type), true, false);
if (NULL == pObj) {
sclError("taosHashInit failed, size:%d", 256);
@ -155,6 +155,7 @@ int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type, int8_
}
} else {
out.columnData->info.bytes = tDataTypes[type].bytes;
extractTypeFromTypeMod(type, typeMod, &out.columnData->info.precision, &out.columnData->info.scale, NULL);
}
int32_t overflow = 0;
@ -167,6 +168,10 @@ int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type, int8_
if (overflow) {
continue;
}
// TODO For decimal types, after conversion, check if we lose some scale to ignore values with larger scale
// e.g. convert decimal(18, 4) to decimal(18, 2) with value:
// 1.2345 -> 1.23. 1.23 != 1.2345, ignore this value, can't be the same as any decimal(18, 2)
// 1.2300 -> 1.23. 1.2300 == 1.23, take this value.
if (IS_VAR_DATA_TYPE(type)) {
buf = colDataGetVarData(out.columnData, 0);
@ -379,8 +384,9 @@ int32_t sclInitParam(SNode *node, SScalarParam *param, SScalarCtx *ctx, int32_t
SCL_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
int32_t type = ctx->type.selfType;
SNode* nodeItem = NULL;
int32_t type = ctx->type.selfType;
STypeMod typeMod = 0;
SNode *nodeItem = NULL;
FOREACH(nodeItem, nodeList->pNodeList) {
SValueNode *valueNode = (SValueNode *)nodeItem;
int32_t tmp = vectorGetConvertType(type, valueNode->node.resType.type);
@ -392,18 +398,26 @@ int32_t sclInitParam(SNode *node, SScalarParam *param, SScalarCtx *ctx, int32_t
if (IS_NUMERIC_TYPE(type)){
ctx->type.peerType = type;
}
// Currently, all types of node list can't be decimal types.
// Decimal op double/float/nchar/varchar types will convert these types to double type.
// All other types do not need scale info, so here we use self scale
// When decimal types are supported in value list, we need to check convertability of different decimal types.
// And the new decimal scale will also be calculated.
if (IS_DECIMAL_TYPE(type))
typeMod = decimalCalcTypeMod(TSDB_DECIMAL_MAX_PRECISION, getScaleFromTypeMod(type, ctx->type.selfTypeMod));
type = ctx->type.peerType;
if (IS_VAR_DATA_TYPE(ctx->type.selfType) && IS_NUMERIC_TYPE(type)){
SCL_ERR_RET(scalarGenerateSetFromList((void **)&param->pHashFilter, node, type, 1));
SCL_ERR_RET(scalarGenerateSetFromList((void **)&param->pHashFilterOthers, node, ctx->type.selfType, 2));
SCL_ERR_RET(scalarGenerateSetFromList((void **)&param->pHashFilter, node, type, typeMod, 1));
SCL_ERR_RET(scalarGenerateSetFromList((void **)&param->pHashFilterOthers, node, ctx->type.selfType, typeMod, 2));
} else if (IS_INTEGER_TYPE(ctx->type.selfType) && IS_FLOAT_TYPE(type)){
SCL_ERR_RET(scalarGenerateSetFromList((void **)&param->pHashFilter, node, type, 2));
SCL_ERR_RET(scalarGenerateSetFromList((void **)&param->pHashFilterOthers, node, ctx->type.selfType, 4));
SCL_ERR_RET(scalarGenerateSetFromList((void **)&param->pHashFilter, node, type, typeMod, 2));
SCL_ERR_RET(scalarGenerateSetFromList((void **)&param->pHashFilterOthers, node, ctx->type.selfType, typeMod, 4));
} else {
SCL_ERR_RET(scalarGenerateSetFromList((void **)&param->pHashFilter, node, type, 0));
SCL_ERR_RET(scalarGenerateSetFromList((void **)&param->pHashFilter, node, type, typeMod, 0));
}
param->hashValueType = type;
param->filterValueTypeMod = typeMod;
param->filterValueType = type;
param->colAlloced = true;
if (taosHashPut(ctx->pRes, &node, POINTER_BYTES, param, sizeof(*param))) {
taosHashCleanup(param->pHashFilter);
@ -554,7 +568,7 @@ _return:
SCL_RET(code);
}
int32_t sclGetNodeType(SNode *pNode, SScalarCtx *ctx, int32_t *type) {
int32_t sclGetNodeType(SNode *pNode, SScalarCtx *ctx, int32_t *type, STypeMod* pTypeMod) {
if (NULL == pNode) {
*type = -1;
return TSDB_CODE_SUCCESS;
@ -564,16 +578,19 @@ int32_t sclGetNodeType(SNode *pNode, SScalarCtx *ctx, int32_t *type) {
case QUERY_NODE_VALUE: {
SValueNode *valueNode = (SValueNode *)pNode;
*type = valueNode->node.resType.type;
*pTypeMod = typeGetTypeModFromDataType(&valueNode->node.resType);
return TSDB_CODE_SUCCESS;
}
case QUERY_NODE_NODE_LIST: {
SNodeListNode *nodeList = (SNodeListNode *)pNode;
*type = nodeList->node.resType.type;
*pTypeMod = typeGetTypeModFromDataType(&nodeList->node.resType);
return TSDB_CODE_SUCCESS;
}
case QUERY_NODE_COLUMN: {
SColumnNode *colNode = (SColumnNode *)pNode;
*type = colNode->node.resType.type;
*pTypeMod = typeGetTypeModFromDataType(&colNode->node.resType);
return TSDB_CODE_SUCCESS;
}
case QUERY_NODE_FUNCTION:
@ -585,18 +602,20 @@ int32_t sclGetNodeType(SNode *pNode, SScalarCtx *ctx, int32_t *type) {
SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
*type = (int32_t)(res->columnData->info.type);
*pTypeMod = typeGetTypeModFromColInfo(&res->columnData->info);
return TSDB_CODE_SUCCESS;
}
}
*type = -1;
*pTypeMod = 0;
return TSDB_CODE_SUCCESS;
}
int32_t sclSetOperatorValueType(SOperatorNode *node, SScalarCtx *ctx) {
ctx->type.opResType = node->node.resType.type;
SCL_ERR_RET(sclGetNodeType(node->pLeft, ctx, &(ctx->type.selfType)));
SCL_ERR_RET(sclGetNodeType(node->pRight, ctx, &(ctx->type.peerType)));
SCL_ERR_RET(sclGetNodeType(node->pLeft, ctx, &(ctx->type.selfType), &ctx->type.selfTypeMod));
SCL_ERR_RET(sclGetNodeType(node->pRight, ctx, &(ctx->type.peerType), &ctx->type.selfTypeMod));
SCL_RET(TSDB_CODE_SUCCESS);
}
@ -1808,6 +1827,7 @@ static int32_t sclGetBitwiseOperatorResType(SOperatorNode *pOp) {
if (TSDB_DATA_TYPE_VARBINARY == ldt.type || TSDB_DATA_TYPE_VARBINARY == rdt.type) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
if (IS_DECIMAL_TYPE(ldt.type) || IS_DECIMAL_TYPE(rdt.type)) return TSDB_CODE_TSC_INVALID_OPERATION;
pOp->node.resType.type = TSDB_DATA_TYPE_BIGINT;
pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
return TSDB_CODE_SUCCESS;

View File

@ -45,6 +45,10 @@ bool noConvertBeforeCompare(int32_t leftType, int32_t rightType, int32_t optr) {
IS_NUMERIC_TYPE(rightType) && (optr >= OP_TYPE_GREATER_THAN && optr <= OP_TYPE_NOT_EQUAL);
}
bool compareForType(__compar_fn_t fp, int32_t optr, SColumnInfoData* pColL, int32_t idxL, SColumnInfoData* pColR, int32_t idxR);
bool compareForTypeWithColAndHash(__compar_fn_t fp, int32_t optr, SColumnInfoData *pColL, int32_t idxL,
const void *hashData, int32_t hashType, STypeMod hashTypeMod);
static int32_t vectorMathOpForDecimal(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOut, int32_t step, int32_t i, EOperatorType op);
int32_t convertNumberToNumber(const void *inData, void *outData, int8_t inType, int8_t outType) {
@ -1012,7 +1016,7 @@ int32_t vectorConvertSingleColImpl(const SScalarParam *pIn, SScalarParam *pOut,
Decimal value = {0};
SDataType inputType = GET_COL_DATA_TYPE(pInputCol->info), outputType = GET_COL_DATA_TYPE(pOutputCol->info);
int32_t code = convertToDecimal(colDataGetData(pInputCol, i), &inputType, &value, &outputType);
if (TSDB_CODE_SUCCESS != code) return code;
if (TSDB_CODE_SUCCESS != code) return code; // TODO wjm handle overflow
code = colDataSetVal(pOutputCol, i, (const char*)&value, false);
if (TSDB_CODE_SUCCESS != code) return code;
}
@ -1090,8 +1094,21 @@ int32_t vectorGetConvertType(int32_t type1, int32_t type2) {
return gConvertTypes[type2][type1];
}
int32_t vectorConvertSingleCol(SScalarParam *input, SScalarParam *output, int32_t type, int32_t startIndex,
int32_t numOfRows) {
STypeMod getConvertTypeMod(int32_t type, const SColumnInfo* pCol1, const SColumnInfo* pCol2) {
if (IS_DECIMAL_TYPE(type)) {
if (IS_DECIMAL_TYPE(pCol1->type) && (!pCol2 || !IS_DECIMAL_TYPE(pCol2->type))) {
return decimalCalcTypeMod(GET_DEICMAL_MAX_PRECISION(type), pCol1->scale);
} else if (pCol2 && IS_DECIMAL_TYPE(pCol2->type) && !IS_DECIMAL_TYPE(pCol1->type)) {
return decimalCalcTypeMod(GET_DEICMAL_MAX_PRECISION(type), pCol2->scale);
} else {
return 0;
}
}
return 0;
}
int32_t vectorConvertSingleCol(SScalarParam *input, SScalarParam *output, int32_t type, STypeMod typeMod,
int32_t startIndex, int32_t numOfRows) {
if (input->columnData == NULL && (input->pHashFilter != NULL || input->pHashFilterOthers != NULL)){
return TSDB_CODE_SUCCESS;
}
@ -1100,6 +1117,11 @@ int32_t vectorConvertSingleCol(SScalarParam *input, SScalarParam *output, int32_
SDataType t = {.type = type};
t.bytes = (IS_VAR_DATA_TYPE(t.type) && input->columnData) ? input->columnData->info.bytes:tDataTypes[type].bytes;
t.precision = (IS_TIMESTAMP_TYPE(t.type) && input->columnData) ? input->columnData->info.precision : TSDB_TIME_PRECISION_MILLI;
if (IS_DECIMAL_TYPE(type)) {
extractTypeFromTypeMod(type, typeMod, &t.precision, &t.scale, NULL);
// We do not change scale here for decimal types.
if (IS_DECIMAL_TYPE(input->columnData->info.type)) t.scale = input->columnData->info.scale;
}
int32_t code = sclCreateColumnInfoData(&t, input->numOfRows, output);
if (code != TSDB_CODE_SUCCESS) {
@ -1120,13 +1142,14 @@ int32_t vectorConvertCols(SScalarParam *pLeft, SScalarParam *pRight, SScalarPara
int32_t rightType = GET_PARAM_TYPE(pRight);
if (leftType == rightType) {
if (IS_DECIMAL_TYPE(leftType)) {
//TODO wjm force do conversion for decimal type
//TODO wjm force do conversion for decimal type, do not convert any more, do conversion inside decimal.c
}
return TSDB_CODE_SUCCESS;
}
int8_t type = 0;
int32_t code = 0;
int8_t type = 0;
int32_t code = 0;
STypeMod outTypeMod = 0;
SScalarParam *param1 = pLeft, *paramOut1 = pLeftOut;
SScalarParam *param2 = pRight, *paramOut2 = pRightOut;
@ -1149,14 +1172,15 @@ int32_t vectorConvertCols(SScalarParam *pLeft, SScalarParam *pRight, SScalarPara
terrno = TSDB_CODE_SCALAR_CONVERT_ERROR;
return TSDB_CODE_SCALAR_CONVERT_ERROR;
}
outTypeMod = getConvertTypeMod(type, &param1->columnData->info, param2->columnData ? &param2->columnData->info : NULL);
}
if (type != GET_PARAM_TYPE(param1)) {
SCL_ERR_RET(vectorConvertSingleCol(param1, paramOut1, type, startIndex, numOfRows));
SCL_ERR_RET(vectorConvertSingleCol(param1, paramOut1, type, outTypeMod, startIndex, numOfRows));
}
if (type != GET_PARAM_TYPE(param2)) {
SCL_ERR_RET(vectorConvertSingleCol(param2, paramOut2, type, startIndex, numOfRows));
SCL_ERR_RET(vectorConvertSingleCol(param2, paramOut2, type, outTypeMod, startIndex, numOfRows));
}
return TSDB_CODE_SUCCESS;
@ -1228,7 +1252,7 @@ static int32_t vectorConvertVarToDouble(SScalarParam *pInput, int32_t *converted
int32_t code = TSDB_CODE_SUCCESS;
*pOutputCol = NULL;
if (IS_VAR_DATA_TYPE(pCol->info.type) && pCol->info.type != TSDB_DATA_TYPE_JSON && pCol->info.type != TSDB_DATA_TYPE_VARBINARY) {
SCL_ERR_RET(vectorConvertSingleCol(pInput, &output, TSDB_DATA_TYPE_DOUBLE, -1, -1));
SCL_ERR_RET(vectorConvertSingleCol(pInput, &output, TSDB_DATA_TYPE_DOUBLE, 0, -1, -1));
*converted = VECTOR_DO_CONVERT;
*pOutputCol = output.columnData;
SCL_RET(code);
@ -1393,6 +1417,8 @@ int32_t vectorMathSub(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *p
SColumnInfoData *pRightCol = NULL;
if (pOutputCol->info.type == TSDB_DATA_TYPE_TIMESTAMP) { // timestamp minus duration
SCL_ERR_JRET(vectorConvertVarToDouble(pLeft, &leftConvert, &pLeftCol));
SCL_ERR_JRET(vectorConvertVarToDouble(pRight, &rightConvert, &pRightCol));
int64_t *output = (int64_t *)pOutputCol->pData;
_getBigintValue_fn_t getVectorBigintValueFnLeft;
_getBigintValue_fn_t getVectorBigintValueFnRight;
@ -1812,10 +1838,7 @@ int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarPa
int32_t leftIndex = (i >= pLeft->numOfRows) ? 0 : i;
int32_t rightIndex = (i >= pRight->numOfRows) ? 0 : i;
char *pLeftData = colDataGetData(pLeft->columnData, leftIndex);
char *pRightData = colDataGetData(pRight->columnData, rightIndex);
pRes[i] = filterDoCompare(fp, optr, pLeftData, pRightData);
pRes[i] = compareForType(fp, optr, pLeft->columnData, leftIndex, pRight->columnData, rightIndex);
if (pRes[i]) {
++(*num);
}
@ -1830,9 +1853,7 @@ int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarPa
pRes[i] = false;
continue;
}
char *pLeftData = colDataGetData(pLeft->columnData, leftIndex);
char *pRightData = colDataGetData(pRight->columnData, rightIndex);
pRes[i] = filterDoCompare(fp, optr, pLeftData, pRightData);
pRes[i] = compareForType(fp, optr, pLeft->columnData, leftIndex, pRight->columnData, rightIndex);
if (pRes[i]) {
++(*num);
}
@ -1912,7 +1933,7 @@ int32_t doVectorCompare(SScalarParam *pLeft, SScalarParam *pLeftVar, SScalarPara
return TSDB_CODE_INTERNAL_ERROR;
}
if (pLeftVar != NULL) {
if (pLeftVar != NULL) {// TODO wjm test when pLeftVar is not NULL
SCL_ERR_RET(filterGetCompFunc(&fpVar, GET_PARAM_TYPE(pLeftVar), optr));
}
if (startIndex < 0) {
@ -1932,8 +1953,8 @@ int32_t doVectorCompare(SScalarParam *pLeft, SScalarParam *pLeftVar, SScalarPara
continue;
}
char *pLeftData = colDataGetData(pLeft->columnData, i);
bool res = filterDoCompare(fp, optr, pLeftData, pRight->pHashFilter);
bool res = compareForTypeWithColAndHash(fp, optr, pLeft->columnData, i, pRight->pHashFilter,
pRight->filterValueType, pRight->filterValueTypeMod);
if (pLeftVar != NULL && taosHashGetSize(pRight->pHashFilterOthers) > 0){
do{
if (optr == OP_TYPE_IN && res){
@ -1942,8 +1963,8 @@ int32_t doVectorCompare(SScalarParam *pLeft, SScalarParam *pLeftVar, SScalarPara
if (optr == OP_TYPE_NOT_IN && !res){
break;
}
pLeftData = colDataGetData(pLeftVar->columnData, i);
res = filterDoCompare(fpVar, optr, pLeftData, pRight->pHashFilterOthers);
res = compareForTypeWithColAndHash(fpVar, optr, pLeftVar->columnData, i, pRight->pHashFilterOthers,
pRight->filterValueType, pRight->filterValueTypeMod);
}while(0);
}
colDataSetInt8(pOut->columnData, i, (int8_t *)&res);
@ -2291,3 +2312,32 @@ static int32_t vectorMathOpForDecimal(SScalarParam *pLeft, SScalarParam *pRight,
}
return code;
}
bool compareForType(__compar_fn_t fp, int32_t optr, SColumnInfoData* pColL, int32_t idxL, SColumnInfoData* pColR, int32_t idxR) {
void* pLeftData = colDataGetData(pColL, idxL), *pRightData = colDataGetData(pColR, idxR);
if (IS_DECIMAL_TYPE(pColL->info.type) || IS_DECIMAL_TYPE(pColR->info.type)) {
SDecimalCompareCtx ctxL = {.pData = pLeftData,
.type = pColL->info.type,
.typeMod = typeGetTypeModFromColInfo(&pColL->info)},
ctxR = {.pData = pRightData,
.type = pColR->info.type,
.typeMod = typeGetTypeModFromColInfo(&pColR->info)};
return filterDoCompare(fp, optr, &ctxL, &ctxR);
} else {
return filterDoCompare(fp, optr, pLeftData, pRightData);
}
}
bool compareForTypeWithColAndHash(__compar_fn_t fp, int32_t optr, SColumnInfoData *pColL, int32_t idxL,
const void *pHashData, int32_t hashType, STypeMod hashTypeMod) {
void * pLeftData = colDataGetData(pColL, idxL);
if (IS_DECIMAL_TYPE(pColL->info.type) || IS_DECIMAL_TYPE(hashType)) {
SDecimalCompareCtx ctxL = {.pData = pLeftData,
.type = pColL->info.type,
.typeMod = typeGetTypeModFromColInfo(&pColL->info)},
ctxR = {.pData = (void *)pHashData, .type = hashType, .typeMod = hashTypeMod};
return filterDoCompare(fp, optr, &ctxL, &ctxR);
} else {
return filterDoCompare(fp, optr, pLeftData, (void*)pHashData);
}
}

View File

@ -59,6 +59,15 @@ int32_t setChkNotInBytes8(const void *pLeft, const void *pRight) {
return NULL == taosHashGet((SHashObj *)pRight, pLeft, 8) ? 1 : 0;
}
int32_t setChkInDecimalHash(const void* pLeft, const void* pRight) {
const SDecimalCompareCtx *pCtxL = pLeft, *pCtxR = pRight;
return NULL != taosHashGet((SHashObj *)(pCtxR->pData), pCtxL->pData, tDataTypes[pCtxL->type].bytes) ? 1 : 0;
}
int32_t setChkNotInDecimalHash(const void* pLeft, const void* pRight) {
return NULL == taosHashGet((SHashObj *)pRight, pLeft, 16) ? 1 : 0;
}
int32_t compareChkInString(const void *pLeft, const void *pRight) {
return NULL != taosHashGet((SHashObj *)pRight, pLeft, varDataTLen(pLeft)) ? 1 : 0;
}
@ -1036,18 +1045,15 @@ int32_t compareUint64Uint32(const void *pLeft, const void *pRight) {
}
int32_t compareDecimal64(const void* pleft, const void* pright) {
SDecimalOps* pOps = getDecimalOps(TSDB_DATA_TYPE_DECIMAL64);
if (pOps->gt(pleft, pright, WORD_NUM(Decimal64))) return 1;
if (pOps->lt(pleft, pright, WORD_NUM(Decimal64))) return -1;
if (decimalCompare(OP_TYPE_GREATER_THAN, pleft, pright)) return 1;
if (decimalCompare(OP_TYPE_LOWER_THAN, pleft, pright)) return -1;
return 0;
}
int32_t compareDecimal128(const void* pleft, const void* pright) {
SDecimalOps* pOps = getDecimalOps(TSDB_DATA_TYPE_DECIMAL);
if (pOps->gt(pleft, pright, WORD_NUM(Decimal128))) return 1;
if (pOps->lt(pleft,pright, WORD_NUM(Decimal128))) return -1;
if (decimalCompare(OP_TYPE_GREATER_THAN, pleft, pright)) return 1;
if (decimalCompare(OP_TYPE_LOWER_THAN, pleft, pright)) return -1;
return 0;
}
int32_t compareJsonValDesc(const void *pLeft, const void *pRight) { return compareJsonVal(pRight, pLeft); }

View File

@ -235,6 +235,10 @@ int32_t taosDoubleEqual(const void *a, const void *b, size_t UNUSED_PARAM(sz)) {
return getComparFunc(TSDB_DATA_TYPE_DOUBLE, -1)(a, b);
}
int32_t taosDecimalEqual(const void* a, const void* b, size_t UNUSED_PARAM(sz)) {
return 0;
}
_equal_fn_t taosGetDefaultEqualFunction(int32_t type) {
_equal_fn_t fn = NULL;
switch (type) {
@ -244,6 +248,10 @@ _equal_fn_t taosGetDefaultEqualFunction(int32_t type) {
case TSDB_DATA_TYPE_DOUBLE:
fn = taosDoubleEqual;
break;
case TSDB_DATA_TYPE_DECIMAL64:
case TSDB_DATA_TYPE_DECIMAL:
fn = memcmp;
break;
default:
fn = memcmp;
break;