add compress

This commit is contained in:
yihaoDeng 2024-04-17 10:20:11 +08:00
parent a439f61708
commit 469e9ba069
2 changed files with 46 additions and 7 deletions

View File

@ -83,7 +83,8 @@ int32_t streamStateClearBuff(SStreamState* pState, void* pVal);
void streamStateFreeVal(void* val);
// count window
int32_t streamStateCountWinAddIfNotExist(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** ppVal, int32_t* pVLen);
int32_t streamStateCountWinAddIfNotExist(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** ppVal,
int32_t* pVLen);
int32_t streamStateCountWinAdd(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen);
SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key);
@ -128,6 +129,8 @@ typedef struct SStateSessionKey {
typedef struct SStreamValue {
int64_t unixTimestamp;
int32_t len;
int32_t rawLen;
int8_t compress;
char* data;
} SStreamValue;

View File

@ -14,6 +14,7 @@
*/
#include "streamBackendRocksdb.h"
#include "lz4.h"
#include "streamInt.h"
#include "tcommon.h"
#include "tref.h"
@ -1550,23 +1551,42 @@ void destroyCompare(void* arg) {
}
int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest) {
SStreamValue key = {.unixTimestamp = ttl, .len = vlen, .data = (char*)(value)};
SStreamValue key = {.unixTimestamp = ttl, .len = vlen, .rawLen = vlen, .compress = 0, .data = (char*)(value)};
int32_t len = 0;
if (vlen > 512) {
char* dst = taosMemoryCalloc(1, vlen + 128);
int32_t dstCap = vlen + 128;
int32_t compressedSize = LZ4_compress_default((char*)value, dst, vlen, dstCap);
if (compressedSize < vlen) {
key.compress = 1;
key.len = compressedSize;
value = dst;
}
stInfo("vlen: raw size: %d, compressed size: %d", vlen, compressedSize);
}
if (*dest == NULL) {
char* p = taosMemoryCalloc(1, sizeof(int64_t) + sizeof(int32_t) + key.len);
char* p = taosMemoryCalloc(
1, sizeof(key.unixTimestamp) + sizeof(key.len) + sizeof(key.rawLen) + sizeof(key.compress) + key.len);
char* buf = p;
len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp);
len += taosEncodeFixedI32((void**)&buf, key.len);
len += taosEncodeBinary((void**)&buf, (char*)value, vlen);
len += taosEncodeFixedI32((void**)&buf, key.rawLen);
len += taosEncodeFixedI8((void**)&buf, key.compress);
len += taosEncodeBinary((void**)&buf, (char*)value, key.len);
*dest = p;
} else {
char* buf = *dest;
len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp);
len += taosEncodeFixedI32((void**)&buf, key.len);
len += taosEncodeBinary((void**)&buf, (char*)value, vlen);
len += taosEncodeFixedI32((void**)&buf, key.rawLen);
len += taosEncodeFixedI8((void**)&buf, key.compress);
len += taosEncodeBinary((void**)&buf, (char*)value, key.len);
}
return len;
}
/*
* ret >= 0 : found valid value
* ret < 0 : error or timeout
@ -1579,11 +1599,27 @@ int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest) {
}
p = taosDecodeFixedI64(p, &key.unixTimestamp);
p = taosDecodeFixedI32(p, &key.len);
if (vlen != (sizeof(int64_t) + sizeof(int32_t) + key.len)) {
p = taosDecodeFixedI32(p, &key.rawLen);
p = taosDecodeFixedI8(p, &key.compress);
if (vlen != (sizeof(key.unixTimestamp) + sizeof(key.len) + sizeof(key.rawLen) + sizeof(key.compress) + key.len)) {
stError("vlen: %d, read len: %d", vlen, key.len);
goto _EXCEPT;
}
if (key.len != 0 && dest != NULL) p = taosDecodeBinary(p, (void**)dest, key.len);
if (key.compress == 1) {
char* pCompressData = NULL;
if (key.len != 0) {
p = taosDecodeBinary(p, (void**)&pCompressData, key.len);
*dest = taosMemoryCalloc(1, key.rawLen);
int32_t decompressSize = LZ4_decompress_safe(pCompressData, *dest, key.len, key.rawLen);
ASSERT(decompressSize == key.rawLen);
key.len = decompressSize;
}
taosMemoryFree(pCompressData);
} else {
if (key.len != 0 && dest != NULL) p = taosDecodeBinary(p, (void**)dest, key.len);
}
if (ttl != NULL) *ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - taosGetTimestampMs();
return key.len;