From d1ecfe5cf329bdcb3890a6cc98c06272c9348efb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 25 Apr 2024 14:24:20 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/libs/stream/src/streamCheckpoint.c | 18 ++++++++++ source/libs/stream/src/streamStart.c | 44 ++++++----------------- 2 files changed, 29 insertions(+), 33 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 36886329ac..8efd661d12 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -94,6 +94,24 @@ int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointRea return 0; } +int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1; + tEndEncode(pEncoder); + return 0; +} + +int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq) { + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1; + tEndDecode(pDecoder); + return 0; +} + static int32_t streamAlignCheckpoint(SStreamTask* pTask) { int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList); int64_t old = atomic_val_compare_exchange_32(&pTask->chkInfo.downstreamAlignNum, 0, num); diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 0b91359f48..1f6c5add42 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -29,19 +29,15 @@ typedef struct SLaunchHTaskInfo { STaskId hTaskId; } SLaunchHTaskInfo; -typedef struct STaskRecheckInfo { - SStreamTask* pTask; - SStreamTaskCheckReq req; - void* checkTimer; -} STaskRecheckInfo; - static int32_t streamSetParamForScanHistory(SStreamTask* pTask); static void streamTaskSetRangeStreamCalc(SStreamTask* pTask); static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated); -static SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t hStreamId, - int32_t hTaskId); +static SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, STaskId* pTaskId, int64_t hStreamId, int32_t hTaskId); static void tryLaunchHistoryTask(void* param, void* tmrId); static void doProcessDownstreamReadyRsp(SStreamTask* pTask); +static void doExecScanhistoryInFuture(void* param, void* tmrId); +static int32_t doStartScanHistoryTask(SStreamTask* pTask); +static int32_t streamTaskStartScanHistory(SStreamTask* pTask); int32_t streamTaskSetReady(SStreamTask* pTask) { int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask); @@ -83,7 +79,7 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) { return 0; } -static void doExecScanhistoryInFuture(void* param, void* tmrId) { +void doExecScanhistoryInFuture(void* param, void* tmrId) { SStreamTask* pTask = param; pTask->schedHistoryInfo.numOfTicks -= 1; @@ -139,7 +135,7 @@ int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) return TSDB_CODE_SUCCESS; } -static int32_t doStartScanHistoryTask(SStreamTask* pTask) { +int32_t doStartScanHistoryTask(SStreamTask* pTask) { SVersionRange* pRange = &pTask->dataRange.range; if (pTask->info.fillHistory) { streamSetParamForScanHistory(pTask); @@ -663,16 +659,15 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { taosMemoryFree(pInfo); } -SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t hStreamId, - int32_t hTaskId) { +SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, STaskId* pTaskId, int64_t hStreamId, int32_t hTaskId) { SLaunchHTaskInfo* pInfo = taosMemoryCalloc(1, sizeof(SLaunchHTaskInfo)); if (pInfo == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - pInfo->id.streamId = streamId; - pInfo->id.taskId = taskId; + pInfo->id.streamId = pTaskId->streamId; + pInfo->id.taskId = pTaskId->taskId; pInfo->hTaskId.streamId = hStreamId; pInfo->hTaskId.taskId = hTaskId; @@ -691,7 +686,8 @@ static int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) { stWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since not built yet", idStr, pMeta->vgId, hTaskId); - SLaunchHTaskInfo* pInfo = createHTaskLaunchInfo(pMeta, pTask->id.streamId, pTask->id.taskId, hStreamId, hTaskId); + STaskId id = streamTaskGetTaskId(pTask); + SLaunchHTaskInfo* pInfo = createHTaskLaunchInfo(pMeta, &id, hStreamId, hTaskId); if (pInfo == NULL) { stError("s-task:%s failed to launch related fill-history task, since Out Of Memory", idStr); streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false); @@ -860,24 +856,6 @@ int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) return 0; } -int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1; - tEndEncode(pEncoder); - return 0; -} - -int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq) { - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1; - tEndDecode(pDecoder); - return 0; -} - void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { SDataRange* pRange = &pTask->dataRange;