refactor: do some internal refactor.
This commit is contained in:
parent
3426bb2870
commit
2ec24c1e04
|
@ -49,8 +49,8 @@ static int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, i
|
||||||
STsdbReader* pReader);
|
STsdbReader* pReader);
|
||||||
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
|
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
|
||||||
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, SRowKey* pKey, STsdbReader* pReader);
|
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, SRowKey* pKey, STsdbReader* pReader);
|
||||||
static int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo,
|
static int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, SRowKey* pRowKey,
|
||||||
SRowKey* pRowKey, SRowMerger* pMerger, SVersionRange* pVerRange, const char* id);
|
SRowMerger* pMerger, int32_t pkSrcSlot, SVersionRange* pVerRange, const char* id);
|
||||||
static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, SRowKey* pCurKey, SArray* pDelList,
|
static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, SRowKey* pCurKey, SArray* pDelList,
|
||||||
STsdbReader* pReader);
|
STsdbReader* pReader);
|
||||||
static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow,
|
static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow,
|
||||||
|
@ -1526,7 +1526,7 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo,
|
static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, int32_t pkSrcSlot,
|
||||||
SVersionRange* pVerRange) {
|
SVersionRange* pVerRange) {
|
||||||
int32_t order = pSttBlockReader->order;
|
int32_t order = pSttBlockReader->order;
|
||||||
int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;
|
int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;
|
||||||
|
@ -1547,13 +1547,11 @@ static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockSc
|
||||||
|
|
||||||
if (pSttBlockReader->numOfPks == 0) {
|
if (pSttBlockReader->numOfPks == 0) {
|
||||||
pSttBlockReader->currentKey.ts = key;
|
pSttBlockReader->currentKey.ts = key;
|
||||||
// todo handle error
|
} else {
|
||||||
pScanInfo->sttKeyInfo.nextProcKey = key;
|
tColRowGetKeyDeepCopy(pRow->pBlockData, pRow->iRow, pkSrcSlot, &pSttBlockReader->currentKey);
|
||||||
} else { // todo handle the deep copy problem
|
|
||||||
tColRowGetKey(pRow->pBlockData, pRow->iRow, &pSttBlockReader->currentKey);
|
|
||||||
pScanInfo->sttKeyInfo.nextProcKey = key;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pScanInfo->sttKeyInfo.nextProcKey = key;
|
||||||
if (pScanInfo->delSkyline != NULL && TARRAY_SIZE(pScanInfo->delSkyline) > 0) {
|
if (pScanInfo->delSkyline != NULL && TARRAY_SIZE(pScanInfo->delSkyline) > 0) {
|
||||||
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, order, pVerRange)) {
|
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, order, pVerRange)) {
|
||||||
pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA;
|
pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA;
|
||||||
|
@ -1578,30 +1576,18 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SSttBlockReader* pSttB
|
||||||
|
|
||||||
// avoid the fetch next row replace the referenced stt block in buffer
|
// avoid the fetch next row replace the referenced stt block in buffer
|
||||||
doPinSttBlock(pSttBlockReader);
|
doPinSttBlock(pSttBlockReader);
|
||||||
bool hasVal = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange);
|
bool hasVal = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pReader->suppInfo.pkSrcSlot, &pReader->info.verRange);
|
||||||
doUnpinSttBlock(pSttBlockReader);
|
doUnpinSttBlock(pSttBlockReader);
|
||||||
if (hasVal) {
|
if (hasVal) {
|
||||||
SRowKey nextKey;
|
SRowKey* pNext = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||||
|
if (pkCompEx(pReader->pkComparFn, pSttKey, pNext) != 0) {
|
||||||
TSDBROW* pNextRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
|
||||||
tRowGetKeyEx(pNextRow, &nextKey);
|
|
||||||
|
|
||||||
if (pkCompEx(pReader->pkComparFn, pSttKey, &nextKey) != 0) {
|
|
||||||
code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow);
|
code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow);
|
||||||
if (code) {
|
*copied = (code == TSDB_CODE_SUCCESS);
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
*copied = true;
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow);
|
code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow);
|
||||||
if (code) {
|
*copied = (code == TSDB_CODE_SUCCESS);
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
*copied = true;
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1648,6 +1634,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
SBlockData* pBlockData = &pReader->status.fileBlockData;
|
SBlockData* pBlockData = &pReader->status.fileBlockData;
|
||||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||||
__compar_fn_t compFn = pReader->pkComparFn;
|
__compar_fn_t compFn = pReader->pkComparFn;
|
||||||
|
int32_t pkSrcSlot = pReader->suppInfo.pkSrcSlot;
|
||||||
|
|
||||||
SRowKey* pSttKey = NULL;
|
SRowKey* pSttKey = NULL;
|
||||||
if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) {
|
if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) {
|
||||||
|
@ -1687,17 +1674,17 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
if (pReader->info.order == TSDB_ORDER_ASC) {
|
if (pReader->info.order == TSDB_ORDER_ASC) {
|
||||||
minKey = k; // chosen the minimum value
|
minKey = k; // chosen the minimum value
|
||||||
|
|
||||||
if (hasDataInFileBlock(pBlockData, pDumpInfo) && (pkCompEx(compFn, pfKey, &minKey) < 0)) {
|
if (pkCompEx(compFn, pfKey, &minKey) < 0) {
|
||||||
minKey = *pfKey;
|
minKey = *pfKey;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pSttKey != NULL && (pkCompEx(compFn, pSttKey, &minKey) < 0)) {
|
if (pkCompEx(compFn, pSttKey, &minKey) < 0) {
|
||||||
minKey = *pSttKey;
|
minKey = *pSttKey;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
minKey = k;
|
minKey = k;
|
||||||
|
|
||||||
if (hasDataInFileBlock(pBlockData, pDumpInfo) && (pkCompEx(compFn, pfKey, &minKey) > 0)) {
|
if (pkCompEx(compFn, pfKey, &minKey) > 0) {
|
||||||
minKey = *pfKey;
|
minKey = *pfKey;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1724,7 +1711,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, &pReader->info.verRange, pReader->idStr);
|
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pkCompEx(compFn, &minKey, &k) == 0) {
|
if (pkCompEx(compFn, &minKey, &k) == 0) {
|
||||||
|
@ -1757,7 +1744,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, &pReader->info.verRange, pReader->idStr);
|
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pkCompEx(compFn, &minKey, pfKey) == 0) {
|
if (pkCompEx(compFn, &minKey, pfKey) == 0) {
|
||||||
|
@ -1787,6 +1774,7 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader*
|
||||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||||
SRowMerger* pMerger = &pReader->status.merger;
|
SRowMerger* pMerger = &pReader->status.merger;
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
int32_t pkSrcSlot = pReader->suppInfo.pkSrcSlot;
|
||||||
|
|
||||||
// merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized
|
// merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized
|
||||||
if (pMerger->pArray == NULL) {
|
if (pMerger->pArray == NULL) {
|
||||||
|
@ -1848,7 +1836,7 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader*
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, &pReader->info.verRange, pReader->idStr);
|
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr);
|
||||||
} else {
|
} else {
|
||||||
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
||||||
code = tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
code = tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
||||||
|
@ -1856,7 +1844,7 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader*
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, &pReader->info.verRange, pReader->idStr);
|
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr);
|
||||||
|
|
||||||
code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
|
code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -1887,6 +1875,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||||
SArray* pDelList = pBlockScanInfo->delSkyline;
|
SArray* pDelList = pBlockScanInfo->delSkyline;
|
||||||
__compar_fn_t compFn = pReader->pkComparFn;
|
__compar_fn_t compFn = pReader->pkComparFn;
|
||||||
|
int32_t pkSrcSlot = pReader->suppInfo.pkSrcSlot;
|
||||||
|
|
||||||
TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
|
TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
|
||||||
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
|
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
|
||||||
|
@ -1982,7 +1971,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, &pReader->info.verRange, pReader->idStr);
|
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pkCompEx(compFn, &minKey, &ik) == 0) {
|
if (pkCompEx(compFn, &minKey, &ik) == 0) {
|
||||||
|
@ -2040,7 +2029,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, &pReader->info.verRange, pReader->idStr);
|
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pkCompEx(compFn, &minKey, pfKey) == 0) {
|
if (pkCompEx(compFn, &minKey, pfKey) == 0) {
|
||||||
|
@ -2108,13 +2097,13 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
|
||||||
startKey = (STsdbRowKey){.version = pReader->info.verRange.minVer,
|
startKey = (STsdbRowKey){.version = pReader->info.verRange.minVer,
|
||||||
.key = {
|
.key = {
|
||||||
.ts = pBlockScanInfo->lastProcKey.ts + 1,
|
.ts = pBlockScanInfo->lastProcKey.ts + 1,
|
||||||
.numOfPKs = 0, // TODO: change here if multi-key is supported
|
.numOfPKs = pReader->suppInfo.numOfPks,
|
||||||
}};
|
}};
|
||||||
} else {
|
} else {
|
||||||
startKey = (STsdbRowKey){.version = pReader->info.verRange.maxVer,
|
startKey = (STsdbRowKey){.version = pReader->info.verRange.maxVer,
|
||||||
.key = {
|
.key = {
|
||||||
.ts = pBlockScanInfo->lastProcKey.ts - 1,
|
.ts = pBlockScanInfo->lastProcKey.ts - 1,
|
||||||
.numOfPKs = 0, // TODO: change here if multi-key is supported
|
.numOfPKs = pReader->suppInfo.numOfPks,
|
||||||
}};
|
}};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2267,13 +2256,13 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
|
||||||
} else { // not clean stt blocks
|
} else { // not clean stt blocks
|
||||||
INIT_TIMEWINDOW(&pScanInfo->sttWindow); // reset the time window
|
INIT_TIMEWINDOW(&pScanInfo->sttWindow); // reset the time window
|
||||||
pScanInfo->sttBlockReturned = false;
|
pScanInfo->sttBlockReturned = false;
|
||||||
hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange);
|
hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pReader->suppInfo.pkSrcSlot, &pReader->info.verRange);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
pScanInfo->cleanSttBlocks = false;
|
pScanInfo->cleanSttBlocks = false;
|
||||||
INIT_TIMEWINDOW(&pScanInfo->sttWindow); // reset the time window
|
INIT_TIMEWINDOW(&pScanInfo->sttWindow); // reset the time window
|
||||||
pScanInfo->sttBlockReturned = false;
|
pScanInfo->sttBlockReturned = false;
|
||||||
hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange);
|
hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pReader->suppInfo.pkSrcSlot, &pReader->info.verRange);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(info.pTimeWindowList);
|
taosArrayDestroy(info.pTimeWindowList);
|
||||||
|
@ -2336,15 +2325,18 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
|
||||||
|
|
||||||
taosMemoryFree(pTSRow);
|
taosMemoryFree(pTSRow);
|
||||||
tsdbRowMergerClear(pMerger);
|
tsdbRowMergerClear(pMerger);
|
||||||
|
|
||||||
|
tRowKeyAssign(&pBlockScanInfo->lastProcKey, pKey);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pBlockScanInfo,
|
int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pBlockScanInfo,
|
||||||
STsdbReader* pReader) {
|
STsdbReader* pReader) {
|
||||||
bool copied = false;
|
bool copied = false;
|
||||||
SRow* pTSRow = NULL;
|
SRow* pTSRow = NULL;
|
||||||
SRowKey sttKey = {0};
|
SRowKey sttKey = {0};
|
||||||
|
int32_t pkSrcSlot = pReader->suppInfo.pkSrcSlot;
|
||||||
|
|
||||||
tRowKeyAssign(&sttKey, getCurrentKeyInSttBlock(pSttBlockReader));
|
tRowKeyAssign(&sttKey, getCurrentKeyInSttBlock(pSttBlockReader));
|
||||||
|
|
||||||
|
@ -2371,7 +2363,7 @@ int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanIn
|
||||||
|
|
||||||
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
||||||
tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
||||||
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, &sttKey, pMerger, &pReader->info.verRange, pReader->idStr);
|
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, &sttKey, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr);
|
||||||
|
|
||||||
code = tsdbRowMergerGetRow(pMerger, &pTSRow);
|
code = tsdbRowMergerGetRow(pMerger, &pTSRow);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -2382,6 +2374,8 @@ int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanIn
|
||||||
|
|
||||||
taosMemoryFree(pTSRow);
|
taosMemoryFree(pTSRow);
|
||||||
tsdbRowMergerClear(pMerger);
|
tsdbRowMergerClear(pMerger);
|
||||||
|
|
||||||
|
tRowKeyAssign(&pBlockScanInfo->lastProcKey, &sttKey);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3702,11 +3696,11 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, SRowKey* pRowKey,
|
int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, SRowKey* pRowKey,
|
||||||
SRowMerger* pMerger, SVersionRange* pVerRange, const char* idStr) {
|
SRowMerger* pMerger, int32_t pkSrcSlot, SVersionRange* pVerRange, const char* idStr) {
|
||||||
while (nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pVerRange)) {
|
while (nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pkSrcSlot, pVerRange)) {
|
||||||
SRowKey* next1 = getCurrentKeyInSttBlock(pSttBlockReader);
|
SRowKey* pNextKey = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||||
|
|
||||||
int32_t ret = pkCompEx(pSttBlockReader->pkComparFn, pRowKey, next1);
|
int32_t ret = pkCompEx(pSttBlockReader->pkComparFn, pRowKey, pNextKey);
|
||||||
if (ret == 0) {
|
if (ret == 0) {
|
||||||
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
||||||
tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
||||||
|
@ -3724,8 +3718,9 @@ int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanI
|
||||||
|
|
||||||
int32_t doMergeMemTableMultiRows(TSDBROW* pRow, SRowKey* pKey, uint64_t uid, SIterInfo* pIter, SArray* pDelList,
|
int32_t doMergeMemTableMultiRows(TSDBROW* pRow, SRowKey* pKey, uint64_t uid, SIterInfo* pIter, SArray* pDelList,
|
||||||
TSDBROW* pResRow, STsdbReader* pReader, bool* freeTSRow) {
|
TSDBROW* pResRow, STsdbReader* pReader, bool* freeTSRow) {
|
||||||
TSDBROW* pNextRow = NULL;
|
SRowMerger* pMerger = &pReader->status.merger;
|
||||||
TSDBROW current = *pRow;
|
TSDBROW* pNextRow = NULL;
|
||||||
|
TSDBROW current = *pRow;
|
||||||
|
|
||||||
{ // if the timestamp of the next valid row has a different ts, return current row directly
|
{ // if the timestamp of the next valid row has a different ts, return current row directly
|
||||||
pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
|
pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
|
||||||
|
@ -3770,7 +3765,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, SRowKey* pKey, uint64_t uid, SIt
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
code = tsdbRowMergerAdd(&pReader->status.merger, ¤t, pTSchema);
|
code = tsdbRowMergerAdd(pMerger, ¤t, pTSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -3783,7 +3778,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, SRowKey* pKey, uint64_t uid, SIt
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
code = tsdbRowMergerAdd(&pReader->status.merger, pNextRow, pTSchema1);
|
code = tsdbRowMergerAdd(pMerger, pNextRow, pTSchema1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -3793,13 +3788,13 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, SRowKey* pKey, uint64_t uid, SIt
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = tsdbRowMergerGetRow(&pReader->status.merger, &pResRow->pTSRow);
|
code = tsdbRowMergerGetRow(pMerger, &pResRow->pTSRow);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
pResRow->type = TSDBROW_ROW_FMT;
|
pResRow->type = TSDBROW_ROW_FMT;
|
||||||
tsdbRowMergerClear(&pReader->status.merger);
|
tsdbRowMergerClear(pMerger);
|
||||||
*freeTSRow = true;
|
*freeTSRow = true;
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -3807,7 +3802,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, SRowKey* pKey, uint64_t uid, SIt
|
||||||
|
|
||||||
int32_t doMergeMemIMemRows(TSDBROW* pRow, SRowKey* pRowKey, TSDBROW* piRow, SRowKey* piRowKey,
|
int32_t doMergeMemIMemRows(TSDBROW* pRow, SRowKey* pRowKey, TSDBROW* piRow, SRowKey* piRowKey,
|
||||||
STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, SRow** pTSRow) {
|
STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, SRow** pTSRow) {
|
||||||
SRowMerger* pMerger = &pReader->status.merger;
|
SRowMerger* pMerger = pMerger;
|
||||||
|
|
||||||
STSchema* pSchema = NULL;
|
STSchema* pSchema = NULL;
|
||||||
if (pRow->type == TSDBROW_ROW_FMT) {
|
if (pRow->type == TSDBROW_ROW_FMT) {
|
||||||
|
@ -3826,7 +3821,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, SRowKey* pRowKey, TSDBROW* piRow, SRow
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ASCENDING_TRAVERSE(pReader->info.order)) { // ascending order imem --> mem
|
if (ASCENDING_TRAVERSE(pReader->info.order)) { // ascending order imem --> mem
|
||||||
int32_t code = tsdbRowMergerAdd(&pReader->status.merger, piRow, piSchema);
|
int32_t code = tsdbRowMergerAdd(pMerger, piRow, piSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -3836,14 +3831,14 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, SRowKey* pRowKey, TSDBROW* piRow, SRow
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema);
|
tsdbRowMergerAdd(pMerger, pRow, pSchema);
|
||||||
code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, pRowKey, pBlockScanInfo->delSkyline, pReader);
|
code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, pRowKey, pBlockScanInfo->delSkyline, pReader);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
int32_t code = tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema);
|
int32_t code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS || pMerger->pTSchema == NULL) {
|
if (code != TSDB_CODE_SUCCESS || pMerger->pTSchema == NULL) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -3853,7 +3848,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, SRowKey* pRowKey, TSDBROW* piRow, SRow
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbRowMergerAdd(&pReader->status.merger, piRow, piSchema);
|
tsdbRowMergerAdd(pMerger, piRow, piSchema);
|
||||||
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, piRowKey, pBlockScanInfo->delSkyline, pReader);
|
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, piRowKey, pBlockScanInfo->delSkyline, pReader);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
|
|
Loading…
Reference in New Issue