From 13543f7daa2b01d90e5236517dac16a09485752b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 2 Aug 2023 10:33:41 +0800 Subject: [PATCH] feature(stream): handle task update in tq module. --- include/common/tmsg.h | 14 ---- include/libs/stream/tstream.h | 8 ++- source/common/src/tmsg.c | 18 ----- source/dnode/mnode/impl/src/mndStream.c | 2 +- source/dnode/vnode/src/inc/vnodeInt.h | 1 + source/dnode/vnode/src/tq/tq.c | 94 +++++++++++++++---------- source/dnode/vnode/src/vnd/vnodeSvr.c | 2 + source/libs/stream/src/streamDispatch.c | 8 ++- 8 files changed, 70 insertions(+), 77 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index dc20e6bd76..e772a47e3d 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2965,20 +2965,6 @@ typedef struct { int8_t reserved; } SVPauseStreamTaskRsp; -typedef struct { - SMsgHead head; - int32_t taskId; - int32_t nodeId; - SEpSet epset; -} SVStreamTaskUpdateReq; - -typedef struct { - int8_t reserved; -} SVStreamTaskUpdateRsp; - -int32_t tSerializeVTaskUpdateReq(void* buf, int32_t bufLen, const SVStreamTaskUpdateReq* pReq); -int32_t tDeserializeVTaskUpdateReq(void* buf, int32_t bufLen, SVStreamTaskUpdateReq* pReq); - typedef struct { char name[TSDB_STREAM_FNAME_LEN]; int8_t igNotExists; diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index d9a7d3a456..75eb66d984 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -551,12 +551,14 @@ int32_t tEncodeCompleteHistoryDataMsg(SEncoder* pEncoder, const SStreamCompleteH int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistoryMsg* pReq); typedef struct { + int64_t streamId; + int32_t taskId; int32_t nodeId; SEpSet epset; -} SStreamTaskUpdateInfo; +} SStreamTaskUpdateMsg; -int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskUpdateInfo* pMsg); -int32_t tDecodeTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskUpdateInfo* pMsg); +int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskUpdateMsg* pMsg); +int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskUpdateMsg* pMsg); typedef struct { int64_t streamId; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 0aa7e1f955..73a0bed48f 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -7919,24 +7919,6 @@ int32_t tDeserializeSMResumeStreamReq(void *buf, int32_t bufLen, SMResumeStreamR return 0; } -int32_t tSerializeVTaskUpdateReq(void *buf, int32_t bufLen, const SVStreamTaskUpdateReq *pReq) { - SEncoder encoder = {0}; - tEncoderInit(&encoder, buf, bufLen); - if (tStartEncode(&encoder) < 0) return -1; -// if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; -// if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1; -// if (tEncodeI8(&encoder, pReq->igUntreated) < 0) return -1; - tEndEncode(&encoder); - - int32_t tlen = encoder.pos; - tEncoderClear(&encoder); - return tlen; -} - -int32_t tDeserializeVTaskUpdateReq(void* buf, int32_t bufLen, SVStreamTaskUpdateReq* pReq) { - return 0; -} - int32_t tEncodeMqSubTopicEp(void **buf, const SMqSubTopicEp *pTopicEp) { int32_t tlen = 0; tlen += taosEncodeString(buf, pTopicEp->topic); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index ed64c0ec36..3826853a6b 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1728,7 +1728,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { } static int32_t doBuildStreamTaskUpdateMsg(void** pBuf, int32_t* pLen, int32_t nodeId, const SEpSet* pEpset) { - SStreamTaskUpdateInfo req = {0}; + SStreamTaskUpdateMsg req = {0}; req.nodeId = nodeId; req.epset = *pEpset; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index eb2e431576..2ea6cb5a44 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -223,6 +223,7 @@ int tqUnregisterPushHandle(STQ* pTq, void* pHandle); int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqCheckStreamStatus(STQ* pTq); int tqCommit(STQ*); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 3e29c85e44..252f7da6c1 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1739,7 +1739,7 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) { } tDecoderClear(&decoder); - // todo handle this bug: task not in ready state. + // todo handle the case when the task not in ready state, and the checkpoint msg is arrived. SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.taskId); if (pTask == NULL) { tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed already", vgId, @@ -1811,43 +1811,59 @@ int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) { return code; } -int32_t tqProcessTaskUpdateReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { -// SStreamTaskUpdateInfo* pReq = (SVPauseStreamTaskReq*)msg; -// -// SStreamMeta* pMeta = pTq->pStreamMeta; -// SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId); -// if (pTask == NULL) { -// tqError("vgId:%d failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId, -// pReq->taskId); -// -// // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active -// return TSDB_CODE_SUCCESS; -// } -// -// tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr); -// streamTaskPause(pTask); -// -// SStreamTask* pHistoryTask = NULL; -// if (pTask->historyTaskId.taskId != 0) { -// pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId); -// if (pHistoryTask == NULL) { -// tqError("vgId:%d failed to acquire fill-history task:0x%x, it may have been dropped already. Pause success", -// pMeta->vgId, pTask->historyTaskId.taskId); -// -// streamMetaReleaseTask(pMeta, pTask); -// -// // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active -// return TSDB_CODE_SUCCESS; -// } -// -// tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr); -// streamTaskPause(pHistoryTask); -// } -// -// streamMetaReleaseTask(pMeta, pTask); -// if (pHistoryTask != NULL) { -// streamMetaReleaseTask(pMeta, pHistoryTask); -// } -// +int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { + int32_t vgId = TD_VID(pTq->pVnode); + SStreamMeta* pMeta = pTq->pStreamMeta; + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + int32_t code = 0; + + SStreamTaskUpdateMsg req = {0}; + + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)msg, len); + if (tDecodeStreamTaskUpdateMsg(&decoder, &req) < 0) { + code = TSDB_CODE_MSG_DECODE_ERROR; + tDecoderClear(&decoder); + tqError("vgId:%d failed to decode task update msg, code:%s", vgId, tstrerror(code)); + return code; + } + tDecoderClear(&decoder); + + SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.taskId); + if (pTask == NULL) { + tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId, + req.taskId); + // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active + return TSDB_CODE_SUCCESS; + } + + tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr); +// streamTaskUpdateEpInfo(pTask); + + SStreamTask* pHistoryTask = NULL; + if (pTask->historyTaskId.taskId != 0) { + pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId); + if (pHistoryTask == NULL) { + tqError( + "vgId:%d failed to acquire fill-history task:0x%x when handling task update, it may have been dropped " + "already", + pMeta->vgId, pTask->historyTaskId.taskId); + + streamMetaReleaseTask(pMeta, pTask); + + // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active + return TSDB_CODE_SUCCESS; + } + + tqDebug("s-task:%s fill-history task handle task update along with related stream task", pHistoryTask->id.idStr); +// streamTaskUpdateEpInfo(pHistoryTask); + } + + streamMetaReleaseTask(pMeta, pTask); + if (pHistoryTask != NULL) { + streamMetaReleaseTask(pMeta, pHistoryTask); + } + return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index d8a38cebd8..7b210c59eb 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -665,6 +665,8 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) return tqProcessTaskScanHistoryFinishRsp(pVnode->pTq, pMsg); case TDMT_VND_STREAM_CHECK_POINT_SOURCE: return tqProcessStreamCheckPointSourceReq(pVnode->pTq, pMsg); + case TDMT_VND_STREAM_TASK_UPDATE: + return tqProcessTaskUpdateReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_CHECKPOINT_READY: return tqProcessStreamTaskCheckpointReadyMsg(pVnode->pTq, pMsg); default: diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 51ad795a2b..99ef88d2e1 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -959,16 +959,20 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) { return 0; } -int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskUpdateInfo* pMsg) { +int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskUpdateMsg* pMsg) { if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pMsg->streamId) < 0) return -1; + if (tEncodeI32(pEncoder, pMsg->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pMsg->nodeId) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pMsg->epset) < 0) return -1; tEndEncode(pEncoder); return pEncoder->pos; } -int32_t tDecodeTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskUpdateInfo* pMsg) { +int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskUpdateMsg* pMsg) { if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &pMsg->streamId) < 0) return -1; + if (tDecodeI32(pDecoder, &pMsg->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pMsg->nodeId) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pMsg->epset) < 0) return -1; tEndDecode(pDecoder);