From cf8be044de5f3ed4b0a6733183700beb3c7b7903 Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Thu, 20 May 2021 14:41:21 +0800 Subject: [PATCH] fix bug --- src/inc/taosdef.h | 2 + src/inc/tsdb.h | 4 + src/query/src/qExecutor.c | 4 +- src/tsdb/src/tsdbRead.c | 238 ++++++++++++++++++-------------------- 4 files changed, 119 insertions(+), 129 deletions(-) diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 2882faf7be..1c63a9a05d 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -33,6 +33,8 @@ extern "C" { #endif #define TSWINDOW_INITIALIZER ((STimeWindow) {INT64_MIN, INT64_MAX}) +#define TSWINDOW_DESC_INITIALIZER ((STimeWindow) {INT64_MAX, INT64_MIN}) + #define TSKEY_INITIAL_VAL INT64_MIN // Bytes for each type. diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 7c28d3e485..468c5ab271 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -265,6 +265,10 @@ TsdbQueryHandleT *tsdbQueryTables(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfo, uint64_t qId, SMemRef *pRef); + +TsdbQueryHandleT tsdbQueryCacheLast(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef); + + /** * get the queried table object list * @param pHandle diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index b48436a617..c2a4c7febe 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -33,6 +33,8 @@ #define SET_MASTER_SCAN_FLAG(runtime) ((runtime)->scanFlag = MASTER_SCAN) #define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN) +#define TSWINDOW_IS_EQUAL(t1, t2) (((t1).skey == (t2).skey) && ((t1).ekey == (t2).ekey)) + #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) #define SDATA_BLOCK_INITIALIZER (SDataBlockInfo) {{0}, 0} @@ -1989,7 +1991,7 @@ static bool isCachedLastQuery(SQueryAttr *pQueryAttr) { return false; } - if (!TSWINDOW_IS_EQUAL(pQueryAttr->window, TSWINDOW_INITIALIZER)) { + if (pQueryAttr->order.order != TSDB_ORDER_DESC || !TSWINDOW_IS_EQUAL(pQueryAttr->window, TSWINDOW_DESC_INITIALIZER)) { return false; } diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index a326bd07fc..54833e5e78 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -94,12 +94,6 @@ typedef struct SIOCostSummary { int64_t checkForNextTime; } SIOCostSummary; - -typedef struct SCacheLastColInfo { - int16_t i; - int16_t j; -} SCacheLastColInfo; - typedef struct STsdbQueryHandle { STsdbRepo* pTsdb; SQueryFilePos cur; // current position @@ -124,8 +118,6 @@ typedef struct STsdbQueryHandle { SFSIter fileIter; SReadH rhelper; STableBlockInfo* pDataBlockInfo; - - SCacheLastColInfo lastCols; SDataCols *pDataCols; // in order to hold current file data block int32_t allocSize; // allocated data block size SMemRef *pMemRef; @@ -146,6 +138,7 @@ typedef struct STableGroupSupporter { static STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList); static int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList); +static int32_t checkForCachedLast(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList); static int32_t tsdbGetCachedLastRow(STable* pTable, SDataRow* pRes, TSKEY* lastKey); static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle); @@ -554,7 +547,6 @@ TsdbQueryHandleT tsdbQueryCacheLast(STsdbRepo *tsdb, STsdbQueryCond *pCond, STab return NULL; } - assert(pCond->order == TSDB_ORDER_ASC && pCond->twindow.skey <= pCond->twindow.ekey); pQueryHandle->type = TSDB_QUERY_TYPE_LAST; return pQueryHandle; @@ -2490,133 +2482,124 @@ static bool loadCachedLastRow(STsdbQueryHandle* pQueryHandle) { } -static int32_t copyColsFromCacheMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, STable* pTable) { - char* pData = NULL; - - STSchema* pSchema = tsdbGetTableSchema(pTable); - int32_t numOfCols = schemaNCols(pSchema); - int32_t tgNumOfCols = QH_GET_NUM_OF_COLS(pQueryHandle); - - assert(numOfCols == pTable->restoreColumnNum); - assert(pTable->lastCols != NULL); - - int32_t i = pQueryHandle->lastCols.i, j = pQueryHandle->lastCols.j; - while(i < tgNumOfCols && j < numOfCols) { - SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); - if (pTable->lastCols[j].colId < pColInfo->info.colId) { - j++; - continue; - } else if (pTable->lastCols[j].colId > pColInfo->info.colId) { - i++; - continue; - } - - if (ASCENDING_TRAVERSE(pQueryHandle->order)) { - pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes; - } else { - pData = (char*)pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes; - } - - if (pTable->lastCols[j].bytes > 0) { - void* value = pTable->lastCols[j].pData; - switch (pColInfo->info.type) { - case TSDB_DATA_TYPE_BINARY: - case TSDB_DATA_TYPE_NCHAR: - memcpy(pData, value, varDataTLen(value)); - break; - case TSDB_DATA_TYPE_NULL: - case TSDB_DATA_TYPE_BOOL: - case TSDB_DATA_TYPE_TINYINT: - case TSDB_DATA_TYPE_UTINYINT: - *(uint8_t *)pData = *(uint8_t *)value; - break; - case TSDB_DATA_TYPE_SMALLINT: - case TSDB_DATA_TYPE_USMALLINT: - *(uint16_t *)pData = *(uint16_t *)value; - break; - case TSDB_DATA_TYPE_INT: - case TSDB_DATA_TYPE_UINT: - *(uint32_t *)pData = *(uint32_t *)value; - break; - case TSDB_DATA_TYPE_BIGINT: - case TSDB_DATA_TYPE_UBIGINT: - *(uint64_t *)pData = *(uint64_t *)value; - break; - case TSDB_DATA_TYPE_FLOAT: - SET_FLOAT_PTR(pData, value); - break; - case TSDB_DATA_TYPE_DOUBLE: - SET_DOUBLE_PTR(pData, value); - break; - case TSDB_DATA_TYPE_TIMESTAMP: - if (pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { - *(TSKEY *)pData = tdGetKey(*(TKEY *)value); - } else { - *(TSKEY *)pData = *(TSKEY *)value; - } - break; - default: - memcpy(pData, value, pColInfo->info.bytes); - } - - for (int32_t n = 0; n < tgNumOfCols; ++n) { - if (n == i) { - continue; - } - - SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); - if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { - setVardataNull(pData, pColInfo->info.type); - } else { - setNull(pData, pColInfo->info.type, pColInfo->info.bytes); - } - } - - ++i; - ++j; - - if (i >= tgNumOfCols || j >= numOfCols) { - pQueryHandle->lastCols.i = 0; - pQueryHandle->lastCols.j = 0; - pQueryHandle->activeIndex++; - } else { - pQueryHandle->lastCols.i = i; - pQueryHandle->lastCols.j = j; - } - - return 1; - } - - i++; - j++; - } - - pQueryHandle->lastCols.i = 0; - pQueryHandle->lastCols.j = 0; - pQueryHandle->activeIndex++; - - return 0; -} - static bool loadCachedLast(STsdbQueryHandle* pQueryHandle) { // the last row is cached in buffer, return it directly. // here note that the pQueryHandle->window must be the TS_INITIALIZER - int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle)); + int32_t tgNumOfCols = QH_GET_NUM_OF_COLS(pQueryHandle); size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); - assert(numOfTables > 0 && numOfCols > 0); - - SQueryFilePos* cur = &pQueryHandle->cur; - - TSKEY key = TSKEY_INITIAL_VAL; - int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1; + int32_t numOfRows = 0; + assert(numOfTables > 0 && tgNumOfCols > 0); while (pQueryHandle->activeIndex < numOfTables) { STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, pQueryHandle->activeIndex); + STable* pTable = pCheckInfo->pTableObj; + char* pData = NULL; + + int32_t numOfCols = pTable->lastColNum; - if (copyColsFromCacheMem(pQueryHandle, pQueryHandle->outputCapacity, 0, numOfCols, pCheckInfo->pTableObj, NULL)) { - return true; + if (pTable->lastCols == NULL || pTable->lastColNum <= 0) { + tsdbWarn("no last cached for table, uid:%" PRIu64 ",tid:%d", pTable->tableId.uid, pTable->tableId.tid); + pQueryHandle->activeIndex++; + continue; } + + int32_t i = 0, j = 0; + while(i < tgNumOfCols && j < numOfCols) { + SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); + if (pTable->lastCols[j].colId < pColInfo->info.colId) { + j++; + continue; + } else if (pTable->lastCols[j].colId > pColInfo->info.colId) { + i++; + continue; + } + + if (ASCENDING_TRAVERSE(pQueryHandle->order)) { + pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;; + } else { + pData = (char*)pColInfo->pData + (pQueryHandle->outputCapacity + numOfRows - 1) * pColInfo->info.bytes; + } + + if (pTable->lastCols[j].bytes > 0) { + void* value = pTable->lastCols[j].pData; + switch (pColInfo->info.type) { + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + memcpy(pData, value, varDataTLen(value)); + break; + case TSDB_DATA_TYPE_NULL: + case TSDB_DATA_TYPE_BOOL: + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_UTINYINT: + *(uint8_t *)pData = *(uint8_t *)value; + break; + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_USMALLINT: + *(uint16_t *)pData = *(uint16_t *)value; + break; + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_UINT: + *(uint32_t *)pData = *(uint32_t *)value; + break; + case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_UBIGINT: + *(uint64_t *)pData = *(uint64_t *)value; + break; + case TSDB_DATA_TYPE_FLOAT: + SET_FLOAT_PTR(pData, value); + break; + case TSDB_DATA_TYPE_DOUBLE: + SET_DOUBLE_PTR(pData, value); + break; + case TSDB_DATA_TYPE_TIMESTAMP: + if (pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { + *(TSKEY *)pData = tdGetKey(*(TKEY *)value); + } else { + *(TSKEY *)pData = *(TSKEY *)value; + } + break; + default: + memcpy(pData, value, pColInfo->info.bytes); + } + + for (int32_t n = 0; n < tgNumOfCols; ++n) { + if (n == i) { + continue; + } + + SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); + if (ASCENDING_TRAVERSE(pQueryHandle->order)) { + pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;; + } else { + pData = (char*)pColInfo->pData + (pQueryHandle->outputCapacity + numOfRows - 1) * pColInfo->info.bytes; + } + + if (n == 0) { + *(TSKEY *)pData = pTable->lastCols[j].ts; + continue; + } + + if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { + setVardataNull(pData, pColInfo->info.type); + } else { + setNull(pData, pColInfo->info.type, pColInfo->info.bytes); + } + } + + numOfRows++; + assert(numOfRows < pQueryHandle->outputCapacity); + } + + i++; + j++; + } + + pQueryHandle->activeIndex++; + + if (numOfRows > 0) { + return true; + } } return false; @@ -2850,7 +2833,7 @@ int32_t tsdbGetCachedLastRow(STable* pTable, SDataRow* pRes, TSKEY* lastKey) { TSDB_RLOCK_TABLE(pTable); *lastKey = pTable->lastKey; - if ((*lastKey) != TSKEY_INITIAL_VAL && pTable->lastRow == 1) { + if ((*lastKey) != TSKEY_INITIAL_VAL && pTable->lastRow) { *pRes = tdDataRowDup(pTable->lastRow); if (*pRes == NULL) { TSDB_RUNLOCK_TABLE(pTable); @@ -2899,7 +2882,6 @@ int32_t checkForCachedLast(STsdbQueryHandle* pQueryHandle, STableGroupInfo *grou assert(pQueryHandle != NULL && groupList != NULL); SDataRow pRow = NULL; - TSKEY key = TSKEY_INITIAL_VAL; SArray* group = taosArrayGetP(groupList->pGroupList, 0); assert(group != NULL);