diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index 8015360919..9607157bda 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -362,10 +362,10 @@ bool isTSCompQuery(SQuery *pQuery) { return pQuery->pSelectExpr[0].pBase.functio bool doRevisedResultsByLimit(SQInfo *pQInfo) { SQuery *pQuery = pQInfo->runtimeEnv.pQuery; - if ((pQuery->limit.limit > 0) && (pQuery->rec.rows + pQuery->rec.rows > pQuery->limit.limit)) { - pQuery->rec.rows = pQuery->limit.limit - pQuery->rec.rows; - - // query completed + if ((pQuery->limit.limit > 0) && (pQuery->rec.total + pQuery->rec.rows > pQuery->limit.limit)) { + pQuery->rec.rows = pQuery->limit.limit - pQuery->rec.total; + assert(pQuery->rec.rows > 0); + setQueryStatus(pQuery, QUERY_COMPLETED); return true; } @@ -2503,17 +2503,20 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBl } int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) { - int firstPos, lastPos, midPos = -1; - int numOfPoints; - TSKEY *keyList; + int32_t midPos = -1; + int32_t numOfPoints; - if (num <= 0) return -1; + if (num <= 0) { + return -1; + } - keyList = (TSKEY *)pValue; - firstPos = 0; - lastPos = num - 1; + assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC); + + TSKEY* keyList = (TSKEY *)pValue; + int32_t firstPos = 0; + int32_t lastPos = num - 1; - if (order == 0) { + if (order == TSDB_ORDER_DESC) { // find the first position which is smaller than the key while (1) { if (key >= keyList[lastPos]) return lastPos; diff --git a/src/util/src/tcompare.c b/src/util/src/tcompare.c index bff10c3022..848eeee573 100644 --- a/src/util/src/tcompare.c +++ b/src/util/src/tcompare.c @@ -13,7 +13,7 @@ int32_t compareInt32Val(const void *pLeft, const void *pRight) { } int32_t compareInt64Val(const void *pLeft, const void *pRight) { - int32_t ret = GET_INT64_VAL(pLeft) - GET_INT64_VAL(pRight); + int64_t ret = GET_INT64_VAL(pLeft) - GET_INT64_VAL(pRight); if (ret == 0) { return 0; } else { @@ -248,8 +248,9 @@ __compar_fn_t getComparFunc(int32_t type, int32_t filterDataType) { case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_SMALLINT: case TSDB_DATA_TYPE_INT: - case TSDB_DATA_TYPE_BIGINT: { - if (filterDataType == TSDB_DATA_TYPE_BIGINT) { + case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_TIMESTAMP: { + if (filterDataType == TSDB_DATA_TYPE_BIGINT || filterDataType == TSDB_DATA_TYPE_TIMESTAMP) { comparFn = compareInt64Val; break; } diff --git a/src/util/src/tskiplist.c b/src/util/src/tskiplist.c index 138c75c197..beb831ea67 100644 --- a/src/util/src/tskiplist.c +++ b/src/util/src/tskiplist.c @@ -75,53 +75,6 @@ static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, SSk static SSkipListNode* tSkipListDoAppend(SSkipList *pSkipList, SSkipListNode *pNode); static SSkipListIterator* doCreateSkipListIterator(SSkipList *pSkipList, int32_t order); -//static __compar_fn_t getComparFunc(SSkipList *pSkipList, int32_t filterDataType) { -// __compar_fn_t comparFn = NULL; -// -// switch (pSkipList->keyInfo.type) { -// case TSDB_DATA_TYPE_TINYINT: -// case TSDB_DATA_TYPE_SMALLINT: -// case TSDB_DATA_TYPE_INT: -// case TSDB_DATA_TYPE_BIGINT: { -// if (filterDataType == TSDB_DATA_TYPE_BIGINT) { -// comparFn = compareInt64Val; -// break; -// } -// } -// case TSDB_DATA_TYPE_BOOL: { -// if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) { -// comparFn = compareInt32Val; -// } else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) { -// comparFn = compareIntDoubleVal; -// } -// break; -// } -// case TSDB_DATA_TYPE_FLOAT: -// case TSDB_DATA_TYPE_DOUBLE: { -//// if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) { -//// comparFn = compareDoubleIntVal; -//// } else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) { -//// comparFn = compareDoubleVal; -//// } -// if (filterDataType == TSDB_DATA_TYPE_DOUBLE) { -// comparFn = compareDoubleVal; -// } -// break; -// } -// case TSDB_DATA_TYPE_BINARY: -// comparFn = compareStrVal; -// break; -// case TSDB_DATA_TYPE_NCHAR: -// comparFn = compareWStrVal; -// break; -// default: -// comparFn = compareInt32Val; -// break; -// } -// -// return comparFn; -//} - static bool initForwardBackwardPtr(SSkipList* pSkipList) { uint32_t maxLevel = pSkipList->maxLevel; @@ -445,6 +398,11 @@ SSkipListIterator *tSkipListCreateIterFromVal(SSkipList* pSkipList, const char* iter->cur = forward[0]; // greater equals than the value } else { iter->cur = SL_GET_FORWARD_POINTER(forward[0], 0); + + if (ret == 0) { + assert(iter->cur != pSkipList->pTail); + iter->cur = SL_GET_FORWARD_POINTER(iter->cur, 0); + } } return iter; diff --git a/src/vnode/tsdb/src/tsdbRead.c b/src/vnode/tsdb/src/tsdbRead.c index ee95199e5b..bf45c2d0af 100644 --- a/src/vnode/tsdb/src/tsdbRead.c +++ b/src/vnode/tsdb/src/tsdbRead.c @@ -26,7 +26,7 @@ #define EXTRA_BYTES 2 #define PRIMARY_TSCOL_REQUIRED(c) (((SColumnInfoData*)taosArrayGet(c, 0))->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) -#define QUERY_IS_ASC_QUERY(o) (o == TSDB_ORDER_ASC) +#define ASCENDING_ORDER_TRAVERSE(o) (o == TSDB_ORDER_ASC) #define QH_GET_NUM_OF_COLS(handle) ((size_t)(taosArrayGetSize((handle)->pColumns))) enum { @@ -154,6 +154,7 @@ tsdb_query_handle_t* tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond* pCond pQueryHandle->loadDataAfterSeek = false; pQueryHandle->isFirstSlot = true; + pQueryHandle->cur.fid = -1; size_t size = taosArrayGetSize(idList); assert(size >= 1); @@ -183,7 +184,7 @@ tsdb_query_handle_t* tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond* pCond * For ascending timestamp order query, query starts from data files. In contrast, buffer will be checked in the first place * in case of descending timestamp order query. */ - pQueryHandle->checkFiles = QUERY_IS_ASC_QUERY(pQueryHandle->order); + pQueryHandle->checkFiles = ASCENDING_ORDER_TRAVERSE(pQueryHandle->order); pQueryHandle->activeIndex = 0; // allocate buffer in order to load data blocks from file @@ -208,10 +209,11 @@ tsdb_query_handle_t* tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond* pCond static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) { assert(pHandle->activeIndex == 0 && taosArrayGetSize(pHandle->pTableCheckInfo) == 1); + pHandle->cur.fid = -1; + + STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex); - STableCheckInfo* pTableCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex); - - STable* pTable = pTableCheckInfo->pTableObj; + STable* pTable = pCheckInfo->pTableObj; assert(pTable != NULL); // no data in cache, abort @@ -219,11 +221,10 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) { return false; } - STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex); - pTable = pCheckInfo->pTableObj; - if (pCheckInfo->iter == NULL) { - pCheckInfo->iter = tSkipListCreateIter(pTable->mem->pData); + pCheckInfo->iter = tSkipListCreateIterFromVal(pTable->mem->pData, (const char*) &pCheckInfo->lastKey, + TSDB_DATA_TYPE_TIMESTAMP, pHandle->order); + if (pCheckInfo->iter == NULL) { return false; } @@ -240,9 +241,12 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) { SDataRow row = SL_GET_NODE_DATA(node); pCheckInfo->lastKey = dataRowKey(row); // first timestamp in buffer + dTrace("%p uid:%" PRId64", tid:%d check data in buffer from skey:%" PRId64 ", order:%d", pHandle, + pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, pCheckInfo->lastKey, pHandle->order); // all data in mem are checked already. - if (pTableCheckInfo->lastKey > pHandle->window.ekey) { + if ((pCheckInfo->lastKey > pHandle->window.ekey && ASCENDING_ORDER_TRAVERSE(pHandle->order)) || + (pCheckInfo->lastKey < pHandle->window.ekey && !ASCENDING_ORDER_TRAVERSE(pHandle->order))) { return false; } @@ -440,7 +444,7 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock SArray* sa = getDefaultLoadColumns(pQueryHandle, true); SQueryFilePos* cur = &pQueryHandle->cur; - if (QUERY_IS_ASC_QUERY(pQueryHandle->order)) { + if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) { // query ended in current block if (pQueryHandle->window.ekey < pBlock->keyLast || pCheckInfo->lastKey > pBlock->keyFirst) { if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo)) { @@ -611,11 +615,11 @@ static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInf SDataCols* pCols = pCheckInfo->pDataCols; int32_t endPos = cur->pos; - if (QUERY_IS_ASC_QUERY(pQueryHandle->order) && pQueryHandle->window.ekey > blockInfo.window.ekey) { + if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey > blockInfo.window.ekey) { endPos = blockInfo.rows - 1; pQueryHandle->realNumOfRows = endPos - cur->pos + 1; pCheckInfo->lastKey = blockInfo.window.ekey + 1; - } else if (!QUERY_IS_ASC_QUERY(pQueryHandle->order) && pQueryHandle->window.ekey < blockInfo.window.skey) { + } else if (!ASCENDING_ORDER_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey < blockInfo.window.skey) { endPos = 0; pQueryHandle->realNumOfRows = cur->pos + 1; pCheckInfo->lastKey = blockInfo.window.ekey - 1; @@ -623,7 +627,7 @@ static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInf int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC; endPos = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfPoints, pQueryHandle->window.ekey, order); - if (QUERY_IS_ASC_QUERY(pQueryHandle->order)) { + if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) { if (endPos < cur->pos) { pQueryHandle->realNumOfRows = 0; return; @@ -940,7 +944,7 @@ static bool getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle) { int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); while ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) != NULL) { - int32_t type = QUERY_IS_ASC_QUERY(pQueryHandle->order)? QUERY_RANGE_GREATER_EQUAL:QUERY_RANGE_LESS_EQUAL; + int32_t type = ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)? QUERY_RANGE_GREATER_EQUAL:QUERY_RANGE_LESS_EQUAL; if (getFileCompInfo(pQueryHandle, &numOfBlocks, type) != TSDB_CODE_SUCCESS) { break; } @@ -968,7 +972,7 @@ static bool getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle) { return false; } - cur->slot = QUERY_IS_ASC_QUERY(pQueryHandle->order)? 0:pQueryHandle->numOfBlocks-1; + cur->slot = ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)? 0:pQueryHandle->numOfBlocks-1; cur->fid = pQueryHandle->pFileGroup->fileId; STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot]; @@ -993,16 +997,16 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) { return getDataBlocksInFilesImpl(pQueryHandle); } else { - if ((cur->slot == pQueryHandle->numOfBlocks - 1 && QUERY_IS_ASC_QUERY(pQueryHandle->order)) || - (cur->slot == 0 && !QUERY_IS_ASC_QUERY(pQueryHandle->order))) { // all blocks + if ((cur->slot == pQueryHandle->numOfBlocks - 1 && ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) || + (cur->slot == 0 && !ASCENDING_ORDER_TRAVERSE(pQueryHandle->order))) { // all blocks return getDataBlocksInFilesImpl(pQueryHandle); } else { // next block of the same file - int32_t step = QUERY_IS_ASC_QUERY(pQueryHandle->order)? 1:-1; + int32_t step = ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)? 1:-1; cur->slot += step; STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot]; - if (QUERY_IS_ASC_QUERY(pQueryHandle->order)) { + if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) { cur->pos = 0; } else { cur->pos = pBlockInfo->pBlock.compBlock->numOfPoints - 1; @@ -1035,7 +1039,7 @@ bool tsdbNextDataBlock(tsdb_query_handle_t* pqHandle) { size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); assert(numOfTables > 0); - if (QUERY_IS_ASC_QUERY(pQueryHandle->order)) { + if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) { if (pQueryHandle->checkFiles) { if (getDataBlocksInFiles(pQueryHandle)) { return true; @@ -1066,12 +1070,23 @@ static int tsdbReadRowsFromCache(SSkipListIterator* pIter, TSKEY maxKey, int max int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns); *skey = INT64_MIN; - do/* (1) */{ + do { SSkipListNode* node = tSkipListIterGet(pIter); - if (node == NULL) break; + if (node == NULL) { + break; + } SDataRow row = SL_GET_NODE_DATA(node); - if (dataRowKey(row) > maxKey) break; + TSKEY key = dataRowKey(row); + + if ((key > maxKey && ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) || + (key < maxKey && !ASCENDING_ORDER_TRAVERSE(pQueryHandle->order))) { + + dTrace("%p key:%"PRIu64" beyond qrange:%"PRId64" - %"PRId64", no more data in buffer", pQueryHandle, key, pQueryHandle->window.skey, + pQueryHandle->window.ekey); + + break; + } if (*skey == INT64_MIN) { *skey = dataRowKey(row); @@ -1080,9 +1095,18 @@ static int tsdbReadRowsFromCache(SSkipListIterator* pIter, TSKEY maxKey, int max *ekey = dataRowKey(row); int32_t offset = 0; + char* pData = NULL; + for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); - memcpy(pColInfo->pData + numOfRows * pColInfo->info.bytes, dataRowTuple(row) + offset, pColInfo->info.bytes); + + if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) { + pData = pColInfo->pData + numOfRows * pColInfo->info.bytes; + } else { + pData = pColInfo->pData + (maxRowsToRead - numOfRows - 1) * pColInfo->info.bytes; + } + + memcpy(pData, dataRowTuple(row) + offset, pColInfo->info.bytes); offset += pColInfo->info.bytes; } @@ -1093,6 +1117,18 @@ static int tsdbReadRowsFromCache(SSkipListIterator* pIter, TSKEY maxKey, int max } while(tSkipListIterNext(pIter)); + assert(numOfRows <= maxRowsToRead); + + // if the buffer is not full in case of descending order query, move the data in the front of the buffer + if (!ASCENDING_ORDER_TRAVERSE(pQueryHandle->order) && numOfRows < maxRowsToRead) { + int32_t emptySize = maxRowsToRead - numOfRows; + + for(int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); + memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes); + } + } + return numOfRows; } @@ -1105,6 +1141,8 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t* pQueryHandle) { TSKEY skey = 0, ekey = 0; int32_t rows = 0; + int32_t step = ASCENDING_ORDER_TRAVERSE(pHandle->order)? 1:-1; + // data in file if (pHandle->cur.fid >= 0) { STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[pHandle->cur.slot]; @@ -1126,7 +1164,7 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t* pQueryHandle) { ekey = *(TSKEY*)((char*)pColInfoEx->pData + TSDB_KEYSIZE * (rows - 1)); // update the last key value - pBlockInfo->pTableCheckInfo->lastKey = ekey + 1; + pBlockInfo->pTableCheckInfo->lastKey = ekey + step; } } else { STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex); @@ -1138,12 +1176,16 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t* pQueryHandle) { rows = tsdbReadRowsFromCache(pCheckInfo->iter, pHandle->window.ekey, 2, &skey, &ekey, pHandle); // update the last key value - pCheckInfo->lastKey = ekey + 1; + pCheckInfo->lastKey = ekey + step; } } SDataBlockInfo blockInfo = { - .uid = pTable->tableId.uid, .sid = pTable->tableId.tid, .rows = rows, .window = {.skey = skey, .ekey = ekey}}; + .uid = pTable->tableId.uid, + .sid = pTable->tableId.tid, + .rows = rows, + .window = {.skey = MIN(skey, ekey), .ekey = MAX(skey, ekey)} + }; return blockInfo; } @@ -1399,7 +1441,10 @@ int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size void tsdbCleanupQueryHandle(tsdb_query_handle_t queryHandle) { STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*)queryHandle; - + if (pQueryHandle == NULL) { + return; + } + size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo); for (int32_t i = 0; i < size; ++i) { STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);