fix: merge data of mem and file

This commit is contained in:
Cary Xu 2022-05-09 11:46:00 +08:00
parent 6916f4696a
commit 2c4b44b590
1 changed files with 18 additions and 9 deletions

View File

@ -1985,7 +1985,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
return; return;
} else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) { } else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) {
SSkipListNode* node = NULL; SSkipListNode* node = NULL;
TSKEY lastRowKey = TSKEY_INITIAL_VAL; TSKEY lastKeyAppend = TSKEY_INITIAL_VAL;
do { do {
STSRow* row2 = NULL; STSRow* row2 = NULL;
@ -2019,8 +2019,8 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
} }
numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols, numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastRowKey); pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);
// numOfRows += 1;
if (cur->win.skey == TSKEY_INITIAL_VAL) { if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key; cur->win.skey = key;
} }
@ -2028,7 +2028,6 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
cur->win.ekey = key; cur->win.ekey = key;
cur->lastKey = key + step; cur->lastKey = key + step;
cur->mixBlock = true; cur->mixBlock = true;
moveToNextRowInMem(pCheckInfo); moveToNextRowInMem(pCheckInfo);
} else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it } else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it
#if 0 #if 0
@ -2064,7 +2063,11 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
} }
#endif #endif
if (TD_SUPPORT_UPDATE(pCfg->update)) { if (TD_SUPPORT_UPDATE(pCfg->update)) {
doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, pos, pos); if(lastKeyAppend != key) {
lastKeyAppend = key;
++curRow;
}
numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, pos, pos);
if (rv1 != TD_ROW_SVER(row1)) { if (rv1 != TD_ROW_SVER(row1)) {
// pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1)); // pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
@ -2074,10 +2077,8 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
// pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2)); // pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
rv2 = TD_ROW_SVER(row2); rv2 = TD_ROW_SVER(row2);
} }
numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols, numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastRowKey); pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);
// ++numOfRows;
if (cur->win.skey == TSKEY_INITIAL_VAL) { if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key; cur->win.skey = key;
} }
@ -2118,11 +2119,19 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
int32_t qstart = 0, qend = 0; int32_t qstart = 0, qend = 0;
getQualifiedRowsPos(pTsdbReadHandle, pos, end, numOfRows, &qstart, &qend); getQualifiedRowsPos(pTsdbReadHandle, pos, end, numOfRows, &qstart, &qend);
numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, qstart, qend); if ((lastKeyAppend != TSKEY_INITIAL_VAL) &&
(lastKeyAppend != (ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? tsArray[qstart] : tsArray[qend]))) {
++curRow;
}
numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, qstart, qend);
pos += (qend - qstart + 1) * step; pos += (qend - qstart + 1) * step;
if (numOfRows > 0) {
curRow = numOfRows - 1;
}
cur->win.ekey = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? tsArray[qend] : tsArray[qstart]; cur->win.ekey = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? tsArray[qend] : tsArray[qstart];
cur->lastKey = cur->win.ekey + step; cur->lastKey = cur->win.ekey + step;
lastKeyAppend = cur->win.ekey;
} }
} while (numOfRows < pTsdbReadHandle->outputCapacity); } while (numOfRows < pTsdbReadHandle->outputCapacity);