fix(query): add duplicated row merge for column format data in memory.
This commit is contained in:
parent
35d13a91c2
commit
d3aa0be66e
|
@ -3272,12 +3272,16 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDe
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, uid);
|
if (pRow->type == TSDBROW_ROW_FMT) {
|
||||||
if (pTSchema == NULL) {
|
STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, uid);
|
||||||
return terrno;
|
if (pTSchema == NULL) {
|
||||||
}
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
tsdbRowMergerAdd(pMerger, pRow, pTSchema);
|
tsdbRowMergerAdd(pMerger, pRow, pTSchema);
|
||||||
|
} else { // column format
|
||||||
|
tsdbRowMerge(pMerger, pRow);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -3426,12 +3430,13 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SRowMerger merge = {0};
|
||||||
|
terrno = 0;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
// start to merge duplicated rows
|
// start to merge duplicated rows
|
||||||
if (current.type == TSDBROW_ROW_FMT) {
|
if (current.type == TSDBROW_ROW_FMT) {
|
||||||
SRowMerger merge = {0};
|
|
||||||
|
|
||||||
// get the correct schema for data in memory
|
// get the correct schema for data in memory
|
||||||
terrno = 0;
|
|
||||||
STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(¤t), pReader, uid);
|
STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(¤t), pReader, uid);
|
||||||
if (pTSchema == NULL) {
|
if (pTSchema == NULL) {
|
||||||
return terrno;
|
return terrno;
|
||||||
|
@ -3441,7 +3446,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
|
||||||
pReader->pSchema = pTSchema;
|
pReader->pSchema = pTSchema;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = tsdbRowMergerInit2(&merge, pReader->pSchema, ¤t, pTSchema);
|
code = tsdbRowMergerInit2(&merge, pReader->pSchema, ¤t, pTSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -3452,22 +3457,29 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbRowMergerAdd(&merge, pNextRow, pTSchema1);
|
tsdbRowMergerAdd(&merge, pNextRow, pTSchema1);
|
||||||
|
} else { // let's merge rows in file block
|
||||||
code = doMergeRowsInBuf(pIter, uid, current.pTSRow->ts, pDelList, &merge, pReader);
|
code = tsdbRowMergerInit(&merge, ¤t, pReader->pSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = tsdbRowMergerGetRow(&merge, &pResRow->pTSRow);
|
tsdbRowMerge(&merge, pNextRow);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
pResRow->type = current.type;
|
|
||||||
tsdbRowMergerClear(&merge);
|
|
||||||
*freeTSRow = true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
code = doMergeRowsInBuf(pIter, uid, current.pTSRow->ts, pDelList, &merge, pReader);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = tsdbRowMergerGetRow(&merge, &pResRow->pTSRow);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
pResRow->type = current.type;
|
||||||
|
tsdbRowMergerClear(&merge);
|
||||||
|
*freeTSRow = true;
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue