From 09b6642e459384ccd5ac140c92cf1a89bb013d6e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 22 Feb 2025 12:33:32 +0800 Subject: [PATCH 01/12] refactor(stream): scan wal is driven by time, instead of insert events. --- source/dnode/vnode/src/inc/vnodeInt.h | 2 +- source/dnode/vnode/src/tq/tq.c | 24 +- source/dnode/vnode/src/tq/tqPush.c | 14 -- source/dnode/vnode/src/tq/tqStreamTask.c | 242 +++++++++++---------- source/dnode/vnode/src/tq/tqUtil.c | 4 + source/dnode/vnode/src/tqCommon/tqCommon.c | 10 +- source/libs/stream/src/streamHb.c | 2 +- source/libs/stream/src/streamMeta.c | 2 +- 8 files changed, 156 insertions(+), 144 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 5bf0a9b199..a76fc17377 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -239,7 +239,7 @@ void tqClose(STQ*); int tqPushMsg(STQ*, tmsg_t msgType); int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg); void tqUnregisterPushHandle(STQ* pTq, void* pHandle); -int tqScanWalAsync(STQ* pTq, bool ckPause); +int tqScanWalAsync(STQ* pTq); int32_t tqStopStreamTasksAsync(STQ* pTq); int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp); int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 119edb47bc..fc173eb691 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -920,12 +920,12 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask // now the fill-history task starts to scan data from wal files. code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); - if (code == TSDB_CODE_SUCCESS) { - code = tqScanWalAsync(pTq, false); - if (code) { - tqError("vgId:%d failed to start scan wal file, code:%s", vgId, tstrerror(code)); - } - } +// if (code == TSDB_CODE_SUCCESS) { +// code = tqScanWalAsync(pTq, false); +// if (code) { +// tqError("vgId:%d failed to start scan wal file, code:%s", vgId, tstrerror(code)); +// } +// } } } @@ -1122,12 +1122,12 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { } // let's continue scan data in the wal files - if (req.reqType >= 0 || req.reqType == STREAM_EXEC_T_RESUME_TASK) { - code = tqScanWalAsync(pTq, false); // it's ok to failed - if (code) { - tqError("vgId:%d failed to start scan wal file, code:%s", pTq->pStreamMeta->vgId, tstrerror(code)); - } - } +// if (req.reqType >= 0 || req.reqType == STREAM_EXEC_T_RESUME_TASK) { +// code = tqScanWalAsync(pTq, false); // it's ok to failed +// if (code) { +// tqError("vgId:%d failed to start scan wal file, code:%s", pTq->pStreamMeta->vgId, tstrerror(code)); +// } +// } return code; } diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 2b2667773a..fc83343c99 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -49,20 +49,6 @@ int32_t tqPushMsg(STQ* pTq, tmsg_t msgType) { } } - streamMetaRLock(pTq->pStreamMeta); - int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta); - streamMetaRUnLock(pTq->pStreamMeta); - -// tqTrace("vgId:%d handle submit, restore:%d, numOfTasks:%d", TD_VID(pTq->pVnode), pTq->pVnode->restored, numOfTasks); - - // push data for stream processing: - // 1. the vnode has already been restored. - // 2. the vnode should be the leader. - // 3. the stream is not suspended yet. - if ((!tsDisableStream) && (numOfTasks > 0)) { - code = tqScanWalAsync(pTq, true); - } - return code; } diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 9ea84830f1..33a32617ef 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -17,22 +17,20 @@ #include "vnd.h" #define MAX_REPEAT_SCAN_THRESHOLD 3 -#define SCAN_WAL_IDLE_DURATION 500 // idle for 500ms to do next wal scan +#define SCAN_WAL_IDLE_DURATION 500 // idle for 500ms to do next wal scan typedef struct SBuildScanWalMsgParam { int64_t metaId; - int32_t numOfTasks; - int8_t restored; +// int8_t restored; SMsgCb msgCb; } SBuildScanWalMsgParam; -static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta); +static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, int32_t* pNumOfTasks); static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId); static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver); static bool taskReadyForDataFromWal(SStreamTask* pTask); static int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc); -static int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration); -static int32_t doScanWalAsync(STQ* pTq, bool ckPause); +static int32_t tqScanWalInFuture(STQ* pTq); // extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks. int32_t tqScanWal(STQ* pTq) { @@ -40,44 +38,38 @@ int32_t tqScanWal(STQ* pTq) { int32_t vgId = pMeta->vgId; int64_t st = taosGetTimestampMs(); int32_t numOfTasks = 0; + int64_t el = 0; + int32_t code = 0; - tqDebug("vgId:%d continue to check if data in wal are available, scanCounter:%d", vgId, pMeta->scanInfo.scanCounter); - - // check all tasks - int32_t code = doScanWalForAllTasks(pMeta); - if (code) { - tqError("vgId:%d failed to start all tasks, try next time, code:%s", vgId, tstrerror(code)); + int32_t old = atomic_val_compare_exchange_32(&pMeta->scanInfo.scanCounter, 0, 1); + if (old == 0) { + tqDebug("vgId:%d try to scan wal to extract data", vgId); + } else { + tqDebug("vgId:%d already in wal scan, abort", vgId); return code; } - streamMetaWLock(pMeta); - int32_t times = (--pMeta->scanInfo.scanCounter); - if (times < 0) { - tqError("vgId:%d invalid scan counter:%d, reset to 0", vgId, times); - times = 0; - } - - numOfTasks = taosArrayGetSize(pMeta->pTaskList); - streamMetaWUnLock(pMeta); - - int64_t el = (taosGetTimestampMs() - st); - tqDebug("vgId:%d scan wal for stream tasks completed, elapsed time:%" PRId64 " ms", vgId, el); - - if (times > 0) { - tqDebug("vgId:%d scan wal for stream tasks for %d times in %dms", vgId, times, SCAN_WAL_IDLE_DURATION); - code = tqScanWalInFuture(pTq, numOfTasks, SCAN_WAL_IDLE_DURATION); - if (code) { - tqError("vgId:%d sched scan wal in %dms failed, ignore this failure", vgId, SCAN_WAL_IDLE_DURATION); - } + // check all tasks + code = doScanWalForAllTasks(pMeta, &numOfTasks); + + el = (taosGetTimestampMs() - st); + if (code) { + tqError("vgId:%d failed to scan wal for all tasks, try next time, elapsed time:%" PRId64 "ms code:%s", vgId, el, + tstrerror(code)); + } else { + tqDebug("vgId:%d scan wal for stream tasks completed, elapsed time:%" PRId64 "ms, next scan start in %dms", vgId, + el, SCAN_WAL_IDLE_DURATION); } + atomic_store_32(&pMeta->scanInfo.scanCounter, 0); return code; } static void doStartScanWal(void* param, void* tmrId) { - int32_t vgId = 0; - int32_t code = 0; - + int32_t vgId = 0; + int32_t code = 0; + SVnode* pVnode = NULL; + int32_t numOfTasks = 0; SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param; SStreamMeta* pMeta = taosAcquireRef(streamMetaRefPool, pParam->metaId); @@ -102,71 +94,93 @@ static void doStartScanWal(void* param, void* tmrId) { vgId = pMeta->vgId; - tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d, vnd restored:%d", vgId, pParam->numOfTasks, - pParam->restored); -#if 0 - // wait for the vnode is freed, and invalid read may occur. + if (pMeta->role == NODE_ROLE_FOLLOWER) { + tqDebug("vgId:%d not leader, role:%d not scan wal anymore", vgId, pMeta->role); + + code = taosReleaseRef(streamMetaRefPool, pParam->metaId); + if (code == TSDB_CODE_SUCCESS) { + tqDebug("vgId:%d jump out of scan wal timer since not leader", vgId); + } else { + tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId, + tstrerror(code)); + } + + taosMemFree(pParam); + return; + } + + if (pMeta->startInfo.startAllTasks) { + tqTrace("vgId:%d in restart procedure, not ready to scan wal", vgId); + goto _end; + } + + streamMetaRLock(pMeta); + numOfTasks = taosArrayGetSize(pMeta->pTaskList); + streamMetaRUnLock(pMeta); + + if (numOfTasks == 0) { + goto _end; + } + + tqTrace("vgId:%d create msg to start wal scan, numOfTasks:%d", vgId, numOfTasks); + + // #if 0 + // wait for the vnode is freed, and invalid read may occur. taosMsleep(10000); -#endif + // #endif code = streamTaskSchedTask(&pParam->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); if (code) { tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code)); } +_end: + streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, &pMeta->scanInfo.scanTimer, &pMeta->scanInfo.scanTimer, + vgId, "scan-wal"); + code = taosReleaseRef(streamMetaRefPool, pParam->metaId); if (code) { tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId, tstrerror(code)); } - - taosMemoryFree(pParam); } -int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) { +int32_t tqScanWalAsync(STQ* pTq) { SStreamMeta* pMeta = pTq->pStreamMeta; int32_t code = 0; int32_t vgId = TD_VID(pTq->pVnode); tmr_h pTimer = NULL; SBuildScanWalMsgParam* pParam = NULL; + // 1. the vnode should be the leader. + // 2. the stream isn't disabled + if ((pMeta->role == NODE_ROLE_FOLLOWER) || tsDisableStream) { + tqInfo("vgId:%d follower node or stream disabled, not scan wal", vgId); + return TSDB_CODE_SUCCESS; + } + pParam = taosMemoryMalloc(sizeof(SBuildScanWalMsgParam)); if (pParam == NULL) { + tqError("vgId:%d failed to start scan wal, stream not executes, code:%s", vgId, tstrerror(code)); return terrno; } pParam->metaId = pMeta->rid; - pParam->numOfTasks = numOfTasks; - pParam->restored = pTq->pVnode->restored; pParam->msgCb = pTq->pVnode->msgCb; code = streamTimerGetInstance(&pTimer); if (code) { - tqError("vgId:%d failed to get tmr ctrl during sched scan wal", vgId); + tqFatal("vgId:%d failed to get tmr ctrl during sched scan wal", vgId); taosMemoryFree(pParam); } else { - streamTmrStart(doStartScanWal, idleDuration, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal-fut"); + // todo: start in 1sec for the first time + streamTmrStart(doStartScanWal, 1000, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, + "scan-wal"); } return code; } -int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { - SStreamMeta* pMeta = pTq->pStreamMeta; - bool alreadyRestored = pTq->pVnode->restored; - int32_t code = 0; - - // do not launch the stream tasks, if it is a follower or not restored vnode. - if (!(vnodeIsRoleLeader(pTq->pVnode) && alreadyRestored)) { - return TSDB_CODE_SUCCESS; - } - - streamMetaWLock(pMeta); - code = doScanWalAsync(pTq, ckPause); - streamMetaWUnLock(pMeta); - return code; -} - int32_t tqStopStreamTasksAsync(STQ* pTq) { SStreamMeta* pMeta = pTq->pStreamMeta; int32_t vgId = pMeta->vgId; @@ -347,13 +361,10 @@ int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfIt return code; } -int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta) { +int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, int32_t* pNumOfTasks) { int32_t vgId = pStreamMeta->vgId; SArray* pTaskList = NULL; - int32_t numOfTasks = taosArrayGetSize(pStreamMeta->pTaskList); - if (numOfTasks == 0) { - return TSDB_CODE_SUCCESS; - } + int32_t numOfTasks = 0; // clone the task list, to avoid the task update during scan wal files streamMetaWLock(pStreamMeta); @@ -364,10 +375,13 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta) { return terrno; } - tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks); - // update the new task number numOfTasks = taosArrayGetSize(pTaskList); + if (pNumOfTasks != NULL) { + *pNumOfTasks = numOfTasks; + } + + tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks); for (int32_t i = 0; i < numOfTasks; ++i) { STaskId* pTaskId = taosArrayGet(pTaskList, i); @@ -426,51 +440,59 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta) { return TSDB_CODE_SUCCESS; } -int32_t doScanWalAsync(STQ* pTq, bool ckPause) { - SStreamMeta* pMeta = pTq->pStreamMeta; - bool alreadyRestored = pTq->pVnode->restored; - int32_t vgId = pMeta->vgId; - int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); - - if (numOfTasks == 0) { - tqDebug("vgId:%d no stream tasks existed to run", vgId); - return 0; - } - - if (pMeta->startInfo.startAllTasks) { - tqTrace("vgId:%d in restart procedure, not scan wal", vgId); - return 0; - } - - pMeta->scanInfo.scanCounter += 1; - if (pMeta->scanInfo.scanCounter > MAX_REPEAT_SCAN_THRESHOLD) { - pMeta->scanInfo.scanCounter = MAX_REPEAT_SCAN_THRESHOLD; - } - - if (pMeta->scanInfo.scanCounter > 1) { - tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->scanInfo.scanCounter); - return 0; - } - - int32_t numOfPauseTasks = pMeta->numOfPausedTasks; - if (ckPause && numOfTasks == numOfPauseTasks) { - tqDebug("vgId:%d ignore all submit, all streams had been paused, reset the walScanCounter", vgId); - - // reset the counter value, since we do not launch the scan wal operation. - pMeta->scanInfo.scanCounter = 0; - return 0; - } - - tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d, vnd restored:%d", vgId, - numOfTasks, alreadyRestored); - - return streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); -} +// int32_t doScanWalAsync(STQ* pTq) { +// SStreamMeta* pMeta = pTq->pStreamMeta; +// bool alreadyRestored = pTq->pVnode->restored; +// int32_t vgId = pMeta->vgId; +// int32_t code = 0; +// int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); +// +// // if (numOfTasks == 0) { +//// tqDebug("vgId:%d no stream tasks existed to run", vgId); +//// return 0; +//// } +// +//// if (pMeta->startInfo.startAllTasks) { +//// tqTrace("vgId:%d in restart procedure, not scan wal", vgId); +//// return 0; +//// } +// +//// pMeta->scanInfo.scanCounter += 1; +//// if (pMeta->scanInfo.scanCounter > MAX_REPEAT_SCAN_THRESHOLD) { +//// pMeta->scanInfo.scanCounter = MAX_REPEAT_SCAN_THRESHOLD; +//// } +// +//// if (pMeta->scanInfo.scanCounter > 1) { +//// tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->scanInfo.scanCounter); +//// return 0; +//// } +// +//// int32_t numOfPauseTasks = pMeta->numOfPausedTasks; +//// if (ckPause && numOfTasks == numOfPauseTasks) { +//// tqDebug("vgId:%d ignore all submit, all streams had been paused, reset the walScanCounter", vgId); +//// +//// reset the counter value, since we do not launch the scan wal operation. +//// pMeta->scanInfo.scanCounter = 0; +//// return 0; +//// } +// +//// tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d, vnd restored:%d", vgId, +//// numOfTasks, alreadyRestored); +//// +//// code = streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); +//// if (code) { +//// tqError("vgId:%d failed create msg to scan data in wal, retry in %dms", vgId, SCAN_WAL_IDLE_DURATION); +//// } +// +// code = tqScanWalInFuture(pTq, numOfTasks, SCAN_WAL_IDLE_DURATION); +// if (code) { +// tqError("vgId:%d sched scan wal in %dms failed, ignore this failure", vgId, SCAN_WAL_IDLE_DURATION); +// } +//} void streamMetaFreeTQDuringScanWalError(STQ* pTq) { SBuildScanWalMsgParam* p = taosMemoryCalloc(1, sizeof(SBuildScanWalMsgParam)); p->metaId = pTq->pStreamMeta->rid; - p->numOfTasks = 0; doStartScanWal(p, 0); } \ No newline at end of file diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 197a45cdb9..6f001981fb 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -47,6 +47,10 @@ END: void tqUpdateNodeStage(STQ* pTq, bool isLeader) { SSyncState state = syncGetState(pTq->pVnode->sync); streamMetaUpdateStageRole(pTq->pStreamMeta, state.term, isLeader); + + if (isLeader) { + tqScanWalAsync(pTq); + } } static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) { diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 77632d328e..7fc2881881 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -938,10 +938,10 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) { streamMetaWUnLock(pMeta); - if (scanWal && (vgId != SNODE_HANDLE)) { - tqDebug("vgId:%d start scan wal for executing tasks", vgId); - code = tqScanWalAsync(pMeta->ahandle, true); - } +// if (scanWal && (vgId != SNODE_HANDLE)) { +// tqDebug("vgId:%d start scan wal for executing tasks", vgId); +// code = tqScanWalAsync(pMeta->ahandle, true); +// } return code; } @@ -1170,7 +1170,7 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t pTask->hTaskInfo.operatorOpen = false; code = streamStartScanHistoryAsync(pTask, igUntreated); } else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputq.queue) == 0)) { - code = tqScanWalAsync((STQ*)handle, false); +// code = tqScanWalAsync((STQ*)handle, false); } else { code = streamTrySchedExec(pTask); } diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index 7c157bb05e..36a6488709 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -341,7 +341,7 @@ void streamMetaHbToMnode(void* param, void* tmrId) { if (code == TSDB_CODE_SUCCESS) { stInfo("vgId:%d role:%d not leader not send hb to mnode", vgId, role); } else { - stError("vgId:%d role:%d not leader not send hb to mnodefailed to release the meta rid:%" PRId64, vgId, role, rid); + stError("vgId:%d role:%d not leader not send hb to mnode, failed to release meta rid:%" PRId64, vgId, role, rid); } // taosMemoryFree(param); return; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index ddd0201460..733bae1097 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1320,7 +1320,7 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader) // mark the sign to send msg before close all tasks // 1. for leader vnode, always send msg before closing - // 2. for follower vnode, if it's is changed from leader, also sending msg before closing. + // 2. for follower vnode, if it's changed from leader, also sending msg before closing. if (pMeta->role == NODE_ROLE_LEADER) { pMeta->sendMsgBeforeClosing = true; } From b5fdb441937eb46967eb6ebadf218960ba7ea116 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 22 Feb 2025 12:35:37 +0800 Subject: [PATCH 02/12] refactor(stream): remove comments. --- source/dnode/vnode/src/tq/tqStreamTask.c | 50 ------------------------ 1 file changed, 50 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 33a32617ef..72749d31c9 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -440,56 +440,6 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, int32_t* pNumOfTasks) { return TSDB_CODE_SUCCESS; } -// int32_t doScanWalAsync(STQ* pTq) { -// SStreamMeta* pMeta = pTq->pStreamMeta; -// bool alreadyRestored = pTq->pVnode->restored; -// int32_t vgId = pMeta->vgId; -// int32_t code = 0; -// int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); -// -// // if (numOfTasks == 0) { -//// tqDebug("vgId:%d no stream tasks existed to run", vgId); -//// return 0; -//// } -// -//// if (pMeta->startInfo.startAllTasks) { -//// tqTrace("vgId:%d in restart procedure, not scan wal", vgId); -//// return 0; -//// } -// -//// pMeta->scanInfo.scanCounter += 1; -//// if (pMeta->scanInfo.scanCounter > MAX_REPEAT_SCAN_THRESHOLD) { -//// pMeta->scanInfo.scanCounter = MAX_REPEAT_SCAN_THRESHOLD; -//// } -// -//// if (pMeta->scanInfo.scanCounter > 1) { -//// tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->scanInfo.scanCounter); -//// return 0; -//// } -// -//// int32_t numOfPauseTasks = pMeta->numOfPausedTasks; -//// if (ckPause && numOfTasks == numOfPauseTasks) { -//// tqDebug("vgId:%d ignore all submit, all streams had been paused, reset the walScanCounter", vgId); -//// -//// reset the counter value, since we do not launch the scan wal operation. -//// pMeta->scanInfo.scanCounter = 0; -//// return 0; -//// } -// -//// tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d, vnd restored:%d", vgId, -//// numOfTasks, alreadyRestored); -//// -//// code = streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); -//// if (code) { -//// tqError("vgId:%d failed create msg to scan data in wal, retry in %dms", vgId, SCAN_WAL_IDLE_DURATION); -//// } -// -// code = tqScanWalInFuture(pTq, numOfTasks, SCAN_WAL_IDLE_DURATION); -// if (code) { -// tqError("vgId:%d sched scan wal in %dms failed, ignore this failure", vgId, SCAN_WAL_IDLE_DURATION); -// } -//} - void streamMetaFreeTQDuringScanWalError(STQ* pTq) { SBuildScanWalMsgParam* p = taosMemoryCalloc(1, sizeof(SBuildScanWalMsgParam)); p->metaId = pTq->pStreamMeta->rid; From 9bd8b532ad1b454df2148b521948ed9e146163d0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 22 Feb 2025 12:33:32 +0800 Subject: [PATCH 03/12] fix(stream): update function return value. --- source/dnode/vnode/src/inc/vnodeInt.h | 2 +- source/dnode/vnode/src/tq/tqStreamTask.c | 4 +--- source/dnode/vnode/src/tq/tqUtil.c | 2 +- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index a76fc17377..6b3cf47d13 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -239,7 +239,7 @@ void tqClose(STQ*); int tqPushMsg(STQ*, tmsg_t msgType); int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg); void tqUnregisterPushHandle(STQ* pTq, void* pHandle); -int tqScanWalAsync(STQ* pTq); +void tqScanWalAsync(STQ* pTq); int32_t tqStopStreamTasksAsync(STQ* pTq); int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp); int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 72749d31c9..aa8151d7c2 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -145,7 +145,7 @@ _end: } } -int32_t tqScanWalAsync(STQ* pTq) { +void tqScanWalAsync(STQ* pTq) { SStreamMeta* pMeta = pTq->pStreamMeta; int32_t code = 0; int32_t vgId = TD_VID(pTq->pVnode); @@ -177,8 +177,6 @@ int32_t tqScanWalAsync(STQ* pTq) { streamTmrStart(doStartScanWal, 1000, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal"); } - - return code; } int32_t tqStopStreamTasksAsync(STQ* pTq) { diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 6f001981fb..217859394a 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -49,7 +49,7 @@ void tqUpdateNodeStage(STQ* pTq, bool isLeader) { streamMetaUpdateStageRole(pTq->pStreamMeta, state.term, isLeader); if (isLeader) { - tqScanWalAsync(pTq); + int32_t code = tqScanWalAsync(pTq); } } From 6c92474afd67f863dc86d06aa8253fb7693645db Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 22 Feb 2025 14:16:46 +0800 Subject: [PATCH 04/12] fix(stream): fix syntax error. --- source/dnode/vnode/src/tq/tqStreamTask.c | 4 ++-- source/dnode/vnode/src/tq/tqUtil.c | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index aa8151d7c2..d4b4e3e90e 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -156,13 +156,13 @@ void tqScanWalAsync(STQ* pTq) { // 2. the stream isn't disabled if ((pMeta->role == NODE_ROLE_FOLLOWER) || tsDisableStream) { tqInfo("vgId:%d follower node or stream disabled, not scan wal", vgId); - return TSDB_CODE_SUCCESS; + return; } pParam = taosMemoryMalloc(sizeof(SBuildScanWalMsgParam)); if (pParam == NULL) { tqError("vgId:%d failed to start scan wal, stream not executes, code:%s", vgId, tstrerror(code)); - return terrno; + return; } pParam->metaId = pMeta->rid; diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 217859394a..6f001981fb 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -49,7 +49,7 @@ void tqUpdateNodeStage(STQ* pTq, bool isLeader) { streamMetaUpdateStageRole(pTq->pStreamMeta, state.term, isLeader); if (isLeader) { - int32_t code = tqScanWalAsync(pTq); + tqScanWalAsync(pTq); } } From a8d50f28a60d7f8a146487422ad686320ecb4f8b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 22 Feb 2025 14:19:44 +0800 Subject: [PATCH 05/12] fix(stream): disable error injection. --- source/dnode/vnode/src/tq/tqStreamTask.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index d4b4e3e90e..06b496f136 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -124,10 +124,10 @@ static void doStartScanWal(void* param, void* tmrId) { tqTrace("vgId:%d create msg to start wal scan, numOfTasks:%d", vgId, numOfTasks); - // #if 0 + #if 0 // wait for the vnode is freed, and invalid read may occur. taosMsleep(10000); - // #endif + #endif code = streamTaskSchedTask(&pParam->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); if (code) { From 034c907ee148e8eb95523f6e9603b64073c92e74 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 23 Feb 2025 01:38:05 +0800 Subject: [PATCH 06/12] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 4 +- source/dnode/vnode/src/tq/tq.c | 19 ++----- source/dnode/vnode/src/tq/tqStreamTask.c | 66 ++++++++++++++++++------ source/libs/stream/src/streamHb.c | 4 +- source/libs/stream/src/streamMeta.c | 10 +++- 5 files changed, 68 insertions(+), 35 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index a4d89dcdcc..4707ee959a 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -483,8 +483,10 @@ typedef struct STaskUpdateInfo { } STaskUpdateInfo; typedef struct SScanWalInfo { - int32_t scanCounter; + int32_t scanSentinel; tmr_h scanTimer; + int64_t lastScanTs; + int32_t tickCounter; } SScanWalInfo; typedef struct SFatalErrInfo { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index fc173eb691..63727e5c45 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1113,23 +1113,14 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { // extracted submit data from wal files for all tasks if (req.reqType == STREAM_EXEC_T_EXTRACT_WAL_DATA) { return tqScanWal(pTq); - } + } else { + code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode)); + if (code) { + tqError("vgId:%d failed to create task run req, code:%s", TD_VID(pTq->pVnode), tstrerror(code)); + } - code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode)); - if (code) { - tqError("vgId:%d failed to create task run req, code:%s", TD_VID(pTq->pVnode), tstrerror(code)); return code; } - - // let's continue scan data in the wal files -// if (req.reqType >= 0 || req.reqType == STREAM_EXEC_T_RESUME_TASK) { -// code = tqScanWalAsync(pTq, false); // it's ok to failed -// if (code) { -// tqError("vgId:%d failed to start scan wal file, code:%s", pTq->pStreamMeta->vgId, tstrerror(code)); -// } -// } - - return code; } int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) { diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 06b496f136..08c0be49bc 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -16,12 +16,11 @@ #include "tq.h" #include "vnd.h" -#define MAX_REPEAT_SCAN_THRESHOLD 3 -#define SCAN_WAL_IDLE_DURATION 500 // idle for 500ms to do next wal scan +#define SCAN_WAL_IDLE_DURATION 250 // idle for 500ms to do next wal scan +#define SCAN_WAL_WAIT_COUNT 2 typedef struct SBuildScanWalMsgParam { int64_t metaId; -// int8_t restored; SMsgCb msgCb; } SBuildScanWalMsgParam; @@ -30,7 +29,6 @@ static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId); static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver); static bool taskReadyForDataFromWal(SStreamTask* pTask); static int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc); -static int32_t tqScanWalInFuture(STQ* pTq); // extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks. int32_t tqScanWal(STQ* pTq) { @@ -41,7 +39,7 @@ int32_t tqScanWal(STQ* pTq) { int64_t el = 0; int32_t code = 0; - int32_t old = atomic_val_compare_exchange_32(&pMeta->scanInfo.scanCounter, 0, 1); + int32_t old = atomic_val_compare_exchange_32(&pMeta->scanInfo.scanSentinel, 0, 1); if (old == 0) { tqDebug("vgId:%d try to scan wal to extract data", vgId); } else { @@ -49,27 +47,44 @@ int32_t tqScanWal(STQ* pTq) { return code; } + // the scan wal interval less than 200, not scan, actually. + if ((pMeta->scanInfo.lastScanTs > st) && (pMeta->scanInfo.lastScanTs - st < 200)) { + tqDebug("vgId:%d scan wal less than 200ms, do nothing", vgId); + atomic_store_32(&pMeta->scanInfo.scanSentinel, 0); + return code; + } + // check all tasks code = doScanWalForAllTasks(pMeta, &numOfTasks); - el = (taosGetTimestampMs() - st); + pMeta->scanInfo.lastScanTs = taosGetTimestampMs(); + el = (pMeta->scanInfo.lastScanTs - st); + if (code) { tqError("vgId:%d failed to scan wal for all tasks, try next time, elapsed time:%" PRId64 "ms code:%s", vgId, el, tstrerror(code)); } else { - tqDebug("vgId:%d scan wal for stream tasks completed, elapsed time:%" PRId64 "ms, next scan start in %dms", vgId, - el, SCAN_WAL_IDLE_DURATION); + tqDebug("vgId:%d scan wal for stream tasks completed, elapsed time:%" PRId64 "ms", vgId, el); } - atomic_store_32(&pMeta->scanInfo.scanCounter, 0); + atomic_store_32(&pMeta->scanInfo.scanSentinel, 0); return code; } +static bool waitEnoughDuration(SStreamMeta* pMeta) { + if ((++pMeta->scanInfo.tickCounter) >= SCAN_WAL_WAIT_COUNT) { + pMeta->scanInfo.tickCounter = 0; + return true; + } + + return false; +} + static void doStartScanWal(void* param, void* tmrId) { int32_t vgId = 0; int32_t code = 0; - SVnode* pVnode = NULL; int32_t numOfTasks = 0; + tmr_h pTimer = NULL; SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param; SStreamMeta* pMeta = taosAcquireRef(streamMetaRefPool, pParam->metaId); @@ -79,10 +94,12 @@ static void doStartScanWal(void* param, void* tmrId) { return; } + vgId = pMeta->vgId; + if (pMeta->closeFlag) { code = taosReleaseRef(streamMetaRefPool, pParam->metaId); if (code == TSDB_CODE_SUCCESS) { - tqDebug("vgId:%d jump out of scan wal timer since closed", vgId); + tqInfo("vgId:%d jump out of scan wal timer since closed", vgId); } else { tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId, tstrerror(code)); @@ -92,9 +109,7 @@ static void doStartScanWal(void* param, void* tmrId) { return; } - vgId = pMeta->vgId; - - if (pMeta->role == NODE_ROLE_FOLLOWER) { + if (pMeta->role != NODE_ROLE_LEADER) { tqDebug("vgId:%d not leader, role:%d not scan wal anymore", vgId, pMeta->role); code = taosReleaseRef(streamMetaRefPool, pParam->metaId); @@ -114,6 +129,24 @@ static void doStartScanWal(void* param, void* tmrId) { goto _end; } + code = streamTimerGetInstance(&pTimer); + if (code) { + tqFatal("vgId:%d failed to get tmr ctrl during sched scan wal, not scan wal, code:%s", vgId, tstrerror(code)); + taosMemoryFree(pParam); + return; + } + + if (!waitEnoughDuration(pMeta)) { + streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, + "scan-wal"); + code = taosReleaseRef(streamMetaRefPool, pParam->metaId); + if (code) { + tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId, + tstrerror(code)); + } + return; + } + streamMetaRLock(pMeta); numOfTasks = taosArrayGetSize(pMeta->pTaskList); streamMetaRUnLock(pMeta); @@ -135,8 +168,9 @@ static void doStartScanWal(void* param, void* tmrId) { } _end: - streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, &pMeta->scanInfo.scanTimer, &pMeta->scanInfo.scanTimer, - vgId, "scan-wal"); + + streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal"); + tqDebug("vgId:%d scan-wal will start in %dms", vgId, SCAN_WAL_IDLE_DURATION*SCAN_WAL_WAIT_COUNT); code = taosReleaseRef(streamMetaRefPool, pParam->metaId); if (code) { diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index 36a6488709..90ebd47ac6 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -327,7 +327,7 @@ void streamMetaHbToMnode(void* param, void* tmrId) { pMeta->pHbInfo->hbStart = 0; code = taosReleaseRef(streamMetaRefPool, rid); if (code == TSDB_CODE_SUCCESS) { - stDebug("vgId:%d jump out of meta timer", vgId); + stInfo("vgId:%d jump out of meta timer since closed", vgId); } else { stError("vgId:%d jump out of meta timer, failed to release the meta rid:%" PRId64, vgId, rid); } @@ -413,7 +413,7 @@ void destroyMetaHbInfo(SMetaHbInfo* pInfo) { void streamMetaWaitForHbTmrQuit(SStreamMeta* pMeta) { // wait for the stream meta hb function stopping if (pMeta->role == NODE_ROLE_LEADER) { - taosMsleep(2 * META_HB_CHECK_INTERVAL); + taosMsleep(3 * META_HB_CHECK_INTERVAL); stDebug("vgId:%d wait for meta to stop timer", pMeta->vgId); } } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 733bae1097..7bb9f5db7b 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -427,7 +427,7 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, pMeta->pTaskList = taosArrayInit(4, sizeof(SStreamTaskId)); TSDB_CHECK_NULL(pMeta->pTaskList, code, lino, _err, terrno); - pMeta->scanInfo.scanCounter = 0; + pMeta->scanInfo.scanSentinel = 0; pMeta->vgId = vgId; pMeta->ahandle = ahandle; pMeta->buildTaskFn = buildTaskFn; @@ -1213,8 +1213,8 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { vgId, (pMeta->role == NODE_ROLE_LEADER), startTs, sendCount); // wait for the stream meta hb function stopping - streamMetaWaitForHbTmrQuit(pMeta); pMeta->closeFlag = true; + streamMetaWaitForHbTmrQuit(pMeta); stDebug("vgId:%d start to check all tasks for closing", vgId); int64_t st = taosGetTimestampMs(); @@ -1253,6 +1253,12 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { double el = (taosGetTimestampMs() - st) / 1000.0; stDebug("vgId:%d stop all %d task(s) completed, elapsed time:%.2f Sec.", pMeta->vgId, numOfTasks, el); + + if (pMeta->scanInfo.scanTimer != NULL) { + streamTmrStop(pMeta->scanInfo.scanTimer); + pMeta->scanInfo.scanTimer = NULL; + } + streamMetaRUnLock(pMeta); } From e378b5cb8ae212a0e4744ea64f17a2eb7ba2a817 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 23 Feb 2025 03:42:44 +0800 Subject: [PATCH 07/12] fix(stream): adjust scan timer --- source/dnode/vnode/src/tq/tqStreamTask.c | 3 +-- source/libs/stream/src/streamMeta.c | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 08c0be49bc..68e895684e 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -207,8 +207,7 @@ void tqScanWalAsync(STQ* pTq) { tqFatal("vgId:%d failed to get tmr ctrl during sched scan wal", vgId); taosMemoryFree(pParam); } else { - // todo: start in 1sec for the first time - streamTmrStart(doStartScanWal, 1000, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, + streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal"); } } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 7bb9f5db7b..eff4783072 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1336,11 +1336,11 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader) if (isLeader) { stInfo("vgId:%d update meta stage:%" PRId64 ", prev:%" PRId64 " leader:%d, start to send Hb, rid:%" PRId64, - pMeta->vgId, prevStage, stage, isLeader, pMeta->rid); + pMeta->vgId, stage, prevStage, isLeader, pMeta->rid); streamMetaStartHb(pMeta); } else { stInfo("vgId:%d update meta stage:%" PRId64 " prev:%" PRId64 " leader:%d sendMsg beforeClosing:%d", pMeta->vgId, - prevStage, stage, isLeader, pMeta->sendMsgBeforeClosing); + stage, prevStage, isLeader, pMeta->sendMsgBeforeClosing); } } From 00edb8e6124e83419508437011193120d9b29eaa Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 23 Feb 2025 16:25:17 +0800 Subject: [PATCH 08/12] fix(stream): adjust init tmr position. --- source/dnode/vnode/src/tq/tqStreamTask.c | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 68e895684e..640b6bdc63 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -95,6 +95,12 @@ static void doStartScanWal(void* param, void* tmrId) { } vgId = pMeta->vgId; + code = streamTimerGetInstance(&pTimer); + if (code) { + tqFatal("vgId:%d failed to get tmr ctrl during sched scan wal, not scan wal, code:%s", vgId, tstrerror(code)); + taosMemoryFree(pParam); + return; + } if (pMeta->closeFlag) { code = taosReleaseRef(streamMetaRefPool, pParam->metaId); @@ -129,13 +135,6 @@ static void doStartScanWal(void* param, void* tmrId) { goto _end; } - code = streamTimerGetInstance(&pTimer); - if (code) { - tqFatal("vgId:%d failed to get tmr ctrl during sched scan wal, not scan wal, code:%s", vgId, tstrerror(code)); - taosMemoryFree(pParam); - return; - } - if (!waitEnoughDuration(pMeta)) { streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal"); From 8bcc8399e63e96402920709dac060c2537ccb4d9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 23 Feb 2025 15:36:26 +0800 Subject: [PATCH 09/12] fix(stream): reset value. --- source/libs/stream/src/streamMeta.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index eff4783072..8b0db33536 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -428,6 +428,9 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, TSDB_CHECK_NULL(pMeta->pTaskList, code, lino, _err, terrno); pMeta->scanInfo.scanSentinel = 0; + pMeta->scanInfo.lastScanTs = 0; + pMeta->scanInfo.tickCounter = 0; + pMeta->vgId = vgId; pMeta->ahandle = ahandle; pMeta->buildTaskFn = buildTaskFn; From 514a63ab37f558cd9645956255ad243fa60593c9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 24 Feb 2025 00:40:16 +0800 Subject: [PATCH 10/12] fix(stream): fix the error in check scan interval --- source/dnode/vnode/src/tq/tqStreamTask.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 640b6bdc63..b34ea78f64 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -48,7 +48,7 @@ int32_t tqScanWal(STQ* pTq) { } // the scan wal interval less than 200, not scan, actually. - if ((pMeta->scanInfo.lastScanTs > st) && (pMeta->scanInfo.lastScanTs - st < 200)) { + if ((pMeta->scanInfo.lastScanTs > 0) && (st - pMeta->scanInfo.lastScanTs < 200)) { tqDebug("vgId:%d scan wal less than 200ms, do nothing", vgId); atomic_store_32(&pMeta->scanInfo.scanSentinel, 0); return code; From 97071f1d291d9a526e1373ade03a2a9e61e63e9b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 24 Feb 2025 09:42:15 +0800 Subject: [PATCH 11/12] other: add some logs. --- source/dnode/mnode/impl/src/mndStream.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 11fd2d4f65..18919b3ace 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2613,7 +2613,7 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { int64_t now = taosGetTimestampMs(); bool allReady = true; SArray *pNodeSnapshot = NULL; - int32_t maxAllowedTrans = 50; + int32_t maxAllowedTrans = 20; int32_t numOfTrans = 0; int32_t code = 0; void *pIter = NULL; @@ -2700,6 +2700,7 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { return TSDB_CODE_FAILED; } + // todo: check for redundant consensus-checkpoint trans, if this kinds of trans repeatly failed. code = mndCreateSetConsensusChkptIdTrans(pMnode, pStream, pe->req.taskId, chkId, pe->req.startTs); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("failed to create consensus-checkpoint trans, stream:0x%" PRIx64, pStream->uid); From d279aaa22cd1111aa49db7fb63c99886bf67af94 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 27 Feb 2025 11:44:50 +0800 Subject: [PATCH 12/12] test(stream): sleep a little bit longer. --- tests/script/tsim/stream/tag.sim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/script/tsim/stream/tag.sim b/tests/script/tsim/stream/tag.sim index f293f4ac05..9f4c62e747 100644 --- a/tests/script/tsim/stream/tag.sim +++ b/tests/script/tsim/stream/tag.sim @@ -26,7 +26,7 @@ sql insert into t1 values(1648791223000,0,1,1,1.0); sql insert into t1 values(1648791223001,9,2,2,1.1); sql insert into t1 values(1648791223009,0,3,3,1.0); -sleep 300 +sleep 1000 sql select * from streamt; if $data01 != 3 then