From 11f0c3b33648e7f6977f2075efbbab949ded328b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 14 Jun 2023 10:46:46 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 7 ++--- source/dnode/snode/src/snode.c | 2 +- source/libs/stream/inc/streamInc.h | 2 +- source/libs/stream/src/streamDispatch.c | 6 ++-- source/libs/stream/src/streamRecover.c | 37 +++++++++++-------------- 5 files changed, 24 insertions(+), 30 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 6ed97ac547..223fd3c3b7 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -339,7 +339,7 @@ struct SStreamTask { SStreamState* pState; // state backend // the followings attributes don't be serialized - int32_t recoverTryingDownstream; + int32_t notReadyTasks; int32_t numOfWaitingUpstream; int64_t checkReqId; SArray* checkReqIds; // shuffle @@ -576,7 +576,7 @@ int32_t streamTaskStartHistoryTask(SStreamTask* pTask); int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask); // common -int32_t streamSetParamForRecover(SStreamTask* pTask); +int32_t streamSetParamForScanHistoryData(SStreamTask* pTask); int32_t streamRestoreParam(SStreamTask* pTask); int32_t streamSetStatusNormal(SStreamTask* pTask); const char* streamGetTaskStatusStr(int32_t status); @@ -585,8 +585,7 @@ const char* streamGetTaskStatusStr(int32_t status); int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow); int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamRecoverStep1Req* pReq); int32_t streamSourceRecoverScanStep1(SStreamTask* pTask); -int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req* pReq); -int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver); +//int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver); int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask); int32_t streamDispatchTransferStateMsg(SStreamTask* pTask); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 41fb50a0df..42bb606c4e 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -165,7 +165,7 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) { // 3.go through recover steps to fill history if (pTask->info.fillHistory) { - streamSetParamForRecover(pTask); + streamSetParamForScanHistoryData(pTask); streamAggRecoverPrepare(pTask); } diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index b3a28aea43..aeb35d9ad5 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -49,7 +49,7 @@ int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* p int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData); int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); -int32_t streamDoDispatchRecoverFinishMsg(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId, +int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId, SEpSet* pEpSet); SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 95f68cce2d..d48bfbb189 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include +#include "ttimer.h" #include "streamInc.h" #define MAX_BLOCK_NAME_NUM 1024 @@ -274,8 +274,8 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR return 0; } -int32_t streamDoDispatchRecoverFinishMsg(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId, - SEpSet* pEpSet) { +int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId, + SEpSet* pEpSet) { void* buf = NULL; int32_t code = -1; SRpcMsg msg = {0}; diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 82b9941c96..f59ffa68a0 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -33,9 +33,9 @@ static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) { qDebug("s-task:%s vgId:%d task status:%s and start to scan-history-data task, ver:%" PRId64 " - %" PRId64, pTask->id.idStr, pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus), - pTask->dataRange.range.minVer, pTask->dataRange.range.maxVer); + pRange->minVer, pRange->maxVer); - streamSetParamForRecover(pTask); + streamSetParamForScanHistoryData(pTask); streamSourceRecoverPrepareStep1(pTask, pRange, &pTask->dataRange.window); SStreamRecoverStep1Req req; @@ -69,11 +69,11 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) { } } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { streamSetStatusNormal(pTask); - streamSetParamForRecover(pTask); + streamSetParamForScanHistoryData(pTask); streamAggRecoverPrepare(pTask); } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { streamSetStatusNormal(pTask); - qDebug("s-task:%s sink task convert to normal status immediately", pTask->id.idStr); + qDebug("s-task:%s sink task convert to normal immediately", pTask->id.idStr); } return 0; @@ -107,7 +107,7 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) { SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; int32_t numOfVgs = taosArrayGetSize(vgInfo); - pTask->recoverTryingDownstream = numOfVgs; + pTask->notReadyTasks = numOfVgs; pTask->checkReqIds = taosArrayInit(numOfVgs, sizeof(int64_t)); for (int32_t i = 0; i < numOfVgs; i++) { @@ -130,7 +130,7 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) { return 0; } -int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) { +int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) { SStreamTaskCheckReq req = { .reqId = pRsp->reqId, .streamId = pRsp->streamId, @@ -186,7 +186,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs return -1; } - int32_t left = atomic_sub_fetch_32(&pTask->recoverTryingDownstream, 1); + int32_t left = atomic_sub_fetch_32(&pTask->notReadyTasks, 1); ASSERT(left >= 0); if (left == 0) { @@ -203,8 +203,9 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs streamGetTaskStatusStr(pTask->status.taskStatus)); } } else { - qDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, remain not ready:%d", id, - pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, left); + int32_t total = taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos); + qDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id, + pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left); } } else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { if (pRsp->reqId != pTask->checkReqId) { @@ -212,8 +213,8 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs } ASSERT(pTask->status.checkDownstream == 0); - pTask->status.checkDownstream = 1; + ASSERT(pTask->status.taskStatus != TASK_STATUS__HALT); if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { @@ -233,17 +234,18 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs pRsp->downstreamNodeId); taosMsleep(100); - streamRecheckOneDownstream(pTask, pRsp); + streamRecheckDownstream(pTask, pRsp); } return 0; } // common -int32_t streamSetParamForRecover(SStreamTask* pTask) { +int32_t streamSetParamForScanHistoryData(SStreamTask* pTask) { void* exec = pTask->exec.pExecutor; return qStreamSetParamForRecover(exec); } + int32_t streamRestoreParam(SStreamTask* pTask) { void* exec = pTask->exec.pExecutor; return qStreamRestoreParam(exec); @@ -272,13 +274,6 @@ int32_t streamSourceRecoverScanStep1(SStreamTask* pTask) { return streamScanExec(pTask, 100); } -int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req* pReq) { - pReq->msgHead.vgId = pTask->info.nodeId; - pReq->streamId = pTask->id.streamId; - pReq->taskId = pTask->id.taskId; - return 0; -} - int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) { void* exec = pTask->exec.pExecutor; const char* id = pTask->id.idStr; @@ -305,7 +300,7 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) { pTask->fixedEpDispatcher.taskId, streamGetTaskStatusStr(pTask->status.taskStatus)); req.taskId = pTask->fixedEpDispatcher.taskId; - streamDoDispatchRecoverFinishMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); + streamDoDispatchScanHistoryFinishMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; int32_t numOfVgs = taosArrayGetSize(vgInfo); @@ -315,7 +310,7 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) { for (int32_t i = 0; i < numOfVgs; i++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); req.taskId = pVgInfo->taskId; - streamDoDispatchRecoverFinishMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); + streamDoDispatchScanHistoryFinishMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } }