diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 72d3ac5f95..eabf4d8e4a 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -83,6 +83,9 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans); static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset); +static void removeStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* pExecNode); +static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNode); + int32_t mndInitStream(SMnode *pMnode) { SSdbTable table = { .sdbType = SDB_STREAM, @@ -1280,7 +1283,6 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { mndTransDrop(pTrans); return -1; } - // mndTransSetSerial(pTrans); // drop all tasks if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) { @@ -1304,13 +1306,13 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { return -1; } + removeStreamTasksInBuf(pStream, &execNodeList); + char detail[100] = {0}; sprintf(detail, "igNotExists:%d", dropReq.igNotExists); SName name = {0}; tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB); - //reuse this function for stream - auditRecord(pReq, pMnode->clusterId, "dropStream", name.dbname, "", detail); sdbRelease(pMnode->pSdb, pStream); @@ -2238,7 +2240,7 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) { return 0; } -static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNode) { +void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNode) { int32_t level = taosArrayGetSize(pStream->tasks); for (int32_t i = 0; i < level; i++) { SArray *pLevel = taosArrayGetP(pStream->tasks, i); @@ -2261,6 +2263,25 @@ static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *p } } +void removeStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* pExecNode) { + int32_t level = taosArrayGetSize(pStream->tasks); + for (int32_t i = 0; i < level; i++) { + SArray *pLevel = taosArrayGetP(pStream->tasks, i); + + int32_t numOfTasks = taosArrayGetSize(pLevel); + for (int32_t j = 0; j < numOfTasks; j++) { + SStreamTask *pTask = taosArrayGetP(pLevel, j); + + STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; + void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)); + if (p != NULL) { + taosArrayRemove(pExecNode->pTaskList, *(int32_t*)p); + taosHashRemove(pExecNode->pTaskMap, &id, sizeof(id)); + } + } + } +} + // todo: this process should be executed by the write queue worker of the mnode int32_t mndProcessStreamHb(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; @@ -2277,7 +2298,6 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } tDecoderClear(&decoder); - // int64_t now = taosGetTimestampSec(); mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks); taosThreadMutexLock(&execNodeList.lock);