fix mem leak

This commit is contained in:
yihaoDeng 2023-10-09 15:53:42 +08:00
parent addb1107d2
commit 2aa88dfe9b
4 changed files with 56 additions and 33 deletions

View File

@ -104,6 +104,7 @@ int32_t streamStateSnapRead(SStreamStateReader* pReader, uint8_t** ppData) {
pHdr->type = SNAP_DATA_STREAM_STATE_BACKEND;
pHdr->size = len;
memcpy(pHdr->data, rowData, len);
taosMemoryFree(rowData);
tqDebug("vgId:%d, vnode stream-state snapshot read data success", TD_VID(pReader->pTq->pVnode));
return code;

View File

@ -181,5 +181,7 @@ int32_t streamBackendTriggerChkp(void* pMeta, char* dst);
int32_t streamBackendAddInUseChkp(void* arg, int64_t chkpId);
int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId);
int32_t taskBackendBuildSnap(void* arg, int64_t chkpId);
// int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);
#endif

View File

@ -950,7 +950,14 @@ int32_t chkpPreBuildDir(char* path, int64_t chkpId, char** chkpDir, char** chkpI
return 0;
}
int32_t taskBackendBuildSnap(void* arg, int64_t chkpId) {
SStreamMeta* pMeta = arg;
void* pIter = taosHashIterate(pMeta->pTaskBackendUnique, NULL);
while (pIter) {
pIter = taosHashIterate(pMeta->pTaskBackendUnique, pIter);
}
return 0;
}
int32_t streamBackendTriggerChkp(void* arg, char* dst) {
return 0;
// SStreamMeta* pMeta = arg;

View File

@ -108,44 +108,57 @@ TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) {
return taosOpenFile(fullname, opt);
}
int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chkpId, void* pMeta) {
// impl later
int len = strlen(path);
char* tdir = taosMemoryCalloc(1, len + 256);
memcpy(tdir, path, len);
int32_t streamStateSnapBuild(void* arg, char* path, int64_t chkpId) {
return taskBackendBuildSnap(arg, chkpId);
// int32_t code = 0;
// int8_t validChkp = 0;
int32_t code = 0;
// int len = strlen(path);
// char* tpath = taosMemoryCalloc(1, len + 256);
// memcpy(tpath, path, len);
int8_t validChkp = 0;
if (chkpId != 0) {
sprintf(tdir, "%s%s%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, "stream", TD_DIRSEP, "checkpoints", TD_DIRSEP,
chkpId);
if (taosIsDir(tdir)) {
validChkp = 1;
qInfo("%s start to read snap %s", STREAM_STATE_TRANSFER, tdir);
streamBackendAddInUseChkp(pMeta, chkpId);
} else {
qWarn("%s failed to read from %s, reason: dir not exist,retry to default state dir", STREAM_STATE_TRANSFER, tdir);
}
}
// SStreamMeta *pMeta = arg;
// if (chkpId != 0) {
// sprintf(tpath, "%s%s%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, "stream", TD_DIRSEP, "checkpoints",
// TD_DIRSEP,
// chkpId);
// if (taosIsDir(tpath)) {
// validChkp = 1;
// qInfo("%s start to read snap %s", STREAM_STATE_TRANSFER, tpath);
// streamBackendAddInUseChkp(pMeta, chkpId);
// } else {
// qWarn("%s failed to read from %s, reason: dir not exist,retry to default state dir", STREAM_STATE_TRANSFER,
// tpath);
// }
// }
// no checkpoint specified or not exists invalid checkpoint, do checkpoint at default path and translate it
if (validChkp == 0) {
sprintf(tdir, "%s%s%s%s%s", path, TD_DIRSEP, "stream", TD_DIRSEP, "state");
char* chkpdir = taosMemoryCalloc(1, len + 256);
sprintf(chkpdir, "%s%s%s", tdir, TD_DIRSEP, "tmp");
taosMemoryFree(tdir);
// if (validChkp == 0) {
// sprintf(tpath, "%s%s%s%s%s", path, TD_DIRSEP, "stream", TD_DIRSEP, "state");
// char* chkpdir = taosMemoryCalloc(1, len + 256);
// sprintf(chkpdir, "%s%s%s", tpath, TD_DIRSEP, "tmp");
// taosMemoryFree(tpath);
tdir = chkpdir;
qInfo("%s start to trigger checkpoint on %s", STREAM_STATE_TRANSFER, tdir);
// tpath = chkpdir;
// qInfo("%s start to trigger checkpoint on %s", STREAM_STATE_TRANSFER, tpath);
code = streamBackendTriggerChkp(pMeta, tdir);
if (code != 0) {
qError("%s failed to trigger chekckpoint at %s", STREAM_STATE_TRANSFER, tdir);
taosMemoryFree(tdir);
return code;
}
chkpId = 0;
// code = streamBackendTriggerChkp(arg, tpath);
// if (code != 0) {
// qError("%s failed to trigger chekckpoint at %s", STREAM_STATE_TRANSFER, tpath);
// taosMemoryFree(tpath);
// return code;
// }
// chkpId = 0;
// }
//*dstPath = tpath;
}
int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chkpId, void* pMeta) {
// impl later
char* tdir = NULL;
int32_t code = streamStateSnapBuild(pMeta, path, chkpId);
if (code != 0) {
return -1;
}
qInfo("%s start to read dir: %s", STREAM_STATE_TRANSFER, tdir);