From d355b78884be7e23ba197509aba97dac6268cb53 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 15 Nov 2023 11:21:31 +0800 Subject: [PATCH] fix(stream): fix invalid read. --- source/dnode/mnode/impl/src/mndStream.c | 13 ++++++++----- source/libs/stream/src/streamDispatch.c | 7 +++++++ 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 30341f45c0..8b26e0a097 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -88,7 +88,7 @@ static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *p static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); static void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode); static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot); -static int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDbName); +static int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDbName, size_t len); static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList); static SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); @@ -1297,7 +1297,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { mDebug("start to trigger checkpoint, checkpointId: %" PRId64, checkpointId); const char *pDb = mndGetStreamDB(pMnode); - mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); + mndTransSetDbName(pTrans, pDb, pDb); taosMemoryFree((void *)pDb); if (mndTransCheckConflict(pMnode, pTrans) != 0) { @@ -2464,7 +2464,10 @@ static void killAllCheckpointTrans(SMnode* pMnode, SVgroupChangeInfo* pChangeInf void* pIter = NULL; while((pIter = taosHashIterate(pChangeInfo->pDBMap, pIter)) != NULL) { char* pDb = (char*) pIter; - killActiveCheckpointTrans(pMnode, pDb); + + size_t len = 0; + void* pKey = taosHashGetKey(pDb, &len); + killActiveCheckpointTrans(pMnode, pKey, len); } } @@ -2699,9 +2702,9 @@ int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { return TSDB_CODE_ACTION_IN_PROGRESS; } -int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDBName) { +int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDBName, size_t len) { // data in the hash table will be removed automatically, no need to remove it here. - SStreamTransInfo* pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, pDBName, strlen(pDBName)); + SStreamTransInfo* pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, pDBName, len); if (pTransInfo == NULL) { return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 82affe71d9..b2f47c6d2a 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1102,6 +1102,13 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i int32_t vgId = pTask->pMeta->vgId; int32_t msgId = pTask->execInfo.dispatch; +#if 0 + // for test purpose, build the failure case + if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER) { + pRsp->inputStatus = TASK_INPUT_STATUS__REFUSED; + } +#endif + // follower not handle the dispatch rsp if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) { stError("s-task:%s vgId:%d is follower or task just re-launched, not handle the dispatch rsp, discard it", id, vgId);