refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2022-07-02 23:37:31 +08:00
parent 42d635a73d
commit ba8ab92632
1 changed files with 49 additions and 28 deletions

View File

@ -145,7 +145,7 @@ struct STsdbReader {
}; };
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter); static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
static int buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, TSDBKEY maxKey, int32_t capacity, STsdbReader* pReader); static int buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity, STsdbReader* pReader);
static TSDBROW* getValidRow(STbDataIter* pIter, bool* hasVal, STsdbReader* pReader); static TSDBROW* getValidRow(STbDataIter* pIter, bool* hasVal, STsdbReader* pReader);
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader, SRowMerger* pMerger); static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader, SRowMerger* pMerger);
static int32_t doLoadRowsOfIdenticalTsInBuf(STbDataIter *pIter, bool* hasVal, int64_t ts, SRowMerger* pMerger, STsdbReader* pReader); static int32_t doLoadRowsOfIdenticalTsInBuf(STbDataIter *pIter, bool* hasVal, int64_t ts, SRowMerger* pMerger, STsdbReader* pReader);
@ -1967,7 +1967,7 @@ static bool blockIteratorNext(SDataBlockIter* pBlockIter) {
/** /**
* This is an two rectangles overlap cases. * This is an two rectangles overlap cases.
*/ */
static int32_t dataBlockPartialRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SBlock* pBlock) { static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SBlock* pBlock) {
return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) || return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) ||
(pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) || (pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) ||
(pVerRange->minVer > pBlock->minVersion && pVerRange->minVer <= pBlock->maxVersion) || (pVerRange->minVer > pBlock->minVersion && pVerRange->minVer <= pBlock->maxVersion) ||
@ -2054,6 +2054,10 @@ static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVer
return (key.ts >= pBlock->minKey.ts && key.ts <= pBlock->maxKey.ts) && (pBlock->maxVersion >= pVerRange->minVer) && (pBlock->minVersion <= pVerRange->maxVer); return (key.ts >= pBlock->minKey.ts && key.ts <= pBlock->maxKey.ts) && (pBlock->maxVersion >= pVerRange->minVer) && (pBlock->minVersion <= pVerRange->maxVer);
} }
// 1. the version of all rows should be less than the endVersion
// 2. current block should not overlap with next neighbor block
// 3. current timestamp should not be overlap with each other
// 4. output buffer should be large enough to hold all rows in current block
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo *pFBlock, SBlock* pBlock, STableBlockScanInfo *pScanInfo, TSDBKEY key) { static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo *pFBlock, SBlock* pBlock, STableBlockScanInfo *pScanInfo, TSDBKEY key) {
int32_t neighborIndex = 0; int32_t neighborIndex = 0;
SBlock* pNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &neighborIndex, pReader->order); SBlock* pNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &neighborIndex, pReader->order);
@ -2071,12 +2075,12 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo *pFBloc
} }
return (overlapWithNeighbor || hasDup || return (overlapWithNeighbor || hasDup ||
dataBlockPartialRequired(&pReader->window, &pReader->verRange, pBlock) || dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock) ||
keyOverlapFileBlock(key, pBlock, &pReader->verRange) || keyOverlapFileBlock(key, pBlock, &pReader->verRange) ||
(pBlock->nRow > pReader->capacity)); (pBlock->nRow > pReader->capacity));
} }
static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBKEY *key) { static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) {
if (!(pBlockScanInfo->imemHasVal || pBlockScanInfo->memHasVal)) { if (!(pBlockScanInfo->imemHasVal || pBlockScanInfo->memHasVal)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -2084,7 +2088,7 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo*
SSDataBlock* pBlock = pReader->pResBlock; SSDataBlock* pBlock = pReader->pResBlock;
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, *key, pReader->capacity, pReader); int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader);
int64_t elapsedTime = taosGetTimestampUs() - st; int64_t elapsedTime = taosGetTimestampUs() - st;
@ -2491,13 +2495,10 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
} }
} else if (bufferDataInFileBlockGap(pReader->order, key, pBlock)) { } else if (bufferDataInFileBlockGap(pReader->order, key, pBlock)) {
// data in memory that are earlier than current file block // data in memory that are earlier than current file block
TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->verRange.maxVer}; // todo rows in buffer should be less than the file block in asc, greater than file block in desc
code = buildDataBlockFromBuf(pReader, pScanInfo, &maxKey); int64_t endKey = (ASCENDING_TRAVERSE(pReader->order))? pBlock->minKey.ts:pBlock->maxKey.ts;
code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
} else { // whole block is required, return it directly } else { // whole block is required, return it directly
// todo
// 1. the version of all rows should be less than the endVersion
// 2. current block should not overlap with next neighbor block
// 3. current timestamp should not be overlap with each other
SDataBlockInfo* pInfo = &pReader->pResBlock->info; SDataBlockInfo* pInfo = &pReader->pResBlock->info;
pInfo->rows = pBlock->nRow; pInfo->rows = pBlock->nRow;
pInfo->uid = pScanInfo->uid; pInfo->uid = pScanInfo->uid;
@ -2523,8 +2524,8 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter; STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter;
initMemIterator(pBlockScanInfo, pReader); initMemIterator(pBlockScanInfo, pReader);
TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->verRange.maxVer}; int64_t endKey = (ASCENDING_TRAVERSE(pReader->order))? INT64_MAX:INT64_MIN;
int32_t code = buildDataBlockFromBuf(pReader, pBlockScanInfo, &maxKey); int32_t code = buildDataBlockFromBuf(pReader, pBlockScanInfo, endKey);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
@ -2590,8 +2591,12 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter); SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
bool asc = ASCENDING_TRAVERSE(pReader->order);
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
// current block are exhausted, try the next file block // current block are exhausted, try the next file block
if (pReader->status.fBlockDumpInfo.allDumped) { if (pDumpInfo->allDumped) {
// try next data block in current file // try next data block in current file
bool hasNext = blockIteratorNext(&pReader->status.blockIter); bool hasNext = blockIteratorNext(&pReader->status.blockIter);
if (hasNext) { // current file is exhausted, let's try the next file if (hasNext) { // current file is exhausted, let's try the next file
@ -2603,13 +2608,13 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
} }
} }
code = doBuildDataBlock(pReader); return doBuildDataBlock(pReader);
if (code != TSDB_CODE_SUCCESS) { } else if ((pDumpInfo->rowIndex > 0 && asc) || (pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1) && !asc)) {
return code; // file data block is partially loaded
} // todo refactor: extract method
} else { return buildComposedDataBlock(pReader, pScanInfo);
code = buildComposedDataBlock(pReader, pScanInfo); } else { // current block is not loaded yet
return code; return doBuildDataBlock(pReader);
} }
} }
@ -2854,18 +2859,34 @@ void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo *pBlo
tRowMergerGetRow(&merge, pTSRow); tRowMergerGetRow(&merge, pTSRow);
} }
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow) { int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow, int64_t endKey) {
TSDBROW* pRow = getValidRow(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pReader); TSDBROW* pRow = getValidRow(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pReader);
TSDBROW* piRow = getValidRow(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pReader); TSDBROW* piRow = getValidRow(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pReader);
if (pBlockScanInfo->memHasVal && pBlockScanInfo->imemHasVal) { // todo refactor
bool asc = ASCENDING_TRAVERSE(pReader->order);
if (pBlockScanInfo->memHasVal) {
TSDBKEY k = TSDBROW_KEY(pRow);
if ((k.ts >= endKey && asc) || (k.ts <= endKey && !asc)) {
pRow = NULL;
}
}
if (pBlockScanInfo->imemHasVal) {
TSDBKEY k = TSDBROW_KEY(piRow);
if ((k.ts >= endKey && asc) || (k.ts <= endKey && !asc)) {
piRow = NULL;
}
}
if (pBlockScanInfo->memHasVal && pBlockScanInfo->imemHasVal && pRow != NULL && piRow != NULL) {
TSDBKEY k = TSDBROW_KEY(pRow); TSDBKEY k = TSDBROW_KEY(pRow);
TSDBKEY ik = TSDBROW_KEY(piRow); TSDBKEY ik = TSDBROW_KEY(piRow);
if (ik.ts < k.ts) { // ik.ts < k.ts if (ik.ts < k.ts) { // ik.ts < k.ts
doMergeMultiRows(piRow, pBlockScanInfo->uid, pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pTSRow, pReader); doMergeMultiRows(piRow, pBlockScanInfo->uid, pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pTSRow, pReader);
} else if (k.ts < ik.ts) { } else if (k.ts < ik.ts) {
doMergeMultiRows(piRow, pBlockScanInfo->uid, pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pTSRow, pReader); doMergeMultiRows(pRow, pBlockScanInfo->uid, pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pTSRow, pReader);
} else { // ik.ts == k.ts } else { // ik.ts == k.ts
doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow); doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow);
} }
@ -2873,12 +2894,12 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (pBlockScanInfo->memHasVal) { if (pBlockScanInfo->memHasVal && pRow != NULL) {
doMergeMultiRows(pRow, pBlockScanInfo->uid, pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pTSRow, pReader); doMergeMultiRows(pRow, pBlockScanInfo->uid, pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pTSRow, pReader);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (pBlockScanInfo->imemHasVal) { if (pBlockScanInfo->imemHasVal && piRow != NULL) {
doMergeMultiRows(piRow, pBlockScanInfo->uid, pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pTSRow, pReader); doMergeMultiRows(piRow, pBlockScanInfo->uid, pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pTSRow, pReader);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -2930,12 +2951,12 @@ int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, TSDBKEY maxKey, int32_t capacity, STsdbReader* pReader) { int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity, STsdbReader* pReader) {
SSDataBlock* pBlock = pReader->pResBlock; SSDataBlock* pBlock = pReader->pResBlock;
do { do {
STSRow* pTSRow = NULL; STSRow* pTSRow = NULL;
tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow); tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow, endKey);
if (pTSRow == NULL) { if (pTSRow == NULL) {
break; break;
} }