fix(stream): set the checkpoint ready info for only one task in stream.

This commit is contained in:
Haojun Liao 2024-06-01 23:20:43 +08:00
parent 2a8270f9c8
commit 31317c4895
3 changed files with 20 additions and 12 deletions

View File

@ -62,7 +62,7 @@ struct SActiveCheckpointInfo {
SArray* pDispatchTriggerList; // SArray<STaskTriggerSendInfo>
SArray* pReadyMsgList; // SArray<STaskCheckpointReadyInfo*>
int8_t allUpstreamTriggerRecv;
SArray* pCheckpointReadyRecvList; // SArray<STaskCheckpointReadyRecvInfo>
SArray* pCheckpointReadyRecvList; // SArray<STaskDownstreamReadyInfo>
int32_t checkCounter;
tmr_h pCheckTmr;
};
@ -100,10 +100,11 @@ typedef struct {
int32_t upstreamTaskId;
SEpSet upstreamNodeEpset;
int32_t nodeId;
SRpcMsg msg;
int64_t recvTs;
int32_t transId;
SRpcMsg msg;
int64_t checkpointId;
int64_t recvTs;
int32_t sendToUpstream;
} STaskCheckpointReadyInfo;
typedef struct {
@ -121,7 +122,7 @@ typedef struct {
int32_t downstreamTaskId;
int64_t checkpointId;
int32_t transId;
} STaskCheckpointReadyRecvInfo;
} STaskDownstreamReadyInfo;
struct SStreamQueue {
STaosQueue* pQueue;

View File

@ -334,12 +334,19 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int32_t downstreamNo
bool received = false;
int32_t total = streamTaskGetNumOfDownstream(pTask);
// only one task in this stream
if (total == 0 && pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pInfo->activeId, pInfo->transId);
taosThreadMutexUnlock(&pInfo->lock);
return 0;
}
taosThreadMutexLock(&pInfo->lock);
// only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task
int32_t size = taosArrayGetSize(pInfo->pCheckpointReadyRecvList);
for (int32_t i = 0; i < size; ++i) {
STaskCheckpointReadyRecvInfo* p = taosArrayGet(pInfo->pCheckpointReadyRecvList, i);
STaskDownstreamReadyInfo* p = taosArrayGet(pInfo->pCheckpointReadyRecvList, i);
if (p->downstreamTaskId == downstreamTaskId) {
received = true;
break;
@ -350,12 +357,12 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int32_t downstreamNo
stDebug("s-task:%s already recv checkpoint-ready msg from downstream:0x%x, %d/%d downstream not ready", id,
downstreamTaskId, (int32_t)(total - taosArrayGetSize(pInfo->pCheckpointReadyRecvList)), total);
} else {
STaskCheckpointReadyRecvInfo info = {.recvTs = taosGetTimestampMs(),
.downstreamTaskId = downstreamTaskId,
.checkpointId = pInfo->activeId,
.transId = pInfo->transId,
.streamId = pTask->id.streamId,
.downstreamNodeId = downstreamNodeId};
STaskDownstreamReadyInfo info = {.recvTs = taosGetTimestampMs(),
.downstreamTaskId = downstreamTaskId,
.checkpointId = pInfo->activeId,
.transId = pInfo->transId,
.streamId = pTask->id.streamId,
.downstreamNodeId = downstreamNodeId};
taosArrayPush(pInfo->pCheckpointReadyRecvList, &info);
}

View File

@ -985,7 +985,7 @@ SActiveCheckpointInfo* streamTaskCreateActiveChkptInfo() {
pInfo->pDispatchTriggerList = taosArrayInit(4, sizeof(STaskTriggerSendInfo));
pInfo->pReadyMsgList = taosArrayInit(4, sizeof(STaskCheckpointReadyInfo));
pInfo->pCheckpointReadyRecvList = taosArrayInit(4, sizeof(STaskCheckpointReadyRecvInfo));
pInfo->pCheckpointReadyRecvList = taosArrayInit(4, sizeof(STaskDownstreamReadyInfo));
return pInfo;
}