From e86c501df14a3f675152a7a3163d7b47e5b181d5 Mon Sep 17 00:00:00 2001 From: hzcheng Date: Thu, 30 Apr 2020 15:07:51 +0800 Subject: [PATCH] optimize compression --- src/util/inc/tscompression.h | 250 ++++++++++++++++++++++++++++++----- src/util/src/tcompression.c | 217 ------------------------------ 2 files changed, 214 insertions(+), 253 deletions(-) diff --git a/src/util/inc/tscompression.h b/src/util/inc/tscompression.h index a1a3c060be..9398ff8243 100644 --- a/src/util/inc/tscompression.h +++ b/src/util/inc/tscompression.h @@ -21,6 +21,7 @@ extern "C" { #endif #include "taosdef.h" +#include "tutil.h" #define COMP_OVERFLOW_BYTES 2 #define BITS_PER_BYTE 8 @@ -33,43 +34,220 @@ extern "C" { #define ONE_STAGE_COMP 1 #define TWO_STAGE_COMP 2 -int tsCompressTinyint(const char* const input, int inputSize, const int nelements, char* const output, int outputSize, char algorithm, - char* const buffer, int bufferSize); -int tsCompressSmallint(const char* const input, int inputSize, const int nelements, char* const output, int outputSize, char algorith, - char* const buffer, int bufferSize); -int tsCompressInt(const char* const input, int inputSize, const int nelements, char* const output, int outputSize, char algorith, - char* const buffer, int bufferSize); -int tsCompressBigint(const char* const input, int inputSize, const int nelements, char* const output, int outputSize, char algorith, - char* const buffer, int bufferSize); -int tsCompressBool(const char* const input, int inputSize, const int nelements, char* const output, int outputSize, char algorithm, - char* const buffer, int bufferSize); -int tsCompressString(const char* const input, int inputSize, const int nelements, char* const output, int outputSize, char algorith, - char* const buffer, int bufferSize); -int tsCompressFloat(const char* const input, int inputSize, const int nelements, char* const output, int outputSize, char algorith, - char* const buffer, int bufferSize); -int tsCompressDouble(const char* const input, int inputSize, const int nelements, char* const output, int outputSize, char algorith, - char* const buffer, int bufferSize); -int tsCompressTimestamp(const char* const input, int inputSize, const int nelements, char* const output, int outputSize, char algorith, - char* const buffer, int bufferSize); +extern int tsCompressINTImp(const char *const input, const int nelements, char *const output, const char type); +extern int tsDecompressINTImp(const char *const input, const int nelements, char *const output, const char type); +extern int tsCompressBoolImp(const char *const input, const int nelements, char *const output); +extern int tsDecompressBoolImp(const char *const input, const int nelements, char *const output); +extern int tsCompressStringImp(const char *const input, int inputSize, char *const output, int outputSize); +extern int tsDecompressStringImp(const char *const input, int compressedSize, char *const output, int outputSize); +extern int tsCompressTimestampImp(const char *const input, const int nelements, char *const output); +extern int tsDecompressTimestampImp(const char *const input, const int nelements, char *const output); +extern int tsCompressDoubleImp(const char *const input, const int nelements, char *const output); +extern int tsDecompressDoubleImp(const char *const input, const int nelements, char *const output); +extern int tsCompressFloatImp(const char *const input, const int nelements, char *const output); +extern int tsDecompressFloatImp(const char *const input, const int nelements, char *const output); -int tsDecompressTinyint(const char* const input, int compressedSize, const int nelements, char* const output, - int outputSize, char algorithm, char* const buffer, int bufferSize); -int tsDecompressSmallint(const char* const input, int compressedSize, const int nelements, char* const output, - int outputSize, char algorithm, char* const buffer, int bufferSize); -int tsDecompressInt(const char* const input, int compressedSize, const int nelements, char* const output, int outputSize, - char algorithm, char* const buffer, int bufferSize); -int tsDecompressBigint(const char* const input, int compressedSize, const int nelements, char* const output, - int outputSize, char algorithm, char* const buffer, int bufferSize); -int tsDecompressBool(const char* const input, int compressedSize, const int nelements, char* const output, - int outputSize, char algorithm, char* const buffer, int bufferSize); -int tsDecompressString(const char* const input, int compressedSize, const int nelements, char* const output, - int outputSize, char algorithm, char* const buffer, int bufferSize); -int tsDecompressFloat(const char* const input, int compressedSize, const int nelements, char* const output, - int outputSize, char algorithm, char* const buffer, int bufferSize); -int tsDecompressDouble(const char* const input, int compressedSize, const int nelements, char* const output, - int outputSize, char algorith, char* const buffer, int bufferSize); -int tsDecompressTimestamp(const char* const input, int compressedSize, const int nelements, char* const output, - int outputSize, char algorithm, char* const buffer, int bufferSize); +static FORCE_INLINE int tsCompressTinyint(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, char algorithm, + char *const buffer, int bufferSize) { + if (algorithm == ONE_STAGE_COMP) { + return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_TINYINT); + } else if (algorithm == TWO_STAGE_COMP) { + int len = tsCompressINTImp(input, nelements, buffer, TSDB_DATA_TYPE_TINYINT); + return tsCompressStringImp(buffer, len, output, outputSize); + } else { + assert(0); + } +} + +static FORCE_INLINE int tsDecompressTinyint(const char *const input, int compressedSize, const int nelements, char *const output, + int outputSize, char algorithm, char *const buffer, int bufferSize) { + if (algorithm == ONE_STAGE_COMP) { + return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_TINYINT); + } else if (algorithm == TWO_STAGE_COMP) { + tsDecompressStringImp(input, compressedSize, buffer, bufferSize); + return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_TINYINT); + } else { + assert(0); + } +} + +static FORCE_INLINE int tsCompressSmallint(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, char algorithm, + char *const buffer, int bufferSize) { + if (algorithm == ONE_STAGE_COMP) { + return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_SMALLINT); + } else if (algorithm == TWO_STAGE_COMP) { + int len = tsCompressINTImp(input, nelements, buffer, TSDB_DATA_TYPE_SMALLINT); + return tsCompressStringImp(buffer, len, output, outputSize); + } else { + assert(0); + } +} + +static FORCE_INLINE int tsDecompressSmallint(const char *const input, int compressedSize, const int nelements, char *const output, + int outputSize, char algorithm, char *const buffer, int bufferSize) { + if (algorithm == ONE_STAGE_COMP) { + return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_SMALLINT); + } else if (algorithm == TWO_STAGE_COMP) { + tsDecompressStringImp(input, compressedSize, buffer, bufferSize); + return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_SMALLINT); + } else { + assert(0); + } +} + +static FORCE_INLINE int tsCompressInt(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, char algorithm, + char *const buffer, int bufferSize) { + if (algorithm == ONE_STAGE_COMP) { + return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_INT); + } else if (algorithm == TWO_STAGE_COMP) { + int len = tsCompressINTImp(input, nelements, buffer, TSDB_DATA_TYPE_INT); + return tsCompressStringImp(buffer, len, output, outputSize); + } else { + assert(0); + } +} + +static FORCE_INLINE int tsDecompressInt(const char *const input, int compressedSize, const int nelements, char *const output, + int outputSize, char algorithm, char *const buffer, int bufferSize) { + if (algorithm == ONE_STAGE_COMP) { + return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_INT); + } else if (algorithm == TWO_STAGE_COMP) { + tsDecompressStringImp(input, compressedSize, buffer, bufferSize); + return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_INT); + } else { + assert(0); + } +} + +static FORCE_INLINE int tsCompressBigint(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, + char algorithm, char *const buffer, int bufferSize) { + if (algorithm == ONE_STAGE_COMP) { + return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_BIGINT); + } else if (algorithm == TWO_STAGE_COMP) { + int len = tsCompressINTImp(input, nelements, buffer, TSDB_DATA_TYPE_BIGINT); + return tsCompressStringImp(buffer, len, output, outputSize); + } else { + assert(0); + } +} + +static FORCE_INLINE int tsDecompressBigint(const char *const input, int compressedSize, const int nelements, char *const output, + int outputSize, char algorithm, char *const buffer, int bufferSize) { + if (algorithm == ONE_STAGE_COMP) { + return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_BIGINT); + } else if (algorithm == TWO_STAGE_COMP) { + tsDecompressStringImp(input, compressedSize, buffer, bufferSize); + return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_BIGINT); + } else { + assert(0); + } +} + +static FORCE_INLINE int tsCompressBool(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, + char algorithm, char *const buffer, int bufferSize) { + if (algorithm == ONE_STAGE_COMP) { + return tsCompressBoolImp(input, nelements, output); + } else if (algorithm == TWO_STAGE_COMP) { + int len = tsCompressBoolImp(input, nelements, buffer); + return tsCompressStringImp(buffer, len, output, outputSize); + } else { + assert(0); + } +} + +static FORCE_INLINE int tsDecompressBool(const char *const input, int compressedSize, const int nelements, char *const output, + int outputSize, char algorithm, char *const buffer, int bufferSize) { + if (algorithm == ONE_STAGE_COMP) { + return tsDecompressBoolImp(input, nelements, output); + } else if (algorithm == TWO_STAGE_COMP) { + tsDecompressStringImp(input, compressedSize, buffer, bufferSize); + return tsDecompressBoolImp(buffer, nelements, output); + } else { + assert(0); + } +} + +static FORCE_INLINE int tsCompressString(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, + char algorithm, char *const buffer, int bufferSize) { + return tsCompressStringImp(input, inputSize, output, outputSize); +} + +static FORCE_INLINE int tsDecompressString(const char *const input, int compressedSize, const int nelements, char *const output, + int outputSize, char algorithm, char *const buffer, int bufferSize) { + return tsDecompressStringImp(input, compressedSize, output, outputSize); +} + +static FORCE_INLINE int tsCompressFloat(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, + char algorithm, char *const buffer, int bufferSize) { + if (algorithm == ONE_STAGE_COMP) { + return tsCompressFloatImp(input, nelements, output); + } else if (algorithm == TWO_STAGE_COMP) { + int len = tsCompressFloatImp(input, nelements, buffer); + return tsCompressStringImp(buffer, len, output, outputSize); + } else { + assert(0); + } +} + +static FORCE_INLINE int tsDecompressFloat(const char *const input, int compressedSize, const int nelements, char *const output, + int outputSize, char algorithm, char *const buffer, int bufferSize) { + if (algorithm == ONE_STAGE_COMP) { + return tsDecompressFloatImp(input, nelements, output); + } else if (algorithm == TWO_STAGE_COMP) { + tsDecompressStringImp(input, compressedSize, buffer, bufferSize); + return tsDecompressFloatImp(buffer, nelements, output); + } else { + assert(0); + } +} + +static FORCE_INLINE int tsCompressDouble(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, + char algorithm, char *const buffer, int bufferSize) { + if (algorithm == ONE_STAGE_COMP) { + return tsCompressDoubleImp(input, nelements, output); + } else if (algorithm == TWO_STAGE_COMP) { + int len = tsCompressDoubleImp(input, nelements, buffer); + return tsCompressStringImp(buffer, len, output, outputSize); + } else { + assert(0); + } +} + +static FORCE_INLINE int tsDecompressDouble(const char *const input, int compressedSize, const int nelements, char *const output, + int outputSize, char algorithm, char *const buffer, int bufferSize) { + if (algorithm == ONE_STAGE_COMP) { + return tsDecompressDoubleImp(input, nelements, output); + } else if (algorithm == TWO_STAGE_COMP) { + tsDecompressStringImp(input, compressedSize, buffer, bufferSize); + return tsDecompressDoubleImp(buffer, nelements, output); + } else { + assert(0); + } +} + +static FORCE_INLINE int tsCompressTimestamp(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, + char algorithm, char *const buffer, int bufferSize) { + if (algorithm == ONE_STAGE_COMP) { + return tsCompressTimestampImp(input, nelements, output); + } else if (algorithm == TWO_STAGE_COMP) { + int len = tsCompressTimestampImp(input, nelements, buffer); + return tsCompressStringImp(buffer, len, output, outputSize); + } else { + assert(0); + } +} + +static FORCE_INLINE int tsDecompressTimestamp(const char *const input, int compressedSize, const int nelements, char *const output, + int outputSize, char algorithm, char *const buffer, int bufferSize) { + if (algorithm == ONE_STAGE_COMP) { + return tsDecompressTimestampImp(input, nelements, output); + } else if (algorithm == TWO_STAGE_COMP) { + tsDecompressStringImp(input, compressedSize, buffer, bufferSize); + return tsDecompressTimestampImp(buffer, nelements, output); + } else { + assert(0); + } +} #ifdef __cplusplus } diff --git a/src/util/src/tcompression.c b/src/util/src/tcompression.c index 24a53b3fe4..e3b3d65052 100644 --- a/src/util/src/tcompression.c +++ b/src/util/src/tcompression.c @@ -56,223 +56,6 @@ const int TEST_NUMBER = 1; #define is_bigendian() ((*(char *)&TEST_NUMBER) == 0) #define SIMPLE8B_MAX_INT64 ((uint64_t)2305843009213693951L) -// Function declarations -int tsCompressINTImp(const char *const input, const int nelements, char *const output, const char type); -int tsDecompressINTImp(const char *const input, const int nelements, char *const output, const char type); -int tsCompressBoolImp(const char *const input, const int nelements, char *const output); -int tsDecompressBoolImp(const char *const input, const int nelements, char *const output); -int tsCompressStringImp(const char *const input, int inputSize, char *const output, int outputSize); -int tsDecompressStringImp(const char *const input, int compressedSize, char *const output, int outputSize); -int tsCompressTimestampImp(const char *const input, const int nelements, char *const output); -int tsDecompressTimestampImp(const char *const input, const int nelements, char *const output); -int tsCompressDoubleImp(const char *const input, const int nelements, char *const output); -int tsDecompressDoubleImp(const char *const input, const int nelements, char *const output); -int tsCompressFloatImp(const char *const input, const int nelements, char *const output); -int tsDecompressFloatImp(const char *const input, const int nelements, char *const output); - -/* ----------------------------------------------Compression function used by - * others ---------------------------------------------- */ -int tsCompressTinyint(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, char algorithm, - char *const buffer, int bufferSize) { - if (algorithm == ONE_STAGE_COMP) { - return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_TINYINT); - } else if (algorithm == TWO_STAGE_COMP) { - int len = tsCompressINTImp(input, nelements, buffer, TSDB_DATA_TYPE_TINYINT); - return tsCompressStringImp(buffer, len, output, outputSize); - } else { - assert(0); - } -} - -int tsDecompressTinyint(const char *const input, int compressedSize, const int nelements, char *const output, - int outputSize, char algorithm, char *const buffer, int bufferSize) { - if (algorithm == ONE_STAGE_COMP) { - return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_TINYINT); - } else if (algorithm == TWO_STAGE_COMP) { - tsDecompressStringImp(input, compressedSize, buffer, bufferSize); - return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_TINYINT); - } else { - assert(0); - } -} - -int tsCompressSmallint(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, char algorithm, - char *const buffer, int bufferSize) { - if (algorithm == ONE_STAGE_COMP) { - return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_SMALLINT); - } else if (algorithm == TWO_STAGE_COMP) { - int len = tsCompressINTImp(input, nelements, buffer, TSDB_DATA_TYPE_SMALLINT); - return tsCompressStringImp(buffer, len, output, outputSize); - } else { - assert(0); - } -} - -int tsDecompressSmallint(const char *const input, int compressedSize, const int nelements, char *const output, - int outputSize, char algorithm, char *const buffer, int bufferSize) { - if (algorithm == ONE_STAGE_COMP) { - return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_SMALLINT); - } else if (algorithm == TWO_STAGE_COMP) { - tsDecompressStringImp(input, compressedSize, buffer, bufferSize); - return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_SMALLINT); - } else { - assert(0); - } -} - -int tsCompressInt(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, char algorithm, - char *const buffer, int bufferSize) { - if (algorithm == ONE_STAGE_COMP) { - return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_INT); - } else if (algorithm == TWO_STAGE_COMP) { - int len = tsCompressINTImp(input, nelements, buffer, TSDB_DATA_TYPE_INT); - return tsCompressStringImp(buffer, len, output, outputSize); - } else { - assert(0); - } -} - -int tsDecompressInt(const char *const input, int compressedSize, const int nelements, char *const output, - int outputSize, char algorithm, char *const buffer, int bufferSize) { - if (algorithm == ONE_STAGE_COMP) { - return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_INT); - } else if (algorithm == TWO_STAGE_COMP) { - tsDecompressStringImp(input, compressedSize, buffer, bufferSize); - return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_INT); - } else { - assert(0); - } -} - -int tsCompressBigint(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, - char algorithm, char *const buffer, int bufferSize) { - if (algorithm == ONE_STAGE_COMP) { - return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_BIGINT); - } else if (algorithm == TWO_STAGE_COMP) { - int len = tsCompressINTImp(input, nelements, buffer, TSDB_DATA_TYPE_BIGINT); - return tsCompressStringImp(buffer, len, output, outputSize); - } else { - assert(0); - } -} - -int tsDecompressBigint(const char *const input, int compressedSize, const int nelements, char *const output, - int outputSize, char algorithm, char *const buffer, int bufferSize) { - if (algorithm == ONE_STAGE_COMP) { - return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_BIGINT); - } else if (algorithm == TWO_STAGE_COMP) { - tsDecompressStringImp(input, compressedSize, buffer, bufferSize); - return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_BIGINT); - } else { - assert(0); - } -} - -int tsCompressBool(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, - char algorithm, char *const buffer, int bufferSize) { - if (algorithm == ONE_STAGE_COMP) { - return tsCompressBoolImp(input, nelements, output); - } else if (algorithm == TWO_STAGE_COMP) { - int len = tsCompressBoolImp(input, nelements, buffer); - return tsCompressStringImp(buffer, len, output, outputSize); - } else { - assert(0); - } -} - -int tsDecompressBool(const char *const input, int compressedSize, const int nelements, char *const output, - int outputSize, char algorithm, char *const buffer, int bufferSize) { - if (algorithm == ONE_STAGE_COMP) { - return tsDecompressBoolImp(input, nelements, output); - } else if (algorithm == TWO_STAGE_COMP) { - tsDecompressStringImp(input, compressedSize, buffer, bufferSize); - return tsDecompressBoolImp(buffer, nelements, output); - } else { - assert(0); - } -} - -int tsCompressString(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, - char algorithm, char *const buffer, int bufferSize) { - return tsCompressStringImp(input, inputSize, output, outputSize); -} - -int tsDecompressString(const char *const input, int compressedSize, const int nelements, char *const output, - int outputSize, char algorithm, char *const buffer, int bufferSize) { - return tsDecompressStringImp(input, compressedSize, output, outputSize); -} - -int tsCompressFloat(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, - char algorithm, char *const buffer, int bufferSize) { - if (algorithm == ONE_STAGE_COMP) { - return tsCompressFloatImp(input, nelements, output); - } else if (algorithm == TWO_STAGE_COMP) { - int len = tsCompressFloatImp(input, nelements, buffer); - return tsCompressStringImp(buffer, len, output, outputSize); - } else { - assert(0); - } -} - -int tsDecompressFloat(const char *const input, int compressedSize, const int nelements, char *const output, - int outputSize, char algorithm, char *const buffer, int bufferSize) { - if (algorithm == ONE_STAGE_COMP) { - return tsDecompressFloatImp(input, nelements, output); - } else if (algorithm == TWO_STAGE_COMP) { - tsDecompressStringImp(input, compressedSize, buffer, bufferSize); - return tsDecompressFloatImp(buffer, nelements, output); - } else { - assert(0); - } -} -int tsCompressDouble(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, - char algorithm, char *const buffer, int bufferSize) { - if (algorithm == ONE_STAGE_COMP) { - return tsCompressDoubleImp(input, nelements, output); - } else if (algorithm == TWO_STAGE_COMP) { - int len = tsCompressDoubleImp(input, nelements, buffer); - return tsCompressStringImp(buffer, len, output, outputSize); - } else { - assert(0); - } -} - -int tsDecompressDouble(const char *const input, int compressedSize, const int nelements, char *const output, - int outputSize, char algorithm, char *const buffer, int bufferSize) { - if (algorithm == ONE_STAGE_COMP) { - return tsDecompressDoubleImp(input, nelements, output); - } else if (algorithm == TWO_STAGE_COMP) { - tsDecompressStringImp(input, compressedSize, buffer, bufferSize); - return tsDecompressDoubleImp(buffer, nelements, output); - } else { - assert(0); - } -} - -int tsCompressTimestamp(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, - char algorithm, char *const buffer, int bufferSize) { - if (algorithm == ONE_STAGE_COMP) { - return tsCompressTimestampImp(input, nelements, output); - } else if (algorithm == TWO_STAGE_COMP) { - int len = tsCompressTimestampImp(input, nelements, buffer); - return tsCompressStringImp(buffer, len, output, outputSize); - } else { - assert(0); - } -} - -int tsDecompressTimestamp(const char *const input, int compressedSize, const int nelements, char *const output, - int outputSize, char algorithm, char *const buffer, int bufferSize) { - if (algorithm == ONE_STAGE_COMP) { - return tsDecompressTimestampImp(input, nelements, output); - } else if (algorithm == TWO_STAGE_COMP) { - tsDecompressStringImp(input, compressedSize, buffer, bufferSize); - return tsDecompressTimestampImp(buffer, nelements, output); - } else { - assert(0); - } -} - bool safeInt64Add(int64_t a, int64_t b) { if ((a > 0 && b > INT64_MAX - a) || (a < 0 && b < INT64_MIN - a)) return false; return true;