From 00f029e44ff17b4940a79a6a2f23ace001dce75f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 1 Nov 2023 16:12:15 +0800 Subject: [PATCH] refactor: add some logs. --- include/libs/stream/tstream.h | 1 - source/dnode/vnode/src/tq/tq.c | 15 +++++ source/dnode/vnode/src/tq/tqStreamTask.c | 14 ++-- source/dnode/vnode/src/vnd/vnodeSync.c | 13 ++-- source/libs/stream/src/streamCheckpoint.c | 16 +++-- source/libs/stream/src/streamExec.c | 10 +-- source/libs/stream/src/streamMeta.c | 23 +++++-- source/libs/stream/src/streamStart.c | 79 ++--------------------- 8 files changed, 64 insertions(+), 107 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 5e145d8fbb..81a0caeb6b 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -762,7 +762,6 @@ int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue); int32_t streamRestoreParam(SStreamTask* pTask); void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta); void streamTaskResume(SStreamTask* pTask); -void streamTaskDisablePause(SStreamTask* pTask); void streamTaskEnablePause(SStreamTask* pTask); int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask); void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 91cc8c36c1..e934fd5192 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1805,6 +1805,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { // update the nodeEpset when it exists taosWLockLatch(&pMeta->lock); + tqDebug("vgId:%d meta-wlock", pMeta->vgId); // the task epset may be updated again and again, when replaying the WAL, the task may be in stop status. STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; @@ -1814,6 +1815,8 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { req.taskId); rsp.code = TSDB_CODE_SUCCESS; taosWUnLockLatch(&pMeta->lock); + tqDebug("vgId:%d meta-unlock", pMeta->vgId); + taosArrayDestroy(req.pNodeList); return rsp.code; } @@ -1836,11 +1839,13 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { req.transId); rsp.code = TSDB_CODE_SUCCESS; taosWUnLockLatch(&pMeta->lock); + tqDebug("vgId:%d meta-unlock", pMeta->vgId); taosArrayDestroy(req.pNodeList); return rsp.code; } taosWUnLockLatch(&pMeta->lock); + tqDebug("vgId:%d meta-unlock", pMeta->vgId); // the following two functions should not be executed within the scope of meta lock to avoid deadlock streamTaskUpdateEpsetInfo(pTask, req.pNodeList); @@ -1848,6 +1853,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { // continue after lock the meta again taosWLockLatch(&pMeta->lock); + tqDebug("vgId:%d meta-wlock", pMeta->vgId); SStreamTask** ppHTask = NULL; if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { @@ -1898,15 +1904,19 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId, updateTasks, (numOfTasks - updateTasks)); taosWUnLockLatch(&pMeta->lock); + tqDebug("vgId:%d meta-unlock", pMeta->vgId); } else { if (!pTq->pVnode->restored) { tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId); pMeta->startInfo.startAllTasksFlag = 0; taosWUnLockLatch(&pMeta->lock); + tqDebug("vgId:%d meta-unlock", pMeta->vgId); } else { tqDebug("vgId:%d tasks are all updated and stopped, restart them", vgId); terrno = 0; + taosWUnLockLatch(&pMeta->lock); + tqDebug("vgId:%d meta-unlock", pMeta->vgId); while (streamMetaTaskInTimer(pMeta)) { qDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); @@ -1914,10 +1924,13 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { } taosWLockLatch(&pMeta->lock); + tqDebug("vgId:%d meta-wlock", pMeta->vgId); + int32_t code = streamMetaReopen(pMeta); if (code != 0) { tqError("vgId:%d failed to reopen stream meta", vgId); taosWUnLockLatch(&pMeta->lock); + tqDebug("vgId:%d meta-unlock", pMeta->vgId); taosArrayDestroy(req.pNodeList); return -1; } @@ -1925,6 +1938,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) { tqError("vgId:%d failed to load stream tasks", vgId); taosWUnLockLatch(&pMeta->lock); + tqDebug("vgId:%d meta-unlock", pMeta->vgId); taosArrayDestroy(req.pNodeList); return -1; } @@ -1938,6 +1952,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { } taosWUnLockLatch(&pMeta->lock); + tqDebug("vgId:%d meta-unlock", pMeta->vgId); } } diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 1672c8e609..6dc5a77fc1 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -39,17 +39,15 @@ int32_t tqScanWal(STQ* pTq) { if (shouldIdle) { taosWLockLatch(&pMeta->lock); - int32_t times = (--pMeta->walScanCounter); ASSERT(pMeta->walScanCounter >= 0); - - if (pMeta->walScanCounter <= 0) { - taosWUnLockLatch(&pMeta->lock); - break; - } - taosWUnLockLatch(&pMeta->lock); - tqDebug("vgId:%d scan wal for stream tasks for %d times in %dms", vgId, times, SCAN_WAL_IDLE_DURATION); + + if (times <= 0) { + break; + } else { + tqDebug("vgId:%d scan wal for stream tasks for %d times in %dms", vgId, times, SCAN_WAL_IDLE_DURATION); + } } taosMsleep(SCAN_WAL_IDLE_DURATION); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index dcc31526f7..5d8fc6056c 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -554,10 +554,14 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) walApplyVer(pVnode->pWal, commitIdx); pVnode->restored = true; - taosWLockLatch(&pVnode->pTq->pStreamMeta->lock); - if (pVnode->pTq->pStreamMeta->startInfo.startAllTasksFlag) { + SStreamMeta* pMeta = pVnode->pTq->pStreamMeta; + taosWLockLatch(&pMeta->lock); + tqDebug("vgId:%d meta-wlock", pMeta->vgId); + + if (pMeta->startInfo.startAllTasksFlag) { vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId); - taosWUnLockLatch(&pVnode->pTq->pStreamMeta->lock); + taosWUnLockLatch(&pMeta->lock); + tqDebug("vgId:%d meta-unlock", pMeta->vgId); return; } @@ -574,7 +578,8 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId); } - taosWUnLockLatch(&pVnode->pTq->pStreamMeta->lock); + taosWUnLockLatch(&pMeta->lock); + tqDebug("vgId:%d meta-unlock", pMeta->vgId); } static void vnodeBecomeFollower(const SSyncFSM *pFsm) { diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 9eaa9fcb92..6d4f09b768 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -185,6 +185,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc { // todo: remove this when the pipeline checkpoint generating is used. SStreamMeta* pMeta = pTask->pMeta; taosWLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-wlock", pMeta->vgId); if (pMeta->chkptNotReadyTasks == 0) { pMeta->chkptNotReadyTasks = pMeta->numOfStreamTasks; @@ -281,8 +282,10 @@ void streamTaskClearCheckInfo(SStreamTask* pTask) { int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { int32_t vgId = pMeta->vgId; + int32_t code = 0; taosWLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-wlock", pMeta->vgId); for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) { STaskId* pId = taosArrayGet(pMeta->pTaskList, i); @@ -304,10 +307,11 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { char* str = NULL; streamTaskGetStatus(p, &str); - int32_t code = streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE); + code = streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE); if (code != TSDB_CODE_SUCCESS) { stDebug("s-task:%s vgId:%d save task status failed, since handle event failed", p->id.idStr, vgId); taosWUnLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-unlock", pMeta->vgId); return -1; } else { // save the task streamMetaSaveTask(pMeta, p); @@ -320,17 +324,17 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { str); } - if (streamMetaCommit(pMeta) < 0) { - taosWUnLockLatch(&pMeta->lock); + code = streamMetaCommit(pMeta); + if (code < 0) { stError("vgId:%d failed to commit stream meta after do checkpoint, checkpointId:%" PRId64 ", since %s", pMeta->vgId, checkpointId, terrstr()); - return -1; } else { - taosWUnLockLatch(&pMeta->lock); stInfo("vgId:%d commit stream meta after do checkpoint, checkpointId:%" PRId64 " DONE", pMeta->vgId, checkpointId); } - return TSDB_CODE_SUCCESS; + taosWUnLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-unlock", pMeta->vgId); + return code; } int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index c0589f6ab1..a1951b23cc 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -356,17 +356,9 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { // 4. free it and remove fill-history task from disk meta-store streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); - // 5. clear the link between fill-history task and stream task info -// CLEAR_RELATED_FILLHISTORY_TASK(pStreamTask); - - // 6. save to disk + // 5. save to disk taosWLockLatch(&pMeta->lock); - pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask, NULL); -// streamMetaSaveTask(pMeta, pStreamTask); -// if (streamMetaCommit(pMeta) < 0) { - // persist to disk -// } taosWUnLockLatch(&pMeta->lock); // 7. pause allowed. diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index f788e244cd..ce7f325922 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -492,6 +492,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t // pre-delete operation taosWLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-wlock", pMeta->vgId); STaskId id = {.streamId = streamId, .taskId = taskId}; SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); @@ -509,9 +510,11 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t } else { stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId); taosWUnLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-unlock", pMeta->vgId); return 0; } taosWUnLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-unlock", pMeta->vgId); stDebug("s-task:0x%x set task status:dropping and start to unregister it", taskId); @@ -522,20 +525,25 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t if (ppTask) { if ((*ppTask)->status.timerActive == 0) { taosRUnLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-unlock", pMeta->vgId); break; } taosMsleep(10); stDebug("s-task:%s wait for quit from timer", (*ppTask)->id.idStr); taosRUnLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-unlock", pMeta->vgId); } else { taosRUnLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-unlock", pMeta->vgId); break; } } // let's do delete of stream task taosWLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-wlock", pMeta->vgId); + ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask) { pTask = *ppTask; @@ -566,18 +574,16 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t } taosWUnLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-unlock", pMeta->vgId); return 0; } int32_t streamMetaBegin(SStreamMeta* pMeta) { taosWLockLatch(&pMeta->lock); - if (tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, - TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { - taosWUnLockLatch(&pMeta->lock); - return -1; - } + int32_t code = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, + TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); taosWUnLockLatch(&pMeta->lock); - return 0; + return code; } // todo add error log @@ -1013,6 +1019,7 @@ void metaHbToMnode(void* param, void* tmrId) { bool streamMetaTaskInTimer(SStreamMeta* pMeta) { bool inTimer = false; taosWLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-wlock", pMeta->vgId); void* pIter = NULL; while (1) { @@ -1028,6 +1035,8 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta) { } taosWUnLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-unlock", pMeta->vgId); + return inTimer; } @@ -1038,6 +1047,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { (pMeta->role == NODE_ROLE_LEADER), pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount); taosWLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-wlock", pMeta->vgId); void* pIter = NULL; while (1) { @@ -1052,6 +1062,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { } taosWUnLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-unlock", pMeta->vgId); // wait for the stream meta hb function stopping if (pMeta->role == NODE_ROLE_LEADER) { diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 7756d7a2e0..098385c3ef 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -625,6 +625,8 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { SStreamMeta* pMeta = pInfo->pMeta; taosWLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-wlock", pMeta->vgId); + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pInfo->id, sizeof(pInfo->id)); if (ppTask) { ASSERT((*ppTask)->status.timerActive >= 1); @@ -638,10 +640,12 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { taosMemoryFree(pInfo); taosWUnLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-unlock", pMeta->vgId); return; } } taosWUnLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-unlock", pMeta->vgId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->id.streamId, pInfo->id.taskId); if (pTask != NULL) { @@ -934,66 +938,6 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { } void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) { -#if 0 - int8_t status = pTask->status.taskStatus; - if (status == TASK_STATUS__DROPPING) { - stDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr); - return; - } - - const char* str = streamGetTaskStatusStr(status); - if (status == TASK_STATUS__STOP || status == TASK_STATUS__PAUSE) { - stDebug("vgId:%d s-task:%s task already stopped/paused, status:%s, do nothing", pMeta->vgId, pTask->id.idStr, str); - return; - } - - if(pTask->info.taskLevel == TASK_LEVEL__SINK) { - int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1); - stInfo("vgId:%d s-task:%s pause stream sink task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num); - return; - } - - while (!pTask->status.pauseAllowed || (pTask->status.taskStatus == TASK_STATUS__HALT)) { - status = pTask->status.taskStatus; - if (status == TASK_STATUS__DROPPING) { - stDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr); - return; - } - - if (status == TASK_STATUS__STOP || status == TASK_STATUS__PAUSE) { - stDebug("vgId:%d s-task:%s task already stopped/paused, status:%s, do nothing", pMeta->vgId, pTask->id.idStr, str); - return; - } -// -// if (pTask->status.downstreamReady == 0) { -// ASSERT(pTask->execInfo.start == 0); -// stDebug("s-task:%s in check downstream procedure, abort and paused", pTask->id.idStr); -// break; -// } - - const char* pStatus = streamGetTaskStatusStr(status); - stDebug("s-task:%s wait for the task can be paused, status:%s, vgId:%d", pTask->id.idStr, pStatus, pMeta->vgId); - taosMsleep(100); - } - - // todo: use the task lock, stead of meta lock - taosWLockLatch(&pMeta->lock); - - status = pTask->status.taskStatus; - if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) { - taosWUnLockLatch(&pMeta->lock); - stDebug("vgId:%d s-task:%s task already dropped/stopped/paused, do nothing", pMeta->vgId, pTask->id.idStr); - return; - } - - atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus); - atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); - int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1); - stInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num); - taosWUnLockLatch(&pMeta->lock); - -#endif - streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_PAUSE); int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1); @@ -1029,19 +973,6 @@ void streamTaskResume(SStreamTask* pTask) { } } -// todo fix race condition -void streamTaskDisablePause(SStreamTask* pTask) { - // pre-condition check -// const char* id = pTask->id.idStr; -// while (pTask->status.taskStatus == TASK_STATUS__PAUSE) { -// stDebug("s-task:%s already in pause, wait for pause being cancelled, and set pause disabled, recheck in 100ms", id); -// taosMsleep(100); -// } -// -// stDebug("s-task:%s disable task pause", id); -// pTask->status.pauseAllowed = 0; -} - void streamTaskEnablePause(SStreamTask* pTask) { stDebug("s-task:%s enable task pause", pTask->id.idStr); pTask->status.pauseAllowed = 1; @@ -1051,6 +982,7 @@ int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; taosWLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-wlock", pMeta->vgId); STaskId id = streamTaskExtractKey(pTask); taosHashPut(pMeta->startInfo.pReadyTaskSet, &id, sizeof(id), NULL, 0); @@ -1072,5 +1004,6 @@ int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask) { } taosWUnLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-unlock", pMeta->vgId); return TSDB_CODE_SUCCESS; }