fix: insert from select and sort merge

This commit is contained in:
kailixu 2024-04-08 18:23:16 +08:00
parent a990224e96
commit 7d34c767e9
2 changed files with 11 additions and 17 deletions

View File

@ -593,12 +593,16 @@ static int32_t tRowMergeImpl(SArray *aRowP, STSchema *pTSchema, int32_t iStart,
for (int32_t iCol = 0; iCol < pTSchema->numOfCols; iCol++) { for (int32_t iCol = 0; iCol < pTSchema->numOfCols; iCol++) {
SColVal *pColVal = NULL; SColVal *pColVal = NULL;
for (int32_t iRow = 0; iRow < nRow; iRow++) { for (int32_t iRow = nRow - 1; iRow >= 0; --iRow) {
SColVal *pColValT = tRowIterNext(aIter[iRow]); SColVal *pColValT = tRowIterNext(aIter[iRow]);
while (pColValT->cid < pTSchema->columns[iCol].colId) {
pColValT = tRowIterNext(aIter[iRow]);
}
// todo: take strategy according to the flag // todo: take strategy according to the flag
if (COL_VAL_IS_VALUE(pColValT)) { if (COL_VAL_IS_VALUE(pColValT)) {
pColVal = pColValT; pColVal = pColValT;
break;
} else if (COL_VAL_IS_NULL(pColValT)) { } else if (COL_VAL_IS_NULL(pColValT)) {
if (pColVal == NULL) { if (pColVal == NULL) {
pColVal = pColValT; pColVal = pColValT;

View File

@ -189,8 +189,7 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
} }
int64_t lastTs = TSKEY_MIN; int64_t lastTs = TSKEY_MIN;
bool updateLastRow = false; bool needSortMerge = false;
bool disorderTs = false;
for (int32_t j = 0; j < rows; ++j) { // iterate by row for (int32_t j = 0; j < rows; ++j) { // iterate by row
taosArrayClear(pVals); taosArrayClear(pVals);
@ -258,11 +257,9 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); // should use pCol->type SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); // should use pCol->type
taosArrayPush(pVals, &cv); taosArrayPush(pVals, &cv);
} else { } else {
if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) { if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId && !needSortMerge) {
if (*(int64_t*)var == lastTs) { if (*(int64_t*)var <= lastTs) {
updateLastRow = true; needSortMerge = true;
} else if (*(int64_t*)var < lastTs) {
disorderTs = true;
} else { } else {
lastTs = *(int64_t*)var; lastTs = *(int64_t*)var;
} }
@ -287,17 +284,10 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE); tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
goto _end; goto _end;
} }
if (updateLastRow) {
updateLastRow = false;
SRow** lastRow = taosArrayPop(tbData.aRowP);
tRowDestroy(*lastRow);
taosArrayPush(tbData.aRowP, &pRow); taosArrayPush(tbData.aRowP, &pRow);
} else {
taosArrayPush(tbData.aRowP, &pRow);
}
} }
if (disorderTs) { if (needSortMerge) {
if ((tRowSort(tbData.aRowP) != TSDB_CODE_SUCCESS) || if ((tRowSort(tbData.aRowP) != TSDB_CODE_SUCCESS) ||
(terrno = tRowMerge(tbData.aRowP, (STSchema*)pTSchema, 0)) != 0) { (terrno = tRowMerge(tbData.aRowP, (STSchema*)pTSchema, 0)) != 0) {
goto _end; goto _end;