diff --git a/source/dnode/vnode/src/tsdb/tsdbCompress.c b/source/dnode/vnode/src/tsdb/tsdbCompress.c index 76be7c1070..68cf6ef0ca 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCompress.c +++ b/source/dnode/vnode/src/tsdb/tsdbCompress.c @@ -13,45 +13,129 @@ * along with this program. If not, see . */ +#include "lz4.h" #include "tsdb.h" -// Integer ===================================================== typedef struct { - int8_t rawCopy; - int64_t prevVal; - int32_t nVal; - int32_t nBuf; - uint8_t *pBuf; -} SIntCompressor; + int8_t type; + int8_t cmprAlg; + uint8_t *aBuf[2]; + int64_t nBuf[2]; + union { + // Timestamp ---- + struct { + /* data */ + }; + // Integer ---- + struct { + /* data */ + }; + // Binary ---- + struct { + /* data */ + }; + // Float ---- + struct { + /* data */ + }; + // Bool ---- + struct { + int32_t bool_n; + uint8_t bool_b; + }; + }; +} SCompressor; +// Timestamp ===================================================== +static int32_t tCompTimestamp(SCompressor *pCmprsor, TSKEY ts) { + int32_t code = 0; + // TODO + return code; +} + +// Integer ===================================================== #define I64_SAFE_ADD(a, b) (((a) >= 0 && (b) <= INT64_MAX - (b)) || ((a) < 0 && (b) >= INT64_MIN - (a))) #define SIMPLE8B_MAX ((uint64_t)1152921504606846974LL) -static int32_t tsdbCmprI64(SIntCompressor *pCompressor, int64_t val) { +static int32_t tCompI64(SCompressor *pCmprsor, int64_t val) { int32_t code = 0; +#if 0 // raw copy - if (pCompressor->rawCopy) { - memcpy(pCompressor->pBuf + pCompressor->nBuf, &val, sizeof(val)); - pCompressor->nBuf += sizeof(val); - pCompressor->nVal++; + if (pCmprsor->rawCopy) { + memcpy(pCmprsor->pBuf + pCmprsor->nBuf, &val, sizeof(val)); + pCmprsor->nBuf += sizeof(val); + pCmprsor->nVal++; goto _exit; } - if (!I64_SAFE_ADD(val, pCompressor->prevVal)) { - pCompressor->rawCopy = 1; + if (!I64_SAFE_ADD(val, pCmprsor->prevVal)) { + pCmprsor->rawCopy = 1; // TODO: decompress and copy - pCompressor->nVal++; + pCmprsor->nVal++; goto _exit; } - int64_t diff = val - pCompressor->prevVal; + int64_t diff = val - pCmprsor->prevVal; uint8_t zigzag = ZIGZAGE(int64_t, diff); if (zigzag >= SIMPLE8B_MAX) { - pCompressor->rawCopy = 1; + pCmprsor->rawCopy = 1; // TODO: decompress and copy - pCompressor->nVal++; + pCmprsor->nVal++; + goto _exit; + } + +_exit: +#endif + return code; +} + +// Float ===================================================== +static int32_t tCompFloat() { + int32_t code = 0; + // TODO + return code; +} + +// Binary ===================================================== +static int32_t tCompBinary(SCompressor *pCmprsor, const uint8_t *pData, int32_t nData) { + int32_t code = 0; + + if (nData) { + memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], pData, nData); + pCmprsor->nBuf[0] += nData; + } + + return code; +} + +// Bool ===================================================== +static uint8_t BOOL_CMPR_TABLE[] = {0b01, 0b0100, 0b010000, 0b01000000}; +static int32_t tCompBool(SCompressor *pCmprsor, bool vBool) { + int32_t code = 0; + + if (vBool) { + pCmprsor->bool_b |= BOOL_CMPR_TABLE[pCmprsor->bool_n % 4]; + } + pCmprsor->bool_n++; + + if (pCmprsor->bool_n % 4 == 0) { + pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = pCmprsor->bool_b; + pCmprsor->nBuf[0]++; + pCmprsor->bool_b = 0; + } + + return code; +} + +// SCompressor ===================================================== +int32_t tCompressorCreate(SCompressor **ppCmprsor) { + int32_t code = 0; + + *ppCmprsor = (SCompressor *)taosMemoryCalloc(1, sizeof(SCompressor)); + if ((*ppCmprsor) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } @@ -59,6 +143,61 @@ _exit: return code; } -// Timestamp ===================================================== +int32_t tCompressorDestroy(SCompressor *pCmprsor) { + int32_t code = 0; -// Float ===================================================== \ No newline at end of file + if (pCmprsor) { + for (int32_t iBuf = 0; iBuf < sizeof(pCmprsor->aBuf) / sizeof(pCmprsor->aBuf[0]); iBuf++) { + tFree(pCmprsor->aBuf[iBuf]); + } + } + + return code; +} + +int32_t tCompressorReset(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { + int32_t code = 0; + + pCmprsor->type = type; + pCmprsor->cmprAlg = cmprAlg; + + switch (type) { + case TSDB_DATA_TYPE_BOOL: + pCmprsor->bool_n = 0; + pCmprsor->bool_b = 0; + break; + + default: + break; + } + + return code; +} + +int32_t tCompGen(SCompressor *pCmprsor, const uint8_t **ppData, int64_t *nData) { + int32_t code = 0; + + if (pCmprsor->cmprAlg == TWO_STAGE_COMP || IS_VAR_DATA_TYPE(pCmprsor->type)) { + code = tRealloc(&pCmprsor->aBuf[1], pCmprsor->nBuf[0] + 1); + if (code) goto _exit; + + int64_t ret = LZ4_compress_default(pCmprsor->aBuf[0], pCmprsor->aBuf[1] + 1, pCmprsor->nBuf[0], pCmprsor->nBuf[0]); + if (ret) { + pCmprsor->aBuf[1][0] = 0; + pCmprsor->nBuf[1] = ret + 1; + } else { + pCmprsor->aBuf[1][0] = 1; + memcpy(pCmprsor->aBuf[1] + 1, pCmprsor->aBuf[0], pCmprsor->nBuf[0]); + pCmprsor->nBuf[1] = pCmprsor->nBuf[0] + 1; + } + + *ppData = pCmprsor->aBuf[1]; + *nData = pCmprsor->nBuf[1]; + } else { + *ppData = pCmprsor->aBuf[0]; + *nData = pCmprsor->nBuf[0]; + } + +_exit: + return code; +}