fix mem leak
This commit is contained in:
parent
b92dcea9fb
commit
ae57974b30
|
@ -2112,11 +2112,15 @@ void destroyCompare(void* arg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest) {
|
int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest) {
|
||||||
|
int32_t code = 0;
|
||||||
SStreamValue key = {.unixTimestamp = ttl, .len = vlen, .rawLen = vlen, .compress = 0, .data = (char*)(value)};
|
SStreamValue key = {.unixTimestamp = ttl, .len = vlen, .rawLen = vlen, .compress = 0, .data = (char*)(value)};
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
char* dst = NULL;
|
char* dst = NULL;
|
||||||
if (vlen > 512) {
|
if (vlen > 512) {
|
||||||
dst = taosMemoryCalloc(1, vlen + 128);
|
dst = taosMemoryCalloc(1, vlen + 128);
|
||||||
|
if (dst == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
int32_t dstCap = vlen + 128;
|
int32_t dstCap = vlen + 128;
|
||||||
int32_t compressedSize = LZ4_compress_default((char*)value, dst, vlen, dstCap);
|
int32_t compressedSize = LZ4_compress_default((char*)value, dst, vlen, dstCap);
|
||||||
if (compressedSize < vlen) {
|
if (compressedSize < vlen) {
|
||||||
|
@ -2129,7 +2133,11 @@ int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest) {
|
||||||
if (*dest == NULL) {
|
if (*dest == NULL) {
|
||||||
size_t size = sizeof(key.unixTimestamp) + sizeof(key.len) + sizeof(key.rawLen) + sizeof(key.compress) + key.len;
|
size_t size = sizeof(key.unixTimestamp) + sizeof(key.len) + sizeof(key.rawLen) + sizeof(key.compress) + key.len;
|
||||||
char* p = taosMemoryCalloc(1, size);
|
char* p = taosMemoryCalloc(1, size);
|
||||||
char* buf = p;
|
if (p == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _exception;
|
||||||
|
}
|
||||||
|
char* buf = p;
|
||||||
len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp);
|
len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp);
|
||||||
len += taosEncodeFixedI32((void**)&buf, key.len);
|
len += taosEncodeFixedI32((void**)&buf, key.len);
|
||||||
len += taosEncodeFixedI32((void**)&buf, key.rawLen);
|
len += taosEncodeFixedI32((void**)&buf, key.rawLen);
|
||||||
|
@ -2151,6 +2159,9 @@ int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest) {
|
||||||
|
|
||||||
taosMemoryFree(dst);
|
taosMemoryFree(dst);
|
||||||
return len;
|
return len;
|
||||||
|
_exception:
|
||||||
|
taosMemoryFree(dst);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -2165,6 +2176,7 @@ int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest) {
|
||||||
char* pCompressData = NULL;
|
char* pCompressData = NULL;
|
||||||
char* pOutput = NULL;
|
char* pOutput = NULL;
|
||||||
if (streamStateValueIsStale(p)) {
|
if (streamStateValueIsStale(p)) {
|
||||||
|
code = TSDB_CODE_INVALID_DATA_FMT;
|
||||||
goto _EXCEPT;
|
goto _EXCEPT;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2177,24 +2189,44 @@ int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest) {
|
||||||
if (vlen == (sizeof(key.unixTimestamp) + sizeof(key.len) + key.len)) {
|
if (vlen == (sizeof(key.unixTimestamp) + sizeof(key.len) + key.len)) {
|
||||||
// compatiable with previous data
|
// compatiable with previous data
|
||||||
p = taosDecodeBinary(p, (void**)&pOutput, key.len);
|
p = taosDecodeBinary(p, (void**)&pOutput, key.len);
|
||||||
|
if (p == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _EXCEPT;
|
||||||
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
p = taosDecodeFixedI32(p, &key.rawLen);
|
p = taosDecodeFixedI32(p, &key.rawLen);
|
||||||
p = taosDecodeFixedI8(p, &key.compress);
|
p = taosDecodeFixedI8(p, &key.compress);
|
||||||
if (vlen != (sizeof(key.unixTimestamp) + sizeof(key.len) + sizeof(key.rawLen) + sizeof(key.compress) + key.len)) {
|
if (vlen != (sizeof(key.unixTimestamp) + sizeof(key.len) + sizeof(key.rawLen) + sizeof(key.compress) + key.len)) {
|
||||||
stError("vlen: %d, read len: %d", vlen, key.len);
|
stError("vlen: %d, read len: %d", vlen, key.len);
|
||||||
|
code = TSDB_CODE_INVALID_DATA_FMT;
|
||||||
goto _EXCEPT;
|
goto _EXCEPT;
|
||||||
}
|
}
|
||||||
if (key.compress == 1) {
|
if (key.compress == 1) {
|
||||||
p = taosDecodeBinary(p, (void**)&pCompressData, key.len);
|
p = taosDecodeBinary(p, (void**)&pCompressData, key.len);
|
||||||
|
if (p == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _EXCEPT;
|
||||||
|
}
|
||||||
pOutput = taosMemoryCalloc(1, key.rawLen);
|
pOutput = taosMemoryCalloc(1, key.rawLen);
|
||||||
|
if (pOutput == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _EXCEPT;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t rawLen = LZ4_decompress_safe(pCompressData, pOutput, key.len, key.rawLen);
|
int32_t rawLen = LZ4_decompress_safe(pCompressData, pOutput, key.len, key.rawLen);
|
||||||
if (rawLen != key.rawLen) {
|
if (rawLen != key.rawLen) {
|
||||||
stError("read invalid read, rawlen: %d, currlen: %d", key.rawLen, key.len);
|
stError("read invalid read, rawlen: %d, currlen: %d", key.rawLen, key.len);
|
||||||
|
code = TSDB_CODE_INVALID_DATA_FMT;
|
||||||
goto _EXCEPT;
|
goto _EXCEPT;
|
||||||
}
|
}
|
||||||
key.len = rawLen;
|
key.len = rawLen;
|
||||||
} else {
|
} else {
|
||||||
p = taosDecodeBinary(p, (void**)&pOutput, key.len);
|
p = taosDecodeBinary(p, (void**)&pOutput, key.len);
|
||||||
|
if (p == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _EXCEPT;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3179,6 +3211,7 @@ int streamStateGetCfIdx(SStreamState* pState, const char* funcName) {
|
||||||
if (err != NULL) {
|
if (err != NULL) {
|
||||||
idx = -1;
|
idx = -1;
|
||||||
stError("failed to open cf, %p %s_%s, reason:%s", pState, wrapper->idstr, funcName, err);
|
stError("failed to open cf, %p %s_%s, reason:%s", pState, wrapper->idstr, funcName, err);
|
||||||
|
rocksdb_column_family_handle_destroy(cf);
|
||||||
taosMemoryFree(err);
|
taosMemoryFree(err);
|
||||||
} else {
|
} else {
|
||||||
stDebug("succ to open cf, %p %s_%s", pState, wrapper->idstr, funcName);
|
stDebug("succ to open cf, %p %s_%s", pState, wrapper->idstr, funcName);
|
||||||
|
@ -3224,7 +3257,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
|
||||||
int i = streamStateGetCfIdx(pState, funcname); \
|
int i = streamStateGetCfIdx(pState, funcname); \
|
||||||
if (i < 0) { \
|
if (i < 0) { \
|
||||||
stWarn("streamState failed to get cf name: %s", funcname); \
|
stWarn("streamState failed to get cf name: %s", funcname); \
|
||||||
code = -1; \
|
code = TSDB_CODE_THIRDPARTY_ERROR; \
|
||||||
break; \
|
break; \
|
||||||
} \
|
} \
|
||||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
|
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
|
||||||
|
@ -3241,7 +3274,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
|
||||||
if (err != NULL) { \
|
if (err != NULL) { \
|
||||||
stError("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \
|
stError("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \
|
||||||
taosMemoryFree(err); \
|
taosMemoryFree(err); \
|
||||||
code = -1; \
|
code = TSDB_CODE_THIRDPARTY_ERROR; \
|
||||||
} else { \
|
} else { \
|
||||||
stTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d, %p", toString, funcname, vLen, \
|
stTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d, %p", toString, funcname, vLen, \
|
||||||
ttlVLen, wrapper); \
|
ttlVLen, wrapper); \
|
||||||
|
@ -3301,7 +3334,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
|
||||||
int i = streamStateGetCfIdx(pState, funcname); \
|
int i = streamStateGetCfIdx(pState, funcname); \
|
||||||
if (i < 0) { \
|
if (i < 0) { \
|
||||||
stWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \
|
stWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \
|
||||||
code = -1; \
|
code = TSDB_CODE_THIRDPARTY_ERROR; \
|
||||||
break; \
|
break; \
|
||||||
} \
|
} \
|
||||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
|
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
|
||||||
|
@ -3316,7 +3349,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
|
||||||
if (err != NULL) { \
|
if (err != NULL) { \
|
||||||
stError("streamState str: %s failed to del from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \
|
stError("streamState str: %s failed to del from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \
|
||||||
taosMemoryFree(err); \
|
taosMemoryFree(err); \
|
||||||
code = -1; \
|
code = TSDB_CODE_THIRDPARTY_ERROR; \
|
||||||
} else { \
|
} else { \
|
||||||
stTrace("streamState str: %s succ to del from %s_%s", toString, wrapper->idstr, funcname); \
|
stTrace("streamState str: %s succ to del from %s_%s", toString, wrapper->idstr, funcname); \
|
||||||
} \
|
} \
|
||||||
|
|
Loading…
Reference in New Issue