Merge branch 'develop' into feature/m2d
This commit is contained in:
commit
d9cfdcd1c5
70
.drone.yml
70
.drone.yml
|
@ -7,41 +7,22 @@ platform:
|
|||
arch: amd64
|
||||
|
||||
steps:
|
||||
- name: smoke_test
|
||||
image: python:3.8
|
||||
- name: build
|
||||
image: gcc
|
||||
commands:
|
||||
- apt-get update
|
||||
- apt-get install -y cmake build-essential gcc
|
||||
- pip3 install psutil
|
||||
- pip3 install guppy3
|
||||
- pip3 install src/connector/python/
|
||||
- apt-get install -y cmake build-essential
|
||||
- mkdir debug
|
||||
- cd debug
|
||||
- cmake ..
|
||||
- make
|
||||
- cd ../tests
|
||||
- ./test-all.sh smoke
|
||||
trigger:
|
||||
event:
|
||||
- pull_request
|
||||
when:
|
||||
branch:
|
||||
- develop
|
||||
- master
|
||||
|
||||
|
||||
- name: crash_gen
|
||||
image: python:3.8
|
||||
commands:
|
||||
- pip3 install requests
|
||||
- pip3 install src/connector/python/
|
||||
- pip3 install psutil
|
||||
- pip3 install guppy3
|
||||
- cd tests/pytest
|
||||
- ./crash_gen.sh -a -p -t 4 -s 2000
|
||||
when:
|
||||
branch:
|
||||
- develop
|
||||
- master
|
||||
|
||||
|
||||
---
|
||||
kind: pipeline
|
||||
name: test_arm64
|
||||
|
@ -60,6 +41,9 @@ steps:
|
|||
- cd debug
|
||||
- cmake .. -DCPUTYPE=aarch64 > /dev/null
|
||||
- make
|
||||
trigger:
|
||||
event:
|
||||
- pull_request
|
||||
when:
|
||||
branch:
|
||||
- develop
|
||||
|
@ -82,6 +66,9 @@ steps:
|
|||
- cd debug
|
||||
- cmake .. -DCPUTYPE=aarch32 > /dev/null
|
||||
- make
|
||||
trigger:
|
||||
event:
|
||||
- pull_request
|
||||
when:
|
||||
branch:
|
||||
- develop
|
||||
|
@ -106,11 +93,13 @@ steps:
|
|||
- cd debug
|
||||
- cmake ..
|
||||
- make
|
||||
trigger:
|
||||
event:
|
||||
- pull_request
|
||||
when:
|
||||
branch:
|
||||
- develop
|
||||
- master
|
||||
|
||||
---
|
||||
kind: pipeline
|
||||
name: build_xenial
|
||||
|
@ -129,6 +118,9 @@ steps:
|
|||
- cd debug
|
||||
- cmake ..
|
||||
- make
|
||||
trigger:
|
||||
event:
|
||||
- pull_request
|
||||
when:
|
||||
branch:
|
||||
- develop
|
||||
|
@ -151,6 +143,32 @@ steps:
|
|||
- cd debug
|
||||
- cmake ..
|
||||
- make
|
||||
trigger:
|
||||
event:
|
||||
- pull_request
|
||||
when:
|
||||
branch:
|
||||
- develop
|
||||
- master
|
||||
---
|
||||
kind: pipeline
|
||||
name: build_centos7
|
||||
platform:
|
||||
os: linux
|
||||
arch: amd64
|
||||
|
||||
steps:
|
||||
- name: build
|
||||
image: ansible/centos7-ansible
|
||||
commands:
|
||||
- yum install -y gcc gcc-c++ make cmake
|
||||
- mkdir debug
|
||||
- cd debug
|
||||
- cmake ..
|
||||
- make
|
||||
trigger:
|
||||
event:
|
||||
- pull_request
|
||||
when:
|
||||
branch:
|
||||
- develop
|
||||
|
|
|
@ -16,7 +16,7 @@ TDengine的Grafana插件在安装包的/usr/local/taos/connector/grafanaplugin
|
|||
以CentOS 7.2操作系统为例,将grafanaplugin目录拷贝到/var/lib/grafana/plugins目录下,重新启动grafana即可。
|
||||
|
||||
```bash
|
||||
sudo cp -rf /usr/local/taos/connector/grafanaplugin /var/lib/grafana/tdengine
|
||||
sudo cp -rf /usr/local/taos/connector/grafanaplugin /var/lib/grafana/plugins/tdengine
|
||||
```
|
||||
|
||||
### 使用 Grafana
|
||||
|
|
|
@ -135,6 +135,14 @@ TDengine 缺省的时间戳是毫秒精度,但通过修改配置参数 enableM
|
|||
SHOW DATABASES;
|
||||
```
|
||||
|
||||
- **显示一个数据库的创建语句**
|
||||
|
||||
```mysql
|
||||
SHOW CREATE DATABASE db_name;
|
||||
```
|
||||
常用于数据库迁移。对一个已经存在的数据库,返回其创建语句;在另一个集群中执行该语句,就能得到一个设置完全相同的 Database。
|
||||
|
||||
|
||||
## <a class="anchor" id="table"></a>表管理
|
||||
|
||||
- **创建数据表**
|
||||
|
@ -200,6 +208,13 @@ TDengine 缺省的时间戳是毫秒精度,但通过修改配置参数 enableM
|
|||
|
||||
通配符匹配:1)’%’ (百分号)匹配0到任意个字符;2)’\_’下划线匹配一个字符。
|
||||
|
||||
- **显示一个数据表的创建语句**
|
||||
|
||||
```mysql
|
||||
SHOW CREATE TABLE tb_name;
|
||||
```
|
||||
常用于数据库迁移。对一个已经存在的数据表,返回其创建语句;在另一个集群中执行该语句,就能得到一个结构完全相同的数据表。
|
||||
|
||||
- **在线修改显示字符宽度**
|
||||
|
||||
```mysql
|
||||
|
@ -265,6 +280,13 @@ TDengine 缺省的时间戳是毫秒精度,但通过修改配置参数 enableM
|
|||
```
|
||||
查看数据库内全部 STable,及其相关信息,包括 STable 的名称、创建时间、列数量、标签(TAG)数量、通过该 STable 建表的数量。
|
||||
|
||||
- **显示一个超级表的创建语句**
|
||||
|
||||
```mysql
|
||||
SHOW CREATE STABLE stb_name;
|
||||
```
|
||||
常用于数据库迁移。对一个已经存在的超级表,返回其创建语句;在另一个集群中执行该语句,就能得到一个结构完全相同的超级表。
|
||||
|
||||
- **获取超级表的结构信息**
|
||||
|
||||
```mysql
|
||||
|
|
|
@ -577,12 +577,13 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SSq
|
|||
|
||||
index = 0;
|
||||
sToken = tStrGetToken(*str, &index, false);
|
||||
*str += index;
|
||||
if (sToken.n == 0 || sToken.type != TK_RP) {
|
||||
tscSQLSyntaxErrMsg(pCmd->payload, ") expected", *str);
|
||||
code = TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
*str += index;
|
||||
|
||||
(*numOfRows)++;
|
||||
}
|
||||
|
@ -712,6 +713,9 @@ static int32_t doParseInsertStatement(SSqlCmd* pCmd, char **str, STableDataBlock
|
|||
|
||||
int32_t numOfRows = 0;
|
||||
code = tsParseValues(str, dataBuf, maxNumOfRows, pCmd, &numOfRows, tmpTokenBuf);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
for (uint32_t i = 0; i < dataBuf->numOfParams; ++i) {
|
||||
SParamInfo *param = dataBuf->params + i;
|
||||
|
|
|
@ -8,6 +8,8 @@ import org.junit.Test;
|
|||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.sql.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Random;
|
||||
|
||||
public class TSDBPreparedStatementTest {
|
||||
private static final String host = "127.0.0.1";
|
||||
|
@ -97,6 +99,118 @@ public class TSDBPreparedStatementTest {
|
|||
Assert.assertEquals(1, result);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void executeTest() throws SQLException {
|
||||
Statement stmt = conn.createStatement();
|
||||
|
||||
int numOfRows = 1000;
|
||||
|
||||
for (int loop = 0; loop < 10; loop++){
|
||||
stmt.execute("drop table if exists weather_test");
|
||||
stmt.execute("create table weather_test(ts timestamp, f1 nchar(4), f2 float, f3 double, f4 timestamp, f5 int, f6 bool, f7 binary(10))");
|
||||
|
||||
TSDBPreparedStatement s = (TSDBPreparedStatement) conn.prepareStatement("insert into ? values(?, ?, ?, ?, ?, ?, ?, ?)");
|
||||
Random r = new Random();
|
||||
s.setTableName("weather_test");
|
||||
|
||||
ArrayList<Long> ts = new ArrayList<Long>();
|
||||
for(int i = 0; i < numOfRows; i++) {
|
||||
ts.add(System.currentTimeMillis() + i);
|
||||
}
|
||||
s.setTimestamp(0, ts);
|
||||
|
||||
int random = 10 + r.nextInt(5);
|
||||
ArrayList<String> s2 = new ArrayList<String>();
|
||||
for(int i = 0; i < numOfRows; i++) {
|
||||
if(i % random == 0) {
|
||||
s2.add(null);
|
||||
}else{
|
||||
s2.add("分支" + i % 4);
|
||||
}
|
||||
}
|
||||
s.setNString(1, s2, 4);
|
||||
|
||||
random = 10 + r.nextInt(5);
|
||||
ArrayList<Float> s3 = new ArrayList<Float>();
|
||||
for(int i = 0; i < numOfRows; i++) {
|
||||
if(i % random == 0) {
|
||||
s3.add(null);
|
||||
}else{
|
||||
s3.add(r.nextFloat());
|
||||
}
|
||||
}
|
||||
s.setFloat(2, s3);
|
||||
|
||||
random = 10 + r.nextInt(5);
|
||||
ArrayList<Double> s4 = new ArrayList<Double>();
|
||||
for(int i = 0; i < numOfRows; i++) {
|
||||
if(i % random == 0) {
|
||||
s4.add(null);
|
||||
}else{
|
||||
s4.add(r.nextDouble());
|
||||
}
|
||||
}
|
||||
s.setDouble(3, s4);
|
||||
|
||||
random = 10 + r.nextInt(5);
|
||||
ArrayList<Long> ts2 = new ArrayList<Long>();
|
||||
for(int i = 0; i < numOfRows; i++) {
|
||||
if(i % random == 0) {
|
||||
ts2.add(null);
|
||||
}else{
|
||||
ts2.add(System.currentTimeMillis() + i);
|
||||
}
|
||||
}
|
||||
s.setTimestamp(4, ts2);
|
||||
|
||||
random = 10 + r.nextInt(5);
|
||||
ArrayList<Integer> vals = new ArrayList<>();
|
||||
for(int i = 0; i < numOfRows; i++) {
|
||||
if(i % random == 0) {
|
||||
vals.add(null);
|
||||
}else{
|
||||
vals.add(r.nextInt());
|
||||
}
|
||||
}
|
||||
s.setInt(5, vals);
|
||||
|
||||
random = 10 + r.nextInt(5);
|
||||
ArrayList<Boolean> sb = new ArrayList<>();
|
||||
for(int i = 0; i < numOfRows; i++) {
|
||||
if(i % random == 0) {
|
||||
sb.add(null);
|
||||
}else{
|
||||
sb.add(i % 2 == 0 ? true : false);
|
||||
}
|
||||
}
|
||||
s.setBoolean(6, sb);
|
||||
|
||||
random = 10 + r.nextInt(5);
|
||||
ArrayList<String> s5 = new ArrayList<String>();
|
||||
for(int i = 0; i < numOfRows; i++) {
|
||||
if(i % random == 0) {
|
||||
s5.add(null);
|
||||
}else{
|
||||
s5.add("test" + i % 10);
|
||||
}
|
||||
}
|
||||
s.setString(7, s5, 10);
|
||||
|
||||
s.columnDataAddBatch();
|
||||
s.columnDataExecuteBatch();
|
||||
s.columnDataCloseBatch();
|
||||
|
||||
String sql = "select * from weather_test";
|
||||
PreparedStatement statement = conn.prepareStatement(sql);
|
||||
ResultSet rs = statement.executeQuery();
|
||||
int rows = 0;
|
||||
while(rs.next()) {
|
||||
rows++;
|
||||
}
|
||||
Assert.assertEquals(numOfRows, rows);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void setBoolean() throws SQLException {
|
||||
pstmt_insert.setTimestamp(1, new Timestamp(System.currentTimeMillis()));
|
||||
|
|
|
@ -28,8 +28,9 @@ typedef struct {
|
|||
int bufBlockSize;
|
||||
int tBufBlocks;
|
||||
int nBufBlocks;
|
||||
int nRecycleBlocks;
|
||||
int64_t index;
|
||||
SList* bufBlockList;
|
||||
SList* bufBlockList;
|
||||
} STsdbBufPool;
|
||||
|
||||
#define TSDB_BUFFER_RESERVE 1024 // Reseve 1K as commit threshold
|
||||
|
@ -39,5 +40,7 @@ void tsdbFreeBufPool(STsdbBufPool* pBufPool);
|
|||
int tsdbOpenBufPool(STsdbRepo* pRepo);
|
||||
void tsdbCloseBufPool(STsdbRepo* pRepo);
|
||||
SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo);
|
||||
int tsdbExpendPool(STsdbRepo* pRepo, int32_t oldTotalBlocks);
|
||||
void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode);
|
||||
|
||||
#endif /* _TD_TSDB_BUFFER_H_ */
|
|
@ -71,6 +71,11 @@ struct STsdbRepo {
|
|||
uint8_t state;
|
||||
|
||||
STsdbCfg config;
|
||||
|
||||
STsdbCfg save_config; // save apply config
|
||||
bool config_changed; // config changed flag
|
||||
pthread_mutex_t save_mutex; // protect save config
|
||||
|
||||
STsdbAppH appH;
|
||||
STsdbStat stat;
|
||||
STsdbMeta* tsdbMeta;
|
||||
|
|
|
@ -70,6 +70,7 @@ int tsdbOpenBufPool(STsdbRepo *pRepo) {
|
|||
pPool->tBufBlocks = pCfg->totalBlocks;
|
||||
pPool->nBufBlocks = 0;
|
||||
pPool->index = 0;
|
||||
pPool->nRecycleBlocks = 0;
|
||||
|
||||
for (int i = 0; i < pCfg->totalBlocks; i++) {
|
||||
STsdbBufBlock *pBufBlock = tsdbNewBufBlock(pPool->bufBlockSize);
|
||||
|
@ -156,4 +157,46 @@ _err:
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); }
|
||||
static void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); }
|
||||
|
||||
int tsdbExpendPool(STsdbRepo* pRepo, int32_t oldTotalBlocks) {
|
||||
if (oldTotalBlocks == pRepo->config.totalBlocks) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int err = TSDB_CODE_SUCCESS;
|
||||
|
||||
if (tsdbLockRepo(pRepo) < 0) return terrno;
|
||||
STsdbBufPool* pPool = pRepo->pPool;
|
||||
|
||||
if (pRepo->config.totalBlocks > oldTotalBlocks) {
|
||||
for (int i = 0; i < pRepo->config.totalBlocks - oldTotalBlocks; i++) {
|
||||
STsdbBufBlock *pBufBlock = tsdbNewBufBlock(pPool->bufBlockSize);
|
||||
if (pBufBlock == NULL) goto err;
|
||||
|
||||
if (tdListAppend(pPool->bufBlockList, (void *)(&pBufBlock)) < 0) {
|
||||
tsdbFreeBufBlock(pBufBlock);
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
err = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
goto err;
|
||||
}
|
||||
|
||||
pPool->nBufBlocks++;
|
||||
}
|
||||
pthread_cond_signal(&pPool->poolNotEmpty);
|
||||
} else {
|
||||
pPool->nRecycleBlocks = oldTotalBlocks - pRepo->config.totalBlocks;
|
||||
}
|
||||
|
||||
err:
|
||||
tsdbUnlockRepo(pRepo);
|
||||
return err;
|
||||
}
|
||||
|
||||
void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode) {
|
||||
STsdbBufBlock *pBufBlock = NULL;
|
||||
tdListNodeGetData(pPool->bufBlockList, pNode, (void *)(&pBufBlock));
|
||||
tsdbFreeBufBlock(pBufBlock);
|
||||
free(pNode);
|
||||
pPool->nBufBlocks--;
|
||||
}
|
|
@ -112,6 +112,32 @@ int tsdbScheduleCommit(STsdbRepo *pRepo) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void tsdbApplyRepoConfig(STsdbRepo *pRepo) {
|
||||
pRepo->config_changed = false;
|
||||
STsdbCfg * pSaveCfg = &pRepo->save_config;
|
||||
|
||||
int32_t oldTotalBlocks = pRepo->config.totalBlocks;
|
||||
|
||||
pRepo->config.compression = pRepo->save_config.compression;
|
||||
pRepo->config.keep = pRepo->save_config.keep;
|
||||
pRepo->config.keep1 = pRepo->save_config.keep1;
|
||||
pRepo->config.keep2 = pRepo->save_config.keep2;
|
||||
pRepo->config.cacheLastRow = pRepo->save_config.cacheLastRow;
|
||||
pRepo->config.totalBlocks = pRepo->save_config.totalBlocks;
|
||||
|
||||
tsdbInfo("vgId:%d apply new config: compression(%d), keep(%d,%d,%d), totalBlocks(%d), cacheLastRow(%d),totalBlocks(%d)",
|
||||
REPO_ID(pRepo),
|
||||
pSaveCfg->compression, pSaveCfg->keep,pSaveCfg->keep1, pSaveCfg->keep2,
|
||||
pSaveCfg->totalBlocks, pSaveCfg->cacheLastRow, pSaveCfg->totalBlocks);
|
||||
|
||||
int err = tsdbExpendPool(pRepo, oldTotalBlocks);
|
||||
if (!TAOS_SUCCEEDED(err)) {
|
||||
tsdbError("vgId:%d expand pool from %d to %d fail,reason:%s",
|
||||
REPO_ID(pRepo), oldTotalBlocks, pSaveCfg->totalBlocks, tstrerror(err));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
static void *tsdbLoopCommit(void *arg) {
|
||||
SCommitQueue *pQueue = &tsCommitQueue;
|
||||
SListNode * pNode = NULL;
|
||||
|
@ -138,6 +164,13 @@ static void *tsdbLoopCommit(void *arg) {
|
|||
|
||||
pRepo = ((SCommitReq *)pNode->data)->pRepo;
|
||||
|
||||
// check if need to apply new config
|
||||
if (pRepo->config_changed) {
|
||||
pthread_mutex_lock(&pRepo->save_mutex);
|
||||
tsdbApplyRepoConfig(pRepo);
|
||||
pthread_mutex_unlock(&pRepo->save_mutex);
|
||||
}
|
||||
|
||||
tsdbCommitData(pRepo);
|
||||
listNodeFree(pNode);
|
||||
}
|
||||
|
|
|
@ -203,6 +203,70 @@ void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int
|
|||
|
||||
int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg) {
|
||||
// TODO: think about multithread cases
|
||||
if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) return -1;
|
||||
|
||||
STsdbCfg * pRCfg = &repo->config;
|
||||
|
||||
ASSERT(pRCfg->tsdbId == pCfg->tsdbId);
|
||||
ASSERT(pRCfg->cacheBlockSize == pCfg->cacheBlockSize);
|
||||
ASSERT(pRCfg->daysPerFile == pCfg->daysPerFile);
|
||||
ASSERT(pRCfg->minRowsPerFileBlock == pCfg->minRowsPerFileBlock);
|
||||
ASSERT(pRCfg->maxRowsPerFileBlock == pCfg->maxRowsPerFileBlock);
|
||||
ASSERT(pRCfg->precision == pCfg->precision);
|
||||
|
||||
bool configChanged = false;
|
||||
if (pRCfg->compression != pCfg->compression) {
|
||||
configChanged = true;
|
||||
}
|
||||
if (pRCfg->keep != pCfg->keep) {
|
||||
configChanged = true;
|
||||
}
|
||||
if (pRCfg->keep1 != pCfg->keep1) {
|
||||
configChanged = true;
|
||||
}
|
||||
if (pRCfg->keep2 != pCfg->keep2) {
|
||||
configChanged = true;
|
||||
}
|
||||
if (pRCfg->cacheLastRow != pCfg->cacheLastRow) {
|
||||
configChanged = true;
|
||||
}
|
||||
if (pRCfg->totalBlocks != pCfg->totalBlocks) {
|
||||
configChanged = true;
|
||||
}
|
||||
|
||||
if (!configChanged) {
|
||||
tsdbError("vgId:%d no config changed", REPO_ID(repo));
|
||||
}
|
||||
|
||||
int code = pthread_mutex_lock(&repo->save_mutex);
|
||||
if (code != 0) {
|
||||
tsdbError("vgId:%d failed to lock tsdb save config mutex since %s", REPO_ID(repo), strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(code);
|
||||
return -1;
|
||||
}
|
||||
|
||||
STsdbCfg * pSaveCfg = &repo->save_config;
|
||||
*pSaveCfg = repo->config;
|
||||
|
||||
pSaveCfg->compression = pCfg->compression;
|
||||
pSaveCfg->keep = pCfg->keep;
|
||||
pSaveCfg->keep1 = pCfg->keep1;
|
||||
pSaveCfg->keep2 = pCfg->keep2;
|
||||
pSaveCfg->cacheLastRow = pCfg->cacheLastRow;
|
||||
pSaveCfg->totalBlocks = pCfg->totalBlocks;
|
||||
|
||||
tsdbInfo("vgId:%d old config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d),totalBlocks(%d)",
|
||||
REPO_ID(repo),
|
||||
pRCfg->compression, pRCfg->keep, pRCfg->keep1,pRCfg->keep2,
|
||||
pRCfg->cacheLastRow, pRCfg->totalBlocks);
|
||||
tsdbInfo("vgId:%d new config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d),totalBlocks(%d)",
|
||||
REPO_ID(repo),
|
||||
pSaveCfg->compression, pSaveCfg->keep,pSaveCfg->keep1, pSaveCfg->keep2,
|
||||
pSaveCfg->cacheLastRow,pSaveCfg->totalBlocks);
|
||||
|
||||
repo->config_changed = true;
|
||||
|
||||
pthread_mutex_unlock(&repo->save_mutex);
|
||||
return 0;
|
||||
#if 0
|
||||
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
||||
|
@ -474,6 +538,14 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
code = pthread_mutex_init(&(pRepo->save_mutex), NULL);
|
||||
if (code != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(code);
|
||||
tsdbFreeRepo(pRepo);
|
||||
return NULL;
|
||||
}
|
||||
pRepo->config_changed = false;
|
||||
|
||||
code = tsem_init(&(pRepo->readyToCommit), 0, 1);
|
||||
if (code != 0) {
|
||||
code = errno;
|
||||
|
|
|
@ -98,17 +98,26 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
|
|||
STsdbBufPool *pBufPool = pRepo->pPool;
|
||||
|
||||
SListNode *pNode = NULL;
|
||||
bool recycleBlocks = pBufPool->nRecycleBlocks > 0;
|
||||
if (tsdbLockRepo(pRepo) < 0) return -1;
|
||||
while ((pNode = tdListPopHead(pMemTable->bufBlockList)) != NULL) {
|
||||
tdListAppendNode(pBufPool->bufBlockList, pNode);
|
||||
if (pBufPool->nRecycleBlocks > 0) {
|
||||
tsdbRecycleBufferBlock(pBufPool, pNode);
|
||||
pBufPool->nRecycleBlocks -= 1;
|
||||
} else {
|
||||
tdListAppendNode(pBufPool->bufBlockList, pNode);
|
||||
}
|
||||
}
|
||||
int code = pthread_cond_signal(&pBufPool->poolNotEmpty);
|
||||
if (code != 0) {
|
||||
if (tsdbUnlockRepo(pRepo) < 0) return -1;
|
||||
tsdbError("vgId:%d failed to signal pool not empty since %s", REPO_ID(pRepo), strerror(code));
|
||||
terrno = TAOS_SYSTEM_ERROR(code);
|
||||
return -1;
|
||||
if (!recycleBlocks) {
|
||||
int code = pthread_cond_signal(&pBufPool->poolNotEmpty);
|
||||
if (code != 0) {
|
||||
if (tsdbUnlockRepo(pRepo) < 0) return -1;
|
||||
tsdbError("vgId:%d failed to signal pool not empty since %s", REPO_ID(pRepo), strerror(code));
|
||||
terrno = TAOS_SYSTEM_ERROR(code);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (tsdbUnlockRepo(pRepo) < 0) return -1;
|
||||
|
||||
for (int i = 0; i < pMemTable->maxTables; i++) {
|
||||
|
@ -958,6 +967,15 @@ static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) {
|
|||
static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow row) {
|
||||
STsdbCfg *pCfg = &pRepo->config;
|
||||
|
||||
// if cacheLastRow config has been reset, free the lastRow
|
||||
if (!pCfg->cacheLastRow && pTable->lastRow != NULL) {
|
||||
taosTZfree(pTable->lastRow);
|
||||
TSDB_WLOCK_TABLE(pTable);
|
||||
pTable->lastRow = NULL;
|
||||
pTable->lastKey = TSKEY_INITIAL_VAL;
|
||||
TSDB_WUNLOCK_TABLE(pTable);
|
||||
}
|
||||
|
||||
if (tsdbGetTableLastKeyImpl(pTable) < dataRowKey(row)) {
|
||||
if (pCfg->cacheLastRow || pTable->lastRow != NULL) {
|
||||
SDataRow nrow = pTable->lastRow;
|
||||
|
|
|
@ -170,29 +170,31 @@ static int32_t vnodeAlterImp(SVnodeObj *pVnode, SCreateVnodeMsg *pVnodeCfg) {
|
|||
|
||||
vDebug("vgId:%d, tsdbchanged:%d syncchanged:%d while alter vnode", pVnode->vgId, tsdbCfgChanged, syncCfgChanged);
|
||||
|
||||
if (/*tsdbCfgChanged || */syncCfgChanged) {
|
||||
if (tsdbCfgChanged || syncCfgChanged) {
|
||||
// vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS
|
||||
// dbCfgVersion can be corrected by status msg
|
||||
if (!vnodeSetUpdatingStatus(pVnode)) {
|
||||
vDebug("vgId:%d, vnode is not ready, do alter operation later", pVnode->vgId);
|
||||
pVnode->dbCfgVersion = dbCfgVersion;
|
||||
pVnode->vgCfgVersion = vgCfgVersion;
|
||||
pVnode->syncCfg = syncCfg;
|
||||
pVnode->tsdbCfg = tsdbCfg;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
if (syncCfgChanged) {
|
||||
if (!vnodeSetUpdatingStatus(pVnode)) {
|
||||
vDebug("vgId:%d, vnode is not ready, do alter operation later", pVnode->vgId);
|
||||
pVnode->dbCfgVersion = dbCfgVersion;
|
||||
pVnode->vgCfgVersion = vgCfgVersion;
|
||||
pVnode->syncCfg = syncCfg;
|
||||
pVnode->tsdbCfg = tsdbCfg;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
code = syncReconfig(pVnode->sync, &pVnode->syncCfg);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pVnode->dbCfgVersion = dbCfgVersion;
|
||||
pVnode->vgCfgVersion = vgCfgVersion;
|
||||
pVnode->syncCfg = syncCfg;
|
||||
pVnode->tsdbCfg = tsdbCfg;
|
||||
vnodeSetReadyStatus(pVnode);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
code = syncReconfig(pVnode->sync, &pVnode->syncCfg);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pVnode->dbCfgVersion = dbCfgVersion;
|
||||
pVnode->vgCfgVersion = vgCfgVersion;
|
||||
pVnode->syncCfg = syncCfg;
|
||||
pVnode->tsdbCfg = tsdbCfg;
|
||||
vnodeSetReadyStatus(pVnode);
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pVnode->tsdb) {
|
||||
if (tsdbCfgChanged && pVnode->tsdb) {
|
||||
code = tsdbConfigRepo(pVnode->tsdb, &pVnode->tsdbCfg);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pVnode->dbCfgVersion = dbCfgVersion;
|
||||
|
|
|
@ -11,7 +11,7 @@ CFLAGS = -O0 -g -Wall -Wno-deprecated -fPIC -Wno-unused-result -Wconversion \
|
|||
all: $(TARGET)
|
||||
|
||||
exe:
|
||||
gcc $(CFLAGS) ./batchprepare.c -o $(ROOT)batchprepare $(LFLAGS)
|
||||
gcc $(CFLAGS) ./batchprepare.c -o $(ROOT)batchprepare $(LFLAGS)
|
||||
gcc $(CFLAGS) ./stmtBatchTest.c -o $(ROOT)stmtBatchTest $(LFLAGS)
|
||||
|
||||
clean:
|
||||
|
|
|
@ -1409,20 +1409,14 @@ static void prepareV(TAOS *taos, int schemaCase, int tableNum, int lenOfBina
|
|||
|
||||
}
|
||||
|
||||
static void preparemV(TAOS *taos, int schemaCase, int idx) {
|
||||
static void prepareV_long(TAOS *taos, int schemaCase, int tableNum, int lenOfBinaryDef) {
|
||||
TAOS_RES *result;
|
||||
int code;
|
||||
char dbname[32],sql[255];
|
||||
|
||||
sprintf(dbname, "demo%d", idx);
|
||||
sprintf(sql, "drop database if exists %s", dbname);
|
||||
|
||||
|
||||
result = taos_query(taos, sql);
|
||||
result = taos_query(taos, "drop database if exists demol");
|
||||
taos_free_result(result);
|
||||
|
||||
sprintf(sql, "create database %s", dbname);
|
||||
result = taos_query(taos, sql);
|
||||
result = taos_query(taos, "create database demol");
|
||||
code = taos_errno(result);
|
||||
if (code != 0) {
|
||||
printf("failed to create database, reason:%s\n", taos_errstr(result));
|
||||
|
@ -1431,18 +1425,18 @@ static void preparemV(TAOS *taos, int schemaCase, int idx) {
|
|||
}
|
||||
taos_free_result(result);
|
||||
|
||||
sprintf(sql, "use %s", dbname);
|
||||
result = taos_query(taos, sql);
|
||||
result = taos_query(taos, "use demol");
|
||||
taos_free_result(result);
|
||||
|
||||
// create table
|
||||
for (int i = 0 ; i < 300; i++) {
|
||||
for (int i = 0 ; i < tableNum; i++) {
|
||||
char buf[1024];
|
||||
if (schemaCase) {
|
||||
sprintf(buf, "create table m%d (ts timestamp, b bool, v1 tinyint, v2 smallint, v4 int, v8 bigint, f4 float, f8 double, bin binary(40), bin2 binary(40), t2 timestamp)", i) ;
|
||||
sprintf(buf, "create table m%d (ts timestamp, b bool, v1 tinyint, v2 smallint, v4 int, v8 bigint, f4 float, f8 double, br binary(%d), nr nchar(%d), ts2 timestamp)", i, lenOfBinaryDef, lenOfBinaryDef) ;
|
||||
} else {
|
||||
sprintf(buf, "create table m%d (ts timestamp, b int)", i) ;
|
||||
}
|
||||
|
||||
result = taos_query(taos, buf);
|
||||
code = taos_errno(result);
|
||||
if (code != 0) {
|
||||
|
@ -1985,12 +1979,283 @@ static void* runCase(void *para) {
|
|||
|
||||
}
|
||||
|
||||
|
||||
static int stmt_bind_case_001_long(TAOS_STMT *stmt, int tableNum, int rowsOfPerColum, int bingNum, int lenOfBinaryDef, int lenOfBinaryAct, int columnNum, int64_t* startTs) {
|
||||
sampleValue* v = (sampleValue *)calloc(1, sizeof(sampleValue));
|
||||
|
||||
int totalRowsPerTbl = rowsOfPerColum * bingNum;
|
||||
|
||||
v->ts = (int64_t *)malloc(sizeof(int64_t) * (size_t)(totalRowsPerTbl * tableNum));
|
||||
v->br = (char *)malloc(sizeof(int64_t) * (size_t)(totalRowsPerTbl * lenOfBinaryDef));
|
||||
v->nr = (char *)malloc(sizeof(int64_t) * (size_t)(totalRowsPerTbl * lenOfBinaryDef));
|
||||
|
||||
int *lb = (int *)malloc(MAX_ROWS_OF_PER_COLUMN * sizeof(int));
|
||||
|
||||
TAOS_MULTI_BIND *params = calloc(1, sizeof(TAOS_MULTI_BIND) * (size_t)(bingNum * columnNum * (tableNum+1) * rowsOfPerColum));
|
||||
char* is_null = malloc(sizeof(char) * MAX_ROWS_OF_PER_COLUMN);
|
||||
char* no_null = malloc(sizeof(char) * MAX_ROWS_OF_PER_COLUMN);
|
||||
|
||||
int64_t tts = *startTs;
|
||||
|
||||
for (int i = 0; i < rowsOfPerColum; ++i) {
|
||||
lb[i] = lenOfBinaryAct;
|
||||
no_null[i] = 0;
|
||||
is_null[i] = (i % 10 == 2) ? 1 : 0;
|
||||
v->b[i] = (int8_t)(i % 2);
|
||||
v->v1[i] = (int8_t)((i+1) % 2);
|
||||
v->v2[i] = (int16_t)i;
|
||||
v->v4[i] = (int32_t)(i+1);
|
||||
v->v8[i] = (int64_t)(i+2);
|
||||
v->f4[i] = (float)(i+3);
|
||||
v->f8[i] = (double)(i+4);
|
||||
char tbuf[MAX_BINARY_DEF_LEN];
|
||||
memset(tbuf, 0, MAX_BINARY_DEF_LEN);
|
||||
sprintf(tbuf, "binary-%d", i%10);
|
||||
memcpy(v->br + i*lenOfBinaryDef, tbuf, (size_t)lenOfBinaryAct);
|
||||
memset(tbuf, 0, MAX_BINARY_DEF_LEN);
|
||||
sprintf(tbuf, "nchar-%d", i%10);
|
||||
memcpy(v->nr + i*lenOfBinaryDef, tbuf, (size_t)lenOfBinaryAct);
|
||||
v->ts2[i] = tts + i;
|
||||
}
|
||||
|
||||
int i = 0;
|
||||
for (int j = 0; j < bingNum * tableNum; j++) {
|
||||
params[i+0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
params[i+0].buffer_length = sizeof(int64_t);
|
||||
params[i+0].buffer = &v->ts[j*rowsOfPerColum];
|
||||
params[i+0].length = NULL;
|
||||
params[i+0].is_null = no_null;
|
||||
params[i+0].num = rowsOfPerColum;
|
||||
|
||||
params[i+1].buffer_type = TSDB_DATA_TYPE_BOOL;
|
||||
params[i+1].buffer_length = sizeof(int8_t);
|
||||
params[i+1].buffer = v->b;
|
||||
params[i+1].length = NULL;
|
||||
params[i+1].is_null = is_null;
|
||||
params[i+1].num = rowsOfPerColum;
|
||||
|
||||
params[i+2].buffer_type = TSDB_DATA_TYPE_TINYINT;
|
||||
params[i+2].buffer_length = sizeof(int8_t);
|
||||
params[i+2].buffer = v->v1;
|
||||
params[i+2].length = NULL;
|
||||
params[i+2].is_null = is_null;
|
||||
params[i+2].num = rowsOfPerColum;
|
||||
|
||||
params[i+3].buffer_type = TSDB_DATA_TYPE_SMALLINT;
|
||||
params[i+3].buffer_length = sizeof(int16_t);
|
||||
params[i+3].buffer = v->v2;
|
||||
params[i+3].length = NULL;
|
||||
params[i+3].is_null = is_null;
|
||||
params[i+3].num = rowsOfPerColum;
|
||||
|
||||
params[i+4].buffer_type = TSDB_DATA_TYPE_INT;
|
||||
params[i+4].buffer_length = sizeof(int32_t);
|
||||
params[i+4].buffer = v->v4;
|
||||
params[i+4].length = NULL;
|
||||
params[i+4].is_null = is_null;
|
||||
params[i+4].num = rowsOfPerColum;
|
||||
|
||||
params[i+5].buffer_type = TSDB_DATA_TYPE_BIGINT;
|
||||
params[i+5].buffer_length = sizeof(int64_t);
|
||||
params[i+5].buffer = v->v8;
|
||||
params[i+5].length = NULL;
|
||||
params[i+5].is_null = is_null;
|
||||
params[i+5].num = rowsOfPerColum;
|
||||
|
||||
params[i+6].buffer_type = TSDB_DATA_TYPE_FLOAT;
|
||||
params[i+6].buffer_length = sizeof(float);
|
||||
params[i+6].buffer = v->f4;
|
||||
params[i+6].length = NULL;
|
||||
params[i+6].is_null = is_null;
|
||||
params[i+6].num = rowsOfPerColum;
|
||||
|
||||
params[i+7].buffer_type = TSDB_DATA_TYPE_DOUBLE;
|
||||
params[i+7].buffer_length = sizeof(double);
|
||||
params[i+7].buffer = v->f8;
|
||||
params[i+7].length = NULL;
|
||||
params[i+7].is_null = is_null;
|
||||
params[i+7].num = rowsOfPerColum;
|
||||
|
||||
params[i+8].buffer_type = TSDB_DATA_TYPE_BINARY;
|
||||
params[i+8].buffer_length = (uintptr_t)lenOfBinaryDef;
|
||||
params[i+8].buffer = v->br;
|
||||
params[i+8].length = lb;
|
||||
params[i+8].is_null = is_null;
|
||||
params[i+8].num = rowsOfPerColum;
|
||||
|
||||
params[i+9].buffer_type = TSDB_DATA_TYPE_NCHAR;
|
||||
params[i+9].buffer_length = (uintptr_t)lenOfBinaryDef;
|
||||
params[i+9].buffer = v->nr;
|
||||
params[i+9].length = lb;
|
||||
params[i+9].is_null = is_null;
|
||||
params[i+9].num = rowsOfPerColum;
|
||||
|
||||
params[i+10].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
params[i+10].buffer_length = sizeof(int64_t);
|
||||
params[i+10].buffer = v->ts2;
|
||||
params[i+10].length = NULL;
|
||||
params[i+10].is_null = is_null;
|
||||
params[i+10].num = rowsOfPerColum;
|
||||
|
||||
i+=columnNum;
|
||||
}
|
||||
|
||||
//int64_t tts = 1591060628000;
|
||||
for (int i = 0; i < totalRowsPerTbl * tableNum; ++i) {
|
||||
v->ts[i] = tts + i;
|
||||
}
|
||||
|
||||
*startTs = tts + totalRowsPerTbl * tableNum; // return to next
|
||||
|
||||
unsigned long long starttime = getCurrentTime();
|
||||
|
||||
char *sql = "insert into ? values(?,?,?,?,?,?,?,?,?,?,?)";
|
||||
int code = taos_stmt_prepare(stmt, sql, 0);
|
||||
if (code != 0){
|
||||
printf("failed to execute taos_stmt_prepare. code:0x%x[%s]\n", code, tstrerror(code));
|
||||
return -1;
|
||||
}
|
||||
|
||||
int id = 0;
|
||||
for (int l = 0; l < bingNum; l++) {
|
||||
for (int zz = 0; zz < tableNum; zz++) {
|
||||
char buf[32];
|
||||
sprintf(buf, "m%d", zz);
|
||||
code = taos_stmt_set_tbname(stmt, buf);
|
||||
if (code != 0){
|
||||
printf("failed to execute taos_stmt_set_tbname. code:0x%x[%s]\n", code, tstrerror(code));
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int col=0; col < columnNum; ++col) {
|
||||
code = taos_stmt_bind_single_param_batch(stmt, params + id, col);
|
||||
if (code != 0){
|
||||
printf("failed to execute taos_stmt_bind_single_param_batch. code:0x%x[%s]\n", code, tstrerror(code));
|
||||
return -1;
|
||||
}
|
||||
id++;
|
||||
}
|
||||
|
||||
code = taos_stmt_add_batch(stmt);
|
||||
if (code != 0) {
|
||||
printf("failed to execute taos_stmt_add_batch. code:0x%x[%s]\n", code, tstrerror(code));
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
code = taos_stmt_execute(stmt);
|
||||
if (code != 0) {
|
||||
printf("failed to execute taos_stmt_execute. code:0x%x[%s]\n", code, tstrerror(code));
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
unsigned long long endtime = getCurrentTime();
|
||||
unsigned long long totalRows = (uint32_t)(totalRowsPerTbl * tableNum);
|
||||
printf("insert total %d records, used %u seconds, avg:%u useconds per record\n", totalRows, (endtime-starttime)/1000000UL, (endtime-starttime)/totalRows);
|
||||
|
||||
free(v->ts);
|
||||
free(v->br);
|
||||
free(v->nr);
|
||||
free(v);
|
||||
free(lb);
|
||||
free(params);
|
||||
free(is_null);
|
||||
free(no_null);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void* runCase_long(void *para) {
|
||||
ThreadInfo* tInfo = (ThreadInfo *)para;
|
||||
TAOS *taos = tInfo->taos;
|
||||
int idx = tInfo->idx;
|
||||
|
||||
TAOS_STMT *stmt = NULL;
|
||||
|
||||
(void)idx;
|
||||
|
||||
int tableNum;
|
||||
int lenOfBinaryDef;
|
||||
int rowsOfPerColum;
|
||||
int bingNum;
|
||||
int lenOfBinaryAct;
|
||||
int columnNum;
|
||||
|
||||
int totalRowsPerTbl;
|
||||
|
||||
//=======================================================================//
|
||||
//========== long case 14: ======================//
|
||||
#if 0
|
||||
{
|
||||
stmt = taos_stmt_init(taos);
|
||||
|
||||
tableNum = 1000;
|
||||
rowsOfPerColum = 10;
|
||||
bingNum = 5000000;
|
||||
lenOfBinaryDef = 1000;
|
||||
lenOfBinaryAct = 33;
|
||||
columnNum = 11;
|
||||
|
||||
prepareV(taos, 1, tableNum, lenOfBinaryDef);
|
||||
stmt_bind_case_002(stmt, tableNum, rowsOfPerColum, bingNum, lenOfBinaryDef, lenOfBinaryAct, columnNum);
|
||||
|
||||
totalRowsPerTbl = rowsOfPerColum * bingNum;
|
||||
checkResult(taos, "m0", 0, totalRowsPerTbl);
|
||||
checkResult(taos, "m1", 0, totalRowsPerTbl);
|
||||
checkResult(taos, "m2", 0, totalRowsPerTbl);
|
||||
checkResult(taos, "m3", 0, totalRowsPerTbl);
|
||||
checkResult(taos, "m4", 0, totalRowsPerTbl);
|
||||
taos_stmt_close(stmt);
|
||||
printf("long case 14 check result end\n\n");
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
//========== case 15: ======================//
|
||||
#if 1
|
||||
{
|
||||
printf("====long case 15 test start\n\n");
|
||||
|
||||
tableNum = 200;
|
||||
rowsOfPerColum = 110;
|
||||
bingNum = 100;
|
||||
lenOfBinaryDef = 1000;
|
||||
lenOfBinaryAct = 8;
|
||||
columnNum = 11;
|
||||
|
||||
int64_t startTs = 1591060628000;
|
||||
prepareV_long(taos, 1, tableNum, lenOfBinaryDef);
|
||||
|
||||
totalRowsPerTbl = 0;
|
||||
for (int i = 0; i < 30000; i++) {
|
||||
stmt = taos_stmt_init(taos);
|
||||
stmt_bind_case_001_long(stmt, tableNum, rowsOfPerColum, bingNum, lenOfBinaryDef, lenOfBinaryAct, columnNum, &startTs);
|
||||
|
||||
totalRowsPerTbl += rowsOfPerColum * bingNum;
|
||||
checkResult(taos, "m0", 0, totalRowsPerTbl);
|
||||
checkResult(taos, "m11", 0, totalRowsPerTbl);
|
||||
checkResult(taos, "m22", 0, totalRowsPerTbl);
|
||||
checkResult(taos, "m133", 0, totalRowsPerTbl);
|
||||
checkResult(taos, "m199", 0, totalRowsPerTbl);
|
||||
taos_stmt_close(stmt);
|
||||
}
|
||||
|
||||
printf("====long case 15 check result end\n\n");
|
||||
}
|
||||
#endif
|
||||
|
||||
return NULL;
|
||||
|
||||
}
|
||||
|
||||
|
||||
int main(int argc, char *argv[])
|
||||
{
|
||||
TAOS *taos;
|
||||
char host[32] = "127.0.0.1";
|
||||
char* serverIp = NULL;
|
||||
int threadNum = 1;
|
||||
int threadNum = 2;
|
||||
|
||||
// connect to server
|
||||
if (argc == 1) {
|
||||
|
@ -2021,7 +2286,11 @@ int main(int argc, char *argv[])
|
|||
|
||||
tInfo->taos = taos;
|
||||
tInfo->idx = i;
|
||||
pthread_create(&(pThreadList[0]), NULL, runCase, (void *)tInfo);
|
||||
if (0 == i) {
|
||||
pthread_create(&(pThreadList[0]), NULL, runCase, (void *)tInfo);
|
||||
} else if (1 == i){
|
||||
pthread_create(&(pThreadList[0]), NULL, runCase_long, (void *)tInfo);
|
||||
}
|
||||
tInfo++;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue