Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/tsdb_refact

This commit is contained in:
Hongze Cheng 2022-06-14 05:24:18 +00:00
commit 62f9403ed9
26 changed files with 1829 additions and 461 deletions

View File

@ -4,50 +4,275 @@ title: Keywords
There are about 200 keywords reserved by TDengine, they can't be used as the name of database, STable or table with either upper case, lower case or mixed case.
**Keywords List**
## Keywords List
| | | | | |
| ----------- | ---------- | --------- | ---------- | ------------ |
| ABORT | CREATE | IGNORE | NULL | STAR |
| ACCOUNT | CTIME | IMMEDIATE | OF | STATE |
| ACCOUNTS | DATABASE | IMPORT | OFFSET | STATEMENT |
| ADD | DATABASES | IN | OR | STATE_WINDOW |
| AFTER | DAYS | INITIALLY | ORDER | STORAGE |
| ALL | DBS | INSERT | PARTITIONS | STREAM |
| ALTER | DEFERRED | INSTEAD | PASS | STREAMS |
| AND | DELIMITERS | INT | PLUS | STRING |
| AS | DESC | INTEGER | PPS | SYNCDB |
| ASC | DESCRIBE | INTERVAL | PRECISION | TABLE |
| ATTACH | DETACH | INTO | PREV | TABLES |
| BEFORE | DISTINCT | IS | PRIVILEGE | TAG |
| BEGIN | DIVIDE | ISNULL | QTIME | TAGS |
| BETWEEN | DNODE | JOIN | QUERIES | TBNAME |
| BIGINT | DNODES | KEEP | QUERY | TIMES |
| BINARY | DOT | KEY | QUORUM | TIMESTAMP |
| BITAND | DOUBLE | KILL | RAISE | TINYINT |
| BITNOT | DROP | LE | REM | TOPIC |
| BITOR | EACH | LIKE | REPLACE | TOPICS |
| BLOCKS | END | LIMIT | REPLICA | TRIGGER |
| BOOL | EQ | LINEAR | RESET | TSERIES |
| BY | EXISTS | LOCAL | RESTRICT | UMINUS |
| CACHE | EXPLAIN | LP | ROW | UNION |
| CACHELAST | FAIL | LSHIFT | RP | UNSIGNED |
| CASCADE | FILE | LT | RSHIFT | UPDATE |
| CHANGE | FILL | MATCH | SCORES | UPLUS |
| CLUSTER | FLOAT | MAXROWS | SELECT | USE |
| COLON | FOR | MINROWS | SEMI | USER |
| COLUMN | FROM | MINUS | SESSION | USERS |
| COMMA | FSYNC | MNODES | SET | USING |
| COMP | GE | MODIFY | SHOW | VALUES |
| COMPACT | GLOB | MODULES | SLASH | VARIABLE |
| CONCAT | GRANTS | NCHAR | SLIDING | VARIABLES |
| CONFLICT | GROUP | NE | SLIMIT | VGROUPS |
| CONNECTION | GT | NONE | SMALLINT | VIEW |
| CONNECTIONS | HAVING | NOT | SOFFSET | VNODES |
| CONNS | ID | NOTNULL | STable | WAL |
| COPY | IF | NOW | STableS | WHERE |
| _C0 | _QSTART | _QSTOP | _QDURATION | _WSTART |
| _WSTOP | _WDURATION | _ROWTS |
### A
- ABORT
- ACCOUNT
- ACCOUNTS
- ADD
- AFTER
- ALL
- ALTER
- AND
- AS
- ASC
- ATTACH
### B
- BEFORE
- BEGIN
- BETWEEN
- BIGINT
- BINARY
- BITAND
- BITNOT
- BITOR
- BLOCKS
- BOOL
- BY
### C
- CACHE
- CACHELAST
- CASCADE
- CHANGE
- CLUSTER
- COLON
- COLUMN
- COMMA
- COMP
- COMPACT
- CONCAT
- CONFLICT
- CONNECTION
- CONNECTIONS
- CONNS
- COPY
- CREATE
- CTIME
### D
- DATABASE
- DATABASES
- DAYS
- DBS
- DEFERRED
- DELETE
- DELIMITERS
- DESC
- DESCRIBE
- DETACH
- DISTINCT
- DIVIDE
- DNODE
- DNODES
- DOT
- DOUBLE
- DROP
### E
- END
- EQ
- EXISTS
- EXPLAIN
### F
- FAIL
- FILE
- FILL
- FLOAT
- FOR
- FROM
- FSYNC
### G
- GE
- GLOB
- GRANTS
- GROUP
- GT
### H
- HAVING
### I
- ID
- IF
- IGNORE
- IMMEDIA
- IMPORT
- IN
- INITIAL
- INSERT
- INSTEAD
- INT
- INTEGER
- INTERVA
- INTO
- IS
- ISNULL
### J
- JOIN
### K
- KEEP
- KEY
- KILL
### L
- LE
- LIKE
- LIMIT
- LINEAR
- LOCAL
- LP
- LSHIFT
- LT
### M
- MATCH
- MAXROWS
- MINROWS
- MINUS
- MNODES
- MODIFY
- MODULES
### N
- NE
- NONE
- NOT
- NOTNULL
- NOW
- NULL
### O
- OF
- OFFSET
- OR
- ORDER
### P
- PARTITION
- PASS
- PLUS
- PPS
- PRECISION
- PREV
- PRIVILEGE
### Q
- QTIME
- QUERIE
- QUERY
- QUORUM
### R
- RAISE
- REM
- REPLACE
- REPLICA
- RESET
- RESTRIC
- ROW
- RP
- RSHIFT
### S
- SCORES
- SELECT
- SEMI
- SESSION
- SET
- SHOW
- SLASH
- SLIDING
- SLIMIT
- SMALLIN
- SOFFSET
- STable
- STableS
- STAR
- STATE
- STATEMEN
- STATE_WI
- STORAGE
- STREAM
- STREAMS
- STRING
- SYNCDB
### T
- TABLE
- TABLES
- TAG
- TAGS
- TBNAME
- TIMES
- TIMESTAMP
- TINYINT
- TOPIC
- TOPICS
- TRIGGER
- TSERIES
### U
- UMINUS
- UNION
- UNSIGNED
- UPDATE
- UPLUS
- USE
- USER
- USERS
- USING
### V
- VALUES
- VARIABLE
- VARIABLES
- VGROUPS
- VIEW
- VNODES
### W
- WAL
- WHERE
### _
- _C0
- _QSTART
- _QSTOP
- _QDURATION
- _WSTART
- _WSTOP
- _WDURATION
## Explanations
### TBNAME

View File

@ -45,48 +45,274 @@ title: TDengine 参数限制与保留关键字
目前 TDengine 有将近 200 个内部保留关键字这些关键字无论大小写均不可以用作库名、表名、STable 名、数据列名及标签列名等。这些关键字列表如下:
| 关键字列表 | | | | |
| ----------- | ---------- | --------- | ---------- | ------------ |
| ABORT | CREATE | IGNORE | NULL | STAR |
| ACCOUNT | CTIME | IMMEDIATE | OF | STATE |
| ACCOUNTS | DATABASE | IMPORT | OFFSET | STATEMENT |
| ADD | DATABASES | IN | OR | STATE_WINDOW |
| AFTER | DAYS | INITIALLY | ORDER | STORAGE |
| ALL | DBS | INSERT | PARTITIONS | STREAM |
| ALTER | DEFERRED | INSTEAD | PASS | STREAMS |
| AND | DELIMITERS | INT | PLUS | STRING |
| AS | DESC | INTEGER | PPS | SYNCDB |
| ASC | DESCRIBE | INTERVAL | PRECISION | TABLE |
| ATTACH | DETACH | INTO | PREV | TABLES |
| BEFORE | DISTINCT | IS | PRIVILEGE | TAG |
| BEGIN | DIVIDE | ISNULL | QTIME | TAGS |
| BETWEEN | DNODE | JOIN | QUERIES | TBNAME |
| BIGINT | DNODES | KEEP | QUERY | TIMES |
| BINARY | DOT | KEY | QUORUM | TIMESTAMP |
| BITAND | DOUBLE | KILL | RAISE | TINYINT |
| BITNOT | DROP | LE | REM | TOPIC |
| BITOR | EACH | LIKE | REPLACE | TOPICS |
| BLOCKS | END | LIMIT | REPLICA | TRIGGER |
| BOOL | EQ | LINEAR | RESET | TSERIES |
| BY | EXISTS | LOCAL | RESTRICT | UMINUS |
| CACHE | EXPLAIN | LP | ROW | UNION |
| CACHELAST | FAIL | LSHIFT | RP | UNSIGNED |
| CASCADE | FILE | LT | RSHIFT | UPDATE |
| CHANGE | FILL | MATCH | SCORES | UPLUS |
| CLUSTER | FLOAT | MAXROWS | SELECT | USE |
| COLON | FOR | MINROWS | SEMI | USER |
| COLUMN | FROM | MINUS | SESSION | USERS |
| COMMA | FSYNC | MNODES | SET | USING |
| COMP | GE | MODIFY | SHOW | VALUES |
| COMPACT | GLOB | MODULES | SLASH | VARIABLE |
| CONCAT | GRANTS | NCHAR | SLIDING | VARIABLES |
| CONFLICT | GROUP | NE | SLIMIT | VGROUPS |
| CONNECTION | GT | NONE | SMALLINT | VIEW |
| CONNECTIONS | HAVING | NOT | SOFFSET | VNODES |
| CONNS | ID | NOTNULL | STABLE | WAL |
| COPY | IF | NOW | STABLES | WHERE |
| _C0 | _QSTART | _QSTOP | _QDURATION | _WSTART |
| _WSTOP | _WDURATION | _ROWTS |
### A
- ABORT
- ACCOUNT
- ACCOUNTS
- ADD
- AFTER
- ALL
- ALTER
- AND
- AS
- ASC
- ATTACH
### B
- BEFORE
- BEGIN
- BETWEEN
- BIGINT
- BINARY
- BITAND
- BITNOT
- BITOR
- BLOCKS
- BOOL
- BY
### C
- CACHE
- CACHELAST
- CASCADE
- CHANGE
- CLUSTER
- COLON
- COLUMN
- COMMA
- COMP
- COMPACT
- CONCAT
- CONFLICT
- CONNECTION
- CONNECTIONS
- CONNS
- COPY
- CREATE
- CTIME
### D
- DATABASE
- DATABASES
- DAYS
- DBS
- DEFERRED
- DELETE
- DELIMITERS
- DESC
- DESCRIBE
- DETACH
- DISTINCT
- DIVIDE
- DNODE
- DNODES
- DOT
- DOUBLE
- DROP
### E
- END
- EQ
- EXISTS
- EXPLAIN
### F
- FAIL
- FILE
- FILL
- FLOAT
- FOR
- FROM
- FSYNC
### G
- GE
- GLOB
- GRANTS
- GROUP
- GT
### H
- HAVING
### I
- ID
- IF
- IGNORE
- IMMEDIA
- IMPORT
- IN
- INITIAL
- INSERT
- INSTEAD
- INT
- INTEGER
- INTERVA
- INTO
- IS
- ISNULL
### J
- JOIN
### K
- KEEP
- KEY
- KILL
### L
- LE
- LIKE
- LIMIT
- LINEAR
- LOCAL
- LP
- LSHIFT
- LT
### M
- MATCH
- MAXROWS
- MINROWS
- MINUS
- MNODES
- MODIFY
- MODULES
### N
- NE
- NONE
- NOT
- NOTNULL
- NOW
- NULL
### O
- OF
- OFFSET
- OR
- ORDER
### P
- PARTITION
- PASS
- PLUS
- PPS
- PRECISION
- PREV
- PRIVILEGE
### Q
- QTIME
- QUERIE
- QUERY
- QUORUM
### R
- RAISE
- REM
- REPLACE
- REPLICA
- RESET
- RESTRIC
- ROW
- RP
- RSHIFT
### S
- SCORES
- SELECT
- SEMI
- SESSION
- SET
- SHOW
- SLASH
- SLIDING
- SLIMIT
- SMALLIN
- SOFFSET
- STable
- STableS
- STAR
- STATE
- STATEMEN
- STATE_WI
- STORAGE
- STREAM
- STREAMS
- STRING
- SYNCDB
### T
- TABLE
- TABLES
- TAG
- TAGS
- TBNAME
- TIMES
- TIMESTAMP
- TINYINT
- TOPIC
- TOPICS
- TRIGGER
- TSERIES
### U
- UMINUS
- UNION
- UNSIGNED
- UPDATE
- UPLUS
- USE
- USER
- USERS
- USING
### V
- VALUES
- VARIABLE
- VARIABLES
- VGROUPS
- VIEW
- VNODES
### W
- WAL
- WHERE
### _
- _C0
- _QSTART
- _QSTOP
- _QDURATION
- _WSTART
- _WSTOP
- _WDURATION
## 特殊说明
### TBNAME

View File

@ -32,9 +32,7 @@ typedef struct {
int32_t dnodeId;
bool standby;
bool deploy;
int8_t replica;
int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA];
SReplica replica;
SMsgCb msgCb;
} SMnodeOpt;
@ -83,6 +81,7 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
*/
int32_t mndProcessRpcMsg(SRpcMsg *pMsg);
int32_t mndProcessSyncMsg(SRpcMsg *pMsg);
int32_t mndPreprocessQueryMsg(SMnode * pMnode, SRpcMsg * pMsg);
/**
* @brief Generate machine code

View File

@ -138,6 +138,10 @@ typedef enum EFunctionType {
FUNCTION_TYPE_TOP_MERGE,
FUNCTION_TYPE_BOTTOM_PARTIAL,
FUNCTION_TYPE_BOTTOM_MERGE,
FUNCTION_TYPE_FIRST_PARTIAL,
FUNCTION_TYPE_FIRST_MERGE,
FUNCTION_TYPE_LAST_PARTIAL,
FUNCTION_TYPE_LAST_MERGE,
// user defined funcion
FUNCTION_TYPE_UDF = 10000

View File

@ -1327,8 +1327,8 @@ static int32_t smlKvTimeArrayCompare(const void *key1, const void *key2) {
static int32_t smlKvTimeHashCompare(const void *key1, const void *key2) {
SHashObj *s1 = *(SHashObj **)key1;
SHashObj *s2 = *(SHashObj **)key2;
SSmlKv *kv1 = (SSmlKv *)taosHashGet(s1, TS, TS_LEN);
SSmlKv *kv2 = (SSmlKv *)taosHashGet(s2, TS, TS_LEN);
SSmlKv *kv1 = *(SSmlKv **)taosHashGet(s1, TS, TS_LEN);
SSmlKv *kv2 = *(SSmlKv **)taosHashGet(s2, TS, TS_LEN);
ASSERT(kv1->type == TSDB_DATA_TYPE_TIMESTAMP);
ASSERT(kv2->type == TSDB_DATA_TYPE_TIMESTAMP);
if (kv1->i < kv2->i) {
@ -1340,29 +1340,13 @@ static int32_t smlKvTimeHashCompare(const void *key1, const void *key2) {
}
}
static int32_t smlDealCols(SSmlTableInfo *oneTable, bool dataFormat, SArray *cols) {
if (dataFormat) {
void *p = taosArraySearch(oneTable->cols, &cols, smlKvTimeArrayCompare, TD_GE);
if (p == NULL) {
static int32_t smlDealCols(SSmlTableInfo* oneTable, bool dataFormat, SArray *cols){
if(dataFormat){
void *p = taosArraySearch(oneTable->cols, &cols, smlKvTimeArrayCompare, TD_GT);
if(p == NULL){
taosArrayPush(oneTable->cols, &cols);
} else { // to make the sort stable for update data
SArray *sa = (SArray *)p;
SSmlKv *cur = (SSmlKv *)taosArrayGet(sa, 0);
SSmlKv *dCur = (SSmlKv *)taosArrayGet(cols, 0);
if (cur->i > dCur->i) {
taosArrayInsert(oneTable->cols, TARRAY_ELEM_IDX(oneTable->cols, p), &cols);
} else {
ASSERT(cur->i == dCur->i);
int32_t index = TARRAY_ELEM_IDX(oneTable->cols, p) + 1;
for (; index < taosArrayGetSize(oneTable->cols); index++) {
SArray *tmp = (SArray *)taosArrayGet(oneTable->cols, index);
SSmlKv *curTs = (SSmlKv *)taosArrayGet(tmp, 0);
if (curTs->i > dCur->i) {
break;
}
}
taosArrayInsert(oneTable->cols, index, &cols);
}
}else{
taosArrayInsert(oneTable->cols, TARRAY_ELEM_IDX(oneTable->cols, p), &cols);
}
return TSDB_CODE_SUCCESS;
}
@ -1377,27 +1361,11 @@ static int32_t smlDealCols(SSmlTableInfo *oneTable, bool dataFormat, SArray *col
taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES);
}
void *p = taosArraySearch(oneTable->cols, &kvHash, smlKvTimeHashCompare, TD_GE);
if (p == NULL) {
void *p = taosArraySearch(oneTable->cols, &kvHash, smlKvTimeHashCompare, TD_GT);
if(p == NULL){
taosArrayPush(oneTable->cols, &kvHash);
} else { // to make the sort stable for update data
SHashObj *sa = (SHashObj *)p;
SSmlKv *cur = (SSmlKv *)taosHashGet(sa, TS, TS_LEN);
SSmlKv *dCur = (SSmlKv *)taosArrayGet(cols, 0);
if (cur->i > dCur->i) {
taosArrayInsert(oneTable->cols, TARRAY_ELEM_IDX(oneTable->cols, p), &cols);
} else {
ASSERT(cur->i == dCur->i);
int32_t index = TARRAY_ELEM_IDX(oneTable->cols, p) + 1;
for (; index < taosArrayGetSize(oneTable->cols); index++) {
SHashObj *tmp = (SHashObj *)taosArrayGet(oneTable->cols, index);
SSmlKv *curTs = (SSmlKv *)taosHashGet(tmp, TS, TS_LEN);
if (curTs->i > dCur->i) {
break;
}
}
taosArrayInsert(oneTable->cols, index, &cols);
}
}else{
taosArrayInsert(oneTable->cols, TARRAY_ELEM_IDX(oneTable->cols, p), &kvHash);
}
return TSDB_CODE_SUCCESS;
}

View File

@ -1260,4 +1260,28 @@ TEST(testCase, sml_16368_Test) {
pRes = taos_schemaless_insert(taos, (char**)sql, sizeof(sql)/sizeof(sql[0]), TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_MICRO_SECONDS);
ASSERT_EQ(taos_errno(pRes), 0);
taos_free_result(pRes);
}*/
}
TEST(testCase, sml_dup_time_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr);
TAOS_RES* pRes = taos_query(taos, "create database if not exists dup_time schemaless 1");
taos_free_result(pRes);
const char *sql[] = {
//"test_ms,t0=t c0=f 1626006833641",
"ubzlsr,id=qmtcvgd,t0=t,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\" c0=false,c1=1i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"xcxvwjvf\",c8=L\"ncharColValue\",c9=7u64 1626006833639000000",
"ubzlsr,id=qmtcvgd,t0=t,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\" c0=T,c1=2i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"fixrzcuq\",c8=L\"ncharColValue\",c9=7u64 1626006833639000000",
"ubzlsr,id=qmtcvgd,t0=t,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\" c0=t,c1=3i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"iupzdqub\",c8=L\"ncharColValue\",c9=7u64 1626006833639000000",
"ubzlsr,id=qmtcvgd,t0=t,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\" c0=t,c1=4i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"yvvtzzof\",c8=L\"ncharColValue\",c9=7u64 1626006833639000000",
"ubzlsr,id=qmtcvgd,t0=t,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\" c0=t,c1=5i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"vbxpilkj\",c8=L\"ncharColValue\",c9=7u64 1626006833639000000"
};
pRes = taos_query(taos, "use dup_time");
taos_free_result(pRes);
pRes = taos_schemaless_insert(taos, (char**)sql, sizeof(sql)/sizeof(sql[0]), TSDB_SML_LINE_PROTOCOL, 0);
ASSERT_EQ(taos_errno(pRes), 0);
taos_free_result(pRes);
}
*/

View File

@ -34,39 +34,31 @@ typedef struct SMnodeMgmt {
SSingleWorker writeWorker;
SSingleWorker syncWorker;
SSingleWorker monitorWorker;
SReplica replicas[TSDB_MAX_REPLICA];
int8_t replica;
bool stopped;
int32_t refCount;
TdThreadRwlock lock;
} SMnodeMgmt;
// mmFile.c
int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed);
int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed);
// mmInt.c
int32_t mmAcquire(SMnodeMgmt *pMgmt);
void mmRelease(SMnodeMgmt *pMgmt);
int32_t mmReadFile(SMnodeMgmt *pMgmt, SReplica *pReplica, bool *pDeployed);
int32_t mmWriteFile(SMnodeMgmt *pMgmt, const SReplica *pReplica, bool deployed);
// mmHandle.c
SArray *mmGetMsgHandles();
int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mndPreprocessQueryMsg(SMnode * pMnode, SRpcMsg * pMsg);
// mmWorker.c
int32_t mmStartWorker(SMnodeMgmt *pMgmt);
void mmStopWorker(SMnodeMgmt *pMgmt);
int32_t mmPutNodeMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutRpcMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc);
int32_t mmPutMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc);
#ifdef __cplusplus
}

View File

@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE
#include "mmInt.h"
int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) {
int32_t mmReadFile(SMnodeMgmt *pMgmt, SReplica *pReplica, bool *pDeployed) {
int32_t code = TSDB_CODE_INVALID_JSON_FORMAT;
int32_t len = 0;
int32_t maxLen = 4096;
@ -52,61 +52,54 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) {
}
*pDeployed = deployed->valueint;
cJSON *mnodes = cJSON_GetObjectItem(root, "mnodes");
if (mnodes != NULL) {
if (!mnodes || mnodes->type != cJSON_Array) {
dError("failed to read %s since nodes not found", file);
cJSON *id = cJSON_GetObjectItem(root, "id");
if (id) {
if (id->type != cJSON_Number) {
dError("failed to read %s since id not found", file);
goto _OVER;
}
pMgmt->replica = cJSON_GetArraySize(mnodes);
if (pMgmt->replica <= 0 || pMgmt->replica > TSDB_MAX_REPLICA) {
dError("failed to read %s since mnodes size %d invalid", file, pMgmt->replica);
goto _OVER;
}
for (int32_t i = 0; i < pMgmt->replica; ++i) {
cJSON *node = cJSON_GetArrayItem(mnodes, i);
if (node == NULL) break;
SReplica *pReplica = &pMgmt->replicas[i];
cJSON *id = cJSON_GetObjectItem(node, "id");
if (!id || id->type != cJSON_Number) {
dError("failed to read %s since id not found", file);
goto _OVER;
}
if (pReplica) {
pReplica->id = id->valueint;
}
}
cJSON *fqdn = cJSON_GetObjectItem(node, "fqdn");
if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) {
dError("failed to read %s since fqdn not found", file);
goto _OVER;
}
cJSON *fqdn = cJSON_GetObjectItem(root, "fqdn");
if (fqdn) {
if (fqdn->type != cJSON_String || fqdn->valuestring == NULL) {
dError("failed to read %s since fqdn not found", file);
goto _OVER;
}
if (pReplica) {
tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN);
}
}
cJSON *port = cJSON_GetObjectItem(node, "port");
if (!port || port->type != cJSON_Number) {
dError("failed to read %s since port not found", file);
goto _OVER;
}
pReplica->port = port->valueint;
cJSON *port = cJSON_GetObjectItem(root, "port");
if (port) {
if (port->type != cJSON_Number) {
dError("failed to read %s since port not found", file);
goto _OVER;
}
if (pReplica) {
pReplica->port = (uint16_t)port->valueint;
}
}
code = 0;
dDebug("succcessed to read file %s, deployed:%d", file, *pDeployed);
_OVER:
if (content != NULL) taosMemoryFree(content);
if (root != NULL) cJSON_Delete(root);
if (pFile != NULL) taosCloseFile(&pFile);
if (code == 0) {
dDebug("succcessed to read file %s, deployed:%d", file, *pDeployed);
}
terrno = code;
return code;
}
int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed) {
int32_t mmWriteFile(SMnodeMgmt *pMgmt, const SReplica *pReplica, bool deployed) {
char file[PATH_MAX] = {0};
char realfile[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%s%smnode.json.bak", pMgmt->path, TD_DIRSEP);
@ -124,26 +117,11 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed) {
char *content = taosMemoryCalloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
int8_t replica = (pMsg != NULL ? pMsg->replica : pMgmt->replica);
if (replica > 0) {
len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n");
for (int32_t i = 0; i < replica; ++i) {
SReplica *pReplica = &pMgmt->replicas[i];
if (pMsg != NULL) {
pReplica = &pMsg->replicas[i];
}
len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id);
len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn);
len += snprintf(content + len, maxLen - len, " \"port\": %u\n", pReplica->port);
if (i < replica - 1) {
len += snprintf(content + len, maxLen - len, " },{\n");
} else {
len += snprintf(content + len, maxLen - len, " }],\n");
}
}
if (pReplica != NULL && pReplica->id > 0) {
len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id);
len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn);
len += snprintf(content + len, maxLen - len, " \"port\": %u\n,", pReplica->port);
}
len += snprintf(content + len, maxLen - len, " \"deployed\": %d\n", deployed);
len += snprintf(content + len, maxLen - len, "}\n");

View File

@ -20,11 +20,6 @@ void mmGetMonitorInfo(SMnodeMgmt *pMgmt, SMonMmInfo *pInfo) {
mndGetMonitorInfo(pMgmt->pMnode, &pInfo->cluster, &pInfo->vgroup, &pInfo->grant);
}
void mmGetMnodeLoads(SMnodeMgmt *pMgmt, SMonMloadInfo *pInfo) {
pInfo->isMnode = 1;
mndGetLoad(pMgmt->pMnode, &pInfo->load);
}
int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SMonMmInfo mmInfo = {0};
mmGetMonitorInfo(pMgmt, &mmInfo);
@ -50,6 +45,11 @@ int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return 0;
}
void mmGetMnodeLoads(SMnodeMgmt *pMgmt, SMonMloadInfo *pInfo) {
pInfo->isMnode = 1;
mndGetLoad(pMgmt->pMnode, &pInfo->load);
}
int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SMonMloadInfo mloads = {0};
mmGetMnodeLoads(pMgmt, &mloads);
@ -90,7 +90,7 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SMnodeMgmt mgmt = {0};
mgmt.path = pInput->path;
mgmt.name = pInput->name;
if (mmWriteFile(&mgmt, &createReq, deployed) != 0) {
if (mmWriteFile(&mgmt, &createReq.replicas[0], deployed) != 0) {
dError("failed to write mnode file since %s", terrstr());
return -1;
}
@ -126,117 +126,117 @@ int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SArray *mmGetMsgHandles() {
int32_t code = -1;
SArray *pArray = taosArrayInit(64, sizeof(SMgmtHandle));
SArray *pArray = taosArrayInit(128, sizeof(SMgmtHandle));
if (pArray == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_MNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_MNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_QNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_QNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_SNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_SNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_BNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_BNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_MNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_MNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_QNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_QNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_SNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_SNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_BNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_BNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CONNECT, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_ACCT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_ACCT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_ACCT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_USER, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_USER, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_USER, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_USER_AUTH, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_DNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG_DNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_DNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_SET_STANDBY_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_QNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_QNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_QNODE_LIST, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_SNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_BNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_BNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_DB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_DB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_USE_DB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_DB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_COMPACT_DB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_DB_CFG, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_VGROUP_LIST, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_REDISTRIBUTE_VGROUP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_MERGE_VGROUP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_BALANCE_VGROUP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_FUNC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_FUNC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_FUNC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_STB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_STB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_TABLE_META, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_SMA, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SMA, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STREAM, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_INDEX, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_TABLE_INDEX, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_SUBSCRIBE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_ASK_EP, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_DROP_CGROUP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_DROP_CGROUP_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_COMMIT_OFFSET, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_TRANS, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_QUERY, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_CONN, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_HEARTBEAT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STATUS, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CONNECT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_ACCT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_ACCT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_ACCT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_USER, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_USER, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_USER, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_USER_AUTH, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_MNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_SET_STANDBY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_MNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_QNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_QNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_QNODE_LIST, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_SNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_BNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_BNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_USE_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_COMPACT_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_DB_CFG, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_VGROUP_LIST, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_REDISTRIBUTE_VGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_MERGE_VGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_BALANCE_VGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_FUNC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_FUNC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_FUNC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_STB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_STB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_TABLE_META, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_SMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_INDEX, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_TABLE_INDEX, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_TOPIC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_TOPIC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TOPIC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_SUBSCRIBE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_ASK_EP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_DROP_CGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_DROP_CGROUP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_COMMIT_OFFSET, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_TRANS, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_QUERY, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_CONN, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_HEARTBEAT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STATUS, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_SMA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MON_MM_INFO, mmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MON_MM_LOAD, mmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MON_MM_INFO, mmPutMsgToMonitorQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MON_MM_LOAD, mmPutMsgToMonitorQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_SET_STANDBY, mmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_SET_STANDBY, mmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
code = 0;

View File

@ -27,7 +27,7 @@ static bool mmDeployRequired(const SMgmtInputOpt *pInput) {
static int32_t mmRequire(const SMgmtInputOpt *pInput, bool *required) {
SMnodeMgmt mgmt = {0};
mgmt.path = pInput->path;
if (mmReadFile(&mgmt, required) != 0) {
if (mmReadFile(&mgmt, NULL, required) != 0) {
return -1;
}
@ -43,33 +43,19 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInpu
pOption->deploy = true;
pOption->msgCb = pMgmt->msgCb;
pOption->dnodeId = pMgmt->pData->dnodeId;
pOption->replica = 1;
pOption->selfIndex = 0;
SReplica *pReplica = &pOption->replicas[0];
pReplica->id = 1;
pReplica->port = tsServerPort;
tstrncpy(pReplica->fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
pOption->replica.id = 1;
pOption->replica.port = tsServerPort;
tstrncpy(pOption->replica.fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
}
static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
pOption->deploy = false;
static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, const SReplica *pReplica, SMnodeOpt *pOption) {
pOption->standby = false;
pOption->deploy = false;
pOption->msgCb = pMgmt->msgCb;
pOption->dnodeId = pMgmt->pData->dnodeId;
if (pMgmt->replica > 0) {
if (pReplica->id > 0) {
pOption->standby = true;
pOption->replica = 1;
pOption->selfIndex = 0;
SReplica *pReplica = &pOption->replicas[0];
for (int32_t i = 0; i < pMgmt->replica; ++i) {
if (pMgmt->replicas[i].id != pMgmt->pData->dnodeId) continue;
pReplica->id = pMgmt->replicas[i].id;
pReplica->port = pMgmt->replicas[i].port;
memcpy(pReplica->fqdn, pMgmt->replicas[i].fqdn, TSDB_FQDN_LEN);
}
pOption->replica = *pReplica;
}
}
@ -105,12 +91,13 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->path = pInput->path;
pMgmt->name = pInput->name;
pMgmt->msgCb = pInput->msgCb;
pMgmt->msgCb.putToQueueFp = (PutToQueueFp)mmPutRpcMsgToQueue;
pMgmt->msgCb.putToQueueFp = (PutToQueueFp)mmPutMsgToQueue;
pMgmt->msgCb.mgmt = pMgmt;
taosThreadRwlockInit(&pMgmt->lock, NULL);
bool deployed = false;
if (mmReadFile(pMgmt, &deployed) != 0) {
bool deployed = false;
SReplica replica = {0};
if (mmReadFile(pMgmt, &replica, &deployed) != 0) {
dError("failed to read file since %s", terrstr());
mmClose(pMgmt);
return -1;
@ -123,7 +110,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
mmBuildOptionForDeploy(pMgmt, pInput, &option);
} else {
dInfo("mnode start to open");
mmBuildOptionForOpen(pMgmt, &option);
mmBuildOptionForOpen(pMgmt, &replica, &option);
}
pMgmt->pMnode = mndOpen(pMgmt->path, &option);
@ -141,8 +128,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
}
tmsgReportStartup("mnode-worker", "initialized");
if (!deployed || pMgmt->replica > 0) {
pMgmt->replica = 0;
if (!deployed || replica.id > 0) {
deployed = true;
if (mmWriteFile(pMgmt, NULL, deployed) != 0) {
dError("failed to write mnode file since %s", terrstr());
@ -178,22 +164,3 @@ SMgmtFunc mmGetMgmtFunc() {
return mgmtFunc;
}
int32_t mmAcquire(SMnodeMgmt *pMgmt) {
int32_t code = 0;
taosThreadRwlockRdlock(&pMgmt->lock);
if (pMgmt->stopped) {
code = -1;
} else {
atomic_add_fetch_32(&pMgmt->refCount, 1);
}
taosThreadRwlockUnlock(&pMgmt->lock);
return code;
}
void mmRelease(SMnodeMgmt *pMgmt) {
taosThreadRwlockRdlock(&pMgmt->lock);
atomic_sub_fetch_32(&pMgmt->refCount, 1);
taosThreadRwlockUnlock(&pMgmt->lock);
}

View File

@ -16,6 +16,25 @@
#define _DEFAULT_SOURCE
#include "mmInt.h"
static inline int32_t mmAcquire(SMnodeMgmt *pMgmt) {
int32_t code = 0;
taosThreadRwlockRdlock(&pMgmt->lock);
if (pMgmt->stopped) {
code = -1;
} else {
atomic_add_fetch_32(&pMgmt->refCount, 1);
}
taosThreadRwlockUnlock(&pMgmt->lock);
return code;
}
static inline void mmRelease(SMnodeMgmt *pMgmt) {
taosThreadRwlockRdlock(&pMgmt->lock);
atomic_sub_fetch_32(&pMgmt->refCount, 1);
taosThreadRwlockUnlock(&pMgmt->lock);
}
static inline void mmSendRsp(SRpcMsg *pMsg, int32_t code) {
SRpcMsg rsp = {
.code = code,
@ -26,7 +45,7 @@ static inline void mmSendRsp(SRpcMsg *pMsg, int32_t code) {
tmsgSendRsp(&rsp);
}
static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SMnodeMgmt *pMgmt = pInfo->ahandle;
int32_t code = -1;
dTrace("msg:%p, get from mnode queue", pMsg);
@ -53,11 +72,10 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
taosFreeQitem(pMsg);
}
static void mmProcessSyncQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
static void mmProcessSyncMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SMnodeMgmt *pMgmt = pInfo->ahandle;
dTrace("msg:%p, get from mnode-sync queue", pMsg);
pMsg->info.node = pMgmt->pMnode;
dTrace("msg:%p, get from mnode-sync queue", pMsg);
SMsgHead *pHead = pMsg->pCont;
pHead->contLen = ntohl(pHead->contLen);
@ -70,66 +88,69 @@ static void mmProcessSyncQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
taosFreeQitem(pMsg);
}
static int32_t mmPutNodeMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pMsg) {
dTrace("msg:%p, put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType));
taosWriteQitem(pWorker->queue, pMsg);
return 0;
static inline int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pMsg) {
if (mmAcquire(pMgmt) == 0) {
dTrace("msg:%p, put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType));
taosWriteQitem(pWorker->queue, pMsg);
mmRelease(pMgmt);
return 0;
} else {
dTrace("msg:%p, failed to put into %s queue since %s, type:%s", pMsg, pWorker->name, terrstr(),
TMSG_INFO(pMsg->msgType));
return -1;
}
}
int32_t mmPutNodeMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutNodeMsgToWorker(&pMgmt->writeWorker, pMsg);
int32_t mmPutMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutMsgToWorker(pMgmt, &pMgmt->writeWorker, pMsg);
}
int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutNodeMsgToWorker(&pMgmt->syncWorker, pMsg);
int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutMsgToWorker(pMgmt, &pMgmt->syncWorker, pMsg);
}
int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutNodeMsgToWorker(&pMgmt->readWorker, pMsg);
int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg);
}
int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
mndPreprocessQueryMsg(pMgmt->pMnode, pMsg);
return mmPutNodeMsgToWorker(&pMgmt->queryWorker, pMsg);
int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
if (mndPreprocessQueryMsg(pMgmt->pMnode, pMsg) != 0) {
dError("msg:%p, failed to pre-process in mnode since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType));
return -1;
}
return mmPutMsgToWorker(pMgmt, &pMgmt->queryWorker, pMsg);
}
int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutNodeMsgToWorker(&pMgmt->monitorWorker, pMsg);
int32_t mmPutMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutMsgToWorker(pMgmt, &pMgmt->monitorWorker, pMsg);
}
int32_t mmPutRpcMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
SSingleWorker *pWorker = NULL;
switch (qtype) {
case WRITE_QUEUE:
pWorker = &pMgmt->writeWorker;
break;
case QUERY_QUEUE:
pWorker = &pMgmt->queryWorker;
break;
case READ_QUEUE:
pWorker = &pMgmt->readWorker;
break;
case SYNC_QUEUE:
pWorker = &pMgmt->syncWorker;
break;
default:
terrno = TSDB_CODE_INVALID_PARA;
}
if (pWorker == NULL) return -1;
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
if (pMsg == NULL) return -1;
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
switch (qtype) {
case WRITE_QUEUE:
dTrace("msg:%p, is created and will put into vnode-write queue", pMsg);
taosWriteQitem(pMgmt->writeWorker.queue, pMsg);
return 0;
case QUERY_QUEUE:
dTrace("msg:%p, is created and will put into vnode-query queue", pMsg);
taosWriteQitem(pMgmt->queryWorker.queue, pMsg);
return 0;
case READ_QUEUE:
dTrace("msg:%p, is created and will put into vnode-read queue", pMsg);
taosWriteQitem(pMgmt->readWorker.queue, pMsg);
return 0;
case SYNC_QUEUE:
if (mmAcquire(pMgmt) == 0) {
dTrace("msg:%p, is created and will put into vnode-sync queue", pMsg);
taosWriteQitem(pMgmt->syncWorker.queue, pMsg);
mmRelease(pMgmt);
return 0;
} else {
return -1;
}
default:
terrno = TSDB_CODE_INVALID_PARA;
return -1;
}
dTrace("msg:%p, is created and will put int %s queue", pMsg, pWorker->name);
return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
@ -137,7 +158,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
.min = tsNumOfMnodeQueryThreads,
.max = tsNumOfMnodeQueryThreads,
.name = "mnode-query",
.fp = (FItem)mmProcessQueue,
.fp = (FItem)mmProcessRpcMsg,
.param = pMgmt,
};
if (tSingleWorkerInit(&pMgmt->queryWorker, &qCfg) != 0) {
@ -149,7 +170,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
.min = tsNumOfMnodeReadThreads,
.max = tsNumOfMnodeReadThreads,
.name = "mnode-read",
.fp = (FItem)mmProcessQueue,
.fp = (FItem)mmProcessRpcMsg,
.param = pMgmt,
};
if (tSingleWorkerInit(&pMgmt->readWorker, &rCfg) != 0) {
@ -161,7 +182,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
.min = 1,
.max = 1,
.name = "mnode-write",
.fp = (FItem)mmProcessQueue,
.fp = (FItem)mmProcessRpcMsg,
.param = pMgmt,
};
if (tSingleWorkerInit(&pMgmt->writeWorker, &wCfg) != 0) {
@ -173,7 +194,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
.min = 1,
.max = 1,
.name = "mnode-sync",
.fp = (FItem)mmProcessSyncQueue,
.fp = (FItem)mmProcessSyncMsg,
.param = pMgmt,
};
if (tSingleWorkerInit(&pMgmt->syncWorker, &sCfg) != 0) {
@ -185,7 +206,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
.min = 1,
.max = 1,
.name = "mnode-monitor",
.fp = (FItem)mmProcessQueue,
.fp = (FItem)mmProcessRpcMsg,
.param = pMgmt,
};
if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) {

View File

@ -123,10 +123,12 @@ static int32_t dmProcessCreateNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
dError("node:%s, failed to create since %s", pWrapper->name, terrstr());
} else {
dInfo("node:%s, has been created", pWrapper->name);
(void)dmOpenNode(pWrapper);
(void)dmStartNode(pWrapper);
pWrapper->required = true;
code = dmOpenNode(pWrapper);
if (code != 0) {
code = dmStartNode(pWrapper);
}
pWrapper->deployed = true;
pWrapper->required = true;
pWrapper->proc.ptype = pDnode->ptype;
}

View File

@ -76,11 +76,12 @@ typedef struct {
} STelemMgmt;
typedef struct {
sem_t syncSem;
int64_t sync;
bool standby;
int32_t errCode;
int32_t transId;
sem_t syncSem;
int64_t sync;
bool standby;
SReplica replica;
int32_t errCode;
int32_t transId;
} SSyncMgmt;
typedef struct {
@ -98,9 +99,6 @@ typedef struct SMnode {
bool stopped;
bool restored;
bool deploy;
int8_t replica;
int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA];
char *path;
int64_t checkTime;
SSdb *pSdb;

View File

@ -95,8 +95,8 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
dnodeObj.id = 1;
dnodeObj.createdTime = taosGetTimestampMs();
dnodeObj.updateTime = dnodeObj.createdTime;
dnodeObj.port = pMnode->replicas[0].port;
memcpy(&dnodeObj.fqdn, pMnode->replicas[0].fqdn, TSDB_FQDN_LEN);
dnodeObj.port = tsServerPort;
memcpy(&dnodeObj.fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
snprintf(dnodeObj.ep, TSDB_EP_LEN, "%s:%u", dnodeObj.fqdn, dnodeObj.port);
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL);

View File

@ -289,11 +289,9 @@ static int32_t mndExecSteps(SMnode *pMnode) {
}
static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
pMnode->replica = pOption->replica;
pMnode->selfIndex = pOption->selfIndex;
memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
pMnode->msgCb = pOption->msgCb;
pMnode->selfDnodeId = pOption->dnodeId;
pMnode->syncMgmt.replica = pOption->replica;
pMnode->syncMgmt.standby = pOption->standby;
}

View File

@ -188,15 +188,15 @@ int32_t mndInitSync(SMnode *pMnode) {
syncInfo.isStandBy = pMgmt->standby;
syncInfo.snapshotEnable = true;
SSyncCfg *pCfg = &syncInfo.syncCfg;
pCfg->replicaNum = pMnode->replica;
pCfg->myIndex = pMnode->selfIndex;
mInfo("start to open mnode sync, replica:%d myindex:%d standby:%d", pCfg->replicaNum, pCfg->myIndex, pMgmt->standby);
for (int32_t i = 0; i < pMnode->replica; ++i) {
SNodeInfo *pNode = &pCfg->nodeInfo[i];
tstrncpy(pNode->nodeFqdn, pMnode->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
pNode->nodePort = pMnode->replicas[i].port;
mInfo("index:%d, fqdn:%s port:%d", i, pNode->nodeFqdn, pNode->nodePort);
mInfo("start to open mnode sync, standby:%d", pMgmt->standby);
if (pMgmt->standby || pMgmt->replica.id > 0) {
SSyncCfg *pCfg = &syncInfo.syncCfg;
pCfg->replicaNum = 1;
pCfg->myIndex = 0;
SNodeInfo *pNode = &pCfg->nodeInfo[0];
tstrncpy(pNode->nodeFqdn, pMgmt->replica.fqdn, sizeof(pNode->nodeFqdn));
pNode->nodePort = pMgmt->replica.port;
mInfo("fqdn:%s port:%u", pNode->nodeFqdn, pNode->nodePort);
}
tsem_init(&pMgmt->syncSem, 0, 0);

View File

@ -582,6 +582,15 @@ int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) {
return offset;
}
static void clearPartitionOperator(SPartitionOperatorInfo* pInfo) {
void *ite = NULL;
while( (ite = taosHashIterate(pInfo->pGroupSet, ite)) != NULL ) {
taosArrayDestroy( ((SDataGroupInfo *)ite)->pPageList);
}
taosHashClear(pInfo->pGroupSet);
clearDiskbasedBuf(pInfo->pBuf);
}
static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
SPartitionOperatorInfo* pInfo = pOperator->info;
@ -591,6 +600,7 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
pInfo->pGroupIter = taosHashIterate(pInfo->pGroupSet, pInfo->pGroupIter);
if (pInfo->pGroupIter == NULL) {
doSetOperatorCompleted(pOperator);
clearPartitionOperator(pInfo);
return NULL;
}

View File

@ -93,10 +93,14 @@ int32_t diffFunction(SqlFunctionCtx *pCtx);
bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t firstFunction(SqlFunctionCtx *pCtx);
int32_t firstFunctionMerge(SqlFunctionCtx *pCtx);
int32_t lastFunction(SqlFunctionCtx *pCtx);
int32_t lastFunctionMerge(SqlFunctionCtx *pCtx);
int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t getFirstLastInfoSize(int32_t resBytes);
bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
bool getTopBotMergeFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);

View File

@ -954,6 +954,41 @@ static int32_t translateFirstLast(SFunctionNode* pFunc, char* pErrBuf, int32_t l
return TSDB_CODE_SUCCESS;
}
static int32_t translateFirstLastImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) {
// first(col_list) will be rewritten as first(col)
if (2 != LIST_LENGTH(pFunc->pParameterList)) { //input has two params c0,ts, is this a bug?
return TSDB_CODE_SUCCESS;
}
SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
uint8_t paraType = ((SExprNode*)pPara)->resType.type;
int32_t paraBytes = ((SExprNode*)pPara)->resType.bytes;
if (isPartial) {
if (QUERY_NODE_COLUMN != nodeType(pPara)) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"The parameters of first/last can only be columns");
}
pFunc->node.resType = (SDataType){.bytes = getFirstLastInfoSize(paraBytes) + VARSTR_HEADER_SIZE,
.type = TSDB_DATA_TYPE_BINARY};
} else {
if (TSDB_DATA_TYPE_BINARY != paraType) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
pFunc->node.resType = ((SExprNode*)pPara)->resType;
}
return TSDB_CODE_SUCCESS;
}
static int32_t translateFirstLastPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateFirstLastImpl(pFunc, pErrBuf, len, true);
}
static int32_t translateFirstLastMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateFirstLastImpl(pFunc, pErrBuf, len, false);
}
static int32_t translateUnique(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
@ -1704,6 +1739,30 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = functionSetup,
.processFunc = firstFunction,
.finalizeFunc = firstLastFinalize,
.pPartialFunc = "_first_partial",
.pMergeFunc = "_first_merge",
.combineFunc = firstCombine,
},
{
.name = "_first_partial",
.type = FUNCTION_TYPE_FIRST_PARTIAL,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC,
.translateFunc = translateFirstLastPartial,
.getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup,
.processFunc = firstFunction,
.finalizeFunc = firstLastPartialFinalize,
.combineFunc = firstCombine,
},
{
.name = "_first_merge",
.type = FUNCTION_TYPE_FIRST_MERGE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC,
.translateFunc = translateFirstLastMerge,
.getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup,
.processFunc = firstFunctionMerge,
.finalizeFunc = firstLastFinalize,
.combineFunc = firstCombine,
},
{
@ -1715,6 +1774,30 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = functionSetup,
.processFunc = lastFunction,
.finalizeFunc = firstLastFinalize,
.pPartialFunc = "_last_partial",
.pMergeFunc = "_last_merge",
.combineFunc = lastCombine,
},
{
.name = "_last_partial",
.type = FUNCTION_TYPE_LAST_PARTIAL,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC,
.translateFunc = translateFirstLastPartial,
.getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup,
.processFunc = lastFunction,
.finalizeFunc = firstLastPartialFinalize,
.combineFunc = lastCombine,
},
{
.name = "_last_merge",
.type = FUNCTION_TYPE_LAST_MERGE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC,
.translateFunc = translateFirstLastMerge,
.getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup,
.processFunc = lastFunctionMerge,
.finalizeFunc = firstLastFinalize,
.combineFunc = lastCombine,
},
{

View File

@ -71,6 +71,12 @@ typedef struct STopBotRes {
STopBotResItem* pItems;
} STopBotRes;
typedef struct SFirstLastRes {
bool hasResult;
int32_t bytes;
char buf[];
} SFirstLastRes;
typedef struct SStddevRes {
double result;
int64_t count;
@ -2230,9 +2236,13 @@ int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx)
return TSDB_CODE_SUCCESS;
}
int32_t getFirstLastInfoSize(int32_t resBytes) {
return sizeof(SFirstLastRes) + resBytes + sizeof(int64_t);
}
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
SColumnNode* pNode = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
pEnv->calcMemSize = pNode->node.resType.bytes + sizeof(int64_t);
SColumnNode* pNode = (SColumnNode *)nodesListGetNode(pFunc->pParameterList, 0);
pEnv->calcMemSize = sizeof(SFirstLastRes) + pNode->node.resType.bytes + sizeof(int64_t);
return true;
}
@ -2256,12 +2266,13 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElems = 0;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
char* buf = GET_ROWCELL_INTERBUF(pResInfo);
SFirstLastRes *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0];
int32_t bytes = pInputCol->info.bytes;
pInfo->bytes = bytes;
// All null data column, return directly.
if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) {
@ -2279,7 +2290,7 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
if (blockDataOrder == TSDB_ORDER_ASC) {
// filter according to current result firstly
if (pResInfo->numOfRes > 0) {
TSKEY ts = *(TSKEY*)(buf + bytes);
TSKEY ts = *(TSKEY*)(pInfo->buf + bytes);
if (ts < startKey) {
return TSDB_CODE_SUCCESS;
}
@ -2295,9 +2306,10 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
char* data = colDataGetData(pInputCol, i);
TSKEY cts = getRowPTs(pInput->pPTS, i);
if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) > cts) {
memcpy(buf, data, bytes);
*(TSKEY*)(buf + bytes) = cts;
if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) > cts) {
memcpy(pInfo->buf, data, bytes);
*(TSKEY*)(pInfo->buf + bytes) = cts;
pInfo->hasResult = true;
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
pResInfo->numOfRes = 1;
@ -2308,7 +2320,7 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
// in case of descending order time stamp serial, which usually happens as the results of the nest query,
// all data needs to be check.
if (pResInfo->numOfRes > 0) {
TSKEY ts = *(TSKEY*)(buf + bytes);
TSKEY ts = *(TSKEY*)(pInfo->buf + bytes);
if (ts < endKey) {
return TSDB_CODE_SUCCESS;
}
@ -2324,9 +2336,10 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
char* data = colDataGetData(pInputCol, i);
TSKEY cts = getRowPTs(pInput->pPTS, i);
if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) > cts) {
memcpy(buf, data, bytes);
*(TSKEY*)(buf + bytes) = cts;
if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) > cts) {
memcpy(pInfo->buf, data, bytes);
*(TSKEY*)(pInfo->buf + bytes) = cts;
pInfo->hasResult = true;
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
pResInfo->numOfRes = 1;
break;
@ -2342,12 +2355,13 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElems = 0;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
char* buf = GET_ROWCELL_INTERBUF(pResInfo);
SFirstLastRes *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0];
int32_t bytes = pInputCol->info.bytes;
pInfo->bytes = bytes;
// All null data column, return directly.
if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) {
@ -2372,10 +2386,11 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
char* data = colDataGetData(pInputCol, i);
TSKEY cts = getRowPTs(pInput->pPTS, i);
if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) < cts) {
memcpy(buf, data, bytes);
*(TSKEY*)(buf + bytes) = cts;
if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) < cts) {
memcpy(pInfo->buf, data, bytes);
*(TSKEY*)(pInfo->buf + bytes) = cts;
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
pInfo->hasResult = true;
pResInfo->numOfRes = 1;
}
break;
@ -2390,9 +2405,10 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
char* data = colDataGetData(pInputCol, i);
TSKEY cts = getRowPTs(pInput->pPTS, i);
if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) < cts) {
memcpy(buf, data, bytes);
*(TSKEY*)(buf + bytes) = cts;
if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) < cts) {
memcpy(pInfo->buf, data, bytes);
*(TSKEY*)(pInfo->buf + bytes) = cts;
pInfo->hasResult = true;
pResInfo->numOfRes = 1;
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
}
@ -2404,6 +2420,56 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
return TSDB_CODE_SUCCESS;
}
static void firstLastTransferInfo(SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst) {
if (!pInput->hasResult) {
return;
}
pOutput->bytes = pInput->bytes;
TSKEY *tsIn = (TSKEY*)(pInput->buf + pInput->bytes);
TSKEY *tsOut = (TSKEY*)(pOutput->buf + pInput->bytes);
if (pOutput->hasResult) {
if (isFirst) {
if (*tsIn > *tsOut) {
return;
}
} else {
if (*tsIn < *tsOut) {
return;
}
}
}
*tsOut = *tsIn;
memcpy(pOutput->buf, pInput->buf, pOutput->bytes);
pOutput->hasResult = true;
return;
}
static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx *pCtx, bool isFirstQuery) {
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pCol = pInput->pData[0];
ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY);
SFirstLastRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
int32_t start = pInput->startRowIndex;
char* data = colDataGetData(pCol, start);
SFirstLastRes* pInputInfo = (SFirstLastRes *)varDataVal(data);
firstLastTransferInfo(pInputInfo, pInfo, isFirstQuery);
SET_VAL(GET_RES_INFO(pCtx), 1, 1);
return TSDB_CODE_SUCCESS;
}
int32_t firstFunctionMerge(SqlFunctionCtx *pCtx) {
return firstLastFunctionMergeImpl(pCtx, true);
}
int32_t lastFunctionMerge(SqlFunctionCtx *pCtx) {
return firstLastFunctionMergeImpl(pCtx, false);
}
int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
@ -2411,12 +2477,30 @@ int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0;
char* in = GET_ROWCELL_INTERBUF(pResInfo);
colDataAppend(pCol, pBlock->info.rows, in, pResInfo->isNullRes);
SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(pResInfo);
colDataAppend(pCol, pBlock->info.rows, pRes->buf, pResInfo->isNullRes);
return pResInfo->numOfRes;
}
int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
int32_t resultBytes = getFirstLastInfoSize(pRes->bytes);
char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
memcpy(varDataVal(res), pRes, resultBytes);
varDataSetLen(res, resultBytes);
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
colDataAppend(pCol, pBlock->info.rows, res, false);
taosMemoryFree(res);
return 1;
}
int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
char* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);

View File

@ -283,6 +283,8 @@ void sortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf) {
if (!dataBuf->ordered) {
char* pBlockData = pBlocks->data;
// todo. qsort is unstable, if timestamp is same, should get the last one
qsort(pBlockData, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar);
int32_t i = 0;
@ -350,6 +352,8 @@ int sortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKey
if (!dataBuf->ordered) {
pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
// todo. qsort is unstable, if timestamp is same, should get the last one
qsort(pBlkKeyTuple, nRows, sizeof(SBlockKeyTuple), rowDataCompar);
pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;

View File

@ -983,6 +983,9 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
SEpSet epSet = {0};
tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epSet);
pCtx->epSet = epSet;
if (!transEpSetIsEqual(&epSet, &pCtx->epSet)) {
pCtx->retryCount = 0;
}
}
addConnToPool(pThrd->pool, pConn);
tTrace("use remote epset, current in use: %d, retry count:%d, try limit: %d", pEpSet->inUse, pCtx->retryCount + 1,

View File

@ -175,7 +175,13 @@ void *taosbsearch(const void *key, const void *base, int32_t nmemb, int32_t size
c = compar(key, p);
if (c == 0) {
break;
if (flags == TD_GT){
lidx = midx + 1;
} else if(flags == TD_LT){
ridx = midx - 1;
}else{
break;
}
} else if (c < 0) {
ridx = midx - 1;
} else {
@ -189,6 +195,10 @@ void *taosbsearch(const void *key, const void *base, int32_t nmemb, int32_t size
return (c <= 0) ? p : (midx + 1 < nmemb ? p + size : NULL);
} else if (flags == TD_LE) {
return (c >= 0) ? p : (midx > 0 ? p - size : NULL);
} else if (flags == TD_GT) {
return (c < 0) ? p : (midx + 1 < nmemb ? p + size : NULL);
} else if (flags == TD_LT) {
return (c > 0) ? p : (midx > 0 ? p - size : NULL);
} else {
ASSERT(0);
}

View File

@ -248,6 +248,170 @@ TEST(testCase, taosbsearch_greater_or_equal) {
ASSERT_EQ(pRet, nullptr);
}
TEST(testCase, taosbsearch_greater) {
// For equal test
int key = 3;
void *pRet = NULL;
pRet = taosbsearch((void *)&key, NULL, 0, sizeof(int), compareFunc, TD_GT);
ASSERT_EQ(pRet, nullptr);
// 1 element
int array1[1] = {5};
key = 1;
pRet = taosbsearch((void *)&key, (void *)array1, 1, sizeof(int), compareFunc, TD_GT);
ASSERT_EQ(*(int *)pRet, 5);
key = 6;
pRet = taosbsearch((void *)&key, (void *)array1, 1, sizeof(int), compareFunc, TD_GT);
ASSERT_EQ(pRet, nullptr);
key = 5;
pRet = taosbsearch((void *)&key, (void *)array1, 1, sizeof(int), compareFunc, TD_GT);
ASSERT_EQ(pRet, nullptr);
// 2 element
int array2[2] = {3, 6};
key = 1;
pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_GT);
ASSERT_EQ(*(int *)pRet, 3);
key = 3;
pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_GT);
ASSERT_EQ(*(int *)pRet, 6);
key = 4;
pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_GT);
ASSERT_EQ(*(int *)pRet, 6);
key = 6;
pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_GT);
ASSERT_EQ(pRet, nullptr);
key = 7;
pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_GT);
ASSERT_EQ(pRet, nullptr);
// 3 element
int array3[3] = {3, 6, 8};
key = 1;
pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_GT);
ASSERT_EQ(*(int *)pRet, 3);
key = 3;
pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_GT);
ASSERT_EQ(*(int *)pRet, 6);
key = 4;
pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_GT);
ASSERT_EQ(*(int *)pRet, 6);
key = 6;
pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_GT);
ASSERT_EQ(*(int *)pRet, 8);
key = 7;
pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_GT);
ASSERT_EQ(*(int *)pRet, 8);
key = 8;
pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_GT);
ASSERT_EQ(pRet, nullptr);
key = 9;
pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_GT);
ASSERT_EQ(pRet, nullptr);
// 4 element
int array4[4] = {3, 6, 8, 11};
key = 1;
pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_GT);
ASSERT_EQ(*(int *)pRet, 3);
key = 3;
pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_GT);
ASSERT_EQ(*(int *)pRet, 6);
key = 4;
pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_GT);
ASSERT_EQ(*(int *)pRet, 6);
key = 6;
pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_GT);
ASSERT_EQ(*(int *)pRet, 8);
key = 7;
pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_GT);
ASSERT_EQ(*(int *)pRet, 8);
key = 8;
pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_GT);
ASSERT_EQ(*(int *)pRet, 11);
key = 9;
pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_GT);
ASSERT_EQ(*(int *)pRet, 11);
key = 11;
pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_GT);
ASSERT_EQ(pRet, nullptr);
key = 13;
pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_GT);
ASSERT_EQ(pRet, nullptr);
// 5 element
int array5[5] = {3, 6, 8, 11, 15};
key = 1;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GT);
ASSERT_EQ(*(int *)pRet, 3);
key = 3;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GT);
ASSERT_EQ(*(int *)pRet, 6);
key = 4;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GT);
ASSERT_EQ(*(int *)pRet, 6);
key = 6;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GT);
ASSERT_EQ(*(int *)pRet, 8);
key = 7;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GT);
ASSERT_EQ(*(int *)pRet, 8);
key = 8;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GT);
ASSERT_EQ(*(int *)pRet, 11);
key = 9;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GT);
ASSERT_EQ(*(int *)pRet, 11);
key = 11;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GT);
ASSERT_EQ(*(int *)pRet, 15);
key = 13;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GT);
ASSERT_EQ(*(int *)pRet, 15);
key = 15;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GT);
ASSERT_EQ(pRet, nullptr);
key = 17;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GT);
ASSERT_EQ(pRet, nullptr);
}
TEST(testCase, taosbsearch_less_or_equal) {
// For equal test
int key = 3;
@ -412,3 +576,167 @@ TEST(testCase, taosbsearch_less_or_equal) {
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LE);
ASSERT_EQ(*(int *)pRet, 15);
}
TEST(testCase, taosbsearch_less) {
// For equal test
int key = 3;
void *pRet = NULL;
pRet = taosbsearch((void *)&key, NULL, 0, sizeof(int), compareFunc, TD_LT);
ASSERT_EQ(pRet, nullptr);
// 1 element
int array1[1] = {5};
key = 1;
pRet = taosbsearch((void *)&key, (void *)array1, 1, sizeof(int), compareFunc, TD_LT);
ASSERT_EQ(pRet, nullptr);
key = 6;
pRet = taosbsearch((void *)&key, (void *)array1, 1, sizeof(int), compareFunc, TD_LT);
ASSERT_EQ(*(int *)pRet, 5);
key = 5;
pRet = taosbsearch((void *)&key, (void *)array1, 1, sizeof(int), compareFunc, TD_LT);
ASSERT_EQ(pRet, nullptr);
// 2 element
int array2[2] = {3, 6};
key = 1;
pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_LT);
ASSERT_EQ(pRet, nullptr);
key = 3;
pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_LT);
ASSERT_EQ(pRet, nullptr);
key = 4;
pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_LT);
ASSERT_EQ(*(int *)pRet, 3);
key = 6;
pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_LT);
ASSERT_EQ(*(int *)pRet, 3);
key = 7;
pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_LT);
ASSERT_EQ(*(int *)pRet, 6);
// 3 element
int array3[3] = {3, 6, 8};
key = 1;
pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_LT);
ASSERT_EQ(pRet, nullptr);
key = 3;
pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_LT);
ASSERT_EQ(pRet, nullptr);
key = 4;
pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_LT);
ASSERT_EQ(*(int *)pRet, 3);
key = 6;
pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_LT);
ASSERT_EQ(*(int *)pRet, 3);
key = 7;
pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_LT);
ASSERT_EQ(*(int *)pRet, 6);
key = 8;
pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_LT);
ASSERT_EQ(*(int *)pRet, 6);
key = 9;
pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_LT);
ASSERT_EQ(*(int *)pRet, 8);
// 4 element
int array4[4] = {3, 6, 8, 11};
key = 1;
pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_LT);
ASSERT_EQ(pRet, nullptr);
key = 3;
pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_LT);
ASSERT_EQ(pRet, nullptr);
key = 4;
pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_LT);
ASSERT_EQ(*(int *)pRet, 3);
key = 6;
pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_LT);
ASSERT_EQ(*(int *)pRet, 3);
key = 7;
pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_LT);
ASSERT_EQ(*(int *)pRet, 6);
key = 8;
pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_LT);
ASSERT_EQ(*(int *)pRet, 6);
key = 9;
pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_LT);
ASSERT_EQ(*(int *)pRet, 8);
key = 11;
pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_LT);
ASSERT_EQ(*(int *)pRet, 8);
key = 13;
pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_LT);
ASSERT_EQ(*(int *)pRet, 11);
// 5 element
int array5[5] = {3, 6, 8, 11, 15};
key = 1;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LT);
ASSERT_EQ(pRet, nullptr);
key = 3;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LT);
ASSERT_EQ(pRet, nullptr);
key = 4;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LT);
ASSERT_EQ(*(int *)pRet, 3);
key = 6;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LT);
ASSERT_EQ(*(int *)pRet, 3);
key = 7;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LT);
ASSERT_EQ(*(int *)pRet, 6);
key = 8;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LT);
ASSERT_EQ(*(int *)pRet, 6);
key = 9;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LT);
ASSERT_EQ(*(int *)pRet, 8);
key = 11;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LT);
ASSERT_EQ(*(int *)pRet, 8);
key = 13;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LT);
ASSERT_EQ(*(int *)pRet, 11);
key = 15;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LT);
ASSERT_EQ(*(int *)pRet, 11);
key = 17;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LT);
ASSERT_EQ(*(int *)pRet, 15);
}

View File

@ -0,0 +1,439 @@
from random import randint, random
from numpy import equal
import taos
import sys
import datetime
import inspect
from util.log import *
from util.sql import *
from util.cases import *
class TDTestCase:
updatecfgDict = {'debugFlag': 143 ,"cDebugFlag":143,"uDebugFlag":143 ,"rpcDebugFlag":143 , "tmrDebugFlag":143 ,
"jniDebugFlag":143 ,"simDebugFlag":143,"dDebugFlag":143, "dDebugFlag":143,"vDebugFlag":143,"mDebugFlag":143,"qDebugFlag":143,
"wDebugFlag":143,"sDebugFlag":143,"tsdbDebugFlag":143,"tqDebugFlag":143 ,"fsDebugFlag":143 ,"fnDebugFlag":143}
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
def prepare_datas(self):
tdSql.execute(
'''create table stb1
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
tags (t1 int)
'''
)
tdSql.execute(
'''
create table t1
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
'''
)
for i in range(4):
tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )')
for i in range(9):
tdSql.execute(
f"insert into ct1 values ( now()-{i*10}s, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )"
)
tdSql.execute(
f"insert into ct4 values ( now()-{i*90}d, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )"
)
tdSql.execute("insert into ct1 values (now()-45s, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar0', now()+8a )")
tdSql.execute("insert into ct1 values (now()+10s, 9, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )")
tdSql.execute("insert into ct1 values (now()+15s, 9, -99999, -999, -99, -9.99, NULL, 1, 'binary9', 'nchar9', now()+9a )")
tdSql.execute("insert into ct1 values (now()+20s, 9, -99999, -999, NULL, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )")
tdSql.execute("insert into ct4 values (now()-810d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ")
tdSql.execute("insert into ct4 values (now()-400d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ")
tdSql.execute("insert into ct4 values (now()+90d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ")
tdSql.execute(
f'''insert into t1 values
( '2020-04-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( '2020-10-21 01:01:01.000', 1, 11111, 111, 11, 1.11, 11.11, 1, "binary1", "nchar1", now()+1a )
( '2020-12-31 01:01:01.000', 2, 22222, 222, 22, 2.22, 22.22, 0, "binary2", "nchar2", now()+2a )
( '2021-01-01 01:01:06.000', 3, 33333, 333, 33, 3.33, 33.33, 0, "binary3", "nchar3", now()+3a )
( '2021-05-07 01:01:10.000', 4, 44444, 444, 44, 4.44, 44.44, 1, "binary4", "nchar4", now()+4a )
( '2021-07-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( '2021-09-30 01:01:16.000', 5, 55555, 555, 55, 5.55, 55.55, 0, "binary5", "nchar5", now()+5a )
( '2022-02-01 01:01:20.000', 6, 66666, 666, 66, 6.66, 66.66, 1, "binary6", "nchar6", now()+6a )
( '2022-10-28 01:01:26.000', 7, 00000, 000, 00, 0.00, 00.00, 1, "binary7", "nchar7", "1970-01-01 08:00:00.000" )
( '2022-12-01 01:01:30.000', 8, -88888, -888, -88, -8.88, -88.88, 0, "binary8", "nchar8", "1969-01-01 01:00:00.000" )
( '2022-12-31 01:01:36.000', 9, -99999999999999999, -999, -99, -9.99, -999999999999999999999.99, 1, "binary9", "nchar9", "1900-01-01 00:00:00.000" )
( '2023-02-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
'''
)
def test_errors(self):
error_sql_lists = [
"select tail from t1",
"select tail(123--123)==1 from t1",
"select tail(123,123) from t1",
"select tail(c1,ts) from t1",
"select tail(c1,c1,ts) from t1",
"select tail(c1) as 'd1' from t1",
"select tail(c1 ,c2 ) from t1",
"select tail(c1 ,NULL) from t1",
"select tail(,) from t1;",
"select tail(tail(c1) ab from t1)",
"select tail(c1) as int from t1",
"select tail('c1') from t1",
"select tail(NULL) from t1",
"select tail('') from t1",
"select tail(c%) from t1",
"select tail(t1) from t1",
"select tail(True) from t1",
"select tail(c1,1) , count(c1) from t1",
"select tail(c1,1) , avg(c1) from t1",
"select tail(c1,1) , min(c1) from t1",
"select tail(c1,1) , spread(c1) from t1",
"select tail(c1,1) , diff(c1) from t1",
"select tail(c1,1) , abs(c1) from t1",
"select tail(c1,1) , c1 from t1",
"select tail from stb1 partition by tbname",
"select tail(123--123)==1 from stb1 partition by tbname",
"select tail(123,123) from stb1 partition by tbname",
"select tail(c1,ts) from stb1 partition by tbname",
"select tail(c1,c1,ts) from stb1 partition by tbname",
"select tail(c1) as 'd1' from stb1 partition by tbname",
"select tail(c1 ,c2 ) from stb1 partition by tbname",
"select tail(c1 ,NULL) from stb1 partition by tbname",
"select tail(,) from stb1 partition by tbname;",
"select tail(tail(c1) ab from stb1 partition by tbname)",
"select tail(c1) as int from stb1 partition by tbname",
"select tail('c1') from stb1 partition by tbname",
"select tail(NULL) from stb1 partition by tbname",
"select tail('') from stb1 partition by tbname",
"select tail(c%) from stb1 partition by tbname",
"select tail(t1) from stb1 partition by tbname",
"select tail(True) from stb1 partition by tbname",
"select tail(c1,1) , count(c1) from stb1 partition by tbname",
"select tail(c1,1) , avg(c1) from stb1 partition by tbname",
"select tail(c1,1) , min(c1) from stb1 partition by tbname",
"select tail(c1,1) , spread(c1) from stb1 partition by tbname",
"select tail(c1,1) , diff(c1) from stb1 partition by tbname",
"select tail(c1,1) , abs(c1) from stb1 partition by tbname",
"select tail(c1,1) , c1 from stb1 partition by tbname"
]
for error_sql in error_sql_lists:
tdSql.error(error_sql)
def support_types(self):
other_no_value_types = [
"select tail(ts,1) from t1" ,
"select tail(c7,1) from t1",
"select tail(c8,1) from t1",
"select tail(c9,1) from t1",
"select tail(ts,1) from ct1" ,
"select tail(c7,1) from ct1",
"select tail(c8,1) from ct1",
"select tail(c9,1) from ct1",
"select tail(ts,1) from ct3" ,
"select tail(c7,1) from ct3",
"select tail(c8,1) from ct3",
"select tail(c9,1) from ct3",
"select tail(ts,1) from ct4" ,
"select tail(c7,1) from ct4",
"select tail(c8,1) from ct4",
"select tail(c9,1) from ct4",
"select tail(ts,1) from stb1 partition by tbname" ,
"select tail(c7,1) from stb1 partition by tbname",
"select tail(c8,1) from stb1 partition by tbname",
"select tail(c9,1) from stb1 partition by tbname"
]
for type_sql in other_no_value_types:
tdSql.query(type_sql)
type_sql_lists = [
"select tail(c1,1) from t1",
"select tail(c2,1) from t1",
"select tail(c3,1) from t1",
"select tail(c4,1) from t1",
"select tail(c5,1) from t1",
"select tail(c6,1) from t1",
"select tail(c1,1) from ct1",
"select tail(c2,1) from ct1",
"select tail(c3,1) from ct1",
"select tail(c4,1) from ct1",
"select tail(c5,1) from ct1",
"select tail(c6,1) from ct1",
"select tail(c1,1) from ct3",
"select tail(c2,1) from ct3",
"select tail(c3,1) from ct3",
"select tail(c4,1) from ct3",
"select tail(c5,1) from ct3",
"select tail(c6,1) from ct3",
"select tail(c1,1) from stb1 partition by tbname",
"select tail(c2,1) from stb1 partition by tbname",
"select tail(c3,1) from stb1 partition by tbname",
"select tail(c4,1) from stb1 partition by tbname",
"select tail(c5,1) from stb1 partition by tbname",
"select tail(c6,1) from stb1 partition by tbname",
"select tail(c6,1) as alisb from stb1 partition by tbname",
"select tail(c6,1) alisb from stb1 partition by tbname",
]
for type_sql in type_sql_lists:
tdSql.query(type_sql)
def check_tail_table(self , tbname , col_name , tail_rows , offset):
tail_sql = f"select tail({col_name} , {tail_rows} , {offset}) from {tbname}"
equal_sql = f"select {col_name} from (select ts , {col_name} from {tbname} order by ts desc limit {tail_rows} offset {offset}) order by ts"
tdSql.query(tail_sql)
tail_result = tdSql.queryResult
tdSql.query(equal_sql)
print(equal_sql)
equal_result = tdSql.queryResult
if tail_result == equal_result:
tdLog.info(" tail query check pass , tail sql is: %s" %tail_sql)
else:
tdLog.exit(" tail query check fail , tail sql is: %s " %tail_sql)
def basic_tail_function(self):
# basic query
tdSql.query("select c1 from ct3")
tdSql.checkRows(0)
tdSql.query("select c1 from t1")
tdSql.checkRows(12)
tdSql.query("select c1 from stb1")
tdSql.checkRows(25)
# used for empty table , ct3 is empty
tdSql.query("select tail(c1,1) from ct3")
tdSql.checkRows(0)
tdSql.query("select tail(c2,1) from ct3")
tdSql.checkRows(0)
tdSql.query("select tail(c3,1) from ct3")
tdSql.checkRows(0)
tdSql.query("select tail(c4,1) from ct3")
tdSql.checkRows(0)
tdSql.query("select tail(c5,1) from ct3")
tdSql.checkRows(0)
tdSql.query("select tail(c6,1) from ct3")
# auto check for t1 table
# used for regular table
tdSql.query("select tail(c1,1) from t1")
tdSql.query("desc t1")
col_lists_rows = tdSql.queryResult
col_lists = []
for col_name in col_lists_rows:
if col_name[0] =="ts":
continue
col_lists.append(col_name[0])
for col in col_lists:
for loop in range(100):
limit = randint(1,100)
offset = randint(0,100)
self.check_tail_table("t1" , col , limit , offset)
# tail for invalid params
tdSql.error("select tail(c1,-10,10) from ct1")
tdSql.error("select tail(c1,10,10000) from ct1")
tdSql.error("select tail(c1,10,-100) from ct1")
tdSql.error("select tail(c1,100/2,10) from ct1")
tdSql.error("select tail(c1,5,10*2) from ct1")
tdSql.query("select tail(c1,100,100) from ct1")
tdSql.checkRows(0)
tdSql.query("select tail(c1,10,100) from ct1")
tdSql.checkRows(0)
tdSql.error("select tail(c1,10,101) from ct1")
tdSql.query("select tail(c1,10,0) from ct1")
tdSql.query("select tail(c1,100,10) from ct1")
tdSql.checkRows(3)
# tail with super tags
tdSql.query("select tail(c1,10,10) from ct1")
tdSql.checkRows(3)
tdSql.error("select tail(c1,10,10),tbname from ct1")
tdSql.error("select tail(c1,10,10),t1 from ct1")
# tail with common col
tdSql.error("select tail(c1,10,10) ,ts from ct1")
tdSql.error("select tail(c1,10,10) ,c1 from ct1")
# tail with scalar function
tdSql.error("select tail(c1,10,10) ,abs(c1) from ct1")
tdSql.error("select tail(c1,10,10) , tail(c2,10,10) from ct1")
tdSql.error("select tail(c1,10,10) , abs(c2)+2 from ct1")
# bug need fix for scalar value or compute again
# tdSql.error(" select tail(c1,10,10) , 123 from ct1")
# tdSql.error(" select abs(tail(c1,10,10)) from ct1")
# tdSql.error(" select abs(tail(c1,10,10)) + 2 from ct1")
# tail with aggregate function
tdSql.error("select tail(c1,10,10) ,sum(c1) from ct1")
tdSql.error("select tail(c1,10,10) ,max(c1) from ct1")
tdSql.error("select tail(c1,10,10) ,csum(c1) from ct1")
tdSql.error("select tail(c1,10,10) ,count(c1) from ct1")
# tail with filter where
tdSql.query("select tail(c1,3,1) from ct4 where c1 is null")
tdSql.checkData(0, 0, None)
tdSql.checkData(1, 0, None)
tdSql.query("select tail(c1,3,2) from ct4 where c1 >2 ")
tdSql.checkData(0, 0, 7)
tdSql.checkData(1, 0, 6)
tdSql.checkData(2, 0, 5)
tdSql.query("select tail(c1,2,1) from ct4 where c2 between 0 and 99999")
tdSql.checkData(0, 0, 2)
tdSql.checkData(1, 0, 1)
# tail with union all
tdSql.query("select tail(c1,2,1) from ct4 union all select c1 from ct1")
tdSql.checkRows(15)
tdSql.query("select tail(c1,2,1) from ct4 union all select c1 from ct2")
tdSql.checkRows(2)
tdSql.checkData(0, 0, 1)
tdSql.checkData(1, 0, 0)
tdSql.query("select tail(c2,2,1) from ct4 union all select abs(c2)/2 from ct4")
tdSql.checkRows(14)
# tail with join
# prepare join datas with same ts
tdSql.execute(" use db ")
tdSql.execute(" create stable st1 (ts timestamp , num int) tags(ind int)")
tdSql.execute(" create table tb1 using st1 tags(1)")
tdSql.execute(" create table tb2 using st1 tags(2)")
tdSql.execute(" create stable st2 (ts timestamp , num int) tags(ind int)")
tdSql.execute(" create table ttb1 using st2 tags(1)")
tdSql.execute(" create table ttb2 using st2 tags(2)")
start_ts = 1622369635000 # 2021-05-30 18:13:55
for i in range(10):
ts_value = start_ts+i*1000
tdSql.execute(f" insert into tb1 values({ts_value} , {i})")
tdSql.execute(f" insert into tb2 values({ts_value} , {i})")
tdSql.execute(f" insert into ttb1 values({ts_value} , {i})")
tdSql.execute(f" insert into ttb2 values({ts_value} , {i})")
tdSql.query("select tail(tb2.num,3,2) from tb1, tb2 where tb1.ts=tb2.ts ")
tdSql.checkRows(3)
tdSql.checkData(0,0,5)
tdSql.checkData(1,0,6)
tdSql.checkData(2,0,7)
# nest query
# tdSql.query("select tail(c1,2) from (select c1 from ct1)")
tdSql.query("select c1 from (select tail(c1,2) c1 from ct4)")
tdSql.checkRows(2)
tdSql.checkData(0, 0, 0)
tdSql.checkData(1, 0, None)
tdSql.query("select sum(c1) from (select tail(c1,2) c1 from ct1)")
tdSql.checkRows(1)
tdSql.checkData(0, 0, 18)
tdSql.query("select abs(c1) from (select tail(c1,2) c1 from ct1)")
tdSql.checkRows(2)
tdSql.checkData(0, 0, 9)
#partition by tbname
tdSql.query(" select tail(c1,5) from stb1 partition by tbname ")
tdSql.checkRows(10)
tdSql.query(" select tail(c1,3) from stb1 partition by tbname ")
tdSql.checkRows(6)
# group by
tdSql.error("select tail(c1,2) from ct1 group by c1")
tdSql.error("select tail(c1,2) from ct1 group by tbname")
# super table
def check_boundary_values(self):
tdSql.execute("drop database if exists bound_test")
tdSql.execute("create database if not exists bound_test")
tdSql.execute("use bound_test")
tdSql.execute(
"create table stb_bound (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(32),c9 nchar(32), c10 timestamp) tags (t1 int);"
)
tdSql.execute(f'create table sub1_bound using stb_bound tags ( 1 )')
tdSql.execute(
f"insert into sub1_bound values ( now()-1s, 2147483647, 9223372036854775807, 32767, 127, 3.40E+38, 1.7e+308, True, 'binary_tb1', 'nchar_tb1', now() )"
)
tdSql.execute(
f"insert into sub1_bound values ( now(), 2147483646, 9223372036854775806, 32766, 126, 3.40E+38, 1.7e+308, True, 'binary_tb1', 'nchar_tb1', now() )"
)
tdSql.execute(
f"insert into sub1_bound values ( now(), -2147483646, -9223372036854775806, -32766, -126, -3.40E+38, -1.7e+308, True, 'binary_tb1', 'nchar_tb1', now() )"
)
tdSql.execute(
f"insert into sub1_bound values ( now(), 2147483643, 9223372036854775803, 32763, 123, 3.39E+38, 1.69e+308, True, 'binary_tb1', 'nchar_tb1', now() )"
)
tdSql.execute(
f"insert into sub1_bound values ( now(), -2147483643, -9223372036854775803, -32763, -123, -3.39E+38, -1.69e+308, True, 'binary_tb1', 'nchar_tb1', now() )"
)
tdSql.error(
f"insert into sub1_bound values ( now()+1s, 2147483648, 9223372036854775808, 32768, 128, 3.40E+38, 1.7e+308, True, 'binary_tb1', 'nchar_tb1', now() )"
)
tdSql.query("select tail(c2,2) from sub1_bound")
tdSql.checkRows(2)
tdSql.checkData(0,0,9223372036854775803)
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
tdSql.prepare()
tdLog.printNoPrefix("==========step1:create table ==============")
self.prepare_datas()
tdLog.printNoPrefix("==========step2:test errors ==============")
self.test_errors()
tdLog.printNoPrefix("==========step3:support types ============")
self.support_types()
tdLog.printNoPrefix("==========step4: tail basic query ============")
self.basic_tail_function()
tdLog.printNoPrefix("==========step5: tail boundary query ============")
self.check_boundary_values()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -95,6 +95,7 @@ python3 ./test.py -f 2-query/unique.py
python3 ./test.py -f 2-query/stateduration.py
python3 ./test.py -f 2-query/function_stateduration.py
python3 ./test.py -f 2-query/statecount.py
python3 ./test.py -f 2-query/tail.py
python3 ./test.py -f 6-cluster/5dnode1mnode.py
python3 ./test.py -f 6-cluster/5dnode2mnode.py