From 469e9ba0697b5e0426d15f9498ec543c050f2be2 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 17 Apr 2024 10:20:11 +0800 Subject: [PATCH 01/12] add compress --- include/libs/stream/streamState.h | 5 +- source/libs/stream/src/streamBackendRocksdb.c | 48 ++++++++++++++++--- 2 files changed, 46 insertions(+), 7 deletions(-) diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index f1c9d712e8..7813b2cc9a 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -83,7 +83,8 @@ int32_t streamStateClearBuff(SStreamState* pState, void* pVal); void streamStateFreeVal(void* val); // count window -int32_t streamStateCountWinAddIfNotExist(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** ppVal, int32_t* pVLen); +int32_t streamStateCountWinAddIfNotExist(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** ppVal, + int32_t* pVLen); int32_t streamStateCountWinAdd(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen); SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key); @@ -128,6 +129,8 @@ typedef struct SStateSessionKey { typedef struct SStreamValue { int64_t unixTimestamp; int32_t len; + int32_t rawLen; + int8_t compress; char* data; } SStreamValue; diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 5dafad8951..9a79bf7185 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -14,6 +14,7 @@ */ #include "streamBackendRocksdb.h" +#include "lz4.h" #include "streamInt.h" #include "tcommon.h" #include "tref.h" @@ -1550,23 +1551,42 @@ void destroyCompare(void* arg) { } int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest) { - SStreamValue key = {.unixTimestamp = ttl, .len = vlen, .data = (char*)(value)}; + SStreamValue key = {.unixTimestamp = ttl, .len = vlen, .rawLen = vlen, .compress = 0, .data = (char*)(value)}; int32_t len = 0; + if (vlen > 512) { + char* dst = taosMemoryCalloc(1, vlen + 128); + int32_t dstCap = vlen + 128; + int32_t compressedSize = LZ4_compress_default((char*)value, dst, vlen, dstCap); + if (compressedSize < vlen) { + key.compress = 1; + key.len = compressedSize; + value = dst; + } + stInfo("vlen: raw size: %d, compressed size: %d", vlen, compressedSize); + } + if (*dest == NULL) { - char* p = taosMemoryCalloc(1, sizeof(int64_t) + sizeof(int32_t) + key.len); + char* p = taosMemoryCalloc( + 1, sizeof(key.unixTimestamp) + sizeof(key.len) + sizeof(key.rawLen) + sizeof(key.compress) + key.len); char* buf = p; len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp); len += taosEncodeFixedI32((void**)&buf, key.len); - len += taosEncodeBinary((void**)&buf, (char*)value, vlen); + len += taosEncodeFixedI32((void**)&buf, key.rawLen); + len += taosEncodeFixedI8((void**)&buf, key.compress); + len += taosEncodeBinary((void**)&buf, (char*)value, key.len); *dest = p; } else { char* buf = *dest; len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp); len += taosEncodeFixedI32((void**)&buf, key.len); - len += taosEncodeBinary((void**)&buf, (char*)value, vlen); + len += taosEncodeFixedI32((void**)&buf, key.rawLen); + len += taosEncodeFixedI8((void**)&buf, key.compress); + len += taosEncodeBinary((void**)&buf, (char*)value, key.len); } + return len; } + /* * ret >= 0 : found valid value * ret < 0 : error or timeout @@ -1579,11 +1599,27 @@ int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest) { } p = taosDecodeFixedI64(p, &key.unixTimestamp); p = taosDecodeFixedI32(p, &key.len); - if (vlen != (sizeof(int64_t) + sizeof(int32_t) + key.len)) { + p = taosDecodeFixedI32(p, &key.rawLen); + p = taosDecodeFixedI8(p, &key.compress); + + if (vlen != (sizeof(key.unixTimestamp) + sizeof(key.len) + sizeof(key.rawLen) + sizeof(key.compress) + key.len)) { stError("vlen: %d, read len: %d", vlen, key.len); goto _EXCEPT; } - if (key.len != 0 && dest != NULL) p = taosDecodeBinary(p, (void**)dest, key.len); + if (key.compress == 1) { + char* pCompressData = NULL; + if (key.len != 0) { + p = taosDecodeBinary(p, (void**)&pCompressData, key.len); + *dest = taosMemoryCalloc(1, key.rawLen); + int32_t decompressSize = LZ4_decompress_safe(pCompressData, *dest, key.len, key.rawLen); + ASSERT(decompressSize == key.rawLen); + key.len = decompressSize; + } + taosMemoryFree(pCompressData); + + } else { + if (key.len != 0 && dest != NULL) p = taosDecodeBinary(p, (void**)dest, key.len); + } if (ttl != NULL) *ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - taosGetTimestampMs(); return key.len; From 2da8645d36fde33ef7e5ff153b4fe59fe7c53d1a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 17 Apr 2024 10:22:54 +0800 Subject: [PATCH 02/12] add compress --- source/libs/stream/src/streamBackendRocksdb.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 9a79bf7185..02dba50d84 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1553,8 +1553,9 @@ void destroyCompare(void* arg) { int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest) { SStreamValue key = {.unixTimestamp = ttl, .len = vlen, .rawLen = vlen, .compress = 0, .data = (char*)(value)}; int32_t len = 0; + char* dst = NULL; if (vlen > 512) { - char* dst = taosMemoryCalloc(1, vlen + 128); + dst = taosMemoryCalloc(1, vlen + 128); int32_t dstCap = vlen + 128; int32_t compressedSize = LZ4_compress_default((char*)value, dst, vlen, dstCap); if (compressedSize < vlen) { @@ -1583,6 +1584,7 @@ int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest) { len += taosEncodeFixedI8((void**)&buf, key.compress); len += taosEncodeBinary((void**)&buf, (char*)value, key.len); } + taosMemoryFree(dst); return len; } From 40390ad9184a38f8cd35f0d63df96669aa1c3463 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 17 Apr 2024 10:23:17 +0800 Subject: [PATCH 03/12] add compress --- source/libs/stream/src/streamBackendRocksdb.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 02dba50d84..2a6c4cf4b6 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1563,7 +1563,7 @@ int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest) { key.len = compressedSize; value = dst; } - stInfo("vlen: raw size: %d, compressed size: %d", vlen, compressedSize); + stDebug("vlen: raw size: %d, compressed size: %d", vlen, compressedSize); } if (*dest == NULL) { From 1d9b65226e22af9a8686fddf0714dc5600135fb5 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Wed, 17 Apr 2024 07:02:55 +0000 Subject: [PATCH 04/12] add compress --- source/libs/stream/src/tstreamFileState.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 6014350f58..976006dbbd 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -28,8 +28,8 @@ #define MIN_NUM_OF_ROW_BUFF 10240 #define MIN_NUM_OF_RECOVER_ROW_BUFF 128 -#define TASK_KEY "streamFileState" -#define STREAM_STATE_INFO_NAME "StreamStateCheckPoint" +#define TASK_KEY "streamFileState" +#define STREAM_STATE_INFO_NAME "StreamStateCheckPoint" struct SStreamFileState { SList* usedBuffs; @@ -560,7 +560,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, int idx = streamStateGetCfIdx(pFileState->pFileStore, pFileState->cfName); - int32_t len = pFileState->rowSize + sizeof(uint64_t) + sizeof(int32_t) + 1; + int32_t len = pFileState->rowSize + sizeof(uint64_t) + sizeof(int32_t) + 64; char* buf = taosMemoryCalloc(1, len); void* batch = streamStateCreateBatch(); @@ -612,7 +612,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, } int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId) { - char keyBuf[128] = {0}; + char keyBuf[128] = {0}; sprintf(keyBuf, "%s:%" PRId64 "", TASK_KEY, checkpointId); return streamDefaultDel_rocksdb(pFileState->pFileStore, keyBuf); } @@ -622,8 +622,8 @@ int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list) { } int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { - int32_t code = TSDB_CODE_SUCCESS; - int64_t maxCheckPointId = 0; + int32_t code = TSDB_CODE_SUCCESS; + int64_t maxCheckPointId = 0; { char buf[128] = {0}; void* val = NULL; From 0f629bebda32addfbde4ed02ed63363947eca3e0 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Wed, 17 Apr 2024 07:35:00 +0000 Subject: [PATCH 05/12] add compress --- source/libs/stream/src/streamBackendRocksdb.c | 43 +++++++++++-------- 1 file changed, 25 insertions(+), 18 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 2a6c4cf4b6..ce9b69b8bc 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1601,29 +1601,36 @@ int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest) { } p = taosDecodeFixedI64(p, &key.unixTimestamp); p = taosDecodeFixedI32(p, &key.len); - p = taosDecodeFixedI32(p, &key.rawLen); - p = taosDecodeFixedI8(p, &key.compress); - if (vlen != (sizeof(key.unixTimestamp) + sizeof(key.len) + sizeof(key.rawLen) + sizeof(key.compress) + key.len)) { - stError("vlen: %d, read len: %d", vlen, key.len); - goto _EXCEPT; - } - if (key.compress == 1) { - char* pCompressData = NULL; - if (key.len != 0) { - p = taosDecodeBinary(p, (void**)&pCompressData, key.len); - *dest = taosMemoryCalloc(1, key.rawLen); - int32_t decompressSize = LZ4_decompress_safe(pCompressData, *dest, key.len, key.rawLen); - ASSERT(decompressSize == key.rawLen); - key.len = decompressSize; - } - taosMemoryFree(pCompressData); + if (vlen == (sizeof(key.unixTimestamp) + sizeof(key.len) + key.len)) { + // compatiable with previous data + if (key.len != 0 && dest != NULL) p = taosDecodeBinary(p, (void**)dest, key.len); } else { - if (key.len != 0 && dest != NULL) p = taosDecodeBinary(p, (void**)dest, key.len); + p = taosDecodeFixedI32(p, &key.rawLen); + p = taosDecodeFixedI8(p, &key.compress); + if (vlen != (sizeof(key.unixTimestamp) + sizeof(key.len) + sizeof(key.rawLen) + sizeof(key.compress) + key.len)) { + stError("vlen: %d, read len: %d", vlen, key.len); + goto _EXCEPT; + } + if (key.compress == 1) { + char* pCompressData = NULL; + if (key.len != 0) { + p = taosDecodeBinary(p, (void**)&pCompressData, key.len); + *dest = taosMemoryCalloc(1, key.rawLen); + int32_t decompressSize = LZ4_decompress_safe(pCompressData, *dest, key.len, key.rawLen); + ASSERT(decompressSize == key.rawLen); + key.len = decompressSize; + } + taosMemoryFree(pCompressData); + + } else { + if (key.len != 0 && dest != NULL) p = taosDecodeBinary(p, (void**)dest, key.len); + } + + if (ttl != NULL) *ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - taosGetTimestampMs(); } - if (ttl != NULL) *ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - taosGetTimestampMs(); return key.len; _EXCEPT: From 1363cd0a71f915025e5940331a92b2a4642dfb98 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Wed, 17 Apr 2024 08:33:40 +0000 Subject: [PATCH 06/12] add compress --- source/libs/stream/src/streamBackendRocksdb.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index ce9b69b8bc..f06dc368bd 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1615,7 +1615,7 @@ int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest) { } if (key.compress == 1) { char* pCompressData = NULL; - if (key.len != 0) { + if (key.len != 0 && *dest != NULL) { p = taosDecodeBinary(p, (void**)&pCompressData, key.len); *dest = taosMemoryCalloc(1, key.rawLen); int32_t decompressSize = LZ4_decompress_safe(pCompressData, *dest, key.len, key.rawLen); @@ -1623,7 +1623,6 @@ int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest) { key.len = decompressSize; } taosMemoryFree(pCompressData); - } else { if (key.len != 0 && dest != NULL) p = taosDecodeBinary(p, (void**)dest, key.len); } From 15f9e1e29e1d116224cf1633ebc4f4b60bf0a08e Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Wed, 17 Apr 2024 08:38:46 +0000 Subject: [PATCH 07/12] add compress --- source/libs/stream/src/streamBackendRocksdb.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index f06dc368bd..f66eacde6c 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1615,9 +1615,9 @@ int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest) { } if (key.compress == 1) { char* pCompressData = NULL; - if (key.len != 0 && *dest != NULL) { + if (key.len != 0) { p = taosDecodeBinary(p, (void**)&pCompressData, key.len); - *dest = taosMemoryCalloc(1, key.rawLen); + if (*dest) *dest = taosMemoryCalloc(1, key.rawLen); int32_t decompressSize = LZ4_decompress_safe(pCompressData, *dest, key.len, key.rawLen); ASSERT(decompressSize == key.rawLen); key.len = decompressSize; From 7cec8836b1dbe82c6b397c23df24b8ed403d8279 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Wed, 17 Apr 2024 08:39:43 +0000 Subject: [PATCH 08/12] add compress --- source/libs/stream/src/streamBackendRocksdb.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index f66eacde6c..c8532bf033 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1626,9 +1626,8 @@ int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest) { } else { if (key.len != 0 && dest != NULL) p = taosDecodeBinary(p, (void**)dest, key.len); } - - if (ttl != NULL) *ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - taosGetTimestampMs(); } + if (ttl != NULL) *ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - taosGetTimestampMs(); return key.len; From f8dcf806d224335a28e08d5b82395024a1236c96 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Wed, 17 Apr 2024 08:58:10 +0000 Subject: [PATCH 09/12] add compress --- source/libs/stream/src/streamBackendRocksdb.c | 40 ++++++++++++++----- 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index c8532bf033..ce15088eee 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1594,6 +1594,7 @@ int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest) { * ret < 0 : error or timeout */ int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest) { + int32_t code = -1; SStreamValue key = {0}; char* p = value; if (streamStateValueIsStale(p)) { @@ -1601,10 +1602,16 @@ int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest) { } p = taosDecodeFixedI64(p, &key.unixTimestamp); p = taosDecodeFixedI32(p, &key.len); + if (key.len == 0) { + code = 0; + goto _EXCEPT; + } if (vlen == (sizeof(key.unixTimestamp) + sizeof(key.len) + key.len)) { // compatiable with previous data - if (key.len != 0 && dest != NULL) p = taosDecodeBinary(p, (void**)dest, key.len); + if (dest != NULL) { + p = taosDecodeBinary(p, (void**)dest, key.len); + } } else { p = taosDecodeFixedI32(p, &key.rawLen); @@ -1615,26 +1622,39 @@ int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest) { } if (key.compress == 1) { char* pCompressData = NULL; - if (key.len != 0) { - p = taosDecodeBinary(p, (void**)&pCompressData, key.len); - if (*dest) *dest = taosMemoryCalloc(1, key.rawLen); - int32_t decompressSize = LZ4_decompress_safe(pCompressData, *dest, key.len, key.rawLen); - ASSERT(decompressSize == key.rawLen); - key.len = decompressSize; + p = taosDecodeBinary(p, (void**)&pCompressData, key.len); + + char* buf = taosMemoryCalloc(1, key.rawLen); + int32_t rawLen = LZ4_decompress_safe(pCompressData, buf, key.len, key.rawLen); + if (rawLen != key.rawLen) { + stError("read invalid read, rawlen: %d, currlen: %d", key.rawLen, key.len); + taosMemoryFree(buf); + taosMemoryFree(pCompressData); + goto _EXCEPT; } + key.len = rawLen; taosMemoryFree(pCompressData); + + if (dest) { + *dest = buf; + } else { + taosMemoryFree(buf); + } } else { - if (key.len != 0 && dest != NULL) p = taosDecodeBinary(p, (void**)dest, key.len); + if (dest != NULL) { + p = taosDecodeBinary(p, (void**)dest, key.len); + } } } - if (ttl != NULL) *ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - taosGetTimestampMs(); + if (ttl != NULL) *ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - taosGetTimestampMs(); + code = 0; return key.len; _EXCEPT: if (dest != NULL) *dest = NULL; if (ttl != NULL) *ttl = 0; - return -1; + return code; } const char* compareDefaultName(void* arg) { From 2547f29de92e385570562be0c97da5faaded6db9 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Wed, 17 Apr 2024 08:59:38 +0000 Subject: [PATCH 10/12] add compress --- source/libs/stream/src/streamBackendRocksdb.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index ce15088eee..f0c4c2e974 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1600,6 +1600,7 @@ int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest) { if (streamStateValueIsStale(p)) { goto _EXCEPT; } + p = taosDecodeFixedI64(p, &key.unixTimestamp); p = taosDecodeFixedI32(p, &key.len); if (key.len == 0) { @@ -1633,13 +1634,13 @@ int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest) { goto _EXCEPT; } key.len = rawLen; - taosMemoryFree(pCompressData); if (dest) { *dest = buf; } else { taosMemoryFree(buf); } + taosMemoryFree(pCompressData); } else { if (dest != NULL) { p = taosDecodeBinary(p, (void**)dest, key.len); From 07008747edebaf4492ea54f6d7efc57b0b3b6b4e Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Wed, 17 Apr 2024 09:09:30 +0000 Subject: [PATCH 11/12] add compress --- source/libs/stream/src/streamBackendRocksdb.c | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index f0c4c2e974..83558283f4 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1597,6 +1597,9 @@ int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest) { int32_t code = -1; SStreamValue key = {0}; char* p = value; + + char* pCompressData = NULL; + char* pOutput = NULL; if (streamStateValueIsStale(p)) { goto _EXCEPT; } @@ -1622,25 +1625,19 @@ int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest) { goto _EXCEPT; } if (key.compress == 1) { - char* pCompressData = NULL; p = taosDecodeBinary(p, (void**)&pCompressData, key.len); - - char* buf = taosMemoryCalloc(1, key.rawLen); - int32_t rawLen = LZ4_decompress_safe(pCompressData, buf, key.len, key.rawLen); + pOutput = taosMemoryCalloc(1, key.rawLen); + int32_t rawLen = LZ4_decompress_safe(pCompressData, pOutput, key.len, key.rawLen); if (rawLen != key.rawLen) { stError("read invalid read, rawlen: %d, currlen: %d", key.rawLen, key.len); - taosMemoryFree(buf); - taosMemoryFree(pCompressData); goto _EXCEPT; } key.len = rawLen; if (dest) { - *dest = buf; - } else { - taosMemoryFree(buf); + *dest = pOutput; + pOutput = NULL; } - taosMemoryFree(pCompressData); } else { if (dest != NULL) { p = taosDecodeBinary(p, (void**)dest, key.len); @@ -1650,11 +1647,17 @@ int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest) { if (ttl != NULL) *ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - taosGetTimestampMs(); code = 0; + + taosMemoryFree(pOutput); + taosMemoryFree(pCompressData); return key.len; _EXCEPT: if (dest != NULL) *dest = NULL; if (ttl != NULL) *ttl = 0; + + taosMemoryFree(pOutput); + taosMemoryFree(pCompressData); return code; } From a461b24ac9a4cafc20f036efe3b270eea5265321 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Wed, 17 Apr 2024 09:37:23 +0000 Subject: [PATCH 12/12] add compress --- source/libs/stream/src/streamBackendRocksdb.c | 23 +++++++------------ 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 83558283f4..380680b642 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1610,13 +1610,9 @@ int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest) { code = 0; goto _EXCEPT; } - if (vlen == (sizeof(key.unixTimestamp) + sizeof(key.len) + key.len)) { // compatiable with previous data - if (dest != NULL) { - p = taosDecodeBinary(p, (void**)dest, key.len); - } - + p = taosDecodeBinary(p, (void**)&pOutput, key.len); } else { p = taosDecodeFixedI32(p, &key.rawLen); p = taosDecodeFixedI8(p, &key.compress); @@ -1633,23 +1629,20 @@ int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest) { goto _EXCEPT; } key.len = rawLen; - - if (dest) { - *dest = pOutput; - pOutput = NULL; - } } else { - if (dest != NULL) { - p = taosDecodeBinary(p, (void**)dest, key.len); - } + p = taosDecodeBinary(p, (void**)&pOutput, key.len); } } if (ttl != NULL) *ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - taosGetTimestampMs(); - code = 0; - taosMemoryFree(pOutput); + code = 0; + if (dest) { + *dest = pOutput; + pOutput = NULL; + } taosMemoryFree(pCompressData); + taosMemoryFree(pOutput); return key.len; _EXCEPT: