From a657413fa6f30f2f6c862a98a4ca4de1af854d8e Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 1 Jul 2022 11:10:35 +0800 Subject: [PATCH 1/7] enh: kill query --- source/client/src/clientEnv.c | 19 ++++++- source/libs/scheduler/src/schJob.c | 6 +- tests/script/api/stopquery.c | 91 ++++++++++++++++++++++++++++++ 3 files changed, 112 insertions(+), 4 deletions(-) diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index a234311569..fefabf6539 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -153,7 +153,13 @@ void destroyAppInst(SAppInstInfo *pAppInfo) { } void destroyTscObj(void *pObj) { + if (NULL == pObj) { + return; + } + STscObj *pTscObj = pObj; + int64_t tscId = pTscObj->id; + tscDebug("begin to destroy tscObj %" PRIx64 " p:%p", tscId, pTscObj); SClientHbKey connKey = {.tscRid = pTscObj->id, .connType = pTscObj->connType}; hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey); @@ -168,6 +174,8 @@ void destroyTscObj(void *pObj) { } taosThreadMutexDestroy(&pTscObj->mutex); taosMemoryFreeClear(pTscObj); + + tscDebug("end to destroy tscObj %" PRIx64 " p:%p", tscId, pTscObj); } void *createTscObj(const char *user, const char *auth, const char *db, int32_t connType, SAppInstInfo *pAppInfo) { @@ -261,9 +269,14 @@ int32_t releaseRequest(int64_t rid) { return taosReleaseRef(clientReqRefPool, ri int32_t removeRequest(int64_t rid) { return taosRemoveRef(clientReqRefPool, rid); } void doDestroyRequest(void *p) { - assert(p != NULL); + if (NULL == p) { + return; + } + SRequestObj *pRequest = (SRequestObj *)p; - + int64_t reqId = pRequest->self; + tscDebug("begin to destroy request %" PRIx64 " p:%p", reqId, pRequest); + taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self)); if (pRequest->body.queryJob != 0) { @@ -285,6 +298,8 @@ void doDestroyRequest(void *p) { deregisterRequest(pRequest); } taosMemoryFreeClear(pRequest); + + tscDebug("end to destroy request %" PRIx64 " p:%p", reqId, pRequest); } void destroyRequest(SRequestObj *pRequest) { diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 425ea242fd..5d5dc745d0 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -1495,6 +1495,8 @@ void schFreeJobImpl(void *job) { uint64_t queryId = pJob->queryId; int64_t refId = pJob->refId; + qDebug("QID:0x%" PRIx64 " begin to free sch job, refId:0x%" PRIx64 ", pointer:%p", queryId, refId, pJob); + if (pJob->status == JOB_TASK_STATUS_EXECUTING) { schCancelJob(pJob); } @@ -1535,12 +1537,12 @@ void schFreeJobImpl(void *job) { taosMemoryFreeClear(pJob->resData); taosMemoryFree(pJob); - qDebug("QID:0x%" PRIx64 " sch job freed, refId:0x%" PRIx64 ", pointer:%p", queryId, refId, pJob); - int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1); if (jobNum == 0) { schCloseJobRef(); } + + qDebug("QID:0x%" PRIx64 " sch job freed, refId:0x%" PRIx64 ", pointer:%p", queryId, refId, pJob); } int32_t schLaunchStaticExplainJob(SSchedulerReq *pReq, SSchJob *pJob, bool sync) { diff --git a/tests/script/api/stopquery.c b/tests/script/api/stopquery.c index 4c7964c983..72fe4c406a 100644 --- a/tests/script/api/stopquery.c +++ b/tests/script/api/stopquery.c @@ -156,6 +156,28 @@ void sqCloseQueryCb(void *param, TAOS_RES *pRes, int code) { } } +void sqKillFetchCb(void *param, TAOS_RES *pRes, int numOfRows) { + SSP_CB_PARAM *qParam = (SSP_CB_PARAM *)param; + taos_kill_query(qParam->taos); + + *qParam->end = 1; +} + +void sqKillQueryCb(void *param, TAOS_RES *pRes, int code) { + SSP_CB_PARAM *qParam = (SSP_CB_PARAM *)param; + if (code == 0 && pRes) { + if (qParam->fetch) { + taos_fetch_rows_a(pRes, sqKillFetchCb, param); + } else { + taos_kill_query(qParam->taos); + *qParam->end = 1; + } + } else { + sqExit("select", taos_errstr(pRes)); + } +} + + int sqSyncStopQuery(bool fetch) { CASE_ENTER(); for (int32_t i = 0; i < runTimes; ++i) { @@ -391,6 +413,69 @@ int sqConSyncCloseQuery(bool fetch) { CASE_LEAVE(); } +int sqSyncKillQuery(bool fetch) { + CASE_ENTER(); + for (int32_t i = 0; i < runTimes; ++i) { + char sql[1024] = {0}; + int32_t code = 0; + TAOS *taos = taos_connect(hostName, "root", "taosdata", NULL, 0); + if (taos == NULL) sqExit("taos_connect", taos_errstr(NULL)); + + sprintf(sql, "reset query cache"); + sqExecSQL(taos, sql); + + sprintf(sql, "use %s", dbName); + sqExecSQL(taos, sql); + + sprintf(sql, "select * from %s", tbName); + TAOS_RES* pRes = taos_query(taos, sql); + code = taos_errno(pRes); + if (code) { + sqExit("taos_query", taos_errstr(pRes)); + } + + if (fetch) { + taos_fetch_row(pRes); + } + + taos_kill_query(taos); + + taos_close(taos); + } + CASE_LEAVE(); +} + +int sqAsyncKillQuery(bool fetch) { + CASE_ENTER(); + for (int32_t i = 0; i < runTimes; ++i) { + char sql[1024] = {0}; + int32_t code = 0; + TAOS *taos = taos_connect(hostName, "root", "taosdata", NULL, 0); + if (taos == NULL) sqExit("taos_connect", taos_errstr(NULL)); + + sprintf(sql, "reset query cache"); + sqExecSQL(taos, sql); + + sprintf(sql, "use %s", dbName); + sqExecSQL(taos, sql); + + sprintf(sql, "select * from %s", tbName); + + int32_t qEnd = 0; + SSP_CB_PARAM param = {0}; + param.fetch = fetch; + param.end = &qEnd; + taos_query_a(taos, sql, sqKillQueryCb, ¶m); + while (0 == qEnd) { + usleep(5000); + } + + taos_close(taos); + } + CASE_LEAVE(); +} + + void sqRunAllCase(void) { /* sqSyncStopQuery(false); @@ -409,8 +494,14 @@ void sqRunAllCase(void) { sqAsyncCloseQuery(true); */ sqConSyncCloseQuery(false); +/* sqConSyncCloseQuery(true); + sqSyncKillQuery(false); + sqSyncKillQuery(true); + sqAsyncKillQuery(false); + sqAsyncKillQuery(true); +*/ } From 3f8efa106ac1327028eaba711a7443d700cc47fb Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 1 Jul 2022 11:27:30 +0800 Subject: [PATCH 2/7] fix brokenlink retry --- source/libs/transport/src/transCli.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index a239e6bbcb..33765b1f96 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1068,7 +1068,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { if (retry) { pMsg->sent = 0; pCtx->retryCnt += 1; - if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { + if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK) { cliCompareAndSwap(&pCtx->retryLimit, TRANS_RETRY_COUNT_LIMIT, EPSET_GET_SIZE(&pCtx->epSet) * 3); if (pCtx->retryCnt < pCtx->retryLimit) { transUnrefCliHandle(pConn); From b2be5169ab69781909f384cbf8d5b446afef0a75 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 1 Jul 2022 15:11:24 +0800 Subject: [PATCH 3/7] add inst ref --- include/libs/transport/trpc.h | 15 ++++++----- source/libs/transport/inc/transComm.h | 21 ++++++++------- source/libs/transport/src/trans.c | 14 +++++----- source/libs/transport/src/transCli.c | 35 +++++++++++++++--------- source/libs/transport/src/transComm.c | 30 ++++++++++++--------- source/libs/transport/src/transSvr.c | 38 +++++++++++++-------------- 6 files changed, 89 insertions(+), 64 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 8471aa8286..48550b890a 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -110,12 +110,15 @@ typedef struct { } SRpcCtx; int32_t rpcInit(); -void rpcCleanup(); -void * rpcOpen(const SRpcInit *pRpc); -void rpcClose(void *); -void * rpcMallocCont(int32_t contLen); -void rpcFreeCont(void *pCont); -void * rpcReallocCont(void *ptr, int32_t contLen); + +void rpcCleanup(); +void *rpcOpen(const SRpcInit *pRpc); + +void rpcClose(void *); +void rpcCloseImpl(void *); +void *rpcMallocCont(int32_t contLen); +void rpcFreeCont(void *pCont); +void *rpcReallocCont(void *ptr, int32_t contLen); // Because taosd supports multi-process mode // These functions should not be used on the server side diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index b06d541b75..f699df6883 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -253,7 +253,7 @@ int transAsyncSend(SAsyncPool* pool, queue* mq); do { \ if (id > 0) { \ tTrace("handle step1"); \ - SExHandle* exh2 = transAcquireExHandle(id); \ + SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), id); \ if (exh2 == NULL || id != exh2->refId) { \ tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, \ exh2 ? exh2->refId : 0, id); \ @@ -261,7 +261,7 @@ int transAsyncSend(SAsyncPool* pool, queue* mq); } \ } else if (id == 0) { \ tTrace("handle step2"); \ - SExHandle* exh2 = transAcquireExHandle(id); \ + SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), id); \ if (exh2 == NULL || id == exh2->refId) { \ tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, id, \ exh2 ? exh2->refId : 0); \ @@ -391,13 +391,16 @@ void transThreadOnce(); void transInit(); void transCleanup(); -int32_t transOpenExHandleMgt(int size); -void transCloseExHandleMgt(); -int64_t transAddExHandle(void* p); -int32_t transRemoveExHandle(int64_t refId); -SExHandle* transAcquireExHandle(int64_t refId); -int32_t transReleaseExHandle(int64_t refId); -void transDestoryExHandle(void* handle); +int32_t transOpenRefMgt(int size, void (*func)(void*)); +void transCloseRefMgt(int32_t refMgt); +int64_t transAddExHandle(int32_t refMgt, void* p); +int32_t transRemoveExHandle(int32_t refMgt, int64_t refId); +void* transAcquireExHandle(int32_t refMgt, int64_t refId); +int32_t transReleaseExHandle(int32_t refMgt, int64_t refId); +void transDestoryExHandle(void* handle); + +int32_t transGetRefMgt(); +int32_t transGetInstMgt(); #ifdef __cplusplus } diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 936cfe870d..c970440d47 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -76,16 +76,19 @@ void* rpcOpen(const SRpcInit* pInit) { if (pInit->user) { memcpy(pRpc->user, pInit->user, strlen(pInit->user)); } - return pRpc; + int64_t refId = taosAddRef(transGetInstMgt(), pRpc); + return (void*)refId; } void rpcClose(void* arg) { tInfo("start to close rpc"); + taosRemoveRef(transGetInstMgt(), (int64_t)arg); + tInfo("finish to close rpc"); + return; +} +void rpcCloseImpl(void* arg) { SRpcInfo* pRpc = (SRpcInfo*)arg; (*taosCloseHandle[pRpc->connType])(pRpc->tcphandle); taosMemoryFree(pRpc); - tInfo("finish to close rpc"); - - return; } void* rpcMallocCont(int32_t contLen) { @@ -140,11 +143,10 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) { transSendRecv(shandle, pEpSet, pMsg, pRsp); } -void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); } +void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); } int32_t rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return 0; } - void rpcRefHandle(void* handle, int8_t type) { assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); (*taosRefHandle[type])(handle); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 33765b1f96..bda40cbc2a 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -501,13 +501,13 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { } static void allocConnRef(SCliConn* conn, bool update) { if (update) { - transRemoveExHandle(conn->refId); + transRemoveExHandle(transGetRefMgt(), conn->refId); conn->refId = -1; } SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle)); exh->handle = conn; exh->pThrd = conn->hostThrd; - exh->refId = transAddExHandle(exh); + exh->refId = transAddExHandle(transGetRefMgt(), exh); conn->refId = exh->refId; } static void addConnToPool(void* pool, SCliConn* conn) { @@ -601,7 +601,7 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); QUEUE_REMOVE(&conn->conn); QUEUE_INIT(&conn->conn); - transRemoveExHandle(conn->refId); + transRemoveExHandle(transGetRefMgt(), conn->refId); conn->refId = -1; if (clear) { @@ -619,7 +619,7 @@ static void cliDestroy(uv_handle_t* handle) { } SCliConn* conn = handle->data; - transRemoveExHandle(conn->refId); + transRemoveExHandle(transGetRefMgt(), conn->refId); taosMemoryFree(conn->ip); conn->stream->data = NULL; taosMemoryFree(conn->stream); @@ -747,7 +747,7 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) { } static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) { int64_t refId = (int64_t)(pMsg->msg.info.handle); - SExHandle* exh = transAcquireExHandle(refId); + SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId); if (exh == NULL) { tDebug("%" PRId64 " already release", refId); } @@ -773,7 +773,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) { SCliConn* conn = NULL; int64_t refId = (int64_t)(pMsg->msg.info.handle); if (refId != 0) { - SExHandle* exh = transAcquireExHandle(refId); + SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId); if (exh == NULL) { *ignore = true; destroyCmsg(pMsg); @@ -781,7 +781,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) { // assert(0); } else { conn = exh->handle; - transReleaseExHandle(refId); + transReleaseExHandle(transGetRefMgt(), refId); } return conn; }; @@ -1154,12 +1154,12 @@ void transUnrefCliHandle(void* handle) { } SCliThrd* transGetWorkThrdFromHandle(int64_t handle) { SCliThrd* pThrd = NULL; - SExHandle* exh = transAcquireExHandle(handle); + SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle); if (exh == NULL) { return NULL; } pThrd = exh->pThrd; - transReleaseExHandle(handle); + transReleaseExHandle(transGetRefMgt(), handle); return pThrd; } SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle) { @@ -1186,10 +1186,13 @@ void transReleaseCliHandle(void* handle) { } void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { - STrans* pTransInst = (STrans*)shandle; + STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); + if (pTransInst == NULL) return; + SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle); if (pThrd == NULL) { transFreeMsg(pReq->pCont); + transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return; } @@ -1215,14 +1218,18 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), pThrd->pid, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); ASSERT(transAsyncSend(pThrd->asyncPool, &(cliMsg->q)) == 0); + transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return; } void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) { - STrans* pTransInst = (STrans*)shandle; + STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); + if (pTransInst == NULL) return; + SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle); if (pThrd == NULL) { transFreeMsg(pReq->pCont); + transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return; } tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t)); @@ -1252,13 +1259,16 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM tsem_wait(sem); tsem_destroy(sem); taosMemoryFree(sem); + + transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return; } /* * **/ void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { - STrans* pTransInst = shandle; + STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); + if (pTransInst == NULL) return; SCvtAddr cvtAddr = {0}; if (ip != NULL && fqdn != NULL) { @@ -1279,5 +1289,6 @@ void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { transAsyncSend(thrd->asyncPool, &(cliMsg->q)); } + transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); } #endif diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 85a45ec921..676985b31c 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -19,6 +19,7 @@ static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT; static int32_t refMgt; +int32_t instMgt; int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) { T_MD5_CTX context; @@ -481,44 +482,49 @@ bool transEpSetIsEqual(SEpSet* a, SEpSet* b) { } static void transInitEnv() { - refMgt = transOpenExHandleMgt(50000); + refMgt = transOpenRefMgt(50000, transDestoryExHandle); + instMgt = taosOpenRef(50, rpcCloseImpl); uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1"); } static void transDestroyEnv() { - // close ref - transCloseExHandleMgt(refMgt); + transCloseRefMgt(refMgt); + transCloseRefMgt(instMgt); } void transInit() { // init env taosThreadOnce(&transModuleInit, transInitEnv); } + +int32_t transGetRefMgt() { return refMgt; } +int32_t transGetInstMgt() { return instMgt; } + void transCleanup() { // clean env transDestroyEnv(); } -int32_t transOpenExHandleMgt(int size) { +int32_t transOpenRefMgt(int size, void (*func)(void*)) { // added into once later - return taosOpenRef(size, transDestoryExHandle); + return taosOpenRef(size, func); } -void transCloseExHandleMgt() { +void transCloseRefMgt(int32_t mgt) { // close ref - taosCloseRef(refMgt); + taosCloseRef(mgt); } -int64_t transAddExHandle(void* p) { +int64_t transAddExHandle(int32_t refMgt, void* p) { // acquire extern handle return taosAddRef(refMgt, p); } -int32_t transRemoveExHandle(int64_t refId) { +int32_t transRemoveExHandle(int32_t refMgt, int64_t refId) { // acquire extern handle return taosRemoveRef(refMgt, refId); } -SExHandle* transAcquireExHandle(int64_t refId) { +void* transAcquireExHandle(int32_t refMgt, int64_t refId) { // acquire extern handle - return (SExHandle*)taosAcquireRef(refMgt, refId); + return (void*)taosAcquireRef(refMgt, refId); } -int32_t transReleaseExHandle(int64_t refId) { +int32_t transReleaseExHandle(int32_t refMgt, int64_t refId) { // release extern handle return taosReleaseRef(refMgt, refId); } diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index d32156dd0d..da1a37917f 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -261,7 +261,7 @@ static void uvHandleReq(SSvrConn* pConn) { // 2. once send out data, cli conn released to conn pool immediately // 3. not mixed with persist transMsg.info.ahandle = (void*)pHead->ahandle; - transMsg.info.handle = (void*)transAcquireExHandle(pConn->refId); + transMsg.info.handle = (void*)transAcquireExHandle(transGetRefMgt(), pConn->refId); transMsg.info.refId = pConn->refId; transMsg.info.traceId = pHead->traceId; @@ -279,7 +279,7 @@ static void uvHandleReq(SSvrConn* pConn) { pConnInfo->clientPort = ntohs(pConn->addr.sin_port); tstrncpy(pConnInfo->user, pConn->user, sizeof(pConnInfo->user)); - transReleaseExHandle(pConn->refId); + transReleaseExHandle(transGetRefMgt(), pConn->refId); STrans* pTransInst = pConn->pTransInst; (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); @@ -507,15 +507,15 @@ void uvWorkerAsyncCb(uv_async_t* handle) { SExHandle* exh1 = transMsg.info.handle; int64_t refId = transMsg.info.refId; - SExHandle* exh2 = transAcquireExHandle(refId); + SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), refId); if (exh2 == NULL || exh1 != exh2) { tTrace("handle except msg %p, ignore it", exh1); - transReleaseExHandle(refId); + transReleaseExHandle(transGetRefMgt(), refId); destroySmsg(msg); continue; } msg->pConn = exh1->handle; - transReleaseExHandle(refId); + transReleaseExHandle(transGetRefMgt(), refId); (*transAsyncHandle[msg->type])(msg, pThrd); } } @@ -757,8 +757,8 @@ static SSvrConn* createConn(void* hThrd) { SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle)); exh->handle = pConn; exh->pThrd = pThrd; - exh->refId = transAddExHandle(exh); - transAcquireExHandle(exh->refId); + exh->refId = transAddExHandle(transGetRefMgt(), exh); + transAcquireExHandle(transGetRefMgt(), exh->refId); pConn->refId = exh->refId; transRefSrvHandle(pConn); @@ -789,14 +789,14 @@ static void destroyConnRegArg(SSvrConn* conn) { } } static int reallocConnRef(SSvrConn* conn) { - transReleaseExHandle(conn->refId); - transRemoveExHandle(conn->refId); + transReleaseExHandle(transGetRefMgt(), conn->refId); + transRemoveExHandle(transGetRefMgt(), conn->refId); // avoid app continue to send msg on invalid handle SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle)); exh->handle = conn; exh->pThrd = conn->hostThrd; - exh->refId = transAddExHandle(exh); - transAcquireExHandle(exh->refId); + exh->refId = transAddExHandle(transGetRefMgt(), exh); + transAcquireExHandle(transGetRefMgt(), exh->refId); conn->refId = exh->refId; return 0; @@ -808,8 +808,8 @@ static void uvDestroyConn(uv_handle_t* handle) { } SWorkThrd* thrd = conn->hostThrd; - transReleaseExHandle(conn->refId); - transRemoveExHandle(conn->refId); + transReleaseExHandle(transGetRefMgt(), conn->refId); + transRemoveExHandle(transGetRefMgt(), conn->refId); tDebug("%s conn %p destroy", transLabel(thrd->pTransInst), conn); transQueueDestroy(&conn->srvMsgs); @@ -1045,11 +1045,11 @@ void transReleaseSrvHandle(void* handle) { tTrace("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle); transAsyncSend(pThrd->asyncPool, &m->q); - transReleaseExHandle(refId); + transReleaseExHandle(transGetRefMgt(), refId); return; _return1: tTrace("handle %p failed to send to release handle", exh); - transReleaseExHandle(refId); + transReleaseExHandle(transGetRefMgt(), refId); return; _return2: tTrace("handle %p failed to send to release handle", exh); @@ -1074,12 +1074,12 @@ void transSendResponse(const STransMsg* msg) { STraceId* trace = (STraceId*)&msg->info.traceId; tGTrace("conn %p start to send resp (1/2)", exh->handle); transAsyncSend(pThrd->asyncPool, &m->q); - transReleaseExHandle(refId); + transReleaseExHandle(transGetRefMgt(), refId); return; _return1: tTrace("handle %p failed to send resp", exh); rpcFreeCont(msg->pCont); - transReleaseExHandle(refId); + transReleaseExHandle(transGetRefMgt(), refId); return; _return2: tTrace("handle %p failed to send resp", exh); @@ -1103,13 +1103,13 @@ void transRegisterMsg(const STransMsg* msg) { tTrace("%s conn %p start to register brokenlink callback", transLabel(pThrd->pTransInst), exh->handle); transAsyncSend(pThrd->asyncPool, &m->q); - transReleaseExHandle(refId); + transReleaseExHandle(transGetRefMgt(), refId); return; _return1: tTrace("handle %p failed to register brokenlink", exh); rpcFreeCont(msg->pCont); - transReleaseExHandle(refId); + transReleaseExHandle(transGetRefMgt(), refId); return; _return2: tTrace("handle %p failed to register brokenlink", exh); From 817a319a2ef31e4392d9bc7f80982a4c03ab9e55 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 1 Jul 2022 21:01:35 +0800 Subject: [PATCH 4/7] enh: kill query --- include/libs/scheduler/scheduler.h | 2 +- source/client/inc/clientInt.h | 3 +- source/client/src/clientEnv.c | 44 ++-- source/client/src/clientImpl.c | 13 +- source/client/src/clientMain.c | 8 +- source/libs/scheduler/src/schJob.c | 17 +- source/libs/scheduler/src/schRemote.c | 6 +- source/libs/scheduler/src/scheduler.c | 23 +- source/libs/scheduler/test/schedulerTests.cpp | 10 +- tests/script/api/stopquery.c | 202 +++++++++++++++--- 10 files changed, 238 insertions(+), 90 deletions(-) diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index be3d16ab0d..1c73b2c2c8 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -130,7 +130,7 @@ void schedulerStopQueryHb(void *pTrans); * Free the query job * @param pJob */ -void schedulerFreeJob(int64_t job, int32_t errCode); +void schedulerFreeJob(int64_t* job, int32_t errCode); void schedulerDestroy(void); diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 53292ed46a..737fee5125 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -337,7 +337,8 @@ int hbHandleRsp(SClientHbBatchRsp* hbRsp); SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char* key); void appHbMgrCleanup(void); void hbRemoveAppHbMrg(SAppHbMgr **pAppHbMgr); -void closeAllRequests(SHashObj *pRequests); +void destroyAllRequests(SHashObj *pRequests); +void stopAllRequests(SHashObj *pRequests); // conn level int hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType); diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index fefabf6539..9e72e5fe35 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -121,17 +121,37 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { return pDnodeConn; } -void closeAllRequests(SHashObj *pRequests) { +void destroyAllRequests(SHashObj *pRequests) { void *pIter = taosHashIterate(pRequests, NULL); while (pIter != NULL) { int64_t *rid = pIter; - removeRequest(*rid); + SRequestObj *pRequest = acquireRequest(*rid); + if (pRequest) { + destroyRequest(pRequest); + releaseRequest(*rid); + } pIter = taosHashIterate(pRequests, pIter); } } +void stopAllRequests(SHashObj *pRequests) { + void *pIter = taosHashIterate(pRequests, NULL); + while (pIter != NULL) { + int64_t *rid = pIter; + + SRequestObj *pRequest = acquireRequest(*rid); + if (pRequest) { + taos_stop_query(pRequest); + releaseRequest(*rid); + } + + pIter = taosHashIterate(pRequests, pIter); + } +} + + void destroyAppInst(SAppInstInfo *pAppInfo) { tscDebug("destroy app inst mgr %p", pAppInfo); @@ -159,12 +179,12 @@ void destroyTscObj(void *pObj) { STscObj *pTscObj = pObj; int64_t tscId = pTscObj->id; - tscDebug("begin to destroy tscObj %" PRIx64 " p:%p", tscId, pTscObj); + tscTrace("begin to destroy tscObj %" PRIx64 " p:%p", tscId, pTscObj); SClientHbKey connKey = {.tscRid = pTscObj->id, .connType = pTscObj->connType}; hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey); int64_t connNum = atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); - closeAllRequests(pTscObj->pRequests); + destroyAllRequests(pTscObj->pRequests); schedulerStopQueryHb(pTscObj->pAppInfo->pTransporter); tscDebug("connObj 0x%" PRIx64 " p:%p destroyed, remain inst totalConn:%" PRId64, pTscObj->id, pTscObj, pTscObj->pAppInfo->numOfConns); @@ -173,9 +193,9 @@ void destroyTscObj(void *pObj) { destroyAppInst(pTscObj->pAppInfo); } taosThreadMutexDestroy(&pTscObj->mutex); - taosMemoryFreeClear(pTscObj); + taosMemoryFree(pTscObj); - tscDebug("end to destroy tscObj %" PRIx64 " p:%p", tscId, pTscObj); + tscTrace("end to destroy tscObj %" PRIx64 " p:%p", tscId, pTscObj); } void *createTscObj(const char *user, const char *auth, const char *db, int32_t connType, SAppInstInfo *pAppInfo) { @@ -275,13 +295,11 @@ void doDestroyRequest(void *p) { SRequestObj *pRequest = (SRequestObj *)p; int64_t reqId = pRequest->self; - tscDebug("begin to destroy request %" PRIx64 " p:%p", reqId, pRequest); + tscTrace("begin to destroy request %" PRIx64 " p:%p", reqId, pRequest); taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self)); - if (pRequest->body.queryJob != 0) { - schedulerFreeJob(pRequest->body.queryJob, 0); - } + schedulerFreeJob(&pRequest->body.queryJob, 0); taosMemoryFreeClear(pRequest->msgBuf); taosMemoryFreeClear(pRequest->sqlstr); @@ -297,9 +315,9 @@ void doDestroyRequest(void *p) { if (pRequest->self) { deregisterRequest(pRequest); } - taosMemoryFreeClear(pRequest); + taosMemoryFree(pRequest); - tscDebug("end to destroy request %" PRIx64 " p:%p", reqId, pRequest); + tscTrace("end to destroy request %" PRIx64 " p:%p", reqId, pRequest); } void destroyRequest(SRequestObj *pRequest) { @@ -307,6 +325,8 @@ void destroyRequest(SRequestObj *pRequest) { return; } + taos_stop_query(pRequest); + removeRequest(pRequest->self); } diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index b6009ebbda..26fb81a48d 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -645,9 +645,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList pRequest->body.resInfo.execRes = res.res; if (code != TSDB_CODE_SUCCESS) { - if (pRequest->body.queryJob != 0) { - schedulerFreeJob(pRequest->body.queryJob, 0); - } + schedulerFreeJob(&pRequest->body.queryJob, 0); pRequest->code = code; terrno = code; @@ -658,9 +656,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList TDMT_VND_CREATE_TABLE == pRequest->type) { pRequest->body.resInfo.numOfRows = res.numOfRows; - if (pRequest->body.queryJob != 0) { - schedulerFreeJob(pRequest->body.queryJob, 0); - } + schedulerFreeJob(&pRequest->body.queryJob, 0); } pRequest->code = res.code; @@ -791,10 +787,7 @@ void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) { TDMT_VND_CREATE_TABLE == pRequest->type) { pRequest->body.resInfo.numOfRows = pResult->numOfRows; - if (pRequest->body.queryJob != 0) { - schedulerFreeJob(pRequest->body.queryJob, 0); - pRequest->body.queryJob = 0; - } + schedulerFreeJob(&pRequest->body.queryJob, 0); } tscDebug("0x%" PRIx64 " enter scheduler exec cb, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index d8a9ce581a..d824ef998f 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -196,10 +196,10 @@ void taos_kill_query(TAOS *taos) { if (NULL == taos) { return; } - int64_t rid = *(int64_t*)taos; + int64_t rid = *(int64_t*)taos; STscObj* pTscObj = acquireTscObj(rid); - closeAllRequests(pTscObj->pRequests); + stopAllRequests(pTscObj->pRequests); releaseTscObj(rid); } @@ -480,9 +480,7 @@ void taos_stop_query(TAOS_RES *res) { return; } - if (pRequest->body.queryJob) { - schedulerFreeJob(pRequest->body.queryJob, TSDB_CODE_TSC_QUERY_KILLED); - } + schedulerFreeJob(&pRequest->body.queryJob, TSDB_CODE_TSC_QUERY_KILLED); tscDebug("request %" PRIx64 " killed", pRequest->requestId); } diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 5d5dc745d0..c75787665d 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -184,9 +184,7 @@ FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) { } if ((*pJob->chkKillFp)(pJob->chkKillParam)) { - schUpdateJobStatus(pJob, JOB_TASK_STATUS_DROPPING); schUpdateJobErrCode(pJob, TSDB_CODE_TSC_QUERY_KILLED); - return true; } @@ -811,14 +809,6 @@ int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) { */ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) { - int8_t status = 0; - - if (schJobNeedToStop(pJob, &status)) { - *needRetry = false; - SCH_TASK_DLOG("task no more retry cause of job status, job status:%s", jobTaskStatusStr(status)); - return TSDB_CODE_SUCCESS; - } - if (TSDB_CODE_SCH_TIMEOUT_ERROR == errCode) { pTask->maxExecTimes++; if (pTask->timeoutUsec < SCH_MAX_TASK_TIMEOUT_USEC) { @@ -1277,7 +1267,7 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList) { for (int32_t i = 0; i < taskNum; ++i) { STaskStatus *taskStatus = taosArrayGet(pStatusList, i); - qDebug("QID:%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d task status in server: %s", + qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d task status in server: %s", taskStatus->queryId, taskStatus->taskId, taskStatus->execId, jobTaskStatusStr(taskStatus->status)); SSchJob *pJob = schAcquireJob(taskStatus->refId); @@ -1689,11 +1679,6 @@ _return: int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32_t rspCode) { int32_t code = 0; - int8_t status = 0; - if (schJobNeedToStop(pJob, &status)) { - SCH_TASK_ELOG("redirect will no continue cause of job status %s", jobTaskStatusStr(status)); - SCH_RET(atomic_load_32(&pJob->errCode)); - } if ((pTask->execId + 1) >= pTask->maxExecTimes) { SCH_TASK_DLOG("task no more retry since reach max try times, execId:%d", pTask->execId); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 69e41d3111..fd51bf9511 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -401,12 +401,16 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { goto _return; } - code = schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode); + schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode); pMsg->pData = NULL; _return: if (pTask) { + if (code) { + schProcessOnTaskFailure(pJob, pTask, code); + } + SCH_UNLOCK_TASK(pTask); } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 74ddc89b40..e2389c2a75 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -225,26 +225,33 @@ void schedulerStopQueryHb(void *pTrans) { schCleanClusterHb(pTrans); } -void schedulerFreeJob(int64_t job, int32_t errCode) { - SSchJob *pJob = schAcquireJob(job); +void schedulerFreeJob(int64_t* job, int32_t errCode) { + if (0 == *job) { + return; + } + + SSchJob *pJob = schAcquireJob(*job); if (NULL == pJob) { - qError("acquire job from jobRef list failed, may be dropped, jobId:0x%" PRIx64, job); + qError("acquire sch job failed, may be dropped, jobId:0x%" PRIx64, *job); + *job = 0; return; } int32_t code = schProcessOnJobDropped(pJob, errCode); if (TSDB_CODE_SCH_JOB_IS_DROPPING == code) { - SCH_JOB_DLOG("sch job is already dropping, refId:0x%" PRIx64, job); + SCH_JOB_DLOG("sch job is already dropping, refId:0x%" PRIx64, *job); + *job = 0; return; } - SCH_JOB_DLOG("start to remove job from jobRef list, refId:0x%" PRIx64, job); + SCH_JOB_DLOG("start to remove job from jobRef list, refId:0x%" PRIx64, *job); - if (taosRemoveRef(schMgmt.jobRef, job)) { - SCH_JOB_ELOG("remove job from job list failed, refId:0x%" PRIx64, job); + if (taosRemoveRef(schMgmt.jobRef, *job)) { + SCH_JOB_ELOG("remove job from job list failed, refId:0x%" PRIx64, *job); } - schReleaseJob(job); + schReleaseJob(*job); + *job = 0; } void schedulerDestroy(void) { diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 098699744d..7fe6cc22bf 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -457,7 +457,7 @@ void schtFreeQueryJob(int32_t freeThread) { int64_t job = queryJobRefId; if (job && atomic_val_compare_exchange_64(&queryJobRefId, job, 0)) { - schedulerFreeJob(job, 0); + schedulerFreeJob(&job, 0); if (freeThread) { if (++freeNum % schtTestPrintNum == 0) { printf("FreeNum:%d\n", freeNum); @@ -724,7 +724,7 @@ TEST(queryTest, normalCase) { schReleaseJob(job); - schedulerFreeJob(job, 0); + schedulerFreeJob(&job, 0); schtFreeQueryDag(&dag); @@ -828,7 +828,7 @@ TEST(queryTest, readyFirstCase) { schReleaseJob(job); - schedulerFreeJob(job, 0); + schedulerFreeJob(&job, 0); schtFreeQueryDag(&dag); @@ -940,7 +940,7 @@ TEST(queryTest, flowCtrlCase) { schReleaseJob(job); - schedulerFreeJob(job, 0); + schedulerFreeJob(&job, 0); schtFreeQueryDag(&dag); @@ -994,7 +994,7 @@ TEST(insertTest, normalCase) { ASSERT_EQ(code, 0); ASSERT_EQ(res.numOfRows, 20); - schedulerFreeJob(insertJobRefId, 0); + schedulerFreeJob(&insertJobRefId, 0); schedulerDestroy(); } diff --git a/tests/script/api/stopquery.c b/tests/script/api/stopquery.c index 72fe4c406a..0f27fdf9f9 100644 --- a/tests/script/api/stopquery.c +++ b/tests/script/api/stopquery.c @@ -52,6 +52,7 @@ typedef struct { typedef struct SSP_CB_PARAM { TAOS *taos; bool fetch; + bool free; int32_t *end; } SSP_CB_PARAM; @@ -177,8 +178,37 @@ void sqKillQueryCb(void *param, TAOS_RES *pRes, int code) { } } +void sqAsyncFetchCb(void *param, TAOS_RES *pRes, int numOfRows) { + SSP_CB_PARAM *qParam = (SSP_CB_PARAM *)param; + if (numOfRows > 0) { + taos_fetch_rows_a(pRes, sqAsyncFetchCb, param); + } else { + *qParam->end = 1; + if (qParam->free) { + taos_free_result(pRes); + } + } +} -int sqSyncStopQuery(bool fetch) { + +void sqAsyncQueryCb(void *param, TAOS_RES *pRes, int code) { + SSP_CB_PARAM *qParam = (SSP_CB_PARAM *)param; + if (code == 0 && pRes) { + if (qParam->fetch) { + taos_fetch_rows_a(pRes, sqAsyncFetchCb, param); + } else { + if (qParam->free) { + taos_free_result(pRes); + } + *qParam->end = 1; + } + } else { + sqExit("select", taos_errstr(pRes)); + } +} + + +int sqStopSyncQuery(bool fetch) { CASE_ENTER(); for (int32_t i = 0; i < runTimes; ++i) { char sql[1024] = {0}; @@ -211,7 +241,7 @@ int sqSyncStopQuery(bool fetch) { CASE_LEAVE(); } -int sqAsyncStopQuery(bool fetch) { +int sqStopAsyncQuery(bool fetch) { CASE_ENTER(); for (int32_t i = 0; i < runTimes; ++i) { char sql[1024] = {0}; @@ -241,7 +271,7 @@ int sqAsyncStopQuery(bool fetch) { CASE_LEAVE(); } -int sqSyncFreeQuery(bool fetch) { +int sqFreeSyncQuery(bool fetch) { CASE_ENTER(); for (int32_t i = 0; i < runTimes; ++i) { char sql[1024] = {0}; @@ -272,7 +302,7 @@ int sqSyncFreeQuery(bool fetch) { CASE_LEAVE(); } -int sqAsyncFreeQuery(bool fetch) { +int sqFreeAsyncQuery(bool fetch) { CASE_ENTER(); for (int32_t i = 0; i < runTimes; ++i) { char sql[1024] = {0}; @@ -302,7 +332,7 @@ int sqAsyncFreeQuery(bool fetch) { CASE_LEAVE(); } -int sqSyncCloseQuery(bool fetch) { +int sqCloseSyncQuery(bool fetch) { CASE_ENTER(); for (int32_t i = 0; i < runTimes; ++i) { char sql[1024] = {0}; @@ -332,7 +362,7 @@ int sqSyncCloseQuery(bool fetch) { CASE_LEAVE(); } -int sqAsyncCloseQuery(bool fetch) { +int sqCloseAsyncQuery(bool fetch) { CASE_ENTER(); for (int32_t i = 0; i < runTimes; ++i) { char sql[1024] = {0}; @@ -382,9 +412,39 @@ void *syncQueryThreadFp(void *arg) { taos_fetch_row(pRes); } - taos_free_result(pRes); + if (qParam->free) { + taos_free_result(pRes); + } } +void *asyncQueryThreadFp(void *arg) { + SSP_CB_PARAM* qParam = (SSP_CB_PARAM*)arg; + char sql[1024] = {0}; + int32_t code = 0; + TAOS *taos = taos_connect(hostName, "root", "taosdata", NULL, 0); + if (taos == NULL) sqExit("taos_connect", taos_errstr(NULL)); + + qParam->taos = taos; + + sprintf(sql, "reset query cache"); + sqExecSQLE(taos, sql); + + sprintf(sql, "use %s", dbName); + sqExecSQLE(taos, sql); + + sprintf(sql, "select * from %s", tbName); + + int32_t qEnd = 0; + SSP_CB_PARAM param = {0}; + param.fetch = qParam->fetch; + param.end = &qEnd; + taos_query_a(taos, sql, sqAsyncQueryCb, ¶m); + while (0 == qEnd) { + usleep(5000); + } +} + + void *closeThreadFp(void *arg) { SSP_CB_PARAM* qParam = (SSP_CB_PARAM*)arg; while (true) { @@ -398,7 +458,22 @@ void *closeThreadFp(void *arg) { } -int sqConSyncCloseQuery(bool fetch) { + +void *killThreadFp(void *arg) { + SSP_CB_PARAM* qParam = (SSP_CB_PARAM*)arg; + while (true) { + if (qParam->taos) { + usleep(rand() % 10000); + taos_kill_query(qParam->taos); + break; + } + usleep(1); + } +} + + + +int sqConCloseSyncQuery(bool fetch) { CASE_ENTER(); pthread_t qid, cid; for (int32_t i = 0; i < runTimes; ++i) { @@ -413,7 +488,23 @@ int sqConSyncCloseQuery(bool fetch) { CASE_LEAVE(); } -int sqSyncKillQuery(bool fetch) { +int sqConCloseAsyncQuery(bool fetch) { + CASE_ENTER(); + pthread_t qid, cid; + for (int32_t i = 0; i < runTimes; ++i) { + SSP_CB_PARAM param = {0}; + param.fetch = fetch; + pthread_create(&qid, NULL, asyncQueryThreadFp, (void*)¶m); + pthread_create(&cid, NULL, closeThreadFp, (void*)¶m); + + pthread_join(qid, NULL); + pthread_join(cid, NULL); + } + CASE_LEAVE(); +} + + +int sqKillSyncQuery(bool fetch) { CASE_ENTER(); for (int32_t i = 0; i < runTimes; ++i) { char sql[1024] = {0}; @@ -445,7 +536,7 @@ int sqSyncKillQuery(bool fetch) { CASE_LEAVE(); } -int sqAsyncKillQuery(bool fetch) { +int sqKillAsyncQuery(bool fetch) { CASE_ENTER(); for (int32_t i = 0; i < runTimes; ++i) { char sql[1024] = {0}; @@ -465,6 +556,7 @@ int sqAsyncKillQuery(bool fetch) { SSP_CB_PARAM param = {0}; param.fetch = fetch; param.end = &qEnd; + param.taos = taos; taos_query_a(taos, sql, sqKillQueryCb, ¶m); while (0 == qEnd) { usleep(5000); @@ -475,33 +567,81 @@ int sqAsyncKillQuery(bool fetch) { CASE_LEAVE(); } +int sqConKillSyncQuery(bool fetch) { + CASE_ENTER(); + pthread_t qid, cid; + for (int32_t i = 0; i < runTimes; ++i) { + SSP_CB_PARAM param = {0}; + param.fetch = fetch; + pthread_create(&qid, NULL, syncQueryThreadFp, (void*)¶m); + pthread_create(&cid, NULL, killThreadFp, (void*)¶m); + + pthread_join(qid, NULL); + pthread_join(cid, NULL); + } + CASE_LEAVE(); +} + +int sqConKillAsyncQuery(bool fetch) { + CASE_ENTER(); + pthread_t qid, cid; + for (int32_t i = 0; i < runTimes; ++i) { + SSP_CB_PARAM param = {0}; + param.fetch = fetch; + pthread_create(&qid, NULL, asyncQueryThreadFp, (void*)¶m); + pthread_create(&cid, NULL, killThreadFp, (void*)¶m); + + pthread_join(qid, NULL); + pthread_join(cid, NULL); + } + CASE_LEAVE(); +} + + void sqRunAllCase(void) { /* - sqSyncStopQuery(false); - sqSyncStopQuery(true); - sqAsyncStopQuery(false); - sqAsyncStopQuery(true); + sqStopSyncQuery(false); + sqStopSyncQuery(true); + sqStopAsyncQuery(false); + sqStopAsyncQuery(true); - sqSyncFreeQuery(false); - sqSyncFreeQuery(true); - sqAsyncFreeQuery(false); - sqAsyncFreeQuery(true); + sqFreeSyncQuery(false); + sqFreeSyncQuery(true); + sqFreeAsyncQuery(false); + sqFreeAsyncQuery(true); - sqSyncCloseQuery(false); - sqSyncCloseQuery(true); - sqAsyncCloseQuery(false); - sqAsyncCloseQuery(true); -*/ - sqConSyncCloseQuery(false); -/* - sqConSyncCloseQuery(true); - - sqSyncKillQuery(false); - sqSyncKillQuery(true); - sqAsyncKillQuery(false); - sqAsyncKillQuery(true); + sqCloseSyncQuery(false); + sqCloseSyncQuery(true); + sqCloseAsyncQuery(false); + sqCloseAsyncQuery(true); + + sqConCloseSyncQuery(false); + sqConCloseSyncQuery(true); + sqConCloseAsyncQuery(false); + sqConCloseAsyncQuery(true); */ + +#if 0 + + sqKillSyncQuery(false); + sqKillSyncQuery(true); + sqKillAsyncQuery(false); + sqKillAsyncQuery(true); +#endif + + //sqConKillSyncQuery(false); + sqConKillSyncQuery(true); +#if 0 + sqConKillAsyncQuery(false); + sqConKillAsyncQuery(true); +#endif + + int32_t l = 5; + while (l) { + printf("%d\n", l--); + sleep(1); + } } From 09eceed6b403a80a0fb87b2571f23c3efe0647f3 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 1 Jul 2022 21:03:50 +0800 Subject: [PATCH 5/7] fix transport quit fix --- source/libs/transport/inc/transportInt.h | 2 +- source/libs/transport/src/trans.c | 8 ++++++-- source/libs/transport/src/transCli.c | 10 ++++++---- source/libs/transport/src/transComm.c | 2 +- 4 files changed, 14 insertions(+), 8 deletions(-) diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 462debb247..5fb2980ceb 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -57,7 +57,7 @@ typedef struct { void* parent; void* tcphandle; // returned handle from TCP initialization - int32_t refMgt; + int64_t refId; TdThreadMutex mutex; } SRpcInfo; diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index c970440d47..48e7d7c91d 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -76,12 +76,16 @@ void* rpcOpen(const SRpcInit* pInit) { if (pInit->user) { memcpy(pRpc->user, pInit->user, strlen(pInit->user)); } - int64_t refId = taosAddRef(transGetInstMgt(), pRpc); + + int64_t refId = transAddExHandle(transGetInstMgt(), pRpc); + transAcquireExHandle(transGetInstMgt(), refId); + pRpc->refId = refId; return (void*)refId; } void rpcClose(void* arg) { tInfo("start to close rpc"); - taosRemoveRef(transGetInstMgt(), (int64_t)arg); + transRemoveExHandle(transGetInstMgt(), (int64_t)arg); + transReleaseExHandle(transGetInstMgt(), (int64_t)arg); tInfo("finish to close rpc"); return; } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index bda40cbc2a..f948461c40 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -47,6 +47,7 @@ typedef struct SCliMsg { queue q; STransMsgType type; + int64_t refId; uint64_t st; int sent; //(0: no send, 1: alread sent) } SCliMsg; @@ -606,11 +607,9 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { if (clear) { if (!uv_is_closing((uv_handle_t*)conn->stream)) { + uv_read_stop(conn->stream); uv_close((uv_handle_t*)conn->stream, cliDestroy); } - //} else { - // cliDestroy((uv_handle_t*)conn->stream); - //} } } static void cliDestroy(uv_handle_t* handle) { @@ -635,7 +634,6 @@ static bool cliHandleNoResp(SCliConn* conn) { SCliMsg* pMsg = transQueueGet(&conn->cliMsgs, 0); if (REQUEST_NO_RESP(&pMsg->msg)) { transQueuePop(&conn->cliMsgs); - // taosArrayRemove(msgs, 0); destroyCmsg(pMsg); res = true; } @@ -979,6 +977,7 @@ void cliSendQuit(SCliThrd* thrd) { } void cliWalkCb(uv_handle_t* handle, void* arg) { if (!uv_is_closing(handle)) { + uv_read_stop((uv_stream_t*)handle); uv_close(handle, cliDestroy); } } @@ -1213,6 +1212,7 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra cliMsg->msg = *pReq; cliMsg->st = taosGetTimestampUs(); cliMsg->type = Normal; + cliMsg->refId = (int64_t)shandle; STraceId* trace = &pReq->info.traceId; tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), pThrd->pid, @@ -1250,6 +1250,7 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM cliMsg->msg = *pReq; cliMsg->st = taosGetTimestampUs(); cliMsg->type = Normal; + cliMsg->refId = (int64_t)shandle; STraceId* trace = &pReq->info.traceId; tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), pThrd->pid, @@ -1283,6 +1284,7 @@ void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg)); cliMsg->ctx = pCtx; cliMsg->type = Update; + cliMsg->refId = (int64_t)shandle; SCliThrd* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i]; tDebug("%s update epset at thread:%08" PRId64 "", pTransInst->label, thrd->pid); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 676985b31c..6172f4ad5b 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -19,7 +19,7 @@ static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT; static int32_t refMgt; -int32_t instMgt; +static int32_t instMgt; int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) { T_MD5_CTX context; From e4de1da8d33b6d8a4b5878558b715070aa3b9489 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 2 Jul 2022 17:00:57 +0800 Subject: [PATCH 6/7] add task queue --- source/client/inc/clientInt.h | 28 ++++++++------ source/client/src/clientEnv.c | 24 +++++++++--- source/client/src/clientImpl.c | 42 +++++++++++++++++--- source/client/src/clientMain.c | 70 ++++++++++++++++------------------ 4 files changed, 103 insertions(+), 61 deletions(-) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 53292ed46a..82a9d0f489 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -65,7 +65,7 @@ enum { typedef struct SAppInstInfo SAppInstInfo; typedef struct { - char* key; + char* key; // statistics int32_t reportCnt; int32_t connKeyCnt; @@ -177,14 +177,14 @@ typedef struct SReqResultInfo { } SReqResultInfo; typedef struct SRequestSendRecvBody { - tsem_t rspSem; // not used now - __taos_async_fn_t queryFp; - __taos_async_fn_t fetchFp; - void* param; - SDataBuf requestMsg; - int64_t queryJob; // query job, created according to sql query DAG. - int32_t subplanNum; - SReqResultInfo resInfo; + tsem_t rspSem; // not used now + __taos_async_fn_t queryFp; + __taos_async_fn_t fetchFp; + void* param; + SDataBuf requestMsg; + int64_t queryJob; // query job, created according to sql query DAG. + int32_t subplanNum; + SReqResultInfo resInfo; } SRequestSendRecvBody; typedef struct { @@ -284,6 +284,7 @@ static FORCE_INLINE SReqResultInfo* tscGetCurResInfo(TAOS_RES* res) { extern SAppInfo appInfo; extern int32_t clientReqRefPool; extern int32_t clientConnRefPool; +extern void* tscQhandle; __async_send_cb_fn_t getMsgRspHandle(int32_t msgType); @@ -301,7 +302,7 @@ void destroyRequest(SRequestObj* pRequest); SRequestObj* acquireRequest(int64_t rid); int32_t releaseRequest(int64_t rid); int32_t removeRequest(int64_t rid); -void doDestroyRequest(void *p); +void doDestroyRequest(void* p); char* getDbOfConnection(STscObj* pObj); void setConnectionDB(STscObj* pTscObj, const char* db); @@ -336,8 +337,8 @@ int hbHandleRsp(SClientHbBatchRsp* hbRsp); // cluster level SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char* key); void appHbMgrCleanup(void); -void hbRemoveAppHbMrg(SAppHbMgr **pAppHbMgr); -void closeAllRequests(SHashObj *pRequests); +void hbRemoveAppHbMrg(SAppHbMgr** pAppHbMgr); +void closeAllRequests(SHashObj* pRequests); // conn level int hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType); @@ -356,6 +357,9 @@ int32_t removeMeta(STscObj* pTscObj, SArray* tbList); // todo move to clie int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog); // todo move to xxx bool qnodeRequired(SRequestObj* pRequest); +void initTscQhandle(); +void cleanupTscQhandle(); + #ifdef __cplusplus } #endif diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index fefabf6539..f58b942fad 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -25,6 +25,7 @@ #include "tmsg.h" #include "tref.h" #include "trpc.h" +#include "tsched.h" #include "ttime.h" #define TSC_VAR_NOT_RELEASE 1 @@ -34,9 +35,20 @@ SAppInfo appInfo; int32_t clientReqRefPool = -1; int32_t clientConnRefPool = -1; +void *tscQhandle = NULL; + static TdThreadOnce tscinit = PTHREAD_ONCE_INIT; volatile int32_t tscInitRes = 0; +void initTscQhandle() { + // init handle + tscQhandle = taosInitScheduler(4096, 5, "tsc"); +} + +void cleanupTscQhandle() { + // destroy handle + taosCleanUpScheduler(tscQhandle); +} static int32_t registerRequest(SRequestObj *pRequest) { STscObj *pTscObj = acquireTscObj(pRequest->pTscObj->id); if (NULL == pTscObj) { @@ -156,9 +168,9 @@ void destroyTscObj(void *pObj) { if (NULL == pObj) { return; } - + STscObj *pTscObj = pObj; - int64_t tscId = pTscObj->id; + int64_t tscId = pTscObj->id; tscDebug("begin to destroy tscObj %" PRIx64 " p:%p", tscId, pTscObj); SClientHbKey connKey = {.tscRid = pTscObj->id, .connType = pTscObj->connType}; @@ -272,11 +284,11 @@ void doDestroyRequest(void *p) { if (NULL == p) { return; } - + SRequestObj *pRequest = (SRequestObj *)p; - int64_t reqId = pRequest->self; + int64_t reqId = pRequest->self; tscDebug("begin to destroy request %" PRIx64 " p:%p", reqId, pRequest); - + taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self)); if (pRequest->body.queryJob != 0) { @@ -314,7 +326,7 @@ void taos_init_imp(void) { // In the APIs of other program language, taos_cleanup is not available yet. // So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning. atexit(taos_cleanup); - + initTscQhandle(); errno = TSDB_CODE_SUCCESS; taosSeedRand(taosGetTimestampSec()); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index b6009ebbda..178ed63fb4 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -25,6 +25,7 @@ #include "tmsgtype.h" #include "tpagedbuf.h" #include "tref.h" +#include "tsched.h" static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet); static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest); @@ -56,14 +57,14 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i } bool chkRequestKilled(void* param) { - bool killed = false; + bool killed = false; SRequestObj* pRequest = acquireRequest((int64_t)param); if (NULL == pRequest || pRequest->killed) { killed = true; } releaseRequest((int64_t)param); - + return killed; } @@ -769,7 +770,7 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) { code = handleSubmitExecRes(pRequest, pRes->res, pCatalog, &epset); break; } - case TDMT_SCH_QUERY: + case TDMT_SCH_QUERY: case TDMT_SCH_MERGE_QUERY: { code = handleQueryExecRes(pRequest, pRes->res, pCatalog, &epset); break; @@ -1236,7 +1237,16 @@ void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, } } -void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { +typedef struct SchedArg { + SRpcMsg msg; + SEpSet* pEpset; +} SchedArg; + +void doProcessMsgFromServer(SSchedMsg* schedMsg) { + SchedArg* arg = (SchedArg*)schedMsg->ahandle; + SRpcMsg* pMsg = &arg->msg; + SEpSet* pEpSet = arg->pEpset; + SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle; assert(pMsg->info.ahandle != NULL); STscObj* pTscObj = NULL; @@ -1269,7 +1279,8 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { updateTargetEpSet(pSendInfo, pTscObj, pMsg, pEpSet); - SDataBuf buf = {.msgType = pMsg->msgType, .len = pMsg->contLen, .pData = NULL, .handle = pMsg->info.handle, .pEpSet = pEpSet}; + SDataBuf buf = { + .msgType = pMsg->msgType, .len = pMsg->contLen, .pData = NULL, .handle = pMsg->info.handle, .pEpSet = pEpSet}; if (pMsg->contLen > 0) { buf.pData = taosMemoryCalloc(1, pMsg->contLen); @@ -1284,6 +1295,25 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { pSendInfo->fp(pSendInfo->param, &buf, pMsg->code); rpcFreeCont(pMsg->pCont); destroySendMsgInfo(pSendInfo); + + taosMemoryFree(arg); +} + +void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { + SSchedMsg schedMsg = {0}; + + SEpSet* tEpSet = pEpSet != NULL ? taosMemoryCalloc(1, sizeof(SEpSet)) : NULL; + if (tEpSet != NULL) { + *tEpSet = *pEpSet; + } + + SchedArg* arg = taosMemoryCalloc(1, sizeof(SchedArg)); + arg->msg = *pMsg; + arg->pEpset = tEpSet; + + schedMsg.fp = doProcessMsgFromServer; + schedMsg.ahandle = arg; + taosScheduleTask(tscQhandle, &schedMsg); } TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) { @@ -1412,7 +1442,7 @@ void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertU pParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam)); tsem_init(&pParam->sem, 0, 0); } - + // convert ucs4 to native multi-bytes string pResultInfo->convertUcs4 = convertUcs4; diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index d8a9ce581a..dcfe65f6be 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -47,11 +47,9 @@ int taos_options(TSDB_OPTION option, const void *arg, ...) { atomic_store_32(&lock, 0); return ret; } - // this function may be called by user or system, or by both simultaneously. void taos_cleanup(void) { - tscInfo("start to cleanup client environment"); - + tscInfo("start to cleanup client environment"); if (atomic_val_compare_exchange_32(&sentinel, TSC_VAR_NOT_RELEASE, TSC_VAR_RELEASED) != TSC_VAR_NOT_RELEASE) { return; } @@ -74,8 +72,8 @@ void taos_cleanup(void) { catalogDestroy(); schedulerDestroy(); + cleanupTscQhandle(); rpcCleanup(); - tscInfo("all local resources released"); taosCleanupCfg(); taosCloseLog(); @@ -108,7 +106,7 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha if (pObj) { int64_t *rid = taosMemoryCalloc(1, sizeof(int64_t)); *rid = pObj->id; - return (TAOS*)rid; + return (TAOS *)rid; } return NULL; @@ -196,9 +194,9 @@ void taos_kill_query(TAOS *taos) { if (NULL == taos) { return; } - int64_t rid = *(int64_t*)taos; - - STscObj* pTscObj = acquireTscObj(rid); + int64_t rid = *(int64_t *)taos; + + STscObj *pTscObj = acquireTscObj(rid); closeAllRequests(pTscObj->pRequests); releaseTscObj(rid); } @@ -244,7 +242,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { #endif } else if (TD_RES_TMQ(res)) { - SMqRspObj *msg = ((SMqRspObj *)res); + SMqRspObj * msg = ((SMqRspObj *)res); SReqResultInfo *pResultInfo; if (msg->resIter == -1) { pResultInfo = tmqGetNextResInfo(res, true); @@ -420,7 +418,7 @@ int taos_affected_rows(TAOS_RES *res) { return 0; } - SRequestObj *pRequest = (SRequestObj *)res; + SRequestObj * pRequest = (SRequestObj *)res; SReqResultInfo *pResInfo = &pRequest->body.resInfo; return pResInfo->numOfRows; } @@ -606,7 +604,7 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) { } SReqResultInfo *pResInfo = tscGetCurResInfo(res); - TAOS_FIELD *pField = &pResInfo->userFields[columnIndex]; + TAOS_FIELD * pField = &pResInfo->userFields[columnIndex]; if (!IS_VAR_DATA_TYPE(pField->type)) { return 0; } @@ -650,8 +648,8 @@ const char *taos_get_server_info(TAOS *taos) { typedef struct SqlParseWrapper { SParseContext *pCtx; SCatalogReq catalogReq; - SRequestObj *pRequest; - SQuery *pQuery; + SRequestObj * pRequest; + SQuery * pQuery; } SqlParseWrapper; static void destorySqlParseWrapper(SqlParseWrapper *pWrapper) { @@ -672,8 +670,8 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { tscDebug("enter meta callback, code %s", tstrerror(code)); SqlParseWrapper *pWrapper = (SqlParseWrapper *)param; - SQuery *pQuery = pWrapper->pQuery; - SRequestObj *pRequest = pWrapper->pRequest; + SQuery * pQuery = pWrapper->pQuery; + SRequestObj * pRequest = pWrapper->pRequest; if (code == TSDB_CODE_SUCCESS) { code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery); @@ -722,31 +720,29 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) { return TSDB_CODE_OUT_OF_MEMORY; } - **pCxt = (SParseContext){ - .requestId = pRequest->requestId, - .requestRid = pRequest->self, - .acctId = pTscObj->acctId, - .db = pRequest->pDb, - .topicQuery = false, - .pSql = pRequest->sqlstr, - .sqlLen = pRequest->sqlLen, - .pMsg = pRequest->msgBuf, - .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE, - .pTransporter = pTscObj->pAppInfo->pTransporter, - .pStmtCb = NULL, - .pUser = pTscObj->user, - .schemalessType = pTscObj->schemalessType, - .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)), - .async = true, - .svrVer = pTscObj->sVer, - .nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes) - }; + **pCxt = (SParseContext){.requestId = pRequest->requestId, + .requestRid = pRequest->self, + .acctId = pTscObj->acctId, + .db = pRequest->pDb, + .topicQuery = false, + .pSql = pRequest->sqlstr, + .sqlLen = pRequest->sqlLen, + .pMsg = pRequest->msgBuf, + .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE, + .pTransporter = pTscObj->pAppInfo->pTransporter, + .pStmtCb = NULL, + .pUser = pTscObj->user, + .schemalessType = pTscObj->schemalessType, + .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)), + .async = true, + .svrVer = pTscObj->sVer, + .nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes)}; return TSDB_CODE_SUCCESS; } void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { SParseContext *pCxt = NULL; - STscObj *pTscObj = pRequest->pTscObj; + STscObj * pTscObj = pRequest->pTscObj; int32_t code = 0; if (pRequest->retry++ > REQUEST_TOTAL_EXEC_TIMES) { @@ -911,10 +907,10 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { return terrno; } - int64_t rid = *(int64_t*)taos; + int64_t rid = *(int64_t *)taos; const int32_t MAX_TABLE_NAME_LENGTH = 12 * 1024 * 1024; // 12MB list int32_t code = 0; - SRequestObj *pRequest = NULL; + SRequestObj * pRequest = NULL; SCatalogReq catalogReq = {0}; if (NULL == tableNameList) { From f213212faf8cfb5e9ff90d77020f10a1660b0ede Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 4 Jul 2022 18:57:06 +0800 Subject: [PATCH 7/7] fix compile error --- source/libs/transport/src/transComm.c | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 3be0c7b72d..5f6e3db615 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -480,10 +480,6 @@ bool transEpSetIsEqual(SEpSet* a, SEpSet* b) { } return true; } -static int32_t transGetRefMgt() { - // - return refMgt; -} static void transInitEnv() { refMgt = transOpenRefMgt(50000, transDestoryExHandle); @@ -517,11 +513,11 @@ void transCloseRefMgt(int32_t mgt) { } int64_t transAddExHandle(int32_t refMgt, void* p) { // acquire extern handle - return taosAddRef(transGetRefMgt(), p); + return taosAddRef(refMgt, p); } int32_t transRemoveExHandle(int32_t refMgt, int64_t refId) { // acquire extern handle - return taosRemoveRef(transGetRefMgt(), refId); + return taosRemoveRef(refMgt, refId); } void* transAcquireExHandle(int32_t refMgt, int64_t refId) { @@ -531,7 +527,7 @@ void* transAcquireExHandle(int32_t refMgt, int64_t refId) { int32_t transReleaseExHandle(int32_t refMgt, int64_t refId) { // release extern handle - return taosReleaseRef(transGetRefMgt(), refId); + return taosReleaseRef(refMgt, refId); } void transDestoryExHandle(void* handle) { if (handle == NULL) {