From 5ff89bc09817b67bb9ed365305b5a68e5e7eeeeb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 1 Nov 2023 13:19:53 +0800 Subject: [PATCH 01/13] fix(stream): add lock log. --- source/dnode/vnode/src/tq/tq.c | 66 +----------------------- source/dnode/vnode/src/tq/tqStreamTask.c | 1 + source/libs/stream/src/streamDispatch.c | 2 + source/libs/stream/src/streamTaskSm.c | 10 ++++ 4 files changed, 14 insertions(+), 65 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a5a3175652..bcf665ddfb 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1149,12 +1149,10 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // 1. get the related stream task pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId); if (pStreamTask == NULL) { - // todo delete this task, if the related stream task is dropped - qError("failed to find s-task:0x%"PRIx64", it may have been destroyed, drop fill-history task:%s", + tqError("failed to find s-task:0x%"PRIx64", it may have been destroyed, drop related fill-history task:%s", pTask->streamTaskId.taskId, pTask->id.idStr); tqDebug("s-task:%s fill-history task set status to be dropping", id); - streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); atomic_store_32(&pTask->status.inScanHistorySentinel, 0); @@ -1163,68 +1161,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE); -#if 0 - // 2. it cannot be paused, when the stream task in TASK_STATUS__SCAN_HISTORY status. Let's wait for the - // stream task get ready for scan history data - while (streamTaskGetStatus(pStreamTask, NULL) == TASK_STATUS__SCAN_HISTORY) { - tqDebug( - "s-task:%s level:%d related stream task:%s(status:%s) not ready for halt, wait for it and recheck in 100ms", - id, pTask->info.taskLevel, pStreamTask->id.idStr, streamGetTaskStatusStr(pStreamTask->status.taskStatus)); - taosMsleep(100); - } - - // now we can stop the stream task execution - int64_t nextProcessedVer = 0; - - while (1) { - taosThreadMutexLock(&pStreamTask->lock); - int8_t status = pStreamTask->status.taskStatus; - if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) { - // return; do nothing - } - - if (status == TASK_STATUS__HALT) { -// tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr, -// pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id); -// latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader); -// -// taosThreadMutexUnlock(&pStreamTask->lock); -// break; - } - - if (pStreamTask->status.taskStatus == TASK_STATUS__CK) { - qDebug("s-task:%s status:%s during generating checkpoint, wait for 1sec and retry set status:halt", - pStreamTask->id.idStr, streamGetTaskStatusStr(TASK_STATUS__CK)); - taosThreadMutexUnlock(&pStreamTask->lock); - taosMsleep(1000); - continue; - } - - // upgrade to halt status - if (status == TASK_STATUS__PAUSE) { - qDebug("s-task:%s upgrade status to %s from %s", pStreamTask->id.idStr, streamGetTaskStatusStr(TASK_STATUS__HALT), - streamGetTaskStatusStr(TASK_STATUS__PAUSE)); - } else { - qDebug("s-task:%s halt task, prev status:%s", pStreamTask->id.idStr, streamGetTaskStatusStr(status)); - } - - pStreamTask->status.keepTaskStatus = status; - pStreamTask->status.taskStatus = TASK_STATUS__HALT; - - // wal scan not start yet, reset it to be the start position - nextProcessedVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader); - if (nextProcessedVer == -1) { - nextProcessedVer = pStreamTask->dataRange.range.maxVer + 1; - } - - tqDebug("s-task:%s level:%d nextProcessedVer:%" PRId64 ", sched-status:%d is halt by fill-history task:%s", - pStreamTask->id.idStr, pStreamTask->info.taskLevel, nextProcessedVer, pStreamTask->status.schedStatus, - id); - - taosThreadMutexUnlock(&pStreamTask->lock); - break; - } -#endif streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT); int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer; diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 1f1dd61c3c..1672c8e609 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -446,6 +446,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->dataRange.range.maxVer : INT64_MAX; taosThreadMutexLock(&pTask->lock); + tqDebug("s-task:%s lock", pTask->id.idStr); char* p = NULL; ETaskStatus status = streamTaskGetStatus(pTask, &p); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index cd8bbacb98..14129522d6 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1004,6 +1004,8 @@ int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, info.msg.info = *pRpcInfo; taosThreadMutexLock(&pTask->lock); + stDebug("s-task:%s lock", pTask->id.idStr); + if (pTask->pRspMsgList == NULL) { pTask->pRspMsgList = taosArrayInit(4, sizeof(SStreamContinueExecInfo)); } diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 205994b7cc..71521bd8f2 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -201,6 +201,7 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt if (pTrans->attachEvent.event != 0) { attachEvent(pTask, &pTrans->attachEvent); taosThreadMutexUnlock(&pTask->lock); + stDebug("s-task:%s unlock1", pTask->id.idStr); while (1) { // wait for the task to be here @@ -224,6 +225,7 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt pSM->pActiveTrans = pTrans; pSM->startTs = taosGetTimestampMs(); taosThreadMutexUnlock(&pTask->lock); + stDebug("s-task:%s unlock2", pTask->id.idStr); int32_t code = pTrans->pAction(pTask); // todo handle error code; @@ -242,6 +244,8 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { while (1) { taosThreadMutexLock(&pTask->lock); + stDebug("s-task:%s lock", pTask->id.idStr); + if (pSM->pActiveTrans != NULL && pSM->pActiveTrans->autoInvokeEndFn) { taosThreadMutexUnlock(&pTask->lock); taosMsleep(100); @@ -282,6 +286,8 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even // do update the task status taosThreadMutexLock(&pTask->lock); + stDebug("s-task:%s lock", pTask->id.idStr); + STaskStateTrans* pTrans = pSM->pActiveTrans; if (pTrans == NULL) { @@ -292,6 +298,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even StreamTaskEventList[event].name, pSM->current.name, StreamTaskEventList[pSM->prev.evt].name); taosThreadMutexUnlock(&pTask->lock); + stDebug("s-task:%s unlockx", pTask->id.idStr); return TSDB_CODE_INVALID_PARA; } @@ -299,6 +306,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even stWarn("s-task:%s handle event:%s failed, current status:%s, active trans evt:%s", pTask->id.idStr, StreamTaskEventList[event].name, pSM->current.name, StreamTaskEventList[pTrans->event].name); taosThreadMutexUnlock(&pTask->lock); + stDebug("s-task:%s unlocky", pTask->id.idStr); return TSDB_CODE_INVALID_PARA; } @@ -328,6 +336,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even pSM->pActiveTrans = pNextTrans; pSM->startTs = taosGetTimestampMs(); taosThreadMutexUnlock(&pTask->lock); + stDebug("s-task:%s unlockf", pTask->id.idStr); int32_t code = pNextTrans->pAction(pSM->pTask); if (pNextTrans->autoInvokeEndFn) { @@ -338,6 +347,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even } } else { taosThreadMutexUnlock(&pTask->lock); + stDebug("s-task:%s unlockz", pTask->id.idStr); int64_t el = (taosGetTimestampMs() - pSM->startTs); stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr, From e4aeea176bcfc9345c8b865b5061ce12bf5e791e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 1 Nov 2023 13:59:51 +0800 Subject: [PATCH 02/13] fix(stream):fix the bug when event in waiting list not fulfilled. --- source/libs/stream/src/streamTaskSm.c | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 71521bd8f2..fa14c0618f 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -323,13 +323,16 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr, StreamTaskEventList[pTrans->event].name, el, pSM->prev.state.name, pSM->current.name); - SAttachedEventInfo* pEvtInfo = taosArrayPop(pSM->pWaitingEventList); + SAttachedEventInfo* pEvtInfo = taosArrayGet(pSM->pWaitingEventList, 0); // OK, let's handle the attached event, since the task has reached the required status now if (pSM->current.state == pEvtInfo->status) { - stDebug("s-task:%s handle the attached event:%s, state:%s", pTask->id.idStr, + stDebug("s-task:%s handle the event:%s in waiting list, state:%s", pTask->id.idStr, StreamTaskEventList[pEvtInfo->event].name, pSM->current.name); + // remove it + taosArrayPop(pSM->pWaitingEventList); + STaskStateTrans* pNextTrans = streamTaskFindTransform(pSM->current.state, pEvtInfo->event); ASSERT(pSM->pActiveTrans == NULL && pNextTrans != NULL); @@ -344,6 +347,11 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even } else { return code; } + } else { + taosThreadMutexUnlock(&pTask->lock); + stDebug("s-task:%s state:%s event:%s in waiting list, req state:%s not fulfilled, put it back", pTask->id.idStr, + pSM->current.name, StreamTaskEventList[pEvtInfo->event].name, + StreamTaskStatusList[pEvtInfo->status].name); } } else { taosThreadMutexUnlock(&pTask->lock); From 778c29b8086eb2bdd4f671988e757176d061a325 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 1 Nov 2023 14:07:18 +0800 Subject: [PATCH 03/13] fix(stream): do some internal refactor. --- source/libs/stream/src/streamTaskSm.c | 29 ++++++++++++++------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index fa14c0618f..78f864d6ec 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -13,7 +13,6 @@ * along with this program. If not, see . */ -#include #include "streamInt.h" #include "streamsm.h" #include "tmisce.h" @@ -21,6 +20,8 @@ #include "ttimer.h" #include "wal.h" +#define GET_EVT_NAME(_ev) (StreamTaskEventList[(_ev)].name) + SStreamTaskState StreamTaskStatusList[9] = { {.state = TASK_STATUS__READY, .name = "ready"}, {.state = TASK_STATUS__DROPPING, .name = "dropped"}, @@ -67,7 +68,7 @@ static int32_t attachEvent(SStreamTask* pTask, SAttachedEventInfo* pEvtInfo) { streamTaskGetStatus(pTask, &p); stDebug("s-task:%s status:%s attach event:%s required status:%s, since not allowed to handle it", pTask->id.idStr, p, - StreamTaskEventList[pEvtInfo->event].name, StreamTaskStatusList[pEvtInfo->status].name); + GET_EVT_NAME(pEvtInfo->event), StreamTaskStatusList[pEvtInfo->status].name); taosArrayPush(pTask->status.pSM->pWaitingEventList, pEvtInfo); return 0; } @@ -210,10 +211,10 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt taosThreadMutexUnlock(&pTask->lock); if ((s == pTrans->next.state) && (pSM->prev.evt == pTrans->event)) { - stDebug("s-task:%s attached event:%s handled", id, StreamTaskEventList[pTrans->event].name); + stDebug("s-task:%s attached event:%s handled", id, GET_EVT_NAME(pTrans->event)); return TSDB_CODE_SUCCESS; } else if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__STOP) { // this event has been handled already - stDebug("s-task:%s not handle event:%s yet, wait for 100ms and recheck", id, StreamTaskEventList[event].name); + stDebug("s-task:%s not handle event:%s yet, wait for 100ms and recheck", id, GET_EVT_NAME(event)); taosMsleep(100); } else { stDebug("s-task:%s is dropped or stopped already, not wait.", id); @@ -250,11 +251,11 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { taosThreadMutexUnlock(&pTask->lock); taosMsleep(100); stDebug("s-task:%s status:%s handling event:%s by some other thread, wait for 100ms and check if completed", - pTask->id.idStr, pSM->current.name, StreamTaskEventList[pSM->pActiveTrans->event].name); + pTask->id.idStr, pSM->current.name, GET_EVT_NAME(pSM->pActiveTrans->event)); } else { pTrans = streamTaskFindTransform(pSM->current.state, event); if (pTrans == NULL) { - stDebug("s-task:%s failed to handle event:%s", pTask->id.idStr, StreamTaskEventList[event].name); + stDebug("s-task:%s failed to handle event:%s", pTask->id.idStr, GET_EVT_NAME(event)); taosThreadMutexUnlock(&pTask->lock); return TSDB_CODE_INVALID_PARA; // todo: set new error code// failed to handle the event. } @@ -262,8 +263,8 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { if (pSM->pActiveTrans != NULL) { // currently in some state transfer procedure, not auto invoke transfer, abort it stDebug("s-task:%s event:%s handle procedure quit, status %s -> %s failed, handle event %s now", - pTask->id.idStr, StreamTaskEventList[pSM->pActiveTrans->event].name, pSM->current.name, - pSM->pActiveTrans->next.name, StreamTaskEventList[event].name); + pTask->id.idStr, GET_EVT_NAME(pSM->pActiveTrans->event), pSM->current.name, + pSM->pActiveTrans->next.name, GET_EVT_NAME(event)); } doHandleEvent(pSM, event, pTrans); @@ -295,7 +296,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even ASSERT(s == TASK_STATUS__DROPPING || s == TASK_STATUS__PAUSE || s == TASK_STATUS__STOP); // the pSM->prev.evt may be 0, so print string is not appropriate. stDebug("s-task:%s event:%s handled failed, current status:%s, trigger event:%s", pTask->id.idStr, - StreamTaskEventList[event].name, pSM->current.name, StreamTaskEventList[pSM->prev.evt].name); + GET_EVT_NAME(event), pSM->current.name, GET_EVT_NAME(pSM->prev.evt)); taosThreadMutexUnlock(&pTask->lock); stDebug("s-task:%s unlockx", pTask->id.idStr); @@ -304,7 +305,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even if (pTrans->event != event) { stWarn("s-task:%s handle event:%s failed, current status:%s, active trans evt:%s", pTask->id.idStr, - StreamTaskEventList[event].name, pSM->current.name, StreamTaskEventList[pTrans->event].name); + GET_EVT_NAME(event), pSM->current.name, GET_EVT_NAME(pTrans->event)); taosThreadMutexUnlock(&pTask->lock); stDebug("s-task:%s unlocky", pTask->id.idStr); return TSDB_CODE_INVALID_PARA; @@ -321,14 +322,14 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even if (taosArrayGetSize(pSM->pWaitingEventList) > 0) { int64_t el = (taosGetTimestampMs() - pSM->startTs); stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr, - StreamTaskEventList[pTrans->event].name, el, pSM->prev.state.name, pSM->current.name); + GET_EVT_NAME(pTrans->event), el, pSM->prev.state.name, pSM->current.name); SAttachedEventInfo* pEvtInfo = taosArrayGet(pSM->pWaitingEventList, 0); // OK, let's handle the attached event, since the task has reached the required status now if (pSM->current.state == pEvtInfo->status) { stDebug("s-task:%s handle the event:%s in waiting list, state:%s", pTask->id.idStr, - StreamTaskEventList[pEvtInfo->event].name, pSM->current.name); + GET_EVT_NAME(pEvtInfo->event), pSM->current.name); // remove it taosArrayPop(pSM->pWaitingEventList); @@ -350,7 +351,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even } else { taosThreadMutexUnlock(&pTask->lock); stDebug("s-task:%s state:%s event:%s in waiting list, req state:%s not fulfilled, put it back", pTask->id.idStr, - pSM->current.name, StreamTaskEventList[pEvtInfo->event].name, + pSM->current.name, GET_EVT_NAME(pEvtInfo->event), StreamTaskStatusList[pEvtInfo->status].name); } } else { @@ -359,7 +360,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even int64_t el = (taosGetTimestampMs() - pSM->startTs); stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr, - StreamTaskEventList[pTrans->event].name, el, pSM->prev.state.name, pSM->current.name); + GET_EVT_NAME(pTrans->event), el, pSM->prev.state.name, pSM->current.name); } return TSDB_CODE_SUCCESS; From 12bc4f32cf92b01ec6f05b1c26a839fccf8814e7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 1 Nov 2023 16:12:15 +0800 Subject: [PATCH 04/13] refactor: add some logs. --- include/libs/stream/tstream.h | 1 - source/dnode/vnode/src/tq/tq.c | 15 +++++ source/dnode/vnode/src/tq/tqStreamTask.c | 14 ++-- source/dnode/vnode/src/vnd/vnodeSync.c | 13 ++-- source/libs/stream/src/streamCheckpoint.c | 16 +++-- source/libs/stream/src/streamExec.c | 10 +-- source/libs/stream/src/streamMeta.c | 23 +++++-- source/libs/stream/src/streamStart.c | 79 ++--------------------- 8 files changed, 64 insertions(+), 107 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 5e145d8fbb..81a0caeb6b 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -762,7 +762,6 @@ int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue); int32_t streamRestoreParam(SStreamTask* pTask); void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta); void streamTaskResume(SStreamTask* pTask); -void streamTaskDisablePause(SStreamTask* pTask); void streamTaskEnablePause(SStreamTask* pTask); int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask); void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index bcf665ddfb..f41826fe25 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1802,6 +1802,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { // update the nodeEpset when it exists taosWLockLatch(&pMeta->lock); + tqDebug("vgId:%d meta-wlock", pMeta->vgId); // the task epset may be updated again and again, when replaying the WAL, the task may be in stop status. STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; @@ -1811,6 +1812,8 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { req.taskId); rsp.code = TSDB_CODE_SUCCESS; taosWUnLockLatch(&pMeta->lock); + tqDebug("vgId:%d meta-unlock", pMeta->vgId); + taosArrayDestroy(req.pNodeList); return rsp.code; } @@ -1833,11 +1836,13 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { req.transId); rsp.code = TSDB_CODE_SUCCESS; taosWUnLockLatch(&pMeta->lock); + tqDebug("vgId:%d meta-unlock", pMeta->vgId); taosArrayDestroy(req.pNodeList); return rsp.code; } taosWUnLockLatch(&pMeta->lock); + tqDebug("vgId:%d meta-unlock", pMeta->vgId); // the following two functions should not be executed within the scope of meta lock to avoid deadlock streamTaskUpdateEpsetInfo(pTask, req.pNodeList); @@ -1845,6 +1850,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { // continue after lock the meta again taosWLockLatch(&pMeta->lock); + tqDebug("vgId:%d meta-wlock", pMeta->vgId); SStreamTask** ppHTask = NULL; if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { @@ -1895,15 +1901,19 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId, updateTasks, (numOfTasks - updateTasks)); taosWUnLockLatch(&pMeta->lock); + tqDebug("vgId:%d meta-unlock", pMeta->vgId); } else { if (!pTq->pVnode->restored) { tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId); pMeta->startInfo.startAllTasksFlag = 0; taosWUnLockLatch(&pMeta->lock); + tqDebug("vgId:%d meta-unlock", pMeta->vgId); } else { tqDebug("vgId:%d tasks are all updated and stopped, restart them", vgId); terrno = 0; + taosWUnLockLatch(&pMeta->lock); + tqDebug("vgId:%d meta-unlock", pMeta->vgId); while (streamMetaTaskInTimer(pMeta)) { qDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); @@ -1911,10 +1921,13 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { } taosWLockLatch(&pMeta->lock); + tqDebug("vgId:%d meta-wlock", pMeta->vgId); + int32_t code = streamMetaReopen(pMeta); if (code != 0) { tqError("vgId:%d failed to reopen stream meta", vgId); taosWUnLockLatch(&pMeta->lock); + tqDebug("vgId:%d meta-unlock", pMeta->vgId); taosArrayDestroy(req.pNodeList); return -1; } @@ -1922,6 +1935,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) { tqError("vgId:%d failed to load stream tasks", vgId); taosWUnLockLatch(&pMeta->lock); + tqDebug("vgId:%d meta-unlock", pMeta->vgId); taosArrayDestroy(req.pNodeList); return -1; } @@ -1935,6 +1949,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { } taosWUnLockLatch(&pMeta->lock); + tqDebug("vgId:%d meta-unlock", pMeta->vgId); } } diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 1672c8e609..6dc5a77fc1 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -39,17 +39,15 @@ int32_t tqScanWal(STQ* pTq) { if (shouldIdle) { taosWLockLatch(&pMeta->lock); - int32_t times = (--pMeta->walScanCounter); ASSERT(pMeta->walScanCounter >= 0); - - if (pMeta->walScanCounter <= 0) { - taosWUnLockLatch(&pMeta->lock); - break; - } - taosWUnLockLatch(&pMeta->lock); - tqDebug("vgId:%d scan wal for stream tasks for %d times in %dms", vgId, times, SCAN_WAL_IDLE_DURATION); + + if (times <= 0) { + break; + } else { + tqDebug("vgId:%d scan wal for stream tasks for %d times in %dms", vgId, times, SCAN_WAL_IDLE_DURATION); + } } taosMsleep(SCAN_WAL_IDLE_DURATION); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 7d19f23e23..5262620116 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -551,10 +551,14 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) walApplyVer(pVnode->pWal, commitIdx); pVnode->restored = true; - taosWLockLatch(&pVnode->pTq->pStreamMeta->lock); - if (pVnode->pTq->pStreamMeta->startInfo.startAllTasksFlag) { + SStreamMeta* pMeta = pVnode->pTq->pStreamMeta; + taosWLockLatch(&pMeta->lock); + tqDebug("vgId:%d meta-wlock", pMeta->vgId); + + if (pMeta->startInfo.startAllTasksFlag) { vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId); - taosWUnLockLatch(&pVnode->pTq->pStreamMeta->lock); + taosWUnLockLatch(&pMeta->lock); + tqDebug("vgId:%d meta-unlock", pMeta->vgId); return; } @@ -571,7 +575,8 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId); } - taosWUnLockLatch(&pVnode->pTq->pStreamMeta->lock); + taosWUnLockLatch(&pMeta->lock); + tqDebug("vgId:%d meta-unlock", pMeta->vgId); } static void vnodeBecomeFollower(const SSyncFSM *pFsm) { diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 9eaa9fcb92..6d4f09b768 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -185,6 +185,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc { // todo: remove this when the pipeline checkpoint generating is used. SStreamMeta* pMeta = pTask->pMeta; taosWLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-wlock", pMeta->vgId); if (pMeta->chkptNotReadyTasks == 0) { pMeta->chkptNotReadyTasks = pMeta->numOfStreamTasks; @@ -281,8 +282,10 @@ void streamTaskClearCheckInfo(SStreamTask* pTask) { int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { int32_t vgId = pMeta->vgId; + int32_t code = 0; taosWLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-wlock", pMeta->vgId); for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) { STaskId* pId = taosArrayGet(pMeta->pTaskList, i); @@ -304,10 +307,11 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { char* str = NULL; streamTaskGetStatus(p, &str); - int32_t code = streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE); + code = streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE); if (code != TSDB_CODE_SUCCESS) { stDebug("s-task:%s vgId:%d save task status failed, since handle event failed", p->id.idStr, vgId); taosWUnLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-unlock", pMeta->vgId); return -1; } else { // save the task streamMetaSaveTask(pMeta, p); @@ -320,17 +324,17 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { str); } - if (streamMetaCommit(pMeta) < 0) { - taosWUnLockLatch(&pMeta->lock); + code = streamMetaCommit(pMeta); + if (code < 0) { stError("vgId:%d failed to commit stream meta after do checkpoint, checkpointId:%" PRId64 ", since %s", pMeta->vgId, checkpointId, terrstr()); - return -1; } else { - taosWUnLockLatch(&pMeta->lock); stInfo("vgId:%d commit stream meta after do checkpoint, checkpointId:%" PRId64 " DONE", pMeta->vgId, checkpointId); } - return TSDB_CODE_SUCCESS; + taosWUnLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-unlock", pMeta->vgId); + return code; } int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index c0589f6ab1..a1951b23cc 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -356,17 +356,9 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { // 4. free it and remove fill-history task from disk meta-store streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); - // 5. clear the link between fill-history task and stream task info -// CLEAR_RELATED_FILLHISTORY_TASK(pStreamTask); - - // 6. save to disk + // 5. save to disk taosWLockLatch(&pMeta->lock); - pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask, NULL); -// streamMetaSaveTask(pMeta, pStreamTask); -// if (streamMetaCommit(pMeta) < 0) { - // persist to disk -// } taosWUnLockLatch(&pMeta->lock); // 7. pause allowed. diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index f788e244cd..ce7f325922 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -492,6 +492,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t // pre-delete operation taosWLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-wlock", pMeta->vgId); STaskId id = {.streamId = streamId, .taskId = taskId}; SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); @@ -509,9 +510,11 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t } else { stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId); taosWUnLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-unlock", pMeta->vgId); return 0; } taosWUnLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-unlock", pMeta->vgId); stDebug("s-task:0x%x set task status:dropping and start to unregister it", taskId); @@ -522,20 +525,25 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t if (ppTask) { if ((*ppTask)->status.timerActive == 0) { taosRUnLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-unlock", pMeta->vgId); break; } taosMsleep(10); stDebug("s-task:%s wait for quit from timer", (*ppTask)->id.idStr); taosRUnLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-unlock", pMeta->vgId); } else { taosRUnLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-unlock", pMeta->vgId); break; } } // let's do delete of stream task taosWLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-wlock", pMeta->vgId); + ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask) { pTask = *ppTask; @@ -566,18 +574,16 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t } taosWUnLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-unlock", pMeta->vgId); return 0; } int32_t streamMetaBegin(SStreamMeta* pMeta) { taosWLockLatch(&pMeta->lock); - if (tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, - TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { - taosWUnLockLatch(&pMeta->lock); - return -1; - } + int32_t code = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, + TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); taosWUnLockLatch(&pMeta->lock); - return 0; + return code; } // todo add error log @@ -1013,6 +1019,7 @@ void metaHbToMnode(void* param, void* tmrId) { bool streamMetaTaskInTimer(SStreamMeta* pMeta) { bool inTimer = false; taosWLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-wlock", pMeta->vgId); void* pIter = NULL; while (1) { @@ -1028,6 +1035,8 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta) { } taosWUnLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-unlock", pMeta->vgId); + return inTimer; } @@ -1038,6 +1047,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { (pMeta->role == NODE_ROLE_LEADER), pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount); taosWLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-wlock", pMeta->vgId); void* pIter = NULL; while (1) { @@ -1052,6 +1062,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { } taosWUnLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-unlock", pMeta->vgId); // wait for the stream meta hb function stopping if (pMeta->role == NODE_ROLE_LEADER) { diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 7756d7a2e0..098385c3ef 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -625,6 +625,8 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { SStreamMeta* pMeta = pInfo->pMeta; taosWLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-wlock", pMeta->vgId); + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pInfo->id, sizeof(pInfo->id)); if (ppTask) { ASSERT((*ppTask)->status.timerActive >= 1); @@ -638,10 +640,12 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { taosMemoryFree(pInfo); taosWUnLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-unlock", pMeta->vgId); return; } } taosWUnLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-unlock", pMeta->vgId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->id.streamId, pInfo->id.taskId); if (pTask != NULL) { @@ -934,66 +938,6 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { } void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) { -#if 0 - int8_t status = pTask->status.taskStatus; - if (status == TASK_STATUS__DROPPING) { - stDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr); - return; - } - - const char* str = streamGetTaskStatusStr(status); - if (status == TASK_STATUS__STOP || status == TASK_STATUS__PAUSE) { - stDebug("vgId:%d s-task:%s task already stopped/paused, status:%s, do nothing", pMeta->vgId, pTask->id.idStr, str); - return; - } - - if(pTask->info.taskLevel == TASK_LEVEL__SINK) { - int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1); - stInfo("vgId:%d s-task:%s pause stream sink task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num); - return; - } - - while (!pTask->status.pauseAllowed || (pTask->status.taskStatus == TASK_STATUS__HALT)) { - status = pTask->status.taskStatus; - if (status == TASK_STATUS__DROPPING) { - stDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr); - return; - } - - if (status == TASK_STATUS__STOP || status == TASK_STATUS__PAUSE) { - stDebug("vgId:%d s-task:%s task already stopped/paused, status:%s, do nothing", pMeta->vgId, pTask->id.idStr, str); - return; - } -// -// if (pTask->status.downstreamReady == 0) { -// ASSERT(pTask->execInfo.start == 0); -// stDebug("s-task:%s in check downstream procedure, abort and paused", pTask->id.idStr); -// break; -// } - - const char* pStatus = streamGetTaskStatusStr(status); - stDebug("s-task:%s wait for the task can be paused, status:%s, vgId:%d", pTask->id.idStr, pStatus, pMeta->vgId); - taosMsleep(100); - } - - // todo: use the task lock, stead of meta lock - taosWLockLatch(&pMeta->lock); - - status = pTask->status.taskStatus; - if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) { - taosWUnLockLatch(&pMeta->lock); - stDebug("vgId:%d s-task:%s task already dropped/stopped/paused, do nothing", pMeta->vgId, pTask->id.idStr); - return; - } - - atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus); - atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); - int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1); - stInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num); - taosWUnLockLatch(&pMeta->lock); - -#endif - streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_PAUSE); int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1); @@ -1029,19 +973,6 @@ void streamTaskResume(SStreamTask* pTask) { } } -// todo fix race condition -void streamTaskDisablePause(SStreamTask* pTask) { - // pre-condition check -// const char* id = pTask->id.idStr; -// while (pTask->status.taskStatus == TASK_STATUS__PAUSE) { -// stDebug("s-task:%s already in pause, wait for pause being cancelled, and set pause disabled, recheck in 100ms", id); -// taosMsleep(100); -// } -// -// stDebug("s-task:%s disable task pause", id); -// pTask->status.pauseAllowed = 0; -} - void streamTaskEnablePause(SStreamTask* pTask) { stDebug("s-task:%s enable task pause", pTask->id.idStr); pTask->status.pauseAllowed = 1; @@ -1051,6 +982,7 @@ int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; taosWLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-wlock", pMeta->vgId); STaskId id = streamTaskExtractKey(pTask); taosHashPut(pMeta->startInfo.pReadyTaskSet, &id, sizeof(id), NULL, 0); @@ -1072,5 +1004,6 @@ int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask) { } taosWUnLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-unlock", pMeta->vgId); return TSDB_CODE_SUCCESS; } From 246ed4e022f2987000c87fd4cdc6585a400c3eba Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 1 Nov 2023 17:19:21 +0800 Subject: [PATCH 05/13] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 4 ++ source/dnode/snode/src/snode.c | 10 ++-- source/dnode/vnode/src/tq/tq.c | 48 ++++++--------- source/dnode/vnode/src/tq/tqPush.c | 4 +- source/dnode/vnode/src/tq/tqRead.c | 4 +- source/dnode/vnode/src/tq/tqStreamTask.c | 28 ++++----- source/dnode/vnode/src/vnd/vnodeSync.c | 9 +-- source/libs/stream/src/streamCheckpoint.c | 14 ++--- source/libs/stream/src/streamExec.c | 8 +-- source/libs/stream/src/streamMeta.c | 73 ++++++++++++----------- source/libs/stream/src/streamStart.c | 19 +++--- source/libs/stream/src/streamState.c | 2 +- 12 files changed, 104 insertions(+), 119 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 81a0caeb6b..2e4204ab34 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -806,6 +806,10 @@ void streamMetaStartHb(SStreamMeta* pMeta); void streamMetaInitForSnode(SStreamMeta* pMeta); bool streamMetaTaskInTimer(SStreamMeta* pMeta); int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask); +void streamMetaRLock(SStreamMeta* pMeta); +void streamMetaRUnLock(SStreamMeta* pMeta); +void streamMetaWLock(SStreamMeta* pMeta); +void streamMetaWUnLock(SStreamMeta* pMeta); // checkpoint int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 4e84b4cd26..f2ef00c534 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -165,17 +165,17 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) { ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG); // 2.save task - taosWLockLatch(&pSnode->pMeta->lock); + streamMetaWLock(pSnode->pMeta); bool added = false; code = streamMetaRegisterTask(pSnode->pMeta, -1, pTask, &added); if (code < 0) { - taosWUnLockLatch(&pSnode->pMeta->lock); + streamMetaWUnLock(pSnode->pMeta); return -1; } int32_t numOfTasks = streamMetaGetNumOfTasks(pSnode->pMeta); - taosWUnLockLatch(&pSnode->pMeta->lock); + streamMetaWUnLock(pSnode->pMeta); char* p = NULL; streamTaskGetStatus(pTask, &p); @@ -195,14 +195,14 @@ int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) { streamMetaUnregisterTask(pSnode->pMeta, pReq->streamId, pReq->taskId); // commit the update - taosWLockLatch(&pSnode->pMeta->lock); + streamMetaWLock(pSnode->pMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pSnode->pMeta); qDebug("vgId:%d task:0x%x dropped, remain tasks:%d", pSnode->pMeta->vgId, pReq->taskId, numOfTasks); if (streamMetaCommit(pSnode->pMeta) < 0) { // persist to disk } - taosWUnLockLatch(&pSnode->pMeta->lock); + streamMetaWUnLock(pSnode->pMeta); return 0; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index f41826fe25..2b091de5d0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1020,10 +1020,10 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms int64_t streamId = pTask->id.streamId; bool added = false; - taosWLockLatch(&pStreamMeta->lock); + streamMetaWLock(pStreamMeta); code = streamMetaRegisterTask(pStreamMeta, sversion, pTask, &added); int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta); - taosWUnLockLatch(&pStreamMeta->lock); + streamMetaWUnLock(pStreamMeta); if (code < 0) { tqError("failed to add s-task:0x%x into vgId:%d meta, total:%d, code:%s", vgId, taskId, numOfTasks, tstrerror(code)); @@ -1403,14 +1403,14 @@ int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId); // commit the update - taosWLockLatch(&pMeta->lock); + streamMetaWLock(pMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks); if (streamMetaCommit(pMeta) < 0) { // persist to disk } - taosWUnLockLatch(&pMeta->lock); + streamMetaWUnLock(pMeta); return 0; } @@ -1721,7 +1721,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) taosThreadMutexUnlock(&pTask->lock); int32_t total = 0; - taosWLockLatch(&pMeta->lock); + streamMetaWLock(pMeta); // set the initial value for generating check point // set the mgmt epset info according to the checkout source msg from mnode, todo update mgmt epset if needed @@ -1730,7 +1730,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) } total = pMeta->numOfStreamTasks; - taosWUnLockLatch(&pMeta->lock); + streamMetaWUnLock(pMeta); qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", total checkpoint reqs:%d", pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, total); @@ -1801,8 +1801,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { tDecoderClear(&decoder); // update the nodeEpset when it exists - taosWLockLatch(&pMeta->lock); - tqDebug("vgId:%d meta-wlock", pMeta->vgId); + streamMetaWLock(pMeta); // the task epset may be updated again and again, when replaying the WAL, the task may be in stop status. STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; @@ -1811,8 +1810,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId, req.taskId); rsp.code = TSDB_CODE_SUCCESS; - taosWUnLockLatch(&pMeta->lock); - tqDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); taosArrayDestroy(req.pNodeList); return rsp.code; @@ -1835,22 +1833,19 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { tqDebug("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", pTask->id.idStr, vgId, req.transId); rsp.code = TSDB_CODE_SUCCESS; - taosWUnLockLatch(&pMeta->lock); - tqDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); taosArrayDestroy(req.pNodeList); return rsp.code; } - taosWUnLockLatch(&pMeta->lock); - tqDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); // the following two functions should not be executed within the scope of meta lock to avoid deadlock streamTaskUpdateEpsetInfo(pTask, req.pNodeList); streamTaskResetStatus(pTask); // continue after lock the meta again - taosWLockLatch(&pMeta->lock); - tqDebug("vgId:%d meta-wlock", pMeta->vgId); + streamMetaWLock(pMeta); SStreamTask** ppHTask = NULL; if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { @@ -1900,42 +1895,36 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { if (updateTasks < numOfTasks) { tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId, updateTasks, (numOfTasks - updateTasks)); - taosWUnLockLatch(&pMeta->lock); - tqDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); } else { if (!pTq->pVnode->restored) { tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId); pMeta->startInfo.startAllTasksFlag = 0; - taosWUnLockLatch(&pMeta->lock); - tqDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); } else { tqDebug("vgId:%d tasks are all updated and stopped, restart them", vgId); terrno = 0; - taosWUnLockLatch(&pMeta->lock); - tqDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); while (streamMetaTaskInTimer(pMeta)) { qDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); taosMsleep(100); } - taosWLockLatch(&pMeta->lock); - tqDebug("vgId:%d meta-wlock", pMeta->vgId); + streamMetaWLock(pMeta); int32_t code = streamMetaReopen(pMeta); if (code != 0) { tqError("vgId:%d failed to reopen stream meta", vgId); - taosWUnLockLatch(&pMeta->lock); - tqDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); taosArrayDestroy(req.pNodeList); return -1; } if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) { tqError("vgId:%d failed to load stream tasks", vgId); - taosWUnLockLatch(&pMeta->lock); - tqDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); taosArrayDestroy(req.pNodeList); return -1; } @@ -1948,8 +1937,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { vInfo("vgId:%d, follower node not start stream tasks", vgId); } - taosWUnLockLatch(&pMeta->lock); - tqDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); } } diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 50ee52f45b..f367bc96f8 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -35,9 +35,9 @@ int32_t tqPushMsg(STQ* pTq, tmsg_t msgType) { tqProcessSubmitReqForSubscribe(pTq); } - taosRLockLatch(&pTq->pStreamMeta->lock); + streamMetaRLock(pTq->pStreamMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta); - taosRUnLockLatch(&pTq->pStreamMeta->lock); + streamMetaRUnLock(pTq->pStreamMeta); // tqTrace("vgId:%d handle submit, restore:%d, numOfTasks:%d", TD_VID(pTq->pVnode), pTq->pVnode->restored, numOfTasks); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 2a56cd3847..cd7a862344 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -1134,7 +1134,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { taosWUnLockLatch(&pTq->lock); // update the table list handle for each stream scanner/wal reader - taosWLockLatch(&pTq->pStreamMeta->lock); + streamMetaWLock(pTq->pStreamMeta); while (1) { pIter = taosHashIterate(pTq->pStreamMeta->pTasksMap, pIter); if (pIter == NULL) { @@ -1151,6 +1151,6 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { } } - taosWUnLockLatch(&pTq->pStreamMeta->lock); + streamMetaWUnLock(pTq->pStreamMeta); return 0; } diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 6dc5a77fc1..e7993ac673 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -38,10 +38,10 @@ int32_t tqScanWal(STQ* pTq) { doScanWalForAllTasks(pTq->pStreamMeta, &shouldIdle); if (shouldIdle) { - taosWLockLatch(&pMeta->lock); + streamMetaWLock(pMeta); int32_t times = (--pMeta->walScanCounter); ASSERT(pMeta->walScanCounter >= 0); - taosWUnLockLatch(&pMeta->lock); + streamMetaWUnLock(pMeta); if (times <= 0) { break; @@ -69,11 +69,11 @@ int32_t tqStartStreamTask(STQ* pTq) { } SArray* pTaskList = NULL; - taosWLockLatch(&pMeta->lock); + streamMetaWLock(pMeta); pTaskList = taosArrayDup(pMeta->pTaskList, NULL); taosHashClear(pMeta->startInfo.pReadyTaskSet); pMeta->startInfo.startTs = taosGetTimestampMs(); - taosWUnLockLatch(&pMeta->lock); + streamMetaWUnLock(pMeta); // broadcast the check downstream tasks msg for (int32_t i = 0; i < numOfTasks; ++i) { @@ -146,12 +146,12 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { return TSDB_CODE_SUCCESS; } - taosWLockLatch(&pMeta->lock); + streamMetaWLock(pMeta); int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); if (numOfTasks == 0) { tqDebug("vgId:%d no stream tasks existed to run", vgId); - taosWUnLockLatch(&pMeta->lock); + streamMetaWUnLock(pMeta); return 0; } @@ -162,7 +162,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { if (pMeta->walScanCounter > 1) { tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScanCounter); - taosWUnLockLatch(&pMeta->lock); + streamMetaWUnLock(pMeta); return 0; } @@ -172,7 +172,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { // reset the counter value, since we do not launch the scan wal operation. pMeta->walScanCounter = 0; - taosWUnLockLatch(&pMeta->lock); + streamMetaWUnLock(pMeta); return 0; } @@ -180,7 +180,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { if (pRunReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr()); - taosWUnLockLatch(&pMeta->lock); + streamMetaWUnLock(pMeta); return -1; } @@ -191,7 +191,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg); - taosWUnLockLatch(&pMeta->lock); + streamMetaWUnLock(pMeta); return 0; } @@ -207,9 +207,9 @@ int32_t tqStopStreamTasks(STQ* pTq) { } SArray* pTaskList = NULL; - taosWLockLatch(&pMeta->lock); + streamMetaWLock(pMeta); pTaskList = taosArrayDup(pMeta->pTaskList, NULL); - taosWUnLockLatch(&pMeta->lock); + streamMetaWUnLock(pMeta); for (int32_t i = 0; i < numOfTasks; ++i) { SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); @@ -410,9 +410,9 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { // clone the task list, to avoid the task update during scan wal files SArray* pTaskList = NULL; - taosWLockLatch(&pStreamMeta->lock); + streamMetaWLock(pStreamMeta); pTaskList = taosArrayDup(pStreamMeta->pTaskList, NULL); - taosWUnLockLatch(&pStreamMeta->lock); + streamMetaWUnLock(pStreamMeta); tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 5262620116..b4afd898fa 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -552,13 +552,11 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) pVnode->restored = true; SStreamMeta* pMeta = pVnode->pTq->pStreamMeta; - taosWLockLatch(&pMeta->lock); - tqDebug("vgId:%d meta-wlock", pMeta->vgId); + streamMetaWLock(pMeta); if (pMeta->startInfo.startAllTasksFlag) { vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId); - taosWUnLockLatch(&pMeta->lock); - tqDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); return; } @@ -575,8 +573,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId); } - taosWUnLockLatch(&pMeta->lock); - tqDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); } static void vnodeBecomeFollower(const SSyncFSM *pFsm) { diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 6d4f09b768..81840aaeb7 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -184,14 +184,13 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc { // todo: remove this when the pipeline checkpoint generating is used. SStreamMeta* pMeta = pTask->pMeta; - taosWLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-wlock", pMeta->vgId); + streamMetaWLock(pMeta); if (pMeta->chkptNotReadyTasks == 0) { pMeta->chkptNotReadyTasks = pMeta->numOfStreamTasks; } - taosWUnLockLatch(&pMeta->lock); + streamMetaWUnLock(pMeta); } //todo fix race condition: set the status and append checkpoint block @@ -284,8 +283,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { int32_t vgId = pMeta->vgId; int32_t code = 0; - taosWLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-wlock", pMeta->vgId); + streamMetaWLock(pMeta); for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) { STaskId* pId = taosArrayGet(pMeta->pTaskList, i); @@ -310,8 +308,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { code = streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE); if (code != TSDB_CODE_SUCCESS) { stDebug("s-task:%s vgId:%d save task status failed, since handle event failed", p->id.idStr, vgId); - taosWUnLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); return -1; } else { // save the task streamMetaSaveTask(pMeta, p); @@ -332,8 +329,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { stInfo("vgId:%d commit stream meta after do checkpoint, checkpointId:%" PRId64 " DONE", pMeta->vgId, checkpointId); } - taosWUnLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); return code; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index a1951b23cc..fd2aa47ef2 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -297,11 +297,11 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); // 2. save to disk - taosWLockLatch(&pMeta->lock); + streamMetaWLock(pMeta); if (streamMetaCommit(pMeta) < 0) { // persist to disk } - taosWUnLockLatch(&pMeta->lock); + streamMetaWUnLock(pMeta); return TSDB_CODE_STREAM_TASK_NOT_EXIST; } else { stDebug("s-task:%s fill-history task end, update related stream task:%s info, transfer exec state", pTask->id.idStr, @@ -357,9 +357,9 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); // 5. save to disk - taosWLockLatch(&pMeta->lock); + streamMetaWLock(pMeta); pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask, NULL); - taosWUnLockLatch(&pMeta->lock); + streamMetaWUnLock(pMeta); // 7. pause allowed. streamTaskEnablePause(pStreamTask); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index ce7f325922..ee53e330db 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -447,20 +447,20 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) { } SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { - taosRLockLatch(&pMeta->lock); + streamMetaRLock(pMeta); STaskId id = {.streamId = streamId, .taskId = taskId}; SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask != NULL) { if (!streamTaskShouldStop(*ppTask)) { int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1); - taosRUnLockLatch(&pMeta->lock); + streamMetaRUnLock(pMeta); stTrace("s-task:%s acquire task, ref:%d", (*ppTask)->id.idStr, ref); return *ppTask; } } - taosRUnLockLatch(&pMeta->lock); + streamMetaRUnLock(pMeta); return NULL; } @@ -491,8 +491,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t SStreamTask* pTask = NULL; // pre-delete operation - taosWLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-wlock", pMeta->vgId); + streamMetaWLock(pMeta); STaskId id = {.streamId = streamId, .taskId = taskId}; SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); @@ -509,40 +508,34 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_DROPPING); } else { stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId); - taosWUnLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); return 0; } - taosWUnLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); stDebug("s-task:0x%x set task status:dropping and start to unregister it", taskId); while (1) { - taosRLockLatch(&pMeta->lock); + streamMetaRLock(pMeta); ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask) { if ((*ppTask)->status.timerActive == 0) { - taosRUnLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaRUnLock(pMeta); break; } taosMsleep(10); stDebug("s-task:%s wait for quit from timer", (*ppTask)->id.idStr); - taosRUnLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaRUnLock(pMeta); } else { - taosRUnLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaRUnLock(pMeta); break; } } // let's do delete of stream task - taosWLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-wlock", pMeta->vgId); + streamMetaWLock(pMeta); ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask) { @@ -573,16 +566,15 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t stDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", pMeta->vgId, taskId); } - taosWUnLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); return 0; } int32_t streamMetaBegin(SStreamMeta* pMeta) { - taosWLockLatch(&pMeta->lock); + streamMetaWLock(pMeta); int32_t code = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); - taosWUnLockLatch(&pMeta->lock); + streamMetaWUnLock(pMeta); return code; } @@ -890,7 +882,7 @@ void metaHbToMnode(void* param, void* tmrId) { stDebug("vgId:%d build stream task hb, leader:%d", pMeta->vgId, (pMeta->role == NODE_ROLE_LEADER)); SStreamHbMsg hbMsg = {0}; - taosRLockLatch(&pMeta->lock); + streamMetaRLock(pMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); SEpSet epset = {0}; @@ -963,7 +955,7 @@ void metaHbToMnode(void* param, void* tmrId) { } hbMsg.numOfTasks = taosArrayGetSize(hbMsg.pTaskStatus); - taosRUnLockLatch(&pMeta->lock); + streamMetaRUnLock(pMeta); if (hasMnodeEpset) { int32_t code = 0; @@ -1018,8 +1010,7 @@ void metaHbToMnode(void* param, void* tmrId) { bool streamMetaTaskInTimer(SStreamMeta* pMeta) { bool inTimer = false; - taosWLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-wlock", pMeta->vgId); + streamMetaWLock(pMeta); void* pIter = NULL; while (1) { @@ -1034,9 +1025,7 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta) { } } - taosWUnLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-unlock", pMeta->vgId); - + streamMetaWUnLock(pMeta); return inTimer; } @@ -1046,8 +1035,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { stDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb%" PRId64 ", totalHb:%d", vgId, (pMeta->role == NODE_ROLE_LEADER), pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount); - taosWLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-wlock", pMeta->vgId); + streamMetaWLock(pMeta); void* pIter = NULL; while (1) { @@ -1061,8 +1049,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { streamTaskStop(pTask); } - taosWUnLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); // wait for the stream meta hb function stopping if (pMeta->role == NODE_ROLE_LEADER) { @@ -1101,4 +1088,22 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) { taosHashClear(pStartInfo->pReadyTaskSet); pStartInfo->startAllTasksFlag = 0; pStartInfo->readyTs = 0; -} \ No newline at end of file +} + +void streamMetaRLock(SStreamMeta* pMeta) { + stDebug("vgId:%d meta-rlock", pMeta->vgId); + taosRLockLatch(&pMeta->lock); +} +void streamMetaRUnLock(SStreamMeta* pMeta) { + stDebug("vgId:%d meta-runlock", pMeta->vgId); + taosRUnLockLatch(&pMeta->lock); +} +void streamMetaWLock(SStreamMeta* pMeta) { + stDebug("vgId:%d meta-wlock", pMeta->vgId); + taosWLockLatch(&pMeta->lock); +} +void streamMetaWUnLock(SStreamMeta* pMeta) { + stDebug("vgId:%d meta-wunlock", pMeta->vgId); + taosWUnLockLatch(&pMeta->lock); +} + diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 098385c3ef..5ebc60dc13 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -583,10 +583,10 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) { int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); streamTaskSetSchedStatusInactive(pTask); - taosWLockLatch(&pMeta->lock); + streamMetaWLock(pMeta); streamMetaSaveTask(pMeta, pTask); streamMetaCommit(pMeta); - taosWUnLockLatch(&pMeta->lock); + streamMetaWUnLock(pMeta); // history data scan in the stream time window finished, now let's enable the pause streamTaskEnablePause(pTask); @@ -624,8 +624,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { SLaunchHTaskInfo* pInfo = param; SStreamMeta* pMeta = pInfo->pMeta; - taosWLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-wlock", pMeta->vgId); + streamMetaWLock(pMeta); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pInfo->id, sizeof(pInfo->id)); if (ppTask) { @@ -639,13 +638,11 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { (*ppTask)->id.idStr, p, (*ppTask)->hTaskInfo.retryTimes, ref); taosMemoryFree(pInfo); - taosWUnLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); return; } } - taosWUnLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->id.streamId, pInfo->id.taskId); if (pTask != NULL) { @@ -981,8 +978,7 @@ void streamTaskEnablePause(SStreamTask* pTask) { int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; - taosWLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-wlock", pMeta->vgId); + streamMetaWLock(pMeta); STaskId id = streamTaskExtractKey(pTask); taosHashPut(pMeta->startInfo.pReadyTaskSet, &id, sizeof(id), NULL, 0); @@ -1003,7 +999,6 @@ int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask) { pStartInfo->elapsedTime / 1000.0); } - taosWUnLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 4a056563ee..fb0090ec6d 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -121,7 +121,7 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz #ifdef USE_ROCKSDB SStreamMeta* pMeta = pStreamTask->pMeta; pState->streamBackendRid = pMeta->streamBackendRid; - // taosWLockLatch(&pMeta->lock); + // streamMetaWLock(pMeta); taosThreadMutexLock(&pMeta->backendMutex); void* uniqueId = taosHashGet(pMeta->pTaskBackendUnique, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1); From 899d13b4ae6edf3ec10f88c910831b6e1a06f959 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 1 Nov 2023 17:25:59 +0800 Subject: [PATCH 06/13] fix(stream): fix deference null ptr. --- source/libs/stream/src/streamTaskSm.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 78f864d6ec..bca8273a78 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -248,10 +248,12 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { stDebug("s-task:%s lock", pTask->id.idStr); if (pSM->pActiveTrans != NULL && pSM->pActiveTrans->autoInvokeEndFn) { + EStreamTaskEvent evt = pSM->pActiveTrans->event; taosThreadMutexUnlock(&pTask->lock); - taosMsleep(100); + stDebug("s-task:%s status:%s handling event:%s by some other thread, wait for 100ms and check if completed", - pTask->id.idStr, pSM->current.name, GET_EVT_NAME(pSM->pActiveTrans->event)); + pTask->id.idStr, pSM->current.name, GET_EVT_NAME(evt)); + taosMsleep(100); } else { pTrans = streamTaskFindTransform(pSM->current.state, event); if (pTrans == NULL) { From ae25f09c454b3ccf201142367a8595b1af42a5b3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 1 Nov 2023 18:03:44 +0800 Subject: [PATCH 07/13] refactor: do some internal refactor. --- source/libs/stream/src/streamMeta.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index ee53e330db..b0bffac9ee 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1093,17 +1093,22 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) { void streamMetaRLock(SStreamMeta* pMeta) { stDebug("vgId:%d meta-rlock", pMeta->vgId); taosRLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-rlock done", pMeta->vgId); } void streamMetaRUnLock(SStreamMeta* pMeta) { stDebug("vgId:%d meta-runlock", pMeta->vgId); taosRUnLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-runlock done", pMeta->vgId); + } void streamMetaWLock(SStreamMeta* pMeta) { stDebug("vgId:%d meta-wlock", pMeta->vgId); taosWLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-wlock done", pMeta->vgId); } void streamMetaWUnLock(SStreamMeta* pMeta) { stDebug("vgId:%d meta-wunlock", pMeta->vgId); taosWUnLockLatch(&pMeta->lock); + stDebug("vgId:%d meta-wunlock done", pMeta->vgId); } From e7609d8e563ee75ae8c86b8afbaca5ec4e6825f2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 1 Nov 2023 19:09:21 +0800 Subject: [PATCH 08/13] fix(stream): fix dead-lock --- source/libs/stream/src/streamMeta.c | 47 +++++++++++++++++------------ 1 file changed, 28 insertions(+), 19 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index b0bffac9ee..e6d5f023d6 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -882,19 +882,33 @@ void metaHbToMnode(void* param, void* tmrId) { stDebug("vgId:%d build stream task hb, leader:%d", pMeta->vgId, (pMeta->role == NODE_ROLE_LEADER)); SStreamHbMsg hbMsg = {0}; + SEpSet epset = {0}; + bool hasMnodeEpset = false; + int32_t stage = 0; + streamMetaRLock(pMeta); + int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); - - SEpSet epset = {0}; - bool hasMnodeEpset = false; - hbMsg.vgId = pMeta->vgId; + stage = pMeta->stage; + + SArray* pIdList = taosArrayDup(pMeta->pTaskList, NULL); + + streamMetaRUnLock(pMeta); + hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry)); hbMsg.pUpdateNodes = taosArrayInit(numOfTasks, sizeof(int32_t)); for (int32_t i = 0; i < numOfTasks; ++i) { - STaskId* pId = taosArrayGet(pMeta->pTaskList, i); + STaskId* pId = taosArrayGet(pIdList, i); + + streamMetaRLock(pMeta); SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId)); + streamMetaRUnLock(pMeta); + + if (pTask == NULL) { + continue; + } // not report the status of fill-history task if ((*pTask)->info.fillHistory == 1) { @@ -904,12 +918,12 @@ void metaHbToMnode(void* param, void* tmrId) { STaskStatusEntry entry = { .id = *pId, .status = streamTaskGetStatus(*pTask, NULL), - .nodeId = pMeta->vgId, - .stage = pMeta->stage, + .nodeId = hbMsg.vgId, + .stage = stage, .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)), }; - entry.inputRate = entry.inputQUsed*100.0/STREAM_TASK_QUEUE_CAPACITY_IN_SIZE; + entry.inputRate = entry.inputQUsed * 100.0 / STREAM_TASK_QUEUE_CAPACITY_IN_SIZE; if ((*pTask)->info.taskLevel == TASK_LEVEL__SINK) { entry.sinkQuota = (*pTask)->outputInfo.pTokenBucket->quotaRate; entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize); @@ -928,9 +942,9 @@ void metaHbToMnode(void* param, void* tmrId) { taosThreadMutexLock(&(*pTask)->lock); int32_t num = taosArrayGetSize((*pTask)->outputInfo.pDownstreamUpdateList); for (int j = 0; j < num; ++j) { - int32_t *pNodeId = taosArrayGet((*pTask)->outputInfo.pDownstreamUpdateList, j); + int32_t* pNodeId = taosArrayGet((*pTask)->outputInfo.pDownstreamUpdateList, j); - bool exist = false; + bool exist = false; int32_t numOfExisted = taosArrayGetSize(hbMsg.pUpdateNodes); for (int k = 0; k < numOfExisted; ++k) { if (*pNodeId == *(int32_t*)taosArrayGet(hbMsg.pUpdateNodes, k)) { @@ -955,7 +969,6 @@ void metaHbToMnode(void* param, void* tmrId) { } hbMsg.numOfTasks = taosArrayGetSize(hbMsg.pTaskStatus); - streamMetaRUnLock(pMeta); if (hasMnodeEpset) { int32_t code = 0; @@ -1091,24 +1104,20 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) { } void streamMetaRLock(SStreamMeta* pMeta) { - stDebug("vgId:%d meta-rlock", pMeta->vgId); + stTrace("vgId:%d meta-rlock", pMeta->vgId); taosRLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-rlock done", pMeta->vgId); } void streamMetaRUnLock(SStreamMeta* pMeta) { - stDebug("vgId:%d meta-runlock", pMeta->vgId); + stTrace("vgId:%d meta-runlock", pMeta->vgId); taosRUnLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-runlock done", pMeta->vgId); } void streamMetaWLock(SStreamMeta* pMeta) { - stDebug("vgId:%d meta-wlock", pMeta->vgId); + stTrace("vgId:%d meta-wlock", pMeta->vgId); taosWLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-wlock done", pMeta->vgId); } void streamMetaWUnLock(SStreamMeta* pMeta) { - stDebug("vgId:%d meta-wunlock", pMeta->vgId); + stTrace("vgId:%d meta-wunlock", pMeta->vgId); taosWUnLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-wunlock done", pMeta->vgId); } From df7633650e2e8dbedb0b690af3dff606b7a349df Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 1 Nov 2023 20:10:20 +0800 Subject: [PATCH 09/13] fix(stream): fix memory leak. --- source/libs/stream/src/streamDispatch.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 14129522d6..d9f73c1ba0 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -429,6 +429,7 @@ static void doRetryDispatchData(void* param, void* tmrId) { ASSERT(pTask->outputq.status == TASK_OUTPUT_STATUS__WAIT); int32_t code = 0; + { SArray* pList = taosArrayDup(pTask->msgInfo.pRetryList, NULL); taosArrayClear(pTask->msgInfo.pRetryList); @@ -440,7 +441,7 @@ static void doRetryDispatchData(void* param, void* tmrId) { int32_t numOfVgroups = taosArrayGetSize(vgInfo); int32_t numOfFailed = taosArrayGetSize(pList); - stDebug("s-task:%s (child taskId:%d) re-try shuffle-dispatch blocks to %d vgroup(s), msgId:%d", + stDebug("s-task:%s (child taskId:%d) retry shuffle-dispatch blocks to %d vgroup(s), msgId:%d", id, pTask->info.selfChildId, numOfFailed, msgId); for (int32_t i = 0; i < numOfFailed; i++) { @@ -471,6 +472,8 @@ static void doRetryDispatchData(void* param, void* tmrId) { code = doSendDispatchMsg(pTask, pReq, vgId, pEpSet); } + + taosArrayDestroy(pList); } if (code != TSDB_CODE_SUCCESS) { From 79e971385ffec02b3b87e347c8f7751b7550fef4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 2 Nov 2023 08:11:51 +0800 Subject: [PATCH 10/13] fix(stream): fix memory leak. --- source/libs/stream/src/streamDispatch.c | 6 +++--- source/libs/stream/src/streamMeta.c | 26 ++++++++++++------------- 2 files changed, 15 insertions(+), 17 deletions(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index d9f73c1ba0..edfc66762d 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -41,9 +41,9 @@ static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTas int32_t numOfBlocks, int64_t dstTaskId, int32_t type); void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) { - pMsg->msgType = msgType; - pMsg->pCont = pCont; - pMsg->contLen = contLen; + pMsg->msgType = msgType; + pMsg->pCont = pCont; + pMsg->contLen = contLen; } int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index e6d5f023d6..887e879934 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -844,6 +844,12 @@ static bool waitForEnoughDuration(SMetaHbInfo* pInfo) { return false; } +static void clearHbMsg(SStreamHbMsg* pMsg, SArray* pIdList) { + taosArrayDestroy(pMsg->pTaskStatus); + taosArrayDestroy(pMsg->pUpdateNodes); + taosArrayDestroy(pIdList); +} + void metaHbToMnode(void* param, void* tmrId) { int64_t rid = *(int64_t*)param; @@ -977,17 +983,13 @@ void metaHbToMnode(void* param, void* tmrId) { tEncodeSize(tEncodeStreamHbMsg, &hbMsg, tlen, code); if (code < 0) { stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code)); - taosArrayDestroy(hbMsg.pTaskStatus); - taosReleaseRef(streamMetaId, rid); - return; + goto _end; } void* buf = rpcMallocCont(tlen); if (buf == NULL) { stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - taosArrayDestroy(hbMsg.pTaskStatus); - taosReleaseRef(streamMetaId, rid); - return; + goto _end; } SEncoder encoder; @@ -995,15 +997,12 @@ void metaHbToMnode(void* param, void* tmrId) { if ((code = tEncodeStreamHbMsg(&encoder, &hbMsg)) < 0) { rpcFreeCont(buf); stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code)); - taosArrayDestroy(hbMsg.pTaskStatus); - taosReleaseRef(streamMetaId, rid); - return; + goto _end; } tEncoderClear(&encoder); - SRpcMsg msg = {0}; + SRpcMsg msg = {.info.noResp = 1,}; initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen); - msg.info.noResp = 1; pMeta->pHbInfo->hbCount += 1; @@ -1014,9 +1013,8 @@ void metaHbToMnode(void* param, void* tmrId) { stDebug("vgId:%d no tasks and no mnd epset, not send stream hb to mnode", pMeta->vgId); } - taosArrayDestroy(hbMsg.pTaskStatus); - taosArrayDestroy(hbMsg.pUpdateNodes); - + _end: + clearHbMsg(&hbMsg, pIdList); taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr); taosReleaseRef(streamMetaId, rid); } From e14d42c01e8546305897806b9c6e233c9243e5f4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 2 Nov 2023 08:46:18 +0800 Subject: [PATCH 11/13] fix(stream): add qualified status in assert --- source/libs/stream/src/streamTaskSm.c | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index bca8273a78..6c91bbf2d8 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -139,9 +139,10 @@ static STaskStateTrans* streamTaskFindTransform(ETaskStatus state, const EStream void streamTaskRestoreStatus(SStreamTask* pTask) { SStreamTaskSM* pSM = pTask->status.pSM; - taosThreadMutexLock(&pTask->lock); - ASSERT(pSM->pActiveTrans == NULL); + taosThreadMutexLock(&pTask->lock); + + ASSERT(pSM->pActiveTrans == NULL); ASSERT(pSM->current.state == TASK_STATUS__PAUSE || pSM->current.state == TASK_STATUS__HALT); SStreamTaskState state = pSM->current; @@ -289,13 +290,13 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even // do update the task status taosThreadMutexLock(&pTask->lock); - stDebug("s-task:%s lock", pTask->id.idStr); STaskStateTrans* pTrans = pSM->pActiveTrans; - if (pTrans == NULL) { ETaskStatus s = pSM->current.state; - ASSERT(s == TASK_STATUS__DROPPING || s == TASK_STATUS__PAUSE || s == TASK_STATUS__STOP); + ASSERT(s == TASK_STATUS__DROPPING || s == TASK_STATUS__PAUSE || s == TASK_STATUS__STOP || + s == TASK_STATUS__UNINIT || s == TASK_STATUS__READY); + // the pSM->prev.evt may be 0, so print string is not appropriate. stDebug("s-task:%s event:%s handled failed, current status:%s, trigger event:%s", pTask->id.idStr, GET_EVT_NAME(event), pSM->current.name, GET_EVT_NAME(pSM->prev.evt)); @@ -309,7 +310,6 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even stWarn("s-task:%s handle event:%s failed, current status:%s, active trans evt:%s", pTask->id.idStr, GET_EVT_NAME(event), pSM->current.name, GET_EVT_NAME(pTrans->event)); taosThreadMutexUnlock(&pTask->lock); - stDebug("s-task:%s unlocky", pTask->id.idStr); return TSDB_CODE_INVALID_PARA; } @@ -342,7 +342,6 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even pSM->pActiveTrans = pNextTrans; pSM->startTs = taosGetTimestampMs(); taosThreadMutexUnlock(&pTask->lock); - stDebug("s-task:%s unlockf", pTask->id.idStr); int32_t code = pNextTrans->pAction(pSM->pTask); if (pNextTrans->autoInvokeEndFn) { @@ -358,7 +357,6 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even } } else { taosThreadMutexUnlock(&pTask->lock); - stDebug("s-task:%s unlockz", pTask->id.idStr); int64_t el = (taosGetTimestampMs() - pSM->startTs); stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr, From 351a31302f418ef12dc594c97021ae75b7994b57 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 2 Nov 2023 09:43:48 +0800 Subject: [PATCH 12/13] fix(stream): add full unsupported event filtering. --- include/util/taoserror.h | 1 + source/dnode/vnode/src/tq/tq.c | 89 ++++++++++++------------ source/dnode/vnode/src/tq/tqStreamTask.c | 9 ++- source/libs/stream/src/streamTaskSm.c | 49 +++++++++---- source/util/src/terror.c | 2 + 5 files changed, 89 insertions(+), 61 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 227e8520e3..124c8bac4c 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -803,6 +803,7 @@ int32_t* taosGetErrno(); // stream #define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100) #define TSDB_CODE_STREAM_EXEC_CANCELLED TAOS_DEF_ERROR_CODE(0, 0x4102) +#define TSDB_CODE_STREAM_INVALID_STATETRANS TAOS_DEF_ERROR_CODE(0, 0x4103) // TDLite #define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 2b091de5d0..28e2235f3a 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1061,6 +1061,47 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms return code; } +static void doStartStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq) { + const char* id = pTask->id.idStr; + int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer; + + // if it's an source task, extract the last version in wal. + SVersionRange *pRange = &pTask->dataRange.range; + + bool done = streamHistoryTaskSetVerRangeStep2(pTask, nextProcessedVer); + pTask->execInfo.step2Start = taosGetTimestampMs(); + + if (done) { + qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0); + streamTaskPutTranstateIntoInputQ(pTask); + streamExecTask(pTask); // exec directly + } else { + STimeWindow* pWindow = &pTask->dataRange.window; + tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64 + ", do secondary scan-history from WAL after halt the related stream task:%s", + id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, + pStreamTask->id.idStr); + ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); + + streamSetParamForStreamScannerStep2(pTask, pRange, pWindow); + + int64_t dstVer = pTask->dataRange.range.minVer; + pTask->chkInfo.nextProcessVer = dstVer; + + walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer); + tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer, + pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE); + + /*int8_t status = */streamTaskSetSchedStatusInactive(pTask); + + // now the fill-history task starts to scan data from wal files. + int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); + if (code == TSDB_CODE_SUCCESS) { + tqScanWalAsync(pTq, false); + } + } +} + // this function should be executed by only one thread int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont; @@ -1143,7 +1184,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { tqDebug("s-task:%s scan-history(step 1) ended, elapsed time:%.2fs", id, el); if (pTask->info.fillHistory) { - SVersionRange* pRange = NULL; SStreamTask* pStreamTask = NULL; // 1. get the related stream task @@ -1162,50 +1202,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE); - streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT); - int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer; - - // if it's an source task, extract the last version in wal. - pRange = &pTask->dataRange.range; - bool done = streamHistoryTaskSetVerRangeStep2(pTask, nextProcessedVer); - pTask->execInfo.step2Start = taosGetTimestampMs(); - - if (done) { - qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0); - streamTaskPutTranstateIntoInputQ(pTask); -// streamTaskRestoreStatus(pTask); - -// if (pTask->status.taskStatus == TASK_STATUS__PAUSE) { -// pTask->status.keepTaskStatus = TASK_STATUS__READY; -// qDebug("s-task:%s prev status is %s, update the kept status to be:%s when after step 2", id, -// streamGetTaskStatusStr(TASK_STATUS__PAUSE), streamGetTaskStatusStr(pTask->status.keepTaskStatus)); -// } - - streamExecTask(pTask); // exec directly + code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT); + if (code == TSDB_CODE_SUCCESS) { + doStartStep2(pTask, pStreamTask, pTq); } else { - STimeWindow* pWindow = &pTask->dataRange.window; - tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64 - ", do secondary scan-history from WAL after halt the related stream task:%s", - id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, - pStreamTask->id.idStr); - ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); - - streamSetParamForStreamScannerStep2(pTask, pRange, pWindow); - - int64_t dstVer = pTask->dataRange.range.minVer; - pTask->chkInfo.nextProcessVer = dstVer; - - walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer); - tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer, - pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE); - - /*int8_t status = */streamTaskSetSchedStatusInactive(pTask); - - // now the fill-history task starts to scan data from wal files. - code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); - if (code == TSDB_CODE_SUCCESS) { - tqScanWalAsync(pTq, false); - } + tqError("s-task:%s failed to halt s-task:%s, not launch step2", id, pStreamTask->id.idStr); } streamMetaReleaseTask(pMeta, pStreamTask); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index e7993ac673..2d94f23009 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -59,6 +59,7 @@ int32_t tqScanWal(STQ* pTq) { } int32_t tqStartStreamTask(STQ* pTq) { + int32_t code = TSDB_CODE_SUCCESS; int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; @@ -102,12 +103,16 @@ int32_t tqStartStreamTask(STQ* pTq) { } EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; - streamTaskHandleEvent(pTask->status.pSM, event); + int32_t ret = streamTaskHandleEvent(pTask->status.pSM, event); + if (ret != TSDB_CODE_SUCCESS) { + code = ret; + } + streamMetaReleaseTask(pMeta, pTask); } taosArrayDestroy(pTaskList); - return 0; + return code; } int32_t tqLaunchStreamTaskAsync(STQ* pTq) { diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 6c91bbf2d8..508efe9856 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -82,13 +82,6 @@ int32_t streamTaskInitStatus(SStreamTask* pTask) { return 0; } -int32_t streamTaskSetReadyForWal(SStreamTask* pTask) { - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - stDebug("s-task:%s ready for extract data from wal", pTask->id.idStr); - } - return TSDB_CODE_SUCCESS; -} - static int32_t streamTaskDoCheckpoint(SStreamTask* pTask) { stDebug("s-task:%s start to do checkpoint", pTask->id.idStr); return 0; @@ -108,9 +101,31 @@ int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) { } // todo check rsp code for handle Event:TASK_EVENT_SCANHIST_DONE -static bool isUnsupportedTransform(ETaskStatus state, const EStreamTaskEvent event) { - if (state == TASK_STATUS__STOP || state == TASK_STATUS__DROPPING || state == TASK_STATUS__UNINIT) { - if (event == TASK_EVENT_SCANHIST_DONE || event == TASK_EVENT_CHECKPOINT_DONE || event == TASK_EVENT_GEN_CHECKPOINT) { +static bool isInvalidStateTransfer(ETaskStatus state, const EStreamTaskEvent event) { + if (event == TASK_EVENT_INIT_STREAM_SCANHIST || event == TASK_EVENT_INIT || event == TASK_EVENT_INIT_SCANHIST) { + return (state != TASK_STATUS__UNINIT); + } + + if (event == TASK_EVENT_SCANHIST_DONE) { + return (state != TASK_STATUS__SCAN_HISTORY && state != TASK_STATUS__STREAM_SCAN_HISTORY); + } + + if (event == TASK_EVENT_GEN_CHECKPOINT) { + return (state != TASK_STATUS__READY); + } + + if (event == TASK_EVENT_CHECKPOINT_DONE) { + return (state != TASK_STATUS__CK); + } + + // todo refactor later + if (event == TASK_EVENT_RESUME) { + return true; + } + + if (event == TASK_EVENT_HALT) { + if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__UNINIT || state == TASK_STATUS__STOP || + state == TASK_STATUS__SCAN_HISTORY) { return true; } } @@ -128,7 +143,7 @@ static STaskStateTrans* streamTaskFindTransform(ETaskStatus state, const EStream } } - if (isUnsupportedTransform(state, event)) { + if (isInvalidStateTransfer(state, event)) { return NULL; } else { ASSERT(0); @@ -219,7 +234,7 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt taosMsleep(100); } else { stDebug("s-task:%s is dropped or stopped already, not wait.", id); - return TSDB_CODE_INVALID_PARA; + return TSDB_CODE_STREAM_INVALID_STATETRANS; } } @@ -260,7 +275,7 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { if (pTrans == NULL) { stDebug("s-task:%s failed to handle event:%s", pTask->id.idStr, GET_EVT_NAME(event)); taosThreadMutexUnlock(&pTask->lock); - return TSDB_CODE_INVALID_PARA; // todo: set new error code// failed to handle the event. + return TSDB_CODE_STREAM_INVALID_STATETRANS; } if (pSM->pActiveTrans != NULL) { @@ -303,14 +318,14 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even taosThreadMutexUnlock(&pTask->lock); stDebug("s-task:%s unlockx", pTask->id.idStr); - return TSDB_CODE_INVALID_PARA; + return TSDB_CODE_STREAM_INVALID_STATETRANS; } if (pTrans->event != event) { stWarn("s-task:%s handle event:%s failed, current status:%s, active trans evt:%s", pTask->id.idStr, GET_EVT_NAME(event), pSM->current.name, GET_EVT_NAME(pTrans->event)); taosThreadMutexUnlock(&pTask->lock); - return TSDB_CODE_INVALID_PARA; + return TSDB_CODE_STREAM_INVALID_STATETRANS; } keepPrevInfo(pSM); @@ -469,6 +484,10 @@ void doInitStateTransferTable(void) { streamTaskKeepCurrentVerInWal, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); + trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, + streamTaskKeepCurrentVerInWal, NULL, true); + taosArrayPush(streamTaskSMTrans, &trans); + SAttachedEventInfo info = {.status = TASK_STATUS__READY, .event = TASK_EVENT_HALT}; trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info, true); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index b4a1a2eae2..b5a4fc4008 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -665,6 +665,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SAME_COMMITTED_VALUE, "Same committed valu // stream TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist") TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_EXEC_CANCELLED, "Stream task exec cancelled") +TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INVALID_STATETRANS, "Invalid task state transfer") + // TDLite TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS, "Invalid TDLite open flags") TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_DIR, "Invalid TDLite open directory") From 420825c62fc43f22c58f9a27146fc973725e3428 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 2 Nov 2023 10:22:03 +0800 Subject: [PATCH 13/13] fix(stream): return error code. --- source/libs/stream/src/streamTaskSm.c | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 508efe9856..09656cbe97 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -218,7 +218,6 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt if (pTrans->attachEvent.event != 0) { attachEvent(pTask, &pTrans->attachEvent); taosThreadMutexUnlock(&pTask->lock); - stDebug("s-task:%s unlock1", pTask->id.idStr); while (1) { // wait for the task to be here @@ -242,7 +241,6 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt pSM->pActiveTrans = pTrans; pSM->startTs = taosGetTimestampMs(); taosThreadMutexUnlock(&pTask->lock); - stDebug("s-task:%s unlock2", pTask->id.idStr); int32_t code = pTrans->pAction(pTask); // todo handle error code; @@ -256,12 +254,12 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt } int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { + int32_t code = TSDB_CODE_SUCCESS; SStreamTask* pTask = pSM->pTask; STaskStateTrans* pTrans = NULL; while (1) { taosThreadMutexLock(&pTask->lock); - stDebug("s-task:%s lock", pTask->id.idStr); if (pSM->pActiveTrans != NULL && pSM->pActiveTrans->autoInvokeEndFn) { EStreamTaskEvent evt = pSM->pActiveTrans->event; @@ -285,12 +283,12 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { pSM->pActiveTrans->next.name, GET_EVT_NAME(event)); } - doHandleEvent(pSM, event, pTrans); + code = doHandleEvent(pSM, event, pTrans); break; } } - return TSDB_CODE_SUCCESS; + return code; } static void keepPrevInfo(SStreamTaskSM* pSM) {