[TD-1568]fix conflict from develop
This commit is contained in:
commit
66d5bff0eb
|
@ -24,7 +24,7 @@ TDengine的安装非常简单,从下载到安装成功仅仅只要几秒钟。
|
|||
|
||||
## <a class="anchor" id="start"></a>轻松启动
|
||||
|
||||
安装成功后,用户可使用`systemctl`命令来启动TDengine的服务进程。
|
||||
安装成功后,用户可使用 `systemctl` 命令来启动 TDengine 的服务进程。
|
||||
|
||||
```bash
|
||||
$ systemctl start taosd
|
||||
|
@ -35,21 +35,22 @@ $ systemctl start taosd
|
|||
$ systemctl status taosd
|
||||
```
|
||||
|
||||
如果TDengine服务正常工作,那么您可以通过TDengine的命令行程序`taos`来访问并体验TDengine。
|
||||
如果 TDengine 服务正常工作,那么您可以通过 TDengine 的命令行程序 `taos` 来访问并体验 TDengine。
|
||||
|
||||
**注意:**
|
||||
|
||||
- systemctl命令需要 _root_ 权限来运行,如果您非 _root_ 用户,请在命令前添加 sudo
|
||||
- 为更好的获得产品反馈,改善产品,TDengine会采集基本的使用信息,但您可以修改系统配置文件taos.cfg里的配置参数telemetryReporting, 将其设为0,就可将其关闭。
|
||||
- TDengine采用FQDN(一般就是hostname)作为节点的ID,为保证正常运行,需要给运行taosd的服务器配置好hostname,在客户端应用运行的机器配置好DNS服务或hosts文件,保证FQDN能够解析。
|
||||
- systemctl 命令需要 _root_ 权限来运行,如果您非 _root_ 用户,请在命令前添加 sudo 。
|
||||
- 为更好的获得产品反馈,改善产品,TDengine 会采集基本的使用信息,但您可以修改系统配置文件 taos.cfg 里的配置参数 telemetryReporting, 将其设为 0,就可将其关闭。
|
||||
- TDengine 采用 FQDN (一般就是 hostname )作为节点的 ID,为保证正常运行,需要给运行 taosd 的服务器配置好 hostname,在客户端应用运行的机器配置好 DNS 服务或 hosts 文件,保证 FQDN 能够解析。
|
||||
- `systemctl stop taosd` 指令在执行后并不会马上停止 TDengine 服务,而是会等待系统中必要的落盘工作正常完成。在数据量很大的情况下,这可能会消耗较长时间。
|
||||
|
||||
* TDengine 支持在使用[`systemd`](https://en.wikipedia.org/wiki/Systemd)做进程服务管理的linux系统上安装,用`which systemctl`命令来检测系统中是否存在`systemd`包:
|
||||
* TDengine 支持在使用 [`systemd`](https://en.wikipedia.org/wiki/Systemd) 做进程服务管理的 linux 系统上安装,用 `which systemctl` 命令来检测系统中是否存在 `systemd` 包:
|
||||
|
||||
```bash
|
||||
$ which systemctl
|
||||
```
|
||||
|
||||
如果系统中不支持systemd,也可以用手动运行 /usr/local/taos/bin/taosd 方式启动 TDengine 服务。
|
||||
如果系统中不支持 systemd,也可以用手动运行 /usr/local/taos/bin/taosd 方式启动 TDengine 服务。
|
||||
|
||||
|
||||
## <a class="anchor" id="console"></a>TDengine命令行程序
|
||||
|
|
|
@ -129,7 +129,7 @@ taosd -C
|
|||
- blocks:每个VNODE(TSDB)中有多少cache大小的内存块。因此一个VNODE的用的内存大小粗略为(cache * blocks)。单位为块,默认值:4。(可通过 alter database 修改)
|
||||
- replica:副本个数,取值范围:1-3。单位为个,默认值:1。(可通过 alter database 修改)
|
||||
- precision:时间戳精度标识,ms表示毫秒,us表示微秒。默认值:ms。
|
||||
- cacheLast:是否在内存中缓存子表 last_row,0:关闭;1:开启。默认值:0。(可通过 alter database 修改)(从 2.0.11 版本开始支持此参数)
|
||||
- cacheLast:是否在内存中缓存子表的最近数据,0:关闭;1:缓存子表最近一行数据;2:缓存子表每一列的最近的非NULL值,3:同时打开缓存最近行和列功能,默认值:0。(可通过 alter database 修改)(从 2.0.11 版本开始支持此参数)
|
||||
|
||||
对于一个应用场景,可能有多种数据特征的数据并存,最佳的设计是将具有相同数据特征的表放在一个库里,这样一个应用有多个库,而每个库可以配置不同的存储参数,从而保证系统有最优的性能。TDengine允许应用在创建库时指定上述存储参数,如果指定,该参数就将覆盖对应的系统配置参数。举例,有下述SQL:
|
||||
|
||||
|
|
|
@ -126,7 +126,7 @@ TDengine 缺省的时间戳是毫秒精度,但通过修改配置参数 enableM
|
|||
```mysql
|
||||
ALTER DATABASE db_name CACHELAST 0;
|
||||
```
|
||||
CACHELAST 参数控制是否在内存中缓存数据子表的 last_row。缺省值为 0,取值范围 [0, 1]。其中 0 表示不启用、1 表示启用。(从 2.0.11 版本开始支持,修改后需要重启服务器生效。)
|
||||
CACHELAST 参数控制是否在内存中缓存数据子表的 last_row。缺省值为 0,取值范围 [0, 1]。其中 0 表示不启用、1 表示启用。(从 2.0.11.0 版本开始支持。从 2.1.1.0 版本开始,修改此参数后无需重启服务器即可生效。)
|
||||
|
||||
**Tips**: 以上所有参数修改后都可以用show databases来确认是否修改成功。
|
||||
|
||||
|
@ -399,7 +399,12 @@ TDengine 缺省的时间戳是毫秒精度,但通过修改配置参数 enableM
|
|||
INSERT INTO tb1_name (tb1_field1_name, ...) [USING stb1_name TAGS (tag_value1, ...)] VALUES (field1_value1, ...) (field1_value2, ...) ...
|
||||
tb2_name (tb2_field1_name, ...) [USING stb2_name TAGS (tag_value2, ...)] VALUES (field1_value1, ...) (field1_value2, ...) ...;
|
||||
```
|
||||
以自动建表的方式,同时向表tb1_name和tb2_name中按列分别插入多条记录。
|
||||
以自动建表的方式,同时向表tb1_name和tb2_name中按列分别插入多条记录。
|
||||
从 2.0.20.5 版本开始,子表的列名可以不跟在子表名称后面,而是可以放在 TAGS 和 VALUES 之间,例如像下面这样写:
|
||||
```mysql
|
||||
INSERT INTO tb1_name [USING stb1_name TAGS (tag_value1, ...)] (tb1_field1_name, ...) VALUES (field1_value1, ...) (field1_value2, ...) ...;
|
||||
```
|
||||
注意:虽然两种写法都可以,但并不能在一条 SQL 语句中混用,否则会报语法错误。
|
||||
|
||||
**历史记录写入**:可使用IMPORT或者INSERT命令,IMPORT的语法,功能与INSERT完全一样。
|
||||
|
||||
|
|
|
@ -234,6 +234,7 @@ typedef struct SDataCol {
|
|||
int len; // column data length
|
||||
VarDataOffsetT *dataOff; // For binary and nchar data, the offset in the data column
|
||||
void * pData; // Actual data pointer
|
||||
TSKEY ts; // only used in last NULL column
|
||||
} SDataCol;
|
||||
|
||||
static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; }
|
||||
|
|
|
@ -298,7 +298,7 @@ do { \
|
|||
#define TSDB_DEFAULT_DB_UPDATE_OPTION 0
|
||||
|
||||
#define TSDB_MIN_DB_CACHE_LAST_ROW 0
|
||||
#define TSDB_MAX_DB_CACHE_LAST_ROW 1
|
||||
#define TSDB_MAX_DB_CACHE_LAST_ROW 3
|
||||
#define TSDB_DEFAULT_CACHE_LAST_ROW 0
|
||||
|
||||
#define TSDB_MIN_FSYNC_PERIOD 0
|
||||
|
|
|
@ -69,9 +69,13 @@ typedef struct {
|
|||
int8_t precision;
|
||||
int8_t compression;
|
||||
int8_t update;
|
||||
int8_t cacheLastRow;
|
||||
int8_t cacheLastRow; // 0:no cache, 1: cache last row, 2: cache last NULL column 3: 1&2
|
||||
} STsdbCfg;
|
||||
|
||||
#define CACHE_NO_LAST(c) ((c)->cacheLastRow == 0)
|
||||
#define CACHE_LAST_ROW(c) (((c)->cacheLastRow & 1) > 0)
|
||||
#define CACHE_LAST_NULL_COLUMN(c) (((c)->cacheLastRow & 2) > 0)
|
||||
|
||||
// --------- TSDB REPOSITORY USAGE STATISTICS
|
||||
typedef struct {
|
||||
int64_t totalStorage; // total bytes occupie
|
||||
|
|
|
@ -36,6 +36,12 @@ typedef struct STable {
|
|||
char* sql;
|
||||
void* cqhandle;
|
||||
SRWLatch latch; // TODO: implementa latch functions
|
||||
|
||||
SDataCol *lastCols;
|
||||
int16_t maxColNum;
|
||||
int16_t restoreColumnNum;
|
||||
bool hasRestoreLastColumn;
|
||||
int lastColSVersion;
|
||||
T_REF_DECLARE()
|
||||
} STable;
|
||||
|
||||
|
@ -78,6 +84,11 @@ void tsdbUnRefTable(STable* pTable);
|
|||
void tsdbUpdateTableSchema(STsdbRepo* pRepo, STable* pTable, STSchema* pSchema, bool insertAct);
|
||||
int tsdbRestoreTable(STsdbRepo* pRepo, void* cont, int contLen);
|
||||
void tsdbOrgMeta(STsdbRepo* pRepo);
|
||||
int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema);
|
||||
int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId);
|
||||
int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema);
|
||||
STSchema* tsdbGetTableLatestSchema(STable *pTable);
|
||||
void tsdbFreeLastColumns(STable* pTable);
|
||||
|
||||
static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *key2) {
|
||||
if (*(int16_t *)key1 < schemaVersion(*(STSchema **)key2)) {
|
||||
|
|
|
@ -77,6 +77,9 @@ struct STsdbRepo {
|
|||
STsdbCfg save_config; // save apply config
|
||||
bool config_changed; // config changed flag
|
||||
pthread_mutex_t save_mutex; // protect save config
|
||||
|
||||
uint8_t hasCachedLastRow;
|
||||
uint8_t hasCachedLastColumn;
|
||||
|
||||
STsdbAppH appH;
|
||||
STsdbStat stat;
|
||||
|
@ -102,6 +105,7 @@ int tsdbUnlockRepo(STsdbRepo* pRepo);
|
|||
STsdbMeta* tsdbGetMeta(STsdbRepo* pRepo);
|
||||
int tsdbCheckCommit(STsdbRepo* pRepo);
|
||||
int tsdbRestoreInfo(STsdbRepo* pRepo);
|
||||
int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg);
|
||||
void tsdbGetRootDir(int repoid, char dirName[]);
|
||||
void tsdbGetDataDir(int repoid, char dirName[]);
|
||||
|
||||
|
|
|
@ -88,6 +88,9 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
|
|||
static int tsdbApplyRtn(STsdbRepo *pRepo);
|
||||
|
||||
void *tsdbCommitData(STsdbRepo *pRepo) {
|
||||
if (pRepo->imem == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
tsdbStartCommit(pRepo);
|
||||
|
||||
// Commit to update meta file
|
||||
|
|
|
@ -115,11 +115,15 @@ int tsdbScheduleCommit(STsdbRepo *pRepo, TSDB_REQ_T req) {
|
|||
}
|
||||
|
||||
static void tsdbApplyRepoConfig(STsdbRepo *pRepo) {
|
||||
pthread_mutex_lock(&pRepo->save_mutex);
|
||||
|
||||
pRepo->config_changed = false;
|
||||
STsdbCfg * pSaveCfg = &pRepo->save_config;
|
||||
|
||||
STsdbCfg oldCfg;
|
||||
int32_t oldTotalBlocks = pRepo->config.totalBlocks;
|
||||
|
||||
memcpy(&oldCfg, &(pRepo->config), sizeof(STsdbCfg));
|
||||
|
||||
pRepo->config.compression = pRepo->save_config.compression;
|
||||
pRepo->config.keep = pRepo->save_config.keep;
|
||||
pRepo->config.keep1 = pRepo->save_config.keep1;
|
||||
|
@ -127,10 +131,12 @@ static void tsdbApplyRepoConfig(STsdbRepo *pRepo) {
|
|||
pRepo->config.cacheLastRow = pRepo->save_config.cacheLastRow;
|
||||
pRepo->config.totalBlocks = pRepo->save_config.totalBlocks;
|
||||
|
||||
tsdbInfo("vgId:%d apply new config: compression(%d), keep(%d,%d,%d), totalBlocks(%d), cacheLastRow(%d),totalBlocks(%d)",
|
||||
pthread_mutex_unlock(&pRepo->save_mutex);
|
||||
|
||||
tsdbInfo("vgId:%d apply new config: compression(%d), keep(%d,%d,%d), totalBlocks(%d), cacheLastRow(%d->%d),totalBlocks(%d->%d)",
|
||||
REPO_ID(pRepo),
|
||||
pSaveCfg->compression, pSaveCfg->keep,pSaveCfg->keep1, pSaveCfg->keep2,
|
||||
pSaveCfg->totalBlocks, pSaveCfg->cacheLastRow, pSaveCfg->totalBlocks);
|
||||
pSaveCfg->totalBlocks, oldCfg.cacheLastRow, pSaveCfg->cacheLastRow, oldTotalBlocks, pSaveCfg->totalBlocks);
|
||||
|
||||
int err = tsdbExpendPool(pRepo, oldTotalBlocks);
|
||||
if (!TAOS_SUCCEEDED(err)) {
|
||||
|
@ -138,6 +144,12 @@ static void tsdbApplyRepoConfig(STsdbRepo *pRepo) {
|
|||
REPO_ID(pRepo), oldTotalBlocks, pSaveCfg->totalBlocks, tstrerror(err));
|
||||
}
|
||||
|
||||
if (oldCfg.cacheLastRow != pRepo->config.cacheLastRow) {
|
||||
if (tsdbLockRepo(pRepo) < 0) return;
|
||||
tsdbCacheLastData(pRepo, &oldCfg);
|
||||
tsdbUnlockRepo(pRepo);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
static void *tsdbLoopCommit(void *arg) {
|
||||
|
@ -168,10 +180,9 @@ static void *tsdbLoopCommit(void *arg) {
|
|||
req = ((SReq *)pNode->data)->req;
|
||||
pRepo = ((SReq *)pNode->data)->pRepo;
|
||||
|
||||
// check if need to apply new config
|
||||
if (pRepo->config_changed) {
|
||||
pthread_mutex_lock(&pRepo->save_mutex);
|
||||
tsdbApplyRepoConfig(pRepo);
|
||||
pthread_mutex_unlock(&pRepo->save_mutex);
|
||||
}
|
||||
|
||||
if (req == COMMIT_REQ) {
|
||||
|
|
|
@ -26,6 +26,8 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH);
|
|||
static void tsdbFreeRepo(STsdbRepo *pRepo);
|
||||
static void tsdbStartStream(STsdbRepo *pRepo);
|
||||
static void tsdbStopStream(STsdbRepo *pRepo);
|
||||
static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh);
|
||||
static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx);
|
||||
|
||||
// Function declaration
|
||||
int32_t tsdbCreateRepo(int repoid) {
|
||||
|
@ -267,6 +269,10 @@ int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg) {
|
|||
repo->config_changed = true;
|
||||
|
||||
pthread_mutex_unlock(&repo->save_mutex);
|
||||
|
||||
// schedule a commit msg then the new config will be applied immediatly
|
||||
tsdbAsyncCommit(repo);
|
||||
|
||||
return 0;
|
||||
#if 0
|
||||
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
||||
|
@ -511,8 +517,10 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
|
|||
if (pCfg->update != 0) pCfg->update = 1;
|
||||
|
||||
// update cacheLastRow
|
||||
if (pCfg->cacheLastRow != 0) pCfg->cacheLastRow = 1;
|
||||
|
||||
if (pCfg->cacheLastRow != 0) {
|
||||
if (pCfg->cacheLastRow > 3)
|
||||
pCfg->cacheLastRow = 1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -545,6 +553,8 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
|
|||
return NULL;
|
||||
}
|
||||
pRepo->config_changed = false;
|
||||
atomic_store_8(&pRepo->hasCachedLastRow, 0);
|
||||
atomic_store_8(&pRepo->hasCachedLastColumn, 0);
|
||||
|
||||
code = tsem_init(&(pRepo->readyToCommit), 0, 1);
|
||||
if (code != 0) {
|
||||
|
@ -614,13 +624,180 @@ static void tsdbStopStream(STsdbRepo *pRepo) {
|
|||
}
|
||||
}
|
||||
|
||||
static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh) {
|
||||
//tsdbInfo("tsdbRestoreLastColumns of table %s", pTable->name->data);
|
||||
|
||||
STSchema *pSchema = tsdbGetTableLatestSchema(pTable);
|
||||
if (pSchema == NULL) {
|
||||
tsdbError("tsdbGetTableLatestSchema of table %s fail", pTable->name->data);
|
||||
return 0;
|
||||
}
|
||||
|
||||
SBlock* pBlock;
|
||||
int numColumns;
|
||||
int32_t blockIdx;
|
||||
SDataStatis* pBlockStatis = NULL;
|
||||
SDataRow row = NULL;
|
||||
// restore last column data with last schema
|
||||
|
||||
int err = 0;
|
||||
|
||||
numColumns = schemaNCols(pSchema);
|
||||
if (numColumns <= pTable->restoreColumnNum) {
|
||||
pTable->hasRestoreLastColumn = true;
|
||||
return 0;
|
||||
}
|
||||
if (pTable->lastColSVersion != schemaVersion(pSchema)) {
|
||||
if (tsdbInitColIdCacheWithSchema(pTable, pSchema) < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
row = taosTMalloc(dataRowMaxBytesFromSchema(pSchema));
|
||||
if (row == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
err = -1;
|
||||
goto out;
|
||||
}
|
||||
tdInitDataRow(row, pSchema);
|
||||
|
||||
// first load block index info
|
||||
if (tsdbLoadBlockInfo(pReadh, NULL) < 0) {
|
||||
err = -1;
|
||||
goto out;
|
||||
}
|
||||
|
||||
pBlockStatis = calloc(numColumns, sizeof(SDataStatis));
|
||||
if (pBlockStatis == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
err = -1;
|
||||
goto out;
|
||||
}
|
||||
memset(pBlockStatis, 0, numColumns * sizeof(SDataStatis));
|
||||
for(int32_t i = 0; i < numColumns; ++i) {
|
||||
STColumn *pCol = schemaColAt(pSchema, i);
|
||||
pBlockStatis[i].colId = pCol->colId;
|
||||
}
|
||||
|
||||
// load block from backward
|
||||
SBlockIdx *pIdx = pReadh->pBlkIdx;
|
||||
blockIdx = (int32_t)(pIdx->numOfBlocks - 1);
|
||||
|
||||
while (numColumns > pTable->restoreColumnNum && blockIdx >= 0) {
|
||||
bool loadStatisData = false;
|
||||
pBlock = pReadh->pBlkInfo->blocks + blockIdx;
|
||||
blockIdx -= 1;
|
||||
|
||||
// load block data
|
||||
if (tsdbLoadBlockData(pReadh, pBlock, NULL) < 0) {
|
||||
err = -1;
|
||||
goto out;
|
||||
}
|
||||
|
||||
// file block with sub-blocks has no statistics data
|
||||
if (pBlock->numOfSubBlocks <= 1) {
|
||||
tsdbLoadBlockStatis(pReadh, pBlock);
|
||||
tsdbGetBlockStatis(pReadh, pBlockStatis, (int)numColumns);
|
||||
loadStatisData = true;
|
||||
}
|
||||
|
||||
for (int16_t i = 0; i < numColumns && numColumns > pTable->restoreColumnNum; ++i) {
|
||||
STColumn *pCol = schemaColAt(pSchema, i);
|
||||
// ignore loaded columns
|
||||
if (pTable->lastCols[i].bytes != 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// ignore block which has no not-null colId column
|
||||
if (loadStatisData && pBlockStatis[i].numOfNull == pBlock->numOfRows) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// OK,let's load row from backward to get not-null column
|
||||
for (int32_t rowId = pBlock->numOfRows - 1; rowId >= 0; rowId--) {
|
||||
SDataCol *pDataCol = pReadh->pDCols[0]->cols + i;
|
||||
tdAppendColVal(row, tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset);
|
||||
//SDataCol *pDataCol = readh.pDCols[0]->cols + j;
|
||||
void* value = tdGetRowDataOfCol(row, (int8_t)pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset);
|
||||
if (isNull(value, pCol->type)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
int16_t idx = tsdbGetLastColumnsIndexByColId(pTable, pCol->colId);
|
||||
if (idx == -1) {
|
||||
tsdbError("tsdbRestoreLastColumns restore vgId:%d,table:%s cache column %d fail", REPO_ID(pRepo), pTable->name->data, pCol->colId);
|
||||
continue;
|
||||
}
|
||||
// save not-null column
|
||||
SDataCol *pLastCol = &(pTable->lastCols[idx]);
|
||||
pLastCol->pData = malloc(pCol->bytes);
|
||||
pLastCol->bytes = pCol->bytes;
|
||||
pLastCol->colId = pCol->colId;
|
||||
memcpy(pLastCol->pData, value, pCol->bytes);
|
||||
|
||||
// save row ts(in column 0)
|
||||
pDataCol = pReadh->pDCols[0]->cols + 0;
|
||||
pCol = schemaColAt(pSchema, 0);
|
||||
tdAppendColVal(row, tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset);
|
||||
pLastCol->ts = dataRowKey(row);
|
||||
|
||||
pTable->restoreColumnNum += 1;
|
||||
|
||||
tsdbDebug("tsdbRestoreLastColumns restore vgId:%d,table:%s cache column %d, %" PRId64, REPO_ID(pRepo), pTable->name->data, pLastCol->colId, pLastCol->ts);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
out:
|
||||
taosTZfree(row);
|
||||
tfree(pBlockStatis);
|
||||
|
||||
if (err == 0 && numColumns <= pTable->restoreColumnNum) {
|
||||
pTable->hasRestoreLastColumn = true;
|
||||
}
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx) {
|
||||
ASSERT(pTable->lastRow == NULL);
|
||||
if (tsdbLoadBlockInfo(pReadh, NULL) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
SBlock* pBlock = pReadh->pBlkInfo->blocks + pIdx->numOfBlocks - 1;
|
||||
|
||||
if (tsdbLoadBlockData(pReadh, pBlock, NULL) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Get the data in row
|
||||
|
||||
STSchema *pSchema = tsdbGetTableSchema(pTable);
|
||||
pTable->lastRow = taosTMalloc(dataRowMaxBytesFromSchema(pSchema));
|
||||
if (pTable->lastRow == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
tdInitDataRow(pTable->lastRow, pSchema);
|
||||
for (int icol = 0; icol < schemaNCols(pSchema); icol++) {
|
||||
STColumn *pCol = schemaColAt(pSchema, icol);
|
||||
SDataCol *pDataCol = pReadh->pDCols[0]->cols + icol;
|
||||
tdAppendColVal(pTable->lastRow, tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type, pCol->bytes,
|
||||
pCol->offset);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int tsdbRestoreInfo(STsdbRepo *pRepo) {
|
||||
SFSIter fsiter;
|
||||
SReadH readh;
|
||||
SDFileSet *pSet;
|
||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||
STsdbCfg * pCfg = REPO_CFG(pRepo);
|
||||
SBlock * pBlock;
|
||||
|
||||
if (tsdbInitReadH(&readh, pRepo) < 0) {
|
||||
return -1;
|
||||
|
@ -628,6 +805,14 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
|
|||
|
||||
tsdbFSIterInit(&fsiter, REPO_FS(pRepo), TSDB_FS_ITER_BACKWARD);
|
||||
|
||||
if (CACHE_LAST_NULL_COLUMN(pCfg)) {
|
||||
for (int i = 1; i < pMeta->maxTables; i++) {
|
||||
STable *pTable = pMeta->tables[i];
|
||||
if (pTable == NULL) continue;
|
||||
pTable->restoreColumnNum = 0;
|
||||
}
|
||||
}
|
||||
|
||||
while ((pSet = tsdbFSIterNext(&fsiter)) != NULL) {
|
||||
if (tsdbSetAndOpenReadFSet(&readh, pSet) < 0) {
|
||||
tsdbDestroyReadH(&readh);
|
||||
|
@ -643,6 +828,8 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
|
|||
STable *pTable = pMeta->tables[i];
|
||||
if (pTable == NULL) continue;
|
||||
|
||||
//tsdbInfo("tsdbRestoreInfo restore vgId:%d,table:%s", REPO_ID(pRepo), pTable->name->data);
|
||||
|
||||
if (tsdbSetReadTable(&readh, pTable) < 0) {
|
||||
tsdbDestroyReadH(&readh);
|
||||
return -1;
|
||||
|
@ -653,42 +840,155 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
|
|||
if (pIdx && lastKey < pIdx->maxKey) {
|
||||
pTable->lastKey = pIdx->maxKey;
|
||||
|
||||
if (pCfg->cacheLastRow) {
|
||||
if (tsdbLoadBlockInfo(&readh, NULL) < 0) {
|
||||
tsdbDestroyReadH(&readh);
|
||||
return -1;
|
||||
}
|
||||
|
||||
pBlock = readh.pBlkInfo->blocks + pIdx->numOfBlocks - 1;
|
||||
|
||||
if (tsdbLoadBlockData(&readh, pBlock, NULL) < 0) {
|
||||
tsdbDestroyReadH(&readh);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Get the data in row
|
||||
ASSERT(pTable->lastRow == NULL);
|
||||
STSchema *pSchema = tsdbGetTableSchema(pTable);
|
||||
pTable->lastRow = taosTMalloc(dataRowMaxBytesFromSchema(pSchema));
|
||||
if (pTable->lastRow == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
tsdbDestroyReadH(&readh);
|
||||
return -1;
|
||||
}
|
||||
|
||||
tdInitDataRow(pTable->lastRow, pSchema);
|
||||
for (int icol = 0; icol < schemaNCols(pSchema); icol++) {
|
||||
STColumn *pCol = schemaColAt(pSchema, icol);
|
||||
SDataCol *pDataCol = readh.pDCols[0]->cols + icol;
|
||||
tdAppendColVal(pTable->lastRow, tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type, pCol->bytes,
|
||||
pCol->offset);
|
||||
}
|
||||
if (CACHE_LAST_ROW(pCfg) && tsdbRestoreLastRow(pRepo, pTable, &readh, pIdx) != 0) {
|
||||
tsdbDestroyReadH(&readh);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
// restore NULL columns
|
||||
if (pIdx && CACHE_LAST_NULL_COLUMN(pCfg) && !pTable->hasRestoreLastColumn) {
|
||||
if (tsdbRestoreLastColumns(pRepo, pTable, &readh) != 0) {
|
||||
tsdbDestroyReadH(&readh);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tsdbDestroyReadH(&readh);
|
||||
if (CACHE_LAST_ROW(pCfg)) {
|
||||
atomic_store_8(&pRepo->hasCachedLastRow, 1);
|
||||
}
|
||||
if (CACHE_LAST_NULL_COLUMN(pCfg)) {
|
||||
atomic_store_8(&pRepo->hasCachedLastColumn, 1);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) {
|
||||
bool cacheLastRow = false, cacheLastCol = false;
|
||||
SFSIter fsiter;
|
||||
SReadH readh;
|
||||
SDFileSet *pSet;
|
||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||
int tableNum = 0;
|
||||
int maxTableIdx = 0;
|
||||
int cacheLastRowTableNum = 0;
|
||||
int cacheLastColTableNum = 0;
|
||||
|
||||
bool need_free_last_row = CACHE_LAST_ROW(oldCfg) && !CACHE_LAST_ROW(&(pRepo->config));
|
||||
bool need_free_last_col = CACHE_LAST_NULL_COLUMN(oldCfg) && !CACHE_LAST_NULL_COLUMN(&(pRepo->config));
|
||||
|
||||
if (CACHE_LAST_ROW(&(pRepo->config)) || CACHE_LAST_NULL_COLUMN(&(pRepo->config))) {
|
||||
tsdbInfo("tsdbCacheLastData cache last data since cacheLast option changed");
|
||||
cacheLastRow = !CACHE_LAST_ROW(oldCfg) && CACHE_LAST_ROW(&(pRepo->config));
|
||||
cacheLastCol = !CACHE_LAST_NULL_COLUMN(oldCfg) && CACHE_LAST_NULL_COLUMN(&(pRepo->config));
|
||||
}
|
||||
|
||||
// calc max table idx and table num
|
||||
for (int i = 1; i < pMeta->maxTables; i++) {
|
||||
STable *pTable = pMeta->tables[i];
|
||||
if (pTable == NULL) continue;
|
||||
tableNum += 1;
|
||||
maxTableIdx = i;
|
||||
if (cacheLastCol) {
|
||||
pTable->restoreColumnNum = 0;
|
||||
}
|
||||
}
|
||||
|
||||
// if close last option,need to free data
|
||||
if (need_free_last_row || need_free_last_col) {
|
||||
if (need_free_last_row) {
|
||||
atomic_store_8(&pRepo->hasCachedLastRow, 0);
|
||||
}
|
||||
if (need_free_last_col) {
|
||||
atomic_store_8(&pRepo->hasCachedLastColumn, 0);
|
||||
}
|
||||
tsdbInfo("free cache last data since cacheLast option changed");
|
||||
for (int i = 1; i < maxTableIdx; i++) {
|
||||
STable *pTable = pMeta->tables[i];
|
||||
if (pTable == NULL) continue;
|
||||
if (need_free_last_row) {
|
||||
taosTZfree(pTable->lastRow);
|
||||
pTable->lastRow = NULL;
|
||||
pTable->lastKey = TSKEY_INITIAL_VAL;
|
||||
}
|
||||
if (need_free_last_col) {
|
||||
tsdbFreeLastColumns(pTable);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!cacheLastRow && !cacheLastCol) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
cacheLastRowTableNum = cacheLastRow ? tableNum : 0;
|
||||
cacheLastColTableNum = cacheLastCol ? tableNum : 0;
|
||||
|
||||
if (tsdbInitReadH(&readh, pRepo) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
tsdbFSIterInit(&fsiter, REPO_FS(pRepo), TSDB_FS_ITER_BACKWARD);
|
||||
|
||||
while ((pSet = tsdbFSIterNext(&fsiter)) != NULL && (cacheLastRowTableNum > 0 || cacheLastColTableNum > 0)) {
|
||||
if (tsdbSetAndOpenReadFSet(&readh, pSet) < 0) {
|
||||
tsdbDestroyReadH(&readh);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tsdbLoadBlockIdx(&readh) < 0) {
|
||||
tsdbDestroyReadH(&readh);
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int i = 1; i <= maxTableIdx; i++) {
|
||||
STable *pTable = pMeta->tables[i];
|
||||
if (pTable == NULL) continue;
|
||||
|
||||
//tsdbInfo("tsdbRestoreInfo restore vgId:%d,table:%s", REPO_ID(pRepo), pTable->name->data);
|
||||
|
||||
if (tsdbSetReadTable(&readh, pTable) < 0) {
|
||||
tsdbDestroyReadH(&readh);
|
||||
return -1;
|
||||
}
|
||||
|
||||
SBlockIdx *pIdx = readh.pBlkIdx;
|
||||
|
||||
if (pIdx && cacheLastRowTableNum > 0 && pTable->lastRow == NULL) {
|
||||
pTable->lastKey = pIdx->maxKey;
|
||||
|
||||
if (tsdbRestoreLastRow(pRepo, pTable, &readh, pIdx) != 0) {
|
||||
tsdbDestroyReadH(&readh);
|
||||
return -1;
|
||||
}
|
||||
cacheLastRowTableNum -= 1;
|
||||
}
|
||||
|
||||
// restore NULL columns
|
||||
if (pIdx && cacheLastColTableNum > 0 && !pTable->hasRestoreLastColumn) {
|
||||
if (tsdbRestoreLastColumns(pRepo, pTable, &readh) != 0) {
|
||||
tsdbDestroyReadH(&readh);
|
||||
return -1;
|
||||
}
|
||||
if (pTable->hasRestoreLastColumn) {
|
||||
cacheLastColTableNum -= 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tsdbDestroyReadH(&readh);
|
||||
|
||||
if (cacheLastRow) {
|
||||
atomic_store_8(&pRepo->hasCachedLastRow, 1);
|
||||
}
|
||||
if (cacheLastCol) {
|
||||
atomic_store_8(&pRepo->hasCachedLastColumn, 1);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -274,7 +274,7 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
|
|||
int tsdbAsyncCommit(STsdbRepo *pRepo) {
|
||||
tsem_wait(&(pRepo->readyToCommit));
|
||||
|
||||
ASSERT(pRepo->imem == NULL);
|
||||
//ASSERT(pRepo->imem == NULL);
|
||||
if (pRepo->mem == NULL) {
|
||||
tsem_post(&(pRepo->readyToCommit));
|
||||
return 0;
|
||||
|
@ -964,6 +964,49 @@ static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) {
|
|||
}
|
||||
}
|
||||
|
||||
static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow row) {
|
||||
tsdbDebug("vgId:%d updateTableLatestColumn, %s row version:%d", REPO_ID(pRepo), pTable->name->data, dataRowVersion(row));
|
||||
|
||||
STSchema* pSchema = tsdbGetTableLatestSchema(pTable);
|
||||
if (tsdbUpdateLastColSchema(pTable, pSchema) < 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row));
|
||||
if (pSchema == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
SDataCol *pLatestCols = pTable->lastCols;
|
||||
|
||||
for (int16_t j = 0; j < schemaNCols(pSchema); j++) {
|
||||
STColumn *pTCol = schemaColAt(pSchema, j);
|
||||
// ignore not exist colId
|
||||
int16_t idx = tsdbGetLastColumnsIndexByColId(pTable, pTCol->colId);
|
||||
if (idx == -1) {
|
||||
continue;
|
||||
}
|
||||
|
||||
void* value = tdGetRowDataOfCol(row, (int8_t)pTCol->type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset);
|
||||
if (isNull(value, pTCol->type)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
SDataCol *pDataCol = &(pLatestCols[idx]);
|
||||
if (pDataCol->pData == NULL) {
|
||||
pDataCol->pData = malloc(pSchema->columns[j].bytes);
|
||||
pDataCol->bytes = pSchema->columns[j].bytes;
|
||||
} else if (pDataCol->bytes < pSchema->columns[j].bytes) {
|
||||
pDataCol->pData = realloc(pDataCol->pData, pSchema->columns[j].bytes);
|
||||
pDataCol->bytes = pSchema->columns[j].bytes;
|
||||
}
|
||||
|
||||
memcpy(pDataCol->pData, value, pDataCol->bytes);
|
||||
//tsdbInfo("updateTableLatestColumn vgId:%d cache column %d for %d,%s", REPO_ID(pRepo), j, pDataCol->bytes, (char*)pDataCol->pData);
|
||||
pDataCol->ts = dataRowKey(row);
|
||||
}
|
||||
}
|
||||
|
||||
static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow row) {
|
||||
STsdbCfg *pCfg = &pRepo->config;
|
||||
|
||||
|
@ -977,7 +1020,7 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow
|
|||
}
|
||||
|
||||
if (tsdbGetTableLastKeyImpl(pTable) < dataRowKey(row)) {
|
||||
if (pCfg->cacheLastRow || pTable->lastRow != NULL) {
|
||||
if (CACHE_LAST_ROW(pCfg) || pTable->lastRow != NULL) {
|
||||
SDataRow nrow = pTable->lastRow;
|
||||
if (taosTSizeof(nrow) < dataRowLen(row)) {
|
||||
SDataRow orow = nrow;
|
||||
|
@ -1002,7 +1045,10 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow
|
|||
} else {
|
||||
pTable->lastKey = dataRowKey(row);
|
||||
}
|
||||
}
|
||||
|
||||
if (CACHE_LAST_NULL_COLUMN(pCfg)) {
|
||||
updateTableLatestColumn(pRepo, pTable, row);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -589,6 +589,131 @@ void tsdbUnRefTable(STable *pTable) {
|
|||
}
|
||||
}
|
||||
|
||||
void tsdbFreeLastColumns(STable* pTable) {
|
||||
if (pTable->lastCols == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (int i = 0; i < pTable->maxColNum; ++i) {
|
||||
if (pTable->lastCols[i].bytes == 0) {
|
||||
continue;
|
||||
}
|
||||
tfree(pTable->lastCols[i].pData);
|
||||
pTable->lastCols[i].bytes = 0;
|
||||
pTable->lastCols[i].pData = NULL;
|
||||
}
|
||||
tfree(pTable->lastCols);
|
||||
pTable->lastCols = NULL;
|
||||
pTable->maxColNum = 0;
|
||||
pTable->lastColSVersion = -1;
|
||||
pTable->restoreColumnNum = 0;
|
||||
}
|
||||
|
||||
int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId) {
|
||||
if (pTable->lastCols == NULL) {
|
||||
return -1;
|
||||
}
|
||||
for (int16_t i = 0; i < pTable->maxColNum; ++i) {
|
||||
if (pTable->lastCols[i].colId == colId) {
|
||||
return i;
|
||||
}
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema) {
|
||||
ASSERT(pTable->lastCols == NULL);
|
||||
|
||||
int16_t numOfColumn = pSchema->numOfCols;
|
||||
|
||||
pTable->lastCols = (SDataCol*)malloc(numOfColumn * sizeof(SDataCol));
|
||||
if (pTable->lastCols == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int16_t i = 0; i < numOfColumn; ++i) {
|
||||
STColumn *pCol = schemaColAt(pSchema, i);
|
||||
SDataCol* pDataCol = &(pTable->lastCols[i]);
|
||||
pDataCol->bytes = 0;
|
||||
pDataCol->pData = NULL;
|
||||
pDataCol->colId = pCol->colId;
|
||||
}
|
||||
|
||||
pTable->lastColSVersion = schemaVersion(pSchema);
|
||||
pTable->maxColNum = numOfColumn;
|
||||
pTable->restoreColumnNum = 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
STSchema* tsdbGetTableLatestSchema(STable *pTable) {
|
||||
return tsdbGetTableSchemaByVersion(pTable, -1);
|
||||
}
|
||||
|
||||
int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema) {
|
||||
if (pTable->lastColSVersion == schemaVersion(pNewSchema)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
tsdbInfo("tsdbUpdateLastColSchema:%s,%d->%d", pTable->name->data, pTable->lastColSVersion, schemaVersion(pNewSchema));
|
||||
|
||||
int16_t numOfCols = pNewSchema->numOfCols;
|
||||
SDataCol *lastCols = (SDataCol*)malloc(numOfCols * sizeof(SDataCol));
|
||||
if (lastCols == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
TSDB_WLOCK_TABLE(pTable);
|
||||
|
||||
for (int16_t i = 0; i < numOfCols; ++i) {
|
||||
STColumn *pCol = schemaColAt(pNewSchema, i);
|
||||
int16_t idx = tsdbGetLastColumnsIndexByColId(pTable, pCol->colId);
|
||||
|
||||
SDataCol* pDataCol = &(lastCols[i]);
|
||||
if (idx != -1) {
|
||||
// move col data to new last column array
|
||||
SDataCol* pOldDataCol = &(pTable->lastCols[idx]);
|
||||
memcpy(pDataCol, pOldDataCol, sizeof(SDataCol));
|
||||
} else {
|
||||
// init new colid data
|
||||
pDataCol->colId = pCol->colId;
|
||||
pDataCol->bytes = 0;
|
||||
pDataCol->pData = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
SDataCol *oldLastCols = pTable->lastCols;
|
||||
int16_t oldLastColNum = pTable->maxColNum;
|
||||
|
||||
pTable->lastColSVersion = schemaVersion(pNewSchema);
|
||||
pTable->lastCols = lastCols;
|
||||
pTable->maxColNum = numOfCols;
|
||||
|
||||
if (oldLastCols == NULL) {
|
||||
TSDB_WUNLOCK_TABLE(pTable);
|
||||
return 0;
|
||||
}
|
||||
|
||||
// free old schema last column datas
|
||||
for (int16_t i = 0; i < oldLastColNum; ++i) {
|
||||
SDataCol* pDataCol = &(oldLastCols[i]);
|
||||
if (pDataCol->bytes == 0) {
|
||||
continue;
|
||||
}
|
||||
int16_t idx = tsdbGetLastColumnsIndexByColId(pTable, pDataCol->colId);
|
||||
if (idx != -1) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// free not exist column data
|
||||
tfree(pDataCol->pData);
|
||||
}
|
||||
TSDB_WUNLOCK_TABLE(pTable);
|
||||
tfree(oldLastCols);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tsdbUpdateTableSchema(STsdbRepo *pRepo, STable *pTable, STSchema *pSchema, bool insertAct) {
|
||||
ASSERT(TABLE_TYPE(pTable) != TSDB_STREAM_TABLE && TABLE_TYPE(pTable) != TSDB_SUPER_TABLE);
|
||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||
|
@ -672,6 +797,10 @@ static STable *tsdbNewTable() {
|
|||
|
||||
pTable->lastKey = TSKEY_INITIAL_VAL;
|
||||
|
||||
pTable->lastCols = NULL;
|
||||
pTable->restoreColumnNum = 0;
|
||||
pTable->maxColNum = 0;
|
||||
pTable->lastColSVersion = -1;
|
||||
return pTable;
|
||||
}
|
||||
|
||||
|
@ -785,8 +914,10 @@ static void tsdbFreeTable(STable *pTable) {
|
|||
kvRowFree(pTable->tagVal);
|
||||
|
||||
tSkipListDestroy(pTable->pIndex);
|
||||
taosTZfree(pTable->lastRow);
|
||||
taosTZfree(pTable->lastRow);
|
||||
tfree(pTable->sql);
|
||||
|
||||
tsdbFreeLastColumns(pTable);
|
||||
free(pTable);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue