From 31317c489508aae81268fdb0e5a95e3c4eb227f8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 1 Jun 2024 23:20:43 +0800 Subject: [PATCH] fix(stream): set the checkpoint ready info for only one task in stream. --- source/libs/stream/inc/streamInt.h | 9 +++++---- source/libs/stream/src/streamCheckpoint.c | 21 ++++++++++++++------- source/libs/stream/src/streamTask.c | 2 +- 3 files changed, 20 insertions(+), 12 deletions(-) diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index c943f663e6..c4c3298ea7 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -62,7 +62,7 @@ struct SActiveCheckpointInfo { SArray* pDispatchTriggerList; // SArray SArray* pReadyMsgList; // SArray int8_t allUpstreamTriggerRecv; - SArray* pCheckpointReadyRecvList; // SArray + SArray* pCheckpointReadyRecvList; // SArray int32_t checkCounter; tmr_h pCheckTmr; }; @@ -100,10 +100,11 @@ typedef struct { int32_t upstreamTaskId; SEpSet upstreamNodeEpset; int32_t nodeId; - SRpcMsg msg; - int64_t recvTs; int32_t transId; + SRpcMsg msg; int64_t checkpointId; + int64_t recvTs; + int32_t sendToUpstream; } STaskCheckpointReadyInfo; typedef struct { @@ -121,7 +122,7 @@ typedef struct { int32_t downstreamTaskId; int64_t checkpointId; int32_t transId; -} STaskCheckpointReadyRecvInfo; +} STaskDownstreamReadyInfo; struct SStreamQueue { STaosQueue* pQueue; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 6b6b740f01..f2868fea96 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -334,12 +334,19 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int32_t downstreamNo bool received = false; int32_t total = streamTaskGetNumOfDownstream(pTask); + // only one task in this stream + if (total == 0 && pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pInfo->activeId, pInfo->transId); + taosThreadMutexUnlock(&pInfo->lock); + return 0; + } + taosThreadMutexLock(&pInfo->lock); // only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task int32_t size = taosArrayGetSize(pInfo->pCheckpointReadyRecvList); for (int32_t i = 0; i < size; ++i) { - STaskCheckpointReadyRecvInfo* p = taosArrayGet(pInfo->pCheckpointReadyRecvList, i); + STaskDownstreamReadyInfo* p = taosArrayGet(pInfo->pCheckpointReadyRecvList, i); if (p->downstreamTaskId == downstreamTaskId) { received = true; break; @@ -350,12 +357,12 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int32_t downstreamNo stDebug("s-task:%s already recv checkpoint-ready msg from downstream:0x%x, %d/%d downstream not ready", id, downstreamTaskId, (int32_t)(total - taosArrayGetSize(pInfo->pCheckpointReadyRecvList)), total); } else { - STaskCheckpointReadyRecvInfo info = {.recvTs = taosGetTimestampMs(), - .downstreamTaskId = downstreamTaskId, - .checkpointId = pInfo->activeId, - .transId = pInfo->transId, - .streamId = pTask->id.streamId, - .downstreamNodeId = downstreamNodeId}; + STaskDownstreamReadyInfo info = {.recvTs = taosGetTimestampMs(), + .downstreamTaskId = downstreamTaskId, + .checkpointId = pInfo->activeId, + .transId = pInfo->transId, + .streamId = pTask->id.streamId, + .downstreamNodeId = downstreamNodeId}; taosArrayPush(pInfo->pCheckpointReadyRecvList, &info); } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 8084a978ef..f6524a69ab 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -985,7 +985,7 @@ SActiveCheckpointInfo* streamTaskCreateActiveChkptInfo() { pInfo->pDispatchTriggerList = taosArrayInit(4, sizeof(STaskTriggerSendInfo)); pInfo->pReadyMsgList = taosArrayInit(4, sizeof(STaskCheckpointReadyInfo)); - pInfo->pCheckpointReadyRecvList = taosArrayInit(4, sizeof(STaskCheckpointReadyRecvInfo)); + pInfo->pCheckpointReadyRecvList = taosArrayInit(4, sizeof(STaskDownstreamReadyInfo)); return pInfo; }