TD-12506 implement taos_query_l

This commit is contained in:
Xiaoyu Wang 2021-12-24 01:39:46 -05:00
parent 1d68b746e6
commit e7ea350872
8 changed files with 127 additions and 87 deletions

View File

@ -44,7 +44,9 @@ typedef struct SParseContext {
*/ */
int32_t qParseQuerySql(SParseContext* pContext, SQueryNode** pQuery); int32_t qParseQuerySql(SParseContext* pContext, SQueryNode** pQuery);
bool qIsDclQuery(const SQueryNode* pQuery); bool qIsDdlQuery(const SQueryNode* pQuery);
void qDestoryQuery(SQueryNode* pQuery);
/** /**
* Convert a normal sql statement to only query tags information to enable that the subscribe client can be aware quickly of the true vgroup ids that * Convert a normal sql statement to only query tags information to enable that the subscribe client can be aware quickly of the true vgroup ids that

View File

@ -139,7 +139,7 @@ struct SQueryNode;
/** /**
* Create the physical plan for the query, according to the AST. * Create the physical plan for the query, according to the AST.
*/ */
int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SEpSet* pQnode, struct SQueryDag** pDag); int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SQueryDag** pDag);
// Set datasource of this subplan, multiple calls may be made to a subplan. // Set datasource of this subplan, multiple calls may be made to a subplan.
// @subplan subplan to be schedule // @subplan subplan to be schedule

View File

@ -8,7 +8,7 @@ target_include_directories(
target_link_libraries( target_link_libraries(
taos taos
INTERFACE api INTERFACE api
PRIVATE os util common transport parser catalog function qcom PRIVATE os util common transport parser planner catalog scheduler function qcom
) )
ADD_SUBDIRECTORY(test) ADD_SUBDIRECTORY(test)

View File

@ -121,6 +121,7 @@ typedef struct SRequestObj {
STscObj *pTscObj; STscObj *pTscObj;
SQueryExecMetric metric; SQueryExecMetric metric;
char *sqlstr; // sql string char *sqlstr; // sql string
int32_t sqlLen;
SRequestSendRecvBody body; SRequestSendRecvBody body;
int64_t self; int64_t self;
char *msgBuf; char *msgBuf;

View File

@ -9,6 +9,17 @@
#include "tpagedfile.h" #include "tpagedfile.h"
#include "tref.h" #include "tref.h"
#include "parser.h" #include "parser.h"
#include "planner.h"
#include "scheduler.h"
#define CHECK_CODE_GOTO(expr, lable) \
do { \
int32_t code = expr; \
if (TSDB_CODE_SUCCESS != code) { \
terrno = code; \
goto lable; \
} \
} while (0)
static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet); static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody); static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody);
@ -113,63 +124,57 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
return taosConnectImpl(ip, user, &secretEncrypt[0], db, port, NULL, NULL, *pInst); return taosConnectImpl(ip, user, &secretEncrypt[0], db, port, NULL, NULL, *pInst);
} }
TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj** pRequest) {
STscObj *pTscObj = (STscObj *)taos; *pRequest = createRequest(pTscObj, NULL, NULL, TSDB_SQL_SELECT);
if (sqlLen > (size_t) tsMaxSQLStringLen) { if (*pRequest == NULL) {
tscError("sql string exceeds max length:%d", tsMaxSQLStringLen);
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
return NULL;
}
nPrintTsc("%s", sql)
SRequestObj* pRequest = createRequest(pTscObj, NULL, NULL, TSDB_SQL_SELECT);
if (pRequest == NULL) {
tscError("failed to malloc sqlObj"); tscError("failed to malloc sqlObj");
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL;
} }
pRequest->sqlstr = malloc(sqlLen + 1); (*pRequest)->sqlstr = malloc(sqlLen + 1);
if (pRequest->sqlstr == NULL) { if ((*pRequest)->sqlstr == NULL) {
tscError("0x%"PRIx64" failed to prepare sql string buffer", pRequest->self); tscError("0x%"PRIx64" failed to prepare sql string buffer", (*pRequest)->self);
(*pRequest)->msgBuf = strdup("failed to prepare sql string buffer");
pRequest->msgBuf = strdup("failed to prepare sql string buffer"); return TSDB_CODE_TSC_OUT_OF_MEMORY;
terrno = pRequest->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
return pRequest;
} }
strntolower(pRequest->sqlstr, sql, (int32_t)sqlLen); strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
pRequest->sqlstr[sqlLen] = 0; (*pRequest)->sqlstr[sqlLen] = 0;
(*pRequest)->sqlLen = sqlLen;
tscDebugL("0x%"PRIx64" SQL: %s", pRequest->requestId, pRequest->sqlstr); tscDebugL("0x%"PRIx64" SQL: %s", (*pRequest)->requestId, (*pRequest)->sqlstr);
return TSDB_CODE_SUCCESS;
}
int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) {
SParseContext cxt = { SParseContext cxt = {
.ctx = {.requestId = pRequest->requestId, .acctId = pTscObj->acctId, .db = getConnectionDB(pTscObj)}, .ctx = {.requestId = pRequest->requestId, .acctId = pRequest->pTscObj->acctId, .db = getConnectionDB(pRequest->pTscObj)},
.pSql = pRequest->sqlstr, .pSql = pRequest->sqlstr,
.sqlLen = sqlLen, .sqlLen = pRequest->sqlLen,
.pMsg = pRequest->msgBuf, .pMsg = pRequest->msgBuf,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE
}; };
SQueryNode* pQuery = NULL; int32_t code = qParseQuerySql(&cxt, pQuery);
int32_t code = qParseQuerySql(&cxt, &pQuery); tfree(cxt.ctx.db);
if (qIsDclQuery(pQuery)) { return code;
}
int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
SDclStmtInfo* pDcl = (SDclStmtInfo*)pQuery; SDclStmtInfo* pDcl = (SDclStmtInfo*)pQuery;
pRequest->type = pDcl->msgType; pRequest->type = pDcl->msgType;
pRequest->body.requestMsg = (SReqMsgInfo){.pMsg = pDcl->pMsg, .len = pDcl->msgLen}; pRequest->body.requestMsg = (SReqMsgInfo){.pMsg = pDcl->pMsg, .len = pDcl->msgLen};
SRequestMsgBody body = buildRequestMsgImpl(pRequest); SRequestMsgBody body = buildRequestMsgImpl(pRequest);
SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet; SEpSet* pEpSet = &pRequest->pTscObj->pAppInfo->mgmtEp.epSet;
if (pDcl->msgType == TSDB_MSG_TYPE_CREATE_TABLE) { if (pDcl->msgType == TSDB_MSG_TYPE_CREATE_TABLE) {
struct SCatalog* pCatalog = NULL; struct SCatalog* pCatalog = NULL;
char buf[12] = {0}; char buf[12] = {0};
sprintf(buf, "%d", pTscObj->pAppInfo->clusterId); sprintf(buf, "%d", pRequest->pTscObj->pAppInfo->clusterId);
code = catalogGetHandle(buf, &pCatalog); int32_t code = catalogGetHandle(buf, &pCatalog);
if (code != 0) { if (code != TSDB_CODE_SUCCESS) {
pRequest->code = code; return code;
return pRequest;
} }
SCreateTableMsg* pMsg = body.msgInfo.pMsg; SCreateTableMsg* pMsg = body.msgInfo.pMsg;
@ -181,7 +186,7 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
tNameGetFullDbName(&t, db); tNameGetFullDbName(&t, db);
SVgroupInfo info = {0}; SVgroupInfo info = {0};
catalogGetTableHashVgroup(pCatalog, pTscObj->pTransporter, pEpSet, db, tNameGetTableName(&t), &info); catalogGetTableHashVgroup(pCatalog, pRequest->pTscObj->pTransporter, pEpSet, db, tNameGetTableName(&t), &info);
int64_t transporterId = 0; int64_t transporterId = 0;
SEpSet ep = {0}; SEpSet ep = {0};
@ -192,23 +197,51 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
tstrncpy(ep.fqdn[i], info.epAddr[i].fqdn, tListLen(ep.fqdn[i])); tstrncpy(ep.fqdn[i], info.epAddr[i].fqdn, tListLen(ep.fqdn[i]));
} }
sendMsgToServer(pTscObj->pTransporter, &ep, &body, &transporterId); sendMsgToServer(pRequest->pTscObj->pTransporter, &ep, &body, &transporterId);
} else { } else {
int64_t transporterId = 0; int64_t transporterId = 0;
sendMsgToServer(pTscObj->pTransporter, pEpSet, &body, &transporterId); sendMsgToServer(pRequest->pTscObj->pTransporter, pEpSet, &body, &transporterId);
} }
tsem_wait(&pRequest->body.rspSem); tsem_wait(&pRequest->body.rspSem);
destroyRequestMsgBody(&body); destroyRequestMsgBody(&body);
return TSDB_CODE_SUCCESS;
}
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) {
return scheduleExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob);
}
TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
STscObj *pTscObj = (STscObj *)taos;
if (sqlLen > (size_t) tsMaxSQLStringLen) {
tscError("sql string exceeds max length:%d", tsMaxSQLStringLen);
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
return NULL;
} }
tfree(cxt.ctx.db); nPrintTsc("%s", sql)
if (code != TSDB_CODE_SUCCESS) { SRequestObj* pRequest = NULL;
pRequest->code = code; SQueryNode* pQuery = NULL;
return pRequest; SQueryDag* pDag = NULL;
void* pJob = NULL;
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
CHECK_CODE_GOTO(parseSql(pRequest, &pQuery), _return);
if (qIsDdlQuery(pQuery)) {
CHECK_CODE_GOTO(execDdlQuery(pRequest, pQuery), _return);
goto _return;
} }
CHECK_CODE_GOTO(qCreateQueryDag(pQuery, &pDag), _return);
CHECK_CODE_GOTO(scheduleQuery(pRequest, pDag, &pJob), _return);
_return:
qDestoryQuery(pQuery);
qDestroyQueryDag(pDag);
if (NULL != pRequest) {
pRequest->code = terrno;
}
return pRequest; return pRequest;
} }

View File

@ -31,7 +31,7 @@ bool isInsertSql(const char* pStr, size_t length) {
} while (1); } while (1);
} }
bool qIsDclQuery(const SQueryNode* pQuery) { bool qIsDdlQuery(const SQueryNode* pQuery) {
return TSDB_SQL_INSERT != pQuery->type && TSDB_SQL_SELECT != pQuery->type; return TSDB_SQL_INSERT != pQuery->type && TSDB_SQL_SELECT != pQuery->type;
} }
@ -227,3 +227,7 @@ void qParserClearupMetaRequestInfo(SCatalogReq* pMetaReq) {
taosArrayDestroy(pMetaReq->pTableName); taosArrayDestroy(pMetaReq->pTableName);
taosArrayDestroy(pMetaReq->pUdf); taosArrayDestroy(pMetaReq->pUdf);
} }
void qDestoryQuery(SQueryNode* pQuery) {
// todo
}

View File

@ -24,7 +24,7 @@ void qDestroyQueryDag(struct SQueryDag* pDag) {
// todo // todo
} }
int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SEpSet* pEpSet, struct SQueryDag** pDag) { int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag) {
SQueryPlanNode* logicPlan; SQueryPlanNode* logicPlan;
int32_t code = createQueryPlan(pNode, &logicPlan); int32_t code = createQueryPlan(pNode, &logicPlan);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {

View File

@ -60,7 +60,7 @@ protected:
return code; return code;
} }
SQueryDag* dag = nullptr; SQueryDag* dag = nullptr;
code = qCreateQueryDag(query, nullptr, &dag); code = qCreateQueryDag(query, &dag);
dag_.reset(dag); dag_.reset(dag);
return code; return code;
} }