From c9f38d2be294544308154059da53d6023e6941d3 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 28 Dec 2021 17:01:09 +0800 Subject: [PATCH 1/2] send create topic req --- include/common/tmsg.h | 21 +++++++++++++ include/libs/parser/parser.h | 4 +-- source/client/src/clientImpl.c | 56 +++++++++++++++++++++++++++++++++- 3 files changed, 78 insertions(+), 3 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 6727dd3289..25002a9f92 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1063,6 +1063,27 @@ typedef struct STaskDropRsp { int32_t code; } STaskDropRsp; +typedef struct { + int8_t igExists; + char* name; + char* phyPlan; +} SCMCreateTopicReq; + +static FORCE_INLINE int tSerializeSCMCreateTopicReq(void** buf, const SCMCreateTopicReq* pReq) { + int tlen = 0; + tlen += taosEncodeString(buf, pReq->name); + tlen += taosEncodeFixedI8(buf, pReq->igExists); + tlen += taosEncodeString(buf, pReq->phyPlan); + return tlen; +} + +static FORCE_INLINE void* tDeserializeSCMCreateTopicReq(void* buf, SCMCreateTopicReq* pReq) { + buf = taosDecodeFixedI8(buf, &(pReq->igExists)); + buf = taosDecodeString(buf, &(pReq->name)); + buf = taosDecodeString(buf, &(pReq->phyPlan)); + return buf; +} + typedef struct { char name[TSDB_TOPIC_FNAME_LEN]; int8_t igExists; diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index f2f3fcd49b..c7d637ee8a 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -46,7 +46,7 @@ int32_t qParseQuerySql(SParseContext* pContext, SQueryNode** pQuery); bool qIsDdlQuery(const SQueryNode* pQuery); -void qDestoryQuery(SQueryNode* pQuery); +void qDestroyQuery(SQueryNode* pQuery); /** * Convert a normal sql statement to only query tags information to enable that the subscribe client can be aware quickly of the true vgroup ids that @@ -86,4 +86,4 @@ void addIntoSourceParam(SSourceParam* pSourceParam, tExprNode* pNode, SColumn* p } #endif -#endif /*_TD_PARSER_H_*/ \ No newline at end of file +#endif /*_TD_PARSER_H_*/ diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 0ee99f77aa..6220d7eb50 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -212,6 +212,60 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) { return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob); } +TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen) { + STscObj* pTscObj = (STscObj*)taos; + SRequestObj* pRequest = NULL; + SQueryNode* pQuery = NULL; + SQueryDag* pDag = NULL; + char *dagStr = NULL; + + //parse sql to logical plan and physical plan + //send topic name and plans to mnode + + terrno = TSDB_CODE_SUCCESS; + + CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); + CHECK_CODE_GOTO(parseSql(pRequest, &pQuery), _return); + //TODO: check sql valid + + CHECK_CODE_GOTO(qCreateQueryDag(pQuery, &pDag), _return); + + dagStr = qDagToString(pDag); + if(dagStr == NULL) { + //TODO + } + + SCMCreateTopicReq req = { + .name = (char*)name, + .igExists = 0, + .phyPlan = dagStr, + }; + + void* buf = NULL; + int tlen = tSerializeSCMCreateTopicReq(&buf, &req); + + pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen }; + + SMsgSendInfo* body = buildSendMsgInfoImpl(pRequest); + SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet; + + int64_t transporterId = 0; + asyncSendMsgToServer(pTscObj->pTransporter, pEpSet, &transporterId, body); + + tsem_wait(&pRequest->body.rspSem); + + destroySendMsgInfo(body); + +_return: + qDestroyQuery(pQuery); + qDestroyQueryDag(pDag); + destroySendMsgInfo(body); + if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) { + pRequest->code = terrno; + } + return pRequest; +} + TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { STscObj *pTscObj = (STscObj *)taos; if (sqlLen > (size_t) tsMaxSQLStringLen) { @@ -239,7 +293,7 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { } _return: - qDestoryQuery(pQuery); + qDestroyQuery(pQuery); qDestroyQueryDag(pDag); if (NULL != pRequest && TSDB_CODE_SUCCESS != terrno) { pRequest->code = terrno; From 97f334c9dd05bbcc0ca9efbf42828a97886a50a6 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 28 Dec 2021 17:51:13 +0800 Subject: [PATCH 2/2] send create topic req --- include/client/taos.h | 3 +++ include/dnode/vnode/tq/tq.h | 2 ++ source/client/src/clientImpl.c | 3 --- source/client/test/clientTests.cpp | 35 ++++++++++++++++++++++++++++++ source/libs/parser/src/parser.c | 2 +- 5 files changed, 41 insertions(+), 4 deletions(-) diff --git a/include/client/taos.h b/include/client/taos.h index 7357478555..4669ca51f7 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -193,6 +193,9 @@ DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr); DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList); DLL_EXPORT TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision); + +DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen); + #ifdef __cplusplus } #endif diff --git a/include/dnode/vnode/tq/tq.h b/include/dnode/vnode/tq/tq.h index b6cb96a57b..5774131377 100644 --- a/include/dnode/vnode/tq/tq.h +++ b/include/dnode/vnode/tq/tq.h @@ -251,6 +251,8 @@ typedef struct STqMetaStore { STqMetaList* bucket[TQ_BUCKET_SIZE]; // a table head STqMetaList* unpersistHead; + // topics that are not connectted + STqMetaList* unconnectTopic; // TODO:temporaral use, to be replaced by unified tfile int fileFd; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index c3c2f70b33..b40c718d51 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -237,9 +237,6 @@ TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sq SQueryDag* pDag = NULL; char *dagStr = NULL; - //parse sql to logical plan and physical plan - //send topic name and plans to mnode - terrno = TSDB_CODE_SUCCESS; CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 83d0e61eb3..de494cb031 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -399,6 +399,41 @@ TEST(testCase, drop_stable_Test) { taos_close(pConn); } +TEST(testCase, create_topic_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "create database abc1"); + if (taos_errno(pRes) != 0) { + printf("error in create db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create stable st1(ts timestamp, k int) tags(a int)"); + if (taos_errno(pRes) != 0) { + printf("error in create stable, reason:%s\n", taos_errstr(pRes)); + } + + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + ASSERT_TRUE(pFields == NULL); + + int32_t numOfFields = taos_num_fields(pRes); + ASSERT_EQ(numOfFields, 0); + + taos_free_result(pRes); + + char* sql = "select * from st1"; + tmq_create_topic(pConn, "test_topic_1", sql, strlen(sql)); + taos_close(pConn); +} + + //TEST(testCase, show_table_Test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // assert(pConn != NULL); diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index 5c9a48e52f..2ccd76723b 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -229,6 +229,6 @@ void qParserClearupMetaRequestInfo(SCatalogReq* pMetaReq) { taosArrayDestroy(pMetaReq->pUdf); } -void qDestoryQuery(SQueryNode* pQuery) { +void qDestroyQuery(SQueryNode* pQuery) { // todo }