Merge branch 'enh/TD-31494' of https://github.com/taosdata/TDengine into enh/TD-31494
This commit is contained in:
commit
b07a12fefe
|
@ -476,6 +476,8 @@ int32_t subnetDebugInfoToBuf(SubnetUtils* pUtils, char* buf);
|
||||||
int32_t transUtilSIpRangeToStr(SIpV4Range* pRange, char* buf);
|
int32_t transUtilSIpRangeToStr(SIpV4Range* pRange, char* buf);
|
||||||
int32_t transUtilSWhiteListToStr(SIpWhiteList* pWhiteList, char** ppBuf);
|
int32_t transUtilSWhiteListToStr(SIpWhiteList* pWhiteList, char** ppBuf);
|
||||||
|
|
||||||
|
enum { REQ_STATUS_INIT = 0, REQ_STATUS_PROCESSING };
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -90,6 +90,8 @@ typedef struct SCliConn {
|
||||||
int64_t refId;
|
int64_t refId;
|
||||||
int32_t seq;
|
int32_t seq;
|
||||||
int32_t shareCnt;
|
int32_t shareCnt;
|
||||||
|
|
||||||
|
int8_t registered;
|
||||||
} SCliConn;
|
} SCliConn;
|
||||||
|
|
||||||
typedef struct SCliReq {
|
typedef struct SCliReq {
|
||||||
|
@ -195,11 +197,11 @@ static int32_t allocConnRef(SCliConn* conn, bool update);
|
||||||
static int cliNotifyCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp);
|
static int cliNotifyCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp);
|
||||||
void cliResetConnTimer(SCliConn* conn);
|
void cliResetConnTimer(SCliConn* conn);
|
||||||
|
|
||||||
static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
|
static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
|
||||||
static void cliDestroy(uv_handle_t* handle);
|
static void cliDestroy(uv_handle_t* handle);
|
||||||
static void cliSend(SCliConn* pConn);
|
static int32_t cliSend(SCliConn* pConn);
|
||||||
static void cliSendBatch(SCliConn* pConn);
|
static void cliSendBatch(SCliConn* pConn);
|
||||||
static void cliDestroyConnMsgs(SCliConn* conn, bool destroy);
|
static void cliDestroyConnMsgs(SCliConn* conn, bool destroy);
|
||||||
|
|
||||||
static void doFreeTimeoutMsg(void* param);
|
static void doFreeTimeoutMsg(void* param);
|
||||||
static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliReq** pReq);
|
static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliReq** pReq);
|
||||||
|
@ -323,6 +325,17 @@ int32_t transHeapDelete(SHeap* heap, SCliConn* p);
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
|
static int32_t cliConnFindToSendMsg(SCliConn* pConn, SCliReq** pReq) {
|
||||||
|
int32_t code = 0;
|
||||||
|
for (int32_t i = 0; i < transQueueSize(&pConn->reqs); i++) {
|
||||||
|
SCliReq* p = transQueueGet(&pConn->reqs, i);
|
||||||
|
if (p->sent == 0) {
|
||||||
|
*pReq = p;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return TSDB_CODE_OUT_OF_RANGE;
|
||||||
|
}
|
||||||
#define CONN_SET_PERSIST_BY_APP(conn) \
|
#define CONN_SET_PERSIST_BY_APP(conn) \
|
||||||
do { \
|
do { \
|
||||||
if (conn->status == ConnNormal) { \
|
if (conn->status == ConnNormal) { \
|
||||||
|
@ -376,7 +389,7 @@ bool cliMaySendCachedMsg(SCliConn* conn) {
|
||||||
if (!transQueueEmpty(&conn->reqs)) {
|
if (!transQueueEmpty(&conn->reqs)) {
|
||||||
SCliReq* pCliMsg = NULL;
|
SCliReq* pCliMsg = NULL;
|
||||||
CONN_GET_NEXT_SENDMSG(conn);
|
CONN_GET_NEXT_SENDMSG(conn);
|
||||||
cliSend(conn);
|
(void)cliSend(conn);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
|
@ -403,7 +416,7 @@ bool cliConnSendSeqMsg(int64_t refId, SCliConn* conn) {
|
||||||
(void)transQueuePush(&conn->reqs, t);
|
(void)transQueuePush(&conn->reqs, t);
|
||||||
tDebug("pop from conn %p, refId: %" PRId64 "", conn, refId);
|
tDebug("pop from conn %p, refId: %" PRId64 "", conn, refId);
|
||||||
(void)transReleaseExHandle(transGetRefMgt(), refId);
|
(void)transReleaseExHandle(transGetRefMgt(), refId);
|
||||||
cliSend(conn);
|
(void)cliSend(conn);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
taosWUnLockLatch(&exh->latch);
|
taosWUnLockLatch(&exh->latch);
|
||||||
|
@ -1049,7 +1062,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
|
||||||
(void)transQueuePush(&conn->reqs, pReq);
|
(void)transQueuePush(&conn->reqs, pReq);
|
||||||
|
|
||||||
conn->status = ConnNormal;
|
conn->status = ConnNormal;
|
||||||
cliSend(conn);
|
(void)cliSend(conn);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1174,6 +1187,24 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t cliAddReqToConn(SCliConn* conn, SCliReq* pReq) {
|
||||||
|
if (transQueuePush(&conn->reqs, pReq) != 0) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t cliRmReqFromConn(SCliConn* conn, SCliReq** pReq) {
|
||||||
|
// do nothing
|
||||||
|
SCliReq* pTail = transQueuePop(&conn->reqs);
|
||||||
|
if (pTail == NULL) {
|
||||||
|
return TSDB_CODE_INVALID_PARA;
|
||||||
|
}
|
||||||
|
if (pReq != NULL) {
|
||||||
|
*pReq = pTail;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn) {
|
static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SCliConn* pConn = NULL;
|
SCliConn* pConn = NULL;
|
||||||
|
@ -1257,49 +1288,13 @@ _failed:
|
||||||
taosMemoryFree(conn->stream);
|
taosMemoryFree(conn->stream);
|
||||||
(void)transDestroyBuffer(&conn->readBuf);
|
(void)transDestroyBuffer(&conn->readBuf);
|
||||||
transQueueDestroy(&conn->reqs);
|
transQueueDestroy(&conn->reqs);
|
||||||
|
taosMemoryFree(conn->dstAddr);
|
||||||
}
|
}
|
||||||
tError("failed to create conn, code:%d", code);
|
tError("failed to create conn, code:%d", code);
|
||||||
taosMemoryFree(conn);
|
taosMemoryFree(conn);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
static void cliDestroyConn(SCliConn* conn, bool clear) {
|
static void cliDestroyConn(SCliConn* conn, bool clear) {}
|
||||||
SCliThrd* pThrd = conn->hostThrd;
|
|
||||||
tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
|
|
||||||
conn->broken = true;
|
|
||||||
QUEUE_REMOVE(&conn->q);
|
|
||||||
QUEUE_INIT(&conn->q);
|
|
||||||
|
|
||||||
conn->broken = true;
|
|
||||||
if (conn->list == NULL && conn->dstAddr) {
|
|
||||||
conn->list = taosHashGet((SHashObj*)pThrd->pool, conn->dstAddr, strlen(conn->dstAddr));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (conn->list) {
|
|
||||||
SConnList* list = conn->list;
|
|
||||||
list->list->numOfConn--;
|
|
||||||
if (conn->status == ConnInPool) {
|
|
||||||
list->size--;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
conn->list = NULL;
|
|
||||||
|
|
||||||
(void)transReleaseExHandle(transGetRefMgt(), conn->refId);
|
|
||||||
(void)transRemoveExHandle(transGetRefMgt(), conn->refId);
|
|
||||||
conn->refId = -1;
|
|
||||||
|
|
||||||
if (conn->task != NULL) {
|
|
||||||
transDQCancel(pThrd->timeoutQueue, conn->task);
|
|
||||||
conn->task = NULL;
|
|
||||||
}
|
|
||||||
cliResetConnTimer(conn);
|
|
||||||
|
|
||||||
if (clear) {
|
|
||||||
if (!uv_is_closing((uv_handle_t*)conn->stream)) {
|
|
||||||
(void)uv_read_stop(conn->stream);
|
|
||||||
uv_close((uv_handle_t*)conn->stream, cliDestroy);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
static void cliDestroy(uv_handle_t* handle) {
|
static void cliDestroy(uv_handle_t* handle) {
|
||||||
if (uv_handle_get_type(handle) != UV_TCP || handle->data == NULL) {
|
if (uv_handle_get_type(handle) != UV_TCP || handle->data == NULL) {
|
||||||
return;
|
return;
|
||||||
|
@ -1581,25 +1576,34 @@ _exception:
|
||||||
pConn->pBatch = NULL;
|
pConn->pBatch = NULL;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
void cliSend(SCliConn* pConn) {
|
|
||||||
|
// int32_t cliSend2(SCliConn* pConn) {}
|
||||||
|
int32_t cliSendReq(SCliConn* pConn, SCliReq* pCliMsg) {
|
||||||
|
int32_t code = 0;
|
||||||
|
transQueuePush(&pConn->reqs, pCliMsg);
|
||||||
|
code = cliSend(pConn);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t cliSend(SCliConn* pConn) {
|
||||||
SCliThrd* pThrd = pConn->hostThrd;
|
SCliThrd* pThrd = pConn->hostThrd;
|
||||||
STrans* pInst = pThrd->pInst;
|
STrans* pInst = pThrd->pInst;
|
||||||
|
SCliReq* pCliReq = NULL;
|
||||||
|
int32_t code = cliConnFindToSendMsg(pConn, &pCliReq);
|
||||||
|
|
||||||
if (transQueueEmpty(&pConn->reqs)) {
|
if (code != 0) {
|
||||||
tError("%s conn %p not msg to send", pInst->label, pConn);
|
return code;
|
||||||
cliHandleExcept(pConn, -1);
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SCliReq* pCliMsg = NULL;
|
SReqCtx* pCtx = pCliReq->ctx;
|
||||||
CONN_GET_NEXT_SENDMSG(pConn);
|
|
||||||
pCliMsg->sent = 1;
|
|
||||||
|
|
||||||
SReqCtx* pCtx = pCliMsg->ctx;
|
STransMsg* pReq = (STransMsg*)(&pCliReq->msg);
|
||||||
|
|
||||||
STransMsg* pReq = (STransMsg*)(&pCliMsg->msg);
|
|
||||||
if (pReq->pCont == 0) {
|
if (pReq->pCont == 0) {
|
||||||
pReq->pCont = (void*)rpcMallocCont(0);
|
pReq->pCont = (void*)rpcMallocCont(0);
|
||||||
|
if (pReq->pCont == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
tDebug("malloc memory: %p", pReq->pCont);
|
tDebug("malloc memory: %p", pReq->pCont);
|
||||||
pReq->contLen = 0;
|
pReq->contLen = 0;
|
||||||
}
|
}
|
||||||
|
@ -1613,7 +1617,7 @@ void cliSend(SCliConn* pConn) {
|
||||||
pHead->persist = REQUEST_PERSIS_HANDLE(pReq) ? 1 : 0;
|
pHead->persist = REQUEST_PERSIS_HANDLE(pReq) ? 1 : 0;
|
||||||
pHead->msgType = pReq->msgType;
|
pHead->msgType = pReq->msgType;
|
||||||
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
|
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
|
||||||
pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
|
pHead->release = REQUEST_RELEASE_HANDLE(pCliReq) ? 1 : 0;
|
||||||
memcpy(pHead->user, pInst->user, strlen(pInst->user));
|
memcpy(pHead->user, pInst->user, strlen(pInst->user));
|
||||||
pHead->traceId = pReq->info.traceId;
|
pHead->traceId = pReq->info.traceId;
|
||||||
pHead->magicNum = htonl(TRANS_MAGIC_NUM);
|
pHead->magicNum = htonl(TRANS_MAGIC_NUM);
|
||||||
|
@ -1646,18 +1650,19 @@ void cliSend(SCliConn* pConn) {
|
||||||
tGError("%s conn %p failed to send msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pReq->msgType),
|
tGError("%s conn %p failed to send msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pReq->msgType),
|
||||||
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||||
cliHandleExcept(pConn, -1);
|
cliHandleExcept(pConn, -1);
|
||||||
return;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pCliReq->sent = 1;
|
||||||
int status = uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
|
int status = uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
|
||||||
if (status != 0) {
|
if (status != 0) {
|
||||||
tGError("%s conn %p failed to send msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pReq->msgType),
|
tGError("%s conn %p failed to send msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pReq->msgType),
|
||||||
uv_err_name(status));
|
uv_err_name(status));
|
||||||
cliHandleExcept(pConn, -1);
|
cliHandleExcept(pConn, -1);
|
||||||
|
return TSDB_CODE_THIRDPARTY_ERROR;
|
||||||
}
|
}
|
||||||
return;
|
|
||||||
_RETURN:
|
return TSDB_CODE_RPC_ASYNC_IN_PROCESS;
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cliDestroyBatch(SCliBatch* pBatch) {
|
static void cliDestroyBatch(SCliBatch* pBatch) {
|
||||||
|
@ -1681,7 +1686,7 @@ static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn, char* ip, int port) {
|
||||||
uint32_t ipaddr;
|
uint32_t ipaddr;
|
||||||
int32_t code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, ip, &ipaddr);
|
int32_t code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, ip, &ipaddr);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
TAOS_CHECK_GOTO(code, &lino, _exception);
|
TAOS_CHECK_GOTO(code, &lino, _exception1);
|
||||||
}
|
}
|
||||||
|
|
||||||
struct sockaddr_in addr;
|
struct sockaddr_in addr;
|
||||||
|
@ -1693,39 +1698,50 @@ static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn, char* ip, int port) {
|
||||||
|
|
||||||
int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10);
|
int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10);
|
||||||
if (fd < 0) {
|
if (fd < 0) {
|
||||||
TAOS_CHECK_GOTO(terrno, &lino, _exception);
|
TAOS_CHECK_GOTO(terrno, &lino, _exception1);
|
||||||
}
|
}
|
||||||
|
|
||||||
int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd);
|
int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
tError("%s conn %p failed to set stream, reason:%s", transLabel(pInst), conn, uv_err_name(ret));
|
tError("%s conn %p failed to set stream, reason:%s", transLabel(pInst), conn, uv_err_name(ret));
|
||||||
TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception);
|
TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception1);
|
||||||
}
|
}
|
||||||
ret = transSetConnOption((uv_tcp_t*)conn->stream, 20);
|
ret = transSetConnOption((uv_tcp_t*)conn->stream, 20);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
tError("%s conn %p failed to set socket opt, reason:%s", transLabel(pInst), conn, uv_err_name(ret));
|
tError("%s conn %p failed to set socket opt, reason:%s", transLabel(pInst), conn, uv_err_name(ret));
|
||||||
TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception);
|
TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception1);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
|
ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
tError("failed connect to %s, reason:%s", conn->dstAddr, uv_err_name(ret));
|
tError("failed connect to %s, reason:%s", conn->dstAddr, uv_err_name(ret));
|
||||||
TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception);
|
TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception1);
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
|
ret = uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
tError("%s conn %p failed to start timer, reason:%s", transLabel(pInst), conn, uv_err_name(ret));
|
tError("%s conn %p failed to start timer, reason:%s", transLabel(pInst), conn, uv_err_name(ret));
|
||||||
cliResetConnTimer(conn);
|
TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception2);
|
||||||
cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr);
|
|
||||||
cliHandleFastFail(conn, -1);
|
|
||||||
return TSDB_CODE_RPC_ASYNC_IN_PROCESS;
|
|
||||||
}
|
}
|
||||||
|
conn->registered = 1;
|
||||||
return TSDB_CODE_RPC_ASYNC_IN_PROCESS;
|
return TSDB_CODE_RPC_ASYNC_IN_PROCESS;
|
||||||
_exception:
|
|
||||||
tError("%s conn %p failed to start timer, reason:%s", transLabel(pInst), conn, uv_err_name(ret));
|
_exception1:
|
||||||
taosMemoryFree(conn); // free conn later
|
tError("%s conn %p failed to do connect, reason:%s", transLabel(pInst), conn, uv_err_name(code));
|
||||||
|
// taosMemoryFree(conn); // free conn later
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_exception2:
|
||||||
|
// already registered to uv, callback handle error
|
||||||
|
tError("%s conn %p failed to do connect, reason:%s", transLabel(pInst), conn, uv_err_name(code));
|
||||||
|
// cliRmReqFromConn(conn, NULL);
|
||||||
|
|
||||||
|
// cliResetConnTimer(conn);
|
||||||
|
// cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr);
|
||||||
|
// cliHandleFastFail(conn, code);
|
||||||
|
|
||||||
|
// // taosMemoryFree(conn);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
|
static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
|
||||||
|
@ -1895,7 +1911,7 @@ void cliConnCb(uv_connect_t* req, int status) {
|
||||||
return cliSendBatch_shareConn(pConn);
|
return cliSendBatch_shareConn(pConn);
|
||||||
}
|
}
|
||||||
|
|
||||||
return cliSend(pConn);
|
(void)cliSend(pConn);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doNotifyCb(SCliReq* pReq, SCliThrd* pThrd, int32_t code) {
|
static void doNotifyCb(SCliReq* pReq, SCliThrd* pThrd, int32_t code) {
|
||||||
|
@ -1957,7 +1973,7 @@ static void cliHandleRelease(SCliThrd* pThrd, SCliReq* pReq) {
|
||||||
if (!transQueuePush(&conn->reqs, pReq)) {
|
if (!transQueuePush(&conn->reqs, pReq)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
cliSend(conn);
|
(void)cliSend(conn);
|
||||||
} else {
|
} else {
|
||||||
tError("%s conn %p already released", CONN_GET_INST_LABEL(conn), conn);
|
tError("%s conn %p already released", CONN_GET_INST_LABEL(conn), conn);
|
||||||
destroyReq(pReq);
|
destroyReq(pReq);
|
||||||
|
@ -2220,12 +2236,15 @@ void cliHandleReq__noShareConn(SCliThrd* pThrd, SCliReq* pReq) {
|
||||||
if (code == TSDB_CODE_RPC_MAX_SESSIONS) {
|
if (code == TSDB_CODE_RPC_MAX_SESSIONS) {
|
||||||
TAOS_CHECK_GOTO(code, &lino, _exception);
|
TAOS_CHECK_GOTO(code, &lino, _exception);
|
||||||
} else if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) {
|
} else if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) {
|
||||||
// do nothing, notifyCb
|
// do nothing, notiy
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
|
code = cliSendReq(pConn, pReq);
|
||||||
}
|
}
|
||||||
|
|
||||||
tTrace("%s conn %p ready", pInst->label, pConn);
|
tTrace("%s conn %p ready", pInst->label, pConn);
|
||||||
|
return;
|
||||||
|
|
||||||
_exception:
|
_exception:
|
||||||
resp.code = code;
|
resp.code = code;
|
||||||
(void)(pThrd->notifyExceptCb)(pThrd, pReq, &resp);
|
(void)(pThrd->notifyExceptCb)(pThrd, pReq, &resp);
|
||||||
|
@ -3538,6 +3557,7 @@ _RETURN1:
|
||||||
pReq->pCont = NULL;
|
pReq->pCont = NULL;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t transCreateSyncMsg(STransMsg* pTransMsg, int64_t* refId) {
|
int32_t transCreateSyncMsg(STransMsg* pTransMsg, int64_t* refId) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
tsem2_t* sem = taosMemoryCalloc(1, sizeof(tsem2_t));
|
tsem2_t* sem = taosMemoryCalloc(1, sizeof(tsem2_t));
|
||||||
|
@ -3574,6 +3594,7 @@ _EXIT:
|
||||||
taosMemoryFree(pSyncMsg);
|
taosMemoryFree(pSyncMsg);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t transSendRecvWithTimeout(void* pInstRef, SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp, int8_t* epUpdated,
|
int32_t transSendRecvWithTimeout(void* pInstRef, SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp, int8_t* epUpdated,
|
||||||
int32_t timeoutMs) {
|
int32_t timeoutMs) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
Loading…
Reference in New Issue