enh(stream):rsp the checkpoint ready msg.
This commit is contained in:
parent
dafac591f7
commit
aee0460daf
|
@ -28,6 +28,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe
|
|||
int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||
int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||
int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||
int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen,
|
||||
bool isLeader, bool restored);
|
||||
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen);
|
||||
|
|
|
@ -87,6 +87,7 @@ SArray *smGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
|
|
|
@ -835,6 +835,7 @@ SArray *vmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
|
|
|
@ -182,6 +182,8 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
|
|||
return tqStreamProcessStreamHbRsp(pSnode->pMeta, pMsg);
|
||||
case TDMT_MND_STREAM_REQ_CHKPT_RSP:
|
||||
return tqStreamProcessReqCheckpointRsp(pSnode->pMeta, pMsg);
|
||||
case TDMT_STREAM_TASK_CHECKPOINT_READY_RSP:
|
||||
return tqStreamProcessCheckpointReadyRsp(pSnode->pMeta, pMsg);
|
||||
default:
|
||||
sndError("invalid snode msg:%d", pMsg->msgType);
|
||||
ASSERT(0);
|
||||
|
|
|
@ -244,6 +244,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
|
|||
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg);
|
||||
int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||
int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||
int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||
|
||||
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
|
||||
int32_t tqScanWal(STQ* pTq);
|
||||
|
|
|
@ -1229,3 +1229,7 @@ int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg) {
|
|||
int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||
return tqStreamProcessReqCheckpointRsp(pTq->pStreamMeta, pMsg);
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||
return tqStreamProcessCheckpointReadyRsp(pTq->pStreamMeta, pMsg);
|
||||
}
|
||||
|
|
|
@ -948,3 +948,7 @@ static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
|||
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); }
|
||||
|
||||
int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); }
|
||||
|
||||
int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||
return doProcessDummyRspMsg(pMeta, pMsg);
|
||||
}
|
|
@ -800,6 +800,8 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
|
|||
return tqProcessStreamHbRsp(pVnode->pTq, pMsg);
|
||||
case TDMT_MND_STREAM_REQ_CHKPT_RSP:
|
||||
return tqProcessStreamReqCheckpointRsp(pVnode->pTq, pMsg);
|
||||
case TDMT_STREAM_TASK_CHECKPOINT_READY_RSP:
|
||||
return tqProcessTaskCheckpointReadyRsp(pVnode->pTq, pMsg);
|
||||
default:
|
||||
vError("unknown msg type:%d in stream queue", pMsg->msgType);
|
||||
return TSDB_CODE_APP_ERROR;
|
||||
|
|
|
@ -907,7 +907,6 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId,
|
|||
|
||||
SStreamChkptReadyInfo info = {.upStreamTaskId = pInfo->taskId, .upstreamNodeEpset = pInfo->epSet};
|
||||
initRpcMsg(&info.msg, TDMT_STREAM_TASK_CHECKPOINT_READY, buf, tlen + sizeof(SMsgHead));
|
||||
info.msg.info.noResp = 1; // refactor later.
|
||||
|
||||
stDebug("s-task:%s (level:%d) prepare checkpoint ready msg to upstream s-task:0x%" PRIx64
|
||||
":0x%x (vgId:%d) idx:%d, vgId:%d",
|
||||
|
|
Loading…
Reference in New Issue