compression

This commit is contained in:
Shengliang Guan 2022-02-28 10:47:44 +08:00
parent 3136a23b8f
commit 3e49b78ab7
4 changed files with 272 additions and 248 deletions

View File

@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef _TD_UTIL_ARRAY_H #ifndef _TD_UTIL_ARRAY_H_
#define _TD_UTIL_ARRAY_H #define _TD_UTIL_ARRAY_H_
#include "talgo.h" #include "talgo.h"
@ -268,4 +268,4 @@ void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* par
} }
#endif #endif
#endif /*_TD_UTIL_ARRAY_H*/ #endif /*_TD_UTIL_ARRAY_H_*/

View File

@ -12,8 +12,9 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef _TD_UTIL_CODING_H
#define _TD_UTIL_CODING_H #ifndef _TD_UTIL_CODING_H_
#define _TD_UTIL_CODING_H_
#include "os.h" #include "os.h"
@ -403,4 +404,4 @@ static FORCE_INLINE void *taosDecodeBinaryTo(const void *buf, void *value, int32
} }
#endif #endif
#endif /*_TD_UTIL_CODING_H*/ #endif /*_TD_UTIL_CODING_H_*/

View File

@ -13,16 +13,17 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef _TD_UTIL_COMPRESSION_H #ifndef _TD_UTIL_COMPRESSION_H_
#define _TD_UTIL_COMPRESSION_H #define _TD_UTIL_COMPRESSION_H_
#include "os.h"
#include "taos.h"
#include "tutil.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "taos.h"
#include "tutil.h"
#define COMP_OVERFLOW_BYTES 2 #define COMP_OVERFLOW_BYTES 2
#define BITS_PER_BYTE 8 #define BITS_PER_BYTE 8
// Masks // Masks
@ -50,39 +51,42 @@ extern "C" {
#define HEAD_MODE(x) x % 2 #define HEAD_MODE(x) x % 2
#define HEAD_ALGO(x) x / 2 #define HEAD_ALGO(x) x / 2
extern int tsCompressINTImp(const char *const input, const int nelements, char *const output, const char type); extern int32_t tsCompressINTImp(const char *const input, const int32_t nelements, char *const output, const char type);
extern int tsDecompressINTImp(const char *const input, const int nelements, char *const output, const char type); extern int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, char *const output,
extern int tsCompressBoolImp(const char *const input, const int nelements, char *const output); const char type);
extern int tsDecompressBoolImp(const char *const input, const int nelements, char *const output); extern int32_t tsCompressBoolImp(const char *const input, const int32_t nelements, char *const output);
extern int tsCompressStringImp(const char *const input, int inputSize, char *const output, int outputSize); extern int32_t tsDecompressBoolImp(const char *const input, const int32_t nelements, char *const output);
extern int tsDecompressStringImp(const char *const input, int compressedSize, char *const output, int outputSize); extern int32_t tsCompressStringImp(const char *const input, int32_t inputSize, char *const output, int32_t outputSize);
extern int tsCompressTimestampImp(const char *const input, const int nelements, char *const output); extern int32_t tsDecompressStringImp(const char *const input, int32_t compressedSize, char *const output,
extern int tsDecompressTimestampImp(const char *const input, const int nelements, char *const output); int32_t outputSize);
extern int tsCompressDoubleImp(const char *const input, const int nelements, char *const output); extern int32_t tsCompressTimestampImp(const char *const input, const int32_t nelements, char *const output);
extern int tsDecompressDoubleImp(const char *const input, const int nelements, char *const output); extern int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelements, char *const output);
extern int tsCompressFloatImp(const char *const input, const int nelements, char *const output); extern int32_t tsCompressDoubleImp(const char *const input, const int32_t nelements, char *const output);
extern int tsDecompressFloatImp(const char *const input, const int nelements, char *const output); extern int32_t tsDecompressDoubleImp(const char *const input, const int32_t nelements, char *const output);
extern int32_t tsCompressFloatImp(const char *const input, const int32_t nelements, char *const output);
extern int32_t tsDecompressFloatImp(const char *const input, const int32_t nelements, char *const output);
// lossy // lossy
extern int tsCompressFloatLossyImp(const char * input, const int nelements, char *const output); extern int32_t tsCompressFloatLossyImp(const char *input, const int32_t nelements, char *const output);
extern int tsDecompressFloatLossyImp(const char * input, int compressedSize, const int nelements, char *const output); extern int32_t tsDecompressFloatLossyImp(const char *input, int32_t compressedSize, const int32_t nelements,
extern int tsCompressDoubleLossyImp(const char * input, const int nelements, char *const output); char *const output);
extern int tsDecompressDoubleLossyImp(const char * input, int compressedSize, const int nelements, char *const output); extern int32_t tsCompressDoubleLossyImp(const char *input, const int32_t nelements, char *const output);
extern int32_t tsDecompressDoubleLossyImp(const char *input, int32_t compressedSize, const int32_t nelements,
char *const output);
#ifdef TD_TSZ #ifdef TD_TSZ
extern bool lossyFloat; extern bool lossyFloat;
extern bool lossyDouble; extern bool lossyDouble;
// init call int32_t tsCompressInit();
int tsCompressInit();
// exit call
void tsCompressExit(); void tsCompressExit();
#endif #endif
static FORCE_INLINE int tsCompressTinyint(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, char algorithm, static FORCE_INLINE int32_t tsCompressTinyint(const char *const input, int32_t inputSize, const int32_t nelements,
char *const buffer, int bufferSize) { char *const output, int32_t outputSize, char algorithm,
char *const buffer, int32_t bufferSize) {
if (algorithm == ONE_STAGE_COMP) { if (algorithm == ONE_STAGE_COMP) {
return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_TINYINT); return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_TINYINT);
} else if (algorithm == TWO_STAGE_COMP) { } else if (algorithm == TWO_STAGE_COMP) {
int len = tsCompressINTImp(input, nelements, buffer, TSDB_DATA_TYPE_TINYINT); int32_t len = tsCompressINTImp(input, nelements, buffer, TSDB_DATA_TYPE_TINYINT);
return tsCompressStringImp(buffer, len, output, outputSize); return tsCompressStringImp(buffer, len, output, outputSize);
} else { } else {
assert(0); assert(0);
@ -90,8 +94,9 @@ static FORCE_INLINE int tsCompressTinyint(const char *const input, int inputSize
} }
} }
static FORCE_INLINE int tsDecompressTinyint(const char *const input, int compressedSize, const int nelements, char *const output, static FORCE_INLINE int32_t tsDecompressTinyint(const char *const input, int32_t compressedSize,
int outputSize, char algorithm, char *const buffer, int bufferSize) { const int32_t nelements, char *const output, int32_t outputSize,
char algorithm, char *const buffer, int32_t bufferSize) {
if (algorithm == ONE_STAGE_COMP) { if (algorithm == ONE_STAGE_COMP) {
return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_TINYINT); return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_TINYINT);
} else if (algorithm == TWO_STAGE_COMP) { } else if (algorithm == TWO_STAGE_COMP) {
@ -103,12 +108,13 @@ static FORCE_INLINE int tsDecompressTinyint(const char *const input, int compres
} }
} }
static FORCE_INLINE int tsCompressSmallint(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, char algorithm, static FORCE_INLINE int32_t tsCompressSmallint(const char *const input, int32_t inputSize, const int32_t nelements,
char *const buffer, int bufferSize) { char *const output, int32_t outputSize, char algorithm,
char *const buffer, int32_t bufferSize) {
if (algorithm == ONE_STAGE_COMP) { if (algorithm == ONE_STAGE_COMP) {
return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_SMALLINT); return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_SMALLINT);
} else if (algorithm == TWO_STAGE_COMP) { } else if (algorithm == TWO_STAGE_COMP) {
int len = tsCompressINTImp(input, nelements, buffer, TSDB_DATA_TYPE_SMALLINT); int32_t len = tsCompressINTImp(input, nelements, buffer, TSDB_DATA_TYPE_SMALLINT);
return tsCompressStringImp(buffer, len, output, outputSize); return tsCompressStringImp(buffer, len, output, outputSize);
} else { } else {
assert(0); assert(0);
@ -116,8 +122,9 @@ static FORCE_INLINE int tsCompressSmallint(const char *const input, int inputSiz
} }
} }
static FORCE_INLINE int tsDecompressSmallint(const char *const input, int compressedSize, const int nelements, char *const output, static FORCE_INLINE int32_t tsDecompressSmallint(const char *const input, int32_t compressedSize,
int outputSize, char algorithm, char *const buffer, int bufferSize) { const int32_t nelements, char *const output, int32_t outputSize,
char algorithm, char *const buffer, int32_t bufferSize) {
if (algorithm == ONE_STAGE_COMP) { if (algorithm == ONE_STAGE_COMP) {
return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_SMALLINT); return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_SMALLINT);
} else if (algorithm == TWO_STAGE_COMP) { } else if (algorithm == TWO_STAGE_COMP) {
@ -129,12 +136,13 @@ static FORCE_INLINE int tsDecompressSmallint(const char *const input, int compre
} }
} }
static FORCE_INLINE int tsCompressInt(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, char algorithm, static FORCE_INLINE int32_t tsCompressInt(const char *const input, int32_t inputSize, const int32_t nelements,
char *const buffer, int bufferSize) { char *const output, int32_t outputSize, char algorithm, char *const buffer,
int32_t bufferSize) {
if (algorithm == ONE_STAGE_COMP) { if (algorithm == ONE_STAGE_COMP) {
return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_INT); return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_INT);
} else if (algorithm == TWO_STAGE_COMP) { } else if (algorithm == TWO_STAGE_COMP) {
int len = tsCompressINTImp(input, nelements, buffer, TSDB_DATA_TYPE_INT); int32_t len = tsCompressINTImp(input, nelements, buffer, TSDB_DATA_TYPE_INT);
return tsCompressStringImp(buffer, len, output, outputSize); return tsCompressStringImp(buffer, len, output, outputSize);
} else { } else {
assert(0); assert(0);
@ -142,8 +150,9 @@ static FORCE_INLINE int tsCompressInt(const char *const input, int inputSize, co
} }
} }
static FORCE_INLINE int tsDecompressInt(const char *const input, int compressedSize, const int nelements, char *const output, static FORCE_INLINE int32_t tsDecompressInt(const char *const input, int32_t compressedSize, const int32_t nelements,
int outputSize, char algorithm, char *const buffer, int bufferSize) { char *const output, int32_t outputSize, char algorithm, char *const buffer,
int32_t bufferSize) {
if (algorithm == ONE_STAGE_COMP) { if (algorithm == ONE_STAGE_COMP) {
return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_INT); return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_INT);
} else if (algorithm == TWO_STAGE_COMP) { } else if (algorithm == TWO_STAGE_COMP) {
@ -155,12 +164,13 @@ static FORCE_INLINE int tsDecompressInt(const char *const input, int compressedS
} }
} }
static FORCE_INLINE int tsCompressBigint(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, static FORCE_INLINE int32_t tsCompressBigint(const char *const input, int32_t inputSize, const int32_t nelements,
char algorithm, char *const buffer, int bufferSize) { char *const output, int32_t outputSize, char algorithm, char *const buffer,
int32_t bufferSize) {
if (algorithm == ONE_STAGE_COMP) { if (algorithm == ONE_STAGE_COMP) {
return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_BIGINT); return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_BIGINT);
} else if (algorithm == TWO_STAGE_COMP) { } else if (algorithm == TWO_STAGE_COMP) {
int len = tsCompressINTImp(input, nelements, buffer, TSDB_DATA_TYPE_BIGINT); int32_t len = tsCompressINTImp(input, nelements, buffer, TSDB_DATA_TYPE_BIGINT);
return tsCompressStringImp(buffer, len, output, outputSize); return tsCompressStringImp(buffer, len, output, outputSize);
} else { } else {
assert(0); assert(0);
@ -168,8 +178,9 @@ static FORCE_INLINE int tsCompressBigint(const char *const input, int inputSize,
} }
} }
static FORCE_INLINE int tsDecompressBigint(const char *const input, int compressedSize, const int nelements, char *const output, static FORCE_INLINE int32_t tsDecompressBigint(const char *const input, int32_t compressedSize, const int32_t nelements,
int outputSize, char algorithm, char *const buffer, int bufferSize) { char *const output, int32_t outputSize, char algorithm,
char *const buffer, int32_t bufferSize) {
if (algorithm == ONE_STAGE_COMP) { if (algorithm == ONE_STAGE_COMP) {
return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_BIGINT); return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_BIGINT);
} else if (algorithm == TWO_STAGE_COMP) { } else if (algorithm == TWO_STAGE_COMP) {
@ -181,12 +192,13 @@ static FORCE_INLINE int tsDecompressBigint(const char *const input, int compress
} }
} }
static FORCE_INLINE int tsCompressBool(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, static FORCE_INLINE int32_t tsCompressBool(const char *const input, int32_t inputSize, const int32_t nelements,
char algorithm, char *const buffer, int bufferSize) { char *const output, int32_t outputSize, char algorithm, char *const buffer,
int32_t bufferSize) {
if (algorithm == ONE_STAGE_COMP) { if (algorithm == ONE_STAGE_COMP) {
return tsCompressBoolImp(input, nelements, output); return tsCompressBoolImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) { } else if (algorithm == TWO_STAGE_COMP) {
int len = tsCompressBoolImp(input, nelements, buffer); int32_t len = tsCompressBoolImp(input, nelements, buffer);
return tsCompressStringImp(buffer, len, output, outputSize); return tsCompressStringImp(buffer, len, output, outputSize);
} else { } else {
assert(0); assert(0);
@ -194,8 +206,9 @@ static FORCE_INLINE int tsCompressBool(const char *const input, int inputSize, c
} }
} }
static FORCE_INLINE int tsDecompressBool(const char *const input, int compressedSize, const int nelements, char *const output, static FORCE_INLINE int32_t tsDecompressBool(const char *const input, int32_t compressedSize, const int32_t nelements,
int outputSize, char algorithm, char *const buffer, int bufferSize) { char *const output, int32_t outputSize, char algorithm, char *const buffer,
int32_t bufferSize) {
if (algorithm == ONE_STAGE_COMP) { if (algorithm == ONE_STAGE_COMP) {
return tsDecompressBoolImp(input, nelements, output); return tsDecompressBoolImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) { } else if (algorithm == TWO_STAGE_COMP) {
@ -207,18 +220,21 @@ static FORCE_INLINE int tsDecompressBool(const char *const input, int compressed
} }
} }
static FORCE_INLINE int tsCompressString(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, static FORCE_INLINE int32_t tsCompressString(const char *const input, int32_t inputSize, const int32_t nelements,
char algorithm, char *const buffer, int bufferSize) { char *const output, int32_t outputSize, char algorithm, char *const buffer,
int32_t bufferSize) {
return tsCompressStringImp(input, inputSize, output, outputSize); return tsCompressStringImp(input, inputSize, output, outputSize);
} }
static FORCE_INLINE int tsDecompressString(const char *const input, int compressedSize, const int nelements, char *const output, static FORCE_INLINE int32_t tsDecompressString(const char *const input, int32_t compressedSize, const int32_t nelements,
int outputSize, char algorithm, char *const buffer, int bufferSize) { char *const output, int32_t outputSize, char algorithm,
char *const buffer, int32_t bufferSize) {
return tsDecompressStringImp(input, compressedSize, output, outputSize); return tsDecompressStringImp(input, compressedSize, output, outputSize);
} }
static FORCE_INLINE int tsCompressFloat(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, static FORCE_INLINE int32_t tsCompressFloat(const char *const input, int32_t inputSize, const int32_t nelements,
char algorithm, char *const buffer, int bufferSize) { char *const output, int32_t outputSize, char algorithm, char *const buffer,
int32_t bufferSize) {
#ifdef TD_TSZ #ifdef TD_TSZ
// lossy mode // lossy mode
if (lossyFloat) { if (lossyFloat) {
@ -229,7 +245,7 @@ static FORCE_INLINE int tsCompressFloat(const char *const input, int inputSize,
if (algorithm == ONE_STAGE_COMP) { if (algorithm == ONE_STAGE_COMP) {
return tsCompressFloatImp(input, nelements, output); return tsCompressFloatImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) { } else if (algorithm == TWO_STAGE_COMP) {
int len = tsCompressFloatImp(input, nelements, buffer); int32_t len = tsCompressFloatImp(input, nelements, buffer);
return tsCompressStringImp(buffer, len, output, outputSize); return tsCompressStringImp(buffer, len, output, outputSize);
} else { } else {
assert(0); assert(0);
@ -240,8 +256,9 @@ static FORCE_INLINE int tsCompressFloat(const char *const input, int inputSize,
#endif #endif
} }
static FORCE_INLINE int tsDecompressFloat(const char *const input, int compressedSize, const int nelements, char *const output, static FORCE_INLINE int32_t tsDecompressFloat(const char *const input, int32_t compressedSize, const int32_t nelements,
int outputSize, char algorithm, char *const buffer, int bufferSize) { char *const output, int32_t outputSize, char algorithm,
char *const buffer, int32_t bufferSize) {
#ifdef TD_TSZ #ifdef TD_TSZ
if (HEAD_ALGO(input[0]) == ALGO_SZ_LOSSY) { if (HEAD_ALGO(input[0]) == ALGO_SZ_LOSSY) {
// decompress lossy // decompress lossy
@ -263,9 +280,9 @@ static FORCE_INLINE int tsDecompressFloat(const char *const input, int compresse
#endif #endif
} }
static FORCE_INLINE int32_t tsCompressDouble(const char *const input, int32_t inputSize, const int32_t nelements,
static FORCE_INLINE int tsCompressDouble(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, char *const output, int32_t outputSize, char algorithm, char *const buffer,
char algorithm, char *const buffer, int bufferSize) { int32_t bufferSize) {
#ifdef TD_TSZ #ifdef TD_TSZ
if (lossyDouble) { if (lossyDouble) {
// lossy mode // lossy mode
@ -276,7 +293,7 @@ static FORCE_INLINE int tsCompressDouble(const char *const input, int inputSize,
if (algorithm == ONE_STAGE_COMP) { if (algorithm == ONE_STAGE_COMP) {
return tsCompressDoubleImp(input, nelements, output); return tsCompressDoubleImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) { } else if (algorithm == TWO_STAGE_COMP) {
int len = tsCompressDoubleImp(input, nelements, buffer); int32_t len = tsCompressDoubleImp(input, nelements, buffer);
return tsCompressStringImp(buffer, len, output, outputSize); return tsCompressStringImp(buffer, len, output, outputSize);
} else { } else {
assert(0); assert(0);
@ -287,8 +304,9 @@ static FORCE_INLINE int tsCompressDouble(const char *const input, int inputSize,
#endif #endif
} }
static FORCE_INLINE int tsDecompressDouble(const char *const input, int compressedSize, const int nelements, char *const output, static FORCE_INLINE int32_t tsDecompressDouble(const char *const input, int32_t compressedSize, const int32_t nelements,
int outputSize, char algorithm, char *const buffer, int bufferSize) { char *const output, int32_t outputSize, char algorithm,
char *const buffer, int32_t bufferSize) {
#ifdef TD_TSZ #ifdef TD_TSZ
if (HEAD_ALGO(input[0]) == ALGO_SZ_LOSSY) { if (HEAD_ALGO(input[0]) == ALGO_SZ_LOSSY) {
// decompress lossy // decompress lossy
@ -314,34 +332,39 @@ static FORCE_INLINE int tsDecompressDouble(const char *const input, int compress
// //
// lossy float double // lossy float double
// //
static FORCE_INLINE int tsCompressFloatLossy(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, static FORCE_INLINE int32_t tsCompressFloatLossy(const char *const input, int32_t inputSize, const int32_t nelements,
char algorithm, char *const buffer, int bufferSize) { char *const output, int32_t outputSize, char algorithm,
char *const buffer, int32_t bufferSize) {
return tsCompressFloatLossyImp(input, nelements, output); return tsCompressFloatLossyImp(input, nelements, output);
} }
static FORCE_INLINE int tsDecompressFloatLossy(const char *const input, int compressedSize, const int nelements, char *const output, static FORCE_INLINE int32_t tsDecompressFloatLossy(const char *const input, int32_t compressedSize,
int outputSize, char algorithm, char *const buffer, int bufferSize){ const int32_t nelements, char *const output, int32_t outputSize,
char algorithm, char *const buffer, int32_t bufferSize) {
return tsDecompressFloatLossyImp(input, compressedSize, nelements, output); return tsDecompressFloatLossyImp(input, compressedSize, nelements, output);
} }
static FORCE_INLINE int tsCompressDoubleLossy(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, static FORCE_INLINE int32_t tsCompressDoubleLossy(const char *const input, int32_t inputSize, const int32_t nelements,
char algorithm, char *const buffer, int bufferSize){ char *const output, int32_t outputSize, char algorithm,
char *const buffer, int32_t bufferSize) {
return tsCompressDoubleLossyImp(input, nelements, output); return tsCompressDoubleLossyImp(input, nelements, output);
} }
static FORCE_INLINE int tsDecompressDoubleLossy(const char *const input, int compressedSize, const int nelements, char *const output, static FORCE_INLINE int32_t tsDecompressDoubleLossy(const char *const input, int32_t compressedSize,
int outputSize, char algorithm, char *const buffer, int bufferSize){ const int32_t nelements, char *const output, int32_t outputSize,
char algorithm, char *const buffer, int32_t bufferSize) {
return tsDecompressDoubleLossyImp(input, compressedSize, nelements, output); return tsDecompressDoubleLossyImp(input, compressedSize, nelements, output);
} }
#endif #endif
static FORCE_INLINE int tsCompressTimestamp(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, static FORCE_INLINE int32_t tsCompressTimestamp(const char *const input, int32_t inputSize, const int32_t nelements,
char algorithm, char *const buffer, int bufferSize) { char *const output, int32_t outputSize, char algorithm,
char *const buffer, int32_t bufferSize) {
if (algorithm == ONE_STAGE_COMP) { if (algorithm == ONE_STAGE_COMP) {
return tsCompressTimestampImp(input, nelements, output); return tsCompressTimestampImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) { } else if (algorithm == TWO_STAGE_COMP) {
int len = tsCompressTimestampImp(input, nelements, buffer); int32_t len = tsCompressTimestampImp(input, nelements, buffer);
return tsCompressStringImp(buffer, len, output, outputSize); return tsCompressStringImp(buffer, len, output, outputSize);
} else { } else {
assert(0); assert(0);
@ -349,8 +372,9 @@ static FORCE_INLINE int tsCompressTimestamp(const char *const input, int inputSi
} }
} }
static FORCE_INLINE int tsDecompressTimestamp(const char *const input, int compressedSize, const int nelements, char *const output, static FORCE_INLINE int32_t tsDecompressTimestamp(const char *const input, int32_t compressedSize,
int outputSize, char algorithm, char *const buffer, int bufferSize) { const int32_t nelements, char *const output, int32_t outputSize,
char algorithm, char *const buffer, int32_t bufferSize) {
if (algorithm == ONE_STAGE_COMP) { if (algorithm == ONE_STAGE_COMP) {
return tsDecompressTimestampImp(input, nelements, output); return tsDecompressTimestampImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) { } else if (algorithm == TWO_STAGE_COMP) {
@ -366,4 +390,4 @@ static FORCE_INLINE int tsDecompressTimestamp(const char *const input, int compr
} }
#endif #endif
#endif /*_TD_UTIL_COMPRESSION_H*/ #endif /*_TD_UTIL_COMPRESSION_H_*/

View File

@ -16,7 +16,7 @@
/* README.md TAOS compression /* README.md TAOS compression
* *
* INTEGER Compression Algorithm: * INTEGER Compression Algorithm:
* To compress integers (including char, short, int, int64_t), the difference * To compress integers (including char, short, int32_t, int64_t), the difference
* between two integers is calculated at first. Then the difference is * between two integers is calculated at first. Then the difference is
* transformed to positive by zig-zag encoding method * transformed to positive by zig-zag encoding method
* (https://gist.github.com/mfuerstenau/ba870a29e16536fdbaba). Then the value is * (https://gist.github.com/mfuerstenau/ba870a29e16536fdbaba). Then the value is
@ -47,15 +47,16 @@
* *
*/ */
#include "os.h" #define _DEFAULT_SOURCE
#include "tcompression.h"
#include "lz4.h" #include "lz4.h"
#include "tlog.h"
#ifdef TD_TSZ #ifdef TD_TSZ
#include "td_sz.h" #include "td_sz.h"
#endif #endif
#include "tcompression.h"
#include "tlog.h"
static const int TEST_NUMBER = 1; static const int32_t TEST_NUMBER = 1;
#define is_bigendian() ((*(char *)&TEST_NUMBER) == 0) #define is_bigendian() ((*(char *)&TEST_NUMBER) == 0)
#define SIMPLE8B_MAX_INT64 ((uint64_t)2305843009213693951L) #define SIMPLE8B_MAX_INT64 ((uint64_t)2305843009213693951L)
@ -68,7 +69,7 @@ bool lossyFloat = false;
bool lossyDouble = false; bool lossyDouble = false;
// init call // init call
int tsCompressInit(){ int32_t tsCompressInit() {
// config // config
if (lossyColumns[0] == 0) { if (lossyColumns[0] == 0) {
lossyFloat = false; lossyFloat = false;
@ -79,37 +80,32 @@ int tsCompressInit(){
lossyFloat = strstr(lossyColumns, "float") != NULL; lossyFloat = strstr(lossyColumns, "float") != NULL;
lossyDouble = strstr(lossyColumns, "double") != NULL; lossyDouble = strstr(lossyColumns, "double") != NULL;
if(lossyFloat == false && lossyDouble == false) if (lossyFloat == false && lossyDouble == false) return 0;
return 0;
tdszInit(fPrecision, dPrecision, maxRange, curRange, Compressor); tdszInit(fPrecision, dPrecision, maxRange, curRange, Compressor);
if(lossyFloat) if (lossyFloat) uInfo("lossy compression float is opened. ");
uInfo("lossy compression float is opened. "); if (lossyDouble) uInfo("lossy compression double is opened. ");
if(lossyDouble)
uInfo("lossy compression double is opened. ");
return 1; return 1;
} }
// exit call // exit call
void tsCompressExit(){ void tsCompressExit() { tdszExit(); }
tdszExit();
}
#endif #endif
/* /*
* Compress Integer (Simple8B). * Compress Integer (Simple8B).
*/ */
int tsCompressINTImp(const char *const input, const int nelements, char *const output, const char type) { int32_t tsCompressINTImp(const char *const input, const int32_t nelements, char *const output, const char type) {
// Selector value: 0 1 2 3 4 5 6 7 8 9 10 11 // Selector value: 0 1 2 3 4 5 6 7 8 9 10 11
// 12 13 14 15 // 12 13 14 15
char bit_per_integer[] = {0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 20, 30, 60}; char bit_per_integer[] = {0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 20, 30, 60};
int selector_to_elems[] = {240, 120, 60, 30, 20, 15, 12, 10, 8, 7, 6, 5, 4, 3, 2, 1}; int32_t selector_to_elems[] = {240, 120, 60, 30, 20, 15, 12, 10, 8, 7, 6, 5, 4, 3, 2, 1};
char bit_to_selector[] = {0, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 11, 11, 12, 12, 12, 13, 13, 13, 13, 13, char bit_to_selector[] = {0, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 11, 11, 12, 12, 12, 13, 13, 13, 13, 13,
14, 14, 14, 14, 14, 14, 14, 14, 14, 14, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 14, 14, 14, 14, 14, 14, 14, 14, 14, 14, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15,
15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15}; 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15};
// get the byte limit. // get the byte limit.
int word_length = 0; int32_t word_length = 0;
switch (type) { switch (type) {
case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_BIGINT:
word_length = LONG_BYTES; word_length = LONG_BYTES;
@ -128,17 +124,17 @@ int tsCompressINTImp(const char *const input, const int nelements, char *const o
return -1; return -1;
} }
int byte_limit = nelements * word_length + 1; int32_t byte_limit = nelements * word_length + 1;
int opos = 1; int32_t opos = 1;
int64_t prev_value = 0; int64_t prev_value = 0;
for (int i = 0; i < nelements;) { for (int32_t i = 0; i < nelements;) {
char selector = 0; char selector = 0;
char bit = 0; char bit = 0;
int elems = 0; int32_t elems = 0;
int64_t prev_value_tmp = prev_value; int64_t prev_value_tmp = prev_value;
for (int j = i; j < nelements; j++) { for (int32_t j = i; j < nelements; j++) {
// Read data from the input stream and convert it to INT64 type. // Read data from the input stream and convert it to INT64 type.
int64_t curr_value = 0; int64_t curr_value = 0;
switch (type) { switch (type) {
@ -172,16 +168,17 @@ int tsCompressINTImp(const char *const input, const int nelements, char *const o
tmp_bit = (LONG_BYTES * BITS_PER_BYTE) - BUILDIN_CLZL(zigzag_value); tmp_bit = (LONG_BYTES * BITS_PER_BYTE) - BUILDIN_CLZL(zigzag_value);
} }
if (elems + 1 <= selector_to_elems[(int)selector] && elems + 1 <= selector_to_elems[(int)(bit_to_selector[(int)tmp_bit])]) { if (elems + 1 <= selector_to_elems[(int32_t)selector] &&
elems + 1 <= selector_to_elems[(int32_t)(bit_to_selector[(int32_t)tmp_bit])]) {
// If can hold another one. // If can hold another one.
selector = selector > bit_to_selector[(int)tmp_bit] ? selector : bit_to_selector[(int)tmp_bit]; selector = selector > bit_to_selector[(int32_t)tmp_bit] ? selector : bit_to_selector[(int32_t)tmp_bit];
elems++; elems++;
bit = bit_per_integer[(int)selector]; bit = bit_per_integer[(int32_t)selector];
} else { } else {
// if cannot hold another one. // if cannot hold another one.
while (elems < selector_to_elems[(int)selector]) selector++; while (elems < selector_to_elems[(int32_t)selector]) selector++;
elems = selector_to_elems[(int)selector]; elems = selector_to_elems[(int32_t)selector];
bit = bit_per_integer[(int)selector]; bit = bit_per_integer[(int32_t)selector];
break; break;
} }
prev_value_tmp = curr_value; prev_value_tmp = curr_value;
@ -189,7 +186,7 @@ int tsCompressINTImp(const char *const input, const int nelements, char *const o
uint64_t buffer = 0; uint64_t buffer = 0;
buffer |= (uint64_t)selector; buffer |= (uint64_t)selector;
for (int k = 0; k < elems; k++) { for (int32_t k = 0; k < elems; k++) {
int64_t curr_value = 0; /* get current values */ int64_t curr_value = 0; /* get current values */
switch (type) { switch (type) {
case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_TINYINT:
@ -229,8 +226,8 @@ int tsCompressINTImp(const char *const input, const int nelements, char *const o
return opos; return opos;
} }
int tsDecompressINTImp(const char *const input, const int nelements, char *const output, const char type) { int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, char *const output, const char type) {
int word_length = 0; int32_t word_length = 0;
switch (type) { switch (type) {
case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_BIGINT:
word_length = LONG_BYTES; word_length = LONG_BYTES;
@ -258,11 +255,11 @@ int tsDecompressINTImp(const char *const input, const int nelements, char *const
// Selector value: 0 1 2 3 4 5 6 7 8 9 10 11 // Selector value: 0 1 2 3 4 5 6 7 8 9 10 11
// 12 13 14 15 // 12 13 14 15
char bit_per_integer[] = {0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 20, 30, 60}; char bit_per_integer[] = {0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 20, 30, 60};
int selector_to_elems[] = {240, 120, 60, 30, 20, 15, 12, 10, 8, 7, 6, 5, 4, 3, 2, 1}; int32_t selector_to_elems[] = {240, 120, 60, 30, 20, 15, 12, 10, 8, 7, 6, 5, 4, 3, 2, 1};
const char *ip = input + 1; const char *ip = input + 1;
int count = 0; int32_t count = 0;
int _pos = 0; int32_t _pos = 0;
int64_t prev_value = 0; int64_t prev_value = 0;
while (1) { while (1) {
@ -272,10 +269,10 @@ int tsDecompressINTImp(const char *const input, const int nelements, char *const
memcpy(&w, ip, LONG_BYTES); memcpy(&w, ip, LONG_BYTES);
char selector = (char)(w & INT64MASK(4)); // selector = 4 char selector = (char)(w & INT64MASK(4)); // selector = 4
char bit = bit_per_integer[(int)selector]; // bit = 3 char bit = bit_per_integer[(int32_t)selector]; // bit = 3
int elems = selector_to_elems[(int)selector]; int32_t elems = selector_to_elems[(int32_t)selector];
for (int i = 0; i < elems; i++) { for (int32_t i = 0; i < elems; i++) {
uint64_t zigzag_value; uint64_t zigzag_value;
if (selector == 0 || selector == 1) { if (selector == 0 || selector == 1) {
@ -320,11 +317,11 @@ int tsDecompressINTImp(const char *const input, const int nelements, char *const
/* ----------------------------------------------Bool Compression /* ----------------------------------------------Bool Compression
* ---------------------------------------------- */ * ---------------------------------------------- */
// TODO: You can also implement it using RLE method. // TODO: You can also implement it using RLE method.
int tsCompressBoolImp(const char *const input, const int nelements, char *const output) { int32_t tsCompressBoolImp(const char *const input, const int32_t nelements, char *const output) {
int pos = -1; int32_t pos = -1;
int ele_per_byte = BITS_PER_BYTE / 2; int32_t ele_per_byte = BITS_PER_BYTE / 2;
for (int i = 0; i < nelements; i++) { for (int32_t i = 0; i < nelements; i++) {
if (i % ele_per_byte == 0) { if (i % ele_per_byte == 0) {
pos++; pos++;
output[pos] = 0; output[pos] = 0;
@ -351,11 +348,11 @@ int tsCompressBoolImp(const char *const input, const int nelements, char *const
return pos + 1; return pos + 1;
} }
int tsDecompressBoolImp(const char *const input, const int nelements, char *const output) { int32_t tsDecompressBoolImp(const char *const input, const int32_t nelements, char *const output) {
int ipos = -1, opos = 0; int32_t ipos = -1, opos = 0;
int ele_per_byte = BITS_PER_BYTE / 2; int32_t ele_per_byte = BITS_PER_BYTE / 2;
for (int i = 0; i < nelements; i++) { for (int32_t i = 0; i < nelements; i++) {
if (i % ele_per_byte == 0) { if (i % ele_per_byte == 0) {
ipos++; ipos++;
} }
@ -374,10 +371,10 @@ int tsDecompressBoolImp(const char *const input, const int nelements, char *cons
} }
/* Run Length Encoding(RLE) Method */ /* Run Length Encoding(RLE) Method */
int tsCompressBoolRLEImp(const char *const input, const int nelements, char *const output) { int32_t tsCompressBoolRLEImp(const char *const input, const int32_t nelements, char *const output) {
int _pos = 0; int32_t _pos = 0;
for (int i = 0; i < nelements;) { for (int32_t i = 0; i < nelements;) {
unsigned char counter = 1; unsigned char counter = 1;
char num = input[i]; char num = input[i];
@ -407,8 +404,8 @@ int tsCompressBoolRLEImp(const char *const input, const int nelements, char *con
return _pos; return _pos;
} }
int tsDecompressBoolRLEImp(const char *const input, const int nelements, char *const output) { int32_t tsDecompressBoolRLEImp(const char *const input, const int32_t nelements, char *const output) {
int ipos = 0, opos = 0; int32_t ipos = 0, opos = 0;
while (1) { while (1) {
char encode = input[ipos++]; char encode = input[ipos++];
unsigned counter = (encode >> 1) & INT8MASK(7); unsigned counter = (encode >> 1) & INT8MASK(7);
@ -427,9 +424,9 @@ int tsDecompressBoolRLEImp(const char *const input, const int nelements, char *c
// Note: the size of the output must be larger than input_size + 1 and // Note: the size of the output must be larger than input_size + 1 and
// LZ4_compressBound(size) + 1; // LZ4_compressBound(size) + 1;
// >= max(input_size, LZ4_compressBound(input_size)) + 1; // >= max(input_size, LZ4_compressBound(input_size)) + 1;
int tsCompressStringImp(const char *const input, int inputSize, char *const output, int outputSize) { int32_t tsCompressStringImp(const char *const input, int32_t inputSize, char *const output, int32_t outputSize) {
// Try to compress using LZ4 algorithm. // Try to compress using LZ4 algorithm.
const int compressed_data_size = LZ4_compress_default(input, output + 1, inputSize, outputSize-1); const int32_t compressed_data_size = LZ4_compress_default(input, output + 1, inputSize, outputSize - 1);
// If cannot compress or after compression, data becomes larger. // If cannot compress or after compression, data becomes larger.
if (compressed_data_size <= 0 || compressed_data_size > inputSize) { if (compressed_data_size <= 0 || compressed_data_size > inputSize) {
@ -443,12 +440,12 @@ int tsCompressStringImp(const char *const input, int inputSize, char *const outp
return compressed_data_size + 1; return compressed_data_size + 1;
} }
int tsDecompressStringImp(const char *const input, int compressedSize, char *const output, int outputSize) { int32_t tsDecompressStringImp(const char *const input, int32_t compressedSize, char *const output, int32_t outputSize) {
// compressedSize is the size of data after compression. // compressedSize is the size of data after compression.
if (input[0] == 1) { if (input[0] == 1) {
/* It is compressed by LZ4 algorithm */ /* It is compressed by LZ4 algorithm */
const int decompressed_size = LZ4_decompress_safe(input + 1, output, compressedSize - 1, outputSize); const int32_t decompressed_size = LZ4_decompress_safe(input + 1, output, compressedSize - 1, outputSize);
if (decompressed_size < 0) { if (decompressed_size < 0) {
uError("Failed to decompress string with LZ4 algorithm, decompressed size:%d", decompressed_size); uError("Failed to decompress string with LZ4 algorithm, decompressed size:%d", decompressed_size);
return -1; return -1;
@ -468,8 +465,8 @@ int tsDecompressStringImp(const char *const input, int compressedSize, char *con
/* --------------------------------------------Timestamp Compression /* --------------------------------------------Timestamp Compression
* ---------------------------------------------- */ * ---------------------------------------------- */
// TODO: Take care here, we assumes little endian encoding. // TODO: Take care here, we assumes little endian encoding.
int tsCompressTimestampImp(const char *const input, const int nelements, char *const output) { int32_t tsCompressTimestampImp(const char *const input, const int32_t nelements, char *const output) {
int _pos = 1; int32_t _pos = 1;
assert(nelements >= 0); assert(nelements >= 0);
if (nelements == 0) return 0; if (nelements == 0) return 0;
@ -485,7 +482,7 @@ int tsCompressTimestampImp(const char *const input, const int nelements, char *c
uint8_t flags = 0, flag1 = 0, flag2 = 0; uint8_t flags = 0, flag1 = 0, flag2 = 0;
uint64_t dd1 = 0, dd2 = 0; uint64_t dd1 = 0, dd2 = 0;
for (int i = 0; i < nelements; i++) { for (int32_t i = 0; i < nelements; i++) {
int64_t curr_value = istream[i]; int64_t curr_value = istream[i];
if (!safeInt64Add(curr_value, -prev_value)) goto _exit_over; if (!safeInt64Add(curr_value, -prev_value)) goto _exit_over;
int64_t curr_delta = curr_value - prev_value; int64_t curr_delta = curr_value - prev_value;
@ -564,7 +561,7 @@ _exit_over:
return nelements * LONG_BYTES + 1; return nelements * LONG_BYTES + 1;
} }
int tsDecompressTimestampImp(const char *const input, const int nelements, char *const output) { int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelements, char *const output) {
assert(nelements >= 0); assert(nelements >= 0);
if (nelements == 0) return 0; if (nelements == 0) return 0;
@ -574,7 +571,7 @@ int tsDecompressTimestampImp(const char *const input, const int nelements, char
} else if (input[0] == 1) { // Decompress } else if (input[0] == 1) { // Decompress
int64_t *ostream = (int64_t *)output; int64_t *ostream = (int64_t *)output;
int ipos = 1, opos = 0; int32_t ipos = 1, opos = 0;
int8_t nbytes = 0; int8_t nbytes = 0;
int64_t prev_value = 0; int64_t prev_value = 0;
int64_t prev_delta = 0; int64_t prev_delta = 0;
@ -635,9 +632,9 @@ int tsDecompressTimestampImp(const char *const input, const int nelements, char
} }
/* --------------------------------------------Double Compression /* --------------------------------------------Double Compression
* ---------------------------------------------- */ * ---------------------------------------------- */
void encodeDoubleValue(uint64_t diff, uint8_t flag, char *const output, int *const pos) { void encodeDoubleValue(uint64_t diff, uint8_t flag, char *const output, int32_t *const pos) {
uint8_t nbytes = (flag & INT8MASK(3)) + 1; uint8_t nbytes = (flag & INT8MASK(3)) + 1;
int nshift = (LONG_BYTES * BITS_PER_BYTE - nbytes * BITS_PER_BYTE) * (flag >> 3); int32_t nshift = (LONG_BYTES * BITS_PER_BYTE - nbytes * BITS_PER_BYTE) * (flag >> 3);
diff >>= nshift; diff >>= nshift;
while (nbytes) { while (nbytes) {
@ -647,9 +644,9 @@ void encodeDoubleValue(uint64_t diff, uint8_t flag, char *const output, int *con
} }
} }
int tsCompressDoubleImp(const char *const input, const int nelements, char *const output) { int32_t tsCompressDoubleImp(const char *const input, const int32_t nelements, char *const output) {
int byte_limit = nelements * DOUBLE_BYTES + 1; int32_t byte_limit = nelements * DOUBLE_BYTES + 1;
int opos = 1; int32_t opos = 1;
uint64_t prev_value = 0; uint64_t prev_value = 0;
uint64_t prev_diff = 0; uint64_t prev_diff = 0;
@ -658,7 +655,7 @@ int tsCompressDoubleImp(const char *const input, const int nelements, char *cons
double *istream = (double *)input; double *istream = (double *)input;
// Main loop // Main loop
for (int i = 0; i < nelements; i++) { for (int32_t i = 0; i < nelements; i++) {
union { union {
double real; double real;
uint64_t bits; uint64_t bits;
@ -670,8 +667,8 @@ int tsCompressDoubleImp(const char *const input, const int nelements, char *cons
uint64_t predicted = prev_value; uint64_t predicted = prev_value;
uint64_t diff = curr.bits ^ predicted; uint64_t diff = curr.bits ^ predicted;
int leading_zeros = LONG_BYTES * BITS_PER_BYTE; int32_t leading_zeros = LONG_BYTES * BITS_PER_BYTE;
int trailing_zeros = leading_zeros; int32_t trailing_zeros = leading_zeros;
if (diff) { if (diff) {
trailing_zeros = BUILDIN_CTZL(diff); trailing_zeros = BUILDIN_CTZL(diff);
@ -696,8 +693,8 @@ int tsCompressDoubleImp(const char *const input, const int nelements, char *cons
prev_diff = diff; prev_diff = diff;
prev_flag = flag; prev_flag = flag;
} else { } else {
int nbyte1 = (prev_flag & INT8MASK(3)) + 1; int32_t nbyte1 = (prev_flag & INT8MASK(3)) + 1;
int nbyte2 = (flag & INT8MASK(3)) + 1; int32_t nbyte2 = (flag & INT8MASK(3)) + 1;
if (opos + 1 + nbyte1 + nbyte2 <= byte_limit) { if (opos + 1 + nbyte1 + nbyte2 <= byte_limit) {
uint8_t flags = prev_flag | (flag << 4); uint8_t flags = prev_flag | (flag << 4);
output[opos++] = flags; output[opos++] = flags;
@ -713,8 +710,8 @@ int tsCompressDoubleImp(const char *const input, const int nelements, char *cons
} }
if (nelements % 2) { if (nelements % 2) {
int nbyte1 = (prev_flag & INT8MASK(3)) + 1; int32_t nbyte1 = (prev_flag & INT8MASK(3)) + 1;
int nbyte2 = 1; int32_t nbyte2 = 1;
if (opos + 1 + nbyte1 + nbyte2 <= byte_limit) { if (opos + 1 + nbyte1 + nbyte2 <= byte_limit) {
uint8_t flags = prev_flag; uint8_t flags = prev_flag;
output[opos++] = flags; output[opos++] = flags;
@ -731,19 +728,19 @@ int tsCompressDoubleImp(const char *const input, const int nelements, char *cons
return opos; return opos;
} }
uint64_t decodeDoubleValue(const char *const input, int *const ipos, uint8_t flag) { uint64_t decodeDoubleValue(const char *const input, int32_t *const ipos, uint8_t flag) {
uint64_t diff = 0ul; uint64_t diff = 0ul;
int nbytes = (flag & INT8MASK(3)) + 1; int32_t nbytes = (flag & INT8MASK(3)) + 1;
for (int i = 0; i < nbytes; i++) { for (int32_t i = 0; i < nbytes; i++) {
diff = diff | ((INT64MASK(8) & input[(*ipos)++]) << BITS_PER_BYTE * i); diff = diff | ((INT64MASK(8) & input[(*ipos)++]) << BITS_PER_BYTE * i);
} }
int shift_width = (LONG_BYTES * BITS_PER_BYTE - nbytes * BITS_PER_BYTE) * (flag >> 3); int32_t shift_width = (LONG_BYTES * BITS_PER_BYTE - nbytes * BITS_PER_BYTE) * (flag >> 3);
diff <<= shift_width; diff <<= shift_width;
return diff; return diff;
} }
int tsDecompressDoubleImp(const char *const input, const int nelements, char *const output) { int32_t tsDecompressDoubleImp(const char *const input, const int32_t nelements, char *const output) {
// output stream // output stream
double *ostream = (double *)output; double *ostream = (double *)output;
@ -753,11 +750,11 @@ int tsDecompressDoubleImp(const char *const input, const int nelements, char *co
} }
uint8_t flags = 0; uint8_t flags = 0;
int ipos = 1; int32_t ipos = 1;
int opos = 0; int32_t opos = 0;
uint64_t prev_value = 0; uint64_t prev_value = 0;
for (int i = 0; i < nelements; i++) { for (int32_t i = 0; i < nelements; i++) {
if (i % 2 == 0) { if (i % 2 == 0) {
flags = input[ipos++]; flags = input[ipos++];
} }
@ -783,9 +780,9 @@ int tsDecompressDoubleImp(const char *const input, const int nelements, char *co
/* --------------------------------------------Float Compression /* --------------------------------------------Float Compression
* ---------------------------------------------- */ * ---------------------------------------------- */
void encodeFloatValue(uint32_t diff, uint8_t flag, char *const output, int *const pos) { void encodeFloatValue(uint32_t diff, uint8_t flag, char *const output, int32_t *const pos) {
uint8_t nbytes = (flag & INT8MASK(3)) + 1; uint8_t nbytes = (flag & INT8MASK(3)) + 1;
int nshift = (FLOAT_BYTES * BITS_PER_BYTE - nbytes * BITS_PER_BYTE) * (flag >> 3); int32_t nshift = (FLOAT_BYTES * BITS_PER_BYTE - nbytes * BITS_PER_BYTE) * (flag >> 3);
diff >>= nshift; diff >>= nshift;
while (nbytes) { while (nbytes) {
@ -795,17 +792,17 @@ void encodeFloatValue(uint32_t diff, uint8_t flag, char *const output, int *cons
} }
} }
int tsCompressFloatImp(const char *const input, const int nelements, char *const output) { int32_t tsCompressFloatImp(const char *const input, const int32_t nelements, char *const output) {
float *istream = (float *)input; float *istream = (float *)input;
int byte_limit = nelements * FLOAT_BYTES + 1; int32_t byte_limit = nelements * FLOAT_BYTES + 1;
int opos = 1; int32_t opos = 1;
uint32_t prev_value = 0; uint32_t prev_value = 0;
uint32_t prev_diff = 0; uint32_t prev_diff = 0;
uint8_t prev_flag = 0; uint8_t prev_flag = 0;
// Main loop // Main loop
for (int i = 0; i < nelements; i++) { for (int32_t i = 0; i < nelements; i++) {
union { union {
float real; float real;
uint32_t bits; uint32_t bits;
@ -817,8 +814,8 @@ int tsCompressFloatImp(const char *const input, const int nelements, char *const
uint32_t predicted = prev_value; uint32_t predicted = prev_value;
uint32_t diff = curr.bits ^ predicted; uint32_t diff = curr.bits ^ predicted;
int leading_zeros = FLOAT_BYTES * BITS_PER_BYTE; int32_t leading_zeros = FLOAT_BYTES * BITS_PER_BYTE;
int trailing_zeros = leading_zeros; int32_t trailing_zeros = leading_zeros;
if (diff) { if (diff) {
trailing_zeros = BUILDIN_CTZ(diff); trailing_zeros = BUILDIN_CTZ(diff);
@ -843,8 +840,8 @@ int tsCompressFloatImp(const char *const input, const int nelements, char *const
prev_diff = diff; prev_diff = diff;
prev_flag = flag; prev_flag = flag;
} else { } else {
int nbyte1 = (prev_flag & INT8MASK(3)) + 1; int32_t nbyte1 = (prev_flag & INT8MASK(3)) + 1;
int nbyte2 = (flag & INT8MASK(3)) + 1; int32_t nbyte2 = (flag & INT8MASK(3)) + 1;
if (opos + 1 + nbyte1 + nbyte2 <= byte_limit) { if (opos + 1 + nbyte1 + nbyte2 <= byte_limit) {
uint8_t flags = prev_flag | (flag << 4); uint8_t flags = prev_flag | (flag << 4);
output[opos++] = flags; output[opos++] = flags;
@ -860,8 +857,8 @@ int tsCompressFloatImp(const char *const input, const int nelements, char *const
} }
if (nelements % 2) { if (nelements % 2) {
int nbyte1 = (prev_flag & INT8MASK(3)) + 1; int32_t nbyte1 = (prev_flag & INT8MASK(3)) + 1;
int nbyte2 = 1; int32_t nbyte2 = 1;
if (opos + 1 + nbyte1 + nbyte2 <= byte_limit) { if (opos + 1 + nbyte1 + nbyte2 <= byte_limit) {
uint8_t flags = prev_flag; uint8_t flags = prev_flag;
output[opos++] = flags; output[opos++] = flags;
@ -878,19 +875,19 @@ int tsCompressFloatImp(const char *const input, const int nelements, char *const
return opos; return opos;
} }
uint32_t decodeFloatValue(const char *const input, int *const ipos, uint8_t flag) { uint32_t decodeFloatValue(const char *const input, int32_t *const ipos, uint8_t flag) {
uint32_t diff = 0ul; uint32_t diff = 0ul;
int nbytes = (flag & INT8MASK(3)) + 1; int32_t nbytes = (flag & INT8MASK(3)) + 1;
for (int i = 0; i < nbytes; i++) { for (int32_t i = 0; i < nbytes; i++) {
diff = diff | ((INT32MASK(8) & input[(*ipos)++]) << BITS_PER_BYTE * i); diff = diff | ((INT32MASK(8) & input[(*ipos)++]) << BITS_PER_BYTE * i);
} }
int shift_width = (FLOAT_BYTES * BITS_PER_BYTE - nbytes * BITS_PER_BYTE) * (flag >> 3); int32_t shift_width = (FLOAT_BYTES * BITS_PER_BYTE - nbytes * BITS_PER_BYTE) * (flag >> 3);
diff <<= shift_width; diff <<= shift_width;
return diff; return diff;
} }
int tsDecompressFloatImp(const char *const input, const int nelements, char *const output) { int32_t tsDecompressFloatImp(const char *const input, const int32_t nelements, char *const output) {
float *ostream = (float *)output; float *ostream = (float *)output;
if (input[0] == 1) { if (input[0] == 1) {
@ -899,11 +896,11 @@ int tsDecompressFloatImp(const char *const input, const int nelements, char *con
} }
uint8_t flags = 0; uint8_t flags = 0;
int ipos = 1; int32_t ipos = 1;
int opos = 0; int32_t opos = 0;
uint32_t prev_value = 0; uint32_t prev_value = 0;
for (int i = 0; i < nelements; i++) { for (int32_t i = 0; i < nelements; i++) {
if (i % 2 == 0) { if (i % 2 == 0) {
flags = input[ipos++]; flags = input[ipos++];
} }
@ -931,9 +928,9 @@ int tsDecompressFloatImp(const char *const input, const int nelements, char *con
// //
// ---------- float double lossy ----------- // ---------- float double lossy -----------
// //
int tsCompressFloatLossyImp(const char * input, const int nelements, char *const output){ int32_t tsCompressFloatLossyImp(const char *input, const int32_t nelements, char *const output) {
// compress with sz // compress with sz
int compressedSize = tdszCompress(SZ_FLOAT, input, nelements, output + 1); int32_t compressedSize = tdszCompress(SZ_FLOAT, input, nelements, output + 1);
unsigned char algo = ALGO_SZ_LOSSY << 1; unsigned char algo = ALGO_SZ_LOSSY << 1;
if (compressedSize == 0 || compressedSize >= nelements * sizeof(float)) { if (compressedSize == 0 || compressedSize >= nelements * sizeof(float)) {
// compressed error or large than original // compressed error or large than original
@ -949,8 +946,9 @@ int tsCompressFloatLossyImp(const char * input, const int nelements, char *const
return compressedSize; return compressedSize;
} }
int tsDecompressFloatLossyImp(const char * input, int compressedSize, const int nelements, char *const output){ int32_t tsDecompressFloatLossyImp(const char *input, int32_t compressedSize, const int32_t nelements,
int decompressedSize = 0; char *const output) {
int32_t decompressedSize = 0;
if (HEAD_MODE(input[0]) == MODE_NOCOMPRESS) { if (HEAD_MODE(input[0]) == MODE_NOCOMPRESS) {
// orginal so memcpy directly // orginal so memcpy directly
decompressedSize = nelements * sizeof(float); decompressedSize = nelements * sizeof(float);
@ -963,9 +961,9 @@ int tsDecompressFloatLossyImp(const char * input, int compressedSize, const int
return tdszDecompress(SZ_FLOAT, input + 1, compressedSize - 1, nelements, output); return tdszDecompress(SZ_FLOAT, input + 1, compressedSize - 1, nelements, output);
} }
int tsCompressDoubleLossyImp(const char * input, const int nelements, char *const output){ int32_t tsCompressDoubleLossyImp(const char *input, const int32_t nelements, char *const output) {
// compress with sz // compress with sz
int compressedSize = tdszCompress(SZ_DOUBLE, input, nelements, output + 1); int32_t compressedSize = tdszCompress(SZ_DOUBLE, input, nelements, output + 1);
unsigned char algo = ALGO_SZ_LOSSY << 1; unsigned char algo = ALGO_SZ_LOSSY << 1;
if (compressedSize == 0 || compressedSize >= nelements * sizeof(double)) { if (compressedSize == 0 || compressedSize >= nelements * sizeof(double)) {
// compressed error or large than original // compressed error or large than original
@ -981,8 +979,9 @@ int tsCompressDoubleLossyImp(const char * input, const int nelements, char *cons
return compressedSize; return compressedSize;
} }
int tsDecompressDoubleLossyImp(const char * input, int compressedSize, const int nelements, char *const output){ int32_t tsDecompressDoubleLossyImp(const char *input, int32_t compressedSize, const int32_t nelements,
int decompressedSize = 0; char *const output) {
int32_t decompressedSize = 0;
if (HEAD_MODE(input[0]) == MODE_NOCOMPRESS) { if (HEAD_MODE(input[0]) == MODE_NOCOMPRESS) {
// orginal so memcpy directly // orginal so memcpy directly
decompressedSize = nelements * sizeof(double); decompressedSize = nelements * sizeof(double);