send create topic req
This commit is contained in:
parent
d5f5fc00bf
commit
c9f38d2be2
|
@ -1063,6 +1063,27 @@ typedef struct STaskDropRsp {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
} STaskDropRsp;
|
} STaskDropRsp;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int8_t igExists;
|
||||||
|
char* name;
|
||||||
|
char* phyPlan;
|
||||||
|
} SCMCreateTopicReq;
|
||||||
|
|
||||||
|
static FORCE_INLINE int tSerializeSCMCreateTopicReq(void** buf, const SCMCreateTopicReq* pReq) {
|
||||||
|
int tlen = 0;
|
||||||
|
tlen += taosEncodeString(buf, pReq->name);
|
||||||
|
tlen += taosEncodeFixedI8(buf, pReq->igExists);
|
||||||
|
tlen += taosEncodeString(buf, pReq->phyPlan);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE void* tDeserializeSCMCreateTopicReq(void* buf, SCMCreateTopicReq* pReq) {
|
||||||
|
buf = taosDecodeFixedI8(buf, &(pReq->igExists));
|
||||||
|
buf = taosDecodeString(buf, &(pReq->name));
|
||||||
|
buf = taosDecodeString(buf, &(pReq->phyPlan));
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_TOPIC_FNAME_LEN];
|
char name[TSDB_TOPIC_FNAME_LEN];
|
||||||
int8_t igExists;
|
int8_t igExists;
|
||||||
|
|
|
@ -46,7 +46,7 @@ int32_t qParseQuerySql(SParseContext* pContext, SQueryNode** pQuery);
|
||||||
|
|
||||||
bool qIsDdlQuery(const SQueryNode* pQuery);
|
bool qIsDdlQuery(const SQueryNode* pQuery);
|
||||||
|
|
||||||
void qDestoryQuery(SQueryNode* pQuery);
|
void qDestroyQuery(SQueryNode* pQuery);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Convert a normal sql statement to only query tags information to enable that the subscribe client can be aware quickly of the true vgroup ids that
|
* Convert a normal sql statement to only query tags information to enable that the subscribe client can be aware quickly of the true vgroup ids that
|
||||||
|
@ -86,4 +86,4 @@ void addIntoSourceParam(SSourceParam* pSourceParam, tExprNode* pNode, SColumn* p
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#endif /*_TD_PARSER_H_*/
|
#endif /*_TD_PARSER_H_*/
|
||||||
|
|
|
@ -212,6 +212,60 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) {
|
||||||
return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob);
|
return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen) {
|
||||||
|
STscObj* pTscObj = (STscObj*)taos;
|
||||||
|
SRequestObj* pRequest = NULL;
|
||||||
|
SQueryNode* pQuery = NULL;
|
||||||
|
SQueryDag* pDag = NULL;
|
||||||
|
char *dagStr = NULL;
|
||||||
|
|
||||||
|
//parse sql to logical plan and physical plan
|
||||||
|
//send topic name and plans to mnode
|
||||||
|
|
||||||
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
|
||||||
|
CHECK_CODE_GOTO(parseSql(pRequest, &pQuery), _return);
|
||||||
|
//TODO: check sql valid
|
||||||
|
|
||||||
|
CHECK_CODE_GOTO(qCreateQueryDag(pQuery, &pDag), _return);
|
||||||
|
|
||||||
|
dagStr = qDagToString(pDag);
|
||||||
|
if(dagStr == NULL) {
|
||||||
|
//TODO
|
||||||
|
}
|
||||||
|
|
||||||
|
SCMCreateTopicReq req = {
|
||||||
|
.name = (char*)name,
|
||||||
|
.igExists = 0,
|
||||||
|
.phyPlan = dagStr,
|
||||||
|
};
|
||||||
|
|
||||||
|
void* buf = NULL;
|
||||||
|
int tlen = tSerializeSCMCreateTopicReq(&buf, &req);
|
||||||
|
|
||||||
|
pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen };
|
||||||
|
|
||||||
|
SMsgSendInfo* body = buildSendMsgInfoImpl(pRequest);
|
||||||
|
SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet;
|
||||||
|
|
||||||
|
int64_t transporterId = 0;
|
||||||
|
asyncSendMsgToServer(pTscObj->pTransporter, pEpSet, &transporterId, body);
|
||||||
|
|
||||||
|
tsem_wait(&pRequest->body.rspSem);
|
||||||
|
|
||||||
|
destroySendMsgInfo(body);
|
||||||
|
|
||||||
|
_return:
|
||||||
|
qDestroyQuery(pQuery);
|
||||||
|
qDestroyQueryDag(pDag);
|
||||||
|
destroySendMsgInfo(body);
|
||||||
|
if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
|
||||||
|
pRequest->code = terrno;
|
||||||
|
}
|
||||||
|
return pRequest;
|
||||||
|
}
|
||||||
|
|
||||||
TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
|
TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
|
||||||
STscObj *pTscObj = (STscObj *)taos;
|
STscObj *pTscObj = (STscObj *)taos;
|
||||||
if (sqlLen > (size_t) tsMaxSQLStringLen) {
|
if (sqlLen > (size_t) tsMaxSQLStringLen) {
|
||||||
|
@ -239,7 +293,7 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
|
||||||
}
|
}
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
qDestoryQuery(pQuery);
|
qDestroyQuery(pQuery);
|
||||||
qDestroyQueryDag(pDag);
|
qDestroyQueryDag(pDag);
|
||||||
if (NULL != pRequest && TSDB_CODE_SUCCESS != terrno) {
|
if (NULL != pRequest && TSDB_CODE_SUCCESS != terrno) {
|
||||||
pRequest->code = terrno;
|
pRequest->code = terrno;
|
||||||
|
|
Loading…
Reference in New Issue