diff --git a/documentation20/cn/11.administrator/docs.md b/documentation20/cn/11.administrator/docs.md
index bfa0456c7d..26cfa91beb 100644
--- a/documentation20/cn/11.administrator/docs.md
+++ b/documentation20/cn/11.administrator/docs.md
@@ -129,7 +129,7 @@ taosd -C
 - blocks:每个VNODE(TSDB)中有多少cache大小的内存块。因此一个VNODE的用的内存大小粗略为(cache * blocks)。单位为块,默认值:4。(可通过 alter database 修改)
 - replica:副本个数,取值范围:1-3。单位为个,默认值:1。(可通过 alter database 修改)
 - precision:时间戳精度标识,ms表示毫秒,us表示微秒。默认值:ms。
-- cacheLast:是否在内存中缓存子表 last_row,0:关闭;1:开启。默认值:0。(可通过 alter database 修改)(从 2.0.11 版本开始支持此参数)
+- cacheLast:是否在内存中缓存子表的最近数据,0:关闭;1:缓存子表最近一行数据;2:缓存子表每一列的最近的非NULL值。默认值:0。(可通过 alter database 修改)(从 2.0.11 版本开始支持此参数)
 
 对于一个应用场景,可能有多种数据特征的数据并存,最佳的设计是将具有相同数据特征的表放在一个库里,这样一个应用有多个库,而每个库可以配置不同的存储参数,从而保证系统有最优的性能。TDengine允许应用在创建库时指定上述存储参数,如果指定,该参数就将覆盖对应的系统配置参数。举例,有下述SQL:
 
diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h
index 88d5b85010..fcae7a415f 100644
--- a/src/common/inc/tdataformat.h
+++ b/src/common/inc/tdataformat.h
@@ -234,6 +234,7 @@ typedef struct SDataCol {
   int             len;      // column data length
   VarDataOffsetT *dataOff;  // For binary and nchar data, the offset in the data column
   void *          pData;    // Actual data pointer
+  TSKEY           ts;       // timestamp of the cached value; only used by the last non-NULL column cache
 } SDataCol;
 
 static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; }
diff --git a/src/common/src/ttypes.c b/src/common/src/ttypes.c
index 6fa27a029b..6587a27760 100644
--- a/src/common/src/ttypes.c
+++ b/src/common/src/ttypes.c
@@ -417,6 +417,19 @@ void setVardataNull(char* val, int32_t type) {
   }
 }
 
+// Returns true when the BINARY/NCHAR var-data value at `val` holds the NULL marker of `type`.
+bool isVardataNull(char* val, int32_t type) {
+  if (type == TSDB_DATA_TYPE_BINARY) {
+    return *(uint8_t*) varDataVal(val) == TSDB_DATA_BINARY_NULL;
+  } else if (type == TSDB_DATA_TYPE_NCHAR) {
+    return *(uint32_t*) varDataVal(val) == TSDB_DATA_NCHAR_NULL;
+  } else {
+    assert(0);
+  }
+
+  return false;
+}
+
 void setNull(char *val, int32_t type, int32_t bytes) { setNullN(val, type, bytes, 1); }
 
 void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems) {
@@ -492,6 +505,56 @@ void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems) {
   }
 }
 
+// Returns true when the single value at `val` equals the NULL marker of `type`.
+bool isNullN(char *val, int32_t type) {
+  switch (type) {
+    case TSDB_DATA_TYPE_BOOL:
+      return *(uint8_t *)(val) == TSDB_DATA_BOOL_NULL;
+      break;
+    case TSDB_DATA_TYPE_TINYINT:
+      return *(uint8_t *)(val) == TSDB_DATA_TINYINT_NULL;
+      break;
+    case TSDB_DATA_TYPE_SMALLINT:
+      return *(uint16_t *)(val) == TSDB_DATA_SMALLINT_NULL;
+      break;
+    case TSDB_DATA_TYPE_INT:
+      return *(uint32_t *)(val) == TSDB_DATA_INT_NULL;
+      break;
+    case TSDB_DATA_TYPE_BIGINT:
+    case TSDB_DATA_TYPE_TIMESTAMP:
+      return *(uint64_t *)(val) == TSDB_DATA_BIGINT_NULL;
+      break;
+    case TSDB_DATA_TYPE_UTINYINT:
+      return *(uint8_t *)(val) == TSDB_DATA_UTINYINT_NULL;
+      break;
+    case TSDB_DATA_TYPE_USMALLINT:
+      return *(uint16_t *)(val) == TSDB_DATA_USMALLINT_NULL;
+      break;
+    case TSDB_DATA_TYPE_UINT:
+      return *(uint32_t *)(val) == TSDB_DATA_UINT_NULL;
+      break;
+    case TSDB_DATA_TYPE_UBIGINT:
+      return *(uint64_t *)(val) == TSDB_DATA_UBIGINT_NULL;
+      break;
+    case TSDB_DATA_TYPE_FLOAT:
+      return *(uint32_t *)(val) == TSDB_DATA_FLOAT_NULL;
+      break;
+    case TSDB_DATA_TYPE_DOUBLE:
+      return *(uint64_t *)(val) == TSDB_DATA_DOUBLE_NULL;
+      break;
+    case TSDB_DATA_TYPE_NCHAR:
+    case TSDB_DATA_TYPE_BINARY:
+      return isVardataNull(val, type);
+      break;
+    default: {
+      return *(uint32_t *)(val) == TSDB_DATA_INT_NULL;
+      break;
+    }
+  }
+
+  return false;
+}
+
 static uint8_t nullBool = TSDB_DATA_BOOL_NULL;
 static uint8_t nullTinyInt = TSDB_DATA_TINYINT_NULL;
 static uint16_t nullSmallInt = TSDB_DATA_SMALLINT_NULL;
diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h
index e596ee67ec..2882faf7be 100644
--- a/src/inc/taosdef.h
+++ b/src/inc/taosdef.h
@@ -298,7 +298,7 @@ do { \
 #define TSDB_DEFAULT_DB_UPDATE_OPTION 0
 
 #define TSDB_MIN_DB_CACHE_LAST_ROW 0
-#define TSDB_MAX_DB_CACHE_LAST_ROW 1
+#define TSDB_MAX_DB_CACHE_LAST_ROW 2
 #define TSDB_DEFAULT_CACHE_LAST_ROW 0
 
 #define TSDB_MIN_FSYNC_PERIOD 0
diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h
index 1ba5131f6d..d231769c18 100644
--- a/src/inc/tsdb.h
+++ b/src/inc/tsdb.h
@@ -69,9 +69,13 @@ typedef struct {
   int8_t precision;
   int8_t compression;
   int8_t update;
-  int8_t cacheLastRow;
+  int8_t cacheLastRow;  // 0: no cache, 1: cache last row, 2: cache last non-NULL value of each column
 } STsdbCfg;
 
+#define CACHE_NO_LAST(c) ((c)->cacheLastRow == 0)
+#define CACHE_LAST_ROW(c) ((c)->cacheLastRow == 1)
+#define CACHE_LAST_NULL_COLUMN(c) ((c)->cacheLastRow == 2)
+
 // --------- TSDB REPOSITORY USAGE STATISTICS
 typedef struct {
   int64_t totalStorage;  // total bytes occupie
diff --git a/src/inc/ttype.h b/src/inc/ttype.h
index 9949f31c59..43dbeb7640 100644
--- a/src/inc/ttype.h
+++ b/src/inc/ttype.h
@@ -178,6 +178,9 @@ void setNull(char *val, int32_t type, int32_t bytes);
 void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems);
 void *getNullValue(int32_t type);
 
+bool isVardataNull(char* val, int32_t type);
+bool isNullN(char *val, int32_t type);
+
 void assignVal(char *val, const char *src, int32_t len, int32_t type);
 void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf);
 
diff --git a/src/tsdb/inc/tsdbMeta.h b/src/tsdb/inc/tsdbMeta.h
index 7484071ce3..a8c7a6c358 100644
--- a/src/tsdb/inc/tsdbMeta.h
+++ b/src/tsdb/inc/tsdbMeta.h
@@ -36,6 +36,10 @@ typedef struct STable {
   char*    sql;
   void*    cqhandle;
   SRWLatch latch;  // TODO: implementa latch functions
+
+  SDataCol *lastCols;          // cache of the latest non-NULL value of each column
+  int32_t  lastColNum;         // number of entries allocated in lastCols
+  int32_t  restoreColumnNum;   // number of columns whose value was restored from files
   T_REF_DECLARE()
 } STable;
 
diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c
index fd02a3c8b9..a7e1efb5ed 100644
--- a/src/tsdb/src/tsdbMain.c
+++ b/src/tsdb/src/tsdbMain.c
@@ -511,8 +511,10 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
   if (pCfg->update != 0) pCfg->update = 1;
 
   // update cacheLastRow
-  if (pCfg->cacheLastRow != 0) pCfg->cacheLastRow = 1;
-
+  // valid modes are 0, 1 and 2; any other value falls back to mode 1
+  if (!CACHE_NO_LAST(pCfg) && !CACHE_LAST_ROW(pCfg) && !CACHE_LAST_NULL_COLUMN(pCfg)) {
+    pCfg->cacheLastRow = 1;
+  }
   return 0;
 }
 
@@ -614,6 +616,140 @@ static void tsdbStopStream(STsdbRepo *pRepo) {
   }
 }
 
+// Cache, for every column of pTable that has no cached value yet, the most
+// recent non-NULL value found in the file set currently opened in pReadh,
+// together with the timestamp of the row it came from. Blocks and rows are
+// scanned from newest to oldest. Returns 0 on success, -1 on failure.
+static int restoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh) {
+  SDataStatis *pBlockStatis = NULL;
+  SDataRow     row = NULL;
+  STSchema    *pSchema = tsdbGetTableSchema(pTable);
+  int          numColumns = schemaNCols(pSchema);
+  int          err = 0;
+
+  if (numColumns <= pTable->restoreColumnNum) {
+    return 0;
+  }
+
+  // no block index was set for this table in the current file set: nothing to scan
+  if (pReadh->pBlkIdx == NULL) {
+    return 0;
+  }
+
+  row = taosTMalloc(dataRowMaxBytesFromSchema(pSchema));
+  if (row == NULL) {
+    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
+    err = -1;
+    goto out;
+  }
+  tdInitDataRow(row, pSchema);
+
+  // first load block index info
+  if (tsdbLoadBlockInfo(pReadh, NULL) < 0) {
+    err = -1;
+    goto out;
+  }
+
+  // calloc() zero-fills the array, only the column ids need to be set
+  pBlockStatis = calloc(numColumns, sizeof(SDataStatis));
+  if (pBlockStatis == NULL) {
+    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
+    err = -1;
+    goto out;
+  }
+  for (int32_t i = 0; i < numColumns; ++i) {
+    pBlockStatis[i].colId = schemaColAt(pSchema, i)->colId;
+  }
+
+  // load block from backward
+  for (int32_t blockIdx = (int32_t)(pReadh->pBlkIdx->numOfBlocks - 1);
+       blockIdx >= 0 && numColumns > pTable->restoreColumnNum; --blockIdx) {
+    SBlock *pBlock = pReadh->pBlkInfo->blocks + blockIdx;
+    bool    loadStatisData = false;
+
+    // load block data
+    if (tsdbLoadBlockData(pReadh, pBlock, NULL) < 0) {
+      err = -1;
+      goto out;
+    }
+
+    // file block with sub-blocks has no statistics data
+    if (pBlock->numOfSubBlocks <= 1) {
+      tsdbLoadBlockStatis(pReadh, pBlock);
+      tsdbGetBlockStatis(pReadh, pBlockStatis, (int)numColumns);
+      loadStatisData = true;
+    }
+
+    for (int32_t i = 0; i < numColumns && numColumns > pTable->restoreColumnNum; ++i) {
+      STColumn *pCol = schemaColAt(pSchema, i);
+
+      // grow the cache so that slot i exists; new slots start out empty
+      if (i >= pTable->lastColNum) {
+        int32_t   newNum = i + 5;
+        SDataCol *pNewCols = realloc(pTable->lastCols, newNum * sizeof(SDataCol));
+        if (pNewCols == NULL) {
+          terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
+          err = -1;
+          goto out;
+        }
+        memset(pNewCols + pTable->lastColNum, 0, (newNum - pTable->lastColNum) * sizeof(SDataCol));
+        pTable->lastCols = pNewCols;
+        pTable->lastColNum = newNum;
+      }
+
+      // ignore loaded columns
+      if (pTable->lastCols[i].bytes != 0) {
+        continue;
+      }
+
+      // ignore block which has no not-null colId column
+      if (loadStatisData && pBlockStatis[i].numOfNull == pBlock->numOfRows) {
+        continue;
+      }
+
+      // OK,let's load row from backward to get not-null column
+      for (int32_t rowId = pBlock->numOfRows - 1; rowId >= 0; rowId--) {
+        SDataCol *pDataCol = pReadh->pDCols[0]->cols + i;
+        tdAppendColVal(row, tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset);
+        void *value = tdGetRowDataOfCol(row, (int8_t)pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset);
+        if (isNullN(value, pCol->type)) {
+          continue;
+        }
+
+        // save not-null column
+        SDataCol *pLastCol = &(pTable->lastCols[i]);
+        pLastCol->pData = malloc(pCol->bytes);
+        if (pLastCol->pData == NULL) {
+          terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
+          err = -1;
+          goto out;
+        }
+        pLastCol->bytes = pCol->bytes;
+        pLastCol->offset = pCol->offset;
+        pLastCol->colId = pCol->colId;
+        memcpy(pLastCol->pData, value, pCol->bytes);
+
+        // save row ts(in column 0); pCol keeps pointing at the restored column
+        STColumn *pTsCol = schemaColAt(pSchema, 0);
+        pDataCol = pReadh->pDCols[0]->cols + 0;
+        tdAppendColVal(row, tdGetColDataOfRow(pDataCol, rowId), pTsCol->type, pTsCol->bytes, pTsCol->offset);
+        pLastCol->ts = dataRowTKey(row);
+
+        pTable->restoreColumnNum += 1;
+
+        tsdbInfo("restoreLastColumns restore vgId:%d,table:%s cache column %d, %d", REPO_ID(pRepo), pTable->name->data, pCol->colId, (int32_t)pLastCol->ts);
+        break;
+      }
+    }
+  }
+
+out:
+  taosTZfree(row);
+  tfree(pBlockStatis);
+
+  return err;
+}
+
 int tsdbRestoreInfo(STsdbRepo *pRepo) {
   SFSIter fsiter;
   SReadH readh;
@@ -628,6 +764,14 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
 
   tsdbFSIterInit(&fsiter, REPO_FS(pRepo), TSDB_FS_ITER_BACKWARD);
 
+  if (CACHE_LAST_NULL_COLUMN(pCfg)) {
+    for (int i = 1; i < pMeta->maxTables; i++) {
+      STable *pTable = pMeta->tables[i];
+      if (pTable == NULL) continue;
+      pTable->restoreColumnNum = 0;
+    }
+  }
+
   while ((pSet = tsdbFSIterNext(&fsiter)) != NULL) {
     if (tsdbSetAndOpenReadFSet(&readh, pSet) < 0) {
       tsdbDestroyReadH(&readh);
@@ -653,7 +797,7 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
       if (pIdx && lastKey < pIdx->maxKey) {
         pTable->lastKey = pIdx->maxKey;
 
-        if (pCfg->cacheLastRow) {
+        if (CACHE_LAST_ROW(pCfg)) {
           if (tsdbLoadBlockInfo(&readh, NULL) < 0) {
             tsdbDestroyReadH(&readh);
             return -1;
@@ -686,6 +830,13 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
         }
       }
 
+      // restore the last non-NULL value of each column
+      if (CACHE_LAST_NULL_COLUMN(pCfg)) {
+        if (restoreLastColumns(pRepo, pTable, &readh) != 0) {
+          tsdbDestroyReadH(&readh);
+          return -1;
+        }
+      }
     }
   }
 
diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c
index c6fcf55686..70e27a5700 100644
--- a/src/tsdb/src/tsdbMemTable.c
+++ b/src/tsdb/src/tsdbMemTable.c
@@ -964,6 +964,63 @@ static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) {
   }
 }
 
+// Record, for every non-NULL column of `row`, its value and the row timestamp
+// in pTable->lastCols. The cache is best effort: when memory cannot be
+// allocated the affected entries simply keep their previous content.
+// NOTE(review): entries are indexed by column id here, but by schema position
+// in restoreLastColumns() (tsdbMain.c) - confirm which one readers expect.
+static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow row) {
+  // find the schema the row was written with, newest version first
+  STSchema *pSchema = NULL;
+  for (int i = pTable->numOfSchemas - 1; i >= 0; --i) {
+    STSchema *pCandidate = pTable->schema[i];
+    if (pCandidate != NULL && pCandidate->version == dataRowVersion(row)) {
+      pSchema = pCandidate;
+      break;
+    }
+  }
+  if (pSchema == NULL) {
+    return;
+  }
+
+  for (int j = 0; j < schemaNCols(pSchema); j++) {
+    STColumn *pTCol = schemaColAt(pSchema, j);
+    int       colBytes = pSchema->columns[j].bytes;
+
+    // grow the cache so that slot colId exists; new slots start out empty
+    if (pTCol->colId >= pTable->lastColNum) {
+      int32_t   newNum = pTCol->colId + 5;
+      SDataCol *pNewCols = realloc(pTable->lastCols, newNum * sizeof(SDataCol));
+      if (pNewCols == NULL) {
+        return;
+      }
+      memset(pNewCols + pTable->lastColNum, 0, (newNum - pTable->lastColNum) * sizeof(SDataCol));
+      pTable->lastCols = pNewCols;
+      pTable->lastColNum = newNum;
+    }
+
+    void *value = tdGetRowDataOfCol(row, (int8_t)pTCol->type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset);
+    if (isNullN(value, pTCol->type)) {
+      continue;
+    }
+
+    // fetch the slot only after a possible realloc() so the pointer cannot be stale
+    SDataCol *pDataCol = &(pTable->lastCols[pTCol->colId]);
+    if (pDataCol->pData == NULL || pDataCol->bytes < colBytes) {
+      void *pBuf = realloc(pDataCol->pData, colBytes);
+      if (pBuf == NULL) {
+        continue;
+      }
+      pDataCol->pData = pBuf;
+      pDataCol->bytes = colBytes;
+    }
+
+    // copy exactly one column value; pDataCol->bytes may be a larger capacity
+    memcpy(pDataCol->pData, value, colBytes);
+    pDataCol->ts = dataRowTKey(row);
+  }
+}
+
 static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow row) {
   STsdbCfg *pCfg = &pRepo->config;
 
@@ -977,7 +1034,7 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow
   }
 
   if (tsdbGetTableLastKeyImpl(pTable) < dataRowKey(row)) {
-    if (pCfg->cacheLastRow || pTable->lastRow != NULL) {
+    if (CACHE_LAST_ROW(pCfg) || pTable->lastRow != NULL) {
       SDataRow nrow = pTable->lastRow;
       if (taosTSizeof(nrow) < dataRowLen(row)) {
         SDataRow orow = nrow;
@@ -1002,7 +1059,10 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow
     } else {
       pTable->lastKey = dataRowKey(row);
     }
-  }
+    if (CACHE_LAST_NULL_COLUMN(pCfg)) {
+      updateTableLatestColumn(pRepo, pTable, row);
+    }
+  }
   return 0;
 }
 
diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c
index e6cbc4da9e..5a108a5d06 100644
--- a/src/tsdb/src/tsdbMeta.c
+++ b/src/tsdb/src/tsdbMeta.c
@@ -14,6 +14,7 @@
  */
 
 #include "tsdbint.h"
+#define TSDB_LATEST_COLUMN_ARRAY_SIZE 20
 #define TSDB_SUPER_TABLE_SL_LEVEL 5
 #define DEFAULT_TAG_INDEX_COLUMN 0
 
@@ -671,6 +672,11 @@ static STable *tsdbNewTable() {
   }
 
   pTable->lastKey = TSKEY_INITIAL_VAL;
+  // calloc() leaves every slot empty (bytes == 0, pData == NULL)
+  pTable->lastCols = (SDataCol*)calloc(TSDB_LATEST_COLUMN_ARRAY_SIZE, sizeof(SDataCol));
+  // if the allocation failed start with an empty cache; it is grown on demand
+  pTable->lastColNum = (pTable->lastCols != NULL) ? TSDB_LATEST_COLUMN_ARRAY_SIZE : 0;
+  pTable->restoreColumnNum = 0;
 
   return pTable;
 }
@@ -785,8 +791,15 @@ static void tsdbFreeTable(STable *pTable) {
     kvRowFree(pTable->tagVal);
 
     tSkipListDestroy(pTable->pIndex);
     taosTZfree(pTable->lastRow);
     tfree(pTable->sql);
+
+    // release every cached column value, then the cache array itself
+    for (int i = 0; i < pTable->lastColNum; ++i) {
+      free(pTable->lastCols[i].pData);
+    }
+    tfree(pTable->lastCols);
+
     free(pTable);
   }
 }