diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 98c82ffc05..23972ecb1b 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -32,8 +32,6 @@ #define MND_STREAM_VER_NUMBER 3 #define MND_STREAM_RESERVE_SIZE 64 #define MND_STREAM_MAX_NUM 60 -#define MND_STREAM_HB_INTERVAL 100 // 100 sec - typedef struct SNodeEntry { int32_t nodeId; @@ -44,6 +42,7 @@ typedef struct SNodeEntry { typedef struct SStreamVnodeRevertIndex { SArray* pDBList; SArray* pNodeEntryList; + int64_t ts; // snapshot ts } SStreamVnodeRevertIndex; static int32_t mndNodeCheckSentinel = 0; @@ -98,7 +97,7 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint); - mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb); +// mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq); @@ -863,7 +862,7 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) { SRpcMsg rpcMsg = { .msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = sizeof(SMStreamDoCheckpointMsg)}; - tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); +// tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); return 0; } @@ -1740,6 +1739,7 @@ typedef struct SVgroupChangeInfo { } SVgroupChangeInfo; static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg* pMsg, const SVgroupChangeInfo* pInfo) { + pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo)); taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList); } @@ -1821,7 +1821,6 @@ static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgr } mDebug("start to build stream:0x%" PRIx64 " task DAG update", pStream->uid); - ASSERT(0); mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); if (mndTransCheckConflict(pMnode, pTrans) != 0) { @@ -1856,7 +1855,25 @@ static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgr } taosWUnLockLatch(&pStream->lock); - return mndPersistTransLog(pStream, pTrans); + + int32_t code = mndPersistTransLog(pStream, pTrans); + if (code != TSDB_CODE_SUCCESS) { + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return -1; + } + + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, terrstr()); + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return -1; + } + + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + + return TSDB_CODE_ACTION_IN_PROGRESS; } // todo. 1. multiple change, 2. replica change problem @@ -1943,6 +1960,7 @@ int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo* pChangeInfo) { continue; } + mDebug("stream:0x%"PRIx64" involved node changed, create update trans", pStream->uid); int32_t code = createStreamUpdateTrans(pMnode, pStream, pChangeInfo); if (code != TSDB_CODE_SUCCESS) { // todo @@ -1952,7 +1970,54 @@ int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo* pChangeInfo) { return 0; } +static SArray* doExtractNodeList(SMnode *pMnode) { + SSdb *pSdb = pMnode->pSdb; + SStreamObj *pStream = NULL; + void *pIter = NULL; + + SHashObj* pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + while (1) { + pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); + if (pIter == NULL) { + break; + } + + taosWLockLatch(&pStream->lock); + int32_t numOfLevels = taosArrayGetSize(pStream->tasks); + + for (int32_t j = 0; j < numOfLevels; ++j) { + SArray *pLevel = taosArrayGetP(pStream->tasks, j); + + int32_t numOfTasks = taosArrayGetSize(pLevel); + for (int32_t k = 0; k < numOfTasks; ++k) { + SStreamTask *pTask = taosArrayGetP(pLevel, k); + SNodeEntry entry = {0}; + epsetAssign(&entry.epset, &pTask->info.epSet); + entry.nodeId = pTask->info.nodeId; + entry.hbTimestamp = -1; + + taosHashPut(pHash, &entry.nodeId, sizeof(entry.nodeId), &entry, sizeof(entry)); + } + } + + taosWUnLockLatch(&pStream->lock); + } + + SArray* plist = taosArrayInit(taosHashGetSize(pHash), sizeof(SNodeEntry)); + + // convert to list + pIter = NULL; + while((pIter = taosHashIterate(pHash, pIter)) != NULL) { + SNodeEntry* pEntry = (SNodeEntry*) pIter; + taosArrayPush(plist, pEntry); + } + taosHashCleanup(pHash); + + return plist; +} + static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { + return 0; int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1); if (old != 0) { mDebug("still in checking node change"); @@ -1962,98 +2027,111 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { mDebug("start to do node change checking"); SMnode *pMnode = pMsg->info.node; + if (execNodeList.pNodeEntryList == NULL) { + execNodeList.pNodeEntryList = doExtractNodeList(pMnode); + } + SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode); + int64_t ts = taosGetTimestampSec(); SVgroupChangeInfo changeInfo = mndFindChangedVgroupInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot); if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) { mndProcessVgroupChange(pMnode, &changeInfo); - } + taosArrayDestroy(changeInfo.pUpdateNodeList); + taosHashCleanup(changeInfo.pDBMap); + + // keep the new vnode snapshot + taosArrayDestroy(execNodeList.pNodeEntryList); + execNodeList.pNodeEntryList = pNodeSnapshot; + execNodeList.ts = ts; + mDebug("end to do node change checking"); + atomic_store_32(&mndNodeCheckSentinel, 0); return 0; } // todo: this process should be executed by the write queue worker of the mnode -int32_t mndProcessStreamHb(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; - SSdb *pSdb = pMnode->pSdb; - SStreamHbMsg req = {0}; - int32_t code = TSDB_CODE_SUCCESS; - - SDecoder decoder = {0}; - tDecoderInit(&decoder, (uint8_t *)pReq->pCont, pReq->contLen); - - if (tStartDecode(&decoder) < 0) return -1; - - if (tDecodeStreamHbMsg(&decoder, &req) < 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; - } - - int64_t now = taosGetTimestampSec(); - mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks); - - // timeout list - bool nodeChanged = false; - SArray* pList = taosArrayInit(4, sizeof(int32_t)); - - // record the timeout node - for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) { - SNodeEntry* pEntry = taosArrayGet(execNodeList.pNodeEntryList, i); - int64_t duration = now - pEntry->hbTimestamp; - if (duration > MND_STREAM_HB_INTERVAL) { // execNode timeout, try next - taosArrayPush(pList, &pEntry); - mWarn("nodeId:%d stream node timeout, since last hb:%"PRId64"s", pEntry->nodeId, duration); - continue; - } - - if (pEntry->nodeId != req.vgId) { - continue; - } - - pEntry->hbTimestamp = now; - - // check epset to identify whether the node has been transferred to other dnodes. - // node the epset is changed, which means the node transfer has occurred for this node. -// if (!isEpsetEqual(&pEntry->epset, &req.epset)) { -// nodeChanged = true; -// break; +//int32_t mndProcessStreamHb(SRpcMsg *pReq) { +// SMnode *pMnode = pReq->info.node; +// SSdb *pSdb = pMnode->pSdb; +// SStreamHbMsg req = {0}; +// int32_t code = TSDB_CODE_SUCCESS; +// +// SDecoder decoder = {0}; +// tDecoderInit(&decoder, (uint8_t *)pReq->pCont, pReq->contLen); +// +// if (tStartDecode(&decoder) < 0) return -1; +// +// if (tDecodeStreamHbMsg(&decoder, &req) < 0) { +// terrno = TSDB_CODE_INVALID_MSG; +// return -1; +// } +// +// int64_t now = taosGetTimestampSec(); +// mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks); +// +// // timeout list +// bool nodeChanged = false; +// SArray* pList = taosArrayInit(4, sizeof(int32_t)); +// +// // record the timeout node +// for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) { +// SNodeEntry* pEntry = taosArrayGet(execNodeList.pNodeEntryList, i); +// int64_t duration = now - pEntry->hbTimestamp; +// if (duration > MND_STREAM_HB_INTERVAL) { // execNode timeout, try next +// taosArrayPush(pList, &pEntry); +// mWarn("nodeId:%d stream node timeout, since last hb:%"PRId64"s", pEntry->nodeId, duration); +// continue; // } - } - - // todo handle the node timeout case. Once the vnode is off-line, we should check the dnode status from mnode, - // to identify whether the dnode is truely offline or not. - - // handle the node changed case - if (!nodeChanged) { - return TSDB_CODE_SUCCESS; - } - - int32_t nodeId = req.vgId; - - {// check all streams that involved this vnode should update the epset info - SStreamObj *pStream = NULL; - void *pIter = NULL; - while (1) { - pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); - if (pIter == NULL) { - break; - } - - // update the related upstream and downstream tasks, todo remove this, no need this function - taosWLockLatch(&pStream->lock); -// streamTaskUpdateEpInfo(pStream->tasks, req.vgId, &req.epset); -// streamTaskUpdateEpInfo(pStream->pHTasksList, req.vgId, &req.epset); - taosWUnLockLatch(&pStream->lock); - -// code = createStreamUpdateTrans(pMnode, pStream, nodeId, ); -// if (code != TSDB_CODE_SUCCESS) { -// todo +// +// if (pEntry->nodeId != req.vgId) { +// continue; +// } +// +// pEntry->hbTimestamp = now; +// +// // check epset to identify whether the node has been transferred to other dnodes. +// // node the epset is changed, which means the node transfer has occurred for this node. +//// if (!isEpsetEqual(&pEntry->epset, &req.epset)) { +//// nodeChanged = true; +//// break; +//// } +// } +// +// // todo handle the node timeout case. Once the vnode is off-line, we should check the dnode status from mnode, +// // to identify whether the dnode is truely offline or not. +// +// // handle the node changed case +// if (!nodeChanged) { +// return TSDB_CODE_SUCCESS; +// } +// +// int32_t nodeId = req.vgId; +// +// {// check all streams that involved this vnode should update the epset info +// SStreamObj *pStream = NULL; +// void *pIter = NULL; +// while (1) { +// pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); +// if (pIter == NULL) { +// break; // } - } - } - - return TSDB_CODE_SUCCESS; -} +// +// // update the related upstream and downstream tasks, todo remove this, no need this function +// taosWLockLatch(&pStream->lock); +//// streamTaskUpdateEpInfo(pStream->tasks, req.vgId, &req.epset); +//// streamTaskUpdateEpInfo(pStream->pHTasksList, req.vgId, &req.epset); +// taosWUnLockLatch(&pStream->lock); +// +//// code = createStreamUpdateTrans(pMnode, pStream, nodeId, ); +//// if (code != TSDB_CODE_SUCCESS) { +//// todo +//// } +// } +// } +// +// return TSDB_CODE_SUCCESS; +//} diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 00fdb6e1ae..ec0f4eb5bc 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -658,6 +658,7 @@ int32_t streamTryExec(SStreamTask* pTask) { qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); + // the inputQ is empty due to the checkpoint process, so we need to scan data from WAL here. if ((!streamTaskShouldStop(&pTask->status)) && (!streamTaskShouldPause(&pTask->status))) { streamSchedExec(pTask); } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 4a746f4961..c784d980df 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -547,6 +547,7 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { } void metaHbToMnode(void* param, void* tmrId) { +#if 0 SStreamMeta* pMeta = param; SStreamHbMsg hbMsg = {0}; @@ -592,4 +593,5 @@ void metaHbToMnode(void* param, void* tmrId) { // next hb will be issued in 20sec. taosTmrReset(metaHbToMnode, 20000, pMeta, streamEnv.timer, pMeta->hbTmr); +#endif } \ No newline at end of file