more code

This commit is contained in:
Hongze Cheng 2022-09-19 15:57:58 +08:00
parent 61496324e7
commit e33c255ad1
1 changed files with 156 additions and 87 deletions

View File

@ -1000,59 +1000,76 @@ int32_t tsDecompressDoubleLossyImp(const char *input, int32_t compressedSize, co
* STREAM COMPRESSION * STREAM COMPRESSION
*************************************************************************/ *************************************************************************/
/*
 * I64_SAFE_ADD(a, b) evaluates to non-zero when the signed 64-bit sum a + b
 * can be computed without overflow, and to zero otherwise.
 *
 *   a >= 0: the sum can only overflow upward, so require b <= INT64_MAX - a
 *           (INT64_MAX - a itself cannot overflow because a is non-negative).
 *   a <  0: the sum can only overflow downward, so require b >= INT64_MIN - a
 *           (INT64_MIN - a itself cannot overflow because a is negative).
 *
 * Both arguments are evaluated more than once; do not pass expressions with
 * side effects.
 */
#define I64_SAFE_ADD(a, b) (((a) >= 0 && (b) <= INT64_MAX - (a)) || ((a) < 0 && (b) >= INT64_MIN - (a)))
typedef struct SCompressor SCompressor;
// Per-data-type descriptor table, looked up as DATA_TYPE_INFO[type] (see the
// size assertion in tCompInt).
//   type     - the TSDB_DATA_TYPE_* constant the row describes
//   bytes    - size in bytes of one value; tCompInt asserts nData == bytes
//   isVarLen - 1 for the rows flagged variable-length below, 0 otherwise
//   cmprFn   - per-type compression callback; no row supplies an initializer,
//              so every entry is implicitly NULL here
// NOTE(review): lookups assume each TSDB_DATA_TYPE_* value equals its row
// position in this table - confirm against the type definitions.
typedef struct { static struct {
int8_t type;
int32_t bytes;
int8_t isVarLen;
int32_t (*cmprFn)(SCompressor *, const void *, int32_t nData);
} DATA_TYPE_INFO[] = {
{TSDB_DATA_TYPE_NULL, 0, 0}, // TSDB_DATA_TYPE_NULL
{TSDB_DATA_TYPE_BOOL, 1, 0}, // TSDB_DATA_TYPE_BOOL
{TSDB_DATA_TYPE_TINYINT, 1, 0}, // TSDB_DATA_TYPE_TINYINT
{TSDB_DATA_TYPE_SMALLINT, 2, 0}, // TSDB_DATA_TYPE_SMALLINT
{TSDB_DATA_TYPE_INT, 4, 0}, // TSDB_DATA_TYPE_INT
{TSDB_DATA_TYPE_BIGINT, 8, 0}, // TSDB_DATA_TYPE_BIGINT
{TSDB_DATA_TYPE_FLOAT, 4, 0}, // TSDB_DATA_TYPE_FLOAT
{TSDB_DATA_TYPE_DOUBLE, 8, 0}, // TSDB_DATA_TYPE_DOUBLE
{TSDB_DATA_TYPE_VARCHAR, 1, 1}, // TSDB_DATA_TYPE_VARCHAR
{TSDB_DATA_TYPE_TIMESTAMP, 8, 0}, // TSDB_DATA_TYPE_TIMESTAMP
{TSDB_DATA_TYPE_NCHAR, 1, 1}, // TSDB_DATA_TYPE_NCHAR
{TSDB_DATA_TYPE_UTINYINT, 1, 0}, // TSDB_DATA_TYPE_UTINYINT
{TSDB_DATA_TYPE_USMALLINT, 2, 0}, // TSDB_DATA_TYPE_USMALLINT
{TSDB_DATA_TYPE_UINT, 4, 0}, // TSDB_DATA_TYPE_UINT
{TSDB_DATA_TYPE_UBIGINT, 8, 0}, // TSDB_DATA_TYPE_UBIGINT
{TSDB_DATA_TYPE_JSON, 1, 1}, // TSDB_DATA_TYPE_JSON
{TSDB_DATA_TYPE_VARBINARY, 1, 1}, // TSDB_DATA_TYPE_VARBINARY
{TSDB_DATA_TYPE_DECIMAL, 1, 1}, // TSDB_DATA_TYPE_DECIMAL
{TSDB_DATA_TYPE_BLOB, 1, 1}, // TSDB_DATA_TYPE_BLOB
{TSDB_DATA_TYPE_MEDIUMBLOB, 1, 1}, // TSDB_DATA_TYPE_MEDIUMBLOB
};
// Streaming compressor state.
//   type / cmprAlg / autoAlloc - assigned by tCompressorReset().
//   nVal - number of values appended so far; reset to 0 by tCompressorReset()
//          and incremented once per value by each tComp* routine.
//   aBuf / nBuf - output buffers and their used sizes; for the timestamp,
//          integer, float and double encoders aBuf[0][0] is a flag byte
//          (0 means compressed, 1 otherwise) and nBuf[0] starts at 1.
//   The anonymous union holds per-type encoder state; tCompressorReset()
//   initializes only the members that belong to the selected type.
struct SCompressor {
int8_t type; int8_t type;
int8_t cmprAlg; int8_t cmprAlg;
int8_t autoAlloc; int8_t autoAlloc;
int32_t nVal;
uint8_t *aBuf[2]; uint8_t *aBuf[2];
int64_t nBuf[2]; int64_t nBuf[2];
union { union {
// Timestamp ---- // Timestamp ----
struct { struct {
int32_t ts_n;
int64_t ts_prev_val; int64_t ts_prev_val;
int64_t ts_prev_delta; int64_t ts_prev_delta;
uint8_t *ts_flag_p; uint8_t *ts_flag_p;
}; };
// Integer ---- // Integer ----
struct { struct {
int8_t i_copy;
int32_t i_n;
int64_t i_prev; int64_t i_prev;
int32_t i_selector; int32_t i_selector;
int32_t i_nele; int32_t i_nele;
}; };
// Float ---- // Float ----
struct { struct {
int32_t f_n;
uint32_t f_prev; uint32_t f_prev;
uint8_t *f_flag_p; uint8_t *f_flag_p;
}; };
// Double ---- // Double ----
struct { struct {
int32_t d_n;
uint64_t d_prev; uint64_t d_prev;
uint8_t *d_flag_p; uint8_t *d_flag_p;
}; };
// Bool ----
struct {
int32_t bool_n;
}; };
// Binary ---- };
struct {
int32_t binary_n;
};
};
} SCompressor;
// Timestamp ===================================================== // Timestamp =====================================================
static int32_t tCompSetCopyMode(SCompressor *pCmprsor) { static int32_t tCompSetCopyMode(SCompressor *pCmprsor) {
int32_t code = 0; int32_t code = 0;
if (pCmprsor->ts_n) { if (pCmprsor->nVal) {
if (pCmprsor->autoAlloc) { if (pCmprsor->autoAlloc) {
code = tRealloc(&pCmprsor->aBuf[1], sizeof(int64_t) * pCmprsor->ts_n); code = tRealloc(&pCmprsor->aBuf[1], sizeof(int64_t) * pCmprsor->nVal);
if (code) return code; if (code) return code;
} }
pCmprsor->nBuf[1] = 0; pCmprsor->nBuf[1] = 0;
@ -1110,7 +1127,7 @@ static int32_t tCompTimestamp(SCompressor *pCmprsor, int64_t ts) {
ASSERT(pCmprsor->type == TSDB_DATA_TYPE_TIMESTAMP); ASSERT(pCmprsor->type == TSDB_DATA_TYPE_TIMESTAMP);
if (pCmprsor->aBuf[0][0] == 1) { if (pCmprsor->aBuf[0][0] == 1) {
if (pCmprsor->ts_n == 0) { if (pCmprsor->nVal == 0) {
pCmprsor->ts_prev_val = ts; pCmprsor->ts_prev_val = ts;
pCmprsor->ts_prev_delta = -ts; pCmprsor->ts_prev_delta = -ts;
} }
@ -1133,7 +1150,7 @@ static int32_t tCompTimestamp(SCompressor *pCmprsor, int64_t ts) {
pCmprsor->ts_prev_val = ts; pCmprsor->ts_prev_val = ts;
pCmprsor->ts_prev_delta = delta; pCmprsor->ts_prev_delta = delta;
if ((pCmprsor->ts_n & 0x1) == 0) { if ((pCmprsor->nVal & 0x1) == 0) {
if (pCmprsor->autoAlloc) { if (pCmprsor->autoAlloc) {
code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 17); code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 17);
if (code) return code; if (code) return code;
@ -1166,7 +1183,7 @@ static int32_t tCompTimestamp(SCompressor *pCmprsor, int64_t ts) {
memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], &ts, sizeof(ts)); memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], &ts, sizeof(ts));
pCmprsor->nBuf[0] += sizeof(ts); pCmprsor->nBuf[0] += sizeof(ts);
} }
pCmprsor->ts_n++; pCmprsor->nVal++;
return code; return code;
} }
@ -1180,10 +1197,43 @@ static const char bit_to_selector[] = {0, 2, 3, 4, 5, 6, 7, 8, 9, 10
15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15,
15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15}; 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15};
static int32_t tCompI64(SCompressor *pCmprsor, int64_t val) { static int32_t tCompInt(SCompressor *pCmprsor, const void *pData, int32_t nData) {
int32_t code = 0; int32_t code = 0;
if (pCmprsor->i_copy == 1) goto _copy_cmpr; ASSERT(nData == DATA_TYPE_INFO[pCmprsor->type].bytes);
if (pCmprsor->aBuf[0][0] == 0) {
int64_t val;
switch (pCmprsor->type) {
case TSDB_DATA_TYPE_TINYINT:
val = *(int8_t *)pData;
break;
case TSDB_DATA_TYPE_SMALLINT:
val = *(int16_t *)pData;
break;
case TSDB_DATA_TYPE_INT:
val = *(int32_t *)pData;
break;
case TSDB_DATA_TYPE_BIGINT:
val = *(int64_t *)pData;
break;
case TSDB_DATA_TYPE_UTINYINT:
val = *(uint8_t *)pData;
break;
case TSDB_DATA_TYPE_USMALLINT:
val = *(uint16_t *)pData;
break;
case TSDB_DATA_TYPE_UINT:
val = *(uint32_t *)pData;
break;
// case TSDB_DATA_TYPE_UBIGINT:
// val = *(int64_t *)pData;
// break;
default:
ASSERT(0);
break;
}
if (!I64_SAFE_ADD(val, pCmprsor->i_prev)) { if (!I64_SAFE_ADD(val, pCmprsor->i_prev)) {
// TODO // TODO
@ -1211,6 +1261,7 @@ static int32_t tCompI64(SCompressor *pCmprsor, int64_t val) {
pCmprsor->i_selector = bit_to_selector[nBit]; pCmprsor->i_selector = bit_to_selector[nBit];
} }
pCmprsor->i_nele++; pCmprsor->i_nele++;
pCmprsor->i_prev = val;
} else { } else {
while (pCmprsor->i_nele < selector_to_elems[pCmprsor->i_selector]) { while (pCmprsor->i_nele < selector_to_elems[pCmprsor->i_selector]) {
pCmprsor->i_selector++; pCmprsor->i_selector++;
@ -1228,28 +1279,32 @@ static int32_t tCompI64(SCompressor *pCmprsor, int64_t val) {
} }
// reset and continue // reset and continue
pCmprsor->i_nele = 0;
pCmprsor->i_selector = 0;
} }
} else {
return code; _copy_cmpr:
code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + nData);
_copy_cmpr:
code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 0 /*tDataTypes[pCmprsor->type].bytes (todo)*/);
if (code) return code; if (code) return code;
// memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], NULL /* todo */, 0 /*tDataTypes[pCmprsor->type].bytes (todo)*/); memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], pData, nData);
// pCmprsor->nBuf[0] += tDataTypes[pCmprsor->type].bytes; pCmprsor->nBuf[0] += nData;
}
pCmprsor->nVal++;
return code; return code;
} }
// Float ===================================================== // Float =====================================================
static int32_t tCompFloat(SCompressor *pCmprsor, float f) { static int32_t tCompFloat(SCompressor *pCmprsor, const void *pData, int32_t nData) {
int32_t code = 0; int32_t code = 0;
ASSERT(nData == sizeof(float));
union { union {
float f; float f;
uint32_t u; uint32_t u;
} val = {.f = f}; } val = {.f = *(float *)pData};
uint32_t diff = val.u ^ pCmprsor->f_prev; uint32_t diff = val.u ^ pCmprsor->f_prev;
pCmprsor->f_prev = val.u; pCmprsor->f_prev = val.u;
@ -1272,7 +1327,7 @@ static int32_t tCompFloat(SCompressor *pCmprsor, float f) {
} }
if (nBytes == 0) nBytes++; if (nBytes == 0) nBytes++;
if ((pCmprsor->f_n & 0x1) == 0) { if ((pCmprsor->nVal & 0x1) == 0) {
if (pCmprsor->autoAlloc) { if (pCmprsor->autoAlloc) {
code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 9); code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 9);
if (code) return code; if (code) return code;
@ -1298,19 +1353,21 @@ static int32_t tCompFloat(SCompressor *pCmprsor, float f) {
pCmprsor->nBuf[0]++; pCmprsor->nBuf[0]++;
diff >>= BITS_PER_BYTE; diff >>= BITS_PER_BYTE;
} }
pCmprsor->f_n++; pCmprsor->nVal++;
return code; return code;
} }
// Double ===================================================== // Double =====================================================
static int32_t tCompDouble(SCompressor *pCmprsor, double d) { static int32_t tCompDouble(SCompressor *pCmprsor, const void *pData, int32_t nData) {
int32_t code = 0; int32_t code = 0;
ASSERT(nData == sizeof(double));
union { union {
double d; double d;
uint64_t u; uint64_t u;
} val = {.d = d}; } val = {.d = *(double *)pData};
uint64_t diff = val.u ^ pCmprsor->d_prev; uint64_t diff = val.u ^ pCmprsor->d_prev;
pCmprsor->d_prev = val.u; pCmprsor->d_prev = val.u;
@ -1333,7 +1390,7 @@ static int32_t tCompDouble(SCompressor *pCmprsor, double d) {
} }
if (nBytes == 0) nBytes++; if (nBytes == 0) nBytes++;
if ((pCmprsor->d_n & 0x1) == 0) { if ((pCmprsor->nVal & 0x1) == 0) {
if (pCmprsor->autoAlloc) { if (pCmprsor->autoAlloc) {
code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 17); code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 17);
if (code) return code; if (code) return code;
@ -1359,13 +1416,13 @@ static int32_t tCompDouble(SCompressor *pCmprsor, double d) {
pCmprsor->nBuf[0]++; pCmprsor->nBuf[0]++;
diff >>= BITS_PER_BYTE; diff >>= BITS_PER_BYTE;
} }
pCmprsor->d_n++; pCmprsor->nVal++;
return code; return code;
} }
// Binary ===================================================== // Binary =====================================================
static int32_t tCompBinary(SCompressor *pCmprsor, const uint8_t *pData, int32_t nData) { static int32_t tCompBinary(SCompressor *pCmprsor, const void *pData, int32_t nData) {
int32_t code = 0; int32_t code = 0;
if (nData) { if (nData) {
@ -1377,7 +1434,7 @@ static int32_t tCompBinary(SCompressor *pCmprsor, const uint8_t *pData, int32_t
memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], pData, nData); memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], pData, nData);
pCmprsor->nBuf[0] += nData; pCmprsor->nBuf[0] += nData;
} }
pCmprsor->binary_n++; pCmprsor->nVal++;
return code; return code;
} }
@ -1385,10 +1442,12 @@ static int32_t tCompBinary(SCompressor *pCmprsor, const uint8_t *pData, int32_t
// Bool ===================================================== // Bool =====================================================
static const uint8_t BOOL_CMPR_TABLE[] = {0b01, 0b0100, 0b010000, 0b01000000}; static const uint8_t BOOL_CMPR_TABLE[] = {0b01, 0b0100, 0b010000, 0b01000000};
static int32_t tCompBool(SCompressor *pCmprsor, bool vBool) { static int32_t tCompBool(SCompressor *pCmprsor, const void *pData, int32_t nData) {
int32_t code = 0; int32_t code = 0;
int32_t mod4 = pCmprsor->bool_n & 3; bool vBool = *(int8_t *)pData;
int32_t mod4 = pCmprsor->nVal & 3;
if (mod4 == 0) { if (mod4 == 0) {
pCmprsor->nBuf[0]++; pCmprsor->nBuf[0]++;
@ -1402,7 +1461,7 @@ static int32_t tCompBool(SCompressor *pCmprsor, bool vBool) {
if (vBool) { if (vBool) {
pCmprsor->aBuf[0][pCmprsor->nBuf[0] - 1] |= BOOL_CMPR_TABLE[mod4]; pCmprsor->aBuf[0][pCmprsor->nBuf[0] - 1] |= BOOL_CMPR_TABLE[mod4];
} }
pCmprsor->bool_n++; pCmprsor->nVal++;
return code; return code;
} }
@ -1449,10 +1508,10 @@ int32_t tCompressorReset(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg, int
pCmprsor->type = type; pCmprsor->type = type;
pCmprsor->cmprAlg = cmprAlg; pCmprsor->cmprAlg = cmprAlg;
pCmprsor->autoAlloc = autoAlloc; pCmprsor->autoAlloc = autoAlloc;
pCmprsor->nVal = 0;
switch (type) { switch (type) {
case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_TIMESTAMP:
pCmprsor->ts_n = 0;
pCmprsor->ts_prev_val = 0; pCmprsor->ts_prev_val = 0;
pCmprsor->ts_prev_delta = 0; pCmprsor->ts_prev_delta = 0;
pCmprsor->ts_flag_p = NULL; pCmprsor->ts_flag_p = NULL;
@ -1460,27 +1519,37 @@ int32_t tCompressorReset(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg, int
pCmprsor->nBuf[0] = 1; pCmprsor->nBuf[0] = 1;
break; break;
case TSDB_DATA_TYPE_BOOL: case TSDB_DATA_TYPE_BOOL:
pCmprsor->bool_n = 0;
pCmprsor->nBuf[0] = 0; pCmprsor->nBuf[0] = 0;
break; break;
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
pCmprsor->binary_n = 0;
pCmprsor->nBuf[0] = 0; pCmprsor->nBuf[0] = 0;
break; break;
case TSDB_DATA_TYPE_FLOAT: case TSDB_DATA_TYPE_FLOAT:
pCmprsor->f_n = 0;
pCmprsor->f_prev = 0; pCmprsor->f_prev = 0;
pCmprsor->f_flag_p = NULL; pCmprsor->f_flag_p = NULL;
pCmprsor->aBuf[0][0] = 0; // 0 means compressed, 1 otherwise (for backward compatibility) pCmprsor->aBuf[0][0] = 0; // 0 means compressed, 1 otherwise (for backward compatibility)
pCmprsor->nBuf[0] = 1; pCmprsor->nBuf[0] = 1;
break; break;
case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_DOUBLE:
pCmprsor->d_n = 0;
pCmprsor->d_prev = 0; pCmprsor->d_prev = 0;
pCmprsor->d_flag_p = NULL; pCmprsor->d_flag_p = NULL;
pCmprsor->aBuf[0][0] = 0; // 0 means compressed, 1 otherwise (for backward compatibility) pCmprsor->aBuf[0][0] = 0; // 0 means compressed, 1 otherwise (for backward compatibility)
pCmprsor->nBuf[0] = 1; pCmprsor->nBuf[0] = 1;
break; break;
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_USMALLINT:
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_UBIGINT:
pCmprsor->i_prev = 0;
pCmprsor->i_selector = 0;
pCmprsor->i_nele = 0;
pCmprsor->aBuf[0][0] = 0; // 0 means compressed, 1 otherwise (for backward compatibility)
pCmprsor->nBuf[0] = 1;
break;
default: default:
break; break;
} }