diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 231fc2ce5b..8b87019ee0 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -475,6 +475,7 @@ int32_t rebuildDataFromS3(char* chkpPath, int64_t chkpId) { taosMemoryFree(pMeta); return code; } + taosMemoryFree(pMeta); return chkpAddExtraInfo(chkpPath, chkpId, pMeta->processId); } @@ -2648,6 +2649,7 @@ int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char char* buf = taosMemoryCalloc(1, cap); if (buf == NULL) { + taosReleaseRef(taskDbWrapperId, refId); return TSDB_CODE_OUT_OF_MEMORY; } @@ -2655,6 +2657,7 @@ int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char snprintf(buf, cap, "%s%s%s%s%s%" PRId64 "", pDb->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkpId); if (nBytes <= 0 || nBytes >= cap) { taosMemoryFree(buf); + taosReleaseRef(taskDbWrapperId, refId); return TSDB_CODE_OUT_OF_RANGE; } @@ -4716,19 +4719,22 @@ int32_t dbChkpInit(SDbChkp* p) { int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) { static char* chkpMeta = "META"; int32_t code = 0; - int32_t cap = p->len + 128; taosThreadRwlockRdlock(&p->rwLock); - char* srcBuf = taosMemoryCalloc(1, cap); - char* dstBuf = taosMemoryCalloc(1, cap); - char* srcDir = taosMemoryCalloc(1, cap); - char* dstDir = taosMemoryCalloc(1, cap); - if (srcBuf == NULL || dstBuf == NULL || srcDir == NULL || dstDir == NULL) { + int32_t cap = p->len + 128; + + char* buffer = taosMemoryCalloc(4, cap); + if (buffer == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _ERROR; } + char* srcBuf = buffer; + char* dstBuf = &srcBuf[cap]; + char* srcDir = &dstBuf[cap]; + char* dstDir = &srcDir[cap]; + int nBytes = snprintf(srcDir, cap, "%s%s%s%s%s%" PRId64 "", p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", p->curChkpId); if (nBytes <= 0 || nBytes >= cap) { @@ -4872,12 +4878,8 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) { code = 0; _ERROR: + taosMemoryFree(buffer); taosThreadRwlockUnlock(&p->rwLock); - taosMemoryFree(srcBuf); - taosMemoryFree(dstBuf); - taosMemoryFree(srcDir); - taosMemoryFree(dstDir); - return code; } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index a66c7a7cfa..731b6e9586 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -541,10 +541,8 @@ void streamTaskSetFailedCheckpointId(SStreamTask* pTask) { } static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) { - TdFilePtr pFile = NULL; - int32_t cap = strlen(path) + 64; - char buf[128] = {0}; - int32_t code = 0; + int32_t code = 0; + int32_t cap = strlen(path) + 64; char* filePath = taosMemoryCalloc(1, cap); if (filePath == NULL) { @@ -603,7 +601,7 @@ int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t d stDebug("s-task:%s upload checkpointId:%" PRId64 " to remote succ", idStr, checkpointId); } else { stError("s-task:%s failed to upload checkpointId:%" PRId64 " path:%s,reason:%s", idStr, checkpointId, path, - tstrerror(errno)); + tstrerror(code)); } } @@ -1080,13 +1078,17 @@ ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType() { } int32_t streamTaskUploadCheckpoint(const char* id, const char* path) { + int32_t code = 0; if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) { stError("invalid parameters in upload checkpoint, %s", id); - return -1; + return TSDB_CODE_INVALID_CFG; } if (strlen(tsSnodeAddress) != 0) { - return uploadByRsync(id, path); + code = uploadByRsync(id, path); + if (code != 0) { + return TAOS_SYSTEM_ERROR(errno); + } } else if (tsS3StreamEnabled) { return uploadCheckpointToS3(id, path); }