This commit is contained in:
Hongze Cheng 2022-06-16 11:14:56 +00:00
parent f3baaf1a60
commit 760905f632
4 changed files with 353 additions and 350 deletions

View File

@ -118,12 +118,12 @@ typedef struct STsdbReader STsdbReader;
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *tableInfoGroup, uint64_t qId,
uint64_t taskId, STsdbReader **ppReader);
void tsdbReaderClose(STsdbReader *pReader);
bool tsdbNextDataBlock(STsdbReader *pReader);
void tsdbRetrieveDataBlockInfo(STsdbReader *pReader, SDataBlockInfo *pDataBlockInfo);
int32_t tsdbRetrieveDataBlockStatisInfo(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave);
SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
void tsdbResetReadHandle(STsdbReader *pReader, SQueryTableDataCond *pCond, int32_t tWinIdx);
void tsdbCleanupReadHandle(STsdbReader *pReader);
int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);

View File

@ -180,29 +180,6 @@ static SArray* getDefaultLoadColumns(STsdbReader* pTsdbReadHandle, bool loadTS)
return pLocalIdList;
}
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
int64_t rows = 0;
SMemTable* pMemTable = NULL; // pTsdbReadHandle->pMemTable;
if (pMemTable == NULL) {
return rows;
}
size_t size = taosArrayGetSize(pReader->pTableCheckInfo);
for (int32_t i = 0; i < size; ++i) {
STableCheckInfo* pCheckInfo = taosArrayGet(pReader->pTableCheckInfo, i);
// if (pMemT && pCheckInfo->tableId < pMemT->maxTables) {
// pMem = pMemT->tData[pCheckInfo->tableId];
// rows += (pMem && pMem->uid == pCheckInfo->tableId) ? pMem->numOfRows : 0;
// }
// if (pIMemT && pCheckInfo->tableId < pIMemT->maxTables) {
// pIMem = pIMemT->tData[pCheckInfo->tableId];
// rows += (pIMem && pIMem->uid == pCheckInfo->tableId) ? pIMem->numOfRows : 0;
// }
}
return rows;
}
static SArray* createCheckInfoFromTableGroup(STsdbReader* pTsdbReadHandle, STableListInfo* pTableList) {
size_t tableSize = taosArrayGetSize(pTableList->pTableList);
assert(tableSize >= 1);
@ -441,7 +418,7 @@ static STsdbReader* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond* pCo
return (STsdbReader*)pReadHandle;
_end:
tsdbCleanupReadHandle(pReadHandle);
tsdbReaderClose(pReadHandle);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
}
@ -479,42 +456,6 @@ static int32_t setCurrentSchema(SVnode* pVnode, STsdbReader* pTsdbReadHandle) {
return TSDB_CODE_SUCCESS;
}
void tsdbResetReadHandle(STsdbReader* pReader, SQueryTableDataCond* pCond, int32_t tWinIdx) {
if (emptyQueryTimewindow(pReader)) {
if (pCond->order != pReader->order) {
pReader->order = pCond->order;
TSWAP(pReader->window.skey, pReader->window.ekey);
}
return;
}
pReader->order = pCond->order;
setQueryTimewindow(pReader, pCond, tWinIdx);
pReader->type = TSDB_QUERY_TYPE_ALL;
pReader->cur.fid = -1;
pReader->cur.win = TSWINDOW_INITIALIZER;
pReader->checkFiles = true;
pReader->activeIndex = 0; // current active table index
pReader->locateStart = false;
pReader->loadExternalRow = pCond->loadExternalRows;
if (ASCENDING_TRAVERSE(pCond->order)) {
assert(pReader->window.skey <= pReader->window.ekey);
} else {
assert(pReader->window.skey >= pReader->window.ekey);
}
// allocate buffer in order to load data blocks from file
memset(pReader->suppInfo.pstatis, 0, sizeof(SColumnDataAgg));
memset(pReader->suppInfo.plist, 0, POINTER_BYTES);
tsdbInitDataBlockLoadInfo(&pReader->dataBlockLoadInfo);
tsdbInitCompBlockLoadInfo(&pReader->compBlockLoadInfo);
resetCheckInfo(pReader);
}
void tsdbResetQueryHandleForNewTable(STsdbReader* queryHandle, SQueryTableDataCond* pCond, STableListInfo* tableList,
int32_t tWinIdx) {
STsdbReader* pTsdbReadHandle = queryHandle;
@ -550,7 +491,7 @@ void tsdbResetQueryHandleForNewTable(STsdbReader* queryHandle, SQueryTableDataCo
pTsdbReadHandle->pTableCheckInfo = NULL; // createCheckInfoFromTableGroup(pTsdbReadHandle, groupList, pMeta,
// &pTable);
if (pTsdbReadHandle->pTableCheckInfo == NULL) {
// tsdbCleanupReadHandle(pTsdbReadHandle);
// tsdbReaderClose(pTsdbReadHandle);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
}
@ -2477,116 +2418,6 @@ static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t num
return (numOfRows - startRow) / bucketRange;
}
int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) {
pTableBlockInfo->totalSize = 0;
pTableBlockInfo->totalRows = 0;
STsdbFS* pFileHandle = REPO_FS(pReader->pTsdb);
// find the start data block in file
pReader->locateStart = true;
STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pReader->pTsdb);
int32_t fid = getFileIdFromKey(pReader->window.skey, pCfg->days, pCfg->precision);
tsdbRLockFS(pFileHandle);
tsdbFSIterInit(&pReader->fileIter, pFileHandle, pReader->order);
tsdbFSIterSeek(&pReader->fileIter, fid);
tsdbUnLockFS(pFileHandle);
STsdbCfg* pc = REPO_CFG(pReader->pTsdb);
pTableBlockInfo->defMinRows = pc->minRows;
pTableBlockInfo->defMaxRows = pc->maxRows;
int32_t bucketRange = ceil((pc->maxRows - pc->minRows) / 20.0);
pTableBlockInfo->numOfFiles += 1;
int32_t code = TSDB_CODE_SUCCESS;
int32_t numOfBlocks = 0;
int32_t numOfTables = (int32_t)taosArrayGetSize(pReader->pTableCheckInfo);
int defaultRows = 4096;
STimeWindow win = TSWINDOW_INITIALIZER;
while (true) {
numOfBlocks = 0;
tsdbRLockFS(REPO_FS(pReader->pTsdb));
if ((pReader->pFileGroup = tsdbFSIterNext(&pReader->fileIter)) == NULL) {
tsdbUnLockFS(REPO_FS(pReader->pTsdb));
break;
}
tsdbGetFidKeyRange(pCfg->days, pCfg->precision, pReader->pFileGroup->fid, &win.skey, &win.ekey);
// current file are not overlapped with query time window, ignore remain files
if ((win.skey > pReader->window.ekey) /* || (!ascTraverse && win.ekey < pTsdbReadHandle->window.ekey)*/) {
tsdbUnLockFS(REPO_FS(pReader->pTsdb));
tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
pReader->window.skey, pReader->window.ekey, pReader->idStr);
pReader->pFileGroup = NULL;
break;
}
pTableBlockInfo->numOfFiles += 1;
if (tsdbSetAndOpenReadFSet(&pReader->rhelper, pReader->pFileGroup) < 0) {
tsdbUnLockFS(REPO_FS(pReader->pTsdb));
code = terrno;
break;
}
tsdbUnLockFS(REPO_FS(pReader->pTsdb));
if (tsdbLoadBlockIdx(&pReader->rhelper) < 0) {
code = terrno;
break;
}
if ((code = getFileCompInfo(pReader, &numOfBlocks)) != TSDB_CODE_SUCCESS) {
break;
}
tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables,
pReader->pFileGroup->fid, pReader->idStr);
if (numOfBlocks == 0) {
continue;
}
pTableBlockInfo->numOfBlocks += numOfBlocks;
for (int32_t i = 0; i < numOfTables; ++i) {
STableCheckInfo* pCheckInfo = taosArrayGet(pReader->pTableCheckInfo, i);
SBlock* pBlock = pCheckInfo->pCompInfo->blocks;
for (int32_t j = 0; j < pCheckInfo->numOfBlocks; ++j) {
pTableBlockInfo->totalSize += pBlock[j].len;
int32_t numOfRows = pBlock[j].numOfRows;
pTableBlockInfo->totalRows += numOfRows;
if (numOfRows > pTableBlockInfo->maxRows) {
pTableBlockInfo->maxRows = numOfRows;
}
if (numOfRows < pTableBlockInfo->minRows) {
pTableBlockInfo->minRows = numOfRows;
}
if (numOfRows < defaultRows) {
pTableBlockInfo->numOfSmallBlocks += 1;
}
int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows);
pTableBlockInfo->blockRowsHisto[bucketIndex]++;
}
}
}
pTableBlockInfo->numOfTables = numOfTables;
return code;
}
static int32_t getDataBlocksInFiles(STsdbReader* pTsdbReadHandle, bool* exists) {
STsdbFS* pFileHandle = REPO_FS(pTsdbReadHandle->pTsdb);
SQueryFilePos* cur = &pTsdbReadHandle->cur;
@ -3072,63 +2903,6 @@ static bool loadDataBlockFromTableSeq(STsdbReader* pTsdbReadHandle) {
// handle data in cache situation
// bool tsdbNextDataBlock(STsdbReader * pHandle, uint64_t uid)
bool tsdbNextDataBlock(STsdbReader* pReader) {
size_t numOfCols = taosArrayGetSize(pReader->pColumns);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pReader->pColumns, i);
colInfoDataCleanup(pColInfo, pReader->outputCapacity);
}
if (emptyQueryTimewindow(pReader)) {
tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
return false;
}
int64_t stime = taosGetTimestampUs();
int64_t elapsedTime = stime;
// TODO refactor: remove "type"
if (pReader->type == TSDB_QUERY_TYPE_LAST) {
if (pReader->cachelastrow == TSDB_CACHED_TYPE_LASTROW) {
// return loadCachedLastRow(pTsdbReadHandle);
} else if (pReader->cachelastrow == TSDB_CACHED_TYPE_LAST) {
// return loadCachedLast(pTsdbReadHandle);
}
}
if (pReader->loadType == BLOCK_LOAD_TABLE_SEQ_ORDER) {
return loadDataBlockFromTableSeq(pReader);
} else { // loadType == RR and Offset Order
if (pReader->checkFiles) {
// check if the query range overlaps with the file data block
bool exists = true;
int32_t code = getDataBlocksInFiles(pReader, &exists);
if (code != TSDB_CODE_SUCCESS) {
pReader->activeIndex = 0;
pReader->checkFiles = false;
return false;
}
if (exists) {
pReader->cost.checkForNextTime += (taosGetTimestampUs() - stime);
return exists;
}
pReader->activeIndex = 0;
pReader->checkFiles = false;
}
// TODO: opt by consider the scan order
bool ret = doHasDataInBuffer(pReader);
terrno = TSDB_CODE_SUCCESS;
elapsedTime = taosGetTimestampUs() - stime;
pReader->cost.checkForNextTime += elapsedTime;
return ret;
}
}
// static int32_t doGetExternalRow(STsdbReader* pTsdbReadHandle, int16_t type, SMemTable* pMemRef) {
// STsdbReader* pSecQueryHandle = NULL;
@ -3236,7 +3010,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
// }
//
// out_of_memory:
// tsdbCleanupReadHandle(pSecQueryHandle);
// tsdbReaderClose(pSecQueryHandle);
// return terrno;
//}
@ -3410,6 +3184,204 @@ STimeWindow updateLastrowForEachGroup(STableListInfo* pList) {
return window;
}
/*
* return null for mixed data block, if not a complete file data block, the statistics value will always return NULL
*/
static int tsdbCheckInfoCompar(const void* key1, const void* key2) {
if (((STableCheckInfo*)key1)->tableId < ((STableCheckInfo*)key2)->tableId) {
return -1;
} else if (((STableCheckInfo*)key1)->tableId > ((STableCheckInfo*)key2)->tableId) {
return 1;
} else {
ASSERT(false);
return 0;
}
}
static void* doFreeColumnInfoData(SArray* pColumnInfoData) {
if (pColumnInfoData == NULL) {
return NULL;
}
size_t cols = taosArrayGetSize(pColumnInfoData);
for (int32_t i = 0; i < cols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pColumnInfoData, i);
colDataDestroy(pColInfo);
}
taosArrayDestroy(pColumnInfoData);
return NULL;
}
static void* destroyTableCheckInfo(SArray* pTableCheckInfo) {
size_t size = taosArrayGetSize(pTableCheckInfo);
for (int32_t i = 0; i < size; ++i) {
STableCheckInfo* p = taosArrayGet(pTableCheckInfo, i);
destroyTableMemIterator(p);
taosMemoryFreeClear(p->pCompInfo);
}
taosArrayDestroy(pTableCheckInfo);
return NULL;
}
// ====================================== EXPOSED APIs ======================================
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
uint64_t taskId, STsdbReader** ppReader) {
int32_t code = 0;
STsdbReader* pReader = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId);
if (pReader == NULL) {
return NULL;
}
if (emptyQueryTimewindow(pReader)) {
return (STsdbReader*)pReader;
}
// todo apply the lastkey of table check to avoid to load header file
pReader->pTableCheckInfo = createCheckInfoFromTableGroup(pReader, tableList);
if (pReader->pTableCheckInfo == NULL) {
// tsdbReaderClose(pTsdbReadHandle);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
}
int32_t code = setCurrentSchema(pVnode, pReader);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
}
int32_t numOfCols = taosArrayGetSize(pReader->suppInfo.defaultLoadColumn);
int16_t* ids = pReader->suppInfo.defaultLoadColumn->pData;
STSchema* pSchema = pReader->pSchema;
int32_t i = 0, j = 0;
while (i < numOfCols && j < pSchema->numOfCols) {
if (ids[i] == pSchema->columns[j].colId) {
pReader->suppInfo.slotIds[i] = j;
i++;
j++;
} else if (ids[i] > pSchema->columns[j].colId) {
j++;
} else {
// tsdbReaderClose(pTsdbReadHandle);
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
}
}
tsdbDebug("%p total numOfTable:%" PRIzu " in this query, table %" PRIzu " %s", pReader,
taosArrayGetSize(pReader->pTableCheckInfo), taosArrayGetSize(tableList->pTableList), pReader->idStr);
return (STsdbReader*)pReader;
}
void tsdbReaderClose(STsdbReader* pReader) {
if (pReader == NULL) {
return;
}
pReader->pColumns = doFreeColumnInfoData(pReader->pColumns);
taosArrayDestroy(pReader->suppInfo.defaultLoadColumn);
taosMemoryFreeClear(pReader->pDataBlockInfo);
taosMemoryFreeClear(pReader->suppInfo.pstatis);
taosMemoryFreeClear(pReader->suppInfo.plist);
taosMemoryFree(pReader->suppInfo.slotIds);
if (!emptyQueryTimewindow(pReader)) {
// tsdbMayUnTakeMemSnapshot(pTsdbReadHandle);
} else {
assert(pReader->pTableCheckInfo == NULL);
}
if (pReader->pTableCheckInfo != NULL) {
pReader->pTableCheckInfo = destroyTableCheckInfo(pReader->pTableCheckInfo);
}
tsdbDestroyReadH(&pReader->rhelper);
tdFreeDataCols(pReader->pDataCols);
pReader->pDataCols = NULL;
pReader->prev = doFreeColumnInfoData(pReader->prev);
pReader->next = doFreeColumnInfoData(pReader->next);
SIOCostSummary* pCost = &pReader->cost;
tsdbDebug("%p :io-cost summary: head-file read cnt:%" PRIu64 ", head-file time:%" PRIu64 " us, statis-info:%" PRId64
" us, datablock:%" PRId64 " us, check data:%" PRId64 " us, %s",
pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->statisInfoLoadTime, pCost->blockLoadTime,
pCost->checkForNextTime, pReader->idStr);
taosMemoryFree(pReader->idStr);
taosMemoryFree(pReader->pSchema);
taosMemoryFreeClear(pReader);
}
bool tsdbNextDataBlock(STsdbReader* pReader) {
size_t numOfCols = taosArrayGetSize(pReader->pColumns);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pReader->pColumns, i);
colInfoDataCleanup(pColInfo, pReader->outputCapacity);
}
if (emptyQueryTimewindow(pReader)) {
tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
return false;
}
int64_t stime = taosGetTimestampUs();
int64_t elapsedTime = stime;
// TODO refactor: remove "type"
if (pReader->type == TSDB_QUERY_TYPE_LAST) {
if (pReader->cachelastrow == TSDB_CACHED_TYPE_LASTROW) {
// return loadCachedLastRow(pTsdbReadHandle);
} else if (pReader->cachelastrow == TSDB_CACHED_TYPE_LAST) {
// return loadCachedLast(pTsdbReadHandle);
}
}
if (pReader->loadType == BLOCK_LOAD_TABLE_SEQ_ORDER) {
return loadDataBlockFromTableSeq(pReader);
} else { // loadType == RR and Offset Order
if (pReader->checkFiles) {
// check if the query range overlaps with the file data block
bool exists = true;
int32_t code = getDataBlocksInFiles(pReader, &exists);
if (code != TSDB_CODE_SUCCESS) {
pReader->activeIndex = 0;
pReader->checkFiles = false;
return false;
}
if (exists) {
pReader->cost.checkForNextTime += (taosGetTimestampUs() - stime);
return exists;
}
pReader->activeIndex = 0;
pReader->checkFiles = false;
}
// TODO: opt by consider the scan order
bool ret = doHasDataInBuffer(pReader);
terrno = TSDB_CODE_SUCCESS;
elapsedTime = taosGetTimestampUs() - stime;
pReader->cost.checkForNextTime += elapsedTime;
return ret;
}
}
void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
SQueryFilePos* cur = &pReader->cur;
@ -3438,9 +3410,6 @@ void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockI
pDataBlockInfo->window = cur->win;
}
/*
* return null for mixed data block, if not a complete file data block, the statistics value will always return NULL
*/
int32_t tsdbRetrieveDataBlockStatisInfo(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) {
*allHave = false;
@ -3551,137 +3520,171 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
}
}
static int tsdbCheckInfoCompar(const void* key1, const void* key2) {
if (((STableCheckInfo*)key1)->tableId < ((STableCheckInfo*)key2)->tableId) {
return -1;
} else if (((STableCheckInfo*)key1)->tableId > ((STableCheckInfo*)key2)->tableId) {
return 1;
} else {
ASSERT(false);
return 0;
}
}
void tsdbResetReadHandle(STsdbReader* pReader, SQueryTableDataCond* pCond, int32_t tWinIdx) {
if (emptyQueryTimewindow(pReader)) {
if (pCond->order != pReader->order) {
pReader->order = pCond->order;
TSWAP(pReader->window.skey, pReader->window.ekey);
}
static void* doFreeColumnInfoData(SArray* pColumnInfoData) {
if (pColumnInfoData == NULL) {
return NULL;
}
size_t cols = taosArrayGetSize(pColumnInfoData);
for (int32_t i = 0; i < cols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pColumnInfoData, i);
colDataDestroy(pColInfo);
}
taosArrayDestroy(pColumnInfoData);
return NULL;
}
static void* destroyTableCheckInfo(SArray* pTableCheckInfo) {
size_t size = taosArrayGetSize(pTableCheckInfo);
for (int32_t i = 0; i < size; ++i) {
STableCheckInfo* p = taosArrayGet(pTableCheckInfo, i);
destroyTableMemIterator(p);
taosMemoryFreeClear(p->pCompInfo);
}
taosArrayDestroy(pTableCheckInfo);
return NULL;
}
void tsdbCleanupReadHandle(STsdbReader* pReader) {
if (pReader == NULL) {
return;
}
pReader->pColumns = doFreeColumnInfoData(pReader->pColumns);
pReader->order = pCond->order;
setQueryTimewindow(pReader, pCond, tWinIdx);
pReader->type = TSDB_QUERY_TYPE_ALL;
pReader->cur.fid = -1;
pReader->cur.win = TSWINDOW_INITIALIZER;
pReader->checkFiles = true;
pReader->activeIndex = 0; // current active table index
pReader->locateStart = false;
pReader->loadExternalRow = pCond->loadExternalRows;
taosArrayDestroy(pReader->suppInfo.defaultLoadColumn);
taosMemoryFreeClear(pReader->pDataBlockInfo);
taosMemoryFreeClear(pReader->suppInfo.pstatis);
taosMemoryFreeClear(pReader->suppInfo.plist);
taosMemoryFree(pReader->suppInfo.slotIds);
if (!emptyQueryTimewindow(pReader)) {
// tsdbMayUnTakeMemSnapshot(pTsdbReadHandle);
if (ASCENDING_TRAVERSE(pCond->order)) {
assert(pReader->window.skey <= pReader->window.ekey);
} else {
assert(pReader->pTableCheckInfo == NULL);
assert(pReader->window.skey >= pReader->window.ekey);
}
if (pReader->pTableCheckInfo != NULL) {
pReader->pTableCheckInfo = destroyTableCheckInfo(pReader->pTableCheckInfo);
}
// allocate buffer in order to load data blocks from file
memset(pReader->suppInfo.pstatis, 0, sizeof(SColumnDataAgg));
memset(pReader->suppInfo.plist, 0, POINTER_BYTES);
tsdbDestroyReadH(&pReader->rhelper);
tsdbInitDataBlockLoadInfo(&pReader->dataBlockLoadInfo);
tsdbInitCompBlockLoadInfo(&pReader->compBlockLoadInfo);
tdFreeDataCols(pReader->pDataCols);
pReader->pDataCols = NULL;
pReader->prev = doFreeColumnInfoData(pReader->prev);
pReader->next = doFreeColumnInfoData(pReader->next);
SIOCostSummary* pCost = &pReader->cost;
tsdbDebug("%p :io-cost summary: head-file read cnt:%" PRIu64 ", head-file time:%" PRIu64 " us, statis-info:%" PRId64
" us, datablock:%" PRId64 " us, check data:%" PRId64 " us, %s",
pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->statisInfoLoadTime, pCost->blockLoadTime,
pCost->checkForNextTime, pReader->idStr);
taosMemoryFree(pReader->idStr);
taosMemoryFree(pReader->pSchema);
taosMemoryFreeClear(pReader);
resetCheckInfo(pReader);
}
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
uint64_t taskId, STsdbReader** ppReader) {
int32_t code = 0;
int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) {
pTableBlockInfo->totalSize = 0;
pTableBlockInfo->totalRows = 0;
STsdbReader* pReader = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId);
if (pReader == NULL) {
return NULL;
}
STsdbFS* pFileHandle = REPO_FS(pReader->pTsdb);
if (emptyQueryTimewindow(pReader)) {
return (STsdbReader*)pReader;
}
// find the start data block in file
pReader->locateStart = true;
STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pReader->pTsdb);
int32_t fid = getFileIdFromKey(pReader->window.skey, pCfg->days, pCfg->precision);
// todo apply the lastkey of table check to avoid to load header file
pReader->pTableCheckInfo = createCheckInfoFromTableGroup(pReader, tableList);
if (pReader->pTableCheckInfo == NULL) {
// tsdbCleanupReadHandle(pTsdbReadHandle);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
}
tsdbRLockFS(pFileHandle);
tsdbFSIterInit(&pReader->fileIter, pFileHandle, pReader->order);
tsdbFSIterSeek(&pReader->fileIter, fid);
tsdbUnLockFS(pFileHandle);
int32_t code = setCurrentSchema(pVnode, pReader);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
}
STsdbCfg* pc = REPO_CFG(pReader->pTsdb);
pTableBlockInfo->defMinRows = pc->minRows;
pTableBlockInfo->defMaxRows = pc->maxRows;
int32_t numOfCols = taosArrayGetSize(pReader->suppInfo.defaultLoadColumn);
int16_t* ids = pReader->suppInfo.defaultLoadColumn->pData;
int32_t bucketRange = ceil((pc->maxRows - pc->minRows) / 20.0);
STSchema* pSchema = pReader->pSchema;
pTableBlockInfo->numOfFiles += 1;
int32_t i = 0, j = 0;
while (i < numOfCols && j < pSchema->numOfCols) {
if (ids[i] == pSchema->columns[j].colId) {
pReader->suppInfo.slotIds[i] = j;
i++;
j++;
} else if (ids[i] > pSchema->columns[j].colId) {
j++;
} else {
// tsdbCleanupReadHandle(pTsdbReadHandle);
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
int32_t code = TSDB_CODE_SUCCESS;
int32_t numOfBlocks = 0;
int32_t numOfTables = (int32_t)taosArrayGetSize(pReader->pTableCheckInfo);
int defaultRows = 4096;
STimeWindow win = TSWINDOW_INITIALIZER;
while (true) {
numOfBlocks = 0;
tsdbRLockFS(REPO_FS(pReader->pTsdb));
if ((pReader->pFileGroup = tsdbFSIterNext(&pReader->fileIter)) == NULL) {
tsdbUnLockFS(REPO_FS(pReader->pTsdb));
break;
}
tsdbGetFidKeyRange(pCfg->days, pCfg->precision, pReader->pFileGroup->fid, &win.skey, &win.ekey);
// current file are not overlapped with query time window, ignore remain files
if ((win.skey > pReader->window.ekey) /* || (!ascTraverse && win.ekey < pTsdbReadHandle->window.ekey)*/) {
tsdbUnLockFS(REPO_FS(pReader->pTsdb));
tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
pReader->window.skey, pReader->window.ekey, pReader->idStr);
pReader->pFileGroup = NULL;
break;
}
pTableBlockInfo->numOfFiles += 1;
if (tsdbSetAndOpenReadFSet(&pReader->rhelper, pReader->pFileGroup) < 0) {
tsdbUnLockFS(REPO_FS(pReader->pTsdb));
code = terrno;
break;
}
tsdbUnLockFS(REPO_FS(pReader->pTsdb));
if (tsdbLoadBlockIdx(&pReader->rhelper) < 0) {
code = terrno;
break;
}
if ((code = getFileCompInfo(pReader, &numOfBlocks)) != TSDB_CODE_SUCCESS) {
break;
}
tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables,
pReader->pFileGroup->fid, pReader->idStr);
if (numOfBlocks == 0) {
continue;
}
pTableBlockInfo->numOfBlocks += numOfBlocks;
for (int32_t i = 0; i < numOfTables; ++i) {
STableCheckInfo* pCheckInfo = taosArrayGet(pReader->pTableCheckInfo, i);
SBlock* pBlock = pCheckInfo->pCompInfo->blocks;
for (int32_t j = 0; j < pCheckInfo->numOfBlocks; ++j) {
pTableBlockInfo->totalSize += pBlock[j].len;
int32_t numOfRows = pBlock[j].numOfRows;
pTableBlockInfo->totalRows += numOfRows;
if (numOfRows > pTableBlockInfo->maxRows) {
pTableBlockInfo->maxRows = numOfRows;
}
if (numOfRows < pTableBlockInfo->minRows) {
pTableBlockInfo->minRows = numOfRows;
}
if (numOfRows < defaultRows) {
pTableBlockInfo->numOfSmallBlocks += 1;
}
int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows);
pTableBlockInfo->blockRowsHisto[bucketIndex]++;
}
}
}
tsdbDebug("%p total numOfTable:%" PRIzu " in this query, table %" PRIzu " %s", pReader,
taosArrayGetSize(pReader->pTableCheckInfo), taosArrayGetSize(tableList->pTableList), pReader->idStr);
pTableBlockInfo->numOfTables = numOfTables;
return code;
}
return (STsdbReader*)pReader;
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
int64_t rows = 0;
SMemTable* pMemTable = NULL; // pTsdbReadHandle->pMemTable;
if (pMemTable == NULL) {
return rows;
}
size_t size = taosArrayGetSize(pReader->pTableCheckInfo);
for (int32_t i = 0; i < size; ++i) {
STableCheckInfo* pCheckInfo = taosArrayGet(pReader->pTableCheckInfo, i);
// if (pMemT && pCheckInfo->tableId < pMemT->maxTables) {
// pMem = pMemT->tData[pCheckInfo->tableId];
// rows += (pMem && pMem->uid == pCheckInfo->tableId) ? pMem->numOfRows : 0;
// }
// if (pIMemT && pCheckInfo->tableId < pIMemT->maxTables) {
// pIMem = pIMemT->tData[pCheckInfo->tableId];
// rows += (pIMem && pIMem->uid == pCheckInfo->tableId) ? pIMem->numOfRows : 0;
// }
}
return rows;
}

View File

@ -4519,7 +4519,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
if (code) {
tsdbCleanupReadHandle(pDataReader);
tsdbReaderClose(pDataReader);
pTaskInfo->code = terrno;
return NULL;
}
@ -4528,7 +4528,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
code = generateGroupIdMap(pTableListInfo, pHandle, groupKeys); // todo for json
taosArrayDestroy(groupKeys);
if (code) {
tsdbCleanupReadHandle(pDataReader);
tsdbReaderClose(pDataReader);
pTaskInfo->code = terrno;
return NULL;
}
@ -4575,7 +4575,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t code = generateGroupIdMap(pTableListInfo, pHandle, groupKeys); // todo for json
taosArrayDestroy(groupKeys);
if (code) {
tsdbCleanupReadHandle(pDataReader);
tsdbReaderClose(pDataReader);
return NULL;
}
@ -4900,8 +4900,8 @@ SArray* extractColumnInfo(SNodeList* pNodeList) {
}
SArray* extractPartitionColInfo(SNodeList* pNodeList) {
if(!pNodeList) {
return NULL;
if (!pNodeList) {
return NULL;
}
size_t numOfCols = LIST_LENGTH(pNodeList);

View File

@ -522,7 +522,7 @@ static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) {
blockDataDestroy(pTableScanInfo->pResBlock);
clearupQueryTableDataCond(&pTableScanInfo->cond);
tsdbCleanupReadHandle(pTableScanInfo->dataReader);
tsdbReaderClose(pTableScanInfo->dataReader);
if (pTableScanInfo->pColMatchInfo != NULL) {
taosArrayDestroy(pTableScanInfo->pColMatchInfo);
@ -2222,7 +2222,7 @@ void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) {
for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->dataReaders); ++i) {
STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, i);
tsdbCleanupReadHandle(reader);
tsdbReaderClose(reader);
}
taosArrayDestroy(pTableScanInfo->dataReaders);