Merge branch 'develop' into xiaoping/add_test_case
This commit is contained in:
commit
3c7e9628c3
|
@ -0,0 +1,70 @@
|
||||||
|
pipeline {
|
||||||
|
agent any
|
||||||
|
stages {
|
||||||
|
stage('build TDengine') {
|
||||||
|
steps {
|
||||||
|
sh '''cd ${WORKSPACE}
|
||||||
|
export TZ=Asia/Harbin
|
||||||
|
date
|
||||||
|
rm -rf ${WORKSPACE}/debug
|
||||||
|
mkdir debug
|
||||||
|
cd debug
|
||||||
|
cmake .. > /dev/null
|
||||||
|
make > /dev/null
|
||||||
|
cd ${WORKSPACE}/debug'''
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
stage('test_tsim') {
|
||||||
|
parallel {
|
||||||
|
stage('test') {
|
||||||
|
steps {
|
||||||
|
sh '''cd ${WORKSPACE}/tests
|
||||||
|
#./test-all.sh smoke
|
||||||
|
sudo ./test-all.sh full'''
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
stage('test_crash_gen') {
|
||||||
|
steps {
|
||||||
|
sh '''cd ${WORKSPACE}/tests/pytest
|
||||||
|
sudo ./crash_gen.sh -a -p -t 4 -s 2000'''
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
stage('test_valgrind') {
|
||||||
|
steps {
|
||||||
|
sh '''cd ${WORKSPACE}/tests/pytest
|
||||||
|
sudo ./valgrind-test.sh 2>&1 > mem-error-out.log
|
||||||
|
grep \'start to execute\\|ERROR SUMMARY\' mem-error-out.log|grep -v \'grep\'|uniq|tee uniq-mem-error-out.log
|
||||||
|
|
||||||
|
for memError in `grep \'ERROR SUMMARY\' uniq-mem-error-out.log | awk \'{print $4}\'`
|
||||||
|
do
|
||||||
|
if [ -n "$memError" ]; then
|
||||||
|
if [ "$memError" -gt 12 ]; then
|
||||||
|
echo -e "${RED} ## Memory errors number valgrind reports is $memError.\\
|
||||||
|
More than our threshold! ## ${NC}"
|
||||||
|
travis_terminate $memError
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
done
|
||||||
|
|
||||||
|
grep \'start to execute\\|definitely lost:\' mem-error-out.log|grep -v \'grep\'|uniq|tee uniq-definitely-lost-out.log
|
||||||
|
for defiMemError in `grep \'definitely lost:\' uniq-definitely-lost-out.log | awk \'{print $7}\'`
|
||||||
|
do
|
||||||
|
if [ -n "$defiMemError" ]; then
|
||||||
|
if [ "$defiMemError" -gt 13 ]; then
|
||||||
|
echo -e "${RED} ## Memory errors number valgrind reports \\
|
||||||
|
Definitely lost is $defiMemError. More than our threshold! ## ${NC}"
|
||||||
|
travis_terminate $defiMemError
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
done'''
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -83,12 +83,18 @@ sudo dnf install -y maven
|
||||||
|
|
||||||
## Get the source codes
|
## Get the source codes
|
||||||
|
|
||||||
- github:
|
First of all, you may clone the source codes from github:
|
||||||
```bash
|
```bash
|
||||||
git clone https://github.com/taosdata/TDengine.git
|
git clone https://github.com/taosdata/TDengine.git
|
||||||
cd TDengine
|
cd TDengine
|
||||||
```
|
```
|
||||||
|
|
||||||
|
The connectors for go & grafana have been moved to separated repositories,
|
||||||
|
so you should run this command in the TDengine directory to install them:
|
||||||
|
```bash
|
||||||
|
git submodule update --init --recursive
|
||||||
|
```
|
||||||
|
|
||||||
## Build TDengine
|
## Build TDengine
|
||||||
|
|
||||||
### On Linux platform
|
### On Linux platform
|
||||||
|
|
|
@ -250,10 +250,10 @@ ALTER USER <user_name> PASS <'password'>;
|
||||||
修改用户密码, 为避免被转换为小写,密码需要用单引号引用,单引号为英文半角
|
修改用户密码, 为避免被转换为小写,密码需要用单引号引用,单引号为英文半角
|
||||||
|
|
||||||
```
|
```
|
||||||
ALTER USER <user_name> PRIVILEDGE <'super'|'write'|'read'>;
|
ALTER USER <user_name> PRIVILEDGE <super|write|read>;
|
||||||
```
|
```
|
||||||
|
|
||||||
修改用户权限为:super/write/read。 为避免被转换为小写,密码需要用单引号引用,单引号为英文半角
|
修改用户权限为:super/write/read,不需要添加单引号
|
||||||
|
|
||||||
```
|
```
|
||||||
SHOW USERS;
|
SHOW USERS;
|
||||||
|
|
|
@ -229,7 +229,6 @@ typedef struct SQueryInfo {
|
||||||
// TODO refactor
|
// TODO refactor
|
||||||
STimeWindow window; // query time window
|
STimeWindow window; // query time window
|
||||||
SInterval interval;
|
SInterval interval;
|
||||||
int32_t tz; // query client timezone
|
|
||||||
|
|
||||||
SSqlGroupbyExpr groupbyExpr; // group by tags info
|
SSqlGroupbyExpr groupbyExpr; // group by tags info
|
||||||
SArray * colList; // SArray<SColumn*>
|
SArray * colList; // SArray<SColumn*>
|
||||||
|
|
|
@ -509,7 +509,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STMT_INSERT)) {
|
if (pCmd->insertType == TSDB_QUERY_TYPE_STMT_INSERT) {
|
||||||
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
|
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
|
||||||
code = tscGetTableMeta(pSql, pTableMetaInfo);
|
code = tscGetTableMeta(pSql, pTableMetaInfo);
|
||||||
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
||||||
|
|
|
@ -430,7 +430,7 @@ static int32_t tscGetTableTagValue(SCreateBuilder *builder, char *result) {
|
||||||
SSqlObj* pSql = builder->pInterSql;
|
SSqlObj* pSql = builder->pInterSql;
|
||||||
|
|
||||||
if (row == NULL) {
|
if (row == NULL) {
|
||||||
return TSDB_CODE_MND_INVALID_TABLE_NAME;
|
return TSDB_CODE_TSC_INVALID_TABLE_NAME;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t* lengths = taos_fetch_lengths(pSql);
|
int32_t* lengths = taos_fetch_lengths(pSql);
|
||||||
|
@ -458,7 +458,7 @@ static int32_t tscGetTableTagValue(SCreateBuilder *builder, char *result) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (0 == strlen(result)) {
|
if (0 == strlen(result)) {
|
||||||
return TSDB_CODE_MND_INVALID_TABLE_NAME;
|
return TSDB_CODE_TSC_INVALID_TABLE_NAME;
|
||||||
}
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -554,7 +554,7 @@ int32_t tscRebuildCreateTableStatement(void *param,char *result) {
|
||||||
static int32_t tscGetDBInfo(SCreateBuilder *builder, char *result) {
|
static int32_t tscGetDBInfo(SCreateBuilder *builder, char *result) {
|
||||||
TAOS_ROW row = tscFetchRow(builder);
|
TAOS_ROW row = tscFetchRow(builder);
|
||||||
if (row == NULL) {
|
if (row == NULL) {
|
||||||
return TSDB_CODE_MND_DB_NOT_SELECTED;
|
return TSDB_CODE_TSC_DB_NOT_SELECTED;
|
||||||
}
|
}
|
||||||
const char *showColumns[] = {"REPLICA", "QUORUM", "DAYS", "KEEP", "BLOCKS", NULL};
|
const char *showColumns[] = {"REPLICA", "QUORUM", "DAYS", "KEEP", "BLOCKS", NULL};
|
||||||
|
|
||||||
|
@ -586,7 +586,7 @@ static int32_t tscGetDBInfo(SCreateBuilder *builder, char *result) {
|
||||||
} while (row != NULL);
|
} while (row != NULL);
|
||||||
|
|
||||||
if (0 == strlen(result)) {
|
if (0 == strlen(result)) {
|
||||||
return TSDB_CODE_MND_DB_NOT_SELECTED;
|
return TSDB_CODE_TSC_DB_NOT_SELECTED;
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -362,8 +362,9 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// additional msg has been attached already
|
// additional msg has been attached already
|
||||||
if (tscSetTableFullName(pTableMetaInfo, pToken, pSql) != TSDB_CODE_SUCCESS) {
|
code = tscSetTableFullName(pTableMetaInfo, pToken, pSql);
|
||||||
return TSDB_CODE_TSC_INVALID_SQL;
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
return tscGetTableMeta(pSql, pTableMetaInfo);
|
return tscGetTableMeta(pSql, pTableMetaInfo);
|
||||||
|
@ -381,14 +382,15 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
||||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tscSetTableFullName(pTableMetaInfo, pToken, pSql) != TSDB_CODE_SUCCESS) {
|
code = tscSetTableFullName(pTableMetaInfo, pToken, pSql);
|
||||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
return tscGetTableMeta(pSql, pTableMetaInfo);
|
return tscGetTableMeta(pSql, pTableMetaInfo);
|
||||||
}
|
}
|
||||||
case TSDB_SQL_SHOW_CREATE_DATABASE: {
|
case TSDB_SQL_SHOW_CREATE_DATABASE: {
|
||||||
const char* msg1 = "invalid database name";
|
const char* msg1 = "invalid database name";
|
||||||
const char* msg2 = "table name is too long";
|
|
||||||
SStrToken* pToken = &pInfo->pDCLInfo->a[0];
|
SStrToken* pToken = &pInfo->pDCLInfo->a[0];
|
||||||
|
|
||||||
if (tscValidateName(pToken) != TSDB_CODE_SUCCESS) {
|
if (tscValidateName(pToken) != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -397,10 +399,8 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
||||||
if (pToken->n > TSDB_DB_NAME_LEN) {
|
if (pToken->n > TSDB_DB_NAME_LEN) {
|
||||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
||||||
}
|
}
|
||||||
if (tscSetTableFullName(pTableMetaInfo, pToken, pSql) != TSDB_CODE_SUCCESS) {
|
|
||||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
return tscSetTableFullName(pTableMetaInfo, pToken, pSql);
|
||||||
}
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
}
|
||||||
case TSDB_SQL_CFG_DNODE: {
|
case TSDB_SQL_CFG_DNODE: {
|
||||||
const char* msg2 = "invalid configure options or values, such as resetlog / debugFlag 135 / balance 'vnode:2-dnode:2' / monitor 1 ";
|
const char* msg2 = "invalid configure options or values, such as resetlog / debugFlag 135 / balance 'vnode:2-dnode:2' / monitor 1 ";
|
||||||
|
@ -805,55 +805,44 @@ int32_t parseSlidingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQu
|
||||||
|
|
||||||
int32_t tscSetTableFullName(STableMetaInfo* pTableMetaInfo, SStrToken* pzTableName, SSqlObj* pSql) {
|
int32_t tscSetTableFullName(STableMetaInfo* pTableMetaInfo, SStrToken* pzTableName, SSqlObj* pSql) {
|
||||||
const char* msg1 = "name too long";
|
const char* msg1 = "name too long";
|
||||||
const char* msg2 = "current database or database name invalid";
|
|
||||||
|
|
||||||
SSqlCmd* pCmd = &pSql->cmd;
|
SSqlCmd* pCmd = &pSql->cmd;
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
// backup the old name in pTableMetaInfo
|
// backup the old name in pTableMetaInfo
|
||||||
size_t size = strlen(pTableMetaInfo->name);
|
char oldName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||||
char* oldName = NULL;
|
tstrncpy(oldName, pTableMetaInfo->name, tListLen(oldName));
|
||||||
if (size > 0) {
|
|
||||||
oldName = strdup(pTableMetaInfo->name);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (hasSpecifyDB(pzTableName)) {
|
if (hasSpecifyDB(pzTableName)) { // db has been specified in sql string so we ignore current db path
|
||||||
// db has been specified in sql string so we ignore current db path
|
|
||||||
code = setObjFullName(pTableMetaInfo->name, getAccountId(pSql), NULL, pzTableName, NULL);
|
code = setObjFullName(pTableMetaInfo->name, getAccountId(pSql), NULL, pzTableName, NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != 0) {
|
||||||
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
||||||
}
|
}
|
||||||
} else { // get current DB name first, then set it into path
|
} else { // get current DB name first, and then set it into path
|
||||||
SStrToken t = {0};
|
SStrToken t = {0};
|
||||||
getCurrentDBName(pSql, &t);
|
getCurrentDBName(pSql, &t);
|
||||||
if (t.n == 0) {
|
if (t.n == 0) { // current database not available or not specified
|
||||||
code = invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
code = TSDB_CODE_TSC_DB_NOT_SELECTED;
|
||||||
} else {
|
} else {
|
||||||
code = setObjFullName(pTableMetaInfo->name, NULL, &t, pzTableName, NULL);
|
code = setObjFullName(pTableMetaInfo->name, NULL, &t, pzTableName, NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != 0) {
|
||||||
code = invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
taosTFree(oldName);
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* the old name exists and is not equalled to the new name. Release the metermeta/metricmeta
|
* the old name exists and is not equalled to the new name. Release the table meta
|
||||||
* that are corresponding to the old name for the new table name.
|
* that are corresponding to the old name for the new table name.
|
||||||
*/
|
*/
|
||||||
if (size > 0) {
|
if (strlen(oldName) > 0 && strncasecmp(oldName, pTableMetaInfo->name, tListLen(pTableMetaInfo->name)) != 0) {
|
||||||
if (strncasecmp(oldName, pTableMetaInfo->name, tListLen(pTableMetaInfo->name)) != 0) {
|
tscClearTableMetaInfo(pTableMetaInfo, false);
|
||||||
tscClearTableMetaInfo(pTableMetaInfo, false);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
assert(pTableMetaInfo->pTableMeta == NULL);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taosTFree(oldName);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4566,6 +4555,8 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
||||||
const char* msg18 = "primary timestamp column cannot be dropped";
|
const char* msg18 = "primary timestamp column cannot be dropped";
|
||||||
const char* msg19 = "invalid new tag name";
|
const char* msg19 = "invalid new tag name";
|
||||||
|
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
SSqlCmd* pCmd = &pSql->cmd;
|
SSqlCmd* pCmd = &pSql->cmd;
|
||||||
SAlterTableSQL* pAlterSQL = pInfo->pAlterInfo;
|
SAlterTableSQL* pAlterSQL = pInfo->pAlterInfo;
|
||||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
||||||
|
@ -4576,13 +4567,14 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
||||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tscSetTableFullName(pTableMetaInfo, &(pAlterSQL->name), pSql) != TSDB_CODE_SUCCESS) {
|
code = tscSetTableFullName(pTableMetaInfo, &(pAlterSQL->name), pSql);
|
||||||
return TSDB_CODE_TSC_INVALID_SQL;
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ret = tscGetTableMeta(pSql, pTableMetaInfo);
|
code = tscGetTableMeta(pSql, pTableMetaInfo);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return ret;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
|
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
|
||||||
|
@ -5880,8 +5872,9 @@ int32_t doCheckForCreateTable(SSqlObj* pSql, int32_t subClauseIndex, SSqlInfo* p
|
||||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tscSetTableFullName(pTableMetaInfo, pzTableName, pSql) != TSDB_CODE_SUCCESS) {
|
int32_t code = tscSetTableFullName(pTableMetaInfo, pzTableName, pSql);
|
||||||
return TSDB_CODE_TSC_INVALID_SQL;
|
if(code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!validateTableColumnInfo(pFieldList, pCmd) ||
|
if (!validateTableColumnInfo(pFieldList, pCmd) ||
|
||||||
|
@ -5935,15 +5928,16 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
|
||||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tscSetTableFullName(pStableMeterMetaInfo, pToken, pSql) != TSDB_CODE_SUCCESS) {
|
int32_t code = tscSetTableFullName(pStableMeterMetaInfo, pToken, pSql);
|
||||||
return TSDB_CODE_TSC_INVALID_SQL;
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
// get meter meta from mnode
|
// get meter meta from mnode
|
||||||
tstrncpy(pCreateTable->usingInfo.tagdata.name, pStableMeterMetaInfo->name, sizeof(pCreateTable->usingInfo.tagdata.name));
|
tstrncpy(pCreateTable->usingInfo.tagdata.name, pStableMeterMetaInfo->name, sizeof(pCreateTable->usingInfo.tagdata.name));
|
||||||
tVariantList* pList = pInfo->pCreateTableInfo->usingInfo.pTagVals;
|
tVariantList* pList = pInfo->pCreateTableInfo->usingInfo.pTagVals;
|
||||||
|
|
||||||
int32_t code = tscGetTableMeta(pSql, pStableMeterMetaInfo);
|
code = tscGetTableMeta(pSql, pStableMeterMetaInfo);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -6020,7 +6014,6 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
|
||||||
|
|
||||||
int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
|
int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
|
||||||
const char* msg1 = "invalid table name";
|
const char* msg1 = "invalid table name";
|
||||||
const char* msg2 = "table name too long";
|
|
||||||
const char* msg3 = "fill only available for interval query";
|
const char* msg3 = "fill only available for interval query";
|
||||||
const char* msg4 = "fill option not supported in stream computing";
|
const char* msg4 = "fill option not supported in stream computing";
|
||||||
const char* msg5 = "sql too long"; // todo ADD support
|
const char* msg5 = "sql too long"; // todo ADD support
|
||||||
|
@ -6052,11 +6045,12 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
|
||||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tscSetTableFullName(pTableMetaInfo, &srcToken, pSql) != TSDB_CODE_SUCCESS) {
|
int32_t code = tscSetTableFullName(pTableMetaInfo, &srcToken, pSql);
|
||||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = tscGetTableMeta(pSql, pTableMetaInfo);
|
code = tscGetTableMeta(pSql, pTableMetaInfo);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -6083,8 +6077,9 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// set the created table[stream] name
|
// set the created table[stream] name
|
||||||
if (tscSetTableFullName(pTableMetaInfo, pzTableName, pSql) != TSDB_CODE_SUCCESS) {
|
code = tscSetTableFullName(pTableMetaInfo, pzTableName, pSql);
|
||||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pQuerySql->selectToken.n > TSDB_MAX_SAVED_SQL_LEN) {
|
if (pQuerySql->selectToken.n > TSDB_MAX_SAVED_SQL_LEN) {
|
||||||
|
@ -6128,7 +6123,6 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
|
||||||
assert(pQuerySql != NULL && (pQuerySql->from == NULL || pQuerySql->from->nExpr > 0));
|
assert(pQuerySql != NULL && (pQuerySql->from == NULL || pQuerySql->from->nExpr > 0));
|
||||||
|
|
||||||
const char* msg0 = "invalid table name";
|
const char* msg0 = "invalid table name";
|
||||||
//const char* msg1 = "table name too long";
|
|
||||||
const char* msg2 = "point interpolation query needs timestamp";
|
const char* msg2 = "point interpolation query needs timestamp";
|
||||||
const char* msg5 = "fill only available for interval query";
|
const char* msg5 = "fill only available for interval query";
|
||||||
const char* msg6 = "start(end) time of query range required or time range too large";
|
const char* msg6 = "start(end) time of query range required or time range too large";
|
||||||
|
@ -6200,11 +6194,16 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
|
||||||
STableMetaInfo* pTableMetaInfo1 = tscGetMetaInfo(pQueryInfo, i/2);
|
STableMetaInfo* pTableMetaInfo1 = tscGetMetaInfo(pQueryInfo, i/2);
|
||||||
|
|
||||||
SStrToken t = {.type = TSDB_DATA_TYPE_BINARY, .n = pTableItem->nLen, .z = pTableItem->pz};
|
SStrToken t = {.type = TSDB_DATA_TYPE_BINARY, .n = pTableItem->nLen, .z = pTableItem->pz};
|
||||||
if (tscSetTableFullName(pTableMetaInfo1, &t, pSql) != TSDB_CODE_SUCCESS) {
|
code = tscSetTableFullName(pTableMetaInfo1, &t, pSql);
|
||||||
return TSDB_CODE_TSC_INVALID_SQL;
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
tVariant* pTableItem1 = &pQuerySql->from->a[i + 1].pVar;
|
tVariant* pTableItem1 = &pQuerySql->from->a[i + 1].pVar;
|
||||||
|
if (pTableItem1->nType != TSDB_DATA_TYPE_BINARY) {
|
||||||
|
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg11);
|
||||||
|
}
|
||||||
|
|
||||||
SStrToken aliasName = {.z = pTableItem1->pz, .n = pTableItem1->nLen, .type = TK_STRING};
|
SStrToken aliasName = {.z = pTableItem1->pz, .n = pTableItem1->nLen, .type = TK_STRING};
|
||||||
if (tscValidateName(&aliasName) != TSDB_CODE_SUCCESS) {
|
if (tscValidateName(&aliasName) != TSDB_CODE_SUCCESS) {
|
||||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg11);
|
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg11);
|
||||||
|
|
|
@ -53,7 +53,10 @@ static void tscSetDnodeEpSet(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) {
|
||||||
assert(pSql != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0);
|
assert(pSql != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0);
|
||||||
|
|
||||||
SRpcEpSet* pEpSet = &pSql->epSet;
|
SRpcEpSet* pEpSet = &pSql->epSet;
|
||||||
pEpSet->inUse = 0;
|
|
||||||
|
// Issue the query to one of the vnode among a vgroup randomly.
|
||||||
|
// change the inUse property would not affect the isUse attribute of STableMeta
|
||||||
|
pEpSet->inUse = rand() % pVgroupInfo->numOfEps;
|
||||||
|
|
||||||
// apply the FQDN string length check here
|
// apply the FQDN string length check here
|
||||||
bool hasFqdn = false;
|
bool hasFqdn = false;
|
||||||
|
@ -144,12 +147,13 @@ void tscPrintMgmtEp() {
|
||||||
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
|
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
|
||||||
STscObj *pObj = (STscObj *)param;
|
STscObj *pObj = (STscObj *)param;
|
||||||
if (pObj == NULL) return;
|
if (pObj == NULL) return;
|
||||||
|
|
||||||
if (pObj != pObj->signature) {
|
if (pObj != pObj->signature) {
|
||||||
tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
|
tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSqlObj *pSql = pObj->pHb;
|
SSqlObj *pSql = tres;
|
||||||
SSqlRes *pRes = &pSql->res;
|
SSqlRes *pRes = &pSql->res;
|
||||||
|
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
|
@ -170,10 +174,17 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
|
||||||
if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId));
|
if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
tscDebug("heart beat failed, code:%s", tstrerror(code));
|
tscDebug("heartbeat failed, code:%s", tstrerror(code));
|
||||||
}
|
}
|
||||||
|
|
||||||
taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
|
if (pObj->pHb != NULL) {
|
||||||
|
int32_t waitingDuring = tsShellActivityTimer * 500;
|
||||||
|
tscDebug("%p start heartbeat in %dms", pSql, waitingDuring);
|
||||||
|
|
||||||
|
taosTmrReset(tscProcessActivityTimer, waitingDuring, pObj, tscTmr, &pObj->pTimer);
|
||||||
|
} else {
|
||||||
|
tscDebug("%p start to close tscObj:%p, not send heartbeat again", pSql, pObj);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void tscProcessActivityTimer(void *handle, void *tmrId) {
|
void tscProcessActivityTimer(void *handle, void *tmrId) {
|
||||||
|
@ -249,6 +260,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
|
||||||
SSqlCmd *pCmd = &pSql->cmd;
|
SSqlCmd *pCmd = &pSql->cmd;
|
||||||
|
|
||||||
assert(*pSql->self == pSql);
|
assert(*pSql->self == pSql);
|
||||||
|
pSql->pRpcCtx = NULL;
|
||||||
|
|
||||||
if (pObj->signature != pObj) {
|
if (pObj->signature != pObj) {
|
||||||
tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature);
|
tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature);
|
||||||
|
@ -258,8 +270,6 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
pSql->pRpcCtx = NULL; // clear the rpcCtx
|
|
||||||
|
|
||||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
||||||
if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
|
if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
|
||||||
tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
|
tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
|
||||||
|
@ -474,6 +484,7 @@ void tscKillSTableQuery(SSqlObj *pSql) {
|
||||||
pSub->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
|
pSub->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
|
||||||
if (pSub->pRpcCtx != NULL) {
|
if (pSub->pRpcCtx != NULL) {
|
||||||
rpcCancelRequest(pSub->pRpcCtx);
|
rpcCancelRequest(pSub->pRpcCtx);
|
||||||
|
pSub->pRpcCtx = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
tscQueueAsyncRes(pSub); // async res? not other functions?
|
tscQueueAsyncRes(pSub); // async res? not other functions?
|
||||||
|
|
|
@ -698,6 +698,7 @@ void taos_stop_query(TAOS_RES *res) {
|
||||||
tscKillSTableQuery(pSql);
|
tscKillSTableQuery(pSql);
|
||||||
} else {
|
} else {
|
||||||
if (pSql->cmd.command < TSDB_SQL_LOCAL) {
|
if (pSql->cmd.command < TSDB_SQL_LOCAL) {
|
||||||
|
assert(pSql->pRpcCtx != NULL);
|
||||||
rpcCancelRequest(pSql->pRpcCtx);
|
rpcCancelRequest(pSql->pRpcCtx);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1 +1 @@
|
||||||
Subproject commit 06ec30a0f1762e8169bf6b9045c82bcaa52bcdf0
|
Subproject commit 8c58c512b6acda8bcdfa48fdc7140227b5221766
|
|
@ -464,7 +464,14 @@ void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet) {
|
||||||
dInfo("mnode EP list for peer is changed, numOfEps:%d inUse:%d", pEpSet->numOfEps, pEpSet->inUse);
|
dInfo("mnode EP list for peer is changed, numOfEps:%d inUse:%d", pEpSet->numOfEps, pEpSet->inUse);
|
||||||
for (int i = 0; i < pEpSet->numOfEps; ++i) {
|
for (int i = 0; i < pEpSet->numOfEps; ++i) {
|
||||||
pEpSet->port[i] -= TSDB_PORT_DNODEDNODE;
|
pEpSet->port[i] -= TSDB_PORT_DNODEDNODE;
|
||||||
dInfo("mnode index:%d %s:%u", i, pEpSet->fqdn[i], pEpSet->port[i])
|
dInfo("mnode index:%d %s:%u", i, pEpSet->fqdn[i], pEpSet->port[i]);
|
||||||
|
|
||||||
|
if (!mnodeIsRunning()) {
|
||||||
|
if (strcmp(pEpSet->fqdn[i], tsLocalFqdn) == 0 && pEpSet->port[i] == tsServerPort) {
|
||||||
|
dInfo("mnode index:%d %s:%u should work as master", i, pEpSet->fqdn[i], pEpSet->port[i]);
|
||||||
|
sdbUpdateSync();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tsDMnodeEpSet = *pEpSet;
|
tsDMnodeEpSet = *pEpSet;
|
||||||
|
|
|
@ -64,7 +64,7 @@ extern const int32_t TYPE_BYTES[11];
|
||||||
// TODO: replace and remove code below
|
// TODO: replace and remove code below
|
||||||
#define CHAR_BYTES sizeof(char)
|
#define CHAR_BYTES sizeof(char)
|
||||||
#define SHORT_BYTES sizeof(int16_t)
|
#define SHORT_BYTES sizeof(int16_t)
|
||||||
#define INT_BYTES sizeof(int)
|
#define INT_BYTES sizeof(int32_t)
|
||||||
#define LONG_BYTES sizeof(int64_t)
|
#define LONG_BYTES sizeof(int64_t)
|
||||||
#define FLOAT_BYTES sizeof(float)
|
#define FLOAT_BYTES sizeof(float)
|
||||||
#define DOUBLE_BYTES sizeof(double)
|
#define DOUBLE_BYTES sizeof(double)
|
||||||
|
@ -73,7 +73,7 @@ extern const int32_t TYPE_BYTES[11];
|
||||||
#define TSDB_DATA_BOOL_NULL 0x02
|
#define TSDB_DATA_BOOL_NULL 0x02
|
||||||
#define TSDB_DATA_TINYINT_NULL 0x80
|
#define TSDB_DATA_TINYINT_NULL 0x80
|
||||||
#define TSDB_DATA_SMALLINT_NULL 0x8000
|
#define TSDB_DATA_SMALLINT_NULL 0x8000
|
||||||
#define TSDB_DATA_INT_NULL 0x80000000
|
#define TSDB_DATA_INT_NULL 0x80000000L
|
||||||
#define TSDB_DATA_BIGINT_NULL 0x8000000000000000L
|
#define TSDB_DATA_BIGINT_NULL 0x8000000000000000L
|
||||||
|
|
||||||
#define TSDB_DATA_FLOAT_NULL 0x7FF00000 // it is an NAN
|
#define TSDB_DATA_FLOAT_NULL 0x7FF00000 // it is an NAN
|
||||||
|
@ -304,7 +304,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf
|
||||||
#define TSDB_MIN_VNODES 64
|
#define TSDB_MIN_VNODES 64
|
||||||
#define TSDB_MAX_VNODES 2048
|
#define TSDB_MAX_VNODES 2048
|
||||||
#define TSDB_MIN_VNODES_PER_DB 2
|
#define TSDB_MIN_VNODES_PER_DB 2
|
||||||
#define TSDB_MAX_VNODES_PER_DB 16
|
#define TSDB_MAX_VNODES_PER_DB 64
|
||||||
|
|
||||||
#define TSDB_DNODE_ROLE_ANY 0
|
#define TSDB_DNODE_ROLE_ANY 0
|
||||||
#define TSDB_DNODE_ROLE_MGMT 1
|
#define TSDB_DNODE_ROLE_MGMT 1
|
||||||
|
|
|
@ -98,7 +98,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_ACTION_IN_PROGRESS, 0, 0x0212, "Action in
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DISCONNECTED, 0, 0x0213, "Disconnected from service")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DISCONNECTED, 0, 0x0213, "Disconnected from service")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_WRITE_AUTH, 0, 0x0214, "No write permission")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_WRITE_AUTH, 0, 0x0214, "No write permission")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_CONN_KILLED, 0, 0x0215, "Connection killed")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_CONN_KILLED, 0, 0x0215, "Connection killed")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_SQL_SYNTAX_ERROR, 0, 0x0216, "Syntax errr in SQL")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_SQL_SYNTAX_ERROR, 0, 0x0216, "Syntax error in SQL")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DB_NOT_SELECTED, 0, 0x0217, "Database not specified or available")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_TABLE_NAME, 0, 0x0218, "Table does not exist")
|
||||||
|
|
||||||
// mnode
|
// mnode
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_MSG_NOT_PROCESSED, 0, 0x0300, "Message not processed")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_MSG_NOT_PROCESSED, 0, 0x0300, "Message not processed")
|
||||||
|
|
|
@ -63,9 +63,10 @@ static FORCE_INLINE int64_t taosGetTimestamp(int32_t precision) {
|
||||||
|
|
||||||
|
|
||||||
typedef struct SInterval {
|
typedef struct SInterval {
|
||||||
char intervalUnit;
|
int32_t tz; // query client timezone
|
||||||
char slidingUnit;
|
char intervalUnit;
|
||||||
char offsetUnit;
|
char slidingUnit;
|
||||||
|
char offsetUnit;
|
||||||
int64_t interval;
|
int64_t interval;
|
||||||
int64_t sliding;
|
int64_t sliding;
|
||||||
int64_t offset;
|
int64_t offset;
|
||||||
|
|
|
@ -481,7 +481,7 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio
|
||||||
start = (int64_t)(mktime(&tm) * TSDB_TICK_PER_SECOND(precision));
|
start = (int64_t)(mktime(&tm) * TSDB_TICK_PER_SECOND(precision));
|
||||||
} else {
|
} else {
|
||||||
int64_t delta = t - pInterval->interval;
|
int64_t delta = t - pInterval->interval;
|
||||||
int32_t factor = delta > 0 ? 1 : -1;
|
int32_t factor = (delta >= 0) ? 1 : -1;
|
||||||
|
|
||||||
start = (delta / pInterval->sliding + factor) * pInterval->sliding;
|
start = (delta / pInterval->sliding + factor) * pInterval->sliding;
|
||||||
|
|
||||||
|
|
|
@ -2225,10 +2225,11 @@ static bool overlapWithTimeWindow(SQuery* pQuery, SDataBlockInfo* pBlockInfo) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle, SDataBlockInfo* pBlockInfo, SDataStatis **pStatis, SArray** pDataBlock, uint32_t* status) {
|
int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo * pWindowResInfo, void* pQueryHandle, SDataBlockInfo* pBlockInfo, SDataStatis **pStatis, SArray** pDataBlock, uint32_t* status) {
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
*status = 0;
|
*status = BLK_DATA_NO_NEEDED;
|
||||||
|
|
||||||
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf > 0) {
|
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf > 0) {
|
||||||
*status = BLK_DATA_ALL_NEEDED;
|
*status = BLK_DATA_ALL_NEEDED;
|
||||||
} else { // check if this data block is required to load
|
} else { // check if this data block is required to load
|
||||||
|
@ -2240,12 +2241,26 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle,
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((*status) != BLK_DATA_ALL_NEEDED) {
|
if ((*status) != BLK_DATA_ALL_NEEDED) {
|
||||||
|
// the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet,
|
||||||
|
// the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer
|
||||||
|
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
||||||
|
bool hasTimeWindow = false;
|
||||||
|
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
|
||||||
|
|
||||||
|
TSKEY k = QUERY_IS_ASC_QUERY(pQuery)? pBlockInfo->window.skey:pBlockInfo->window.ekey;
|
||||||
|
|
||||||
|
STimeWindow win = getActiveTimeWindow(pWindowResInfo, k, pQuery);
|
||||||
|
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pBlockInfo->tid, &win, masterScan, &hasTimeWindow) !=
|
||||||
|
TSDB_CODE_SUCCESS) {
|
||||||
|
// todo handle error in set result for timewindow
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||||
SSqlFuncMsg* pSqlFunc = &pQuery->pSelectExpr[i].base;
|
SSqlFuncMsg* pSqlFunc = &pQuery->pSelectExpr[i].base;
|
||||||
|
|
||||||
int32_t functionId = pSqlFunc->functionId;
|
int32_t functionId = pSqlFunc->functionId;
|
||||||
int32_t colId = pSqlFunc->colInfo.colId;
|
int32_t colId = pSqlFunc->colInfo.colId;
|
||||||
|
|
||||||
(*status) |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], pBlockInfo->window.skey, pBlockInfo->window.ekey, colId);
|
(*status) |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], pBlockInfo->window.skey, pBlockInfo->window.ekey, colId);
|
||||||
if (((*status) & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) {
|
if (((*status) & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) {
|
||||||
break;
|
break;
|
||||||
|
@ -2476,7 +2491,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
SArray * pDataBlock = NULL;
|
SArray * pDataBlock = NULL;
|
||||||
uint32_t status = 0;
|
uint32_t status = 0;
|
||||||
|
|
||||||
int32_t ret = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis, &pDataBlock, &status);
|
int32_t ret = loadDataBlockOnDemand(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pQueryHandle, &blockInfo, &pStatis, &pDataBlock, &status);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -4667,18 +4682,17 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
|
||||||
setEnvForEachBlock(pQInfo, *pTableQueryInfo, &blockInfo);
|
setEnvForEachBlock(pQInfo, *pTableQueryInfo, &blockInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
SDataStatis *pStatis = NULL;
|
|
||||||
SArray * pDataBlock = NULL;
|
|
||||||
uint32_t status = 0;
|
uint32_t status = 0;
|
||||||
|
SDataStatis *pStatis = NULL;
|
||||||
|
SArray *pDataBlock = NULL;
|
||||||
|
|
||||||
int32_t ret = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis, &pDataBlock, &status);
|
int32_t ret = loadDataBlockOnDemand(pRuntimeEnv, &pQuery->current->windowResInfo, pQueryHandle, &blockInfo, &pStatis, &pDataBlock, &status);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (status == BLK_DATA_DISCARD) {
|
if (status == BLK_DATA_DISCARD) {
|
||||||
pQuery->current->lastKey =
|
pQuery->current->lastKey = QUERY_IS_ASC_QUERY(pQuery)? blockInfo.window.ekey + step : blockInfo.window.skey + step;
|
||||||
QUERY_IS_ASC_QUERY(pQuery) ? blockInfo.window.ekey + step : blockInfo.window.skey + step;
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -195,7 +195,7 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen);
|
||||||
static void rpcSendReqHead(SRpcConn *pConn);
|
static void rpcSendReqHead(SRpcConn *pConn);
|
||||||
|
|
||||||
static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv);
|
static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv);
|
||||||
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead);
|
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext);
|
||||||
static void rpcProcessConnError(void *param, void *id);
|
static void rpcProcessConnError(void *param, void *id);
|
||||||
static void rpcProcessRetryTimer(void *, void *);
|
static void rpcProcessRetryTimer(void *, void *);
|
||||||
static void rpcProcessIdleTimer(void *param, void *tmrId);
|
static void rpcProcessIdleTimer(void *param, void *tmrId);
|
||||||
|
@ -881,17 +881,32 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) {
|
||||||
pConn->outType = 0;
|
pConn->outType = 0;
|
||||||
pConn->pReqMsg = NULL;
|
pConn->pReqMsg = NULL;
|
||||||
pConn->reqMsgLen = 0;
|
pConn->reqMsgLen = 0;
|
||||||
|
SRpcReqContext *pContext = pConn->pContext;
|
||||||
|
|
||||||
|
if (pHead->code == TSDB_CODE_RPC_REDIRECT) {
|
||||||
|
if (rpcContLenFromMsg(pHead->msgLen) < sizeof(SRpcEpSet)) {
|
||||||
|
// if EpSet is not included in the msg, treat it as NOT_READY
|
||||||
|
pHead->code = TSDB_CODE_RPC_NOT_READY;
|
||||||
|
} else {
|
||||||
|
pContext->redirect++;
|
||||||
|
if (pContext->redirect > TSDB_MAX_REPLICA) {
|
||||||
|
pHead->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
||||||
|
tWarn("%s, too many redirects, quit", pConn->info);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
|
static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqContext **ppContext) {
|
||||||
int32_t sid;
|
int32_t sid;
|
||||||
SRpcConn *pConn = NULL;
|
SRpcConn *pConn = NULL;
|
||||||
|
|
||||||
SRpcHead *pHead = (SRpcHead *)pRecv->msg;
|
SRpcHead *pHead = (SRpcHead *)pRecv->msg;
|
||||||
|
|
||||||
sid = htonl(pHead->destId);
|
sid = htonl(pHead->destId);
|
||||||
|
*ppContext = NULL;
|
||||||
|
|
||||||
if (pHead->msgType >= TSDB_MSG_TYPE_MAX || pHead->msgType <= 0) {
|
if (pHead->msgType >= TSDB_MSG_TYPE_MAX || pHead->msgType <= 0) {
|
||||||
tDebug("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType);
|
tDebug("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType);
|
||||||
|
@ -945,6 +960,11 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
|
||||||
pConn->pIdleTimer = taosTmrStart(rpcProcessIdleTimer, tsRpcTimer*2, pConn, pRpc->tmrCtrl);
|
pConn->pIdleTimer = taosTmrStart(rpcProcessIdleTimer, tsRpcTimer*2, pConn, pRpc->tmrCtrl);
|
||||||
} else {
|
} else {
|
||||||
terrno = rpcProcessRspHead(pConn, pHead);
|
terrno = rpcProcessRspHead(pConn, pHead);
|
||||||
|
if (terrno == 0) {
|
||||||
|
SRpcReqContext *pContext = pConn->pContext;
|
||||||
|
*ppContext = pContext;
|
||||||
|
pConn->pContext = NULL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1009,7 +1029,8 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
|
||||||
}
|
}
|
||||||
|
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
pConn = rpcProcessMsgHead(pRpc, pRecv);
|
SRpcReqContext *pContext;
|
||||||
|
pConn = rpcProcessMsgHead(pRpc, pRecv, &pContext);
|
||||||
|
|
||||||
if (pHead->msgType >= 1 && pHead->msgType < TSDB_MSG_TYPE_MAX) {
|
if (pHead->msgType >= 1 && pHead->msgType < TSDB_MSG_TYPE_MAX) {
|
||||||
tDebug("%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label,
|
tDebug("%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label,
|
||||||
|
@ -1029,7 +1050,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
|
||||||
tDebug("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType+1], code);
|
tDebug("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType+1], code);
|
||||||
}
|
}
|
||||||
} else { // msg is passed to app only parsing is ok
|
} else { // msg is passed to app only parsing is ok
|
||||||
rpcProcessIncomingMsg(pConn, pHead);
|
rpcProcessIncomingMsg(pConn, pHead, pContext);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1060,7 +1081,7 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
|
||||||
rpcFreeCont(pContext->pCont);
|
rpcFreeCont(pContext->pCont);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
|
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) {
|
||||||
|
|
||||||
SRpcInfo *pRpc = pConn->pRpc;
|
SRpcInfo *pRpc = pConn->pRpc;
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
|
@ -1070,9 +1091,9 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
|
||||||
rpcMsg.pCont = pHead->content;
|
rpcMsg.pCont = pHead->content;
|
||||||
rpcMsg.msgType = pHead->msgType;
|
rpcMsg.msgType = pHead->msgType;
|
||||||
rpcMsg.code = pHead->code;
|
rpcMsg.code = pHead->code;
|
||||||
rpcMsg.ahandle = pConn->ahandle;
|
|
||||||
|
|
||||||
if ( rpcIsReq(pHead->msgType) ) {
|
if ( rpcIsReq(pHead->msgType) ) {
|
||||||
|
rpcMsg.ahandle = pConn->ahandle;
|
||||||
if (rpcMsg.contLen > 0) {
|
if (rpcMsg.contLen > 0) {
|
||||||
rpcMsg.handle = pConn;
|
rpcMsg.handle = pConn;
|
||||||
rpcAddRef(pRpc); // add the refCount for requests
|
rpcAddRef(pRpc); // add the refCount for requests
|
||||||
|
@ -1089,10 +1110,8 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// it's a response
|
// it's a response
|
||||||
SRpcReqContext *pContext = pConn->pContext;
|
|
||||||
rpcMsg.handle = pContext;
|
rpcMsg.handle = pContext;
|
||||||
pConn->pContext = NULL;
|
rpcMsg.ahandle = pContext->ahandle;
|
||||||
pConn->pReqMsg = NULL;
|
|
||||||
|
|
||||||
// for UDP, port may be changed by server, the port in epSet shall be used for cache
|
// for UDP, port may be changed by server, the port in epSet shall be used for cache
|
||||||
if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) {
|
if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) {
|
||||||
|
@ -1101,19 +1120,6 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
|
||||||
rpcCloseConn(pConn);
|
rpcCloseConn(pConn);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pHead->code == TSDB_CODE_RPC_REDIRECT) {
|
|
||||||
if (rpcMsg.contLen < sizeof(SRpcEpSet)) {
|
|
||||||
// if EpSet is not included in the msg, treat it as NOT_READY
|
|
||||||
pHead->code = TSDB_CODE_RPC_NOT_READY;
|
|
||||||
} else {
|
|
||||||
pContext->redirect++;
|
|
||||||
if (pContext->redirect > TSDB_MAX_REPLICA) {
|
|
||||||
pHead->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
|
||||||
tWarn("%s, too many redirects, quit", pConn->info);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pHead->code == TSDB_CODE_RPC_REDIRECT) {
|
if (pHead->code == TSDB_CODE_RPC_REDIRECT) {
|
||||||
pContext->numOfTry = 0;
|
pContext->numOfTry = 0;
|
||||||
SRpcEpSet *pEpSet = (SRpcEpSet*)pHead->content;
|
SRpcEpSet *pEpSet = (SRpcEpSet*)pHead->content;
|
||||||
|
|
|
@ -62,7 +62,7 @@ typedef struct {
|
||||||
char label[TSDB_LABEL_LEN];
|
char label[TSDB_LABEL_LEN];
|
||||||
int numOfThreads;
|
int numOfThreads;
|
||||||
void * shandle;
|
void * shandle;
|
||||||
SThreadObj *pThreadObj;
|
SThreadObj **pThreadObj;
|
||||||
pthread_t thread;
|
pthread_t thread;
|
||||||
} SServerObj;
|
} SServerObj;
|
||||||
|
|
||||||
|
@ -90,7 +90,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
|
||||||
tstrncpy(pServerObj->label, label, sizeof(pServerObj->label));
|
tstrncpy(pServerObj->label, label, sizeof(pServerObj->label));
|
||||||
pServerObj->numOfThreads = numOfThreads;
|
pServerObj->numOfThreads = numOfThreads;
|
||||||
|
|
||||||
pServerObj->pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), numOfThreads);
|
pServerObj->pThreadObj = (SThreadObj **)calloc(sizeof(SThreadObj *), numOfThreads);
|
||||||
if (pServerObj->pThreadObj == NULL) {
|
if (pServerObj->pThreadObj == NULL) {
|
||||||
tError("TCP:%s no enough memory", label);
|
tError("TCP:%s no enough memory", label);
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
@ -104,19 +104,28 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
|
||||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||||
|
|
||||||
// initialize parameters in case it may encounter error later
|
// initialize parameters in case it may encounter error later
|
||||||
pThreadObj = pServerObj->pThreadObj;
|
|
||||||
for (int i = 0; i < numOfThreads; ++i) {
|
for (int i = 0; i < numOfThreads; ++i) {
|
||||||
|
pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), 1);
|
||||||
|
if (pThreadObj == NULL) {
|
||||||
|
tError("TCP:%s no enough memory", label);
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
for (int j=0; j<i; ++j) free(pServerObj->pThreadObj[j]);
|
||||||
|
free(pServerObj->pThreadObj);
|
||||||
|
free(pServerObj);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pServerObj->pThreadObj[i] = pThreadObj;
|
||||||
pThreadObj->pollFd = -1;
|
pThreadObj->pollFd = -1;
|
||||||
taosResetPthread(&pThreadObj->thread);
|
taosResetPthread(&pThreadObj->thread);
|
||||||
pThreadObj->processData = fp;
|
pThreadObj->processData = fp;
|
||||||
tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label));
|
tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label));
|
||||||
pThreadObj->shandle = shandle;
|
pThreadObj->shandle = shandle;
|
||||||
pThreadObj++;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// initialize mutex, thread, fd which may fail
|
// initialize mutex, thread, fd which may fail
|
||||||
pThreadObj = pServerObj->pThreadObj;
|
|
||||||
for (int i = 0; i < numOfThreads; ++i) {
|
for (int i = 0; i < numOfThreads; ++i) {
|
||||||
|
pThreadObj = pServerObj->pThreadObj[i];
|
||||||
code = pthread_mutex_init(&(pThreadObj->mutex), NULL);
|
code = pthread_mutex_init(&(pThreadObj->mutex), NULL);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno));
|
tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno));
|
||||||
|
@ -137,7 +146,6 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
|
||||||
}
|
}
|
||||||
|
|
||||||
pThreadObj->threadId = i;
|
pThreadObj->threadId = i;
|
||||||
pThreadObj++;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pServerObj->fd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
|
pServerObj->fd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
|
||||||
|
@ -166,6 +174,11 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) {
|
||||||
pThreadObj->stop = true;
|
pThreadObj->stop = true;
|
||||||
eventfd_t fd = -1;
|
eventfd_t fd = -1;
|
||||||
|
|
||||||
|
if (taosComparePthread(pThreadObj->thread, pthread_self())) {
|
||||||
|
pthread_detach(pthread_self());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (taosCheckPthreadValid(pThreadObj->thread) && pThreadObj->pollFd >= 0) {
|
if (taosCheckPthreadValid(pThreadObj->thread) && pThreadObj->pollFd >= 0) {
|
||||||
// signal the thread to stop, try graceful method first,
|
// signal the thread to stop, try graceful method first,
|
||||||
// and use pthread_cancel when failed
|
// and use pthread_cancel when failed
|
||||||
|
@ -183,15 +196,11 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosCheckPthreadValid(pThreadObj->thread)) pthread_join(pThreadObj->thread, NULL);
|
if (taosCheckPthreadValid(pThreadObj->thread) && pThreadObj->pollFd >= 0) {
|
||||||
if (pThreadObj->pollFd >=0) taosCloseSocket(pThreadObj->pollFd);
|
pthread_join(pThreadObj->thread, NULL);
|
||||||
if (fd != -1) taosCloseSocket(fd);
|
|
||||||
|
|
||||||
while (pThreadObj->pHead) {
|
|
||||||
SFdObj *pFdObj = pThreadObj->pHead;
|
|
||||||
pThreadObj->pHead = pFdObj->next;
|
|
||||||
taosFreeFdObj(pFdObj);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (fd != -1) taosCloseSocket(fd);
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosStopTcpServer(void *handle) {
|
void taosStopTcpServer(void *handle) {
|
||||||
|
@ -199,7 +208,14 @@ void taosStopTcpServer(void *handle) {
|
||||||
|
|
||||||
if (pServerObj == NULL) return;
|
if (pServerObj == NULL) return;
|
||||||
if(pServerObj->fd >=0) shutdown(pServerObj->fd, SHUT_RD);
|
if(pServerObj->fd >=0) shutdown(pServerObj->fd, SHUT_RD);
|
||||||
if (taosCheckPthreadValid(pServerObj->thread)) pthread_join(pServerObj->thread, NULL);
|
|
||||||
|
if (taosCheckPthreadValid(pServerObj->thread)) {
|
||||||
|
if (taosComparePthread(pServerObj->thread, pthread_self())) {
|
||||||
|
pthread_detach(pthread_self());
|
||||||
|
} else {
|
||||||
|
pthread_join(pServerObj->thread, NULL);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
tDebug("%s TCP server is stopped", pServerObj->label);
|
tDebug("%s TCP server is stopped", pServerObj->label);
|
||||||
}
|
}
|
||||||
|
@ -210,9 +226,8 @@ void taosCleanUpTcpServer(void *handle) {
|
||||||
if (pServerObj == NULL) return;
|
if (pServerObj == NULL) return;
|
||||||
|
|
||||||
for (int i = 0; i < pServerObj->numOfThreads; ++i) {
|
for (int i = 0; i < pServerObj->numOfThreads; ++i) {
|
||||||
pThreadObj = pServerObj->pThreadObj + i;
|
pThreadObj = pServerObj->pThreadObj[i];
|
||||||
taosStopTcpThread(pThreadObj);
|
taosStopTcpThread(pThreadObj);
|
||||||
pthread_mutex_destroy(&(pThreadObj->mutex));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tDebug("%s TCP server is cleaned up", pServerObj->label);
|
tDebug("%s TCP server is cleaned up", pServerObj->label);
|
||||||
|
@ -249,7 +264,7 @@ static void *taosAcceptTcpConnection(void *arg) {
|
||||||
taosSetSockOpt(connFd, SOL_SOCKET, SO_RCVTIMEO, &to, sizeof(to));
|
taosSetSockOpt(connFd, SOL_SOCKET, SO_RCVTIMEO, &to, sizeof(to));
|
||||||
|
|
||||||
// pick up the thread to handle this connection
|
// pick up the thread to handle this connection
|
||||||
pThreadObj = pServerObj->pThreadObj + threadId;
|
pThreadObj = pServerObj->pThreadObj[threadId];
|
||||||
|
|
||||||
SFdObj *pFdObj = taosMallocFdObj(pThreadObj, connFd);
|
SFdObj *pFdObj = taosMallocFdObj(pThreadObj, connFd);
|
||||||
if (pFdObj) {
|
if (pFdObj) {
|
||||||
|
@ -327,10 +342,8 @@ void taosCleanUpTcpClient(void *chandle) {
|
||||||
SThreadObj *pThreadObj = chandle;
|
SThreadObj *pThreadObj = chandle;
|
||||||
if (pThreadObj == NULL) return;
|
if (pThreadObj == NULL) return;
|
||||||
|
|
||||||
|
tDebug ("%s TCP client will be cleaned up", pThreadObj->label);
|
||||||
taosStopTcpThread(pThreadObj);
|
taosStopTcpThread(pThreadObj);
|
||||||
tDebug ("%s TCP client is cleaned up", pThreadObj->label);
|
|
||||||
|
|
||||||
taosTFree(pThreadObj);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) {
|
void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) {
|
||||||
|
@ -365,7 +378,7 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin
|
||||||
|
|
||||||
void taosCloseTcpConnection(void *chandle) {
|
void taosCloseTcpConnection(void *chandle) {
|
||||||
SFdObj *pFdObj = chandle;
|
SFdObj *pFdObj = chandle;
|
||||||
if (pFdObj == NULL) return;
|
if (pFdObj == NULL || pFdObj->signature != pFdObj) return;
|
||||||
|
|
||||||
SThreadObj *pThreadObj = pFdObj->pThreadObj;
|
SThreadObj *pThreadObj = pFdObj->pThreadObj;
|
||||||
tDebug("%s %p TCP connection will be closed, FD:%p", pThreadObj->label, pFdObj->thandle, pFdObj);
|
tDebug("%s %p TCP connection will be closed, FD:%p", pThreadObj->label, pFdObj->thandle, pFdObj);
|
||||||
|
@ -378,7 +391,7 @@ void taosCloseTcpConnection(void *chandle) {
|
||||||
int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) {
|
int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) {
|
||||||
SFdObj *pFdObj = chandle;
|
SFdObj *pFdObj = chandle;
|
||||||
|
|
||||||
if (chandle == NULL) return -1;
|
if (pFdObj == NULL || pFdObj->signature != pFdObj) return -1;
|
||||||
|
|
||||||
return taosWriteMsg(pFdObj->fd, data, len);
|
return taosWriteMsg(pFdObj->fd, data, len);
|
||||||
}
|
}
|
||||||
|
@ -503,8 +516,22 @@ static void *taosProcessTcpData(void *param) {
|
||||||
pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo);
|
pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo);
|
||||||
if (pFdObj->thandle == NULL) taosFreeFdObj(pFdObj);
|
if (pFdObj->thandle == NULL) taosFreeFdObj(pFdObj);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pThreadObj->stop) break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pThreadObj->pollFd >=0) taosCloseSocket(pThreadObj->pollFd);
|
||||||
|
|
||||||
|
while (pThreadObj->pHead) {
|
||||||
|
SFdObj *pFdObj = pThreadObj->pHead;
|
||||||
|
pThreadObj->pHead = pFdObj->next;
|
||||||
|
taosFreeFdObj(pFdObj);
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_mutex_destroy(&(pThreadObj->mutex));
|
||||||
|
tDebug("%s TCP thread exits ...", pThreadObj->label);
|
||||||
|
taosTFree(pThreadObj);
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -219,7 +219,10 @@ static void *taosProcessTcpData(void *param) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pThread->stop) break;
|
||||||
}
|
}
|
||||||
|
|
||||||
uDebug("%p TCP epoll thread exits", pThread);
|
uDebug("%p TCP epoll thread exits", pThread);
|
||||||
|
@ -321,5 +324,5 @@ static void taosStopPoolThread(SThreadObj *pThread) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_join(thread, NULL);
|
pthread_join(thread, NULL);
|
||||||
taosClose(fd);
|
if (fd >= 0) taosClose(fd);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1266,6 +1266,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
|
||||||
cur->pos >= 0 && cur->pos < pBlock->numOfRows);
|
cur->pos >= 0 && cur->pos < pBlock->numOfRows);
|
||||||
|
|
||||||
TSKEY* tsArray = pCols->cols[0].pData;
|
TSKEY* tsArray = pCols->cols[0].pData;
|
||||||
|
assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == pBlock->keyFirst && tsArray[pBlock->numOfRows-1] == pBlock->keyLast);
|
||||||
|
|
||||||
// for search the endPos, so the order needs to reverse
|
// for search the endPos, so the order needs to reverse
|
||||||
int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
|
int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
|
||||||
|
|
|
@ -26,7 +26,7 @@ extern "C" {
|
||||||
#define COMP_OVERFLOW_BYTES 2
|
#define COMP_OVERFLOW_BYTES 2
|
||||||
#define BITS_PER_BYTE 8
|
#define BITS_PER_BYTE 8
|
||||||
// Masks
|
// Masks
|
||||||
#define INT64MASK(_x) ((1ul << _x) - 1)
|
#define INT64MASK(_x) ((((uint64_t)1) << _x) - 1)
|
||||||
#define INT32MASK(_x) (((uint32_t)1 << _x) - 1)
|
#define INT32MASK(_x) (((uint32_t)1 << _x) - 1)
|
||||||
#define INT8MASK(_x) (((uint8_t)1 << _x) - 1)
|
#define INT8MASK(_x) (((uint8_t)1 << _x) - 1)
|
||||||
// Compression algorithm
|
// Compression algorithm
|
||||||
|
|
|
@ -1516,6 +1516,8 @@ class Task():
|
||||||
if errno in [
|
if errno in [
|
||||||
0x05, # TSDB_CODE_RPC_NOT_READY
|
0x05, # TSDB_CODE_RPC_NOT_READY
|
||||||
# 0x200, # invalid SQL, TODO: re-examine with TD-934
|
# 0x200, # invalid SQL, TODO: re-examine with TD-934
|
||||||
|
0x217, # "db not selected", client side defined error code
|
||||||
|
0x218, # "Table does not exist" client side defined error code
|
||||||
0x360, 0x362,
|
0x360, 0x362,
|
||||||
0x369, # tag already exists
|
0x369, # tag already exists
|
||||||
0x36A, 0x36B, 0x36D,
|
0x36A, 0x36B, 0x36D,
|
||||||
|
|
|
@ -56,13 +56,13 @@ class TDTestCase:
|
||||||
tdDnodes.stop(1)
|
tdDnodes.stop(1)
|
||||||
tdDnodes.setTestCluster(False)
|
tdDnodes.setTestCluster(False)
|
||||||
tdDnodes.setValgrind(False)
|
tdDnodes.setValgrind(False)
|
||||||
tdDnodes.addSimExtraCfg("maxSQLLength", "1048576")
|
|
||||||
tdDnodes.deploy(1)
|
tdDnodes.deploy(1)
|
||||||
tdDnodes.cfg(1, "maxSQLLength", "1048576")
|
|
||||||
tdLog.sleep(20)
|
tdLog.sleep(20)
|
||||||
tdDnodes.start(1)
|
tdDnodes.start(1)
|
||||||
|
tdDnodes.addSimExtraCfg("maxSQLLength", "1048576")
|
||||||
|
|
||||||
|
|
||||||
|
tdSql.close()
|
||||||
tdSql.prepare()
|
tdSql.prepare()
|
||||||
tdSql.execute("create table tb(ts timestamp, name1 binary(1000), name2 binary(1000), name3 binary(1000))")
|
tdSql.execute("create table tb(ts timestamp, name1 binary(1000), name2 binary(1000), name3 binary(1000))")
|
||||||
|
|
||||||
|
@ -75,14 +75,14 @@ class TDTestCase:
|
||||||
tdSql.query("select * from tb")
|
tdSql.query("select * from tb")
|
||||||
tdSql.checkRows(43)
|
tdSql.checkRows(43)
|
||||||
|
|
||||||
# self.ts += 43
|
self.ts += 43
|
||||||
# for i in range(330):
|
for i in range(330):
|
||||||
# value = self.get_random_string(1000)
|
value = self.get_random_string(1000)
|
||||||
# sql += "(%d, '%s', '%s', '%s')" % (self.ts + i, value, value, value)
|
sql += "(%d, '%s', '%s', '%s')" % (self.ts + i, value, value, value)
|
||||||
# tdSql.execute(sql)
|
tdSql.execute(sql)
|
||||||
|
|
||||||
# tdSql.query("select * from tb")
|
tdSql.query("select * from tb")
|
||||||
# tdSql.checkRows(379)
|
tdSql.checkRows(379)
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
tdSql.close()
|
tdSql.close()
|
||||||
|
|
|
@ -119,7 +119,7 @@ endi
|
||||||
|
|
||||||
system_content curl -H 'Authorization: Taosd /KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04' -d ' used1' 127.0.0.1:7111/rest/sql
|
system_content curl -H 'Authorization: Taosd /KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04' -d ' used1' 127.0.0.1:7111/rest/sql
|
||||||
print 17-> $system_content
|
print 17-> $system_content
|
||||||
if $system_content != @{"status":"error","code":534,"desc":"Syntax errr in SQL"}@ then
|
if $system_content != @{"status":"error","code":534,"desc":"Syntax error in SQL"}@ then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
|
|
@ -213,4 +213,53 @@ if $data01 != 5195.000000000 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
print =======================>td-1596
|
||||||
|
sql create table t2(ts timestamp, k int)
|
||||||
|
sql insert into t2 values('2020-1-2 1:1:1', 1);
|
||||||
|
sql insert into t2 values('2020-2-2 1:1:1', 1);
|
||||||
|
|
||||||
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
|
system sh/exec.sh -n dnode1 -s start
|
||||||
|
sql connect
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
|
sql use db
|
||||||
|
sql select count(*), first(ts), last(ts) from t2 interval(1d);
|
||||||
|
if $rows != 2 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data00 != @20-01-02 00:00:00.000@ then
|
||||||
|
print expect 20-01-02 00:00:00.000, actual: $data00
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data10 != @20-02-02 00:00:00.000@ then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 1 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data11 != 1 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data02 != @20-01-02 01:01:01.000@ then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data12 != @20-02-02 01:01:01.000@ then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data03 != @20-01-02 01:01:01.000@ then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data13 != @20-02-02 01:01:01.000@ then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
|
@ -238,6 +238,13 @@ if $data11 != @19-01-01 09:10:00.000@ then
|
||||||
endi
|
endi
|
||||||
|
|
||||||
sql create table tb_where_NULL (ts timestamp, c1 float, c2 binary(10))
|
sql create table tb_where_NULL (ts timestamp, c1 float, c2 binary(10))
|
||||||
|
|
||||||
|
print ===================>td-1604
|
||||||
|
sql_error insert into tb_where_NULL values(?, ?, ?)
|
||||||
|
sql_error insert into tb_where_NULL values(now, 1, ?)
|
||||||
|
sql_error insert into tb_where_NULL values(?, 1, '')
|
||||||
|
sql_error insert into tb_where_NULL values(now, ?, '12')
|
||||||
|
|
||||||
sql insert into tb_where_NULL values ('2019-01-01 09:00:00.000', 1, 'val1')
|
sql insert into tb_where_NULL values ('2019-01-01 09:00:00.000', 1, 'val1')
|
||||||
sql insert into tb_where_NULL values ('2019-01-01 09:00:01.000', NULL, NULL)
|
sql insert into tb_where_NULL values ('2019-01-01 09:00:01.000', NULL, NULL)
|
||||||
sql insert into tb_where_NULL values ('2019-01-01 09:00:02.000', 2, 'val2')
|
sql insert into tb_where_NULL values ('2019-01-01 09:00:02.000', 2, 'val2')
|
||||||
|
@ -334,4 +341,5 @@ if $rows != 0 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
|
@ -303,8 +303,8 @@ cd ../../../debug; make
|
||||||
./test.sh -f unique/mnode/mgmt22.sim
|
./test.sh -f unique/mnode/mgmt22.sim
|
||||||
./test.sh -f unique/mnode/mgmt23.sim
|
./test.sh -f unique/mnode/mgmt23.sim
|
||||||
./test.sh -f unique/mnode/mgmt24.sim
|
./test.sh -f unique/mnode/mgmt24.sim
|
||||||
#./test.sh -f unique/mnode/mgmt25.sim
|
./test.sh -f unique/mnode/mgmt25.sim
|
||||||
#./test.sh -f unique/mnode/mgmt26.sim
|
./test.sh -f unique/mnode/mgmt26.sim
|
||||||
./test.sh -f unique/mnode/mgmt33.sim
|
./test.sh -f unique/mnode/mgmt33.sim
|
||||||
./test.sh -f unique/mnode/mgmt34.sim
|
./test.sh -f unique/mnode/mgmt34.sim
|
||||||
./test.sh -f unique/mnode/mgmtr2.sim
|
./test.sh -f unique/mnode/mgmtr2.sim
|
||||||
|
|
|
@ -65,7 +65,7 @@ endi
|
||||||
|
|
||||||
print ============== step4
|
print ============== step4
|
||||||
sql drop dnode $hostname2
|
sql drop dnode $hostname2
|
||||||
sleep 8000
|
sleep 16000
|
||||||
|
|
||||||
sql show mnodes
|
sql show mnodes
|
||||||
$dnode1Role = $data2_1
|
$dnode1Role = $data2_1
|
||||||
|
|
Loading…
Reference in New Issue