diff --git a/.gitignore b/.gitignore index d5c7f763cf..d7fcb019ae 100644 --- a/.gitignore +++ b/.gitignore @@ -22,6 +22,7 @@ mac/ .mypy_cache *.tmp *.swp +*.swo *.orig src/connector/nodejs/node_modules/ src/connector/nodejs/out/ diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 9e7aea03ea..e1aadd4486 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -153,11 +153,10 @@ typedef struct SQueryTableDataCond { int32_t order; // desc|asc order to iterate the data block int32_t numOfCols; SColumnInfo* colList; - int32_t type; // data block load type: - // int32_t numOfTWindows; - STimeWindow twindows; - int64_t startVersion; - int64_t endVersion; + int32_t type; // data block load type: + STimeWindow twindows; + int64_t startVersion; + int64_t endVersion; } SQueryTableDataCond; int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock); diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 5871d56a8f..f6ecd4493d 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -312,7 +312,7 @@ static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) { pConn->numOfQueries = pBasic->queryDesc ? taosArrayGetSize(pBasic->queryDesc) : 0; pBasic->queryDesc = NULL; - mDebug("queries updated in conn %d, num:%d", pConn->id, pConn->numOfQueries); + mDebug("queries updated in conn %u, num:%d", pConn->id, pConn->numOfQueries); taosWUnLockLatch(&pConn->queryLock); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 6d976083d2..4d95a9d7a5 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -118,9 +118,8 @@ int32_t metaTbCursorNext(SMTbCursor *pTbCur); // typedef struct STsdb STsdb; typedef struct STsdbReader STsdbReader; -#define BLOCK_LOAD_OFFSET_ORDER 1 -#define BLOCK_LOAD_TABLESEQ_ORDER 2 -#define BLOCK_LOAD_EXTERN_ORDER 3 +#define TIMEWINDOW_RANGE_CONTAINED 1 +#define TIMEWINDOW_RANGE_EXTERNAL 2 #define LASTROW_RETRIEVE_TYPE_ALL 0x1 #define LASTROW_RETRIEVE_TYPE_SINGLE 0x2 diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index cec714e0ee..ea8ac09429 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -16,6 +16,12 @@ #include "tsdb.h" #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) +typedef enum { + EXTERNAL_ROWS_PREV = 0x1, + EXTERNAL_ROWS_MAIN = 0x2, + EXTERNAL_ROWS_NEXT = 0x3, +} EContentData; + typedef struct { STbDataIter* iter; int32_t index; @@ -70,9 +76,9 @@ typedef struct SFilesetIter { } SFilesetIter; typedef struct SFileDataBlockInfo { - int32_t - tbBlockIdx; // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it + // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it uint64_t uid; + int32_t tbBlockIdx; } SFileDataBlockInfo; typedef struct SDataBlockIter { @@ -99,12 +105,11 @@ typedef struct SReaderStatus { SHashObj* pTableMap; // SHash STableBlockScanInfo* pTableIter; // table iterator used in building in-memory buffer data blocks. SFileBlockDumpInfo fBlockDumpInfo; - - SDFileSet* pCurrentFileset; // current opened file set - SBlockData fileBlockData; - SFilesetIter fileIter; - SDataBlockIter blockIter; - bool composedDataBlock; // the returned data block is a composed block or not + SDFileSet* pCurrentFileset; // current opened file set + SBlockData fileBlockData; + SFilesetIter fileIter; + SDataBlockIter blockIter; + bool composedDataBlock; // the returned data block is a composed block or not } SReaderStatus; struct STsdbReader { @@ -115,15 +120,17 @@ struct STsdbReader { SSDataBlock* pResBlock; int32_t capacity; SReaderStatus status; - char* idStr; // query info handle, for debug purpose - int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows + char* idStr; // query info handle, for debug purpose + int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows SBlockLoadSuppInfo suppInfo; STsdbReadSnap* pReadSnap; + SIOCostSummary cost; + STSchema* pSchema; + SDataFReader* pFileReader; + SVersionRange verRange; - SIOCostSummary cost; - STSchema* pSchema; - SDataFReader* pFileReader; - SVersionRange verRange; + int32_t step; + STsdbReader* innerReader[2]; }; static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter); @@ -200,6 +207,9 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK pTsdbReader->idStr); } + tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, %s", pTsdbReader, numOfTables, (sizeof(STableBlockScanInfo)*numOfTables)/1024.0, + pTsdbReader->idStr); + return pTableMap; } @@ -328,7 +338,7 @@ static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) { continue; } - tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader, fid, pReader->window.skey, + tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->window.skey, pReader->window.ekey, pReader->idStr); return true; } @@ -378,7 +388,7 @@ static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) return pResBlock; } -static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, const char* idstr) { +static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity, const char* idstr) { int32_t code = 0; int8_t level = 0; STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader)); @@ -392,7 +402,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level); pReader->suid = pCond->suid; pReader->order = pCond->order; - pReader->capacity = 4096; + pReader->capacity = capacity; pReader->idStr = (idstr != NULL) ? strdup(idstr) : NULL; pReader->verRange = getQueryVerRange(pVnode, pCond, level); pReader->type = pCond->type; @@ -483,95 +493,6 @@ _end: // return res; // } -// static TSKEY extractFirstTraverseKey(STableBlockScanInfo* pCheckInfo, int32_t order, int32_t update, TDRowVerT -// maxVer) { -// TSDBROW row = {0}; -// STSRow *rmem = NULL, *rimem = NULL; - -// if (pCheckInfo->iter) { -// if (tsdbTbDataIterGet(pCheckInfo->iter, &row)) { -// rmem = row.pTSRow; -// } -// } - -// if (pCheckInfo->iiter) { -// if (tsdbTbDataIterGet(pCheckInfo->iiter, &row)) { -// rimem = row.pTSRow; -// } -// } - -// if (rmem == NULL && rimem == NULL) { -// return TSKEY_INITIAL_VAL; -// } - -// if (rmem != NULL && rimem == NULL) { -// pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM; -// return TD_ROW_KEY(rmem); -// } - -// if (rmem == NULL && rimem != NULL) { -// pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM; -// return TD_ROW_KEY(rimem); -// } - -// TSKEY r1 = TD_ROW_KEY(rmem); -// TSKEY r2 = TD_ROW_KEY(rimem); - -// if (r1 == r2) { -// if (TD_SUPPORT_UPDATE(update)) { -// pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH; -// } else { -// pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM; -// tsdbTbDataIterNext(pCheckInfo->iter); -// } -// return r1; -// } else if (r1 < r2 && ASCENDING_TRAVERSE(order)) { -// pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM; -// return r1; -// } else { -// pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM; -// return r2; -// } -// } - -// static bool moveToNextRowInMem(STableBlockScanInfo* pCheckInfo) { -// bool hasNext = false; -// if (pCheckInfo->chosen == CHECKINFO_CHOSEN_MEM) { -// if (pCheckInfo->iter != NULL) { -// hasNext = tsdbTbDataIterNext(pCheckInfo->iter); -// } - -// if (hasNext) { -// return hasNext; -// } - -// if (pCheckInfo->iiter != NULL) { -// return tsdbTbDataIterGet(pCheckInfo->iiter, NULL); -// } -// } else if (pCheckInfo->chosen == CHECKINFO_CHOSEN_IMEM) { -// if (pCheckInfo->iiter != NULL) { -// hasNext = tsdbTbDataIterNext(pCheckInfo->iiter); -// } - -// if (hasNext) { -// return hasNext; -// } - -// if (pCheckInfo->iter != NULL) { -// return tsdbTbDataIterGet(pCheckInfo->iter, NULL); -// } -// } else { -// if (pCheckInfo->iter != NULL) { -// hasNext = tsdbTbDataIterNext(pCheckInfo->iter); -// } -// if (pCheckInfo->iiter != NULL) { -// hasNext = tsdbTbDataIterNext(pCheckInfo->iiter) || hasNext; -// } -// } - -// return hasNext; -// } - // static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) { // int32_t firstSlot = 0; // int32_t lastSlot = numOfBlocks - 1; @@ -602,18 +523,22 @@ _end: static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) { SArray* aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); + int64_t st = taosGetTimestampUs(); int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx, NULL); if (code != TSDB_CODE_SUCCESS) { goto _end; } - if (taosArrayGetSize(aBlockIdx) == 0) { + size_t num = taosArrayGetSize(aBlockIdx); + if (num == 0) { taosArrayClear(aBlockIdx); return TSDB_CODE_SUCCESS; } - SBlockIdx* pBlockIdx; - for (int32_t i = 0; i < taosArrayGetSize(aBlockIdx); ++i) { + int64_t et1 = taosGetTimestampUs(); + + SBlockIdx* pBlockIdx = NULL; + for (int32_t i = 0; i < num; ++i) { pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i); // uid check @@ -627,17 +552,6 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, continue; } - // todo: not valid info in bockIndex - // time range check - // if (pBlockIdx->minKey > pReader->window.ekey || pBlockIdx->maxKey < pReader->window.skey) { - // continue; - // } - - // version check - // if (pBlockIdx->minVersion > pReader->verRange.maxVer || pBlockIdx->maxVersion < pReader->verRange.minVer) { - // continue; - // } - STableBlockScanInfo* pScanInfo = p; if (pScanInfo->pBlockList == NULL) { pScanInfo->pBlockList = taosArrayInit(16, sizeof(SBlock)); @@ -647,6 +561,9 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, taosArrayPush(pIndexList, pBlockIdx); } + int64_t et2 = taosGetTimestampUs(); + tsdbDebug("load block index for %d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%d bytes %s", + (int32_t)num, (et1 - st)/1000.0, (et2-et1)/1000.0, num * sizeof(SBlockIdx), pReader->idStr); _end: taosArrayDestroy(aBlockIdx); return code; @@ -655,9 +572,11 @@ _end: static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_t* numOfValidTables, int32_t* numOfBlocks) { size_t numOfTables = taosArrayGetSize(pIndexList); - *numOfValidTables = 0; + int64_t st = taosGetTimestampUs(); + size_t size = 0; + STableBlockScanInfo* px = NULL; while (1) { px = taosHashIterate(pReader->status.pTableMap, px); @@ -675,6 +594,8 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_ tMapDataReset(&mapData); tsdbReadBlock(pReader->pFileReader, pBlockIdx, &mapData, NULL); + size += mapData.nData; + STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t)); for (int32_t j = 0; j < mapData.nItem; ++j) { SBlock block = {0}; @@ -706,6 +627,10 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_ } } + int64_t et = taosGetTimestampUs(); + tsdbDebug("load block of %d tables completed, blocks:%d in %d tables, size:%.2f Kb, elapsed time:%.2f ms %s", + numOfTables, *numOfBlocks, *numOfValidTables, size/1000.0, (et-st)/1000.0, pReader->idStr); + return TSDB_CODE_SUCCESS; } @@ -816,7 +741,6 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn return TSDB_CODE_SUCCESS; } -// todo consider the output buffer size static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) { int64_t st = taosGetTimestampUs(); @@ -853,346 +777,6 @@ _error: return code; } -// static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) { -// int firstPos, lastPos, midPos = -1; -// int numOfRows; -// TSKEY* keyList; - -// assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC); - -// if (num <= 0) return -1; - -// keyList = (TSKEY*)pValue; -// firstPos = 0; -// lastPos = num - 1; - -// if (order == TSDB_ORDER_DESC) { -// // find the first position which is smaller than the key -// while (1) { -// if (key >= keyList[lastPos]) return lastPos; -// if (key == keyList[firstPos]) return firstPos; -// if (key < keyList[firstPos]) return firstPos - 1; - -// numOfRows = lastPos - firstPos + 1; -// midPos = (numOfRows >> 1) + firstPos; - -// if (key < keyList[midPos]) { -// lastPos = midPos - 1; -// } else if (key > keyList[midPos]) { -// firstPos = midPos + 1; -// } else { -// break; -// } -// } - -// } else { -// // find the first position which is bigger than the key -// while (1) { -// if (key <= keyList[firstPos]) return firstPos; -// if (key == keyList[lastPos]) return lastPos; - -// if (key > keyList[lastPos]) { -// lastPos = lastPos + 1; -// if (lastPos >= num) -// return -1; -// else -// return lastPos; -// } - -// numOfRows = lastPos - firstPos + 1; -// midPos = (numOfRows >> 1) + firstPos; - -// if (key < keyList[midPos]) { -// lastPos = midPos - 1; -// } else if (key > keyList[midPos]) { -// firstPos = midPos + 1; -// } else { -// break; -// } -// } -// } - -// return midPos; -// } - -// static void doCheckGeneratedBlockRange(STsdbReader* pTsdbReadHandle) { -// SQueryFilePos* cur = &pTsdbReadHandle->cur; - -// if (cur->rows > 0) { -// if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) { -// assert(cur->win.skey >= pTsdbReadHandle->window.skey && cur->win.ekey <= pTsdbReadHandle->window.ekey); -// } else { -// assert(cur->win.skey >= pTsdbReadHandle->window.ekey && cur->win.ekey <= pTsdbReadHandle->window.skey); -// } - -// SColumnInfoData* pColInfoData = taosArrayGet(pTsdbReadHandle->pColumns, 0); -// assert(cur->win.skey == ((TSKEY*)pColInfoData->pData)[0] && -// cur->win.ekey == ((TSKEY*)pColInfoData->pData)[cur->rows - 1]); -// } else { -// cur->win = pTsdbReadHandle->window; - -// int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1; -// cur->lastKey = pTsdbReadHandle->window.ekey + step; -// } -// } - -// static void copyAllRemainRowsFromFileBlock(STsdbReader* pTsdbReadHandle, STableBlockScanInfo* pCheckInfo, -// SDataBlockInfo* pBlockInfo, int32_t endPos) { -// SQueryFilePos* cur = &pTsdbReadHandle->cur; - -// SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0]; -// TSKEY* tsArray = pCols->cols[0].pData; - -// bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order); - -// int32_t step = ascScan ? 1 : -1; - -// int32_t start = cur->pos; -// int32_t end = endPos; - -// if (!ascScan) { -// TSWAP(start, end); -// } - -// assert(pTsdbReadHandle->outputCapacity >= (end - start + 1)); -// int32_t numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, 0, start, end); - -// // the time window should always be ascending order: skey <= ekey -// cur->win = (STimeWindow){.skey = tsArray[start], .ekey = tsArray[end]}; -// cur->mixBlock = (numOfRows != pBlockInfo->rows); -// cur->lastKey = tsArray[endPos] + step; -// cur->blockCompleted = (ascScan ? (endPos == pBlockInfo->rows - 1) : (endPos == 0)); - -// // The value of pos may be -1 or pBlockInfo->rows, and it is invalid in both cases. -// int32_t pos = endPos + step; -// updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos); -// doCheckGeneratedBlockRange(pTsdbReadHandle); - -// tsdbDebug("%p uid:%" PRIu64 ", data block created, mixblock:%d, brange:%" PRIu64 "-%" PRIu64 " rows:%d, %s", -// pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows, -// pTsdbReadHandle->idStr); -// } - -// // only return the qualified data to client in terms of query time window, data rows in the same block but do not -// // be included in the query time window will be discarded -// static void doMergeTwoLevelData(STsdbReader* pTsdbReadHandle, STableBlockScanInfo* pCheckInfo, SBlock* pBlock) { -// SQueryFilePos* cur = &pTsdbReadHandle->cur; -// SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock); -// STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb); - -// initTableMemIterator(pTsdbReadHandle, pCheckInfo); - -// SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0]; -// assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == PRIMARYKEY_TIMESTAMP_COL_ID && -// cur->pos >= 0 && cur->pos < pBlock->numOfRows); -// // Even Multi-Version supported, the records with duplicated TSKEY would be merged inside of tsdbLoadData -// interface. TSKEY* tsArray = pCols->cols[0].pData; assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == -// pBlock->minKey.ts && -// tsArray[pBlock->numOfRows - 1] == pBlock->maxKey.ts); - -// bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order); -// int32_t step = ascScan ? 1 : -1; - -// // for search the endPos, so the order needs to reverse -// int32_t order = ascScan ? TSDB_ORDER_DESC : TSDB_ORDER_ASC; - -// int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle)); -// int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &blockInfo); - -// STimeWindow* pWin = &blockInfo.window; -// tsdbDebug("%p uid:%" PRIu64 " start merge data block, file block range:%" PRIu64 "-%" PRIu64 -// " rows:%d, start:%d, end:%d, %s", -// pTsdbReadHandle, pCheckInfo->tableId, pWin->skey, pWin->ekey, blockInfo.rows, cur->pos, endPos, -// pTsdbReadHandle->idStr); - -// // compared with the data from in-memory buffer, to generate the correct timestamp array list -// int32_t numOfRows = 0; -// int32_t curRow = 0; - -// int16_t rv1 = -1; -// int16_t rv2 = -1; -// STSchema* pSchema1 = NULL; -// STSchema* pSchema2 = NULL; - -// int32_t pos = cur->pos; -// cur->win = TSWINDOW_INITIALIZER; -// bool adjustPos = false; - -// // no data in buffer, load data from file directly -// if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) { -// copyAllRemainRowsFromFileBlock(pTsdbReadHandle, pCheckInfo, &blockInfo, endPos); -// return; -// } else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) { -// SSkipListNode* node = NULL; -// TSKEY lastKeyAppend = TSKEY_INITIAL_VAL; - -// do { -// STSRow* row2 = NULL; -// STSRow* row1 = getSRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, &row2, TD_VER_MAX); -// if (row1 == NULL) { -// break; -// } - -// TSKEY key = TD_ROW_KEY(row1); -// if ((key > pTsdbReadHandle->window.ekey && ascScan) || (key < pTsdbReadHandle->window.ekey && !ascScan)) { -// break; -// } - -// if (adjustPos) { -// if (key == lastKeyAppend) { -// pos -= step; -// } -// adjustPos = false; -// } - -// if (((pos > endPos || tsArray[pos] > pTsdbReadHandle->window.ekey) && ascScan) || -// ((pos < endPos || tsArray[pos] < pTsdbReadHandle->window.ekey) && !ascScan)) { -// break; -// } - -// if ((key < tsArray[pos] && ascScan) || (key > tsArray[pos] && !ascScan)) { -// if (rv1 != TD_ROW_SVER(row1)) { -// // pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1)); -// rv1 = TD_ROW_SVER(row1); -// } -// if (row2 && rv2 != TD_ROW_SVER(row2)) { -// // pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2)); -// rv2 = TD_ROW_SVER(row2); -// } - -// numOfRows += -// mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols, -// pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend); -// if (cur->win.skey == TSKEY_INITIAL_VAL) { -// cur->win.skey = key; -// } - -// cur->win.ekey = key; -// cur->lastKey = key + step; -// cur->mixBlock = true; -// moveToNextRowInMem(pCheckInfo); -// } else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it -// if (TD_SUPPORT_UPDATE(pCfg->update)) { -// if (lastKeyAppend != key) { -// if (lastKeyAppend != TSKEY_INITIAL_VAL) { -// ++curRow; -// } -// lastKeyAppend = key; -// } -// // load data from file firstly -// numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, pos, pos); - -// if (rv1 != TD_ROW_SVER(row1)) { -// rv1 = TD_ROW_SVER(row1); -// } -// if (row2 && rv2 != TD_ROW_SVER(row2)) { -// rv2 = TD_ROW_SVER(row2); -// } - -// // still assign data into current row -// numOfRows += -// mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols, -// pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend); - -// if (cur->win.skey == TSKEY_INITIAL_VAL) { -// cur->win.skey = key; -// } - -// cur->win.ekey = key; -// cur->lastKey = key + step; -// cur->mixBlock = true; - -// moveToNextRowInMem(pCheckInfo); - -// pos += step; -// adjustPos = true; -// } else { -// // discard the memory record -// moveToNextRowInMem(pCheckInfo); -// } -// } else if ((key > tsArray[pos] && ascScan) || (key < tsArray[pos] && !ascScan)) { -// if (cur->win.skey == TSKEY_INITIAL_VAL) { -// cur->win.skey = tsArray[pos]; -// } - -// int32_t end = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order); -// assert(end != -1); - -// if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it -// #if 0 -// if (pCfg->update == TD_ROW_DISCARD_UPDATE) { -// moveToNextRowInMem(pCheckInfo); -// } else { -// end -= step; -// } -// #endif -// if (!TD_SUPPORT_UPDATE(pCfg->update)) { -// moveToNextRowInMem(pCheckInfo); -// } else { -// end -= step; -// } -// } - -// int32_t qstart = 0, qend = 0; -// getQualifiedRowsPos(pTsdbReadHandle, pos, end, numOfRows, &qstart, &qend); - -// if ((lastKeyAppend != TSKEY_INITIAL_VAL) && (lastKeyAppend != (ascScan ? tsArray[qstart] : tsArray[qend]))) { -// ++curRow; -// } - -// numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, qstart, qend); -// pos += (qend - qstart + 1) * step; -// if (numOfRows > 0) { -// curRow = numOfRows - 1; -// } - -// cur->win.ekey = ascScan ? tsArray[qend] : tsArray[qstart]; -// cur->lastKey = cur->win.ekey + step; -// lastKeyAppend = cur->win.ekey; -// } -// } while (numOfRows < pTsdbReadHandle->outputCapacity); - -// if (numOfRows < pTsdbReadHandle->outputCapacity) { -// /** -// * if cache is empty, load remain file block data. In contrast, if there are remain data in cache, do NOT -// * copy them all to result buffer, since it may be overlapped with file data block. -// */ -// if (node == NULL || ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) > pTsdbReadHandle->window.ekey) && ascScan) -// || -// ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) < pTsdbReadHandle->window.ekey) && !ascScan)) { -// // no data in cache or data in cache is greater than the ekey of time window, load data from file block -// if (cur->win.skey == TSKEY_INITIAL_VAL) { -// cur->win.skey = tsArray[pos]; -// } - -// int32_t start = -1, end = -1; -// getQualifiedRowsPos(pTsdbReadHandle, pos, endPos, numOfRows, &start, &end); - -// numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, start, end); -// pos += (end - start + 1) * step; - -// cur->win.ekey = ascScan ? tsArray[end] : tsArray[start]; -// cur->lastKey = cur->win.ekey + step; -// cur->mixBlock = true; -// } -// } -// } - -// cur->blockCompleted = (((pos > endPos || cur->lastKey > pTsdbReadHandle->window.ekey) && ascScan) || -// ((pos < endPos || cur->lastKey < pTsdbReadHandle->window.ekey) && !ascScan)); - -// if (!ascScan) { -// TSWAP(cur->win.skey, cur->win.ekey); -// } - -// updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos); -// doCheckGeneratedBlockRange(pTsdbReadHandle); - -// tsdbDebug("%p uid:%" PRIu64 ", data block created, mixblock:%d, brange:%" PRIu64 "-%" PRIu64 " rows:%d, %s", -// pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows, -// pTsdbReadHandle->idStr); -// } - static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) { taosMemoryFreeClear(pSup->numOfBlocksPerTable); taosMemoryFreeClear(pSup->indexPerTable); @@ -1252,8 +836,9 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte // access data blocks according to the offset of each block in asc/desc order. int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap); - SBlockOrderSupporter sup = {0}; + int64_t st = taosGetTimestampUs(); + SBlockOrderSupporter sup = {0}; int32_t code = initBlockOrderSupporter(&sup, numOfTables); if (code != TSDB_CODE_SUCCESS) { return code; @@ -1302,11 +887,12 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i}; taosArrayPush(pBlockIter->blockList, &blockInfo); } - tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted %s", pReader, cnt, - pReader->idStr); + + int64_t et = taosGetTimestampUs(); + tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s", pReader, cnt, + (et - st)/1000.0, pReader->idStr); pBlockIter->index = asc ? 0 : (numOfBlocks - 1); - cleanupBlockOrderSupporter(&sup); return TSDB_CODE_SUCCESS; } @@ -1340,7 +926,8 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree)); } - tsdbDebug("%p %d data blocks sort completed, %s", pReader, cnt, pReader->idStr); + int64_t et = taosGetTimestampUs(); + tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, cnt, (et-st)/1000.0, pReader->idStr); cleanupBlockOrderSupporter(&sup); taosMemoryFree(pTree); @@ -1813,6 +1400,8 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader, STableBlockScanInfo* SBlockData* pBlockData = &pReader->status.fileBlockData; int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1; + int64_t st = taosGetTimestampUs(); + while (1) { // todo check the validate of row in file block { @@ -1851,10 +1440,11 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader, STableBlockScanInfo* blockDataUpdateTsWindow(pResBlock, 0); setComposedBlockFlag(pReader, true); + int64_t et = taosGetTimestampUs(); - tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 " rows:%d, %s", pReader, + tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 " rows:%d, elapsed time:%.2f ms %s", pReader, pBlockScanInfo->uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pResBlock->info.rows, - pReader->idStr); + (et - st)/1000.0, pReader->idStr); return TSDB_CODE_SUCCESS; } @@ -2031,7 +1621,9 @@ static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pRead static int32_t moveToNextFile(STsdbReader* pReader, int32_t* numOfBlocks) { SReaderStatus* pStatus = &pReader->status; - SArray* pIndexList = taosArrayInit(4, sizeof(SBlockIdx)); + + size_t numOfTables = taosHashGetSize(pReader->status.pTableMap); + SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx)); while (1) { bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader); @@ -2799,24 +2391,57 @@ int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list) { // ====================================== EXPOSED APIs ====================================== int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader, const char* idstr) { - int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, idstr); - if (code) { + int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, 4096, idstr); + if (code != TSDB_CODE_SUCCESS) { goto _err; } - if (pCond->suid != 0) { - (*ppReader)->pSchema = metaGetTbTSchema((*ppReader)->pTsdb->pVnode->pMeta, (*ppReader)->suid, -1); - } else if (taosArrayGetSize(pTableList) > 0) { - STableKeyInfo* pKey = taosArrayGet(pTableList, 0); - (*ppReader)->pSchema = metaGetTbTSchema((*ppReader)->pTsdb->pVnode->pMeta, pKey->uid, -1); - } - + // check for query time window STsdbReader* pReader = *ppReader; if (isEmptyQueryTimeWindow(&pReader->window)) { tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr); return TSDB_CODE_SUCCESS; } + if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) { + // update the SQueryTableDataCond to create inner reader + STimeWindow w = pCond->twindows; + int32_t order = pCond->order; + if (order == TSDB_ORDER_ASC) { + pCond->twindows.ekey = pCond->twindows.skey; + pCond->twindows.skey = INT64_MIN; + pCond->order = TSDB_ORDER_DESC; + } else { + pCond->twindows.skey = pCond->twindows.ekey; + pCond->twindows.ekey = INT64_MAX; + pCond->order = TSDB_ORDER_ASC; + } + + code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[0], 1, idstr); + if (code != TSDB_CODE_SUCCESS) { + goto _err; + } + + if (order == TSDB_ORDER_ASC) { + pCond->twindows.skey = w.ekey; + pCond->twindows.ekey = INT64_MAX; + } else { + pCond->twindows.skey = INT64_MIN; + pCond->twindows.ekey = w.ekey; + } + code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, idstr); + if (code != TSDB_CODE_SUCCESS) { + goto _err; + } + } + + if (pCond->suid != 0) { + pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1); + } else if (taosArrayGetSize(pTableList) > 0) { + STableKeyInfo* pKey = taosArrayGet(pTableList, 0); + pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1); + } + int32_t numOfTables = taosArrayGetSize(pTableList); pReader->status.pTableMap = createDataBlockScanInfo(pReader, pTableList->pData, numOfTables); if (pReader->status.pTableMap == NULL) { @@ -2827,21 +2452,41 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl goto _err; } - SDataBlockIter* pBlockIter = &pReader->status.blockIter; - code = tsdbTakeReadSnap(pReader->pTsdb, &pReader->pReadSnap); - if (code) goto _err; + if (code != TSDB_CODE_SUCCESS) { + goto _err; + } - initFilesetIterator(&pReader->status.fileIter, (*ppReader)->pReadSnap->fs.aDFileSet, pReader->order, pReader->idStr); - resetDataBlockIterator(&pReader->status.blockIter, pReader->order); + if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) { + SDataBlockIter* pBlockIter = &pReader->status.blockIter; - // no data in files, let's try buffer in memory - if (pReader->status.fileIter.numOfFiles == 0) { - pReader->status.loadFromFile = false; + initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader->order, pReader->idStr); + resetDataBlockIterator(&pReader->status.blockIter, pReader->order); + + // no data in files, let's try buffer in memory + if (pReader->status.fileIter.numOfFiles == 0) { + pReader->status.loadFromFile = false; + } else { + code = initForFirstBlockInFile(pReader, pBlockIter); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } } else { - code = initForFirstBlockInFile(pReader, pBlockIter); - if (code != TSDB_CODE_SUCCESS) { - return code; + STsdbReader* pPrevReader = pReader->innerReader[0]; + SDataBlockIter* pBlockIter = &pPrevReader->status.blockIter; + + initFilesetIterator(&pPrevReader->status.fileIter, pPrevReader->pReadSnap->fs.aDFileSet, pPrevReader->order, pPrevReader->idStr); + resetDataBlockIterator(&pPrevReader->status.blockIter, pPrevReader->order); + + // no data in files, let's try buffer in memory + if (pPrevReader->status.fileIter.numOfFiles == 0) { + pPrevReader->status.loadFromFile = false; + } else { + code = initForFirstBlockInFile(pPrevReader, pBlockIter); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } } @@ -2881,20 +2526,6 @@ void tsdbReaderClose(STsdbReader* pReader) { tsdbDataFReaderClose(&pReader->pFileReader); } -#if 0 -// if (pReader->status.pTableScanInfo != NULL) { -// pReader->status.pTableScanInfo = destroyTableCheckInfo(pReader->status.pTableScanInfo); -// } - -// tsdbDestroyReadH(&pReader->rhelper); - -// tdFreeDataCols(pReader->pDataCols); -// pReader->pDataCols = NULL; -// -// pReader->prev = doFreeColumnInfoData(pReader->prev); -// pReader->next = doFreeColumnInfoData(pReader->next); -#endif - SIOCostSummary* pCost = &pReader->cost; tsdbDebug("%p :io-cost summary: head-file read cnt:%" PRIu64 ", head-file time:%" PRIu64 " us, statis-info:%" PRId64 @@ -2907,55 +2538,100 @@ void tsdbReaderClose(STsdbReader* pReader) { taosMemoryFreeClear(pReader); } -bool tsdbNextDataBlock(STsdbReader* pReader) { - if (isEmptyQueryTimeWindow(&pReader->window)) { - return false; - } - +static bool doTsdbNextDataBlock(STsdbReader* pReader) { // cleanup the data that belongs to the previous data block SSDataBlock* pBlock = pReader->pResBlock; blockDataCleanup(pBlock); int64_t stime = taosGetTimestampUs(); - int64_t elapsedTime = stime; SReaderStatus* pStatus = &pReader->status; - if (pReader->type == BLOCK_LOAD_OFFSET_ORDER) { - if (pStatus->loadFromFile) { - int32_t code = buildBlockFromFiles(pReader); - if (code != TSDB_CODE_SUCCESS) { - return false; - } + if (pStatus->loadFromFile) { + int32_t code = buildBlockFromFiles(pReader); + if (code != TSDB_CODE_SUCCESS) { + return false; + } - if (pBlock->info.rows > 0) { - return true; - } else { - buildBlockFromBufferSequentially(pReader); - return pBlock->info.rows > 0; - } - } else { // no data in files, let's try the buffer + if (pBlock->info.rows > 0) { + return true; + } else { buildBlockFromBufferSequentially(pReader); return pBlock->info.rows > 0; } - } else if (pReader->type == BLOCK_LOAD_TABLESEQ_ORDER) { - } else if (pReader->type == BLOCK_LOAD_EXTERN_ORDER) { - } else { - ASSERT(0); + } else { // no data in files, let's try the buffer + buildBlockFromBufferSequentially(pReader); + return pBlock->info.rows > 0; } + return false; } -void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) { +bool tsdbNextDataBlock(STsdbReader* pReader) { + if (isEmptyQueryTimeWindow(&pReader->window)) { + return false; + } + + if (pReader->innerReader[0] != NULL) { + bool ret = doTsdbNextDataBlock(pReader->innerReader[0]); + if (ret) { + pReader->step = EXTERNAL_ROWS_PREV; + return ret; + } + + tsdbReaderClose(pReader->innerReader[0]); + pReader->innerReader[0] = NULL; + } + + pReader->step = EXTERNAL_ROWS_MAIN; + bool ret = doTsdbNextDataBlock(pReader); + if (ret) { + return ret; + } + + if (pReader->innerReader[1] != NULL) { + bool ret1 = doTsdbNextDataBlock(pReader->innerReader[1]); + if (ret1) { + pReader->step = EXTERNAL_ROWS_NEXT; + return ret1; + } + + tsdbReaderClose(pReader->innerReader[1]); + pReader->innerReader[1] = NULL; + } + + return false; +} + +static void setBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) { ASSERT(pDataBlockInfo != NULL && pReader != NULL); pDataBlockInfo->rows = pReader->pResBlock->info.rows; pDataBlockInfo->uid = pReader->pResBlock->info.uid; pDataBlockInfo->window = pReader->pResBlock->info.window; } +void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) { + if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) { + if (pReader->step == EXTERNAL_ROWS_MAIN) { + setBlockInfo(pReader, pDataBlockInfo); + } else if (pReader->step == EXTERNAL_ROWS_PREV) { + setBlockInfo(pReader->innerReader[0], pDataBlockInfo); + } else { + setBlockInfo(pReader->innerReader[1], pDataBlockInfo); + } + } else { + setBlockInfo(pReader, pDataBlockInfo); + } +} + int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) { int32_t code = 0; *allHave = false; + if(pReader->type == TIMEWINDOW_RANGE_EXTERNAL) { + *pBlockStatis = NULL; + return TSDB_CODE_SUCCESS; + } + // there is no statistics data for composed block if (pReader->status.composedDataBlock) { *pBlockStatis = NULL; @@ -3025,7 +2701,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS return code; } -SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { +static SArray* doRetrieveDataBlock(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; if (pStatus->composedDataBlock) { @@ -3054,16 +2730,27 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { return pReader->pResBlock->pDataBlock; } +SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { + if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) { + if (pReader->step == EXTERNAL_ROWS_PREV) { + return doRetrieveDataBlock(pReader->innerReader[0]); + } else if (pReader->step == EXTERNAL_ROWS_NEXT) { + return doRetrieveDataBlock(pReader->innerReader[1]); + } + } + + return doRetrieveDataBlock(pReader); +} + int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { if (isEmptyQueryTimeWindow(&pReader->window)) { return TSDB_CODE_SUCCESS; } pReader->order = pCond->order; - pReader->type = BLOCK_LOAD_OFFSET_ORDER; + pReader->type = TIMEWINDOW_RANGE_CONTAINED; pReader->status.loadFromFile = true; pReader->status.pTableIter = NULL; - pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows); // allocate buffer in order to load data blocks from file @@ -3073,10 +2760,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { pReader->suppInfo.tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID; tsdbDataFReaderClose(&pReader->pFileReader); - // todo set the correct numOfTables - int32_t numOfTables = 1; - SDataBlockIter* pBlockIter = &pReader->status.blockIter; - + int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap); tsdbDataFReaderClose(&pReader->pFileReader); initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader->order, pReader->idStr); @@ -3084,18 +2768,23 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { resetDataBlockScanInfo(pReader->status.pTableMap); int32_t code = 0; + SDataBlockIter* pBlockIter = &pReader->status.blockIter; + // no data in files, let's try buffer in memory if (pReader->status.fileIter.numOfFiles == 0) { pReader->status.loadFromFile = false; } else { code = initForFirstBlockInFile(pReader, pBlockIter); if (code != TSDB_CODE_SUCCESS) { + tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", + pReader, numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr); return code; } } tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader, pReader->suid, numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr); + return code; } @@ -3186,7 +2875,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) { STbData* d = NULL; if (pReader->pTsdb->mem != NULL) { - tsdbGetTbDataFromMemTable(pReader->pTsdb->mem, pReader->suid, pBlockScanInfo->uid, &d); + tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid, &d); if (d != NULL) { rows += tsdbGetNRowsInTbData(d); } @@ -3194,7 +2883,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) { STbData* di = NULL; if (pReader->pTsdb->imem != NULL) { - tsdbGetTbDataFromMemTable(pReader->pTsdb->imem, pReader->suid, pBlockScanInfo->uid, &di); + tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid, &di); if (di != NULL) { rows += tsdbGetNRowsInTbData(di); } diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 2f96482643..be97b20455 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -82,8 +82,6 @@ size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput); void initResultRowInfo(SResultRowInfo* pResultRowInfo); void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo); -void closeAllResultRows(SResultRowInfo* pResultRowInfo); - void initResultRow(SResultRow* pResultRow); void closeResultRow(SResultRow* pResultRow); bool isResultRowClosed(SResultRow* pResultRow); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 1ad17bbc76..f4d0eb3b5e 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -108,7 +108,6 @@ typedef struct STaskCostInfo { SFileBlockLoadRecorder* pRecoder; uint64_t elapsedTime; - uint64_t firstStageMergeTime; uint64_t winInfoSize; uint64_t tableInfoSize; uint64_t hashSize; @@ -549,6 +548,7 @@ typedef struct SProjectOperatorInfo { SLimitInfo limitInfo; bool mergeDataBlocks; SSDataBlock* pFinalRes; + SNode* pCondition; } SProjectOperatorInfo; typedef struct SIndefOperatorInfo { diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index a76253ab20..ec8e3c4abb 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -43,10 +43,6 @@ void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo) { } } -void closeAllResultRows(SResultRowInfo* pResultRowInfo) { - // do nothing -} - bool isResultRowClosed(SResultRow* pRow) { return (pRow->closed == true); } void closeResultRow(SResultRow* pResultRow) { pResultRow->closed = true; } @@ -160,11 +156,13 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) { SArray* createSortInfo(SNodeList* pNodeList) { size_t numOfCols = 0; + if (pNodeList != NULL) { numOfCols = LIST_LENGTH(pNodeList); } else { numOfCols = 0; } + SArray* pList = taosArrayInit(numOfCols, sizeof(SBlockOrderInfo)); if (pList == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -196,10 +194,6 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) { for (int32_t i = 0; i < numOfCols; ++i) { SSlotDescNode* pDescNode = (SSlotDescNode*)nodesListGetNode(pNode->pSlots, i); - /*if (!pDescNode->output) { // todo disable it temporarily*/ - /*continue;*/ - /*}*/ - SColumnInfoData idata = createColumnInfoData(pDescNode->dataType.type, pDescNode->dataType.bytes, pDescNode->slotId); idata.info.scale = pDescNode->dataType.scale; @@ -701,9 +695,6 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu } } -#ifdef BUF_PAGE_DEBUG - qDebug("page_setSelect num:%d", num); -#endif if (p != NULL) { p->subsidiaries.pCtx = pValCtx; p->subsidiaries.num = num; @@ -852,7 +843,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi // TODO: get it from stable scan node pCond->twindows = pTableScanNode->scanRange; pCond->suid = pTableScanNode->scan.suid; - pCond->type = BLOCK_LOAD_OFFSET_ORDER; + pCond->type = TIMEWINDOW_RANGE_CONTAINED; pCond->startVersion = -1; pCond->endVersion = -1; // pCond->type = pTableScanNode->scanFlag; @@ -947,6 +938,7 @@ STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInter } // get the correct time window according to the handled timestamp +// todo refactor STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval, int32_t order) { STimeWindow w = {0}; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index b063d0b3bd..38da9de32c 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1665,9 +1665,6 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo) { // hashSize += taosHashGetMemSize(pRuntimeEnv->tableqinfoGroupInfo.map); // pSummary->hashSize = hashSize; - // add the merge time - pSummary->elapsedTime += pSummary->firstStageMergeTime; - // SResultRowPool* p = pTaskInfo->pool; // if (p != NULL) { // pSummary->winInfoSize = getResultRowPoolMemSize(p); @@ -1676,17 +1673,16 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo) { // pSummary->winInfoSize = 0; // pSummary->numOfTimeWindows = 0; // } - // - // calculateOperatorProfResults(pQInfo); SFileBlockLoadRecorder* pRecorder = pSummary->pRecoder; if (pSummary->pRecoder != NULL) { - qDebug("%s :cost summary: elapsed time:%" PRId64 " us, first merge:%" PRId64 - " us, total blocks:%d, " - "load block statis:%d, load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64, - GET_TASKID(pTaskInfo), pSummary->elapsedTime, pSummary->firstStageMergeTime, pRecorder->totalBlocks, - pRecorder->loadBlockStatis, pRecorder->loadBlocks, pRecorder->totalRows, pRecorder->totalCheckedRows); + qDebug( + "%s :cost summary: elapsed time:%.2f ms, total blocks:%d, load block SMA:%d, load data block:%d, total rows:%" + PRId64 ", check rows:%" PRId64, GET_TASKID(pTaskInfo), pSummary->elapsedTime / 1000.0, + pRecorder->totalBlocks, pRecorder->loadBlockStatis, pRecorder->loadBlocks, pRecorder->totalRows, + pRecorder->totalCheckedRows); } + // qDebug("QInfo:0x%"PRIx64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb, // hashTable:%.2f Kb", pQInfo->qId, pSummary->winInfoSize/1024.0, // pSummary->numOfTimeWindows, pSummary->tableInfoSize/1024.0, pSummary->hashSize/1024.0); @@ -3031,7 +3027,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { } } - closeAllResultRows(&pAggInfo->binfo.resultRowInfo); initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0); OPTR_SET_OPENED(pOperator); @@ -3405,28 +3400,32 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { break; } - // no results generated - if (pInfo->pRes->info.rows == 0 || (!pProjectInfo->mergeDataBlocks)) { - break; - } + if (pProjectInfo->mergeDataBlocks && pTaskInfo->execModel != OPTR_EXEC_MODEL_STREAM) { + if (pRes->info.rows > 0) { + pFinalRes->info.groupId = pRes->info.groupId; + pFinalRes->info.version = pRes->info.version; - if (pProjectInfo->mergeDataBlocks) { - pFinalRes->info.groupId = pInfo->pRes->info.groupId; - pFinalRes->info.version = pInfo->pRes->info.version; - - // continue merge data, ignore the group id - blockDataMerge(pFinalRes, pInfo->pRes); - - if (pFinalRes->info.rows + pInfo->pRes->info.rows <= pOperator->resultInfo.threshold && - pTaskInfo->execModel != OPTR_EXEC_MODEL_STREAM) { - continue; + // continue merge data, ignore the group id + blockDataMerge(pFinalRes, pRes); + if (pFinalRes->info.rows + pRes->info.rows <= pOperator->resultInfo.threshold) { + continue; + } } - } - // do apply filter - SSDataBlock* p = pProjectInfo->mergeDataBlocks ? pFinalRes : pRes; - doFilter(pProjectInfo->pFilterNode, p, NULL); - if (p->info.rows > 0) { + // do apply filter + doFilter(pProjectInfo->pFilterNode, pFinalRes, NULL); + if (pFinalRes->info.rows > 0 || pRes->info.rows == 0) { + break; + } + } else { + // do apply filter + if (pRes->info.rows > 0) { + doFilter(pProjectInfo->pFilterNode, pRes, NULL); + if (pRes->info.rows == 0) { + continue; + } + } + break; } } @@ -3890,8 +3889,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys initLimitInfo(pProjPhyNode->node.pLimit, pProjPhyNode->node.pSlimit, &pInfo->limitInfo); pInfo->binfo.pRes = pResBlock; - pInfo->pFinalRes = createOneDataBlock(pResBlock, false); - + pInfo->pFinalRes = createOneDataBlock(pResBlock, false); pInfo->pFilterNode = pProjPhyNode->node.pConditions; pInfo->mergeDataBlocks = pProjPhyNode->mergeDataBlock; @@ -4422,7 +4420,7 @@ static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pC pCond->twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX}; pCond->suid = uid; - pCond->type = BLOCK_LOAD_OFFSET_ORDER; + pCond->type = TIMEWINDOW_RANGE_CONTAINED; pCond->startVersion = -1; pCond->endVersion = -1; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 4204988eb0..1e001a29a0 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1094,7 +1094,6 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag, NULL); } - closeAllResultRows(&pInfo->binfo.resultRowInfo); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->order); OPTR_SET_OPENED(pOperator); @@ -1250,7 +1249,6 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; pOperator->status = OP_RES_TO_RETURN; - closeAllResultRows(&pBInfo->resultRowInfo); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); @@ -2045,7 +2043,6 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { // restore the value pOperator->status = OP_RES_TO_RETURN; - closeAllResultRows(&pBInfo->resultRowInfo); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); @@ -2209,8 +2206,6 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { SSDataBlock* pResBlock = pSliceInfo->pRes; SExprSupp* pSup = &pOperator->exprSupp; - blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity); - // if (pOperator->status == OP_RES_TO_RETURN) { // // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); // if (pResBlock->info.rows == 0 || !hasDataInGroupInfo(&pSliceInfo->groupResInfo)) { @@ -2350,10 +2345,10 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode initResultSizeInfo(&pOperator->resultInfo, 4096); pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, (SNodeListNode*)pInterpPhyNode->pFillValues); - pInfo->pRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc); - pInfo->win = pInterpPhyNode->timeRange; + pInfo->pRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc); + pInfo->win = pInterpPhyNode->timeRange; pInfo->interval.interval = pInterpPhyNode->interval; - pInfo->current = pInfo->win.skey; + pInfo->current = pInfo->win.skey; pOperator->name = "TimeSliceOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC; diff --git a/tools/taos-tools b/tools/taos-tools index 9cfa195713..0b8a3373bb 160000 --- a/tools/taos-tools +++ b/tools/taos-tools @@ -1 +1 @@ -Subproject commit 9cfa195713d1cae9edf417a8d49bde87dd971016 +Subproject commit 0b8a3373bb7548f8106d13e7d3b0a988d3c4d48a