diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h
index 8d5d5d224c..4f485eb2ee 100644
--- a/include/libs/stream/tstream.h
+++ b/include/libs/stream/tstream.h
@@ -284,6 +284,7 @@ typedef struct SHistDataRange {
typedef struct SSTaskBasicInfo {
int32_t nodeId; // vgroup id or snode id
SEpSet epSet;
+ SEpSet mnodeEpset; // mnode epset for send heartbeat
int32_t selfChildId;
int32_t totalLevel;
int8_t taskLevel;
@@ -388,7 +389,7 @@ typedef struct SStreamMeta {
SHashObj* pTaskBackendUnique;
TdThreadMutex backendMutex;
tmr_h hbTmr;
- SMgmtInfo mgmtInfo;
+// SMgmtInfo mgmtInfo;
int32_t closedTask;
int32_t chkptNotReadyTasks;
diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
index 5029f0aec4..128a0bc89b 100644
--- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
+++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
@@ -205,6 +205,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
+ if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c
index 36f1eabdba..1d7d391acf 100644
--- a/source/dnode/mnode/impl/src/mndScheduler.c
+++ b/source/dnode/mnode/impl/src/mndScheduler.c
@@ -14,6 +14,8 @@
*/
#include "mndScheduler.h"
+#include "tmisce.h"
+#include "mndMnode.h"
#include "mndDb.h"
#include "mndSnode.h"
#include "mndVgroup.h"
@@ -26,7 +28,7 @@
extern bool tsDeployOnSnode;
static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId,
- SVgObj* pVgroup, int32_t fillHistory);
+ SVgObj* pVgroup, SEpSet* pEpset, int32_t fillHistory);
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
int64_t watermark, int64_t deleteMark) {
@@ -205,7 +207,8 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) {
}
// create sink node for each vgroup.
-int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SArray* pTaskList, SStreamObj* pStream, int32_t fillHistory) {
+int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SArray* pTaskList, SStreamObj* pStream, SEpSet* pEpset,
+ int32_t fillHistory) {
SSdb* pSdb = pMnode->pSdb;
void* pIter = NULL;
@@ -221,7 +224,7 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SArray* pTaskList, SStrea
continue;
}
- mndAddSinkTaskToStream(pStream, pTaskList, pMnode, pVgroup->vgId, pVgroup, fillHistory);
+ mndAddSinkTaskToStream(pStream, pTaskList, pMnode, pVgroup->vgId, pVgroup, pEpset, fillHistory);
sdbRelease(pSdb, pVgroup);
}
@@ -229,7 +232,7 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SArray* pTaskList, SStrea
}
int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup,
- int32_t fillHistory) {
+ SEpSet* pEpset, int32_t fillHistory) {
int64_t uid = (fillHistory == 0)? pStream->uid:pStream->hTaskUid;
SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SINK, fillHistory, 0, pTaskList);
if (pTask == NULL) {
@@ -237,6 +240,8 @@ int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* p
return -1;
}
+ epsetAssign(&(pTask)->info.mnodeEpset, pEpset);
+
pTask->info.nodeId = vgId;
pTask->info.epSet = mndGetVgroupEpset(pMnode, pVgroup);
mndSetSinkTaskInfo(pStream, pTask);
@@ -244,13 +249,15 @@ int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* p
}
static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTaskList, SArray* pSinkTaskList,
- SStreamObj* pStream, SSubplan* plan, uint64_t uid, int8_t fillHistory,
- bool hasExtraSink, int64_t firstWindowSkey) {
+ SStreamObj* pStream, SSubplan* plan, uint64_t uid, SEpSet* pEpset,
+ int8_t fillHistory, bool hasExtraSink, int64_t firstWindowSkey) {
SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, fillHistory, pStream->conf.triggerParam, pTaskList);
if (pTask == NULL) {
return terrno;
}
+ epsetAssign(&pTask->info.mnodeEpset, pEpset);
+
// todo set the correct ts, which should be last key of queried table.
STimeWindow* pWindow = &pTask->dataRange.window;
@@ -301,7 +308,7 @@ static void setHTasksId(SArray* pTaskList, const SArray* pHTaskList) {
}
static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan* pPlan, SStreamObj* pStream,
- bool hasExtraSink, int64_t nextWindowSkey) {
+ SEpSet* pEpset, bool hasExtraSink, int64_t nextWindowSkey) {
// create exec stream task, since only one level, the exec task is also the source task
SArray* pTaskList = addNewTaskList(pStream->tasks);
SSdb* pSdb = pMnode->pSdb;
@@ -338,8 +345,8 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan*
// new stream task
SArray** pSinkTaskList = taosArrayGet(pStream->tasks, SINK_NODE_LEVEL);
- int32_t code = addSourceStreamTask(pMnode, pVgroup, pTaskList, *pSinkTaskList, pStream, plan, pStream->uid, 0,
- hasExtraSink, nextWindowSkey);
+ int32_t code = addSourceStreamTask(pMnode, pVgroup, pTaskList, *pSinkTaskList, pStream, plan, pStream->uid, pEpset,
+ 0, hasExtraSink, nextWindowSkey);
if (code != TSDB_CODE_SUCCESS) {
sdbRelease(pSdb, pVgroup);
return -1;
@@ -348,7 +355,7 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan*
if (pStream->conf.fillHistory) {
SArray** pHSinkTaskList = taosArrayGet(pStream->pHTasksList, SINK_NODE_LEVEL);
code = addSourceStreamTask(pMnode, pVgroup, pHTaskList, *pHSinkTaskList, pStream, plan, pStream->hTaskUid,
- 1, hasExtraSink, nextWindowSkey);
+ pEpset, 1, hasExtraSink, nextWindowSkey);
}
sdbRelease(pSdb, pVgroup);
@@ -365,13 +372,16 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan*
}
static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t uid, SStreamTask* pDownstreamTask,
- SMnode* pMnode, SSubplan* pPlan, SVgObj* pVgroup, int64_t nextWindowSkey) {
+ SMnode* pMnode, SSubplan* pPlan, SVgObj* pVgroup, SEpSet* pEpset,
+ int64_t nextWindowSkey) {
SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, fillHistory, 0, pTaskList);
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
+ epsetAssign(&(pTask)->info.mnodeEpset, pEpset);
+
// todo set the correct ts, which should be last key of queried table.
STimeWindow* pWindow = &pTask->dataRange.window;
pWindow->skey = INT64_MIN;
@@ -390,13 +400,15 @@ static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t ui
}
static int32_t doAddAggTask(uint64_t uid, SArray* pTaskList, SArray* pSinkNodeList, SMnode* pMnode, SStreamObj* pStream,
- int32_t fillHistory, SStreamTask** pAggTask) {
+ SEpSet* pEpset, int32_t fillHistory, SStreamTask** pAggTask) {
*pAggTask = tNewStreamTask(uid, TASK_LEVEL__AGG, fillHistory, pStream->conf.triggerParam, pTaskList);
if (*pAggTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
+ epsetAssign(&(*pAggTask)->info.mnodeEpset, pEpset);
+
// dispatch
if (mndAddDispatcherForInternalTask(pMnode, pStream, pSinkNodeList, *pAggTask) < 0) {
return -1;
@@ -405,8 +417,8 @@ static int32_t doAddAggTask(uint64_t uid, SArray* pTaskList, SArray* pSinkNodeLi
return 0;
}
-static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, SStreamTask** pAggTask,
- SStreamTask** pHAggTask) {
+static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, SEpSet* pEpset,
+ SStreamTask** pAggTask, SStreamTask** pHAggTask) {
SArray* pAggTaskList = addNewTaskList(pStream->tasks);
SSdb* pSdb = pMnode->pSdb;
@@ -420,7 +432,7 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan
*pAggTask = NULL;
SArray* pSinkNodeList = taosArrayGetP(pStream->tasks, SINK_NODE_LEVEL);
- int32_t code = doAddAggTask(pStream->uid, pAggTaskList, pSinkNodeList, pMnode, pStream, 0, pAggTask);
+ int32_t code = doAddAggTask(pStream->uid, pAggTaskList, pSinkNodeList, pMnode, pStream, pEpset, 0, pAggTask);
if (code != TSDB_CODE_SUCCESS) {
return -1;
}
@@ -448,7 +460,7 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan
SArray* pHSinkNodeList = taosArrayGetP(pStream->pHTasksList, SINK_NODE_LEVEL);
*pHAggTask = NULL;
- code = doAddAggTask(pStream->hTaskUid, pHAggTaskList, pHSinkNodeList, pMnode, pStream, pStream->conf.fillHistory,
+ code = doAddAggTask(pStream->hTaskUid, pHAggTaskList, pHSinkNodeList, pMnode, pStream, pEpset, pStream->conf.fillHistory,
pHAggTask);
if (code != TSDB_CODE_SUCCESS) {
if (pSnode != NULL) {
@@ -478,7 +490,8 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan
}
static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPlan, SStreamObj* pStream,
- SStreamTask* pDownstreamTask, SStreamTask* pHDownstreamTask, int64_t nextWindowSkey) {
+ SStreamTask* pDownstreamTask, SStreamTask* pHDownstreamTask,
+ SEpSet* pEpset, int64_t nextWindowSkey) {
SArray* pSourceTaskList = addNewTaskList(pStream->tasks);
SArray* pHSourceTaskList = NULL;
@@ -508,7 +521,7 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl
}
int32_t code =
- doAddSourceTask(pSourceTaskList, 0, pStream->uid, pDownstreamTask, pMnode, plan, pVgroup, nextWindowSkey);
+ doAddSourceTask(pSourceTaskList, 0, pStream->uid, pDownstreamTask, pMnode, plan, pVgroup, pEpset, nextWindowSkey);
if (code != TSDB_CODE_SUCCESS) {
sdbRelease(pSdb, pVgroup);
terrno = code;
@@ -517,7 +530,7 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl
if (pStream->conf.fillHistory) {
code = doAddSourceTask(pHSourceTaskList, 1, pStream->hTaskUid, pHDownstreamTask, pMnode, plan, pVgroup,
- nextWindowSkey);
+ pEpset, nextWindowSkey);
if (code != TSDB_CODE_SUCCESS) {
sdbRelease(pSdb, pVgroup);
return code;
@@ -535,16 +548,16 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl
}
static int32_t addSinkTasks(SArray* pTasksList, SMnode* pMnode, SStreamObj* pStream, SArray** pCreatedTaskList,
- int32_t fillHistory) {
+ SEpSet* pEpset, int32_t fillHistory) {
SArray* pSinkTaskList = addNewTaskList(pTasksList);
if (pStream->fixedSinkVgId == 0) {
- if (mndAddShuffleSinkTasksToStream(pMnode, pSinkTaskList, pStream, fillHistory) < 0) {
+ if (mndAddShuffleSinkTasksToStream(pMnode, pSinkTaskList, pStream, pEpset, fillHistory) < 0) {
// TODO free
return -1;
}
} else {
if (mndAddSinkTaskToStream(pStream, pSinkTaskList, pMnode, pStream->fixedSinkVgId, &pStream->fixedSinkVg,
- fillHistory) < 0) {
+ pEpset, fillHistory) < 0) {
// TODO free
return -1;
}
@@ -562,7 +575,7 @@ static void setSinkTaskUpstreamInfo(SArray* pTasksList, const SStreamTask* pUpst
}
}
-static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, int64_t nextWindowSkey) {
+static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, int64_t nextWindowSkey, SEpSet* pEpset) {
SSdb* pSdb = pMnode->pSdb;
int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
@@ -585,7 +598,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
hasExtraSink = true;
SArray* pSinkTaskList = NULL;
- int32_t code = addSinkTasks(pStream->tasks, pMnode, pStream, &pSinkTaskList, 0);
+ int32_t code = addSinkTasks(pStream->tasks, pMnode, pStream, &pSinkTaskList, pEpset, 0);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@@ -593,7 +606,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
// check for fill history
if (pStream->conf.fillHistory) {
SArray* pHSinkTaskList = NULL;
- code = addSinkTasks(pStream->pHTasksList, pMnode, pStream, &pHSinkTaskList, 1);
+ code = addSinkTasks(pStream->pHTasksList, pMnode, pStream, &pHSinkTaskList, pEpset, 1);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@@ -608,7 +621,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
SStreamTask* pAggTask = NULL;
SStreamTask* pHAggTask = NULL;
- int32_t code = addAggTask(pStream, pMnode, pPlan, &pAggTask, &pHAggTask);
+ int32_t code = addAggTask(pStream, pMnode, pPlan, pEpset, &pAggTask, &pHAggTask);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@@ -617,9 +630,9 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
setSinkTaskUpstreamInfo(pStream->pHTasksList, pHAggTask);
// source level
- return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pAggTask, pHAggTask, nextWindowSkey);
+ return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pAggTask, pHAggTask, pEpset, nextWindowSkey);
} else if (numOfPlanLevel == 1) {
- return addSourceTasksForOneLevelStream(pMnode, pPlan, pStream, hasExtraSink, nextWindowSkey);
+ return addSourceTasksForOneLevelStream(pMnode, pPlan, pStream, pEpset, hasExtraSink, nextWindowSkey);
}
return 0;
@@ -632,7 +645,10 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream, int64_t nextWindo
return -1;
}
- int32_t code = doScheduleStream(pStream, pMnode, pPlan, nextWindowSkey);
+ SEpSet mnodeEpset = {0};
+ mndGetMnodeEpSet(pMnode, &mnodeEpset);
+
+ int32_t code = doScheduleStream(pStream, pMnode, pPlan, nextWindowSkey, &mnodeEpset);
qDestroyQueryPlan(pPlan);
return code;
diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c
index 1ec89bdd2b..251e8bc37d 100644
--- a/source/dnode/mnode/impl/src/mndStream.c
+++ b/source/dnode/mnode/impl/src/mndStream.c
@@ -110,7 +110,7 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint);
- mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
+ mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_NODECHANGE_CHECK, mndProcessNodeCheckReq);
@@ -861,7 +861,10 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
mndTransDrop(pTrans);
- keepStreamTasksInBuf(pStream, &execNodeList);
+ taosThreadMutexLock(&execNodeList.lock);
+ keepStreamTasksInBuf(&streamObj, &execNodeList);
+ taosThreadMutexUnlock(&execNodeList.lock);
+
code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER:
@@ -2139,6 +2142,22 @@ static SArray *doExtractNodeListFromStream(SMnode *pMnode) {
return plist;
}
+static void doExtractTasksFromStream(SMnode *pMnode) {
+ SSdb *pSdb = pMnode->pSdb;
+ SStreamObj *pStream = NULL;
+ void *pIter = NULL;
+
+ while (1) {
+ pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
+ if (pIter == NULL) {
+ break;
+ }
+
+ keepStreamTasksInBuf(pStream, &execNodeList);
+ sdbRelease(pSdb, pStream);
+ }
+}
+
// this function runs by only one thread, so it is not multi-thread safe
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1);
@@ -2229,15 +2248,13 @@ static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* p
// todo: this process should be executed by the write queue worker of the mnode
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
- SMnode *pMnode = pReq->info.node;
- SSdb *pSdb = pMnode->pSdb;
+ SMnode *pMnode = pReq->info.node;
+
SStreamHbMsg req = {0};
int32_t code = TSDB_CODE_SUCCESS;
SDecoder decoder = {0};
- tDecoderInit(&decoder, (uint8_t *)pReq->pCont, pReq->contLen);
-
- if (tStartDecode(&decoder) < 0) return -1;
+ tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
if (tDecodeStreamHbMsg(&decoder, &req) < 0) {
terrno = TSDB_CODE_INVALID_MSG;
@@ -2248,6 +2265,11 @@ static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* p
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);
taosThreadMutexLock(&execNodeList.lock);
+ int32_t numOfExisted = taosHashGetSize(execNodeList.pTaskMap);
+ if (numOfExisted == 0) {
+ doExtractTasksFromStream(pMnode);
+ }
+
for(int32_t i = 0; i < req.numOfTasks; ++i) {
STaskStatusEntry* p = taosArrayGet(req.pTaskStatus, i);
int64_t k[2] = {p->streamId, p->taskId};
@@ -2255,6 +2277,9 @@ static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* p
STaskStatusEntry* pStatusEntry = taosArrayGet(execNodeList.pTaskList, index);
pStatusEntry->status = p->status;
+ if (p->status != TASK_STATUS__NORMAL) {
+ mDebug("received s-task:0x%x no in ready stat:%s", p->taskId, streamGetTaskStatusStr(p->status));
+ }
}
taosThreadMutexUnlock(&execNodeList.lock);
diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c
index 4e26dcd0d0..5ef750b2b0 100644
--- a/source/dnode/vnode/src/tq/tq.c
+++ b/source/dnode/vnode/src/tq/tq.c
@@ -1809,8 +1809,8 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) {
// set the initial value for generating check point
// set the mgmt epset info according to the checkout source msg from mnode, todo opt perf
- pMeta->mgmtInfo.epset = req.mgmtEps;
- pMeta->mgmtInfo.mnodeId = req.mnodeId;
+// pMeta->mgmtInfo.epset = req.mgmtEps;
+// pMeta->mgmtInfo.mnodeId = req.mnodeId;
if (pMeta->chkptNotReadyTasks == 0) {
pMeta->chkptNotReadyTasks = taosArrayGetSize(pMeta->pTaskList);
diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c
index ba1ec7e5ab..aaddcbfcd4 100644
--- a/source/libs/stream/src/streamMeta.c
+++ b/source/libs/stream/src/streamMeta.c
@@ -13,12 +13,13 @@
* along with this program. If not, see .
*/
+#include
#include "executor.h"
#include "streamBackendRocksdb.h"
#include "streamInt.h"
#include "tref.h"
-#include "ttimer.h"
#include "tstream.h"
+#include "ttimer.h"
static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
int32_t streamBackendId = 0;
@@ -608,10 +609,26 @@ void metaHbToMnode(void* param, void* tmrId) {
taosRLockLatch(&pMeta->lock);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
- taosRUnLockLatch(&pMeta->lock);
+
+ SEpSet epset = {0};
hbMsg.numOfTasks = numOfTasks;
hbMsg.vgId = pMeta->vgId;
+ hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry));
+ for (int32_t i = 0; i < numOfTasks; ++i) {
+ SStreamId* pId = taosArrayGet(pMeta->pTaskList, i);
+
+ int64_t keys[2] = {pId->streamId, pId->taskId};
+ SStreamTask** pTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
+ STaskStatusEntry entry = {.streamId = pId->streamId, .taskId = pId->taskId, .status = (*pTask)->status.taskStatus};
+
+ taosArrayPush(hbMsg.pTaskStatus, &entry);
+ if (i == 0) {
+ epsetAssign(&epset, &(*pTask)->info.mnodeEpset);
+ }
+ }
+
+ taosRUnLockLatch(&pMeta->lock);
int32_t code = 0;
int32_t tlen = 0;
@@ -622,17 +639,14 @@ void metaHbToMnode(void* param, void* tmrId) {
return;
}
- void* buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
+ void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return;
}
- ((SMsgHead*)buf)->vgId = htonl(pMeta->mgmtInfo.mnodeId);
- void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
-
SEncoder encoder;
- tEncoderInit(&encoder, pBuf, tlen);
+ tEncoderInit(&encoder, buf, tlen);
if ((code = tEncodeStreamHbMsg(&encoder, &hbMsg)) < 0) {
rpcFreeCont(buf);
qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
@@ -641,11 +655,11 @@ void metaHbToMnode(void* param, void* tmrId) {
tEncoderClear(&encoder);
SRpcMsg msg = {0};
- initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen + sizeof(SMsgHead));
- qDebug("vgId:%d, build and send hb to mnode", pMeta->mgmtInfo.mnodeId);
+ initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen);
+ msg.info.noResp = 1;
- tmsgSendReq(&pMeta->mgmtInfo.epset, &msg);
+ qDebug("vgId:%d, build and send hb to mnode", pMeta->vgId);
- // next hb will be issued in 20sec.
- taosTmrReset(metaHbToMnode, 5000, pMeta, streamEnv.timer, pMeta->hbTmr);
+ tmsgSendReq(&epset, &msg);
+ taosTmrReset(metaHbToMnode, 5000, pMeta, streamEnv.timer, &pMeta->hbTmr);
}
\ No newline at end of file
diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c
index 10fb844117..c33c5f40cd 100644
--- a/source/libs/stream/src/streamTask.c
+++ b/source/libs/stream/src/streamTask.c
@@ -90,6 +90,7 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeI32(pEncoder, pTask->info.selfChildId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->info.nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->info.epSet) < 0) return -1;
+ if (tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->chkInfo.checkpointId) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer) < 0) return -1;
@@ -190,6 +191,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tDecodeI32(pDecoder, &pTask->info.selfChildId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->info.nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->info.epSet) < 0) return -1;
+ if (tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer) < 0) return -1;