fix mem leak
This commit is contained in:
parent
bffa6387fe
commit
0b92a6b1ea
|
@ -717,7 +717,9 @@ int32_t delObsoleteCheckpoint(void* arg, const char* path) {
|
||||||
*/
|
*/
|
||||||
int32_t chkpMayDelObsolete(void* arg, int64_t chkpId, char* path) {
|
int32_t chkpMayDelObsolete(void* arg, int64_t chkpId, char* path) {
|
||||||
STaskBackendWrapper* pBackend = arg;
|
STaskBackendWrapper* pBackend = arg;
|
||||||
|
|
||||||
taosThreadRwlockWrlock(&pBackend->chkpDirLock);
|
taosThreadRwlockWrlock(&pBackend->chkpDirLock);
|
||||||
|
|
||||||
taosArrayPush(pBackend->chkpSaved, &chkpId);
|
taosArrayPush(pBackend->chkpSaved, &chkpId);
|
||||||
|
|
||||||
SArray* chkpDel = taosArrayInit(8, sizeof(int64_t));
|
SArray* chkpDel = taosArrayInit(8, sizeof(int64_t));
|
||||||
|
@ -1028,6 +1030,7 @@ int32_t taskBackendDoCheckpoint(void* arg, uint64_t chkpId) {
|
||||||
char* pChkpDir = NULL;
|
char* pChkpDir = NULL;
|
||||||
char* pChkpIdDir = NULL;
|
char* pChkpIdDir = NULL;
|
||||||
if (chkpPreBuildDir(pBackend->path, chkpId, &pChkpDir, &pChkpIdDir) != 0) {
|
if (chkpPreBuildDir(pBackend->path, chkpId, &pChkpDir, &pChkpIdDir) != 0) {
|
||||||
|
code = -1;
|
||||||
goto _EXIT;
|
goto _EXIT;
|
||||||
}
|
}
|
||||||
// Get all cf and acquire cfWrappter
|
// Get all cf and acquire cfWrappter
|
||||||
|
@ -1036,27 +1039,24 @@ int32_t taskBackendDoCheckpoint(void* arg, uint64_t chkpId) {
|
||||||
int32_t nCf = chkpGetAllDbCfHandle2(pBackend, &ppCf);
|
int32_t nCf = chkpGetAllDbCfHandle2(pBackend, &ppCf);
|
||||||
qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pBackend, pChkpIdDir, nCf);
|
qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pBackend, pChkpIdDir, nCf);
|
||||||
|
|
||||||
if ((code = chkpPreFlushDb(pBackend->db, ppCf, nCf)) == 0 && nCf != 0) {
|
if ((code = chkpPreFlushDb(pBackend->db, ppCf, nCf)) == 0) {
|
||||||
if ((code = chkpDoDbCheckpoint(pBackend->db, pChkpIdDir)) != 0) {
|
if ((code = chkpDoDbCheckpoint(pBackend->db, pChkpIdDir)) != 0) {
|
||||||
qError("stream backend:%p failed to do checkpoint at:%s", pBackend, pChkpIdDir);
|
qError("stream backend:%p failed to do checkpoint at:%s", pBackend, pChkpIdDir);
|
||||||
} else {
|
} else {
|
||||||
qDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pBackend, pChkpIdDir,
|
qDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pBackend, pChkpIdDir,
|
||||||
taosGetTimestampMs() - st);
|
taosGetTimestampMs() - st);
|
||||||
}
|
}
|
||||||
} else if (nCf != 0) {
|
} else {
|
||||||
qError("stream backend:%p failed to flush db at:%s", pBackend, pChkpIdDir);
|
qError("stream backend:%p failed to flush db at:%s", pBackend, pChkpIdDir);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = chkpMayDelObsolete(pBackend, chkpId, pChkpDir);
|
code = chkpMayDelObsolete(pBackend, chkpId, pChkpDir);
|
||||||
|
|
||||||
taosReleaseRef(taskBackendWrapperId, refId);
|
|
||||||
return code;
|
|
||||||
|
|
||||||
_EXIT:
|
_EXIT:
|
||||||
taosMemoryFree(pChkpDir);
|
taosMemoryFree(pChkpDir);
|
||||||
taosMemoryFree(pChkpIdDir);
|
taosMemoryFree(pChkpIdDir);
|
||||||
taosReleaseRef(taskBackendWrapperId, refId);
|
taosReleaseRef(taskBackendWrapperId, refId);
|
||||||
return -1;
|
return code;
|
||||||
}
|
}
|
||||||
int32_t streamBackendDoCheckpoint(void* arg, uint64_t chkpId) { return taskBackendDoCheckpoint(arg, chkpId); }
|
int32_t streamBackendDoCheckpoint(void* arg, uint64_t chkpId) { return taskBackendDoCheckpoint(arg, chkpId); }
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue