diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index e1298b11ea..407a1e88c8 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -920,12 +920,6 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask // now the fill-history task starts to scan data from wal files. code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); -// if (code == TSDB_CODE_SUCCESS) { -// code = tqScanWalAsync(pTq, false); -// if (code) { -// tqError("vgId:%d failed to start scan wal file, code:%s", vgId, tstrerror(code)); -// } -// } } } diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index d9aec27759..4d2449bb37 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -148,6 +148,7 @@ static void doStartScanWal(void* param, void* tmrId) { return; } + // failed to lock, try 500ms later code = streamMetaTryRlock(pMeta); if (code == 0) { numOfTasks = taosArrayGetSize(pMeta->pTaskList); @@ -156,25 +157,23 @@ static void doStartScanWal(void* param, void* tmrId) { numOfTasks = 0; } - if (numOfTasks == 0) { - goto _end; - } + if (numOfTasks > 0) { + tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d", vgId, numOfTasks); - tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d", vgId, numOfTasks); - - #if 0 +#if 0 // wait for the vnode is freed, and invalid read may occur. taosMsleep(10000); - #endif +#endif - code = streamTaskSchedTask(&pParam->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); - if (code) { - tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code)); + code = streamTaskSchedTask(&pParam->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); + if (code) { + tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code)); + } } _end: streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal"); - tqDebug("vgId:%d scan-wal will start in %dms", vgId, SCAN_WAL_IDLE_DURATION*SCAN_WAL_WAIT_COUNT); + tqDebug("vgId:%d try scan-wal will start in %dms", vgId, SCAN_WAL_IDLE_DURATION*SCAN_WAL_WAIT_COUNT); code = taosReleaseRef(streamMetaRefPool, pParam->metaId); if (code) { @@ -192,7 +191,7 @@ void tqScanWalAsync(STQ* pTq) { // 1. the vnode should be the leader. // 2. the stream isn't disabled - if ((pMeta->role == NODE_ROLE_FOLLOWER) || tsDisableStream) { + if ((pMeta->role != NODE_ROLE_LEADER) || tsDisableStream) { tqInfo("vgId:%d follower node or stream disabled, not scan wal", vgId); return; } diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index fb3582c5ff..d5edfe4b35 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -952,11 +952,6 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) { streamMetaWUnLock(pMeta); -// if (scanWal && (vgId != SNODE_HANDLE)) { -// tqDebug("vgId:%d start scan wal for executing tasks", vgId); -// code = tqScanWalAsync(pMeta->ahandle, true); -// } - return code; } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index eb8f2c741a..f07d6f4cc1 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -814,15 +814,17 @@ static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* l } int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t dbRefId, ECHECKPOINT_BACKUP_TYPE type) { - int32_t code = 0; - char* path = NULL; - + int32_t code = 0; + char* path = NULL; + int64_t chkptSize = 0; SStreamMeta* pMeta = pTask->pMeta; const char* idStr = pTask->id.idStr; int64_t now = taosGetTimestampMs(); SArray* toDelFiles = taosArrayInit(4, POINTER_BYTES); if (toDelFiles == NULL) { + stError("s-task:%s failed to prepare array list during upload checkpoint, code:%s", pTask->id.idStr, + tstrerror(terrno)); return terrno; } @@ -848,11 +850,11 @@ int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t d } } - if (code == TSDB_CODE_SUCCESS) { - int32_t size = taosArrayGetSize(toDelFiles); - stDebug("s-task:%s remove redundant %d files", idStr, size); + int32_t num = taosArrayGetSize(toDelFiles); + if (code == TSDB_CODE_SUCCESS && num > 0) { + stDebug("s-task:%s remove redundant %d files", idStr, num); - for (int i = 0; i < size; i++) { + for (int i = 0; i < num; i++) { char* pName = taosArrayGetP(toDelFiles, i); code = deleteCheckpointFile(idStr, pName); if (code != 0) { @@ -868,12 +870,13 @@ int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t d double el = (taosGetTimestampMs() - now) / 1000.0; if (code == TSDB_CODE_SUCCESS) { - stDebug("s-task:%s complete update checkpointId:%" PRId64 ", elapsed time:%.2fs remove local checkpoint data %s", - idStr, checkpointId, el, path); - taosRemoveDir(path); + code = taosGetDirSize(path, &chkptSize); + stDebug("s-task:%s complete upload checkpointId:%" PRId64 + ", elapsed time:%.2fs, checkpointSize:%.2fKiB local dir:%s", + idStr, checkpointId, el, SIZE_IN_KiB(chkptSize), path); } else { - stDebug("s-task:%s failed to upload checkpointId:%" PRId64 " keep local checkpoint data, elapsed time:%.2fs", idStr, - checkpointId, el); + stDebug("s-task:%s failed to upload checkpointId:%" PRId64 " elapsed time:%.2fs, checkpointSize:%.2fKiB", idStr, + checkpointId, el, SIZE_IN_KiB(chkptSize)); } taosMemoryFree(path); @@ -883,7 +886,7 @@ int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t d int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointId) { ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType(); if (type == DATA_UPLOAD_DISABLE) { - stDebug("s-task:%s not allowed to upload checkpoint data", pTask->id.idStr); + stDebug("s-task:%s not config to backup checkpoint data at snode, checkpointId:%"PRId64, pTask->id.idStr, checkpointId); return 0; } @@ -925,6 +928,9 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s gen checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(terrno)); } + + int64_t et = taosGetTimestampMs(); + stDebug("s-task:%s gen local checkpoint completed, elapsed time:%.2fs", id, (et - startTs) / 1000.0); } // TODO: monitoring the checkpoint-source msg diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index ca5b6630fd..a6d0142010 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -243,6 +243,8 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) { continue; } + // todo: this lock may blocked by lock in streamMetaStartOneTask function, which may lock a very long time when + // trying to load remote checkpoint data streamMutexLock(&pTask->lock); STaskStatusEntry entry = streamTaskGetStatusEntry(pTask); streamMutexUnlock(&pTask->lock); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index d5561ebe76..404436e5a5 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1362,6 +1362,10 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader) } pMeta->role = (isLeader) ? NODE_ROLE_LEADER : NODE_ROLE_FOLLOWER; + if (!isLeader) { + streamMetaResetStartInfo(&pMeta->startInfo, pMeta->vgId); + } + streamMetaWUnLock(pMeta); if (isLeader) {