diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index b3460ac3b2..a326bd07fc 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -62,12 +62,6 @@ typedef struct SLoadCompBlockInfo { int32_t fileId; } SLoadCompBlockInfo; -typedef struct SCacheLastColInfo { - int16_t size; - int16_t num; - int16_t fetchIdx; - int16_t *idx; -} SCacheLastColInfo; typedef struct STableCheckInfo { STableId tableId; @@ -75,7 +69,6 @@ typedef struct STableCheckInfo { STable* pTableObj; SBlockInfo* pCompInfo; int32_t compSize; - SCacheLastColInfo cacheLast; // cache last column chosen int32_t numOfBlocks:29; // number of qualified data blocks not the original blocks int8_t chosen:2; // indicate which iterator should move forward bool initBuf; // whether to initialize the in-memory skip list iterator or not @@ -101,6 +94,12 @@ typedef struct SIOCostSummary { int64_t checkForNextTime; } SIOCostSummary; + +typedef struct SCacheLastColInfo { + int16_t i; + int16_t j; +} SCacheLastColInfo; + typedef struct STsdbQueryHandle { STsdbRepo* pTsdb; SQueryFilePos cur; // current position @@ -126,6 +125,7 @@ typedef struct STsdbQueryHandle { SReadH rhelper; STableBlockInfo* pDataBlockInfo; + SCacheLastColInfo lastCols; SDataCols *pDataCols; // in order to hold current file data block int32_t allocSize; // allocated data block size SMemRef *pMemRef; @@ -546,98 +546,16 @@ TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable } - -STimeWindow updateCacheLastForEachGroup(STableGroupInfo *groupList) { - STimeWindow window = {INT64_MAX, INT64_MIN}; - - int32_t totalNumOfTable = 0; - - // NOTE: starts from the buffer in case of descending timestamp order check data blocks - size_t numOfGroups = taosArrayGetSize(groupList->pGroupList); - for(int32_t j = 0; j < numOfGroups; ++j) { - SArray* pGroup = taosArrayGetP(groupList->pGroupList, j); - TSKEY key = TSKEY_INITIAL_VAL; - - STableKeyInfo keyInfo = {0}; - - size_t numOfTables = taosArrayGetSize(pGroup); - for(int32_t i = 0; i < numOfTables; ++i) { - STableKeyInfo* pInfo = (STableKeyInfo*) taosArrayGet(pGroup, i); - - // if the lastKey equals to INT64_MIN, there is no data in this table - TSKEY lastKey = ((STable*)(pInfo->pTable))->lastKey; - if (key < lastKey) { - key = lastKey; - - keyInfo.pTable = pInfo->pTable; - keyInfo.lastKey = key; - pInfo->lastKey = key; - - if (key < window.skey) { - window.skey = key; - } - - if (key > window.ekey) { - window.ekey = key; - } - } - } - - // clear current group, unref unused table - for (int32_t i = 0; i < numOfTables; ++i) { - STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(pGroup, i); - - // keyInfo.pTable may be NULL here. - if (pInfo->pTable != keyInfo.pTable) { - tsdbUnRefTable(pInfo->pTable); - } - } - - taosArrayClear(pGroup); - - // more than one table in each group, only one table left for each group - if (keyInfo.pTable != NULL) { - totalNumOfTable++; - taosArrayPush(pGroup, &keyInfo); - } else { - taosArrayDestroy(pGroup); - - taosArrayRemove(groupList->pGroupList, j); - numOfGroups -= 1; - j -= 1; - } - } - - // window does not being updated, so set the original - if (window.skey == INT64_MAX && window.ekey == INT64_MIN) { - window = TSWINDOW_INITIALIZER; - assert(totalNumOfTable == 0 && taosArrayGetSize(groupList->pGroupList) == 0); - } - - groupList->numOfTables = totalNumOfTable; - return window; -} - - TsdbQueryHandleT tsdbQueryCacheLast(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef) { - pCond->twindow = updateCacheLastForEachGroup(groupList); - - // no qualified table - if (groupList->numOfTables == 0) { - return NULL; - } - STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qId, pMemRef); - int32_t code = checkForCachedLastRow(pQueryHandle, groupList); + int32_t code = checkForCachedLast(pQueryHandle, groupList); if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0 terrno = code; return NULL; } assert(pCond->order == TSDB_ORDER_ASC && pCond->twindow.skey <= pCond->twindow.ekey); - if (pQueryHandle->cachelastrow == 2) { - pQueryHandle->type = TSDB_QUERY_TYPE_LAST; - } + pQueryHandle->type = TSDB_QUERY_TYPE_LAST; return pQueryHandle; } @@ -2572,6 +2490,115 @@ static bool loadCachedLastRow(STsdbQueryHandle* pQueryHandle) { } +static int32_t copyColsFromCacheMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, STable* pTable) { + char* pData = NULL; + + STSchema* pSchema = tsdbGetTableSchema(pTable); + int32_t numOfCols = schemaNCols(pSchema); + int32_t tgNumOfCols = QH_GET_NUM_OF_COLS(pQueryHandle); + + assert(numOfCols == pTable->restoreColumnNum); + assert(pTable->lastCols != NULL); + + int32_t i = pQueryHandle->lastCols.i, j = pQueryHandle->lastCols.j; + while(i < tgNumOfCols && j < numOfCols) { + SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); + if (pTable->lastCols[j].colId < pColInfo->info.colId) { + j++; + continue; + } else if (pTable->lastCols[j].colId > pColInfo->info.colId) { + i++; + continue; + } + + if (ASCENDING_TRAVERSE(pQueryHandle->order)) { + pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes; + } else { + pData = (char*)pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes; + } + + if (pTable->lastCols[j].bytes > 0) { + void* value = pTable->lastCols[j].pData; + switch (pColInfo->info.type) { + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + memcpy(pData, value, varDataTLen(value)); + break; + case TSDB_DATA_TYPE_NULL: + case TSDB_DATA_TYPE_BOOL: + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_UTINYINT: + *(uint8_t *)pData = *(uint8_t *)value; + break; + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_USMALLINT: + *(uint16_t *)pData = *(uint16_t *)value; + break; + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_UINT: + *(uint32_t *)pData = *(uint32_t *)value; + break; + case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_UBIGINT: + *(uint64_t *)pData = *(uint64_t *)value; + break; + case TSDB_DATA_TYPE_FLOAT: + SET_FLOAT_PTR(pData, value); + break; + case TSDB_DATA_TYPE_DOUBLE: + SET_DOUBLE_PTR(pData, value); + break; + case TSDB_DATA_TYPE_TIMESTAMP: + if (pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { + *(TSKEY *)pData = tdGetKey(*(TKEY *)value); + } else { + *(TSKEY *)pData = *(TSKEY *)value; + } + break; + default: + memcpy(pData, value, pColInfo->info.bytes); + } + + for (int32_t n = 0; n < tgNumOfCols; ++n) { + if (n == i) { + continue; + } + + SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); + if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { + setVardataNull(pData, pColInfo->info.type); + } else { + setNull(pData, pColInfo->info.type, pColInfo->info.bytes); + } + } + + ++i; + ++j; + + if (i >= tgNumOfCols || j >= numOfCols) { + pQueryHandle->lastCols.i = 0; + pQueryHandle->lastCols.j = 0; + pQueryHandle->activeIndex++; + } else { + pQueryHandle->lastCols.i = i; + pQueryHandle->lastCols.j = j; + } + + return 1; + } + + i++; + j++; + } + + pQueryHandle->lastCols.i = 0; + pQueryHandle->lastCols.j = 0; + pQueryHandle->activeIndex++; + + return 0; +} + + static bool loadCachedLast(STsdbQueryHandle* pQueryHandle) { // the last row is cached in buffer, return it directly. // here note that the pQueryHandle->window must be the TS_INITIALIZER @@ -2581,42 +2608,15 @@ static bool loadCachedLast(STsdbQueryHandle* pQueryHandle) { SQueryFilePos* cur = &pQueryHandle->cur; - SDataRow pRow = NULL; TSKEY key = TSKEY_INITIAL_VAL; int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1; - if (pQueryHandle->activeIndex < 0) { - updateCacheLastForEachGroup(pQueryHandle); - } - - if (pQueryHandle->activeIndex < numOfTables) { + while (pQueryHandle->activeIndex < numOfTables) { STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, pQueryHandle->activeIndex); - if (pQueryHandle->cachelastrow == 1) { - int32_t ret = tsdbGetCachedLastRow(pCheckInfo->pTableObj, &pRow, &key); - if (ret != TSDB_CODE_SUCCESS) { - return false; - } - - copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, 0, pRow, numOfCols, pCheckInfo->pTableObj, NULL); - tfree(pRow); - - // update the last key value - pCheckInfo->lastKey = key + step; - - cur->rows = 1; // only one row - cur->lastKey = key + step; - cur->mixBlock = true; - cur->win.skey = key; - cur->win.ekey = key; - } else if (pQueryHandle->cachelastrow == 2) { - - } else { - tsdbError("invalid cachelastrow:%d", pQueryHandle->cachelastrow); - return false; + if (copyColsFromCacheMem(pQueryHandle, pQueryHandle->outputCapacity, 0, numOfCols, pCheckInfo->pTableObj, NULL)) { + return true; } - - return true; } return false; @@ -2662,7 +2662,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT pHandle) { if (pQueryHandle->type == TSDB_QUERY_TYPE_LAST) { if (pQueryHandle->cachelastrow == 1) { return loadCachedLastRow(pQueryHandle); - } if (pQueryHandle->cachelastrow == 2) else { + } else if (pQueryHandle->cachelastrow == 2) { return loadCachedLast(pQueryHandle); } } @@ -2875,17 +2875,14 @@ int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *g int32_t code = 0; - if (((STable*)pInfo->pTable)->lastRow == 1) { + if (((STable*)pInfo->pTable)->lastRow) { code = tsdbGetCachedLastRow(pInfo->pTable, &pRow, &key); if (code != TSDB_CODE_SUCCESS) { pQueryHandle->cachelastrow = 0; } else { - pQueryHandle->cachelastrow = ((STable*)pInfo->pTable)->lastRow; + pQueryHandle->cachelastrow = 1; } - } else if (((STable*)pInfo->pTable)->lastCols && ((STable*)pInfo->pTable)->lastColNum > 0 && ((STable*)pInfo->pTable)->lastRow == 2){ - pQueryHandle->cachelastrow = ((STable*)pInfo->pTable)->lastRow; } - // update the tsdb query time range if (pQueryHandle->cachelastrow) { @@ -2898,6 +2895,36 @@ int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *g return code; } +int32_t checkForCachedLast(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList) { + assert(pQueryHandle != NULL && groupList != NULL); + + SDataRow pRow = NULL; + TSKEY key = TSKEY_INITIAL_VAL; + + SArray* group = taosArrayGetP(groupList->pGroupList, 0); + assert(group != NULL); + + STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(group, 0); + + int32_t code = 0; + + if (((STable*)pInfo->pTable)->lastCols && ((STable*)pInfo->pTable)->lastColNum > 0){ + pQueryHandle->cachelastrow = 2; + } + + + // update the tsdb query time range + if (pQueryHandle->cachelastrow) { + pQueryHandle->window = TSWINDOW_INITIALIZER; + pQueryHandle->checkFiles = false; + pQueryHandle->activeIndex = 0; // start from -1 + } + + tfree(pRow); + return code; +} + + STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList) { STimeWindow window = {INT64_MAX, INT64_MIN};