Merge branch '3.0' of github.com:taosdata/TDengine into szhou/tags-after-distinct

This commit is contained in:
slzhou 2023-08-24 16:57:25 +08:00
commit d5db4b16ba
34 changed files with 3602 additions and 3374 deletions

View File

@ -91,15 +91,12 @@ ALTER TABLE [db_name.]tb_name alter_table_clause
alter_table_clause: {
alter_table_options
| ADD COLUMN col_name column_definition
| ADD COLUMN col_name column_type
| DROP COLUMN col_name
| MODIFY COLUMN col_name column_definition
| MODIFY COLUMN col_name column_type
| RENAME COLUMN old_col_name new_col_name
}
column_definition:
type_name [comment 'string_value']
alter_table_options:
alter_table_option ...

30
docs/en/12-taos-sql/06-select.md Normal file → Executable file
View File

@ -9,7 +9,7 @@ description: This document describes how to query data in TDengine.
```sql
SELECT {DATABASE() | CLIENT_VERSION() | SERVER_VERSION() | SERVER_STATUS() | NOW() | TODAY() | TIMEZONE()}
SELECT [DISTINCT] select_list
SELECT [hints] [DISTINCT] select_list
from_clause
[WHERE condition]
[partition_by_clause]
@ -21,6 +21,11 @@ SELECT [DISTINCT] select_list
[LIMIT limit_val [OFFSET offset_val]]
[>> export_file]
hints: /*+ [hint([hint_param_list])] [hint([hint_param_list])] */
hint:
BATCH_SCAN | NO_BATCH_SCAN
select_list:
select_expr [, select_expr] ...
@ -70,6 +75,29 @@ order_expr:
{expr | position | c_alias} [DESC | ASC] [NULLS FIRST | NULLS LAST]
```
## Hints
Hints are a means of user control over query optimization for individual statements. Hints will be ignore automatically if they are not applicable to the current query statement. The specific instructions are as follows:
- Hints syntax starts with `/*+` and ends with `*/`, spaces are allowed before or after.
- Hints syntax can only follow the SELECT keyword.
- Each hints can contain multiple hint, separated by spaces. When multiple hints conflict or are identical, whichever comes first takes effect.
- When an error occurs with a hint in hints, the effective hint before the error is still valid, and the current and subsequent hints are ignored.
- hint_param_list are arguments to each hint, which varies according to each hint.
The list of currently supported Hints is as follows:
| **Hint** | **Params** | **Comment** | **Scopt** |
| :-----------: | -------------- | -------------------------- | -------------------------- |
| BATCH_SCAN | None | Batch table scan | JOIN statment for stable |
| NO_BATCH_SCAN | None | Sequential table scan | JOIN statment for stable |
For example:
```sql
SELECT /*+ BATCH_SCAN() */ a.ts FROM stable1 a, stable2 b where a.tag0 = b.tag0 and a.ts = b.ts;
```
## Lists
A query can be performed on some or all columns. Data and tag columns can all be included in the SELECT list.

View File

@ -23,10 +23,7 @@ create_subtable_clause: {
}
create_definition:
col_name column_definition
column_definition:
type_name [comment 'string_value']
col_name column_type
table_options:
table_option ...
@ -92,15 +89,12 @@ ALTER TABLE [db_name.]tb_name alter_table_clause
alter_table_clause: {
alter_table_options
| ADD COLUMN col_name column_definition
| ADD COLUMN col_name column_type
| DROP COLUMN col_name
| MODIFY COLUMN col_name column_definition
| MODIFY COLUMN col_name column_type
| RENAME COLUMN old_col_name new_col_name
}
column_definition:
type_name [comment 'string_value']
alter_table_options:
alter_table_option ...

30
docs/zh/12-taos-sql/06-select.md Normal file → Executable file
View File

@ -9,7 +9,7 @@ description: 查询数据的详细语法
```sql
SELECT {DATABASE() | CLIENT_VERSION() | SERVER_VERSION() | SERVER_STATUS() | NOW() | TODAY() | TIMEZONE()}
SELECT [DISTINCT] select_list
SELECT [hints] [DISTINCT] select_list
from_clause
[WHERE condition]
[partition_by_clause]
@ -21,6 +21,11 @@ SELECT [DISTINCT] select_list
[LIMIT limit_val [OFFSET offset_val]]
[>> export_file]
hints: /*+ [hint([hint_param_list])] [hint([hint_param_list])] */
hint:
BATCH_SCAN | NO_BATCH_SCAN
select_list:
select_expr [, select_expr] ...
@ -70,6 +75,29 @@ order_expr:
{expr | position | c_alias} [DESC | ASC] [NULLS FIRST | NULLS LAST]
```
## Hints
Hints 是用户控制单个语句查询优化的一种手段,当 Hint 不适用于当前的查询语句时会被自动忽略,具体说明如下:
- Hints 语法以`/*+`开始,终于`*/`,前后可有空格。
- Hints 语法只能跟随在 SELECT 关键字后。
- 每个 Hints 可以包含多个 HintHint 间以空格分开,当多个 Hint 冲突或相同时以先出现的为准。
- 当 Hints 中某个 Hint 出现错误时,错误出现之前的有效 Hint 仍然有效,当前及之后的 Hint 被忽略。
- hint_param_list 是每个 Hint 的参数,根据每个 Hint 的不同而不同。
目前支持的 Hints 列表如下:
| **Hint** | **参数** | **说明** | **适用范围** |
| :-----------: | -------------- | -------------------------- | -------------------------- |
| BATCH_SCAN | 无 | 采用批量读表的方式 | 超级表 JOIN 语句 |
| NO_BATCH_SCAN | 无 | 采用顺序读表的方式 | 超级表 JOIN 语句 |
举例:
```sql
SELECT /*+ BATCH_SCAN() */ a.ts FROM stable1 a, stable2 b where a.tag0 = b.tag0 and a.ts = b.ts;
```
## 列表
查询语句可以指定部分或全部列作为返回结果。数据列和标签列都可以出现在列表中。

View File

@ -130,6 +130,7 @@ extern bool tsKeepColumnName;
extern bool tsEnableQueryHb;
extern bool tsEnableScience;
extern bool tsTtlChangeOnWrite;
extern int32_t tsTtlFlushThreshold;
extern int32_t tsRedirectPeriod;
extern int32_t tsRedirectFactor;
extern int32_t tsRedirectMaxPeriod;
@ -186,7 +187,9 @@ extern int32_t tsTransPullupInterval;
extern int32_t tsMqRebalanceInterval;
extern int32_t tsStreamCheckpointTickInterval;
extern int32_t tsTtlUnit;
extern int32_t tsTtlPushInterval;
extern int32_t tsTtlPushIntervalSec;
extern int32_t tsTtlBatchDropNum;
extern int32_t tsTrimVDbIntervalSec;
extern int32_t tsGrantHBInterval;
extern int32_t tsUptimeInterval;

View File

@ -441,7 +441,6 @@ typedef struct SField {
uint8_t type;
int8_t flags;
int32_t bytes;
char comment[TSDB_COL_COMMENT_LEN];
} SField;
typedef struct SRetention {
@ -520,7 +519,6 @@ struct SSchema {
col_id_t colId;
int32_t bytes;
char name[TSDB_COL_NAME_LEN];
char comment[TSDB_COL_COMMENT_LEN];
};
struct SSchema2 {
@ -1161,6 +1159,9 @@ int32_t tDeserializeSVTrimDbReq(void* buf, int32_t bufLen, SVTrimDbReq* pReq);
typedef struct {
int32_t timestampSec;
int32_t ttlDropMaxCount;
int32_t nUids;
SArray* pTbUids;
} SVDropTtlTableReq;
int32_t tSerializeSVDropTtlTableReq(void* buf, int32_t bufLen, SVDropTtlTableReq* pReq);
@ -2630,9 +2631,6 @@ typedef struct {
int8_t type;
int8_t flags;
int32_t bytes;
bool hasColComment;
char* colComment;
int32_t colCommentLen;
// TSDB_ALTER_TABLE_DROP_COLUMN
// TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES
int8_t colModType;

View File

@ -65,7 +65,7 @@ enum {
#define TD_NEW_MSG_SEG(TYPE) TYPE = ((TYPE##_SEG_CODE) << 8),
#define TD_DEF_MSG_TYPE(TYPE, MSG, REQ, RSP) TYPE, TYPE##_RSP,
enum {
enum { // WARN: new msg should be appended to segment tail
#endif
TD_NEW_MSG_SEG(TDMT_DND_MSG)
TD_DEF_MSG_TYPE(TDMT_DND_CREATE_MNODE, "dnode-create-mnode", NULL, NULL)
@ -89,15 +89,15 @@ enum {
TD_NEW_MSG_SEG(TDMT_MND_MSG)
TD_DEF_MSG_TYPE(TDMT_MND_CONNECT, "connect", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_ACCT, "create-acct", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_ACCT, "create-acct", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_ACCT, "alter-acct", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_ACCT, "drop-acct", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_USER, "create-user", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_USER, "create-user", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_USER, "alter-user", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_USER, "drop-user", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GET_USER_AUTH, "get-user-auth", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_USER, "drop-user", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GET_USER_AUTH, "get-user-auth", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_DNODE, "create-dnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CONFIG_DNODE, "config-dnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CONFIG_DNODE, "config-dnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_DNODE, "drop-dnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_MNODE, "create-mnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_MNODE, "alter-mnode", NULL, NULL)
@ -182,6 +182,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_RESTORE_DNODE, "restore-dnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_PAUSE_STREAM, "pause-stream", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_RESUME_STREAM, "resume-stream", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TRIM_DB_TIMER, "trim-db-tmr", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_VND_MSG)
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp)
@ -296,7 +297,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_SYNC_PRE_SNAPSHOT_REPLY, "sync-pre-snapshot-reply", NULL, NULL) // no longer used
TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_FORCE_FOLLOWER, "sync-force-become-follower", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY, "vnode-stream-scan-history", NULL, NULL)

View File

@ -16,7 +16,6 @@
#ifndef _TD_COMMON_TOKEN_H_
#define _TD_COMMON_TOKEN_H_
#define TK_OR 1
#define TK_AND 2
#define TK_UNION 3
@ -131,25 +130,25 @@
#define TK_NK_EQ 112
#define TK_USING 113
#define TK_TAGS 114
#define TK_COMMENT 115
#define TK_BOOL 116
#define TK_TINYINT 117
#define TK_SMALLINT 118
#define TK_INT 119
#define TK_INTEGER 120
#define TK_BIGINT 121
#define TK_FLOAT 122
#define TK_DOUBLE 123
#define TK_BINARY 124
#define TK_NCHAR 125
#define TK_UNSIGNED 126
#define TK_JSON 127
#define TK_VARCHAR 128
#define TK_MEDIUMBLOB 129
#define TK_BLOB 130
#define TK_VARBINARY 131
#define TK_GEOMETRY 132
#define TK_DECIMAL 133
#define TK_BOOL 115
#define TK_TINYINT 116
#define TK_SMALLINT 117
#define TK_INT 118
#define TK_INTEGER 119
#define TK_BIGINT 120
#define TK_FLOAT 121
#define TK_DOUBLE 122
#define TK_BINARY 123
#define TK_NCHAR 124
#define TK_UNSIGNED 125
#define TK_JSON 126
#define TK_VARCHAR 127
#define TK_MEDIUMBLOB 128
#define TK_BLOB 129
#define TK_VARBINARY 130
#define TK_GEOMETRY 131
#define TK_DECIMAL 132
#define TK_COMMENT 133
#define TK_MAX_DELAY 134
#define TK_WATERMARK 135
#define TK_ROLLUP 136

View File

@ -23,11 +23,10 @@ extern "C" {
#include "query.h"
#include "querynodes.h"
#define DESCRIBE_RESULT_COLS 5
#define DESCRIBE_RESULT_FIELD_LEN (TSDB_COL_NAME_LEN - 1 + VARSTR_HEADER_SIZE)
#define DESCRIBE_RESULT_TYPE_LEN (20 + VARSTR_HEADER_SIZE)
#define DESCRIBE_RESULT_NOTE_LEN (8 + VARSTR_HEADER_SIZE)
#define DESCRIBE_RESULT_COL_COMMENT_LEN (TSDB_COL_COMMENT_LEN)
#define DESCRIBE_RESULT_COLS 4
#define DESCRIBE_RESULT_FIELD_LEN (TSDB_COL_NAME_LEN - 1 + VARSTR_HEADER_SIZE)
#define DESCRIBE_RESULT_TYPE_LEN (20 + VARSTR_HEADER_SIZE)
#define DESCRIBE_RESULT_NOTE_LEN (8 + VARSTR_HEADER_SIZE)
#define SHOW_CREATE_DB_RESULT_COLS 2
#define SHOW_CREATE_DB_RESULT_FIELD1_LEN (TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE)
@ -156,7 +155,7 @@ typedef struct SColumnDefNode {
ENodeType type;
char colName[TSDB_COL_NAME_LEN];
SDataType dataType;
char comments[TSDB_COL_COMMENT_LEN];
char comments[TSDB_TB_COMMENT_LEN];
bool sma;
} SColumnDefNode;
@ -215,7 +214,6 @@ typedef struct SAlterTableStmt {
char newColName[TSDB_COL_NAME_LEN];
STableOptions* pOptions;
SDataType dataType;
char colComment[TSDB_COL_COMMENT_LEN];
SValueNode* pVal;
} SAlterTableStmt;

View File

@ -228,9 +228,8 @@ typedef enum ELogicConditionType {
#define TSDB_MAX_SQL_SHOW_LEN 1024
#define TSDB_MAX_ALLOWED_SQL_LEN (1 * 1024 * 1024u) // sql length should be less than 1mb
#define TSDB_APP_NAME_LEN TSDB_UNI_LEN
#define TSDB_TB_COMMENT_LEN 1025
#define TSDB_COL_COMMENT_LEN 1025
#define TSDB_APP_NAME_LEN TSDB_UNI_LEN
#define TSDB_TB_COMMENT_LEN 1025
#define TSDB_QUERY_ID_LEN 26
#define TSDB_TRANS_OPER_LEN 16

View File

@ -92,6 +92,10 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqSubscriptionIm
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitSync(JNIEnv *, jobject, jlong, jlong);
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAllSync(JNIEnv *, jobject, jlong);
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitOffsetSyncImp(JNIEnv *, jobject, jlong, jstring,
jint, jlong);
/*
* Class: com_taosdata_jdbc_tmq_TMQConnector
* Method: tmqCommitAsync
@ -102,6 +106,12 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JN
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsync(JNIEnv *, jobject, jlong, jlong,
jobject);
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAllAsync(JNIEnv *, jobject, jlong,
jobject);
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitOffsetAsync(JNIEnv *, jobject, jlong,
jstring, jint, jlong, jobject);
/*
* Class: com_taosdata_jdbc_tmq_TMQConnector
* Method: tmqUnsubscribeImp
@ -179,6 +189,11 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqSeekImp(JNIEnv
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssignmentImp(JNIEnv *, jobject, jlong,
jstring, jobject);
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommittedImp(JNIEnv *, jobject, jlong, jstring,
jint);
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqPositionImp(JNIEnv *, jobject, jlong, jstring, jint);
#ifdef __cplusplus
}
#endif

View File

@ -291,6 +291,39 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitSync(JNI
TAOS_RES *res = (TAOS_RES *)jres;
return tmq_commit_sync(tmq, res);
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAllSync(JNIEnv *env, jobject jobj, jlong jtmq) {
tmq_t *tmq = (tmq_t *)jtmq;
if (tmq == NULL) {
jniError("jobj:%p, tmq is closed", jobj);
return TMQ_CONSUMER_NULL;
}
return tmq_commit_sync(tmq, NULL);
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitOffsetSyncImp(JNIEnv *env, jobject jobj,
jlong jtmq, jstring jtopic,
jint vgId, jlong offset) {
tmq_t *tmq = (tmq_t *)jtmq;
if (tmq == NULL) {
jniDebug("jobj:%p, tmq is closed", jobj);
return TMQ_CONSUMER_NULL;
}
if (jtopic == NULL) {
jniDebug("jobj:%p, topic is null", jobj);
return TMQ_TOPIC_NULL;
}
const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL);
int code = tmq_commit_offset_sync(tmq, topicName, vgId, offset);
if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, tmq commit offset error, code:%d, msg:%s", jobj, code, tmq_err2str(code));
}
(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
return code;
}
// deprecated
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JNIEnv *env, jobject jobj, jlong jtmq,
@ -319,6 +352,27 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsy
tmq_commit_async(tmq, res, consumer_callback, offset);
}
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAllAsync(JNIEnv *env, jobject jobj,
jlong jtmq, jobject offset) {
tmqGlobalMethod(env);
tmq_t *tmq = (tmq_t *)jtmq;
offset = (*env)->NewGlobalRef(env, offset);
tmq_commit_async(tmq, NULL, consumer_callback, offset);
}
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitOffsetAsync(JNIEnv *env, jobject jobj,
jlong jtmq, jstring jtopic,
jint vgId, jlong offset,
jobject callback) {
tmqGlobalMethod(env);
tmq_t *tmq = (tmq_t *)jtmq;
const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL);
callback = (*env)->NewGlobalRef(env, callback);
tmq_commit_offset_async(tmq, topicName, vgId, offset, consumer_callback, callback);
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqUnsubscribeImp(JNIEnv *env, jobject jobj,
jlong jtmq) {
tmq_t *tmq = (tmq_t *)jtmq;
@ -497,9 +551,9 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssign
int32_t res = tmq_get_topic_assignment(tmq, topicName, &pAssign, &numOfAssignment);
if (res != TSDB_CODE_SUCCESS) {
(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
jniError("jobj:%p, tmq get topic assignment error, topic:%s, code:%d, msg:%s", jobj, topicName, res,
tmq_err2str(res));
(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
tmq_free_assignment(pAssign);
return (jint)res;
}
@ -518,3 +572,55 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssign
tmq_free_assignment(pAssign);
return JNI_SUCCESS;
}
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommittedImp(JNIEnv *env, jobject jobj, jlong jtmq,
jstring jtopic, jint vgId) {
tmq_t *tmq = (tmq_t *)jtmq;
if (tmq == NULL) {
jniDebug("jobj:%p, tmq is closed", jobj);
return TMQ_CONSUMER_NULL;
}
if (jtopic == NULL) {
jniDebug("jobj:%p, topic is null", jobj);
return TMQ_TOPIC_NULL;
}
const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL);
int64_t offset = tmq_committed(tmq, topicName, vgId);
if (offset < JNI_SUCCESS && offset != -2147467247) {
jniError("jobj:%p, tmq get committed offset error, topic:%s, vgId:%d, code:0x%" PRIx64 ", msg:%s", jobj, topicName,
vgId, offset, tmq_err2str(offset));
}
(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
return (jlong)offset;
}
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqPositionImp(JNIEnv *env, jobject jobj, jlong jtmq,
jstring jtopic, jint vgId) {
tmq_t *tmq = (tmq_t *)jtmq;
if (tmq == NULL) {
jniDebug("jobj:%p, tmq is closed", jobj);
return TMQ_CONSUMER_NULL;
}
if (jtopic == NULL) {
jniDebug("jobj:%p, topic is null", jobj);
return TMQ_TOPIC_NULL;
}
const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL);
int64_t offset = tmq_position(tmq, topicName, vgId);
if (offset < JNI_SUCCESS) {
jniError("jobj:%p, tmq get position error, topic:%s, vgId:%d, code:0x%" PRIx64 ", msg:%s", jobj, topicName, vgId,
offset, tmq_err2str(offset));
}
(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
return (jlong)offset;
}

View File

@ -124,7 +124,6 @@ int32_t tsQueryRspPolicy = 0;
int64_t tsQueryMaxConcurrentTables = 200; // unit is TSDB_TABLE_NUM_UNIT
bool tsEnableQueryHb = true;
bool tsEnableScience = false; // on taos-cli show float and doulbe with scientific notation if true
bool tsTtlChangeOnWrite = false; // ttl delete time changes on last write if true
int32_t tsQuerySmaOptimize = 0;
int32_t tsQueryRsmaTolerance = 1000; // the tolerance time (ms) to judge from which level to query rsma data.
bool tsQueryPlannerTrace = false;
@ -226,12 +225,20 @@ bool tsStartUdfd = true;
// wal
int64_t tsWalFsyncDataSizeLimit = (100 * 1024 * 1024L);
// ttl
bool tsTtlChangeOnWrite = false; // if true, ttl delete time changes on last write
int32_t tsTtlFlushThreshold = 100; /* maximum number of dirty items in memory.
* if -1, flush will not be triggered by write-ops
*/
int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch
// internal
int32_t tsTransPullupInterval = 2;
int32_t tsMqRebalanceInterval = 2;
int32_t tsStreamCheckpointTickInterval = 1;
int32_t tsTtlUnit = 86400;
int32_t tsTtlPushInterval = 3600;
int32_t tsTtlPushIntervalSec = 10;
int32_t tsTrimVDbIntervalSec = 60 * 60; // interval of trimming db in all vgroups
int32_t tsGrantHBInterval = 60;
int32_t tsUptimeInterval = 300; // seconds
char tsUdfdResFuncs[512] = ""; // udfd resident funcs that teardown when udfd exits
@ -604,8 +611,11 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "ttlUnit", tsTtlUnit, 1, 86400 * 365, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "ttlPushInterval", tsTtlPushInterval, 1, 100000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "ttlPushInterval", tsTtlPushIntervalSec, 1, 100000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "ttlBatchDropNum", tsTtlBatchDropNum, 0, INT32_MAX, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddBool(pCfg, "ttlChangeOnWrite", tsTtlChangeOnWrite, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "ttlFlushThreshold", tsTtlFlushThreshold, -1, 1000000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "trimVDbIntervalSec", tsTrimVDbIntervalSec, 1, 100000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "uptimeInterval", tsUptimeInterval, 1, 100000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, CFG_SCOPE_SERVER) != 0) return -1;
@ -994,6 +1004,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsEnableTelem = cfgGetItem(pCfg, "telemetryReporting")->bval;
tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval;
tsTtlChangeOnWrite = cfgGetItem(pCfg, "ttlChangeOnWrite")->bval;
tsTtlFlushThreshold = cfgGetItem(pCfg, "ttlFlushThreshold")->i32;
tsTelemInterval = cfgGetItem(pCfg, "telemetryInterval")->i32;
tstrncpy(tsTelemServer, cfgGetItem(pCfg, "telemetryServer")->str, TSDB_FQDN_LEN);
tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32;
@ -1003,7 +1014,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32;
tsMqRebalanceInterval = cfgGetItem(pCfg, "mqRebalanceInterval")->i32;
tsTtlUnit = cfgGetItem(pCfg, "ttlUnit")->i32;
tsTtlPushInterval = cfgGetItem(pCfg, "ttlPushInterval")->i32;
tsTtlPushIntervalSec = cfgGetItem(pCfg, "ttlPushInterval")->i32;
tsTtlBatchDropNum = cfgGetItem(pCfg, "ttlBatchDropNum")->i32;
tsTrimVDbIntervalSec = cfgGetItem(pCfg, "trimVDbIntervalSec")->i32;
tsUptimeInterval = cfgGetItem(pCfg, "uptimeInterval")->i32;
tsQueryRsmaTolerance = cfgGetItem(pCfg, "queryRsmaTolerance")->i32;
@ -1405,13 +1418,19 @@ int32_t taosApplyLocalCfg(SConfig *pCfg, char *name) {
} else if (strcasecmp("ttlUnit", name) == 0) {
tsTtlUnit = cfgGetItem(pCfg, "ttlUnit")->i32;
} else if (strcasecmp("ttlPushInterval", name) == 0) {
tsTtlPushInterval = cfgGetItem(pCfg, "ttlPushInterval")->i32;
tsTtlPushIntervalSec = cfgGetItem(pCfg, "ttlPushInterval")->i32;
} else if (strcasecmp("ttlBatchDropNum", name) == 0) {
tsTtlBatchDropNum = cfgGetItem(pCfg, "ttlBatchDropNum")->i32;
} else if (strcasecmp("trimVDbIntervalSec", name) == 0) {
tsTrimVDbIntervalSec = cfgGetItem(pCfg, "trimVDbIntervalSec")->i32;
} else if (strcasecmp("tmrDebugFlag", name) == 0) {
tmrDebugFlag = cfgGetItem(pCfg, "tmrDebugFlag")->i32;
} else if (strcasecmp("tsdbDebugFlag", name) == 0) {
tsdbDebugFlag = cfgGetItem(pCfg, "tsdbDebugFlag")->i32;
} else if (strcasecmp("tqDebugFlag", name) == 0) {
tqDebugFlag = cfgGetItem(pCfg, "tqDebugFlag")->i32;
} else if (strcasecmp("ttlFlushThreshold", name) == 0) {
tsTtlFlushThreshold = cfgGetItem(pCfg, "ttlFlushThreshold")->i32;
}
break;
}
@ -1613,6 +1632,20 @@ void taosCfgDynamicOptions(const char *option, const char *value) {
return;
}
if (strcasecmp(option, "ttlPushInterval") == 0) {
int32_t newTtlPushInterval = atoi(value);
uInfo("ttlPushInterval set from %d to %d", tsTtlPushIntervalSec, newTtlPushInterval);
tsTtlPushIntervalSec = newTtlPushInterval;
return;
}
if (strcasecmp(option, "ttlBatchDropNum") == 0) {
int32_t newTtlBatchDropNum = atoi(value);
uInfo("ttlBatchDropNum set from %d to %d", tsTtlBatchDropNum, newTtlBatchDropNum);
tsTtlBatchDropNum = newTtlBatchDropNum;
return;
}
const char *options[] = {
"dDebugFlag", "vDebugFlag", "mDebugFlag", "wDebugFlag", "sDebugFlag", "tsdbDebugFlag", "tqDebugFlag",
"fsDebugFlag", "udfDebugFlag", "smaDebugFlag", "idxDebugFlag", "tdbDebugFlag", "tmrDebugFlag", "uDebugFlag",

View File

@ -534,7 +534,6 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
if (tEncodeI8(&encoder, pField->flags) < 0) return -1;
if (tEncodeI32(&encoder, pField->bytes) < 0) return -1;
if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
if (tEncodeCStr(&encoder, pField->comment) < 0) return -1;
}
for (int32_t i = 0; i < pReq->numOfTags; ++i) {
@ -543,7 +542,6 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
if (tEncodeI8(&encoder, pField->flags) < 0) return -1;
if (tEncodeI32(&encoder, pField->bytes) < 0) return -1;
if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
if (tEncodeCStr(&encoder, pField->comment) < 0) return -1;
}
for (int32_t i = 0; i < pReq->numOfFuncs; ++i) {
@ -610,7 +608,6 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
if (tDecodeI8(&decoder, &field.flags) < 0) return -1;
if (tDecodeI32(&decoder, &field.bytes) < 0) return -1;
if (tDecodeCStrTo(&decoder, field.name) < 0) return -1;
if (tDecodeCStrTo(&decoder, field.comment) < 0) return -1;
if (taosArrayPush(pReq->pColumns, &field) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
@ -623,7 +620,6 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
if (tDecodeI8(&decoder, &field.flags) < 0) return -1;
if (tDecodeI32(&decoder, &field.bytes) < 0) return -1;
if (tDecodeCStrTo(&decoder, field.name) < 0) return -1;
if (tDecodeCStrTo(&decoder, field.comment) < 0) return -1;
if (taosArrayPush(pReq->pTags, &field) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
@ -2327,7 +2323,7 @@ int32_t tDeserializeSTableCfgRsp(void *buf, int32_t bufLen, STableCfgRsp *pRsp)
}
int32_t totalCols = pRsp->numOfTags + pRsp->numOfColumns;
pRsp->pSchemas = taosMemoryCalloc(totalCols, sizeof(SSchema));
pRsp->pSchemas = taosMemoryMalloc(sizeof(SSchema) * totalCols);
if (pRsp->pSchemas == NULL) return -1;
for (int32_t i = 0; i < totalCols; ++i) {
@ -3179,6 +3175,12 @@ int32_t tSerializeSVDropTtlTableReq(void *buf, int32_t bufLen, SVDropTtlTableReq
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->timestampSec) < 0) return -1;
if (tEncodeI32(&encoder, pReq->ttlDropMaxCount) < 0) return -1;
if (tEncodeI32(&encoder, pReq->nUids) < 0) return -1;
for (int32_t i = 0; i < pReq->nUids; ++i) {
tb_uid_t *pTbUid = taosArrayGet(pReq->pTbUids, i);
if (tEncodeI64(&encoder, *pTbUid) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -3192,6 +3194,30 @@ int32_t tDeserializeSVDropTtlTableReq(void *buf, int32_t bufLen, SVDropTtlTableR
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->timestampSec) < 0) return -1;
pReq->ttlDropMaxCount = INT32_MAX;
pReq->nUids = 0;
pReq->pTbUids = NULL;
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI32(&decoder, &pReq->ttlDropMaxCount) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->nUids) < 0) return -1;
if (pReq->nUids > 0) {
pReq->pTbUids = taosArrayInit(pReq->nUids, sizeof(tb_uid_t));
if (pReq->pTbUids == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
}
tb_uid_t tbUid = 0;
for (int32_t i = 0; i < pReq->nUids; ++i) {
if (tDecodeI64(&decoder, &tbUid) < 0) return -1;
if (taosArrayPush(pReq->pTbUids, &tbUid) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
}
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
@ -3710,7 +3736,7 @@ static int32_t tDecodeSTableMetaRsp(SDecoder *pDecoder, STableMetaRsp *pRsp) {
int32_t totalCols = pRsp->numOfTags + pRsp->numOfColumns;
if (totalCols > 0) {
pRsp->pSchemas = taosMemoryCalloc(totalCols, sizeof(SSchema));
pRsp->pSchemas = taosMemoryMalloc(sizeof(SSchema) * totalCols);
if (pRsp->pSchemas == NULL) return -1;
for (int32_t i = 0; i < totalCols; ++i) {

View File

@ -33,10 +33,10 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
return -1;
}
SMnodeOpt option = {.deploy = true, .numOfReplicas = createReq.replica,
.numOfTotalReplicas = createReq.replica + createReq.learnerReplica,
SMnodeOpt option = {.deploy = true, .numOfReplicas = createReq.replica,
.numOfTotalReplicas = createReq.replica + createReq.learnerReplica,
.selfIndex = -1, .lastIndex = createReq.lastIndex};
memcpy(option.replicas, createReq.replicas, sizeof(createReq.replicas));
for (int32_t i = 0; i < createReq.replica; ++i) {
if (createReq.replicas[i].id == pInput->pData->dnodeId) {
@ -191,6 +191,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -1123,6 +1123,36 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
strcpy(dcfgReq.config, "keeptimeoffset");
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
} else if (strncasecmp(cfgReq.config, "ttlpushinterval", 14) == 0) {
int32_t optLen = strlen("ttlpushinterval");
int32_t flag = -1;
int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag);
if (code < 0) return code;
if (flag < 0 || flag > 100000) {
mError("dnode:%d, failed to config ttlPushInterval since value:%d. Valid range: [0, 100000]", cfgReq.dnodeId,
flag);
terrno = TSDB_CODE_INVALID_CFG;
return -1;
}
strcpy(dcfgReq.config, "ttlpushinterval");
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
} else if (strncasecmp(cfgReq.config, "ttlbatchdropnum", 15) == 0) {
int32_t optLen = strlen("ttlbatchdropnum");
int32_t flag = -1;
int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag);
if (code < 0) return code;
if (flag < 0) {
mError("dnode:%d, failed to config ttlBatchDropNum since value:%d. Valid range: [0, %d]", cfgReq.dnodeId,
flag, INT32_MAX);
terrno = TSDB_CODE_INVALID_CFG;
return -1;
}
strcpy(dcfgReq.config, "ttlbatchdropnum");
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
#ifdef TD_ENTERPRISE
} else if (strncasecmp(cfgReq.config, "activeCode", 10) == 0 || strncasecmp(cfgReq.config, "cActiveCode", 11) == 0) {
int8_t opt = strncasecmp(cfgReq.config, "a", 1) == 0 ? DND_ACTIVE_CODE : DND_CONN_ACTIVE_CODE;
@ -1376,7 +1406,7 @@ static int32_t mndMCfgGetValInt32(SMCfgDnodeReq *pMCfgReq, int32_t opLen, int32_
return 0;
_err:
mError("dnode:%d, failed to config keeptimeoffset since invalid conf:%s", pMCfgReq->dnodeId, pMCfgReq->config);
mError("dnode:%d, failed to config since invalid conf:%s", pMCfgReq->dnodeId, pMCfgReq->config);
terrno = TSDB_CODE_INVALID_CFG;
return -1;
}

View File

@ -119,6 +119,14 @@ static void mndPullupTtl(SMnode *pMnode) {
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
static void mndPullupTrimDb(SMnode *pMnode) {
mTrace("pullup trim");
int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRIM_DB_TIMER, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
static void mndCalMqRebalance(SMnode *pMnode) {
mTrace("calc mq rebalance");
int32_t contLen = 0;
@ -255,10 +263,14 @@ static void *mndThreadFp(void *param) {
if (lastTime % 10 != 0) continue;
int64_t sec = lastTime / 10;
if (sec % tsTtlPushInterval == 0) {
if (sec % tsTtlPushIntervalSec == 0) {
mndPullupTtl(pMnode);
}
if (sec % tsTrimVDbIntervalSec == 0) {
mndPullupTrimDb(pMnode);
}
if (sec % tsTransPullupInterval == 0) {
mndPullupTrans(pMnode);
}
@ -661,7 +673,7 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
_OVER:
if (pMsg->msgType == TDMT_MND_TMQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER ||
pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER ||
pMsg->msgType == TDMT_MND_UPTIME_TIMER) {
pMsg->msgType == TDMT_MND_TRIM_DB_TIMER || pMsg->msgType == TDMT_MND_UPTIME_TIMER) {
mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored,
pMnode->stopped, state.restored, syncStr(state.state));
return -1;

View File

@ -40,10 +40,12 @@ static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb);
static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb);
static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew);
static int32_t mndProcessTtlTimer(SRpcMsg *pReq);
static int32_t mndProcessTrimDbTimer(SRpcMsg *pReq);
static int32_t mndProcessCreateStbReq(SRpcMsg *pReq);
static int32_t mndProcessAlterStbReq(SRpcMsg *pReq);
static int32_t mndProcessDropStbReq(SRpcMsg *pReq);
static int32_t mndProcessDropTtltbReq(SRpcMsg *pReq);
static int32_t mndProcessDropTtltbRsp(SRpcMsg *pReq);
static int32_t mndProcessTrimDbRsp(SRpcMsg *pReq);
static int32_t mndProcessTableMetaReq(SRpcMsg *pReq);
static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
@ -72,11 +74,13 @@ int32_t mndInitStb(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessAlterStbReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcessDropStbReq);
mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TTL_TABLE_RSP, mndProcessDropTtltbReq);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TTL_TABLE_RSP, mndProcessDropTtltbRsp);
mndSetMsgHandle(pMnode, TDMT_VND_TRIM_RSP, mndProcessTrimDbRsp);
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_TABLE_META, mndProcessTableMetaReq);
mndSetMsgHandle(pMnode, TDMT_MND_TTL_TIMER, mndProcessTtlTimer);
mndSetMsgHandle(pMnode, TDMT_MND_TRIM_DB_TIMER, mndProcessTrimDbTimer);
mndSetMsgHandle(pMnode, TDMT_MND_TABLE_CFG, mndProcessTableCfgReq);
// mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE, mndProcessRetrieveStbReq);
@ -835,7 +839,6 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
pSchema->bytes = pField->bytes;
pSchema->flags = pField->flags;
memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
memcpy(pSchema->comment, pField->comment, TSDB_COL_COMMENT_LEN);
pSchema->colId = pDst->nextColId;
pDst->nextColId++;
}
@ -849,7 +852,6 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
SSCHMEA_SET_IDX_ON(pSchema);
}
memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
memcpy(pSchema->comment, pField->comment, TSDB_COL_COMMENT_LEN);
pSchema->colId = pDst->nextColId;
pDst->nextColId++;
}
@ -919,11 +921,12 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) {
SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL;
void *pIter = NULL;
SVDropTtlTableReq ttlReq = {.timestampSec = taosGetTimestampSec()};
int32_t reqLen = tSerializeSVDropTtlTableReq(NULL, 0, &ttlReq);
int32_t contLen = reqLen + sizeof(SMsgHead);
SVDropTtlTableReq ttlReq = {
.timestampSec = taosGetTimestampSec(), .ttlDropMaxCount = tsTtlBatchDropNum, .nUids = 0, .pTbUids = NULL};
int32_t reqLen = tSerializeSVDropTtlTableReq(NULL, 0, &ttlReq);
int32_t contLen = reqLen + sizeof(SMsgHead);
mInfo("start to process ttl timer");
mDebug("start to process ttl timer");
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
@ -936,7 +939,7 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) {
}
pHead->contLen = htonl(contLen);
pHead->vgId = htonl(pVgroup->vgId);
tSerializeSVDropTtlTableReq((char *)pHead + sizeof(SMsgHead), contLen, &ttlReq);
tSerializeSVDropTtlTableReq((char *)pHead + sizeof(SMsgHead), reqLen, &ttlReq);
SRpcMsg rpcMsg = {.msgType = TDMT_VND_DROP_TTL_TABLE, .pCont = pHead, .contLen = contLen, .info = pReq->info};
SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
@ -944,7 +947,44 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) {
if (code != 0) {
mError("vgId:%d, failed to send drop ttl table request to vnode since 0x%x", pVgroup->vgId, code);
} else {
mInfo("vgId:%d, send drop ttl table request to vnode, time:%" PRId32, pVgroup->vgId, ttlReq.timestampSec);
mDebug("vgId:%d, send drop ttl table request to vnode, time:%" PRId32, pVgroup->vgId, ttlReq.timestampSec);
}
sdbRelease(pSdb, pVgroup);
}
return 0;
}
static int32_t mndProcessTrimDbTimer(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL;
void *pIter = NULL;
SVTrimDbReq trimReq = {.timestamp = taosGetTimestampSec()};
int32_t reqLen = tSerializeSVTrimDbReq(NULL, 0, &trimReq);
int32_t contLen = reqLen + sizeof(SMsgHead);
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
SMsgHead *pHead = rpcMallocCont(contLen);
if (pHead == NULL) {
sdbCancelFetch(pSdb, pVgroup);
sdbRelease(pSdb, pVgroup);
continue;
}
pHead->contLen = htonl(contLen);
pHead->vgId = htonl(pVgroup->vgId);
tSerializeSVTrimDbReq((char *)pHead + sizeof(SMsgHead), reqLen, &trimReq);
SRpcMsg rpcMsg = {.msgType = TDMT_VND_TRIM, .pCont = pHead, .contLen = contLen};
SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
int32_t code = tmsgSendReq(&epSet, &rpcMsg);
if (code != 0) {
mError("vgId:%d, timer failed to send vnode-trim request to vnode since 0x%x", pVgroup->vgId, code);
} else {
mInfo("vgId:%d, timer send vnode-trim request to vnode, time:%d", pVgroup->vgId, trimReq.timestamp);
}
sdbRelease(pSdb, pVgroup);
}
@ -2405,7 +2445,8 @@ static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName,
return 0;
}
static int32_t mndProcessDropTtltbReq(SRpcMsg *pRsp) { return 0; }
static int32_t mndProcessDropTtltbRsp(SRpcMsg *pRsp) { return 0; }
static int32_t mndProcessTrimDbRsp(SRpcMsg *pRsp) { return 0; }
static int32_t mndProcessDropStbReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;

View File

@ -31,15 +31,14 @@ typedef enum DirtyEntryType {
} DirtyEntryType;
typedef struct STtlManger {
TdThreadRwlock lock;
TTB* pOldTtlIdx; // btree<{deleteTime, tuid}, NULL>
TTB* pOldTtlIdx; // btree<{deleteTime, tuid}, NULL>
SHashObj* pTtlCache; // key: tuid, value: {ttl, ctime}
SHashObj* pDirtyUids; // dirty tuid
SHashObj* pTtlCache; // hash<tuid, {ttl, ctime}>
SHashObj* pDirtyUids; // hash<dirtyTuid, entryType>
TTB* pTtlIdx; // btree<{deleteTime, tuid}, ttl>
char* logPrefix;
char* logPrefix;
int32_t flushThreshold; // max dirty entry number in memory. if -1, flush will not be triggered by write-ops
} STtlManger;
typedef struct {
@ -68,23 +67,24 @@ typedef struct {
typedef struct {
tb_uid_t uid;
int64_t changeTimeMs;
TXN* pTxn;
} STtlUpdCtimeCtx;
typedef struct {
tb_uid_t uid;
int64_t changeTimeMs;
int64_t ttlDays;
TXN* pTxn;
} STtlUpdTtlCtx;
typedef struct {
tb_uid_t uid;
TXN* pTxn;
int64_t ttlDays;
TXN* pTxn;
} STtlDelTtlCtx;
int ttlMgrOpen(STtlManger** ppTtlMgr, TDB* pEnv, int8_t rollback, const char* logPrefix);
int ttlMgrOpen(STtlManger** ppTtlMgr, TDB* pEnv, int8_t rollback, const char* logPrefix, int32_t flushThreshold);
void ttlMgrClose(STtlManger* pTtlMgr);
int ttlMgrPostOpen(STtlManger* pTtlMgr, void* pMeta);
bool ttlMgrNeedUpgrade(TDB* pEnv);
int ttlMgrUpgrade(STtlManger* pTtlMgr, void* pMeta);
@ -94,7 +94,7 @@ int ttlMgrDeleteTtl(STtlManger* pTtlMgr, const STtlDelTtlCtx* pDelCtx);
int ttlMgrUpdateChangeTime(STtlManger* pTtlMgr, const STtlUpdCtimeCtx* pUpdCtimeCtx);
int ttlMgrFlush(STtlManger* pTtlMgr, TXN* pTxn);
int ttlMgrFindExpired(STtlManger* pTtlMgr, int64_t timePointMs, SArray* pTbUids);
int ttlMgrFindExpired(STtlManger* pTtlMgr, int64_t timePointMs, SArray* pTbUids, int32_t ttlDropMaxCount);
#ifdef __cplusplus
}

View File

@ -151,9 +151,10 @@ int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp** pMetaRsp);
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids, int64_t* tbUid);
int32_t metaTrimTables(SMeta* pMeta);
int metaTtlDropTable(SMeta* pMeta, int64_t timePointMs, SArray* tbUids);
void metaDropTables(SMeta* pMeta, SArray* tbUids);
int metaTtlFindExpired(SMeta* pMeta, int64_t timePointMs, SArray* tbUids, int32_t ttlDropMaxCount);
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
int metaUpdateChangeTime(SMeta* pMeta, tb_uid_t uid, int64_t changeTimeMs);
int metaUpdateChangeTimeWithLock(SMeta* pMeta, tb_uid_t uid, int64_t changeTimeMs);
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock);
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock);
int32_t metaGetTbTSchemaEx(SMeta* pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sver, STSchema** ppTSchema);

View File

@ -130,7 +130,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
// open pTtlMgr ("ttlv1.idx")
char logPrefix[128] = {0};
sprintf(logPrefix, "vgId:%d", TD_VID(pVnode));
ret = ttlMgrOpen(&pMeta->pTtlMgr, pMeta->pEnv, 0, logPrefix);
ret = ttlMgrOpen(&pMeta->pTtlMgr, pMeta->pEnv, 0, logPrefix, tsTtlFlushThreshold);
if (ret < 0) {
metaError("vgId:%d, failed to open meta ttl index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;

View File

@ -21,6 +21,7 @@ static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateTtl(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateChangeTime(SMeta *pMeta, tb_uid_t uid, int64_t changeTimeMs);
static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateSuidIdx(SMeta *pMeta, const SMetaEntry *pME);
@ -842,9 +843,11 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi
return 0;
}
static void metaDropTables(SMeta *pMeta, SArray *tbUids) {
void metaDropTables(SMeta *pMeta, SArray *tbUids) {
if (taosArrayGetSize(tbUids) == 0) return;
metaWLock(pMeta);
for (int i = 0; i < TARRAY_SIZE(tbUids); ++i) {
for (int i = 0; i < taosArrayGetSize(tbUids); ++i) {
tb_uid_t uid = *(tb_uid_t *)taosArrayGet(tbUids, i);
metaDropTableByUid(pMeta, uid, NULL);
metaDebug("batch drop table:%" PRId64, uid);
@ -927,26 +930,23 @@ end:
return code;
}
int metaTtlDropTable(SMeta *pMeta, int64_t timePointMs, SArray *tbUids) {
int metaTtlFindExpired(SMeta *pMeta, int64_t timePointMs, SArray *tbUids, int32_t ttlDropMaxCount) {
metaWLock(pMeta);
int ret = ttlMgrFlush(pMeta->pTtlMgr, pMeta->txn);
if (ret != 0) {
metaError("ttl failed to flush, ret:%d", ret);
return ret;
goto _err;
}
ret = ttlMgrFindExpired(pMeta->pTtlMgr, timePointMs, tbUids);
ret = ttlMgrFindExpired(pMeta->pTtlMgr, timePointMs, tbUids, ttlDropMaxCount);
if (ret != 0) {
metaError("ttl failed to find expired table, ret:%d", ret);
return ret;
}
if (TARRAY_SIZE(tbUids) == 0) {
return 0;
goto _err;
}
metaInfo("ttl find expired table count: %zu", TARRAY_SIZE(tbUids));
metaDropTables(pMeta, tbUids);
return 0;
_err:
metaULock(pMeta);
return ret;
}
static int metaBuildBtimeIdxKey(SBtimeIdxKey *btimeKey, const SMetaEntry *pME) {
@ -1326,10 +1326,10 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
metaSaveToSkmDb(pMeta, &entry);
metaULock(pMeta);
metaUpdateChangeTime(pMeta, entry.uid, pAlterTbReq->ctimeMs);
metaULock(pMeta);
metaUpdateMetaRsp(uid, pAlterTbReq->tbName, pSchema, pMetaRsp);
if (entry.pBuf) taosMemoryFree(entry.pBuf);
@ -1515,10 +1515,10 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
metaUidCacheClear(pMeta, ctbEntry.ctbEntry.suid);
metaTbGroupCacheClear(pMeta, ctbEntry.ctbEntry.suid);
metaULock(pMeta);
metaUpdateChangeTime(pMeta, ctbEntry.uid, pAlterTbReq->ctimeMs);
metaULock(pMeta);
tDecoderClear(&dc1);
tDecoderClear(&dc2);
taosMemoryFree((void *)ctbEntry.ctbEntry.pTags);
@ -1630,10 +1630,10 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p
// save to table db
metaSaveToTbDb(pMeta, &entry);
metaUpdateUidIdx(pMeta, &entry);
metaULock(pMeta);
metaUpdateChangeTime(pMeta, entry.uid, pAlterTbReq->ctimeMs);
metaULock(pMeta);
tdbTbcClose(pTbDbc);
tdbTbcClose(pUidIdxc);
tDecoderClear(&dc);
@ -1981,7 +1981,7 @@ static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME) {
static int metaUpdateTtl(SMeta *pMeta, const SMetaEntry *pME) {
if (pME->type != TSDB_CHILD_TABLE && pME->type != TSDB_NORMAL_TABLE) return 0;
STtlUpdTtlCtx ctx = {.uid = pME->uid};
STtlUpdTtlCtx ctx = {.uid = pME->uid, .pTxn = pMeta->txn};
if (pME->type == TSDB_CHILD_TABLE) {
ctx.ttlDays = pME->ctbEntry.ttlDays;
ctx.changeTimeMs = pME->ctbEntry.btime;
@ -1993,7 +1993,7 @@ static int metaUpdateTtl(SMeta *pMeta, const SMetaEntry *pME) {
return ttlMgrInsertTtl(pMeta->pTtlMgr, &ctx);
}
int metaUpdateChangeTime(SMeta *pMeta, tb_uid_t uid, int64_t changeTimeMs) {
static int metaUpdateChangeTime(SMeta *pMeta, tb_uid_t uid, int64_t changeTimeMs) {
if (!tsTtlChangeOnWrite) return 0;
if (changeTimeMs <= 0) {
@ -2001,11 +2001,20 @@ int metaUpdateChangeTime(SMeta *pMeta, tb_uid_t uid, int64_t changeTimeMs) {
return TSDB_CODE_VERSION_NOT_COMPATIBLE;
}
STtlUpdCtimeCtx ctx = {.uid = uid, .changeTimeMs = changeTimeMs};
STtlUpdCtimeCtx ctx = {.uid = uid, .changeTimeMs = changeTimeMs, .pTxn = pMeta->txn};
return ttlMgrUpdateChangeTime(pMeta->pTtlMgr, &ctx);
}
int metaUpdateChangeTimeWithLock(SMeta *pMeta, tb_uid_t uid, int64_t changeTimeMs) {
if (!tsTtlChangeOnWrite) return 0;
metaWLock(pMeta);
int ret = metaUpdateChangeTime(pMeta, uid, changeTimeMs);
metaULock(pMeta);
return ret;
}
static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME) {
SCtbIdxKey ctbIdxKey = {.suid = pME->ctbEntry.suid, .uid = pME->uid};

View File

@ -21,6 +21,13 @@ typedef struct {
SMeta *pMeta;
} SConvertData;
typedef struct {
int32_t ttlDropMaxCount;
int32_t count;
STtlIdxKeyV1 expiredKey;
SArray *pTbUids;
} STtlExpiredCtx;
static void ttlMgrCleanup(STtlManger *pTtlMgr);
static int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta);
@ -31,15 +38,15 @@ static int ttlIdxKeyV1Cmpr(const void *pKey1, int kLen1, const void *pKey2,
static int ttlMgrFillCache(STtlManger *pTtlMgr);
static int32_t ttlMgrFillCacheOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen, void *pTtlCache);
static int32_t ttlMgrConvertOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen, void *pConvertData);
static int32_t ttlMgrFindExpiredOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen,
void *pExpiredInfo);
static int32_t ttlMgrWLock(STtlManger *pTtlMgr);
static int32_t ttlMgrRLock(STtlManger *pTtlMgr);
static int32_t ttlMgrULock(STtlManger *pTtlMgr);
static bool ttlMgrNeedFlush(STtlManger *pTtlMgr);
const char *ttlTbname = "ttl.idx";
const char *ttlV1Tbname = "ttlv1.idx";
int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback, const char *logPrefix) {
int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback, const char *logPrefix, int32_t flushThreshold) {
int ret = TSDB_CODE_SUCCESS;
int64_t startNs = taosGetTimestampNs();
@ -55,6 +62,7 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback, const char *lo
}
strcpy(logBuffer, logPrefix);
pTtlMgr->logPrefix = logBuffer;
pTtlMgr->flushThreshold = flushThreshold;
ret = tdbTbOpen(ttlV1Tbname, TDB_VARIANT_LEN, TDB_VARIANT_LEN, ttlIdxKeyV1Cmpr, pEnv, &pTtlMgr->pTtlIdx, rollback);
if (ret < 0) {
@ -66,8 +74,6 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback, const char *lo
pTtlMgr->pTtlCache = taosHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
pTtlMgr->pDirtyUids = taosHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
taosThreadRwlockInit(&pTtlMgr->lock, NULL);
ret = ttlMgrFillCache(pTtlMgr);
if (ret < 0) {
metaError("%s, failed to fill hash since %s", pTtlMgr->logPrefix, tstrerror(terrno));
@ -130,6 +136,7 @@ int ttlMgrUpgrade(STtlManger *pTtlMgr, void *pMeta) {
int64_t endNs = taosGetTimestampNs();
metaInfo("%s, ttl mgr upgrade end, hash size: %d, time consumed: %" PRId64 " ns", pTtlMgr->logPrefix,
taosHashGetSize(pTtlMgr->pTtlCache), endNs - startNs);
_out:
tdbTbClose(pTtlMgr->pOldTtlIdx);
pTtlMgr->pOldTtlIdx = NULL;
@ -142,7 +149,6 @@ static void ttlMgrCleanup(STtlManger *pTtlMgr) {
taosHashCleanup(pTtlMgr->pTtlCache);
taosHashCleanup(pTtlMgr->pDirtyUids);
tdbTbClose(pTtlMgr->pTtlIdx);
taosThreadRwlockDestroy(&pTtlMgr->lock);
taosMemoryFree(pTtlMgr);
}
@ -229,10 +235,25 @@ static int ttlMgrConvertOneEntry(const void *pKey, int keyLen, const void *pVal,
}
ret = 0;
_out:
return ret;
}
static int32_t ttlMgrFindExpiredOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen,
void *pExpiredCtx) {
STtlExpiredCtx *pCtx = (STtlExpiredCtx *)pExpiredCtx;
if (pCtx->count >= pCtx->ttlDropMaxCount) return -1;
int c = ttlIdxKeyV1Cmpr(&pCtx->expiredKey, sizeof(pCtx->expiredKey), pKey, keyLen);
if (c > 0) {
taosArrayPush(pCtx->pTbUids, &((STtlIdxKeyV1 *)pKey)->uid);
pCtx->count++;
}
return c;
}
static int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta) {
SMeta *meta = pMeta;
@ -255,8 +276,6 @@ int ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) {
STtlCacheEntry cacheEntry = {.ttlDays = updCtx->ttlDays, .changeTimeMs = updCtx->changeTimeMs};
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_UPSERT};
ttlMgrWLock(pTtlMgr);
int ret = taosHashPut(pTtlMgr->pTtlCache, &updCtx->uid, sizeof(updCtx->uid), &cacheEntry, sizeof(cacheEntry));
if (ret < 0) {
metaError("%s, ttlMgr insert failed to update ttl cache since %s", pTtlMgr->logPrefix, tstrerror(terrno));
@ -269,10 +288,13 @@ int ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) {
goto _out;
}
ret = 0;
_out:
ttlMgrULock(pTtlMgr);
if (ttlMgrNeedFlush(pTtlMgr)) {
ttlMgrFlush(pTtlMgr, updCtx->pTxn);
}
ret = 0;
_out:
metaDebug("%s, ttl mgr insert ttl, uid: %" PRId64 ", ctime: %" PRId64 ", ttlDays: %" PRId64, pTtlMgr->logPrefix,
updCtx->uid, updCtx->changeTimeMs, updCtx->ttlDays);
@ -281,7 +303,6 @@ _out:
int ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) {
if (delCtx->ttlDays == 0) return 0;
ttlMgrWLock(pTtlMgr);
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_DEL};
@ -291,18 +312,19 @@ int ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) {
goto _out;
}
ret = 0;
_out:
ttlMgrULock(pTtlMgr);
if (ttlMgrNeedFlush(pTtlMgr)) {
ttlMgrFlush(pTtlMgr, delCtx->pTxn);
}
ret = 0;
_out:
metaDebug("%s, ttl mgr delete ttl, uid: %" PRId64, pTtlMgr->logPrefix, delCtx->uid);
return ret;
}
int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtimeCtx) {
ttlMgrWLock(pTtlMgr);
int ret = 0;
STtlCacheEntry *oldData = taosHashGet(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid));
@ -327,59 +349,35 @@ int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtime
goto _out;
}
ret = 0;
_out:
ttlMgrULock(pTtlMgr);
if (ttlMgrNeedFlush(pTtlMgr)) {
ttlMgrFlush(pTtlMgr, pUpdCtimeCtx->pTxn);
}
ret = 0;
_out:
metaDebug("%s, ttl mgr update ctime, uid: %" PRId64 ", ctime: %" PRId64, pTtlMgr->logPrefix, pUpdCtimeCtx->uid,
pUpdCtimeCtx->changeTimeMs);
return ret;
}
int ttlMgrFindExpired(STtlManger *pTtlMgr, int64_t timePointMs, SArray *pTbUids) {
ttlMgrRLock(pTtlMgr);
int ttlMgrFindExpired(STtlManger *pTtlMgr, int64_t timePointMs, SArray *pTbUids, int32_t ttlDropMaxCount) {
STtlIdxKeyV1 ttlKey = {.deleteTimeMs = timePointMs, .uid = INT64_MAX};
STtlExpiredCtx expiredCtx = {
.ttlDropMaxCount = ttlDropMaxCount, .count = 0, .expiredKey = ttlKey, .pTbUids = pTbUids};
return tdbTbTraversal(pTtlMgr->pTtlIdx, &expiredCtx, ttlMgrFindExpiredOneEntry);
}
TBC *pCur;
int ret = tdbTbcOpen(pTtlMgr->pTtlIdx, &pCur, NULL);
if (ret < 0) {
goto _out;
}
STtlIdxKeyV1 ttlKey = {0};
ttlKey.deleteTimeMs = timePointMs;
ttlKey.uid = INT64_MAX;
int c = 0;
tdbTbcMoveTo(pCur, &ttlKey, sizeof(ttlKey), &c);
if (c < 0) {
tdbTbcMoveToPrev(pCur);
}
void *pKey = NULL;
int kLen = 0;
while (1) {
ret = tdbTbcPrev(pCur, &pKey, &kLen, NULL, NULL);
if (ret < 0) {
ret = 0;
break;
}
ttlKey = *(STtlIdxKeyV1 *)pKey;
taosArrayPush(pTbUids, &ttlKey.uid);
}
tdbFree(pKey);
tdbTbcClose(pCur);
ret = 0;
_out:
ttlMgrULock(pTtlMgr);
return ret;
static bool ttlMgrNeedFlush(STtlManger *pTtlMgr) {
return pTtlMgr->flushThreshold > 0 && taosHashGetSize(pTtlMgr->pDirtyUids) > pTtlMgr->flushThreshold;
}
int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
ttlMgrWLock(pTtlMgr);
int64_t startNs = taosGetTimestampNs();
int64_t endNs = startNs;
metaDebug("%s, ttl mgr flush start. dirty uids:%d", pTtlMgr->logPrefix, taosHashGetSize(pTtlMgr->pDirtyUids));
metaTrace("%s, ttl mgr flush start. dirty uids:%d", pTtlMgr->logPrefix, taosHashGetSize(pTtlMgr->pDirtyUids));
int ret = -1;
@ -430,40 +428,10 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
taosHashClear(pTtlMgr->pDirtyUids);
ret = 0;
_out:
ttlMgrULock(pTtlMgr);
metaDebug("%s, ttl mgr flush end.", pTtlMgr->logPrefix);
return ret;
}
static int32_t ttlMgrRLock(STtlManger *pTtlMgr) {
int32_t ret = 0;
metaTrace("%s, ttlMgr rlock %p", pTtlMgr->logPrefix, &pTtlMgr->lock);
ret = taosThreadRwlockRdlock(&pTtlMgr->lock);
return ret;
}
static int32_t ttlMgrWLock(STtlManger *pTtlMgr) {
int32_t ret = 0;
metaTrace("%s, ttlMgr wlock %p", pTtlMgr->logPrefix, &pTtlMgr->lock);
ret = taosThreadRwlockWrlock(&pTtlMgr->lock);
return ret;
}
static int32_t ttlMgrULock(STtlManger *pTtlMgr) {
int32_t ret = 0;
metaTrace("%s, ttlMgr ulock %p", pTtlMgr->logPrefix, &pTtlMgr->lock);
ret = taosThreadRwlockUnlock(&pTtlMgr->lock);
endNs = taosGetTimestampNs();
metaTrace("%s, ttl mgr flush end, time consumed: %" PRId64 " ns", pTtlMgr->logPrefix, endNs - startNs);
return ret;
}

View File

@ -216,7 +216,7 @@ int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
cfgRsp.numOfTags = schemaTag.nCols;
cfgRsp.numOfColumns = schema.nCols;
cfgRsp.pSchemas = (SSchema *)taosMemoryCalloc(cfgRsp.numOfColumns + cfgRsp.numOfTags, sizeof(SSchema));
cfgRsp.pSchemas = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * (cfgRsp.numOfColumns + cfgRsp.numOfTags));
memcpy(cfgRsp.pSchemas, schema.pSchema, sizeof(SSchema) * schema.nCols);
if (schemaTag.nCols) {

View File

@ -142,6 +142,74 @@ _exit:
return code;
}
static int32_t vnodePreProcessDropTtlMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = TSDB_CODE_INVALID_MSG;
int32_t lino = 0;
SMsgHead *pContOld = pMsg->pCont;
int32_t reqLenOld = pMsg->contLen - sizeof(SMsgHead);
SArray *tbUids = NULL;
int64_t timestampMs = 0;
SVDropTtlTableReq ttlReq = {0};
if (tDeserializeSVDropTtlTableReq((char *)pContOld + sizeof(SMsgHead), reqLenOld, &ttlReq) != 0) {
code = TSDB_CODE_INVALID_MSG;
TSDB_CHECK_CODE(code, lino, _exit);
}
{ // find expired uids
tbUids = taosArrayInit(8, sizeof(int64_t));
if (tbUids == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
timestampMs = (int64_t)ttlReq.timestampSec * 1000;
code = metaTtlFindExpired(pVnode->pMeta, timestampMs, tbUids, ttlReq.ttlDropMaxCount);
if (code != 0) {
code = TSDB_CODE_INVALID_MSG;
TSDB_CHECK_CODE(code, lino, _exit);
}
ttlReq.nUids = taosArrayGetSize(tbUids);
ttlReq.pTbUids = tbUids;
}
{ // prepare new content
int32_t reqLenNew = tSerializeSVDropTtlTableReq(NULL, 0, &ttlReq);
int32_t contLenNew = reqLenNew + sizeof(SMsgHead);
SMsgHead *pContNew = rpcMallocCont(contLenNew);
if (pContNew == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
tSerializeSVDropTtlTableReq((char *)pContNew + sizeof(SMsgHead), reqLenNew, &ttlReq);
pContNew->contLen = htonl(reqLenNew);
pContNew->vgId = pContOld->vgId;
rpcFreeCont(pContOld);
pMsg->pCont = pContNew;
pMsg->contLen = contLenNew;
}
code = 0;
_exit:
taosArrayDestroy(tbUids);
if (code) {
vError("vgId:%d, %s:%d failed to preprocess drop ttl request since %s, msg type:%s", TD_VID(pVnode), __func__, lino,
tstrerror(code), TMSG_INFO(pMsg->msgType));
} else {
vTrace("vgId:%d, %s done, timestampSec:%d, nUids:%d", TD_VID(pVnode), __func__, ttlReq.timestampSec, ttlReq.nUids);
}
return code;
}
extern int64_t tsMaxKeyByPrecision[];
static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int64_t btimeMs, int64_t ctimeMs) {
int32_t code = 0;
@ -371,6 +439,9 @@ int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
case TDMT_VND_ALTER_TABLE: {
code = vnodePreProcessAlterTableMsg(pVnode, pMsg);
} break;
case TDMT_VND_DROP_TTL_TABLE: {
code = vnodePreProcessDropTtlMsg(pVnode, pMsg);
} break;
case TDMT_VND_SUBMIT: {
code = vnodePreProcessSubmitMsg(pVnode, pMsg);
} break;
@ -405,10 +476,10 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
return -1;
}
vDebug("vgId:%d, start to process write request %s, index:%" PRId64 ", applied:%" PRId64
", state.applyTerm:%" PRId64 ", conn.applyTerm:%" PRId64,
TD_VID(pVnode), TMSG_INFO(pMsg->msgType), ver, pVnode->state.applied,
pVnode->state.applyTerm, pMsg->info.conn.applyTerm);
vDebug("vgId:%d, start to process write request %s, index:%" PRId64 ", applied:%" PRId64 ", state.applyTerm:%" PRId64
", conn.applyTerm:%" PRId64,
TD_VID(pVnode), TMSG_INFO(pMsg->msgType), ver, pVnode->state.applied, pVnode->state.applyTerm,
pMsg->info.conn.applyTerm);
ASSERT(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm);
ASSERTS(pVnode->state.applied + 1 == ver, "applied:%" PRId64 ", ver:%" PRId64, pVnode->state.applied, ver);
@ -727,28 +798,27 @@ _exit:
}
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
SArray *tbUids = taosArrayInit(8, sizeof(int64_t));
if (tbUids == NULL) return TSDB_CODE_OUT_OF_MEMORY;
SVDropTtlTableReq ttlReq = {0};
if (tDeserializeSVDropTtlTableReq(pReq, len, &ttlReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto end;
}
vDebug("vgId:%d, drop ttl table req will be processed, time:%" PRId32, pVnode->config.vgId, ttlReq.timestampSec);
int32_t ret = metaTtlDropTable(pVnode->pMeta, (int64_t)ttlReq.timestampSec * 1000, tbUids);
if (ret != 0) {
goto end;
}
if (taosArrayGetSize(tbUids) > 0) {
tqUpdateTbUidList(pVnode->pTq, tbUids, false);
ASSERT(ttlReq.nUids == taosArrayGetSize(ttlReq.pTbUids));
if (ttlReq.nUids != 0) {
vInfo("vgId:%d, drop ttl table req will be processed, time:%d, ntbUids:%d", pVnode->config.vgId,
ttlReq.timestampSec, ttlReq.nUids);
}
vnodeDoRetention(pVnode, ttlReq.timestampSec);
int ret = 0;
if (ttlReq.nUids > 0) {
metaDropTables(pVnode->pMeta, ttlReq.pTbUids);
tqUpdateTbUidList(pVnode->pTq, ttlReq.pTbUids, false);
}
end:
taosArrayDestroy(tbUids);
taosArrayDestroy(ttlReq.pTbUids);
return ret;
}
@ -1482,7 +1552,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in
code = tsdbInsertTableData(pVnode->pTsdb, ver, pSubmitTbData, &affectedRows);
if (code) goto _exit;
code = metaUpdateChangeTime(pVnode->pMeta, pSubmitTbData->uid, pSubmitTbData->ctimeMs);
code = metaUpdateChangeTimeWithLock(pVnode->pMeta, pSubmitTbData->uid, pSubmitTbData->ctimeMs);
if (code) goto _exit;
pSubmitRsp->affectedRows += affectedRows;
@ -1739,7 +1809,7 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pRe
TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
}
code = metaUpdateChangeTime(pVnode->pMeta, uid, deleteReq.ctimeMs);
code = metaUpdateChangeTimeWithLock(pVnode->pMeta, uid, deleteReq.ctimeMs);
if (code < 0) {
terrno = code;
vError("vgId:%d, update change time error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64
@ -1778,7 +1848,7 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, in
uint64_t uid = *(uint64_t *)taosArrayGet(pRes->uidList, iUid);
code = tsdbDeleteTableData(pVnode->pTsdb, ver, pRes->suid, uid, pRes->skey, pRes->ekey);
if (code) goto _err;
code = metaUpdateChangeTime(pVnode->pMeta, uid, pRes->ctimeMs);
code = metaUpdateChangeTimeWithLock(pVnode->pMeta, uid, pRes->ctimeMs);
if (code) goto _err;
}

View File

@ -78,10 +78,6 @@ static int32_t buildDescResultDataBlock(SSDataBlock** pOutput) {
infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, DESCRIBE_RESULT_NOTE_LEN, 4);
code = blockDataAppendColInfo(pBlock, &infoData);
}
if (TSDB_CODE_SUCCESS == code) {
infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, DESCRIBE_RESULT_COL_COMMENT_LEN, 5);
code = blockDataAppendColInfo(pBlock, &infoData);
}
if (TSDB_CODE_SUCCESS == code) {
*pOutput = pBlock;
@ -103,9 +99,7 @@ static int32_t setDescResultIntoDataBlock(bool sysInfoUser, SSDataBlock* pBlock,
SColumnInfoData* pCol3 = taosArrayGet(pBlock->pDataBlock, 2);
// Note
SColumnInfoData* pCol4 = taosArrayGet(pBlock->pDataBlock, 3);
// Comment
SColumnInfoData* pCol5 = taosArrayGet(pBlock->pDataBlock, 4);
char buf[DESCRIBE_RESULT_COL_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
char buf[DESCRIBE_RESULT_FIELD_LEN] = {0};
for (int32_t i = 0; i < numOfRows; ++i) {
if (invisibleColumn(sysInfoUser, pMeta->tableType, pMeta->schema[i].flags)) {
continue;
@ -118,8 +112,6 @@ static int32_t setDescResultIntoDataBlock(bool sysInfoUser, SSDataBlock* pBlock,
colDataSetVal(pCol3, pBlock->info.rows, (const char*)&bytes, false);
STR_TO_VARSTR(buf, i >= pMeta->tableInfo.numOfColumns ? "TAG" : "");
colDataSetVal(pCol4, pBlock->info.rows, buf, false);
STR_TO_VARSTR(buf, pMeta->schema[i].comment);
colDataSetVal(pCol5, pBlock->info.rows, buf, false);
++(pBlock->info.rows);
}
if (pBlock->info.rows <= 0) {
@ -464,19 +456,14 @@ void appendColumnFields(char* buf, int32_t* len, STableCfg* pCfg) {
for (int32_t i = 0; i < pCfg->numOfColumns; ++i) {
SSchema* pSchema = pCfg->pSchemas + i;
char type[32];
char comments[TSDB_COL_COMMENT_LEN + 16] = {0};
sprintf(type, "%s", tDataTypes[pSchema->type].name);
if (TSDB_DATA_TYPE_VARCHAR == pSchema->type || TSDB_DATA_TYPE_GEOMETRY == pSchema->type) {
sprintf(type + strlen(type), "(%d)", (int32_t)(pSchema->bytes - VARSTR_HEADER_SIZE));
} else if (TSDB_DATA_TYPE_NCHAR == pSchema->type) {
sprintf(type + strlen(type), "(%d)", (int32_t)((pSchema->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
}
if (pSchema->comment[0]) {
sprintf(comments, " COMMENT '%s'", pSchema->comment);
}
*len +=
sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s`%s` %s%s", ((i > 0) ? ", " : ""), pSchema->name, type, comments);
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s`%s` %s", ((i > 0) ? ", " : ""), pSchema->name, type);
}
}
@ -484,18 +471,14 @@ void appendTagFields(char* buf, int32_t* len, STableCfg* pCfg) {
for (int32_t i = 0; i < pCfg->numOfTags; ++i) {
SSchema* pSchema = pCfg->pSchemas + pCfg->numOfColumns + i;
char type[32];
char comments[TSDB_COL_COMMENT_LEN + 16] = {0};
sprintf(type, "%s", tDataTypes[pSchema->type].name);
if (TSDB_DATA_TYPE_VARCHAR == pSchema->type || TSDB_DATA_TYPE_GEOMETRY == pSchema->type) {
sprintf(type + strlen(type), "(%d)", (int32_t)(pSchema->bytes - VARSTR_HEADER_SIZE));
} else if (TSDB_DATA_TYPE_NCHAR == pSchema->type) {
sprintf(type + strlen(type), "(%d)", (int32_t)((pSchema->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
}
if (pSchema->comment[0]) {
sprintf(comments, " COMMENT '%s'", pSchema->comment);
}
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s`%s` %s%s", ((i > 0) ? ", " : ""), pSchema->name, type, comments);
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s`%s` %s", ((i > 0) ? ", " : ""), pSchema->name, type);
}
}

View File

@ -173,7 +173,8 @@ SNode* createDropTableClause(SAstCreateContext* pCxt, bool ignoreNotExists, SNod
SNode* createDropTableStmt(SAstCreateContext* pCxt, SNodeList* pTables);
SNode* createDropSuperTableStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SNode* pRealTable);
SNode* createAlterTableModifyOptions(SAstCreateContext* pCxt, SNode* pRealTable, SNode* pOptions);
SNode* createAlterTableAddModifyCol(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType, SNode* pColDefNode);
SNode* createAlterTableAddModifyCol(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType, SToken* pColName,
SDataType dataType);
SNode* createAlterTableDropCol(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType, SToken* pColName);
SNode* createAlterTableRenameCol(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType, SToken* pOldColName,
SToken* pNewColName);

View File

@ -312,17 +312,17 @@ cmd ::= ALTER STABLE alter_table_clause(A).
alter_table_clause(A) ::= full_table_name(B) alter_table_options(C). { A = createAlterTableModifyOptions(pCxt, B, C); }
alter_table_clause(A) ::=
full_table_name(B) ADD COLUMN column_def(C). { A = createAlterTableAddModifyCol(pCxt, B, TSDB_ALTER_TABLE_ADD_COLUMN, C); }
full_table_name(B) ADD COLUMN column_name(C) type_name(D). { A = createAlterTableAddModifyCol(pCxt, B, TSDB_ALTER_TABLE_ADD_COLUMN, &C, D); }
alter_table_clause(A) ::= full_table_name(B) DROP COLUMN column_name(C). { A = createAlterTableDropCol(pCxt, B, TSDB_ALTER_TABLE_DROP_COLUMN, &C); }
alter_table_clause(A) ::=
full_table_name(B) MODIFY COLUMN column_def(C). { A = createAlterTableAddModifyCol(pCxt, B, TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES, C); }
full_table_name(B) MODIFY COLUMN column_name(C) type_name(D). { A = createAlterTableAddModifyCol(pCxt, B, TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES, &C, D); }
alter_table_clause(A) ::=
full_table_name(B) RENAME COLUMN column_name(C) column_name(D). { A = createAlterTableRenameCol(pCxt, B, TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME, &C, &D); }
alter_table_clause(A) ::=
full_table_name(B) ADD TAG column_def(C). { A = createAlterTableAddModifyCol(pCxt, B, TSDB_ALTER_TABLE_ADD_TAG, C); }
full_table_name(B) ADD TAG column_name(C) type_name(D). { A = createAlterTableAddModifyCol(pCxt, B, TSDB_ALTER_TABLE_ADD_TAG, &C, D); }
alter_table_clause(A) ::= full_table_name(B) DROP TAG column_name(C). { A = createAlterTableDropCol(pCxt, B, TSDB_ALTER_TABLE_DROP_TAG, &C); }
alter_table_clause(A) ::=
full_table_name(B) MODIFY TAG column_def(C). { A = createAlterTableAddModifyCol(pCxt, B, TSDB_ALTER_TABLE_UPDATE_TAG_BYTES, C); }
full_table_name(B) MODIFY TAG column_name(C) type_name(D). { A = createAlterTableAddModifyCol(pCxt, B, TSDB_ALTER_TABLE_UPDATE_TAG_BYTES, &C, D); }
alter_table_clause(A) ::=
full_table_name(B) RENAME TAG column_name(C) column_name(D). { A = createAlterTableRenameCol(pCxt, B, TSDB_ALTER_TABLE_UPDATE_TAG_NAME, &C, &D); }
alter_table_clause(A) ::=
@ -358,7 +358,7 @@ column_def_list(A) ::= column_def(B).
column_def_list(A) ::= column_def_list(B) NK_COMMA column_def(C). { A = addNodeToList(pCxt, B, C); }
column_def(A) ::= column_name(B) type_name(C). { A = createColumnDefNode(pCxt, &B, C, NULL); }
column_def(A) ::= column_name(B) type_name(C) COMMENT NK_STRING(D). { A = createColumnDefNode(pCxt, &B, C, &D); }
//column_def(A) ::= column_name(B) type_name(C) COMMENT NK_STRING(D). { A = createColumnDefNode(pCxt, &B, C, &D); }
%type type_name { SDataType }
%destructor type_name { }

View File

@ -1457,15 +1457,17 @@ SNode* createAlterTableModifyOptions(SAstCreateContext* pCxt, SNode* pRealTable,
return createAlterTableStmtFinalize(pRealTable, pStmt);
}
SNode* createAlterTableAddModifyCol(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType, SNode* pColDefNode) {
SNode* createAlterTableAddModifyCol(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType, SToken* pColName,
SDataType dataType) {
CHECK_PARSER_STATUS(pCxt);
SColumnDefNode* pCol = (SColumnDefNode*)pColDefNode;
if (!checkColumnName(pCxt, pColName)) {
return NULL;
}
SAlterTableStmt* pStmt = (SAlterTableStmt*)nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT);
CHECK_OUT_OF_MEM(pStmt);
pStmt->alterType = alterType;
strcpy(pStmt->colName, pCol->colName);
strcpy(pStmt->colComment, pCol->comments);
pStmt->dataType = pCol->dataType;
COPY_STRING_FORM_ID_TOKEN(pStmt->colName, pColName);
pStmt->dataType = dataType;
return createAlterTableStmtFinalize(pRealTable, pStmt);
}

View File

@ -4702,7 +4702,6 @@ static int32_t columnDefNodeToField(SNodeList* pList, SArray** pArray) {
SColumnDefNode* pCol = (SColumnDefNode*)pNode;
SField field = {.type = pCol->dataType.type, .bytes = calcTypeBytes(pCol->dataType)};
strcpy(field.name, pCol->colName);
strcpy(field.comment, pCol->comments);
if (pCol->sma) {
field.flags |= COL_SMA_ON;
}
@ -5050,7 +5049,6 @@ static void toSchema(const SColumnDefNode* pCol, col_id_t colId, SSchema* pSchem
pSchema->bytes = calcTypeBytes(pCol->dataType);
pSchema->flags = flags;
strcpy(pSchema->name, pCol->colName);
strcpy(pSchema->comment, pCol->comments);
}
typedef struct SSampleAstInfo {
@ -7699,10 +7697,6 @@ static int32_t extractDescribeResultSchema(int32_t* numOfCols, SSchema** pSchema
(*pSchema)[3].bytes = DESCRIBE_RESULT_NOTE_LEN;
strcpy((*pSchema)[3].name, "note");
(*pSchema)[4].type = TSDB_DATA_TYPE_BINARY;
(*pSchema)[4].bytes = DESCRIBE_RESULT_COL_COMMENT_LEN;
strcpy((*pSchema)[4].name, "comment");
return TSDB_CODE_SUCCESS;
}
@ -8883,15 +8877,6 @@ static int32_t buildAddColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, S
pReq->type = pStmt->dataType.type;
pReq->flags = COL_SMA_ON;
pReq->bytes = calcTypeBytes(pStmt->dataType);
if (pStmt->colComment[0]) {
pReq->colComment = taosStrdup(pStmt->colComment);
if (pReq->colComment == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pReq->colCommentLen = strlen(pReq->colComment);
} else {
pReq->colCommentLen = -1;
}
return TSDB_CODE_SUCCESS;
}
@ -8942,15 +8927,6 @@ static int32_t buildUpdateColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt
return TSDB_CODE_OUT_OF_MEMORY;
}
pReq->colId = pSchema->colId;
if (pStmt->colComment[0]) {
pReq->colComment = taosStrdup(pStmt->colComment);
if (pReq->colComment == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pReq->colCommentLen = strlen(pReq->colComment);
} else {
pReq->colCommentLen = -1;
}
return TSDB_CODE_SUCCESS;
}

File diff suppressed because it is too large Load Diff

View File

@ -212,7 +212,7 @@ int32_t taosStatFile(const char *path, int64_t *size, int32_t *mtime, int32_t *a
}
if (atime != NULL) {
*atime = fileStat.st_mtime;
*atime = fileStat.st_atime;
}
return 0;

View File

@ -806,21 +806,33 @@ class TDDnodes:
psCmd = "for /f %a in ('wmic process where \"name='taosd.exe'\" get processId ^| xargs echo ^| awk '{print $2}' ^&^& echo aa') do @(ps | grep %a | awk '{print $1}' | xargs)"
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8").strip()
while(processID):
print(processID)
print(f"pid of taosd.exe:{processID}")
killCmd = "kill -9 %s > nul 2>&1" % processID
os.system(killCmd)
time.sleep(1)
processID = subprocess.check_output(
psCmd, shell=True).decode("utf-8").strip()
psCmd = "for /f %a in ('wmic process where \"name='tmq_sim.exe'\" get processId ^| xargs echo ^| awk '{print $2}' ^&^& echo aa') do @(ps | grep %a | awk '{print $1}' | xargs)"
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8").strip()
while(processID):
print(processID)
print(f"pid of tmq_sim.exe:{processID}")
killCmd = "kill -9 %s > nul 2>&1" % processID
os.system(killCmd)
time.sleep(1)
processID = subprocess.check_output(
psCmd, shell=True).decode("utf-8").strip()
psCmd = "for /f %a in ('wmic process where \"name='taosBenchmark.exe'\" get processId ^| xargs echo ^| awk '{print $2}' ^&^& echo aa') do @(ps | grep %a | awk '{print $1}' | xargs)"
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8").strip()
while(processID):
print(f"pid of taosBenchmark.exe:{processID}")
killCmd = "kill -9 %s > nul 2>&1" % processID
os.system(killCmd)
time.sleep(1)
processID = subprocess.check_output(
psCmd, shell=True).decode("utf-8").strip()
else:
psCmd = "ps -ef | grep -w taosd | grep 'root' | grep -v grep| grep -v defunct | awk '{print $2}' | xargs"
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8").strip()

View File

@ -210,66 +210,6 @@ class TDTestCase:
licences_info = tdSql.queryResult
tdSql.checkEqual(grants_info,licences_info)
def show_create_table_with_col_comment(self):
tdSql.execute("create database comment_test_db")
tdSql.execute("use comment_test_db")
tdSql.execute("create table normal_table(ts timestamp, c2 int comment 'c2 comment')")
tdSql.execute("create stable super_table(ts timestamp comment 'ts', c2 int comment 'c2 comment') tags(tg int comment 'tg comment')")
create_sql = "create table `normal_table` (`ts` timestamp, `c2` int)"
tdSql.query('show create table normal_table')
tdSql.checkEqual(tdSql.queryResult[0][1].lower(), create_sql)
tdSql.query('show create table super_table')
create_sql = "create stable `super_table` (`ts` timestamp, `c2` int) tags (`tg` int)"
tdSql.checkEqual(tdSql.queryResult[0][1].lower(), create_sql)
tdSql.query("desc normal_table")
tdSql.checkCols(5)
tdSql.checkData(0, 4, "")
tdSql.query("desc super_table")
tdSql.checkCols(5)
tdSql.checkData(0, 4, "")
tdSql.execute("drop database comment_test_db")
def alter_table_with_col_comment(self):
tdSql.execute("create database comment_test_db")
tdSql.execute("use comment_test_db")
tdSql.execute("create table normal_table(ts timestamp, c2 int comment 'c2 comment')")
tdSql.execute("create stable super_table(ts timestamp comment 'ts', c2 int comment 'c2 comment') tags(tg int comment 'tg comment')")
create_sql = "create table `normal_table` (`ts` timestamp, `c2` int, `c3` int)"
tdSql.execute("alter table normal_table add column c3 int comment 'c3 comment'", queryTimes=1)
tdSql.query("show create table normal_table")
tdSql.checkEqual(tdSql.queryResult[0][1].lower(), create_sql)
create_sql = "create table `normal_table` (`ts` timestamp, `c2` int, `c3` int, `c4` varchar(255))"
tdSql.execute("alter table normal_table add column c4 varchar(255) comment 'c4 comment'", queryTimes=1)
tdSql.query("show create table normal_table")
tdSql.checkEqual(tdSql.queryResult[0][1].lower(), create_sql)
create_sql = "create table `normal_table` (`ts` timestamp, `c2` int, `c3` int, `c4` varchar(255), `c5` varchar(255))"
tdSql.execute("alter table normal_table add column c5 varchar(255)", queryTimes=1)
tdSql.query("show create table normal_table")
tdSql.checkEqual(tdSql.queryResult[0][1].lower(), create_sql)
create_sql = "create stable `super_table` (`ts` timestamp, `c2` int, `c3` int) tags (`tg` int) sma(`ts`,`c2`)"
tdSql.execute("alter table super_table add column c3 int comment 'c3 comment'", queryTimes=1)
tdSql.query("show create table super_table")
tdSql.checkEqual(tdSql.queryResult[0][1].lower(), create_sql)
create_sql = "create stable `super_table` (`ts` timestamp, `c2` int, `c3` int, `c4` varchar(255)) tags (`tg` int) sma(`ts`,`c2`)"
tdSql.execute("alter table super_table add column c4 varchar(255) comment 'c4 comment'", queryTimes=1)
tdSql.query("show create table super_table")
tdSql.checkEqual(tdSql.queryResult[0][1].lower(), create_sql)
create_sql = "create stable `super_table` (`ts` timestamp, `c2` int, `c3` int, `c4` varchar(256)) tags (`tg` int) sma(`ts`,`c2`)"
tdSql.execute("alter table super_table modify column c4 varchar(256) comment 'c4 256 comment'", queryTimes=1)
tdSql.query("show create table super_table")
tdSql.checkEqual(tdSql.queryResult[0][1].lower(), create_sql)
tdSql.execute("drop database comment_test_db")
def run(self):
self.check_gitinfo()
self.show_base()
@ -278,8 +218,6 @@ class TDTestCase:
self.show_create_sql()
self.show_create_sysdb_sql()
self.show_create_systb_sql()
self.show_create_table_with_col_comment()
self.alter_table_with_col_comment()
def stop(self):
tdSql.close()