fix:conflicts from main

This commit is contained in:
wangmm0220 2023-07-19 09:05:05 +08:00
commit 5c344b0159
55 changed files with 3003 additions and 217 deletions

View File

@ -352,4 +352,4 @@ TDengine 提供了丰富的应用程序开发接口,其中包括 C/C++、Java
# 加入技术交流群
TDengine 官方社群「物联网大数据群」对外开放,欢迎您加入讨论。搜索微信号 "tdengine1",加小 T 为好友,即可入群。
TDengine 官方社群「物联网大数据群」对外开放,欢迎您加入讨论。搜索微信号 "tdengine",加小 T 为好友,即可入群。

View File

@ -2,7 +2,7 @@
IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER})
ELSE ()
SET(TD_VER_NUMBER "3.0.7.1.alpha")
SET(TD_VER_NUMBER "3.0.7.2.alpha")
ENDIF ()
IF (DEFINED VERCOMPATIBLE)

View File

@ -59,9 +59,9 @@ The different database framework specifications for various programming language
| -------------------------------------- | ------------- | --------------- | ------------- | ------------- | ------------- | ------------- |
| **Connection Management** | Support | Support | Support | Support | Support | Support |
| **Regular Query** | Support | Support | Support | Support | Support | Support |
| **Parameter Binding** | Supported | Not Supported | Support | Support | Not Supported | Support |
| **Parameter Binding** | Supported | Supported | Support | Support | Not Supported | Support |
| **Subscription (TMQ) ** | Supported | Support | Support | Not Supported | Not Supported | Support |
| **Schemaless** | Supported | Not Supported | Supported | Not Supported | Not Supported | Not Supported |
| **Schemaless** | Supported | Supported | Supported | Not Supported | Not Supported | Not Supported |
| **Bulk Pulling (based on WebSocket) ** | Support | Support | Support | Support | Support | Support |
:::warning

View File

@ -722,6 +722,15 @@ The charset that takes effect is UTF-8.
| Value Range | 0: not change; 1: change by modification |
| Default Value | 0 |
### tmqMaxTopicNum
| Attribute | Description |
| -------- | ------------------ |
| Applicable | Server Only |
| Meaning | The max num of topics |
| Value Range | 1-10000|
| Default Value | 20 |
## 3.0 Parameters
| # | **Parameter** | **Applicable to 2.x ** | **Applicable to 3.0 ** | Current behavior in 3.0 |

View File

@ -10,6 +10,10 @@ For TDengine 2.x installation packages by version, please visit [here](https://w
import Release from "/components/ReleaseV3";
## 3.0.7.1
<Release type="tdengine" version="3.0.7.1" />
## 3.0.7.0
<Release type="tdengine" version="3.0.7.0" />

View File

@ -58,9 +58,9 @@ TDengine 版本更新往往会增加新的功能特性,列表中的连接器
| ------------------------------ | -------- | ---------- | -------- | -------- | ----------- | -------- |
| **连接管理** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| **普通查询** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| **参数绑定** | 支持 | 暂不支持 | 支持 | 支持 | 暂不支持 | 支持 |
| **参数绑定** | 支持 | 支持 | 支持 | 支持 | 暂不支持 | 支持 |
| **数据订阅TMQ** | 支持 | 支持 | 支持 | 暂不支持 | 暂不支持 | 支持 |
| **Schemaless** | 支持 | 暂不支持 | 支持 | 暂不支持 | 暂不支持 | 支持 |
| **Schemaless** | 支持 | 支持 | 支持 | 暂不支持 | 暂不支持 | 支持 |
| **批量拉取(基于 WebSocket** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
:::warning

View File

@ -726,6 +726,15 @@ charset 的有效值是 UTF-8。
| 取值范围 | 0: 不改变1改变 |
| 缺省值 | 0 |
### tmqMaxTopicNum
| 属性 | 说明 |
| -------- | ------------------ |
| 适用范围 | 仅服务端适用 |
| 含义 | 订阅最多可建立的 topic 数量 |
| 取值范围 | 1-10000|
| 缺省值 | 20 |
## 压缩参数
### compressMsgSize

View File

@ -10,6 +10,10 @@ TDengine 2.x 各版本安装包请访问[这里](https://www.taosdata.com/all-do
import Release from "/components/ReleaseV3";
## 3.0.7.1
<Release type="tdengine" version="3.0.7.1" />
## 3.0.7.0
<Release type="tdengine" version="3.0.7.0" />

View File

@ -358,7 +358,7 @@ typedef struct SRestoreComponentNodeStmt {
typedef struct SCreateTopicStmt {
ENodeType type;
char topicName[TSDB_TABLE_NAME_LEN];
char topicName[TSDB_TOPIC_NAME_LEN];
char subDbName[TSDB_DB_NAME_LEN];
char subSTbName[TSDB_TABLE_NAME_LEN];
bool ignoreExists;
@ -369,13 +369,13 @@ typedef struct SCreateTopicStmt {
typedef struct SDropTopicStmt {
ENodeType type;
char topicName[TSDB_TABLE_NAME_LEN];
char topicName[TSDB_TOPIC_NAME_LEN];
bool ignoreNotExists;
} SDropTopicStmt;
typedef struct SDropCGroupStmt {
ENodeType type;
char topicName[TSDB_TABLE_NAME_LEN];
char topicName[TSDB_TOPIC_NAME_LEN];
char cgroup[TSDB_CGROUP_LEN];
bool ignoreNotExists;
} SDropCGroupStmt;

View File

@ -247,6 +247,7 @@ typedef struct SSortLogicNode {
SNodeList* pSortKeys;
bool groupSort;
int64_t maxRows;
bool skipPKSortOpt;
} SSortLogicNode;
typedef struct SPartitionLogicNode {

View File

@ -267,7 +267,9 @@ function install_log() {
}
function install_connector() {
${csudo}cp -rf ${script_dir}/connector/ ${install_main_dir}/
if [ -d ${script_dir}/connector ]; then
${csudo}cp -rf ${script_dir}/connector/ ${install_main_dir}/
fi
}
function install_examples() {

View File

@ -56,8 +56,8 @@ copy %binary_dir%\\build\\bin\\taos.exe %target_dir% > nul
if exist %binary_dir%\\build\\bin\\taosBenchmark.exe (
copy %binary_dir%\\build\\bin\\taosBenchmark.exe %target_dir% > nul
)
if exist %binary_dir%\\build\\lib\\taosws.dll.lib (
copy %binary_dir%\\build\\lib\\taosws.dll.lib %target_dir%\\driver > nul
if exist %binary_dir%\\build\\lib\\taosws.lib (
copy %binary_dir%\\build\\lib\\taosws.lib %target_dir%\\driver > nul
)
if exist %binary_dir%\\build\\lib\\taosws.dll (
copy %binary_dir%\\build\\lib\\taosws.dll %target_dir%\\driver > nul

View File

@ -3005,4 +3005,4 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
}
return code;
}
}

View File

@ -1143,7 +1143,8 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
for (int32_t i = 0; i < vlen; ++i) {
SVnodeLoad vload = {0};
int64_t reserved = 0;
int64_t reserved64 = 0;
int32_t reserved32 = 0;
if (tDecodeI32(&decoder, &vload.vgId) < 0) return -1;
if (tDecodeI8(&decoder, &vload.syncState) < 0) return -1;
if (tDecodeI8(&decoder, &vload.syncRestore) < 0) return -1;
@ -1155,9 +1156,9 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tDecodeI64(&decoder, &vload.compStorage) < 0) return -1;
if (tDecodeI64(&decoder, &vload.pointsWritten) < 0) return -1;
if (tDecodeI32(&decoder, &vload.numOfCachedTables) < 0) return -1;
if (tDecodeI32(&decoder, (int32_t *)&reserved) < 0) return -1;
if (tDecodeI64(&decoder, &reserved) < 0) return -1;
if (tDecodeI64(&decoder, &reserved) < 0) return -1;
if (tDecodeI32(&decoder, (int32_t *)&reserved32) < 0) return -1;
if (tDecodeI64(&decoder, &reserved64) < 0) return -1;
if (tDecodeI64(&decoder, &reserved64) < 0) return -1;
if (taosArrayPush(pReq->pVloads, &vload) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
@ -1545,6 +1546,7 @@ int32_t tSerializeSGetUserAuthRsp(void *buf, int32_t bufLen, SGetUserAuthRsp *pR
}
int32_t tDeserializeSGetUserAuthRspImpl(SDecoder *pDecoder, SGetUserAuthRsp *pRsp) {
char *key = NULL, *value = NULL;
pRsp->createdDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
pRsp->readDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
pRsp->writeDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
@ -1553,40 +1555,40 @@ int32_t tDeserializeSGetUserAuthRspImpl(SDecoder *pDecoder, SGetUserAuthRsp *pRs
pRsp->useDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (pRsp->createdDbs == NULL || pRsp->readDbs == NULL || pRsp->writeDbs == NULL || pRsp->readTbs == NULL ||
pRsp->writeTbs == NULL || pRsp->useDbs == NULL) {
return -1;
goto _err;
}
if (tDecodeCStrTo(pDecoder, pRsp->user) < 0) return -1;
if (tDecodeI8(pDecoder, &pRsp->superAuth) < 0) return -1;
if (tDecodeI8(pDecoder, &pRsp->sysInfo) < 0) return -1;
if (tDecodeI8(pDecoder, &pRsp->enable) < 0) return -1;
if (tDecodeI8(pDecoder, &pRsp->reserve) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->version) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pRsp->user) < 0) goto _err;
if (tDecodeI8(pDecoder, &pRsp->superAuth) < 0) goto _err;
if (tDecodeI8(pDecoder, &pRsp->sysInfo) < 0) goto _err;
if (tDecodeI8(pDecoder, &pRsp->enable) < 0) goto _err;
if (tDecodeI8(pDecoder, &pRsp->reserve) < 0) goto _err;
if (tDecodeI32(pDecoder, &pRsp->version) < 0) goto _err;
int32_t numOfCreatedDbs = 0;
int32_t numOfReadDbs = 0;
int32_t numOfWriteDbs = 0;
if (tDecodeI32(pDecoder, &numOfCreatedDbs) < 0) return -1;
if (tDecodeI32(pDecoder, &numOfReadDbs) < 0) return -1;
if (tDecodeI32(pDecoder, &numOfWriteDbs) < 0) return -1;
if (tDecodeI32(pDecoder, &numOfCreatedDbs) < 0) goto _err;
if (tDecodeI32(pDecoder, &numOfReadDbs) < 0) goto _err;
if (tDecodeI32(pDecoder, &numOfWriteDbs) < 0) goto _err;
for (int32_t i = 0; i < numOfCreatedDbs; ++i) {
char db[TSDB_DB_FNAME_LEN] = {0};
if (tDecodeCStrTo(pDecoder, db) < 0) return -1;
if (tDecodeCStrTo(pDecoder, db) < 0) goto _err;
int32_t len = strlen(db);
taosHashPut(pRsp->createdDbs, db, len, db, len + 1);
}
for (int32_t i = 0; i < numOfReadDbs; ++i) {
char db[TSDB_DB_FNAME_LEN] = {0};
if (tDecodeCStrTo(pDecoder, db) < 0) return -1;
if (tDecodeCStrTo(pDecoder, db) < 0) goto _err;
int32_t len = strlen(db);
taosHashPut(pRsp->readDbs, db, len, db, len + 1);
}
for (int32_t i = 0; i < numOfWriteDbs; ++i) {
char db[TSDB_DB_FNAME_LEN] = {0};
if (tDecodeCStrTo(pDecoder, db) < 0) return -1;
if (tDecodeCStrTo(pDecoder, db) < 0) goto _err;
int32_t len = strlen(db);
taosHashPut(pRsp->writeDbs, db, len, db, len + 1);
}
@ -1595,67 +1597,80 @@ int32_t tDeserializeSGetUserAuthRspImpl(SDecoder *pDecoder, SGetUserAuthRsp *pRs
int32_t numOfReadTbs = 0;
int32_t numOfWriteTbs = 0;
int32_t numOfUseDbs = 0;
if (tDecodeI32(pDecoder, &numOfReadTbs) < 0) return -1;
if (tDecodeI32(pDecoder, &numOfWriteTbs) < 0) return -1;
if (tDecodeI32(pDecoder, &numOfUseDbs) < 0) return -1;
if (tDecodeI32(pDecoder, &numOfReadTbs) < 0) goto _err;
if (tDecodeI32(pDecoder, &numOfWriteTbs) < 0) goto _err;
if (tDecodeI32(pDecoder, &numOfUseDbs) < 0) goto _err;
for (int32_t i = 0; i < numOfReadTbs; ++i) {
int32_t keyLen = 0;
if (tDecodeI32(pDecoder, &keyLen) < 0) return -1;
if (tDecodeI32(pDecoder, &keyLen) < 0) goto _err;
char *key = taosMemoryCalloc(keyLen + 1, sizeof(char));
if (tDecodeCStrTo(pDecoder, key) < 0) return -1;
key = taosMemoryCalloc(keyLen + 1, sizeof(char));
if (tDecodeCStrTo(pDecoder, key) < 0) goto _err;
int32_t valuelen = 0;
if (tDecodeI32(pDecoder, &valuelen) < 0) return -1;
char *value = taosMemoryCalloc(valuelen + 1, sizeof(char));
if (tDecodeCStrTo(pDecoder, value) < 0) return -1;
if (tDecodeI32(pDecoder, &valuelen) < 0) goto _err;
value = taosMemoryCalloc(valuelen + 1, sizeof(char));
if (tDecodeCStrTo(pDecoder, value) < 0) goto _err;
taosHashPut(pRsp->readTbs, key, strlen(key), value, valuelen + 1);
taosMemoryFree(key);
taosMemoryFree(value);
taosMemoryFreeClear(key);
taosMemoryFreeClear(value);
}
for (int32_t i = 0; i < numOfWriteTbs; ++i) {
int32_t keyLen = 0;
if (tDecodeI32(pDecoder, &keyLen) < 0) return -1;
if (tDecodeI32(pDecoder, &keyLen) < 0) goto _err;
char *key = taosMemoryCalloc(keyLen + 1, sizeof(char));
if (tDecodeCStrTo(pDecoder, key) < 0) return -1;
key = taosMemoryCalloc(keyLen + 1, sizeof(char));
if (tDecodeCStrTo(pDecoder, key) < 0) goto _err;
int32_t valuelen = 0;
if (tDecodeI32(pDecoder, &valuelen) < 0) return -1;
char *value = taosMemoryCalloc(valuelen + 1, sizeof(char));
if (tDecodeCStrTo(pDecoder, value) < 0) return -1;
if (tDecodeI32(pDecoder, &valuelen) < 0) goto _err;
value = taosMemoryCalloc(valuelen + 1, sizeof(char));
if (tDecodeCStrTo(pDecoder, value) < 0) goto _err;
taosHashPut(pRsp->writeTbs, key, strlen(key), value, valuelen + 1);
taosMemoryFree(key);
taosMemoryFree(value);
taosMemoryFreeClear(key);
taosMemoryFreeClear(value);
}
for (int32_t i = 0; i < numOfUseDbs; ++i) {
int32_t keyLen = 0;
if (tDecodeI32(pDecoder, &keyLen) < 0) return -1;
if (tDecodeI32(pDecoder, &keyLen) < 0) goto _err;
char *key = taosMemoryCalloc(keyLen + 1, sizeof(char));
if (tDecodeCStrTo(pDecoder, key) < 0) return -1;
key = taosMemoryCalloc(keyLen + 1, sizeof(char));
if (tDecodeCStrTo(pDecoder, key) < 0) goto _err;
int32_t ref = 0;
if (tDecodeI32(pDecoder, &ref) < 0) return -1;
if (tDecodeI32(pDecoder, &ref) < 0) goto _err;
taosHashPut(pRsp->useDbs, key, strlen(key), &ref, sizeof(ref));
taosMemoryFree(key);
taosMemoryFreeClear(key);
}
// since 3.0.7.0
if (!tDecodeIsEnd(pDecoder)) {
if (tDecodeI32(pDecoder, &pRsp->passVer) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->passVer) < 0) goto _err;
} else {
pRsp->passVer = 0;
}
}
return 0;
_err:
taosHashCleanup(pRsp->createdDbs);
taosHashCleanup(pRsp->readDbs);
taosHashCleanup(pRsp->writeDbs);
taosHashCleanup(pRsp->writeTbs);
taosHashCleanup(pRsp->readTbs);
taosHashCleanup(pRsp->useDbs);
taosMemoryFreeClear(key);
taosMemoryFreeClear(value);
return -1;
}
int32_t tDeserializeSGetUserAuthRsp(void *buf, int32_t bufLen, SGetUserAuthRsp *pRsp) {
@ -2844,7 +2859,6 @@ int32_t tSerializeSDbHbRspImp(SEncoder *pEncoder, const SDbHbRsp *pRsp) {
return 0;
}
int32_t tSerializeSDbHbBatchRsp(void *buf, int32_t bufLen, SDbHbBatchRsp *pRsp) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
@ -2908,7 +2922,7 @@ int32_t tDeserializeSUseDbRsp(void *buf, int32_t bufLen, SUseDbRsp *pRsp) {
return 0;
}
int32_t tDeserializeSDbHbRspImp(SDecoder* decoder, SDbHbRsp* pRsp) {
int32_t tDeserializeSDbHbRspImp(SDecoder *decoder, SDbHbRsp *pRsp) {
int8_t flag = 0;
if (tDecodeI8(decoder, &flag) < 0) return -1;
if (flag) {
@ -3196,7 +3210,7 @@ int32_t tSerializeSDbCfgRsp(void *buf, int32_t bufLen, const SDbCfgRsp *pRsp) {
return tlen;
}
int32_t tDeserializeSDbCfgRspImpl(SDecoder* decoder, SDbCfgRsp *pRsp) {
int32_t tDeserializeSDbCfgRspImpl(SDecoder *decoder, SDbCfgRsp *pRsp) {
if (tDecodeCStrTo(decoder, pRsp->db) < 0) return -1;
if (tDecodeI64(decoder, &pRsp->dbId) < 0) return -1;
if (tDecodeI32(decoder, &pRsp->cfgVersion) < 0) return -1;
@ -5306,10 +5320,10 @@ int32_t tDeserializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq) {
return 0;
}
int32_t tDeatroySMqHbReq(SMqHbReq* pReq){
for(int i = 0; i < taosArrayGetSize(pReq->topics); i++){
TopicOffsetRows* vgs = taosArrayGet(pReq->topics, i);
if(vgs) taosArrayDestroy(vgs->offsetRows);
int32_t tDeatroySMqHbReq(SMqHbReq *pReq) {
for (int i = 0; i < taosArrayGetSize(pReq->topics); i++) {
TopicOffsetRows *vgs = taosArrayGet(pReq->topics, i);
if (vgs) taosArrayDestroy(vgs->offsetRows);
}
taosArrayDestroy(pReq->topics);
return 0;
@ -5326,7 +5340,7 @@ int32_t tSerializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
int32_t sz = taosArrayGetSize(pReq->topics);
if (tEncodeI32(&encoder, sz) < 0) return -1;
for (int32_t i = 0; i < sz; ++i) {
TopicOffsetRows* vgs = (TopicOffsetRows*)taosArrayGet(pReq->topics, i);
TopicOffsetRows *vgs = (TopicOffsetRows *)taosArrayGet(pReq->topics, i);
if (tEncodeCStr(&encoder, vgs->topicName) < 0) return -1;
int32_t szVgs = taosArrayGetSize(vgs->offsetRows);
if (tEncodeI32(&encoder, szVgs) < 0) return -1;
@ -5356,19 +5370,19 @@ int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
if (tDecodeI32(&decoder, &pReq->epoch) < 0) return -1;
int32_t sz = 0;
if (tDecodeI32(&decoder, &sz) < 0) return -1;
if(sz > 0){
if (sz > 0) {
pReq->topics = taosArrayInit(sz, sizeof(TopicOffsetRows));
if (NULL == pReq->topics) return -1;
for (int32_t i = 0; i < sz; ++i) {
TopicOffsetRows* data = taosArrayReserve(pReq->topics, 1);
TopicOffsetRows *data = taosArrayReserve(pReq->topics, 1);
tDecodeCStrTo(&decoder, data->topicName);
int32_t szVgs = 0;
if (tDecodeI32(&decoder, &szVgs) < 0) return -1;
if(szVgs > 0){
if (szVgs > 0) {
data->offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows));
if (NULL == data->offsetRows) return -1;
for (int32_t j= 0; j < szVgs; ++j) {
OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1);
for (int32_t j = 0; j < szVgs; ++j) {
OffsetRows *offRows = taosArrayReserve(data->offsetRows, 1);
if (tDecodeI32(&decoder, &offRows->vgId) < 0) return -1;
if (tDecodeI64(&decoder, &offRows->rows) < 0) return -1;
if (tDecodeSTqOffsetVal(&decoder, &offRows->offset) < 0) return -1;
@ -5382,7 +5396,6 @@ int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
return 0;
}
int32_t tSerializeSMqSeekReq(void *buf, int32_t bufLen, SMqSeekReq *pReq) {
int32_t headLen = sizeof(SMsgHead);
if (buf != NULL) {
@ -5610,9 +5623,9 @@ int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
int32_t headLen = sizeof(SMsgHead);
// SMsgHead *pHead = buf;
// pHead->vgId = pReq->head.vgId;
// pHead->contLen = pReq->head.contLen;
// SMsgHead *pHead = buf;
// pHead->vgId = pReq->head.vgId;
// pHead->contLen = pReq->head.contLen;
SDecoder decoder = {0};
tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen);
@ -6983,7 +6996,7 @@ int32_t tDecodeSVAlterTbReq(SDecoder *pDecoder, SVAlterTbReq *pReq) {
return 0;
}
int32_t tDecodeSVAlterTbReqSetCtime(SDecoder* pDecoder, SVAlterTbReq* pReq, int64_t ctimeMs) {
int32_t tDecodeSVAlterTbReqSetCtime(SDecoder *pDecoder, SVAlterTbReq *pReq, int64_t ctimeMs) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeSVAlterTbReqCommon(pDecoder, pReq) < 0) return -1;
@ -7216,13 +7229,13 @@ int32_t tDecodeSTqOffset(SDecoder *pDecoder, STqOffset *pOffset) {
return 0;
}
int32_t tEncodeMqVgOffset(SEncoder* pEncoder, const SMqVgOffset* pOffset) {
int32_t tEncodeMqVgOffset(SEncoder *pEncoder, const SMqVgOffset *pOffset) {
if (tEncodeSTqOffset(pEncoder, &pOffset->offset) < 0) return -1;
if (tEncodeI64(pEncoder, pOffset->consumerId) < 0) return -1;
return 0;
}
int32_t tDecodeMqVgOffset(SDecoder* pDecoder, SMqVgOffset* pOffset) {
int32_t tDecodeMqVgOffset(SDecoder *pDecoder, SMqVgOffset *pOffset) {
if (tDecodeSTqOffset(pDecoder, &pOffset->offset) < 0) return -1;
if (tDecodeI64(pDecoder, &pOffset->consumerId) < 0) return -1;
return 0;
@ -7407,7 +7420,7 @@ int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) {
}
int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp) {
if (tDecodeMqDataRsp(pDecoder, (SMqDataRsp*)pRsp) < 0) return -1;
if (tDecodeMqDataRsp(pDecoder, (SMqDataRsp *)pRsp) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->createTableNum) < 0) return -1;
if (pRsp->createTableNum) {

View File

@ -77,7 +77,6 @@ static SClusterObj *mndAcquireCluster(SMnode *pMnode, void **ppIter) {
if (pIter == NULL) break;
*ppIter = pIter;
return pCluster;
}

View File

@ -1303,11 +1303,10 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) {
sdbRelease(pSdb, pVgroup);
if (pDb && (vindex >= pDb->cfg.numOfVgroups)) {
sdbCancelFetch(pSdb, pIter);
break;
}
}
sdbCancelFetch(pSdb, pIter);
}
int32_t mndExtractDbInfo(SMnode *pMnode, SDbObj *pDb, SUseDbRsp *pRsp, const SUseDbReq *pReq) {

View File

@ -706,6 +706,7 @@ _OVER:
} else {
mndReleaseDnode(pMnode, pDnode);
}
sdbCancelFetch(pSdb, pIter);
mndTransDrop(pTrans);
sdbFreeRaw(pRaw);
return terrno;

View File

@ -831,6 +831,7 @@ int32_t mndGetIdxsByTagName(SMnode *pMnode, SStbObj *pStb, char *tagName, SIdxOb
if (pIdx->stbUid == pStb->uid && strcasecmp(pIdx->colName, tagName) == 0) {
memcpy((char *)idx, (char *)pIdx, sizeof(SIdxObj));
sdbRelease(pSdb, pIdx);
sdbCancelFetch(pSdb, pIter);
return 0;
}
@ -851,7 +852,7 @@ int32_t mndDropIdxsByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
if (pIdx->dbUid == pDb->uid) {
if (mndSetDropIdxCommitLogs(pMnode, pTrans, pIdx) != 0) {
sdbRelease(pSdb, pIdx);
sdbCancelFetch(pSdb, pIdx);
sdbCancelFetch(pSdb, pIter);
return -1;
}
}

View File

@ -454,6 +454,7 @@ int32_t mndCreateQnodeList(SMnode *pMnode, SArray **pList, int32_t limit) {
sdbRelease(pSdb, pObj);
if (limit > 0 && numOfRows >= limit) {
sdbCancelFetch(pSdb, pIter);
break;
}
}

View File

@ -168,6 +168,7 @@ SSnodeObj* mndSchedFetchOneSnode(SMnode* pMnode) {
void* pIter = NULL;
// TODO random fetch
pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void**)&pObj);
sdbCancelFetch(pMnode->pSdb, pIter);
return pObj;
}
@ -197,6 +198,7 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) {
sdbRelease(pMnode->pSdb, pVgroup);
continue;
}
sdbCancelFetch(pMnode->pSdb, pIter);
return pVgroup;
}
return pVgroup;
@ -435,6 +437,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
sdbCancelFetch(pSdb, pIter);
return -1;
}
@ -444,6 +447,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
if (mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
sdbCancelFetch(pSdb, pIter);
return -1;
}
@ -453,6 +457,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
qDestroyQueryPlan(pPlan);
sdbCancelFetch(pSdb, pIter);
return -1;
}
}
@ -492,6 +497,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
if (code != TSDB_CODE_SUCCESS) {
qDestroyQueryPlan(pPlan);
sdbCancelFetch(pSdb, pIter);
return -1;
}
}

View File

@ -900,7 +900,6 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) {
SMsgHead *pHead = rpcMallocCont(contLen);
if (pHead == NULL) {
sdbCancelFetch(pSdb, pVgroup);
sdbRelease(pSdb, pVgroup);
continue;
}
@ -1240,6 +1239,7 @@ static int32_t mndCheckAlterColForTopic(SMnode *pMnode, const char *stbFullName,
terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC;
mError("topic:%s, create ast error", pTopic->name);
sdbRelease(pSdb, pTopic);
sdbCancelFetch(pSdb, pIter);
return -1;
}
@ -1260,6 +1260,7 @@ static int32_t mndCheckAlterColForTopic(SMnode *pMnode, const char *stbFullName,
mError("topic:%s, check colId:%d conflicted", pTopic->name, pCol->colId);
nodesDestroyNode(pAst);
nodesDestroyList(pNodeList);
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pTopic);
return -1;
}
@ -1287,6 +1288,7 @@ static int32_t mndCheckAlterColForStream(SMnode *pMnode, const char *stbFullName
terrno = TSDB_CODE_MND_INVALID_STREAM_OPTION;
mError("stream:%s, create ast error", pStream->name);
sdbRelease(pSdb, pStream);
sdbCancelFetch(pSdb, pIter);
return -1;
}
@ -1306,6 +1308,7 @@ static int32_t mndCheckAlterColForStream(SMnode *pMnode, const char *stbFullName
nodesDestroyNode(pAst);
nodesDestroyList(pNodeList);
sdbRelease(pSdb, pStream);
sdbCancelFetch(pSdb, pIter);
return -1;
}
mInfo("stream:%s, check colId:%d passed", pStream->name, pCol->colId);
@ -1335,6 +1338,7 @@ static int32_t mndCheckAlterColForTSma(SMnode *pMnode, const char *stbFullName,
terrno = TSDB_CODE_SDB_INVALID_DATA_CONTENT;
mError("tsma:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d failed since parse AST err",
pSma->name, stbFullName, suid, colId);
sdbCancelFetch(pSdb, pIter);
return -1;
}
@ -1355,6 +1359,7 @@ static int32_t mndCheckAlterColForTSma(SMnode *pMnode, const char *stbFullName,
nodesDestroyNode(pAst);
nodesDestroyList(pNodeList);
sdbRelease(pSdb, pSma);
sdbCancelFetch(pSdb, pIter);
return -1;
}
mInfo("tsma:%s, check colId:%d passed", pSma->name, pCol->colId);
@ -2268,6 +2273,7 @@ static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName,
if (pTopic->subType == TOPIC_SUB_TYPE__TABLE) {
if (pTopic->stbUid == suid) {
sdbRelease(pSdb, pTopic);
sdbCancelFetch(pSdb, pIter);
return -1;
}
}
@ -2282,6 +2288,7 @@ static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName,
terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION;
mError("topic:%s, create ast error", pTopic->name);
sdbRelease(pSdb, pTopic);
sdbCancelFetch(pSdb, pIter);
return -1;
}
@ -2295,6 +2302,7 @@ static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName,
sdbRelease(pSdb, pTopic);
nodesDestroyNode(pAst);
nodesDestroyList(pNodeList);
sdbCancelFetch(pSdb, pIter);
return -1;
} else {
goto NEXT;
@ -2322,6 +2330,7 @@ static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName,
}
if (pStream->targetStbUid == suid) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pStream);
return -1;
}
@ -2330,6 +2339,7 @@ static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName,
if (nodesStringToNode(pStream->ast, &pAst) != 0) {
terrno = TSDB_CODE_MND_INVALID_STREAM_OPTION;
mError("stream:%s, create ast error", pStream->name);
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pStream);
return -1;
}
@ -2341,6 +2351,7 @@ static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName,
SColumnNode *pCol = (SColumnNode *)pNode;
if (pCol->tableId == suid) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pStream);
nodesDestroyNode(pAst);
nodesDestroyList(pNodeList);

View File

@ -705,12 +705,14 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
if (numOfStream > MND_STREAM_MAX_NUM) {
mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM);
terrno = TSDB_CODE_MND_TOO_MANY_STREAMS;
sdbCancelFetch(pMnode->pSdb, pIter);
goto _OVER;
}
if (pStream->targetStbUid == streamObj.targetStbUid) {
mError("Cannot write the same stable as other stream:%s", pStream->name);
terrno = TSDB_CODE_MND_INVALID_TARGET_TABLE;
sdbCancelFetch(pMnode->pSdb, pIter);
goto _OVER;
}
}

View File

@ -1104,6 +1104,7 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName)
if (taosHashGetSize(pSub->consumerHash) != 0) {
sdbRelease(pSdb, pSub);
terrno = TSDB_CODE_MND_IN_REBALANCE;
sdbCancelFetch(pSdb, pIter);
return -1;
}
int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
@ -1122,12 +1123,14 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName)
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
sdbRelease(pSdb, pSub);
sdbCancelFetch(pSdb, pIter);
return -1;
}
}
if (mndSetDropSubRedoLogs(pMnode, pTrans, pSub) < 0) {
sdbRelease(pSdb, pSub);
sdbCancelFetch(pSdb, pIter);
goto END;
}

View File

@ -513,6 +513,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
tEncodeSize(tEncodeSTqCheckInfo, &info, len, code);
if (code < 0) {
sdbRelease(pSdb, pVgroup);
sdbCancelFetch(pSdb, pIter);
goto _OUT;
}
void *buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
@ -522,6 +523,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
if (tEncodeSTqCheckInfo(&encoder, &info) < 0) {
taosMemoryFree(buf);
sdbRelease(pSdb, pVgroup);
sdbCancelFetch(pSdb, pIter);
goto _OUT;
}
tEncoderClear(&encoder);
@ -535,6 +537,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(buf);
sdbRelease(pSdb, pVgroup);
sdbCancelFetch(pSdb, pIter);
goto _OUT;
}
buf = NULL;
@ -647,7 +650,6 @@ static int32_t mndDropTopic(SMnode *pMnode, STrans *pTrans, SRpcMsg *pReq, SMqTo
code = 0;
_OVER:
mndTransDrop(pTrans);
return code;
}
@ -698,6 +700,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
if (strcmp(name, pTopic->name) == 0) {
mndReleaseConsumer(pMnode, pConsumer);
mndReleaseTopic(pMnode, pTopic);
sdbCancelFetch(pSdb, pIter);
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
dropReq.name, pConsumer->consumerId, pConsumer->cgroup);
@ -711,6 +714,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
if (strcmp(name, pTopic->name) == 0) {
mndReleaseConsumer(pMnode, pConsumer);
mndReleaseTopic(pMnode, pTopic);
sdbCancelFetch(pSdb, pIter);
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)",
dropReq.name, pConsumer->consumerId, pConsumer->cgroup);
@ -724,6 +728,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
if (strcmp(name, pTopic->name) == 0) {
mndReleaseConsumer(pMnode, pConsumer);
mndReleaseTopic(pMnode, pTopic);
sdbCancelFetch(pSdb, pIter);
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb remove)",
dropReq.name, pConsumer->consumerId, pConsumer->cgroup);
@ -735,6 +740,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
}
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pTopic->db) != 0) {
mndReleaseTopic(pMnode, pTopic);
return -1;
}
@ -788,6 +794,8 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(buf);
sdbRelease(pSdb, pVgroup);
mndReleaseTopic(pMnode, pTopic);
sdbCancelFetch(pSdb, pIter);
mndTransDrop(pTrans);
return -1;
}
@ -796,6 +804,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
int32_t code = mndDropTopic(pMnode, pTrans, pReq, pTopic);
mndReleaseTopic(pMnode, pTopic);
mndTransDrop(pTrans);
if (code != 0) {
mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
@ -999,6 +1008,7 @@ bool mndTopicExistsForDb(SMnode *pMnode, SDbObj *pDb) {
if (pTopic->dbUid == pDb->uid) {
sdbRelease(pSdb, pTopic);
sdbCancelFetch(pSdb, pIter);
return true;
}

View File

@ -922,19 +922,19 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) {
}
}
if (alterReq.alterType == TSDB_ALTER_USER_ADD_READ_TABLE) {
if (alterReq.alterType == TSDB_ALTER_USER_ADD_READ_TABLE || alterReq.alterType == TSDB_ALTER_USER_ADD_ALL_TABLE) {
if (mndTablePriviledge(pMnode, newUser.readTbs, newUser.useDbs, &alterReq, pSdb) != 0) goto _OVER;
}
if (alterReq.alterType == TSDB_ALTER_USER_ADD_WRITE_TABLE) {
if (alterReq.alterType == TSDB_ALTER_USER_ADD_WRITE_TABLE || alterReq.alterType == TSDB_ALTER_USER_ADD_ALL_TABLE) {
if (mndTablePriviledge(pMnode, newUser.writeTbs, newUser.useDbs, &alterReq, pSdb) != 0) goto _OVER;
}
if (alterReq.alterType == TSDB_ALTER_USER_REMOVE_READ_TABLE) {
if (alterReq.alterType == TSDB_ALTER_USER_REMOVE_READ_TABLE || alterReq.alterType == TSDB_ALTER_USER_REMOVE_ALL_TABLE) {
if (mndRemoveTablePriviledge(pMnode, newUser.readTbs, newUser.useDbs, &alterReq, pSdb) != 0) goto _OVER;
}
if (alterReq.alterType == TSDB_ALTER_USER_REMOVE_WRITE_TABLE) {
if (alterReq.alterType == TSDB_ALTER_USER_REMOVE_WRITE_TABLE || alterReq.alterType == TSDB_ALTER_USER_REMOVE_ALL_TABLE) {
if (mndRemoveTablePriviledge(pMnode, newUser.writeTbs, newUser.useDbs, &alterReq, pSdb) != 0) goto _OVER;
}
@ -1444,7 +1444,9 @@ int32_t mndUserRemoveDb(SMnode *pMnode, STrans *pTrans, char *db) {
if (pIter == NULL) break;
code = -1;
if (mndUserDupObj(pUser, &newUser) != 0) break;
if (mndUserDupObj(pUser, &newUser) != 0) {
break;
}
bool inRead = (taosHashGet(newUser.readDbs, db, len) != NULL);
bool inWrite = (taosHashGet(newUser.writeDbs, db, len) != NULL);
@ -1453,7 +1455,9 @@ int32_t mndUserRemoveDb(SMnode *pMnode, STrans *pTrans, char *db) {
(void)taosHashRemove(newUser.writeDbs, db, len);
SSdbRaw *pCommitRaw = mndUserActionEncode(&newUser);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) break;
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
break;
}
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
}
@ -1491,7 +1495,9 @@ int32_t mndUserRemoveTopic(SMnode *pMnode, STrans *pTrans, char *topic) {
if (inTopic) {
(void)taosHashRemove(newUser.topics, topic, len);
SSdbRaw *pCommitRaw = mndUserActionEncode(&newUser);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) break;
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
break;
}
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
}

View File

@ -2591,6 +2591,7 @@ static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) {
pIter = sdbFetch(pMnode->pSdb, SDB_DNODE, pIter, (void **)&pDnode);
if (pIter == NULL) break;
if (!mndIsDnodeOnline(pDnode, curMs)) {
sdbCancelFetch(pMnode->pSdb, pIter);
terrno = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
mError("failed to balance vgroup since %s, dnode:%d", terrstr(), pDnode->id);
sdbRelease(pMnode->pSdb, pDnode);

View File

@ -38,6 +38,8 @@ typedef struct STtlManger {
SHashObj* pTtlCache; // key: tuid, value: {ttl, ctime}
SHashObj* pDirtyUids; // dirty tuid
TTB* pTtlIdx; // btree<{deleteTime, tuid}, ttl>
char* logPrefix;
} STtlManger;
typedef struct {
@ -77,9 +79,10 @@ typedef struct {
typedef struct {
tb_uid_t uid;
TXN* pTxn;
int64_t ttlDays;
} STtlDelTtlCtx;
int ttlMgrOpen(STtlManger** ppTtlMgr, TDB* pEnv, int8_t rollback);
int ttlMgrOpen(STtlManger** ppTtlMgr, TDB* pEnv, int8_t rollback, const char* logPrefix);
void ttlMgrClose(STtlManger* pTtlMgr);
int ttlMgrPostOpen(STtlManger* pTtlMgr, void* pMeta);

View File

@ -130,7 +130,9 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
}
// open pTtlMgr ("ttlv1.idx")
ret = ttlMgrOpen(&pMeta->pTtlMgr, pMeta->pEnv, 0);
char logPrefix[128] = {0};
sprintf(logPrefix, "vgId:%d", TD_VID(pVnode));
ret = ttlMgrOpen(&pMeta->pTtlMgr, pMeta->pEnv, 0, logPrefix);
if (ret < 0) {
metaError("vgId:%d, failed to open meta ttl index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;

View File

@ -455,7 +455,7 @@ int metaAddIndexToSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
}
}
if (diffIdx == -1 && diffIdx == 0) {
if (diffIdx == -1 || diffIdx == 0) {
goto _err;
}
@ -974,7 +974,15 @@ static int metaBuildNColIdxKey(SNcolIdxKey *ncolKey, const SMetaEntry *pME) {
}
static int metaDeleteTtl(SMeta *pMeta, const SMetaEntry *pME) {
if (pME->type != TSDB_CHILD_TABLE && pME->type != TSDB_NORMAL_TABLE) return 0;
STtlDelTtlCtx ctx = {.uid = pME->uid, .pTxn = pMeta->txn};
if (pME->type == TSDB_CHILD_TABLE) {
ctx.ttlDays = pME->ctbEntry.ttlDays;
} else {
ctx.ttlDays = pME->ntbEntry.ttlDays;
}
return ttlMgrDeleteTtl(pMeta->pTtlMgr, &ctx);
}
@ -1654,10 +1662,11 @@ static int metaAddTagIndex(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTb
if (ret < 0) {
terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST;
return -1;
} else {
uid = *(tb_uid_t *)pVal;
tdbFree(pVal);
pVal = NULL;
}
uid = *(tb_uid_t *)pVal;
tdbFree(pVal);
pVal = NULL;
if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(tb_uid_t), &pVal, &nVal) == -1) {
ret = -1;
@ -1736,12 +1745,16 @@ static int metaAddTagIndex(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTb
nTagData = tDataTypes[pCol->type].bytes;
}
if (metaCreateTagIdxKey(suid, pCol->colId, pTagData, nTagData, pCol->type, uid, &pTagIdxKey, &nTagIdxKey) < 0) {
tdbFree(pKey);
tdbFree(pVal);
metaDestroyTagIdxKey(pTagIdxKey);
tdbTbcClose(pCtbIdxc);
goto _err;
}
tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, pMeta->txn);
metaDestroyTagIdxKey(pTagIdxKey);
}
tdbTbcClose(pCtbIdxc);
return 0;
_err:
@ -1968,7 +1981,6 @@ static int metaUpdateTtl(SMeta *pMeta, const SMetaEntry *pME) {
if (pME->type != TSDB_CHILD_TABLE && pME->type != TSDB_NORMAL_TABLE) return 0;
STtlUpdTtlCtx ctx = {.uid = pME->uid};
if (pME->type == TSDB_CHILD_TABLE) {
ctx.ttlDays = pME->ctbEntry.ttlDays;
ctx.changeTimeMs = pME->ctbEntry.btime;

View File

@ -39,8 +39,8 @@ static int32_t ttlMgrULock(STtlManger *pTtlMgr);
const char *ttlTbname = "ttl.idx";
const char *ttlV1Tbname = "ttlv1.idx";
int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) {
int ret = TSDB_CODE_SUCCESS;
int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback, const char *logPrefix) {
int ret = TSDB_CODE_SUCCESS;
int64_t startNs = taosGetTimestampNs();
*ppTtlMgr = NULL;
@ -48,9 +48,17 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) {
STtlManger *pTtlMgr = (STtlManger *)tdbOsCalloc(1, sizeof(*pTtlMgr));
if (pTtlMgr == NULL) return TSDB_CODE_OUT_OF_MEMORY;
char *logBuffer = (char *)tdbOsCalloc(1, strlen(logPrefix) + 1);
if (logBuffer == NULL) {
tdbOsFree(pTtlMgr);
return TSDB_CODE_OUT_OF_MEMORY;
}
strcpy(logBuffer, logPrefix);
pTtlMgr->logPrefix = logBuffer;
ret = tdbTbOpen(ttlV1Tbname, TDB_VARIANT_LEN, TDB_VARIANT_LEN, ttlIdxKeyV1Cmpr, pEnv, &pTtlMgr->pTtlIdx, rollback);
if (ret < 0) {
metaError("failed to open %s since %s", ttlV1Tbname, tstrerror(terrno));
metaError("%s, failed to open %s since %s", pTtlMgr->logPrefix, ttlV1Tbname, tstrerror(terrno));
tdbOsFree(pTtlMgr);
return ret;
}
@ -62,14 +70,14 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) {
ret = ttlMgrFillCache(pTtlMgr);
if (ret < 0) {
metaError("failed to fill hash since %s", tstrerror(terrno));
metaError("%s, failed to fill hash since %s", pTtlMgr->logPrefix, tstrerror(terrno));
ttlMgrCleanup(pTtlMgr);
return ret;
}
int64_t endNs = taosGetTimestampNs();
metaInfo("ttl mgr open end, hash size: %d, time consumed: %" PRId64 " ns", taosHashGetSize(pTtlMgr->pTtlCache),
endNs - startNs);
metaInfo("%s, ttl mgr open end, hash size: %d, time consumed: %" PRId64 " ns", pTtlMgr->logPrefix,
taosHashGetSize(pTtlMgr->pTtlCache), endNs - startNs);
*ppTtlMgr = pTtlMgr;
return TSDB_CODE_SUCCESS;
@ -91,37 +99,37 @@ int ttlMgrUpgrade(STtlManger *pTtlMgr, void *pMeta) {
if (!tdbTbExist(ttlTbname, meta->pEnv)) return TSDB_CODE_SUCCESS;
metaInfo("ttl mgr start upgrade");
metaInfo("%s, ttl mgr start upgrade", pTtlMgr->logPrefix);
int64_t startNs = taosGetTimestampNs();
ret = tdbTbOpen(ttlTbname, sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, meta->pEnv, &pTtlMgr->pOldTtlIdx, 0);
if (ret < 0) {
metaError("failed to open %s index since %s", ttlTbname, tstrerror(terrno));
metaError("%s, failed to open %s index since %s", pTtlMgr->logPrefix, ttlTbname, tstrerror(terrno));
goto _out;
}
ret = ttlMgrConvert(pTtlMgr->pOldTtlIdx, pTtlMgr->pTtlIdx, pMeta);
if (ret < 0) {
metaError("failed to convert ttl index since %s", tstrerror(terrno));
metaError("%s, failed to convert ttl index since %s", pTtlMgr->logPrefix, tstrerror(terrno));
goto _out;
}
ret = tdbTbDropByName(ttlTbname, meta->pEnv, meta->txn);
if (ret < 0) {
metaError("failed to drop old ttl index since %s", tstrerror(terrno));
metaError("%s, failed to drop old ttl index since %s", pTtlMgr->logPrefix, tstrerror(terrno));
goto _out;
}
ret = ttlMgrFillCache(pTtlMgr);
if (ret < 0) {
metaError("failed to fill hash since %s", tstrerror(terrno));
metaError("%s, failed to fill hash since %s", pTtlMgr->logPrefix, tstrerror(terrno));
goto _out;
}
int64_t endNs = taosGetTimestampNs();
metaInfo("ttl mgr upgrade end, hash size: %d, time consumed: %" PRId64 " ns", taosHashGetSize(pTtlMgr->pTtlCache),
endNs - startNs);
metaInfo("%s, ttl mgr upgrade end, hash size: %d, time consumed: %" PRId64 " ns", pTtlMgr->logPrefix,
taosHashGetSize(pTtlMgr->pTtlCache), endNs - startNs);
_out:
tdbTbClose(pTtlMgr->pOldTtlIdx);
pTtlMgr->pOldTtlIdx = NULL;
@ -130,11 +138,12 @@ _out:
}
static void ttlMgrCleanup(STtlManger *pTtlMgr) {
taosMemoryFree(pTtlMgr->logPrefix);
taosHashCleanup(pTtlMgr->pTtlCache);
taosHashCleanup(pTtlMgr->pDirtyUids);
tdbTbClose(pTtlMgr->pTtlIdx);
taosThreadRwlockDestroy(&pTtlMgr->lock);
tdbOsFree(pTtlMgr);
taosMemoryFree(pTtlMgr);
}
static void ttlMgrBuildKey(STtlIdxKeyV1 *pTtlKey, int64_t ttlDays, int64_t changeTimeMs, tb_uid_t uid) {
@ -250,13 +259,13 @@ int ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) {
int ret = taosHashPut(pTtlMgr->pTtlCache, &updCtx->uid, sizeof(updCtx->uid), &cacheEntry, sizeof(cacheEntry));
if (ret < 0) {
metaError("ttlMgr insert failed to update ttl cache since %s", tstrerror(terrno));
metaError("%s, ttlMgr insert failed to update ttl cache since %s", pTtlMgr->logPrefix, tstrerror(terrno));
goto _out;
}
ret = taosHashPut(pTtlMgr->pDirtyUids, &updCtx->uid, sizeof(updCtx->uid), &dirtryEntry, sizeof(dirtryEntry));
if (ret < 0) {
metaError("ttlMgr insert failed to update ttl dirty uids since %s", tstrerror(terrno));
metaError("%s, ttlMgr insert failed to update ttl dirty uids since %s", pTtlMgr->logPrefix, tstrerror(terrno));
goto _out;
}
@ -264,20 +273,21 @@ int ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) {
_out:
ttlMgrULock(pTtlMgr);
metaDebug("ttl mgr insert ttl, uid: %" PRId64 ", ctime: %" PRId64 ", ttlDays: %" PRId64, updCtx->uid,
updCtx->changeTimeMs, updCtx->ttlDays);
metaDebug("%s, ttl mgr insert ttl, uid: %" PRId64 ", ctime: %" PRId64 ", ttlDays: %" PRId64, pTtlMgr->logPrefix,
updCtx->uid, updCtx->changeTimeMs, updCtx->ttlDays);
return ret;
}
int ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) {
if (delCtx->ttlDays == 0) return 0;
ttlMgrWLock(pTtlMgr);
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_DEL};
int ret = taosHashPut(pTtlMgr->pDirtyUids, &delCtx->uid, sizeof(delCtx->uid), &dirtryEntry, sizeof(dirtryEntry));
if (ret < 0) {
metaError("ttlMgr del failed to update ttl dirty uids since %s", tstrerror(terrno));
metaError("%s, ttlMgr del failed to update ttl dirty uids since %s", pTtlMgr->logPrefix, tstrerror(terrno));
goto _out;
}
@ -285,7 +295,7 @@ int ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) {
_out:
ttlMgrULock(pTtlMgr);
metaDebug("ttl mgr delete ttl, uid: %" PRId64, delCtx->uid);
metaDebug("%s, ttl mgr delete ttl, uid: %" PRId64, pTtlMgr->logPrefix, delCtx->uid);
return ret;
}
@ -293,6 +303,8 @@ _out:
int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtimeCtx) {
ttlMgrWLock(pTtlMgr);
int ret = 0;
STtlCacheEntry *oldData = taosHashGet(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid));
if (oldData == NULL) {
goto _out;
@ -301,17 +313,17 @@ int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtime
STtlCacheEntry cacheEntry = {.ttlDays = oldData->ttlDays, .changeTimeMs = pUpdCtimeCtx->changeTimeMs};
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_UPSERT};
int ret =
taosHashPut(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &cacheEntry, sizeof(cacheEntry));
ret = taosHashPut(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &cacheEntry, sizeof(cacheEntry));
if (ret < 0) {
metaError("ttlMgr update ctime failed to update ttl cache since %s", tstrerror(terrno));
metaError("%s, ttlMgr update ctime failed to update ttl cache since %s", pTtlMgr->logPrefix, tstrerror(terrno));
goto _out;
}
ret = taosHashPut(pTtlMgr->pDirtyUids, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &dirtryEntry,
sizeof(dirtryEntry));
if (ret < 0) {
metaError("ttlMgr update ctime failed to update ttl dirty uids since %s", tstrerror(terrno));
metaError("%s, ttlMgr update ctime failed to update ttl dirty uids since %s", pTtlMgr->logPrefix,
tstrerror(terrno));
goto _out;
}
@ -319,7 +331,8 @@ int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtime
_out:
ttlMgrULock(pTtlMgr);
metaDebug("ttl mgr update ctime, uid: %" PRId64 ", ctime: %" PRId64, pUpdCtimeCtx->uid, pUpdCtimeCtx->changeTimeMs);
metaDebug("%s, ttl mgr update ctime, uid: %" PRId64 ", ctime: %" PRId64, pTtlMgr->logPrefix, pUpdCtimeCtx->uid,
pUpdCtimeCtx->changeTimeMs);
return ret;
}
@ -366,7 +379,7 @@ _out:
int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
ttlMgrWLock(pTtlMgr);
metaInfo("ttl mgr flush start.");
metaInfo("%s, ttl mgr flush start. dirty uids:%d", pTtlMgr->logPrefix, taosHashGetSize(pTtlMgr->pDirtyUids));
int ret = -1;
@ -377,9 +390,9 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
STtlCacheEntry *cacheEntry = taosHashGet(pTtlMgr->pTtlCache, pUid, sizeof(*pUid));
if (cacheEntry == NULL) {
metaError("ttlMgr flush failed to get ttl cache since %s, uid: %" PRId64 ", type: %d", tstrerror(terrno), *pUid,
pEntry->type);
goto _out;
metaError("%s, ttlMgr flush failed to get ttl cache since %s, uid: %" PRId64 ", type: %d", pTtlMgr->logPrefix,
tstrerror(terrno), *pUid, pEntry->type);
continue;
}
STtlIdxKeyV1 ttlKey;
@ -389,27 +402,29 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
ret = tdbTbUpsert(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), &cacheEntry->ttlDays, sizeof(cacheEntry->ttlDays),
pTxn);
if (ret < 0) {
metaError("ttlMgr flush failed to flush ttl cache upsert since %s", tstrerror(terrno));
metaError("%s, ttlMgr flush failed to flush ttl cache upsert since %s", pTtlMgr->logPrefix, tstrerror(terrno));
goto _out;
}
} else if (pEntry->type == ENTRY_TYPE_DEL) {
ret = tdbTbDelete(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), pTxn);
if (ret < 0) {
metaError("ttlMgr flush failed to flush ttl cache del since %s", tstrerror(terrno));
metaError("%s, ttlMgr flush failed to flush ttl cache del since %s", pTtlMgr->logPrefix, tstrerror(terrno));
goto _out;
}
ret = taosHashRemove(pTtlMgr->pTtlCache, pUid, sizeof(*pUid));
if (ret < 0) {
metaError("ttlMgr flush failed to delete ttl cache since %s", tstrerror(terrno));
metaError("%s, ttlMgr flush failed to delete ttl cache since %s", pTtlMgr->logPrefix, tstrerror(terrno));
goto _out;
}
} else {
metaError("ttlMgr flush failed to flush ttl cache, unknown type: %d", pEntry->type);
metaError("%s, ttlMgr flush failed to flush ttl cache, unknown type: %d", pTtlMgr->logPrefix, pEntry->type);
goto _out;
}
pIter = taosHashIterate(pTtlMgr->pDirtyUids, pIter);
void *pIterTmp = pIter;
pIter = taosHashIterate(pTtlMgr->pDirtyUids, pIterTmp);
taosHashRemove(pTtlMgr->pDirtyUids, pUid, sizeof(tb_uid_t));
}
taosHashClear(pTtlMgr->pDirtyUids);
@ -418,7 +433,7 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
_out:
ttlMgrULock(pTtlMgr);
metaInfo("ttl mgr flush end.");
metaInfo("%s, ttl mgr flush end.", pTtlMgr->logPrefix);
return ret;
}
@ -426,7 +441,7 @@ _out:
static int32_t ttlMgrRLock(STtlManger *pTtlMgr) {
int32_t ret = 0;
metaTrace("ttlMgr rlock %p", &pTtlMgr->lock);
metaTrace("%s, ttlMgr rlock %p", pTtlMgr->logPrefix, &pTtlMgr->lock);
ret = taosThreadRwlockRdlock(&pTtlMgr->lock);
@ -436,7 +451,7 @@ static int32_t ttlMgrRLock(STtlManger *pTtlMgr) {
static int32_t ttlMgrWLock(STtlManger *pTtlMgr) {
int32_t ret = 0;
metaTrace("ttlMgr wlock %p", &pTtlMgr->lock);
metaTrace("%s, ttlMgr wlock %p", pTtlMgr->logPrefix, &pTtlMgr->lock);
ret = taosThreadRwlockWrlock(&pTtlMgr->lock);
@ -446,7 +461,7 @@ static int32_t ttlMgrWLock(STtlManger *pTtlMgr) {
static int32_t ttlMgrULock(STtlManger *pTtlMgr) {
int32_t ret = 0;
metaTrace("ttlMgr ulock %p", &pTtlMgr->lock);
metaTrace("%s, ttlMgr ulock %p", pTtlMgr->logPrefix, &pTtlMgr->lock);
ret = taosThreadRwlockUnlock(&pTtlMgr->lock);

View File

@ -4350,6 +4350,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
cleanupAfterGroupResultGen(pMiaInfo, pRes);
doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
}
setOperatorCompleted(pOperator);
@ -4370,6 +4371,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
pMiaInfo->prefetchedBlock = pBlock;
cleanupAfterGroupResultGen(pMiaInfo, pRes);
doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
break;
} else {
// continue

View File

@ -3409,7 +3409,8 @@ static int32_t checkIntervalWindow(STranslateContext* pCxt, SIntervalWindowNode*
if (IS_CALENDAR_TIME_DURATION(pSliding->unit)) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_SLIDING_UNIT);
}
if ((pSliding->datum.i < convertTimePrecision(tsMinSlidingTime, TSDB_TIME_PRECISION_MILLI, precision)) ||
if ((pSliding->datum.i <
convertTimeFromPrecisionToUnit(tsMinSlidingTime, TSDB_TIME_PRECISION_MILLI, pSliding->unit)) ||
(pInter->datum.i / pSliding->datum.i > INTERVAL_SLIDING_FACTOR)) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_SLIDING_TOO_SMALL);
}
@ -6134,9 +6135,7 @@ static int32_t translateCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* p
static int32_t translateDropTopic(STranslateContext* pCxt, SDropTopicStmt* pStmt) {
SMDropTopicReq dropReq = {0};
SName name;
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->topicName, strlen(pStmt->topicName));
tNameGetFullDbName(&name, dropReq.name);
snprintf(dropReq.name, sizeof(dropReq.name), "%d.%s", pCxt->pParseCxt->acctId, pStmt->topicName);
dropReq.igNotExists = pStmt->ignoreNotExists;
return buildCmdMsg(pCxt, TDMT_MND_TMQ_DROP_TOPIC, (FSerializeFunc)tSerializeSMDropTopicReq, &dropReq);

View File

@ -1034,7 +1034,6 @@ static int32_t createSortLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
pSort->node.resultDataOrder = isPrimaryKeySort(pSelect->pOrderByList)
? (pSort->groupSort ? DATA_ORDER_LEVEL_IN_GROUP : DATA_ORDER_LEVEL_GLOBAL)
: DATA_ORDER_LEVEL_NONE;
int32_t code = nodesCollectColumns(pSelect, SQL_CLAUSE_ORDER_BY, NULL, COLLECT_COL_TYPE_ALL, &pSort->node.pTargets);
if (TSDB_CODE_SUCCESS == code && NULL == pSort->node.pTargets) {
code = nodesListMakeStrictAppend(&pSort->node.pTargets,
@ -1048,6 +1047,7 @@ static int32_t createSortLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
}
SNode* pNode = NULL;
SOrderByExprNode* firstSortKey = (SOrderByExprNode*)nodesListGetNode(pSort->pSortKeys, 0);
if (isPrimaryKeySort(pSelect->pOrderByList)) pSort->node.outputTsOrder = firstSortKey->order;
if (firstSortKey->pExpr->type == QUERY_NODE_COLUMN) {
SColumnNode* pCol = (SColumnNode*)firstSortKey->pExpr;
int16_t projIdx = 1;

View File

@ -1168,7 +1168,8 @@ static bool sortPriKeyOptMayBeOptimized(SLogicNode* pNode) {
return false;
}
SSortLogicNode* pSort = (SSortLogicNode*)pNode;
if (!sortPriKeyOptIsPriKeyOrderBy(pSort->pSortKeys) || 1 != LIST_LENGTH(pSort->node.pChildren)) {
if (pSort->skipPKSortOpt || !sortPriKeyOptIsPriKeyOrderBy(pSort->pSortKeys) ||
1 != LIST_LENGTH(pSort->node.pChildren)) {
return false;
}
SNode* pChild;
@ -1181,8 +1182,8 @@ static bool sortPriKeyOptMayBeOptimized(SLogicNode* pNode) {
return true;
}
static int32_t sortPriKeyOptGetSequencingNodesImpl(SLogicNode* pNode, bool groupSort, bool* pNotOptimize,
SNodeList** pSequencingNodes) {
static int32_t sortPriKeyOptGetSequencingNodesImpl(SLogicNode* pNode, bool groupSort, EOrder sortOrder,
bool* pNotOptimize, SNodeList** pSequencingNodes) {
if (NULL != pNode->pLimit || NULL != pNode->pSlimit) {
*pNotOptimize = false;
return TSDB_CODE_SUCCESS;
@ -1199,15 +1200,21 @@ static int32_t sortPriKeyOptGetSequencingNodesImpl(SLogicNode* pNode, bool group
}
case QUERY_NODE_LOGIC_PLAN_JOIN: {
int32_t code = sortPriKeyOptGetSequencingNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 0), groupSort,
pNotOptimize, pSequencingNodes);
sortOrder, pNotOptimize, pSequencingNodes);
if (TSDB_CODE_SUCCESS == code) {
code = sortPriKeyOptGetSequencingNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 1), groupSort,
pNotOptimize, pSequencingNodes);
sortOrder, pNotOptimize, pSequencingNodes);
}
return code;
}
case QUERY_NODE_LOGIC_PLAN_WINDOW:
return nodesListMakeAppend(pSequencingNodes, (SNode*)pNode);
case QUERY_NODE_LOGIC_PLAN_WINDOW: {
SWindowLogicNode* pWindowLogicNode = (SWindowLogicNode*)pNode;
// For interval window, we always apply sortPriKey optimization.
// For session/event/state window, the output ts order will always be ASC.
// If sort order is also asc, we apply optimization, otherwise we keep sort node to get correct output order.
if (pWindowLogicNode->winType == WINDOW_TYPE_INTERVAL || sortOrder == ORDER_ASC)
return nodesListMakeAppend(pSequencingNodes, (SNode*)pNode);
}
case QUERY_NODE_LOGIC_PLAN_AGG:
case QUERY_NODE_LOGIC_PLAN_PARTITION:
*pNotOptimize = true;
@ -1221,23 +1228,25 @@ static int32_t sortPriKeyOptGetSequencingNodesImpl(SLogicNode* pNode, bool group
return TSDB_CODE_SUCCESS;
}
return sortPriKeyOptGetSequencingNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 0), groupSort,
return sortPriKeyOptGetSequencingNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 0), groupSort, sortOrder,
pNotOptimize, pSequencingNodes);
}
static int32_t sortPriKeyOptGetSequencingNodes(SLogicNode* pNode, bool groupSort, SNodeList** pSequencingNodes) {
bool notOptimize = false;
int32_t code = sortPriKeyOptGetSequencingNodesImpl(pNode, groupSort, &notOptimize, pSequencingNodes);
if (TSDB_CODE_SUCCESS != code || notOptimize) {
NODES_CLEAR_LIST(*pSequencingNodes);
}
return code;
}
static EOrder sortPriKeyOptGetPriKeyOrder(SSortLogicNode* pSort) {
return ((SOrderByExprNode*)nodesListGetNode(pSort->pSortKeys, 0))->order;
}
static int32_t sortPriKeyOptGetSequencingNodes(SSortLogicNode* pSort, bool groupSort, SNodeList** pSequencingNodes) {
bool notOptimize = false;
int32_t code =
sortPriKeyOptGetSequencingNodesImpl((SLogicNode*)nodesListGetNode(pSort->node.pChildren, 0), groupSort,
sortPriKeyOptGetPriKeyOrder(pSort), &notOptimize, pSequencingNodes);
if (TSDB_CODE_SUCCESS != code || notOptimize) {
NODES_CLEAR_LIST(*pSequencingNodes);
}
return code;
}
static int32_t sortPriKeyOptApply(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SSortLogicNode* pSort,
SNodeList* pSequencingNodes) {
EOrder order = sortPriKeyOptGetPriKeyOrder(pSort);
@ -1276,10 +1285,17 @@ static int32_t sortPriKeyOptApply(SOptimizeContext* pCxt, SLogicSubplan* pLogicS
static int32_t sortPrimaryKeyOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SSortLogicNode* pSort) {
SNodeList* pSequencingNodes = NULL;
int32_t code = sortPriKeyOptGetSequencingNodes((SLogicNode*)nodesListGetNode(pSort->node.pChildren, 0),
pSort->groupSort, &pSequencingNodes);
if (TSDB_CODE_SUCCESS == code && NULL != pSequencingNodes) {
code = sortPriKeyOptApply(pCxt, pLogicSubplan, pSort, pSequencingNodes);
int32_t code = sortPriKeyOptGetSequencingNodes(pSort, pSort->groupSort, &pSequencingNodes);
if (TSDB_CODE_SUCCESS == code) {
if (pSequencingNodes != NULL) {
code = sortPriKeyOptApply(pCxt, pLogicSubplan, pSort, pSequencingNodes);
} else {
// if we decided not to push down sort info to children, we should propagate output ts order to parents of pSort
optSetParentOrder(pSort->node.pParent, sortPriKeyOptGetPriKeyOrder(pSort), 0);
// we need to prevent this pSort from being chosen to do optimization again
pSort->skipPKSortOpt = true;
pCxt->optimized = true;
}
}
nodesClearList(pSequencingNodes);
return code;

View File

@ -190,8 +190,8 @@ void streamBackendCleanup(void* arg) {
taosThreadMutexDestroy(&pHandle->cfMutex);
taosMemoryFree(pHandle);
qDebug("destroy stream backend backend:%p", pHandle);
taosMemoryFree(pHandle);
return;
}
SListNode* streamBackendAddCompare(void* backend, void* arg) {
@ -704,8 +704,8 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
char suffix[64] = {0};
rocksdb_options_t** cfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*));
RocksdbCfParam* params = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam*));
rocksdb_comparator_t** pCompare = taosMemoryCalloc(nCf, sizeof(rocksdb_comparator_t**));
RocksdbCfParam* params = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam));
rocksdb_comparator_t** pCompare = taosMemoryCalloc(nCf, sizeof(rocksdb_comparator_t*));
rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
for (int i = 0; i < nCf; i++) {
@ -861,7 +861,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
param[i].tableOpt = tableOpt;
};
rocksdb_comparator_t** pCompare = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t**));
rocksdb_comparator_t** pCompare = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*));
for (int i = 0; i < cfLen; i++) {
SCfInit* cf = &ginitDict[i];
@ -1012,16 +1012,15 @@ bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len
}
rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName, rocksdb_snapshot_t** snapshot,
rocksdb_readoptions_t** readOpt) {
int idx = streamStateGetCfIdx(pState, cfName);
if (snapshot != NULL) {
*snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(pState->pTdbState->rocksdb);
}
int idx = streamStateGetCfIdx(pState, cfName);
rocksdb_readoptions_t* rOpt = rocksdb_readoptions_create();
*readOpt = rOpt;
rocksdb_readoptions_set_snapshot(rOpt, *snapshot);
rocksdb_readoptions_set_fill_cache(rOpt, 0);
if (snapshot != NULL) {
*snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(pState->pTdbState->rocksdb);
rocksdb_readoptions_set_snapshot(rOpt, *snapshot);
rocksdb_readoptions_set_fill_cache(rOpt, 0);
}
return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, rOpt,
((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[idx]);
@ -1049,8 +1048,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \
rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \
if (err != NULL) { \
taosMemoryFree(err); \
qError("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \
taosMemoryFree(err); \
code = -1; \
} else { \
qTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d", toString, funcname, vLen, ttlVLen); \
@ -1389,8 +1388,6 @@ int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* k
int code = 0;
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, value, vLen);
if (code == -1) {
}
return code;
}
int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
@ -1408,8 +1405,10 @@ int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, vo
code = -1;
} else {
*key = resKey;
*pVal = taosMemoryCalloc(1, *pVLen);
memcpy(*pVal, tmp, *pVLen);
if (pVal != NULL && pVLen != NULL) {
*pVal = taosMemoryCalloc(1, *pVLen);
memcpy(*pVal, tmp, *pVLen);
}
}
}
taosMemoryFree(tmp);

View File

@ -391,7 +391,13 @@ static void httpHandleReq(SHttpMsg* msg) {
// set up timeout to avoid stuck;
int32_t fd = taosCreateSocketWithTimeout(5);
int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd);
if (fd < 0) {
tError("http-report failed to open socket, dst:%s:%d", cli->addr, cli->port);
taosReleaseRef(httpRefMgt, httpRef);
destroyHttpClient(cli);
return;
}
int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd);
if (ret != 0) {
tError("http-report failed to open socket, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port);
taosReleaseRef(httpRefMgt, httpRef);

View File

@ -38,6 +38,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqClientConsLog.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqMaxGroupIds.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsumeDiscontinuousData.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqOffset.py
,,n,system-test,python3 ./test.py -f 7-tmq/tmqDropConsumer.py
@ -160,6 +161,7 @@
,,n,system-test,python3 ./test.py -f 0-others/udfpy_main.py
,,n,system-test,python3 ./test.py -N 3 -f 0-others/walRetention.py
,,n,system-test,python3 ./test.py -f 0-others/splitVGroup.py -N 5
,,n,system-test,python3 ./test.py -f 0-others/timeRangeWise.py -N 3
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_database.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_replica.py -N 3
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/influxdb_line_taosc_insert.py
@ -341,6 +343,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/smaBasic.py -N 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/smaTest.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/smaTest.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/sma_index.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/spread.py
@ -446,7 +449,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3
#,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3 -n 3
,,n,system-test,python3 ./test.py -f 6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py -N 6 -M 3
,,n,system-test,python ./test.py -f 6-cluster/5dnode3mnodeRoll.py -N 3 -C 1
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 -n 3
#,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5

View File

@ -545,6 +545,8 @@ class TDDnode:
def stoptaosd(self):
tdLog.debug("start to stop taosd on dnode: %d "% (self.index))
# print(self.asan,self.running,self.remoteIP,self.valgrind)
if self.asan:
stopCmd = "%s -s stop -n dnode%d" % (self.execPath, self.index)
tdLog.info("execute script: " + stopCmd)

View File

@ -440,8 +440,10 @@ class TDSql:
time.sleep(1)
continue
def execute(self, sql,queryTimes=30):
def execute(self, sql, queryTimes=30, show=False):
self.sql = sql
if show:
tdLog.info(sql)
i=1
while i <= queryTimes:
try:

File diff suppressed because it is too large Load Diff

View File

@ -98,3 +98,65 @@ select last(ts), c2 as d from d1 group by c2 order by c2 asc limit 9,1;
select last(ts) as ts, c2 as d from d1 group by c2 order by ts desc, c2 asc limit 10;
select last(ts) as ts, c2 as d from d1 group by c2 order by ts desc, c2 asc limit 2,8;
select last(ts) as ts, c2 as d from d1 group by c2 order by ts desc, c2 asc limit 9,1;
explain verbose true select _wstart, _wend, count(*) from meters event_window start with c2 > 0 end with c2 < 100\G;
explain verbose true select _wstart, _wend, count(*) from meters event_window start with c2 > 0 end with c2 < 100 order by _wstart desc\G;
explain verbose true select _wstart, _wend, count(*) from meters event_window start with c2 > 0 end with c2 < 100 order by _wstart asc\G;
explain verbose true select _wstart, _wend, count(*) from meters event_window start with c2 > 0 end with c2 < 100 order by _wend desc\G;
explain verbose true select _wstart, _wend, count(*) from meters event_window start with c2 > 0 end with c2 < 100 order by _wend asc\G;
select _wstart, _wend, count(*) from meters event_window start with c2 > 0 end with c2 < 100;
select _wstart, _wend, count(*) from meters event_window start with c2 > 0 end with c2 < 100 order by _wstart desc;
select _wstart, _wend, count(*) from meters event_window start with c2 > 0 end with c2 < 100 order by _wstart asc;
select _wstart, _wend, count(*) from meters event_window start with c2 > 0 end with c2 < 100 order by _wend desc;
select _wstart, _wend, count(*) from meters event_window start with c2 > 0 end with c2 < 100 order by _wend asc;
explain verbose true select _wstart, _wend, count(*) from meters session(ts, 1h)\G;
explain verbose true select _wstart, _wend, count(*) from meters session(ts, 1h) order by _wstart desc\G;
explain verbose true select _wstart, _wend, count(*) from meters session(ts, 1h) order by _wstart asc\G;
explain verbose true select _wstart, _wend, count(*) from meters session(ts, 1h) order by _wend desc\G;
explain verbose true select _wstart, _wend, count(*) from meters session(ts, 1h) order by _wend asc\G;
select _wstart, _wend, count(*) from meters session(ts, 1h);
select _wstart, _wend, count(*) from meters session(ts, 1h) order by _wstart desc;
select _wstart, _wend, count(*) from meters session(ts, 1h) order by _wstart asc;
select _wstart, _wend, count(*) from meters session(ts, 1h) order by _wend desc;
select _wstart, _wend, count(*) from meters session(ts, 1h) order by _wend asc;
explain verbose true select _wstart, _wend, count(*) from meters session(ts, 1h)\G;
explain verbose true select _wstart, _wend, count(*) from meters session(ts, 1h) order by _wstart desc\G;
explain verbose true select _wstart, _wend, count(*) from meters session(ts, 1h) order by _wstart asc\G;
explain verbose true select _wstart, _wend, count(*) from meters session(ts, 1h) order by _wend desc\G;
explain verbose true select _wstart, _wend, count(*) from meters session(ts, 1h) order by _wend asc\G;
select _wstart, _wend, count(*) from meters session(ts, 1h);
select _wstart, _wend, count(*) from meters session(ts, 1h) order by _wstart desc;
select _wstart, _wend, count(*) from meters session(ts, 1h) order by _wstart asc;
select _wstart, _wend, count(*) from meters session(ts, 1h) order by _wend desc;
select _wstart, _wend, count(*) from meters session(ts, 1h) order by _wend asc;
explain verbose true select _wstart, _wend, count(*), last(ts) from meters state_window(c2)\G;
explain verbose true select _wstart, _wend, count(*), last(ts) from meters state_window(c2) order by _wstart desc\G;
explain verbose true select _wstart, _wend, count(*), last(ts) from meters state_window(c2) order by _wstart asc\G;
explain verbose true select _wstart, _wend, count(*), last(ts) from meters state_window(c2) order by _wend desc\G;
explain verbose true select _wstart, _wend, count(*), last(ts) from meters state_window(c2) order by _wend asc\G;
select _wstart, _wend, count(*), last(ts) from meters state_window(c2);
select _wstart, _wend, count(*), last(ts) from meters state_window(c2) order by _wstart desc;
select _wstart, _wend, count(*), last(ts) from meters state_window(c2) order by _wstart asc;
select _wstart, _wend, count(*), last(ts) from meters state_window(c2) order by _wend desc;
select _wstart, _wend, count(*), last(ts) from meters state_window(c2) order by _wend asc;
explain verbose true select _wstart, _wend, count(*), last(ts) from meters state_window(c2) order by _wend asc, count(*) desc\G;
explain verbose true select _wstart, _wend, last(ts) from (select _wstart as ts, _wend, count(*), last(ts) from meters state_window(c2) order by _wend desc) interval(1h) order by _wstart desc\G;
explain verbose true select _wstart, _wend, last(ts) from (select _wstart as ts, _wend, count(*), last(ts) from meters state_window(c2) order by _wend asc) interval(1h) order by _wstart desc\G;

View File

@ -204,7 +204,6 @@ class TDTestCase:
tdsql.execute("insert into db.`ct4` using db.stb1 TAGS(4) values(now(),14);")
tdsql.query("select * from db.ct4")
tdsql.checkData(0,1,14)
print(1)
tdsql=tdCom.newTdSql()
tdsql.query("describe information_schema.ins_databases;")
qRows=tdsql.queryRows

View File

@ -0,0 +1,48 @@
import taos
import sys
import time
import socket
import os
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
class TDTestCase:
hostname = socket.gethostname()
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
#tdSql.init(conn.cursor())
tdSql.init(conn.cursor(), logSql) # output sql.txt file
def create_databases(self):
tdSql.execute("create database db_ms precision 'ms'")
tdSql.execute("create database db_us precision 'us'")
tdSql.execute("create database db_ns precision 'ns'")
def create_stables(self):
tdSql.execute("CREATE STABLE db_ms.`meters` (`ts` TIMESTAMP, `c0` INT, `c1` TINYINT, `c2` DOUBLE, `c3` VARCHAR(64), `c4` NCHAR(64)) TAGS (`cc` VARCHAR(16))")
tdSql.execute("CREATE STABLE db_us.`meters` (`ts` TIMESTAMP, `c0` INT, `c1` TINYINT, `c2` DOUBLE, `c3` VARCHAR(64), `c4` NCHAR(64)) TAGS (`cc` VARCHAR(16))")
tdSql.execute("CREATE STABLE db_ns.`meters` (`ts` TIMESTAMP, `c0` INT, `c1` TINYINT, `c2` DOUBLE, `c3` VARCHAR(64), `c4` NCHAR(64)) TAGS (`cc` VARCHAR(16))")
def create_sma_index(self):
tdSql.execute("create sma index sma_index_ms on db_ms.meters function(max(c1), max(c2), min(c1)) interval(6m, 10s) sliding(6m)" )
tdSql.execute("create sma index sma_index_us on db_us.meters function(max(c1), max(c2), min(c1)) interval(6m, 10s) sliding(6m)" )
tdSql.execute("create sma index sma_index_ns on db_ns.meters function(max(c1), max(c2), min(c1)) interval(6m, 10s) sliding(6m)" )
def run(self):
tdSql.prepare()
self.create_databases()
self.create_stables()
self.create_sma_index()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -0,0 +1,309 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import random
import time
import copy
import string
import taos
from util.log import *
from util.cases import *
from util.sql import *
class TDTestCase:
# random string
def random_string(self, count):
letters = string.ascii_letters
return ''.join(random.choice(letters) for i in range(count))
# get col value and total max min ...
def getColsValue(self, i, j):
# c1 value
if random.randint(1, 10) == 5:
c1 = None
else:
c1 = 1
# c2 value
if j % 3200 == 0:
c2 = 8764231
elif random.randint(1, 10) == 5:
c2 = None
else:
c2 = random.randint(-87654297, 98765321)
value = f"({self.ts}, "
# c1
if c1 is None:
value += "null,"
else:
self.c1Cnt += 1
value += f"{c1},"
# c2
if c2 is None:
value += "null,"
else:
value += f"{c2},"
# total count
self.c2Cnt += 1
# max
if self.c2Max is None:
self.c2Max = c2
else:
if c2 > self.c2Max:
self.c2Max = c2
# min
if self.c2Min is None:
self.c2Min = c2
else:
if c2 < self.c2Min:
self.c2Min = c2
# sum
if self.c2Sum is None:
self.c2Sum = c2
else:
self.c2Sum += c2
# c3 same with ts
value += f"{self.ts})"
# move next 1s interval
self.ts += 1
return value
# insert data
def insertData(self):
tdLog.info("insert data ....")
sqls = ""
for i in range(self.childCnt):
# insert child table
values = ""
pre_insert = f"insert into @db_name.t{i} values "
for j in range(self.childRow):
if values == "":
values = self.getColsValue(i, j)
else:
values += "," + self.getColsValue(i, j)
# batch insert
if j % self.batchSize == 0 and values != "":
sql = pre_insert + values
self.exeDouble(sql)
values = ""
# append last
if values != "":
sql = pre_insert + values
self.exeDouble(sql)
values = ""
# insert finished
tdLog.info(f"insert data successfully.\n"
f" inserted child table = {self.childCnt}\n"
f" inserted child rows = {self.childRow}\n"
f" total inserted rows = {self.childCnt*self.childRow}\n")
return
def exeDouble(self, sql):
# dbname replace
sql1 = sql.replace("@db_name", self.db1)
if len(sql1) > 100:
tdLog.info(sql1[:100])
else:
tdLog.info(sql1)
tdSql.execute(sql1)
sql2 = sql.replace("@db_name", self.db2)
if len(sql2) > 100:
tdLog.info(sql2[:100])
else:
tdLog.info(sql2)
tdSql.execute(sql2)
# prepareEnv
def prepareEnv(self):
# init
self.ts = 1680000000000
self.childCnt = 2
self.childRow = 100000
self.batchSize = 5000
self.vgroups1 = 4
self.vgroups2 = 4
self.db1 = "db1" # no sma
self.db2 = "db2" # have sma
self.smaClause = "interval(10s)"
# total
self.c1Cnt = 0
self.c2Cnt = 0
self.c2Max = None
self.c2Min = None
self.c2Sum = None
# alter local optimization to treu
sql = "alter local 'querysmaoptimize 1'"
tdSql.execute(sql, 5, True)
# check forbid mulit-replic on create sma index
sql = f"create database db vgroups {self.vgroups1} replica 3"
tdSql.execute(sql, 5, True)
sql = f"create table db.st(ts timestamp, c1 int, c2 bigint, ts1 timestamp) tags(area int)"
tdSql.execute(sql, 5, True)
sql = f"create sma index sma_test on db.st function(max(c1),max(c2),min(c1),min(c2)) {self.smaClause};"
tdLog.info(sql)
tdSql.error(sql)
# create database db
sql = f"create database @db_name vgroups {self.vgroups1} replica 1"
self.exeDouble(sql)
# create super talbe st
sql = f"create table @db_name.st(ts timestamp, c1 int, c2 bigint, ts1 timestamp) tags(area int)"
self.exeDouble(sql)
# create child table
for i in range(self.childCnt):
sql = f"create table @db_name.t{i} using @db_name.st tags({i}) "
self.exeDouble(sql)
# create sma index on db2
sql = f"use {self.db2}"
tdSql.execute(sql)
sql = f"create sma index sma_index_maxmin on {self.db2}.st function(max(c1),max(c2),min(c1),min(c2)) {self.smaClause};"
tdLog.info(sql)
tdSql.execute(sql)
# insert data
self.insertData()
# check data correct
def checkExpect(self, sql, expectVal):
tdSql.query(sql)
rowCnt = tdSql.getRows()
for i in range(rowCnt):
val = tdSql.getData(i,0)
if val != expectVal:
tdLog.exit(f"Not expect . query={val} expect={expectVal} i={i} sql={sql}")
return False
tdLog.info(f"check expect ok. sql={sql} expect ={expectVal} rowCnt={rowCnt}")
return True
# init
def init(self, conn, logSql, replicaVar=1):
seed = time.clock_gettime(time.CLOCK_REALTIME)
random.seed(seed)
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), True)
# check query result same
def queryDoubleImpl(self, sql):
# sql
sql1 = sql.replace('@db_name', self.db1)
tdLog.info(sql1)
start1 = time.time()
rows1 = tdSql.query(sql1)
spend1 = time.time() - start1
res1 = copy.copy(tdSql.queryResult)
sql2 = sql.replace('@db_name', self.db2)
tdLog.info(sql2)
start2 = time.time()
tdSql.query(sql2)
spend2 = time.time() - start2
res2 = tdSql.queryResult
rowlen1 = len(res1)
rowlen2 = len(res2)
if rowlen1 != rowlen2:
tdLog.info(f"check error. rowlen1={rowlen1} rowlen2={rowlen2} both not equal.")
return False
for i in range(rowlen1):
row1 = res1[i]
row2 = res2[i]
collen1 = len(row1)
collen2 = len(row2)
if collen1 != collen2:
tdLog.info(f"checkerror. collen1={collen1} collen2={collen2} both not equal.")
return False
for j in range(collen1):
if row1[j] != row2[j]:
tdLog.exit(f"col={j} col1={row1[j]} col2={row2[j]} both col not equal.")
return False
# warning performance
multiple = spend1/spend2
tdLog.info("spend1=%.6fs spend2=%.6fs multiple=%.1f"%(spend1, spend2, multiple))
if spend2 > spend1 and multiple < 4:
tdLog.info(f"performace not reached: multiple(spend1/spend)={multiple} require is >=4 ")
return False
return True
# check query result same
def queryDouble(self, sql, tryCount=60, gap=1):
for i in range(tryCount):
if self.queryDoubleImpl(sql):
return True
# error
tdLog.info(f"queryDouble return false, try loop={i}")
time.sleep(gap)
tdLog.exit(f"queryDouble try {tryCount} times, but all failed.")
return False
# check result
def checkResult(self):
# max
sql = f"select max(c1) from @db_name.st {self.smaClause}"
self.queryDouble(sql)
# min
sql = f"select max(c2) from @db_name.st {self.smaClause}"
self.queryDouble(sql)
# mix
sql = f"select max(c1),max(c2),min(c1),min(c2) from @db_name.st {self.smaClause}"
self.queryDouble(sql)
# run
def run(self):
# prepare env
self.prepareEnv()
# check two db query result same
tdLog.info(f"check have sma(db1) and no sma(db2) performace...")
self.checkResult()
# stop
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -129,6 +129,12 @@ class TDTestCase:
tdSql.query(f'select * from information_schema.ins_topics where topic_name = "{topic_name}"')
tdSql.checkEqual(tdSql.queryResult[0][3],f'create topic {topic_name} as select c0 from {self.dbname}.{stbname}')
tdSql.execute(f'drop topic {topic_name}')
#TD-25222
long_topic_name="hhhhjjhhhhqwertyuiasdfghjklzxcvbnmhhhhjjhhhhqwertyuiasdfghjklzxcvbnmhhhhjjhhhhqwertyuiasdfghjklzxcvbnm"
tdSql.execute(f'create topic {long_topic_name} as select * from {self.dbname}.{stbname}')
tdSql.execute(f'drop topic {long_topic_name}')
tdSql.execute(f'drop database {self.dbname}')
def drop_stream_check(self):

View File

@ -0,0 +1,347 @@
from ssl import ALERT_DESCRIPTION_CERTIFICATE_UNOBTAINABLE
from numpy import row_stack
import taos
import sys
import time
import os
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import TDDnodes
from util.dnodes import TDDnode
from util.cluster import *
from util.common import *
sys.path.append("./6-cluster")
from clusterCommonCreate import *
from clusterCommonCheck import clusterComCheck
from pathlib import Path
from taos.tmq import Consumer
import time
import socket
import subprocess
from multiprocessing import Process
import threading
import time
import json
BASEVERSION = "3.0.7.0"
class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
tdLog.debug(f"start to excute {__file__}")
self.TDDnodes = None
tdSql.init(conn.cursor())
self.host = socket.gethostname()
self.replicaVar = int(replicaVar)
def checkProcessPid(self,processName):
i=0
while i<60:
print(f"wait stop {processName}")
processPid = subprocess.getstatusoutput(f'ps aux|grep {processName} |grep -v "grep"|awk \'{{print $2}}\'')[1]
print(f"times:{i},{processName}-pid:{processPid}")
if(processPid == ""):
break
i += 1
sleep(1)
else:
print(f'this processName is not stoped in 60s')
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root)-len("/build/bin")]
break
return buildPath
def getCfgPath(self):
buildPath = self.getBuildPath()
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
cfgPath = buildPath + "/../sim/dnode1/cfg/"
else:
cfgPath = buildPath + "/../sim/dnode1/cfg/"
return cfgPath
def installTaosd(self,bPath,cPath):
# os.system(f"rmtaos && mkdir -p {self.getBuildPath()}/build/lib/temp && mv {self.getBuildPath()}/build/lib/libtaos.so* {self.getBuildPath()}/build/lib/temp/ ")
# os.system(f" mv {bPath}/build {bPath}/build_bak ")
# os.system(f"mv {self.getBuildPath()}/build/lib/libtaos.so {self.getBuildPath()}/build/lib/libtaos.so_bak ")
# os.system(f"mv {self.getBuildPath()}/build/lib/libtaos.so.1 {self.getBuildPath()}/build/lib/libtaos.so.1_bak ")
packagePath = "/usr/local/src/"
dataPath = cPath + "/../data/"
packageName = "TDengine-server-"+ BASEVERSION + "-Linux-x64.tar.gz"
packageTPath = packageName.split("-Linux-")[0]
my_file = Path(f"{packagePath}/{packageName}")
if not my_file.exists():
print(f"{packageName} is not exists")
tdLog.info(f"cd {packagePath} && wget https://www.tdengine.com/assets-download/3.0/{packageName}")
os.system(f"cd {packagePath} && wget https://www.tdengine.com/assets-download/3.0/{packageName}")
else:
print(f"{packageName} has been exists")
os.system(f" cd {packagePath} && tar xvf {packageName} && cd {packageTPath} && ./install.sh -e no " )
# tdDnodes.stop(1)
# print(f"start taosd: rm -rf {dataPath}/* && nohup taosd -c {cPath} & ")
# os.system(f"rm -rf {dataPath}/* && nohup taosd -c {cPath} & " )
# sleep(5)
def buildTaosd(self,bPath):
# os.system(f"mv {bPath}/build_bak {bPath}/build ")
os.system(f" cd {bPath}/ && make install ")
def is_list_same_as_ordered_list(self,unordered_list, ordered_list):
sorted_list = sorted(unordered_list)
return sorted_list == ordered_list
def insertAllData(self,cPath,dbname,tableNumbers,recordNumbers):
tdLog.info(f"insertAllData")
# tdLog.info(f" LD_LIBRARY_PATH=/usr/lib taosBenchmark -d dbtest -t {tableNumbers} -c {cPath} -n {recordNumbers} -v 2 -a 3 -y -k 10 -z 5 ")
# os.system(f"LD_LIBRARY_PATH=/usr/lib taosBenchmark -d dbtest -t {tableNumbers} -c {cPath} -n {recordNumbers} -v 2 -a 3 -y -k 10 -z 5 ")
print(f"sed -i 's/\"cfgdir\".*/\"cfgdir\": \"{cPath}\",/' 6-cluster/rollup.json && sed -i '0,/\"name\":.*/s/\"name\":.*/\"name\": \"{dbname}\",/' 6-cluster/rollup.json && sed -i 's/\"childtable_count\":.*/\"childtable_count\": {tableNumbers},/' 6-cluster/rollup.json && sed -i 's/\"insert_rows\":.*/\"insert_rows\": {recordNumbers},/' 6-cluster/rollup.json" )
os.system(f"sed -i 's/\"cfgdir\".*/\"cfgdir\": \"{cPath}\",/' 6-cluster/rollup.json && sed -i '0,/\"name\":.*/s/\"name\":.*/\"name\": \"{dbname}\",/' 6-cluster/rollup.json && sed -i 's/\"childtable_count\":.*/\"childtable_count\": {tableNumbers},/' 6-cluster/rollup.json && sed -i 's/\"insert_rows\":.*/\"insert_rows\": {recordNumbers},/' 6-cluster/rollup.json")
print("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 6-cluster/rollup.json -y -k 10 -z 5")
os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 6-cluster/rollup.json -y -k 10 -z 5 ")
def insertData(self,countstart,countstop):
# fisrt add data : db\stable\childtable\general table
for couti in range(countstart,countstop):
tdLog.debug("drop database if exists db%d" %couti)
tdSql.execute("drop database if exists db%d" %couti)
print("create database if not exists db%d replica 1 duration 300" %couti)
tdSql.execute("create database if not exists db%d replica 1 duration 300" %couti)
tdSql.execute("use db%d" %couti)
tdSql.execute(
'''create table stb1
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
tags (t1 int)
'''
)
tdSql.execute(
'''
create table t1
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
'''
)
for i in range(4):
tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )')
def fiveDnodeThreeMnode(self,dnodeNumbers,mnodeNums,restartNumbers,stopRole):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'db0_0',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'replica': 1,
'stbName': 'stb',
'stbNumbers': 2,
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'ctbPrefix': 'ctb',
'ctbNum': 200,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
"rowsPerTbl": 1000,
"batchNum": 5000
}
hostname = socket.gethostname()
dnodeNumbers=int(dnodeNumbers)
tdLog.info("first check dnode and mnode")
tdSql=tdCom.newTdSql()
tdSql.query("select * from information_schema.ins_dnodes;")
tdSql.checkData(0,1,'%s:6030'%self.host)
tdLog.printNoPrefix(f"==========step1:prepare cluster of {dnodeNumbers} dnodes whith old version-{BASEVERSION} ")
scriptsPath = os.path.dirname(os.path.realpath(__file__))
distro_id = distro.id()
if distro_id == "alpine":
tdLog.info(f"alpine skip Roll test")
return True
if platform.system().lower() == 'windows':
tdLog.info(f"Windows skip Roll test")
return True
tdLog.info("====step1.1:stop all taosd and clear data dir,then start all old taosd ====")
bPath = self.getBuildPath()
cPath = self.getCfgPath()
tdDnodes=cluster.dnodes
for i in range(dnodeNumbers):
tdDnodes[i].stoptaosd()
self.installTaosd(bPath,cPath)
for i in range(dnodeNumbers):
dnode_cfgPath = tdDnodes[i].cfgDir
dnode_dataPath = tdDnodes[i].dataDir
os.system(f"rm -rf {dnode_dataPath}/* && nohup taosd -c {dnode_cfgPath} & ")
tdLog.info("====step1.2: create dnode on cluster ====")
for i in range(1,dnodeNumbers):
dnode_id = tdDnodes[i].cfgDict["fqdn"] + ":" + tdDnodes[i].cfgDict["serverPort"]
os.system(f" LD_LIBRARY_PATH=/usr/lib taos -s 'create dnode \"{dnode_id}\" ' ")
sleep(5)
os.system(" LD_LIBRARY_PATH=/usr/lib taos -s 'show dnodes' ")
for i in range(2,dnodeNumbers+1):
os.system(f" LD_LIBRARY_PATH=/usr/lib taos -s 'create mnode on dnode {i} ' ")
sleep(10)
os.system(" LD_LIBRARY_PATH=/usr/lib taos -s 'show mnodes' ")
tdLog.info("====step1.3: insert data, includes time data, tmq and stream ====")
tableNumbers1=100
recordNumbers1=100000
recordNumbers2=1000
dbname = "dbtest"
stb = f"{dbname}.meters"
cPath_temp=cPath.replace("/","\/")
# os.system("echo 'debugFlag 143' > /etc/taos/taos.cfg ")
# create database and tables
print(f"sed -i 's/\"cfgdir\".*/\"cfgdir\": \"{cPath_temp}\",/' 6-cluster/rollup_db.json && sed -i '0,/\"name\":.*/s/\"name\":.*/\"name\": \"{dbname}\",/' 6-cluster/rollup_db.json ")
os.system(f"sed -i 's/\"cfgdir\".*/\"cfgdir\": \"{cPath_temp}\",/' 6-cluster/rollup_db.json && sed -i '0,/\"name\":.*/s/\"name\":.*/\"name\": \"{dbname}\",/' 6-cluster/rollup_db.json")
print("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 6-cluster/rollup_db.json -y ")
os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 6-cluster/rollup_db.json -y")
# insert data
tdLog.info(f" LD_LIBRARY_PATH=/usr/lib taosBenchmark -d test -t {tableNumbers1} -c {cPath} -n {recordNumbers2} -v 2 -a 3 -y -k 10 -z 5 ")
os.system(f"LD_LIBRARY_PATH=/usr/lib taosBenchmark -d test -t {tableNumbers1} -c {cPath} -n {recordNumbers2} -v 2 -a 3 -y -k 10 -z 5 ")
# os.system(f"LD_LIBRARY_PATH=/usr/lib taos -s 'use test;create stream current_stream into current_stream_output_stb as select _wstart as `start`, _wend as wend, max(current) as max_current from meters where voltage <= 220 interval (5s);' ")
# os.system(f'LD_LIBRARY_PATH=/usr/lib taos -s "use test;create stream power_stream into power_stream_output_stb as select ts, concat_ws(\\".\\", location, tbname) as meter_location, current*voltage*cos(phase) as active_power, current*voltage*sin(phase) as reactive_power from meters partition by tbname;" ')
# os.system(f'LD_LIBRARY_PATH=/usr/lib taos -s "use test;show streams;" ')
os.system(f'LD_LIBRARY_PATH=/usr/lib taos -s "alter database test WAL_RETENTION_PERIOD 1000" ')
os.system(f'LD_LIBRARY_PATH=/usr/lib taos -s "create topic if not exists tmq_test_topic as select current,voltage,phase from test.meters where voltage <= 106 and current <= 5;" ')
os.system(f'LD_LIBRARY_PATH=/usr/lib taos -s "use test;show topics;" ')
print(f"sed -i 's/\"cfgdir\".*/\"cfgdir\": \"{cPath_temp}\",/' 0-others/compa4096.json ")
os.system(f"sed -i 's/\"cfgdir\".*/\"cfgdir\": \"{cPath_temp}\",/'0-others/compa4096.json ")
tdLog.info(" LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y -k 10 -z 5 ")
os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y -k 10 -z 5 ")
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database db4096 '")
os.system("LD_LIBRARY_PATH=/usr/lib taos -f 0-others/TS-3131.tsql")
# self.buildTaosd(bPath)
threads=[]
threads.append(threading.Thread(target=self.insertAllData, args=(cPath_temp,dbname,tableNumbers1,recordNumbers1)))
for tr in threads:
tr.start()
# when inserting data porcess has been started up ,we can upgrade taosd
sleep(5)
tdLog.printNoPrefix("==========step2:start to rolling upgdade ")
for i in range(dnodeNumbers):
tdDnodes[i].running = 1
tdDnodes[i].stoptaosd()
sleep(2)
tdDnodes[i].starttaosd()
for tr in threads:
tr.join()
tdLog.printNoPrefix(f"==========step3:check dnode status ")
# wait 10s for taosd cluster ready
sleep(10)
tdsql=tdCom.newTdSql()
tdsql.query("select * from information_schema.ins_dnodes;")
tdLog.info(tdsql.queryResult)
tdsql.checkData(2,1,'%s:6230'%self.host)
clusterComCheck.checkDnodes(dnodeNumbers)
tdsql1=tdCom.newTdSql()
tdsql1.query(f"SELECT SERVER_VERSION();")
nowServerVersion=tdsql1.queryResult[0][0]
tdLog.printNoPrefix(f"==========step4:prepare and check data in new version-{nowServerVersion}")
tdLog.info(f"New server version is {nowServerVersion}")
tdsql1.query(f"SELECT CLIENT_VERSION();")
nowClientVersion=tdsql1.queryResult[0][0]
tdLog.info(f"New client version is {nowClientVersion}")
tdsql1.query(f"select count(*) from {stb}")
tdsql1.checkData(0,0,tableNumbers1*recordNumbers1)
tdsql1.query(f"select count(*) from db4096.stb0")
tdsql1.checkData(0,0,50000)
# tdsql1.query("show streams;")
# tdsql1.checkRows(2)
tdsql1.query("select *,tbname from d0.almlog where mcid='m0103';")
tdsql1.checkRows(6)
expectList = [0,3003,20031,20032,20033,30031]
resultList = []
for i in range(6):
resultList.append(tdsql1.queryResult[i][3])
print(resultList)
if self.is_list_same_as_ordered_list(resultList,expectList):
print("The unordered list is the same as the ordered list.")
else:
tdlog.error("The unordered list is not the same as the ordered list.")
tdsql1.execute(f"insert into test.d80 values (now+1s, 11, 103, 0.21);")
tdsql1.execute(f"insert into test.d9 values (now+5s, 4.3, 104, 0.4);")
conn = taos.connect()
consumer = Consumer(
{
"group.id": "tg75",
"client.id": "124",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"enable.auto.commit": "true",
"experimental.snapshot.enable": "true",
}
)
consumer.subscribe(["tmq_test_topic"])
while True:
res = consumer.poll(10)
if not res:
break
err = res.error()
if err is not None:
raise err
val = res.value()
for block in val:
print(block.fetchall())
tdsql1.query("show topics;")
tdsql1.checkRows(1)
# #check mnode status
# tdLog.info("check mnode status")
# clusterComCheck.checkMnodeStatus(mnodeNums)
def run(self):
# print(self.master_dnode.cfgDict)
self.fiveDnodeThreeMnode(dnodeNumbers=3,mnodeNums=3,restartNumbers=2,stopRole='dnode')
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -256,12 +256,12 @@ class ClusterComCheck:
if vgroup_status_first.count('leader') == 1 and vgroup_status_first.count('follower') == 2:
if vgroup_status_last.count('leader') == 1 and vgroup_status_last.count('follower') == 2:
ready_time= (count + 1)
tdLog.success(f"elections of {db_name} all vgroups are ready in {ready_time} s")
tdLog.success(f"elections of {db_name}.vgroups are ready in {ready_time} s")
return True
count+=1
else:
tdLog.debug(tdSql.queryResult)
tdLog.notice(f"elections of {db_name} all vgroups are failed in{count}s ")
tdLog.notice(f"elections of {db_name} all vgroups are failed in{count} s ")
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno)
tdLog.exit("%s(%d) failed " % args)

View File

@ -182,7 +182,7 @@ class TDTestCase:
tdLog.info(f"show transactions;alter database db0_0 replica {replica3};")
TdSqlEx.execute(f'show transactions;')
TdSqlEx.execute(f'alter database db0_0 replica {replica3};')
clusterComCheck.check_vgroups_status(vgroup_numbers=paraDict["vgroups"],db_replica=replica3,db_name=paraDict["dbName"],count_number=120)
clusterComCheck.check_vgroups_status(vgroup_numbers=paraDict["vgroups"],db_replica=replica3,db_name=paraDict["dbName"],count_number=180)
def run(self):
# print(self.master_dnode.cfgDict)

View File

@ -0,0 +1,77 @@
{
"filetype": "insert",
"cfgdir": "/home/chr/TDengine/debug/../sim/dnode1/cfg/",
"host": "localhost",
"port": 6030,
"rest_port": 6041,
"user": "root",
"password": "taosdata",
"thread_count": 4,
"create_table_thread_count": 4,
"result_file": "taosBenchmark_result.log",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
"num_of_records_per_req": 1000,
"max_sql_len": 1024000,
"databases": [
{
"dbinfo": {
"name": "dbtest",
"drop": "no",
"replica": 1,
"duration": 10,
"precision": "ms",
"keep": 3650,
"comp": 2,
"vgroups": 2,
"buffer": 1000
},
"super_tables": [
{
"name": "meters",
"child_table_exists": "yes",
"childtable_count": 100,
"childtable_prefix": "ctb",
"escape_character": "no",
"auto_create_table": "no",
"batch_create_tbl_num": 500,
"data_source": "rand",
"insert_mode": "taosc",
"continue_if_fail": "yes",
"keep_trying": 500,
"trying_interval": 100,
"interlace_rows": 0,
"line_protocol": null,
"tcp_transfer": "no",
"insert_rows": 100000,
"childtable_limit": 0,
"childtable_offset": 0,
"rows_per_tbl": 0,
"max_sql_len": 1048576,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 1000,
"start_timestamp": "2022-10-22 17:20:36",
"sample_format": "csv",
"sample_file": "./sample.csv",
"tags_file": "",
"partial_col_num": 999,
"columns": [{"type": "TIMESTAMP","max": 10, "min": 0},{"type": "INT","max": 10, "min": 0}, {"type": "BIGINT","max": 10, "min": 0}, {"type": "FLOAT","max": 10, "min": 0}, {"type": "DOUBLE","max": 10, "min": 0}, {"type": "SMALLINT","max": 10, "min": 0}, {"type": "TINYINT","max": 10, "min": 0}, {"type": "BOOL","max": 10, "min": 0}, {"type": "NCHAR","len": 29, "count":1,
"values": ["d1", "d2"]
}, {"type": "UINT","max": 10, "min": 0}, {"type": "UBIGINT","max": 10, "min": 0}, {"type": "UTINYINT","max": 10, "min": 0}, {"type": "USMALLINT","max": 10, "min": 0}, {"type": "BINARY", "len": 23, "count":1,
"values": ["b1","b2"]
}],
"tags": [{"type": "TIMESTAMP","max": 10, "min": 0},{"type": "INT","max": 10, "min": 0}, {"type": "BIGINT","max": 10, "min": 0}, {"type": "FLOAT","max": 10, "min": 0}, {"type": "DOUBLE","max": 10, "min": 0}, {"type": "SMALLINT","max": 10, "min": 0}, {"type": "TINYINT","max": 10, "min": 0}, {"type": "BOOL","max": 10, "min": 0}, {"type": "NCHAR","len": 17, "count":1,
"values": ["d1", "d2"]
}, {"type": "UINT","max": 10, "min": 0}, {"type": "UBIGINT","max": 10, "min": 0}, {"type": "UTINYINT","max": 10, "min": 0}, {"type": "USMALLINT","max": 10, "min": 0}, {"type": "BINARY", "len": 19, "count":1,
"values": ["b1","b2"]
}]
}
]
}
],
"prepare_rand": 10000,
"chinese": "no",
"streams": false,
"test_log": "/root/testlog/"
}

View File

@ -0,0 +1,77 @@
{
"filetype": "insert",
"cfgdir": "/home/chr/TDengine/debug/../sim/dnode1/cfg/",
"host": "localhost",
"port": 6030,
"rest_port": 6041,
"user": "root",
"password": "taosdata",
"thread_count": 4,
"create_table_thread_count": 4,
"result_file": "taosBenchmark_result.log",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
"num_of_records_per_req": 1000,
"max_sql_len": 1024000,
"databases": [
{
"dbinfo": {
"name": "dbtest",
"drop": "yes",
"replica": 1,
"duration": 10,
"precision": "ms",
"keep": 3650,
"comp": 2,
"vgroups": 2,
"buffer": 1000
},
"super_tables": [
{
"name": "meters",
"child_table_exists": "no",
"childtable_count": 100,
"childtable_prefix": "ctb",
"escape_character": "no",
"auto_create_table": "no",
"batch_create_tbl_num": 500,
"data_source": "rand",
"insert_mode": "taosc",
"continue_if_fail": "yes",
"keep_trying": 500,
"trying_interval": 100,
"interlace_rows": 0,
"line_protocol": null,
"tcp_transfer": "no",
"insert_rows": 0,
"childtable_limit": 0,
"childtable_offset": 0,
"rows_per_tbl": 0,
"max_sql_len": 1048576,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 1000,
"start_timestamp": "2022-10-22 17:20:36",
"sample_format": "csv",
"sample_file": "./sample.csv",
"tags_file": "",
"partial_col_num": 999,
"columns": [{"type": "TIMESTAMP","max": 10, "min": 0},{"type": "INT","max": 10, "min": 0}, {"type": "BIGINT","max": 10, "min": 0}, {"type": "FLOAT","max": 10, "min": 0}, {"type": "DOUBLE","max": 10, "min": 0}, {"type": "SMALLINT","max": 10, "min": 0}, {"type": "TINYINT","max": 10, "min": 0}, {"type": "BOOL","max": 10, "min": 0}, {"type": "NCHAR","len": 29, "count":1,
"values": ["d1", "d2"]
}, {"type": "UINT","max": 10, "min": 0}, {"type": "UBIGINT","max": 10, "min": 0}, {"type": "UTINYINT","max": 10, "min": 0}, {"type": "USMALLINT","max": 10, "min": 0}, {"type": "BINARY", "len": 23, "count":1,
"values": ["b1","b2"]
}],
"tags": [{"type": "TIMESTAMP","max": 10, "min": 0},{"type": "INT","max": 10, "min": 0}, {"type": "BIGINT","max": 10, "min": 0}, {"type": "FLOAT","max": 10, "min": 0}, {"type": "DOUBLE","max": 10, "min": 0}, {"type": "SMALLINT","max": 10, "min": 0}, {"type": "TINYINT","max": 10, "min": 0}, {"type": "BOOL","max": 10, "min": 0}, {"type": "NCHAR","len": 17, "count":1,
"values": ["d1", "d2"]
}, {"type": "UINT","max": 10, "min": 0}, {"type": "UBIGINT","max": 10, "min": 0}, {"type": "UTINYINT","max": 10, "min": 0}, {"type": "USMALLINT","max": 10, "min": 0}, {"type": "BINARY", "len": 19, "count":1,
"values": ["b1","b2"]
}]
}
]
}
],
"prepare_rand": 10000,
"chinese": "no",
"streams": false,
"test_log": "/root/testlog/"
}

View File

@ -216,41 +216,142 @@ class TDTestCase:
tdLog.info("create topic sql: %s"%sqlString)
tdSql.error(sqlString)
# pThreadList = []
# for i in range(self.tmqMaxTopicNum):
# topic_name = f"%s%d" %(topicNamePrefix, i)
# print("======%s"%(topic_name))
# group_id_prefix = f"grp_%d"%(i)
# inputDict = {'group_id_prefix': group_id_prefix,
# 'topic_name': topic_name,
# 'pollDelay': 1
# }
# pThread = self.asyncSubscribe(inputDict)
# pThreadList.append(pThread)
# for j in range(self.tmqMaxGroups):
# pThreadList[j].join()
# time.sleep(5)
# tdSql.query('show subscriptions;')
# subscribeNum = tdSql.queryRows
# expectNum = self.tmqMaxGroups * self.tmqMaxTopicNum
# tdLog.info("loop index: %d, ======subscriptions %d and expect num: %d"%(i, subscribeNum, expectNum))
# if subscribeNum != expectNum:
# tdLog.exit("subscriptions %d not equal expect num: %d"%(subscribeNum, expectNum))
# # drop all topics
# for i in range(self.tmqMaxTopicNum):
# sqlString = "drop topic %s%d" %(topicNamePrefix, i)
# tdLog.info("drop topic sql: %s"%sqlString)
# tdSql.execute(sqlString)
tdLog.info("drop database when there are topic")
sqlString = "drop database %s" %(paraDict['dbName'])
tdLog.info("drop database sql: %s"%sqlString)
tdSql.error(sqlString)
tdLog.info("drop all topic for re-create")
tdSql.query('show topics;')
topicNum = tdSql.queryRows
tdLog.info(" topic count: %d"%(topicNum))
for i in range(topicNum):
sqlString = "drop topic %s" %(tdSql.getData(i, 0))
tdLog.info("drop topic sql: %s"%sqlString)
tdSql.execute(sqlString)
time.sleep(1)
tdLog.info("re-create topics")
topicNamePrefix = 'newTopic_'
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
for i in range(topicNum):
sqlString = "create topic %s%d as %s" %(topicNamePrefix, i, queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
#=================================================#
tdLog.info("drop all topic for testcase2")
tdSql.query('show topics;')
topicNum = tdSql.queryRows
tdLog.info(" topic count: %d"%(topicNum))
for i in range(topicNum):
sqlString = "drop topic %s" %(tdSql.getData(i, 0))
tdLog.info("drop topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdLog.printNoPrefix("======== test case 1 end ...... ")
tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self):
tdLog.printNoPrefix("======== test case 2: test topic name len")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
totalTopicNum = 0
topicName = 'a'
sqlString = "create topic %s as %s" %(topicName, queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.query(sqlString)
totalTopicNum += 1
topicName = '3'
sqlString = "create topic %s as %s" %(topicName, queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.error(sqlString)
totalTopicNum += 0
topicName = '_1'
sqlString = "create topic %s as %s" %(topicName, queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.query(sqlString)
totalTopicNum += 1
topicName = 'a\\'
sqlString = "create topic %s as %s" %(topicName, queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.error(sqlString)
totalTopicNum += 0
topicName = 'a\*\&\^'
sqlString = "create topic %s as %s" %(topicName, queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.error(sqlString)
totalTopicNum += 0
str191char = 'a'
for i in range(190):
str191char = ('%s%d'%(str191char, 1))
topicName = str191char + 'a'
if (192 != len(topicName)):
tdLog.exit("topicName len error")
sqlString = "create topic %s as %s" %(topicName, queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.query(sqlString)
totalTopicNum += 1
topicName = str191char + '12'
sqlString = "create topic %s as %s" %(topicName, queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.error(sqlString)
totalTopicNum += 0
# topicName = str192char + '12'
# sqlString = "create topic %s as %s" %(topicName, queryString)
# tdLog.info("create topic sql: %s"%sqlString)
# tdSql.error(sqlString)
# totalTopicNum += 0
# check topic count
tdSql.query('show topics;')
topicNum = tdSql.queryRows
tdLog.info(" topic count: %d"%(topicNum))
if topicNum != totalTopicNum:
tdLog.exit("show topics %d not equal expect num: %d"%(topicNum, totalTopicNum))
tdLog.printNoPrefix("======== test case 2 end ...... ")
def run(self):
self.prepareTestEnv()
self.tmqCase1()
self.tmqCase2()
def stop(self):
tdSql.close()

View File

@ -0,0 +1,399 @@
import sys
import re
import time
import threading
from taos.tmq import Consumer
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
updatecfgDict = {'debugFlag': 135}
def __init__(self):
self.vgroups = 2
self.ctbNum = 1
self.rowsPerTbl = 10000
self.tmqMaxTopicNum = 10
self.tmqMaxGroups = 10
self.TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE = '0x4007'
self.TSDB_CODE_TMQ_INVALID_VGID = '0x4008'
self.TSDB_CODE_TMQ_INVALID_TOPIC = '0x4009'
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
def getPath(self, tool="taosBenchmark"):
if (platform.system().lower() == 'windows'):
tool = tool + ".exe"
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
paths = []
for root, dirs, files in os.walk(projPath):
if ((tool) in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
paths.append(os.path.join(root, tool))
break
if (len(paths) == 0):
tdLog.exit("taosBenchmark not found!")
return
else:
tdLog.info("taosBenchmark found in %s" % paths[0])
return paths[0]
def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
# 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
# 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'colSchema': [{'type': 'INT', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 1,
'rowsPerTbl': 10,
'batchNum': 100,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
# tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1,wal_retention_period=36000)
# tdSql.execute("alter database %s wal_retention_period 360000" % (paraDict['dbName']))
tdLog.info("create stb")
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
tdLog.info("create ctb")
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data")
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdDnodes.stop(1)
# tdDnodes.start(1)
tdSql.query("flush database %s"%(paraDict['dbName']))
return
def tmqPollAllRows(self, consumer):
totalRows = 0
res = consumer.poll(10)
while (res):
if not res:
break
err = res.error()
if err is not None:
raise err
val = res.value()
# print(len(val))
for block in val:
# print(block.fetchall())
# print(len(block.fetchall()))
totalRows += len(block.fetchall())
res = consumer.poll(10)
tdLog.info("poll total rows: %d"%(totalRows))
return totalRows
def tmqPollRowsByOne(self, consumer):
rows = 0
res = consumer.poll(3)
if not res:
return rows
err = res.error()
if err is not None:
raise err
val = res.value()
# print(len(val))
for block in val:
# print(block.fetchall())
# print(len(block.fetchall()))
rows += len(block.fetchall())
return rows
def tmqOffsetTest(self, consumer):
# get topic assignment
tdLog.info("before poll get offset status:")
assignments = consumer.assignment()
for assignment in assignments:
print(assignment)
# poll
# consumer.poll(5)
rows = self.tmqPollRowsByOne(consumer)
tdLog.info("poll rows: %d"%(rows))
# get topic assignment
tdLog.info("after first poll get offset status:")
assignments = consumer.assignment()
for assignment in assignments:
print(assignment)
rows = self.tmqPollRowsByOne(consumer)
tdLog.info("poll rows: %d"%(rows))
# get topic assignment
tdLog.info("after second poll get offset status:")
assignments = consumer.assignment()
for assignment in assignments:
print(assignment)
return
def tmqSubscribe(self, inputDict):
consumer_dict = {
"group.id": inputDict['group_id'],
"client.id": "client",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"auto.commit.interval.ms": "1000",
"enable.auto.commit": inputDict['auto_commit'],
"auto.offset.reset": inputDict['offset_reset'],
"experimental.snapshot.enable": "false",
"msg.with.table.name": "false"
}
consumer = Consumer(consumer_dict)
try:
consumer.subscribe([inputDict['topic_name']])
except Exception as e:
tdLog.info("consumer.subscribe() fail ")
tdLog.info("%s"%(e))
# rows = self.tmqPollAllRows(consumer)
tdLog.info("create consumer success!")
return consumer
def tmqConsumer(self, **inputDict):
consumer = self.tmqSubscribe(inputDict)
self.tmqPollAllRows(consumer)
# consumer.unsubscribe()
# consumer.close()
return
def asyncSubscribe(self, inputDict):
pThread = threading.Thread(target=self.tmqConsumer, kwargs=inputDict)
pThread.start()
return pThread
def seekErrorVgid(self, consumer, assignment):
####################### test1: error vgid
assignmentNew = assignment
# assignment.topic
assignmentNew.partition = assignment.partition + self.vgroups + self.vgroups
# assignment.offset
# consumer.seek(assignment)
errCodeStr = ''
try:
print("seek parameters:", assignmentNew)
consumer.seek(assignmentNew)
except Exception as e:
tdLog.info("error: %s"%(e))
rspString = str(e)
start = "["
end = "]"
start_index = rspString.index(start) + len(start)
end_index = rspString.index(end)
errCodeStr = rspString[start_index:end_index]
# print(errCodeStr)
tdLog.info("error code: %s"%(errCodeStr))
if (self.TSDB_CODE_TMQ_INVALID_VGID != errCodeStr):
tdLog.exit("tmq seek should return error code: %s"%(self.TSDB_CODE_TMQ_INVALID_VGID))
def seekErrorTopic(self, consumer, assignment):
assignmentNew = assignment
assignmentNew.topic = 'errorToipcName'
# assignment.partition
# assignment.offset
# consumer.seek(assignment)
errCodeStr = ''
try:
print("seek parameters:", assignmentNew)
consumer.seek(assignmentNew)
except Exception as e:
tdLog.info("error: %s"%(e))
rspString = str(e)
start = "["
end = "]"
start_index = rspString.index(start) + len(start)
end_index = rspString.index(end)
errCodeStr = rspString[start_index:end_index]
# print(errCodeStr)
tdLog.info("error code: %s"%(errCodeStr))
if (self.TSDB_CODE_TMQ_INVALID_TOPIC != errCodeStr):
tdLog.exit("tmq seek should return error code: %s"%(self.TSDB_CODE_TMQ_INVALID_TOPIC))
def seekErrorVersion(self, consumer, assignment):
assignmentNew = assignment
# print(assignment.topic, assignment.partition, assignment.offset)
# assignment.topic
# assignment.partition
assignmentNew.offset = assignment.offset + self.rowsPerTbl * 100000
# consumer.seek(assignment)
errCodeStr = ''
try:
# print(assignmentNew.topic, assignmentNew.partition, assignmentNew.offset)
print("seek parameters:", assignmentNew)
consumer.seek(assignmentNew)
except Exception as e:
tdLog.info("error: %s"%(e))
rspString = str(e)
start = "["
end = "]"
start_index = rspString.index(start) + len(start)
end_index = rspString.index(end)
errCodeStr = rspString[start_index:end_index]
# print(errCodeStr)
tdLog.info("error code: %s"%(errCodeStr))
if (self.TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE != errCodeStr):
tdLog.exit("tmq seek should return error code: %s"%(self.TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE))
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 1,
'rowsPerTbl': 100000000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
# ntbName = 'ntb'
# sqlString = "create table %s.%s (ts timestamp, c int)"%(paraDict['dbName'], ntbName)
# tdLog.info("create ntb sql: %s"%sqlString)
# tdSql.execute(sqlString)
topicName = 'offset_tp'
# queryString = "select * from %s.%s"%(paraDict['dbName'], ntbName)
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicName, queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
inputDict = {
"topic_name": topicName,
"group_id": "offsetGrp",
"auto_commit": "true",
"offset_reset": "earliest"
}
pThread = self.asyncSubscribe(inputDict)
# pThread.join()
consumer = self.tmqSubscribe(inputDict)
# get topic assignment
assignments = consumer.assignment()
# print(type(assignments))
for assignment in assignments:
print(assignment)
assignment = assignments[0]
topic = assignment.topic
partition = assignment.partition
offset = assignment.offset
tdLog.info("======== test error vgid =======")
print("current assignment: ", assignment)
self.seekErrorVgid(consumer, assignment)
tdLog.info("======== test error topic =======")
assignment.topic = topic
assignment.partition = partition
assignment.offset = offset
print("current assignment: ", assignment)
self.seekErrorTopic(consumer, assignment)
tdLog.info("======== test error version =======")
assignment.topic = topic
assignment.partition = partition
assignment.offset = offset
print("current assignment: ", assignment)
self.seekErrorVersion(consumer, assignment)
pThread.join()
tdLog.printNoPrefix("======== test case 1 end ...... ")
def run(self):
self.prepareTestEnv()
self.tmqCase1()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -0,0 +1,40 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import time
import taos
from util.log import *
from util.cases import *
from util.sql import *
class TDTestCase:
# init
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), True)
# run
def run(self):
# check two db query result same
tdLog.info(f"hello world.")
# stop
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -3,8 +3,6 @@ IF (TD_WEBSOCKET)
SET(websocket_lib_file "libtaosws.so")
ELSEIF (TD_DARWIN)
SET(websocket_lib_file "libtaosws.dylib")
ELSEIF (TD_WINDOWS)
SET(websocket_lib_file "{taosws.dll,taosws.dll.lib}")
ENDIF ()
MESSAGE("${Green} use libtaos-ws${ColourReset}")
IF (TD_ALPINE)
@ -26,6 +24,26 @@ IF (TD_WEBSOCKET)
COMMAND cmake -E make_directory ${CMAKE_BINARY_DIR}/build/include
COMMAND cmake -E copy target/release/taosws.h ${CMAKE_BINARY_DIR}/build/include
)
ELSEIF (TD_WINDOWS)
include(ExternalProject)
ExternalProject_Add(taosws-rs
PREFIX "taosws-rs"
SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/taosws-rs
BUILD_ALWAYS off
DEPENDS taos
BUILD_IN_SOURCE 1
CONFIGURE_COMMAND cmake -E echo "taosws-rs no need cmake to config"
PATCH_COMMAND
COMMAND git clean -f -d
BUILD_COMMAND
COMMAND cargo update
COMMAND cargo build --release -p taos-ws-sys --features native-tls-vendored
INSTALL_COMMAND
COMMAND cp target/release/taosws.dll ${CMAKE_BINARY_DIR}/build/lib
COMMAND cp target/release/taosws.dll.lib ${CMAKE_BINARY_DIR}/build/lib/taosws.lib
COMMAND cmake -E make_directory ${CMAKE_BINARY_DIR}/build/include
COMMAND cmake -E copy target/release/taosws.h ${CMAKE_BINARY_DIR}/build/include
)
ELSE()
include(ExternalProject)
ExternalProject_Add(taosws-rs

View File

@ -20,7 +20,7 @@ ELSEIF (TD_DARWIN AND TD_WEBSOCKET)
ADD_DEPENDENCIES(shell taosws-rs)
ELSEIF (TD_WINDOWS AND TD_WEBSOCKET)
ADD_DEFINITIONS(-DWEBSOCKET -I${CMAKE_BINARY_DIR}/build/include)
SET(LINK_WEBSOCKET "${CMAKE_BINARY_DIR}/build/lib/taosws.dll.lib")
SET(LINK_WEBSOCKET "${CMAKE_BINARY_DIR}/build/lib/taosws.lib")
ADD_DEPENDENCIES(shell taosws-rs)
ELSE ()
SET(LINK_WEBSOCKET "")