diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 70a519d8ae..a63aedc02c 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -16,10 +16,10 @@ #include "catalog.h" #include "clientInt.h" #include "clientLog.h" -#include "scheduler.h" -#include "trpc.h" -#include "tglobal.h" #include "clientMonitor.h" +#include "scheduler.h" +#include "tglobal.h" +#include "trpc.h" typedef struct { union { @@ -245,11 +245,9 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog goto _return; } - TSC_ERR_JRET(catalogUpdateDBVgInfo(pCatalog, - (rsp->useDbRsp->db[0] == 'i') ? - TSDB_PERFORMANCE_SCHEMA_DB : - TSDB_INFORMATION_SCHEMA_DB, - rsp->useDbRsp->uid, vgInfo)); + TSC_ERR_JRET(catalogUpdateDBVgInfo( + pCatalog, (rsp->useDbRsp->db[0] == 'i') ? TSDB_PERFORMANCE_SCHEMA_DB : TSDB_INFORMATION_SCHEMA_DB, + rsp->useDbRsp->uid, vgInfo)); } } } @@ -557,7 +555,6 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { } } - taosHashRelease(pAppHbMgr->activeInfo, pReq); return TSDB_CODE_SUCCESS; @@ -610,8 +607,8 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) { } pInst->monitorParas = pRsp.monitorParas; - tscDebug("[monitor] paras from hb, clusterId:%" PRIx64 " monitorParas threshold:%d scope:%d", - pInst->clusterId, pRsp.monitorParas.tsSlowLogThreshold, pRsp.monitorParas.tsSlowLogScope); + tscDebug("[monitor] paras from hb, clusterId:%" PRIx64 " monitorParas threshold:%d scope:%d", pInst->clusterId, + pRsp.monitorParas.tsSlowLogThreshold, pRsp.monitorParas.tsSlowLogScope); if (rspNum) { tscDebug("hb got %d rsp, %d empty rsp received before", rspNum, @@ -1109,7 +1106,8 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req if (clientHbMgr.appHbHash) { code = taosHashPut(clientHbMgr.appHbHash, &hbParam->clusterId, sizeof(uint64_t), NULL, 0); if (TSDB_CODE_SUCCESS != code) { - tscWarn("hbQueryHbReqHandle put clusterId failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code)); + tscWarn("hbQueryHbReqHandle put clusterId failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, + tstrerror(code)); return code; } } @@ -1262,7 +1260,7 @@ int32_t hbGatherAppInfo(void) { SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i); if (pAppHbMgr == NULL) continue; - int64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId; + int64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId; SAppHbReq *pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId)); if (NULL == pApp) { (void)memcpy(&req.summary, &pAppHbMgr->pAppInstInfo->summary, sizeof(req.summary)); @@ -1304,8 +1302,7 @@ static void *hbThreadFunc(void *param) { return NULL; } if (sz > 1 && !clientHbMgr.appHbHash) { - clientHbMgr.appHbHash = - taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK); + clientHbMgr.appHbHash = taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK); if (NULL == clientHbMgr.appHbHash) { tscError("taosHashInit failed"); return NULL; @@ -1325,13 +1322,13 @@ static void *hbThreadFunc(void *param) { continue; } SClientHbBatchReq *pReq = NULL; - int32_t code = hbGatherAllInfo(pAppHbMgr, &pReq); + int32_t code = hbGatherAllInfo(pAppHbMgr, &pReq); if (TSDB_CODE_SUCCESS != code || taosArrayGetP(clientHbMgr.appHbMgrs, i) == NULL) { terrno = code ? code : TSDB_CODE_OUT_OF_RANGE; tFreeClientHbBatchReq(pReq); continue; } - int tlen = tSerializeSClientHbBatchReq(NULL, 0, pReq); + int tlen = tSerializeSClientHbBatchReq(NULL, 0, pReq); if (tlen == -1) { tFreeClientHbBatchReq(pReq); break; @@ -1369,9 +1366,8 @@ static void *hbThreadFunc(void *param) { pInfo->requestObjRefId = 0; SAppInstInfo *pAppInstInfo = pAppHbMgr->pAppInstInfo; - int64_t transporterId = 0; SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp); - if (TSDB_CODE_SUCCESS != asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo)) { + if (TSDB_CODE_SUCCESS != asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, NULL, pInfo)) { tscWarn("failed to async send msg to server"); } tFreeClientHbBatchReq(pReq); @@ -1390,7 +1386,7 @@ static void *hbThreadFunc(void *param) { } static int32_t hbCreateThread() { - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; TdThreadAttr thAttr; TSC_ERR_JRET(taosThreadAttrInit(&thAttr)); TSC_ERR_JRET(taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE)); @@ -1468,9 +1464,9 @@ int32_t appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key, SAppHbMgr **pAppHbMg TSC_ERR_JRET(taosThreadMutexLock(&clientHbMgr.lock)); if (taosArrayPush(clientHbMgr.appHbMgrs, &(*pAppHbMgr)) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - (void)taosThreadMutexUnlock(&clientHbMgr.lock); - goto _return; + code = TSDB_CODE_OUT_OF_MEMORY; + (void)taosThreadMutexUnlock(&clientHbMgr.lock); + goto _return; } (*pAppHbMgr)->idx = taosArrayGetSize(clientHbMgr.appHbMgrs) - 1; TSC_ERR_JRET(taosThreadMutexUnlock(&clientHbMgr.lock)); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index d54db8ab5c..a0695bb5d3 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -311,8 +311,9 @@ int32_t transSetConnOption(uv_tcp_t* stream, int keepalive); void transRefSrvHandle(void* handle); void transUnrefSrvHandle(void* handle); -void transRefCliHandle(void* handle); -void transUnrefCliHandle(void* handle); +void transRefCliHandle(void* handle); +int32_t transUnrefCliHandle(void* handle); +int32_t transGetRefCount(void* handle); int32_t transReleaseCliHandle(void* handle); int32_t transReleaseSrvHandle(void* handle); diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index ff53d01ca0..53df76f866 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -21,7 +21,7 @@ void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int32_t numOf void (*taosCloseHandle[])(void* arg) = {transCloseServer, transCloseClient}; void (*taosRefHandle[])(void* handle) = {transRefSrvHandle, transRefCliHandle}; -void (*taosUnRefHandle[])(void* handle) = {transUnrefSrvHandle, transUnrefCliHandle}; +void (*taosUnRefHandle[])(void* handle) = {transUnrefSrvHandle, NULL}; int (*transReleaseHandle[])(void* handle) = {transReleaseSrvHandle, transReleaseCliHandle}; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index f1d1cd6284..206253f9c2 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -95,7 +95,6 @@ typedef struct SCliConn { int64_t refId; int32_t seq; - int32_t shareCnt; int8_t registered; int8_t connnected; @@ -104,17 +103,9 @@ typedef struct SCliConn { void* pInitUserReq; } SCliConn; -// #define TRANS_CONN_REF_INC(tconn) \ -// do { \ -// if (tcond) (tconn)->ref++; \ -// } while (0) - -// #define TRANS_CONN_REF_DEC(tcond) if (tcond) (tconn)\ -// do { \ -// if (tcond) (tconn)->ref--; \ -// } while (0) - -#define TRANS_CONN_REF_GET(tconn) ((tconn) ? (tconn)->ref : 0) +// #define TRANS_CONN_REF_INC(tconn) ((tconn) ? (tconn)->ref++ : 0) +// #define TRANS_CONN_REF_DEC(tconn) ((tconn) ? (tconn)->ref-- : 0) +// #define TRANS_CONN_REF_GET(tconn) ((tconn) ? (tconn)->ref : 0) typedef struct { SCliConn* conn; @@ -216,27 +207,11 @@ static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); // callback after conn to server static void cliConnCb(uv_connect_t* req, int status); static void cliAsyncCb(uv_async_t* handle); -// static void cliIdleCb(uv_idle_t* handle); -// static void cliPrepareCb(uv_prepare_t* handle); - -// static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd); static void cliSendBatchCb(uv_write_t* req, int status); SCliBatch* cliGetHeadFromList(SCliBatchList* pList); -void destroyCliConnQTable(SCliConn* conn) { - void* pIter = taosHashIterate(conn->pQTable, NULL); - while (pIter != NULL) { - int64_t* qid = taosHashGetKey(pIter, NULL); - STransCtx* ctx = pIter; - transCtxCleanup(ctx); - pIter = taosHashIterate(conn->pQTable, pIter); - } - taosHashCleanup(conn->pQTable); - conn->pQTable = NULL; -} - -// static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead); +static void destroyCliConnQTable(SCliConn* conn); static void cliHandleBatch_shareConnExcept(SCliConn* conn); static int32_t allocConnRef(SCliConn* conn, bool update); @@ -246,12 +221,10 @@ void cliResetConnTimer(SCliConn* conn); static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/); static void cliDestroy(uv_handle_t* handle); -// static int32_t cliSend(SCliConn* pConn); -// static void cliSendBatch(SCliConn* pConn); + static void cliDestroyConnMsgs(SCliConn* conn, bool destroy); -static void doFreeTimeoutMsg(void* param); -static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliReq** pReq); +static void doFreeTimeoutMsg(void* param); static void cliDestroyBatch(SCliBatch* pBatch); // cli util func @@ -259,7 +232,8 @@ static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, SReqCtx* pCtx); static FORCE_INLINE int32_t cliMayCvtFqdnToIp(SEpSet* pEpSet, const SCvtAddr* pCvtAddr); static FORCE_INLINE int32_t cliBuildExceptResp(SCliReq* pReq, STransMsg* resp); -static FORCE_INLINE int32_t cliBuildExceptRespAndNotifyCb(SCliThrd* pThrd, SCliReq* pReq, int32_t code); + +// static FORCE_INLINE int32_t cliBuildExceptRespAndNotifyCb(SCliThrd* pThrd, SCliReq* pReq, int32_t code); static FORCE_INLINE int32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn, uint32_t* ipaddr); static FORCE_INLINE int32_t cliUpdateFqdnCache(SHashObj* cache, char* fqdn); @@ -267,9 +241,6 @@ static FORCE_INLINE int32_t cliUpdateFqdnCache(SHashObj* cache, char* fqdn); static FORCE_INLINE void cliMayUpdateFqdnCache(SHashObj* cache, char* dst); // process data read from server, add decompress etc later // handle except about conn -static void cliHandleExcept(SCliConn* conn, int32_t code); -// static void cliReleaseUnfinishedMsg(SCliConn* conn); -static void cliHandleFastFail(SCliConn* pConn, int status); static void doNotifyCb(SCliReq* pReq, SCliThrd* pThrd, int32_t code); // handle req from app @@ -344,14 +315,7 @@ int32_t transHeapDelete(SHeap* heap, SCliConn* p); #define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para)) #define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pInst))->label) -// #define CONN_NO_PERSIST_BY_APP(conn) \ -// (((conn)->status == ConnNormal || (conn)->status == ConnInPool) && TRANS_CONN_REF_GET(conn) == 1) -// #define CONN_RELEASE_BY_SERVER(conn) \ -// (((conn)->status == ConnRelease || (conn)->status == ConnInPool) && TRANS_CONN_REF_GET(conn) == 1) - -#define REQUEST_NO_RESP(msg) ((msg)->info.noResp == 1) -#define REQUEST_PERSIS_HANDLE(msg) ((msg)->info.persistHandle == 1) -#define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release) +#define REQUEST_NO_RESP(msg) ((msg)->info.noResp == 1) #define EPSET_IS_VALID(epSet) ((epSet) != NULL && (epSet)->numOfEps >= 0 && (epSet)->inUse >= 0) #define EPSET_GET_SIZE(epSet) (epSet)->numOfEps @@ -396,6 +360,17 @@ void cliResetConnTimer(SCliConn* conn) { void cliHandleBatchResp(SCliConn* conn) { ASSERT(0); } +void destroyCliConnQTable(SCliConn* conn) { + void* pIter = taosHashIterate(conn->pQTable, NULL); + while (pIter != NULL) { + int64_t* qid = taosHashGetKey(pIter, NULL); + STransCtx* ctx = pIter; + transCtxCleanup(ctx); + pIter = taosHashIterate(conn->pQTable, pIter); + } + taosHashCleanup(conn->pQTable); + conn->pQTable = NULL; +} bool filteBySeq(void* key, void* arg) { int32_t* seq = arg; SCliReq* pReq = QUEUE_DATA(key, SCliReq, q); @@ -492,13 +467,13 @@ int32_t cliConnMayHandleState_releaseReq(SCliConn* conn, STransMsgHead* pHead) { transQueueRemoveByFilter(&conn->reqsToSend, filterByQid, &qId, &set, -1); while (!QUEUE_IS_EMPTY(&set)) { - queue* el = QUEUE_HEAD(&set); - SCliReq* pReq = QUEUE_DATA(el, SCliReq, q); + queue* el = QUEUE_HEAD(&set); QUEUE_REMOVE(el); - if (pReq->msg.info.notFreeAhandle == 0 && pThrd != NULL && pThrd->destroyAhandleFp != NULL) { - pThrd->destroyAhandleFp(pReq->ctx->ahandle); - } - destroyReq(pReq); + SCliReq* pReq = QUEUE_DATA(el, SCliReq, q); + + STraceId* trace = &pReq->msg.info.traceId; + tGDebug("start to free msg %p", pReq); + destroyReqWrapper(pReq, pThrd); } taosMemoryFree(pHead); return 1; @@ -521,7 +496,7 @@ int32_t cliMayHandleState(SCliConn* conn, STransMsgHead* pHead, STransMsg* pResp TMSG_INFO(pHead->msgType)); return 0; } -void cliHandleResp2(SCliConn* conn) { +void cliHandleResp(SCliConn* conn) { int32_t code = 0; SCliThrd* pThrd = conn->hostThrd; STrans* pInst = pThrd->pInst; @@ -602,35 +577,21 @@ void cliHandleResp2(SCliConn* conn) { if (cliMayRecycleConn(conn)) { return; } + transRefCliHandle(conn); (void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); } -void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { return; } -void cliHandleExcept(SCliConn* conn, int32_t code) { - tTrace("%s conn %p except ref:%d", CONN_GET_INST_LABEL(conn), conn, TRANS_CONN_REF_GET(conn)); - if (code != TSDB_CODE_RPC_FQDN_ERROR) { - code = -1; - } - cliHandleExceptImpl(conn, -1); -} - void cliConnTimeout(uv_timer_t* handle) { SCliConn* conn = handle->data; SCliThrd* pThrd = conn->hostThrd; + int32_t ref = transUnrefCliHandle(conn); + if (ref <= 0) { + cliResetConnTimer(conn); + return; + } - tTrace("%s conn %p conn timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, TRANS_CONN_REF_GET(conn)); - - cliResetConnTimer(conn); - // cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); - // cliHandleFastFail(conn, UV_ECANCELED); + tTrace("%s conn %p conn timeout", CONN_GET_INST_LABEL(conn)); } -// void cliReadTimeoutCb(uv_timer_t* handle) { -// // set up timeout cb -// SCliConn* conn = handle->data; -// tTrace("%s conn %p timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, TRANS_CONN_REF_GET(conn)); -// (void)uv_read_stop(conn->stream); -// cliHandleExceptImpl(conn, TSDB_CODE_RPC_TIMEOUT); -// } void* createConnPool(int size) { // thread local, no lock @@ -641,25 +602,12 @@ void* destroyConnPool(SCliThrd* pThrd) { SConnList* connList = taosHashIterate((SHashObj*)pool, NULL); while (connList != NULL) { while (!QUEUE_IS_EMPTY(&connList->conns)) { - queue* h = QUEUE_HEAD(&connList->conns); + queue* h = QUEUE_HEAD(&connList->conns); + QUEUE_REMOVE(h); SCliConn* c = QUEUE_DATA(h, SCliConn, q); cliDestroyConn(c, true); } - SMsgList* msglist = connList->list; - while (!QUEUE_IS_EMPTY(&msglist->msgQ)) { - queue* h = QUEUE_HEAD(&msglist->msgQ); - QUEUE_REMOVE(h); - - SCliReq* pReq = QUEUE_DATA(h, SCliReq, q); - - transDQCancel(pThrd->waitConnQueue, pReq->ctx->task); - pReq->ctx->task = NULL; - - doNotifyCb(pReq, pThrd, TSDB_CODE_RPC_MAX_SESSIONS); - } - taosMemoryFree(msglist); - connList = taosHashIterate((SHashObj*)pool, connList); } taosHashCleanup(pool); @@ -798,117 +746,6 @@ static int32_t cliGetOrCreateConn(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pCo } return code; } -static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliReq** pReq) { - void* pool = pThrd->pool; - STrans* pInst = pThrd->pInst; - size_t klen = strlen(key); - SConnList* plist = taosHashGet((SHashObj*)pool, key, klen); - if (plist == NULL) { - SConnList list = {0}; - (void)taosHashPut((SHashObj*)pool, key, klen, (void*)&list, sizeof(list)); - plist = taosHashGet(pool, key, klen); - - SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); - if (nList == NULL) { - // doNotifyApp(*pReq, pThrd, TSDB_CODE_OUT_OF_MEMORY); - *pReq = NULL; - return NULL; - } - QUEUE_INIT(&nList->msgQ); - nList->numOfConn++; - - QUEUE_INIT(&plist->conns); - plist->list = nList; - } - - // STraceId* trace = &(*pReq)->msg.info.traceId; - // // no avaliable conn in pool - // if (QUEUE_IS_EMPTY(&plist->conns)) { - // SMsgList* list = plist->list; - // if ((list)->numOfConn >= pInst->connLimitNum) { - // STraceId* trace = &(*pReq)->msg.info.traceId; - // if (pInst->notWaitAvaliableConn || (pInst->noDelayFp != NULL && pInst->noDelayFp((*pReq)->msg.msgType))) { - // tDebug("%s msg %s not to send, reason: %s", pInst->label, TMSG_INFO((*pReq)->msg.msgType), - // tstrerror(TSDB_CODE_RPC_NETWORK_BUSY)); - // doNotifyCb(*pReq, pThrd, TSDB_CODE_RPC_NETWORK_BUSY); - // *pReq = NULL; - // return NULL; - // } - - // STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); - // if (arg == NULL) { - // doNotifyCb(*pReq, pThrd, TSDB_CODE_OUT_OF_MEMORY); - // *pReq = NULL; - // return NULL; - // } - // arg->param1 = *pReq; - // arg->param2 = pThrd; - - // SDelayTask* task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pInst->timeToGetConn); - // if (task == NULL) { - // taosMemoryFree(arg); - // doNotifyCb(*pReq, pThrd, TSDB_CODE_OUT_OF_MEMORY); - // *pReq = NULL; - // return NULL; - // } - // (*pReq)->ctx->task = task; - // tGTrace("%s msg %s delay to send, wait for avaiable connect", pInst->label, TMSG_INFO((*pReq)->msg.msgType)); - // QUEUE_PUSH(&(list)->msgQ, &(*pReq)->q); - // *pReq = NULL; - // } else { - // // send msg in delay queue - // if (!(QUEUE_IS_EMPTY(&(list)->msgQ))) { - // STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); - // if (arg == NULL) { - // doNotifyCb(*pReq, pThrd, TSDB_CODE_OUT_OF_MEMORY); - // *pReq = NULL; - // return NULL; - // } - // arg->param1 = *pReq; - // arg->param2 = pThrd; - - // SDelayTask* task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pInst->timeToGetConn); - // if (task == NULL) { - // taosMemoryFree(arg); - // doNotifyCb(*pReq, pThrd, TSDB_CODE_OUT_OF_MEMORY); - // *pReq = NULL; - // return NULL; - // } - - // (*pReq)->ctx->task = task; - // tGTrace("%s msg %s delay to send, wait for avaiable connect", pInst->label, TMSG_INFO((*pReq)->msg.msgType)); - - // QUEUE_PUSH(&(list)->msgQ, &(*pReq)->q); - // queue* h = QUEUE_HEAD(&(list)->msgQ); - // QUEUE_REMOVE(h); - // SCliReq* ans = QUEUE_DATA(h, SCliReq, q); - - // *pReq = ans; - - // trace = &(*pReq)->msg.info.traceId; - // tGTrace("%s msg %s pop from delay queue, start to send", pInst->label, TMSG_INFO((*pReq)->msg.msgType)); - // transDQCancel(pThrd->waitConnQueue, ans->ctx->task); - // } - // list->numOfConn++; - // } - // tDebug("%s numOfConn: %d, limit: %d, dst:%s", pInst->label, list->numOfConn, pInst->connLimitNum, key); - // return NULL; - // } - - queue* h = QUEUE_TAIL(&plist->conns); - plist->size -= 1; - QUEUE_REMOVE(h); - - SCliConn* conn = QUEUE_DATA(h, SCliConn, q); - conn->status = ConnNormal; - QUEUE_INIT(&conn->q); - tDebug("conn %p get from pool, pool size: %d, dst: %s", conn, conn->list->size, conn->dstAddr); - if (conn->task != NULL) { - transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task); - conn->task = NULL; - } - return conn; -} static void addConnToPool(void* pool, SCliConn* conn) { if (conn->status == ConnInPool) { return; @@ -919,34 +756,10 @@ static void addConnToPool(void* pool, SCliConn* conn) { SCliThrd* thrd = conn->hostThrd; cliResetConnTimer(conn); - if (TRANS_CONN_REF_GET(conn) > 1) { - transUnrefCliHandle(conn); - } - // cliDestroyConnMsgs(conn, false); - if (conn->list == NULL && conn->dstAddr != NULL) { conn->list = taosHashGet((SHashObj*)pool, conn->dstAddr, strlen(conn->dstAddr)); } - SConnList* pList = conn->list; - SMsgList* msgList = pList->list; - // if (!QUEUE_IS_EMPTY(&msgList->msgQ)) { - // queue* h = QUEUE_HEAD(&(msgList)->msgQ); - // QUEUE_REMOVE(h); - - // SCliReq* pReq = QUEUE_DATA(h, SCliReq, q); - - // transDQCancel(thrd->waitConnQueue, pReq->ctx->task); - // pReq->ctx->task = NULL; - - // transCtxMerge(&conn->ctx, &pReq->ctx->userCtx); - // (void)transQueuePush(&conn->reqsToSend, pReq); - - // conn->status = ConnNormal; - // (void)cliSend2(conn); - // return; - // } - conn->status = ConnInPool; QUEUE_PUSH(&conn->list->conns, &conn->q); conn->list->size += 1; @@ -998,30 +811,6 @@ static int32_t allocConnRef(SCliConn* conn, bool update) { return 0; } -static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) { - if (handle == 0) return -1; - if (update) { - (void)transReleaseExHandle(transGetRefMgt(), conn->refId); - (void)transRemoveExHandle(transGetRefMgt(), conn->refId); - conn->refId = -1; - } - SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle); - if (exh == NULL) { - return -1; - } - taosWLockLatch(&exh->latch); - exh->handle = conn; - exh->pThrd = conn->hostThrd; - taosWUnLockLatch(&exh->latch); - - conn->refId = exh->refId; - - tDebug("conn %p specified by %" PRId64 "", conn, handle); - - (void)transReleaseExHandle(transGetRefMgt(), handle); - return 0; -} - static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { SCliConn* conn = handle->data; SConnBuffer* pBuf = &conn->readBuf; @@ -1040,17 +829,23 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { uv_fileno((uv_handle_t*)handle, &fd); setsockopt(fd, IPPROTO_TCP, TCP_QUICKACK, (int[]){1}, sizeof(int)); - SCliConn* conn = handle->data; + SCliConn* conn = handle->data; + int32_t ref = transUnrefCliHandle(conn); + if (ref <= 0) { + return; + } + SConnBuffer* pBuf = &conn->readBuf; if (nread > 0) { pBuf->len += nread; while (transReadComplete(pBuf)) { tTrace("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn); if (pBuf->invalid) { - return cliHandleBatch_shareConnExcept(conn); + transUnrefCliHandle(conn); + return; break; } else { - cliHandleResp2(conn); + cliHandleResp(conn); } } return; @@ -1064,10 +859,9 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { return; } if (nread < 0) { - tDebug("%s conn %p read error:%s, ref:%d", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread), - TRANS_CONN_REF_GET(conn)); + tDebug("%s conn %p read error:%s, ref:%d", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread), ref); conn->broken = true; - cliHandleExcept(conn, -1); + transUnrefCliHandle(conn); } } @@ -1151,8 +945,6 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int conn->stream->data = conn; conn->connReq.data = conn; - transRefCliHandle(conn); - *pCliConn = conn; return code; _failed: @@ -1173,47 +965,7 @@ _failed: taosMemoryFree(conn); return code; } -static void cliDestroyConn(SCliConn* conn, bool clear) { - SCliThrd* pThrd = conn->hostThrd; - tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); - conn->broken = true; - QUEUE_REMOVE(&conn->q); - - conn->broken = true; - if (conn->list == NULL) { - conn->list = taosHashGet((SHashObj*)pThrd->pool, conn->dstAddr, strlen(conn->dstAddr)); - } - - if (conn->list) { - SConnList* list = conn->list; - list->list->numOfConn--; - if (conn->status == ConnInPool) { - list->size--; - } - } - conn->list = NULL; - - (void)transReleaseExHandle(transGetRefMgt(), conn->refId); - (void)transRemoveExHandle(transGetRefMgt(), conn->refId); - conn->refId = -1; - - if (conn->task != NULL) { - transDQCancel(pThrd->timeoutQueue, conn->task); - conn->task = NULL; - } - - if (conn->pInitUserReq) { - taosMemoryFree(conn->pInitUserReq); - conn->pInitUserReq = NULL; - } - - if (clear) { - if (!uv_is_closing((uv_handle_t*)conn->stream)) { - (void)uv_read_stop(conn->stream); - uv_close((uv_handle_t*)conn->stream, cliDestroy); - } - } -} +static void cliDestroyConn(SCliConn* conn, bool clear) { cliHandleBatch_shareConnExcept(conn); } static void cliDestroy(uv_handle_t* handle) { if (uv_handle_get_type(handle) != UV_TCP || handle->data == NULL) { return; @@ -1239,7 +991,6 @@ static void cliDestroy(uv_handle_t* handle) { tDebug("%s conn %p destroy state %ld", CONN_GET_INST_LABEL(conn), conn, *qid); } - // cliDestroyConnMsgs(conn, true); destroyCliConnQTable(conn); if (conn->pInitUserReq) { @@ -1283,7 +1034,6 @@ static void cliHandleBatch_shareConnExcept(SCliConn* conn) { if (pReq) { resp.info.traceId = pReq->msg.info.traceId; } - // resp.info.traceId = pReq ? pReq->msg.info.traceId : {0, 0}; // handle noresp and inter manage msg if (pCtx == NULL || REQUEST_NO_RESP(&pReq->msg)) { @@ -1300,7 +1050,8 @@ static void cliHandleBatch_shareConnExcept(SCliConn* conn) { destroyReq(pReq); } } - if (!uv_is_closing((uv_handle_t*)conn->stream)) { + int8_t ref = transGetRefCount(conn); + if (ref == 0 && !uv_is_closing((uv_handle_t*)conn->stream)) { uv_close((uv_handle_t*)conn->stream, cliDestroy); } } @@ -1330,16 +1081,21 @@ static void cliConnRmReqs(SCliConn* conn) { static void cliSendBatch_shareConnCb(uv_write_t* req, int status) { SCliConn* conn = req->data; SCliThrd* pThrd = conn->hostThrd; - conn->shareCnt -= 1; + + int32_t ref = transUnrefCliHandle(conn); + if (ref <= 0) { + taosMemoryFree(req); + return; + } cliConnRmReqs(conn); if (status != 0) { tDebug("%s conn %p failed to send msg, reason:%s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(status)); - if (!uv_is_closing((uv_handle_t*)&conn->stream)) { - cliHandleBatch_shareConnExcept(conn); - } + transUnrefCliHandle(conn); return; } + + transRefCliHandle(conn); (void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); taosMemoryFree(req); @@ -1442,9 +1198,9 @@ void cliSendBatch_shareConn(SCliConn* pConn) { return; } + transRefCliHandle(pConn); uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t)); req->data = pConn; - pConn->shareCnt += 1; tDebug("%s conn %p start to send msg, batch size:%d, len:%d", CONN_GET_INST_LABEL(pConn), pConn, size, totalLen); uv_write(req, (uv_stream_t*)pConn->stream, wb, j, cliSendBatch_shareConnCb); taosMemoryFree(wb); @@ -1514,12 +1270,14 @@ static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn) { return code; } + transRefCliHandle(conn); ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); if (ret != 0) { tError("failed connect to %s, reason:%s", conn->dstAddr, uv_err_name(ret)); TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception1); } + transRefCliHandle(conn); ret = uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0); if (ret != 0) { tError("%s conn %p failed to start timer, reason:%s", transLabel(pInst), conn, uv_err_name(ret)); @@ -1540,39 +1298,11 @@ _exception2: // cliResetConnTimer(conn); // cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); - // cliHandleFastFail(conn, code); // // taosMemoryFree(conn); return code; } -static void cliHandleFastFail_resp(SCliConn* pConn, int status) { - SCliThrd* pThrd = pConn->hostThrd; - STrans* pInst = pThrd->pInst; - // //kSCliReq* pReq = transQueueGet(&pConn->reqsToSend, 0); - - // STraceId* trace = &pReq->msg.info.traceId; - // tGError("%s msg %s failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn), - // TMSG_INFO(pReq->msg.msgType), pConn, pConn->dstAddr, uv_strerror(status)); -} - -static void cliHandleFastFail_noresp(SCliConn* pConn, int status) { - tError("%s batch msg failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn), pConn, - pConn->dstAddr, uv_strerror(status)); - cliDestroyBatch(pConn->pBatch); - pConn->pBatch = NULL; -} -static void cliHandleFastFail(SCliConn* pConn, int status) { - if (status == -1) status = UV_EADDRNOTAVAIL; - - if (pConn->pBatch == NULL) { - cliHandleFastFail_resp(pConn, status); - } else { - cliHandleFastFail_noresp(pConn, status); - } - cliHandleExcept(pConn, status); -} - int32_t cliConnSetSockInfo(SCliConn* pConn) { struct sockaddr peername, sockname; int addrlen = sizeof(peername); @@ -1593,20 +1323,20 @@ int32_t cliConnSetSockInfo(SCliConn* pConn) { return 0; }; -static int32_t cliBuildExeceptMsg(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) { - SCliThrd* pThrd = pConn->hostThrd; - STrans* pInst = pThrd->pInst; - memset(pResp, 0, sizeof(STransMsg)); - STransMsg resp = {0}; - resp.contLen = 0; - resp.pCont = NULL; - resp.msgType = pReq->msg.msgType + 1; - resp.info.ahandle = pReq->ctx->ahandle; - resp.info.traceId = pReq->msg.info.traceId; - resp.info.hasEpSet = false; - resp.info.cliVer = pInst->compatibilityVer; - return 0; -} +// static int32_t cliBuildExeceptMsg(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) { +// SCliThrd* pThrd = pConn->hostThrd; +// STrans* pInst = pThrd->pInst; +// memset(pResp, 0, sizeof(STransMsg)); +// STransMsg resp = {0}; +// resp.contLen = 0; +// resp.pCont = NULL; +// resp.msgType = pReq->msg.msgType + 1; +// resp.info.ahandle = pReq->ctx->ahandle; +// resp.info.traceId = pReq->msg.info.traceId; +// resp.info.hasEpSet = false; +// resp.info.cliVer = pInst->compatibilityVer; +// return 0; +// } bool filteGetAll(void* q, void* arg) { return true; } void cliConnCb(uv_connect_t* req, int status) { @@ -1614,8 +1344,13 @@ void cliConnCb(uv_connect_t* req, int status) { SCliThrd* pThrd = pConn->hostThrd; bool timeout = false; + int32_t ref = transUnrefCliHandle(pConn); + if (ref <= 0) { + return; + } if (pConn->timer == NULL) { timeout = true; + return; } else { cliResetConnTimer(pConn); } @@ -1625,14 +1360,11 @@ void cliConnCb(uv_connect_t* req, int status) { if (status != 0) { tDebug("%s conn %p failed to connect to %s, reason:%s", CONN_GET_INST_LABEL(pConn), pConn, pConn->dstAddr, uv_strerror(status)); - cliHandleBatch_shareConnExcept(pConn); + transUnrefCliHandle(pConn); return; } pConn->connnected = 1; - cliConnSetSockInfo(pConn); - - // addConnToHeapCache(pThrd->connHeapCache, pConn); tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn); (void)cliSend2(pConn); @@ -1676,68 +1408,9 @@ static void cliHandleQuit(SCliThrd* pThrd, SCliReq* pReq) { (void)destroyConnPool(pThrd); (void)uv_walk(pThrd->loop, cliWalkCb, NULL); } -static void cliHandleRelease(SCliThrd* pThrd, SCliReq* pReq) { - return; - // int64_t refId = (int64_t)(pReq->msg.info.handle); - // SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId); - // if (exh == NULL) { - // tDebug("%" PRId64 " already released", refId); - // destroyReq(pReq); - // return; - // } - - // taosRLockLatch(&exh->latch); - // SCliConn* conn = exh->handle; - // taosRUnLockLatch(&exh->latch); - - // (void)transReleaseExHandle(transGetRefMgt(), refId); - // tDebug("%s conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn); - - // if (TRANS_CONN_REF_GET(conn) == 2) { - // transUnrefCliHandle(conn); - // if (!transQueuePush(&conn->reqsToSend, &pReq->q)) { - // return; - // } - // (void)cliSend2(conn); - // } else { - // tError("%s conn %p already released", CONN_GET_INST_LABEL(conn), conn); - // destroyReq(pReq); - // } -} +static void cliHandleRelease(SCliThrd* pThrd, SCliReq* pReq) { return; } static void cliHandleUpdate(SCliThrd* pThrd, SCliReq* pReq) { return; } -SCliConn* cliGetConn(SCliReq** pReq, SCliThrd* pThrd, bool* ignore, char* addr) { - SReqCtx* pCtx = (*pReq)->ctx; - SCliConn* conn = NULL; - - int64_t refId = (int64_t)((*pReq)->msg.info.handle); - if (refId != 0) { - SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId); - if (exh == NULL) { - tError("failed to get conn, refId: %" PRId64 "", refId); - *ignore = true; - return NULL; - } else { - taosRLockLatch(&exh->latch); - conn = exh->handle; - taosRUnLockLatch(&exh->latch); - if (conn == NULL) { - conn = getConnFromPool2(pThrd, addr, pReq); - if (conn != NULL) specifyConnRef(conn, true, refId); - } - (void)transReleaseExHandle(transGetRefMgt(), refId); - } - return conn; - }; - - conn = getConnFromPool2(pThrd, addr, pReq); - if (conn != NULL) { - tTrace("%s conn %p get from conn pool:%p", CONN_GET_INST_LABEL(conn), conn, pThrd->pool); - } else { - tTrace("%s not found conn in conn pool:%p, dst:%s", ((STrans*)pThrd->pInst)->label, pThrd->pool, addr); - } - return conn; -} FORCE_INLINE int32_t cliMayCvtFqdnToIp(SEpSet* pEpSet, const SCvtAddr* pCvtAddr) { if (pCvtAddr->cvt == false) { if (EPSET_IS_VALID(pEpSet)) { @@ -1775,19 +1448,19 @@ FORCE_INLINE int32_t cliBuildExceptResp(SCliReq* pReq, STransMsg* pResp) { return 0; } -FORCE_INLINE int32_t cliBuildExceptRespAndNotifyCb(SCliThrd* pThrd, SCliReq* pReq, int32_t code) { - STrans* pInst = pThrd->pInst; +// FORCE_INLINE int32_t cliBuildExceptRespAndNotifyCb(SCliThrd* pThrd, SCliReq* pReq, int32_t code) { +// STrans* pInst = pThrd->pInst; - STransMsg resp = {.code = code}; - code = cliBuildExceptResp(pReq, &resp); - if (code != 0) { - return code; - } - resp.info.cliVer = pInst->compatibilityVer; - pInst->cfp(pInst->parent, &resp, NULL); +// STransMsg resp = {.code = code}; +// code = cliBuildExceptResp(pReq, &resp); +// if (code != 0) { +// return code; +// } +// resp.info.cliVer = pInst->compatibilityVer; +// pInst->cfp(pInst->parent, &resp, NULL); - return 0; -} +// return 0; +// } static FORCE_INLINE int32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn, uint32_t* ip) { int32_t code = 0; @@ -2130,39 +1803,7 @@ static int32_t createBatch(SCliBatch** ppBatch, SCliBatchList* pList, SCliReq* p *ppBatch = pBatch; return 0; } -static void cliDoBatchReq(queue* wq, SCliThrd* pThrd) { - STrans* pInst = pThrd->pInst; - int32_t code = 0; - - int count = 0; - while (!QUEUE_IS_EMPTY(wq)) { - queue* h = QUEUE_HEAD(wq); - QUEUE_REMOVE(h); - - SCliReq* pReq = QUEUE_DATA(h, SCliReq, q); - - // if (pReq->type == Normal && REQUEST_NO_RESP(&pReq->msg)) { - // cliBuildBatch(pReq, h, pThrd); - // continue; - // } - (*cliAsyncHandle[pReq->type])(pThrd, pReq); - count++; - } - - // void** pIter = taosHashIterate(pThrd->batchCache, NULL); - // while (pIter != NULL) { - // SCliBatchList* batchList = (SCliBatchList*)(*pIter); - // SCliBatch* batch = cliGetHeadFromList(batchList); - // if (batch != NULL) { - // cliHandleBatchReq(batch, pThrd); - // } - // pIter = (void**)taosHashIterate(pThrd->batchCache, pIter); - // } - - if (count >= 2) { - tTrace("cli process batch size:%d", count); - } -} +static void cliDoBatchReq(queue* wq, SCliThrd* pThrd) { return cliDoReq(wq, pThrd); } static void cliAsyncCb(uv_async_t* handle) { SAsyncItem* item = handle->data; @@ -2180,42 +1821,13 @@ static void cliAsyncCb(uv_async_t* handle) { if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd, pThrd->stopMsg); } -void cliDestroyConnMsgs(SCliConn* conn, bool destroy) { - // transCtxCleanup(&conn->ctx); - // cliReleaseUnfinishedMsg(conn); - if (destroy == 1) { - transQueueDestroy(&conn->reqsToSend); - } else { - transQueueClear(&conn->reqsToSend); - } - transQueueDestroy(&conn->reqsSentOut); -} - -void cliConnFreeMsgs(SCliConn* conn) { - SCliThrd* pThrd = conn->hostThrd; - STrans* pInst = pThrd->pInst; - - // for (int i = 0; i < transQueueSize(&conn->reqsToSend); i++) { - // SCliReq* cmsg = transQueueGet(&conn->reqsToSend, i); - // if (cmsg->type == Release || REQUEST_NO_RESP(&cmsg->msg) || cmsg->msg.msgType == TDMT_SCH_DROP_TASK) { - // continue; - // } - - // if (cliBuildExceptRespAndNotifyCb(pThrd, cmsg, 0) != 0) { - // continue; - // } - - // cmsg->ctx->ahandle = NULL; - // } -} - static FORCE_INLINE void destroyReq(void* arg) { SCliReq* pReq = arg; if (pReq == NULL) { return; } - - tDebug("free memory:%p, free ctx: %p", pReq, pReq->ctx); + STraceId* trace = &pReq->msg.info.traceId; + tGDebug("free memory:%p, free ctx: %p", pReq, pReq->ctx); if (pReq->ctx) destroyReqCtx(pReq->ctx); transFreeMsg(pReq->msg.pCont); @@ -2226,8 +1838,11 @@ static FORCE_INLINE void destroyReqWrapper(void* arg, void* param) { SCliReq* pReq = arg; SCliThrd* pThrd = param; - if (pReq->msg.info.notFreeAhandle == 0 && pThrd != NULL) { - if (pThrd->destroyAhandleFp) (*pThrd->destroyAhandleFp)(pReq->msg.info.ahandle); + + if (pReq->ctx != NULL && pReq->ctx->ahandle != NULL) { + if (pReq->msg.info.notFreeAhandle == 0 && pThrd != NULL && pThrd->destroyAhandleFp != NULL) { + (*pThrd->destroyAhandleFp)(pReq->ctx->ahandle); + } } destroyReq(pReq); } @@ -2237,19 +1852,7 @@ static FORCE_INLINE void destroyReqAndAhanlde(void* param) { STaskArg* arg = param; SCliReq* pReq = arg->param1; SCliThrd* pThrd = arg->param2; - - if (pReq->msg.info.notFreeAhandle == 0 && pThrd != NULL && pThrd->destroyAhandleFp != NULL) { - pThrd->destroyAhandleFp(pReq->ctx->ahandle); - } - - if (pReq->msg.info.handle != 0) { - (void)transReleaseExHandle(transGetRefMgt(), (int64_t)pReq->msg.info.handle); - (void)transRemoveExHandle(transGetRefMgt(), (int64_t)pReq->msg.info.handle); - } - - destroyReqCtx(pReq->ctx); - transFreeMsg(pReq->msg.pCont); - taosMemoryFree(pReq); + destroyReqWrapper(pReq, pThrd); } static void* cliWorkThread(void* arg) { @@ -2404,11 +2007,6 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) { TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); } - // pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); - // if (pThrd->failFastCache == NULL) { - // TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); - // } - pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); if (pThrd->batchCache == NULL) { TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); @@ -2566,7 +2164,14 @@ static FORCE_INLINE void doCloseIdleConn(void* param) { STaskArg* arg = param; SCliConn* conn = arg->param1; tDebug("%s conn %p idle, close it", CONN_GET_INST_LABEL(conn), conn); - conn->task = NULL; + + int32_t ref = transUnrefCliHandle(conn); + if (ref <= 0) { + conn->task = NULL; + taosMemoryFree(arg); + return; + } + cliDestroyConn(conn, true); taosMemoryFree(arg); } @@ -2945,17 +2550,26 @@ void transRefCliHandle(void* handle) { tTrace("%s conn %p ref %d", CONN_GET_INST_LABEL(conn), conn, conn->ref); } -void transUnrefCliHandle(void* handle) { +int32_t transUnrefCliHandle(void* handle) { if (handle == NULL) { - return; + return 0; } SCliConn* conn = (SCliConn*)handle; - conn->ref--; + int32_t ref = conn->ref--; tTrace("%s conn %p ref:%d", CONN_GET_INST_LABEL(conn), conn, conn->ref); if (conn->ref == 0) { cliDestroyConn(conn, true); } + return ref; +} + +int32_t transGetRefCount(void* handle) { + if (handle == NULL) { + return 0; + } + SCliConn* conn = (SCliConn*)handle; + return conn->ref; } static FORCE_INLINE SCliThrd* transGetWorkThrdFromHandle(STrans* trans, int64_t handle) { SCliThrd* pThrd = NULL;