diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 96a614f6a4..59075c47b2 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -799,10 +799,15 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { // send msg to retrieve checkpoint trigger msg SArray* pList = pTask->upstreamInfo.pList; ASSERT(pTask->info.taskLevel > TASK_LEVEL__SOURCE); + SArray* pNotSendList = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo)); if (pNotSendList == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - stDebug("s-task:%s start to triggerMonitor, reason:%s", id, tstrerror(terrno)); + stError("s-task:%s quit tmr function due to out of memory", id); + taosThreadMutexUnlock(&pActiveInfo->lock); + + stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id); + taosTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle); return; } @@ -967,18 +972,14 @@ void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) { taosThreadMutexUnlock(&pInfo->lock); } -int32_t streamTaskGetNumOfConfirmed(SStreamTask* pTask) { - SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; - +int32_t streamTaskGetNumOfConfirmed(SActiveCheckpointInfo* pInfo) { int32_t num = 0; - taosThreadMutexLock(&pInfo->lock); for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) { STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i); if (p->recved) { num++; } } - taosThreadMutexUnlock(&pInfo->lock); return num; } @@ -1000,9 +1001,9 @@ void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) { } } + int32_t numOfConfirmed = streamTaskGetNumOfConfirmed(pInfo); taosThreadMutexUnlock(&pInfo->lock); - int32_t numOfConfirmed = streamTaskGetNumOfConfirmed(pTask); int32_t total = streamTaskGetNumOfDownstream(pTask); stDebug("s-task:%s set downstream:0x%x(vgId:%d) checkpoint-trigger dispatch confirmed, total confirmed:%d/%d", pTask->id.idStr, taskId, vgId, numOfConfirmed, total); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 006e55374e..6fec79eb04 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -501,7 +501,7 @@ static void doMonitorDispatchData(void* param, void* tmrId) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s not in dispatch procedure, abort from timer, ref:%d", pTask->id.idStr, ref); - pTask->msgInfo.inMonitor = 0; + pTask->msgInfo.inMonitor = 0; // set not in dispatch monitor taosThreadMutexUnlock(&pMsgInfo->lock); return; } @@ -1211,44 +1211,51 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId return 0; } -static bool setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int32_t code, int64_t now, int32_t* pNotRsp, const char* id) { +static bool setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int32_t code, int64_t now, int32_t* pNotRsp, + int32_t* pFailed, const char* id) { int32_t numOfRsp = 0; - bool alreadySet = false; - bool updated = false; - bool allRsp = false; - *pNotRsp = 0; + int32_t numOfFailed = 0; - taosThreadMutexLock(&pMsgInfo->lock); + bool allRsp = false; int32_t numOfDispatchBranch = taosArrayGetSize(pMsgInfo->pSendInfo); - for(int32_t i = 0; i < numOfDispatchBranch; ++i) { + *pNotRsp = 0; + *pFailed = 0; + + for (int32_t i = 0; i < numOfDispatchBranch; ++i) { SDispatchEntry* pEntry = taosArrayGet(pMsgInfo->pSendInfo, i); if (pEntry->rspTs != -1) { numOfRsp += 1; + } else { + if (pEntry->status != TSDB_CODE_SUCCESS || isDispatchRspTimeout(pEntry, now)) { + numOfFailed += 1; + } } } for (int32_t j = 0; j < numOfDispatchBranch; ++j) { SDispatchEntry* pEntry = taosArrayGet(pMsgInfo->pSendInfo, j); if (pEntry->nodeId == vgId) { - ASSERT(!alreadySet); - pEntry->rspTs = now; - pEntry->status = code; - alreadySet = true; - updated = true; - numOfRsp += 1; + if (pEntry->rspTs != -1) { + stDebug("s-task:%s dispatch rsp has already recved at:%" PRId64 ", ignore this rsp, msgId:%d", id, + pEntry->rspTs, pMsgInfo->msgId); + allRsp = false; + } else { + pEntry->rspTs = now; + pEntry->status = code; + numOfRsp += 1; + allRsp = (numOfRsp == numOfDispatchBranch); - stDebug("s-task:%s record the rsp recv, ts:%" PRId64 " code:%d, idx:%d, total recv:%d/%d", id, now, code, j, - numOfRsp, numOfDispatchBranch); + stDebug("s-task:%s record the rsp recv, ts:%" PRId64 " code:%d, idx:%d, total recv:%d/%d", id, now, code, j, + numOfRsp, numOfDispatchBranch); + } + break; } } + *pFailed = numOfFailed; *pNotRsp = numOfDispatchBranch - numOfRsp; - allRsp = (numOfRsp == numOfDispatchBranch); - taosThreadMutexUnlock(&pMsgInfo->lock); - - ASSERT(updated); return allRsp; } @@ -1277,15 +1284,23 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i int64_t now = taosGetTimestampMs(); bool allRsp = false; int32_t notRsp = 0; + int32_t numOfFailed = 0; + bool triggerDispatchRsp = false; + + // we only set the dispatch msg info for current checkpoint trans + taosThreadMutexLock(&pTask->lock); + triggerDispatchRsp = (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK) && + (pTask->chkInfo.pActiveInfo->activeId == pMsgInfo->checkpointId); + taosThreadMutexUnlock(&pTask->lock); taosThreadMutexLock(&pMsgInfo->lock); - int32_t msgId = pMsgInfo->msgId; - taosThreadMutexUnlock(&pMsgInfo->lock); + int32_t msgId = pMsgInfo->msgId; // follower not handle the dispatch rsp if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) { stError("s-task:%s vgId:%d is follower or task just re-launched, not handle the dispatch rsp, discard it", id, vgId); + taosThreadMutexUnlock(&pMsgInfo->lock); return TSDB_CODE_STREAM_TASK_NOT_EXIST; } @@ -1294,6 +1309,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i stError("s-task:%s vgId:%d not expect rsp, expected: msgId:%d, stage:%" PRId64 " actual msgId:%d, stage:%" PRId64 " discard it", id, vgId, msgId, pTask->pMeta->stage, pRsp->msgId, pRsp->stage); + taosThreadMutexUnlock(&pMsgInfo->lock); return TSDB_CODE_INVALID_MSG; } @@ -1305,18 +1321,18 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), msgId:%d no retry, since task destroyed already", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, msgId); - allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, ¬Rsp, id); + allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, ¬Rsp, &numOfFailed, id); } else { stError("s-task:%s failed to dispatch msgId:%d to task:0x%x(vgId:%d), code:%s, add to retry list", id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, tstrerror(code)); - allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, code, now, ¬Rsp, id); + allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, code, now, ¬Rsp, &numOfFailed, id); } } else { // code == 0 if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { pTask->inputq.status = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream - allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, pRsp->inputStatus, now, ¬Rsp, id); + allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, pRsp->inputStatus, now, ¬Rsp, &numOfFailed, id); stTrace("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for retry dispatch", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId); } else { @@ -1328,15 +1344,13 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i id, pRsp->downstreamTaskId, pRsp->downstreamNodeId); } - allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, ¬Rsp, id); + allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, ¬Rsp, &numOfFailed, id); { bool delayDispatch = (pMsgInfo->dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER); if (delayDispatch) { - taosThreadMutexLock(&pTask->lock); // we only set the dispatch msg info for current checkpoint trans - if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK && - pTask->chkInfo.pActiveInfo->activeId == pMsgInfo->checkpointId) { + if (triggerDispatchRsp) { ASSERT(pTask->chkInfo.pActiveInfo->transId == pMsgInfo->transId); stDebug("s-task:%s checkpoint-trigger msg to 0x%x rsp for checkpointId:%" PRId64 " transId:%d confirmed", pTask->id.idStr, pRsp->downstreamTaskId, pMsgInfo->checkpointId, pMsgInfo->transId); @@ -1347,12 +1361,13 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i " transId:%d discard, since expired", pTask->id.idStr, pMsgInfo->checkpointId, pMsgInfo->transId); } - taosThreadMutexUnlock(&pTask->lock); } } } } + taosThreadMutexUnlock(&pMsgInfo->lock); + if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (!allRsp) { stDebug( @@ -1371,29 +1386,25 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i } // all msg rsp already, continue - if (allRsp) { - ASSERT(pTask->outputq.status == TASK_OUTPUT_STATUS__WAIT); + // we need to re-try send dispatch msg to downstream tasks + if (allRsp && (numOfFailed == 0)) { + // trans-state msg has been sent to downstream successfully. let's transfer the fill-history task state + if (pMsgInfo->dispatchMsgType == STREAM_INPUT__TRANS_STATE) { + stDebug("s-task:%s dispatch trans-state msgId:%d to downstream successfully, start to prepare transfer state", id, + msgId); + ASSERT(pTask->info.fillHistory == 1); - // we need to re-try send dispatch msg to downstream tasks - int32_t numOfFailed = getFailedDispatchInfo(pMsgInfo, now); - if (numOfFailed == 0) { // this message has been sent successfully, let's try next one. - // trans-state msg has been sent to downstream successfully. let's transfer the fill-history task state - if (pMsgInfo->dispatchMsgType == STREAM_INPUT__TRANS_STATE) { - stDebug("s-task:%s dispatch trans-state msgId:%d to downstream successfully, start to prepare transfer state", - id, msgId); - ASSERT(pTask->info.fillHistory == 1); - - code = streamTransferStatePrepare(pTask); - if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens - } - - clearBufferedDispatchMsg(pTask); - - // now ready for next data output - atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL); - } else { - handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId, pRsp->downstreamNodeId); + code = streamTransferStatePrepare(pTask); + if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens } + + clearBufferedDispatchMsg(pTask); + + // now ready for next data output + atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL); + } else { + // this message has been sent successfully, let's try next one. + handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId, pRsp->downstreamNodeId); } }