From 76b43dc072a455abc9772fbae256918514358dc3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 31 May 2024 14:32:55 +0800 Subject: [PATCH] fix(stream): 1. check the checkpoint-trigger rsp, 2. set the error code in the message body, 3. follower nodes not handle the checkpoint-trigger retrieve request. --- include/libs/stream/streammsg.h | 1 + include/libs/stream/tstream.h | 3 +- include/util/taoserror.h | 1 + source/dnode/vnode/src/tq/tq.c | 9 ++ source/dnode/vnode/src/tqCommon/tqCommon.c | 20 +++- source/libs/stream/inc/streamInt.h | 1 + source/libs/stream/src/streamCheckpoint.c | 124 ++++++++++++++------- source/libs/stream/src/streamDispatch.c | 39 ++----- source/libs/stream/src/streamExec.c | 4 - source/libs/stream/src/streamTask.c | 4 + 10 files changed, 129 insertions(+), 77 deletions(-) diff --git a/include/libs/stream/streammsg.h b/include/libs/stream/streammsg.h index 96701fe21d..91bfc6afc8 100644 --- a/include/libs/stream/streammsg.h +++ b/include/libs/stream/streammsg.h @@ -187,6 +187,7 @@ typedef struct SCheckpointTriggerRsp { int32_t upstreamTaskId; int32_t taskId; int32_t transId; + int32_t rspCode; } SCheckpointTriggerRsp; typedef struct { diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index d007ea29a9..0f546ee869 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -272,7 +272,6 @@ typedef struct SCheckpointInfo { int64_t processedVer; int64_t nextProcessVer; // current offset in WAL, not serialize it int32_t numOfNotReady; - SActiveCheckpointInfo* pActiveInfo; int64_t msgVer; } SCheckpointInfo; @@ -678,7 +677,7 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeI void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_t* pTotal); void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask); void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId); -int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, SRpcHandleInfo* pRpcInfo); +int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, SRpcHandleInfo* pInfo, int32_t code); int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue); int32_t streamQueueGetNumOfUnAccessedItems(const SStreamQueue* pQueue); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 9ae75bade2..8f8434dfc1 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -910,6 +910,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_STREAM_EXEC_CANCELLED TAOS_DEF_ERROR_CODE(0, 0x4102) #define TSDB_CODE_STREAM_INVALID_STATETRANS TAOS_DEF_ERROR_CODE(0, 0x4103) #define TSDB_CODE_STREAM_TASK_IVLD_STATUS TAOS_DEF_ERROR_CODE(0, 0x4104) +#define TSDB_CODE_STREAM_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x4105) // TDLite #define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 1e564ba467..1076f1f2c6 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1241,6 +1241,15 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) { } int32_t tqProcessTaskRetrieveTriggerReq(STQ* pTq, SRpcMsg* pMsg) { + int32_t vgId = TD_VID(pTq->pVnode); + + SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*) pMsg->pCont; + if (!vnodeIsRoleLeader(pTq->pVnode)) { + tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from 0x%x", vgId, + (int32_t)pReq->downstreamTaskId); + return TSDB_CODE_STREAM_NOT_LEADER; + } + return tqStreamTaskProcessRetrieveTriggerReq(pTq->pStreamMeta, pMsg); } diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 45e1a300d0..c8ca324c5e 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -877,6 +877,16 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) tqDebug("s-task:0x%x recv retrieve checkpoint-trigger msg from downstream s-task:0x%x, checkpointId:%" PRId64, pReq->upstreamTaskId, (int32_t)pReq->downstreamTaskId, pReq->checkpointId); + if (pTask->status.downstreamReady != 1) { + tqError("s-task:%s not ready for checkpoint-trigger retrieve from 0x%x, since downstream not ready", + pTask->id.idStr, (int32_t)pReq->downstreamTaskId); + + streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, &pMsg->info, TSDB_CODE_STREAM_TASK_IVLD_STATUS); + streamMetaReleaseTask(pMeta, pTask); + + return TSDB_CODE_SUCCESS; + } + SStreamTaskState* pState = streamTaskGetStatus(pTask); if (pState->state == TASK_STATUS__CK) { // recv the checkpoint-source/trigger already int32_t transId = 0; @@ -889,8 +899,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) // re-send the lost checkpoint-trigger msg to downstream task tqDebug("s-task:%s re-send checkpoint-trigger to:0x%x, checkpointId:%" PRId64 ", transId:%d", pTask->id.idStr, (int32_t)pReq->downstreamTaskId, checkpointId, transId); - streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, &pMsg->info); - return TSDB_CODE_SUCCESS; + streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, &pMsg->info, TSDB_CODE_SUCCESS); } else { // not send checkpoint-trigger yet, wait int32_t recv = 0, total = 0; streamTaskGetTriggerRecvStatus(pTask, &recv, &total); @@ -903,7 +912,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) "sending checkpoint-source/trigger", pTask->id.idStr, recv, total); } - return TSDB_CODE_ACTION_IN_PROGRESS; + streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, &pMsg->info, TSDB_CODE_ACTION_IN_PROGRESS); } } else { // upstream not recv the checkpoint-source/trigger till now ASSERT(pState->state == TASK_STATUS__READY || pState->state == TASK_STATUS__HALT); @@ -911,8 +920,11 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) "s-task:%s not recv checkpoint-source from mnode or checkpoint-trigger from upstream yet, wait for all " "upstream sending checkpoint-source/trigger", pTask->id.idStr); - return TSDB_CODE_ACTION_IN_PROGRESS; + streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, &pMsg->info, TSDB_CODE_ACTION_IN_PROGRESS); } + + streamMetaReleaseTask(pMeta, pTask); + return TSDB_CODE_SUCCESS; } int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index d6cd5b528d..9a3cbdc963 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -61,6 +61,7 @@ struct SActiveCheckpointInfo { bool dispatchTrigger; SArray* pDispatchTriggerList; // SArray SArray* pReadyMsgList; // SArray + int8_t allUpstreamTriggerRecv; int32_t checkCounter; tmr_h pCheckTmr; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 6b02ae485f..7f41629dac 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -34,11 +34,11 @@ static int32_t streamTaskUploadCheckpoint(const char* id, const char* path); static int32_t deleteCheckpoint(const char* id); static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName); static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask); -static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType); +static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId); static int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList); static void checkpointTriggerMonitorFn(void* param, void* tmrId); -static SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType); +static SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId); bool streamTaskIsAllUpstreamSendTrigger(SStreamTask* pTask) { SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; @@ -58,7 +58,8 @@ bool streamTaskIsAllUpstreamSendTrigger(SStreamTask* pTask) { return allSend; } -SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType) { +SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, + int32_t transId) { SStreamDataBlock* pChkpoint = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock)); if (pChkpoint == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -75,8 +76,8 @@ SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpoint } pBlock->info.type = STREAM_CHECKPOINT; - pBlock->info.version = pTask->chkInfo.pActiveInfo->activeId; - pBlock->info.window.ekey = pBlock->info.window.skey = pTask->chkInfo.pActiveInfo->transId; // NOTE: set the transId + pBlock->info.version = checkpointId; + pBlock->info.window.ekey = pBlock->info.window.skey = transId; // NOTE: set the transId pBlock->info.rows = 1; pBlock->info.childId = pTask->info.selfChildId; @@ -89,10 +90,10 @@ SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpoint return pChkpoint; } -int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType) { - SStreamDataBlock* pChkpoint = createChkptTriggerBlock(pTask, checkpointType); +int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId) { + SStreamDataBlock* pCheckpoint = createChkptTriggerBlock(pTask, checkpointType, checkpointId, transId); - if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pChkpoint) < 0) { + if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pCheckpoint) < 0) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -116,39 +117,37 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo // 2. Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task // and this is the last item in the inputQ. - return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER); + return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pReq->checkpointId, pReq->transId); } int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTriggerRsp* pRsp) { ASSERT(pTask->info.taskLevel != TASK_LEVEL__SOURCE); - SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; - if (pInfo->transId != pRsp->transId || pInfo->activeId != pRsp->checkpointId) { - // todo handle error - return -1; + if (pRsp->rspCode != TSDB_CODE_SUCCESS) { + stDebug("s-task:%s retrieve checkpoint-trgger rsp from upstream:0x%x invalid, code:%s", pTask->id.idStr, + pRsp->upstreamTaskId, tstrerror(pRsp->rspCode)); + return TSDB_CODE_SUCCESS; } - taosThreadMutexLock(&pTask->lock); - SStreamTaskState* pState = streamTaskGetStatus(pTask); - if (pState->state != TASK_STATUS__CK) { - // todo handle error - taosThreadMutexUnlock(&pTask->lock); - return -1; - } - - taosThreadMutexUnlock(&pTask->lock); - - appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER); + appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pRsp->checkpointId, pRsp->transId); return TSDB_CODE_SUCCESS; } -int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, SRpcHandleInfo* pRpcInfo) { +int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, SRpcHandleInfo* pRpcInfo, int32_t code) { SCheckpointTriggerRsp* pRsp = rpcMallocCont(sizeof(SCheckpointTriggerRsp)); pRsp->streamId = pTask->id.streamId; pRsp->upstreamTaskId = pTask->id.taskId; pRsp->taskId = dstTaskId; - pRsp->checkpointId = pTask->chkInfo.pActiveInfo->activeId; - pRsp->transId = pTask->chkInfo.pActiveInfo->transId; + + if (code == TSDB_CODE_SUCCESS) { + pRsp->checkpointId = pTask->chkInfo.pActiveInfo->activeId; + pRsp->transId = pTask->chkInfo.pActiveInfo->transId; + } else { + pRsp->checkpointId = -1; + pRsp->transId = -1; + } + + pRsp->rspCode = code; SRpcMsg rspMsg = {.code = 0, .pCont = pRsp, .contLen = sizeof(SCheckpointTriggerRsp), .info = *pRpcInfo}; tmsgSendRsp(&rspMsg); @@ -178,15 +177,64 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock const char* id = pTask->id.idStr; int32_t code = TSDB_CODE_SUCCESS; int32_t vgId = pTask->pMeta->vgId; + int32_t taskLevel = pTask->info.taskLevel; + + SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; + + taosThreadMutexLock(&pTask->lock); + if (pTask->chkInfo.checkpointId >= checkpointId) { + stError("s-task:%s vgId:%d current checkpointId:%" PRId64 + " recv expired checkpoint-trigger block, checkpointId:%" PRId64 " transId:%d, discard", + id, vgId, pTask->chkInfo.checkpointId, checkpointId, transId); + taosThreadMutexUnlock(&pTask->lock); + return TSDB_CODE_SUCCESS; + } + + if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK) { + if (pActiveInfo->activeId != checkpointId) { + stError("s-task:%s vgId:%d active checkpointId:%" PRId64 ", recv invalid checkpoint-trigger checkpointId:%" PRId64 + " discard", + id, vgId, pActiveInfo->activeId, checkpointId); + taosThreadMutexUnlock(&pTask->lock); + return TSDB_CODE_SUCCESS; + } else { // checkpointId == pActiveInfo->activeId + if (pActiveInfo->allUpstreamTriggerRecv == 1) { + stDebug( + "s-task:%s vgId:%d all upstream checkpoint-trigger recv, discard this checkpoint-trigger, " + "checkpointId:%" PRId64 " transId:%d", + id, vgId, checkpointId, transId); + taosThreadMutexUnlock(&pTask->lock); + return TSDB_CODE_SUCCESS; + } + + if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) { + // check if already recv or not, and duplicated checkpoint-trigger msg recv, discard it + for (int32_t i = 0; i < taosArrayGetSize(pActiveInfo->pReadyMsgList); ++i) { + SStreamChkptReadyInfo* p = taosArrayGet(pActiveInfo->pReadyMsgList, i); + if (p->upStreamTaskId == pBlock->srcTaskId) { + ASSERT(p->checkpointId == checkpointId); + stWarn("s-task:%s repeatly recv checkpoint-source msg from task:0x%x vgId:%d, checkpointId:%" PRId64 + ", prev recvTs:%" PRId64 " discard", + pTask->id.idStr, p->upStreamTaskId, p->nodeId, p->checkpointId, p->recvTs); + + taosThreadMutexUnlock(&pTask->lock); + return TSDB_CODE_SUCCESS; + } + } + } + } + } + + taosThreadMutexUnlock(&pTask->lock); stDebug("s-task:%s vgId:%d start to handle the checkpoint-trigger block, checkpointId:%" PRId64 " ver:%" PRId64 - ", transId:%d current checkpointingId:%" PRId64, + ", transId:%d current active checkpointId:%" PRId64, id, vgId, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer, transId, checkpointId); // set task status if (streamTaskGetStatus(pTask)->state != TASK_STATUS__CK) { - pTask->chkInfo.pActiveInfo->activeId = checkpointId; - pTask->chkInfo.pActiveInfo->transId = transId; + pActiveInfo->activeId = checkpointId; + pActiveInfo->transId = transId; code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); if (code != TSDB_CODE_SUCCESS) { @@ -197,20 +245,18 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s start checkpoint-trigger monitor in 10s, ref:%d ", pTask->id.idStr, ref); - SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo; streamMetaAcquireOneTask(pTask); - if (pActive->pCheckTmr == NULL) { - pActive->pCheckTmr = taosTmrStart(checkpointTriggerMonitorFn, 100, pTask, streamTimer); + if (pActiveInfo->pCheckTmr == NULL) { + pActiveInfo->pCheckTmr = taosTmrStart(checkpointTriggerMonitorFn, 100, pTask, streamTimer); } else { - taosTmrReset(checkpointTriggerMonitorFn, 100, pTask, streamTimer, &pActive->pCheckTmr); + taosTmrReset(checkpointTriggerMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pCheckTmr); } } - // todo fix race condition: set the status and append checkpoint block - int32_t taskLevel = pTask->info.taskLevel; if (taskLevel == TASK_LEVEL__SOURCE) { int8_t type = pTask->outputInfo.type; + pActiveInfo->allUpstreamTriggerRecv = 1; if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) { stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId); @@ -231,8 +277,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId); // there are still some upstream tasks not send checkpoint request, do nothing and wait for then - bool allSend = streamTaskIsAllUpstreamSendTrigger(pTask); - if (!allSend) { + if (pActiveInfo->allUpstreamTriggerRecv != 1) { streamFreeQitem((SStreamQueueItem*)pBlock); return code; } @@ -272,7 +317,8 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask) { if (notReady == 0) { stDebug("s-task:%s all downstream tasks have completed the checkpoint, start to do checkpoint for current task", pTask->id.idStr); - appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT); + SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; + appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pInfo->activeId, pInfo->transId); } else { int32_t total = streamTaskGetNumOfDownstream(pTask); stDebug("s-task:%s %d/%d downstream tasks are not ready, wait", pTask->id.idStr, notReady, total); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index a6cfbb063b..f134224196 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -559,7 +559,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { } if (pTask->chkInfo.pActiveInfo->dispatchTrigger) { - stDebug("s-task:%s already send checkpoint trigger, not dispatch anymore", id); + stDebug("s-task:%s already send checkpoint-trigger, no longer dispatch any other data", id); atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL); return 0; } @@ -874,29 +874,18 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId, req.upstreamNodeId); SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; + taosThreadMutexLock(&pActiveInfo->lock); + taosArrayPush(pActiveInfo->pReadyMsgList, &info); - bool recved = false; - int32_t size = taosArrayGetSize(pActiveInfo->pReadyMsgList); - for (int32_t i = 0; i < size; ++i) { - SStreamChkptReadyInfo* p = taosArrayGet(pActiveInfo->pReadyMsgList, i); - if (p->nodeId == req.upstreamNodeId) { - if (p->checkpointId == req.checkpointId) { - stWarn("s-task:%s repeatly recv checkpoint-source msg from task:0x%x vgId:%d, checkpointId:%" PRId64 ", ignore", - pTask->id.idStr, p->upStreamTaskId, p->nodeId, p->checkpointId); - } else { - stError("s-task:%s checkpointId:%" PRId64 " not completed, new checkpointId:%" PRId64 " recv", - pTask->id.idStr, p->checkpointId, checkpointId); - ASSERT(0); // failed to handle it - } - - recved = true; - break; - } - } - - if (!recved) { - taosArrayPush(pActiveInfo->pReadyMsgList, &info); + int32_t numOfRecv = taosArrayGetSize(pActiveInfo->pReadyMsgList); + int32_t total = streamTaskGetNumOfUpstream(pTask); + if (numOfRecv == total) { + stDebug("s-task:%s recv checkpoint-trigger from all upstream, continue", pTask->id.idStr); + pActiveInfo->allUpstreamTriggerRecv = 1; + } else { + ASSERT(numOfRecv <= total); + stDebug("s-task:%s %d/%d checkpoint-trigger recv", pTask->id.idStr, numOfRecv, total); } taosThreadMutexUnlock(&pActiveInfo->lock); @@ -1175,7 +1164,6 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S // This task has received the checkpoint req from the upstream task, from which all the messages should be // blocked. Note that there is no race condition here. if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) { - atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1); streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId); stDebug("s-task:%s close inputQ for upstream:0x%x, msgId:%d", id, pReq->upstreamTaskId, pReq->msgId); } else if (pReq->type == STREAM_INPUT__TRANS_STATE) { @@ -1187,11 +1175,6 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S } } - // disable the data from upstream tasks -// if (streamTaskGetStatus(pTask)->state == TASK_STATUS__HALT) { -// status = TASK_INPUT_STATUS__BLOCKED; -// } - { // do send response with the input status int32_t code = buildDispatchRsp(pTask, pReq, status, &pRsp->pCont); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 77f266bd6d..95634b2ff3 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -597,10 +597,6 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { // dispatch checkpoint msg to all downstream tasks int32_t type = pInput->type; if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) { - if (pTask->pMeta->vgId == 2) { -// taosSsleep(20); - } - streamProcessCheckpointTriggerBlock(pTask, (SStreamDataBlock*)pInput); continue; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 3daadda687..01e2f89d8c 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -626,6 +626,7 @@ void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) { SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId); if (pInfo != NULL) { pInfo->dataAllowed = false; + int32_t t = atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1); } } @@ -633,6 +634,8 @@ void streamTaskOpenUpstreamInput(SStreamTask* pTask, int32_t taskId) { SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId); if (pInfo != NULL) { pInfo->dataAllowed = true; + int32_t t = atomic_sub_fetch_32(&pTask->upstreamInfo.numOfClosed, 1); + ASSERT(t >= 0); } } @@ -1006,6 +1009,7 @@ void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) { pInfo->activeId = 0; // clear the checkpoint id pInfo->failedId = 0; pInfo->transId = 0; + pInfo->allUpstreamTriggerRecv = 0; pInfo->dispatchTrigger = false; taosArrayClear(pInfo->pReadyMsgList);