diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index a0f5c45df2..75c1997428 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -1004,12 +1004,12 @@ int32_t tsDecompressDoubleLossyImp(const char *input, int32_t compressedSize, co typedef struct { int8_t type; int8_t cmprAlg; + int8_t autoAlloc; uint8_t *aBuf[2]; int64_t nBuf[2]; union { // Timestamp ---- struct { - int8_t ts_copy; int32_t ts_n; int64_t ts_prev_val; int64_t ts_prev_delta; @@ -1051,124 +1051,123 @@ static int32_t tCompSetCopyMode(SCompressor *pCmprsor) { int32_t code = 0; if (pCmprsor->ts_n) { - code = tRealloc(&pCmprsor->aBuf[1], sizeof(int64_t) * (pCmprsor->ts_n + 1)); - if (code) return code; - pCmprsor->nBuf[1] = 1; + if (pCmprsor->autoAlloc) { + code = tRealloc(&pCmprsor->aBuf[1], sizeof(int64_t) * pCmprsor->ts_n); + if (code) return code; + } + pCmprsor->nBuf[1] = 0; int64_t n = 1; - int64_t valPrev; - int64_t delPrev; + int64_t value; + int64_t delta; uint64_t vZigzag; while (n < pCmprsor->nBuf[0]) { - uint8_t n1 = pCmprsor->aBuf[0][0] & 0xf; - uint8_t n2 = pCmprsor->aBuf[0][0] >> 4; + uint8_t aN[2]; + aN[0] = pCmprsor->aBuf[0][n] & 0xf; + aN[1] = pCmprsor->aBuf[0][n] >> 4; n++; - vZigzag = 0; - for (uint8_t i = 0; i < n1; i++) { - vZigzag |= (((uint64_t)pCmprsor->aBuf[0][n]) << (sizeof(int64_t) * i)); - n++; - } - int64_t delta_of_delta = ZIGZAG_DECODE(int64_t, vZigzag); - if (n == 2) { - delPrev = 0; - valPrev = delta_of_delta; - } else { - delPrev = delta_of_delta + delPrev; - valPrev = delPrev + valPrev; - } + for (int32_t i = 0; i < 2; i++) { + vZigzag = 0; + for (uint8_t j = 0; j < aN[i]; j++) { + vZigzag |= (((uint64_t)pCmprsor->aBuf[0][n]) << (8 * j)); + n++; + } - memcpy(pCmprsor->aBuf[1] + pCmprsor->nBuf[1], &valPrev, sizeof(int64_t)); - pCmprsor->nBuf[1] += sizeof(int64_t); + int64_t delta_of_delta = ZIGZAG_DECODE(int64_t, vZigzag); + if (pCmprsor->nBuf[1] == 0) { + delta = 0; + value = delta_of_delta; + } else { + delta = delta_of_delta + delta; + value = delta + value; + } - if (n >= pCmprsor->nBuf[0]) break; + memcpy(pCmprsor->aBuf[1] + pCmprsor->nBuf[1], &value, sizeof(int64_t)); + pCmprsor->nBuf[1] += sizeof(int64_t); - vZigzag = 0; - for (uint8_t i = 0; i < n2; i++) { - vZigzag |= (((uint64_t)pCmprsor->aBuf[0][n]) << (sizeof(int64_t) * i)); - n++; + if (n >= pCmprsor->nBuf[0]) break; } - delta_of_delta = ZIGZAG_DECODE(int64_t, vZigzag); - delPrev = delta_of_delta + delPrev; - valPrev = delPrev + valPrev; } - uint8_t *pBuf = pCmprsor->aBuf[0]; - pCmprsor->aBuf[0] = pCmprsor->aBuf[1]; - pCmprsor->aBuf[1] = pBuf; - pCmprsor->nBuf[0] = pCmprsor->nBuf[1]; - } else { - // TODO - } + ASSERT(n == pCmprsor->nBuf[0]); - pCmprsor->aBuf[0][0] = 0; - pCmprsor->ts_copy = 1; + if (pCmprsor->autoAlloc) { + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[1] + 1); + if (code) return code; + } + memcpy(pCmprsor->aBuf[0] + 1, pCmprsor->aBuf[1], pCmprsor->nBuf[1]); + pCmprsor->nBuf[0] = 1 + pCmprsor->nBuf[1]; + } + pCmprsor->aBuf[0][0] = MODE_NOCOMPRESS; return code; } -static int32_t tCompTimestamp(SCompressor *pCmprsor, TSKEY ts) { +static int32_t tCompTimestamp(SCompressor *pCmprsor, int64_t ts) { int32_t code = 0; - if (pCmprsor->ts_n == 0) { + ASSERT(pCmprsor->type == TSDB_DATA_TYPE_TIMESTAMP); + + if (pCmprsor->aBuf[0][0] == MODE_COMPRESS) { + if (pCmprsor->ts_n == 0) { + pCmprsor->ts_prev_val = ts; + pCmprsor->ts_prev_delta = -ts; + } + + if (!I64_SAFE_ADD(ts, -pCmprsor->ts_prev_val)) { + code = tCompSetCopyMode(pCmprsor); + if (code) return code; + goto _copy_cmpr; + } + int64_t delta = ts - pCmprsor->ts_prev_val; + + if (!I64_SAFE_ADD(delta, -pCmprsor->ts_prev_delta)) { + code = tCompSetCopyMode(pCmprsor); + if (code) return code; + goto _copy_cmpr; + } + int64_t delta_of_delta = delta - pCmprsor->ts_prev_delta; + uint64_t vZigzag = ZIGZAG_ENCODE(int64_t, delta_of_delta); + pCmprsor->ts_prev_val = ts; - pCmprsor->ts_prev_delta = -ts; - } + pCmprsor->ts_prev_delta = delta; - if (pCmprsor->ts_copy) goto _copy_exit; + if ((pCmprsor->ts_n & 0x1) == 0) { + if (pCmprsor->autoAlloc) { + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 17); + if (code) return code; + } - if (!I64_SAFE_ADD(ts, -pCmprsor->ts_prev_val)) { - code = tCompSetCopyMode(pCmprsor); - if (code) return code; - goto _copy_exit; - } - - int64_t delta = ts - pCmprsor->ts_prev_val; - - if (!I64_SAFE_ADD(delta, -pCmprsor->ts_prev_delta)) { - code = tCompSetCopyMode(pCmprsor); - if (code) return code; - goto _copy_exit; - } - - int64_t delta_of_delta = delta - pCmprsor->ts_prev_delta; - uint64_t zigzag_value = ZIGZAG_ENCODE(int64_t, delta_of_delta); - - pCmprsor->ts_prev_val = ts; - pCmprsor->ts_prev_delta = delta; - - if (pCmprsor->ts_n & 0x1 == 0) { - code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 17 /*sizeof(int64_t) * 2 + 1*/); - if (code) return code; - - pCmprsor->ts_flag_p = &pCmprsor->aBuf[0][pCmprsor->nBuf[0]]; - pCmprsor->nBuf[0]++; - pCmprsor->ts_flag_p[0] = 0; - - while (zigzag_value) { - pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (zigzag_value & 0xff); + pCmprsor->ts_flag_p = pCmprsor->aBuf[0] + pCmprsor->nBuf[0]; pCmprsor->nBuf[0]++; - pCmprsor->ts_flag_p[0]++; + pCmprsor->ts_flag_p[0] = 0; + while (vZigzag) { + pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (vZigzag & 0xff); + pCmprsor->nBuf[0]++; + pCmprsor->ts_flag_p[0]++; + vZigzag >>= 8; + } + } else { + while (vZigzag) { + pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (vZigzag & 0xff); + pCmprsor->nBuf[0]++; + pCmprsor->ts_flag_p[0] += 0x10; + vZigzag >>= 8; + } } } else { - while (zigzag_value) { - pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (zigzag_value & 0xff); - pCmprsor->nBuf[0]++; - pCmprsor->ts_flag_p += (uint8_t)0x10; + _copy_cmpr: + if (pCmprsor->autoAlloc) { + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + sizeof(ts)); + if (code) return code; } + + memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], &ts, sizeof(ts)); + pCmprsor->nBuf[0] += sizeof(ts); } - pCmprsor->ts_n++; - return code; -_copy_exit: - code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + sizeof(int64_t)); - if (code) return code; - - memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], &ts, sizeof(ts)); - pCmprsor->nBuf[0] += sizeof(ts); - - pCmprsor->ts_n++; return code; } @@ -1432,17 +1431,22 @@ int32_t tCompressorDestroy(SCompressor *pCmprsor) { return code; } -int32_t tCompressorReset(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { +int32_t tCompressorReset(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg, int8_t autoAlloc) { int32_t code = 0; pCmprsor->type = type; pCmprsor->cmprAlg = cmprAlg; - pCmprsor->nBuf[0] = 0; // (todo) may or may not +/- 1 + pCmprsor->autoAlloc = autoAlloc; switch (type) { case TSDB_DATA_TYPE_TIMESTAMP: - pCmprsor->ts_copy = 0; pCmprsor->ts_n = 0; + pCmprsor->ts_prev_val = 0; + pCmprsor->ts_prev_delta = 0; + pCmprsor->ts_flag_p = NULL; + pCmprsor->aBuf[0][0] = MODE_COMPRESS; + pCmprsor->nBuf[0] = 1; + pCmprsor->nBuf[1] = 0; break; case TSDB_DATA_TYPE_BOOL: pCmprsor->bool_n = 0;