diff --git a/docs/en/12-taos-sql/12-keywords.md b/docs/en/12-taos-sql/12-keywords.md index ed0c96b4e4..6d40deb5a6 100644 --- a/docs/en/12-taos-sql/12-keywords.md +++ b/docs/en/12-taos-sql/12-keywords.md @@ -4,50 +4,275 @@ title: Keywords There are about 200 keywords reserved by TDengine, they can't be used as the name of database, STable or table with either upper case, lower case or mixed case. -**Keywords List** +## Keywords List -| | | | | | -| ----------- | ---------- | --------- | ---------- | ------------ | -| ABORT | CREATE | IGNORE | NULL | STAR | -| ACCOUNT | CTIME | IMMEDIATE | OF | STATE | -| ACCOUNTS | DATABASE | IMPORT | OFFSET | STATEMENT | -| ADD | DATABASES | IN | OR | STATE_WINDOW | -| AFTER | DAYS | INITIALLY | ORDER | STORAGE | -| ALL | DBS | INSERT | PARTITIONS | STREAM | -| ALTER | DEFERRED | INSTEAD | PASS | STREAMS | -| AND | DELIMITERS | INT | PLUS | STRING | -| AS | DESC | INTEGER | PPS | SYNCDB | -| ASC | DESCRIBE | INTERVAL | PRECISION | TABLE | -| ATTACH | DETACH | INTO | PREV | TABLES | -| BEFORE | DISTINCT | IS | PRIVILEGE | TAG | -| BEGIN | DIVIDE | ISNULL | QTIME | TAGS | -| BETWEEN | DNODE | JOIN | QUERIES | TBNAME | -| BIGINT | DNODES | KEEP | QUERY | TIMES | -| BINARY | DOT | KEY | QUORUM | TIMESTAMP | -| BITAND | DOUBLE | KILL | RAISE | TINYINT | -| BITNOT | DROP | LE | REM | TOPIC | -| BITOR | EACH | LIKE | REPLACE | TOPICS | -| BLOCKS | END | LIMIT | REPLICA | TRIGGER | -| BOOL | EQ | LINEAR | RESET | TSERIES | -| BY | EXISTS | LOCAL | RESTRICT | UMINUS | -| CACHE | EXPLAIN | LP | ROW | UNION | -| CACHELAST | FAIL | LSHIFT | RP | UNSIGNED | -| CASCADE | FILE | LT | RSHIFT | UPDATE | -| CHANGE | FILL | MATCH | SCORES | UPLUS | -| CLUSTER | FLOAT | MAXROWS | SELECT | USE | -| COLON | FOR | MINROWS | SEMI | USER | -| COLUMN | FROM | MINUS | SESSION | USERS | -| COMMA | FSYNC | MNODES | SET | USING | -| COMP | GE | MODIFY | SHOW | VALUES | -| COMPACT | GLOB | MODULES | SLASH | VARIABLE | -| CONCAT | GRANTS | NCHAR | SLIDING | VARIABLES | -| CONFLICT | GROUP | NE | SLIMIT | VGROUPS | -| CONNECTION | GT | NONE | SMALLINT | VIEW | -| CONNECTIONS | HAVING | NOT | SOFFSET | VNODES | -| CONNS | ID | NOTNULL | STable | WAL | -| COPY | IF | NOW | STableS | WHERE | -| _C0 | _QSTART | _QSTOP | _QDURATION | _WSTART | -| _WSTOP | _WDURATION | _ROWTS | +### A + +- ABORT +- ACCOUNT +- ACCOUNTS +- ADD +- AFTER +- ALL +- ALTER +- AND +- AS +- ASC +- ATTACH + +### B + +- BEFORE +- BEGIN +- BETWEEN +- BIGINT +- BINARY +- BITAND +- BITNOT +- BITOR +- BLOCKS +- BOOL +- BY + +### C + +- CACHE +- CACHELAST +- CASCADE +- CHANGE +- CLUSTER +- COLON +- COLUMN +- COMMA +- COMP +- COMPACT +- CONCAT +- CONFLICT +- CONNECTION +- CONNECTIONS +- CONNS +- COPY +- CREATE +- CTIME + +### D + +- DATABASE +- DATABASES +- DAYS +- DBS +- DEFERRED +- DELETE +- DELIMITERS +- DESC +- DESCRIBE +- DETACH +- DISTINCT +- DIVIDE +- DNODE +- DNODES +- DOT +- DOUBLE +- DROP + +### E + +- END +- EQ +- EXISTS +- EXPLAIN + +### F + +- FAIL +- FILE +- FILL +- FLOAT +- FOR +- FROM +- FSYNC + +### G + +- GE +- GLOB +- GRANTS +- GROUP +- GT + +### H + +- HAVING + +### I + +- ID +- IF +- IGNORE +- IMMEDIA +- IMPORT +- IN +- INITIAL +- INSERT +- INSTEAD +- INT +- INTEGER +- INTERVA +- INTO +- IS +- ISNULL + +### J + +- JOIN + +### K + +- KEEP +- KEY +- KILL + +### L + +- LE +- LIKE +- LIMIT +- LINEAR +- LOCAL +- LP +- LSHIFT +- LT + +### M + +- MATCH +- MAXROWS +- MINROWS +- MINUS +- MNODES +- MODIFY +- MODULES + +### N + +- NE +- NONE +- NOT +- NOTNULL +- NOW +- NULL + +### O + +- OF +- OFFSET +- OR +- ORDER + +### P + +- PARTITION +- PASS +- PLUS +- PPS +- PRECISION +- PREV +- PRIVILEGE + +### Q + +- QTIME +- QUERIE +- QUERY +- QUORUM + +### R + +- RAISE +- REM +- REPLACE +- REPLICA +- RESET +- RESTRIC +- ROW +- RP +- RSHIFT + +### S + +- SCORES +- SELECT +- SEMI +- SESSION +- SET +- SHOW +- SLASH +- SLIDING +- SLIMIT +- SMALLIN +- SOFFSET +- STable +- STableS +- STAR +- STATE +- STATEMEN +- STATE_WI +- STORAGE +- STREAM +- STREAMS +- STRING +- SYNCDB + +### T + +- TABLE +- TABLES +- TAG +- TAGS +- TBNAME +- TIMES +- TIMESTAMP +- TINYINT +- TOPIC +- TOPICS +- TRIGGER +- TSERIES + +### U + +- UMINUS +- UNION +- UNSIGNED +- UPDATE +- UPLUS +- USE +- USER +- USERS +- USING + +### V + +- VALUES +- VARIABLE +- VARIABLES +- VGROUPS +- VIEW +- VNODES + +### W + +- WAL +- WHERE + +### _ + +- _C0 +- _QSTART +- _QSTOP +- _QDURATION +- _WSTART +- _WSTOP +- _WDURATION ## Explanations ### TBNAME diff --git a/docs/zh/12-taos-sql/12-keywords.md b/docs/zh/12-taos-sql/12-keywords.md index 5c68e5da7e..202f223b45 100644 --- a/docs/zh/12-taos-sql/12-keywords.md +++ b/docs/zh/12-taos-sql/12-keywords.md @@ -45,48 +45,274 @@ title: TDengine 参数限制与保留关键字 目前 TDengine 有将近 200 个内部保留关键字,这些关键字无论大小写均不可以用作库名、表名、STable 名、数据列名及标签列名等。这些关键字列表如下: -| 关键字列表 | | | | | -| ----------- | ---------- | --------- | ---------- | ------------ | -| ABORT | CREATE | IGNORE | NULL | STAR | -| ACCOUNT | CTIME | IMMEDIATE | OF | STATE | -| ACCOUNTS | DATABASE | IMPORT | OFFSET | STATEMENT | -| ADD | DATABASES | IN | OR | STATE_WINDOW | -| AFTER | DAYS | INITIALLY | ORDER | STORAGE | -| ALL | DBS | INSERT | PARTITIONS | STREAM | -| ALTER | DEFERRED | INSTEAD | PASS | STREAMS | -| AND | DELIMITERS | INT | PLUS | STRING | -| AS | DESC | INTEGER | PPS | SYNCDB | -| ASC | DESCRIBE | INTERVAL | PRECISION | TABLE | -| ATTACH | DETACH | INTO | PREV | TABLES | -| BEFORE | DISTINCT | IS | PRIVILEGE | TAG | -| BEGIN | DIVIDE | ISNULL | QTIME | TAGS | -| BETWEEN | DNODE | JOIN | QUERIES | TBNAME | -| BIGINT | DNODES | KEEP | QUERY | TIMES | -| BINARY | DOT | KEY | QUORUM | TIMESTAMP | -| BITAND | DOUBLE | KILL | RAISE | TINYINT | -| BITNOT | DROP | LE | REM | TOPIC | -| BITOR | EACH | LIKE | REPLACE | TOPICS | -| BLOCKS | END | LIMIT | REPLICA | TRIGGER | -| BOOL | EQ | LINEAR | RESET | TSERIES | -| BY | EXISTS | LOCAL | RESTRICT | UMINUS | -| CACHE | EXPLAIN | LP | ROW | UNION | -| CACHELAST | FAIL | LSHIFT | RP | UNSIGNED | -| CASCADE | FILE | LT | RSHIFT | UPDATE | -| CHANGE | FILL | MATCH | SCORES | UPLUS | -| CLUSTER | FLOAT | MAXROWS | SELECT | USE | -| COLON | FOR | MINROWS | SEMI | USER | -| COLUMN | FROM | MINUS | SESSION | USERS | -| COMMA | FSYNC | MNODES | SET | USING | -| COMP | GE | MODIFY | SHOW | VALUES | -| COMPACT | GLOB | MODULES | SLASH | VARIABLE | -| CONCAT | GRANTS | NCHAR | SLIDING | VARIABLES | -| CONFLICT | GROUP | NE | SLIMIT | VGROUPS | -| CONNECTION | GT | NONE | SMALLINT | VIEW | -| CONNECTIONS | HAVING | NOT | SOFFSET | VNODES | -| CONNS | ID | NOTNULL | STABLE | WAL | -| COPY | IF | NOW | STABLES | WHERE | -| _C0 | _QSTART | _QSTOP | _QDURATION | _WSTART | -| _WSTOP | _WDURATION | _ROWTS | +### A + +- ABORT +- ACCOUNT +- ACCOUNTS +- ADD +- AFTER +- ALL +- ALTER +- AND +- AS +- ASC +- ATTACH + +### B + +- BEFORE +- BEGIN +- BETWEEN +- BIGINT +- BINARY +- BITAND +- BITNOT +- BITOR +- BLOCKS +- BOOL +- BY + +### C + +- CACHE +- CACHELAST +- CASCADE +- CHANGE +- CLUSTER +- COLON +- COLUMN +- COMMA +- COMP +- COMPACT +- CONCAT +- CONFLICT +- CONNECTION +- CONNECTIONS +- CONNS +- COPY +- CREATE +- CTIME + +### D + +- DATABASE +- DATABASES +- DAYS +- DBS +- DEFERRED +- DELETE +- DELIMITERS +- DESC +- DESCRIBE +- DETACH +- DISTINCT +- DIVIDE +- DNODE +- DNODES +- DOT +- DOUBLE +- DROP + +### E + +- END +- EQ +- EXISTS +- EXPLAIN + +### F + +- FAIL +- FILE +- FILL +- FLOAT +- FOR +- FROM +- FSYNC + +### G + +- GE +- GLOB +- GRANTS +- GROUP +- GT + +### H + +- HAVING + +### I + +- ID +- IF +- IGNORE +- IMMEDIA +- IMPORT +- IN +- INITIAL +- INSERT +- INSTEAD +- INT +- INTEGER +- INTERVA +- INTO +- IS +- ISNULL + +### J + +- JOIN + +### K + +- KEEP +- KEY +- KILL + +### L + +- LE +- LIKE +- LIMIT +- LINEAR +- LOCAL +- LP +- LSHIFT +- LT + +### M + +- MATCH +- MAXROWS +- MINROWS +- MINUS +- MNODES +- MODIFY +- MODULES + +### N + +- NE +- NONE +- NOT +- NOTNULL +- NOW +- NULL + +### O + +- OF +- OFFSET +- OR +- ORDER + +### P + +- PARTITION +- PASS +- PLUS +- PPS +- PRECISION +- PREV +- PRIVILEGE + +### Q + +- QTIME +- QUERIE +- QUERY +- QUORUM + +### R + +- RAISE +- REM +- REPLACE +- REPLICA +- RESET +- RESTRIC +- ROW +- RP +- RSHIFT + +### S + +- SCORES +- SELECT +- SEMI +- SESSION +- SET +- SHOW +- SLASH +- SLIDING +- SLIMIT +- SMALLIN +- SOFFSET +- STable +- STableS +- STAR +- STATE +- STATEMEN +- STATE_WI +- STORAGE +- STREAM +- STREAMS +- STRING +- SYNCDB + +### T + +- TABLE +- TABLES +- TAG +- TAGS +- TBNAME +- TIMES +- TIMESTAMP +- TINYINT +- TOPIC +- TOPICS +- TRIGGER +- TSERIES + +### U + +- UMINUS +- UNION +- UNSIGNED +- UPDATE +- UPLUS +- USE +- USER +- USERS +- USING + +### V + +- VALUES +- VARIABLE +- VARIABLES +- VGROUPS +- VIEW +- VNODES + +### W + +- WAL +- WHERE + +### _ + +- _C0 +- _QSTART +- _QSTOP +- _QDURATION +- _WSTART +- _WSTOP +- _WDURATION + ## 特殊说明 ### TBNAME diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index ab090940f2..99b704826d 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -32,9 +32,7 @@ typedef struct { int32_t dnodeId; bool standby; bool deploy; - int8_t replica; - int8_t selfIndex; - SReplica replicas[TSDB_MAX_REPLICA]; + SReplica replica; SMsgCb msgCb; } SMnodeOpt; @@ -83,6 +81,7 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); */ int32_t mndProcessRpcMsg(SRpcMsg *pMsg); int32_t mndProcessSyncMsg(SRpcMsg *pMsg); +int32_t mndPreprocessQueryMsg(SMnode * pMnode, SRpcMsg * pMsg); /** * @brief Generate machine code diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index 7eca020371..b5641f2d07 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -138,6 +138,10 @@ typedef enum EFunctionType { FUNCTION_TYPE_TOP_MERGE, FUNCTION_TYPE_BOTTOM_PARTIAL, FUNCTION_TYPE_BOTTOM_MERGE, + FUNCTION_TYPE_FIRST_PARTIAL, + FUNCTION_TYPE_FIRST_MERGE, + FUNCTION_TYPE_LAST_PARTIAL, + FUNCTION_TYPE_LAST_MERGE, // user defined funcion FUNCTION_TYPE_UDF = 10000 diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index cf1d5b7828..ea1c903f53 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -1327,8 +1327,8 @@ static int32_t smlKvTimeArrayCompare(const void *key1, const void *key2) { static int32_t smlKvTimeHashCompare(const void *key1, const void *key2) { SHashObj *s1 = *(SHashObj **)key1; SHashObj *s2 = *(SHashObj **)key2; - SSmlKv *kv1 = (SSmlKv *)taosHashGet(s1, TS, TS_LEN); - SSmlKv *kv2 = (SSmlKv *)taosHashGet(s2, TS, TS_LEN); + SSmlKv *kv1 = *(SSmlKv **)taosHashGet(s1, TS, TS_LEN); + SSmlKv *kv2 = *(SSmlKv **)taosHashGet(s2, TS, TS_LEN); ASSERT(kv1->type == TSDB_DATA_TYPE_TIMESTAMP); ASSERT(kv2->type == TSDB_DATA_TYPE_TIMESTAMP); if (kv1->i < kv2->i) { @@ -1340,29 +1340,13 @@ static int32_t smlKvTimeHashCompare(const void *key1, const void *key2) { } } -static int32_t smlDealCols(SSmlTableInfo *oneTable, bool dataFormat, SArray *cols) { - if (dataFormat) { - void *p = taosArraySearch(oneTable->cols, &cols, smlKvTimeArrayCompare, TD_GE); - if (p == NULL) { +static int32_t smlDealCols(SSmlTableInfo* oneTable, bool dataFormat, SArray *cols){ + if(dataFormat){ + void *p = taosArraySearch(oneTable->cols, &cols, smlKvTimeArrayCompare, TD_GT); + if(p == NULL){ taosArrayPush(oneTable->cols, &cols); - } else { // to make the sort stable for update data - SArray *sa = (SArray *)p; - SSmlKv *cur = (SSmlKv *)taosArrayGet(sa, 0); - SSmlKv *dCur = (SSmlKv *)taosArrayGet(cols, 0); - if (cur->i > dCur->i) { - taosArrayInsert(oneTable->cols, TARRAY_ELEM_IDX(oneTable->cols, p), &cols); - } else { - ASSERT(cur->i == dCur->i); - int32_t index = TARRAY_ELEM_IDX(oneTable->cols, p) + 1; - for (; index < taosArrayGetSize(oneTable->cols); index++) { - SArray *tmp = (SArray *)taosArrayGet(oneTable->cols, index); - SSmlKv *curTs = (SSmlKv *)taosArrayGet(tmp, 0); - if (curTs->i > dCur->i) { - break; - } - } - taosArrayInsert(oneTable->cols, index, &cols); - } + }else{ + taosArrayInsert(oneTable->cols, TARRAY_ELEM_IDX(oneTable->cols, p), &cols); } return TSDB_CODE_SUCCESS; } @@ -1377,27 +1361,11 @@ static int32_t smlDealCols(SSmlTableInfo *oneTable, bool dataFormat, SArray *col taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES); } - void *p = taosArraySearch(oneTable->cols, &kvHash, smlKvTimeHashCompare, TD_GE); - if (p == NULL) { + void *p = taosArraySearch(oneTable->cols, &kvHash, smlKvTimeHashCompare, TD_GT); + if(p == NULL){ taosArrayPush(oneTable->cols, &kvHash); - } else { // to make the sort stable for update data - SHashObj *sa = (SHashObj *)p; - SSmlKv *cur = (SSmlKv *)taosHashGet(sa, TS, TS_LEN); - SSmlKv *dCur = (SSmlKv *)taosArrayGet(cols, 0); - if (cur->i > dCur->i) { - taosArrayInsert(oneTable->cols, TARRAY_ELEM_IDX(oneTable->cols, p), &cols); - } else { - ASSERT(cur->i == dCur->i); - int32_t index = TARRAY_ELEM_IDX(oneTable->cols, p) + 1; - for (; index < taosArrayGetSize(oneTable->cols); index++) { - SHashObj *tmp = (SHashObj *)taosArrayGet(oneTable->cols, index); - SSmlKv *curTs = (SSmlKv *)taosHashGet(tmp, TS, TS_LEN); - if (curTs->i > dCur->i) { - break; - } - } - taosArrayInsert(oneTable->cols, index, &cols); - } + }else{ + taosArrayInsert(oneTable->cols, TARRAY_ELEM_IDX(oneTable->cols, p), &kvHash); } return TSDB_CODE_SUCCESS; } diff --git a/source/client/test/smlTest.cpp b/source/client/test/smlTest.cpp index 4ad73a6424..fffb03d9a5 100644 --- a/source/client/test/smlTest.cpp +++ b/source/client/test/smlTest.cpp @@ -1260,4 +1260,28 @@ TEST(testCase, sml_16368_Test) { pRes = taos_schemaless_insert(taos, (char**)sql, sizeof(sql)/sizeof(sql[0]), TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_MICRO_SECONDS); ASSERT_EQ(taos_errno(pRes), 0); taos_free_result(pRes); -}*/ +} + +TEST(testCase, sml_dup_time_Test) { + TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); + ASSERT_NE(taos, nullptr); + + TAOS_RES* pRes = taos_query(taos, "create database if not exists dup_time schemaless 1"); + taos_free_result(pRes); + + const char *sql[] = { + //"test_ms,t0=t c0=f 1626006833641", + "ubzlsr,id=qmtcvgd,t0=t,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\" c0=false,c1=1i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"xcxvwjvf\",c8=L\"ncharColValue\",c9=7u64 1626006833639000000", + "ubzlsr,id=qmtcvgd,t0=t,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\" c0=T,c1=2i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"fixrzcuq\",c8=L\"ncharColValue\",c9=7u64 1626006833639000000", + "ubzlsr,id=qmtcvgd,t0=t,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\" c0=t,c1=3i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"iupzdqub\",c8=L\"ncharColValue\",c9=7u64 1626006833639000000", + "ubzlsr,id=qmtcvgd,t0=t,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\" c0=t,c1=4i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"yvvtzzof\",c8=L\"ncharColValue\",c9=7u64 1626006833639000000", + "ubzlsr,id=qmtcvgd,t0=t,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\" c0=t,c1=5i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"vbxpilkj\",c8=L\"ncharColValue\",c9=7u64 1626006833639000000" + }; + pRes = taos_query(taos, "use dup_time"); + taos_free_result(pRes); + + pRes = taos_schemaless_insert(taos, (char**)sql, sizeof(sql)/sizeof(sql[0]), TSDB_SML_LINE_PROTOCOL, 0); + ASSERT_EQ(taos_errno(pRes), 0); + taos_free_result(pRes); +} +*/ diff --git a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h index 22691300bc..c5c3d76f1e 100644 --- a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h @@ -34,39 +34,31 @@ typedef struct SMnodeMgmt { SSingleWorker writeWorker; SSingleWorker syncWorker; SSingleWorker monitorWorker; - SReplica replicas[TSDB_MAX_REPLICA]; - int8_t replica; bool stopped; int32_t refCount; TdThreadRwlock lock; } SMnodeMgmt; // mmFile.c -int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed); -int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed); - -// mmInt.c -int32_t mmAcquire(SMnodeMgmt *pMgmt); -void mmRelease(SMnodeMgmt *pMgmt); +int32_t mmReadFile(SMnodeMgmt *pMgmt, SReplica *pReplica, bool *pDeployed); +int32_t mmWriteFile(SMnodeMgmt *pMgmt, const SReplica *pReplica, bool deployed); // mmHandle.c SArray *mmGetMsgHandles(); int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); -int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); -int32_t mndPreprocessQueryMsg(SMnode * pMnode, SRpcMsg * pMsg); // mmWorker.c int32_t mmStartWorker(SMnodeMgmt *pMgmt); void mmStopWorker(SMnodeMgmt *pMgmt); -int32_t mmPutNodeMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); -int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); -int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); -int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); -int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); -int32_t mmPutRpcMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc); +int32_t mmPutMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t mmPutMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmFile.c b/source/dnode/mgmt/mgmt_mnode/src/mmFile.c index 478d6abd52..27a35ae17a 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmFile.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmFile.c @@ -16,7 +16,7 @@ #define _DEFAULT_SOURCE #include "mmInt.h" -int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) { +int32_t mmReadFile(SMnodeMgmt *pMgmt, SReplica *pReplica, bool *pDeployed) { int32_t code = TSDB_CODE_INVALID_JSON_FORMAT; int32_t len = 0; int32_t maxLen = 4096; @@ -52,61 +52,54 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) { } *pDeployed = deployed->valueint; - cJSON *mnodes = cJSON_GetObjectItem(root, "mnodes"); - if (mnodes != NULL) { - if (!mnodes || mnodes->type != cJSON_Array) { - dError("failed to read %s since nodes not found", file); + cJSON *id = cJSON_GetObjectItem(root, "id"); + if (id) { + if (id->type != cJSON_Number) { + dError("failed to read %s since id not found", file); goto _OVER; } - - pMgmt->replica = cJSON_GetArraySize(mnodes); - if (pMgmt->replica <= 0 || pMgmt->replica > TSDB_MAX_REPLICA) { - dError("failed to read %s since mnodes size %d invalid", file, pMgmt->replica); - goto _OVER; - } - - for (int32_t i = 0; i < pMgmt->replica; ++i) { - cJSON *node = cJSON_GetArrayItem(mnodes, i); - if (node == NULL) break; - - SReplica *pReplica = &pMgmt->replicas[i]; - - cJSON *id = cJSON_GetObjectItem(node, "id"); - if (!id || id->type != cJSON_Number) { - dError("failed to read %s since id not found", file); - goto _OVER; - } + if (pReplica) { pReplica->id = id->valueint; + } + } - cJSON *fqdn = cJSON_GetObjectItem(node, "fqdn"); - if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) { - dError("failed to read %s since fqdn not found", file); - goto _OVER; - } + cJSON *fqdn = cJSON_GetObjectItem(root, "fqdn"); + if (fqdn) { + if (fqdn->type != cJSON_String || fqdn->valuestring == NULL) { + dError("failed to read %s since fqdn not found", file); + goto _OVER; + } + if (pReplica) { tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN); + } + } - cJSON *port = cJSON_GetObjectItem(node, "port"); - if (!port || port->type != cJSON_Number) { - dError("failed to read %s since port not found", file); - goto _OVER; - } - pReplica->port = port->valueint; + cJSON *port = cJSON_GetObjectItem(root, "port"); + if (port) { + if (port->type != cJSON_Number) { + dError("failed to read %s since port not found", file); + goto _OVER; + } + if (pReplica) { + pReplica->port = (uint16_t)port->valueint; } } code = 0; - dDebug("succcessed to read file %s, deployed:%d", file, *pDeployed); _OVER: if (content != NULL) taosMemoryFree(content); if (root != NULL) cJSON_Delete(root); if (pFile != NULL) taosCloseFile(&pFile); + if (code == 0) { + dDebug("succcessed to read file %s, deployed:%d", file, *pDeployed); + } terrno = code; return code; } -int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed) { +int32_t mmWriteFile(SMnodeMgmt *pMgmt, const SReplica *pReplica, bool deployed) { char file[PATH_MAX] = {0}; char realfile[PATH_MAX] = {0}; snprintf(file, sizeof(file), "%s%smnode.json.bak", pMgmt->path, TD_DIRSEP); @@ -124,26 +117,11 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed) { char *content = taosMemoryCalloc(1, maxLen + 1); len += snprintf(content + len, maxLen - len, "{\n"); - - int8_t replica = (pMsg != NULL ? pMsg->replica : pMgmt->replica); - if (replica > 0) { - len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n"); - for (int32_t i = 0; i < replica; ++i) { - SReplica *pReplica = &pMgmt->replicas[i]; - if (pMsg != NULL) { - pReplica = &pMsg->replicas[i]; - } - len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id); - len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn); - len += snprintf(content + len, maxLen - len, " \"port\": %u\n", pReplica->port); - if (i < replica - 1) { - len += snprintf(content + len, maxLen - len, " },{\n"); - } else { - len += snprintf(content + len, maxLen - len, " }],\n"); - } - } + if (pReplica != NULL && pReplica->id > 0) { + len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id); + len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn); + len += snprintf(content + len, maxLen - len, " \"port\": %u\n,", pReplica->port); } - len += snprintf(content + len, maxLen - len, " \"deployed\": %d\n", deployed); len += snprintf(content + len, maxLen - len, "}\n"); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index c47988e1ef..e91f4b8cf4 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -20,11 +20,6 @@ void mmGetMonitorInfo(SMnodeMgmt *pMgmt, SMonMmInfo *pInfo) { mndGetMonitorInfo(pMgmt->pMnode, &pInfo->cluster, &pInfo->vgroup, &pInfo->grant); } -void mmGetMnodeLoads(SMnodeMgmt *pMgmt, SMonMloadInfo *pInfo) { - pInfo->isMnode = 1; - mndGetLoad(pMgmt->pMnode, &pInfo->load); -} - int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { SMonMmInfo mmInfo = {0}; mmGetMonitorInfo(pMgmt, &mmInfo); @@ -50,6 +45,11 @@ int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return 0; } +void mmGetMnodeLoads(SMnodeMgmt *pMgmt, SMonMloadInfo *pInfo) { + pInfo->isMnode = 1; + mndGetLoad(pMgmt->pMnode, &pInfo->load); +} + int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { SMonMloadInfo mloads = {0}; mmGetMnodeLoads(pMgmt, &mloads); @@ -90,7 +90,7 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { SMnodeMgmt mgmt = {0}; mgmt.path = pInput->path; mgmt.name = pInput->name; - if (mmWriteFile(&mgmt, &createReq, deployed) != 0) { + if (mmWriteFile(&mgmt, &createReq.replicas[0], deployed) != 0) { dError("failed to write mnode file since %s", terrstr()); return -1; } @@ -126,117 +126,117 @@ int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { SArray *mmGetMsgHandles() { int32_t code = -1; - SArray *pArray = taosArrayInit(64, sizeof(SMgmtHandle)); + SArray *pArray = taosArrayInit(128, sizeof(SMgmtHandle)); if (pArray == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_MNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_MNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_QNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_QNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_SNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_SNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_BNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_BNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_MNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_MNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_QNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_QNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_SNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_SNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_BNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_BNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_CONNECT, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_ACCT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_ACCT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_ACCT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_USER, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_USER, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_USER, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_GET_USER_AUTH, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_DNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG_DNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_DNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_SET_STANDBY_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_QNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_QNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_QNODE_LIST, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_SNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_BNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_BNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_DB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_DB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_USE_DB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_DB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_COMPACT_DB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_GET_DB_CFG, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_VGROUP_LIST, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_REDISTRIBUTE_VGROUP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_MERGE_VGROUP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_BALANCE_VGROUP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_FUNC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_FUNC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_FUNC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_STB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_STB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_TABLE_META, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_SMA, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SMA, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STREAM, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_GET_INDEX, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_GET_TABLE_INDEX, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_SUBSCRIBE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_ASK_EP, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_DROP_CGROUP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_DROP_CGROUP_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_COMMIT_OFFSET, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_TRANS, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_QUERY, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_CONN, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_HEARTBEAT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_STATUS, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CONNECT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_ACCT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_ACCT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_ACCT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_USER, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_USER, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_USER, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_GET_USER_AUTH, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_MNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_SET_STANDBY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_MNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_QNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_QNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_QNODE_LIST, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_SNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_BNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_BNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_USE_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_COMPACT_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_GET_DB_CFG, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_VGROUP_LIST, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_REDISTRIBUTE_VGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_MERGE_VGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_BALANCE_VGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_FUNC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_FUNC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_FUNC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_STB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_STB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_TABLE_META, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_SMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_GET_INDEX, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_GET_TABLE_INDEX, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_TOPIC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_TOPIC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TOPIC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_SUBSCRIBE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_ASK_EP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_DROP_CGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_DROP_CGROUP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_COMMIT_OFFSET, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_TRANS, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_QUERY, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_CONN, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_HEARTBEAT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STATUS, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_SMA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MON_MM_INFO, mmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MON_MM_LOAD, mmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MON_MM_INFO, mmPutMsgToMonitorQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MON_MM_LOAD, mmPutMsgToMonitorQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_SET_STANDBY, mmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_SET_STANDBY, mmPutMsgToSyncQueue, 0) == NULL) goto _OVER; code = 0; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c index 0f3c06cb3a..b7124dfaa5 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c @@ -27,7 +27,7 @@ static bool mmDeployRequired(const SMgmtInputOpt *pInput) { static int32_t mmRequire(const SMgmtInputOpt *pInput, bool *required) { SMnodeMgmt mgmt = {0}; mgmt.path = pInput->path; - if (mmReadFile(&mgmt, required) != 0) { + if (mmReadFile(&mgmt, NULL, required) != 0) { return -1; } @@ -43,33 +43,19 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInpu pOption->deploy = true; pOption->msgCb = pMgmt->msgCb; pOption->dnodeId = pMgmt->pData->dnodeId; - - pOption->replica = 1; - pOption->selfIndex = 0; - - SReplica *pReplica = &pOption->replicas[0]; - pReplica->id = 1; - pReplica->port = tsServerPort; - tstrncpy(pReplica->fqdn, tsLocalFqdn, TSDB_FQDN_LEN); + pOption->replica.id = 1; + pOption->replica.port = tsServerPort; + tstrncpy(pOption->replica.fqdn, tsLocalFqdn, TSDB_FQDN_LEN); } -static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { - pOption->deploy = false; +static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, const SReplica *pReplica, SMnodeOpt *pOption) { pOption->standby = false; + pOption->deploy = false; pOption->msgCb = pMgmt->msgCb; pOption->dnodeId = pMgmt->pData->dnodeId; - - if (pMgmt->replica > 0) { + if (pReplica->id > 0) { pOption->standby = true; - pOption->replica = 1; - pOption->selfIndex = 0; - SReplica *pReplica = &pOption->replicas[0]; - for (int32_t i = 0; i < pMgmt->replica; ++i) { - if (pMgmt->replicas[i].id != pMgmt->pData->dnodeId) continue; - pReplica->id = pMgmt->replicas[i].id; - pReplica->port = pMgmt->replicas[i].port; - memcpy(pReplica->fqdn, pMgmt->replicas[i].fqdn, TSDB_FQDN_LEN); - } + pOption->replica = *pReplica; } } @@ -105,12 +91,13 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { pMgmt->path = pInput->path; pMgmt->name = pInput->name; pMgmt->msgCb = pInput->msgCb; - pMgmt->msgCb.putToQueueFp = (PutToQueueFp)mmPutRpcMsgToQueue; + pMgmt->msgCb.putToQueueFp = (PutToQueueFp)mmPutMsgToQueue; pMgmt->msgCb.mgmt = pMgmt; taosThreadRwlockInit(&pMgmt->lock, NULL); - bool deployed = false; - if (mmReadFile(pMgmt, &deployed) != 0) { + bool deployed = false; + SReplica replica = {0}; + if (mmReadFile(pMgmt, &replica, &deployed) != 0) { dError("failed to read file since %s", terrstr()); mmClose(pMgmt); return -1; @@ -123,7 +110,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { mmBuildOptionForDeploy(pMgmt, pInput, &option); } else { dInfo("mnode start to open"); - mmBuildOptionForOpen(pMgmt, &option); + mmBuildOptionForOpen(pMgmt, &replica, &option); } pMgmt->pMnode = mndOpen(pMgmt->path, &option); @@ -141,8 +128,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { } tmsgReportStartup("mnode-worker", "initialized"); - if (!deployed || pMgmt->replica > 0) { - pMgmt->replica = 0; + if (!deployed || replica.id > 0) { deployed = true; if (mmWriteFile(pMgmt, NULL, deployed) != 0) { dError("failed to write mnode file since %s", terrstr()); @@ -178,22 +164,3 @@ SMgmtFunc mmGetMgmtFunc() { return mgmtFunc; } - -int32_t mmAcquire(SMnodeMgmt *pMgmt) { - int32_t code = 0; - - taosThreadRwlockRdlock(&pMgmt->lock); - if (pMgmt->stopped) { - code = -1; - } else { - atomic_add_fetch_32(&pMgmt->refCount, 1); - } - taosThreadRwlockUnlock(&pMgmt->lock); - return code; -} - -void mmRelease(SMnodeMgmt *pMgmt) { - taosThreadRwlockRdlock(&pMgmt->lock); - atomic_sub_fetch_32(&pMgmt->refCount, 1); - taosThreadRwlockUnlock(&pMgmt->lock); -} \ No newline at end of file diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 53943b61b0..ee2df5f089 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -16,6 +16,25 @@ #define _DEFAULT_SOURCE #include "mmInt.h" +static inline int32_t mmAcquire(SMnodeMgmt *pMgmt) { + int32_t code = 0; + + taosThreadRwlockRdlock(&pMgmt->lock); + if (pMgmt->stopped) { + code = -1; + } else { + atomic_add_fetch_32(&pMgmt->refCount, 1); + } + taosThreadRwlockUnlock(&pMgmt->lock); + return code; +} + +static inline void mmRelease(SMnodeMgmt *pMgmt) { + taosThreadRwlockRdlock(&pMgmt->lock); + atomic_sub_fetch_32(&pMgmt->refCount, 1); + taosThreadRwlockUnlock(&pMgmt->lock); +} + static inline void mmSendRsp(SRpcMsg *pMsg, int32_t code) { SRpcMsg rsp = { .code = code, @@ -26,7 +45,7 @@ static inline void mmSendRsp(SRpcMsg *pMsg, int32_t code) { tmsgSendRsp(&rsp); } -static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { +static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) { SMnodeMgmt *pMgmt = pInfo->ahandle; int32_t code = -1; dTrace("msg:%p, get from mnode queue", pMsg); @@ -53,11 +72,10 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { taosFreeQitem(pMsg); } -static void mmProcessSyncQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { +static void mmProcessSyncMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) { SMnodeMgmt *pMgmt = pInfo->ahandle; - dTrace("msg:%p, get from mnode-sync queue", pMsg); - pMsg->info.node = pMgmt->pMnode; + dTrace("msg:%p, get from mnode-sync queue", pMsg); SMsgHead *pHead = pMsg->pCont; pHead->contLen = ntohl(pHead->contLen); @@ -70,66 +88,69 @@ static void mmProcessSyncQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { taosFreeQitem(pMsg); } -static int32_t mmPutNodeMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pMsg) { - dTrace("msg:%p, put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType)); - taosWriteQitem(pWorker->queue, pMsg); - return 0; +static inline int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pMsg) { + if (mmAcquire(pMgmt) == 0) { + dTrace("msg:%p, put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType)); + taosWriteQitem(pWorker->queue, pMsg); + mmRelease(pMgmt); + return 0; + } else { + dTrace("msg:%p, failed to put into %s queue since %s, type:%s", pMsg, pWorker->name, terrstr(), + TMSG_INFO(pMsg->msgType)); + return -1; + } } -int32_t mmPutNodeMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { - return mmPutNodeMsgToWorker(&pMgmt->writeWorker, pMsg); +int32_t mmPutMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { + return mmPutMsgToWorker(pMgmt, &pMgmt->writeWorker, pMsg); } -int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { - return mmPutNodeMsgToWorker(&pMgmt->syncWorker, pMsg); +int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { + return mmPutMsgToWorker(pMgmt, &pMgmt->syncWorker, pMsg); } -int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { - return mmPutNodeMsgToWorker(&pMgmt->readWorker, pMsg); +int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { + return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg); } -int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { - mndPreprocessQueryMsg(pMgmt->pMnode, pMsg); - - return mmPutNodeMsgToWorker(&pMgmt->queryWorker, pMsg); +int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { + if (mndPreprocessQueryMsg(pMgmt->pMnode, pMsg) != 0) { + dError("msg:%p, failed to pre-process in mnode since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType)); + return -1; + } + return mmPutMsgToWorker(pMgmt, &pMgmt->queryWorker, pMsg); } -int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { - return mmPutNodeMsgToWorker(&pMgmt->monitorWorker, pMsg); +int32_t mmPutMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { + return mmPutMsgToWorker(pMgmt, &pMgmt->monitorWorker, pMsg); } -int32_t mmPutRpcMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { +int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { + SSingleWorker *pWorker = NULL; + switch (qtype) { + case WRITE_QUEUE: + pWorker = &pMgmt->writeWorker; + break; + case QUERY_QUEUE: + pWorker = &pMgmt->queryWorker; + break; + case READ_QUEUE: + pWorker = &pMgmt->readWorker; + break; + case SYNC_QUEUE: + pWorker = &pMgmt->syncWorker; + break; + default: + terrno = TSDB_CODE_INVALID_PARA; + } + + if (pWorker == NULL) return -1; SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); if (pMsg == NULL) return -1; memcpy(pMsg, pRpc, sizeof(SRpcMsg)); - switch (qtype) { - case WRITE_QUEUE: - dTrace("msg:%p, is created and will put into vnode-write queue", pMsg); - taosWriteQitem(pMgmt->writeWorker.queue, pMsg); - return 0; - case QUERY_QUEUE: - dTrace("msg:%p, is created and will put into vnode-query queue", pMsg); - taosWriteQitem(pMgmt->queryWorker.queue, pMsg); - return 0; - - case READ_QUEUE: - dTrace("msg:%p, is created and will put into vnode-read queue", pMsg); - taosWriteQitem(pMgmt->readWorker.queue, pMsg); - return 0; - case SYNC_QUEUE: - if (mmAcquire(pMgmt) == 0) { - dTrace("msg:%p, is created and will put into vnode-sync queue", pMsg); - taosWriteQitem(pMgmt->syncWorker.queue, pMsg); - mmRelease(pMgmt); - return 0; - } else { - return -1; - } - default: - terrno = TSDB_CODE_INVALID_PARA; - return -1; - } + dTrace("msg:%p, is created and will put int %s queue", pMsg, pWorker->name); + return mmPutMsgToWorker(pMgmt, pWorker, pMsg); } int32_t mmStartWorker(SMnodeMgmt *pMgmt) { @@ -137,7 +158,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { .min = tsNumOfMnodeQueryThreads, .max = tsNumOfMnodeQueryThreads, .name = "mnode-query", - .fp = (FItem)mmProcessQueue, + .fp = (FItem)mmProcessRpcMsg, .param = pMgmt, }; if (tSingleWorkerInit(&pMgmt->queryWorker, &qCfg) != 0) { @@ -149,7 +170,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { .min = tsNumOfMnodeReadThreads, .max = tsNumOfMnodeReadThreads, .name = "mnode-read", - .fp = (FItem)mmProcessQueue, + .fp = (FItem)mmProcessRpcMsg, .param = pMgmt, }; if (tSingleWorkerInit(&pMgmt->readWorker, &rCfg) != 0) { @@ -161,7 +182,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { .min = 1, .max = 1, .name = "mnode-write", - .fp = (FItem)mmProcessQueue, + .fp = (FItem)mmProcessRpcMsg, .param = pMgmt, }; if (tSingleWorkerInit(&pMgmt->writeWorker, &wCfg) != 0) { @@ -173,7 +194,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { .min = 1, .max = 1, .name = "mnode-sync", - .fp = (FItem)mmProcessSyncQueue, + .fp = (FItem)mmProcessSyncMsg, .param = pMgmt, }; if (tSingleWorkerInit(&pMgmt->syncWorker, &sCfg) != 0) { @@ -185,7 +206,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { .min = 1, .max = 1, .name = "mnode-monitor", - .fp = (FItem)mmProcessQueue, + .fp = (FItem)mmProcessRpcMsg, .param = pMgmt, }; if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index 528beb280b..498af36786 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -123,10 +123,12 @@ static int32_t dmProcessCreateNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) { dError("node:%s, failed to create since %s", pWrapper->name, terrstr()); } else { dInfo("node:%s, has been created", pWrapper->name); - (void)dmOpenNode(pWrapper); - (void)dmStartNode(pWrapper); - pWrapper->required = true; + code = dmOpenNode(pWrapper); + if (code != 0) { + code = dmStartNode(pWrapper); + } pWrapper->deployed = true; + pWrapper->required = true; pWrapper->proc.ptype = pDnode->ptype; } diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 4869a19856..16220f101a 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -76,11 +76,12 @@ typedef struct { } STelemMgmt; typedef struct { - sem_t syncSem; - int64_t sync; - bool standby; - int32_t errCode; - int32_t transId; + sem_t syncSem; + int64_t sync; + bool standby; + SReplica replica; + int32_t errCode; + int32_t transId; } SSyncMgmt; typedef struct { @@ -98,9 +99,6 @@ typedef struct SMnode { bool stopped; bool restored; bool deploy; - int8_t replica; - int8_t selfIndex; - SReplica replicas[TSDB_MAX_REPLICA]; char *path; int64_t checkTime; SSdb *pSdb; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 73eea70195..c936c0f93d 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -95,8 +95,8 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) { dnodeObj.id = 1; dnodeObj.createdTime = taosGetTimestampMs(); dnodeObj.updateTime = dnodeObj.createdTime; - dnodeObj.port = pMnode->replicas[0].port; - memcpy(&dnodeObj.fqdn, pMnode->replicas[0].fqdn, TSDB_FQDN_LEN); + dnodeObj.port = tsServerPort; + memcpy(&dnodeObj.fqdn, tsLocalFqdn, TSDB_FQDN_LEN); snprintf(dnodeObj.ep, TSDB_EP_LEN, "%s:%u", dnodeObj.fqdn, dnodeObj.port); pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 7a73d3c360..892650fa18 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -289,11 +289,9 @@ static int32_t mndExecSteps(SMnode *pMnode) { } static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { - pMnode->replica = pOption->replica; - pMnode->selfIndex = pOption->selfIndex; - memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); pMnode->msgCb = pOption->msgCb; pMnode->selfDnodeId = pOption->dnodeId; + pMnode->syncMgmt.replica = pOption->replica; pMnode->syncMgmt.standby = pOption->standby; } diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index e0b4cc6a57..e58012b1b7 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -188,15 +188,15 @@ int32_t mndInitSync(SMnode *pMnode) { syncInfo.isStandBy = pMgmt->standby; syncInfo.snapshotEnable = true; - SSyncCfg *pCfg = &syncInfo.syncCfg; - pCfg->replicaNum = pMnode->replica; - pCfg->myIndex = pMnode->selfIndex; - mInfo("start to open mnode sync, replica:%d myindex:%d standby:%d", pCfg->replicaNum, pCfg->myIndex, pMgmt->standby); - for (int32_t i = 0; i < pMnode->replica; ++i) { - SNodeInfo *pNode = &pCfg->nodeInfo[i]; - tstrncpy(pNode->nodeFqdn, pMnode->replicas[i].fqdn, sizeof(pNode->nodeFqdn)); - pNode->nodePort = pMnode->replicas[i].port; - mInfo("index:%d, fqdn:%s port:%d", i, pNode->nodeFqdn, pNode->nodePort); + mInfo("start to open mnode sync, standby:%d", pMgmt->standby); + if (pMgmt->standby || pMgmt->replica.id > 0) { + SSyncCfg *pCfg = &syncInfo.syncCfg; + pCfg->replicaNum = 1; + pCfg->myIndex = 0; + SNodeInfo *pNode = &pCfg->nodeInfo[0]; + tstrncpy(pNode->nodeFqdn, pMgmt->replica.fqdn, sizeof(pNode->nodeFqdn)); + pNode->nodePort = pMgmt->replica.port; + mInfo("fqdn:%s port:%u", pNode->nodeFqdn, pNode->nodePort); } tsem_init(&pMgmt->syncSem, 0, 0); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 132f93a6a5..f663f5798a 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -582,6 +582,15 @@ int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) { return offset; } +static void clearPartitionOperator(SPartitionOperatorInfo* pInfo) { + void *ite = NULL; + while( (ite = taosHashIterate(pInfo->pGroupSet, ite)) != NULL ) { + taosArrayDestroy( ((SDataGroupInfo *)ite)->pPageList); + } + taosHashClear(pInfo->pGroupSet); + clearDiskbasedBuf(pInfo->pBuf); +} + static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { SPartitionOperatorInfo* pInfo = pOperator->info; @@ -591,6 +600,7 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { pInfo->pGroupIter = taosHashIterate(pInfo->pGroupSet, pInfo->pGroupIter); if (pInfo->pGroupIter == NULL) { doSetOperatorCompleted(pOperator); + clearPartitionOperator(pInfo); return NULL; } diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index a528041964..7db1396603 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -93,10 +93,14 @@ int32_t diffFunction(SqlFunctionCtx *pCtx); bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t firstFunction(SqlFunctionCtx *pCtx); +int32_t firstFunctionMerge(SqlFunctionCtx *pCtx); int32_t lastFunction(SqlFunctionCtx *pCtx); +int32_t lastFunctionMerge(SqlFunctionCtx *pCtx); int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); +int32_t getFirstLastInfoSize(int32_t resBytes); bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv); bool getTopBotMergeFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 7441dbacf3..1c3f274ee9 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -954,6 +954,41 @@ static int32_t translateFirstLast(SFunctionNode* pFunc, char* pErrBuf, int32_t l return TSDB_CODE_SUCCESS; } +static int32_t translateFirstLastImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) { + // first(col_list) will be rewritten as first(col) + if (2 != LIST_LENGTH(pFunc->pParameterList)) { //input has two params c0,ts, is this a bug? + return TSDB_CODE_SUCCESS; + } + + SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0); + uint8_t paraType = ((SExprNode*)pPara)->resType.type; + int32_t paraBytes = ((SExprNode*)pPara)->resType.bytes; + if (isPartial) { + if (QUERY_NODE_COLUMN != nodeType(pPara)) { + return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, + "The parameters of first/last can only be columns"); + } + + pFunc->node.resType = (SDataType){.bytes = getFirstLastInfoSize(paraBytes) + VARSTR_HEADER_SIZE, + .type = TSDB_DATA_TYPE_BINARY}; + } else { + if (TSDB_DATA_TYPE_BINARY != paraType) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + pFunc->node.resType = ((SExprNode*)pPara)->resType; + } + return TSDB_CODE_SUCCESS; +} + +static int32_t translateFirstLastPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + return translateFirstLastImpl(pFunc, pErrBuf, len, true); +} + +static int32_t translateFirstLastMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + return translateFirstLastImpl(pFunc, pErrBuf, len, false); +} + static int32_t translateUnique(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { if (1 != LIST_LENGTH(pFunc->pParameterList)) { return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); @@ -1704,6 +1739,30 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .initFunc = functionSetup, .processFunc = firstFunction, .finalizeFunc = firstLastFinalize, + .pPartialFunc = "_first_partial", + .pMergeFunc = "_first_merge", + .combineFunc = firstCombine, + }, + { + .name = "_first_partial", + .type = FUNCTION_TYPE_FIRST_PARTIAL, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC, + .translateFunc = translateFirstLastPartial, + .getEnvFunc = getFirstLastFuncEnv, + .initFunc = functionSetup, + .processFunc = firstFunction, + .finalizeFunc = firstLastPartialFinalize, + .combineFunc = firstCombine, + }, + { + .name = "_first_merge", + .type = FUNCTION_TYPE_FIRST_MERGE, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC, + .translateFunc = translateFirstLastMerge, + .getEnvFunc = getFirstLastFuncEnv, + .initFunc = functionSetup, + .processFunc = firstFunctionMerge, + .finalizeFunc = firstLastFinalize, .combineFunc = firstCombine, }, { @@ -1715,6 +1774,30 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .initFunc = functionSetup, .processFunc = lastFunction, .finalizeFunc = firstLastFinalize, + .pPartialFunc = "_last_partial", + .pMergeFunc = "_last_merge", + .combineFunc = lastCombine, + }, + { + .name = "_last_partial", + .type = FUNCTION_TYPE_LAST_PARTIAL, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC, + .translateFunc = translateFirstLastPartial, + .getEnvFunc = getFirstLastFuncEnv, + .initFunc = functionSetup, + .processFunc = lastFunction, + .finalizeFunc = firstLastPartialFinalize, + .combineFunc = lastCombine, + }, + { + .name = "_last_merge", + .type = FUNCTION_TYPE_LAST_MERGE, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC, + .translateFunc = translateFirstLastMerge, + .getEnvFunc = getFirstLastFuncEnv, + .initFunc = functionSetup, + .processFunc = lastFunctionMerge, + .finalizeFunc = firstLastFinalize, .combineFunc = lastCombine, }, { diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 238334ab42..b5009b7d41 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -71,6 +71,12 @@ typedef struct STopBotRes { STopBotResItem* pItems; } STopBotRes; +typedef struct SFirstLastRes { + bool hasResult; + int32_t bytes; + char buf[]; +} SFirstLastRes; + typedef struct SStddevRes { double result; int64_t count; @@ -2230,9 +2236,13 @@ int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) return TSDB_CODE_SUCCESS; } +int32_t getFirstLastInfoSize(int32_t resBytes) { + return sizeof(SFirstLastRes) + resBytes + sizeof(int64_t); +} + bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { - SColumnNode* pNode = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0); - pEnv->calcMemSize = pNode->node.resType.bytes + sizeof(int64_t); + SColumnNode* pNode = (SColumnNode *)nodesListGetNode(pFunc->pParameterList, 0); + pEnv->calcMemSize = sizeof(SFirstLastRes) + pNode->node.resType.bytes + sizeof(int64_t); return true; } @@ -2256,12 +2266,13 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { int32_t numOfElems = 0; SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); - char* buf = GET_ROWCELL_INTERBUF(pResInfo); + SFirstLastRes *pInfo = GET_ROWCELL_INTERBUF(pResInfo); SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pInputCol = pInput->pData[0]; int32_t bytes = pInputCol->info.bytes; + pInfo->bytes = bytes; // All null data column, return directly. if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) { @@ -2279,7 +2290,7 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { if (blockDataOrder == TSDB_ORDER_ASC) { // filter according to current result firstly if (pResInfo->numOfRes > 0) { - TSKEY ts = *(TSKEY*)(buf + bytes); + TSKEY ts = *(TSKEY*)(pInfo->buf + bytes); if (ts < startKey) { return TSDB_CODE_SUCCESS; } @@ -2295,9 +2306,10 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { char* data = colDataGetData(pInputCol, i); TSKEY cts = getRowPTs(pInput->pPTS, i); - if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) > cts) { - memcpy(buf, data, bytes); - *(TSKEY*)(buf + bytes) = cts; + if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) > cts) { + memcpy(pInfo->buf, data, bytes); + *(TSKEY*)(pInfo->buf + bytes) = cts; + pInfo->hasResult = true; // DO_UPDATE_TAG_COLUMNS(pCtx, ts); pResInfo->numOfRes = 1; @@ -2308,7 +2320,7 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { // in case of descending order time stamp serial, which usually happens as the results of the nest query, // all data needs to be check. if (pResInfo->numOfRes > 0) { - TSKEY ts = *(TSKEY*)(buf + bytes); + TSKEY ts = *(TSKEY*)(pInfo->buf + bytes); if (ts < endKey) { return TSDB_CODE_SUCCESS; } @@ -2324,9 +2336,10 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { char* data = colDataGetData(pInputCol, i); TSKEY cts = getRowPTs(pInput->pPTS, i); - if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) > cts) { - memcpy(buf, data, bytes); - *(TSKEY*)(buf + bytes) = cts; + if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) > cts) { + memcpy(pInfo->buf, data, bytes); + *(TSKEY*)(pInfo->buf + bytes) = cts; + pInfo->hasResult = true; // DO_UPDATE_TAG_COLUMNS(pCtx, ts); pResInfo->numOfRes = 1; break; @@ -2342,12 +2355,13 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { int32_t numOfElems = 0; SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); - char* buf = GET_ROWCELL_INTERBUF(pResInfo); + SFirstLastRes *pInfo = GET_ROWCELL_INTERBUF(pResInfo); SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pInputCol = pInput->pData[0]; int32_t bytes = pInputCol->info.bytes; + pInfo->bytes = bytes; // All null data column, return directly. if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) { @@ -2372,10 +2386,11 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { char* data = colDataGetData(pInputCol, i); TSKEY cts = getRowPTs(pInput->pPTS, i); - if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) < cts) { - memcpy(buf, data, bytes); - *(TSKEY*)(buf + bytes) = cts; + if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) < cts) { + memcpy(pInfo->buf, data, bytes); + *(TSKEY*)(pInfo->buf + bytes) = cts; // DO_UPDATE_TAG_COLUMNS(pCtx, ts); + pInfo->hasResult = true; pResInfo->numOfRes = 1; } break; @@ -2390,9 +2405,10 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { char* data = colDataGetData(pInputCol, i); TSKEY cts = getRowPTs(pInput->pPTS, i); - if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) < cts) { - memcpy(buf, data, bytes); - *(TSKEY*)(buf + bytes) = cts; + if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) < cts) { + memcpy(pInfo->buf, data, bytes); + *(TSKEY*)(pInfo->buf + bytes) = cts; + pInfo->hasResult = true; pResInfo->numOfRes = 1; // DO_UPDATE_TAG_COLUMNS(pCtx, ts); } @@ -2404,6 +2420,56 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } +static void firstLastTransferInfo(SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst) { + if (!pInput->hasResult) { + return; + } + pOutput->bytes = pInput->bytes; + TSKEY *tsIn = (TSKEY*)(pInput->buf + pInput->bytes); + TSKEY *tsOut = (TSKEY*)(pOutput->buf + pInput->bytes); + if (pOutput->hasResult) { + if (isFirst) { + if (*tsIn > *tsOut) { + return; + } + } else { + if (*tsIn < *tsOut) { + return; + } + } + } + *tsOut = *tsIn; + memcpy(pOutput->buf, pInput->buf, pOutput->bytes); + pOutput->hasResult = true; + return; +} + +static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx *pCtx, bool isFirstQuery) { + SInputColumnInfoData* pInput = &pCtx->input; + SColumnInfoData* pCol = pInput->pData[0]; + ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY); + + SFirstLastRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + + int32_t start = pInput->startRowIndex; + char* data = colDataGetData(pCol, start); + SFirstLastRes* pInputInfo = (SFirstLastRes *)varDataVal(data); + + firstLastTransferInfo(pInputInfo, pInfo, isFirstQuery); + + SET_VAL(GET_RES_INFO(pCtx), 1, 1); + + return TSDB_CODE_SUCCESS; +} + +int32_t firstFunctionMerge(SqlFunctionCtx *pCtx) { + return firstLastFunctionMergeImpl(pCtx, true); +} + +int32_t lastFunctionMerge(SqlFunctionCtx *pCtx) { + return firstLastFunctionMergeImpl(pCtx, false); +} + int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t slotId = pCtx->pExpr->base.resSchema.slotId; SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); @@ -2411,12 +2477,30 @@ int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0; - char* in = GET_ROWCELL_INTERBUF(pResInfo); - colDataAppend(pCol, pBlock->info.rows, in, pResInfo->isNullRes); + SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(pResInfo); + colDataAppend(pCol, pBlock->info.rows, pRes->buf, pResInfo->isNullRes); return pResInfo->numOfRes; } +int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); + SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + int32_t resultBytes = getFirstLastInfoSize(pRes->bytes); + char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); + + memcpy(varDataVal(res), pRes, resultBytes); + varDataSetLen(res, resultBytes); + + int32_t slotId = pCtx->pExpr->base.resSchema.slotId; + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); + + colDataAppend(pCol, pBlock->info.rows, res, false); + + taosMemoryFree(res); + return 1; +} + int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx); char* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo); diff --git a/source/libs/parser/src/parInsertData.c b/source/libs/parser/src/parInsertData.c index 1960073f29..2b505d4bf7 100644 --- a/source/libs/parser/src/parInsertData.c +++ b/source/libs/parser/src/parInsertData.c @@ -283,6 +283,8 @@ void sortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf) { if (!dataBuf->ordered) { char* pBlockData = pBlocks->data; + + // todo. qsort is unstable, if timestamp is same, should get the last one qsort(pBlockData, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar); int32_t i = 0; @@ -350,6 +352,8 @@ int sortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKey if (!dataBuf->ordered) { pBlkKeyTuple = pBlkKeyInfo->pKeyTuple; + + // todo. qsort is unstable, if timestamp is same, should get the last one qsort(pBlkKeyTuple, nRows, sizeof(SBlockKeyTuple), rowDataCompar); pBlkKeyTuple = pBlkKeyInfo->pKeyTuple; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index d1debc6af6..4852dcfb54 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -983,6 +983,9 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { SEpSet epSet = {0}; tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epSet); pCtx->epSet = epSet; + if (!transEpSetIsEqual(&epSet, &pCtx->epSet)) { + pCtx->retryCount = 0; + } } addConnToPool(pThrd->pool, pConn); tTrace("use remote epset, current in use: %d, retry count:%d, try limit: %d", pEpSet->inUse, pCtx->retryCount + 1, diff --git a/source/util/src/talgo.c b/source/util/src/talgo.c index 866ac5757b..5353cd9bfe 100644 --- a/source/util/src/talgo.c +++ b/source/util/src/talgo.c @@ -175,7 +175,13 @@ void *taosbsearch(const void *key, const void *base, int32_t nmemb, int32_t size c = compar(key, p); if (c == 0) { - break; + if (flags == TD_GT){ + lidx = midx + 1; + } else if(flags == TD_LT){ + ridx = midx - 1; + }else{ + break; + } } else if (c < 0) { ridx = midx - 1; } else { @@ -189,6 +195,10 @@ void *taosbsearch(const void *key, const void *base, int32_t nmemb, int32_t size return (c <= 0) ? p : (midx + 1 < nmemb ? p + size : NULL); } else if (flags == TD_LE) { return (c >= 0) ? p : (midx > 0 ? p - size : NULL); + } else if (flags == TD_GT) { + return (c < 0) ? p : (midx + 1 < nmemb ? p + size : NULL); + } else if (flags == TD_LT) { + return (c > 0) ? p : (midx > 0 ? p - size : NULL); } else { ASSERT(0); } diff --git a/source/util/test/taosbsearchTest.cpp b/source/util/test/taosbsearchTest.cpp index 0b250c9ecc..b8fa3a4476 100644 --- a/source/util/test/taosbsearchTest.cpp +++ b/source/util/test/taosbsearchTest.cpp @@ -248,6 +248,170 @@ TEST(testCase, taosbsearch_greater_or_equal) { ASSERT_EQ(pRet, nullptr); } +TEST(testCase, taosbsearch_greater) { + // For equal test + int key = 3; + void *pRet = NULL; + + pRet = taosbsearch((void *)&key, NULL, 0, sizeof(int), compareFunc, TD_GT); + ASSERT_EQ(pRet, nullptr); + + // 1 element + int array1[1] = {5}; + + key = 1; + pRet = taosbsearch((void *)&key, (void *)array1, 1, sizeof(int), compareFunc, TD_GT); + ASSERT_EQ(*(int *)pRet, 5); + + key = 6; + pRet = taosbsearch((void *)&key, (void *)array1, 1, sizeof(int), compareFunc, TD_GT); + ASSERT_EQ(pRet, nullptr); + + key = 5; + pRet = taosbsearch((void *)&key, (void *)array1, 1, sizeof(int), compareFunc, TD_GT); + ASSERT_EQ(pRet, nullptr); + + // 2 element + int array2[2] = {3, 6}; + + key = 1; + pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_GT); + ASSERT_EQ(*(int *)pRet, 3); + + key = 3; + pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_GT); + ASSERT_EQ(*(int *)pRet, 6); + + key = 4; + pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_GT); + ASSERT_EQ(*(int *)pRet, 6); + + key = 6; + pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_GT); + ASSERT_EQ(pRet, nullptr); + + key = 7; + pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_GT); + ASSERT_EQ(pRet, nullptr); + + // 3 element + int array3[3] = {3, 6, 8}; + + key = 1; + pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_GT); + ASSERT_EQ(*(int *)pRet, 3); + + key = 3; + pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_GT); + ASSERT_EQ(*(int *)pRet, 6); + + key = 4; + pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_GT); + ASSERT_EQ(*(int *)pRet, 6); + + key = 6; + pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_GT); + ASSERT_EQ(*(int *)pRet, 8); + + key = 7; + pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_GT); + ASSERT_EQ(*(int *)pRet, 8); + + key = 8; + pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_GT); + ASSERT_EQ(pRet, nullptr); + + key = 9; + pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_GT); + ASSERT_EQ(pRet, nullptr); + + // 4 element + int array4[4] = {3, 6, 8, 11}; + + key = 1; + pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_GT); + ASSERT_EQ(*(int *)pRet, 3); + + key = 3; + pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_GT); + ASSERT_EQ(*(int *)pRet, 6); + + key = 4; + pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_GT); + ASSERT_EQ(*(int *)pRet, 6); + + key = 6; + pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_GT); + ASSERT_EQ(*(int *)pRet, 8); + + key = 7; + pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_GT); + ASSERT_EQ(*(int *)pRet, 8); + + key = 8; + pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_GT); + ASSERT_EQ(*(int *)pRet, 11); + + key = 9; + pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_GT); + ASSERT_EQ(*(int *)pRet, 11); + + key = 11; + pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_GT); + ASSERT_EQ(pRet, nullptr); + + key = 13; + pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_GT); + ASSERT_EQ(pRet, nullptr); + + // 5 element + int array5[5] = {3, 6, 8, 11, 15}; + + key = 1; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GT); + ASSERT_EQ(*(int *)pRet, 3); + + key = 3; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GT); + ASSERT_EQ(*(int *)pRet, 6); + + key = 4; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GT); + ASSERT_EQ(*(int *)pRet, 6); + + key = 6; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GT); + ASSERT_EQ(*(int *)pRet, 8); + + key = 7; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GT); + ASSERT_EQ(*(int *)pRet, 8); + + key = 8; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GT); + ASSERT_EQ(*(int *)pRet, 11); + + key = 9; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GT); + ASSERT_EQ(*(int *)pRet, 11); + + key = 11; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GT); + ASSERT_EQ(*(int *)pRet, 15); + + key = 13; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GT); + ASSERT_EQ(*(int *)pRet, 15); + + key = 15; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GT); + ASSERT_EQ(pRet, nullptr); + + key = 17; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GT); + ASSERT_EQ(pRet, nullptr); +} + TEST(testCase, taosbsearch_less_or_equal) { // For equal test int key = 3; @@ -411,4 +575,168 @@ TEST(testCase, taosbsearch_less_or_equal) { key = 17; pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LE); ASSERT_EQ(*(int *)pRet, 15); +} + +TEST(testCase, taosbsearch_less) { + // For equal test + int key = 3; + void *pRet = NULL; + + pRet = taosbsearch((void *)&key, NULL, 0, sizeof(int), compareFunc, TD_LT); + ASSERT_EQ(pRet, nullptr); + + // 1 element + int array1[1] = {5}; + + key = 1; + pRet = taosbsearch((void *)&key, (void *)array1, 1, sizeof(int), compareFunc, TD_LT); + ASSERT_EQ(pRet, nullptr); + + key = 6; + pRet = taosbsearch((void *)&key, (void *)array1, 1, sizeof(int), compareFunc, TD_LT); + ASSERT_EQ(*(int *)pRet, 5); + + key = 5; + pRet = taosbsearch((void *)&key, (void *)array1, 1, sizeof(int), compareFunc, TD_LT); + ASSERT_EQ(pRet, nullptr); + + // 2 element + int array2[2] = {3, 6}; + + key = 1; + pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_LT); + ASSERT_EQ(pRet, nullptr); + + key = 3; + pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_LT); + ASSERT_EQ(pRet, nullptr); + + key = 4; + pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_LT); + ASSERT_EQ(*(int *)pRet, 3); + + key = 6; + pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_LT); + ASSERT_EQ(*(int *)pRet, 3); + + key = 7; + pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_LT); + ASSERT_EQ(*(int *)pRet, 6); + + // 3 element + int array3[3] = {3, 6, 8}; + + key = 1; + pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_LT); + ASSERT_EQ(pRet, nullptr); + + key = 3; + pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_LT); + ASSERT_EQ(pRet, nullptr); + + key = 4; + pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_LT); + ASSERT_EQ(*(int *)pRet, 3); + + key = 6; + pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_LT); + ASSERT_EQ(*(int *)pRet, 3); + + key = 7; + pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_LT); + ASSERT_EQ(*(int *)pRet, 6); + + key = 8; + pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_LT); + ASSERT_EQ(*(int *)pRet, 6); + + key = 9; + pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_LT); + ASSERT_EQ(*(int *)pRet, 8); + + // 4 element + int array4[4] = {3, 6, 8, 11}; + + key = 1; + pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_LT); + ASSERT_EQ(pRet, nullptr); + + key = 3; + pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_LT); + ASSERT_EQ(pRet, nullptr); + + key = 4; + pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_LT); + ASSERT_EQ(*(int *)pRet, 3); + + key = 6; + pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_LT); + ASSERT_EQ(*(int *)pRet, 3); + + key = 7; + pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_LT); + ASSERT_EQ(*(int *)pRet, 6); + + key = 8; + pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_LT); + ASSERT_EQ(*(int *)pRet, 6); + + key = 9; + pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_LT); + ASSERT_EQ(*(int *)pRet, 8); + + key = 11; + pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_LT); + ASSERT_EQ(*(int *)pRet, 8); + + key = 13; + pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_LT); + ASSERT_EQ(*(int *)pRet, 11); + + // 5 element + int array5[5] = {3, 6, 8, 11, 15}; + + key = 1; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LT); + ASSERT_EQ(pRet, nullptr); + + key = 3; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LT); + ASSERT_EQ(pRet, nullptr); + + key = 4; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LT); + ASSERT_EQ(*(int *)pRet, 3); + + key = 6; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LT); + ASSERT_EQ(*(int *)pRet, 3); + + key = 7; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LT); + ASSERT_EQ(*(int *)pRet, 6); + + key = 8; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LT); + ASSERT_EQ(*(int *)pRet, 6); + + key = 9; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LT); + ASSERT_EQ(*(int *)pRet, 8); + + key = 11; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LT); + ASSERT_EQ(*(int *)pRet, 8); + + key = 13; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LT); + ASSERT_EQ(*(int *)pRet, 11); + + key = 15; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LT); + ASSERT_EQ(*(int *)pRet, 11); + + key = 17; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LT); + ASSERT_EQ(*(int *)pRet, 15); } \ No newline at end of file diff --git a/tests/system-test/2-query/tail.py b/tests/system-test/2-query/tail.py new file mode 100644 index 0000000000..6039f3effa --- /dev/null +++ b/tests/system-test/2-query/tail.py @@ -0,0 +1,439 @@ +from random import randint, random +from numpy import equal +import taos +import sys +import datetime +import inspect + +from util.log import * +from util.sql import * +from util.cases import * + +class TDTestCase: + updatecfgDict = {'debugFlag': 143 ,"cDebugFlag":143,"uDebugFlag":143 ,"rpcDebugFlag":143 , "tmrDebugFlag":143 , + "jniDebugFlag":143 ,"simDebugFlag":143,"dDebugFlag":143, "dDebugFlag":143,"vDebugFlag":143,"mDebugFlag":143,"qDebugFlag":143, + "wDebugFlag":143,"sDebugFlag":143,"tsdbDebugFlag":143,"tqDebugFlag":143 ,"fsDebugFlag":143 ,"fnDebugFlag":143} + + def init(self, conn, logSql): + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor()) + + def prepare_datas(self): + tdSql.execute( + '''create table stb1 + (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) + tags (t1 int) + ''' + ) + + tdSql.execute( + ''' + create table t1 + (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) + ''' + ) + for i in range(4): + tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )') + + for i in range(9): + tdSql.execute( + f"insert into ct1 values ( now()-{i*10}s, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )" + ) + tdSql.execute( + f"insert into ct4 values ( now()-{i*90}d, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )" + ) + tdSql.execute("insert into ct1 values (now()-45s, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar0', now()+8a )") + tdSql.execute("insert into ct1 values (now()+10s, 9, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )") + tdSql.execute("insert into ct1 values (now()+15s, 9, -99999, -999, -99, -9.99, NULL, 1, 'binary9', 'nchar9', now()+9a )") + tdSql.execute("insert into ct1 values (now()+20s, 9, -99999, -999, NULL, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )") + + tdSql.execute("insert into ct4 values (now()-810d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ") + tdSql.execute("insert into ct4 values (now()-400d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ") + tdSql.execute("insert into ct4 values (now()+90d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ") + + tdSql.execute( + f'''insert into t1 values + ( '2020-04-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) + ( '2020-10-21 01:01:01.000', 1, 11111, 111, 11, 1.11, 11.11, 1, "binary1", "nchar1", now()+1a ) + ( '2020-12-31 01:01:01.000', 2, 22222, 222, 22, 2.22, 22.22, 0, "binary2", "nchar2", now()+2a ) + ( '2021-01-01 01:01:06.000', 3, 33333, 333, 33, 3.33, 33.33, 0, "binary3", "nchar3", now()+3a ) + ( '2021-05-07 01:01:10.000', 4, 44444, 444, 44, 4.44, 44.44, 1, "binary4", "nchar4", now()+4a ) + ( '2021-07-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) + ( '2021-09-30 01:01:16.000', 5, 55555, 555, 55, 5.55, 55.55, 0, "binary5", "nchar5", now()+5a ) + ( '2022-02-01 01:01:20.000', 6, 66666, 666, 66, 6.66, 66.66, 1, "binary6", "nchar6", now()+6a ) + ( '2022-10-28 01:01:26.000', 7, 00000, 000, 00, 0.00, 00.00, 1, "binary7", "nchar7", "1970-01-01 08:00:00.000" ) + ( '2022-12-01 01:01:30.000', 8, -88888, -888, -88, -8.88, -88.88, 0, "binary8", "nchar8", "1969-01-01 01:00:00.000" ) + ( '2022-12-31 01:01:36.000', 9, -99999999999999999, -999, -99, -9.99, -999999999999999999999.99, 1, "binary9", "nchar9", "1900-01-01 00:00:00.000" ) + ( '2023-02-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) + ''' + ) + + def test_errors(self): + error_sql_lists = [ + "select tail from t1", + "select tail(123--123)==1 from t1", + "select tail(123,123) from t1", + "select tail(c1,ts) from t1", + "select tail(c1,c1,ts) from t1", + "select tail(c1) as 'd1' from t1", + "select tail(c1 ,c2 ) from t1", + "select tail(c1 ,NULL) from t1", + "select tail(,) from t1;", + "select tail(tail(c1) ab from t1)", + "select tail(c1) as int from t1", + "select tail('c1') from t1", + "select tail(NULL) from t1", + "select tail('') from t1", + "select tail(c%) from t1", + "select tail(t1) from t1", + "select tail(True) from t1", + "select tail(c1,1) , count(c1) from t1", + "select tail(c1,1) , avg(c1) from t1", + "select tail(c1,1) , min(c1) from t1", + "select tail(c1,1) , spread(c1) from t1", + "select tail(c1,1) , diff(c1) from t1", + "select tail(c1,1) , abs(c1) from t1", + "select tail(c1,1) , c1 from t1", + "select tail from stb1 partition by tbname", + "select tail(123--123)==1 from stb1 partition by tbname", + "select tail(123,123) from stb1 partition by tbname", + "select tail(c1,ts) from stb1 partition by tbname", + "select tail(c1,c1,ts) from stb1 partition by tbname", + "select tail(c1) as 'd1' from stb1 partition by tbname", + "select tail(c1 ,c2 ) from stb1 partition by tbname", + "select tail(c1 ,NULL) from stb1 partition by tbname", + "select tail(,) from stb1 partition by tbname;", + "select tail(tail(c1) ab from stb1 partition by tbname)", + "select tail(c1) as int from stb1 partition by tbname", + "select tail('c1') from stb1 partition by tbname", + "select tail(NULL) from stb1 partition by tbname", + "select tail('') from stb1 partition by tbname", + "select tail(c%) from stb1 partition by tbname", + "select tail(t1) from stb1 partition by tbname", + "select tail(True) from stb1 partition by tbname", + "select tail(c1,1) , count(c1) from stb1 partition by tbname", + "select tail(c1,1) , avg(c1) from stb1 partition by tbname", + "select tail(c1,1) , min(c1) from stb1 partition by tbname", + "select tail(c1,1) , spread(c1) from stb1 partition by tbname", + "select tail(c1,1) , diff(c1) from stb1 partition by tbname", + "select tail(c1,1) , abs(c1) from stb1 partition by tbname", + "select tail(c1,1) , c1 from stb1 partition by tbname" + + ] + for error_sql in error_sql_lists: + tdSql.error(error_sql) + + def support_types(self): + other_no_value_types = [ + "select tail(ts,1) from t1" , + "select tail(c7,1) from t1", + "select tail(c8,1) from t1", + "select tail(c9,1) from t1", + "select tail(ts,1) from ct1" , + "select tail(c7,1) from ct1", + "select tail(c8,1) from ct1", + "select tail(c9,1) from ct1", + "select tail(ts,1) from ct3" , + "select tail(c7,1) from ct3", + "select tail(c8,1) from ct3", + "select tail(c9,1) from ct3", + "select tail(ts,1) from ct4" , + "select tail(c7,1) from ct4", + "select tail(c8,1) from ct4", + "select tail(c9,1) from ct4", + "select tail(ts,1) from stb1 partition by tbname" , + "select tail(c7,1) from stb1 partition by tbname", + "select tail(c8,1) from stb1 partition by tbname", + "select tail(c9,1) from stb1 partition by tbname" + ] + + for type_sql in other_no_value_types: + tdSql.query(type_sql) + + type_sql_lists = [ + "select tail(c1,1) from t1", + "select tail(c2,1) from t1", + "select tail(c3,1) from t1", + "select tail(c4,1) from t1", + "select tail(c5,1) from t1", + "select tail(c6,1) from t1", + + "select tail(c1,1) from ct1", + "select tail(c2,1) from ct1", + "select tail(c3,1) from ct1", + "select tail(c4,1) from ct1", + "select tail(c5,1) from ct1", + "select tail(c6,1) from ct1", + + "select tail(c1,1) from ct3", + "select tail(c2,1) from ct3", + "select tail(c3,1) from ct3", + "select tail(c4,1) from ct3", + "select tail(c5,1) from ct3", + "select tail(c6,1) from ct3", + + "select tail(c1,1) from stb1 partition by tbname", + "select tail(c2,1) from stb1 partition by tbname", + "select tail(c3,1) from stb1 partition by tbname", + "select tail(c4,1) from stb1 partition by tbname", + "select tail(c5,1) from stb1 partition by tbname", + "select tail(c6,1) from stb1 partition by tbname", + + "select tail(c6,1) as alisb from stb1 partition by tbname", + "select tail(c6,1) alisb from stb1 partition by tbname", + ] + + for type_sql in type_sql_lists: + tdSql.query(type_sql) + + def check_tail_table(self , tbname , col_name , tail_rows , offset): + tail_sql = f"select tail({col_name} , {tail_rows} , {offset}) from {tbname}" + equal_sql = f"select {col_name} from (select ts , {col_name} from {tbname} order by ts desc limit {tail_rows} offset {offset}) order by ts" + tdSql.query(tail_sql) + tail_result = tdSql.queryResult + + tdSql.query(equal_sql) + print(equal_sql) + + equal_result = tdSql.queryResult + + if tail_result == equal_result: + tdLog.info(" tail query check pass , tail sql is: %s" %tail_sql) + else: + tdLog.exit(" tail query check fail , tail sql is: %s " %tail_sql) + + def basic_tail_function(self): + + # basic query + tdSql.query("select c1 from ct3") + tdSql.checkRows(0) + tdSql.query("select c1 from t1") + tdSql.checkRows(12) + tdSql.query("select c1 from stb1") + tdSql.checkRows(25) + + # used for empty table , ct3 is empty + tdSql.query("select tail(c1,1) from ct3") + tdSql.checkRows(0) + tdSql.query("select tail(c2,1) from ct3") + tdSql.checkRows(0) + tdSql.query("select tail(c3,1) from ct3") + tdSql.checkRows(0) + tdSql.query("select tail(c4,1) from ct3") + tdSql.checkRows(0) + tdSql.query("select tail(c5,1) from ct3") + tdSql.checkRows(0) + tdSql.query("select tail(c6,1) from ct3") + + # auto check for t1 table + # used for regular table + tdSql.query("select tail(c1,1) from t1") + + tdSql.query("desc t1") + col_lists_rows = tdSql.queryResult + col_lists = [] + for col_name in col_lists_rows: + if col_name[0] =="ts": + continue + + col_lists.append(col_name[0]) + + for col in col_lists: + for loop in range(100): + limit = randint(1,100) + offset = randint(0,100) + self.check_tail_table("t1" , col , limit , offset) + + # tail for invalid params + + tdSql.error("select tail(c1,-10,10) from ct1") + tdSql.error("select tail(c1,10,10000) from ct1") + tdSql.error("select tail(c1,10,-100) from ct1") + tdSql.error("select tail(c1,100/2,10) from ct1") + tdSql.error("select tail(c1,5,10*2) from ct1") + tdSql.query("select tail(c1,100,100) from ct1") + tdSql.checkRows(0) + tdSql.query("select tail(c1,10,100) from ct1") + tdSql.checkRows(0) + tdSql.error("select tail(c1,10,101) from ct1") + tdSql.query("select tail(c1,10,0) from ct1") + tdSql.query("select tail(c1,100,10) from ct1") + tdSql.checkRows(3) + + # tail with super tags + + tdSql.query("select tail(c1,10,10) from ct1") + tdSql.checkRows(3) + + tdSql.error("select tail(c1,10,10),tbname from ct1") + tdSql.error("select tail(c1,10,10),t1 from ct1") + + # tail with common col + tdSql.error("select tail(c1,10,10) ,ts from ct1") + tdSql.error("select tail(c1,10,10) ,c1 from ct1") + + # tail with scalar function + tdSql.error("select tail(c1,10,10) ,abs(c1) from ct1") + tdSql.error("select tail(c1,10,10) , tail(c2,10,10) from ct1") + tdSql.error("select tail(c1,10,10) , abs(c2)+2 from ct1") + + # bug need fix for scalar value or compute again + # tdSql.error(" select tail(c1,10,10) , 123 from ct1") + # tdSql.error(" select abs(tail(c1,10,10)) from ct1") + # tdSql.error(" select abs(tail(c1,10,10)) + 2 from ct1") + + # tail with aggregate function + tdSql.error("select tail(c1,10,10) ,sum(c1) from ct1") + tdSql.error("select tail(c1,10,10) ,max(c1) from ct1") + tdSql.error("select tail(c1,10,10) ,csum(c1) from ct1") + tdSql.error("select tail(c1,10,10) ,count(c1) from ct1") + + # tail with filter where + tdSql.query("select tail(c1,3,1) from ct4 where c1 is null") + tdSql.checkData(0, 0, None) + tdSql.checkData(1, 0, None) + + tdSql.query("select tail(c1,3,2) from ct4 where c1 >2 ") + tdSql.checkData(0, 0, 7) + tdSql.checkData(1, 0, 6) + tdSql.checkData(2, 0, 5) + + tdSql.query("select tail(c1,2,1) from ct4 where c2 between 0 and 99999") + tdSql.checkData(0, 0, 2) + tdSql.checkData(1, 0, 1) + + # tail with union all + tdSql.query("select tail(c1,2,1) from ct4 union all select c1 from ct1") + tdSql.checkRows(15) + tdSql.query("select tail(c1,2,1) from ct4 union all select c1 from ct2") + tdSql.checkRows(2) + tdSql.checkData(0, 0, 1) + tdSql.checkData(1, 0, 0) + tdSql.query("select tail(c2,2,1) from ct4 union all select abs(c2)/2 from ct4") + tdSql.checkRows(14) + + # tail with join + # prepare join datas with same ts + + tdSql.execute(" use db ") + tdSql.execute(" create stable st1 (ts timestamp , num int) tags(ind int)") + tdSql.execute(" create table tb1 using st1 tags(1)") + tdSql.execute(" create table tb2 using st1 tags(2)") + + tdSql.execute(" create stable st2 (ts timestamp , num int) tags(ind int)") + tdSql.execute(" create table ttb1 using st2 tags(1)") + tdSql.execute(" create table ttb2 using st2 tags(2)") + + start_ts = 1622369635000 # 2021-05-30 18:13:55 + + for i in range(10): + ts_value = start_ts+i*1000 + tdSql.execute(f" insert into tb1 values({ts_value} , {i})") + tdSql.execute(f" insert into tb2 values({ts_value} , {i})") + + tdSql.execute(f" insert into ttb1 values({ts_value} , {i})") + tdSql.execute(f" insert into ttb2 values({ts_value} , {i})") + + tdSql.query("select tail(tb2.num,3,2) from tb1, tb2 where tb1.ts=tb2.ts ") + tdSql.checkRows(3) + tdSql.checkData(0,0,5) + tdSql.checkData(1,0,6) + tdSql.checkData(2,0,7) + + # nest query + # tdSql.query("select tail(c1,2) from (select c1 from ct1)") + tdSql.query("select c1 from (select tail(c1,2) c1 from ct4)") + tdSql.checkRows(2) + tdSql.checkData(0, 0, 0) + tdSql.checkData(1, 0, None) + + tdSql.query("select sum(c1) from (select tail(c1,2) c1 from ct1)") + tdSql.checkRows(1) + tdSql.checkData(0, 0, 18) + + tdSql.query("select abs(c1) from (select tail(c1,2) c1 from ct1)") + tdSql.checkRows(2) + tdSql.checkData(0, 0, 9) + + #partition by tbname + tdSql.query(" select tail(c1,5) from stb1 partition by tbname ") + tdSql.checkRows(10) + + tdSql.query(" select tail(c1,3) from stb1 partition by tbname ") + tdSql.checkRows(6) + + # group by + tdSql.error("select tail(c1,2) from ct1 group by c1") + tdSql.error("select tail(c1,2) from ct1 group by tbname") + + # super table + + + + + def check_boundary_values(self): + + tdSql.execute("drop database if exists bound_test") + tdSql.execute("create database if not exists bound_test") + tdSql.execute("use bound_test") + tdSql.execute( + "create table stb_bound (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(32),c9 nchar(32), c10 timestamp) tags (t1 int);" + ) + tdSql.execute(f'create table sub1_bound using stb_bound tags ( 1 )') + tdSql.execute( + f"insert into sub1_bound values ( now()-1s, 2147483647, 9223372036854775807, 32767, 127, 3.40E+38, 1.7e+308, True, 'binary_tb1', 'nchar_tb1', now() )" + ) + tdSql.execute( + f"insert into sub1_bound values ( now(), 2147483646, 9223372036854775806, 32766, 126, 3.40E+38, 1.7e+308, True, 'binary_tb1', 'nchar_tb1', now() )" + ) + + tdSql.execute( + f"insert into sub1_bound values ( now(), -2147483646, -9223372036854775806, -32766, -126, -3.40E+38, -1.7e+308, True, 'binary_tb1', 'nchar_tb1', now() )" + ) + + tdSql.execute( + f"insert into sub1_bound values ( now(), 2147483643, 9223372036854775803, 32763, 123, 3.39E+38, 1.69e+308, True, 'binary_tb1', 'nchar_tb1', now() )" + ) + + tdSql.execute( + f"insert into sub1_bound values ( now(), -2147483643, -9223372036854775803, -32763, -123, -3.39E+38, -1.69e+308, True, 'binary_tb1', 'nchar_tb1', now() )" + ) + + tdSql.error( + f"insert into sub1_bound values ( now()+1s, 2147483648, 9223372036854775808, 32768, 128, 3.40E+38, 1.7e+308, True, 'binary_tb1', 'nchar_tb1', now() )" + ) + + tdSql.query("select tail(c2,2) from sub1_bound") + tdSql.checkRows(2) + tdSql.checkData(0,0,9223372036854775803) + + def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring + tdSql.prepare() + + tdLog.printNoPrefix("==========step1:create table ==============") + + self.prepare_datas() + + tdLog.printNoPrefix("==========step2:test errors ==============") + + self.test_errors() + + tdLog.printNoPrefix("==========step3:support types ============") + + self.support_types() + + tdLog.printNoPrefix("==========step4: tail basic query ============") + + self.basic_tail_function() + + tdLog.printNoPrefix("==========step5: tail boundary query ============") + + self.check_boundary_values() + + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 5149994228..cda0e37237 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -95,6 +95,7 @@ python3 ./test.py -f 2-query/unique.py python3 ./test.py -f 2-query/stateduration.py python3 ./test.py -f 2-query/function_stateduration.py python3 ./test.py -f 2-query/statecount.py +python3 ./test.py -f 2-query/tail.py python3 ./test.py -f 6-cluster/5dnode1mnode.py python3 ./test.py -f 6-cluster/5dnode2mnode.py