diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 20777e12e0..b1b6dccf35 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -87,7 +87,7 @@ static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *p static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); static void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode); static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot); -static int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDbName); +static int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDbName, size_t len); static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList); static SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); @@ -1259,7 +1259,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { mDebug("start to trigger checkpoint, checkpointId: %" PRId64, checkpointId); const char *pDb = mndGetStreamDB(pMnode); - mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); + mndTransSetDbName(pTrans, pDb, pDb); taosMemoryFree((void *)pDb); if (mndTransCheckConflict(pMnode, pTrans) != 0) { @@ -2406,7 +2406,10 @@ static void killAllCheckpointTrans(SMnode* pMnode, SVgroupChangeInfo* pChangeInf void* pIter = NULL; while((pIter = taosHashIterate(pChangeInfo->pDBMap, pIter)) != NULL) { char* pDb = (char*) pIter; - killActiveCheckpointTrans(pMnode, pDb); + + size_t len = 0; + void* pKey = taosHashGetKey(pDb, &len); + killActiveCheckpointTrans(pMnode, pKey, len); } } @@ -2641,9 +2644,9 @@ int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { return TSDB_CODE_ACTION_IN_PROGRESS; } -int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDBName) { +int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDBName, size_t len) { // data in the hash table will be removed automatically, no need to remove it here. - SStreamTransInfo* pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, pDBName, strlen(pDBName)); + SStreamTransInfo* pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, pDBName, len); if (pTransInfo == NULL) { return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 9e929bb6c0..545d6310f7 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1068,6 +1068,13 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i int32_t vgId = pTask->pMeta->vgId; int32_t msgId = pTask->execInfo.dispatch; +#if 0 + // for test purpose, build the failure case + if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER) { + pRsp->inputStatus = TASK_INPUT_STATUS__REFUSED; + } +#endif + // follower not handle the dispatch rsp if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) { stError("s-task:%s vgId:%d is follower or task just re-launched, not handle the dispatch rsp, discard it", id, vgId);