fix(stream): not remove local checkpoint dir, log the checkpoint size.

This commit is contained in:
Haojun Liao 2025-03-04 19:51:27 +08:00
parent eb0553b737
commit fa1ff744fc
1 changed files with 19 additions and 13 deletions

View File

@ -816,13 +816,15 @@ static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* l
int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t dbRefId, ECHECKPOINT_BACKUP_TYPE type) { int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t dbRefId, ECHECKPOINT_BACKUP_TYPE type) {
int32_t code = 0; int32_t code = 0;
char* path = NULL; char* path = NULL;
int64_t chkptSize = 0;
SStreamMeta* pMeta = pTask->pMeta; SStreamMeta* pMeta = pTask->pMeta;
const char* idStr = pTask->id.idStr; const char* idStr = pTask->id.idStr;
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
SArray* toDelFiles = taosArrayInit(4, POINTER_BYTES); SArray* toDelFiles = taosArrayInit(4, POINTER_BYTES);
if (toDelFiles == NULL) { if (toDelFiles == NULL) {
stError("s-task:%s failed to prepare array list during upload checkpoint, code:%s", pTask->id.idStr,
tstrerror(terrno));
return terrno; return terrno;
} }
@ -848,11 +850,11 @@ int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t d
} }
} }
if (code == TSDB_CODE_SUCCESS) { int32_t num = taosArrayGetSize(toDelFiles);
int32_t size = taosArrayGetSize(toDelFiles); if (code == TSDB_CODE_SUCCESS && num > 0) {
stDebug("s-task:%s remove redundant %d files", idStr, size); stDebug("s-task:%s remove redundant %d files", idStr, num);
for (int i = 0; i < size; i++) { for (int i = 0; i < num; i++) {
char* pName = taosArrayGetP(toDelFiles, i); char* pName = taosArrayGetP(toDelFiles, i);
code = deleteCheckpointFile(idStr, pName); code = deleteCheckpointFile(idStr, pName);
if (code != 0) { if (code != 0) {
@ -868,12 +870,13 @@ int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t d
double el = (taosGetTimestampMs() - now) / 1000.0; double el = (taosGetTimestampMs() - now) / 1000.0;
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
stDebug("s-task:%s complete update checkpointId:%" PRId64 ", elapsed time:%.2fs remove local checkpoint data %s", code = taosGetDirSize(path, &chkptSize);
idStr, checkpointId, el, path); stDebug("s-task:%s complete upload checkpointId:%" PRId64
taosRemoveDir(path); ", elapsed time:%.2fs, checkpointSize:%.2fKiB local dir:%s",
idStr, checkpointId, el, SIZE_IN_KiB(chkptSize), path);
} else { } else {
stDebug("s-task:%s failed to upload checkpointId:%" PRId64 " keep local checkpoint data, elapsed time:%.2fs", idStr, stDebug("s-task:%s failed to upload checkpointId:%" PRId64 " elapsed time:%.2fs, checkpointSize:%.2fKiB", idStr,
checkpointId, el); checkpointId, el, SIZE_IN_KiB(chkptSize));
} }
taosMemoryFree(path); taosMemoryFree(path);
@ -883,7 +886,7 @@ int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t d
int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointId) { int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointId) {
ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType(); ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType();
if (type == DATA_UPLOAD_DISABLE) { if (type == DATA_UPLOAD_DISABLE) {
stDebug("s-task:%s not allowed to upload checkpoint data", pTask->id.idStr); stDebug("s-task:%s not config to backup checkpoint data at snode, checkpointId:%"PRId64, pTask->id.idStr, checkpointId);
return 0; return 0;
} }
@ -925,6 +928,9 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s gen checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(terrno)); stError("s-task:%s gen checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(terrno));
} }
int64_t et = taosGetTimestampMs();
stDebug("s-task:%s gen local checkpoint completed, elapsed time:%.2fs", id, (et - startTs) / 1000.0);
} }
// TODO: monitoring the checkpoint-source msg // TODO: monitoring the checkpoint-source msg