Merge branch 'fix/mainNew' into fix/fixTransferCrash2

This commit is contained in:
yihaoDeng 2023-09-08 19:58:21 +08:00
commit af08289a06
84 changed files with 821 additions and 334 deletions

View File

@ -315,7 +315,9 @@ def pre_test_build_win() {
python.exe -m pip install --upgrade pip
python -m pip uninstall taospy -y
python -m pip install taospy==2.7.10
xcopy /e/y/i/f %WIN_INTERNAL_ROOT%\\debug\\build\\lib\\taos.dll C:\\Windows\\System32
python -m pip uninstall taos-ws-py -y
python -m pip install taos-ws-py==0.2.8
xcopy /e/y/i/f %WIN_INTERNAL_ROOT%\\debug\\build\\lib\\taos.dll C:\\Windows\\System32
'''
return 1
}

5
SECURITY.md Normal file
View File

@ -0,0 +1,5 @@
# Security Policy
## Reporting a Vulnerability
Please submit CVE to https://github.com/taosdata/TDengine/security/advisories.

View File

@ -1,7 +1,7 @@
# cos
ExternalProject_Add(mxml
GIT_REPOSITORY https://github.com/michaelrsweet/mxml.git
GIT_TAG release-2.12
GIT_TAG v2.12
SOURCE_DIR "${TD_CONTRIB_DIR}/mxml"
#BINARY_DIR ""
BUILD_IN_SOURCE TRUE

View File

@ -292,11 +292,11 @@ CONCAT_WS(separator_expr, expr1, expr2 [, expr] ...)
LENGTH(expr)
```
**Description**: The length in bytes of a string
**Description**: The length in bytes
**Return value type**: Bigint
**Applicable data types**: VARCHAR and NCHAR fields or columns
**Applicable data types**: VARCHAR and NCHAR and VARBINARY
**Nested query**: It can be used in both the outer query and inner query in a nested query.

View File

@ -10,7 +10,7 @@ TDengine 是一款开源、高性能、云原生的[时序数据库](https://tde
## 主要产品
TDengine 有三个主要产品TDengine Pro (即 TDengine 企业版TDengine Cloud和 TDengine OSS关于它们的具体定义请参考
TDengine 有三个主要产品TDengine Enterprise (即 TDengine 企业版TDengine Cloud和 TDengine OSS关于它们的具体定义请参考
- [TDengine 企业版](https://www.taosdata.com/tdengine-pro)
- [TDengine 云服务](https://cloud.taosdata.com/?utm_source=menu&utm_medium=webcn)
- [TDengine 开源版](https://www.taosdata.com/tdengine-oss)

View File

@ -1004,7 +1004,7 @@ TaosConsumer consumer = new TaosConsumer<>(config);
- httpConnectTimeout: 创建连接超时参数,单位 ms默认为 5000 ms。仅在 WebSocket 连接下有效。
- messageWaitTimeout: 数据传输超时参数,单位 ms默认为 10000 ms。仅在 WebSocket 连接下有效。
- httpPoolSize: 同一个连接下最大并行请求数。仅在 WebSocket 连接下有效。
其他参数请参考:[Consumer 参数列表](../../../develop/tmq#创建-consumer-以及consumer-group)
其他参数请参考:[Consumer 参数列表](../../develop/tmq#创建-consumer-以及consumer-group)
#### 订阅消费数据
@ -1082,7 +1082,7 @@ consumer.unsubscribe();
consumer.close()
```
详情请参考:[数据订阅](../../../develop/tmq)
详情请参考:[数据订阅](../../develop/tmq)
#### 完整示例
@ -1373,7 +1373,7 @@ public static void main(String[] args) throws Exception {
**解决方法** 更换 taos-jdbcdriver 3.0.2+ 版本。
其它问题请参考 [FAQ](../../../train-faq/faq)
其它问题请参考 [FAQ](../../train-faq/faq)
## API 参考

View File

@ -352,7 +352,7 @@ client.put(&sml_data)?
### 数据订阅
TDengine 通过消息队列 [TMQ](../../../taos-sql/tmq/) 启动一个订阅。
TDengine 通过消息队列 [TMQ](../../taos-sql/tmq/) 启动一个订阅。
#### 创建 Topic
@ -491,7 +491,7 @@ let taos = pool.get()?;
## 常见问题
请参考 [FAQ](../../../train-faq/faq)
请参考 [FAQ](../../train-faq/faq)
## API 参考

View File

@ -292,11 +292,11 @@ CONCAT_WS(separator_expr, expr1, expr2 [, expr] ...)
LENGTH(expr)
```
**功能说明**:以字节计数的字符串长度。
**功能说明**:以字节计数的长度。
**返回结果类型**BIGINT。
**适用数据类型**输入参数是 VARCHAR 类型或者 NCHAR 类型的字符串或者列
**适用数据类型**VARCHAR, NCHAR, VARBINARY
**嵌套子查询支持**:适用于内层查询和外层查询。

View File

@ -13,7 +13,7 @@ taosBenchmark (曾用名 taosdemo ) 是一个用于测试 TDengine 产品性能
taosBenchmark 有两种安装方式:
- 安装 TDengine 官方安装包的同时会自动安装 taosBenchmark, 详情请参考[ TDengine 安装](../../operation/pkg-install)。
- 安装 TDengine 官方安装包的同时会自动安装 taosBenchmark, 详情请参考 [TDengine 安装](../../get-started/)。
- 单独编译 taos-tools 并安装, 详情请参考 [taos-tools](https://github.com/taosdata/taos-tools) 仓库。

View File

@ -16,7 +16,7 @@ taosKeeper 是 TDengine 3.0 版本监控指标的导出工具,通过简单的
taosKeeper 有两种安装方式:
taosKeeper 安装方式:
- 安装 TDengine 官方安装包的同时会自动安装 taosKeeper, 详情请参考[ TDengine 安装](../../operation/pkg-install)。
- 安装 TDengine 官方安装包的同时会自动安装 taosKeeper, 详情请参考[ TDengine 安装](../../get-started/)。
- 单独编译 taosKeeper 并安装,详情请参考 [taosKeeper](https://github.com/taosdata/taoskeeper) 仓库。

View File

@ -23,7 +23,7 @@ TDengine Source Connector 用于把数据实时地从 TDengine 读出来发送
1. Linux 操作系统
2. 已安装 Java 8 和 Maven
3. 已安装 Git、curl、vi
4. 已安装并启动 TDengine。如果还没有可参考[安装和卸载](../../operation/pkg-install)
4. 已安装并启动 TDengine。如果还没有可参考[安装和卸载](../../get-started/)
## 安装 Kafka

View File

@ -75,7 +75,7 @@ static FORCE_INLINE int64_t taosGetTimestampToday(int32_t precision) {
int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision);
int64_t taosTimeTruncate(int64_t ts, const SInterval* pInterval);
int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision);
int32_t taosTimeCountIntervalForFill(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision, int32_t order);
int32_t parseAbsoluteDuration(const char* token, int32_t tokenlen, int64_t* ts, char* unit, int32_t timePrecision);
int32_t parseNatualDuration(const char* token, int32_t tokenLen, int64_t* duration, char* unit, int32_t timePrecision);

View File

@ -41,16 +41,16 @@ enum {
STREAM_STATUS__PAUSE,
};
enum {
typedef enum ETaskStatus {
TASK_STATUS__NORMAL = 0,
TASK_STATUS__DROPPING,
TASK_STATUS__FAIL,
TASK_STATUS__UNINIT, // not used, an placeholder
TASK_STATUS__STOP,
TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner
TASK_STATUS__HALT, // pause, but not be manipulated by user command
TASK_STATUS__PAUSE, // pause
TASK_STATUS__CK, // stream task is in checkpoint status, no data are allowed to put into inputQ anymore
};
} ETaskStatus;
enum {
TASK_SCHED_STATUS__INACTIVE = 1,

View File

@ -33,17 +33,14 @@ adapterName="taosadapter"
benchmarkName="taosBenchmark"
dumpName="taosdump"
demoName="taosdemo"
xname="taosx"
clientName2="taos"
serverName2="${clientName2}d"
configFile2="${clientName2}.cfg"
productName2="TDengine"
emailName2="taosdata.com"
xname2="${clientName2}x"
adapterName2="${clientName2}adapter"
explorerName="${clientName2}-explorer"
benchmarkName2="${clientName2}Benchmark"
demoName2="${clientName2}demo"
dumpName2="${clientName2}dump"
@ -218,8 +215,6 @@ function install_bin() {
${csudo}rm -f ${bin_link_dir}/${demoName2} || :
${csudo}rm -f ${bin_link_dir}/${benchmarkName2} || :
${csudo}rm -f ${bin_link_dir}/${dumpName2} || :
${csudo}rm -f ${bin_link_dir}/${xname2} || :
${csudo}rm -f ${bin_link_dir}/${explorerName} || :
${csudo}rm -f ${bin_link_dir}/set_core || :
${csudo}rm -f ${bin_link_dir}/TDinsight.sh || :
@ -233,8 +228,6 @@ function install_bin() {
[ -x ${install_main_dir}/bin/${benchmarkName2} ] && ${csudo}ln -sf ${install_main_dir}/bin/${benchmarkName2} ${bin_link_dir}/${demoName2} || :
[ -x ${install_main_dir}/bin/${benchmarkName2} ] && ${csudo}ln -sf ${install_main_dir}/bin/${benchmarkName2} ${bin_link_dir}/${benchmarkName2} || :
[ -x ${install_main_dir}/bin/${dumpName2} ] && ${csudo}ln -sf ${install_main_dir}/bin/${dumpName2} ${bin_link_dir}/${dumpName2} || :
[ -x ${install_main_dir}/bin/${xname2} ] && ${csudo}ln -sf ${install_main_dir}/bin/${xname2} ${bin_link_dir}/${xname2} || :
[ -x ${install_main_dir}/bin/${explorerName} ] && ${csudo}ln -sf ${install_main_dir}/bin/${explorerName} ${bin_link_dir}/${explorerName} || :
[ -x ${install_main_dir}/bin/TDinsight.sh ] && ${csudo}ln -sf ${install_main_dir}/bin/TDinsight.sh ${bin_link_dir}/TDinsight.sh || :
if [ "$clientName2" == "${clientName}" ]; then
[ -x ${install_main_dir}/bin/remove.sh ] && ${csudo}ln -s ${install_main_dir}/bin/remove.sh ${bin_link_dir}/${uninstallScript} || :
@ -703,26 +696,6 @@ function clean_service_on_systemd() {
# if [ "$verMode" == "cluster" ] && [ "$clientName" != "$clientName2" ]; then
# ${csudo}rm -f ${service_config_dir}/${serverName2}.service
# fi
x_service_config="${service_config_dir}/${xName2}.service"
if [ -e "$x_service_config" ]; then
if systemctl is-active --quiet ${xName2}; then
echo "${productName2} ${xName2} is running, stopping it..."
${csudo}systemctl stop ${xName2} &>/dev/null || echo &>/dev/null
fi
${csudo}systemctl disable ${xName2} &>/dev/null || echo &>/dev/null
${csudo}rm -f ${x_service_config}
fi
explorer_service_config="${service_config_dir}/${explorerName2}.service"
if [ -e "$explorer_service_config" ]; then
if systemctl is-active --quiet ${explorerName2}; then
echo "${productName2} ${explorerName2} is running, stopping it..."
${csudo}systemctl stop ${explorerName2} &>/dev/null || echo &>/dev/null
fi
${csudo}systemctl disable ${explorerName2} &>/dev/null || echo &>/dev/null
${csudo}rm -f ${explorer_service_config}
${csudo}rm -f /etc/${clientName2}/explorer.toml
fi
}
function install_service_on_systemd() {

View File

@ -338,7 +338,7 @@ if [ "$verMode" == "cluster" ]; then
tmp_pwd=`pwd`
cd ${install_dir}/connector
if [ ! -d taos-connector-jdbc ];then
git clone -b 3.2.1 --depth=1 https://github.com/taosdata/taos-connector-jdbc.git ||:
git clone -b main --depth=1 https://github.com/taosdata/taos-connector-jdbc.git ||:
fi
cd taos-connector-jdbc
mvn clean package -Dmaven.test.skip=true

View File

@ -380,8 +380,7 @@ void destroySubRequests(SRequestObj *pRequest) {
pReqList[++reqIdx] = pTmp;
releaseRequest(tmpRefId);
} else {
tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, tmpRefId,
pTmp->requestId);
tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId);
break;
}
}
@ -398,7 +397,7 @@ void destroySubRequests(SRequestObj *pRequest) {
removeRequest(pTmp->self);
releaseRequest(pTmp->self);
} else {
tscError("0x%" PRIx64 " is not there", tmpRefId);
tscError("next req ref 0x%" PRIx64 " is not there", tmpRefId);
break;
}
}
@ -492,8 +491,7 @@ void stopAllQueries(SRequestObj *pRequest) {
pReqList[++reqIdx] = pTmp;
releaseRequest(tmpRefId);
} else {
tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, tmpRefId,
pTmp->requestId);
tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId);
break;
}
}
@ -512,7 +510,7 @@ void stopAllQueries(SRequestObj *pRequest) {
taosStopQueryImpl(pTmp);
releaseRequest(pTmp->self);
} else {
tscError("0x%" PRIx64 " is not there", tmpRefId);
tscError("next req ref 0x%" PRIx64 " is not there", tmpRefId);
break;
}
}

View File

@ -874,8 +874,13 @@ void handleSubQueryFromAnalyse(SSqlCallbackWrapper *pWrapper, SMetaData *pResult
if (TSDB_CODE_SUCCESS == code) {
code = cloneCatalogReq(&pNewWrapper->pCatalogReq, pWrapper->pCatalogReq);
}
doAsyncQueryFromAnalyse(pResultMeta, pNewWrapper, code);
nodesDestroyNode(pRoot);
if (TSDB_CODE_SUCCESS == code) {
doAsyncQueryFromAnalyse(pResultMeta, pNewWrapper, code);
nodesDestroyNode(pRoot);
} else {
handleQueryAnslyseRes(pWrapper, pResultMeta, code);
return;
}
}
void handleQueryAnslyseRes(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta, int32_t code) {
@ -1148,8 +1153,7 @@ void restartAsyncQuery(SRequestObj *pRequest, int32_t code) {
pReqList[++reqIdx] = pTmp;
releaseRequest(tmpRefId);
} else {
tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, tmpRefId,
pTmp->requestId);
tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId);
break;
}
}
@ -1162,7 +1166,7 @@ void restartAsyncQuery(SRequestObj *pRequest, int32_t code) {
removeRequest(pTmp->self);
releaseRequest(pTmp->self);
} else {
tscError("0x%" PRIx64 " is not there", tmpRefId);
tscError("next req ref 0x%" PRIx64 " is not there", tmpRefId);
break;
}
}

View File

@ -692,34 +692,67 @@ int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision) {
return (int64_t)(taosMktime(&tm) * TSDB_TICK_PER_SECOND(precision) + fraction);
}
int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision) {
/**
* @brief calc how many windows after filling between skey and ekey
* @notes for asc order
* skey ---> ekey
* ^ ^
* _____!_____.........._____|_____..
* |__1__)
* |__2__)...-->|_ret+1_)
* skey + ret * interval <= ekey
* skey + ret * interval + interval > ekey
* ======> (ekey - skey - interval) / interval < ret <= (ekey - skey) / interval
* For keys from blocks which do not need filling, skey + ret * interval == ekey.
* For keys need filling, skey + ret * interval <= ekey.
* Total num of windows is ret + 1(the last window)
*
* for desc order
* skey <--- ekey
* ^ ^
* _____|____..........______!____...
* |_first_)
* |__1__)
* |_ret_)<--...|__2__)
* skey >= ekey - ret * interval
* skey < ekey - ret * interval + interval
*=======> (ekey - skey) / interval <= ret < (ekey - skey + interval) / interval
* For keys from blocks which do not need filling, skey == ekey - ret * interval.
* For keys need filling, skey >= ekey - ret * interval.
* Total num of windows is ret + 1(the first window)
*/
int32_t taosTimeCountIntervalForFill(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision,
int32_t order) {
if (ekey < skey) {
int64_t tmp = ekey;
ekey = skey;
skey = tmp;
}
int32_t ret;
if (unit != 'n' && unit != 'y') {
return (int32_t)((ekey - skey) / interval);
ret = (int32_t)((ekey - skey) / interval);
if (order == TSDB_ORDER_DESC && ret * interval < (ekey - skey)) ret += 1;
} else {
skey /= (int64_t)(TSDB_TICK_PER_SECOND(precision));
ekey /= (int64_t)(TSDB_TICK_PER_SECOND(precision));
struct tm tm;
time_t t = (time_t)skey;
taosLocalTime(&t, &tm, NULL);
int32_t smon = tm.tm_year * 12 + tm.tm_mon;
t = (time_t)ekey;
taosLocalTime(&t, &tm, NULL);
int32_t emon = tm.tm_year * 12 + tm.tm_mon;
if (unit == 'y') {
interval *= 12;
}
ret = (emon - smon) / (int32_t)interval;
if (order == TSDB_ORDER_DESC && ret * interval < (smon - emon)) ret += 1;
}
skey /= (int64_t)(TSDB_TICK_PER_SECOND(precision));
ekey /= (int64_t)(TSDB_TICK_PER_SECOND(precision));
struct tm tm;
time_t t = (time_t)skey;
taosLocalTime(&t, &tm, NULL);
int32_t smon = tm.tm_year * 12 + tm.tm_mon;
t = (time_t)ekey;
taosLocalTime(&t, &tm, NULL);
int32_t emon = tm.tm_year * 12 + tm.tm_mon;
if (unit == 'y') {
interval *= 12;
}
return (emon - smon) / (int32_t)interval;
return ret + 1;
}
int64_t taosTimeTruncate(int64_t ts, const SInterval* pInterval) {

View File

@ -439,7 +439,7 @@ static int32_t mndProcessCreateIdxReq(SRpcMsg *pReq) {
pDb = mndAcquireDbByStb(pMnode, createReq.stbName);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_INVALID_DB;
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
goto _OVER;
}

View File

@ -259,7 +259,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
if (pDb == NULL) {
if (0 != strcmp(connReq.db, TSDB_INFORMATION_SCHEMA_DB) &&
(0 != strcmp(connReq.db, TSDB_PERFORMANCE_SCHEMA_DB))) {
terrno = TSDB_CODE_MND_INVALID_DB;
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
mGError("user:%s, failed to login from %s while use db:%s since %s", pReq->info.conn.user, ip, connReq.db,
terrstr());
goto _OVER;
@ -313,7 +313,7 @@ _CONNECT:
sprintf(obj, "%s:%d", ip, pConn->port);
char detail[1000] = {0};
sprintf(detail, "connType:%d, db:%s, pid:%d, startTime:%" PRId64 ", sVer:%s, app:%s",
sprintf(detail, "connType:%d, db:%s, pid:%d, startTime:%" PRId64 ", sVer:%s, app:%s",
connReq.connType, connReq.db, connReq.pid, connReq.startTime, connReq.sVer, connReq.app);
auditRecord(pReq, pMnode->clusterId, "login", connReq.user, obj, detail);

View File

@ -2320,7 +2320,7 @@ static int32_t mndProcessAlterStbReq(SRpcMsg *pReq) {
pDb = mndAcquireDbByStb(pMnode, alterReq.name);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_INVALID_DB;
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
goto _OVER;
}
@ -2342,9 +2342,9 @@ static int32_t mndProcessAlterStbReq(SRpcMsg *pReq) {
alterReq.alterType, alterReq.numOfFields, alterReq.ttl);
SName name = {0};
tNameFromString(&name, pDb->name, T_NAME_ACCT | T_NAME_DB);
tNameFromString(&name, alterReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
auditRecord(pReq, pMnode->clusterId, "alterStb", name.dbname, alterReq.name, detail);
auditRecord(pReq, pMnode->clusterId, "alterStb", name.dbname, name.tname, detail);
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
@ -3616,7 +3616,7 @@ static int32_t mndProcessCreateIndexReq(SRpcMsg *pReq) {
pDb = mndAcquireDbByStb(pMnode, tagIdxReq.dbFName);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_INVALID_DB;
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
goto _OVER;
}

View File

@ -868,6 +868,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
mndTransDrop(pTrans);
taosThreadMutexLock(&execNodeList.lock);
mDebug("register to stream task node list");
keepStreamTasksInBuf(&streamObj, &execNodeList);
taosThreadMutexUnlock(&execNodeList.lock);
@ -876,8 +877,8 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
char detail[2000] = {0};
sprintf(detail,
"checkpointFreq:%" PRId64 ", createStb:%d, deleteMark:%" PRId64
", fillHistory:%d, igExists:%d, igExpired:%d, igUpdate:%d, lastTs:%" PRId64
", maxDelay:%" PRId64 ", numOfTags:%d, sourceDB:%s, targetStbFullName:%s, triggerType:%d, watermark:%" PRId64,
", fillHistory:%d, igExists:%d, igExpired:%d, igUpdate:%d, lastTs:%" PRId64 ", maxDelay:%" PRId64
", numOfTags:%d, sourceDB:%s, targetStbFullName:%s, triggerType:%d, watermark:%" PRId64,
createStreamReq.checkpointFreq, createStreamReq.createStb, createStreamReq.deleteMark,
createStreamReq.fillHistory, createStreamReq.igExists, createStreamReq.igExpired, createStreamReq.igUpdate,
createStreamReq.lastTs, createStreamReq.maxDelay, createStreamReq.numOfTags, createStreamReq.sourceDB,
@ -1574,8 +1575,8 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
} else if (taskStatus == TASK_STATUS__DROPPING) {
memcpy(varDataVal(status), "dropping", 8);
varDataSetLen(status, 8);
} else if (taskStatus == TASK_STATUS__FAIL) {
memcpy(varDataVal(status), "fail", 4);
} else if (taskStatus == TASK_STATUS__UNINIT) {
memcpy(varDataVal(status), "uninit", 6);
varDataSetLen(status, 4);
} else if (taskStatus == TASK_STATUS__STOP) {
memcpy(varDataVal(status), "stop", 4);
@ -2016,14 +2017,11 @@ static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgr
static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent) {
const SEp *pEp = GET_ACTIVE_EP(pPrevEpset);
const SEp* p = GET_ACTIVE_EP(pCurrent);
for (int32_t i = 0; i < pCurrent->numOfEps; ++i) {
const SEp *p = &(pCurrent->eps[i]);
if (pEp->port == p->port && strncmp(pEp->fqdn, p->fqdn, TSDB_FQDN_LEN) == 0) {
return false;
}
if (pEp->port == p->port && strncmp(pEp->fqdn, p->fqdn, TSDB_FQDN_LEN) == 0) {
return false;
}
return true;
}
@ -2120,6 +2118,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
mDebug("stream:0x%" PRIx64 " involved node changed, create update trans", pStream->uid);
int32_t code = createStreamUpdateTrans(pMnode, pStream, pChangeInfo);
if (code != TSDB_CODE_SUCCESS) {
sdbCancelFetch(pSdb, pIter);
return code;
}
}
@ -2223,18 +2222,22 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot);
if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) {
code = mndProcessVgroupChange(pMnode, &changeInfo);
// keep the new vnode snapshot
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
mDebug("create trans successfully, update cached node list");
taosArrayDestroy(execNodeList.pNodeEntryList);
execNodeList.pNodeEntryList = pNodeSnapshot;
execNodeList.ts = ts;
}
} else {
mDebug("no update found in nodeList");
taosArrayDestroy(pNodeSnapshot);
}
taosArrayDestroy(changeInfo.pUpdateNodeList);
taosHashCleanup(changeInfo.pDBMap);
// keep the new vnode snapshot
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
taosArrayDestroy(execNodeList.pNodeEntryList);
execNodeList.pNodeEntryList = pNodeSnapshot;
execNodeList.ts = ts;
}
mDebug("end to do stream task node change checking");
atomic_store_32(&mndNodeCheckSentinel, 0);
return 0;
@ -2284,7 +2287,6 @@ static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *p
// todo: this process should be executed by the write queue worker of the mnode
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamHbMsg req = {0};
int32_t code = TSDB_CODE_SUCCESS;
@ -2309,8 +2311,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
for (int32_t i = 0; i < req.numOfTasks; ++i) {
STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
int64_t k[2] = {p->streamId, p->taskId};
int64_t k[2] = {p->streamId, p->taskId};
int32_t *index = taosHashGet(execNodeList.pTaskMap, &k, sizeof(k));
if (index == NULL) {
continue;

View File

@ -65,7 +65,7 @@ TEST_F(MndTestProfile, 01_ConnectMsg) {
connId = connectRsp.connId;
}
TEST_F(MndTestProfile, 02_ConnectMsg_InvalidDB) {
TEST_F(MndTestProfile, 02_ConnectMsg_NotExistDB) {
char passwd[] = "taosdata";
char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
taosEncryptPass_c((uint8_t*)passwd, strlen(passwd), secretEncrypt);
@ -73,7 +73,7 @@ TEST_F(MndTestProfile, 02_ConnectMsg_InvalidDB) {
SConnectReq connectReq = {0};
connectReq.pid = 1234;
strcpy(connectReq.app, "mnode_test_profile");
strcpy(connectReq.db, "invalid_db");
strcpy(connectReq.db, "not_exist_db");
strcpy(connectReq.user, "root");
strcpy(connectReq.passwd, secretEncrypt);
strcpy(connectReq.sVer, version);
@ -84,7 +84,7 @@ TEST_F(MndTestProfile, 02_ConnectMsg_InvalidDB) {
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CONNECT, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_DB);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_DB_NOT_EXIST);
ASSERT_EQ(pRsp->contLen, 0);
}

View File

@ -448,7 +448,7 @@ TEST_F(MndTestStb, 02_Alter_Stb_AddTag) {
{
void* pReq = BuildAlterStbAddTagReq("1.d3.stb", "tag4", &contLen);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_STB, pReq, contLen);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_DB);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_DB_NOT_EXIST);
}
{
@ -665,7 +665,7 @@ TEST_F(MndTestStb, 06_Alter_Stb_AddColumn) {
{
void* pReq = BuildAlterStbAddColumnReq("1.d7.stb", "tag4", &contLen);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_STB, pReq, contLen);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_DB);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_DB_NOT_EXIST);
}
{

View File

@ -165,6 +165,7 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
int32_t tqScanWal(STQ* pTq);
int32_t tqCheckAndRunStreamTask(STQ* pTq);
int32_t tqStartStreamTasks(STQ* pTq);
int32_t tqStopStreamTasks(STQ* pTq);
// tq util

View File

@ -52,7 +52,9 @@ int metaFinishCommit(SMeta *pMeta, TXN *txn) { return tdbPostCommit(pMeta->pEnv
int metaPrepareAsyncCommit(SMeta *pMeta) {
// return tdbPrepareAsyncCommit(pMeta->pEnv, pMeta->txn);
int code = 0;
metaWLock(pMeta);
code = ttlMgrFlush(pMeta->pTtlMgr, pMeta->txn);
metaULock(pMeta);
code = tdbCommit(pMeta->pEnv, pMeta->txn);
return code;

View File

@ -931,21 +931,16 @@ end:
}
int metaTtlFindExpired(SMeta *pMeta, int64_t timePointMs, SArray *tbUids, int32_t ttlDropMaxCount) {
metaWLock(pMeta);
int ret = ttlMgrFlush(pMeta->pTtlMgr, pMeta->txn);
if (ret != 0) {
metaError("ttl failed to flush, ret:%d", ret);
goto _err;
}
metaRLock(pMeta);
int ret = ttlMgrFindExpired(pMeta->pTtlMgr, timePointMs, tbUids, ttlDropMaxCount);
metaULock(pMeta);
ret = ttlMgrFindExpired(pMeta->pTtlMgr, timePointMs, tbUids, ttlDropMaxCount);
if (ret != 0) {
metaError("ttl failed to find expired table, ret:%d", ret);
goto _err;
}
_err:
metaULock(pMeta);
return ret;
}

View File

@ -299,7 +299,7 @@ int ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) {
ret = 0;
_out:
metaDebug("%s, ttl mgr insert ttl, uid: %" PRId64 ", ctime: %" PRId64 ", ttlDays: %" PRId64, pTtlMgr->logPrefix,
metaTrace("%s, ttl mgr insert ttl, uid: %" PRId64 ", ctime: %" PRId64 ", ttlDays: %" PRId64, pTtlMgr->logPrefix,
updCtx->uid, updCtx->changeTimeMs, updCtx->ttlDays);
return ret;
@ -323,7 +323,7 @@ int ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) {
ret = 0;
_out:
metaDebug("%s, ttl mgr delete ttl, uid: %" PRId64, pTtlMgr->logPrefix, delCtx->uid);
metaTrace("%s, ttl mgr delete ttl, uid: %" PRId64, pTtlMgr->logPrefix, delCtx->uid);
return ret;
}
@ -363,17 +363,37 @@ int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtime
ret = 0;
_out:
metaDebug("%s, ttl mgr update ctime, uid: %" PRId64 ", ctime: %" PRId64, pTtlMgr->logPrefix, pUpdCtimeCtx->uid,
metaTrace("%s, ttl mgr update ctime, uid: %" PRId64 ", ctime: %" PRId64, pTtlMgr->logPrefix, pUpdCtimeCtx->uid,
pUpdCtimeCtx->changeTimeMs);
return ret;
}
int ttlMgrFindExpired(STtlManger *pTtlMgr, int64_t timePointMs, SArray *pTbUids, int32_t ttlDropMaxCount) {
int ret = -1;
STtlIdxKeyV1 ttlKey = {.deleteTimeMs = timePointMs, .uid = INT64_MAX};
STtlExpiredCtx expiredCtx = {
.ttlDropMaxCount = ttlDropMaxCount, .count = 0, .expiredKey = ttlKey, .pTbUids = pTbUids};
return tdbTbTraversal(pTtlMgr->pTtlIdx, &expiredCtx, ttlMgrFindExpiredOneEntry);
ret = tdbTbTraversal(pTtlMgr->pTtlIdx, &expiredCtx, ttlMgrFindExpiredOneEntry);
if (ret) {
goto _out;
}
size_t vIdx = 0;
for (size_t i = 0; i < pTbUids->size; i++) {
tb_uid_t *pUid = taosArrayGet(pTbUids, i);
if (taosHashGet(pTtlMgr->pDirtyUids, pUid, sizeof(tb_uid_t)) == NULL) {
// not in dirty && expired in tdb => must be expired
taosArraySet(pTbUids, vIdx, pUid);
vIdx++;
}
}
taosArrayPopTailBatch(pTbUids, pTbUids->size - vIdx);
_out:
return ret;
}
static bool ttlMgrNeedFlush(STtlManger *pTtlMgr) {

View File

@ -1416,7 +1416,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
}
int8_t status = pTask->status.taskStatus;
if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__SCAN_HISTORY) {
if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__CK) {
// no lock needs to secure the access of the version
if (igUntreated && level == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) {
// discard all the data when the stream task is suspended.
@ -1700,20 +1700,47 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
tqDebug("s-task:%s receive task nodeEp update msg from mnode", pTask->id.idStr);
streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
streamSetStatusNormal(pTask);
SStreamTask** ppHTask = NULL;
if (pTask->historyTaskId.taskId != 0) {
keys[0] = pTask->historyTaskId.streamId;
keys[1] = pTask->historyTaskId.taskId;
ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
if (ppHTask == NULL || *ppHTask == NULL) {
tqError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already",
pMeta->vgId, req.taskId);
} else {
tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr);
streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList);
}
}
{
streamSetStatusNormal(pTask);
streamMetaSaveTask(pMeta, pTask);
if (ppHTask != NULL) {
streamMetaSaveTask(pMeta, *ppHTask);
}
if (streamMetaCommit(pMeta) < 0) {
// persist to disk
}
}
streamTaskStop(pTask);
if (ppHTask != NULL) {
streamTaskStop(*ppHTask);
}
tqDebug("s-task:%s task nodeEp update completed", pTask->id.idStr);
pMeta->closedTask += 1;
if (ppHTask != NULL) {
pMeta->closedTask += 1;
}
// possibly only handle the stream task.
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
bool allStopped = (pMeta->closedTask == numOfTasks);
if (allStopped) {
@ -1752,6 +1779,7 @@ _end:
taosWUnLockLatch(&pMeta->lock);
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
vInfo("vgId:%d, restart all stream tasks", vgId);
tqStartStreamTasks(pTq);
tqCheckAndRunStreamTaskAsync(pTq);
}
}

View File

@ -224,6 +224,35 @@ int32_t tqStopStreamTasks(STQ* pTq) {
return 0;
}
int32_t tqStartStreamTasks(STQ* pTq) {
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t vgId = TD_VID(pTq->pVnode);
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
tqDebug("vgId:%d start to stop all %d stream task(s)", vgId, numOfTasks);
if (numOfTasks == 0) {
return TSDB_CODE_SUCCESS;
}
taosWLockLatch(&pMeta->lock);
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
int64_t key[2] = {pTaskId->streamId, pTaskId->taskId};
SStreamTask** pTask = taosHashGet(pMeta->pTasks, key, sizeof(key));
int8_t status = (*pTask)->status.taskStatus;
if (status == TASK_STATUS__STOP) {
streamSetStatusNormal(*pTask);
}
}
taosWUnLockLatch(&pMeta->lock);
return 0;
}
int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) {
// seek the stored version and extract data from WAL
int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader);

View File

@ -248,10 +248,14 @@ int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t
tDecoderClear(&decoder);
// tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn)
int64_t key[2] = {task.streamId, task.taskId};
taosWLockLatch(&pTq->pStreamMeta->lock);
if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, key, sizeof(int64_t) << 1, (uint8_t*)pData + sizeof(SSnapDataHdr),
nData - sizeof(SSnapDataHdr), pWriter->txn) < 0) {
taosWUnLockLatch(&pTq->pStreamMeta->lock);
return -1;
}
taosWUnLockLatch(&pTq->pStreamMeta->lock);
} else if (pHdr->type == SNAP_DATA_STREAM_TASK_CHECKPOINT) {
// do nothing
}

View File

@ -980,9 +980,7 @@ static int32_t tsdbDataFileDoWriteTableOldData(SDataFileWriter *writer, const TS
writer->ctx->brinBlkArray = NULL;
writer->ctx->tbHasOldData = false;
goto _exit;
}
for (; writer->ctx->brinBlkArrayIdx < TARRAY2_SIZE(writer->ctx->brinBlkArray); writer->ctx->brinBlkArrayIdx++) {
} else {
const SBrinBlk *brinBlk = TARRAY2_GET_PTR(writer->ctx->brinBlkArray, writer->ctx->brinBlkArrayIdx);
if (brinBlk->minTbid.uid != writer->ctx->tbid->uid) {
@ -995,7 +993,6 @@ static int32_t tsdbDataFileDoWriteTableOldData(SDataFileWriter *writer, const TS
writer->ctx->brinBlockIdx = 0;
writer->ctx->brinBlkArrayIdx++;
break;
}
}
@ -1110,9 +1107,7 @@ static int32_t tsdbDataFileWriteTableDataBegin(SDataFileWriter *writer, const TA
if (writer->ctx->brinBlkArrayIdx >= TARRAY2_SIZE(writer->ctx->brinBlkArray)) {
writer->ctx->brinBlkArray = NULL;
break;
}
for (; writer->ctx->brinBlkArrayIdx < TARRAY2_SIZE(writer->ctx->brinBlkArray); writer->ctx->brinBlkArrayIdx++) {
} else {
const SBrinBlk *brinBlk = TARRAY2_GET_PTR(writer->ctx->brinBlkArray, writer->ctx->brinBlkArrayIdx);
code = tsdbDataFileReadBrinBlock(writer->ctx->reader, brinBlk, writer->ctx->brinBlock);
@ -1120,7 +1115,6 @@ static int32_t tsdbDataFileWriteTableDataBegin(SDataFileWriter *writer, const TA
writer->ctx->brinBlockIdx = 0;
writer->ctx->brinBlkArrayIdx++;
break;
}
}
@ -1251,9 +1245,7 @@ static int32_t tsdbDataFileDoWriteTombRecord(SDataFileWriter *writer, const STom
if (writer->ctx->tombBlkArrayIdx >= TARRAY2_SIZE(writer->ctx->tombBlkArray)) {
writer->ctx->hasOldTomb = false;
break;
}
for (; writer->ctx->tombBlkArrayIdx < TARRAY2_SIZE(writer->ctx->tombBlkArray); ++writer->ctx->tombBlkArrayIdx) {
} else {
const STombBlk *tombBlk = TARRAY2_GET_PTR(writer->ctx->tombBlkArray, writer->ctx->tombBlkArrayIdx);
code = tsdbDataFileReadTombBlock(writer->ctx->reader, tombBlk, writer->ctx->tombBlock);
@ -1261,7 +1253,6 @@ static int32_t tsdbDataFileDoWriteTombRecord(SDataFileWriter *writer, const STom
writer->ctx->tombBlockIdx = 0;
writer->ctx->tombBlkArrayIdx++;
break;
}
}

View File

@ -174,11 +174,17 @@ int32_t save_fs(const TFileSetArray *arr, const char *fname) {
// fset
cJSON *ajson = cJSON_AddArrayToObject(json, "fset");
if (!ajson) TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
if (!ajson) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
const STFileSet *fset;
TARRAY2_FOREACH(arr, fset) {
cJSON *item = cJSON_CreateObject();
if (!item) TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
if (!item) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
cJSON_AddItemToArray(ajson, item);
code = tsdbTFileSetToJson(fset, item);
@ -231,7 +237,8 @@ static int32_t load_fs(STsdb *pTsdb, const char *fname, TFileSetArray *arr) {
TSDB_CHECK_CODE(code, lino, _exit);
}
} else {
TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
code = TSDB_CODE_FILE_CORRUPTED;
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
@ -312,7 +319,8 @@ static int32_t commit_edit(STFileSystem *fs) {
int32_t code;
int32_t lino;
if ((code = taosRenameFile(current_t, current))) {
TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(code), lino, _exit);
code = TAOS_SYSTEM_ERROR(code);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = apply_commit(fs);
@ -345,7 +353,8 @@ static int32_t abort_edit(STFileSystem *fs) {
int32_t code;
int32_t lino;
if ((code = taosRemoveFile(fname))) {
TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(code), lino, _exit);
code = TAOS_SYSTEM_ERROR(code);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = apply_abort(fs);
@ -398,7 +407,7 @@ static int32_t tsdbFSAddEntryToFileObjHash(STFileHash *hash, const char *fname)
STFileHashEntry *entry = taosMemoryMalloc(sizeof(*entry));
if (entry == NULL) return TSDB_CODE_OUT_OF_MEMORY;
strcpy(entry->fname, fname);
strncpy(entry->fname, fname, TSDB_FILENAME_LEN);
uint32_t idx = MurmurHash3_32(fname, strlen(fname)) % hash->numBucket;
@ -873,7 +882,7 @@ int32_t tsdbFSCreateCopySnapshot(STFileSystem *fs, TFileSetArray **fsetArr) {
STFileSet *fset1;
fsetArr[0] = taosMemoryMalloc(sizeof(TFileSetArray));
if (fsetArr == NULL) return TSDB_CODE_OUT_OF_MEMORY;
if (fsetArr[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;
TARRAY2_INIT(fsetArr[0]);

View File

@ -46,7 +46,8 @@ static int32_t tsdbSttLvlInitEx(STsdb *pTsdb, const SSttLvl *lvl1, SSttLvl **lvl
return code;
}
TARRAY2_APPEND(lvl[0]->fobjArr, fobj);
code = TARRAY2_APPEND(lvl[0]->fobjArr, fobj);
if (code) return code;
}
return 0;
}
@ -185,7 +186,8 @@ static int32_t tsdbJsonToSttLvl(STsdb *pTsdb, const cJSON *json, SSttLvl **lvl)
return code;
}
TARRAY2_APPEND(lvl[0]->fobjArr, fobj);
code = TARRAY2_APPEND(lvl[0]->fobjArr, fobj);
if (code) return code;
}
return 0;
}
@ -263,7 +265,8 @@ int32_t tsdbJsonToTFileSet(STsdb *pTsdb, const cJSON *json, STFileSet **fset) {
return code;
}
TARRAY2_APPEND((*fset)->lvlArr, lvl);
code = TARRAY2_APPEND((*fset)->lvlArr, lvl);
if (code) return code;
}
} else {
return TSDB_CODE_FILE_CORRUPTED;
@ -326,11 +329,12 @@ int32_t tsdbTFileSetEdit(STsdb *pTsdb, STFileSet *fset, const STFileOp *op) {
STFileObj tfobj = {.f[0] = {.cid = op->of.cid}}, *tfobjp = &tfobj;
STFileObj **fobjPtr = TARRAY2_SEARCH(lvl->fobjArr, &tfobjp, tsdbTFileObjCmpr, TD_EQ);
tfobjp = (fobjPtr ? *fobjPtr : NULL);
ASSERT(tfobjp);
tfobjp->f[0] = op->nf;
if (fobjPtr) {
tfobjp = *fobjPtr;
tfobjp->f[0] = op->nf;
} else {
tsdbError("file not found, cid:%" PRId64, op->of.cid);
}
} else {
fset->farr[op->nf.type]->f[0] = op->nf;
}

View File

@ -42,8 +42,12 @@ static const struct {
};
void remove_file(const char *fname) {
taosRemoveFile(fname);
tsdbInfo("file:%s is removed", fname);
int32_t code = taosRemoveFile(fname);
if (code) {
tsdbError("file:%s remove failed", fname);
} else {
tsdbInfo("file:%s is removed", fname);
}
}
static int32_t tfile_to_json(const STFile *file, cJSON *json) {

View File

@ -356,7 +356,8 @@ static int32_t tsdbSttIterOpen(STsdbIter *iter) {
}
iter->sttData->sttBlkArrayIdx = 0;
tBlockDataCreate(iter->sttData->blockData);
code = tBlockDataCreate(iter->sttData->blockData);
if (code) return code;
iter->sttData->blockDataIdx = 0;
return tsdbSttIterNext(iter, NULL);
@ -381,7 +382,8 @@ static int32_t tsdbDataIterOpen(STsdbIter *iter) {
iter->dataData->brinBlockIdx = 0;
// SBlockData
tBlockDataCreate(iter->dataData->blockData);
code = tBlockDataCreate(iter->dataData->blockData);
if (code) return code;
iter->dataData->blockDataIdx = 0;
return tsdbDataIterNext(iter, NULL);

View File

@ -857,6 +857,9 @@ static int32_t tsdbSnapWriteTombRecord(STsdbSnapWriter* writer, const STombRecor
} else {
break;
}
code = tsdbIterMergerNext(writer->ctx->tombIterMerger);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (record->suid == INT64_MAX) {

View File

@ -160,7 +160,7 @@ static int32_t vnodePreProcessDropTtlMsg(SVnode *pVnode, SRpcMsg *pMsg) {
}
{ // find expired uids
tbUids = taosArrayInit(8, sizeof(int64_t));
tbUids = taosArrayInit(8, sizeof(tb_uid_t));
if (tbUids == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
@ -945,7 +945,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
int32_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId;
char detail[1000] = {0};
sprintf(detail, "btime:%" PRId64 ", flags:%d, ttl:%d, type:%d",
sprintf(detail, "btime:%" PRId64 ", flags:%d, ttl:%d, type:%d",
pCreateReq->btime, pCreateReq->flags, pCreateReq->ttl, pCreateReq->type);
SName name = {0};

View File

@ -560,6 +560,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
vInfo("vgId:%d, not launch stream tasks, since stream tasks are disabled", vgId);
} else {
vInfo("vgId:%d start to launch stream tasks", pVnode->config.vgId);
tqStartStreamTasks(pVnode->pTq);
tqCheckAndRunStreamTaskAsync(pVnode->pTq);
}
} else {

View File

@ -760,12 +760,14 @@ int32_t ctgGetCachedStbNameFromSuid(SCatalog* pCtg, char* dbFName, uint64_t suid
char *stb = taosHashAcquire(dbCache->stbCache, &suid, sizeof(suid));
if (NULL == stb) {
ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", suid, dbFName);
ctgReleaseDBCache(pCtg, dbCache);
return TSDB_CODE_SUCCESS;
}
*stbName = taosStrdup(stb);
taosHashRelease(dbCache->stbCache, stb);
ctgReleaseDBCache(pCtg, dbCache);
return TSDB_CODE_SUCCESS;
}

View File

@ -308,10 +308,11 @@ static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char* dbName, ch
if (retentions) {
len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, " RETENTIONS %s", retentions);
taosMemoryFree(retentions);
}
}
taosMemoryFree(retentions);
(varDataLen(buf2)) = len;
colDataSetVal(pCol2, 0, buf2, false);

View File

@ -446,7 +446,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
taosThreadMutexInit(&inserter->mutex, NULL);
if (NULL == inserter->pDataBlocks) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_OUT_OF_MEMORY;
goto _return;
}
inserter->fullOrderColList = pInserterNode->pCols->length == inserter->pSchema->numOfCols;

View File

@ -151,14 +151,21 @@ static void updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo* pStbJoin)
static int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid, bool needCache, SOperatorParam* pChild) {
*ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
if (NULL == *ppRes) {
freeOperatorParam(pChild, OP_GET_PARAM);
return TSDB_CODE_OUT_OF_MEMORY;
}
if (pChild) {
(*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
if (NULL == *ppRes) {
if (NULL == (*ppRes)->pChildren) {
freeOperatorParam(pChild, OP_GET_PARAM);
freeOperatorParam(*ppRes, OP_GET_PARAM);
*ppRes = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild)) {
freeOperatorParam(pChild, OP_GET_PARAM);
freeOperatorParam(*ppRes, OP_GET_PARAM);
*ppRes = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
} else {
@ -167,6 +174,8 @@ static int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t down
SGcOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcOperatorParam));
if (NULL == pGc) {
freeOperatorParam(*ppRes, OP_GET_PARAM);
*ppRes = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -193,6 +202,7 @@ static int32_t buildGroupCacheNotifyOperatorParam(SOperatorParam** ppRes, int32_
SGcNotifyOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcNotifyOperatorParam));
if (NULL == pGc) {
freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -248,6 +258,7 @@ static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t d
SExchangeOperatorBatchParam* pExc = taosMemoryMalloc(sizeof(SExchangeOperatorBatchParam));
if (NULL == pExc) {
taosMemoryFreeClear(*ppRes);
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -255,6 +266,7 @@ static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t d
pExc->pBatchs = tSimpleHashInit(tSimpleHashGetSize(pVg), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
if (NULL == pExc->pBatchs) {
taosMemoryFree(pExc);
taosMemoryFreeClear(*ppRes);
return TSDB_CODE_OUT_OF_MEMORY;
}
tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam);
@ -288,21 +300,36 @@ static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t d
static int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initParam, SOperatorParam* pChild0, SOperatorParam* pChild1) {
*ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
if (NULL == *ppRes) {
freeOperatorParam(pChild0, OP_GET_PARAM);
freeOperatorParam(pChild1, OP_GET_PARAM);
return TSDB_CODE_OUT_OF_MEMORY;
}
(*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
if (NULL == *ppRes) {
freeOperatorParam(pChild0, OP_GET_PARAM);
freeOperatorParam(pChild1, OP_GET_PARAM);
freeOperatorParam(*ppRes, OP_GET_PARAM);
*ppRes = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) {
freeOperatorParam(pChild0, OP_GET_PARAM);
freeOperatorParam(pChild1, OP_GET_PARAM);
freeOperatorParam(*ppRes, OP_GET_PARAM);
*ppRes = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) {
freeOperatorParam(pChild1, OP_GET_PARAM);
freeOperatorParam(*ppRes, OP_GET_PARAM);
*ppRes = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
SSortMergeJoinOperatorParam* pJoin = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam));
if (NULL == pJoin) {
freeOperatorParam(*ppRes, OP_GET_PARAM);
*ppRes = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -318,16 +345,28 @@ static int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initPara
static int32_t buildMergeJoinNotifyOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) {
*ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
if (NULL == *ppRes) {
freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
return TSDB_CODE_OUT_OF_MEMORY;
}
(*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
if (NULL == *ppRes) {
taosMemoryFreeClear(*ppRes);
freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
return TSDB_CODE_OUT_OF_MEMORY;
}
if (pChild0 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) {
freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
*ppRes = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
if (pChild1 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) {
freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
*ppRes = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -420,13 +459,34 @@ static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SS
if (TSDB_CODE_SUCCESS == code) {
code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pPost->leftNeedCache, pSrcParam0);
pSrcParam0 = NULL;
}
if (TSDB_CODE_SUCCESS == code) {
code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pPost->rightNeedCache, pSrcParam1);
pSrcParam1 = NULL;
}
if (TSDB_CODE_SUCCESS == code) {
code = buildMergeJoinOperatorParam(ppParam, pSrcParam0 ? true : false, pGcParam0, pGcParam1);
}
if (TSDB_CODE_SUCCESS != code) {
if (pSrcParam0) {
freeOperatorParam(pSrcParam0, OP_GET_PARAM);
}
if (pSrcParam1) {
freeOperatorParam(pSrcParam1, OP_GET_PARAM);
}
if (pGcParam0) {
freeOperatorParam(pGcParam0, OP_GET_PARAM);
}
if (pGcParam1) {
freeOperatorParam(pGcParam1, OP_GET_PARAM);
}
if (*ppParam) {
freeOperatorParam(*ppParam, OP_GET_PARAM);
*ppParam = NULL;
}
}
return code;
}
@ -488,7 +548,7 @@ static void handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCt
if (pPost->leftNeedCache) {
uint32_t* num = tSimpleHashGet(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
if (--(*num) <= 0) {
if (num && --(*num) <= 0) {
tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
notifySeqJoinTableCacheEnd(pOperator, pPost, true);
}

View File

@ -277,7 +277,7 @@ int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* p
SFilterColumnParam param2 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
code = filterSetDataFromSlotId(pInfo->pEndCondInfo, &param2);
if (code != TSDB_CODE_SUCCESS) {
return code;
goto _return;
}
int32_t status2 = 0;
@ -331,10 +331,12 @@ int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* p
}
}
_return:
colDataDestroy(ps);
taosMemoryFree(ps);
colDataDestroy(pe);
taosMemoryFree(pe);
return TSDB_CODE_SUCCESS;
return code;
}

View File

@ -223,6 +223,9 @@ static int32_t acquireFdFromFileCtx(SGcFileCacheCtx* pFileCtx, int32_t fileId, S
SGroupCacheFileInfo newFile = {0};
taosHashPut(pFileCtx->pCacheFile, &fileId, sizeof(fileId), &newFile, sizeof(newFile));
pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId));
if (NULL == pTmp) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
if (pTmp->deleted) {
@ -287,7 +290,7 @@ static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamC
if (deleted) {
qTrace("FileId:%d-%d-%d already be deleted, skip write",
pCtx->id, pGroup->vgId, pHead->basic.fileId);
pCtx->id, pGroup ? pGroup->vgId : GROUP_CACHE_DEFAULT_VGID, pHead->basic.fileId);
int64_t blkId = pHead->basic.blkId;
pHead = pHead->next;
@ -337,7 +340,9 @@ static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcDownstr
return TSDB_CODE_OUT_OF_MEMORY;
}
pBufInfo = taosHashGet(pCache->pDirtyBlk, &pBufInfo->basic.blkId, sizeof(pBufInfo->basic.blkId));
if (NULL == pBufInfo) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = TSDB_CODE_SUCCESS;
SGcBlkBufInfo* pWriteHead = NULL;
@ -378,6 +383,10 @@ static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcDownstr
static FORCE_INLINE void chkRemoveVgroupCurrFile(SGcFileCacheCtx* pFileCtx, int32_t downstreamIdx, int32_t vgId) {
SGroupCacheFileInfo* pFileInfo = taosHashGet(pFileCtx->pCacheFile, &pFileCtx->fileId, sizeof(pFileCtx->fileId));
if (NULL == pFileInfo) {
return;
}
if (0 == pFileInfo->groupNum) {
removeGroupCacheFile(pFileInfo);
@ -711,6 +720,9 @@ static int32_t addFileRefTableNum(SGcFileCacheCtx* pFileCtx, int32_t fileId, int
newFile.groupNum = 1;
taosHashPut(pFileCtx->pCacheFile, &fileId, sizeof(fileId), &newFile, sizeof(newFile));
pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId));
if (NULL == pTmp) {
return TSDB_CODE_OUT_OF_MEMORY;
}
} else {
pTmp->groupNum++;
}
@ -786,6 +798,9 @@ static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam*
}
*ppGrp = taosHashGet(pGrpHash, &uid, sizeof(uid));
if (NULL == *ppGrp) {
return TSDB_CODE_OUT_OF_MEMORY;
}
initNewGroupData(pCtx, *ppGrp, pParam->downstreamIdx, vgId, pGCache->batchFetch, pGcParam->needCache);
qError("new group %" PRIu64 " initialized, downstreamIdx:%d, vgId:%d, needCache:%d", uid, pParam->downstreamIdx, vgId, pGcParam->needCache);

View File

@ -636,12 +636,14 @@ static int32_t addRowToHashImpl(SHJoinOperatorInfo* pJoin, SGroupData* pGroup, S
int32_t code = getValBufFromPages(pJoin->pRowBufs, getHJoinValBufSize(pTable, rowIdx), &pTable->valData, pRow);
if (code) {
taosMemoryFree(pRow);
return code;
}
if (NULL == pGroup) {
pRow->next = NULL;
if (tSimpleHashPut(pJoin->pKeyHash, pTable->keyData, keyLen, &group, sizeof(group))) {
taosMemoryFree(pRow);
return TSDB_CODE_OUT_OF_MEMORY;
}
} else {

View File

@ -711,6 +711,11 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs
}
}
}
if (NULL == pJoinInfo->pLeft || NULL == pJoinInfo->pRight) {
setMergeJoinDone(pOperator);
return false;
}
// only the timestamp match support for ordinary table
SColumnInfoData* pLeftCol = taosArrayGet(pJoinInfo->pLeft->pDataBlock, pJoinInfo->leftCol.slotId);

View File

@ -2883,7 +2883,7 @@ static EDealRes tagScanRewriteTagColumn(SNode** pNode, void* pContext) {
}
static void tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray* aFilterIdxs, void* pVnode, SStorageAPI* pAPI, STagScanInfo* pInfo) {
static int32_t tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray* aFilterIdxs, void* pVnode, SStorageAPI* pAPI, STagScanInfo* pInfo) {
int32_t code = 0;
int32_t numOfTables = taosArrayGetSize(aUidTags);
@ -2894,9 +2894,15 @@ static void tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray* aF
SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};
SScalarParam output = {0};
tagScanCreateResultData(&type, numOfTables, &output);
code = tagScanCreateResultData(&type, numOfTables, &output);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
scalarCalculate(pTagCond, pBlockList, &output);
code = scalarCalculate(pTagCond, pBlockList, &output);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
bool* result = (bool*)output.columnData->pData;
for (int32_t i = 0 ; i < numOfTables; ++i) {
@ -2911,7 +2917,7 @@ static void tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray* aF
blockDataDestroy(pResBlock);
taosArrayDestroy(pBlockList);
return TSDB_CODE_SUCCESS;
}
static void tagScanFillOneCellWithTag(SOperatorInfo* pOperator, const STUidTagInfo* pUidTagInfo, SExprInfo* pExprInfo, SColumnInfoData* pColInfo, int rowIndex, const SStorageAPI* pAPI, void* pVnode) {
@ -3024,7 +3030,11 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) {
bool ignoreFilterIdx = true;
if (pInfo->pTagCond != NULL) {
ignoreFilterIdx = false;
tagScanFilterByTagCond(aUidTags, pInfo->pTagCond, aFilterIdxs, pInfo->readHandle.vnode, pAPI, pInfo);
int32_t code = tagScanFilterByTagCond(aUidTags, pInfo->pTagCond, aFilterIdxs, pInfo->readHandle.vnode, pAPI, pInfo);
if (TSDB_CODE_SUCCESS != code) {
pOperator->pTaskInfo->code = code;
T_LONG_JMP(pOperator->pTaskInfo->env, code);
}
} else {
ignoreFilterIdx = true;
}

View File

@ -669,8 +669,13 @@ static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle*
p->info.id.groupId = tupleGroupId;
pInfo->groupId = tupleGroupId;
} else {
pInfo->prefetchedTuple = pTupleHandle;
break;
if (p->info.rows == 0) {
appendOneRowToDataBlock(p, pTupleHandle);
p->info.id.groupId = pInfo->groupId = tupleGroupId;
} else {
pInfo->prefetchedTuple = pTupleHandle;
break;
}
}
} else {
appendOneRowToDataBlock(p, pTupleHandle);
@ -715,14 +720,9 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
resetLimitInfoForNextGroup(&pInfo->limitInfo);
}
bool limitReached = applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo);
// if limit is reached within a group, do not clear limiInfo otherwise the next block
// will be processed.
if (newgroup && limitReached) {
resetLimitInfoForNextGroup(&pInfo->limitInfo);
}
applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo);
if (p->info.rows > 0 || limitReached) {
if (p->info.rows > 0) {
break;
}
}

View File

@ -578,9 +578,8 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t ma
SColumnInfoData* pCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, pFillInfo->srcTsSlotId);
int64_t* tsList = (int64_t*)pCol->pData;
TSKEY lastKey = tsList[pFillInfo->numOfRows - 1];
numOfRes = taosTimeCountInterval(lastKey, pFillInfo->currentKey, pFillInfo->interval.sliding,
pFillInfo->interval.slidingUnit, pFillInfo->interval.precision);
numOfRes += 1;
numOfRes = taosTimeCountIntervalForFill(lastKey, pFillInfo->currentKey, pFillInfo->interval.sliding,
pFillInfo->interval.slidingUnit, pFillInfo->interval.precision, pFillInfo->order);
ASSERT(numOfRes >= numOfRows);
} else { // reach the end of data
if ((ekey1 < pFillInfo->currentKey && FILL_IS_ASC_FILL(pFillInfo)) ||
@ -588,9 +587,8 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t ma
return 0;
}
numOfRes = taosTimeCountInterval(ekey1, pFillInfo->currentKey, pFillInfo->interval.sliding,
pFillInfo->interval.slidingUnit, pFillInfo->interval.precision);
numOfRes += 1;
numOfRes = taosTimeCountIntervalForFill(ekey1, pFillInfo->currentKey, pFillInfo->interval.sliding,
pFillInfo->interval.slidingUnit, pFillInfo->interval.precision, pFillInfo->order);
}
return (numOfRes > maxNumOfRows) ? maxNumOfRows : numOfRes;

View File

@ -904,6 +904,7 @@ static int32_t getPageBufIncForRow(SSDataBlock* blk, int32_t row, int32_t rowIdx
}
static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockOrderInfo* order, SArray* aExtSrc) {
int32_t code = TSDB_CODE_SUCCESS;
int pgHeaderSz = sizeof(int32_t) + sizeof(int32_t) * taosArrayGetSize(pHandle->pDataBlock->pDataBlock);
int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, pgHeaderSz);
blockDataEnsureCapacity(pHandle->pDataBlock, rowCap);
@ -930,7 +931,13 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
SArray* aPgId = taosArrayInit(8, sizeof(int32_t));
SMultiwayMergeTreeInfo* pTree = NULL;
tMergeTreeCreate(&pTree, taosArrayGetSize(aBlk), &sup, blockCompareTsFn);
code = tMergeTreeCreate(&pTree, taosArrayGetSize(aBlk), &sup, blockCompareTsFn);
if (TSDB_CODE_SUCCESS != code) {
taosMemoryFree(sup.aRowIdx);
taosMemoryFree(sup.aTs);
return code;
}
int32_t nRows = 0;
int32_t nMergedRows = 0;
bool mergeLimitReached = false;
@ -1054,7 +1061,14 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
tSimpleHashClear(mUidBlk);
int64_t p = taosGetTimestampUs();
sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc);
code = sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc);
if (code != TSDB_CODE_SUCCESS) {
tSimpleHashCleanup(mUidBlk);
taosArrayDestroy(aBlkSort);
taosArrayDestroy(aExtSrc);
return code;
}
int64_t el = taosGetTimestampUs() - p;
pHandle->sortElapsed += el;

View File

@ -1786,7 +1786,7 @@ static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
}
uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
if (!IS_SIGNED_NUMERIC_TYPE(colType) && !IS_FLOAT_TYPE(colType) && TSDB_DATA_TYPE_BOOL != colType &&
if (!IS_INTEGER_TYPE(colType) && !IS_FLOAT_TYPE(colType) && TSDB_DATA_TYPE_BOOL != colType &&
!IS_TIMESTAMP_TYPE(colType)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
@ -1815,6 +1815,8 @@ static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
uint8_t resType;
if (IS_SIGNED_NUMERIC_TYPE(colType) || IS_TIMESTAMP_TYPE(colType) || TSDB_DATA_TYPE_BOOL == colType) {
resType = TSDB_DATA_TYPE_BIGINT;
} else if (IS_UNSIGNED_NUMERIC_TYPE(colType)) {
resType = TSDB_DATA_TYPE_UBIGINT;
} else {
resType = TSDB_DATA_TYPE_DOUBLE;
}
@ -1839,10 +1841,6 @@ static int32_t translateLength(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
if (TSDB_DATA_TYPE_VARBINARY == ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
return TSDB_CODE_SUCCESS;
}

View File

@ -2714,16 +2714,20 @@ static int32_t doSetPrevVal(SDiffInfo* pDiffInfo, int32_t type, const char* pv,
case TSDB_DATA_TYPE_BOOL:
pDiffInfo->prev.i64 = *(bool*)pv ? 1 : 0;
break;
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_TINYINT:
pDiffInfo->prev.i64 = *(int8_t*)pv;
break;
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_INT:
pDiffInfo->prev.i64 = *(int32_t*)pv;
break;
case TSDB_DATA_TYPE_USMALLINT:
case TSDB_DATA_TYPE_SMALLINT:
pDiffInfo->prev.i64 = *(int16_t*)pv;
break;
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_UBIGINT:
case TSDB_DATA_TYPE_BIGINT:
pDiffInfo->prev.i64 = *(int64_t*)pv;
break;
@ -2745,6 +2749,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv,
int64_t ts) {
pDiffInfo->prevTs = ts;
switch (type) {
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_INT: {
int32_t v = *(int32_t*)pv;
int64_t delta = v - pDiffInfo->prev.i64; // direct previous may be null
@ -2758,6 +2763,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv,
break;
}
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_TINYINT: {
int8_t v = *(int8_t*)pv;
int64_t delta = v - pDiffInfo->prev.i64; // direct previous may be null
@ -2769,6 +2775,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv,
pDiffInfo->prev.i64 = v;
break;
}
case TSDB_DATA_TYPE_USMALLINT:
case TSDB_DATA_TYPE_SMALLINT: {
int16_t v = *(int16_t*)pv;
int64_t delta = v - pDiffInfo->prev.i64; // direct previous may be null
@ -2781,6 +2788,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv,
break;
}
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_UBIGINT:
case TSDB_DATA_TYPE_BIGINT: {
int64_t v = *(int64_t*)pv;
int64_t delta = v - pDiffInfo->prev.i64; // direct previous may be null

View File

@ -69,7 +69,7 @@ const char *udfdCPluginUdfInitLoadInitDestoryFuncs(SUdfCPluginCtx *udfCtx, const
void udfdCPluginUdfInitLoadAggFuncs(SUdfCPluginCtx *udfCtx, const char *udfName) {
char processFuncName[TSDB_FUNC_NAME_LEN] = {0};
strncpy(processFuncName, udfName, sizeof(processFuncName));
snprintf(processFuncName, sizeof(processFuncName), "%s", udfName);
uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->aggProcFunc));
char startFuncName[TSDB_FUNC_NAME_LEN + 7] = {0};
@ -103,7 +103,7 @@ int32_t udfdCPluginUdfInit(SScriptUdfInfo *udf, void **pUdfCtx) {
if (udf->funcType == UDF_FUNC_TYPE_SCALAR) {
char processFuncName[TSDB_FUNC_NAME_LEN] = {0};
strncpy(processFuncName, udfName, sizeof(processFuncName));
snprintf(processFuncName, sizeof(processFuncName), "%s", udfName);
uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->scalarProcFunc));
} else if (udf->funcType == UDF_FUNC_TYPE_AGG) {
udfdCPluginUdfInitLoadAggFuncs(udfCtx, udfName);

View File

@ -2902,7 +2902,7 @@ static SNode* createMultiResFunc(SFunctionNode* pSrcFunc, SExprNode* pExpr) {
taosCreateMD5Hash(buf, len);
strncpy(pFunc->node.aliasName, buf, TSDB_COL_NAME_LEN - 1);
len = snprintf(buf, sizeof(buf) - 1, "%s(%s)", pSrcFunc->functionName, pCol->colName);
taosCreateMD5Hash(buf, len);
// note: userAlias could be truncated here
strncpy(pFunc->node.userAlias, buf, TSDB_COL_NAME_LEN - 1);
}
} else {
@ -2910,7 +2910,7 @@ static SNode* createMultiResFunc(SFunctionNode* pSrcFunc, SExprNode* pExpr) {
taosCreateMD5Hash(buf, len);
strncpy(pFunc->node.aliasName, buf, TSDB_COL_NAME_LEN - 1);
len = snprintf(buf, sizeof(buf) - 1, "%s(%s)", pSrcFunc->functionName, pExpr->userAlias);
taosCreateMD5Hash(buf, len);
// note: userAlias could be truncated here
strncpy(pFunc->node.userAlias, buf, TSDB_COL_NAME_LEN - 1);
}

View File

@ -46,8 +46,8 @@ static void setColumnInfo(SFunctionNode* pFunc, SColumnNode* pCol, bool isPartit
pCol->colType = COLUMN_TYPE_TBNAME;
SValueNode* pVal = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 0);
if (pVal) {
strcpy(pCol->tableName, pVal->literal);
strcpy(pCol->tableAlias, pVal->literal);
snprintf(pCol->tableName, sizeof(pCol->tableName), "%s", pVal->literal);
snprintf(pCol->tableAlias, sizeof(pCol->tableAlias), "%s", pVal->literal);
}
break;
case FUNCTION_TYPE_WSTART:
@ -531,6 +531,9 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
if (TSDB_CODE_SUCCESS == code) {
code = nodesListStrictAppend(pJoin->node.pChildren, (SNode*)pLeft);
}
if (TSDB_CODE_SUCCESS != code) {
pLeft = NULL;
}
}
SLogicNode* pRight = NULL;
@ -584,7 +587,7 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
}
}
if (NULL == pJoin->node.pTargets) {
if (NULL == pJoin->node.pTargets && NULL != pLeft) {
pJoin->node.pTargets = nodesCloneList(pLeft->pTargets);
if (NULL == pJoin->node.pTargets) {
code = TSDB_CODE_OUT_OF_MEMORY;

View File

@ -391,8 +391,8 @@ static void doRetryDispatchData(void* param, void* tmrId) {
SStreamTask* pTask = param;
if (streamTaskShouldStop(&pTask->status)) {
atomic_sub_fetch_8(&pTask->status.timerActive, 1);
qDebug("s-task:%s should stop, abort from timer", pTask->id.idStr);
int8_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1);
qDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref);
return;
}
@ -409,17 +409,22 @@ static void doRetryDispatchData(void* param, void* tmrId) {
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
}
} else {
atomic_sub_fetch_8(&pTask->status.timerActive, 1);
qDebug("s-task:%s should stop, abort from timer", pTask->id.idStr);
int32_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1);
qDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref);
}
} else {
atomic_sub_fetch_8(&pTask->status.timerActive, 1);
int8_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1);
qDebug("s-task:%s send success, jump out of timer, ref:%d", pTask->id.idStr, ref);
}
}
void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration) {
qError("s-task:%s dispatch data in %" PRId64 "ms", pTask->id.idStr, waitDuration);
taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->launchTaskTimer);
qWarn("s-task:%s dispatch data in %" PRId64 "ms, in timer", pTask->id.idStr, waitDuration);
if (pTask->launchTaskTimer != NULL) {
taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->launchTaskTimer);
} else {
pTask->launchTaskTimer = taosTmrStart(doRetryDispatchData, waitDuration, pTask, streamEnv.timer);
}
}
int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz,
@ -540,8 +545,10 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
}
if (++retryCount > MAX_CONTINUE_RETRY_COUNT) { // add to timer to retry
qDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms",
pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS);
int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);
qDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms, ref:%d",
pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref);
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
break;
}
@ -995,8 +1002,9 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
pTask->inputInfo.status = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream
pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time
qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 " wait for %dms and retry dispatch data",
id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS);
int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);
qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 " wait for %dms and retry dispatch data, ref:%d",
id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS, ref);
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
} else { // pipeline send data in output queue
// this message has been sent successfully, let's try next one.

View File

@ -666,6 +666,8 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId};
void* p = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
if (p == NULL) {
// pTask->chkInfo.checkpointVer may be 0, when a follower is become a leader
// In this case, we try not to start fill-history task anymore.
if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer) < 0) {
doClear(pKey, pVal, pCur, pRecycleList);
tFreeStreamTask(pTask);

View File

@ -235,7 +235,13 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {
qDebug("s-task:%s enter into scan-history data stage, status:%s", id, str);
streamTaskLaunchScanHistory(pTask);
} else {
qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str);
if (pTask->info.fillHistory == 1) {
qDebug("s-task:%s fill-history is set normal when start it, try to remove it,set it task to be dropping", id);
pTask->status.taskStatus = TASK_STATUS__DROPPING;
ASSERT(pTask->historyTaskId.taskId == 0);
} else {
qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str);
}
}
// when current stream task is ready, check the related fill history task.
@ -579,19 +585,17 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
// todo fix the bug: 2. race condition
// an fill history task needs to be started.
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
int32_t tId = pTask->historyTaskId.taskId;
if (tId == 0) {
SStreamMeta* pMeta = pTask->pMeta;
int32_t hTaskId = pTask->historyTaskId.taskId;
if (hTaskId == 0) {
return TSDB_CODE_SUCCESS;
}
ASSERT(pTask->status.downstreamReady == 1);
qDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", pTask->id.idStr,
pTask->historyTaskId.streamId, tId);
pTask->historyTaskId.streamId, hTaskId);
SStreamMeta* pMeta = pTask->pMeta;
int32_t hTaskId = pTask->historyTaskId.taskId;
int64_t keys[2] = {pTask->historyTaskId.streamId, pTask->historyTaskId.taskId};
int64_t keys[2] = {pTask->historyTaskId.streamId, hTaskId};
// Set the execute conditions, including the query time window and the version range
SStreamTask** pHTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
@ -610,11 +614,12 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
// todo failed to create timer
taosMemoryFree(pInfo);
} else {
atomic_add_fetch_8(&pTask->status.timerActive, 1);// timer is active
int32_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);// timer is active
ASSERT(ref == 1);
qDebug("s-task:%s set timer active flag", pTask->id.idStr);
}
} else { // timer exists
ASSERT(pTask->status.timerActive > 0);
ASSERT(pTask->status.timerActive == 1);
qDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr);
taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer);
}
@ -918,6 +923,13 @@ void streamTaskHalt(SStreamTask* pTask) {
return;
}
// wait for checkpoint completed
while(pTask->status.taskStatus == TASK_STATUS__CK) {
qDebug("s-task:%s status:%s during generating checkpoint, wait for 1sec and retry set status:halt", pTask->id.idStr,
streamGetTaskStatusStr(TASK_STATUS__CK));
taosMsleep(1000);
}
// upgrade to halt status
if (status == TASK_STATUS__PAUSE) {
qDebug("s-task:%s upgrade status to %s from %s", pTask->id.idStr, streamGetTaskStatusStr(TASK_STATUS__HALT),

View File

@ -2562,7 +2562,11 @@ int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum
ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
}
SSyncLogReplMgr oldLogReplMgrs[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0};
SSyncLogReplMgr* oldLogReplMgrs = NULL;
int64_t length = sizeof(SSyncLogReplMgr) * (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA);
oldLogReplMgrs = taosMemoryMalloc(length);
if (NULL == oldLogReplMgrs) return -1;
memset(oldLogReplMgrs, 0, length);
for(int i = 0; i < oldtotalReplicaNum; i++){
oldLogReplMgrs[i] = *(ths->logReplMgrs[i]);
@ -2643,6 +2647,8 @@ int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum
}
}
taosMemoryFree(oldLogReplMgrs);
return 0;
}

View File

@ -2200,10 +2200,15 @@ int tdbBtcDelete(SBTC *pBtc) {
tdbOsFree(pCell);
if (pPage->nOverflow > 0) {
tdbDebug("tdb/btc-delete: btree balance after update cell, pPage/nOverflow: %p/%d.", pPage,
pPage->nOverflow);
tdbDebug("tdb/btc-delete: btree balance after update cell, pPage/nOverflow/pgno: %p/%d/%" PRIu32 ".", pPage,
pPage->nOverflow, TDB_PAGE_PGNO(pPage));
pBtc->iPage = iPage;
tdbPagerReturnPage(pBtc->pBt->pPager, pBtc->pPage, pBtc->pTxn);
while (--pBtc->iPage != iPage) {
tdbPagerReturnPage(pBtc->pBt->pPager, pBtc->pgStack[pBtc->iPage], pBtc->pTxn);
}
// pBtc->iPage = iPage;
pBtc->pPage = pPage;
ret = tdbBtreeBalance(pBtc);
if (ret < 0) {

View File

@ -295,7 +295,7 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn)
}
// 1. pPage == NULL
// 2. pPage && pPage->isLocal == 0 && !TDB_TXN_IS_WRITE(pTxn)
// 2. pPage && !pPage->isLocal == 0 && !TDB_TXN_IS_WRITE(pTxn)
pPageH = pPage;
pPage = NULL;

View File

@ -86,7 +86,7 @@ pip3 install kafka-python
python3 kafka_example_consumer.py
# 21
pip3 install taos-ws-py
pip3 install taos-ws-py==0.2.6
python3 conn_websocket_pandas.py
# 22

View File

@ -59,6 +59,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/ins_topics_test.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqMaxTopic.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqParamsTest.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqParamsTest.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqClientConsLog.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqMaxGroupIds.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsumeDiscontinuousData.py

View File

@ -81,6 +81,11 @@ pip3 list|grep taospy
pip3 uninstall taospy -y
pip3 install --default-timeout=120 taospy==2.7.10
#define taos-ws-py 0.2.8
pip3 list|grep taos-ws-py
pip3 uninstall taos-ws-py -y
pip3 install --default-timeout=120 taos-ws-py==0.2.8
$TIMEOUT_CMD $cmd
RET=$?
echo "cmd exit code: $RET"

View File

@ -668,7 +668,7 @@ class TDDnodes:
self.testCluster = False
self.valgrind = 0
self.asan = False
self.killValgrind = 1
self.killValgrind = 0
def init(self, path, remoteIP = ""):
binPath = self.dnodes[0].getPath() + "/../../../"
@ -775,9 +775,41 @@ class TDDnodes:
tdLog.info("execute finished")
return
def killProcesser(self, processerName):
if platform.system().lower() == 'windows':
killCmd = ("wmic process where name=\"%s.exe\" call terminate > NUL 2>&1" % processerName)
psCmd = ("wmic process where name=\"%s.exe\" | findstr \"%s.exe\"" % (processerName, processerName))
else:
killCmd = (
"ps -ef|grep -w %s| grep -v grep | awk '{print $2}' | xargs kill -TERM > /dev/null 2>&1"
% processerName
)
psCmd = ("ps -ef|grep -w %s| grep -v grep | awk '{print $2}'" % processerName)
processID = ""
try:
processID = subprocess.check_output(psCmd, shell=True)
while processID:
os.system(killCmd)
time.sleep(1)
try:
processID = subprocess.check_output(psCmd, shell=True)
except Exception as err:
processID = ""
tdLog.debug('**** kill pid warn: {err}')
except Exception as err:
processID = ""
tdLog.debug(f'**** find pid warn: {err}')
def stopAll(self):
tdLog.info("stop all dnodes, asan:%d" % self.asan)
distro_id = distro.id()
if platform.system().lower() != 'windows':
distro_id = distro.id()
else:
distro_id = "not alpine"
if self.asan and distro_id != "alpine":
tdLog.info("execute script: %s" % self.stopDnodesPath)
os.system(self.stopDnodesPath)
@ -792,7 +824,6 @@ class TDDnodes:
if (distro_id == "alpine"):
print(distro_id)
psCmd = "ps -ef | grep -w taosd | grep 'root' | grep -v grep| grep -v defunct | awk '{print $2}' | xargs"
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8").strip()
while(processID):
@ -803,36 +834,9 @@ class TDDnodes:
processID = subprocess.check_output(
psCmd, shell=True).decode("utf-8").strip()
elif platform.system().lower() == 'windows':
psCmd = "for /f %a in ('wmic process where \"name='taosd.exe'\" get processId ^| xargs echo ^| awk '{print $2}' ^&^& echo aa') do @(ps | grep %a | awk '{print $1}' | xargs)"
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8").strip()
while(processID):
print(f"pid of taosd.exe:{processID}")
killCmd = "kill -9 %s > nul 2>&1" % processID
os.system(killCmd)
time.sleep(1)
processID = subprocess.check_output(
psCmd, shell=True).decode("utf-8").strip()
psCmd = "for /f %a in ('wmic process where \"name='tmq_sim.exe'\" get processId ^| xargs echo ^| awk '{print $2}' ^&^& echo aa') do @(ps | grep %a | awk '{print $1}' | xargs)"
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8").strip()
while(processID):
print(f"pid of tmq_sim.exe:{processID}")
killCmd = "kill -9 %s > nul 2>&1" % processID
os.system(killCmd)
time.sleep(1)
processID = subprocess.check_output(
psCmd, shell=True).decode("utf-8").strip()
psCmd = "for /f %a in ('wmic process where \"name='taosBenchmark.exe'\" get processId ^| xargs echo ^| awk '{print $2}' ^&^& echo aa') do @(ps | grep %a | awk '{print $1}' | xargs)"
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8").strip()
while(processID):
print(f"pid of taosBenchmark.exe:{processID}")
killCmd = "kill -9 %s > nul 2>&1" % processID
os.system(killCmd)
time.sleep(1)
processID = subprocess.check_output(
psCmd, shell=True).decode("utf-8").strip()
self.killProcesser("taosd")
self.killProcesser("tmq_sim")
self.killProcesser("taosBenchmark")
else:
psCmd = "ps -ef | grep -w taosd | grep 'root' | grep -v grep| grep -v defunct | awk '{print $2}' | xargs"
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8").strip()
@ -849,7 +853,6 @@ class TDDnodes:
time.sleep(1)
processID = subprocess.check_output(
psCmd, shell=True).decode("utf-8").strip()
if self.killValgrind == 1:
psCmd = "ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}' | xargs"
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8").strip()

View File

@ -0,0 +1,20 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
sql create database test
sql use test
sql CREATE TABLE `tb` (`ts` TIMESTAMP, `c0` INT, `c1` FLOAT, `c2` BINARY(10))
sql insert into tb values("2022-05-15 00:01:08.000", 1, 1.0, "abc")
sql insert into tb values("2022-05-16 00:01:08.000", 2, 2.0, "bcd")
sql insert into tb values("2022-05-17 00:01:08.000", 3, 3.0, "cde")
#sleep 10000000
system taos -P7100 -s 'source tsim/query/t/multires_func.sql' | grep -v 'Query OK' | grep -v 'Client Version' > /tmp/multires_func.result
system echo ----------------------diff start-----------------------
system git diff --exit-code --color tsim/query/r/multires_func.result /tmp/multires_func.result
system echo ----------------------diff succeed-----------------------

View File

@ -0,0 +1,31 @@
Copyright (c) 2022 by TDengine, all rights reserved.
taos> source tsim/query/t/multires_func.sql
taos> use test;
Database changed.
taos> select count(*) from tb\G;
*************************** 1.row ***************************
count(*): 3
taos> select last(*) from tb\G;
*************************** 1.row ***************************
ts: 2022-05-17 00:01:08.000
c0: 3
c1: 3.0000000
c2: cde
taos> select last_row(*) from tb\G;
*************************** 1.row ***************************
ts: 2022-05-17 00:01:08.000
c0: 3
c1: 3.0000000
c2: cde
taos> select first(*) from tb\G;
*************************** 1.row ***************************
ts: 2022-05-15 00:01:08.000
c0: 1
c1: 1.0000000
c2: abc

View File

@ -0,0 +1,5 @@
use test;
select count(*) from tb\G;
select last(*) from tb\G;
select last_row(*) from tb\G;
select first(*) from tb\G;

View File

@ -198,7 +198,7 @@ class TDTestCase:
# init
def init(self, conn, logSql, replicaVar=1):
seed = time.clock_gettime(time.CLOCK_REALTIME)
seed = time.time() % 10000
random.seed(seed)
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")

View File

@ -210,7 +210,7 @@ class TDTestCase:
# init
def init(self, conn, logSql, replicaVar=1):
seed = time.clock_gettime(time.CLOCK_REALTIME)
seed = time.time() % 10000
random.seed(seed)
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")

View File

@ -0,0 +1,37 @@
import time
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
class TDTestCase:
updatecfgDict = {'ttlUnit': 1, "ttlPushInterval": 1, "ttlChangeOnWrite": 0}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), True)
self.ttl = 5
self.dbname = "test"
def check_ttl_result(self):
tdSql.execute(f'create database {self.dbname}')
tdSql.execute(f'create table {self.dbname}.t1(ts timestamp, c1 int)')
tdSql.execute(f'create table {self.dbname}.t2(ts timestamp, c1 int) ttl {self.ttl}')
tdSql.query(f'show {self.dbname}.tables')
tdSql.checkRows(2)
tdSql.execute(f'flush database {self.dbname}')
time.sleep(self.ttl + 2)
tdSql.query(f'show {self.dbname}.tables')
tdSql.checkRows(1)
def run(self):
self.check_ttl_result()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -0,0 +1,63 @@
import time
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
class TDTestCase:
updatecfgDict = {'ttlUnit': 1, "ttlPushInterval": 3, "ttlChangeOnWrite": 1, "trimVDbIntervalSec": 360,
"ttlFlushThreshold": 100, "ttlBatchDropNum": 10}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), True)
self.ttl = 5
self.tables = 100
self.dbname = "test"
def check_batch_drop_num(self):
tdSql.execute(f'create database {self.dbname} vgroups 1')
tdSql.execute(f'use {self.dbname}')
tdSql.execute(f'create table stb(ts timestamp, c1 int) tags(t1 int)')
for i in range(self.tables):
tdSql.execute(f'create table t{i} using stb tags({i}) ttl {self.ttl}')
tdSql.execute(f'flush database {self.dbname}')
time.sleep(self.ttl + self.updatecfgDict['ttlPushInterval'] + 1)
tdSql.query('show tables')
tdSql.checkRows(90)
def check_ttl_result(self):
tdSql.execute(f'drop database if exists {self.dbname}')
tdSql.execute(f'create database {self.dbname}')
tdSql.execute(f'create table {self.dbname}.t1(ts timestamp, c1 int)')
tdSql.execute(f'create table {self.dbname}.t2(ts timestamp, c1 int) ttl {self.ttl}')
tdSql.query(f'show {self.dbname}.tables')
tdSql.checkRows(2)
tdSql.execute(f'flush database {self.dbname}')
time.sleep(self.ttl - 1)
tdSql.execute(f'insert into {self.dbname}.t2 values(now, 1)');
tdSql.execute(f'flush database {self.dbname}')
time.sleep(self.ttl - 1)
tdSql.query(f'show {self.dbname}.tables')
tdSql.checkRows(2)
tdSql.execute(f'flush database {self.dbname}')
time.sleep(self.ttl * 2)
tdSql.query(f'show {self.dbname}.tables')
tdSql.checkRows(1)
def run(self):
self.check_batch_drop_num()
self.check_ttl_result()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -220,7 +220,7 @@ class TDTestCase:
# init
def init(self, conn, logSql, replicaVar=1):
seed = time.clock_gettime(time.CLOCK_REALTIME)
seed = time.time() % 10000
random.seed(seed)
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")

View File

@ -35,6 +35,7 @@ class TDTestCase:
tdSql.execute(f'create table db.{self.ntbname}_{i} (ts timestamp,c0 int) ttl {self.ttl_param}')
tdSql.query(f'show db.tables')
tdSql.checkRows(self.tbnum)
tdSql.execute(f'flush database db')
sleep(self.updatecfgDict['ttlUnit']*self.ttl_param+self.updatecfgDict['ttlPushInterval'] + 1)
tdSql.query(f'show db.tables')
tdSql.checkRows(0)
@ -42,6 +43,7 @@ class TDTestCase:
tdSql.execute(f'create table db.{self.ntbname}_{i} (ts timestamp,c0 int) ttl {self.default_ttl}')
for i in range(int(self.tbnum/2)):
tdSql.execute(f'alter table db.{self.ntbname}_{i} ttl {self.modify_ttl}')
tdSql.execute(f'flush database db')
sleep(self.updatecfgDict['ttlUnit']*self.modify_ttl+self.updatecfgDict['ttlPushInterval'] + 1)
tdSql.query(f'show db.tables')
tdSql.checkRows(self.tbnum - int(self.tbnum/2))
@ -54,6 +56,7 @@ class TDTestCase:
tdSql.execute(f'create table db.{self.stbname}_{i} using db.{self.stbname} tags({i}) ttl {self.ttl_param}')
tdSql.query(f'show db.tables')
tdSql.checkRows(self.tbnum)
tdSql.execute(f'flush database db')
sleep(self.updatecfgDict['ttlUnit']*self.ttl_param+self.updatecfgDict['ttlPushInterval'] + 1)
tdSql.query(f'show db.tables')
tdSql.checkRows(0)
@ -63,6 +66,7 @@ class TDTestCase:
tdSql.checkRows(self.tbnum)
for i in range(int(self.tbnum/2)):
tdSql.execute(f'alter table db.{self.stbname}_{i} ttl {self.modify_ttl}')
tdSql.execute(f'flush database db')
sleep(self.updatecfgDict['ttlUnit']*self.modify_ttl+self.updatecfgDict['ttlPushInterval'] + 1)
tdSql.query(f'show db.tables')
tdSql.checkRows(self.tbnum - int(self.tbnum/2))
@ -75,6 +79,7 @@ class TDTestCase:
tdSql.execute(f'insert into db.{self.stbname}_{i} using db.{self.stbname} tags({i}) ttl {self.ttl_param} values(now,1)')
tdSql.query(f'show db.tables')
tdSql.checkRows(self.tbnum)
tdSql.execute(f'flush database db')
sleep(self.updatecfgDict['ttlUnit']*self.ttl_param+self.updatecfgDict['ttlPushInterval'] + 1)
tdSql.query(f'show db.tables')
tdSql.checkRows(0)

View File

@ -16,6 +16,9 @@ class TDTestCase:
self.perfix = 'dev'
self.tables = 10
def check_result(self):
for i in range(self.rowNum):
tdSql.checkData(i, 0, 1);
def run(self):
tdSql.prepare()
@ -179,11 +182,6 @@ class TDTestCase:
tdSql.error(f"select diff(col8) from {dbname}.stb_1")
tdSql.error(f"select diff(col9) from {dbname}.stb")
tdSql.error(f"select diff(col9) from {dbname}.stb_1")
tdSql.error(f"select diff(col11) from {dbname}.stb_1")
tdSql.error(f"select diff(col12) from {dbname}.stb_1")
tdSql.error(f"select diff(col13) from {dbname}.stb_1")
tdSql.error(f"select diff(col14) from {dbname}.stb_1")
tdSql.error(f"select diff(col14) from {dbname}.stb_1")
tdSql.error(f"select diff(col1,col1,col1) from {dbname}.stb_1")
tdSql.error(f"select diff(col1,1,col1) from {dbname}.stb_1")
tdSql.error(f"select diff(col1,col1,col) from {dbname}.stb_1")
@ -217,6 +215,22 @@ class TDTestCase:
tdSql.query(f"select diff(col6) from {dbname}.stb_1")
tdSql.checkRows(10)
tdSql.query(f"select diff(col11) from {dbname}.stb_1")
tdSql.checkRows(10)
self.check_result()
tdSql.query(f"select diff(col12) from {dbname}.stb_1")
tdSql.checkRows(10)
self.check_result()
tdSql.query(f"select diff(col13) from {dbname}.stb_1")
tdSql.checkRows(10)
self.check_result()
tdSql.query(f"select diff(col14) from {dbname}.stb_1")
tdSql.checkRows(10)
self.check_result()
tdSql.execute(f'''create table {dbname}.stb1(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double,
col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned) tags(loc nchar(20))''')
tdSql.execute(f"create table {dbname}.stb1_1 using {dbname}.stb tags('shanghai')")

View File

@ -137,6 +137,12 @@ class TDTestCase:
sql = "select _wstart, _wend, count(ts), sum(c1) from meters where ts > '2018-11-25 00:00:00.000' and ts < '2018-11-26 00:00:00.00' interval(1d) fill(NULL) order by _wstart desc"
tdSql.query(sql)
tdSql.checkRows(1)
sql = "select _wstart, count(*) from meters where ts > '2018-08-20 00:00:00.000' and ts < '2018-09-30 00:00:00.000' interval(9d) fill(NULL) order by _wstart desc;"
tdSql.query(sql)
tdSql.checkRows(6)
sql = "select _wstart, count(*) from meters where ts > '2018-08-20 00:00:00.000' and ts < '2018-09-30 00:00:00.000' interval(9d) fill(NULL) order by _wstart;"
tdSql.query(sql)
tdSql.checkRows(6)
def run(self):
self.prepareTestEnv()

View File

@ -251,10 +251,19 @@ class TDTestCase:
tdSql.checkData(2, 4, 9)
tdSql.checkData(3, 4, 9)
def test_partition_by_limit_no_agg(self):
sql_template = 'select t1 from meters partition by t1 limit %d'
for i in range(1, 5000, 1000):
tdSql.query(sql_template % i)
tdSql.checkRows(5 * i)
def run(self):
self.prepareTestEnv()
self.test_interval_limit_offset()
self.test_interval_partition_by_slimit_limit()
self.test_partition_by_limit_no_agg()
def stop(self):
tdSql.close()

View File

@ -220,7 +220,7 @@ class TDTestCase:
# init
def init(self, conn, logSql, replicaVar=1):
seed = time.clock_gettime(time.CLOCK_REALTIME)
seed = time.time() % 10000
random.seed(seed)
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")

View File

@ -269,7 +269,7 @@ class TDTestCase:
# init
def init(self, conn, logSql, replicaVar=1):
seed = time.clock_gettime(time.CLOCK_REALTIME)
seed = time.time() % 10000
random.seed(seed)
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")

View File

@ -233,7 +233,7 @@ class TMQCom:
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
for i in range(ctbNum):
rowsBatched = 0
sql += " %s%d values "%(stbName,i)
sql += " %s.%s%d values "%(dbName, stbName, i)
for j in range(rowsPerTbl):
sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j)
rowsBatched += 1
@ -241,7 +241,7 @@ class TMQCom:
tsql.execute(sql)
rowsBatched = 0
if j < rowsPerTbl - 1:
sql = "insert into %s%d values " %(stbName,i)
sql = "insert into %s.%s%d values " %(dbName, stbName,i)
else:
sql = "insert into "
#end sql
@ -263,7 +263,7 @@ class TMQCom:
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
for i in range(ctbNum):
rowsBatched = 0
sql += " %s%d values "%(ctbPrefix,i)
sql += " %s.%s%d values "%(dbName, ctbPrefix,i)
for j in range(rowsPerTbl):
if (j % 2 == 0):
sql += "(%d, %d, %d, 'tmqrow_%d') "%(startTs + j, j, j, j)
@ -274,7 +274,7 @@ class TMQCom:
tsql.execute(sql)
rowsBatched = 0
if j < rowsPerTbl - 1:
sql = "insert into %s%d values " %(ctbPrefix,i)
sql = "insert into %s.%s%d values " %(dbName, ctbPrefix, i)
else:
sql = "insert into "
#end sql
@ -296,7 +296,7 @@ class TMQCom:
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
for i in range(ctbNum):
rowsBatched = 0
sql += " %s%d values "%(ctbPrefix,i+ctbStartIdx)
sql += " %s.%s%d values "%(dbName, ctbPrefix, i+ctbStartIdx)
for j in range(rowsPerTbl):
if (j % 2 == 0):
sql += "(%d, %d, %d, 'tmqrow_%d', now) "%(startTs + j, j, j, j)
@ -307,7 +307,7 @@ class TMQCom:
tsql.execute(sql)
rowsBatched = 0
if j < rowsPerTbl - 1:
sql = "insert into %s%d values " %(ctbPrefix,i+ctbStartIdx)
sql = "insert into %s.%s%d values " %(dbName, ctbPrefix, i+ctbStartIdx)
else:
sql = "insert into "
#end sql

View File

@ -35,6 +35,7 @@ from util.taosadapter import *
import taos
import taosrest
import taosws
def checkRunTimeError():
import win32gui
@ -105,12 +106,13 @@ if __name__ == "__main__":
queryPolicy = 1
createDnodeNums = 1
restful = False
websocket = False
replicaVar = 1
asan = False
independentMnode = True
previousCluster = False
opts, args = getopt.gnu_getopt(sys.argv[1:], 'f:p:m:l:scghrd:k:e:N:M:Q:C:RD:n:i:aP', [
'file=', 'path=', 'master', 'logSql', 'stop', 'cluster', 'valgrind', 'help', 'restart', 'updateCfgDict', 'killv', 'execCmd','dnodeNums','mnodeNums','queryPolicy','createDnodeNums','restful','adaptercfgupdate','replicaVar','independentMnode','previous'])
opts, args = getopt.gnu_getopt(sys.argv[1:], 'f:p:m:l:scghrd:k:e:N:M:Q:C:RWD:n:i:aP', [
'file=', 'path=', 'master', 'logSql', 'stop', 'cluster', 'valgrind', 'help', 'restart', 'updateCfgDict', 'killv', 'execCmd','dnodeNums','mnodeNums','queryPolicy','createDnodeNums','restful','websocket','adaptercfgupdate','replicaVar','independentMnode','previous'])
for key, value in opts:
if key in ['-h', '--help']:
tdLog.printNoPrefix(
@ -131,6 +133,7 @@ if __name__ == "__main__":
tdLog.printNoPrefix('-Q set queryPolicy in one dnode')
tdLog.printNoPrefix('-C create Dnode Numbers in one cluster')
tdLog.printNoPrefix('-R restful realization form')
tdLog.printNoPrefix('-W websocket connection')
tdLog.printNoPrefix('-D taosadapter update cfg dict ')
tdLog.printNoPrefix('-n the number of replicas')
tdLog.printNoPrefix('-i independentMnode Mnode')
@ -177,7 +180,7 @@ if __name__ == "__main__":
sys.exit(0)
if key in ['-k', '--killValgrind']:
killValgrind = 0
killValgrind = 1
if key in ['-e', '--execCmd']:
try:
@ -203,6 +206,9 @@ if __name__ == "__main__":
if key in ['-R', '--restful']:
restful = True
if key in ['-W', '--websocket']:
websocket = True
if key in ['-a', '--asan']:
asan = True
@ -224,7 +230,7 @@ if __name__ == "__main__":
# do exeCmd command
#
if not execCmd == "":
if restful:
if restful or websocket:
tAdapter.init(deployPath)
else:
tdDnodes.init(deployPath)
@ -263,7 +269,7 @@ if __name__ == "__main__":
if valgrind:
time.sleep(2)
if restful:
if restful or websocket:
toBeKilled = "taosadapter"
# killCmd = "ps -ef|grep -w %s| grep -v grep | awk '{print $2}' | xargs kill -TERM > /dev/null 2>&1" % toBeKilled
@ -358,7 +364,7 @@ if __name__ == "__main__":
tdDnodes.deploy(1,updateCfgDict)
tdDnodes.start(1)
tdCases.logSql(logSql)
if restful:
if restful or websocket:
tAdapter.deploy(adapter_cfg_dict)
tAdapter.start()
@ -366,6 +372,8 @@ if __name__ == "__main__":
queryPolicy=int(queryPolicy)
if restful:
conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc")
elif websocket:
conn = taosws.connect(f"taosws://root:taosdata@{host}:6041")
else:
conn = taos.connect(host,config=tdDnodes.getSimCfgPath())
@ -395,14 +403,16 @@ if __name__ == "__main__":
tdDnodes.starttaosd(dnode.index)
tdCases.logSql(logSql)
if restful:
if restful or websocket:
tAdapter.deploy(adapter_cfg_dict)
tAdapter.start()
if not restful:
conn = taos.connect(host,config=tdDnodes.getSimCfgPath())
else:
if restful:
conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc")
elif websocket:
conn = taosws.connect(f"taosws://root:taosdata@{host}:6041")
else:
conn = taos.connect(host,config=tdDnodes.getSimCfgPath())
# tdLog.info(tdDnodes.getSimCfgPath(),host)
if createDnodeNums == 1:
createDnodeNums=dnodeNums
@ -419,6 +429,8 @@ if __name__ == "__main__":
queryPolicy=int(queryPolicy)
if restful:
conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc")
elif websocket:
conn = taosws.connect(f"taosws://root:taosdata@{host}:6041")
else:
conn = taos.connect(host,config=tdDnodes.getSimCfgPath())
@ -438,10 +450,12 @@ if __name__ == "__main__":
if ucase is not None and hasattr(ucase, 'noConn') and ucase.noConn == True:
conn = None
else:
if not restful:
conn = taos.connect(host="%s"%(host), config=tdDnodes.sim.getCfgDir())
if restful:
conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc")
elif websocket:
conn = taosws.connect(f"taosws://root:taosdata@{host}:6041")
else:
conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc")
conn = taos.connect(host=f"{host}", config=tdDnodes.getSimCfgPath())
if testCluster:
tdLog.info("Procedures for testing cluster")
@ -451,10 +465,12 @@ if __name__ == "__main__":
tdCases.runOneCluster(fileName)
else:
tdLog.info("Procedures for testing self-deployment")
if not restful:
conn = taos.connect(host,config=tdDnodes.getSimCfgPath())
if restful:
conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc")
elif websocket:
conn = taosws.connect(f"taosws://root:taosdata@{host}:6041")
else:
conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc")
conn = taos.connect(host=f"{host}", config=tdDnodes.getSimCfgPath())
if fileName == "all":
tdCases.runAllWindows(conn)
@ -470,10 +486,12 @@ if __name__ == "__main__":
tdDnodes.stopAll()
tdDnodes.start(1)
time.sleep(1)
if not restful:
conn = taos.connect( host, config=tdDnodes.getSimCfgPath())
else:
if restful:
conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc")
elif websocket:
conn = taosws.connect(f"taosws://root:taosdata@{host}:6041")
else:
conn = taos.connect(host=f"{host}", config=tdDnodes.getSimCfgPath())
tdLog.info("Procedures for tdengine deployed in %s" % (host))
tdLog.info("query test after taosd restart")
tdCases.runOneWindows(conn, sp[0] + "_" + "restart.py", replicaVar)
@ -505,7 +523,7 @@ if __name__ == "__main__":
except:
pass
if restful:
if restful or websocket:
tAdapter.init(deployPath, masterIp)
tAdapter.stop(force_kill=True)
@ -515,16 +533,18 @@ if __name__ == "__main__":
tdDnodes.start(1)
tdCases.logSql(logSql)
if restful:
if restful or websocket:
tAdapter.deploy(adapter_cfg_dict)
tAdapter.start()
if queryPolicy != 1:
queryPolicy=int(queryPolicy)
if not restful:
conn = taos.connect(host,config=tdDnodes.getSimCfgPath())
else:
if restful:
conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc")
elif websocket:
conn = taosws.connect(f"taosws://root:taosdata@{host}:6041")
else:
conn = taos.connect(host=f"{host}", config=tdDnodes.getSimCfgPath())
# tdSql.init(conn.cursor())
# tdSql.execute("create qnode on dnode 1")
# tdSql.execute('alter local "queryPolicy" "%d"'%queryPolicy)
@ -567,15 +587,17 @@ if __name__ == "__main__":
tdDnodes.starttaosd(dnode.index)
tdCases.logSql(logSql)
if restful:
if restful or websocket:
tAdapter.deploy(adapter_cfg_dict)
tAdapter.start()
# create taos connect
if not restful:
conn = taos.connect(host,config=tdDnodes.getSimCfgPath())
if restful:
conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc")
elif websocket:
conn = taosws.connect(f"taosws://root:taosdata@{host}:6041")
else:
conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc")
conn = taos.connect(host=f"{host}", config=tdDnodes.getSimCfgPath())
print(tdDnodes.getSimCfgPath(),host)
if createDnodeNums == 1:
createDnodeNums=dnodeNums
@ -595,8 +617,10 @@ if __name__ == "__main__":
queryPolicy=int(queryPolicy)
if restful:
conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc")
elif websocket:
conn = taosws.connect(f"taosws://root:taosdata@{host}:6041")
else:
conn = taos.connect(host,config=tdDnodes.getSimCfgPath())
conn = taos.connect(host=f"{host}", config=tdDnodes.getSimCfgPath())
cursor = conn.cursor()
cursor.execute("create qnode on dnode 1")
@ -621,10 +645,12 @@ if __name__ == "__main__":
tdCases.runOneCluster(fileName)
else:
tdLog.info("Procedures for testing self-deployment")
if not restful:
conn = taos.connect(host,config=tdDnodes.getSimCfgPath())
if restful:
conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc")
elif websocket:
conn = taosws.connect(f"taosws://root:taosdata@{host}:6041")
else:
conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc")
conn = taos.connect(host=f"{host}", config=tdDnodes.getSimCfgPath())
if fileName == "all":
tdCases.runAllLinux(conn)
@ -641,10 +667,12 @@ if __name__ == "__main__":
tdDnodes.stopAll()
tdDnodes.start(1)
time.sleep(1)
if not restful:
conn = taos.connect( host, config=tdDnodes.getSimCfgPath())
else:
if restful:
conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc")
elif websocket:
conn = taosws.connect(f"taosws://root:taosdata@{host}:6041")
else:
conn = taos.connect(host=f"{host}", config=tdDnodes.getSimCfgPath())
tdLog.info("Procedures for tdengine deployed in %s" % (host))
tdLog.info("query test after taosd restart")
tdCases.runOneLinux(conn, sp[0] + "_" + "restart.py", replicaVar)

View File

@ -28,12 +28,12 @@ static void shellRecordCommandToHistory(char *command);
static int32_t shellRunCommand(char *command, bool recordHistory);
static void shellRunSingleCommandImp(char *command);
static char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision);
static int32_t shellDumpResultToFile(const char *fname, TAOS_RES *tres);
static int64_t shellDumpResultToFile(const char *fname, TAOS_RES *tres);
static void shellPrintNChar(const char *str, int32_t length, int32_t width);
static void shellPrintGeometry(const unsigned char *str, int32_t length, int32_t width);
static int32_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql);
static int32_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql);
static int32_t shellDumpResult(TAOS_RES *tres, char *fname, int32_t *error_no, bool vertical, const char *sql);
static int64_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql);
static int64_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql);
static int64_t shellDumpResult(TAOS_RES *tres, char *fname, int32_t *error_no, bool vertical, const char *sql);
static void shellReadHistory();
static void shellWriteHistory();
static void shellPrintError(TAOS_RES *tres, int64_t st);
@ -238,14 +238,14 @@ void shellRunSingleCommandImp(char *command) {
if (pFields != NULL) { // select and show kinds of commands
int32_t error_no = 0;
int32_t numOfRows = shellDumpResult(pSql, fname, &error_no, printMode, command);
int64_t numOfRows = shellDumpResult(pSql, fname, &error_no, printMode, command);
if (numOfRows < 0) return;
et = taosGetTimestampUs();
if (error_no == 0) {
printf("Query OK, %d row(s) in set (%.6fs)\r\n", numOfRows, (et - st) / 1E6);
printf("Query OK, %"PRId64 " row(s) in set (%.6fs)\r\n", numOfRows, (et - st) / 1E6);
} else {
printf("Query interrupted (%s), %d row(s) in set (%.6fs)\r\n", taos_errstr(pSql), numOfRows, (et - st) / 1E6);
printf("Query interrupted (%s), %"PRId64 " row(s) in set (%.6fs)\r\n", taos_errstr(pSql), numOfRows, (et - st) / 1E6);
}
taos_free_result(pSql);
} else {
@ -430,7 +430,7 @@ void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, i
}
}
int32_t shellDumpResultToFile(const char *fname, TAOS_RES *tres) {
int64_t shellDumpResultToFile(const char *fname, TAOS_RES *tres) {
char fullname[PATH_MAX] = {0};
if (taosExpandDir(fname, fullname, PATH_MAX) != 0) {
tstrncpy(fullname, fname, PATH_MAX);
@ -459,7 +459,7 @@ int32_t shellDumpResultToFile(const char *fname, TAOS_RES *tres) {
}
taosFprintfFile(pFile, "\r\n");
int32_t numOfRows = 0;
int64_t numOfRows = 0;
do {
int32_t *length = taos_fetch_lengths(tres);
for (int32_t i = 0; i < num_fields; i++) {
@ -702,7 +702,7 @@ bool shellIsShowQuery(const char *sql) {
return false;
}
int32_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql) {
int64_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql) {
TAOS_ROW row = taos_fetch_row(tres);
if (row == NULL) {
return 0;
@ -726,11 +726,11 @@ int32_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql) {
resShowMaxNum = SHELL_DEFAULT_RES_SHOW_NUM;
}
int32_t numOfRows = 0;
int64_t numOfRows = 0;
int32_t showMore = 1;
do {
if (numOfRows < resShowMaxNum) {
printf("*************************** %d.row ***************************\r\n", numOfRows + 1);
printf("*************************** %"PRId64".row ***************************\r\n", numOfRows + 1);
int32_t *length = taos_fetch_lengths(tres);
@ -856,7 +856,7 @@ void shellPrintHeader(TAOS_FIELD *fields, int32_t *width, int32_t num_fields) {
putchar('\n');
}
int32_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql) {
int64_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql) {
TAOS_ROW row = taos_fetch_row(tres);
if (row == NULL) {
return 0;
@ -879,7 +879,7 @@ int32_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql) {
resShowMaxNum = SHELL_DEFAULT_RES_SHOW_NUM;
}
int32_t numOfRows = 0;
int64_t numOfRows = 0;
int32_t showMore = 1;
do {
@ -915,8 +915,8 @@ int32_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql) {
return numOfRows;
}
int32_t shellDumpResult(TAOS_RES *tres, char *fname, int32_t *error_no, bool vertical, const char *sql) {
int32_t numOfRows = 0;
int64_t shellDumpResult(TAOS_RES *tres, char *fname, int32_t *error_no, bool vertical, const char *sql) {
int64_t numOfRows = 0;
if (fname != NULL) {
numOfRows = shellDumpResultToFile(fname, tres);
} else if (vertical) {

View File

@ -157,10 +157,6 @@ void varbinary_sql_test() {
taos_free_result(pRes);
// string function test, not support
pRes = taos_query(taos, "select length(c2) from stb");
ASSERT(taos_errno(pRes) != 0);
taos_free_result(pRes);
pRes = taos_query(taos, "select ltrim(c2) from stb");
ASSERT(taos_errno(pRes) != 0);
taos_free_result(pRes);
@ -190,7 +186,7 @@ void varbinary_sql_test() {
ASSERT(taos_errno(pRes) != 0);
taos_free_result(pRes);
// support first/last/last_row/count/hyperloglog/sample/tail/mode
// support first/last/last_row/count/hyperloglog/sample/tail/mode/length
pRes = taos_query(taos, "select first(c2) from stb");
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
@ -207,6 +203,10 @@ void varbinary_sql_test() {
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
pRes = taos_query(taos, "select length(c2) from stb where c2 = '\\x7F8290'");
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
pRes = taos_query(taos, "select cast(t2 as varbinary(16)) from stb order by ts");
while ((row = taos_fetch_row(pRes)) != NULL) {
int32_t* length = taos_fetch_lengths(pRes);