From cc8b2263cb9e1a501719dc534da57bf8c0e6c963 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 22 Aug 2023 13:37:38 +0800 Subject: [PATCH] fix(stream): fix error in heartbeat from vnode. --- include/libs/stream/tstream.h | 3 +- source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 1 + source/dnode/mnode/impl/src/mndScheduler.c | 74 +++++++++++++-------- source/dnode/mnode/impl/src/mndStream.c | 39 +++++++++-- source/dnode/vnode/src/tq/tq.c | 4 +- source/libs/stream/src/streamMeta.c | 38 +++++++---- source/libs/stream/src/streamTask.c | 2 + 7 files changed, 110 insertions(+), 51 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 8d5d5d224c..4f485eb2ee 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -284,6 +284,7 @@ typedef struct SHistDataRange { typedef struct SSTaskBasicInfo { int32_t nodeId; // vgroup id or snode id SEpSet epSet; + SEpSet mnodeEpset; // mnode epset for send heartbeat int32_t selfChildId; int32_t totalLevel; int8_t taskLevel; @@ -388,7 +389,7 @@ typedef struct SStreamMeta { SHashObj* pTaskBackendUnique; TdThreadMutex backendMutex; tmr_h hbTmr; - SMgmtInfo mgmtInfo; +// SMgmtInfo mgmtInfo; int32_t closedTask; int32_t chkptNotReadyTasks; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 5029f0aec4..128a0bc89b 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -205,6 +205,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, 
TDMT_VND_ALTER_CONFIG_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 36f1eabdba..1d7d391acf 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -14,6 +14,8 @@ */ #include "mndScheduler.h" +#include "tmisce.h" +#include "mndMnode.h" #include "mndDb.h" #include "mndSnode.h" #include "mndVgroup.h" @@ -26,7 +28,7 @@ extern bool tsDeployOnSnode; static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, - SVgObj* pVgroup, int32_t fillHistory); + SVgObj* pVgroup, SEpSet* pEpset, int32_t fillHistory); int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType, int64_t watermark, int64_t deleteMark) { @@ -205,7 +207,8 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) { } // create sink node for each vgroup. 
-int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SArray* pTaskList, SStreamObj* pStream, int32_t fillHistory) { +int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SArray* pTaskList, SStreamObj* pStream, SEpSet* pEpset, + int32_t fillHistory) { SSdb* pSdb = pMnode->pSdb; void* pIter = NULL; @@ -221,7 +224,7 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SArray* pTaskList, SStrea continue; } - mndAddSinkTaskToStream(pStream, pTaskList, pMnode, pVgroup->vgId, pVgroup, fillHistory); + mndAddSinkTaskToStream(pStream, pTaskList, pMnode, pVgroup->vgId, pVgroup, pEpset, fillHistory); sdbRelease(pSdb, pVgroup); } @@ -229,7 +232,7 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SArray* pTaskList, SStrea } int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup, - int32_t fillHistory) { + SEpSet* pEpset, int32_t fillHistory) { int64_t uid = (fillHistory == 0)? pStream->uid:pStream->hTaskUid; SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SINK, fillHistory, 0, pTaskList); if (pTask == NULL) { @@ -237,6 +240,8 @@ int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* p return -1; } + epsetAssign(&(pTask)->info.mnodeEpset, pEpset); + pTask->info.nodeId = vgId; pTask->info.epSet = mndGetVgroupEpset(pMnode, pVgroup); mndSetSinkTaskInfo(pStream, pTask); @@ -244,13 +249,15 @@ int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* p } static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTaskList, SArray* pSinkTaskList, - SStreamObj* pStream, SSubplan* plan, uint64_t uid, int8_t fillHistory, - bool hasExtraSink, int64_t firstWindowSkey) { + SStreamObj* pStream, SSubplan* plan, uint64_t uid, SEpSet* pEpset, + int8_t fillHistory, bool hasExtraSink, int64_t firstWindowSkey) { SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, fillHistory, pStream->conf.triggerParam, pTaskList); if (pTask == NULL) { return 
terrno; } + epsetAssign(&pTask->info.mnodeEpset, pEpset); + // todo set the correct ts, which should be last key of queried table. STimeWindow* pWindow = &pTask->dataRange.window; @@ -301,7 +308,7 @@ static void setHTasksId(SArray* pTaskList, const SArray* pHTaskList) { } static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan* pPlan, SStreamObj* pStream, - bool hasExtraSink, int64_t nextWindowSkey) { + SEpSet* pEpset, bool hasExtraSink, int64_t nextWindowSkey) { // create exec stream task, since only one level, the exec task is also the source task SArray* pTaskList = addNewTaskList(pStream->tasks); SSdb* pSdb = pMnode->pSdb; @@ -338,8 +345,8 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan* // new stream task SArray** pSinkTaskList = taosArrayGet(pStream->tasks, SINK_NODE_LEVEL); - int32_t code = addSourceStreamTask(pMnode, pVgroup, pTaskList, *pSinkTaskList, pStream, plan, pStream->uid, 0, - hasExtraSink, nextWindowSkey); + int32_t code = addSourceStreamTask(pMnode, pVgroup, pTaskList, *pSinkTaskList, pStream, plan, pStream->uid, pEpset, + 0, hasExtraSink, nextWindowSkey); if (code != TSDB_CODE_SUCCESS) { sdbRelease(pSdb, pVgroup); return -1; @@ -348,7 +355,7 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan* if (pStream->conf.fillHistory) { SArray** pHSinkTaskList = taosArrayGet(pStream->pHTasksList, SINK_NODE_LEVEL); code = addSourceStreamTask(pMnode, pVgroup, pHTaskList, *pHSinkTaskList, pStream, plan, pStream->hTaskUid, - 1, hasExtraSink, nextWindowSkey); + pEpset, 1, hasExtraSink, nextWindowSkey); } sdbRelease(pSdb, pVgroup); @@ -365,13 +372,16 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan* } static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t uid, SStreamTask* pDownstreamTask, - SMnode* pMnode, SSubplan* pPlan, SVgObj* pVgroup, int64_t nextWindowSkey) { + SMnode* pMnode, SSubplan* pPlan, SVgObj* pVgroup, 
SEpSet* pEpset, + int64_t nextWindowSkey) { SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, fillHistory, 0, pTaskList); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } + epsetAssign(&(pTask)->info.mnodeEpset, pEpset); + // todo set the correct ts, which should be last key of queried table. STimeWindow* pWindow = &pTask->dataRange.window; pWindow->skey = INT64_MIN; @@ -390,13 +400,15 @@ static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t ui } static int32_t doAddAggTask(uint64_t uid, SArray* pTaskList, SArray* pSinkNodeList, SMnode* pMnode, SStreamObj* pStream, - int32_t fillHistory, SStreamTask** pAggTask) { + SEpSet* pEpset, int32_t fillHistory, SStreamTask** pAggTask) { *pAggTask = tNewStreamTask(uid, TASK_LEVEL__AGG, fillHistory, pStream->conf.triggerParam, pTaskList); if (*pAggTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } + epsetAssign(&(*pAggTask)->info.mnodeEpset, pEpset); + // dispatch if (mndAddDispatcherForInternalTask(pMnode, pStream, pSinkNodeList, *pAggTask) < 0) { return -1; @@ -405,8 +417,8 @@ static int32_t doAddAggTask(uint64_t uid, SArray* pTaskList, SArray* pSinkNodeLi return 0; } -static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, SStreamTask** pAggTask, - SStreamTask** pHAggTask) { +static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, SEpSet* pEpset, + SStreamTask** pAggTask, SStreamTask** pHAggTask) { SArray* pAggTaskList = addNewTaskList(pStream->tasks); SSdb* pSdb = pMnode->pSdb; @@ -420,7 +432,7 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan *pAggTask = NULL; SArray* pSinkNodeList = taosArrayGetP(pStream->tasks, SINK_NODE_LEVEL); - int32_t code = doAddAggTask(pStream->uid, pAggTaskList, pSinkNodeList, pMnode, pStream, 0, pAggTask); + int32_t code = doAddAggTask(pStream->uid, pAggTaskList, pSinkNodeList, pMnode, pStream, pEpset, 0, pAggTask); if (code != TSDB_CODE_SUCCESS) 
{ return -1; } @@ -448,7 +460,7 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan SArray* pHSinkNodeList = taosArrayGetP(pStream->pHTasksList, SINK_NODE_LEVEL); *pHAggTask = NULL; - code = doAddAggTask(pStream->hTaskUid, pHAggTaskList, pHSinkNodeList, pMnode, pStream, pStream->conf.fillHistory, + code = doAddAggTask(pStream->hTaskUid, pHAggTaskList, pHSinkNodeList, pMnode, pStream, pEpset, pStream->conf.fillHistory, pHAggTask); if (code != TSDB_CODE_SUCCESS) { if (pSnode != NULL) { @@ -478,7 +490,8 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan } static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPlan, SStreamObj* pStream, - SStreamTask* pDownstreamTask, SStreamTask* pHDownstreamTask, int64_t nextWindowSkey) { + SStreamTask* pDownstreamTask, SStreamTask* pHDownstreamTask, + SEpSet* pEpset, int64_t nextWindowSkey) { SArray* pSourceTaskList = addNewTaskList(pStream->tasks); SArray* pHSourceTaskList = NULL; @@ -508,7 +521,7 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl } int32_t code = - doAddSourceTask(pSourceTaskList, 0, pStream->uid, pDownstreamTask, pMnode, plan, pVgroup, nextWindowSkey); + doAddSourceTask(pSourceTaskList, 0, pStream->uid, pDownstreamTask, pMnode, plan, pVgroup, pEpset, nextWindowSkey); if (code != TSDB_CODE_SUCCESS) { sdbRelease(pSdb, pVgroup); terrno = code; @@ -517,7 +530,7 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl if (pStream->conf.fillHistory) { code = doAddSourceTask(pHSourceTaskList, 1, pStream->hTaskUid, pHDownstreamTask, pMnode, plan, pVgroup, - nextWindowSkey); + pEpset, nextWindowSkey); if (code != TSDB_CODE_SUCCESS) { sdbRelease(pSdb, pVgroup); return code; @@ -535,16 +548,16 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl } static int32_t addSinkTasks(SArray* pTasksList, SMnode* pMnode, SStreamObj* pStream, SArray** 
pCreatedTaskList, - int32_t fillHistory) { + SEpSet* pEpset, int32_t fillHistory) { SArray* pSinkTaskList = addNewTaskList(pTasksList); if (pStream->fixedSinkVgId == 0) { - if (mndAddShuffleSinkTasksToStream(pMnode, pSinkTaskList, pStream, fillHistory) < 0) { + if (mndAddShuffleSinkTasksToStream(pMnode, pSinkTaskList, pStream, pEpset, fillHistory) < 0) { // TODO free return -1; } } else { if (mndAddSinkTaskToStream(pStream, pSinkTaskList, pMnode, pStream->fixedSinkVgId, &pStream->fixedSinkVg, - fillHistory) < 0) { + pEpset, fillHistory) < 0) { // TODO free return -1; } @@ -562,7 +575,7 @@ static void setSinkTaskUpstreamInfo(SArray* pTasksList, const SStreamTask* pUpst } } -static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, int64_t nextWindowSkey) { +static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, int64_t nextWindowSkey, SEpSet* pEpset) { SSdb* pSdb = pMnode->pSdb; int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans); @@ -585,7 +598,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* hasExtraSink = true; SArray* pSinkTaskList = NULL; - int32_t code = addSinkTasks(pStream->tasks, pMnode, pStream, &pSinkTaskList, 0); + int32_t code = addSinkTasks(pStream->tasks, pMnode, pStream, &pSinkTaskList, pEpset, 0); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -593,7 +606,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* // check for fill history if (pStream->conf.fillHistory) { SArray* pHSinkTaskList = NULL; - code = addSinkTasks(pStream->pHTasksList, pMnode, pStream, &pHSinkTaskList, 1); + code = addSinkTasks(pStream->pHTasksList, pMnode, pStream, &pHSinkTaskList, pEpset, 1); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -608,7 +621,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* SStreamTask* pAggTask = NULL; SStreamTask* pHAggTask = NULL; - int32_t code = addAggTask(pStream, 
pMnode, pPlan, &pAggTask, &pHAggTask); + int32_t code = addAggTask(pStream, pMnode, pPlan, pEpset, &pAggTask, &pHAggTask); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -617,9 +630,9 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* setSinkTaskUpstreamInfo(pStream->pHTasksList, pHAggTask); // source level - return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pAggTask, pHAggTask, nextWindowSkey); + return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pAggTask, pHAggTask, pEpset, nextWindowSkey); } else if (numOfPlanLevel == 1) { - return addSourceTasksForOneLevelStream(pMnode, pPlan, pStream, hasExtraSink, nextWindowSkey); + return addSourceTasksForOneLevelStream(pMnode, pPlan, pStream, pEpset, hasExtraSink, nextWindowSkey); } return 0; @@ -632,7 +645,10 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream, int64_t nextWindo return -1; } - int32_t code = doScheduleStream(pStream, pMnode, pPlan, nextWindowSkey); + SEpSet mnodeEpset = {0}; + mndGetMnodeEpSet(pMnode, &mnodeEpset); + + int32_t code = doScheduleStream(pStream, pMnode, pPlan, nextWindowSkey, &mnodeEpset); qDestroyQueryPlan(pPlan); return code; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 1ec89bdd2b..251e8bc37d 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -110,7 +110,7 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint); - mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb); + mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp); mndSetMsgHandle(pMnode, 
TDMT_MND_STREAM_NODECHANGE_CHECK, mndProcessNodeCheckReq); @@ -861,7 +861,10 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { mndTransDrop(pTrans); - keepStreamTasksInBuf(pStream, &execNodeList); + taosThreadMutexLock(&execNodeList.lock); + keepStreamTasksInBuf(&streamObj, &execNodeList); + taosThreadMutexUnlock(&execNodeList.lock); + code = TSDB_CODE_ACTION_IN_PROGRESS; _OVER: @@ -2139,6 +2142,22 @@ static SArray *doExtractNodeListFromStream(SMnode *pMnode) { return plist; } +static void doExtractTasksFromStream(SMnode *pMnode) { + SSdb *pSdb = pMnode->pSdb; + SStreamObj *pStream = NULL; + void *pIter = NULL; + + while (1) { + pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); + if (pIter == NULL) { + break; + } + + keepStreamTasksInBuf(pStream, &execNodeList); + sdbRelease(pSdb, pStream); + } +} + // this function runs by only one thread, so it is not multi-thread safe static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1); @@ -2229,15 +2248,13 @@ static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* p // todo: this process should be executed by the write queue worker of the mnode int32_t mndProcessStreamHb(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; - SSdb *pSdb = pMnode->pSdb; + SMnode *pMnode = pReq->info.node; + SStreamHbMsg req = {0}; int32_t code = TSDB_CODE_SUCCESS; SDecoder decoder = {0}; - tDecoderInit(&decoder, (uint8_t *)pReq->pCont, pReq->contLen); - - if (tStartDecode(&decoder) < 0) return -1; + tDecoderInit(&decoder, pReq->pCont, pReq->contLen); if (tDecodeStreamHbMsg(&decoder, &req) < 0) { terrno = TSDB_CODE_INVALID_MSG; @@ -2248,6 +2265,11 @@ static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* p mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks); taosThreadMutexLock(&execNodeList.lock); + int32_t numOfExisted = taosHashGetSize(execNodeList.pTaskMap); 
+ if (numOfExisted == 0) { + doExtractTasksFromStream(pMnode); + } + for(int32_t i = 0; i < req.numOfTasks; ++i) { STaskStatusEntry* p = taosArrayGet(req.pTaskStatus, i); int64_t k[2] = {p->streamId, p->taskId}; @@ -2255,6 +2277,9 @@ static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* p STaskStatusEntry* pStatusEntry = taosArrayGet(execNodeList.pTaskList, index); pStatusEntry->status = p->status; + if (p->status != TASK_STATUS__NORMAL) { + mDebug("received s-task:0x%x no in ready stat:%s", p->taskId, streamGetTaskStatusStr(p->status)); + } } taosThreadMutexUnlock(&execNodeList.lock); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4e26dcd0d0..5ef750b2b0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1809,8 +1809,8 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) { // set the initial value for generating check point // set the mgmt epset info according to the checkout source msg from mnode, todo opt perf - pMeta->mgmtInfo.epset = req.mgmtEps; - pMeta->mgmtInfo.mnodeId = req.mnodeId; +// pMeta->mgmtInfo.epset = req.mgmtEps; +// pMeta->mgmtInfo.mnodeId = req.mnodeId; if (pMeta->chkptNotReadyTasks == 0) { pMeta->chkptNotReadyTasks = taosArrayGetSize(pMeta->pTaskList); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index ba1ec7e5ab..aaddcbfcd4 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -13,12 +13,13 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. 
*/ +#include #include "executor.h" #include "streamBackendRocksdb.h" #include "streamInt.h" #include "tref.h" -#include "ttimer.h" #include "tstream.h" +#include "ttimer.h" static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT; int32_t streamBackendId = 0; @@ -608,10 +609,26 @@ void metaHbToMnode(void* param, void* tmrId) { taosRLockLatch(&pMeta->lock); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); - taosRUnLockLatch(&pMeta->lock); + + SEpSet epset = {0}; hbMsg.numOfTasks = numOfTasks; hbMsg.vgId = pMeta->vgId; + hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry)); + for (int32_t i = 0; i < numOfTasks; ++i) { + SStreamId* pId = taosArrayGet(pMeta->pTaskList, i); + + int64_t keys[2] = {pId->streamId, pId->taskId}; + SStreamTask** pTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); + STaskStatusEntry entry = {.streamId = pId->streamId, .taskId = pId->taskId, .status = (*pTask)->status.taskStatus}; + + taosArrayPush(hbMsg.pTaskStatus, &entry); + if (i == 0) { + epsetAssign(&epset, &(*pTask)->info.mnodeEpset); + } + } + + taosRUnLockLatch(&pMeta->lock); int32_t code = 0; int32_t tlen = 0; @@ -622,17 +639,14 @@ void metaHbToMnode(void* param, void* tmrId) { return; } - void* buf = rpcMallocCont(sizeof(SMsgHead) + tlen); + void* buf = rpcMallocCont(tlen); if (buf == NULL) { qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); return; } - ((SMsgHead*)buf)->vgId = htonl(pMeta->mgmtInfo.mnodeId); - void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - SEncoder encoder; - tEncoderInit(&encoder, pBuf, tlen); + tEncoderInit(&encoder, buf, tlen); if ((code = tEncodeStreamHbMsg(&encoder, &hbMsg)) < 0) { rpcFreeCont(buf); qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code)); @@ -641,11 +655,11 @@ void metaHbToMnode(void* param, void* tmrId) { tEncoderClear(&encoder); SRpcMsg msg = {0}; - initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen + sizeof(SMsgHead)); 
- qDebug("vgId:%d, build and send hb to mnode", pMeta->mgmtInfo.mnodeId); + initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen); + msg.info.noResp = 1; - tmsgSendReq(&pMeta->mgmtInfo.epset, &msg); + qDebug("vgId:%d, build and send hb to mnode", pMeta->vgId); - // next hb will be issued in 20sec. - taosTmrReset(metaHbToMnode, 5000, pMeta, streamEnv.timer, pMeta->hbTmr); + tmsgSendReq(&epset, &msg); + taosTmrReset(metaHbToMnode, 5000, pMeta, streamEnv.timer, &pMeta->hbTmr); } \ No newline at end of file diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 10fb844117..c33c5f40cd 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -90,6 +90,7 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tEncodeI32(pEncoder, pTask->info.selfChildId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->info.nodeId) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pTask->info.epSet) < 0) return -1; + if (tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset) < 0) return -1; if (tEncodeI64(pEncoder, pTask->chkInfo.checkpointId) < 0) return -1; if (tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer) < 0) return -1; @@ -190,6 +191,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tDecodeI32(pDecoder, &pTask->info.selfChildId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->info.nodeId) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pTask->info.epSet) < 0) return -1; + if (tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer) < 0) return -1;