From f65651f6ef990e256b819796e640b3ca54337ffd Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 15 Nov 2024 00:19:35 +0800 Subject: [PATCH] fix(stream): update the msg encoder. --- include/libs/stream/streamMsg.h | 21 ++- include/libs/stream/tstream.h | 5 + source/common/CMakeLists.txt | 7 + source/common/src/{ => msg}/tmsg.c | 0 source/common/test/CMakeLists.txt | 2 +- source/dnode/mnode/impl/src/mndStreamHb.c | 35 ++++- .../dnode/mnode/impl/src/mndStreamTransAct.c | 2 +- source/dnode/vnode/src/tq/tq.c | 42 ++++-- source/dnode/vnode/src/tqCommon/tqCommon.c | 110 ++++++++++----- source/libs/stream/src/streamCheckpoint.c | 104 ++++++++++----- source/libs/stream/src/streamMsg.c | 126 +++++++++++++++++- source/libs/stream/src/streamSched.c | 36 +++-- 12 files changed, 392 insertions(+), 98 deletions(-) rename source/common/src/{ => msg}/tmsg.c (100%) diff --git a/include/libs/stream/streamMsg.h b/include/libs/stream/streamMsg.h index 0ceaa93a72..19b033a02e 100644 --- a/include/libs/stream/streamMsg.h +++ b/include/libs/stream/streamMsg.h @@ -170,8 +170,8 @@ typedef struct SStreamHbMsg { SArray* pUpdateNodes; // SArray, needs update the epsets in stream tasks for those nodes. } SStreamHbMsg; -int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp); -int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pRsp); +int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq); +int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq); void tCleanupStreamHbMsg(SStreamHbMsg* pMsg); typedef struct { @@ -179,6 +179,9 @@ typedef struct { int32_t msgId; } SMStreamHbRspMsg; +int32_t tEncodeStreamHbRsp(SEncoder* pEncoder, const SMStreamHbRspMsg* pRsp); +int32_t tDecodeStreamHbRsp(SDecoder* pDecoder, SMStreamHbRspMsg* pRsp); + typedef struct SRetrieveChkptTriggerReq { SMsgHead head; int64_t streamId; @@ -189,6 +192,9 @@ typedef struct SRetrieveChkptTriggerReq { int64_t downstreamTaskId; } SRetrieveChkptTriggerReq; +int32_t tEncodeRetrieveChkptTriggerReq(SEncoder* pEncoder, const SRetrieveChkptTriggerReq* pReq); +int32_t tDecodeRetrieveChkptTriggerReq(SDecoder* pDecoder, SRetrieveChkptTriggerReq* pReq); + typedef struct SCheckpointTriggerRsp { int64_t streamId; int64_t checkpointId; @@ -198,6 +204,9 @@ typedef struct SCheckpointTriggerRsp { int32_t rspCode; } SCheckpointTriggerRsp; +int32_t tEncodeCheckpointTriggerRsp(SEncoder* pEncoder, const SCheckpointTriggerRsp* pRsp); +int32_t tDecodeCheckpointTriggerRsp(SDecoder* pDecoder, SCheckpointTriggerRsp* pRsp); + typedef struct SCheckpointReport { int64_t streamId; int32_t taskId; @@ -222,7 +231,7 @@ typedef struct SRestoreCheckpointInfo { int32_t nodeId; } SRestoreCheckpointInfo; -int32_t tEncodeRestoreCheckpointInfo (SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq); +int32_t tEncodeRestoreCheckpointInfo(SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq); int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq); typedef struct { @@ -232,10 +241,8 @@ typedef struct { int32_t reqType; } SStreamTaskRunReq; -typedef struct SCheckpointConsensusEntry { - SRestoreCheckpointInfo req; - int64_t ts; -} SCheckpointConsensusEntry; +int32_t tEncodeStreamTaskRunReq(SEncoder* pEncoder, const SStreamTaskRunReq* pReq); +int32_t tDecodeStreamTaskRunReq(SDecoder* pDecoder, SStreamTaskRunReq* pReq); #ifdef __cplusplus } diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 7571ee22bd..05b3e21eba 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -643,6 +643,11 @@ typedef struct SCheckpointConsensusInfo { int64_t streamId; } SCheckpointConsensusInfo; +typedef struct SCheckpointConsensusEntry { + SRestoreCheckpointInfo req; + int64_t ts; +} SCheckpointConsensusEntry; + void streamSetupScheduleTrigger(SStreamTask* pTask); // dispatch related diff --git a/source/common/CMakeLists.txt b/source/common/CMakeLists.txt index f10eb6a611..712121ca72 100644 --- a/source/common/CMakeLists.txt +++ b/source/common/CMakeLists.txt @@ -1,4 +1,11 @@ aux_source_directory(src COMMON_SRC) +aux_source_directory("src/msg/" MSG_SRC_FILES) + +list( + APPEND + COMMON_SRC + ${MSG_SRC_FILES} +) if(TD_ENTERPRISE) LIST(APPEND COMMON_SRC ${TD_ENTERPRISE_DIR}/src/plugins/common/src/tglobal.c) diff --git a/source/common/src/tmsg.c b/source/common/src/msg/tmsg.c similarity index 100% rename from source/common/src/tmsg.c rename to source/common/src/msg/tmsg.c diff --git a/source/common/test/CMakeLists.txt b/source/common/test/CMakeLists.txt index 2fe3ef652d..bb12612273 100644 --- a/source/common/test/CMakeLists.txt +++ b/source/common/test/CMakeLists.txt @@ -46,7 +46,7 @@ if (${TD_LINUX}) target_sources(tmsgTest PRIVATE "tmsgTest.cpp" - "../src/tmsg.c" + "../src/msg/tmsg.c" ) target_include_directories(tmsgTest PUBLIC "${TD_SOURCE_DIR}/include/common/") target_link_libraries(tmsgTest PUBLIC os util gtest gtest_main) diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 46445af856..4b3db28aa1 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -553,12 +553,37 @@ void cleanupAfterProcessHbMsg(SStreamHbMsg *pReq, SArray *pFailedChkptList, SArr } void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, int32_t vgId, int32_t msgId) { - SRpcMsg rsp = {.code = code, .info = *pRpcInfo, .contLen = sizeof(SMStreamHbRspMsg)}; - rsp.pCont = rpcMallocCont(rsp.contLen); + int32_t ret = 0; + int32_t tlen = 0; + void *buf = NULL; - SMStreamHbRspMsg *pMsg = rsp.pCont; - pMsg->head.vgId = htonl(vgId); - pMsg->msgId = msgId; + const SMStreamHbRspMsg msg = {.msgId = msgId}; + + tEncodeSize(tEncodeStreamHbRsp, &msg, tlen, ret); + if (ret < 0) { + mError("encode stream hb msg rsp failed, code:%s", tstrerror(code)); + } + + buf = rpcMallocCont(tlen + sizeof(SMsgHead)); + if (buf == NULL) { + mError("encode stream hb msg rsp failed, code:%s", tstrerror(terrno)); + return; + } + + ((SMStreamHbRspMsg*)buf)->head.vgId = htonl(vgId); + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + + SEncoder encoder; + tEncoderInit(&encoder, abuf, tlen); + if ((code = tEncodeStreamHbRsp(&encoder, &msg)) < 0) { + rpcFreeCont(buf); + tEncoderClear(&encoder); + mError("encode stream hb msg rsp failed, code:%s", tstrerror(code)); + return; + } + tEncoderClear(&encoder); + + SRpcMsg rsp = {.code = code, .info = *pRpcInfo, .contLen = tlen + sizeof(SMsgHead), .pCont = buf}; tmsgSendRsp(&rsp); pRpcInfo->handle = NULL; // disable auto rsp diff --git a/source/dnode/mnode/impl/src/mndStreamTransAct.c b/source/dnode/mnode/impl/src/mndStreamTransAct.c index 7ee60c6f14..5ccb626609 100644 --- a/source/dnode/mnode/impl/src/mndStreamTransAct.c +++ b/source/dnode/mnode/impl/src/mndStreamTransAct.c @@ -607,7 +607,7 @@ int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStreamTask* p tEncoderInit(&encoder, abuf, tlen); code = tEncodeRestoreCheckpointInfo(&encoder, &req); tEncoderClear(&encoder); - if (code == -1) { + if (code < 0) { taosMemoryFree(pBuf); return code; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 6195899566..a234777441 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1009,21 +1009,34 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { - SStreamTaskRunReq* pReq = pMsg->pCont; + int32_t code = 0; + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + SDecoder decoder; + + SStreamTaskRunReq req = {0}; + tDecoderInit(&decoder, (uint8_t*)msg, len); + if ((code = tDecodeStreamTaskRunReq(&decoder, &req)) < 0) { + tqError("vgId:%d failed to decode task run req, code:%s", pTq->pStreamMeta->vgId, tstrerror(code)); + tDecoderClear(&decoder); + return TSDB_CODE_SUCCESS; + } + + tDecoderClear(&decoder); // extracted submit data from wal files for all tasks - if (pReq->reqType == STREAM_EXEC_T_EXTRACT_WAL_DATA) { + if (req.reqType == STREAM_EXEC_T_EXTRACT_WAL_DATA) { return tqScanWal(pTq); } - int32_t code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode)); + code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode)); if (code) { tqError("vgId:%d failed to create task run req, code:%s", TD_VID(pTq->pVnode), tstrerror(code)); return code; } // let's continue scan data in the wal files - if (pReq->reqType >= 0 || pReq->reqType == STREAM_EXEC_T_RESUME_TASK) { + if (req.reqType >= 0 || req.reqType == STREAM_EXEC_T_RESUME_TASK) { code = tqScanWalAsync(pTq, false); // it's ok to failed if (code) { tqError("vgId:%d failed to start scan wal file, code:%s", pTq->pStreamMeta->vgId, tstrerror(code)); @@ -1297,7 +1310,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) { int32_t vgId = TD_VID(pTq->pVnode); - SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*)pMsg->pCont; + SStreamCheckpointReadyMsg* pReq = (SStreamCheckpointReadyMsg*)pMsg->pCont; if (!vnodeIsRoleLeader(pTq->pVnode)) { tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from 0x%x", vgId, (int32_t)pReq->downstreamTaskId); @@ -1318,10 +1331,23 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskRetrieveTriggerReq(STQ* pTq, SRpcMsg* pMsg) { int32_t vgId = TD_VID(pTq->pVnode); - SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*)pMsg->pCont; if (!vnodeIsRoleLeader(pTq->pVnode)) { - tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from 0x%x", vgId, - (int32_t)pReq->downstreamTaskId); + SRetrieveChkptTriggerReq req = {0}; + + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + SDecoder decoder = {0}; + + tDecoderInit(&decoder, (uint8_t*)msg, len); + if (tDecodeRetrieveChkptTriggerReq(&decoder, &req) < 0) { + tDecoderClear(&decoder); + tqError("vgId:%d invalid retrieve checkpoint-trigger req received", vgId); + return TSDB_CODE_INVALID_MSG; + } + tDecoderClear(&decoder); + + tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from s-task:0x%" PRId64, vgId, + req.downstreamTaskId); return TSDB_CODE_STREAM_NOT_LEADER; } diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 7b34bb83b6..e5078d2950 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -828,14 +828,25 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { } int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) { - SStreamTaskRunReq* pReq = pMsg->pCont; + int32_t code = 0; + int32_t vgId = pMeta->vgId; + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + SDecoder decoder; - int32_t type = pReq->reqType; - int32_t vgId = pMeta->vgId; - int32_t code = 0; + SStreamTaskRunReq req = {0}; + tDecoderInit(&decoder, (uint8_t*)msg, len); + if ((code = tDecodeStreamTaskRunReq(&decoder, &req)) < 0) { + tqError("vgId:%d failed to decode task run req, code:%s", pMeta->vgId, tstrerror(code)); + tDecoderClear(&decoder); + return TSDB_CODE_SUCCESS; + } + tDecoderClear(&decoder); + + int32_t type = req.reqType; if (type == STREAM_EXEC_T_START_ONE_TASK) { - code = streamMetaStartOneTask(pMeta, pReq->streamId, pReq->taskId); + code = streamMetaStartOneTask(pMeta, req.streamId, req.taskId); return 0; } else if (type == STREAM_EXEC_T_START_ALL_TASKS) { code = streamMetaStartAllTasks(pMeta); @@ -847,11 +858,11 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead code = streamMetaStopAllTasks(pMeta); return 0; } else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) { - code = streamMetaAddFailedTask(pMeta, pReq->streamId, pReq->taskId); + code = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId); return code; } else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while SStreamTask* pTask = NULL; - code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask); + code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask); if (pTask != NULL && (code == 0)) { char* pStatus = NULL; @@ -873,7 +884,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead } SStreamTask* pTask = NULL; - code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask); + code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask); if ((pTask != NULL) && (code == 0)) { // even in halt status, the data in inputQ must be processed char* p = NULL; if (streamTaskReadyToRun(pTask, &p)) { @@ -890,7 +901,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead return 0; } else { // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec. // todo add one function to handle this - tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, pReq->taskId); + tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, req.taskId); return code; } } @@ -976,25 +987,36 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) { } int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { - SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*)pMsg->pCont; + SRetrieveChkptTriggerReq req = {0}; + SStreamTask* pTask = NULL; + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + SDecoder decoder = {0}; - SStreamTask* pTask = NULL; - int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->upstreamTaskId, &pTask); + tDecoderInit(&decoder, (uint8_t*)msg, len); + if (tDecodeRetrieveChkptTriggerReq(&decoder, &req) < 0) { + tDecoderClear(&decoder); + tqError("vgId:%d invalid retrieve checkpoint-trigger req received", pMeta->vgId); + return TSDB_CODE_INVALID_MSG; + } + tDecoderClear(&decoder); + + int32_t code = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId, &pTask); if (pTask == NULL || (code != 0)) { tqError("vgId:%d process retrieve checkpoint trigger, checkpointId:%" PRId64 " from s-task:0x%x, failed to acquire task:0x%x, it may have been dropped already", - pMeta->vgId, pReq->checkpointId, (int32_t)pReq->downstreamTaskId, pReq->upstreamTaskId); + pMeta->vgId, req.checkpointId, (int32_t)req.downstreamTaskId, req.upstreamTaskId); return TSDB_CODE_STREAM_TASK_NOT_EXIST; } tqDebug("s-task:0x%x recv retrieve checkpoint-trigger msg from downstream s-task:0x%x, checkpointId:%" PRId64, - pReq->upstreamTaskId, (int32_t)pReq->downstreamTaskId, pReq->checkpointId); + req.upstreamTaskId, (int32_t)req.downstreamTaskId, req.checkpointId); if (pTask->status.downstreamReady != 1) { tqError("s-task:%s not ready for checkpoint-trigger retrieve from 0x%x, since downstream not ready", - pTask->id.idStr, (int32_t)pReq->downstreamTaskId); + pTask->id.idStr, (int32_t)req.downstreamTaskId); - code = streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info, + code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info, TSDB_CODE_STREAM_TASK_IVLD_STATUS); streamMetaReleaseTask(pMeta, pTask); return code; @@ -1006,19 +1028,19 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) int64_t checkpointId = 0; streamTaskGetActiveCheckpointInfo(pTask, &transId, &checkpointId); - if (checkpointId != pReq->checkpointId) { + if (checkpointId != req.checkpointId) { tqError("s-task:%s invalid checkpoint-trigger retrieve msg from 0x%" PRIx64 ", current checkpointId:%" PRId64 " req:%" PRId64, - pTask->id.idStr, pReq->downstreamTaskId, checkpointId, pReq->checkpointId); + pTask->id.idStr, req.downstreamTaskId, checkpointId, req.checkpointId); streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_INVALID_MSG; } - if (streamTaskAlreadySendTrigger(pTask, pReq->downstreamNodeId)) { + if (streamTaskAlreadySendTrigger(pTask, req.downstreamNodeId)) { // re-send the lost checkpoint-trigger msg to downstream task tqDebug("s-task:%s re-send checkpoint-trigger to:0x%x, checkpointId:%" PRId64 ", transId:%d", pTask->id.idStr, - (int32_t)pReq->downstreamTaskId, checkpointId, transId); - code = streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info, + (int32_t)req.downstreamTaskId, checkpointId, transId); + code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info, TSDB_CODE_SUCCESS); } else { // not send checkpoint-trigger yet, wait int32_t recv = 0, total = 0; @@ -1032,7 +1054,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) "sending checkpoint-source/trigger", pTask->id.idStr, recv, total); } - code = streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info, + code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info, TSDB_CODE_ACTION_IN_PROGRESS); } } else { // upstream not recv the checkpoint-source/trigger till now @@ -1044,7 +1066,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) "s-task:%s not recv checkpoint-source from mnode or checkpoint-trigger from upstream yet, wait for all " "upstream sending checkpoint-source/trigger", pTask->id.idStr); - code = streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info, + code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info, TSDB_CODE_ACTION_IN_PROGRESS); } @@ -1053,23 +1075,34 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) } int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { - SCheckpointTriggerRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + SCheckpointTriggerRsp rsp = {0}; + SStreamTask* pTask = NULL; + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + SDecoder decoder = {0}; - SStreamTask* pTask = NULL; - int32_t code = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->taskId, &pTask); + tDecoderInit(&decoder, (uint8_t*)msg, len); + if (tDecodeCheckpointTriggerRsp(&decoder, &rsp) < 0) { + tDecoderClear(&decoder); + tqError("vgId:%d invalid retrieve checkpoint-trigger rsp received", pMeta->vgId); + return TSDB_CODE_INVALID_MSG; + } + tDecoderClear(&decoder); + + int32_t code = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.taskId, &pTask); if (pTask == NULL || (code != 0)) { tqError( "vgId:%d process retrieve checkpoint-trigger, failed to acquire task:0x%x, it may have been dropped already", - pMeta->vgId, pRsp->taskId); + pMeta->vgId, rsp.taskId); return code; } tqDebug( "s-task:%s recv re-send checkpoint-trigger msg from through retrieve/rsp channel, upstream:0x%x, " "checkpointId:%" PRId64 ", transId:%d", - pTask->id.idStr, pRsp->upstreamTaskId, pRsp->checkpointId, pRsp->transId); + pTask->id.idStr, rsp.upstreamTaskId, rsp.checkpointId, rsp.transId); - code = streamTaskProcessCheckpointTriggerRsp(pTask, pRsp); + code = streamTaskProcessCheckpointTriggerRsp(pTask, &rsp); streamMetaReleaseTask(pMeta, pTask); return code; } @@ -1199,7 +1232,22 @@ int32_t doProcessDummyRspMsg(SStreamMeta* UNUSED_PARAM(pMeta), SRpcMsg* pMsg) { } int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { - return streamProcessHeartbeatRsp(pMeta, pMsg->pCont); + SMStreamHbRspMsg rsp; + int32_t len = 0; + int32_t code = 0; + SDecoder decoder; + + tDecoderInit(&decoder, (uint8_t*)pMsg->pCont, len); + code = tDecodeStreamHbRsp(&decoder, &rsp); + if (code < 0) { + terrno = TSDB_CODE_INVALID_MSG; + tDecoderClear(&decoder); + tqError("vgId:%d failed to parse hb rsp msg, code:%s", pMeta->vgId, tstrerror(terrno)); + return terrno; + } + + tDecoderClear(&decoder); + return streamProcessHeartbeatRsp(pMeta, &rsp); } int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); } @@ -1233,7 +1281,7 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { SRestoreCheckpointInfo req = {0}; tDecoderInit(&decoder, (uint8_t*)msg, len); - if (tDecodeRestoreCheckpointInfo(&decoder, &req) < 0) { + if ((code = tDecodeRestoreCheckpointInfo(&decoder, &req)) < 0) { tqError("vgId:%d failed to decode set consensus checkpointId req, code:%s", vgId, tstrerror(code)); tDecoderClear(&decoder); return TSDB_CODE_SUCCESS; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index a8a934da98..b7bef6d2e6 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -161,33 +161,52 @@ int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTri int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId, SRpcHandleInfo* pRpcInfo, int32_t code) { - int32_t size = sizeof(SMsgHead) + sizeof(SCheckpointTriggerRsp); - void* pBuf = rpcMallocCont(size); - if (pBuf == NULL) { + int32_t ret = 0; + int32_t tlen = 0; + void* buf = NULL; + SEncoder encoder; + + SCheckpointTriggerRsp req = {.streamId = pTask->id.streamId, + .upstreamTaskId = pTask->id.taskId, + .taskId = dstTaskId, + .rspCode = code}; + + if (code == TSDB_CODE_SUCCESS) { + req.checkpointId = pTask->chkInfo.pActiveInfo->activeId; + req.transId = pTask->chkInfo.pActiveInfo->transId; + } else { + req.checkpointId = -1; + req.transId = -1; + } + + tEncodeSize(tEncodeCheckpointTriggerRsp, &req, tlen, ret); + if (ret < 0) { + stError("s-task:%s encode checkpoint-trigger rsp msg failed, code:%s", pTask->id.idStr, tstrerror(code)); + return ret; + } + + buf = rpcMallocCont(tlen + sizeof(SMsgHead)); + if (buf == NULL) { + stError("s-task:%s malloc chkpt-trigger rsp failed for task:0x%x, since out of memory", pTask->id.idStr, dstTaskId); return terrno; } - SCheckpointTriggerRsp* pRsp = POINTER_SHIFT(pBuf, sizeof(SMsgHead)); + ((SMsgHead*)buf)->vgId = htonl(downstreamNodeId); + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - ((SMsgHead*)pBuf)->vgId = htonl(downstreamNodeId); - - pRsp->streamId = pTask->id.streamId; - pRsp->upstreamTaskId = pTask->id.taskId; - pRsp->taskId = dstTaskId; - pRsp->rspCode = code; - - if (code == TSDB_CODE_SUCCESS) { - pRsp->checkpointId = pTask->chkInfo.pActiveInfo->activeId; - pRsp->transId = pTask->chkInfo.pActiveInfo->transId; - } else { - pRsp->checkpointId = -1; - pRsp->transId = -1; + tEncoderInit(&encoder, abuf, tlen); + if ((ret = tEncodeCheckpointTriggerRsp(&encoder, &req)) < 0) { + rpcFreeCont(buf); + tEncoderClear(&encoder); + stError("encode checkpoint-trigger rsp failed, code:%s", tstrerror(code)); + return ret; } + tEncoderClear(&encoder); - SRpcMsg rspMsg = {.code = 0, .pCont = pBuf, .contLen = size, .info = *pRpcInfo}; + SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = tlen + sizeof(SMsgHead), .info = *pRpcInfo}; tmsgSendRsp(&rspMsg); - return 0; + return ret; } int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) { @@ -1115,23 +1134,46 @@ int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) { return TSDB_CODE_INVALID_PARA; } - SRetrieveChkptTriggerReq* pReq = rpcMallocCont(sizeof(SRetrieveChkptTriggerReq)); - if (pReq == NULL) { - code = terrno; + int32_t ret = 0; + int32_t tlen = 0; + void* buf = NULL; + SRpcMsg rpcMsg = {0}; + SEncoder encoder; + + SRetrieveChkptTriggerReq req = + { + .streamId = pTask->id.streamId, + .downstreamTaskId = pTask->id.taskId, + .downstreamNodeId = vgId, + .upstreamTaskId = pUpstreamTask->taskId, + .upstreamNodeId = pUpstreamTask->nodeId, + .checkpointId = checkpointId, + }; + + tEncodeSize(tEncodeRetrieveChkptTriggerReq, &req, tlen, ret); + if (ret < 0) { + stError("encode stream hb msg rsp failed, code:%s", tstrerror(code)); + } + + buf = rpcMallocCont(tlen + sizeof(SMsgHead)); + if (buf == NULL) { stError("vgId:%d failed to create msg to retrieve trigger msg for task:%s exec, code:out of memory", vgId, pId); continue; } - pReq->head.vgId = htonl(pUpstreamTask->nodeId); - pReq->streamId = pTask->id.streamId; - pReq->downstreamTaskId = pTask->id.taskId; - pReq->downstreamNodeId = vgId; - pReq->upstreamTaskId = pUpstreamTask->taskId; - pReq->upstreamNodeId = pUpstreamTask->nodeId; - pReq->checkpointId = checkpointId; + ((SRetrieveChkptTriggerReq*)buf)->head.vgId = htonl(vgId); + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - SRpcMsg rpcMsg = {0}; - initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE_TRIGGER, pReq, sizeof(SRetrieveChkptTriggerReq)); + tEncoderInit(&encoder, abuf, tlen); + if ((code = tEncodeRetrieveChkptTriggerReq(&encoder, &req)) < 0) { + rpcFreeCont(buf); + tEncoderClear(&encoder); + stError("encode retrieve checkpoint-trigger req failed, code:%s", tstrerror(code)); + continue; + } + tEncoderClear(&encoder); + + initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE_TRIGGER, buf, tlen + sizeof(SMsgHead)); code = tmsgSendReq(&pUpstreamTask->epSet, &rpcMsg); if (code == TSDB_CODE_SUCCESS) { diff --git a/source/libs/stream/src/streamMsg.c b/source/libs/stream/src/streamMsg.c index 193daa0cc4..8e823b7738 100644 --- a/source/libs/stream/src/streamMsg.c +++ b/source/libs/stream/src/streamMsg.c @@ -605,6 +605,98 @@ void tCleanupStreamHbMsg(SStreamHbMsg* pMsg) { pMsg->numOfTasks = -1; } +int32_t tEncodeStreamHbRsp(SEncoder* pEncoder, const SMStreamHbRspMsg* pRsp) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartEncode(pEncoder)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->msgId)); + tEndEncode(pEncoder); + +_exit: + return code; +} + +int32_t tDecodeStreamHbRsp(SDecoder* pDecoder, SMStreamHbRspMsg* pRsp) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartDecode(pDecoder)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->msgId)); + tEndDecode(pDecoder); + +_exit: + return code; +} + +int32_t tEncodeRetrieveChkptTriggerReq(SEncoder* pEncoder, const SRetrieveChkptTriggerReq* pReq) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartEncode(pEncoder)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamNodeId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->downstreamTaskId)); + tEndEncode(pEncoder); + +_exit: + return code; +} + +int32_t tDecodeRetrieveChkptTriggerReq(SDecoder* pDecoder, SRetrieveChkptTriggerReq* pReq) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartDecode(pDecoder)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamNodeId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamTaskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->downstreamNodeId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->downstreamTaskId)); + tEndDecode(pDecoder); + +_exit: + return code; +} + +int32_t tEncodeCheckpointTriggerRsp(SEncoder* pEncoder, const SCheckpointTriggerRsp* pRsp) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartEncode(pEncoder)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->streamId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->checkpointId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->upstreamTaskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->taskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->transId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->rspCode)); + tEndEncode(pEncoder); + +_exit: + return code; +} + +int32_t tDecodeCheckpointTriggerRsp(SDecoder* pDecoder, SCheckpointTriggerRsp* pRsp) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartDecode(pDecoder)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->checkpointId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamTaskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->taskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->transId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->rspCode)); + tEndDecode(pDecoder); + +_exit: + return code; +} + int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { int32_t code = 0; int32_t lino; @@ -830,11 +922,7 @@ int32_t tEncodeRestoreCheckpointInfo(SEncoder* pEncoder, const SRestoreCheckpoin tEndEncode(pEncoder); _exit: - if (code) { - return code; - } else { - return pEncoder->pos; - } + return code; } int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq) { @@ -853,3 +941,31 @@ int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* _exit: return code; } + +int32_t tEncodeStreamTaskRunReq (SEncoder* pEncoder, const SStreamTaskRunReq* pReq) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartEncode(pEncoder)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->reqType)); + tEndEncode(pEncoder); + +_exit: + return code; +} + +int32_t tDecodeStreamTaskRunReq(SDecoder* pDecoder, SStreamTaskRunReq* pReq) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartDecode(pDecoder)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->reqType)); + tEndDecode(pDecoder); + +_exit: + return code; +} \ No newline at end of file diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index 8c79abfd02..2d314839e6 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -83,13 +83,36 @@ int32_t streamTrySchedExec(SStreamTask* pTask) { } int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType) { - SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); - if (pRunReq == NULL) { + int32_t code = 0; + int32_t tlen = 0; + + SStreamTaskRunReq req = {.streamId = streamId, .taskId = taskId, .reqType = execType}; + + tEncodeSize(tEncodeStreamTaskRunReq, &req, tlen, code); + if (code < 0) { + stError("s-task:0x%" PRIx64 " vgId:%d encode stream task run req failed, code:%s", streamId, vgId, tstrerror(code)); + return code; + } + + void* buf = rpcMallocCont(tlen + sizeof(SMsgHead)); + if (buf == NULL) { stError("vgId:%d failed to create msg to start stream task:0x%x exec, type:%d, code:%s", vgId, taskId, execType, tstrerror(terrno)); return terrno; } + ((SMsgHead*)buf)->vgId = vgId; + + SEncoder encoder; + tEncoderInit(&encoder, buf, tlen); + if ((code = tEncodeStreamTaskRunReq(&encoder, &req)) < 0) { + rpcFreeCont(buf); + tEncoderClear(&encoder); + stError("s-task:%s vgId:%d encode stream task checkpoint-report msg failed, code:%s", taskId, vgId, tstrerror(code)); + return code; + } + tEncoderClear(&encoder); + if (streamId != 0) { stDebug("vgId:%d create msg to for task:0x%x, exec type:%d, %s", vgId, taskId, execType, streamTaskGetExecType(execType)); @@ -97,13 +120,8 @@ int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int3 stDebug("vgId:%d create msg to exec, type:%d, %s", vgId, execType, streamTaskGetExecType(execType)); } - pRunReq->head.vgId = vgId; - pRunReq->streamId = streamId; - pRunReq->taskId = taskId; - pRunReq->reqType = execType; - - SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; - int32_t code = tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg); + SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = buf, .contLen = tlen + sizeof(SMsgHead)}; + code = tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg); if (code) { stError("vgId:%d failed to put msg into stream queue, code:%s, %x", vgId, tstrerror(code), taskId); }