fix(stream): use trans to set the consensus-checkpoint id
This commit is contained in:
parent
78993d9c55
commit
8e6bb176c2
|
@ -250,7 +250,7 @@
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TB_WITH_TSMA, "drop-tb-with-tsma", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TB_WITH_TSMA, "drop-tb-with-tsma", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_UPDATE_CHKPT_EVT, "stream-update-chkpt-evt", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_UPDATE_CHKPT_EVT, "stream-update-chkpt-evt", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHKPT_REPORT, "stream-chkpt-report", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHKPT_REPORT, "stream-chkpt-report", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHKPT_CONSEN, "stream-chkpt-consen", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_REQ_CONSEN_CHKPT, "stream-req-consen-chkpt", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CONSEN_TIMER, "stream-consen-tmr", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CONSEN_TIMER, "stream-consen-tmr", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
|
||||||
TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG)
|
TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG)
|
||||||
|
@ -342,6 +342,7 @@
|
||||||
TD_DEF_MSG_TYPE(TDMT_STREAM_CREATE, "stream-create", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_STREAM_CREATE, "stream-create", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_STREAM_DROP, "stream-drop", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_STREAM_DROP, "stream-drop", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE_TRIGGER, "stream-retri-trigger", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE_TRIGGER, "stream-retri-trigger", NULL, NULL)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_STREAM_CONSEN_CHKPT, "stream-consen-chkpt", NULL, NULL)
|
||||||
TD_CLOSE_MSG_SEG(TDMT_STREAM_MSG)
|
TD_CLOSE_MSG_SEG(TDMT_STREAM_MSG)
|
||||||
|
|
||||||
TD_NEW_MSG_SEG(TDMT_MON_MSG) //5 << 8
|
TD_NEW_MSG_SEG(TDMT_MON_MSG) //5 << 8
|
||||||
|
|
|
@ -29,7 +29,8 @@ int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg)
|
||||||
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||||
int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||||
int32_t tqStreamProcessChkptReportRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
int32_t tqStreamProcessChkptReportRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||||
int32_t tqStreamProcessConsensusChkptRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
int32_t tqStreamProcessConsensusChkptRsp2(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||||
|
int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||||
int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||||
int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen,
|
int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen,
|
||||||
bool isLeader, bool restored);
|
bool isLeader, bool restored);
|
||||||
|
@ -37,12 +38,12 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
|
||||||
int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader);
|
int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader);
|
||||||
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta);
|
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta);
|
||||||
int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta);
|
int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta);
|
||||||
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* msg);
|
||||||
int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||||
int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||||
int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg);
|
int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg);
|
||||||
int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode);
|
int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode);
|
||||||
int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg, int32_t msgLen);
|
int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg);
|
||||||
|
|
||||||
void tqSetRestoreVersionInfo(SStreamTask* pTask);
|
void tqSetRestoreVersionInfo(SStreamTask* pTask);
|
||||||
int32_t tqExpandStreamTask(SStreamTask* pTask);
|
int32_t tqExpandStreamTask(SStreamTask* pTask);
|
||||||
|
|
|
@ -223,16 +223,6 @@ typedef struct SRestoreCheckpointInfo {
|
||||||
int32_t tEncodeRestoreCheckpointInfo (SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq);
|
int32_t tEncodeRestoreCheckpointInfo (SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq);
|
||||||
int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq);
|
int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq);
|
||||||
|
|
||||||
typedef struct SRestoreCheckpointInfoRsp {
|
|
||||||
int64_t streamId;
|
|
||||||
int64_t checkpointId;
|
|
||||||
int64_t startTs;
|
|
||||||
int32_t taskId;
|
|
||||||
} SRestoreCheckpointInfoRsp;
|
|
||||||
|
|
||||||
int32_t tEncodeRestoreCheckpointInfoRsp(SEncoder* pCoder, const SRestoreCheckpointInfoRsp* pInfo);
|
|
||||||
int32_t tDecodeRestoreCheckpointInfoRsp(SDecoder* pCoder, SRestoreCheckpointInfoRsp* pInfo);
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SMsgHead head;
|
SMsgHead head;
|
||||||
int64_t streamId;
|
int64_t streamId;
|
||||||
|
|
|
@ -616,9 +616,8 @@ typedef struct SStreamTaskState {
|
||||||
|
|
||||||
typedef struct SCheckpointConsensusInfo {
|
typedef struct SCheckpointConsensusInfo {
|
||||||
SArray* pTaskList;
|
SArray* pTaskList;
|
||||||
// int64_t checkpointId;
|
|
||||||
// int64_t genTs;
|
|
||||||
int32_t numOfTasks;
|
int32_t numOfTasks;
|
||||||
|
int64_t streamId;
|
||||||
} SCheckpointConsensusInfo;
|
} SCheckpointConsensusInfo;
|
||||||
|
|
||||||
int32_t streamSetupScheduleTrigger(SStreamTask* pTask);
|
int32_t streamSetupScheduleTrigger(SStreamTask* pTask);
|
||||||
|
|
|
@ -233,6 +233,7 @@ SArray *mmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_CONSEN_CHKPT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_CREATE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_CREATE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_DROP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_DROP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_CREATE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_CREATE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
@ -242,7 +243,7 @@ SArray *mmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_CONSEN, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CONSEN_CHKPT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
||||||
|
|
|
@ -76,6 +76,7 @@ SArray *smGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_CONSEN_CHKPT, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||||
|
@ -96,7 +97,7 @@ SArray *smGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_CONSEN_RSP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CONSEN_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
_OVER:
|
_OVER:
|
||||||
|
|
|
@ -972,10 +972,11 @@ SArray *vmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_CONSEN_RSP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CONSEN_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_CONSEN_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
|
@ -83,7 +83,7 @@ typedef struct SOrphanTask {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SMsgHead head;
|
SMsgHead head;
|
||||||
} SMStreamReqCheckpointRsp, SMStreamUpdateChkptRsp;
|
} SMStreamReqCheckpointRsp, SMStreamUpdateChkptRsp, SMStreamReqConsensChkptRsp;
|
||||||
|
|
||||||
typedef struct STaskChkptInfo {
|
typedef struct STaskChkptInfo {
|
||||||
int32_t nodeId;
|
int32_t nodeId;
|
||||||
|
@ -133,9 +133,8 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream)
|
||||||
int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
||||||
int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList);
|
int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList);
|
||||||
int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq);
|
int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq);
|
||||||
int32_t mndSendQuickConsensusChkptIdRsp(SRestoreCheckpointInfo *pReq, int32_t code, int64_t streamId,
|
int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, int32_t taskId, int64_t checkpointId,
|
||||||
int64_t checkpointId, SRpcHandleInfo *pRpcInfo);
|
int64_t ts);
|
||||||
|
|
||||||
void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo);
|
void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo);
|
||||||
|
|
||||||
SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream);
|
SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream);
|
||||||
|
@ -151,9 +150,7 @@ SCheckpointConsensusInfo *mndGetConsensusInfo(SHashObj *pHash, int64_t streamId,
|
||||||
void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo,
|
void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo,
|
||||||
SRpcHandleInfo *pRpcInfo);
|
SRpcHandleInfo *pRpcInfo);
|
||||||
void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo);
|
void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo);
|
||||||
int32_t doSendConsensusCheckpointRsp(SRestoreCheckpointInfo *pInfo, SRpcMsg *pMsg, int64_t checkpointId);
|
|
||||||
int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId);
|
int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId);
|
||||||
int32_t mndRegisterConsensusChkptId(SHashObj* pHash, int64_t streamId);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -61,6 +61,7 @@ static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessCheckpointReport(SRpcMsg *pReq);
|
static int32_t mndProcessCheckpointReport(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg);
|
static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg);
|
||||||
static int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg);
|
static int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg);
|
||||||
|
static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code);
|
||||||
|
|
||||||
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);
|
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);
|
||||||
|
|
||||||
|
@ -107,6 +108,7 @@ int32_t mndInitStream(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_UPDATE_RSP, mndTransProcessRsp);
|
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_UPDATE_RSP, mndTransProcessRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_RESET_RSP, mndTransProcessRsp);
|
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_RESET_RSP, mndTransProcessRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_UPDATE_CHKPT_RSP, mndTransProcessRsp);
|
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_UPDATE_CHKPT_RSP, mndTransProcessRsp);
|
||||||
|
mndSetMsgHandle(pMnode, TDMT_STREAM_CONSEN_CHKPT_RSP, mndTransProcessRsp);
|
||||||
|
|
||||||
// for msgs inside mnode
|
// for msgs inside mnode
|
||||||
// TODO change the name
|
// TODO change the name
|
||||||
|
@ -119,7 +121,7 @@ int32_t mndInitStream(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamCheckpoint);
|
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamCheckpoint);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint);
|
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_REPORT, mndProcessCheckpointReport);
|
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_REPORT, mndProcessCheckpointReport);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_CONSEN, mndProcessConsensusCheckpointId);
|
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CONSEN_CHKPT, mndProcessConsensusCheckpointId);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_UPDATE_CHKPT_EVT, mndScanCheckpointReportInfo);
|
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_UPDATE_CHKPT_EVT, mndScanCheckpointReportInfo);
|
||||||
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);
|
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
|
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
|
||||||
|
@ -2611,23 +2613,14 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
|
||||||
|
|
||||||
taosThreadMutexUnlock(&execInfo.lock);
|
taosThreadMutexUnlock(&execInfo.lock);
|
||||||
|
|
||||||
{
|
doSendQuickRsp(&pReq->info, sizeof(SMStreamUpdateChkptRsp), req.nodeId, TSDB_CODE_SUCCESS);
|
||||||
SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamUpdateChkptRsp)};
|
|
||||||
rsp.pCont = rpcMallocCont(rsp.contLen);
|
|
||||||
SMsgHead *pHead = rsp.pCont;
|
|
||||||
pHead->vgId = htonl(req.nodeId);
|
|
||||||
|
|
||||||
tmsgSendRsp(&rsp);
|
|
||||||
pReq->info.handle = NULL; // disable auto rsp
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, bool* pAllEqual) {
|
static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, int32_t* pExistedTasks) {
|
||||||
int32_t num = 0;
|
int32_t num = 0;
|
||||||
int64_t chkId = INT64_MAX;
|
int64_t chkId = INT64_MAX;
|
||||||
*pAllEqual = true;
|
*pExistedTasks = 0;
|
||||||
|
|
||||||
for(int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
|
for(int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
|
||||||
STaskId* p = taosArrayGet(execInfo.pTaskList, i);
|
STaskId* p = taosArrayGet(execInfo.pTaskList, i);
|
||||||
|
@ -2637,16 +2630,12 @@ static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, bool* pAllEq
|
||||||
|
|
||||||
num += 1;
|
num += 1;
|
||||||
STaskStatusEntry* pe = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
|
STaskStatusEntry* pe = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
|
||||||
|
|
||||||
if (chkId != INT64_MAX && chkId != pe->checkpointInfo.latestId) {
|
|
||||||
*pAllEqual = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (chkId > pe->checkpointInfo.latestId) {
|
if (chkId > pe->checkpointInfo.latestId) {
|
||||||
chkId = pe->checkpointInfo.latestId;
|
chkId = pe->checkpointInfo.latestId;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
*pExistedTasks = num;
|
||||||
if (num < numOfTasks) { // not all task send info to mnode through hbMsg, no valid checkpoint Id
|
if (num < numOfTasks) { // not all task send info to mnode through hbMsg, no valid checkpoint Id
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -2654,6 +2643,16 @@ static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, bool* pAllEq
|
||||||
return chkId;
|
return chkId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code) {
|
||||||
|
SRpcMsg rsp = {.code = code, .info = *pInfo, .contLen = msgSize};
|
||||||
|
rsp.pCont = rpcMallocCont(rsp.contLen);
|
||||||
|
SMsgHead *pHead = rsp.pCont;
|
||||||
|
pHead->vgId = htonl(vgId);
|
||||||
|
|
||||||
|
tmsgSendRsp(&rsp);
|
||||||
|
pInfo->handle = NULL; // disable auto rsp
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg) {
|
static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg) {
|
||||||
SMnode *pMnode = pMsg->info.node;
|
SMnode *pMnode = pMsg->info.node;
|
||||||
SDecoder decoder = {0};
|
SDecoder decoder = {0};
|
||||||
|
@ -2675,9 +2674,8 @@ static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg) {
|
||||||
// register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans.
|
// register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans.
|
||||||
taosThreadMutexLock(&execInfo.lock);
|
taosThreadMutexLock(&execInfo.lock);
|
||||||
|
|
||||||
SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
|
|
||||||
|
|
||||||
// mnode handle the create stream transaction too slow may cause this problem
|
// mnode handle the create stream transaction too slow may cause this problem
|
||||||
|
SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
|
||||||
if (pStream == NULL) {
|
if (pStream == NULL) {
|
||||||
mWarn("failed to find the stream:0x%" PRIx64 ", not handle consensus-checkpointId", req.streamId);
|
mWarn("failed to find the stream:0x%" PRIx64 ", not handle consensus-checkpointId", req.streamId);
|
||||||
|
|
||||||
|
@ -2688,11 +2686,9 @@ static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg) {
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
mError("failed to find the stream:0x%" PRIx64 " in buf, not handle consensus-checkpointId", req.streamId);
|
mError("failed to find the stream:0x%" PRIx64 " in buf, not handle consensus-checkpointId", req.streamId);
|
||||||
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
|
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
|
||||||
|
|
||||||
mndSendQuickConsensusChkptIdRsp(&req, terrno, req.streamId, 0, &pMsg->info);
|
|
||||||
|
|
||||||
taosThreadMutexUnlock(&execInfo.lock);
|
taosThreadMutexUnlock(&execInfo.lock);
|
||||||
pMsg->info.handle = NULL; // disable auto rsp
|
|
||||||
|
doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno);
|
||||||
return -1;
|
return -1;
|
||||||
} else {
|
} else {
|
||||||
mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
|
mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
|
||||||
|
@ -2706,36 +2702,35 @@ static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg) {
|
||||||
|
|
||||||
int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);
|
int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);
|
||||||
if ((pStream != NULL) && (pStream->checkpointId == 0)) { // not generated checkpoint yet, return 0 directly
|
if ((pStream != NULL) && (pStream->checkpointId == 0)) { // not generated checkpoint yet, return 0 directly
|
||||||
mndSendQuickConsensusChkptIdRsp(&req, TSDB_CODE_SUCCESS, req.streamId, 0, &pMsg->info);
|
|
||||||
|
|
||||||
taosThreadMutexUnlock(&execInfo.lock);
|
taosThreadMutexUnlock(&execInfo.lock);
|
||||||
pMsg->info.handle = NULL; // disable auto rsp
|
mndCreateSetConsensusChkptIdTrans(pMnode, pStream, req.taskId, 0, req.startTs);
|
||||||
|
|
||||||
|
doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool allEqual = true;
|
int32_t num = 0;
|
||||||
int64_t chkId = getConsensusId(req.streamId, numOfTasks, &allEqual);
|
int64_t chkId = getConsensusId(req.streamId, numOfTasks, &num);
|
||||||
|
|
||||||
// some tasks not send hbMsg to mnode yet, wait for 5s.
|
// some tasks not send hbMsg to mnode yet, wait for 5s.
|
||||||
if (chkId == -1) {
|
if (chkId == -1) {
|
||||||
mDebug(
|
mDebug("not all(%d/%d) task(s) send hbMsg yet, wait for a while and check again, s-task:0x%x", req.taskId, num,
|
||||||
"not all task send hbMsg yet, add into list and wait for 10s to check the consensus-checkpointId again, "
|
numOfTasks);
|
||||||
"s-task:0x%x", req.taskId);
|
|
||||||
SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, req.streamId, numOfTasks);
|
SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, req.streamId, numOfTasks);
|
||||||
mndAddConsensusTasks(pInfo, &req, &pMsg->info);
|
mndAddConsensusTasks(pInfo, &req, &pMsg->info);
|
||||||
taosThreadMutexUnlock(&execInfo.lock);
|
|
||||||
|
|
||||||
pMsg->info.handle = NULL; // disable auto rsp
|
taosThreadMutexUnlock(&execInfo.lock);
|
||||||
|
doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (chkId == req.checkpointId) {
|
if (chkId == req.checkpointId) {
|
||||||
mDebug("vgId:%d stream:0x%" PRIx64 " %s consensus-checkpointId is:%" PRId64 ", meta-stored checkpointId:%" PRId64,
|
mDebug("vgId:%d stream:0x%" PRIx64 " %s consensus-checkpointId is:%" PRId64 ", meta-stored checkpointId:%" PRId64,
|
||||||
req.nodeId, req.streamId, pStream->name, chkId, pStream->checkpointId);
|
req.nodeId, req.streamId, pStream->name, chkId, pStream->checkpointId);
|
||||||
mndSendQuickConsensusChkptIdRsp(&req, TSDB_CODE_SUCCESS, req.streamId, chkId, &pMsg->info);
|
mndCreateSetConsensusChkptIdTrans(pMnode, pStream, req.taskId, chkId, req.startTs);
|
||||||
|
|
||||||
taosThreadMutexUnlock(&execInfo.lock);
|
taosThreadMutexUnlock(&execInfo.lock);
|
||||||
pMsg->info.handle = NULL; // disable auto rsp
|
doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2748,14 +2743,14 @@ static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexUnlock(&execInfo.lock);
|
taosThreadMutexUnlock(&execInfo.lock);
|
||||||
pMsg->info.handle = NULL; // disable auto rsp
|
doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
|
int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
|
||||||
|
SMnode *pMnode = pMsg->info.node;
|
||||||
int64_t now = taosGetTimestampMs();
|
int64_t now = taosGetTimestampMs();
|
||||||
int64_t streamId = -1; // todo: fix only one
|
SArray *pStreamList = taosArrayInit(4, sizeof(int64_t));
|
||||||
|
|
||||||
mDebug("start to process consensus-checkpointId in tmr");
|
mDebug("start to process consensus-checkpointId in tmr");
|
||||||
taosThreadMutexLock(&execInfo.lock);
|
taosThreadMutexLock(&execInfo.lock);
|
||||||
|
@ -2764,31 +2759,31 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
|
||||||
while ((pIter = taosHashIterate(execInfo.pStreamConsensus, pIter)) != NULL) {
|
while ((pIter = taosHashIterate(execInfo.pStreamConsensus, pIter)) != NULL) {
|
||||||
SCheckpointConsensusInfo *pInfo = (SCheckpointConsensusInfo *)pIter;
|
SCheckpointConsensusInfo *pInfo = (SCheckpointConsensusInfo *)pIter;
|
||||||
|
|
||||||
int32_t j = 0;
|
int64_t streamId = -1;
|
||||||
int32_t num = taosArrayGetSize(pInfo->pTaskList);
|
int32_t num = taosArrayGetSize(pInfo->pTaskList);
|
||||||
|
|
||||||
SArray *pList = taosArrayInit(4, sizeof(int32_t));
|
SArray *pList = taosArrayInit(4, sizeof(int32_t));
|
||||||
|
|
||||||
for (; j < num; ++j) {
|
SStreamObj *pStream = mndGetStreamObj(pMnode, pInfo->streamId);
|
||||||
SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, j);
|
if (pStream == NULL) { // stream has been dropped already
|
||||||
|
mDebug("stream:0x%"PRIx64" dropped already, continue", pInfo->streamId);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if ((now - pe->ts) > 10 * 1000) {
|
for (int32_t j = 0; j < num; ++j) {
|
||||||
bool allEqual = true;
|
SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, j);
|
||||||
int64_t chkId = getConsensusId(pe->req.streamId, pInfo->numOfTasks, &allEqual);
|
streamId = pe->req.streamId;
|
||||||
|
|
||||||
|
if ((now - pe->ts) >= 10 * 1000) {
|
||||||
|
int32_t existed = 0;
|
||||||
|
int64_t chkId = getConsensusId(pe->req.streamId, pInfo->numOfTasks, &existed);
|
||||||
if (chkId == -1) {
|
if (chkId == -1) {
|
||||||
mDebug("tasks send hbMsg for stream:0x%" PRIx64 ", wait for next round", pe->req.streamId);
|
mDebug("not all(%d/%d) task(s) send hbMsg yet, wait for a while and check again, s-task:0x%x", existed,
|
||||||
|
pInfo->numOfTasks, pe->req.taskId);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (allEqual) {
|
|
||||||
mDebug("all has identical checkpointId for stream:0x%"PRIx64" send checkpointId to s-task:0x%x",
|
|
||||||
pe->req.streamId, pe->req.taskId);
|
|
||||||
|
|
||||||
mndSendQuickConsensusChkptIdRsp(&pe->req, TSDB_CODE_SUCCESS, pe->req.streamId, chkId, &pe->rspInfo);
|
|
||||||
} else {
|
|
||||||
ASSERT(chkId <= pe->req.checkpointId);
|
ASSERT(chkId <= pe->req.checkpointId);
|
||||||
mndSendQuickConsensusChkptIdRsp(&pe->req, TSDB_CODE_SUCCESS, pe->req.streamId, chkId, &pe->rspInfo);
|
mndCreateSetConsensusChkptIdTrans(pMnode, pStream, pe->req.taskId, chkId, pe->req.startTs);
|
||||||
}
|
|
||||||
|
|
||||||
taosArrayPush(pList, &pe->req.taskId);
|
taosArrayPush(pList, &pe->req.taskId);
|
||||||
streamId = pe->req.streamId;
|
streamId = pe->req.streamId;
|
||||||
|
@ -2798,6 +2793,8 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mndReleaseStream(pMnode, pStream);
|
||||||
|
|
||||||
if (taosArrayGetSize(pList) > 0) {
|
if (taosArrayGetSize(pList) > 0) {
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
|
||||||
int32_t *taskId = taosArrayGet(pList, i);
|
int32_t *taskId = taosArrayGet(pList, i);
|
||||||
|
@ -2815,12 +2812,19 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
|
||||||
|
|
||||||
if (taosArrayGetSize(pInfo->pTaskList) == 0) {
|
if (taosArrayGetSize(pInfo->pTaskList) == 0) {
|
||||||
mndClearConsensusRspEntry(pInfo);
|
mndClearConsensusRspEntry(pInfo);
|
||||||
mndClearConsensusCheckpointId(execInfo.pStreamConsensus, streamId);
|
ASSERT(streamId != -1);
|
||||||
|
taosArrayPush(pStreamList, &streamId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < taosArrayGetSize(pStreamList); ++i) {
|
||||||
|
int64_t *pStreamId = (int64_t *)taosArrayGet(pStreamList, i);
|
||||||
|
mndClearConsensusCheckpointId(execInfo.pStreamConsensus, *pStreamId);
|
||||||
|
}
|
||||||
|
|
||||||
taosThreadMutexUnlock(&execInfo.lock);
|
taosThreadMutexUnlock(&execInfo.lock);
|
||||||
|
|
||||||
|
taosArrayDestroy(pStreamList);
|
||||||
mDebug("end to process consensus-checkpointId in tmr");
|
mDebug("end to process consensus-checkpointId in tmr");
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -651,6 +651,7 @@ void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo* pExecInfo) {
|
||||||
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
|
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
|
||||||
taosThreadMutexLock(&pExecNode->lock);
|
taosThreadMutexLock(&pExecNode->lock);
|
||||||
|
|
||||||
|
// 1. remove task entries
|
||||||
SStreamTaskIter *pIter = createStreamTaskIter(pStream);
|
SStreamTaskIter *pIter = createStreamTaskIter(pStream);
|
||||||
while (streamTaskIterNextTask(pIter)) {
|
while (streamTaskIterNextTask(pIter)) {
|
||||||
SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
|
SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
|
||||||
|
@ -660,8 +661,11 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
|
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
|
||||||
taosThreadMutexUnlock(&pExecNode->lock);
|
|
||||||
|
|
||||||
|
// 2. remove stream entry in consensus hash table
|
||||||
|
mndClearConsensusCheckpointId(execInfo.pStreamConsensus, pStream->uid);
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pExecNode->lock);
|
||||||
destroyStreamTaskIter(pIter);
|
destroyStreamTaskIter(pIter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -835,45 +839,98 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doSendConsensusCheckpointRsp(SRestoreCheckpointInfo* pInfo, SRpcMsg* pMsg, int64_t checkpointId) {
|
static int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStreamTask* pTask, int64_t checkpointId, int64_t ts) {
|
||||||
|
SRestoreCheckpointInfo req = {
|
||||||
|
.taskId = pTask->id.taskId,
|
||||||
|
.streamId = pTask->id.streamId,
|
||||||
|
.checkpointId = checkpointId,
|
||||||
|
.startTs = ts,
|
||||||
|
.nodeId = pTask->info.nodeId,
|
||||||
|
};
|
||||||
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t blen;
|
int32_t blen;
|
||||||
|
tEncodeSize(tEncodeRestoreCheckpointInfo, &req, blen, code);
|
||||||
SRestoreCheckpointInfoRsp req = {
|
|
||||||
.streamId = pInfo->streamId, .taskId = pInfo->taskId, .checkpointId = checkpointId, .startTs = pInfo->startTs};
|
|
||||||
|
|
||||||
tEncodeSize(tEncodeRestoreCheckpointInfoRsp, &req, blen, code);
|
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tlen = sizeof(SMsgHead) + blen;
|
int32_t tlen = sizeof(SMsgHead) + blen;
|
||||||
void *abuf = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
|
||||||
|
void *pBuf = taosMemoryMalloc(tlen);
|
||||||
|
if (pBuf == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
void *abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead));
|
||||||
SEncoder encoder;
|
SEncoder encoder;
|
||||||
tEncoderInit(&encoder, abuf, tlen);
|
tEncoderInit(&encoder, abuf, tlen);
|
||||||
tEncodeRestoreCheckpointInfoRsp(&encoder, &req);
|
tEncodeRestoreCheckpointInfo(&encoder, &req);
|
||||||
|
|
||||||
SMsgHead *pMsgHead = (SMsgHead *)pMsg->pCont;
|
SMsgHead *pMsgHead = (SMsgHead *)pBuf;
|
||||||
pMsgHead->contLen = htonl(tlen);
|
pMsgHead->contLen = htonl(tlen);
|
||||||
pMsgHead->vgId = htonl(pInfo->nodeId);
|
pMsgHead->vgId = htonl(pTask->info.nodeId);
|
||||||
|
|
||||||
tEncoderClear(&encoder);
|
tEncoderClear(&encoder);
|
||||||
|
|
||||||
tmsgSendRsp(pMsg);
|
SEpSet epset = {0};
|
||||||
|
bool hasEpset = false;
|
||||||
|
code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
|
||||||
|
if (code != TSDB_CODE_SUCCESS || !hasEpset) {
|
||||||
|
taosMemoryFree(pBuf);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndSendQuickConsensusChkptIdRsp(SRestoreCheckpointInfo *pReq, int32_t code, int64_t streamId,
|
code = setTransAction(pTrans, pBuf, tlen, TDMT_STREAM_CONSEN_CHKPT, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
|
||||||
int64_t checkpointId, SRpcHandleInfo *pRpcInfo) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
SRpcMsg rsp = {.code = code, .info = *pRpcInfo, .contLen = sizeof(SRestoreCheckpointInfoRsp) + sizeof(SMsgHead)};
|
taosMemoryFree(pBuf);
|
||||||
rsp.pCont = rpcMallocCont(rsp.contLen);
|
}
|
||||||
|
|
||||||
SMsgHead *pHead = rsp.pCont;
|
return code;
|
||||||
pHead->vgId = htonl(pReq->nodeId);
|
}
|
||||||
|
|
||||||
mDebug("stream:0x%" PRIx64 " consensus-checkpointId:%" PRId64 " exists, s-task:0x%x send to vnode",
|
int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, int32_t taskId, int64_t checkpointId,
|
||||||
streamId, checkpointId, pReq->taskId);
|
int64_t ts) {
|
||||||
return doSendConsensusCheckpointRsp(pReq, &rsp, checkpointId);
|
char msg[128] = {0};
|
||||||
|
snprintf(msg, tListLen(msg), "set consen-chkpt-id for task:0x%x", taskId);
|
||||||
|
|
||||||
|
STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_CONSEN_NAME, msg);
|
||||||
|
if (pTrans == NULL) {
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
STaskId id = {.streamId = pStream->uid, .taskId = taskId};
|
||||||
|
SStreamTask *pTask = mndGetStreamTask(&id, pStream);
|
||||||
|
ASSERT(pTask);
|
||||||
|
|
||||||
|
/*int32_t code = */ mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_CONSEN_NAME, pStream->uid);
|
||||||
|
int32_t code = mndStreamSetChkptIdAction(pMnode, pTrans, pTask, checkpointId, ts);
|
||||||
|
if (code != 0) {
|
||||||
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||||
|
mError("trans:%d, failed to prepare set consensus-chkptId trans since %s", pTrans->id, terrstr());
|
||||||
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
|
||||||
|
return TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
SCheckpointConsensusInfo* mndGetConsensusInfo(SHashObj* pHash, int64_t streamId, int32_t numOfTasks) {
|
SCheckpointConsensusInfo* mndGetConsensusInfo(SHashObj* pHash, int64_t streamId, int32_t numOfTasks) {
|
||||||
|
@ -885,6 +942,7 @@ SCheckpointConsensusInfo* mndGetConsensusInfo(SHashObj* pHash, int64_t streamId,
|
||||||
SCheckpointConsensusInfo p = {
|
SCheckpointConsensusInfo p = {
|
||||||
.pTaskList = taosArrayInit(4, sizeof(SCheckpointConsensusEntry)),
|
.pTaskList = taosArrayInit(4, sizeof(SCheckpointConsensusEntry)),
|
||||||
.numOfTasks = numOfTasks,
|
.numOfTasks = numOfTasks,
|
||||||
|
.streamId = streamId,
|
||||||
};
|
};
|
||||||
|
|
||||||
taosHashPut(pHash, &streamId, sizeof(streamId), &p, sizeof(p));
|
taosHashPut(pHash, &streamId, sizeof(streamId), &p, sizeof(p));
|
||||||
|
|
|
@ -128,6 +128,8 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||||
return tqStreamTaskProcessRetrieveTriggerReq(pSnode->pMeta, pMsg);
|
return tqStreamTaskProcessRetrieveTriggerReq(pSnode->pMeta, pMsg);
|
||||||
case TDMT_STREAM_RETRIEVE_TRIGGER_RSP:
|
case TDMT_STREAM_RETRIEVE_TRIGGER_RSP:
|
||||||
return tqStreamTaskProcessRetrieveTriggerRsp(pSnode->pMeta, pMsg);
|
return tqStreamTaskProcessRetrieveTriggerRsp(pSnode->pMeta, pMsg);
|
||||||
|
case TDMT_MND_STREAM_REQ_CONSEN_CHKPT_RSP:
|
||||||
|
return tqStreamProcessConsensusChkptRsp2(pSnode->pMeta, pMsg);
|
||||||
default:
|
default:
|
||||||
sndError("invalid snode msg:%d", pMsg->msgType);
|
sndError("invalid snode msg:%d", pMsg->msgType);
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
@ -149,15 +151,15 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
||||||
case TDMT_VND_STREAM_TASK_UPDATE:
|
case TDMT_VND_STREAM_TASK_UPDATE:
|
||||||
return tqStreamTaskProcessUpdateReq(pSnode->pMeta, &pSnode->msgCb, pMsg, true);
|
return tqStreamTaskProcessUpdateReq(pSnode->pMeta, &pSnode->msgCb, pMsg, true);
|
||||||
case TDMT_VND_STREAM_TASK_RESET:
|
case TDMT_VND_STREAM_TASK_RESET:
|
||||||
return tqStreamTaskProcessTaskResetReq(pSnode->pMeta, pMsg);
|
return tqStreamTaskProcessTaskResetReq(pSnode->pMeta, pMsg->pCont);
|
||||||
case TDMT_STREAM_TASK_PAUSE:
|
case TDMT_STREAM_TASK_PAUSE:
|
||||||
return tqStreamTaskProcessTaskPauseReq(pSnode->pMeta, pMsg->pCont);
|
return tqStreamTaskProcessTaskPauseReq(pSnode->pMeta, pMsg->pCont);
|
||||||
case TDMT_STREAM_TASK_RESUME:
|
case TDMT_STREAM_TASK_RESUME:
|
||||||
return tqStreamTaskProcessTaskResumeReq(pSnode->pMeta, pMsg->info.conn.applyIndex, pMsg->pCont, false);
|
return tqStreamTaskProcessTaskResumeReq(pSnode->pMeta, pMsg->info.conn.applyIndex, pMsg->pCont, false);
|
||||||
case TDMT_STREAM_TASK_UPDATE_CHKPT:
|
case TDMT_STREAM_TASK_UPDATE_CHKPT:
|
||||||
return tqStreamTaskProcessUpdateCheckpointReq(pSnode->pMeta, true, pMsg->pCont, pMsg->contLen);
|
return tqStreamTaskProcessUpdateCheckpointReq(pSnode->pMeta, true, pMsg->pCont);
|
||||||
case TDMT_MND_STREAM_CHKPT_CONSEN_RSP:
|
case TDMT_STREAM_CONSEN_CHKPT:
|
||||||
return tqStreamProcessConsensusChkptRsp(pSnode->pMeta, pMsg);
|
return tqStreamTaskProcessConsenChkptIdReq(pSnode->pMeta, pMsg);
|
||||||
default:
|
default:
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
|
|
@ -298,6 +298,7 @@ int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqStreamProgressRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqStreamProgressRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessTaskUpdateCheckpointReq(STQ* pTq, char* msg, int32_t msgLen);
|
int32_t tqProcessTaskUpdateCheckpointReq(STQ* pTq, char* msg, int32_t msgLen);
|
||||||
|
int32_t tqProcessTaskConsenChkptIdReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
|
|
||||||
// sma
|
// sma
|
||||||
int32_t smaInit();
|
int32_t smaInit();
|
||||||
|
|
|
@ -1016,7 +1016,11 @@ int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessTaskUpdateCheckpointReq(STQ* pTq, char* msg, int32_t msgLen) {
|
int32_t tqProcessTaskUpdateCheckpointReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
return tqStreamTaskProcessUpdateCheckpointReq(pTq->pStreamMeta, pTq->pVnode->restored, msg, msgLen);
|
return tqStreamTaskProcessUpdateCheckpointReq(pTq->pStreamMeta, pTq->pVnode->restored, msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqProcessTaskConsenChkptIdReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
return tqStreamTaskProcessConsenChkptIdReq(pTq->pStreamMeta, pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
||||||
|
@ -1239,7 +1243,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
|
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
return tqStreamTaskProcessTaskResetReq(pTq->pStreamMeta, pMsg);
|
return tqStreamTaskProcessTaskResetReq(pTq->pStreamMeta, pMsg->pCont);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessTaskRetrieveTriggerReq(STQ* pTq, SRpcMsg* pMsg) {
|
int32_t tqProcessTaskRetrieveTriggerReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
@ -1277,5 +1281,5 @@ int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessTaskConsensusChkptRsp(STQ* pTq, SRpcMsg* pMsg) {
|
int32_t tqProcessTaskConsensusChkptRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
return tqStreamProcessConsensusChkptRsp(pTq->pStreamMeta, pMsg);
|
return tqStreamProcessConsensusChkptRsp2(pTq->pStreamMeta, pMsg);
|
||||||
}
|
}
|
||||||
|
|
|
@ -670,7 +670,7 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg, int32_t msgLen) {
|
int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg) {
|
||||||
SVUpdateCheckpointInfoReq* pReq = (SVUpdateCheckpointInfoReq*)msg;
|
SVUpdateCheckpointInfoReq* pReq = (SVUpdateCheckpointInfoReq*)msg;
|
||||||
|
|
||||||
int32_t vgId = pMeta->vgId;
|
int32_t vgId = pMeta->vgId;
|
||||||
|
@ -858,8 +858,8 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) {
|
||||||
SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)pMsg->pCont;
|
SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)pMsg;
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
|
@ -1115,6 +1115,8 @@ int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { ret
|
||||||
|
|
||||||
int32_t tqStreamProcessChkptReportRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); }
|
int32_t tqStreamProcessChkptReportRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); }
|
||||||
|
|
||||||
|
int32_t tqStreamProcessConsensusChkptRsp2(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); }
|
||||||
|
|
||||||
int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
SMStreamCheckpointReadyRspMsg* pRsp = pMsg->pCont;
|
SMStreamCheckpointReadyRspMsg* pRsp = pMsg->pCont;
|
||||||
|
|
||||||
|
@ -1130,22 +1132,21 @@ int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqStreamProcessConsensusChkptRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
int32_t vgId = pMeta->vgId;
|
int32_t vgId = pMeta->vgId;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||||
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
||||||
SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
|
|
||||||
int64_t now = taosGetTimestampMs();
|
int64_t now = taosGetTimestampMs();
|
||||||
|
|
||||||
SRestoreCheckpointInfoRsp req = {0};
|
SRestoreCheckpointInfo req = {0};
|
||||||
|
|
||||||
SDecoder decoder;
|
SDecoder decoder;
|
||||||
tDecoderInit(&decoder, (uint8_t*)msg, len);
|
tDecoderInit(&decoder, (uint8_t*)msg, len);
|
||||||
|
|
||||||
rsp.info.handle = NULL;
|
if (tDecodeRestoreCheckpointInfo(&decoder, &req) < 0) {
|
||||||
if (tDecodeRestoreCheckpointInfoRsp(&decoder, &req) < 0) {
|
tqError("vgId:%d failed to decode set consensus checkpointId req, code:%s", vgId, tstrerror(code));
|
||||||
// rsp.code = TSDB_CODE_MSG_DECODE_ERROR; // disable it temporarily
|
|
||||||
tqError("vgId:%d failed to decode restore task checkpointId, code:%s", vgId, tstrerror(rsp.code));
|
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -1154,7 +1155,7 @@ int32_t tqStreamProcessConsensusChkptRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
tqError("vgId:%d process restore checkpointId req, failed to acquire task:0x%x, it may have been dropped already",
|
tqError("vgId:%d process set consensus checkpointId req, failed to acquire task:0x%x, it may have been dropped already",
|
||||||
pMeta->vgId, req.taskId);
|
pMeta->vgId, req.taskId);
|
||||||
streamMetaAddFailedTask(pMeta, req.streamId, req.taskId);
|
streamMetaAddFailedTask(pMeta, req.streamId, req.taskId);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -1182,7 +1183,7 @@ int32_t tqStreamProcessConsensusChkptRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
pTask->chkInfo.checkpointId = req.checkpointId;
|
pTask->chkInfo.checkpointId = req.checkpointId;
|
||||||
tqSetRestoreVersionInfo(pTask);
|
tqSetRestoreVersionInfo(pTask);
|
||||||
} else {
|
} else {
|
||||||
tqDebug("s-task:%s vgId:%d consensus-checkpointId:%" PRId64 " equals to current checkpointId, no need to update",
|
tqDebug("s-task:%s vgId:%d consensus-checkpointId:%" PRId64 " equals to current checkpointId, not update",
|
||||||
pTask->id.idStr, vgId, req.checkpointId);
|
pTask->id.idStr, vgId, req.checkpointId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -630,6 +630,11 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
} break;
|
} break;
|
||||||
|
case TDMT_STREAM_CONSEN_CHKPT: {
|
||||||
|
if (pVnode->restored) {
|
||||||
|
tqProcessTaskConsenChkptIdReq(pVnode->pTq, pMsg);
|
||||||
|
}
|
||||||
|
} break;
|
||||||
case TDMT_STREAM_TASK_PAUSE: {
|
case TDMT_STREAM_TASK_PAUSE: {
|
||||||
if (pVnode->restored && vnodeIsLeader(pVnode) &&
|
if (pVnode->restored && vnodeIsLeader(pVnode) &&
|
||||||
tqProcessTaskPauseReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
|
tqProcessTaskPauseReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
|
||||||
|
@ -647,11 +652,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
|
||||||
tqProcessTaskResetReq(pVnode->pTq, pMsg);
|
tqProcessTaskResetReq(pVnode->pTq, pMsg);
|
||||||
}
|
}
|
||||||
} break;
|
} break;
|
||||||
case TDMT_MND_STREAM_CHKPT_CONSEN_RSP: {
|
|
||||||
if (pVnode->restored) {
|
|
||||||
tqProcessTaskConsensusChkptRsp(pVnode->pTq, pMsg);
|
|
||||||
}
|
|
||||||
} break;
|
|
||||||
case TDMT_VND_ALTER_CONFIRM:
|
case TDMT_VND_ALTER_CONFIRM:
|
||||||
needCommit = pVnode->config.hashChange;
|
needCommit = pVnode->config.hashChange;
|
||||||
if (vnodeProcessAlterConfirmReq(pVnode, ver, pReq, len, pRsp) < 0) {
|
if (vnodeProcessAlterConfirmReq(pVnode, ver, pReq, len, pRsp) < 0) {
|
||||||
|
@ -861,6 +861,8 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
|
||||||
return tqStreamProgressRetrieveReq(pVnode->pTq, pMsg);
|
return tqStreamProgressRetrieveReq(pVnode->pTq, pMsg);
|
||||||
case TDMT_MND_STREAM_CHKPT_REPORT_RSP:
|
case TDMT_MND_STREAM_CHKPT_REPORT_RSP:
|
||||||
return tqProcessTaskChkptReportRsp(pVnode->pTq, pMsg);
|
return tqProcessTaskChkptReportRsp(pVnode->pTq, pMsg);
|
||||||
|
case TDMT_MND_STREAM_REQ_CONSEN_CHKPT_RSP:
|
||||||
|
return tqProcessTaskConsensusChkptRsp(pVnode->pTq, pMsg);
|
||||||
default:
|
default:
|
||||||
vError("unknown msg type:%d in stream queue", pMsg->msgType);
|
vError("unknown msg type:%d in stream queue", pMsg->msgType);
|
||||||
return TSDB_CODE_APP_ERROR;
|
return TSDB_CODE_APP_ERROR;
|
||||||
|
|
|
@ -1153,7 +1153,7 @@ int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) {
|
||||||
tEncoderClear(&encoder);
|
tEncoderClear(&encoder);
|
||||||
|
|
||||||
SRpcMsg msg = {0};
|
SRpcMsg msg = {0};
|
||||||
initRpcMsg(&msg, TDMT_MND_STREAM_CHKPT_CONSEN, buf, tlen);
|
initRpcMsg(&msg, TDMT_MND_STREAM_REQ_CONSEN_CHKPT, buf, tlen);
|
||||||
stDebug("s-task:%s vgId:%d send latest checkpointId:%" PRId64 " to mnode to get the consensus checkpointId", id, vgId,
|
stDebug("s-task:%s vgId:%d send latest checkpointId:%" PRId64 " to mnode to get the consensus checkpointId", id, vgId,
|
||||||
pInfo->checkpointId);
|
pInfo->checkpointId);
|
||||||
|
|
||||||
|
|
|
@ -650,23 +650,3 @@ int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo*
|
||||||
tEndDecode(pDecoder);
|
tEndDecode(pDecoder);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tEncodeRestoreCheckpointInfoRsp(SEncoder* pCoder, const SRestoreCheckpointInfoRsp* pInfo) {
|
|
||||||
if (tStartEncode(pCoder) < 0) return -1;
|
|
||||||
if (tEncodeI64(pCoder, pInfo->startTs) < 0) return -1;
|
|
||||||
if (tEncodeI64(pCoder, pInfo->streamId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pCoder, pInfo->taskId) < 0) return -1;
|
|
||||||
if (tEncodeI64(pCoder, pInfo->checkpointId) < 0) return -1;
|
|
||||||
tEndEncode(pCoder);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tDecodeRestoreCheckpointInfoRsp(SDecoder* pCoder, SRestoreCheckpointInfoRsp* pInfo) {
|
|
||||||
if (tStartDecode(pCoder) < 0) return -1;
|
|
||||||
if (tDecodeI64(pCoder, &pInfo->startTs) < 0) return -1;
|
|
||||||
if (tDecodeI64(pCoder, &pInfo->streamId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pCoder, &pInfo->taskId) < 0) return -1;
|
|
||||||
if (tDecodeI64(pCoder, &pInfo->checkpointId) < 0) return -1;
|
|
||||||
tEndDecode(pCoder);
|
|
||||||
return 0;
|
|
||||||
}
|
|
Loading…
Reference in New Issue