Merge remote-tracking branch 'origin/develop' into feature/linux

This commit is contained in:
Shengliang Guan 2021-03-19 02:00:06 +00:00
commit 8b4bb3fcc4
22 changed files with 495 additions and 383 deletions

View File

@ -160,24 +160,13 @@ mkdir debug && cd debug
cmake .. && cmake --build . cmake .. && cmake --build .
``` ```
# Quick Run
# Quick Run
To quickly start a TDengine server after building, run the command below in terminal:
```bash
./build/bin/taosd -c test/cfg
```
In another terminal, use the TDengine shell to connect the server:
```bash
./build/bin/taos -c test/cfg
```
option "-c test/cfg" specifies the system configuration file directory.
# Installing # Installing
After building successfully, TDengine can be installed by: After building successfully, TDengine can be installed by:
```bash ```bash
sudo make install sudo make install
``` ```
Users can find more information about directories installed on the system in the [directory and files](https://www.taosdata.com/en/documentation/administrator/#Directory-and-Files) section. Since version 2.0, installing from source code will also configure service management for TDengine. Users can find more information about directories installed on the system in the [directory and files](https://www.taosdata.com/en/documentation/administrator/#Directory-and-Files) section. Since version 2.0, installing from source code will also configure service management for TDengine.
Users can also choose to [install from packages](https://www.taosdata.com/en/getting-started/#Install-from-Package) for it. Users can also choose to [install from packages](https://www.taosdata.com/en/getting-started/#Install-from-Package) for it.
@ -193,6 +182,20 @@ taos
If TDengine shell connects the server successfully, welcome messages and version info are printed. Otherwise, an error message is shown. If TDengine shell connects the server successfully, welcome messages and version info are printed. Otherwise, an error message is shown.
## Quick Run
If you don't want to run TDengine as a service, you can run it in current shell. For example, to quickly start a TDengine server after building, run the command below in terminal:
```bash
./build/bin/taosd -c test/cfg
```
In another terminal, use the TDengine shell to connect the server:
```bash
./build/bin/taos -c test/cfg
```
option "-c test/cfg" specifies the system configuration file directory.
# Try TDengine # Try TDengine
It is easy to run SQL commands from TDengine shell which is the same as other SQL databases. It is easy to run SQL commands from TDengine shell which is the same as other SQL databases.
```sql ```sql

View File

@ -145,7 +145,7 @@ TDengine 建议用数据采集点的名字(如上表中的D1001)来做表名。
在TDengine的设计里**表用来代表一个具体的数据采集点,超级表用来代表一组相同类型的数据采集点集合**。当为某个具体数据采集点创建表时,用户使用超级表的定义做模板,同时指定该具体采集点(表)的标签值。与传统的关系型数据库相比,表(一个数据采集点)是带有静态标签的,而且这些标签可以事后增加、删除、修改。**一张超级表包含有多张表这些表具有相同的时序数据schema但带有不同的标签值**。 在TDengine的设计里**表用来代表一个具体的数据采集点,超级表用来代表一组相同类型的数据采集点集合**。当为某个具体数据采集点创建表时,用户使用超级表的定义做模板,同时指定该具体采集点(表)的标签值。与传统的关系型数据库相比,表(一个数据采集点)是带有静态标签的,而且这些标签可以事后增加、删除、修改。**一张超级表包含有多张表这些表具有相同的时序数据schema但带有不同的标签值**。
当对多个具有相同数据类型的数据采集点进行聚合操作时TDengine将先把满足标签过滤条件的表从超级表的中查找出来,然后再扫描这些表的时序数据,进行聚合操作,这样能将需要扫描的数据集大幅减少,从而大幅提高聚合计算的性能。 当对多个具有相同数据类型的数据采集点进行聚合操作时TDengine会先把满足标签过滤条件的表从超级表中找出来,然后再扫描这些表的时序数据,进行聚合操作,这样需要扫描的数据集会大幅减少,从而显著提高聚合计算的性能。
## <a class="anchor" id="cluster"></a>集群与基本逻辑单元 ## <a class="anchor" id="cluster"></a>集群与基本逻辑单元

View File

@ -125,7 +125,7 @@ TDengine缺省的时间戳是毫秒精度但通过修改配置参数enableMic
```mysql ```mysql
ALTER DATABASE db_name CACHELAST 0; ALTER DATABASE db_name CACHELAST 0;
``` ```
CACHELAST 参数控制是否在内存中缓存数据子表的 last_row。缺省值为 0取值范围 [0, 1]。其中 0 表示不启用、1 表示启用。(从 2.0.11 版本开始支持) CACHELAST 参数控制是否在内存中缓存数据子表的 last_row。缺省值为 0取值范围 [0, 1]。其中 0 表示不启用、1 表示启用。(从 2.0.11 版本开始支持,修改后需要重启服务器生效。
**Tips**: 以上所有参数修改后都可以用show databases来确认是否修改成功。 **Tips**: 以上所有参数修改后都可以用show databases来确认是否修改成功。

View File

@ -16,13 +16,13 @@
## 1. TDengine2.0之前的版本升级到2.0及以上的版本应该注意什么?☆☆☆ ## 1. TDengine2.0之前的版本升级到2.0及以上的版本应该注意什么?☆☆☆
2.0版在之前版本的基础上,进行了完全的重构,配置文件和数据文件是不兼容的。在升级之前务必进行如下操作: 2.0版在之前版本的基础上,进行了完全的重构,配置文件和数据文件是不兼容的。在升级之前务必进行如下操作:
1. 删除配置文件,执行 <code> sudo rm -rf /etc/taos/taos.cfg </code> 1. 删除配置文件,执行 `sudo rm -rf /etc/taos/taos.cfg`
2. 删除日志文件,执行 <code> sudo rm -rf /var/log/taos/ </code> 2. 删除日志文件,执行 `sudo rm -rf /var/log/taos/`
3. 确保数据已经不再需要的前提下,删除数据文件,执行 <code> sudo rm -rf /var/lib/taos/ </code> 3. 确保数据已经不再需要的前提下,删除数据文件,执行 `sudo rm -rf /var/lib/taos/`
4. 安装最新稳定版本的TDengine 4. 安装最新稳定版本的 TDengine
5. 如果数据需要迁移数据或者数据文件损坏,请联系涛思数据官方技术支持团队,进行协助解决 5. 如果需要迁移数据或者数据文件损坏,请联系涛思数据官方技术支持团队,进行协助解决
## 2. Windows平台下JDBCDriver找不到动态链接库怎么办 ## 2. Windows平台下JDBCDriver找不到动态链接库怎么办

View File

@ -213,10 +213,10 @@ fi
if echo $osinfo | grep -qwi "ubuntu" ; then if echo $osinfo | grep -qwi "ubuntu" ; then
# echo "this is ubuntu system" # echo "this is ubuntu system"
${csudo} rm -f /var/lib/dpkg/info/tdengine* || : ${csudo} dpkg --force-all -P tdengine || :
elif echo $osinfo | grep -qwi "debian" ; then elif echo $osinfo | grep -qwi "debian" ; then
# echo "this is debian system" # echo "this is debian system"
${csudo} rm -f /var/lib/dpkg/info/tdengine* || : ${csudo} dpkg --force-all -P tdengine || :
elif echo $osinfo | grep -qwi "centos" ; then elif echo $osinfo | grep -qwi "centos" ; then
# echo "this is centos system" # echo "this is centos system"
${csudo} rpm -e --noscripts tdengine || : ${csudo} rpm -e --noscripts tdengine || :

View File

@ -281,7 +281,7 @@ void tscQueueAsyncError(void(*fp), void *param, int32_t code) {
} }
static void tscAsyncResultCallback(SSchedMsg *pMsg) { static void tscAsyncResultCallback(SSchedMsg *pMsg) {
SSqlObj* pSql = pMsg->ahandle; SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)pMsg->ahandle);
if (pSql == NULL || pSql->signature != pSql) { if (pSql == NULL || pSql->signature != pSql) {
tscDebug("%p SqlObj is freed, not add into queue async res", pSql); tscDebug("%p SqlObj is freed, not add into queue async res", pSql);
return; return;
@ -292,23 +292,26 @@ static void tscAsyncResultCallback(SSchedMsg *pMsg) {
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
if (pSql->fp == NULL || pSql->fetchFp == NULL){ if (pSql->fp == NULL || pSql->fetchFp == NULL){
taosReleaseRef(tscObjRef, pSql->self);
return; return;
} }
pSql->fp = pSql->fetchFp; pSql->fp = pSql->fetchFp;
(*pSql->fp)(pSql->param, pSql, pRes->code); (*pSql->fp)(pSql->param, pSql, pRes->code);
taosReleaseRef(tscObjRef, pSql->self);
} }
void tscAsyncResultOnError(SSqlObj* pSql) { void tscAsyncResultOnError(SSqlObj* pSql) {
SSchedMsg schedMsg = {0}; SSchedMsg schedMsg = {0};
schedMsg.fp = tscAsyncResultCallback; schedMsg.fp = tscAsyncResultCallback;
schedMsg.ahandle = pSql; schedMsg.ahandle = (void *)pSql->self;
schedMsg.thandle = (void *)1; schedMsg.thandle = (void *)1;
schedMsg.msg = 0; schedMsg.msg = 0;
taosScheduleTask(tscQhandle, &schedMsg); taosScheduleTask(tscQhandle, &schedMsg);
} }
int tscSendMsgToServer(SSqlObj *pSql); int tscSendMsgToServer(SSqlObj *pSql);
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {

View File

@ -4138,13 +4138,21 @@ static int32_t validateTagCondExpr(SSqlCmd* pCmd, tExprNode *p) {
} }
int32_t retVal = TSDB_CODE_SUCCESS; int32_t retVal = TSDB_CODE_SUCCESS;
int32_t bufLen = 0;
if (IS_NUMERIC_TYPE(vVariant->nType)) {
bufLen = 60; // The maximum length of string that a number is converted to.
} else {
bufLen = vVariant->nLen + 1;
}
if (schemaType == TSDB_DATA_TYPE_BINARY) { if (schemaType == TSDB_DATA_TYPE_BINARY) {
char *tmp = calloc(1, vVariant->nLen + TSDB_NCHAR_SIZE); char *tmp = calloc(1, bufLen * TSDB_NCHAR_SIZE);
retVal = tVariantDump(vVariant, tmp, schemaType, false); retVal = tVariantDump(vVariant, tmp, schemaType, false);
free(tmp); free(tmp);
} else if (schemaType == TSDB_DATA_TYPE_NCHAR) { } else if (schemaType == TSDB_DATA_TYPE_NCHAR) {
// pRight->value.nLen + 1 is larger than the actual nchar string length // pRight->value.nLen + 1 is larger than the actual nchar string length
char *tmp = calloc(1, (vVariant->nLen + 1) * TSDB_NCHAR_SIZE); char *tmp = calloc(1, bufLen * TSDB_NCHAR_SIZE);
retVal = tVariantDump(vVariant, tmp, schemaType, false); retVal = tVariantDump(vVariant, tmp, schemaType, false);
free(tmp); free(tmp);
} else { } else {
@ -4155,7 +4163,7 @@ static int32_t validateTagCondExpr(SSqlCmd* pCmd, tExprNode *p) {
if (retVal != TSDB_CODE_SUCCESS) { if (retVal != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
} }
}while (0); } while (0);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -25,6 +25,8 @@
#include "tdataformat.h" #include "tdataformat.h"
#include "tname.h" #include "tname.h"
#include "hash.h" #include "hash.h"
#include "tlockfree.h"
#include "tlist.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -172,10 +174,32 @@ typedef struct STsdbQueryCond {
int32_t type; // data block load type: int32_t type; // data block load type:
} STsdbQueryCond; } STsdbQueryCond;
typedef struct STableData STableData;
typedef struct {
T_REF_DECLARE()
SRWLatch latch;
TSKEY keyFirst;
TSKEY keyLast;
int64_t numOfRows;
int32_t maxTables;
STableData **tData;
SList * actList;
SList * extraBuffList;
SList * bufBlockList;
int64_t pointsAdd; // TODO
int64_t storageAdd; // TODO
} SMemTable;
typedef struct {
SMemTable* mem;
SMemTable* imem;
SMemTable mtable;
SMemTable* omem;
} SMemSnapshot;
typedef struct SMemRef { typedef struct SMemRef {
int32_t ref; int32_t ref;
void * mem; SMemSnapshot snapshot;
void * imem;
} SMemRef; } SMemRef;
typedef struct SDataBlockInfo { typedef struct SDataBlockInfo {

View File

@ -101,8 +101,8 @@ typedef enum CREATE_SUB_TALBE_MOD_EN {
} CREATE_SUB_TALBE_MOD_EN; } CREATE_SUB_TALBE_MOD_EN;
typedef enum TALBE_EXISTS_EN { typedef enum TALBE_EXISTS_EN {
TBL_ALREADY_EXISTS,
TBL_NO_EXISTS, TBL_NO_EXISTS,
TBL_ALREADY_EXISTS,
TBL_EXISTS_BUTT TBL_EXISTS_BUTT
} TALBE_EXISTS_EN; } TALBE_EXISTS_EN;
@ -567,12 +567,19 @@ static FILE * g_fpOfInsertResult = NULL;
#define debugPrint(fmt, ...) \ #define debugPrint(fmt, ...) \
do { if (g_args.debug_print || g_args.verbose_print) \ do { if (g_args.debug_print || g_args.verbose_print) \
fprintf(stderr, "DEBG: "fmt, __VA_ARGS__); } while(0) fprintf(stderr, "DEBG: "fmt, __VA_ARGS__); } while(0)
#define verbosePrint(fmt, ...) \ #define verbosePrint(fmt, ...) \
do { if (g_args.verbose_print) \ do { if (g_args.verbose_print) \
fprintf(stderr, "VERB: "fmt, __VA_ARGS__); } while(0) fprintf(stderr, "VERB: "fmt, __VA_ARGS__); } while(0)
#define errorPrint(fmt, ...) \
do { fprintf(stderr, "ERROR: "fmt, __VA_ARGS__); } while(0)
/////////////////////////////////////////////////// ///////////////////////////////////////////////////
static void ERROR_EXIT(const char *msg) { perror(msg); exit(-1); }
void printHelp() { void printHelp() {
char indent[10] = " "; char indent[10] = " ";
printf("%s%s%s%s\n", indent, "-f", indent, printf("%s%s%s%s\n", indent, "-f", indent,
@ -645,7 +652,7 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
} else if (strcmp(argv[i], "-c") == 0) { } else if (strcmp(argv[i], "-c") == 0) {
char *configPath = argv[++i]; char *configPath = argv[++i];
if (wordexp(configPath, &full_path, 0) != 0) { if (wordexp(configPath, &full_path, 0) != 0) {
fprintf(stderr, "Invalid path %s\n", configPath); errorPrint( "Invalid path %s\n", configPath);
return; return;
} }
taos_options(TSDB_OPTION_CONFIGDIR, full_path.we_wordv[0]); taos_options(TSDB_OPTION_CONFIGDIR, full_path.we_wordv[0]);
@ -694,8 +701,8 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
&& strcasecmp(argv[i], "DOUBLE") && strcasecmp(argv[i], "DOUBLE")
&& strcasecmp(argv[i], "BINARY") && strcasecmp(argv[i], "BINARY")
&& strcasecmp(argv[i], "NCHAR")) { && strcasecmp(argv[i], "NCHAR")) {
fprintf(stderr, "Invalid data_type!\n");
printHelp(); printHelp();
ERROR_EXIT( "Invalid data_type!\n");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
sptr[0] = argv[i]; sptr[0] = argv[i];
@ -715,8 +722,8 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
&& strcasecmp(token, "DOUBLE") && strcasecmp(token, "DOUBLE")
&& strcasecmp(token, "BINARY") && strcasecmp(token, "BINARY")
&& strcasecmp(token, "NCHAR")) { && strcasecmp(token, "NCHAR")) {
fprintf(stderr, "Invalid data_type!\n");
printHelp(); printHelp();
ERROR_EXIT("Invalid data_type!\n");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
sptr[index++] = token; sptr[index++] = token;
@ -771,8 +778,8 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
printHelp(); printHelp();
exit(0); exit(0);
} else { } else {
fprintf(stderr, "wrong options\n");
printHelp(); printHelp();
ERROR_EXIT("ERROR: wrong options\n");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
} }
@ -858,7 +865,7 @@ static int queryDbExec(TAOS *taos, char *command, int type) {
if (code != 0) { if (code != 0) {
debugPrint("%s() LN%d - command: %s\n", __func__, __LINE__, command); debugPrint("%s() LN%d - command: %s\n", __func__, __LINE__, command);
fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(res)); errorPrint( "Failed to run %s, reason: %s\n", command, taos_errstr(res));
taos_free_result(res); taos_free_result(res);
//taos_close(taos); //taos_close(taos);
return -1; return -1;
@ -884,13 +891,13 @@ static void getResult(TAOS_RES *res, char* resultFileName) {
if (resultFileName[0] != 0) { if (resultFileName[0] != 0) {
fp = fopen(resultFileName, "at"); fp = fopen(resultFileName, "at");
if (fp == NULL) { if (fp == NULL) {
fprintf(stderr, "failed to open result file: %s, result will not save to file\n", resultFileName); errorPrint("%s() LN%d, failed to open result file: %s, result will not save to file\n", __func__, __LINE__, resultFileName);
} }
} }
char* databuf = (char*) calloc(1, 100*1024*1024); char* databuf = (char*) calloc(1, 100*1024*1024);
if (databuf == NULL) { if (databuf == NULL) {
fprintf(stderr, "failed to malloc, warning: save result to file slowly!\n"); errorPrint("%s() LN%d, failed to malloc, warning: save result to file slowly!\n", __func__, __LINE__);
if (fp) if (fp)
fclose(fp); fclose(fp);
return ; return ;
@ -1484,7 +1491,7 @@ static int xDumpResultToFile(const char* fname, TAOS_RES* tres) {
FILE* fp = fopen(fname, "at"); FILE* fp = fopen(fname, "at");
if (fp == NULL) { if (fp == NULL) {
fprintf(stderr, "ERROR: failed to open file: %s\n", fname); errorPrint("%s() LN%d, failed to open file: %s\n", __func__, __LINE__, fname);
return -1; return -1;
} }
@ -1529,7 +1536,7 @@ static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) {
int32_t code = taos_errno(res); int32_t code = taos_errno(res);
if (code != 0) { if (code != 0) {
fprintf(stderr, "failed to run <show databases>, reason: %s\n", taos_errstr(res)); errorPrint( "failed to run <show databases>, reason: %s\n", taos_errstr(res));
return -1; return -1;
} }
@ -1541,7 +1548,7 @@ static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) {
dbInfos[count] = (SDbInfo *)calloc(1, sizeof(SDbInfo)); dbInfos[count] = (SDbInfo *)calloc(1, sizeof(SDbInfo));
if (dbInfos[count] == NULL) { if (dbInfos[count] == NULL) {
fprintf(stderr, "failed to allocate memory for some dbInfo[%d]\n", count); errorPrint( "failed to allocate memory for some dbInfo[%d]\n", count);
return -1; return -1;
} }
@ -1576,7 +1583,7 @@ static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) {
count++; count++;
if (count > MAX_DATABASE_COUNT) { if (count > MAX_DATABASE_COUNT) {
fprintf(stderr, "The database count overflow than %d\n", MAX_DATABASE_COUNT); errorPrint( "The database count overflow than %d\n", MAX_DATABASE_COUNT);
break; break;
} }
} }
@ -1590,7 +1597,7 @@ static void printfDbInfoForQueryToFile(char* filename, SDbInfo* dbInfos, int ind
FILE *fp = fopen(filename, "at"); FILE *fp = fopen(filename, "at");
if (fp == NULL) { if (fp == NULL) {
fprintf(stderr, "failed to open file: %s\n", filename); errorPrint( "failed to open file: %s\n", filename);
return; return;
} }
@ -1646,7 +1653,7 @@ static void printfQuerySystemInfo(TAOS * taos) {
res = taos_query(taos, "show databases;"); res = taos_query(taos, "show databases;");
SDbInfo** dbInfos = (SDbInfo **)calloc(MAX_DATABASE_COUNT, sizeof(SDbInfo *)); SDbInfo** dbInfos = (SDbInfo **)calloc(MAX_DATABASE_COUNT, sizeof(SDbInfo *));
if (dbInfos == NULL) { if (dbInfos == NULL) {
fprintf(stderr, "failed to allocate memory\n"); errorPrint("%s() LN%d, failed to allocate memory\n", __func__, __LINE__);
return; return;
} }
int dbCount = getDbFromServer(taos, dbInfos); int dbCount = getDbFromServer(taos, dbInfos);
@ -1676,8 +1683,6 @@ static void printfQuerySystemInfo(TAOS * taos) {
} }
static void ERROR_EXIT(const char *msg) { perror(msg); exit(-1); }
static int postProceSql(char* host, uint16_t port, char* sqlstr) static int postProceSql(char* host, uint16_t port, char* sqlstr)
{ {
char *req_fmt = "POST %s HTTP/1.1\r\nHost: %s:%d\r\nAccept: */*\r\nAuthorization: Basic %s\r\nContent-Length: %d\r\nContent-Type: application/x-www-form-urlencoded\r\n\r\n%s"; char *req_fmt = "POST %s HTTP/1.1\r\nHost: %s:%d\r\nAccept: */*\r\nAuthorization: Basic %s\r\nContent-Length: %d\r\nContent-Type: application/x-www-form-urlencoded\r\n\r\n%s";
@ -1725,9 +1730,9 @@ static int postProceSql(char* host, uint16_t port, char* sqlstr)
sockfd = socket(AF_INET, SOCK_STREAM, 0); sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) { if (sockfd < 0) {
#ifdef WINDOWS #ifdef WINDOWS
fprintf(stderr, "Could not create socket : %d" , WSAGetLastError()); errorPrint( "Could not create socket : %d" , WSAGetLastError());
#endif #endif
debugPrint("%s() LN%d sockfd=%d\n", __func__, __LINE__, sockfd); debugPrint("%s() LN%d, sockfd=%d\n", __func__, __LINE__, sockfd);
free(request_buf); free(request_buf);
ERROR_EXIT("ERROR opening socket"); ERROR_EXIT("ERROR opening socket");
} }
@ -1847,7 +1852,7 @@ static int postProceSql(char* host, uint16_t port, char* sqlstr)
static char* getTagValueFromTagSample(SSuperTable* stbInfo, int tagUsePos) { static char* getTagValueFromTagSample(SSuperTable* stbInfo, int tagUsePos) {
char* dataBuf = (char*)calloc(TSDB_MAX_SQL_LEN+1, 1); char* dataBuf = (char*)calloc(TSDB_MAX_SQL_LEN+1, 1);
if (NULL == dataBuf) { if (NULL == dataBuf) {
printf("calloc failed! size:%d\n", TSDB_MAX_SQL_LEN+1); errorPrint("%s() LN%d, calloc failed! size:%d\n", __func__, __LINE__, TSDB_MAX_SQL_LEN+1);
return NULL; return NULL;
} }
@ -2039,7 +2044,8 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos,
//printf("==== sub table name: %s\n", pTblName); //printf("==== sub table name: %s\n", pTblName);
count++; count++;
if (count >= childTblCount - 1) { if (count >= childTblCount - 1) {
char *tmp = realloc(childTblName, (size_t)childTblCount*1.5*TSDB_TABLE_NAME_LEN+1); char *tmp = realloc(childTblName,
(size_t)childTblCount*1.5*TSDB_TABLE_NAME_LEN+1);
if (tmp != NULL) { if (tmp != NULL) {
childTblName = tmp; childTblName = tmp;
childTblCount = (int)(childTblCount*1.5); childTblCount = (int)(childTblCount*1.5);
@ -2047,7 +2053,8 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos,
(size_t)((childTblCount-count)*TSDB_TABLE_NAME_LEN)); (size_t)((childTblCount-count)*TSDB_TABLE_NAME_LEN));
} else { } else {
// exit, if allocate more memory failed // exit, if allocate more memory failed
printf("realloc fail for save child table name of %s.%s\n", dbName, sTblName); errorPrint("%s() LN%d, realloc fail for save child table name of %s.%s\n",
__func__, __LINE__, dbName, sTblName);
tmfree(childTblName); tmfree(childTblName);
taos_free_result(res); taos_free_result(res);
taos_close(taos); taos_close(taos);
@ -2135,12 +2142,13 @@ static int getSuperTableFromServer(TAOS * taos, char* dbName,
calcRowLen(superTbls); calcRowLen(superTbls);
/*
if (TBL_ALREADY_EXISTS == superTbls->childTblExists) { if (TBL_ALREADY_EXISTS == superTbls->childTblExists) {
//get all child table name use cmd: select tbname from superTblName; //get all child table name use cmd: select tbname from superTblName;
int childTblCount = 10000; int childTblCount = 10000;
superTbls->childTblName = (char*)calloc(1, childTblCount * TSDB_TABLE_NAME_LEN); superTbls->childTblName = (char*)calloc(1, childTblCount * TSDB_TABLE_NAME_LEN);
if (superTbls->childTblName == NULL) { if (superTbls->childTblName == NULL) {
fprintf(stderr, "alloc memory failed!"); errorPrint("%s() LN%d, alloc memory failed!\n", __func__, __LINE__);
return -1; return -1;
} }
getAllChildNameOfSuperTable(taos, dbName, getAllChildNameOfSuperTable(taos, dbName,
@ -2148,6 +2156,7 @@ static int getSuperTableFromServer(TAOS * taos, char* dbName,
&superTbls->childTblName, &superTbls->childTblName,
&superTbls->childTblCount); &superTbls->childTblCount);
} }
*/
return 0; return 0;
} }
@ -2279,7 +2288,7 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls,
verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, command); verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, command);
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) {
fprintf(stderr, "create supertable %s failed!\n\n", errorPrint( "create supertable %s failed!\n\n",
superTbls->sTblName); superTbls->sTblName);
return -1; return -1;
} }
@ -2293,7 +2302,7 @@ static int createDatabases() {
int ret = 0; int ret = 0;
taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, NULL, g_Dbs.port); taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, NULL, g_Dbs.port);
if (taos == NULL) { if (taos == NULL) {
fprintf(stderr, "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL)); errorPrint( "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL));
return -1; return -1;
} }
char command[BUFFER_SIZE] = "\0"; char command[BUFFER_SIZE] = "\0";
@ -2378,7 +2387,7 @@ static int createDatabases() {
debugPrint("%s() %d command: %s\n", __func__, __LINE__, command); debugPrint("%s() %d command: %s\n", __func__, __LINE__, command);
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) {
taos_close(taos); taos_close(taos);
fprintf(stderr, "\ncreate database %s failed!\n\n", g_Dbs.db[i].dbName); errorPrint( "\ncreate database %s failed!\n\n", g_Dbs.db[i].dbName);
return -1; return -1;
} }
printf("\ncreate database %s success!\n\n", g_Dbs.db[i].dbName); printf("\ncreate database %s success!\n\n", g_Dbs.db[i].dbName);
@ -2396,9 +2405,12 @@ static int createDatabases() {
&g_Dbs.db[i].superTbls[j], g_Dbs.use_metric); &g_Dbs.db[i].superTbls[j], g_Dbs.use_metric);
} else { } else {
g_Dbs.db[i].superTbls[j].superTblExists = TBL_ALREADY_EXISTS; g_Dbs.db[i].superTbls[j].superTblExists = TBL_ALREADY_EXISTS;
if (g_Dbs.db[i].superTbls[j].childTblExists != TBL_ALREADY_EXISTS) {
ret = getSuperTableFromServer(taos, g_Dbs.db[i].dbName, ret = getSuperTableFromServer(taos, g_Dbs.db[i].dbName,
&g_Dbs.db[i].superTbls[j]); &g_Dbs.db[i].superTbls[j]);
} }
}
if (0 != ret) { if (0 != ret) {
printf("\ncreate super table %d failed!\n\n", j); printf("\ncreate super table %d failed!\n\n", j);
@ -2427,7 +2439,7 @@ static void* createTable(void *sarg)
char *buffer = calloc(buff_len, 1); char *buffer = calloc(buff_len, 1);
if (buffer == NULL) { if (buffer == NULL) {
fprintf(stderr, "Memory allocated failed!"); errorPrint("%s() LN%d, Memory allocated failed!\n", __func__, __LINE__);
exit(-1); exit(-1);
} }
@ -2485,7 +2497,7 @@ static void* createTable(void *sarg)
len = 0; len = 0;
verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE)){ if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE)){
fprintf(stderr, "queryDbExec() failed. buffer:\n%s\n", buffer); errorPrint( "queryDbExec() failed. buffer:\n%s\n", buffer);
free(buffer); free(buffer);
return NULL; return NULL;
} }
@ -2501,7 +2513,7 @@ static void* createTable(void *sarg)
if (0 != len) { if (0 != len) {
verbosePrint("%s() %d buffer: %s\n", __func__, __LINE__, buffer); verbosePrint("%s() %d buffer: %s\n", __func__, __LINE__, buffer);
if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE)) { if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE)) {
fprintf(stderr, "queryDbExec() failed. buffer:\n%s\n", buffer); errorPrint( "queryDbExec() failed. buffer:\n%s\n", buffer);
} }
} }
@ -2546,7 +2558,7 @@ static int startMultiThreadCreateChildTable(
db_name, db_name,
g_Dbs.port); g_Dbs.port);
if (t_info->taos == NULL) { if (t_info->taos == NULL) {
fprintf(stderr, "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL)); errorPrint( "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL));
free(pids); free(pids);
free(infos); free(infos);
return -1; return -1;
@ -2724,7 +2736,7 @@ static int readSampleFromCsvFileToMem(
FILE* fp = fopen(superTblInfo->sampleFile, "r"); FILE* fp = fopen(superTblInfo->sampleFile, "r");
if (fp == NULL) { if (fp == NULL) {
fprintf(stderr, "Failed to open sample file: %s, reason:%s\n", errorPrint( "Failed to open sample file: %s, reason:%s\n",
superTblInfo->sampleFile, strerror(errno)); superTblInfo->sampleFile, strerror(errno));
return -1; return -1;
} }
@ -2736,7 +2748,7 @@ static int readSampleFromCsvFileToMem(
readLen = tgetline(&line, &n, fp); readLen = tgetline(&line, &n, fp);
if (-1 == readLen) { if (-1 == readLen) {
if(0 != fseek(fp, 0, SEEK_SET)) { if(0 != fseek(fp, 0, SEEK_SET)) {
fprintf(stderr, "Failed to fseek file: %s, reason:%s\n", errorPrint( "Failed to fseek file: %s, reason:%s\n",
superTblInfo->sampleFile, strerror(errno)); superTblInfo->sampleFile, strerror(errno));
fclose(fp); fclose(fp);
return -1; return -1;
@ -2789,7 +2801,8 @@ void readSampleFromFileToMem(SSuperTable * supterTblInfo) {
} }
} }
*/ */
static bool getColumnAndTagTypeFromInsertJsonFile(cJSON* stbInfo, SSuperTable* superTbls) { static bool getColumnAndTagTypeFromInsertJsonFile(
cJSON* stbInfo, SSuperTable* superTbls) {
bool ret = false; bool ret = false;
// columns // columns
@ -2805,8 +2818,8 @@ static bool getColumnAndTagTypeFromInsertJsonFile(cJSON* stbInfo, SSuperTable* s
int columnSize = cJSON_GetArraySize(columns); int columnSize = cJSON_GetArraySize(columns);
if (columnSize > MAX_COLUMN_COUNT) { if (columnSize > MAX_COLUMN_COUNT) {
printf("ERROR: failed to read json, column size overflow, max column size is %d\n", errorPrint("%s() LN%d, failed to read json, column size overflow, max column size is %d\n",
MAX_COLUMN_COUNT); __func__, __LINE__, MAX_COLUMN_COUNT);
goto PARSE_OVER; goto PARSE_OVER;
} }
@ -2824,7 +2837,7 @@ static bool getColumnAndTagTypeFromInsertJsonFile(cJSON* stbInfo, SSuperTable* s
if (countObj && countObj->type == cJSON_Number) { if (countObj && countObj->type == cJSON_Number) {
count = countObj->valueint; count = countObj->valueint;
} else if (countObj && countObj->type != cJSON_Number) { } else if (countObj && countObj->type != cJSON_Number) {
printf("ERROR: failed to read json, column count not found\n"); errorPrint("%s() LN%d, failed to read json, column count not found\n", __func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} else { } else {
count = 1; count = 1;
@ -2834,7 +2847,7 @@ static bool getColumnAndTagTypeFromInsertJsonFile(cJSON* stbInfo, SSuperTable* s
memset(&columnCase, 0, sizeof(StrColumn)); memset(&columnCase, 0, sizeof(StrColumn));
cJSON *dataType = cJSON_GetObjectItem(column, "type"); cJSON *dataType = cJSON_GetObjectItem(column, "type");
if (!dataType || dataType->type != cJSON_String || dataType->valuestring == NULL) { if (!dataType || dataType->type != cJSON_String || dataType->valuestring == NULL) {
printf("ERROR: failed to read json, column type not found\n"); errorPrint("%s() LN%d: failed to read json, column type not found\n", __func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
//tstrncpy(superTbls->columns[k].dataType, dataType->valuestring, MAX_TB_NAME_SIZE); //tstrncpy(superTbls->columns[k].dataType, dataType->valuestring, MAX_TB_NAME_SIZE);
@ -2844,14 +2857,15 @@ static bool getColumnAndTagTypeFromInsertJsonFile(cJSON* stbInfo, SSuperTable* s
if (dataLen && dataLen->type == cJSON_Number) { if (dataLen && dataLen->type == cJSON_Number) {
columnCase.dataLen = dataLen->valueint; columnCase.dataLen = dataLen->valueint;
} else if (dataLen && dataLen->type != cJSON_Number) { } else if (dataLen && dataLen->type != cJSON_Number) {
printf("ERROR: failed to read json, column len not found\n"); debugPrint("%s() LN%d: failed to read json, column len not found\n", __func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} else { } else {
columnCase.dataLen = 8; columnCase.dataLen = 8;
} }
for (int n = 0; n < count; ++n) { for (int n = 0; n < count; ++n) {
tstrncpy(superTbls->columns[index].dataType, columnCase.dataType, MAX_TB_NAME_SIZE); tstrncpy(superTbls->columns[index].dataType,
columnCase.dataType, MAX_TB_NAME_SIZE);
superTbls->columns[index].dataLen = columnCase.dataLen; superTbls->columns[index].dataLen = columnCase.dataLen;
index++; index++;
} }
@ -2863,13 +2877,13 @@ static bool getColumnAndTagTypeFromInsertJsonFile(cJSON* stbInfo, SSuperTable* s
// tags // tags
cJSON *tags = cJSON_GetObjectItem(stbInfo, "tags"); cJSON *tags = cJSON_GetObjectItem(stbInfo, "tags");
if (!tags || tags->type != cJSON_Array) { if (!tags || tags->type != cJSON_Array) {
printf("ERROR: failed to read json, tags not found\n"); debugPrint("%s() LN%d, failed to read json, tags not found\n", __func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
int tagSize = cJSON_GetArraySize(tags); int tagSize = cJSON_GetArraySize(tags);
if (tagSize > MAX_TAG_COUNT) { if (tagSize > MAX_TAG_COUNT) {
printf("ERROR: failed to read json, tags size overflow, max tag size is %d\n", MAX_TAG_COUNT); debugPrint("%s() LN%d, failed to read json, tags size overflow, max tag size is %d\n", __func__, __LINE__, MAX_TAG_COUNT);
goto PARSE_OVER; goto PARSE_OVER;
} }
@ -2997,7 +3011,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!gInsertInterval) { } else if (!gInsertInterval) {
g_args.insert_interval = 0; g_args.insert_interval = 0;
} else { } else {
fprintf(stderr, "ERROR: failed to read json, insert_interval input mistake\n"); errorPrint("%s() LN%d, failed to read json, insert_interval input mistake\n", __func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
@ -3007,7 +3021,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!rowsPerTbl) { } else if (!rowsPerTbl) {
g_args.rows_per_tbl = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req g_args.rows_per_tbl = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req
} else { } else {
fprintf(stderr, "ERROR: failed to read json, rows_per_tbl input mistake\n"); errorPrint("%s() LN%d, failed to read json, rows_per_tbl input mistake\n", __func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
@ -3017,7 +3031,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!maxSqlLen) { } else if (!maxSqlLen) {
g_args.max_sql_len = TSDB_PAYLOAD_SIZE; g_args.max_sql_len = TSDB_PAYLOAD_SIZE;
} else { } else {
fprintf(stderr, "ERROR: failed to read json, max_sql_len input mistake\n"); errorPrint("%s() LN%d, failed to read json, max_sql_len input mistake\n", __func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
@ -3028,7 +3042,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!numRecPerReq) { } else if (!numRecPerReq) {
g_args.num_of_RPR = 100; g_args.num_of_RPR = 100;
} else { } else {
printf("ERROR: failed to read json, num_of_records_per_req not found\n"); errorPrint("%s() LN%d, failed to read json, num_of_records_per_req not found\n", __func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
@ -3058,7 +3072,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
int dbSize = cJSON_GetArraySize(dbs); int dbSize = cJSON_GetArraySize(dbs);
if (dbSize > MAX_DB_COUNT) { if (dbSize > MAX_DB_COUNT) {
fprintf(stderr, errorPrint(
"ERROR: failed to read json, databases size overflow, max database is %d\n", "ERROR: failed to read json, databases size overflow, max database is %d\n",
MAX_DB_COUNT); MAX_DB_COUNT);
goto PARSE_OVER; goto PARSE_OVER;
@ -3098,8 +3112,10 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} }
cJSON *precision = cJSON_GetObjectItem(dbinfo, "precision"); cJSON *precision = cJSON_GetObjectItem(dbinfo, "precision");
if (precision && precision->type == cJSON_String && precision->valuestring != NULL) { if (precision && precision->type == cJSON_String
tstrncpy(g_Dbs.db[i].dbCfg.precision, precision->valuestring, MAX_DB_NAME_SIZE); && precision->valuestring != NULL) {
tstrncpy(g_Dbs.db[i].dbCfg.precision, precision->valuestring,
MAX_DB_NAME_SIZE);
} else if (!precision) { } else if (!precision) {
//tstrncpy(g_Dbs.db[i].dbCfg.precision, "ms", MAX_DB_NAME_SIZE); //tstrncpy(g_Dbs.db[i].dbCfg.precision, "ms", MAX_DB_NAME_SIZE);
memset(g_Dbs.db[i].dbCfg.precision, 0, MAX_DB_NAME_SIZE); memset(g_Dbs.db[i].dbCfg.precision, 0, MAX_DB_NAME_SIZE);
@ -3257,7 +3273,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
int stbSize = cJSON_GetArraySize(stables); int stbSize = cJSON_GetArraySize(stables);
if (stbSize > MAX_SUPER_TABLE_COUNT) { if (stbSize > MAX_SUPER_TABLE_COUNT) {
fprintf(stderr, errorPrint(
"ERROR: failed to read json, databases size overflow, max database is %d\n", "ERROR: failed to read json, databases size overflow, max database is %d\n",
MAX_SUPER_TABLE_COUNT); MAX_SUPER_TABLE_COUNT);
goto PARSE_OVER; goto PARSE_OVER;
@ -3325,13 +3341,13 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!childTblExists) { } else if (!childTblExists) {
g_Dbs.db[i].superTbls[j].childTblExists = TBL_NO_EXISTS; g_Dbs.db[i].superTbls[j].childTblExists = TBL_NO_EXISTS;
} else { } else {
printf("ERROR: failed to read json, child_table_exists not found\n"); errorPrint("%s() LN%d, failed to read json, child_table_exists not found\n", __func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
cJSON* count = cJSON_GetObjectItem(stbInfo, "childtable_count"); cJSON* count = cJSON_GetObjectItem(stbInfo, "childtable_count");
if (!count || count->type != cJSON_Number || 0 >= count->valueint) { if (!count || count->type != cJSON_Number || 0 >= count->valueint) {
printf("ERROR: failed to read json, childtable_count not found\n"); errorPrint("%s() LN%d, failed to read json, childtable_count not found\n", __func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
g_Dbs.db[i].superTbls[j].childTblCount = count->valueint; g_Dbs.db[i].superTbls[j].childTblCount = count->valueint;
@ -3344,7 +3360,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!dataSource) { } else if (!dataSource) {
tstrncpy(g_Dbs.db[i].superTbls[j].dataSource, "rand", MAX_DB_NAME_SIZE); tstrncpy(g_Dbs.db[i].superTbls[j].dataSource, "rand", MAX_DB_NAME_SIZE);
} else { } else {
printf("ERROR: failed to read json, data_source not found\n"); errorPrint("%s() LN%d, failed to read json, data_source not found\n", __func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
@ -3384,9 +3400,11 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
cJSON *ts = cJSON_GetObjectItem(stbInfo, "start_timestamp"); cJSON *ts = cJSON_GetObjectItem(stbInfo, "start_timestamp");
if (ts && ts->type == cJSON_String && ts->valuestring != NULL) { if (ts && ts->type == cJSON_String && ts->valuestring != NULL) {
tstrncpy(g_Dbs.db[i].superTbls[j].startTimestamp, ts->valuestring, MAX_DB_NAME_SIZE); tstrncpy(g_Dbs.db[i].superTbls[j].startTimestamp,
ts->valuestring, MAX_DB_NAME_SIZE);
} else if (!ts) { } else if (!ts) {
tstrncpy(g_Dbs.db[i].superTbls[j].startTimestamp, "now", MAX_DB_NAME_SIZE); tstrncpy(g_Dbs.db[i].superTbls[j].startTimestamp,
"now", MAX_DB_NAME_SIZE);
} else { } else {
printf("ERROR: failed to read json, start_timestamp not found\n"); printf("ERROR: failed to read json, start_timestamp not found\n");
goto PARSE_OVER; goto PARSE_OVER;
@ -3493,7 +3511,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!rowsPerTbl) { } else if (!rowsPerTbl) {
g_Dbs.db[i].superTbls[j].rowsPerTbl = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req g_Dbs.db[i].superTbls[j].rowsPerTbl = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req
} else { } else {
fprintf(stderr, "ERROR: failed to read json, rowsPerTbl input mistake\n"); errorPrint("%s() LN%d, failed to read json, rowsPerTbl input mistake\n", __func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
@ -3523,7 +3541,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!insertRows) { } else if (!insertRows) {
g_Dbs.db[i].superTbls[j].insertRows = 0x7FFFFFFFFFFFFFFF; g_Dbs.db[i].superTbls[j].insertRows = 0x7FFFFFFFFFFFFFFF;
} else { } else {
fprintf(stderr, "failed to read json, insert_rows input mistake"); errorPrint("%s() LN%d, failed to read json, insert_rows input mistake\n", __func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
@ -3535,16 +3553,18 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
__func__, __LINE__, g_args.insert_interval); __func__, __LINE__, g_args.insert_interval);
g_Dbs.db[i].superTbls[j].insertInterval = g_args.insert_interval; g_Dbs.db[i].superTbls[j].insertInterval = g_args.insert_interval;
} else { } else {
fprintf(stderr, "failed to read json, insert_interval input mistake"); errorPrint("%s() LN%d, failed to read json, insert_interval input mistake\n", __func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
if (NO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable /* CBD if (NO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable
|| (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) { || (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) {
continue; continue;
} }
*/
int retVal = getColumnAndTagTypeFromInsertJsonFile(stbInfo, &g_Dbs.db[i].superTbls[j]); int retVal = getColumnAndTagTypeFromInsertJsonFile(
stbInfo, &g_Dbs.db[i].superTbls[j]);
if (false == retVal) { if (false == retVal) {
goto PARSE_OVER; goto PARSE_OVER;
} }
@ -3942,7 +3962,7 @@ static bool getInfoFromJsonFile(char* file) {
} else if (SUBSCRIBE_TEST == g_args.test_mode) { } else if (SUBSCRIBE_TEST == g_args.test_mode) {
ret = getMetaFromQueryJsonFile(root); ret = getMetaFromQueryJsonFile(root);
} else { } else {
printf("ERROR: input json file type error! please input correct file type: insert or query or subscribe\n"); errorPrint("%s() LN%d, input json file type error! please input correct file type: insert or query or subscribe\n", __func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
@ -4024,14 +4044,14 @@ static int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuper
if ((0 == strncasecmp(stbInfo->columns[i].dataType, "binary", 6)) if ((0 == strncasecmp(stbInfo->columns[i].dataType, "binary", 6))
|| (0 == strncasecmp(stbInfo->columns[i].dataType, "nchar", 5))) { || (0 == strncasecmp(stbInfo->columns[i].dataType, "nchar", 5))) {
if (stbInfo->columns[i].dataLen > TSDB_MAX_BINARY_LEN) { if (stbInfo->columns[i].dataLen > TSDB_MAX_BINARY_LEN) {
printf("binary or nchar length overflow, max size:%u\n", errorPrint( "binary or nchar length overflow, max size:%u\n",
(uint32_t)TSDB_MAX_BINARY_LEN); (uint32_t)TSDB_MAX_BINARY_LEN);
return (-1); return (-1);
} }
char* buf = (char*)calloc(stbInfo->columns[i].dataLen+1, 1); char* buf = (char*)calloc(stbInfo->columns[i].dataLen+1, 1);
if (NULL == buf) { if (NULL == buf) {
printf("calloc failed! size:%d\n", stbInfo->columns[i].dataLen); errorPrint( "calloc failed! size:%d\n", stbInfo->columns[i].dataLen);
return (-1); return (-1);
} }
rand_string(buf, stbInfo->columns[i].dataLen); rand_string(buf, stbInfo->columns[i].dataLen);
@ -4063,7 +4083,7 @@ static int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuper
} else if (0 == strncasecmp(stbInfo->columns[i].dataType, "timestamp", 9)) { } else if (0 == strncasecmp(stbInfo->columns[i].dataType, "timestamp", 9)) {
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%"PRId64", ", rand_bigint()); dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%"PRId64", ", rand_bigint());
} else { } else {
printf("No support data type: %s\n", stbInfo->columns[i].dataType); errorPrint( "No support data type: %s\n", stbInfo->columns[i].dataType);
return (-1); return (-1);
} }
} }
@ -4138,7 +4158,8 @@ static int prepareSampleDataForSTable(SSuperTable *superTblInfo) {
sampleDataBuf = calloc( sampleDataBuf = calloc(
superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, 1); superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, 1);
if (sampleDataBuf == NULL) { if (sampleDataBuf == NULL) {
fprintf(stderr, "Failed to calloc %d Bytes, reason:%s\n", errorPrint("%s() LN%d, Failed to calloc %d Bytes, reason:%s\n",
__func__, __LINE__,
superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE,
strerror(errno)); strerror(errno));
return -1; return -1;
@ -4148,7 +4169,7 @@ static int prepareSampleDataForSTable(SSuperTable *superTblInfo) {
int ret = readSampleFromCsvFileToMem(superTblInfo); int ret = readSampleFromCsvFileToMem(superTblInfo);
if (0 != ret) { if (0 != ret) {
fprintf(stderr, "read sample from csv file failed.\n"); errorPrint("%s() LN%d, read sample from csv file failed.\n", __func__, __LINE__);
tmfree(sampleDataBuf); tmfree(sampleDataBuf);
superTblInfo->sampleDataBuf = NULL; superTblInfo->sampleDataBuf = NULL;
return -1; return -1;
@ -4157,29 +4178,26 @@ static int prepareSampleDataForSTable(SSuperTable *superTblInfo) {
return 0; return 0;
} }
static int execInsert(threadInfo *winfo, char *buffer, int k) static int execInsert(threadInfo *pThreadInfo, char *buffer, int k)
{ {
int affectedRows; int affectedRows;
SSuperTable* superTblInfo = winfo->superTblInfo; SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID,
__func__, __LINE__, buffer);
if (superTblInfo) { if (superTblInfo) {
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) {
verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); affectedRows = queryDbExec(pThreadInfo->taos, buffer, INSERT_TYPE);
affectedRows = queryDbExec(winfo->taos, buffer, INSERT_TYPE);
} else { } else {
verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); if (0 != postProceSql(g_Dbs.host, g_Dbs.port, buffer)) {
int retCode = postProceSql(g_Dbs.host, g_Dbs.port, buffer);
if (0 != retCode) {
affectedRows = -1; affectedRows = -1;
printf("========restful return fail, threadID[%d]\n", winfo->threadID); printf("========restful return fail, threadID[%d]\n", pThreadInfo->threadID);
} else { } else {
affectedRows = k; affectedRows = k;
} }
} }
} else { } else {
verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); affectedRows = queryDbExec(pThreadInfo->taos, buffer, 1);
affectedRows = queryDbExec(winfo->taos, buffer, 1);
} }
return affectedRows; return affectedRows;
@ -4195,8 +4213,9 @@ static void getTableName(char *pTblName, threadInfo* pThreadInfo, int tableSeq)
superTblInfo->childTblName + (tableSeq - superTblInfo->childTblOffset) * TSDB_TABLE_NAME_LEN); superTblInfo->childTblName + (tableSeq - superTblInfo->childTblOffset) * TSDB_TABLE_NAME_LEN);
} else { } else {
verbosePrint("%s() LN%d: from=%d count=%d seq=%d\n", verbosePrint("[%d] %s() LN%d: from=%d count=%d seq=%d\n",
__func__, __LINE__, pThreadInfo->start_table_from, pThreadInfo->threadID, __func__, __LINE__,
pThreadInfo->start_table_from,
pThreadInfo->ntables, tableSeq); pThreadInfo->ntables, tableSeq);
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s", snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s",
superTblInfo->childTblName + tableSeq * TSDB_TABLE_NAME_LEN); superTblInfo->childTblName + tableSeq * TSDB_TABLE_NAME_LEN);
@ -4323,7 +4342,7 @@ static int generateSQLHead(char *tableName, int32_t tableSeq, threadInfo* pThrea
tableSeq % superTblInfo->tagSampleCount); tableSeq % superTblInfo->tagSampleCount);
} }
if (NULL == tagsValBuf) { if (NULL == tagsValBuf) {
fprintf(stderr, "tag buf failed to allocate memory\n"); errorPrint("%s() LN%d, tag buf failed to allocate memory\n", __func__, __LINE__);
return -1; return -1;
} }
@ -4341,7 +4360,7 @@ static int generateSQLHead(char *tableName, int32_t tableSeq, threadInfo* pThrea
superTblInfo->maxSqlLen, superTblInfo->maxSqlLen,
"insert into %s.%s values", "insert into %s.%s values",
pThreadInfo->db_name, pThreadInfo->db_name,
superTblInfo->childTblName + tableSeq * TSDB_TABLE_NAME_LEN); tableName);
} else { } else {
len = snprintf(buffer, len = snprintf(buffer,
(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len), (superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len),
@ -4396,13 +4415,14 @@ static int generateDataBuffer(char *pTblName,
} }
static void* syncWriteInterlace(threadInfo *pThreadInfo) { static void* syncWriteInterlace(threadInfo *pThreadInfo) {
printf("### CBD: interlace write\n"); debugPrint("[%d] %s() LN%d: ### interlace write\n",
pThreadInfo->threadID, __func__, __LINE__);
SSuperTable* superTblInfo = pThreadInfo->superTblInfo; SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
char* buffer = calloc(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len, 1); char* buffer = calloc(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len, 1);
if (NULL == buffer) { if (NULL == buffer) {
fprintf(stderr, "Failed to alloc %d Bytes, reason:%s\n", errorPrint( "Failed to alloc %d Bytes, reason:%s\n",
superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len,
strerror(errno)); strerror(errno));
return NULL; return NULL;
@ -4437,8 +4457,8 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
int tableSeq = pThreadInfo->start_table_from; int tableSeq = pThreadInfo->start_table_from;
debugPrint("%s() LN%d: start_table_from=%d ntables=%d insertRows=%"PRId64"\n", debugPrint("[%d] %s() LN%d: start_table_from=%d ntables=%d insertRows=%"PRId64"\n",
__func__, __LINE__, pThreadInfo->start_table_from, pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->start_table_from,
pThreadInfo->ntables, insertRows); pThreadInfo->ntables, insertRows);
int64_t startTime = pThreadInfo->start_time; int64_t startTime = pThreadInfo->start_time;
@ -4446,31 +4466,40 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
int batchPerTblTimes; int batchPerTblTimes;
int batchPerTbl; int batchPerTbl;
if ((rowsPerTbl > 0) && (pThreadInfo->ntables > 1)) { assert(pThreadInfo->ntables > 0);
batchPerTblTimes = g_args.num_of_RPR / rowsPerTbl;
if (rowsPerTbl > g_args.num_of_RPR)
rowsPerTbl = g_args.num_of_RPR;
batchPerTbl = rowsPerTbl; batchPerTbl = rowsPerTbl;
if ((rowsPerTbl > 0) && (pThreadInfo->ntables > 1)) {
batchPerTblTimes =
(g_args.num_of_RPR / (rowsPerTbl * pThreadInfo->ntables)) + 1;
} else { } else {
batchPerTblTimes = 1; batchPerTblTimes = 1;
batchPerTbl = g_args.num_of_RPR;
} }
int generatedRecPerTbl = 0; int generatedRecPerTbl = 0;
bool flagSleep = true;
int sleepTimeTotal = 0;
while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) { while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) {
if (insert_interval) { if ((flagSleep) && (insert_interval)) {
st = taosGetTimestampUs(); st = taosGetTimestampUs();
flagSleep = false;
} }
// generate data // generate data
memset(buffer, 0, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len); memset(buffer, 0, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len);
char *pstr = buffer; char *pstr = buffer;
int recGenerated = 0; int recOfBatch = 0;
for (int i = 0; i < batchPerTblTimes; i ++) { for (int i = 0; i < batchPerTblTimes; i ++) {
getTableName(tableName, pThreadInfo, tableSeq); getTableName(tableName, pThreadInfo, tableSeq);
int headLen; int headLen;
if (i == 0) { if (i == 0) {
headLen = generateSQLHead(tableName, tableSeq, pThreadInfo, superTblInfo, pstr); headLen = generateSQLHead(tableName, tableSeq, pThreadInfo,
superTblInfo, pstr);
} else { } else {
headLen = snprintf(pstr, TSDB_TABLE_NAME_LEN, "%s.%s values", headLen = snprintf(pstr, TSDB_TABLE_NAME_LEN, "%s.%s values",
pThreadInfo->db_name, pThreadInfo->db_name,
@ -4478,48 +4507,70 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
} }
// generate data buffer // generate data buffer
verbosePrint("%s() LN%d i=%d buffer:\n%s\n", verbosePrint("[%d] %s() LN%d i=%d buffer:\n%s\n",
__func__, __LINE__, i, buffer); pThreadInfo->threadID, __func__, __LINE__, i, buffer);
pstr += headLen; pstr += headLen;
int dataLen = 0; int dataLen = 0;
printf("%s() LN%d i=%d batchPerTblTimes=%d batchPerTbl = %d\n", verbosePrint("[%d] %s() LN%d i=%d batchPerTblTimes=%d batchPerTbl = %d\n",
__func__, __LINE__, i, batchPerTblTimes, batchPerTbl); pThreadInfo->threadID, __func__, __LINE__,
int numOfRecGenerated = generateDataTail( i, batchPerTblTimes, batchPerTbl);
generateDataTail(
tableName, tableSeq, pThreadInfo, superTblInfo, tableName, tableSeq, pThreadInfo, superTblInfo,
batchPerTbl, pstr, insertRows, 0, batchPerTbl, pstr, insertRows, 0,
startTime + pThreadInfo->totalInsertRows * superTblInfo->timeStampStep, startTime + sleepTimeTotal +
pThreadInfo->totalInsertRows * superTblInfo->timeStampStep,
&(pThreadInfo->samplePos), &dataLen); &(pThreadInfo->samplePos), &dataLen);
verbosePrint("%s() LN%d numOfRecGenerated= %d\n",
__func__, __LINE__, numOfRecGenerated);
pstr += dataLen; pstr += dataLen;
recGenerated += numOfRecGenerated; recOfBatch += batchPerTbl;
pThreadInfo->totalInsertRows += batchPerTbl;
verbosePrint("[%d] %s() LN%d batchPerTbl=%d recOfBatch=%d\n",
pThreadInfo->threadID, __func__, __LINE__,
batchPerTbl, recOfBatch);
tableSeq ++; tableSeq ++;
if (insertMode == INTERLACE_INSERT_MODE) { if (insertMode == INTERLACE_INSERT_MODE) {
if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) { if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) {
// turn to first table // turn to first table
tableSeq = pThreadInfo->start_table_from; tableSeq = pThreadInfo->start_table_from;
generatedRecPerTbl += numOfRecGenerated; generatedRecPerTbl += batchPerTbl;
flagSleep = true;
if (generatedRecPerTbl >= insertRows)
break;
if (pThreadInfo->ntables * batchPerTbl < g_args.num_of_RPR)
break;
} }
} }
int remainRows = insertRows - generatedRecPerTbl; int remainRows = insertRows - generatedRecPerTbl;
if (batchPerTbl > remainRows) if ((remainRows > 0) && (batchPerTbl > remainRows))
batchPerTbl = remainRows; batchPerTbl = remainRows;
if ((g_args.num_of_RPR - recGenerated) < batchPerTbl) verbosePrint("[%d] %s() LN%d generatedRecPerTbl=%d insertRows=%"PRId64"\n",
pThreadInfo->threadID, __func__, __LINE__,
generatedRecPerTbl, insertRows);
if ((g_args.num_of_RPR - recOfBatch) < batchPerTbl)
break; break;
} }
pThreadInfo->totalInsertRows += recGenerated;
printf("%s() LN%d recGenerated=%d totalInsertRows=%"PRId64" buffer:\n%s\n",
__func__, __LINE__, recGenerated,
pThreadInfo->totalInsertRows, buffer);
int affectedRows = execInsert(pThreadInfo, buffer, recGenerated); verbosePrint("[%d] %s() LN%d recOfBatch=%d totalInsertRows=%"PRId64"\n",
if (affectedRows < 0) pThreadInfo->threadID, __func__, __LINE__, recOfBatch,
pThreadInfo->totalInsertRows);
verbosePrint("[%d] %s() LN%d, buffer=%s\n",
pThreadInfo->threadID, __func__, __LINE__, buffer);
int affectedRows = execInsert(pThreadInfo, buffer, recOfBatch);
verbosePrint("[%d] %s() LN%d affectedRows=%d\n", pThreadInfo->threadID,
__func__, __LINE__, affectedRows);
if ((affectedRows < 0) || (recOfBatch != affectedRows)) {
errorPrint("[%d] %s() LN%d execInsert insert %d, affected rows: %d\n%s\n",
pThreadInfo->threadID, __func__, __LINE__,
recOfBatch, affectedRows, buffer);
goto free_and_statistics_interlace; goto free_and_statistics_interlace;
}
pThreadInfo->totalAffectedRows += affectedRows; pThreadInfo->totalAffectedRows += affectedRows;
@ -4539,14 +4590,15 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
lastPrintTime = currentPrintTime; lastPrintTime = currentPrintTime;
} }
if (insert_interval) { if ((insert_interval) && flagSleep) {
et = taosGetTimestampUs(); et = taosGetTimestampUs();
if (insert_interval > ((et - st)/1000) ) { if (insert_interval > ((et - st)/1000) ) {
int sleep_time = insert_interval - (et -st)/1000; int sleepTime = insert_interval - (et -st)/1000;
verbosePrint("%s() LN%d sleep: %d ms for insert interval\n", // verbosePrint("%s() LN%d sleep: %d ms for insert interval\n",
__func__, __LINE__, sleep_time); // __func__, __LINE__, sleepTime);
taosMsleep(sleep_time); // ms taosMsleep(sleepTime); // ms
sleepTimeTotal += insert_interval;
} }
} }
} }
@ -4570,12 +4622,13 @@ free_and_statistics_interlace:
2 taosinsertdata , 1 thread: 10 tables * 20000 rows/s 2 taosinsertdata , 1 thread: 10 tables * 20000 rows/s
*/ */
static void* syncWriteProgressive(threadInfo *pThreadInfo) { static void* syncWriteProgressive(threadInfo *pThreadInfo) {
debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__);
SSuperTable* superTblInfo = pThreadInfo->superTblInfo; SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
char* buffer = calloc(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len, 1); char* buffer = calloc(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len, 1);
if (NULL == buffer) { if (NULL == buffer) {
fprintf(stderr, "Failed to alloc %d Bytes, reason:%s\n", errorPrint( "Failed to alloc %d Bytes, reason:%s\n",
superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len,
strerror(errno)); strerror(errno));
return NULL; return NULL;
@ -4821,7 +4874,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
} else if (0 == strncasecmp(precision, "us", 2)) { } else if (0 == strncasecmp(precision, "us", 2)) {
timePrec = TSDB_TIME_PRECISION_MICRO; timePrec = TSDB_TIME_PRECISION_MICRO;
} else { } else {
fprintf(stderr, "No support precision: %s\n", precision); errorPrint( "No support precision: %s\n", precision);
exit(-1); exit(-1);
} }
} }
@ -4836,8 +4889,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
&start_time, &start_time,
strlen(superTblInfo->startTimestamp), strlen(superTblInfo->startTimestamp),
timePrec, 0)) { timePrec, 0)) {
fprintf(stderr, "ERROR to parse time!\n"); ERROR_EXIT("failed to parse time!\n");
exit(-1);
} }
} }
} else { } else {
@ -4857,57 +4909,26 @@ static void startMultiThreadInsertData(int threads, char* db_name,
if ((superTblInfo) && (0 == strncasecmp(superTblInfo->dataSource, if ((superTblInfo) && (0 == strncasecmp(superTblInfo->dataSource,
"sample", strlen("sample")))) { "sample", strlen("sample")))) {
if (0 != prepareSampleDataForSTable(superTblInfo)) { if (0 != prepareSampleDataForSTable(superTblInfo)) {
fprintf(stderr, "prepare sample data for stable failed!\n"); errorPrint("%s() LN%d, prepare sample data for stable failed!\n", __func__, __LINE__);
exit(-1); exit(-1);
} }
} }
if (superTblInfo && (superTblInfo->childTblOffset >= 0)
&& (superTblInfo->childTblLimit > 0)) {
TAOS* taos = taos_connect(
g_Dbs.host, g_Dbs.user,
g_Dbs.password, db_name, g_Dbs.port);
if (NULL == taos) {
fprintf(stderr, "connect to server fail , reason: %s\n",
taos_errstr(NULL));
exit(-1);
}
superTblInfo->childTblName = (char*)calloc(1,
superTblInfo->childTblLimit * TSDB_TABLE_NAME_LEN);
if (superTblInfo->childTblName == NULL) {
fprintf(stderr, "alloc memory failed!");
taos_close(taos);
exit(-1);
}
int childTblCount;
getChildNameOfSuperTableWithLimitAndOffset(
taos,
db_name, superTblInfo->sTblName,
&superTblInfo->childTblName, &childTblCount,
superTblInfo->childTblLimit,
superTblInfo->childTblOffset);
taos_close(taos);
}
// read sample data from file first // read sample data from file first
if ((superTblInfo) && (0 == strncasecmp(superTblInfo->dataSource, if ((superTblInfo) && (0 == strncasecmp(superTblInfo->dataSource,
"sample", strlen("sample")))) { "sample", strlen("sample")))) {
if (0 != prepareSampleDataForSTable(superTblInfo)) { if (0 != prepareSampleDataForSTable(superTblInfo)) {
fprintf(stderr, "prepare sample data for stable failed!\n"); errorPrint("%s() LN%d, prepare sample data for stable failed!\n", __func__, __LINE__);
exit(-1); exit(-1);
} }
} }
TAOS* taos = taos_connect( TAOS* taos = taos_connect(
g_Dbs.host, g_Dbs.user, g_Dbs.host, g_Dbs.user,
g_Dbs.password, db_name, g_Dbs.port); g_Dbs.password, db_name, g_Dbs.port);
if (NULL == taos) { if (NULL == taos) {
fprintf(stderr, "connect to server fail , reason: %s\n", errorPrint("%s() LN%d, connect to server fail , reason: %s\n",
taos_errstr(NULL)); __func__, __LINE__, taos_errstr(NULL));
exit(-1); exit(-1);
} }
@ -4926,7 +4947,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
superTblInfo->childTblName = (char*)calloc(1, superTblInfo->childTblName = (char*)calloc(1,
limit * TSDB_TABLE_NAME_LEN); limit * TSDB_TABLE_NAME_LEN);
if (superTblInfo->childTblName == NULL) { if (superTblInfo->childTblName == NULL) {
fprintf(stderr, "alloc memory failed!"); errorPrint("%s() LN%d, alloc memory failed!\n", __func__, __LINE__);
taos_close(taos); taos_close(taos);
exit(-1); exit(-1);
} }
@ -4957,7 +4978,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
g_Dbs.host, g_Dbs.user, g_Dbs.host, g_Dbs.user,
g_Dbs.password, db_name, g_Dbs.port); g_Dbs.password, db_name, g_Dbs.port);
if (NULL == t_info->taos) { if (NULL == t_info->taos) {
fprintf(stderr, "connect to server fail from insert sub thread, reason: %s\n", errorPrint( "connect to server fail from insert sub thread, reason: %s\n",
taos_errstr(NULL)); taos_errstr(NULL));
exit(-1); exit(-1);
} }
@ -5001,6 +5022,10 @@ static void startMultiThreadInsertData(int threads, char* db_name,
tsem_destroy(&(t_info->lock_sem)); tsem_destroy(&(t_info->lock_sem));
taos_close(t_info->taos); taos_close(t_info->taos);
debugPrint("%s() LN%d, [%d] totalInsert=%"PRId64" totalAffected=%"PRId64"\n",
__func__, __LINE__,
t_info->threadID, t_info->totalInsertRows,
t_info->totalAffectedRows);
if (superTblInfo) { if (superTblInfo) {
superTblInfo->totalAffectedRows += t_info->totalAffectedRows; superTblInfo->totalAffectedRows += t_info->totalAffectedRows;
superTblInfo->totalInsertRows += t_info->totalInsertRows; superTblInfo->totalInsertRows += t_info->totalInsertRows;
@ -5068,7 +5093,7 @@ void *readTable(void *sarg) {
char *tb_prefix = rinfo->tb_prefix; char *tb_prefix = rinfo->tb_prefix;
FILE *fp = fopen(rinfo->fp, "a"); FILE *fp = fopen(rinfo->fp, "a");
if (NULL == fp) { if (NULL == fp) {
fprintf(stderr, "fopen %s fail, reason:%s.\n", rinfo->fp, strerror(errno)); errorPrint( "fopen %s fail, reason:%s.\n", rinfo->fp, strerror(errno));
return NULL; return NULL;
} }
@ -5102,7 +5127,7 @@ void *readTable(void *sarg) {
int32_t code = taos_errno(pSql); int32_t code = taos_errno(pSql);
if (code != 0) { if (code != 0) {
fprintf(stderr, "Failed to query:%s\n", taos_errstr(pSql)); errorPrint( "Failed to query:%s\n", taos_errstr(pSql));
taos_free_result(pSql); taos_free_result(pSql);
taos_close(taos); taos_close(taos);
fclose(fp); fclose(fp);
@ -5178,7 +5203,7 @@ void *readMetric(void *sarg) {
int32_t code = taos_errno(pSql); int32_t code = taos_errno(pSql);
if (code != 0) { if (code != 0) {
fprintf(stderr, "Failed to query:%s\n", taos_errstr(pSql)); errorPrint( "Failed to query:%s\n", taos_errstr(pSql));
taos_free_result(pSql); taos_free_result(pSql);
taos_close(taos); taos_close(taos);
fclose(fp); fclose(fp);
@ -5216,7 +5241,7 @@ static int insertTestProcess() {
debugPrint("%d result file: %s\n", __LINE__, g_Dbs.resultFile); debugPrint("%d result file: %s\n", __LINE__, g_Dbs.resultFile);
g_fpOfInsertResult = fopen(g_Dbs.resultFile, "a"); g_fpOfInsertResult = fopen(g_Dbs.resultFile, "a");
if (NULL == g_fpOfInsertResult) { if (NULL == g_fpOfInsertResult) {
fprintf(stderr, "Failed to open %s for save result\n", g_Dbs.resultFile); errorPrint( "Failed to open %s for save result\n", g_Dbs.resultFile);
return -1; return -1;
} }
@ -5407,7 +5432,7 @@ static int queryTestProcess() {
NULL, NULL,
g_queryInfo.port); g_queryInfo.port);
if (taos == NULL) { if (taos == NULL) {
fprintf(stderr, "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL)); errorPrint( "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL));
exit(-1); exit(-1);
} }
@ -5707,7 +5732,7 @@ static int subscribeTestProcess() {
g_queryInfo.dbName, g_queryInfo.dbName,
g_queryInfo.port); g_queryInfo.port);
if (taos == NULL) { if (taos == NULL) {
fprintf(stderr, "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL)); errorPrint( "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL));
exit(-1); exit(-1);
} }
@ -6065,7 +6090,7 @@ static void queryResult() {
g_Dbs.db[0].dbName, g_Dbs.db[0].dbName,
g_Dbs.port); g_Dbs.port);
if (rInfo->taos == NULL) { if (rInfo->taos == NULL) {
fprintf(stderr, "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL)); errorPrint( "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL));
free(rInfo); free(rInfo);
exit(-1); exit(-1);
} }

View File

@ -1840,7 +1840,7 @@ static void doFreeQueryHandle(SQueryRuntimeEnv* pRuntimeEnv) {
pRuntimeEnv->pQueryHandle = NULL; pRuntimeEnv->pQueryHandle = NULL;
SMemRef* pMemRef = &pQuery->memRef; SMemRef* pMemRef = &pQuery->memRef;
assert(pMemRef->ref == 0 && pMemRef->imem == NULL && pMemRef->mem == NULL); assert(pMemRef->ref == 0 && pMemRef->snapshot.imem == NULL && pMemRef->snapshot.mem == NULL);
} }
static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {

View File

@ -31,29 +31,14 @@ typedef struct {
SSkipListIterator *pIter; SSkipListIterator *pIter;
} SCommitIter; } SCommitIter;
typedef struct { struct STableData {
uint64_t uid; uint64_t uid;
TSKEY keyFirst; TSKEY keyFirst;
TSKEY keyLast; TSKEY keyLast;
int64_t numOfRows; int64_t numOfRows;
SSkipList* pData; SSkipList* pData;
T_REF_DECLARE() T_REF_DECLARE()
} STableData; };
typedef struct {
T_REF_DECLARE()
SRWLatch latch;
TSKEY keyFirst;
TSKEY keyLast;
int64_t numOfRows;
int32_t maxTables;
STableData** tData;
SList* actList;
SList* extraBuffList;
SList* bufBlockList;
int64_t pointsAdd; // TODO
int64_t storageAdd; // TODO
} SMemTable;
enum { TSDB_UPDATE_META, TSDB_DROP_META }; enum { TSDB_UPDATE_META, TSDB_DROP_META };
@ -77,8 +62,8 @@ typedef struct {
int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem, SArray* pATable); int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemSnapshot* pSnapshot, SArray* pATable);
void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemTable* pMem, SMemTable* pIMem); void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemSnapshot* pSnapshot);
void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes); void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes);
int tsdbAsyncCommit(STsdbRepo* pRepo); int tsdbAsyncCommit(STsdbRepo* pRepo);
int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols, int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols,

View File

@ -124,88 +124,80 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
return 0; return 0;
} }
int tsdbTakeMemSnapshot(STsdbRepo *pRepo, SMemTable **pMem, SMemTable **pIMem, SArray *pATable) { int tsdbTakeMemSnapshot(STsdbRepo *pRepo, SMemSnapshot *pSnapshot, SArray *pATable) {
SMemTable *tmem; memset(pSnapshot, 0, sizeof(*pSnapshot));
// Get snap object
if (tsdbLockRepo(pRepo) < 0) return -1; if (tsdbLockRepo(pRepo) < 0) return -1;
tmem = pRepo->mem; pSnapshot->omem = pRepo->mem;
*pIMem = pRepo->imem; pSnapshot->imem = pRepo->imem;
tsdbRefMemTable(pRepo, tmem); tsdbRefMemTable(pRepo, pRepo->mem);
tsdbRefMemTable(pRepo, *pIMem); tsdbRefMemTable(pRepo, pRepo->imem);
if (tsdbUnlockRepo(pRepo) < 0) return -1; if (tsdbUnlockRepo(pRepo) < 0) return -1;
// Copy mem objects and ref needed STableData if (pSnapshot->omem) {
if (tmem) { taosRLockLatch(&(pSnapshot->omem->latch));
taosRLockLatch(&(tmem->latch));
*pMem = (SMemTable *)calloc(1, sizeof(**pMem)); pSnapshot->mem = &(pSnapshot->mtable);
if (*pMem == NULL) {
pSnapshot->mem->tData = (STableData **)calloc(pSnapshot->omem->maxTables, sizeof(STableData *));
if (pSnapshot->mem->tData == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
taosRUnLockLatch(&(tmem->latch)); taosRUnLockLatch(&(pSnapshot->omem->latch));
tsdbUnRefMemTable(pRepo, tmem); tsdbUnRefMemTable(pRepo, pSnapshot->omem);
tsdbUnRefMemTable(pRepo, *pIMem); tsdbUnRefMemTable(pRepo, pSnapshot->imem);
*pMem = NULL; pSnapshot->mem = NULL;
*pIMem = NULL; pSnapshot->imem = NULL;
pSnapshot->omem = NULL;
return -1; return -1;
} }
(*pMem)->tData = (STableData **)calloc(tmem->maxTables, sizeof(STableData *)); pSnapshot->mem->keyFirst = pSnapshot->omem->keyFirst;
if ((*pMem)->tData == NULL) { pSnapshot->mem->keyLast = pSnapshot->omem->keyLast;
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; pSnapshot->mem->numOfRows = pSnapshot->omem->numOfRows;
taosRUnLockLatch(&(tmem->latch)); pSnapshot->mem->maxTables = pSnapshot->omem->maxTables;
free(*pMem);
tsdbUnRefMemTable(pRepo, tmem);
tsdbUnRefMemTable(pRepo, *pIMem);
*pMem = NULL;
*pIMem = NULL;
return -1;
}
(*pMem)->keyFirst = tmem->keyFirst;
(*pMem)->keyLast = tmem->keyLast;
(*pMem)->numOfRows = tmem->numOfRows;
(*pMem)->maxTables = tmem->maxTables;
for (size_t i = 0; i < taosArrayGetSize(pATable); i++) { for (size_t i = 0; i < taosArrayGetSize(pATable); i++) {
STable * pTable = *(STable **)taosArrayGet(pATable, i); STable * pTable = *(STable **)taosArrayGet(pATable, i);
int32_t tid = TABLE_TID(pTable); int32_t tid = TABLE_TID(pTable);
STableData *pTableData = (tid < tmem->maxTables) ? tmem->tData[tid] : NULL; STableData *pTableData = (tid < pSnapshot->omem->maxTables) ? pSnapshot->omem->tData[tid] : NULL;
if ((pTableData == NULL) || (TABLE_UID(pTable) != pTableData->uid)) continue; if ((pTableData == NULL) || (TABLE_UID(pTable) != pTableData->uid)) continue;
(*pMem)->tData[tid] = tmem->tData[tid]; pSnapshot->mem->tData[tid] = pTableData;
T_REF_INC(tmem->tData[tid]); T_REF_INC(pTableData);
} }
taosRUnLockLatch(&(tmem->latch)); taosRUnLockLatch(&(pSnapshot->omem->latch));
} }
tsdbUnRefMemTable(pRepo, tmem); tsdbDebug("vgId:%d take memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), pSnapshot->omem, pSnapshot->imem);
tsdbDebug("vgId:%d take memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), *pMem, *pIMem);
return 0; return 0;
} }
void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemTable *pMem, SMemTable *pIMem) { void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemSnapshot *pSnapshot) {
tsdbDebug("vgId:%d untake memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), pMem, pIMem); tsdbDebug("vgId:%d untake memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), pSnapshot->omem, pSnapshot->imem);
if (pMem != NULL) { if (pSnapshot->mem) {
for (size_t i = 0; i < pMem->maxTables; i++) { ASSERT(pSnapshot->omem != NULL);
STableData *pTableData = pMem->tData[i];
for (size_t i = 0; i < pSnapshot->mem->maxTables; i++) {
STableData *pTableData = pSnapshot->mem->tData[i];
if (pTableData) { if (pTableData) {
tsdbFreeTableData(pTableData); tsdbFreeTableData(pTableData);
} }
} }
free(pMem->tData); tfree(pSnapshot->mem->tData);
free(pMem);
tsdbUnRefMemTable(pRepo, pSnapshot->omem);
} }
if (pIMem != NULL) { tsdbUnRefMemTable(pRepo, pSnapshot->imem);
tsdbUnRefMemTable(pRepo, pIMem);
} pSnapshot->mem = NULL;
pSnapshot->imem = NULL;
pSnapshot->omem = NULL;
} }
void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {

View File

@ -194,7 +194,7 @@ static void tsdbMayTakeMemSnapshot(STsdbQueryHandle* pQueryHandle, SArray* psTab
SMemRef* pMemRef = pQueryHandle->pMemRef; SMemRef* pMemRef = pQueryHandle->pMemRef;
if (pQueryHandle->pMemRef->ref++ == 0) { if (pQueryHandle->pMemRef->ref++ == 0) {
tsdbTakeMemSnapshot(pQueryHandle->pTsdb, (SMemTable**)&(pMemRef->mem), (SMemTable**)&(pMemRef->imem), psTable); tsdbTakeMemSnapshot(pQueryHandle->pTsdb, &(pMemRef->snapshot), psTable);
} }
taosArrayDestroy(psTable); taosArrayDestroy(psTable);
@ -208,9 +208,7 @@ static void tsdbMayUnTakeMemSnapshot(STsdbQueryHandle* pQueryHandle) {
} }
if (--pMemRef->ref == 0) { if (--pMemRef->ref == 0) {
tsdbUnTakeMemSnapShot(pQueryHandle->pTsdb, pMemRef->mem, pMemRef->imem); tsdbUnTakeMemSnapShot(pQueryHandle->pTsdb, &(pMemRef->snapshot));
pMemRef->mem = NULL;
pMemRef->imem = NULL;
} }
pQueryHandle->pMemRef = NULL; pQueryHandle->pMemRef = NULL;
@ -231,8 +229,8 @@ int64_t tsdbGetNumOfRowsInMemTable(TsdbQueryHandleT* pHandle) {
STableData* pMem = NULL; STableData* pMem = NULL;
STableData* pIMem = NULL; STableData* pIMem = NULL;
SMemTable *pMemT = (SMemTable *)(pMemRef->mem); SMemTable* pMemT = pMemRef->snapshot.mem;
SMemTable *pIMemT = (SMemTable *)(pMemRef->imem); SMemTable* pIMemT = pMemRef->snapshot.imem;
if (pMemT && pCheckInfo->tableId.tid < pMemT->maxTables) { if (pMemT && pCheckInfo->tableId.tid < pMemT->maxTables) {
pMem = pMemT->tData[pCheckInfo->tableId.tid]; pMem = pMemT->tData[pCheckInfo->tableId.tid];
@ -605,7 +603,7 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
int32_t order = pHandle->order; int32_t order = pHandle->order;
// no data in buffer, abort // no data in buffer, abort
if (pHandle->pMemRef->mem == NULL && pHandle->pMemRef->imem == NULL) { if (pHandle->pMemRef->snapshot.mem == NULL && pHandle->pMemRef->snapshot.imem == NULL) {
return false; return false;
} }
@ -614,8 +612,8 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
STableData* pMem = NULL; STableData* pMem = NULL;
STableData* pIMem = NULL; STableData* pIMem = NULL;
SMemTable* pMemT = pHandle->pMemRef->mem; SMemTable* pMemT = pHandle->pMemRef->snapshot.mem;
SMemTable* pIMemT = pHandle->pMemRef->imem; SMemTable* pIMemT = pHandle->pMemRef->snapshot.imem;
if (pMemT && pCheckInfo->tableId.tid < pMemT->maxTables) { if (pMemT && pCheckInfo->tableId.tid < pMemT->maxTables) {
pMem = pMemT->tData[pCheckInfo->tableId.tid]; pMem = pMemT->tData[pCheckInfo->tableId.tid];

View File

@ -677,13 +677,13 @@ static int32_t tsdbRecvDFileSetInfo(SSyncH *pSynch) {
static int tsdbReload(STsdbRepo *pRepo, bool isMfChanged) { static int tsdbReload(STsdbRepo *pRepo, bool isMfChanged) {
// TODO: may need to stop and restart stream // TODO: may need to stop and restart stream
if (isMfChanged) { // if (isMfChanged) {
tsdbCloseMeta(pRepo); tsdbCloseMeta(pRepo);
tsdbFreeMeta(pRepo->tsdbMeta); tsdbFreeMeta(pRepo->tsdbMeta);
pRepo->tsdbMeta = tsdbNewMeta(REPO_CFG(pRepo)); pRepo->tsdbMeta = tsdbNewMeta(REPO_CFG(pRepo));
tsdbOpenMeta(pRepo); tsdbOpenMeta(pRepo);
tsdbLoadMetaCache(pRepo, true); tsdbLoadMetaCache(pRepo, true);
} // }
tsdbUnRefMemTable(pRepo, pRepo->mem); tsdbUnRefMemTable(pRepo, pRepo->mem);
tsdbUnRefMemTable(pRepo, pRepo->imem); tsdbUnRefMemTable(pRepo, pRepo->imem);

View File

@ -147,7 +147,7 @@ static int32_t vnodeAlterImp(SVnodeObj *pVnode, SCreateVnodeMsg *pVnodeCfg) {
vDebug("vgId:%d, tsdbchanged:%d syncchanged:%d while alter vnode", pVnode->vgId, tsdbCfgChanged, syncCfgChanged); vDebug("vgId:%d, tsdbchanged:%d syncchanged:%d while alter vnode", pVnode->vgId, tsdbCfgChanged, syncCfgChanged);
if (tsdbCfgChanged || syncCfgChanged) { if (/*tsdbCfgChanged || */syncCfgChanged) {
// vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS // vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS
// dbCfgVersion can be corrected by status msg // dbCfgVersion can be corrected by status msg
if (!vnodeSetUpdatingStatus(pVnode)) { if (!vnodeSetUpdatingStatus(pVnode)) {

View File

@ -18,13 +18,15 @@ import argparse
class BuildDockerCluser: class BuildDockerCluser:
def __init__(self, hostName, user, password, configDir, numOfNodes, clusterVersion): def __init__(self, hostName, user, password, configDir, numOfNodes, clusterVersion, dockerDir, removeFlag):
self.hostName = hostName self.hostName = hostName
self.user = user self.user = user
self.password = password self.password = password
self.configDir = configDir self.configDir = configDir
self.numOfNodes = numOfNodes self.numOfNodes = numOfNodes
self.clusterVersion = clusterVersion self.clusterVersion = clusterVersion
self.dockerDir = dockerDir
self.removeFlag = removeFlag
def getConnection(self): def getConnection(self):
self.conn = taos.connect( self.conn = taos.connect(
@ -46,7 +48,10 @@ class BuildDockerCluser:
if self.numOfNodes < 2 or self.numOfNodes > 5: if self.numOfNodes < 2 or self.numOfNodes > 5:
print("the number of nodes must be between 2 and 5") print("the number of nodes must be between 2 and 5")
exit(0) exit(0)
os.system("./buildClusterEnv.sh -n %d -v %s" % (self.numOfNodes, self.clusterVersion)) print("remove Flag value %s" % self.removeFlag)
if self.removeFlag == False:
os.system("./cleanClusterEnv.sh -d %s" % self.dockerDir)
os.system("./buildClusterEnv.sh -n %d -v %s -d %s" % (self.numOfNodes, self.clusterVersion, self.dockerDir))
self.getConnection() self.getConnection()
self.createDondes() self.createDondes()
self.startArbitrator() self.startArbitrator()
@ -91,10 +96,24 @@ parser.add_argument(
'-v', '-v',
'--version', '--version',
action='store', action='store',
default='2.0.14.1', default='2.0.17.1',
type=str, type=str,
help='the version of the cluster to be build, Default is 2.0.14.1') help='the version of the cluster to be build, Default is 2.0.17.1')
parser.add_argument(
'-d',
'--docker-dir',
action='store',
default='/data',
type=str,
help='the data dir for docker, default is /data')
parser.add_argument(
'--flag',
action='store_true',
help='remove docker containers flag, default: True')
args = parser.parse_args() args = parser.parse_args()
cluster = BuildDockerCluser(args.host, args.user, args.password, args.config_dir, args.num_of_nodes, args.version) cluster = BuildDockerCluser(args.host, args.user, args.password, args.config_dir, args.num_of_nodes, args.version, args.docker_dir, args.flag)
cluster.run() cluster.run()
# usage 1: python3 basic.py -n 2 --flag (flag is True)
# usage 2: python3 basic.py -n 2 (flag should be False when it is not specified)

View File

@ -1,18 +1,19 @@
#!/bin/bash #!/bin/bash
echo "Executing buildClusterEnv.sh" echo "Executing buildClusterEnv.sh"
DOCKER_DIR=/data
CURR_DIR=`pwd` CURR_DIR=`pwd`
if [ $# != 4 ]; then if [ $# != 6 ]; then
echo "argument list need input : " echo "argument list need input : "
echo " -n numOfNodes" echo " -n numOfNodes"
echo " -v version" echo " -v version"
echo " -d docker dir"
exit 1 exit 1
fi fi
NUM_OF_NODES= NUM_OF_NODES=
VERSION= VERSION=
while getopts "n:v:" arg DOCKER_DIR=
while getopts "n:v:d:" arg
do do
case $arg in case $arg in
n) n)
@ -21,6 +22,9 @@ do
v) v)
VERSION=$OPTARG VERSION=$OPTARG
;; ;;
d)
DOCKER_DIR=$OPTARG
;;
?) ?)
echo "unkonwn argument" echo "unkonwn argument"
;; ;;
@ -31,29 +35,28 @@ function addTaoscfg {
for i in {1..5} for i in {1..5}
do do
touch /data/node$i/cfg/taos.cfg touch /data/node$i/cfg/taos.cfg
echo 'firstEp tdnode1:6030' > /data/node$i/cfg/taos.cfg echo 'firstEp tdnode1:6030' > $DOCKER_DIR/node$i/cfg/taos.cfg
echo 'fqdn tdnode'$i >> /data/node$i/cfg/taos.cfg echo 'fqdn tdnode'$i >> $DOCKER_DIR/node$i/cfg/taos.cfg
echo 'arbitrator tdnode1:6042' >> /data/node$i/cfg/taos.cfg echo 'arbitrator tdnode1:6042' >> $DOCKER_DIR/node$i/cfg/taos.cfg
done done
} }
function createDIR { function createDIR {
for i in {1..5} for i in {1..5}
do do
mkdir -p /data/node$i/data mkdir -p $DOCKER_DIR/node$i/data
mkdir -p /data/node$i/log mkdir -p $DOCKER_DIR/node$i/log
mkdir -p /data/node$i/cfg mkdir -p $DOCKER_DIR/node$i/cfg
mkdir -p /data/node$i/core mkdir -p $DOCKER_DIR/node$i/core
done done
} }
function cleanEnv { function cleanEnv {
echo "Clean up docker environment"
for i in {1..5} for i in {1..5}
do do
echo /data/node$i/data/* rm -rf $DOCKER_DIR/node$i/data/*
rm -rf /data/node$i/data/* rm -rf $DOCKER_DIR/node$i/log/*
echo /data/node$i/log/*
rm -rf /data/node$i/log/*
done done
} }
@ -98,19 +101,19 @@ function clusterUp {
if [ $NUM_OF_NODES -eq 2 ]; then if [ $NUM_OF_NODES -eq 2 ]; then
echo "create 2 dnodes" echo "create 2 dnodes"
PACKAGE=TDengine-server-$VERSION-Linux-x64.tar.gz DIR=TDengine-server-$VERSION DIR2=TDengine-arbitrator-$VERSION VERSION=$VERSION docker-compose up -d PACKAGE=TDengine-server-$VERSION-Linux-x64.tar.gz TARBITRATORPKG=TDengine-arbitrator-$VERSION-Linux-x64.tar.gz DIR=TDengine-server-$VERSION DIR2=TDengine-arbitrator-$VERSION VERSION=$VERSION docker-compose up -d
fi fi
if [ $NUM_OF_NODES -eq 3 ]; then if [ $NUM_OF_NODES -eq 3 ]; then
PACKAGE=TDengine-server-$VERSION-Linux-x64.tar.gz DIR=TDengine-server-$VERSION DIR2=TDengine-arbitrator-$VERSION VERSION=$VERSION docker-compose -f docker-compose.yml -f node3.yml up -d PACKAGE=TDengine-server-$VERSION-Linux-x64.tar.gz TARBITRATORPKG=TDengine-arbitrator-$VERSION-Linux-x64.tar.gz DIR=TDengine-server-$VERSION DIR2=TDengine-arbitrator-$VERSION VERSION=$VERSION docker-compose -f docker-compose.yml -f node3.yml up -d
fi fi
if [ $NUM_OF_NODES -eq 4 ]; then if [ $NUM_OF_NODES -eq 4 ]; then
PACKAGE=TDengine-server-$VERSION-Linux-x64.tar.gz DIR=TDengine-server-$VERSION DIR2=TDengine-arbitrator-$VERSION VERSION=$VERSION docker-compose -f docker-compose.yml -f node3.yml -f node4.yml up -d PACKAGE=TDengine-server-$VERSION-Linux-x64.tar.gz TARBITRATORPKG=TDengine-arbitrator-$VERSION-Linux-x64.tar.gz DIR=TDengine-server-$VERSION DIR2=TDengine-arbitrator-$VERSION VERSION=$VERSION docker-compose -f docker-compose.yml -f node3.yml -f node4.yml up -d
fi fi
if [ $NUM_OF_NODES -eq 5 ]; then if [ $NUM_OF_NODES -eq 5 ]; then
PACKAGE=TDengine-server-$VERSION-Linux-x64.tar.gz DIR=TDengine-server-$VERSION DIR2=TDengine-arbitrator-$VERSION VERSION=$VERSION docker-compose -f docker-compose.yml -f node3.yml -f node4.yml -f node5.yml up -d PACKAGE=TDengine-server-$VERSION-Linux-x64.tar.gz TARBITRATORPKG=TDengine-arbitrator-$VERSION-Linux-x64.tar.gz DIR=TDengine-server-$VERSION DIR2=TDengine-arbitrator-$VERSION VERSION=$VERSION docker-compose -f docker-compose.yml -f node3.yml -f node4.yml -f node5.yml up -d
fi fi
echo "docker compose finish" echo "docker compose finish"

View File

@ -0,0 +1,39 @@
#!/bin/bash
echo "Executing cleanClusterEnv.sh"
CURR_DIR=`pwd`
if [ $# != 2 ]; then
echo "argument list need input : "
echo " -d docker dir"
exit 1
fi
DOCKER_DIR=
while getopts "d:" arg
do
case $arg in
d)
DOCKER_DIR=$OPTARG
;;
?)
echo "unkonwn argument"
;;
esac
done
function removeDockerContainers {
cd $DOCKER_DIR
docker-compose down --remove-orphans
}
function cleanEnv {
echo "Clean up docker environment"
for i in {1..5}
do
rm -rf $DOCKER_DIR/node$i/data/*
rm -rf $DOCKER_DIR/node$i/log/*
done
}
removeDockerContainers
cleanEnv

View File

@ -275,7 +275,7 @@ python3 ./test.py -f functions/function_twa.py -r 1
python3 ./test.py -f functions/function_twa_test2.py python3 ./test.py -f functions/function_twa_test2.py
python3 ./test.py -f functions/function_stddev_td2555.py python3 ./test.py -f functions/function_stddev_td2555.py
python3 ./test.py -f insert/metadataUpdate.py python3 ./test.py -f insert/metadataUpdate.py
python3 ./test.py -f query/last_cache.py #python3 ./test.py -f query/last_cache.py
python3 ./test.py -f query/last_row_cache.py python3 ./test.py -f query/last_row_cache.py
python3 ./test.py -f account/account_create.py python3 ./test.py -f account/account_create.py
python3 ./test.py -f alter/alter_table.py python3 ./test.py -f alter/alter_table.py

View File

@ -118,7 +118,11 @@ class TDTestCase:
if i == 1 or i == 5 or i == 6 or i == 7 or i == 9 or i == 8 :continue if i == 1 or i == 5 or i == 6 or i == 7 or i == 9 or i == 8 :continue
tdSql.query('select stddev(c%d),stddev(c%d) from s group by c%d' %( i+1 , i+1 , i+1 ) ) tdSql.query('select stddev(c%d),stddev(c%d) from s group by c%d' %( i+1 , i+1 , i+1 ) )
#add for TD-3318
tdSql.execute('create table t1(ts timestamp, k int, b binary(12));')
tdSql.execute("insert into t1 values(now, 1, 'abc');")
tdLog.info("select stddev(k) from t1 where b <> 'abc' interval(1s);")
tdSql.query("select stddev(k) from t1 where b <> 'abc' interval(1s);")
def stop(self): def stop(self):

View File

@ -316,4 +316,13 @@ if $data13 != @20-02-02 01:01:01.000@ then
return -1 return -1
endi endi
print ===============================>td-3361
sql create table ttm1(ts timestamp, k int) tags(a nchar(12));
sql create table ttm1_t1 using ttm1 tags('abcdef')
sql insert into ttm1_t1 values(now, 1)
sql select * from ttm1 where a=123456789012
if $row != 0 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -29,7 +29,7 @@ function dohavecore(){
proc=`echo $corefile|cut -d "_" -f3` proc=`echo $corefile|cut -d "_" -f3`
if [ -n "$corefile" ];then if [ -n "$corefile" ];then
echo 'taosd or taos has generated core' echo 'taosd or taos has generated core'
tar -zcvPf $corepath'taos_'`date "+%Y_%m_%d_%H_%M_%S"`.tar.gz /usr/local/taos/ tar -zcPf $corepath'taos_'`date "+%Y_%m_%d_%H_%M_%S"`.tar.gz /usr/local/taos/
if [[ $1 == 1 ]];then if [[ $1 == 1 ]];then
echo '\n'|gdb /usr/local/taos/bin/$proc $core_file -ex "bt 10" -ex quit echo '\n'|gdb /usr/local/taos/bin/$proc $core_file -ex "bt 10" -ex quit
exit 8 exit 8