Merge branch 'feat/tsdb_refact' of https://github.com/taosdata/TDengine into feat/tsdb_refact
This commit is contained in:
commit
dd537af153
|
@ -215,8 +215,6 @@ static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
|
|||
}
|
||||
|
||||
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableKeyInfo* idList, int32_t numOfTables) {
|
||||
ASSERT(numOfTables >= 1);
|
||||
|
||||
// allocate buffer in order to load data blocks from file
|
||||
// todo use simple hash instead
|
||||
SHashObj* pTableMap = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||
|
@ -265,23 +263,9 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
|
|||
// }
|
||||
// }
|
||||
|
||||
// // only one table, not need to sort again
|
||||
// static SArray* createCheckInfoFromCheckInfo(STableBlockScanInfo* pCheckInfo, TSKEY skey, SArray** psTable) {
|
||||
// SArray* pNew = taosArrayInit(1, sizeof(STableBlockScanInfo));
|
||||
|
||||
// STableBlockScanInfo info = {.lastKey = skey};
|
||||
|
||||
// info.tableId = pCheckInfo->tableId;
|
||||
// taosArrayPush(pNew, &info);
|
||||
// return pNew;
|
||||
// }
|
||||
|
||||
// todo
|
||||
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow, int32_t order) {
|
||||
ASSERT(pWindow != NULL);
|
||||
bool asc = ASCENDING_TRAVERSE(order);
|
||||
return false;
|
||||
// return ((asc && pWindow->skey > pWindow->ekey) || (!asc && pWindow->ekey > pWindow->skey));
|
||||
return pWindow->skey > pWindow->ekey;
|
||||
}
|
||||
|
||||
// // Update the query time window according to the data time to live(TTL) information, in order to avoid to return
|
||||
|
@ -348,7 +332,7 @@ static int32_t initFileIterator(SFilesetIter* pIter, const STsdbFSState* pFState
|
|||
|
||||
static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
|
||||
bool asc = ASCENDING_TRAVERSE(pIter->order);
|
||||
int32_t step = asc? 1:-1;
|
||||
int32_t step = asc ? 1 : -1;
|
||||
pIter->index += step;
|
||||
|
||||
if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
|
||||
|
@ -357,26 +341,36 @@ static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
|
|||
|
||||
// check file the time range of coverage
|
||||
STimeWindow win = {0};
|
||||
pReader->status.pCurrentFileset = (SDFileSet *)taosArrayGet(pIter->pFileList, pIter->index);
|
||||
|
||||
while(1) {
|
||||
pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index);
|
||||
|
||||
int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// todo file range check
|
||||
// tsdbGetFidKeyRange(pCfg->days, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey);
|
||||
int32_t fid = pReader->status.pCurrentFileset->fid;
|
||||
tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);
|
||||
|
||||
// current file are not overlapped with query time window, ignore remain files
|
||||
// if ((asc && win.skey > pReader->window.ekey) || (!asc && win.ekey < pReader->window.ekey)) {
|
||||
// tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
|
||||
// pReader->window.skey, pReader->window.ekey, pReader->idStr);
|
||||
// return false;
|
||||
// }
|
||||
// current file are no longer overlapped with query time window, ignore remain files
|
||||
if ((asc && win.skey > pReader->window.ekey) || (!asc && win.ekey < pReader->window.skey)) {
|
||||
tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
|
||||
pReader->window.skey, pReader->window.ekey, pReader->idStr);
|
||||
return false;
|
||||
}
|
||||
|
||||
if ((asc && (win.ekey < pReader->window.skey)) || ((!asc) && (win.skey > pReader->window.ekey))) {
|
||||
pIter->index += step;
|
||||
continue;
|
||||
}
|
||||
|
||||
tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader, fid,
|
||||
pReader->window.skey, pReader->window.ekey, pReader->idStr);
|
||||
return true;
|
||||
}
|
||||
|
||||
_err:
|
||||
_err:
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -394,6 +388,29 @@ static void initReaderStatus(SReaderStatus* pStatus) {
|
|||
pStatus->loadFromFile = true;
|
||||
}
|
||||
|
||||
static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) {
|
||||
SSDataBlock* pResBlock = createDataBlock();
|
||||
if (pResBlock == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pCond->numOfCols; ++i) {
|
||||
SColumnInfoData colInfo = {{0}, 0};
|
||||
colInfo.info = pCond->colList[i];
|
||||
blockDataAppendColInfo(pResBlock, &colInfo);
|
||||
}
|
||||
|
||||
int32_t code = blockDataEnsureCapacity(pResBlock, capacity);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
taosMemoryFree(pResBlock);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return pResBlock;
|
||||
}
|
||||
|
||||
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, const char* idstr) {
|
||||
int32_t code = 0;
|
||||
STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader));
|
||||
|
@ -427,29 +444,26 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
|
|||
|
||||
// todo remove this
|
||||
setQueryTimewindow(pReader, pCond, 0);
|
||||
ASSERT (pCond->numOfCols > 0);
|
||||
|
||||
if (pCond->numOfCols > 0) {
|
||||
limitOutputBufferSize(pCond, &pReader->capacity);
|
||||
|
||||
// allocate buffer in order to load data blocks from file
|
||||
pReader->suppInfo.pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg));
|
||||
if (pReader->suppInfo.pstatis == NULL) {
|
||||
SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
|
||||
pSup->pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg));
|
||||
pSup->plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES);
|
||||
if (pSup->pstatis == NULL || pSup->plist == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _end;
|
||||
}
|
||||
|
||||
pReader->pResBlock = createDataBlock();
|
||||
for (int32_t i = 0; i < pCond->numOfCols; ++i) {
|
||||
SColumnInfoData colInfo = {{0}, 0};
|
||||
colInfo.info = pCond->colList[i];
|
||||
blockDataAppendColInfo(pReader->pResBlock, &colInfo);
|
||||
pReader->pResBlock = createResBlock(pCond, pReader->capacity);
|
||||
if (pReader->pResBlock == NULL) {
|
||||
code = terrno;
|
||||
goto _end;
|
||||
}
|
||||
|
||||
blockDataEnsureCapacity(pReader->pResBlock, pReader->capacity);
|
||||
|
||||
setColumnIdSlotList(pReader, pReader->pResBlock);
|
||||
pReader->suppInfo.plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES);
|
||||
}
|
||||
|
||||
STsdbFSState* pFState = pReader->pTsdb->fs->cState;
|
||||
initFileIterator(&pReader->status.fileIter, pFState, pReader->order, pReader->idStr);
|
||||
|
@ -829,7 +843,6 @@ static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, SBlock* pBlock, int
|
|||
int32_t step = ASCENDING_TRAVERSE(order)? 1:-1;
|
||||
|
||||
pDumpInfo->allDumped = true;
|
||||
pDumpInfo->rowIndex = ASCENDING_TRAVERSE(order)? 0:pBlock->nRow-1;
|
||||
pDumpInfo->lastKey = pBlock->maxKey.ts + step;
|
||||
}
|
||||
|
||||
|
@ -847,6 +860,90 @@ static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
SDataBlockIter* pBlockIter = &pStatus->blockIter;
|
||||
|
||||
SBlockData* pBlockData = &pStatus->fileBlockData;
|
||||
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
|
||||
SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx);
|
||||
SSDataBlock* pResBlock = pReader->pResBlock;
|
||||
int32_t numOfCols = blockDataGetNumOfCols(pResBlock);
|
||||
|
||||
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
|
||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
||||
SColVal cv = {0};
|
||||
int32_t colIndex = 0;
|
||||
|
||||
bool asc = ASCENDING_TRAVERSE(pReader->order);
|
||||
int32_t step = asc ? 1 : -1;
|
||||
|
||||
int32_t rowIndex = 0;
|
||||
int32_t remain = asc ? (pBlockData->nRow - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex + 1);
|
||||
|
||||
int32_t endIndex = 0;
|
||||
if (remain <= pReader->capacity) {
|
||||
endIndex = pBlockData->nRow;
|
||||
} else {
|
||||
endIndex = pDumpInfo->rowIndex + step * pReader->capacity;
|
||||
remain = pReader->capacity;
|
||||
}
|
||||
|
||||
int32_t i = 0;
|
||||
SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i);
|
||||
if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||
for (int32_t j = pDumpInfo->rowIndex; j < endIndex && j >= 0; j += step) {
|
||||
colDataAppend(pColData, rowIndex++, (const char*)&pBlockData->aTSKEY[j], false);
|
||||
}
|
||||
i += 1;
|
||||
}
|
||||
|
||||
while(i < numOfCols && colIndex < taosArrayGetSize(pBlockData->aColDataP)) {
|
||||
rowIndex = 0;
|
||||
pColData = taosArrayGet(pResBlock->pDataBlock, i);
|
||||
|
||||
SColData* pData = (SColData*)taosArrayGetP(pBlockData->aColDataP, colIndex);
|
||||
|
||||
if (pData->cid == pColData->info.colId) {
|
||||
for (int32_t j = pDumpInfo->rowIndex; j < endIndex && j >= 0; j += step) {
|
||||
tColDataGetValue(pData, j, &cv);
|
||||
doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo);
|
||||
}
|
||||
colIndex += 1;
|
||||
} else { // the specified column does not exist in file block, fill with null data
|
||||
colDataAppendNNULL(pColData, 0, remain);
|
||||
}
|
||||
|
||||
ASSERT(rowIndex == remain);
|
||||
i += 1;
|
||||
}
|
||||
|
||||
while(i < numOfCols) {
|
||||
pColData = taosArrayGet(pResBlock->pDataBlock, i);
|
||||
colDataAppendNNULL(pColData, 0, remain);
|
||||
i += 1;
|
||||
}
|
||||
|
||||
pResBlock->info.rows = remain;
|
||||
pDumpInfo->rowIndex += step*remain;
|
||||
|
||||
setBlockAllDumped(pDumpInfo, pBlock, pReader->order);
|
||||
|
||||
int64_t elapsedTime = (taosGetTimestampUs() - st);
|
||||
pReader->cost.blockLoadTime += elapsedTime;
|
||||
|
||||
int32_t unDumpedRows = asc? pBlock->nRow - pDumpInfo->rowIndex: pDumpInfo->rowIndex + 1;
|
||||
tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
|
||||
", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%" PRId64 " us, %s",
|
||||
pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain, unDumpedRows,
|
||||
pBlock->minVersion, pBlock->maxVersion, elapsedTime, pReader->idStr);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// todo consider the output buffer size
|
||||
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
@ -865,59 +962,13 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
|
|||
goto _error;
|
||||
}
|
||||
|
||||
SColVal cv = {0};
|
||||
int32_t colIndex = 0;
|
||||
|
||||
bool asc = ASCENDING_TRAVERSE(pReader->order);
|
||||
int32_t step = asc ? 1 : -1;
|
||||
int32_t rowIndex = 0;
|
||||
|
||||
int32_t remain = asc ? (pBlockData->nRow - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex + 1);
|
||||
|
||||
int32_t i = 0;
|
||||
SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i);
|
||||
if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||
for (int32_t j = pDumpInfo->rowIndex; j < pBlockData->nRow && j >= 0; j += step) {
|
||||
colDataAppend(pColData, rowIndex++, (const char*)&pBlockData->aTSKEY[j], false);
|
||||
}
|
||||
i += 1;
|
||||
}
|
||||
|
||||
while(i < numOfCols && colIndex < taosArrayGetSize(pBlockData->aColDataP)) {
|
||||
rowIndex = 0;
|
||||
pColData = taosArrayGet(pResBlock->pDataBlock, i);
|
||||
|
||||
SColData* pData = (SColData*)taosArrayGetP(pBlockData->aColDataP, colIndex);
|
||||
|
||||
if (pData->cid == pColData->info.colId) {
|
||||
for (int32_t j = pDumpInfo->rowIndex; j < pBlockData->nRow && j >= 0; j += step) {
|
||||
tColDataGetValue(pData, j, &cv);
|
||||
doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo);
|
||||
}
|
||||
colIndex += 1;
|
||||
} else { // the specified column does not exist in file block, fill with null data
|
||||
colDataAppendNNULL(pColData, 0, remain);
|
||||
}
|
||||
|
||||
ASSERT(rowIndex == remain);
|
||||
i += 1;
|
||||
}
|
||||
|
||||
while(i < numOfCols) {
|
||||
pColData = taosArrayGet(pResBlock->pDataBlock, i);
|
||||
colDataAppendNNULL(pColData, 0, remain);
|
||||
i += 1;
|
||||
}
|
||||
|
||||
pResBlock->info.rows = pBlockData->nRow;
|
||||
setBlockAllDumped(&pReader->status.fBlockDumpInfo, pBlock, pReader->order);
|
||||
|
||||
int64_t elapsedTime = (taosGetTimestampUs() - st);
|
||||
pReader->cost.blockLoadTime += elapsedTime;
|
||||
|
||||
pDumpInfo->allDumped = false;
|
||||
tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
|
||||
", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%" PRId64 " us, %s",
|
||||
pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlockData->nRow,
|
||||
pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
|
||||
pBlock->minVersion, pBlock->maxVersion, elapsedTime, pReader->idStr);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
|
@ -2022,29 +2073,6 @@ static bool blockIteratorNext(SDataBlockIter* pBlockIter) {
|
|||
return true;
|
||||
}
|
||||
|
||||
//static int32_t getDataBlock(STsdbReader* pTsdbReadHandle, SFileBlockInfo* pNext, bool* exists) {
|
||||
// int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
|
||||
// SQueryFilePos* cur = &pTsdbReadHandle->cur;
|
||||
//
|
||||
// while (1) {
|
||||
// int32_t code = loadFileDataBlock(pTsdbReadHandle, pNext->compBlock, pNext->pTableCheckInfo, exists);
|
||||
// if (code != TSDB_CODE_SUCCESS || *exists) {
|
||||
// return code;
|
||||
// }
|
||||
//
|
||||
// if ((cur->slot == pTsdbReadHandle->numOfBlocks - 1 && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
|
||||
// (cur->slot == 0 && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
|
||||
// // all data blocks in current file has been checked already, try next file if exists
|
||||
// return getFirstFileDataBlock(pTsdbReadHandle, exists);
|
||||
// } else { // next block of the same file
|
||||
// cur->slot += step;
|
||||
// cur->mixBlock = false;
|
||||
// cur->blockCompleted = false;
|
||||
// pNext = &pTsdbReadHandle->pDataBlockInfo[cur->slot];
|
||||
// }
|
||||
// }
|
||||
//}
|
||||
|
||||
// static int32_t getFirstFileDataBlock(STsdbReader* pTsdbReadHandle, bool* exists) {
|
||||
// pTsdbReadHandle->numOfBlocks = 0;
|
||||
// SQueryFilePos* cur = &pTsdbReadHandle->cur;
|
||||
|
@ -2283,7 +2311,8 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, SFileDataBlockIn
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
} else if (pBlockScanInfo->imemHasVal) {
|
||||
} else {
|
||||
if (pBlockScanInfo->imemHasVal) {
|
||||
TSDBKEY ik = TSDBROW_KEY(piRow);
|
||||
if (key <= ik.ts) {
|
||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||
|
@ -2304,9 +2333,12 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, SFileDataBlockIn
|
|||
doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader);
|
||||
tRowMergerGetRow(&merge, &pTSRow);
|
||||
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
} else { // pBlockScanInfo->memHasVal != NULL
|
||||
|
||||
if (pBlockScanInfo->memHasVal) { // pBlockScanInfo->memHasVal != NULL
|
||||
TSDBKEY k = TSDBROW_KEY(pRow);
|
||||
if (key <= k.ts) {
|
||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||
|
@ -2327,8 +2359,17 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, SFileDataBlockIn
|
|||
doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader);
|
||||
tRowMergerGetRow(&merge, &pTSRow);
|
||||
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// imem & mem are all empty
|
||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||
tRowMergerInit(&merge, &fRow, pReader->pSchema);
|
||||
doLoadRowsOfIdenticalTsInFileBlock(pFBlock, pBlock, pBlockData, pBlockScanInfo, pReader, &merge);
|
||||
tRowMergerGetRow(&merge, &pTSRow);
|
||||
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -2343,8 +2384,8 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader, SFileDataBlockInfo*
|
|||
|
||||
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
|
||||
if (pBlockInfo->tbBlockIdx == pFBlock->tbBlockIdx) { // still in the same file block now
|
||||
|
||||
if (pDumpInfo->rowIndex >= pBlock->nRow) {
|
||||
if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
|
||||
setBlockAllDumped(pDumpInfo, pBlock, pReader->order);
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -2497,10 +2538,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
|||
|
||||
TSDBKEY key = getCurrentKeyInBuf(pBlockIter, pReader);
|
||||
if (fileBlockShouldLoad(pReader, pFBlock, pBlock, pScanInfo, key)) {
|
||||
SBlockData data = {0};
|
||||
tBlockDataInit(&data);
|
||||
|
||||
code = doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &data);
|
||||
tBlockDataInit(&pStatus->fileBlockData);
|
||||
code = doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &pStatus->fileBlockData);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
@ -2736,32 +2775,64 @@ int32_t doLoadRowsOfIdenticalTs(STbDataIter *pIter, bool* hasVal, int64_t ts, SR
|
|||
int32_t doLoadRowsOfIdenticalTsInFileBlock(SFileDataBlockInfo* pFBlock, SBlock* pBlock, SBlockData* pBlockData,
|
||||
STableBlockScanInfo* pScanInfo, STsdbReader* pReader, SRowMerger* pMerger) {
|
||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||
bool asc = ASCENDING_TRAVERSE(pReader->order);
|
||||
|
||||
int32_t step = asc? 1:-1;
|
||||
int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
|
||||
|
||||
if (asc) { // todo refactor
|
||||
if (pDumpInfo->rowIndex < pBlockData->nRow - 1) {
|
||||
if (pBlockData->aTSKEY[pDumpInfo->rowIndex + 1] == key) {
|
||||
int32_t rowIndex = pDumpInfo->rowIndex + 1;
|
||||
if (pBlockData->aTSKEY[pDumpInfo->rowIndex + step] == key) {
|
||||
int32_t rowIndex = pDumpInfo->rowIndex + step;
|
||||
|
||||
while (pBlockData->aTSKEY[rowIndex] == key) {
|
||||
if (pBlockData->aVersion[rowIndex] > pReader->verRange.maxVer) {
|
||||
if (pBlockData->aVersion[rowIndex] > pReader->verRange.maxVer || pBlockData->aVersion[rowIndex] < pReader->verRange.minVer) {
|
||||
continue;
|
||||
}
|
||||
|
||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
|
||||
tRowMerge(pMerger, &fRow);
|
||||
rowIndex += 1;
|
||||
rowIndex += step;
|
||||
}
|
||||
|
||||
pDumpInfo->rowIndex = rowIndex;
|
||||
} else {
|
||||
pDumpInfo->rowIndex += step;
|
||||
}
|
||||
} else { // last row of current block, check if current block is overlapped with neighbor block
|
||||
pDumpInfo->rowIndex += 1;
|
||||
pDumpInfo->rowIndex += step;
|
||||
bool overlap = overlapWithNeighborBlock(pFBlock, pBlock, pScanInfo);
|
||||
if (overlap) {
|
||||
// load next block
|
||||
if (overlap) { // load next block
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (pDumpInfo->rowIndex > 0) {
|
||||
if (pBlockData->aTSKEY[pDumpInfo->rowIndex + step] == key) {
|
||||
int32_t rowIndex = pDumpInfo->rowIndex + step;
|
||||
|
||||
while (pBlockData->aTSKEY[rowIndex] == key) {
|
||||
if (pBlockData->aVersion[rowIndex] > pReader->verRange.maxVer ||
|
||||
pBlockData->aVersion[rowIndex] < pReader->verRange.minVer) {
|
||||
continue;
|
||||
}
|
||||
|
||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
|
||||
tRowMerge(pMerger, &fRow);
|
||||
rowIndex += step;
|
||||
}
|
||||
|
||||
pDumpInfo->rowIndex = rowIndex;
|
||||
} else {
|
||||
pDumpInfo->rowIndex += step;
|
||||
}
|
||||
} else { // last row of current block, check if current block is overlapped with previous neighbor block
|
||||
pDumpInfo->rowIndex += step;
|
||||
bool overlap = overlapWithNeighborBlock(pFBlock, pBlock, pScanInfo);
|
||||
if (overlap) { // load next block
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -2889,24 +2960,12 @@ int32_t buildInmemDataBlockImpl(STableBlockScanInfo* pBlockScanInfo, TSDBKEY max
|
|||
do {
|
||||
STSRow* pTSRow = NULL;
|
||||
tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow);
|
||||
if (pTSRow == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
doAppendOneRow(pBlock, pReader, pTSRow);
|
||||
|
||||
if (pBlockScanInfo->memHasVal) {
|
||||
TSDBROW* pRow = tsdbTbDataIterGet(pBlockScanInfo->iter);
|
||||
TSDBKEY k = TSDBROW_KEY(pRow);
|
||||
if (k.ts >= maxKey.ts) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (pBlockScanInfo->imemHasVal) {
|
||||
TSDBROW* pRow = tsdbTbDataIterGet(pBlockScanInfo->iiter);
|
||||
TSDBKEY k = TSDBROW_KEY(pRow);
|
||||
if (k.ts >= maxKey.ts) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// no data in buffer, return immediately
|
||||
if (!(pBlockScanInfo->memHasVal || pBlockScanInfo->imemHasVal)) {
|
||||
break;
|
||||
|
@ -3127,34 +3186,6 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
|
|||
goto _err;
|
||||
}
|
||||
|
||||
#if 0
|
||||
// int32_t code = setCurrentSchema(pVnode, pReader);
|
||||
// if (code != TSDB_CODE_SUCCESS) {
|
||||
// terrno = code;
|
||||
// return NULL;
|
||||
// }
|
||||
|
||||
// int32_t numOfCols = taosArrayGetSize(pReader->suppInfo.defaultLoadColumn);
|
||||
// int16_t* ids = pReader->suppInfo.defaultLoadColumn->pData;
|
||||
|
||||
// STSchema* pSchema = pReader->pSchema;
|
||||
|
||||
// int32_t i = 0, j = 0;
|
||||
// while (i < numOfCols && j < pSchema->numOfCols) {
|
||||
// if (ids[i] == pSchema->columns[j].colId) {
|
||||
// pReader->suppInfo.slotIds[i] = j;
|
||||
// i++;
|
||||
// j++;
|
||||
// } else if (ids[i] > pSchema->columns[j].colId) {
|
||||
// j++;
|
||||
// } else {
|
||||
// // tsdbReaderClose(pTsdbReadHandle);
|
||||
// terrno = TSDB_CODE_INVALID_PARA;
|
||||
// return NULL;
|
||||
// }
|
||||
// }
|
||||
#endif
|
||||
|
||||
tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
|
||||
return code;
|
||||
|
||||
|
@ -3362,7 +3393,8 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
|
|||
|
||||
if (pStatus->composedDataBlock) {
|
||||
return pReader->pResBlock->pDataBlock;
|
||||
} else {
|
||||
}
|
||||
|
||||
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pStatus->blockIter);
|
||||
STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
|
||||
|
||||
|
@ -3378,8 +3410,8 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
|
||||
return pReader->pResBlock->pDataBlock;
|
||||
}
|
||||
}
|
||||
|
||||
void tsdbResetReadHandle(STsdbReader* pReader, SQueryTableDataCond* pCond, int32_t tWinIdx) {
|
||||
|
|
|
@ -285,8 +285,11 @@ static bool isTableOk(STableKeyInfo* info, SNode *pTagCond, SMeta *metaHandle){
|
|||
|
||||
int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, STableListInfo* pListInfo) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
pListInfo->pTableList = taosArrayInit(8, sizeof(STableKeyInfo));
|
||||
if(pListInfo->pTableList == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
if (pListInfo->pTableList == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
uint64_t tableUid = pScanNode->uid;
|
||||
|
||||
|
@ -300,7 +303,7 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
|
|||
.metaEx = metaHandle, .idx = vnodeGetIdx(metaHandle), .ivtIdx = vnodeGetIvtIdx(metaHandle), .suid = tableUid};
|
||||
|
||||
SArray* res = taosArrayInit(8, sizeof(uint64_t));
|
||||
//code = doFilterTag(pTagIndexCond, &metaArg, res);
|
||||
// code = doFilterTag(pTagIndexCond, &metaArg, res);
|
||||
code = TSDB_CODE_INDEX_REBUILDING;
|
||||
if (code == TSDB_CODE_INDEX_REBUILDING) {
|
||||
code = vnodeGetAllTableList(pVnode, tableUid, pListInfo->pTableList);
|
||||
|
@ -322,24 +325,27 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
|
|||
code = vnodeGetAllTableList(pVnode, tableUid, pListInfo->pTableList);
|
||||
}
|
||||
|
||||
if(pTagCond){
|
||||
if (pTagCond) {
|
||||
int32_t i = 0;
|
||||
while(i < taosArrayGetSize(pListInfo->pTableList)) {
|
||||
while (i < taosArrayGetSize(pListInfo->pTableList)) {
|
||||
STableKeyInfo* info = taosArrayGet(pListInfo->pTableList, i);
|
||||
bool isOk = isTableOk(info, pTagCond, metaHandle);
|
||||
if(!isOk){
|
||||
if (!isOk) {
|
||||
taosArrayRemove(pListInfo->pTableList, i);
|
||||
continue;
|
||||
}
|
||||
i++;
|
||||
}
|
||||
}
|
||||
}else { // Create one table group.
|
||||
} else { // Create one table group.
|
||||
STableKeyInfo info = {.lastKey = 0, .uid = tableUid, .groupId = 0};
|
||||
taosArrayPush(pListInfo->pTableList, &info);
|
||||
}
|
||||
|
||||
pListInfo->pGroupList = taosArrayInit(4, POINTER_BYTES);
|
||||
if(pListInfo->pGroupList == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
if(pListInfo->pGroupList == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
//put into list as default group, remove it if grouping sorting is required later
|
||||
taosArrayPush(pListInfo->pGroupList, &pListInfo->pTableList);
|
||||
|
|
Loading…
Reference in New Issue