commit
17102c8eac
|
@ -1130,8 +1130,15 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32
|
||||||
// Decode the data
|
// Decode the data
|
||||||
if (comp) {
|
if (comp) {
|
||||||
// // Need to decompress
|
// // Need to decompress
|
||||||
pDataCol->len = (*(tDataTypeDesc[pDataCol->type].decompFunc))(
|
int tlen = (*(tDataTypeDesc[pDataCol->type].decompFunc))(content, len - sizeof(TSCKSUM), numOfRows, pDataCol->pData,
|
||||||
content, len - sizeof(TSCKSUM), numOfRows, pDataCol->pData, pDataCol->spaceSize, comp, buffer, bufferSize);
|
pDataCol->spaceSize, comp, buffer, bufferSize);
|
||||||
|
if (tlen <= 0) {
|
||||||
|
tsdbError("Failed to decompress column, file corrupted, len:%d comp:%d numOfRows:%d maxPoints:%d bufferSize:%d",
|
||||||
|
len, comp, numOfRows, maxPoints, bufferSize);
|
||||||
|
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
pDataCol->len = tlen;
|
||||||
if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) {
|
if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
dataColSetOffset(pDataCol, numOfRows);
|
dataColSetOffset(pDataCol, numOfRows);
|
||||||
}
|
}
|
||||||
|
|
|
@ -65,7 +65,7 @@ static FORCE_INLINE int tsDecompressTinyint(const char *const input, int compres
|
||||||
if (algorithm == ONE_STAGE_COMP) {
|
if (algorithm == ONE_STAGE_COMP) {
|
||||||
return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_TINYINT);
|
return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_TINYINT);
|
||||||
} else if (algorithm == TWO_STAGE_COMP) {
|
} else if (algorithm == TWO_STAGE_COMP) {
|
||||||
tsDecompressStringImp(input, compressedSize, buffer, bufferSize);
|
if (tsDecompressStringImp(input, compressedSize, buffer, bufferSize) < 0) return -1;
|
||||||
return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_TINYINT);
|
return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_TINYINT);
|
||||||
} else {
|
} else {
|
||||||
assert(0);
|
assert(0);
|
||||||
|
@ -91,7 +91,7 @@ static FORCE_INLINE int tsDecompressSmallint(const char *const input, int compre
|
||||||
if (algorithm == ONE_STAGE_COMP) {
|
if (algorithm == ONE_STAGE_COMP) {
|
||||||
return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_SMALLINT);
|
return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_SMALLINT);
|
||||||
} else if (algorithm == TWO_STAGE_COMP) {
|
} else if (algorithm == TWO_STAGE_COMP) {
|
||||||
tsDecompressStringImp(input, compressedSize, buffer, bufferSize);
|
if (tsDecompressStringImp(input, compressedSize, buffer, bufferSize) < 0) return -1;
|
||||||
return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_SMALLINT);
|
return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_SMALLINT);
|
||||||
} else {
|
} else {
|
||||||
assert(0);
|
assert(0);
|
||||||
|
@ -117,7 +117,7 @@ static FORCE_INLINE int tsDecompressInt(const char *const input, int compressedS
|
||||||
if (algorithm == ONE_STAGE_COMP) {
|
if (algorithm == ONE_STAGE_COMP) {
|
||||||
return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_INT);
|
return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_INT);
|
||||||
} else if (algorithm == TWO_STAGE_COMP) {
|
} else if (algorithm == TWO_STAGE_COMP) {
|
||||||
tsDecompressStringImp(input, compressedSize, buffer, bufferSize);
|
if (tsDecompressStringImp(input, compressedSize, buffer, bufferSize) < 0) return -1;
|
||||||
return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_INT);
|
return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_INT);
|
||||||
} else {
|
} else {
|
||||||
assert(0);
|
assert(0);
|
||||||
|
@ -143,7 +143,7 @@ static FORCE_INLINE int tsDecompressBigint(const char *const input, int compress
|
||||||
if (algorithm == ONE_STAGE_COMP) {
|
if (algorithm == ONE_STAGE_COMP) {
|
||||||
return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_BIGINT);
|
return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_BIGINT);
|
||||||
} else if (algorithm == TWO_STAGE_COMP) {
|
} else if (algorithm == TWO_STAGE_COMP) {
|
||||||
tsDecompressStringImp(input, compressedSize, buffer, bufferSize);
|
if (tsDecompressStringImp(input, compressedSize, buffer, bufferSize) < 0) return -1;
|
||||||
return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_BIGINT);
|
return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_BIGINT);
|
||||||
} else {
|
} else {
|
||||||
assert(0);
|
assert(0);
|
||||||
|
@ -169,7 +169,7 @@ static FORCE_INLINE int tsDecompressBool(const char *const input, int compressed
|
||||||
if (algorithm == ONE_STAGE_COMP) {
|
if (algorithm == ONE_STAGE_COMP) {
|
||||||
return tsDecompressBoolImp(input, nelements, output);
|
return tsDecompressBoolImp(input, nelements, output);
|
||||||
} else if (algorithm == TWO_STAGE_COMP) {
|
} else if (algorithm == TWO_STAGE_COMP) {
|
||||||
tsDecompressStringImp(input, compressedSize, buffer, bufferSize);
|
if (tsDecompressStringImp(input, compressedSize, buffer, bufferSize) < 0) return -1;
|
||||||
return tsDecompressBoolImp(buffer, nelements, output);
|
return tsDecompressBoolImp(buffer, nelements, output);
|
||||||
} else {
|
} else {
|
||||||
assert(0);
|
assert(0);
|
||||||
|
@ -205,7 +205,7 @@ static FORCE_INLINE int tsDecompressFloat(const char *const input, int compresse
|
||||||
if (algorithm == ONE_STAGE_COMP) {
|
if (algorithm == ONE_STAGE_COMP) {
|
||||||
return tsDecompressFloatImp(input, nelements, output);
|
return tsDecompressFloatImp(input, nelements, output);
|
||||||
} else if (algorithm == TWO_STAGE_COMP) {
|
} else if (algorithm == TWO_STAGE_COMP) {
|
||||||
tsDecompressStringImp(input, compressedSize, buffer, bufferSize);
|
if (tsDecompressStringImp(input, compressedSize, buffer, bufferSize) < 0) return -1;
|
||||||
return tsDecompressFloatImp(buffer, nelements, output);
|
return tsDecompressFloatImp(buffer, nelements, output);
|
||||||
} else {
|
} else {
|
||||||
assert(0);
|
assert(0);
|
||||||
|
@ -231,7 +231,7 @@ static FORCE_INLINE int tsDecompressDouble(const char *const input, int compress
|
||||||
if (algorithm == ONE_STAGE_COMP) {
|
if (algorithm == ONE_STAGE_COMP) {
|
||||||
return tsDecompressDoubleImp(input, nelements, output);
|
return tsDecompressDoubleImp(input, nelements, output);
|
||||||
} else if (algorithm == TWO_STAGE_COMP) {
|
} else if (algorithm == TWO_STAGE_COMP) {
|
||||||
tsDecompressStringImp(input, compressedSize, buffer, bufferSize);
|
if (tsDecompressStringImp(input, compressedSize, buffer, bufferSize) < 0) return -1;
|
||||||
return tsDecompressDoubleImp(buffer, nelements, output);
|
return tsDecompressDoubleImp(buffer, nelements, output);
|
||||||
} else {
|
} else {
|
||||||
assert(0);
|
assert(0);
|
||||||
|
@ -257,7 +257,7 @@ static FORCE_INLINE int tsDecompressTimestamp(const char *const input, int compr
|
||||||
if (algorithm == ONE_STAGE_COMP) {
|
if (algorithm == ONE_STAGE_COMP) {
|
||||||
return tsDecompressTimestampImp(input, nelements, output);
|
return tsDecompressTimestampImp(input, nelements, output);
|
||||||
} else if (algorithm == TWO_STAGE_COMP) {
|
} else if (algorithm == TWO_STAGE_COMP) {
|
||||||
tsDecompressStringImp(input, compressedSize, buffer, bufferSize);
|
if (tsDecompressStringImp(input, compressedSize, buffer, bufferSize) < 0) return -1;
|
||||||
return tsDecompressTimestampImp(buffer, nelements, output);
|
return tsDecompressTimestampImp(buffer, nelements, output);
|
||||||
} else {
|
} else {
|
||||||
assert(0);
|
assert(0);
|
||||||
|
|
|
@ -47,10 +47,11 @@
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "os.h"
|
|
||||||
#include "lz4.h"
|
#include "lz4.h"
|
||||||
#include "tscompression.h"
|
#include "os.h"
|
||||||
#include "taosdef.h"
|
#include "taosdef.h"
|
||||||
|
#include "tscompression.h"
|
||||||
|
#include "tulog.h"
|
||||||
|
|
||||||
static const int TEST_NUMBER = 1;
|
static const int TEST_NUMBER = 1;
|
||||||
#define is_bigendian() ((*(char *)&TEST_NUMBER) == 0)
|
#define is_bigendian() ((*(char *)&TEST_NUMBER) == 0)
|
||||||
|
@ -88,7 +89,7 @@ int tsCompressINTImp(const char *const input, const int nelements, char *const o
|
||||||
word_length = CHAR_BYTES;
|
word_length = CHAR_BYTES;
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
perror("Wrong integer types.\n");
|
uError("Invalid compress integer type:%d", type);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -209,7 +210,7 @@ int tsDecompressINTImp(const char *const input, const int nelements, char *const
|
||||||
word_length = CHAR_BYTES;
|
word_length = CHAR_BYTES;
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
perror("Wrong integer types.\n");
|
uError("Invalid decompress integer type:%d", type);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -307,7 +308,7 @@ int tsCompressBoolImp(const char *const input, const int nelements, char *const
|
||||||
/* t = (~((( uint8_t)1) << (7-i%BITS_PER_BYTE))); */
|
/* t = (~((( uint8_t)1) << (7-i%BITS_PER_BYTE))); */
|
||||||
output[pos] |= t;
|
output[pos] |= t;
|
||||||
} else {
|
} else {
|
||||||
perror("Wrong bool value.\n");
|
uError("Invalid compress bool value:%d", output[pos]);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -363,7 +364,7 @@ int tsCompressBoolRLEImp(const char *const input, const int nelements, char *con
|
||||||
} else if (num == 0) {
|
} else if (num == 0) {
|
||||||
output[_pos++] = (counter << 1) | INT8MASK(0);
|
output[_pos++] = (counter << 1) | INT8MASK(0);
|
||||||
} else {
|
} else {
|
||||||
perror("Wrong bool value!\n");
|
uError("Invalid compress bool value:%d", output[_pos]);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -413,9 +414,7 @@ int tsDecompressStringImp(const char *const input, int compressedSize, char *con
|
||||||
/* It is compressed by LZ4 algorithm */
|
/* It is compressed by LZ4 algorithm */
|
||||||
const int decompressed_size = LZ4_decompress_safe(input + 1, output, compressedSize - 1, outputSize);
|
const int decompressed_size = LZ4_decompress_safe(input + 1, output, compressedSize - 1, outputSize);
|
||||||
if (decompressed_size < 0) {
|
if (decompressed_size < 0) {
|
||||||
char msg[128] = {0};
|
uError("Failed to decompress string with LZ4 algorithm, decompressed size:%d", decompressed_size);
|
||||||
sprintf(msg, "decomp_size:%d, Error decompress in LZ4 algorithm!\n", decompressed_size);
|
|
||||||
perror(msg);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -425,7 +424,7 @@ int tsDecompressStringImp(const char *const input, int compressedSize, char *con
|
||||||
memcpy(output, input + 1, compressedSize - 1);
|
memcpy(output, input + 1, compressedSize - 1);
|
||||||
return compressedSize - 1;
|
return compressedSize - 1;
|
||||||
} else {
|
} else {
|
||||||
perror("Wrong compressed string indicator!\n");
|
uError("Invalid decompress string indicator:%d", input[0]);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue