add compress

This commit is contained in:
Yihao Deng 2024-04-17 09:09:30 +00:00
parent 2547f29de9
commit 07008747ed
1 changed files with 13 additions and 10 deletions

View File

@ -1597,6 +1597,9 @@ int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest) {
int32_t code = -1;
SStreamValue key = {0};
char* p = value;
char* pCompressData = NULL;
char* pOutput = NULL;
if (streamStateValueIsStale(p)) {
goto _EXCEPT;
}
@ -1622,25 +1625,19 @@ int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest) {
goto _EXCEPT;
}
if (key.compress == 1) {
char* pCompressData = NULL;
p = taosDecodeBinary(p, (void**)&pCompressData, key.len);
char* buf = taosMemoryCalloc(1, key.rawLen);
int32_t rawLen = LZ4_decompress_safe(pCompressData, buf, key.len, key.rawLen);
pOutput = taosMemoryCalloc(1, key.rawLen);
int32_t rawLen = LZ4_decompress_safe(pCompressData, pOutput, key.len, key.rawLen);
if (rawLen != key.rawLen) {
stError("read invalid read, rawlen: %d, currlen: %d", key.rawLen, key.len);
taosMemoryFree(buf);
taosMemoryFree(pCompressData);
goto _EXCEPT;
}
key.len = rawLen;
if (dest) {
*dest = buf;
} else {
taosMemoryFree(buf);
*dest = pOutput;
pOutput = NULL;
}
taosMemoryFree(pCompressData);
} else {
if (dest != NULL) {
p = taosDecodeBinary(p, (void**)dest, key.len);
@ -1650,11 +1647,17 @@ int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest) {
if (ttl != NULL) *ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - taosGetTimestampMs();
code = 0;
taosMemoryFree(pOutput);
taosMemoryFree(pCompressData);
return key.len;
_EXCEPT:
if (dest != NULL) *dest = NULL;
if (ttl != NULL) *ttl = 0;
taosMemoryFree(pOutput);
taosMemoryFree(pCompressData);
return code;
}