From 0d0d4652bb40e2d769d9173124b13fed5a7f2946 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 24 Dec 2021 23:02:30 +0800 Subject: [PATCH 1/3] [td-11818] Add async send callback mechanism. --- source/client/inc/clientInt.h | 42 +++++--- source/client/src/clientEnv.c | 31 +++--- source/client/src/clientImpl.c | 150 ++++++++++++++------------- source/client/src/clientMain.c | 14 +-- source/client/src/clientMsgHandler.c | 128 ++++++++++++----------- source/client/test/clientTests.cpp | 115 +++++++++++--------- source/libs/parser/inc/astToMsg.h | 2 +- source/libs/parser/src/astToMsg.c | 8 +- source/libs/parser/src/astValidate.c | 2 +- 9 files changed, 268 insertions(+), 224 deletions(-) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index a34250ccab..550d8c3ecf 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -37,7 +37,7 @@ typedef struct SQueryExecMetric { int64_t rsp; // receive response from server } SQueryExecMetric; -typedef struct SInstanceActivity { +typedef struct SInstanceSummary { uint64_t numOfInsertsReq; uint64_t numOfInsertRows; uint64_t insertElapsedTime; @@ -48,7 +48,7 @@ typedef struct SInstanceActivity { uint64_t numOfSlowQueries; uint64_t totalRequests; uint64_t currentRequests; // the number of SRequestObj -} SInstanceActivity; +} SInstanceSummary; typedef struct SHeartBeatInfo { void *pTimer; // timer, used to send request msg to mnode @@ -57,7 +57,7 @@ typedef struct SHeartBeatInfo { typedef struct SAppInstInfo { int64_t numOfConns; SCorEpSet mgmtEp; - SInstanceActivity summary; + SInstanceSummary summary; SList *pConnList; // STscObj linked list uint32_t clusterId; void *pTransporter; @@ -100,16 +100,16 @@ typedef struct SReqResultInfo { uint32_t current; } SReqResultInfo; -typedef struct SReqMsg { - void *pMsg; +typedef struct SDataBuf { + void *pData; uint32_t len; -} SReqMsgInfo; +} SDataBuf; typedef struct SRequestSendRecvBody { tsem_t rspSem; // not used now void* fp; int64_t execId; // showId/queryId - SReqMsgInfo requestMsg; + SDataBuf requestMsg; SReqResultInfo resInfo; } SRequestSendRecvBody; @@ -121,26 +121,38 @@ typedef struct SRequestObj { STscObj *pTscObj; SQueryExecMetric metric; char *sqlstr; // sql string - SRequestSendRecvBody body; + SRequestSendRecvBody body; int64_t self; char *msgBuf; - int32_t code; void *pInfo; // sql parse info, generated by parser module + int32_t code; } SRequestObj; typedef struct SRequestMsgBody { int32_t msgType; - SReqMsgInfo msgInfo; + SDataBuf msgInfo; uint64_t requestId; uint64_t requestObjRefId; } SRequestMsgBody; -extern SAppInfo appInfo; -extern int32_t tscReqRef; -extern int32_t tscConnRef; +typedef int (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code); -SRequestMsgBody buildRequestMsgImpl(SRequestObj *pRequest); -extern int (*handleRequestRspFp[TSDB_MSG_TYPE_MAX])(SRequestObj *pRequest, const char* pMsg, int32_t msgLen); +typedef struct SMsgSendInfo { + __async_send_cb_fn_t fp; + void *param; + uint64_t requestId; + uint64_t requestObjRefId; + int32_t msgType; + SDataBuf msgInfo; +} SMsgSendInfo; + +extern SAppInfo appInfo; +extern int32_t msgObjRefPool; +extern int32_t clientConnRefPool; + +SMsgSendInfo* buildSendMsgInfoImpl(SRequestObj *pRequest); +int genericExecCallback(void* param, const SDataBuf* pMsg, int32_t code); +extern int (*handleRequestRspFp[TSDB_MSG_TYPE_MAX])(void* param, const SDataBuf* pMsg, int32_t code); int taos_init(); diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index b54f7fedd7..dca4d85ad6 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -13,10 +13,10 @@ * along with this program. If not, see . */ -#include +#include "os.h" +#include "catalog.h" #include "clientInt.h" #include "clientLog.h" -#include "os.h" #include "query.h" #include "taosmsg.h" #include "tcache.h" @@ -32,26 +32,26 @@ #define TSC_VAR_RELEASED 0 SAppInfo appInfo; -int32_t tscReqRef = -1; -int32_t tscConnRef = -1; +int32_t msgObjRefPool = -1; +int32_t clientConnRefPool = -1; static pthread_once_t tscinit = PTHREAD_ONCE_INIT; volatile int32_t tscInitRes = 0; static void registerRequest(SRequestObj* pRequest) { - STscObj *pTscObj = (STscObj *)taosAcquireRef(tscConnRef, pRequest->pTscObj->id); + STscObj *pTscObj = (STscObj *)taosAcquireRef(clientConnRefPool, pRequest->pTscObj->id); assert(pTscObj != NULL); // connection has been released already, abort creating request. - pRequest->self = taosAddRef(tscReqRef, pRequest); + pRequest->self = taosAddRef(msgObjRefPool, pRequest); int32_t num = atomic_add_fetch_32(&pTscObj->numOfReqs, 1); if (pTscObj->pAppInfo) { - SInstanceActivity *pActivity = &pTscObj->pAppInfo->summary; + SInstanceSummary *pSummary = &pTscObj->pAppInfo->summary; - int32_t total = atomic_add_fetch_32(&pActivity->totalRequests, 1); - int32_t currentInst = atomic_add_fetch_32(&pActivity->currentRequests, 1); + int32_t total = atomic_add_fetch_32(&pSummary->totalRequests, 1); + int32_t currentInst = atomic_add_fetch_32(&pSummary->currentRequests, 1); tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64 ", current:%d, app current:%d, total:%d", pRequest->self, pRequest->pTscObj->id, num, currentInst, total); } @@ -61,13 +61,13 @@ static void deregisterRequest(SRequestObj* pRequest) { assert(pRequest != NULL); STscObj* pTscObj = pRequest->pTscObj; - SInstanceActivity* pActivity = &pTscObj->pAppInfo->summary; + SInstanceSummary* pActivity = &pTscObj->pAppInfo->summary; int32_t currentInst = atomic_sub_fetch_32(&pActivity->currentRequests, 1); int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1); tscDebug("0x%"PRIx64" free Request from connObj: 0x%"PRIx64", current:%d, app current:%d", pRequest->self, pTscObj->id, num, currentInst); - taosReleaseRef(tscConnRef, pTscObj->id); + taosReleaseRef(clientConnRefPool, pTscObj->id); } static void tscInitLogFile() { @@ -150,7 +150,7 @@ void* createTscObj(const char* user, const char* auth, const char *db, SAppInstI } pthread_mutex_init(&pObj->mutex, NULL); - pObj->id = taosAddRef(tscConnRef, pObj); + pObj->id = taosAddRef(clientConnRefPool, pObj); tscDebug("connObj created, 0x%"PRIx64, pObj->id); return pObj; @@ -173,7 +173,6 @@ void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t ty pRequest->type = type; pRequest->pTscObj = pObj; pRequest->body.fp = fp; -// pRequest->body.requestMsg. = param; pRequest->msgBuf = calloc(1, ERROR_MSG_BUF_DEFAULT_SIZE); tsem_init(&pRequest->body.rspSem, 0, 0); @@ -202,7 +201,7 @@ void destroyRequest(SRequestObj* pRequest) { return; } - taosReleaseRef(tscReqRef, pRequest->self); + taosReleaseRef(msgObjRefPool, pRequest->self); } void taos_init_imp(void) { @@ -238,8 +237,8 @@ void taos_init_imp(void) { initTaskQueue(); - tscConnRef = taosOpenRef(200, destroyTscObj); - tscReqRef = taosOpenRef(40960, doDestroyRequest); + clientConnRefPool = taosOpenRef(200, destroyTscObj); + msgObjRefPool = taosOpenRef(40960, doDestroyRequest); taosGetAppName(appInfo.appName, NULL); appInfo.pid = taosGetPId(); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index f00e7bd578..7cd8e9d359 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -11,10 +11,10 @@ #include "parser.h" static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet); -static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody); -static void destroyRequestMsgBody(SRequestMsgBody* pMsgBody); +static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest); +static void destroySendMsgInfo(SMsgSendInfo* pMsgBody); -static int32_t sendMsgToServer(void *pTransporter, SEpSet* epSet, const SRequestMsgBody *pBody, int64_t* pTransporterId); +static int32_t sendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo); static bool stringLengthCheck(const char* str, size_t maxsize) { if (str == NULL) { @@ -163,10 +163,11 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { int32_t code = qParseQuerySql(&cxt, &pQuery); if (qIsDclQuery(pQuery)) { SDclStmtInfo* pDcl = (SDclStmtInfo*) pQuery; - pRequest->type = pDcl->msgType; - pRequest->body.requestMsg = (SReqMsgInfo){.pMsg = pDcl->pMsg, .len = pDcl->msgLen}; - SRequestMsgBody body = buildRequestMsgImpl(pRequest); + pRequest->type = pDcl->msgType; + pRequest->body.requestMsg = (SDataBuf){.pData = pDcl->pMsg, .len = pDcl->msgLen}; + + SMsgSendInfo* body = buildSendMsgInfoImpl(pRequest); SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet; if (pDcl->msgType == TSDB_MSG_TYPE_CREATE_TABLE) { @@ -180,7 +181,7 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { return pRequest; } - SCreateTableMsg* pMsg = body.msgInfo.pMsg; + SCreateTableMsg* pMsg = body->msgInfo.pData; SName t = {0}; tNameFromString(&t, pMsg->name, T_NAME_ACCT|T_NAME_DB|T_NAME_TABLE); @@ -200,14 +201,14 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { tstrncpy(ep.fqdn[i], info.epAddr[i].fqdn, tListLen(ep.fqdn[i])); } - sendMsgToServer(pTscObj->pTransporter, &ep, &body, &transporterId); + sendMsgToServer(pTscObj->pTransporter, &ep, &transporterId, body); } else { int64_t transporterId = 0; - sendMsgToServer(pTscObj->pTransporter, pEpSet, &body, &transporterId); + sendMsgToServer(pTscObj->pTransporter, pEpSet, &transporterId, body); } tsem_wait(&pRequest->body.rspSem); - destroyRequestMsgBody(&body); + destroySendMsgInfo(body); } tfree(cxt.ctx.db); @@ -270,14 +271,13 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con return NULL; } - SRequestMsgBody body = {0}; - buildConnectMsg(pRequest, &body); + SMsgSendInfo* body = buildConnectMsg(pRequest); int64_t transporterId = 0; - sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId); + sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); tsem_wait(&pRequest->body.rspSem); - destroyRequestMsgBody(&body); + destroySendMsgInfo(body); if (pRequest->code != TSDB_CODE_SUCCESS) { const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(terrno); @@ -294,15 +294,25 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con return pTscObj; } -static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) { - pMsgBody->msgType = TSDB_MSG_TYPE_CONNECT; - pMsgBody->msgInfo.len = sizeof(SConnectMsg); - pMsgBody->requestObjRefId = pRequest->self; +static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest) { + SMsgSendInfo *pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); + if (pMsgSendInfo == NULL) { + terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; + return NULL; + } + + pMsgSendInfo->msgType = TSDB_MSG_TYPE_CONNECT; + pMsgSendInfo->msgInfo.len = sizeof(SConnectMsg); + pMsgSendInfo->requestObjRefId = pRequest->self; + pMsgSendInfo->requestId = pRequest->requestId; + pMsgSendInfo->fp = handleRequestRspFp[pMsgSendInfo->msgType]; + pMsgSendInfo->param = pRequest; SConnectMsg *pConnect = calloc(1, sizeof(SConnectMsg)); if (pConnect == NULL) { + tfree(pMsgSendInfo); terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; - return -1; + return NULL; } STscObj *pObj = pRequest->pTscObj; @@ -315,84 +325,78 @@ static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) pConnect->startTime = htobe64(appInfo.startTime); tstrncpy(pConnect->app, appInfo.appName, tListLen(pConnect->app)); - pMsgBody->msgInfo.pMsg = pConnect; - return 0; + pMsgSendInfo->msgInfo.pData = pConnect; + return pMsgSendInfo; } -static void destroyRequestMsgBody(SRequestMsgBody* pMsgBody) { +static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { assert(pMsgBody != NULL); - tfree(pMsgBody->msgInfo.pMsg); + tfree(pMsgBody->msgInfo.pData); + tfree(pMsgBody); } -int32_t sendMsgToServer(void *pTransporter, SEpSet* epSet, const SRequestMsgBody *pBody, int64_t* pTransporterId) { - char *pMsg = rpcMallocCont(pBody->msgInfo.len); +int32_t sendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) { + char *pMsg = rpcMallocCont(pInfo->msgInfo.len); if (NULL == pMsg) { - tscError("0x%"PRIx64" msg:%s malloc failed", pBody->requestId, taosMsg[pBody->msgType]); + tscError("0x%"PRIx64" msg:%s malloc failed", pInfo->requestId, taosMsg[pInfo->msgType]); terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; return -1; } - memcpy(pMsg, pBody->msgInfo.pMsg, pBody->msgInfo.len); + memcpy(pMsg, pInfo->msgInfo.pData, pInfo->msgInfo.len); SRpcMsg rpcMsg = { - .msgType = pBody->msgType, + .msgType = pInfo->msgType, .pCont = pMsg, - .contLen = pBody->msgInfo.len, - .ahandle = (void*) pBody->requestObjRefId, + .contLen = pInfo->msgInfo.len, + .ahandle = (void*) pInfo, .handle = NULL, .code = 0 }; + assert(pInfo->fp != NULL); + rpcSendRequest(pTransporter, epSet, &rpcMsg, pTransporterId); return TSDB_CODE_SUCCESS; } void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { - int64_t requestRefId = (int64_t)pMsg->ahandle; + SMsgSendInfo *pSendInfo = (SMsgSendInfo *) pMsg->ahandle; + assert(pMsg->ahandle != NULL); - SRequestObj *pRequest = (SRequestObj *)taosAcquireRef(tscReqRef, requestRefId); - if (pRequest == NULL) { - rpcFreeCont(pMsg->pCont); - return; - } + if (pSendInfo->requestObjRefId != 0) { + SRequestObj *pRequest = (SRequestObj *)taosAcquireRef(msgObjRefPool, pSendInfo->requestObjRefId); + assert(pRequest->self == pSendInfo->requestObjRefId); - assert(pRequest->self == requestRefId); - pRequest->metric.rsp = taosGetTimestampMs(); + pRequest->metric.rsp = taosGetTimestampMs(); + pRequest->code = pMsg->code; - pRequest->code = pMsg->code; - - STscObj *pTscObj = pRequest->pTscObj; - if (pEpSet) { - if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, pEpSet)) { - updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet); - } - } - - /* - * There is not response callback function for submit response. - * The actual inserted number of points is the first number. - */ - if (pMsg->code == TSDB_CODE_SUCCESS) { - tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%"PRId64 " ms", pRequest->requestId, taosMsg[pMsg->msgType], - tstrerror(pMsg->code), pMsg->contLen, pRequest->metric.rsp - pRequest->metric.start); - if (handleRequestRspFp[pRequest->type]) { - char *p = malloc(pMsg->contLen); - if (p == NULL) { - pRequest->code = TSDB_CODE_TSC_OUT_OF_MEMORY; - terrno = pRequest->code; - } else { - memcpy(p, pMsg->pCont, pMsg->contLen); - pMsg->code = (*handleRequestRspFp[pRequest->type])(pRequest, p, pMsg->contLen); + STscObj *pTscObj = pRequest->pTscObj; + if (pEpSet) { + if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, pEpSet)) { + updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet); } } - } else { - tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%"PRId64" ms", pRequest->requestId, taosMsg[pMsg->msgType], - tstrerror(pMsg->code), pMsg->contLen, pRequest->metric.rsp - pRequest->metric.start); + + /* + * There is not response callback function for submit response. + * The actual inserted number of points is the first number. + */ + if (pMsg->code == TSDB_CODE_SUCCESS) { + tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%" PRId64 " ms", pRequest->requestId, + taosMsg[pMsg->msgType], tstrerror(pMsg->code), pMsg->contLen, + pRequest->metric.rsp - pRequest->metric.start); + } else { + tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%" PRId64 " ms", pRequest->requestId, + taosMsg[pMsg->msgType], tstrerror(pMsg->code), pMsg->contLen, + pRequest->metric.rsp - pRequest->metric.start); + } + + taosReleaseRef(msgObjRefPool, pSendInfo->requestObjRefId); } - taosReleaseRef(tscReqRef, requestRefId); + SDataBuf buf = {.pData = pMsg->pCont, .len = pMsg->contLen}; + pSendInfo->fp(pSendInfo->param, &buf, pMsg->code); rpcFreeCont(pMsg->pCont); - - sem_post(&pRequest->body.rspSem); } TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port) { @@ -429,14 +433,14 @@ void* doFetchRow(SRequestObj* pRequest) { if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) { pRequest->type = TSDB_MSG_TYPE_SHOW_RETRIEVE; - SRequestMsgBody body = buildRequestMsgImpl(pRequest); + SMsgSendInfo* body = buildSendMsgInfoImpl(pRequest); - int64_t transporterId = 0; - STscObj* pTscObj = pRequest->pTscObj; - sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId); + int64_t transporterId = 0; + STscObj *pTscObj = pRequest->pTscObj; + sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); tsem_wait(&pRequest->body.rspSem); - destroyRequestMsgBody(&body); + destroySendMsgInfo(body); pResultInfo->current = 0; if (pResultInfo->numOfRows <= pResultInfo->current) { diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index f50765d37a..44c78076d9 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -35,14 +35,14 @@ void taos_cleanup(void) { return; } - int32_t id = tscReqRef; - tscReqRef = -1; + int32_t id = msgObjRefPool; + msgObjRefPool = -1; taosCloseRef(id); cleanupTaskQueue(); - id = tscConnRef; - tscConnRef = -1; + id = clientConnRefPool; + clientConnRefPool = -1; taosCloseRef(id); rpcCleanup(); @@ -72,7 +72,7 @@ void taos_close(TAOS* taos) { STscObj *pTscObj = (STscObj *)taos; tscDebug("0x%"PRIx64" try to close connection, numOfReq:%d", pTscObj->id, pTscObj->numOfReqs); - taosRemoveRef(tscConnRef, pTscObj->id); + taosRemoveRef(clientConnRefPool, pTscObj->id); } int taos_errno(TAOS_RES *tres) { @@ -130,7 +130,7 @@ TAOS_RES *taos_query(TAOS *taos, const char *sql) { return NULL; } - return taos_query_l(taos, sql, strlen(sql)); + return taos_query_l(taos, sql, (int32_t) strlen(sql)); } TAOS_ROW taos_fetch_row(TAOS_RES *pRes) { @@ -140,7 +140,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *pRes) { SRequestObj *pRequest = (SRequestObj *) pRes; if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || - pRequest->type == TSDB_SQL_INSERT) { + pRequest->type == TSDB_SQL_INSERT || pRequest->code != TSDB_CODE_SUCCESS) { return NULL; } diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 548ea3d725..b18b3ecb51 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -18,45 +18,23 @@ #include "tname.h" #include "clientInt.h" #include "clientLog.h" -#include "tmsgtype.h" #include "trpc.h" -int (*handleRequestRspFp[TSDB_MSG_TYPE_MAX])(SRequestObj *pRequest, const char* pMsg, int32_t msgLen); +int (*handleRequestRspFp[TSDB_MSG_TYPE_MAX])(void*, const SDataBuf* pMsg, int32_t code); -int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) { - pMsgBody->msgType = TSDB_MSG_TYPE_CONNECT; - pMsgBody->msgInfo.len = sizeof(SConnectMsg); - pMsgBody->requestObjRefId = pRequest->self; - - SConnectMsg *pConnect = calloc(1, sizeof(SConnectMsg)); - if (pConnect == NULL) { - terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; - return -1; - } - - // TODO refactor full_name - char *db; // ugly code to move the space - - STscObj *pObj = pRequest->pTscObj; - pthread_mutex_lock(&pObj->mutex); - db = strstr(pObj->db, TS_PATH_DELIMITER); - - db = (db == NULL) ? pObj->db : db + 1; - tstrncpy(pConnect->db, db, sizeof(pConnect->db)); - pthread_mutex_unlock(&pObj->mutex); - - pConnect->pid = htonl(appInfo.pid); - pConnect->startTime = htobe64(appInfo.startTime); - tstrncpy(pConnect->app, appInfo.appName, tListLen(pConnect->app)); - - pMsgBody->msgInfo.pMsg = pConnect; +int genericExecCallback(void* param, const SDataBuf* pMsg, int32_t code) { + SRequestObj* pRequest = param; + pRequest->code = code; + sem_post(&pRequest->body.rspSem); return 0; } -int processConnectRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) { +int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { + SRequestObj* pRequest = param; + STscObj *pTscObj = pRequest->pTscObj; - SConnectRsp *pConnect = (SConnectRsp *)pMsg; + SConnectRsp *pConnect = (SConnectRsp *)pMsg->pData; pConnect->acctId = htonl(pConnect->acctId); pConnect->connId = htonl(pConnect->connId); pConnect->clusterId = htonl(pConnect->clusterId); @@ -81,15 +59,19 @@ int processConnectRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) { pTscObj->pAppInfo->clusterId = pConnect->clusterId; atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); - pRequest->body.resInfo.pRspMsg = pMsg; +// pRequest->body.resInfo.pRspMsg = pMsg->pData; tscDebug("0x%" PRIx64 " clusterId:%d, totalConn:%"PRId64, pRequest->requestId, pConnect->clusterId, pTscObj->pAppInfo->numOfConns); + + sem_post(&pRequest->body.rspSem); return 0; } -static int32_t buildRetrieveMnodeMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) { - pMsgBody->msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE; - pMsgBody->msgInfo.len = sizeof(SRetrieveTableMsg); - pMsgBody->requestObjRefId = pRequest->self; +static int32_t buildRetrieveMnodeMsg(SRequestObj *pRequest, SMsgSendInfo* pMsgSendInfo) { + pMsgSendInfo->msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE; + pMsgSendInfo->msgInfo.len = sizeof(SRetrieveTableMsg); + pMsgSendInfo->requestObjRefId = pRequest->self; + pMsgSendInfo->param = pRequest; + pMsgSendInfo->fp = handleRequestRspFp[pMsgSendInfo->msgType]; SRetrieveTableMsg *pRetrieveMsg = calloc(1, sizeof(SRetrieveTableMsg)); if (pRetrieveMsg == NULL) { @@ -97,29 +79,38 @@ static int32_t buildRetrieveMnodeMsg(SRequestObj *pRequest, SRequestMsgBody* pMs } pRetrieveMsg->showId = htonl(pRequest->body.execId); - pMsgBody->msgInfo.pMsg = pRetrieveMsg; + pMsgSendInfo->msgInfo.pData = pRetrieveMsg; return TSDB_CODE_SUCCESS; } -SRequestMsgBody buildRequestMsgImpl(SRequestObj *pRequest) { +SMsgSendInfo* buildSendMsgInfoImpl(SRequestObj *pRequest) { + SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); + if (pRequest->type == TSDB_MSG_TYPE_SHOW_RETRIEVE) { - SRequestMsgBody body = {0}; - buildRetrieveMnodeMsg(pRequest, &body); - return body; + buildRetrieveMnodeMsg(pRequest, pMsgSendInfo); } else { assert(pRequest != NULL); - SRequestMsgBody body = { - .requestObjRefId = pRequest->self, - .msgInfo = pRequest->body.requestMsg, - .msgType = pRequest->type, - .requestId = pRequest->requestId, - }; - return body; + pMsgSendInfo->requestObjRefId = pRequest->self; + pMsgSendInfo->msgInfo = pRequest->body.requestMsg; + pMsgSendInfo->msgType = pRequest->type; + pMsgSendInfo->requestId = pRequest->requestId; + pMsgSendInfo->param = pRequest; + + pMsgSendInfo->fp = (handleRequestRspFp[pRequest->type] == NULL)? genericExecCallback:handleRequestRspFp[pRequest->type]; } + + return pMsgSendInfo; } -int32_t processShowRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) { - SShowRsp* pShow = (SShowRsp *)pMsg; +int32_t processShowRsp(void* param, const SDataBuf* pMsg, int32_t code) { + SRequestObj* pRequest = param; + if (code != TSDB_CODE_SUCCESS) { + pRequest->code = code; + tsem_post(&pRequest->body.rspSem); + return code; + } + + SShowRsp* pShow = (SShowRsp *)pMsg->pData; pShow->showId = htonl(pShow->showId); STableMetaMsg *pMetaMsg = &(pShow->tableMeta); @@ -140,7 +131,7 @@ int32_t processShowRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) pFields[i].bytes = pSchema[i].bytes; } - pRequest->body.resInfo.pRspMsg = pMsg; +// pRequest->body.resInfo.pRspMsg = pMsg->pData; SReqResultInfo* pResInfo = &pRequest->body.resInfo; pResInfo->fields = pFields; @@ -150,16 +141,18 @@ int32_t processShowRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) pResInfo->length = calloc(pResInfo->numOfCols, sizeof(int32_t)); pRequest->body.execId = pShow->showId; + tsem_post(&pRequest->body.rspSem); return 0; } -int32_t processRetrieveMnodeRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) { - assert(msgLen >= sizeof(SRetrieveTableRsp)); +int32_t processRetrieveMnodeRsp(void* param, const SDataBuf* pMsg, int32_t code) { + assert(pMsg->len >= sizeof(SRetrieveTableRsp)); - tfree(pRequest->body.resInfo.pRspMsg); - pRequest->body.resInfo.pRspMsg = pMsg; + SRequestObj* pRequest = param; +// tfree(pRequest->body.resInfo.pRspMsg); +// pRequest->body.resInfo.pRspMsg = pMsg->pData; - SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *) pMsg; + SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *) pMsg->pData; pRetrieve->numOfRows = htonl(pRetrieve->numOfRows); pRetrieve->precision = htons(pRetrieve->precision); @@ -172,29 +165,42 @@ int32_t processRetrieveMnodeRsp(SRequestObj *pRequest, const char* pMsg, int32_t tscDebug("0x%"PRIx64" numOfRows:%d, complete:%d, qId:0x%"PRIx64, pRequest->self, pRetrieve->numOfRows, pRetrieve->completed, pRequest->body.execId); + + tsem_post(&pRequest->body.rspSem); return 0; } -int32_t processCreateDbRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) { +int32_t processCreateDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { // todo rsp with the vnode id list + SRequestObj* pRequest = param; + tsem_post(&pRequest->body.rspSem); } -int32_t processUseDbRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) { - SUseDbRsp* pUseDbRsp = (SUseDbRsp*) pMsg; +int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { + SUseDbRsp* pUseDbRsp = (SUseDbRsp*) pMsg->pData; SName name = {0}; tNameFromString(&name, pUseDbRsp->db, T_NAME_ACCT|T_NAME_DB); char db[TSDB_DB_NAME_LEN] = {0}; tNameGetDbName(&name, db); + + SRequestObj* pRequest = param; setConnectionDB(pRequest->pTscObj, db); + + tsem_post(&pRequest->body.rspSem); + return 0; } -int32_t processCreateTableRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) { +int32_t processCreateTableRsp(void* param, const SDataBuf* pMsg, int32_t code) { assert(pMsg != NULL); + SRequestObj* pRequest = param; + tsem_post(&pRequest->body.rspSem); } -int32_t processDropDbRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) { +int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { // todo: Remove cache in catalog cache. + SRequestObj* pRequest = param; + tsem_post(&pRequest->body.rspSem); } void initMsgHandleFp() { diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 2938d0180a..1a5872f77a 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -49,54 +49,54 @@ int main(int argc, char** argv) { TEST(testCase, driverInit_Test) { taos_init(); } - TEST(testCase, connect_Test) { +TEST(testCase, connect_Test) { TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + taos_close(pConn); +} + +//TEST(testCase, create_user_Test) { +// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); // assert(pConn != NULL); - taos_close(pConn); -} - - TEST(testCase, create_user_Test) { - TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'"); - if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { - printf("failed to create user, reason:%s\n", taos_errstr(pRes)); - } - - taos_free_result(pRes); - taos_close(pConn); -} - - TEST(testCase, create_account_Test) { - TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "create account aabc pass 'abc'"); - if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { - printf("failed to create user, reason:%s\n", taos_errstr(pRes)); - } - - taos_free_result(pRes); - taos_close(pConn); -} - - TEST(testCase, drop_account_Test) { - TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "drop account aabc"); - if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { - printf("failed to create user, reason:%s\n", taos_errstr(pRes)); - } - - taos_free_result(pRes); - taos_close(pConn); -} - - TEST(testCase, show_user_Test) { - TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); +// +// TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'"); +// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { +// printf("failed to create user, reason:%s\n", taos_errstr(pRes)); +// } +// +// taos_free_result(pRes); +// taos_close(pConn); +//} +// +//TEST(testCase, create_account_Test) { +// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); // assert(pConn != NULL); +// +// TAOS_RES* pRes = taos_query(pConn, "create account aabc pass 'abc'"); +// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { +// printf("failed to create user, reason:%s\n", taos_errstr(pRes)); +// } +// +// taos_free_result(pRes); +// taos_close(pConn); +//} +// +//TEST(testCase, drop_account_Test) { +// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); +// assert(pConn != NULL); +// +// TAOS_RES* pRes = taos_query(pConn, "drop account aabc"); +// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { +// printf("failed to create user, reason:%s\n", taos_errstr(pRes)); +// } +// +// taos_free_result(pRes); +// taos_close(pConn); +//} + +TEST(testCase, show_user_Test) { + TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); + assert(pConn != NULL); TAOS_RES* pRes = taos_query(pConn, "show users"); TAOS_ROW pRow = NULL; @@ -113,7 +113,7 @@ TEST(testCase, driverInit_Test) { taos_init(); } taos_close(pConn); } - TEST(testCase, drop_user_Test) { +TEST(testCase, drop_user_Test) { TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); assert(pConn != NULL); @@ -126,7 +126,7 @@ TEST(testCase, driverInit_Test) { taos_init(); } taos_close(pConn); } - TEST(testCase, show_db_Test) { +TEST(testCase, show_db_Test) { TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); // assert(pConn != NULL); @@ -191,10 +191,15 @@ TEST(testCase, drop_db_test) { if (taos_errno(pRes) != 0) { printf("failed to drop db, reason:%s\n", taos_errstr(pRes)); } - taos_free_result(pRes); showDB(pConn); + + pRes = taos_query(pConn, "create database abc1"); + if (taos_errno(pRes) != 0) { + printf("create to drop db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); taos_close(pConn); } @@ -248,7 +253,19 @@ TEST(testCase, show_stable_Test) { TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); assert(pConn != NULL); - TAOS_RES* pRes = taos_query(pConn, "show stables"); + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("failed to use db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "show stables"); + if (taos_errno(pRes) != 0) { + printf("failed to show stables, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT_TRUE(false); + } + TAOS_ROW pRow = NULL; TAOS_FIELD* pFields = taos_fetch_fields(pRes); diff --git a/source/libs/parser/inc/astToMsg.h b/source/libs/parser/inc/astToMsg.h index c0c3c8386b..361b4e71cb 100644 --- a/source/libs/parser/inc/astToMsg.h +++ b/source/libs/parser/inc/astToMsg.h @@ -7,7 +7,7 @@ SCreateUserMsg* buildUserManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen); SCreateAcctMsg* buildAcctManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen); SDropUserMsg* buildDropUserMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen); -SShowMsg* buildShowMsg(SShowInfo* pShowInfo, int64_t id, char* msgBuf, int32_t msgLen); +SShowMsg* buildShowMsg(SShowInfo* pShowInfo, SParseBasicCtx* pParseCtx, char* msgBuf, int32_t msgLen); SCreateDbMsg* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseBasicCtx *pCtx, SMsgBuf* pMsgBuf); SCreateStbMsg* buildCreateTableMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf); SDropTableMsg* buildDropTableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf); diff --git a/source/libs/parser/src/astToMsg.c b/source/libs/parser/src/astToMsg.c index 00f3d3f716..7f2f15ac65 100644 --- a/source/libs/parser/src/astToMsg.c +++ b/source/libs/parser/src/astToMsg.c @@ -86,7 +86,7 @@ SDropUserMsg* buildDropUserMsg(SSqlInfo* pInfo, int32_t *msgLen, int64_t id, cha return pMsg; } -SShowMsg* buildShowMsg(SShowInfo* pShowInfo, int64_t id, char* msgBuf, int32_t msgLen) { +SShowMsg* buildShowMsg(SShowInfo* pShowInfo, SParseBasicCtx *pCtx, char* msgBuf, int32_t msgLen) { SShowMsg* pShowMsg = calloc(1, sizeof(SShowMsg)); pShowMsg->type = pShowInfo->showType; @@ -105,6 +105,12 @@ SShowMsg* buildShowMsg(SShowInfo* pShowInfo, int64_t id, char* msgBuf, int32_t m pShowMsg->payloadLen = htons(pEpAddr->n); } + if (pShowInfo->showType == TSDB_MGMT_TABLE_STB || pShowInfo->showType == TSDB_MGMT_TABLE_VGROUP) { + SName n = {0}; + tNameSetDbName(&n, pCtx->acctId, pCtx->db, strlen(pCtx->db)); + tNameGetFullDbName(&n, pShowMsg->db); + } + return pShowMsg; } diff --git a/source/libs/parser/src/astValidate.c b/source/libs/parser/src/astValidate.c index cd11607326..2510f354fb 100644 --- a/source/libs/parser/src/astValidate.c +++ b/source/libs/parser/src/astValidate.c @@ -4090,7 +4090,7 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx *pCtx, void** ou } } - *output = buildShowMsg(pShowInfo, pCtx->requestId, pMsgBuf->buf, pMsgBuf->len); + *output = buildShowMsg(pShowInfo, pCtx, pMsgBuf->buf, pMsgBuf->len); *outputLen = sizeof(SShowMsg)/* + htons(pShowMsg->payloadLen)*/; return TSDB_CODE_SUCCESS; } From 5ef4444c122667822a64784749510ca46d48c908 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 24 Dec 2021 23:09:38 +0800 Subject: [PATCH 2/3] [td-11818]add test case for support show vgroups; --- source/client/test/clientTests.cpp | 109 +++++++++++++++++++---------- 1 file changed, 71 insertions(+), 38 deletions(-) diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 1a5872f77a..0f2c0a727f 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -55,44 +55,44 @@ TEST(testCase, connect_Test) { taos_close(pConn); } -//TEST(testCase, create_user_Test) { -// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'"); -// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { -// printf("failed to create user, reason:%s\n", taos_errstr(pRes)); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} -// -//TEST(testCase, create_account_Test) { -// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "create account aabc pass 'abc'"); -// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { -// printf("failed to create user, reason:%s\n", taos_errstr(pRes)); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} -// -//TEST(testCase, drop_account_Test) { -// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "drop account aabc"); -// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { -// printf("failed to create user, reason:%s\n", taos_errstr(pRes)); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} +TEST(testCase, create_user_Test) { + TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'"); + if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { + printf("failed to create user, reason:%s\n", taos_errstr(pRes)); + } + + taos_free_result(pRes); + taos_close(pConn); +} + +TEST(testCase, create_account_Test) { + TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "create account aabc pass 'abc'"); + if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { + printf("failed to create user, reason:%s\n", taos_errstr(pRes)); + } + + taos_free_result(pRes); + taos_close(pConn); +} + +TEST(testCase, drop_account_Test) { + TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "drop account aabc"); + if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { + printf("failed to create user, reason:%s\n", taos_errstr(pRes)); + } + + taos_free_result(pRes); + taos_close(pConn); +} TEST(testCase, show_user_Test) { TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); @@ -282,6 +282,39 @@ TEST(testCase, show_stable_Test) { taos_close(pConn); } +TEST(testCase, show_vgroup_Test) { + TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("failed to use db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "show vgroups"); + if (taos_errno(pRes) != 0) { + printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT_TRUE(false); + } + + TAOS_ROW pRow = NULL; + + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + char str[512] = {0}; + while((pRow = taos_fetch_row(pRes)) != NULL) { + int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + printf("%s\n", str); + } + + taos_free_result(pRes); + + taos_close(pConn); +} + TEST(testCase, drop_stable_Test) { TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); assert(pConn != NULL); From 563c0ddd4473e814b9272e203023f14b3751f1ad Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 24 Dec 2021 23:39:55 +0800 Subject: [PATCH 3/3] [td-11818] refactor. --- include/libs/qcom/query.h | 23 +++++++++++++--- source/client/inc/clientInt.h | 38 ++++++--------------------- source/client/src/clientEnv.c | 11 ++++---- source/client/src/clientImpl.c | 16 +++++------ source/client/src/clientMain.c | 4 +-- source/client/src/clientMsgHandler.c | 4 +-- source/client/test/CMakeLists.txt | 2 +- source/libs/qcom/src/queryUtil.c | 4 +-- source/libs/scheduler/src/scheduler.c | 2 +- 9 files changed, 48 insertions(+), 56 deletions(-) diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 877cfb130c..86b51dbb85 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -88,12 +88,27 @@ typedef struct SUseDbOutput { typedef struct STableMetaOutput { int32_t metaNum; char ctbFname[TSDB_TABLE_FNAME_LEN]; - char tbFname[TSDB_TABLE_FNAME_LEN]; + char tbFname[TSDB_TABLE_FNAME_LEN]; SCTableMeta ctbMeta; STableMeta *tbMeta; } STableMetaOutput; -typedef int32_t __async_exec_fn_t(void* param); +typedef struct SDataBuf { + void *pData; + uint32_t len; +} SDataBuf; + +typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code); +typedef int32_t (*__async_exec_fn_t)(void* param); + +typedef struct SMsgSendInfo { + __async_send_cb_fn_t fp; //async callback function + void *param; + uint64_t requestId; + uint64_t requestObjRefId; + int32_t msgType; + SDataBuf msgInfo; +} SMsgSendInfo; bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags); @@ -109,7 +124,9 @@ int32_t cleanupTaskQueue(); */ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code); -SSchema* tGetTbnameColumnSchema(); +int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo); + +const SSchema* tGetTbnameColumnSchema(); void initQueryModuleMsgHandle(); extern int32_t (*queryBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen); diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 550d8c3ecf..eba904c0b8 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -29,6 +29,7 @@ extern "C" { #include "tlist.h" #include "tmsgtype.h" #include "trpc.h" +#include "query.h" typedef struct SQueryExecMetric { int64_t start; // start timestamp @@ -100,11 +101,6 @@ typedef struct SReqResultInfo { uint32_t current; } SReqResultInfo; -typedef struct SDataBuf { - void *pData; - uint32_t len; -} SDataBuf; - typedef struct SRequestSendRecvBody { tsem_t rspSem; // not used now void* fp; @@ -119,39 +115,21 @@ typedef struct SRequestObj { uint64_t requestId; int32_t type; // request type STscObj *pTscObj; - SQueryExecMetric metric; char *sqlstr; // sql string - SRequestSendRecvBody body; int64_t self; char *msgBuf; void *pInfo; // sql parse info, generated by parser module int32_t code; + SQueryExecMetric metric; + SRequestSendRecvBody body; } SRequestObj; -typedef struct SRequestMsgBody { - int32_t msgType; - SDataBuf msgInfo; - uint64_t requestId; - uint64_t requestObjRefId; -} SRequestMsgBody; - -typedef int (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code); - -typedef struct SMsgSendInfo { - __async_send_cb_fn_t fp; - void *param; - uint64_t requestId; - uint64_t requestObjRefId; - int32_t msgType; - SDataBuf msgInfo; -} SMsgSendInfo; - extern SAppInfo appInfo; -extern int32_t msgObjRefPool; +extern int32_t clientReqRefPool; extern int32_t clientConnRefPool; SMsgSendInfo* buildSendMsgInfoImpl(SRequestObj *pRequest); -int genericExecCallback(void* param, const SDataBuf* pMsg, int32_t code); +int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code); extern int (*handleRequestRspFp[TSDB_MSG_TYPE_MAX])(void* param, const SDataBuf* pMsg, int32_t code); int taos_init(); @@ -166,7 +144,7 @@ char *getConnectionDB(STscObj* pObj); void setConnectionDB(STscObj* pTscObj, const char* db); void taos_init_imp(void); -int taos_options_imp(TSDB_OPTION option, const char *str); +int taos_options_imp(TSDB_OPTION option, const char *str); void* openTransporter(const char *user, const char *auth, int32_t numOfThreads); @@ -176,8 +154,8 @@ void initMsgHandleFp(); TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port); TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen); -void* doFetchRow(SRequestObj* pRequest); -void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows); +void *doFetchRow(SRequestObj* pRequest); +void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows); #ifdef __cplusplus } diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index dca4d85ad6..7fcdafaf44 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -32,7 +32,7 @@ #define TSC_VAR_RELEASED 0 SAppInfo appInfo; -int32_t msgObjRefPool = -1; +int32_t clientReqRefPool = -1; int32_t clientConnRefPool = -1; static pthread_once_t tscinit = PTHREAD_ONCE_INIT; @@ -43,7 +43,7 @@ static void registerRequest(SRequestObj* pRequest) { assert(pTscObj != NULL); // connection has been released already, abort creating request. - pRequest->self = taosAddRef(msgObjRefPool, pRequest); + pRequest->self = taosAddRef(clientReqRefPool, pRequest); int32_t num = atomic_add_fetch_32(&pTscObj->numOfReqs, 1); @@ -167,12 +167,11 @@ void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t ty // TODO generated request uuid pRequest->requestId = 0; - pRequest->metric.start = taosGetTimestampMs(); pRequest->type = type; pRequest->pTscObj = pObj; - pRequest->body.fp = fp; + pRequest->body.fp = fp; // not used it yet pRequest->msgBuf = calloc(1, ERROR_MSG_BUF_DEFAULT_SIZE); tsem_init(&pRequest->body.rspSem, 0, 0); @@ -201,7 +200,7 @@ void destroyRequest(SRequestObj* pRequest) { return; } - taosReleaseRef(msgObjRefPool, pRequest->self); + taosReleaseRef(clientReqRefPool, pRequest->self); } void taos_init_imp(void) { @@ -238,7 +237,7 @@ void taos_init_imp(void) { initTaskQueue(); clientConnRefPool = taosOpenRef(200, destroyTscObj); - msgObjRefPool = taosOpenRef(40960, doDestroyRequest); + clientReqRefPool = taosOpenRef(40960, doDestroyRequest); taosGetAppName(appInfo.appName, NULL); appInfo.pid = taosGetPId(); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 7cd8e9d359..ed211e142c 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -14,8 +14,6 @@ static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorE static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest); static void destroySendMsgInfo(SMsgSendInfo* pMsgBody); -static int32_t sendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo); - static bool stringLengthCheck(const char* str, size_t maxsize) { if (str == NULL) { return false; @@ -201,10 +199,10 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { tstrncpy(ep.fqdn[i], info.epAddr[i].fqdn, tListLen(ep.fqdn[i])); } - sendMsgToServer(pTscObj->pTransporter, &ep, &transporterId, body); + asyncSendMsgToServer(pTscObj->pTransporter, &ep, &transporterId, body); } else { int64_t transporterId = 0; - sendMsgToServer(pTscObj->pTransporter, pEpSet, &transporterId, body); + asyncSendMsgToServer(pTscObj->pTransporter, pEpSet, &transporterId, body); } tsem_wait(&pRequest->body.rspSem); @@ -274,7 +272,7 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con SMsgSendInfo* body = buildConnectMsg(pRequest); int64_t transporterId = 0; - sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); + asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); tsem_wait(&pRequest->body.rspSem); destroySendMsgInfo(body); @@ -335,7 +333,7 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { tfree(pMsgBody); } -int32_t sendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) { +int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) { char *pMsg = rpcMallocCont(pInfo->msgInfo.len); if (NULL == pMsg) { tscError("0x%"PRIx64" msg:%s malloc failed", pInfo->requestId, taosMsg[pInfo->msgType]); @@ -364,7 +362,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { assert(pMsg->ahandle != NULL); if (pSendInfo->requestObjRefId != 0) { - SRequestObj *pRequest = (SRequestObj *)taosAcquireRef(msgObjRefPool, pSendInfo->requestObjRefId); + SRequestObj *pRequest = (SRequestObj *)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId); assert(pRequest->self == pSendInfo->requestObjRefId); pRequest->metric.rsp = taosGetTimestampMs(); @@ -391,7 +389,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { pRequest->metric.rsp - pRequest->metric.start); } - taosReleaseRef(msgObjRefPool, pSendInfo->requestObjRefId); + taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId); } SDataBuf buf = {.pData = pMsg->pCont, .len = pMsg->contLen}; @@ -437,7 +435,7 @@ void* doFetchRow(SRequestObj* pRequest) { int64_t transporterId = 0; STscObj *pTscObj = pRequest->pTscObj; - sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); + asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); tsem_wait(&pRequest->body.rspSem); destroySendMsgInfo(body); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 44c78076d9..af85f1b310 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -35,8 +35,8 @@ void taos_cleanup(void) { return; } - int32_t id = msgObjRefPool; - msgObjRefPool = -1; + int32_t id = clientReqRefPool; + clientReqRefPool = -1; taosCloseRef(id); cleanupTaskQueue(); diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index b18b3ecb51..700a7a20cf 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -22,7 +22,7 @@ int (*handleRequestRspFp[TSDB_MSG_TYPE_MAX])(void*, const SDataBuf* pMsg, int32_t code); -int genericExecCallback(void* param, const SDataBuf* pMsg, int32_t code) { +int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code) { SRequestObj* pRequest = param; pRequest->code = code; sem_post(&pRequest->body.rspSem); @@ -96,7 +96,7 @@ SMsgSendInfo* buildSendMsgInfoImpl(SRequestObj *pRequest) { pMsgSendInfo->requestId = pRequest->requestId; pMsgSendInfo->param = pRequest; - pMsgSendInfo->fp = (handleRequestRspFp[pRequest->type] == NULL)? genericExecCallback:handleRequestRspFp[pRequest->type]; + pMsgSendInfo->fp = (handleRequestRspFp[pRequest->type] == NULL)? genericRspCallback:handleRequestRspFp[pRequest->type]; } return pMsgSendInfo; diff --git a/source/client/test/CMakeLists.txt b/source/client/test/CMakeLists.txt index a3f2ad88bb..6886206363 100644 --- a/source/client/test/CMakeLists.txt +++ b/source/client/test/CMakeLists.txt @@ -8,7 +8,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) ADD_EXECUTABLE(clientTest ${SOURCE_LIST}) TARGET_LINK_LIBRARIES( clientTest - PUBLIC os util common transport gtest taos + PUBLIC os util common transport gtest taos qcom ) TARGET_INCLUDE_DIRECTORIES( diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 829f426c9d..df3fa40004 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -14,7 +14,7 @@ static struct SSchema _s = { .name = "tbname", }; -SSchema* tGetTbnameColumnSchema() { +const SSchema* tGetTbnameColumnSchema() { return &_s; } @@ -103,7 +103,7 @@ int32_t cleanupTaskQueue() { static void execHelper(struct SSchedMsg* pSchedMsg) { assert(pSchedMsg != NULL && pSchedMsg->ahandle != NULL); - __async_exec_fn_t* execFn = (__async_exec_fn_t*) pSchedMsg->ahandle; + __async_exec_fn_t execFn = (__async_exec_fn_t) pSchedMsg->ahandle; int32_t code = execFn(pSchedMsg->thandle); if (code != 0 && pSchedMsg->msg != NULL) { *(int32_t*) pSchedMsg->msg = code; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 4185b3176c..8e57f12777 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -32,7 +32,7 @@ int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_ buildConnectMsg(pRequest, &body); int64_t transporterId = 0; - sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId); + asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId); tsem_wait(&pRequest->body.rspSem); destroyConnectMsg(&body);