From e7ea350872a16f60bf7a605e7d2d12b29e332920 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Fri, 24 Dec 2021 01:39:46 -0500 Subject: [PATCH] TD-12506 implement taos_query_l --- include/libs/parser/parser.h | 4 +- include/libs/planner/planner.h | 2 +- source/client/CMakeLists.txt | 2 +- source/client/inc/clientInt.h | 1 + source/client/src/clientImpl.c | 195 +++++++++++++--------- source/libs/parser/src/parser.c | 6 +- source/libs/planner/src/planner.c | 2 +- source/libs/planner/test/phyPlanTests.cpp | 2 +- 8 files changed, 127 insertions(+), 87 deletions(-) diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 7834bc6913..f2f3fcd49b 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -44,7 +44,9 @@ typedef struct SParseContext { */ int32_t qParseQuerySql(SParseContext* pContext, SQueryNode** pQuery); -bool qIsDclQuery(const SQueryNode* pQuery); +bool qIsDdlQuery(const SQueryNode* pQuery); + +void qDestoryQuery(SQueryNode* pQuery); /** * Convert a normal sql statement to only query tags information to enable that the subscribe client can be aware quickly of the true vgroup ids that diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index d4469be5e3..146d01faa7 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -139,7 +139,7 @@ struct SQueryNode; /** * Create the physical plan for the query, according to the AST. */ -int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SEpSet* pQnode, struct SQueryDag** pDag); +int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SQueryDag** pDag); // Set datasource of this subplan, multiple calls may be made to a subplan. // @subplan subplan to be schedule diff --git a/source/client/CMakeLists.txt b/source/client/CMakeLists.txt index c78bf02cbd..ad6a93415c 100644 --- a/source/client/CMakeLists.txt +++ b/source/client/CMakeLists.txt @@ -8,7 +8,7 @@ target_include_directories( target_link_libraries( taos INTERFACE api - PRIVATE os util common transport parser catalog function qcom + PRIVATE os util common transport parser planner catalog scheduler function qcom ) ADD_SUBDIRECTORY(test) \ No newline at end of file diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index a34250ccab..e68b989b50 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -121,6 +121,7 @@ typedef struct SRequestObj { STscObj *pTscObj; SQueryExecMetric metric; char *sqlstr; // sql string + int32_t sqlLen; SRequestSendRecvBody body; int64_t self; char *msgBuf; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 992d93f39b..3236d1308a 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -9,6 +9,17 @@ #include "tpagedfile.h" #include "tref.h" #include "parser.h" +#include "planner.h" +#include "scheduler.h" + +#define CHECK_CODE_GOTO(expr, lable) \ + do { \ + int32_t code = expr; \ + if (TSDB_CODE_SUCCESS != code) { \ + terrno = code; \ + goto lable; \ + } \ + } while (0) static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet); static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody); @@ -113,6 +124,94 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, return taosConnectImpl(ip, user, &secretEncrypt[0], db, port, NULL, NULL, *pInst); } +int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj** pRequest) { + *pRequest = createRequest(pTscObj, NULL, NULL, TSDB_SQL_SELECT); + if (*pRequest == NULL) { + tscError("failed to malloc sqlObj"); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + (*pRequest)->sqlstr = malloc(sqlLen + 1); + if ((*pRequest)->sqlstr == NULL) { + tscError("0x%"PRIx64" failed to prepare sql string buffer", (*pRequest)->self); + (*pRequest)->msgBuf = strdup("failed to prepare sql string buffer"); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen); + (*pRequest)->sqlstr[sqlLen] = 0; + (*pRequest)->sqlLen = sqlLen; + + tscDebugL("0x%"PRIx64" SQL: %s", (*pRequest)->requestId, (*pRequest)->sqlstr); + return TSDB_CODE_SUCCESS; +} + +int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) { + SParseContext cxt = { + .ctx = {.requestId = pRequest->requestId, .acctId = pRequest->pTscObj->acctId, .db = getConnectionDB(pRequest->pTscObj)}, + .pSql = pRequest->sqlstr, + .sqlLen = pRequest->sqlLen, + .pMsg = pRequest->msgBuf, + .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE + }; + int32_t code = qParseQuerySql(&cxt, pQuery); + tfree(cxt.ctx.db); + return code; +} + +int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) { + SDclStmtInfo* pDcl = (SDclStmtInfo*)pQuery; + pRequest->type = pDcl->msgType; + pRequest->body.requestMsg = (SReqMsgInfo){.pMsg = pDcl->pMsg, .len = pDcl->msgLen}; + + SRequestMsgBody body = buildRequestMsgImpl(pRequest); + SEpSet* pEpSet = &pRequest->pTscObj->pAppInfo->mgmtEp.epSet; + + if (pDcl->msgType == TSDB_MSG_TYPE_CREATE_TABLE) { + struct SCatalog* pCatalog = NULL; + + char buf[12] = {0}; + sprintf(buf, "%d", pRequest->pTscObj->pAppInfo->clusterId); + int32_t code = catalogGetHandle(buf, &pCatalog); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + SCreateTableMsg* pMsg = body.msgInfo.pMsg; + + SName t = {0}; + tNameFromString(&t, pMsg->name, T_NAME_ACCT|T_NAME_DB|T_NAME_TABLE); + + char db[TSDB_DB_NAME_LEN + TS_PATH_DELIMITER_LEN + TSDB_ACCT_ID_LEN] = {0}; + tNameGetFullDbName(&t, db); + + SVgroupInfo info = {0}; + catalogGetTableHashVgroup(pCatalog, pRequest->pTscObj->pTransporter, pEpSet, db, tNameGetTableName(&t), &info); + + int64_t transporterId = 0; + SEpSet ep = {0}; + ep.inUse = info.inUse; + ep.numOfEps = info.numOfEps; + for(int32_t i = 0; i < ep.numOfEps; ++i) { + ep.port[i] = info.epAddr[i].port; + tstrncpy(ep.fqdn[i], info.epAddr[i].fqdn, tListLen(ep.fqdn[i])); + } + + sendMsgToServer(pRequest->pTscObj->pTransporter, &ep, &body, &transporterId); + } else { + int64_t transporterId = 0; + sendMsgToServer(pRequest->pTscObj->pTransporter, pEpSet, &body, &transporterId); + } + + tsem_wait(&pRequest->body.rspSem); + destroyRequestMsgBody(&body); + return TSDB_CODE_SUCCESS; +} + +int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) { + return scheduleExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob); +} + TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { STscObj *pTscObj = (STscObj *)taos; if (sqlLen > (size_t) tsMaxSQLStringLen) { @@ -123,92 +222,26 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { nPrintTsc("%s", sql) - SRequestObj* pRequest = createRequest(pTscObj, NULL, NULL, TSDB_SQL_SELECT); - if (pRequest == NULL) { - tscError("failed to malloc sqlObj"); - terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; - return NULL; - } - - pRequest->sqlstr = malloc(sqlLen + 1); - if (pRequest->sqlstr == NULL) { - tscError("0x%"PRIx64" failed to prepare sql string buffer", pRequest->self); - - pRequest->msgBuf = strdup("failed to prepare sql string buffer"); - terrno = pRequest->code = TSDB_CODE_TSC_OUT_OF_MEMORY; - return pRequest; - } - - strntolower(pRequest->sqlstr, sql, (int32_t)sqlLen); - pRequest->sqlstr[sqlLen] = 0; - - tscDebugL("0x%"PRIx64" SQL: %s", pRequest->requestId, pRequest->sqlstr); - - SParseContext cxt = { - .ctx = {.requestId = pRequest->requestId, .acctId = pTscObj->acctId, .db = getConnectionDB(pTscObj)}, - .pSql = pRequest->sqlstr, - .sqlLen = sqlLen, - .pMsg = pRequest->msgBuf, - .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE - }; + SRequestObj* pRequest = NULL; SQueryNode* pQuery = NULL; - int32_t code = qParseQuerySql(&cxt, &pQuery); - if (qIsDclQuery(pQuery)) { - SDclStmtInfo* pDcl = (SDclStmtInfo*)pQuery; - pRequest->type = pDcl->msgType; - pRequest->body.requestMsg = (SReqMsgInfo){.pMsg = pDcl->pMsg, .len = pDcl->msgLen}; + SQueryDag* pDag = NULL; + void* pJob = NULL; - SRequestMsgBody body = buildRequestMsgImpl(pRequest); - SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet; - - if (pDcl->msgType == TSDB_MSG_TYPE_CREATE_TABLE) { - struct SCatalog* pCatalog = NULL; - - char buf[12] = {0}; - sprintf(buf, "%d", pTscObj->pAppInfo->clusterId); - code = catalogGetHandle(buf, &pCatalog); - if (code != 0) { - pRequest->code = code; - return pRequest; - } - - SCreateTableMsg* pMsg = body.msgInfo.pMsg; - - SName t = {0}; - tNameFromString(&t, pMsg->name, T_NAME_ACCT|T_NAME_DB|T_NAME_TABLE); - - char db[TSDB_DB_NAME_LEN + TS_PATH_DELIMITER_LEN + TSDB_ACCT_ID_LEN] = {0}; - tNameGetFullDbName(&t, db); - - SVgroupInfo info = {0}; - catalogGetTableHashVgroup(pCatalog, pTscObj->pTransporter, pEpSet, db, tNameGetTableName(&t), &info); - - int64_t transporterId = 0; - SEpSet ep = {0}; - ep.inUse = info.inUse; - ep.numOfEps = info.numOfEps; - for(int32_t i = 0; i < ep.numOfEps; ++i) { - ep.port[i] = info.epAddr[i].port; - tstrncpy(ep.fqdn[i], info.epAddr[i].fqdn, tListLen(ep.fqdn[i])); - } - - sendMsgToServer(pTscObj->pTransporter, &ep, &body, &transporterId); - } else { - int64_t transporterId = 0; - sendMsgToServer(pTscObj->pTransporter, pEpSet, &body, &transporterId); - } - - tsem_wait(&pRequest->body.rspSem); - destroyRequestMsgBody(&body); + CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); + CHECK_CODE_GOTO(parseSql(pRequest, &pQuery), _return); + if (qIsDdlQuery(pQuery)) { + CHECK_CODE_GOTO(execDdlQuery(pRequest, pQuery), _return); + goto _return; } + CHECK_CODE_GOTO(qCreateQueryDag(pQuery, &pDag), _return); + CHECK_CODE_GOTO(scheduleQuery(pRequest, pDag, &pJob), _return); - tfree(cxt.ctx.db); - - if (code != TSDB_CODE_SUCCESS) { - pRequest->code = code; - return pRequest; +_return: + qDestoryQuery(pQuery); + qDestroyQueryDag(pDag); + if (NULL != pRequest) { + pRequest->code = terrno; } - return pRequest; } diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index 9922642df3..0f77135ec1 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -31,7 +31,7 @@ bool isInsertSql(const char* pStr, size_t length) { } while (1); } -bool qIsDclQuery(const SQueryNode* pQuery) { +bool qIsDdlQuery(const SQueryNode* pQuery) { return TSDB_SQL_INSERT != pQuery->type && TSDB_SQL_SELECT != pQuery->type; } @@ -227,3 +227,7 @@ void qParserClearupMetaRequestInfo(SCatalogReq* pMetaReq) { taosArrayDestroy(pMetaReq->pTableName); taosArrayDestroy(pMetaReq->pUdf); } + +void qDestoryQuery(SQueryNode* pQuery) { + // todo +} diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index e8523249e4..6381771400 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -24,7 +24,7 @@ void qDestroyQueryDag(struct SQueryDag* pDag) { // todo } -int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SEpSet* pEpSet, struct SQueryDag** pDag) { +int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag) { SQueryPlanNode* logicPlan; int32_t code = createQueryPlan(pNode, &logicPlan); if (TSDB_CODE_SUCCESS != code) { diff --git a/source/libs/planner/test/phyPlanTests.cpp b/source/libs/planner/test/phyPlanTests.cpp index f14fd50e03..d0f6fd5adf 100644 --- a/source/libs/planner/test/phyPlanTests.cpp +++ b/source/libs/planner/test/phyPlanTests.cpp @@ -60,7 +60,7 @@ protected: return code; } SQueryDag* dag = nullptr; - code = qCreateQueryDag(query, nullptr, &dag); + code = qCreateQueryDag(query, &dag); dag_.reset(dag); return code; }