From 5485c7fc9f5a0a7a6c8de17752aeafb7d971e8eb Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 16 Jun 2022 09:18:30 +0000 Subject: [PATCH] refact tsdb read --- source/dnode/vnode/inc/vnode.h | 22 +- source/dnode/vnode/src/tsdb/tsdbRead.c | 554 ++++++++++++------------- 2 files changed, 281 insertions(+), 295 deletions(-) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index fddc4033f9..7290c50bca 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -110,7 +110,7 @@ int32_t metaTbCursorNext(SMTbCursor *pTbCur); // tsdb // typedef struct STsdb STsdb; -typedef void *STsdbReader; +typedef struct STsdbReader STsdbReader; #define BLOCK_LOAD_OFFSET_SEQ_ORDER 1 #define BLOCK_LOAD_TABLE_SEQ_ORDER 2 @@ -118,19 +118,19 @@ typedef void *STsdbReader; STsdbReader *tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *tableInfoGroup, uint64_t qId, uint64_t taskId); +bool tsdbNextDataBlock(STsdbReader *pReader); +void tsdbRetrieveDataBlockInfo(STsdbReader *pReader, SDataBlockInfo *pDataBlockInfo); +int32_t tsdbRetrieveDataBlockStatisInfo(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave); +SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList); +void tsdbResetReadHandle(STsdbReader *pReader, SQueryTableDataCond *pCond, int32_t tWinIdx); +void tsdbCleanupReadHandle(STsdbReader *pReader); int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo); -int32_t tsdbGetAllTableList(SMeta *pMeta, uint64_t uid, SArray *list); -int32_t tsdbGetCtbIdList(SMeta *pMeta, int64_t suid, SArray *list); -void *tsdbGetIdx(SMeta *pMeta); -void *tsdbGetIvtIdx(SMeta *pMeta); int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle); -bool tsdbNextDataBlock(STsdbReader pTsdbReadHandle); -void tsdbRetrieveDataBlockInfo(STsdbReader *pTsdbReadHandle, SDataBlockInfo *pBlockInfo); -int32_t tsdbRetrieveDataBlockStatisInfo(STsdbReader *pTsdbReadHandle, SColumnDataAgg ***pBlockStatis, bool *allHave); -SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList); -void tsdbResetReadHandle(STsdbReader queryHandle, SQueryTableDataCond *pCond, int32_t tWinIdx); -void tsdbCleanupReadHandle(STsdbReader queryHandle); +int32_t tsdbGetAllTableList(SMeta *pMeta, uint64_t uid, SArray *list); +int32_t tsdbGetCtbIdList(SMeta *pMeta, int64_t suid, SArray *list); +void *tsdbGetIdx(SMeta *pMeta); +void *tsdbGetIvtIdx(SMeta *pMeta); // tq diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 2fbfcddb1d..b398d7dbbd 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -104,7 +104,7 @@ typedef struct SBlockLoadSuppInfo { int32_t* slotIds; // colId to slotId } SBlockLoadSuppInfo; -typedef struct STsdbReadHandle { +struct STsdbReader { STsdb* pTsdb; uint64_t suid; SQueryFilePos cur; // current position @@ -126,9 +126,9 @@ typedef struct STsdbReadHandle { int32_t loadType; // block load type char* idStr; // query info handle, for debug purpose int32_t type; // query type: retrieve all data blocks, 2. retrieve only last row, 3. retrieve direct prev|next rows - SDFileSet* pFileGroup; - SFSIter fileIter; - SReadH rhelper; + SDFileSet* pFileGroup; + // SFSIter fileIter; + // SReadH rhelper; STableBlockInfo* pDataBlockInfo; SDataCols* pDataCols; // in order to hold current file data block int32_t allocSize; // allocated data block size @@ -139,24 +139,24 @@ typedef struct STsdbReadHandle { SArray* next; // next row which is after the query time window SIOCostSummary cost; STSchema* pSchema; -} STsdbReadHandle; +}; static STimeWindow updateLastrowForEachGroup(STableListInfo* pList); -static int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableListInfo* pList); -static int32_t checkForCachedLast(STsdbReadHandle* pTsdbReadHandle); +static int32_t checkForCachedLastRow(STsdbReader* pTsdbReadHandle, STableListInfo* pList); +static int32_t checkForCachedLast(STsdbReader* pTsdbReadHandle); // static int32_t tsdbGetCachedLastRow(STable* pTable, STSRow** pRes, TSKEY* lastKey); -static void changeQueryHandleForInterpQuery(tsdbReaderT pHandle); -static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock); +static void changeQueryHandleForInterpQuery(STsdbReader* pHandle); +static void doMergeTwoLevelData(STsdbReader* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock); static int32_t tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win, - STsdbReadHandle* pTsdbReadHandle); + STsdbReader* pTsdbReadHandle); static int32_t tsdbCheckInfoCompar(const void* key1, const void* key2); -// static int32_t doGetExternalRow(STsdbReadHandle* pTsdbReadHandle, int16_t type, void* pMemRef); +// static int32_t doGetExternalRow(STsdbReader* pTsdbReadHandle, int16_t type, void* pMemRef); // static void* doFreeColumnInfoData(SArray* pColumnInfoData); // static void* destroyTableCheckInfo(SArray* pTableCheckInfo); -static bool tsdbGetExternalRow(tsdbReaderT pHandle); +static bool tsdbGetExternalRow(STsdbReader* pHandle); -static STsdb* getTsdbByRetentions(SVnode* pVnode, STsdbReadHandle* pReadHandle, TSKEY winSKey, SRetention* retentions); +static STsdb* getTsdbByRetentions(SVnode* pVnode, STsdbReader* pReadHandle, TSKEY winSKey, SRetention* retentions); static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) { pBlockLoadInfo->slot = -1; @@ -169,7 +169,7 @@ static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) { pCompBlockLoadInfo->fileId = -1; } -static SArray* getColumnIdList(STsdbReadHandle* pTsdbReadHandle) { +static SArray* getColumnIdList(STsdbReader* pTsdbReadHandle) { size_t numOfCols = QH_GET_NUM_OF_COLS(pTsdbReadHandle); assert(numOfCols <= TSDB_MAX_COLUMNS); @@ -182,7 +182,7 @@ static SArray* getColumnIdList(STsdbReadHandle* pTsdbReadHandle) { return pIdList; } -static SArray* getDefaultLoadColumns(STsdbReadHandle* pTsdbReadHandle, bool loadTS) { +static SArray* getDefaultLoadColumns(STsdbReader* pTsdbReadHandle, bool loadTS) { SArray* pLocalIdList = getColumnIdList(pTsdbReadHandle); // check if the primary time stamp column needs to load @@ -197,18 +197,16 @@ static SArray* getDefaultLoadColumns(STsdbReadHandle* pTsdbReadHandle, bool load return pLocalIdList; } -int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT* pHandle) { - STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle; - +int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) { int64_t rows = 0; SMemTable* pMemTable = NULL; // pTsdbReadHandle->pMemTable; if (pMemTable == NULL) { return rows; } - size_t size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo); + size_t size = taosArrayGetSize(pReader->pTableCheckInfo); for (int32_t i = 0; i < size; ++i) { - STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i); + STableCheckInfo* pCheckInfo = taosArrayGet(pReader->pTableCheckInfo, i); // if (pMemT && pCheckInfo->tableId < pMemT->maxTables) { // pMem = pMemT->tData[pCheckInfo->tableId]; @@ -222,7 +220,7 @@ int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT* pHandle) { return rows; } -static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, STableListInfo* pTableList) { +static SArray* createCheckInfoFromTableGroup(STsdbReader* pTsdbReadHandle, STableListInfo* pTableList) { size_t tableSize = taosArrayGetSize(pTableList->pTableList); assert(tableSize >= 1); @@ -258,7 +256,7 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S return pTableCheckInfo; } -static void resetCheckInfo(STsdbReadHandle* pTsdbReadHandle) { +static void resetCheckInfo(STsdbReader* pTsdbReadHandle) { size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo); assert(numOfTables >= 1); @@ -289,7 +287,7 @@ static SArray* createCheckInfoFromCheckInfo(STableCheckInfo* pCheckInfo, TSKEY s return pNew; } -static bool emptyQueryTimewindow(STsdbReadHandle* pTsdbReadHandle) { +static bool emptyQueryTimewindow(STsdbReader* pTsdbReadHandle) { assert(pTsdbReadHandle != NULL); STimeWindow* w = &pTsdbReadHandle->window; @@ -301,13 +299,13 @@ static bool emptyQueryTimewindow(STsdbReadHandle* pTsdbReadHandle) { // Update the query time window according to the data time to live(TTL) information, in order to avoid to return // the expired data to client, even it is queried already. static int64_t getEarliestValidTimestamp(STsdb* pTsdb) { - STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pTsdb); + STsdbKeepCfg* pCfg = &pTsdb->keepCfg; int64_t now = taosGetTimestamp(pCfg->precision); return now - (tsTickPerMin[pCfg->precision] * pCfg->keep2) + 1; // needs to add one tick } -static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, SQueryTableDataCond* pCond, int32_t tWinIdx) { +static void setQueryTimewindow(STsdbReader* pTsdbReadHandle, SQueryTableDataCond* pCond, int32_t tWinIdx) { pTsdbReadHandle->window = pCond->twindows[tWinIdx]; bool updateTs = false; @@ -333,7 +331,7 @@ static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, SQueryTableData } } -static STsdb* getTsdbByRetentions(SVnode* pVnode, STsdbReadHandle* pReadHandle, TSKEY winSKey, SRetention* retentions) { +static STsdb* getTsdbByRetentions(SVnode* pVnode, STsdbReader* pReadHandle, TSKEY winSKey, SRetention* retentions) { if (VND_IS_RSMA(pVnode)) { int level = 0; int64_t now = taosGetTimestamp(pVnode->config.tsdbCfg.precision); @@ -369,8 +367,8 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, STsdbReadHandle* pReadHandle, return VND_TSDB(pVnode); } -static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond* pCond, uint64_t qId, uint64_t taskId) { - STsdbReadHandle* pReadHandle = taosMemoryCalloc(1, sizeof(STsdbReadHandle)); +static STsdbReader* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond* pCond, uint64_t qId, uint64_t taskId) { + STsdbReader* pReadHandle = taosMemoryCalloc(1, sizeof(STsdbReader)); if (pReadHandle == NULL) { goto _end; } @@ -397,9 +395,9 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond* snprintf(buf, tListLen(buf), "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, qId); pReadHandle->idStr = strdup(buf); - if (tsdbInitReadH(&pReadHandle->rhelper, pReadHandle->pTsdb) != 0) { - goto _end; - } + // if (tsdbInitReadH(&pReadHandle->rhelper, pReadHandle->pTsdb) != 0) { + // goto _end; + // } assert(pCond != NULL); setQueryTimewindow(pReadHandle, pCond, 0); @@ -457,7 +455,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond* tsdbInitDataBlockLoadInfo(&pReadHandle->dataBlockLoadInfo); tsdbInitCompBlockLoadInfo(&pReadHandle->compBlockLoadInfo); - return (tsdbReaderT)pReadHandle; + return (STsdbReader*)pReadHandle; _end: tsdbCleanupReadHandle(pReadHandle); @@ -465,7 +463,7 @@ _end: return NULL; } -static int32_t setCurrentSchema(SVnode* pVnode, STsdbReadHandle* pTsdbReadHandle) { +static int32_t setCurrentSchema(SVnode* pVnode, STsdbReader* pTsdbReadHandle) { STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, 0); int32_t sversion = 1; @@ -498,99 +496,45 @@ static int32_t setCurrentSchema(SVnode* pVnode, STsdbReadHandle* pTsdbReadHandle return TSDB_CODE_SUCCESS; } -tsdbReaderT* tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, - uint64_t taskId) { - STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId); - if (pTsdbReadHandle == NULL) { - return NULL; - } - - if (emptyQueryTimewindow(pTsdbReadHandle)) { - return (tsdbReaderT*)pTsdbReadHandle; - } - - // todo apply the lastkey of table check to avoid to load header file - pTsdbReadHandle->pTableCheckInfo = createCheckInfoFromTableGroup(pTsdbReadHandle, tableList); - if (pTsdbReadHandle->pTableCheckInfo == NULL) { - // tsdbCleanupReadHandle(pTsdbReadHandle); - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return NULL; - } - - int32_t code = setCurrentSchema(pVnode, pTsdbReadHandle); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - return NULL; - } - - int32_t numOfCols = taosArrayGetSize(pTsdbReadHandle->suppInfo.defaultLoadColumn); - int16_t* ids = pTsdbReadHandle->suppInfo.defaultLoadColumn->pData; - - STSchema* pSchema = pTsdbReadHandle->pSchema; - - int32_t i = 0, j = 0; - while (i < numOfCols && j < pSchema->numOfCols) { - if (ids[i] == pSchema->columns[j].colId) { - pTsdbReadHandle->suppInfo.slotIds[i] = j; - i++; - j++; - } else if (ids[i] > pSchema->columns[j].colId) { - j++; - } else { - // tsdbCleanupReadHandle(pTsdbReadHandle); - terrno = TSDB_CODE_INVALID_PARA; - return NULL; - } - } - - tsdbDebug("%p total numOfTable:%" PRIzu " in this query, table %" PRIzu " %s", pTsdbReadHandle, - taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo), taosArrayGetSize(tableList->pTableList), - pTsdbReadHandle->idStr); - - return (tsdbReaderT)pTsdbReadHandle; -} - -void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond* pCond, int32_t tWinIdx) { - STsdbReadHandle* pTsdbReadHandle = queryHandle; - - if (emptyQueryTimewindow(pTsdbReadHandle)) { - if (pCond->order != pTsdbReadHandle->order) { - pTsdbReadHandle->order = pCond->order; - TSWAP(pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey); +void tsdbResetReadHandle(STsdbReader* pReader, SQueryTableDataCond* pCond, int32_t tWinIdx) { + if (emptyQueryTimewindow(pReader)) { + if (pCond->order != pReader->order) { + pReader->order = pCond->order; + TSWAP(pReader->window.skey, pReader->window.ekey); } return; } - pTsdbReadHandle->order = pCond->order; - setQueryTimewindow(pTsdbReadHandle, pCond, tWinIdx); - pTsdbReadHandle->type = TSDB_QUERY_TYPE_ALL; - pTsdbReadHandle->cur.fid = -1; - pTsdbReadHandle->cur.win = TSWINDOW_INITIALIZER; - pTsdbReadHandle->checkFiles = true; - pTsdbReadHandle->activeIndex = 0; // current active table index - pTsdbReadHandle->locateStart = false; - pTsdbReadHandle->loadExternalRow = pCond->loadExternalRows; + pReader->order = pCond->order; + setQueryTimewindow(pReader, pCond, tWinIdx); + pReader->type = TSDB_QUERY_TYPE_ALL; + pReader->cur.fid = -1; + pReader->cur.win = TSWINDOW_INITIALIZER; + pReader->checkFiles = true; + pReader->activeIndex = 0; // current active table index + pReader->locateStart = false; + pReader->loadExternalRow = pCond->loadExternalRows; if (ASCENDING_TRAVERSE(pCond->order)) { - assert(pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey); + assert(pReader->window.skey <= pReader->window.ekey); } else { - assert(pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey); + assert(pReader->window.skey >= pReader->window.ekey); } // allocate buffer in order to load data blocks from file - memset(pTsdbReadHandle->suppInfo.pstatis, 0, sizeof(SColumnDataAgg)); - memset(pTsdbReadHandle->suppInfo.plist, 0, POINTER_BYTES); + memset(pReader->suppInfo.pstatis, 0, sizeof(SColumnDataAgg)); + memset(pReader->suppInfo.plist, 0, POINTER_BYTES); - tsdbInitDataBlockLoadInfo(&pTsdbReadHandle->dataBlockLoadInfo); - tsdbInitCompBlockLoadInfo(&pTsdbReadHandle->compBlockLoadInfo); + tsdbInitDataBlockLoadInfo(&pReader->dataBlockLoadInfo); + tsdbInitCompBlockLoadInfo(&pReader->compBlockLoadInfo); - resetCheckInfo(pTsdbReadHandle); + resetCheckInfo(pReader); } -void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, SQueryTableDataCond* pCond, STableListInfo* tableList, +void tsdbResetQueryHandleForNewTable(STsdbReader* queryHandle, SQueryTableDataCond* pCond, STableListInfo* tableList, int32_t tWinIdx) { - STsdbReadHandle* pTsdbReadHandle = queryHandle; + STsdbReader* pTsdbReadHandle = queryHandle; pTsdbReadHandle->order = pCond->order; pTsdbReadHandle->window = pCond->twindows[tWinIdx]; @@ -631,8 +575,8 @@ void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, SQueryTableDataCon // pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next); } -tsdbReaderT tsdbQueryLastRow(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* pList, uint64_t qId, - uint64_t taskId) { +STsdbReader* tsdbQueryLastRow(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* pList, uint64_t qId, + uint64_t taskId) { pCond->twindows[0] = updateLastrowForEachGroup(pList); // no qualified table @@ -640,7 +584,7 @@ tsdbReaderT tsdbQueryLastRow(SVnode* pVnode, SQueryTableDataCond* pCond, STableL return NULL; } - STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbReaderOpen(pVnode, pCond, pList, qId, taskId); + STsdbReader* pTsdbReadHandle = (STsdbReader*)tsdbReaderOpen(pVnode, pCond, pList, qId, taskId); if (pTsdbReadHandle == NULL) { return NULL; } @@ -660,8 +604,8 @@ tsdbReaderT tsdbQueryLastRow(SVnode* pVnode, SQueryTableDataCond* pCond, STableL } #if 0 -tsdbReaderT tsdbQueryCacheLastT(STsdb *tsdb, SQueryTableDataCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemTable* pMemRef) { - STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) tsdbQueryTablesT(tsdb, pCond, groupList, qId, pMemRef); +STsdbReader * tsdbQueryCacheLastT(STsdb *tsdb, SQueryTableDataCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemTable* pMemRef) { + STsdbReader *pTsdbReadHandle = (STsdbReader*) tsdbQueryTablesT(tsdb, pCond, groupList, qId, pMemRef); if (pTsdbReadHandle == NULL) { return NULL; } @@ -680,10 +624,10 @@ tsdbReaderT tsdbQueryCacheLastT(STsdb *tsdb, SQueryTableDataCond *pCond, STableG } #endif -SArray* tsdbGetQueriedTableList(tsdbReaderT* pHandle) { +SArray* tsdbGetQueriedTableList(STsdbReader** pHandle) { assert(pHandle != NULL); - STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle; + STsdbReader* pTsdbReadHandle = (STsdbReader*)pHandle; size_t size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo); SArray* res = taosArrayInit(size, POINTER_BYTES); @@ -723,7 +667,7 @@ SArray* tsdbGetQueriedTableList(tsdbReaderT* pHandle) { // return pNew; //} -// tsdbReaderT tsdbQueryRowsInExternalWindow(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, +// STsdbReader * tsdbQueryRowsInExternalWindow(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, // uint64_t qId, uint64_t taskId) { // STableGroupInfo* pNew = trimTableGroup(&pCond->twindow, groupList); // @@ -739,14 +683,14 @@ SArray* tsdbGetQueriedTableList(tsdbReaderT* pHandle) { // } // } // -// STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbQueryTables(pVnode, pCond, pNew, qId, taskId); +// STsdbReader* pTsdbReadHandle = (STsdbReader*)tsdbQueryTables(pVnode, pCond, pNew, qId, taskId); // pTsdbReadHandle->loadExternalRow = true; // pTsdbReadHandle->currentLoadExternalRows = true; // // return pTsdbReadHandle; //} -static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pCheckInfo) { +static bool initTableMemIterator(STsdbReader* pHandle, STableCheckInfo* pCheckInfo) { if (pCheckInfo->initBuf) { return true; } @@ -1011,7 +955,7 @@ static bool moveToNextRowInMem(STableCheckInfo* pCheckInfo) { return hasNext; } -static bool hasMoreDataInCache(STsdbReadHandle* pHandle) { +static bool hasMoreDataInCache(STsdbReader* pHandle) { STsdbCfg* pCfg = REPO_CFG(pHandle->pTsdb); size_t size = taosArrayGetSize(pHandle->pTableCheckInfo); assert(pHandle->activeIndex < size && pHandle->activeIndex >= 0 && size >= 1); @@ -1102,7 +1046,7 @@ static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY s return midSlot; } -static int32_t loadBlockInfo(STsdbReadHandle* pTsdbReadHandle, int32_t index, int32_t* numOfBlocks) { +static int32_t loadBlockInfo(STsdbReader* pTsdbReadHandle, int32_t index, int32_t* numOfBlocks) { int32_t code = 0; STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, index); @@ -1178,7 +1122,7 @@ static int32_t loadBlockInfo(STsdbReadHandle* pTsdbReadHandle, int32_t index, in return 0; } -static int32_t getFileCompInfo(STsdbReadHandle* pTsdbReadHandle, int32_t* numOfBlocks) { +static int32_t getFileCompInfo(STsdbReader* pTsdbReadHandle, int32_t* numOfBlocks) { // load all the comp offset value for all tables in this file int32_t code = TSDB_CODE_SUCCESS; *numOfBlocks = 0; @@ -1210,7 +1154,7 @@ static int32_t getFileCompInfo(STsdbReadHandle* pTsdbReadHandle, int32_t* numOfB return code; } -static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo, +static int32_t doLoadFileDataBlock(STsdbReader* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo, int32_t slotIndex) { int64_t st = taosGetTimestampUs(); @@ -1282,14 +1226,14 @@ _error: return terrno; } -static int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo); -static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, - int32_t start, int32_t end); -static void doCheckGeneratedBlockRange(STsdbReadHandle* pTsdbReadHandle); -static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, +static int32_t getEndPosInDataBlock(STsdbReader* pTsdbReadHandle, SDataBlockInfo* pBlockInfo); +static int32_t doCopyRowsFromFileBlock(STsdbReader* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, int32_t start, + int32_t end); +static void doCheckGeneratedBlockRange(STsdbReader* pTsdbReadHandle); +static void copyAllRemainRowsFromFileBlock(STsdbReader* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SDataBlockInfo* pBlockInfo, int32_t endPos); -static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo) { +static int32_t handleDataMergeIfNeeded(STsdbReader* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo) { SQueryFilePos* cur = &pTsdbReadHandle->cur; STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb); SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock); @@ -1398,7 +1342,7 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); -static int32_t loadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo, +static int32_t loadFileDataBlock(STsdbReader* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo, bool* exists) { SQueryFilePos* cur = &pTsdbReadHandle->cur; int32_t code = TSDB_CODE_SUCCESS; @@ -1517,8 +1461,8 @@ static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) { return midPos; } -static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, - int32_t start, int32_t end) { +static int32_t doCopyRowsFromFileBlock(STsdbReader* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, int32_t start, + int32_t end) { SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0]; TSKEY* tsArray = pCols->cols[0].pData; @@ -1618,7 +1562,7 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t * @param lastRowKey * @return int32_t The quantity of rows appended */ -static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t* curRow, STSRow* row1, +static int32_t mergeTwoRowFromMem(STsdbReader* pTsdbReadHandle, int32_t capacity, int32_t* curRow, STSRow* row1, STSRow* row2, int32_t numOfCols, uint64_t uid, STSchema* pSchema1, STSchema* pSchema2, bool update, TSKEY* lastRowKey) { #if 1 @@ -1813,8 +1757,8 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa #endif } -static void getQualifiedRowsPos(STsdbReadHandle* pTsdbReadHandle, int32_t startPos, int32_t endPos, - int32_t numOfExisted, int32_t* start, int32_t* end) { +static void getQualifiedRowsPos(STsdbReader* pTsdbReadHandle, int32_t startPos, int32_t endPos, int32_t numOfExisted, + int32_t* start, int32_t* end) { *start = -1; if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) { @@ -1839,7 +1783,7 @@ static void getQualifiedRowsPos(STsdbReadHandle* pTsdbReadHandle, int32_t startP } } -static void updateInfoAfterMerge(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, int32_t numOfRows, +static void updateInfoAfterMerge(STsdbReader* pTsdbReadHandle, STableCheckInfo* pCheckInfo, int32_t numOfRows, int32_t endPos) { SQueryFilePos* cur = &pTsdbReadHandle->cur; @@ -1849,7 +1793,7 @@ static void updateInfoAfterMerge(STsdbReadHandle* pTsdbReadHandle, STableCheckIn cur->pos = endPos; } -static void doCheckGeneratedBlockRange(STsdbReadHandle* pTsdbReadHandle) { +static void doCheckGeneratedBlockRange(STsdbReader* pTsdbReadHandle) { SQueryFilePos* cur = &pTsdbReadHandle->cur; if (cur->rows > 0) { @@ -1870,7 +1814,7 @@ static void doCheckGeneratedBlockRange(STsdbReadHandle* pTsdbReadHandle) { } } -static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, +static void copyAllRemainRowsFromFileBlock(STsdbReader* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SDataBlockInfo* pBlockInfo, int32_t endPos) { SQueryFilePos* cur = &pTsdbReadHandle->cur; @@ -1907,7 +1851,7 @@ static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STa pTsdbReadHandle->idStr); } -int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo) { +int32_t getEndPosInDataBlock(STsdbReader* pTsdbReadHandle, SDataBlockInfo* pBlockInfo) { // NOTE: reverse the order to find the end position in data block int32_t endPos = -1; bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order); @@ -1956,7 +1900,7 @@ int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* p // only return the qualified data to client in terms of query time window, data rows in the same block but do not // be included in the query time window will be discarded -static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock) { +static void doMergeTwoLevelData(STsdbReader* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock) { SQueryFilePos* cur = &pTsdbReadHandle->cur; SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock); STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb); @@ -2308,7 +2252,7 @@ static int32_t dataBlockOrderCompar(const void* pLeft, const void* pRight, void* return pLeftBlockInfoEx->compBlock->offset > pRightBlockInfoEx->compBlock->offset ? 1 : -1; } -static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t numOfBlocks, int32_t* numOfAllocBlocks) { +static int32_t createDataBlocksInfo(STsdbReader* pTsdbReadHandle, int32_t numOfBlocks, int32_t* numOfAllocBlocks) { size_t size = sizeof(STableBlockInfo) * numOfBlocks; if (pTsdbReadHandle->allocSize < size) { @@ -2425,9 +2369,9 @@ static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t nu return TSDB_CODE_SUCCESS; } -static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exists); +static int32_t getFirstFileDataBlock(STsdbReader* pTsdbReadHandle, bool* exists); -static int32_t getDataBlock(STsdbReadHandle* pTsdbReadHandle, STableBlockInfo* pNext, bool* exists) { +static int32_t getDataBlock(STsdbReader* pTsdbReadHandle, STableBlockInfo* pNext, bool* exists) { int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1; SQueryFilePos* cur = &pTsdbReadHandle->cur; @@ -2450,7 +2394,7 @@ static int32_t getDataBlock(STsdbReadHandle* pTsdbReadHandle, STableBlockInfo* p } } -static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exists) { +static int32_t getFirstFileDataBlock(STsdbReader* pTsdbReadHandle, bool* exists) { pTsdbReadHandle->numOfBlocks = 0; SQueryFilePos* cur = &pTsdbReadHandle->cur; @@ -2544,7 +2488,7 @@ static bool isEndFileDataBlock(SQueryFilePos* cur, int32_t numOfBlocks, bool asc return (cur->slot == numOfBlocks - 1 && ascTrav) || (cur->slot == 0 && !ascTrav); } -static void moveToNextDataBlockInCurrentFile(STsdbReadHandle* pTsdbReadHandle) { +static void moveToNextDataBlockInCurrentFile(STsdbReader* pTsdbReadHandle) { int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1; SQueryFilePos* cur = &pTsdbReadHandle->cur; @@ -2559,25 +2503,23 @@ static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t num return (numOfRows - startRow) / bucketRange; } -int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo* pTableBlockInfo) { - STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)queryHandle; - +int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) { pTableBlockInfo->totalSize = 0; pTableBlockInfo->totalRows = 0; - STsdbFS* pFileHandle = REPO_FS(pTsdbReadHandle->pTsdb); + STsdbFS* pFileHandle = REPO_FS(pReader->pTsdb); // find the start data block in file - pTsdbReadHandle->locateStart = true; - STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pTsdbReadHandle->pTsdb); - int32_t fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->days, pCfg->precision); + pReader->locateStart = true; + STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pReader->pTsdb); + int32_t fid = getFileIdFromKey(pReader->window.skey, pCfg->days, pCfg->precision); tsdbRLockFS(pFileHandle); - tsdbFSIterInit(&pTsdbReadHandle->fileIter, pFileHandle, pTsdbReadHandle->order); - tsdbFSIterSeek(&pTsdbReadHandle->fileIter, fid); + tsdbFSIterInit(&pReader->fileIter, pFileHandle, pReader->order); + tsdbFSIterSeek(&pReader->fileIter, fid); tsdbUnLockFS(pFileHandle); - STsdbCfg* pc = REPO_CFG(pTsdbReadHandle->pTsdb); + STsdbCfg* pc = REPO_CFG(pReader->pTsdb); pTableBlockInfo->defMinRows = pc->minRows; pTableBlockInfo->defMaxRows = pc->maxRows; @@ -2587,50 +2529,50 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo* int32_t code = TSDB_CODE_SUCCESS; int32_t numOfBlocks = 0; - int32_t numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo); + int32_t numOfTables = (int32_t)taosArrayGetSize(pReader->pTableCheckInfo); int defaultRows = 4096; STimeWindow win = TSWINDOW_INITIALIZER; while (true) { numOfBlocks = 0; - tsdbRLockFS(REPO_FS(pTsdbReadHandle->pTsdb)); + tsdbRLockFS(REPO_FS(pReader->pTsdb)); - if ((pTsdbReadHandle->pFileGroup = tsdbFSIterNext(&pTsdbReadHandle->fileIter)) == NULL) { - tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb)); + if ((pReader->pFileGroup = tsdbFSIterNext(&pReader->fileIter)) == NULL) { + tsdbUnLockFS(REPO_FS(pReader->pTsdb)); break; } - tsdbGetFidKeyRange(pCfg->days, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey); + tsdbGetFidKeyRange(pCfg->days, pCfg->precision, pReader->pFileGroup->fid, &win.skey, &win.ekey); // current file are not overlapped with query time window, ignore remain files - if ((win.skey > pTsdbReadHandle->window.ekey) /* || (!ascTraverse && win.ekey < pTsdbReadHandle->window.ekey)*/) { - tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb)); - tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pTsdbReadHandle, - pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr); - pTsdbReadHandle->pFileGroup = NULL; + if ((win.skey > pReader->window.ekey) /* || (!ascTraverse && win.ekey < pTsdbReadHandle->window.ekey)*/) { + tsdbUnLockFS(REPO_FS(pReader->pTsdb)); + tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader, + pReader->window.skey, pReader->window.ekey, pReader->idStr); + pReader->pFileGroup = NULL; break; } pTableBlockInfo->numOfFiles += 1; - if (tsdbSetAndOpenReadFSet(&pTsdbReadHandle->rhelper, pTsdbReadHandle->pFileGroup) < 0) { - tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb)); + if (tsdbSetAndOpenReadFSet(&pReader->rhelper, pReader->pFileGroup) < 0) { + tsdbUnLockFS(REPO_FS(pReader->pTsdb)); code = terrno; break; } - tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb)); + tsdbUnLockFS(REPO_FS(pReader->pTsdb)); - if (tsdbLoadBlockIdx(&pTsdbReadHandle->rhelper) < 0) { + if (tsdbLoadBlockIdx(&pReader->rhelper) < 0) { code = terrno; break; } - if ((code = getFileCompInfo(pTsdbReadHandle, &numOfBlocks)) != TSDB_CODE_SUCCESS) { + if ((code = getFileCompInfo(pReader, &numOfBlocks)) != TSDB_CODE_SUCCESS) { break; } - tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pTsdbReadHandle, numOfBlocks, numOfTables, - pTsdbReadHandle->pFileGroup->fid, pTsdbReadHandle->idStr); + tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables, + pReader->pFileGroup->fid, pReader->idStr); if (numOfBlocks == 0) { continue; @@ -2639,7 +2581,7 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo* pTableBlockInfo->numOfBlocks += numOfBlocks; for (int32_t i = 0; i < numOfTables; ++i) { - STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i); + STableCheckInfo* pCheckInfo = taosArrayGet(pReader->pTableCheckInfo, i); SBlock* pBlock = pCheckInfo->pCompInfo->blocks; @@ -2671,7 +2613,7 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo* return code; } -static int32_t getDataBlocksInFiles(STsdbReadHandle* pTsdbReadHandle, bool* exists) { +static int32_t getDataBlocksInFiles(STsdbReader* pTsdbReadHandle, bool* exists) { STsdbFS* pFileHandle = REPO_FS(pTsdbReadHandle->pTsdb); SQueryFilePos* cur = &pTsdbReadHandle->cur; @@ -2718,7 +2660,7 @@ static int32_t getDataBlocksInFiles(STsdbReadHandle* pTsdbReadHandle, bool* exis } } -static bool doHasDataInBuffer(STsdbReadHandle* pTsdbReadHandle) { +static bool doHasDataInBuffer(STsdbReader* pTsdbReadHandle) { size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo); while (pTsdbReadHandle->activeIndex < numOfTables) { @@ -2733,9 +2675,9 @@ static bool doHasDataInBuffer(STsdbReadHandle* pTsdbReadHandle) { } // todo not unref yet, since it is not support multi-group interpolation query -static UNUSED_FUNC void changeQueryHandleForInterpQuery(tsdbReaderT pHandle) { +static UNUSED_FUNC void changeQueryHandleForInterpQuery(STsdbReader* pHandle) { // filter the queried time stamp in the first place - STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle; + STsdbReader* pTsdbReadHandle = (STsdbReader*)pHandle; // starts from the buffer in case of descending timestamp order check data blocks size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo); @@ -2766,7 +2708,7 @@ static UNUSED_FUNC void changeQueryHandleForInterpQuery(tsdbReaderT pHandle) { } static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win, - STsdbReadHandle* pTsdbReadHandle) { + STsdbReader* pTsdbReadHandle) { int numOfRows = 0; int curRows = 0; int32_t numOfCols = (int32_t)taosArrayGetSize(pTsdbReadHandle->pColumns); @@ -2885,7 +2827,7 @@ static void destroyHelper(void* param) { #define TSDB_PREV_ROW 0x1 #define TSDB_NEXT_ROW 0x2 -static bool loadBlockOfActiveTable(STsdbReadHandle* pTsdbReadHandle) { +static bool loadBlockOfActiveTable(STsdbReader* pTsdbReadHandle) { if (pTsdbReadHandle->checkFiles) { // check if the query range overlaps with the file data block bool exists = true; @@ -2897,7 +2839,7 @@ static bool loadBlockOfActiveTable(STsdbReadHandle* pTsdbReadHandle) { } if (exists) { - tsdbRetrieveDataBlock((tsdbReaderT*)pTsdbReadHandle, NULL); + tsdbRetrieveDataBlock((STsdbReader**)pTsdbReadHandle, NULL); if (pTsdbReadHandle->currentLoadExternalRows && pTsdbReadHandle->window.skey == pTsdbReadHandle->window.ekey) { SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, 0); assert(*(int64_t*)pColInfo->pData == pTsdbReadHandle->window.skey); @@ -2935,7 +2877,7 @@ static bool loadBlockOfActiveTable(STsdbReadHandle* pTsdbReadHandle) { return false; } -static bool loadCachedLastRow(STsdbReadHandle* pTsdbReadHandle) { +static bool loadCachedLastRow(STsdbReader* pTsdbReadHandle) { // the last row is cached in buffer, return it directly. // here note that the pTsdbReadHandle->window must be the TS_INITIALIZER int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle)); @@ -2975,7 +2917,7 @@ static bool loadCachedLastRow(STsdbReadHandle* pTsdbReadHandle) { return false; } -// static bool loadCachedLast(STsdbReadHandle* pTsdbReadHandle) { +// static bool loadCachedLast(STsdbReader* pTsdbReadHandle) { // // the last row is cached in buffer, return it directly. // // here note that the pTsdbReadHandle->window must be the TS_INITIALIZER // int32_t tgNumOfCols = (int32_t)QH_GET_NUM_OF_COLS(pTsdbReadHandle); @@ -3125,7 +3067,7 @@ static bool loadCachedLastRow(STsdbReadHandle* pTsdbReadHandle) { // return false; //} -static bool loadDataBlockFromTableSeq(STsdbReadHandle* pTsdbReadHandle) { +static bool loadDataBlockFromTableSeq(STsdbReader* pTsdbReadHandle) { size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo); assert(numOfTables > 0); @@ -3155,19 +3097,16 @@ static bool loadDataBlockFromTableSeq(STsdbReadHandle* pTsdbReadHandle) { } // handle data in cache situation -// bool tsdbNextDataBlock(tsdbReaderT pHandle, uint64_t uid) -bool tsdbNextDataBlock(tsdbReaderT pHandle) { - STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle; - - size_t numOfCols = taosArrayGetSize(pTsdbReadHandle->pColumns); +// bool tsdbNextDataBlock(STsdbReader * pHandle, uint64_t uid) +bool tsdbNextDataBlock(STsdbReader* pReader) { + size_t numOfCols = taosArrayGetSize(pReader->pColumns); for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i); - colInfoDataCleanup(pColInfo, pTsdbReadHandle->outputCapacity); + SColumnInfoData* pColInfo = taosArrayGet(pReader->pColumns, i); + colInfoDataCleanup(pColInfo, pReader->outputCapacity); } - if (emptyQueryTimewindow(pTsdbReadHandle)) { - tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pTsdbReadHandle, - pTsdbReadHandle->idStr); + if (emptyQueryTimewindow(pReader)) { + tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr); return false; } @@ -3175,50 +3114,50 @@ bool tsdbNextDataBlock(tsdbReaderT pHandle) { int64_t elapsedTime = stime; // TODO refactor: remove "type" - if (pTsdbReadHandle->type == TSDB_QUERY_TYPE_LAST) { - if (pTsdbReadHandle->cachelastrow == TSDB_CACHED_TYPE_LASTROW) { + if (pReader->type == TSDB_QUERY_TYPE_LAST) { + if (pReader->cachelastrow == TSDB_CACHED_TYPE_LASTROW) { // return loadCachedLastRow(pTsdbReadHandle); - } else if (pTsdbReadHandle->cachelastrow == TSDB_CACHED_TYPE_LAST) { + } else if (pReader->cachelastrow == TSDB_CACHED_TYPE_LAST) { // return loadCachedLast(pTsdbReadHandle); } } - if (pTsdbReadHandle->loadType == BLOCK_LOAD_TABLE_SEQ_ORDER) { - return loadDataBlockFromTableSeq(pTsdbReadHandle); + if (pReader->loadType == BLOCK_LOAD_TABLE_SEQ_ORDER) { + return loadDataBlockFromTableSeq(pReader); } else { // loadType == RR and Offset Order - if (pTsdbReadHandle->checkFiles) { + if (pReader->checkFiles) { // check if the query range overlaps with the file data block bool exists = true; - int32_t code = getDataBlocksInFiles(pTsdbReadHandle, &exists); + int32_t code = getDataBlocksInFiles(pReader, &exists); if (code != TSDB_CODE_SUCCESS) { - pTsdbReadHandle->activeIndex = 0; - pTsdbReadHandle->checkFiles = false; + pReader->activeIndex = 0; + pReader->checkFiles = false; return false; } if (exists) { - pTsdbReadHandle->cost.checkForNextTime += (taosGetTimestampUs() - stime); + pReader->cost.checkForNextTime += (taosGetTimestampUs() - stime); return exists; } - pTsdbReadHandle->activeIndex = 0; - pTsdbReadHandle->checkFiles = false; + pReader->activeIndex = 0; + pReader->checkFiles = false; } // TODO: opt by consider the scan order - bool ret = doHasDataInBuffer(pTsdbReadHandle); + bool ret = doHasDataInBuffer(pReader); terrno = TSDB_CODE_SUCCESS; elapsedTime = taosGetTimestampUs() - stime; - pTsdbReadHandle->cost.checkForNextTime += elapsedTime; + pReader->cost.checkForNextTime += elapsedTime; return ret; } } -// static int32_t doGetExternalRow(STsdbReadHandle* pTsdbReadHandle, int16_t type, SMemTable* pMemRef) { -// STsdbReadHandle* pSecQueryHandle = NULL; +// static int32_t doGetExternalRow(STsdbReader* pTsdbReadHandle, int16_t type, SMemTable* pMemRef) { +// STsdbReader* pSecQueryHandle = NULL; // // if (type == TSDB_PREV_ROW && pTsdbReadHandle->prev) { // return TSDB_CODE_SUCCESS; @@ -3327,9 +3266,9 @@ bool tsdbNextDataBlock(tsdbReaderT pHandle) { // return terrno; //} -bool tsdbGetExternalRow(tsdbReaderT pHandle) { - STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle; - SQueryFilePos* cur = &pTsdbReadHandle->cur; +bool tsdbGetExternalRow(STsdbReader* pHandle) { + STsdbReader* pTsdbReadHandle = (STsdbReader*)pHandle; + SQueryFilePos* cur = &pTsdbReadHandle->cur; cur->fid = INT32_MIN; cur->mixBlock = true; @@ -3384,7 +3323,7 @@ bool tsdbGetExternalRow(tsdbReaderT pHandle) { // return code; //} -int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableListInfo* tableList) { +int32_t checkForCachedLastRow(STsdbReader* pTsdbReadHandle, STableListInfo* tableList) { assert(pTsdbReadHandle != NULL && tableList != NULL); // TSKEY key = TSKEY_INITIAL_VAL; @@ -3415,7 +3354,7 @@ int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableListInfo* return TSDB_CODE_SUCCESS; } -int32_t checkForCachedLast(STsdbReadHandle* pTsdbReadHandle) { +int32_t checkForCachedLast(STsdbReader* pTsdbReadHandle) { assert(pTsdbReadHandle != NULL); int32_t code = 0; @@ -3497,23 +3436,22 @@ STimeWindow updateLastrowForEachGroup(STableListInfo* pList) { return window; } -void tsdbRetrieveDataBlockInfo(tsdbReaderT* pTsdbReadHandle, SDataBlockInfo* pDataBlockInfo) { - STsdbReadHandle* pHandle = (STsdbReadHandle*)pTsdbReadHandle; - SQueryFilePos* cur = &pHandle->cur; +void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) { + SQueryFilePos* cur = &pReader->cur; uint64_t uid = 0; // there are data in file - if (pHandle->cur.fid != INT32_MIN) { - STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[cur->slot]; + if (pReader->cur.fid != INT32_MIN) { + STableBlockInfo* pBlockInfo = &pReader->pDataBlockInfo[cur->slot]; uid = pBlockInfo->pTableCheckInfo->tableId; } else { - STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex); + STableCheckInfo* pCheckInfo = taosArrayGet(pReader->pTableCheckInfo, pReader->activeIndex); uid = pCheckInfo->tableId; } tsdbDebug("data block generated, uid:%" PRIu64 " numOfRows:%d, tsrange:%" PRId64 " - %" PRId64 " %s", uid, cur->rows, - cur->win.skey, cur->win.ekey, pHandle->idStr); + cur->win.skey, cur->win.ekey, pReader->idStr); pDataBlockInfo->uid = uid; @@ -3529,18 +3467,17 @@ void tsdbRetrieveDataBlockInfo(tsdbReaderT* pTsdbReadHandle, SDataBlockInfo* pDa /* * return null for mixed data block, if not a complete file data block, the statistics value will always return NULL */ -int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SColumnDataAgg*** pBlockStatis, bool* allHave) { - STsdbReadHandle* pHandle = (STsdbReadHandle*)pTsdbReadHandle; +int32_t tsdbRetrieveDataBlockStatisInfo(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) { *allHave = false; - SQueryFilePos* c = &pHandle->cur; + SQueryFilePos* c = &pReader->cur; if (c->mixBlock) { *pBlockStatis = NULL; return TSDB_CODE_SUCCESS; } - STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[c->slot]; - assert((c->slot >= 0 && c->slot < pHandle->numOfBlocks) || ((c->slot == pHandle->numOfBlocks) && (c->slot == 0))); + STableBlockInfo* pBlockInfo = &pReader->pDataBlockInfo[c->slot]; + assert((c->slot >= 0 && c->slot < pReader->numOfBlocks) || ((c->slot == pReader->numOfBlocks) && (c->slot == 0))); // file block with sub-blocks has no statistics data if (pBlockInfo->compBlock->numOfSubBlocks > 1) { @@ -3549,7 +3486,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SColumnDat } int64_t stime = taosGetTimestampUs(); - int statisStatus = tsdbLoadBlockStatis(&pHandle->rhelper, pBlockInfo->compBlock); + int statisStatus = tsdbLoadBlockStatis(&pReader->rhelper, pBlockInfo->compBlock); if (statisStatus < TSDB_STATIS_OK) { return terrno; } else if (statisStatus > TSDB_STATIS_OK) { @@ -3557,85 +3494,84 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SColumnDat return TSDB_CODE_SUCCESS; } - tsdbDebug("vgId:%d, succeed to load block statis part for uid %" PRIu64, REPO_ID(pHandle->pTsdb), - TSDB_READ_TABLE_UID(&pHandle->rhelper)); + tsdbDebug("vgId:%d, succeed to load block statis part for uid %" PRIu64, REPO_ID(pReader->pTsdb), + TSDB_READ_TABLE_UID(&pReader->rhelper)); - int16_t* colIds = pHandle->suppInfo.defaultLoadColumn->pData; + int16_t* colIds = pReader->suppInfo.defaultLoadColumn->pData; - size_t numOfCols = QH_GET_NUM_OF_COLS(pHandle); - memset(pHandle->suppInfo.plist, 0, numOfCols * POINTER_BYTES); - memset(pHandle->suppInfo.pstatis, 0, numOfCols * sizeof(SColumnDataAgg)); + size_t numOfCols = QH_GET_NUM_OF_COLS(pReader); + memset(pReader->suppInfo.plist, 0, numOfCols * POINTER_BYTES); + memset(pReader->suppInfo.pstatis, 0, numOfCols * sizeof(SColumnDataAgg)); for (int32_t i = 0; i < numOfCols; ++i) { - pHandle->suppInfo.pstatis[i].colId = colIds[i]; + pReader->suppInfo.pstatis[i].colId = colIds[i]; } *allHave = true; - tsdbGetBlockStatis(&pHandle->rhelper, pHandle->suppInfo.pstatis, (int)numOfCols, pBlockInfo->compBlock); + tsdbGetBlockStatis(&pReader->rhelper, pReader->suppInfo.pstatis, (int)numOfCols, pBlockInfo->compBlock); // always load the first primary timestamp column data - SColumnDataAgg* pPrimaryColStatis = &pHandle->suppInfo.pstatis[0]; + SColumnDataAgg* pPrimaryColStatis = &pReader->suppInfo.pstatis[0]; assert(pPrimaryColStatis->colId == PRIMARYKEY_TIMESTAMP_COL_ID); pPrimaryColStatis->numOfNull = 0; pPrimaryColStatis->min = pBlockInfo->compBlock->minKey.ts; pPrimaryColStatis->max = pBlockInfo->compBlock->maxKey.ts; - pHandle->suppInfo.plist[0] = &pHandle->suppInfo.pstatis[0]; + pReader->suppInfo.plist[0] = &pReader->suppInfo.pstatis[0]; // update the number of NULL data rows - int32_t* slotIds = pHandle->suppInfo.slotIds; + int32_t* slotIds = pReader->suppInfo.slotIds; for (int32_t i = 1; i < numOfCols; ++i) { - ASSERT(colIds[i] == pHandle->pSchema->columns[slotIds[i]].colId); - if (IS_BSMA_ON(&(pHandle->pSchema->columns[slotIds[i]]))) { - if (pHandle->suppInfo.pstatis[i].numOfNull == -1) { // set the column data are all NULL - pHandle->suppInfo.pstatis[i].numOfNull = pBlockInfo->compBlock->numOfRows; + ASSERT(colIds[i] == pReader->pSchema->columns[slotIds[i]].colId); + if (IS_BSMA_ON(&(pReader->pSchema->columns[slotIds[i]]))) { + if (pReader->suppInfo.pstatis[i].numOfNull == -1) { // set the column data are all NULL + pReader->suppInfo.pstatis[i].numOfNull = pBlockInfo->compBlock->numOfRows; } - pHandle->suppInfo.plist[i] = &pHandle->suppInfo.pstatis[i]; + pReader->suppInfo.plist[i] = &pReader->suppInfo.pstatis[i]; } else { *allHave = false; } } int64_t elapsed = taosGetTimestampUs() - stime; - pHandle->cost.statisInfoLoadTime += elapsed; + pReader->cost.statisInfoLoadTime += elapsed; - *pBlockStatis = pHandle->suppInfo.plist; + *pBlockStatis = pReader->suppInfo.plist; return TSDB_CODE_SUCCESS; } -SArray* tsdbRetrieveDataBlock(tsdbReaderT* pTsdbReadHandle, SArray* pIdList) { +SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { /** * In the following two cases, the data has been loaded to SColumnInfoData. * 1. data is from cache, 2. data block is not completed qualified to query time range */ - STsdbReadHandle* pHandle = (STsdbReadHandle*)pTsdbReadHandle; - if (pHandle->cur.fid == INT32_MIN) { - return pHandle->pColumns; + if (pReader->cur.fid == INT32_MIN) { + return pReader->pColumns; } else { - STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[pHandle->cur.slot]; + STableBlockInfo* pBlockInfo = &pReader->pDataBlockInfo[pReader->cur.slot]; STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo; - if (pHandle->cur.mixBlock) { - return pHandle->pColumns; + if (pReader->cur.mixBlock) { + return pReader->pColumns; } else { SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlockInfo->compBlock); - assert(pHandle->realNumOfRows <= binfo.rows); + assert(pReader->realNumOfRows <= binfo.rows); // data block has been loaded, todo extract method - SDataBlockLoadInfo* pBlockLoadInfo = &pHandle->dataBlockLoadInfo; + SDataBlockLoadInfo* pBlockLoadInfo = &pReader->dataBlockLoadInfo; - if (pBlockLoadInfo->slot == pHandle->cur.slot && pBlockLoadInfo->fileGroup->fid == pHandle->cur.fid && + if (pBlockLoadInfo->slot == pReader->cur.slot && pBlockLoadInfo->fileGroup->fid == pReader->cur.fid && pBlockLoadInfo->uid == pCheckInfo->tableId) { - return pHandle->pColumns; + return pReader->pColumns; } else { // only load the file block SBlock* pBlock = pBlockInfo->compBlock; - if (doLoadFileDataBlock(pHandle, pBlock, pCheckInfo, pHandle->cur.slot) != TSDB_CODE_SUCCESS) { + if (doLoadFileDataBlock(pReader, pBlock, pCheckInfo, pReader->cur.slot) != TSDB_CODE_SUCCESS) { return NULL; } - int32_t numOfRows = doCopyRowsFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfRows - 1); - return pHandle->pColumns; + int32_t numOfRows = doCopyRowsFromFileBlock(pReader, pReader->outputCapacity, 0, 0, pBlock->numOfRows - 1); + return pReader->pColumns; } } } @@ -3680,46 +3616,96 @@ static void* destroyTableCheckInfo(SArray* pTableCheckInfo) { return NULL; } -void tsdbCleanupReadHandle(tsdbReaderT queryHandle) { - STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)queryHandle; - if (pTsdbReadHandle == NULL) { +void tsdbCleanupReadHandle(STsdbReader* pReader) { + if (pReader == NULL) { return; } - pTsdbReadHandle->pColumns = doFreeColumnInfoData(pTsdbReadHandle->pColumns); + pReader->pColumns = doFreeColumnInfoData(pReader->pColumns); - taosArrayDestroy(pTsdbReadHandle->suppInfo.defaultLoadColumn); - taosMemoryFreeClear(pTsdbReadHandle->pDataBlockInfo); - taosMemoryFreeClear(pTsdbReadHandle->suppInfo.pstatis); - taosMemoryFreeClear(pTsdbReadHandle->suppInfo.plist); - taosMemoryFree(pTsdbReadHandle->suppInfo.slotIds); + taosArrayDestroy(pReader->suppInfo.defaultLoadColumn); + taosMemoryFreeClear(pReader->pDataBlockInfo); + taosMemoryFreeClear(pReader->suppInfo.pstatis); + taosMemoryFreeClear(pReader->suppInfo.plist); + taosMemoryFree(pReader->suppInfo.slotIds); - if (!emptyQueryTimewindow(pTsdbReadHandle)) { + if (!emptyQueryTimewindow(pReader)) { // tsdbMayUnTakeMemSnapshot(pTsdbReadHandle); } else { - assert(pTsdbReadHandle->pTableCheckInfo == NULL); + assert(pReader->pTableCheckInfo == NULL); } - if (pTsdbReadHandle->pTableCheckInfo != NULL) { - pTsdbReadHandle->pTableCheckInfo = destroyTableCheckInfo(pTsdbReadHandle->pTableCheckInfo); + if (pReader->pTableCheckInfo != NULL) { + pReader->pTableCheckInfo = destroyTableCheckInfo(pReader->pTableCheckInfo); } - tsdbDestroyReadH(&pTsdbReadHandle->rhelper); + tsdbDestroyReadH(&pReader->rhelper); - tdFreeDataCols(pTsdbReadHandle->pDataCols); - pTsdbReadHandle->pDataCols = NULL; + tdFreeDataCols(pReader->pDataCols); + pReader->pDataCols = NULL; - pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev); - pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next); + pReader->prev = doFreeColumnInfoData(pReader->prev); + pReader->next = doFreeColumnInfoData(pReader->next); - SIOCostSummary* pCost = &pTsdbReadHandle->cost; + SIOCostSummary* pCost = &pReader->cost; tsdbDebug("%p :io-cost summary: head-file read cnt:%" PRIu64 ", head-file time:%" PRIu64 " us, statis-info:%" PRId64 " us, datablock:%" PRId64 " us, check data:%" PRId64 " us, %s", - pTsdbReadHandle, pCost->headFileLoad, pCost->headFileLoadTime, pCost->statisInfoLoadTime, - pCost->blockLoadTime, pCost->checkForNextTime, pTsdbReadHandle->idStr); + pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->statisInfoLoadTime, pCost->blockLoadTime, + pCost->checkForNextTime, pReader->idStr); - taosMemoryFree(pTsdbReadHandle->idStr); - taosMemoryFree(pTsdbReadHandle->pSchema); - taosMemoryFreeClear(pTsdbReadHandle); + taosMemoryFree(pReader->idStr); + taosMemoryFree(pReader->pSchema); + taosMemoryFreeClear(pReader); } + +STsdbReader* tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, + uint64_t taskId) { + STsdbReader* pReader = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId); + if (pReader == NULL) { + return NULL; + } + + if (emptyQueryTimewindow(pReader)) { + return (STsdbReader*)pReader; + } + + // todo apply the lastkey of table check to avoid to load header file + pReader->pTableCheckInfo = createCheckInfoFromTableGroup(pReader, tableList); + if (pReader->pTableCheckInfo == NULL) { + // tsdbCleanupReadHandle(pTsdbReadHandle); + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return NULL; + } + + int32_t code = setCurrentSchema(pVnode, pReader); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return NULL; + } + + int32_t numOfCols = taosArrayGetSize(pReader->suppInfo.defaultLoadColumn); + int16_t* ids = pReader->suppInfo.defaultLoadColumn->pData; + + STSchema* pSchema = pReader->pSchema; + + int32_t i = 0, j = 0; + while (i < numOfCols && j < pSchema->numOfCols) { + if (ids[i] == pSchema->columns[j].colId) { + pReader->suppInfo.slotIds[i] = j; + i++; + j++; + } else if (ids[i] > pSchema->columns[j].colId) { + j++; + } else { + // tsdbCleanupReadHandle(pTsdbReadHandle); + terrno = TSDB_CODE_INVALID_PARA; + return NULL; + } + } + + tsdbDebug("%p total numOfTable:%" PRIzu " in this query, table %" PRIzu " %s", pReader, + taosArrayGetSize(pReader->pTableCheckInfo), taosArrayGetSize(tableList->pTableList), pReader->idStr); + + return (STsdbReader*)pReader; +} \ No newline at end of file