decimal + - * /

This commit is contained in:
wangjiaming0909 2025-01-14 09:08:46 +08:00
parent 2481c768b6
commit f18e5879f4
3 changed files with 498 additions and 131 deletions

View File

@ -58,6 +58,20 @@ extern const UInt128 uInt128_1e18;
extern const UInt128 uInt128Zero; extern const UInt128 uInt128Zero;
extern const uint64_t k1e18; extern const uint64_t k1e18;
static inline int32_t countLeadingZeros(uint64_t v) {
#if defined(__clang__) || defined(__GUNC__)
if (v == 0) return 64;
return __builtin_clzll(v);
#else
int32_t bitpos = 0;
while (v != 0) {
v >>= 1;
++bitpos;
}
return 64 - bitpos;
#endif
}
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -98,8 +98,12 @@ int32_t decimalGetRetType(const SDataType* pLeftT, const SDataType* pRightT, EOp
default: default:
return TSDB_CODE_TSC_INVALID_OPERATION; return TSDB_CODE_TSC_INVALID_OPERATION;
} }
pOutType->precision = TMIN(pOutType->precision, TSDB_DECIMAL_MAX_PRECISION); if (pOutType->precision > TSDB_DECIMAL_MAX_PRECISION) {
pOutType->scale = TMIN(pOutType->scale, TSDB_DECIMAL_MAX_SCALE); int8_t minScale = TMIN(DECIMAL_MIN_ADJUSTED_SCALE, pOutType->scale);
int8_t delta = pOutType->precision - TSDB_DECIMAL_MAX_PRECISION;
pOutType->precision = TSDB_DECIMAL_MAX_PRECISION;
pOutType->scale = TMAX(minScale, (int8_t)(pOutType->scale) - delta);
}
pOutType->type = pOutType->type =
pOutType->precision > TSDB_DECIMAL64_MAX_PRECISION ? TSDB_DATA_TYPE_DECIMAL : TSDB_DATA_TYPE_DECIMAL64; pOutType->precision > TSDB_DECIMAL64_MAX_PRECISION ? TSDB_DATA_TYPE_DECIMAL : TSDB_DATA_TYPE_DECIMAL64;
pOutType->bytes = tDataTypes[pOutType->type].bytes; pOutType->bytes = tDataTypes[pOutType->type].bytes;
@ -304,6 +308,7 @@ static int32_t decimal128ToStr(const DecimalType* pInt, uint8_t scale, char* pBu
void decimal128ScaleTo(Decimal128* pDec, uint8_t oldScale, uint8_t newScale); void decimal128ScaleTo(Decimal128* pDec, uint8_t oldScale, uint8_t newScale);
void decimal128ScaleDown(Decimal128* pDec, uint8_t scaleDown); void decimal128ScaleDown(Decimal128* pDec, uint8_t scaleDown);
void decimal128ScaleUp(Decimal128* pDec, uint8_t scaleUp); void decimal128ScaleUp(Decimal128* pDec, uint8_t scaleUp);
int32_t decimal128CountLeadingBinaryZeros(const Decimal128* pDec);
SDecimalOps decimal64Ops = {decimal64Negate, decimal64Abs, decimal64Add, decimal64Subtract, SDecimalOps decimal64Ops = {decimal64Negate, decimal64Abs, decimal64Add, decimal64Subtract,
decimal64Multiply, decimal64divide, decimal64Mod, decimal64Lt, decimal64Multiply, decimal64divide, decimal64Mod, decimal64Lt,
@ -425,7 +430,7 @@ int32_t decimal64ToStr(const DecimalType* pInt, uint8_t scale, char* pBuf, int32
// TODO wjm handle endian problem // TODO wjm handle endian problem
#define DEFINE_DECIMAL128(lo, hi) {lo, hi} #define DEFINE_DECIMAL128(lo, hi) {lo, hi}
static Decimal128 SCALE_MULTIPLIER_128[38 + 1] = { static const Decimal128 SCALE_MULTIPLIER_128[38 + 1] = {
DEFINE_DECIMAL128(1LL, 0), DEFINE_DECIMAL128(1LL, 0),
DEFINE_DECIMAL128(10LL, 0), DEFINE_DECIMAL128(10LL, 0),
DEFINE_DECIMAL128(100LL, 0), DEFINE_DECIMAL128(100LL, 0),
@ -467,9 +472,20 @@ static Decimal128 SCALE_MULTIPLIER_128[38 + 1] = {
DEFINE_DECIMAL128(687399551400673280ULL, 5421010862427522170LL), DEFINE_DECIMAL128(687399551400673280ULL, 5421010862427522170LL),
}; };
static const Decimal128 decimal128Zero = DEFINE_DECIMAL128(0, 0);
static const Decimal128 decimal128Max = DEFINE_DECIMAL128(687399551400673280ULL - 1, 5421010862427522170LL);
#define DECIMAL128_ZERO decimal128Zero
#define DECIMAL128_MAX decimal128Max
#define DECIMAL128_ONE SCALE_MULTIPLIER_128[0] #define DECIMAL128_ONE SCALE_MULTIPLIER_128[0]
#define DECIMAL128_TEN SCALE_MULTIPLIER_128[1] #define DECIMAL128_TEN SCALE_MULTIPLIER_128[1]
// To calculate how many bits for integer X.
// eg. 999(3 digits) -> 1111100111(10 bits) -> bitsForNumDigits[3] = 10
static const int32_t bitsForNumDigits[] = {0, 4, 7, 10, 14, 17, 20, 24, 27, 30, 34, 37, 40,
44, 47, 50, 54, 57, 60, 64, 67, 70, 74, 77, 80, 84,
87, 90, 94, 97, 100, 103, 107, 110, 113, 117, 120, 123, 127};
// TODO wjm pre define it?? actually, its MAX_INTEGER, not MAX // TODO wjm pre define it?? actually, its MAX_INTEGER, not MAX
#define DECIMAL128_GET_MAX(precision, pMax) \ #define DECIMAL128_GET_MAX(precision, pMax) \
do { \ do { \
@ -682,33 +698,228 @@ int32_t decimalToStr(const DecimalType* pDec, int8_t dataType, int8_t precision,
return 0; return 0;
} }
static bool needScaleToOutScale(EOperatorType op) {
switch (op) {
case OP_TYPE_ADD:
case OP_TYPE_SUB:
return true; // convert precision and scale
case OP_TYPE_MULTI:
case OP_TYPE_DIV:
return false;
break;
default:
return false;
}
}
static void decimalAddLargePositive(Decimal* pX, const SDataType* pXT, const Decimal* pY, const SDataType* pYT,
const SDataType* pOT) {
Decimal wholeX = *pX, wholeY = *pY, fracX = {0}, fracY = {0};
decimal128Divide(&wholeX, &SCALE_MULTIPLIER_128[pXT->scale], WORD_NUM(Decimal), &fracX);
decimal128Divide(&wholeY, &SCALE_MULTIPLIER_128[pYT->scale], WORD_NUM(Decimal), &fracY);
uint8_t maxScale = TMAX(pXT->scale, pYT->scale);
decimal128ScaleUp(&fracX, maxScale - pXT->scale);
decimal128ScaleUp(&fracY, maxScale - pYT->scale);
Decimal pMultiplier = SCALE_MULTIPLIER_128[maxScale];
Decimal right = fracX;
Decimal carry = {0};
decimal128Subtract(&pMultiplier, &fracY, WORD_NUM(Decimal));
if (!decimal128Gt(&pMultiplier, &fracX, WORD_NUM(Decimal))) {
decimal128Subtract(&right, &pMultiplier, WORD_NUM(Decimal));
makeDecimal128(&carry, 0, 1);
} else {
decimal128Add(&right, &fracY, WORD_NUM(Decimal));
}
decimal128ScaleDown(&right, maxScale - pOT->scale);
decimal128Add(&wholeX, &wholeY, WORD_NUM(Decimal));
decimal128Add(&wholeX, &carry, WORD_NUM(Decimal));
decimal128Multiply(&wholeX, &SCALE_MULTIPLIER_128[pOT->scale], WORD_NUM(Decimal));
decimal128Add(&wholeX, &right, WORD_NUM(Decimal));
*pX = wholeX;
}
static void decimalAddLargeNegative(Decimal* pX, const SDataType* pXT, const Decimal* pY, const SDataType* pYT,
const SDataType* pOT) {
Decimal wholeX = *pX, wholeY = *pY, fracX = {0}, fracY = {0};
decimal128Divide(&wholeX, &SCALE_MULTIPLIER_128[pXT->scale], WORD_NUM(Decimal), &fracX);
decimal128Divide(&wholeY, &SCALE_MULTIPLIER_128[pYT->scale], WORD_NUM(Decimal), &fracY);
uint8_t maxScale = TMAX(pXT->scale, pYT->scale);
decimal128ScaleUp(&fracX, maxScale - pXT->scale);
decimal128ScaleUp(&fracY, maxScale - pYT->scale);
decimal128Add(&wholeX, &wholeY, WORD_NUM(Decimal));
decimal128Add(&fracX, &fracY, WORD_NUM(Decimal));
if (DECIMAL128_SIGN(&wholeX) == -1 && decimal128Gt(&fracX, &DECIMAL128_ZERO, WORD_NUM(Decimal128))) {
decimal128Add(&wholeX, &DECIMAL128_ONE, WORD_NUM(Decimal));
decimal128Subtract(&fracX, &SCALE_MULTIPLIER_128[maxScale], WORD_NUM(Decimal));
} else if (decimal128Gt(&wholeX, &DECIMAL128_ZERO, WORD_NUM(Decimal128)) && DECIMAL128_SIGN(&fracX) == -1) {
decimal128Subtract(&wholeX, &DECIMAL128_ONE, WORD_NUM(Decimal));
decimal128Add(&fracX, &SCALE_MULTIPLIER_128[maxScale], WORD_NUM(Decimal));
}
decimal128ScaleDown(&fracX, maxScale - pOT->scale);
decimal128Multiply(&wholeX, &SCALE_MULTIPLIER_128[pOT->scale], WORD_NUM(Decimal));
decimal128Add(&wholeX, &fracX, WORD_NUM(Decimal));
*pX = wholeX;
}
static void decimalAdd(Decimal* pX, const SDataType* pXT, const Decimal* pY, const SDataType* pYT,
const SDataType* pOT) {
if (pOT->precision < TSDB_DECIMAL_MAX_PRECISION) {
uint8_t maxScale = TMAX(pXT->scale, pYT->scale);
Decimal tmpY = *pY;
decimal128ScaleTo(pX, pXT->scale, maxScale);
decimal128ScaleTo(&tmpY, pYT->scale, maxScale);
decimal128Add(pX, &tmpY, WORD_NUM(Decimal));
} else {
int8_t signX = DECIMAL128_SIGN(pX), signY = DECIMAL128_SIGN(pY);
if (signX == 1 && signY == 1) {
decimalAddLargePositive(pX, pXT, pY, pYT, pOT);
} else if (signX == -1 && signY == -1) {
decimal128Negate(pX);
Decimal y = *pY;
decimal128Negate(&y);
decimalAddLargePositive(pX, pXT, &y, pYT, pOT);
decimal128Negate(pX);
} else {
decimalAddLargeNegative(pX, pXT, pY, pYT, pOT);
}
}
}
static int32_t decimalMultiply(Decimal* pX, const SDataType* pXT, const Decimal* pY, const SDataType* pYT,
const SDataType* pOT) {
if (pOT->precision < TSDB_DECIMAL_MAX_PRECISION) {
decimal128Multiply(pX, pY, WORD_NUM(Decimal));
} else if (decimal128Eq(pX, &DECIMAL128_ZERO, WORD_NUM(Decimal)) ||
decimal128Eq(pY, &DECIMAL128_ZERO, WORD_NUM(Decimal))) {
makeDecimal128(pX, 0, 0);
} else {
int8_t deltaScale = pXT->scale + pYT->scale - pOT->scale;
Decimal xAbs = *pX, yAbs = *pY;
decimal128Abs(&xAbs);
decimal128Abs(&yAbs);
if (deltaScale == 0) {
// no need to trim scale
Decimal max = DECIMAL128_MAX;
decimal128Divide(&max, &yAbs, WORD_NUM(Decimal), NULL);
if (decimal128Gt(&xAbs, &max, WORD_NUM(Decimal))) {
return TSDB_CODE_DECIMAL_OVERFLOW;
} else {
decimal128Multiply(pX, pY, WORD_NUM(Decimal));
}
} else {
int32_t leadingZeros = decimal128CountLeadingBinaryZeros(&xAbs) + decimal128CountLeadingBinaryZeros(&yAbs);
if (leadingZeros <= 128) {
// need to trim scale
return TSDB_CODE_DECIMAL_OVERFLOW;
} else {
// no need to trim scale
if (deltaScale <= 38) {
decimal128Multiply(pX, pY, WORD_NUM(Decimal));
decimal128ScaleDown(pX, deltaScale);
} else {
makeDecimal128(pX, 0, 0);
}
}
}
}
return 0;
}
int32_t decimalDivide(Decimal* pX, const SDataType* pXT, const Decimal* pY, const SDataType* pYT,
const SDataType* pOT) {
if (decimal128Eq(pY, &DECIMAL128_ZERO, WORD_NUM(Decimal))) {
return TSDB_CODE_DECIMAL_OVERFLOW; // TODO wjm divide zero error
}
int8_t deltaScale = pOT->scale + pYT->scale - pXT->scale;
assert(deltaScale >= 0);
Decimal xTmp = *pX;
decimal128Abs(&xTmp);
int32_t bitsOccupied = 128 - decimal128CountLeadingBinaryZeros(&xTmp);
if (bitsOccupied + bitsForNumDigits[deltaScale] <= 127) {
xTmp = *pX;
decimal128ScaleUp(&xTmp, deltaScale);
Decimal remainder = {0};
decimal128Divide(&xTmp, pY, WORD_NUM(Decimal), &remainder);
Decimal tmpY = *pY, two = DEFINE_DECIMAL128(2, 0);
decimal64Abs(&tmpY);
decimal128Multiply(&remainder, &two, WORD_NUM(Decimal));
decimal128Abs(&remainder);
if (!decimal128Lt(&remainder, &tmpY, WORD_NUM(Decimal))) {
int64_t extra = (DECIMAL128_SIGN(pX) ^ DECIMAL128_SIGN(pY)) + 1;
decimal128Add(&xTmp, &extra, WORD_NUM(Decimal64));
}
} else {
return TSDB_CODE_DECIMAL_OVERFLOW;
}
*pX = xTmp;
return 0;
}
int32_t decimalOp(EOperatorType op, const SDataType* pLeftT, const SDataType* pRightT, const SDataType* pOutT, int32_t decimalOp(EOperatorType op, const SDataType* pLeftT, const SDataType* pRightT, const SDataType* pOutT,
const void* pLeftData, const void* pRightData, void* pOutputData) { const void* pLeftData, const void* pRightData, void* pOutputData) {
int32_t code = 0; int32_t code = 0;
// TODO wjm if output precision <= 18, no need to convert to decimal128 // TODO wjm if output precision <= 18, no need to convert to decimal128
Decimal pLeft = {0}, pRight = {0}; Decimal left = {0}, right = {0};
SDataType tmpType = *pOutT; SDataType lt = {.type = TSDB_DATA_TYPE_DECIMAL,
tmpType.type = TSDB_DATA_TYPE_DECIMAL; .precision = TSDB_DECIMAL_MAX_PRECISION,
tmpType.precision = TSDB_DECIMAL_MAX_PRECISION; .bytes = tDataTypes[TSDB_DATA_TYPE_DECIMAL].bytes,
if (TSDB_DATA_TYPE_DECIMAL != pLeftT->type || pLeftT->scale != pOutT->scale) { .scale = pLeftT->scale};
code = convertToDecimal(pLeftData, pLeftT, &pLeft, &tmpType); SDataType rt = {.type = TSDB_DATA_TYPE_DECIMAL,
.precision = TSDB_DECIMAL_MAX_PRECISION,
.bytes = tDataTypes[TSDB_DATA_TYPE_DECIMAL].bytes,
.scale = pRightT->scale};
if (TSDB_DATA_TYPE_DECIMAL != pLeftT->type) {
code = convertToDecimal(pLeftData, pLeftT, &left, &lt);
if (TSDB_CODE_SUCCESS != code) return code; if (TSDB_CODE_SUCCESS != code) return code;
} else {
left = *(Decimal*)pLeftData;
} }
if (pRightT && (TSDB_DATA_TYPE_DECIMAL != pRightT->type || pRightT->scale != pOutT->scale)) { if (TSDB_DATA_TYPE_DECIMAL != pRightT->type) {
code = convertToDecimal(pRightData, pRightT, &pRight, &tmpType); code = convertToDecimal(pRightData, pRightT, &right, &rt);
if (TSDB_CODE_SUCCESS != code) return code; if (TSDB_CODE_SUCCESS != code) return code;
pRightData = &right;
} else {
right = *(Decimal*)pRightData;
} }
SDecimalOps* pOps = getDecimalOps(TSDB_DATA_TYPE_DECIMAL); SDecimalOps* pOps = getDecimalOps(TSDB_DATA_TYPE_DECIMAL);
switch (op) { switch (op) {
case OP_TYPE_ADD: case OP_TYPE_ADD:
pOps->add(&pLeft, &pRight, WORD_NUM(Decimal)); decimalAdd(&left, &lt, &right, &rt, pOutT);
break;
case OP_TYPE_SUB:
decimal128Negate(&right);
decimalAdd(&left, &lt, &right, &rt, pOutT);
break;
case OP_TYPE_MULTI:
code = decimalMultiply(&left, &lt, &right, &rt, pOutT);
break;
case OP_TYPE_DIV:
code = decimalDivide(&left, &lt, &right, &rt, pOutT);
break; break;
default: default:
break; break;
} }
return convertToDecimal(&pLeft, &tmpType, pOutputData, pOutT); if (0 == code && pOutT->type != TSDB_DATA_TYPE_DECIMAL) {
lt = *pOutT;
lt.type = TSDB_DATA_TYPE_DECIMAL;
code = convertToDecimal(&left, &lt, pOutputData, pOutT);
} else {
*(Decimal*)pOutputData = left;
}
return code;
} }
#define ABS_INT64(v) (v) == INT64_MIN ? (uint64_t)INT64_MAX + 1 : (uint64_t)llabs(v) #define ABS_INT64(v) (v) == INT64_MIN ? (uint64_t)INT64_MAX + 1 : (uint64_t)llabs(v)
@ -854,87 +1065,8 @@ static int32_t decimal128FromDecimal128(DecimalType* pDec, uint8_t prec, uint8_t
if (negative) decimal128Negate(pDec); if (negative) decimal128Negate(pDec);
return 0; return 0;
} }
#define CHECK_OVERFLOW_AND_MAKE_DECIMAL128(pDec, v, max, ABS) \
({ \
int32_t code = 0; \
Decimal128 dv = {0}; \
makeDecimal128(&dv, 0, ABS(v)); \
if (DECIMAL128_IS_OVERFLOW(dv, max)) { \
code = TSDB_CODE_DECIMAL_OVERFLOW; \
} else { \
makeDecimal128(pDec, v < 0 ? -1 : 0, ABS(v)); \
} \
code; \
})
#define MAKE_DECIMAL128_SIGNED(pDec, v, max) CHECK_OVERFLOW_AND_MAKE_DECIMAL128(pDec, v, max, ABS_INT64) #define CONVERT_TO_DECIMAL(pData, pInputType, pOut, pOutType, decimal) \
#define MAKE_DECIMAL128_UNSIGNED(pDec, v, max) CHECK_OVERFLOW_AND_MAKE_DECIMAL128(pDec, v, max, ABS_UINT64)
#define CONVERT_TO_DECIMAL(inputType, pInputData, pOut, CHECK_AND_MAKE_DECIMAL, max, code) \
do { \
int64_t v = 0; \
uint64_t uv = 0; \
switch (inputType) { \
case TSDB_DATA_TYPE_NULL: \
uv = 0; \
code = CHECK_AND_MAKE_DECIMAL##_UNSIGNED(pOut, uv, max); \
break; \
case TSDB_DATA_TYPE_BOOL: \
uv = *(bool*)pInputData; \
CHECK_AND_MAKE_DECIMAL##_UNSIGNED(pOut, uv, max); \
break; \
case TSDB_DATA_TYPE_TINYINT: \
v = *(int8_t*)pInputData; \
CHECK_AND_MAKE_DECIMAL##_SIGNED(pOut, v, max); \
break; \
case TSDB_DATA_TYPE_SMALLINT: \
v = *(int16_t*)pInputData; \
CHECK_AND_MAKE_DECIMAL##_SIGNED(pOut, v, max); \
break; \
case TSDB_DATA_TYPE_INT: \
v = *(int32_t*)pInputData; \
CHECK_AND_MAKE_DECIMAL##_SIGNED(pOut, v, max); \
break; \
case TSDB_DATA_TYPE_TIMESTAMP: \
case TSDB_DATA_TYPE_BIGINT: \
v = *(int64_t*)pInputData; \
CHECK_AND_MAKE_DECIMAL##_SIGNED(pOut, v, max); \
break; \
case TSDB_DATA_TYPE_UTINYINT: { \
uv = *(uint8_t*)pInputData; \
CHECK_AND_MAKE_DECIMAL##_UNSIGNED(pOut, uv, max); \
} break; \
case TSDB_DATA_TYPE_USMALLINT: { \
uv = *(uint16_t*)pInputData; \
CHECK_AND_MAKE_DECIMAL##_UNSIGNED(pOut, uv, max); \
} break; \
case TSDB_DATA_TYPE_UINT: { \
uv = *(uint32_t*)pInputData; \
CHECK_AND_MAKE_DECIMAL##_UNSIGNED(pOut, uv, max); \
} break; \
case TSDB_DATA_TYPE_UBIGINT: { \
uv = *(uint64_t*)pInputData; \
CHECK_AND_MAKE_DECIMAL##_UNSIGNED(pOut, uv, max); \
} break; \
case TSDB_DATA_TYPE_FLOAT: { \
} break; \
case TSDB_DATA_TYPE_DOUBLE: { \
} break; \
case TSDB_DATA_TYPE_VARCHAR: \
case TSDB_DATA_TYPE_VARBINARY: \
case TSDB_DATA_TYPE_NCHAR: { \
} break; \
case TSDB_DATA_TYPE_DECIMAL64: { \
} break; \
case TSDB_DATA_TYPE_DECIMAL: { \
} break; \
default: \
code = TSDB_CODE_OPS_NOT_SUPPORT; \
break; \
} \
} while (0)
#define CONVERT_TO_DECIMAL2(pData, pInputType, pOut, pOutType, decimal) \
({ \ ({ \
int32_t code = 0; \ int32_t code = 0; \
int64_t val = 0; \ int64_t val = 0; \
@ -988,11 +1120,6 @@ static int32_t decimal128FromDecimal128(DecimalType* pDec, uint8_t prec, uint8_t
dval = *(const double*)pData; \ dval = *(const double*)pData; \
code = decimal##FromDouble(pOut, pOutType->precision, pOutType->scale, dval); \ code = decimal##FromDouble(pOut, pOutType->precision, pOutType->scale, dval); \
} break; \ } break; \
case TSDB_DATA_TYPE_VARCHAR: \
case TSDB_DATA_TYPE_VARBINARY: \
case TSDB_DATA_TYPE_NCHAR: { \
/*decimal##FromStr()*/ \
} break; \
case TSDB_DATA_TYPE_DECIMAL64: { \ case TSDB_DATA_TYPE_DECIMAL64: { \
code = decimal##FromDecimal64(pOut, pOutType->precision, pOutType->scale, pData, pInputType->precision, \ code = decimal##FromDecimal64(pOut, pOutType->precision, pOutType->scale, pData, pInputType->precision, \
pInputType->scale); \ pInputType->scale); \
@ -1001,6 +1128,9 @@ static int32_t decimal128FromDecimal128(DecimalType* pDec, uint8_t prec, uint8_t
code = decimal##FromDecimal128(pOut, pOutType->precision, pOutType->scale, pData, pInputType->precision, \ code = decimal##FromDecimal128(pOut, pOutType->precision, pOutType->scale, pData, pInputType->precision, \
pInputType->scale); \ pInputType->scale); \
} break; \ } break; \
case TSDB_DATA_TYPE_VARCHAR: \
case TSDB_DATA_TYPE_VARBINARY: \
case TSDB_DATA_TYPE_NCHAR: \
default: \ default: \
code = TSDB_CODE_OPS_NOT_SUPPORT; \ code = TSDB_CODE_OPS_NOT_SUPPORT; \
break; \ break; \
@ -1014,10 +1144,10 @@ int32_t convertToDecimal(const void* pData, const SDataType* pInputType, void* p
switch (pOutType->type) { switch (pOutType->type) {
case TSDB_DATA_TYPE_DECIMAL64: { case TSDB_DATA_TYPE_DECIMAL64: {
code = CONVERT_TO_DECIMAL2(pData, pInputType, pOut, pOutType, decimal64); code = CONVERT_TO_DECIMAL(pData, pInputType, pOut, pOutType, decimal64);
} break; } break;
case TSDB_DATA_TYPE_DECIMAL: { case TSDB_DATA_TYPE_DECIMAL: {
code = CONVERT_TO_DECIMAL2(pData, pInputType, pOut, pOutType, decimal128); code = CONVERT_TO_DECIMAL(pData, pInputType, pOut, pOutType, decimal128);
} break; } break;
default: default:
code = TSDB_CODE_INTERNAL_ERROR; code = TSDB_CODE_INTERNAL_ERROR;
@ -1027,14 +1157,18 @@ int32_t convertToDecimal(const void* pData, const SDataType* pInputType, void* p
} }
void decimal64ScaleDown(Decimal64* pDec, uint8_t scaleDown) { void decimal64ScaleDown(Decimal64* pDec, uint8_t scaleDown) {
if (scaleDown > 0) {
Decimal64 divisor = SCALE_MULTIPLIER_64[scaleDown]; Decimal64 divisor = SCALE_MULTIPLIER_64[scaleDown];
decimal64divide(pDec, &divisor, WORD_NUM(Decimal64), NULL); decimal64divide(pDec, &divisor, WORD_NUM(Decimal64), NULL);
} }
}
void decimal64ScaleUp(Decimal64* pDec, uint8_t scaleUp) { void decimal64ScaleUp(Decimal64* pDec, uint8_t scaleUp) {
if (scaleUp > 0) {
Decimal64 multiplier = SCALE_MULTIPLIER_64[scaleUp]; Decimal64 multiplier = SCALE_MULTIPLIER_64[scaleUp];
decimal64Multiply(pDec, &multiplier, WORD_NUM(Decimal64)); decimal64Multiply(pDec, &multiplier, WORD_NUM(Decimal64));
} }
}
void decimal64ScaleTo(Decimal64* pDec, uint8_t oldScale, uint8_t newScale) { void decimal64ScaleTo(Decimal64* pDec, uint8_t oldScale, uint8_t newScale) {
if (newScale > oldScale) if (newScale > oldScale)
@ -1059,14 +1193,18 @@ int32_t decimal64FromStr(const char* str, int32_t len, uint8_t expectPrecision,
} }
void decimal128ScaleDown(Decimal128* pDec, uint8_t scaleDown) { void decimal128ScaleDown(Decimal128* pDec, uint8_t scaleDown) {
if (scaleDown > 0) {
Decimal128 divisor = SCALE_MULTIPLIER_128[scaleDown]; Decimal128 divisor = SCALE_MULTIPLIER_128[scaleDown];
decimal128Divide(pDec, &divisor, 2, NULL); decimal128Divide(pDec, &divisor, 2, NULL);
} }
}
void decimal128ScaleUp(Decimal128* pDec, uint8_t scaleUp) { void decimal128ScaleUp(Decimal128* pDec, uint8_t scaleUp) {
if (scaleUp > 0) {
Decimal128 multiplier = SCALE_MULTIPLIER_128[scaleUp]; Decimal128 multiplier = SCALE_MULTIPLIER_128[scaleUp];
decimal128Multiply(pDec, &multiplier, WORD_NUM(Decimal128)); decimal128Multiply(pDec, &multiplier, WORD_NUM(Decimal128));
} }
}
void decimal128ScaleTo(Decimal128* pDec, uint8_t oldScale, uint8_t newScale) { void decimal128ScaleTo(Decimal128* pDec, uint8_t oldScale, uint8_t newScale) {
if (newScale > oldScale) if (newScale > oldScale)
@ -1099,3 +1237,11 @@ __int128 decimal128ToInt128(const Decimal128* pDec) {
ret |= DECIMAL128_LOW_WORD(pDec); ret |= DECIMAL128_LOW_WORD(pDec);
return ret; return ret;
} }
int32_t decimal128CountLeadingBinaryZeros(const Decimal128* pDec) {
if (DECIMAL128_HIGH_WORD(pDec) == 0) {
return 64 + countLeadingZeros(DECIMAL128_LOW_WORD(pDec));
} else {
return countLeadingZeros((uint64_t)DECIMAL128_HIGH_WORD(pDec));
}
}

View File

@ -67,6 +67,214 @@ void checkDecimal(const DecimalType* pDec, uint8_t t, uint8_t prec, uint8_t scal
cout << "decimal" << (prec > 18 ? 128 : 64) << " " << (int32_t)prec << ":" << (int32_t)scale << " -> " << buf << endl; cout << "decimal" << (prec > 18 ? 128 : 64) << " " << (int32_t)prec << ":" << (int32_t)scale << " -> " << buf << endl;
} }
class Numeric128;
class Numeric64 {
Decimal64 dec_;
static constexpr uint8_t WORD_NUM = WORD_NUM(Decimal64);
public:
friend class Numeric128;
Numeric64() { dec_ = {0}; }
int32_t fromStr(const string& str, uint8_t prec, uint8_t scale) {
return decimal64FromStr(str.c_str(), str.size(), prec, scale, &dec_);
}
Numeric64& operator+=(const Numeric64& r) {
getOps()->add(&dec_, &r.dec_, WORD_NUM);
return *this;
}
// Numeric64& operator+=(const Numeric128& r);
bool operator==(const Numeric64& r) const { return getOps()->eq(&dec_, &r.dec_, WORD_NUM); }
Numeric64& operator=(const Numeric64& r);
Numeric64& operator=(const Numeric128& r);
static SDecimalOps* getOps() { return getDecimalOps(TSDB_DATA_TYPE_DECIMAL64); }
};
class Numeric128 {
Decimal128 dec_;
static constexpr uint8_t WORD_NUM = WORD_NUM(Decimal128);
public:
friend Numeric64;
Numeric128() { dec_ = {0}; }
Numeric128(const Numeric128& r) = default;
int32_t fromStr(const string& str, uint8_t prec, uint8_t scale) {
return decimal128FromStr(str.c_str(), str.size(), prec, scale, &dec_);
}
Numeric128& operator+=(const Numeric128& r) { return *this; }
Numeric128& operator+=(const Numeric64& r) {
getOps()->add(&dec_, &r.dec_, Numeric64::WORD_NUM);
return *this;
}
static SDecimalOps* getOps() { return getDecimalOps(TSDB_DATA_TYPE_DECIMAL); }
};
Numeric64& Numeric64::operator=(const Numeric64& r) {
dec_ = r.dec_;
return *this;
}
template <int ByteNum>
struct NumericType {};
template <>
struct NumericType<64> {
using Type = Numeric64;
static constexpr int8_t dataType = TSDB_DATA_TYPE_DECIMAL64;
static constexpr int8_t maxPrec = TSDB_DECIMAL64_MAX_PRECISION;
};
template <>
struct NumericType<128> {
using Type = Numeric128;
static constexpr int8_t dataType = TSDB_DATA_TYPE_DECIMAL;
static constexpr int8_t maxPrec = TSDB_DECIMAL_MAX_PRECISION;
};
template <int ByteNum>
class Numeric {
using Type = typename NumericType<ByteNum>::Type;
Type dec_;
uint8_t prec_;
uint8_t scale_;
public:
Numeric(uint8_t prec, uint8_t scale, const std::string& str) : prec_(prec), scale_(scale) {
if (prec > NumericType<ByteNum>::maxPrec) throw std::string("prec too big") + std::to_string(prec);
int32_t code = dec_.fromStr(str, prec, scale) != 0;
if (code != 0) {
cout << "failed to init decimal from str: " << str << endl;
throw std::string(tstrerror(code));
}
}
Numeric(const Numeric& o) = default;
~Numeric() = default;
SDataType getRetType(EOperatorType op, const SDataType& lt, const SDataType& rt) const {
SDataType ot = {0};
decimalGetRetType(&lt, &rt, op, &ot);
return ot;
}
uint8_t prec() const { return prec_; }
uint8_t scale() const { return scale_; }
const Type& dec() const { return dec_; }
template <int ByteNum2>
Numeric& binaryOp(const Numeric<ByteNum2>& r, EOperatorType op) {
auto out = binaryOp<ByteNum2, ByteNum>(r, op);
return *this = out;
}
template <int ByteNum2, int ByteNumO>
Numeric<ByteNumO> binaryOp(const Numeric<ByteNum2>& r, EOperatorType op) {
SDataType lt{.type = NumericType<ByteNum>::dataType, .precision = prec_, .scale = scale_, .bytes = ByteNum};
SDataType rt{.type = NumericType<ByteNum2>::dataType, .precision = r.prec(), .scale = r.scale(), .bytes = ByteNum2};
SDataType ot = getRetType(op, lt, rt);
Numeric<ByteNumO> out{ot.precision, ot.scale, "0"};
int32_t code = decimalOp(op, &lt, &rt, &ot, &dec_, &r.dec(), &out);
if (code != 0) throw std::overflow_error(tstrerror(code));
return out;
}
template <int ByteNum2, int ByteNumO = 128>
Numeric<ByteNumO> operator+(const Numeric<ByteNum2>& r) {
return binaryOp<ByteNum2, ByteNumO>(r, OP_TYPE_ADD);
}
template <int ByteNum2, int ByteNumO = 128>
Numeric<ByteNumO> operator-(const Numeric<ByteNum2>& r) {
return binaryOp<ByteNum2, ByteNumO>(r, OP_TYPE_SUB);
}
template <int ByteNum2, int ByteNumO = 128>
Numeric<ByteNumO> operator*(const Numeric<ByteNum2>& r) {
return binaryOp<ByteNum2, ByteNumO>(r, OP_TYPE_MULTI);
}
template <int ByteNum2, int ByteNumO = 128>
Numeric<ByteNumO> operator/(const Numeric<ByteNum2>& r) {
return binaryOp<ByteNum2, ByteNumO>(r, OP_TYPE_DIV);
}
template <int ByteNum2>
Numeric& operator+=(const Numeric<ByteNum2>& r) {
return binaryOp(r, OP_TYPE_ADD);
}
template <int ByteNum2>
bool operator==(const Numeric<ByteNum2>& r) {
return binaryOp(r, OP_TYPE_EQUAL);
}
std::string toString() const {
char buf[64] = {0};
int32_t code = decimalToStr(&dec_, NumericType<ByteNum>::dataType, prec(), scale(), buf, 64);
if (code != 0) throw std::string(tstrerror(code));
return {buf};
}
std::string toStringTrimTailingZeros() const {
auto ret = toString();
int32_t sizeToRemove = 0;
auto it = ret.rbegin();
for (; it != ret.rend(); ++it) {
if (*it == '0') {
++sizeToRemove;
continue;
}
break;
}
if (ret.size() - sizeToRemove > 0) ret.resize(ret.size() - sizeToRemove);
return ret;
}
};
template <int ByteNum>
ostream& operator<<(ostream& os, const Numeric<ByteNum>& n) {
os << n.toString() << "(" << (int32_t)n.prec() << ":" << (int32_t)n.scale() << ")";
return os;
}
TEST(decimal, numeric) {
Numeric<64> dec{10, 4, "123.456"};
Numeric<64> dec2{18, 10, "123456.123123"};
auto o = dec + dec2;
cout << dec << " + " << dec2 << " = " << o << endl;
ASSERT_EQ(o.toString(), "123579.5791230000");
Numeric<128> dec128{37, 10, "123456789012300.09876543"};
o = dec + dec128;
cout << dec << " + " << dec128 << " = " << o << endl;
ASSERT_EQ(o.toStringTrimTailingZeros(), "123456789012423.55476543");
ASSERT_EQ(o.toString(), "123456789012423.5547654300");
auto os = o - dec;
ASSERT_EQ(os.toStringTrimTailingZeros(), dec128.toStringTrimTailingZeros());
auto os2 = o - dec128;
ASSERT_EQ(os2.toStringTrimTailingZeros(), dec.toStringTrimTailingZeros());
os = dec * dec2;
cout << dec << " * " << dec2 << " = " << os << endl;
ASSERT_EQ(os.toStringTrimTailingZeros(), "15241399.136273088");
ASSERT_EQ(os.toString(), "15241399.13627308800000");
os = dec * dec128;
cout << dec << " * " << dec128 << " = " << os << endl;
ASSERT_EQ(os.toStringTrimTailingZeros(), "15241481344302520.993184");
ASSERT_EQ(os.toString(), "15241481344302520.993184");
os2 = os / dec128;
cout << os << " / " << dec128 << " = " << os2 << endl;
ASSERT_EQ(os2.toStringTrimTailingZeros(), "123.456");
ASSERT_EQ(os2.toString(), "123.456000");
os = dec2 / dec;
cout << dec2 << " / " << dec << " = " << os;
ASSERT_EQ(os.toString(), "1000.000997302682737169518");
}
TEST(decimal, a) { TEST(decimal, a) {
__int128 a = generate_big_int128(37); __int128 a = generate_big_int128(37);
extractWideInteger<9>(a); extractWideInteger<9>(a);
@ -118,7 +326,7 @@ TEST(decimal128, divide) {
} }
TEST(decimal, cpi_taos_fetch_rows) { TEST(decimal, cpi_taos_fetch_rows) {
GTEST_SKIP(); //GTEST_SKIP();
const char* host = "127.0.0.1"; const char* host = "127.0.0.1";
const char* user = "root"; const char* user = "root";
const char* passwd = "taosdata"; const char* passwd = "taosdata";
@ -169,7 +377,6 @@ TEST(decimal, cpi_taos_fetch_rows) {
taos_cleanup(); taos_cleanup();
} }
TEST(decimal, conversion) { TEST(decimal, conversion) {
// convert uint8 to decimal // convert uint8 to decimal
char buf[64] = {0}; char buf[64] = {0};