From 1ea671e373f1156dd0a7a8c82317175d7009f8b3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 17 Jan 2024 14:55:15 +0800 Subject: [PATCH] enh(stream): remove time controller. --- include/libs/stream/tstream.h | 1 + source/dnode/vnode/src/inc/tq.h | 1 - source/dnode/vnode/src/tq/tq.c | 14 -------------- source/dnode/vnode/src/tq/tqStreamTask.c | 8 ++++++-- source/libs/stream/src/stream.c | 4 ++++ 5 files changed, 11 insertions(+), 17 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 9c7d6f0330..05462f1fd8 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -888,6 +888,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta); int32_t streamMetaStopAllTasks(SStreamMeta* pMeta); int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); bool streamMetaAllTasksReady(const SStreamMeta* pMeta); +tmr_h streamTimerGetInstance(); // checkpoint int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 435b12bc06..0ef29fcb3a 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -107,7 +107,6 @@ struct STQ { TTB* pExecStore; TTB* pCheckStore; SStreamMeta* pStreamMeta; - void* tqTimer; }; int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 1fc7402363..9ae4fa5e19 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -26,18 +26,6 @@ static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) { return TMQ_HANDLE_ static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_EXEC; } static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_IDLE; } -static int32_t tqTimerInit(STQ* pTq) { - pTq->tqTimer = taosTmrInit(100, 100, 1000, "TQ"); - if (pTq->tqTimer == NULL) { - return -1; - } - return 0; -} - -static void tqTimerCleanUp(STQ* pTq) { - taosTmrCleanUp(pTq->tqTimer); -} - void tqDestroyTqHandle(void* data) { STqHandle* pData = (STqHandle*)data; qDestroyTask(pData->execHandle.task); @@ -118,7 +106,6 @@ int32_t tqInitialize(STQ* pTq) { return -1; } - tqTimerInit(pTq); return 0; } @@ -149,7 +136,6 @@ void tqClose(STQ* pTq) { taosMemoryFree(pTq->path); tqMetaClose(pTq); streamMetaClose(pTq->pStreamMeta); - tqTimerCleanUp(pTq); qDebug("end to close tq"); taosMemoryFree(pTq); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 74aea0cdfb..cdb5cc26f8 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -95,10 +95,14 @@ int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) { pParam->pTq = pTq; pParam->numOfTasks = numOfTasks; + + tmr_h pTimer = streamTimerGetInstance(); + ASSERT(pTimer); + if (pMeta->scanInfo.scanTimer == NULL) { - pMeta->scanInfo.scanTimer = taosTmrStart(doStartScanWal, idleDuration, pParam, pTq->tqTimer); + pMeta->scanInfo.scanTimer = taosTmrStart(doStartScanWal, idleDuration, pParam, pTimer); } else { - taosTmrReset(doStartScanWal, idleDuration, pParam, pTq->tqTimer, &pMeta->scanInfo.scanTimer); + taosTmrReset(doStartScanWal, idleDuration, pParam, pTimer, &pMeta->scanInfo.scanTimer); } return TSDB_CODE_SUCCESS; diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 75af5a9fc5..c3373c1e7f 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -35,6 +35,10 @@ void streamTimerCleanUp() { streamTimer = NULL; } +tmr_h streamTimerGetInstance() { + return streamTimer; +} + char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) { char buf[128] = {0}; sprintf(buf, "0x%" PRIx64 "-0x%x", streamId, taskId);