diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index a4d89dcdcc..4707ee959a 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -483,8 +483,10 @@ typedef struct STaskUpdateInfo { } STaskUpdateInfo; typedef struct SScanWalInfo { - int32_t scanCounter; + int32_t scanSentinel; tmr_h scanTimer; + int64_t lastScanTs; + int32_t tickCounter; } SScanWalInfo; typedef struct SFatalErrInfo { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index fc173eb691..63727e5c45 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1113,23 +1113,14 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { // extracted submit data from wal files for all tasks if (req.reqType == STREAM_EXEC_T_EXTRACT_WAL_DATA) { return tqScanWal(pTq); - } + } else { + code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode)); + if (code) { + tqError("vgId:%d failed to create task run req, code:%s", TD_VID(pTq->pVnode), tstrerror(code)); + } - code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode)); - if (code) { - tqError("vgId:%d failed to create task run req, code:%s", TD_VID(pTq->pVnode), tstrerror(code)); return code; } - - // let's continue scan data in the wal files -// if (req.reqType >= 0 || req.reqType == STREAM_EXEC_T_RESUME_TASK) { -// code = tqScanWalAsync(pTq, false); // it's ok to failed -// if (code) { -// tqError("vgId:%d failed to start scan wal file, code:%s", pTq->pStreamMeta->vgId, tstrerror(code)); -// } -// } - - return code; } int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) { diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 06b496f136..08c0be49bc 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -16,12 +16,11 @@ #include "tq.h" #include "vnd.h" -#define MAX_REPEAT_SCAN_THRESHOLD 3 -#define SCAN_WAL_IDLE_DURATION 500 // idle for 500ms to do next wal scan +#define SCAN_WAL_IDLE_DURATION 250 // idle for 500ms to do next wal scan +#define SCAN_WAL_WAIT_COUNT 2 typedef struct SBuildScanWalMsgParam { int64_t metaId; -// int8_t restored; SMsgCb msgCb; } SBuildScanWalMsgParam; @@ -30,7 +29,6 @@ static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId); static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver); static bool taskReadyForDataFromWal(SStreamTask* pTask); static int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc); -static int32_t tqScanWalInFuture(STQ* pTq); // extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks. int32_t tqScanWal(STQ* pTq) { @@ -41,7 +39,7 @@ int32_t tqScanWal(STQ* pTq) { int64_t el = 0; int32_t code = 0; - int32_t old = atomic_val_compare_exchange_32(&pMeta->scanInfo.scanCounter, 0, 1); + int32_t old = atomic_val_compare_exchange_32(&pMeta->scanInfo.scanSentinel, 0, 1); if (old == 0) { tqDebug("vgId:%d try to scan wal to extract data", vgId); } else { @@ -49,27 +47,44 @@ int32_t tqScanWal(STQ* pTq) { return code; } + // the scan wal interval less than 200, not scan, actually. + if ((pMeta->scanInfo.lastScanTs > st) && (pMeta->scanInfo.lastScanTs - st < 200)) { + tqDebug("vgId:%d scan wal less than 200ms, do nothing", vgId); + atomic_store_32(&pMeta->scanInfo.scanSentinel, 0); + return code; + } + // check all tasks code = doScanWalForAllTasks(pMeta, &numOfTasks); - el = (taosGetTimestampMs() - st); + pMeta->scanInfo.lastScanTs = taosGetTimestampMs(); + el = (pMeta->scanInfo.lastScanTs - st); + if (code) { tqError("vgId:%d failed to scan wal for all tasks, try next time, elapsed time:%" PRId64 "ms code:%s", vgId, el, tstrerror(code)); } else { - tqDebug("vgId:%d scan wal for stream tasks completed, elapsed time:%" PRId64 "ms, next scan start in %dms", vgId, - el, SCAN_WAL_IDLE_DURATION); + tqDebug("vgId:%d scan wal for stream tasks completed, elapsed time:%" PRId64 "ms", vgId, el); } - atomic_store_32(&pMeta->scanInfo.scanCounter, 0); + atomic_store_32(&pMeta->scanInfo.scanSentinel, 0); return code; } +static bool waitEnoughDuration(SStreamMeta* pMeta) { + if ((++pMeta->scanInfo.tickCounter) >= SCAN_WAL_WAIT_COUNT) { + pMeta->scanInfo.tickCounter = 0; + return true; + } + + return false; +} + static void doStartScanWal(void* param, void* tmrId) { int32_t vgId = 0; int32_t code = 0; - SVnode* pVnode = NULL; int32_t numOfTasks = 0; + tmr_h pTimer = NULL; SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param; SStreamMeta* pMeta = taosAcquireRef(streamMetaRefPool, pParam->metaId); @@ -79,10 +94,12 @@ static void doStartScanWal(void* param, void* tmrId) { return; } + vgId = pMeta->vgId; + if (pMeta->closeFlag) { code = taosReleaseRef(streamMetaRefPool, pParam->metaId); if (code == TSDB_CODE_SUCCESS) { - tqDebug("vgId:%d jump out of scan wal timer since closed", vgId); + tqInfo("vgId:%d jump out of scan wal timer since closed", vgId); } else { tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId, tstrerror(code)); @@ -92,9 +109,7 @@ static void doStartScanWal(void* param, void* tmrId) { return; } - vgId = pMeta->vgId; - - if (pMeta->role == NODE_ROLE_FOLLOWER) { + if (pMeta->role != NODE_ROLE_LEADER) { tqDebug("vgId:%d not leader, role:%d not scan wal anymore", vgId, pMeta->role); code = taosReleaseRef(streamMetaRefPool, pParam->metaId); @@ -114,6 +129,24 @@ static void doStartScanWal(void* param, void* tmrId) { goto _end; } + code = streamTimerGetInstance(&pTimer); + if (code) { + tqFatal("vgId:%d failed to get tmr ctrl during sched scan wal, not scan wal, code:%s", vgId, tstrerror(code)); + taosMemoryFree(pParam); + return; + } + + if (!waitEnoughDuration(pMeta)) { + streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, + "scan-wal"); + code = taosReleaseRef(streamMetaRefPool, pParam->metaId); + if (code) { + tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId, + tstrerror(code)); + } + return; + } + streamMetaRLock(pMeta); numOfTasks = taosArrayGetSize(pMeta->pTaskList); streamMetaRUnLock(pMeta); @@ -135,8 +168,9 @@ static void doStartScanWal(void* param, void* tmrId) { } _end: - streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, &pMeta->scanInfo.scanTimer, &pMeta->scanInfo.scanTimer, - vgId, "scan-wal"); + + streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal"); + tqDebug("vgId:%d scan-wal will start in %dms", vgId, SCAN_WAL_IDLE_DURATION*SCAN_WAL_WAIT_COUNT); code = taosReleaseRef(streamMetaRefPool, pParam->metaId); if (code) { diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index 36a6488709..90ebd47ac6 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -327,7 +327,7 @@ void streamMetaHbToMnode(void* param, void* tmrId) { pMeta->pHbInfo->hbStart = 0; code = taosReleaseRef(streamMetaRefPool, rid); if (code == TSDB_CODE_SUCCESS) { - stDebug("vgId:%d jump out of meta timer", vgId); + stInfo("vgId:%d jump out of meta timer since closed", vgId); } else { stError("vgId:%d jump out of meta timer, failed to release the meta rid:%" PRId64, vgId, rid); } @@ -413,7 +413,7 @@ void destroyMetaHbInfo(SMetaHbInfo* pInfo) { void streamMetaWaitForHbTmrQuit(SStreamMeta* pMeta) { // wait for the stream meta hb function stopping if (pMeta->role == NODE_ROLE_LEADER) { - taosMsleep(2 * META_HB_CHECK_INTERVAL); + taosMsleep(3 * META_HB_CHECK_INTERVAL); stDebug("vgId:%d wait for meta to stop timer", pMeta->vgId); } } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 733bae1097..7bb9f5db7b 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -427,7 +427,7 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, pMeta->pTaskList = taosArrayInit(4, sizeof(SStreamTaskId)); TSDB_CHECK_NULL(pMeta->pTaskList, code, lino, _err, terrno); - pMeta->scanInfo.scanCounter = 0; + pMeta->scanInfo.scanSentinel = 0; pMeta->vgId = vgId; pMeta->ahandle = ahandle; pMeta->buildTaskFn = buildTaskFn; @@ -1213,8 +1213,8 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { vgId, (pMeta->role == NODE_ROLE_LEADER), startTs, sendCount); // wait for the stream meta hb function stopping - streamMetaWaitForHbTmrQuit(pMeta); pMeta->closeFlag = true; + streamMetaWaitForHbTmrQuit(pMeta); stDebug("vgId:%d start to check all tasks for closing", vgId); int64_t st = taosGetTimestampMs(); @@ -1253,6 +1253,12 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { double el = (taosGetTimestampMs() - st) / 1000.0; stDebug("vgId:%d stop all %d task(s) completed, elapsed time:%.2f Sec.", pMeta->vgId, numOfTasks, el); + + if (pMeta->scanInfo.scanTimer != NULL) { + streamTmrStop(pMeta->scanInfo.scanTimer); + pMeta->scanInfo.scanTimer = NULL; + } + streamMetaRUnLock(pMeta); }