fix mem leak
This commit is contained in:
parent
cb2a9ee005
commit
e3f18495e0
|
@ -17,8 +17,8 @@
|
||||||
#include "query.h"
|
#include "query.h"
|
||||||
#include "rocksdb/c.h"
|
#include "rocksdb/c.h"
|
||||||
#include "streamBackendRocksdb.h"
|
#include "streamBackendRocksdb.h"
|
||||||
#include "tcommon.h"
|
|
||||||
#include "streamInt.h"
|
#include "streamInt.h"
|
||||||
|
#include "tcommon.h"
|
||||||
|
|
||||||
enum SBackendFileType {
|
enum SBackendFileType {
|
||||||
ROCKSDB_OPTIONS_TYPE = 1,
|
ROCKSDB_OPTIONS_TYPE = 1,
|
||||||
|
@ -126,7 +126,8 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk
|
||||||
stInfo("%s start to read snap %s", STREAM_STATE_TRANSFER, tdir);
|
stInfo("%s start to read snap %s", STREAM_STATE_TRANSFER, tdir);
|
||||||
streamBackendAddInUseChkp(pMeta, chkpId);
|
streamBackendAddInUseChkp(pMeta, chkpId);
|
||||||
} else {
|
} else {
|
||||||
stWarn("%s failed to read from %s, reason: dir not exist,retry to default state dir", STREAM_STATE_TRANSFER, tdir);
|
stWarn("%s failed to read from %s, reason: dir not exist,retry to default state dir", STREAM_STATE_TRANSFER,
|
||||||
|
tdir);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -153,6 +154,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk
|
||||||
|
|
||||||
TdDirPtr pDir = taosOpenDir(tdir);
|
TdDirPtr pDir = taosOpenDir(tdir);
|
||||||
if (NULL == pDir) {
|
if (NULL == pDir) {
|
||||||
|
taosMemoryFree(tdir);
|
||||||
stError("%s failed to open %s", STREAM_STATE_TRANSFER, tdir);
|
stError("%s failed to open %s", STREAM_STATE_TRANSFER, tdir);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
@ -271,7 +273,7 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
|
||||||
|
|
||||||
if (handle->checkpointId == 0) {
|
if (handle->checkpointId == 0) {
|
||||||
// del tmp dir
|
// del tmp dir
|
||||||
if (taosIsDir(pFile->path)) {
|
if (pFile && taosIsDir(pFile->path)) {
|
||||||
taosRemoveDir(pFile->path);
|
taosRemoveDir(pFile->path);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -335,24 +337,24 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
|
||||||
} else {
|
} else {
|
||||||
pHandle->fd = streamOpenFile(pFile->path, item->name, TD_FILE_READ);
|
pHandle->fd = streamOpenFile(pFile->path, item->name, TD_FILE_READ);
|
||||||
stDebug("%s open file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d", STREAM_STATE_TRANSFER,
|
stDebug("%s open file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d", STREAM_STATE_TRANSFER,
|
||||||
item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
|
item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
stDebug("%s start to read file %s, current offset:%" PRId64 ", size:%" PRId64 ", file no.%d", STREAM_STATE_TRANSFER,
|
stDebug("%s start to read file %s, current offset:%" PRId64 ", size:%" PRId64 ", file no.%d", STREAM_STATE_TRANSFER,
|
||||||
item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
|
item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
|
||||||
uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize);
|
uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize);
|
||||||
int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset);
|
int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset);
|
||||||
if (nread == -1) {
|
if (nread == -1) {
|
||||||
taosMemoryFree(buf);
|
taosMemoryFree(buf);
|
||||||
code = TAOS_SYSTEM_ERROR(terrno);
|
code = TAOS_SYSTEM_ERROR(terrno);
|
||||||
stError("%s snap failed to read snap, file name:%s, type:%d,reason:%s", STREAM_STATE_TRANSFER, item->name,
|
stError("%s snap failed to read snap, file name:%s, type:%d,reason:%s", STREAM_STATE_TRANSFER, item->name,
|
||||||
item->type, tstrerror(code));
|
item->type, tstrerror(code));
|
||||||
return -1;
|
return -1;
|
||||||
} else if (nread > 0 && nread <= kBlockSize) {
|
} else if (nread > 0 && nread <= kBlockSize) {
|
||||||
// left bytes less than kBlockSize
|
// left bytes less than kBlockSize
|
||||||
stDebug("%s read file %s, current offset:%" PRId64 ",size:% " PRId64 ", file no.%d", STREAM_STATE_TRANSFER,
|
stDebug("%s read file %s, current offset:%" PRId64 ",size:% " PRId64 ", file no.%d", STREAM_STATE_TRANSFER,
|
||||||
item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
|
item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
|
||||||
pHandle->offset += nread;
|
pHandle->offset += nread;
|
||||||
if (pHandle->offset >= item->size || nread < kBlockSize) {
|
if (pHandle->offset >= item->size || nread < kBlockSize) {
|
||||||
taosCloseFile(&pHandle->fd);
|
taosCloseFile(&pHandle->fd);
|
||||||
|
@ -361,7 +363,7 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
stDebug("%s no data read, close file no.%d, move to next file, open and read", STREAM_STATE_TRANSFER,
|
stDebug("%s no data read, close file no.%d, move to next file, open and read", STREAM_STATE_TRANSFER,
|
||||||
pHandle->currFileIdx);
|
pHandle->currFileIdx);
|
||||||
taosCloseFile(&pHandle->fd);
|
taosCloseFile(&pHandle->fd);
|
||||||
pHandle->offset = 0;
|
pHandle->offset = 0;
|
||||||
pHandle->currFileIdx += 1;
|
pHandle->currFileIdx += 1;
|
||||||
|
@ -379,7 +381,7 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
|
||||||
pHandle->offset += nread;
|
pHandle->offset += nread;
|
||||||
|
|
||||||
stDebug("%s open file and read file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d",
|
stDebug("%s open file and read file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d",
|
||||||
STREAM_STATE_TRANSFER, item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
|
STREAM_STATE_TRANSFER, item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)buf;
|
SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)buf;
|
||||||
|
@ -434,8 +436,8 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
|
||||||
pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||||
if (pHandle->fd == NULL) {
|
if (pHandle->fd == NULL) {
|
||||||
code = TAOS_SYSTEM_ERROR(terrno);
|
code = TAOS_SYSTEM_ERROR(terrno);
|
||||||
stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pFile->path, TD_DIRSEP, pHdr->name,
|
stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pFile->path, TD_DIRSEP,
|
||||||
tstrerror(code));
|
pHdr->name, tstrerror(code));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -461,8 +463,8 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
|
||||||
pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||||
if (pHandle->fd == NULL) {
|
if (pHandle->fd == NULL) {
|
||||||
code = TAOS_SYSTEM_ERROR(terrno);
|
code = TAOS_SYSTEM_ERROR(terrno);
|
||||||
stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pFile->path, TD_DIRSEP, pHdr->name,
|
stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pFile->path, TD_DIRSEP,
|
||||||
tstrerror(code));
|
pHdr->name, tstrerror(code));
|
||||||
}
|
}
|
||||||
|
|
||||||
taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset);
|
taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset);
|
||||||
|
|
Loading…
Reference in New Issue