refactor code
This commit is contained in:
parent
982fed581d
commit
ee09e26f47
|
@ -475,6 +475,7 @@ int32_t rebuildDataFromS3(char* chkpPath, int64_t chkpId) {
|
||||||
taosMemoryFree(pMeta);
|
taosMemoryFree(pMeta);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
taosMemoryFree(pMeta);
|
||||||
|
|
||||||
return chkpAddExtraInfo(chkpPath, chkpId, pMeta->processId);
|
return chkpAddExtraInfo(chkpPath, chkpId, pMeta->processId);
|
||||||
}
|
}
|
||||||
|
@ -2648,6 +2649,7 @@ int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char
|
||||||
|
|
||||||
char* buf = taosMemoryCalloc(1, cap);
|
char* buf = taosMemoryCalloc(1, cap);
|
||||||
if (buf == NULL) {
|
if (buf == NULL) {
|
||||||
|
taosReleaseRef(taskDbWrapperId, refId);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2655,6 +2657,7 @@ int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char
|
||||||
snprintf(buf, cap, "%s%s%s%s%s%" PRId64 "", pDb->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkpId);
|
snprintf(buf, cap, "%s%s%s%s%s%" PRId64 "", pDb->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkpId);
|
||||||
if (nBytes <= 0 || nBytes >= cap) {
|
if (nBytes <= 0 || nBytes >= cap) {
|
||||||
taosMemoryFree(buf);
|
taosMemoryFree(buf);
|
||||||
|
taosReleaseRef(taskDbWrapperId, refId);
|
||||||
return TSDB_CODE_OUT_OF_RANGE;
|
return TSDB_CODE_OUT_OF_RANGE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4716,19 +4719,22 @@ int32_t dbChkpInit(SDbChkp* p) {
|
||||||
int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) {
|
int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) {
|
||||||
static char* chkpMeta = "META";
|
static char* chkpMeta = "META";
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t cap = p->len + 128;
|
|
||||||
|
|
||||||
taosThreadRwlockRdlock(&p->rwLock);
|
taosThreadRwlockRdlock(&p->rwLock);
|
||||||
|
|
||||||
char* srcBuf = taosMemoryCalloc(1, cap);
|
int32_t cap = p->len + 128;
|
||||||
char* dstBuf = taosMemoryCalloc(1, cap);
|
|
||||||
char* srcDir = taosMemoryCalloc(1, cap);
|
char* buffer = taosMemoryCalloc(4, cap);
|
||||||
char* dstDir = taosMemoryCalloc(1, cap);
|
if (buffer == NULL) {
|
||||||
if (srcBuf == NULL || dstBuf == NULL || srcDir == NULL || dstDir == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _ERROR;
|
goto _ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
char* srcBuf = buffer;
|
||||||
|
char* dstBuf = &srcBuf[cap];
|
||||||
|
char* srcDir = &dstBuf[cap];
|
||||||
|
char* dstDir = &srcDir[cap];
|
||||||
|
|
||||||
int nBytes = snprintf(srcDir, cap, "%s%s%s%s%s%" PRId64 "", p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP,
|
int nBytes = snprintf(srcDir, cap, "%s%s%s%s%s%" PRId64 "", p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP,
|
||||||
"checkpoint", p->curChkpId);
|
"checkpoint", p->curChkpId);
|
||||||
if (nBytes <= 0 || nBytes >= cap) {
|
if (nBytes <= 0 || nBytes >= cap) {
|
||||||
|
@ -4872,12 +4878,8 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) {
|
||||||
code = 0;
|
code = 0;
|
||||||
|
|
||||||
_ERROR:
|
_ERROR:
|
||||||
|
taosMemoryFree(buffer);
|
||||||
taosThreadRwlockUnlock(&p->rwLock);
|
taosThreadRwlockUnlock(&p->rwLock);
|
||||||
taosMemoryFree(srcBuf);
|
|
||||||
taosMemoryFree(dstBuf);
|
|
||||||
taosMemoryFree(srcDir);
|
|
||||||
taosMemoryFree(dstDir);
|
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -541,10 +541,8 @@ void streamTaskSetFailedCheckpointId(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) {
|
static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) {
|
||||||
TdFilePtr pFile = NULL;
|
|
||||||
int32_t cap = strlen(path) + 64;
|
|
||||||
char buf[128] = {0};
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
int32_t cap = strlen(path) + 64;
|
||||||
|
|
||||||
char* filePath = taosMemoryCalloc(1, cap);
|
char* filePath = taosMemoryCalloc(1, cap);
|
||||||
if (filePath == NULL) {
|
if (filePath == NULL) {
|
||||||
|
@ -603,7 +601,7 @@ int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t d
|
||||||
stDebug("s-task:%s upload checkpointId:%" PRId64 " to remote succ", idStr, checkpointId);
|
stDebug("s-task:%s upload checkpointId:%" PRId64 " to remote succ", idStr, checkpointId);
|
||||||
} else {
|
} else {
|
||||||
stError("s-task:%s failed to upload checkpointId:%" PRId64 " path:%s,reason:%s", idStr, checkpointId, path,
|
stError("s-task:%s failed to upload checkpointId:%" PRId64 " path:%s,reason:%s", idStr, checkpointId, path,
|
||||||
tstrerror(errno));
|
tstrerror(code));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1080,13 +1078,17 @@ ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType() {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskUploadCheckpoint(const char* id, const char* path) {
|
int32_t streamTaskUploadCheckpoint(const char* id, const char* path) {
|
||||||
|
int32_t code = 0;
|
||||||
if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
|
if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
|
||||||
stError("invalid parameters in upload checkpoint, %s", id);
|
stError("invalid parameters in upload checkpoint, %s", id);
|
||||||
return -1;
|
return TSDB_CODE_INVALID_CFG;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (strlen(tsSnodeAddress) != 0) {
|
if (strlen(tsSnodeAddress) != 0) {
|
||||||
return uploadByRsync(id, path);
|
code = uploadByRsync(id, path);
|
||||||
|
if (code != 0) {
|
||||||
|
return TAOS_SYSTEM_ERROR(errno);
|
||||||
|
}
|
||||||
} else if (tsS3StreamEnabled) {
|
} else if (tsS3StreamEnabled) {
|
||||||
return uploadCheckpointToS3(id, path);
|
return uploadCheckpointToS3(id, path);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue