diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index bfed35bc2f..e70aed79f7 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -83,14 +83,14 @@ static SArray *mndTakeVgroupSnapshot(SMnode *pMnode); static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); static STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, const char *name); -static STrans *doCreateTrans1(SMnode *pMnode, const char *name, const char* pDbName); static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans); static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset); static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans); -static void removeStreamTasksInBuf(SStreamObj* pStream, SStreamExecNodeInfo * pExecNode); -static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecNodeInfo *pExecNode); +static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecNodeInfo *pExecNode); +static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecNodeInfo *pExecNode); static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot); +static int32_t doKillActiveCheckpointTrans(SMnode *pMnode); int32_t mndInitStream(SMnode *pMnode) { SSdbTable table = { @@ -1189,7 +1189,6 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { } } taosThreadMutexUnlock(&execNodeList.lock); - if (!ready) { return 0; } @@ -1203,7 +1202,8 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { mError("failed to trigger checkpoint, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); return -1; } - mDebug("start to trigger checkpoint, checkpointId: %" PRId64 "", checkpointId); + + mDebug("start to trigger checkpoint, checkpointId: %" PRId64, checkpointId); const char *pDb = mndGetStreamDB(pMnode); 
mndTransSetDbName(pTrans, pDb, "checkpoint"); @@ -2082,8 +2082,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange // check all streams that involved this vnode should update the epset info SStreamObj *pStream = NULL; void *pIter = NULL; - - STrans *pTrans = NULL; + STrans *pTrans = NULL; while (1) { pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); @@ -2091,6 +2090,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange break; } + // here create only one trans if (pTrans == NULL) { pTrans = doCreateTrans(pMnode, pStream, "stream-task-update"); if (pTrans == NULL) { @@ -2137,8 +2137,6 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); - // return TSDB_CODE_ACTION_IN_PROGRESS; - return 0; } @@ -2295,7 +2293,6 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { if (execNodeList.pNodeEntryList != NULL) { execNodeList.pNodeEntryList = taosArrayDestroy(execNodeList.pNodeEntryList); } - execNodeList.pNodeEntryList = extractNodeListFromStream(pMnode); } @@ -2313,6 +2310,9 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot); if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) { + + // kill current active checkpoint transaction, since the transaction is vnode wide. 
+ doKillActiveCheckpointTrans(pMnode); code = mndProcessVgroupChange(pMnode, &changeInfo); // keep the new vnode snapshot @@ -2498,29 +2498,26 @@ int32_t createStreamResetStatusTrans(SMnode* pMnode, SStreamObj* pStream) { return TSDB_CODE_ACTION_IN_PROGRESS; } -int32_t mndResetFromCheckpoint(SMnode* pMnode) { - // find the checkpoint trans id +int32_t doKillActiveCheckpointTrans(SMnode *pMnode) { int32_t transId = 0; + SSdb *pSdb = pMnode->pSdb; + STrans *pTrans = NULL; + void *pIter = NULL; - { - SSdb *pSdb = pMnode->pSdb; - STrans *pTrans = NULL; - void* pIter = NULL; - while (1) { - pIter = sdbFetch(pSdb, SDB_TRANS, pIter, (void **)&pTrans); - if (pIter == NULL) { - break; - } - - if (strncmp(pTrans->opername, MND_STREAM_CHECKPOINT_NAME, tListLen(pTrans->opername) - 1) == 0) { - transId = pTrans->id; - sdbRelease(pSdb, pTrans); - sdbCancelFetch(pSdb, pIter); - break; - } - - sdbRelease(pSdb, pTrans); + while (1) { + pIter = sdbFetch(pSdb, SDB_TRANS, pIter, (void **)&pTrans); + if (pIter == NULL) { + break; } + + if (strncmp(pTrans->opername, MND_STREAM_CHECKPOINT_NAME, tListLen(pTrans->opername) - 1) == 0) { + transId = pTrans->id; + sdbRelease(pSdb, pTrans); + sdbCancelFetch(pSdb, pIter); + break; + } + + sdbRelease(pSdb, pTrans); } if (transId == 0) { @@ -2528,8 +2525,16 @@ int32_t mndResetFromCheckpoint(SMnode* pMnode) { return TSDB_CODE_SUCCESS; } - STrans* pTrans = mndAcquireTrans(pMnode, transId); + pTrans = mndAcquireTrans(pMnode, transId); + mInfo("kill checkpoint trans:%d", transId); + mndKillTrans(pMnode, pTrans); + mndReleaseTrans(pMnode, pTrans); + return TSDB_CODE_SUCCESS; +} + +int32_t mndResetFromCheckpoint(SMnode* pMnode) { + doKillActiveCheckpointTrans(pMnode); // set all tasks status to be normal, refactor later to be stream level, instead of vnode level. 
SSdb *pSdb = pMnode->pSdb; @@ -2541,6 +2546,7 @@ int32_t mndResetFromCheckpoint(SMnode* pMnode) { break; } + // todo: only one such transaction should exist mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, create reset trans", pStream->name, pStream->uid); int32_t code = createStreamResetStatusTrans(pMnode, pStream); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index e6378d309e..750f5d6a43 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1116,16 +1116,16 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i ASSERT(leftRsp >= 0); if (leftRsp > 0) { - stDebug( "s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d, waiting for %d rsp", - id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code, leftRsp); + stDebug( "s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s, waiting for %d rsp", + id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code), leftRsp); } else { stDebug( - "s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d, all rsp", - id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code); + "s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s, all rsp", + id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code)); } } else { - stDebug("s-task:%s recv fix-dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d", - id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code); + stDebug("s-task:%s recv fix-dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s", + id, msgId, pRsp->downstreamTaskId, 
pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code)); } ASSERT(leftRsp >= 0);