diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index c8532bf033..ce15088eee 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1594,6 +1594,7 @@ int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest) { * ret < 0 : error or timeout */ int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest) { + int32_t code = -1; SStreamValue key = {0}; char* p = value; if (streamStateValueIsStale(p)) { @@ -1601,10 +1602,16 @@ int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest) { } p = taosDecodeFixedI64(p, &key.unixTimestamp); p = taosDecodeFixedI32(p, &key.len); + if (key.len == 0) { + code = 0; + goto _EXCEPT; + } if (vlen == (sizeof(key.unixTimestamp) + sizeof(key.len) + key.len)) { // compatiable with previous data - if (key.len != 0 && dest != NULL) p = taosDecodeBinary(p, (void**)dest, key.len); + if (dest != NULL) { + p = taosDecodeBinary(p, (void**)dest, key.len); + } } else { p = taosDecodeFixedI32(p, &key.rawLen); @@ -1615,26 +1622,39 @@ int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest) { } if (key.compress == 1) { char* pCompressData = NULL; - if (key.len != 0) { - p = taosDecodeBinary(p, (void**)&pCompressData, key.len); - if (*dest) *dest = taosMemoryCalloc(1, key.rawLen); - int32_t decompressSize = LZ4_decompress_safe(pCompressData, *dest, key.len, key.rawLen); - ASSERT(decompressSize == key.rawLen); - key.len = decompressSize; + p = taosDecodeBinary(p, (void**)&pCompressData, key.len); + + char* buf = taosMemoryCalloc(1, key.rawLen); + int32_t rawLen = LZ4_decompress_safe(pCompressData, buf, key.len, key.rawLen); + if (rawLen != key.rawLen) { + stError("read invalid read, rawlen: %d, currlen: %d", key.rawLen, key.len); + taosMemoryFree(buf); + taosMemoryFree(pCompressData); + goto _EXCEPT; } + key.len = rawLen; taosMemoryFree(pCompressData); + + if (dest) { + *dest = buf; + } else { + taosMemoryFree(buf); + } } else { - if (key.len != 0 && dest != NULL) p = taosDecodeBinary(p, (void**)dest, key.len); + if (dest != NULL) { + p = taosDecodeBinary(p, (void**)dest, key.len); + } } } - if (ttl != NULL) *ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - taosGetTimestampMs(); + if (ttl != NULL) *ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - taosGetTimestampMs(); + code = 0; return key.len; _EXCEPT: if (dest != NULL) *dest = NULL; if (ttl != NULL) *ttl = 0; - return -1; + return code; } const char* compareDefaultName(void* arg) {