tsdb support compress
This commit is contained in:
parent
ec227180e1
commit
f607cd60e3
|
@ -57,9 +57,9 @@ const static uint8_t BIT2_MAP[4] = {0b11111100, 0b11110011, 0b11001111, 0b001111
|
||||||
#define ONE ((uint8_t)1)
|
#define ONE ((uint8_t)1)
|
||||||
#define THREE ((uint8_t)3)
|
#define THREE ((uint8_t)3)
|
||||||
#define DIV_8(i) ((i) >> 3)
|
#define DIV_8(i) ((i) >> 3)
|
||||||
#define MOD_8(i) ((i) & 7)
|
#define MOD_8(i) ((i)&7)
|
||||||
#define DIV_4(i) ((i) >> 2)
|
#define DIV_4(i) ((i) >> 2)
|
||||||
#define MOD_4(i) ((i) & 3)
|
#define MOD_4(i) ((i)&3)
|
||||||
#define MOD_4_TIME_2(i) (MOD_4(i) << 1)
|
#define MOD_4_TIME_2(i) (MOD_4(i) << 1)
|
||||||
#define BIT1_SIZE(n) (DIV_8((n)-1) + 1)
|
#define BIT1_SIZE(n) (DIV_8((n)-1) + 1)
|
||||||
#define BIT2_SIZE(n) (DIV_4((n)-1) + 1)
|
#define BIT2_SIZE(n) (DIV_4((n)-1) + 1)
|
||||||
|
@ -97,7 +97,7 @@ const static uint8_t BIT2_MAP[4] = {0b11111100, 0b11110011, 0b11001111, 0b001111
|
||||||
|
|
||||||
// SValueColumn ================================
|
// SValueColumn ================================
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t cmprAlg; // filled by caller
|
uint32_t cmprAlg; // filled by caller
|
||||||
int8_t type;
|
int8_t type;
|
||||||
int32_t dataOriginalSize;
|
int32_t dataOriginalSize;
|
||||||
int32_t dataCompressedSize;
|
int32_t dataCompressedSize;
|
||||||
|
@ -149,7 +149,7 @@ int32_t parseJsontoTagData(const char *json, SArray *pTagVals, STag **ppTag, voi
|
||||||
|
|
||||||
// SColData ================================
|
// SColData ================================
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t cmprAlg; // filled by caller
|
uint32_t cmprAlg; // filled by caller
|
||||||
int8_t columnFlag;
|
int8_t columnFlag;
|
||||||
int8_t flag;
|
int8_t flag;
|
||||||
int8_t dataType;
|
int8_t dataType;
|
||||||
|
@ -338,8 +338,8 @@ struct SValueColumn {
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t dataType; // filled by caller
|
int32_t dataType; // filled by caller
|
||||||
int8_t cmprAlg; // filled by caller
|
uint32_t cmprAlg; // filled by caller
|
||||||
int32_t originalSize; // filled by caller
|
int32_t originalSize; // filled by caller
|
||||||
int32_t compressedSize;
|
int32_t compressedSize;
|
||||||
} SCompressInfo;
|
} SCompressInfo;
|
||||||
|
|
|
@ -269,8 +269,10 @@ typedef struct {
|
||||||
(IS_NUMERIC_TYPE(_t) || (_t) == (TSDB_DATA_TYPE_BOOL) || (_t) == (TSDB_DATA_TYPE_TIMESTAMP))
|
(IS_NUMERIC_TYPE(_t) || (_t) == (TSDB_DATA_TYPE_BOOL) || (_t) == (TSDB_DATA_TYPE_TIMESTAMP))
|
||||||
|
|
||||||
#define IS_VAR_DATA_TYPE(t) \
|
#define IS_VAR_DATA_TYPE(t) \
|
||||||
(((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_VARBINARY) || ((t) == TSDB_DATA_TYPE_NCHAR) || ((t) == TSDB_DATA_TYPE_JSON) || ((t) == TSDB_DATA_TYPE_GEOMETRY))
|
(((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_VARBINARY) || ((t) == TSDB_DATA_TYPE_NCHAR) || \
|
||||||
#define IS_STR_DATA_TYPE(t) (((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_VARBINARY) || ((t) == TSDB_DATA_TYPE_NCHAR))
|
((t) == TSDB_DATA_TYPE_JSON) || ((t) == TSDB_DATA_TYPE_GEOMETRY))
|
||||||
|
#define IS_STR_DATA_TYPE(t) \
|
||||||
|
(((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_VARBINARY) || ((t) == TSDB_DATA_TYPE_NCHAR))
|
||||||
|
|
||||||
#define IS_VALID_TINYINT(_t) ((_t) >= INT8_MIN && (_t) <= INT8_MAX)
|
#define IS_VALID_TINYINT(_t) ((_t) >= INT8_MIN && (_t) <= INT8_MAX)
|
||||||
#define IS_VALID_SMALLINT(_t) ((_t) >= INT16_MIN && (_t) <= INT16_MAX)
|
#define IS_VALID_SMALLINT(_t) ((_t) >= INT16_MIN && (_t) <= INT16_MAX)
|
||||||
|
@ -340,13 +342,28 @@ typedef struct tDataTypeDescriptor {
|
||||||
int32_t nBuf);
|
int32_t nBuf);
|
||||||
} tDataTypeDescriptor;
|
} tDataTypeDescriptor;
|
||||||
|
|
||||||
|
typedef struct tDataTypeCompress {
|
||||||
|
int16_t type;
|
||||||
|
int16_t nameLen;
|
||||||
|
int32_t bytes;
|
||||||
|
char *name;
|
||||||
|
int64_t minValue;
|
||||||
|
int64_t maxValue;
|
||||||
|
int32_t (*compFunc)(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
|
||||||
|
int32_t nBuf);
|
||||||
|
int32_t (*decompFunc)(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
|
||||||
|
int32_t nBuf);
|
||||||
|
} tDataTypeCompress;
|
||||||
|
|
||||||
extern tDataTypeDescriptor tDataTypes[TSDB_DATA_TYPE_MAX];
|
extern tDataTypeDescriptor tDataTypes[TSDB_DATA_TYPE_MAX];
|
||||||
|
extern tDataTypeCompress tDataCompress[TSDB_DATA_TYPE_MAX];
|
||||||
|
|
||||||
bool isValidDataType(int32_t type);
|
bool isValidDataType(int32_t type);
|
||||||
|
|
||||||
int32_t operateVal(void *dst, void *s1, void *s2, int32_t optr, int32_t type);
|
int32_t operateVal(void *dst, void *s1, void *s2, int32_t optr, int32_t type);
|
||||||
void assignVal(char *val, const char *src, int32_t len, int32_t type);
|
void assignVal(char *val, const char *src, int32_t len, int32_t type);
|
||||||
void *getDataMin(int32_t type, void* value);
|
void *getDataMin(int32_t type, void *value);
|
||||||
void *getDataMax(int32_t type, void* value);
|
void *getDataMax(int32_t type, void *value);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -145,6 +145,55 @@ int32_t tsDecompressTimestampAvx512(const char *const input, const int32_t nelem
|
||||||
bool bigEndian);
|
bool bigEndian);
|
||||||
int32_t tsDecompressTimestampAvx2(const char *const input, const int32_t nelements, char *const output, bool bigEndian);
|
int32_t tsDecompressTimestampAvx2(const char *const input, const int32_t nelements, char *const output, bool bigEndian);
|
||||||
|
|
||||||
|
/*************************************************************************
|
||||||
|
* REGULAR COMPRESSION 2
|
||||||
|
*************************************************************************/
|
||||||
|
int32_t tsCompressTimestamp2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
|
||||||
|
void *pBuf, int32_t nBuf);
|
||||||
|
int32_t tsDecompressTimestamp2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
|
||||||
|
void *pBuf, int32_t nBuf);
|
||||||
|
int32_t tsCompressFloat2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
|
||||||
|
int32_t nBuf);
|
||||||
|
int32_t tsDecompressFloat2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
|
||||||
|
int32_t nBuf);
|
||||||
|
int32_t tsCompressDouble2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
|
||||||
|
int32_t nBuf);
|
||||||
|
int32_t tsDecompressDouble2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
|
||||||
|
int32_t nBuf);
|
||||||
|
int32_t tsCompressString2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
|
||||||
|
int32_t nBuf);
|
||||||
|
int32_t tsDecompressString2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
|
||||||
|
int32_t nBuf);
|
||||||
|
int32_t tsCompressBool2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
|
||||||
|
int32_t nBuf);
|
||||||
|
int32_t tsDecompressBool2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
|
||||||
|
int32_t nBuf);
|
||||||
|
int32_t tsCompressTinyint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
|
||||||
|
int32_t nBuf);
|
||||||
|
int32_t tsDecompressTinyint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
|
||||||
|
void *pBuf, int32_t nBuf);
|
||||||
|
int32_t tsCompressSmallint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
|
||||||
|
int32_t nBuf);
|
||||||
|
int32_t tsDecompressSmallint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
|
||||||
|
void *pBuf, int32_t nBuf);
|
||||||
|
int32_t tsCompressInt2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
|
||||||
|
int32_t nBuf);
|
||||||
|
int32_t tsDecompressInt2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
|
||||||
|
int32_t nBuf);
|
||||||
|
int32_t tsCompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
|
||||||
|
int32_t nBuf);
|
||||||
|
int32_t tsDecompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
|
||||||
|
int32_t nBuf);
|
||||||
|
// for internal usage
|
||||||
|
int32_t getWordLength(char type);
|
||||||
|
|
||||||
|
int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, char *const output, const char type);
|
||||||
|
int32_t tsDecompressFloatImplAvx512(const char *const input, const int32_t nelements, char *const output);
|
||||||
|
int32_t tsDecompressFloatImplAvx2(const char *const input, const int32_t nelements, char *const output);
|
||||||
|
int32_t tsDecompressTimestampAvx512(const char *const input, const int32_t nelements, char *const output,
|
||||||
|
bool bigEndian);
|
||||||
|
int32_t tsDecompressTimestampAvx2(const char *const input, const int32_t nelements, char *const output, bool bigEndian);
|
||||||
|
|
||||||
/*************************************************************************
|
/*************************************************************************
|
||||||
* STREAM COMPRESSION
|
* STREAM COMPRESSION
|
||||||
*************************************************************************/
|
*************************************************************************/
|
||||||
|
|
|
@ -4183,7 +4183,7 @@ int32_t tValueColumnCompressInfoEncode(const SValueColumnCompressInfo *info, SBu
|
||||||
uint8_t fmtVer = 0;
|
uint8_t fmtVer = 0;
|
||||||
|
|
||||||
if ((code = tBufferPutU8(buffer, fmtVer))) return code;
|
if ((code = tBufferPutU8(buffer, fmtVer))) return code;
|
||||||
if ((code = tBufferPutI8(buffer, info->cmprAlg))) return code;
|
if ((code = tBufferPutU32(buffer, info->cmprAlg))) return code;
|
||||||
if ((code = tBufferPutI8(buffer, info->type))) return code;
|
if ((code = tBufferPutI8(buffer, info->type))) return code;
|
||||||
if (IS_VAR_DATA_TYPE(info->type)) {
|
if (IS_VAR_DATA_TYPE(info->type)) {
|
||||||
if ((code = tBufferPutI32v(buffer, info->offsetOriginalSize))) return code;
|
if ((code = tBufferPutI32v(buffer, info->offsetOriginalSize))) return code;
|
||||||
|
@ -4201,7 +4201,7 @@ int32_t tValueColumnCompressInfoDecode(SBufferReader *reader, SValueColumnCompre
|
||||||
|
|
||||||
if ((code = tBufferGetU8(reader, &fmtVer))) return code;
|
if ((code = tBufferGetU8(reader, &fmtVer))) return code;
|
||||||
if (fmtVer == 0) {
|
if (fmtVer == 0) {
|
||||||
if ((code = tBufferGetI8(reader, &info->cmprAlg))) return code;
|
if ((code = tBufferGetU32(reader, &info->cmprAlg))) return code;
|
||||||
if ((code = tBufferGetI8(reader, &info->type))) return code;
|
if ((code = tBufferGetI8(reader, &info->type))) return code;
|
||||||
if (IS_VAR_DATA_TYPE(info->type)) {
|
if (IS_VAR_DATA_TYPE(info->type)) {
|
||||||
if ((code = tBufferGetI32v(reader, &info->offsetOriginalSize))) return code;
|
if ((code = tBufferGetI32v(reader, &info->offsetOriginalSize))) return code;
|
||||||
|
@ -4234,7 +4234,7 @@ int32_t tCompressData(void *input, // input
|
||||||
if (info->cmprAlg == NO_COMPRESSION) {
|
if (info->cmprAlg == NO_COMPRESSION) {
|
||||||
memcpy(output, input, info->originalSize);
|
memcpy(output, input, info->originalSize);
|
||||||
info->compressedSize = info->originalSize;
|
info->compressedSize = info->originalSize;
|
||||||
} else {
|
} else if (info->cmprAlg == TWO_STAGE_COMP) {
|
||||||
SBuffer local;
|
SBuffer local;
|
||||||
|
|
||||||
tBufferInit(&local);
|
tBufferInit(&local);
|
||||||
|
@ -4266,6 +4266,8 @@ int32_t tCompressData(void *input, // input
|
||||||
}
|
}
|
||||||
|
|
||||||
tBufferDestroy(&local);
|
tBufferDestroy(&local);
|
||||||
|
} else {
|
||||||
|
// new col compress
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -4284,7 +4286,7 @@ int32_t tDecompressData(void *input, // input
|
||||||
if (info->cmprAlg == NO_COMPRESSION) {
|
if (info->cmprAlg == NO_COMPRESSION) {
|
||||||
ASSERT(info->compressedSize == info->originalSize);
|
ASSERT(info->compressedSize == info->originalSize);
|
||||||
memcpy(output, input, info->compressedSize);
|
memcpy(output, input, info->compressedSize);
|
||||||
} else {
|
} else if (info->cmprAlg == ONE_STAGE_COMP || info->cmprAlg == TWO_STAGE_COMP) {
|
||||||
SBuffer local;
|
SBuffer local;
|
||||||
|
|
||||||
tBufferInit(&local);
|
tBufferInit(&local);
|
||||||
|
@ -4315,6 +4317,38 @@ int32_t tDecompressData(void *input, // input
|
||||||
return TSDB_CODE_COMPRESS_ERROR;
|
return TSDB_CODE_COMPRESS_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ASSERT(decompressedSize == info->originalSize);
|
||||||
|
tBufferDestroy(&local);
|
||||||
|
} else {
|
||||||
|
SBuffer local;
|
||||||
|
|
||||||
|
tBufferInit(&local);
|
||||||
|
if (buffer == NULL) {
|
||||||
|
buffer = &local;
|
||||||
|
}
|
||||||
|
if (info->cmprAlg == TWO_STAGE_COMP) {
|
||||||
|
code = tBufferEnsureCapacity(buffer, info->originalSize + COMP_OVERFLOW_BYTES);
|
||||||
|
if (code) {
|
||||||
|
tBufferDestroy(&local);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t decompressedSize = tDataTypes[info->dataType].decompFunc(
|
||||||
|
input, // input
|
||||||
|
info->compressedSize, // inputSize
|
||||||
|
info->originalSize / tDataTypes[info->dataType].bytes, // number of elements
|
||||||
|
output, // output
|
||||||
|
outputSize, // output size
|
||||||
|
info->cmprAlg, // compression algorithm
|
||||||
|
buffer->data, // helper buffer
|
||||||
|
buffer->capacity // extra buffer size
|
||||||
|
);
|
||||||
|
if (decompressedSize < 0) {
|
||||||
|
tBufferDestroy(&local);
|
||||||
|
return TSDB_CODE_COMPRESS_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
ASSERT(decompressedSize == info->originalSize);
|
ASSERT(decompressedSize == info->originalSize);
|
||||||
tBufferDestroy(&local);
|
tBufferDestroy(&local);
|
||||||
}
|
}
|
||||||
|
|
|
@ -61,13 +61,44 @@ tDataTypeDescriptor tDataTypes[TSDB_DATA_TYPE_MAX] = {
|
||||||
{TSDB_DATA_TYPE_UINT, 12, INT_BYTES, "INT UNSIGNED", 0, UINT32_MAX, tsCompressInt, tsDecompressInt},
|
{TSDB_DATA_TYPE_UINT, 12, INT_BYTES, "INT UNSIGNED", 0, UINT32_MAX, tsCompressInt, tsDecompressInt},
|
||||||
{TSDB_DATA_TYPE_UBIGINT, 15, LONG_BYTES, "BIGINT UNSIGNED", 0, UINT64_MAX, tsCompressBigint, tsDecompressBigint},
|
{TSDB_DATA_TYPE_UBIGINT, 15, LONG_BYTES, "BIGINT UNSIGNED", 0, UINT64_MAX, tsCompressBigint, tsDecompressBigint},
|
||||||
{TSDB_DATA_TYPE_JSON, 4, TSDB_MAX_JSON_TAG_LEN, "JSON", 0, 0, tsCompressString, tsDecompressString},
|
{TSDB_DATA_TYPE_JSON, 4, TSDB_MAX_JSON_TAG_LEN, "JSON", 0, 0, tsCompressString, tsDecompressString},
|
||||||
{TSDB_DATA_TYPE_VARBINARY, 9, 1, "VARBINARY", 0, 0, tsCompressString, tsDecompressString}, // placeholder, not implemented
|
{TSDB_DATA_TYPE_VARBINARY, 9, 1, "VARBINARY", 0, 0, tsCompressString,
|
||||||
|
tsDecompressString}, // placeholder, not implemented
|
||||||
{TSDB_DATA_TYPE_DECIMAL, 7, 1, "DECIMAL", 0, 0, NULL, NULL}, // placeholder, not implemented
|
{TSDB_DATA_TYPE_DECIMAL, 7, 1, "DECIMAL", 0, 0, NULL, NULL}, // placeholder, not implemented
|
||||||
{TSDB_DATA_TYPE_BLOB, 4, 1, "BLOB", 0, 0, NULL, NULL}, // placeholder, not implemented
|
{TSDB_DATA_TYPE_BLOB, 4, 1, "BLOB", 0, 0, NULL, NULL}, // placeholder, not implemented
|
||||||
{TSDB_DATA_TYPE_MEDIUMBLOB, 10, 1, "MEDIUMBLOB", 0, 0, NULL, NULL}, // placeholder, not implemented
|
{TSDB_DATA_TYPE_MEDIUMBLOB, 10, 1, "MEDIUMBLOB", 0, 0, NULL, NULL}, // placeholder, not implemented
|
||||||
{TSDB_DATA_TYPE_GEOMETRY, 8, 1, "GEOMETRY", 0, 0, tsCompressString, tsDecompressString},
|
{TSDB_DATA_TYPE_GEOMETRY, 8, 1, "GEOMETRY", 0, 0, tsCompressString, tsDecompressString},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
tDataTypeCompress tDataCompress[TSDB_DATA_TYPE_MAX] = {
|
||||||
|
{TSDB_DATA_TYPE_NULL, 6, 1, "NOTYPE", 0, 0, NULL, NULL},
|
||||||
|
{TSDB_DATA_TYPE_BOOL, 4, CHAR_BYTES, "BOOL", false, true, tsCompressBool2, tsDecompressBool2},
|
||||||
|
{TSDB_DATA_TYPE_TINYINT, 7, CHAR_BYTES, "TINYINT", INT8_MIN, INT8_MAX, tsCompressTinyint2, tsDecompressTinyint2},
|
||||||
|
{TSDB_DATA_TYPE_SMALLINT, 8, SHORT_BYTES, "SMALLINT", INT16_MIN, INT16_MAX, tsCompressSmallint2,
|
||||||
|
tsDecompressSmallint2},
|
||||||
|
{TSDB_DATA_TYPE_INT, 3, INT_BYTES, "INT", INT32_MIN, INT32_MAX, tsCompressInt2, tsDecompressInt2},
|
||||||
|
{TSDB_DATA_TYPE_BIGINT, 6, LONG_BYTES, "BIGINT", INT64_MIN, INT64_MAX, tsCompressBigint2, tsDecompressBigint2},
|
||||||
|
{TSDB_DATA_TYPE_FLOAT, 5, FLOAT_BYTES, "FLOAT", 0, 0, tsCompressFloat2, tsDecompressFloat2},
|
||||||
|
{TSDB_DATA_TYPE_DOUBLE, 6, DOUBLE_BYTES, "DOUBLE", 0, 0, tsCompressDouble2, tsDecompressDouble2},
|
||||||
|
{TSDB_DATA_TYPE_VARCHAR, 6, 1, "VARCHAR", 0, 0, tsCompressString2, tsDecompressString2},
|
||||||
|
{TSDB_DATA_TYPE_TIMESTAMP, 9, LONG_BYTES, "TIMESTAMP", INT64_MIN, INT64_MAX, tsCompressTimestamp2,
|
||||||
|
tsDecompressTimestamp2},
|
||||||
|
{TSDB_DATA_TYPE_NCHAR, 5, 1, "NCHAR", 0, 0, tsCompressString2, tsDecompressString2},
|
||||||
|
{TSDB_DATA_TYPE_UTINYINT, 16, CHAR_BYTES, "TINYINT UNSIGNED", 0, UINT8_MAX, tsCompressTinyint2,
|
||||||
|
tsDecompressTinyint2},
|
||||||
|
{TSDB_DATA_TYPE_USMALLINT, 17, SHORT_BYTES, "SMALLINT UNSIGNED", 0, UINT16_MAX, tsCompressSmallint2,
|
||||||
|
tsDecompressSmallint2},
|
||||||
|
{TSDB_DATA_TYPE_UINT, 12, INT_BYTES, "INT UNSIGNED", 0, UINT32_MAX, tsCompressInt2, tsDecompressInt2},
|
||||||
|
{TSDB_DATA_TYPE_UBIGINT, 15, LONG_BYTES, "BIGINT UNSIGNED", 0, UINT64_MAX, tsCompressBigint2, tsDecompressBigint2},
|
||||||
|
{TSDB_DATA_TYPE_JSON, 4, TSDB_MAX_JSON_TAG_LEN, "JSON", 0, 0, tsCompressString2, tsDecompressString2},
|
||||||
|
{TSDB_DATA_TYPE_VARBINARY, 9, 1, "VARBINARY", 0, 0, tsCompressString2,
|
||||||
|
tsDecompressString2}, // placeholder, not implemented
|
||||||
|
{TSDB_DATA_TYPE_DECIMAL, 7, 1, "DECIMAL", 0, 0, NULL, NULL}, // placeholder, not implemented
|
||||||
|
{TSDB_DATA_TYPE_BLOB, 4, 1, "BLOB", 0, 0, NULL, NULL}, // placeholder, not implemented
|
||||||
|
{TSDB_DATA_TYPE_MEDIUMBLOB, 10, 1, "MEDIUMBLOB", 0, 0, NULL, NULL}, // placeholder, not implemented
|
||||||
|
{TSDB_DATA_TYPE_GEOMETRY, 8, 1, "GEOMETRY", 0, 0, tsCompressString2, tsDecompressString2},
|
||||||
|
|
||||||
|
};
|
||||||
|
|
||||||
static float floatMin = -FLT_MAX, floatMax = FLT_MAX;
|
static float floatMin = -FLT_MAX, floatMax = FLT_MAX;
|
||||||
static double doubleMin = -DBL_MAX, doubleMax = DBL_MAX;
|
static double doubleMin = -DBL_MAX, doubleMax = DBL_MAX;
|
||||||
|
|
||||||
|
|
|
@ -176,7 +176,7 @@ int32_t tBlockDataUpdateRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS
|
||||||
int32_t tBlockDataTryUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, int64_t uid);
|
int32_t tBlockDataTryUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, int64_t uid);
|
||||||
int32_t tBlockDataUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid);
|
int32_t tBlockDataUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid);
|
||||||
void tBlockDataClear(SBlockData *pBlockData);
|
void tBlockDataClear(SBlockData *pBlockData);
|
||||||
int32_t tBlockDataCompress(SBlockData *bData, int8_t cmprAlg, SBuffer *buffers, SBuffer *assist);
|
int32_t tBlockDataCompress(SBlockData *bData, void *pCmprInfo, SBuffer *buffers, SBuffer *assist);
|
||||||
int32_t tBlockDataDecompress(SBufferReader *br, SBlockData *blockData, SBuffer *assist);
|
int32_t tBlockDataDecompress(SBufferReader *br, SBlockData *blockData, SBuffer *assist);
|
||||||
int32_t tBlockDataDecompressKeyPart(const SDiskDataHdr *hdr, SBufferReader *br, SBlockData *blockData, SBuffer *assist);
|
int32_t tBlockDataDecompressKeyPart(const SDiskDataHdr *hdr, SBufferReader *br, SBlockData *blockData, SBuffer *assist);
|
||||||
int32_t tBlockDataDecompressColData(const SDiskDataHdr *hdr, const SBlockCol *blockCol, SBufferReader *br,
|
int32_t tBlockDataDecompressColData(const SDiskDataHdr *hdr, const SBlockCol *blockCol, SBufferReader *br,
|
||||||
|
|
|
@ -117,8 +117,8 @@ typedef struct SQueryNode SQueryNode;
|
||||||
#define VNODE_METRIC_TAG_NAME_RESULT "result"
|
#define VNODE_METRIC_TAG_NAME_RESULT "result"
|
||||||
|
|
||||||
#define VNODE_METRIC_TAG_VALUE_INSERT_AFFECTED_ROWS "inserted_rows"
|
#define VNODE_METRIC_TAG_VALUE_INSERT_AFFECTED_ROWS "inserted_rows"
|
||||||
//#define VNODE_METRIC_TAG_VALUE_INSERT "insert"
|
// #define VNODE_METRIC_TAG_VALUE_INSERT "insert"
|
||||||
//#define VNODE_METRIC_TAG_VALUE_DELETE "delete"
|
// #define VNODE_METRIC_TAG_VALUE_DELETE "delete"
|
||||||
|
|
||||||
// vnd.h
|
// vnd.h
|
||||||
typedef int32_t (*_query_reseek_func_t)(void* pQHandle);
|
typedef int32_t (*_query_reseek_func_t)(void* pQHandle);
|
||||||
|
@ -461,12 +461,12 @@ typedef struct SVCommitSched {
|
||||||
int64_t maxWaitMs;
|
int64_t maxWaitMs;
|
||||||
} SVCommitSched;
|
} SVCommitSched;
|
||||||
|
|
||||||
typedef struct SVMonitorObj{
|
typedef struct SVMonitorObj {
|
||||||
char strClusterId[TSDB_CLUSTER_ID_LEN];
|
char strClusterId[TSDB_CLUSTER_ID_LEN];
|
||||||
char strDnodeId[TSDB_NODE_ID_LEN];
|
char strDnodeId[TSDB_NODE_ID_LEN];
|
||||||
char strVgId[TSDB_VGROUP_ID_LEN];
|
char strVgId[TSDB_VGROUP_ID_LEN];
|
||||||
taos_counter_t *insertCounter;
|
taos_counter_t* insertCounter;
|
||||||
}SVMonitorObj;
|
} SVMonitorObj;
|
||||||
|
|
||||||
struct SVnode {
|
struct SVnode {
|
||||||
char* path;
|
char* path;
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "tsdbDataFileRW.h"
|
#include "tsdbDataFileRW.h"
|
||||||
|
#include "meta.h"
|
||||||
|
|
||||||
// SDataFileReader =============================================
|
// SDataFileReader =============================================
|
||||||
struct SDataFileReader {
|
struct SDataFileReader {
|
||||||
|
@ -878,6 +879,8 @@ static int32_t tsdbDataFileDoWriteBlockData(SDataFileWriter *writer, SBlockData
|
||||||
SBuffer *buffers = writer->buffers;
|
SBuffer *buffers = writer->buffers;
|
||||||
SBuffer *assist = writer->buffers + 4;
|
SBuffer *assist = writer->buffers + 4;
|
||||||
|
|
||||||
|
SColCompressInfo cmprInfo = {.pColCmpr = NULL, .defaultCmprAlg = writer->config->cmprAlg};
|
||||||
|
|
||||||
SBrinRecord record[1] = {{
|
SBrinRecord record[1] = {{
|
||||||
.suid = bData->suid,
|
.suid = bData->suid,
|
||||||
.uid = bData->uid,
|
.uid = bData->uid,
|
||||||
|
@ -909,8 +912,10 @@ static int32_t tsdbDataFileDoWriteBlockData(SDataFileWriter *writer, SBlockData
|
||||||
|
|
||||||
tsdbWriterUpdVerRange(&writer->ctx->range, record->minVer, record->maxVer);
|
tsdbWriterUpdVerRange(&writer->ctx->range, record->minVer, record->maxVer);
|
||||||
|
|
||||||
// to .data file
|
code = metaGetColCmpr(writer->config->tsdb->pVnode->pMeta, bData->suid != 0 ? bData->suid : bData->uid,
|
||||||
code = tBlockDataCompress(bData, writer->config->cmprAlg, buffers, assist);
|
&cmprInfo.pColCmpr);
|
||||||
|
|
||||||
|
code = tBlockDataCompress(bData, &cmprInfo, buffers, assist);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
record->blockKeySize = buffers[0].size + buffers[1].size;
|
record->blockKeySize = buffers[0].size + buffers[1].size;
|
||||||
|
@ -953,6 +958,8 @@ _exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
|
TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
|
||||||
}
|
}
|
||||||
|
taosHashCleanup(cmprInfo.pColCmpr);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -38,6 +38,13 @@ extern int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t
|
||||||
extern int32_t tsdbReadFileToBuffer(STsdbFD *pFD, int64_t offset, int64_t size, SBuffer *buffer, int64_t szHint);
|
extern int32_t tsdbReadFileToBuffer(STsdbFD *pFD, int64_t offset, int64_t size, SBuffer *buffer, int64_t szHint);
|
||||||
extern int32_t tsdbFsyncFile(STsdbFD *pFD);
|
extern int32_t tsdbFsyncFile(STsdbFD *pFD);
|
||||||
|
|
||||||
|
typedef struct SColCompressInfo SColCompressInfo;
|
||||||
|
struct SColCompressInfo {
|
||||||
|
SHashObj *pColCmpr;
|
||||||
|
int32_t defaultCmprAlg;
|
||||||
|
};
|
||||||
|
|
||||||
|
// int32_t tsdbGetCompressByUid(void *meta, tb_uid_t uid, struct SColCompressInfo *info);
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -299,3 +299,15 @@ _exit:
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
// int32_t tsdbGetCompressByUid(SFSetWriter *writer, tb_uid_t uid, struct SColCompressInfo *info) {
|
||||||
|
// SHashObj *p = NULL;
|
||||||
|
// int32_t code = metaGetColCmpr(writer->config->tsdb->pVnode->pMeta, uid, &p);
|
||||||
|
// if (code < 0) {
|
||||||
|
// ASSERT(0);
|
||||||
|
// taosHashCleanup(p);
|
||||||
|
// p = NULL;
|
||||||
|
// } else {
|
||||||
|
// }
|
||||||
|
// info->pColCmpr = p;
|
||||||
|
// return code;
|
||||||
|
// }
|
||||||
|
|
|
@ -41,7 +41,6 @@ typedef struct {
|
||||||
bool exist;
|
bool exist;
|
||||||
STFile file;
|
STFile file;
|
||||||
} files[TSDB_FTYPE_MAX];
|
} files[TSDB_FTYPE_MAX];
|
||||||
SHashObj *pColCmpr;
|
|
||||||
} SFSetWriterConfig;
|
} SFSetWriterConfig;
|
||||||
|
|
||||||
int32_t tsdbFSetWriterOpen(SFSetWriterConfig *config, SFSetWriter **writer);
|
int32_t tsdbFSetWriterOpen(SFSetWriterConfig *config, SFSetWriter **writer);
|
||||||
|
|
|
@ -247,7 +247,10 @@ static int32_t tsdbSnapCmprData(STsdbSnapReader* reader, uint8_t** data) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
|
||||||
code = tBlockDataCompress(reader->blockData, NO_COMPRESSION, reader->buffers, reader->buffers + 4);
|
SColCompressInfo info;
|
||||||
|
|
||||||
|
SColCompressInfo cmprInfo = {.pColCmpr = NULL, .defaultCmprAlg = NO_COMPRESSION};
|
||||||
|
code = tBlockDataCompress(reader->blockData, (void*)&cmprInfo, reader->buffers, reader->buffers + 4);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
// TSDB_CHECK_CODE(code, lino, _exit);
|
// TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "tsdbSttFileRW.h"
|
#include "tsdbSttFileRW.h"
|
||||||
|
#include "meta.h"
|
||||||
#include "tsdbDataFileRW.h"
|
#include "tsdbDataFileRW.h"
|
||||||
|
|
||||||
// SSttFReader ============================================================
|
// SSttFReader ============================================================
|
||||||
|
@ -464,7 +465,9 @@ static int32_t tsdbFileDoWriteSttBlockData(STsdbFD *fd, SBlockData *blockData, i
|
||||||
|
|
||||||
tsdbWriterUpdVerRange(range, sttBlk->minVer, sttBlk->maxVer);
|
tsdbWriterUpdVerRange(range, sttBlk->minVer, sttBlk->maxVer);
|
||||||
|
|
||||||
code = tBlockDataCompress(blockData, cmprAlg, buffers, buffers + 4);
|
SColCompressInfo cmprInfo = {.pColCmpr = NULL, .defaultCmprAlg = cmprAlg};
|
||||||
|
code = tBlockDataCompress(blockData, &cmprInfo, buffers, buffers + 4);
|
||||||
|
|
||||||
if (code) return code;
|
if (code) return code;
|
||||||
sttBlk->bInfo.offset = *fileSize;
|
sttBlk->bInfo.offset = *fileSize;
|
||||||
sttBlk->bInfo.szKey = buffers[0].size + buffers[1].size;
|
sttBlk->bInfo.szKey = buffers[0].size + buffers[1].size;
|
||||||
|
@ -492,6 +495,10 @@ static int32_t tsdbSttFileDoWriteBlockData(SSttFileWriter *writer) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
// SHashObj *colCmpr = NULL;
|
||||||
|
// tb_uid_t uid = writer->blockData->suid == 0 ? writer->blockData->uid : writer->blockData->suid;
|
||||||
|
// code = metaGetColCmpr(writer->config->tsdb->pVnode->pMeta, uid, &colCmpr);
|
||||||
|
|
||||||
code = tsdbFileDoWriteSttBlockData(writer->fd, writer->blockData, writer->config->cmprAlg, &writer->file->size,
|
code = tsdbFileDoWriteSttBlockData(writer->fd, writer->blockData, writer->config->cmprAlg, &writer->file->size,
|
||||||
writer->sttBlkArray, writer->buffers, &writer->ctx->range);
|
writer->sttBlkArray, writer->buffers, &writer->ctx->range);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
|
@ -15,8 +15,12 @@
|
||||||
|
|
||||||
#include "tdataformat.h"
|
#include "tdataformat.h"
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
|
#include "tsdbDef.h"
|
||||||
|
|
||||||
static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, SBuffer *buffer, SBuffer *assist);
|
int32_t tsdbGetColCmprAlgFromSet(SHashObj *set, int16_t colId, uint32_t *alg);
|
||||||
|
|
||||||
|
static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, SBuffer *buffer, SBuffer *assist,
|
||||||
|
SColCompressInfo *pCompressExt);
|
||||||
|
|
||||||
// SMapData =======================================================================
|
// SMapData =======================================================================
|
||||||
void tMapDataReset(SMapData *pMapData) {
|
void tMapDataReset(SMapData *pMapData) {
|
||||||
|
@ -1403,10 +1407,12 @@ SColData *tBlockDataGetColData(SBlockData *pBlockData, int16_t cid) {
|
||||||
* buffers[2]: SBlockCol part
|
* buffers[2]: SBlockCol part
|
||||||
* buffers[3]: regular column part
|
* buffers[3]: regular column part
|
||||||
*/
|
*/
|
||||||
int32_t tBlockDataCompress(SBlockData *bData, int8_t cmprAlg, SBuffer *buffers, SBuffer *assist) {
|
int32_t tBlockDataCompress(SBlockData *bData, void *pCompr, SBuffer *buffers, SBuffer *assist) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
SColCompressInfo *pInfo = pCompr;
|
||||||
|
|
||||||
SDiskDataHdr hdr = {
|
SDiskDataHdr hdr = {
|
||||||
.delimiter = TSDB_FILE_DLMT,
|
.delimiter = TSDB_FILE_DLMT,
|
||||||
.fmtVer = 1,
|
.fmtVer = 1,
|
||||||
|
@ -1417,13 +1423,12 @@ int32_t tBlockDataCompress(SBlockData *bData, int8_t cmprAlg, SBuffer *buffers,
|
||||||
.szKey = 0, // filled by compress key
|
.szKey = 0, // filled by compress key
|
||||||
.szBlkCol = 0, // filled by this func
|
.szBlkCol = 0, // filled by this func
|
||||||
.nRow = bData->nRow,
|
.nRow = bData->nRow,
|
||||||
.cmprAlg = cmprAlg,
|
.cmprAlg = pInfo->defaultCmprAlg,
|
||||||
.numOfPKs = 0, // filled by compress key
|
.numOfPKs = 0, // filled by compress key
|
||||||
};
|
};
|
||||||
|
|
||||||
// Key part
|
// Key part
|
||||||
tBufferClear(&buffers[1]);
|
tBufferClear(&buffers[1]);
|
||||||
code = tBlockDataCompressKeyPart(bData, &hdr, &buffers[1], assist);
|
code = tBlockDataCompressKeyPart(bData, &hdr, &buffers[1], assist, (SColCompressInfo *)pInfo);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
// Regulart column part
|
// Regulart column part
|
||||||
|
@ -1440,14 +1445,18 @@ int32_t tBlockDataCompress(SBlockData *bData, int8_t cmprAlg, SBuffer *buffers,
|
||||||
}
|
}
|
||||||
|
|
||||||
SColDataCompressInfo cinfo = {
|
SColDataCompressInfo cinfo = {
|
||||||
.cmprAlg = cmprAlg,
|
.cmprAlg = pInfo->defaultCmprAlg,
|
||||||
};
|
};
|
||||||
|
code = tsdbGetColCmprAlgFromSet(pInfo->pColCmpr, colData->cid, &cinfo.cmprAlg);
|
||||||
|
if (code < 0) {
|
||||||
|
//
|
||||||
|
}
|
||||||
|
|
||||||
int32_t offset = buffers[3].size;
|
int32_t offset = buffers[3].size;
|
||||||
code = tColDataCompress(colData, &cinfo, &buffers[3], assist);
|
code = tColDataCompress(colData, &cinfo, &buffers[3], assist);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
SBlockCol blockCol = (SBlockCol){
|
SBlockCol blockCol = (SBlockCol){.cid = cinfo.columnId,
|
||||||
.cid = cinfo.columnId,
|
|
||||||
.type = cinfo.dataType,
|
.type = cinfo.dataType,
|
||||||
.cflag = cinfo.columnFlag,
|
.cflag = cinfo.columnFlag,
|
||||||
.flag = cinfo.flag,
|
.flag = cinfo.flag,
|
||||||
|
@ -1456,7 +1465,7 @@ int32_t tBlockDataCompress(SBlockData *bData, int8_t cmprAlg, SBuffer *buffers,
|
||||||
.szOffset = cinfo.offsetCompressedSize,
|
.szOffset = cinfo.offsetCompressedSize,
|
||||||
.szValue = cinfo.dataCompressedSize,
|
.szValue = cinfo.dataCompressedSize,
|
||||||
.offset = offset,
|
.offset = offset,
|
||||||
};
|
.alg = cinfo.cmprAlg};
|
||||||
|
|
||||||
code = tPutBlockCol(&buffers[2], &blockCol, hdr.fmtVer);
|
code = tPutBlockCol(&buffers[2], &blockCol, hdr.fmtVer);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
@ -1581,7 +1590,8 @@ int32_t tGetColumnDataAgg(SBufferReader *br, SColumnDataAgg *pColAgg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, SBuffer *buffer, SBuffer *assist) {
|
static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, SBuffer *buffer, SBuffer *assist,
|
||||||
|
SColCompressInfo *compressInfo) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
SCompressInfo cinfo;
|
SCompressInfo cinfo;
|
||||||
|
@ -1632,6 +1642,12 @@ static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, S
|
||||||
SColDataCompressInfo info = {
|
SColDataCompressInfo info = {
|
||||||
.cmprAlg = hdr->cmprAlg,
|
.cmprAlg = hdr->cmprAlg,
|
||||||
};
|
};
|
||||||
|
code = tsdbGetColCmprAlgFromSet(compressInfo->pColCmpr, colData->cid, &info.cmprAlg);
|
||||||
|
if (code < 0) {
|
||||||
|
// do nothing
|
||||||
|
} else {
|
||||||
|
}
|
||||||
|
|
||||||
code = tColDataCompress(colData, &info, buffer, assist);
|
code = tColDataCompress(colData, &info, buffer, assist);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
@ -1645,6 +1661,7 @@ static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, S
|
||||||
.szOffset = info.offsetCompressedSize,
|
.szOffset = info.offsetCompressedSize,
|
||||||
.szValue = info.dataCompressedSize,
|
.szValue = info.dataCompressedSize,
|
||||||
.offset = 0,
|
.offset = 0,
|
||||||
|
.alg = info.cmprAlg,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1665,7 +1682,7 @@ int32_t tBlockDataDecompressColData(const SDiskDataHdr *hdr, const SBlockCol *bl
|
||||||
// ASSERT(blockCol->flag != HAS_NONE);
|
// ASSERT(blockCol->flag != HAS_NONE);
|
||||||
|
|
||||||
SColDataCompressInfo info = {
|
SColDataCompressInfo info = {
|
||||||
.cmprAlg = hdr->cmprAlg,
|
.cmprAlg = blockCol->alg,
|
||||||
.columnFlag = blockCol->cflag,
|
.columnFlag = blockCol->cflag,
|
||||||
.flag = blockCol->flag,
|
.flag = blockCol->flag,
|
||||||
.dataType = blockCol->type,
|
.dataType = blockCol->type,
|
||||||
|
@ -1760,3 +1777,13 @@ int32_t tBlockDataDecompressKeyPart(const SDiskDataHdr *hdr, SBufferReader *br,
|
||||||
_exit:
|
_exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tsdbGetColCmprAlgFromSet(SHashObj *set, int16_t colId, uint32_t *alg) {
|
||||||
|
if (set == NULL) return -1;
|
||||||
|
|
||||||
|
uint32_t *ret = taosHashGet(set, &colId, sizeof(colId));
|
||||||
|
if (ret == NULL) return -1;
|
||||||
|
|
||||||
|
*alg = *ret;
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -169,7 +169,7 @@ typedef struct {
|
||||||
int64_t maxVer;
|
int64_t maxVer;
|
||||||
int32_t numRec;
|
int32_t numRec;
|
||||||
int32_t size[15];
|
int32_t size[15];
|
||||||
int8_t cmprAlg;
|
uint32_t cmprAlg;
|
||||||
int8_t numOfPKs; // number of primary keys
|
int8_t numOfPKs; // number of primary keys
|
||||||
int8_t rsvd[6];
|
int8_t rsvd[6];
|
||||||
} SBrinBlk;
|
} SBrinBlk;
|
||||||
|
|
|
@ -2395,6 +2395,316 @@ int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#define DEFINE_VAR(cmprAlg) \
|
||||||
|
uint8_t l1 = COMPRESS_L1_TYPE_U32(cmprAlg); \
|
||||||
|
uint8_t l2 = COMPRESS_L2_TYPE_U8(cmprAlg); \
|
||||||
|
uint8_t lvl = COMPRESS_L2_TYPE_LEVEL_U8(cmprAlg);
|
||||||
|
/*************************************************************************
|
||||||
|
* REGULAR COMPRESSION 2
|
||||||
|
*************************************************************************/
|
||||||
|
// Timestamp =====================================================
|
||||||
|
int32_t tsCompressTimestamp2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
|
||||||
|
void *pBuf, int32_t nBuf) {
|
||||||
|
DEFINE_VAR(cmprAlg)
|
||||||
|
|
||||||
|
if (l1 != L1_DISABLED && l2 == L2_DISABLED) {
|
||||||
|
return tsCompressTimestampImp(pIn, nEle, pOut);
|
||||||
|
} else if (l1 != L1_DISABLED && l2 != L2_DISABLED) {
|
||||||
|
int32_t len = tsCompressTimestampImp(pIn, nEle, pBuf);
|
||||||
|
return tsCompressStringImp(pBuf, len, pOut, nOut);
|
||||||
|
} else if (l1 == L1_DISABLED && l2 != L2_DISABLED) {
|
||||||
|
ASSERT(0);
|
||||||
|
} else if (l1 == L1_DISABLED && l2 == L2_DISABLED) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsDecompressTimestamp2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
|
||||||
|
void *pBuf, int32_t nBuf) {
|
||||||
|
DEFINE_VAR(cmprAlg)
|
||||||
|
if (l1 != L1_DISABLED && l2 == L2_DISABLED) {
|
||||||
|
return tsDecompressTimestampImp(pIn, nEle, pOut);
|
||||||
|
} else if (l1 != L1_DISABLED && l2 != L2_DISABLED) {
|
||||||
|
int32_t len = tsDecompressStringImp(pIn, nIn, pBuf, nBuf);
|
||||||
|
if (len < 0) return -1;
|
||||||
|
return tsDecompressTimestampImp(pBuf, nEle, pOut);
|
||||||
|
} else if (l1 == L1_DISABLED && l2 != L2_DISABLED) {
|
||||||
|
ASSERT(0);
|
||||||
|
} else if (l1 == L1_DISABLED && l2 == L2_DISABLED) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Float =====================================================
|
||||||
|
int32_t tsCompressFloat2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
|
||||||
|
int32_t nBuf) {
|
||||||
|
return 0;
|
||||||
|
// #ifdef TD_TSZ
|
||||||
|
// // lossy mode
|
||||||
|
// if (lossyFloat) {
|
||||||
|
// return tsCompressFloatLossyImp(pIn, nEle, pOut);
|
||||||
|
// // lossless mode
|
||||||
|
// } else {
|
||||||
|
// #endif
|
||||||
|
// if (cmprAlg == ONE_STAGE_COMP) {
|
||||||
|
// return tsCompressFloatImp(pIn, nEle, pOut);
|
||||||
|
// } else if (cmprAlg == TWO_STAGE_COMP) {
|
||||||
|
// int32_t len = tsCompressFloatImp(pIn, nEle, pBuf);
|
||||||
|
// return tsCompressStringImp(pBuf, len, pOut, nOut);
|
||||||
|
// } else {
|
||||||
|
// ASSERTS(0, "compress algo invalid");
|
||||||
|
// return -1;
|
||||||
|
// }
|
||||||
|
// #ifdef TD_TSZ
|
||||||
|
// }
|
||||||
|
// #endif
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsDecompressFloat2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
|
||||||
|
int32_t nBuf) {
|
||||||
|
return 0;
|
||||||
|
// #ifdef TD_TSZ
|
||||||
|
// if (HEAD_ALGO(((uint8_t *)pIn)[0]) == ALGO_SZ_LOSSY) {
|
||||||
|
// // decompress lossy
|
||||||
|
// return tsDecompressFloatLossyImp(pIn, nIn, nEle, pOut);
|
||||||
|
// } else {
|
||||||
|
// #endif
|
||||||
|
// // decompress lossless
|
||||||
|
// if (cmprAlg == ONE_STAGE_COMP) {
|
||||||
|
// return tsDecompressFloatImp(pIn, nEle, pOut);
|
||||||
|
// } else if (cmprAlg == TWO_STAGE_COMP) {
|
||||||
|
// if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1;
|
||||||
|
// return tsDecompressFloatImp(pBuf, nEle, pOut);
|
||||||
|
// } else {
|
||||||
|
// ASSERTS(0, "compress algo invalid");
|
||||||
|
// return -1;
|
||||||
|
// }
|
||||||
|
// #ifdef TD_TSZ
|
||||||
|
// }
|
||||||
|
// #endif
|
||||||
|
}
|
||||||
|
|
||||||
|
// Double =====================================================
|
||||||
|
int32_t tsCompressDouble2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
|
||||||
|
int32_t nBuf) {
|
||||||
|
return 0;
|
||||||
|
// #ifdef TD_TSZ
|
||||||
|
// if (lossyDouble) {
|
||||||
|
// // lossy mode
|
||||||
|
// return tsCompressDoubleLossyImp(pIn, nEle, pOut);
|
||||||
|
// } else {
|
||||||
|
// #endif
|
||||||
|
// // lossless mode
|
||||||
|
// if (cmprAlg == ONE_STAGE_COMP) {
|
||||||
|
// return tsCompressDoubleImp(pIn, nEle, pOut);
|
||||||
|
// } else if (cmprAlg == TWO_STAGE_COMP) {
|
||||||
|
// int32_t len = tsCompressDoubleImp(pIn, nEle, pBuf);
|
||||||
|
// return tsCompressStringImp(pBuf, len, pOut, nOut);
|
||||||
|
// } else {
|
||||||
|
// ASSERTS(0, "compress algo invalid");
|
||||||
|
// return -1;
|
||||||
|
// }
|
||||||
|
// #ifdef TD_TSZ
|
||||||
|
// }
|
||||||
|
// #endif
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsDecompressDouble2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
|
||||||
|
void *pBuf, int32_t nBuf) {
|
||||||
|
return 0;
|
||||||
|
// #ifdef TD_TSZ
|
||||||
|
// if (HEAD_ALGO(((uint8_t *)pIn)[0]) == ALGO_SZ_LOSSY) {
|
||||||
|
// // decompress lossy
|
||||||
|
// return tsDecompressDoubleLossyImp(pIn, nIn, nEle, pOut);
|
||||||
|
// } else {
|
||||||
|
// #endif
|
||||||
|
// // decompress lossless
|
||||||
|
// if (cmprAlg == ONE_STAGE_COMP) {
|
||||||
|
// return tsDecompressDoubleImp(pIn, nEle, pOut);
|
||||||
|
// } else if (cmprAlg == TWO_STAGE_COMP) {
|
||||||
|
// if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1;
|
||||||
|
// return tsDecompressDoubleImp(pBuf, nEle, pOut);
|
||||||
|
// } else {
|
||||||
|
// ASSERTS(0, "compress algo invalid");
|
||||||
|
// return -1;
|
||||||
|
// }
|
||||||
|
// #ifdef TD_TSZ
|
||||||
|
// }
|
||||||
|
// #endif
|
||||||
|
}
|
||||||
|
|
||||||
|
// Binary =====================================================
|
||||||
|
int32_t tsCompressString2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
|
||||||
|
int32_t nBuf) {
|
||||||
|
return tsCompressStringImp(pIn, nIn, pOut, nOut);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsDecompressString2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
|
||||||
|
void *pBuf, int32_t nBuf) {
|
||||||
|
// return 0;
|
||||||
|
return tsDecompressStringImp(pIn, nIn, pOut, nOut);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Bool =====================================================
|
||||||
|
int32_t tsCompressBool2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
|
||||||
|
int32_t nBuf) {
|
||||||
|
DEFINE_VAR(cmprAlg)
|
||||||
|
if (l1 != L1_DISABLED && l2 == L2_DISABLED) {
|
||||||
|
return tsCompressBoolImp(pIn, nEle, pOut);
|
||||||
|
} else if (l1 != L1_DISABLED && l2 != L2_DISABLED) {
|
||||||
|
int32_t len = tsCompressBoolImp(pIn, nIn, pBuf);
|
||||||
|
if (len < 0) return -1;
|
||||||
|
return tsCompressStringImp(pBuf, len, pOut, nOut);
|
||||||
|
} else if (l1 == L1_DISABLED && l2 != L2_DISABLED) {
|
||||||
|
ASSERT(0);
|
||||||
|
} else if (l1 == L1_DISABLED && l2 == L2_DISABLED) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsDecompressBool2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
|
||||||
|
int32_t nBuf) {
|
||||||
|
DEFINE_VAR(cmprAlg)
|
||||||
|
if (l1 != L1_DISABLED && l2 == L2_DISABLED) {
|
||||||
|
return tsDecompressBoolImp(pIn, nEle, pOut);
|
||||||
|
} else if (l1 != L1_DISABLED && l2 != L2_DISABLED) {
|
||||||
|
if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1;
|
||||||
|
return tsDecompressBoolImp(pBuf, nEle, pOut);
|
||||||
|
} else if (l1 == L1_DISABLED && l2 != L2_DISABLED) {
|
||||||
|
return -1;
|
||||||
|
} else if (l2 == L1_DISABLED && l2 == L2_DISABLED) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tinyint =====================================================
|
||||||
|
int32_t tsCompressTinyint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
|
||||||
|
int32_t nBuf) {
|
||||||
|
return 0;
|
||||||
|
// if (cmprAlg == ONE_STAGE_COMP) {
|
||||||
|
// return tsCompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_TINYINT);
|
||||||
|
// } else if (cmprAlg == TWO_STAGE_COMP) {
|
||||||
|
// int32_t len = tsCompressINTImp(pIn, nEle, pBuf, TSDB_DATA_TYPE_TINYINT);
|
||||||
|
// return tsCompressStringImp(pBuf, len, pOut, nOut);
|
||||||
|
// } else {
|
||||||
|
// ASSERTS(0, "compress algo invalid");
|
||||||
|
// return -1;
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsDecompressTinyint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
|
||||||
|
void *pBuf, int32_t nBuf) {
|
||||||
|
return 0;
|
||||||
|
// if (cmprAlg == ONE_STAGE_COMP) {
|
||||||
|
// return tsDecompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_TINYINT);
|
||||||
|
// } else if (cmprAlg == TWO_STAGE_COMP) {
|
||||||
|
// if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1;
|
||||||
|
// return tsDecompressINTImp(pBuf, nEle, pOut, TSDB_DATA_TYPE_TINYINT);
|
||||||
|
// } else {
|
||||||
|
// ASSERTS(0, "compress algo invalid");
|
||||||
|
// return -1;
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
|
||||||
|
// Smallint =====================================================
|
||||||
|
int32_t tsCompressSmallint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
|
||||||
|
void *pBuf, int32_t nBuf) {
|
||||||
|
return 0;
|
||||||
|
// if (cmprAlg == ONE_STAGE_COMP) {
|
||||||
|
// return tsCompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_SMALLINT);
|
||||||
|
// } else if (cmprAlg == TWO_STAGE_COMP) {
|
||||||
|
// int32_t len = tsCompressINTImp(pIn, nEle, pBuf, TSDB_DATA_TYPE_SMALLINT);
|
||||||
|
// return tsCompressStringImp(pBuf, len, pOut, nOut);
|
||||||
|
// } else {
|
||||||
|
// ASSERTS(0, "compress algo invalid");
|
||||||
|
// return -1;
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsDecompressSmallint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
|
||||||
|
void *pBuf, int32_t nBuf) {
|
||||||
|
return 0;
|
||||||
|
// if (cmprAlg == ONE_STAGE_COMP) {
|
||||||
|
// return tsDecompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_SMALLINT);
|
||||||
|
// } else if (cmprAlg == TWO_STAGE_COMP) {
|
||||||
|
// if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1;
|
||||||
|
// return tsDecompressINTImp(pBuf, nEle, pOut, TSDB_DATA_TYPE_SMALLINT);
|
||||||
|
// } else {
|
||||||
|
// ASSERTS(0, "compress algo invalid");
|
||||||
|
// return -1;
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
|
||||||
|
// Int =====================================================
|
||||||
|
int32_t tsCompressInt2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
|
||||||
|
int32_t nBuf) {
|
||||||
|
DEFINE_VAR(cmprAlg)
|
||||||
|
|
||||||
|
if (l1 != L1_DISABLED && l2 == L2_DISABLED) {
|
||||||
|
return tsCompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_INT);
|
||||||
|
} else if (l1 != L1_DISABLED && l2 != L2_DISABLED) {
|
||||||
|
int32_t len = tsCompressINTImp(pIn, nEle, pBuf, TSDB_DATA_TYPE_INT);
|
||||||
|
return tsCompressStringImp(pBuf, len, pOut, nOut);
|
||||||
|
} else if (l1 != L1_DISABLED && l2 != L2_DISABLED) {
|
||||||
|
ASSERTS(0, "compress algo invalid");
|
||||||
|
return -1;
|
||||||
|
} else if (l1 != L1_DISABLED && l2 == L2_DISABLED) {
|
||||||
|
ASSERTS(0, "compress algo invalid");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsDecompressInt2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
|
||||||
|
int32_t nBuf) {
|
||||||
|
DEFINE_VAR(cmprAlg)
|
||||||
|
if (l1 != L1_DISABLED && l2 == L2_DISABLED) {
|
||||||
|
return tsDecompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_INT);
|
||||||
|
} else if (l1 != L1_DISABLED && l2 != L2_DISABLED) {
|
||||||
|
if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1;
|
||||||
|
return tsDecompressINTImp(pBuf, nEle, pOut, TSDB_DATA_TYPE_INT);
|
||||||
|
} else if (l1 == L1_DISABLED && l2 != L2_DISABLED) {
|
||||||
|
ASSERTS(0, "compress algo invalid");
|
||||||
|
return -1;
|
||||||
|
} else if (l1 == L1_DISABLED && l2 == L2_DISABLED) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Bigint =====================================================
|
||||||
|
int32_t tsCompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
|
||||||
|
int32_t nBuf) {
|
||||||
|
return 0;
|
||||||
|
// if (cmprAlg == ONE_STAGE_COMP) {
|
||||||
|
// return tsCompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_BIGINT);
|
||||||
|
// } else if (cmprAlg == TWO_STAGE_COMP) {
|
||||||
|
// int32_t len = tsCompressINTImp(pIn, nEle, pBuf, TSDB_DATA_TYPE_BIGINT);
|
||||||
|
// return tsCompressStringImp(pBuf, len, pOut, nOut);
|
||||||
|
// } else {
|
||||||
|
// ASSERTS(0, "compress algo invalid");
|
||||||
|
// return -1;
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsDecompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
|
||||||
|
void *pBuf, int32_t nBuf) {
|
||||||
|
return 0;
|
||||||
|
// if (cmprAlg == ONE_STAGE_COMP) {
|
||||||
|
// return tsDecompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_BIGINT);
|
||||||
|
// } else if (cmprAlg == TWO_STAGE_COMP) {
|
||||||
|
// if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1;
|
||||||
|
// return tsDecompressINTImp(pBuf, nEle, pOut, TSDB_DATA_TYPE_BIGINT);
|
||||||
|
// } else {
|
||||||
|
// ASSERTS(0, "compress algo invalid");
|
||||||
|
// return -1;
|
||||||
|
// }
|
||||||
|
}
|
||||||
int32_t tsFindCompressAlg(int8_t dataType, uint8_t compress, TCompressL1FnSet *l1Fn, TCompressL2FnSet *l2Fn);
|
int32_t tsFindCompressAlg(int8_t dataType, uint8_t compress, TCompressL1FnSet *l1Fn, TCompressL2FnSet *l2Fn);
|
||||||
|
|
||||||
int32_t tsCompressImpl(int8_t type, void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg,
|
int32_t tsCompressImpl(int8_t type, void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg,
|
||||||
|
|
Loading…
Reference in New Issue