refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2022-06-30 23:38:18 +08:00
parent 1b4cfa64c3
commit 0db8d77a70
2 changed files with 172 additions and 98 deletions

View File

@ -80,11 +80,12 @@ typedef struct SBlockLoadSuppInfo {
char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated. char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated.
} SBlockLoadSuppInfo; } SBlockLoadSuppInfo;
typedef struct SFileSetIter { typedef struct SFilesetIter {
int32_t numOfFiles; // number of total files int32_t numOfFiles; // number of total files
int32_t index; // current accessed index in the list int32_t index; // current accessed index in the list
SArray* pFileList; // data file list SArray* pFileList; // data file list
} SFileSetIter; int32_t order;
} SFilesetIter;
typedef struct SFileDataBlockInfo { typedef struct SFileDataBlockInfo {
int32_t tbBlockIdx; // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it int32_t tbBlockIdx; // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
@ -95,12 +96,14 @@ typedef struct SDataBlockIter {
int32_t numOfBlocks; int32_t numOfBlocks;
int32_t index; int32_t index;
SArray* blockList; // SArray<SFileDataBlockInfo> SArray* blockList; // SArray<SFileDataBlockInfo>
int32_t order;
} SDataBlockIter; } SDataBlockIter;
typedef struct SFileBlockDumpInfo { typedef struct SFileBlockDumpInfo {
int32_t totalRows; int32_t totalRows;
int32_t rowIndex; int32_t rowIndex;
int64_t lastKey; int64_t lastKey;
bool allDumped;
} SFileBlockDumpInfo; } SFileBlockDumpInfo;
typedef struct SVersionRange { typedef struct SVersionRange {
@ -115,9 +118,9 @@ typedef struct SReaderStatus {
STableBlockScanInfo* pTableIter; // table iterator used in building in-memory buffer data blocks. STableBlockScanInfo* pTableIter; // table iterator used in building in-memory buffer data blocks.
SFileBlockDumpInfo fBlockDumpInfo; SFileBlockDumpInfo fBlockDumpInfo;
SDFileSet* pCurrentFileSet; // current opened file set SDFileSet* pCurrentFileset; // current opened file set
SBlockData fileBlockData; SBlockData fileBlockData;
SFileSetIter fileIter; SFilesetIter fileIter;
SDataBlockIter blockIter; SDataBlockIter blockIter;
bool composedDataBlock;// the returned data block is a composed block or not bool composedDataBlock;// the returned data block is a composed block or not
} SReaderStatus; } SReaderStatus;
@ -273,10 +276,12 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
// return pNew; // return pNew;
// } // }
// todo
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow, int32_t order) { static bool isEmptyQueryTimeWindow(STimeWindow* pWindow, int32_t order) {
ASSERT(pWindow != NULL); ASSERT(pWindow != NULL);
bool asc = ASCENDING_TRAVERSE(order); bool asc = ASCENDING_TRAVERSE(order);
return ((asc && pWindow->skey > pWindow->ekey) || (!asc && pWindow->ekey > pWindow->skey)); return false;
// return ((asc && pWindow->skey > pWindow->ekey) || (!asc && pWindow->ekey > pWindow->skey));
} }
// // Update the query time window according to the data time to live(TTL) information, in order to avoid to return // // Update the query time window according to the data time to live(TTL) information, in order to avoid to return
@ -329,26 +334,32 @@ static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* cap
} }
// init file iterator // init file iterator
static int32_t initFileIterator(SFileSetIter* pIter, const STsdbFSState* pFState, const char* idstr) { static int32_t initFileIterator(SFilesetIter* pIter, const STsdbFSState* pFState, int32_t order, const char* idstr) {
pIter->index = -1; size_t numOfFileset = taosArrayGetSize(pFState->aDFileSet);
pIter->numOfFiles = taosArrayGetSize(pFState->aDFileSet);
pIter->pFileList = taosArrayDup(pFState->aDFileSet); pIter->index = ASCENDING_TRAVERSE(order) ? -1 : numOfFileset;
pIter->order = order;
pIter->pFileList = taosArrayDup(pFState->aDFileSet);
pIter->numOfFiles = numOfFileset;
tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, idstr); tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, idstr);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static bool filesetIteratorNext(SFileSetIter* pIter, int32_t order, STsdbReader* pReader) { static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
pIter->index += 1; bool asc = ASCENDING_TRAVERSE(pIter->order);
if (pIter->index >= pIter->numOfFiles) { int32_t step = asc? 1:-1;
pIter->index += step;
if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
return false; return false;
} }
// check file the time range of coverage // check file the time range of coverage
STimeWindow win = {0}; STimeWindow win = {0};
pReader->status.pCurrentFileSet = (SDFileSet *)taosArrayGet(pIter->pFileList, pIter->index); pReader->status.pCurrentFileset = (SDFileSet *)taosArrayGet(pIter->pFileList, pIter->index);
int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileSet); int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _err; goto _err;
} }
@ -357,12 +368,11 @@ static bool filesetIteratorNext(SFileSetIter* pIter, int32_t order, STsdbReader*
// tsdbGetFidKeyRange(pCfg->days, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey); // tsdbGetFidKeyRange(pCfg->days, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey);
// current file are not overlapped with query time window, ignore remain files // current file are not overlapped with query time window, ignore remain files
if ((ASCENDING_TRAVERSE(order) && win.skey > pReader->window.ekey) || // if ((asc && win.skey > pReader->window.ekey) || (!asc && win.ekey < pReader->window.ekey)) {
(!ASCENDING_TRAVERSE(order) && win.ekey < pReader->window.ekey)) { // tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader, // pReader->window.skey, pReader->window.ekey, pReader->idStr);
pReader->window.skey, pReader->window.ekey, pReader->idStr); // return false;
return false; // }
}
return true; return true;
@ -370,10 +380,11 @@ static bool filesetIteratorNext(SFileSetIter* pIter, int32_t order, STsdbReader*
return false; return false;
} }
static void resetDataBlockIterator(SDataBlockIter* pIter) { static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
pIter->index = -1; pIter->order = order;
pIter->index = -1;
pIter->numOfBlocks = -1; pIter->numOfBlocks = -1;
pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo)); pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
} }
static void initReaderStatus(SReaderStatus* pStatus) { static void initReaderStatus(SReaderStatus* pStatus) {
@ -403,6 +414,12 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
pReader->type = pCond->type; pReader->type = pCond->type;
pReader->window = *pCond->twindows; pReader->window = *pCond->twindows;
#if 1
if (pReader->window.skey > pReader->window.ekey) {
TSWAP(pReader->window.skey, pReader->window.ekey);
}
#endif
if (pReader->suid != 0) { if (pReader->suid != 0) {
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1); pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1);
ASSERT(pReader->pSchema); ASSERT(pReader->pSchema);
@ -435,8 +452,8 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
} }
STsdbFSState* pFState = pReader->pTsdb->fs->cState; STsdbFSState* pFState = pReader->pTsdb->fs->cState;
initFileIterator(&pReader->status.fileIter, pFState, pReader->idStr); initFileIterator(&pReader->status.fileIter, pFState, pReader->order, pReader->idStr);
resetDataBlockIterator(&pReader->status.blockIter); resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
// no data in files, let's try buffer in memory // no data in files, let's try buffer in memory
if (pReader->status.fileIter.numOfFiles == 0) { if (pReader->status.fileIter.numOfFiles == 0) {
@ -708,6 +725,7 @@ _end:
static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) { static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) {
int32_t code = 0; int32_t code = 0;
bool asc = ASCENDING_TRAVERSE(pReader->order);
SArray *aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); SArray *aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
@ -735,10 +753,8 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader,
continue; continue;
} }
if ((ASCENDING_TRAVERSE(pReader->order) && if ((asc && (pBlockIdx->minKey > pReader->window.ekey || pBlockIdx->maxKey < pReader->window.skey)) /*||
(pBlockIdx->minKey > pReader->window.ekey || pBlockIdx->maxKey < pReader->window.skey)) || (!asc && (pBlockIdx->minKey > pReader->window.skey || pBlockIdx->maxKey < pReader->window.ekey))*/) {
(!ASCENDING_TRAVERSE(pReader->order) &&
(pBlockIdx->minKey > pReader->window.skey || pBlockIdx->maxKey < pReader->window.ekey))) {
continue; continue;
} }
@ -783,10 +799,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_
} }
// 1. time range check // 1. time range check
if ((ASCENDING_TRAVERSE(pReader->order) && if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) {
(block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey)) ||
(!ASCENDING_TRAVERSE(pReader->order) &&
(block.minKey.ts > pReader->window.skey || block.maxKey.ts < pReader->window.ekey))) {
continue; continue;
} }
@ -811,11 +824,12 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void setBlockDumpCompleted(SFileBlockDumpInfo* pDumpInfo, SBlock* pBlock, int32_t order) { // todo remove pblock parameter
static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, SBlock* pBlock, int32_t order) {
int32_t step = ASCENDING_TRAVERSE(order)? 1:-1; int32_t step = ASCENDING_TRAVERSE(order)? 1:-1;
pDumpInfo->rowIndex = pBlock->nRow; pDumpInfo->allDumped = true;
pDumpInfo->totalRows = pBlock->nRow; pDumpInfo->rowIndex = ASCENDING_TRAVERSE(order)? 0:pBlock->nRow-1;
pDumpInfo->lastKey = pBlock->maxKey.ts + step; pDumpInfo->lastKey = pBlock->maxKey.ts + step;
} }
@ -833,6 +847,7 @@ static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_
} }
} }
// todo consider the output buffer size
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) { static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
@ -842,6 +857,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
int32_t numOfCols = blockDataGetNumOfCols(pResBlock); int32_t numOfCols = blockDataGetNumOfCols(pResBlock);
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
uint8_t *pb = NULL, *pb1 = NULL; uint8_t *pb = NULL, *pb1 = NULL;
int32_t code = tsdbReadColData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pSupInfo->colIds, numOfCols, pBlockData, &pb, &pb1); int32_t code = tsdbReadColData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pSupInfo->colIds, numOfCols, pBlockData, &pb, &pb1);
@ -852,29 +868,49 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
SColVal cv = {0}; SColVal cv = {0};
int32_t colIndex = 0; int32_t colIndex = 0;
for (int32_t i = 0; i < numOfCols && colIndex < taosArrayGetSize(pBlockData->aColDataP); ++i) { bool asc = ASCENDING_TRAVERSE(pReader->order);
SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i); int32_t step = asc ? 1 : -1;
if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) { int32_t rowIndex = 0;
for (int32_t j = 0; j < pBlockData->nRow; ++j) {
colDataAppend(pColData, j, (const char*)&pBlockData->aTSKEY[j], false);
}
} else {
SColData* pData = (SColData*)taosArrayGetP(pBlockData->aColDataP, colIndex);
if (pData->cid == pColData->info.colId) { int32_t remain = asc ? (pBlockData->nRow - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex + 1);
for (int32_t j = 0; j < pBlockData->nRow; ++j) {
tColDataGetValue(pData, j, &cv); int32_t i = 0;
doCopyColVal(pColData, j, i, &cv, pSupInfo); SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i);
} if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
colIndex += 1; for (int32_t j = pDumpInfo->rowIndex; j < pBlockData->nRow && j >= 0; j += step) {
} else { // the specified column does not exist in file block, fill with null data colDataAppend(pColData, rowIndex++, (const char*)&pBlockData->aTSKEY[j], false);
colDataAppendNNULL(pColData, 0, pBlockData->nRow);
}
} }
i += 1;
}
while(i < numOfCols && colIndex < taosArrayGetSize(pBlockData->aColDataP)) {
rowIndex = 0;
pColData = taosArrayGet(pResBlock->pDataBlock, i);
SColData* pData = (SColData*)taosArrayGetP(pBlockData->aColDataP, colIndex);
if (pData->cid == pColData->info.colId) {
for (int32_t j = pDumpInfo->rowIndex; j < pBlockData->nRow && j >= 0; j += step) {
tColDataGetValue(pData, j, &cv);
doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo);
}
colIndex += 1;
} else { // the specified column does not exist in file block, fill with null data
colDataAppendNNULL(pColData, 0, remain);
}
ASSERT(rowIndex == remain);
i += 1;
}
while(i < numOfCols) {
pColData = taosArrayGet(pResBlock->pDataBlock, i);
colDataAppendNNULL(pColData, 0, remain);
i += 1;
} }
pResBlock->info.rows = pBlockData->nRow; pResBlock->info.rows = pBlockData->nRow;
setBlockDumpCompleted(&pReader->status.fBlockDumpInfo, pBlock, pReader->order); setBlockAllDumped(&pReader->status.fBlockDumpInfo, pBlock, pReader->order);
int64_t elapsedTime = (taosGetTimestampUs() - st); int64_t elapsedTime = (taosGetTimestampUs() - st);
pReader->cost.blockLoadTime += elapsedTime; pReader->cost.blockLoadTime += elapsedTime;
@ -1875,8 +1911,9 @@ static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, v
} }
static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) { static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) {
pBlockIter->numOfBlocks = numOfBlocks; bool asc = ASCENDING_TRAVERSE(pReader->order);
pBlockIter->numOfBlocks = numOfBlocks;
// access data blocks according to the offset of each block in asc/desc order. // access data blocks according to the offset of each block in asc/desc order.
int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap); int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap);
@ -1932,12 +1969,13 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted %s", tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted %s",
pReader, cnt, pReader->idStr); pReader, cnt, pReader->idStr);
pBlockIter->index = 0; pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables, pReader->idStr); tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables, pReader->idStr);
assert(cnt <= numOfBlocks && sup.numOfTables <= numOfTables); // the pTableQueryInfo[j]->numOfBlocks may be 0 // the pTableQueryInfo[j]->numOfBlocks may be 0
assert(cnt <= numOfBlocks && sup.numOfTables <= numOfTables);
SMultiwayMergeTreeInfo* pTree = NULL; SMultiwayMergeTreeInfo* pTree = NULL;
@ -1968,21 +2006,22 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
cleanupBlockOrderSupporter(&sup); cleanupBlockOrderSupporter(&sup);
taosMemoryFree(pTree); taosMemoryFree(pTree);
pBlockIter->index = 0; pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static bool blockIteratorNext(SDataBlockIter* pBlockIter) { static bool blockIteratorNext(SDataBlockIter* pBlockIter) {
if (pBlockIter->index >= pBlockIter->numOfBlocks - 1) { bool asc = ASCENDING_TRAVERSE(pBlockIter->order);
int32_t step = asc? 1:-1;
if ((pBlockIter->index >= pBlockIter->numOfBlocks - 1 && asc) || (pBlockIter->index <= 0 && (!asc))) {
return false; return false;
} }
pBlockIter->index += 1; pBlockIter->index += step;
return true; return true;
} }
// static int32_t getFirstFileDataBlock(STsdbReader* pTsdbReadHandle, bool* exists);
//static int32_t getDataBlock(STsdbReader* pTsdbReadHandle, SFileBlockInfo* pNext, bool* exists) { //static int32_t getDataBlock(STsdbReader* pTsdbReadHandle, SFileBlockInfo* pNext, bool* exists) {
// int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1; // int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
// SQueryFilePos* cur = &pTsdbReadHandle->cur; // SQueryFilePos* cur = &pTsdbReadHandle->cur;
@ -2099,10 +2138,14 @@ static bool blockIteratorNext(SDataBlockIter* pBlockIter) {
// return (numOfRows - startRow) / bucketRange; // return (numOfRows - startRow) / bucketRange;
// } // }
// query ended in/started from current block /**
static int32_t dataBlockPartialRequired(STimeWindow* pWindow, SBlock* pBlock) { * This is an two rectangles overlap cases.
return ((pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) || */
(pWindow->skey <= pBlock->maxKey.ts && pWindow->skey > pBlock->minKey.ts)); static int32_t dataBlockPartialRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SBlock* pBlock) {
return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) ||
(pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) ||
(pVerRange->minVer > pBlock->minVersion && pVerRange->minVer <= pBlock->maxVersion) ||
(pVerRange->maxVer < pBlock->maxVersion && pVerRange->maxVer >= pBlock->minVersion);
} }
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) { static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
@ -2131,9 +2174,12 @@ static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVer
return (key.ts >= pBlock->minKey.ts && key.ts <= pBlock->maxKey.ts) && (pBlock->maxVersion >= pVerRange->minVer) && (pBlock->minVersion <= pVerRange->maxVer); return (key.ts >= pBlock->minKey.ts && key.ts <= pBlock->maxKey.ts) && (pBlock->maxVersion >= pVerRange->minVer) && (pBlock->minVersion <= pVerRange->maxVer);
} }
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo *pFBlock, SBlock* pBlock, STableBlockScanInfo *pScanInfo, TSDBKEY key) { static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo *pFBlock, SBlock* pBlock, STableBlockScanInfo *pScanInfo, TSDBKEY key) {
return (dataBlockPartialRequired(&pReader->window, pBlock) || overlapWithNeighborBlock(pFBlock, pBlock, pScanInfo) || return (dataBlockPartialRequired(&pReader->window, &pReader->verRange, pBlock) ||
keyOverlapFileBlock(key, pBlock, &pReader->verRange)); overlapWithNeighborBlock(pFBlock, pBlock, pScanInfo) ||
keyOverlapFileBlock(key, pBlock, &pReader->verRange) ||
(pBlock->nRow > pReader->capacity));
} }
static int32_t buildInmemDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBKEY *key) { static int32_t buildInmemDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBKEY *key) {
@ -2326,45 +2372,51 @@ static int32_t initMemIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
} }
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
TSDBKEY startKey = {.ts = pReader->window.skey, .version = pReader->verRange.minVer};
TSDBKEY startKey = {0};
if (ASCENDING_TRAVERSE(pReader->order)) {
startKey = (TSDBKEY){.ts = pReader->window.skey, .version = pReader->verRange.minVer};
} else {
startKey = (TSDBKEY){.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
}
int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));
STbData* d = NULL; STbData* d = NULL;
if (pReader->pTsdb->mem != NULL) { if (pReader->pTsdb->mem != NULL) {
tsdbGetTbDataFromMemTable(pReader->pTsdb->mem, pReader->suid, pBlockScanInfo->uid, &d); tsdbGetTbDataFromMemTable(pReader->pTsdb->mem, pReader->suid, pBlockScanInfo->uid, &d);
if (d != NULL) { if (d != NULL) {
code = tsdbTbDataIterCreate(d, &startKey, 0, &pBlockScanInfo->iter); code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
pBlockScanInfo->memHasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter) != NULL); pBlockScanInfo->memHasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter) != NULL);
tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64 tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
"-%" PRId64 " %s", "-%" PRId64 " %s",
pReader, pBlockScanInfo->uid, pReader->window.skey, pReader->order, d->minKey, d->maxKey, pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, d->minKey, d->maxKey, pReader->idStr);
pReader->idStr);
} else { } else {
tsdbError("%p uid:%" PRId64 ", failed to create iterator for imem, code:%s, %s", tsdbError("%p uid:%" PRId64 ", failed to create iterator for imem, code:%s, %s", pReader, pBlockScanInfo->uid,
pReader, pBlockScanInfo->uid, tstrerror(code), pReader->idStr); tstrerror(code), pReader->idStr);
return code; return code;
} }
} }
} else { } else {
tsdbDebug("%p uid:%" PRId64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr); tsdbDebug("%p uid:%" PRId64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
} }
STbData* di = NULL; STbData* di = NULL;
if (pReader->pTsdb->imem != NULL) { if (pReader->pTsdb->imem != NULL) {
tsdbGetTbDataFromMemTable(pReader->pTsdb->imem, pReader->suid, pBlockScanInfo->uid, &di); tsdbGetTbDataFromMemTable(pReader->pTsdb->imem, pReader->suid, pBlockScanInfo->uid, &di);
if (di != NULL) { if (di != NULL) {
code = tsdbTbDataIterCreate(di, &startKey, 0, &pBlockScanInfo->iiter); code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
pBlockScanInfo->imemHasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter) != NULL); pBlockScanInfo->imemHasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter) != NULL);
tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64 tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
"-%" PRId64 " %s", "-%" PRId64 " %s",
pReader, pBlockScanInfo->uid, pReader->window.skey, pReader->order, di->minKey, di->maxKey, pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, di->minKey, di->maxKey, pReader->idStr);
pReader->idStr);
} else { } else {
tsdbError("%p uid:%" PRId64 ", failed to create iterator for mem, code:%s, %s", tsdbError("%p uid:%" PRId64 ", failed to create iterator for mem, code:%s, %s", pReader, pBlockScanInfo->uid,
pReader, pBlockScanInfo->uid, tstrerror(code), pReader->idStr); tstrerror(code), pReader->idStr);
return code; return code;
} }
} }
@ -2403,7 +2455,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, int32_t* numOfBlocks) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
while (1) { while (1) {
bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader->order, pReader); bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader);
if (!hasNext) { // no data files on disk if (!hasNext) { // no data files on disk
break; break;
} }
@ -2473,7 +2525,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
pInfo->uid = pScanInfo->uid; pInfo->uid = pScanInfo->uid;
pInfo->window = (STimeWindow) {.skey = pBlock->minKey.ts, .ekey = pBlock->maxKey.ts}; pInfo->window = (STimeWindow) {.skey = pBlock->minKey.ts, .ekey = pBlock->maxKey.ts};
setComposedBlockFlag(pReader, false); setComposedBlockFlag(pReader, false);
setBlockDumpCompleted(&pStatus->fBlockDumpInfo, pBlock, pReader->order); setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock, pReader->order);
} }
return code; return code;
@ -2511,6 +2563,18 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
} }
} }
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
pDumpInfo->totalRows = pBlock->nRow;
pDumpInfo->allDumped = false;
pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order)? 0: pBlock->nRow - 1;
}
static int32_t initForFirstBlockOfFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) { static int32_t initForFirstBlockOfFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
int32_t numOfBlocks = 0; int32_t numOfBlocks = 0;
int32_t code = moveToNextFile(pReader, &numOfBlocks); int32_t code = moveToNextFile(pReader, &numOfBlocks);
@ -2526,17 +2590,13 @@ static int32_t initForFirstBlockOfFile(STsdbReader* pReader, SDataBlockIter* pBl
// initialize the block iterator for a new fileset // initialize the block iterator for a new fileset
code = initBlockIterator(pReader, pBlockIter, numOfBlocks); code = initBlockIterator(pReader, pBlockIter, numOfBlocks);
initBlockDumpInfo(pReader, pBlockIter);
return code; return code;
} }
static int32_t buildBlockFromFiles(STsdbReader* pReader) { static int32_t buildBlockFromFiles(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SReaderStatus* pStatus = &pReader->status;
SFileSetIter* pFIter = &pStatus->fileIter;
SDataBlockIter* pBlockIter = &pReader->status.blockIter; SDataBlockIter* pBlockIter = &pReader->status.blockIter;
ASSERT (pFIter->index < pFIter->numOfFiles);
if (pReader->status.blockIter.index == -1) { if (pReader->status.blockIter.index == -1) {
code = initForFirstBlockOfFile(pReader, pBlockIter); code = initForFirstBlockOfFile(pReader, pBlockIter);
@ -2555,10 +2615,12 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx); SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
// current block are exhausted, try the next file block // current block are exhausted, try the next file block
if (pDumpInfo->rowIndex >= pBlock->nRow) { if (pDumpInfo->allDumped) {
// try next data block in current file // try next data block in current file
bool hasNext = blockIteratorNext(&pReader->status.blockIter); bool hasNext = blockIteratorNext(&pReader->status.blockIter);
if (!hasNext) { // current file is exhausted, let's try the next file if (hasNext) { // current file is exhausted, let's try the next file
initBlockDumpInfo(pReader, pBlockIter);
} else {
code = initForFirstBlockOfFile(pReader, pBlockIter); code = initForFirstBlockOfFile(pReader, pBlockIter);
if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) { if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
return code; return code;
@ -2611,6 +2673,10 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
// taosArrayPush(pTsdbReadHandle->pTableCheckInfo, &info); // taosArrayPush(pTsdbReadHandle->pTableCheckInfo, &info);
// } // }
static bool outofTimeWindow(int64_t ts, STimeWindow* pWindow, int32_t order) {
return (((ts > pWindow->ekey) && ASCENDING_TRAVERSE(order)) || ((ts < pWindow->skey) && ASCENDING_TRAVERSE(order)));
}
TSDBROW* getValidRow(STbDataIter* pIter, bool* hasVal, STsdbReader* pReader) { TSDBROW* getValidRow(STbDataIter* pIter, bool* hasVal, STsdbReader* pReader) {
if (!(*hasVal)) { if (!(*hasVal)) {
return NULL; return NULL;
@ -2619,7 +2685,7 @@ TSDBROW* getValidRow(STbDataIter* pIter, bool* hasVal, STsdbReader* pReader) {
TSDBROW* pRow = tsdbTbDataIterGet(pIter); TSDBROW* pRow = tsdbTbDataIterGet(pIter);
TSDBKEY key = TSDBROW_KEY(pRow); TSDBKEY key = TSDBROW_KEY(pRow);
if (key.ts > pReader->window.ekey) { if (outofTimeWindow(key.ts, &pReader->window, pReader->order)) {
*hasVal = false; *hasVal = false;
return NULL; return NULL;
} }
@ -2637,7 +2703,7 @@ TSDBROW* getValidRow(STbDataIter* pIter, bool* hasVal, STsdbReader* pReader) {
pRow = tsdbTbDataIterGet(pIter); pRow = tsdbTbDataIterGet(pIter);
key = TSDBROW_KEY(pRow); key = TSDBROW_KEY(pRow);
if (key.ts > pReader->window.ekey) { if (outofTimeWindow(key.ts, &pReader->window, pReader->order)) {
*hasVal = false; *hasVal = false;
return NULL; return NULL;
} }
@ -2657,7 +2723,7 @@ int32_t doLoadRowsOfIdenticalTs(STbDataIter *pIter, bool* hasVal, int64_t ts, SR
TSDBROW* pRow = getValidRow(pIter, hasVal, pReader); TSDBROW* pRow = getValidRow(pIter, hasVal, pReader);
TSDBKEY k = TSDBROW_KEY(pRow); TSDBKEY k = TSDBROW_KEY(pRow);
if (k.ts > ts) { if (k.ts != ts) {
break; break;
} }
@ -2806,6 +2872,13 @@ int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow
} }
} }
// set null value since current column does not exist in the "pSchema"
while(i < numOfCols) {
pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
colDataAppendNULL(pColInfoData, numOfRows);
i += 1;
}
pBlock->info.rows += 1; pBlock->info.rows += 1;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -3212,12 +3285,12 @@ void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockI
int32_t tsdbRetrieveDataBlockStatisInfo(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) { int32_t tsdbRetrieveDataBlockStatisInfo(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) {
int32_t code = 0; int32_t code = 0;
*allHave = false; *allHave = false;
if (pReader->status.composedDataBlock) { if (pReader->status.composedDataBlock) {
*pBlockStatis = NULL; *pBlockStatis = NULL;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// SFileBlockInfo* pBlockInfo = &pReader->pDataBlockInfo[c->slot]; // SFileBlockInfo* pBlockInfo = &pReader->pDataBlockInfo[c->slot];
// assert((c->slot >= 0 && c->slot < pReader->numOfBlocks) || ((c->slot == pReader->numOfBlocks) && (c->slot == 0))); // assert((c->slot >= 0 && c->slot < pReader->numOfBlocks) || ((c->slot == pReader->numOfBlocks) && (c->slot == 0)));

View File

@ -150,6 +150,7 @@ endi
sql select count(a), count(b), count(c), count(d), count(e), count(f), count(g), count(h) from tb sql select count(a), count(b), count(c), count(d), count(e), count(f), count(g), count(h) from tb
if $data01 != 21 then if $data01 != 21 then
print expect 21, actual $data01
return -1 return -1
endi endi