diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 5bf0a9b199..a76fc17377 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -239,7 +239,7 @@ void tqClose(STQ*); int tqPushMsg(STQ*, tmsg_t msgType); int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg); void tqUnregisterPushHandle(STQ* pTq, void* pHandle); -int tqScanWalAsync(STQ* pTq, bool ckPause); +int tqScanWalAsync(STQ* pTq); int32_t tqStopStreamTasksAsync(STQ* pTq); int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp); int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 119edb47bc..fc173eb691 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -920,12 +920,12 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask // now the fill-history task starts to scan data from wal files. code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); - if (code == TSDB_CODE_SUCCESS) { - code = tqScanWalAsync(pTq, false); - if (code) { - tqError("vgId:%d failed to start scan wal file, code:%s", vgId, tstrerror(code)); - } - } +// if (code == TSDB_CODE_SUCCESS) { +// code = tqScanWalAsync(pTq, false); +// if (code) { +// tqError("vgId:%d failed to start scan wal file, code:%s", vgId, tstrerror(code)); +// } +// } } } @@ -1122,12 +1122,12 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { } // let's continue scan data in the wal files - if (req.reqType >= 0 || req.reqType == STREAM_EXEC_T_RESUME_TASK) { - code = tqScanWalAsync(pTq, false); // it's ok to failed - if (code) { - tqError("vgId:%d failed to start scan wal file, code:%s", pTq->pStreamMeta->vgId, tstrerror(code)); - } - } +// if (req.reqType >= 0 || req.reqType == STREAM_EXEC_T_RESUME_TASK) { +// code = tqScanWalAsync(pTq, false); // it's ok to failed +// if (code) { +// tqError("vgId:%d failed to start scan wal file, code:%s", pTq->pStreamMeta->vgId, tstrerror(code)); +// } +// } return code; } diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 2b2667773a..fc83343c99 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -49,20 +49,6 @@ int32_t tqPushMsg(STQ* pTq, tmsg_t msgType) { } } - streamMetaRLock(pTq->pStreamMeta); - int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta); - streamMetaRUnLock(pTq->pStreamMeta); - -// tqTrace("vgId:%d handle submit, restore:%d, numOfTasks:%d", TD_VID(pTq->pVnode), pTq->pVnode->restored, numOfTasks); - - // push data for stream processing: - // 1. the vnode has already been restored. - // 2. the vnode should be the leader. - // 3. the stream is not suspended yet. - if ((!tsDisableStream) && (numOfTasks > 0)) { - code = tqScanWalAsync(pTq, true); - } - return code; } diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 9ea84830f1..33a32617ef 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -17,22 +17,20 @@ #include "vnd.h" #define MAX_REPEAT_SCAN_THRESHOLD 3 -#define SCAN_WAL_IDLE_DURATION 500 // idle for 500ms to do next wal scan +#define SCAN_WAL_IDLE_DURATION 500 // idle for 500ms to do next wal scan typedef struct SBuildScanWalMsgParam { int64_t metaId; - int32_t numOfTasks; - int8_t restored; +// int8_t restored; SMsgCb msgCb; } SBuildScanWalMsgParam; -static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta); +static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, int32_t* pNumOfTasks); static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId); static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver); static bool taskReadyForDataFromWal(SStreamTask* pTask); static int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc); -static int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration); -static int32_t doScanWalAsync(STQ* pTq, bool ckPause); +static int32_t tqScanWalInFuture(STQ* pTq); // extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks. int32_t tqScanWal(STQ* pTq) { @@ -40,44 +38,38 @@ int32_t tqScanWal(STQ* pTq) { int32_t vgId = pMeta->vgId; int64_t st = taosGetTimestampMs(); int32_t numOfTasks = 0; + int64_t el = 0; + int32_t code = 0; - tqDebug("vgId:%d continue to check if data in wal are available, scanCounter:%d", vgId, pMeta->scanInfo.scanCounter); - - // check all tasks - int32_t code = doScanWalForAllTasks(pMeta); - if (code) { - tqError("vgId:%d failed to start all tasks, try next time, code:%s", vgId, tstrerror(code)); + int32_t old = atomic_val_compare_exchange_32(&pMeta->scanInfo.scanCounter, 0, 1); + if (old == 0) { + tqDebug("vgId:%d try to scan wal to extract data", vgId); + } else { + tqDebug("vgId:%d already in wal scan, abort", vgId); return code; } - streamMetaWLock(pMeta); - int32_t times = (--pMeta->scanInfo.scanCounter); - if (times < 0) { - tqError("vgId:%d invalid scan counter:%d, reset to 0", vgId, times); - times = 0; - } - - numOfTasks = taosArrayGetSize(pMeta->pTaskList); - streamMetaWUnLock(pMeta); - - int64_t el = (taosGetTimestampMs() - st); - tqDebug("vgId:%d scan wal for stream tasks completed, elapsed time:%" PRId64 " ms", vgId, el); - - if (times > 0) { - tqDebug("vgId:%d scan wal for stream tasks for %d times in %dms", vgId, times, SCAN_WAL_IDLE_DURATION); - code = tqScanWalInFuture(pTq, numOfTasks, SCAN_WAL_IDLE_DURATION); - if (code) { - tqError("vgId:%d sched scan wal in %dms failed, ignore this failure", vgId, SCAN_WAL_IDLE_DURATION); - } + // check all tasks + code = doScanWalForAllTasks(pMeta, &numOfTasks); + + el = (taosGetTimestampMs() - st); + if (code) { + tqError("vgId:%d failed to scan wal for all tasks, try next time, elapsed time:%" PRId64 "ms code:%s", vgId, el, + tstrerror(code)); + } else { + tqDebug("vgId:%d scan wal for stream tasks completed, elapsed time:%" PRId64 "ms, next scan start in %dms", vgId, + el, SCAN_WAL_IDLE_DURATION); } + atomic_store_32(&pMeta->scanInfo.scanCounter, 0); return code; } static void doStartScanWal(void* param, void* tmrId) { - int32_t vgId = 0; - int32_t code = 0; - + int32_t vgId = 0; + int32_t code = 0; + SVnode* pVnode = NULL; + int32_t numOfTasks = 0; SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param; SStreamMeta* pMeta = taosAcquireRef(streamMetaRefPool, pParam->metaId); @@ -102,71 +94,93 @@ static void doStartScanWal(void* param, void* tmrId) { vgId = pMeta->vgId; - tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d, vnd restored:%d", vgId, pParam->numOfTasks, - pParam->restored); -#if 0 - // wait for the vnode is freed, and invalid read may occur. + if (pMeta->role == NODE_ROLE_FOLLOWER) { + tqDebug("vgId:%d not leader, role:%d not scan wal anymore", vgId, pMeta->role); + + code = taosReleaseRef(streamMetaRefPool, pParam->metaId); + if (code == TSDB_CODE_SUCCESS) { + tqDebug("vgId:%d jump out of scan wal timer since not leader", vgId); + } else { + tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId, + tstrerror(code)); + } + + taosMemFree(pParam); + return; + } + + if (pMeta->startInfo.startAllTasks) { + tqTrace("vgId:%d in restart procedure, not ready to scan wal", vgId); + goto _end; + } + + streamMetaRLock(pMeta); + numOfTasks = taosArrayGetSize(pMeta->pTaskList); + streamMetaRUnLock(pMeta); + + if (numOfTasks == 0) { + goto _end; + } + + tqTrace("vgId:%d create msg to start wal scan, numOfTasks:%d", vgId, numOfTasks); + + // #if 0 + // wait for the vnode is freed, and invalid read may occur. taosMsleep(10000); -#endif + // #endif code = streamTaskSchedTask(&pParam->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); if (code) { tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code)); } +_end: + streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, &pMeta->scanInfo.scanTimer, &pMeta->scanInfo.scanTimer, + vgId, "scan-wal"); + code = taosReleaseRef(streamMetaRefPool, pParam->metaId); if (code) { tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId, tstrerror(code)); } - - taosMemoryFree(pParam); } -int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) { +int32_t tqScanWalAsync(STQ* pTq) { SStreamMeta* pMeta = pTq->pStreamMeta; int32_t code = 0; int32_t vgId = TD_VID(pTq->pVnode); tmr_h pTimer = NULL; SBuildScanWalMsgParam* pParam = NULL; + // 1. the vnode should be the leader. + // 2. the stream isn't disabled + if ((pMeta->role == NODE_ROLE_FOLLOWER) || tsDisableStream) { + tqInfo("vgId:%d follower node or stream disabled, not scan wal", vgId); + return TSDB_CODE_SUCCESS; + } + pParam = taosMemoryMalloc(sizeof(SBuildScanWalMsgParam)); if (pParam == NULL) { + tqError("vgId:%d failed to start scan wal, stream not executes, code:%s", vgId, tstrerror(code)); return terrno; } pParam->metaId = pMeta->rid; - pParam->numOfTasks = numOfTasks; - pParam->restored = pTq->pVnode->restored; pParam->msgCb = pTq->pVnode->msgCb; code = streamTimerGetInstance(&pTimer); if (code) { - tqError("vgId:%d failed to get tmr ctrl during sched scan wal", vgId); + tqFatal("vgId:%d failed to get tmr ctrl during sched scan wal", vgId); taosMemoryFree(pParam); } else { - streamTmrStart(doStartScanWal, idleDuration, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal-fut"); + // todo: start in 1sec for the first time + streamTmrStart(doStartScanWal, 1000, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, + "scan-wal"); } return code; } -int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { - SStreamMeta* pMeta = pTq->pStreamMeta; - bool alreadyRestored = pTq->pVnode->restored; - int32_t code = 0; - - // do not launch the stream tasks, if it is a follower or not restored vnode. - if (!(vnodeIsRoleLeader(pTq->pVnode) && alreadyRestored)) { - return TSDB_CODE_SUCCESS; - } - - streamMetaWLock(pMeta); - code = doScanWalAsync(pTq, ckPause); - streamMetaWUnLock(pMeta); - return code; -} - int32_t tqStopStreamTasksAsync(STQ* pTq) { SStreamMeta* pMeta = pTq->pStreamMeta; int32_t vgId = pMeta->vgId; @@ -347,13 +361,10 @@ int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfIt return code; } -int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta) { +int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, int32_t* pNumOfTasks) { int32_t vgId = pStreamMeta->vgId; SArray* pTaskList = NULL; - int32_t numOfTasks = taosArrayGetSize(pStreamMeta->pTaskList); - if (numOfTasks == 0) { - return TSDB_CODE_SUCCESS; - } + int32_t numOfTasks = 0; // clone the task list, to avoid the task update during scan wal files streamMetaWLock(pStreamMeta); @@ -364,10 +375,13 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta) { return terrno; } - tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks); - // update the new task number numOfTasks = taosArrayGetSize(pTaskList); + if (pNumOfTasks != NULL) { + *pNumOfTasks = numOfTasks; + } + + tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks); for (int32_t i = 0; i < numOfTasks; ++i) { STaskId* pTaskId = taosArrayGet(pTaskList, i); @@ -426,51 +440,59 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta) { return TSDB_CODE_SUCCESS; } -int32_t doScanWalAsync(STQ* pTq, bool ckPause) { - SStreamMeta* pMeta = pTq->pStreamMeta; - bool alreadyRestored = pTq->pVnode->restored; - int32_t vgId = pMeta->vgId; - int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); - - if (numOfTasks == 0) { - tqDebug("vgId:%d no stream tasks existed to run", vgId); - return 0; - } - - if (pMeta->startInfo.startAllTasks) { - tqTrace("vgId:%d in restart procedure, not scan wal", vgId); - return 0; - } - - pMeta->scanInfo.scanCounter += 1; - if (pMeta->scanInfo.scanCounter > MAX_REPEAT_SCAN_THRESHOLD) { - pMeta->scanInfo.scanCounter = MAX_REPEAT_SCAN_THRESHOLD; - } - - if (pMeta->scanInfo.scanCounter > 1) { - tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->scanInfo.scanCounter); - return 0; - } - - int32_t numOfPauseTasks = pMeta->numOfPausedTasks; - if (ckPause && numOfTasks == numOfPauseTasks) { - tqDebug("vgId:%d ignore all submit, all streams had been paused, reset the walScanCounter", vgId); - - // reset the counter value, since we do not launch the scan wal operation. - pMeta->scanInfo.scanCounter = 0; - return 0; - } - - tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d, vnd restored:%d", vgId, - numOfTasks, alreadyRestored); - - return streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); -} +// int32_t doScanWalAsync(STQ* pTq) { +// SStreamMeta* pMeta = pTq->pStreamMeta; +// bool alreadyRestored = pTq->pVnode->restored; +// int32_t vgId = pMeta->vgId; +// int32_t code = 0; +// int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); +// +// // if (numOfTasks == 0) { +//// tqDebug("vgId:%d no stream tasks existed to run", vgId); +//// return 0; +//// } +// +//// if (pMeta->startInfo.startAllTasks) { +//// tqTrace("vgId:%d in restart procedure, not scan wal", vgId); +//// return 0; +//// } +// +//// pMeta->scanInfo.scanCounter += 1; +//// if (pMeta->scanInfo.scanCounter > MAX_REPEAT_SCAN_THRESHOLD) { +//// pMeta->scanInfo.scanCounter = MAX_REPEAT_SCAN_THRESHOLD; +//// } +// +//// if (pMeta->scanInfo.scanCounter > 1) { +//// tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->scanInfo.scanCounter); +//// return 0; +//// } +// +//// int32_t numOfPauseTasks = pMeta->numOfPausedTasks; +//// if (ckPause && numOfTasks == numOfPauseTasks) { +//// tqDebug("vgId:%d ignore all submit, all streams had been paused, reset the walScanCounter", vgId); +//// +//// reset the counter value, since we do not launch the scan wal operation. +//// pMeta->scanInfo.scanCounter = 0; +//// return 0; +//// } +// +//// tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d, vnd restored:%d", vgId, +//// numOfTasks, alreadyRestored); +//// +//// code = streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); +//// if (code) { +//// tqError("vgId:%d failed create msg to scan data in wal, retry in %dms", vgId, SCAN_WAL_IDLE_DURATION); +//// } +// +// code = tqScanWalInFuture(pTq, numOfTasks, SCAN_WAL_IDLE_DURATION); +// if (code) { +// tqError("vgId:%d sched scan wal in %dms failed, ignore this failure", vgId, SCAN_WAL_IDLE_DURATION); +// } +//} void streamMetaFreeTQDuringScanWalError(STQ* pTq) { SBuildScanWalMsgParam* p = taosMemoryCalloc(1, sizeof(SBuildScanWalMsgParam)); p->metaId = pTq->pStreamMeta->rid; - p->numOfTasks = 0; doStartScanWal(p, 0); } \ No newline at end of file diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 197a45cdb9..6f001981fb 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -47,6 +47,10 @@ END: void tqUpdateNodeStage(STQ* pTq, bool isLeader) { SSyncState state = syncGetState(pTq->pVnode->sync); streamMetaUpdateStageRole(pTq->pStreamMeta, state.term, isLeader); + + if (isLeader) { + tqScanWalAsync(pTq); + } } static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) { diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 77632d328e..7fc2881881 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -938,10 +938,10 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) { streamMetaWUnLock(pMeta); - if (scanWal && (vgId != SNODE_HANDLE)) { - tqDebug("vgId:%d start scan wal for executing tasks", vgId); - code = tqScanWalAsync(pMeta->ahandle, true); - } +// if (scanWal && (vgId != SNODE_HANDLE)) { +// tqDebug("vgId:%d start scan wal for executing tasks", vgId); +// code = tqScanWalAsync(pMeta->ahandle, true); +// } return code; } @@ -1170,7 +1170,7 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t pTask->hTaskInfo.operatorOpen = false; code = streamStartScanHistoryAsync(pTask, igUntreated); } else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputq.queue) == 0)) { - code = tqScanWalAsync((STQ*)handle, false); +// code = tqScanWalAsync((STQ*)handle, false); } else { code = streamTrySchedExec(pTask); } diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index 7c157bb05e..36a6488709 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -341,7 +341,7 @@ void streamMetaHbToMnode(void* param, void* tmrId) { if (code == TSDB_CODE_SUCCESS) { stInfo("vgId:%d role:%d not leader not send hb to mnode", vgId, role); } else { - stError("vgId:%d role:%d not leader not send hb to mnodefailed to release the meta rid:%" PRId64, vgId, role, rid); + stError("vgId:%d role:%d not leader not send hb to mnode, failed to release meta rid:%" PRId64, vgId, role, rid); } // taosMemoryFree(param); return; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index ddd0201460..733bae1097 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1320,7 +1320,7 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader) // mark the sign to send msg before close all tasks // 1. for leader vnode, always send msg before closing - // 2. for follower vnode, if it's is changed from leader, also sending msg before closing. + // 2. for follower vnode, if it's changed from leader, also sending msg before closing. if (pMeta->role == NODE_ROLE_LEADER) { pMeta->sendMsgBeforeClosing = true; }