diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 6d56ad31d1..0065caf3b3 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -303,6 +303,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY, "vnode-stream-scan-history", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_CHECK_POINT_SOURCE, "vnode-stream-checkpoint-source", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_UPDATE, "vnode-stream-update", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL) TD_NEW_MSG_SEG(TDMT_VND_TMQ_MSG) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 62e10255e9..902674b9dd 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -554,7 +554,7 @@ typedef struct { SEpSet epset; } SStreamTaskUpdateInfo; -int32_t tEncodeTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskUpdateInfo* pMsg); +int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskUpdateInfo* pMsg); int32_t tDecodeTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskUpdateInfo* pMsg); typedef struct { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 2426e81e3f..3488df8dc0 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1736,13 +1736,121 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { return TSDB_CODE_ACTION_IN_PROGRESS; } +/* Builds a task-update message: an SMsgHead followed by the encoded SStreamTaskUpdateInfo (nodeId plus a copy of *pEpset). On success stores the buffer obtained from taosMemoryMalloc in *pBuf and its total length in *pLen and returns TSDB_CODE_SUCCESS; on failure sets terrno to TSDB_CODE_OUT_OF_MEMORY and returns -1 without touching *pBuf or *pLen. */ static int32_t doBuildStreamTaskUpdateMsg(void** pBuf, int32_t* pLen, int32_t nodeId, const SEpSet* pEpset) { + SStreamTaskUpdateInfo req = {0}; + req.nodeId = nodeId; + req.epset = *pEpset; + + int32_t code = 0; + int32_t blen; + + tEncodeSize(tEncodeStreamTaskUpdateMsg, &req, blen, code); + if (code < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + int32_t tlen = sizeof(SMsgHead) + blen; + + void *buf = taosMemoryMalloc(tlen); + if (buf == NULL) { + terrno = 
TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + SEncoder encoder; + tEncoderInit(&encoder, abuf, tlen); /* NOTE(review): abuf starts sizeof(SMsgHead) bytes into a tlen-byte buffer, so only blen bytes remain; if the third argument is the capacity it looks like it should be blen - confirm */ + tEncodeStreamTaskUpdateMsg(&encoder, &req); /* NOTE(review): return value is not checked */ + + SMsgHead *pMsgHead = (SMsgHead *)buf; + pMsgHead->contLen = htonl(tlen); + pMsgHead->vgId = htonl(nodeId); + + tEncoderClear(&encoder); + + *pBuf = buf; + *pLen = tlen; + + return TSDB_CODE_SUCCESS; +} + +// build trans to update the epset: appends one TDMT_VND_STREAM_TASK_UPDATE redo action per task of pStream, then the stream commit log; returns TSDB_CODE_SUCCESS or -1 +static int32_t createStreamUpdateTrans(SMnode* pMnode, SStreamObj* pStream, int32_t nodeId, SEpSet* pEpset) { + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-task-update"); + if (pTrans == NULL) { + mError("failed to build stream task DAG update, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return -1; + } + + mDebug("start to build stream:0x%"PRIx64" task DAG update", pStream->uid); + ASSERT(0); /* NOTE(review): unconditional assertion; presumably a placeholder that stops execution here when assertions are enabled - confirm before merging */ + +// mndTransSetDbName(pTrans, "stream-task-update", "checkpoint"); + if (mndTransCheckConflict(pMnode, pTrans) != 0) { + mError("failed to build stream:0x%"PRIx64" task DAG update, code:%s", pStream->uid, tstrerror(TSDB_CODE_MND_TRANS_CONFLICT)); + mndTransDrop(pTrans); + return -1; + } + + taosWLockLatch(&pStream->lock); + int32_t numOfLevels = taosArrayGetSize(pStream->tasks); + + for (int32_t j = 0; j < numOfLevels; ++j) { + SArray *pLevel = taosArrayGetP(pStream->tasks, j); + + int32_t numOfTasks = taosArrayGetSize(pLevel); + for (int32_t k = 0; k < numOfTasks; ++k) { + SStreamTask *pTask = taosArrayGetP(pLevel, k); + + void* pBuf = NULL; + int32_t len = 0; + doBuildStreamTaskUpdateMsg(&pBuf, &len, nodeId, pEpset); /* NOTE(review): return value ignored; on failure pBuf stays NULL and len stays 0 and the action is still appended */ + + STransAction action = {0}; + action.epSet = pTask->info.epSet; + action.pCont = pBuf; + action.contLen = len; + action.msgType = TDMT_VND_STREAM_TASK_UPDATE; + + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(pBuf); + taosWUnLockLatch(&pStream->lock); + return -1; /* NOTE(review): pTrans is not dropped on this path, unlike the conflict path above - possible leak, confirm ownership */ + } + + } + } + + taosWUnLockLatch(&pStream->lock); + + SSdbRaw 
*pCommitRaw = mndStreamActionEncode(pStream); + if (pCommitRaw == NULL) { + mError("failed to prepare trans rebalance since %s", terrstr()); + return -1; + } + if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { + sdbFreeRaw(pCommitRaw); + mError("failed to prepare trans rebalance since %s", terrstr()); + return -1; + } + if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) { + sdbFreeRaw(pCommitRaw); + mError("failed to prepare trans rebalance since %s", terrstr()); + return -1; + } + + return TSDB_CODE_SUCCESS; /* NOTE(review): pTrans is neither prepared nor dropped before returning here or on the three error returns above, and the prepare/drop code in mndProcessStreamHb is commented out - confirm who owns pTrans */ +} + // todo: handle the database drop/stream drop case int32_t mndProcessStreamHb(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; SStreamHbMsg req = {0}; + int32_t code = TSDB_CODE_SUCCESS; - SDecoder decoder = {0}; + SDecoder decoder = {0}; tDecoderInit(&decoder, (uint8_t *)pReq->pCont, pReq->contLen); if (tStartDecode(&decoder) < 0) return -1; @@ -1813,44 +1921,23 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } } - { // build trans to update the epset + code = createStreamUpdateTrans(pMnode, pStream, nodeId, &newEpSet); /* NOTE(review): createStreamUpdateTrans takes pStream->lock itself, and this caller unlocks the same latch just below; if the lock is already held here this would self-deadlock - confirm */ + if (code != TSDB_CODE_SUCCESS) { /* NOTE(review): this failure branch is left empty by the change */ - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-checkpoint"); - if (pTrans == NULL) { - mError("failed to trigger checkpoint, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - return -1; - } -// mDebug("start to trigger checkpoint, checkpointId: %" PRId64 "", checkpointId); - - mndTransSetDbName(pTrans, "checkpoint", "checkpoint"); - if (mndTransCheckConflict(pMnode, pTrans) != 0) { - mError("failed to trigger checkpoint, checkpointId: %" PRId64 ", reason:%s", checkpointId, - tstrerror(TSDB_CODE_MND_TRANS_CONFLICT)); - mndTransDrop(pTrans); - return -1; - } - - void* pBuf = NULL; - int32_t len = 0; -// doBuildStreamTaskUpdateMsg(&pBuf, &len, nodeId, newEpSet); - - STransAction action = {0}; - action.epSet = /*mndGetVgroupEpset(pMnode, pVgObj)*/; - action.pCont = pBuf; - action.contLen = len; - action.msgType = 
TDMT_VND_STREAM_CHECK_POINT_SOURCE; - - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - taosMemoryFree(pBuf); - taosWUnLockLatch(&pStream->lock); - return -1; - } } taosWUnLockLatch(&pStream->lock); } } +// if (code == 0) { +// if (mndTransPrepare(pMnode, pTrans) != 0) { +// mError("failed to prepre trans rebalance since %s", terrstr()); +// } +// } +// mndTransDrop(pTrans); + return code; /* NOTE(review): this return makes the mTrace call and the final return below unreachable */ + mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index a5820ea8b0..51ad795a2b 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -959,7 +959,7 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) { return 0; } -int32_t tEncodeTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskUpdateInfo* pMsg) { +int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskUpdateInfo* pMsg) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI32(pEncoder, pMsg->nodeId) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pMsg->epset) < 0) return -1;