other:merge 3.0
This commit is contained in:
parent
e179c10c88
commit
581cb0ffe1
|
@ -229,10 +229,10 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
|
||||||
for (int32_t j = 0; j < numOfTables; ++j) {
|
for (int32_t j = 0; j < numOfTables; ++j) {
|
||||||
STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid};
|
STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid};
|
||||||
if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
|
if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
|
||||||
if (info.lastKey == INT64_MIN || info.lastKey < pTsdbReader->window.skey) {
|
info.lastKey = pTsdbReader->window.skey;
|
||||||
info.lastKey = pTsdbReader->window.skey;
|
// if (info.lastKey == INT64_MIN || info.lastKey < pTsdbReader->window.skey) {
|
||||||
}
|
// info.lastKey = pTsdbReader->window.skey - step;
|
||||||
|
// }
|
||||||
ASSERT(info.lastKey >= pTsdbReader->window.skey && info.lastKey <= pTsdbReader->window.ekey);
|
ASSERT(info.lastKey >= pTsdbReader->window.skey && info.lastKey <= pTsdbReader->window.ekey);
|
||||||
} else {
|
} else {
|
||||||
info.lastKey = pTsdbReader->window.ekey;
|
info.lastKey = pTsdbReader->window.ekey;
|
||||||
|
@ -1854,8 +1854,6 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum
|
||||||
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
|
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
|
||||||
|
|
||||||
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo) {
|
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo) {
|
||||||
int32_t step = ASCENDING_TRAVERSE(pLastBlockReader->order)? 1:-1;
|
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree);
|
bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree);
|
||||||
if (!hasVal) {
|
if (!hasVal) {
|
||||||
|
@ -1865,65 +1863,9 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc
|
||||||
TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
||||||
TSDBKEY k = TSDBROW_KEY(&row);
|
TSDBKEY k = TSDBROW_KEY(&row);
|
||||||
if (!hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order)) {
|
if (!hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order)) {
|
||||||
pBlockScanInfo->lastKey = k.ts + step;
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
*(pLastBlockReader->rowIndex) += step;
|
|
||||||
|
|
||||||
SBlockData* pBlockData = &pLastBlockReader->lastBlockData;
|
|
||||||
for (int32_t i = *(pLastBlockReader->rowIndex); i < pBlockData->nRow && i >= 0; i += step) {
|
|
||||||
if (pBlockData->aUid != NULL) {
|
|
||||||
if (asc) {
|
|
||||||
if (pBlockData->aUid[i] < pLastBlockReader->uid) {
|
|
||||||
continue;
|
|
||||||
} else if (pBlockData->aUid[i] > pLastBlockReader->uid) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (pBlockData->aUid[i] > pLastBlockReader->uid) {
|
|
||||||
continue;
|
|
||||||
} else if (pBlockData->aUid[i] < pLastBlockReader->uid) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int64_t ts = pBlockData->aTSKEY[i];
|
|
||||||
if (ts < pLastBlockReader->window.skey) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
int64_t ver = pBlockData->aVersion[i];
|
|
||||||
if (ver < pLastBlockReader->verRange.minVer) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// no data any more, todo opt handle desc case
|
|
||||||
if (ts > pLastBlockReader->window.ekey) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// todo opt handle desc case
|
|
||||||
if (ver > pLastBlockReader->verRange.maxVer) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
TSDBKEY k = {.ts = ts, .version = ver};
|
|
||||||
if (hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
*(pLastBlockReader->rowIndex) = i;
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// set all data is consumed in last block
|
|
||||||
setAllRowsChecked(pLastBlockReader);
|
|
||||||
return false;
|
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool initLastBlockReader(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo,
|
static bool initLastBlockReader(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo,
|
||||||
|
@ -1940,11 +1882,12 @@ static bool initLastBlockReader(SLastBlockReader* pLastBlockReader, STableBlockS
|
||||||
initMemDataIterator(pBlockScanInfo, pReader);
|
initMemDataIterator(pBlockScanInfo, pReader);
|
||||||
pLastBlockReader->uid = pBlockScanInfo->uid;
|
pLastBlockReader->uid = pBlockScanInfo->uid;
|
||||||
|
|
||||||
|
int32_t step = ASCENDING_TRAVERSE(pLastBlockReader->order)? 1:-1;
|
||||||
STimeWindow w = pLastBlockReader->window;
|
STimeWindow w = pLastBlockReader->window;
|
||||||
if (ASCENDING_TRAVERSE(pLastBlockReader->order)) {
|
if (ASCENDING_TRAVERSE(pLastBlockReader->order)) {
|
||||||
w.skey = pBlockScanInfo->lastKey;
|
w.skey = pBlockScanInfo->lastKey + step;
|
||||||
} else {
|
} else {
|
||||||
w.ekey = pBlockScanInfo->lastKey;
|
w.ekey = pBlockScanInfo->lastKey + step;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code =
|
int32_t code =
|
||||||
|
@ -2915,6 +2858,7 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
|
||||||
|
|
||||||
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
|
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
|
||||||
SRowMerger* pMerger) {
|
SRowMerger* pMerger) {
|
||||||
|
pScanInfo->lastKey = ts;
|
||||||
while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo)) {
|
while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo)) {
|
||||||
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
|
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
|
||||||
if (next1 == ts) {
|
if (next1 == ts) {
|
||||||
|
|
Loading…
Reference in New Issue