From df516703e15b0f0639525c737a7ad35d6fa26d13 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 9 Apr 2024 10:22:13 +0800 Subject: [PATCH] fix(stream): check the pk during merge rows with identical timestamp. --- source/dnode/vnode/src/tq/tqSink.c | 42 +++++++++++++++++++++++------- 1 file changed, 33 insertions(+), 9 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 7f5ddf4f1e..d43846bcf4 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -324,6 +324,7 @@ int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id) { int32_t oldLen = taosArrayGetSize(pExisted->aRowP); int32_t newLen = taosArrayGetSize(pNew->aRowP); + int32_t numOfPk = 0; int32_t j = 0, k = 0; SArray* pFinal = taosArrayInit(oldLen + newLen, POINTER_BYTES); @@ -335,17 +336,40 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c while (j < newLen && k < oldLen) { SRow* pNewRow = taosArrayGetP(pNew->aRowP, j); SRow* pOldRow = taosArrayGetP(pExisted->aRowP, k); - if (pNewRow->ts <= pOldRow->ts) { + if (pNewRow->ts < pOldRow->ts) { taosArrayPush(pFinal, &pNewRow); j += 1; - - if (pNewRow->ts == pOldRow->ts) { - k += 1; - tRowDestroy(pOldRow); - } - } else { + } else if (pNewRow->ts > pOldRow->ts) { taosArrayPush(pFinal, &pOldRow); k += 1; + } else { + // check for the existance of primary key + if (pNewRow->numOfPKs == 0) { + taosArrayPush(pFinal, &pNewRow); + k += 1; + j += 1; + tRowDestroy(pOldRow); + } else { + numOfPk = pNewRow->numOfPKs; + + SRowKey kNew, kOld; + tRowGetKey(pNewRow, &kNew); + tRowGetKey(pOldRow, &kOld); + + int32_t ret = tRowKeyCompare(&kNew, &kOld); + if (ret <= 0) { + taosArrayPush(pFinal, &pNewRow); + j += 1; + + if (ret == 0) { + k += 1; + tRowDestroy(pOldRow); + } + } else { + taosArrayPush(pFinal, &pOldRow); + k += 1; + } + } } } @@ -363,8 +387,8 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c taosArrayDestroy(pExisted->aRowP); pExisted->aRowP = pFinal; - tqTrace("s-task:%s rows merged, final rows:%d, uid:%" PRId64 ", existed auto-create table:%d, new-block:%d", id, - (int32_t)taosArrayGetSize(pFinal), pExisted->uid, (pExisted->pCreateTbReq != NULL), + tqTrace("s-task:%s rows merged, final rows:%d, pk:%d uid:%" PRId64 ", existed auto-create table:%d, new-block:%d", + id, (int32_t)taosArrayGetSize(pFinal), numOfPk, pExisted->uid, (pExisted->pCreateTbReq != NULL), (pNew->pCreateTbReq != NULL)); tdDestroySVCreateTbReq(pNew->pCreateTbReq);