Merge branch '3.0' into fix/TD-21873-3.0

This commit is contained in:
kailixu 2023-01-16 13:39:30 +08:00
commit b562b25b75
38 changed files with 1756 additions and 296 deletions

View File

@ -2,7 +2,7 @@
# taosadapter
ExternalProject_Add(taosadapter
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
GIT_TAG 69eee2e
GIT_TAG 213f8b3
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE

View File

@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG f80dd7e
GIT_TAG 723f696
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE

View File

@ -117,19 +117,22 @@ class TaosConsumer():
<TabItem label="Go" value="Go">
```go
func NewConsumer(conf *Config) (*Consumer, error)
func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)
func (c *Consumer) Close() error
// rebalanceCb is reserved for compatibility purpose
func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error
func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error
// rebalanceCb is reserved for compatibility purpose
func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error
func (c *Consumer) FreeMessage(message unsafe.Pointer)
func (c *Consumer) Poll(timeoutMs int) tmq.Event
func (c *Consumer) Poll(timeout time.Duration) (*Result, error)
func (c *Consumer) Subscribe(topics []string) error
// tmq.TopicPartition is reserved for compatibility purpose
func (c *Consumer) Commit() ([]tmq.TopicPartition, error)
func (c *Consumer) Unsubscribe() error
func (c *Consumer) Close() error
```
</TabItem>
@ -357,50 +360,20 @@ public class MetersDeserializer extends ReferenceDeserializer<Meters> {
<TabItem label="Go" value="Go">
```go
config := tmq.NewConfig()
defer config.Destroy()
err = config.SetGroupID("test")
if err != nil {
panic(err)
}
err = config.SetAutoOffsetReset("earliest")
if err != nil {
panic(err)
}
err = config.SetConnectIP("127.0.0.1")
if err != nil {
panic(err)
}
err = config.SetConnectUser("root")
if err != nil {
panic(err)
}
err = config.SetConnectPass("taosdata")
if err != nil {
panic(err)
}
err = config.SetConnectPort("6030")
if err != nil {
panic(err)
}
err = config.SetMsgWithTableName(true)
if err != nil {
panic(err)
}
err = config.EnableHeartBeat()
if err != nil {
panic(err)
}
err = config.EnableAutoCommit(func(result *wrapper.TMQCommitCallbackResult) {
if result.ErrCode != 0 {
errStr := wrapper.TMQErr2Str(result.ErrCode)
err := errors.NewError(int(result.ErrCode), errStr)
panic(err)
}
})
if err != nil {
panic(err)
conf := &tmq.ConfigMap{
"group.id": "test",
"auto.offset.reset": "earliest",
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"td.connect.port": "6030",
"client.id": "test_tmq_c",
"enable.auto.commit": "false",
"enable.heartbeat.background": "true",
"experimental.snapshot.enable": "true",
"msg.with.table.name": "true",
}
consumer, err := NewConsumer(conf)
```
</TabItem>
@ -523,11 +496,7 @@ consumer.subscribe(topics);
<TabItem value="Go" label="Go">
```go
consumer, err := tmq.NewConsumer(config)
if err != nil {
panic(err)
}
err = consumer.Subscribe([]string{"example_tmq_topic"})
err = consumer.Subscribe("example_tmq_topic", nil)
if err != nil {
panic(err)
}
@ -611,13 +580,17 @@ while(running){
```go
for {
result, err := consumer.Poll(time.Second)
if err != nil {
panic(err)
ev := consumer.Poll(0)
if ev != nil {
switch e := ev.(type) {
case *tmqcommon.DataMessage:
fmt.Println(e.Value())
case tmqcommon.Error:
fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
panic(e)
}
consumer.Commit()
}
fmt.Println(result)
consumer.Commit(context.Background(), result.Message)
consumer.FreeMessage(result.Message)
}
```
@ -729,7 +702,11 @@ consumer.close();
<TabItem value="Go" label="Go">
```go
consumer.Close()
/* Unsubscribe */
_ = consumer.Unsubscribe()
/* Close consumer */
_ = consumer.Close()
```
</TabItem>

View File

@ -355,26 +355,29 @@ The `af` package encapsulates TDengine advanced functions such as connection man
#### Subscribe
* `func NewConsumer(conf *Config) (*Consumer, error)`
* `func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)`
Creates consumer group.
* `func (c *Consumer) Subscribe(topics []string) error`
* `func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error`
Note: `rebalanceCb` is reserved for compatibility purpose
Subscribes a topic.
* `func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error`
Note: `rebalanceCb` is reserved for compatibility purpose
Subscribes to topics.
* `func (c *Consumer) Poll(timeout time.Duration) (*Result, error)`
* `func (c *Consumer) Poll(timeoutMs int) tmq.Event`
Polling information.
* `func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error`
* `func (c *Consumer) Commit() ([]tmq.TopicPartition, error)`
Note: `tmq.TopicPartition` is reserved for compatibility purpose
Commit information.
* `func (c *Consumer) FreeMessage(message unsafe.Pointer)`
Free information.
* `func (c *Consumer) Unsubscribe() error`
Unsubscribe.
@ -441,22 +444,33 @@ Close consumer.
### Subscribe via WebSocket
* `func NewConsumer(config *Config) (*Consumer, error)`
* `func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)`
Creates consumer group.
* `func (c *Consumer) Subscribe(topic []string) error`
* `func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error`
Note: `rebalanceCb` is reserved for compatibility purpose
Subscribes a topic.
* `func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error`
Note: `rebalanceCb` is reserved for compatibility purpose
Subscribes to topics.
* `func (c *Consumer) Poll(timeout time.Duration) (*Result, error)`
* `func (c *Consumer) Poll(timeoutMs int) tmq.Event`
Polling information.
* `func (c *Consumer) Commit(messageID uint64) error`
* `func (c *Consumer) Commit() ([]tmq.TopicPartition, error)`
Note: `tmq.TopicPartition` is reserved for compatibility purpose
Commit information.
* `func (c *Consumer) Unsubscribe() error`
Unsubscribe.
* `func (c *Consumer) Close() error`
Close consumer.

View File

@ -2,5 +2,5 @@ module goexample
go 1.17
require github.com/taosdata/driver-go/v3 3.0
require github.com/taosdata/driver-go/v3 3.1.0

View File

@ -1,17 +1,12 @@
package main
import (
"context"
"encoding/json"
"fmt"
"strconv"
"time"
"os"
"github.com/taosdata/driver-go/v3/af"
"github.com/taosdata/driver-go/v3/af/tmq"
"github.com/taosdata/driver-go/v3/common"
"github.com/taosdata/driver-go/v3/errors"
"github.com/taosdata/driver-go/v3/wrapper"
tmqcommon "github.com/taosdata/driver-go/v3/common/tmq"
)
func main() {
@ -28,55 +23,26 @@ func main() {
if err != nil {
panic(err)
}
config := tmq.NewConfig()
defer config.Destroy()
err = config.SetGroupID("test")
if err != nil {
panic(err)
}
err = config.SetAutoOffsetReset("earliest")
if err != nil {
panic(err)
}
err = config.SetConnectIP("127.0.0.1")
if err != nil {
panic(err)
}
err = config.SetConnectUser("root")
if err != nil {
panic(err)
}
err = config.SetConnectPass("taosdata")
if err != nil {
panic(err)
}
err = config.SetConnectPort("6030")
if err != nil {
panic(err)
}
err = config.SetMsgWithTableName(true)
if err != nil {
panic(err)
}
err = config.EnableHeartBeat()
if err != nil {
panic(err)
}
err = config.EnableAutoCommit(func(result *wrapper.TMQCommitCallbackResult) {
if result.ErrCode != 0 {
errStr := wrapper.TMQErr2Str(result.ErrCode)
err := errors.NewError(int(result.ErrCode), errStr)
panic(err)
}
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
"group.id": "test",
"auto.offset.reset": "earliest",
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"td.connect.port": "6030",
"client.id": "test_tmq_client",
"enable.auto.commit": "false",
"enable.heartbeat.background": "true",
"experimental.snapshot.enable": "true",
"msg.with.table.name": "true",
})
if err != nil {
panic(err)
}
consumer, err := tmq.NewConsumer(config)
if err != nil {
panic(err)
}
err = consumer.Subscribe([]string{"example_tmq_topic"})
err = consumer.Subscribe("example_tmq_topic", nil)
if err != nil {
panic(err)
}
@ -88,19 +54,25 @@ func main() {
if err != nil {
panic(err)
}
for {
result, err := consumer.Poll(time.Second)
for i := 0; i < 5; i++ {
ev := consumer.Poll(0)
if ev != nil {
switch e := ev.(type) {
case *tmqcommon.DataMessage:
fmt.Println(e.String())
case tmqcommon.Error:
fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
panic(e)
}
consumer.Commit()
}
}
err = consumer.Unsubscribe()
if err != nil {
panic(err)
}
if result.Type != common.TMQ_RES_DATA {
panic("want message type 1 got " + strconv.Itoa(int(result.Type)))
err = consumer.Close()
if err != nil {
panic(err)
}
data, _ := json.Marshal(result.Data)
fmt.Println(string(data))
consumer.Commit(context.Background(), result.Message)
consumer.FreeMessage(result.Message)
break
}
consumer.Close()
}

View File

@ -115,19 +115,22 @@ class TaosConsumer():
<TabItem label="Go" value="Go">
```go
func NewConsumer(conf *Config) (*Consumer, error)
func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)
func (c *Consumer) Close() error
// 出于兼容目的保留 rebalanceCb 参数,当前未使用
func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error
func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error
// 出于兼容目的保留 rebalanceCb 参数,当前未使用
func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error
func (c *Consumer) FreeMessage(message unsafe.Pointer)
func (c *Consumer) Poll(timeoutMs int) tmq.Event
func (c *Consumer) Poll(timeout time.Duration) (*Result, error)
func (c *Consumer) Subscribe(topics []string) error
// 出于兼容目的保留 tmq.TopicPartition 参数,当前未使用
func (c *Consumer) Commit() ([]tmq.TopicPartition, error)
func (c *Consumer) Unsubscribe() error
func (c *Consumer) Close() error
```
</TabItem>
@ -355,50 +358,20 @@ public class MetersDeserializer extends ReferenceDeserializer<Meters> {
<TabItem label="Go" value="Go">
```go
config := tmq.NewConfig()
defer config.Destroy()
err = config.SetGroupID("test")
if err != nil {
panic(err)
}
err = config.SetAutoOffsetReset("earliest")
if err != nil {
panic(err)
}
err = config.SetConnectIP("127.0.0.1")
if err != nil {
panic(err)
}
err = config.SetConnectUser("root")
if err != nil {
panic(err)
}
err = config.SetConnectPass("taosdata")
if err != nil {
panic(err)
}
err = config.SetConnectPort("6030")
if err != nil {
panic(err)
}
err = config.SetMsgWithTableName(true)
if err != nil {
panic(err)
}
err = config.EnableHeartBeat()
if err != nil {
panic(err)
}
err = config.EnableAutoCommit(func(result *wrapper.TMQCommitCallbackResult) {
if result.ErrCode != 0 {
errStr := wrapper.TMQErr2Str(result.ErrCode)
err := errors.NewError(int(result.ErrCode), errStr)
panic(err)
}
})
if err != nil {
panic(err)
conf := &tmq.ConfigMap{
"group.id": "test",
"auto.offset.reset": "earliest",
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"td.connect.port": "6030",
"client.id": "test_tmq_c",
"enable.auto.commit": "false",
"enable.heartbeat.background": "true",
"experimental.snapshot.enable": "true",
"msg.with.table.name": "true",
}
consumer, err := NewConsumer(conf)
```
</TabItem>
@ -532,11 +505,7 @@ consumer.subscribe(topics);
<TabItem value="Go" label="Go">
```go
consumer, err := tmq.NewConsumer(config)
if err != nil {
panic(err)
}
err = consumer.Subscribe([]string{"example_tmq_topic"})
err = consumer.Subscribe("example_tmq_topic", nil)
if err != nil {
panic(err)
}
@ -620,13 +589,17 @@ while(running){
```go
for {
result, err := consumer.Poll(time.Second)
if err != nil {
panic(err)
ev := consumer.Poll(0)
if ev != nil {
switch e := ev.(type) {
case *tmqcommon.DataMessage:
fmt.Println(e.Value())
case tmqcommon.Error:
fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
panic(e)
}
consumer.Commit()
}
fmt.Println(result)
consumer.Commit(context.Background(), result.Message)
consumer.FreeMessage(result.Message)
}
```
@ -738,7 +711,11 @@ consumer.close();
<TabItem value="Go" label="Go">
```go
consumer.Close()
/* Unsubscribe */
_ = consumer.Unsubscribe()
/* Close consumer */
_ = consumer.Close()
```
</TabItem>

View File

@ -112,6 +112,7 @@ REST 连接支持所有能运行 Go 的平台。
```text
username:password@protocol(address)/dbname?param=value
```
### 使用连接器进行连接
<Tabs defaultValue="rest">
@ -176,6 +177,7 @@ func main() {
}
}
```
</TabItem>
<TabItem value="WebSocket" label="WebSocket 连接">
@ -207,6 +209,7 @@ func main() {
}
}
```
</TabItem>
</Tabs>
@ -357,33 +360,32 @@ func main() {
#### 订阅
* `func NewConsumer(conf *Config) (*Consumer, error)`
* `func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)`
创建消费者。
* `func (c *Consumer) Subscribe(topics []string) error`
* `func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error`
注意:出于兼容目的保留 `rebalanceCb` 参数,当前未使用
订阅单个主题。
* `func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error`
注意:出于兼容目的保留 `rebalanceCb` 参数,当前未使用
订阅主题。
* `func (c *Consumer) Poll(timeout time.Duration) (*Result, error)`
* `func (c *Consumer) Poll(timeoutMs int) tmq.Event`
轮询消息。
* `func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error`
* `func (c *Consumer) Commit() ([]tmq.TopicPartition, error)`
注意:出于兼容目的保留 `tmq.TopicPartition` 参数,当前未使用
提交消息。
* `func (c *Consumer) FreeMessage(message unsafe.Pointer)`
释放消息。
* `func (c *Consumer) Unsubscribe() error`
取消订阅。
* `func (c *Consumer) Close() error`
关闭消费者
关闭连接。
#### schemaless
@ -443,25 +445,32 @@ func main() {
### 通过 WebSocket 订阅
* `func NewConsumer(config *Config) (*Consumer, error)`
* `func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)`
创建消费者。
* `func (c *Consumer) Subscribe(topic []string) error`
* `func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error`
注意:出于兼容目的保留 `rebalanceCb` 参数,当前未使用
订阅单个主题。
* `func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error`
注意:出于兼容目的保留 `rebalanceCb` 参数,当前未使用
订阅主题。
* `func (c *Consumer) Poll(timeout time.Duration) (*Result, error)`
* `func (c *Consumer) Poll(timeoutMs int) tmq.Event`
轮询消息。
* `func (c *Consumer) Commit(messageID uint64) error`
* `func (c *Consumer) Commit() ([]tmq.TopicPartition, error)`
注意:出于兼容目的保留 `tmq.TopicPartition` 参数,当前未使用
提交消息。
* `func (c *Consumer) Close() error`
关闭消费者
关闭连接
完整订阅示例参见 [GitHub 示例文件](https://github.com/taosdata/driver-go/blob/3.0/examples/tmqoverws/main.go)

View File

@ -208,6 +208,7 @@ DLL_EXPORT TAOS_ROW *taos_result_block(TAOS_RES *res);
DLL_EXPORT const char *taos_get_server_info(TAOS *taos);
DLL_EXPORT const char *taos_get_client_info();
DLL_EXPORT int taos_get_current_db(TAOS *taos, char *database, int len, int *required);
DLL_EXPORT const char *taos_errstr(TAOS_RES *res);
DLL_EXPORT int taos_errno(TAOS_RES *res);

View File

@ -36,6 +36,7 @@ extern "C" {
#define TSDB_INS_TABLE_STABLES "ins_stables"
#define TSDB_INS_TABLE_TABLES "ins_tables"
#define TSDB_INS_TABLE_TAGS "ins_tags"
#define TSDB_INS_TABLE_COLS "ins_columns"
#define TSDB_INS_TABLE_TABLE_DISTRIBUTED "ins_table_distributed"
#define TSDB_INS_TABLE_USERS "ins_users"
#define TSDB_INS_TABLE_LICENCES "ins_grants"

View File

@ -115,6 +115,7 @@ typedef enum _mgmt_table {
TSDB_MGMT_TABLE_STREAMS,
TSDB_MGMT_TABLE_TABLE,
TSDB_MGMT_TABLE_TAG,
TSDB_MGMT_TABLE_COL,
TSDB_MGMT_TABLE_USER,
TSDB_MGMT_TABLE_GRANTS,
TSDB_MGMT_TABLE_VGROUP,
@ -344,6 +345,7 @@ void tFreeSSubmitRsp(SSubmitRsp* pRsp);
#define COL_CLR_SET(FLG) ((FLG) &= (~(COL_SET_VAL | COL_SET_NULL)))
#define IS_BSMA_ON(s) (((s)->flags & 0x01) == COL_SMA_ON)
#define IS_SET_NULL(s) (((s)->flags & COL_SET_NULL) == COL_SET_NULL)
#define SSCHMEA_TYPE(s) ((s)->type)
#define SSCHMEA_FLAGS(s) ((s)->flags)
@ -381,6 +383,13 @@ static FORCE_INLINE void tDeleteSSchemaWrapper(SSchemaWrapper* pSchemaWrapper) {
}
}
static FORCE_INLINE void tDeleteSSchemaWrapperForHash(void* pSchemaWrapper) {
if (pSchemaWrapper != NULL && *(SSchemaWrapper**)pSchemaWrapper != NULL) {
taosMemoryFree((*(SSchemaWrapper**)pSchemaWrapper)->pSchema);
taosMemoryFree(*(SSchemaWrapper**)pSchemaWrapper);
}
}
static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema) {
int32_t tlen = 0;
tlen += taosEncodeFixedI8(buf, pSchema->type);
@ -1392,6 +1401,7 @@ typedef struct {
char db[TSDB_DB_FNAME_LEN];
char tb[TSDB_TABLE_NAME_LEN];
char user[TSDB_USER_LEN];
char filterTb[TSDB_TABLE_NAME_LEN];
int64_t showId;
} SRetrieveTableReq;
@ -1772,6 +1782,8 @@ typedef struct {
int64_t checkpointFreq; // ms
// 3.0.2.3
int8_t createStb;
uint64_t targetStbUid;
SArray* fillNullCols;
} SCMCreateStreamReq;
typedef struct {

View File

@ -207,6 +207,12 @@ typedef struct SQueryNodeStat {
int32_t tableNum; // vg table number, unit is TSDB_TABLE_NUM_UNIT
} SQueryNodeStat;
typedef struct SColLocation {
int16_t slotId;
col_id_t colId;
int8_t type;
} SColLocation;
int32_t initTaskQueue();
int32_t cleanupTaskQueue();

View File

@ -435,6 +435,22 @@ const char *taos_data_type(int type) {
return "TSDB_DATA_TYPE_NCHAR";
case TSDB_DATA_TYPE_JSON:
return "TSDB_DATA_TYPE_JSON";
case TSDB_DATA_TYPE_UTINYINT:
return "TSDB_DATA_TYPE_UTINYINT";
case TSDB_DATA_TYPE_USMALLINT:
return "TSDB_DATA_TYPE_USMALLINT";
case TSDB_DATA_TYPE_UINT:
return "TSDB_DATA_TYPE_UINT";
case TSDB_DATA_TYPE_UBIGINT:
return "TSDB_DATA_TYPE_UBIGINT";
case TSDB_DATA_TYPE_VARBINARY:
return "TSDB_DATA_TYPE_VARBINARY";
case TSDB_DATA_TYPE_DECIMAL:
return "TSDB_DATA_TYPE_DECIMAL";
case TSDB_DATA_TYPE_BLOB:
return "TSDB_DATA_TYPE_BLOB";
case TSDB_DATA_TYPE_MEDIUMBLOB:
return "TSDB_DATA_TYPE_MEDIUMBLOB";
default:
return "UNKNOWN";
}
@ -675,6 +691,32 @@ const char *taos_get_server_info(TAOS *taos) {
return pTscObj->sDetailVer;
}
int taos_get_current_db(TAOS *taos, char *database, int len, int *required) {
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
if (pTscObj == NULL) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return -1;
}
int code = TSDB_CODE_SUCCESS;
taosThreadMutexLock(&pTscObj->mutex);
if(database == NULL || len <= 0){
if(required != NULL) *required = strlen(pTscObj->db) + 1;
terrno = TSDB_CODE_INVALID_PARA;
code = -1;
}else if(len < strlen(pTscObj->db) + 1){
tstrncpy(database, pTscObj->db, len);
if(required) *required = strlen(pTscObj->db) + 1;
terrno = TSDB_CODE_INVALID_PARA;
code = -1;
}else{
strcpy(database, pTscObj->db);
code = 0;
}
taosThreadMutexUnlock(&pTscObj->mutex);
return code;
}
static void destoryTablesReq(void *p) {
STablesReq *pRes = (STablesReq *)p;
taosArrayDestroy(pRes->pTables);

View File

@ -178,6 +178,18 @@ static const SSysDbTableSchema userTagsSchema[] = {
{.name = "tag_value", .bytes = TSDB_MAX_TAGS_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
};
static const SSysDbTableSchema userColsSchema[] = {
{.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "table_type", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "col_name", .bytes = TSDB_COL_NAME_LEN - 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "col_type", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "col_length", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "col_precision", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "col_scale", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "col_nullable", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}
};
static const SSysDbTableSchema userTblDistSchema[] = {
{.name = "db_name", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "table_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
@ -294,6 +306,7 @@ static const SSysTableMeta infosMeta[] = {
{TSDB_INS_TABLE_STABLES, userStbsSchema, tListLen(userStbsSchema), false},
{TSDB_INS_TABLE_TABLES, userTblsSchema, tListLen(userTblsSchema), false},
{TSDB_INS_TABLE_TAGS, userTagsSchema, tListLen(userTagsSchema), false},
{TSDB_INS_TABLE_COLS, userColsSchema, tListLen(userColsSchema), false},
// {TSDB_INS_TABLE_TABLE_DISTRIBUTED, userTblDistSchema, tListLen(userTblDistSchema)},
{TSDB_INS_TABLE_USERS, userUsersSchema, tListLen(userUsersSchema), false},
{TSDB_INS_TABLE_LICENCES, grantsSchema, tListLen(grantsSchema), true},

View File

@ -3191,6 +3191,7 @@ int32_t tSerializeSRetrieveTableReq(void *buf, int32_t bufLen, SRetrieveTableReq
if (tEncodeI64(&encoder, pReq->showId) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->tb) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->filterTb) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->user) < 0) return -1;
tEndEncode(&encoder);
@ -3207,6 +3208,7 @@ int32_t tDeserializeSRetrieveTableReq(void *buf, int32_t bufLen, SRetrieveTableR
if (tDecodeI64(&decoder, &pReq->showId) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->tb) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->filterTb) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->user) < 0) return -1;
tEndDecode(&decoder);
@ -5425,6 +5427,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
}
if (tEncodeI8(&encoder, pReq->createStb) < 0) return -1;
if (tEncodeU64(&encoder, pReq->targetStbUid) < 0) return -1;
tEndEncode(&encoder);
@ -5486,6 +5489,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
}
}
if (tDecodeI8(&decoder, &pReq->createStb) < 0) return -1;
if (tDecodeU64(&decoder, &pReq->targetStbUid) < 0) return -1;
tEndDecode(&decoder);

View File

@ -444,6 +444,7 @@ typedef struct {
STableMetaRsp* pMeta;
bool sysDbRsp;
char db[TSDB_DB_FNAME_LEN];
char filterTb[TSDB_TABLE_NAME_LEN];
} SShowObj;
typedef struct {

View File

@ -19,6 +19,7 @@
#include "systable.h"
#define SHOW_STEP_SIZE 100
#define SHOW_COLS_STEP_SIZE 4096
static SShowObj *mndCreateShowObj(SMnode *pMnode, SRetrieveTableReq *pReq);
static void mndFreeShowObj(SShowObj *pShow);
@ -76,6 +77,8 @@ static int32_t convertToRetrieveType(char *name, int32_t len) {
type = TSDB_MGMT_TABLE_TABLE;
} else if (strncasecmp(name, TSDB_INS_TABLE_TAGS, len) == 0) {
type = TSDB_MGMT_TABLE_TAG;
} else if (strncasecmp(name, TSDB_INS_TABLE_COLS, len) == 0) {
type = TSDB_MGMT_TABLE_COL;
} else if (strncasecmp(name, TSDB_INS_TABLE_TABLE_DISTRIBUTED, len) == 0) {
// type = TSDB_MGMT_TABLE_DIST;
} else if (strncasecmp(name, TSDB_INS_TABLE_USERS, len) == 0) {
@ -131,6 +134,7 @@ static SShowObj *mndCreateShowObj(SMnode *pMnode, SRetrieveTableReq *pReq) {
showObj.pMnode = pMnode;
showObj.type = convertToRetrieveType(pReq->tb, tListLen(pReq->tb));
memcpy(showObj.db, pReq->db, TSDB_DB_FNAME_LEN);
strncpy(showObj.filterTb, pReq->filterTb, TSDB_TABLE_NAME_LEN);
int32_t keepTime = tsShellActivityTimer * 6 * 1000;
SShowObj *pShow = taosCachePut(pMgmt->cache, &showId, sizeof(int64_t), &showObj, size, keepTime);
@ -190,13 +194,15 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
int32_t rowsToRead = SHOW_STEP_SIZE;
int32_t size = 0;
int32_t rowsRead = 0;
mDebug("mndProcessRetrieveSysTableReq start");
SRetrieveTableReq retrieveReq = {0};
if (tDeserializeSRetrieveTableReq(pReq->pCont, pReq->contLen, &retrieveReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
mDebug("mndProcessRetrieveSysTableReq tb:%s", retrieveReq.tb);
if (retrieveReq.showId == 0) {
STableMetaRsp *pMeta = taosHashGet(pMnode->infosMeta, retrieveReq.tb, strlen(retrieveReq.tb));
if (pMeta == NULL) {
@ -226,6 +232,9 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
}
}
if(pShow->type == TSDB_MGMT_TABLE_COL){ // expend capacity for ins_columns
rowsToRead = SHOW_COLS_STEP_SIZE;
}
ShowRetrieveFp retrieveFp = pMgmt->retrieveFps[pShow->type];
if (retrieveFp == NULL) {
mndReleaseShowObj(pShow, false);

View File

@ -43,6 +43,7 @@ static int32_t mndProcessAlterStbReq(SRpcMsg *pReq);
static int32_t mndProcessDropStbReq(SRpcMsg *pReq);
static int32_t mndProcessTableMetaReq(SRpcMsg *pReq);
static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter);
static int32_t mndProcessTableCfgReq(SRpcMsg *pReq);
static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp,
@ -69,10 +70,14 @@ int32_t mndInitStb(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_TABLE_META, mndProcessTableMetaReq);
mndSetMsgHandle(pMnode, TDMT_MND_TTL_TIMER, mndProcessTtlTimer);
mndSetMsgHandle(pMnode, TDMT_MND_TABLE_CFG, mndProcessTableCfgReq);
// mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE, mndProcessRetrieveStbReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STB, mndRetrieveStb);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STB, mndCancelGetNextStb);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_COL, mndRetrieveStbCol);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_COL, mndCancelGetNextStb);
return sdbSetTable(pMnode->pSdb, table);
}
@ -2489,6 +2494,283 @@ void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t
}
}
//static int32_t mndProcessRetrieveStbReq(SRpcMsg *pReq) {
// SMnode *pMnode = pReq->info.node;
// SShowMgmt *pMgmt = &pMnode->showMgmt;
// SShowObj *pShow = NULL;
// int32_t rowsToRead = SHOW_STEP_SIZE;
// int32_t rowsRead = 0;
//
// SRetrieveTableReq retrieveReq = {0};
// if (tDeserializeSRetrieveTableReq(pReq->pCont, pReq->contLen, &retrieveReq) != 0) {
// terrno = TSDB_CODE_INVALID_MSG;
// return -1;
// }
//
// SMnode *pMnode = pReq->info.node;
// SSdb *pSdb = pMnode->pSdb;
// int32_t numOfRows = 0;
// SDbObj *pDb = NULL;
// ESdbStatus objStatus = 0;
//
// SUserObj *pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
// if (pUser == NULL) return 0;
// bool sysinfo = pUser->sysInfo;
//
// // Append the information_schema database into the result.
//// if (!pShow->sysDbRsp) {
//// SDbObj infoschemaDb = {0};
//// setInformationSchemaDbCfg(pMnode, &infoschemaDb);
//// size_t numOfTables = 0;
//// getVisibleInfosTablesNum(sysinfo, &numOfTables);
//// mndDumpDbInfoData(pMnode, pBlock, &infoschemaDb, pShow, numOfRows, numOfTables, true, 0, 1);
////
//// numOfRows += 1;
////
//// SDbObj perfschemaDb = {0};
//// setPerfSchemaDbCfg(pMnode, &perfschemaDb);
//// numOfTables = 0;
//// getPerfDbMeta(NULL, &numOfTables);
//// mndDumpDbInfoData(pMnode, pBlock, &perfschemaDb, pShow, numOfRows, numOfTables, true, 0, 1);
////
//// numOfRows += 1;
//// pShow->sysDbRsp = true;
//// }
//
// SSDataBlock* p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_COLS);
// blockDataEnsureCapacity(p, rowsToRead);
//
// size_t size = 0;
// const SSysTableMeta* pSysDbTableMeta = NULL;
//
// getInfosDbMeta(&pSysDbTableMeta, &size);
// p->info.rows = buildDbColsInfoBlock(sysinfo, p, pSysDbTableMeta, size, TSDB_INFORMATION_SCHEMA_DB);
//
// getPerfDbMeta(&pSysDbTableMeta, &size);
// p->info.rows = buildDbColsInfoBlock(sysinfo, p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB);
//
// blockDataDestroy(p);
//
//
// while (numOfRows < rowsToRead) {
// pShow->pIter = sdbFetchAll(pSdb, SDB_DB, pShow->pIter, (void **)&pDb, &objStatus, true);
// if (pShow->pIter == NULL) break;
// if (strncmp(retrieveReq.db, pDb->name, strlen(retrieveReq.db)) != 0){
// continue;
// }
// if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_READ_OR_WRITE_DB, pDb) != 0) {
// continue;
// }
//
// while (numOfRows < rowsToRead) {
// pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
// if (pShow->pIter == NULL) break;
//
// if (pDb != NULL && pStb->dbUid != pDb->uid) {
// sdbRelease(pSdb, pStb);
// continue;
// }
//
// cols = 0;
//
// SName name = {0};
// char stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
// mndExtractTbNameFromStbFullName(pStb->name, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN);
// varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
//
// SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataAppend(pColInfo, numOfRows, (const char *)stbName, false);
//
// char db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
// tNameFromString(&name, pStb->db, T_NAME_ACCT | T_NAME_DB);
// tNameGetDbName(&name, varDataVal(db));
// varDataSetLen(db, strlen(varDataVal(db)));
//
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataAppend(pColInfo, numOfRows, (const char *)db, false);
//
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataAppend(pColInfo, numOfRows, (const char *)&pStb->createdTime, false);
//
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataAppend(pColInfo, numOfRows, (const char *)&pStb->numOfColumns, false);
//
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataAppend(pColInfo, numOfRows, (const char *)&pStb->numOfTags, false);
//
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataAppend(pColInfo, numOfRows, (const char *)&pStb->updateTime, false); // number of tables
//
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// if (pStb->commentLen > 0) {
// char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
// STR_TO_VARSTR(comment, pStb->comment);
// colDataAppend(pColInfo, numOfRows, comment, false);
// } else if (pStb->commentLen == 0) {
// char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0};
// STR_TO_VARSTR(comment, "");
// colDataAppend(pColInfo, numOfRows, comment, false);
// } else {
// colDataAppendNULL(pColInfo, numOfRows);
// }
//
// char watermark[64 + VARSTR_HEADER_SIZE] = {0};
// sprintf(varDataVal(watermark), "%" PRId64 "a,%" PRId64 "a", pStb->watermark[0], pStb->watermark[1]);
// varDataSetLen(watermark, strlen(varDataVal(watermark)));
//
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataAppend(pColInfo, numOfRows, (const char *)watermark, false);
//
// char maxDelay[64 + VARSTR_HEADER_SIZE] = {0};
// sprintf(varDataVal(maxDelay), "%" PRId64 "a,%" PRId64 "a", pStb->maxdelay[0], pStb->maxdelay[1]);
// varDataSetLen(maxDelay, strlen(varDataVal(maxDelay)));
//
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataAppend(pColInfo, numOfRows, (const char *)maxDelay, false);
//
// char rollup[160 + VARSTR_HEADER_SIZE] = {0};
// int32_t rollupNum = (int32_t)taosArrayGetSize(pStb->pFuncs);
// char *sep = ", ";
// int32_t sepLen = strlen(sep);
// int32_t rollupLen = sizeof(rollup) - VARSTR_HEADER_SIZE - 2;
// for (int32_t i = 0; i < rollupNum; ++i) {
// char *funcName = taosArrayGet(pStb->pFuncs, i);
// if (i) {
// strncat(varDataVal(rollup), sep, rollupLen);
// rollupLen -= sepLen;
// }
// strncat(varDataVal(rollup), funcName, rollupLen);
// rollupLen -= strlen(funcName);
// }
// varDataSetLen(rollup, strlen(varDataVal(rollup)));
//
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataAppend(pColInfo, numOfRows, (const char *)rollup, false);
//
// numOfRows++;
// sdbRelease(pSdb, pStb);
// }
//
// if (pDb != NULL) {
// mndReleaseDb(pMnode, pDb);
// }
//
// sdbRelease(pSdb, pDb);
// }
//
// pShow->numOfRows += numOfRows;
// mndReleaseUser(pMnode, pUser);
//
//
//
//
//
//
//
//
// ShowRetrieveFp retrieveFp = pMgmt->retrieveFps[pShow->type];
// if (retrieveFp == NULL) {
// mndReleaseShowObj(pShow, false);
// terrno = TSDB_CODE_MSG_NOT_PROCESSED;
// mError("show:0x%" PRIx64 ", failed to retrieve data since %s", pShow->id, terrstr());
// return -1;
// }
//
// mDebug("show:0x%" PRIx64 ", start retrieve data, type:%d", pShow->id, pShow->type);
// if (retrieveReq.user[0] != 0) {
// memcpy(pReq->info.conn.user, retrieveReq.user, TSDB_USER_LEN);
// } else {
// memcpy(pReq->info.conn.user, TSDB_DEFAULT_USER, strlen(TSDB_DEFAULT_USER) + 1);
// }
// if (retrieveReq.db[0] && mndCheckShowPrivilege(pMnode, pReq->info.conn.user, pShow->type, retrieveReq.db) != 0) {
// return -1;
// }
//
// int32_t numOfCols = pShow->pMeta->numOfColumns;
//
// SSDataBlock *pBlock = createDataBlock();
// for (int32_t i = 0; i < numOfCols; ++i) {
// SColumnInfoData idata = {0};
//
// SSchema *p = &pShow->pMeta->pSchemas[i];
//
// idata.info.bytes = p->bytes;
// idata.info.type = p->type;
// idata.info.colId = p->colId;
// blockDataAppendColInfo(pBlock, &idata);
// }
//
// blockDataEnsureCapacity(pBlock, rowsToRead);
//
// if (mndCheckRetrieveFinished(pShow)) {
// mDebug("show:0x%" PRIx64 ", read finished, numOfRows:%d", pShow->id, pShow->numOfRows);
// rowsRead = 0;
// } else {
// rowsRead = (*retrieveFp)(pReq, pShow, pBlock, rowsToRead);
// if (rowsRead < 0) {
// terrno = rowsRead;
// mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id);
// mndReleaseShowObj(pShow, true);
// blockDataDestroy(pBlock);
// return -1;
// }
//
// pBlock->info.rows = rowsRead;
// mDebug("show:0x%" PRIx64 ", stop retrieve data, rowsRead:%d numOfRows:%d", pShow->id, rowsRead, pShow->numOfRows);
// }
//
// size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * pShow->pMeta->numOfColumns +
// blockDataGetSize(pBlock) + blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock));
//
// SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
// if (pRsp == NULL) {
// mndReleaseShowObj(pShow, false);
// terrno = TSDB_CODE_OUT_OF_MEMORY;
// mError("show:0x%" PRIx64 ", failed to retrieve data since %s", pShow->id, terrstr());
// blockDataDestroy(pBlock);
// return -1;
// }
//
// pRsp->handle = htobe64(pShow->id);
//
// if (rowsRead > 0) {
// char *pStart = pRsp->data;
// SSchema *ps = pShow->pMeta->pSchemas;
//
// *(int32_t *)pStart = htonl(pShow->pMeta->numOfColumns);
// pStart += sizeof(int32_t); // number of columns
//
// for (int32_t i = 0; i < pShow->pMeta->numOfColumns; ++i) {
// SSysTableSchema *pSchema = (SSysTableSchema *)pStart;
// pSchema->bytes = htonl(ps[i].bytes);
// pSchema->colId = htons(ps[i].colId);
// pSchema->type = ps[i].type;
//
// pStart += sizeof(SSysTableSchema);
// }
//
// int32_t len = blockEncode(pBlock, pStart, pShow->pMeta->numOfColumns);
// }
//
// pRsp->numOfRows = htonl(rowsRead);
// pRsp->precision = TSDB_TIME_PRECISION_MILLI; // millisecond time precision
// pReq->info.rsp = pRsp;
// pReq->info.rspLen = size;
//
// if (rowsRead == 0 || rowsRead < rowsToRead) {
// pRsp->completed = 1;
// mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id);
// mndReleaseShowObj(pShow, true);
// } else {
// mDebug("show:0x%" PRIx64 ", retrieve not completed yet", pShow->id);
// mndReleaseShowObj(pShow, false);
// }
//
// blockDataDestroy(pBlock);
// return TSDB_CODE_SUCCESS;
//}
static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
@ -2599,6 +2881,187 @@ static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
return numOfRows;
}
static int32_t buildDbColsInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
const char* dbName, const char* tbName) {
char tName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
char dName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
int32_t numOfRows = p->info.rows;
STR_TO_VARSTR(dName, dbName);
STR_TO_VARSTR(typeName, "SYSTEM_TABLE");
for (int32_t i = 0; i < size; ++i) {
const SSysTableMeta* pm = &pSysDbTableMeta[i];
// if (pm->sysInfo) {
// continue;
// }
if(tbName[0] && strncmp(tbName, pm->name, TSDB_TABLE_NAME_LEN) != 0){
continue;
}
STR_TO_VARSTR(tName, pm->name);
for(int32_t j = 0; j < pm->colNum; j++){
// table name
SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, 0);
colDataAppend(pColInfoData, numOfRows, tName, false);
// database name
pColInfoData = taosArrayGet(p->pDataBlock, 1);
colDataAppend(pColInfoData, numOfRows, dName, false);
pColInfoData = taosArrayGet(p->pDataBlock, 2);
colDataAppend(pColInfoData, numOfRows, typeName, false);
// col name
char colName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(colName, pm->schema[j].name);
pColInfoData = taosArrayGet(p->pDataBlock, 3);
colDataAppend(pColInfoData, numOfRows, colName, false);
// col type
int8_t colType = pm->schema[j].type;
pColInfoData = taosArrayGet(p->pDataBlock, 4);
char colTypeStr[VARSTR_HEADER_SIZE + 32];
int colTypeLen = sprintf(varDataVal(colTypeStr), "%s", tDataTypes[colType].name);
if (colType == TSDB_DATA_TYPE_VARCHAR) {
colTypeLen += sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)",
(int32_t)(pm->schema[j].bytes - VARSTR_HEADER_SIZE));
} else if (colType == TSDB_DATA_TYPE_NCHAR) {
colTypeLen += sprintf(
varDataVal(colTypeStr) + colTypeLen, "(%d)",
(int32_t)((pm->schema[j].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
}
varDataSetLen(colTypeStr, colTypeLen);
colDataAppend(pColInfoData, numOfRows, (char*)colTypeStr, false);
pColInfoData = taosArrayGet(p->pDataBlock, 5);
colDataAppend(pColInfoData, numOfRows, (const char*)&pm->schema[j].bytes, false);
for (int32_t k = 6; k <= 8; ++k) {
pColInfoData = taosArrayGet(p->pDataBlock, k);
colDataAppendNULL(pColInfoData, numOfRows);
}
numOfRows += 1;
}
}
return numOfRows;
}
static int32_t buildSysDbColsInfo(SSDataBlock* p, char* db, char* tb) {
size_t size = 0;
const SSysTableMeta* pSysDbTableMeta = NULL;
if(db[0] && strncmp(db, TSDB_INFORMATION_SCHEMA_DB, TSDB_DB_FNAME_LEN) != 0 && strncmp(db, TSDB_PERFORMANCE_SCHEMA_DB, TSDB_DB_FNAME_LEN) != 0){
return p->info.rows;
}
getInfosDbMeta(&pSysDbTableMeta, &size);
p->info.rows = buildDbColsInfoBlock(p, pSysDbTableMeta, size, TSDB_INFORMATION_SCHEMA_DB, tb);
getPerfDbMeta(&pSysDbTableMeta, &size);
p->info.rows = buildDbColsInfoBlock(p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB, tb);
return p->info.rows;
}
static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
SStbObj *pStb = NULL;
int32_t numOfRows = buildSysDbColsInfo(pBlock, pShow->db, pShow->filterTb);
mDebug("mndRetrieveStbCol get system table cols, rows:%d, db:%s", numOfRows, pShow->db);
SDbObj *pDb = NULL;
if (strlen(pShow->db) > 0) {
pDb = mndAcquireDb(pMnode, pShow->db);
if (pDb == NULL) return terrno;
}
char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(typeName, "SUPER_TABLE");
while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
if (pShow->pIter == NULL) break;
if (pDb != NULL && pStb->dbUid != pDb->uid) {
sdbRelease(pSdb, pStb);
continue;
}
SName name = {0};
char stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
mndExtractTbNameFromStbFullName(pStb->name, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN);
if(pShow->filterTb[0] && strncmp(pShow->filterTb, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN) != 0){
sdbRelease(pSdb, pStb);
continue;
}
varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
mDebug("mndRetrieveStbCol get stable cols, stable name:%s, db:%s", pStb->name, pStb->db);
char db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
tNameFromString(&name, pStb->db, T_NAME_ACCT | T_NAME_DB);
tNameGetDbName(&name, varDataVal(db));
varDataSetLen(db, strlen(varDataVal(db)));
for(int i = 0; i < pStb->numOfColumns; i++){
int32_t cols = 0;
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)stbName, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)db, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, typeName, false);
// col name
char colName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(colName, pStb->pColumns[i].name);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, colName, false);
// col type
int8_t colType = pStb->pColumns[i].type;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
char colTypeStr[VARSTR_HEADER_SIZE + 32];
int colTypeLen = sprintf(varDataVal(colTypeStr), "%s", tDataTypes[colType].name);
if (colType == TSDB_DATA_TYPE_VARCHAR) {
colTypeLen += sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)",
(int32_t)(pStb->pColumns[i].bytes - VARSTR_HEADER_SIZE));
} else if (colType == TSDB_DATA_TYPE_NCHAR) {
colTypeLen += sprintf(
varDataVal(colTypeStr) + colTypeLen, "(%d)",
(int32_t)((pStb->pColumns[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
}
varDataSetLen(colTypeStr, colTypeLen);
colDataAppend(pColInfo, numOfRows, (char*)colTypeStr, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char*)&pStb->pColumns[i].bytes, false);
while(cols < pShow->numOfColumns) {
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppendNULL(pColInfo, numOfRows);
}
numOfRows++;
}
sdbRelease(pSdb, pStb);
}
if (pDb != NULL) {
mndReleaseDb(pMnode, pDb);
}
pShow->numOfRows += numOfRows;
mDebug("mndRetrieveStbCol success, rows:%d, pShow->numOfRows:%d", numOfRows, pShow->numOfRows);
return numOfRows;
}
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter);

View File

@ -314,7 +314,11 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
}
tstrncpy(pObj->targetDb, pTargetDb->name, TSDB_DB_FNAME_LEN);
if (pCreate->createStb == STREAM_CREATE_STABLE_TRUE) {
pObj->targetStbUid = mndGenerateUid(pObj->targetSTbName, TSDB_TABLE_FNAME_LEN);
} else {
pObj->targetStbUid = pCreate->targetStbUid;
}
pObj->targetDbUid = pTargetDb->uid;
mndReleaseDb(pMnode, pTargetDb);
@ -334,6 +338,38 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
goto FAIL;
}
int32_t numOfNULL = taosArrayGetSize(pCreate->fillNullCols);
if(numOfNULL > 0) {
pObj->outputSchema.nCols += numOfNULL;
SSchema* pFullSchema = taosMemoryCalloc(pObj->outputSchema.nCols, sizeof(SSchema));
if (!pFullSchema) {
goto FAIL;
}
int32_t nullIndex = 0;
int32_t dataIndex = 0;
for (int16_t i = 0; i < pObj->outputSchema.nCols; i++) {
SColLocation* pos = taosArrayGet(pCreate->fillNullCols, nullIndex);
if (i < pos->slotId) {
pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
pFullSchema[i].colId = i + 1; // pObj->outputSchema.pSchema[dataIndex].colId;
pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags;
strcpy(pFullSchema[i].name, pObj->outputSchema.pSchema[dataIndex].name);
pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type;
dataIndex++;
} else {
pFullSchema[i].bytes = 0;
pFullSchema[i].colId = pos->colId;
pFullSchema[i].flags = COL_SET_NULL;
memset(pFullSchema[i].name, 0, TSDB_COL_NAME_LEN);
pFullSchema[i].type = pos->type;
nullIndex++;
}
}
taosMemoryFree(pObj->outputSchema.pSchema);
pObj->outputSchema.pSchema = pFullSchema;
}
SPlanContext cxt = {
.pAstRoot = pAst,
.topicQuery = false,

View File

@ -152,7 +152,7 @@ typedef struct SMTbCursor SMTbCursor;
SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
void metaCloseTbCursor(SMTbCursor *pTbCur);
int32_t metaTbCursorNext(SMTbCursor *pTbCur);
int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType);
#endif
// tsdb

View File

@ -310,7 +310,7 @@ void metaCloseTbCursor(SMTbCursor *pTbCur) {
}
}
int metaTbCursorNext(SMTbCursor *pTbCur) {
int metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType) {
int ret;
void *pBuf;
STbCfg tbCfg;
@ -324,7 +324,7 @@ int metaTbCursorNext(SMTbCursor *pTbCur) {
tDecoderClear(&pTbCur->mr.coder);
metaGetTableEntryByVersion(&pTbCur->mr, ((SUidIdxVal *)pTbCur->pVal)[0].version, *(tb_uid_t *)pTbCur->pKey);
if (pTbCur->mr.me.type == TSDB_SUPER_TABLE) {
if (pTbCur->mr.me.type == jumpTableType) {
continue;
}

View File

@ -323,19 +323,10 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
taosArrayDestroy(tagArray);
}
static int32_t encodeCreateChildTableForRPC(SVCreateTbReq* req, int32_t vgId, void** pBuf, int32_t* contLen) {
static int32_t encodeCreateChildTableForRPC(SVCreateTbBatchReq* pReqs, int32_t vgId, void** pBuf, int32_t* contLen) {
int32_t ret = 0;
SVCreateTbBatchReq reqs = {0};
reqs.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
if (NULL == reqs.pArray) {
ret = -1;
goto end;
}
taosArrayPush(reqs.pArray, req);
reqs.nReqs = 1;
tEncodeSize(tEncodeSVCreateTbBatchReq, &reqs, *contLen, ret);
tEncodeSize(tEncodeSVCreateTbBatchReq, pReqs, *contLen, ret);
if (ret < 0) {
ret = -1;
goto end;
@ -350,7 +341,7 @@ static int32_t encodeCreateChildTableForRPC(SVCreateTbReq* req, int32_t vgId, vo
((SMsgHead*)(*pBuf))->contLen = htonl(*contLen);
SEncoder coder = {0};
tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), (*contLen) - sizeof(SMsgHead) );
if (tEncodeSVCreateTbBatchReq(&coder, &reqs) < 0) {
if (tEncodeSVCreateTbBatchReq(&coder, pReqs) < 0) {
rpcFreeCont(*pBuf);
*pBuf = NULL;
*contLen = 0;
@ -361,14 +352,13 @@ static int32_t encodeCreateChildTableForRPC(SVCreateTbReq* req, int32_t vgId, vo
tEncoderClear(&coder);
end:
taosArrayDestroy(reqs.pArray);
return ret;
}
int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbReq* pReq) {
int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) {
void* buf = NULL;
int32_t tlen = 0;
encodeCreateChildTableForRPC(pReq, TD_VID(pVnode), &buf, &tlen);
encodeCreateChildTableForRPC(pReqs, TD_VID(pVnode), &buf, &tlen);
SRpcMsg msg = {
.msgType = TDMT_VND_CREATE_TABLE,
@ -387,6 +377,7 @@ _error:
tqError("failed to encode submit req since %s", terrstr());
return TSDB_CODE_OUT_OF_MEMORY;
}
void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
const SArray* pBlocks = (const SArray*)data;
SVnode* pVnode = (SVnode*)vnode;
@ -402,6 +393,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
void* pBuf = NULL;
SArray* tagArray = NULL;
SArray* pVals = NULL;
SArray* crTblArray = NULL;
for (int32_t i = 0; i < blockSz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
@ -442,8 +434,14 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
tqDebug("failed to put delete req into write-queue since %s", terrstr());
}
} else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
SVCreateTbBatchReq reqs = {0};
crTblArray = reqs.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
if (NULL == reqs.pArray) {
goto _end;
}
for (int32_t rowId = 0; rowId < rows; rowId++) {
SVCreateTbReq* pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq));
SVCreateTbReq createTbReq = {0};
SVCreateTbReq* pCreateTbReq = &createTbReq;
if (!pCreateTbReq) {
goto _end;
}
@ -511,6 +509,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
pCreateTbReq->ctb.pTag = (uint8_t*)pTag;
// set table name
@ -524,13 +523,15 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
pCreateTbReq->name = taosMemoryCalloc(1, varDataLen(pTbData) + 1);
memcpy(pCreateTbReq->name, varDataVal(pTbData), varDataLen(pTbData));
}
if (tqPutReqToQueue(pVnode, pCreateTbReq) != TSDB_CODE_SUCCESS) {
taosArrayPush(reqs.pArray, pCreateTbReq);
}
reqs.nReqs = taosArrayGetSize(reqs.pArray);
if (tqPutReqToQueue(pVnode, &reqs) != TSDB_CODE_SUCCESS) {
goto _end;
}
tdDestroySVCreateTbReq(pCreateTbReq);
taosMemoryFreeClear(pCreateTbReq);
}
tagArray = taosArrayDestroy(tagArray);
taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq);
crTblArray = NULL;
} else {
SSubmitTbData tbData = {0};
tqDebug("tq sink pipe2, convert block1 %d, rows: %d", i, rows);
@ -579,7 +580,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
goto _end;
}
STagVal tagVal = {
.cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
.cid = pTSchema->numOfCols + 1,
.type = TSDB_DATA_TYPE_UBIGINT,
.i64 = (int64_t)pDataBlock->info.id.groupId,
};
@ -638,16 +639,23 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
for (int32_t j = 0; j < rows; j++) {
taosArrayClear(pVals);
int32_t dataIndex = 0;
for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
const STColumn* pCol = &pTSchema->columns[k];
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k);
if (k == 0) {
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
void* colData = colDataGetData(pColData, j);
tqDebug("tq sink pipe2, row %d, col %d ts %" PRId64, j, k, *(int64_t*)colData);
}
if (IS_SET_NULL(pCol)) {
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
taosArrayPush(pVals, &cv);
} else{
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
if (colDataIsNull_s(pColData, j)) {
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
taosArrayPush(pVals, &cv);
dataIndex++;
} else {
void* colData = colDataGetData(pColData, j);
if (IS_STR_DATA_TYPE(pCol->type)) {
@ -661,6 +669,8 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
taosArrayPush(pVals, &cv);
}
dataIndex++;
}
}
}
SRow* pRow = NULL;
@ -716,5 +726,6 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
_end:
taosArrayDestroy(tagArray);
taosArrayDestroy(pVals);
taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq);
// TODO: change
}

View File

@ -140,6 +140,10 @@ static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo,
SMetaReader* smrChildTable, const char* dbname, const char* tableName,
int32_t* pNumOfRows, const SSDataBlock* dataBlock);
static int32_t sysTableUserColsFillOneTableCols(const SSysTableScanInfo* pInfo, const char* dbname,
int32_t* pNumOfRows, const SSDataBlock* dataBlock,
char* tName, SSchemaWrapper* schemaRow, char* tableType);
static void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t numOfRows, SSDataBlock* dataBlock,
SFilterInfo* pFilterInfo);
@ -413,6 +417,176 @@ static bool sysTableIsCondOnOneTable(SNode* pCond, char* condTable) {
return false;
}
static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
qDebug("sysTableScanUserCols get cols start");
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSysTableScanInfo* pInfo = pOperator->info;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
blockDataCleanup(pInfo->pRes);
int32_t numOfRows = 0;
SSDataBlock* dataBlock = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_COLS);
blockDataEnsureCapacity(dataBlock, pOperator->resultInfo.capacity);
const char* db = NULL;
int32_t vgId = 0;
vnodeGetInfo(pInfo->readHandle.vnode, &db, &vgId);
SName sn = {0};
char dbname[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB);
tNameGetDbName(&sn, varDataVal(dbname));
varDataSetLen(dbname, strlen(varDataVal(dbname)));
// optimize when sql like where table_name='tablename' and xxx.
if (pInfo->req.filterTb[0]) {
char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(tableName, pInfo->req.filterTb);
SMetaReader smrTable = {0};
metaReaderInit(&smrTable, pInfo->readHandle.meta, 0);
int32_t code = metaGetTableEntryByName(&smrTable, pInfo->req.filterTb);
if (code != TSDB_CODE_SUCCESS) {
// terrno has been set by metaGetTableEntryByName, therefore, return directly
metaReaderClear(&smrTable);
blockDataDestroy(dataBlock);
pInfo->loadInfo.totalRows = 0;
return NULL;
}
if (smrTable.me.type == TSDB_SUPER_TABLE) {
metaReaderClear(&smrTable);
blockDataDestroy(dataBlock);
pInfo->loadInfo.totalRows = 0;
return NULL;
}
if (smrTable.me.type == TSDB_CHILD_TABLE) {
int64_t suid = smrTable.me.ctbEntry.suid;
metaReaderClear(&smrTable);
metaReaderInit(&smrTable, pInfo->readHandle.meta, 0);
code = metaGetTableEntryByUid(&smrTable, suid);
if (code != TSDB_CODE_SUCCESS) {
// terrno has been set by metaGetTableEntryByName, therefore, return directly
metaReaderClear(&smrTable);
blockDataDestroy(dataBlock);
pInfo->loadInfo.totalRows = 0;
return NULL;
}
}
char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
SSchemaWrapper *schemaRow = NULL;
if(smrTable.me.type == TSDB_SUPER_TABLE){
schemaRow = &smrTable.me.stbEntry.schemaRow;
STR_TO_VARSTR(typeName, "CHILD_TABLE");
}else if(smrTable.me.type == TSDB_NORMAL_TABLE){
schemaRow = &smrTable.me.ntbEntry.schemaRow;
STR_TO_VARSTR(typeName, "NORMAL_TABLE");
}
sysTableUserColsFillOneTableCols(pInfo, dbname, &numOfRows, dataBlock, tableName, schemaRow, typeName);
metaReaderClear(&smrTable);
if (numOfRows > 0) {
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
numOfRows = 0;
}
blockDataDestroy(dataBlock);
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
setOperatorCompleted(pOperator);
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
}
int32_t ret = 0;
if (pInfo->pCur == NULL) {
pInfo->pCur = metaOpenTbCursor(pInfo->readHandle.meta);
}
SHashObj *stableSchema = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
taosHashSetFreeFp(stableSchema, tDeleteSSchemaWrapperForHash);
while ((ret = metaTbCursorNext(pInfo->pCur, TSDB_TABLE_MAX)) == 0) {
char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
SSchemaWrapper *schemaRow = NULL;
if(pInfo->pCur->mr.me.type == TSDB_SUPER_TABLE){
qDebug("sysTableScanUserCols cursor get super table");
void *schema = taosHashGet(stableSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t));
if(schema == NULL){
SSchemaWrapper *schemaWrapper = tCloneSSchemaWrapper(&pInfo->pCur->mr.me.stbEntry.schemaRow);
taosHashPut(stableSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t), &schemaWrapper, POINTER_BYTES);
}
continue;
}else if (pInfo->pCur->mr.me.type == TSDB_CHILD_TABLE) {
qDebug("sysTableScanUserCols cursor get child table");
STR_TO_VARSTR(typeName, "CHILD_TABLE");
STR_TO_VARSTR(tableName, pInfo->pCur->mr.me.name);
int64_t suid = pInfo->pCur->mr.me.ctbEntry.suid;
void *schema = taosHashGet(stableSchema, &pInfo->pCur->mr.me.ctbEntry.suid, sizeof(int64_t));
if(schema != NULL){
schemaRow = *(SSchemaWrapper **)schema;
}else{
tDecoderClear(&pInfo->pCur->mr.coder);
int code = metaGetTableEntryByUid(&pInfo->pCur->mr, suid);
if (code != TSDB_CODE_SUCCESS) {
// terrno has been set by metaGetTableEntryByName, therefore, return directly
qError("sysTableScanUserCols get meta by suid:%"PRId64 " error, code:%d", suid, code);
blockDataDestroy(dataBlock);
pInfo->loadInfo.totalRows = 0;
taosHashCleanup(stableSchema);
return NULL;
}
schemaRow = &pInfo->pCur->mr.me.stbEntry.schemaRow;
}
}else if(pInfo->pCur->mr.me.type == TSDB_NORMAL_TABLE){
qDebug("sysTableScanUserCols cursor get normal table");
schemaRow = &pInfo->pCur->mr.me.ntbEntry.schemaRow;
STR_TO_VARSTR(typeName, "NORMAL_TABLE");
STR_TO_VARSTR(tableName, pInfo->pCur->mr.me.name);
}else{
qDebug("sysTableScanUserCols cursor get invalid table");
continue;
}
sysTableUserColsFillOneTableCols(pInfo, dbname, &numOfRows, dataBlock, tableName, schemaRow, typeName);
if (numOfRows >= pOperator->resultInfo.capacity) {
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
numOfRows = 0;
if (pInfo->pRes->info.rows > 0) {
break;
}
}
}
taosHashCleanup(stableSchema);
if (numOfRows > 0) {
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
numOfRows = 0;
}
blockDataDestroy(dataBlock);
if (ret != 0) {
metaCloseTbCursor(pInfo->pCur);
pInfo->pCur = NULL;
setOperatorCompleted(pOperator);
}
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
qDebug("sysTableScanUserCols get cols success, rows:%" PRIu64, pInfo->loadInfo.totalRows);
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
}
static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSysTableScanInfo* pInfo = pOperator->info;
@ -491,7 +665,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
pInfo->pCur = metaOpenTbCursor(pInfo->readHandle.meta);
}
while ((ret = metaTbCursorNext(pInfo->pCur)) == 0) {
while ((ret = metaTbCursorNext(pInfo->pCur, TSDB_SUPER_TABLE)) == 0) {
if (pInfo->pCur->mr.me.type != TSDB_CHILD_TABLE) {
continue;
}
@ -728,6 +902,67 @@ static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo,
return TSDB_CODE_SUCCESS;
}
static int32_t sysTableUserColsFillOneTableCols(const SSysTableScanInfo* pInfo, const char* dbname,
int32_t* pNumOfRows, const SSDataBlock* dataBlock, char* tName,
SSchemaWrapper* schemaRow, char* tableType) {
if(schemaRow == NULL){
qError("sysTableUserColsFillOneTableCols schemaRow is NULL");
return TSDB_CODE_SUCCESS;
}
int32_t numOfRows = *pNumOfRows;
int32_t numOfCols = schemaRow->nCols;
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfoData = NULL;
// table name
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 0);
colDataAppend(pColInfoData, numOfRows, tName, false);
// database name
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 1);
colDataAppend(pColInfoData, numOfRows, dbname, false);
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 2);
colDataAppend(pColInfoData, numOfRows, tableType, false);
// col name
char colName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(colName, schemaRow->pSchema[i].name);
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 3);
colDataAppend(pColInfoData, numOfRows, colName, false);
// col type
int8_t colType = schemaRow->pSchema[i].type;
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 4);
char colTypeStr[VARSTR_HEADER_SIZE + 32];
int colTypeLen = sprintf(varDataVal(colTypeStr), "%s", tDataTypes[colType].name);
if (colType == TSDB_DATA_TYPE_VARCHAR) {
colTypeLen += sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)",
(int32_t)(schemaRow->pSchema[i].bytes - VARSTR_HEADER_SIZE));
} else if (colType == TSDB_DATA_TYPE_NCHAR) {
colTypeLen += sprintf(
varDataVal(colTypeStr) + colTypeLen, "(%d)",
(int32_t)((schemaRow->pSchema[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
}
varDataSetLen(colTypeStr, colTypeLen);
colDataAppend(pColInfoData, numOfRows, (char*)colTypeStr, false);
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 5);
colDataAppend(pColInfoData, numOfRows, (const char*)&schemaRow->pSchema[i].bytes, false);
for (int32_t j = 6; j <= 8; ++j) {
pColInfoData = taosArrayGet(dataBlock->pDataBlock, j);
colDataAppendNULL(pColInfoData, numOfRows);
}
++numOfRows;
}
*pNumOfRows = numOfRows;
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* buildInfoSchemaTableMetaBlock(char* tableName) {
size_t size = 0;
const SSysTableMeta* pMeta = NULL;
@ -1029,7 +1264,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
char n[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
int32_t ret = 0;
while ((ret = metaTbCursorNext(pInfo->pCur)) == 0) {
while ((ret = metaTbCursorNext(pInfo->pCur, TSDB_SUPER_TABLE)) == 0) {
STR_TO_VARSTR(n, pInfo->pCur->mr.me.name);
// table name
@ -1315,12 +1550,19 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
if (pInfo->showRewrite) {
getDBNameFromCondition(pInfo->pCondition, dbName);
sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName);
}else if(strncasecmp(name, TSDB_INS_TABLE_COLS, TSDB_TABLE_FNAME_LEN) == 0){
getDBNameFromCondition(pInfo->pCondition, dbName);
if(dbName[0]) sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName);
sysTableIsCondOnOneTable(pInfo->pCondition, pInfo->req.filterTb);
}
SSDataBlock* pBlock = NULL;
if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
pBlock = sysTableScanUserTables(pOperator);
} else if (strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0) {
pBlock = sysTableScanUserTags(pOperator);
} else if (strncasecmp(name, TSDB_INS_TABLE_COLS, TSDB_TABLE_FNAME_LEN) == 0 && pInfo->readHandle.mnd == NULL) {
pBlock = sysTableScanUserCols(pOperator);
} else if (strncasecmp(name, TSDB_INS_TABLE_STABLES, TSDB_TABLE_FNAME_LEN) == 0 && pInfo->showRewrite &&
IS_SYS_DBNAME(dbName)) {
pBlock = sysTableScanUserSTables(pOperator);
@ -1391,7 +1633,7 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca
tsem_wait(&pInfo->ready);
if (pTaskInfo->code) {
qDebug("%s load meta data from mnode failed, totalRows:%" PRIu64 ", code:%s", GET_TASKID(pTaskInfo),
qError("%s load meta data from mnode failed, totalRows:%" PRIu64 ", code:%s", GET_TASKID(pTaskInfo),
pInfo->loadInfo.totalRows, tstrerror(pTaskInfo->code));
return NULL;
}
@ -1427,6 +1669,7 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode,
const char* pUser, SExecTaskInfo* pTaskInfo) {
int32_t code = TDB_CODE_SUCCESS;
SSysTableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SSysTableScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
@ -1437,7 +1680,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
int32_t num = 0;
int32_t code = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
code = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
@ -1513,7 +1756,8 @@ void destroySysScanOperator(void* param) {
const char* name = tNameGetTableName(&pInfo->name);
if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0 ||
strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) {
strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0 ||
strncasecmp(name, TSDB_INS_TABLE_COLS, TSDB_TABLE_FNAME_LEN) == 0|| pInfo->pCur != NULL) {
metaCloseTbCursor(pInfo->pCur);
pInfo->pCur = NULL;
}

View File

@ -166,7 +166,7 @@ static int32_t collectMetaKeyFromRealTableImpl(SCollectMetaKeyCxt* pCxt, const c
code = reserveDnodeRequiredInCache(pCxt->pMetaCache);
}
if (TSDB_CODE_SUCCESS == code &&
(0 == strcmp(pTable, TSDB_INS_TABLE_TAGS) || 0 == strcmp(pTable, TSDB_INS_TABLE_TABLES)) &&
(0 == strcmp(pTable, TSDB_INS_TABLE_TAGS) || 0 == strcmp(pTable, TSDB_INS_TABLE_TABLES) || 0 == strcmp(pTable, TSDB_INS_TABLE_COLS)) &&
QUERY_NODE_SELECT_STMT == nodeType(pCxt->pStmt)) {
code = collectMetaKeyFromInsTags(pCxt);
}

View File

@ -2210,7 +2210,7 @@ static int32_t dnodeToVgroupsInfo(SArray* pDnodes, SVgroupsInfo** pVgsInfo) {
}
static bool sysTableFromVnode(const char* pTable) {
return ((0 == strcmp(pTable, TSDB_INS_TABLE_TABLES)) || (0 == strcmp(pTable, TSDB_INS_TABLE_TAGS)));
return ((0 == strcmp(pTable, TSDB_INS_TABLE_TABLES)) || (0 == strcmp(pTable, TSDB_INS_TABLE_TAGS)) || (0 == strcmp(pTable, TSDB_INS_TABLE_COLS)));
}
static bool sysTableFromDnode(const char* pTable) { return 0 == strcmp(pTable, TSDB_INS_TABLE_DNODE_VARIABLES); }
@ -2278,7 +2278,8 @@ static int32_t setVnodeSysTableVgroupList(STranslateContext* pCxt, SName* pName,
((SSelectStmt*)pCxt->pCurrStmt)->isEmptyResult = true;
}
if (TSDB_CODE_SUCCESS == code && 0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TABLES) && !hasUserDbCond) {
if (TSDB_CODE_SUCCESS == code && (0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TABLES) && !hasUserDbCond) ||
0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_COLS)) {
code = addMnodeToVgroupList(&pCxt->pParseCxt->mgmtEpSet, &pVgs);
}
@ -2376,7 +2377,8 @@ static bool isSingleTable(SRealTableNode* pRealTable) {
int8_t tableType = pRealTable->pMeta->tableType;
if (TSDB_SYSTEM_TABLE == tableType) {
return 0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TABLES) &&
0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TAGS);
0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TAGS) &&
0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_COLS);
}
return (TSDB_CHILD_TABLE == tableType || TSDB_NORMAL_TABLE == tableType);
}
@ -5740,7 +5742,7 @@ static int32_t adjustDataTypeOfProjections(STranslateContext* pCxt, const STable
int32_t index = 0;
SNode* pProj = NULL;
FOREACH(pProj, pProjections) {
SSchema* pSchema = pSchemas + index;
SSchema* pSchema = pSchemas + index++;
SDataType dt = {.type = pSchema->type, .bytes = pSchema->bytes};
if (!dataTypeEqual(&dt, &((SExprNode*)pProj)->resType)) {
SNode* pFunc = NULL;
@ -5761,7 +5763,7 @@ typedef struct SProjColPos {
} SProjColPos;
static int32_t projColPosCompar(const void* l, const void* r) {
return ((SProjColPos*)l)->colId < ((SProjColPos*)r)->colId;
return ((SProjColPos*)l)->colId > ((SProjColPos*)r)->colId;
}
static void projColPosDelete(void* p) { taosMemoryFree(((SProjColPos*)p)->pProj); }
@ -5856,7 +5858,11 @@ static int32_t adjustStreamQueryForExistTable(STranslateContext* pCxt, SCreateSt
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, pStmt->targetTabName);
}
pReq->createStb = STREAM_CREATE_STABLE_TRUE;
pReq->targetStbUid = 0;
return TSDB_CODE_SUCCESS;
} else {
pReq->createStb = STREAM_CREATE_STABLE_FALSE;
pReq->targetStbUid = pMeta->suid;
}
if (TSDB_CODE_SUCCESS == code) {
code = adjustStreamQueryForExistTableImpl(pCxt, pStmt, pMeta);

View File

@ -102,6 +102,10 @@ void generateInformationSchema(MockCatalogService* mcs) {
.addColumn("table_name", TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN)
.addColumn("db_name", TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN)
.done();
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_COLS, TSDB_SYSTEM_TABLE, 2)
.addColumn("table_name", TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN)
.addColumn("db_name", TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN)
.done();
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_USER_PRIVILEGES, TSDB_SYSTEM_TABLE, 2)
.addColumn("user_name", TSDB_DATA_TYPE_BINARY, TSDB_USER_LEN)
.addColumn("privilege", TSDB_DATA_TYPE_BINARY, 10)

View File

@ -1334,11 +1334,12 @@ static int32_t smaIndexOptApplyIndex(SLogicSubplan* pLogicSubplan, SScanLogicNod
if (TSDB_CODE_SUCCESS == code) {
code = replaceLogicNode(pLogicSubplan, pScan->node.pParent, pSmaScan);
}
if (TSDB_CODE_SUCCESS == code) {
nodesDestroyNode((SNode*)pScan->node.pParent);
}
return code;
}
static void smaIndexOptDestroySmaIndex(void* p) { taosMemoryFree(((STableIndexInfo*)p)->expr); }
static int32_t smaIndexOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SScanLogicNode* pScan) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t nindexes = taosArrayGetSize(pScan->pSmaIndexes);
@ -1348,8 +1349,6 @@ static int32_t smaIndexOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogi
code = smaIndexOptCouldApplyIndex(pScan, pIndex, &pSmaCols);
if (TSDB_CODE_SUCCESS == code && NULL != pSmaCols) {
code = smaIndexOptApplyIndex(pLogicSubplan, pScan, pIndex, pSmaCols);
taosArrayDestroyEx(pScan->pSmaIndexes, smaIndexOptDestroySmaIndex);
pScan->pSmaIndexes = NULL;
pCxt->optimized = true;
break;
}

View File

@ -609,7 +609,8 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
pScan->accountId = pCxt->pPlanCxt->acctId;
pScan->sysInfo = pCxt->pPlanCxt->sysInfo;
if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_TABLES) ||
0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_TAGS)) {
0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_TAGS) ||
0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_COLS)) {
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
} else {
pSubplan->execNode.nodeId = MNODE_HANDLE;

View File

@ -247,8 +247,8 @@
,,y,script,./test.sh -f tsim/stream/fillIntervalPartitionBy.sim
,,y,script,./test.sh -f tsim/stream/fillIntervalPrevNext.sim
,,y,script,./test.sh -f tsim/stream/fillIntervalValue.sim
,,y,script,./test.sh -f tsim/stream/tableAndTag0.sim
,,y,script,./test.sh -f tsim/stream/tableAndTag1.sim
,,y,script,./test.sh -f tsim/stream/udTableAndTag0.sim
,,y,script,./test.sh -f tsim/stream/udTableAndTag1.sim
,,y,script,./test.sh -f tsim/trans/lossdata1.sim
,,y,script,./test.sh -f tsim/trans/create_db.sim
,,y,script,./test.sh -f tsim/tmq/basic1.sim
@ -1053,6 +1053,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/out_of_order.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/out_of_order.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/blockSMA.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/odbc.py
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-21561.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-20582.py

View File

@ -53,7 +53,7 @@ endi
sql select tbname from information_schema.ins_tables;
print $rows $data00
if $rows != 32 then
if $rows != 33 then
return -1
endi
if $data00 != @ins_tables@ then

View File

@ -0,0 +1,310 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
print ===== step1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print ===== step2
sql create database result vgroups 1;
sql create database test vgroups 4;
sql use test;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stable result.streamt0(ts timestamp,a int,b int) tags(ta int,tb int,tc int);
sql create stream streams0 trigger at_once into result.streamt0 as select _wstart, count(*) c1, max(a) c2 from st partition by tbname interval(10s);
sql insert into t1 values(1648791213000,1,2,3);
sql insert into t2 values(1648791213000,2,2,3);
$loop_count = 0
sql select _wstart, count(*) c1, max(a) c2 from st partition by tbname interval(10s);
print $data00, $data01, $data02
print $data10, $data11, $data12
print $data20, $data21, $data22
loop0:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from result.streamt0 order by ta;
if $rows != 2 then
print =====rows=$rows
print $data00, $data01, $data02
print $data10, $data11, $data12
print $data20, $data21, $data22
goto loop0
endi
if $data01 != 1 then
print =====data01=$data01
goto loop0
endi
if $data02 != 1 then
print =====data02=$data02
goto loop0
endi
if $data11 != 1 then
print =====data11=$data11
goto loop0
endi
if $data12 != 2 then
print =====data12=$data12
goto loop0
endi
print ===== step3
sql create database result1 vgroups 1;
sql create database test1 vgroups 4;
sql use test1;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stable result1.streamt1(ts timestamp,a int,b int,c int) tags(ta bigint unsigned,tb int,tc int);
sql create stream streams1 trigger at_once into result1.streamt1(ts,c,a,b) as select _wstart, count(*) c1, max(a),min(b) c2 from st partition by tbname interval(10s);
sql insert into t1 values(1648791213000,10,20,30);
sql insert into t2 values(1648791213000,40,50,60);
$loop_count = 0
sql select _wstart, count(*) c1, max(a),min(b) c2 from st partition by tbname interval(10s);
print $data00, $data01, $data02, $data03
print $data10, $data11, $data12, $data13
print $data20, $data21, $data22, $data23
loop1:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from result1.streamt1 order by ta;
if $rows != 2 then
print =====rows=$rows
print $data00, $data01, $data02, $data03
print $data10, $data11, $data12, $data13
print $data20, $data21, $data22, $data23
goto loop1
endi
if $data01 != 10 then
print =====data01=$data01
goto loop1
endi
if $data02 != 20 then
print =====data02=$data02
goto loop1
endi
if $data03 != 1 then
print =====data03=$data03
goto loop1
endi
if $data11 != 40 then
print =====data11=$data11
goto loop1
endi
if $data12 != 50 then
print =====data12=$data12
goto loop1
endi
if $data13 != 1 then
print =====data13=$data13
goto loop1
endi
print ===== step4
sql create database result2 vgroups 1;
sql create database test2 vgroups 4;
sql use test2;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stable result2.streamt2(ts timestamp, a int , b int) tags(ta varchar(20));
# tag dest 1, source 2
##sql_error create stream streams2 trigger at_once into result2.streamt2 TAGS(aa varchar(100), ta int) as select _wstart, count(*) c1, max(a) from st partition by tbname as aa, ta interval(10s);
# column dest 3, source 4
sql_error create stream streams2 trigger at_once into result2.streamt2 as select _wstart, count(*) c1, max(a), max(b) from st partition by tbname interval(10s);
# column dest 3, source 4
sql_error create stream streams2 trigger at_once into result2.streamt2(ts, a, b) as select _wstart, count(*) c1, max(a), max(b) from st partition by tbname interval(10s);
# column dest 3, source 2
sql_error create stream streams2 trigger at_once into result2.streamt2 as select _wstart, count(*) c1 from st partition by tbname interval(10s);
# column dest 3, source 2
sql create stream streams2 trigger at_once into result2.streamt2(ts, a) as select _wstart, count(*) c1 from st partition by tbname interval(10s);
print ===== step5
sql create database result3 vgroups 1;
sql create database test3 vgroups 4;
sql use test3;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,2,3);
sql create table t2 using st tags(4,5,6);
sql create stable result3.streamt3(ts timestamp,a int,b int,c int, d int) tags(ta int,tb int,tc int);
sql create stream streams3 trigger at_once into result3.streamt3(ts,c,a,b) as select _wstart, count(*) c1, max(a),min(b) c2 from st interval(10s);
sql insert into t1 values(1648791213000,10,20,30);
sql insert into t2 values(1648791213000,40,50,60);
$loop_count = 0
sql select _wstart, count(*) c1, max(a),min(b) c2 from st interval(10s);
print $data00, $data01, $data02, $data03, $data04
print $data10, $data11, $data12, $data13, $data14
print $data20, $data21, $data22, $data23, $data24
loop2:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from result3.streamt3;
if $rows != 1 then
print =====rows=$rows
print $data00, $data01, $data02, $data03
print $data10, $data11, $data12, $data13
print $data20, $data21, $data22, $data23
goto loop2
endi
if $data01 != 40 then
print =====data01=$data01
goto loop2
endi
if $data02 != 20 then
print =====data02=$data02
goto loop2
endi
if $data03 != 2 then
print =====data03=$data03
goto loop2
endi
if $data04 != NULL then
print =====data04=$data04
goto loop2
endi
print ===== step6
sql create database result4 vgroups 1;
sql create database test4 vgroups 4;
sql use test4;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,2,3);
sql create table t2 using st tags(4,5,6);
sql create stable result4.streamt4(ts timestamp,a int,b int,c int, d int) tags(ta int,tb int,tc int);
sql create stream streams4 trigger at_once into result4.streamt4(ts,c,a,b) tags(tg2 int, tg3 varchar(100), tg1 bigint) subtable(concat("tbl-", tg1)) as select _wstart, count(*) c1, max(a),min(b) c2 from st partition by ta+1 as tg1, cast(tb as bigint) as tg2, tc as tg3 interval(10s);
sql insert into t1 values(1648791213000,10,20,30);
sql insert into t2 values(1648791213000,40,50,60);
$loop_count = 0
sql select _wstart, count(*) c1, max(a),min(b) c2 from st interval(10s);
print $data00, $data01, $data02, $data03
print $data10, $data11, $data12, $data13
print $data20, $data21, $data22, $data23
loop2:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from result4.streamt4;
if $rows != 2 then
print =====rows=$rows
print $data00, $data01, $data02, $data03
print $data10, $data11, $data12, $data13
print $data20, $data21, $data22, $data23
goto loop2
endi
if $data01 != 40 then
print =====data01=$data01
goto loop2
endi
if $data02 != 20 then
print =====data02=$data02
goto loop2
endi
if $data03 != 2 then
print =====data03=$data03
goto loop2
endi
if $data04 != NULL then
print =====data04=$data04
goto loop2
endi
print ======over
system sh/stop_dnodes.sh

View File

@ -268,6 +268,105 @@ if $data10 != tbn-t2 then
endi
print ===== step5
print ===== tag name + table name
sql create database result4 vgroups 1;
sql create database test4 vgroups 4;
sql use test4;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create table t3 using st tags(3,3,3);
sql create stream streams4 trigger at_once into result4.streamt4 TAGS(dd varchar(100)) SUBTABLE(concat("tbn-", tbname)) as select _wstart, count(*) c1 from st partition by concat("tag-", tbname) as dd, tbname interval(10s);
sql insert into t1 values(1648791213000,1,1,1) t2 values(1648791213000,2,2,2) t3 values(1648791213000,3,3,3);
$loop_count = 0
loop7:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select table_name from information_schema.ins_tables where db_name="result4" order by 1;
if $rows != 3 then
print =====rows=$rows
print $data00 $data10
goto loop7
endi
if $data00 != tbn-t1 then
print =====data00=$data00
goto loop7
endi
if $data10 != tbn-t2 then
print =====data10=$data10
goto loop7
endi
if $data20 != tbn-t3 then
print =====data20=$data20
goto loop7
endi
$loop_count = 0
loop8:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from result4.streamt4 order by 3;
if $rows != 3 then
print =====rows=$rows
print $data00 $data10
goto loop8
endi
if $data01 != 1 then
print =====data01=$data01
goto loop8
endi
if $data02 != tag-t1 then
print =====data02=$data02
goto loop8
endi
if $data11 != 1 then
print =====data11=$data11
goto loop8
endi
if $data12 != tag-t2 then
print =====data12=$data12
goto loop8
endi
if $data21 != 1 then
print =====data21=$data21
goto loop8
endi
if $data22 != tag-t3 then
print =====data22=$data22
goto loop8
endi
print ======over
system sh/stop_dnodes.sh

View File

@ -269,6 +269,104 @@ if $data10 != tbn-2 then
goto loop6
endi
print ===== step5
print ===== tag name + table name
sql create database result4 vgroups 1;
sql create database test4 vgroups 4;
sql use test4;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create table t3 using st tags(3,3,3);
sql create stream streams4 trigger at_once into result4.streamt4 TAGS(dd varchar(100)) SUBTABLE(concat("tbn-", dd)) as select _wstart, count(*) c1 from st partition by concat("t", cast(a as varchar(10) ) ) as dd interval(10s);
sql insert into t1 values(1648791213000,1,1,1) t2 values(1648791213000,2,2,2) t3 values(1648791213000,3,3,3);
$loop_count = 0
loop7:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select table_name from information_schema.ins_tables where db_name="result4" order by 1;
if $rows != 3 then
print =====rows=$rows
print $data00 $data10
goto loop7
endi
if $data00 != tbn-t1 then
print =====data00=$data00
goto loop7
endi
if $data10 != tbn-t2 then
print =====data10=$data10
goto loop7
endi
if $data20 != tbn-t3 then
print =====data20=$data20
goto loop7
endi
$loop_count = 0
loop8:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from result4.streamt4 order by 3;
if $rows != 3 then
print =====rows=$rows
print $data00 $data10
goto loop8
endi
if $data01 != 1 then
print =====data01=$data01
goto loop8
endi
if $data02 != t1 then
print =====data02=$data02
goto loop8
endi
if $data11 != 1 then
print =====data11=$data11
goto loop8
endi
if $data12 != t2 then
print =====data12=$data12
goto loop8
endi
if $data21 != 1 then
print =====data21=$data21
goto loop8
endi
if $data22 != t3 then
print =====data22=$data22
goto loop8
endi
print ======over

View File

@ -0,0 +1,76 @@
import taos
import sys
import datetime
import inspect
from util.log import *
from util.sql import *
from util.cases import *
from util.common import tdCom
class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
def check_ins_cols(self):
tdSql.execute("create database if not exists db")
tdSql.execute("create table db.ntb (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint, c6 tinyint unsigned, c7 smallint unsigned, c8 int unsigned, c9 bigint unsigned, c10 float, c11 double, c12 varchar(100), c13 nchar(100))")
tdSql.execute("create table db.stb (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint, c6 tinyint unsigned, c7 smallint unsigned, c8 int unsigned, c9 bigint unsigned, c10 float, c11 double, c12 varchar(100), c13 nchar(100)) tags(t int)")
tdSql.execute("insert into db.ctb using db.stb tags(1) (ts, c1) values (now, 1)")
tdSql.query("select count(*) from information_schema.ins_columns")
tdSql.checkData(0, 0, 265)
tdSql.query("select * from information_schema.ins_columns where table_name = 'ntb'")
tdSql.checkRows(14)
tdSql.checkData(0, 2, "NORMAL_TABLE")
tdSql.query("select * from information_schema.ins_columns where table_name = 'stb'")
tdSql.checkRows(14)
tdSql.checkData(0, 2, "SUPER_TABLE")
tdSql.query("select db_name,table_type,col_name,col_type,col_length from information_schema.ins_columns where table_name = 'ctb'")
tdSql.checkRows(14)
tdSql.checkData(0, 0, "db")
tdSql.checkData(1, 1, "CHILD_TABLE")
tdSql.checkData(3, 2, "c3")
tdSql.checkData(4, 3, "INT")
tdSql.checkData(5, 4, 8)
tdSql.query("desc information_schema.ins_columns")
tdSql.checkRows(9)
tdSql.checkData(0, 0, "table_name")
tdSql.checkData(5, 0, "col_length")
tdSql.checkData(1, 2, 64)
def check_get_db_name(self):
buildPath = tdCom.getBuildPath()
cmdStr = '%s/build/bin/get_db_name_test'%(buildPath)
tdLog.info(cmdStr)
ret = os.system(cmdStr)
if ret != 0:
tdLog.exit("sml_test get_db_name_test != 0")
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
tdSql.prepare(replica = self.replicaVar)
tdLog.printNoPrefix("==========start check_ins_cols run ...............")
self.check_ins_cols()
tdLog.printNoPrefix("==========end check_ins_cols run ...............")
tdLog.printNoPrefix("==========start check_get_db_name run ...............")
self.check_get_db_name()
tdLog.printNoPrefix("==========end check_get_db_name run ...............")
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -116,7 +116,7 @@ class TDTestCase:
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
consumerId = 4
@ -188,7 +188,7 @@ class TDTestCase:
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor 0")

View File

@ -116,7 +116,7 @@ class TDTestCase:
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
consumerId = 4
@ -188,7 +188,7 @@ class TDTestCase:
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor 0")

View File

@ -4,6 +4,7 @@ add_executable(tmq_sim tmqSim.c)
add_executable(create_table createTable.c)
add_executable(tmq_taosx_ci tmq_taosx_ci.c)
add_executable(sml_test sml_test.c)
add_executable(get_db_name_test get_db_name_test.c)
target_link_libraries(
create_table
PUBLIC taos_static
@ -40,3 +41,11 @@ target_link_libraries(
PUBLIC common
PUBLIC os
)
target_link_libraries(
get_db_name_test
PUBLIC taos_static
PUBLIC util
PUBLIC common
PUBLIC os
)

View File

@ -0,0 +1,65 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "taos.h"
#include "types.h"
#include "tlog.h"
int get_db_test() {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
TAOS_RES *pRes = taos_query(taos, "create database if not exists sml_db vgroups 2");
taos_free_result(pRes);
pRes = taos_query(taos, "use sml_db");
int code = taos_errno(pRes);
taos_free_result(pRes);
ASSERT(code == 0);
code = taos_get_current_db(taos, NULL, 0, NULL);
ASSERT(code != 0);
int required = 0;
code = taos_get_current_db(taos, NULL, 0, &required);
ASSERT(code != 0);
ASSERT(required == 7);
char database[10] = {0};
code = taos_get_current_db(taos, database, 3, &required);
ASSERT(code != 0);
ASSERT(required == 7);
ASSERT(strcpy(database, "sm"));
char database1[10] = {0};
code = taos_get_current_db(taos, database1, 10, &required);
ASSERT(code == 0);
ASSERT(strcpy(database1, "sml_db"));
taos_close(taos);
return code;
}
int main(int argc, char *argv[]) {
int ret = 0;
ret = get_db_test();
ASSERT(!ret);
return ret;
}