add compress
This commit is contained in:
parent
7cec8836b1
commit
f8dcf806d2
|
@ -1594,6 +1594,7 @@ int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest) {
|
|||
* ret < 0 : error or timeout
|
||||
*/
|
||||
int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest) {
|
||||
int32_t code = -1;
|
||||
SStreamValue key = {0};
|
||||
char* p = value;
|
||||
if (streamStateValueIsStale(p)) {
|
||||
|
@ -1601,10 +1602,16 @@ int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest) {
|
|||
}
|
||||
p = taosDecodeFixedI64(p, &key.unixTimestamp);
|
||||
p = taosDecodeFixedI32(p, &key.len);
|
||||
if (key.len == 0) {
|
||||
code = 0;
|
||||
goto _EXCEPT;
|
||||
}
|
||||
|
||||
if (vlen == (sizeof(key.unixTimestamp) + sizeof(key.len) + key.len)) {
|
||||
// compatiable with previous data
|
||||
if (key.len != 0 && dest != NULL) p = taosDecodeBinary(p, (void**)dest, key.len);
|
||||
if (dest != NULL) {
|
||||
p = taosDecodeBinary(p, (void**)dest, key.len);
|
||||
}
|
||||
|
||||
} else {
|
||||
p = taosDecodeFixedI32(p, &key.rawLen);
|
||||
|
@ -1615,26 +1622,39 @@ int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest) {
|
|||
}
|
||||
if (key.compress == 1) {
|
||||
char* pCompressData = NULL;
|
||||
if (key.len != 0) {
|
||||
p = taosDecodeBinary(p, (void**)&pCompressData, key.len);
|
||||
if (*dest) *dest = taosMemoryCalloc(1, key.rawLen);
|
||||
int32_t decompressSize = LZ4_decompress_safe(pCompressData, *dest, key.len, key.rawLen);
|
||||
ASSERT(decompressSize == key.rawLen);
|
||||
key.len = decompressSize;
|
||||
p = taosDecodeBinary(p, (void**)&pCompressData, key.len);
|
||||
|
||||
char* buf = taosMemoryCalloc(1, key.rawLen);
|
||||
int32_t rawLen = LZ4_decompress_safe(pCompressData, buf, key.len, key.rawLen);
|
||||
if (rawLen != key.rawLen) {
|
||||
stError("read invalid read, rawlen: %d, currlen: %d", key.rawLen, key.len);
|
||||
taosMemoryFree(buf);
|
||||
taosMemoryFree(pCompressData);
|
||||
goto _EXCEPT;
|
||||
}
|
||||
key.len = rawLen;
|
||||
taosMemoryFree(pCompressData);
|
||||
|
||||
if (dest) {
|
||||
*dest = buf;
|
||||
} else {
|
||||
taosMemoryFree(buf);
|
||||
}
|
||||
} else {
|
||||
if (key.len != 0 && dest != NULL) p = taosDecodeBinary(p, (void**)dest, key.len);
|
||||
if (dest != NULL) {
|
||||
p = taosDecodeBinary(p, (void**)dest, key.len);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (ttl != NULL) *ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - taosGetTimestampMs();
|
||||
|
||||
if (ttl != NULL) *ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - taosGetTimestampMs();
|
||||
code = 0;
|
||||
return key.len;
|
||||
|
||||
_EXCEPT:
|
||||
if (dest != NULL) *dest = NULL;
|
||||
if (ttl != NULL) *ttl = 0;
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
const char* compareDefaultName(void* arg) {
|
||||
|
|
Loading…
Reference in New Issue