From b2b5a14d9d069038a2b156aab6ea41bb13ff8377 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 1 Nov 2024 18:58:22 +0800 Subject: [PATCH 01/14] fix(stream): reset task add the failed chkptId info. --- include/common/tmsg.h | 9 ++++++- include/libs/stream/tstream.h | 1 + source/dnode/mnode/impl/inc/mndStream.h | 5 ++-- source/dnode/mnode/impl/src/mndStreamHb.c | 21 ++++++++------- .../dnode/mnode/impl/src/mndStreamTransAct.c | 7 ++--- source/dnode/vnode/src/tqCommon/tqCommon.c | 12 +++------ source/libs/stream/inc/streamInt.h | 1 - source/libs/stream/src/streamCheckpoint.c | 26 ++++++++++++------- source/libs/stream/src/streamTask.c | 4 +-- 9 files changed, 51 insertions(+), 35 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index bdf333b635..3a64913a13 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3797,7 +3797,14 @@ typedef struct { SMsgHead head; int64_t streamId; int32_t taskId; -} SVPauseStreamTaskReq, SVResetStreamTaskReq; +} SVPauseStreamTaskReq; + +typedef struct { + SMsgHead head; + int64_t streamId; + int32_t taskId; + int64_t chkptId; +} SVResetStreamTaskReq; typedef struct { char name[TSDB_STREAM_FNAME_LEN]; diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 2cf791c8da..7571ee22bd 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -718,6 +718,7 @@ int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask); void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId); int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId, SRpcHandleInfo* pInfo, int32_t code); +void streamTaskSetFailedCheckpointId(SStreamTask* pTask, int64_t failedId); int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue); int32_t streamQueueGetNumOfUnAccessedItems(const SStreamQueue* pQueue); diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index c9155f536c..b519d8509a 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -56,6 +56,7 @@ typedef struct SStreamTransMgmt { typedef struct SStreamTaskResetMsg { int64_t streamId; int32_t transId; + int64_t checkpointId; } SStreamTaskResetMsg; typedef struct SChkptReportInfo { @@ -142,9 +143,9 @@ int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pSt int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *pList); -int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); +int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream, int64_t chkptId); int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); -int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream); +int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream, int64_t chkptId); int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStreamTask* pTask, int64_t checkpointId, int64_t ts); int32_t mndStreamSetRestartAction(SMnode* pMnode, STrans *pTrans, SStreamObj* pStream); int32_t mndStreamSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t checkpointId, diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 941956ae2b..03b418f13f 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -24,7 +24,7 @@ typedef struct SFailedCheckpointInfo { static int32_t mndStreamSendUpdateChkptInfoMsg(SMnode *pMnode); static int32_t mndSendDropOrphanTasksMsg(SMnode *pMnode, SArray *pList); -static int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId); +static int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId, int64_t checkpointId); static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage); static void addIntoFailedChkptList(SArray *pList, const SFailedCheckpointInfo *pInfo); static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList); @@ -68,7 +68,7 @@ void addIntoFailedChkptList(SArray *pList, const SFailedCheckpointInfo *pInfo) { } } -int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { +int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream, int64_t chkptId) { STrans *pTrans = NULL; int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_RESET_NAME, " reset from failed checkpoint", &pTrans); @@ -84,7 +84,7 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { return code; } - code = mndStreamSetResetTaskAction(pMnode, pTrans, pStream); + code = mndStreamSetResetTaskAction(pMnode, pTrans, pStream, chkptId); if (code) { sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); @@ -115,7 +115,7 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { return code; } -int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId) { +int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId, int64_t checkpointId) { int32_t size = sizeof(SStreamTaskResetMsg); int32_t num = taosArrayGetSize(execInfo.pKilledChkptTrans); @@ -135,8 +135,9 @@ int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t taosArrayRemove(execInfo.pKilledChkptTrans, 0); // remove this first, append new reset trans in the tail } - SStreamTaskResetMsg p = {.streamId = streamId, .transId = transId}; + SStreamTaskResetMsg p = {.streamId = streamId, .transId = transId, .checkpointId = checkpointId}; + // let's remember that this trans had been killed already void *px = taosArrayPush(execInfo.pKilledChkptTrans, &p); if (px == NULL) { mError("failed to push reset-msg trans:%d into the killed chkpt trans list, size:%d", transId, num - 1); @@ -150,6 +151,7 @@ int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t pReq->streamId = streamId; pReq->transId = transId; + pReq->checkpointId = checkpointId; SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_TASK_RESET, .pCont = pReq, .contLen = size}; int32_t code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); @@ -234,7 +236,7 @@ int32_t mndProcessResetStatusReq(SRpcMsg *pReq) { } else { mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, transId:%d, create reset trans", pStream->name, pStream->uid, pMsg->transId); - code = mndCreateStreamResetStatusTrans(pMnode, pStream); + code = mndCreateStreamResetStatusTrans(pMnode, pStream, pMsg->checkpointId); } } @@ -495,10 +497,11 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { continue; } - mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status", - pInfo->checkpointId, pInfo->transId); + mInfo("stream:0x%" PRIx64 "checkpointId:%" PRId64 + " transId:%d failed issue task-reset trans to reset all tasks status", + pInfo->streamUid, pInfo->checkpointId, pInfo->transId); - code = mndSendResetFromCheckpointMsg(pMnode, pInfo->streamUid, pInfo->transId); + code = mndSendResetFromCheckpointMsg(pMnode, pInfo->streamUid, pInfo->transId, pInfo->checkpointId); if (code) { mError("failed to create reset task trans, code:%s", tstrerror(code)); } diff --git a/source/dnode/mnode/impl/src/mndStreamTransAct.c b/source/dnode/mnode/impl/src/mndStreamTransAct.c index 139ea4f147..7ee60c6f14 100644 --- a/source/dnode/mnode/impl/src/mndStreamTransAct.c +++ b/source/dnode/mnode/impl/src/mndStreamTransAct.c @@ -295,7 +295,7 @@ static int32_t doSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamTas return code; } -static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) { +static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t chkptId) { SVResetStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResetStreamTaskReq)); if (pReq == NULL) { mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVResetStreamTaskReq), @@ -306,6 +306,7 @@ static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa pReq->head.vgId = htonl(pTask->info.nodeId); pReq->taskId = pTask->id.taskId; pReq->streamId = pTask->id.streamId; + pReq->chkptId = chkptId; SEpSet epset = {0}; bool hasEpset = false; @@ -544,7 +545,7 @@ int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray* p return 0; } -int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { +int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream, int64_t chkptId) { SStreamTaskIter *pIter = NULL; taosWLockLatch(&pStream->lock); @@ -564,7 +565,7 @@ int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj * return code; } - code = doSetResetAction(pMnode, pTrans, pTask); + code = doSetResetAction(pMnode, pTrans, pTask, chkptId); if (code != TSDB_CODE_SUCCESS) { destroyStreamTaskIter(pIter); taosWUnLockLatch(&pStream->lock); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 326e8d4ada..7b34bb83b6 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -939,7 +939,7 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) { } int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) { - SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)pMsg; + SVResetStreamTaskReq* pReq = (SVResetStreamTaskReq*)pMsg; SStreamTask* pTask = NULL; int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask); @@ -954,17 +954,13 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) { streamMutexLock(&pTask->lock); streamTaskClearCheckInfo(pTask, true); + streamTaskSetFailedCheckpointId(pTask, pReq->chkptId); + // clear flag set during do checkpoint, and open inputQ for all upstream tasks SStreamTaskState pState = streamTaskGetStatus(pTask); if (pState.state == TASK_STATUS__CK) { - int32_t tranId = 0; - int64_t activeChkId = 0; - streamTaskGetActiveCheckpointInfo(pTask, &tranId, &activeChkId); - - tqDebug("s-task:%s reset task status from checkpoint, current checkpointingId:%" PRId64 ", transId:%d", - pTask->id.idStr, activeChkId, tranId); - streamTaskSetStatusReady(pTask); + tqDebug("s-task:%s reset checkpoint status to ready", pTask->id.idStr); } else if (pState.state == TASK_STATUS__UNINIT) { // tqDebug("s-task:%s start task by checking downstream tasks", pTask->id.idStr); // tqStreamTaskRestoreCheckpoint(pMeta, pTask->id.streamId, pTask->id.taskId); diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 863bc76c79..8f741835e6 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -192,7 +192,6 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask); int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask); int32_t streamTaskSendCheckpointReq(SStreamTask* pTask); -void streamTaskSetFailedCheckpointId(SStreamTask* pTask); int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask); int32_t streamTaskGetNumOfUpstream(const SStreamTask* pTask); int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 7724d1c5ff..999c855f49 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -562,7 +562,7 @@ void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) { } streamMutexUnlock(&pInfo->lock); - stDebug("s-task:%s clear active checkpointInfo, failed checkpointId:%" PRId64 ", current checkpointId:%" PRId64, + stDebug("s-task:%s clear active checkpointInfo, failed checkpointId:%" PRId64 ", latest checkpointId:%" PRId64, pTask->id.idStr, pInfo->failedId, pTask->chkInfo.checkpointId); } @@ -682,15 +682,22 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV return TSDB_CODE_SUCCESS; } -void streamTaskSetFailedCheckpointId(SStreamTask* pTask) { +void streamTaskSetFailedCheckpointId(SStreamTask* pTask, int64_t failedId) { struct SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; - if (pInfo->activeId <= 0) { - stWarn("s-task:%s checkpoint-info is cleared now, not set the failed checkpoint info", pTask->id.idStr); + if (failedId <= 0) { + stWarn("s-task:%s failedId is 0, not update the failed checkpoint info, current failedId:%" PRId64 + " activeId:%" PRId64, + pTask->id.idStr, pInfo->failedId, pInfo->activeId); } else { - pInfo->failedId = pInfo->activeId; - stDebug("s-task:%s mark and set the failed checkpointId:%" PRId64 " (transId:%d)", pTask->id.idStr, pInfo->activeId, - pInfo->transId); + if (failedId <= pInfo->failedId) { + stDebug("s-task:%s failedId:%" PRId64 " not update to:%" PRId64, pTask->id.idStr, pInfo->failedId, failedId); + } else { + stDebug("s-task:%s mark and set the failed checkpointId:%" PRId64 " (transId:%d) activeId:%" PRId64 + " prev failedId:%" PRId64, + pTask->id.idStr, failedId, pInfo->transId, pInfo->activeId, pInfo->failedId); + pInfo->failedId = failedId; + } } } @@ -698,7 +705,7 @@ void streamTaskSetCheckpointFailed(SStreamTask* pTask) { streamMutexLock(&pTask->lock); ETaskStatus status = streamTaskGetStatus(pTask).state; if (status == TASK_STATUS__CK) { - streamTaskSetFailedCheckpointId(pTask); + streamTaskSetFailedCheckpointId(pTask, pTask->chkInfo.pActiveInfo->activeId); } streamMutexUnlock(&pTask->lock); } @@ -876,8 +883,9 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { code = streamSendChkptReportMsg(pTask, &pTask->chkInfo, dropRelHTask); } } else { // clear the checkpoint info if failed + // set failed checkpoint id before clear the checkpoint info streamMutexLock(&pTask->lock); - streamTaskSetFailedCheckpointId(pTask); // set failed checkpoint id before clear the checkpoint info + streamTaskSetFailedCheckpointId(pTask, ckId); streamMutexUnlock(&pTask->lock); code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index a044859b80..a8019937ff 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -1246,13 +1246,13 @@ void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) { taosMemoryFree(pInfo); } -//NOTE: clear the checkpoint id, and keep the failed id +// NOTE: clear the checkpoint id, and keep the failed id +// failedId for a task will increase as the checkpoint I.D. increases. void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) { pInfo->activeId = 0; pInfo->transId = 0; pInfo->allUpstreamTriggerRecv = 0; pInfo->dispatchTrigger = false; -// pInfo->failedId = 0; taosArrayClear(pInfo->pDispatchTriggerList); taosArrayClear(pInfo->pCheckpointReadyRecvList); From 187997d5f0d9981ca131fe49e9cefa138949f299 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 4 Nov 2024 10:11:22 +0800 Subject: [PATCH 02/14] fix(stream): record the failed checkpointId, after receive reset task info. --- include/util/taoserror.h | 1 + .../mnode/impl/src/mndStreamErrorInjection.c | 72 +++++++++++++++++++ source/dnode/mnode/impl/src/mndStreamHb.c | 5 +- source/dnode/mnode/impl/src/mndStreamUtil.c | 70 ------------------ source/libs/stream/inc/streamInt.h | 3 + source/libs/stream/src/streamCheckpoint.c | 8 ++- source/libs/stream/src/streamErrorInjection.c | 17 +++++ source/util/src/terror.c | 3 +- 8 files changed, 104 insertions(+), 75 deletions(-) create mode 100644 source/dnode/mnode/impl/src/mndStreamErrorInjection.c create mode 100644 source/libs/stream/src/streamErrorInjection.c diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 2c811495fd..e33af33d0e 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -1011,6 +1011,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_STREAM_CONFLICT_EVENT TAOS_DEF_ERROR_CODE(0, 0x4106) #define TSDB_CODE_STREAM_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x4107) #define TSDB_CODE_STREAM_INPUTQ_FULL TAOS_DEF_ERROR_CODE(0, 0x4108) +#define TSDB_CODE_STREAM_INVLD_CHKPT TAOS_DEF_ERROR_CODE(0, 0x4109) // TDLite #define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100) diff --git a/source/dnode/mnode/impl/src/mndStreamErrorInjection.c b/source/dnode/mnode/impl/src/mndStreamErrorInjection.c new file mode 100644 index 0000000000..c68416369d --- /dev/null +++ b/source/dnode/mnode/impl/src/mndStreamErrorInjection.c @@ -0,0 +1,72 @@ +#include "mndTrans.h" + +uint32_t seed = 0; + +static SRpcMsg createRpcMsg(STransAction* pAction, int64_t traceId, int64_t signature) { + SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen, .info.ahandle = (void *)signature}; + rpcMsg.pCont = rpcMallocCont(pAction->contLen); + if (rpcMsg.pCont == NULL) { + return rpcMsg; + } + + rpcMsg.info.traceId.rootId = traceId; + rpcMsg.info.notFreeAhandle = 1; + + memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen); + return rpcMsg; +} + +void streamTransRandomErrorGen(STransAction *pAction, STrans *pTrans, int64_t signature) { + if ((pAction->msgType == TDMT_STREAM_TASK_UPDATE_CHKPT && pAction->id > 2) || + (pAction->msgType == TDMT_STREAM_CONSEN_CHKPT) || + (pAction->msgType == TDMT_VND_STREAM_CHECK_POINT_SOURCE && pAction->id > 2)) { + if (seed == 0) { + seed = taosGetTimestampSec(); + } + + uint32_t v = taosRandR(&seed); + int32_t choseItem = v % 5; + + if (choseItem == 0) { + // 1. one of update-checkpoint not send, restart and send it again + taosMsleep(5000); + if (pAction->msgType == TDMT_STREAM_TASK_UPDATE_CHKPT) { + mError( + "***sleep 5s and core dump, following tasks will not recv update-checkpoint info, so the checkpoint will " + "rollback***"); + exit(-1); + } else if (pAction->msgType == TDMT_STREAM_CONSEN_CHKPT) { // pAction->msgType == TDMT_STREAM_CONSEN_CHKPT + mError( + "***sleep 5s and core dump, following tasks will not recv consen-checkpoint info, so the tasks will " + "not started***"); + } else { // pAction->msgType == TDMT_VND_STREAM_CHECK_POINT_SOURCE + mError( + "***sleep 5s and core dump, following tasks will not recv checkpoint-source info, so the tasks will " + "started after restart***"); + exit(-1); + } + } else if (choseItem == 1) { + // 2. repeat send update chkpt msg + mError("***repeat send update-checkpoint/consensus/checkpoint trans msg 3times to vnode***"); + + mError("***repeat 1***"); + SRpcMsg rpcMsg1 = createRpcMsg(pAction, pTrans->mTraceId, signature); + int32_t code = tmsgSendReq(&pAction->epSet, &rpcMsg1); + + mError("***repeat 2***"); + SRpcMsg rpcMsg2 = createRpcMsg(pAction, pTrans->mTraceId, signature); + code = tmsgSendReq(&pAction->epSet, &rpcMsg2); + + mError("***repeat 3***"); + SRpcMsg rpcMsg3 = createRpcMsg(pAction, pTrans->mTraceId, signature); + code = tmsgSendReq(&pAction->epSet, &rpcMsg3); + } else if (choseItem == 2) { + // 3. sleep 40s and then send msg + mError("***idle for 30s, and then send msg***"); + taosMsleep(30000); + } else { + // do nothing + // mInfo("no error triggered"); + } + } +} diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 03b418f13f..0f903632ed 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -381,9 +381,10 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } if ((pEntry->lastHbMsgId == req.msgId) && (pEntry->lastHbMsgTs == req.ts)) { - mError("vgId:%d HbMsgId:%d already handled, bh msg discard", pEntry->nodeId, req.msgId); + mError("vgId:%d HbMsgId:%d already handled, bh msg discard, and send HbRsp", pEntry->nodeId, req.msgId); - terrno = TSDB_CODE_INVALID_MSG; + // return directly and after the vnode to continue to send the next HbMsg. + terrno = TSDB_CODE_SUCCESS; doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId); streamMutexUnlock(&execInfo.lock); diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index f9b7644af4..615c383f07 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -1521,74 +1521,4 @@ int32_t mndCheckForSnode(SMnode *pMnode, SDbObj *pSrcDb) { mError("snode not existed when trying to create stream in db with multiple replica"); return TSDB_CODE_SNODE_NOT_DEPLOYED; } -} - -uint32_t seed = 0; -static SRpcMsg createRpcMsg(STransAction* pAction, int64_t traceId, int64_t signature) { - SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen, .info.ahandle = (void *)signature}; - rpcMsg.pCont = rpcMallocCont(pAction->contLen); - if (rpcMsg.pCont == NULL) { - return rpcMsg; - } - - rpcMsg.info.traceId.rootId = traceId; - rpcMsg.info.notFreeAhandle = 1; - - memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen); - return rpcMsg; -} - -void streamTransRandomErrorGen(STransAction *pAction, STrans *pTrans, int64_t signature) { - if ((pAction->msgType == TDMT_STREAM_TASK_UPDATE_CHKPT && pAction->id > 2) || - (pAction->msgType == TDMT_STREAM_CONSEN_CHKPT) || - (pAction->msgType == TDMT_VND_STREAM_CHECK_POINT_SOURCE && pAction->id > 2)) { - if (seed == 0) { - seed = taosGetTimestampSec(); - } - - uint32_t v = taosRandR(&seed); - int32_t choseItem = v % 5; - - if (choseItem == 0) { - // 1. one of update-checkpoint not send, restart and send it again - taosMsleep(5000); - if (pAction->msgType == TDMT_STREAM_TASK_UPDATE_CHKPT) { - mError( - "***sleep 5s and core dump, following tasks will not recv update-checkpoint info, so the checkpoint will " - "rollback***"); - exit(-1); - } else if (pAction->msgType == TDMT_STREAM_CONSEN_CHKPT) { // pAction->msgType == TDMT_STREAM_CONSEN_CHKPT - mError( - "***sleep 5s and core dump, following tasks will not recv consen-checkpoint info, so the tasks will " - "not started***"); - } else { // pAction->msgType == TDMT_VND_STREAM_CHECK_POINT_SOURCE - mError( - "***sleep 5s and core dump, following tasks will not recv checkpoint-source info, so the tasks will " - "started after restart***"); - exit(-1); - } - } else if (choseItem == 1) { - // 2. repeat send update chkpt msg - mError("***repeat send update-checkpoint/consensus/checkpoint trans msg 3times to vnode***"); - - mError("***repeat 1***"); - SRpcMsg rpcMsg1 = createRpcMsg(pAction, pTrans->mTraceId, signature); - int32_t code = tmsgSendReq(&pAction->epSet, &rpcMsg1); - - mError("***repeat 2***"); - SRpcMsg rpcMsg2 = createRpcMsg(pAction, pTrans->mTraceId, signature); - code = tmsgSendReq(&pAction->epSet, &rpcMsg2); - - mError("***repeat 3***"); - SRpcMsg rpcMsg3 = createRpcMsg(pAction, pTrans->mTraceId, signature); - code = tmsgSendReq(&pAction->epSet, &rpcMsg3); - } else if (choseItem == 2) { - // 3. sleep 40s and then send msg - mError("***idle for 30s, and then send msg***"); - taosMsleep(30000); - } else { - // do nothing - // mInfo("no error triggered"); - } - } } \ No newline at end of file diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 8f741835e6..427733e9ec 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -244,6 +244,9 @@ int32_t streamCreateSinkResTrigger(SStreamTrigger** pTrigger); int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t trigger, SInterval* pInterval, STimeWindow* pLatestWindow, const char* id); +// inject stream errors +void chkptFailedByRetrieveReqToSource(SStreamTask* pTask, int64_t checkpointId); + #ifdef __cplusplus } #endif diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 999c855f49..cc92df368c 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -222,14 +222,14 @@ static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t check stError("s-task:%s vgId:%d current checkpointId:%" PRId64 " recv expired checkpoint-trigger block, checkpointId:%" PRId64 " transId:%d, discard", id, vgId, pTask->chkInfo.checkpointId, checkpointId, transId); - return code; + return TSDB_CODE_STREAM_INVLD_CHKPT; } if (pActiveInfo->failedId >= checkpointId) { stError("s-task:%s vgId:%d checkpointId:%" PRId64 " transId:%d, has been marked failed, failedId:%" PRId64 " discard the checkpoint-trigger block", id, vgId, checkpointId, transId, pActiveInfo->failedId); - return code; + return TSDB_CODE_STREAM_INVLD_CHKPT; } if (pTask->chkInfo.checkpointId == checkpointId) { @@ -373,6 +373,10 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock return code; } +#if 0 + chkptFailedByRetrieveReqToSource(pTask, checkpointId); +#endif + if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) { stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId); code = continueDispatchCheckpointTriggerBlock(pBlock, pTask); // todo handle this failure diff --git a/source/libs/stream/src/streamErrorInjection.c b/source/libs/stream/src/streamErrorInjection.c new file mode 100644 index 0000000000..515845ba2b --- /dev/null +++ b/source/libs/stream/src/streamErrorInjection.c @@ -0,0 +1,17 @@ +#include "streamInt.h" + +/** + * pre-request: checkpoint interval should be 60s + * @param pTask + * @param checkpointId + */ +void chkptFailedByRetrieveReqToSource(SStreamTask* pTask, int64_t checkpointId) { + streamMutexLock(&pTask->lock); + + // set current checkpoint failed immediately, set failed checkpoint id before clear the checkpoint info + streamTaskSetFailedCheckpointId(pTask, checkpointId); + streamMutexUnlock(&pTask->lock); + + // the checkpoint interval should be 60s, and the next checkpoint req should be issued by mnode + taosMsleep(65*1000); +} \ No newline at end of file diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 0d8a85155a..00f72123dc 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -853,7 +853,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_IVLD_STATUS, "Invalid task status TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_CONFLICT_EVENT, "Stream conflict event") TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INTERNAL_ERROR, "Stream internal error") TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_NOT_LEADER, "Stream task not on leader vnode") -TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INPUTQ_FULL, "Task input queue is full") +TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INPUTQ_FULL, "Task input queue is full") +TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INVLD_CHKPT, "Invalid checkpoint trigger msg") // TDLite TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS, "Invalid TDLite open flags") From fba44da577b3f65c5b452727f74f2974a901998d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 4 Nov 2024 14:12:05 +0800 Subject: [PATCH 03/14] fix(test): fix error in unit test. --- source/dnode/mnode/impl/test/stream/stream.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/test/stream/stream.cpp b/source/dnode/mnode/impl/test/stream/stream.cpp index d508cf7390..45bc4c2ce2 100644 --- a/source/dnode/mnode/impl/test/stream/stream.cpp +++ b/source/dnode/mnode/impl/test/stream/stream.cpp @@ -246,7 +246,7 @@ TEST_F(StreamTest, kill_checkpoint_trans) { px = taosArrayPush(pStream->tasks, &pLevel); ASSERT(px != NULL); - code = mndCreateStreamResetStatusTrans(pMnode, pStream); + code = mndCreateStreamResetStatusTrans(pMnode, pStream, 1); ASSERT(code != 0); tFreeStreamObj(pStream); From b88a786fdfac5f4aa448b90e09e4025116d3ff6e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 13 Nov 2024 16:41:15 +0800 Subject: [PATCH 04/14] fix(stream): set correct error code and open inputq for upstream . --- source/dnode/mnode/impl/src/mndStreamHb.c | 2 +- source/libs/stream/src/streamCheckpoint.c | 16 +++++++++------- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 0f903632ed..46445af856 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -498,7 +498,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { continue; } - mInfo("stream:0x%" PRIx64 "checkpointId:%" PRId64 + mInfo("stream:0x%" PRIx64 " checkpointId:%" PRId64 " transId:%d failed issue task-reset trans to reset all tasks status", pInfo->streamUid, pInfo->checkpointId, pInfo->transId); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index cc92df368c..a8a934da98 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -255,8 +255,7 @@ static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t check "the interrupted checkpoint", id, vgId, pBlock->srcTaskId); - streamTaskOpenUpstreamInput(pTask, pBlock->srcTaskId); - return code; + return TSDB_CODE_STREAM_INVLD_CHKPT; } if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) { @@ -264,14 +263,14 @@ static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t check stError("s-task:%s vgId:%d active checkpointId:%" PRId64 ", recv invalid checkpoint-trigger checkpointId:%" PRId64 " discard", id, vgId, pActiveInfo->activeId, checkpointId); - return code; + return TSDB_CODE_STREAM_INVLD_CHKPT; } else { // checkpointId == pActiveInfo->activeId if (pActiveInfo->allUpstreamTriggerRecv == 1) { stDebug( "s-task:%s vgId:%d all upstream checkpoint-trigger recv, discard this checkpoint-trigger, " "checkpointId:%" PRId64 " transId:%d", id, vgId, checkpointId, transId); - return code; + return TSDB_CODE_STREAM_INVLD_CHKPT; } if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) { @@ -283,17 +282,17 @@ static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t check } if (p->upstreamTaskId == pBlock->srcTaskId) { - stWarn("s-task:%s repeatly recv checkpoint-source msg from task:0x%x vgId:%d, checkpointId:%" PRId64 + stWarn("s-task:%s repeatly recv checkpoint-trigger msg from task:0x%x vgId:%d, checkpointId:%" PRId64 ", prev recvTs:%" PRId64 " discard", pTask->id.idStr, p->upstreamTaskId, p->upstreamNodeId, p->checkpointId, p->recvTs); - return code; + return TSDB_CODE_STREAM_INVLD_CHKPT; } } } } } - return 0; + return TSDB_CODE_SUCCESS; } int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) { @@ -317,6 +316,9 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock code = doCheckBeforeHandleChkptTrigger(pTask, checkpointId, pBlock, transId); streamMutexUnlock(&pTask->lock); if (code) { + if (taskLevel != TASK_LEVEL__SOURCE) { // the checkpoint-trigger is discard, open the inputQ for upstream tasks + streamTaskOpenUpstreamInput(pTask, pBlock->srcTaskId); + } streamFreeQitem((SStreamQueueItem*)pBlock); return code; } From 969db55e9f54764276de235a82e16ced7663aedf Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 13 Nov 2024 16:47:05 +0800 Subject: [PATCH 05/14] fix(stream): update the checkpoint report info in SCheckpointReport --- source/dnode/mnode/impl/src/mndStream.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 81db427afd..6336cd6e49 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2434,7 +2434,12 @@ static void doAddReportStreamTask(SArray *pList, int64_t reportChkptId, const SC mDebug("s-task:0x%x expired checkpoint-report msg in checkpoint-report list update from %" PRId64 "->%" PRId64, pReport->taskId, p->checkpointId, pReport->checkpointId); - memcpy(p, pReport, sizeof(STaskChkptInfo)); + // update the checkpoint report info + p->checkpointId = pReport->checkpointId; + p->ts = pReport->checkpointTs; + p->version = pReport->checkpointVer; + p->transId = pReport->transId; + p->dropHTask = pReport->dropHTask; } else { mWarn("taskId:0x%x already in checkpoint-report list", pReport->taskId); } From 4dd4f217c1ba8dcc673a63e98f6452dc3d4796d6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 13 Nov 2024 19:11:18 +0800 Subject: [PATCH 06/14] fix(test): update test cases. --- tests/script/tsim/db/basic1.sim | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/script/tsim/db/basic1.sim b/tests/script/tsim/db/basic1.sim index 8eb6dce759..f3239957d3 100644 --- a/tests/script/tsim/db/basic1.sim +++ b/tests/script/tsim/db/basic1.sim @@ -53,6 +53,8 @@ if $rows != 5 then return -1 endi +sleep 500 + print =============== show vgroups2 sql show d2.vgroups if $rows != 2 then @@ -126,13 +128,14 @@ if $data12 != d2 then endi if $data13 != leader then + print expect leader , actual $13 return -1 endi -print $data14 -print $data15 +print $data14 , $data15 if $data16 != 1 then + print expect 1, acutal $data16 return -1 endi From f65651f6ef990e256b819796e640b3ca54337ffd Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 15 Nov 2024 00:19:35 +0800 Subject: [PATCH 07/14] fix(stream): update the msg encoder. --- include/libs/stream/streamMsg.h | 21 ++- include/libs/stream/tstream.h | 5 + source/common/CMakeLists.txt | 7 + source/common/src/{ => msg}/tmsg.c | 0 source/common/test/CMakeLists.txt | 2 +- source/dnode/mnode/impl/src/mndStreamHb.c | 35 ++++- .../dnode/mnode/impl/src/mndStreamTransAct.c | 2 +- source/dnode/vnode/src/tq/tq.c | 42 ++++-- source/dnode/vnode/src/tqCommon/tqCommon.c | 110 ++++++++++----- source/libs/stream/src/streamCheckpoint.c | 104 ++++++++++----- source/libs/stream/src/streamMsg.c | 126 +++++++++++++++++- source/libs/stream/src/streamSched.c | 36 +++-- 12 files changed, 392 insertions(+), 98 deletions(-) rename source/common/src/{ => msg}/tmsg.c (100%) diff --git a/include/libs/stream/streamMsg.h b/include/libs/stream/streamMsg.h index 0ceaa93a72..19b033a02e 100644 --- a/include/libs/stream/streamMsg.h +++ b/include/libs/stream/streamMsg.h @@ -170,8 +170,8 @@ typedef struct SStreamHbMsg { SArray* pUpdateNodes; // SArray, needs update the epsets in stream tasks for those nodes. } SStreamHbMsg; -int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp); -int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pRsp); +int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq); +int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq); void tCleanupStreamHbMsg(SStreamHbMsg* pMsg); typedef struct { @@ -179,6 +179,9 @@ typedef struct { int32_t msgId; } SMStreamHbRspMsg; +int32_t tEncodeStreamHbRsp(SEncoder* pEncoder, const SMStreamHbRspMsg* pRsp); +int32_t tDecodeStreamHbRsp(SDecoder* pDecoder, SMStreamHbRspMsg* pRsp); + typedef struct SRetrieveChkptTriggerReq { SMsgHead head; int64_t streamId; @@ -189,6 +192,9 @@ typedef struct SRetrieveChkptTriggerReq { int64_t downstreamTaskId; } SRetrieveChkptTriggerReq; +int32_t tEncodeRetrieveChkptTriggerReq(SEncoder* pEncoder, const SRetrieveChkptTriggerReq* pReq); +int32_t tDecodeRetrieveChkptTriggerReq(SDecoder* pDecoder, SRetrieveChkptTriggerReq* pReq); + typedef struct SCheckpointTriggerRsp { int64_t streamId; int64_t checkpointId; @@ -198,6 +204,9 @@ typedef struct SCheckpointTriggerRsp { int32_t rspCode; } SCheckpointTriggerRsp; +int32_t tEncodeCheckpointTriggerRsp(SEncoder* pEncoder, const SCheckpointTriggerRsp* pRsp); +int32_t tDecodeCheckpointTriggerRsp(SDecoder* pDecoder, SCheckpointTriggerRsp* pRsp); + typedef struct SCheckpointReport { int64_t streamId; int32_t taskId; @@ -222,7 +231,7 @@ typedef struct SRestoreCheckpointInfo { int32_t nodeId; } SRestoreCheckpointInfo; -int32_t tEncodeRestoreCheckpointInfo (SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq); +int32_t tEncodeRestoreCheckpointInfo(SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq); int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq); typedef struct { @@ -232,10 +241,8 @@ typedef struct { int32_t reqType; } SStreamTaskRunReq; -typedef struct SCheckpointConsensusEntry { - SRestoreCheckpointInfo req; - int64_t ts; -} SCheckpointConsensusEntry; +int32_t tEncodeStreamTaskRunReq(SEncoder* pEncoder, const SStreamTaskRunReq* pReq); +int32_t tDecodeStreamTaskRunReq(SDecoder* pDecoder, SStreamTaskRunReq* pReq); #ifdef __cplusplus } diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 7571ee22bd..05b3e21eba 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -643,6 +643,11 @@ typedef struct SCheckpointConsensusInfo { int64_t streamId; } SCheckpointConsensusInfo; +typedef struct SCheckpointConsensusEntry { + SRestoreCheckpointInfo req; + int64_t ts; +} SCheckpointConsensusEntry; + void streamSetupScheduleTrigger(SStreamTask* pTask); // dispatch related diff --git a/source/common/CMakeLists.txt b/source/common/CMakeLists.txt index f10eb6a611..712121ca72 100644 --- a/source/common/CMakeLists.txt +++ b/source/common/CMakeLists.txt @@ -1,4 +1,11 @@ aux_source_directory(src COMMON_SRC) +aux_source_directory("src/msg/" MSG_SRC_FILES) + +list( + APPEND + COMMON_SRC + ${MSG_SRC_FILES} +) if(TD_ENTERPRISE) LIST(APPEND COMMON_SRC ${TD_ENTERPRISE_DIR}/src/plugins/common/src/tglobal.c) diff --git a/source/common/src/tmsg.c b/source/common/src/msg/tmsg.c similarity index 100% rename from source/common/src/tmsg.c rename to source/common/src/msg/tmsg.c diff --git a/source/common/test/CMakeLists.txt b/source/common/test/CMakeLists.txt index 2fe3ef652d..bb12612273 100644 --- a/source/common/test/CMakeLists.txt +++ b/source/common/test/CMakeLists.txt @@ -46,7 +46,7 @@ if (${TD_LINUX}) target_sources(tmsgTest PRIVATE "tmsgTest.cpp" - "../src/tmsg.c" + "../src/msg/tmsg.c" ) target_include_directories(tmsgTest PUBLIC "${TD_SOURCE_DIR}/include/common/") target_link_libraries(tmsgTest PUBLIC os util gtest gtest_main) diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 46445af856..4b3db28aa1 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -553,12 +553,37 @@ void cleanupAfterProcessHbMsg(SStreamHbMsg *pReq, SArray *pFailedChkptList, SArr } void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, int32_t vgId, int32_t msgId) { - SRpcMsg rsp = {.code = code, .info = *pRpcInfo, .contLen = sizeof(SMStreamHbRspMsg)}; - rsp.pCont = rpcMallocCont(rsp.contLen); + int32_t ret = 0; + int32_t tlen = 0; + void *buf = NULL; - SMStreamHbRspMsg *pMsg = rsp.pCont; - pMsg->head.vgId = htonl(vgId); - pMsg->msgId = msgId; + const SMStreamHbRspMsg msg = {.msgId = msgId}; + + tEncodeSize(tEncodeStreamHbRsp, &msg, tlen, ret); + if (ret < 0) { + mError("encode stream hb msg rsp failed, code:%s", tstrerror(code)); + } + + buf = rpcMallocCont(tlen + sizeof(SMsgHead)); + if (buf == NULL) { + mError("encode stream hb msg rsp failed, code:%s", tstrerror(terrno)); + return; + } + + ((SMStreamHbRspMsg*)buf)->head.vgId = htonl(vgId); + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + + SEncoder encoder; + tEncoderInit(&encoder, abuf, tlen); + if ((code = tEncodeStreamHbRsp(&encoder, &msg)) < 0) { + rpcFreeCont(buf); + tEncoderClear(&encoder); + mError("encode stream hb msg rsp failed, code:%s", tstrerror(code)); + return; + } + tEncoderClear(&encoder); + + SRpcMsg rsp = {.code = code, .info = *pRpcInfo, .contLen = tlen + sizeof(SMsgHead), .pCont = buf}; tmsgSendRsp(&rsp); pRpcInfo->handle = NULL; // disable auto rsp diff --git a/source/dnode/mnode/impl/src/mndStreamTransAct.c b/source/dnode/mnode/impl/src/mndStreamTransAct.c index 7ee60c6f14..5ccb626609 100644 --- a/source/dnode/mnode/impl/src/mndStreamTransAct.c +++ b/source/dnode/mnode/impl/src/mndStreamTransAct.c @@ -607,7 +607,7 @@ int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStreamTask* p tEncoderInit(&encoder, abuf, tlen); code = tEncodeRestoreCheckpointInfo(&encoder, &req); tEncoderClear(&encoder); - if (code == -1) { + if (code < 0) { taosMemoryFree(pBuf); return code; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 6195899566..a234777441 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1009,21 +1009,34 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { - SStreamTaskRunReq* pReq = pMsg->pCont; + int32_t code = 0; + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + SDecoder decoder; + + SStreamTaskRunReq req = {0}; + tDecoderInit(&decoder, (uint8_t*)msg, len); + if ((code = tDecodeStreamTaskRunReq(&decoder, &req)) < 0) { + tqError("vgId:%d failed to decode task run req, code:%s", pTq->pStreamMeta->vgId, tstrerror(code)); + tDecoderClear(&decoder); + return TSDB_CODE_SUCCESS; + } + + tDecoderClear(&decoder); // extracted submit data from wal files for all tasks - if (pReq->reqType == STREAM_EXEC_T_EXTRACT_WAL_DATA) { + if (req.reqType == STREAM_EXEC_T_EXTRACT_WAL_DATA) { return tqScanWal(pTq); } - int32_t code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode)); + code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode)); if (code) { tqError("vgId:%d failed to create task run req, code:%s", TD_VID(pTq->pVnode), tstrerror(code)); return code; } // let's continue scan data in the wal files - if (pReq->reqType >= 0 || pReq->reqType == STREAM_EXEC_T_RESUME_TASK) { + if (req.reqType >= 0 || req.reqType == STREAM_EXEC_T_RESUME_TASK) { code = tqScanWalAsync(pTq, false); // it's ok to failed if (code) { tqError("vgId:%d failed to start scan wal file, code:%s", pTq->pStreamMeta->vgId, tstrerror(code)); @@ -1297,7 +1310,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) { int32_t vgId = TD_VID(pTq->pVnode); - SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*)pMsg->pCont; + SStreamCheckpointReadyMsg* pReq = (SStreamCheckpointReadyMsg*)pMsg->pCont; if (!vnodeIsRoleLeader(pTq->pVnode)) { tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from 0x%x", vgId, (int32_t)pReq->downstreamTaskId); @@ -1318,10 +1331,23 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskRetrieveTriggerReq(STQ* pTq, SRpcMsg* pMsg) { int32_t vgId = TD_VID(pTq->pVnode); - SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*)pMsg->pCont; if (!vnodeIsRoleLeader(pTq->pVnode)) { - tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from 0x%x", vgId, - (int32_t)pReq->downstreamTaskId); + SRetrieveChkptTriggerReq req = {0}; + + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + SDecoder decoder = {0}; + + tDecoderInit(&decoder, (uint8_t*)msg, len); + if (tDecodeRetrieveChkptTriggerReq(&decoder, &req) < 0) { + tDecoderClear(&decoder); + tqError("vgId:%d invalid retrieve checkpoint-trigger req received", vgId); + return TSDB_CODE_INVALID_MSG; + } + tDecoderClear(&decoder); + + tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from s-task:0x%" PRId64, vgId, + req.downstreamTaskId); return TSDB_CODE_STREAM_NOT_LEADER; } diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 7b34bb83b6..e5078d2950 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -828,14 +828,25 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { } int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) { - SStreamTaskRunReq* pReq = pMsg->pCont; + int32_t code = 0; + int32_t vgId = pMeta->vgId; + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + SDecoder decoder; - int32_t type = pReq->reqType; - int32_t vgId = pMeta->vgId; - int32_t code = 0; + SStreamTaskRunReq req = {0}; + tDecoderInit(&decoder, (uint8_t*)msg, len); + if ((code = tDecodeStreamTaskRunReq(&decoder, &req)) < 0) { + tqError("vgId:%d failed to decode task run req, code:%s", pMeta->vgId, tstrerror(code)); + tDecoderClear(&decoder); + return TSDB_CODE_SUCCESS; + } + tDecoderClear(&decoder); + + int32_t type = req.reqType; if (type == STREAM_EXEC_T_START_ONE_TASK) { - code = streamMetaStartOneTask(pMeta, pReq->streamId, pReq->taskId); + code = streamMetaStartOneTask(pMeta, req.streamId, req.taskId); return 0; } else if (type == STREAM_EXEC_T_START_ALL_TASKS) { code = streamMetaStartAllTasks(pMeta); @@ -847,11 +858,11 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead code = streamMetaStopAllTasks(pMeta); return 0; } else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) { - code = streamMetaAddFailedTask(pMeta, pReq->streamId, pReq->taskId); + code = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId); return code; } else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while SStreamTask* pTask = NULL; - code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask); + code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask); if (pTask != NULL && (code == 0)) { char* pStatus = NULL; @@ -873,7 +884,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead } SStreamTask* pTask = NULL; - code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask); + code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask); if ((pTask != NULL) && (code == 0)) { // even in halt status, the data in inputQ must be processed char* p = NULL; if (streamTaskReadyToRun(pTask, &p)) { @@ -890,7 +901,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead return 0; } else { // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec. // todo add one function to handle this - tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, pReq->taskId); + tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, req.taskId); return code; } } @@ -976,25 +987,36 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) { } int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { - SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*)pMsg->pCont; + SRetrieveChkptTriggerReq req = {0}; + SStreamTask* pTask = NULL; + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + SDecoder decoder = {0}; - SStreamTask* pTask = NULL; - int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->upstreamTaskId, &pTask); + tDecoderInit(&decoder, (uint8_t*)msg, len); + if (tDecodeRetrieveChkptTriggerReq(&decoder, &req) < 0) { + tDecoderClear(&decoder); + tqError("vgId:%d invalid retrieve checkpoint-trigger req received", pMeta->vgId); + return TSDB_CODE_INVALID_MSG; + } + tDecoderClear(&decoder); + + int32_t code = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId, &pTask); if (pTask == NULL || (code != 0)) { tqError("vgId:%d process retrieve checkpoint trigger, checkpointId:%" PRId64 " from s-task:0x%x, failed to acquire task:0x%x, it may have been dropped already", - pMeta->vgId, pReq->checkpointId, (int32_t)pReq->downstreamTaskId, pReq->upstreamTaskId); + pMeta->vgId, req.checkpointId, (int32_t)req.downstreamTaskId, req.upstreamTaskId); return TSDB_CODE_STREAM_TASK_NOT_EXIST; } tqDebug("s-task:0x%x recv retrieve checkpoint-trigger msg from downstream s-task:0x%x, checkpointId:%" PRId64, - pReq->upstreamTaskId, (int32_t)pReq->downstreamTaskId, pReq->checkpointId); + req.upstreamTaskId, (int32_t)req.downstreamTaskId, req.checkpointId); if (pTask->status.downstreamReady != 1) { tqError("s-task:%s not ready for checkpoint-trigger retrieve from 0x%x, since downstream not ready", - pTask->id.idStr, (int32_t)pReq->downstreamTaskId); + pTask->id.idStr, (int32_t)req.downstreamTaskId); - code = streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info, + code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info, TSDB_CODE_STREAM_TASK_IVLD_STATUS); streamMetaReleaseTask(pMeta, pTask); return code; @@ -1006,19 +1028,19 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) int64_t checkpointId = 0; streamTaskGetActiveCheckpointInfo(pTask, &transId, &checkpointId); - if (checkpointId != pReq->checkpointId) { + if (checkpointId != req.checkpointId) { tqError("s-task:%s invalid checkpoint-trigger retrieve msg from 0x%" PRIx64 ", current checkpointId:%" PRId64 " req:%" PRId64, - pTask->id.idStr, pReq->downstreamTaskId, checkpointId, pReq->checkpointId); + pTask->id.idStr, req.downstreamTaskId, checkpointId, req.checkpointId); streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_INVALID_MSG; } - if (streamTaskAlreadySendTrigger(pTask, pReq->downstreamNodeId)) { + if (streamTaskAlreadySendTrigger(pTask, req.downstreamNodeId)) { // re-send the lost checkpoint-trigger msg to downstream task tqDebug("s-task:%s re-send checkpoint-trigger to:0x%x, checkpointId:%" PRId64 ", transId:%d", pTask->id.idStr, - (int32_t)pReq->downstreamTaskId, checkpointId, transId); - code = streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info, + (int32_t)req.downstreamTaskId, checkpointId, transId); + code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info, TSDB_CODE_SUCCESS); } else { // not send checkpoint-trigger yet, wait int32_t recv = 0, total = 0; @@ -1032,7 +1054,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) "sending checkpoint-source/trigger", pTask->id.idStr, recv, total); } - code = streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info, + code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info, TSDB_CODE_ACTION_IN_PROGRESS); } } else { // upstream not recv the checkpoint-source/trigger till now @@ -1044,7 +1066,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) "s-task:%s not recv checkpoint-source from mnode or checkpoint-trigger from upstream yet, wait for all " "upstream sending checkpoint-source/trigger", pTask->id.idStr); - code = streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info, + code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info, TSDB_CODE_ACTION_IN_PROGRESS); } @@ -1053,23 +1075,34 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) } int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { - SCheckpointTriggerRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + SCheckpointTriggerRsp rsp = {0}; + SStreamTask* pTask = NULL; + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + SDecoder decoder = {0}; - SStreamTask* pTask = NULL; - int32_t code = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->taskId, &pTask); + tDecoderInit(&decoder, (uint8_t*)msg, len); + if (tDecodeCheckpointTriggerRsp(&decoder, &rsp) < 0) { + tDecoderClear(&decoder); + tqError("vgId:%d invalid retrieve checkpoint-trigger rsp received", pMeta->vgId); + return TSDB_CODE_INVALID_MSG; + } + tDecoderClear(&decoder); + + int32_t code = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.taskId, &pTask); if (pTask == NULL || (code != 0)) { tqError( "vgId:%d process retrieve checkpoint-trigger, failed to acquire task:0x%x, it may have been dropped already", - pMeta->vgId, pRsp->taskId); + pMeta->vgId, rsp.taskId); return code; } tqDebug( "s-task:%s recv re-send checkpoint-trigger msg from through retrieve/rsp channel, upstream:0x%x, " "checkpointId:%" PRId64 ", transId:%d", - pTask->id.idStr, pRsp->upstreamTaskId, pRsp->checkpointId, pRsp->transId); + pTask->id.idStr, rsp.upstreamTaskId, rsp.checkpointId, rsp.transId); - code = streamTaskProcessCheckpointTriggerRsp(pTask, pRsp); + code = streamTaskProcessCheckpointTriggerRsp(pTask, &rsp); streamMetaReleaseTask(pMeta, pTask); return code; } @@ -1199,7 +1232,22 @@ int32_t doProcessDummyRspMsg(SStreamMeta* UNUSED_PARAM(pMeta), SRpcMsg* pMsg) { } int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { - return streamProcessHeartbeatRsp(pMeta, pMsg->pCont); + SMStreamHbRspMsg rsp; + int32_t len = 0; + int32_t code = 0; + SDecoder decoder; + + tDecoderInit(&decoder, (uint8_t*)pMsg->pCont, len); + code = tDecodeStreamHbRsp(&decoder, &rsp); + if (code < 0) { + terrno = TSDB_CODE_INVALID_MSG; + tDecoderClear(&decoder); + tqError("vgId:%d failed to parse hb rsp msg, code:%s", pMeta->vgId, tstrerror(terrno)); + return terrno; + } + + tDecoderClear(&decoder); + return streamProcessHeartbeatRsp(pMeta, &rsp); } int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); } @@ -1233,7 +1281,7 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { SRestoreCheckpointInfo req = {0}; tDecoderInit(&decoder, (uint8_t*)msg, len); - if (tDecodeRestoreCheckpointInfo(&decoder, &req) < 0) { + if ((code = tDecodeRestoreCheckpointInfo(&decoder, &req)) < 0) { tqError("vgId:%d failed to decode set consensus checkpointId req, code:%s", vgId, tstrerror(code)); tDecoderClear(&decoder); return TSDB_CODE_SUCCESS; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index a8a934da98..b7bef6d2e6 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -161,33 +161,52 @@ int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTri int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId, SRpcHandleInfo* pRpcInfo, int32_t code) { - int32_t size = sizeof(SMsgHead) + sizeof(SCheckpointTriggerRsp); - void* pBuf = rpcMallocCont(size); - if (pBuf == NULL) { + int32_t ret = 0; + int32_t tlen = 0; + void* buf = NULL; + SEncoder encoder; + + SCheckpointTriggerRsp req = {.streamId = pTask->id.streamId, + .upstreamTaskId = pTask->id.taskId, + .taskId = dstTaskId, + .rspCode = code}; + + if (code == TSDB_CODE_SUCCESS) { + req.checkpointId = pTask->chkInfo.pActiveInfo->activeId; + req.transId = pTask->chkInfo.pActiveInfo->transId; + } else { + req.checkpointId = -1; + req.transId = -1; + } + + tEncodeSize(tEncodeCheckpointTriggerRsp, &req, tlen, ret); + if (ret < 0) { + stError("s-task:%s encode checkpoint-trigger rsp msg failed, code:%s", pTask->id.idStr, tstrerror(code)); + return ret; + } + + buf = rpcMallocCont(tlen + sizeof(SMsgHead)); + if (buf == NULL) { + stError("s-task:%s malloc chkpt-trigger rsp failed for task:0x%x, since out of memory", pTask->id.idStr, dstTaskId); return terrno; } - SCheckpointTriggerRsp* pRsp = POINTER_SHIFT(pBuf, sizeof(SMsgHead)); + ((SMsgHead*)buf)->vgId = htonl(downstreamNodeId); + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - ((SMsgHead*)pBuf)->vgId = htonl(downstreamNodeId); - - pRsp->streamId = pTask->id.streamId; - pRsp->upstreamTaskId = pTask->id.taskId; - pRsp->taskId = dstTaskId; - pRsp->rspCode = code; - - if (code == TSDB_CODE_SUCCESS) { - pRsp->checkpointId = pTask->chkInfo.pActiveInfo->activeId; - pRsp->transId = pTask->chkInfo.pActiveInfo->transId; - } else { - pRsp->checkpointId = -1; - pRsp->transId = -1; + tEncoderInit(&encoder, abuf, tlen); + if ((ret = tEncodeCheckpointTriggerRsp(&encoder, &req)) < 0) { + rpcFreeCont(buf); + tEncoderClear(&encoder); + stError("encode checkpoint-trigger rsp failed, code:%s", tstrerror(code)); + return ret; } + tEncoderClear(&encoder); - SRpcMsg rspMsg = {.code = 0, .pCont = pBuf, .contLen = size, .info = *pRpcInfo}; + SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = tlen + sizeof(SMsgHead), .info = *pRpcInfo}; tmsgSendRsp(&rspMsg); - return 0; + return ret; } int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) { @@ -1115,23 +1134,46 @@ int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) { return TSDB_CODE_INVALID_PARA; } - SRetrieveChkptTriggerReq* pReq = rpcMallocCont(sizeof(SRetrieveChkptTriggerReq)); - if (pReq == NULL) { - code = terrno; + int32_t ret = 0; + int32_t tlen = 0; + void* buf = NULL; + SRpcMsg rpcMsg = {0}; + SEncoder encoder; + + SRetrieveChkptTriggerReq req = + { + .streamId = pTask->id.streamId, + .downstreamTaskId = pTask->id.taskId, + .downstreamNodeId = vgId, + .upstreamTaskId = pUpstreamTask->taskId, + .upstreamNodeId = pUpstreamTask->nodeId, + .checkpointId = checkpointId, + }; + + tEncodeSize(tEncodeRetrieveChkptTriggerReq, &req, tlen, ret); + if (ret < 0) { + stError("encode stream hb msg rsp failed, code:%s", tstrerror(code)); + } + + buf = rpcMallocCont(tlen + sizeof(SMsgHead)); + if (buf == NULL) { stError("vgId:%d failed to create msg to retrieve trigger msg for task:%s exec, code:out of memory", vgId, pId); continue; } - pReq->head.vgId = htonl(pUpstreamTask->nodeId); - pReq->streamId = pTask->id.streamId; - pReq->downstreamTaskId = pTask->id.taskId; - pReq->downstreamNodeId = vgId; - pReq->upstreamTaskId = pUpstreamTask->taskId; - pReq->upstreamNodeId = pUpstreamTask->nodeId; - pReq->checkpointId = checkpointId; + ((SRetrieveChkptTriggerReq*)buf)->head.vgId = htonl(vgId); + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - SRpcMsg rpcMsg = {0}; - initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE_TRIGGER, pReq, sizeof(SRetrieveChkptTriggerReq)); + tEncoderInit(&encoder, abuf, tlen); + if ((code = tEncodeRetrieveChkptTriggerReq(&encoder, &req)) < 0) { + rpcFreeCont(buf); + tEncoderClear(&encoder); + stError("encode retrieve checkpoint-trigger req failed, code:%s", tstrerror(code)); + continue; + } + tEncoderClear(&encoder); + + initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE_TRIGGER, buf, tlen + sizeof(SMsgHead)); code = tmsgSendReq(&pUpstreamTask->epSet, &rpcMsg); if (code == TSDB_CODE_SUCCESS) { diff --git a/source/libs/stream/src/streamMsg.c b/source/libs/stream/src/streamMsg.c index 193daa0cc4..8e823b7738 100644 --- a/source/libs/stream/src/streamMsg.c +++ b/source/libs/stream/src/streamMsg.c @@ -605,6 +605,98 @@ void tCleanupStreamHbMsg(SStreamHbMsg* pMsg) { pMsg->numOfTasks = -1; } +int32_t tEncodeStreamHbRsp(SEncoder* pEncoder, const SMStreamHbRspMsg* pRsp) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartEncode(pEncoder)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->msgId)); + tEndEncode(pEncoder); + +_exit: + return code; +} + +int32_t tDecodeStreamHbRsp(SDecoder* pDecoder, SMStreamHbRspMsg* pRsp) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartDecode(pDecoder)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->msgId)); + tEndDecode(pDecoder); + +_exit: + return code; +} + +int32_t tEncodeRetrieveChkptTriggerReq(SEncoder* pEncoder, const SRetrieveChkptTriggerReq* pReq) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartEncode(pEncoder)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamNodeId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->downstreamTaskId)); + tEndEncode(pEncoder); + +_exit: + return code; +} + +int32_t tDecodeRetrieveChkptTriggerReq(SDecoder* pDecoder, SRetrieveChkptTriggerReq* pReq) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartDecode(pDecoder)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamNodeId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamTaskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->downstreamNodeId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->downstreamTaskId)); + tEndDecode(pDecoder); + +_exit: + return code; +} + +int32_t tEncodeCheckpointTriggerRsp(SEncoder* pEncoder, const SCheckpointTriggerRsp* pRsp) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartEncode(pEncoder)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->streamId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->checkpointId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->upstreamTaskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->taskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->transId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->rspCode)); + tEndEncode(pEncoder); + +_exit: + return code; +} + +int32_t tDecodeCheckpointTriggerRsp(SDecoder* pDecoder, SCheckpointTriggerRsp* pRsp) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartDecode(pDecoder)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->checkpointId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamTaskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->taskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->transId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->rspCode)); + tEndDecode(pDecoder); + +_exit: + return code; +} + int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { int32_t code = 0; int32_t lino; @@ -830,11 +922,7 @@ int32_t tEncodeRestoreCheckpointInfo(SEncoder* pEncoder, const SRestoreCheckpoin tEndEncode(pEncoder); _exit: - if (code) { - return code; - } else { - return pEncoder->pos; - } + return code; } int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq) { @@ -853,3 +941,31 @@ int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* _exit: return code; } + +int32_t tEncodeStreamTaskRunReq (SEncoder* pEncoder, const SStreamTaskRunReq* pReq) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartEncode(pEncoder)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->reqType)); + tEndEncode(pEncoder); + +_exit: + return code; +} + +int32_t tDecodeStreamTaskRunReq(SDecoder* pDecoder, SStreamTaskRunReq* pReq) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartDecode(pDecoder)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->reqType)); + tEndDecode(pDecoder); + +_exit: + return code; +} \ No newline at end of file diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index 8c79abfd02..2d314839e6 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -83,13 +83,36 @@ int32_t streamTrySchedExec(SStreamTask* pTask) { } int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType) { - SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); - if (pRunReq == NULL) { + int32_t code = 0; + int32_t tlen = 0; + + SStreamTaskRunReq req = {.streamId = streamId, .taskId = taskId, .reqType = execType}; + + tEncodeSize(tEncodeStreamTaskRunReq, &req, tlen, code); + if (code < 0) { + stError("s-task:0x%" PRIx64 " vgId:%d encode stream task run req failed, code:%s", streamId, vgId, tstrerror(code)); + return code; + } + + void* buf = rpcMallocCont(tlen + sizeof(SMsgHead)); + if (buf == NULL) { stError("vgId:%d failed to create msg to start stream task:0x%x exec, type:%d, code:%s", vgId, taskId, execType, tstrerror(terrno)); return terrno; } + ((SMsgHead*)buf)->vgId = vgId; + + SEncoder encoder; + tEncoderInit(&encoder, buf, tlen); + if ((code = tEncodeStreamTaskRunReq(&encoder, &req)) < 0) { + rpcFreeCont(buf); + tEncoderClear(&encoder); + stError("s-task:%s vgId:%d encode stream task checkpoint-report msg failed, code:%s", taskId, vgId, tstrerror(code)); + return code; + } + tEncoderClear(&encoder); + if (streamId != 0) { stDebug("vgId:%d create msg to for task:0x%x, exec type:%d, %s", vgId, taskId, execType, streamTaskGetExecType(execType)); @@ -97,13 +120,8 @@ int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int3 stDebug("vgId:%d create msg to exec, type:%d, %s", vgId, execType, streamTaskGetExecType(execType)); } - pRunReq->head.vgId = vgId; - pRunReq->streamId = streamId; - pRunReq->taskId = taskId; - pRunReq->reqType = execType; - - SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; - int32_t code = tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg); + SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = buf, .contLen = tlen + sizeof(SMsgHead)}; + code = tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg); if (code) { stError("vgId:%d failed to put msg into stream queue, code:%s, %x", vgId, tstrerror(code), taskId); } From fd1996a1d1ad6b453ac222d17ccaa97e3b0fd0ae Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 15 Nov 2024 01:27:31 +0800 Subject: [PATCH 08/14] fix(stream): fix error in build msg. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 7 ++++--- source/libs/stream/src/streamCheckpoint.c | 15 ++++++--------- source/libs/stream/src/streamSched.c | 3 ++- 3 files changed, 12 insertions(+), 13 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index e5078d2950..686635cfe2 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -1232,12 +1232,13 @@ int32_t doProcessDummyRspMsg(SStreamMeta* UNUSED_PARAM(pMeta), SRpcMsg* pMsg) { } int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { - SMStreamHbRspMsg rsp; - int32_t len = 0; + SMStreamHbRspMsg rsp = {0}; int32_t code = 0; SDecoder decoder; + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); - tDecoderInit(&decoder, (uint8_t*)pMsg->pCont, len); + tDecoderInit(&decoder, (uint8_t*)msg, len); code = tDecodeStreamHbRsp(&decoder, &rsp); if (code < 0) { terrno = TSDB_CODE_INVALID_MSG; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index b7bef6d2e6..73495f5741 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -1140,15 +1140,12 @@ int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) { SRpcMsg rpcMsg = {0}; SEncoder encoder; - SRetrieveChkptTriggerReq req = - { - .streamId = pTask->id.streamId, - .downstreamTaskId = pTask->id.taskId, - .downstreamNodeId = vgId, - .upstreamTaskId = pUpstreamTask->taskId, - .upstreamNodeId = pUpstreamTask->nodeId, - .checkpointId = checkpointId, - }; + SRetrieveChkptTriggerReq req = {.streamId = pTask->id.streamId, + .downstreamTaskId = pTask->id.taskId, + .downstreamNodeId = vgId, + .upstreamTaskId = pUpstreamTask->taskId, + .upstreamNodeId = pUpstreamTask->nodeId, + .checkpointId = checkpointId}; tEncodeSize(tEncodeRetrieveChkptTriggerReq, &req, tlen, ret); if (ret < 0) { diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index 2d314839e6..e0a8506e02 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -102,9 +102,10 @@ int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int3 } ((SMsgHead*)buf)->vgId = vgId; + char* bufx = POINTER_SHIFT(buf, sizeof(SMsgHead)); SEncoder encoder; - tEncoderInit(&encoder, buf, tlen); + tEncoderInit(&encoder, (uint8_t*)bufx, tlen); if ((code = tEncodeStreamTaskRunReq(&encoder, &req)) < 0) { rpcFreeCont(buf); tEncoderClear(&encoder); From 2e3196cfdc99582afb1b925d14bc4dd621aa072e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 15 Nov 2024 08:57:00 +0800 Subject: [PATCH 09/14] fix(stream): fix syntax error. --- source/libs/stream/src/streamSched.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index e0a8506e02..9e131fd526 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -109,7 +109,7 @@ int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int3 if ((code = tEncodeStreamTaskRunReq(&encoder, &req)) < 0) { rpcFreeCont(buf); tEncoderClear(&encoder); - stError("s-task:%s vgId:%d encode stream task checkpoint-report msg failed, code:%s", taskId, vgId, tstrerror(code)); + stError("s-task:0x%x vgId:%d encode run task msg failed, code:%s", taskId, vgId, tstrerror(code)); return code; } tEncoderClear(&encoder); From 87e1f8da47148e5ab8f251310ff29ae012df4102 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Wed, 13 Nov 2024 19:37:22 +0800 Subject: [PATCH 10/14] fix(stream):fix bloom filter decode issue --- source/libs/executor/src/scanoperator.c | 28 +++++++++++++------------ 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 84dde6a579..d4702e2bdc 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3465,11 +3465,6 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo) goto _end; } - void* pUpInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo)); - if (!pUpInfo) { - lino = __LINE__; - goto _end; - } SDecoder decoder = {0}; pDeCoder = &decoder; tDecoderInit(pDeCoder, buf, tlen); @@ -3478,14 +3473,21 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo) goto _end; } - code = pInfo->stateStore.updateInfoDeserialize(pDeCoder, pUpInfo); - if (code == TSDB_CODE_SUCCESS) { - pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo); - pInfo->pUpdateInfo = pUpInfo; - } else { - taosMemoryFree(pUpInfo); - lino = __LINE__; - goto _end; + if (pInfo->pUpdateInfo != NULL) { + void* pUpInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo)); + if (!pUpInfo) { + lino = __LINE__; + goto _end; + } + code = pInfo->stateStore.updateInfoDeserialize(pDeCoder, pUpInfo); + if (code == TSDB_CODE_SUCCESS) { + pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo); + pInfo->pUpdateInfo = pUpInfo; + } else { + taosMemoryFree(pUpInfo); + lino = __LINE__; + goto _end; + } } if (tDecodeIsEnd(pDeCoder)) { From b74a1bc7265476129fd23c82e2f42d10944c0ce2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 15 Nov 2024 13:48:42 +0800 Subject: [PATCH 11/14] refactor: update logs, and set correct vgId. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 6 +++--- source/libs/stream/src/streamCheckpoint.c | 16 ++++++++-------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 686635cfe2..3f67503454 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -1003,7 +1003,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) int32_t code = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId, &pTask); if (pTask == NULL || (code != 0)) { - tqError("vgId:%d process retrieve checkpoint trigger, checkpointId:%" PRId64 + tqError("vgId:%d process retrieve checkpoint-trigger, checkpointId:%" PRId64 " from s-task:0x%x, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId, req.checkpointId, (int32_t)req.downstreamTaskId, req.upstreamTaskId); return TSDB_CODE_STREAM_TASK_NOT_EXIST; @@ -1098,8 +1098,8 @@ int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) } tqDebug( - "s-task:%s recv re-send checkpoint-trigger msg from through retrieve/rsp channel, upstream:0x%x, " - "checkpointId:%" PRId64 ", transId:%d", + "s-task:%s recv re-send checkpoint-trigger msg through retrieve/rsp channel, upstream:0x%x, checkpointId:%" PRId64 + ", transId:%d", pTask->id.idStr, rsp.upstreamTaskId, rsp.checkpointId, rsp.transId); code = streamTaskProcessCheckpointTriggerRsp(pTask, &rsp); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 73495f5741..20f49216e7 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -351,6 +351,11 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock pActiveInfo->activeId = checkpointId; pActiveInfo->transId = transId; + if (pTask->chkInfo.startTs == 0) { + pTask->chkInfo.startTs = taosGetTimestampMs(); + pTask->execInfo.checkpoint += 1; + } + code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s handle checkpoint-trigger block failed, code:%s", id, tstrerror(code)); @@ -407,11 +412,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock streamFreeQitem((SStreamQueueItem*)pBlock); } } else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) { - if (pTask->chkInfo.startTs == 0) { - pTask->chkInfo.startTs = taosGetTimestampMs(); - pTask->execInfo.checkpoint += 1; - } - // todo: handle this // update the child Id for downstream tasks code = streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId); @@ -1149,16 +1149,16 @@ int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) { tEncodeSize(tEncodeRetrieveChkptTriggerReq, &req, tlen, ret); if (ret < 0) { - stError("encode stream hb msg rsp failed, code:%s", tstrerror(code)); + stError("encode retrieve checkpoint-trigger msg failed, code:%s", tstrerror(code)); } buf = rpcMallocCont(tlen + sizeof(SMsgHead)); if (buf == NULL) { - stError("vgId:%d failed to create msg to retrieve trigger msg for task:%s exec, code:out of memory", vgId, pId); + stError("vgId:%d failed to create retrieve checkpoint-trigger msg for task:%s exec, code:out of memory", vgId, pId); continue; } - ((SRetrieveChkptTriggerReq*)buf)->head.vgId = htonl(vgId); + ((SRetrieveChkptTriggerReq*)buf)->head.vgId = htonl(pUpstreamTask->nodeId); void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); tEncoderInit(&encoder, abuf, tlen); From 131be84dec948ba4e67c8cf5d13a62b59a70406b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 13 Nov 2024 16:41:15 +0800 Subject: [PATCH 12/14] refactor: do some internal refactor --- source/common/CMakeLists.txt | 7 ------- source/common/src/{msg => }/tmsg.c | 0 source/common/test/CMakeLists.txt | 2 +- 3 files changed, 1 insertion(+), 8 deletions(-) rename source/common/src/{msg => }/tmsg.c (100%) diff --git a/source/common/CMakeLists.txt b/source/common/CMakeLists.txt index 712121ca72..f10eb6a611 100644 --- a/source/common/CMakeLists.txt +++ b/source/common/CMakeLists.txt @@ -1,11 +1,4 @@ aux_source_directory(src COMMON_SRC) -aux_source_directory("src/msg/" MSG_SRC_FILES) - -list( - APPEND - COMMON_SRC - ${MSG_SRC_FILES} -) if(TD_ENTERPRISE) LIST(APPEND COMMON_SRC ${TD_ENTERPRISE_DIR}/src/plugins/common/src/tglobal.c) diff --git a/source/common/src/msg/tmsg.c b/source/common/src/tmsg.c similarity index 100% rename from source/common/src/msg/tmsg.c rename to source/common/src/tmsg.c diff --git a/source/common/test/CMakeLists.txt b/source/common/test/CMakeLists.txt index bb12612273..2fe3ef652d 100644 --- a/source/common/test/CMakeLists.txt +++ b/source/common/test/CMakeLists.txt @@ -46,7 +46,7 @@ if (${TD_LINUX}) target_sources(tmsgTest PRIVATE "tmsgTest.cpp" - "../src/msg/tmsg.c" + "../src/tmsg.c" ) target_include_directories(tmsgTest PUBLIC "${TD_SOURCE_DIR}/include/common/") target_link_libraries(tmsgTest PUBLIC os util gtest gtest_main) From 1c66f22c2b95be3b2a4fb61c37fe12d5b42bb33c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 18 Nov 2024 15:00:30 +0800 Subject: [PATCH 13/14] refactor: move to msg. --- include/libs/stream/streamMsg.h | 251 ------------------ include/libs/stream/tstream.h | 15 +- source/common/CMakeLists.txt | 3 + .../stream/src => common/src/msg}/streamMsg.c | 219 +++------------ source/common/src/{ => msg}/tmsg.c | 0 source/common/test/CMakeLists.txt | 2 +- source/libs/stream/src/streamTask.c | 175 ++++++++++++ 7 files changed, 227 insertions(+), 438 deletions(-) delete mode 100644 include/libs/stream/streamMsg.h rename source/{libs/stream/src => common/src/msg}/streamMsg.c (77%) rename source/common/src/{ => msg}/tmsg.c (100%) diff --git a/include/libs/stream/streamMsg.h b/include/libs/stream/streamMsg.h deleted file mode 100644 index 19b033a02e..0000000000 --- a/include/libs/stream/streamMsg.h +++ /dev/null @@ -1,251 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef TDENGINE_STREAMMSG_H -#define TDENGINE_STREAMMSG_H - -#include "tmsg.h" -#include "trpc.h" - -#ifdef __cplusplus -extern "C" { -#endif - -typedef struct SStreamUpstreamEpInfo { - int32_t nodeId; - int32_t childId; - int32_t taskId; - SEpSet epSet; - bool dataAllowed; // denote if the data from this upstream task is allowed to put into inputQ, not serialize it - int64_t stage; // upstream task stage value, to denote if the upstream node has restart/replica changed/transfer -} SStreamUpstreamEpInfo; - -int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo); -int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamUpstreamEpInfo* pInfo); - -// mndTrigger: denote if this checkpoint is triggered by mnode or as requested from tasks when transfer-state finished -typedef struct { - int64_t streamId; - int64_t checkpointId; - int32_t taskId; - int32_t nodeId; - SEpSet mgmtEps; - int32_t mnodeId; - int32_t transId; - int8_t mndTrigger; - int64_t expireTime; -} SStreamCheckpointSourceReq; - -int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq); -int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq); - -typedef struct { - int64_t streamId; - int64_t checkpointId; - int32_t taskId; - int32_t nodeId; - int32_t mnodeId; - int64_t expireTime; - int8_t success; -} SStreamCheckpointSourceRsp; - -int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp); - -typedef struct SStreamTaskNodeUpdateMsg { - int32_t transId; // to identify the msg - int64_t streamId; - int32_t taskId; - SArray* pNodeList; // SArray -} SStreamTaskNodeUpdateMsg; - -int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg); -int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg); - -typedef struct { - int64_t reqId; - int64_t stage; - int64_t streamId; - int32_t upstreamNodeId; - int32_t upstreamTaskId; - int32_t downstreamNodeId; - int32_t downstreamTaskId; - int32_t childId; -} SStreamTaskCheckReq; - -int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq); -int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq); - -typedef struct { - int64_t reqId; - int64_t streamId; - int32_t upstreamNodeId; - int32_t upstreamTaskId; - int32_t downstreamNodeId; - int32_t downstreamTaskId; - int32_t childId; - int64_t oldStage; - int8_t status; -} SStreamTaskCheckRsp; - -int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp); -int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp); - -typedef struct { - SMsgHead msgHead; - int64_t streamId; - int64_t checkpointId; - int32_t downstreamTaskId; - int32_t downstreamNodeId; - int32_t upstreamTaskId; - int32_t upstreamNodeId; - int32_t childId; -} SStreamCheckpointReadyMsg; - -int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pRsp); -int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp); - -struct SStreamDispatchReq { - int32_t type; - int64_t stage; // nodeId from upstream task - int64_t streamId; - int32_t taskId; - int32_t msgId; // msg id to identify if the incoming msg from the same sender - int32_t srcVgId; - int32_t upstreamTaskId; - int32_t upstreamChildId; - int32_t upstreamNodeId; - int32_t upstreamRelTaskId; - int32_t blockNum; - int64_t totalLen; - SArray* dataLen; // SArray - SArray* data; // SArray -}; - -int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const struct SStreamDispatchReq* pReq); -int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, struct SStreamDispatchReq* pReq); -void tCleanupStreamDispatchReq(struct SStreamDispatchReq* pReq); - -struct SStreamRetrieveReq { - int64_t streamId; - int64_t reqId; - int32_t srcTaskId; - int32_t srcNodeId; - int32_t dstTaskId; - int32_t dstNodeId; - int32_t retrieveLen; - SRetrieveTableRsp* pRetrieve; -}; - -int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const struct SStreamRetrieveReq* pReq); -int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, struct SStreamRetrieveReq* pReq); -void tCleanupStreamRetrieveReq(struct SStreamRetrieveReq* pReq); - -typedef struct SStreamTaskCheckpointReq { - int64_t streamId; - int32_t taskId; - int32_t nodeId; -} SStreamTaskCheckpointReq; - -int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq); -int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq); - -typedef struct SStreamHbMsg { - int32_t vgId; - int32_t msgId; - int64_t ts; - int32_t numOfTasks; - SArray* pTaskStatus; // SArray - SArray* pUpdateNodes; // SArray, needs update the epsets in stream tasks for those nodes. -} SStreamHbMsg; - -int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq); -int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq); -void tCleanupStreamHbMsg(SStreamHbMsg* pMsg); - -typedef struct { - SMsgHead head; - int32_t msgId; -} SMStreamHbRspMsg; - -int32_t tEncodeStreamHbRsp(SEncoder* pEncoder, const SMStreamHbRspMsg* pRsp); -int32_t tDecodeStreamHbRsp(SDecoder* pDecoder, SMStreamHbRspMsg* pRsp); - -typedef struct SRetrieveChkptTriggerReq { - SMsgHead head; - int64_t streamId; - int64_t checkpointId; - int32_t upstreamNodeId; - int32_t upstreamTaskId; - int32_t downstreamNodeId; - int64_t downstreamTaskId; -} SRetrieveChkptTriggerReq; - -int32_t tEncodeRetrieveChkptTriggerReq(SEncoder* pEncoder, const SRetrieveChkptTriggerReq* pReq); -int32_t tDecodeRetrieveChkptTriggerReq(SDecoder* pDecoder, SRetrieveChkptTriggerReq* pReq); - -typedef struct SCheckpointTriggerRsp { - int64_t streamId; - int64_t checkpointId; - int32_t upstreamTaskId; - int32_t taskId; - int32_t transId; - int32_t rspCode; -} SCheckpointTriggerRsp; - -int32_t tEncodeCheckpointTriggerRsp(SEncoder* pEncoder, const SCheckpointTriggerRsp* pRsp); -int32_t tDecodeCheckpointTriggerRsp(SDecoder* pDecoder, SCheckpointTriggerRsp* pRsp); - -typedef struct SCheckpointReport { - int64_t streamId; - int32_t taskId; - int32_t nodeId; - int64_t checkpointId; - int64_t checkpointVer; - int64_t checkpointTs; - int32_t transId; - int8_t dropHTask; -} SCheckpointReport; - -int32_t tEncodeStreamTaskChkptReport(SEncoder* pEncoder, const SCheckpointReport* pReq); -int32_t tDecodeStreamTaskChkptReport(SDecoder* pDecoder, SCheckpointReport* pReq); - -typedef struct SRestoreCheckpointInfo { - SMsgHead head; - int64_t startTs; - int64_t streamId; - int64_t checkpointId; // latest checkpoint id - int32_t transId; // transaction id of the update the consensus-checkpointId transaction - int32_t taskId; - int32_t nodeId; -} SRestoreCheckpointInfo; - -int32_t tEncodeRestoreCheckpointInfo(SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq); -int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq); - -typedef struct { - SMsgHead head; - int64_t streamId; - int32_t taskId; - int32_t reqType; -} SStreamTaskRunReq; - -int32_t tEncodeStreamTaskRunReq(SEncoder* pEncoder, const SStreamTaskRunReq* pReq); -int32_t tDecodeStreamTaskRunReq(SDecoder* pDecoder, SStreamTaskRunReq* pReq); - -#ifdef __cplusplus -} -#endif - -#endif // TDENGINE_STREAMMSG_H diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 05b3e21eba..6b8e9f12a6 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -319,11 +319,6 @@ typedef struct SSTaskBasicInfo { SInterval interval; } SSTaskBasicInfo; -typedef struct SStreamRetrieveReq SStreamRetrieveReq; -typedef struct SStreamDispatchReq SStreamDispatchReq; -typedef struct STokenBucket STokenBucket; -typedef struct SMetaHbInfo SMetaHbInfo; - typedef struct SDispatchMsgInfo { SStreamDispatchReq* pData; // current dispatch data @@ -626,11 +621,11 @@ typedef struct STaskStatusEntry { STaskCkptInfo checkpointInfo; } STaskStatusEntry; -typedef struct SNodeUpdateInfo { - int32_t nodeId; - SEpSet prevEp; - SEpSet newEp; -} SNodeUpdateInfo; +//typedef struct SNodeUpdateInfo { +// int32_t nodeId; +// SEpSet prevEp; +// SEpSet newEp; +//} SNodeUpdateInfo; typedef struct SStreamTaskState { ETaskStatus state; diff --git a/source/common/CMakeLists.txt b/source/common/CMakeLists.txt index f10eb6a611..39380a0644 100644 --- a/source/common/CMakeLists.txt +++ b/source/common/CMakeLists.txt @@ -1,4 +1,7 @@ aux_source_directory(src COMMON_SRC) +aux_source_directory(src/msg COMMON_MSG_SRC) + +LIST(APPEND COMMON_SRC ${COMMON_MSG_SRC}) if(TD_ENTERPRISE) LIST(APPEND COMMON_SRC ${TD_ENTERPRISE_DIR}/src/plugins/common/src/tglobal.c) diff --git a/source/libs/stream/src/streamMsg.c b/source/common/src/msg/streamMsg.c similarity index 77% rename from source/libs/stream/src/streamMsg.c rename to source/common/src/msg/streamMsg.c index 8e823b7738..c92ab52ac1 100644 --- a/source/libs/stream/src/streamMsg.c +++ b/source/common/src/msg/streamMsg.c @@ -15,8 +15,48 @@ #include "streamMsg.h" #include "os.h" -#include "tstream.h" -#include "streamInt.h" +#include "tcommon.h" + +typedef struct STaskId { + int64_t streamId; + int64_t taskId; +} STaskId; + +typedef struct STaskCkptInfo { + int64_t latestId; // saved checkpoint id + int64_t latestVer; // saved checkpoint ver + int64_t latestTime; // latest checkpoint time + int64_t latestSize; // latest checkpoint size + int8_t remoteBackup; // latest checkpoint backup done + int64_t activeId; // current active checkpoint id + int32_t activeTransId; // checkpoint trans id + int8_t failed; // denote if the checkpoint is failed or not + int8_t consensusChkptId; // required the consensus-checkpointId + int64_t consensusTs; // +} STaskCkptInfo; + +typedef struct STaskStatusEntry { + STaskId id; + int32_t status; + int32_t statusLastDuration; // to record the last duration of current status + int64_t stage; + int32_t nodeId; + SVersionRange verRange; // start/end version in WAL, only valid for source task + int64_t processedVer; // only valid for source task + double inputQUsed; // in MiB + double inputRate; + double procsThroughput; // duration between one element put into input queue and being processed. + double procsTotal; // duration between one element put into input queue and being processed. + double outputThroughput; // the size of dispatched result blocks in bytes + double outputTotal; // the size of dispatched result blocks in bytes + double sinkQuota; // existed quota size for sink task + double sinkDataSize; // sink to dst data size + int64_t startTime; + int64_t startCheckpointId; + int64_t startCheckpointVer; + int64_t hTaskId; + STaskCkptInfo checkpointInfo; +} STaskStatusEntry; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo) { TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->taskId)); @@ -289,7 +329,7 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->totalLen)); if (taosArrayGetSize(pReq->data) != pReq->blockNum || taosArrayGetSize(pReq->dataLen) != pReq->blockNum) { - stError("invalid dispatch req msg"); + uError("invalid dispatch req msg"); TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG); } @@ -697,179 +737,6 @@ _exit: return code; } -int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { - int32_t code = 0; - int32_t lino; - - TAOS_CHECK_EXIT(tStartEncode(pEncoder)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->ver)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->id.streamId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->id.taskId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.trigger)); - TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.taskLevel)); - TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.type)); - TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pTask->msgInfo.msgType)); - - TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.taskStatus)); - TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.schedStatus)); - - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.selfChildId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.nodeId)); - TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.epSet)); - TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset)); - - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointId)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer)); - TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.fillHistory)); - - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId)); - int32_t taskId = pTask->hTaskInfo.id.taskId; - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId)); - - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->streamTaskId.streamId)); - taskId = pTask->streamTaskId.taskId; - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId)); - - TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.minVer)); - TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.maxVer)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.skey)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.ekey)); - - int32_t epSz = taosArrayGetSize(pTask->upstreamInfo.pList); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, epSz)); - for (int32_t i = 0; i < epSz; i++) { - SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); - TAOS_CHECK_EXIT(tEncodeStreamEpInfo(pEncoder, pInfo)); - } - - if (pTask->info.taskLevel != TASK_LEVEL__SINK) { - TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->exec.qmsg)); - } - - if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.tbSink.stbUid)); - TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.tbSink.stbFullName)); - TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, pTask->outputInfo.tbSink.pSchemaWrapper)); - } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) { - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.smaSink.smaId)); - } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) { - TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.fetchSink.reserved)); - } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.taskId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.nodeId)); - TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->outputInfo.fixedDispatcher.epSet)); - } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - TAOS_CHECK_EXIT(tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo)); - TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName)); - } - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->info.delaySchedParam)); - TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->subtableWithoutMd5)); - TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1)); - - tEndEncode(pEncoder); -_exit: - return code; -} - -int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { - int32_t taskId = 0; - int32_t code = 0; - int32_t lino; - - TAOS_CHECK_EXIT(tStartDecode(pDecoder)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->ver)); - if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) { - TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG); - } - - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->id.streamId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->id.taskId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.trigger)); - TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.taskLevel)); - TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.type)); - TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pTask->msgInfo.msgType)); - - TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.taskStatus)); - TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.schedStatus)); - - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.selfChildId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.nodeId)); - TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.epSet)); - TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset)); - - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer)); - TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.fillHistory)); - - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId)); - pTask->hTaskInfo.id.taskId = taskId; - - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->streamTaskId.streamId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId)); - pTask->streamTaskId.taskId = taskId; - - TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer)); - TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.skey)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.ekey)); - - int32_t epSz = -1; - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &epSz) < 0); - - if ((pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES)) == NULL) { - TAOS_CHECK_EXIT(terrno); - } - for (int32_t i = 0; i < epSz; i++) { - SStreamUpstreamEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamUpstreamEpInfo)); - if (pInfo == NULL) { - TAOS_CHECK_EXIT(terrno); - } - if ((code = tDecodeStreamEpInfo(pDecoder, pInfo)) < 0) { - taosMemoryFreeClear(pInfo); - goto _exit; - } - if (taosArrayPush(pTask->upstreamInfo.pList, &pInfo) == NULL) { - TAOS_CHECK_EXIT(terrno); - } - } - - if (pTask->info.taskLevel != TASK_LEVEL__SINK) { - TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg)); - } - - if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.tbSink.stbUid)); - TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.tbSink.stbFullName)); - pTask->outputInfo.tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); - if (pTask->outputInfo.tbSink.pSchemaWrapper == NULL) { - TAOS_CHECK_EXIT(terrno); - } - TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pDecoder, pTask->outputInfo.tbSink.pSchemaWrapper)); - } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) { - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.smaSink.smaId)); - } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) { - TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.fetchSink.reserved)); - } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.taskId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.nodeId)); - TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->outputInfo.fixedDispatcher.epSet)); - } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - TAOS_CHECK_EXIT(tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo)); - TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName)); - } - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->info.delaySchedParam)); - if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) { - TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->subtableWithoutMd5)); - } - TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->reserve)); - - tEndDecode(pDecoder); - -_exit: - return code; -} - int32_t tEncodeStreamTaskChkptReport(SEncoder* pEncoder, const SCheckpointReport* pReq) { int32_t code = 0; int32_t lino; diff --git a/source/common/src/tmsg.c b/source/common/src/msg/tmsg.c similarity index 100% rename from source/common/src/tmsg.c rename to source/common/src/msg/tmsg.c diff --git a/source/common/test/CMakeLists.txt b/source/common/test/CMakeLists.txt index 2fe3ef652d..bb12612273 100644 --- a/source/common/test/CMakeLists.txt +++ b/source/common/test/CMakeLists.txt @@ -46,7 +46,7 @@ if (${TD_LINUX}) target_sources(tmsgTest PRIVATE "tmsgTest.cpp" - "../src/tmsg.c" + "../src/msg/tmsg.c" ) target_include_directories(tmsgTest PUBLIC "${TD_SOURCE_DIR}/include/common/") target_link_libraries(tmsgTest PUBLIC os util gtest gtest_main) diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index a8019937ff..f46228fd47 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -22,6 +22,7 @@ #include "tstream.h" #include "ttimer.h" #include "wal.h" +#include "streamMsg.h" static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo); static int32_t streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated); @@ -1303,4 +1304,178 @@ void streamTaskFreeRefId(int64_t* pRefId) { } metaRefMgtRemove(pRefId); +} + + +int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartEncode(pEncoder)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->ver)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->id.streamId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->id.taskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.trigger)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.taskLevel)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.type)); + TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pTask->msgInfo.msgType)); + + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.taskStatus)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.schedStatus)); + + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.selfChildId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.nodeId)); + TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.epSet)); + TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset)); + + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.fillHistory)); + + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId)); + int32_t taskId = pTask->hTaskInfo.id.taskId; + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId)); + + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->streamTaskId.streamId)); + taskId = pTask->streamTaskId.taskId; + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId)); + + TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.minVer)); + TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.maxVer)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.skey)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.ekey)); + + int32_t epSz = taosArrayGetSize(pTask->upstreamInfo.pList); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, epSz)); + for (int32_t i = 0; i < epSz; i++) { + SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); + TAOS_CHECK_EXIT(tEncodeStreamEpInfo(pEncoder, pInfo)); + } + + if (pTask->info.taskLevel != TASK_LEVEL__SINK) { + TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->exec.qmsg)); + } + + if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.tbSink.stbUid)); + TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.tbSink.stbFullName)); + TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, pTask->outputInfo.tbSink.pSchemaWrapper)); + } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) { + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.smaSink.smaId)); + } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) { + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.fetchSink.reserved)); + } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.taskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.nodeId)); + TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->outputInfo.fixedDispatcher.epSet)); + } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + TAOS_CHECK_EXIT(tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo)); + TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName)); + } + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->info.delaySchedParam)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->subtableWithoutMd5)); + TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1)); + + tEndEncode(pEncoder); +_exit: + return code; +} + +int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { + int32_t taskId = 0; + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartDecode(pDecoder)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->ver)); + if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) { + TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG); + } + + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->id.streamId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->id.taskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.trigger)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.taskLevel)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.type)); + TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pTask->msgInfo.msgType)); + + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.taskStatus)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.schedStatus)); + + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.selfChildId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.nodeId)); + TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.epSet)); + TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset)); + + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.fillHistory)); + + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId)); + pTask->hTaskInfo.id.taskId = taskId; + + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->streamTaskId.streamId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId)); + pTask->streamTaskId.taskId = taskId; + + TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer)); + TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.skey)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.ekey)); + + int32_t epSz = -1; + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &epSz) < 0); + + if ((pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES)) == NULL) { + TAOS_CHECK_EXIT(terrno); + } + for (int32_t i = 0; i < epSz; i++) { + SStreamUpstreamEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamUpstreamEpInfo)); + if (pInfo == NULL) { + TAOS_CHECK_EXIT(terrno); + } + if ((code = tDecodeStreamEpInfo(pDecoder, pInfo)) < 0) { + taosMemoryFreeClear(pInfo); + goto _exit; + } + if (taosArrayPush(pTask->upstreamInfo.pList, &pInfo) == NULL) { + TAOS_CHECK_EXIT(terrno); + } + } + + if (pTask->info.taskLevel != TASK_LEVEL__SINK) { + TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg)); + } + + if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.tbSink.stbUid)); + TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.tbSink.stbFullName)); + pTask->outputInfo.tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); + if (pTask->outputInfo.tbSink.pSchemaWrapper == NULL) { + TAOS_CHECK_EXIT(terrno); + } + TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pDecoder, pTask->outputInfo.tbSink.pSchemaWrapper)); + } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) { + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.smaSink.smaId)); + } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) { + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.fetchSink.reserved)); + } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.taskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.nodeId)); + TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->outputInfo.fixedDispatcher.epSet)); + } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + TAOS_CHECK_EXIT(tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo)); + TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName)); + } + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->info.delaySchedParam)); + if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) { + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->subtableWithoutMd5)); + } + TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->reserve)); + + tEndDecode(pDecoder); + +_exit: + return code; } \ No newline at end of file From 4b47c4ca5d891e6d0b2b013443104b96ca7debc2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 18 Nov 2024 15:37:12 +0800 Subject: [PATCH 14/14] refactor: move to msg. --- include/common/streamMsg.h | 262 +++++++++++++++++++++++++++++++++++++ 1 file changed, 262 insertions(+) create mode 100644 include/common/streamMsg.h diff --git a/include/common/streamMsg.h b/include/common/streamMsg.h new file mode 100644 index 0000000000..3db92ba58d --- /dev/null +++ b/include/common/streamMsg.h @@ -0,0 +1,262 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_STREAMMSG_H +#define TDENGINE_STREAMMSG_H + +#include "tmsg.h" +//#include "trpc.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct SStreamRetrieveReq SStreamRetrieveReq; +typedef struct SStreamDispatchReq SStreamDispatchReq; +typedef struct STokenBucket STokenBucket; +typedef struct SMetaHbInfo SMetaHbInfo; + +typedef struct SNodeUpdateInfo { + int32_t nodeId; + SEpSet prevEp; + SEpSet newEp; +} SNodeUpdateInfo; + +typedef struct SStreamUpstreamEpInfo { + int32_t nodeId; + int32_t childId; + int32_t taskId; + SEpSet epSet; + bool dataAllowed; // denote if the data from this upstream task is allowed to put into inputQ, not serialize it + int64_t stage; // upstream task stage value, to denote if the upstream node has restart/replica changed/transfer +} SStreamUpstreamEpInfo; + +int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo); +int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamUpstreamEpInfo* pInfo); + +// mndTrigger: denote if this checkpoint is triggered by mnode or as requested from tasks when transfer-state finished +typedef struct { + int64_t streamId; + int64_t checkpointId; + int32_t taskId; + int32_t nodeId; + SEpSet mgmtEps; + int32_t mnodeId; + int32_t transId; + int8_t mndTrigger; + int64_t expireTime; +} SStreamCheckpointSourceReq; + +int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq); +int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq); + +typedef struct { + int64_t streamId; + int64_t checkpointId; + int32_t taskId; + int32_t nodeId; + int32_t mnodeId; + int64_t expireTime; + int8_t success; +} SStreamCheckpointSourceRsp; + +int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp); + +typedef struct SStreamTaskNodeUpdateMsg { + int32_t transId; // to identify the msg + int64_t streamId; + int32_t taskId; + SArray* pNodeList; // SArray +} SStreamTaskNodeUpdateMsg; + +int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg); +int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg); + +typedef struct { + int64_t reqId; + int64_t stage; + int64_t streamId; + int32_t upstreamNodeId; + int32_t upstreamTaskId; + int32_t downstreamNodeId; + int32_t downstreamTaskId; + int32_t childId; +} SStreamTaskCheckReq; + +int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq); +int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq); + +typedef struct { + int64_t reqId; + int64_t streamId; + int32_t upstreamNodeId; + int32_t upstreamTaskId; + int32_t downstreamNodeId; + int32_t downstreamTaskId; + int32_t childId; + int64_t oldStage; + int8_t status; +} SStreamTaskCheckRsp; + +int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp); +int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp); + +typedef struct { + SMsgHead msgHead; + int64_t streamId; + int64_t checkpointId; + int32_t downstreamTaskId; + int32_t downstreamNodeId; + int32_t upstreamTaskId; + int32_t upstreamNodeId; + int32_t childId; +} SStreamCheckpointReadyMsg; + +int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pRsp); +int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp); + +struct SStreamDispatchReq { + int32_t type; + int64_t stage; // nodeId from upstream task + int64_t streamId; + int32_t taskId; + int32_t msgId; // msg id to identify if the incoming msg from the same sender + int32_t srcVgId; + int32_t upstreamTaskId; + int32_t upstreamChildId; + int32_t upstreamNodeId; + int32_t upstreamRelTaskId; + int32_t blockNum; + int64_t totalLen; + SArray* dataLen; // SArray + SArray* data; // SArray +}; + +int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const struct SStreamDispatchReq* pReq); +int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, struct SStreamDispatchReq* pReq); +void tCleanupStreamDispatchReq(struct SStreamDispatchReq* pReq); + +struct SStreamRetrieveReq { + int64_t streamId; + int64_t reqId; + int32_t srcTaskId; + int32_t srcNodeId; + int32_t dstTaskId; + int32_t dstNodeId; + int32_t retrieveLen; + SRetrieveTableRsp* pRetrieve; +}; + +int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const struct SStreamRetrieveReq* pReq); +int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, struct SStreamRetrieveReq* pReq); +void tCleanupStreamRetrieveReq(struct SStreamRetrieveReq* pReq); + +typedef struct SStreamTaskCheckpointReq { + int64_t streamId; + int32_t taskId; + int32_t nodeId; +} SStreamTaskCheckpointReq; + +int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq); +int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq); + +typedef struct SStreamHbMsg { + int32_t vgId; + int32_t msgId; + int64_t ts; + int32_t numOfTasks; + SArray* pTaskStatus; // SArray + SArray* pUpdateNodes; // SArray, needs update the epsets in stream tasks for those nodes. +} SStreamHbMsg; + +int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq); +int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq); +void tCleanupStreamHbMsg(SStreamHbMsg* pMsg); + +typedef struct { + SMsgHead head; + int32_t msgId; +} SMStreamHbRspMsg; + +int32_t tEncodeStreamHbRsp(SEncoder* pEncoder, const SMStreamHbRspMsg* pRsp); +int32_t tDecodeStreamHbRsp(SDecoder* pDecoder, SMStreamHbRspMsg* pRsp); + +typedef struct SRetrieveChkptTriggerReq { + SMsgHead head; + int64_t streamId; + int64_t checkpointId; + int32_t upstreamNodeId; + int32_t upstreamTaskId; + int32_t downstreamNodeId; + int64_t downstreamTaskId; +} SRetrieveChkptTriggerReq; + +int32_t tEncodeRetrieveChkptTriggerReq(SEncoder* pEncoder, const SRetrieveChkptTriggerReq* pReq); +int32_t tDecodeRetrieveChkptTriggerReq(SDecoder* pDecoder, SRetrieveChkptTriggerReq* pReq); + +typedef struct SCheckpointTriggerRsp { + int64_t streamId; + int64_t checkpointId; + int32_t upstreamTaskId; + int32_t taskId; + int32_t transId; + int32_t rspCode; +} SCheckpointTriggerRsp; + +int32_t tEncodeCheckpointTriggerRsp(SEncoder* pEncoder, const SCheckpointTriggerRsp* pRsp); +int32_t tDecodeCheckpointTriggerRsp(SDecoder* pDecoder, SCheckpointTriggerRsp* pRsp); + +typedef struct SCheckpointReport { + int64_t streamId; + int32_t taskId; + int32_t nodeId; + int64_t checkpointId; + int64_t checkpointVer; + int64_t checkpointTs; + int32_t transId; + int8_t dropHTask; +} SCheckpointReport; + +int32_t tEncodeStreamTaskChkptReport(SEncoder* pEncoder, const SCheckpointReport* pReq); +int32_t tDecodeStreamTaskChkptReport(SDecoder* pDecoder, SCheckpointReport* pReq); + +typedef struct SRestoreCheckpointInfo { + SMsgHead head; + int64_t startTs; + int64_t streamId; + int64_t checkpointId; // latest checkpoint id + int32_t transId; // transaction id of the update the consensus-checkpointId transaction + int32_t taskId; + int32_t nodeId; +} SRestoreCheckpointInfo; + +int32_t tEncodeRestoreCheckpointInfo(SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq); +int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq); + +typedef struct { + SMsgHead head; + int64_t streamId; + int32_t taskId; + int32_t reqType; +} SStreamTaskRunReq; + +int32_t tEncodeStreamTaskRunReq(SEncoder* pEncoder, const SStreamTaskRunReq* pReq); +int32_t tDecodeStreamTaskRunReq(SDecoder* pDecoder, SStreamTaskRunReq* pReq); + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_STREAMMSG_H