opt parameter

This commit is contained in:
yihaoDeng 2024-09-13 13:32:08 +08:00
parent 24a2da04c1
commit d3634f8999
4 changed files with 146 additions and 535 deletions

View File

@ -16,10 +16,10 @@
#include "catalog.h"
#include "clientInt.h"
#include "clientLog.h"
#include "scheduler.h"
#include "trpc.h"
#include "tglobal.h"
#include "clientMonitor.h"
#include "scheduler.h"
#include "tglobal.h"
#include "trpc.h"
typedef struct {
union {
@ -245,11 +245,9 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
goto _return;
}
TSC_ERR_JRET(catalogUpdateDBVgInfo(pCatalog,
(rsp->useDbRsp->db[0] == 'i') ?
TSDB_PERFORMANCE_SCHEMA_DB :
TSDB_INFORMATION_SCHEMA_DB,
rsp->useDbRsp->uid, vgInfo));
TSC_ERR_JRET(catalogUpdateDBVgInfo(
pCatalog, (rsp->useDbRsp->db[0] == 'i') ? TSDB_PERFORMANCE_SCHEMA_DB : TSDB_INFORMATION_SCHEMA_DB,
rsp->useDbRsp->uid, vgInfo));
}
}
}
@ -557,7 +555,6 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
}
}
taosHashRelease(pAppHbMgr->activeInfo, pReq);
return TSDB_CODE_SUCCESS;
@ -610,8 +607,8 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
}
pInst->monitorParas = pRsp.monitorParas;
tscDebug("[monitor] paras from hb, clusterId:%" PRIx64 " monitorParas threshold:%d scope:%d",
pInst->clusterId, pRsp.monitorParas.tsSlowLogThreshold, pRsp.monitorParas.tsSlowLogScope);
tscDebug("[monitor] paras from hb, clusterId:%" PRIx64 " monitorParas threshold:%d scope:%d", pInst->clusterId,
pRsp.monitorParas.tsSlowLogThreshold, pRsp.monitorParas.tsSlowLogScope);
if (rspNum) {
tscDebug("hb got %d rsp, %d empty rsp received before", rspNum,
@ -1109,7 +1106,8 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
if (clientHbMgr.appHbHash) {
code = taosHashPut(clientHbMgr.appHbHash, &hbParam->clusterId, sizeof(uint64_t), NULL, 0);
if (TSDB_CODE_SUCCESS != code) {
tscWarn("hbQueryHbReqHandle put clusterId failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
tscWarn("hbQueryHbReqHandle put clusterId failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId,
tstrerror(code));
return code;
}
}
@ -1262,7 +1260,7 @@ int32_t hbGatherAppInfo(void) {
SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
if (pAppHbMgr == NULL) continue;
int64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId;
int64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId;
SAppHbReq *pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId));
if (NULL == pApp) {
(void)memcpy(&req.summary, &pAppHbMgr->pAppInstInfo->summary, sizeof(req.summary));
@ -1304,8 +1302,7 @@ static void *hbThreadFunc(void *param) {
return NULL;
}
if (sz > 1 && !clientHbMgr.appHbHash) {
clientHbMgr.appHbHash =
taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK);
clientHbMgr.appHbHash = taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK);
if (NULL == clientHbMgr.appHbHash) {
tscError("taosHashInit failed");
return NULL;
@ -1325,13 +1322,13 @@ static void *hbThreadFunc(void *param) {
continue;
}
SClientHbBatchReq *pReq = NULL;
int32_t code = hbGatherAllInfo(pAppHbMgr, &pReq);
int32_t code = hbGatherAllInfo(pAppHbMgr, &pReq);
if (TSDB_CODE_SUCCESS != code || taosArrayGetP(clientHbMgr.appHbMgrs, i) == NULL) {
terrno = code ? code : TSDB_CODE_OUT_OF_RANGE;
tFreeClientHbBatchReq(pReq);
continue;
}
int tlen = tSerializeSClientHbBatchReq(NULL, 0, pReq);
int tlen = tSerializeSClientHbBatchReq(NULL, 0, pReq);
if (tlen == -1) {
tFreeClientHbBatchReq(pReq);
break;
@ -1369,9 +1366,8 @@ static void *hbThreadFunc(void *param) {
pInfo->requestObjRefId = 0;
SAppInstInfo *pAppInstInfo = pAppHbMgr->pAppInstInfo;
int64_t transporterId = 0;
SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp);
if (TSDB_CODE_SUCCESS != asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo)) {
if (TSDB_CODE_SUCCESS != asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, NULL, pInfo)) {
tscWarn("failed to async send msg to server");
}
tFreeClientHbBatchReq(pReq);
@ -1390,7 +1386,7 @@ static void *hbThreadFunc(void *param) {
}
static int32_t hbCreateThread() {
int32_t code = TSDB_CODE_SUCCESS;
int32_t code = TSDB_CODE_SUCCESS;
TdThreadAttr thAttr;
TSC_ERR_JRET(taosThreadAttrInit(&thAttr));
TSC_ERR_JRET(taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE));
@ -1468,9 +1464,9 @@ int32_t appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key, SAppHbMgr **pAppHbMg
TSC_ERR_JRET(taosThreadMutexLock(&clientHbMgr.lock));
if (taosArrayPush(clientHbMgr.appHbMgrs, &(*pAppHbMgr)) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
(void)taosThreadMutexUnlock(&clientHbMgr.lock);
goto _return;
code = TSDB_CODE_OUT_OF_MEMORY;
(void)taosThreadMutexUnlock(&clientHbMgr.lock);
goto _return;
}
(*pAppHbMgr)->idx = taosArrayGetSize(clientHbMgr.appHbMgrs) - 1;
TSC_ERR_JRET(taosThreadMutexUnlock(&clientHbMgr.lock));

View File

@ -311,8 +311,9 @@ int32_t transSetConnOption(uv_tcp_t* stream, int keepalive);
void transRefSrvHandle(void* handle);
void transUnrefSrvHandle(void* handle);
void transRefCliHandle(void* handle);
void transUnrefCliHandle(void* handle);
void transRefCliHandle(void* handle);
int32_t transUnrefCliHandle(void* handle);
int32_t transGetRefCount(void* handle);
int32_t transReleaseCliHandle(void* handle);
int32_t transReleaseSrvHandle(void* handle);

View File

@ -21,7 +21,7 @@ void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int32_t numOf
void (*taosCloseHandle[])(void* arg) = {transCloseServer, transCloseClient};
void (*taosRefHandle[])(void* handle) = {transRefSrvHandle, transRefCliHandle};
void (*taosUnRefHandle[])(void* handle) = {transUnrefSrvHandle, transUnrefCliHandle};
void (*taosUnRefHandle[])(void* handle) = {transUnrefSrvHandle, NULL};
int (*transReleaseHandle[])(void* handle) = {transReleaseSrvHandle, transReleaseCliHandle};

View File

@ -95,7 +95,6 @@ typedef struct SCliConn {
int64_t refId;
int32_t seq;
int32_t shareCnt;
int8_t registered;
int8_t connnected;
@ -104,17 +103,9 @@ typedef struct SCliConn {
void* pInitUserReq;
} SCliConn;
// #define TRANS_CONN_REF_INC(tconn) \
// do { \
// if (tcond) (tconn)->ref++; \
// } while (0)
// #define TRANS_CONN_REF_DEC(tcond) if (tcond) (tconn)\
// do { \
// if (tcond) (tconn)->ref--; \
// } while (0)
#define TRANS_CONN_REF_GET(tconn) ((tconn) ? (tconn)->ref : 0)
// #define TRANS_CONN_REF_INC(tconn) ((tconn) ? (tconn)->ref++ : 0)
// #define TRANS_CONN_REF_DEC(tconn) ((tconn) ? (tconn)->ref-- : 0)
// #define TRANS_CONN_REF_GET(tconn) ((tconn) ? (tconn)->ref : 0)
typedef struct {
SCliConn* conn;
@ -216,27 +207,11 @@ static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
// callback after conn to server
static void cliConnCb(uv_connect_t* req, int status);
static void cliAsyncCb(uv_async_t* handle);
// static void cliIdleCb(uv_idle_t* handle);
// static void cliPrepareCb(uv_prepare_t* handle);
// static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd);
static void cliSendBatchCb(uv_write_t* req, int status);
SCliBatch* cliGetHeadFromList(SCliBatchList* pList);
void destroyCliConnQTable(SCliConn* conn) {
void* pIter = taosHashIterate(conn->pQTable, NULL);
while (pIter != NULL) {
int64_t* qid = taosHashGetKey(pIter, NULL);
STransCtx* ctx = pIter;
transCtxCleanup(ctx);
pIter = taosHashIterate(conn->pQTable, pIter);
}
taosHashCleanup(conn->pQTable);
conn->pQTable = NULL;
}
// static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead);
static void destroyCliConnQTable(SCliConn* conn);
static void cliHandleBatch_shareConnExcept(SCliConn* conn);
static int32_t allocConnRef(SCliConn* conn, bool update);
@ -246,12 +221,10 @@ void cliResetConnTimer(SCliConn* conn);
static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
static void cliDestroy(uv_handle_t* handle);
// static int32_t cliSend(SCliConn* pConn);
// static void cliSendBatch(SCliConn* pConn);
static void cliDestroyConnMsgs(SCliConn* conn, bool destroy);
static void doFreeTimeoutMsg(void* param);
static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliReq** pReq);
static void doFreeTimeoutMsg(void* param);
static void cliDestroyBatch(SCliBatch* pBatch);
// cli util func
@ -259,7 +232,8 @@ static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, SReqCtx* pCtx);
static FORCE_INLINE int32_t cliMayCvtFqdnToIp(SEpSet* pEpSet, const SCvtAddr* pCvtAddr);
static FORCE_INLINE int32_t cliBuildExceptResp(SCliReq* pReq, STransMsg* resp);
static FORCE_INLINE int32_t cliBuildExceptRespAndNotifyCb(SCliThrd* pThrd, SCliReq* pReq, int32_t code);
// static FORCE_INLINE int32_t cliBuildExceptRespAndNotifyCb(SCliThrd* pThrd, SCliReq* pReq, int32_t code);
static FORCE_INLINE int32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn, uint32_t* ipaddr);
static FORCE_INLINE int32_t cliUpdateFqdnCache(SHashObj* cache, char* fqdn);
@ -267,9 +241,6 @@ static FORCE_INLINE int32_t cliUpdateFqdnCache(SHashObj* cache, char* fqdn);
static FORCE_INLINE void cliMayUpdateFqdnCache(SHashObj* cache, char* dst);
// process data read from server, add decompress etc later
// handle except about conn
static void cliHandleExcept(SCliConn* conn, int32_t code);
// static void cliReleaseUnfinishedMsg(SCliConn* conn);
static void cliHandleFastFail(SCliConn* pConn, int status);
static void doNotifyCb(SCliReq* pReq, SCliThrd* pThrd, int32_t code);
// handle req from app
@ -344,14 +315,7 @@ int32_t transHeapDelete(SHeap* heap, SCliConn* p);
#define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para))
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pInst))->label)
// #define CONN_NO_PERSIST_BY_APP(conn) \
// (((conn)->status == ConnNormal || (conn)->status == ConnInPool) && TRANS_CONN_REF_GET(conn) == 1)
// #define CONN_RELEASE_BY_SERVER(conn) \
// (((conn)->status == ConnRelease || (conn)->status == ConnInPool) && TRANS_CONN_REF_GET(conn) == 1)
#define REQUEST_NO_RESP(msg) ((msg)->info.noResp == 1)
#define REQUEST_PERSIS_HANDLE(msg) ((msg)->info.persistHandle == 1)
#define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release)
#define REQUEST_NO_RESP(msg) ((msg)->info.noResp == 1)
#define EPSET_IS_VALID(epSet) ((epSet) != NULL && (epSet)->numOfEps >= 0 && (epSet)->inUse >= 0)
#define EPSET_GET_SIZE(epSet) (epSet)->numOfEps
@ -396,6 +360,17 @@ void cliResetConnTimer(SCliConn* conn) {
void cliHandleBatchResp(SCliConn* conn) { ASSERT(0); }
void destroyCliConnQTable(SCliConn* conn) {
void* pIter = taosHashIterate(conn->pQTable, NULL);
while (pIter != NULL) {
int64_t* qid = taosHashGetKey(pIter, NULL);
STransCtx* ctx = pIter;
transCtxCleanup(ctx);
pIter = taosHashIterate(conn->pQTable, pIter);
}
taosHashCleanup(conn->pQTable);
conn->pQTable = NULL;
}
bool filteBySeq(void* key, void* arg) {
int32_t* seq = arg;
SCliReq* pReq = QUEUE_DATA(key, SCliReq, q);
@ -492,13 +467,13 @@ int32_t cliConnMayHandleState_releaseReq(SCliConn* conn, STransMsgHead* pHead) {
transQueueRemoveByFilter(&conn->reqsToSend, filterByQid, &qId, &set, -1);
while (!QUEUE_IS_EMPTY(&set)) {
queue* el = QUEUE_HEAD(&set);
SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
queue* el = QUEUE_HEAD(&set);
QUEUE_REMOVE(el);
if (pReq->msg.info.notFreeAhandle == 0 && pThrd != NULL && pThrd->destroyAhandleFp != NULL) {
pThrd->destroyAhandleFp(pReq->ctx->ahandle);
}
destroyReq(pReq);
SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
STraceId* trace = &pReq->msg.info.traceId;
tGDebug("start to free msg %p", pReq);
destroyReqWrapper(pReq, pThrd);
}
taosMemoryFree(pHead);
return 1;
@ -521,7 +496,7 @@ int32_t cliMayHandleState(SCliConn* conn, STransMsgHead* pHead, STransMsg* pResp
TMSG_INFO(pHead->msgType));
return 0;
}
void cliHandleResp2(SCliConn* conn) {
void cliHandleResp(SCliConn* conn) {
int32_t code = 0;
SCliThrd* pThrd = conn->hostThrd;
STrans* pInst = pThrd->pInst;
@ -602,35 +577,21 @@ void cliHandleResp2(SCliConn* conn) {
if (cliMayRecycleConn(conn)) {
return;
}
transRefCliHandle(conn);
(void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
}
void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { return; }
void cliHandleExcept(SCliConn* conn, int32_t code) {
tTrace("%s conn %p except ref:%d", CONN_GET_INST_LABEL(conn), conn, TRANS_CONN_REF_GET(conn));
if (code != TSDB_CODE_RPC_FQDN_ERROR) {
code = -1;
}
cliHandleExceptImpl(conn, -1);
}
void cliConnTimeout(uv_timer_t* handle) {
SCliConn* conn = handle->data;
SCliThrd* pThrd = conn->hostThrd;
int32_t ref = transUnrefCliHandle(conn);
if (ref <= 0) {
cliResetConnTimer(conn);
return;
}
tTrace("%s conn %p conn timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, TRANS_CONN_REF_GET(conn));
cliResetConnTimer(conn);
// cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr);
// cliHandleFastFail(conn, UV_ECANCELED);
tTrace("%s conn %p conn timeout", CONN_GET_INST_LABEL(conn));
}
// void cliReadTimeoutCb(uv_timer_t* handle) {
// // set up timeout cb
// SCliConn* conn = handle->data;
// tTrace("%s conn %p timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, TRANS_CONN_REF_GET(conn));
// (void)uv_read_stop(conn->stream);
// cliHandleExceptImpl(conn, TSDB_CODE_RPC_TIMEOUT);
// }
void* createConnPool(int size) {
// thread local, no lock
@ -641,25 +602,12 @@ void* destroyConnPool(SCliThrd* pThrd) {
SConnList* connList = taosHashIterate((SHashObj*)pool, NULL);
while (connList != NULL) {
while (!QUEUE_IS_EMPTY(&connList->conns)) {
queue* h = QUEUE_HEAD(&connList->conns);
queue* h = QUEUE_HEAD(&connList->conns);
QUEUE_REMOVE(h);
SCliConn* c = QUEUE_DATA(h, SCliConn, q);
cliDestroyConn(c, true);
}
SMsgList* msglist = connList->list;
while (!QUEUE_IS_EMPTY(&msglist->msgQ)) {
queue* h = QUEUE_HEAD(&msglist->msgQ);
QUEUE_REMOVE(h);
SCliReq* pReq = QUEUE_DATA(h, SCliReq, q);
transDQCancel(pThrd->waitConnQueue, pReq->ctx->task);
pReq->ctx->task = NULL;
doNotifyCb(pReq, pThrd, TSDB_CODE_RPC_MAX_SESSIONS);
}
taosMemoryFree(msglist);
connList = taosHashIterate((SHashObj*)pool, connList);
}
taosHashCleanup(pool);
@ -798,117 +746,6 @@ static int32_t cliGetOrCreateConn(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pCo
}
return code;
}
static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliReq** pReq) {
void* pool = pThrd->pool;
STrans* pInst = pThrd->pInst;
size_t klen = strlen(key);
SConnList* plist = taosHashGet((SHashObj*)pool, key, klen);
if (plist == NULL) {
SConnList list = {0};
(void)taosHashPut((SHashObj*)pool, key, klen, (void*)&list, sizeof(list));
plist = taosHashGet(pool, key, klen);
SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
if (nList == NULL) {
// doNotifyApp(*pReq, pThrd, TSDB_CODE_OUT_OF_MEMORY);
*pReq = NULL;
return NULL;
}
QUEUE_INIT(&nList->msgQ);
nList->numOfConn++;
QUEUE_INIT(&plist->conns);
plist->list = nList;
}
// STraceId* trace = &(*pReq)->msg.info.traceId;
// // no avaliable conn in pool
// if (QUEUE_IS_EMPTY(&plist->conns)) {
// SMsgList* list = plist->list;
// if ((list)->numOfConn >= pInst->connLimitNum) {
// STraceId* trace = &(*pReq)->msg.info.traceId;
// if (pInst->notWaitAvaliableConn || (pInst->noDelayFp != NULL && pInst->noDelayFp((*pReq)->msg.msgType))) {
// tDebug("%s msg %s not to send, reason: %s", pInst->label, TMSG_INFO((*pReq)->msg.msgType),
// tstrerror(TSDB_CODE_RPC_NETWORK_BUSY));
// doNotifyCb(*pReq, pThrd, TSDB_CODE_RPC_NETWORK_BUSY);
// *pReq = NULL;
// return NULL;
// }
// STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
// if (arg == NULL) {
// doNotifyCb(*pReq, pThrd, TSDB_CODE_OUT_OF_MEMORY);
// *pReq = NULL;
// return NULL;
// }
// arg->param1 = *pReq;
// arg->param2 = pThrd;
// SDelayTask* task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pInst->timeToGetConn);
// if (task == NULL) {
// taosMemoryFree(arg);
// doNotifyCb(*pReq, pThrd, TSDB_CODE_OUT_OF_MEMORY);
// *pReq = NULL;
// return NULL;
// }
// (*pReq)->ctx->task = task;
// tGTrace("%s msg %s delay to send, wait for avaiable connect", pInst->label, TMSG_INFO((*pReq)->msg.msgType));
// QUEUE_PUSH(&(list)->msgQ, &(*pReq)->q);
// *pReq = NULL;
// } else {
// // send msg in delay queue
// if (!(QUEUE_IS_EMPTY(&(list)->msgQ))) {
// STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
// if (arg == NULL) {
// doNotifyCb(*pReq, pThrd, TSDB_CODE_OUT_OF_MEMORY);
// *pReq = NULL;
// return NULL;
// }
// arg->param1 = *pReq;
// arg->param2 = pThrd;
// SDelayTask* task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pInst->timeToGetConn);
// if (task == NULL) {
// taosMemoryFree(arg);
// doNotifyCb(*pReq, pThrd, TSDB_CODE_OUT_OF_MEMORY);
// *pReq = NULL;
// return NULL;
// }
// (*pReq)->ctx->task = task;
// tGTrace("%s msg %s delay to send, wait for avaiable connect", pInst->label, TMSG_INFO((*pReq)->msg.msgType));
// QUEUE_PUSH(&(list)->msgQ, &(*pReq)->q);
// queue* h = QUEUE_HEAD(&(list)->msgQ);
// QUEUE_REMOVE(h);
// SCliReq* ans = QUEUE_DATA(h, SCliReq, q);
// *pReq = ans;
// trace = &(*pReq)->msg.info.traceId;
// tGTrace("%s msg %s pop from delay queue, start to send", pInst->label, TMSG_INFO((*pReq)->msg.msgType));
// transDQCancel(pThrd->waitConnQueue, ans->ctx->task);
// }
// list->numOfConn++;
// }
// tDebug("%s numOfConn: %d, limit: %d, dst:%s", pInst->label, list->numOfConn, pInst->connLimitNum, key);
// return NULL;
// }
queue* h = QUEUE_TAIL(&plist->conns);
plist->size -= 1;
QUEUE_REMOVE(h);
SCliConn* conn = QUEUE_DATA(h, SCliConn, q);
conn->status = ConnNormal;
QUEUE_INIT(&conn->q);
tDebug("conn %p get from pool, pool size: %d, dst: %s", conn, conn->list->size, conn->dstAddr);
if (conn->task != NULL) {
transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
conn->task = NULL;
}
return conn;
}
static void addConnToPool(void* pool, SCliConn* conn) {
if (conn->status == ConnInPool) {
return;
@ -919,34 +756,10 @@ static void addConnToPool(void* pool, SCliConn* conn) {
SCliThrd* thrd = conn->hostThrd;
cliResetConnTimer(conn);
if (TRANS_CONN_REF_GET(conn) > 1) {
transUnrefCliHandle(conn);
}
// cliDestroyConnMsgs(conn, false);
if (conn->list == NULL && conn->dstAddr != NULL) {
conn->list = taosHashGet((SHashObj*)pool, conn->dstAddr, strlen(conn->dstAddr));
}
SConnList* pList = conn->list;
SMsgList* msgList = pList->list;
// if (!QUEUE_IS_EMPTY(&msgList->msgQ)) {
// queue* h = QUEUE_HEAD(&(msgList)->msgQ);
// QUEUE_REMOVE(h);
// SCliReq* pReq = QUEUE_DATA(h, SCliReq, q);
// transDQCancel(thrd->waitConnQueue, pReq->ctx->task);
// pReq->ctx->task = NULL;
// transCtxMerge(&conn->ctx, &pReq->ctx->userCtx);
// (void)transQueuePush(&conn->reqsToSend, pReq);
// conn->status = ConnNormal;
// (void)cliSend2(conn);
// return;
// }
conn->status = ConnInPool;
QUEUE_PUSH(&conn->list->conns, &conn->q);
conn->list->size += 1;
@ -998,30 +811,6 @@ static int32_t allocConnRef(SCliConn* conn, bool update) {
return 0;
}
static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) {
if (handle == 0) return -1;
if (update) {
(void)transReleaseExHandle(transGetRefMgt(), conn->refId);
(void)transRemoveExHandle(transGetRefMgt(), conn->refId);
conn->refId = -1;
}
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle);
if (exh == NULL) {
return -1;
}
taosWLockLatch(&exh->latch);
exh->handle = conn;
exh->pThrd = conn->hostThrd;
taosWUnLockLatch(&exh->latch);
conn->refId = exh->refId;
tDebug("conn %p specified by %" PRId64 "", conn, handle);
(void)transReleaseExHandle(transGetRefMgt(), handle);
return 0;
}
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
SCliConn* conn = handle->data;
SConnBuffer* pBuf = &conn->readBuf;
@ -1040,17 +829,23 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
uv_fileno((uv_handle_t*)handle, &fd);
setsockopt(fd, IPPROTO_TCP, TCP_QUICKACK, (int[]){1}, sizeof(int));
SCliConn* conn = handle->data;
SCliConn* conn = handle->data;
int32_t ref = transUnrefCliHandle(conn);
if (ref <= 0) {
return;
}
SConnBuffer* pBuf = &conn->readBuf;
if (nread > 0) {
pBuf->len += nread;
while (transReadComplete(pBuf)) {
tTrace("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn);
if (pBuf->invalid) {
return cliHandleBatch_shareConnExcept(conn);
transUnrefCliHandle(conn);
return;
break;
} else {
cliHandleResp2(conn);
cliHandleResp(conn);
}
}
return;
@ -1064,10 +859,9 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
return;
}
if (nread < 0) {
tDebug("%s conn %p read error:%s, ref:%d", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread),
TRANS_CONN_REF_GET(conn));
tDebug("%s conn %p read error:%s, ref:%d", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread), ref);
conn->broken = true;
cliHandleExcept(conn, -1);
transUnrefCliHandle(conn);
}
}
@ -1151,8 +945,6 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int
conn->stream->data = conn;
conn->connReq.data = conn;
transRefCliHandle(conn);
*pCliConn = conn;
return code;
_failed:
@ -1173,47 +965,7 @@ _failed:
taosMemoryFree(conn);
return code;
}
static void cliDestroyConn(SCliConn* conn, bool clear) {
SCliThrd* pThrd = conn->hostThrd;
tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
conn->broken = true;
QUEUE_REMOVE(&conn->q);
conn->broken = true;
if (conn->list == NULL) {
conn->list = taosHashGet((SHashObj*)pThrd->pool, conn->dstAddr, strlen(conn->dstAddr));
}
if (conn->list) {
SConnList* list = conn->list;
list->list->numOfConn--;
if (conn->status == ConnInPool) {
list->size--;
}
}
conn->list = NULL;
(void)transReleaseExHandle(transGetRefMgt(), conn->refId);
(void)transRemoveExHandle(transGetRefMgt(), conn->refId);
conn->refId = -1;
if (conn->task != NULL) {
transDQCancel(pThrd->timeoutQueue, conn->task);
conn->task = NULL;
}
if (conn->pInitUserReq) {
taosMemoryFree(conn->pInitUserReq);
conn->pInitUserReq = NULL;
}
if (clear) {
if (!uv_is_closing((uv_handle_t*)conn->stream)) {
(void)uv_read_stop(conn->stream);
uv_close((uv_handle_t*)conn->stream, cliDestroy);
}
}
}
static void cliDestroyConn(SCliConn* conn, bool clear) { cliHandleBatch_shareConnExcept(conn); }
static void cliDestroy(uv_handle_t* handle) {
if (uv_handle_get_type(handle) != UV_TCP || handle->data == NULL) {
return;
@ -1239,7 +991,6 @@ static void cliDestroy(uv_handle_t* handle) {
tDebug("%s conn %p destroy state %ld", CONN_GET_INST_LABEL(conn), conn, *qid);
}
// cliDestroyConnMsgs(conn, true);
destroyCliConnQTable(conn);
if (conn->pInitUserReq) {
@ -1283,7 +1034,6 @@ static void cliHandleBatch_shareConnExcept(SCliConn* conn) {
if (pReq) {
resp.info.traceId = pReq->msg.info.traceId;
}
// resp.info.traceId = pReq ? pReq->msg.info.traceId : {0, 0};
// handle noresp and inter manage msg
if (pCtx == NULL || REQUEST_NO_RESP(&pReq->msg)) {
@ -1300,7 +1050,8 @@ static void cliHandleBatch_shareConnExcept(SCliConn* conn) {
destroyReq(pReq);
}
}
if (!uv_is_closing((uv_handle_t*)conn->stream)) {
int8_t ref = transGetRefCount(conn);
if (ref == 0 && !uv_is_closing((uv_handle_t*)conn->stream)) {
uv_close((uv_handle_t*)conn->stream, cliDestroy);
}
}
@ -1330,16 +1081,21 @@ static void cliConnRmReqs(SCliConn* conn) {
static void cliSendBatch_shareConnCb(uv_write_t* req, int status) {
SCliConn* conn = req->data;
SCliThrd* pThrd = conn->hostThrd;
conn->shareCnt -= 1;
int32_t ref = transUnrefCliHandle(conn);
if (ref <= 0) {
taosMemoryFree(req);
return;
}
cliConnRmReqs(conn);
if (status != 0) {
tDebug("%s conn %p failed to send msg, reason:%s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(status));
if (!uv_is_closing((uv_handle_t*)&conn->stream)) {
cliHandleBatch_shareConnExcept(conn);
}
transUnrefCliHandle(conn);
return;
}
transRefCliHandle(conn);
(void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
taosMemoryFree(req);
@ -1442,9 +1198,9 @@ void cliSendBatch_shareConn(SCliConn* pConn) {
return;
}
transRefCliHandle(pConn);
uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
req->data = pConn;
pConn->shareCnt += 1;
tDebug("%s conn %p start to send msg, batch size:%d, len:%d", CONN_GET_INST_LABEL(pConn), pConn, size, totalLen);
uv_write(req, (uv_stream_t*)pConn->stream, wb, j, cliSendBatch_shareConnCb);
taosMemoryFree(wb);
@ -1514,12 +1270,14 @@ static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn) {
return code;
}
transRefCliHandle(conn);
ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
if (ret != 0) {
tError("failed connect to %s, reason:%s", conn->dstAddr, uv_err_name(ret));
TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception1);
}
transRefCliHandle(conn);
ret = uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
if (ret != 0) {
tError("%s conn %p failed to start timer, reason:%s", transLabel(pInst), conn, uv_err_name(ret));
@ -1540,39 +1298,11 @@ _exception2:
// cliResetConnTimer(conn);
// cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr);
// cliHandleFastFail(conn, code);
// // taosMemoryFree(conn);
return code;
}
static void cliHandleFastFail_resp(SCliConn* pConn, int status) {
SCliThrd* pThrd = pConn->hostThrd;
STrans* pInst = pThrd->pInst;
// //kSCliReq* pReq = transQueueGet(&pConn->reqsToSend, 0);
// STraceId* trace = &pReq->msg.info.traceId;
// tGError("%s msg %s failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn),
// TMSG_INFO(pReq->msg.msgType), pConn, pConn->dstAddr, uv_strerror(status));
}
static void cliHandleFastFail_noresp(SCliConn* pConn, int status) {
tError("%s batch msg failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn), pConn,
pConn->dstAddr, uv_strerror(status));
cliDestroyBatch(pConn->pBatch);
pConn->pBatch = NULL;
}
static void cliHandleFastFail(SCliConn* pConn, int status) {
if (status == -1) status = UV_EADDRNOTAVAIL;
if (pConn->pBatch == NULL) {
cliHandleFastFail_resp(pConn, status);
} else {
cliHandleFastFail_noresp(pConn, status);
}
cliHandleExcept(pConn, status);
}
int32_t cliConnSetSockInfo(SCliConn* pConn) {
struct sockaddr peername, sockname;
int addrlen = sizeof(peername);
@ -1593,20 +1323,20 @@ int32_t cliConnSetSockInfo(SCliConn* pConn) {
return 0;
};
static int32_t cliBuildExeceptMsg(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) {
SCliThrd* pThrd = pConn->hostThrd;
STrans* pInst = pThrd->pInst;
memset(pResp, 0, sizeof(STransMsg));
STransMsg resp = {0};
resp.contLen = 0;
resp.pCont = NULL;
resp.msgType = pReq->msg.msgType + 1;
resp.info.ahandle = pReq->ctx->ahandle;
resp.info.traceId = pReq->msg.info.traceId;
resp.info.hasEpSet = false;
resp.info.cliVer = pInst->compatibilityVer;
return 0;
}
// static int32_t cliBuildExeceptMsg(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) {
// SCliThrd* pThrd = pConn->hostThrd;
// STrans* pInst = pThrd->pInst;
// memset(pResp, 0, sizeof(STransMsg));
// STransMsg resp = {0};
// resp.contLen = 0;
// resp.pCont = NULL;
// resp.msgType = pReq->msg.msgType + 1;
// resp.info.ahandle = pReq->ctx->ahandle;
// resp.info.traceId = pReq->msg.info.traceId;
// resp.info.hasEpSet = false;
// resp.info.cliVer = pInst->compatibilityVer;
// return 0;
// }
bool filteGetAll(void* q, void* arg) { return true; }
void cliConnCb(uv_connect_t* req, int status) {
@ -1614,8 +1344,13 @@ void cliConnCb(uv_connect_t* req, int status) {
SCliThrd* pThrd = pConn->hostThrd;
bool timeout = false;
int32_t ref = transUnrefCliHandle(pConn);
if (ref <= 0) {
return;
}
if (pConn->timer == NULL) {
timeout = true;
return;
} else {
cliResetConnTimer(pConn);
}
@ -1625,14 +1360,11 @@ void cliConnCb(uv_connect_t* req, int status) {
if (status != 0) {
tDebug("%s conn %p failed to connect to %s, reason:%s", CONN_GET_INST_LABEL(pConn), pConn, pConn->dstAddr,
uv_strerror(status));
cliHandleBatch_shareConnExcept(pConn);
transUnrefCliHandle(pConn);
return;
}
pConn->connnected = 1;
cliConnSetSockInfo(pConn);
// addConnToHeapCache(pThrd->connHeapCache, pConn);
tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
(void)cliSend2(pConn);
@ -1676,68 +1408,9 @@ static void cliHandleQuit(SCliThrd* pThrd, SCliReq* pReq) {
(void)destroyConnPool(pThrd);
(void)uv_walk(pThrd->loop, cliWalkCb, NULL);
}
static void cliHandleRelease(SCliThrd* pThrd, SCliReq* pReq) {
return;
// int64_t refId = (int64_t)(pReq->msg.info.handle);
// SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
// if (exh == NULL) {
// tDebug("%" PRId64 " already released", refId);
// destroyReq(pReq);
// return;
// }
// taosRLockLatch(&exh->latch);
// SCliConn* conn = exh->handle;
// taosRUnLockLatch(&exh->latch);
// (void)transReleaseExHandle(transGetRefMgt(), refId);
// tDebug("%s conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn);
// if (TRANS_CONN_REF_GET(conn) == 2) {
// transUnrefCliHandle(conn);
// if (!transQueuePush(&conn->reqsToSend, &pReq->q)) {
// return;
// }
// (void)cliSend2(conn);
// } else {
// tError("%s conn %p already released", CONN_GET_INST_LABEL(conn), conn);
// destroyReq(pReq);
// }
}
static void cliHandleRelease(SCliThrd* pThrd, SCliReq* pReq) { return; }
static void cliHandleUpdate(SCliThrd* pThrd, SCliReq* pReq) { return; }
SCliConn* cliGetConn(SCliReq** pReq, SCliThrd* pThrd, bool* ignore, char* addr) {
SReqCtx* pCtx = (*pReq)->ctx;
SCliConn* conn = NULL;
int64_t refId = (int64_t)((*pReq)->msg.info.handle);
if (refId != 0) {
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
if (exh == NULL) {
tError("failed to get conn, refId: %" PRId64 "", refId);
*ignore = true;
return NULL;
} else {
taosRLockLatch(&exh->latch);
conn = exh->handle;
taosRUnLockLatch(&exh->latch);
if (conn == NULL) {
conn = getConnFromPool2(pThrd, addr, pReq);
if (conn != NULL) specifyConnRef(conn, true, refId);
}
(void)transReleaseExHandle(transGetRefMgt(), refId);
}
return conn;
};
conn = getConnFromPool2(pThrd, addr, pReq);
if (conn != NULL) {
tTrace("%s conn %p get from conn pool:%p", CONN_GET_INST_LABEL(conn), conn, pThrd->pool);
} else {
tTrace("%s not found conn in conn pool:%p, dst:%s", ((STrans*)pThrd->pInst)->label, pThrd->pool, addr);
}
return conn;
}
FORCE_INLINE int32_t cliMayCvtFqdnToIp(SEpSet* pEpSet, const SCvtAddr* pCvtAddr) {
if (pCvtAddr->cvt == false) {
if (EPSET_IS_VALID(pEpSet)) {
@ -1775,19 +1448,19 @@ FORCE_INLINE int32_t cliBuildExceptResp(SCliReq* pReq, STransMsg* pResp) {
return 0;
}
FORCE_INLINE int32_t cliBuildExceptRespAndNotifyCb(SCliThrd* pThrd, SCliReq* pReq, int32_t code) {
STrans* pInst = pThrd->pInst;
// FORCE_INLINE int32_t cliBuildExceptRespAndNotifyCb(SCliThrd* pThrd, SCliReq* pReq, int32_t code) {
// STrans* pInst = pThrd->pInst;
STransMsg resp = {.code = code};
code = cliBuildExceptResp(pReq, &resp);
if (code != 0) {
return code;
}
resp.info.cliVer = pInst->compatibilityVer;
pInst->cfp(pInst->parent, &resp, NULL);
// STransMsg resp = {.code = code};
// code = cliBuildExceptResp(pReq, &resp);
// if (code != 0) {
// return code;
// }
// resp.info.cliVer = pInst->compatibilityVer;
// pInst->cfp(pInst->parent, &resp, NULL);
return 0;
}
// return 0;
// }
static FORCE_INLINE int32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn, uint32_t* ip) {
int32_t code = 0;
@ -2130,39 +1803,7 @@ static int32_t createBatch(SCliBatch** ppBatch, SCliBatchList* pList, SCliReq* p
*ppBatch = pBatch;
return 0;
}
static void cliDoBatchReq(queue* wq, SCliThrd* pThrd) {
STrans* pInst = pThrd->pInst;
int32_t code = 0;
int count = 0;
while (!QUEUE_IS_EMPTY(wq)) {
queue* h = QUEUE_HEAD(wq);
QUEUE_REMOVE(h);
SCliReq* pReq = QUEUE_DATA(h, SCliReq, q);
// if (pReq->type == Normal && REQUEST_NO_RESP(&pReq->msg)) {
// cliBuildBatch(pReq, h, pThrd);
// continue;
// }
(*cliAsyncHandle[pReq->type])(pThrd, pReq);
count++;
}
// void** pIter = taosHashIterate(pThrd->batchCache, NULL);
// while (pIter != NULL) {
// SCliBatchList* batchList = (SCliBatchList*)(*pIter);
// SCliBatch* batch = cliGetHeadFromList(batchList);
// if (batch != NULL) {
// cliHandleBatchReq(batch, pThrd);
// }
// pIter = (void**)taosHashIterate(pThrd->batchCache, pIter);
// }
if (count >= 2) {
tTrace("cli process batch size:%d", count);
}
}
static void cliDoBatchReq(queue* wq, SCliThrd* pThrd) { return cliDoReq(wq, pThrd); }
static void cliAsyncCb(uv_async_t* handle) {
SAsyncItem* item = handle->data;
@ -2180,42 +1821,13 @@ static void cliAsyncCb(uv_async_t* handle) {
if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd, pThrd->stopMsg);
}
void cliDestroyConnMsgs(SCliConn* conn, bool destroy) {
// transCtxCleanup(&conn->ctx);
// cliReleaseUnfinishedMsg(conn);
if (destroy == 1) {
transQueueDestroy(&conn->reqsToSend);
} else {
transQueueClear(&conn->reqsToSend);
}
transQueueDestroy(&conn->reqsSentOut);
}
void cliConnFreeMsgs(SCliConn* conn) {
SCliThrd* pThrd = conn->hostThrd;
STrans* pInst = pThrd->pInst;
// for (int i = 0; i < transQueueSize(&conn->reqsToSend); i++) {
// SCliReq* cmsg = transQueueGet(&conn->reqsToSend, i);
// if (cmsg->type == Release || REQUEST_NO_RESP(&cmsg->msg) || cmsg->msg.msgType == TDMT_SCH_DROP_TASK) {
// continue;
// }
// if (cliBuildExceptRespAndNotifyCb(pThrd, cmsg, 0) != 0) {
// continue;
// }
// cmsg->ctx->ahandle = NULL;
// }
}
static FORCE_INLINE void destroyReq(void* arg) {
SCliReq* pReq = arg;
if (pReq == NULL) {
return;
}
tDebug("free memory:%p, free ctx: %p", pReq, pReq->ctx);
STraceId* trace = &pReq->msg.info.traceId;
tGDebug("free memory:%p, free ctx: %p", pReq, pReq->ctx);
if (pReq->ctx) destroyReqCtx(pReq->ctx);
transFreeMsg(pReq->msg.pCont);
@ -2226,8 +1838,11 @@ static FORCE_INLINE void destroyReqWrapper(void* arg, void* param) {
SCliReq* pReq = arg;
SCliThrd* pThrd = param;
if (pReq->msg.info.notFreeAhandle == 0 && pThrd != NULL) {
if (pThrd->destroyAhandleFp) (*pThrd->destroyAhandleFp)(pReq->msg.info.ahandle);
if (pReq->ctx != NULL && pReq->ctx->ahandle != NULL) {
if (pReq->msg.info.notFreeAhandle == 0 && pThrd != NULL && pThrd->destroyAhandleFp != NULL) {
(*pThrd->destroyAhandleFp)(pReq->ctx->ahandle);
}
}
destroyReq(pReq);
}
@ -2237,19 +1852,7 @@ static FORCE_INLINE void destroyReqAndAhanlde(void* param) {
STaskArg* arg = param;
SCliReq* pReq = arg->param1;
SCliThrd* pThrd = arg->param2;
if (pReq->msg.info.notFreeAhandle == 0 && pThrd != NULL && pThrd->destroyAhandleFp != NULL) {
pThrd->destroyAhandleFp(pReq->ctx->ahandle);
}
if (pReq->msg.info.handle != 0) {
(void)transReleaseExHandle(transGetRefMgt(), (int64_t)pReq->msg.info.handle);
(void)transRemoveExHandle(transGetRefMgt(), (int64_t)pReq->msg.info.handle);
}
destroyReqCtx(pReq->ctx);
transFreeMsg(pReq->msg.pCont);
taosMemoryFree(pReq);
destroyReqWrapper(pReq, pThrd);
}
static void* cliWorkThread(void* arg) {
@ -2404,11 +2007,6 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
}
// pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
// if (pThrd->failFastCache == NULL) {
// TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
// }
pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (pThrd->batchCache == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
@ -2566,7 +2164,14 @@ static FORCE_INLINE void doCloseIdleConn(void* param) {
STaskArg* arg = param;
SCliConn* conn = arg->param1;
tDebug("%s conn %p idle, close it", CONN_GET_INST_LABEL(conn), conn);
conn->task = NULL;
int32_t ref = transUnrefCliHandle(conn);
if (ref <= 0) {
conn->task = NULL;
taosMemoryFree(arg);
return;
}
cliDestroyConn(conn, true);
taosMemoryFree(arg);
}
@ -2945,17 +2550,26 @@ void transRefCliHandle(void* handle) {
tTrace("%s conn %p ref %d", CONN_GET_INST_LABEL(conn), conn, conn->ref);
}
void transUnrefCliHandle(void* handle) {
int32_t transUnrefCliHandle(void* handle) {
if (handle == NULL) {
return;
return 0;
}
SCliConn* conn = (SCliConn*)handle;
conn->ref--;
int32_t ref = conn->ref--;
tTrace("%s conn %p ref:%d", CONN_GET_INST_LABEL(conn), conn, conn->ref);
if (conn->ref == 0) {
cliDestroyConn(conn, true);
}
return ref;
}
int32_t transGetRefCount(void* handle) {
if (handle == NULL) {
return 0;
}
SCliConn* conn = (SCliConn*)handle;
return conn->ref;
}
static FORCE_INLINE SCliThrd* transGetWorkThrdFromHandle(STrans* trans, int64_t handle) {
SCliThrd* pThrd = NULL;