From 7d34c767e906bc24635d554acfeb173a269581d1 Mon Sep 17 00:00:00 2001 From: kailixu Date: Mon, 8 Apr 2024 18:23:16 +0800 Subject: [PATCH] fix: insert from select and sort merge --- source/common/src/tdataformat.c | 6 +++++- source/libs/executor/src/dataInserter.c | 22 ++++++---------------- 2 files changed, 11 insertions(+), 17 deletions(-) diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 20e78079ea..1585c12ac1 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -593,12 +593,16 @@ static int32_t tRowMergeImpl(SArray *aRowP, STSchema *pTSchema, int32_t iStart, for (int32_t iCol = 0; iCol < pTSchema->numOfCols; iCol++) { SColVal *pColVal = NULL; - for (int32_t iRow = 0; iRow < nRow; iRow++) { + for (int32_t iRow = nRow - 1; iRow >= 0; --iRow) { SColVal *pColValT = tRowIterNext(aIter[iRow]); + while (pColValT->cid < pTSchema->columns[iCol].colId) { + pColValT = tRowIterNext(aIter[iRow]); + } // todo: take strategy according to the flag if (COL_VAL_IS_VALUE(pColValT)) { pColVal = pColValT; + break; } else if (COL_VAL_IS_NULL(pColValT)) { if (pColVal == NULL) { pColVal = pColValT; diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index dfe30c9f96..06f63f5f04 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -189,8 +189,7 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp } int64_t lastTs = TSKEY_MIN; - bool updateLastRow = false; - bool disorderTs = false; + bool needSortMerge = false; for (int32_t j = 0; j < rows; ++j) { // iterate by row taosArrayClear(pVals); @@ -258,11 +257,9 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); // should use pCol->type taosArrayPush(pVals, &cv); } else { - if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) { - if (*(int64_t*)var == lastTs) { - updateLastRow = true; - } else if (*(int64_t*)var < lastTs) { - disorderTs = true; + if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId && !needSortMerge) { + if (*(int64_t*)var <= lastTs) { + needSortMerge = true; } else { lastTs = *(int64_t*)var; } @@ -287,17 +284,10 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE); goto _end; } - if (updateLastRow) { - updateLastRow = false; - SRow** lastRow = taosArrayPop(tbData.aRowP); - tRowDestroy(*lastRow); - taosArrayPush(tbData.aRowP, &pRow); - } else { - taosArrayPush(tbData.aRowP, &pRow); - } + taosArrayPush(tbData.aRowP, &pRow); } - if (disorderTs) { + if (needSortMerge) { if ((tRowSort(tbData.aRowP) != TSDB_CODE_SUCCESS) || (terrno = tRowMerge(tbData.aRowP, (STSchema*)pTSchema, 0)) != 0) { goto _end;