fix: use colId to merge rows
This commit is contained in:
parent
56d2697308
commit
e57f12d581
|
@ -2126,6 +2126,10 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDe
|
||||||
}
|
}
|
||||||
|
|
||||||
tRowMergerAdd(pMerger, pRow, pTSchema);
|
tRowMergerAdd(pMerger, pRow, pTSchema);
|
||||||
|
|
||||||
|
if (sversion != pReader->pSchema->version) {
|
||||||
|
taosMemoryFree(pTSchema);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -2262,6 +2266,10 @@ void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDe
|
||||||
doMergeRowsInBuf(pIter, uid, k.ts, pDelList, &merge, pReader);
|
doMergeRowsInBuf(pIter, uid, k.ts, pDelList, &merge, pReader);
|
||||||
tRowMergerGetRow(&merge, pTSRow);
|
tRowMergerGetRow(&merge, pTSRow);
|
||||||
tRowMergerClear(&merge);
|
tRowMergerClear(&merge);
|
||||||
|
|
||||||
|
if (sversion != pReader->pSchema->version) {
|
||||||
|
taosMemoryFree(pTSchema);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
|
void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
|
||||||
|
|
|
@ -575,7 +575,7 @@ int32_t tRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRo
|
||||||
TSDBKEY key = TSDBROW_KEY(pRow);
|
TSDBKEY key = TSDBROW_KEY(pRow);
|
||||||
SColVal *pColVal = &(SColVal){0};
|
SColVal *pColVal = &(SColVal){0};
|
||||||
STColumn *pTColumn;
|
STColumn *pTColumn;
|
||||||
int32_t iCol = 0;
|
int32_t iCol, jCol = 0;
|
||||||
|
|
||||||
pMerger->pTSchema = pResTSchema;
|
pMerger->pTSchema = pResTSchema;
|
||||||
pMerger->version = key.version;
|
pMerger->version = key.version;
|
||||||
|
@ -587,7 +587,7 @@ int32_t tRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRo
|
||||||
}
|
}
|
||||||
|
|
||||||
// ts
|
// ts
|
||||||
pTColumn = &pTSchema->columns[iCol++];
|
pTColumn = &pTSchema->columns[jCol++];
|
||||||
|
|
||||||
ASSERT(pTColumn->type == TSDB_DATA_TYPE_TIMESTAMP);
|
ASSERT(pTColumn->type == TSDB_DATA_TYPE_TIMESTAMP);
|
||||||
|
|
||||||
|
@ -598,14 +598,18 @@ int32_t tRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRo
|
||||||
}
|
}
|
||||||
|
|
||||||
// other
|
// other
|
||||||
for (; iCol < pTSchema->numOfCols && iCol < pResTSchema->numOfCols; ++iCol) {
|
for (iCol = 1; jCol < pTSchema->numOfCols && iCol < pResTSchema->numOfCols; ++iCol) {
|
||||||
pTColumn = &pResTSchema->columns[iCol];
|
pTColumn = &pResTSchema->columns[iCol];
|
||||||
if (pTSchema->columns[iCol].colId != pTColumn->colId) {
|
if (pTSchema->columns[jCol].colId < pTColumn->colId) {
|
||||||
|
++jCol;
|
||||||
|
--iCol;
|
||||||
|
continue;
|
||||||
|
} else if (pTSchema->columns[jCol].colId > pTColumn->colId) {
|
||||||
taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type));
|
taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
|
tsdbRowGetColVal(pRow, pTSchema, jCol++, pColVal);
|
||||||
if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
|
if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _exit;
|
goto _exit;
|
||||||
|
@ -626,17 +630,21 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
|
||||||
TSDBKEY key = TSDBROW_KEY(pRow);
|
TSDBKEY key = TSDBROW_KEY(pRow);
|
||||||
SColVal *pColVal = &(SColVal){0};
|
SColVal *pColVal = &(SColVal){0};
|
||||||
STColumn *pTColumn;
|
STColumn *pTColumn;
|
||||||
int32_t iCol;
|
int32_t iCol, jCol = 1;
|
||||||
|
|
||||||
ASSERT(((SColVal *)pMerger->pArray->pData)->value.ts == key.ts);
|
ASSERT(((SColVal *)pMerger->pArray->pData)->value.ts == key.ts);
|
||||||
|
|
||||||
for (iCol = 1; iCol < pMerger->pTSchema->numOfCols && iCol < pTSchema->numOfCols; ++iCol) {
|
for (iCol = 1; iCol < pMerger->pTSchema->numOfCols && jCol < pTSchema->numOfCols; ++iCol) {
|
||||||
pTColumn = &pMerger->pTSchema->columns[iCol];
|
pTColumn = &pMerger->pTSchema->columns[iCol];
|
||||||
if (pTSchema->columns[iCol].colId != pTColumn->colId) {
|
if (pTSchema->columns[jCol].colId < pTColumn->colId) {
|
||||||
|
++jCol;
|
||||||
|
--iCol;
|
||||||
|
continue;
|
||||||
|
} else if (pTSchema->columns[jCol].colId > pTColumn->colId) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbRowGetColVal(pRow, pMerger->pTSchema, iCol, pColVal);
|
tsdbRowGetColVal(pRow, pTSchema, jCol++, pColVal);
|
||||||
|
|
||||||
if (key.version > pMerger->version) {
|
if (key.version > pMerger->version) {
|
||||||
if (!pColVal->isNone) {
|
if (!pColVal->isNone) {
|
||||||
|
|
Loading…
Reference in New Issue