diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index e308c9c2b9..b506f7638b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -13,21 +13,23 @@ * along with this program. If not, see . */ -#include "vnodeInt.h" -#include "tdatablock.h" #include "os.h" #include "talgo.h" #include "tcompare.h" +#include "tdatablock.h" #include "tdataformat.h" #include "texception.h" +#include "vnodeInt.h" +#include "filter.h" +#include "scalar.h" #include "taosdef.h" #include "tlosertree.h" -#include "vnodeInt.h" #include "tmsg.h" +#include "vnodeInt.h" -#define EXTRA_BYTES 2 -#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) +#define EXTRA_BYTES 2 +#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) #define QH_GET_NUM_OF_COLS(handle) ((size_t)(taosArrayGetSize((handle)->pColumns))) #define GET_FILE_DATA_BLOCK_INFO(_checkInfo, _block) \ @@ -37,32 +39,32 @@ .uid = (_checkInfo)->tableId}) enum { - TSDB_QUERY_TYPE_ALL = 1, - TSDB_QUERY_TYPE_LAST = 2, + TSDB_QUERY_TYPE_ALL = 1, + TSDB_QUERY_TYPE_LAST = 2, }; enum { - TSDB_CACHED_TYPE_NONE = 0, + TSDB_CACHED_TYPE_NONE = 0, TSDB_CACHED_TYPE_LASTROW = 1, - TSDB_CACHED_TYPE_LAST = 2, + TSDB_CACHED_TYPE_LAST = 2, }; typedef struct SQueryFilePos { - int32_t fid; - int32_t slot; - int32_t pos; - int64_t lastKey; - int32_t rows; - bool mixBlock; - bool blockCompleted; + int32_t fid; + int32_t slot; + int32_t pos; + int64_t lastKey; + int32_t rows; + bool mixBlock; + bool blockCompleted; STimeWindow win; } SQueryFilePos; typedef struct SDataBlockLoadInfo { - SDFileSet* fileGroup; - int32_t slot; - uint64_t uid; - SArray* pLoadedCols; + SDFileSet* fileGroup; + int32_t slot; + uint64_t uid; + SArray* pLoadedCols; } SDataBlockLoadInfo; typedef struct SLoadCompBlockInfo { @@ -71,33 +73,33 @@ typedef struct SLoadCompBlockInfo { } SLoadCompBlockInfo; enum { - CHECKINFO_CHOSEN_MEM = 0, + CHECKINFO_CHOSEN_MEM = 0, CHECKINFO_CHOSEN_IMEM = 1, - CHECKINFO_CHOSEN_BOTH = 2 //for update=2(merge case) + CHECKINFO_CHOSEN_BOTH = 2 // for update=2(merge case) }; typedef struct STableCheckInfo { - uint64_t tableId; - TSKEY lastKey; - SBlockInfo* pCompInfo; - int32_t compSize; - int32_t numOfBlocks:29; // number of qualified data blocks not the original blocks - uint8_t chosen:2; // indicate which iterator should move forward - bool initBuf:1; // whether to initialize the in-memory skip list iterator or not - SSkipListIterator* iter; // mem buffer skip list iterator - SSkipListIterator* iiter; // imem buffer skip list iterator + uint64_t tableId; + TSKEY lastKey; + SBlockInfo* pCompInfo; + int32_t compSize; + int32_t numOfBlocks : 29; // number of qualified data blocks not the original blocks + uint8_t chosen : 2; // indicate which iterator should move forward + bool initBuf : 1; // whether to initialize the in-memory skip list iterator or not + SSkipListIterator* iter; // mem buffer skip list iterator + SSkipListIterator* iiter; // imem buffer skip list iterator } STableCheckInfo; typedef struct STableBlockInfo { - SBlock *compBlock; - STableCheckInfo *pTableCheckInfo; + SBlock* compBlock; + STableCheckInfo* pTableCheckInfo; } STableBlockInfo; typedef struct SBlockOrderSupporter { - int32_t numOfTables; - STableBlockInfo** pDataBlockInfo; - int32_t* blockIndexArray; - int32_t* numOfBlocksPerTable; + int32_t numOfTables; + STableBlockInfo** pDataBlockInfo; + int32_t* blockIndexArray; + int32_t* numOfBlocksPerTable; } SBlockOrderSupporter; typedef struct SIOCostSummary { @@ -109,37 +111,37 @@ typedef struct SIOCostSummary { } SIOCostSummary; typedef struct STsdbReadHandle { - STsdb* pTsdb; - SQueryFilePos cur; // current position - int16_t order; - STimeWindow window; // the primary query time window that applies to all queries - SDataStatis* statis; // query level statistics, only one table block statistics info exists at any time - int32_t numOfBlocks; - SArray* pColumns; // column list, SColumnInfoData array list - bool locateStart; - int32_t outputCapacity; - int32_t realNumOfRows; - SArray* pTableCheckInfo; // SArray - int32_t activeIndex; - bool checkFiles; // check file stage - int8_t cachelastrow; // check if last row cached - bool loadExternalRow; // load time window external data rows - bool currentLoadExternalRows; // current load external rows - int32_t loadType; // block load type - char *idStr; // query info handle, for debug purpose - int32_t type; // query type: retrieve all data blocks, 2. retrieve only last row, 3. retrieve direct prev|next rows - SDFileSet* pFileGroup; - SFSIter fileIter; - SReadH rhelper; - STableBlockInfo* pDataBlockInfo; - SDataCols *pDataCols; // in order to hold current file data block - int32_t allocSize; // allocated data block size - SArray *defaultLoadColumn;// default load column - SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */ - SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQueryAttr */ + STsdb* pTsdb; + SQueryFilePos cur; // current position + int16_t order; + STimeWindow window; // the primary query time window that applies to all queries + SDataStatis* statis; // query level statistics, only one table block statistics info exists at any time + int32_t numOfBlocks; + SArray* pColumns; // column list, SColumnInfoData array list + bool locateStart; + int32_t outputCapacity; + int32_t realNumOfRows; + SArray* pTableCheckInfo; // SArray + int32_t activeIndex; + bool checkFiles; // check file stage + int8_t cachelastrow; // check if last row cached + bool loadExternalRow; // load time window external data rows + bool currentLoadExternalRows; // current load external rows + int32_t loadType; // block load type + char* idStr; // query info handle, for debug purpose + int32_t type; // query type: retrieve all data blocks, 2. retrieve only last row, 3. retrieve direct prev|next rows + SDFileSet* pFileGroup; + SFSIter fileIter; + SReadH rhelper; + STableBlockInfo* pDataBlockInfo; + SDataCols* pDataCols; // in order to hold current file data block + int32_t allocSize; // allocated data block size + SArray* defaultLoadColumn; // default load column + SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */ + SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQueryAttr */ - SArray *prev; // previous row which is before than time window - SArray *next; // next row which is after the query time window + SArray* prev; // previous row which is before than time window + SArray* next; // next row which is after the query time window SIOCostSummary cost; } STsdbReadHandle; @@ -149,24 +151,27 @@ typedef struct STableGroupSupporter { SSchema* pTagSchema; } STableGroupSupporter; -static STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList); -static int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo *groupList); -static int32_t checkForCachedLast(STsdbReadHandle* pTsdbReadHandle); +int32_t tsdbQueryTableList(void* pMeta, SArray* pRes, void* filterInfo); + +static STimeWindow updateLastrowForEachGroup(STableGroupInfo* groupList); +static int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo* groupList); +static int32_t checkForCachedLast(STsdbReadHandle* pTsdbReadHandle); // static int32_t tsdbGetCachedLastRow(STable* pTable, STSRow** pRes, TSKEY* lastKey); static void changeQueryHandleForInterpQuery(tsdbReaderT pHandle); static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock); static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); -static int32_t tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win, STsdbReadHandle* pTsdbReadHandle); +static int32_t tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win, + STsdbReadHandle* pTsdbReadHandle); static int32_t tsdbCheckInfoCompar(const void* key1, const void* key2); -//static int32_t doGetExternalRow(STsdbReadHandle* pTsdbReadHandle, int16_t type, void* pMemRef); -//static void* doFreeColumnInfoData(SArray* pColumnInfoData); -//static void* destroyTableCheckInfo(SArray* pTableCheckInfo); -static bool tsdbGetExternalRow(tsdbReaderT pHandle); +// static int32_t doGetExternalRow(STsdbReadHandle* pTsdbReadHandle, int16_t type, void* pMemRef); +// static void* doFreeColumnInfoData(SArray* pColumnInfoData); +// static void* destroyTableCheckInfo(SArray* pTableCheckInfo); +static bool tsdbGetExternalRow(tsdbReaderT pHandle); static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) { pBlockLoadInfo->slot = -1; - pBlockLoadInfo->uid = 0; + pBlockLoadInfo->uid = 0; pBlockLoadInfo->fileGroup = NULL; } @@ -204,30 +209,32 @@ static SArray* getDefaultLoadColumns(STsdbReadHandle* pTsdbReadHandle, bool load } int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT* pHandle) { - STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) pHandle; + STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle; - int64_t rows = 0; - STsdbMemTable* pMemTable = NULL;//pTsdbReadHandle->pMemTable; - if (pMemTable == NULL) { return rows; } + int64_t rows = 0; + STsdbMemTable* pMemTable = NULL; // pTsdbReadHandle->pMemTable; + if (pMemTable == NULL) { + return rows; + } -// STableData* pMem = NULL; -// STableData* pIMem = NULL; + // STableData* pMem = NULL; + // STableData* pIMem = NULL; -// SMemTable* pMemT = pMemRef->snapshot.mem; -// SMemTable* pIMemT = pMemRef->snapshot.imem; + // SMemTable* pMemT = pMemRef->snapshot.mem; + // SMemTable* pIMemT = pMemRef->snapshot.imem; size_t size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo); for (int32_t i = 0; i < size; ++i) { STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i); -// if (pMemT && pCheckInfo->tableId < pMemT->maxTables) { -// pMem = pMemT->tData[pCheckInfo->tableId]; -// rows += (pMem && pMem->uid == pCheckInfo->tableId) ? pMem->numOfRows : 0; -// } -// if (pIMemT && pCheckInfo->tableId < pIMemT->maxTables) { -// pIMem = pIMemT->tData[pCheckInfo->tableId]; -// rows += (pIMem && pIMem->uid == pCheckInfo->tableId) ? pIMem->numOfRows : 0; -// } + // if (pMemT && pCheckInfo->tableId < pMemT->maxTables) { + // pMem = pMemT->tData[pCheckInfo->tableId]; + // rows += (pMem && pMem->uid == pCheckInfo->tableId) ? pMem->numOfRows : 0; + // } + // if (pIMemT && pCheckInfo->tableId < pIMemT->maxTables) { + // pIMem = pIMemT->tData[pCheckInfo->tableId]; + // rows += (pIMem && pIMem->uid == pCheckInfo->tableId) ? pIMem->numOfRows : 0; + // } } return rows; } @@ -244,15 +251,15 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S // todo apply the lastkey of table check to avoid to load header file for (int32_t i = 0; i < numOfGroup; ++i) { - SArray* group = *(SArray**) taosArrayGet(pGroupList->pGroupList, i); + SArray* group = *(SArray**)taosArrayGet(pGroupList->pGroupList, i); size_t gsize = taosArrayGetSize(group); assert(gsize > 0); for (int32_t j = 0; j < gsize; ++j) { - STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(group, j); + STableKeyInfo* pKeyInfo = (STableKeyInfo*)taosArrayGet(group, j); - STableCheckInfo info = { .lastKey = pKeyInfo->lastKey, .tableId = pKeyInfo->uid}; + STableCheckInfo info = {.lastKey = pKeyInfo->lastKey, .tableId = pKeyInfo->uid}; if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) { if (info.lastKey == INT64_MIN || info.lastKey < pTsdbReadHandle->window.skey) { info.lastKey = pTsdbReadHandle->window.skey; @@ -264,7 +271,8 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S } taosArrayPush(pTableCheckInfo, &info); - tsdbDebug("%p check table uid:%"PRId64" from lastKey:%"PRId64" %s", pTsdbReadHandle, info.tableId, info.lastKey, pTsdbReadHandle->idStr); + tsdbDebug("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReadHandle, info.tableId, + info.lastKey, pTsdbReadHandle->idStr); } } @@ -279,10 +287,10 @@ static void resetCheckInfo(STsdbReadHandle* pTsdbReadHandle) { // todo apply the lastkey of table check to avoid to load header file for (int32_t i = 0; i < numOfTables; ++i) { - STableCheckInfo* pCheckInfo = (STableCheckInfo*) taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i); + STableCheckInfo* pCheckInfo = (STableCheckInfo*)taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i); pCheckInfo->lastKey = pTsdbReadHandle->window.skey; - pCheckInfo->iter = tSkipListDestroyIter(pCheckInfo->iter); - pCheckInfo->iiter = tSkipListDestroyIter(pCheckInfo->iiter); + pCheckInfo->iter = tSkipListDestroyIter(pCheckInfo->iter); + pCheckInfo->iiter = tSkipListDestroyIter(pCheckInfo->iiter); pCheckInfo->initBuf = false; if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) { @@ -297,7 +305,7 @@ static void resetCheckInfo(STsdbReadHandle* pTsdbReadHandle) { static SArray* createCheckInfoFromCheckInfo(STableCheckInfo* pCheckInfo, TSKEY skey, SArray** psTable) { SArray* pNew = taosArrayInit(1, sizeof(STableCheckInfo)); - STableCheckInfo info = { .lastKey = skey}; + STableCheckInfo info = {.lastKey = skey}; info.tableId = pCheckInfo->tableId; taosArrayPush(pNew, &info); @@ -308,7 +316,7 @@ static bool emptyQueryTimewindow(STsdbReadHandle* pTsdbReadHandle) { assert(pTsdbReadHandle != NULL); STimeWindow* w = &pTsdbReadHandle->window; - bool asc = ASCENDING_TRAVERSE(pTsdbReadHandle->order); + bool asc = ASCENDING_TRAVERSE(pTsdbReadHandle->order); return ((asc && w->skey > w->ekey) || (!asc && w->ekey > w->skey)); } @@ -354,23 +362,23 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond, goto _end; } - pReadHandle->order = pCond->order; - pReadHandle->pTsdb = tsdb; - pReadHandle->type = TSDB_QUERY_TYPE_ALL; - pReadHandle->cur.fid = INT32_MIN; - pReadHandle->cur.win = TSWINDOW_INITIALIZER; - pReadHandle->checkFiles = true; - pReadHandle->activeIndex = 0; // current active table index - pReadHandle->allocSize = 0; + pReadHandle->order = pCond->order; + pReadHandle->pTsdb = tsdb; + pReadHandle->type = TSDB_QUERY_TYPE_ALL; + pReadHandle->cur.fid = INT32_MIN; + pReadHandle->cur.win = TSWINDOW_INITIALIZER; + pReadHandle->checkFiles = true; + pReadHandle->activeIndex = 0; // current active table index + pReadHandle->allocSize = 0; pReadHandle->locateStart = false; - pReadHandle->loadType = pCond->type; + pReadHandle->loadType = pCond->type; - pReadHandle->outputCapacity = 4096;//((STsdb*)tsdb)->config.maxRowsPerFileBlock; + pReadHandle->outputCapacity = 4096; //((STsdb*)tsdb)->config.maxRowsPerFileBlock; pReadHandle->loadExternalRow = pCond->loadExternalRows; pReadHandle->currentLoadExternalRows = pCond->loadExternalRows; char buf[128] = {0}; - snprintf(buf, tListLen(buf), "TID:0x%"PRIx64" QID:0x%"PRIx64, taskId, qId); + snprintf(buf, tListLen(buf), "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, qId); pReadHandle->idStr = strdup(buf); if (tsdbInitReadH(&pReadHandle->rhelper, (STsdb*)tsdb) != 0) { @@ -421,37 +429,39 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond, return (tsdbReaderT)pReadHandle; - _end: +_end: tsdbCleanupReadHandle(pReadHandle); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return NULL; } -tsdbReaderT* tsdbQueryTables(STsdb* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, uint64_t qId, uint64_t taskId) { +tsdbReaderT* tsdbQueryTables(STsdb* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, uint64_t qId, + uint64_t taskId) { STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(tsdb, pCond, qId, taskId); if (pTsdbReadHandle == NULL) { return NULL; } if (emptyQueryTimewindow(pTsdbReadHandle)) { - return (tsdbReaderT*) pTsdbReadHandle; + return (tsdbReaderT*)pTsdbReadHandle; } // todo apply the lastkey of table check to avoid to load header file pTsdbReadHandle->pTableCheckInfo = createCheckInfoFromTableGroup(pTsdbReadHandle, groupList); if (pTsdbReadHandle->pTableCheckInfo == NULL) { -// tsdbCleanupReadHandle(pTsdbReadHandle); + // tsdbCleanupReadHandle(pTsdbReadHandle); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return NULL; } - tsdbDebug("%p total numOfTable:%" PRIzu " in this query, group %"PRIzu" %s", pTsdbReadHandle, taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo), - taosArrayGetSize(groupList->pGroupList), pTsdbReadHandle->idStr); + tsdbDebug("%p total numOfTable:%" PRIzu " in this query, group %" PRIzu " %s", pTsdbReadHandle, + taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo), taosArrayGetSize(groupList->pGroupList), + pTsdbReadHandle->idStr); - return (tsdbReaderT) pTsdbReadHandle; + return (tsdbReaderT)pTsdbReadHandle; } -void tsdbResetQueryHandle(tsdbReaderT queryHandle, STsdbQueryCond *pCond) { +void tsdbResetQueryHandle(tsdbReaderT queryHandle, STsdbQueryCond* pCond) { STsdbReadHandle* pTsdbReadHandle = queryHandle; if (emptyQueryTimewindow(pTsdbReadHandle)) { @@ -463,13 +473,13 @@ void tsdbResetQueryHandle(tsdbReaderT queryHandle, STsdbQueryCond *pCond) { return; } - pTsdbReadHandle->order = pCond->order; - pTsdbReadHandle->window = pCond->twindow; - pTsdbReadHandle->type = TSDB_QUERY_TYPE_ALL; - pTsdbReadHandle->cur.fid = -1; - pTsdbReadHandle->cur.win = TSWINDOW_INITIALIZER; - pTsdbReadHandle->checkFiles = true; - pTsdbReadHandle->activeIndex = 0; // current active table index + pTsdbReadHandle->order = pCond->order; + pTsdbReadHandle->window = pCond->twindow; + pTsdbReadHandle->type = TSDB_QUERY_TYPE_ALL; + pTsdbReadHandle->cur.fid = -1; + pTsdbReadHandle->cur.win = TSWINDOW_INITIALIZER; + pTsdbReadHandle->checkFiles = true; + pTsdbReadHandle->activeIndex = 0; // current active table index pTsdbReadHandle->locateStart = false; pTsdbReadHandle->loadExternalRow = pCond->loadExternalRows; @@ -488,16 +498,16 @@ void tsdbResetQueryHandle(tsdbReaderT queryHandle, STsdbQueryCond *pCond) { resetCheckInfo(pTsdbReadHandle); } -void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, STsdbQueryCond *pCond, STableGroupInfo* groupList) { +void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, STsdbQueryCond* pCond, STableGroupInfo* groupList) { STsdbReadHandle* pTsdbReadHandle = queryHandle; - pTsdbReadHandle->order = pCond->order; - pTsdbReadHandle->window = pCond->twindow; - pTsdbReadHandle->type = TSDB_QUERY_TYPE_ALL; - pTsdbReadHandle->cur.fid = -1; - pTsdbReadHandle->cur.win = TSWINDOW_INITIALIZER; - pTsdbReadHandle->checkFiles = true; - pTsdbReadHandle->activeIndex = 0; // current active table index + pTsdbReadHandle->order = pCond->order; + pTsdbReadHandle->window = pCond->twindow; + pTsdbReadHandle->type = TSDB_QUERY_TYPE_ALL; + pTsdbReadHandle->cur.fid = -1; + pTsdbReadHandle->cur.win = TSWINDOW_INITIALIZER; + pTsdbReadHandle->checkFiles = true; + pTsdbReadHandle->activeIndex = 0; // current active table index pTsdbReadHandle->locateStart = false; pTsdbReadHandle->loadExternalRow = pCond->loadExternalRows; @@ -514,21 +524,23 @@ void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, STsdbQueryCond *pC tsdbInitCompBlockLoadInfo(&pTsdbReadHandle->compBlockLoadInfo); SArray* pTable = NULL; -// STsdbMeta* pMeta = tsdbGetMeta(pTsdbReadHandle->pTsdb); + // STsdbMeta* pMeta = tsdbGetMeta(pTsdbReadHandle->pTsdb); -// pTsdbReadHandle->pTableCheckInfo = destroyTableCheckInfo(pTsdbReadHandle->pTableCheckInfo); + // pTsdbReadHandle->pTableCheckInfo = destroyTableCheckInfo(pTsdbReadHandle->pTableCheckInfo); - pTsdbReadHandle->pTableCheckInfo = NULL;//createCheckInfoFromTableGroup(pTsdbReadHandle, groupList, pMeta, &pTable); + pTsdbReadHandle->pTableCheckInfo = NULL; // createCheckInfoFromTableGroup(pTsdbReadHandle, groupList, pMeta, + // &pTable); if (pTsdbReadHandle->pTableCheckInfo == NULL) { -// tsdbCleanupReadHandle(pTsdbReadHandle); + // tsdbCleanupReadHandle(pTsdbReadHandle); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; } -// pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev); -// pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next); + // pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev); + // pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next); } -tsdbReaderT tsdbQueryLastRow(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, uint64_t taskId) { +tsdbReaderT tsdbQueryLastRow(STsdb* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, uint64_t qId, + uint64_t taskId) { pCond->twindow = updateLastrowForEachGroup(groupList); // no qualified table @@ -536,13 +548,13 @@ tsdbReaderT tsdbQueryLastRow(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo return NULL; } - STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) tsdbQueryTables(tsdb, pCond, groupList, qId, taskId); + STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbQueryTables(tsdb, pCond, groupList, qId, taskId); if (pTsdbReadHandle == NULL) { return NULL; } int32_t code = checkForCachedLastRow(pTsdbReadHandle, groupList); - if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0 + if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0 terrno = code; return NULL; } @@ -551,7 +563,7 @@ tsdbReaderT tsdbQueryLastRow(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo if (pTsdbReadHandle->cachelastrow) { pTsdbReadHandle->type = TSDB_QUERY_TYPE_LAST; } - + return pTsdbReadHandle; } @@ -576,12 +588,12 @@ tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupIn } #endif -SArray* tsdbGetQueriedTableList(tsdbReaderT *pHandle) { +SArray* tsdbGetQueriedTableList(tsdbReaderT* pHandle) { assert(pHandle != NULL); - STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) pHandle; + STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle; - size_t size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo); + size_t size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo); SArray* res = taosArrayInit(size, POINTER_BYTES); return res; } @@ -594,18 +606,18 @@ static STableGroupInfo* trimTableGroup(STimeWindow* window, STableGroupInfo* pGr STableGroupInfo* pNew = taosMemoryCalloc(1, sizeof(STableGroupInfo)); pNew->pGroupList = taosArrayInit(numOfGroup, POINTER_BYTES); - for(int32_t i = 0; i < numOfGroup; ++i) { + for (int32_t i = 0; i < numOfGroup; ++i) { SArray* oneGroup = taosArrayGetP(pGroupList->pGroupList, i); - size_t numOfTables = taosArrayGetSize(oneGroup); + size_t numOfTables = taosArrayGetSize(oneGroup); SArray* px = taosArrayInit(4, sizeof(STableKeyInfo)); for (int32_t j = 0; j < numOfTables; ++j) { STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(oneGroup, j); -// if (window->skey <= pInfo->lastKey && ((STable*)pInfo->pTable)->lastKey != TSKEY_INITIAL_VAL) { -// taosArrayPush(px, pInfo); -// pNew->numOfTables += 1; -// break; -// } + // if (window->skey <= pInfo->lastKey && ((STable*)pInfo->pTable)->lastKey != TSKEY_INITIAL_VAL) { + // taosArrayPush(px, pInfo); + // pNew->numOfTables += 1; + // break; + // } } // there are no data in this group @@ -619,7 +631,8 @@ static STableGroupInfo* trimTableGroup(STimeWindow* window, STableGroupInfo* pGr return pNew; } -tsdbReaderT tsdbQueryRowsInExternalWindow(STsdb *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList, uint64_t qId, uint64_t taskId) { +tsdbReaderT tsdbQueryRowsInExternalWindow(STsdb* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, uint64_t qId, + uint64_t taskId) { STableGroupInfo* pNew = trimTableGroup(&pCond->twindow, groupList); if (pNew->numOfTables == 0) { @@ -634,7 +647,7 @@ tsdbReaderT tsdbQueryRowsInExternalWindow(STsdb *tsdb, STsdbQueryCond* pCond, ST } } - STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) tsdbQueryTables(tsdb, pCond, pNew, qId, taskId); + STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbQueryTables(tsdb, pCond, pNew, qId, taskId); pTsdbReadHandle->loadExternalRow = true; pTsdbReadHandle->currentLoadExternalRows = true; @@ -674,9 +687,9 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe return false; } - bool memEmpty = (pCheckInfo->iter == NULL) || (pCheckInfo->iter != NULL && !tSkipListIterNext(pCheckInfo->iter)); + bool memEmpty = (pCheckInfo->iter == NULL) || (pCheckInfo->iter != NULL && !tSkipListIterNext(pCheckInfo->iter)); bool imemEmpty = (pCheckInfo->iiter == NULL) || (pCheckInfo->iiter != NULL && !tSkipListIterNext(pCheckInfo->iiter)); - if (memEmpty && imemEmpty) { // buffer is empty + if (memEmpty && imemEmpty) { // buffer is empty return false; } @@ -687,8 +700,9 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe STSRow* row = (STSRow*)SL_GET_NODE_DATA(node); TSKEY key = TD_ROW_KEY(row); // first timestamp in buffer tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64 - "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%"PRId64", %s", - pHandle, pCheckInfo->tableId, key, order, (*pMem)->keyMin, (*pMem)->keyMax, pCheckInfo->lastKey, (*pMem)->nrows, pHandle->idStr); + "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%" PRId64 ", %s", + pHandle, pCheckInfo->tableId, key, order, (*pMem)->keyMin, (*pMem)->keyMax, pCheckInfo->lastKey, + (*pMem)->nrows, pHandle->idStr); if (ASCENDING_TRAVERSE(order)) { assert(pCheckInfo->lastKey <= key); @@ -697,7 +711,7 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe } } else { - tsdbDebug("%p uid:%"PRId64", no data in mem, %s", pHandle, pCheckInfo->tableId, pHandle->idStr); + tsdbDebug("%p uid:%" PRId64 ", no data in mem, %s", pHandle, pCheckInfo->tableId, pHandle->idStr); } if (!imemEmpty) { @@ -707,8 +721,9 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe STSRow* row = (STSRow*)SL_GET_NODE_DATA(node); TSKEY key = TD_ROW_KEY(row); // first timestamp in buffer tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64 - "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%"PRId64", %s", - pHandle, pCheckInfo->tableId, key, order, (*pIMem)->keyMin, (*pIMem)->keyMax, pCheckInfo->lastKey, (*pIMem)->nrows, pHandle->idStr); + "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%" PRId64 ", %s", + pHandle, pCheckInfo->tableId, key, order, (*pIMem)->keyMin, (*pIMem)->keyMax, pCheckInfo->lastKey, + (*pIMem)->nrows, pHandle->idStr); if (ASCENDING_TRAVERSE(order)) { assert(pCheckInfo->lastKey <= key); @@ -716,7 +731,7 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe assert(pCheckInfo->lastKey >= key); } } else { - tsdbDebug("%p uid:%"PRId64", no data in imem, %s", pHandle, pCheckInfo->tableId, pHandle->idStr); + tsdbDebug("%p uid:%" PRId64 ", no data in imem, %s", pHandle, pCheckInfo->tableId, pHandle->idStr); } return true; @@ -761,30 +776,20 @@ static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order, TSKEY r2 = TD_ROW_KEY(rimem); if (r1 == r2) { -#if 0 - if(update == TD_ROW_DISCARD_UPDATE){ + if (update == TD_ROW_DISCARD_UPDATE) { pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM; tSkipListIterNext(pCheckInfo->iter); - } - else if(update == TD_ROW_OVERWRITE_UPDATE) { + } else if (update == TD_ROW_OVERWRITE_UPDATE) { pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM; tSkipListIterNext(pCheckInfo->iiter); } else { pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH; } -#endif - if (TD_SUPPORT_UPDATE(update)) { - pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH; - } else { - pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM; - tSkipListIterNext(pCheckInfo->iter); - } return r1; } else if (r1 < r2 && ASCENDING_TRAVERSE(order)) { pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM; return r1; - } - else { + } else { pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM; return r2; } @@ -828,7 +833,7 @@ static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int tSkipListIterNext(pCheckInfo->iter); pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM; return rimem; - } else if(update == TD_ROW_OVERWRITE_UPDATE){ + } else if (update == TD_ROW_OVERWRITE_UPDATE) { tSkipListIterNext(pCheckInfo->iiter); pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM; return rmem; @@ -872,7 +877,7 @@ static bool moveToNextRowInMem(STableCheckInfo* pCheckInfo) { if (pCheckInfo->iiter != NULL) { return tSkipListIterGet(pCheckInfo->iiter) != NULL; } - } else if (pCheckInfo->chosen == CHECKINFO_CHOSEN_IMEM){ + } else if (pCheckInfo->chosen == CHECKINFO_CHOSEN_IMEM) { if (pCheckInfo->iiter != NULL) { hasNext = tSkipListIterNext(pCheckInfo->iiter); } @@ -897,8 +902,8 @@ static bool moveToNextRowInMem(STableCheckInfo* pCheckInfo) { } static bool hasMoreDataInCache(STsdbReadHandle* pHandle) { - STsdbCfg *pCfg = &pHandle->pTsdb->config; - size_t size = taosArrayGetSize(pHandle->pTableCheckInfo); + STsdbCfg* pCfg = &pHandle->pTsdb->config; + size_t size = taosArrayGetSize(pHandle->pTableCheckInfo); assert(pHandle->activeIndex < size && pHandle->activeIndex >= 0 && size >= 1); pHandle->cur.fid = INT32_MIN; @@ -913,8 +918,8 @@ static bool hasMoreDataInCache(STsdbReadHandle* pHandle) { } pCheckInfo->lastKey = TD_ROW_KEY(row); // first timestamp in buffer - tsdbDebug("%p uid:%" PRId64", check data in buffer from skey:%" PRId64 ", order:%d, %s", pHandle, - pCheckInfo->tableId, pCheckInfo->lastKey, pHandle->order, pHandle->idStr); + tsdbDebug("%p uid:%" PRId64 ", check data in buffer from skey:%" PRId64 ", order:%d, %s", pHandle, + pCheckInfo->tableId, pCheckInfo->lastKey, pHandle->order, pHandle->idStr); // all data in mem are checked already. if ((pCheckInfo->lastKey > pHandle->window.ekey && ASCENDING_TRAVERSE(pHandle->order)) || @@ -922,7 +927,7 @@ static bool hasMoreDataInCache(STsdbReadHandle* pHandle) { return false; } - int32_t step = ASCENDING_TRAVERSE(pHandle->order)? 1:-1; + int32_t step = ASCENDING_TRAVERSE(pHandle->order) ? 1 : -1; STimeWindow* win = &pHandle->cur.win; pHandle->cur.rows = tsdbReadRowsFromCache(pCheckInfo, pHandle->window.ekey, pHandle->outputCapacity, win, pHandle); @@ -947,9 +952,9 @@ static int32_t getFileIdFromKey(TSKEY key, int32_t daysPerFile, int32_t precisio if (key < 0) { key -= (daysPerFile * tsTickPerDay[precision]); } - + int64_t fid = (int64_t)(key / (daysPerFile * tsTickPerDay[precision])); // set the starting fileId - if (fid < 0L && llabs(fid) > INT32_MAX) { // data value overflow for INT32 + if (fid < 0L && llabs(fid) > INT32_MAX) { // data value overflow for INT32 fid = INT32_MIN; } @@ -987,7 +992,7 @@ static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY s return midSlot; } -static int32_t loadBlockInfo(STsdbReadHandle * pTsdbReadHandle, int32_t index, int32_t* numOfBlocks) { +static int32_t loadBlockInfo(STsdbReadHandle* pTsdbReadHandle, int32_t index, int32_t* numOfBlocks) { int32_t code = 0; STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, index); @@ -1030,9 +1035,11 @@ static int32_t loadBlockInfo(STsdbReadHandle * pTsdbReadHandle, int32_t index, i TSKEY s = TSKEY_INITIAL_VAL, e = TSKEY_INITIAL_VAL; if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) { - assert(pCheckInfo->lastKey <= pTsdbReadHandle->window.ekey && pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey); + assert(pCheckInfo->lastKey <= pTsdbReadHandle->window.ekey && + pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey); } else { - assert(pCheckInfo->lastKey >= pTsdbReadHandle->window.ekey && pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey); + assert(pCheckInfo->lastKey >= pTsdbReadHandle->window.ekey && + pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey); } s = TMIN(pCheckInfo->lastKey, pTsdbReadHandle->window.ekey); @@ -1093,10 +1100,11 @@ static int32_t getFileCompInfo(STsdbReadHandle* pTsdbReadHandle, int32_t* numOfB return code; } -static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo, int32_t slotIndex) { +static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo, + int32_t slotIndex) { int64_t st = taosGetTimestampUs(); - STSchema *pSchema = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, pCheckInfo->tableId, 0); + STSchema* pSchema = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, pCheckInfo->tableId, 0); int32_t code = tdInitDataCols(pTsdbReadHandle->pDataCols, pSchema); if (code != TSDB_CODE_SUCCESS) { tsdbError("%p failed to malloc buf for pDataCols, %s", pTsdbReadHandle, pTsdbReadHandle->idStr); @@ -1120,7 +1128,8 @@ static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBl int16_t* colIds = pTsdbReadHandle->defaultLoadColumn->pData; - int32_t ret = tsdbLoadBlockDataCols(&(pTsdbReadHandle->rhelper), pBlock, pCheckInfo->pCompInfo, colIds, (int)(QH_GET_NUM_OF_COLS(pTsdbReadHandle)), true); + int32_t ret = tsdbLoadBlockDataCols(&(pTsdbReadHandle->rhelper), pBlock, pCheckInfo->pCompInfo, colIds, + (int)(QH_GET_NUM_OF_COLS(pTsdbReadHandle)), true); if (ret != TSDB_CODE_SUCCESS) { int32_t c = terrno; assert(c != TSDB_CODE_SUCCESS); @@ -1139,9 +1148,9 @@ static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBl pBlock->numOfRows = pCols->numOfRows; // Convert from TKEY to TSKEY for primary timestamp column if current block has timestamp before 1970-01-01T00:00:00Z - if(pBlock->keyFirst < 0 && colIds[0] == PRIMARYKEY_TIMESTAMP_COL_ID) { + if (pBlock->keyFirst < 0 && colIds[0] == PRIMARYKEY_TIMESTAMP_COL_ID) { int64_t* src = pCols->cols[0].pData; - for(int32_t i = 0; i < pBlock->numOfRows; ++i) { + for (int32_t i = 0; i < pBlock->numOfRows; ++i) { src[i] = tdGetKey(src[i]); } } @@ -1149,30 +1158,34 @@ static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBl int64_t elapsedTime = (taosGetTimestampUs() - st); pTsdbReadHandle->cost.blockLoadTime += elapsedTime; - tsdbDebug("%p load file block into buffer, index:%d, brange:%"PRId64"-%"PRId64", rows:%d, elapsed time:%"PRId64 " us, %s", - pTsdbReadHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, elapsedTime, pTsdbReadHandle->idStr); + tsdbDebug("%p load file block into buffer, index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%" PRId64 + " us, %s", + pTsdbReadHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, elapsedTime, + pTsdbReadHandle->idStr); return TSDB_CODE_SUCCESS; _error: pBlock->numOfRows = 0; - tsdbError("%p error occurs in loading file block, index:%d, brange:%"PRId64"-%"PRId64", rows:%d, %s", + tsdbError("%p error occurs in loading file block, index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, %s", pTsdbReadHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, pTsdbReadHandle->idStr); return terrno; } static int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo); -static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end); -static void moveDataToFront(STsdbReadHandle* pTsdbReadHandle, int32_t numOfRows, int32_t numOfCols); -static void doCheckGeneratedBlockRange(STsdbReadHandle* pTsdbReadHandle); -static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SDataBlockInfo* pBlockInfo, int32_t endPos); +static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, + int32_t start, int32_t end); +static void moveDataToFront(STsdbReadHandle* pTsdbReadHandle, int32_t numOfRows, int32_t numOfCols); +static void doCheckGeneratedBlockRange(STsdbReadHandle* pTsdbReadHandle); +static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, + SDataBlockInfo* pBlockInfo, int32_t endPos); -static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo){ +static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo) { SQueryFilePos* cur = &pTsdbReadHandle->cur; STsdbCfg* pCfg = &pTsdbReadHandle->pTsdb->config; SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock); TSKEY key; - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; /*bool hasData = */ initTableMemIterator(pTsdbReadHandle, pCheckInfo); assert(cur->pos >= 0 && cur->pos <= binfo.rows); @@ -1180,22 +1193,22 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* key = extractFirstTraverseKey(pCheckInfo, pTsdbReadHandle->order, pCfg->update); if (key != TSKEY_INITIAL_VAL) { - tsdbDebug("%p key in mem:%"PRId64", %s", pTsdbReadHandle, key, pTsdbReadHandle->idStr); + tsdbDebug("%p key in mem:%" PRId64 ", %s", pTsdbReadHandle, key, pTsdbReadHandle->idStr); } else { tsdbDebug("%p no data in mem, %s", pTsdbReadHandle, pTsdbReadHandle->idStr); } if ((ASCENDING_TRAVERSE(pTsdbReadHandle->order) && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) || (!ASCENDING_TRAVERSE(pTsdbReadHandle->order) && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) { - if ((ASCENDING_TRAVERSE(pTsdbReadHandle->order) && (key != TSKEY_INITIAL_VAL && key < binfo.window.skey)) || (!ASCENDING_TRAVERSE(pTsdbReadHandle->order) && (key != TSKEY_INITIAL_VAL && key > binfo.window.ekey))) { - // do not load file block into buffer int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1; - TSKEY maxKey = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? (binfo.window.skey - step):(binfo.window.ekey - step); - cur->rows = tsdbReadRowsFromCache(pCheckInfo, maxKey, pTsdbReadHandle->outputCapacity, &cur->win, pTsdbReadHandle); + TSKEY maxKey = + ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? (binfo.window.skey - step) : (binfo.window.ekey - step); + cur->rows = + tsdbReadRowsFromCache(pCheckInfo, maxKey, pTsdbReadHandle->outputCapacity, &cur->win, pTsdbReadHandle); pTsdbReadHandle->realNumOfRows = cur->rows; // update the last key value @@ -1209,7 +1222,6 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* return code; } - // return error, add test cases if ((code = doLoadFileDataBlock(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) { return code; @@ -1226,12 +1238,12 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* assert(pTsdbReadHandle->outputCapacity >= binfo.rows); int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &binfo); - if ((cur->pos == 0 && endPos == binfo.rows -1 && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) || + if ((cur->pos == 0 && endPos == binfo.rows - 1 && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) || (cur->pos == (binfo.rows - 1) && endPos == 0 && (!ASCENDING_TRAVERSE(pTsdbReadHandle->order)))) { pTsdbReadHandle->realNumOfRows = binfo.rows; cur->rows = binfo.rows; - cur->win = binfo.window; + cur->win = binfo.window; cur->mixBlock = false; cur->blockCompleted = true; @@ -1242,29 +1254,31 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* cur->lastKey = binfo.window.skey - 1; cur->pos = -1; } - } else { // partially copy to dest buffer + } else { // partially copy to dest buffer copyAllRemainRowsFromFileBlock(pTsdbReadHandle, pCheckInfo, &binfo, endPos); cur->mixBlock = true; } assert(cur->blockCompleted); if (cur->rows == binfo.rows) { - tsdbDebug("%p whole file block qualified, brange:%"PRId64"-%"PRId64", rows:%d, lastKey:%"PRId64", %s", + tsdbDebug("%p whole file block qualified, brange:%" PRId64 "-%" PRId64 ", rows:%d, lastKey:%" PRId64 ", %s", pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, cur->lastKey, pTsdbReadHandle->idStr); } else { - tsdbDebug("%p create data block from remain file block, brange:%"PRId64"-%"PRId64", rows:%d, total:%d, lastKey:%"PRId64", %s", - pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, binfo.rows, cur->lastKey, pTsdbReadHandle->idStr); + tsdbDebug("%p create data block from remain file block, brange:%" PRId64 "-%" PRId64 + ", rows:%d, total:%d, lastKey:%" PRId64 ", %s", + pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, binfo.rows, cur->lastKey, + pTsdbReadHandle->idStr); } - } return code; } -static int32_t loadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo, bool* exists) { +static int32_t loadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo, + bool* exists) { SQueryFilePos* cur = &pTsdbReadHandle->cur; - int32_t code = TSDB_CODE_SUCCESS; - bool asc = ASCENDING_TRAVERSE(pTsdbReadHandle->order); + int32_t code = TSDB_CODE_SUCCESS; + bool asc = ASCENDING_TRAVERSE(pTsdbReadHandle->order); if (asc) { // query ended in/started from current block @@ -1287,10 +1301,10 @@ static int32_t loadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBloc assert(pCheckInfo->lastKey <= pBlock->keyLast); doMergeTwoLevelData(pTsdbReadHandle, pCheckInfo, pBlock); } else { // the whole block is loaded in to buffer - cur->pos = asc? 0:(pBlock->numOfRows - 1); + cur->pos = asc ? 0 : (pBlock->numOfRows - 1); code = handleDataMergeIfNeeded(pTsdbReadHandle, pBlock, pCheckInfo); } - } else { //desc order, query ended in current block + } else { // desc order, query ended in current block if (pTsdbReadHandle->window.ekey > pBlock->keyFirst || pCheckInfo->lastKey < pBlock->keyLast) { if ((code = doLoadFileDataBlock(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) { *exists = false; @@ -1299,7 +1313,8 @@ static int32_t loadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBloc SDataCols* pTsCol = pTsdbReadHandle->rhelper.pDCols[0]; if (pCheckInfo->lastKey < pBlock->keyLast) { - cur->pos = binarySearchForKey(pTsCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pTsdbReadHandle->order); + cur->pos = + binarySearchForKey(pTsCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pTsdbReadHandle->order); } else { cur->pos = pBlock->numOfRows - 1; } @@ -1307,7 +1322,7 @@ static int32_t loadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBloc assert(pCheckInfo->lastKey >= pBlock->keyFirst); doMergeTwoLevelData(pTsdbReadHandle, pCheckInfo, pBlock); } else { - cur->pos = asc? 0:(pBlock->numOfRows-1); + cur->pos = asc ? 0 : (pBlock->numOfRows - 1); code = handleDataMergeIfNeeded(pTsdbReadHandle, pBlock, pCheckInfo); } } @@ -1378,11 +1393,12 @@ static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) { return midPos; } -static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end) { - int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 1 : -1; +static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, + int32_t start, int32_t end) { + int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1; SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0]; - TSKEY* tsArray = pCols->cols[0].pData; + TSKEY* tsArray = pCols->cols[0].pData; int32_t num = end - start + 1; assert(num >= 0); @@ -1393,9 +1409,9 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t int32_t requiredNumOfCols = (int32_t)taosArrayGetSize(pTsdbReadHandle->pColumns); - //data in buffer has greater timestamp, copy data in file block + // data in buffer has greater timestamp, copy data in file block int32_t i = 0, j = 0; - while(i < requiredNumOfCols && j < pCols->numOfCols) { + while (i < requiredNumOfCols && j < pCols->numOfCols) { SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i); SDataCol* src = &pCols->cols[j]; @@ -1406,8 +1422,8 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t if (!isAllRowsNull(src) && pColInfo->info.colId == src->colId) { if (!IS_VAR_DATA_TYPE(pColInfo->info.type)) { // todo opt performance -// memmove(pData, (char*)src->pData + bytes * start, bytes * num); - for(int32_t k = start; k < num + start; ++k) { + // memmove(pData, (char*)src->pData + bytes * start, bytes * num); + for (int32_t k = start; k < num + start; ++k) { SCellVal sVal = {0}; if (tdGetColDataOfRow(&sVal, src, k, pCols->bitmapMode) < 0) { TASSERT(0); @@ -1423,7 +1439,7 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t // todo refactor, only copy one-by-one for (int32_t k = start; k < num + start; ++k) { SCellVal sVal = {0}; - if(tdGetColDataOfRow(&sVal, src, k, pCols->bitmapMode) < 0){ + if (tdGetColDataOfRow(&sVal, src, k, pCols->bitmapMode) < 0) { TASSERT(0); } @@ -1433,18 +1449,18 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t j++; i++; - } else { // pColInfo->info.colId < src->colId, it is a NULL data - for(int32_t k = start; k < num + start; ++k) { // TODO opt performance + } else { // pColInfo->info.colId < src->colId, it is a NULL data + for (int32_t k = start; k < num + start; ++k) { // TODO opt performance colDataAppend(pColInfo, k, NULL, true); } i++; } } - while (i < requiredNumOfCols) { // the remain columns are all null data + while (i < requiredNumOfCols) { // the remain columns are all null data SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i); - for(int32_t k = start; k < num + start; ++k) { - colDataAppend(pColInfo, k, NULL, true); // TODO add a fast version to set a number of consecutive NULL value. + for (int32_t k = start; k < num + start; ++k) { + colDataAppend(pColInfo, k, NULL, true); // TODO add a fast version to set a number of consecutive NULL value. } i++; } @@ -1461,15 +1477,15 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit STSRow* row2, int32_t numOfCols, uint64_t uid, STSchema* pSchema1, STSchema* pSchema2, bool forceSetNull) { #if 1 - STSchema* pSchema; - STSRow* row; - int16_t colId; - int16_t offset; + STSchema* pSchema; + STSRow* row; + int16_t colId; + int16_t offset; - bool isRow1DataRow = TD_IS_TP_ROW(row1); - bool isRow2DataRow; - bool isChosenRowDataRow; - int32_t chosen_itr; + bool isRow1DataRow = TD_IS_TP_ROW(row1); + bool isRow2DataRow; + bool isChosenRowDataRow; + int32_t chosen_itr; SCellVal sVal = {0}; // the schema version info is embeded in STSRow @@ -1479,19 +1495,19 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit pSchema1 = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, uid, TD_ROW_SVER(row1)); } - if(isRow1DataRow) { + if (isRow1DataRow) { numOfColsOfRow1 = schemaNCols(pSchema1); } else { numOfColsOfRow1 = tdRowGetNCols(row1); } int32_t numOfColsOfRow2 = 0; - if(row2) { + if (row2) { isRow2DataRow = TD_IS_TP_ROW(row2); if (pSchema2 == NULL) { pSchema2 = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, uid, TD_ROW_SVER(row2)); } - if(isRow2DataRow) { + if (isRow2DataRow) { numOfColsOfRow2 = schemaNCols(pSchema2); } else { numOfColsOfRow2 = tdRowGetNCols(row2); @@ -1499,31 +1515,31 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit } int32_t i = 0, j = 0, k = 0; - while(i < numOfCols && (j < numOfColsOfRow1 || k < numOfColsOfRow2)) { + while (i < numOfCols && (j < numOfColsOfRow1 || k < numOfColsOfRow2)) { SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i); int32_t colIdOfRow1; - if(j >= numOfColsOfRow1) { + if (j >= numOfColsOfRow1) { colIdOfRow1 = INT32_MAX; - } else if(isRow1DataRow) { + } else if (isRow1DataRow) { colIdOfRow1 = pSchema1->columns[j].colId; } else { - SKvRowIdx *pColIdx = tdKvRowColIdxAt(row1, j); + SKvRowIdx* pColIdx = tdKvRowColIdxAt(row1, j); colIdOfRow1 = pColIdx->colId; } int32_t colIdOfRow2; - if(k >= numOfColsOfRow2) { + if (k >= numOfColsOfRow2) { colIdOfRow2 = INT32_MAX; - } else if(isRow2DataRow) { + } else if (isRow2DataRow) { colIdOfRow2 = pSchema2->columns[k].colId; } else { - SKvRowIdx *pColIdx = tdKvRowColIdxAt(row2, k); + SKvRowIdx* pColIdx = tdKvRowColIdxAt(row2, k); colIdOfRow2 = pColIdx->colId; } - if(colIdOfRow1 == colIdOfRow2) { - if(colIdOfRow1 < pColInfo->info.colId) { + if (colIdOfRow1 == colIdOfRow2) { + if (colIdOfRow1 < pColInfo->info.colId) { j++; k++; continue; @@ -1532,8 +1548,8 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit pSchema = pSchema1; isChosenRowDataRow = isRow1DataRow; chosen_itr = j; - } else if(colIdOfRow1 < colIdOfRow2) { - if(colIdOfRow1 < pColInfo->info.colId) { + } else if (colIdOfRow1 < colIdOfRow2) { + if (colIdOfRow1 < pColInfo->info.colId) { j++; continue; } @@ -1542,7 +1558,7 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit isChosenRowDataRow = isRow1DataRow; chosen_itr = j; } else { - if(colIdOfRow2 < pColInfo->info.colId) { + if (colIdOfRow2 < pColInfo->info.colId) { k++; continue; } @@ -1551,12 +1567,12 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit chosen_itr = k; isChosenRowDataRow = isRow2DataRow; } - if(isChosenRowDataRow) { + if (isChosenRowDataRow) { colId = pSchema->columns[chosen_itr].colId; offset = pSchema->columns[chosen_itr].offset; tdSTpRowGetVal(row, colId, pSchema->columns[chosen_itr].type, pSchema->flen, offset, chosen_itr, &sVal); } else { - SKvRowIdx *pColIdx = tdKvRowColIdxAt(row, chosen_itr); + SKvRowIdx* pColIdx = tdKvRowColIdxAt(row, chosen_itr); colId = pColIdx->colId; offset = pColIdx->offset; tdSKvRowGetVal(row, colId, offset, chosen_itr, &sVal); @@ -1571,21 +1587,21 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit i++; - if(row == row1) { + if (row == row1) { j++; } else { k++; } } else { - if(forceSetNull) { + if (forceSetNull) { colDataAppend(pColInfo, numOfRows, NULL, true); } i++; } } - if(forceSetNull) { - while (i < numOfCols) { // the remain columns are all null data + if (forceSetNull) { + while (i < numOfCols) { // the remain columns are all null data SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i); colDataAppend(pColInfo, numOfRows, NULL, true); i++; @@ -1602,15 +1618,16 @@ static void moveDataToFront(STsdbReadHandle* pTsdbReadHandle, int32_t numOfRows, // if the buffer is not full in case of descending order query, move the data in the front of the buffer if (numOfRows < pTsdbReadHandle->outputCapacity) { int32_t emptySize = pTsdbReadHandle->outputCapacity - numOfRows; - for(int32_t i = 0; i < numOfCols; ++i) { + for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i); - memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes); + memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes, + numOfRows * pColInfo->info.bytes); } } } -static void getQualifiedRowsPos(STsdbReadHandle* pTsdbReadHandle, int32_t startPos, int32_t endPos, int32_t numOfExisted, - int32_t* start, int32_t* end) { +static void getQualifiedRowsPos(STsdbReadHandle* pTsdbReadHandle, int32_t startPos, int32_t endPos, + int32_t numOfExisted, int32_t* start, int32_t* end) { *start = -1; if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) { @@ -1635,7 +1652,8 @@ static void getQualifiedRowsPos(STsdbReadHandle* pTsdbReadHandle, int32_t startP } } -static void updateInfoAfterMerge(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, int32_t numOfRows, int32_t endPos) { +static void updateInfoAfterMerge(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, int32_t numOfRows, + int32_t endPos) { SQueryFilePos* cur = &pTsdbReadHandle->cur; pCheckInfo->lastKey = cur->lastKey; @@ -1655,22 +1673,24 @@ static void doCheckGeneratedBlockRange(STsdbReadHandle* pTsdbReadHandle) { } SColumnInfoData* pColInfoData = taosArrayGet(pTsdbReadHandle->pColumns, 0); - assert(cur->win.skey == ((TSKEY*)pColInfoData->pData)[0] && cur->win.ekey == ((TSKEY*)pColInfoData->pData)[cur->rows-1]); + assert(cur->win.skey == ((TSKEY*)pColInfoData->pData)[0] && + cur->win.ekey == ((TSKEY*)pColInfoData->pData)[cur->rows - 1]); } else { cur->win = pTsdbReadHandle->window; - int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 1:-1; + int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1; cur->lastKey = pTsdbReadHandle->window.ekey + step; } } -static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SDataBlockInfo* pBlockInfo, int32_t endPos) { +static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, + SDataBlockInfo* pBlockInfo, int32_t endPos) { SQueryFilePos* cur = &pTsdbReadHandle->cur; SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0]; - TSKEY* tsArray = pCols->cols[0].pData; + TSKEY* tsArray = pCols->cols[0].pData; - int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 1:-1; + int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1; int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle)); int32_t pos = cur->pos; @@ -1686,7 +1706,7 @@ static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STa int32_t numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, 0, start, end); // the time window should always be ascending order: skey <= ekey - cur->win = (STimeWindow) {.skey = tsArray[start], .ekey = tsArray[end]}; + cur->win = (STimeWindow){.skey = tsArray[start], .ekey = tsArray[end]}; cur->mixBlock = (numOfRows != pBlockInfo->rows); cur->lastKey = tsArray[endPos] + step; cur->blockCompleted = true; @@ -1699,17 +1719,18 @@ static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STa updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos); doCheckGeneratedBlockRange(pTsdbReadHandle); - tsdbDebug("%p uid:%" PRIu64", data block created, mixblock:%d, brange:%"PRIu64"-%"PRIu64" rows:%d, %s", - pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows, pTsdbReadHandle->idStr); + tsdbDebug("%p uid:%" PRIu64 ", data block created, mixblock:%d, brange:%" PRIu64 "-%" PRIu64 " rows:%d, %s", + pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows, + pTsdbReadHandle->idStr); } int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo) { // NOTE: reverse the order to find the end position in data block int32_t endPos = -1; - int32_t order = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? TSDB_ORDER_DESC : TSDB_ORDER_ASC; + int32_t order = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC; SQueryFilePos* cur = &pTsdbReadHandle->cur; - SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0]; + SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0]; if (ASCENDING_TRAVERSE(pTsdbReadHandle->order) && pTsdbReadHandle->window.ekey >= pBlockInfo->window.ekey) { endPos = pBlockInfo->rows - 1; @@ -1730,37 +1751,39 @@ int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* p // be included in the query time window will be discarded static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock) { SQueryFilePos* cur = &pTsdbReadHandle->cur; - SDataBlockInfo blockInfo = {0};//GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock); + SDataBlockInfo blockInfo = {0}; // GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock); STsdbCfg* pCfg = &pTsdbReadHandle->pTsdb->config; initTableMemIterator(pTsdbReadHandle, pCheckInfo); SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0]; assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == PRIMARYKEY_TIMESTAMP_COL_ID && - cur->pos >= 0 && cur->pos < pBlock->numOfRows); + cur->pos >= 0 && cur->pos < pBlock->numOfRows); TSKEY* tsArray = pCols->cols[0].pData; - assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == pBlock->keyFirst && tsArray[pBlock->numOfRows-1] == pBlock->keyLast); + assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == pBlock->keyFirst && + tsArray[pBlock->numOfRows - 1] == pBlock->keyLast); // for search the endPos, so the order needs to reverse - int32_t order = (pTsdbReadHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC; + int32_t order = (pTsdbReadHandle->order == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC; - int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 1:-1; + int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1; int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle)); STable* pTable = NULL; int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &blockInfo); - tsdbDebug("%p uid:%" PRIu64" start merge data block, file block range:%"PRIu64"-%"PRIu64" rows:%d, start:%d," + tsdbDebug("%p uid:%" PRIu64 " start merge data block, file block range:%" PRIu64 "-%" PRIu64 + " rows:%d, start:%d," "end:%d, %s", - pTsdbReadHandle, pCheckInfo->tableId, blockInfo.window.skey, blockInfo.window.ekey, - blockInfo.rows, cur->pos, endPos, pTsdbReadHandle->idStr); + pTsdbReadHandle, pCheckInfo->tableId, blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, + cur->pos, endPos, pTsdbReadHandle->idStr); // compared with the data from in-memory buffer, to generate the correct timestamp array list int32_t numOfRows = 0; - int16_t rv1 = -1; - int16_t rv2 = -1; + int16_t rv1 = -1; + int16_t rv2 = -1; STSchema* pSchema1 = NULL; STSchema* pSchema2 = NULL; @@ -1786,49 +1809,53 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf break; } - if (((pos > endPos || tsArray[pos] > pTsdbReadHandle->window.ekey) && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) || - ((pos < endPos || tsArray[pos] < pTsdbReadHandle->window.ekey) && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) { + if (((pos > endPos || tsArray[pos] > pTsdbReadHandle->window.ekey) && + ASCENDING_TRAVERSE(pTsdbReadHandle->order)) || + ((pos < endPos || tsArray[pos] < pTsdbReadHandle->window.ekey) && + !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) { break; } if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) || (key > tsArray[pos] && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) { if (rv1 != TD_ROW_SVER(row1)) { -// pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1)); + // pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1)); rv1 = TD_ROW_SVER(row1); } - if(row2 && rv2 != TD_ROW_SVER(row2)) { -// pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2)); + if (row2 && rv2 != TD_ROW_SVER(row2)) { + // pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2)); rv2 = TD_ROW_SVER(row2); } - - mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pCheckInfo->tableId, pSchema1, pSchema2, true); + + mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols, + pCheckInfo->tableId, pSchema1, pSchema2, true); numOfRows += 1; if (cur->win.skey == TSKEY_INITIAL_VAL) { cur->win.skey = key; } cur->win.ekey = key; - cur->lastKey = key + step; + cur->lastKey = key + step; cur->mixBlock = true; moveToNextRowInMem(pCheckInfo); } else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it if (pCfg->update) { - if(pCfg->update == TD_ROW_PARTIAL_UPDATE) { + if (pCfg->update == TD_ROW_PARTIAL_UPDATE) { doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, pos, pos); } if (rv1 != TD_ROW_SVER(row1)) { -// pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1)); + // pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1)); rv1 = TD_ROW_SVER(row1); } - if(row2 && rv2 != TD_ROW_SVER(row2)) { -// pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2)); + if (row2 && rv2 != TD_ROW_SVER(row2)) { + // pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2)); rv2 = TD_ROW_SVER(row2); } - + bool forceSetNull = pCfg->update != TD_ROW_PARTIAL_UPDATE; - mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pCheckInfo->tableId, pSchema1, pSchema2, forceSetNull); + mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols, + pCheckInfo->tableId, pSchema1, pSchema2, forceSetNull); numOfRows += 1; if (cur->win.skey == TSKEY_INITIAL_VAL) { cur->win.skey = key; @@ -1844,7 +1871,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf moveToNextRowInMem(pCheckInfo); } } else if ((key > tsArray[pos] && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) || - (key < tsArray[pos] && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) { + (key < tsArray[pos] && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) { if (cur->win.skey == TSKEY_INITIAL_VAL) { cur->win.skey = tsArray[pos]; } @@ -1852,7 +1879,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf int32_t end = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order); assert(end != -1); - if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it + if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it if (pCfg->update == TD_ROW_DISCARD_UPDATE) { moveToNextRowInMem(pCheckInfo); } else { @@ -1866,8 +1893,8 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, qstart, qend); pos += (qend - qstart + 1) * step; - cur->win.ekey = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? tsArray[qend]:tsArray[qstart]; - cur->lastKey = cur->win.ekey + step; + cur->win.ekey = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? tsArray[qend] : tsArray[qstart]; + cur->lastKey = cur->win.ekey + step; } } while (numOfRows < pTsdbReadHandle->outputCapacity); @@ -1892,8 +1919,8 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, start, end); pos += (end - start + 1) * step; - cur->win.ekey = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? tsArray[end]:tsArray[start]; - cur->lastKey = cur->win.ekey + step; + cur->win.ekey = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? tsArray[end] : tsArray[start]; + cur->lastKey = cur->win.ekey + step; cur->mixBlock = true; } } @@ -1911,8 +1938,9 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos); doCheckGeneratedBlockRange(pTsdbReadHandle); - tsdbDebug("%p uid:%" PRIu64", data block created, mixblock:%d, brange:%"PRIu64"-%"PRIu64" rows:%d, %s", - pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows, pTsdbReadHandle->idStr); + tsdbDebug("%p uid:%" PRIu64 ", data block created, mixblock:%d, brange:%" PRIu64 "-%" PRIu64 " rows:%d, %s", + pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows, + pTsdbReadHandle->idStr); } int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) { @@ -2008,7 +2036,7 @@ static int32_t dataBlockOrderCompar(const void* pLeft, const void* pRight, void* STableBlockInfo* pRightBlockInfoEx = &pSupporter->pDataBlockInfo[rightTableIndex][rightTableBlockIndex]; // assert(pLeftBlockInfoEx->compBlock->offset != pRightBlockInfoEx->compBlock->offset); -#if 0 // TODO: temporarily comment off requested by Dr. Liao +#if 0 // TODO: temporarily comment off requested by Dr. Liao if (pLeftBlockInfoEx->compBlock->offset == pRightBlockInfoEx->compBlock->offset && pLeftBlockInfoEx->compBlock->last == pRightBlockInfoEx->compBlock->last) { tsdbError("error in header file, two block with same offset:%" PRId64, (int64_t)pLeftBlockInfoEx->compBlock->offset); @@ -2028,7 +2056,7 @@ static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t nu return TSDB_CODE_TDB_OUT_OF_MEMORY; } - pTsdbReadHandle->pDataBlockInfo = (STableBlockInfo*) tmp; + pTsdbReadHandle->pDataBlockInfo = (STableBlockInfo*)tmp; } memset(pTsdbReadHandle->pDataBlockInfo, 0, size); @@ -2087,18 +2115,18 @@ static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t nu cleanBlockOrderSupporter(&sup, numOfQualTables); tsdbDebug("%p create data blocks info struct completed for 1 table, %d blocks not sorted %s", pTsdbReadHandle, cnt, - pTsdbReadHandle->idStr); + pTsdbReadHandle->idStr); return TSDB_CODE_SUCCESS; } tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pTsdbReadHandle, cnt, - numOfQualTables, pTsdbReadHandle->idStr); + numOfQualTables, pTsdbReadHandle->idStr); assert(cnt <= numOfBlocks && numOfQualTables <= numOfTables); // the pTableQueryInfo[j]->numOfBlocks may be 0 sup.numOfTables = numOfQualTables; SMultiwayMergeTreeInfo* pTree = NULL; - uint8_t ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, dataBlockOrderCompar); + uint8_t ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, dataBlockOrderCompar); if (ret != TSDB_CODE_SUCCESS) { cleanBlockOrderSupporter(&sup, numOfTables); return TSDB_CODE_TDB_OUT_OF_MEMORY; @@ -2137,11 +2165,11 @@ static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t nu static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exists); -static int32_t getDataBlockRv(STsdbReadHandle* pTsdbReadHandle, STableBlockInfo* pNext, bool *exists) { - int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 1 : -1; +static int32_t getDataBlockRv(STsdbReadHandle* pTsdbReadHandle, STableBlockInfo* pNext, bool* exists) { + int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1; SQueryFilePos* cur = &pTsdbReadHandle->cur; - while(1) { + while (1) { int32_t code = loadFileDataBlock(pTsdbReadHandle, pNext->compBlock, pNext->pTableCheckInfo, exists); if (code != TSDB_CODE_SUCCESS || *exists) { return code; @@ -2169,7 +2197,7 @@ static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exi int32_t numOfBlocks = 0; int32_t numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo); - STsdbCfg* pCfg = &pTsdbReadHandle->pTsdb->config; + STsdbCfg* pCfg = &pTsdbReadHandle->pTsdb->config; STimeWindow win = TSWINDOW_INITIALIZER; while (true) { @@ -2219,7 +2247,8 @@ static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exi } // todo return error code to query engine - if ((code = createDataBlocksInfo(pTsdbReadHandle, numOfBlocks, &pTsdbReadHandle->numOfBlocks)) != TSDB_CODE_SUCCESS) { + if ((code = createDataBlocksInfo(pTsdbReadHandle, numOfBlocks, &pTsdbReadHandle->numOfBlocks)) != + TSDB_CODE_SUCCESS) { break; } @@ -2241,7 +2270,7 @@ static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exi } assert(pTsdbReadHandle->pFileGroup != NULL && pTsdbReadHandle->numOfBlocks > 0); - cur->slot = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 0:pTsdbReadHandle->numOfBlocks-1; + cur->slot = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 0 : pTsdbReadHandle->numOfBlocks - 1; cur->fid = pTsdbReadHandle->pFileGroup->fid; STableBlockInfo* pBlockInfo = &pTsdbReadHandle->pDataBlockInfo[cur->slot]; @@ -2254,18 +2283,18 @@ static bool isEndFileDataBlock(SQueryFilePos* cur, int32_t numOfBlocks, bool asc } static void moveToNextDataBlockInCurrentFile(STsdbReadHandle* pTsdbReadHandle) { - int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 1 : -1; + int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1; SQueryFilePos* cur = &pTsdbReadHandle->cur; assert(cur->slot < pTsdbReadHandle->numOfBlocks && cur->slot >= 0); cur->slot += step; - cur->mixBlock = false; + cur->mixBlock = false; cur->blockCompleted = false; } int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo* pTableBlockInfo) { - STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) queryHandle; + STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)queryHandle; pTableBlockInfo->totalSize = 0; pTableBlockInfo->totalRows = 0; @@ -2287,7 +2316,7 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo* int32_t code = TSDB_CODE_SUCCESS; int32_t numOfBlocks = 0; int32_t numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo); - int defaultRows = 4096;//TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock); + int defaultRows = 4096; // TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock); STimeWindow win = TSWINDOW_INITIALIZER; bool ascTraverse = ASCENDING_TRAVERSE(pTsdbReadHandle->order); @@ -2304,7 +2333,8 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo* tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey); // current file are not overlapped with query time window, ignore remain files - if ((ascTraverse && win.skey > pTsdbReadHandle->window.ekey) || (!ascTraverse && win.ekey < pTsdbReadHandle->window.ekey)) { + if ((ascTraverse && win.skey > pTsdbReadHandle->window.ekey) || + (!ascTraverse && win.ekey < pTsdbReadHandle->window.ekey)) { tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb)); tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pTsdbReadHandle, pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr); @@ -2357,9 +2387,9 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo* if (numOfRows < defaultRows) { pTableBlockInfo->numOfSmallBlocks += 1; } -// int32_t stepIndex = (numOfRows-1)/TSDB_BLOCK_DIST_STEP_ROWS; -// SFileBlockInfo *blockInfo = (SFileBlockInfo*)taosArrayGet(pTableBlockInfo->dataBlockInfos, stepIndex); -// blockInfo->numBlocksOfStep++; + // int32_t stepIndex = (numOfRows-1)/TSDB_BLOCK_DIST_STEP_ROWS; + // SFileBlockInfo *blockInfo = (SFileBlockInfo*)taosArrayGet(pTableBlockInfo->dataBlockInfos, stepIndex); + // blockInfo->numBlocksOfStep++; } } } @@ -2416,7 +2446,7 @@ static int32_t getDataBlocksInFiles(STsdbReadHandle* pTsdbReadHandle, bool* exis static bool doHasDataInBuffer(STsdbReadHandle* pTsdbReadHandle) { size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo); - + while (pTsdbReadHandle->activeIndex < numOfTables) { if (hasMoreDataInCache(pTsdbReadHandle)) { return true; @@ -2428,23 +2458,23 @@ static bool doHasDataInBuffer(STsdbReadHandle* pTsdbReadHandle) { return false; } -//todo not unref yet, since it is not support multi-group interpolation query +// todo not unref yet, since it is not support multi-group interpolation query static UNUSED_FUNC void changeQueryHandleForInterpQuery(tsdbReaderT pHandle) { // filter the queried time stamp in the first place - STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) pHandle; + STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle; // starts from the buffer in case of descending timestamp order check data blocks size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo); int32_t i = 0; - while(i < numOfTables) { + while (i < numOfTables) { STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i); // the first qualified table for interpolation query -// if ((pTsdbReadHandle->window.skey <= pCheckInfo->pTableObj->lastKey) && -// (pCheckInfo->pTableObj->lastKey != TSKEY_INITIAL_VAL)) { -// break; -// } + // if ((pTsdbReadHandle->window.skey <= pCheckInfo->pTableObj->lastKey) && + // (pCheckInfo->pTableObj->lastKey != TSKEY_INITIAL_VAL)) { + // break; + // } i++; } @@ -2454,7 +2484,7 @@ static UNUSED_FUNC void changeQueryHandleForInterpQuery(tsdbReaderT pHandle) { return; } - STableCheckInfo info = *(STableCheckInfo*) taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i); + STableCheckInfo info = *(STableCheckInfo*)taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i); taosArrayClear(pTsdbReadHandle->pTableCheckInfo); info.lastKey = pTsdbReadHandle->window.skey; @@ -2463,13 +2493,13 @@ static UNUSED_FUNC void changeQueryHandleForInterpQuery(tsdbReaderT pHandle) { static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win, STsdbReadHandle* pTsdbReadHandle) { - int numOfRows = 0; - int32_t numOfCols = (int32_t)taosArrayGetSize(pTsdbReadHandle->pColumns); + int numOfRows = 0; + int32_t numOfCols = (int32_t)taosArrayGetSize(pTsdbReadHandle->pColumns); STsdbCfg* pCfg = &pTsdbReadHandle->pTsdb->config; win->skey = TSKEY_INITIAL_VAL; - int64_t st = taosGetTimestampUs(); - int16_t rv = -1; + int64_t st = taosGetTimestampUs(); + int16_t rv = -1; STSchema* pSchema = NULL; do { @@ -2479,9 +2509,10 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int } TSKEY key = TD_ROW_KEY(row); - if ((key > maxKey && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) || (key < maxKey && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) { - tsdbDebug("%p key:%"PRIu64" beyond qrange:%"PRId64" - %"PRId64", no more data in buffer", pTsdbReadHandle, key, pTsdbReadHandle->window.skey, - pTsdbReadHandle->window.ekey); + if ((key > maxKey && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) || + (key < maxKey && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) { + tsdbDebug("%p key:%" PRIu64 " beyond qrange:%" PRId64 " - %" PRId64 ", no more data in buffer", pTsdbReadHandle, + key, pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey); break; } @@ -2495,14 +2526,15 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int pSchema = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, pCheckInfo->tableId, 0); rv = TD_ROW_SVER(row); } - mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, numOfRows, row, NULL, numOfCols, pCheckInfo->tableId, pSchema, NULL, true); + mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, numOfRows, row, NULL, numOfCols, pCheckInfo->tableId, pSchema, + NULL, true); if (++numOfRows >= maxRowsToRead) { moveToNextRowInMem(pCheckInfo); break; } - } while(moveToNextRowInMem(pCheckInfo)); + } while (moveToNextRowInMem(pCheckInfo)); assert(numOfRows <= maxRowsToRead); @@ -2510,15 +2542,16 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order) && numOfRows < maxRowsToRead) { int32_t emptySize = maxRowsToRead - numOfRows; - for(int32_t i = 0; i < numOfCols; ++i) { + for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i); - memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes); + memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes, + numOfRows * pColInfo->info.bytes); } } int64_t elapsedTime = taosGetTimestampUs() - st; - tsdbDebug("%p build data block from cache completed, elapsed time:%"PRId64" us, numOfRows:%d, numOfCols:%d, %s", pTsdbReadHandle, - elapsedTime, numOfRows, numOfCols, pTsdbReadHandle->idStr); + tsdbDebug("%p build data block from cache completed, elapsed time:%" PRId64 " us, numOfRows:%d, numOfCols:%d, %s", + pTsdbReadHandle, elapsedTime, numOfRows, numOfCols, pTsdbReadHandle->idStr); return numOfRows; } @@ -2545,20 +2578,20 @@ static void destroyHelper(void* param) { return; } -// tQueryInfo* pInfo = (tQueryInfo*)param; -// if (pInfo->optr != TSDB_RELATION_IN) { -// taosMemoryFreeClear(pInfo->q); -// } else { -// taosHashCleanup((SHashObj *)(pInfo->q)); -// } + // tQueryInfo* pInfo = (tQueryInfo*)param; + // if (pInfo->optr != TSDB_RELATION_IN) { + // taosMemoryFreeClear(pInfo->q); + // } else { + // taosHashCleanup((SHashObj *)(pInfo->q)); + // } taosMemoryFree(param); } -#define TSDB_PREV_ROW 0x1 -#define TSDB_NEXT_ROW 0x2 +#define TSDB_PREV_ROW 0x1 +#define TSDB_NEXT_ROW 0x2 -static bool loadBlockOfActiveTable(STsdbReadHandle* pTsdbReadHandle) { +static bool loadBlockOfActiveTable(STsdbReadHandle* pTsdbReadHandle) { if (pTsdbReadHandle->checkFiles) { // check if the query range overlaps with the file data block bool exists = true; @@ -2570,13 +2603,13 @@ static bool loadBlockOfActiveTable(STsdbReadHandle* pTsdbReadHandle) { } if (exists) { - tsdbRetrieveDataBlock((tsdbReaderT*) pTsdbReadHandle, NULL); + tsdbRetrieveDataBlock((tsdbReaderT*)pTsdbReadHandle, NULL); if (pTsdbReadHandle->currentLoadExternalRows && pTsdbReadHandle->window.skey == pTsdbReadHandle->window.ekey) { SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, 0); assert(*(int64_t*)pColInfo->pData == pTsdbReadHandle->window.skey); } - pTsdbReadHandle->currentLoadExternalRows = false; // clear the flag, since the exact matched row is found. + pTsdbReadHandle->currentLoadExternalRows = false; // clear the flag, since the exact matched row is found. return exists; } @@ -2589,16 +2622,17 @@ static bool loadBlockOfActiveTable(STsdbReadHandle* pTsdbReadHandle) { } // current result is empty - if (pTsdbReadHandle->currentLoadExternalRows && pTsdbReadHandle->window.skey == pTsdbReadHandle->window.ekey && pTsdbReadHandle->cur.rows == 0) { -// STsdbMemTable* pMemRef = pTsdbReadHandle->pMemTable; + if (pTsdbReadHandle->currentLoadExternalRows && pTsdbReadHandle->window.skey == pTsdbReadHandle->window.ekey && + pTsdbReadHandle->cur.rows == 0) { + // STsdbMemTable* pMemRef = pTsdbReadHandle->pMemTable; -// doGetExternalRow(pTsdbReadHandle, TSDB_PREV_ROW, pMemRef); -// doGetExternalRow(pTsdbReadHandle, TSDB_NEXT_ROW, pMemRef); + // doGetExternalRow(pTsdbReadHandle, TSDB_PREV_ROW, pMemRef); + // doGetExternalRow(pTsdbReadHandle, TSDB_NEXT_ROW, pMemRef); bool result = tsdbGetExternalRow(pTsdbReadHandle); -// pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev); -// pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next); + // pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev); + // pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next); pTsdbReadHandle->currentLoadExternalRows = false; return result; @@ -2610,30 +2644,31 @@ static bool loadBlockOfActiveTable(STsdbReadHandle* pTsdbReadHandle) { static bool loadCachedLastRow(STsdbReadHandle* pTsdbReadHandle) { // the last row is cached in buffer, return it directly. // here note that the pTsdbReadHandle->window must be the TS_INITIALIZER - int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle)); + int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle)); size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo); assert(numOfTables > 0 && numOfCols > 0); SQueryFilePos* cur = &pTsdbReadHandle->cur; - STSRow* pRow = NULL; - TSKEY key = TSKEY_INITIAL_VAL; - int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 1:-1; + STSRow* pRow = NULL; + TSKEY key = TSKEY_INITIAL_VAL; + int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1; if (++pTsdbReadHandle->activeIndex < numOfTables) { STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex); -// int32_t ret = tsdbGetCachedLastRow(pCheckInfo->pTableObj, &pRow, &key); -// if (ret != TSDB_CODE_SUCCESS) { -// return false; -// } - mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, 0, pRow, NULL, numOfCols, pCheckInfo->tableId, NULL, NULL, true); + // int32_t ret = tsdbGetCachedLastRow(pCheckInfo->pTableObj, &pRow, &key); + // if (ret != TSDB_CODE_SUCCESS) { + // return false; + // } + mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, 0, pRow, NULL, numOfCols, pCheckInfo->tableId, + NULL, NULL, true); taosMemoryFreeClear(pRow); // update the last key value pCheckInfo->lastKey = key + step; - cur->rows = 1; // only one row - cur->lastKey = key + step; + cur->rows = 1; // only one row + cur->lastKey = key + step; cur->mixBlock = true; cur->win.skey = key; cur->win.ekey = key; @@ -2644,9 +2679,7 @@ static bool loadCachedLastRow(STsdbReadHandle* pTsdbReadHandle) { return false; } - - -//static bool loadCachedLast(STsdbReadHandle* pTsdbReadHandle) { +// static bool loadCachedLast(STsdbReadHandle* pTsdbReadHandle) { // // the last row is cached in buffer, return it directly. // // here note that the pTsdbReadHandle->window must be the TS_INITIALIZER // int32_t tgNumOfCols = (int32_t)QH_GET_NUM_OF_COLS(pTsdbReadHandle); @@ -2666,8 +2699,8 @@ static bool loadCachedLastRow(STsdbReadHandle* pTsdbReadHandle) { // int32_t numOfCols = pTable->maxColNum; // // if (pTable->lastCols == NULL || pTable->maxColNum <= 0) { -// tsdbWarn("no last cached for table %s, uid:%" PRIu64 ",tid:%d", pTable->name->data, pTable->uid, pTable->tableId); -// continue; +// tsdbWarn("no last cached for table %s, uid:%" PRIu64 ",tid:%d", pTable->name->data, pTable->uid, +// pTable->tableId); continue; // } // // int32_t i = 0, j = 0; @@ -2802,7 +2835,7 @@ static bool loadDataBlockFromTableSeq(STsdbReadHandle* pTsdbReadHandle) { int64_t stime = taosGetTimestampUs(); - while(pTsdbReadHandle->activeIndex < numOfTables) { + while (pTsdbReadHandle->activeIndex < numOfTables) { if (loadBlockOfActiveTable(pTsdbReadHandle)) { return true; } @@ -2812,8 +2845,8 @@ static bool loadDataBlockFromTableSeq(STsdbReadHandle* pTsdbReadHandle) { pTsdbReadHandle->activeIndex += 1; pTsdbReadHandle->locateStart = false; - pTsdbReadHandle->checkFiles = true; - pTsdbReadHandle->cur.rows = 0; + pTsdbReadHandle->checkFiles = true; + pTsdbReadHandle->cur.rows = 0; pTsdbReadHandle->currentLoadExternalRows = pTsdbReadHandle->loadExternalRow; terrno = TSDB_CODE_SUCCESS; @@ -2827,15 +2860,16 @@ static bool loadDataBlockFromTableSeq(STsdbReadHandle* pTsdbReadHandle) { // handle data in cache situation bool tsdbNextDataBlock(tsdbReaderT pHandle) { - STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) pHandle; + STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle; - for(int32_t i = 0; i < taosArrayGetSize(pTsdbReadHandle->pColumns); ++i) { + for (int32_t i = 0; i < taosArrayGetSize(pTsdbReadHandle->pColumns); ++i) { SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i); colInfoDataCleanup(pColInfo, pTsdbReadHandle->outputCapacity); } if (emptyQueryTimewindow(pTsdbReadHandle)) { - tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pTsdbReadHandle, pTsdbReadHandle->idStr); + tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pTsdbReadHandle, + pTsdbReadHandle->idStr); return false; } @@ -2845,15 +2879,15 @@ bool tsdbNextDataBlock(tsdbReaderT pHandle) { // TODO refactor: remove "type" if (pTsdbReadHandle->type == TSDB_QUERY_TYPE_LAST) { if (pTsdbReadHandle->cachelastrow == TSDB_CACHED_TYPE_LASTROW) { -// return loadCachedLastRow(pTsdbReadHandle); + // return loadCachedLastRow(pTsdbReadHandle); } else if (pTsdbReadHandle->cachelastrow == TSDB_CACHED_TYPE_LAST) { -// return loadCachedLast(pTsdbReadHandle); + // return loadCachedLast(pTsdbReadHandle); } } if (pTsdbReadHandle->loadType == BLOCK_LOAD_TABLE_SEQ_ORDER) { return loadDataBlockFromTableSeq(pTsdbReadHandle); - } else { // loadType == RR and Offset Order + } else { // loadType == RR and Offset Order if (pTsdbReadHandle->checkFiles) { // check if the query range overlaps with the file data block bool exists = true; @@ -2885,7 +2919,7 @@ bool tsdbNextDataBlock(tsdbReaderT pHandle) { } } -//static int32_t doGetExternalRow(STsdbReadHandle* pTsdbReadHandle, int16_t type, STsdbMemTable* pMemRef) { +// static int32_t doGetExternalRow(STsdbReadHandle* pTsdbReadHandle, int16_t type, STsdbMemTable* pMemRef) { // STsdbReadHandle* pSecQueryHandle = NULL; // // if (type == TSDB_PREV_ROW && pTsdbReadHandle->prev) { @@ -2990,14 +3024,14 @@ bool tsdbNextDataBlock(tsdbReaderT pHandle) { // memcpy((char*)pCol->pData, (char*)s->pData + s->info.bytes * pos, pCol->info.bytes); // } // -//out_of_memory: +// out_of_memory: // tsdbCleanupReadHandle(pSecQueryHandle); // return terrno; //} bool tsdbGetExternalRow(tsdbReaderT pHandle) { - STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) pHandle; - SQueryFilePos* cur = &pTsdbReadHandle->cur; + STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle; + SQueryFilePos* cur = &pTsdbReadHandle->cur; cur->fid = INT32_MIN; cur->mixBlock = true; @@ -3006,7 +3040,7 @@ bool tsdbGetExternalRow(tsdbReaderT pHandle) { return false; } - int32_t numOfCols = (int32_t) QH_GET_NUM_OF_COLS(pTsdbReadHandle); + int32_t numOfCols = (int32_t)QH_GET_NUM_OF_COLS(pTsdbReadHandle); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pTsdbReadHandle->pColumns, i); SColumnInfoData* first = taosArrayGet(pTsdbReadHandle->prev, i); @@ -3053,36 +3087,36 @@ bool tsdbGetExternalRow(tsdbReaderT pHandle) { //} bool isTsdbCacheLastRow(tsdbReaderT* pReader) { - return ((STsdbReadHandle *)pReader)->cachelastrow > TSDB_CACHED_TYPE_NONE; + return ((STsdbReadHandle*)pReader)->cachelastrow > TSDB_CACHED_TYPE_NONE; } -int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo *groupList) { +int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo* groupList) { assert(pTsdbReadHandle != NULL && groupList != NULL); -// TSKEY key = TSKEY_INITIAL_VAL; -// -// SArray* group = taosArrayGetP(groupList->pGroupList, 0); -// assert(group != NULL); -// -// STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(group, 0); -// -// int32_t code = 0; -// -// if (((STable*)pInfo->pTable)->lastRow) { -// code = tsdbGetCachedLastRow(pInfo->pTable, NULL, &key); -// if (code != TSDB_CODE_SUCCESS) { -// pTsdbReadHandle->cachelastrow = TSDB_CACHED_TYPE_NONE; -// } else { -// pTsdbReadHandle->cachelastrow = TSDB_CACHED_TYPE_LASTROW; -// } -// } -// -// // update the tsdb query time range -// if (pTsdbReadHandle->cachelastrow != TSDB_CACHED_TYPE_NONE) { -// pTsdbReadHandle->window = TSWINDOW_INITIALIZER; -// pTsdbReadHandle->checkFiles = false; -// pTsdbReadHandle->activeIndex = -1; // start from -1 -// } + // TSKEY key = TSKEY_INITIAL_VAL; + // + // SArray* group = taosArrayGetP(groupList->pGroupList, 0); + // assert(group != NULL); + // + // STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(group, 0); + // + // int32_t code = 0; + // + // if (((STable*)pInfo->pTable)->lastRow) { + // code = tsdbGetCachedLastRow(pInfo->pTable, NULL, &key); + // if (code != TSDB_CODE_SUCCESS) { + // pTsdbReadHandle->cachelastrow = TSDB_CACHED_TYPE_NONE; + // } else { + // pTsdbReadHandle->cachelastrow = TSDB_CACHED_TYPE_LASTROW; + // } + // } + // + // // update the tsdb query time range + // if (pTsdbReadHandle->cachelastrow != TSDB_CACHED_TYPE_NONE) { + // pTsdbReadHandle->window = TSWINDOW_INITIALIZER; + // pTsdbReadHandle->checkFiles = false; + // pTsdbReadHandle->activeIndex = -1; // start from -1 + // } return TSDB_CODE_SUCCESS; } @@ -3091,21 +3125,20 @@ int32_t checkForCachedLast(STsdbReadHandle* pTsdbReadHandle) { assert(pTsdbReadHandle != NULL); int32_t code = 0; -// if (pTsdbReadHandle->pTsdb && atomic_load_8(&pTsdbReadHandle->pTsdb->hasCachedLastColumn)){ -// pTsdbReadHandle->cachelastrow = TSDB_CACHED_TYPE_LAST; -// } + // if (pTsdbReadHandle->pTsdb && atomic_load_8(&pTsdbReadHandle->pTsdb->hasCachedLastColumn)){ + // pTsdbReadHandle->cachelastrow = TSDB_CACHED_TYPE_LAST; + // } // update the tsdb query time range if (pTsdbReadHandle->cachelastrow) { - pTsdbReadHandle->checkFiles = false; + pTsdbReadHandle->checkFiles = false; pTsdbReadHandle->activeIndex = -1; // start from -1 } return code; } - -STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList) { +STimeWindow updateLastrowForEachGroup(STableGroupInfo* groupList) { STimeWindow window = {INT64_MAX, INT64_MIN}; int32_t totalNumOfTable = 0; @@ -3113,24 +3146,24 @@ STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList) { // NOTE: starts from the buffer in case of descending timestamp order check data blocks size_t numOfGroups = taosArrayGetSize(groupList->pGroupList); - for(int32_t j = 0; j < numOfGroups; ++j) { + for (int32_t j = 0; j < numOfGroups; ++j) { SArray* pGroup = taosArrayGetP(groupList->pGroupList, j); TSKEY key = TSKEY_INITIAL_VAL; STableKeyInfo keyInfo = {0}; size_t numOfTables = taosArrayGetSize(pGroup); - for(int32_t i = 0; i < numOfTables; ++i) { - STableKeyInfo* pInfo = (STableKeyInfo*) taosArrayGet(pGroup, i); + for (int32_t i = 0; i < numOfTables; ++i) { + STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(pGroup, i); // if the lastKey equals to INT64_MIN, there is no data in this table - TSKEY lastKey = 0;//((STable*)(pInfo->pTable))->lastKey; + TSKEY lastKey = 0; //((STable*)(pInfo->pTable))->lastKey; if (key < lastKey) { key = lastKey; -// keyInfo.pTable = pInfo->pTable; + // keyInfo.pTable = pInfo->pTable; keyInfo.lastKey = key; - pInfo->lastKey = key; + pInfo->lastKey = key; if (key < window.skey) { window.skey = key; @@ -3143,18 +3176,18 @@ STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList) { } // more than one table in each group, only one table left for each group -// if (keyInfo.pTable != NULL) { -// totalNumOfTable++; -// if (taosArrayGetSize(pGroup) == 1) { -// // do nothing -// } else { -// taosArrayClear(pGroup); -// taosArrayPush(pGroup, &keyInfo); -// } -// } else { // mark all the empty groups, and remove it later -// taosArrayDestroy(pGroup); -// taosArrayPush(emptyGroup, &j); -// } + // if (keyInfo.pTable != NULL) { + // totalNumOfTable++; + // if (taosArrayGetSize(pGroup) == 1) { + // // do nothing + // } else { + // taosArrayClear(pGroup); + // taosArrayPush(pGroup, &keyInfo); + // } + // } else { // mark all the empty groups, and remove it later + // taosArrayDestroy(pGroup); + // taosArrayPush(emptyGroup, &j); + // } } // window does not being updated, so set the original @@ -3163,7 +3196,7 @@ STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList) { assert(totalNumOfTable == 0 && taosArrayGetSize(groupList->pGroupList) == numOfGroups); } - taosArrayRemoveBatch(groupList->pGroupList, TARRAY_GET_START(emptyGroup), (int32_t) taosArrayGetSize(emptyGroup)); + taosArrayRemoveBatch(groupList->pGroupList, TARRAY_GET_START(emptyGroup), (int32_t)taosArrayGetSize(emptyGroup)); taosArrayDestroy(emptyGroup); groupList->numOfTables = totalNumOfTable; @@ -3172,7 +3205,7 @@ STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList) { void tsdbRetrieveDataBlockInfo(tsdbReaderT* pTsdbReadHandle, SDataBlockInfo* pDataBlockInfo) { STsdbReadHandle* pHandle = (STsdbReadHandle*)pTsdbReadHandle; - SQueryFilePos* cur = &pHandle->cur; + SQueryFilePos* cur = &pHandle->cur; uint64_t uid = 0; @@ -3185,11 +3218,12 @@ void tsdbRetrieveDataBlockInfo(tsdbReaderT* pTsdbReadHandle, SDataBlockInfo* pDa uid = pCheckInfo->tableId; } - tsdbDebug("data block generated, uid:%"PRIu64" numOfRows:%d, tsrange:%"PRId64" - %"PRId64" %s", uid, cur->rows, cur->win.skey, - cur->win.ekey, pHandle->idStr); + tsdbDebug("data block generated, uid:%" PRIu64 " numOfRows:%d, tsrange:%" PRId64 " - %" PRId64 " %s", uid, cur->rows, + cur->win.skey, cur->win.ekey, pHandle->idStr); -// pDataBlockInfo->uid = uid; // block Id may be over write by assigning uid fro this data block. Do NOT assign the table uid - pDataBlockInfo->rows = cur->rows; + // pDataBlockInfo->uid = uid; // block Id may be over write by assigning uid fro this data block. Do NOT assign + // the table uid + pDataBlockInfo->rows = cur->rows; pDataBlockInfo->window = cur->win; pDataBlockInfo->numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pHandle)); } @@ -3198,7 +3232,7 @@ void tsdbRetrieveDataBlockInfo(tsdbReaderT* pTsdbReadHandle, SDataBlockInfo* pDa * return null for mixed data block, if not a complete file data block, the statistics value will always return NULL */ int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SDataStatis** pBlockStatis) { - STsdbReadHandle* pHandle = (STsdbReadHandle*) pTsdbReadHandle; + STsdbReadHandle* pHandle = (STsdbReadHandle*)pTsdbReadHandle; SQueryFilePos* c = &pHandle->cur; if (c->mixBlock) { @@ -3228,7 +3262,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SDataStati size_t numOfCols = QH_GET_NUM_OF_COLS(pHandle); memset(pHandle->statis, 0, numOfCols * sizeof(SDataStatis)); - for(int32_t i = 0; i < numOfCols; ++i) { + for (int32_t i = 0; i < numOfCols; ++i) { pHandle->statis[i].colId = colIds[i]; } @@ -3242,9 +3276,9 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SDataStati pPrimaryColStatis->min = pBlockInfo->compBlock->keyFirst; pPrimaryColStatis->max = pBlockInfo->compBlock->keyLast; - //update the number of NULL data rows - for(int32_t i = 1; i < numOfCols; ++i) { - if (pHandle->statis[i].numOfNull == -1) { // set the column data are all NULL + // update the number of NULL data rows + for (int32_t i = 1; i < numOfCols; ++i) { + if (pHandle->statis[i].numOfNull == -1) { // set the column data are all NULL pHandle->statis[i].numOfNull = pBlockInfo->compBlock->numOfRows; } } @@ -3294,9 +3328,10 @@ SArray* tsdbRetrieveDataBlock(tsdbReaderT* pTsdbReadHandle, SArray* pIdList) { int32_t emptySize = pHandle->outputCapacity - numOfRows; int32_t reqNumOfCols = (int32_t)taosArrayGetSize(pHandle->pColumns); - for(int32_t i = 0; i < reqNumOfCols; ++i) { + for (int32_t i = 0; i < reqNumOfCols; ++i) { SColumnInfoData* pColInfo = taosArrayGet(pHandle->pColumns, i); - memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes); + memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes, + numOfRows * pColInfo->info.bytes); } } @@ -3352,7 +3387,7 @@ void filterPrepare(void* expr, void* param) { #endif -static int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) { +static int32_t tableGroupComparFn(const void* p1, const void* p2, const void* param) { #if 0 STableGroupSupporter* pTableGroupSupp = (STableGroupSupporter*) param; STable* pTable1 = ((STableKeyInfo*) p1)->uid; @@ -3449,7 +3484,8 @@ void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTable taosArrayPush(pGroups, &g); } -SArray* createTableGroup(SArray* pTableList, SSchemaWrapper* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols, TSKEY skey) { +SArray* createTableGroup(SArray* pTableList, SSchemaWrapper* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols, + TSKEY skey) { assert(pTableList != NULL); SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES); @@ -3459,7 +3495,7 @@ SArray* createTableGroup(SArray* pTableList, SSchemaWrapper* pTagSchema, SColInd return pTableGroup; } - if (numOfOrderCols == 0 || size == 1) { // no group by tags clause or only one table + if (numOfOrderCols == 0 || size == 1) { // no group by tags clause or only one table SArray* sa = taosArrayDup(pTableList); if (sa == NULL) { taosArrayDestroy(pTableGroup); @@ -3481,7 +3517,7 @@ SArray* createTableGroup(SArray* pTableList, SSchemaWrapper* pTagSchema, SColInd return pTableGroup; } -//static bool tableFilterFp(const void* pNode, void* param) { +// static bool tableFilterFp(const void* pNode, void* param) { // tQueryInfo* pInfo = (tQueryInfo*) param; // // STable* pTable = (STable*)(SL_GET_NODE_DATA((SSkipListNode*)pNode)); @@ -3566,15 +3602,16 @@ SArray* createTableGroup(SArray* pTableList, SSchemaWrapper* pTagSchema, SColInd // return true; //} -//static void getTableListfromSkipList(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp *param); +// static void getTableListfromSkipList(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp +// *param); -//static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) { -// // query according to the expression tree +// static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) { +// // // query according to the expression tree // SExprTraverseSupp supp = { -// .nodeFilterFn = (__result_filter_fn_t) tableFilterFp, +// .nodeFilterFn = (__result_filter_fn_t)tableFilterFp, // .setupInfoFn = filterPrepare, // .pExtInfo = pSTable->tagSchema, -// }; +// }; // // getTableListfromSkipList(pExpr, pSTable->pIndex, pRes, &supp); // tExprTreeDestroy(pExpr, destroyHelper); @@ -3586,19 +3623,20 @@ int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const ch SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId, uint64_t taskId) { STbCfg* pTbCfg = metaGetTbInfoByUid(pMeta, uid); if (pTbCfg == NULL) { - tsdbError("%p failed to get stable, uid:%"PRIu64", TID:0x%"PRIx64" QID:0x%"PRIx64, pMeta, uid, taskId, reqId); + tsdbError("%p failed to get stable, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId, reqId); terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; goto _error; } if (pTbCfg->type != META_SUPER_TABLE) { - tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", TID:0x%"PRIx64" QID:0x%"PRIx64, pMeta, uid, taskId, reqId); - terrno = TSDB_CODE_OPS_NOT_SUPPORT; //basically, this error is caused by invalid sql issued by client + tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId, + reqId); + terrno = TSDB_CODE_OPS_NOT_SUPPORT; // basically, this error is caused by invalid sql issued by client goto _error; } - //NOTE: not add ref count for super table - SArray* res = taosArrayInit(8, sizeof(STableKeyInfo)); + // NOTE: not add ref count for super table + SArray* res = taosArrayInit(8, sizeof(STableKeyInfo)); SSchemaWrapper* pTagSchema = metaGetTableSchema(pMeta, uid, 0, true); // no tags and tbname condition, all child tables of this stable are involved @@ -3608,66 +3646,44 @@ int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const ch goto _error; } - pGroupInfo->numOfTables = (uint32_t) taosArrayGetSize(res); - pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey); + pGroupInfo->numOfTables = (uint32_t)taosArrayGetSize(res); + pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey); - tsdbDebug("%p no table name/tag condition, all tables qualified, numOfTables:%u, group:%zu, TID:0x%"PRIx64" QID:0x%"PRIx64, pMeta, - pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList), taskId, reqId); + tsdbDebug("%p no table name/tag condition, all tables qualified, numOfTables:%u, group:%zu, TID:0x%" PRIx64 + " QID:0x%" PRIx64, + pMeta, pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList), taskId, reqId); taosArrayDestroy(res); return ret; } int32_t ret = TSDB_CODE_SUCCESS; -// tExprNode* expr = NULL; -// -// TRY(TSDB_MAX_TAG_CONDITIONS) { -// expr = exprTreeFromTableName(tbnameCond); -// if (expr == NULL) { -// expr = exprTreeFromBinary(pTagCond, len); -// } else { -// CLEANUP_PUSH_VOID_PTR_PTR(true, tExprTreeDestroy, expr, NULL); -// tExprNode* tagExpr = exprTreeFromBinary(pTagCond, len); -// if (tagExpr != NULL) { -// CLEANUP_PUSH_VOID_PTR_PTR(true, tExprTreeDestroy, tagExpr, NULL); -// tExprNode* tbnameExpr = expr; -// expr = taosMemoryCalloc(1, sizeof(tExprNode)); -// if (expr == NULL) { -// THROW( TSDB_CODE_TDB_OUT_OF_MEMORY ); -// } -// expr->nodeType = TSQL_NODE_EXPR; -// expr->_node.optr = (uint8_t)tagNameRelType; -// expr->_node.pLeft = tagExpr; -// expr->_node.pRight = tbnameExpr; -// } -// } -// CLEANUP_EXECUTE(); -// -// } CATCH( code ) { -// CLEANUP_EXECUTE(); -// terrno = code; -// tsdbUnlockRepoMeta(tsdb); // unlock tsdb in any cases -// -// goto _error; -// // TODO: more error handling -// } END_TRY -// -// doQueryTableList(pTable, res, expr); -// pGroupInfo->numOfTables = (uint32_t)taosArrayGetSize(res); -// pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey); -// -// tsdbDebug("%p stable tid:%d, uid:%"PRIu64" query, numOfTables:%u, belong to %" PRIzu " groups", tsdb, pTable->tableId, -// pTable->uid, pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList)); -// -// taosArrayDestroy(res); -// -// if (tsdbUnlockRepoMeta(tsdb) < 0) goto _error; -// return ret; - _error: + void* filterInfo = taosMemoryCalloc(1, sizeof(SFilterInfo)); + ret = filterInitFromNode((SNode*)pTagCond, *filterInfo, 0); + if (ret != TSDB_CODE_SUCCESS) { + terrno = ret; + return ret; + } + ret = tsdbQueryTableList(pMeta, res, filterInfo); + pGroupInfo->numOfTables = (uint32_t)taosArrayGetSize(res); + pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey); + + tsdbDebug("%p stable tid:%d, uid:%" PRIu64 " query, numOfTables:%u, belong to %" PRIzu " groups", tsdb, + pTable->tableId, pTable->uid, pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList)); + + taosArrayDestroy(res); + return ret; + +_error: return terrno; } +int32_t tsdbQueryTableList(void* pMeta, SArray* pRes, void* filterInfo) { + // impl later + + return TSDB_CODE_SUCCESS; +} int32_t tsdbGetOneTableGroup(void* pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo* pGroupInfo) { STbCfg* pTbCfg = metaGetTbInfoByUid(pMeta, uid); if (pTbCfg == NULL) { @@ -3686,7 +3702,7 @@ int32_t tsdbGetOneTableGroup(void* pMeta, uint64_t uid, TSKEY startKey, STableGr taosArrayPush(pGroupInfo->pGroupList, &group); return TSDB_CODE_SUCCESS; - _error: +_error: return terrno; } @@ -3765,7 +3781,6 @@ static void* destroyTableCheckInfo(SArray* pTableCheckInfo) { return NULL; } - void tsdbCleanupReadHandle(tsdbReaderT queryHandle) { STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)queryHandle; if (pTsdbReadHandle == NULL) { @@ -3779,7 +3794,7 @@ void tsdbCleanupReadHandle(tsdbReaderT queryHandle) { taosMemoryFreeClear(pTsdbReadHandle->statis); if (!emptyQueryTimewindow(pTsdbReadHandle)) { -// tsdbMayUnTakeMemSnapshot(pTsdbReadHandle); + // tsdbMayUnTakeMemSnapshot(pTsdbReadHandle); } else { assert(pTsdbReadHandle->pTableCheckInfo == NULL); } @@ -3798,8 +3813,10 @@ void tsdbCleanupReadHandle(tsdbReaderT queryHandle) { SIOCostSummary* pCost = &pTsdbReadHandle->cost; - tsdbDebug("%p :io-cost summary: head-file read cnt:%"PRIu64", head-file time:%"PRIu64" us, statis-info:%"PRId64" us, datablock:%" PRId64" us, check data:%"PRId64" us, %s", - pTsdbReadHandle, pCost->headFileLoad, pCost->headFileLoadTime, pCost->statisInfoLoadTime, pCost->blockLoadTime, pCost->checkForNextTime, pTsdbReadHandle->idStr); + tsdbDebug("%p :io-cost summary: head-file read cnt:%" PRIu64 ", head-file time:%" PRIu64 " us, statis-info:%" PRId64 + " us, datablock:%" PRId64 " us, check data:%" PRId64 " us, %s", + pTsdbReadHandle, pCost->headFileLoad, pCost->headFileLoadTime, pCost->statisInfoLoadTime, + pCost->blockLoadTime, pCost->checkForNextTime, pTsdbReadHandle->idStr); taosMemoryFreeClear(pTsdbReadHandle); } @@ -4053,37 +4070,37 @@ static void queryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, S } // Apply the filter expression to each node in the skiplist to acquire the qualified nodes in skip list -void getTableListfromSkipList(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp *param) { - if (pExpr == NULL) { - return; - } - - tExprNode *pLeft = pExpr->_node.pLeft; - tExprNode *pRight = pExpr->_node.pRight; - - // column project - if (pLeft->nodeType != TSQL_NODE_EXPR && pRight->nodeType != TSQL_NODE_EXPR) { - assert(pLeft->nodeType == TSQL_NODE_COL && (pRight->nodeType == TSQL_NODE_VALUE || pRight->nodeType == TSQL_NODE_DUMMY)); - - param->setupInfoFn(pExpr, param->pExtInfo); - - tQueryInfo *pQueryInfo = pExpr->_node.info; - if (pQueryInfo->indexed && (pQueryInfo->optr != TSDB_RELATION_LIKE - && pQueryInfo->optr != TSDB_RELATION_MATCH && pQueryInfo->optr != TSDB_RELATION_NMATCH - && pQueryInfo->optr != TSDB_RELATION_IN)) { - queryIndexedColumn(pSkipList, pQueryInfo, result); - } else { - queryIndexlessColumn(pSkipList, pQueryInfo, result, param->nodeFilterFn); - } - - return; - } - - // The value of hasPK is always 0. - uint8_t weight = pLeft->_node.hasPK + pRight->_node.hasPK; - assert(weight == 0 && pSkipList != NULL && taosArrayGetSize(result) == 0); - - //apply the hierarchical filter expression to every node in skiplist to find the qualified nodes - applyFilterToSkipListNode(pSkipList, pExpr, result, param); -} +//void getTableListfromSkipList(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp *param) { +// if (pExpr == NULL) { +// return; +// } +// +// tExprNode *pLeft = pExpr->_node.pLeft; +// tExprNode *pRight = pExpr->_node.pRight; +// +// // column project +// if (pLeft->nodeType != TSQL_NODE_EXPR && pRight->nodeType != TSQL_NODE_EXPR) { +// assert(pLeft->nodeType == TSQL_NODE_COL && (pRight->nodeType == TSQL_NODE_VALUE || pRight->nodeType == TSQL_NODE_DUMMY)); +// +// param->setupInfoFn(pExpr, param->pExtInfo); +// +// tQueryInfo *pQueryInfo = pExpr->_node.info; +// if (pQueryInfo->indexed && (pQueryInfo->optr != TSDB_RELATION_LIKE +// && pQueryInfo->optr != TSDB_RELATION_MATCH && pQueryInfo->optr != TSDB_RELATION_NMATCH +// && pQueryInfo->optr != TSDB_RELATION_IN)) { +// queryIndexedColumn(pSkipList, pQueryInfo, result); +// } else { +// queryIndexlessColumn(pSkipList, pQueryInfo, result, param->nodeFilterFn); +// } +// +// return; +// } +// +// // The value of hasPK is always 0. +// uint8_t weight = pLeft->_node.hasPK + pRight->_node.hasPK; +// assert(weight == 0 && pSkipList != NULL && taosArrayGetSize(result) == 0); +// +// //apply the hierarchical filter expression to every node in skiplist to find the qualified nodes +// applyFilterToSkipListNode(pSkipList, pExpr, result, param); +//} #endif