diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 36242ddea8..b74befcdd3 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -534,13 +534,22 @@ struct SSchema { char name[TSDB_COL_NAME_LEN]; }; +// compress flag + +// |----l1 compAlg----|-----l2 compAlg---|---level--| +// |------8bit--------|------16bit-------|---8bit---| + +#define COMPRESS_L1_TYPE(type) ((type)&0xFF) +#define COMPRESS_L2_TYPE(type) (((type) >> 8) & 0xFFFF) +#define COMPRESS_L2_TYPE_LEVEL(type) (((type) >> 24) & 0xFF) + struct SSchema2 { int8_t type; int8_t flags; col_id_t colId; int32_t bytes; char name[TSDB_COL_NAME_LEN]; - char alias[TSDB_COL_NAME_LEN]; + uint32_t compress; }; typedef struct { @@ -584,7 +593,7 @@ typedef struct { int32_t tEncodeSSubmitRsp(SEncoder* pEncoder, const SSubmitRsp* pRsp); int32_t tDecodeSSubmitRsp(SDecoder* pDecoder, SSubmitRsp* pRsp); // void tFreeSSubmitBlkRsp(void* param); -void tFreeSSubmitRsp(SSubmitRsp* pRsp); +void tFreeSSubmitRsp(SSubmitRsp* pRsp); #define COL_SMA_ON ((int8_t)0x1) #define COL_IDX_ON ((int8_t)0x2) @@ -1582,13 +1591,13 @@ int32_t tDeserializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq); void tFreeSStatusReq(SStatusReq* pReq); typedef struct { - int32_t contLen; - char* pCont; + int32_t contLen; + char* pCont; } SStatisReq; int32_t tSerializeSStatisReq(void* buf, int32_t bufLen, SStatisReq* pReq); int32_t tDeserializeSStatisReq(void* buf, int32_t bufLen, SStatisReq* pReq); -void tFreeSStatisReq(SStatisReq *pReq); +void tFreeSStatisReq(SStatisReq* pReq); typedef struct { int32_t dnodeId; @@ -1946,7 +1955,7 @@ typedef struct { int32_t tSerializeSShowReq(void* buf, int32_t bufLen, SShowReq* pReq); // int32_t tDeserializeSShowReq(void* buf, int32_t bufLen, SShowReq* pReq); -void tFreeSShowReq(SShowReq* pReq); +void tFreeSShowReq(SShowReq* pReq); typedef struct { int64_t showId; @@ -2735,7 +2744,7 @@ typedef struct { SVCreateTbReq* pReqs; SArray* pArray; }; - int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient + int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient } SVCreateTbBatchReq; int tEncodeSVCreateTbBatchReq(SEncoder* pCoder, const SVCreateTbBatchReq* pReq); @@ -2828,7 +2837,7 @@ typedef struct { int32_t newCommentLen; char* newComment; int64_t ctimeMs; // fill by vnode - int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient + int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient } SVAlterTbReq; int32_t tEncodeSVAlterTbReq(SEncoder* pEncoder, const SVAlterTbReq* pReq); @@ -3932,7 +3941,7 @@ int32_t tDeserializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq); #define SUBMIT_REQ_FROM_FILE 0x4 #define TD_REQ_FROM_TAOX 0x8 -#define TD_REQ_FROM_TAOX_OLD 0x1 // for compatibility +#define TD_REQ_FROM_TAOX_OLD 0x1 // for compatibility typedef struct { int32_t flags; diff --git a/include/util/tcompression.h b/include/util/tcompression.h index 75ddbb12e7..5d42cd79cd 100644 --- a/include/util/tcompression.h +++ b/include/util/tcompression.h @@ -58,15 +58,17 @@ extern "C" { #ifdef TD_TSZ extern bool lossyFloat; extern bool lossyDouble; -int32_t tsCompressInit(char* lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals, uint32_t intervals, - int32_t ifAdtFse, const char* compressor); +int32_t tsCompressInit(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals, + uint32_t intervals, int32_t ifAdtFse, const char *compressor); -void tsCompressExit(); +void tsCompressExit(); int32_t tsCompressFloatLossyImp(const char *const input, const int32_t nelements, char *const output); -int32_t tsDecompressFloatLossyImp(const char *const input, int32_t compressedSize, const int32_t nelements, char *const output); +int32_t tsDecompressFloatLossyImp(const char *const input, int32_t compressedSize, const int32_t nelements, + char *const output); int32_t tsCompressDoubleLossyImp(const char *const input, const int32_t nelements, char *const output); -int32_t tsDecompressDoubleLossyImp(const char *const input, int32_t compressedSize, const int32_t nelements, char *const output); +int32_t tsDecompressDoubleLossyImp(const char *const input, int32_t compressedSize, const int32_t nelements, + char *const output); static FORCE_INLINE int32_t tsCompressFloatLossy(const char *const input, int32_t inputSize, const int32_t nelements, char *const output, int32_t outputSize, char algorithm, @@ -139,8 +141,9 @@ int32_t getWordLength(char type); int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, char *const output, const char type); int32_t tsDecompressFloatImplAvx512(const char *const input, const int32_t nelements, char *const output); int32_t tsDecompressFloatImplAvx2(const char *const input, const int32_t nelements, char *const output); -int32_t tsDecompressTimestampAvx512(const char* const input, const int32_t nelements, char *const output, bool bigEndian); -int32_t tsDecompressTimestampAvx2(const char* const input, const int32_t nelements, char *const output, bool bigEndian); +int32_t tsDecompressTimestampAvx512(const char *const input, const int32_t nelements, char *const output, + bool bigEndian); +int32_t tsDecompressTimestampAvx2(const char *const input, const int32_t nelements, char *const output, bool bigEndian); /************************************************************************* * STREAM COMPRESSION @@ -153,6 +156,29 @@ int32_t tCompressStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg); int32_t tCompressEnd(SCompressor *pCmprsor, const uint8_t **ppOut, int32_t *nOut, int32_t *nOrigin); int32_t tCompress(SCompressor *pCmprsor, const void *pData, int64_t nData); + +typedef int32_t (*__data_compress_init)(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals, + uint32_t intervals, int32_t ifAdtFse, const char *compressor); +typedef int32_t (*__data_compress_l1_fn_t)(const char *const input, const int32_t nelements, char *const output, + const char type); +typedef int32_t (*__data_decompress_l1_fn_t)(const char *const input, const int32_t nelements, char *const output, + const char type); + +typedef int32_t (*__data_compress_l2_fn_t)(const char *const input, const int32_t nelements, char *const output, + const char type); +typedef int32_t (*__data_decompress_l2_fn_t)(const char *const input, const int32_t nelements, char *const output, + const char type); + +typedef struct { + int8_t type; + int8_t level; + __data_compress_init initFn; + __data_compress_l1_fn_t l1CompFn; + __data_decompress_l1_fn_t l1DeCompFn; + __data_compress_l2_fn_t l2CompFn; + __data_decompress_l2_fn_t l2DeCompFn; +} TCompressPara; + #ifdef __cplusplus } #endif diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index 656e2706f2..3f881a410a 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -54,6 +54,11 @@ #include "tlog.h" #include "ttypes.h" + + +TCompressPara compressDict = { + {}, +} #ifdef TD_TSZ #include "td_sz.h" #endif @@ -62,15 +67,16 @@ static const int32_t TEST_NUMBER = 1; #define is_bigendian() ((*(char *)&TEST_NUMBER) == 0) #define SIMPLE8B_MAX_INT64 ((uint64_t)1152921504606846974LL) -#define safeInt64Add(a, b) (((a >= 0) && (b <= INT64_MAX - a)) || ((a < 0) && (b >= INT64_MIN - a))) +#define safeInt64Add(a, b) (((a >= 0) && (b <= INT64_MAX - a)) || ((a < 0) && (b >= INT64_MIN - a))) #ifdef TD_TSZ bool lossyFloat = false; bool lossyDouble = false; + // init call -int32_t tsCompressInit(char* lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals, uint32_t intervals, - int32_t ifAdtFse, const char* compressor) { +int32_t tsCompressInit(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals, + uint32_t intervals, int32_t ifAdtFse, const char *compressor) { // config lossyFloat = strstr(lossyColumns, "float") != NULL; lossyDouble = strstr(lossyColumns, "double") != NULL; @@ -861,7 +867,7 @@ uint32_t decodeFloatValue(const char *const input, int32_t *const ipos, uint8_t return diff; } -static void tsDecompressFloatHelper(const char *const input, const int32_t nelements, float* ostream) { +static void tsDecompressFloatHelper(const char *const input, const int32_t nelements, float *ostream) { uint8_t flags = 0; int32_t ipos = 1; int32_t opos = 0; @@ -899,8 +905,8 @@ int32_t tsDecompressFloatImp(const char *const input, const int32_t nelements, c tsDecompressFloatImplAvx2(input, nelements, output); } else if (tsSIMDEnable && tsAVX512Enable) { tsDecompressFloatImplAvx512(input, nelements, output); - } else { // alternative implementation without SIMD instructions. - tsDecompressFloatHelper(input, nelements, (float*)output); + } else { // alternative implementation without SIMD instructions. + tsDecompressFloatHelper(input, nelements, (float *)output); } return nelements * FLOAT_BYTES;