refact more code

This commit is contained in:
Hongze Cheng 2022-09-26 10:47:24 +08:00
parent 64434678ca
commit efacafde33
3 changed files with 219 additions and 121 deletions

View File

@ -130,8 +130,8 @@ typedef struct SCompressor SCompressor;
int32_t tCompressorCreate(SCompressor **ppCmprsor); int32_t tCompressorCreate(SCompressor **ppCmprsor);
int32_t tCompressorDestroy(SCompressor *pCmprsor); int32_t tCompressorDestroy(SCompressor *pCmprsor);
int32_t tCompressorInit(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg); int32_t tCompressStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg);
int32_t tCompGen(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData); int32_t tCompressEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData);
int32_t tCompress(SCompressor *pCmprsor, const void *pData, int64_t nData); int32_t tCompress(SCompressor *pCmprsor, const void *pData, int64_t nData);
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -65,12 +65,12 @@ static int32_t tDiskColBuilderInit(SDiskColBuilder *pBuilder, int16_t cid, int8_
if (IS_VAR_DATA_TYPE(type)) { if (IS_VAR_DATA_TYPE(type)) {
if (pBuilder->pOffC == NULL && (code = tCompressorCreate(&pBuilder->pOffC))) return code; if (pBuilder->pOffC == NULL && (code = tCompressorCreate(&pBuilder->pOffC))) return code;
code = tCompressorInit(pBuilder->pOffC, TSDB_DATA_TYPE_INT, cmprAlg); code = tCompressStart(pBuilder->pOffC, TSDB_DATA_TYPE_INT, cmprAlg);
if (code) return code; if (code) return code;
} }
if (pBuilder->pValC == NULL && (code = tCompressorCreate(&pBuilder->pValC))) return code; if (pBuilder->pValC == NULL && (code = tCompressorCreate(&pBuilder->pValC))) return code;
code = tCompressorInit(pBuilder->pValC, type, cmprAlg); code = tCompressStart(pBuilder->pValC, type, cmprAlg);
if (code) return code; if (code) return code;
if (pBuilder->calcSma) { if (pBuilder->calcSma) {
@ -116,13 +116,13 @@ static int32_t tGnrtDiskCol(SDiskColBuilder *pBuilder, SDiskCol *pDiskCol) {
// OFFSET // OFFSET
if (IS_VAR_DATA_TYPE(pBuilder->type)) { if (IS_VAR_DATA_TYPE(pBuilder->type)) {
code = tCompGen(pBuilder->pOffC, &pDiskCol->pOff, &pDiskCol->bCol.szOffset); code = tCompressEnd(pBuilder->pOffC, &pDiskCol->pOff, &pDiskCol->bCol.szOffset);
if (code) return code; if (code) return code;
} }
// VALUE // VALUE
if (pBuilder->flag != (HAS_NULL | HAS_NONE)) { if (pBuilder->flag != (HAS_NULL | HAS_NONE)) {
code = tCompGen(pBuilder->pValC, &pDiskCol->pVal, &pDiskCol->bCol.szValue); code = tCompressEnd(pBuilder->pValC, &pDiskCol->pVal, &pDiskCol->bCol.szValue);
if (code) return code; if (code) return code;
} }
@ -457,15 +457,15 @@ int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TAB
pBuilder->calcSma = calcSma; pBuilder->calcSma = calcSma;
if (pBuilder->pUidC == NULL && (code = tCompressorCreate(&pBuilder->pUidC))) return code; if (pBuilder->pUidC == NULL && (code = tCompressorCreate(&pBuilder->pUidC))) return code;
code = tCompressorInit(pBuilder->pUidC, TSDB_DATA_TYPE_BIGINT, cmprAlg); code = tCompressStart(pBuilder->pUidC, TSDB_DATA_TYPE_BIGINT, cmprAlg);
if (code) return code; if (code) return code;
if (pBuilder->pVerC == NULL && (code = tCompressorCreate(&pBuilder->pVerC))) return code; if (pBuilder->pVerC == NULL && (code = tCompressorCreate(&pBuilder->pVerC))) return code;
code = tCompressorInit(pBuilder->pVerC, TSDB_DATA_TYPE_BIGINT, cmprAlg); code = tCompressStart(pBuilder->pVerC, TSDB_DATA_TYPE_BIGINT, cmprAlg);
if (code) return code; if (code) return code;
if (pBuilder->pKeyC == NULL && (code = tCompressorCreate(&pBuilder->pKeyC))) return code; if (pBuilder->pKeyC == NULL && (code = tCompressorCreate(&pBuilder->pKeyC))) return code;
code = tCompressorInit(pBuilder->pKeyC, TSDB_DATA_TYPE_TIMESTAMP, cmprAlg); code = tCompressStart(pBuilder->pKeyC, TSDB_DATA_TYPE_TIMESTAMP, cmprAlg);
if (code) return code; if (code) return code;
if (pBuilder->aBuilder == NULL) { if (pBuilder->aBuilder == NULL) {
@ -586,16 +586,16 @@ int32_t tGnrtDiskData(SDiskDataBuilder *pBuilder, SDiskData *pDiskData) {
// UID // UID
if (pBuilder->uid == 0) { if (pBuilder->uid == 0) {
code = tCompGen(pBuilder->pUidC, &pDiskData->pUid, &pDiskData->hdr.szUid); code = tCompressEnd(pBuilder->pUidC, &pDiskData->pUid, &pDiskData->hdr.szUid);
if (code) return code; if (code) return code;
} }
// VERSION // VERSION
code = tCompGen(pBuilder->pVerC, &pDiskData->pVer, &pDiskData->hdr.szVer); code = tCompressEnd(pBuilder->pVerC, &pDiskData->pVer, &pDiskData->hdr.szVer);
if (code) return code; if (code) return code;
// TSKEY // TSKEY
code = tCompGen(pBuilder->pKeyC, &pDiskData->pKey, &pDiskData->hdr.szKey); code = tCompressEnd(pBuilder->pKeyC, &pDiskData->pKey, &pDiskData->hdr.szKey);
if (code) return code; if (code) return code;
int32_t offset = 0; int32_t offset = 0;

View File

@ -1001,49 +1001,157 @@ int32_t tsDecompressDoubleLossyImp(const char *input, int32_t compressedSize, co
*************************************************************************/ *************************************************************************/
#define I64_SAFE_ADD(a, b) (((a) >= 0 && (b) <= INT64_MAX - (b)) || ((a) < 0 && (b) >= INT64_MIN - (a))) #define I64_SAFE_ADD(a, b) (((a) >= 0 && (b) <= INT64_MAX - (b)) || ((a) < 0 && (b) >= INT64_MIN - (a)))
static int32_t tCompBoolInit(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg); static int32_t tCompBoolStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg);
static int32_t tCompBool(SCompressor *pCmprsor, const void *pData, int32_t nData); static int32_t tCompBool(SCompressor *pCmprsor, const void *pData, int32_t nData);
static int32_t tCompIntInit(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg); static int32_t tCompBoolEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData);
static int32_t tCompIntStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg);
static int32_t tCompInt(SCompressor *pCmprsor, const void *pData, int32_t nData); static int32_t tCompInt(SCompressor *pCmprsor, const void *pData, int32_t nData);
static int32_t tCompFloatInit(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg); static int32_t tCompIntEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData);
static int32_t tCompFloatStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg);
static int32_t tCompFloat(SCompressor *pCmprsor, const void *pData, int32_t nData); static int32_t tCompFloat(SCompressor *pCmprsor, const void *pData, int32_t nData);
static int32_t tCompDoubleInit(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg); static int32_t tCompFloatEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData);
static int32_t tCompDoubleStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg);
static int32_t tCompDouble(SCompressor *pCmprsor, const void *pData, int32_t nData); static int32_t tCompDouble(SCompressor *pCmprsor, const void *pData, int32_t nData);
static int32_t tCompTimestampInit(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg); static int32_t tCompDoubleEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData);
static int32_t tCompTimestampStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg);
static int32_t tCompTimestamp(SCompressor *pCmprsor, const void *pData, int32_t nData); static int32_t tCompTimestamp(SCompressor *pCmprsor, const void *pData, int32_t nData);
static int32_t tCompBinaryInit(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg); static int32_t tCompTimestampEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData);
static int32_t tCompBinaryStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg);
static int32_t tCompBinary(SCompressor *pCmprsor, const void *pData, int32_t nData); static int32_t tCompBinary(SCompressor *pCmprsor, const void *pData, int32_t nData);
static int32_t tCompBinaryEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData);
static struct { static struct {
int8_t type; int8_t type;
int32_t bytes; int32_t bytes;
int8_t isVarLen; int8_t isVarLen;
int32_t (*initFn)(SCompressor *, int8_t type, int8_t cmprAlg); int32_t (*startFn)(SCompressor *, int8_t type, int8_t cmprAlg);
int32_t (*cmprFn)(SCompressor *, const void *, int32_t nData); int32_t (*cmprFn)(SCompressor *, const void *, int32_t nData);
int32_t (*endFn)(SCompressor *, const uint8_t **, int32_t *);
} DATA_TYPE_INFO[] = { } DATA_TYPE_INFO[] = {
{.type = TSDB_DATA_TYPE_NULL, .bytes = 0, .isVarLen = 0, .initFn = NULL, .cmprFn = NULL}, // TSDB_DATA_TYPE_NULL {.type = TSDB_DATA_TYPE_NULL,
{.type = TSDB_DATA_TYPE_BOOL, .bytes = 1, .isVarLen = 0, .initFn = tCompBoolInit, .cmprFn = tCompBool}, .bytes = 0,
{.type = TSDB_DATA_TYPE_TINYINT, .bytes = 1, .isVarLen = 0, .initFn = tCompIntInit, .cmprFn = tCompInt}, .isVarLen = 0,
{.type = TSDB_DATA_TYPE_SMALLINT, .bytes = 2, .isVarLen = 0, .initFn = tCompIntInit, .cmprFn = tCompInt}, .startFn = NULL,
{.type = TSDB_DATA_TYPE_INT, .bytes = 4, .isVarLen = 0, .initFn = tCompIntInit, .cmprFn = tCompInt}, .cmprFn = NULL,
{.type = TSDB_DATA_TYPE_BIGINT, .bytes = 8, .isVarLen = 0, .initFn = tCompIntInit, .cmprFn = tCompInt}, .endFn = NULL}, // TSDB_DATA_TYPE_NULL
{.type = TSDB_DATA_TYPE_FLOAT, .bytes = 4, .isVarLen = 0, .initFn = tCompFloatInit, .cmprFn = tCompFloat}, {.type = TSDB_DATA_TYPE_BOOL,
{.type = TSDB_DATA_TYPE_DOUBLE, .bytes = 8, .isVarLen = 0, .initFn = tCompDoubleInit, .cmprFn = tCompDouble}, .bytes = 1,
{.type = TSDB_DATA_TYPE_VARCHAR, .bytes = 1, .isVarLen = 1, .initFn = tCompBinaryInit, .cmprFn = tCompBinary}, .isVarLen = 0,
.startFn = tCompBoolStart,
.cmprFn = tCompBool,
.endFn = tCompBoolEnd},
{.type = TSDB_DATA_TYPE_TINYINT,
.bytes = 1,
.isVarLen = 0,
.startFn = tCompIntStart,
.cmprFn = tCompInt,
.endFn = tCompIntEnd},
{.type = TSDB_DATA_TYPE_SMALLINT,
.bytes = 2,
.isVarLen = 0,
.startFn = tCompIntStart,
.cmprFn = tCompInt,
.endFn = tCompIntEnd},
{.type = TSDB_DATA_TYPE_INT,
.bytes = 4,
.isVarLen = 0,
.startFn = tCompIntStart,
.cmprFn = tCompInt,
.endFn = tCompIntEnd},
{.type = TSDB_DATA_TYPE_BIGINT,
.bytes = 8,
.isVarLen = 0,
.startFn = tCompIntStart,
.cmprFn = tCompInt,
.endFn = tCompIntEnd},
{.type = TSDB_DATA_TYPE_FLOAT,
.bytes = 4,
.isVarLen = 0,
.startFn = tCompFloatStart,
.cmprFn = tCompFloat,
.endFn = tCompFloatEnd},
{.type = TSDB_DATA_TYPE_DOUBLE,
.bytes = 8,
.isVarLen = 0,
.startFn = tCompDoubleStart,
.cmprFn = tCompDouble,
.endFn = tCompDoubleEnd},
{.type = TSDB_DATA_TYPE_VARCHAR,
.bytes = 1,
.isVarLen = 1,
.startFn = tCompBinaryStart,
.cmprFn = tCompBinary,
.endFn = tCompBinaryEnd},
{.type = TSDB_DATA_TYPE_TIMESTAMP, {.type = TSDB_DATA_TYPE_TIMESTAMP,
.bytes = 8, .bytes = 8,
.isVarLen = 0, .isVarLen = 0,
.initFn = tCompTimestampInit, .startFn = tCompTimestampStart,
.cmprFn = tCompTimestamp}, .cmprFn = tCompTimestamp,
{.type = TSDB_DATA_TYPE_NCHAR, .bytes = 1, .isVarLen = 1, .initFn = tCompBinaryInit, .cmprFn = tCompBinary}, .endFn = tCompTimestampEnd},
{.type = TSDB_DATA_TYPE_UTINYINT, .bytes = 1, .isVarLen = 0, .initFn = tCompIntInit, .cmprFn = tCompInt}, {.type = TSDB_DATA_TYPE_NCHAR,
{.type = TSDB_DATA_TYPE_USMALLINT, .bytes = 2, .isVarLen = 0, .initFn = tCompIntInit, .cmprFn = tCompInt}, .bytes = 1,
{.type = TSDB_DATA_TYPE_UINT, .bytes = 4, .isVarLen = 0, .initFn = tCompIntInit, .cmprFn = tCompInt}, .isVarLen = 1,
{.type = TSDB_DATA_TYPE_UBIGINT, .bytes = 8, .isVarLen = 0, .initFn = tCompIntInit, .cmprFn = tCompInt}, .startFn = tCompBinaryStart,
{.type = TSDB_DATA_TYPE_JSON, .bytes = 1, .isVarLen = 1, .initFn = tCompBinaryInit, .cmprFn = tCompBinary}, .cmprFn = tCompBinary,
{.type = TSDB_DATA_TYPE_VARBINARY, .bytes = 1, .isVarLen = 1, .initFn = tCompBinaryInit, .cmprFn = tCompBinary}, .endFn = tCompBinaryEnd},
{.type = TSDB_DATA_TYPE_DECIMAL, .bytes = 1, .isVarLen = 1, .initFn = tCompBinaryInit, .cmprFn = tCompBinary}, {.type = TSDB_DATA_TYPE_UTINYINT,
{.type = TSDB_DATA_TYPE_BLOB, .bytes = 1, .isVarLen = 1, .initFn = tCompBinaryInit, .cmprFn = tCompBinary}, .bytes = 1,
{.type = TSDB_DATA_TYPE_MEDIUMBLOB, .bytes = 1, .isVarLen = 1, .initFn = tCompBinaryInit, .cmprFn = tCompBinary}, .isVarLen = 0,
.startFn = tCompIntStart,
.cmprFn = tCompInt,
.endFn = tCompIntEnd},
{.type = TSDB_DATA_TYPE_USMALLINT,
.bytes = 2,
.isVarLen = 0,
.startFn = tCompIntStart,
.cmprFn = tCompInt,
.endFn = tCompIntEnd},
{.type = TSDB_DATA_TYPE_UINT,
.bytes = 4,
.isVarLen = 0,
.startFn = tCompIntStart,
.cmprFn = tCompInt,
.endFn = tCompIntEnd},
{.type = TSDB_DATA_TYPE_UBIGINT,
.bytes = 8,
.isVarLen = 0,
.startFn = tCompIntStart,
.cmprFn = tCompInt,
.endFn = tCompIntEnd},
{.type = TSDB_DATA_TYPE_JSON,
.bytes = 1,
.isVarLen = 1,
.startFn = tCompBinaryStart,
.cmprFn = tCompBinary,
.endFn = tCompBinaryEnd},
{.type = TSDB_DATA_TYPE_VARBINARY,
.bytes = 1,
.isVarLen = 1,
.startFn = tCompBinaryStart,
.cmprFn = tCompBinary,
.endFn = tCompBinaryEnd},
{.type = TSDB_DATA_TYPE_DECIMAL,
.bytes = 1,
.isVarLen = 1,
.startFn = tCompBinaryStart,
.cmprFn = tCompBinary,
.endFn = tCompBinaryEnd},
{.type = TSDB_DATA_TYPE_BLOB,
.bytes = 1,
.isVarLen = 1,
.startFn = tCompBinaryStart,
.cmprFn = tCompBinary,
.endFn = tCompBinaryEnd},
{.type = TSDB_DATA_TYPE_MEDIUMBLOB,
.bytes = 1,
.isVarLen = 1,
.startFn = tCompBinaryStart,
.cmprFn = tCompBinary,
.endFn = tCompBinaryEnd},
}; };
struct SCompressor { struct SCompressor {
@ -1083,9 +1191,13 @@ struct SCompressor {
}; };
// Timestamp ===================================================== // Timestamp =====================================================
static int32_t tCompTimestampInit(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { static int32_t tCompTimestampStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) {
int32_t code = 0; int32_t code = 0;
// TODO pCmprsor->ts_prev_val = 0;
pCmprsor->ts_prev_delta = 0;
pCmprsor->ts_flag_p = NULL;
pCmprsor->aBuf[0][0] = 1;
pCmprsor->nBuf[0] = 1;
return code; return code;
} }
@ -1215,6 +1327,12 @@ static int32_t tCompTimestamp(SCompressor *pCmprsor, const void *pData, int32_t
return code; return code;
} }
static int32_t tCompTimestampEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData) {
int32_t code = 0;
// TODO
return code;
}
// Integer ===================================================== // Integer =====================================================
#define SIMPLE8B_MAX ((uint64_t)1152921504606846974LL) #define SIMPLE8B_MAX ((uint64_t)1152921504606846974LL)
static const uint8_t BIT_PER_INTEGER[] = {0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 20, 30, 60}; static const uint8_t BIT_PER_INTEGER[] = {0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 20, 30, 60};
@ -1224,9 +1342,14 @@ static const uint8_t BIT_TO_SELECTOR[] = {0, 2, 3, 4, 5, 6, 7, 8, 9, 10
15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15,
15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15}; 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15};
static int32_t tCompIntInit(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { static int32_t tCompIntStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) {
int32_t code = 0; int32_t code = 0;
// TODO pCmprsor->i_prev = 0;
pCmprsor->i_selector = 0;
pCmprsor->i_start = 0;
pCmprsor->i_end = 0;
pCmprsor->aBuf[0][0] = 0;
pCmprsor->nBuf[0] = 1;
return code; return code;
} }
@ -1336,13 +1459,22 @@ static int32_t tCompInt(SCompressor *pCmprsor, const void *pData, int32_t nData)
return code; return code;
} }
// Float ===================================================== static int32_t tCompIntEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData) {
static int32_t tCompFloatInit(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) {
int32_t code = 0; int32_t code = 0;
// TODO // TODO
return code; return code;
} }
// Float =====================================================
static int32_t tCompFloatStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) {
int32_t code = 0;
pCmprsor->f_prev = 0;
pCmprsor->f_flag_p = NULL;
pCmprsor->aBuf[0][0] = 0;
pCmprsor->nBuf[0] = 1;
return code;
}
static int32_t tCompFloat(SCompressor *pCmprsor, const void *pData, int32_t nData) { static int32_t tCompFloat(SCompressor *pCmprsor, const void *pData, int32_t nData) {
int32_t code = 0; int32_t code = 0;
@ -1405,13 +1537,22 @@ static int32_t tCompFloat(SCompressor *pCmprsor, const void *pData, int32_t nDat
return code; return code;
} }
// Double ===================================================== static int32_t tCompFloatEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData) {
static int32_t tCompDoubleInit(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) {
int32_t code = 0; int32_t code = 0;
// TODO // TODO
return code; return code;
} }
// Double =====================================================
static int32_t tCompDoubleStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) {
int32_t code = 0;
pCmprsor->d_prev = 0;
pCmprsor->d_flag_p = NULL;
pCmprsor->aBuf[0][0] = 0;
pCmprsor->nBuf[0] = 1;
return code;
}
static int32_t tCompDouble(SCompressor *pCmprsor, const void *pData, int32_t nData) { static int32_t tCompDouble(SCompressor *pCmprsor, const void *pData, int32_t nData) {
int32_t code = 0; int32_t code = 0;
@ -1474,13 +1615,19 @@ static int32_t tCompDouble(SCompressor *pCmprsor, const void *pData, int32_t nDa
return code; return code;
} }
// Binary ===================================================== static int32_t tCompDoubleEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData) {
static int32_t tCompBinaryInit(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) {
int32_t code = 0; int32_t code = 0;
// TODO // TODO
return code; return code;
} }
// Binary =====================================================
static int32_t tCompBinaryStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) {
int32_t code = 0;
pCmprsor->nBuf[0] = 0;
return code;
}
static int32_t tCompBinary(SCompressor *pCmprsor, const void *pData, int32_t nData) { static int32_t tCompBinary(SCompressor *pCmprsor, const void *pData, int32_t nData) {
int32_t code = 0; int32_t code = 0;
@ -1498,12 +1645,18 @@ static int32_t tCompBinary(SCompressor *pCmprsor, const void *pData, int32_t nDa
return code; return code;
} }
static int32_t tCompBinaryEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData) {
int32_t code = 0;
// TODO
return code;
}
// Bool ===================================================== // Bool =====================================================
static const uint8_t BOOL_CMPR_TABLE[] = {0b01, 0b0100, 0b010000, 0b01000000}; static const uint8_t BOOL_CMPR_TABLE[] = {0b01, 0b0100, 0b010000, 0b01000000};
static int32_t tCompBoolInit(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { static int32_t tCompBoolStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) {
int32_t code = 0; int32_t code = 0;
// TODO pCmprsor->nBuf[0] = 0;
return code; return code;
} }
@ -1531,6 +1684,12 @@ static int32_t tCompBool(SCompressor *pCmprsor, const void *pData, int32_t nData
return code; return code;
} }
static int32_t tCompBoolEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData) {
int32_t code = 0;
// TODO
return code;
}
// SCompressor ===================================================== // SCompressor =====================================================
int32_t tCompressorCreate(SCompressor **ppCmprsor) { int32_t tCompressorCreate(SCompressor **ppCmprsor) {
int32_t code = 0; int32_t code = 0;
@ -1565,7 +1724,7 @@ int32_t tCompressorDestroy(SCompressor *pCmprsor) {
return code; return code;
} }
int32_t tCompressorInit(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { int32_t tCompressStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) {
int32_t code = 0; int32_t code = 0;
pCmprsor->type = type; pCmprsor->type = type;
@ -1573,82 +1732,21 @@ int32_t tCompressorInit(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) {
pCmprsor->autoAlloc = 1; pCmprsor->autoAlloc = 1;
pCmprsor->nVal = 0; pCmprsor->nVal = 0;
switch (type) { if (DATA_TYPE_INFO[type].startFn) {
case TSDB_DATA_TYPE_TIMESTAMP: DATA_TYPE_INFO[type].startFn(pCmprsor, type, cmprAlg);
pCmprsor->ts_prev_val = 0;
pCmprsor->ts_prev_delta = 0;
pCmprsor->ts_flag_p = NULL;
pCmprsor->aBuf[0][0] = 1; // For timestamp, 1 means compressed, 0 otherwise
pCmprsor->nBuf[0] = 1;
break;
case TSDB_DATA_TYPE_BOOL:
pCmprsor->nBuf[0] = 0;
break;
case TSDB_DATA_TYPE_BINARY:
pCmprsor->nBuf[0] = 0;
break;
case TSDB_DATA_TYPE_FLOAT:
pCmprsor->f_prev = 0;
pCmprsor->f_flag_p = NULL;
pCmprsor->aBuf[0][0] = 0; // 0 means compressed, 1 otherwise (for backward compatibility)
pCmprsor->nBuf[0] = 1;
break;
case TSDB_DATA_TYPE_DOUBLE:
pCmprsor->d_prev = 0;
pCmprsor->d_flag_p = NULL;
pCmprsor->aBuf[0][0] = 0; // 0 means compressed, 1 otherwise (for backward compatibility)
pCmprsor->nBuf[0] = 1;
break;
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_USMALLINT:
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_UBIGINT:
pCmprsor->i_prev = 0;
pCmprsor->i_selector = 0;
pCmprsor->i_start = 0;
pCmprsor->i_end = 0;
pCmprsor->aBuf[0][0] = 0; // 0 means compressed, 1 otherwise (for backward compatibility)
pCmprsor->nBuf[0] = 1;
break;
default:
break;
} }
return code; return code;
} }
int32_t tCompGen(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData) { int32_t tCompressEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData) {
int32_t code = 0; int32_t code = 0;
if (pCmprsor->nVal == 0) {
*ppData = NULL; *ppData = NULL;
*nData = 0; *nData = 0;
return code;
}
if (pCmprsor->cmprAlg == TWO_STAGE_COMP /*|| IS_VAR_DATA_TYPE(pCmprsor->type)*/) { if (DATA_TYPE_INFO[pCmprsor->type].endFn) {
code = tRealloc(&pCmprsor->aBuf[1], pCmprsor->nBuf[0] + 1); return DATA_TYPE_INFO[pCmprsor->type].endFn(pCmprsor, ppData, nData);
if (code) return code;
int64_t ret = LZ4_compress_default(pCmprsor->aBuf[0], pCmprsor->aBuf[1] + 1, pCmprsor->nBuf[0], pCmprsor->nBuf[0]);
if (ret) {
pCmprsor->aBuf[1][0] = 0;
pCmprsor->nBuf[1] = ret + 1;
} else {
pCmprsor->aBuf[1][0] = 1;
memcpy(pCmprsor->aBuf[1] + 1, pCmprsor->aBuf[0], pCmprsor->nBuf[0]);
pCmprsor->nBuf[1] = pCmprsor->nBuf[0] + 1;
}
*ppData = pCmprsor->aBuf[1];
*nData = pCmprsor->nBuf[1];
} else {
*ppData = pCmprsor->aBuf[0];
*nData = pCmprsor->nBuf[0];
} }
return code; return code;