From 947775155b54e5f64903979d4df27b4509be0769 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 21 Nov 2023 20:51:11 +0800 Subject: [PATCH] fix transfer crash --- source/dnode/mnode/impl/src/mndStream.c | 6 ++++-- source/dnode/vnode/src/tq/tqStreamStateSnap.c | 1 - source/libs/stream/src/streamCheckpoint.c | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index a60b916401..a00ede7270 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -284,6 +284,8 @@ static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStream pOldStream->status = pNewStream->status; pOldStream->updateTime = pNewStream->updateTime; + pOldStream->checkpointId = pNewStream->checkpointId; + pOldStream->checkpointFreq = pNewStream->checkpointFreq; taosWUnLockLatch(&pOldStream->lock); return 0; @@ -1518,7 +1520,7 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB char dstStr[20] = {0}; STR_TO_VARSTR(dstStr, sinkQuota) pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char*) dstStr, false); + colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false); char scanHistoryIdle[20 + VARSTR_HEADER_SIZE] = {0}; strcpy(scanHistoryIdle, "100a"); @@ -1526,7 +1528,7 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB memset(dstStr, 0, tListLen(dstStr)); STR_TO_VARSTR(dstStr, scanHistoryIdle) pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char*) dstStr, false); + colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false); numOfRows++; sdbRelease(pSdb, pStream); diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index 791f5ecfed..2ab710176d 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -106,7 +106,6 @@ int32_t streamStateSnapRead(SStreamStateReader* pReader, uint8_t** ppData) { memcpy(pHdr->data, rowData, len); taosMemoryFree(rowData); tqDebug("vgId:%d, vnode stream-state snapshot read data success", TD_VID(pReader->pTq->pVnode)); - taosMemoryFree(rowData); return code; _err: diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 37630c25ea..7e66885620 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -288,7 +288,7 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) { taosThreadMutexLock(&p->lock); - ASSERT(p->chkInfo.checkpointId < p->chkInfo.checkpointingId && p->chkInfo.checkpointingId == checkpointId && + ASSERT(p->chkInfo.checkpointId <= p->chkInfo.checkpointingId && p->chkInfo.checkpointingId == checkpointId && p->chkInfo.checkpointVer <= p->chkInfo.processedVer); p->chkInfo.checkpointId = p->chkInfo.checkpointingId; p->chkInfo.checkpointVer = p->chkInfo.processedVer;