diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index f1c9d712e8..7813b2cc9a 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -83,7 +83,8 @@ int32_t streamStateClearBuff(SStreamState* pState, void* pVal); void streamStateFreeVal(void* val); // count window -int32_t streamStateCountWinAddIfNotExist(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** ppVal, int32_t* pVLen); +int32_t streamStateCountWinAddIfNotExist(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** ppVal, + int32_t* pVLen); int32_t streamStateCountWinAdd(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen); SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key); @@ -128,6 +129,8 @@ typedef struct SStateSessionKey { typedef struct SStreamValue { int64_t unixTimestamp; int32_t len; + int32_t rawLen; + int8_t compress; char* data; } SStreamValue; diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 5dafad8951..9a79bf7185 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -14,6 +14,7 @@ */ #include "streamBackendRocksdb.h" +#include "lz4.h" #include "streamInt.h" #include "tcommon.h" #include "tref.h" @@ -1550,23 +1551,42 @@ void destroyCompare(void* arg) { } int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest) { - SStreamValue key = {.unixTimestamp = ttl, .len = vlen, .data = (char*)(value)}; + SStreamValue key = {.unixTimestamp = ttl, .len = vlen, .rawLen = vlen, .compress = 0, .data = (char*)(value)}; int32_t len = 0; + if (vlen > 512) { + char* dst = taosMemoryCalloc(1, vlen + 128); + int32_t dstCap = vlen + 128; + int32_t compressedSize = LZ4_compress_default((char*)value, dst, vlen, dstCap); + if (compressedSize < vlen) { + key.compress = 1; + key.len = compressedSize; + value = dst; + } + stInfo("vlen: raw size: %d, compressed size: %d", vlen, compressedSize); + } + if (*dest == NULL) { - char* p = taosMemoryCalloc(1, sizeof(int64_t) + sizeof(int32_t) + key.len); + char* p = taosMemoryCalloc( + 1, sizeof(key.unixTimestamp) + sizeof(key.len) + sizeof(key.rawLen) + sizeof(key.compress) + key.len); char* buf = p; len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp); len += taosEncodeFixedI32((void**)&buf, key.len); - len += taosEncodeBinary((void**)&buf, (char*)value, vlen); + len += taosEncodeFixedI32((void**)&buf, key.rawLen); + len += taosEncodeFixedI8((void**)&buf, key.compress); + len += taosEncodeBinary((void**)&buf, (char*)value, key.len); *dest = p; } else { char* buf = *dest; len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp); len += taosEncodeFixedI32((void**)&buf, key.len); - len += taosEncodeBinary((void**)&buf, (char*)value, vlen); + len += taosEncodeFixedI32((void**)&buf, key.rawLen); + len += taosEncodeFixedI8((void**)&buf, key.compress); + len += taosEncodeBinary((void**)&buf, (char*)value, key.len); } + return len; } + /* * ret >= 0 : found valid value * ret < 0 : error or timeout @@ -1579,11 +1599,27 @@ int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest) { } p = taosDecodeFixedI64(p, &key.unixTimestamp); p = taosDecodeFixedI32(p, &key.len); - if (vlen != (sizeof(int64_t) + sizeof(int32_t) + key.len)) { + p = taosDecodeFixedI32(p, &key.rawLen); + p = taosDecodeFixedI8(p, &key.compress); + + if (vlen != (sizeof(key.unixTimestamp) + sizeof(key.len) + sizeof(key.rawLen) + sizeof(key.compress) + key.len)) { stError("vlen: %d, read len: %d", vlen, key.len); goto _EXCEPT; } - if (key.len != 0 && dest != NULL) p = taosDecodeBinary(p, (void**)dest, key.len); + if (key.compress == 1) { + char* pCompressData = NULL; + if (key.len != 0) { + p = taosDecodeBinary(p, (void**)&pCompressData, key.len); + *dest = taosMemoryCalloc(1, key.rawLen); + int32_t decompressSize = LZ4_decompress_safe(pCompressData, *dest, key.len, key.rawLen); + ASSERT(decompressSize == key.rawLen); + key.len = decompressSize; + } + taosMemoryFree(pCompressData); + + } else { + if (key.len != 0 && dest != NULL) p = taosDecodeBinary(p, (void**)dest, key.len); + } if (ttl != NULL) *ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - taosGetTimestampMs(); return key.len;