diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index a60b916401..a00ede7270 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -284,6 +284,8 @@ static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStream pOldStream->status = pNewStream->status; pOldStream->updateTime = pNewStream->updateTime; + pOldStream->checkpointId = pNewStream->checkpointId; + pOldStream->checkpointFreq = pNewStream->checkpointFreq; taosWUnLockLatch(&pOldStream->lock); return 0; @@ -1518,7 +1520,7 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB char dstStr[20] = {0}; STR_TO_VARSTR(dstStr, sinkQuota) pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char*) dstStr, false); + colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false); char scanHistoryIdle[20 + VARSTR_HEADER_SIZE] = {0}; strcpy(scanHistoryIdle, "100a"); @@ -1526,7 +1528,7 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB memset(dstStr, 0, tListLen(dstStr)); STR_TO_VARSTR(dstStr, scanHistoryIdle) pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char*) dstStr, false); + colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false); numOfRows++; sdbRelease(pSdb, pStream); diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index 791f5ecfed..2ab710176d 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -106,7 +106,6 @@ int32_t streamStateSnapRead(SStreamStateReader* pReader, uint8_t** ppData) { memcpy(pHdr->data, rowData, len); taosMemoryFree(rowData); tqDebug("vgId:%d, vnode stream-state snapshot read data success", TD_VID(pReader->pTq->pVnode)); - taosMemoryFree(rowData); return code; _err: diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 37630c25ea..7e66885620 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -288,7 +288,7 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) { taosThreadMutexLock(&p->lock); - ASSERT(p->chkInfo.checkpointId < p->chkInfo.checkpointingId && p->chkInfo.checkpointingId == checkpointId && + ASSERT(p->chkInfo.checkpointId <= p->chkInfo.checkpointingId && p->chkInfo.checkpointingId == checkpointId && p->chkInfo.checkpointVer <= p->chkInfo.processedVer); p->chkInfo.checkpointId = p->chkInfo.checkpointingId; p->chkInfo.checkpointVer = p->chkInfo.processedVer;