Merge pull request #9452 from taosdata/feature/tq
send create topic req
This commit is contained in:
commit
b51819ddc8
|
@ -193,6 +193,9 @@ DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr);
|
|||
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList);
|
||||
DLL_EXPORT TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision);
|
||||
|
||||
|
||||
DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -1076,6 +1076,27 @@ typedef struct STaskDropRsp {
|
|||
int32_t code;
|
||||
} STaskDropRsp;
|
||||
|
||||
typedef struct {
|
||||
int8_t igExists;
|
||||
char* name;
|
||||
char* phyPlan;
|
||||
} SCMCreateTopicReq;
|
||||
|
||||
static FORCE_INLINE int tSerializeSCMCreateTopicReq(void** buf, const SCMCreateTopicReq* pReq) {
|
||||
int tlen = 0;
|
||||
tlen += taosEncodeString(buf, pReq->name);
|
||||
tlen += taosEncodeFixedI8(buf, pReq->igExists);
|
||||
tlen += taosEncodeString(buf, pReq->phyPlan);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void* tDeserializeSCMCreateTopicReq(void* buf, SCMCreateTopicReq* pReq) {
|
||||
buf = taosDecodeFixedI8(buf, &(pReq->igExists));
|
||||
buf = taosDecodeString(buf, &(pReq->name));
|
||||
buf = taosDecodeString(buf, &(pReq->phyPlan));
|
||||
return buf;
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
char name[TSDB_TOPIC_FNAME_LEN];
|
||||
int8_t igExists;
|
||||
|
|
|
@ -251,6 +251,8 @@ typedef struct STqMetaStore {
|
|||
STqMetaList* bucket[TQ_BUCKET_SIZE];
|
||||
// a table head
|
||||
STqMetaList* unpersistHead;
|
||||
// topics that are not connectted
|
||||
STqMetaList* unconnectTopic;
|
||||
|
||||
// TODO:temporaral use, to be replaced by unified tfile
|
||||
int fileFd;
|
||||
|
|
|
@ -43,7 +43,7 @@ int32_t qParseQuerySql(SParseContext* pContext, SQueryNode** pQuery);
|
|||
|
||||
bool qIsDdlQuery(const SQueryNode* pQuery);
|
||||
|
||||
void qDestoryQuery(SQueryNode* pQuery);
|
||||
void qDestroyQuery(SQueryNode* pQuery);
|
||||
|
||||
/**
|
||||
* Convert a normal sql statement to only query tags information to enable that the subscribe client can be aware quickly of the true vgroup ids that
|
||||
|
@ -83,4 +83,4 @@ void addIntoSourceParam(SSourceParam* pSourceParam, tExprNode* pNode, SColumn* p
|
|||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_PARSER_H_*/
|
||||
#endif /*_TD_PARSER_H_*/
|
||||
|
|
|
@ -230,6 +230,57 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) {
|
|||
return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob);
|
||||
}
|
||||
|
||||
TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen) {
|
||||
STscObj* pTscObj = (STscObj*)taos;
|
||||
SRequestObj* pRequest = NULL;
|
||||
SQueryNode* pQuery = NULL;
|
||||
SQueryDag* pDag = NULL;
|
||||
char *dagStr = NULL;
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
|
||||
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
|
||||
CHECK_CODE_GOTO(parseSql(pRequest, &pQuery), _return);
|
||||
//TODO: check sql valid
|
||||
|
||||
CHECK_CODE_GOTO(qCreateQueryDag(pQuery, &pDag), _return);
|
||||
|
||||
dagStr = qDagToString(pDag);
|
||||
if(dagStr == NULL) {
|
||||
//TODO
|
||||
}
|
||||
|
||||
SCMCreateTopicReq req = {
|
||||
.name = (char*)name,
|
||||
.igExists = 0,
|
||||
.phyPlan = dagStr,
|
||||
};
|
||||
|
||||
void* buf = NULL;
|
||||
int tlen = tSerializeSCMCreateTopicReq(&buf, &req);
|
||||
|
||||
pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen };
|
||||
|
||||
SMsgSendInfo* body = buildSendMsgInfoImpl(pRequest);
|
||||
SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet;
|
||||
|
||||
int64_t transporterId = 0;
|
||||
asyncSendMsgToServer(pTscObj->pTransporter, pEpSet, &transporterId, body);
|
||||
|
||||
tsem_wait(&pRequest->body.rspSem);
|
||||
|
||||
destroySendMsgInfo(body);
|
||||
|
||||
_return:
|
||||
qDestroyQuery(pQuery);
|
||||
qDestroyQueryDag(pDag);
|
||||
destroySendMsgInfo(body);
|
||||
if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
|
||||
pRequest->code = terrno;
|
||||
}
|
||||
return pRequest;
|
||||
}
|
||||
|
||||
TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
|
||||
STscObj *pTscObj = (STscObj *)taos;
|
||||
if (sqlLen > (size_t) tsMaxSQLStringLen) {
|
||||
|
@ -257,7 +308,7 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
|
|||
}
|
||||
|
||||
_return:
|
||||
qDestoryQuery(pQuery);
|
||||
qDestroyQuery(pQuery);
|
||||
qDestroyQueryDag(pDag);
|
||||
if (NULL != pRequest && TSDB_CODE_SUCCESS != terrno) {
|
||||
pRequest->code = terrno;
|
||||
|
|
|
@ -399,6 +399,41 @@ TEST(testCase, drop_stable_Test) {
|
|||
taos_close(pConn);
|
||||
}
|
||||
|
||||
TEST(testCase, create_topic_Test) {
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "create database abc1");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in create db, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "use abc1");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in use db, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create stable st1(ts timestamp, k int) tags(a int)");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in create stable, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
|
||||
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
||||
ASSERT_TRUE(pFields == NULL);
|
||||
|
||||
int32_t numOfFields = taos_num_fields(pRes);
|
||||
ASSERT_EQ(numOfFields, 0);
|
||||
|
||||
taos_free_result(pRes);
|
||||
|
||||
char* sql = "select * from st1";
|
||||
tmq_create_topic(pConn, "test_topic_1", sql, strlen(sql));
|
||||
taos_close(pConn);
|
||||
}
|
||||
|
||||
|
||||
//TEST(testCase, show_table_Test) {
|
||||
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
// assert(pConn != NULL);
|
||||
|
|
|
@ -229,6 +229,6 @@ void qParserClearupMetaRequestInfo(SCatalogReq* pMetaReq) {
|
|||
taosArrayDestroy(pMetaReq->pUdf);
|
||||
}
|
||||
|
||||
void qDestoryQuery(SQueryNode* pQuery) {
|
||||
void qDestroyQuery(SQueryNode* pQuery) {
|
||||
// todo
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue