Merge branch 'enh/triggerCheckPoint2' into enh/chkpTransfer
This commit is contained in:
commit
0698cf5f92
|
@ -203,6 +203,7 @@ SArray *mmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
|
@ -88,8 +88,6 @@ int32_t mndInitStream(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_NODECHECK_TIMER, mndProcessNodeCheckReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_NODECHECK_TIMER, mndProcessNodeCheckReq);
|
||||||
|
|
||||||
/*mndSetMsgHandle(pMnode, TDMT_MND_RECOVER_STREAM, mndProcessRecoverStreamReq);*/
|
|
||||||
|
|
||||||
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp);
|
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp);
|
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_PAUSE_RSP, mndTransProcessRsp);
|
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_PAUSE_RSP, mndTransProcessRsp);
|
||||||
|
@ -868,7 +866,7 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {
|
SRpcMsg rpcMsg = {
|
||||||
.msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = sizeof(SMStreamDoCheckpointMsg)};
|
.msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = sizeof(SMStreamDoCheckpointMsg)};
|
||||||
// tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -680,7 +680,6 @@ int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
SRpcMsg rsp = {.info = pMsg->info, .pCont = buf, .contLen = len, .code = 0};
|
SRpcMsg rsp = {.info = pMsg->info, .pCont = buf, .contLen = len, .code = 0};
|
||||||
|
|
||||||
tmsgSendRsp(&rsp);
|
tmsgSendRsp(&rsp);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1663,6 +1662,7 @@ int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// todo refactor.
|
||||||
int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
|
int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
|
||||||
STQ* pTq = pVnode->pTq;
|
STQ* pTq = pVnode->pTq;
|
||||||
SMsgHead* msgStr = pMsg->pCont;
|
SMsgHead* msgStr = pMsg->pCont;
|
||||||
|
@ -1830,26 +1830,25 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||||
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
||||||
int32_t code = 0;
|
|
||||||
|
|
||||||
|
SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
|
||||||
SStreamTaskNodeUpdateMsg req = {0};
|
SStreamTaskNodeUpdateMsg req = {0};
|
||||||
|
|
||||||
SDecoder decoder;
|
SDecoder decoder;
|
||||||
tDecoderInit(&decoder, (uint8_t*)msg, len);
|
tDecoderInit(&decoder, (uint8_t*)msg, len);
|
||||||
if (tDecodeStreamTaskUpdateMsg(&decoder, &req) < 0) {
|
if (tDecodeStreamTaskUpdateMsg(&decoder, &req) < 0) {
|
||||||
code = TSDB_CODE_MSG_DECODE_ERROR;
|
rsp.code = TSDB_CODE_MSG_DECODE_ERROR;
|
||||||
tDecoderClear(&decoder);
|
tqError("vgId:%d failed to decode task update msg, code:%s", vgId, tstrerror(rsp.code));
|
||||||
tqError("vgId:%d failed to decode task update msg, code:%s", vgId, tstrerror(code));
|
goto _end;
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
tDecoderClear(&decoder);
|
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId,
|
tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId,
|
||||||
req.taskId);
|
req.taskId);
|
||||||
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
|
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
|
||||||
return TSDB_CODE_SUCCESS;
|
rsp.code = TSDB_CODE_SUCCESS;
|
||||||
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("s-task:%s receive task nodeEp update msg from mnode", pTask->id.idStr);
|
tqDebug("s-task:%s receive task nodeEp update msg from mnode", pTask->id.idStr);
|
||||||
|
@ -1858,27 +1857,26 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
SStreamTask* pHistoryTask = NULL;
|
SStreamTask* pHistoryTask = NULL;
|
||||||
if (pTask->historyTaskId.taskId != 0) {
|
if (pTask->historyTaskId.taskId != 0) {
|
||||||
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId);
|
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId);
|
||||||
if (pHistoryTask == NULL) {
|
if (pHistoryTask != NULL) {
|
||||||
|
tqDebug("s-task:%s fill-history task handle task update along with related stream task", pHistoryTask->id.idStr);
|
||||||
|
streamTaskUpdateEpsetInfo(pHistoryTask, req.pNodeList);
|
||||||
|
|
||||||
|
streamTaskRestart(pHistoryTask, NULL);
|
||||||
|
streamMetaReleaseTask(pMeta, pHistoryTask);
|
||||||
|
} else {
|
||||||
tqError("vgId:%d failed to get fill-history task:0x%x when handling task update, it may have been dropped",
|
tqError("vgId:%d failed to get fill-history task:0x%x when handling task update, it may have been dropped",
|
||||||
pMeta->vgId, pTask->historyTaskId.taskId);
|
pMeta->vgId, pTask->historyTaskId.taskId);
|
||||||
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
|
||||||
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("s-task:%s fill-history task handle task update along with related stream task", pHistoryTask->id.idStr);
|
|
||||||
streamTaskUpdateEpsetInfo(pHistoryTask, req.pNodeList);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pHistoryTask != NULL) {
|
|
||||||
streamTaskRestart(pHistoryTask, NULL);
|
|
||||||
streamMetaReleaseTask(pMeta, pHistoryTask);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
streamTaskRestart(pTask, NULL);
|
streamTaskRestart(pTask, NULL);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
int32_t code = rsp.code;
|
||||||
|
|
||||||
|
_end:
|
||||||
|
tDecoderClear(&decoder);
|
||||||
|
tmsgSendRsp(&rsp);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue