add compress dict

This commit is contained in:
yihaoDeng 2024-02-28 07:58:24 +00:00
parent 09a04ed0a4
commit bc98978b33
3 changed files with 63 additions and 22 deletions

View File

@ -534,13 +534,22 @@ struct SSchema {
char name[TSDB_COL_NAME_LEN]; char name[TSDB_COL_NAME_LEN];
}; };
// compress flag
// |----l1 compAlg----|-----l2 compAlg---|---level--|
// |------8bit--------|------16bit-------|---8bit---|
#define COMPRESS_L1_TYPE(type) ((type)&0xFF)
#define COMPRESS_L2_TYPE(type) (((type) >> 8) & 0xFFFF)
#define COMPRESS_L2_TYPE_LEVEL(type) (((type) >> 24) & 0xFF)
struct SSchema2 { struct SSchema2 {
int8_t type; int8_t type;
int8_t flags; int8_t flags;
col_id_t colId; col_id_t colId;
int32_t bytes; int32_t bytes;
char name[TSDB_COL_NAME_LEN]; char name[TSDB_COL_NAME_LEN];
char alias[TSDB_COL_NAME_LEN]; uint32_t compress;
}; };
typedef struct { typedef struct {
@ -584,7 +593,7 @@ typedef struct {
int32_t tEncodeSSubmitRsp(SEncoder* pEncoder, const SSubmitRsp* pRsp); int32_t tEncodeSSubmitRsp(SEncoder* pEncoder, const SSubmitRsp* pRsp);
int32_t tDecodeSSubmitRsp(SDecoder* pDecoder, SSubmitRsp* pRsp); int32_t tDecodeSSubmitRsp(SDecoder* pDecoder, SSubmitRsp* pRsp);
// void tFreeSSubmitBlkRsp(void* param); // void tFreeSSubmitBlkRsp(void* param);
void tFreeSSubmitRsp(SSubmitRsp* pRsp); void tFreeSSubmitRsp(SSubmitRsp* pRsp);
#define COL_SMA_ON ((int8_t)0x1) #define COL_SMA_ON ((int8_t)0x1)
#define COL_IDX_ON ((int8_t)0x2) #define COL_IDX_ON ((int8_t)0x2)
@ -1582,13 +1591,13 @@ int32_t tDeserializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq);
void tFreeSStatusReq(SStatusReq* pReq); void tFreeSStatusReq(SStatusReq* pReq);
typedef struct { typedef struct {
int32_t contLen; int32_t contLen;
char* pCont; char* pCont;
} SStatisReq; } SStatisReq;
int32_t tSerializeSStatisReq(void* buf, int32_t bufLen, SStatisReq* pReq); int32_t tSerializeSStatisReq(void* buf, int32_t bufLen, SStatisReq* pReq);
int32_t tDeserializeSStatisReq(void* buf, int32_t bufLen, SStatisReq* pReq); int32_t tDeserializeSStatisReq(void* buf, int32_t bufLen, SStatisReq* pReq);
void tFreeSStatisReq(SStatisReq *pReq); void tFreeSStatisReq(SStatisReq* pReq);
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
@ -1946,7 +1955,7 @@ typedef struct {
int32_t tSerializeSShowReq(void* buf, int32_t bufLen, SShowReq* pReq); int32_t tSerializeSShowReq(void* buf, int32_t bufLen, SShowReq* pReq);
// int32_t tDeserializeSShowReq(void* buf, int32_t bufLen, SShowReq* pReq); // int32_t tDeserializeSShowReq(void* buf, int32_t bufLen, SShowReq* pReq);
void tFreeSShowReq(SShowReq* pReq); void tFreeSShowReq(SShowReq* pReq);
typedef struct { typedef struct {
int64_t showId; int64_t showId;
@ -2735,7 +2744,7 @@ typedef struct {
SVCreateTbReq* pReqs; SVCreateTbReq* pReqs;
SArray* pArray; SArray* pArray;
}; };
int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient
} SVCreateTbBatchReq; } SVCreateTbBatchReq;
int tEncodeSVCreateTbBatchReq(SEncoder* pCoder, const SVCreateTbBatchReq* pReq); int tEncodeSVCreateTbBatchReq(SEncoder* pCoder, const SVCreateTbBatchReq* pReq);
@ -2828,7 +2837,7 @@ typedef struct {
int32_t newCommentLen; int32_t newCommentLen;
char* newComment; char* newComment;
int64_t ctimeMs; // fill by vnode int64_t ctimeMs; // fill by vnode
int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient
} SVAlterTbReq; } SVAlterTbReq;
int32_t tEncodeSVAlterTbReq(SEncoder* pEncoder, const SVAlterTbReq* pReq); int32_t tEncodeSVAlterTbReq(SEncoder* pEncoder, const SVAlterTbReq* pReq);
@ -3932,7 +3941,7 @@ int32_t tDeserializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq);
#define SUBMIT_REQ_FROM_FILE 0x4 #define SUBMIT_REQ_FROM_FILE 0x4
#define TD_REQ_FROM_TAOX 0x8 #define TD_REQ_FROM_TAOX 0x8
#define TD_REQ_FROM_TAOX_OLD 0x1 // for compatibility #define TD_REQ_FROM_TAOX_OLD 0x1 // for compatibility
typedef struct { typedef struct {
int32_t flags; int32_t flags;

View File

@ -58,15 +58,17 @@ extern "C" {
#ifdef TD_TSZ #ifdef TD_TSZ
extern bool lossyFloat; extern bool lossyFloat;
extern bool lossyDouble; extern bool lossyDouble;
int32_t tsCompressInit(char* lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals, uint32_t intervals, int32_t tsCompressInit(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals,
int32_t ifAdtFse, const char* compressor); uint32_t intervals, int32_t ifAdtFse, const char *compressor);
void tsCompressExit(); void tsCompressExit();
int32_t tsCompressFloatLossyImp(const char *const input, const int32_t nelements, char *const output); int32_t tsCompressFloatLossyImp(const char *const input, const int32_t nelements, char *const output);
int32_t tsDecompressFloatLossyImp(const char *const input, int32_t compressedSize, const int32_t nelements, char *const output); int32_t tsDecompressFloatLossyImp(const char *const input, int32_t compressedSize, const int32_t nelements,
char *const output);
int32_t tsCompressDoubleLossyImp(const char *const input, const int32_t nelements, char *const output); int32_t tsCompressDoubleLossyImp(const char *const input, const int32_t nelements, char *const output);
int32_t tsDecompressDoubleLossyImp(const char *const input, int32_t compressedSize, const int32_t nelements, char *const output); int32_t tsDecompressDoubleLossyImp(const char *const input, int32_t compressedSize, const int32_t nelements,
char *const output);
static FORCE_INLINE int32_t tsCompressFloatLossy(const char *const input, int32_t inputSize, const int32_t nelements, static FORCE_INLINE int32_t tsCompressFloatLossy(const char *const input, int32_t inputSize, const int32_t nelements,
char *const output, int32_t outputSize, char algorithm, char *const output, int32_t outputSize, char algorithm,
@ -139,8 +141,9 @@ int32_t getWordLength(char type);
int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, char *const output, const char type); int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, char *const output, const char type);
int32_t tsDecompressFloatImplAvx512(const char *const input, const int32_t nelements, char *const output); int32_t tsDecompressFloatImplAvx512(const char *const input, const int32_t nelements, char *const output);
int32_t tsDecompressFloatImplAvx2(const char *const input, const int32_t nelements, char *const output); int32_t tsDecompressFloatImplAvx2(const char *const input, const int32_t nelements, char *const output);
int32_t tsDecompressTimestampAvx512(const char* const input, const int32_t nelements, char *const output, bool bigEndian); int32_t tsDecompressTimestampAvx512(const char *const input, const int32_t nelements, char *const output,
int32_t tsDecompressTimestampAvx2(const char* const input, const int32_t nelements, char *const output, bool bigEndian); bool bigEndian);
int32_t tsDecompressTimestampAvx2(const char *const input, const int32_t nelements, char *const output, bool bigEndian);
/************************************************************************* /*************************************************************************
* STREAM COMPRESSION * STREAM COMPRESSION
@ -153,6 +156,29 @@ int32_t tCompressStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg);
int32_t tCompressEnd(SCompressor *pCmprsor, const uint8_t **ppOut, int32_t *nOut, int32_t *nOrigin); int32_t tCompressEnd(SCompressor *pCmprsor, const uint8_t **ppOut, int32_t *nOut, int32_t *nOrigin);
int32_t tCompress(SCompressor *pCmprsor, const void *pData, int64_t nData); int32_t tCompress(SCompressor *pCmprsor, const void *pData, int64_t nData);
typedef int32_t (*__data_compress_init)(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals,
uint32_t intervals, int32_t ifAdtFse, const char *compressor);
typedef int32_t (*__data_compress_l1_fn_t)(const char *const input, const int32_t nelements, char *const output,
const char type);
typedef int32_t (*__data_decompress_l1_fn_t)(const char *const input, const int32_t nelements, char *const output,
const char type);
typedef int32_t (*__data_compress_l2_fn_t)(const char *const input, const int32_t nelements, char *const output,
const char type);
typedef int32_t (*__data_decompress_l2_fn_t)(const char *const input, const int32_t nelements, char *const output,
const char type);
typedef struct {
int8_t type;
int8_t level;
__data_compress_init initFn;
__data_compress_l1_fn_t l1CompFn;
__data_decompress_l1_fn_t l1DeCompFn;
__data_compress_l2_fn_t l2CompFn;
__data_decompress_l2_fn_t l2DeCompFn;
} TCompressPara;
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -54,6 +54,11 @@
#include "tlog.h" #include "tlog.h"
#include "ttypes.h" #include "ttypes.h"
TCompressPara compressDict = {
{},
}
#ifdef TD_TSZ #ifdef TD_TSZ
#include "td_sz.h" #include "td_sz.h"
#endif #endif
@ -62,15 +67,16 @@ static const int32_t TEST_NUMBER = 1;
#define is_bigendian() ((*(char *)&TEST_NUMBER) == 0) #define is_bigendian() ((*(char *)&TEST_NUMBER) == 0)
#define SIMPLE8B_MAX_INT64 ((uint64_t)1152921504606846974LL) #define SIMPLE8B_MAX_INT64 ((uint64_t)1152921504606846974LL)
#define safeInt64Add(a, b) (((a >= 0) && (b <= INT64_MAX - a)) || ((a < 0) && (b >= INT64_MIN - a))) #define safeInt64Add(a, b) (((a >= 0) && (b <= INT64_MAX - a)) || ((a < 0) && (b >= INT64_MIN - a)))
#ifdef TD_TSZ #ifdef TD_TSZ
bool lossyFloat = false; bool lossyFloat = false;
bool lossyDouble = false; bool lossyDouble = false;
// init call // init call
int32_t tsCompressInit(char* lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals, uint32_t intervals, int32_t tsCompressInit(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals,
int32_t ifAdtFse, const char* compressor) { uint32_t intervals, int32_t ifAdtFse, const char *compressor) {
// config // config
lossyFloat = strstr(lossyColumns, "float") != NULL; lossyFloat = strstr(lossyColumns, "float") != NULL;
lossyDouble = strstr(lossyColumns, "double") != NULL; lossyDouble = strstr(lossyColumns, "double") != NULL;
@ -861,7 +867,7 @@ uint32_t decodeFloatValue(const char *const input, int32_t *const ipos, uint8_t
return diff; return diff;
} }
static void tsDecompressFloatHelper(const char *const input, const int32_t nelements, float* ostream) { static void tsDecompressFloatHelper(const char *const input, const int32_t nelements, float *ostream) {
uint8_t flags = 0; uint8_t flags = 0;
int32_t ipos = 1; int32_t ipos = 1;
int32_t opos = 0; int32_t opos = 0;
@ -899,8 +905,8 @@ int32_t tsDecompressFloatImp(const char *const input, const int32_t nelements, c
tsDecompressFloatImplAvx2(input, nelements, output); tsDecompressFloatImplAvx2(input, nelements, output);
} else if (tsSIMDEnable && tsAVX512Enable) { } else if (tsSIMDEnable && tsAVX512Enable) {
tsDecompressFloatImplAvx512(input, nelements, output); tsDecompressFloatImplAvx512(input, nelements, output);
} else { // alternative implementation without SIMD instructions. } else { // alternative implementation without SIMD instructions.
tsDecompressFloatHelper(input, nelements, (float*)output); tsDecompressFloatHelper(input, nelements, (float *)output);
} }
return nelements * FLOAT_BYTES; return nelements * FLOAT_BYTES;