Merge pull request #16934 from taosdata/feat/stream_compression
Feat/stream compression
This commit is contained in:
commit
f48553d3fe
|
@ -51,7 +51,6 @@ target_sources(
|
|||
"src/tsdb/tsdbCacheRead.c"
|
||||
"src/tsdb/tsdbRetention.c"
|
||||
"src/tsdb/tsdbDiskData.c"
|
||||
"src/tsdb/tsdbCompress.c"
|
||||
"src/tsdb/tsdbCompact.c"
|
||||
"src/tsdb/tsdbMergeTree.c"
|
||||
|
||||
|
|
|
@ -1,64 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "tsdb.h"
|
||||
|
||||
// Integer =====================================================
|
||||
typedef struct {
|
||||
int8_t rawCopy;
|
||||
int64_t prevVal;
|
||||
int32_t nVal;
|
||||
int32_t nBuf;
|
||||
uint8_t *pBuf;
|
||||
} SIntCompressor;
|
||||
|
||||
#define I64_SAFE_ADD(a, b) (((a) >= 0 && (b) <= INT64_MAX - (b)) || ((a) < 0 && (b) >= INT64_MIN - (a)))
|
||||
#define SIMPLE8B_MAX ((uint64_t)1152921504606846974LL)
|
||||
|
||||
static int32_t tsdbCmprI64(SIntCompressor *pCompressor, int64_t val) {
|
||||
int32_t code = 0;
|
||||
|
||||
// raw copy
|
||||
if (pCompressor->rawCopy) {
|
||||
memcpy(pCompressor->pBuf + pCompressor->nBuf, &val, sizeof(val));
|
||||
pCompressor->nBuf += sizeof(val);
|
||||
pCompressor->nVal++;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
if (!I64_SAFE_ADD(val, pCompressor->prevVal)) {
|
||||
pCompressor->rawCopy = 1;
|
||||
// TODO: decompress and copy
|
||||
pCompressor->nVal++;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
int64_t diff = val - pCompressor->prevVal;
|
||||
uint8_t zigzag = ZIGZAGE(int64_t, diff);
|
||||
|
||||
if (zigzag >= SIMPLE8B_MAX) {
|
||||
pCompressor->rawCopy = 1;
|
||||
// TODO: decompress and copy
|
||||
pCompressor->nVal++;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
// Timestamp =====================================================
|
||||
|
||||
// Float =====================================================
|
|
@ -50,6 +50,7 @@
|
|||
#define _DEFAULT_SOURCE
|
||||
#include "tcompression.h"
|
||||
#include "lz4.h"
|
||||
#include "tRealloc.h"
|
||||
#include "tlog.h"
|
||||
|
||||
#ifdef TD_TSZ
|
||||
|
@ -814,24 +815,24 @@ int32_t tsCompressFloatImp(const char *const input, const int32_t nelements, cha
|
|||
uint32_t predicted = prev_value;
|
||||
uint32_t diff = curr.bits ^ predicted;
|
||||
|
||||
int32_t leading_zeros = FLOAT_BYTES * BITS_PER_BYTE;
|
||||
int32_t trailing_zeros = leading_zeros;
|
||||
int32_t clz = FLOAT_BYTES * BITS_PER_BYTE;
|
||||
int32_t ctz = clz;
|
||||
|
||||
if (diff) {
|
||||
trailing_zeros = BUILDIN_CTZ(diff);
|
||||
leading_zeros = BUILDIN_CLZ(diff);
|
||||
ctz = BUILDIN_CTZ(diff);
|
||||
clz = BUILDIN_CLZ(diff);
|
||||
}
|
||||
|
||||
uint8_t nbytes = 0;
|
||||
uint8_t flag;
|
||||
|
||||
if (trailing_zeros > leading_zeros) {
|
||||
nbytes = (uint8_t)(FLOAT_BYTES - trailing_zeros / BITS_PER_BYTE);
|
||||
if (ctz > clz) {
|
||||
nbytes = (uint8_t)(FLOAT_BYTES - ctz / BITS_PER_BYTE);
|
||||
|
||||
if (nbytes > 0) nbytes--;
|
||||
flag = ((uint8_t)1 << 3) | nbytes;
|
||||
} else {
|
||||
nbytes = (uint8_t)(FLOAT_BYTES - leading_zeros / BITS_PER_BYTE);
|
||||
nbytes = (uint8_t)(FLOAT_BYTES - clz / BITS_PER_BYTE);
|
||||
if (nbytes > 0) nbytes--;
|
||||
flag = nbytes;
|
||||
}
|
||||
|
@ -994,3 +995,621 @@ int32_t tsDecompressDoubleLossyImp(const char *input, int32_t compressedSize, co
|
|||
return tdszDecompress(SZ_DOUBLE, input + 1, compressedSize - 1, nelements, output);
|
||||
}
|
||||
#endif
|
||||
|
||||
/*************************************************************************
|
||||
* STREAM COMPRESSION
|
||||
*************************************************************************/
|
||||
#define I64_SAFE_ADD(a, b) (((a) >= 0 && (b) <= INT64_MAX - (b)) || ((a) < 0 && (b) >= INT64_MIN - (a)))
|
||||
typedef struct SCompressor SCompressor;
|
||||
|
||||
static int32_t tCompBool(SCompressor *pCmprsor, const void *pData, int32_t nData);
|
||||
static int32_t tCompInt(SCompressor *pCmprsor, const void *pData, int32_t nData);
|
||||
static int32_t tCompFloat(SCompressor *pCmprsor, const void *pData, int32_t nData);
|
||||
static int32_t tCompDouble(SCompressor *pCmprsor, const void *pData, int32_t nData);
|
||||
static int32_t tCompTimestamp(SCompressor *pCmprsor, const void *pData, int32_t nData);
|
||||
static int32_t tCompBinary(SCompressor *pCmprsor, const void *pData, int32_t nData);
|
||||
static struct {
|
||||
int8_t type;
|
||||
int32_t bytes;
|
||||
int8_t isVarLen;
|
||||
int32_t (*cmprFn)(SCompressor *, const void *, int32_t nData);
|
||||
} DATA_TYPE_INFO[] = {
|
||||
{TSDB_DATA_TYPE_NULL, 0, 0, NULL}, // TSDB_DATA_TYPE_NULL
|
||||
{TSDB_DATA_TYPE_BOOL, 1, 0, tCompBool}, // TSDB_DATA_TYPE_BOOL
|
||||
{TSDB_DATA_TYPE_TINYINT, 1, 0, tCompInt}, // TSDB_DATA_TYPE_TINYINT
|
||||
{TSDB_DATA_TYPE_SMALLINT, 2, 0, tCompInt}, // TSDB_DATA_TYPE_SMALLINT
|
||||
{TSDB_DATA_TYPE_INT, 4, 0, tCompInt}, // TSDB_DATA_TYPE_INT
|
||||
{TSDB_DATA_TYPE_BIGINT, 8, 0, tCompInt}, // TSDB_DATA_TYPE_BIGINT
|
||||
{TSDB_DATA_TYPE_FLOAT, 4, 0, tCompFloat}, // TSDB_DATA_TYPE_FLOAT
|
||||
{TSDB_DATA_TYPE_DOUBLE, 8, 0, tCompDouble}, // TSDB_DATA_TYPE_DOUBLE
|
||||
{TSDB_DATA_TYPE_VARCHAR, 1, 1, tCompBinary}, // TSDB_DATA_TYPE_VARCHAR
|
||||
{TSDB_DATA_TYPE_TIMESTAMP, 8, 0, tCompTimestamp}, // pTSDB_DATA_TYPE_TIMESTAMP
|
||||
{TSDB_DATA_TYPE_NCHAR, 1, 1, tCompBinary}, // TSDB_DATA_TYPE_NCHAR
|
||||
{TSDB_DATA_TYPE_UTINYINT, 1, 0, tCompInt}, // TSDB_DATA_TYPE_UTINYINT
|
||||
{TSDB_DATA_TYPE_USMALLINT, 2, 0, tCompInt}, // TSDB_DATA_TYPE_USMALLINT
|
||||
{TSDB_DATA_TYPE_UINT, 4, 0, tCompInt}, // TSDB_DATA_TYPE_UINT
|
||||
{TSDB_DATA_TYPE_UBIGINT, 8, 0, tCompInt}, // TSDB_DATA_TYPE_UBIGINT
|
||||
{TSDB_DATA_TYPE_JSON, 1, 1, tCompBinary}, // TSDB_DATA_TYPE_JSON
|
||||
{TSDB_DATA_TYPE_VARBINARY, 1, 1, tCompBinary}, // TSDB_DATA_TYPE_VARBINARY
|
||||
{TSDB_DATA_TYPE_DECIMAL, 1, 1, tCompBinary}, // TSDB_DATA_TYPE_DECIMAL
|
||||
{TSDB_DATA_TYPE_BLOB, 1, 1, tCompBinary}, // TSDB_DATA_TYPE_BLOB
|
||||
{TSDB_DATA_TYPE_MEDIUMBLOB, 1, 1, tCompBinary}, // TSDB_DATA_TYPE_MEDIUMBLOB
|
||||
};
|
||||
|
||||
struct SCompressor {
|
||||
int8_t type;
|
||||
int8_t cmprAlg;
|
||||
int8_t autoAlloc;
|
||||
int32_t nVal;
|
||||
uint8_t *aBuf[2];
|
||||
int64_t nBuf[2];
|
||||
union {
|
||||
// Timestamp ----
|
||||
struct {
|
||||
int64_t ts_prev_val;
|
||||
int64_t ts_prev_delta;
|
||||
uint8_t *ts_flag_p;
|
||||
};
|
||||
// Integer ----
|
||||
struct {
|
||||
int64_t i_prev;
|
||||
int32_t i_selector;
|
||||
int32_t i_start;
|
||||
int32_t i_end;
|
||||
uint64_t i_aZigzag[241];
|
||||
int8_t i_aBitN[241];
|
||||
};
|
||||
// Float ----
|
||||
struct {
|
||||
uint32_t f_prev;
|
||||
uint8_t *f_flag_p;
|
||||
};
|
||||
// Double ----
|
||||
struct {
|
||||
uint64_t d_prev;
|
||||
uint8_t *d_flag_p;
|
||||
};
|
||||
};
|
||||
};
|
||||
|
||||
// Timestamp =====================================================
|
||||
static int32_t tCompSetCopyMode(SCompressor *pCmprsor) {
|
||||
int32_t code = 0;
|
||||
|
||||
if (pCmprsor->nVal) {
|
||||
if (pCmprsor->autoAlloc) {
|
||||
code = tRealloc(&pCmprsor->aBuf[1], sizeof(int64_t) * pCmprsor->nVal);
|
||||
if (code) return code;
|
||||
}
|
||||
pCmprsor->nBuf[1] = 0;
|
||||
|
||||
int64_t n = 1;
|
||||
int64_t value;
|
||||
int64_t delta;
|
||||
uint64_t vZigzag;
|
||||
while (n < pCmprsor->nBuf[0]) {
|
||||
uint8_t aN[2];
|
||||
aN[0] = pCmprsor->aBuf[0][n] & 0xf;
|
||||
aN[1] = pCmprsor->aBuf[0][n] >> 4;
|
||||
|
||||
n++;
|
||||
|
||||
for (int32_t i = 0; i < 2; i++) {
|
||||
vZigzag = 0;
|
||||
for (uint8_t j = 0; j < aN[i]; j++) {
|
||||
vZigzag |= (((uint64_t)pCmprsor->aBuf[0][n]) << (8 * j));
|
||||
n++;
|
||||
}
|
||||
|
||||
int64_t delta_of_delta = ZIGZAG_DECODE(int64_t, vZigzag);
|
||||
if (pCmprsor->nBuf[1] == 0) {
|
||||
delta = 0;
|
||||
value = delta_of_delta;
|
||||
} else {
|
||||
delta = delta_of_delta + delta;
|
||||
value = delta + value;
|
||||
}
|
||||
|
||||
memcpy(pCmprsor->aBuf[1] + pCmprsor->nBuf[1], &value, sizeof(int64_t));
|
||||
pCmprsor->nBuf[1] += sizeof(int64_t);
|
||||
|
||||
if (n >= pCmprsor->nBuf[0]) break;
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT(n == pCmprsor->nBuf[0]);
|
||||
|
||||
if (pCmprsor->autoAlloc) {
|
||||
code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[1] + 1);
|
||||
if (code) return code;
|
||||
}
|
||||
memcpy(pCmprsor->aBuf[0] + 1, pCmprsor->aBuf[1], pCmprsor->nBuf[1]);
|
||||
pCmprsor->nBuf[0] = 1 + pCmprsor->nBuf[1];
|
||||
}
|
||||
pCmprsor->aBuf[0][0] = 0;
|
||||
|
||||
return code;
|
||||
}
|
||||
static int32_t tCompTimestamp(SCompressor *pCmprsor, const void *pData, int32_t nData) {
|
||||
int32_t code = 0;
|
||||
|
||||
int64_t ts = *(int64_t *)pData;
|
||||
ASSERT(pCmprsor->type == TSDB_DATA_TYPE_TIMESTAMP);
|
||||
ASSERT(nData == 8);
|
||||
|
||||
if (pCmprsor->aBuf[0][0] == 1) {
|
||||
if (pCmprsor->nVal == 0) {
|
||||
pCmprsor->ts_prev_val = ts;
|
||||
pCmprsor->ts_prev_delta = -ts;
|
||||
}
|
||||
|
||||
if (!I64_SAFE_ADD(ts, -pCmprsor->ts_prev_val)) {
|
||||
code = tCompSetCopyMode(pCmprsor);
|
||||
if (code) return code;
|
||||
goto _copy_cmpr;
|
||||
}
|
||||
int64_t delta = ts - pCmprsor->ts_prev_val;
|
||||
|
||||
if (!I64_SAFE_ADD(delta, -pCmprsor->ts_prev_delta)) {
|
||||
code = tCompSetCopyMode(pCmprsor);
|
||||
if (code) return code;
|
||||
goto _copy_cmpr;
|
||||
}
|
||||
int64_t delta_of_delta = delta - pCmprsor->ts_prev_delta;
|
||||
uint64_t vZigzag = ZIGZAG_ENCODE(int64_t, delta_of_delta);
|
||||
|
||||
pCmprsor->ts_prev_val = ts;
|
||||
pCmprsor->ts_prev_delta = delta;
|
||||
|
||||
if ((pCmprsor->nVal & 0x1) == 0) {
|
||||
if (pCmprsor->autoAlloc) {
|
||||
code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 17);
|
||||
if (code) return code;
|
||||
}
|
||||
|
||||
pCmprsor->ts_flag_p = pCmprsor->aBuf[0] + pCmprsor->nBuf[0];
|
||||
pCmprsor->nBuf[0]++;
|
||||
pCmprsor->ts_flag_p[0] = 0;
|
||||
while (vZigzag) {
|
||||
pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (vZigzag & 0xff);
|
||||
pCmprsor->nBuf[0]++;
|
||||
pCmprsor->ts_flag_p[0]++;
|
||||
vZigzag >>= 8;
|
||||
}
|
||||
} else {
|
||||
while (vZigzag) {
|
||||
pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (vZigzag & 0xff);
|
||||
pCmprsor->nBuf[0]++;
|
||||
pCmprsor->ts_flag_p[0] += 0x10;
|
||||
vZigzag >>= 8;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
_copy_cmpr:
|
||||
if (pCmprsor->autoAlloc) {
|
||||
code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + sizeof(ts));
|
||||
if (code) return code;
|
||||
}
|
||||
|
||||
memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], &ts, sizeof(ts));
|
||||
pCmprsor->nBuf[0] += sizeof(ts);
|
||||
}
|
||||
pCmprsor->nVal++;
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
// Integer =====================================================
|
||||
#define SIMPLE8B_MAX ((uint64_t)1152921504606846974LL)
|
||||
static const uint8_t BIT_PER_INTEGER[] = {0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 20, 30, 60};
|
||||
static const int32_t SELECTOR_TO_ELEMS[] = {240, 120, 60, 30, 20, 15, 12, 10, 8, 7, 6, 5, 4, 3, 2, 1};
|
||||
static const uint8_t BIT_TO_SELECTOR[] = {0, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 11, 11, 12, 12, 12,
|
||||
13, 13, 13, 13, 13, 14, 14, 14, 14, 14, 14, 14, 14, 14, 14, 15,
|
||||
15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15,
|
||||
15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15};
|
||||
|
||||
static int32_t tCompInt(SCompressor *pCmprsor, const void *pData, int32_t nData) {
|
||||
int32_t code = 0;
|
||||
|
||||
ASSERT(nData == DATA_TYPE_INFO[pCmprsor->type].bytes);
|
||||
|
||||
if (pCmprsor->aBuf[0][0] == 0) {
|
||||
int64_t val;
|
||||
|
||||
switch (pCmprsor->type) {
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
val = *(int8_t *)pData;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
val = *(int16_t *)pData;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
val = *(int32_t *)pData;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
val = *(int64_t *)pData;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_UTINYINT:
|
||||
val = *(uint8_t *)pData;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_USMALLINT:
|
||||
val = *(uint16_t *)pData;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_UINT:
|
||||
val = *(uint32_t *)pData;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_UBIGINT:
|
||||
val = *(int64_t *)pData;
|
||||
break;
|
||||
default:
|
||||
ASSERT(0);
|
||||
break;
|
||||
}
|
||||
|
||||
if (!I64_SAFE_ADD(val, -pCmprsor->i_prev)) {
|
||||
// TODO
|
||||
goto _copy_cmpr;
|
||||
}
|
||||
|
||||
int64_t diff = val - pCmprsor->i_prev;
|
||||
uint64_t vZigzag = ZIGZAG_ENCODE(int64_t, diff);
|
||||
if (vZigzag >= SIMPLE8B_MAX) {
|
||||
// TODO
|
||||
goto _copy_cmpr;
|
||||
}
|
||||
|
||||
int8_t nBit = (vZigzag) ? (64 - BUILDIN_CLZL(vZigzag)) : 0;
|
||||
pCmprsor->i_prev = val;
|
||||
|
||||
while (1) {
|
||||
int32_t nEle = (pCmprsor->i_end + 241 - pCmprsor->i_start) % 241;
|
||||
|
||||
if (nEle + 1 <= SELECTOR_TO_ELEMS[pCmprsor->i_selector] && nEle + 1 <= SELECTOR_TO_ELEMS[BIT_TO_SELECTOR[nBit]]) {
|
||||
if (pCmprsor->i_selector < BIT_TO_SELECTOR[nBit]) {
|
||||
pCmprsor->i_selector = BIT_TO_SELECTOR[nBit];
|
||||
}
|
||||
pCmprsor->i_end = (pCmprsor->i_end + 1) % 241;
|
||||
pCmprsor->i_aZigzag[pCmprsor->i_end] = vZigzag;
|
||||
pCmprsor->i_aBitN[pCmprsor->i_end] = nBit;
|
||||
break;
|
||||
} else {
|
||||
while (nEle < SELECTOR_TO_ELEMS[pCmprsor->i_selector]) {
|
||||
pCmprsor->i_selector++;
|
||||
}
|
||||
nEle = SELECTOR_TO_ELEMS[pCmprsor->i_selector];
|
||||
|
||||
if (pCmprsor->autoAlloc) {
|
||||
code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + sizeof(uint64_t));
|
||||
if (code) return code;
|
||||
}
|
||||
|
||||
uint64_t *bp = (uint64_t *)(pCmprsor->aBuf[0] + pCmprsor->nBuf[0]);
|
||||
pCmprsor->nBuf[0] += sizeof(uint64_t);
|
||||
bp[0] = pCmprsor->i_selector;
|
||||
uint8_t bits = BIT_PER_INTEGER[pCmprsor->i_selector];
|
||||
for (int32_t iVal = 0; iVal < nEle; iVal++) {
|
||||
bp[0] |= ((pCmprsor->i_aZigzag[pCmprsor->i_start] & ((((uint64_t)1) << bits) - 1)) << (bits * iVal + 4));
|
||||
pCmprsor->i_start = (pCmprsor->i_start + 1) % 241;
|
||||
}
|
||||
|
||||
// reset and continue
|
||||
pCmprsor->i_selector = 0;
|
||||
for (int32_t iVal = pCmprsor->i_start; iVal < pCmprsor->i_end; iVal = (iVal + 1) % 241) {
|
||||
if (pCmprsor->i_selector < BIT_TO_SELECTOR[pCmprsor->i_aBitN[iVal]]) {
|
||||
pCmprsor->i_selector = BIT_TO_SELECTOR[pCmprsor->i_aBitN[iVal]];
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
_copy_cmpr:
|
||||
code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + nData);
|
||||
if (code) return code;
|
||||
|
||||
memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], pData, nData);
|
||||
pCmprsor->nBuf[0] += nData;
|
||||
}
|
||||
pCmprsor->nVal++;
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
// Float =====================================================
|
||||
static int32_t tCompFloat(SCompressor *pCmprsor, const void *pData, int32_t nData) {
|
||||
int32_t code = 0;
|
||||
|
||||
ASSERT(nData == sizeof(float));
|
||||
|
||||
union {
|
||||
float f;
|
||||
uint32_t u;
|
||||
} val = {.f = *(float *)pData};
|
||||
|
||||
uint32_t diff = val.u ^ pCmprsor->f_prev;
|
||||
pCmprsor->f_prev = val.u;
|
||||
|
||||
int32_t clz, ctz;
|
||||
if (diff) {
|
||||
clz = BUILDIN_CLZ(diff);
|
||||
ctz = BUILDIN_CTZ(diff);
|
||||
} else {
|
||||
clz = 32;
|
||||
ctz = 32;
|
||||
}
|
||||
|
||||
uint8_t nBytes;
|
||||
if (clz < ctz) {
|
||||
nBytes = sizeof(uint32_t) - ctz / BITS_PER_BYTE;
|
||||
if (nBytes) diff >>= (32 - nBytes * BITS_PER_BYTE);
|
||||
} else {
|
||||
nBytes = sizeof(uint32_t) - clz / BITS_PER_BYTE;
|
||||
}
|
||||
if (nBytes == 0) nBytes++;
|
||||
|
||||
if ((pCmprsor->nVal & 0x1) == 0) {
|
||||
if (pCmprsor->autoAlloc) {
|
||||
code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 9);
|
||||
if (code) return code;
|
||||
}
|
||||
|
||||
pCmprsor->f_flag_p = &pCmprsor->aBuf[0][pCmprsor->nBuf[0]];
|
||||
pCmprsor->nBuf[0]++;
|
||||
|
||||
if (clz < ctz) {
|
||||
pCmprsor->f_flag_p[0] = (0x08 | (nBytes - 1));
|
||||
} else {
|
||||
pCmprsor->f_flag_p[0] = nBytes - 1;
|
||||
}
|
||||
} else {
|
||||
if (clz < ctz) {
|
||||
pCmprsor->f_flag_p[0] |= ((0x08 | (nBytes - 1)) << 4);
|
||||
} else {
|
||||
pCmprsor->f_flag_p[0] |= ((nBytes - 1) << 4);
|
||||
}
|
||||
}
|
||||
for (; nBytes; nBytes--) {
|
||||
pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (diff & 0xff);
|
||||
pCmprsor->nBuf[0]++;
|
||||
diff >>= BITS_PER_BYTE;
|
||||
}
|
||||
pCmprsor->nVal++;
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
// Double =====================================================
|
||||
static int32_t tCompDouble(SCompressor *pCmprsor, const void *pData, int32_t nData) {
|
||||
int32_t code = 0;
|
||||
|
||||
ASSERT(nData == sizeof(double));
|
||||
|
||||
union {
|
||||
double d;
|
||||
uint64_t u;
|
||||
} val = {.d = *(double *)pData};
|
||||
|
||||
uint64_t diff = val.u ^ pCmprsor->d_prev;
|
||||
pCmprsor->d_prev = val.u;
|
||||
|
||||
int32_t clz, ctz;
|
||||
if (diff) {
|
||||
clz = BUILDIN_CLZL(diff);
|
||||
ctz = BUILDIN_CTZL(diff);
|
||||
} else {
|
||||
clz = 64;
|
||||
ctz = 64;
|
||||
}
|
||||
|
||||
uint8_t nBytes;
|
||||
if (clz < ctz) {
|
||||
nBytes = sizeof(uint64_t) - ctz / BITS_PER_BYTE;
|
||||
if (nBytes) diff >>= (64 - nBytes * BITS_PER_BYTE);
|
||||
} else {
|
||||
nBytes = sizeof(uint64_t) - clz / BITS_PER_BYTE;
|
||||
}
|
||||
if (nBytes == 0) nBytes++;
|
||||
|
||||
if ((pCmprsor->nVal & 0x1) == 0) {
|
||||
if (pCmprsor->autoAlloc) {
|
||||
code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 17);
|
||||
if (code) return code;
|
||||
}
|
||||
|
||||
pCmprsor->d_flag_p = &pCmprsor->aBuf[0][pCmprsor->nBuf[0]];
|
||||
pCmprsor->nBuf[0]++;
|
||||
|
||||
if (clz < ctz) {
|
||||
pCmprsor->d_flag_p[0] = (0x08 | (nBytes - 1));
|
||||
} else {
|
||||
pCmprsor->d_flag_p[0] = nBytes - 1;
|
||||
}
|
||||
} else {
|
||||
if (clz < ctz) {
|
||||
pCmprsor->d_flag_p[0] |= ((0x08 | (nBytes - 1)) << 4);
|
||||
} else {
|
||||
pCmprsor->d_flag_p[0] |= ((nBytes - 1) << 4);
|
||||
}
|
||||
}
|
||||
for (; nBytes; nBytes--) {
|
||||
pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (diff & 0xff);
|
||||
pCmprsor->nBuf[0]++;
|
||||
diff >>= BITS_PER_BYTE;
|
||||
}
|
||||
pCmprsor->nVal++;
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
// Binary =====================================================
|
||||
static int32_t tCompBinary(SCompressor *pCmprsor, const void *pData, int32_t nData) {
|
||||
int32_t code = 0;
|
||||
|
||||
if (nData) {
|
||||
if (pCmprsor->autoAlloc) {
|
||||
code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + nData);
|
||||
if (code) return code;
|
||||
}
|
||||
|
||||
memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], pData, nData);
|
||||
pCmprsor->nBuf[0] += nData;
|
||||
}
|
||||
pCmprsor->nVal++;
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
// Bool =====================================================
|
||||
static const uint8_t BOOL_CMPR_TABLE[] = {0b01, 0b0100, 0b010000, 0b01000000};
|
||||
|
||||
static int32_t tCompBool(SCompressor *pCmprsor, const void *pData, int32_t nData) {
|
||||
int32_t code = 0;
|
||||
|
||||
bool vBool = *(int8_t *)pData;
|
||||
|
||||
int32_t mod4 = pCmprsor->nVal & 3;
|
||||
if (mod4 == 0) {
|
||||
pCmprsor->nBuf[0]++;
|
||||
|
||||
if (pCmprsor->autoAlloc) {
|
||||
code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0]);
|
||||
if (code) return code;
|
||||
}
|
||||
|
||||
pCmprsor->aBuf[0][pCmprsor->nBuf[0] - 1] = 0;
|
||||
}
|
||||
if (vBool) {
|
||||
pCmprsor->aBuf[0][pCmprsor->nBuf[0] - 1] |= BOOL_CMPR_TABLE[mod4];
|
||||
}
|
||||
pCmprsor->nVal++;
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
// SCompressor =====================================================
|
||||
int32_t tCompressorCreate(SCompressor **ppCmprsor) {
|
||||
int32_t code = 0;
|
||||
|
||||
*ppCmprsor = (SCompressor *)taosMemoryCalloc(1, sizeof(SCompressor));
|
||||
if ((*ppCmprsor) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
code = tRealloc(&(*ppCmprsor)->aBuf[0], 1024);
|
||||
if (code) {
|
||||
taosMemoryFree(*ppCmprsor);
|
||||
*ppCmprsor = NULL;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tCompressorDestroy(SCompressor *pCmprsor) {
|
||||
int32_t code = 0;
|
||||
|
||||
if (pCmprsor) {
|
||||
int32_t nBuf = sizeof(pCmprsor->aBuf) / sizeof(pCmprsor->aBuf[0]);
|
||||
for (int32_t iBuf = 0; iBuf < nBuf; iBuf++) {
|
||||
tFree(pCmprsor->aBuf[iBuf]);
|
||||
}
|
||||
|
||||
taosMemoryFree(pCmprsor);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tCompressorReset(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg, int8_t autoAlloc) {
|
||||
int32_t code = 0;
|
||||
|
||||
pCmprsor->type = type;
|
||||
pCmprsor->cmprAlg = cmprAlg;
|
||||
pCmprsor->autoAlloc = autoAlloc;
|
||||
pCmprsor->nVal = 0;
|
||||
|
||||
switch (type) {
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
pCmprsor->ts_prev_val = 0;
|
||||
pCmprsor->ts_prev_delta = 0;
|
||||
pCmprsor->ts_flag_p = NULL;
|
||||
pCmprsor->aBuf[0][0] = 1; // For timestamp, 1 means compressed, 0 otherwise
|
||||
pCmprsor->nBuf[0] = 1;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BOOL:
|
||||
pCmprsor->nBuf[0] = 0;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BINARY:
|
||||
pCmprsor->nBuf[0] = 0;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_FLOAT:
|
||||
pCmprsor->f_prev = 0;
|
||||
pCmprsor->f_flag_p = NULL;
|
||||
pCmprsor->aBuf[0][0] = 0; // 0 means compressed, 1 otherwise (for backward compatibility)
|
||||
pCmprsor->nBuf[0] = 1;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
pCmprsor->d_prev = 0;
|
||||
pCmprsor->d_flag_p = NULL;
|
||||
pCmprsor->aBuf[0][0] = 0; // 0 means compressed, 1 otherwise (for backward compatibility)
|
||||
pCmprsor->nBuf[0] = 1;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
case TSDB_DATA_TYPE_UTINYINT:
|
||||
case TSDB_DATA_TYPE_USMALLINT:
|
||||
case TSDB_DATA_TYPE_UINT:
|
||||
case TSDB_DATA_TYPE_UBIGINT:
|
||||
pCmprsor->i_prev = 0;
|
||||
pCmprsor->i_selector = 0;
|
||||
pCmprsor->i_start = 0;
|
||||
pCmprsor->i_end = 0;
|
||||
pCmprsor->aBuf[0][0] = 0; // 0 means compressed, 1 otherwise (for backward compatibility)
|
||||
pCmprsor->nBuf[0] = 1;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tCompGen(SCompressor *pCmprsor, const uint8_t **ppData, int64_t *nData) {
|
||||
int32_t code = 0;
|
||||
|
||||
if (pCmprsor->nVal == 0) {
|
||||
*ppData = NULL;
|
||||
*nData = 0;
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pCmprsor->cmprAlg == TWO_STAGE_COMP /*|| IS_VAR_DATA_TYPE(pCmprsor->type)*/) {
|
||||
code = tRealloc(&pCmprsor->aBuf[1], pCmprsor->nBuf[0] + 1);
|
||||
if (code) return code;
|
||||
|
||||
int64_t ret = LZ4_compress_default(pCmprsor->aBuf[0], pCmprsor->aBuf[1] + 1, pCmprsor->nBuf[0], pCmprsor->nBuf[0]);
|
||||
if (ret) {
|
||||
pCmprsor->aBuf[1][0] = 0;
|
||||
pCmprsor->nBuf[1] = ret + 1;
|
||||
} else {
|
||||
pCmprsor->aBuf[1][0] = 1;
|
||||
memcpy(pCmprsor->aBuf[1] + 1, pCmprsor->aBuf[0], pCmprsor->nBuf[0]);
|
||||
pCmprsor->nBuf[1] = pCmprsor->nBuf[0] + 1;
|
||||
}
|
||||
|
||||
*ppData = pCmprsor->aBuf[1];
|
||||
*nData = pCmprsor->nBuf[1];
|
||||
} else {
|
||||
*ppData = pCmprsor->aBuf[0];
|
||||
*nData = pCmprsor->nBuf[0];
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tCompress(SCompressor *pCmprsor, const void *pData, int64_t nData) {
|
||||
return DATA_TYPE_INFO[pCmprsor->type].cmprFn(pCmprsor, pData, nData);
|
||||
}
|
Loading…
Reference in New Issue