From 9a3708e17b13524f0c7b1d6d9e13ebc7e4b819ec Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Wed, 14 Jun 2023 15:54:07 +0800 Subject: [PATCH] pause&resume fill history --- include/libs/stream/tstream.h | 6 ++-- source/dnode/vnode/src/tq/tq.c | 6 ++-- source/libs/stream/src/streamRecover.c | 42 +++++++++++++++----------- 3 files changed, 32 insertions(+), 22 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 0891c35716..93b1314cad 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -451,7 +451,8 @@ typedef struct { SMsgHead msgHead; int64_t streamId; int32_t taskId; -} SStreamScanHistoryReq, SStreamRecoverStep2Req; + int8_t igUntreated; +} SStreamScanHistoryReq; typedef struct { int64_t streamId; @@ -574,6 +575,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask); int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp); int32_t streamTaskStartHistoryTask(SStreamTask* pTask); int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask); +int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated); // common int32_t streamSetParamForScanHistoryData(SStreamTask* pTask); @@ -583,7 +585,7 @@ const char* streamGetTaskStatusStr(int32_t status); // source level int32_t streamSetParamForStreamScanner(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow); -int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq); +int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated); int32_t streamSourceScanHistoryData(SStreamTask* pTask); //int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver); int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a8d712ae01..648b8f4ec2 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1468,7 +1468,7 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus); // no lock needs to secure the access of the version - if (pReq->igUntreated && pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + if (pReq->igUntreated && pTask->info.taskLevel == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) { // discard all the data when the stream task is suspended. walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion); tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64 @@ -1479,7 +1479,9 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus); } - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && taosQueueItemSize(pTask->inputQueue->queue) == 0) { + if (pTask->info.fillHistory && pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + streamStartRecoverTask(pTask, pReq->igUntreated); + } else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && taosQueueItemSize(pTask->inputQueue->queue) == 0) { tqStartStreamTasks(pTq); } else { streamSchedExec(pTask); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 38c6ad6f29..46b6798c64 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -17,6 +17,26 @@ #include "ttimer.h" #include "wal.h" +int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated) { + SStreamScanHistoryReq req; + streamBuildSourceRecover1Req(pTask, &req, igUntreated); + int32_t len = sizeof(SStreamScanHistoryReq); + + void* serializedReq = rpcMallocCont(len); + if (serializedReq == NULL) { + return -1; + } + + memcpy(serializedReq, &req, len); + + SRpcMsg rpcMsg = {.contLen = len, .pCont = serializedReq, .msgType = TDMT_VND_STREAM_SCAN_HISTORY}; + if (tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg) < 0) { + /*ASSERT(0);*/ + } + + return 0; +} + const char* streamGetTaskStatusStr(int32_t status) { switch(status) { case TASK_STATUS__NORMAL: return "normal"; @@ -38,23 +58,8 @@ static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) { streamSetParamForScanHistoryData(pTask); streamSetParamForStreamScanner(pTask, pRange, &pTask->dataRange.window); - SStreamScanHistoryReq req; - streamBuildSourceRecover1Req(pTask, &req); - int32_t len = sizeof(SStreamScanHistoryReq); - - void* serializedReq = rpcMallocCont(len); - if (serializedReq == NULL) { - return -1; - } - - memcpy(serializedReq, &req, len); - - SRpcMsg rpcMsg = {.contLen = len, .pCont = serializedReq, .msgType = TDMT_VND_STREAM_SCAN_HISTORY}; - if (tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg) < 0) { - /*ASSERT(0);*/ - } - - return 0; + int32_t code = streamStartRecoverTask(pTask, 0); + return code; } int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) { @@ -262,10 +267,11 @@ int32_t streamSetParamForStreamScanner(SStreamTask* pTask, SVersionRange *pVerRa return qStreamSourceScanParamForHistoryScan(pTask->exec.pExecutor, pVerRange, pWindow); } -int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq) { +int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated) { pReq->msgHead.vgId = pTask->info.nodeId; pReq->streamId = pTask->id.streamId; pReq->taskId = pTask->id.taskId; + pReq->igUntreated = igUntreated; return 0; }