From 832f2fb7a397584e9594ff59818ac2d028f08944 Mon Sep 17 00:00:00 2001
From: Hongze Cheng
Date: Fri, 16 Sep 2022 15:49:43 +0800
Subject: [PATCH] more code

---
Review notes (text between the three-dash line and the diff is ignored by `git am`):
 * I64_SAFE_ADD: the upper-bound test compared (b) with INT64_MAX - (b); it must be INT64_MAX - (a).
 * tCompTimestamp: parenthesize (ts_n & 0x1) == 0, shift zigzag_value inside both byte loops,
   and bump the flag byte (ts_flag_p[0]) instead of the ts_flag_p pointer for the 2nd value.
 * tCompSetCopyMode: delta_of_delta was declared twice in one scope; the flag byte is now read at the
   current offset, the loop is driven by the value count, and both values of a pair are copied out.
 * tCompBool: the buffer is grown before the next byte is written, with room for that byte.
 * tCompressorReset: the timestamp buffer starts with a mode byte, matching tCompSetCopyMode.
 * Whitespace of context/removed lines was reconstructed from a whitespace-collapsed copy; use
   `git apply --ignore-whitespace` if a hunk does not match. The post-image blob id is stale.

 source/dnode/vnode/src/tsdb/tsdbCompress.c | 178 ++++++++++++++++++++-
 1 file changed, 170 insertions(+), 8 deletions(-)

diff --git a/source/dnode/vnode/src/tsdb/tsdbCompress.c b/source/dnode/vnode/src/tsdb/tsdbCompress.c
index 2d402a8976..0b1a70fba7 100644
--- a/source/dnode/vnode/src/tsdb/tsdbCompress.c
+++ b/source/dnode/vnode/src/tsdb/tsdbCompress.c
@@ -16,6 +16,10 @@
 #include "lz4.h"
 #include "tsdb.h"
 
+// True when (a) + (b) cannot overflow int64_t: for a >= 0 only the upper bound can be
+// crossed (b <= INT64_MAX - a); for a < 0 only the lower bound (b >= INT64_MIN - a).
+#define I64_SAFE_ADD(a, b) (((a) >= 0 && (b) <= INT64_MAX - (a)) || ((a) < 0 && (b) >= INT64_MIN - (a)))
+
 typedef struct {
   int8_t   type;
   int8_t   cmprAlg;
@@ -24,7 +28,11 @@ typedef struct {
   union {
     // Timestamp ----
     struct {
-      /* data */
+      int8_t   ts_copy;        // non-zero once the column fell back to raw copy mode
+      int32_t  ts_n;           // number of timestamps appended since the last reset
+      int64_t  ts_prev_val;    // previously appended timestamp
+      int64_t  ts_prev_delta;  // previously computed delta
+      uint8_t *ts_flag_p;      // flag byte of the value pair currently being written
     };
     // Integer ----
     struct {
@@ -46,15 +54,141 @@ typedef struct {
 } SCompressor;
 
 // Timestamp =====================================================
+// Layout of the timestamp buffer aBuf[0]:
+//   byte 0  : mode byte, 1 = delta-of-delta encoded, 0 = raw int64_t copy
+//   byte 1..: payload. In encoded mode two values share one flag byte whose low/high
+//             nibble holds the number of little-endian zigzag bytes of the 1st/2nd value.
+
+// Decode everything written so far back to raw int64_t values and switch to copy mode.
+static int32_t tCompSetCopyMode(SCompressor *pCmprsor) {
+  int32_t code = 0;
+
+  if (pCmprsor->ts_n) {
+    code = tRealloc(&pCmprsor->aBuf[1], sizeof(int64_t) * (pCmprsor->ts_n + 1));
+    if (code) return code;
+    pCmprsor->nBuf[1] = 1;
+
+    int64_t n = 1;  // read offset in aBuf[0]; byte 0 is the mode byte
+    int64_t valPrev = 0;
+    int64_t delPrev = 0;
+    int32_t iVal = 0;
+    while (iVal < pCmprsor->ts_n && n < pCmprsor->nBuf[0]) {
+      // the flag byte of the current pair sits at offset n
+      uint8_t flag = pCmprsor->aBuf[0][n];
+      uint8_t aN[2] = {flag & 0xf, flag >> 4};
+      n++;
+
+      // stop on the value count: a trailing value may legitimately occupy zero payload bytes
+      for (int32_t j = 0; j < 2 && iVal < pCmprsor->ts_n; j++) {
+        uint64_t vZigzag = 0;
+        for (uint8_t i = 0; i < aN[j]; i++) {
+          vZigzag |= (((uint64_t)pCmprsor->aBuf[0][n]) << (8 * i));
+          n++;
+        }
+
+        int64_t delta_of_delta = ZIGZAGD(int64_t, vZigzag);
+        if (iVal == 0) {
+          // the first delta-of-delta is the first timestamp itself (see tCompTimestamp)
+          delPrev = 0;
+          valPrev = delta_of_delta;
+        } else {
+          delPrev = delta_of_delta + delPrev;
+          valPrev = delPrev + valPrev;
+        }
+
+        // every decoded value, 1st and 2nd of the pair, goes to the raw buffer
+        memcpy(pCmprsor->aBuf[1] + pCmprsor->nBuf[1], &valPrev, sizeof(int64_t));
+        pCmprsor->nBuf[1] += sizeof(int64_t);
+        iVal++;
+      }
+    }
+
+    uint8_t *pBuf = pCmprsor->aBuf[0];
+    pCmprsor->aBuf[0] = pCmprsor->aBuf[1];
+    pCmprsor->aBuf[1] = pBuf;
+    pCmprsor->nBuf[0] = pCmprsor->nBuf[1];
+  } else {
+    // TODO
+  }
+
+  pCmprsor->aBuf[0][0] = 0;
+  pCmprsor->ts_copy = 1;
+
+  return code;
+}
 static int32_t tCompTimestamp(SCompressor *pCmprsor, TSKEY ts) {
   int32_t code = 0;
-  // TODO
+
+  if (pCmprsor->ts_n == 0) {
+    // Seed the state so that the first delta is 0 and the first delta-of-delta is ts itself.
+    // NOTE(review): -ts (and -ts_prev_val below) overflows for INT64_MIN - confirm it cannot occur.
+    pCmprsor->ts_prev_val = ts;
+    pCmprsor->ts_prev_delta = -ts;
+  }
+
+  if (pCmprsor->ts_copy) goto _copy_exit;
+
+  if (!I64_SAFE_ADD(ts, -pCmprsor->ts_prev_val)) {
+    code = tCompSetCopyMode(pCmprsor);
+    if (code) return code;
+    goto _copy_exit;
+  }
+
+  int64_t delta = ts - pCmprsor->ts_prev_val;
+
+  if (!I64_SAFE_ADD(delta, -pCmprsor->ts_prev_delta)) {
+    code = tCompSetCopyMode(pCmprsor);
+    if (code) return code;
+    goto _copy_exit;
+  }
+
+  int64_t  delta_of_delta = delta - pCmprsor->ts_prev_delta;
+  uint64_t zigzag_value = ZIGZAGE(int64_t, delta_of_delta);
+
+  pCmprsor->ts_prev_val = ts;
+  pCmprsor->ts_prev_delta = delta;
+
+  // '==' binds tighter than '&', so the parentheses are required
+  if ((pCmprsor->ts_n & 0x1) == 0) {
+    // 1st value of a pair: reserve the flag byte plus room for both values of the pair
+    code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 17 /*sizeof(int64_t) * 2 + 1*/);
+    if (code) return code;
+
+    pCmprsor->ts_flag_p = &pCmprsor->aBuf[0][pCmprsor->nBuf[0]];
+    pCmprsor->nBuf[0]++;
+    pCmprsor->ts_flag_p[0] = 0;
+
+    while (zigzag_value) {
+      pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (zigzag_value & 0xff);
+      pCmprsor->nBuf[0]++;
+      pCmprsor->ts_flag_p[0]++;  // low nibble: byte count of the 1st value
+      zigzag_value >>= 8;        // consume the byte just written
+    }
+  } else {
+    while (zigzag_value) {
+      pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (zigzag_value & 0xff);
+      pCmprsor->nBuf[0]++;
+      pCmprsor->ts_flag_p[0] += (uint8_t)0x10;  // high nibble: byte count of the 2nd value
+      zigzag_value >>= 8;
+    }
+  }
+
+  pCmprsor->ts_n++;
+  return code;
+
+_copy_exit:
+  code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + sizeof(int64_t));
+  if (code) return code;
+
+  memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], &ts, sizeof(ts));
+  pCmprsor->nBuf[0] += sizeof(ts);
+
+  pCmprsor->ts_n++;
   return code;
 }
 
 // Integer =====================================================
-#define I64_SAFE_ADD(a, b) (((a) >= 0 && (b) <= INT64_MAX - (b)) || ((a) < 0 && (b) >= INT64_MIN - (a)))
-#define SIMPLE8B_MAX       ((uint64_t)1152921504606846974LL)
+#define SIMPLE8B_MAX ((uint64_t)1152921504606846974LL)
 
 static int32_t tCompI64(SCompressor *pCmprsor, int64_t val) {
   int32_t code = 0;
@@ -110,7 +244,8 @@ static int32_t tCompBinary(SCompressor *pCmprsor, const uint8_t *pData, int32_t
 }
 
 // Bool =====================================================
-static uint8_t BOOL_CMPR_TABLE[] = {0b01, 0b0100, 0b010000, 0b01000000};
+static const uint8_t BOOL_CMPR_TABLE[] = {0b01, 0b0100, 0b010000, 0b01000000};
+
 static int32_t tCompBool(SCompressor *pCmprsor, bool vBool) {
   int32_t code = 0;
 
@@ -122,6 +257,10 @@ static int32_t tCompBool(SCompressor *pCmprsor, bool vBool) {
   if (mod4 == 3) {
+    // Grow first: the byte at index nBuf[0] + 1 is written below, which needs nBuf[0] + 2 bytes.
+    code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 2);
+    if (code) goto _exit;
+
     pCmprsor->nBuf[0]++;
     pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = 0;
   }
 
 _exit:
@@ -138,6 +277,13 @@ int32_t tCompressorCreate(SCompressor **ppCmprsor) {
     goto _exit;
   }
 
+  code = tRealloc(&(*ppCmprsor)->aBuf[0], 1024);
+  if (code) {
+    taosMemoryFree(*ppCmprsor);
+    *ppCmprsor = NULL;
+    goto _exit;
+  }
+
 _exit:
   return code;
 }
@@ -146,9 +292,12 @@ int32_t tCompressorDestroy(SCompressor *pCmprsor) {
   int32_t code = 0;
 
   if (pCmprsor) {
-    for (int32_t iBuf = 0; iBuf < sizeof(pCmprsor->aBuf) / sizeof(pCmprsor->aBuf[0]); iBuf++) {
+    int32_t nBuf = sizeof(pCmprsor->aBuf) / sizeof(pCmprsor->aBuf[0]);
+    for (int32_t iBuf = 0; iBuf < nBuf; iBuf++) {
       tFree(pCmprsor->aBuf[iBuf]);
     }
+
+    taosMemoryFree(pCmprsor);
   }
 
   return code;
@@ -159,13 +308,20 @@ int32_t tCompressorReset(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) {
   pCmprsor->type = type;
   pCmprsor->cmprAlg = cmprAlg;
+  pCmprsor->nBuf[0] = 0;  // (todo) may or may not +/- 1
 
   switch (type) {
+    case TSDB_DATA_TYPE_TIMESTAMP:
+      pCmprsor->ts_copy = 0;
+      pCmprsor->ts_n = 0;
+      // byte 0 is the mode byte (1 = encoded, cleared by tCompSetCopyMode); the payload starts at 1
+      pCmprsor->aBuf[0][0] = 1;
+      pCmprsor->nBuf[0] = 1;
+      break;
     case TSDB_DATA_TYPE_BOOL:
       pCmprsor->bool_n = 0;
-      pCmprsor->bool_b = 0;
+      pCmprsor->aBuf[0][0] = 0;
       break;
-
     default:
       break;
   }
 
@@ -200,3 +356,9 @@ int32_t tCompGen(SCompressor *pCmprsor, const uint8_t **ppData, int64_t *nData)
 _exit:
   return code;
 }
+
+int32_t tCompress(SCompressor *pCmprsor, void *pData, int64_t nData) {
+  int32_t code = 0;
+  // TODO
+  return code;
+}