diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 4793a8951a..ced62ba82e 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -2112,11 +2112,15 @@ void destroyCompare(void* arg) { } int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest) { + int32_t code = 0; SStreamValue key = {.unixTimestamp = ttl, .len = vlen, .rawLen = vlen, .compress = 0, .data = (char*)(value)}; int32_t len = 0; char* dst = NULL; if (vlen > 512) { dst = taosMemoryCalloc(1, vlen + 128); + if (dst == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } int32_t dstCap = vlen + 128; int32_t compressedSize = LZ4_compress_default((char*)value, dst, vlen, dstCap); if (compressedSize < vlen) { @@ -2129,7 +2133,11 @@ int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest) { if (*dest == NULL) { size_t size = sizeof(key.unixTimestamp) + sizeof(key.len) + sizeof(key.rawLen) + sizeof(key.compress) + key.len; char* p = taosMemoryCalloc(1, size); - char* buf = p; + if (p == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exception; + } + char* buf = p; len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp); len += taosEncodeFixedI32((void**)&buf, key.len); len += taosEncodeFixedI32((void**)&buf, key.rawLen); @@ -2151,6 +2159,9 @@ int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest) { taosMemoryFree(dst); return len; +_exception: + taosMemoryFree(dst); + return code; } /* @@ -2165,6 +2176,7 @@ int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest) { char* pCompressData = NULL; char* pOutput = NULL; if (streamStateValueIsStale(p)) { + code = TSDB_CODE_INVALID_DATA_FMT; goto _EXCEPT; } @@ -2177,24 +2189,44 @@ int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest) { if (vlen == (sizeof(key.unixTimestamp) + sizeof(key.len) + key.len)) { // compatiable with previous data p = taosDecodeBinary(p, (void**)&pOutput, key.len); + if (p == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _EXCEPT; + } + } else { p = taosDecodeFixedI32(p, &key.rawLen); p = taosDecodeFixedI8(p, &key.compress); if (vlen != (sizeof(key.unixTimestamp) + sizeof(key.len) + sizeof(key.rawLen) + sizeof(key.compress) + key.len)) { stError("vlen: %d, read len: %d", vlen, key.len); + code = TSDB_CODE_INVALID_DATA_FMT; goto _EXCEPT; } if (key.compress == 1) { p = taosDecodeBinary(p, (void**)&pCompressData, key.len); + if (p == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _EXCEPT; + } pOutput = taosMemoryCalloc(1, key.rawLen); + if (pOutput == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _EXCEPT; + } + int32_t rawLen = LZ4_decompress_safe(pCompressData, pOutput, key.len, key.rawLen); if (rawLen != key.rawLen) { stError("read invalid read, rawlen: %d, currlen: %d", key.rawLen, key.len); + code = TSDB_CODE_INVALID_DATA_FMT; goto _EXCEPT; } key.len = rawLen; } else { p = taosDecodeBinary(p, (void**)&pOutput, key.len); + if (p == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _EXCEPT; + } } } @@ -3179,6 +3211,7 @@ int streamStateGetCfIdx(SStreamState* pState, const char* funcName) { if (err != NULL) { idx = -1; stError("failed to open cf, %p %s_%s, reason:%s", pState, wrapper->idstr, funcName, err); + rocksdb_column_family_handle_destroy(cf); taosMemoryFree(err); } else { stDebug("succ to open cf, %p %s_%s", pState, wrapper->idstr, funcName); @@ -3224,7 +3257,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe int i = streamStateGetCfIdx(pState, funcname); \ if (i < 0) { \ stWarn("streamState failed to get cf name: %s", funcname); \ - code = -1; \ + code = TSDB_CODE_THIRDPARTY_ERROR; \ break; \ } \ STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \ @@ -3241,7 +3274,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe if (err != NULL) { \ stError("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \ taosMemoryFree(err); \ - code = -1; \ + code = TSDB_CODE_THIRDPARTY_ERROR; \ } else { \ stTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d, %p", toString, funcname, vLen, \ ttlVLen, wrapper); \ @@ -3301,7 +3334,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe int i = streamStateGetCfIdx(pState, funcname); \ if (i < 0) { \ stWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \ - code = -1; \ + code = TSDB_CODE_THIRDPARTY_ERROR; \ break; \ } \ STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \ @@ -3316,7 +3349,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe if (err != NULL) { \ stError("streamState str: %s failed to del from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \ taosMemoryFree(err); \ - code = -1; \ + code = TSDB_CODE_THIRDPARTY_ERROR; \ } else { \ stTrace("streamState str: %s succ to del from %s_%s", toString, wrapper->idstr, funcname); \ } \