fix(stream): fix error in heartbeat from vnode.

This commit is contained in:
Haojun Liao 2023-08-22 13:37:38 +08:00
parent 64556c9f1d
commit cc8b2263cb
7 changed files with 110 additions and 51 deletions

View File

@ -284,6 +284,7 @@ typedef struct SHistDataRange {
typedef struct SSTaskBasicInfo {
int32_t nodeId; // vgroup id or snode id
SEpSet epSet;
SEpSet mnodeEpset; // mnode epset for send heartbeat
int32_t selfChildId;
int32_t totalLevel;
int8_t taskLevel;
@ -388,7 +389,7 @@ typedef struct SStreamMeta {
SHashObj* pTaskBackendUnique;
TdThreadMutex backendMutex;
tmr_h hbTmr;
SMgmtInfo mgmtInfo;
// SMgmtInfo mgmtInfo;
int32_t closedTask;
int32_t chkptNotReadyTasks;

View File

@ -205,6 +205,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -14,6 +14,8 @@
*/
#include "mndScheduler.h"
#include "tmisce.h"
#include "mndMnode.h"
#include "mndDb.h"
#include "mndSnode.h"
#include "mndVgroup.h"
@ -26,7 +28,7 @@
extern bool tsDeployOnSnode;
static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId,
SVgObj* pVgroup, int32_t fillHistory);
SVgObj* pVgroup, SEpSet* pEpset, int32_t fillHistory);
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
int64_t watermark, int64_t deleteMark) {
@ -205,7 +207,8 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) {
}
// create sink node for each vgroup.
int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SArray* pTaskList, SStreamObj* pStream, int32_t fillHistory) {
int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SArray* pTaskList, SStreamObj* pStream, SEpSet* pEpset,
int32_t fillHistory) {
SSdb* pSdb = pMnode->pSdb;
void* pIter = NULL;
@ -221,7 +224,7 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SArray* pTaskList, SStrea
continue;
}
mndAddSinkTaskToStream(pStream, pTaskList, pMnode, pVgroup->vgId, pVgroup, fillHistory);
mndAddSinkTaskToStream(pStream, pTaskList, pMnode, pVgroup->vgId, pVgroup, pEpset, fillHistory);
sdbRelease(pSdb, pVgroup);
}
@ -229,7 +232,7 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SArray* pTaskList, SStrea
}
int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup,
int32_t fillHistory) {
SEpSet* pEpset, int32_t fillHistory) {
int64_t uid = (fillHistory == 0)? pStream->uid:pStream->hTaskUid;
SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SINK, fillHistory, 0, pTaskList);
if (pTask == NULL) {
@ -237,6 +240,8 @@ int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* p
return -1;
}
epsetAssign(&(pTask)->info.mnodeEpset, pEpset);
pTask->info.nodeId = vgId;
pTask->info.epSet = mndGetVgroupEpset(pMnode, pVgroup);
mndSetSinkTaskInfo(pStream, pTask);
@ -244,13 +249,15 @@ int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* p
}
static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTaskList, SArray* pSinkTaskList,
SStreamObj* pStream, SSubplan* plan, uint64_t uid, int8_t fillHistory,
bool hasExtraSink, int64_t firstWindowSkey) {
SStreamObj* pStream, SSubplan* plan, uint64_t uid, SEpSet* pEpset,
int8_t fillHistory, bool hasExtraSink, int64_t firstWindowSkey) {
SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, fillHistory, pStream->conf.triggerParam, pTaskList);
if (pTask == NULL) {
return terrno;
}
epsetAssign(&pTask->info.mnodeEpset, pEpset);
// todo set the correct ts, which should be last key of queried table.
STimeWindow* pWindow = &pTask->dataRange.window;
@ -301,7 +308,7 @@ static void setHTasksId(SArray* pTaskList, const SArray* pHTaskList) {
}
static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan* pPlan, SStreamObj* pStream,
bool hasExtraSink, int64_t nextWindowSkey) {
SEpSet* pEpset, bool hasExtraSink, int64_t nextWindowSkey) {
// create exec stream task, since only one level, the exec task is also the source task
SArray* pTaskList = addNewTaskList(pStream->tasks);
SSdb* pSdb = pMnode->pSdb;
@ -338,8 +345,8 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan*
// new stream task
SArray** pSinkTaskList = taosArrayGet(pStream->tasks, SINK_NODE_LEVEL);
int32_t code = addSourceStreamTask(pMnode, pVgroup, pTaskList, *pSinkTaskList, pStream, plan, pStream->uid, 0,
hasExtraSink, nextWindowSkey);
int32_t code = addSourceStreamTask(pMnode, pVgroup, pTaskList, *pSinkTaskList, pStream, plan, pStream->uid, pEpset,
0, hasExtraSink, nextWindowSkey);
if (code != TSDB_CODE_SUCCESS) {
sdbRelease(pSdb, pVgroup);
return -1;
@ -348,7 +355,7 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan*
if (pStream->conf.fillHistory) {
SArray** pHSinkTaskList = taosArrayGet(pStream->pHTasksList, SINK_NODE_LEVEL);
code = addSourceStreamTask(pMnode, pVgroup, pHTaskList, *pHSinkTaskList, pStream, plan, pStream->hTaskUid,
1, hasExtraSink, nextWindowSkey);
pEpset, 1, hasExtraSink, nextWindowSkey);
}
sdbRelease(pSdb, pVgroup);
@ -365,13 +372,16 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan*
}
static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t uid, SStreamTask* pDownstreamTask,
SMnode* pMnode, SSubplan* pPlan, SVgObj* pVgroup, int64_t nextWindowSkey) {
SMnode* pMnode, SSubplan* pPlan, SVgObj* pVgroup, SEpSet* pEpset,
int64_t nextWindowSkey) {
SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, fillHistory, 0, pTaskList);
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
epsetAssign(&(pTask)->info.mnodeEpset, pEpset);
// todo set the correct ts, which should be last key of queried table.
STimeWindow* pWindow = &pTask->dataRange.window;
pWindow->skey = INT64_MIN;
@ -390,13 +400,15 @@ static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t ui
}
static int32_t doAddAggTask(uint64_t uid, SArray* pTaskList, SArray* pSinkNodeList, SMnode* pMnode, SStreamObj* pStream,
int32_t fillHistory, SStreamTask** pAggTask) {
SEpSet* pEpset, int32_t fillHistory, SStreamTask** pAggTask) {
*pAggTask = tNewStreamTask(uid, TASK_LEVEL__AGG, fillHistory, pStream->conf.triggerParam, pTaskList);
if (*pAggTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
epsetAssign(&(*pAggTask)->info.mnodeEpset, pEpset);
// dispatch
if (mndAddDispatcherForInternalTask(pMnode, pStream, pSinkNodeList, *pAggTask) < 0) {
return -1;
@ -405,8 +417,8 @@ static int32_t doAddAggTask(uint64_t uid, SArray* pTaskList, SArray* pSinkNodeLi
return 0;
}
static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, SStreamTask** pAggTask,
SStreamTask** pHAggTask) {
static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, SEpSet* pEpset,
SStreamTask** pAggTask, SStreamTask** pHAggTask) {
SArray* pAggTaskList = addNewTaskList(pStream->tasks);
SSdb* pSdb = pMnode->pSdb;
@ -420,7 +432,7 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan
*pAggTask = NULL;
SArray* pSinkNodeList = taosArrayGetP(pStream->tasks, SINK_NODE_LEVEL);
int32_t code = doAddAggTask(pStream->uid, pAggTaskList, pSinkNodeList, pMnode, pStream, 0, pAggTask);
int32_t code = doAddAggTask(pStream->uid, pAggTaskList, pSinkNodeList, pMnode, pStream, pEpset, 0, pAggTask);
if (code != TSDB_CODE_SUCCESS) {
return -1;
}
@ -448,7 +460,7 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan
SArray* pHSinkNodeList = taosArrayGetP(pStream->pHTasksList, SINK_NODE_LEVEL);
*pHAggTask = NULL;
code = doAddAggTask(pStream->hTaskUid, pHAggTaskList, pHSinkNodeList, pMnode, pStream, pStream->conf.fillHistory,
code = doAddAggTask(pStream->hTaskUid, pHAggTaskList, pHSinkNodeList, pMnode, pStream, pEpset, pStream->conf.fillHistory,
pHAggTask);
if (code != TSDB_CODE_SUCCESS) {
if (pSnode != NULL) {
@ -478,7 +490,8 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan
}
static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPlan, SStreamObj* pStream,
SStreamTask* pDownstreamTask, SStreamTask* pHDownstreamTask, int64_t nextWindowSkey) {
SStreamTask* pDownstreamTask, SStreamTask* pHDownstreamTask,
SEpSet* pEpset, int64_t nextWindowSkey) {
SArray* pSourceTaskList = addNewTaskList(pStream->tasks);
SArray* pHSourceTaskList = NULL;
@ -508,7 +521,7 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl
}
int32_t code =
doAddSourceTask(pSourceTaskList, 0, pStream->uid, pDownstreamTask, pMnode, plan, pVgroup, nextWindowSkey);
doAddSourceTask(pSourceTaskList, 0, pStream->uid, pDownstreamTask, pMnode, plan, pVgroup, pEpset, nextWindowSkey);
if (code != TSDB_CODE_SUCCESS) {
sdbRelease(pSdb, pVgroup);
terrno = code;
@ -517,7 +530,7 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl
if (pStream->conf.fillHistory) {
code = doAddSourceTask(pHSourceTaskList, 1, pStream->hTaskUid, pHDownstreamTask, pMnode, plan, pVgroup,
nextWindowSkey);
pEpset, nextWindowSkey);
if (code != TSDB_CODE_SUCCESS) {
sdbRelease(pSdb, pVgroup);
return code;
@ -535,16 +548,16 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl
}
static int32_t addSinkTasks(SArray* pTasksList, SMnode* pMnode, SStreamObj* pStream, SArray** pCreatedTaskList,
int32_t fillHistory) {
SEpSet* pEpset, int32_t fillHistory) {
SArray* pSinkTaskList = addNewTaskList(pTasksList);
if (pStream->fixedSinkVgId == 0) {
if (mndAddShuffleSinkTasksToStream(pMnode, pSinkTaskList, pStream, fillHistory) < 0) {
if (mndAddShuffleSinkTasksToStream(pMnode, pSinkTaskList, pStream, pEpset, fillHistory) < 0) {
// TODO free
return -1;
}
} else {
if (mndAddSinkTaskToStream(pStream, pSinkTaskList, pMnode, pStream->fixedSinkVgId, &pStream->fixedSinkVg,
fillHistory) < 0) {
pEpset, fillHistory) < 0) {
// TODO free
return -1;
}
@ -562,7 +575,7 @@ static void setSinkTaskUpstreamInfo(SArray* pTasksList, const SStreamTask* pUpst
}
}
static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, int64_t nextWindowSkey) {
static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, int64_t nextWindowSkey, SEpSet* pEpset) {
SSdb* pSdb = pMnode->pSdb;
int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
@ -585,7 +598,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
hasExtraSink = true;
SArray* pSinkTaskList = NULL;
int32_t code = addSinkTasks(pStream->tasks, pMnode, pStream, &pSinkTaskList, 0);
int32_t code = addSinkTasks(pStream->tasks, pMnode, pStream, &pSinkTaskList, pEpset, 0);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -593,7 +606,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
// check for fill history
if (pStream->conf.fillHistory) {
SArray* pHSinkTaskList = NULL;
code = addSinkTasks(pStream->pHTasksList, pMnode, pStream, &pHSinkTaskList, 1);
code = addSinkTasks(pStream->pHTasksList, pMnode, pStream, &pHSinkTaskList, pEpset, 1);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -608,7 +621,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
SStreamTask* pAggTask = NULL;
SStreamTask* pHAggTask = NULL;
int32_t code = addAggTask(pStream, pMnode, pPlan, &pAggTask, &pHAggTask);
int32_t code = addAggTask(pStream, pMnode, pPlan, pEpset, &pAggTask, &pHAggTask);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -617,9 +630,9 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
setSinkTaskUpstreamInfo(pStream->pHTasksList, pHAggTask);
// source level
return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pAggTask, pHAggTask, nextWindowSkey);
return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pAggTask, pHAggTask, pEpset, nextWindowSkey);
} else if (numOfPlanLevel == 1) {
return addSourceTasksForOneLevelStream(pMnode, pPlan, pStream, hasExtraSink, nextWindowSkey);
return addSourceTasksForOneLevelStream(pMnode, pPlan, pStream, pEpset, hasExtraSink, nextWindowSkey);
}
return 0;
@ -632,7 +645,10 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream, int64_t nextWindo
return -1;
}
int32_t code = doScheduleStream(pStream, pMnode, pPlan, nextWindowSkey);
SEpSet mnodeEpset = {0};
mndGetMnodeEpSet(pMnode, &mnodeEpset);
int32_t code = doScheduleStream(pStream, pMnode, pPlan, nextWindowSkey, &mnodeEpset);
qDestroyQueryPlan(pPlan);
return code;

View File

@ -110,7 +110,7 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_NODECHANGE_CHECK, mndProcessNodeCheckReq);
@ -861,7 +861,10 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
mndTransDrop(pTrans);
keepStreamTasksInBuf(pStream, &execNodeList);
taosThreadMutexLock(&execNodeList.lock);
keepStreamTasksInBuf(&streamObj, &execNodeList);
taosThreadMutexUnlock(&execNodeList.lock);
code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER:
@ -2139,6 +2142,22 @@ static SArray *doExtractNodeListFromStream(SMnode *pMnode) {
return plist;
}
static void doExtractTasksFromStream(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
void *pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) {
break;
}
keepStreamTasksInBuf(pStream, &execNodeList);
sdbRelease(pSdb, pStream);
}
}
// this function runs by only one thread, so it is not multi-thread safe
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1);
@ -2229,15 +2248,13 @@ static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* p
// todo: this process should be executed by the write queue worker of the mnode
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
SMnode *pMnode = pReq->info.node;
SStreamHbMsg req = {0};
int32_t code = TSDB_CODE_SUCCESS;
SDecoder decoder = {0};
tDecoderInit(&decoder, (uint8_t *)pReq->pCont, pReq->contLen);
if (tStartDecode(&decoder) < 0) return -1;
tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
if (tDecodeStreamHbMsg(&decoder, &req) < 0) {
terrno = TSDB_CODE_INVALID_MSG;
@ -2248,6 +2265,11 @@ static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* p
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);
taosThreadMutexLock(&execNodeList.lock);
int32_t numOfExisted = taosHashGetSize(execNodeList.pTaskMap);
if (numOfExisted == 0) {
doExtractTasksFromStream(pMnode);
}
for(int32_t i = 0; i < req.numOfTasks; ++i) {
STaskStatusEntry* p = taosArrayGet(req.pTaskStatus, i);
int64_t k[2] = {p->streamId, p->taskId};
@ -2255,6 +2277,9 @@ static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* p
STaskStatusEntry* pStatusEntry = taosArrayGet(execNodeList.pTaskList, index);
pStatusEntry->status = p->status;
if (p->status != TASK_STATUS__NORMAL) {
mDebug("received s-task:0x%x no in ready stat:%s", p->taskId, streamGetTaskStatusStr(p->status));
}
}
taosThreadMutexUnlock(&execNodeList.lock);

View File

@ -1809,8 +1809,8 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) {
// set the initial value for generating check point
// set the mgmt epset info according to the checkout source msg from mnode, todo opt perf
pMeta->mgmtInfo.epset = req.mgmtEps;
pMeta->mgmtInfo.mnodeId = req.mnodeId;
// pMeta->mgmtInfo.epset = req.mgmtEps;
// pMeta->mgmtInfo.mnodeId = req.mnodeId;
if (pMeta->chkptNotReadyTasks == 0) {
pMeta->chkptNotReadyTasks = taosArrayGetSize(pMeta->pTaskList);

View File

@ -13,12 +13,13 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <common/tmisce.h>
#include "executor.h"
#include "streamBackendRocksdb.h"
#include "streamInt.h"
#include "tref.h"
#include "ttimer.h"
#include "tstream.h"
#include "ttimer.h"
static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
int32_t streamBackendId = 0;
@ -608,10 +609,26 @@ void metaHbToMnode(void* param, void* tmrId) {
taosRLockLatch(&pMeta->lock);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
taosRUnLockLatch(&pMeta->lock);
SEpSet epset = {0};
hbMsg.numOfTasks = numOfTasks;
hbMsg.vgId = pMeta->vgId;
hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry));
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamId* pId = taosArrayGet(pMeta->pTaskList, i);
int64_t keys[2] = {pId->streamId, pId->taskId};
SStreamTask** pTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
STaskStatusEntry entry = {.streamId = pId->streamId, .taskId = pId->taskId, .status = (*pTask)->status.taskStatus};
taosArrayPush(hbMsg.pTaskStatus, &entry);
if (i == 0) {
epsetAssign(&epset, &(*pTask)->info.mnodeEpset);
}
}
taosRUnLockLatch(&pMeta->lock);
int32_t code = 0;
int32_t tlen = 0;
@ -622,17 +639,14 @@ void metaHbToMnode(void* param, void* tmrId) {
return;
}
void* buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return;
}
((SMsgHead*)buf)->vgId = htonl(pMeta->mgmtInfo.mnodeId);
void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, pBuf, tlen);
tEncoderInit(&encoder, buf, tlen);
if ((code = tEncodeStreamHbMsg(&encoder, &hbMsg)) < 0) {
rpcFreeCont(buf);
qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
@ -641,11 +655,11 @@ void metaHbToMnode(void* param, void* tmrId) {
tEncoderClear(&encoder);
SRpcMsg msg = {0};
initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen + sizeof(SMsgHead));
qDebug("vgId:%d, build and send hb to mnode", pMeta->mgmtInfo.mnodeId);
initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen);
msg.info.noResp = 1;
tmsgSendReq(&pMeta->mgmtInfo.epset, &msg);
qDebug("vgId:%d, build and send hb to mnode", pMeta->vgId);
// next hb will be issued in 20sec.
taosTmrReset(metaHbToMnode, 5000, pMeta, streamEnv.timer, pMeta->hbTmr);
tmsgSendReq(&epset, &msg);
taosTmrReset(metaHbToMnode, 5000, pMeta, streamEnv.timer, &pMeta->hbTmr);
}

View File

@ -90,6 +90,7 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeI32(pEncoder, pTask->info.selfChildId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->info.nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->info.epSet) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->chkInfo.checkpointId) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer) < 0) return -1;
@ -190,6 +191,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tDecodeI32(pDecoder, &pTask->info.selfChildId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->info.nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->info.epSet) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer) < 0) return -1;