From e33c255ad189fda15d0c9ae451ba432546894ea5 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 19 Sep 2022 15:57:58 +0800 Subject: [PATCH] more code --- source/util/src/tcompression.c | 243 +++++++++++++++++++++------------ 1 file changed, 156 insertions(+), 87 deletions(-) diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index 4bec2f16ed..748ab8c975 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -1000,59 +1000,76 @@ int32_t tsDecompressDoubleLossyImp(const char *input, int32_t compressedSize, co * STREAM COMPRESSION *************************************************************************/ #define I64_SAFE_ADD(a, b) (((a) >= 0 && (b) <= INT64_MAX - (b)) || ((a) < 0 && (b) >= INT64_MIN - (a))) +typedef struct SCompressor SCompressor; -typedef struct { +static struct { + int8_t type; + int32_t bytes; + int8_t isVarLen; + int32_t (*cmprFn)(SCompressor *, const void *, int32_t nData); +} DATA_TYPE_INFO[] = { + {TSDB_DATA_TYPE_NULL, 0, 0}, // TSDB_DATA_TYPE_NULL + {TSDB_DATA_TYPE_BOOL, 1, 0}, // TSDB_DATA_TYPE_BOOL + {TSDB_DATA_TYPE_TINYINT, 1, 0}, // TSDB_DATA_TYPE_TINYINT + {TSDB_DATA_TYPE_SMALLINT, 2, 0}, // TSDB_DATA_TYPE_SMALLINT + {TSDB_DATA_TYPE_INT, 4, 0}, // TSDB_DATA_TYPE_INT + {TSDB_DATA_TYPE_BIGINT, 8, 0}, // TSDB_DATA_TYPE_BIGINT + {TSDB_DATA_TYPE_FLOAT, 4, 0}, // TSDB_DATA_TYPE_FLOAT + {TSDB_DATA_TYPE_DOUBLE, 8, 0}, // TSDB_DATA_TYPE_DOUBLE + {TSDB_DATA_TYPE_VARCHAR, 1, 1}, // TSDB_DATA_TYPE_VARCHAR + {TSDB_DATA_TYPE_TIMESTAMP, 8, 0}, // pTSDB_DATA_TYPE_TIMESTAMP + {TSDB_DATA_TYPE_NCHAR, 1, 1}, // TSDB_DATA_TYPE_NCHAR + {TSDB_DATA_TYPE_UTINYINT, 1, 0}, // TSDB_DATA_TYPE_UTINYINT + {TSDB_DATA_TYPE_USMALLINT, 2, 0}, // TSDB_DATA_TYPE_USMALLINT + {TSDB_DATA_TYPE_UINT, 4, 0}, // TSDB_DATA_TYPE_UINT + {TSDB_DATA_TYPE_UBIGINT, 8, 0}, // TSDB_DATA_TYPE_UBIGINT + {TSDB_DATA_TYPE_JSON, 1, 1}, // TSDB_DATA_TYPE_JSON + {TSDB_DATA_TYPE_VARBINARY, 1, 1}, // TSDB_DATA_TYPE_VARBINARY + {TSDB_DATA_TYPE_DECIMAL, 1, 1}, // TSDB_DATA_TYPE_DECIMAL + {TSDB_DATA_TYPE_BLOB, 1, 1}, // TSDB_DATA_TYPE_BLOB + {TSDB_DATA_TYPE_MEDIUMBLOB, 1, 1}, // TSDB_DATA_TYPE_MEDIUMBLOB +}; + +struct SCompressor { int8_t type; int8_t cmprAlg; int8_t autoAlloc; + int32_t nVal; uint8_t *aBuf[2]; int64_t nBuf[2]; union { // Timestamp ---- struct { - int32_t ts_n; int64_t ts_prev_val; int64_t ts_prev_delta; uint8_t *ts_flag_p; }; // Integer ---- struct { - int8_t i_copy; - int32_t i_n; int64_t i_prev; int32_t i_selector; int32_t i_nele; }; // Float ---- struct { - int32_t f_n; uint32_t f_prev; uint8_t *f_flag_p; }; // Double ---- struct { - int32_t d_n; uint64_t d_prev; uint8_t *d_flag_p; }; - // Bool ---- - struct { - int32_t bool_n; - }; - // Binary ---- - struct { - int32_t binary_n; - }; }; -} SCompressor; +}; // Timestamp ===================================================== static int32_t tCompSetCopyMode(SCompressor *pCmprsor) { int32_t code = 0; - if (pCmprsor->ts_n) { + if (pCmprsor->nVal) { if (pCmprsor->autoAlloc) { - code = tRealloc(&pCmprsor->aBuf[1], sizeof(int64_t) * pCmprsor->ts_n); + code = tRealloc(&pCmprsor->aBuf[1], sizeof(int64_t) * pCmprsor->nVal); if (code) return code; } pCmprsor->nBuf[1] = 0; @@ -1110,7 +1127,7 @@ static int32_t tCompTimestamp(SCompressor *pCmprsor, int64_t ts) { ASSERT(pCmprsor->type == TSDB_DATA_TYPE_TIMESTAMP); if (pCmprsor->aBuf[0][0] == 1) { - if (pCmprsor->ts_n == 0) { + if (pCmprsor->nVal == 0) { pCmprsor->ts_prev_val = ts; pCmprsor->ts_prev_delta = -ts; } @@ -1133,7 +1150,7 @@ static int32_t tCompTimestamp(SCompressor *pCmprsor, int64_t ts) { pCmprsor->ts_prev_val = ts; pCmprsor->ts_prev_delta = delta; - if ((pCmprsor->ts_n & 0x1) == 0) { + if ((pCmprsor->nVal & 0x1) == 0) { if (pCmprsor->autoAlloc) { code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 17); if (code) return code; @@ -1166,7 +1183,7 @@ static int32_t tCompTimestamp(SCompressor *pCmprsor, int64_t ts) { memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], &ts, sizeof(ts)); pCmprsor->nBuf[0] += sizeof(ts); } - pCmprsor->ts_n++; + pCmprsor->nVal++; return code; } @@ -1180,76 +1197,114 @@ static const char bit_to_selector[] = {0, 2, 3, 4, 5, 6, 7, 8, 9, 10 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15}; -static int32_t tCompI64(SCompressor *pCmprsor, int64_t val) { +static int32_t tCompInt(SCompressor *pCmprsor, const void *pData, int32_t nData) { int32_t code = 0; - if (pCmprsor->i_copy == 1) goto _copy_cmpr; + ASSERT(nData == DATA_TYPE_INFO[pCmprsor->type].bytes); - if (!I64_SAFE_ADD(val, pCmprsor->i_prev)) { - // TODO - goto _copy_cmpr; - } + if (pCmprsor->aBuf[0][0] == 0) { + int64_t val; - int64_t diff = val - pCmprsor->i_prev; - uint64_t vZigzag = ZIGZAG_ENCODE(int64_t, diff); - - if (vZigzag >= SIMPLE8B_MAX) { - // TODO - goto _copy_cmpr; - } - - int64_t nBit; - if (vZigzag) { - nBit = 64 - BUILDIN_CLZL(vZigzag); - } else { - nBit = 0; - } - - if (pCmprsor->i_nele + 1 <= selector_to_elems[pCmprsor->i_selector] && - pCmprsor->i_nele + 1 <= selector_to_elems[bit_to_selector[nBit]]) { - if (pCmprsor->i_selector < bit_to_selector[nBit]) { - pCmprsor->i_selector = bit_to_selector[nBit]; + switch (pCmprsor->type) { + case TSDB_DATA_TYPE_TINYINT: + val = *(int8_t *)pData; + break; + case TSDB_DATA_TYPE_SMALLINT: + val = *(int16_t *)pData; + break; + case TSDB_DATA_TYPE_INT: + val = *(int32_t *)pData; + break; + case TSDB_DATA_TYPE_BIGINT: + val = *(int64_t *)pData; + break; + case TSDB_DATA_TYPE_UTINYINT: + val = *(uint8_t *)pData; + break; + case TSDB_DATA_TYPE_USMALLINT: + val = *(uint16_t *)pData; + break; + case TSDB_DATA_TYPE_UINT: + val = *(uint32_t *)pData; + break; + // case TSDB_DATA_TYPE_UBIGINT: + // val = *(int64_t *)pData; + // break; + default: + ASSERT(0); + break; } - pCmprsor->i_nele++; - } else { - while (pCmprsor->i_nele < selector_to_elems[pCmprsor->i_selector]) { - pCmprsor->i_selector++; - } - pCmprsor->i_nele = selector_to_elems[pCmprsor->i_selector]; - code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + sizeof(uint64_t)); + if (!I64_SAFE_ADD(val, pCmprsor->i_prev)) { + // TODO + goto _copy_cmpr; + } + + int64_t diff = val - pCmprsor->i_prev; + uint64_t vZigzag = ZIGZAG_ENCODE(int64_t, diff); + + if (vZigzag >= SIMPLE8B_MAX) { + // TODO + goto _copy_cmpr; + } + + int64_t nBit; + if (vZigzag) { + nBit = 64 - BUILDIN_CLZL(vZigzag); + } else { + nBit = 0; + } + + if (pCmprsor->i_nele + 1 <= selector_to_elems[pCmprsor->i_selector] && + pCmprsor->i_nele + 1 <= selector_to_elems[bit_to_selector[nBit]]) { + if (pCmprsor->i_selector < bit_to_selector[nBit]) { + pCmprsor->i_selector = bit_to_selector[nBit]; + } + pCmprsor->i_nele++; + pCmprsor->i_prev = val; + } else { + while (pCmprsor->i_nele < selector_to_elems[pCmprsor->i_selector]) { + pCmprsor->i_selector++; + } + pCmprsor->i_nele = selector_to_elems[pCmprsor->i_selector]; + + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + sizeof(uint64_t)); + if (code) return code; + + uint64_t *bp = (uint64_t *)(pCmprsor->aBuf[0] + pCmprsor->nBuf[0]); + pCmprsor->nBuf[0] += sizeof(uint64_t); + bp[0] = pCmprsor->i_selector; + for (int32_t iVal = 0; iVal < pCmprsor->i_nele; iVal++) { + /* code */ + } + + // reset and continue + pCmprsor->i_nele = 0; + pCmprsor->i_selector = 0; + } + } else { + _copy_cmpr: + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + nData); if (code) return code; - uint64_t *bp = (uint64_t *)(pCmprsor->aBuf[0] + pCmprsor->nBuf[0]); - pCmprsor->nBuf[0] += sizeof(uint64_t); - bp[0] = pCmprsor->i_selector; - for (int32_t iVal = 0; iVal < pCmprsor->i_nele; iVal++) { - /* code */ - } - - // reset and continue + memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], pData, nData); + pCmprsor->nBuf[0] += nData; } - - return code; - -_copy_cmpr: - code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 0 /*tDataTypes[pCmprsor->type].bytes (todo)*/); - if (code) return code; - - // memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], NULL /* todo */, 0 /*tDataTypes[pCmprsor->type].bytes (todo)*/); - // pCmprsor->nBuf[0] += tDataTypes[pCmprsor->type].bytes; + pCmprsor->nVal++; return code; } // Float ===================================================== -static int32_t tCompFloat(SCompressor *pCmprsor, float f) { +static int32_t tCompFloat(SCompressor *pCmprsor, const void *pData, int32_t nData) { int32_t code = 0; + ASSERT(nData == sizeof(float)); + union { float f; uint32_t u; - } val = {.f = f}; + } val = {.f = *(float *)pData}; uint32_t diff = val.u ^ pCmprsor->f_prev; pCmprsor->f_prev = val.u; @@ -1272,7 +1327,7 @@ static int32_t tCompFloat(SCompressor *pCmprsor, float f) { } if (nBytes == 0) nBytes++; - if ((pCmprsor->f_n & 0x1) == 0) { + if ((pCmprsor->nVal & 0x1) == 0) { if (pCmprsor->autoAlloc) { code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 9); if (code) return code; @@ -1298,19 +1353,21 @@ static int32_t tCompFloat(SCompressor *pCmprsor, float f) { pCmprsor->nBuf[0]++; diff >>= BITS_PER_BYTE; } - pCmprsor->f_n++; + pCmprsor->nVal++; return code; } // Double ===================================================== -static int32_t tCompDouble(SCompressor *pCmprsor, double d) { +static int32_t tCompDouble(SCompressor *pCmprsor, const void *pData, int32_t nData) { int32_t code = 0; + ASSERT(nData == sizeof(double)); + union { double d; uint64_t u; - } val = {.d = d}; + } val = {.d = *(double *)pData}; uint64_t diff = val.u ^ pCmprsor->d_prev; pCmprsor->d_prev = val.u; @@ -1333,7 +1390,7 @@ static int32_t tCompDouble(SCompressor *pCmprsor, double d) { } if (nBytes == 0) nBytes++; - if ((pCmprsor->d_n & 0x1) == 0) { + if ((pCmprsor->nVal & 0x1) == 0) { if (pCmprsor->autoAlloc) { code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 17); if (code) return code; @@ -1359,13 +1416,13 @@ static int32_t tCompDouble(SCompressor *pCmprsor, double d) { pCmprsor->nBuf[0]++; diff >>= BITS_PER_BYTE; } - pCmprsor->d_n++; + pCmprsor->nVal++; return code; } // Binary ===================================================== -static int32_t tCompBinary(SCompressor *pCmprsor, const uint8_t *pData, int32_t nData) { +static int32_t tCompBinary(SCompressor *pCmprsor, const void *pData, int32_t nData) { int32_t code = 0; if (nData) { @@ -1377,7 +1434,7 @@ static int32_t tCompBinary(SCompressor *pCmprsor, const uint8_t *pData, int32_t memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], pData, nData); pCmprsor->nBuf[0] += nData; } - pCmprsor->binary_n++; + pCmprsor->nVal++; return code; } @@ -1385,10 +1442,12 @@ static int32_t tCompBinary(SCompressor *pCmprsor, const uint8_t *pData, int32_t // Bool ===================================================== static const uint8_t BOOL_CMPR_TABLE[] = {0b01, 0b0100, 0b010000, 0b01000000}; -static int32_t tCompBool(SCompressor *pCmprsor, bool vBool) { +static int32_t tCompBool(SCompressor *pCmprsor, const void *pData, int32_t nData) { int32_t code = 0; - int32_t mod4 = pCmprsor->bool_n & 3; + bool vBool = *(int8_t *)pData; + + int32_t mod4 = pCmprsor->nVal & 3; if (mod4 == 0) { pCmprsor->nBuf[0]++; @@ -1402,7 +1461,7 @@ static int32_t tCompBool(SCompressor *pCmprsor, bool vBool) { if (vBool) { pCmprsor->aBuf[0][pCmprsor->nBuf[0] - 1] |= BOOL_CMPR_TABLE[mod4]; } - pCmprsor->bool_n++; + pCmprsor->nVal++; return code; } @@ -1449,10 +1508,10 @@ int32_t tCompressorReset(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg, int pCmprsor->type = type; pCmprsor->cmprAlg = cmprAlg; pCmprsor->autoAlloc = autoAlloc; + pCmprsor->nVal = 0; switch (type) { case TSDB_DATA_TYPE_TIMESTAMP: - pCmprsor->ts_n = 0; pCmprsor->ts_prev_val = 0; pCmprsor->ts_prev_delta = 0; pCmprsor->ts_flag_p = NULL; @@ -1460,27 +1519,37 @@ int32_t tCompressorReset(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg, int pCmprsor->nBuf[0] = 1; break; case TSDB_DATA_TYPE_BOOL: - pCmprsor->bool_n = 0; pCmprsor->nBuf[0] = 0; break; case TSDB_DATA_TYPE_BINARY: - pCmprsor->binary_n = 0; pCmprsor->nBuf[0] = 0; break; case TSDB_DATA_TYPE_FLOAT: - pCmprsor->f_n = 0; pCmprsor->f_prev = 0; pCmprsor->f_flag_p = NULL; pCmprsor->aBuf[0][0] = 0; // 0 means compressed, 1 otherwise (for backward compatibility) pCmprsor->nBuf[0] = 1; break; case TSDB_DATA_TYPE_DOUBLE: - pCmprsor->d_n = 0; pCmprsor->d_prev = 0; pCmprsor->d_flag_p = NULL; pCmprsor->aBuf[0][0] = 0; // 0 means compressed, 1 otherwise (for backward compatibility) pCmprsor->nBuf[0] = 1; break; + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_UTINYINT: + case TSDB_DATA_TYPE_USMALLINT: + case TSDB_DATA_TYPE_UINT: + case TSDB_DATA_TYPE_UBIGINT: + pCmprsor->i_prev = 0; + pCmprsor->i_selector = 0; + pCmprsor->i_nele = 0; + pCmprsor->aBuf[0][0] = 0; // 0 means compressed, 1 otherwise (for backward compatibility) + pCmprsor->nBuf[0] = 1; + break; default: break; }