diff --git a/include/common/tmsg.h b/include/common/tmsg.h index c840aef6cf..f79b11431e 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3495,8 +3495,6 @@ typedef struct SVUpdateCheckpointInfoReq { int64_t checkpointTs; int32_t transId; int8_t dropRelHTask; - int64_t hStreamId; - int64_t hTaskId; } SVUpdateCheckpointInfoReq; typedef struct { @@ -3649,10 +3647,6 @@ typedef struct { int32_t taskId; } SVPauseStreamTaskReq, SVResetStreamTaskReq; -typedef struct { - int8_t reserved; -} SVPauseStreamTaskRsp; - typedef struct { char name[TSDB_STREAM_FNAME_LEN]; int8_t igNotExists; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index aff17e5de5..c79e66f2e2 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -225,9 +225,9 @@ TD_DEF_MSG_TYPE(TDMT_MND_RESTORE_DNODE, "restore-dnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_PAUSE_STREAM, "pause-stream", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_RESUME_STREAM, "resume-stream", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_TIMER, "stream-checkpoint-tmr", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_STREAM_UPDATE_CHKPT_EVT, "stream-update-chkpt-evt", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE, "stream-checkpoint-remain", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHKPT_REPORT, "stream-chkpt-report", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_NODECHANGE_CHECK, "stream-nodechange-check", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TRIM_DB_TIMER, "trim-db-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GRANT_NOTIFY, "grant-notify", NULL, NULL) @@ -390,6 +390,7 @@ TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_RESET, "vnode-stream-reset", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_CHECK, "vnode-stream-task-check", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_UNUSED, "vnd-stream-unused", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_GET_STREAM_PROGRESS, "vnd-stream-progress", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_END_VND_STREAM_MSG) diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index 0076d79312..0fb690e756 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -28,6 +28,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); +int32_t tqStreamProcessChkptReportRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen, bool isLeader, bool restored); diff --git a/include/libs/stream/streammsg.h b/include/libs/stream/streammsg.h index 91bfc6afc8..723c9fd099 100644 --- a/include/libs/stream/streammsg.h +++ b/include/libs/stream/streammsg.h @@ -190,6 +190,20 @@ typedef struct SCheckpointTriggerRsp { int32_t rspCode; } SCheckpointTriggerRsp; +typedef struct SCheckpointReport { + int64_t streamId; + int32_t taskId; + int32_t nodeId; + int64_t checkpointId; + int64_t checkpointVer; + int64_t checkpointTs; + int32_t transId; + int8_t dropHTask; +} SCheckpointReport; + +int32_t tEncodeStreamTaskChkptReport(SEncoder* pEncoder, const SCheckpointReport* pReq); +int32_t tDecodeStreamTaskChkptReport(SDecoder* pDecoder, SCheckpointReport* pReq); + typedef struct { SMsgHead head; int64_t streamId; diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index d07a302920..bcf081dbfb 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -734,6 +734,9 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32 void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs); void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId, int64_t startTs); +void streamMetaClearUpdateTaskList(SStreamMeta* pMeta); +void streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId); + void streamMetaRLock(SStreamMeta* pMeta); void streamMetaRUnLock(SStreamMeta* pMeta); void streamMetaWLock(SStreamMeta* pMeta); @@ -762,8 +765,7 @@ int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskI int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask); int32_t streamTaskBuildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg, int32_t setCode); -int32_t streamBuildAndSendCheckpointUpdateMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, STaskId* pHTaskId, - SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask); +int32_t streamSendChkptReportMsg(SStreamTask* pTask, SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask); int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpointInfoReq* pReq); SActiveCheckpointInfo* streamTaskCreateActiveChkptInfo(); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index e4afba6722..e137bcbdec 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -232,6 +232,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_CREATE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_DROP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_CREATE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; @@ -240,6 +241,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index 9b07b6a3d8..6a9f2f1275 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -75,26 +75,27 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; code = 0; _OVER: diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index bfc9e92293..7bc41559c3 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -967,6 +967,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 6aed50e508..dade7b4bdf 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -26,13 +26,14 @@ extern "C" { #define MND_STREAM_RESERVE_SIZE 64 #define MND_STREAM_VER_NUMBER 5 -#define MND_STREAM_CREATE_NAME "stream-create" -#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint" -#define MND_STREAM_PAUSE_NAME "stream-pause" -#define MND_STREAM_RESUME_NAME "stream-resume" -#define MND_STREAM_DROP_NAME "stream-drop" -#define MND_STREAM_TASK_RESET_NAME "stream-task-reset" -#define MND_STREAM_TASK_UPDATE_NAME "stream-task-update" +#define MND_STREAM_CREATE_NAME "stream-create" +#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint" +#define MND_STREAM_PAUSE_NAME "stream-pause" +#define MND_STREAM_RESUME_NAME "stream-resume" +#define MND_STREAM_DROP_NAME "stream-drop" +#define MND_STREAM_TASK_RESET_NAME "stream-task-reset" +#define MND_STREAM_TASK_UPDATE_NAME "stream-task-update" +#define MND_STREAM_CHKPT_UPDATE_NAME "stream-chkpt-update" typedef struct SStreamTransInfo { int64_t startTime; @@ -51,6 +52,7 @@ typedef struct SStreamTransMgmt { } SStreamTransMgmt; typedef struct SStreamExecInfo { + bool initTaskList; SArray *pNodeList; int64_t ts; // snapshot ts SStreamTransMgmt transMgmt; @@ -58,6 +60,7 @@ typedef struct SStreamExecInfo { SArray *pTaskList; TdThreadMutex lock; SHashObj *pTransferStateStreams; + SHashObj *pChkptStreams; } SStreamExecInfo; extern SStreamExecInfo execInfo; @@ -78,7 +81,18 @@ typedef struct SOrphanTask { typedef struct { SMsgHead head; -} SMStreamHbRspMsg, SMStreamReqCheckpointRspMsg; +} SMStreamHbRspMsg, SMStreamReqCheckpointRsp, SMStreamUpdateChkptRsp; + +typedef struct STaskChkptInfo { + int32_t nodeId; + int32_t taskId; + int64_t streamId; + int64_t checkpointId; + int64_t version; + int64_t ts; + int32_t transId; + int8_t dropHTask; +}STaskChkptInfo; int32_t mndInitStream(SMnode *pMnode); void mndCleanupStream(SMnode *pMnode); @@ -96,7 +110,7 @@ int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream); SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady); void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName); int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset, - int32_t retryCode); + int32_t retryCode, int32_t acceptCode); STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnConflct conflict, const char *name, const char *pMsg); int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status); SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); @@ -114,14 +128,19 @@ int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *p int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *pList); int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream); +int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); +int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList); +int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq); +void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo); SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream); void destroyStreamTaskIter(SStreamTaskIter *pIter); bool streamTaskIterNextTask(SStreamTaskIter *pIter); SStreamTask *streamTaskIterGetCurrent(SStreamTaskIter *pIter); void mndInitExecInfo(); -void removeExpiredNodeInfo(const SArray *pNodeSnapshot); -void removeTasksInBuf(SArray* pTaskIds, SStreamExecInfo* pExecInfo); +void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo); +int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot); +void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 5fc6e465ad..c9598c4b38 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -56,13 +56,14 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger); static int32_t mndProcessNodeCheck(SRpcMsg *pReq); static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); -static SArray *extractNodeListFromStream(SMnode *pMnode, SArray* pNodeList); +static int32_t extractNodeListFromStream(SMnode *pMnode, SArray* pNodeList); static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq); +static int32_t mndProcessCheckpointReport(SRpcMsg *pReq); static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); -void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); -static int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot); +static void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo); +static void removeExpiredNodeInfo(const SArray *pNodeSnapshot); static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len); static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); @@ -103,6 +104,7 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_STOP_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_UPDATE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_RESET_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_UPDATE_CHKPT_RSP, mndTransProcessRsp); // for msgs inside mnode // TODO change the name @@ -114,8 +116,10 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamCheckpoint); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint); - mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb); + mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_REPORT, mndProcessCheckpointReport); + mndSetMsgHandle(pMnode, TDMT_MND_STREAM_UPDATE_CHKPT_EVT, mndScanCheckpointReportInfo); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_NODECHANGE_CHECK, mndProcessNodeCheckReq); mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq); @@ -131,9 +135,11 @@ int32_t mndInitStream(SMnode *pMnode) { if (sdbSetTable(pMnode->pSdb, table) != 0) { return -1; } + if (sdbSetTable(pMnode->pSdb, tableSeq) != 0) { return -1; } + return 0; } @@ -143,6 +149,7 @@ void mndCleanupStream(SMnode *pMnode) { taosHashCleanup(execInfo.pTaskMap); taosHashCleanup(execInfo.transMgmt.pDBTrans); taosHashCleanup(execInfo.pTransferStateStreams); + taosHashCleanup(execInfo.pChkptStreams); taosThreadMutexDestroy(&execInfo.lock); mDebug("mnd stream exec info cleanup"); } @@ -508,7 +515,7 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) { tEncodeStreamTask(&encoder, pTask); tEncoderClear(&encoder); - int32_t code = setTransAction(pTrans, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0); + int32_t code = setTransAction(pTrans, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0, 0); if (code != 0) { taosMemoryFree(buf); return -1; @@ -952,7 +959,7 @@ static int32_t doSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask return -1; } - code = setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY); + code = setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY, 0); if (code != 0) { taosMemoryFree(buf); } @@ -1088,7 +1095,7 @@ static bool taskNodeIsUpdated(SMnode *pMnode) { } static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { - bool ready = true; + bool ready = true; if (taskNodeIsUpdated(pMnode)) { return -1; } @@ -1099,6 +1106,8 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { ASSERT(taosArrayGetSize(execInfo.pTaskList) == 0); } + SArray* pInvalidList = taosArrayInit(4, sizeof(STaskId)); + for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) { STaskId *p = taosArrayGet(execInfo.pTaskList, i); STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p)); @@ -1106,11 +1115,20 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { continue; } + if (pEntry->status == TASK_STATUS__STOP) { + for(int32_t j = 0; j < taosArrayGetSize(pInvalidList); ++j) { + STaskId* pId = taosArrayGet(pInvalidList, j); + if (pEntry->id.streamId == pId->streamId) { + taosArrayPush(pInvalidList, &pEntry->id); + break; + } + } + } + if (pEntry->status != TASK_STATUS__READY) { - mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, checkpoint msg not issued", + mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, checkpoint not issued", pEntry->id.streamId, (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status)); ready = false; - break; } if (pEntry->hTaskId != 0) { @@ -1123,6 +1141,9 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { } } + removeTasksInBuf(pInvalidList, &execInfo); + taosArrayDestroy(pInvalidList); + taosThreadMutexUnlock(&execInfo.lock); return ready ? 0 : -1; } @@ -1151,7 +1172,8 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { int32_t numOfCheckpointTrans = 0; if ((code = mndCheckTaskAndNodeStatus(pMnode)) != 0) { - return code; + terrno = TSDB_CODE_STREAM_TASK_IVLD_STATUS; + return -1; } SArray* pList = taosArrayInit(4, sizeof(SCheckpointInterval)); @@ -1798,6 +1820,10 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock int32_t numOfRows = 0; SStreamObj *pStream = NULL; + taosThreadMutexLock(&execInfo.lock); + mndInitStreamExecInfo(pMnode, &execInfo); + taosThreadMutexUnlock(&execInfo.lock); + while (numOfRows < rowsCapacity) { pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream); if (pShow->pIter == NULL) { @@ -2169,7 +2195,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange return 0; } -static SArray *extractNodeListFromStream(SMnode *pMnode, SArray* pNodeList) { +static int32_t extractNodeListFromStream(SMnode *pMnode, SArray* pNodeList) { SSdb *pSdb = pMnode->pSdb; SStreamObj *pStream = NULL; void *pIter = NULL; @@ -2215,48 +2241,6 @@ static SArray *extractNodeListFromStream(SMnode *pMnode, SArray* pNodeList) { return TSDB_CODE_SUCCESS; } -static bool taskNodeExists(SArray *pList, int32_t nodeId) { - size_t num = taosArrayGetSize(pList); - - for (int32_t i = 0; i < num; ++i) { - SNodeEntry *pEntry = taosArrayGet(pList, i); - if (pEntry->nodeId == nodeId) { - return true; - } - } - - return false; -} - -int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) { - SArray *pRemovedTasks = taosArrayInit(4, sizeof(STaskId)); - - int32_t numOfTask = taosArrayGetSize(execInfo.pTaskList); - for (int32_t i = 0; i < numOfTask; ++i) { - STaskId *pId = taosArrayGet(execInfo.pTaskList, i); - - STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId)); - if (pEntry->nodeId == SNODE_HANDLE) { - continue; - } - - bool existed = taskNodeExists(pNodeSnapshot, pEntry->nodeId); - if (!existed) { - taosArrayPush(pRemovedTasks, pId); - } - } - - removeTasksInBuf(pRemovedTasks, &execInfo); - - mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemovedTasks), - (int32_t)taosArrayGetSize(execInfo.pTaskList)); - - removeExpiredNodeInfo(pNodeSnapshot); - - taosArrayDestroy(pRemovedTasks); - return 0; -} - // this function runs by only one thread, so it is not multi-thread safe static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { int32_t code = 0; @@ -2476,13 +2460,137 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { taosThreadMutexUnlock(&execInfo.lock); { - SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamReqCheckpointRspMsg)}; + SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamReqCheckpointRsp)}; rsp.pCont = rpcMallocCont(rsp.contLen); SMsgHead *pHead = rsp.pCont; pHead->vgId = htonl(req.nodeId); tmsgSendRsp(&rsp); + pReq->info.handle = NULL; // disable auto rsp + } + return 0; +} + +static void doAddTaskInfo(SArray* pList, SCheckpointReport* pReport) { + bool existed = false; + for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) { + STaskChkptInfo* p = taosArrayGet(pList ,i); + if (p->taskId == pReport->taskId) { + existed = true; + break; + } + } + + if (!existed) { + STaskChkptInfo info = { + .streamId = pReport->streamId, + .taskId = pReport->taskId, + .transId = pReport->transId, + .dropHTask = pReport->dropHTask, + .version = pReport->checkpointVer, + .ts = pReport->checkpointTs, + .checkpointId = pReport->checkpointId, + .nodeId = pReport->nodeId, + }; + taosArrayPush(pList, &info); + } +} + +int32_t mndProcessCheckpointReport(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + SCheckpointReport req = {0}; + + SDecoder decoder = {0}; + tDecoderInit(&decoder, pReq->pCont, pReq->contLen); + + if (tDecodeStreamTaskChkptReport(&decoder, &req)) { + tDecoderClear(&decoder); + terrno = TSDB_CODE_INVALID_MSG; + mError("invalid task checkpoint-report msg received"); + return -1; + } + tDecoderClear(&decoder); + + mDebug("receive stream task checkpoint-report msg, vgId:%d, s-task:0x%x, checkpointId:%" PRId64 + " checkpointVer:%" PRId64 " transId:%d", + req.nodeId, req.taskId, req.checkpointId, req.checkpointVer, req.transId); + + // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans. + taosThreadMutexLock(&execInfo.lock); + + SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId); + if (pStream == NULL) { + mWarn("failed to find the stream:0x%" PRIx64 ", not handle checkpoint-report, try to acquire in buf", + req.streamId); + + // not in meta-store yet, try to acquire the task in exec buffer + // the checkpoint req arrives too soon before the completion of the create stream trans. + STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; + void *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); + if (p == NULL) { + mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint-report", req.streamId); + terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; + taosThreadMutexUnlock(&execInfo.lock); + return -1; + } else { + mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet", + req.streamId, req.taskId); + } + } + + int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream); + + SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId)); + if (pReqTaskList == NULL) { + SArray *pList = taosArrayInit(4, sizeof(STaskChkptInfo)); + doAddTaskInfo(pList, &req); + taosHashPut(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId), &pList, POINTER_BYTES); + + pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId)); + } else { + doAddTaskInfo(*pReqTaskList, &req); + } + + int32_t total = taosArrayGetSize(*pReqTaskList); + if (total == numOfTasks) { // all tasks has send the reqs + mInfo("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, checkpoint meta-info for checkpointId:%" PRId64 + " will be issued soon", + req.streamId, pStream->name, total, req.checkpointId); + + // if (pStream != NULL) { + // bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false); + // if (conflict) { + // mDebug("stream:0x%"PRIx64" active checkpoint trans not finished yet, wait", req.streamId); + // } else { + // int32_t code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, *pReqTaskList); + // if (code == TSDB_CODE_SUCCESS) { // remove this entry + // taosHashRemove(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId)); + // + // int32_t numOfStreams = taosHashGetSize(execInfo.pChkptStreams); + // mDebug("stream:0x%" PRIx64 " removed, remain streams:%d in checkpoint procedure", req.streamId, + // numOfStreams); + // } else { + // mDebug("stream:0x%" PRIx64 " not launch chkpt update trans, due to checkpoint not finished yet", + // req.streamId); + // } + // } + // } + } + + if (pStream != NULL) { + mndReleaseStream(pMnode, pStream); + } + + taosThreadMutexUnlock(&execInfo.lock); + + { + SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamUpdateChkptRsp)}; + rsp.pCont = rpcMallocCont(rsp.contLen); + SMsgHead *pHead = rsp.pCont; + pHead->vgId = htonl(req.nodeId); + + tmsgSendRsp(&rsp); pReq->info.handle = NULL; // disable auto rsp } @@ -2510,3 +2618,63 @@ static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq) { } return code; } + +void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo) { + if (pExecInfo->initTaskList || pMnode == NULL) { + return; + } + + addAllStreamTasksIntoBuf(pMnode, pExecInfo); + extractNodeListFromStream(pMnode, pExecInfo->pNodeList); + pExecInfo->initTaskList = true; +} + +void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo* pExecInfo) { + SSdb *pSdb = pMnode->pSdb; + SStreamObj *pStream = NULL; + void *pIter = NULL; + + while (1) { + pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); + if (pIter == NULL) { + break; + } + + saveTaskAndNodeInfoIntoBuf(pStream, pExecInfo); + sdbRelease(pSdb, pStream); + } +} + +int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray* pChkptInfoList) { + STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_UPDATE_NAME, "update checkpoint-info"); + if (pTrans == NULL) { + return terrno; + } + + /*int32_t code = */mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_UPDATE_NAME, pStream->uid); + int32_t code = mndStreamSetUpdateChkptAction(pMnode, pTrans, pStream); + if (code != 0) { + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return code; + } + + code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY); + if (code != TSDB_CODE_SUCCESS) { + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return -1; + } + + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("trans:%d, failed to prepare update checkpoint-info meta trans since %s", pTrans->id, terrstr()); + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return -1; + } + + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + + return TSDB_CODE_ACTION_IN_PROGRESS; +} \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 42efb6589e..a79fe0cf0a 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -22,54 +22,7 @@ typedef struct SFailedCheckpointInfo { int32_t transId; } SFailedCheckpointInfo; -static void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo* pExecInfo) { - SSdb *pSdb = pMnode->pSdb; - SStreamObj *pStream = NULL; - void *pIter = NULL; - - while (1) { - pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); - if (pIter == NULL) { - break; - } - - saveTaskAndNodeInfoIntoBuf(pStream, pExecInfo); - sdbRelease(pSdb, pStream); - } -} - -static void removeDroppedStreamTasksInBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo) { - if (pMnode == NULL) { - return; - } - - int32_t num = taosArrayGetSize(pExecInfo->pTaskList); - - SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); - SArray *pIdList = taosArrayInit(4, sizeof(STaskId)); - - for (int32_t i = 0; i < num; ++i) { - STaskId* pId = taosArrayGet(pExecInfo->pTaskList, i); - - void* p = taosHashGet(pHash, &pId->streamId, sizeof(int64_t)); - if (p != NULL) { - continue; - } - - void* pObj = mndGetStreamObj(pMnode, pId->streamId); - if (pObj != NULL) { - mndReleaseStream(pMnode, pObj); - taosHashPut(pHash, &pId->streamId, sizeof(int64_t), NULL, 0); - } else { - taosArrayPush(pIdList, pId); - } - } - - removeTasksInBuf(pIdList, &execInfo); - - taosArrayDestroy(pIdList); - taosHashCleanup(pHash); -} +static void mndStreamStartUpdateCheckpointInfo(SMnode *pMnode); static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) { int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList); @@ -290,16 +243,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { taosThreadMutexLock(&execInfo.lock); - // extract stream task list - if (taosHashGetSize(execInfo.pTaskMap) == 0) { - addAllStreamTasksIntoBuf(pMnode, &execInfo); - } else { - // the already dropped tasks may be added by hb from vnode at the time when the pTaskMap happens to be empty. - // let's drop them here. - removeDroppedStreamTasksInBuf(pMnode, &execInfo); - } - - extractStreamNodeList(pMnode); + mndInitStreamExecInfo(pMnode, &execInfo); int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes); if (numOfUpdated > 0) { @@ -326,18 +270,6 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { snodeChanged = true; } } else { - // task is idle for more than 50 sec. -// if (fabs(pTaskEntry->inputQUsed - p->inputQUsed) <= DBL_EPSILON) { -// if (!pTaskEntry->inputQChanging) { -// pTaskEntry->inputQUnchangeCounter++; -// } else { -// pTaskEntry->inputQChanging = false; -// } -// } else { -// pTaskEntry->inputQChanging = true; -// pTaskEntry->inputQUnchangeCounter = 0; -// } - streamTaskStatusCopy(pTaskEntry, p); STaskCkptInfo *pChkInfo = &p->checkpointInfo; @@ -348,6 +280,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { SFailedCheckpointInfo info = { .transId = pChkInfo->activeTransId, .checkpointId = pChkInfo->activeId, .streamUid = p->id.streamId}; addIntoCheckpointList(pFailedChkpt, &info); + + // remove failed trans from pChkptStreams + taosHashRemove(execInfo.pChkptStreams, &p->id.streamId, sizeof(p->id.streamId)); } } @@ -393,6 +328,10 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { mndDropOrphanTasks(pMnode, pOrphanTasks); } + if (pMnode != NULL) { // make sure that the unit test case can work + mndStreamStartUpdateCheckpointInfo(pMnode); + } + taosThreadMutexUnlock(&execInfo.lock); tCleanupStreamHbMsg(&req); @@ -411,3 +350,12 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { return TSDB_CODE_SUCCESS; } + +void mndStreamStartUpdateCheckpointInfo(SMnode *pMnode) { // here reuse the doCheckpointmsg + SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg)); + if (pMsg != NULL) { + int32_t size = sizeof(SMStreamDoCheckpointMsg); + SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_UPDATE_CHKPT_EVT, .pCont = pMsg, .contLen = size}; + tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + } +} \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index ff31aa0f7d..0daa383d3e 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -127,7 +127,7 @@ bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamId, const char* p return false; } -int32_t mndStreamGetRelTrans(SMnode* pMnode, int64_t streamUid) { +int32_t mndStreamGetRelTrans(SMnode* pMnode, int64_t streamId) { taosThreadMutexLock(&execInfo.lock); int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans); if (num <= 0) { @@ -136,12 +136,13 @@ int32_t mndStreamGetRelTrans(SMnode* pMnode, int64_t streamUid) { } mndStreamClearFinishedTrans(pMnode, NULL); - SStreamTransInfo* pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamUid, sizeof(streamUid)); + SStreamTransInfo* pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId)); if (pEntry != NULL) { SStreamTransInfo tInfo = *pEntry; taosThreadMutexUnlock(&execInfo.lock); - if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0 || strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0) { + if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0 || strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0 || + strcmp(tInfo.name, MND_STREAM_CHKPT_UPDATE_NAME) == 0) { return tInfo.transId; } } else { @@ -159,7 +160,7 @@ STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnCo return NULL; } - mInfo("s-task:0x%" PRIx64 " start to build trans %s, transId:%d", pStream->uid, pMsg, pTrans->id); + mInfo("stream:0x%" PRIx64 " start to build trans %s, transId:%d", pStream->uid, pMsg, pTrans->id); mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName); if (mndTransCheckConflict(pMnode, pTrans) != 0) { @@ -246,8 +247,9 @@ int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status) } int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset, - int32_t retryCode) { - STransAction action = {.epSet = *pEpset, .contLen = contLen, .pCont = pCont, .msgType = msgType, .retryCode = retryCode}; + int32_t retryCode, int32_t acceptCode) { + STransAction action = {.epSet = *pEpset, .contLen = contLen, .pCont = pCont, .msgType = msgType, .retryCode = retryCode, + .acceptableCode = acceptCode}; return mndTransAppendRedoAction(pTrans, &action); } diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index d138254afd..a0731833e6 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -230,7 +230,7 @@ static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pT return -1; } - code = setTransAction(pTrans, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0); + code = setTransAction(pTrans, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0, 0); if (code != 0) { taosMemoryFree(pReq); return -1; @@ -308,7 +308,7 @@ static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa epsetToStr(&epset, buf, tListLen(buf)); mDebug("pause stream task in node:%d, epset:%s", pTask->info.nodeId, buf); - code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0); + code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0, 0); if (code != 0) { taosMemoryFree(pReq); return -1; @@ -356,7 +356,7 @@ static int32_t doSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTas } // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode. - code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0); + code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0, 0); if (code != 0) { taosMemoryFree(pReq); return -1; @@ -400,7 +400,7 @@ static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask } // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode. - code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0); + code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0, 0); if (code != 0) { taosMemoryFree(pReq); return -1; @@ -484,7 +484,7 @@ static int32_t doSetUpdateTaskAction(SMnode *pMnode, STrans *pTrans, SStreamTask return code; } - code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &epset, TSDB_CODE_VND_INVALID_VGROUP_ID); + code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &epset, TSDB_CODE_VND_INVALID_VGROUP_ID, 0); if (code != TSDB_CODE_SUCCESS) { taosMemoryFree(pBuf); } @@ -534,7 +534,7 @@ static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa return code; } - code = setTransAction(pTrans, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0); + code = setTransAction(pTrans, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0, 0); if (code != TSDB_CODE_SUCCESS) { taosMemoryFree(pReq); } @@ -574,9 +574,11 @@ void mndInitExecInfo() { execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK); execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK); execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK); + execInfo.pChkptStreams = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); execInfo.pNodeList = taosArrayInit(4, sizeof(SNodeEntry)); taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList); + taosHashSetFreeFp(execInfo.pChkptStreams, freeTaskList); } void removeExpiredNodeInfo(const SArray *pNodeSnapshot) { @@ -645,4 +647,172 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { taosThreadMutexUnlock(&pExecNode->lock); destroyStreamTaskIter(pIter); +} + +static bool taskNodeExists(SArray *pList, int32_t nodeId) { + size_t num = taosArrayGetSize(pList); + + for (int32_t i = 0; i < num; ++i) { + SNodeEntry *pEntry = taosArrayGet(pList, i); + if (pEntry->nodeId == nodeId) { + return true; + } + } + + return false; +} + +int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) { + SArray *pRemovedTasks = taosArrayInit(4, sizeof(STaskId)); + + int32_t numOfTask = taosArrayGetSize(execInfo.pTaskList); + for (int32_t i = 0; i < numOfTask; ++i) { + STaskId *pId = taosArrayGet(execInfo.pTaskList, i); + + STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId)); + if (pEntry->nodeId == SNODE_HANDLE) { + continue; + } + + bool existed = taskNodeExists(pNodeSnapshot, pEntry->nodeId); + if (!existed) { + taosArrayPush(pRemovedTasks, pId); + } + } + + removeTasksInBuf(pRemovedTasks, &execInfo); + + mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemovedTasks), + (int32_t)taosArrayGetSize(execInfo.pTaskList)); + + removeExpiredNodeInfo(pNodeSnapshot); + + taosArrayDestroy(pRemovedTasks); + return 0; +} + +static int32_t doSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) { + SVUpdateCheckpointInfoReq *pReq = taosMemoryCalloc(1, sizeof(SVUpdateCheckpointInfoReq)); + if (pReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVUpdateCheckpointInfoReq), + tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return terrno; + } + + pReq->head.vgId = htonl(pTask->info.nodeId); + pReq->taskId = pTask->id.taskId; + pReq->streamId = pTask->id.streamId; + + SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &pTask->id.streamId, sizeof(pTask->id.streamId)); + ASSERT(pReqTaskList); + + int32_t size = taosArrayGetSize(*pReqTaskList); + for(int32_t i = 0; i < size; ++i) { + STaskChkptInfo* pInfo = taosArrayGet(*pReqTaskList, i); + if (pInfo->taskId == pTask->id.taskId) { + pReq->checkpointId = pInfo->checkpointId; + pReq->checkpointVer = pInfo->version; + pReq->checkpointTs = pInfo->ts; + pReq->dropRelHTask = pInfo->dropHTask; + pReq->transId = pInfo->transId; + } + } + + SEpSet epset = {0}; + bool hasEpset = false; + int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); + if (code != TSDB_CODE_SUCCESS || !hasEpset) { + taosMemoryFree(pReq); + return code; + } + + code = setTransAction(pTrans, pReq, sizeof(SVUpdateCheckpointInfoReq), TDMT_STREAM_TASK_UPDATE_CHKPT, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID); + if (code != TSDB_CODE_SUCCESS) { + taosMemoryFree(pReq); + } + + return code; +} + +int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { + taosWLockLatch(&pStream->lock); + + SStreamTaskIter *pIter = createStreamTaskIter(pStream); + while (streamTaskIterNextTask(pIter)) { + SStreamTask *pTask = streamTaskIterGetCurrent(pIter); + + int32_t code = doSetUpdateChkptAction(pMnode, pTrans, pTask); + if (code != TSDB_CODE_SUCCESS) { + destroyStreamTaskIter(pIter); + taosWUnLockLatch(&pStream->lock); + return -1; + } + } + + destroyStreamTaskIter(pIter); + taosWUnLockLatch(&pStream->lock); + return 0; +} + +int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + void *pIter = NULL; + SArray *pDropped = taosArrayInit(4, sizeof(int64_t)); + + mDebug("start to scan checkpoint report info"); + + while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) { + SArray *pList = *(SArray **)pIter; + + STaskChkptInfo* pInfo = taosArrayGet(pList, 0); + SStreamObj* pStream = mndGetStreamObj(pMnode, pInfo->streamId); + if (pStream == NULL) { + mDebug("failed to acquire stream:0x%" PRIx64 " remove it from checkpoint-report list", pInfo->streamId); + taosArrayPush(pDropped, &pInfo->streamId); + continue; + } + + int32_t total = mndGetNumOfStreamTasks(pStream); + int32_t existed = (int32_t) taosArrayGetSize(pList); + + if (total == existed) { + mDebug("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, start to update checkpoint-info", + pStream->uid, pStream->name, total); + + bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false); + if (!conflict) { + int32_t code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, pList); + if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { // remove this entry + taosArrayPush(pDropped, &pInfo->streamId); + mDebug("stream:0x%" PRIx64 " removed", pInfo->streamId); + } else { + mDebug("stream:0x%" PRIx64 " not launch chkpt-meta update trans, due to checkpoint not finished yet", + pInfo->streamId); + } + break; + } else { + mDebug("stream:0x%"PRIx64" active checkpoint trans not finished yet, wait", pInfo->streamId); + } + } else { + mDebug("stream:0x%" PRIx64 " %s %d/%d tasks send checkpoint-report, %d not send", pInfo->streamId, pStream->name, + existed, total, total - existed); + } + + sdbRelease(pMnode->pSdb, pStream); + } + + int32_t size = taosArrayGetSize(pDropped); + if (size > 0) { + for (int32_t i = 0; i < size; ++i) { + int64_t streamId = *(int64_t *)taosArrayGet(pDropped, i); + taosHashRemove(execInfo.pChkptStreams, &streamId, sizeof(streamId)); + } + + int32_t numOfStreams = taosHashGetSize(execInfo.pChkptStreams); + mDebug("drop %d stream(s) in checkpoint-report list, remain:%d", size, numOfStreams); + } + + taosArrayDestroy(pDropped); + return TSDB_CODE_SUCCESS; } \ No newline at end of file diff --git a/source/dnode/mnode/impl/test/func/func.cpp b/source/dnode/mnode/impl/test/func/func.cpp index ee60556639..0fc903f5db 100644 --- a/source/dnode/mnode/impl/test/func/func.cpp +++ b/source/dnode/mnode/impl/test/func/func.cpp @@ -169,7 +169,8 @@ TEST_F(MndTestFunc, 03_Retrieve_Func) { SRetrieveFuncReq retrieveReq = {0}; retrieveReq.numOfFuncs = 1; retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN); - taosArrayPush(retrieveReq.pFuncNames, "f1"); + char name[TSDB_FUNC_NAME_LEN] = "f1"; + taosArrayPush(retrieveReq.pFuncNames, name); int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq); void* pReq = rpcMallocCont(contLen); @@ -220,7 +221,8 @@ TEST_F(MndTestFunc, 03_Retrieve_Func) { retrieveReq.numOfFuncs = TSDB_FUNC_MAX_RETRIEVE + 1; retrieveReq.pFuncNames = taosArrayInit(TSDB_FUNC_MAX_RETRIEVE + 1, TSDB_FUNC_NAME_LEN); for (int32_t i = 0; i < TSDB_FUNC_MAX_RETRIEVE + 1; ++i) { - taosArrayPush(retrieveReq.pFuncNames, "1"); + char name[TSDB_FUNC_NAME_LEN] = "1"; + taosArrayPush(retrieveReq.pFuncNames, name); } int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq); @@ -237,7 +239,8 @@ TEST_F(MndTestFunc, 03_Retrieve_Func) { SRetrieveFuncReq retrieveReq = {0}; retrieveReq.numOfFuncs = 1; retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN); - taosArrayPush(retrieveReq.pFuncNames, "f2"); + char name[TSDB_FUNC_NAME_LEN] = "f2"; + taosArrayPush(retrieveReq.pFuncNames, name); int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq); void* pReq = rpcMallocCont(contLen); @@ -279,7 +282,8 @@ TEST_F(MndTestFunc, 03_Retrieve_Func) { SRetrieveFuncReq retrieveReq = {0}; retrieveReq.numOfFuncs = 1; retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN); - taosArrayPush(retrieveReq.pFuncNames, "f2"); + char name[TSDB_FUNC_NAME_LEN] = "f2"; + taosArrayPush(retrieveReq.pFuncNames, name); int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq); void* pReq = rpcMallocCont(contLen); @@ -316,8 +320,10 @@ TEST_F(MndTestFunc, 03_Retrieve_Func) { SRetrieveFuncReq retrieveReq = {0}; retrieveReq.numOfFuncs = 2; retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN); - taosArrayPush(retrieveReq.pFuncNames, "f2"); - taosArrayPush(retrieveReq.pFuncNames, "f1"); + char name1[TSDB_FUNC_NAME_LEN] = "f2"; + taosArrayPush(retrieveReq.pFuncNames, name1); + char name2[TSDB_FUNC_NAME_LEN] = "f1"; + taosArrayPush(retrieveReq.pFuncNames, name2); int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq); void* pReq = rpcMallocCont(contLen); @@ -367,8 +373,10 @@ TEST_F(MndTestFunc, 03_Retrieve_Func) { SRetrieveFuncReq retrieveReq = {0}; retrieveReq.numOfFuncs = 2; retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN); - taosArrayPush(retrieveReq.pFuncNames, "f2"); - taosArrayPush(retrieveReq.pFuncNames, "f3"); + char name1[TSDB_FUNC_NAME_LEN] = "f2"; + taosArrayPush(retrieveReq.pFuncNames, name1); + char name2[TSDB_FUNC_NAME_LEN] = "f3"; + taosArrayPush(retrieveReq.pFuncNames, name2); int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq); void* pReq = rpcMallocCont(contLen); @@ -483,7 +491,8 @@ TEST_F(MndTestFunc, 05_Actual_code) { SRetrieveFuncReq retrieveReq = {0}; retrieveReq.numOfFuncs = 1; retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN); - taosArrayPush(retrieveReq.pFuncNames, "udf1"); + char name[TSDB_FUNC_NAME_LEN] = "udf1"; + taosArrayPush(retrieveReq.pFuncNames, name); int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq); void* pReq = rpcMallocCont(contLen); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index c61988574c..b0d61ebc06 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -136,6 +136,8 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) { return tqStreamProcessReqCheckpointRsp(pSnode->pMeta, pMsg); case TDMT_STREAM_TASK_CHECKPOINT_READY_RSP: return tqStreamProcessCheckpointReadyRsp(pSnode->pMeta, pMsg); + case TDMT_MND_STREAM_CHKPT_REPORT_RSP: + return tqStreamProcessChkptReportRsp(pSnode->pMeta, pMsg); case TDMT_STREAM_RETRIEVE_TRIGGER: return tqStreamTaskProcessRetrieveTriggerReq(pSnode->pMeta, pMsg); case TDMT_STREAM_RETRIEVE_TRIGGER_RSP: diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 8222af4d60..7f5ab8b6e6 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -261,6 +261,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 712cfbaa55..a2ca3662d7 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1278,3 +1278,7 @@ int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg) { return tqStreamProcessCheckpointReadyRsp(pTq->pStreamMeta, pMsg); } + +int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg) { + return tqStreamProcessChkptReportRsp(pTq->pStreamMeta, pMsg); +} diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index c55745e5c5..cabccfc0c8 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -185,7 +185,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM SStreamTask* pTask = *ppTask; const char* idstr = pTask->id.idStr; - if (pMeta->updateInfo.transId != req.transId) { + if ((pMeta->updateInfo.transId != req.transId) && (pMeta->updateInfo.transId != -1)) { if (req.transId < pMeta->updateInfo.transId) { tqError("s-task:%s vgId:%d disorder update nodeEp msg recv, discarded, newest transId:%d, recv:%d", idstr, vgId, pMeta->updateInfo.transId, req.transId); @@ -197,10 +197,8 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM } else { tqInfo("s-task:%s vgId:%d receive new trans to update nodeEp msg from mnode, transId:%d, prev transId:%d", idstr, vgId, req.transId, pMeta->updateInfo.transId); - // info needs to be kept till the new trans to update the nodeEp arrived. - taosHashClear(pMeta->updateInfo.pTasks); - pMeta->updateInfo.transId = req.transId; + streamMetaInitUpdateTaskList(pMeta, req.transId); } } else { tqDebug("s-task:%s vgId:%d recv trans to update nodeEp from mnode, transId:%d", idstr, vgId, req.transId); @@ -280,6 +278,8 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM // persist to disk } + streamMetaClearUpdateTaskList(pMeta); + if (!restored) { tqDebug("vgId:%d vnode restore not completed, not start the tasks, clear the start after nodeUpdate flag", vgId); pMeta->startInfo.tasksWillRestart = 0; @@ -1078,6 +1078,8 @@ int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return d int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); } +int32_t tqStreamProcessChkptReportRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {return doProcessDummyRspMsg(pMeta, pMsg);} + int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { SMStreamCheckpointReadyRspMsg* pRsp = pMsg->pCont; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 002f04b8a7..b1f353af81 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -854,6 +854,8 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) return tqProcessStreamReqCheckpointRsp(pVnode->pTq, pMsg); case TDMT_VND_GET_STREAM_PROGRESS: return tqStreamProgressRetrieveReq(pVnode->pTq, pMsg); + case TDMT_MND_STREAM_CHKPT_REPORT_RSP: + return tqProcessTaskChkptReportRsp(pVnode->pTq, pMsg); default: vError("unknown msg type:%d in stream queue", pMsg->msgType); return TSDB_CODE_APP_ERROR; diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 84d3f734fe..f02fefd977 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -843,6 +843,7 @@ static int32_t selectStmtCopy(const SSelectStmt* pSrc, SSelectStmt* pDst) { CLONE_NODE_FIELD_EX(pLimit, SLimitNode*); COPY_CHAR_ARRAY_FIELD(stmtName); COPY_SCALAR_FIELD(precision); + COPY_SCALAR_FIELD(isSubquery); COPY_SCALAR_FIELD(isEmptyResult); COPY_SCALAR_FIELD(timeLineResMode); COPY_SCALAR_FIELD(timeLineFromOrderBy); diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index beedffc4f2..c081841b3b 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -963,12 +963,14 @@ void nodesDestroyNode(SNode* pNode) { break; case QUERY_NODE_WHEN_THEN: { SWhenThenNode* pWhenThen = (SWhenThenNode*)pNode; + destroyExprNode((SExprNode*)pNode); nodesDestroyNode(pWhenThen->pWhen); nodesDestroyNode(pWhenThen->pThen); break; } case QUERY_NODE_CASE_WHEN: { SCaseWhenNode* pCaseWhen = (SCaseWhenNode*)pNode; + destroyExprNode((SExprNode*)pNode); nodesDestroyNode(pCaseWhen->pCase); nodesDestroyNode(pCaseWhen->pElse); nodesDestroyList(pCaseWhen->pWhenThenList); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index ab3b5d6fa0..6696c9f8c2 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -40,26 +40,8 @@ static void checkpointTriggerMonitorFn(void* param, void* tmrId); static SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId); -bool streamTaskIsAllUpstreamSendTrigger(SStreamTask* pTask) { - SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; - int32_t numOfUpstreams = taosArrayGetSize(pTask->upstreamInfo.pList); - bool allSend = true; - - taosThreadMutexLock(&pActiveInfo->lock); - int32_t numOfRecv = taosArrayGetSize(pActiveInfo->pReadyMsgList); - - if (numOfRecv < numOfUpstreams) { - stDebug("s-task:%s received checkpoint-trigger block, idx:%d, %d upstream tasks not send yet, total:%d", - pTask->id.idStr, pTask->info.selfChildId, (numOfUpstreams - numOfRecv), numOfUpstreams); - allSend = false; - } - - taosThreadMutexUnlock(&pActiveInfo->lock); - return allSend; -} - SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, - int32_t transId) { + int32_t transId) { SStreamDataBlock* pChkpoint = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock)); if (pChkpoint == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -436,25 +418,27 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin int32_t code = 0; const char* id = pTask->id.idStr; SCheckpointInfo* pInfo = &pTask->chkInfo; + STaskId hTaskId = {0}; taosThreadMutexLock(&pTask->lock); if (pReq->checkpointId <= pInfo->checkpointId) { stDebug("s-task:%s vgId:%d latest checkpointId:%" PRId64 " checkpointVer:%" PRId64 - " no need to update the checkpoint info, updated checkpointId:%" PRId64 " checkpointVer:%" PRId64 " ignored", - id, vgId, pInfo->checkpointId, pInfo->checkpointVer, pReq->checkpointId, pReq->checkpointVer); + " no need to update the checkpoint info, updated checkpointId:%" PRId64 " checkpointVer:%" PRId64 + " transId:%d ignored", + id, vgId, pInfo->checkpointId, pInfo->checkpointVer, pReq->checkpointId, pReq->checkpointVer, + pReq->transId); taosThreadMutexUnlock(&pTask->lock); { // destroy the related fill-history tasks // drop task should not in the meta-lock, and drop the related fill-history task now streamMetaWUnLock(pMeta); if (pReq->dropRelHTask) { - streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); + streamMetaUnregisterTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d", id, vgId, pReq->taskId, numOfTasks); } - streamMetaWLock(pMeta); } @@ -492,8 +476,9 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin } if (pReq->dropRelHTask) { + hTaskId = pTask->hTaskInfo.id; stDebug("s-task:0x%x vgId:%d drop the related fill-history task:0x%" PRIx64 " after update checkpoint", - pReq->taskId, vgId, pReq->hTaskId); + pReq->taskId, vgId, hTaskId.taskId); CLEAR_RELATED_FILLHISTORY_TASK(pTask); } @@ -514,9 +499,10 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin // drop task should not in the meta-lock, and drop the related fill-history task now if (pReq->dropRelHTask) { - streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); + streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); - stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId, (int32_t) pReq->hTaskId, numOfTasks); + stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId, + (int32_t)hTaskId.taskId, numOfTasks); } streamMetaWLock(pMeta); @@ -703,9 +689,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { // update the latest checkpoint info if all works are done successfully, for rsma, the pMsgCb is null. if (code == TSDB_CODE_SUCCESS && (pTask->pMsgCb != NULL)) { - STaskId* pHTaskId = &pTask->hTaskInfo.id; - code = streamBuildAndSendCheckpointUpdateMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id, pHTaskId, &pTask->chkInfo, - dropRelHTask); + code = streamSendChkptReportMsg(pTask, &pTask->chkInfo, dropRelHTask); if (code == TSDB_CODE_SUCCESS) { code = streamTaskRemoteBackupCheckpoint(pTask, ckId, (char*)id); if (code != TSDB_CODE_SUCCESS) { @@ -770,6 +754,18 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { streamMetaReleaseTask(pTask->pMeta, pTask); return; } + + // checkpoint-trigger recv flag is set, quit + if (pActiveInfo->allUpstreamTriggerRecv) { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger, ref:%d", + pTask->id.idStr, vgId, ref); + + taosThreadMutexUnlock(&pTask->lock); + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + taosThreadMutexUnlock(&pTask->lock); taosThreadMutexLock(&pActiveInfo->lock); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 1828409f89..98168abae1 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -615,7 +615,7 @@ void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointB * appropriate batch of blocks should be handled in 5 to 10 sec. */ static int32_t doStreamExecTask(SStreamTask* pTask) { - const char* id = pTask->id.idStr; + const char* id = pTask->id.idStr; // merge multiple input data if possible in the input queue. stDebug("s-task:%s start to extract data block from inputQ", id); @@ -699,20 +699,15 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { if (type != STREAM_INPUT__CHECKPOINT) { doStreamTaskExecImpl(pTask, pInput); - } - - streamFreeQitem(pInput); - - // todo other thread may change the status + streamFreeQitem(pInput); + } else { // todo other thread may change the status // do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed. - if (type == STREAM_INPUT__CHECKPOINT) { - // todo add lock + taosThreadMutexLock(&pTask->lock); SStreamTaskState* pState = streamTaskGetStatus(pTask); if (pState->state == TASK_STATUS__CK) { stDebug("s-task:%s checkpoint block received, set status:%s", id, pState->name); streamTaskBuildCheckpoint(pTask); - } else { - // todo refactor + } else { // todo refactor int32_t code = 0; if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { code = streamTaskSendCheckpointSourceRsp(pTask); @@ -727,6 +722,8 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { } } + taosThreadMutexUnlock(&pTask->lock); + streamFreeQitem(pInput); return 0; } } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index f6449829a3..da1fec5565 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -372,6 +372,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->expandFunc = expandFunc; pMeta->stage = stage; pMeta->role = (vgId == SNODE_HANDLE) ? NODE_ROLE_LEADER : NODE_ROLE_UNINIT; + pMeta->updateInfo.transId = -1; pMeta->startInfo.completeFn = fn; pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); @@ -1740,4 +1741,14 @@ void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SSt stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask closed, elapsed time:%" PRId64 "ms", id, vgId, transId, el); } +} + +void streamMetaClearUpdateTaskList(SStreamMeta* pMeta) { + taosHashClear(pMeta->updateInfo.pTasks); + pMeta->updateInfo.transId = -1; +} + +void streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId) { + taosHashClear(pMeta->updateInfo.pTasks); + pMeta->updateInfo.transId = transId; } \ No newline at end of file diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index dc9d2166e6..f8a4deb6b1 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -731,35 +731,50 @@ int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskI return code; } -int32_t streamBuildAndSendCheckpointUpdateMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, STaskId* pHTaskId, - SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask) { - SVUpdateCheckpointInfoReq* pReq = rpcMallocCont(sizeof(SVUpdateCheckpointInfoReq)); - if (pReq == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; +int32_t streamSendChkptReportMsg(SStreamTask* pTask, SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask) { + int32_t code; + int32_t tlen = 0; + int32_t vgId = pTask->pMeta->vgId; + const char* id = pTask->id.idStr; + SActiveCheckpointInfo* pActive = pCheckpointInfo->pActiveInfo; + + SCheckpointReport req = {.streamId = pTask->id.streamId, + .taskId = pTask->id.taskId, + .nodeId = vgId, + .dropHTask = dropRelHTask, + .transId = pActive->transId, + .checkpointId = pActive->activeId, + .checkpointVer = pCheckpointInfo->processedVer, + .checkpointTs = pCheckpointInfo->startTs}; + + tEncodeSize(tEncodeStreamTaskChkptReport, &req, tlen, code); + if (code < 0) { + stError("s-task:%s vgId:%d encode stream task checkpoint-report failed, code:%s", id, vgId, tstrerror(code)); return -1; } - pReq->head.vgId = vgId; - pReq->taskId = pTaskId->taskId; - pReq->streamId = pTaskId->streamId; - pReq->dropRelHTask = dropRelHTask; - pReq->hStreamId = pHTaskId->streamId; - pReq->hTaskId = pHTaskId->taskId; - pReq->transId = pCheckpointInfo->pActiveInfo->transId; - - pReq->checkpointId = pCheckpointInfo->pActiveInfo->activeId; - pReq->checkpointVer = pCheckpointInfo->processedVer; - pReq->checkpointTs = pCheckpointInfo->startTs; - - SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_UPDATE_CHKPT, .pCont = pReq, .contLen = sizeof(SVUpdateCheckpointInfoReq)}; - int32_t code = tmsgPutToQueue(pMsgCb, WRITE_QUEUE, &msg); - - if (code != TSDB_CODE_SUCCESS) { - stError("vgId:%d task:0x%x failed to send update checkpoint info msg, code:%s", vgId, pTaskId->taskId, tstrerror(code)); - } else { - stDebug("vgId:%d task:0x%x build and send update checkpoint info msg msg", vgId, pTaskId->taskId); + void* buf = rpcMallocCont(tlen); + if (buf == NULL) { + stError("s-task:%s vgId:%d encode stream task checkpoint-report msg failed, code:%s", id, vgId, + tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return -1; } - return code; + + SEncoder encoder; + tEncoderInit(&encoder, buf, tlen); + if ((code = tEncodeStreamTaskChkptReport(&encoder, &req)) < 0) { + rpcFreeCont(buf); + stError("s-task:%s vgId:%d encode stream task checkpoint-report msg failed, code:%s", id, vgId, tstrerror(code)); + return -1; + } + tEncoderClear(&encoder); + + SRpcMsg msg = {0}; + initRpcMsg(&msg, TDMT_MND_STREAM_CHKPT_REPORT, buf, tlen); + stDebug("s-task:%s vgId:%d build and send task checkpoint-report to mnode", id, vgId); + + tmsgSendReq(&pTask->info.mnodeEpset, &msg); + return 0; } STaskId streamTaskGetTaskId(const SStreamTask* pTask) { diff --git a/source/libs/stream/src/streammsg.c b/source/libs/stream/src/streammsg.c index 705406f044..a6ab6a60c2 100644 --- a/source/libs/stream/src/streammsg.c +++ b/source/libs/stream/src/streammsg.c @@ -594,6 +594,34 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { } if (tDecodeCStrTo(pDecoder, pTask->reserve) < 0) return -1; + tEndDecode(pDecoder); + return 0; +} + +int32_t tEncodeStreamTaskChkptReport(SEncoder* pEncoder, const SCheckpointReport* pReq) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->checkpointVer) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->checkpointTs) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->transId) < 0) return -1; + if (tEncodeI8(pEncoder, pReq->dropHTask) < 0) return -1; + tEndEncode(pEncoder); + return 0; +} + +int32_t tDecodeStreamTaskChkptReport(SDecoder* pDecoder, SCheckpointReport* pReq) { + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->checkpointVer) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->checkpointTs) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->transId) < 0) return -1; + if (tDecodeI8(pDecoder, &pReq->dropHTask) < 0) return -1; tEndDecode(pDecoder); return 0; } \ No newline at end of file diff --git a/source/os/src/osTimer.c b/source/os/src/osTimer.c index b8eb54e10e..36d364382a 100644 --- a/source/os/src/osTimer.c +++ b/source/os/src/osTimer.c @@ -86,8 +86,9 @@ static void taosDeleteTimer(void *tharg) { static TdThread timerThread; static timer_t timerId; static volatile bool stopTimer = false; -static void *taosProcessAlarmSignal(void *tharg) { - // Block the signal + +static void *taosProcessAlarmSignal(void *tharg) { + // Block the signal sigset_t sigset; sigemptyset(&sigset); sigaddset(&sigset, SIGALRM); diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index 0c09174970..b224eac8c2 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -66,6 +66,7 @@ typedef struct { int32_t lines; int32_t flag; int32_t openInProgress; + int64_t lastKeepFileSec; pid_t pid; char logName[LOG_FILE_NAME_LEN]; SLogBuff *logHandle; @@ -267,19 +268,33 @@ static void taosUnLockLogFile(TdFilePtr pFile) { } } -static void taosKeepOldLog(char *oldName) { - if (tsLogKeepDays == 0) return; +static void taosReserveOldLog(char *oldName, char *keepName) { + if (tsLogKeepDays <= 0) { + keepName[0] = 0; + return; + } + int32_t code = 0; int64_t fileSec = taosGetTimestampSec(); - char fileName[LOG_FILE_NAME_LEN + 20]; - snprintf(fileName, LOG_FILE_NAME_LEN + 20, "%s.%" PRId64, tsLogObj.logName, fileSec); + if (tsLogObj.lastKeepFileSec < fileSec) { + tsLogObj.lastKeepFileSec = fileSec; + } else { + fileSec = ++tsLogObj.lastKeepFileSec; + } + snprintf(keepName, LOG_FILE_NAME_LEN + 20, "%s.%" PRId64, tsLogObj.logName, fileSec); + if ((code = taosRenameFile(oldName, keepName))) { + keepName[0] = 0; + uError("failed to rename file:%s to %s since %s", oldName, keepName, tstrerror(code)); + } +} - (void)taosRenameFile(oldName, fileName); - - char compressFileName[LOG_FILE_NAME_LEN + 20]; - snprintf(compressFileName, LOG_FILE_NAME_LEN + 20, "%s.%" PRId64 ".gz", tsLogObj.logName, fileSec); - if (taosCompressFile(fileName, compressFileName) == 0) { - (void)taosRemoveFile(fileName); +static void taosKeepOldLog(char *oldName) { + if (oldName[0] != 0) { + char compressFileName[LOG_FILE_NAME_LEN + 20]; + snprintf(compressFileName, LOG_FILE_NAME_LEN + 20, "%s.gz", oldName); + if (taosCompressFile(oldName, compressFileName) == 0) { + (void)taosRemoveFile(oldName); + } } if (tsLogKeepDays > 0) { @@ -316,13 +331,13 @@ static OldFileKeeper *taosOpenNewFile() { tsLogObj.logHandle->pFile = pFile; tsLogObj.lines = 0; tsLogObj.openInProgress = 0; - OldFileKeeper* oldFileKeeper = taosMemoryMalloc(sizeof(OldFileKeeper)); + OldFileKeeper *oldFileKeeper = taosMemoryMalloc(sizeof(OldFileKeeper)); if (oldFileKeeper == NULL) { uError("create old log keep info faild! mem is not enough."); return NULL; } oldFileKeeper->pOldFile = pOldFile; - memcpy(oldFileKeeper->keepName, keepName, LOG_FILE_NAME_LEN + 20); + taosReserveOldLog(keepName, oldFileKeeper->keepName); uInfo(" new log file:%d is opened", tsLogObj.flag); uInfo("=================================="); diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 1eaf17ccb1..307260cf6a 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -1485,6 +1485,7 @@ ,,y,script,./test.sh -f tsim/view/view.sim ,,y,script,./test.sh -f tsim/query/cache_last.sim ,,y,script,./test.sh -f tsim/query/const.sim +,,y,script,./test.sh -f tsim/query/nestedJoinView.sim #develop test diff --git a/tests/script/tsim/query/nestedJoinView.sim b/tests/script/tsim/query/nestedJoinView.sim new file mode 100644 index 0000000000..efdec9fcbc --- /dev/null +++ b/tests/script/tsim/query/nestedJoinView.sim @@ -0,0 +1,19 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sql connect + +sql create database test; +sql use test; + +sql CREATE TABLE `resource_info` ( job_id_ts TIMESTAMP , role VARCHAR(20) primary key, start_time TIMESTAMP, ip VARCHAR(15), cpu FLOAT, memory FLOAT, io_write FLOAT, io_read FLOAT, net_write FLOAT, net_read FLOAT) TAGS ( end_time TIMESTAMP); + +sql CREATE STABLE `test_results` ( `job_id_ts` TIMESTAMP , `end_time` VARCHAR(40) PRIMARY KEY, `job_id` BIGINT, `time_cost` FLOAT, `write_speed` FLOAT, `qps` FLOAT, `min_delay` FLOAT, `p90_delay` FLOAT, `p95_delay` FLOAT, `p99_delay` FLOAT, `max_delay` FLOAT, `avg_delay` FLOAT, `hostname` VARCHAR(15), `tdengine_commit_id` VARCHAR(50), `tdinternal_commit_id` VARCHAR(50), `load_type` VARCHAR(50), `cpu` FLOAT, `memory` FLOAT, `io_write` FLOAT, `io_read` FLOAT) TAGS ( `branch` VARCHAR(50), `scenario` VARCHAR(50), `test_case` VARCHAR(1000), `env_id` INT, `type` VARCHAR(50)); + +sql CREATE TABLE `job_info` ( `start_time` TIMESTAMP , `finish_time` TIMESTAMP , `job_id` INT, `job_status` VARCHAR(20), `test_type` VARCHAR(50), `environment` INT, `version` VARCHAR(20), `tdengine_commit_id` VARCHAR(50), `tdinternal_commit_id` VARCHAR(50), `type` VARCHAR(50), `scenario` VARCHAR(50), `note` VARCHAR(500), `version_number` VARCHAR(20)); + +sql create view abc as select * from ( select a.job_id, a.start_time as job_start_time, a.finish_time as job_end_time, a.job_status, a.test_type, a.environment, case when a.version_number <> null then a.version else CONCAT(a.version,'_',a.version_number) end as version_info, a.tdengine_commit_id, a.tdinternal_commit_id, a.type, a.scenario, a.note, a.version_number, b.end_time as tc_end_time, b.time_cost, b.write_speed, b.qps, b.min_delay, b.p90_delay, b.p95_delay, b.p99_delay, b.`max_delay`, b.avg_delay, b.hostname, b.load_type, b.scenario, b.test_case, b.type from job_info a, test_results b where a.start_time=b.job_id_ts and a.job_status='finished') s1 inner join resource_info s2 on s1.job_start_time=s2.job_id_ts and s1.job_id=2 and s1.tc_end_time=s2.end_time; + +sql select * from abc; + +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/system-test/2-query/compa4096_tsma.json b/tests/system-test/2-query/compa4096_tsma.json index 66d98ceebe..993eb8ae58 100644 --- a/tests/system-test/2-query/compa4096_tsma.json +++ b/tests/system-test/2-query/compa4096_tsma.json @@ -1,66 +1,80 @@ { - "filetype": "insert", - "cfgdir": "/etc/taos", - "host": "localhost", - "port": 6030, - "rest_port": 6041, - "user": "root", - "password": "taosdata", - "thread_count": 100, - "create_table_thread_count": 24, - "result_file": "taosBenchmark_result.log", - "confirm_parameter_prompt": "no", - "insert_interval": 0, - "num_of_records_per_req": 1000000, - "max_sql_len": 1024000, - "databases": [ - { - "dbinfo": { - "name": "db4096", - "drop": "yes", - "replica": 1, - "duration": 10, - "precision": "ms", - "keep": 3650, - "comp": 2, - "vgroups": 2, - "buffer": 1000 - }, - "super_tables": [ + "filetype": "insert", + "cfgdir": "/etc/taos", + "host": "localhost", + "port": 6030, + "rest_port": 6041, + "user": "root", + "password": "taosdata", + "thread_count": 100, + "create_table_thread_count": 24, + "result_file": "taosBenchmark_result.log", + "confirm_parameter_prompt": "no", + "insert_interval": 0, + "num_of_records_per_req": 1000000, + "max_sql_len": 1024000, + "databases": [ { - "name": "stb0", - "child_table_exists": "no", - "childtable_count":2, - "childtable_prefix": "ctb0", - "escape_character": "no", - "auto_create_table": "no", - "batch_create_tbl_num": 500, - "data_source": "rand", - "insert_mode": "taosc", - "rollup": null, - "interlace_rows": 0, - "line_protocol": null, - "tcp_transfer": "no", - "insert_rows": 10, - "childtable_limit": 0, - "childtable_offset": 0, - "rows_per_tbl": 0, - "max_sql_len": 1048576, - "disorder_ratio": 0, - "disorder_range": 1000, - "timestamp_step": 1000, - "start_timestamp": "2022-10-22 17:20:36", - "sample_format": "csv", - "sample_file": "./sample.csv", - "tags_file": "", - "columns": [{ "type": "INT","count": 4093}], - "tags": [{"type": "TINYINT", "count": 1},{"type": "NCHAR","count": 1}] + "dbinfo": { + "name": "db4096", + "drop": "yes", + "replica": 1, + "duration": 10, + "precision": "ms", + "keep": 3650, + "comp": 2, + "vgroups": 2, + "buffer": 1000 + }, + "super_tables": [ + { + "name": "stb0", + "child_table_exists": "no", + "childtable_count": 2, + "childtable_prefix": "ctb0", + "escape_character": "no", + "auto_create_table": "no", + "batch_create_tbl_num": 500, + "data_source": "rand", + "insert_mode": "taosc", + "rollup": null, + "interlace_rows": 0, + "line_protocol": null, + "tcp_transfer": "no", + "insert_rows": 10, + "childtable_limit": 0, + "childtable_offset": 0, + "rows_per_tbl": 0, + "max_sql_len": 1048576, + "disorder_ratio": 0, + "disorder_range": 1000, + "timestamp_step": 1000, + "start_timestamp": "2022-10-22 17:20:36", + "sample_format": "csv", + "sample_file": "./sample.csv", + "tags_file": "", + "columns": [ + { + "type": "INT", + "count": 4093 + } + ], + "tags": [ + { + "type": "TINYINT", + "count": 1 + }, + { + "type": "NCHAR", + "count": 1 + } + ] + } + ] } - ] - } - ], - "prepare_rand": 10000, - "chinese": "no", - "streams": false, - "test_log": "/root/testlog/" -} + ], + "prepare_rand": 10000, + "chinese": "no", + "streams": false, + "test_log": "/root/testlog/" +} \ No newline at end of file diff --git a/tests/system-test/2-query/tsma.py b/tests/system-test/2-query/tsma.py index f80aab0e82..d7dc1d24f3 100644 --- a/tests/system-test/2-query/tsma.py +++ b/tests/system-test/2-query/tsma.py @@ -1270,15 +1270,14 @@ class TDTestCase: def test_drop_tsma(self): function_name = sys._getframe().f_code.co_name tdLog.debug(f'-----{function_name}------') - self.create_tsma('tsma1', 'test', 'meters', [ - 'avg(c1)', 'avg(c2)'], '5m') + self.create_tsma('tsma1', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '5m') self.create_recursive_tsma('tsma1', 'tsma2', 'test', '15m', 'meters') # drop recursive tsma first tdSql.error('drop tsma test.tsma1', -2147482491) tdSql.execute('drop tsma test.tsma2', queryTimes=1) tdSql.execute('drop tsma test.tsma1', queryTimes=1) - self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo') + self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-chkpt-u') tdSql.execute('drop database test', queryTimes=1) self.init_data() @@ -1319,7 +1318,7 @@ class TDTestCase: 'create tsma tsma1 on nsdb.meters function(avg(c1), avg(c2), avg(t3)) interval(5m)', -2147471096) tdSql.execute('alter table nsdb.meters drop tag t3', queryTimes=1) - self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo') + self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-chkpt-u') tdSql.execute('drop database nsdb') # drop norm table @@ -1346,7 +1345,7 @@ class TDTestCase: # test drop stream tdSql.error('drop stream tsma1', -2147471088) ## TSMA must be dropped first - self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo') + self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-chkpt-u') tdSql.execute('drop database test', queryTimes=1) self.init_data() @@ -1449,7 +1448,7 @@ class TDTestCase: tdSql.error( 'create tsma tsma1 on test.meters function(avg(c1), avg(c2)) interval(2h)', -2147471097) - self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo') + self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-chkpt-u') tdSql.execute('drop database nsdb') def test_create_tsma_on_norm_table(self): diff --git a/tests/system-test/8-stream/stream_multi_agg.py b/tests/system-test/8-stream/stream_multi_agg.py index acb80f528b..32c2648b46 100644 --- a/tests/system-test/8-stream/stream_multi_agg.py +++ b/tests/system-test/8-stream/stream_multi_agg.py @@ -42,7 +42,7 @@ class TDTestCase: tdSql.execute("use test", queryTimes=100) tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s)") tdLog.debug("========create stream and insert data ok========") - time.sleep(15) + time.sleep(20) tdSql.query("select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s) order by groupid,_wstart") rowCnt = tdSql.getRows() @@ -68,7 +68,7 @@ class TDTestCase: # create stream tdSql.execute("use db", queryTimes=100) tdSql.execute("create stream stream1 fill_history 1 into sta as select count(*) as cnt from meters interval(10a);",show=True) - time.sleep(5) + time.sleep(10) sql = "select count(*) from sta" # loop wait max 60s to check count is ok