diff --git a/include/common/tglobal.h b/include/common/tglobal.h index ceb3d62bb3..44f097eb60 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -183,6 +183,7 @@ extern int64_t tsWalFsyncDataSizeLimit; extern int32_t tsTransPullupInterval; extern int32_t tsMqRebalanceInterval; extern int32_t tsStreamCheckpointTickInterval; +extern int32_t tsStreamNodeCheckInterval; extern int32_t tsTtlUnit; extern int32_t tsTtlPushInterval; extern int32_t tsGrantHBInterval; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 0065caf3b3..19760ff2f0 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -156,6 +156,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TTL_TIMER, "ttl-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GRANT_HB_TIMER, "grant-hb-tmr", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_NODECHECK_TIMER, "node-check-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_KILL_TRANS, "kill-trans", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_KILL_QUERY, "kill-query", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_KILL_CONN, "kill-conn", NULL, NULL) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 4c886178df..0891aaf42f 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -537,7 +537,6 @@ int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointRea typedef struct { int32_t vgId; - SEpSet epset; int32_t numOfTasks; } SStreamHbMsg; @@ -555,15 +554,20 @@ typedef struct { int32_t tEncodeCompleteHistoryDataMsg(SEncoder* pEncoder, const SStreamCompleteHistoryMsg* pReq); int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistoryMsg* pReq); +typedef struct { + int32_t nodeId; + SEpSet prevEp; + SEpSet newEp; +} SNodeUpdateInfo; + typedef struct { int64_t streamId; int32_t taskId; - int32_t nodeId; - SEpSet epset; -} SStreamTaskUpdateMsg; + SArray* pNodeList; // SArray +} SStreamTaskNodeUpdateMsg; -int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskUpdateMsg* pMsg); -int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskUpdateMsg* pMsg); +int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg); +int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg); typedef struct { int64_t streamId; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index b8455df9a9..c67e94fd8a 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -223,6 +223,7 @@ int64_t tsWalFsyncDataSizeLimit = (100 * 1024 * 1024L); int32_t tsTransPullupInterval = 2; int32_t tsMqRebalanceInterval = 2; int32_t tsStreamCheckpointTickInterval = 20; +int32_t tsStreamNodeCheckInterval = 10; int32_t tsTtlUnit = 86400; int32_t tsTtlPushInterval = 3600; int32_t tsGrantHBInterval = 60; diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index d47bd78427..c739636a84 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -120,7 +120,6 @@ static void mndPullupTtl(SMnode *pMnode) { } static void mndCalMqRebalance(SMnode *pMnode) { - mTrace("calc mq rebalance"); int32_t contLen = 0; void *pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { @@ -133,11 +132,16 @@ static void mndStreamCheckpointTick(SMnode *pMnode, int64_t sec) { int32_t contLen = 0; void *pReq = mndBuildCheckpointTickMsg(&contLen, sec); if (pReq != NULL) { - SRpcMsg rpcMsg = { - .msgType = TDMT_MND_STREAM_CHECKPOINT_TIMER, - .pCont = pReq, - .contLen = contLen, - }; + SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_CHECKPOINT_TIMER, .pCont = pReq, .contLen = contLen}; + tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); + } +} + +static void mndStreamCheckNode(SMnode* pMnode) { + int32_t contLen = 0; + void *pReq = mndBuildTimerMsg(&contLen); + if (pReq != NULL) { + SRpcMsg rpcMsg = {.msgType = TDMT_MND_NODECHECK_TIMER, .pCont = pReq, .contLen = contLen}; tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); } } @@ -268,6 +272,10 @@ static void *mndThreadFp(void *param) { mndStreamCheckpointTick(pMnode, sec); } + if (sec % tsStreamNodeCheckInterval == 0) { + mndStreamCheckNode(pMnode); + } + if (sec % tsTelemInterval == (TMIN(60, (tsTelemInterval - 1)))) { mndPullupTelem(pMnode); } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index d23efe175d..98c82ffc05 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -34,6 +34,7 @@ #define MND_STREAM_MAX_NUM 60 #define MND_STREAM_HB_INTERVAL 100 // 100 sec + typedef struct SNodeEntry { int32_t nodeId; SEpSet epset; // compare the epset to identify the vgroup tranferring between different dnodes. @@ -41,10 +42,11 @@ typedef struct SNodeEntry { } SNodeEntry; typedef struct SStreamVnodeRevertIndex { -// SHashObj* pVnodeMap; + SArray* pDBList; SArray* pNodeEntryList; } SStreamVnodeRevertIndex; +static int32_t mndNodeCheckSentinel = 0; static SStreamVnodeRevertIndex execNodeList; static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream); @@ -66,6 +68,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq); static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq); static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId, int64_t streamId, int32_t taskId); +static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); static int32_t mndPersistTransLog(SStreamObj* pStream, STrans* pTrans); static void initTransAction(STransAction* pAction, void* pCont, int32_t contLen, int32_t msgType, const SEpSet* pEpset); @@ -83,6 +86,8 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq); + mndSetMsgHandle(pMnode, TDMT_MND_NODECHECK_TIMER, mndProcessNodeCheckReq); + /*mndSetMsgHandle(pMnode, TDMT_MND_RECOVER_STREAM, mndProcessRecoverStreamReq);*/ mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp); @@ -1729,10 +1734,18 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { return TSDB_CODE_ACTION_IN_PROGRESS; } -static int32_t doBuildStreamTaskUpdateMsg(void** pBuf, int32_t* pLen, int32_t nodeId, const SEpSet* pEpset) { - SStreamTaskUpdateMsg req = {0}; - req.nodeId = nodeId; - req.epset = *pEpset; +typedef struct SVgroupChangeInfo { + SHashObj* pDBMap; + SArray* pUpdateNodeList; //SArray +} SVgroupChangeInfo; + +static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg* pMsg, const SVgroupChangeInfo* pInfo) { + taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList); +} + +static int32_t doBuildStreamTaskUpdateMsg(void** pBuf, int32_t* pLen, int32_t nodeId, SVgroupChangeInfo* pInfo) { + SStreamTaskNodeUpdateMsg req = {0}; + initNodeUpdateMsg(&req, pInfo); int32_t code = 0; int32_t blen; @@ -1800,7 +1813,7 @@ void initTransAction(STransAction* pAction, void* pCont, int32_t contLen, int32_ } // build trans to update the epset -static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, int32_t nodeId, SEpSet *pEpset) { +static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo* pInfo) { STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-task-update"); if (pTrans == NULL) { mError("failed to build stream task DAG update, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); @@ -1830,11 +1843,10 @@ static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, int3 void *pBuf = NULL; int32_t len = 0; - doBuildStreamTaskUpdateMsg(&pBuf, &len, nodeId, pEpset); + doBuildStreamTaskUpdateMsg(&pBuf, &len, pTask->info.nodeId, pInfo); STransAction action = {0}; initTransAction(&action, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &pTask->info.epSet); - if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pBuf); taosWUnLockLatch(&pStream->lock); @@ -1847,6 +1859,122 @@ static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, int3 return mndPersistTransLog(pStream, pTrans); } +// todo. 1. multiple change, 2. replica change problem +static SVgroupChangeInfo mndFindChangedVgroupInfo(SMnode *pMnode, const SArray *pPrevVgroupList, + const SArray *pVgroupList) { + SVgroupChangeInfo info = { + .pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo)), + .pDBMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK), + }; + + int32_t numOfVgroups = taosArrayGetSize(pPrevVgroupList); + for (int32_t i = 0; i < numOfVgroups; ++i) { + SNodeEntry *pPrevEntry = taosArrayGet(pPrevVgroupList, i); + + int32_t num = taosArrayGetSize(pVgroupList); + for (int32_t j = 0; j < num; ++j) { + SNodeEntry *pCurrent = taosArrayGet(pVgroupList, j); + if (pCurrent->nodeId == pPrevEntry->nodeId) { + // todo handle the replica change problem. + if (!isEpsetEqual(&pCurrent->epset, &pPrevEntry->epset)) { + SNodeUpdateInfo updateInfo = {.nodeId = pPrevEntry->nodeId}; + epsetAssign(&updateInfo.prevEp, &pPrevEntry->epset); + epsetAssign(&updateInfo.newEp, &pCurrent->epset); + taosArrayPush(info.pUpdateNodeList, &updateInfo); + + SVgObj *pVgroup = mndAcquireVgroup(pMnode, pCurrent->nodeId); + taosHashPut(info.pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0); + mndReleaseVgroup(pMnode, pVgroup); + } + break; + } + } + } + + return info; +} + +static SArray* mndTakeVgroupSnapshot(SMnode* pMnode) { + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + SVgObj *pVgroup = NULL; + + SArray* pVgroupListSnapshot = taosArrayInit(4, sizeof(SNodeEntry)); + + while (1) { + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) { + break; + } + + SNodeEntry entry = {0}; + entry.epset = mndGetVgroupEpset(pMnode, pVgroup); + entry.nodeId = pVgroup->vgId; + entry.hbTimestamp = -1; + + taosArrayPush(pVgroupListSnapshot, &entry); + sdbRelease(pSdb, pVgroup); + } + + return pVgroupListSnapshot; +} + +int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo* pChangeInfo) { + SSdb *pSdb = pMnode->pSdb; + + // check all streams that involved this vnode should update the epset info + SStreamObj *pStream = NULL; + void *pIter = NULL; + while (1) { + pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); + if (pIter == NULL) { + break; + } + + // update the related upstream and downstream tasks, todo remove this, no need this function + // taosWLockLatch(&pStream->lock); + // streamTaskUpdateEpInfo(pStream->tasks, req.vgId, &req.epset); + // streamTaskUpdateEpInfo(pStream->pHTasksList, req.vgId, &req.epset); + // taosWUnLockLatch(&pStream->lock); + void* p = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb)); + void* p1 = taosHashGet(pChangeInfo->pDBMap, pStream->sourceDb, strlen(pStream->sourceDb)); + if (p == NULL && p1 == NULL) { + mndReleaseStream(pMnode, pStream); + continue; + } + + int32_t code = createStreamUpdateTrans(pMnode, pStream, pChangeInfo); + if (code != TSDB_CODE_SUCCESS) { + // todo + } + } + + return 0; +} + +static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { + int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1); + if (old != 0) { + mDebug("still in checking node change"); + return 0; + } + + mDebug("start to do node change checking"); + + SMnode *pMnode = pMsg->info.node; + SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode); + + SVgroupChangeInfo changeInfo = mndFindChangedVgroupInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot); + + if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) { + mndProcessVgroupChange(pMnode, &changeInfo); + + } + + mDebug("end to do node change checking"); + return 0; +} + // todo: this process should be executed by the write queue worker of the mnode int32_t mndProcessStreamHb(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; @@ -1889,10 +2017,10 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { // check epset to identify whether the node has been transferred to other dnodes. // node the epset is changed, which means the node transfer has occurred for this node. - if (!isEpsetEqual(&pEntry->epset, &req.epset)) { - nodeChanged = true; - break; - } +// if (!isEpsetEqual(&pEntry->epset, &req.epset)) { +// nodeChanged = true; +// break; +// } } // todo handle the node timeout case. Once the vnode is off-line, we should check the dnode status from mnode, @@ -1904,7 +2032,6 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } int32_t nodeId = req.vgId; - SEpSet newEpSet = req.epset; {// check all streams that involved this vnode should update the epset info SStreamObj *pStream = NULL; @@ -1917,14 +2044,14 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { // update the related upstream and downstream tasks, todo remove this, no need this function taosWLockLatch(&pStream->lock); - streamTaskUpdateEpInfo(pStream->tasks, req.vgId, &req.epset); - streamTaskUpdateEpInfo(pStream->pHTasksList, req.vgId, &req.epset); +// streamTaskUpdateEpInfo(pStream->tasks, req.vgId, &req.epset); +// streamTaskUpdateEpInfo(pStream->pHTasksList, req.vgId, &req.epset); taosWUnLockLatch(&pStream->lock); - code = createStreamUpdateTrans(pMnode, pStream, nodeId, &newEpSet); - if (code != TSDB_CODE_SUCCESS) { - // todo - } +// code = createStreamUpdateTrans(pMnode, pStream, nodeId, ); +// if (code != TSDB_CODE_SUCCESS) { +// todo +// } } } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 60645602fe..8b3cfb2d9b 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1822,7 +1822,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { int32_t len = pMsg->contLen - sizeof(SMsgHead); int32_t code = 0; - SStreamTaskUpdateMsg req = {0}; + SStreamTaskNodeUpdateMsg req = {0}; SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msg, len); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 58cd613209..c3695e9a43 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -973,22 +973,40 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) { return 0; } -int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskUpdateMsg* pMsg) { +int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pMsg->streamId) < 0) return -1; if (tEncodeI32(pEncoder, pMsg->taskId) < 0) return -1; - if (tEncodeI32(pEncoder, pMsg->nodeId) < 0) return -1; - if (tEncodeSEpSet(pEncoder, &pMsg->epset) < 0) return -1; + + int32_t size = taosArrayGetSize(pMsg->pNodeList); + if (tEncodeI32(pEncoder, size) < 0) return -1; + + for (int32_t i = 0; i < size; ++i) { + SNodeUpdateInfo* pInfo = taosArrayGet(pMsg->pNodeList, i); + if (tEncodeI32(pEncoder, pInfo->nodeId) < 0) return -1; + if (tEncodeSEpSet(pEncoder, &pInfo->prevEp) < 0) return -1; + if (tEncodeSEpSet(pEncoder, &pInfo->newEp) < 0) return -1; + } tEndEncode(pEncoder); return pEncoder->pos; } -int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskUpdateMsg* pMsg) { +int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pMsg->streamId) < 0) return -1; if (tDecodeI32(pDecoder, &pMsg->taskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pMsg->nodeId) < 0) return -1; - if (tDecodeSEpSet(pDecoder, &pMsg->epset) < 0) return -1; + + int32_t size = 0; + if (tDecodeI32(pDecoder, &size) < 0) return -1; + pMsg->pNodeList = taosArrayInit(size, sizeof(SNodeUpdateInfo)); + for (int32_t i = 0; i < size; ++i) { + SNodeUpdateInfo info = {0}; + if (tDecodeI32(pDecoder, &info.nodeId) < 0) return -1; + if (tDecodeSEpSet(pDecoder, &info.prevEp) < 0) return -1; + if (tDecodeSEpSet(pDecoder, &info.newEp) < 0) return -1; + taosArrayPush(pMsg->pNodeList, &info); + } + tEndDecode(pDecoder); return 0; } \ No newline at end of file diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 6b99d52f98..4a746f4961 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -93,7 +93,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->expandFunc = expandFunc; // send heartbeat every 20sec. -// pMeta->hbTmr = taosTmrStart(metaHbToMnode, 20000, ahandle, streamEnv.timer); + pMeta->hbTmr = taosTmrStart(metaHbToMnode, 20000, pMeta, streamEnv.timer); pMeta->pTaskBackendUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); @@ -534,7 +534,6 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI32(pEncoder, pReq->vgId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->numOfTasks) < 0) return -1; - if (tEncodeSEpSet(pEncoder, &pReq->epset) < 0) return -1; tEndEncode(pEncoder); return pEncoder->pos; } @@ -543,13 +542,12 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->vgId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->numOfTasks) < 0) return -1; - if (tDecodeSEpSet(pDecoder, &pReq->epset) < 0) return -1; tEndDecode(pDecoder); return 0; } void metaHbToMnode(void* param, void* tmrId) { - STQ* pMeta = param; + SStreamMeta* pMeta = param; SStreamHbMsg hbMsg = {0}; taosRLockLatch(&pMeta->lock); @@ -558,7 +556,6 @@ void metaHbToMnode(void* param, void* tmrId) { hbMsg.numOfTasks = numOfTasks; hbMsg.vgId = pMeta->vgId; - hbMsg.epset = ; int32_t code = 0; int32_t tlen = 0; @@ -592,4 +589,7 @@ void metaHbToMnode(void* param, void* tmrId) { qDebug("vgId:%d, build and send hb to mnode", pMeta->mgmtInfo.mnodeId); tmsgSendReq(&pMeta->mgmtInfo.epset, &msg); + + // next hb will be issued in 20sec. + taosTmrReset(metaHbToMnode, 20000, pMeta, streamEnv.timer, pMeta->hbTmr); } \ No newline at end of file