From 9b600d6f8f01ae8f8bf0e98f1784ffab4790b05d Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 16 Sep 2022 11:53:58 +0800 Subject: [PATCH 01/13] mroe code --- source/dnode/vnode/src/tsdb/tsdbCompress.c | 179 ++++++++++++++++++--- 1 file changed, 159 insertions(+), 20 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCompress.c b/source/dnode/vnode/src/tsdb/tsdbCompress.c index 76be7c1070..68cf6ef0ca 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCompress.c +++ b/source/dnode/vnode/src/tsdb/tsdbCompress.c @@ -13,45 +13,129 @@ * along with this program. If not, see . */ +#include "lz4.h" #include "tsdb.h" -// Integer ===================================================== typedef struct { - int8_t rawCopy; - int64_t prevVal; - int32_t nVal; - int32_t nBuf; - uint8_t *pBuf; -} SIntCompressor; + int8_t type; + int8_t cmprAlg; + uint8_t *aBuf[2]; + int64_t nBuf[2]; + union { + // Timestamp ---- + struct { + /* data */ + }; + // Integer ---- + struct { + /* data */ + }; + // Binary ---- + struct { + /* data */ + }; + // Float ---- + struct { + /* data */ + }; + // Bool ---- + struct { + int32_t bool_n; + uint8_t bool_b; + }; + }; +} SCompressor; +// Timestamp ===================================================== +static int32_t tCompTimestamp(SCompressor *pCmprsor, TSKEY ts) { + int32_t code = 0; + // TODO + return code; +} + +// Integer ===================================================== #define I64_SAFE_ADD(a, b) (((a) >= 0 && (b) <= INT64_MAX - (b)) || ((a) < 0 && (b) >= INT64_MIN - (a))) #define SIMPLE8B_MAX ((uint64_t)1152921504606846974LL) -static int32_t tsdbCmprI64(SIntCompressor *pCompressor, int64_t val) { +static int32_t tCompI64(SCompressor *pCmprsor, int64_t val) { int32_t code = 0; +#if 0 // raw copy - if (pCompressor->rawCopy) { - memcpy(pCompressor->pBuf + pCompressor->nBuf, &val, sizeof(val)); - pCompressor->nBuf += sizeof(val); - pCompressor->nVal++; + if (pCmprsor->rawCopy) { + memcpy(pCmprsor->pBuf + pCmprsor->nBuf, &val, sizeof(val)); + pCmprsor->nBuf += sizeof(val); + pCmprsor->nVal++; goto _exit; } - if (!I64_SAFE_ADD(val, pCompressor->prevVal)) { - pCompressor->rawCopy = 1; + if (!I64_SAFE_ADD(val, pCmprsor->prevVal)) { + pCmprsor->rawCopy = 1; // TODO: decompress and copy - pCompressor->nVal++; + pCmprsor->nVal++; goto _exit; } - int64_t diff = val - pCompressor->prevVal; + int64_t diff = val - pCmprsor->prevVal; uint8_t zigzag = ZIGZAGE(int64_t, diff); if (zigzag >= SIMPLE8B_MAX) { - pCompressor->rawCopy = 1; + pCmprsor->rawCopy = 1; // TODO: decompress and copy - pCompressor->nVal++; + pCmprsor->nVal++; + goto _exit; + } + +_exit: +#endif + return code; +} + +// Float ===================================================== +static int32_t tCompFloat() { + int32_t code = 0; + // TODO + return code; +} + +// Binary ===================================================== +static int32_t tCompBinary(SCompressor *pCmprsor, const uint8_t *pData, int32_t nData) { + int32_t code = 0; + + if (nData) { + memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], pData, nData); + pCmprsor->nBuf[0] += nData; + } + + return code; +} + +// Bool ===================================================== +static uint8_t BOOL_CMPR_TABLE[] = {0b01, 0b0100, 0b010000, 0b01000000}; +static int32_t tCompBool(SCompressor *pCmprsor, bool vBool) { + int32_t code = 0; + + if (vBool) { + pCmprsor->bool_b |= BOOL_CMPR_TABLE[pCmprsor->bool_n % 4]; + } + pCmprsor->bool_n++; + + if (pCmprsor->bool_n % 4 == 0) { + pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = pCmprsor->bool_b; + pCmprsor->nBuf[0]++; + pCmprsor->bool_b = 0; + } + + return code; +} + +// SCompressor ===================================================== +int32_t tCompressorCreate(SCompressor **ppCmprsor) { + int32_t code = 0; + + *ppCmprsor = (SCompressor *)taosMemoryCalloc(1, sizeof(SCompressor)); + if ((*ppCmprsor) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } @@ -59,6 +143,61 @@ _exit: return code; } -// Timestamp ===================================================== +int32_t tCompressorDestroy(SCompressor *pCmprsor) { + int32_t code = 0; -// Float ===================================================== \ No newline at end of file + if (pCmprsor) { + for (int32_t iBuf = 0; iBuf < sizeof(pCmprsor->aBuf) / sizeof(pCmprsor->aBuf[0]); iBuf++) { + tFree(pCmprsor->aBuf[iBuf]); + } + } + + return code; +} + +int32_t tCompressorReset(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { + int32_t code = 0; + + pCmprsor->type = type; + pCmprsor->cmprAlg = cmprAlg; + + switch (type) { + case TSDB_DATA_TYPE_BOOL: + pCmprsor->bool_n = 0; + pCmprsor->bool_b = 0; + break; + + default: + break; + } + + return code; +} + +int32_t tCompGen(SCompressor *pCmprsor, const uint8_t **ppData, int64_t *nData) { + int32_t code = 0; + + if (pCmprsor->cmprAlg == TWO_STAGE_COMP || IS_VAR_DATA_TYPE(pCmprsor->type)) { + code = tRealloc(&pCmprsor->aBuf[1], pCmprsor->nBuf[0] + 1); + if (code) goto _exit; + + int64_t ret = LZ4_compress_default(pCmprsor->aBuf[0], pCmprsor->aBuf[1] + 1, pCmprsor->nBuf[0], pCmprsor->nBuf[0]); + if (ret) { + pCmprsor->aBuf[1][0] = 0; + pCmprsor->nBuf[1] = ret + 1; + } else { + pCmprsor->aBuf[1][0] = 1; + memcpy(pCmprsor->aBuf[1] + 1, pCmprsor->aBuf[0], pCmprsor->nBuf[0]); + pCmprsor->nBuf[1] = pCmprsor->nBuf[0] + 1; + } + + *ppData = pCmprsor->aBuf[1]; + *nData = pCmprsor->nBuf[1]; + } else { + *ppData = pCmprsor->aBuf[0]; + *nData = pCmprsor->nBuf[0]; + } + +_exit: + return code; +} From a34b4ec749bd00f179d1bfefd32b11c7089eff0c Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 16 Sep 2022 13:33:36 +0800 Subject: [PATCH 02/13] more code --- source/dnode/vnode/src/tsdb/tsdbCompress.c | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCompress.c b/source/dnode/vnode/src/tsdb/tsdbCompress.c index 68cf6ef0ca..2d402a8976 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCompress.c +++ b/source/dnode/vnode/src/tsdb/tsdbCompress.c @@ -41,7 +41,6 @@ typedef struct { // Bool ---- struct { int32_t bool_n; - uint8_t bool_b; }; }; } SCompressor; @@ -115,17 +114,17 @@ static uint8_t BOOL_CMPR_TABLE[] = {0b01, 0b0100, 0b010000, 0b01000000}; static int32_t tCompBool(SCompressor *pCmprsor, bool vBool) { int32_t code = 0; + int32_t mod4 = pCmprsor->bool_n & 3; if (vBool) { - pCmprsor->bool_b |= BOOL_CMPR_TABLE[pCmprsor->bool_n % 4]; + pCmprsor->aBuf[0][pCmprsor->nBuf[0]] |= BOOL_CMPR_TABLE[mod4]; } pCmprsor->bool_n++; - - if (pCmprsor->bool_n % 4 == 0) { - pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = pCmprsor->bool_b; + if (mod4 == 3) { pCmprsor->nBuf[0]++; - pCmprsor->bool_b = 0; + pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = 0; } +_exit: return code; } From 832f2fb7a397584e9594ff59818ac2d028f08944 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 16 Sep 2022 15:49:43 +0800 Subject: [PATCH 03/13] more code --- source/dnode/vnode/src/tsdb/tsdbCompress.c | 164 ++++++++++++++++++++- 1 file changed, 156 insertions(+), 8 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCompress.c b/source/dnode/vnode/src/tsdb/tsdbCompress.c index 2d402a8976..0b1a70fba7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCompress.c +++ b/source/dnode/vnode/src/tsdb/tsdbCompress.c @@ -16,6 +16,8 @@ #include "lz4.h" #include "tsdb.h" +#define I64_SAFE_ADD(a, b) (((a) >= 0 && (b) <= INT64_MAX - (b)) || ((a) < 0 && (b) >= INT64_MIN - (a))) + typedef struct { int8_t type; int8_t cmprAlg; @@ -24,7 +26,11 @@ typedef struct { union { // Timestamp ---- struct { - /* data */ + int8_t ts_copy; + int32_t ts_n; + int64_t ts_prev_val; + int64_t ts_prev_delta; + uint8_t *ts_flag_p; }; // Integer ---- struct { @@ -46,15 +52,133 @@ typedef struct { } SCompressor; // Timestamp ===================================================== +static int32_t tCompSetCopyMode(SCompressor *pCmprsor) { + int32_t code = 0; + + if (pCmprsor->ts_n) { + code = tRealloc(&pCmprsor->aBuf[1], sizeof(int64_t) * (pCmprsor->ts_n + 1)); + if (code) return code; + pCmprsor->nBuf[1] = 1; + + int64_t n = 1; + int64_t valPrev; + int64_t delPrev; + uint64_t vZigzag; + while (n < pCmprsor->nBuf[0]) { + uint8_t n1 = pCmprsor->aBuf[0][0] & 0xf; + uint8_t n2 = pCmprsor->aBuf[0][0] >> 4; + + n++; + + vZigzag = 0; + for (uint8_t i = 0; i < n1; i++) { + vZigzag |= (((uint64_t)pCmprsor->aBuf[0][n]) << (sizeof(int64_t) * i)); + n++; + } + int64_t delta_of_delta = ZIGZAGD(int64_t, vZigzag); + if (n == 2) { + delPrev = 0; + valPrev = delta_of_delta; + } else { + delPrev = delta_of_delta + delPrev; + valPrev = delPrev + valPrev; + } + + memcpy(pCmprsor->aBuf[1] + pCmprsor->nBuf[1], &valPrev, sizeof(int64_t)); + pCmprsor->nBuf[1] += sizeof(int64_t); + + if (n >= pCmprsor->nBuf[0]) break; + + vZigzag = 0; + for (uint8_t i = 0; i < n2; i++) { + vZigzag |= (((uint64_t)pCmprsor->aBuf[0][n]) << (sizeof(int64_t) * i)); + n++; + } + int64_t delta_of_delta = ZIGZAGD(int64_t, vZigzag); + delPrev = delta_of_delta + delPrev; + valPrev = delPrev + valPrev; + } + + uint8_t *pBuf = pCmprsor->aBuf[0]; + pCmprsor->aBuf[0] = pCmprsor->aBuf[1]; + pCmprsor->aBuf[1] = pBuf; + pCmprsor->nBuf[0] = pCmprsor->nBuf[1]; + } else { + // TODO + } + + pCmprsor->aBuf[0][0] = 0; + pCmprsor->ts_copy = 1; + + return code; +} static int32_t tCompTimestamp(SCompressor *pCmprsor, TSKEY ts) { int32_t code = 0; - // TODO + + if (pCmprsor->ts_n == 0) { + pCmprsor->ts_prev_val = ts; + pCmprsor->ts_prev_delta = -ts; + } + + if (pCmprsor->ts_copy) goto _copy_exit; + + if (!I64_SAFE_ADD(ts, -pCmprsor->ts_prev_val)) { + code = tCompSetCopyMode(pCmprsor); + if (code) return code; + goto _copy_exit; + } + + int64_t delta = ts - pCmprsor->ts_prev_val; + + if (!I64_SAFE_ADD(delta, -pCmprsor->ts_prev_delta)) { + code = tCompSetCopyMode(pCmprsor); + if (code) return code; + goto _copy_exit; + } + + int64_t delta_of_delta = delta - pCmprsor->ts_prev_delta; + uint64_t zigzag_value = ZIGZAGE(int64_t, delta_of_delta); + + pCmprsor->ts_prev_val = ts; + pCmprsor->ts_prev_delta = delta; + + if (pCmprsor->ts_n & 0x1 == 0) { + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 17 /*sizeof(int64_t) * 2 + 1*/); + if (code) return code; + + pCmprsor->ts_flag_p = &pCmprsor->aBuf[0][pCmprsor->nBuf[0]]; + pCmprsor->nBuf[0]++; + pCmprsor->ts_flag_p[0] = 0; + + while (zigzag_value) { + pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (zigzag_value & 0xff); + pCmprsor->nBuf[0]++; + pCmprsor->ts_flag_p[0]++; + } + } else { + while (zigzag_value) { + pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (zigzag_value & 0xff); + pCmprsor->nBuf[0]++; + pCmprsor->ts_flag_p += (uint8_t)0x10; + } + } + + pCmprsor->ts_n++; + return code; + +_copy_exit: + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + sizeof(int64_t)); + if (code) return code; + + memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], &ts, sizeof(ts)); + pCmprsor->nBuf[0] += sizeof(ts); + + pCmprsor->ts_n++; return code; } // Integer ===================================================== -#define I64_SAFE_ADD(a, b) (((a) >= 0 && (b) <= INT64_MAX - (b)) || ((a) < 0 && (b) >= INT64_MIN - (a))) -#define SIMPLE8B_MAX ((uint64_t)1152921504606846974LL) +#define SIMPLE8B_MAX ((uint64_t)1152921504606846974LL) static int32_t tCompI64(SCompressor *pCmprsor, int64_t val) { int32_t code = 0; @@ -110,7 +234,8 @@ static int32_t tCompBinary(SCompressor *pCmprsor, const uint8_t *pData, int32_t } // Bool ===================================================== -static uint8_t BOOL_CMPR_TABLE[] = {0b01, 0b0100, 0b010000, 0b01000000}; +static const uint8_t BOOL_CMPR_TABLE[] = {0b01, 0b0100, 0b010000, 0b01000000}; + static int32_t tCompBool(SCompressor *pCmprsor, bool vBool) { int32_t code = 0; @@ -122,6 +247,9 @@ static int32_t tCompBool(SCompressor *pCmprsor, bool vBool) { if (mod4 == 3) { pCmprsor->nBuf[0]++; pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = 0; + + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0]); + if (code) goto _exit; } _exit: @@ -138,6 +266,13 @@ int32_t tCompressorCreate(SCompressor **ppCmprsor) { goto _exit; } + code = tRealloc(&(*ppCmprsor)->aBuf[0], 1024); + if (code) { + taosMemoryFree(*ppCmprsor); + *ppCmprsor = NULL; + goto _exit; + } + _exit: return code; } @@ -146,9 +281,12 @@ int32_t tCompressorDestroy(SCompressor *pCmprsor) { int32_t code = 0; if (pCmprsor) { - for (int32_t iBuf = 0; iBuf < sizeof(pCmprsor->aBuf) / sizeof(pCmprsor->aBuf[0]); iBuf++) { + int32_t nBuf = sizeof(pCmprsor->aBuf) / sizeof(pCmprsor->aBuf[0]); + for (int32_t iBuf = 0; iBuf < nBuf; iBuf++) { tFree(pCmprsor->aBuf[iBuf]); } + + taosMemoryFree(pCmprsor); } return code; @@ -159,13 +297,17 @@ int32_t tCompressorReset(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { pCmprsor->type = type; pCmprsor->cmprAlg = cmprAlg; + pCmprsor->nBuf[0] = 0; // (todo) may or may not +/- 1 switch (type) { + case TSDB_DATA_TYPE_TIMESTAMP: + pCmprsor->ts_copy = 0; + pCmprsor->ts_n = 0; + break; case TSDB_DATA_TYPE_BOOL: pCmprsor->bool_n = 0; - pCmprsor->bool_b = 0; + pCmprsor->aBuf[0][0] = 0; break; - default: break; } @@ -200,3 +342,9 @@ int32_t tCompGen(SCompressor *pCmprsor, const uint8_t **ppData, int64_t *nData) _exit: return code; } + +int32_t tCompress(SCompressor *pCmprsor, void *pData, int64_t nData) { + int32_t code = 0; + // TODO + return code; +} \ No newline at end of file From 512aa3d8d71dddbe48e5d50b219389ca85c3a1f6 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 16 Sep 2022 17:36:31 +0800 Subject: [PATCH 04/13] more code --- source/dnode/vnode/src/tsdb/tsdbCompress.c | 127 +++++++++++++++++++-- 1 file changed, 119 insertions(+), 8 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCompress.c b/source/dnode/vnode/src/tsdb/tsdbCompress.c index 0b1a70fba7..55d94e3b37 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCompress.c +++ b/source/dnode/vnode/src/tsdb/tsdbCompress.c @@ -36,13 +36,17 @@ typedef struct { struct { /* data */ }; - // Binary ---- - struct { - /* data */ - }; // Float ---- struct { - /* data */ + int32_t f_n; + uint32_t f_prev; + uint8_t *f_flag_p; + }; + // Double ---- + struct { + int32_t d_n; + uint64_t d_prev; + uint8_t *d_flag_p; }; // Bool ---- struct { @@ -94,7 +98,7 @@ static int32_t tCompSetCopyMode(SCompressor *pCmprsor) { vZigzag |= (((uint64_t)pCmprsor->aBuf[0][n]) << (sizeof(int64_t) * i)); n++; } - int64_t delta_of_delta = ZIGZAGD(int64_t, vZigzag); + delta_of_delta = ZIGZAGD(int64_t, vZigzag); delPrev = delta_of_delta + delPrev; valPrev = delPrev + valPrev; } @@ -215,9 +219,116 @@ _exit: } // Float ===================================================== -static int32_t tCompFloat() { +static int32_t tCompFloat(SCompressor *pCmprsor, float f) { int32_t code = 0; - // TODO + + union { + float f; + uint32_t u; + } val = {.f = f}; + + uint32_t diff = val.u ^ pCmprsor->f_prev; + pCmprsor->f_prev = val.u; + + int32_t clz, ctz; + if (diff) { + clz = BUILDIN_CLZ(diff); + ctz = BUILDIN_CTZ(diff); + } else { + clz = sizeof(uint32_t); + ctz = sizeof(uint32_t); + } + + if (pCmprsor->f_n & 0x1 == 0) { + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 9 /* sizeof(float) * 2 + 1 */); + if (code) return code; + + pCmprsor->f_flag_p = &pCmprsor->aBuf[0][pCmprsor->nBuf[0]]; + pCmprsor->nBuf[0]++; + + if (clz < ctz) { + uint8_t nBytes = sizeof(uint32_t) - ctz / BITS_PER_BYTE; + pCmprsor->f_flag_p[0] = (0x08 | (nBytes - 1)); + diff >>= (32 - nBytes * BITS_PER_BYTE); + } else { + uint8_t nBytes = sizeof(uint32_t) - clz / BITS_PER_BYTE; + pCmprsor->f_flag_p[0] = nBytes - 1; + } + } else { + if (clz < ctz) { + uint8_t nBytes = sizeof(uint32_t) - ctz / BITS_PER_BYTE; + pCmprsor->f_flag_p[0] |= ((0x08 | (nBytes - 1)) << 4); + diff >>= (32 - nBytes * BITS_PER_BYTE); + } else { + uint8_t nBytes = sizeof(uint32_t) - clz / BITS_PER_BYTE; + pCmprsor->f_flag_p[0] |= ((nBytes - 1) << 4); + } + } + + while (diff) { + pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (diff & 0xff); + pCmprsor->nBuf[0]++; + diff >>= BITS_PER_BYTE; + } + + pCmprsor->f_n++; + return code; +} + +// Double ===================================================== +static int32_t tCompDouble(SCompressor *pCmprsor, double d) { + int32_t code = 0; + + union { + double d; + uint64_t u; + } val = {.d = d}; + + uint64_t diff = val.u ^ pCmprsor->d_prev; + pCmprsor->d_prev = val.u; + + int32_t clz, ctz; + if (diff) { + clz = BUILDIN_CLZ(diff); + ctz = BUILDIN_CTZ(diff); + } else { + clz = sizeof(uint64_t); + ctz = sizeof(uint64_t); + } + + if (pCmprsor->d_n & 0x1 == 0) { + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 17 /* sizeof(double) * 2 + 1 */); + if (code) return code; + + pCmprsor->d_flag_p = &pCmprsor->aBuf[0][pCmprsor->nBuf[0]]; + pCmprsor->nBuf[0]++; + + if (clz < ctz) { + uint8_t nBytes = sizeof(uint64_t) - ctz / BITS_PER_BYTE; + pCmprsor->d_flag_p[0] = (0x08 | (nBytes - 1)); + diff >>= (64 - nBytes * BITS_PER_BYTE); + } else { + uint8_t nBytes = sizeof(uint64_t) - clz / BITS_PER_BYTE; + pCmprsor->d_flag_p[0] = nBytes - 1; + } + } else { + if (clz < ctz) { + uint8_t nBytes = sizeof(uint64_t) - ctz / BITS_PER_BYTE; + pCmprsor->d_flag_p[0] |= ((0x08 | (nBytes - 1)) << 4); + diff >>= (64 - nBytes * BITS_PER_BYTE); + } else { + uint8_t nBytes = sizeof(uint64_t) - clz / BITS_PER_BYTE; + pCmprsor->d_flag_p[0] |= ((nBytes - 1) << 4); + } + } + + while (diff) { + pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (diff & 0xff); + pCmprsor->nBuf[0]++; + diff >>= BITS_PER_BYTE; + } + + pCmprsor->d_n++; return code; } From 5ffaaebad6aa82f2729a3875ba374c809de46b4b Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 16 Sep 2022 17:46:50 +0800 Subject: [PATCH 05/13] more code --- source/dnode/vnode/src/tsdb/tsdbCompress.c | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCompress.c b/source/dnode/vnode/src/tsdb/tsdbCompress.c index 55d94e3b37..616da35f1f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCompress.c +++ b/source/dnode/vnode/src/tsdb/tsdbCompress.c @@ -34,7 +34,8 @@ typedef struct { }; // Integer ---- struct { - /* data */ + int8_t i_copy; + int32_t i_n; }; // Float ---- struct { @@ -52,6 +53,10 @@ typedef struct { struct { int32_t bool_n; }; + // Binary ---- + struct { + int32_t b_n; + }; }; } SCompressor; @@ -337,9 +342,13 @@ static int32_t tCompBinary(SCompressor *pCmprsor, const uint8_t *pData, int32_t int32_t code = 0; if (nData) { + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + nData); + if (code) return code; + memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], pData, nData); pCmprsor->nBuf[0] += nData; } + pCmprsor->b_n++; return code; } @@ -419,6 +428,9 @@ int32_t tCompressorReset(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { pCmprsor->bool_n = 0; pCmprsor->aBuf[0][0] = 0; break; + case TSDB_DATA_TYPE_BINARY: + pCmprsor->b_n = 0; + break; default: break; } From 0ff451dcf3dff8cffd1758ae1d6533680cfa9509 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 16 Sep 2022 18:50:49 +0800 Subject: [PATCH 06/13] more code --- source/dnode/vnode/src/tsdb/tsdbCompress.c | 85 ++++++++++++++++------ 1 file changed, 61 insertions(+), 24 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCompress.c b/source/dnode/vnode/src/tsdb/tsdbCompress.c index 616da35f1f..31e24ca751 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCompress.c +++ b/source/dnode/vnode/src/tsdb/tsdbCompress.c @@ -36,6 +36,9 @@ typedef struct { struct { int8_t i_copy; int32_t i_n; + int64_t i_prev; + int32_t i_selector; + int32_t i_nele; }; // Float ---- struct { @@ -188,38 +191,72 @@ _copy_exit: // Integer ===================================================== #define SIMPLE8B_MAX ((uint64_t)1152921504606846974LL) +static const char bit_per_integer[] = {0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 20, 30, 60}; +static const int32_t selector_to_elems[] = {240, 120, 60, 30, 20, 15, 12, 10, 8, 7, 6, 5, 4, 3, 2, 1}; +static const char bit_to_selector[] = {0, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 11, 11, 12, 12, 12, + 13, 13, 13, 13, 13, 14, 14, 14, 14, 14, 14, 14, 14, 14, 14, 15, + 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, + 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15}; static int32_t tCompI64(SCompressor *pCmprsor, int64_t val) { int32_t code = 0; -#if 0 - // raw copy - if (pCmprsor->rawCopy) { - memcpy(pCmprsor->pBuf + pCmprsor->nBuf, &val, sizeof(val)); - pCmprsor->nBuf += sizeof(val); - pCmprsor->nVal++; - goto _exit; + if (pCmprsor->i_copy == 1) goto _copy_cmpr; + + if (!I64_SAFE_ADD(val, pCmprsor->i_prev)) { + // TODO + goto _copy_cmpr; } - if (!I64_SAFE_ADD(val, pCmprsor->prevVal)) { - pCmprsor->rawCopy = 1; - // TODO: decompress and copy - pCmprsor->nVal++; - goto _exit; + int64_t diff = val - pCmprsor->i_prev; + uint64_t vZigzag = ZIGZAGE(int64_t, diff); + + if (vZigzag >= SIMPLE8B_MAX) { + // TODO + goto _copy_cmpr; } - int64_t diff = val - pCmprsor->prevVal; - uint8_t zigzag = ZIGZAGE(int64_t, diff); - - if (zigzag >= SIMPLE8B_MAX) { - pCmprsor->rawCopy = 1; - // TODO: decompress and copy - pCmprsor->nVal++; - goto _exit; + int64_t nBit; + if (vZigzag) { + nBit = 64 - BUILDIN_CLZL(vZigzag); + } else { + nBit = 0; } -_exit: -#endif + if (pCmprsor->i_nele + 1 <= selector_to_elems[pCmprsor->i_selector] && + pCmprsor->i_nele + 1 <= selector_to_elems[bit_to_selector[nBit]]) { + if (pCmprsor->i_selector < bit_to_selector[nBit]) { + pCmprsor->i_selector = bit_to_selector[nBit]; + } + pCmprsor->i_nele++; + } else { + while (pCmprsor->i_nele < selector_to_elems[pCmprsor->i_selector]) { + pCmprsor->i_selector++; + } + pCmprsor->i_nele = selector_to_elems[pCmprsor->i_selector]; + + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + sizeof(uint64_t)); + if (code) return code; + + uint64_t *bp = pCmprsor->aBuf[0] + pCmprsor->nBuf[0]; + pCmprsor->nBuf[0] += sizeof(uint64_t); + bp[0] = pCmprsor->i_selector; + for (int32_t iVal = 0; iVal < pCmprsor->i_nele; iVal++) { + /* code */ + } + + // reset and continue + } + + return code; + +_copy_cmpr: + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + tDataTypes[pCmprsor->type].bytes); + if (code) return code; + + memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], NULL /* todo */, tDataTypes[pCmprsor->type].bytes); + pCmprsor->nBuf[0] += tDataTypes[pCmprsor->type].bytes; + return code; } @@ -294,8 +331,8 @@ static int32_t tCompDouble(SCompressor *pCmprsor, double d) { int32_t clz, ctz; if (diff) { - clz = BUILDIN_CLZ(diff); - ctz = BUILDIN_CTZ(diff); + clz = BUILDIN_CLZL(diff); + ctz = BUILDIN_CTZL(diff); } else { clz = sizeof(uint64_t); ctz = sizeof(uint64_t); From 7a21781a5581eac7dab679402e9614035cc51e39 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 19 Sep 2022 10:08:08 +0800 Subject: [PATCH 07/13] refact code --- source/dnode/vnode/CMakeLists.txt | 1 - source/dnode/vnode/src/tsdb/tsdbCompress.c | 510 --------------------- source/util/src/tcompression.c | 497 ++++++++++++++++++++ 3 files changed, 497 insertions(+), 511 deletions(-) delete mode 100644 source/dnode/vnode/src/tsdb/tsdbCompress.c diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 7a99d26683..1f7a059ffc 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -51,7 +51,6 @@ target_sources( "src/tsdb/tsdbCacheRead.c" "src/tsdb/tsdbRetention.c" "src/tsdb/tsdbDiskData.c" - "src/tsdb/tsdbCompress.c" "src/tsdb/tsdbCompact.c" "src/tsdb/tsdbMergeTree.c" diff --git a/source/dnode/vnode/src/tsdb/tsdbCompress.c b/source/dnode/vnode/src/tsdb/tsdbCompress.c deleted file mode 100644 index 31e24ca751..0000000000 --- a/source/dnode/vnode/src/tsdb/tsdbCompress.c +++ /dev/null @@ -1,510 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "lz4.h" -#include "tsdb.h" - -#define I64_SAFE_ADD(a, b) (((a) >= 0 && (b) <= INT64_MAX - (b)) || ((a) < 0 && (b) >= INT64_MIN - (a))) - -typedef struct { - int8_t type; - int8_t cmprAlg; - uint8_t *aBuf[2]; - int64_t nBuf[2]; - union { - // Timestamp ---- - struct { - int8_t ts_copy; - int32_t ts_n; - int64_t ts_prev_val; - int64_t ts_prev_delta; - uint8_t *ts_flag_p; - }; - // Integer ---- - struct { - int8_t i_copy; - int32_t i_n; - int64_t i_prev; - int32_t i_selector; - int32_t i_nele; - }; - // Float ---- - struct { - int32_t f_n; - uint32_t f_prev; - uint8_t *f_flag_p; - }; - // Double ---- - struct { - int32_t d_n; - uint64_t d_prev; - uint8_t *d_flag_p; - }; - // Bool ---- - struct { - int32_t bool_n; - }; - // Binary ---- - struct { - int32_t b_n; - }; - }; -} SCompressor; - -// Timestamp ===================================================== -static int32_t tCompSetCopyMode(SCompressor *pCmprsor) { - int32_t code = 0; - - if (pCmprsor->ts_n) { - code = tRealloc(&pCmprsor->aBuf[1], sizeof(int64_t) * (pCmprsor->ts_n + 1)); - if (code) return code; - pCmprsor->nBuf[1] = 1; - - int64_t n = 1; - int64_t valPrev; - int64_t delPrev; - uint64_t vZigzag; - while (n < pCmprsor->nBuf[0]) { - uint8_t n1 = pCmprsor->aBuf[0][0] & 0xf; - uint8_t n2 = pCmprsor->aBuf[0][0] >> 4; - - n++; - - vZigzag = 0; - for (uint8_t i = 0; i < n1; i++) { - vZigzag |= (((uint64_t)pCmprsor->aBuf[0][n]) << (sizeof(int64_t) * i)); - n++; - } - int64_t delta_of_delta = ZIGZAGD(int64_t, vZigzag); - if (n == 2) { - delPrev = 0; - valPrev = delta_of_delta; - } else { - delPrev = delta_of_delta + delPrev; - valPrev = delPrev + valPrev; - } - - memcpy(pCmprsor->aBuf[1] + pCmprsor->nBuf[1], &valPrev, sizeof(int64_t)); - pCmprsor->nBuf[1] += sizeof(int64_t); - - if (n >= pCmprsor->nBuf[0]) break; - - vZigzag = 0; - for (uint8_t i = 0; i < n2; i++) { - vZigzag |= (((uint64_t)pCmprsor->aBuf[0][n]) << (sizeof(int64_t) * i)); - n++; - } - delta_of_delta = ZIGZAGD(int64_t, vZigzag); - delPrev = delta_of_delta + delPrev; - valPrev = delPrev + valPrev; - } - - uint8_t *pBuf = pCmprsor->aBuf[0]; - pCmprsor->aBuf[0] = pCmprsor->aBuf[1]; - pCmprsor->aBuf[1] = pBuf; - pCmprsor->nBuf[0] = pCmprsor->nBuf[1]; - } else { - // TODO - } - - pCmprsor->aBuf[0][0] = 0; - pCmprsor->ts_copy = 1; - - return code; -} -static int32_t tCompTimestamp(SCompressor *pCmprsor, TSKEY ts) { - int32_t code = 0; - - if (pCmprsor->ts_n == 0) { - pCmprsor->ts_prev_val = ts; - pCmprsor->ts_prev_delta = -ts; - } - - if (pCmprsor->ts_copy) goto _copy_exit; - - if (!I64_SAFE_ADD(ts, -pCmprsor->ts_prev_val)) { - code = tCompSetCopyMode(pCmprsor); - if (code) return code; - goto _copy_exit; - } - - int64_t delta = ts - pCmprsor->ts_prev_val; - - if (!I64_SAFE_ADD(delta, -pCmprsor->ts_prev_delta)) { - code = tCompSetCopyMode(pCmprsor); - if (code) return code; - goto _copy_exit; - } - - int64_t delta_of_delta = delta - pCmprsor->ts_prev_delta; - uint64_t zigzag_value = ZIGZAGE(int64_t, delta_of_delta); - - pCmprsor->ts_prev_val = ts; - pCmprsor->ts_prev_delta = delta; - - if (pCmprsor->ts_n & 0x1 == 0) { - code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 17 /*sizeof(int64_t) * 2 + 1*/); - if (code) return code; - - pCmprsor->ts_flag_p = &pCmprsor->aBuf[0][pCmprsor->nBuf[0]]; - pCmprsor->nBuf[0]++; - pCmprsor->ts_flag_p[0] = 0; - - while (zigzag_value) { - pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (zigzag_value & 0xff); - pCmprsor->nBuf[0]++; - pCmprsor->ts_flag_p[0]++; - } - } else { - while (zigzag_value) { - pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (zigzag_value & 0xff); - pCmprsor->nBuf[0]++; - pCmprsor->ts_flag_p += (uint8_t)0x10; - } - } - - pCmprsor->ts_n++; - return code; - -_copy_exit: - code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + sizeof(int64_t)); - if (code) return code; - - memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], &ts, sizeof(ts)); - pCmprsor->nBuf[0] += sizeof(ts); - - pCmprsor->ts_n++; - return code; -} - -// Integer ===================================================== -#define SIMPLE8B_MAX ((uint64_t)1152921504606846974LL) -static const char bit_per_integer[] = {0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 20, 30, 60}; -static const int32_t selector_to_elems[] = {240, 120, 60, 30, 20, 15, 12, 10, 8, 7, 6, 5, 4, 3, 2, 1}; -static const char bit_to_selector[] = {0, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 11, 11, 12, 12, 12, - 13, 13, 13, 13, 13, 14, 14, 14, 14, 14, 14, 14, 14, 14, 14, 15, - 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, - 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15}; - -static int32_t tCompI64(SCompressor *pCmprsor, int64_t val) { - int32_t code = 0; - - if (pCmprsor->i_copy == 1) goto _copy_cmpr; - - if (!I64_SAFE_ADD(val, pCmprsor->i_prev)) { - // TODO - goto _copy_cmpr; - } - - int64_t diff = val - pCmprsor->i_prev; - uint64_t vZigzag = ZIGZAGE(int64_t, diff); - - if (vZigzag >= SIMPLE8B_MAX) { - // TODO - goto _copy_cmpr; - } - - int64_t nBit; - if (vZigzag) { - nBit = 64 - BUILDIN_CLZL(vZigzag); - } else { - nBit = 0; - } - - if (pCmprsor->i_nele + 1 <= selector_to_elems[pCmprsor->i_selector] && - pCmprsor->i_nele + 1 <= selector_to_elems[bit_to_selector[nBit]]) { - if (pCmprsor->i_selector < bit_to_selector[nBit]) { - pCmprsor->i_selector = bit_to_selector[nBit]; - } - pCmprsor->i_nele++; - } else { - while (pCmprsor->i_nele < selector_to_elems[pCmprsor->i_selector]) { - pCmprsor->i_selector++; - } - pCmprsor->i_nele = selector_to_elems[pCmprsor->i_selector]; - - code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + sizeof(uint64_t)); - if (code) return code; - - uint64_t *bp = pCmprsor->aBuf[0] + pCmprsor->nBuf[0]; - pCmprsor->nBuf[0] += sizeof(uint64_t); - bp[0] = pCmprsor->i_selector; - for (int32_t iVal = 0; iVal < pCmprsor->i_nele; iVal++) { - /* code */ - } - - // reset and continue - } - - return code; - -_copy_cmpr: - code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + tDataTypes[pCmprsor->type].bytes); - if (code) return code; - - memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], NULL /* todo */, tDataTypes[pCmprsor->type].bytes); - pCmprsor->nBuf[0] += tDataTypes[pCmprsor->type].bytes; - - return code; -} - -// Float ===================================================== -static int32_t tCompFloat(SCompressor *pCmprsor, float f) { - int32_t code = 0; - - union { - float f; - uint32_t u; - } val = {.f = f}; - - uint32_t diff = val.u ^ pCmprsor->f_prev; - pCmprsor->f_prev = val.u; - - int32_t clz, ctz; - if (diff) { - clz = BUILDIN_CLZ(diff); - ctz = BUILDIN_CTZ(diff); - } else { - clz = sizeof(uint32_t); - ctz = sizeof(uint32_t); - } - - if (pCmprsor->f_n & 0x1 == 0) { - code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 9 /* sizeof(float) * 2 + 1 */); - if (code) return code; - - pCmprsor->f_flag_p = &pCmprsor->aBuf[0][pCmprsor->nBuf[0]]; - pCmprsor->nBuf[0]++; - - if (clz < ctz) { - uint8_t nBytes = sizeof(uint32_t) - ctz / BITS_PER_BYTE; - pCmprsor->f_flag_p[0] = (0x08 | (nBytes - 1)); - diff >>= (32 - nBytes * BITS_PER_BYTE); - } else { - uint8_t nBytes = sizeof(uint32_t) - clz / BITS_PER_BYTE; - pCmprsor->f_flag_p[0] = nBytes - 1; - } - } else { - if (clz < ctz) { - uint8_t nBytes = sizeof(uint32_t) - ctz / BITS_PER_BYTE; - pCmprsor->f_flag_p[0] |= ((0x08 | (nBytes - 1)) << 4); - diff >>= (32 - nBytes * BITS_PER_BYTE); - } else { - uint8_t nBytes = sizeof(uint32_t) - clz / BITS_PER_BYTE; - pCmprsor->f_flag_p[0] |= ((nBytes - 1) << 4); - } - } - - while (diff) { - pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (diff & 0xff); - pCmprsor->nBuf[0]++; - diff >>= BITS_PER_BYTE; - } - - pCmprsor->f_n++; - return code; -} - -// Double ===================================================== -static int32_t tCompDouble(SCompressor *pCmprsor, double d) { - int32_t code = 0; - - union { - double d; - uint64_t u; - } val = {.d = d}; - - uint64_t diff = val.u ^ pCmprsor->d_prev; - pCmprsor->d_prev = val.u; - - int32_t clz, ctz; - if (diff) { - clz = BUILDIN_CLZL(diff); - ctz = BUILDIN_CTZL(diff); - } else { - clz = sizeof(uint64_t); - ctz = sizeof(uint64_t); - } - - if (pCmprsor->d_n & 0x1 == 0) { - code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 17 /* sizeof(double) * 2 + 1 */); - if (code) return code; - - pCmprsor->d_flag_p = &pCmprsor->aBuf[0][pCmprsor->nBuf[0]]; - pCmprsor->nBuf[0]++; - - if (clz < ctz) { - uint8_t nBytes = sizeof(uint64_t) - ctz / BITS_PER_BYTE; - pCmprsor->d_flag_p[0] = (0x08 | (nBytes - 1)); - diff >>= (64 - nBytes * BITS_PER_BYTE); - } else { - uint8_t nBytes = sizeof(uint64_t) - clz / BITS_PER_BYTE; - pCmprsor->d_flag_p[0] = nBytes - 1; - } - } else { - if (clz < ctz) { - uint8_t nBytes = sizeof(uint64_t) - ctz / BITS_PER_BYTE; - pCmprsor->d_flag_p[0] |= ((0x08 | (nBytes - 1)) << 4); - diff >>= (64 - nBytes * BITS_PER_BYTE); - } else { - uint8_t nBytes = sizeof(uint64_t) - clz / BITS_PER_BYTE; - pCmprsor->d_flag_p[0] |= ((nBytes - 1) << 4); - } - } - - while (diff) { - pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (diff & 0xff); - pCmprsor->nBuf[0]++; - diff >>= BITS_PER_BYTE; - } - - pCmprsor->d_n++; - return code; -} - -// Binary ===================================================== -static int32_t tCompBinary(SCompressor *pCmprsor, const uint8_t *pData, int32_t nData) { - int32_t code = 0; - - if (nData) { - code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + nData); - if (code) return code; - - memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], pData, nData); - pCmprsor->nBuf[0] += nData; - } - pCmprsor->b_n++; - - return code; -} - -// Bool ===================================================== -static const uint8_t BOOL_CMPR_TABLE[] = {0b01, 0b0100, 0b010000, 0b01000000}; - -static int32_t tCompBool(SCompressor *pCmprsor, bool vBool) { - int32_t code = 0; - - int32_t mod4 = pCmprsor->bool_n & 3; - if (vBool) { - pCmprsor->aBuf[0][pCmprsor->nBuf[0]] |= BOOL_CMPR_TABLE[mod4]; - } - pCmprsor->bool_n++; - if (mod4 == 3) { - pCmprsor->nBuf[0]++; - pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = 0; - - code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0]); - if (code) goto _exit; - } - -_exit: - return code; -} - -// SCompressor ===================================================== -int32_t tCompressorCreate(SCompressor **ppCmprsor) { - int32_t code = 0; - - *ppCmprsor = (SCompressor *)taosMemoryCalloc(1, sizeof(SCompressor)); - if ((*ppCmprsor) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - - code = tRealloc(&(*ppCmprsor)->aBuf[0], 1024); - if (code) { - taosMemoryFree(*ppCmprsor); - *ppCmprsor = NULL; - goto _exit; - } - -_exit: - return code; -} - -int32_t tCompressorDestroy(SCompressor *pCmprsor) { - int32_t code = 0; - - if (pCmprsor) { - int32_t nBuf = sizeof(pCmprsor->aBuf) / sizeof(pCmprsor->aBuf[0]); - for (int32_t iBuf = 0; iBuf < nBuf; iBuf++) { - tFree(pCmprsor->aBuf[iBuf]); - } - - taosMemoryFree(pCmprsor); - } - - return code; -} - -int32_t tCompressorReset(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { - int32_t code = 0; - - pCmprsor->type = type; - pCmprsor->cmprAlg = cmprAlg; - pCmprsor->nBuf[0] = 0; // (todo) may or may not +/- 1 - - switch (type) { - case TSDB_DATA_TYPE_TIMESTAMP: - pCmprsor->ts_copy = 0; - pCmprsor->ts_n = 0; - break; - case TSDB_DATA_TYPE_BOOL: - pCmprsor->bool_n = 0; - pCmprsor->aBuf[0][0] = 0; - break; - case TSDB_DATA_TYPE_BINARY: - pCmprsor->b_n = 0; - break; - default: - break; - } - - return code; -} - -int32_t tCompGen(SCompressor *pCmprsor, const uint8_t **ppData, int64_t *nData) { - int32_t code = 0; - - if (pCmprsor->cmprAlg == TWO_STAGE_COMP || IS_VAR_DATA_TYPE(pCmprsor->type)) { - code = tRealloc(&pCmprsor->aBuf[1], pCmprsor->nBuf[0] + 1); - if (code) goto _exit; - - int64_t ret = LZ4_compress_default(pCmprsor->aBuf[0], pCmprsor->aBuf[1] + 1, pCmprsor->nBuf[0], pCmprsor->nBuf[0]); - if (ret) { - pCmprsor->aBuf[1][0] = 0; - pCmprsor->nBuf[1] = ret + 1; - } else { - pCmprsor->aBuf[1][0] = 1; - memcpy(pCmprsor->aBuf[1] + 1, pCmprsor->aBuf[0], pCmprsor->nBuf[0]); - pCmprsor->nBuf[1] = pCmprsor->nBuf[0] + 1; - } - - *ppData = pCmprsor->aBuf[1]; - *nData = pCmprsor->nBuf[1]; - } else { - *ppData = pCmprsor->aBuf[0]; - *nData = pCmprsor->nBuf[0]; - } - -_exit: - return code; -} - -int32_t tCompress(SCompressor *pCmprsor, void *pData, int64_t nData) { - int32_t code = 0; - // TODO - return code; -} \ No newline at end of file diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index ba877915b1..a0f5c45df2 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -50,6 +50,7 @@ #define _DEFAULT_SOURCE #include "tcompression.h" #include "lz4.h" +#include "tRealloc.h" #include "tlog.h" #ifdef TD_TSZ @@ -994,3 +995,499 @@ int32_t tsDecompressDoubleLossyImp(const char *input, int32_t compressedSize, co return tdszDecompress(SZ_DOUBLE, input + 1, compressedSize - 1, nelements, output); } #endif + +/************************************************************************* + * STREAM COMPRESSION + *************************************************************************/ +#define I64_SAFE_ADD(a, b) (((a) >= 0 && (b) <= INT64_MAX - (b)) || ((a) < 0 && (b) >= INT64_MIN - (a))) + +typedef struct { + int8_t type; + int8_t cmprAlg; + uint8_t *aBuf[2]; + int64_t nBuf[2]; + union { + // Timestamp ---- + struct { + int8_t ts_copy; + int32_t ts_n; + int64_t ts_prev_val; + int64_t ts_prev_delta; + uint8_t *ts_flag_p; + }; + // Integer ---- + struct { + int8_t i_copy; + int32_t i_n; + int64_t i_prev; + int32_t i_selector; + int32_t i_nele; + }; + // Float ---- + struct { + int32_t f_n; + uint32_t f_prev; + uint8_t *f_flag_p; + }; + // Double ---- + struct { + int32_t d_n; + uint64_t d_prev; + uint8_t *d_flag_p; + }; + // Bool ---- + struct { + int32_t bool_n; + }; + // Binary ---- + struct { + int32_t b_n; + }; + }; +} SCompressor; + +// Timestamp ===================================================== +static int32_t tCompSetCopyMode(SCompressor *pCmprsor) { + int32_t code = 0; + + if (pCmprsor->ts_n) { + code = tRealloc(&pCmprsor->aBuf[1], sizeof(int64_t) * (pCmprsor->ts_n + 1)); + if (code) return code; + pCmprsor->nBuf[1] = 1; + + int64_t n = 1; + int64_t valPrev; + int64_t delPrev; + uint64_t vZigzag; + while (n < pCmprsor->nBuf[0]) { + uint8_t n1 = pCmprsor->aBuf[0][0] & 0xf; + uint8_t n2 = pCmprsor->aBuf[0][0] >> 4; + + n++; + + vZigzag = 0; + for (uint8_t i = 0; i < n1; i++) { + vZigzag |= (((uint64_t)pCmprsor->aBuf[0][n]) << (sizeof(int64_t) * i)); + n++; + } + int64_t delta_of_delta = ZIGZAG_DECODE(int64_t, vZigzag); + if (n == 2) { + delPrev = 0; + valPrev = delta_of_delta; + } else { + delPrev = delta_of_delta + delPrev; + valPrev = delPrev + valPrev; + } + + memcpy(pCmprsor->aBuf[1] + pCmprsor->nBuf[1], &valPrev, sizeof(int64_t)); + pCmprsor->nBuf[1] += sizeof(int64_t); + + if (n >= pCmprsor->nBuf[0]) break; + + vZigzag = 0; + for (uint8_t i = 0; i < n2; i++) { + vZigzag |= (((uint64_t)pCmprsor->aBuf[0][n]) << (sizeof(int64_t) * i)); + n++; + } + delta_of_delta = ZIGZAG_DECODE(int64_t, vZigzag); + delPrev = delta_of_delta + delPrev; + valPrev = delPrev + valPrev; + } + + uint8_t *pBuf = pCmprsor->aBuf[0]; + pCmprsor->aBuf[0] = pCmprsor->aBuf[1]; + pCmprsor->aBuf[1] = pBuf; + pCmprsor->nBuf[0] = pCmprsor->nBuf[1]; + } else { + // TODO + } + + pCmprsor->aBuf[0][0] = 0; + pCmprsor->ts_copy = 1; + + return code; +} +static int32_t tCompTimestamp(SCompressor *pCmprsor, TSKEY ts) { + int32_t code = 0; + + if (pCmprsor->ts_n == 0) { + pCmprsor->ts_prev_val = ts; + pCmprsor->ts_prev_delta = -ts; + } + + if (pCmprsor->ts_copy) goto _copy_exit; + + if (!I64_SAFE_ADD(ts, -pCmprsor->ts_prev_val)) { + code = tCompSetCopyMode(pCmprsor); + if (code) return code; + goto _copy_exit; + } + + int64_t delta = ts - pCmprsor->ts_prev_val; + + if (!I64_SAFE_ADD(delta, -pCmprsor->ts_prev_delta)) { + code = tCompSetCopyMode(pCmprsor); + if (code) return code; + goto _copy_exit; + } + + int64_t delta_of_delta = delta - pCmprsor->ts_prev_delta; + uint64_t zigzag_value = ZIGZAG_ENCODE(int64_t, delta_of_delta); + + pCmprsor->ts_prev_val = ts; + pCmprsor->ts_prev_delta = delta; + + if (pCmprsor->ts_n & 0x1 == 0) { + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 17 /*sizeof(int64_t) * 2 + 1*/); + if (code) return code; + + pCmprsor->ts_flag_p = &pCmprsor->aBuf[0][pCmprsor->nBuf[0]]; + pCmprsor->nBuf[0]++; + pCmprsor->ts_flag_p[0] = 0; + + while (zigzag_value) { + pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (zigzag_value & 0xff); + pCmprsor->nBuf[0]++; + pCmprsor->ts_flag_p[0]++; + } + } else { + while (zigzag_value) { + pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (zigzag_value & 0xff); + pCmprsor->nBuf[0]++; + pCmprsor->ts_flag_p += (uint8_t)0x10; + } + } + + pCmprsor->ts_n++; + return code; + +_copy_exit: + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + sizeof(int64_t)); + if (code) return code; + + memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], &ts, sizeof(ts)); + pCmprsor->nBuf[0] += sizeof(ts); + + pCmprsor->ts_n++; + return code; +} + +// Integer ===================================================== +#define SIMPLE8B_MAX ((uint64_t)1152921504606846974LL) +static const char bit_per_integer[] = {0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 20, 30, 60}; +static const int32_t selector_to_elems[] = {240, 120, 60, 30, 20, 15, 12, 10, 8, 7, 6, 5, 4, 3, 2, 1}; +static const char bit_to_selector[] = {0, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 11, 11, 12, 12, 12, + 13, 13, 13, 13, 13, 14, 14, 14, 14, 14, 14, 14, 14, 14, 14, 15, + 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, + 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15}; + +static int32_t tCompI64(SCompressor *pCmprsor, int64_t val) { + int32_t code = 0; + + if (pCmprsor->i_copy == 1) goto _copy_cmpr; + + if (!I64_SAFE_ADD(val, pCmprsor->i_prev)) { + // TODO + goto _copy_cmpr; + } + + int64_t diff = val - pCmprsor->i_prev; + uint64_t vZigzag = ZIGZAG_ENCODE(int64_t, diff); + + if (vZigzag >= SIMPLE8B_MAX) { + // TODO + goto _copy_cmpr; + } + + int64_t nBit; + if (vZigzag) { + nBit = 64 - BUILDIN_CLZL(vZigzag); + } else { + nBit = 0; + } + + if (pCmprsor->i_nele + 1 <= selector_to_elems[pCmprsor->i_selector] && + pCmprsor->i_nele + 1 <= selector_to_elems[bit_to_selector[nBit]]) { + if (pCmprsor->i_selector < bit_to_selector[nBit]) { + pCmprsor->i_selector = bit_to_selector[nBit]; + } + pCmprsor->i_nele++; + } else { + while (pCmprsor->i_nele < selector_to_elems[pCmprsor->i_selector]) { + pCmprsor->i_selector++; + } + pCmprsor->i_nele = selector_to_elems[pCmprsor->i_selector]; + + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + sizeof(uint64_t)); + if (code) return code; + + uint64_t *bp = (uint64_t *)(pCmprsor->aBuf[0] + pCmprsor->nBuf[0]); + pCmprsor->nBuf[0] += sizeof(uint64_t); + bp[0] = pCmprsor->i_selector; + for (int32_t iVal = 0; iVal < pCmprsor->i_nele; iVal++) { + /* code */ + } + + // reset and continue + } + + return code; + +_copy_cmpr: + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 0 /*tDataTypes[pCmprsor->type].bytes (todo)*/); + if (code) return code; + + // memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], NULL /* todo */, 0 /*tDataTypes[pCmprsor->type].bytes (todo)*/); + // pCmprsor->nBuf[0] += tDataTypes[pCmprsor->type].bytes; + + return code; +} + +// Float ===================================================== +static int32_t tCompFloat(SCompressor *pCmprsor, float f) { + int32_t code = 0; + + union { + float f; + uint32_t u; + } val = {.f = f}; + + uint32_t diff = val.u ^ pCmprsor->f_prev; + pCmprsor->f_prev = val.u; + + int32_t clz, ctz; + if (diff) { + clz = BUILDIN_CLZ(diff); + ctz = BUILDIN_CTZ(diff); + } else { + clz = sizeof(uint32_t); + ctz = sizeof(uint32_t); + } + + if (pCmprsor->f_n & 0x1 == 0) { + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 9 /* sizeof(float) * 2 + 1 */); + if (code) return code; + + pCmprsor->f_flag_p = &pCmprsor->aBuf[0][pCmprsor->nBuf[0]]; + pCmprsor->nBuf[0]++; + + if (clz < ctz) { + uint8_t nBytes = sizeof(uint32_t) - ctz / BITS_PER_BYTE; + pCmprsor->f_flag_p[0] = (0x08 | (nBytes - 1)); + diff >>= (32 - nBytes * BITS_PER_BYTE); + } else { + uint8_t nBytes = sizeof(uint32_t) - clz / BITS_PER_BYTE; + pCmprsor->f_flag_p[0] = nBytes - 1; + } + } else { + if (clz < ctz) { + uint8_t nBytes = sizeof(uint32_t) - ctz / BITS_PER_BYTE; + pCmprsor->f_flag_p[0] |= ((0x08 | (nBytes - 1)) << 4); + diff >>= (32 - nBytes * BITS_PER_BYTE); + } else { + uint8_t nBytes = sizeof(uint32_t) - clz / BITS_PER_BYTE; + pCmprsor->f_flag_p[0] |= ((nBytes - 1) << 4); + } + } + + while (diff) { + pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (diff & 0xff); + pCmprsor->nBuf[0]++; + diff >>= BITS_PER_BYTE; + } + + pCmprsor->f_n++; + return code; +} + +// Double ===================================================== +static int32_t tCompDouble(SCompressor *pCmprsor, double d) { + int32_t code = 0; + + union { + double d; + uint64_t u; + } val = {.d = d}; + + uint64_t diff = val.u ^ pCmprsor->d_prev; + pCmprsor->d_prev = val.u; + + int32_t clz, ctz; + if (diff) { + clz = BUILDIN_CLZL(diff); + ctz = BUILDIN_CTZL(diff); + } else { + clz = sizeof(uint64_t); + ctz = sizeof(uint64_t); + } + + if (pCmprsor->d_n & 0x1 == 0) { + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 17 /* sizeof(double) * 2 + 1 */); + if (code) return code; + + pCmprsor->d_flag_p = &pCmprsor->aBuf[0][pCmprsor->nBuf[0]]; + pCmprsor->nBuf[0]++; + + if (clz < ctz) { + uint8_t nBytes = sizeof(uint64_t) - ctz / BITS_PER_BYTE; + pCmprsor->d_flag_p[0] = (0x08 | (nBytes - 1)); + diff >>= (64 - nBytes * BITS_PER_BYTE); + } else { + uint8_t nBytes = sizeof(uint64_t) - clz / BITS_PER_BYTE; + pCmprsor->d_flag_p[0] = nBytes - 1; + } + } else { + if (clz < ctz) { + uint8_t nBytes = sizeof(uint64_t) - ctz / BITS_PER_BYTE; + pCmprsor->d_flag_p[0] |= ((0x08 | (nBytes - 1)) << 4); + diff >>= (64 - nBytes * BITS_PER_BYTE); + } else { + uint8_t nBytes = sizeof(uint64_t) - clz / BITS_PER_BYTE; + pCmprsor->d_flag_p[0] |= ((nBytes - 1) << 4); + } + } + + while (diff) { + pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (diff & 0xff); + pCmprsor->nBuf[0]++; + diff >>= BITS_PER_BYTE; + } + + pCmprsor->d_n++; + return code; +} + +// Binary ===================================================== +static int32_t tCompBinary(SCompressor *pCmprsor, const uint8_t *pData, int32_t nData) { + int32_t code = 0; + + if (nData) { + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + nData); + if (code) return code; + + memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], pData, nData); + pCmprsor->nBuf[0] += nData; + } + pCmprsor->b_n++; + + return code; +} + +// Bool ===================================================== +static const uint8_t BOOL_CMPR_TABLE[] = {0b01, 0b0100, 0b010000, 0b01000000}; + +static int32_t tCompBool(SCompressor *pCmprsor, bool vBool) { + int32_t code = 0; + + int32_t mod4 = pCmprsor->bool_n & 3; + if (vBool) { + pCmprsor->aBuf[0][pCmprsor->nBuf[0]] |= BOOL_CMPR_TABLE[mod4]; + } + pCmprsor->bool_n++; + if (mod4 == 3) { + pCmprsor->nBuf[0]++; + pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = 0; + + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0]); + if (code) goto _exit; + } + +_exit: + return code; +} + +// SCompressor ===================================================== +int32_t tCompressorCreate(SCompressor **ppCmprsor) { + int32_t code = 0; + + *ppCmprsor = (SCompressor *)taosMemoryCalloc(1, sizeof(SCompressor)); + if ((*ppCmprsor) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + code = tRealloc(&(*ppCmprsor)->aBuf[0], 1024); + if (code) { + taosMemoryFree(*ppCmprsor); + *ppCmprsor = NULL; + goto _exit; + } + +_exit: + return code; +} + +int32_t tCompressorDestroy(SCompressor *pCmprsor) { + int32_t code = 0; + + if (pCmprsor) { + int32_t nBuf = sizeof(pCmprsor->aBuf) / sizeof(pCmprsor->aBuf[0]); + for (int32_t iBuf = 0; iBuf < nBuf; iBuf++) { + tFree(pCmprsor->aBuf[iBuf]); + } + + taosMemoryFree(pCmprsor); + } + + return code; +} + +int32_t tCompressorReset(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { + int32_t code = 0; + + pCmprsor->type = type; + pCmprsor->cmprAlg = cmprAlg; + pCmprsor->nBuf[0] = 0; // (todo) may or may not +/- 1 + + switch (type) { + case TSDB_DATA_TYPE_TIMESTAMP: + pCmprsor->ts_copy = 0; + pCmprsor->ts_n = 0; + break; + case TSDB_DATA_TYPE_BOOL: + pCmprsor->bool_n = 0; + pCmprsor->aBuf[0][0] = 0; + break; + case TSDB_DATA_TYPE_BINARY: + pCmprsor->b_n = 0; + break; + default: + break; + } + + return code; +} + +int32_t tCompGen(SCompressor *pCmprsor, const uint8_t **ppData, int64_t *nData) { + int32_t code = 0; + + if (pCmprsor->cmprAlg == TWO_STAGE_COMP /*|| IS_VAR_DATA_TYPE(pCmprsor->type)*/) { + code = tRealloc(&pCmprsor->aBuf[1], pCmprsor->nBuf[0] + 1); + if (code) goto _exit; + + int64_t ret = LZ4_compress_default(pCmprsor->aBuf[0], pCmprsor->aBuf[1] + 1, pCmprsor->nBuf[0], pCmprsor->nBuf[0]); + if (ret) { + pCmprsor->aBuf[1][0] = 0; + pCmprsor->nBuf[1] = ret + 1; + } else { + pCmprsor->aBuf[1][0] = 1; + memcpy(pCmprsor->aBuf[1] + 1, pCmprsor->aBuf[0], pCmprsor->nBuf[0]); + pCmprsor->nBuf[1] = pCmprsor->nBuf[0] + 1; + } + + *ppData = pCmprsor->aBuf[1]; + *nData = pCmprsor->nBuf[1]; + } else { + *ppData = pCmprsor->aBuf[0]; + *nData = pCmprsor->nBuf[0]; + } + +_exit: + return code; +} + +int32_t tCompress(SCompressor *pCmprsor, void *pData, int64_t nData) { + int32_t code = 0; + // TODO + return code; +} \ No newline at end of file From f2cd319df22f845ab001b8f56f2e049d79ef5fff Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 19 Sep 2022 12:00:55 +0800 Subject: [PATCH 08/13] more code --- source/util/src/tcompression.c | 190 +++++++++++++++++---------------- 1 file changed, 97 insertions(+), 93 deletions(-) diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index a0f5c45df2..75c1997428 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -1004,12 +1004,12 @@ int32_t tsDecompressDoubleLossyImp(const char *input, int32_t compressedSize, co typedef struct { int8_t type; int8_t cmprAlg; + int8_t autoAlloc; uint8_t *aBuf[2]; int64_t nBuf[2]; union { // Timestamp ---- struct { - int8_t ts_copy; int32_t ts_n; int64_t ts_prev_val; int64_t ts_prev_delta; @@ -1051,124 +1051,123 @@ static int32_t tCompSetCopyMode(SCompressor *pCmprsor) { int32_t code = 0; if (pCmprsor->ts_n) { - code = tRealloc(&pCmprsor->aBuf[1], sizeof(int64_t) * (pCmprsor->ts_n + 1)); - if (code) return code; - pCmprsor->nBuf[1] = 1; + if (pCmprsor->autoAlloc) { + code = tRealloc(&pCmprsor->aBuf[1], sizeof(int64_t) * pCmprsor->ts_n); + if (code) return code; + } + pCmprsor->nBuf[1] = 0; int64_t n = 1; - int64_t valPrev; - int64_t delPrev; + int64_t value; + int64_t delta; uint64_t vZigzag; while (n < pCmprsor->nBuf[0]) { - uint8_t n1 = pCmprsor->aBuf[0][0] & 0xf; - uint8_t n2 = pCmprsor->aBuf[0][0] >> 4; + uint8_t aN[2]; + aN[0] = pCmprsor->aBuf[0][n] & 0xf; + aN[1] = pCmprsor->aBuf[0][n] >> 4; n++; - vZigzag = 0; - for (uint8_t i = 0; i < n1; i++) { - vZigzag |= (((uint64_t)pCmprsor->aBuf[0][n]) << (sizeof(int64_t) * i)); - n++; - } - int64_t delta_of_delta = ZIGZAG_DECODE(int64_t, vZigzag); - if (n == 2) { - delPrev = 0; - valPrev = delta_of_delta; - } else { - delPrev = delta_of_delta + delPrev; - valPrev = delPrev + valPrev; - } + for (int32_t i = 0; i < 2; i++) { + vZigzag = 0; + for (uint8_t j = 0; j < aN[i]; j++) { + vZigzag |= (((uint64_t)pCmprsor->aBuf[0][n]) << (8 * j)); + n++; + } - memcpy(pCmprsor->aBuf[1] + pCmprsor->nBuf[1], &valPrev, sizeof(int64_t)); - pCmprsor->nBuf[1] += sizeof(int64_t); + int64_t delta_of_delta = ZIGZAG_DECODE(int64_t, vZigzag); + if (pCmprsor->nBuf[1] == 0) { + delta = 0; + value = delta_of_delta; + } else { + delta = delta_of_delta + delta; + value = delta + value; + } - if (n >= pCmprsor->nBuf[0]) break; + memcpy(pCmprsor->aBuf[1] + pCmprsor->nBuf[1], &value, sizeof(int64_t)); + pCmprsor->nBuf[1] += sizeof(int64_t); - vZigzag = 0; - for (uint8_t i = 0; i < n2; i++) { - vZigzag |= (((uint64_t)pCmprsor->aBuf[0][n]) << (sizeof(int64_t) * i)); - n++; + if (n >= pCmprsor->nBuf[0]) break; } - delta_of_delta = ZIGZAG_DECODE(int64_t, vZigzag); - delPrev = delta_of_delta + delPrev; - valPrev = delPrev + valPrev; } - uint8_t *pBuf = pCmprsor->aBuf[0]; - pCmprsor->aBuf[0] = pCmprsor->aBuf[1]; - pCmprsor->aBuf[1] = pBuf; - pCmprsor->nBuf[0] = pCmprsor->nBuf[1]; - } else { - // TODO - } + ASSERT(n == pCmprsor->nBuf[0]); - pCmprsor->aBuf[0][0] = 0; - pCmprsor->ts_copy = 1; + if (pCmprsor->autoAlloc) { + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[1] + 1); + if (code) return code; + } + memcpy(pCmprsor->aBuf[0] + 1, pCmprsor->aBuf[1], pCmprsor->nBuf[1]); + pCmprsor->nBuf[0] = 1 + pCmprsor->nBuf[1]; + } + pCmprsor->aBuf[0][0] = MODE_NOCOMPRESS; return code; } -static int32_t tCompTimestamp(SCompressor *pCmprsor, TSKEY ts) { +static int32_t tCompTimestamp(SCompressor *pCmprsor, int64_t ts) { int32_t code = 0; - if (pCmprsor->ts_n == 0) { + ASSERT(pCmprsor->type == TSDB_DATA_TYPE_TIMESTAMP); + + if (pCmprsor->aBuf[0][0] == MODE_COMPRESS) { + if (pCmprsor->ts_n == 0) { + pCmprsor->ts_prev_val = ts; + pCmprsor->ts_prev_delta = -ts; + } + + if (!I64_SAFE_ADD(ts, -pCmprsor->ts_prev_val)) { + code = tCompSetCopyMode(pCmprsor); + if (code) return code; + goto _copy_cmpr; + } + int64_t delta = ts - pCmprsor->ts_prev_val; + + if (!I64_SAFE_ADD(delta, -pCmprsor->ts_prev_delta)) { + code = tCompSetCopyMode(pCmprsor); + if (code) return code; + goto _copy_cmpr; + } + int64_t delta_of_delta = delta - pCmprsor->ts_prev_delta; + uint64_t vZigzag = ZIGZAG_ENCODE(int64_t, delta_of_delta); + pCmprsor->ts_prev_val = ts; - pCmprsor->ts_prev_delta = -ts; - } + pCmprsor->ts_prev_delta = delta; - if (pCmprsor->ts_copy) goto _copy_exit; + if ((pCmprsor->ts_n & 0x1) == 0) { + if (pCmprsor->autoAlloc) { + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 17); + if (code) return code; + } - if (!I64_SAFE_ADD(ts, -pCmprsor->ts_prev_val)) { - code = tCompSetCopyMode(pCmprsor); - if (code) return code; - goto _copy_exit; - } - - int64_t delta = ts - pCmprsor->ts_prev_val; - - if (!I64_SAFE_ADD(delta, -pCmprsor->ts_prev_delta)) { - code = tCompSetCopyMode(pCmprsor); - if (code) return code; - goto _copy_exit; - } - - int64_t delta_of_delta = delta - pCmprsor->ts_prev_delta; - uint64_t zigzag_value = ZIGZAG_ENCODE(int64_t, delta_of_delta); - - pCmprsor->ts_prev_val = ts; - pCmprsor->ts_prev_delta = delta; - - if (pCmprsor->ts_n & 0x1 == 0) { - code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 17 /*sizeof(int64_t) * 2 + 1*/); - if (code) return code; - - pCmprsor->ts_flag_p = &pCmprsor->aBuf[0][pCmprsor->nBuf[0]]; - pCmprsor->nBuf[0]++; - pCmprsor->ts_flag_p[0] = 0; - - while (zigzag_value) { - pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (zigzag_value & 0xff); + pCmprsor->ts_flag_p = pCmprsor->aBuf[0] + pCmprsor->nBuf[0]; pCmprsor->nBuf[0]++; - pCmprsor->ts_flag_p[0]++; + pCmprsor->ts_flag_p[0] = 0; + while (vZigzag) { + pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (vZigzag & 0xff); + pCmprsor->nBuf[0]++; + pCmprsor->ts_flag_p[0]++; + vZigzag >>= 8; + } + } else { + while (vZigzag) { + pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (vZigzag & 0xff); + pCmprsor->nBuf[0]++; + pCmprsor->ts_flag_p[0] += 0x10; + vZigzag >>= 8; + } } } else { - while (zigzag_value) { - pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (zigzag_value & 0xff); - pCmprsor->nBuf[0]++; - pCmprsor->ts_flag_p += (uint8_t)0x10; + _copy_cmpr: + if (pCmprsor->autoAlloc) { + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + sizeof(ts)); + if (code) return code; } + + memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], &ts, sizeof(ts)); + pCmprsor->nBuf[0] += sizeof(ts); } - pCmprsor->ts_n++; - return code; -_copy_exit: - code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + sizeof(int64_t)); - if (code) return code; - - memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], &ts, sizeof(ts)); - pCmprsor->nBuf[0] += sizeof(ts); - - pCmprsor->ts_n++; return code; } @@ -1432,17 +1431,22 @@ int32_t tCompressorDestroy(SCompressor *pCmprsor) { return code; } -int32_t tCompressorReset(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { +int32_t tCompressorReset(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg, int8_t autoAlloc) { int32_t code = 0; pCmprsor->type = type; pCmprsor->cmprAlg = cmprAlg; - pCmprsor->nBuf[0] = 0; // (todo) may or may not +/- 1 + pCmprsor->autoAlloc = autoAlloc; switch (type) { case TSDB_DATA_TYPE_TIMESTAMP: - pCmprsor->ts_copy = 0; pCmprsor->ts_n = 0; + pCmprsor->ts_prev_val = 0; + pCmprsor->ts_prev_delta = 0; + pCmprsor->ts_flag_p = NULL; + pCmprsor->aBuf[0][0] = MODE_COMPRESS; + pCmprsor->nBuf[0] = 1; + pCmprsor->nBuf[1] = 0; break; case TSDB_DATA_TYPE_BOOL: pCmprsor->bool_n = 0; From 61496324e7cfb86ab06d64af42d741ff6f994e91 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 19 Sep 2022 14:39:21 +0800 Subject: [PATCH 09/13] more code --- source/util/src/tcompression.c | 134 ++++++++++++++++++++------------- 1 file changed, 80 insertions(+), 54 deletions(-) diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index 75c1997428..4bec2f16ed 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -815,24 +815,24 @@ int32_t tsCompressFloatImp(const char *const input, const int32_t nelements, cha uint32_t predicted = prev_value; uint32_t diff = curr.bits ^ predicted; - int32_t leading_zeros = FLOAT_BYTES * BITS_PER_BYTE; - int32_t trailing_zeros = leading_zeros; + int32_t clz = FLOAT_BYTES * BITS_PER_BYTE; + int32_t ctz = clz; if (diff) { - trailing_zeros = BUILDIN_CTZ(diff); - leading_zeros = BUILDIN_CLZ(diff); + ctz = BUILDIN_CTZ(diff); + clz = BUILDIN_CLZ(diff); } uint8_t nbytes = 0; uint8_t flag; - if (trailing_zeros > leading_zeros) { - nbytes = (uint8_t)(FLOAT_BYTES - trailing_zeros / BITS_PER_BYTE); + if (ctz > clz) { + nbytes = (uint8_t)(FLOAT_BYTES - ctz / BITS_PER_BYTE); if (nbytes > 0) nbytes--; flag = ((uint8_t)1 << 3) | nbytes; } else { - nbytes = (uint8_t)(FLOAT_BYTES - leading_zeros / BITS_PER_BYTE); + nbytes = (uint8_t)(FLOAT_BYTES - clz / BITS_PER_BYTE); if (nbytes > 0) nbytes--; flag = nbytes; } @@ -1041,7 +1041,7 @@ typedef struct { }; // Binary ---- struct { - int32_t b_n; + int32_t binary_n; }; }; } SCompressor; @@ -1100,7 +1100,7 @@ static int32_t tCompSetCopyMode(SCompressor *pCmprsor) { memcpy(pCmprsor->aBuf[0] + 1, pCmprsor->aBuf[1], pCmprsor->nBuf[1]); pCmprsor->nBuf[0] = 1 + pCmprsor->nBuf[1]; } - pCmprsor->aBuf[0][0] = MODE_NOCOMPRESS; + pCmprsor->aBuf[0][0] = 0; return code; } @@ -1109,7 +1109,7 @@ static int32_t tCompTimestamp(SCompressor *pCmprsor, int64_t ts) { ASSERT(pCmprsor->type == TSDB_DATA_TYPE_TIMESTAMP); - if (pCmprsor->aBuf[0][0] == MODE_COMPRESS) { + if (pCmprsor->aBuf[0][0] == 1) { if (pCmprsor->ts_n == 0) { pCmprsor->ts_prev_val = ts; pCmprsor->ts_prev_delta = -ts; @@ -1259,43 +1259,47 @@ static int32_t tCompFloat(SCompressor *pCmprsor, float f) { clz = BUILDIN_CLZ(diff); ctz = BUILDIN_CTZ(diff); } else { - clz = sizeof(uint32_t); - ctz = sizeof(uint32_t); + clz = 32; + ctz = 32; } - if (pCmprsor->f_n & 0x1 == 0) { - code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 9 /* sizeof(float) * 2 + 1 */); - if (code) return code; + uint8_t nBytes; + if (clz < ctz) { + nBytes = sizeof(uint32_t) - ctz / BITS_PER_BYTE; + if (nBytes) diff >>= (32 - nBytes * BITS_PER_BYTE); + } else { + nBytes = sizeof(uint32_t) - clz / BITS_PER_BYTE; + } + if (nBytes == 0) nBytes++; + + if ((pCmprsor->f_n & 0x1) == 0) { + if (pCmprsor->autoAlloc) { + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 9); + if (code) return code; + } pCmprsor->f_flag_p = &pCmprsor->aBuf[0][pCmprsor->nBuf[0]]; pCmprsor->nBuf[0]++; if (clz < ctz) { - uint8_t nBytes = sizeof(uint32_t) - ctz / BITS_PER_BYTE; pCmprsor->f_flag_p[0] = (0x08 | (nBytes - 1)); - diff >>= (32 - nBytes * BITS_PER_BYTE); } else { - uint8_t nBytes = sizeof(uint32_t) - clz / BITS_PER_BYTE; pCmprsor->f_flag_p[0] = nBytes - 1; } } else { if (clz < ctz) { - uint8_t nBytes = sizeof(uint32_t) - ctz / BITS_PER_BYTE; pCmprsor->f_flag_p[0] |= ((0x08 | (nBytes - 1)) << 4); - diff >>= (32 - nBytes * BITS_PER_BYTE); } else { - uint8_t nBytes = sizeof(uint32_t) - clz / BITS_PER_BYTE; pCmprsor->f_flag_p[0] |= ((nBytes - 1) << 4); } } - - while (diff) { + for (; nBytes; nBytes--) { pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (diff & 0xff); pCmprsor->nBuf[0]++; diff >>= BITS_PER_BYTE; } - pCmprsor->f_n++; + return code; } @@ -1316,43 +1320,47 @@ static int32_t tCompDouble(SCompressor *pCmprsor, double d) { clz = BUILDIN_CLZL(diff); ctz = BUILDIN_CTZL(diff); } else { - clz = sizeof(uint64_t); - ctz = sizeof(uint64_t); + clz = 64; + ctz = 64; } - if (pCmprsor->d_n & 0x1 == 0) { - code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 17 /* sizeof(double) * 2 + 1 */); - if (code) return code; + uint8_t nBytes; + if (clz < ctz) { + nBytes = sizeof(uint64_t) - ctz / BITS_PER_BYTE; + if (nBytes) diff >>= (64 - nBytes * BITS_PER_BYTE); + } else { + nBytes = sizeof(uint64_t) - clz / BITS_PER_BYTE; + } + if (nBytes == 0) nBytes++; + + if ((pCmprsor->d_n & 0x1) == 0) { + if (pCmprsor->autoAlloc) { + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 17); + if (code) return code; + } pCmprsor->d_flag_p = &pCmprsor->aBuf[0][pCmprsor->nBuf[0]]; pCmprsor->nBuf[0]++; if (clz < ctz) { - uint8_t nBytes = sizeof(uint64_t) - ctz / BITS_PER_BYTE; pCmprsor->d_flag_p[0] = (0x08 | (nBytes - 1)); - diff >>= (64 - nBytes * BITS_PER_BYTE); } else { - uint8_t nBytes = sizeof(uint64_t) - clz / BITS_PER_BYTE; pCmprsor->d_flag_p[0] = nBytes - 1; } } else { if (clz < ctz) { - uint8_t nBytes = sizeof(uint64_t) - ctz / BITS_PER_BYTE; pCmprsor->d_flag_p[0] |= ((0x08 | (nBytes - 1)) << 4); - diff >>= (64 - nBytes * BITS_PER_BYTE); } else { - uint8_t nBytes = sizeof(uint64_t) - clz / BITS_PER_BYTE; pCmprsor->d_flag_p[0] |= ((nBytes - 1) << 4); } } - - while (diff) { + for (; nBytes; nBytes--) { pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (diff & 0xff); pCmprsor->nBuf[0]++; diff >>= BITS_PER_BYTE; } - pCmprsor->d_n++; + return code; } @@ -1361,13 +1369,15 @@ static int32_t tCompBinary(SCompressor *pCmprsor, const uint8_t *pData, int32_t int32_t code = 0; if (nData) { - code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + nData); - if (code) return code; + if (pCmprsor->autoAlloc) { + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + nData); + if (code) return code; + } memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], pData, nData); pCmprsor->nBuf[0] += nData; } - pCmprsor->b_n++; + pCmprsor->binary_n++; return code; } @@ -1379,19 +1389,21 @@ static int32_t tCompBool(SCompressor *pCmprsor, bool vBool) { int32_t code = 0; int32_t mod4 = pCmprsor->bool_n & 3; + if (mod4 == 0) { + pCmprsor->nBuf[0]++; + + if (pCmprsor->autoAlloc) { + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0]); + if (code) return code; + } + + pCmprsor->aBuf[0][pCmprsor->nBuf[0] - 1] = 0; + } if (vBool) { - pCmprsor->aBuf[0][pCmprsor->nBuf[0]] |= BOOL_CMPR_TABLE[mod4]; + pCmprsor->aBuf[0][pCmprsor->nBuf[0] - 1] |= BOOL_CMPR_TABLE[mod4]; } pCmprsor->bool_n++; - if (mod4 == 3) { - pCmprsor->nBuf[0]++; - pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = 0; - code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0]); - if (code) goto _exit; - } - -_exit: return code; } @@ -1444,16 +1456,30 @@ int32_t tCompressorReset(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg, int pCmprsor->ts_prev_val = 0; pCmprsor->ts_prev_delta = 0; pCmprsor->ts_flag_p = NULL; - pCmprsor->aBuf[0][0] = MODE_COMPRESS; + pCmprsor->aBuf[0][0] = 1; // For timestamp, 1 means compressed, 0 otherwise pCmprsor->nBuf[0] = 1; - pCmprsor->nBuf[1] = 0; break; case TSDB_DATA_TYPE_BOOL: pCmprsor->bool_n = 0; - pCmprsor->aBuf[0][0] = 0; + pCmprsor->nBuf[0] = 0; break; case TSDB_DATA_TYPE_BINARY: - pCmprsor->b_n = 0; + pCmprsor->binary_n = 0; + pCmprsor->nBuf[0] = 0; + break; + case TSDB_DATA_TYPE_FLOAT: + pCmprsor->f_n = 0; + pCmprsor->f_prev = 0; + pCmprsor->f_flag_p = NULL; + pCmprsor->aBuf[0][0] = 0; // 0 means compressed, 1 otherwise (for backward compatibility) + pCmprsor->nBuf[0] = 1; + break; + case TSDB_DATA_TYPE_DOUBLE: + pCmprsor->d_n = 0; + pCmprsor->d_prev = 0; + pCmprsor->d_flag_p = NULL; + pCmprsor->aBuf[0][0] = 0; // 0 means compressed, 1 otherwise (for backward compatibility) + pCmprsor->nBuf[0] = 1; break; default: break; From e33c255ad189fda15d0c9ae451ba432546894ea5 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 19 Sep 2022 15:57:58 +0800 Subject: [PATCH 10/13] more code --- source/util/src/tcompression.c | 243 +++++++++++++++++++++------------ 1 file changed, 156 insertions(+), 87 deletions(-) diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index 4bec2f16ed..748ab8c975 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -1000,59 +1000,76 @@ int32_t tsDecompressDoubleLossyImp(const char *input, int32_t compressedSize, co * STREAM COMPRESSION *************************************************************************/ #define I64_SAFE_ADD(a, b) (((a) >= 0 && (b) <= INT64_MAX - (b)) || ((a) < 0 && (b) >= INT64_MIN - (a))) +typedef struct SCompressor SCompressor; -typedef struct { +static struct { + int8_t type; + int32_t bytes; + int8_t isVarLen; + int32_t (*cmprFn)(SCompressor *, const void *, int32_t nData); +} DATA_TYPE_INFO[] = { + {TSDB_DATA_TYPE_NULL, 0, 0}, // TSDB_DATA_TYPE_NULL + {TSDB_DATA_TYPE_BOOL, 1, 0}, // TSDB_DATA_TYPE_BOOL + {TSDB_DATA_TYPE_TINYINT, 1, 0}, // TSDB_DATA_TYPE_TINYINT + {TSDB_DATA_TYPE_SMALLINT, 2, 0}, // TSDB_DATA_TYPE_SMALLINT + {TSDB_DATA_TYPE_INT, 4, 0}, // TSDB_DATA_TYPE_INT + {TSDB_DATA_TYPE_BIGINT, 8, 0}, // TSDB_DATA_TYPE_BIGINT + {TSDB_DATA_TYPE_FLOAT, 4, 0}, // TSDB_DATA_TYPE_FLOAT + {TSDB_DATA_TYPE_DOUBLE, 8, 0}, // TSDB_DATA_TYPE_DOUBLE + {TSDB_DATA_TYPE_VARCHAR, 1, 1}, // TSDB_DATA_TYPE_VARCHAR + {TSDB_DATA_TYPE_TIMESTAMP, 8, 0}, // pTSDB_DATA_TYPE_TIMESTAMP + {TSDB_DATA_TYPE_NCHAR, 1, 1}, // TSDB_DATA_TYPE_NCHAR + {TSDB_DATA_TYPE_UTINYINT, 1, 0}, // TSDB_DATA_TYPE_UTINYINT + {TSDB_DATA_TYPE_USMALLINT, 2, 0}, // TSDB_DATA_TYPE_USMALLINT + {TSDB_DATA_TYPE_UINT, 4, 0}, // TSDB_DATA_TYPE_UINT + {TSDB_DATA_TYPE_UBIGINT, 8, 0}, // TSDB_DATA_TYPE_UBIGINT + {TSDB_DATA_TYPE_JSON, 1, 1}, // TSDB_DATA_TYPE_JSON + {TSDB_DATA_TYPE_VARBINARY, 1, 1}, // TSDB_DATA_TYPE_VARBINARY + {TSDB_DATA_TYPE_DECIMAL, 1, 1}, // TSDB_DATA_TYPE_DECIMAL + {TSDB_DATA_TYPE_BLOB, 1, 1}, // TSDB_DATA_TYPE_BLOB + {TSDB_DATA_TYPE_MEDIUMBLOB, 1, 1}, // TSDB_DATA_TYPE_MEDIUMBLOB +}; + +struct SCompressor { int8_t type; int8_t cmprAlg; int8_t autoAlloc; + int32_t nVal; uint8_t *aBuf[2]; int64_t nBuf[2]; union { // Timestamp ---- struct { - int32_t ts_n; int64_t ts_prev_val; int64_t ts_prev_delta; uint8_t *ts_flag_p; }; // Integer ---- struct { - int8_t i_copy; - int32_t i_n; int64_t i_prev; int32_t i_selector; int32_t i_nele; }; // Float ---- struct { - int32_t f_n; uint32_t f_prev; uint8_t *f_flag_p; }; // Double ---- struct { - int32_t d_n; uint64_t d_prev; uint8_t *d_flag_p; }; - // Bool ---- - struct { - int32_t bool_n; - }; - // Binary ---- - struct { - int32_t binary_n; - }; }; -} SCompressor; +}; // Timestamp ===================================================== static int32_t tCompSetCopyMode(SCompressor *pCmprsor) { int32_t code = 0; - if (pCmprsor->ts_n) { + if (pCmprsor->nVal) { if (pCmprsor->autoAlloc) { - code = tRealloc(&pCmprsor->aBuf[1], sizeof(int64_t) * pCmprsor->ts_n); + code = tRealloc(&pCmprsor->aBuf[1], sizeof(int64_t) * pCmprsor->nVal); if (code) return code; } pCmprsor->nBuf[1] = 0; @@ -1110,7 +1127,7 @@ static int32_t tCompTimestamp(SCompressor *pCmprsor, int64_t ts) { ASSERT(pCmprsor->type == TSDB_DATA_TYPE_TIMESTAMP); if (pCmprsor->aBuf[0][0] == 1) { - if (pCmprsor->ts_n == 0) { + if (pCmprsor->nVal == 0) { pCmprsor->ts_prev_val = ts; pCmprsor->ts_prev_delta = -ts; } @@ -1133,7 +1150,7 @@ static int32_t tCompTimestamp(SCompressor *pCmprsor, int64_t ts) { pCmprsor->ts_prev_val = ts; pCmprsor->ts_prev_delta = delta; - if ((pCmprsor->ts_n & 0x1) == 0) { + if ((pCmprsor->nVal & 0x1) == 0) { if (pCmprsor->autoAlloc) { code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 17); if (code) return code; @@ -1166,7 +1183,7 @@ static int32_t tCompTimestamp(SCompressor *pCmprsor, int64_t ts) { memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], &ts, sizeof(ts)); pCmprsor->nBuf[0] += sizeof(ts); } - pCmprsor->ts_n++; + pCmprsor->nVal++; return code; } @@ -1180,76 +1197,114 @@ static const char bit_to_selector[] = {0, 2, 3, 4, 5, 6, 7, 8, 9, 10 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15}; -static int32_t tCompI64(SCompressor *pCmprsor, int64_t val) { +static int32_t tCompInt(SCompressor *pCmprsor, const void *pData, int32_t nData) { int32_t code = 0; - if (pCmprsor->i_copy == 1) goto _copy_cmpr; + ASSERT(nData == DATA_TYPE_INFO[pCmprsor->type].bytes); - if (!I64_SAFE_ADD(val, pCmprsor->i_prev)) { - // TODO - goto _copy_cmpr; - } + if (pCmprsor->aBuf[0][0] == 0) { + int64_t val; - int64_t diff = val - pCmprsor->i_prev; - uint64_t vZigzag = ZIGZAG_ENCODE(int64_t, diff); - - if (vZigzag >= SIMPLE8B_MAX) { - // TODO - goto _copy_cmpr; - } - - int64_t nBit; - if (vZigzag) { - nBit = 64 - BUILDIN_CLZL(vZigzag); - } else { - nBit = 0; - } - - if (pCmprsor->i_nele + 1 <= selector_to_elems[pCmprsor->i_selector] && - pCmprsor->i_nele + 1 <= selector_to_elems[bit_to_selector[nBit]]) { - if (pCmprsor->i_selector < bit_to_selector[nBit]) { - pCmprsor->i_selector = bit_to_selector[nBit]; + switch (pCmprsor->type) { + case TSDB_DATA_TYPE_TINYINT: + val = *(int8_t *)pData; + break; + case TSDB_DATA_TYPE_SMALLINT: + val = *(int16_t *)pData; + break; + case TSDB_DATA_TYPE_INT: + val = *(int32_t *)pData; + break; + case TSDB_DATA_TYPE_BIGINT: + val = *(int64_t *)pData; + break; + case TSDB_DATA_TYPE_UTINYINT: + val = *(uint8_t *)pData; + break; + case TSDB_DATA_TYPE_USMALLINT: + val = *(uint16_t *)pData; + break; + case TSDB_DATA_TYPE_UINT: + val = *(uint32_t *)pData; + break; + // case TSDB_DATA_TYPE_UBIGINT: + // val = *(int64_t *)pData; + // break; + default: + ASSERT(0); + break; } - pCmprsor->i_nele++; - } else { - while (pCmprsor->i_nele < selector_to_elems[pCmprsor->i_selector]) { - pCmprsor->i_selector++; - } - pCmprsor->i_nele = selector_to_elems[pCmprsor->i_selector]; - code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + sizeof(uint64_t)); + if (!I64_SAFE_ADD(val, pCmprsor->i_prev)) { + // TODO + goto _copy_cmpr; + } + + int64_t diff = val - pCmprsor->i_prev; + uint64_t vZigzag = ZIGZAG_ENCODE(int64_t, diff); + + if (vZigzag >= SIMPLE8B_MAX) { + // TODO + goto _copy_cmpr; + } + + int64_t nBit; + if (vZigzag) { + nBit = 64 - BUILDIN_CLZL(vZigzag); + } else { + nBit = 0; + } + + if (pCmprsor->i_nele + 1 <= selector_to_elems[pCmprsor->i_selector] && + pCmprsor->i_nele + 1 <= selector_to_elems[bit_to_selector[nBit]]) { + if (pCmprsor->i_selector < bit_to_selector[nBit]) { + pCmprsor->i_selector = bit_to_selector[nBit]; + } + pCmprsor->i_nele++; + pCmprsor->i_prev = val; + } else { + while (pCmprsor->i_nele < selector_to_elems[pCmprsor->i_selector]) { + pCmprsor->i_selector++; + } + pCmprsor->i_nele = selector_to_elems[pCmprsor->i_selector]; + + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + sizeof(uint64_t)); + if (code) return code; + + uint64_t *bp = (uint64_t *)(pCmprsor->aBuf[0] + pCmprsor->nBuf[0]); + pCmprsor->nBuf[0] += sizeof(uint64_t); + bp[0] = pCmprsor->i_selector; + for (int32_t iVal = 0; iVal < pCmprsor->i_nele; iVal++) { + /* code */ + } + + // reset and continue + pCmprsor->i_nele = 0; + pCmprsor->i_selector = 0; + } + } else { + _copy_cmpr: + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + nData); if (code) return code; - uint64_t *bp = (uint64_t *)(pCmprsor->aBuf[0] + pCmprsor->nBuf[0]); - pCmprsor->nBuf[0] += sizeof(uint64_t); - bp[0] = pCmprsor->i_selector; - for (int32_t iVal = 0; iVal < pCmprsor->i_nele; iVal++) { - /* code */ - } - - // reset and continue + memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], pData, nData); + pCmprsor->nBuf[0] += nData; } - - return code; - -_copy_cmpr: - code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 0 /*tDataTypes[pCmprsor->type].bytes (todo)*/); - if (code) return code; - - // memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], NULL /* todo */, 0 /*tDataTypes[pCmprsor->type].bytes (todo)*/); - // pCmprsor->nBuf[0] += tDataTypes[pCmprsor->type].bytes; + pCmprsor->nVal++; return code; } // Float ===================================================== -static int32_t tCompFloat(SCompressor *pCmprsor, float f) { +static int32_t tCompFloat(SCompressor *pCmprsor, const void *pData, int32_t nData) { int32_t code = 0; + ASSERT(nData == sizeof(float)); + union { float f; uint32_t u; - } val = {.f = f}; + } val = {.f = *(float *)pData}; uint32_t diff = val.u ^ pCmprsor->f_prev; pCmprsor->f_prev = val.u; @@ -1272,7 +1327,7 @@ static int32_t tCompFloat(SCompressor *pCmprsor, float f) { } if (nBytes == 0) nBytes++; - if ((pCmprsor->f_n & 0x1) == 0) { + if ((pCmprsor->nVal & 0x1) == 0) { if (pCmprsor->autoAlloc) { code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 9); if (code) return code; @@ -1298,19 +1353,21 @@ static int32_t tCompFloat(SCompressor *pCmprsor, float f) { pCmprsor->nBuf[0]++; diff >>= BITS_PER_BYTE; } - pCmprsor->f_n++; + pCmprsor->nVal++; return code; } // Double ===================================================== -static int32_t tCompDouble(SCompressor *pCmprsor, double d) { +static int32_t tCompDouble(SCompressor *pCmprsor, const void *pData, int32_t nData) { int32_t code = 0; + ASSERT(nData == sizeof(double)); + union { double d; uint64_t u; - } val = {.d = d}; + } val = {.d = *(double *)pData}; uint64_t diff = val.u ^ pCmprsor->d_prev; pCmprsor->d_prev = val.u; @@ -1333,7 +1390,7 @@ static int32_t tCompDouble(SCompressor *pCmprsor, double d) { } if (nBytes == 0) nBytes++; - if ((pCmprsor->d_n & 0x1) == 0) { + if ((pCmprsor->nVal & 0x1) == 0) { if (pCmprsor->autoAlloc) { code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 17); if (code) return code; @@ -1359,13 +1416,13 @@ static int32_t tCompDouble(SCompressor *pCmprsor, double d) { pCmprsor->nBuf[0]++; diff >>= BITS_PER_BYTE; } - pCmprsor->d_n++; + pCmprsor->nVal++; return code; } // Binary ===================================================== -static int32_t tCompBinary(SCompressor *pCmprsor, const uint8_t *pData, int32_t nData) { +static int32_t tCompBinary(SCompressor *pCmprsor, const void *pData, int32_t nData) { int32_t code = 0; if (nData) { @@ -1377,7 +1434,7 @@ static int32_t tCompBinary(SCompressor *pCmprsor, const uint8_t *pData, int32_t memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], pData, nData); pCmprsor->nBuf[0] += nData; } - pCmprsor->binary_n++; + pCmprsor->nVal++; return code; } @@ -1385,10 +1442,12 @@ static int32_t tCompBinary(SCompressor *pCmprsor, const uint8_t *pData, int32_t // Bool ===================================================== static const uint8_t BOOL_CMPR_TABLE[] = {0b01, 0b0100, 0b010000, 0b01000000}; -static int32_t tCompBool(SCompressor *pCmprsor, bool vBool) { +static int32_t tCompBool(SCompressor *pCmprsor, const void *pData, int32_t nData) { int32_t code = 0; - int32_t mod4 = pCmprsor->bool_n & 3; + bool vBool = *(int8_t *)pData; + + int32_t mod4 = pCmprsor->nVal & 3; if (mod4 == 0) { pCmprsor->nBuf[0]++; @@ -1402,7 +1461,7 @@ static int32_t tCompBool(SCompressor *pCmprsor, bool vBool) { if (vBool) { pCmprsor->aBuf[0][pCmprsor->nBuf[0] - 1] |= BOOL_CMPR_TABLE[mod4]; } - pCmprsor->bool_n++; + pCmprsor->nVal++; return code; } @@ -1449,10 +1508,10 @@ int32_t tCompressorReset(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg, int pCmprsor->type = type; pCmprsor->cmprAlg = cmprAlg; pCmprsor->autoAlloc = autoAlloc; + pCmprsor->nVal = 0; switch (type) { case TSDB_DATA_TYPE_TIMESTAMP: - pCmprsor->ts_n = 0; pCmprsor->ts_prev_val = 0; pCmprsor->ts_prev_delta = 0; pCmprsor->ts_flag_p = NULL; @@ -1460,27 +1519,37 @@ int32_t tCompressorReset(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg, int pCmprsor->nBuf[0] = 1; break; case TSDB_DATA_TYPE_BOOL: - pCmprsor->bool_n = 0; pCmprsor->nBuf[0] = 0; break; case TSDB_DATA_TYPE_BINARY: - pCmprsor->binary_n = 0; pCmprsor->nBuf[0] = 0; break; case TSDB_DATA_TYPE_FLOAT: - pCmprsor->f_n = 0; pCmprsor->f_prev = 0; pCmprsor->f_flag_p = NULL; pCmprsor->aBuf[0][0] = 0; // 0 means compressed, 1 otherwise (for backward compatibility) pCmprsor->nBuf[0] = 1; break; case TSDB_DATA_TYPE_DOUBLE: - pCmprsor->d_n = 0; pCmprsor->d_prev = 0; pCmprsor->d_flag_p = NULL; pCmprsor->aBuf[0][0] = 0; // 0 means compressed, 1 otherwise (for backward compatibility) pCmprsor->nBuf[0] = 1; break; + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_UTINYINT: + case TSDB_DATA_TYPE_USMALLINT: + case TSDB_DATA_TYPE_UINT: + case TSDB_DATA_TYPE_UBIGINT: + pCmprsor->i_prev = 0; + pCmprsor->i_selector = 0; + pCmprsor->i_nele = 0; + pCmprsor->aBuf[0][0] = 0; // 0 means compressed, 1 otherwise (for backward compatibility) + pCmprsor->nBuf[0] = 1; + break; default: break; } From 747ce081b3f2982c70686052d6945de900fd1f96 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 19 Sep 2022 17:59:05 +0800 Subject: [PATCH 11/13] more code --- source/util/src/tcompression.c | 94 ++++++++++++++++++---------------- 1 file changed, 51 insertions(+), 43 deletions(-) diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index 748ab8c975..375f9db734 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -1046,9 +1046,12 @@ struct SCompressor { }; // Integer ---- struct { - int64_t i_prev; - int32_t i_selector; - int32_t i_nele; + int64_t i_prev; + int32_t i_selector; + int32_t i_start; + int32_t i_end; + uint64_t i_aZigzag[241]; + int8_t i_aBitN[241]; }; // Float ---- struct { @@ -1190,9 +1193,9 @@ static int32_t tCompTimestamp(SCompressor *pCmprsor, int64_t ts) { // Integer ===================================================== #define SIMPLE8B_MAX ((uint64_t)1152921504606846974LL) -static const char bit_per_integer[] = {0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 20, 30, 60}; -static const int32_t selector_to_elems[] = {240, 120, 60, 30, 20, 15, 12, 10, 8, 7, 6, 5, 4, 3, 2, 1}; -static const char bit_to_selector[] = {0, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 11, 11, 12, 12, 12, +static const char BIT_PER_INTEGER[] = {0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 20, 30, 60}; +static const int32_t SELECTOR_TO_ELEMS[] = {240, 120, 60, 30, 20, 15, 12, 10, 8, 7, 6, 5, 4, 3, 2, 1}; +static const char BIT_TO_SELECTOR[] = {0, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 11, 11, 12, 12, 12, 13, 13, 13, 13, 13, 14, 14, 14, 14, 14, 14, 14, 14, 14, 14, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15}; @@ -1227,60 +1230,64 @@ static int32_t tCompInt(SCompressor *pCmprsor, const void *pData, int32_t nData) case TSDB_DATA_TYPE_UINT: val = *(uint32_t *)pData; break; - // case TSDB_DATA_TYPE_UBIGINT: - // val = *(int64_t *)pData; - // break; + case TSDB_DATA_TYPE_UBIGINT: + val = *(int64_t *)pData; + break; default: ASSERT(0); break; } - if (!I64_SAFE_ADD(val, pCmprsor->i_prev)) { + if (!I64_SAFE_ADD(val, -pCmprsor->i_prev)) { // TODO goto _copy_cmpr; } int64_t diff = val - pCmprsor->i_prev; uint64_t vZigzag = ZIGZAG_ENCODE(int64_t, diff); - if (vZigzag >= SIMPLE8B_MAX) { // TODO goto _copy_cmpr; } - int64_t nBit; - if (vZigzag) { - nBit = 64 - BUILDIN_CLZL(vZigzag); - } else { - nBit = 0; - } + int8_t nBit = (vZigzag) ? (64 - BUILDIN_CLZL(vZigzag)) : 0; + pCmprsor->i_prev = val; - if (pCmprsor->i_nele + 1 <= selector_to_elems[pCmprsor->i_selector] && - pCmprsor->i_nele + 1 <= selector_to_elems[bit_to_selector[nBit]]) { - if (pCmprsor->i_selector < bit_to_selector[nBit]) { - pCmprsor->i_selector = bit_to_selector[nBit]; + while (1) { + int32_t nEle = pCmprsor->i_end - pCmprsor->i_start; + + if (nEle + 1 <= SELECTOR_TO_ELEMS[pCmprsor->i_selector] && nEle + 1 <= SELECTOR_TO_ELEMS[BIT_TO_SELECTOR[nBit]]) { + if (pCmprsor->i_selector < BIT_TO_SELECTOR[nBit]) { + pCmprsor->i_selector = BIT_TO_SELECTOR[nBit]; + } + pCmprsor->i_end = (pCmprsor->i_end + 1) % 241; + pCmprsor->i_aZigzag[pCmprsor->i_end] = vZigzag; + pCmprsor->i_aBitN[pCmprsor->i_end] = nBit; + break; + } else { + while (nEle < SELECTOR_TO_ELEMS[pCmprsor->i_selector]) { + pCmprsor->i_selector++; + } + nEle = SELECTOR_TO_ELEMS[pCmprsor->i_selector]; + + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + sizeof(uint64_t)); + if (code) return code; + + // uint64_t *bp = (uint64_t *)(pCmprsor->aBuf[0] + pCmprsor->nBuf[0]); + // pCmprsor->nBuf[0] += sizeof(uint64_t); + // bp[0] = pCmprsor->i_selector; + // for (int32_t iVal = 0; iVal < pCmprsor->i_nele; iVal++) { + // /* code */ + // } + + // reset and continue + pCmprsor->i_selector = 0; + for (int32_t iVal = pCmprsor->i_start; iVal < pCmprsor->i_end; iVal++) { + if (pCmprsor->i_selector < BIT_TO_SELECTOR[pCmprsor->i_aBitN[iVal]]) { + pCmprsor->i_selector = BIT_TO_SELECTOR[pCmprsor->i_aBitN[iVal]]; + } + } } - pCmprsor->i_nele++; - pCmprsor->i_prev = val; - } else { - while (pCmprsor->i_nele < selector_to_elems[pCmprsor->i_selector]) { - pCmprsor->i_selector++; - } - pCmprsor->i_nele = selector_to_elems[pCmprsor->i_selector]; - - code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + sizeof(uint64_t)); - if (code) return code; - - uint64_t *bp = (uint64_t *)(pCmprsor->aBuf[0] + pCmprsor->nBuf[0]); - pCmprsor->nBuf[0] += sizeof(uint64_t); - bp[0] = pCmprsor->i_selector; - for (int32_t iVal = 0; iVal < pCmprsor->i_nele; iVal++) { - /* code */ - } - - // reset and continue - pCmprsor->i_nele = 0; - pCmprsor->i_selector = 0; } } else { _copy_cmpr: @@ -1546,7 +1553,8 @@ int32_t tCompressorReset(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg, int case TSDB_DATA_TYPE_UBIGINT: pCmprsor->i_prev = 0; pCmprsor->i_selector = 0; - pCmprsor->i_nele = 0; + pCmprsor->i_start = 0; + pCmprsor->i_end = 0; pCmprsor->aBuf[0][0] = 0; // 0 means compressed, 1 otherwise (for backward compatibility) pCmprsor->nBuf[0] = 1; break; From c90123c0d79171879d17f06c61d83c92d8a8e570 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 19 Sep 2022 18:19:41 +0800 Subject: [PATCH 12/13] more code --- source/util/src/tcompression.c | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index 375f9db734..febd54820b 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -1193,9 +1193,9 @@ static int32_t tCompTimestamp(SCompressor *pCmprsor, int64_t ts) { // Integer ===================================================== #define SIMPLE8B_MAX ((uint64_t)1152921504606846974LL) -static const char BIT_PER_INTEGER[] = {0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 20, 30, 60}; +static const uint8_t BIT_PER_INTEGER[] = {0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 20, 30, 60}; static const int32_t SELECTOR_TO_ELEMS[] = {240, 120, 60, 30, 20, 15, 12, 10, 8, 7, 6, 5, 4, 3, 2, 1}; -static const char BIT_TO_SELECTOR[] = {0, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 11, 11, 12, 12, 12, +static const uint8_t BIT_TO_SELECTOR[] = {0, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 11, 11, 12, 12, 12, 13, 13, 13, 13, 13, 14, 14, 14, 14, 14, 14, 14, 14, 14, 14, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15}; @@ -1254,7 +1254,7 @@ static int32_t tCompInt(SCompressor *pCmprsor, const void *pData, int32_t nData) pCmprsor->i_prev = val; while (1) { - int32_t nEle = pCmprsor->i_end - pCmprsor->i_start; + int32_t nEle = (pCmprsor->i_end + 241 - pCmprsor->i_start) % 241; if (nEle + 1 <= SELECTOR_TO_ELEMS[pCmprsor->i_selector] && nEle + 1 <= SELECTOR_TO_ELEMS[BIT_TO_SELECTOR[nBit]]) { if (pCmprsor->i_selector < BIT_TO_SELECTOR[nBit]) { @@ -1270,19 +1270,23 @@ static int32_t tCompInt(SCompressor *pCmprsor, const void *pData, int32_t nData) } nEle = SELECTOR_TO_ELEMS[pCmprsor->i_selector]; - code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + sizeof(uint64_t)); - if (code) return code; + if (pCmprsor->autoAlloc) { + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + sizeof(uint64_t)); + if (code) return code; + } - // uint64_t *bp = (uint64_t *)(pCmprsor->aBuf[0] + pCmprsor->nBuf[0]); - // pCmprsor->nBuf[0] += sizeof(uint64_t); - // bp[0] = pCmprsor->i_selector; - // for (int32_t iVal = 0; iVal < pCmprsor->i_nele; iVal++) { - // /* code */ - // } + uint64_t *bp = (uint64_t *)(pCmprsor->aBuf[0] + pCmprsor->nBuf[0]); + pCmprsor->nBuf[0] += sizeof(uint64_t); + bp[0] = pCmprsor->i_selector; + uint8_t bits = BIT_PER_INTEGER[pCmprsor->i_selector]; + for (int32_t iVal = 0; iVal < nEle; iVal++) { + bp[0] |= ((pCmprsor->i_aZigzag[pCmprsor->i_start] & ((((uint64_t)1) << bits) - 1)) << (bits * iVal + 4)); + pCmprsor->i_start = (pCmprsor->i_start + 1) % 241; + } // reset and continue pCmprsor->i_selector = 0; - for (int32_t iVal = pCmprsor->i_start; iVal < pCmprsor->i_end; iVal++) { + for (int32_t iVal = pCmprsor->i_start; iVal < pCmprsor->i_end; iVal = (iVal + 1) % 241) { if (pCmprsor->i_selector < BIT_TO_SELECTOR[pCmprsor->i_aBitN[iVal]]) { pCmprsor->i_selector = BIT_TO_SELECTOR[pCmprsor->i_aBitN[iVal]]; } From e43a4df8984d3329b90243e3693c9199d0e5e0dd Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 19 Sep 2022 18:47:39 +0800 Subject: [PATCH 13/13] more code --- source/util/src/tcompression.c | 65 ++++++++++++++++++++-------------- 1 file changed, 38 insertions(+), 27 deletions(-) diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index febd54820b..62b9d87628 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -1002,32 +1002,38 @@ int32_t tsDecompressDoubleLossyImp(const char *input, int32_t compressedSize, co #define I64_SAFE_ADD(a, b) (((a) >= 0 && (b) <= INT64_MAX - (b)) || ((a) < 0 && (b) >= INT64_MIN - (a))) typedef struct SCompressor SCompressor; +static int32_t tCompBool(SCompressor *pCmprsor, const void *pData, int32_t nData); +static int32_t tCompInt(SCompressor *pCmprsor, const void *pData, int32_t nData); +static int32_t tCompFloat(SCompressor *pCmprsor, const void *pData, int32_t nData); +static int32_t tCompDouble(SCompressor *pCmprsor, const void *pData, int32_t nData); +static int32_t tCompTimestamp(SCompressor *pCmprsor, const void *pData, int32_t nData); +static int32_t tCompBinary(SCompressor *pCmprsor, const void *pData, int32_t nData); static struct { int8_t type; int32_t bytes; int8_t isVarLen; int32_t (*cmprFn)(SCompressor *, const void *, int32_t nData); } DATA_TYPE_INFO[] = { - {TSDB_DATA_TYPE_NULL, 0, 0}, // TSDB_DATA_TYPE_NULL - {TSDB_DATA_TYPE_BOOL, 1, 0}, // TSDB_DATA_TYPE_BOOL - {TSDB_DATA_TYPE_TINYINT, 1, 0}, // TSDB_DATA_TYPE_TINYINT - {TSDB_DATA_TYPE_SMALLINT, 2, 0}, // TSDB_DATA_TYPE_SMALLINT - {TSDB_DATA_TYPE_INT, 4, 0}, // TSDB_DATA_TYPE_INT - {TSDB_DATA_TYPE_BIGINT, 8, 0}, // TSDB_DATA_TYPE_BIGINT - {TSDB_DATA_TYPE_FLOAT, 4, 0}, // TSDB_DATA_TYPE_FLOAT - {TSDB_DATA_TYPE_DOUBLE, 8, 0}, // TSDB_DATA_TYPE_DOUBLE - {TSDB_DATA_TYPE_VARCHAR, 1, 1}, // TSDB_DATA_TYPE_VARCHAR - {TSDB_DATA_TYPE_TIMESTAMP, 8, 0}, // pTSDB_DATA_TYPE_TIMESTAMP - {TSDB_DATA_TYPE_NCHAR, 1, 1}, // TSDB_DATA_TYPE_NCHAR - {TSDB_DATA_TYPE_UTINYINT, 1, 0}, // TSDB_DATA_TYPE_UTINYINT - {TSDB_DATA_TYPE_USMALLINT, 2, 0}, // TSDB_DATA_TYPE_USMALLINT - {TSDB_DATA_TYPE_UINT, 4, 0}, // TSDB_DATA_TYPE_UINT - {TSDB_DATA_TYPE_UBIGINT, 8, 0}, // TSDB_DATA_TYPE_UBIGINT - {TSDB_DATA_TYPE_JSON, 1, 1}, // TSDB_DATA_TYPE_JSON - {TSDB_DATA_TYPE_VARBINARY, 1, 1}, // TSDB_DATA_TYPE_VARBINARY - {TSDB_DATA_TYPE_DECIMAL, 1, 1}, // TSDB_DATA_TYPE_DECIMAL - {TSDB_DATA_TYPE_BLOB, 1, 1}, // TSDB_DATA_TYPE_BLOB - {TSDB_DATA_TYPE_MEDIUMBLOB, 1, 1}, // TSDB_DATA_TYPE_MEDIUMBLOB + {TSDB_DATA_TYPE_NULL, 0, 0, NULL}, // TSDB_DATA_TYPE_NULL + {TSDB_DATA_TYPE_BOOL, 1, 0, tCompBool}, // TSDB_DATA_TYPE_BOOL + {TSDB_DATA_TYPE_TINYINT, 1, 0, tCompInt}, // TSDB_DATA_TYPE_TINYINT + {TSDB_DATA_TYPE_SMALLINT, 2, 0, tCompInt}, // TSDB_DATA_TYPE_SMALLINT + {TSDB_DATA_TYPE_INT, 4, 0, tCompInt}, // TSDB_DATA_TYPE_INT + {TSDB_DATA_TYPE_BIGINT, 8, 0, tCompInt}, // TSDB_DATA_TYPE_BIGINT + {TSDB_DATA_TYPE_FLOAT, 4, 0, tCompFloat}, // TSDB_DATA_TYPE_FLOAT + {TSDB_DATA_TYPE_DOUBLE, 8, 0, tCompDouble}, // TSDB_DATA_TYPE_DOUBLE + {TSDB_DATA_TYPE_VARCHAR, 1, 1, tCompBinary}, // TSDB_DATA_TYPE_VARCHAR + {TSDB_DATA_TYPE_TIMESTAMP, 8, 0, tCompTimestamp}, // pTSDB_DATA_TYPE_TIMESTAMP + {TSDB_DATA_TYPE_NCHAR, 1, 1, tCompBinary}, // TSDB_DATA_TYPE_NCHAR + {TSDB_DATA_TYPE_UTINYINT, 1, 0, tCompInt}, // TSDB_DATA_TYPE_UTINYINT + {TSDB_DATA_TYPE_USMALLINT, 2, 0, tCompInt}, // TSDB_DATA_TYPE_USMALLINT + {TSDB_DATA_TYPE_UINT, 4, 0, tCompInt}, // TSDB_DATA_TYPE_UINT + {TSDB_DATA_TYPE_UBIGINT, 8, 0, tCompInt}, // TSDB_DATA_TYPE_UBIGINT + {TSDB_DATA_TYPE_JSON, 1, 1, tCompBinary}, // TSDB_DATA_TYPE_JSON + {TSDB_DATA_TYPE_VARBINARY, 1, 1, tCompBinary}, // TSDB_DATA_TYPE_VARBINARY + {TSDB_DATA_TYPE_DECIMAL, 1, 1, tCompBinary}, // TSDB_DATA_TYPE_DECIMAL + {TSDB_DATA_TYPE_BLOB, 1, 1, tCompBinary}, // TSDB_DATA_TYPE_BLOB + {TSDB_DATA_TYPE_MEDIUMBLOB, 1, 1, tCompBinary}, // TSDB_DATA_TYPE_MEDIUMBLOB }; struct SCompressor { @@ -1124,10 +1130,12 @@ static int32_t tCompSetCopyMode(SCompressor *pCmprsor) { return code; } -static int32_t tCompTimestamp(SCompressor *pCmprsor, int64_t ts) { +static int32_t tCompTimestamp(SCompressor *pCmprsor, const void *pData, int32_t nData) { int32_t code = 0; + int64_t ts = *(int64_t *)pData; ASSERT(pCmprsor->type == TSDB_DATA_TYPE_TIMESTAMP); + ASSERT(nData == 8); if (pCmprsor->aBuf[0][0] == 1) { if (pCmprsor->nVal == 0) { @@ -1572,9 +1580,15 @@ int32_t tCompressorReset(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg, int int32_t tCompGen(SCompressor *pCmprsor, const uint8_t **ppData, int64_t *nData) { int32_t code = 0; + if (pCmprsor->nVal == 0) { + *ppData = NULL; + *nData = 0; + return code; + } + if (pCmprsor->cmprAlg == TWO_STAGE_COMP /*|| IS_VAR_DATA_TYPE(pCmprsor->type)*/) { code = tRealloc(&pCmprsor->aBuf[1], pCmprsor->nBuf[0] + 1); - if (code) goto _exit; + if (code) return code; int64_t ret = LZ4_compress_default(pCmprsor->aBuf[0], pCmprsor->aBuf[1] + 1, pCmprsor->nBuf[0], pCmprsor->nBuf[0]); if (ret) { @@ -1593,12 +1607,9 @@ int32_t tCompGen(SCompressor *pCmprsor, const uint8_t **ppData, int64_t *nData) *nData = pCmprsor->nBuf[0]; } -_exit: return code; } -int32_t tCompress(SCompressor *pCmprsor, void *pData, int64_t nData) { - int32_t code = 0; - // TODO - return code; +int32_t tCompress(SCompressor *pCmprsor, const void *pData, int64_t nData) { + return DATA_TYPE_INFO[pCmprsor->type].cmprFn(pCmprsor, pData, nData); } \ No newline at end of file