doc: add mlt doc and merge 3.0
This commit is contained in:
commit
3c4701b0fb
|
@ -33,7 +33,9 @@ on:
|
|||
type: string
|
||||
|
||||
concurrency:
|
||||
group: ${{ github.workflow }}-${{ github.event.pull_request.number }}-${{ github.event.inputs.specified_target_branch }}-${{ github.event.inputs.specified_pr_number }}-TDengine
|
||||
group: ${{ github.workflow }}-${{ github.event_name }}-
|
||||
${{ github.event_name == 'pull_request' && github.event.pull_request.base.ref || inputs.specified_target_branch }}-
|
||||
${{ github.event_name == 'pull_request' && github.event.pull_request.number || inputs.specified_pr_number }}-TDengine
|
||||
cancel-in-progress: true
|
||||
|
||||
env:
|
||||
|
@ -42,27 +44,24 @@ env:
|
|||
jobs:
|
||||
run-tests-on-linux:
|
||||
uses: taosdata/.github/.github/workflows/run-tests-on-linux.yml@main
|
||||
if: ${{ github.event_name == 'pull_request' || github.event_name == 'workflow_dispatch'}}
|
||||
with:
|
||||
tdinternal: false
|
||||
specified_source_branch: ${{ github.event_name == 'pull_request' && 'unavailable' || github.event.inputs.specified_source_branch }}
|
||||
specified_target_branch: ${{ github.event_name == 'pull_request' && 'unavailable' || github.event.inputs.specified_target_branch }}
|
||||
specified_pr_number: ${{ github.event_name == 'pull_request' && 'unavailable' || github.event.inputs.specified_pr_number }}
|
||||
specified_source_branch: ${{ github.event_name == 'pull_request' && 'unavailable' || inputs.specified_source_branch }}
|
||||
specified_target_branch: ${{ github.event_name == 'pull_request' && 'unavailable' || inputs.specified_target_branch }}
|
||||
specified_pr_number: ${{ github.event_name == 'pull_request' && 'unavailable' || inputs.specified_pr_number }}
|
||||
|
||||
run-tests-on-mac:
|
||||
uses: taosdata/.github/.github/workflows/run-tests-on-macos.yml@main
|
||||
if: ${{ github.event_name == 'pull_request' }}
|
||||
with:
|
||||
tdinternal: false
|
||||
specified_source_branch: ${{ github.event_name == 'pull_request' && 'unavailable' || github.event.inputs.specified_source_branch }}
|
||||
specified_target_branch: ${{ github.event_name == 'pull_request' && 'unavailable' || github.event.inputs.specified_target_branch }}
|
||||
specified_pr_number: ${{ github.event_name == 'pull_request' && 'unavailable' || github.event.inputs.specified_pr_number }}
|
||||
specified_source_branch: ${{ github.event_name == 'pull_request' && 'unavailable' || inputs.specified_source_branch }}
|
||||
specified_target_branch: ${{ github.event_name == 'pull_request' && 'unavailable' || inputs.specified_target_branch }}
|
||||
specified_pr_number: ${{ github.event_name == 'pull_request' && 'unavailable' || inputs.specified_pr_number }}
|
||||
|
||||
run-tests-on-windows:
|
||||
uses: taosdata/.github/.github/workflows/run-tests-on-windows.yml@main
|
||||
if: ${{ github.event_name == 'pull_request' }}
|
||||
with:
|
||||
tdinternal: false
|
||||
specified_source_branch: ${{ github.event_name == 'pull_request' && 'unavailable' || github.event.inputs.specified_source_branch }}
|
||||
specified_target_branch: ${{ github.event_name == 'pull_request' && 'unavailable' || github.event.inputs.specified_target_branch }}
|
||||
specified_pr_number: ${{ github.event_name == 'pull_request' && 'unavailable' || github.event.inputs.specified_pr_number }}
|
||||
specified_source_branch: ${{ github.event_name == 'pull_request' && 'unavailable' || inputs.specified_source_branch }}
|
||||
specified_target_branch: ${{ github.event_name == 'pull_request' && 'unavailable' || inputs.specified_target_branch }}
|
||||
specified_pr_number: ${{ github.event_name == 'pull_request' && 'unavailable' || inputs.specified_pr_number }}
|
||||
|
|
|
@ -69,10 +69,10 @@ This statement creates a subscription that includes all table data in the databa
|
|||
|
||||
## Delete Topic
|
||||
|
||||
If you no longer need to subscribe to the data, you can delete the topic. Note that only topics that are not currently subscribed can be deleted.
|
||||
If you no longer need to subscribe to the data, you can delete the topic. If the current topic is subscribed to by a consumer, it can be forcibly deleted using the FORCE syntax. After the forced deletion, the subscribed consumer will consume data with errors (FORCE syntax supported from version 3.3.6.0).
|
||||
|
||||
```sql
|
||||
DROP TOPIC [IF EXISTS] topic_name;
|
||||
DROP TOPIC [IF EXISTS] [FORCE] topic_name;
|
||||
```
|
||||
|
||||
## View Topics
|
||||
|
@ -99,10 +99,10 @@ Displays information about all consumers in the current database, including the
|
|||
|
||||
### Delete Consumer Group
|
||||
|
||||
When creating a consumer, a consumer group is assigned to the consumer. Consumers cannot be explicitly deleted, but the consumer group can be deleted with the following statement when there are no consumers in the group:
|
||||
When creating a consumer, a consumer group is assigned to the consumer. Consumers cannot be explicitly deleted, but the consumer group can be deleted. If there are consumers in the current consumer group who are consuming, the FORCE syntax can be used to force deletion. After forced deletion, subscribed consumers will consume data with errors (FORCE syntax supported from version 3.3.6.0).
|
||||
|
||||
```sql
|
||||
DROP CONSUMER GROUP [IF EXISTS] cgroup_name ON topic_name;
|
||||
DROP CONSUMER GROUP [IF EXISTS] [FORCE] cgroup_name ON topic_name;
|
||||
```
|
||||
|
||||
## Data Subscription
|
||||
|
@ -137,6 +137,7 @@ If the following 3 data entries were written, then during replay, the first entr
|
|||
|
||||
When using the data subscription's replay feature, note the following:
|
||||
|
||||
- Enable replay function by configuring the consumption parameter enable.replay to true
|
||||
- The replay function of data subscription only supports data playback for query subscriptions; supertable and database subscriptions do not support playback.
|
||||
- Replay does not support progress saving.
|
||||
- Because data playback itself requires processing time, there is a precision error of several tens of milliseconds in playback.
|
||||
|
|
|
@ -58,11 +58,11 @@ Note: Subscriptions to supertables and databases are advanced subscription modes
|
|||
|
||||
## Delete topic
|
||||
|
||||
If you no longer need to subscribe to data, you can delete the topic, but note: only TOPICS that are not currently being subscribed to can be deleted.
|
||||
If you no longer need to subscribe to the data, you can delete the topic. If the current topic is subscribed to by a consumer, it can be forcibly deleted using the FORCE syntax. After the forced deletion, the subscribed consumer will consume data with errors (FORCE syntax supported from version 3.3.6.0)
|
||||
|
||||
```sql
|
||||
/* Delete topic */
|
||||
DROP TOPIC [IF EXISTS] topic_name;
|
||||
DROP TOPIC [IF EXISTS] [FORCE] topic_name;
|
||||
```
|
||||
|
||||
At this point, if there are consumers on this subscription topic, they will receive an error.
|
||||
|
@ -81,8 +81,10 @@ Consumer groups can only be created through the TDengine client driver or APIs p
|
|||
|
||||
## Delete consumer group
|
||||
|
||||
When creating a consumer, a consumer group is assigned to the consumer. Consumers cannot be explicitly deleted, but the consumer group can be deleted. If there are consumers in the current consumer group who are consuming, the FORCE syntax can be used to force deletion. After forced deletion, subscribed consumers will consume data with errors (FORCE syntax supported from version 3.3.6.0).
|
||||
|
||||
```sql
|
||||
DROP CONSUMER GROUP [IF EXISTS] cgroup_name ON topic_name;
|
||||
DROP CONSUMER GROUP [IF EXISTS] [FORCE] cgroup_name ON topic_name;
|
||||
```
|
||||
|
||||
Deletes the consumer group `cgroup_name` on the topic `topic_name`.
|
||||
|
|
|
@ -35,6 +35,7 @@ The list of keywords is as follows:
|
|||
| AS | |
|
||||
| ASC | |
|
||||
| ASOF | |
|
||||
| ASYNC | 3.3.6.0+ |
|
||||
| AT_ONCE | |
|
||||
| ATTACH | |
|
||||
| AUTO | 3.3.5.0+ |
|
||||
|
|
|
@ -64,10 +64,10 @@ CREATE TOPIC [IF NOT EXISTS] topic_name [with meta] AS DATABASE db_name;
|
|||
|
||||
## 删除主题
|
||||
|
||||
如果不再需要订阅数据,可以删除 topic,需要注意只有当前未在订阅中的 topic 才能被删除。
|
||||
如果不再需要订阅数据,可以删除 topic,如果当前 topic 被消费者订阅,通过 FORCE 语法可强制删除,强制删除后订阅的消费者会消费数据会出错(FORCE 语法3.3.6.0版本开始支持)。
|
||||
|
||||
```sql
|
||||
DROP TOPIC [IF EXISTS] topic_name;
|
||||
DROP TOPIC [IF EXISTS] [FORCE] topic_name;
|
||||
```
|
||||
|
||||
## 查看主题
|
||||
|
@ -94,9 +94,9 @@ SHOW CONSUMERS;
|
|||
|
||||
### 删除消费组
|
||||
|
||||
消费者创建的时候,会给消费者指定一个消费者组,消费者不能显式的删除,但是消费者组在组内没有消费者时可以通过下面语句删除:
|
||||
消费者创建的时候,会给消费者指定一个消费者组,消费者不能显式的删除,但是可以删除消费者组。如果当前消费者组里有消费者在消费,通过 FORCE 语法可强制删除,强制删除后订阅的消费者会消费数据会出错(FORCE 语法3.3.6.0版本开始支持)。
|
||||
```sql
|
||||
DROP CONSUMER GROUP [IF EXISTS] cgroup_name ON topic_name;
|
||||
DROP CONSUMER GROUP [IF EXISTS] [FORCE] cgroup_name ON topic_name;
|
||||
```
|
||||
|
||||
## 数据订阅
|
||||
|
@ -129,6 +129,7 @@ TDengine 的数据订阅功能支持回放(replay)功能,允许用户按
|
|||
```
|
||||
|
||||
使用数据订阅的回放功能时需要注意如下几项:
|
||||
- 通过配置消费参数 enable.replay 为 true 开启回放功能。
|
||||
- 数据订阅的回放功能仅查询订阅支持数据回放,超级表和库订阅不支持回放。
|
||||
- 回放不支持进度保存。
|
||||
- 因为数据回放本身需要处理时间,所以回放的精度存在几十毫秒的误差。
|
||||
|
|
|
@ -102,7 +102,7 @@ PARTITION 子句中,为 tbname 定义了一个别名 tname, 在 PARTITION
|
|||
通过启用 fill_history 选项,创建的流计算任务将具备处理创建前、创建过程中以及创建后写入的数据的能力。这意味着,无论数据是在流创建之前还是之后写入的,都将纳入流计算的范围,从而确保数据的完整性和一致性。这一设置为用户提供了更大的灵活性,使其能够根据实际需求灵活处理历史数据和新数据。
|
||||
|
||||
注意:
|
||||
- 开启 fill_history 时,创建流需要找到历史数据的分界点,如果历史数据很多,可能会导致创建流任务耗时较长,此时可以通过 fill_history 1 async(3.3.6.0版本开始支持) 语法将创建流的任务放在后台处理,创建流的语句可立即返回,不阻塞后面的操作。async 只对 fill_history 1 起效,fill_history 0 时建流很快,不需要异步处理。
|
||||
- 开启 fill_history 时,创建流需要找到历史数据的分界点,如果历史数据很多,可能会导致创建流任务耗时较长,此时可以通过 fill_history 1 async(v3.3.6.0 开始支持) 语法将创建流的任务放在后台处理,创建流的语句可立即返回,不阻塞后面的操作。async 只对 fill_history 1 起效,fill_history 0 时建流很快,不需要异步处理。
|
||||
|
||||
- 通过 show streams 可查看后台建流的进度(ready 状态表示成功,init 状态表示正在建流,failed 状态表示建流失败,失败时 message 列可以查看原因。对于建流失败的情况可以删除流重新建立)。
|
||||
|
||||
|
|
|
@ -14,6 +14,15 @@ LSTM模型即长短期记忆网络(Long Short Term Memory),是一种特殊的
|
|||
完整的调用SQL语句如下:
|
||||
```SQL
|
||||
SELECT _frowts, FORECAST(i32, "algo=lstm") from foo
|
||||
=======
|
||||
LSTM 模型即长短期记忆网络(Long Short Term Memory),是一种特殊的循环神经网络,适用于处理时间序列数据、自然语言处理等任务,通过其独特的门控机制,能够有效捕捉长期依赖关系,
|
||||
解决传统 RNN 的梯度消失问题,从而对序列数据进行准确预测,不过它不直接提供计算的置信区间范围结果。
|
||||
|
||||
|
||||
完整的调用 SQL 语句如下:
|
||||
```SQL
|
||||
SELECT _frowts, FORECAST(i32, "algo=lstm,alpha=95,period=10,start_p=1,max_p=5,start_q=1,max_q=5") from foo
|
||||
>>>>>>> 3.0
|
||||
```
|
||||
|
||||
```json5
|
||||
|
@ -21,7 +30,7 @@ SELECT _frowts, FORECAST(i32, "algo=lstm") from foo
|
|||
"rows": fc_rows, // 返回结果的行数
|
||||
"period": period, // 返回结果的周期性,同输入
|
||||
"alpha": alpha, // 返回结果的置信区间,同输入
|
||||
"algo": "lstm", // 返回结果使用的算法
|
||||
"algo": "lstm", // 返回结果使用的算法
|
||||
"mse": mse, // 拟合输入时间序列时候生成模型的最小均方误差(MSE)
|
||||
"res": res // 列模式的结果
|
||||
}
|
||||
|
|
|
@ -59,11 +59,11 @@ CREATE TOPIC [IF NOT EXISTS] topic_name [with meta] AS DATABASE db_name;
|
|||
|
||||
## 删除 topic
|
||||
|
||||
如果不再需要订阅数据,可以删除 topic,需要注意:只有当前未在订阅中的 TOPIC 才能被删除。
|
||||
如果不再需要订阅数据,可以删除 topic,如果当前 topic 被消费者订阅,通过 FORCE 语法可强制删除,强制删除后订阅的消费者会消费数据会出错(FORCE 语法3.3.6.0版本开始支持)。
|
||||
|
||||
```sql
|
||||
/* 删除 topic */
|
||||
DROP TOPIC [IF EXISTS] topic_name;
|
||||
DROP TOPIC [IF EXISTS] [FORCE] topic_name;
|
||||
```
|
||||
|
||||
此时如果该订阅主题上存在 consumer,则此 consumer 会收到一个错误。
|
||||
|
@ -82,8 +82,10 @@ SHOW TOPICS;
|
|||
|
||||
## 删除消费组
|
||||
|
||||
消费者创建的时候,会给消费者指定一个消费者组,消费者不能显式的删除,但是可以删除消费者组。如果当前消费者组里有消费者在消费,通过 FORCE 语法可强制删除,强制删除后订阅的消费者会消费数据会出错(FORCE 语法3.3.6.0版本开始支持)。
|
||||
|
||||
```sql
|
||||
DROP CONSUMER GROUP [IF EXISTS] cgroup_name ON topic_name;
|
||||
DROP CONSUMER GROUP [IF EXISTS] [FORCE] cgroup_name ON topic_name;
|
||||
```
|
||||
|
||||
删除主题 topic_name 上的消费组 cgroup_name。
|
||||
|
|
|
@ -128,7 +128,7 @@ create stream if not exists s1 fill_history 1 into st1 as select count(*) from
|
|||
如果该流任务已经彻底过期,并且您不再想让它检测或处理数据,您可以手动删除它,被计算出的数据仍会被保留。
|
||||
|
||||
注意:
|
||||
- 开启 fill_history 时,创建流需要找到历史数据的分界点,如果历史数据很多,可能会导致创建流任务耗时较长,此时可以通过 fill_history 1 async(3.3.6.0版本开始支持) 语法将创建流的任务放在后台处理,创建流的语句可立即返回,不阻塞后面的操作。async 只对 fill_history 1 起效,fill_history 0 时建流很快,不需要异步处理。
|
||||
- 开启 fill_history 时,创建流需要找到历史数据的分界点,如果历史数据很多,可能会导致创建流任务耗时较长,此时可以通过 fill_history 1 async(v3.3.6.0 开始支持) 语法将创建流的任务放在后台处理,创建流的语句可立即返回,不阻塞后面的操作。async 只对 fill_history 1 起效,fill_history 0 时建流很快,不需要异步处理。
|
||||
|
||||
- 通过 show streams 可查看后台建流的进度(ready 状态表示成功,init 状态表示正在建流,failed 状态表示建流失败,失败时 message 列可以查看原因。对于建流失败的情况可以删除流重新建立)。
|
||||
|
||||
|
|
|
@ -3358,6 +3358,7 @@ typedef struct {
|
|||
int8_t igNotExists;
|
||||
int32_t sqlLen;
|
||||
char* sql;
|
||||
int8_t force;
|
||||
} SMDropTopicReq;
|
||||
|
||||
int32_t tSerializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq);
|
||||
|
@ -3368,6 +3369,7 @@ typedef struct {
|
|||
char topic[TSDB_TOPIC_FNAME_LEN];
|
||||
char cgroup[TSDB_CGROUP_LEN];
|
||||
int8_t igNotExists;
|
||||
int8_t force;
|
||||
} SMDropCgroupReq;
|
||||
|
||||
int32_t tSerializeSMDropCgroupReq(void* buf, int32_t bufLen, SMDropCgroupReq* pReq);
|
||||
|
|
|
@ -553,6 +553,7 @@ typedef struct SDropTopicStmt {
|
|||
ENodeType type;
|
||||
char topicName[TSDB_TOPIC_NAME_LEN];
|
||||
bool ignoreNotExists;
|
||||
bool force;
|
||||
} SDropTopicStmt;
|
||||
|
||||
typedef struct SDropCGroupStmt {
|
||||
|
@ -560,6 +561,7 @@ typedef struct SDropCGroupStmt {
|
|||
char topicName[TSDB_TOPIC_NAME_LEN];
|
||||
char cgroup[TSDB_CGROUP_LEN];
|
||||
bool ignoreNotExists;
|
||||
bool force;
|
||||
} SDropCGroupStmt;
|
||||
|
||||
typedef struct SAlterClusterStmt {
|
||||
|
|
|
@ -6676,6 +6676,8 @@ int32_t tSerializeSMDropTopicReq(void *buf, int32_t bufLen, SMDropTopicReq *pReq
|
|||
TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->name));
|
||||
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igNotExists));
|
||||
ENCODESQL();
|
||||
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->force));
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
_exit:
|
||||
|
@ -6699,6 +6701,9 @@ int32_t tDeserializeSMDropTopicReq(void *buf, int32_t bufLen, SMDropTopicReq *pR
|
|||
TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->name));
|
||||
TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->igNotExists));
|
||||
DECODESQL();
|
||||
if (!tDecodeIsEnd(&decoder)) {
|
||||
TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->force));
|
||||
}
|
||||
tEndDecode(&decoder);
|
||||
|
||||
_exit:
|
||||
|
@ -6719,6 +6724,7 @@ int32_t tSerializeSMDropCgroupReq(void *buf, int32_t bufLen, SMDropCgroupReq *pR
|
|||
TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->topic));
|
||||
TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->cgroup));
|
||||
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igNotExists));
|
||||
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->force));
|
||||
tEndEncode(&encoder);
|
||||
|
||||
_exit:
|
||||
|
@ -6741,6 +6747,9 @@ int32_t tDeserializeSMDropCgroupReq(void *buf, int32_t bufLen, SMDropCgroupReq *
|
|||
TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->topic));
|
||||
TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->cgroup));
|
||||
TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->igNotExists));
|
||||
if (!tDecodeIsEnd(&decoder)) {
|
||||
TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->force));
|
||||
}
|
||||
tEndDecode(&decoder);
|
||||
|
||||
_exit:
|
||||
|
|
|
@ -29,7 +29,7 @@ int32_t mndGetGroupNumByTopic(SMnode *pMnode, const char *topicName);
|
|||
int32_t mndAcquireSubscribeByKey(SMnode *pMnode, const char *key, SMqSubscribeObj** pSub);
|
||||
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub);
|
||||
|
||||
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topic);
|
||||
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topic, bool force);
|
||||
int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub);
|
||||
|
||||
bool mndRebTryStart();
|
||||
|
|
|
@ -1112,12 +1112,13 @@ END:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndCheckConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic) {
|
||||
static int32_t mndCheckConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic, bool deleteConsumer) {
|
||||
if (pMnode == NULL || pTrans == NULL || cgroup == NULL || topic == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
void *pIter = NULL;
|
||||
SMqConsumerObj *pConsumer = NULL;
|
||||
SMqConsumerObj *pConsumerNew = NULL;
|
||||
int code = 0;
|
||||
while (1) {
|
||||
pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
|
||||
|
@ -1130,18 +1131,27 @@ static int32_t mndCheckConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgr
|
|||
continue;
|
||||
}
|
||||
|
||||
bool found = checkTopic(pConsumer->assignedTopics, topic);
|
||||
if (found){
|
||||
mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
|
||||
topic, pConsumer->consumerId, pConsumer->cgroup);
|
||||
code = TSDB_CODE_MND_CGROUP_USED;
|
||||
goto END;
|
||||
if (deleteConsumer) {
|
||||
MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, -1, NULL, NULL, &pConsumerNew));
|
||||
MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumerNew));
|
||||
tDeleteSMqConsumerObj(pConsumerNew);
|
||||
pConsumerNew = NULL;
|
||||
} else {
|
||||
bool found = checkTopic(pConsumer->assignedTopics, topic);
|
||||
if (found){
|
||||
mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
|
||||
topic, pConsumer->consumerId, pConsumer->cgroup);
|
||||
code = TSDB_CODE_MND_CGROUP_USED;
|
||||
goto END;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
sdbRelease(pMnode->pSdb, pConsumer);
|
||||
}
|
||||
|
||||
END:
|
||||
tDeleteSMqConsumerObj(pConsumerNew);
|
||||
sdbRelease(pMnode->pSdb, pConsumer);
|
||||
sdbCancelFetch(pMnode->pSdb, pIter);
|
||||
return code;
|
||||
|
@ -1173,7 +1183,7 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
taosWLockLatch(&pSub->lock);
|
||||
if (taosHashGetSize(pSub->consumerHash) != 0) {
|
||||
if (!dropReq.force && taosHashGetSize(pSub->consumerHash) != 0) {
|
||||
code = TSDB_CODE_MND_CGROUP_USED;
|
||||
mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, tstrerror(code));
|
||||
goto END;
|
||||
|
@ -1185,7 +1195,7 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
|
|||
mndTransSetDbName(pTrans, pSub->dbName, NULL);
|
||||
MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
|
||||
MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
|
||||
MND_TMQ_RETURN_CHECK(mndCheckConsumerByGroup(pMnode, pTrans, dropReq.cgroup, dropReq.topic));
|
||||
MND_TMQ_RETURN_CHECK(mndCheckConsumerByGroup(pMnode, pTrans, dropReq.cgroup, dropReq.topic, dropReq.force));
|
||||
MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));
|
||||
MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));
|
||||
|
||||
|
@ -1409,7 +1419,7 @@ END:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) {
|
||||
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName, bool force) {
|
||||
if (pMnode == NULL || pTrans == NULL || topicName == NULL) return TSDB_CODE_INVALID_PARA;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
int32_t code = 0;
|
||||
|
@ -1428,7 +1438,7 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName)
|
|||
}
|
||||
|
||||
// iter all vnode to delete handle
|
||||
if (taosHashGetSize(pSub->consumerHash) != 0) {
|
||||
if (!force && taosHashGetSize(pSub->consumerHash) != 0) {
|
||||
code = TSDB_CODE_MND_IN_REBALANCE;
|
||||
goto END;
|
||||
}
|
||||
|
|
|
@ -650,7 +650,7 @@ bool checkTopic(SArray *topics, char *topicName){
|
|||
return false;
|
||||
}
|
||||
|
||||
static int32_t mndCheckConsumerByTopic(SMnode *pMnode, STrans *pTrans, char *topicName){
|
||||
static int32_t mndCheckConsumerByTopic(SMnode *pMnode, STrans *pTrans, char *topicName, bool deleteConsumer){
|
||||
if (pMnode == NULL || pTrans == NULL || topicName == NULL) {
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
|
@ -658,24 +658,34 @@ static int32_t mndCheckConsumerByTopic(SMnode *pMnode, STrans *pTrans, char *top
|
|||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
SMqConsumerObj *pConsumer = NULL;
|
||||
SMqConsumerObj *pConsumerNew = NULL;
|
||||
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
|
||||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
bool found = checkTopic(pConsumer->assignedTopics, topicName);
|
||||
if (found){
|
||||
mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
|
||||
topicName, pConsumer->consumerId, pConsumer->cgroup);
|
||||
code = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
||||
goto END;
|
||||
if (deleteConsumer) {
|
||||
MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, -1, NULL, NULL, &pConsumerNew));
|
||||
MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumerNew));
|
||||
tDeleteSMqConsumerObj(pConsumerNew);
|
||||
pConsumerNew = NULL;
|
||||
} else {
|
||||
bool found = checkTopic(pConsumer->assignedTopics, topicName);
|
||||
if (found){
|
||||
mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
|
||||
topicName, pConsumer->consumerId, pConsumer->cgroup);
|
||||
code = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
||||
goto END;
|
||||
}
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pConsumer);
|
||||
}
|
||||
|
||||
END:
|
||||
tDeleteSMqConsumerObj(pConsumerNew);
|
||||
sdbRelease(pSdb, pConsumer);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
return code;
|
||||
|
@ -760,8 +770,8 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
|||
|
||||
MND_TMQ_RETURN_CHECK(mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_TOPIC, pTopic));
|
||||
MND_TMQ_RETURN_CHECK(mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pTopic->db));
|
||||
MND_TMQ_RETURN_CHECK(mndCheckConsumerByTopic(pMnode, pTrans, dropReq.name));
|
||||
MND_TMQ_RETURN_CHECK(mndDropSubByTopic(pMnode, pTrans, dropReq.name));
|
||||
MND_TMQ_RETURN_CHECK(mndCheckConsumerByTopic(pMnode, pTrans, dropReq.name, dropReq.force));
|
||||
MND_TMQ_RETURN_CHECK(mndDropSubByTopic(pMnode, pTrans, dropReq.name, dropReq.force));
|
||||
|
||||
if (pTopic->ntbUid != 0) {
|
||||
MND_TMQ_RETURN_CHECK(mndDropCheckInfoByTopic(pMnode, pTrans, pTopic));
|
||||
|
|
|
@ -1030,7 +1030,7 @@ int32_t ctgdShowStatInfo(void);
|
|||
int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq);
|
||||
int32_t ctgGetTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta);
|
||||
int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgTbMetasCtx* ctx, int32_t dbIdx,
|
||||
int32_t* fetchIdx, int32_t baseResIdx, SArray* pList);
|
||||
int32_t* fetchIdx, int32_t baseResIdx, SArray* pList, bool autoCreate);
|
||||
int32_t ctgGetTbNamesFromCache(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgTbNamesCtx* ctx, int32_t dbIdx,
|
||||
int32_t* fetchIdx, int32_t baseResIdx, SArray* pList);
|
||||
int32_t ctgCloneDbCfgInfo(void* pSrc, SDbCfgInfo** ppDst);
|
||||
|
|
|
@ -3263,7 +3263,7 @@ int32_t ctgLaunchGetTbMetasTask(SCtgTask* pTask) {
|
|||
autoCreate = pReq->autoCreate;
|
||||
|
||||
ctgDebug("start to check tb metas in db:%s, tbNum:%d", pReq->dbFName, (int32_t)taosArrayGetSize(pReq->pTables));
|
||||
CTG_ERR_RET(ctgGetTbMetasFromCache(pCtg, pConn, pCtx, i, &fetchIdx, baseResIdx, pReq->pTables));
|
||||
CTG_ERR_RET(ctgGetTbMetasFromCache(pCtg, pConn, pCtx, i, &fetchIdx, baseResIdx, pReq->pTables, autoCreate));
|
||||
baseResIdx += taosArrayGetSize(pReq->pTables);
|
||||
}
|
||||
|
||||
|
@ -3301,7 +3301,7 @@ int32_t ctgLaunchGetTbMetasTask(SCtgTask* pTask) {
|
|||
}
|
||||
|
||||
SCtgTaskReq tReq;
|
||||
tReq.autoCreateCtb = (autoCreate && i == pCtx->fetchNum - 1) ? 1 : 0;
|
||||
tReq.autoCreateCtb = (pFetch->flag & CTG_FLAG_NOT_STB) ? 1 : 0;
|
||||
tReq.pTask = pTask;
|
||||
tReq.msgIdx = pFetch->fetchIdx;
|
||||
CTG_ERR_RET(ctgAsyncRefreshTbMeta(&tReq, pFetch->flag, pName, &pFetch->vgId));
|
||||
|
|
|
@ -3483,7 +3483,7 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
|
|||
#endif
|
||||
|
||||
int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMetasCtx *ctx, int32_t dbIdx,
|
||||
int32_t *fetchIdx, int32_t baseResIdx, SArray *pList) {
|
||||
int32_t *fetchIdx, int32_t baseResIdx, SArray *pList, bool autoCreate) {
|
||||
int32_t tbNum = taosArrayGetSize(pList);
|
||||
char dbFName[TSDB_DB_FNAME_LEN] = {0};
|
||||
int32_t flag = CTG_FLAG_UNKNOWN_STB;
|
||||
|
@ -3527,6 +3527,11 @@ int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMe
|
|||
CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
}
|
||||
|
||||
// for auto create, the second table is child table
|
||||
if (autoCreate && (i == 1)) {
|
||||
CTG_FLAG_SET_STB(flag, TSDB_CHILD_TABLE);
|
||||
}
|
||||
|
||||
pCache = taosHashAcquire(dbCache->tbCache, pName->tname, strlen(pName->tname));
|
||||
if (NULL == pCache) {
|
||||
ctgDebug("tb:%s, tb not in cache, db:%s", pName->tname, dbFName);
|
||||
|
|
|
@ -310,8 +310,8 @@ SNode* createCreateTopicStmtUseDb(SAstCreateContext* pCxt, bool ignoreExists, ST
|
|||
int8_t withMeta);
|
||||
SNode* createCreateTopicStmtUseTable(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SNode* pRealTable,
|
||||
int8_t withMeta, SNode* pWhere);
|
||||
SNode* createDropTopicStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pTopicName);
|
||||
SNode* createDropCGroupStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pCGroupId, SToken* pTopicName);
|
||||
SNode* createDropTopicStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pTopicName, bool force);
|
||||
SNode* createDropCGroupStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pCGroupId, SToken* pTopicName, bool force);
|
||||
SNode* createAlterClusterStmt(SAstCreateContext* pCxt, const SToken* pConfig, const SToken* pValue);
|
||||
SNode* createAlterLocalStmt(SAstCreateContext* pCxt, const SToken* pConfig, const SToken* pValue);
|
||||
SNode* createDefaultExplainOptions(SAstCreateContext* pCxt);
|
||||
|
|
|
@ -778,8 +778,8 @@ cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) with_meta(D)
|
|||
cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) with_meta(E)
|
||||
STABLE full_table_name(C) where_clause_opt(D). { pCxt->pRootNode = createCreateTopicStmtUseTable(pCxt, A, &B, C, E, D); }
|
||||
|
||||
cmd ::= DROP TOPIC exists_opt(A) topic_name(B). { pCxt->pRootNode = createDropTopicStmt(pCxt, A, &B); }
|
||||
cmd ::= DROP CONSUMER GROUP exists_opt(A) cgroup_name(B) ON topic_name(C). { pCxt->pRootNode = createDropCGroupStmt(pCxt, A, &B, &C); }
|
||||
cmd ::= DROP TOPIC exists_opt(A) force_opt(C) topic_name(B). { pCxt->pRootNode = createDropTopicStmt(pCxt, A, &B, C); }
|
||||
cmd ::= DROP CONSUMER GROUP exists_opt(A) force_opt(D) cgroup_name(B) ON topic_name(C). { pCxt->pRootNode = createDropCGroupStmt(pCxt, A, &B, &C, D); }
|
||||
|
||||
/************************************************ desc/describe *******************************************************/
|
||||
cmd ::= DESC full_table_name(A). { pCxt->pRootNode = createDescribeStmt(pCxt, A); }
|
||||
|
|
|
@ -3814,7 +3814,7 @@ _err:
|
|||
return NULL;
|
||||
}
|
||||
|
||||
SNode* createDropTopicStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pTopicName) {
|
||||
SNode* createDropTopicStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pTopicName, bool force) {
|
||||
CHECK_PARSER_STATUS(pCxt);
|
||||
CHECK_NAME(checkTopicName(pCxt, pTopicName));
|
||||
SDropTopicStmt* pStmt = NULL;
|
||||
|
@ -3822,12 +3822,13 @@ SNode* createDropTopicStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken
|
|||
CHECK_MAKE_NODE(pStmt);
|
||||
COPY_STRING_FORM_ID_TOKEN(pStmt->topicName, pTopicName);
|
||||
pStmt->ignoreNotExists = ignoreNotExists;
|
||||
pStmt->force = force;
|
||||
return (SNode*)pStmt;
|
||||
_err:
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SNode* createDropCGroupStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pCGroupId, SToken* pTopicName) {
|
||||
SNode* createDropCGroupStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pCGroupId, SToken* pTopicName, bool force) {
|
||||
CHECK_PARSER_STATUS(pCxt);
|
||||
CHECK_NAME(checkTopicName(pCxt, pTopicName));
|
||||
CHECK_NAME(checkCGroupName(pCxt, pCGroupId));
|
||||
|
@ -3835,6 +3836,7 @@ SNode* createDropCGroupStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToke
|
|||
pCxt->errCode = nodesMakeNode(QUERY_NODE_DROP_CGROUP_STMT, (SNode**)&pStmt);
|
||||
CHECK_MAKE_NODE(pStmt);
|
||||
pStmt->ignoreNotExists = ignoreNotExists;
|
||||
pStmt->force = force;
|
||||
COPY_STRING_FORM_ID_TOKEN(pStmt->topicName, pTopicName);
|
||||
COPY_STRING_FORM_ID_TOKEN(pStmt->cgroup, pCGroupId);
|
||||
return (SNode*)pStmt;
|
||||
|
|
|
@ -11773,6 +11773,7 @@ static int32_t translateDropTopic(STranslateContext* pCxt, SDropTopicStmt* pStmt
|
|||
|
||||
snprintf(dropReq.name, sizeof(dropReq.name), "%d.%s", pCxt->pParseCxt->acctId, pStmt->topicName);
|
||||
dropReq.igNotExists = pStmt->ignoreNotExists;
|
||||
dropReq.force = pStmt->force;
|
||||
|
||||
int32_t code = buildCmdMsg(pCxt, TDMT_MND_TMQ_DROP_TOPIC, (FSerializeFunc)tSerializeSMDropTopicReq, &dropReq);
|
||||
tFreeSMDropTopicReq(&dropReq);
|
||||
|
@ -11788,6 +11789,7 @@ static int32_t translateDropCGroup(STranslateContext* pCxt, SDropCGroupStmt* pSt
|
|||
if (TSDB_CODE_SUCCESS != code) return code;
|
||||
(void)tNameGetFullDbName(&name, dropReq.topic);
|
||||
dropReq.igNotExists = pStmt->ignoreNotExists;
|
||||
dropReq.force = pStmt->force;
|
||||
tstrncpy(dropReq.cgroup, pStmt->cgroup, TSDB_CGROUP_LEN);
|
||||
|
||||
return buildCmdMsg(pCxt, TDMT_MND_TMQ_DROP_CGROUP, (FSerializeFunc)tSerializeSMDropCgroupReq, &dropReq);
|
||||
|
|
|
@ -81,6 +81,8 @@ int32_t queryBuildTableMetaReqMsg(void *input, char **msg, int32_t msgSize, int3
|
|||
STableInfoReq infoReq = {0};
|
||||
infoReq.option = pInput->option;
|
||||
infoReq.header.vgId = pInput->vgId;
|
||||
infoReq.autoCreateCtb = pInput->autoCreateCtb;
|
||||
|
||||
if (pInput->dbFName) {
|
||||
tstrncpy(infoReq.dbFName, pInput->dbFName, TSDB_DB_FNAME_LEN);
|
||||
}
|
||||
|
|
|
@ -2014,7 +2014,7 @@ TEST(columnTest, smallint_value_add_int_column) {
|
|||
ASSERT_EQ(column->info.type, TSDB_DATA_TYPE_DOUBLE);
|
||||
ASSERT_EQ(column->info.bytes, tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(column, i)), eRes[i]);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(column, i)), eRes[i]);
|
||||
}
|
||||
|
||||
taosArrayDestroyEx(blockList, scltFreeDataBlock);
|
||||
|
@ -2058,7 +2058,7 @@ TEST(columnTest, bigint_column_multi_binary_column) {
|
|||
ASSERT_EQ(column->info.type, TSDB_DATA_TYPE_DOUBLE);
|
||||
ASSERT_EQ(column->info.bytes, tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(column, i)), eRes[i]);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(column, i)), eRes[i]);
|
||||
}
|
||||
taosArrayDestroyEx(blockList, scltFreeDataBlock);
|
||||
nodesDestroyNode(opNode);
|
||||
|
@ -2838,7 +2838,7 @@ TEST(ScalarFunctionTest, absFunction_constant) {
|
|||
code = absFunction(pInput, 1, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), val_double);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), val_double);
|
||||
}
|
||||
scltDestroyDataBlock(pInput);
|
||||
scltDestroyDataBlock(pOutput);
|
||||
|
@ -2852,7 +2852,7 @@ TEST(ScalarFunctionTest, absFunction_constant) {
|
|||
code = absFunction(pInput, 1, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), -val_double);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), -val_double);
|
||||
}
|
||||
scltDestroyDataBlock(pInput);
|
||||
scltDestroyDataBlock(pOutput);
|
||||
|
@ -3051,7 +3051,7 @@ TEST(ScalarFunctionTest, absFunction_column) {
|
|||
code = absFunction(pInput, 1, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), val_double + i);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), val_double + i);
|
||||
PRINTF("double after ABS:%f\n", *((double *)colDataGetData(pOutput->columnData, i)));
|
||||
}
|
||||
scltDestroyDataBlock(pInput);
|
||||
|
@ -3072,7 +3072,7 @@ TEST(ScalarFunctionTest, absFunction_column) {
|
|||
code = absFunction(pInput, 1, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), -(val_double + i));
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), -(val_double + i));
|
||||
PRINTF("double after ABS:%f\n", *((double *)colDataGetData(pOutput->columnData, i)));
|
||||
}
|
||||
|
||||
|
@ -3100,7 +3100,7 @@ TEST(ScalarFunctionTest, sinFunction_constant) {
|
|||
code = sinFunction(pInput, 1, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result);
|
||||
PRINTF("tiny_int after SIN:%f\n", *((double *)colDataGetData(pOutput->columnData, i)));
|
||||
}
|
||||
scltDestroyDataBlock(pInput);
|
||||
|
@ -3118,7 +3118,7 @@ TEST(ScalarFunctionTest, sinFunction_constant) {
|
|||
code = sinFunction(pInput, 1, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result);
|
||||
PRINTF("float after SIN:%f\n", *((double *)colDataGetData(pOutput->columnData, i)));
|
||||
}
|
||||
|
||||
|
@ -3150,7 +3150,7 @@ TEST(ScalarFunctionTest, sinFunction_column) {
|
|||
code = sinFunction(pInput, 1, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result[i]);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result[i]);
|
||||
PRINTF("tiny_int after SIN:%f\n", *((double *)colDataGetData(pOutput->columnData, i)));
|
||||
}
|
||||
scltDestroyDataBlock(pInput);
|
||||
|
@ -3172,7 +3172,7 @@ TEST(ScalarFunctionTest, sinFunction_column) {
|
|||
code = sinFunction(pInput, 1, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result[i]);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result[i]);
|
||||
PRINTF("float after SIN:%f\n", *((double *)colDataGetData(pOutput->columnData, i)));
|
||||
}
|
||||
|
||||
|
@ -3200,7 +3200,7 @@ TEST(ScalarFunctionTest, cosFunction_constant) {
|
|||
code = cosFunction(pInput, 1, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result);
|
||||
PRINTF("tiny_int after COS:%f\n", *((double *)colDataGetData(pOutput->columnData, i)));
|
||||
}
|
||||
scltDestroyDataBlock(pInput);
|
||||
|
@ -3218,7 +3218,7 @@ TEST(ScalarFunctionTest, cosFunction_constant) {
|
|||
code = cosFunction(pInput, 1, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result);
|
||||
PRINTF("float after COS:%f\n", *((double *)colDataGetData(pOutput->columnData, i)));
|
||||
}
|
||||
|
||||
|
@ -3250,7 +3250,7 @@ TEST(ScalarFunctionTest, cosFunction_column) {
|
|||
code = cosFunction(pInput, 1, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result[i]);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result[i]);
|
||||
PRINTF("tiny_int after COS:%f\n", *((double *)colDataGetData(pOutput->columnData, i)));
|
||||
}
|
||||
scltDestroyDataBlock(pInput);
|
||||
|
@ -3272,7 +3272,7 @@ TEST(ScalarFunctionTest, cosFunction_column) {
|
|||
code = cosFunction(pInput, 1, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result[i]);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result[i]);
|
||||
PRINTF("float after COS:%f\n", *((double *)colDataGetData(pOutput->columnData, i)));
|
||||
}
|
||||
|
||||
|
@ -3300,7 +3300,7 @@ TEST(ScalarFunctionTest, tanFunction_constant) {
|
|||
code = tanFunction(pInput, 1, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result);
|
||||
PRINTF("tiny_int after TAN:%f\n", *((double *)colDataGetData(pOutput->columnData, i)));
|
||||
}
|
||||
scltDestroyDataBlock(pInput);
|
||||
|
@ -3318,7 +3318,7 @@ TEST(ScalarFunctionTest, tanFunction_constant) {
|
|||
code = tanFunction(pInput, 1, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result);
|
||||
PRINTF("float after TAN:%f\n", *((double *)colDataGetData(pOutput->columnData, i)));
|
||||
}
|
||||
|
||||
|
@ -3400,7 +3400,7 @@ TEST(ScalarFunctionTest, asinFunction_constant) {
|
|||
code = asinFunction(pInput, 1, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result);
|
||||
PRINTF("tiny_int after ASIN:%f\n", *((double *)colDataGetData(pOutput->columnData, i)));
|
||||
}
|
||||
scltDestroyDataBlock(pInput);
|
||||
|
@ -3418,7 +3418,7 @@ TEST(ScalarFunctionTest, asinFunction_constant) {
|
|||
code = asinFunction(pInput, 1, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result);
|
||||
PRINTF("float after ASIN:%f\n", *((double *)colDataGetData(pOutput->columnData, i)));
|
||||
}
|
||||
|
||||
|
@ -3450,7 +3450,7 @@ TEST(ScalarFunctionTest, asinFunction_column) {
|
|||
code = asinFunction(pInput, 1, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result[i]);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result[i]);
|
||||
PRINTF("tiny_int after ASIN:%f\n", *((double *)colDataGetData(pOutput->columnData, i)));
|
||||
}
|
||||
scltDestroyDataBlock(pInput);
|
||||
|
@ -3471,7 +3471,7 @@ TEST(ScalarFunctionTest, asinFunction_column) {
|
|||
code = asinFunction(pInput, 1, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result[i]);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result[i]);
|
||||
PRINTF("float after ASIN:%f\n", *((double *)colDataGetData(pOutput->columnData, i)));
|
||||
}
|
||||
|
||||
|
@ -3517,7 +3517,7 @@ TEST(ScalarFunctionTest, acosFunction_constant) {
|
|||
code = acosFunction(pInput, 1, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result);
|
||||
PRINTF("float after ACOS:%f\n", *((double *)colDataGetData(pOutput->columnData, i)));
|
||||
}
|
||||
|
||||
|
@ -3548,7 +3548,7 @@ TEST(ScalarFunctionTest, acosFunction_column) {
|
|||
code = acosFunction(pInput, 1, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result[i]);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result[i]);
|
||||
PRINTF("tiny_int after ACOS:%f\n", *((double *)colDataGetData(pOutput->columnData, i)));
|
||||
}
|
||||
scltDestroyDataBlock(pInput);
|
||||
|
@ -3569,7 +3569,7 @@ TEST(ScalarFunctionTest, acosFunction_column) {
|
|||
code = acosFunction(pInput, 1, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result[i]);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result[i]);
|
||||
PRINTF("float after ACOS:%f\n", *((double *)colDataGetData(pOutput->columnData, i)));
|
||||
}
|
||||
|
||||
|
@ -3597,7 +3597,7 @@ TEST(ScalarFunctionTest, atanFunction_constant) {
|
|||
code = atanFunction(pInput, 1, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result);
|
||||
PRINTF("tiny_int after ATAN:%f\n", *((double *)colDataGetData(pOutput->columnData, i)));
|
||||
}
|
||||
scltDestroyDataBlock(pInput);
|
||||
|
@ -3615,7 +3615,7 @@ TEST(ScalarFunctionTest, atanFunction_constant) {
|
|||
code = atanFunction(pInput, 1, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result);
|
||||
PRINTF("float after ATAN:%f\n", *((double *)colDataGetData(pOutput->columnData, i)));
|
||||
}
|
||||
|
||||
|
@ -3646,7 +3646,7 @@ TEST(ScalarFunctionTest, atanFunction_column) {
|
|||
code = atanFunction(pInput, 1, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result[i]);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result[i]);
|
||||
PRINTF("tiny_int after ATAN:%f\n", *((double *)colDataGetData(pOutput->columnData, i)));
|
||||
}
|
||||
scltDestroyDataBlock(pInput);
|
||||
|
@ -3667,7 +3667,7 @@ TEST(ScalarFunctionTest, atanFunction_column) {
|
|||
code = atanFunction(pInput, 1, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result[i]);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result[i]);
|
||||
PRINTF("float after ATAN:%f\n", *((double *)colDataGetData(pOutput->columnData, i)));
|
||||
}
|
||||
|
||||
|
@ -3983,7 +3983,7 @@ TEST(ScalarFunctionTest, sqrtFunction_constant) {
|
|||
code = sqrtFunction(pInput, 1, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result);
|
||||
PRINTF("tiny_int after SQRT:%f\n", *((double *)colDataGetData(pOutput->columnData, i)));
|
||||
}
|
||||
scltDestroyDataBlock(pInput);
|
||||
|
@ -4001,7 +4001,7 @@ TEST(ScalarFunctionTest, sqrtFunction_constant) {
|
|||
code = sqrtFunction(pInput, 1, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result);
|
||||
PRINTF("float after SQRT:%f\n", *((double *)colDataGetData(pOutput->columnData, i)));
|
||||
}
|
||||
|
||||
|
@ -4032,7 +4032,7 @@ TEST(ScalarFunctionTest, sqrtFunction_column) {
|
|||
code = sqrtFunction(pInput, 1, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result[i]);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result[i]);
|
||||
PRINTF("tiny_int after SQRT:%f\n", *((double *)colDataGetData(pOutput->columnData, i)));
|
||||
}
|
||||
scltDestroyDataBlock(pInput);
|
||||
|
@ -4053,7 +4053,7 @@ TEST(ScalarFunctionTest, sqrtFunction_column) {
|
|||
code = sqrtFunction(pInput, 1, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result[i]);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result[i]);
|
||||
PRINTF("float after SQRT:%f\n", *((double *)colDataGetData(pOutput->columnData, i)));
|
||||
}
|
||||
|
||||
|
@ -4086,7 +4086,7 @@ TEST(ScalarFunctionTest, logFunction_constant) {
|
|||
code = logFunction(pInput, 2, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result);
|
||||
PRINTF("tiny_int after LOG:%f\n", *((double *)colDataGetData(pOutput->columnData, i)));
|
||||
}
|
||||
scltDestroyDataBlock(input[0]);
|
||||
|
@ -4108,7 +4108,7 @@ TEST(ScalarFunctionTest, logFunction_constant) {
|
|||
code = logFunction(pInput, 2, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result);
|
||||
PRINTF("float after LOG:%f\n", *((double *)colDataGetData(pOutput->columnData, i)));
|
||||
}
|
||||
scltDestroyDataBlock(input[0]);
|
||||
|
@ -4131,7 +4131,7 @@ TEST(ScalarFunctionTest, logFunction_constant) {
|
|||
code = logFunction(pInput, 2, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result);
|
||||
PRINTF("tiny_int,float after LOG:%f\n", *((double *)colDataGetData(pOutput->columnData, i)));
|
||||
}
|
||||
|
||||
|
@ -4170,7 +4170,7 @@ TEST(ScalarFunctionTest, logFunction_column) {
|
|||
code = logFunction(pInput, 2, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result[i]);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result[i]);
|
||||
PRINTF("tiny_int after LOG:%f\n", *((double *)colDataGetData(pOutput->columnData, i)));
|
||||
}
|
||||
scltDestroyDataBlock(input[0]);
|
||||
|
@ -4196,7 +4196,7 @@ TEST(ScalarFunctionTest, logFunction_column) {
|
|||
code = logFunction(pInput, 2, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result[i]);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result[i]);
|
||||
PRINTF("float after LOG:%f\n", *((double *)colDataGetData(pOutput->columnData, i)));
|
||||
}
|
||||
scltDestroyDataBlock(input[0]);
|
||||
|
@ -4228,7 +4228,7 @@ TEST(ScalarFunctionTest, logFunction_column) {
|
|||
code = logFunction(pInput, 2, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result[i]);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result[i]);
|
||||
PRINTF("tiny_int,float after LOG:%f\n", *((double *)colDataGetData(pOutput->columnData, i)));
|
||||
}
|
||||
|
||||
|
@ -4263,7 +4263,7 @@ TEST(ScalarFunctionTest, powFunction_constant) {
|
|||
code = powFunction(pInput, 2, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result);
|
||||
PRINTF("tiny_int after POW:%f\n", *((double *)colDataGetData(pOutput->columnData, i)));
|
||||
}
|
||||
scltDestroyDataBlock(input[0]);
|
||||
|
@ -4285,7 +4285,7 @@ TEST(ScalarFunctionTest, powFunction_constant) {
|
|||
code = powFunction(pInput, 2, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result);
|
||||
PRINTF("float after POW:%f\n", *((double *)colDataGetData(pOutput->columnData, i)));
|
||||
}
|
||||
scltDestroyDataBlock(input[0]);
|
||||
|
@ -4308,7 +4308,7 @@ TEST(ScalarFunctionTest, powFunction_constant) {
|
|||
code = powFunction(pInput, 2, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result);
|
||||
PRINTF("tiny_int,float after POW:%f\n", *((double *)colDataGetData(pOutput->columnData, i)));
|
||||
}
|
||||
|
||||
|
@ -4347,7 +4347,7 @@ TEST(ScalarFunctionTest, powFunction_column) {
|
|||
code = powFunction(pInput, 2, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result[i]);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result[i]);
|
||||
PRINTF("tiny_int after POW:%f\n", *((double *)colDataGetData(pOutput->columnData, i)));
|
||||
}
|
||||
|
||||
|
@ -4374,7 +4374,7 @@ TEST(ScalarFunctionTest, powFunction_column) {
|
|||
code = powFunction(pInput, 2, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result[i]);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result[i]);
|
||||
PRINTF("float after POW:%f\n", *((double *)colDataGetData(pOutput->columnData, i)));
|
||||
}
|
||||
scltDestroyDataBlock(input[0]);
|
||||
|
@ -4406,7 +4406,7 @@ TEST(ScalarFunctionTest, powFunction_column) {
|
|||
code = powFunction(pInput, 2, pOutput);
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
ASSERT_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result[i]);
|
||||
ASSERT_DOUBLE_EQ(*((double *)colDataGetData(pOutput->columnData, i)), result[i]);
|
||||
PRINTF("tiny_int,float after POW:%f\n", *((double *)colDataGetData(pOutput->columnData, i)));
|
||||
}
|
||||
|
||||
|
|
|
@ -506,6 +506,7 @@
|
|||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts4563.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_td32526.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_td32471.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts6115.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_replay.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSeekAndCommit.py
|
||||
,,n,system-test,python3 ./test.py -f 7-tmq/tmq_offset.py
|
||||
|
|
|
@ -0,0 +1,116 @@
|
|||
import taos
|
||||
import sys
|
||||
import time
|
||||
import socket
|
||||
import os
|
||||
import threading
|
||||
|
||||
from util.log import *
|
||||
from util.sql import *
|
||||
from util.cases import *
|
||||
from util.dnodes import *
|
||||
from util.common import *
|
||||
|
||||
sys.path.append("./7-tmq")
|
||||
|
||||
insertJson = '''{
|
||||
"filetype": "insert",
|
||||
"cfgdir": "/etc/taos",
|
||||
"host": "localhost",
|
||||
"port": 6030,
|
||||
"user": "root",
|
||||
"password": "taosdata",
|
||||
"connection_pool_size": 10,
|
||||
"thread_count": 10,
|
||||
"create_table_thread_count": 10,
|
||||
"result_file": "./insert-2-2-1.txt",
|
||||
"confirm_parameter_prompt": "no",
|
||||
"num_of_records_per_req": 3600,
|
||||
"prepared_rand": 3600,
|
||||
"chinese": "no",
|
||||
"escape_character": "yes",
|
||||
"continue_if_fail": "no",
|
||||
"databases": [
|
||||
{
|
||||
"dbinfo": {
|
||||
"name": "ts6115",
|
||||
"drop": "yes",
|
||||
"vgroups": 10,
|
||||
"precision": "ms",
|
||||
"buffer": 512,
|
||||
"cachemodel":"'both'",
|
||||
"stt_trigger": 1
|
||||
},
|
||||
"super_tables": [
|
||||
{
|
||||
"name": "stb",
|
||||
"child_table_exists": "no",
|
||||
"childtable_count": 10000,
|
||||
"childtable_prefix": "d_",
|
||||
"auto_create_table": "yes",
|
||||
"batch_create_tbl_num": 10,
|
||||
"data_source": "csv",
|
||||
"insert_mode": "stmt",
|
||||
"non_stop_mode": "no",
|
||||
"line_protocol": "line",
|
||||
"insert_rows": 1000,
|
||||
"childtable_limit": 0,
|
||||
"childtable_offset": 0,
|
||||
"interlace_rows": 0,
|
||||
"insert_interval": 0,
|
||||
"partial_col_num": 0,
|
||||
"timestamp_step": 1000,
|
||||
"start_timestamp": "2024-11-01 00:00:00.000",
|
||||
"sample_format": "csv",
|
||||
"sample_file": "./td_double10000_juchi.csv",
|
||||
"use_sample_ts": "no",
|
||||
"tags_file": "",
|
||||
"columns": [
|
||||
{"type": "DOUBLE", "name": "val"},
|
||||
{ "type": "INT", "name": "quality"}
|
||||
],
|
||||
"tags": [
|
||||
{"type": "INT", "name": "id", "max": 100, "min": 1}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}'''
|
||||
|
||||
class TDTestCase:
|
||||
updatecfgDict = {'debugFlag': 135, 'asynclog': 0}
|
||||
clientCfgDict = {'debugFlag': 135, 'asynclog': 0}
|
||||
updatecfgDict["clientCfg"] = clientCfgDict
|
||||
def init(self, conn, logSql, replicaVar=1):
|
||||
self.replicaVar = int(replicaVar)
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
tdSql.init(conn.cursor())
|
||||
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
|
||||
|
||||
def run(self):
|
||||
|
||||
with open('ts-6115.json', 'w') as file:
|
||||
file.write(insertJson)
|
||||
|
||||
tdLog.info("start to insert data: taosBenchmark -f ts-6115.json")
|
||||
if os.system("taosBenchmark -f ts-6115.json") != 0:
|
||||
tdLog.exit("taosBenchmark -f ts-6115.json")
|
||||
|
||||
tdLog.info("test ts-6115 ......")
|
||||
|
||||
buildPath = tdCom.getBuildPath()
|
||||
cmdStr = '%s/build/bin/tmq_ts6115'%(buildPath)
|
||||
|
||||
tdLog.info(cmdStr)
|
||||
if os.system(cmdStr) != 0:
|
||||
tdLog.exit(cmdStr)
|
||||
|
||||
return
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
|
@ -6,6 +6,7 @@ add_executable(tmq_ts5466 tmq_ts5466.c)
|
|||
add_executable(tmq_td32526 tmq_td32526.c)
|
||||
add_executable(tmq_td32187 tmq_td32187.c)
|
||||
add_executable(tmq_ts5776 tmq_ts5776.c)
|
||||
add_executable(tmq_ts6115 tmq_ts6115.c)
|
||||
add_executable(tmq_td32471 tmq_td32471.c)
|
||||
add_executable(tmq_td33798 tmq_td33798.c)
|
||||
add_executable(tmq_poll_test tmq_poll_test.c)
|
||||
|
@ -96,6 +97,14 @@ target_link_libraries(
|
|||
PUBLIC os
|
||||
)
|
||||
|
||||
target_link_libraries(
|
||||
tmq_ts6115
|
||||
PUBLIC ${TAOS_LIB}
|
||||
PUBLIC util
|
||||
PUBLIC common
|
||||
PUBLIC os
|
||||
)
|
||||
|
||||
target_link_libraries(
|
||||
tmq_poll_test
|
||||
PUBLIC ${TAOS_LIB}
|
||||
|
|
|
@ -0,0 +1,170 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <assert.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <time.h>
|
||||
#include "taos.h"
|
||||
#include "types.h"
|
||||
|
||||
const char* topic_name = "t1";
|
||||
bool pollStart = false;
|
||||
|
||||
int32_t create_topic() {
|
||||
printf("create topic\n");
|
||||
TAOS_RES* pRes;
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
ASSERT(pConn != NULL);
|
||||
|
||||
pRes = taos_query(pConn, "drop topic if exists t1");
|
||||
ASSERT(taos_errno(pRes) == 0);
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create topic t1 as database ts6115");
|
||||
ASSERT(taos_errno(pRes) == 0);
|
||||
taos_free_result(pRes);
|
||||
|
||||
taos_close(pConn);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void* consumeThreadFunc(void* param) {
|
||||
tmq_conf_t* conf = tmq_conf_new();
|
||||
tmq_conf_set(conf, "enable.auto.commit", "false");
|
||||
tmq_conf_set(conf, "auto.commit.interval.ms", "2000");
|
||||
tmq_conf_set(conf, "group.id", "group_1");
|
||||
tmq_conf_set(conf, "td.connect.user", "root");
|
||||
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||
tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
||||
tmq_conf_set(conf, "msg.with.table.name", "false");
|
||||
|
||||
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
||||
tmq_conf_destroy(conf);
|
||||
|
||||
// 创建订阅 topics 列表
|
||||
tmq_list_t* topicList = tmq_list_new();
|
||||
tmq_list_append(topicList, topic_name);
|
||||
|
||||
// 启动订阅
|
||||
tmq_subscribe(tmq, topicList);
|
||||
tmq_list_destroy(topicList);
|
||||
|
||||
int32_t timeout = 200;
|
||||
while (1) {
|
||||
printf("start to poll\n");
|
||||
|
||||
TAOS_RES *pRes = tmq_consumer_poll(tmq, timeout);
|
||||
pollStart = true;
|
||||
if (pRes == NULL) {
|
||||
printf("pRes is NULL, reason:%s\n", taos_errstr(NULL));
|
||||
break;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
taosMsleep(200);
|
||||
}
|
||||
tmq_consumer_close(tmq);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void* dropTopicThreadFunc(void* param) {
|
||||
printf("drop topic\n");
|
||||
TAOS_RES* pRes;
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
ASSERT(pConn != NULL);
|
||||
|
||||
while(!pollStart) {
|
||||
taosSsleep(taosRand()%5);
|
||||
}
|
||||
pRes = taos_query(pConn, "drop topic force t1");
|
||||
ASSERT(taos_errno(pRes) == 0);
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "show consumers");
|
||||
ASSERT(taos_errno(pRes) == 0);
|
||||
ASSERT(taos_affected_rows(pRes) == 0);
|
||||
taos_free_result(pRes);
|
||||
|
||||
taos_close(pConn);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void* dropCGroupThreadFunc(void* param) {
|
||||
printf("drop topic\n");
|
||||
TAOS_RES* pRes;
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
ASSERT(pConn != NULL);
|
||||
|
||||
while(!pollStart) {
|
||||
taosSsleep(taosRand()%5);
|
||||
}
|
||||
|
||||
pRes = taos_query(pConn, "drop consumer group force group_1 on t1");
|
||||
ASSERT(taos_errno(pRes) == 0);
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "show consumers");
|
||||
ASSERT(taos_errno(pRes) == 0);
|
||||
ASSERT(taos_affected_rows(pRes) == 0);
|
||||
taos_free_result(pRes);
|
||||
|
||||
taos_close(pConn);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int main(int argc, char* argv[]) {
|
||||
printf("test start.........\n");
|
||||
|
||||
int32_t runTimes = 1;
|
||||
TdThread thread1, thread2;
|
||||
TdThreadAttr thattr;
|
||||
taosThreadAttrInit(&thattr);
|
||||
taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||
|
||||
for (int i = 0; i < runTimes; i++){
|
||||
printf("test drop topic times:%d\n", i);
|
||||
|
||||
pollStart = false;
|
||||
create_topic();
|
||||
taosThreadCreate(&(thread1), &thattr, dropTopicThreadFunc, NULL);
|
||||
taosThreadCreate(&(thread2), &thattr, consumeThreadFunc, NULL);
|
||||
|
||||
taosThreadJoin(thread1, NULL);
|
||||
taosThreadClear(&thread1);
|
||||
|
||||
taosThreadJoin(thread2, NULL);
|
||||
taosThreadClear(&thread2);
|
||||
}
|
||||
|
||||
create_topic();
|
||||
|
||||
for (int i = 0; i < runTimes; i++){
|
||||
printf("test drop consumer group times:%d\n", i);
|
||||
|
||||
pollStart = false;
|
||||
taosThreadCreate(&(thread1), &thattr, dropCGroupThreadFunc, NULL);
|
||||
taosThreadCreate(&(thread2), &thattr, consumeThreadFunc, NULL);
|
||||
|
||||
taosThreadJoin(thread1, NULL);
|
||||
taosThreadClear(&thread1);
|
||||
|
||||
taosThreadJoin(thread2, NULL);
|
||||
taosThreadClear(&thread2);
|
||||
}
|
||||
|
||||
printf("test end.........\n");
|
||||
return 0;
|
||||
}
|
Loading…
Reference in New Issue