From 7cf90dde5c29bd99d61a7d447723d94f798d7536 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 1 Nov 2023 17:19:21 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 4 ++ source/dnode/snode/src/snode.c | 10 ++-- source/dnode/vnode/src/tq/tq.c | 48 ++++++--------- source/dnode/vnode/src/tq/tqPush.c | 4 +- source/dnode/vnode/src/tq/tqRead.c | 4 +- source/dnode/vnode/src/tq/tqStreamTask.c | 28 ++++----- source/dnode/vnode/src/vnd/vnodeSync.c | 9 +-- source/libs/stream/src/streamCheckpoint.c | 14 ++--- source/libs/stream/src/streamExec.c | 8 +-- source/libs/stream/src/streamMeta.c | 73 ++++++++++++----------- source/libs/stream/src/streamStart.c | 19 +++--- source/libs/stream/src/streamState.c | 2 +- 12 files changed, 104 insertions(+), 119 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 81a0caeb6b..2e4204ab34 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -806,6 +806,10 @@ void streamMetaStartHb(SStreamMeta* pMeta); void streamMetaInitForSnode(SStreamMeta* pMeta); bool streamMetaTaskInTimer(SStreamMeta* pMeta); int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask); +void streamMetaRLock(SStreamMeta* pMeta); +void streamMetaRUnLock(SStreamMeta* pMeta); +void streamMetaWLock(SStreamMeta* pMeta); +void streamMetaWUnLock(SStreamMeta* pMeta); // checkpoint int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 4e84b4cd26..f2ef00c534 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -165,17 +165,17 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) { ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG); // 2.save task - taosWLockLatch(&pSnode->pMeta->lock); + streamMetaWLock(pSnode->pMeta); bool added = false; code = streamMetaRegisterTask(pSnode->pMeta, -1, pTask, &added); if (code < 0) { - taosWUnLockLatch(&pSnode->pMeta->lock); + streamMetaWUnLock(pSnode->pMeta); return -1; } int32_t numOfTasks = streamMetaGetNumOfTasks(pSnode->pMeta); - taosWUnLockLatch(&pSnode->pMeta->lock); + streamMetaWUnLock(pSnode->pMeta); char* p = NULL; streamTaskGetStatus(pTask, &p); @@ -195,14 +195,14 @@ int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) { streamMetaUnregisterTask(pSnode->pMeta, pReq->streamId, pReq->taskId); // commit the update - taosWLockLatch(&pSnode->pMeta->lock); + streamMetaWLock(pSnode->pMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pSnode->pMeta); qDebug("vgId:%d task:0x%x dropped, remain tasks:%d", pSnode->pMeta->vgId, pReq->taskId, numOfTasks); if (streamMetaCommit(pSnode->pMeta) < 0) { // persist to disk } - taosWUnLockLatch(&pSnode->pMeta->lock); + streamMetaWUnLock(pSnode->pMeta); return 0; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index e934fd5192..7a14f8349f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1023,10 +1023,10 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms int64_t streamId = pTask->id.streamId; bool added = false; - taosWLockLatch(&pStreamMeta->lock); + streamMetaWLock(pStreamMeta); code = streamMetaRegisterTask(pStreamMeta, sversion, pTask, &added); int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta); - taosWUnLockLatch(&pStreamMeta->lock); + streamMetaWUnLock(pStreamMeta); if (code < 0) { tqError("failed to add s-task:0x%x into vgId:%d meta, total:%d, code:%s", vgId, taskId, numOfTasks, tstrerror(code)); @@ -1406,14 +1406,14 @@ int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId); // commit the update - taosWLockLatch(&pMeta->lock); + streamMetaWLock(pMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks); if (streamMetaCommit(pMeta) < 0) { // persist to disk } - taosWUnLockLatch(&pMeta->lock); + streamMetaWUnLock(pMeta); return 0; } @@ -1724,7 +1724,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) taosThreadMutexUnlock(&pTask->lock); int32_t total = 0; - taosWLockLatch(&pMeta->lock); + streamMetaWLock(pMeta); // set the initial value for generating check point // set the mgmt epset info according to the checkout source msg from mnode, todo update mgmt epset if needed @@ -1733,7 +1733,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) } total = pMeta->numOfStreamTasks; - taosWUnLockLatch(&pMeta->lock); + streamMetaWUnLock(pMeta); qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", total checkpoint reqs:%d", pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, total); @@ -1804,8 +1804,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { tDecoderClear(&decoder); // update the nodeEpset when it exists - taosWLockLatch(&pMeta->lock); - tqDebug("vgId:%d meta-wlock", pMeta->vgId); + streamMetaWLock(pMeta); // the task epset may be updated again and again, when replaying the WAL, the task may be in stop status. STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; @@ -1814,8 +1813,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId, req.taskId); rsp.code = TSDB_CODE_SUCCESS; - taosWUnLockLatch(&pMeta->lock); - tqDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); taosArrayDestroy(req.pNodeList); return rsp.code; @@ -1838,22 +1836,19 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { tqDebug("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", pTask->id.idStr, vgId, req.transId); rsp.code = TSDB_CODE_SUCCESS; - taosWUnLockLatch(&pMeta->lock); - tqDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); taosArrayDestroy(req.pNodeList); return rsp.code; } - taosWUnLockLatch(&pMeta->lock); - tqDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); // the following two functions should not be executed within the scope of meta lock to avoid deadlock streamTaskUpdateEpsetInfo(pTask, req.pNodeList); streamTaskResetStatus(pTask); // continue after lock the meta again - taosWLockLatch(&pMeta->lock); - tqDebug("vgId:%d meta-wlock", pMeta->vgId); + streamMetaWLock(pMeta); SStreamTask** ppHTask = NULL; if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { @@ -1903,42 +1898,36 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { if (updateTasks < numOfTasks) { tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId, updateTasks, (numOfTasks - updateTasks)); - taosWUnLockLatch(&pMeta->lock); - tqDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); } else { if (!pTq->pVnode->restored) { tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId); pMeta->startInfo.startAllTasksFlag = 0; - taosWUnLockLatch(&pMeta->lock); - tqDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); } else { tqDebug("vgId:%d tasks are all updated and stopped, restart them", vgId); terrno = 0; - taosWUnLockLatch(&pMeta->lock); - tqDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); while (streamMetaTaskInTimer(pMeta)) { qDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); taosMsleep(100); } - taosWLockLatch(&pMeta->lock); - tqDebug("vgId:%d meta-wlock", pMeta->vgId); + streamMetaWLock(pMeta); int32_t code = streamMetaReopen(pMeta); if (code != 0) { tqError("vgId:%d failed to reopen stream meta", vgId); - taosWUnLockLatch(&pMeta->lock); - tqDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); taosArrayDestroy(req.pNodeList); return -1; } if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) { tqError("vgId:%d failed to load stream tasks", vgId); - taosWUnLockLatch(&pMeta->lock); - tqDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); taosArrayDestroy(req.pNodeList); return -1; } @@ -1951,8 +1940,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { vInfo("vgId:%d, follower node not start stream tasks", vgId); } - taosWUnLockLatch(&pMeta->lock); - tqDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); } } diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 50ee52f45b..f367bc96f8 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -35,9 +35,9 @@ int32_t tqPushMsg(STQ* pTq, tmsg_t msgType) { tqProcessSubmitReqForSubscribe(pTq); } - taosRLockLatch(&pTq->pStreamMeta->lock); + streamMetaRLock(pTq->pStreamMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta); - taosRUnLockLatch(&pTq->pStreamMeta->lock); + streamMetaRUnLock(pTq->pStreamMeta); // tqTrace("vgId:%d handle submit, restore:%d, numOfTasks:%d", TD_VID(pTq->pVnode), pTq->pVnode->restored, numOfTasks); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index dadbd30808..bd2a591a98 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -1111,7 +1111,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { taosWUnLockLatch(&pTq->lock); // update the table list handle for each stream scanner/wal reader - taosWLockLatch(&pTq->pStreamMeta->lock); + streamMetaWLock(pTq->pStreamMeta); while (1) { pIter = taosHashIterate(pTq->pStreamMeta->pTasksMap, pIter); if (pIter == NULL) { @@ -1128,6 +1128,6 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { } } - taosWUnLockLatch(&pTq->pStreamMeta->lock); + streamMetaWUnLock(pTq->pStreamMeta); return 0; } diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 6dc5a77fc1..e7993ac673 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -38,10 +38,10 @@ int32_t tqScanWal(STQ* pTq) { doScanWalForAllTasks(pTq->pStreamMeta, &shouldIdle); if (shouldIdle) { - taosWLockLatch(&pMeta->lock); + streamMetaWLock(pMeta); int32_t times = (--pMeta->walScanCounter); ASSERT(pMeta->walScanCounter >= 0); - taosWUnLockLatch(&pMeta->lock); + streamMetaWUnLock(pMeta); if (times <= 0) { break; @@ -69,11 +69,11 @@ int32_t tqStartStreamTask(STQ* pTq) { } SArray* pTaskList = NULL; - taosWLockLatch(&pMeta->lock); + streamMetaWLock(pMeta); pTaskList = taosArrayDup(pMeta->pTaskList, NULL); taosHashClear(pMeta->startInfo.pReadyTaskSet); pMeta->startInfo.startTs = taosGetTimestampMs(); - taosWUnLockLatch(&pMeta->lock); + streamMetaWUnLock(pMeta); // broadcast the check downstream tasks msg for (int32_t i = 0; i < numOfTasks; ++i) { @@ -146,12 +146,12 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { return TSDB_CODE_SUCCESS; } - taosWLockLatch(&pMeta->lock); + streamMetaWLock(pMeta); int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); if (numOfTasks == 0) { tqDebug("vgId:%d no stream tasks existed to run", vgId); - taosWUnLockLatch(&pMeta->lock); + streamMetaWUnLock(pMeta); return 0; } @@ -162,7 +162,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { if (pMeta->walScanCounter > 1) { tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScanCounter); - taosWUnLockLatch(&pMeta->lock); + streamMetaWUnLock(pMeta); return 0; } @@ -172,7 +172,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { // reset the counter value, since we do not launch the scan wal operation. pMeta->walScanCounter = 0; - taosWUnLockLatch(&pMeta->lock); + streamMetaWUnLock(pMeta); return 0; } @@ -180,7 +180,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { if (pRunReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr()); - taosWUnLockLatch(&pMeta->lock); + streamMetaWUnLock(pMeta); return -1; } @@ -191,7 +191,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg); - taosWUnLockLatch(&pMeta->lock); + streamMetaWUnLock(pMeta); return 0; } @@ -207,9 +207,9 @@ int32_t tqStopStreamTasks(STQ* pTq) { } SArray* pTaskList = NULL; - taosWLockLatch(&pMeta->lock); + streamMetaWLock(pMeta); pTaskList = taosArrayDup(pMeta->pTaskList, NULL); - taosWUnLockLatch(&pMeta->lock); + streamMetaWUnLock(pMeta); for (int32_t i = 0; i < numOfTasks; ++i) { SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); @@ -410,9 +410,9 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { // clone the task list, to avoid the task update during scan wal files SArray* pTaskList = NULL; - taosWLockLatch(&pStreamMeta->lock); + streamMetaWLock(pStreamMeta); pTaskList = taosArrayDup(pStreamMeta->pTaskList, NULL); - taosWUnLockLatch(&pStreamMeta->lock); + streamMetaWUnLock(pStreamMeta); tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 5d8fc6056c..4a0c987e57 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -555,13 +555,11 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) pVnode->restored = true; SStreamMeta* pMeta = pVnode->pTq->pStreamMeta; - taosWLockLatch(&pMeta->lock); - tqDebug("vgId:%d meta-wlock", pMeta->vgId); + streamMetaWLock(pMeta); if (pMeta->startInfo.startAllTasksFlag) { vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId); - taosWUnLockLatch(&pMeta->lock); - tqDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); return; } @@ -578,8 +576,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId); } - taosWUnLockLatch(&pMeta->lock); - tqDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); } static void vnodeBecomeFollower(const SSyncFSM *pFsm) { diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 6d4f09b768..81840aaeb7 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -184,14 +184,13 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc { // todo: remove this when the pipeline checkpoint generating is used. SStreamMeta* pMeta = pTask->pMeta; - taosWLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-wlock", pMeta->vgId); + streamMetaWLock(pMeta); if (pMeta->chkptNotReadyTasks == 0) { pMeta->chkptNotReadyTasks = pMeta->numOfStreamTasks; } - taosWUnLockLatch(&pMeta->lock); + streamMetaWUnLock(pMeta); } //todo fix race condition: set the status and append checkpoint block @@ -284,8 +283,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { int32_t vgId = pMeta->vgId; int32_t code = 0; - taosWLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-wlock", pMeta->vgId); + streamMetaWLock(pMeta); for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) { STaskId* pId = taosArrayGet(pMeta->pTaskList, i); @@ -310,8 +308,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { code = streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE); if (code != TSDB_CODE_SUCCESS) { stDebug("s-task:%s vgId:%d save task status failed, since handle event failed", p->id.idStr, vgId); - taosWUnLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); return -1; } else { // save the task streamMetaSaveTask(pMeta, p); @@ -332,8 +329,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { stInfo("vgId:%d commit stream meta after do checkpoint, checkpointId:%" PRId64 " DONE", pMeta->vgId, checkpointId); } - taosWUnLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); return code; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index a1951b23cc..fd2aa47ef2 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -297,11 +297,11 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); // 2. save to disk - taosWLockLatch(&pMeta->lock); + streamMetaWLock(pMeta); if (streamMetaCommit(pMeta) < 0) { // persist to disk } - taosWUnLockLatch(&pMeta->lock); + streamMetaWUnLock(pMeta); return TSDB_CODE_STREAM_TASK_NOT_EXIST; } else { stDebug("s-task:%s fill-history task end, update related stream task:%s info, transfer exec state", pTask->id.idStr, @@ -357,9 +357,9 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); // 5. save to disk - taosWLockLatch(&pMeta->lock); + streamMetaWLock(pMeta); pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask, NULL); - taosWUnLockLatch(&pMeta->lock); + streamMetaWUnLock(pMeta); // 7. pause allowed. streamTaskEnablePause(pStreamTask); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index ce7f325922..ee53e330db 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -447,20 +447,20 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) { } SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { - taosRLockLatch(&pMeta->lock); + streamMetaRLock(pMeta); STaskId id = {.streamId = streamId, .taskId = taskId}; SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask != NULL) { if (!streamTaskShouldStop(*ppTask)) { int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1); - taosRUnLockLatch(&pMeta->lock); + streamMetaRUnLock(pMeta); stTrace("s-task:%s acquire task, ref:%d", (*ppTask)->id.idStr, ref); return *ppTask; } } - taosRUnLockLatch(&pMeta->lock); + streamMetaRUnLock(pMeta); return NULL; } @@ -491,8 +491,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t SStreamTask* pTask = NULL; // pre-delete operation - taosWLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-wlock", pMeta->vgId); + streamMetaWLock(pMeta); STaskId id = {.streamId = streamId, .taskId = taskId}; SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); @@ -509,40 +508,34 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_DROPPING); } else { stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId); - taosWUnLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); return 0; } - taosWUnLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); stDebug("s-task:0x%x set task status:dropping and start to unregister it", taskId); while (1) { - taosRLockLatch(&pMeta->lock); + streamMetaRLock(pMeta); ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask) { if ((*ppTask)->status.timerActive == 0) { - taosRUnLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaRUnLock(pMeta); break; } taosMsleep(10); stDebug("s-task:%s wait for quit from timer", (*ppTask)->id.idStr); - taosRUnLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaRUnLock(pMeta); } else { - taosRUnLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaRUnLock(pMeta); break; } } // let's do delete of stream task - taosWLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-wlock", pMeta->vgId); + streamMetaWLock(pMeta); ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask) { @@ -573,16 +566,15 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t stDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", pMeta->vgId, taskId); } - taosWUnLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); return 0; } int32_t streamMetaBegin(SStreamMeta* pMeta) { - taosWLockLatch(&pMeta->lock); + streamMetaWLock(pMeta); int32_t code = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); - taosWUnLockLatch(&pMeta->lock); + streamMetaWUnLock(pMeta); return code; } @@ -890,7 +882,7 @@ void metaHbToMnode(void* param, void* tmrId) { stDebug("vgId:%d build stream task hb, leader:%d", pMeta->vgId, (pMeta->role == NODE_ROLE_LEADER)); SStreamHbMsg hbMsg = {0}; - taosRLockLatch(&pMeta->lock); + streamMetaRLock(pMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); SEpSet epset = {0}; @@ -963,7 +955,7 @@ void metaHbToMnode(void* param, void* tmrId) { } hbMsg.numOfTasks = taosArrayGetSize(hbMsg.pTaskStatus); - taosRUnLockLatch(&pMeta->lock); + streamMetaRUnLock(pMeta); if (hasMnodeEpset) { int32_t code = 0; @@ -1018,8 +1010,7 @@ void metaHbToMnode(void* param, void* tmrId) { bool streamMetaTaskInTimer(SStreamMeta* pMeta) { bool inTimer = false; - taosWLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-wlock", pMeta->vgId); + streamMetaWLock(pMeta); void* pIter = NULL; while (1) { @@ -1034,9 +1025,7 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta) { } } - taosWUnLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-unlock", pMeta->vgId); - + streamMetaWUnLock(pMeta); return inTimer; } @@ -1046,8 +1035,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { stDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb%" PRId64 ", totalHb:%d", vgId, (pMeta->role == NODE_ROLE_LEADER), pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount); - taosWLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-wlock", pMeta->vgId); + streamMetaWLock(pMeta); void* pIter = NULL; while (1) { @@ -1061,8 +1049,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { streamTaskStop(pTask); } - taosWUnLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); // wait for the stream meta hb function stopping if (pMeta->role == NODE_ROLE_LEADER) { @@ -1101,4 +1088,22 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) { taosHashClear(pStartInfo->pReadyTaskSet); pStartInfo->startAllTasksFlag = 0; pStartInfo->readyTs = 0; -} \ No newline at end of file +} + +void streamMetaRLock(SStreamMeta* pMeta) { + stDebug("vgId:%d meta-rlock", pMeta->vgId); + taosRLockLatch(&pMeta->lock); +} +void streamMetaRUnLock(SStreamMeta* pMeta) { + stDebug("vgId:%d meta-runlock", pMeta->vgId); + taosRUnLockLatch(&pMeta->lock); +} +void streamMetaWLock(SStreamMeta* pMeta) { + stDebug("vgId:%d meta-wlock", pMeta->vgId); + taosWLockLatch(&pMeta->lock); +} +void streamMetaWUnLock(SStreamMeta* pMeta) { + stDebug("vgId:%d meta-wunlock", pMeta->vgId); + taosWUnLockLatch(&pMeta->lock); +} + diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 098385c3ef..5ebc60dc13 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -583,10 +583,10 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) { int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); streamTaskSetSchedStatusInactive(pTask); - taosWLockLatch(&pMeta->lock); + streamMetaWLock(pMeta); streamMetaSaveTask(pMeta, pTask); streamMetaCommit(pMeta); - taosWUnLockLatch(&pMeta->lock); + streamMetaWUnLock(pMeta); // history data scan in the stream time window finished, now let's enable the pause streamTaskEnablePause(pTask); @@ -624,8 +624,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { SLaunchHTaskInfo* pInfo = param; SStreamMeta* pMeta = pInfo->pMeta; - taosWLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-wlock", pMeta->vgId); + streamMetaWLock(pMeta); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pInfo->id, sizeof(pInfo->id)); if (ppTask) { @@ -639,13 +638,11 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { (*ppTask)->id.idStr, p, (*ppTask)->hTaskInfo.retryTimes, ref); taosMemoryFree(pInfo); - taosWUnLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); return; } } - taosWUnLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->id.streamId, pInfo->id.taskId); if (pTask != NULL) { @@ -981,8 +978,7 @@ void streamTaskEnablePause(SStreamTask* pTask) { int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; - taosWLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-wlock", pMeta->vgId); + streamMetaWLock(pMeta); STaskId id = streamTaskExtractKey(pTask); taosHashPut(pMeta->startInfo.pReadyTaskSet, &id, sizeof(id), NULL, 0); @@ -1003,7 +999,6 @@ int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask) { pStartInfo->elapsedTime / 1000.0); } - taosWUnLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-unlock", pMeta->vgId); + streamMetaWUnLock(pMeta); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 4a056563ee..fb0090ec6d 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -121,7 +121,7 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz #ifdef USE_ROCKSDB SStreamMeta* pMeta = pStreamTask->pMeta; pState->streamBackendRid = pMeta->streamBackendRid; - // taosWLockLatch(&pMeta->lock); + // streamMetaWLock(pMeta); taosThreadMutexLock(&pMeta->backendMutex); void* uniqueId = taosHashGet(pMeta->pTaskBackendUnique, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1);