Merge pull request #30010 from taosdata/fix/checkfh
fix(stream): not remove local checkpoint dir, log the checkpoint size.
This commit is contained in:
commit
2c3099b840
|
@ -920,12 +920,6 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask
|
|||
|
||||
// now the fill-history task starts to scan data from wal files.
|
||||
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
|
||||
// if (code == TSDB_CODE_SUCCESS) {
|
||||
// code = tqScanWalAsync(pTq, false);
|
||||
// if (code) {
|
||||
// tqError("vgId:%d failed to start scan wal file, code:%s", vgId, tstrerror(code));
|
||||
// }
|
||||
// }
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -148,6 +148,7 @@ static void doStartScanWal(void* param, void* tmrId) {
|
|||
return;
|
||||
}
|
||||
|
||||
// failed to lock, try 500ms later
|
||||
code = streamMetaTryRlock(pMeta);
|
||||
if (code == 0) {
|
||||
numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
||||
|
@ -156,25 +157,23 @@ static void doStartScanWal(void* param, void* tmrId) {
|
|||
numOfTasks = 0;
|
||||
}
|
||||
|
||||
if (numOfTasks == 0) {
|
||||
goto _end;
|
||||
}
|
||||
if (numOfTasks > 0) {
|
||||
tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d", vgId, numOfTasks);
|
||||
|
||||
tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d", vgId, numOfTasks);
|
||||
|
||||
#if 0
|
||||
#if 0
|
||||
// wait for the vnode is freed, and invalid read may occur.
|
||||
taosMsleep(10000);
|
||||
#endif
|
||||
#endif
|
||||
|
||||
code = streamTaskSchedTask(&pParam->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
|
||||
if (code) {
|
||||
tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code));
|
||||
code = streamTaskSchedTask(&pParam->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
|
||||
if (code) {
|
||||
tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
_end:
|
||||
streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal");
|
||||
tqDebug("vgId:%d scan-wal will start in %dms", vgId, SCAN_WAL_IDLE_DURATION*SCAN_WAL_WAIT_COUNT);
|
||||
tqDebug("vgId:%d try scan-wal will start in %dms", vgId, SCAN_WAL_IDLE_DURATION*SCAN_WAL_WAIT_COUNT);
|
||||
|
||||
code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
|
||||
if (code) {
|
||||
|
@ -192,7 +191,7 @@ void tqScanWalAsync(STQ* pTq) {
|
|||
|
||||
// 1. the vnode should be the leader.
|
||||
// 2. the stream isn't disabled
|
||||
if ((pMeta->role == NODE_ROLE_FOLLOWER) || tsDisableStream) {
|
||||
if ((pMeta->role != NODE_ROLE_LEADER) || tsDisableStream) {
|
||||
tqInfo("vgId:%d follower node or stream disabled, not scan wal", vgId);
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -952,11 +952,6 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
|
|||
|
||||
streamMetaWUnLock(pMeta);
|
||||
|
||||
// if (scanWal && (vgId != SNODE_HANDLE)) {
|
||||
// tqDebug("vgId:%d start scan wal for executing tasks", vgId);
|
||||
// code = tqScanWalAsync(pMeta->ahandle, true);
|
||||
// }
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -814,15 +814,17 @@ static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* l
|
|||
}
|
||||
|
||||
int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t dbRefId, ECHECKPOINT_BACKUP_TYPE type) {
|
||||
int32_t code = 0;
|
||||
char* path = NULL;
|
||||
|
||||
int32_t code = 0;
|
||||
char* path = NULL;
|
||||
int64_t chkptSize = 0;
|
||||
SStreamMeta* pMeta = pTask->pMeta;
|
||||
const char* idStr = pTask->id.idStr;
|
||||
int64_t now = taosGetTimestampMs();
|
||||
|
||||
SArray* toDelFiles = taosArrayInit(4, POINTER_BYTES);
|
||||
if (toDelFiles == NULL) {
|
||||
stError("s-task:%s failed to prepare array list during upload checkpoint, code:%s", pTask->id.idStr,
|
||||
tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
|
||||
|
@ -848,11 +850,11 @@ int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t d
|
|||
}
|
||||
}
|
||||
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
int32_t size = taosArrayGetSize(toDelFiles);
|
||||
stDebug("s-task:%s remove redundant %d files", idStr, size);
|
||||
int32_t num = taosArrayGetSize(toDelFiles);
|
||||
if (code == TSDB_CODE_SUCCESS && num > 0) {
|
||||
stDebug("s-task:%s remove redundant %d files", idStr, num);
|
||||
|
||||
for (int i = 0; i < size; i++) {
|
||||
for (int i = 0; i < num; i++) {
|
||||
char* pName = taosArrayGetP(toDelFiles, i);
|
||||
code = deleteCheckpointFile(idStr, pName);
|
||||
if (code != 0) {
|
||||
|
@ -868,12 +870,13 @@ int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t d
|
|||
double el = (taosGetTimestampMs() - now) / 1000.0;
|
||||
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
stDebug("s-task:%s complete update checkpointId:%" PRId64 ", elapsed time:%.2fs remove local checkpoint data %s",
|
||||
idStr, checkpointId, el, path);
|
||||
taosRemoveDir(path);
|
||||
code = taosGetDirSize(path, &chkptSize);
|
||||
stDebug("s-task:%s complete upload checkpointId:%" PRId64
|
||||
", elapsed time:%.2fs, checkpointSize:%.2fKiB local dir:%s",
|
||||
idStr, checkpointId, el, SIZE_IN_KiB(chkptSize), path);
|
||||
} else {
|
||||
stDebug("s-task:%s failed to upload checkpointId:%" PRId64 " keep local checkpoint data, elapsed time:%.2fs", idStr,
|
||||
checkpointId, el);
|
||||
stDebug("s-task:%s failed to upload checkpointId:%" PRId64 " elapsed time:%.2fs, checkpointSize:%.2fKiB", idStr,
|
||||
checkpointId, el, SIZE_IN_KiB(chkptSize));
|
||||
}
|
||||
|
||||
taosMemoryFree(path);
|
||||
|
@ -883,7 +886,7 @@ int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t d
|
|||
int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointId) {
|
||||
ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType();
|
||||
if (type == DATA_UPLOAD_DISABLE) {
|
||||
stDebug("s-task:%s not allowed to upload checkpoint data", pTask->id.idStr);
|
||||
stDebug("s-task:%s not config to backup checkpoint data at snode, checkpointId:%"PRId64, pTask->id.idStr, checkpointId);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -925,6 +928,9 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
stError("s-task:%s gen checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(terrno));
|
||||
}
|
||||
|
||||
int64_t et = taosGetTimestampMs();
|
||||
stDebug("s-task:%s gen local checkpoint completed, elapsed time:%.2fs", id, (et - startTs) / 1000.0);
|
||||
}
|
||||
|
||||
// TODO: monitoring the checkpoint-source msg
|
||||
|
|
|
@ -243,6 +243,8 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
|
|||
continue;
|
||||
}
|
||||
|
||||
// todo: this lock may blocked by lock in streamMetaStartOneTask function, which may lock a very long time when
|
||||
// trying to load remote checkpoint data
|
||||
streamMutexLock(&pTask->lock);
|
||||
STaskStatusEntry entry = streamTaskGetStatusEntry(pTask);
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
|
|
|
@ -1362,6 +1362,10 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader)
|
|||
}
|
||||
|
||||
pMeta->role = (isLeader) ? NODE_ROLE_LEADER : NODE_ROLE_FOLLOWER;
|
||||
if (!isLeader) {
|
||||
streamMetaResetStartInfo(&pMeta->startInfo, pMeta->vgId);
|
||||
}
|
||||
|
||||
streamMetaWUnLock(pMeta);
|
||||
|
||||
if (isLeader) {
|
||||
|
|
Loading…
Reference in New Issue