From b9927eae8516b2a2eb0c62ed13c902aabcd5fe44 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 21 Dec 2021 17:18:39 +0800 Subject: [PATCH] [td-11818] support async exec task. --- include/libs/qcom/query.h | 22 +++++++++++--- source/client/inc/clientInt.h | 2 +- source/client/src/clientImpl.c | 3 +- source/client/src/clientMain.c | 13 ++------ source/client/src/tscEnv.c | 24 ++++----------- source/client/test/clientTests.cpp | 15 +++++++++ source/libs/parser/inc/parserUtil.h | 1 + source/libs/parser/src/parserUtil.c | 9 +++++- source/libs/qcom/CMakeLists.txt | 2 ++ source/libs/qcom/src/queryUtil.c | 47 ++++++++++++++++++++++++++++- 10 files changed, 100 insertions(+), 38 deletions(-) diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 254d572149..70dcc94280 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -81,16 +81,28 @@ typedef struct STableMetaOutput { STableMeta *tbMeta; } STableMetaOutput; +typedef int32_t __async_exec_fn_t(void* param); + bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags); +int32_t initTaskQueue(); +int32_t cleanupTaskQueue(); + +/** + * + * @param execFn The asynchronously execution function + * @param execParam The parameters of the execFn + * @param code The response code during execution the execFn + * @return + */ +int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code); + +SSchema* tGetTbnameColumnSchema(); +void msgInit(); + extern int32_t (*queryBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen); extern int32_t (*queryProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize); -SSchema* tGetTbnameColumnSchema(); -extern void msgInit(); - -extern int32_t qDebugFlag; - #define qFatal(...) do { if (qDebugFlag & DEBUG_FATAL) { taosPrintLog("QRY FATAL ", qDebugFlag, __VA_ARGS__); }} while(0) #define qError(...) do { if (qDebugFlag & DEBUG_ERROR) { taosPrintLog("QRY ERROR ", qDebugFlag, __VA_ARGS__); }} while(0) #define qWarn(...) do { if (qDebugFlag & DEBUG_WARN) { taosPrintLog("QRY WARN ", qDebugFlag, __VA_ARGS__); }} while(0) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index a1a9155d16..c1fd113da7 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -154,7 +154,7 @@ void destroyRequest(SRequestObj* pRequest); void taos_init_imp(void); int taos_options_imp(TSDB_OPTION option, const char *str); -void* openTransporter(const char *user, const char *auth); +void* openTransporter(const char *user, const char *auth, int32_t numOfThreads); void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet); void initMsgHandleFp(); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index bab1f9cc9b..0acb74d4a9 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -102,9 +102,8 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, SAppInstInfo** pInst = taosHashGet(appInfo.pInstMap, key, strlen(key)); if (pInst == NULL) { SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo)); - p->mgmtEp = epSet; - p->pTransporter = openTransporter(user, secretEncrypt); + p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores); taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES); pInst = &p; diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 8a75799ed7..dce04a869b 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -1,16 +1,11 @@ +#include "os.h" #include "clientInt.h" #include "clientLog.h" -#include "os.h" +#include "query.h" #include "taosmsg.h" -#include "tcache.h" -#include "tconfig.h" #include "tglobal.h" -#include "tnote.h" #include "tref.h" #include "trpc.h" -#include "tsched.h" -#include "ttime.h" -#include "ttimezone.h" #define TSC_VAR_NOT_RELEASE 1 #define TSC_VAR_RELEASED 0 @@ -44,9 +39,7 @@ void taos_cleanup(void) { tscReqRef = -1; taosCloseRef(id); - void* p = tscQhandle; - tscQhandle = NULL; - taosCleanUpScheduler(p); + cleanupTaskQueue(); id = tscConnRef; tscConnRef = -1; diff --git a/source/client/src/tscEnv.c b/source/client/src/tscEnv.c index 182e330df7..b7a622c2e8 100644 --- a/source/client/src/tscEnv.c +++ b/source/client/src/tscEnv.c @@ -13,17 +13,17 @@ * along with this program. If not, see . */ -#include "clientInt.h" -#include "clientLog.h" #include "os.h" #include "taosmsg.h" +#include "query.h" +#include "clientInt.h" +#include "clientLog.h" #include "tcache.h" #include "tconfig.h" #include "tglobal.h" #include "tnote.h" #include "tref.h" #include "trpc.h" -#include "tsched.h" #include "ttime.h" #include "ttimezone.h" @@ -33,10 +33,8 @@ SAppInfo appInfo; int32_t tscReqRef = -1; int32_t tscConnRef = -1; -void *tscQhandle = NULL; static pthread_once_t tscinit = PTHREAD_ONCE_INIT; -int32_t tsNumOfThreads = 1; volatile int32_t tscInitRes = 0; static void registerRequest(SRequestObj* pRequest) { @@ -98,12 +96,12 @@ void closeTransporter(STscObj* pTscObj) { } // TODO refactor -void* openTransporter(const char *user, const char *auth) { +void* openTransporter(const char *user, const char *auth, int32_t numOfThread) { SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localPort = 0; rpcInit.label = "TSC"; - rpcInit.numOfThreads = tsNumOfThreads; + rpcInit.numOfThreads = numOfThread; rpcInit.cfp = processMsgFromServer; rpcInit.sessions = tsMaxConnections; rpcInit.connType = TAOS_CONN_CLIENT; @@ -229,18 +227,8 @@ void taos_init_imp(void) { taosSetCoreDump(true); - double factor = 4.0; - int32_t numOfThreads = MAX((int)(tsNumOfCores * tsNumOfThreadsPerCore / factor), 2); + initTaskQueue(); - int32_t queueSize = tsMaxConnections * 2; - tscQhandle = taosInitScheduler(queueSize, numOfThreads, "tsc"); - if (NULL == tscQhandle) { - tscError("failed to init task queue"); - tscInitRes = -1; - return; - } - - tscDebug("client task queue is initialized, numOfThreads: %d", numOfThreads); tscConnRef = taosOpenRef(200, destroyTscObj); tscReqRef = taosOpenRef(40960, doDestroyRequest); diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 1c8cd6a4b6..2e89bb45fa 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -148,3 +148,18 @@ TEST(testCase, create_db_Test) { taos_close(pConn); } + +TEST(testCase, create_stable_Test) { + TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "create stable st1(ts timestamp, k int) tags(a int)"); + + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + ASSERT_TRUE(pFields == NULL); + + int32_t numOfFields = taos_num_fields(pRes); + ASSERT_EQ(numOfFields, 0); + + taos_close(pConn); +} diff --git a/source/libs/parser/inc/parserUtil.h b/source/libs/parser/inc/parserUtil.h index c588a34a40..ad43b9876a 100644 --- a/source/libs/parser/inc/parserUtil.h +++ b/source/libs/parser/inc/parserUtil.h @@ -67,6 +67,7 @@ int32_t getExprFunctionId(SExprInfo *pExprInfo); STableMeta* tableMetaDup(const STableMeta* pTableMeta); bool isDclSqlStatement(SSqlInfo* pSqlInfo); +bool isDdlSqlStatement(SSqlInfo* pSqlInfo); #ifdef __cplusplus } diff --git a/source/libs/parser/src/parserUtil.c b/source/libs/parser/src/parserUtil.c index b72bc06324..13434da057 100644 --- a/source/libs/parser/src/parserUtil.c +++ b/source/libs/parser/src/parserUtil.c @@ -1613,7 +1613,14 @@ uint32_t convertRelationalOperator(SToken *pToken) { } bool isDclSqlStatement(SSqlInfo* pSqlInfo) { - return (pSqlInfo->type != TSDB_SQL_SELECT); + int32_t type = pSqlInfo->type; + return (type == TSDB_SQL_CREATE_USER || type == TSDB_SQL_CREATE_ACCT || type == TSDB_SQL_DROP_USER || + type == TSDB_SQL_DROP_ACCT || type == TSDB_SQL_SHOW); +} + +bool isDdlSqlStatement(SSqlInfo* pSqlInfo) { + int32_t type = pSqlInfo->type; + return (type == TSDB_SQL_CREATE_TABLE || type == TSDB_SQL_CREATE_DB); } #if 0 diff --git a/source/libs/qcom/CMakeLists.txt b/source/libs/qcom/CMakeLists.txt index 41cf1826bc..8e09f3d97a 100644 --- a/source/libs/qcom/CMakeLists.txt +++ b/source/libs/qcom/CMakeLists.txt @@ -10,3 +10,5 @@ target_link_libraries( qcom PRIVATE os util transport ) + +ADD_SUBDIRECTORY(test) diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 2a13b708ec..829f426c9d 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -1,5 +1,8 @@ #include "os.h" #include "taosmsg.h" +#include "query.h" +#include "tglobal.h" +#include "tsched.h" #define VALIDNUMOFCOLS(x) ((x) >= TSDB_MIN_COLUMNS && (x) <= TSDB_MAX_COLUMNS) #define VALIDNUMOFTAGS(x) ((x) >= 0 && (x) <= TSDB_MAX_TAGS) @@ -75,4 +78,46 @@ bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTag } return true; -} \ No newline at end of file +} + +static void* pTaskQueue = NULL; + +int32_t initTaskQueue() { + double factor = 4.0; + int32_t numOfThreads = MAX((int)(tsNumOfCores * tsNumOfThreadsPerCore / factor), 2); + + int32_t queueSize = tsMaxConnections * 2; + pTaskQueue = taosInitScheduler(queueSize, numOfThreads, "tsc"); + if (NULL == pTaskQueue) { + qError("failed to init task queue"); + return -1; + } + + qDebug("task queue is initialized, numOfThreads: %d", numOfThreads); +} + +int32_t cleanupTaskQueue() { + taosCleanUpScheduler(pTaskQueue); +} + +static void execHelper(struct SSchedMsg* pSchedMsg) { + assert(pSchedMsg != NULL && pSchedMsg->ahandle != NULL); + + __async_exec_fn_t* execFn = (__async_exec_fn_t*) pSchedMsg->ahandle; + int32_t code = execFn(pSchedMsg->thandle); + if (code != 0 && pSchedMsg->msg != NULL) { + *(int32_t*) pSchedMsg->msg = code; + } +} + +int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) { + assert(execFn != NULL); + + SSchedMsg schedMsg = {0}; + schedMsg.fp = execHelper; + schedMsg.ahandle = execFn; + schedMsg.thandle = execParam; + schedMsg.msg = code; + + taosScheduleTask(pTaskQueue, &schedMsg); +}