From 7cacd403d5ecf379c5660718062aa49a9f7744ad Mon Sep 17 00:00:00 2001 From: lichuang Date: Sat, 8 May 2021 16:40:45 +0800 Subject: [PATCH 01/30] add cache last null columns feature --- documentation20/cn/11.administrator/docs.md | 2 +- src/common/src/ttypes.c | 58 +++++++++++++++++++++ src/inc/taosdef.h | 2 +- src/inc/tsdb.h | 6 ++- src/inc/ttype.h | 3 ++ src/tsdb/inc/tsdbMeta.h | 3 ++ src/tsdb/src/tsdbMain.c | 8 +-- src/tsdb/src/tsdbMemTable.c | 57 +++++++++++++++++++- src/tsdb/src/tsdbMeta.c | 7 +++ 9 files changed, 138 insertions(+), 8 deletions(-) diff --git a/documentation20/cn/11.administrator/docs.md b/documentation20/cn/11.administrator/docs.md index bfa0456c7d..b9340210d2 100644 --- a/documentation20/cn/11.administrator/docs.md +++ b/documentation20/cn/11.administrator/docs.md @@ -129,7 +129,7 @@ taosd -C - blocks:每个VNODE(TSDB)中有多少cache大小的内存块。因此一个VNODE的用的内存大小粗略为(cache * blocks)。单位为块,默认值:4。(可通过 alter database 修改) - replica:副本个数,取值范围:1-3。单位为个,默认值:1。(可通过 alter database 修改) - precision:时间戳精度标识,ms表示毫秒,us表示微秒。默认值:ms。 -- cacheLast:是否在内存中缓存子表 last_row,0:关闭;1:开启。默认值:0。(可通过 alter database 修改)(从 2.0.11 版本开始支持此参数) +- cacheLast:是否在内存中缓存子表的最近数据,0:关闭;1:缓存子表最近一行数据;2:缓存子表每一列的最近的非NULL值,设置为3表示同时开启了1和2。默认值:0。(可通过 alter database 修改)(从 2.0.11 版本开始支持此参数) 对于一个应用场景,可能有多种数据特征的数据并存,最佳的设计是将具有相同数据特征的表放在一个库里,这样一个应用有多个库,而每个库可以配置不同的存储参数,从而保证系统有最优的性能。TDengine允许应用在创建库时指定上述存储参数,如果指定,该参数就将覆盖对应的系统配置参数。举例,有下述SQL: diff --git a/src/common/src/ttypes.c b/src/common/src/ttypes.c index 6fa27a029b..77e12e23d2 100644 --- a/src/common/src/ttypes.c +++ b/src/common/src/ttypes.c @@ -417,6 +417,16 @@ void setVardataNull(char* val, int32_t type) { } } +bool isVardataNull(char* val, int32_t type) { + if (type == TSDB_DATA_TYPE_BINARY) { + return *(uint8_t*) varDataVal(val) == TSDB_DATA_BINARY_NULL; + } else if (type == TSDB_DATA_TYPE_NCHAR) { + return *(uint32_t*) varDataVal(val) == TSDB_DATA_NCHAR_NULL; + } else { + assert(0); + } +} + void setNull(char *val, int32_t type, int32_t bytes) { setNullN(val, type, bytes, 1); } void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems) { @@ -492,6 +502,54 @@ void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems) { } } +bool isNullN(char *val, int32_t type) { + switch (type) { + case TSDB_DATA_TYPE_BOOL: + return *(uint8_t *)(val) == TSDB_DATA_BOOL_NULL; + break; + case TSDB_DATA_TYPE_TINYINT: + return *(uint8_t *)(val) == TSDB_DATA_TINYINT_NULL; + break; + case TSDB_DATA_TYPE_SMALLINT: + return *(uint16_t *)(val) == TSDB_DATA_SMALLINT_NULL; + break; + case TSDB_DATA_TYPE_INT: + return *(uint32_t *)(val) == TSDB_DATA_INT_NULL; + break; + case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_TIMESTAMP: + return *(uint64_t *)(val) == TSDB_DATA_BIGINT_NULL; + break; + case TSDB_DATA_TYPE_UTINYINT: + return *(uint8_t *)(val) == TSDB_DATA_UTINYINT_NULL; + break; + case TSDB_DATA_TYPE_USMALLINT: + return *(uint16_t *)(val) == TSDB_DATA_USMALLINT_NULL; + break; + case TSDB_DATA_TYPE_UINT: + return *(uint32_t *)(val) == TSDB_DATA_UINT_NULL; + break; + case TSDB_DATA_TYPE_UBIGINT: + return *(uint64_t *)(val) == TSDB_DATA_UBIGINT_NULL; + break; + case TSDB_DATA_TYPE_FLOAT: + return *(uint32_t *)(val) == TSDB_DATA_FLOAT_NULL; + break; + case TSDB_DATA_TYPE_DOUBLE: + return *(uint64_t *)(val) == TSDB_DATA_DOUBLE_NULL; + break; + case TSDB_DATA_TYPE_NCHAR: + case TSDB_DATA_TYPE_BINARY: + return isVardataNull(val, type); + break; + default: { + return *(uint32_t *)(val) == TSDB_DATA_INT_NULL; + break; + } + } + +} + static uint8_t nullBool = TSDB_DATA_BOOL_NULL; static uint8_t nullTinyInt = TSDB_DATA_TINYINT_NULL; static uint16_t nullSmallInt = TSDB_DATA_SMALLINT_NULL; diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index e9170860a6..f6479ea15d 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -299,7 +299,7 @@ do { \ #define TSDB_DEFAULT_DB_UPDATE_OPTION 0 #define TSDB_MIN_DB_CACHE_LAST_ROW 0 -#define TSDB_MAX_DB_CACHE_LAST_ROW 1 +#define TSDB_MAX_DB_CACHE_LAST_ROW 2 #define TSDB_DEFAULT_CACHE_LAST_ROW 0 #define TSDB_MIN_FSYNC_PERIOD 0 diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 1ba5131f6d..7c28d3e485 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -69,9 +69,13 @@ typedef struct { int8_t precision; int8_t compression; int8_t update; - int8_t cacheLastRow; + int8_t cacheLastRow; // 0:no cache, 1: cache last row, 2: cache last NULL column } STsdbCfg; +#define CACHE_NO_LAST(c) ((c)->cacheLastRow == 0) +#define CACHE_LAST_ROW(c) (((c)->cacheLastRow & 1) > 0) +#define CACHE_LAST_NULL_COLUMN(c) (((c)->cacheLastRow & 2) > 0) + // --------- TSDB REPOSITORY USAGE STATISTICS typedef struct { int64_t totalStorage; // total bytes occupie diff --git a/src/inc/ttype.h b/src/inc/ttype.h index 662a23bfdb..120667ce6e 100644 --- a/src/inc/ttype.h +++ b/src/inc/ttype.h @@ -176,6 +176,9 @@ void setNull(char *val, int32_t type, int32_t bytes); void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems); void *getNullValue(int32_t type); +bool isVardataNull(char* val, int32_t type); +bool isNullN(char *val, int32_t type); + void assignVal(char *val, const char *src, int32_t len, int32_t type); void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf); diff --git a/src/tsdb/inc/tsdbMeta.h b/src/tsdb/inc/tsdbMeta.h index cc916fa689..d339989bb3 100644 --- a/src/tsdb/inc/tsdbMeta.h +++ b/src/tsdb/inc/tsdbMeta.h @@ -36,6 +36,9 @@ typedef struct STable { char* sql; void* cqhandle; SRWLatch latch; // TODO: implementa latch functions + + SDataCol *lastCols; + int32_t lastColNum; T_REF_DECLARE() } STable; diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 99929f3542..5a44b03c81 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -447,8 +447,10 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) { if (pCfg->update != 0) pCfg->update = 1; // update cacheLastRow - if (pCfg->cacheLastRow != 0) pCfg->cacheLastRow = 1; - + if (pCfg->cacheLastRow != 0) { + if (pCfg->cacheLastRow > 3) + pCfg->cacheLastRow = 1; + } return 0; } @@ -581,7 +583,7 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) { if (pIdx && lastKey < pIdx->maxKey) { pTable->lastKey = pIdx->maxKey; - if (pCfg->cacheLastRow) { + if (CACHE_LAST_ROW(pCfg)) { if (tsdbLoadBlockInfo(&readh, NULL) < 0) { tsdbDestroyReadH(&readh); return -1; diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 20ec426018..18041f0291 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -955,11 +955,61 @@ static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) { } } +static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow row) { + //tsdbDebug("vgId:%d updateTableLatestColumn, row version:%d", REPO_ID(pRepo), dataRowVersion(row)); + + if (pTable->numOfSchemas <= 0) { + return; + } + STSchema* pSchema = pTable->schema[pTable->numOfSchemas - 1]; + int i = pTable->numOfSchemas - 1; + while ((pSchema == NULL || pSchema->version != dataRowVersion(row)) && i >= 0) { + i -= 1; + pSchema = pTable->schema[i]; + } + if (pSchema == NULL || pSchema->version != dataRowVersion(row)) { + return; + } + + SDataCol *pLatestCols = pTable->lastCols; + + for (int j = 0; j < schemaNCols(pSchema); j++) { + if (j >= pTable->lastColNum) { + pTable->lastCols = realloc(pTable->lastCols, pTable->lastColNum + 10); + for (int i = 0; i < 10; ++i) { + pTable->lastCols[i + pTable->lastColNum].bytes = 0; + pTable->lastCols[i + pTable->lastColNum].pData = NULL; + } + pTable->lastColNum += 10; + } + + STColumn *pTCol = schemaColAt(pSchema, j); + SDataCol *pDataCol = &(pLatestCols[j]); + void* value = tdGetRowDataOfCol(row, (int8_t)pTCol->type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset); + if (isNullN(value, pTCol->type)) { + continue; + } + if (pDataCol->pData == NULL) { + pDataCol->pData = malloc(pSchema->columns[j].bytes); + pDataCol->bytes = pSchema->columns[j].bytes; + } else if (pDataCol->bytes < pSchema->columns[j].bytes) { + pDataCol->pData = realloc(pDataCol->pData, pSchema->columns[j].bytes); + pDataCol->bytes = pSchema->columns[j].bytes; + } + + //tsdbDebug("vgId:%d cache column %d for %d,%p", REPO_ID(pRepo), j, pDataCol->bytes, pDataCol->pData); + + memcpy(pDataCol->pData, value, pDataCol->bytes); + } +} + static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow row) { STsdbCfg *pCfg = &pRepo->config; + tsdbDebug("vgId:%d tsdbUpdateTableLatestInfo, %ld, %ld, %d", REPO_ID(pRepo), tsdbGetTableLastKeyImpl(pTable), dataRowKey(row), pCfg->cacheLastRow); + if (tsdbGetTableLastKeyImpl(pTable) < dataRowKey(row)) { - if (pCfg->cacheLastRow || pTable->lastRow != NULL) { + if (CACHE_LAST_ROW(pCfg) || pTable->lastRow != NULL) { SDataRow nrow = pTable->lastRow; if (taosTSizeof(nrow) < dataRowLen(row)) { SDataRow orow = nrow; @@ -984,7 +1034,10 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow } else { pTable->lastKey = dataRowKey(row); } - } + if (CACHE_LAST_NULL_COLUMN(pCfg)) { + updateTableLatestColumn(pRepo, pTable, row); + } + } return 0; } diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 3e6263b9d3..b8e3273664 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -14,6 +14,7 @@ */ #include "tsdbint.h" +#define TSDB_LATEST_COLUMN_ARRAY_SIZE 20 #define TSDB_SUPER_TABLE_SL_LEVEL 5 #define DEFAULT_TAG_INDEX_COLUMN 0 @@ -671,6 +672,12 @@ static STable *tsdbNewTable() { } pTable->lastKey = TSKEY_INITIAL_VAL; + pTable->lastCols = (SDataCol*)malloc(TSDB_LATEST_COLUMN_ARRAY_SIZE * sizeof(SDataCol)); + pTable->lastColNum = TSDB_LATEST_COLUMN_ARRAY_SIZE; + for (int i = 0; i < pTable->lastColNum; ++i) { + pTable->lastCols[i].bytes = 0; + pTable->lastCols[i].pData = NULL; + } return pTable; } From 913eb1c71911752378f79c1fa978c09dd8fba83c Mon Sep 17 00:00:00 2001 From: lichuang Date: Sat, 8 May 2021 17:08:29 +0800 Subject: [PATCH 02/30] cache last null columns feature --- src/tsdb/src/tsdbMeta.c | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index b8e3273664..9b98ca19fc 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -794,6 +794,13 @@ static void tsdbFreeTable(STable *pTable) { tSkipListDestroy(pTable->pIndex); taosTZfree(pTable->lastRow); tfree(pTable->sql); + + for (int i = 0; i < pTable->lastColNum; ++i) { + if (pTable->lastCols[i].pData == NULL) { + continue; + } + free(pTable->lastCols[i].pData); + } free(pTable); } } From f0640ac09ef0af41781fd3daed847b4ff91a1920 Mon Sep 17 00:00:00 2001 From: lichuang Date: Sat, 8 May 2021 18:02:26 +0800 Subject: [PATCH 03/30] disable debug log --- src/tsdb/src/tsdbMemTable.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 18041f0291..d7d1e5ea18 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -1006,7 +1006,7 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow r static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow row) { STsdbCfg *pCfg = &pRepo->config; - tsdbDebug("vgId:%d tsdbUpdateTableLatestInfo, %ld, %ld, %d", REPO_ID(pRepo), tsdbGetTableLastKeyImpl(pTable), dataRowKey(row), pCfg->cacheLastRow); + //tsdbDebug("vgId:%d tsdbUpdateTableLatestInfo, %ld, %ld, %d", REPO_ID(pRepo), tsdbGetTableLastKeyImpl(pTable), dataRowKey(row), pCfg->cacheLastRow); if (tsdbGetTableLastKeyImpl(pTable) < dataRowKey(row)) { if (CACHE_LAST_ROW(pCfg) || pTable->lastRow != NULL) { From 4814df777c8166e1d01f707efdc5e397e1a5ac81 Mon Sep 17 00:00:00 2001 From: lichuang Date: Sat, 8 May 2021 18:40:48 +0800 Subject: [PATCH 04/30] fix compile error --- src/common/src/ttypes.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/common/src/ttypes.c b/src/common/src/ttypes.c index 77e12e23d2..6587a27760 100644 --- a/src/common/src/ttypes.c +++ b/src/common/src/ttypes.c @@ -425,6 +425,8 @@ bool isVardataNull(char* val, int32_t type) { } else { assert(0); } + + return false; } void setNull(char *val, int32_t type, int32_t bytes) { setNullN(val, type, bytes, 1); } @@ -548,6 +550,7 @@ bool isNullN(char *val, int32_t type) { } } + return false; } static uint8_t nullBool = TSDB_DATA_BOOL_NULL; From 8497cdee74ffb87b82157e38bfc837120fdb5eed Mon Sep 17 00:00:00 2001 From: lichuang Date: Mon, 10 May 2021 15:26:33 +0800 Subject: [PATCH 05/30] fix memory leak --- src/tsdb/src/tsdbMeta.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 9b98ca19fc..f4f555e2df 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -793,6 +793,7 @@ static void tsdbFreeTable(STable *pTable) { tSkipListDestroy(pTable->pIndex); taosTZfree(pTable->lastRow); + tfree(pTable->lastCols); tfree(pTable->sql); for (int i = 0; i < pTable->lastColNum; ++i) { From 00836e137cd23c973667c050a5396c01150ba5bd Mon Sep 17 00:00:00 2001 From: lichuang Date: Mon, 10 May 2021 17:01:38 +0800 Subject: [PATCH 06/30] fix memory leak --- src/tsdb/src/tsdbMeta.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index f4f555e2df..d45f4c2d60 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -792,8 +792,7 @@ static void tsdbFreeTable(STable *pTable) { kvRowFree(pTable->tagVal); tSkipListDestroy(pTable->pIndex); - taosTZfree(pTable->lastRow); - tfree(pTable->lastCols); + taosTZfree(pTable->lastRow); tfree(pTable->sql); for (int i = 0; i < pTable->lastColNum; ++i) { @@ -802,6 +801,8 @@ static void tsdbFreeTable(STable *pTable) { } free(pTable->lastCols[i].pData); } + tfree(pTable->lastCols); + free(pTable); } } From 14b699f53bcc07973cb391cb665186d1d30814f1 Mon Sep 17 00:00:00 2001 From: lichuang Date: Thu, 13 May 2021 11:24:18 +0800 Subject: [PATCH 07/30] [TD-4034]refactor cache condition --- documentation20/cn/11.administrator/docs.md | 2 +- src/inc/tsdb.h | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/documentation20/cn/11.administrator/docs.md b/documentation20/cn/11.administrator/docs.md index b9340210d2..26cfa91beb 100644 --- a/documentation20/cn/11.administrator/docs.md +++ b/documentation20/cn/11.administrator/docs.md @@ -129,7 +129,7 @@ taosd -C - blocks:每个VNODE(TSDB)中有多少cache大小的内存块。因此一个VNODE的用的内存大小粗略为(cache * blocks)。单位为块,默认值:4。(可通过 alter database 修改) - replica:副本个数,取值范围:1-3。单位为个,默认值:1。(可通过 alter database 修改) - precision:时间戳精度标识,ms表示毫秒,us表示微秒。默认值:ms。 -- cacheLast:是否在内存中缓存子表的最近数据,0:关闭;1:缓存子表最近一行数据;2:缓存子表每一列的最近的非NULL值,设置为3表示同时开启了1和2。默认值:0。(可通过 alter database 修改)(从 2.0.11 版本开始支持此参数) +- cacheLast:是否在内存中缓存子表的最近数据,0:关闭;1:缓存子表最近一行数据;2:缓存子表每一列的最近的非NULL值,默认值:0。(可通过 alter database 修改)(从 2.0.11 版本开始支持此参数) 对于一个应用场景,可能有多种数据特征的数据并存,最佳的设计是将具有相同数据特征的表放在一个库里,这样一个应用有多个库,而每个库可以配置不同的存储参数,从而保证系统有最优的性能。TDengine允许应用在创建库时指定上述存储参数,如果指定,该参数就将覆盖对应的系统配置参数。举例,有下述SQL: diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 7c28d3e485..d231769c18 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -73,8 +73,8 @@ typedef struct { } STsdbCfg; #define CACHE_NO_LAST(c) ((c)->cacheLastRow == 0) -#define CACHE_LAST_ROW(c) (((c)->cacheLastRow & 1) > 0) -#define CACHE_LAST_NULL_COLUMN(c) (((c)->cacheLastRow & 2) > 0) +#define CACHE_LAST_ROW(c) ((c)->cacheLastRow == 1) +#define CACHE_LAST_NULL_COLUMN(c) ((c)->cacheLastRow == 2) // --------- TSDB REPOSITORY USAGE STATISTICS typedef struct { From 9d3d129fef2ad42747022e5c96dea2363e31e698 Mon Sep 17 00:00:00 2001 From: lichuang Date: Mon, 17 May 2021 13:37:59 +0800 Subject: [PATCH 08/30] [TD-4034]add timestamp in last NULL column --- src/common/inc/tdataformat.h | 1 + src/tsdb/src/tsdbMemTable.c | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index 88d5b85010..fcae7a415f 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -234,6 +234,7 @@ typedef struct SDataCol { int len; // column data length VarDataOffsetT *dataOff; // For binary and nchar data, the offset in the data column void * pData; // Actual data pointer + TSKEY ts; // only used in last NULL column } SDataCol; static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; } diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 2409536924..e1f6625ffa 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -985,7 +985,7 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow r for (int j = 0; j < schemaNCols(pSchema); j++) { if (j >= pTable->lastColNum) { pTable->lastCols = realloc(pTable->lastCols, pTable->lastColNum + 10); - for (int i = 0; i < 10; ++i) { + for (i = 0; i < 10; ++i) { pTable->lastCols[i + pTable->lastColNum].bytes = 0; pTable->lastCols[i + pTable->lastColNum].pData = NULL; } @@ -1009,6 +1009,7 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow r //tsdbDebug("vgId:%d cache column %d for %d,%p", REPO_ID(pRepo), j, pDataCol->bytes, pDataCol->pData); memcpy(pDataCol->pData, value, pDataCol->bytes); + pDataCol->ts = dataRowTKey(row); } } From 3b2d5f74ed2d7a34dc2d05154da0353265201029 Mon Sep 17 00:00:00 2001 From: lichuang Date: Mon, 17 May 2021 14:29:36 +0800 Subject: [PATCH 09/30] [TD-4034]check if STColumn is NULL,then ignore cache NULL column --- src/tsdb/src/tsdbMemTable.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index e1f6625ffa..1321d43653 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -993,6 +993,10 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow r } STColumn *pTCol = schemaColAt(pSchema, j); + if (pTCol == NULL) { + // since schema maybe changed, check if STColumn NULL then ignore + continue; + } SDataCol *pDataCol = &(pLatestCols[j]); void* value = tdGetRowDataOfCol(row, (int8_t)pTCol->type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset); if (isNullN(value, pTCol->type)) { From 08848116e5958b41efe85f91426387638d3d9917 Mon Sep 17 00:00:00 2001 From: lichuang Date: Mon, 17 May 2021 16:32:19 +0800 Subject: [PATCH 10/30] [TD-4034]restore last not NULL column --- src/tsdb/src/tsdbMain.c | 45 +++++++++++++++++++++++++++++++++++++ src/tsdb/src/tsdbMemTable.c | 23 +++++++++---------- 2 files changed, 55 insertions(+), 13 deletions(-) diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index bf195bef33..23556df580 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -645,6 +645,8 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) { STable *pTable = pMeta->tables[i]; if (pTable == NULL) continue; + //tsdbInfo("tsdbRestoreInfo restore vgId:%d,table:%s", REPO_ID(pRepo), pTable->name->data); + if (tsdbSetReadTable(&readh, pTable) < 0) { tsdbDestroyReadH(&readh); return -1; @@ -686,6 +688,49 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) { pCol->offset); } } + + // restore NULL columns + if (CACHE_LAST_NULL_COLUMN(pCfg)) { + STSchema *pSchema = tsdbGetTableSchema(pTable); + int numColumns = schemaNCols(pSchema); + pTable->lastCols = (SDataCol*)malloc(numColumns * sizeof(SDataCol)); + if (pTable->lastCols == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + pTable->lastColNum = numColumns; + + SDataRow row = taosTMalloc(dataRowMaxBytesFromSchema(pSchema)); + if (row == NULL) { + tfree(pTable->lastCols); + pTable->lastColNum = 0; + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + tdInitDataRow(row, pSchema); + + SDataCol *pLatestCols = pTable->lastCols; + for (i = 0; i < pTable->lastColNum; ++i) { + STColumn *pTCol = schemaColAt(pSchema, i); + + SDataCol *pDataCol = &(pLatestCols[pTCol->colId]); + pDataCol->pData = malloc(pTCol->bytes); + pDataCol->bytes = pTCol->bytes; + + void* value = tdGetRowDataOfCol(row, (int8_t)pTCol->type, TD_DATA_ROW_HEAD_SIZE + pTCol->offset); + if (isNullN(value, pTCol->type)) { + //tsdbInfo("tsdbRestoreInfo restore vgId:%d,table:%s cache column %d NULL", REPO_ID(pRepo), pTable->name->data, pTCol->colId); + continue; + } + + memcpy(pDataCol->pData, value, pDataCol->bytes); + //tsdbInfo("tsdbRestoreInfo restore vgId:%d,table:%s cache column %d for %d,%s", REPO_ID(pRepo), pTable->name->data, pTCol->colId, pDataCol->bytes, (char*)pDataCol->pData); + pDataCol->ts = dataRowTKey(row); + } + + taosTZfree(row); + } } } diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 1321d43653..70e27a5700 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -965,11 +965,12 @@ static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) { } static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow row) { - //tsdbDebug("vgId:%d updateTableLatestColumn, row version:%d", REPO_ID(pRepo), dataRowVersion(row)); + //tsdbInfo("vgId:%d updateTableLatestColumn, row version:%d", REPO_ID(pRepo), dataRowVersion(row)); if (pTable->numOfSchemas <= 0) { return; } + STSchema* pSchema = pTable->schema[pTable->numOfSchemas - 1]; int i = pTable->numOfSchemas - 1; while ((pSchema == NULL || pSchema->version != dataRowVersion(row)) && i >= 0) { @@ -983,21 +984,18 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow r SDataCol *pLatestCols = pTable->lastCols; for (int j = 0; j < schemaNCols(pSchema); j++) { - if (j >= pTable->lastColNum) { - pTable->lastCols = realloc(pTable->lastCols, pTable->lastColNum + 10); + STColumn *pTCol = schemaColAt(pSchema, j); + + if (pTCol->colId >= pTable->lastColNum) { + pTable->lastCols = realloc(pTable->lastCols, pTCol->colId + 5); for (i = 0; i < 10; ++i) { pTable->lastCols[i + pTable->lastColNum].bytes = 0; pTable->lastCols[i + pTable->lastColNum].pData = NULL; } - pTable->lastColNum += 10; + pTable->lastColNum += pTCol->colId + 5; } - - STColumn *pTCol = schemaColAt(pSchema, j); - if (pTCol == NULL) { - // since schema maybe changed, check if STColumn NULL then ignore - continue; - } - SDataCol *pDataCol = &(pLatestCols[j]); + + SDataCol *pDataCol = &(pLatestCols[pTCol->colId]); void* value = tdGetRowDataOfCol(row, (int8_t)pTCol->type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset); if (isNullN(value, pTCol->type)) { continue; @@ -1010,9 +1008,8 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow r pDataCol->bytes = pSchema->columns[j].bytes; } - //tsdbDebug("vgId:%d cache column %d for %d,%p", REPO_ID(pRepo), j, pDataCol->bytes, pDataCol->pData); - memcpy(pDataCol->pData, value, pDataCol->bytes); + //tsdbInfo("updateTableLatestColumn vgId:%d cache column %d for %d,%s", REPO_ID(pRepo), j, pDataCol->bytes, (char*)pDataCol->pData); pDataCol->ts = dataRowTKey(row); } } From 312453705fab3ab91568d420038ff7929ea1c8c4 Mon Sep 17 00:00:00 2001 From: lichuang Date: Mon, 17 May 2021 16:52:33 +0800 Subject: [PATCH 11/30] [TD-4034]restore last not NULL column --- src/tsdb/src/tsdbMain.c | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 23556df580..44e2bffbe2 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -691,11 +691,24 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) { // restore NULL columns if (CACHE_LAST_NULL_COLUMN(pCfg)) { + if (tsdbLoadBlockInfo(&readh, NULL) < 0) { + tsdbDestroyReadH(&readh); + return -1; + } + + pBlock = readh.pBlkInfo->blocks + pIdx->numOfBlocks - 1; + + if (tsdbLoadBlockData(&readh, pBlock, NULL) < 0) { + tsdbDestroyReadH(&readh); + return -1; + } + STSchema *pSchema = tsdbGetTableSchema(pTable); int numColumns = schemaNCols(pSchema); pTable->lastCols = (SDataCol*)malloc(numColumns * sizeof(SDataCol)); if (pTable->lastCols == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tsdbDestroyReadH(&readh); return -1; } pTable->lastColNum = numColumns; @@ -704,11 +717,18 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) { if (row == NULL) { tfree(pTable->lastCols); pTable->lastColNum = 0; + tsdbDestroyReadH(&readh); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } tdInitDataRow(row, pSchema); + for (int icol = 0; icol < schemaNCols(pSchema); icol++) { + STColumn *pCol = schemaColAt(pSchema, icol); + SDataCol *pDataCol = readh.pDCols[0]->cols + icol; + tdAppendColVal(row, tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type, pCol->bytes, + pCol->offset); + } SDataCol *pLatestCols = pTable->lastCols; for (i = 0; i < pTable->lastColNum; ++i) { From cad6e7ec510378f6909ad172d7c311f9eb6de94f Mon Sep 17 00:00:00 2001 From: lichuang Date: Tue, 18 May 2021 16:29:26 +0800 Subject: [PATCH 12/30] [TD-4034]restore last not NULL column --- src/tsdb/inc/tsdbMeta.h | 1 + src/tsdb/src/tsdbMain.c | 190 +++++++++++++++++++++++++++------------- src/tsdb/src/tsdbMeta.c | 1 + 3 files changed, 129 insertions(+), 63 deletions(-) diff --git a/src/tsdb/inc/tsdbMeta.h b/src/tsdb/inc/tsdbMeta.h index ff47e0cf39..a8c7a6c358 100644 --- a/src/tsdb/inc/tsdbMeta.h +++ b/src/tsdb/inc/tsdbMeta.h @@ -39,6 +39,7 @@ typedef struct STable { SDataCol *lastCols; int32_t lastColNum; + int32_t restoreColumnNum; T_REF_DECLARE() } STable; diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 44e2bffbe2..3be78e21dd 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -616,6 +616,118 @@ static void tsdbStopStream(STsdbRepo *pRepo) { } } +static int restoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh) { + SBlock* pBlock; + int numColumns; + int32_t blockIdx; + SDataStatis* pBlockStatis = NULL; + SDataRow row = NULL; + STSchema *pSchema = tsdbGetTableSchema(pTable); + int err = 0; + + numColumns = schemaNCols(pSchema); + if (numColumns <= pTable->restoreColumnNum) { + return 0; + } + + row = taosTMalloc(dataRowMaxBytesFromSchema(pSchema)); + if (row == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + err = -1; + goto out; + } + tdInitDataRow(row, pSchema); + + // first load block index info + if (tsdbLoadBlockInfo(pReadh, NULL) < 0) { + err = -1; + goto out; + } + + pBlockStatis = calloc(numColumns, sizeof(SDataStatis)); + if (pBlockStatis == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + err = -1; + goto out; + } + memset(pBlockStatis, 0, numColumns * sizeof(SDataStatis)); + for(int32_t i = 0; i < numColumns; ++i) { + pBlockStatis[i].colId = i; + } + + // load block from backward + SBlockIdx *pIdx = pReadh->pBlkIdx; + blockIdx = (int32_t)(pIdx->numOfBlocks - 1); + + while (numColumns > pTable->restoreColumnNum && blockIdx >= 0) { + bool loadStatisData = false; + pBlock = pReadh->pBlkInfo->blocks + blockIdx; + blockIdx -= 1; + + // load block data + if (tsdbLoadBlockData(pReadh, pBlock, NULL) < 0) { + err = -1; + goto out; + } + + // file block with sub-blocks has no statistics data + if (pBlock->numOfSubBlocks <= 1) { + tsdbLoadBlockStatis(pReadh, pBlock); + tsdbGetBlockStatis(pReadh, pBlockStatis, (int)numColumns); + loadStatisData = true; + } + + for (uint32_t colId = 0; colId < numColumns && numColumns > pTable->restoreColumnNum; ++colId) { + // ignore loaded columns + if (pTable->lastCols[colId].bytes != 0) { + continue; + } + + // ignore block which has no not-null colId column + if (loadStatisData && pBlockStatis[colId].numOfNull == pBlock->numOfRows) { + continue; + } + + // OK,let's load row from backward to get not-null column + STColumn *pCol = schemaColAt(pSchema, colId); + for (int32_t rowId = pBlock->numOfRows - 1; rowId >= 0; rowId--) { + SDataCol *pDataCol = pReadh->pDCols[0]->cols + colId; + tdAppendColVal(row, tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset); + //SDataCol *pDataCol = readh.pDCols[0]->cols + j; + void* value = tdGetRowDataOfCol(row, (int8_t)pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset); + if (isNullN(value, pCol->type)) { + continue; + } + + // save not-null column + SDataCol *pLastCol = &(pTable->lastCols[colId]); + pLastCol->pData = malloc(pCol->bytes); + pLastCol->bytes = pCol->bytes; + pLastCol->offset = pCol->offset; + pLastCol->colId = pCol->colId; + memcpy(pLastCol->pData, value, pCol->bytes); + + // save row ts(in column 0) + pDataCol = pReadh->pDCols[0]->cols + 0; + pCol = schemaColAt(pSchema, 0); + tdAppendColVal(row, tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset); + pLastCol->ts = dataRowTKey(row); + + pTable->restoreColumnNum += 1; + + tsdbInfo("restoreLastColumns restore vgId:%d,table:%s cache column %d, %ld", REPO_ID(pRepo), pTable->name->data, colId, pLastCol->ts); + break; + } + } + } + +out: + taosTZfree(row); + tfree(pBlockStatis); + + return err; +} + int tsdbRestoreInfo(STsdbRepo *pRepo) { SFSIter fsiter; SReadH readh; @@ -630,6 +742,14 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) { tsdbFSIterInit(&fsiter, REPO_FS(pRepo), TSDB_FS_ITER_BACKWARD); + if (CACHE_LAST_NULL_COLUMN(pCfg)) { + for (int i = 1; i < pMeta->maxTables; i++) { + STable *pTable = pMeta->tables[i]; + if (pTable == NULL) continue; + pTable->restoreColumnNum = 0; + } + } + while ((pSet = tsdbFSIterNext(&fsiter)) != NULL) { if (tsdbSetAndOpenReadFSet(&readh, pSet) < 0) { tsdbDestroyReadH(&readh); @@ -688,71 +808,15 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) { pCol->offset); } } - - // restore NULL columns - if (CACHE_LAST_NULL_COLUMN(pCfg)) { - if (tsdbLoadBlockInfo(&readh, NULL) < 0) { - tsdbDestroyReadH(&readh); - return -1; - } - - pBlock = readh.pBlkInfo->blocks + pIdx->numOfBlocks - 1; - - if (tsdbLoadBlockData(&readh, pBlock, NULL) < 0) { - tsdbDestroyReadH(&readh); - return -1; - } - - STSchema *pSchema = tsdbGetTableSchema(pTable); - int numColumns = schemaNCols(pSchema); - pTable->lastCols = (SDataCol*)malloc(numColumns * sizeof(SDataCol)); - if (pTable->lastCols == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - tsdbDestroyReadH(&readh); - return -1; - } - pTable->lastColNum = numColumns; - - SDataRow row = taosTMalloc(dataRowMaxBytesFromSchema(pSchema)); - if (row == NULL) { - tfree(pTable->lastCols); - pTable->lastColNum = 0; - tsdbDestroyReadH(&readh); - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - - tdInitDataRow(row, pSchema); - for (int icol = 0; icol < schemaNCols(pSchema); icol++) { - STColumn *pCol = schemaColAt(pSchema, icol); - SDataCol *pDataCol = readh.pDCols[0]->cols + icol; - tdAppendColVal(row, tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type, pCol->bytes, - pCol->offset); - } - - SDataCol *pLatestCols = pTable->lastCols; - for (i = 0; i < pTable->lastColNum; ++i) { - STColumn *pTCol = schemaColAt(pSchema, i); - - SDataCol *pDataCol = &(pLatestCols[pTCol->colId]); - pDataCol->pData = malloc(pTCol->bytes); - pDataCol->bytes = pTCol->bytes; - - void* value = tdGetRowDataOfCol(row, (int8_t)pTCol->type, TD_DATA_ROW_HEAD_SIZE + pTCol->offset); - if (isNullN(value, pTCol->type)) { - //tsdbInfo("tsdbRestoreInfo restore vgId:%d,table:%s cache column %d NULL", REPO_ID(pRepo), pTable->name->data, pTCol->colId); - continue; - } - - memcpy(pDataCol->pData, value, pDataCol->bytes); - //tsdbInfo("tsdbRestoreInfo restore vgId:%d,table:%s cache column %d for %d,%s", REPO_ID(pRepo), pTable->name->data, pTCol->colId, pDataCol->bytes, (char*)pDataCol->pData); - pDataCol->ts = dataRowTKey(row); - } - - taosTZfree(row); - } } + // restore NULL columns + if (CACHE_LAST_NULL_COLUMN(pCfg)) { + if (restoreLastColumns(pRepo, pTable, &readh) != 0) { + tsdbDestroyReadH(&readh); + return -1; + } + } } } diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index bdf0cfdfad..5a108a5d06 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -678,6 +678,7 @@ static STable *tsdbNewTable() { pTable->lastCols[i].bytes = 0; pTable->lastCols[i].pData = NULL; } + pTable->restoreColumnNum = 0; return pTable; } From 6f202d270e66eee02e7d87f314ea2fcc3a281a82 Mon Sep 17 00:00:00 2001 From: lichuang Date: Tue, 18 May 2021 19:50:35 +0800 Subject: [PATCH 13/30] [TD-4034]restore last not NULL column --- src/tsdb/src/tsdbMain.c | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 3be78e21dd..8e1207b98a 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -652,7 +652,8 @@ static int restoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh) } memset(pBlockStatis, 0, numColumns * sizeof(SDataStatis)); for(int32_t i = 0; i < numColumns; ++i) { - pBlockStatis[i].colId = i; + STColumn *pCol = schemaColAt(pSchema, i); + pBlockStatis[i].colId = pCol->colId; } // load block from backward @@ -678,6 +679,17 @@ static int restoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh) } for (uint32_t colId = 0; colId < numColumns && numColumns > pTable->restoreColumnNum; ++colId) { + STColumn *pCol = schemaColAt(pSchema, colId); + + if (colId >= pTable->lastColNum) { + pTable->lastCols = realloc(pTable->lastCols, colId + 5); + for (int m = 0; m < 5; ++m) { + pTable->lastCols[m + pTable->lastColNum].bytes = 0; + pTable->lastCols[m + pTable->lastColNum].pData = NULL; + } + pTable->lastColNum += colId + 5; + } + // ignore loaded columns if (pTable->lastCols[colId].bytes != 0) { continue; @@ -689,7 +701,6 @@ static int restoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh) } // OK,let's load row from backward to get not-null column - STColumn *pCol = schemaColAt(pSchema, colId); for (int32_t rowId = pBlock->numOfRows - 1; rowId >= 0; rowId--) { SDataCol *pDataCol = pReadh->pDCols[0]->cols + colId; tdAppendColVal(row, tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset); From bec2b58922f5ea25ad9dc6673c533b24bbf10a48 Mon Sep 17 00:00:00 2001 From: lichuang Date: Wed, 19 May 2021 09:16:45 +0800 Subject: [PATCH 14/30] [TD-4034]fix compile error --- src/tsdb/src/tsdbMain.c | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 8e1207b98a..a7e1efb5ed 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -678,31 +678,31 @@ static int restoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh) loadStatisData = true; } - for (uint32_t colId = 0; colId < numColumns && numColumns > pTable->restoreColumnNum; ++colId) { - STColumn *pCol = schemaColAt(pSchema, colId); + for (uint32_t i = 0; i < numColumns && numColumns > pTable->restoreColumnNum; ++i) { + STColumn *pCol = schemaColAt(pSchema, i); - if (colId >= pTable->lastColNum) { - pTable->lastCols = realloc(pTable->lastCols, colId + 5); + if (i >= pTable->lastColNum) { + pTable->lastCols = realloc(pTable->lastCols, i + 5); for (int m = 0; m < 5; ++m) { pTable->lastCols[m + pTable->lastColNum].bytes = 0; pTable->lastCols[m + pTable->lastColNum].pData = NULL; } - pTable->lastColNum += colId + 5; + pTable->lastColNum += i + 5; } // ignore loaded columns - if (pTable->lastCols[colId].bytes != 0) { + if (pTable->lastCols[i].bytes != 0) { continue; } // ignore block which has no not-null colId column - if (loadStatisData && pBlockStatis[colId].numOfNull == pBlock->numOfRows) { + if (loadStatisData && pBlockStatis[i].numOfNull == pBlock->numOfRows) { continue; } // OK,let's load row from backward to get not-null column for (int32_t rowId = pBlock->numOfRows - 1; rowId >= 0; rowId--) { - SDataCol *pDataCol = pReadh->pDCols[0]->cols + colId; + SDataCol *pDataCol = pReadh->pDCols[0]->cols + i; tdAppendColVal(row, tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset); //SDataCol *pDataCol = readh.pDCols[0]->cols + j; void* value = tdGetRowDataOfCol(row, (int8_t)pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset); @@ -710,8 +710,8 @@ static int restoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh) continue; } - // save not-null column - SDataCol *pLastCol = &(pTable->lastCols[colId]); + // save not-null column + SDataCol *pLastCol = &(pTable->lastCols[i]); pLastCol->pData = malloc(pCol->bytes); pLastCol->bytes = pCol->bytes; pLastCol->offset = pCol->offset; @@ -726,7 +726,7 @@ static int restoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh) pTable->restoreColumnNum += 1; - tsdbInfo("restoreLastColumns restore vgId:%d,table:%s cache column %d, %ld", REPO_ID(pRepo), pTable->name->data, colId, pLastCol->ts); + tsdbInfo("restoreLastColumns restore vgId:%d,table:%s cache column %d, %d", REPO_ID(pRepo), pTable->name->data, pCol->colId, (int32_t)pLastCol->ts); break; } } From 865ddbc0268239ea62d0344449e2abdf53557b86 Mon Sep 17 00:00:00 2001 From: lichuang Date: Wed, 19 May 2021 20:40:04 +0800 Subject: [PATCH 15/30] [TD-4034]fix schema change --- src/tsdb/inc/tsdbMeta.h | 11 ++- src/tsdb/src/tsdbMain.c | 45 ++++++------ src/tsdb/src/tsdbMemTable.c | 23 ++++--- src/tsdb/src/tsdbMeta.c | 132 +++++++++++++++++++++++++++++++----- 4 files changed, 161 insertions(+), 50 deletions(-) diff --git a/src/tsdb/inc/tsdbMeta.h b/src/tsdb/inc/tsdbMeta.h index a8c7a6c358..43c85d89cb 100644 --- a/src/tsdb/inc/tsdbMeta.h +++ b/src/tsdb/inc/tsdbMeta.h @@ -38,11 +38,15 @@ typedef struct STable { SRWLatch latch; // TODO: implementa latch functions SDataCol *lastCols; - int32_t lastColNum; - int32_t restoreColumnNum; + int16_t lastColNum; + int16_t maxColumnNum; + int lastColSVersion; T_REF_DECLARE() } STable; +#define TSDB_LATEST_COLUMN_ARRAY_SIZE 20 +#define TSDB_LATEST_COLUMN_ARRAY_ADD_SIZE 5 + typedef struct { pthread_rwlock_t rwLock; @@ -82,6 +86,9 @@ void tsdbUnRefTable(STable* pTable); void tsdbUpdateTableSchema(STsdbRepo* pRepo, STable* pTable, STSchema* pSchema, bool insertAct); int tsdbRestoreTable(STsdbRepo* pRepo, void* cont, int contLen); void tsdbOrgMeta(STsdbRepo* pRepo); +int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema); +int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId); +int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema); static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *key2) { if (*(int16_t *)key1 < schemaVersion(*(STSchema **)key2)) { diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index a7e1efb5ed..3241d617b6 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -617,18 +617,27 @@ static void tsdbStopStream(STsdbRepo *pRepo) { } static int restoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh) { + if (pTable->numOfSchemas == 0) { + return 0; + } SBlock* pBlock; int numColumns; int32_t blockIdx; SDataStatis* pBlockStatis = NULL; SDataRow row = NULL; - STSchema *pSchema = tsdbGetTableSchema(pTable); + // restore last column data with last schema + STSchema *pSchema = pTable->schema[pTable->numOfSchemas - 1]; int err = 0; numColumns = schemaNCols(pSchema); - if (numColumns <= pTable->restoreColumnNum) { + if (numColumns <= pTable->maxColumnNum) { return 0; } + if (pTable->lastColSVersion != schemaVersion(pSchema)) { + if (tsdbInitColIdCacheWithSchema(pTable, pSchema) < 0) { + return -1; + } + } row = taosTMalloc(dataRowMaxBytesFromSchema(pSchema)); if (row == NULL) { @@ -660,7 +669,7 @@ static int restoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh) SBlockIdx *pIdx = pReadh->pBlkIdx; blockIdx = (int32_t)(pIdx->numOfBlocks - 1); - while (numColumns > pTable->restoreColumnNum && blockIdx >= 0) { + while (numColumns > pTable->maxColumnNum && blockIdx >= 0) { bool loadStatisData = false; pBlock = pReadh->pBlkInfo->blocks + blockIdx; blockIdx -= 1; @@ -678,18 +687,8 @@ static int restoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh) loadStatisData = true; } - for (uint32_t i = 0; i < numColumns && numColumns > pTable->restoreColumnNum; ++i) { + for (int16_t i = 0; i < numColumns && numColumns > pTable->maxColumnNum; ++i) { STColumn *pCol = schemaColAt(pSchema, i); - - if (i >= pTable->lastColNum) { - pTable->lastCols = realloc(pTable->lastCols, i + 5); - for (int m = 0; m < 5; ++m) { - pTable->lastCols[m + pTable->lastColNum].bytes = 0; - pTable->lastCols[m + pTable->lastColNum].pData = NULL; - } - pTable->lastColNum += i + 5; - } - // ignore loaded columns if (pTable->lastCols[i].bytes != 0) { continue; @@ -710,11 +709,15 @@ static int restoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh) continue; } + int16_t idx = tsdbGetLastColumnsIndexByColId(pTable, pCol->colId); + if (idx == -1) { + tsdbError("restoreLastColumns restore vgId:%d,table:%s cache column %d fail", REPO_ID(pRepo), pTable->name->data, pCol->colId); + continue; + } // save not-null column - SDataCol *pLastCol = &(pTable->lastCols[i]); + SDataCol *pLastCol = &(pTable->lastCols[idx]); pLastCol->pData = malloc(pCol->bytes); pLastCol->bytes = pCol->bytes; - pLastCol->offset = pCol->offset; pLastCol->colId = pCol->colId; memcpy(pLastCol->pData, value, pCol->bytes); @@ -722,11 +725,11 @@ static int restoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh) pDataCol = pReadh->pDCols[0]->cols + 0; pCol = schemaColAt(pSchema, 0); tdAppendColVal(row, tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset); - pLastCol->ts = dataRowTKey(row); + pLastCol->ts = dataRowTKey(row); - pTable->restoreColumnNum += 1; + pTable->maxColumnNum += 1; - tsdbInfo("restoreLastColumns restore vgId:%d,table:%s cache column %d, %d", REPO_ID(pRepo), pTable->name->data, pCol->colId, (int32_t)pLastCol->ts); + tsdbInfo("restoreLastColumns restore vgId:%d,table:%s cache column %d, %" PRId64, REPO_ID(pRepo), pTable->name->data, pLastCol->colId, pLastCol->ts); break; } } @@ -757,7 +760,7 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) { for (int i = 1; i < pMeta->maxTables; i++) { STable *pTable = pMeta->tables[i]; if (pTable == NULL) continue; - pTable->restoreColumnNum = 0; + pTable->maxColumnNum = 0; } } @@ -822,7 +825,7 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) { } // restore NULL columns - if (CACHE_LAST_NULL_COLUMN(pCfg)) { + if (pIdx && CACHE_LAST_NULL_COLUMN(pCfg)) { if (restoreLastColumns(pRepo, pTable, &readh) != 0) { tsdbDestroyReadH(&readh); return -1; diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 70e27a5700..900c034a22 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -972,7 +972,11 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow r } STSchema* pSchema = pTable->schema[pTable->numOfSchemas - 1]; - int i = pTable->numOfSchemas - 1; + if (tsdbUpdateLastColSchema(pTable, pSchema) < 0) { + return; + } + + int16_t i = pTable->numOfSchemas - 1; while ((pSchema == NULL || pSchema->version != dataRowVersion(row)) && i >= 0) { i -= 1; pSchema = pTable->schema[i]; @@ -983,23 +987,20 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow r SDataCol *pLatestCols = pTable->lastCols; - for (int j = 0; j < schemaNCols(pSchema); j++) { + for (int16_t j = 0; j < schemaNCols(pSchema); j++) { STColumn *pTCol = schemaColAt(pSchema, j); - - if (pTCol->colId >= pTable->lastColNum) { - pTable->lastCols = realloc(pTable->lastCols, pTCol->colId + 5); - for (i = 0; i < 10; ++i) { - pTable->lastCols[i + pTable->lastColNum].bytes = 0; - pTable->lastCols[i + pTable->lastColNum].pData = NULL; - } - pTable->lastColNum += pTCol->colId + 5; + // ignore not exist colId + int16_t idx = tsdbGetLastColumnsIndexByColId(pTable, pTCol->colId); + if (idx == -1) { + continue; } - SDataCol *pDataCol = &(pLatestCols[pTCol->colId]); void* value = tdGetRowDataOfCol(row, (int8_t)pTCol->type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset); if (isNullN(value, pTCol->type)) { continue; } + + SDataCol *pDataCol = &(pLatestCols[idx]); if (pDataCol->pData == NULL) { pDataCol->pData = malloc(pSchema->columns[j].bytes); pDataCol->bytes = pSchema->columns[j].bytes; diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 5a108a5d06..d785d259fa 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -14,7 +14,6 @@ */ #include "tsdbint.h" -#define TSDB_LATEST_COLUMN_ARRAY_SIZE 20 #define TSDB_SUPER_TABLE_SL_LEVEL 5 #define DEFAULT_TAG_INDEX_COLUMN 0 @@ -45,6 +44,7 @@ static int tsdbRemoveTableFromStore(STsdbRepo *pRepo, STable *pTable); static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable); static int tsdbAdjustMetaTables(STsdbRepo *pRepo, int tid); static int tsdbCheckTableTagVal(SKVRow *pKVRow, STSchema *pSchema); +static void tsdbFreeLastColumns(STable* pTable); // ------------------ OUTER FUNCTIONS ------------------ int tsdbCreateTable(STsdbRepo *repo, STableCfg *pCfg) { @@ -590,6 +590,116 @@ void tsdbUnRefTable(STable *pTable) { } } +static void tsdbFreeLastColumns(STable* pTable) { + if (pTable->lastCols == NULL) { + return; + } + + for (int i = 0; i < pTable->lastColNum; ++i) { + if (pTable->lastCols[i].bytes == 0) { + continue; + } + tfree(pTable->lastCols[i].pData); + pTable->lastCols[i].bytes = 0; + pTable->lastCols[i].pData = NULL; + } + tfree(pTable->lastCols); + pTable->lastCols = NULL; + pTable->lastColNum = 0; +} + +int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId) { + if (pTable->lastCols == NULL) { + return -1; + } + for (int16_t i = 0; i < pTable->lastColNum; ++i) { + if (pTable->lastCols[i].colId == colId) { + return i; + } + } + + return -1; +} + +int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema) { + ASSERT(pTable->lastCols == NULL); + + int16_t numOfColumn = pSchema->numOfCols; + + pTable->lastCols = (SDataCol*)malloc(numOfColumn * sizeof(SDataCol)); + if (pTable->lastCols == NULL) { + return -1; + } + + for (int16_t i = 0; i < numOfColumn; ++i) { + STColumn *pCol = schemaColAt(pSchema, i); + SDataCol* pDataCol = &(pTable->lastCols[i]); + pDataCol->bytes = 0; + pDataCol->pData = NULL; + pDataCol->colId = pCol->colId; + } + + pTable->lastColSVersion = schemaVersion(pSchema); + pTable->lastColNum = numOfColumn; + pTable->maxColumnNum = 0; + return 0; +} + +int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema) { + if (pTable->lastColSVersion == schemaVersion(pNewSchema)) { + return 0; + } + + tsdbInfo("tsdbUpdateLastColSchema:%s,%d->%d", pTable->name->data, pTable->lastColSVersion, schemaVersion(pNewSchema)); + + int16_t numOfCols = pNewSchema->numOfCols; + SDataCol *lastCols = (SDataCol*)malloc(numOfCols * sizeof(SDataCol)); + if (lastCols == NULL) { + return -1; + } + + TSDB_WLOCK_TABLE(pTable); + + int16_t oldIdx = 0; + for (int16_t i = 0; i < numOfCols; ++i) { + STColumn *pCol = schemaColAt(pNewSchema, i); + int16_t idx = tsdbGetLastColumnsIndexByColId(pTable, pCol->colId); + + SDataCol* pDataCol = &(lastCols[i]); + if (idx != -1) { + SDataCol* pOldDataCol = &(pTable->lastCols[idx]); + memcpy(pDataCol, pOldDataCol, sizeof(SDataCol)); + } else { + pDataCol->colId = pCol->colId; + pDataCol->bytes = 0; + pDataCol->pData = NULL; + } + + // free dropped column data + while (oldIdx < idx && oldIdx < pTable->lastColNum) { + SDataCol* pOldDataCol = &(pTable->lastCols[oldIdx]); + if (pOldDataCol->bytes != 0) { + tfree(pOldDataCol->pData); + pOldDataCol->bytes = 0; + } + ++oldIdx; + } + if (idx != -1 && oldIdx == idx) { + oldIdx += 1; + } + } + + // free old schema last column datas + tfree(pTable->lastCols); + + pTable->lastColSVersion = schemaVersion(pNewSchema); + pTable->lastCols = lastCols; + pTable->lastColNum = numOfCols; + + TSDB_WUNLOCK_TABLE(pTable); + return 0; +} + void tsdbUpdateTableSchema(STsdbRepo *pRepo, STable *pTable, STSchema *pSchema, bool insertAct) { ASSERT(TABLE_TYPE(pTable) != TSDB_STREAM_TABLE && TABLE_TYPE(pTable) != TSDB_SUPER_TABLE); STsdbMeta *pMeta = pRepo->tsdbMeta; @@ -672,14 +782,11 @@ static STable *tsdbNewTable() { } pTable->lastKey = TSKEY_INITIAL_VAL; - pTable->lastCols = (SDataCol*)malloc(TSDB_LATEST_COLUMN_ARRAY_SIZE * sizeof(SDataCol)); - pTable->lastColNum = TSDB_LATEST_COLUMN_ARRAY_SIZE; - for (int i = 0; i < pTable->lastColNum; ++i) { - pTable->lastCols[i].bytes = 0; - pTable->lastCols[i].pData = NULL; - } - pTable->restoreColumnNum = 0; + pTable->lastCols = NULL; + pTable->maxColumnNum = 0; + pTable->lastColNum = 0; + pTable->lastColSVersion = -1; return pTable; } @@ -796,14 +903,7 @@ static void tsdbFreeTable(STable *pTable) { taosTZfree(pTable->lastRow); tfree(pTable->sql); - for (int i = 0; i < pTable->lastColNum; ++i) { - if (pTable->lastCols[i].pData == NULL) { - continue; - } - free(pTable->lastCols[i].pData); - } - tfree(pTable->lastCols); - + tsdbFreeLastColumns(pTable); free(pTable); } } From f182495628bfd32ccdbeb56aed15649544181c11 Mon Sep 17 00:00:00 2001 From: lichuang Date: Thu, 20 May 2021 09:29:31 +0800 Subject: [PATCH 16/30] [TD-4034]fix schema change,free not exist column data --- src/inc/tsdb.h | 4 ++-- src/tsdb/src/tsdbMeta.c | 43 +++++++++++++++++++++++++---------------- 2 files changed, 28 insertions(+), 19 deletions(-) diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index d231769c18..7c28d3e485 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -73,8 +73,8 @@ typedef struct { } STsdbCfg; #define CACHE_NO_LAST(c) ((c)->cacheLastRow == 0) -#define CACHE_LAST_ROW(c) ((c)->cacheLastRow == 1) -#define CACHE_LAST_NULL_COLUMN(c) ((c)->cacheLastRow == 2) +#define CACHE_LAST_ROW(c) (((c)->cacheLastRow & 1) > 0) +#define CACHE_LAST_NULL_COLUMN(c) (((c)->cacheLastRow & 2) > 0) // --------- TSDB REPOSITORY USAGE STATISTICS typedef struct { diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index d785d259fa..fde593b59a 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -659,44 +659,53 @@ int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema) { } TSDB_WLOCK_TABLE(pTable); - - int16_t oldIdx = 0; + for (int16_t i = 0; i < numOfCols; ++i) { STColumn *pCol = schemaColAt(pNewSchema, i); int16_t idx = tsdbGetLastColumnsIndexByColId(pTable, pCol->colId); SDataCol* pDataCol = &(lastCols[i]); if (idx != -1) { + // move col data to new last column array SDataCol* pOldDataCol = &(pTable->lastCols[idx]); memcpy(pDataCol, pOldDataCol, sizeof(SDataCol)); } else { + // init new colid data pDataCol->colId = pCol->colId; pDataCol->bytes = 0; pDataCol->pData = NULL; } - - // free dropped column data - while (oldIdx < idx && oldIdx < pTable->lastColNum) { - SDataCol* pOldDataCol = &(pTable->lastCols[oldIdx]); - if (pOldDataCol->bytes != 0) { - tfree(pOldDataCol->pData); - pOldDataCol->bytes = 0; - } - ++oldIdx; - } - if (idx != -1 && oldIdx == idx) { - oldIdx += 1; - } } - // free old schema last column datas - tfree(pTable->lastCols); + SDataCol *oldLastCols = pTable->lastCols; + int16_t oldLastColNum = pTable->lastColNum; pTable->lastColSVersion = schemaVersion(pNewSchema); pTable->lastCols = lastCols; pTable->lastColNum = numOfCols; + if (oldLastCols == NULL) { + TSDB_WUNLOCK_TABLE(pTable); + return 0; + } + + // free old schema last column datas + for (int16_t i = 0; i < oldLastColNum; ++i) { + SDataCol* pDataCol = &(oldLastCols[i]); + if (pDataCol->bytes == 0) { + continue; + } + int16_t idx = tsdbGetLastColumnsIndexByColId(pTable, pDataCol->colId); + if (idx != -1) { + continue; + } + + // free not exist column data + tfree(pDataCol->pData); + } TSDB_WUNLOCK_TABLE(pTable); + tfree(oldLastCols); + return 0; } From 61b2473bf927f16d31b319e309fc904ca1e6f086 Mon Sep 17 00:00:00 2001 From: lichuang Date: Thu, 20 May 2021 13:35:06 +0800 Subject: [PATCH 17/30] [TD-4034]fix Data column ts type --- src/common/inc/tdataformat.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index fcae7a415f..9bb997ddcc 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -234,7 +234,7 @@ typedef struct SDataCol { int len; // column data length VarDataOffsetT *dataOff; // For binary and nchar data, the offset in the data column void * pData; // Actual data pointer - TSKEY ts; // only used in last NULL column + TKEY ts; // only used in last NULL column } SDataCol; static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; } From e51fd4bf77b4aa7d9904adcd53fcc592902d3e58 Mon Sep 17 00:00:00 2001 From: lichuang Date: Thu, 20 May 2021 15:32:25 +0800 Subject: [PATCH 18/30] [TD-4034]get latest schema from super table --- src/tsdb/src/tsdbMain.c | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 3241d617b6..29842c0c16 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -616,17 +616,34 @@ static void tsdbStopStream(STsdbRepo *pRepo) { } } +static STSchema* getTableLatestSchema(STable *pTable) { + if (pTable->numOfSchemas > 0) { + return pTable->schema[pTable->numOfSchemas - 1]; + } + + if (pTable->type == TSDB_CHILD_TABLE) { + if (pTable->pSuper && pTable->pSuper->numOfSchemas) { + tsdbInfo("getTableLatestSchema of table %s from super table", pTable->name->data); + return pTable->pSuper->schema[pTable->pSuper->numOfSchemas - 1]; + } + } + return NULL; +} + static int restoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh) { - if (pTable->numOfSchemas == 0) { + STSchema *pSchema = getTableLatestSchema(pTable); + if (pSchema == NULL) { + tsdbError("getTableLatestSchema of table %s fail", pTable->name->data); return 0; } + SBlock* pBlock; int numColumns; int32_t blockIdx; SDataStatis* pBlockStatis = NULL; SDataRow row = NULL; // restore last column data with last schema - STSchema *pSchema = pTable->schema[pTable->numOfSchemas - 1]; + int err = 0; numColumns = schemaNCols(pSchema); From 220bc3502a023cb7c8a79a5e640798632e379858 Mon Sep 17 00:00:00 2001 From: lichuang Date: Thu, 20 May 2021 15:36:44 +0800 Subject: [PATCH 19/30] [TD-4034]get latest schema from super table --- src/tsdb/src/tsdbMain.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 29842c0c16..8e31ae5022 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -623,7 +623,7 @@ static STSchema* getTableLatestSchema(STable *pTable) { if (pTable->type == TSDB_CHILD_TABLE) { if (pTable->pSuper && pTable->pSuper->numOfSchemas) { - tsdbInfo("getTableLatestSchema of table %s from super table", pTable->name->data); + tsdbDebug("getTableLatestSchema of table %s from super table %s", pTable->name->data, pTable->pSuper->name->data); return pTable->pSuper->schema[pTable->pSuper->numOfSchemas - 1]; } } From fcfd4d29d1df853599f73200cf8a6ddc3c254827 Mon Sep 17 00:00:00 2001 From: lichuang Date: Thu, 20 May 2021 16:14:16 +0800 Subject: [PATCH 20/30] [TD-4034]fix Data column ts type --- src/common/inc/tdataformat.h | 2 +- src/tsdb/src/tsdbMain.c | 2 +- src/tsdb/src/tsdbMemTable.c | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index 9bb997ddcc..fcae7a415f 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -234,7 +234,7 @@ typedef struct SDataCol { int len; // column data length VarDataOffsetT *dataOff; // For binary and nchar data, the offset in the data column void * pData; // Actual data pointer - TKEY ts; // only used in last NULL column + TSKEY ts; // only used in last NULL column } SDataCol; static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; } diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 8e31ae5022..d593997d3c 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -742,7 +742,7 @@ static int restoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh) pDataCol = pReadh->pDCols[0]->cols + 0; pCol = schemaColAt(pSchema, 0); tdAppendColVal(row, tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset); - pLastCol->ts = dataRowTKey(row); + pLastCol->ts = dataRowKey(row); pTable->maxColumnNum += 1; diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 900c034a22..156bd29ee1 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -1011,7 +1011,7 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow r memcpy(pDataCol->pData, value, pDataCol->bytes); //tsdbInfo("updateTableLatestColumn vgId:%d cache column %d for %d,%s", REPO_ID(pRepo), j, pDataCol->bytes, (char*)pDataCol->pData); - pDataCol->ts = dataRowTKey(row); + pDataCol->ts = dataRowKey(row); } } From 4beb0b4d076b1c2f01d7d086186804013f26db97 Mon Sep 17 00:00:00 2001 From: lichuang Date: Thu, 20 May 2021 19:16:12 +0800 Subject: [PATCH 21/30] [TD-4034]fix super table bug --- src/tsdb/inc/tsdbMeta.h | 1 + src/tsdb/src/tsdbMain.c | 27 ++++++++------------------- src/tsdb/src/tsdbMemTable.c | 16 ++++------------ src/tsdb/src/tsdbMeta.c | 4 ++++ 4 files changed, 17 insertions(+), 31 deletions(-) diff --git a/src/tsdb/inc/tsdbMeta.h b/src/tsdb/inc/tsdbMeta.h index 43c85d89cb..45868c002d 100644 --- a/src/tsdb/inc/tsdbMeta.h +++ b/src/tsdb/inc/tsdbMeta.h @@ -89,6 +89,7 @@ void tsdbOrgMeta(STsdbRepo* pRepo); int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema); int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId); int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema); +STSchema* tsdbGetTableLatestSchema(STable *pTable); static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *key2) { if (*(int16_t *)key1 < schemaVersion(*(STSchema **)key2)) { diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index d593997d3c..1e6f9eac12 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -26,6 +26,7 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH); static void tsdbFreeRepo(STsdbRepo *pRepo); static void tsdbStartStream(STsdbRepo *pRepo); static void tsdbStopStream(STsdbRepo *pRepo); +static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh); // Function declaration int32_t tsdbCreateRepo(int repoid) { @@ -616,24 +617,12 @@ static void tsdbStopStream(STsdbRepo *pRepo) { } } -static STSchema* getTableLatestSchema(STable *pTable) { - if (pTable->numOfSchemas > 0) { - return pTable->schema[pTable->numOfSchemas - 1]; - } +static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh) { + //tsdbInfo("tsdbRestoreLastColumns of table %s", pTable->name->data); - if (pTable->type == TSDB_CHILD_TABLE) { - if (pTable->pSuper && pTable->pSuper->numOfSchemas) { - tsdbDebug("getTableLatestSchema of table %s from super table %s", pTable->name->data, pTable->pSuper->name->data); - return pTable->pSuper->schema[pTable->pSuper->numOfSchemas - 1]; - } - } - return NULL; -} - -static int restoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh) { - STSchema *pSchema = getTableLatestSchema(pTable); + STSchema *pSchema = tsdbGetTableLatestSchema(pTable); if (pSchema == NULL) { - tsdbError("getTableLatestSchema of table %s fail", pTable->name->data); + tsdbError("tsdbGetTableLatestSchema of table %s fail", pTable->name->data); return 0; } @@ -728,7 +717,7 @@ static int restoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh) int16_t idx = tsdbGetLastColumnsIndexByColId(pTable, pCol->colId); if (idx == -1) { - tsdbError("restoreLastColumns restore vgId:%d,table:%s cache column %d fail", REPO_ID(pRepo), pTable->name->data, pCol->colId); + tsdbError("tsdbRestoreLastColumns restore vgId:%d,table:%s cache column %d fail", REPO_ID(pRepo), pTable->name->data, pCol->colId); continue; } // save not-null column @@ -746,7 +735,7 @@ static int restoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh) pTable->maxColumnNum += 1; - tsdbInfo("restoreLastColumns restore vgId:%d,table:%s cache column %d, %" PRId64, REPO_ID(pRepo), pTable->name->data, pLastCol->colId, pLastCol->ts); + tsdbInfo("tsdbRestoreLastColumns restore vgId:%d,table:%s cache column %d, %" PRId64, REPO_ID(pRepo), pTable->name->data, pLastCol->colId, pLastCol->ts); break; } } @@ -843,7 +832,7 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) { // restore NULL columns if (pIdx && CACHE_LAST_NULL_COLUMN(pCfg)) { - if (restoreLastColumns(pRepo, pTable, &readh) != 0) { + if (tsdbRestoreLastColumns(pRepo, pTable, &readh) != 0) { tsdbDestroyReadH(&readh); return -1; } diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 156bd29ee1..ff2a870f3f 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -965,23 +965,15 @@ static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) { } static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow row) { - //tsdbInfo("vgId:%d updateTableLatestColumn, row version:%d", REPO_ID(pRepo), dataRowVersion(row)); + tsdbInfo("vgId:%d updateTableLatestColumn, %s row version:%d", REPO_ID(pRepo), pTable->name->data, dataRowVersion(row)); - if (pTable->numOfSchemas <= 0) { - return; - } - - STSchema* pSchema = pTable->schema[pTable->numOfSchemas - 1]; + STSchema* pSchema = tsdbGetTableLatestSchema(pTable); if (tsdbUpdateLastColSchema(pTable, pSchema) < 0) { return; } - int16_t i = pTable->numOfSchemas - 1; - while ((pSchema == NULL || pSchema->version != dataRowVersion(row)) && i >= 0) { - i -= 1; - pSchema = pTable->schema[i]; - } - if (pSchema == NULL || pSchema->version != dataRowVersion(row)) { + pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row)); + if (pSchema == NULL) { return; } diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index fde593b59a..5717d52eea 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -645,6 +645,10 @@ int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema) { return 0; } +STSchema* tsdbGetTableLatestSchema(STable *pTable) { + return tsdbGetTableSchemaByVersion(pTable, -1); +} + int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema) { if (pTable->lastColSVersion == schemaVersion(pNewSchema)) { return 0; From 210c0ae3f85b182b18237ae53541c95a7fe085fb Mon Sep 17 00:00:00 2001 From: lichuang Date: Fri, 21 May 2021 15:08:17 +0800 Subject: [PATCH 22/30] [TD-4034]fix cache last valid range --- documentation20/cn/11.administrator/docs.md | 2 +- src/inc/taosdef.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/documentation20/cn/11.administrator/docs.md b/documentation20/cn/11.administrator/docs.md index 26cfa91beb..ee2f34ff11 100644 --- a/documentation20/cn/11.administrator/docs.md +++ b/documentation20/cn/11.administrator/docs.md @@ -129,7 +129,7 @@ taosd -C - blocks:每个VNODE(TSDB)中有多少cache大小的内存块。因此一个VNODE的用的内存大小粗略为(cache * blocks)。单位为块,默认值:4。(可通过 alter database 修改) - replica:副本个数,取值范围:1-3。单位为个,默认值:1。(可通过 alter database 修改) - precision:时间戳精度标识,ms表示毫秒,us表示微秒。默认值:ms。 -- cacheLast:是否在内存中缓存子表的最近数据,0:关闭;1:缓存子表最近一行数据;2:缓存子表每一列的最近的非NULL值,默认值:0。(可通过 alter database 修改)(从 2.0.11 版本开始支持此参数) +- cacheLast:是否在内存中缓存子表的最近数据,0:关闭;1:缓存子表最近一行数据;2:缓存子表每一列的最近的非NULL值,3:同时打开缓存最近行和列功能,默认值:0。(可通过 alter database 修改)(从 2.0.11 版本开始支持此参数) 对于一个应用场景,可能有多种数据特征的数据并存,最佳的设计是将具有相同数据特征的表放在一个库里,这样一个应用有多个库,而每个库可以配置不同的存储参数,从而保证系统有最优的性能。TDengine允许应用在创建库时指定上述存储参数,如果指定,该参数就将覆盖对应的系统配置参数。举例,有下述SQL: diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 2882faf7be..f888d037b3 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -298,7 +298,7 @@ do { \ #define TSDB_DEFAULT_DB_UPDATE_OPTION 0 #define TSDB_MIN_DB_CACHE_LAST_ROW 0 -#define TSDB_MAX_DB_CACHE_LAST_ROW 2 +#define TSDB_MAX_DB_CACHE_LAST_ROW 3 #define TSDB_DEFAULT_CACHE_LAST_ROW 0 #define TSDB_MIN_FSYNC_PERIOD 0 From fbb160572efa96495a91a1a60abe5a11425b4f19 Mon Sep 17 00:00:00 2001 From: lichuang Date: Sat, 22 May 2021 10:09:29 +0800 Subject: [PATCH 23/30] [TD-4034]debug log --- src/tsdb/src/tsdbMain.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 1e6f9eac12..880400dd9f 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -735,7 +735,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea pTable->maxColumnNum += 1; - tsdbInfo("tsdbRestoreLastColumns restore vgId:%d,table:%s cache column %d, %" PRId64, REPO_ID(pRepo), pTable->name->data, pLastCol->colId, pLastCol->ts); + tsdbDebug("tsdbRestoreLastColumns restore vgId:%d,table:%s cache column %d, %" PRId64, REPO_ID(pRepo), pTable->name->data, pLastCol->colId, pLastCol->ts); break; } } From 89fd502625adf7b6af1779c9ee341d2116854e69 Mon Sep 17 00:00:00 2001 From: Elias Soong Date: Wed, 26 May 2021 13:51:06 +0800 Subject: [PATCH 24/30] [TD-3948] : column names can be away from table name in insert. --- documentation20/cn/12.taos-sql/docs.md | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/documentation20/cn/12.taos-sql/docs.md b/documentation20/cn/12.taos-sql/docs.md index bcf80d8fa2..186287515f 100644 --- a/documentation20/cn/12.taos-sql/docs.md +++ b/documentation20/cn/12.taos-sql/docs.md @@ -399,7 +399,12 @@ TDengine 缺省的时间戳是毫秒精度,但通过修改配置参数 enableM INSERT INTO tb1_name (tb1_field1_name, ...) [USING stb1_name TAGS (tag_value1, ...)] VALUES (field1_value1, ...) (field1_value2, ...) ... tb2_name (tb2_field1_name, ...) [USING stb2_name TAGS (tag_value2, ...)] VALUES (field1_value1, ...) (field1_value2, ...) ...; ``` - 以自动建表的方式,同时向表tb1_name和tb2_name中按列分别插入多条记录。 + 以自动建表的方式,同时向表tb1_name和tb2_name中按列分别插入多条记录。 + 从 2.0.20.5 版本开始,子表的列名可以不跟在子表名称后面,而是可以放在 TAGS 和 VALUES 之间,例如像下面这样写: + ```mysql + INSERT INTO tb1_name [USING stb1_name TAGS (tag_value1, ...)] (tb1_field1_name, ...) VALUES (field1_value1, ...) (field1_value2, ...) ...; + ``` + 注意:虽然两种写法都可以,但并不能在一条 SQL 语句中混用,否则会报语法错误。 **历史记录写入**:可使用IMPORT或者INSERT命令,IMPORT的语法,功能与INSERT完全一样。 From ede8a4a6f040fb0710f7022da77ff08f7031bb51 Mon Sep 17 00:00:00 2001 From: Elias Soong Date: Wed, 26 May 2021 15:08:53 +0800 Subject: [PATCH 25/30] [TD-4089] : stopping taosd service may take remarkable time. --- documentation20/cn/02.getting-started/docs.md | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/documentation20/cn/02.getting-started/docs.md b/documentation20/cn/02.getting-started/docs.md index a98159d8c4..6eb58a1433 100644 --- a/documentation20/cn/02.getting-started/docs.md +++ b/documentation20/cn/02.getting-started/docs.md @@ -24,7 +24,7 @@ TDengine的安装非常简单,从下载到安装成功仅仅只要几秒钟。 ## 轻松启动 -安装成功后,用户可使用`systemctl`命令来启动TDengine的服务进程。 +安装成功后,用户可使用 `systemctl` 命令来启动 TDengine 的服务进程。 ```bash $ systemctl start taosd @@ -35,21 +35,22 @@ $ systemctl start taosd $ systemctl status taosd ``` -如果TDengine服务正常工作,那么您可以通过TDengine的命令行程序`taos`来访问并体验TDengine。 +如果 TDengine 服务正常工作,那么您可以通过 TDengine 的命令行程序 `taos` 来访问并体验 TDengine。 **注意:** -- systemctl命令需要 _root_ 权限来运行,如果您非 _root_ 用户,请在命令前添加 sudo -- 为更好的获得产品反馈,改善产品,TDengine会采集基本的使用信息,但您可以修改系统配置文件taos.cfg里的配置参数telemetryReporting, 将其设为0,就可将其关闭。 -- TDengine采用FQDN(一般就是hostname)作为节点的ID,为保证正常运行,需要给运行taosd的服务器配置好hostname,在客户端应用运行的机器配置好DNS服务或hosts文件,保证FQDN能够解析。 +- systemctl 命令需要 _root_ 权限来运行,如果您非 _root_ 用户,请在命令前添加 sudo 。 +- 为更好的获得产品反馈,改善产品,TDengine 会采集基本的使用信息,但您可以修改系统配置文件 taos.cfg 里的配置参数 telemetryReporting, 将其设为 0,就可将其关闭。 +- TDengine 采用 FQDN (一般就是 hostname )作为节点的 ID,为保证正常运行,需要给运行 taosd 的服务器配置好 hostname,在客户端应用运行的机器配置好 DNS 服务或 hosts 文件,保证 FQDN 能够解析。 +- `systemctl stop taosd` 指令在执行后并不会马上停止 TDengine 服务,而是会等待系统中必要的落盘工作正常完成。在数据量很大的情况下,这可能会消耗较长时间。 -* TDengine 支持在使用[`systemd`](https://en.wikipedia.org/wiki/Systemd)做进程服务管理的linux系统上安装,用`which systemctl`命令来检测系统中是否存在`systemd`包: +* TDengine 支持在使用 [`systemd`](https://en.wikipedia.org/wiki/Systemd) 做进程服务管理的 linux 系统上安装,用 `which systemctl` 命令来检测系统中是否存在 `systemd` 包: ```bash $ which systemctl ``` - 如果系统中不支持systemd,也可以用手动运行 /usr/local/taos/bin/taosd 方式启动 TDengine 服务。 + 如果系统中不支持 systemd,也可以用手动运行 /usr/local/taos/bin/taosd 方式启动 TDengine 服务。 ## TDengine命令行程序 From 0a38b29c3d12ee12942432a2bcf8fc215a958966 Mon Sep 17 00:00:00 2001 From: Elias Soong Date: Wed, 26 May 2021 16:06:38 +0800 Subject: [PATCH 26/30] [TD-3963] : allow alter some database parameters without reboot server. --- documentation20/cn/12.taos-sql/docs.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/documentation20/cn/12.taos-sql/docs.md b/documentation20/cn/12.taos-sql/docs.md index 186287515f..b130e0544c 100644 --- a/documentation20/cn/12.taos-sql/docs.md +++ b/documentation20/cn/12.taos-sql/docs.md @@ -126,7 +126,7 @@ TDengine 缺省的时间戳是毫秒精度,但通过修改配置参数 enableM ```mysql ALTER DATABASE db_name CACHELAST 0; ``` - CACHELAST 参数控制是否在内存中缓存数据子表的 last_row。缺省值为 0,取值范围 [0, 1]。其中 0 表示不启用、1 表示启用。(从 2.0.11 版本开始支持,修改后需要重启服务器生效。) + CACHELAST 参数控制是否在内存中缓存数据子表的 last_row。缺省值为 0,取值范围 [0, 1]。其中 0 表示不启用、1 表示启用。(从 2.0.11.0 版本开始支持。从 2.1.1.0 版本开始,修改此参数后无需重启服务器即可生效。) **Tips**: 以上所有参数修改后都可以用show databases来确认是否修改成功。 From 1062357eeee0cb3eb04064c20c98a3f356f51ad0 Mon Sep 17 00:00:00 2001 From: lichuang Date: Wed, 26 May 2021 17:36:51 +0800 Subject: [PATCH 27/30] [TD-4034]when cacheLast option changed,restore or free cache last data --- src/tsdb/inc/tsdbMeta.h | 6 +- src/tsdb/inc/tsdbint.h | 4 + src/tsdb/src/tsdbCommitQueue.c | 22 +++-- src/tsdb/src/tsdbMain.c | 172 ++++++++++++++++++++++++++++++++- src/tsdb/src/tsdbMeta.c | 23 ++--- 5 files changed, 203 insertions(+), 24 deletions(-) diff --git a/src/tsdb/inc/tsdbMeta.h b/src/tsdb/inc/tsdbMeta.h index 45868c002d..14d5a41768 100644 --- a/src/tsdb/inc/tsdbMeta.h +++ b/src/tsdb/inc/tsdbMeta.h @@ -38,8 +38,9 @@ typedef struct STable { SRWLatch latch; // TODO: implementa latch functions SDataCol *lastCols; - int16_t lastColNum; - int16_t maxColumnNum; + int16_t maxColNum; + int16_t restoreColumnNum; + bool hasRestoreLastColumn; int lastColSVersion; T_REF_DECLARE() } STable; @@ -90,6 +91,7 @@ int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema); int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId); int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema); STSchema* tsdbGetTableLatestSchema(STable *pTable); +void tsdbFreeLastColumns(STable* pTable); static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *key2) { if (*(int16_t *)key1 < schemaVersion(*(STSchema **)key2)) { diff --git a/src/tsdb/inc/tsdbint.h b/src/tsdb/inc/tsdbint.h index 4d62164df9..e74c3238e2 100644 --- a/src/tsdb/inc/tsdbint.h +++ b/src/tsdb/inc/tsdbint.h @@ -75,6 +75,9 @@ struct STsdbRepo { STsdbCfg save_config; // save apply config bool config_changed; // config changed flag pthread_mutex_t save_mutex; // protect save config + + uint8_t hasCachedLastRow; + uint8_t hasCachedLastColumn; STsdbAppH appH; STsdbStat stat; @@ -100,6 +103,7 @@ int tsdbUnlockRepo(STsdbRepo* pRepo); STsdbMeta* tsdbGetMeta(STsdbRepo* pRepo); int tsdbCheckCommit(STsdbRepo* pRepo); int tsdbRestoreInfo(STsdbRepo* pRepo); +int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg); void tsdbGetRootDir(int repoid, char dirName[]); void tsdbGetDataDir(int repoid, char dirName[]); diff --git a/src/tsdb/src/tsdbCommitQueue.c b/src/tsdb/src/tsdbCommitQueue.c index e753a3211e..abea79bc4f 100644 --- a/src/tsdb/src/tsdbCommitQueue.c +++ b/src/tsdb/src/tsdbCommitQueue.c @@ -113,11 +113,15 @@ int tsdbScheduleCommit(STsdbRepo *pRepo) { } static void tsdbApplyRepoConfig(STsdbRepo *pRepo) { + pthread_mutex_lock(&pRepo->save_mutex); + pRepo->config_changed = false; STsdbCfg * pSaveCfg = &pRepo->save_config; - + STsdbCfg oldCfg; int32_t oldTotalBlocks = pRepo->config.totalBlocks; + memcpy(&oldCfg, &(pRepo->config), sizeof(STsdbCfg)); + pRepo->config.compression = pRepo->save_config.compression; pRepo->config.keep = pRepo->save_config.keep; pRepo->config.keep1 = pRepo->save_config.keep1; @@ -125,10 +129,12 @@ static void tsdbApplyRepoConfig(STsdbRepo *pRepo) { pRepo->config.cacheLastRow = pRepo->save_config.cacheLastRow; pRepo->config.totalBlocks = pRepo->save_config.totalBlocks; - tsdbInfo("vgId:%d apply new config: compression(%d), keep(%d,%d,%d), totalBlocks(%d), cacheLastRow(%d),totalBlocks(%d)", + pthread_mutex_unlock(&pRepo->save_mutex); + + tsdbInfo("vgId:%d apply new config: compression(%d), keep(%d,%d,%d), totalBlocks(%d), cacheLastRow(%d->%d),totalBlocks(%d->%d)", REPO_ID(pRepo), pSaveCfg->compression, pSaveCfg->keep,pSaveCfg->keep1, pSaveCfg->keep2, - pSaveCfg->totalBlocks, pSaveCfg->cacheLastRow, pSaveCfg->totalBlocks); + pSaveCfg->totalBlocks, oldCfg.cacheLastRow, pSaveCfg->cacheLastRow, oldTotalBlocks, pSaveCfg->totalBlocks); int err = tsdbExpendPool(pRepo, oldTotalBlocks); if (!TAOS_SUCCEEDED(err)) { @@ -136,6 +142,12 @@ static void tsdbApplyRepoConfig(STsdbRepo *pRepo) { REPO_ID(pRepo), oldTotalBlocks, pSaveCfg->totalBlocks, tstrerror(err)); } + if (oldCfg.cacheLastRow != pRepo->config.cacheLastRow) { + if (tsdbLockRepo(pRepo) < 0) return; + tsdbCacheLastData(pRepo, &oldCfg); + tsdbUnlockRepo(pRepo); + } + } static void *tsdbLoopCommit(void *arg) { @@ -165,10 +177,8 @@ static void *tsdbLoopCommit(void *arg) { pRepo = ((SCommitReq *)pNode->data)->pRepo; // check if need to apply new config - if (pRepo->config_changed) { - pthread_mutex_lock(&pRepo->save_mutex); + if (pRepo->config_changed) { tsdbApplyRepoConfig(pRepo); - pthread_mutex_unlock(&pRepo->save_mutex); } tsdbCommitData(pRepo); diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 880400dd9f..a8bbd0d69e 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -548,6 +548,8 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) { return NULL; } pRepo->config_changed = false; + atomic_store_8(&pRepo->hasCachedLastRow, 0); + atomic_store_8(&pRepo->hasCachedLastColumn, 0); code = tsem_init(&(pRepo->readyToCommit), 0, 1); if (code != 0) { @@ -636,7 +638,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea int err = 0; numColumns = schemaNCols(pSchema); - if (numColumns <= pTable->maxColumnNum) { + if (numColumns <= pTable->restoreColumnNum) { return 0; } if (pTable->lastColSVersion != schemaVersion(pSchema)) { @@ -675,7 +677,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea SBlockIdx *pIdx = pReadh->pBlkIdx; blockIdx = (int32_t)(pIdx->numOfBlocks - 1); - while (numColumns > pTable->maxColumnNum && blockIdx >= 0) { + while (numColumns > pTable->restoreColumnNum && blockIdx >= 0) { bool loadStatisData = false; pBlock = pReadh->pBlkInfo->blocks + blockIdx; blockIdx -= 1; @@ -693,7 +695,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea loadStatisData = true; } - for (int16_t i = 0; i < numColumns && numColumns > pTable->maxColumnNum; ++i) { + for (int16_t i = 0; i < numColumns && numColumns > pTable->restoreColumnNum; ++i) { STColumn *pCol = schemaColAt(pSchema, i); // ignore loaded columns if (pTable->lastCols[i].bytes != 0) { @@ -733,7 +735,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea tdAppendColVal(row, tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset); pLastCol->ts = dataRowKey(row); - pTable->maxColumnNum += 1; + pTable->restoreColumnNum += 1; tsdbDebug("tsdbRestoreLastColumns restore vgId:%d,table:%s cache column %d, %" PRId64, REPO_ID(pRepo), pTable->name->data, pLastCol->colId, pLastCol->ts); break; @@ -766,7 +768,7 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) { for (int i = 1; i < pMeta->maxTables; i++) { STable *pTable = pMeta->tables[i]; if (pTable == NULL) continue; - pTable->maxColumnNum = 0; + pTable->restoreColumnNum = 0; } } @@ -841,5 +843,165 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) { } tsdbDestroyReadH(&readh); + if (CACHE_LAST_ROW(pCfg)) { + atomic_store_8(&pRepo->hasCachedLastRow, 1); + } + if (CACHE_LAST_NULL_COLUMN(pCfg)) { + atomic_store_8(&pRepo->hasCachedLastColumn, 1); + } + return 0; } + +int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) { + bool cacheLastRow = false, cacheLastCol = false; + SFSIter fsiter; + SReadH readh; + SDFileSet *pSet; + STsdbMeta *pMeta = pRepo->tsdbMeta; + //STsdbCfg * pCfg = REPO_CFG(pRepo); + SBlock * pBlock; + int tableNum = 0; + int maxTableIdx = 0; + int cacheLastRowTableNum = 0; + int cacheLastColTableNum = 0; + + bool need_free_last_row = CACHE_LAST_ROW(oldCfg) && !CACHE_LAST_ROW(&(pRepo->config)); + bool need_free_last_col = CACHE_LAST_NULL_COLUMN(oldCfg) && !CACHE_LAST_NULL_COLUMN(&(pRepo->config)); + + if (CACHE_LAST_ROW(&(pRepo->config)) || CACHE_LAST_NULL_COLUMN(&(pRepo->config))) { + tsdbInfo("tsdbCacheLastData cache last data since cacheLast option changed"); + cacheLastRow = !CACHE_LAST_ROW(oldCfg) && CACHE_LAST_ROW(&(pRepo->config)); + cacheLastCol = !CACHE_LAST_NULL_COLUMN(oldCfg) && CACHE_LAST_NULL_COLUMN(&(pRepo->config)); + } + + // calc max table idx and table num + for (int i = 1; i < pMeta->maxTables; i++) { + STable *pTable = pMeta->tables[i]; + if (pTable == NULL) continue; + tableNum += 1; + maxTableIdx = i; + if (cacheLastCol) { + pTable->restoreColumnNum = 0; + } + } + + // if close last option,need to free data + if (need_free_last_row || need_free_last_col) { + if (need_free_last_row) { + atomic_store_8(&pRepo->hasCachedLastRow, 0); + } + if (need_free_last_col) { + atomic_store_8(&pRepo->hasCachedLastColumn, 0); + } + tsdbInfo("free cache last data since cacheLast option changed"); + for (int i = 1; i < maxTableIdx; i++) { + STable *pTable = pMeta->tables[i]; + if (pTable == NULL) continue; + if (need_free_last_row) { + taosTZfree(pTable->lastRow); + pTable->lastRow = NULL; + pTable->lastKey = TSKEY_INITIAL_VAL; + } + if (need_free_last_col) { + tsdbFreeLastColumns(pTable); + } + } + } + + if (!cacheLastRow && !cacheLastCol) { + return 0; + } + + cacheLastRowTableNum = cacheLastRow ? tableNum : 0; + cacheLastColTableNum = cacheLastCol ? tableNum : 0; + + if (tsdbInitReadH(&readh, pRepo) < 0) { + return -1; + } + + tsdbFSIterInit(&fsiter, REPO_FS(pRepo), TSDB_FS_ITER_BACKWARD); + + while ((pSet = tsdbFSIterNext(&fsiter)) != NULL && (cacheLastRowTableNum > 0 || cacheLastColTableNum > 0)) { + if (tsdbSetAndOpenReadFSet(&readh, pSet) < 0) { + tsdbDestroyReadH(&readh); + return -1; + } + + if (tsdbLoadBlockIdx(&readh) < 0) { + tsdbDestroyReadH(&readh); + return -1; + } + + for (int i = 1; i <= maxTableIdx; i++) { + STable *pTable = pMeta->tables[i]; + if (pTable == NULL) continue; + + //tsdbInfo("tsdbRestoreInfo restore vgId:%d,table:%s", REPO_ID(pRepo), pTable->name->data); + + if (tsdbSetReadTable(&readh, pTable) < 0) { + tsdbDestroyReadH(&readh); + return -1; + } + + SBlockIdx *pIdx = readh.pBlkIdx; + + if (pIdx && cacheLastRowTableNum > 0 && pTable->lastRow == NULL) { + pTable->lastKey = pIdx->maxKey; + + if (tsdbLoadBlockInfo(&readh, NULL) < 0) { + tsdbDestroyReadH(&readh); + return -1; + } + + pBlock = readh.pBlkInfo->blocks + pIdx->numOfBlocks - 1; + + if (tsdbLoadBlockData(&readh, pBlock, NULL) < 0) { + tsdbDestroyReadH(&readh); + return -1; + } + + // Get the data in row + ASSERT(pTable->lastRow == NULL); + STSchema *pSchema = tsdbGetTableSchema(pTable); + pTable->lastRow = taosTMalloc(dataRowMaxBytesFromSchema(pSchema)); + if (pTable->lastRow == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tsdbDestroyReadH(&readh); + return -1; + } + + tdInitDataRow(pTable->lastRow, pSchema); + for (int icol = 0; icol < schemaNCols(pSchema); icol++) { + STColumn *pCol = schemaColAt(pSchema, icol); + SDataCol *pDataCol = readh.pDCols[0]->cols + icol; + tdAppendColVal(pTable->lastRow, tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type, pCol->bytes, + pCol->offset); + } + cacheLastRowTableNum -= 1; + } + + // restore NULL columns + if (pIdx && cacheLastColTableNum > 0 && !pTable->hasRestoreLastColumn) { + if (tsdbRestoreLastColumns(pRepo, pTable, &readh) != 0) { + tsdbDestroyReadH(&readh); + return -1; + } + if (pTable->hasRestoreLastColumn) { + cacheLastColTableNum -= 1; + } + } + } + } + + tsdbDestroyReadH(&readh); + + if (cacheLastRow) { + atomic_store_8(&pRepo->hasCachedLastRow, 1); + } + if (cacheLastCol) { + atomic_store_8(&pRepo->hasCachedLastColumn, 1); + } + + return 0; +} \ No newline at end of file diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 5717d52eea..324a7c79c5 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -44,7 +44,6 @@ static int tsdbRemoveTableFromStore(STsdbRepo *pRepo, STable *pTable); static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable); static int tsdbAdjustMetaTables(STsdbRepo *pRepo, int tid); static int tsdbCheckTableTagVal(SKVRow *pKVRow, STSchema *pSchema); -static void tsdbFreeLastColumns(STable* pTable); // ------------------ OUTER FUNCTIONS ------------------ int tsdbCreateTable(STsdbRepo *repo, STableCfg *pCfg) { @@ -590,12 +589,12 @@ void tsdbUnRefTable(STable *pTable) { } } -static void tsdbFreeLastColumns(STable* pTable) { +void tsdbFreeLastColumns(STable* pTable) { if (pTable->lastCols == NULL) { return; } - for (int i = 0; i < pTable->lastColNum; ++i) { + for (int i = 0; i < pTable->maxColNum; ++i) { if (pTable->lastCols[i].bytes == 0) { continue; } @@ -605,14 +604,16 @@ static void tsdbFreeLastColumns(STable* pTable) { } tfree(pTable->lastCols); pTable->lastCols = NULL; - pTable->lastColNum = 0; + pTable->maxColNum = 0; + pTable->lastColSVersion = -1; + pTable->restoreColumnNum = 0; } int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId) { if (pTable->lastCols == NULL) { return -1; } - for (int16_t i = 0; i < pTable->lastColNum; ++i) { + for (int16_t i = 0; i < pTable->maxColNum; ++i) { if (pTable->lastCols[i].colId == colId) { return i; } @@ -640,8 +641,8 @@ int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema) { } pTable->lastColSVersion = schemaVersion(pSchema); - pTable->lastColNum = numOfColumn; - pTable->maxColumnNum = 0; + pTable->maxColNum = numOfColumn; + pTable->restoreColumnNum = 0; return 0; } @@ -682,11 +683,11 @@ int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema) { } SDataCol *oldLastCols = pTable->lastCols; - int16_t oldLastColNum = pTable->lastColNum; + int16_t oldLastColNum = pTable->maxColNum; pTable->lastColSVersion = schemaVersion(pNewSchema); pTable->lastCols = lastCols; - pTable->lastColNum = numOfCols; + pTable->maxColNum = numOfCols; if (oldLastCols == NULL) { TSDB_WUNLOCK_TABLE(pTable); @@ -797,8 +798,8 @@ static STable *tsdbNewTable() { pTable->lastKey = TSKEY_INITIAL_VAL; pTable->lastCols = NULL; - pTable->maxColumnNum = 0; - pTable->lastColNum = 0; + pTable->restoreColumnNum = 0; + pTable->maxColNum = 0; pTable->lastColSVersion = -1; return pTable; } From da4b78f1bad71b606461c46066a759249ac50d28 Mon Sep 17 00:00:00 2001 From: lichuang Date: Wed, 26 May 2021 18:27:04 +0800 Subject: [PATCH 28/30] [TD-4034]when config changed,apply a commit msg then config will be applied --- src/tsdb/src/tsdbCommit.c | 3 +++ src/tsdb/src/tsdbMain.c | 6 ++++++ 2 files changed, 9 insertions(+) diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 24f1c11628..c43b35492d 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -90,6 +90,9 @@ static int tsdbApplyRtn(STsdbRepo *pRepo); static int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn); void *tsdbCommitData(STsdbRepo *pRepo) { + if (pRepo->imem == NULL) { + return NULL; + } tsdbStartCommit(pRepo); // Commit to update meta file diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index a8bbd0d69e..d4bc712275 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -268,6 +268,12 @@ int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg) { repo->config_changed = true; pthread_mutex_unlock(&repo->save_mutex); + + // schedule a commit msg then the new config will be applyed immediatly + if (tsdbLockRepo(repo) < 0) return -1; + tsdbScheduleCommit(repo); + if (tsdbUnlockRepo(repo) < 0) return -1; + return 0; #if 0 STsdbRepo *pRepo = (STsdbRepo *)repo; From 6213438f2e9056d6e234e711b02c7a724f9451dd Mon Sep 17 00:00:00 2001 From: lichuang Date: Wed, 26 May 2021 18:27:22 +0800 Subject: [PATCH 29/30] [TD-4034]when config changed,apply a commit msg then config will be applied --- src/tsdb/src/tsdbMain.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index d4bc712275..cc2fca420c 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -269,7 +269,7 @@ int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg) { pthread_mutex_unlock(&repo->save_mutex); - // schedule a commit msg then the new config will be applyed immediatly + // schedule a commit msg then the new config will be applied immediatly if (tsdbLockRepo(repo) < 0) return -1; tsdbScheduleCommit(repo); if (tsdbUnlockRepo(repo) < 0) return -1; From d66b6e2d850431eb5b98a579952f964b0309e0fe Mon Sep 17 00:00:00 2001 From: lichuang Date: Wed, 26 May 2021 19:35:47 +0800 Subject: [PATCH 30/30] [TD-4034]refactor code --- src/common/src/ttypes.c | 61 -------------------- src/inc/tsdb.h | 2 +- src/inc/ttype.h | 3 - src/tsdb/inc/tsdbMeta.h | 3 - src/tsdb/src/tsdbMain.c | 109 +++++++++++++++--------------------- src/tsdb/src/tsdbMemTable.c | 6 +- 6 files changed, 49 insertions(+), 135 deletions(-) diff --git a/src/common/src/ttypes.c b/src/common/src/ttypes.c index 6587a27760..6fa27a029b 100644 --- a/src/common/src/ttypes.c +++ b/src/common/src/ttypes.c @@ -417,18 +417,6 @@ void setVardataNull(char* val, int32_t type) { } } -bool isVardataNull(char* val, int32_t type) { - if (type == TSDB_DATA_TYPE_BINARY) { - return *(uint8_t*) varDataVal(val) == TSDB_DATA_BINARY_NULL; - } else if (type == TSDB_DATA_TYPE_NCHAR) { - return *(uint32_t*) varDataVal(val) == TSDB_DATA_NCHAR_NULL; - } else { - assert(0); - } - - return false; -} - void setNull(char *val, int32_t type, int32_t bytes) { setNullN(val, type, bytes, 1); } void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems) { @@ -504,55 +492,6 @@ void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems) { } } -bool isNullN(char *val, int32_t type) { - switch (type) { - case TSDB_DATA_TYPE_BOOL: - return *(uint8_t *)(val) == TSDB_DATA_BOOL_NULL; - break; - case TSDB_DATA_TYPE_TINYINT: - return *(uint8_t *)(val) == TSDB_DATA_TINYINT_NULL; - break; - case TSDB_DATA_TYPE_SMALLINT: - return *(uint16_t *)(val) == TSDB_DATA_SMALLINT_NULL; - break; - case TSDB_DATA_TYPE_INT: - return *(uint32_t *)(val) == TSDB_DATA_INT_NULL; - break; - case TSDB_DATA_TYPE_BIGINT: - case TSDB_DATA_TYPE_TIMESTAMP: - return *(uint64_t *)(val) == TSDB_DATA_BIGINT_NULL; - break; - case TSDB_DATA_TYPE_UTINYINT: - return *(uint8_t *)(val) == TSDB_DATA_UTINYINT_NULL; - break; - case TSDB_DATA_TYPE_USMALLINT: - return *(uint16_t *)(val) == TSDB_DATA_USMALLINT_NULL; - break; - case TSDB_DATA_TYPE_UINT: - return *(uint32_t *)(val) == TSDB_DATA_UINT_NULL; - break; - case TSDB_DATA_TYPE_UBIGINT: - return *(uint64_t *)(val) == TSDB_DATA_UBIGINT_NULL; - break; - case TSDB_DATA_TYPE_FLOAT: - return *(uint32_t *)(val) == TSDB_DATA_FLOAT_NULL; - break; - case TSDB_DATA_TYPE_DOUBLE: - return *(uint64_t *)(val) == TSDB_DATA_DOUBLE_NULL; - break; - case TSDB_DATA_TYPE_NCHAR: - case TSDB_DATA_TYPE_BINARY: - return isVardataNull(val, type); - break; - default: { - return *(uint32_t *)(val) == TSDB_DATA_INT_NULL; - break; - } - } - - return false; -} - static uint8_t nullBool = TSDB_DATA_BOOL_NULL; static uint8_t nullTinyInt = TSDB_DATA_TINYINT_NULL; static uint16_t nullSmallInt = TSDB_DATA_SMALLINT_NULL; diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 7c28d3e485..5f2d866fdb 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -69,7 +69,7 @@ typedef struct { int8_t precision; int8_t compression; int8_t update; - int8_t cacheLastRow; // 0:no cache, 1: cache last row, 2: cache last NULL column + int8_t cacheLastRow; // 0:no cache, 1: cache last row, 2: cache last NULL column 3: 1&2 } STsdbCfg; #define CACHE_NO_LAST(c) ((c)->cacheLastRow == 0) diff --git a/src/inc/ttype.h b/src/inc/ttype.h index 43dbeb7640..9949f31c59 100644 --- a/src/inc/ttype.h +++ b/src/inc/ttype.h @@ -178,9 +178,6 @@ void setNull(char *val, int32_t type, int32_t bytes); void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems); void *getNullValue(int32_t type); -bool isVardataNull(char* val, int32_t type); -bool isNullN(char *val, int32_t type); - void assignVal(char *val, const char *src, int32_t len, int32_t type); void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf); diff --git a/src/tsdb/inc/tsdbMeta.h b/src/tsdb/inc/tsdbMeta.h index 14d5a41768..45bbd5a7c6 100644 --- a/src/tsdb/inc/tsdbMeta.h +++ b/src/tsdb/inc/tsdbMeta.h @@ -45,9 +45,6 @@ typedef struct STable { T_REF_DECLARE() } STable; -#define TSDB_LATEST_COLUMN_ARRAY_SIZE 20 -#define TSDB_LATEST_COLUMN_ARRAY_ADD_SIZE 5 - typedef struct { pthread_rwlock_t rwLock; diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index cc2fca420c..afbedd5b2f 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -27,6 +27,7 @@ static void tsdbFreeRepo(STsdbRepo *pRepo); static void tsdbStartStream(STsdbRepo *pRepo); static void tsdbStopStream(STsdbRepo *pRepo); static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh); +static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx); // Function declaration int32_t tsdbCreateRepo(int repoid) { @@ -270,9 +271,7 @@ int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg) { pthread_mutex_unlock(&repo->save_mutex); // schedule a commit msg then the new config will be applied immediatly - if (tsdbLockRepo(repo) < 0) return -1; - tsdbScheduleCommit(repo); - if (tsdbUnlockRepo(repo) < 0) return -1; + tsdbAsyncCommit(repo); return 0; #if 0 @@ -645,6 +644,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea numColumns = schemaNCols(pSchema); if (numColumns <= pTable->restoreColumnNum) { + pTable->hasRestoreLastColumn = true; return 0; } if (pTable->lastColSVersion != schemaVersion(pSchema)) { @@ -719,7 +719,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea tdAppendColVal(row, tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset); //SDataCol *pDataCol = readh.pDCols[0]->cols + j; void* value = tdGetRowDataOfCol(row, (int8_t)pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset); - if (isNullN(value, pCol->type)) { + if (isNull(value, pCol->type)) { continue; } @@ -753,16 +753,51 @@ out: taosTZfree(row); tfree(pBlockStatis); + if (err == 0 && numColumns <= pTable->restoreColumnNum) { + pTable->hasRestoreLastColumn = true; + } + return err; } +static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx) { + ASSERT(pTable->lastRow == NULL); + if (tsdbLoadBlockInfo(pReadh, NULL) < 0) { + return -1; + } + + SBlock* pBlock = pReadh->pBlkInfo->blocks + pIdx->numOfBlocks - 1; + + if (tsdbLoadBlockData(pReadh, pBlock, NULL) < 0) { + return -1; + } + + // Get the data in row + + STSchema *pSchema = tsdbGetTableSchema(pTable); + pTable->lastRow = taosTMalloc(dataRowMaxBytesFromSchema(pSchema)); + if (pTable->lastRow == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + tdInitDataRow(pTable->lastRow, pSchema); + for (int icol = 0; icol < schemaNCols(pSchema); icol++) { + STColumn *pCol = schemaColAt(pSchema, icol); + SDataCol *pDataCol = pReadh->pDCols[0]->cols + icol; + tdAppendColVal(pTable->lastRow, tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type, pCol->bytes, + pCol->offset); + } + + return 0; +} + int tsdbRestoreInfo(STsdbRepo *pRepo) { SFSIter fsiter; SReadH readh; SDFileSet *pSet; STsdbMeta *pMeta = pRepo->tsdbMeta; STsdbCfg * pCfg = REPO_CFG(pRepo); - SBlock * pBlock; if (tsdbInitReadH(&readh, pRepo) < 0) { return -1; @@ -805,41 +840,14 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) { if (pIdx && lastKey < pIdx->maxKey) { pTable->lastKey = pIdx->maxKey; - if (CACHE_LAST_ROW(pCfg)) { - if (tsdbLoadBlockInfo(&readh, NULL) < 0) { - tsdbDestroyReadH(&readh); - return -1; - } - - pBlock = readh.pBlkInfo->blocks + pIdx->numOfBlocks - 1; - - if (tsdbLoadBlockData(&readh, pBlock, NULL) < 0) { - tsdbDestroyReadH(&readh); - return -1; - } - - // Get the data in row - ASSERT(pTable->lastRow == NULL); - STSchema *pSchema = tsdbGetTableSchema(pTable); - pTable->lastRow = taosTMalloc(dataRowMaxBytesFromSchema(pSchema)); - if (pTable->lastRow == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - tsdbDestroyReadH(&readh); - return -1; - } - - tdInitDataRow(pTable->lastRow, pSchema); - for (int icol = 0; icol < schemaNCols(pSchema); icol++) { - STColumn *pCol = schemaColAt(pSchema, icol); - SDataCol *pDataCol = readh.pDCols[0]->cols + icol; - tdAppendColVal(pTable->lastRow, tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type, pCol->bytes, - pCol->offset); - } + if (CACHE_LAST_ROW(pCfg) && tsdbRestoreLastRow(pRepo, pTable, &readh, pIdx) != 0) { + tsdbDestroyReadH(&readh); + return -1; } } // restore NULL columns - if (pIdx && CACHE_LAST_NULL_COLUMN(pCfg)) { + if (pIdx && CACHE_LAST_NULL_COLUMN(pCfg) && !pTable->hasRestoreLastColumn) { if (tsdbRestoreLastColumns(pRepo, pTable, &readh) != 0) { tsdbDestroyReadH(&readh); return -1; @@ -865,8 +873,6 @@ int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) { SReadH readh; SDFileSet *pSet; STsdbMeta *pMeta = pRepo->tsdbMeta; - //STsdbCfg * pCfg = REPO_CFG(pRepo); - SBlock * pBlock; int tableNum = 0; int maxTableIdx = 0; int cacheLastRowTableNum = 0; @@ -955,35 +961,10 @@ int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) { if (pIdx && cacheLastRowTableNum > 0 && pTable->lastRow == NULL) { pTable->lastKey = pIdx->maxKey; - if (tsdbLoadBlockInfo(&readh, NULL) < 0) { + if (tsdbRestoreLastRow(pRepo, pTable, &readh, pIdx) != 0) { tsdbDestroyReadH(&readh); return -1; } - - pBlock = readh.pBlkInfo->blocks + pIdx->numOfBlocks - 1; - - if (tsdbLoadBlockData(&readh, pBlock, NULL) < 0) { - tsdbDestroyReadH(&readh); - return -1; - } - - // Get the data in row - ASSERT(pTable->lastRow == NULL); - STSchema *pSchema = tsdbGetTableSchema(pTable); - pTable->lastRow = taosTMalloc(dataRowMaxBytesFromSchema(pSchema)); - if (pTable->lastRow == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - tsdbDestroyReadH(&readh); - return -1; - } - - tdInitDataRow(pTable->lastRow, pSchema); - for (int icol = 0; icol < schemaNCols(pSchema); icol++) { - STColumn *pCol = schemaColAt(pSchema, icol); - SDataCol *pDataCol = readh.pDCols[0]->cols + icol; - tdAppendColVal(pTable->lastRow, tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type, pCol->bytes, - pCol->offset); - } cacheLastRowTableNum -= 1; } diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index ff2a870f3f..79dbb8be5d 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -274,7 +274,7 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { int tsdbAsyncCommit(STsdbRepo *pRepo) { tsem_wait(&(pRepo->readyToCommit)); - ASSERT(pRepo->imem == NULL); + //ASSERT(pRepo->imem == NULL); if (pRepo->mem == NULL) { tsem_post(&(pRepo->readyToCommit)); return 0; @@ -965,7 +965,7 @@ static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) { } static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow row) { - tsdbInfo("vgId:%d updateTableLatestColumn, %s row version:%d", REPO_ID(pRepo), pTable->name->data, dataRowVersion(row)); + tsdbDebug("vgId:%d updateTableLatestColumn, %s row version:%d", REPO_ID(pRepo), pTable->name->data, dataRowVersion(row)); STSchema* pSchema = tsdbGetTableLatestSchema(pTable); if (tsdbUpdateLastColSchema(pTable, pSchema) < 0) { @@ -988,7 +988,7 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow r } void* value = tdGetRowDataOfCol(row, (int8_t)pTCol->type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset); - if (isNullN(value, pTCol->type)) { + if (isNull(value, pTCol->type)) { continue; }