Merge remote-tracking branch 'origin/3.0' into enh/opt-transport

This commit is contained in:
yihaoDeng 2024-09-28 13:39:24 +08:00
parent 58290a3ed8
commit 8b2754fe99
5 changed files with 300 additions and 144 deletions

View File

@ -297,7 +297,7 @@ bool transAsyncPoolIsEmpty(SAsyncPool* pool);
int32_t transInitBuffer(SConnBuffer* buf);
int32_t transClearBuffer(SConnBuffer* buf);
int32_t transDestroyBuffer(SConnBuffer* buf);
void transDestroyBuffer(SConnBuffer* buf);
int32_t transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf);
bool transReadComplete(SConnBuffer* connBuf);
int32_t transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf);
@ -327,7 +327,7 @@ int32_t transRegisterMsg(const STransMsg* msg);
int32_t transSetDefaultAddr(void* pInit, const char* ip, const char* fqdn);
int32_t transSetIpWhiteList(void* pInit, void* arg, FilteFunc* func);
int32_t transSockInfo2Str(struct sockaddr* sockname, char* dst);
void transSockInfo2Str(struct sockaddr* sockname, char* dst);
int32_t transAllocHandle(int64_t* refId);
@ -367,7 +367,7 @@ int32_t transQueueInit(STransQueue* queue, void (*freeFunc)(void* arg));
* put arg into queue
* if queue'size > 1, return false; else return true
*/
int32_t transQueuePush(STransQueue* queue, void* arg);
void transQueuePush(STransQueue* queue, void* arg);
/*
* the size of queue
*/
@ -457,9 +457,9 @@ int32_t transDecompressMsg(char** msg, int32_t* len);
int32_t transOpenRefMgt(int size, void (*func)(void*));
void transCloseRefMgt(int32_t refMgt);
int64_t transAddExHandle(int32_t refMgt, void* p);
int32_t transRemoveExHandle(int32_t refMgt, int64_t refId);
void transRemoveExHandle(int32_t refMgt, int64_t refId);
void* transAcquireExHandle(int32_t refMgt, int64_t refId);
int32_t transReleaseExHandle(int32_t refMgt, int64_t refId);
void transReleaseExHandle(int32_t refMgt, int64_t refId);
void transDestroyExHandle(void* handle);
int32_t transGetRefMgt();
@ -510,6 +510,8 @@ void destroyWQ(queue* wq);
uv_write_t* allocWReqFromWQ(queue* wq, void* arg);
void freeWReqToWQ(queue* wq, SWReqsWrapper* w);
int32_t transSetReadOption(uv_handle_t* handle);
#ifdef __cplusplus
}
#endif

View File

@ -131,13 +131,8 @@ void rpcClose(void* arg) {
if (arg == NULL) {
return;
}
if (transRemoveExHandle(transGetInstMgt(), (int64_t)arg) != 0) {
tError("failed to remove rpc handle");
}
if (transReleaseExHandle(transGetInstMgt(), (int64_t)arg) != 0) {
tError("failed to release rpc handle");
}
transRemoveExHandle(transGetInstMgt(), (int64_t)arg);
transReleaseExHandle(transGetInstMgt(), (int64_t)arg);
tInfo("end to close rpc");
return;
}

View File

@ -308,11 +308,11 @@ int32_t transHeapGet(SHeap* heap, SCliConn** p);
int32_t transHeapInsert(SHeap* heap, SCliConn* p);
int32_t transHeapDelete(SHeap* heap, SCliConn* p);
#define CLI_RELEASE_UV(loop) \
do { \
uv_walk(loop, cliWalkCb, NULL); \
(void)uv_run(loop, UV_RUN_DEFAULT); \
(void)uv_loop_close(loop); \
#define CLI_RELEASE_UV(loop) \
do { \
uv_walk(loop, cliWalkCb, NULL); \
(TAOS_UNUSED(uv_run(loop, UV_RUN_DEFAULT))); \
(TAOS_UNUSED(uv_loop_close(loop))); \
} while (0);
// snprintf may cause performance problem
@ -352,7 +352,11 @@ int32_t cliGetConnTimer(SCliThrd* pThrd, SCliConn* pConn) {
return TSDB_CODE_OUT_OF_MEMORY;
}
tDebug("no available timer, create a timer %p", timer);
(void)uv_timer_init(pThrd->loop, timer);
int ret = uv_timer_init(pThrd->loop, timer);
if (ret != 0) {
tError("conn %p failed to init timer %p, ret:%d", pConn, timer, uv_err_name(ret));
return TSDB_CODE_THIRDPARTY_ERROR;
}
}
timer->data = pConn;
pConn->timer = timer;
@ -365,7 +369,10 @@ void cliResetConnTimer(SCliConn* conn) {
tDebug("%s conn %p stop timer", CONN_GET_INST_LABEL(conn), conn);
TAOS_UNUSED(uv_timer_stop(conn->timer));
}
(void)taosArrayPush(pThrd->timerList, &conn->timer);
if (taosArrayPush(pThrd->timerList, &conn->timer) == NULL) {
tError("%s conn %p failed to push timer %p to list since %s", CONN_GET_INST_LABEL(conn), conn, conn->timer,
tstrerror(terrno));
}
conn->timer->data = NULL;
conn->timer = NULL;
}
@ -392,19 +399,24 @@ void cliConnMayUpdateTimer(SCliConn* conn, int timeout) {
if (cliGetConnTimer(conn->hostThrd, conn) != 0) {
return;
}
(void)uv_timer_start(conn->timer, cliConnTimeout__checkReq, timeout, 0);
int ret = uv_timer_start(conn->timer, cliConnTimeout__checkReq, timeout, 0);
if (ret != 0) {
tError("%s conn %p failed to start timer %p, ret:%d", CONN_GET_INST_LABEL(conn), conn, conn->timer,
uv_err_name(ret));
}
}
void destroyCliConnQTable(SCliConn* conn) {
void* pIter = taosHashIterate(conn->pQTable, NULL);
int32_t code = 0;
void* pIter = taosHashIterate(conn->pQTable, NULL);
while (pIter != NULL) {
int64_t* qid = taosHashGetKey(pIter, NULL);
STransCtx* ctx = pIter;
transCtxCleanup(ctx);
pIter = taosHashIterate(conn->pQTable, pIter);
(void)transReleaseExHandle(transGetRefMgt(), *qid);
(void)transRemoveExHandle(transGetRefMgt(), *qid);
transReleaseExHandle(transGetRefMgt(), *qid);
transRemoveExHandle(transGetRefMgt(), *qid);
}
taosHashCleanup(conn->pQTable);
conn->pQTable = NULL;
@ -439,10 +451,15 @@ int32_t cliGetReqBySeq(SCliConn* conn, int64_t seq, int32_t msgType, SCliReq** p
}
int8_t cliMayRecycleConn(SCliConn* conn) {
int32_t code = 0;
SCliThrd* pThrd = conn->hostThrd;
if (transQueueSize(&conn->reqsToSend) == 0 && transQueueSize(&conn->reqsSentOut) == 0 &&
taosHashGetSize(conn->pQTable) == 0) {
(void)delConnFromHeapCache(pThrd->connHeapCache, conn);
code = delConnFromHeapCache(pThrd->connHeapCache, conn);
if (code != 0) {
tError("%s conn %p failed to remove conn from heap cache since %s", CONN_GET_INST_LABEL(conn), conn,
tstrerror(code));
}
addConnToPool(pThrd->pool, conn);
return 1;
}
@ -508,8 +525,8 @@ int32_t cliHandleState_mayHandleReleaseResp(SCliConn* conn, STransMsgHead* pHead
transQueueRemoveByFilter(&conn->reqsSentOut, filterByQid, &qId, &set, -1);
transQueueRemoveByFilter(&conn->reqsToSend, filterByQid, &qId, &set, -1);
(void)transReleaseExHandle(transGetRefMgt(), qId);
(void)transRemoveExHandle(transGetRefMgt(), qId);
transReleaseExHandle(transGetRefMgt(), qId);
transRemoveExHandle(transGetRefMgt(), qId);
while (!QUEUE_IS_EMPTY(&set)) {
queue* el = QUEUE_HEAD(&set);
@ -562,9 +579,12 @@ void cliHandleResp(SCliConn* conn) {
int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead, 0);
if (msgLen < 0) {
taosMemoryFree(pHead);
tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn);
tWarn("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn);
// TODO: notify cb
(void)pThrd->notifyExceptCb(pThrd, NULL, NULL);
code = pThrd->notifyExceptCb(pThrd, NULL, NULL);
if (code != 0) {
tError("%s conn %p failed to notify user since %s", tstrerror(code));
}
return;
}
@ -864,16 +884,18 @@ static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_
}
}
static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
int32_t code = 0;
STUB_RAND_NETWORK_ERR(nread);
if (handle->data == NULL) {
return;
}
int32_t fd;
(void)uv_fileno((uv_handle_t*)handle, &fd);
(void)taosSetSockOpt2(fd);
SCliConn* conn = handle->data;
code = transSetReadOption((uv_handle_t*)handle);
if (code != 0) {
tWarn("%s conn %p failed to set recv opt since %s", CONN_GET_INST_LABEL(conn), conn, code);
}
SConnBuffer* pBuf = &conn->readBuf;
if (nread > 0) {
@ -882,7 +904,7 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
tTrace("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn);
if (pBuf->invalid) {
conn->broken = true;
(void)transUnrefCliHandle(conn);
TAOS_UNUSED(transUnrefCliHandle(conn));
return;
break;
} else {
@ -903,7 +925,7 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
tDebug("%s conn %p read error:%s, ref:%d", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread),
transGetRefCount(conn));
conn->broken = true;
(void)transUnrefCliHandle(conn);
TAOS_UNUSED(transUnrefCliHandle(conn));
}
}
@ -917,8 +939,11 @@ static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn)
code = cliHandleState_mayUpdateState(pConn, pReq);
(void)addConnToHeapCache(pThrd->connHeapCache, pConn);
(void)transQueuePush(&pConn->reqsToSend, &pReq->q);
code = addConnToHeapCache(pThrd->connHeapCache, pConn);
if (code != 0) {
TAOS_CHECK_GOTO(code, NULL, _exception);
}
transQueuePush(&pConn->reqsToSend, &pReq->q);
return cliDoConn(pThrd, pConn);
_exception:
// free conn
@ -983,8 +1008,12 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int
conn->bufSize = BUFFER_LIMIT;
conn->buf = (uv_buf_t*)taosMemoryCalloc(1, BUFFER_LIMIT * sizeof(uv_buf_t));
if (conn->buf == NULL) {
TAOS_CHECK_GOTO(terrno, NULL, _failed);
}
TAOS_CHECK_GOTO(initWQ(&conn->wq), NULL, _failed);
(void)initWQ(&conn->wq);
conn->stream->data = conn;
conn->connReq.data = conn;
@ -993,11 +1022,11 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int
return code;
_failed:
if (conn) {
taosMemoryFree(conn->buf);
taosMemoryFree(conn->stream);
destroyCliConnQTable(conn);
taosHashCleanup(conn->pQTable);
(void)transDestroyBuffer(&conn->readBuf);
transDestroyBuffer(&conn->readBuf);
transQueueDestroy(&conn->reqsToSend);
transQueueDestroy(&conn->reqsSentOut);
taosMemoryFree(conn->dstAddr);
@ -1008,6 +1037,7 @@ _failed:
}
static void cliDestroyConn(SCliConn* conn, bool clear) { cliHandleException(conn); }
static void cliDestroy(uv_handle_t* handle) {
int32_t code = 0;
if (uv_handle_get_type(handle) != UV_TCP || handle->data == NULL) {
return;
}
@ -1015,9 +1045,15 @@ static void cliDestroy(uv_handle_t* handle) {
SCliThrd* pThrd = conn->hostThrd;
cliResetConnTimer(conn);
(void)destroyAllReqs(conn);
code = destroyAllReqs(conn);
if (code != 0) {
tDebug("%s conn %p failed to all reqs since %s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code));
}
(void)delConnFromHeapCache(pThrd->connHeapCache, conn);
code = delConnFromHeapCache(pThrd->connHeapCache, conn);
if (code != 0) {
tDebug("%s conn %p failed to del conn from heapcach since %s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code));
}
taosMemoryFree(conn->dstAddr);
taosMemoryFree(conn->stream);
taosMemoryFree(conn->ipStr);
@ -1025,7 +1061,10 @@ static void cliDestroy(uv_handle_t* handle) {
void* pIter = taosHashIterate(conn->pQTable, NULL);
while (pIter) {
int64_t* qid = taosHashGetKey(pIter, NULL);
(void)taosHashRemove(pThrd->pIdConnTable, qid, sizeof(*qid));
code = taosHashRemove(pThrd->pIdConnTable, qid, sizeof(*qid));
if (code != 0) {
tDebug("%s conn %p failed to remove state %" PRId64 " since %s", CONN_GET_INST_LABEL(conn), conn, *qid, code);
}
pIter = taosHashIterate(conn->pQTable, pIter);
tDebug("%s conn %p destroy state %" PRId64 "", CONN_GET_INST_LABEL(conn), conn, *qid);
}
@ -1039,7 +1078,7 @@ static void cliDestroy(uv_handle_t* handle) {
taosMemoryFree(conn->buf);
destroyWQ(&conn->wq);
(void)transDestroyBuffer(&conn->readBuf);
transDestroyBuffer(&conn->readBuf);
tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
@ -1100,7 +1139,10 @@ static void cliHandleException(SCliConn* conn) {
STrans* pInst = pThrd->pInst;
cliResetConnTimer(conn);
(void)destroyAllReqs(conn);
code = destroyAllReqs(conn);
if (code != 0) {
tError("%s conn %p failed to destroy all reqs on conn since %s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code));
}
QUEUE_REMOVE(&conn->q);
if (conn->registered) {
@ -1134,6 +1176,7 @@ static void cliConnRmReqs(SCliConn* conn) {
}
static void cliBatchSendCb(uv_write_t* req, int status) {
int32_t code = 0;
SWReqsWrapper* wrapper = (SWReqsWrapper*)req->data;
SCliConn* conn = wrapper->arg;
@ -1148,18 +1191,27 @@ static void cliBatchSendCb(uv_write_t* req, int status) {
cliConnRmReqs(conn);
if (status != 0) {
tDebug("%s conn %p failed to send msg, reason:%s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(status));
(void)transUnrefCliHandle(conn);
TAOS_UNUSED(transUnrefCliHandle(conn));
return;
}
cliConnMayUpdateTimer(conn, READ_TIMEOUT);
if (conn->readerStart == 0) {
(void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
code = uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
if (code != 0) {
tDebug("%s conn %p failed to start read since%s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code));
TAOS_UNUSED(transUnrefCliHandle(conn));
return;
}
conn->readerStart = 1;
}
if (!cliMayRecycleConn(conn)) {
(void)cliBatchSend(conn);
code = cliBatchSend(conn);
if (code != 0) {
tDebug("%s conn %p failed to send msg, reason:%s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code));
TAOS_UNUSED(transUnrefCliHandle(conn));
}
}
}
bool cliConnMayAddUserInfo(SCliConn* pConn, STransMsgHead** ppHead, int32_t* msgLen) {
@ -1265,7 +1317,7 @@ int32_t cliBatchSend(SCliConn* pConn) {
STraceId* trace = &pCliMsg->msg.info.traceId;
tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%" PRId64 ", qid:%" PRId64 "", CONN_GET_INST_LABEL(pConn),
pConn, TMSG_INFO(pReq->msgType), pConn->dst, pConn->src, pConn->seq, pReq->info.qId);
(void)transQueuePush(&pConn->reqsSentOut, &pCliMsg->q);
transQueuePush(&pConn->reqsSentOut, &pCliMsg->q);
}
transRefCliHandle(pConn);
uv_write_t* req = allocWReqFromWQ(&pConn->wq, pConn);
@ -1274,14 +1326,14 @@ int32_t cliBatchSend(SCliConn* pConn) {
if (ret != 0) {
tError("%s conn %p failed to send msg, reason:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(ret));
freeWReqToWQ(&pConn->wq, req->data);
(void)transUnrefCliHandle(pConn);
TAOS_UNUSED(transUnrefCliHandle(pConn));
}
return 0;
}
int32_t cliSendReq(SCliConn* pConn, SCliReq* pCliMsg) {
int32_t code = 0;
(void)transQueuePush(&pConn->reqsToSend, &pCliMsg->q);
transQueuePush(&pConn->reqsToSend, &pCliMsg->q);
code = cliBatchSend(pConn);
return code;
@ -1357,7 +1409,7 @@ _exception1:
return code;
_exception2:
(void)transUnrefCliHandle(conn);
TAOS_UNUSED(transUnrefCliHandle(conn));
tError("%s conn %p failed to do connect, reason:%s", transLabel(pInst), conn, tstrerror(code));
return code;
}
@ -1366,12 +1418,22 @@ int32_t cliConnSetSockInfo(SCliConn* pConn) {
struct sockaddr peername, sockname;
int addrlen = sizeof(peername);
(void)uv_tcp_getpeername((uv_tcp_t*)pConn->stream, &peername, &addrlen);
(void)transSockInfo2Str(&peername, pConn->dst);
int32_t code = uv_tcp_getpeername((uv_tcp_t*)pConn->stream, &peername, &addrlen);
if (code != 0) {
tWarn("failed to get perrname since %s", uv_err_name(code));
code = TSDB_CODE_THIRDPARTY_ERROR;
return code;
}
transSockInfo2Str(&peername, pConn->dst);
addrlen = sizeof(sockname);
TAOS_UNUSED(uv_tcp_getsockname((uv_tcp_t*)pConn->stream, &sockname, &addrlen));
TAOS_UNUSED(transSockInfo2Str(&sockname, pConn->src));
code = uv_tcp_getsockname((uv_tcp_t*)pConn->stream, &sockname, &addrlen);
if (code != 0) {
tWarn("failed to get sock name since %s", uv_err_name(code));
code = TSDB_CODE_THIRDPARTY_ERROR;
return code;
}
transSockInfo2Str(&sockname, pConn->src);
struct sockaddr_in addr = *(struct sockaddr_in*)&sockname;
struct sockaddr_in saddr = *(struct sockaddr_in*)&peername;
@ -1399,6 +1461,7 @@ int32_t cliConnSetSockInfo(SCliConn* pConn) {
bool filteGetAll(void* q, void* arg) { return true; }
void cliConnCb(uv_connect_t* req, int status) {
int32_t code = 0;
SCliConn* pConn = req->data;
SCliThrd* pThrd = pConn->hostThrd;
bool timeout = false;
@ -1419,14 +1482,24 @@ void cliConnCb(uv_connect_t* req, int status) {
if (status != 0) {
tDebug("%s conn %p failed to connect to %s, reason:%s", CONN_GET_INST_LABEL(pConn), pConn, pConn->dstAddr,
uv_strerror(status));
(void)transUnrefCliHandle(pConn);
TAOS_UNUSED(transUnrefCliHandle(pConn));
return;
}
pConn->connnected = 1;
(void)cliConnSetSockInfo(pConn);
code = cliConnSetSockInfo(pConn);
if (code != 0) {
tDebug("%s conn %p failed to get sock info,reason:%s ", CONN_GET_INST_LABEL(pConn), pConn, pConn->dstAddr,
tstrerror(code));
TAOS_UNUSED(transUnrefCliHandle(pConn));
}
tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
(void)cliBatchSend(pConn);
code = cliBatchSend(pConn);
if (code != 0) {
tDebug("%s conn %p failed to get sock info,reason:%s ", CONN_GET_INST_LABEL(pConn), pConn, pConn->dstAddr,
tstrerror(code));
TAOS_UNUSED(transUnrefCliHandle(pConn));
}
}
static void doNotifyCb(SCliReq* pReq, SCliThrd* pThrd, int32_t code) {
@ -1637,17 +1710,17 @@ int32_t cliMayGetStateByQid(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn) {
if (pState == NULL) {
if (pReq->ctx == NULL) {
(void)transReleaseExHandle(transGetRefMgt(), qid);
transReleaseExHandle(transGetRefMgt(), qid);
return TSDB_CODE_RPC_STATE_DROPED;
}
tDebug("%s conn %p failed to get statue, qid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid);
(void)transReleaseExHandle(transGetRefMgt(), qid);
transReleaseExHandle(transGetRefMgt(), qid);
return TSDB_CODE_RPC_ASYNC_IN_PROCESS;
} else {
*pConn = pState->conn;
tDebug("%s conn %p succ to get conn of statue, qid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid);
}
(void)transReleaseExHandle(transGetRefMgt(), qid);
transReleaseExHandle(transGetRefMgt(), qid);
return 0;
}
}
@ -1668,7 +1741,7 @@ int32_t cliHandleState_mayUpdateState(SCliConn* pConn, SCliReq* pReq) {
tDebug("%s conn %p succ to add statue, qid:%" PRId64 " (1)", transLabel(pThrd->pInst), pConn, qid);
}
(void)cliHandleState_mayUpdateStateCtx(pConn, pReq);
TAOS_UNUSED(cliHandleState_mayUpdateStateCtx(pConn, pReq));
return code;
}
void cliHandleBatchReq(SCliThrd* pThrd, SCliReq* pReq) {
@ -1681,7 +1754,7 @@ void cliHandleBatchReq(SCliThrd* pThrd, SCliReq* pReq) {
SCliConn* pConn = NULL;
code = cliMayGetStateByQid(pThrd, pReq, &pConn);
if (code == 0) {
(void)cliHandleState_mayUpdateStateCtx(pConn, pReq);
TAOS_UNUSED(cliHandleState_mayUpdateStateCtx(pConn, pReq));
} else if (code == TSDB_CODE_RPC_STATE_DROPED) {
TAOS_CHECK_GOTO(code, &lino, _exception);
return;
@ -1702,9 +1775,12 @@ void cliHandleBatchReq(SCliThrd* pThrd, SCliReq* pReq) {
// do nothing, notiy
return;
} else if (code == 0) {
(void)addConnToHeapCache(pThrd->connHeapCache, pConn);
code = addConnToHeapCache(pThrd->connHeapCache, pConn);
if (code != 0) {
TAOS_CHECK_GOTO(code, &lino, _exception);
}
} else {
// do nothing, notiy
TAOS_CHECK_GOTO(code, &lino, _exception);
return;
}
}
@ -1719,7 +1795,11 @@ _exception:
resp.code = code;
STraceId* trace = &pReq->msg.info.traceId;
tGWarn("%s failed to process req, reason:%s", pInst->label, tstrerror(code));
(void)(pThrd->notifyExceptCb)(pThrd, pReq, &resp);
code = (pThrd->notifyExceptCb)(pThrd, pReq, &resp);
if (code != 0) {
tGWarn("%s failed to notify user since %s", pInst->label, tstrerror(code));
}
return;
}
@ -1948,7 +2028,7 @@ static void* cliWorkThread(void* arg) {
pThrd->pid = taosGetSelfPthreadId();
tsEnableRandErr = true;
(void)strtolower(threadName, pThrd->pInst->label);
TAOS_UNUSED(strtolower(threadName, pThrd->pInst->label));
setThreadName(threadName);
TAOS_UNUSED(uv_run(pThrd->loop, UV_RUN_DEFAULT));
@ -2075,10 +2155,17 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) {
for (int i = 0; i < timerSize; i++) {
uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
if (timer == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
TAOS_CHECK_GOTO(terrno, NULL, _end);
}
code = uv_timer_init(pThrd->loop, timer);
if (code != 0) {
tError("failed to init timer since %s", uv_err_name(code));
code = TSDB_CODE_THIRDPARTY_ERROR;
TAOS_CHECK_GOTO(code, NULL, _end);
}
if (taosArrayPush(pThrd->timerList, &timer) == NULL) {
TAOS_CHECK_GOTO(terrno, NULL, _end);
}
(void)uv_timer_init(pThrd->loop, timer);
(void)taosArrayPush(pThrd->timerList, &timer);
}
pThrd->pool = createConnPool(4);
@ -2137,7 +2224,7 @@ _end:
TAOS_UNUSED(uv_loop_close(pThrd->loop));
taosMemoryFree(pThrd->loop);
(void)taosThreadMutexDestroy(&pThrd->msgMtx);
TAOS_UNUSED((taosThreadMutexDestroy(&pThrd->msgMtx)));
transAsyncPoolDestroy(pThrd->asyncPool);
for (int i = 0; i < taosArrayGetSize(pThrd->timerList); i++) {
uv_timer_t* timer = taosArrayGetP(pThrd->timerList, i);
@ -2169,7 +2256,7 @@ static void destroyThrdObj(SCliThrd* pThrd) {
}
CLI_RELEASE_UV(pThrd->loop);
(void)taosThreadMutexDestroy(&pThrd->msgMtx);
TAOS_UNUSED(taosThreadMutexDestroy(&pThrd->msgMtx));
TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliReq, destroyReqWrapper, (void*)pThrd);
transAsyncPoolDestroy(pThrd->asyncPool);
@ -2284,26 +2371,38 @@ static FORCE_INLINE void doCloseIdleConn(void* param) {
taosMemoryFree(arg);
}
static FORCE_INLINE void cliPerfLog_schedMsg(SCliReq* pReq, char* label) {
int32_t code = 0;
if (!(rpcDebugFlag & DEBUG_DEBUG)) {
return;
}
SReqCtx* pCtx = pReq->ctx;
STraceId* trace = &pReq->msg.info.traceId;
char tbuf[512] = {0};
(void)epsetToStr(&pCtx->epSet, tbuf, tListLen(tbuf));
code = epsetToStr(&pCtx->epSet, tbuf, tListLen(tbuf));
if (code != 0) {
tWarn("failed to debug epset since %s", tstrerror(code));
return;
}
tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", label, tbuf, pCtx->retryStep,
pCtx->retryNextInterval);
return;
}
static FORCE_INLINE void cliPerfLog_epset(SCliConn* pConn, SCliReq* pReq) {
int32_t code = 0;
if (!(rpcDebugFlag & DEBUG_TRACE)) {
return;
}
SReqCtx* pCtx = pReq->ctx;
char tbuf[512] = {0};
(void)epsetToStr(&pCtx->epSet, tbuf, tListLen(tbuf));
code = epsetToStr(&pCtx->epSet, tbuf, tListLen(tbuf));
if (code != 0) {
tWarn("failed to debug epset since %s", tstrerror(code));
return;
}
tTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
return;
}
@ -2699,7 +2798,7 @@ static FORCE_INLINE SCliThrd* transGetWorkThrdFromHandle(STrans* trans, int64_t
pThrd = exh->pThrd;
taosWUnLockLatch(&exh->latch);
TAOS_UNUSED(transReleaseExHandle(transGetRefMgt(), handle));
transReleaseExHandle(transGetRefMgt(), handle);
return pThrd;
}
@ -2806,16 +2905,16 @@ int32_t transSendRequest(void* pInstRef, const SEpSet* pEpSet, STransMsg* pReq,
EPSET_GET_INUSE_IP(pEpSet), EPSET_GET_INUSE_PORT(pEpSet), pReq->info.ahandle);
if ((code = transAsyncSend(pThrd->asyncPool, &(pCliMsg->q))) != 0) {
destroyReq(pCliMsg);
(void)transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef);
transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef);
return (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code);
}
(void)transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef);
transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef);
return 0;
_exception:
transFreeMsg(pReq->pCont);
pReq->pCont = NULL;
(void)transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef);
transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef);
return code;
}
int32_t transSendRequestWithId(void* pInstRef, const SEpSet* pEpSet, STransMsg* pReq, int64_t* transpointId) {
@ -2852,16 +2951,16 @@ int32_t transSendRequestWithId(void* pInstRef, const SEpSet* pEpSet, STransMsg*
EPSET_GET_INUSE_IP(pEpSet), EPSET_GET_INUSE_PORT(pEpSet), pReq->info.ahandle);
if ((code = transAsyncSend(pThrd->asyncPool, &(pCliMsg->q))) != 0) {
destroyReq(pCliMsg);
(void)transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef);
transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef);
return (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code);
}
(void)transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef);
transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef);
return 0;
_exception:
transFreeMsg(pReq->pCont);
pReq->pCont = NULL;
(void)transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef);
transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef);
return code;
}
@ -2913,7 +3012,7 @@ int32_t transSendRecv(void* pInstRef, const SEpSet* pEpSet, STransMsg* pReq, STr
SCliReq* pCliReq = taosMemoryCalloc(1, sizeof(SCliReq));
if (pCliReq == NULL) {
(void)tsem_destroy(sem);
(TAOS_UNUSED(tsem_destroy(sem)));
taosMemoryFree(sem);
taosMemoryFree(pCtx);
TAOS_CHECK_GOTO(terrno, NULL, _RETURN1);
@ -2940,11 +3039,11 @@ int32_t transSendRecv(void* pInstRef, const SEpSet* pEpSet, STransMsg* pReq, STr
_RETURN:
tsem_destroy(sem);
taosMemoryFree(sem);
(void)transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef);
transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef);
taosMemoryFree(pTransRsp);
return code;
_RETURN1:
(void)transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef);
transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef);
taosMemoryFree(pTransRsp);
taosMemoryFree(pReq->pCont);
pReq->pCont = NULL;
@ -3066,15 +3165,15 @@ int32_t transSendRecvWithTimeout(void* pInstRef, SEpSet* pEpSet, STransMsg* pReq
}
}
_RETURN:
(void)transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef);
(void)taosReleaseRef(transGetSyncMsgMgt(), ref);
(void)taosRemoveRef(transGetSyncMsgMgt(), ref);
transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef);
TAOS_UNUSED(taosReleaseRef(transGetSyncMsgMgt(), ref));
TAOS_UNUSED(taosRemoveRef(transGetSyncMsgMgt(), ref));
return code;
_RETURN2:
transFreeMsg(pReq->pCont);
pReq->pCont = NULL;
taosMemoryFree(pTransMsg);
(void)transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef);
transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef);
return code;
}
/*
@ -3128,7 +3227,7 @@ int32_t transSetDefaultAddr(void* pInstRef, const char* ip, const char* fqdn) {
}
}
(void)transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef);
transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef);
return code;
}
@ -3243,7 +3342,7 @@ static FORCE_INLINE bool filterToDebug(void* e, void* arg) {
tGWarn("%s is sent to, and no resp from server", TMSG_INFO(pReq->msg.msgType));
return false;
}
static FORCE_INLINE int32_t logConnMissHit(SCliConn* pConn) {
static FORCE_INLINE void logConnMissHit(SCliConn* pConn) {
// queue set;
// QUEUE_INIT(&set);
pConn->heapMissHit++;
@ -3253,7 +3352,6 @@ static FORCE_INLINE int32_t logConnMissHit(SCliConn* pConn) {
// if (transQueueSize(&pConn->reqsSentOut) >= BUFFER_LIMIT) {
// transQueueRemoveByFilter(&pConn->reqsSentOut, filterToDebug, NULL, &set, 1);
// }
return 0;
}
static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) {
int code = 0;
@ -3275,7 +3373,7 @@ static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) {
int32_t stateNum = taosHashGetSize(pConn->pQTable);
if (shouldSWitchToOtherConn(reqsNum, reqsSentOut, stateNum)) {
(void)logConnMissHit(pConn);
logConnMissHit(pConn);
return NULL;
}
}

View File

@ -97,13 +97,12 @@ void transFreeMsg(void* msg) {
tTrace("rpc free cont:%p", (char*)msg - TRANS_MSG_OVERHEAD);
taosMemoryFree((char*)msg - sizeof(STransMsgHead));
}
int transSockInfo2Str(struct sockaddr* sockname, char* dst) {
void transSockInfo2Str(struct sockaddr* sockname, char* dst) {
struct sockaddr_in addr = *(struct sockaddr_in*)sockname;
char buf[20] = {0};
int r = uv_ip4_name(&addr, (char*)buf, sizeof(buf));
sprintf(dst, "%s:%d", buf, ntohs(addr.sin_port));
return r;
}
int32_t transInitBuffer(SConnBuffer* buf) {
buf->buf = taosMemoryCalloc(1, BUFFER_CAP);
@ -118,10 +117,9 @@ int32_t transInitBuffer(SConnBuffer* buf) {
buf->invalid = 0;
return 0;
}
int32_t transDestroyBuffer(SConnBuffer* p) {
void transDestroyBuffer(SConnBuffer* p) {
taosMemoryFree(p->buf);
p->buf = NULL;
return 0;
}
int32_t transClearBuffer(SConnBuffer* buf) {
@ -335,11 +333,12 @@ int transAsyncSend(SAsyncPool* pool, queue* q) {
SAsyncItem* item = async->data;
if (taosThreadMutexLock(&item->mtx) != 0) {
tError("failed to lock mutex");
tError("failed to lock mutex since %s", tstrerror(terrno));
return terrno;
}
QUEUE_PUSH(&item->qmsg, q);
(void)taosThreadMutexUnlock(&item->mtx);
TAOS_UNUSED(taosThreadMutexUnlock(&item->mtx));
int ret = uv_async_send(async);
if (ret != 0) {
@ -426,12 +425,10 @@ int32_t transQueueInit(STransQueue* wq, void (*freeFunc)(void* arg)) {
wq->size = 0;
return 0;
}
int32_t transQueuePush(STransQueue* q, void* arg) {
void transQueuePush(STransQueue* q, void* arg) {
queue* node = arg;
QUEUE_PUSH(&q->node, node);
q->size++;
return 0;
}
void* transQueuePop(STransQueue* q) {
if (q->size == 0) return NULL;
@ -737,18 +734,24 @@ int64_t transAddExHandle(int32_t refMgt, void* p) {
// acquire extern handle
return taosAddRef(refMgt, p);
}
int32_t transRemoveExHandle(int32_t refMgt, int64_t refId) {
void transRemoveExHandle(int32_t refMgt, int64_t refId) {
// acquire extern handle
return taosRemoveRef(refMgt, refId);
int32_t code = taosRemoveRef(refMgt, refId);
if (code != 0) {
tTrace("failed to remove %" PRId64 " from resetId:%d", refId, refMgt);
}
}
void* transAcquireExHandle(int32_t refMgt, int64_t refId) { // acquire extern handle
return (void*)taosAcquireRef(refMgt, refId);
}
int32_t transReleaseExHandle(int32_t refMgt, int64_t refId) {
void transReleaseExHandle(int32_t refMgt, int64_t refId) {
// release extern handle
return taosReleaseRef(refMgt, refId);
int32_t code = taosReleaseRef(refMgt, refId);
if (code != 0) {
tTrace("failed to release %" PRId64 " from resetId:%d", refId, refMgt);
}
}
void transDestroyExHandle(void* handle) {
if (handle == NULL) {
@ -869,15 +872,22 @@ int32_t transUtilSWhiteListToStr(SIpWhiteList* pList, char** ppBuf) {
// }
int32_t initWQ(queue* wq) {
int32_t code = 0;
QUEUE_INIT(wq);
for (int i = 0; i < 4; i++) {
SWReqsWrapper* w = taosMemoryCalloc(1, sizeof(SWReqsWrapper));
if (w == NULL) {
TAOS_CHECK_GOTO(terrno, NULL, _exception);
}
w->wreq.data = w;
w->arg = NULL;
QUEUE_INIT(&w->node);
QUEUE_PUSH(wq, &w->q);
}
return 0;
_exception:
destroyWQ(wq);
return code;
}
void destroyWQ(queue* wq) {
while (!QUEUE_IS_EMPTY(wq)) {
@ -899,6 +909,9 @@ uv_write_t* allocWReqFromWQ(queue* wq, void* arg) {
return &w->wreq;
} else {
SWReqsWrapper* w = taosMemoryCalloc(1, sizeof(SWReqsWrapper));
if (w == NULL) {
return NULL;
}
w->wreq.data = w;
w->arg = arg;
QUEUE_INIT(&w->node);
@ -910,3 +923,15 @@ void freeWReqToWQ(queue* wq, SWReqsWrapper* w) {
QUEUE_INIT(&w->node);
QUEUE_PUSH(wq, &w->q);
}
int32_t transSetReadOption(uv_handle_t* handle) {
int32_t code = 0;
int32_t fd;
int ret = uv_fileno((uv_handle_t*)handle, &fd);
if (ret != 0) {
tWarn("failed to get fd since %s", uv_err_name(ret));
return TSDB_CODE_THIRDPARTY_ERROR;
}
code = taosSetSockOpt2(fd);
return code;
}

View File

@ -161,8 +161,8 @@ static void uvFreeCb(uv_handle_t* handle);
static FORCE_INLINE void uvStartSendRespImpl(SSvrRespMsg* smsg);
static int uvPrepareSendData(SSvrRespMsg* msg, uv_buf_t* wb);
static void uvStartSendResp(SSvrRespMsg* msg);
static int32_t uvPrepareSendData(SSvrRespMsg* msg, uv_buf_t* wb);
static void uvStartSendResp(SSvrRespMsg* msg);
static void uvNotifyLinkBrokenToApp(SSvrConn* conn);
@ -453,7 +453,10 @@ static int32_t uvMayHandleReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) {
(pInst->cfp)(pInst->parent, &(arg->msg), NULL);
tTrace("conn %p recv release, notify server app, qid:%" PRId64 "", pConn, qId);
(void)taosHashRemove(pConn->pQTable, &qId, sizeof(qId));
code = taosHashRemove(pConn->pQTable, &qId, sizeof(qId));
if (code != 0) {
tDebug("conn %p failed to remove qid:%" PRId64 "", pConn, qId);
}
tTrace("conn %p clear state,qid:%" PRId64 "", pConn, qId);
}
@ -464,11 +467,15 @@ static int32_t uvMayHandleReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) {
.info.seqNum = taosHton64(pHead->seqNum)};
SSvrRespMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrRespMsg));
if (srvMsg == NULL) {
tError("conn %p recv release, but invalid qid:%" PRId64 "", pConn, qId);
return terrno;
}
srvMsg->msg = tmsg;
srvMsg->type = Normal;
srvMsg->pConn = pConn;
(void)transQueuePush(&pConn->resps, &srvMsg->q);
transQueuePush(&pConn->resps, &srvMsg->q);
uvStartSendRespImpl(srvMsg);
taosMemoryFree(pHead);
@ -582,14 +589,16 @@ static bool uvHandleReq(SSvrConn* pConn) {
pConnInfo->clientPort = pConn->port;
tstrncpy(pConnInfo->user, pConn->user, sizeof(pConnInfo->user));
(void)transReleaseExHandle(uvGetConnRefOfThrd(pThrd), pConn->refId);
transReleaseExHandle(uvGetConnRefOfThrd(pThrd), pConn->refId);
(*pInst->cfp)(pInst->parent, &transMsg, NULL);
return true;
}
void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
int32_t code = 0;
SSvrConn* conn = cli->data;
STrans* pInst = conn->pInst;
SWorkThrd* pThrd = conn->hostThrd;
STUB_RAND_NETWORK_ERR(nread);
@ -599,10 +608,11 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
destroyConn(conn, true);
return;
}
STrans* pInst = conn->pInst;
int32_t fd = 0;
(void)uv_fileno((uv_handle_t*)cli, &fd);
(void)taosSetSockOpt2(fd);
code = transSetReadOption((uv_handle_t*)cli);
if (code != 0) {
tWarn("%s conn %p failed to set recv opt since %s", transLabel(pInst), conn, code);
}
SConnBuffer* pBuf = &conn->readBuf;
if (nread > 0) {
@ -705,7 +715,7 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) {
taosMemoryFree(req);
}
static int uvPrepareSendData(SSvrRespMsg* smsg, uv_buf_t* wb) {
static int32_t uvPrepareSendData(SSvrRespMsg* smsg, uv_buf_t* wb) {
SSvrConn* pConn = smsg->pConn;
STransMsg* pMsg = &smsg->msg;
if (pMsg->pCont == 0) {
@ -726,12 +736,10 @@ static int uvPrepareSendData(SSvrRespMsg* smsg, uv_buf_t* wb) {
pHead->withUserInfo = pConn->userInited == 0 ? 1 : 0;
// handle invalid drop_task resp, TD-20098
if (pConn->inType == TDMT_SCH_DROP_TASK && pMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
// (void)transQueuePop(&pConn->resps);
// destroySmsg(smsg);
// return TSDB_CODE_INVALID_MSG;
return 0;
}
// if (pConn->inType == TDMT_SCH_DROP_TASK && pMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
// // return TSDB_CODE_INVALID_MSG;
// return 0;
// }
pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType);
// pHead->msgType = pMsg->msgType;
@ -758,6 +766,7 @@ static int uvPrepareSendData(SSvrRespMsg* smsg, uv_buf_t* wb) {
return 0;
}
static int32_t uvBuildToSendData(SSvrConn* pConn, uv_buf_t** ppBuf, int32_t* bufNum, queue* toSendQ) {
int32_t code = 0;
int32_t size = transQueueSize(&pConn->resps);
tDebug("%s conn %p has %d msg to send", transLabel(pConn->pInst), pConn, size);
if (size == 0) {
@ -766,6 +775,9 @@ static int32_t uvBuildToSendData(SSvrConn* pConn, uv_buf_t** ppBuf, int32_t* buf
if (pConn->bufSize < size) {
pConn->buf = taosMemoryRealloc(pConn->buf, size * sizeof(uv_buf_t));
if (pConn->buf == NULL) {
return terrno;
}
pConn->bufSize = size;
}
uv_buf_t* pWb = pConn->buf;
@ -776,7 +788,10 @@ static int32_t uvBuildToSendData(SSvrConn* pConn, uv_buf_t** ppBuf, int32_t* buf
queue* el = transQueuePop(&pConn->resps);
SSvrRespMsg* pMsg = QUEUE_DATA(el, SSvrRespMsg, q);
uv_buf_t wb;
(void)uvPrepareSendData(pMsg, &wb);
code = uvPrepareSendData(pMsg, &wb);
if (code != 0) {
return code;
}
pWb[count] = wb;
pMsg->sent = 1;
QUEUE_PUSH(toSendQ, &pMsg->q);
@ -805,7 +820,12 @@ static FORCE_INLINE void uvStartSendRespImpl(SSvrRespMsg* smsg) {
return;
}
uv_write_t* req = allocWReqFromWQ(&pConn->wq, pConn);
uv_write_t* req = allocWReqFromWQ(&pConn->wq, pConn);
if (req == NULL) {
uError("%s conn %p failed to alloc write req since %s", transLabel(pConn->pInst), pConn, tstrerror(terrno));
transUnrefSrvHandle(pConn);
return;
}
SWReqsWrapper* pWreq = req->data;
uv_buf_t* pBuf = NULL;
@ -838,6 +858,7 @@ static FORCE_INLINE void uvStartSendRespImpl(SSvrRespMsg* smsg) {
}
}
int32_t uvMayHandleReleaseResp(SSvrRespMsg* pMsg) {
int32_t code = 0;
SSvrConn* pConn = pMsg->pConn;
int64_t qid = pMsg->msg.info.qId;
if (pMsg->msg.msgType == TDMT_SCH_TASK_RELEASE && qid > 0) {
@ -847,7 +868,10 @@ int32_t uvMayHandleReleaseResp(SSvrRespMsg* pMsg) {
return TSDB_CODE_RPC_NO_STATE;
} else {
transFreeMsg(p->msg.pCont);
(void)taosHashRemove(pConn->pQTable, &qid, sizeof(qid));
code = taosHashRemove(pConn->pQTable, &qid, sizeof(qid));
if (code != 0) {
tError("%s conn %p failed release qid:%d since %s", tstrerror(code));
}
}
}
return 0;
@ -860,7 +884,7 @@ static void uvStartSendResp(SSvrRespMsg* smsg) {
return;
}
(void)transQueuePush(&pConn->resps, &smsg->q);
transQueuePush(&pConn->resps, &smsg->q);
uvStartSendRespImpl(smsg);
return;
}
@ -928,12 +952,12 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
SExHandle* exh2 = transAcquireExHandle(uvGetConnRefOfThrd(pThrd), refId);
if (exh2 == NULL || exh1 != exh2) {
tTrace("handle except msg %p, ignore it", exh1);
(void)transReleaseExHandle(uvGetConnRefOfThrd(pThrd), refId);
transReleaseExHandle(uvGetConnRefOfThrd(pThrd), refId);
destroySmsg(msg);
continue;
}
msg->pConn = exh1->handle;
(void)transReleaseExHandle(uvGetConnRefOfThrd(pThrd), refId);
transReleaseExHandle(uvGetConnRefOfThrd(pThrd), refId);
(*transAsyncHandle[msg->type])(msg, pThrd);
}
}
@ -1031,6 +1055,7 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) {
}
}
void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
int32_t code = 0;
STUB_RAND_NETWORK_ERR(nread);
if (nread < 0) {
if (nread != UV_EOF) {
@ -1100,9 +1125,16 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
pConn->serverIp = saddr.sin_addr.s_addr;
pConn->port = ntohs(addr.sin_port);
(void)transSetConnOption((uv_tcp_t*)pConn->pTcp, 20);
(void)uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocRecvBufferCb, uvOnRecvCb);
code = transSetConnOption((uv_tcp_t*)pConn->pTcp, 20);
if (code != 0) {
tWarn("failed to set tcp option since %s", tstrerror(code));
}
code = uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocRecvBufferCb, uvOnRecvCb);
if (code != 0) {
tWarn("%s conn %p failed to start to read since %s", uv_err_name(code));
transUnrefSrvHandle(pConn);
return;
}
} else {
tDebug("failed to create new connection");
transUnrefSrvHandle(pConn);
@ -1225,10 +1257,14 @@ static int32_t addHandleToAcceptloop(void* arg) {
}
void* transWorkerThread(void* arg) {
int32_t code = 0;
setThreadName("trans-svr-work");
SWorkThrd* pThrd = (SWorkThrd*)arg;
tsEnableRandErr = true;
(void)uv_run(pThrd->loop, UV_RUN_DEFAULT);
code = uv_run(pThrd->loop, UV_RUN_DEFAULT);
if (code != 0) {
tWarn("failed to start to worker thread since %s", uv_err_name(code));
}
return NULL;
}
@ -1318,7 +1354,7 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
_end:
if (pConn) {
transQueueDestroy(&pConn->resps);
(void)transDestroyBuffer(&pConn->readBuf);
transDestroyBuffer(&pConn->readBuf);
taosHashCleanup(pConn->pQTable);
taosMemoryFree(pConn->pTcp);
destroyWQ(&pConn->wq);
@ -1369,8 +1405,8 @@ static void uvDestroyConn(uv_handle_t* handle) {
}
SWorkThrd* thrd = conn->hostThrd;
(void)transReleaseExHandle(uvGetConnRefOfThrd(thrd), conn->refId);
(void)transRemoveExHandle(uvGetConnRefOfThrd(thrd), conn->refId);
transReleaseExHandle(uvGetConnRefOfThrd(thrd), conn->refId);
transRemoveExHandle(uvGetConnRefOfThrd(thrd), conn->refId);
STrans* pInst = thrd->pInst;
tDebug("%s conn %p destroy", transLabel(pInst), conn);
@ -1383,7 +1419,7 @@ static void uvDestroyConn(uv_handle_t* handle) {
uvConnDestroyAllState(conn);
(void)transDestroyBuffer(&conn->readBuf);
transDestroyBuffer(&conn->readBuf);
destroyWQ(&conn->wq);
taosMemoryFree(conn->buf);
@ -1839,15 +1875,15 @@ int32_t transReleaseSrvHandle(void* handle) {
qId);
if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) {
destroySmsg(m);
(void)transReleaseExHandle(info->refIdMgt, refId);
transReleaseExHandle(info->refIdMgt, refId);
return code;
}
(void)transReleaseExHandle(info->refIdMgt, refId);
transReleaseExHandle(info->refIdMgt, refId);
return 0;
_return1:
tDebug("handle %p failed to send to release handle", exh);
(void)transReleaseExHandle(info->refIdMgt, refId);
transReleaseExHandle(info->refIdMgt, refId);
return code;
_return2:
tDebug("handle %p failed to send to release handle", exh);
@ -1893,17 +1929,17 @@ int32_t transSendResponse(const STransMsg* msg) {
tGDebug("conn %p start to send resp (1/2)", exh->handle);
if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) {
destroySmsg(m);
(void)transReleaseExHandle(msg->info.refIdMgt, refId);
transReleaseExHandle(msg->info.refIdMgt, refId);
return code;
}
(void)transReleaseExHandle(msg->info.refIdMgt, refId);
transReleaseExHandle(msg->info.refIdMgt, refId);
return 0;
_return1:
tDebug("handle %p failed to send resp", exh);
rpcFreeCont(msg->pCont);
(void)transReleaseExHandle(msg->info.refIdMgt, refId);
transReleaseExHandle(msg->info.refIdMgt, refId);
return code;
_return2:
tDebug("handle %p failed to send resp", exh);
@ -1941,17 +1977,17 @@ int32_t transRegisterMsg(const STransMsg* msg) {
tDebug("%s conn %p start to register brokenlink callback", transLabel(pInst), exh->handle);
if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) {
destroySmsg(m);
(void)transReleaseExHandle(msg->info.refIdMgt, refId);
transReleaseExHandle(msg->info.refIdMgt, refId);
return code;
}
(void)transReleaseExHandle(msg->info.refIdMgt, refId);
transReleaseExHandle(msg->info.refIdMgt, refId);
return 0;
_return1:
tDebug("handle %p failed to register brokenlink", exh);
rpcFreeCont(msg->pCont);
(void)transReleaseExHandle(msg->info.refIdMgt, refId);
transReleaseExHandle(msg->info.refIdMgt, refId);
return code;
_return2:
tDebug("handle %p failed to register brokenlink", exh);
@ -1996,7 +2032,7 @@ int32_t transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) {
break;
}
}
TAOS_UNUSED(transReleaseExHandle(transGetInstMgt(), (int64_t)thandle));
transReleaseExHandle(transGetInstMgt(), (int64_t)thandle);
if (code != 0) {
tError("ip-white-list update failed since %s", tstrerror(code));