From 0a72e576bdd82471634f0dccfffcf391eabfef39 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Wed, 30 Aug 2023 11:55:00 +0800 Subject: [PATCH 1/2] op stream pause pause data dispatch opt stream pause --- include/libs/stream/tstream.h | 5 +++-- source/dnode/mnode/impl/src/mndStream.c | 4 ++-- source/dnode/vnode/src/inc/vnodeInt.h | 2 +- source/dnode/vnode/src/tq/tq.c | 17 +++++++++++------ source/dnode/vnode/src/tq/tqPush.c | 2 +- source/dnode/vnode/src/tq/tqRestore.c | 9 ++++++++- source/libs/stream/src/streamDispatch.c | 6 +++++- source/libs/stream/src/streamMeta.c | 11 +++++++++++ source/libs/stream/src/streamRecover.c | 20 +++++++++++++++----- 9 files changed, 57 insertions(+), 19 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 779fb416c6..1b3960bdba 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -409,6 +409,7 @@ typedef struct SStreamMeta { SArray* chkpInUse; int32_t chkpCap; SRWLatch chkpDirLock; + int32_t pauseTaskNum; } SStreamMeta; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); @@ -673,8 +674,8 @@ int32_t streamTaskGetInputQItems(const SStreamTask* pTask); int32_t streamRestoreParam(SStreamTask* pTask); int32_t streamSetStatusNormal(SStreamTask* pTask); const char* streamGetTaskStatusStr(int32_t status); -void streamTaskPause(SStreamTask* pTask); -void streamTaskResume(SStreamTask* pTask); +void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta); +void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta); void streamTaskHalt(SStreamTask* pTask); void streamTaskResumeFromHalt(SStreamTask* pTask); void streamTaskDisablePause(SStreamTask* pTask); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 306a6cbdd9..85dabf1113 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1615,7 +1615,7 @@ int32_t mndPauseAllStreamTaskImpl(STrans *pTrans, SArray *tasks) { int32_t sz = taosArrayGetSize(pTasks); for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pTasks, j); - if (pTask->info.taskLevel != TASK_LEVEL__SINK && mndPauseStreamTask(pTrans, pTask) < 0) { + if (mndPauseStreamTask(pTrans, pTask) < 0) { return -1; } @@ -1755,7 +1755,7 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUn int32_t sz = taosArrayGetSize(pTasks); for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pTasks, j); - if (pTask->info.taskLevel != TASK_LEVEL__SINK && mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) { + if (mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) { return -1; } diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index f0c73b4b37..5f5c27bfdd 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -223,7 +223,7 @@ void tqClose(STQ*); int tqPushMsg(STQ*, tmsg_t msgType); int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg); int tqUnregisterPushHandle(STQ* pTq, void* pHandle); -int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. +int tqStartStreamTasks(STQ* pTq, bool ckPause); // restore all stream tasks after vnode launching completed. int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 000d6e9d5d..e57b316967 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1210,7 +1210,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { streamSetStatusNormal(pTask); } - tqStartStreamTasks(pTq); + tqStartStreamTasks(pTq, false); } streamMetaReleaseTask(pMeta, pTask); @@ -1376,7 +1376,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { } streamMetaReleaseTask(pTq->pStreamMeta, pTask); - tqStartStreamTasks(pTq); + tqStartStreamTasks(pTq, false); return 0; } else { // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec. // todo add one function to handle this @@ -1459,7 +1459,7 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr); - streamTaskPause(pTask); + streamTaskPause(pTask, pMeta); SStreamTask* pHistoryTask = NULL; if (pTask->historyTaskId.taskId != 0) { @@ -1475,7 +1475,7 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr); - streamTaskPause(pHistoryTask); + streamTaskPause(pHistoryTask, pMeta); streamMetaReleaseTask(pMeta, pHistoryTask); } @@ -1490,9 +1490,14 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, } // todo: handle the case: resume from halt to pause/ from halt to normal/ from pause to normal - streamTaskResume(pTask); + streamTaskResume(pTask, pTq->pStreamMeta); int32_t level = pTask->info.taskLevel; + if (level == TASK_LEVEL__SINK) { + streamMetaReleaseTask(pTq->pStreamMeta, pTask); + return 0; + } + int8_t status = pTask->status.taskStatus; if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__SCAN_HISTORY) { // no lock needs to secure the access of the version @@ -1511,7 +1516,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { streamStartScanHistoryAsync(pTask, igUntreated); } else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputQueue->queue) == 0)) { - tqStartStreamTasks(pTq); + tqStartStreamTasks(pTq, false); } else { streamSchedExec(pTask); } diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 1ab4ef523a..8a9b95e045 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -46,7 +46,7 @@ int32_t tqPushMsg(STQ* pTq, tmsg_t msgType) { // 2. the vnode should be the leader. // 3. the stream is not suspended yet. if ((!tsDisableStream) && (numOfTasks > 0) && (msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_DELETE)) { - tqStartStreamTasks(pTq); + tqStartStreamTasks(pTq, true); } return 0; diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 9cf5983228..8c3dbf7118 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -121,7 +121,7 @@ int32_t tqCheckStreamStatus(STQ* pTq) { return 0; } -int32_t tqStartStreamTasks(STQ* pTq) { +int32_t tqStartStreamTasks(STQ* pTq, bool ckPause) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; @@ -147,6 +147,13 @@ int32_t tqStartStreamTasks(STQ* pTq) { return 0; } + int32_t numOfPauseTasks = pTq->pStreamMeta->pauseTaskNum; + if (ckPause && numOfTasks == numOfPauseTasks) { + tqDebug("ignore all submit, all streams had been paused"); + taosWUnLockLatch(&pMeta->lock); + return 0; + } + SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); if (pRunReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 43ad91aab5..c51ed10c44 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -403,7 +403,11 @@ static void doRetryDispatchData(void* param, void* tmrId) { if (!streamTaskShouldStop(&pTask->status)) { qDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr); atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0); - streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); + if (streamTaskShouldPause(&pTask->status)) { + streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS * 10); + } else { + streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); + } } else { atomic_sub_fetch_8(&pTask->status.timerActive, 1); qDebug("s-task:%s should stop, abort from timer", pTask->id.idStr); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index d4dd878250..d9abbcb085 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -186,6 +186,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF taosInitRWLatch(&pMeta->lock); taosThreadMutexInit(&pMeta->backendMutex, NULL); + pMeta->pauseTaskNum = 0; + qInfo("vgId:%d open stream meta successfully, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId, stage); return pMeta; @@ -469,6 +471,10 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys)); if (ppTask) { pTask = *ppTask; + if (streamTaskShouldPause(&pTask->status)) { + int32_t num = atomic_sub_fetch_32(&pMeta->pauseTaskNum, 1); + qInfo("vgId:%d s-task:%s drop stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num); + } atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING); } else { qDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId); @@ -668,8 +674,13 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) { return -1; } + if (streamTaskShouldPause(&pTask->status)) { + atomic_add_fetch_32(&pMeta->pauseTaskNum, 1); + } + ASSERT(pTask->status.downstreamReady == 0); } + qInfo("vgId:%d pause task num:%d", pMeta->vgId, pMeta->pauseTaskNum); tdbFree(pKey); tdbFree(pVal); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 2ca1612290..fcebf22e5e 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -811,9 +811,7 @@ void streamTaskCheckDownstreamTasks(SStreamTask* pTask) { } // normal -> pause, pause/stop/dropping -> pause, halt -> pause, scan-history -> pause -void streamTaskPause(SStreamTask* pTask) { - SStreamMeta* pMeta = pTask->pMeta; - +void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) { int64_t st = taosGetTimestampMs(); int8_t status = pTask->status.taskStatus; @@ -828,6 +826,12 @@ void streamTaskPause(SStreamTask* pTask) { return; } + if(pTask->info.taskLevel == TASK_LEVEL__SINK) { + int32_t num = atomic_add_fetch_32(&pMeta->pauseTaskNum, 1); + qInfo("vgId:%d s-task:%s pause stream sink task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num); + return; + } + while (!pTask->status.pauseAllowed || (pTask->status.taskStatus == TASK_STATUS__HALT)) { status = pTask->status.taskStatus; if (status == TASK_STATUS__DROPPING) { @@ -857,6 +861,8 @@ void streamTaskPause(SStreamTask* pTask) { atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); + int32_t num = atomic_add_fetch_32(&pMeta->pauseTaskNum, 1); + qInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num); taosWUnLockLatch(&pMeta->lock); // in case of fill-history task, stop the tsdb file scan operation. @@ -870,12 +876,16 @@ void streamTaskPause(SStreamTask* pTask) { streamGetTaskStatusStr(pTask->status.keepTaskStatus), (int32_t)el); } -void streamTaskResume(SStreamTask* pTask) { +void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta) { int8_t status = pTask->status.taskStatus; if (status == TASK_STATUS__PAUSE) { pTask->status.taskStatus = pTask->status.keepTaskStatus; pTask->status.keepTaskStatus = TASK_STATUS__NORMAL; - qDebug("s-task:%s resume from pause", pTask->id.idStr); + int32_t num = atomic_sub_fetch_32(&pMeta->pauseTaskNum, 1); + qInfo("vgId:%d s-task:%s resume from pause. pause task num:%d", pMeta->vgId, pTask->id.idStr, num); + } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { + int32_t num = atomic_sub_fetch_32(&pMeta->pauseTaskNum, 1); + qInfo("vgId:%d s-task:%s sink task.resume from pause. pause task num:%d", pMeta->vgId, pTask->id.idStr, num); } else { qError("s-task:%s not in pause, failed to resume, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status)); } From fc3468fe392802525bf13fc04a87e5f2552e6243 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Wed, 30 Aug 2023 15:58:51 +0800 Subject: [PATCH 2/2] add log --- source/libs/stream/src/streamRecover.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index fcebf22e5e..be7c1584fd 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -882,10 +882,10 @@ void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta) { pTask->status.taskStatus = pTask->status.keepTaskStatus; pTask->status.keepTaskStatus = TASK_STATUS__NORMAL; int32_t num = atomic_sub_fetch_32(&pMeta->pauseTaskNum, 1); - qInfo("vgId:%d s-task:%s resume from pause. pause task num:%d", pMeta->vgId, pTask->id.idStr, num); + qInfo("vgId:%d s-task:%s resume from pause, status%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num); } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { int32_t num = atomic_sub_fetch_32(&pMeta->pauseTaskNum, 1); - qInfo("vgId:%d s-task:%s sink task.resume from pause. pause task num:%d", pMeta->vgId, pTask->id.idStr, num); + qInfo("vgId:%d s-task:%s sink task.resume from pause, status%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num); } else { qError("s-task:%s not in pause, failed to resume, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status)); }