diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 6aed50e508..c8a967620c 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -120,8 +120,9 @@ void destroyStreamTaskIter(SStreamTaskIter *pIter); bool streamTaskIterNextTask(SStreamTaskIter *pIter); SStreamTask *streamTaskIterGetCurrent(SStreamTaskIter *pIter); void mndInitExecInfo(); -void removeExpiredNodeInfo(const SArray *pNodeSnapshot); -void removeTasksInBuf(SArray* pTaskIds, SStreamExecInfo* pExecInfo); +void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo); +int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot); +void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index cad8c6d745..9252843d2f 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -680,6 +680,8 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { return NULL; } + mndInitStreamExecInfo(pMnode, &execInfo); + mInfo("mnode open successfully"); return pMnode; } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 5fc6e465ad..1f1eaa999e 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -56,13 +56,14 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger); static int32_t mndProcessNodeCheck(SRpcMsg *pReq); static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); -static SArray *extractNodeListFromStream(SMnode *pMnode, SArray* pNodeList); +static int32_t extractNodeListFromStream(SMnode *pMnode, SArray* pNodeList); static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq); static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); -void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); -static int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot); +static void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo); +static void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo); +static void removeExpiredNodeInfo(const SArray *pNodeSnapshot); static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len); static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); @@ -131,9 +132,11 @@ int32_t mndInitStream(SMnode *pMnode) { if (sdbSetTable(pMnode->pSdb, table) != 0) { return -1; } + if (sdbSetTable(pMnode->pSdb, tableSeq) != 0) { return -1; } + return 0; } @@ -2169,7 +2172,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange return 0; } -static SArray *extractNodeListFromStream(SMnode *pMnode, SArray* pNodeList) { +static int32_t extractNodeListFromStream(SMnode *pMnode, SArray* pNodeList) { SSdb *pSdb = pMnode->pSdb; SStreamObj *pStream = NULL; void *pIter = NULL; @@ -2215,48 +2218,6 @@ static SArray *extractNodeListFromStream(SMnode *pMnode, SArray* pNodeList) { return TSDB_CODE_SUCCESS; } -static bool taskNodeExists(SArray *pList, int32_t nodeId) { - size_t num = taosArrayGetSize(pList); - - for (int32_t i = 0; i < num; ++i) { - SNodeEntry *pEntry = taosArrayGet(pList, i); - if (pEntry->nodeId == nodeId) { - return true; - } - } - - return false; -} - -int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) { - SArray *pRemovedTasks = taosArrayInit(4, sizeof(STaskId)); - - int32_t numOfTask = taosArrayGetSize(execInfo.pTaskList); - for (int32_t i = 0; i < numOfTask; ++i) { - STaskId *pId = taosArrayGet(execInfo.pTaskList, i); - - STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId)); - if (pEntry->nodeId == SNODE_HANDLE) { - continue; - } - - bool existed = taskNodeExists(pNodeSnapshot, pEntry->nodeId); - if (!existed) { - taosArrayPush(pRemovedTasks, pId); - } - } - - removeTasksInBuf(pRemovedTasks, &execInfo); - - mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemovedTasks), - (int32_t)taosArrayGetSize(execInfo.pTaskList)); - - removeExpiredNodeInfo(pNodeSnapshot); - - taosArrayDestroy(pRemovedTasks); - return 0; -} - // this function runs by only one thread, so it is not multi-thread safe static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { int32_t code = 0; @@ -2510,3 +2471,24 @@ static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq) { } return code; } + +void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo) { + addAllStreamTasksIntoBuf(pMnode, pExecInfo); + extractNodeListFromStream(pMnode, pExecInfo->pNodeList); +} + +void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo* pExecInfo) { + SSdb *pSdb = pMnode->pSdb; + SStreamObj *pStream = NULL; + void *pIter = NULL; + + while (1) { + pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); + if (pIter == NULL) { + break; + } + + saveTaskAndNodeInfoIntoBuf(pStream, pExecInfo); + sdbRelease(pSdb, pStream); + } +} \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 42efb6589e..6f398fbc11 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -22,55 +22,6 @@ typedef struct SFailedCheckpointInfo { int32_t transId; } SFailedCheckpointInfo; -static void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo* pExecInfo) { - SSdb *pSdb = pMnode->pSdb; - SStreamObj *pStream = NULL; - void *pIter = NULL; - - while (1) { - pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); - if (pIter == NULL) { - break; - } - - saveTaskAndNodeInfoIntoBuf(pStream, pExecInfo); - sdbRelease(pSdb, pStream); - } -} - -static void removeDroppedStreamTasksInBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo) { - if (pMnode == NULL) { - return; - } - - int32_t num = taosArrayGetSize(pExecInfo->pTaskList); - - SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); - SArray *pIdList = taosArrayInit(4, sizeof(STaskId)); - - for (int32_t i = 0; i < num; ++i) { - STaskId* pId = taosArrayGet(pExecInfo->pTaskList, i); - - void* p = taosHashGet(pHash, &pId->streamId, sizeof(int64_t)); - if (p != NULL) { - continue; - } - - void* pObj = mndGetStreamObj(pMnode, pId->streamId); - if (pObj != NULL) { - mndReleaseStream(pMnode, pObj); - taosHashPut(pHash, &pId->streamId, sizeof(int64_t), NULL, 0); - } else { - taosArrayPush(pIdList, pId); - } - } - - removeTasksInBuf(pIdList, &execInfo); - - taosArrayDestroy(pIdList); - taosHashCleanup(pHash); -} - static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) { int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList); for (int32_t j = 0; j < numOfNodes; ++j) { @@ -290,17 +241,6 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { taosThreadMutexLock(&execInfo.lock); - // extract stream task list - if (taosHashGetSize(execInfo.pTaskMap) == 0) { - addAllStreamTasksIntoBuf(pMnode, &execInfo); - } else { - // the already dropped tasks may be added by hb from vnode at the time when the pTaskMap happens to be empty. - // let's drop them here. - removeDroppedStreamTasksInBuf(pMnode, &execInfo); - } - - extractStreamNodeList(pMnode); - int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes); if (numOfUpdated > 0) { mDebug("%d stream node(s) need updated from hbMsg(vgId:%d)", numOfUpdated, req.vgId); diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index d138254afd..7e72f67f45 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -645,4 +645,46 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { taosThreadMutexUnlock(&pExecNode->lock); destroyStreamTaskIter(pIter); +} + +static bool taskNodeExists(SArray *pList, int32_t nodeId) { + size_t num = taosArrayGetSize(pList); + + for (int32_t i = 0; i < num; ++i) { + SNodeEntry *pEntry = taosArrayGet(pList, i); + if (pEntry->nodeId == nodeId) { + return true; + } + } + + return false; +} + +int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) { + SArray *pRemovedTasks = taosArrayInit(4, sizeof(STaskId)); + + int32_t numOfTask = taosArrayGetSize(execInfo.pTaskList); + for (int32_t i = 0; i < numOfTask; ++i) { + STaskId *pId = taosArrayGet(execInfo.pTaskList, i); + + STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId)); + if (pEntry->nodeId == SNODE_HANDLE) { + continue; + } + + bool existed = taskNodeExists(pNodeSnapshot, pEntry->nodeId); + if (!existed) { + taosArrayPush(pRemovedTasks, pId); + } + } + + removeTasksInBuf(pRemovedTasks, &execInfo); + + mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemovedTasks), + (int32_t)taosArrayGetSize(execInfo.pTaskList)); + + removeExpiredNodeInfo(pNodeSnapshot); + + taosArrayDestroy(pRemovedTasks); + return 0; } \ No newline at end of file