refactor code
This commit is contained in:
parent
b1f3dfe88f
commit
733db123ad
|
@ -239,31 +239,30 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
|
|||
}
|
||||
}
|
||||
}
|
||||
// if (!pReader->streamStateDone) {
|
||||
// if (pReader->pStreamStateReader == NULL) {
|
||||
// code =
|
||||
// streamStateSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->sver,
|
||||
// &pReader->pStreamStateReader);
|
||||
// if (code) {
|
||||
// pReader->streamStateDone = 1;
|
||||
// pReader->pStreamStateReader = NULL;
|
||||
// goto _err;
|
||||
// }
|
||||
// }
|
||||
// code = streamStateSnapRead(pReader->pStreamStateReader, ppData);
|
||||
// if (code) {
|
||||
// goto _err;
|
||||
// } else {
|
||||
// if (*ppData) {
|
||||
// goto _exit;
|
||||
// } else {
|
||||
// pReader->streamStateDone = 1;
|
||||
// code = streamStateSnapReaderClose(pReader->pStreamStateReader);
|
||||
// if (code) goto _err;
|
||||
// pReader->pStreamStateReader = NULL;
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
if (!pReader->streamStateDone) {
|
||||
if (pReader->pStreamStateReader == NULL) {
|
||||
code =
|
||||
streamStateSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->sver, &pReader->pStreamStateReader);
|
||||
if (code) {
|
||||
pReader->streamStateDone = 1;
|
||||
pReader->pStreamStateReader = NULL;
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
code = streamStateSnapRead(pReader->pStreamStateReader, ppData);
|
||||
if (code) {
|
||||
goto _err;
|
||||
} else {
|
||||
if (*ppData) {
|
||||
goto _exit;
|
||||
} else {
|
||||
pReader->streamStateDone = 1;
|
||||
code = streamStateSnapReaderClose(pReader->pStreamStateReader);
|
||||
if (code) goto _err;
|
||||
pReader->pStreamStateReader = NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// RSMA ==============
|
||||
if (VND_IS_RSMA(pReader->pVnode) && !pReader->rsmaDone) {
|
||||
|
|
|
@ -188,10 +188,10 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
|
|||
|
||||
// chkpId = 0;
|
||||
char* state = taosMemoryCalloc(1, strlen(path) + 32);
|
||||
sprintf(state, "%s/%s", path, "state");
|
||||
sprintf(state, "%s%s%s", path, TD_DIRSEP, "state");
|
||||
if (chkpId != 0) {
|
||||
char* chkp = taosMemoryCalloc(1, strlen(path) + 64);
|
||||
sprintf(chkp, "%s/%s/checkpoint%" PRId64 "", path, "checkpoints", chkpId);
|
||||
sprintf(chkp, "%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, "checkpoints", TD_DIRSEP, chkpId);
|
||||
if (taosIsDir(chkp) && isValidCheckpoint(chkp)) {
|
||||
if (taosIsDir(state)) {
|
||||
// remove dir if exists
|
||||
|
@ -462,7 +462,7 @@ int32_t delObsoleteCheckpoint(void* arg, const char* path) {
|
|||
for (int i = 0; i < taosArrayGetSize(checkpointDel); i++) {
|
||||
int64_t id = *(int64_t*)taosArrayGet(checkpointDel, i);
|
||||
char tbuf[256] = {0};
|
||||
sprintf(tbuf, "%s/checkpoint%" PRId64 "", path, id);
|
||||
sprintf(tbuf, "%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, id);
|
||||
if (taosIsDir(tbuf)) {
|
||||
taosRemoveDir(tbuf);
|
||||
}
|
||||
|
@ -483,7 +483,7 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) {
|
|||
|
||||
int32_t len = strlen(pMeta->path) + 30;
|
||||
char* checkpointPath = taosMemoryCalloc(1, len);
|
||||
sprintf(checkpointPath, "%s/%s", pMeta->path, "checkpoints");
|
||||
sprintf(checkpointPath, "%s%s%s", pMeta->path, TD_DIRSEP, "checkpoints");
|
||||
|
||||
if (!taosDirExist(checkpointPath)) {
|
||||
// no checkpoint, nothing to load
|
||||
|
@ -530,7 +530,7 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
|
|||
int32_t code = -1;
|
||||
|
||||
char path[256] = {0};
|
||||
sprintf(path, "%s/%s", pMeta->path, "checkpoints");
|
||||
sprintf(path, "%s%s%s", pMeta->path, TD_DIRSEP, "checkpoints");
|
||||
code = taosMulModeMkDir(path, 0755);
|
||||
if (code != 0) {
|
||||
qError("failed to prepare checkpoint dir, path:%s, reason:%s", path, tstrerror(code));
|
||||
|
|
Loading…
Reference in New Issue