From 9ba8e6bd345296694f2dc7a7510a02c1438dd858 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Apr 2024 09:26:06 +0800 Subject: [PATCH 1/9] refactor: add logs for drop streams. --- source/dnode/mnode/impl/src/mndStream.c | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index c87b8e84f4..f5cccbbeff 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1213,27 +1213,31 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { SMDropStreamReq dropReq = {0}; if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) { + mError("invalid drop stream msg recv, discarded"); terrno = TSDB_CODE_INVALID_MSG; return -1; } - pStream = mndAcquireStream(pMnode, dropReq.name); + mDebug("recv drop stream:%s msg", dropReq.name); + pStream = mndAcquireStream(pMnode, dropReq.name); if (pStream == NULL) { if (dropReq.igNotExists) { - mInfo("stream:%s not exist, ignore not exist is set", dropReq.name); + mInfo("stream:%s not exist, ignore not exist is set, drop stream exec done with success", dropReq.name); sdbRelease(pMnode->pSdb, pStream); tFreeMDropStreamReq(&dropReq); return 0; } else { terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; - mError("stream:%s not exist failed to drop", dropReq.name); + mError("stream:%s not exist failed to drop it", dropReq.name); tFreeMDropStreamReq(&dropReq); return -1; } } if (pStream->smaId != 0) { + mDebug("stream:%s try to drop sma related stream", dropReq.name); + void *pIter = NULL; SSmaObj *pSma = NULL; pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void**)&pSma); @@ -1241,13 +1245,21 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { if (pSma && pSma->uid == pStream->smaId) { sdbRelease(pMnode->pSdb, pSma); sdbRelease(pMnode->pSdb, pStream); + sdbCancelFetch(pMnode->pSdb, pIter); tFreeMDropStreamReq(&dropReq); terrno = TSDB_CODE_TSMA_MUST_BE_DROPPED; + + mError("try to drop sma-related stream:%s, code:%s only allowed to be dropped along with sma", dropReq.name, + tstrerror(terrno)); return -1; } - if (pSma) sdbRelease(pMnode->pSdb, pSma); - pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void**)&pSma); + + if (pSma) { + sdbRelease(pMnode->pSdb, pSma); + } + + pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma); } } @@ -1307,6 +1319,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { mndKillTransImpl(pMnode, transId, pStream->sourceDb); } + mDebug("stream:%s transId:%d start to drop related task when dropping stream", dropReq.name, transId); removeStreamTasksInBuf(pStream, &execInfo); SName name = {0}; From 54e9522c62842c267be0102bf2c7e4a27e1e4cf7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Apr 2024 09:37:53 +0800 Subject: [PATCH 2/9] refactor(stream): opt stream sink perf. --- source/dnode/vnode/src/tq/tqSink.c | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 25c2f32307..af8f567200 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -333,9 +333,10 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c } while (j < newLen && k < oldLen) { - SRow* pNewRow = taosArrayGetP(pNew->aRowP, j); - SRow* pOldRow = taosArrayGetP(pExisted->aRowP, k); - if (pNewRow->ts < pOldRow->ts) { + SRow* pNewRow = TARRAY_GET_ELEM(pNew->aRowP, j); + SRow* pOldRow = TARRAY_GET_ELEM(pExisted->aRowP, k); + + if (pNewRow->ts <= pOldRow->ts) { taosArrayPush(pFinal, &pNewRow); j += 1; } else if (pNewRow->ts > pOldRow->ts) { @@ -373,12 +374,12 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c } while (j < newLen) { - SRow* pRow = taosArrayGetP(pNew->aRowP, j++); + SRow* pRow = TARRAY_GET_ELEM(pNew->aRowP, j++); taosArrayPush(pFinal, &pRow); } while (k < oldLen) { - SRow* pRow = taosArrayGetP(pExisted->aRowP, k++); + SRow* pRow = TARRAY_GET_ELEM(pExisted->aRowP, k++); taosArrayPush(pFinal, &pRow); } From cd6261c8d4c4d8d3881555f150e323c3ba6cdaf9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Apr 2024 09:49:11 +0800 Subject: [PATCH 3/9] refactor: do some internal refactor. --- source/libs/stream/src/streamCheckStatus.c | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index ea9b2ef89f..8b9de2df4c 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -35,6 +35,7 @@ static int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, static void setCheckDownstreamReqInfo(SStreamTaskCheckReq* pReq, int64_t reqId, int32_t dstTaskId, int32_t dstNodeId); static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault, int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id); +static int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId); static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId); // check status @@ -524,7 +525,7 @@ void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) { // the action of add status may incur the restart procedure, which should NEVER be executed in the timer thread. // The restart of all tasks requires that all tasks should not have active timer for now. Therefore, the execution // of restart in timer thread will result in a dead lock. -static int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId) { +int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId) { SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); if (pRunReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -614,8 +615,8 @@ void rspMonitorFn(void* param, void* tmrId) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug( "s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart " - "detected, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d", - id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); + "detected, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d", + id, pStat->name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); streamTaskCompleteCheckRsp(pInfo, false, id); taosThreadMutexUnlock(&pInfo->checkInfoLock); @@ -630,9 +631,9 @@ void rspMonitorFn(void* param, void* tmrId) { if (pInfo->stopCheckProcess == 1) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug( - "s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, notRsp:%d, notReady:%d, " - "fault:%d, timeout:%d, ready:%d ref:%d", - id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); + "s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, total:%d, notRsp:%d, " + "notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d", + id, pStat->name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); streamTaskCompleteCheckRsp(pInfo, false, id); taosThreadMutexUnlock(&pInfo->checkInfoLock); From 973ca3be805a247351478ecb964827150bc4825f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Apr 2024 09:51:40 +0800 Subject: [PATCH 4/9] refactor: do some internal refactor. --- source/libs/stream/src/streamCheckStatus.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 8b9de2df4c..d356a504c6 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -384,6 +384,8 @@ int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskI } void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) { + const char* id = pTask->id.idStr; + SStreamTaskCheckReq req = { .streamId = pTask->id.streamId, .upstreamTaskId = pTask->id.taskId, @@ -398,7 +400,7 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) { setCheckDownstreamReqInfo(&req, p->reqId, pDispatch->taskId, pDispatch->taskId); stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) reqId:0x%" PRIx64, - pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId); + id, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId); streamSendCheckMsg(pTask, &req, pOutputInfo->fixedDispatcher.nodeId, &pOutputInfo->fixedDispatcher.epSet); } else if (pOutputInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH) { @@ -413,8 +415,7 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) { stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d reqId:0x%" PRIx64, - pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, - p->reqId); + id, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, p->reqId); streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); break; } From 9c39d11ea056df33b910118d909830f3d8425c4b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Apr 2024 09:55:56 +0800 Subject: [PATCH 5/9] other: add some logs for drop streams. --- source/dnode/mnode/impl/src/mndStream.c | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index f5cccbbeff..693e37a0ce 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1236,7 +1236,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { } if (pStream->smaId != 0) { - mDebug("stream:%s try to drop sma related stream", dropReq.name); + mDebug("stream:%s, uid:0x%"PRIx64" try to drop sma related stream", dropReq.name, pStream->uid); void *pIter = NULL; SSmaObj *pSma = NULL; @@ -1250,8 +1250,8 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { tFreeMDropStreamReq(&dropReq); terrno = TSDB_CODE_TSMA_MUST_BE_DROPPED; - mError("try to drop sma-related stream:%s, code:%s only allowed to be dropped along with sma", dropReq.name, - tstrerror(terrno)); + mError("try to drop sma-related stream:%s, uid:0x%" PRIx64 " code:%s only allowed to be dropped along with sma", + dropReq.name, pStream->uid, tstrerror(terrno)); return -1; } @@ -1279,7 +1279,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream"); if (pTrans == NULL) { - mError("stream:%s, failed to drop since %s", dropReq.name, terrstr()); + mError("stream:%s uid:0x%"PRIx64" failed to drop since %s", dropReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); tFreeMDropStreamReq(&dropReq); return -1; @@ -1289,7 +1289,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { // drop all tasks if (mndStreamSetDropAction(pMnode, pTrans, pStream) < 0) { - mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr()); + mError("stream:%s uid:0x%" PRIx64 " failed to drop task since %s", dropReq.name, pStream->uid, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); tFreeMDropStreamReq(&dropReq); @@ -1315,11 +1315,13 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { // kill the related checkpoint trans int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid); if (transId != 0) { - mDebug("drop active related transId:%d due to stream:%s dropped", transId, pStream->name); + mDebug("drop active transId:%d due to stream:%s uid:0x%" PRIx64 " dropped", transId, pStream->name, pStream->uid); mndKillTransImpl(pMnode, transId, pStream->sourceDb); } - mDebug("stream:%s transId:%d start to drop related task when dropping stream", dropReq.name, transId); + mDebug("stream:%s uid:0x%" PRIx64 " transId:%d start to drop related task when dropping stream", dropReq.name, + pStream->uid, transId); + removeStreamTasksInBuf(pStream, &execInfo); SName name = {0}; From d3e676b0843df0e90c8a3626e482f2862e9a7f17 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Apr 2024 10:14:32 +0800 Subject: [PATCH 6/9] fix(cos): fix syntax error. --- source/dnode/mnode/impl/src/mndStream.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 693e37a0ce..e8c22c8de2 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1279,7 +1279,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream"); if (pTrans == NULL) { - mError("stream:%s uid:0x%"PRIx64" failed to drop since %s", dropReq.name, terrstr()); + mError("stream:%s uid:0x%"PRIx64" failed to drop since %s", dropReq.name, pStream->uid, terrstr()); sdbRelease(pMnode->pSdb, pStream); tFreeMDropStreamReq(&dropReq); return -1; From 4c27af9b13837a5ee0cc5b66c9b21e0baa450df0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Apr 2024 13:25:39 +0800 Subject: [PATCH 7/9] refactor: do some internal refactor. --- source/common/src/systable.c | 1 + source/dnode/mnode/impl/src/mndStream.c | 7 +++++++ source/dnode/vnode/src/tq/tqSink.c | 8 ++++---- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/source/common/src/systable.c b/source/common/src/systable.c index bf2f14339d..9de682dd3a 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -173,6 +173,7 @@ static const SSysDbTableSchema streamSchema[] = { {.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "sink_quota", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "checkpoint_interval", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "checkpoint_backup", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "history_scan_idle", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, }; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index e8c22c8de2..b52f98adbf 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1503,6 +1503,7 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)&trigger, false); + // sink_quota char sinkQuota[20 + VARSTR_HEADER_SIZE] = {0}; sinkQuota[0] = '0'; char dstStr[20] = {0}; @@ -1510,6 +1511,12 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false); + // checkpoint interval + char tmp[20 + VARSTR_HEADER_SIZE] = {0}; + STR_TO_VARSTR(tmp, "none") + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)tmp, false); + // checkpoint backup type char backup[20 + VARSTR_HEADER_SIZE] = {0}; STR_TO_VARSTR(backup, "none") diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index af8f567200..34171a5872 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -333,8 +333,8 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c } while (j < newLen && k < oldLen) { - SRow* pNewRow = TARRAY_GET_ELEM(pNew->aRowP, j); - SRow* pOldRow = TARRAY_GET_ELEM(pExisted->aRowP, k); + SRow* pNewRow = *(SRow**)TARRAY_GET_ELEM(pNew->aRowP, j); + SRow* pOldRow = *(SRow**)TARRAY_GET_ELEM(pExisted->aRowP, k); if (pNewRow->ts <= pOldRow->ts) { taosArrayPush(pFinal, &pNewRow); @@ -374,12 +374,12 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c } while (j < newLen) { - SRow* pRow = TARRAY_GET_ELEM(pNew->aRowP, j++); + SRow* pRow = *(SRow**)TARRAY_GET_ELEM(pNew->aRowP, j++); taosArrayPush(pFinal, &pRow); } while (k < oldLen) { - SRow* pRow = TARRAY_GET_ELEM(pExisted->aRowP, k++); + SRow* pRow = *(SRow**)TARRAY_GET_ELEM(pExisted->aRowP, k++); taosArrayPush(pFinal, &pRow); } From d73fd5492a8dcebac43ce37164eed071c2424c53 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Apr 2024 14:24:20 +0800 Subject: [PATCH 8/9] fix(test): update test case. --- source/dnode/mnode/impl/src/mndStream.c | 4 +++- tests/system-test/0-others/information_schema.py | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index b52f98adbf..fb15e4b857 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1513,7 +1513,9 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB // checkpoint interval char tmp[20 + VARSTR_HEADER_SIZE] = {0}; - STR_TO_VARSTR(tmp, "none") + sprintf(varDataVal(tmp), "%d sec", tsStreamCheckpointInterval); + varDataSetLen(tmp, strlen(varDataVal(tmp))); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)tmp, false); diff --git a/tests/system-test/0-others/information_schema.py b/tests/system-test/0-others/information_schema.py index 9a112c669e..2924ebd388 100644 --- a/tests/system-test/0-others/information_schema.py +++ b/tests/system-test/0-others/information_schema.py @@ -222,7 +222,7 @@ class TDTestCase: tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'") tdLog.info(len(tdSql.queryResult)) - tdSql.checkEqual(True, len(tdSql.queryResult) in range(253, 254)) + tdSql.checkEqual(True, len(tdSql.queryResult) in range(254, 255)) tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'") tdSql.checkEqual(54, len(tdSql.queryResult)) From cb5a0563e641608164b057c84a0cf3238a2cba64 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Apr 2024 15:43:12 +0800 Subject: [PATCH 9/9] fix(stream): fix error caused by merge. --- source/dnode/vnode/src/tq/tqSink.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 34171a5872..c0f58fc3ec 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -336,7 +336,7 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c SRow* pNewRow = *(SRow**)TARRAY_GET_ELEM(pNew->aRowP, j); SRow* pOldRow = *(SRow**)TARRAY_GET_ELEM(pExisted->aRowP, k); - if (pNewRow->ts <= pOldRow->ts) { + if (pNewRow->ts < pOldRow->ts) { taosArrayPush(pFinal, &pNewRow); j += 1; } else if (pNewRow->ts > pOldRow->ts) {