From 813f4cb36385e3e7bfcc84da70b8b2612c4dceb9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 13 Jul 2023 16:32:25 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 8 +++++-- source/dnode/mnode/impl/src/mndScheduler.c | 6 +++--- source/dnode/snode/src/snode.c | 4 ++-- source/dnode/vnode/src/tq/tq.c | 3 ++- source/libs/stream/src/stream.c | 25 +++++++++++++++++++++- source/libs/stream/src/streamCheckpoint.c | 11 +++++----- source/libs/stream/src/streamDispatch.c | 6 +++--- source/libs/stream/src/streamRecover.c | 4 ++-- source/libs/stream/src/streamTask.c | 10 ++++----- 9 files changed, 53 insertions(+), 24 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index c584f6e823..045d89cc22 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -249,6 +249,7 @@ typedef struct SStreamChildEpInfo { int32_t childId; int32_t taskId; SEpSet epSet; + bool dataAllowed; // denote if the data from this upstream task is allowed to put into inputQ, not serialize it } SStreamChildEpInfo; typedef struct SStreamId { @@ -310,8 +311,9 @@ struct SStreamTask { SHistDataRange dataRange; SStreamId historyTaskId; SStreamId streamTaskId; - SArray* pUpstreamEpInfoList; // SArray, // children info - SArray* pRpcMsgList; // SArray + SArray* pUpstreamInfoList; // SArray, // children info + SArray* pRpcMsgList; // SArray + // output union { STaskDispatcherFixedEp fixedEpDispatcher; @@ -554,6 +556,8 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code); int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg); +void streamTaskOpenUpstreamInput(SStreamTask* pTask); +void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId); void streamTaskInputFail(SStreamTask* pTask); int32_t streamTryExec(SStreamTask* pTask); diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 2563417c5a..800005c27f 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -301,11 +301,11 @@ int32_t setEpToDownstreamTask(SStreamTask* pTask, SStreamTask* pDownstream) { return TSDB_CODE_OUT_OF_MEMORY; } - if (pDownstream->pUpstreamEpInfoList == NULL) { - pDownstream->pUpstreamEpInfoList = taosArrayInit(4, POINTER_BYTES); + if (pDownstream->pUpstreamInfoList == NULL) { + pDownstream->pUpstreamInfoList = taosArrayInit(4, POINTER_BYTES); } - taosArrayPush(pDownstream->pUpstreamEpInfoList, &pEpInfo); + taosArrayPush(pDownstream->pUpstreamInfoList, &pEpInfo); return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index b8b7e8e172..a959060ee2 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -59,7 +59,7 @@ FAIL: } int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { - ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->pUpstreamEpInfoList) != 0); + ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->pUpstreamInfoList) != 0); pTask->refCnt = 1; pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId); @@ -82,7 +82,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { return -1; } - int32_t numOfChildEp = taosArrayGetSize(pTask->pUpstreamEpInfoList); + int32_t numOfChildEp = taosArrayGetSize(pTask->pUpstreamInfoList); SReadHandle handle = { .vnode = NULL, .numOfVgroups = numOfChildEp, .pStateBackend = pTask->pState, .fillHistory = pTask->info.fillHistory }; initStreamStateAPI(&handle.api); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 1ac096cc74..84ab014597 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -814,7 +814,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { return -1; } - int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->pUpstreamEpInfoList); + int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->pUpstreamInfoList); SReadHandle handle = { .checkpointId = pTask->chkInfo.checkpointId, .vnode = NULL, @@ -865,6 +865,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond); } + streamTaskOpenUpstreamInput(pTask); streamSetupScheduleTrigger(pTask); SCheckpointInfo* pChkInfo = &pTask->chkInfo; diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 3348914159..f1a6d7423c 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -355,4 +355,27 @@ void* streamQueueNextItem(SStreamQueue* pQueue) { } } -void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); } \ No newline at end of file +void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); } + +void streamTaskOpenUpstreamInput(SStreamTask* pTask) { + int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList); + if (num == 0) { + return; + } + + for(int32_t i = 0; i < num; ++i) { + SStreamChildEpInfo* pInfo = taosArrayGet(pTask->pUpstreamInfoList, i); + pInfo->dataAllowed = true; + } +} + +void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) { + int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList); + for(int32_t i = 0; i < num; ++i) { + SStreamChildEpInfo* pInfo = taosArrayGet(pTask->pUpstreamInfoList, i); + if (pInfo->taskId == taskId) { + pInfo->dataAllowed = false; + break; + } + } +} \ No newline at end of file diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index d770a09395..839822c103 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -114,7 +114,7 @@ int32_t tDecodeStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pRs } static int32_t streamAlignCheckpoint(SStreamTask* pTask, int64_t checkpointId, int32_t childId) { - int32_t num = taosArrayGetSize(pTask->pUpstreamEpInfoList); + int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList); int64_t old = atomic_val_compare_exchange_32(&pTask->checkpointAlignCnt, 0, num); if (old == 0) { qDebug("s-task:%s set initial align upstream num:%d", pTask->id.idStr, num); @@ -180,13 +180,14 @@ int32_t streamProcessCheckpointReq(SStreamTask* pTask, SStreamCheckpointReq* pRe streamSchedExec(pTask); qDebug("s-task:%s sink task set to checkpoint ready, start to send rsp to upstream", pTask->id.idStr); } else { - // todo close the inputQ for data from childId, which means data from childId are not allowed to put into intpuQ - // anymore - ASSERT(taosArrayGetSize(pTask->pUpstreamEpInfoList) > 0); + ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) > 0); + + // close the inputQ for data from upstream task. + streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId); // there are still some upstream tasks not send checkpoint request, do nothing and wait for then int32_t notReady = streamAlignCheckpoint(pTask, checkpointId, childId); - int32_t num = taosArrayGetSize(pTask->pUpstreamEpInfoList); + int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList); if (notReady > 0) { qDebug("s-task:%s received checkpoint req, %d upstream tasks not send checkpoint info yet, total:%d", pTask->id.idStr, notReady, num); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index cfe960e99c..f8fa7b024a 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -157,11 +157,11 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) .retrieveLen = dataStrLen, }; - int32_t sz = taosArrayGetSize(pTask->pUpstreamEpInfoList); + int32_t sz = taosArrayGetSize(pTask->pUpstreamInfoList); ASSERT(sz > 0); for (int32_t i = 0; i < sz; i++) { req.reqId = tGenIdPI64(); - SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->pUpstreamEpInfoList, i); + SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->pUpstreamInfoList, i); req.dstNodeId = pEpInfo->nodeId; req.dstTaskId = pEpInfo->taskId; int32_t len; @@ -516,7 +516,7 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) { // this function is usually invoked by sink/agg task int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask) { int32_t num = taosArrayGetSize(pTask->pRpcMsgList); - ASSERT(taosArrayGetSize(pTask->pUpstreamEpInfoList) == num); + ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) == num); qDebug("s-task:%s level:%d checkpoint completed msg sent to %d upstream tasks", pTask->id.idStr, pTask->info.taskLevel, num); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 45accde7f0..b229078b44 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -355,7 +355,7 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) { // agg int32_t streamAggScanHistoryPrepare(SStreamTask* pTask) { - pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList); + pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamInfoList); qDebug("s-task:%s agg task is ready and wait for %d upstream tasks complete scan-history procedure", pTask->id.idStr, pTask->numOfWaitingUpstream); return 0; @@ -379,7 +379,7 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, int32_t taskId, in ASSERT(left >= 0); if (left == 0) { - int32_t numOfTasks = taosArrayGetSize(pTask->pUpstreamEpInfoList); + int32_t numOfTasks = taosArrayGetSize(pTask->pUpstreamInfoList); qDebug("s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data", pTask->id.idStr, numOfTasks); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index ba4d1e1cd7..fa99006210 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -99,10 +99,10 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tEncodeI64(pEncoder, pTask->dataRange.window.skey)) return -1; if (tEncodeI64(pEncoder, pTask->dataRange.window.ekey)) return -1; - int32_t epSz = taosArrayGetSize(pTask->pUpstreamEpInfoList); + int32_t epSz = taosArrayGetSize(pTask->pUpstreamInfoList); if (tEncodeI32(pEncoder, epSz) < 0) return -1; for (int32_t i = 0; i < epSz; i++) { - SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamEpInfoList, i); + SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i); if (tEncodeStreamEpInfo(pEncoder, pInfo) < 0) return -1; } @@ -165,7 +165,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { int32_t epSz; if (tDecodeI32(pDecoder, &epSz) < 0) return -1; - pTask->pUpstreamEpInfoList = taosArrayInit(epSz, POINTER_BYTES); + pTask->pUpstreamInfoList = taosArrayInit(epSz, POINTER_BYTES); for (int32_t i = 0; i < epSz; i++) { SStreamChildEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamChildEpInfo)); if (pInfo == NULL) return -1; @@ -173,7 +173,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { taosMemoryFreeClear(pInfo); return -1; } - taosArrayPush(pTask->pUpstreamEpInfoList, &pInfo); + taosArrayPush(pTask->pUpstreamInfoList, &pInfo); } if (pTask->info.taskLevel != TASK_LEVEL__SINK) { @@ -226,7 +226,7 @@ void tFreeStreamTask(SStreamTask* pTask) { walCloseReader(pTask->exec.pWalReader); } - taosArrayDestroyP(pTask->pUpstreamEpInfoList, taosMemoryFree); + taosArrayDestroyP(pTask->pUpstreamInfoList, taosMemoryFree); if (pTask->outputType == TASK_OUTPUT__TABLE) { tDeleteSchemaWrapper(pTask->tbSink.pSchemaWrapper); taosMemoryFree(pTask->tbSink.pTSchema);