diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index a07830af19..1a25bac0c8 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -207,7 +207,7 @@ TD_DEF_MSG_TYPE(TDMT_MND_RESTORE_DNODE, "restore-dnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_PAUSE_STREAM, "pause-stream", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_RESUME_STREAM, "resume-stream", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_TIMER, "stream-checkpoint-tmr", NULL, NULL) // unused,reserved + TD_DEF_MSG_TYPE(TDMT_MND_STREAM_UPDATE_CHKPT_EVT, "stream-update-chkpt-evt", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHKPT_REPORT, "stream-chkpt-report", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_NODECHANGE_CHECK, "stream-nodechange-check", NULL, NULL) diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index 856d7b2051..6a9f2f1275 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -95,7 +95,7 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; code = 0; _OVER: diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 41b9352f18..7bc41559c3 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -967,7 +967,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToWriteQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 6ca61265bb..9289909b19 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -130,7 +130,7 @@ int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamO int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream); int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList); -void scanCheckpointReportInfo(SMnode *pMnode); +int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq); void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo); SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index c1747f9379..ab72996058 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -115,10 +115,11 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamCheckpoint); - mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_REPORT, mndProcessCheckpointReport); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint); - mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb); + mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_REPORT, mndProcessCheckpointReport); + mndSetMsgHandle(pMnode, TDMT_MND_STREAM_UPDATE_CHKPT_EVT, mndScanCheckpointReportInfo); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_NODECHANGE_CHECK, mndProcessNodeCheckReq); mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq); @@ -1181,7 +1182,8 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { int32_t numOfCheckpointTrans = 0; if ((code = mndCheckTaskAndNodeStatus(pMnode)) != 0) { - return code; + terrno = TSDB_CODE_STREAM_TASK_IVLD_STATUS; + return -1; } SArray* pList = taosArrayInit(4, sizeof(SCheckpointInterval)); diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 7c07d003b7..2cb4111e97 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -22,6 +22,8 @@ typedef struct SFailedCheckpointInfo { int32_t transId; } SFailedCheckpointInfo; +static void mndStreamStartUpdateCheckpointInfo(SMnode *pMnode); + static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) { int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList); for (int32_t j = 0; j < numOfNodes; ++j) { @@ -326,7 +328,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { mndDropOrphanTasks(pMnode, pOrphanTasks); } - scanCheckpointReportInfo(pMnode); + mndStreamStartUpdateCheckpointInfo(pMnode); taosThreadMutexUnlock(&execInfo.lock); tCleanupStreamHbMsg(&req); @@ -346,3 +348,12 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { return TSDB_CODE_SUCCESS; } + +void mndStreamStartUpdateCheckpointInfo(SMnode *pMnode) { // here reuse the doCheckpointmsg + SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg)); + if (pMsg != NULL) { + int32_t size = sizeof(SMStreamDoCheckpointMsg); + SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_UPDATE_CHKPT_EVT, .pCont = pMsg, .contLen = size}; + tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + } +} \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index f7b33c5e75..7a45ce1f2a 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -754,8 +754,11 @@ int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj return 0; } -void scanCheckpointReportInfo(SMnode* pMnode) { - void* pIter = NULL; +int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + void *pIter = NULL; + SArray *pDropped = taosArrayInit(4, sizeof(int64_t)); + mDebug("start to scan checkpoint report info"); while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) { @@ -764,8 +767,8 @@ void scanCheckpointReportInfo(SMnode* pMnode) { STaskChkptInfo* pInfo = taosArrayGet(pList, 0); SStreamObj* pStream = mndGetStreamObj(pMnode, pInfo->streamId); if (pStream == NULL) { - mError("failed to acquire stream:0x%"PRIx64" remove it from checkpoint-report list", pInfo->streamId); - taosHashRemove(execInfo.pChkptStreams, &pInfo->streamId, sizeof(pInfo->streamId)); + mDebug("failed to acquire stream:0x%" PRIx64 " remove it from checkpoint-report list", pInfo->streamId); + taosArrayPush(pDropped, &pInfo->streamId); continue; } @@ -780,14 +783,13 @@ void scanCheckpointReportInfo(SMnode* pMnode) { if (!conflict) { int32_t code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, pList); if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { // remove this entry - taosHashRemove(execInfo.pChkptStreams, &pInfo->streamId, sizeof(pInfo->streamId)); - - int32_t numOfStreams = taosHashGetSize(execInfo.pChkptStreams); - mDebug("stream:0x%" PRIx64 " removed, remain streams:%d in checkpoint procedure", pInfo->streamId, numOfStreams); + taosArrayPush(pDropped, &pInfo->streamId); + mDebug("stream:0x%" PRIx64 " removed, remain streams:%d in checkpoint procedure", pInfo->streamId); } else { - mDebug("stream:0x%" PRIx64 " not launch chkpt update trans, due to checkpoint not finished yet", + mDebug("stream:0x%" PRIx64 " not launch chkpt-meta update trans, due to checkpoint not finished yet", pInfo->streamId); } + break; } else { mDebug("stream:0x%"PRIx64" active checkpoint trans not finished yet, wait", pInfo->streamId); } @@ -799,4 +801,16 @@ void scanCheckpointReportInfo(SMnode* pMnode) { sdbRelease(pMnode->pSdb, pStream); } + if (taosArrayGetSize(pDropped) > 0) { + for (int32_t i = 0; i < taosArrayGetSize(pDropped); ++i) { + int64_t streamId = *(int64_t *)taosArrayGet(pDropped, i); + taosHashRemove(execInfo.pChkptStreams, &streamId, sizeof(streamId)); + } + + int32_t numOfStreams = taosHashGetSize(execInfo.pChkptStreams); + mDebug("drop %d stream(s) in checkpoint-report list, remain:%d", numOfStreams); + } + + taosArrayDestroy(pDropped); + return TSDB_CODE_SUCCESS; } \ No newline at end of file diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 56ac794d8e..1583734a99 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -40,26 +40,8 @@ static void checkpointTriggerMonitorFn(void* param, void* tmrId); static SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId); -bool streamTaskIsAllUpstreamSendTrigger(SStreamTask* pTask) { - SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; - int32_t numOfUpstreams = taosArrayGetSize(pTask->upstreamInfo.pList); - bool allSend = true; - - taosThreadMutexLock(&pActiveInfo->lock); - int32_t numOfRecv = taosArrayGetSize(pActiveInfo->pReadyMsgList); - - if (numOfRecv < numOfUpstreams) { - stDebug("s-task:%s received checkpoint-trigger block, idx:%d, %d upstream tasks not send yet, total:%d", - pTask->id.idStr, pTask->info.selfChildId, (numOfUpstreams - numOfRecv), numOfUpstreams); - allSend = false; - } - - taosThreadMutexUnlock(&pActiveInfo->lock); - return allSend; -} - SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, - int32_t transId) { + int32_t transId) { SStreamDataBlock* pChkpoint = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock)); if (pChkpoint == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 1828409f89..98168abae1 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -615,7 +615,7 @@ void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointB * appropriate batch of blocks should be handled in 5 to 10 sec. */ static int32_t doStreamExecTask(SStreamTask* pTask) { - const char* id = pTask->id.idStr; + const char* id = pTask->id.idStr; // merge multiple input data if possible in the input queue. stDebug("s-task:%s start to extract data block from inputQ", id); @@ -699,20 +699,15 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { if (type != STREAM_INPUT__CHECKPOINT) { doStreamTaskExecImpl(pTask, pInput); - } - - streamFreeQitem(pInput); - - // todo other thread may change the status + streamFreeQitem(pInput); + } else { // todo other thread may change the status // do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed. - if (type == STREAM_INPUT__CHECKPOINT) { - // todo add lock + taosThreadMutexLock(&pTask->lock); SStreamTaskState* pState = streamTaskGetStatus(pTask); if (pState->state == TASK_STATUS__CK) { stDebug("s-task:%s checkpoint block received, set status:%s", id, pState->name); streamTaskBuildCheckpoint(pTask); - } else { - // todo refactor + } else { // todo refactor int32_t code = 0; if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { code = streamTaskSendCheckpointSourceRsp(pTask); @@ -727,6 +722,8 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { } } + taosThreadMutexUnlock(&pTask->lock); + streamFreeQitem(pInput); return 0; } }