fix(stream): 1. check the checkpoint-trigger rsp, 2. set the error code in the message body, 3. follower nodes not handle the checkpoint-trigger retrieve request.

This commit is contained in:
Haojun Liao 2024-05-31 14:32:55 +08:00
parent bef1953b24
commit 76b43dc072
10 changed files with 129 additions and 77 deletions

View File

@ -187,6 +187,7 @@ typedef struct SCheckpointTriggerRsp {
int32_t upstreamTaskId;
int32_t taskId;
int32_t transId;
int32_t rspCode;
} SCheckpointTriggerRsp;
typedef struct {

View File

@ -272,7 +272,6 @@ typedef struct SCheckpointInfo {
int64_t processedVer;
int64_t nextProcessVer; // current offset in WAL, not serialize it
int32_t numOfNotReady;
SActiveCheckpointInfo* pActiveInfo;
int64_t msgVer;
} SCheckpointInfo;
@ -678,7 +677,7 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeI
void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_t* pTotal);
void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask);
void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId);
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, SRpcHandleInfo* pRpcInfo);
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, SRpcHandleInfo* pInfo, int32_t code);
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
int32_t streamQueueGetNumOfUnAccessedItems(const SStreamQueue* pQueue);

View File

@ -910,6 +910,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_STREAM_EXEC_CANCELLED TAOS_DEF_ERROR_CODE(0, 0x4102)
#define TSDB_CODE_STREAM_INVALID_STATETRANS TAOS_DEF_ERROR_CODE(0, 0x4103)
#define TSDB_CODE_STREAM_TASK_IVLD_STATUS TAOS_DEF_ERROR_CODE(0, 0x4104)
#define TSDB_CODE_STREAM_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x4105)
// TDLite
#define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100)

View File

@ -1241,6 +1241,15 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
}
int32_t tqProcessTaskRetrieveTriggerReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t vgId = TD_VID(pTq->pVnode);
SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*) pMsg->pCont;
if (!vnodeIsRoleLeader(pTq->pVnode)) {
tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from 0x%x", vgId,
(int32_t)pReq->downstreamTaskId);
return TSDB_CODE_STREAM_NOT_LEADER;
}
return tqStreamTaskProcessRetrieveTriggerReq(pTq->pStreamMeta, pMsg);
}

View File

@ -877,6 +877,16 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
tqDebug("s-task:0x%x recv retrieve checkpoint-trigger msg from downstream s-task:0x%x, checkpointId:%" PRId64,
pReq->upstreamTaskId, (int32_t)pReq->downstreamTaskId, pReq->checkpointId);
if (pTask->status.downstreamReady != 1) {
tqError("s-task:%s not ready for checkpoint-trigger retrieve from 0x%x, since downstream not ready",
pTask->id.idStr, (int32_t)pReq->downstreamTaskId);
streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, &pMsg->info, TSDB_CODE_STREAM_TASK_IVLD_STATUS);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}
SStreamTaskState* pState = streamTaskGetStatus(pTask);
if (pState->state == TASK_STATUS__CK) { // recv the checkpoint-source/trigger already
int32_t transId = 0;
@ -889,8 +899,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
// re-send the lost checkpoint-trigger msg to downstream task
tqDebug("s-task:%s re-send checkpoint-trigger to:0x%x, checkpointId:%" PRId64 ", transId:%d", pTask->id.idStr,
(int32_t)pReq->downstreamTaskId, checkpointId, transId);
streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, &pMsg->info);
return TSDB_CODE_SUCCESS;
streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, &pMsg->info, TSDB_CODE_SUCCESS);
} else { // not send checkpoint-trigger yet, wait
int32_t recv = 0, total = 0;
streamTaskGetTriggerRecvStatus(pTask, &recv, &total);
@ -903,7 +912,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
"sending checkpoint-source/trigger",
pTask->id.idStr, recv, total);
}
return TSDB_CODE_ACTION_IN_PROGRESS;
streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, &pMsg->info, TSDB_CODE_ACTION_IN_PROGRESS);
}
} else { // upstream not recv the checkpoint-source/trigger till now
ASSERT(pState->state == TASK_STATUS__READY || pState->state == TASK_STATUS__HALT);
@ -911,8 +920,11 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
"s-task:%s not recv checkpoint-source from mnode or checkpoint-trigger from upstream yet, wait for all "
"upstream sending checkpoint-source/trigger",
pTask->id.idStr);
return TSDB_CODE_ACTION_IN_PROGRESS;
streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, &pMsg->info, TSDB_CODE_ACTION_IN_PROGRESS);
}
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}
int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {

View File

@ -61,6 +61,7 @@ struct SActiveCheckpointInfo {
bool dispatchTrigger;
SArray* pDispatchTriggerList; // SArray<STaskTriggerSendInfo>
SArray* pReadyMsgList; // SArray<SStreamChkptReadyInfo*>
int8_t allUpstreamTriggerRecv;
int32_t checkCounter;
tmr_h pCheckTmr;

View File

@ -34,11 +34,11 @@ static int32_t streamTaskUploadCheckpoint(const char* id, const char* path);
static int32_t deleteCheckpoint(const char* id);
static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName);
static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask);
static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType);
static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId);
static int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList);
static void checkpointTriggerMonitorFn(void* param, void* tmrId);
static SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType);
static SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId);
bool streamTaskIsAllUpstreamSendTrigger(SStreamTask* pTask) {
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
@ -58,7 +58,8 @@ bool streamTaskIsAllUpstreamSendTrigger(SStreamTask* pTask) {
return allSend;
}
SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType) {
SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId,
int32_t transId) {
SStreamDataBlock* pChkpoint = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock));
if (pChkpoint == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -75,8 +76,8 @@ SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpoint
}
pBlock->info.type = STREAM_CHECKPOINT;
pBlock->info.version = pTask->chkInfo.pActiveInfo->activeId;
pBlock->info.window.ekey = pBlock->info.window.skey = pTask->chkInfo.pActiveInfo->transId; // NOTE: set the transId
pBlock->info.version = checkpointId;
pBlock->info.window.ekey = pBlock->info.window.skey = transId; // NOTE: set the transId
pBlock->info.rows = 1;
pBlock->info.childId = pTask->info.selfChildId;
@ -89,10 +90,10 @@ SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpoint
return pChkpoint;
}
int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType) {
SStreamDataBlock* pChkpoint = createChkptTriggerBlock(pTask, checkpointType);
int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId) {
SStreamDataBlock* pCheckpoint = createChkptTriggerBlock(pTask, checkpointType, checkpointId, transId);
if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pChkpoint) < 0) {
if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pCheckpoint) < 0) {
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -116,39 +117,37 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo
// 2. Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task
// and this is the last item in the inputQ.
return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER);
return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pReq->checkpointId, pReq->transId);
}
int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTriggerRsp* pRsp) {
ASSERT(pTask->info.taskLevel != TASK_LEVEL__SOURCE);
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
if (pInfo->transId != pRsp->transId || pInfo->activeId != pRsp->checkpointId) {
// todo handle error
return -1;
if (pRsp->rspCode != TSDB_CODE_SUCCESS) {
stDebug("s-task:%s retrieve checkpoint-trgger rsp from upstream:0x%x invalid, code:%s", pTask->id.idStr,
pRsp->upstreamTaskId, tstrerror(pRsp->rspCode));
return TSDB_CODE_SUCCESS;
}
taosThreadMutexLock(&pTask->lock);
SStreamTaskState* pState = streamTaskGetStatus(pTask);
if (pState->state != TASK_STATUS__CK) {
// todo handle error
taosThreadMutexUnlock(&pTask->lock);
return -1;
}
taosThreadMutexUnlock(&pTask->lock);
appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER);
appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pRsp->checkpointId, pRsp->transId);
return TSDB_CODE_SUCCESS;
}
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, SRpcHandleInfo* pRpcInfo) {
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, SRpcHandleInfo* pRpcInfo, int32_t code) {
SCheckpointTriggerRsp* pRsp = rpcMallocCont(sizeof(SCheckpointTriggerRsp));
pRsp->streamId = pTask->id.streamId;
pRsp->upstreamTaskId = pTask->id.taskId;
pRsp->taskId = dstTaskId;
pRsp->checkpointId = pTask->chkInfo.pActiveInfo->activeId;
pRsp->transId = pTask->chkInfo.pActiveInfo->transId;
if (code == TSDB_CODE_SUCCESS) {
pRsp->checkpointId = pTask->chkInfo.pActiveInfo->activeId;
pRsp->transId = pTask->chkInfo.pActiveInfo->transId;
} else {
pRsp->checkpointId = -1;
pRsp->transId = -1;
}
pRsp->rspCode = code;
SRpcMsg rspMsg = {.code = 0, .pCont = pRsp, .contLen = sizeof(SCheckpointTriggerRsp), .info = *pRpcInfo};
tmsgSendRsp(&rspMsg);
@ -178,15 +177,64 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
const char* id = pTask->id.idStr;
int32_t code = TSDB_CODE_SUCCESS;
int32_t vgId = pTask->pMeta->vgId;
int32_t taskLevel = pTask->info.taskLevel;
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
taosThreadMutexLock(&pTask->lock);
if (pTask->chkInfo.checkpointId >= checkpointId) {
stError("s-task:%s vgId:%d current checkpointId:%" PRId64
" recv expired checkpoint-trigger block, checkpointId:%" PRId64 " transId:%d, discard",
id, vgId, pTask->chkInfo.checkpointId, checkpointId, transId);
taosThreadMutexUnlock(&pTask->lock);
return TSDB_CODE_SUCCESS;
}
if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK) {
if (pActiveInfo->activeId != checkpointId) {
stError("s-task:%s vgId:%d active checkpointId:%" PRId64 ", recv invalid checkpoint-trigger checkpointId:%" PRId64
" discard",
id, vgId, pActiveInfo->activeId, checkpointId);
taosThreadMutexUnlock(&pTask->lock);
return TSDB_CODE_SUCCESS;
} else { // checkpointId == pActiveInfo->activeId
if (pActiveInfo->allUpstreamTriggerRecv == 1) {
stDebug(
"s-task:%s vgId:%d all upstream checkpoint-trigger recv, discard this checkpoint-trigger, "
"checkpointId:%" PRId64 " transId:%d",
id, vgId, checkpointId, transId);
taosThreadMutexUnlock(&pTask->lock);
return TSDB_CODE_SUCCESS;
}
if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
// check if already recv or not, and duplicated checkpoint-trigger msg recv, discard it
for (int32_t i = 0; i < taosArrayGetSize(pActiveInfo->pReadyMsgList); ++i) {
SStreamChkptReadyInfo* p = taosArrayGet(pActiveInfo->pReadyMsgList, i);
if (p->upStreamTaskId == pBlock->srcTaskId) {
ASSERT(p->checkpointId == checkpointId);
stWarn("s-task:%s repeatly recv checkpoint-source msg from task:0x%x vgId:%d, checkpointId:%" PRId64
", prev recvTs:%" PRId64 " discard",
pTask->id.idStr, p->upStreamTaskId, p->nodeId, p->checkpointId, p->recvTs);
taosThreadMutexUnlock(&pTask->lock);
return TSDB_CODE_SUCCESS;
}
}
}
}
}
taosThreadMutexUnlock(&pTask->lock);
stDebug("s-task:%s vgId:%d start to handle the checkpoint-trigger block, checkpointId:%" PRId64 " ver:%" PRId64
", transId:%d current checkpointingId:%" PRId64,
", transId:%d current active checkpointId:%" PRId64,
id, vgId, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer, transId, checkpointId);
// set task status
if (streamTaskGetStatus(pTask)->state != TASK_STATUS__CK) {
pTask->chkInfo.pActiveInfo->activeId = checkpointId;
pTask->chkInfo.pActiveInfo->transId = transId;
pActiveInfo->activeId = checkpointId;
pActiveInfo->transId = transId;
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
if (code != TSDB_CODE_SUCCESS) {
@ -197,20 +245,18 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start checkpoint-trigger monitor in 10s, ref:%d ", pTask->id.idStr, ref);
SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
streamMetaAcquireOneTask(pTask);
if (pActive->pCheckTmr == NULL) {
pActive->pCheckTmr = taosTmrStart(checkpointTriggerMonitorFn, 100, pTask, streamTimer);
if (pActiveInfo->pCheckTmr == NULL) {
pActiveInfo->pCheckTmr = taosTmrStart(checkpointTriggerMonitorFn, 100, pTask, streamTimer);
} else {
taosTmrReset(checkpointTriggerMonitorFn, 100, pTask, streamTimer, &pActive->pCheckTmr);
taosTmrReset(checkpointTriggerMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pCheckTmr);
}
}
// todo fix race condition: set the status and append checkpoint block
int32_t taskLevel = pTask->info.taskLevel;
if (taskLevel == TASK_LEVEL__SOURCE) {
int8_t type = pTask->outputInfo.type;
pActiveInfo->allUpstreamTriggerRecv = 1;
if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
@ -231,8 +277,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId);
// there are still some upstream tasks not send checkpoint request, do nothing and wait for then
bool allSend = streamTaskIsAllUpstreamSendTrigger(pTask);
if (!allSend) {
if (pActiveInfo->allUpstreamTriggerRecv != 1) {
streamFreeQitem((SStreamQueueItem*)pBlock);
return code;
}
@ -272,7 +317,8 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask) {
if (notReady == 0) {
stDebug("s-task:%s all downstream tasks have completed the checkpoint, start to do checkpoint for current task",
pTask->id.idStr);
appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT);
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pInfo->activeId, pInfo->transId);
} else {
int32_t total = streamTaskGetNumOfDownstream(pTask);
stDebug("s-task:%s %d/%d downstream tasks are not ready, wait", pTask->id.idStr, notReady, total);

View File

@ -559,7 +559,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
}
if (pTask->chkInfo.pActiveInfo->dispatchTrigger) {
stDebug("s-task:%s already send checkpoint trigger, not dispatch anymore", id);
stDebug("s-task:%s already send checkpoint-trigger, no longer dispatch any other data", id);
atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL);
return 0;
}
@ -874,29 +874,18 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId,
req.upstreamNodeId);
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
taosThreadMutexLock(&pActiveInfo->lock);
taosArrayPush(pActiveInfo->pReadyMsgList, &info);
bool recved = false;
int32_t size = taosArrayGetSize(pActiveInfo->pReadyMsgList);
for (int32_t i = 0; i < size; ++i) {
SStreamChkptReadyInfo* p = taosArrayGet(pActiveInfo->pReadyMsgList, i);
if (p->nodeId == req.upstreamNodeId) {
if (p->checkpointId == req.checkpointId) {
stWarn("s-task:%s repeatly recv checkpoint-source msg from task:0x%x vgId:%d, checkpointId:%" PRId64 ", ignore",
pTask->id.idStr, p->upStreamTaskId, p->nodeId, p->checkpointId);
} else {
stError("s-task:%s checkpointId:%" PRId64 " not completed, new checkpointId:%" PRId64 " recv",
pTask->id.idStr, p->checkpointId, checkpointId);
ASSERT(0); // failed to handle it
}
recved = true;
break;
}
}
if (!recved) {
taosArrayPush(pActiveInfo->pReadyMsgList, &info);
int32_t numOfRecv = taosArrayGetSize(pActiveInfo->pReadyMsgList);
int32_t total = streamTaskGetNumOfUpstream(pTask);
if (numOfRecv == total) {
stDebug("s-task:%s recv checkpoint-trigger from all upstream, continue", pTask->id.idStr);
pActiveInfo->allUpstreamTriggerRecv = 1;
} else {
ASSERT(numOfRecv <= total);
stDebug("s-task:%s %d/%d checkpoint-trigger recv", pTask->id.idStr, numOfRecv, total);
}
taosThreadMutexUnlock(&pActiveInfo->lock);
@ -1175,7 +1164,6 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
// This task has received the checkpoint req from the upstream task, from which all the messages should be
// blocked. Note that there is no race condition here.
if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId);
stDebug("s-task:%s close inputQ for upstream:0x%x, msgId:%d", id, pReq->upstreamTaskId, pReq->msgId);
} else if (pReq->type == STREAM_INPUT__TRANS_STATE) {
@ -1187,11 +1175,6 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
}
}
// disable the data from upstream tasks
// if (streamTaskGetStatus(pTask)->state == TASK_STATUS__HALT) {
// status = TASK_INPUT_STATUS__BLOCKED;
// }
{
// do send response with the input status
int32_t code = buildDispatchRsp(pTask, pReq, status, &pRsp->pCont);

View File

@ -597,10 +597,6 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
// dispatch checkpoint msg to all downstream tasks
int32_t type = pInput->type;
if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
if (pTask->pMeta->vgId == 2) {
// taosSsleep(20);
}
streamProcessCheckpointTriggerBlock(pTask, (SStreamDataBlock*)pInput);
continue;
}

View File

@ -626,6 +626,7 @@ void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) {
SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId);
if (pInfo != NULL) {
pInfo->dataAllowed = false;
int32_t t = atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
}
}
@ -633,6 +634,8 @@ void streamTaskOpenUpstreamInput(SStreamTask* pTask, int32_t taskId) {
SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId);
if (pInfo != NULL) {
pInfo->dataAllowed = true;
int32_t t = atomic_sub_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
ASSERT(t >= 0);
}
}
@ -1006,6 +1009,7 @@ void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) {
pInfo->activeId = 0; // clear the checkpoint id
pInfo->failedId = 0;
pInfo->transId = 0;
pInfo->allUpstreamTriggerRecv = 0;
pInfo->dispatchTrigger = false;
taosArrayClear(pInfo->pReadyMsgList);