Merge pull request #26218 from taosdata/fix/3_liaohj

fix(stream): clear the local backend default directory
This commit is contained in:
Haojun Liao 2024-06-20 18:26:28 +08:00 committed by GitHub
commit 151fc212b8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed file with 30 additions and 36 deletions

View File

@ -141,6 +141,7 @@ int32_t valueToString(void* k, char* buf);
int32_t valueIsStale(void* k, int64_t ts); int32_t valueIsStale(void* k, int64_t ts);
void destroyCompare(void* arg); void destroyCompare(void* arg);
static void cleanDir(const char* pPath, const char* id);
static bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len); static bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len);
static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName, static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName,
@ -212,12 +213,7 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
char* chkp = taosMemoryCalloc(1, strlen(path) + 64); char* chkp = taosMemoryCalloc(1, strlen(path) + 64);
sprintf(chkp, "%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, "checkpoints", TD_DIRSEP, chkpId); sprintf(chkp, "%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, "checkpoints", TD_DIRSEP, chkpId);
if (taosIsDir(chkp) && isValidCheckpoint(chkp)) { if (taosIsDir(chkp) && isValidCheckpoint(chkp)) {
if (taosIsDir(state)) { cleanDir(state, "");
// remove dir if exists
// taosRenameFile(const char *oldName, const char *newName)
taosRemoveDir(state);
}
taosMkDir(state);
code = backendCopyFiles(chkp, state); code = backendCopyFiles(chkp, state);
stInfo("copy snap file from %s to %s", chkp, state); stInfo("copy snap file from %s to %s", chkp, state);
if (code != 0) { if (code != 0) {
@ -322,6 +318,22 @@ int32_t remoteChkp_validAndCvtMeta(char* path, SArray* list, int64_t chkpId) {
return complete == 1 ? 0 : -1; return complete == 1 ? 0 : -1;
} }
/*
 * Empty the directory at pPath by removing it and then creating it again.
 * Nothing happens when taosIsDir() reports that pPath is not a directory.
 *
 * pPath: directory to clear; must not be NULL (asserted).
 * id:    tag printed in front of the log line. May be NULL; an empty tag is
 *        logged in that case.
 */
void cleanDir(const char* pPath, const char* id) {
  ASSERT(pPath != NULL);

  // A NULL argument for a "%s" conversion is undefined behavior, so never
  // forward a NULL id to the logger.
  const char* tag = (id != NULL) ? id : "";

  if (taosIsDir(pPath)) {
    taosRemoveDir(pPath);
    taosMkDir(pPath);
    stInfo("%s clear dir:%s, succ", tag, pPath);
  }
}
/*
 * Ensure pPath is present as a directory: when taosIsDir() says it is not,
 * create it with taosMulMkDir(). An existing directory is left untouched.
 */
void validateDir(const char* pPath) {
  if (taosIsDir(pPath)) {
    return;  // already there, nothing to create
  }

  taosMulMkDir(pPath);
}
int32_t rebuildFromRemoteChkp_rsync(const char* key, char* chkptPath, int64_t checkpointId, char* defaultPath) { int32_t rebuildFromRemoteChkp_rsync(const char* key, char* chkptPath, int64_t checkpointId, char* defaultPath) {
int32_t code = 0; int32_t code = 0;
if (taosIsDir(chkptPath)) { if (taosIsDir(chkptPath)) {
@ -329,11 +341,8 @@ int32_t rebuildFromRemoteChkp_rsync(const char* key, char* chkptPath, int64_t ch
stDebug("remove local checkpoint data dir:%s succ", chkptPath); stDebug("remove local checkpoint data dir:%s succ", chkptPath);
} }
if (taosIsDir(defaultPath)) { cleanDir(defaultPath, key);
taosRemoveDir(defaultPath); stDebug("clear local default dir before downloading checkpoint data:%s succ", defaultPath);
taosMulMkDir(defaultPath);
stDebug("clear local default dir before downloading checkpoint data:%s succ", defaultPath);
}
code = streamTaskDownloadCheckpointData(key, chkptPath); code = streamTaskDownloadCheckpointData(key, chkptPath);
if (code != 0) { if (code != 0) {
@ -484,21 +493,14 @@ int32_t backendCopyFiles(const char* src, const char* dst) {
static int32_t rebuildFromLocalCheckpoint(const char* pTaskIdStr, const char* checkpointPath, int64_t checkpointId, static int32_t rebuildFromLocalCheckpoint(const char* pTaskIdStr, const char* checkpointPath, int64_t checkpointId,
const char* defaultPath) { const char* defaultPath) {
int32_t code = 0; int32_t code = 0;
cleanDir(defaultPath, pTaskIdStr);
if (taosIsDir(defaultPath)) {
taosRemoveDir(defaultPath);
taosMkDir(defaultPath);
stInfo("%s clear local backend dir:%s, succ", pTaskIdStr, defaultPath);
}
if (taosIsDir(checkpointPath) && isValidCheckpoint(checkpointPath)) { if (taosIsDir(checkpointPath) && isValidCheckpoint(checkpointPath)) {
stDebug("%s local checkpoint data existed, checkpointId:%" PRId64 " copy to backend dir", pTaskIdStr, checkpointId); stDebug("%s local checkpoint data existed, checkpointId:%" PRId64 " copy to backend dir", pTaskIdStr, checkpointId);
code = backendCopyFiles(checkpointPath, defaultPath); code = backendCopyFiles(checkpointPath, defaultPath);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosRemoveDir(defaultPath); cleanDir(defaultPath, pTaskIdStr);
taosMkDir(defaultPath);
stError("%s failed to start stream backend from local %s, reason:%s, try download checkpoint from remote", stError("%s failed to start stream backend from local %s, reason:%s, try download checkpoint from remote",
pTaskIdStr, checkpointPath, tstrerror(TAOS_SYSTEM_ERROR(errno))); pTaskIdStr, checkpointPath, tstrerror(TAOS_SYSTEM_ERROR(errno)));
code = TSDB_CODE_SUCCESS; code = TSDB_CODE_SUCCESS;
@ -520,26 +522,18 @@ int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId
char* prefixPath = taosMemoryCalloc(1, strlen(path) + 128); char* prefixPath = taosMemoryCalloc(1, strlen(path) + 128);
sprintf(prefixPath, "%s%s%s", path, TD_DIRSEP, key); sprintf(prefixPath, "%s%s%s", path, TD_DIRSEP, key);
if (!taosIsDir(prefixPath)) { validateDir(prefixPath);
code = taosMkDir(prefixPath);
ASSERT(code == 0);
}
char* defaultPath = taosMemoryCalloc(1, strlen(path) + 256); char* defaultPath = taosMemoryCalloc(1, strlen(path) + 256);
sprintf(defaultPath, "%s%s%s", prefixPath, TD_DIRSEP, "state"); sprintf(defaultPath, "%s%s%s", prefixPath, TD_DIRSEP, "state");
if (!taosIsDir(defaultPath)) { validateDir(defaultPath);
taosMulMkDir(defaultPath);
}
int32_t pathLen = strlen(path) + 256; int32_t pathLen = strlen(path) + 256;
char* checkpointRoot = taosMemoryCalloc(1, pathLen); char* checkpointRoot = taosMemoryCalloc(1, pathLen);
sprintf(checkpointRoot, "%s%s%s", prefixPath, TD_DIRSEP, "checkpoints"); sprintf(checkpointRoot, "%s%s%s", prefixPath, TD_DIRSEP, "checkpoints");
if (!taosIsDir(checkpointRoot)) { validateDir(checkpointRoot);
taosMulMkDir(checkpointRoot);
}
taosMemoryFree(checkpointRoot); taosMemoryFree(checkpointRoot);
stDebug("%s check local backend dir:%s, checkpointId:%" PRId64 " succ", key, defaultPath, chkptId); stDebug("%s check local backend dir:%s, checkpointId:%" PRId64 " succ", key, defaultPath, chkptId);
@ -559,7 +553,8 @@ int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId
code = 0; // reset the error code code = 0; // reset the error code
} }
} else { // no valid checkpoint id } else { // no valid checkpoint id
stInfo("%s no valid checkpoint ever generated, no need to copy checkpoint data", key); stInfo("%s no valid checkpoint ever generated, no need to copy checkpoint data, clean defaultPath:%s", key, defaultPath);
cleanDir(defaultPath, key);
} }
taosMemoryFree(chkptPath); taosMemoryFree(chkptPath);
@ -2055,7 +2050,7 @@ STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath) {
cfNames = rocksdb_list_column_families(pTaskDb->dbOpt, dbPath, &nCf, &err); cfNames = rocksdb_list_column_families(pTaskDb->dbOpt, dbPath, &nCf, &err);
if (nCf == 0) { if (nCf == 0) {
stInfo("%s newly create db, need to restart", key); stInfo("%s newly create db in state-backend", key);
// pre create db // pre create db
pTaskDb->db = rocksdb_open(pTaskDb->pCfOpts[0], dbPath, &err); pTaskDb->db = rocksdb_open(pTaskDb->pCfOpts[0], dbPath, &err);
if (pTaskDb->db == NULL) goto _EXIT; if (pTaskDb->db == NULL) goto _EXIT;
@ -2215,13 +2210,12 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64
sprintf(temp, "%s%s%s%" PRId64 "", pDb->path, TD_DIRSEP, "tmp", chkpId); sprintf(temp, "%s%s%s%" PRId64 "", pDb->path, TD_DIRSEP, "tmp", chkpId);
if (taosDirExist(temp)) { if (taosDirExist(temp)) {
taosRemoveDir(temp); cleanDir(temp, NULL);
taosMkDir(temp);
} else { } else {
taosMkDir(temp); taosMkDir(temp);
} }
code = bkdMgtGetDelta(p, pDb->idstr, chkpId, list, temp);
code = bkdMgtGetDelta(p, pDb->idstr, chkpId, list, temp);
*path = temp; *path = temp;
return code; return code;