diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 624257d4c6..40da0f0320 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -171,7 +171,11 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM SStreamTask* pTask = *ppTask; const char* idstr = pTask->id.idStr; - if ((pMeta->updateInfo.transId != req.transId) && (pMeta->updateInfo.transId != -1)) { + if (pMeta->updateInfo.transId == -1) { // info needs to be kept till the new trans to update the nodeEp arrived. + streamMetaInitUpdateTaskList(pMeta, req.transId); + } + + if (pMeta->updateInfo.transId != req.transId) { if (req.transId < pMeta->updateInfo.transId) { tqError("s-task:%s vgId:%d disorder update nodeEp msg recv, discarded, newest transId:%d, recv:%d", idstr, vgId, pMeta->updateInfo.transId, req.transId); @@ -937,7 +941,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) } int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { - SCheckpointTriggerRsp* pRsp = pMsg->pCont; + SCheckpointTriggerRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); SStreamTask* pTask = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->taskId); if (pTask == NULL) { diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 52265963bb..78bd5c4511 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -25,15 +25,12 @@ static int32_t deleteCheckpoint(const char* id); static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName); static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask); static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, - int32_t transId); + int32_t transId, int32_t srcTaskId); static int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList); static void checkpointTriggerMonitorFn(void* param, void* tmrId); -static SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, - int32_t transId); - SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, - int32_t transId) { + int32_t transId, int32_t srcTaskId) { SStreamDataBlock* pChkpoint = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock)); if (pChkpoint == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -41,6 +38,10 @@ SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpoint } pChkpoint->type = checkpointType; + if (checkpointType == STREAM_INPUT__CHECKPOINT_TRIGGER && (pTask->info.taskLevel != TASK_LEVEL__SOURCE)) { + pChkpoint->srcTaskId = srcTaskId; + ASSERT(srcTaskId != 0); + } SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); if (pBlock == NULL) { @@ -64,8 +65,9 @@ SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpoint return pChkpoint; } -int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId) { - SStreamDataBlock* pCheckpoint = createChkptTriggerBlock(pTask, checkpointType, checkpointId, transId); +int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId, + int32_t srcTaskId) { + SStreamDataBlock* pCheckpoint = createChkptTriggerBlock(pTask, checkpointType, checkpointId, transId, srcTaskId); if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pCheckpoint) < 0) { return TSDB_CODE_OUT_OF_MEMORY; @@ -90,7 +92,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo // 2. Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task // and this is the last item in the inputQ. - return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pReq->checkpointId, pReq->transId); + return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pReq->checkpointId, pReq->transId, -1); } int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTriggerRsp* pRsp) { @@ -102,15 +104,16 @@ int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTri return TSDB_CODE_SUCCESS; } - appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pRsp->checkpointId, pRsp->transId); + appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pRsp->checkpointId, pRsp->transId, + pRsp->upstreamTaskId); return TSDB_CODE_SUCCESS; } int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId, SRpcHandleInfo* pRpcInfo, int32_t code) { int32_t size = sizeof(SMsgHead) + sizeof(SCheckpointTriggerRsp); + void* pBuf = rpcMallocCont(size); - void* pBuf = rpcMallocCont(size); SCheckpointTriggerRsp* pRsp = POINTER_SHIFT(pBuf, sizeof(SMsgHead)); ((SMsgHead*)pBuf)->vgId = htonl(downstreamNodeId); @@ -118,6 +121,7 @@ int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId pRsp->streamId = pTask->id.streamId; pRsp->upstreamTaskId = pTask->id.taskId; pRsp->taskId = dstTaskId; + pRsp->rspCode = code; if (code == TSDB_CODE_SUCCESS) { pRsp->checkpointId = pTask->chkInfo.pActiveInfo->activeId; @@ -127,9 +131,7 @@ int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId pRsp->transId = -1; } - pRsp->rspCode = code; - - SRpcMsg rspMsg = {.code = 0, .pCont = pRsp, .contLen = size, .info = *pRpcInfo}; + SRpcMsg rspMsg = {.code = 0, .pCont = pBuf, .contLen = size, .info = *pRpcInfo}; tmsgSendRsp(&rspMsg); return 0; } @@ -267,7 +269,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId); continueDispatchCheckpointTriggerBlock(pBlock, pTask); } else { // only one task exists, no need to dispatch downstream info - appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pActiveInfo->activeId, pActiveInfo->transId); + appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pActiveInfo->activeId, pActiveInfo->transId, -1); streamFreeQitem((SStreamQueueItem*)pBlock); } } else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) { @@ -364,9 +366,8 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId taosThreadMutexUnlock(&pInfo->lock); if (notReady == 0) { - stDebug("s-task:%s all downstream task(s) have completed build checkpoint, start to do checkpoint for current task", - id); - appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId); + stDebug("s-task:%s all downstream tasks have completed build checkpoint, do checkpoint for current task", id); + appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId, -1); } return 0; @@ -707,11 +708,10 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { } } else { // clear the checkpoint info if failed taosThreadMutexLock(&pTask->lock); - streamTaskClearCheckInfo(pTask, false); + streamTaskSetFailedCheckpointId(pTask); // set failed checkpoint id before clear the checkpoint info taosThreadMutexUnlock(&pTask->lock); code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); - streamTaskSetFailedCheckpointId(pTask); stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, id, ckId); } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 5595085197..83e73e8c88 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -40,6 +40,20 @@ void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) { pMsg->contLen = contLen; } +static void initDispatchInfo(SDispatchMsgInfo* pInfo, int32_t msgId) { + pInfo->startTs = taosGetTimestampMs(); + pInfo->rspTs = -1; + pInfo->msgId = msgId; +} + +static void clearDispatchInfo(SDispatchMsgInfo* pInfo) { + pInfo->startTs = -1; + pInfo->msgId = -1; + pInfo->rspTs = -1; +} + +static void updateDispatchInfo(SDispatchMsgInfo* pInfo, int64_t recvTs) { pInfo->rspTs = recvTs; } + static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, int32_t numOfBlocks, int64_t dstTaskId, int32_t type) { pReq->streamId = pTask->id.streamId; @@ -225,12 +239,15 @@ void clearBufferedDispatchMsg(SStreamTask* pTask) { destroyDispatchMsg(pMsgInfo->pData, streamTaskGetNumOfDownstream(pTask)); } + taosThreadMutexLock(&pMsgInfo->lock); + pMsgInfo->checkpointId = -1; pMsgInfo->transId = -1; pMsgInfo->pData = NULL; pMsgInfo->dispatchMsgType = 0; - taosThreadMutexLock(&pMsgInfo->lock); + clearDispatchInfo(pMsgInfo); + taosArrayClear(pTask->msgInfo.pSendInfo); taosThreadMutexUnlock(&pMsgInfo->lock); } @@ -416,6 +433,7 @@ static void setResendInfo(SDispatchEntry* pEntry, int64_t now) { pEntry->sendTs = now; pEntry->rspTs = -1; pEntry->retryCount += 1; + pEntry->status = TSDB_CODE_SUCCESS; } static void addDispatchEntry(SDispatchMsgInfo* pMsgInfo, int32_t nodeId, int64_t now, bool lock) { @@ -642,20 +660,6 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S return 0; } -static void initDispatchInfo(SDispatchMsgInfo* pInfo, int32_t msgId) { - pInfo->startTs = taosGetTimestampMs(); - pInfo->rspTs = -1; - pInfo->msgId = msgId; -} - -static void clearDispatchInfo(SDispatchMsgInfo* pInfo) { - pInfo->startTs = -1; - pInfo->msgId = -1; - pInfo->rspTs = -1; -} - -static void updateDispatchInfo(SDispatchMsgInfo* pInfo, int64_t recvTs) { pInfo->rspTs = recvTs; } - int32_t streamDispatchStreamBlock(SStreamTask* pTask) { ASSERT((pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH)); @@ -698,7 +702,10 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { type == STREAM_INPUT__TRANS_STATE); pTask->execInfo.dispatch += 1; + + taosThreadMutexLock(&pTask->msgInfo.lock); initDispatchInfo(&pTask->msgInfo, pTask->execInfo.dispatch); + taosThreadMutexUnlock(&pTask->msgInfo.lock); int32_t code = doBuildDispatchMsg(pTask, pBlock); if (code == 0) { @@ -1221,10 +1228,13 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i const char* id = pTask->id.idStr; int32_t vgId = pTask->pMeta->vgId; SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo; - int32_t msgId = pMsgInfo->msgId; int64_t now = taosGetTimestampMs(); int32_t totalRsp = 0; + taosThreadMutexLock(&pMsgInfo->lock); + int32_t msgId = pMsgInfo->msgId; + taosThreadMutexUnlock(&pMsgInfo->lock); + // follower not handle the dispatch rsp if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) { stError("s-task:%s vgId:%d is follower or task just re-launched, not handle the dispatch rsp, discard it", id, diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 08e7c97150..a7f73d1b52 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1053,15 +1053,19 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize); } - if ((*pTask)->chkInfo.pActiveInfo->activeId != 0) { - entry.checkpointInfo.failed = - ((*pTask)->chkInfo.pActiveInfo->failedId >= (*pTask)->chkInfo.pActiveInfo->activeId) ? 1 : 0; - entry.checkpointInfo.activeId = (*pTask)->chkInfo.pActiveInfo->activeId; - entry.checkpointInfo.activeTransId = (*pTask)->chkInfo.pActiveInfo->transId; + SActiveCheckpointInfo* p = (*pTask)->chkInfo.pActiveInfo; + if (p->activeId != 0) { + entry.checkpointInfo.failed = (p->failedId >= p->activeId) ? 1 : 0; + entry.checkpointInfo.activeId = p->activeId; + entry.checkpointInfo.activeTransId = p->transId; if (entry.checkpointInfo.failed) { - stInfo("s-task:%s set kill checkpoint trans in hb, transId:%d", (*pTask)->id.idStr, - (*pTask)->chkInfo.pActiveInfo->transId); + stInfo("s-task:%s set kill checkpoint trans in hb, transId:%d, clear the active checkpointInfo", + (*pTask)->id.idStr, p->transId); + + taosThreadMutexLock(&(*pTask)->lock); + streamTaskClearCheckInfo((*pTask), true); + taosThreadMutexUnlock(&(*pTask)->lock); } } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 8abac4ce85..ff020e88c9 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -1039,10 +1039,10 @@ void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) { void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) { pInfo->activeId = 0; // clear the checkpoint id - pInfo->failedId = 0; pInfo->transId = 0; pInfo->allUpstreamTriggerRecv = 0; pInfo->dispatchTrigger = false; + pInfo->failedId = 0; taosArrayClear(pInfo->pDispatchTriggerList); taosArrayClear(pInfo->pCheckpointReadyRecvList);