diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index 32ac38cee6..0ad3023cbe 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -31,7 +31,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char* int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen); int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader); int32_t startStreamTasks(SStreamMeta* pMeta); -int32_t resetStreamTaskStatus(SStreamMeta* pMeta); +int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta); int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta); #endif // TDENGINE_TQ_COMMON_H diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 853dc9d1aa..31d923169a 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -150,7 +150,7 @@ FAIL: } int32_t sndInit(SSnode * pSnode) { - resetStreamTaskStatus(pSnode->pMeta); + tqStreamTaskResetStatus(pSnode->pMeta); startStreamTasks(pSnode->pMeta); return 0; } diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index bfae691dbf..011ce674ec 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -218,7 +218,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { tqInfo("vgId:%d start all stream tasks after all being updated", vgId); - resetStreamTaskStatus(pTq->pStreamMeta); + tqStreamTaskResetStatus(pTq->pStreamMeta); tqStartStreamTaskAsync(pTq, false); } else { tqInfo("vgId:%d, follower node not start stream tasks", vgId); @@ -703,7 +703,7 @@ int32_t startStreamTasks(SStreamMeta* pMeta) { return code; } -int32_t resetStreamTaskStatus(SStreamMeta* pMeta) { +int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta) { int32_t vgId = pMeta->vgId; int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); @@ -763,7 +763,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { } if (isLeader && !tsDisableStream) { - resetStreamTaskStatus(pMeta); + tqStreamTaskResetStatus(pMeta); streamMetaWUnLock(pMeta); tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 5871a60c9e..48811b2925 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -570,7 +570,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) vInfo("vgId:%d, sync restore finished, not launch stream tasks, since stream tasks are disabled", vgId); } else { vInfo("vgId:%d sync restore finished, start to launch stream tasks", pVnode->config.vgId); - resetStreamTaskStatus(pVnode->pTq->pStreamMeta); + tqStreamTaskResetStatus(pVnode->pTq->pStreamMeta); tqStreamTaskStartAsync(pMeta, &pVnode->msgCb, false); } } else { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 59178336d8..c263968828 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1270,18 +1270,22 @@ void streamMetaRLock(SStreamMeta* pMeta) { stTrace("vgId:%d meta-rlock", pMeta->vgId); taosRLockLatch(&pMeta->lock); } + void streamMetaRUnLock(SStreamMeta* pMeta) { stTrace("vgId:%d meta-runlock", pMeta->vgId); taosRUnLockLatch(&pMeta->lock); } + void streamMetaWLock(SStreamMeta* pMeta) { stTrace("vgId:%d meta-wlock", pMeta->vgId); taosWLockLatch(&pMeta->lock); } + void streamMetaWUnLock(SStreamMeta* pMeta) { stTrace("vgId:%d meta-wunlock", pMeta->vgId); taosWUnLockLatch(&pMeta->lock); } + static void execHelper(struct SSchedMsg* pSchedMsg) { __async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle; int32_t code = execFn(pSchedMsg->thandle); diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 8bd7984d29..0ae85024c0 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -1083,7 +1083,6 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamI int64_t endTs, bool ready) { STaskStartInfo* pStartInfo = &pMeta->startInfo; STaskId id = {.streamId = streamId, .taskId = taskId}; - bool restart = true; streamMetaWLock(pMeta); SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet; diff --git a/source/os/test/osTests.cpp b/source/os/test/osTests.cpp index 16660a9477..4d9ad9b5bd 100644 --- a/source/os/test/osTests.cpp +++ b/source/os/test/osTests.cpp @@ -72,7 +72,7 @@ TEST(osTest, osSystem) { const int sysLen = 64; char osSysName[sysLen]; int ret = taosGetOsReleaseName(osSysName, NULL, NULL, sysLen); - printf("os systeme name:%s\n", osSysName); + printf("os system name:%s\n", osSysName); ASSERT_EQ(ret, 0); }