From 1912de9b498ab510d84ee4e6ba7ac18243efda73 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 5 Jan 2024 14:49:33 +0800 Subject: [PATCH] enh(stream): scan wal by using timer to trigger it. --- include/libs/stream/tstream.h | 7 +- source/dnode/vnode/src/inc/tq.h | 1 + source/dnode/vnode/src/tq/tq.c | 26 +++++-- source/dnode/vnode/src/tq/tqStreamTask.c | 98 ++++++++++++++++++------ source/libs/stream/src/streamMeta.c | 2 +- 5 files changed, 99 insertions(+), 35 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 47c5149795..2998c6d7b1 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -482,6 +482,11 @@ typedef struct STaskUpdateInfo { int32_t transId; } STaskUpdateInfo; +typedef struct SScanWalInfo { + int32_t scanCounter; + tmr_h scanTimer; +} SScanWalInfo; + // meta typedef struct SStreamMeta { char* path; @@ -499,7 +504,7 @@ typedef struct SStreamMeta { bool sendMsgBeforeClosing; // send hb to mnode before close all tasks when switch to follower. STaskStartInfo startInfo; TdThreadRwlock lock; - int32_t walScanCounter; + SScanWalInfo scanInfo; void* streamBackend; int64_t streamBackendRid; SHashObj* pTaskDbUnique; diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 0ef29fcb3a..e4c673f533 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -42,6 +42,7 @@ extern "C" { // clang-format on typedef struct STqOffsetStore STqOffsetStore; +extern void* tqTimer; #define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8cd16bbe60..33e9d97f8d 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -19,13 +19,25 @@ // 0: not init // 1: already inited -// 2: wait to be inited or cleaup +// 2: wait to be inited or cleanup static int32_t tqInitialize(STQ* pTq); static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) { return TMQ_HANDLE_STATUS_EXEC == pHandle->status; } static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_EXEC; } static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_IDLE; } +int32_t tqTimerInit() { + tqTimer = taosTmrInit(100, 100, 1000, "TQ"); + if (tqTimer == NULL) { + return -1; + } + return 0; +} + +void tqTimerCleanUp() { + taosTmrCleanUp(tqTimer); +} + void tqDestroyTqHandle(void* data) { STqHandle* pData = (STqHandle*)data; qDestroyTask(pData->execHandle.task); @@ -106,6 +118,7 @@ int32_t tqInitialize(STQ* pTq) { return -1; } + tqTimerInit(); return 0; } @@ -136,6 +149,8 @@ void tqClose(STQ* pTq) { taosMemoryFree(pTq->path); tqMetaClose(pTq); streamMetaClose(pTq->pStreamMeta); + tqTimerCleanUp(); + qDebug("end to close tq"); taosMemoryFree(pTq); } @@ -1055,7 +1070,8 @@ int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTaskRunReq* pReq = pMsg->pCont; - if (pReq->reqType == STREAM_EXEC_T_EXTRACT_WAL_DATA) { // all tasks are extracted submit data from the wal + // extracted submit data from wal files for all tasks + if (pReq->reqType == STREAM_EXEC_T_EXTRACT_WAL_DATA) { tqScanWal(pTq); return 0; } @@ -1351,14 +1367,8 @@ int32_t tqProcessTaskDropHTask(STQ* pTq, SRpcMsg* pMsg) { } taosThreadMutexLock(&pTask->lock); - ETaskStatus status = streamTaskGetStatus(pTask)->state; -// if (status == TASK_STATUS__STREAM_SCAN_HISTORY) { -// streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); -// } - SStreamTaskId id = {.streamId = pTask->hTaskInfo.id.streamId, .taskId = pTask->hTaskInfo.id.taskId}; streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &id); - taosThreadMutexUnlock(&pTask->lock); // clear the scheduler status diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 12f540b9cf..95a4474460 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -19,11 +19,14 @@ #define MAX_REPEAT_SCAN_THRESHOLD 3 #define SCAN_WAL_IDLE_DURATION 100 +void* tqTimer = NULL; + static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle); static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId); static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver); static bool taskReadyForDataFromWal(SStreamTask* pTask); static bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems); +static int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration); // extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks. int32_t tqScanWal(STQ* pTq) { @@ -31,33 +34,78 @@ int32_t tqScanWal(STQ* pTq) { SStreamMeta* pMeta = pTq->pStreamMeta; int64_t st = taosGetTimestampMs(); - while (1) { - tqDebug("vgId:%d continue check if data in wal are available, walScanCounter:%d", vgId, pMeta->walScanCounter); + tqDebug("vgId:%d continue to check if data in wal are available, scanCounter:%d", vgId, pMeta->scanInfo.scanCounter); - // check all tasks - bool shouldIdle = true; - doScanWalForAllTasks(pMeta, &shouldIdle); + // check all tasks + int32_t numOfTasks = 0; + bool shouldIdle = true; + doScanWalForAllTasks(pMeta, &shouldIdle); - streamMetaWLock(pMeta); - int32_t times = (--pMeta->walScanCounter); - ASSERT(pMeta->walScanCounter >= 0); - streamMetaWUnLock(pMeta); + streamMetaWLock(pMeta); + int32_t times = (--pMeta->scanInfo.scanCounter); + ASSERT(pMeta->scanInfo.scanCounter >= 0); - if (times > 0) { - tqDebug("vgId:%d scan wal for stream tasks for %d times in %dms", vgId, times, SCAN_WAL_IDLE_DURATION); - } else { // times <= 0 - break; - } - - // todo: remove the sleep - taosMsleep(SCAN_WAL_IDLE_DURATION); - } + numOfTasks = taosArrayGetSize(pMeta->pTaskList); + streamMetaWUnLock(pMeta); int64_t el = (taosGetTimestampMs() - st); tqDebug("vgId:%d scan wal for stream tasks completed, elapsed time:%" PRId64 " ms", vgId, el); + + if (times > 0) { + tqDebug("vgId:%d scan wal for stream tasks for %d times in %dms", vgId, times, SCAN_WAL_IDLE_DURATION); + tqScanWalInFuture(pTq, numOfTasks, SCAN_WAL_IDLE_DURATION); + } return 0; } +typedef struct SBuildScanWalMsgParam { + STQ* pTq; + int32_t numOfTasks; +} SBuildScanWalMsgParam; + +static void doStartScanWal(void* param, void* tmrId) { + SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*) param; + + int32_t vgId = pParam->pTq->pStreamMeta->vgId; + + SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); + if (pRunReq == NULL) { + taosMemoryFree(pParam); + terrno = TSDB_CODE_OUT_OF_MEMORY; + tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr()); + return; + } + + tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d, vnd restored:%d", vgId, pParam->numOfTasks, + pParam->pTq->pVnode->restored); + + pRunReq->head.vgId = vgId; + pRunReq->streamId = 0; + pRunReq->taskId = 0; + pRunReq->reqType = STREAM_EXEC_T_EXTRACT_WAL_DATA; + + SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; + tmsgPutToQueue(&pParam->pTq->pVnode->msgCb, STREAM_QUEUE, &msg); + + taosMemoryFree(pParam); +} + +int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) { + SStreamMeta* pMeta = pTq->pStreamMeta; + + SBuildScanWalMsgParam* pParam = taosMemoryMalloc(sizeof(SBuildScanWalMsgParam)); + + pParam->pTq = pTq; + pParam->numOfTasks = numOfTasks; + if (pMeta->scanInfo.scanTimer == NULL) { + pMeta->scanInfo.scanTimer = taosTmrStart(doStartScanWal, idleDuration, pParam, tqTimer); + } else { + taosTmrReset(doStartScanWal, idleDuration, pParam, tqTimer, &pMeta->scanInfo.scanTimer); + } + + return TSDB_CODE_SUCCESS; +} + int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; @@ -78,23 +126,23 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { return 0; } - pMeta->walScanCounter += 1; - if (pMeta->walScanCounter > MAX_REPEAT_SCAN_THRESHOLD) { - pMeta->walScanCounter = MAX_REPEAT_SCAN_THRESHOLD; + pMeta->scanInfo.scanCounter += 1; + if (pMeta->scanInfo.scanCounter > MAX_REPEAT_SCAN_THRESHOLD) { + pMeta->scanInfo.scanCounter = MAX_REPEAT_SCAN_THRESHOLD; } - if (pMeta->walScanCounter > 1) { - tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScanCounter); + if (pMeta->scanInfo.scanCounter > 1) { + tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->scanInfo.scanCounter); streamMetaWUnLock(pMeta); return 0; } - int32_t numOfPauseTasks = pTq->pStreamMeta->numOfPausedTasks; + int32_t numOfPauseTasks = pMeta->numOfPausedTasks; if (ckPause && numOfTasks == numOfPauseTasks) { tqDebug("vgId:%d ignore all submit, all streams had been paused, reset the walScanCounter", vgId); // reset the counter value, since we do not launch the scan wal operation. - pMeta->walScanCounter = 0; + pMeta->scanInfo.scanCounter = 0; streamMetaWUnLock(pMeta); return 0; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 16a9fb3274..bf97bee5f2 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -359,7 +359,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF goto _err; } - pMeta->walScanCounter = 0; + pMeta->scanInfo.scanCounter = 0; pMeta->vgId = vgId; pMeta->ahandle = ahandle; pMeta->expandFunc = expandFunc;