diff --git a/include/common/rsync.h b/include/common/rsync.h index f613a35f48..0840b51793 100644 --- a/include/common/rsync.h +++ b/include/common/rsync.h @@ -13,7 +13,7 @@ extern "C" { void stopRsync(); void startRsync(); -int32_t uploadRsync(const char* id, const char* path); +int32_t uploadByRsync(const char* id, const char* path); int32_t downloadRsync(const char* id, const char* path); int32_t deleteRsync(const char* id); diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 2d07b56e4c..d1ac2c79c3 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -321,6 +321,7 @@ TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_CREATE, "stream-create", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_DROP, "stream-drop", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE_TRIGGER, "stream-retri-trigger", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_END_STREAM_MSG) diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index 0cde499a6b..6d52b10182 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -36,6 +36,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta); int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta); int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg); +int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg); int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode); int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, char* msg, int32_t msgLen); diff --git a/include/libs/stream/streammsg.h b/include/libs/stream/streammsg.h index 5436442284..87c756b10c 100644 --- a/include/libs/stream/streammsg.h +++ b/include/libs/stream/streammsg.h @@ -22,17 +22,17 @@ extern "C" { #endif -typedef struct SStreamChildEpInfo { +typedef struct SStreamUpstreamEpInfo { int32_t nodeId; int32_t childId; int32_t taskId; SEpSet epSet; bool dataAllowed; // denote if the data from this upstream task is allowed to put into inputQ, not serialize it int64_t stage; // upstream task stage value, to denote if the upstream node has restart/replica changed/transfer -} SStreamChildEpInfo; +} SStreamUpstreamEpInfo; -int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); -int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo); +int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo); +int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamUpstreamEpInfo* pInfo); // mndTrigger: denote if this checkpoint is triggered by mnode or as requested from tasks when transfer-state finished typedef struct { @@ -171,6 +171,16 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp); int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pRsp); void tCleanupStreamHbMsg(SStreamHbMsg* pMsg); +typedef struct SRetrieveChkptTriggerReq { + SMsgHead head; + int64_t streamId; + int64_t checkpointId; + int32_t upstreamNodeId; + int32_t upstreamTaskId; + int32_t downstreamNodeId; + int64_t downstreamTaskId; +} SRetrieveChkptTriggerReq; + typedef struct { SMsgHead head; int64_t streamId; diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index ea0f1824b3..185ab7ad51 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -61,10 +61,11 @@ extern "C" { // the load and start stream task should be executed after snode has started successfully, since the load of stream // tasks may incur the download of checkpoint data from remote, which may consume significant network and CPU resources. -typedef struct SStreamTask SStreamTask; -typedef struct SStreamQueue SStreamQueue; -typedef struct SStreamTaskSM SStreamTaskSM; -typedef struct SStreamQueueItem SStreamQueueItem; +typedef struct SStreamTask SStreamTask; +typedef struct SStreamQueue SStreamQueue; +typedef struct SStreamTaskSM SStreamTaskSM; +typedef struct SStreamQueueItem SStreamQueueItem; +typedef struct SActiveCheckpointInfo SActiveCheckpointInfo; #define SSTREAM_TASK_VER 4 #define SSTREAM_TASK_INCOMPATIBLE_VER 1 @@ -270,13 +271,10 @@ typedef struct SCheckpointInfo { int64_t checkpointTime; // latest checkpoint time int64_t processedVer; int64_t nextProcessVer; // current offset in WAL, not serialize it - int64_t failedId; // record the latest failed checkpoint id - int64_t checkpointingId; - int32_t downstreamAlignNum; int32_t numOfNotReady; - bool dispatchCheckpointTrigger; + + SActiveCheckpointInfo* pActiveInfo; int64_t msgVer; - int32_t transId; } SCheckpointInfo; typedef struct SStreamStatus { @@ -436,7 +434,6 @@ struct SStreamTask { SHistoryTaskInfo hTaskInfo; STaskId streamTaskId; STaskExecStatisInfo execInfo; - SArray* pReadyMsgList; // SArray TdThreadMutex lock; // secure the operation of set task status and puting data into inputQ SMsgCb* pMsgCb; // msg handle SStreamState* pState; // state backend @@ -619,7 +616,8 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask); int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg); int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code); -SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId); +SStreamUpstreamEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId); +SEpSet* streamTaskGetDownstreamEpInfo(SStreamTask* pTask, int32_t taskId); void streamTaskInputFail(SStreamTask* pTask); @@ -672,6 +670,17 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated); int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration); bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer); +// checkpoint related +int32_t streamTaskGetActiveCheckpointInfo(const SStreamTask* pTask, int32_t* pTransId, int64_t* pCheckpointId); +int32_t streamTaskSetActiveCheckpointInfo(SStreamTask* pTask, int64_t activeCheckpointId); +int32_t streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId); +bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId); +void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_t* pTotal); +void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask); +void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId); +int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t checkpointType, int32_t dstTaskId, int32_t vgId, + SEpSet* pEpset); + int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue); // common @@ -682,6 +691,7 @@ int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstre void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask); int32_t streamTaskReleaseState(SStreamTask* pTask); int32_t streamTaskReloadState(SStreamTask* pTask); +void streamTaskOpenUpstreamInput(SStreamTask* pTask, int32_t taskId); void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId); void streamTaskOpenAllUpstreamInput(SStreamTask* pTask); int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key); diff --git a/source/common/src/rsync.c b/source/common/src/rsync.c index c4d14a6c2c..1b03b142e0 100644 --- a/source/common/src/rsync.c +++ b/source/common/src/rsync.c @@ -153,7 +153,7 @@ void startRsync() { uDebug("[rsync] start server successful"); } -int32_t uploadRsync(const char* id, const char* path) { +int32_t uploadByRsync(const char* id, const char* path) { int64_t st = taosGetTimestampMs(); char command[PATH_MAX] = {0}; @@ -196,6 +196,7 @@ int32_t uploadRsync(const char* id, const char* path) { return code; } +// abort from retry if quit int32_t downloadRsync(const char* id, const char* path) { int64_t st = taosGetTimestampMs(); int32_t MAX_RETRY = 60; @@ -220,11 +221,10 @@ int32_t downloadRsync(const char* id, const char* path) { uDebug("[rsync] %s start to sync data from remote to:%s, %s", id, path, command); while(times++ < MAX_RETRY) { - code = execCommand(command); if (code != TSDB_CODE_SUCCESS) { - uError("[rsync] %s download checkpoint data:%s failed, retry after 1sec, code:%d," ERRNO_ERR_FORMAT, id, path, code, - ERRNO_ERR_DATA); + uError("[rsync] %s download checkpoint data:%s failed, retry after 1sec, times:%d, code:%d," ERRNO_ERR_FORMAT, id, + path, times, code, ERRNO_ERR_DATA); taosSsleep(1); } else { int32_t el = taosGetTimestampMs() - st; diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index b3c8ef4017..9b07b6a3d8 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -89,6 +89,8 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index d9175bf5fe..bfc9e92293 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -961,6 +961,8 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 9439f7f179..f3bdc98994 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -257,6 +257,7 @@ int tqScanWalAsync(STQ* pTq, bool ckPause); int32_t tqStopStreamTasksAsync(STQ* pTq); int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp); int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessTaskRetrieveTriggerMsg(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 5441d0c4c1..b8d0e30d30 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -1285,7 +1285,8 @@ _checkpoint: if (pItem && pItem->pStreamTask) { SStreamTask *pTask = pItem->pStreamTask; // atomic_store_32(&pTask->pMeta->chkptNotReadyTasks, 1); - pTask->chkInfo.checkpointingId = checkpointId; + streamTaskSetActiveCheckpointInfo(pTask, checkpointId); + pTask->chkInfo.checkpointId = checkpointId; // 1pTask->checkpointingId; pTask->chkInfo.checkpointVer = pItem->submitReqVer; pTask->info.triggerParam = pItem->fetchResultVer; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a59a235c50..037c2a7b7a 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -725,10 +725,6 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { } pTask->pBackend = NULL; -// code = tqExpandStreamTask(pTask, pTq->pStreamMeta); -// if (code != TSDB_CODE_SUCCESS) { -// return code; -// } // sink STaskOutputInfo* pOutputInfo = &pTask->outputInfo; @@ -1092,6 +1088,10 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) int32_t len = pMsg->contLen - sizeof(SMsgHead); int32_t code = 0; +// if (pTq->pStreamMeta->vgId == 2) { +// ASSERT(0); +// } + // disable auto rsp to mnode pRsp->info.handle = NULL; @@ -1140,10 +1140,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) } if (pTask->status.downstreamReady != 1) { - pTask->chkInfo.failedId = req.checkpointId; // record the latest failed checkpoint id - pTask->chkInfo.checkpointingId = req.checkpointId; - pTask->chkInfo.transId = req.transId; - + streamTaskSetFailedChkptInfo(pTask, req.transId, req.checkpointId); // record the latest failed checkpoint id tqError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpointId:%" PRId64 ", transId:%d set it failed", pTask->id.idStr, req.checkpointId, req.transId); @@ -1182,9 +1179,12 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) // check if the checkpoint msg already sent or not. if (status == TASK_STATUS__CK) { + int64_t checkpointId = 0; + streamTaskGetActiveCheckpointInfo(pTask, NULL, &checkpointId); + tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64 " transId:%d already handled, ignore msg and continue process checkpoint", - pTask->id.idStr, pTask->chkInfo.checkpointingId, req.transId); + pTask->id.idStr, checkpointId, req.transId); taosThreadMutexUnlock(&pTask->lock); streamMetaReleaseTask(pMeta, pTask); @@ -1244,6 +1244,10 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) { return tqStreamTaskProcessTaskResetReq(pTq->pStreamMeta, pMsg); } +int32_t tqProcessTaskRetrieveTriggerMsg(STQ* pTq, SRpcMsg* pMsg) { + return tqStreamTaskProcessRetrieveTriggerReq(pTq->pStreamMeta, pMsg); +} + // this function is needed, do not try to remove it. int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg) { return tqStreamProcessStreamHbRsp(pTq->pStreamMeta, pMsg); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index b5b5ef8755..b9c0589dc5 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -699,9 +699,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { } if (isLeader && !tsDisableStream) { - streamMetaResetTaskStatus(pMeta); streamMetaWUnLock(pMeta); - streamMetaStartAllTasks(pMeta, tqExpandStreamTask); } else { streamMetaResetStartInfo(&pMeta->startInfo); @@ -839,13 +837,19 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { // clear flag set during do checkpoint, and open inputQ for all upstream tasks SStreamTaskState *pState = streamTaskGetStatus(pTask); if (pState->state == TASK_STATUS__CK) { + int32_t tranId = 0; + int64_t activeChkId = 0; + streamTaskGetActiveCheckpointInfo(pTask, &tranId, &activeChkId); + tqDebug("s-task:%s reset task status from checkpoint, current checkpointingId:%" PRId64 ", transId:%d", - pTask->id.idStr, pTask->chkInfo.checkpointingId, pTask->chkInfo.transId); + pTask->id.idStr, activeChkId, tranId); + streamTaskSetStatusReady(pTask); } else if (pState->state == TASK_STATUS__UNINIT) { tqDebug("s-task:%s start task by checking downstream tasks", pTask->id.idStr); ASSERT(pTask->status.downstreamReady == 0); - /*int32_t ret = */ streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); +// /*int32_t ret = */ streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); + tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, pTask->id.streamId, pTask->id.taskId); } else { tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, pState->name); } @@ -856,6 +860,57 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { return TSDB_CODE_SUCCESS; } +int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { + SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*) pMsg; + + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->upstreamTaskId); + if (pTask == NULL) { + tqError("vgId:%d process retrieve checkpoint trigger, checkpointId:%" PRId64 + " from s-task:0x%x, failed to acquire task:0x%x, it may have been dropped already", + pMeta->vgId, pReq->checkpointId, (int32_t)pReq->downstreamTaskId, pReq->upstreamTaskId); + return TSDB_CODE_SUCCESS; + } + + tqDebug("s-task:0x%x recv retrieve checkpoint-trigger msg from downstream s-task:0x%x, checkpointId:%" PRId64, + pReq->upstreamTaskId, (int32_t)pReq->downstreamTaskId, pReq->checkpointId); + + SStreamTaskState* pState = streamTaskGetStatus(pTask); + if (pState->state == TASK_STATUS__CK) { // recv the checkpoint-source/trigger already + int32_t transId = 0; + int64_t checkpointId = 0; + + streamTaskGetActiveCheckpointInfo(pTask, &transId, &checkpointId); + ASSERT (checkpointId == pReq->checkpointId); + + if (streamTaskAlreadySendTrigger(pTask, pReq->downstreamNodeId)) { + // re-send the lost checkpoint-trigger msg to downstream task + SEpSet* pEpset = streamTaskGetDownstreamEpInfo(pTask, pReq->downstreamTaskId); + streamTaskSendCheckpointTriggerMsg(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pReq->downstreamTaskId, + pReq->downstreamNodeId, pEpset); + } else { // not send checkpoint-trigger yet, wait + int32_t recv = 0, total = 0; + streamTaskGetTriggerRecvStatus(pTask, &recv, &total); + + if (recv == total) { // add the ts info + tqWarn("s-task:%s all upstream send checkpoint-source/trigger, but not processed yet, wait", pTask->id.idStr); + } else { + tqWarn( + "s-task:%s not all upstream send checkpoint-source/trigger, total recv:%d/%d, wait for all upstream " + "sending checkpoint-source/trigger", + pTask->id.idStr, recv, total); + } + } + } else { // upstream not recv the checkpoint-source/trigger till now + ASSERT(pState->state == TASK_STATUS__READY || pState->state == TASK_STATUS__HALT); + tqWarn( + "s-task:%s not recv checkpoint-source from mnode or checkpoint-trigger from upstream yet, wait for all " + "upstream sending checkpoint-source/trigger", + pTask->id.idStr); + } + + return TSDB_CODE_SUCCESS; +} + int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg) { SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)pMsg; diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index ea742108aa..fd1bb391b2 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -422,7 +422,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) } // STREAM ============ - vInfo("vgId:%d stream task start", vgId); + vInfo("vgId:%d stream task start to take snapshot", vgId); if (!pReader->streamTaskDone) { if (pReader->pStreamTaskReader == NULL) { code = streamTaskSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->sver, &pReader->pStreamTaskReader); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index f6b32c5543..02343206ad 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -842,12 +842,16 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) return tqProcessTaskScanHistory(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_CHECKPOINT_READY: return tqProcessTaskCheckpointReadyMsg(pVnode->pTq, pMsg); + case TDMT_STREAM_RETRIEVE_TRIGGER: + return tqProcessTaskRetrieveTriggerMsg(pVnode->pTq, pMsg); case TDMT_MND_STREAM_HEARTBEAT_RSP: return tqProcessStreamHbRsp(pVnode->pTq, pMsg); case TDMT_MND_STREAM_REQ_CHKPT_RSP: return tqProcessStreamReqCheckpointRsp(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_CHECKPOINT_READY_RSP: return tqProcessTaskCheckpointReadyRsp(pVnode->pTq, pMsg); + case TDMT_STREAM_RETRIEVE_TRIGGER_RSP: + return tqProcessTaskCheckpointReadyMsg(pVnode->pTq, pMsg); case TDMT_VND_GET_STREAM_PROGRESS: return tqStreamProgressRetrieveReq(pVnode->pTq, pMsg); default: diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 10bdccdb29..68c3ab1a6b 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -52,6 +52,18 @@ extern "C" { #define stTrace(...) do { if (stDebugFlag & DEBUG_TRACE) { taosPrintLog("STM ", DEBUG_TRACE, stDebugFlag, __VA_ARGS__); }} while(0) // clang-format on +struct SActiveCheckpointInfo { + TdThreadMutex lock; + int32_t transId; + int64_t firstRecvTs; // first time to recv checkpoint trigger info + int64_t activeId; // current active checkpoint id + int64_t failedId; + bool dispatchTrigger; + SArray* pDispatchTriggerList; // SArray + SArray* pReadyMsgList; // SArray + tmr_h pCheckTmr; +}; + typedef struct { int8_t type; SSDataBlock* pBlock; @@ -81,6 +93,24 @@ struct STokenBucket { int64_t quotaFillTimestamp; // fill timestamp }; +typedef struct { + int32_t upStreamTaskId; + SEpSet upstreamNodeEpset; + int32_t nodeId; + SRpcMsg msg; + int64_t recvTs; + int32_t transId; + int64_t checkpointId; +} SStreamChkptReadyInfo; + +typedef struct { + int64_t sendTs; + int64_t recvTs; + bool recved; + int32_t nodeId; + int32_t taskId; +} STaskTriggerSendInfo; + struct SStreamQueue { STaosQueue* pQueue; STaosQall* qall; @@ -113,6 +143,9 @@ void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups); int32_t getNumOfDispatchBranch(SStreamTask* pTask); void clearBufferedDispatchMsg(SStreamTask* pTask); +int32_t streamTaskBuildAndSendTriggerMsg(SStreamTask* pTask, const SStreamDataBlock* pData, int32_t dstTaskId, + int32_t vgId, SEpSet* pEpset); + int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock); SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg); SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, @@ -131,11 +164,13 @@ int32_t streamTaskSendCheckpointReq(SStreamTask* pTask); void streamTaskSetFailedCheckpointId(SStreamTask* pTask); int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask); +int32_t streamTaskGetNumOfUpstream(const SStreamTask* pTask); int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*); STaskId streamTaskGetTaskId(const SStreamTask* pTask); void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo); void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo); int32_t streamTaskResetTimewindowFilter(SStreamTask* pTask); +void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo); void streamClearChkptReadyMsg(SStreamTask* pTask); EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 28db0a76c6..87fb615d5f 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -386,12 +386,12 @@ int32_t rebuildFromRemoteChkp_s3(const char* key, char* chkpPath, int64_t chkpId return code; } -int32_t rebuildFromRemoteCheckpoint(const char* key, char* chkpPath, int64_t checkpointId, char* defaultPath) { +int32_t rebuildFromRemoteCheckpoint(const char* key, char* chkptPath, int64_t checkpointId, char* defaultPath) { ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType(); if (type == DATA_UPLOAD_S3) { - return rebuildFromRemoteChkp_s3(key, chkpPath, checkpointId, defaultPath); + return rebuildFromRemoteChkp_s3(key, chkptPath, checkpointId, defaultPath); } else if (type == DATA_UPLOAD_RSYNC) { - return rebuildFromRemoteChkp_rsync(key, chkpPath, checkpointId, defaultPath); + return rebuildFromRemoteChkp_rsync(key, chkptPath, checkpointId, defaultPath); } else { stError("%s no remote backup checkpoint data for:%" PRId64, key, checkpointId); } @@ -538,7 +538,9 @@ int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId taosMulMkDir(defaultPath); } - char* checkpointRoot = taosMemoryCalloc(1, strlen(path) + 256); + int32_t pathLen = strlen(path) + 256; + + char* checkpointRoot = taosMemoryCalloc(1, pathLen); sprintf(checkpointRoot, "%s%s%s", prefixPath, TD_DIRSEP, "checkpoints"); if (!taosIsDir(checkpointRoot)) { @@ -548,9 +550,9 @@ int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId stDebug("%s check local backend dir:%s, checkpointId:%" PRId64 " succ", key, defaultPath, chkptId); - char* chkptPath = taosMemoryCalloc(1, strlen(path) + 256); + char* chkptPath = taosMemoryCalloc(1, pathLen); if (chkptId > 0) { - sprintf(chkptPath, "%s%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkptId); + snprintf(chkptPath, pathLen, "%s%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkptId); code = rebuildFromLocalCheckpoint(key, chkptPath, chkptId, defaultPath); if (code != 0) { diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 4a8ca69ba5..f083ff8a61 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -40,7 +40,7 @@ static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage, int64_t* oldStage) { - SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId); + SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId); ASSERT(pInfo != NULL); *oldStage = pInfo->stage; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index d09e5bf477..43b39b8574 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -20,9 +20,9 @@ typedef struct { ECHECKPOINT_BACKUP_TYPE type; - char* taskId; - int64_t chkpId; + char* taskId; + int64_t chkpId; SStreamTask* pTask; int64_t dbRefId; void* pMeta; @@ -35,22 +35,33 @@ static int32_t deleteCheckpoint(const char* id); static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName); static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask); static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType); -static int32_t streamAlignCheckpoint(SStreamTask* pTask); +static int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList); +static SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType); +static void checkpointTriggerMonitorFn(void* param, void* tmrId); -int32_t streamAlignCheckpoint(SStreamTask* pTask) { - int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList); - int64_t old = atomic_val_compare_exchange_32(&pTask->chkInfo.downstreamAlignNum, 0, num); - if (old == 0) { - stDebug("s-task:%s set initial align upstream num:%d", pTask->id.idStr, num); +bool streamTaskIsAllUpstreamSendTrigger(SStreamTask* pTask) { + SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; + int32_t numOfUpstreams = taosArrayGetSize(pTask->upstreamInfo.pList); + bool allSend = true; + + taosThreadMutexLock(&pActiveInfo->lock); + int32_t numOfRecv = taosArrayGetSize(pActiveInfo->pReadyMsgList); + + if (numOfRecv < numOfUpstreams) { + stDebug("s-task:%s received checkpoint-trigger block, idx:%d, %d upstream tasks not send yet, total:%d", + pTask->id.idStr, pTask->info.selfChildId, (numOfUpstreams - numOfRecv), numOfUpstreams); + allSend = false; } - return atomic_sub_fetch_32(&pTask->chkInfo.downstreamAlignNum, 1); + taosThreadMutexUnlock(&pActiveInfo->lock); + return allSend; } -int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType) { +SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType) { SStreamDataBlock* pChkpoint = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock)); if (pChkpoint == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; } pChkpoint->type = checkpointType; @@ -58,12 +69,13 @@ int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType) { SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); if (pBlock == NULL) { taosFreeQitem(pChkpoint); - return TSDB_CODE_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; } pBlock->info.type = STREAM_CHECKPOINT; - pBlock->info.version = pTask->chkInfo.checkpointingId; - pBlock->info.window.ekey = pBlock->info.window.skey = pTask->chkInfo.transId; // NOTE: set the transId + pBlock->info.version = pTask->chkInfo.pActiveInfo->activeId; + pBlock->info.window.ekey = pBlock->info.window.skey = pTask->chkInfo.pActiveInfo->transId; // NOTE: set the transId pBlock->info.rows = 1; pBlock->info.childId = pTask->info.selfChildId; @@ -71,6 +83,14 @@ int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType) { taosArrayPush(pChkpoint->blocks, pBlock); taosMemoryFree(pBlock); + terrno = 0; + + return pChkpoint; +} + +int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType) { + SStreamDataBlock* pChkpoint = createChkptTriggerBlock(pTask, checkpointType); + if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pChkpoint) < 0) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -87,23 +107,43 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); ASSERT(code == TSDB_CODE_SUCCESS); - pTask->chkInfo.transId = pReq->transId; - pTask->chkInfo.checkpointingId = pReq->checkpointId; + pTask->chkInfo.pActiveInfo->transId = pReq->transId; + pTask->chkInfo.pActiveInfo->activeId = pReq->checkpointId; pTask->chkInfo.numOfNotReady = streamTaskGetNumOfDownstream(pTask); pTask->chkInfo.startTs = taosGetTimestampMs(); pTask->execInfo.checkpoint += 1; // 2. Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task + // and this is the last item in the inputQ. return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER); } +int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t checkpointType, int32_t dstTaskId, int32_t vgId, + SEpSet* pEpset) { + SStreamDataBlock* pChkpoint = createChkptTriggerBlock(pTask, checkpointType); + + pChkpoint->srcTaskId = pTask->id.taskId; + pChkpoint->srcVgId = pTask->pMeta->vgId; + + int32_t code = streamTaskBuildAndSendTriggerMsg(pTask, pChkpoint, dstTaskId, vgId, pEpset); + if (code == TSDB_CODE_SUCCESS) { + stDebug("s-task:%s build and send checkpoint-trigger dispatch msg succ, stage:%" PRId64, pTask->id.idStr, + pTask->pMeta->stage); + } else { + // todo handle send data failure + stError("s-task:%s failed to build and send trigger msg", pTask->id.idStr); + } + + return code; +} + int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) { pBlock->srcTaskId = pTask->id.taskId; pBlock->srcVgId = pTask->pMeta->vgId; int32_t code = taosWriteQitem(pTask->outputq.queue->pQueue, pBlock); if (code == 0) { - ASSERT(pTask->chkInfo.dispatchCheckpointTrigger == false); + ASSERT(pTask->chkInfo.pActiveInfo->dispatchTrigger == false); streamDispatchStreamBlock(pTask); } else { stError("s-task:%s failed to put checkpoint into outputQ, code:%s", pTask->id.idStr, tstrerror(code)); @@ -127,8 +167,8 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock // set task status if (streamTaskGetStatus(pTask)->state != TASK_STATUS__CK) { - pTask->chkInfo.checkpointingId = checkpointId; - pTask->chkInfo.transId = transId; + pTask->chkInfo.pActiveInfo->activeId = checkpointId; + pTask->chkInfo.pActiveInfo->transId = transId; code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); if (code != TSDB_CODE_SUCCESS) { @@ -136,12 +176,20 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock streamFreeQitem((SStreamQueueItem*)pBlock); return code; } + + SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo; + if (pActive->pCheckTmr == NULL) { + pActive->pCheckTmr = taosTmrStart(checkpointTriggerMonitorFn, 10000, pTask, streamTimer); + } else { + taosTmrReset(checkpointTriggerMonitorFn, 10000, pTask, streamTimer, &pActive->pCheckTmr); + } } // todo fix race condition: set the status and append checkpoint block int32_t taskLevel = pTask->info.taskLevel; if (taskLevel == TASK_LEVEL__SOURCE) { int8_t type = pTask->outputInfo.type; + if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) { stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId); continueDispatchCheckpointTriggerBlock(pBlock, pTask); @@ -161,23 +209,19 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId); // there are still some upstream tasks not send checkpoint request, do nothing and wait for then - int32_t notReady = streamAlignCheckpoint(pTask); - int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList); - if (notReady > 0) { - stDebug("s-task:%s received checkpoint block, idx:%d, %d upstream tasks not send checkpoint info yet, total:%d", - id, pTask->info.selfChildId, notReady, num); + bool allSend = streamTaskIsAllUpstreamSendTrigger(pTask); + if (!allSend) { streamFreeQitem((SStreamQueueItem*)pBlock); return code; } + int32_t num = streamTaskGetNumOfUpstream(pTask); if (taskLevel == TASK_LEVEL__SINK) { - stDebug("s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, send ready msg to upstream", - id, num); + stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, send ready msg to upstream", id, num); streamFreeQitem((SStreamQueueItem*)pBlock); streamTaskBuildCheckpoint(pTask); } else { // source & agg tasks need to forward the checkpoint msg downwards - stDebug("s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, continue forwards msg", id, - num); + stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, forwards to downstream", id, num); // set the needed checked downstream tasks, only when all downstream tasks do checkpoint complete, this task // can start local checkpoint procedure @@ -216,14 +260,10 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask) { } void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) { - pTask->chkInfo.checkpointingId = 0; // clear the checkpoint id - pTask->chkInfo.failedId = 0; pTask->chkInfo.startTs = 0; // clear the recorded start time pTask->chkInfo.numOfNotReady = 0; - pTask->chkInfo.transId = 0; - pTask->chkInfo.dispatchCheckpointTrigger = false; - pTask->chkInfo.downstreamAlignNum = 0; + streamTaskClearActiveInfo(pTask->chkInfo.pActiveInfo); streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks if (clearChkpReadyMsg) { streamClearChkptReadyMsg(pTask); @@ -317,9 +357,9 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin } void streamTaskSetFailedCheckpointId(SStreamTask* pTask) { - pTask->chkInfo.failedId = pTask->chkInfo.checkpointingId; + pTask->chkInfo.pActiveInfo->failedId = pTask->chkInfo.pActiveInfo->activeId; stDebug("s-task:%s mark the checkpointId:%" PRId64 " (transId:%d) failed", pTask->id.idStr, - pTask->chkInfo.checkpointingId, pTask->chkInfo.transId); + pTask->chkInfo.pActiveInfo->activeId, pTask->chkInfo.pActiveInfo->transId); } static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) { @@ -363,7 +403,7 @@ int32_t uploadCheckpointData(void* param) { SAsyncUploadArg* pParam = param; char* path = NULL; int32_t code = 0; - SArray* toDelFiles = taosArrayInit(4, sizeof(void*)); + SArray* toDelFiles = taosArrayInit(4, POINTER_BYTES); char* taskStr = pParam->taskId ? pParam->taskId : "NULL"; void* pBackend = taskAcquireDb(pParam->dbRefId); @@ -387,10 +427,10 @@ int32_t uploadCheckpointData(void* param) { if (code == TSDB_CODE_SUCCESS) { code = streamTaskUploadCheckpoint(pParam->taskId, path); - if (code != TSDB_CODE_SUCCESS) { - stError("s-task:%s failed to upload checkpoint data:%s, checkpointId:%" PRId64, taskStr, path, pParam->chkpId); + if (code == TSDB_CODE_SUCCESS) { + stDebug("s-task:%s upload checkpointId:%" PRId64 " to remote succ", taskStr, pParam->chkpId); } else { - stDebug("s-task:%s backup checkpointId:%"PRId64" to remote succ", taskStr, pParam->chkpId); + stError("s-task:%s failed to upload checkpointId:%" PRId64 " data:%s", taskStr, pParam->chkpId, path); } } @@ -403,19 +443,25 @@ int32_t uploadCheckpointData(void* param) { for (int i = 0; i < size; i++) { char* pName = taosArrayGetP(toDelFiles, i); code = deleteCheckpointFile(pParam->taskId, pName); - stDebug("s-task:%s try to del file: %s", taskStr, pName); if (code != 0) { + stDebug("s-task:%s failed to del file: %s", taskStr, pName); break; } } + + stDebug("s-task:%s remove redundant files done", taskStr); } taosArrayDestroyP(toDelFiles, taosMemoryFree); - stDebug("s-task:%s remove local checkpoint dir:%s", taskStr, path); - taosRemoveDir(path); - taosMemoryFree(path); + if (code == TSDB_CODE_SUCCESS) { + stDebug("s-task:%s remove local checkpointId:%" PRId64 " data %s", taskStr, pParam->chkpId, path); + taosRemoveDir(path); + } else { + stDebug("s-task:%s update checkpointId:%" PRId64 " keep local checkpoint data", taskStr, pParam->chkpId); + } + taosMemoryFree(path); taosMemoryFree(pParam->taskId); taosMemoryFree(pParam); @@ -446,7 +492,7 @@ int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointI int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { int32_t code = TSDB_CODE_SUCCESS; int64_t startTs = pTask->chkInfo.startTs; - int64_t ckId = pTask->chkInfo.checkpointingId; + int64_t ckId = pTask->chkInfo.pActiveInfo->activeId; const char* id = pTask->id.idStr; bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT); SStreamMeta* pMeta = pTask->pMeta; @@ -510,6 +556,211 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { return code; } +void checkpointTriggerMonitorFn(void* param, void* tmrId) { + SStreamTask* pTask = param; + SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; + + int32_t vgId = pTask->pMeta->vgId; + int64_t now = taosGetTimestampMs(); + stDebug("s-task:%s vgId:%d checkpoint-trigger monit start, ts:%" PRId64, pTask->id.idStr, vgId, now); + + taosThreadMutexLock(&pTask->lock); + SStreamTaskState* pState = streamTaskGetStatus(pTask); + if (pState->state == TASK_STATUS__CK) { + stDebug("s-task:%s vgId:%d not in checkpoint status, quit from monitor checkpoint-trigger", pTask->id.idStr, vgId); + taosThreadMutexUnlock(&pTask->lock); + return; + } + taosThreadMutexUnlock(&pTask->lock); + + taosThreadMutexLock(&pActiveInfo->lock); + + // send msg to retrieve checkpoint trigger msg + SArray* pList = pTask->upstreamInfo.pList; + ASSERT(pTask->info.taskLevel > TASK_LEVEL__SOURCE); + SArray* pNotSendList = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo)); + + for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) { + SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pList, i); + + bool recved = false; + for(int32_t j = 0; j < taosArrayGetSize(pActiveInfo->pReadyMsgList); ++j) { + SStreamChkptReadyInfo* pReady = taosArrayGet(pActiveInfo->pReadyMsgList, j); + if (pInfo->nodeId == pReady->nodeId) { + recved = true; + break; + } + } + + if (!recved) { // make sure the inputQ is opened for not recv upstream checkpoint-trigger message + streamTaskOpenUpstreamInput(pTask, pInfo->taskId); + taosArrayPush(pNotSendList, pInfo); + } + } + + // do send retrieve checkpoint trigger msg to upstream + doSendRetrieveTriggerMsg(pTask, pNotSendList); + taosThreadMutexUnlock(&pActiveInfo->lock); + + // check every 100ms + if (taosArrayGetSize(pNotSendList) > 0) { + taosTmrReset(checkpointTriggerMonitorFn, 10000, pTask, streamTimer, &pActiveInfo->pCheckTmr); + } + + taosArrayDestroy(pNotSendList); +} + +int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) { + int32_t code = 0; + int32_t vgId = pTask->pMeta->vgId; + const char* pId = pTask->id.idStr; + + for (int32_t i = 0; i < taosArrayGetSize(pNotSendList); i++) { + SStreamUpstreamEpInfo* pUpstreamTask = taosArrayGet(pNotSendList, i); + + SRetrieveChkptTriggerReq* pReq = rpcMallocCont(sizeof(SRetrieveChkptTriggerReq)); + if (pReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + stError("vgId:%d failed to create msg to retrieve trigger msg for task:%s exec, code:out of memory", vgId, pId); + continue; + } + + pReq->head.vgId = htonl(pUpstreamTask->nodeId); + pReq->streamId = pTask->id.streamId; + pReq->downstreamTaskId = pTask->id.taskId; + pReq->downstreamNodeId = vgId; + pReq->upstreamTaskId = pUpstreamTask->taskId; + pReq->upstreamNodeId = pUpstreamTask->nodeId; + pReq->checkpointId = pTask->chkInfo.pActiveInfo->activeId; + + SRpcMsg rpcMsg = {0}; + initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE, pReq, sizeof(SRetrieveChkptTriggerReq)); + + code = tmsgSendReq(&pUpstreamTask->epSet, &rpcMsg); + } + + return TSDB_CODE_SUCCESS; +} + +bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId) { + SStreamTaskState* pStatus = streamTaskGetStatus(pTask); + if (pStatus->state != TASK_STATUS__CK) { + return false; + } + + SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; + + taosThreadMutexLock(&pInfo->lock); + if (!pInfo->dispatchTrigger) { + taosThreadMutexUnlock(&pInfo->lock); + return false; + } + + for(int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) { + STaskTriggerSendInfo* pSendInfo = taosArrayGet(pInfo->pDispatchTriggerList, i); + if (pSendInfo->nodeId != downstreamNodeId) { + continue; + } + + // has send trigger msg to downstream node, + if (pSendInfo->recved) { + stWarn("s-task:%s checkpoint-trigger msg send at:%"PRId64" and recv confirmed, checkpointId:%"PRId64 ", transId:%d", + pTask->id.idStr, pSendInfo->sendTs, pInfo->activeId, pInfo->transId); + } else { + stWarn("s-task:%s checkpoint-trigger send at:%"PRId64", checkpointId:%"PRId64", transId:%d", pTask->id.idStr, + pSendInfo->sendTs, pInfo->activeId, pInfo->transId); + } + + taosThreadMutexUnlock(&pInfo->lock); + return true; + } + + ASSERT(0); + return false; +} + +void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_t* pTotal) { + *pRecved = taosArrayGetSize(pTask->chkInfo.pActiveInfo->pReadyMsgList); + + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + *pTotal = 1; + } else { + *pTotal = streamTaskGetNumOfUpstream(pTask); + } +} + +// record the dispatch checkpoint trigger info in the list +void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) { + SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; + + int64_t now = taosGetTimestampMs(); + taosThreadMutexLock(&pInfo->lock); + + // outputQ should be empty here + ASSERT(streamQueueGetNumOfItems(pTask->outputq.queue) == 1); + + pInfo->dispatchTrigger = true; + if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { + STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher; + + STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pDispatch->nodeId, .taskId = pDispatch->taskId}; + taosArrayPush(pInfo->pDispatchTriggerList, &p); + } else { + for (int32_t i = 0; i < streamTaskGetNumOfDownstream(pTask); ++i) { + SVgroupInfo* pVgInfo = taosArrayGet(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos, i); + + STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pVgInfo->vgId, .taskId = pVgInfo->taskId}; + taosArrayPush(pInfo->pDispatchTriggerList, &p); + } + } + + taosThreadMutexUnlock(&pInfo->lock); +} + +int32_t streamTaskGetNumOfConfirmed(SStreamTask* pTask) { + SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; + + int32_t num = 0; + taosThreadMutexLock(&pInfo->lock); + for(int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) { + STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i); + if (p->recved) { + num ++; + } + } + taosThreadMutexUnlock(&pInfo->lock); + return num; +} + +void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) { + SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; + + int32_t taskId = 0; + + taosThreadMutexLock(&pInfo->lock); + + for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) { + STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i); + if (p->nodeId == vgId) { + ASSERT(p->recved == false); + + p->recved = true; + p->recvTs = taosGetTimestampMs(); + taskId = p->taskId; + break; + } + } + + taosThreadMutexUnlock(&pInfo->lock); + + int32_t numOfConfirmed = streamTaskGetNumOfConfirmed(pTask); + int32_t total = streamTaskGetNumOfDownstream(pTask); + stDebug("s-task:%s set downstream:0x%x(vgId:%d) checkpoint-trigger dispatch confirmed, total confirmed:%d/%d", + pTask->id.idStr, taskId, vgId, numOfConfirmed, total); + + ASSERT(taskId != 0); +} + static int32_t uploadCheckpointToS3(const char* id, const char* path) { TdDirPtr pDir = taosOpenDir(path); if (pDir == NULL) return -1; @@ -576,7 +827,7 @@ int32_t streamTaskUploadCheckpoint(const char* id, const char* path) { } if (strlen(tsSnodeAddress) != 0) { - return uploadRsync(id, path); + return uploadByRsync(id, path); } else if (tsS3StreamEnabled) { return uploadCheckpointToS3(id, path); } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 3115c2cb43..c8a626a739 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -23,12 +23,6 @@ typedef struct SBlockName { char parTbName[TSDB_TABLE_NAME_LEN]; } SBlockName; -typedef struct { - int32_t upStreamTaskId; - SEpSet upstreamNodeEpset; - SRpcMsg msg; -} SStreamChkptReadyInfo; - static void doRetryDispatchData(void* param, void* tmrId); static int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet); static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq); @@ -85,12 +79,14 @@ int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* r void* buf = NULL; int32_t sz = taosArrayGetSize(pTask->upstreamInfo.pList); ASSERT(sz > 0); + for (int32_t i = 0; i < sz; i++) { req->reqId = tGenIdPI64(); - SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); + SStreamUpstreamEpInfo* pEpInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); req->dstNodeId = pEpInfo->nodeId; req->dstTaskId = pEpInfo->taskId; int32_t len; + tEncodeSize(tEncodeStreamRetrieveReq, req, len, code); if (code != 0) { ASSERT(0); @@ -115,7 +111,6 @@ int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* r code = tmsgSendReq(&pEpInfo->epSet, &rpcMsg); if (code != 0) { - ASSERT(0); rpcFreeCont(buf); return code; } @@ -124,15 +119,16 @@ int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* r stDebug("s-task:%s (child %d) send retrieve req to task:0x%x (vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr, pTask->info.selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req->reqId); } + return code; } static int32_t buildStreamRetrieveReq(SStreamTask* pTask, const SSDataBlock* pBlock, SStreamRetrieveReq* req){ - SRetrieveTableRsp* pRetrieve = NULL; int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); - - pRetrieve = taosMemoryCalloc(1, dataStrLen); - if (pRetrieve == NULL) return TSDB_CODE_OUT_OF_MEMORY; + SRetrieveTableRsp* pRetrieve = taosMemoryCalloc(1, dataStrLen); + if (pRetrieve == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); pRetrieve->useconds = 0; @@ -231,6 +227,28 @@ void clearBufferedDispatchMsg(SStreamTask* pTask) { pMsgInfo->dispatchMsgType = 0; } +int32_t streamTaskBuildAndSendTriggerMsg(SStreamTask* pTask, const SStreamDataBlock* pData, int32_t dstTaskId, + int32_t vgId, SEpSet* pEpset) { + SStreamDispatchReq* pReq = taosMemoryCalloc(1, sizeof(SStreamDispatchReq)); + + int32_t numOfBlocks = taosArrayGetSize(pData->blocks); + int32_t code = tInitStreamDispatchReq(pReq, pTask, pData->srcVgId, numOfBlocks, dstTaskId, pData->type); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + for (int32_t i = 0; i < numOfBlocks; i++) { + SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i); + code = streamAddBlockIntoDispatchMsg(pDataBlock, pReq); + if (code != TSDB_CODE_SUCCESS) { + destroyDispatchMsg(pReq, 1); + return code; + } + } + + return doSendDispatchMsg(pTask, pReq, vgId, pEpset); +} + static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pData) { int32_t code = 0; int32_t numOfBlocks = taosArrayGetSize(pData->blocks); @@ -357,8 +375,8 @@ static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatch for (int32_t i = 0; i < numOfVgroups; i++) { if (pDispatchMsg[i].blockNum > 0) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); - stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr, - pTask->info.selfChildId, pDispatchMsg[i].blockNum, pVgInfo->vgId); + stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", id, pTask->info.selfChildId, + pDispatchMsg[i].blockNum, pVgInfo->vgId); code = doSendDispatchMsg(pTask, &pDispatchMsg[i], pVgInfo->vgId, &pVgInfo->epSet); if (code < 0) { @@ -372,8 +390,7 @@ static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatch } } - stDebug("s-task:%s complete shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, numOfVgroups, - msgId); + stDebug("s-task:%s complete shuffle-dispatch blocks to all %d vnodes, msgId:%d", id, numOfVgroups, msgId); } return code; @@ -562,7 +579,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { return 0; } - if (pTask->chkInfo.dispatchCheckpointTrigger) { + if (pTask->chkInfo.pActiveInfo->dispatchTrigger) { stDebug("s-task:%s already send checkpoint trigger, not dispatch anymore", id); atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL); return 0; @@ -590,6 +607,10 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { } else { // todo handle build dispatch msg failed } + if (pBlock->type == STREAM_INPUT__CHECKPOINT_TRIGGER) { + streamTaskInitTriggerDispatchInfo(pTask); + } + int32_t retryCount = 0; while (1) { code = sendDispatchMsg(pTask, pTask->msgInfo.pData); @@ -624,18 +645,24 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { // this function is usually invoked by sink/agg task int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) { - int32_t num = taosArrayGetSize(pTask->pReadyMsgList); + SArray* pList = pTask->chkInfo.pActiveInfo->pReadyMsgList; + + taosThreadMutexLock(&pTask->chkInfo.pActiveInfo->lock); + + int32_t num = taosArrayGetSize(pList); ASSERT(taosArrayGetSize(pTask->upstreamInfo.pList) == num); for (int32_t i = 0; i < num; ++i) { - SStreamChkptReadyInfo* pInfo = taosArrayGet(pTask->pReadyMsgList, i); + SStreamChkptReadyInfo* pInfo = taosArrayGet(pList, i); tmsgSendReq(&pInfo->upstreamNodeEpset, &pInfo->msg); stDebug("s-task:%s level:%d checkpoint ready msg sent to upstream:0x%x", pTask->id.idStr, pTask->info.taskLevel, pInfo->upStreamTaskId); } - taosArrayClear(pTask->pReadyMsgList); + taosArrayClear(pList); + + taosThreadMutexUnlock(&pTask->chkInfo.pActiveInfo->lock); stDebug("s-task:%s level:%d checkpoint ready msg sent to all %d upstreams", pTask->id.idStr, pTask->info.taskLevel, num); @@ -644,21 +671,22 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) { // this function is only invoked by source task, and send rsp to mnode int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) { - taosThreadMutexLock(&pTask->lock); + SArray* pList = pTask->chkInfo.pActiveInfo->pReadyMsgList; + taosThreadMutexLock(&pTask->chkInfo.pActiveInfo->lock); ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); - if (taosArrayGetSize(pTask->pReadyMsgList) == 1) { - SStreamChkptReadyInfo* pInfo = taosArrayGet(pTask->pReadyMsgList, 0); + if (taosArrayGetSize(pList) == 1) { + SStreamChkptReadyInfo* pInfo = taosArrayGet(pList, 0); tmsgSendRsp(&pInfo->msg); - taosArrayClear(pTask->pReadyMsgList); + taosArrayClear(pList); stDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode", pTask->id.idStr, pTask->info.taskLevel); } else { stDebug("s-task:%s level:%d already send rsp checkpoint success to mnode", pTask->id.idStr, pTask->info.taskLevel); } - taosThreadMutexUnlock(&pTask->lock); + taosThreadMutexUnlock(&pTask->chkInfo.pActiveInfo->lock); return TSDB_CODE_SUCCESS; } @@ -777,17 +805,34 @@ int32_t streamTaskBuildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRp } int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask) { - SStreamChkptReadyInfo info = {0}; + SStreamChkptReadyInfo info = { + .recvTs = taosGetTimestampMs(), .transId = pReq->transId, .checkpointId = pReq->checkpointId}; + streamTaskBuildCheckpointSourceRsp(pReq, pRpcInfo, &info.msg, TSDB_CODE_SUCCESS); - if (pTask->pReadyMsgList == NULL) { - pTask->pReadyMsgList = taosArrayInit(4, sizeof(SStreamChkptReadyInfo)); + SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; + taosThreadMutexLock(&pActiveInfo->lock); + + int32_t size = taosArrayGetSize(pActiveInfo->pReadyMsgList); + if (size > 0) { + ASSERT(size == 1); + + SStreamChkptReadyInfo* pReady = taosArrayGet(pActiveInfo->pReadyMsgList, 0); + if (pReady->transId == pReq->transId) { + stWarn("s-task:%s repeatly recv checkpoint source msg from mnode, checkpointId:%" PRId64 ", ignore", + pTask->id.idStr, pReq->checkpointId); + } else { + stError("s-task:%s checkpointId:%" PRId64 " transId:%d not completed, new transId:%d checkpointId:%" PRId64 + " recv from mnode", + pTask->id.idStr, pReady->checkpointId, pReady->transId, pReq->transId, pReq->checkpointId); + ASSERT(0); // failed to handle it + } + } else { + taosArrayPush(pActiveInfo->pReadyMsgList, &info); + stDebug("s-task:%s add checkpoint source rsp msg, total:%d", pTask->id.idStr, size); } - taosArrayPush(pTask->pReadyMsgList, &info); - - int32_t size = taosArrayGetSize(pTask->pReadyMsgList); - stDebug("s-task:%s add checkpoint source rsp msg, total:%d", pTask->id.idStr, size); + taosThreadMutexUnlock(&pActiveInfo->lock); return TSDB_CODE_SUCCESS; } @@ -799,7 +844,7 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId, return TSDB_CODE_SUCCESS; } - SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId); + SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId); SStreamCheckpointReadyMsg req = {0}; req.downstreamNodeId = pTask->pMeta->vgId; @@ -833,7 +878,14 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId, ASSERT(req.upstreamTaskId != 0); - SStreamChkptReadyInfo info = {.upStreamTaskId = pInfo->taskId, .upstreamNodeEpset = pInfo->epSet}; + SStreamChkptReadyInfo info = { + .upStreamTaskId = pInfo->taskId, + .upstreamNodeEpset = pInfo->epSet, + .nodeId = req.upstreamNodeId, + .recvTs = taosGetTimestampMs(), + .checkpointId = req.checkpointId, + }; + initRpcMsg(&info.msg, TDMT_STREAM_TASK_CHECKPOINT_READY, buf, tlen + sizeof(SMsgHead)); stDebug("s-task:%s (level:%d) prepare checkpoint ready msg to upstream s-task:0x%" PRIx64 @@ -841,39 +893,65 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId, pTask->id.idStr, pTask->info.taskLevel, req.streamId, req.upstreamTaskId, req.upstreamNodeId, index, req.upstreamNodeId); - if (pTask->pReadyMsgList == NULL) { - pTask->pReadyMsgList = taosArrayInit(4, sizeof(SStreamChkptReadyInfo)); + SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; + taosThreadMutexLock(&pActiveInfo->lock); + + bool recved = false; + int32_t size = taosArrayGetSize(pActiveInfo->pReadyMsgList); + for (int32_t i = 0; i < size; ++i) { + SStreamChkptReadyInfo* p = taosArrayGet(pActiveInfo->pReadyMsgList, i); + if (p->nodeId == req.upstreamNodeId) { + if (p->checkpointId == req.checkpointId) { + stWarn("s-task:%s repeatly recv checkpoint-source msg from task:0x%x vgId:%d, checkpointId:%" PRId64 ", ignore", + pTask->id.idStr, p->upStreamTaskId, p->nodeId, p->checkpointId); + } else { + stError("s-task:%s checkpointId:%" PRId64 " not completed, new checkpointId:%" PRId64 " recv", + pTask->id.idStr, p->checkpointId, checkpointId); + ASSERT(0); // failed to handle it + } + + recved = true; + break; + } } - taosArrayPush(pTask->pReadyMsgList, &info); + if (!recved) { + taosArrayPush(pActiveInfo->pReadyMsgList, &info); + } + + taosThreadMutexUnlock(&pActiveInfo->lock); return 0; } void streamClearChkptReadyMsg(SStreamTask* pTask) { - if (pTask->pReadyMsgList == NULL) { + SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; + if (pActiveInfo == NULL) { return; } - for (int i = 0; i < taosArrayGetSize(pTask->pReadyMsgList); i++) { - SStreamChkptReadyInfo* pInfo = taosArrayGet(pTask->pReadyMsgList, i); + for (int i = 0; i < taosArrayGetSize(pActiveInfo->pReadyMsgList); i++) { + SStreamChkptReadyInfo* pInfo = taosArrayGet(pActiveInfo->pReadyMsgList, i); rpcFreeCont(pInfo->msg.pCont); } - taosArrayClear(pTask->pReadyMsgList); + + taosArrayClear(pActiveInfo->pReadyMsgList); } // this message has been sent successfully, let's try next one. -static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) { +static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId, int32_t downstreamNodeId) { stDebug("s-task:%s destroy dispatch msg:%p", pTask->id.idStr, pTask->msgInfo.pData); bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER); if (delayDispatch) { taosThreadMutexLock(&pTask->lock); // we only set the dispatch msg info for current checkpoint trans - if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK && pTask->chkInfo.checkpointingId == pTask->msgInfo.checkpointId) { - ASSERT(pTask->chkInfo.transId == pTask->msgInfo.transId); - pTask->chkInfo.dispatchCheckpointTrigger = true; - stDebug("s-task:%s checkpoint-trigger msg rsp for checkpointId:%" PRId64 " transId:%d confirmed", - pTask->id.idStr, pTask->msgInfo.checkpointId, pTask->msgInfo.transId); + if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK && + pTask->chkInfo.pActiveInfo->activeId == pTask->msgInfo.checkpointId) { + ASSERT(pTask->chkInfo.pActiveInfo->transId == pTask->msgInfo.transId); + stDebug("s-task:%s checkpoint-trigger msg to 0x%x rsp for checkpointId:%" PRId64 " transId:%d confirmed", + pTask->id.idStr, downstreamId, pTask->msgInfo.checkpointId, pTask->msgInfo.transId); + + streamTaskSetTriggerDispatchConfirmed(pTask, downstreamNodeId); } else { stWarn("s-task:%s checkpoint-trigger msg rsp for checkpointId:%" PRId64 " transId:%d discard, since expired", pTask->id.idStr, pTask->msgInfo.checkpointId, pTask->msgInfo.transId); @@ -966,10 +1044,10 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i pTask->info.taskLevel == TASK_LEVEL__SOURCE) { stError("s-task:%s failed to dispatch checkpoint-trigger msg, checkpointId:%" PRId64 ", set the current checkpoint failed, and send rsp to mnode", - id, pTask->chkInfo.checkpointingId); + id, pTask->chkInfo.pActiveInfo->activeId); { // send checkpoint failure msg to mnode directly - pTask->chkInfo.failedId = pTask->chkInfo.checkpointingId; // record the latest failed checkpoint id - pTask->chkInfo.checkpointingId = pTask->chkInfo.checkpointingId; + pTask->chkInfo.pActiveInfo->failedId = pTask->chkInfo.pActiveInfo->activeId; // record the latest failed checkpoint id + pTask->chkInfo.pActiveInfo->activeId = pTask->chkInfo.pActiveInfo->activeId; streamTaskSendCheckpointSourceRsp(pTask); } } else { @@ -1035,7 +1113,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i // now ready for next data output atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL); } else { - handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId); + handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId, pRsp->downstreamNodeId); } } } @@ -1096,7 +1174,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S stDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64 ", msgId:%d", id, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen, pReq->msgId); - SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId); + SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId); ASSERT(pInfo != NULL); if (pMeta->role == NODE_ROLE_FOLLOWER) { diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index b60164fca9..87f239f31c 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -426,7 +426,7 @@ int32_t streamTransferStatePrepare(SStreamTask* pTask) { streamMetaReleaseTask(pMeta, pStreamTask); return code; } else { - stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, pTask->id.idStr); + stDebug("s-task:%s sink task halt by related fill-history task:%s", pStreamTask->id.idStr, pTask->id.idStr); } streamMetaReleaseTask(pMeta, pStreamTask); } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 00fe1207dd..7356dc01ce 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1044,13 +1044,13 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize); } - if ((*pTask)->chkInfo.checkpointingId != 0) { - entry.checkpointInfo.failed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId) ? 1 : 0; - entry.checkpointInfo.activeId = (*pTask)->chkInfo.checkpointingId; - entry.checkpointInfo.activeTransId = (*pTask)->chkInfo.transId; + if ((*pTask)->chkInfo.pActiveInfo->activeId != 0) { + entry.checkpointInfo.failed = ((*pTask)->chkInfo.pActiveInfo->failedId >= (*pTask)->chkInfo.pActiveInfo->activeId) ? 1 : 0; + entry.checkpointInfo.activeId = (*pTask)->chkInfo.pActiveInfo->activeId; + entry.checkpointInfo.activeTransId = (*pTask)->chkInfo.pActiveInfo->transId; if (entry.checkpointInfo.failed) { - stInfo("s-task:%s set kill checkpoint trans in hb, transId:%d", (*pTask)->id.idStr, (*pTask)->chkInfo.transId); + stInfo("s-task:%s set kill checkpoint trans in hb, transId:%d", (*pTask)->id.idStr, (*pTask)->chkInfo.pActiveInfo->transId); } } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index c056e2a4b6..e0415c8467 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -24,6 +24,8 @@ static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo); static void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated); static void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdate); +static void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo); +static SActiveCheckpointInfo* streamTaskCreateActiveChkptInfo(); static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) { int32_t childId = taosArrayGetSize(pArray); @@ -70,12 +72,12 @@ static void freeItem(void* p) { } static void freeUpstreamItem(void* p) { - SStreamChildEpInfo** pInfo = p; + SStreamUpstreamEpInfo** pInfo = p; taosMemoryFree(*pInfo); } -static SStreamChildEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) { - SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo)); +static SStreamUpstreamEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) { + SStreamUpstreamEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamUpstreamEpInfo)); if (pEpInfo == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; @@ -254,7 +256,6 @@ void tFreeStreamTask(SStreamTask* pTask) { } streamClearChkptReadyMsg(pTask); - pTask->pReadyMsgList = taosArrayDestroy(pTask->pReadyMsgList); if (pTask->msgInfo.pData != NULL) { clearBufferedDispatchMsg(pTask); @@ -302,6 +303,9 @@ void tFreeStreamTask(SStreamTask* pTask) { taosMemoryFree((void*)pTask->id.idStr); } + streamTaskDestroyActiveChkptInfo(pTask->chkInfo.pActiveInfo); + pTask->chkInfo.pActiveInfo = NULL; + taosMemoryFree(pTask); stDebug("s-task:0x%x free task completed", taskId); } @@ -414,6 +418,10 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i return TSDB_CODE_OUT_OF_MEMORY; } + if (pTask->chkInfo.pActiveInfo == NULL) { + pTask->chkInfo.pActiveInfo = streamTaskCreateActiveChkptInfo(); + } + return TSDB_CODE_SUCCESS; } @@ -433,8 +441,12 @@ int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) { } } +int32_t streamTaskGetNumOfUpstream(const SStreamTask* pTask) { + return taosArrayGetSize(pTask->upstreamInfo.pList); +} + int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask) { - SStreamChildEpInfo* pEpInfo = createStreamTaskEpInfo(pUpstreamTask); + SStreamUpstreamEpInfo* pEpInfo = createStreamTaskEpInfo(pUpstreamTask); if (pEpInfo == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -453,7 +465,7 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList); for (int32_t i = 0; i < numOfUpstream; ++i) { - SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); + SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); if (pInfo->nodeId == nodeId) { bool equal = isEpsetEqual(&pInfo->epSet, pEpSet); if (!equal) { @@ -589,7 +601,7 @@ void streamTaskResetUpstreamStageInfo(SStreamTask* pTask) { int32_t size = taosArrayGetSize(pTask->upstreamInfo.pList); for (int32_t i = 0; i < size; ++i) { - SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); + SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); pInfo->stage = -1; } @@ -603,7 +615,7 @@ void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) { } for (int32_t i = 0; i < num; ++i) { - SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); + SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); pInfo->dataAllowed = true; } @@ -612,12 +624,19 @@ void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) { } void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) { - SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId); + SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId); if (pInfo != NULL) { pInfo->dataAllowed = false; } } +void streamTaskOpenUpstreamInput(SStreamTask* pTask, int32_t taskId) { + SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId); + if (pInfo != NULL) { + pInfo->dataAllowed = true; + } +} + bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask) { return pTask->upstreamInfo.numOfClosed == taosArrayGetSize(pTask->upstreamInfo.pList); } @@ -723,9 +742,9 @@ int32_t streamBuildAndSendCheckpointUpdateMsg(SMsgCb* pMsgCb, int32_t vgId, SStr pReq->dropRelHTask = dropRelHTask; pReq->hStreamId = pHTaskId->streamId; pReq->hTaskId = pHTaskId->taskId; - pReq->transId = pCheckpointInfo->transId; + pReq->transId = pCheckpointInfo->pActiveInfo->transId; - pReq->checkpointId = pCheckpointInfo->checkpointingId; + pReq->checkpointId = pCheckpointInfo->pActiveInfo->activeId; pReq->checkpointVer = pCheckpointInfo->processedVer; pReq->checkpointTs = pCheckpointInfo->startTs; @@ -860,10 +879,10 @@ int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) { return 0; } -SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId) { +SStreamUpstreamEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId) { int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList); for (int32_t i = 0; i < num; ++i) { - SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); + SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); if (pInfo->taskId == taskId) { return pInfo; } @@ -873,6 +892,24 @@ SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t return NULL; } +SEpSet* streamTaskGetDownstreamEpInfo(SStreamTask* pTask, int32_t taskId) { + if (pTask->info.taskLevel == TASK_OUTPUT__FIXED_DISPATCH) { + if (pTask->outputInfo.fixedDispatcher.taskId == taskId) { + return &pTask->outputInfo.fixedDispatcher.epSet; + } + } else if (pTask->info.taskLevel == TASK_OUTPUT__SHUFFLE_DISPATCH) { + SArray* pList = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; + for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) { + SVgroupInfo* pVgInfo = taosArrayGet(pList, i); + if (pVgInfo->taskId == taskId) { + return &pVgInfo->epSet; + } + } + } + + return NULL; +} + char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) { char buf[128] = {0}; sprintf(buf, "0x%" PRIx64 "-0x%x", streamId, taskId); @@ -914,4 +951,64 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq) { void streamTaskSetRemoveBackendFiles(SStreamTask* pTask) { pTask->status.removeBackendFiles = true; +} + +int32_t streamTaskGetActiveCheckpointInfo(const SStreamTask* pTask, int32_t* pTransId, int64_t* pCheckpointId) { + if (pTransId != NULL) { + *pTransId = pTask->chkInfo.pActiveInfo->transId; + } + + if (pCheckpointId != NULL) { + *pCheckpointId = pTask->chkInfo.pActiveInfo->activeId; + } + + return TSDB_CODE_SUCCESS; +} + +int32_t streamTaskSetActiveCheckpointInfo(SStreamTask* pTask, int64_t activeCheckpointId) { + pTask->chkInfo.pActiveInfo->activeId = activeCheckpointId; + return TSDB_CODE_SUCCESS; +} + +int32_t streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId) { + pTask->chkInfo.pActiveInfo->transId = transId; + pTask->chkInfo.pActiveInfo->activeId = checkpointId; + pTask->chkInfo.pActiveInfo->failedId = checkpointId; + return TSDB_CODE_SUCCESS; +} + +SActiveCheckpointInfo* streamTaskCreateActiveChkptInfo() { + SActiveCheckpointInfo* pInfo = taosMemoryCalloc(1, sizeof(SActiveCheckpointInfo)); + taosThreadMutexInit(&pInfo->lock, NULL); + + pInfo->pDispatchTriggerList = taosArrayInit(4, sizeof(STaskTriggerSendInfo)); + pInfo->pReadyMsgList = taosArrayInit(4, sizeof(SStreamChkptReadyInfo)); + return pInfo; +} + +void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) { + if (pInfo == NULL) { + return; + } + + taosThreadMutexDestroy(&pInfo->lock); + pInfo->pDispatchTriggerList = taosArrayDestroy(pInfo->pDispatchTriggerList); + pInfo->pReadyMsgList = taosArrayDestroy(pInfo->pReadyMsgList); + + if (pInfo->pCheckTmr != NULL) { + taosTmrStop(pInfo->pCheckTmr); + pInfo->pCheckTmr = NULL; + } + + taosMemoryFree(pInfo); +} + +void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) { + pInfo->activeId = 0; // clear the checkpoint id + pInfo->failedId = 0; + pInfo->transId = 0; + pInfo->dispatchTrigger = false; + + taosArrayClear(pInfo->pReadyMsgList); + taosArrayClear(pInfo->pDispatchTriggerList); } \ No newline at end of file diff --git a/source/libs/stream/src/streammsg.c b/source/libs/stream/src/streammsg.c index 9b69833234..f8228a8f5f 100644 --- a/source/libs/stream/src/streammsg.c +++ b/source/libs/stream/src/streammsg.c @@ -17,7 +17,7 @@ #include "streammsg.h" #include "tstream.h" -int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo) { +int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo) { if (tEncodeI32(pEncoder, pInfo->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pInfo->nodeId) < 0) return -1; if (tEncodeI32(pEncoder, pInfo->childId) < 0) return -1; @@ -26,7 +26,7 @@ int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo) return 0; } -int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo) { +int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamUpstreamEpInfo* pInfo) { if (tDecodeI32(pDecoder, &pInfo->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pInfo->nodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pInfo->childId) < 0) return -1; @@ -481,7 +481,7 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { int32_t epSz = taosArrayGetSize(pTask->upstreamInfo.pList); if (tEncodeI32(pEncoder, epSz) < 0) return -1; for (int32_t i = 0; i < epSz; i++) { - SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); + SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); if (tEncodeStreamEpInfo(pEncoder, pInfo) < 0) return -1; } @@ -557,7 +557,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES); for (int32_t i = 0; i < epSz; i++) { - SStreamChildEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamChildEpInfo)); + SStreamUpstreamEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamUpstreamEpInfo)); if (pInfo == NULL) return -1; if (tDecodeStreamEpInfo(pDecoder, pInfo) < 0) { taosMemoryFreeClear(pInfo);