From 54e9522c62842c267be0102bf2c7e4a27e1e4cf7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Apr 2024 09:37:53 +0800 Subject: [PATCH] refactor(stream): opt stream sink perf. --- source/dnode/vnode/src/tq/tqSink.c | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 25c2f32307..af8f567200 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -333,9 +333,10 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c } while (j < newLen && k < oldLen) { - SRow* pNewRow = taosArrayGetP(pNew->aRowP, j); - SRow* pOldRow = taosArrayGetP(pExisted->aRowP, k); - if (pNewRow->ts < pOldRow->ts) { + SRow* pNewRow = TARRAY_GET_ELEM(pNew->aRowP, j); + SRow* pOldRow = TARRAY_GET_ELEM(pExisted->aRowP, k); + + if (pNewRow->ts <= pOldRow->ts) { taosArrayPush(pFinal, &pNewRow); j += 1; } else if (pNewRow->ts > pOldRow->ts) { @@ -373,12 +374,12 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c } while (j < newLen) { - SRow* pRow = taosArrayGetP(pNew->aRowP, j++); + SRow* pRow = TARRAY_GET_ELEM(pNew->aRowP, j++); taosArrayPush(pFinal, &pRow); } while (k < oldLen) { - SRow* pRow = taosArrayGetP(pExisted->aRowP, k++); + SRow* pRow = TARRAY_GET_ELEM(pExisted->aRowP, k++); taosArrayPush(pFinal, &pRow); }