avoid deadlock
This commit is contained in:
parent
ff5c1f1cc4
commit
1a67ca34b9
|
@ -155,6 +155,8 @@ static void clientSentCb(uv_write_t* req, int32_t status) {
|
|||
if (status != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(status);
|
||||
uError("http-report failed to send data %s", uv_strerror(status));
|
||||
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
|
||||
return;
|
||||
} else {
|
||||
uTrace("http-report succ to send data");
|
||||
}
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
#include "transComm.h"
|
||||
|
||||
typedef struct SConnList {
|
||||
queue conn;
|
||||
queue conns;
|
||||
int32_t size;
|
||||
} SConnList;
|
||||
|
||||
|
@ -107,11 +107,11 @@ static void doCloseIdleConn(void* param);
|
|||
static void cliReadTimeoutCb(uv_timer_t* handle);
|
||||
// register timer in each thread to clear expire conn
|
||||
// static void cliTimeoutCb(uv_timer_t* handle);
|
||||
// alloc buf for recv
|
||||
// alloc buffer for recv
|
||||
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
|
||||
// callback after read nbytes from socket
|
||||
// callback after recv nbytes from socket
|
||||
static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
|
||||
// callback after write data to socket
|
||||
// callback after send data to socket
|
||||
static void cliSendCb(uv_write_t* req, int status);
|
||||
// callback after conn to server
|
||||
static void cliConnCb(uv_connect_t* req, int status);
|
||||
|
@ -129,19 +129,14 @@ static SCliConn* cliCreateConn(SCliThrd* thrd);
|
|||
static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
|
||||
static void cliDestroy(uv_handle_t* handle);
|
||||
static void cliSend(SCliConn* pConn);
|
||||
static void cliDestroyConnMsgs(SCliConn* conn, bool destroy);
|
||||
|
||||
static bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) {
|
||||
if (code != 0) return false;
|
||||
if (pCtx->retryCnt == 0) return false;
|
||||
if (transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet)) return false;
|
||||
return true;
|
||||
}
|
||||
// cli util func
|
||||
static bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx);
|
||||
static void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr);
|
||||
|
||||
static int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* resp);
|
||||
|
||||
void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr);
|
||||
/*
|
||||
* set TCP connection timeout per-socket level
|
||||
*/
|
||||
static int cliCreateSocket();
|
||||
// process data read from server, add decompress etc later
|
||||
static void cliHandleResp(SCliConn* conn);
|
||||
// handle except about conn
|
||||
|
@ -169,15 +164,14 @@ static void destroyThrdObj(SCliThrd* pThrd);
|
|||
static void cliWalkCb(uv_handle_t* handle, void* arg);
|
||||
|
||||
static void cliReleaseUnfinishedMsg(SCliConn* conn) {
|
||||
SCliMsg* pMsg = NULL;
|
||||
for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) {
|
||||
pMsg = transQueueGet(&conn->cliMsgs, i);
|
||||
if (pMsg != NULL && pMsg->ctx != NULL) {
|
||||
if (conn->ctx.freeFunc != NULL) {
|
||||
conn->ctx.freeFunc(pMsg->ctx->ahandle);
|
||||
SCliMsg* msg = transQueueGet(&conn->cliMsgs, i);
|
||||
if (msg != NULL && msg->ctx != NULL) {
|
||||
if (conn->ctx.freeFunc != NULL && msg->ctx->ahandle != NULL) {
|
||||
conn->ctx.freeFunc(msg->ctx->ahandle);
|
||||
}
|
||||
}
|
||||
destroyCmsg(pMsg);
|
||||
destroyCmsg(msg);
|
||||
}
|
||||
}
|
||||
#define CLI_RELEASE_UV(loop) \
|
||||
|
@ -217,8 +211,10 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
|
|||
} \
|
||||
if (i == sz) { \
|
||||
pMsg = NULL; \
|
||||
tDebug("msg not found, %" PRIu64 "", ahandle); \
|
||||
} else { \
|
||||
pMsg = transQueueRm(&conn->cliMsgs, i); \
|
||||
tDebug("msg found, %" PRIu64 "", ahandle); \
|
||||
} \
|
||||
} while (0)
|
||||
#define CONN_GET_NEXT_SENDMSG(conn) \
|
||||
|
@ -470,8 +466,8 @@ void* createConnPool(int size) {
|
|||
void* destroyConnPool(void* pool) {
|
||||
SConnList* connList = taosHashIterate((SHashObj*)pool, NULL);
|
||||
while (connList != NULL) {
|
||||
while (!QUEUE_IS_EMPTY(&connList->conn)) {
|
||||
queue* h = QUEUE_HEAD(&connList->conn);
|
||||
while (!QUEUE_IS_EMPTY(&connList->conns)) {
|
||||
queue* h = QUEUE_HEAD(&connList->conns);
|
||||
SCliConn* c = QUEUE_DATA(h, SCliConn, q);
|
||||
cliDestroyConn(c, true);
|
||||
}
|
||||
|
@ -484,21 +480,21 @@ void* destroyConnPool(void* pool) {
|
|||
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
|
||||
char key[32] = {0};
|
||||
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
|
||||
SHashObj* pPool = pool;
|
||||
SConnList* plist = taosHashGet(pPool, key, strlen(key));
|
||||
|
||||
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
|
||||
if (plist == NULL) {
|
||||
SConnList list = {0};
|
||||
taosHashPut(pPool, key, strlen(key), (void*)&list, sizeof(list));
|
||||
plist = taosHashGet(pPool, key, strlen(key));
|
||||
QUEUE_INIT(&plist->conn);
|
||||
taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list));
|
||||
plist = taosHashGet((SHashObj*)pool, key, strlen(key));
|
||||
QUEUE_INIT(&plist->conns);
|
||||
}
|
||||
|
||||
if (QUEUE_IS_EMPTY(&plist->conn)) {
|
||||
if (QUEUE_IS_EMPTY(&plist->conns)) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
plist->size -= 1;
|
||||
queue* h = QUEUE_HEAD(&plist->conn);
|
||||
queue* h = QUEUE_HEAD(&plist->conns);
|
||||
SCliConn* conn = QUEUE_DATA(h, SCliConn, q);
|
||||
conn->status = ConnNormal;
|
||||
QUEUE_REMOVE(&conn->q);
|
||||
|
@ -514,22 +510,21 @@ static void addConnToPool(void* pool, SCliConn* conn) {
|
|||
if (conn->status == ConnInPool) {
|
||||
return;
|
||||
}
|
||||
SCliThrd* thrd = conn->hostThrd;
|
||||
CONN_HANDLE_THREAD_QUIT(thrd);
|
||||
|
||||
allocConnRef(conn, true);
|
||||
|
||||
SCliThrd* thrd = conn->hostThrd;
|
||||
if (conn->timer != NULL) {
|
||||
uv_timer_stop(conn->timer);
|
||||
taosArrayPush(thrd->timerList, &conn->timer);
|
||||
conn->timer->data = NULL;
|
||||
conn->timer = NULL;
|
||||
}
|
||||
if (T_REF_VAL_GET(conn) > 1) {
|
||||
transUnrefCliHandle(conn);
|
||||
}
|
||||
|
||||
cliDestroyConnMsgs(conn, false);
|
||||
|
||||
STrans* pTransInst = thrd->pTransInst;
|
||||
cliReleaseUnfinishedMsg(conn);
|
||||
transQueueClear(&conn->cliMsgs);
|
||||
transCtxCleanup(&conn->ctx);
|
||||
conn->status = ConnInPool;
|
||||
|
||||
if (conn->list == NULL) {
|
||||
|
@ -540,18 +535,15 @@ static void addConnToPool(void* pool, SCliConn* conn) {
|
|||
} else {
|
||||
tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap);
|
||||
}
|
||||
assert(conn->list != NULL);
|
||||
QUEUE_INIT(&conn->q);
|
||||
QUEUE_PUSH(&conn->list->conn, &conn->q);
|
||||
QUEUE_PUSH(&conn->list->conns, &conn->q);
|
||||
conn->list->size += 1;
|
||||
|
||||
conn->task = NULL;
|
||||
assert(!QUEUE_IS_EMPTY(&conn->list->conn));
|
||||
|
||||
if (conn->list->size >= 50) {
|
||||
STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
|
||||
arg->param1 = conn;
|
||||
arg->param2 = thrd;
|
||||
|
||||
STrans* pTransInst = thrd->pTransInst;
|
||||
conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime));
|
||||
}
|
||||
}
|
||||
|
@ -691,11 +683,10 @@ static void cliDestroy(uv_handle_t* handle) {
|
|||
|
||||
transRemoveExHandle(transGetRefMgt(), conn->refId);
|
||||
taosMemoryFree(conn->ip);
|
||||
conn->stream->data = NULL;
|
||||
taosMemoryFree(conn->stream);
|
||||
transCtxCleanup(&conn->ctx);
|
||||
cliReleaseUnfinishedMsg(conn);
|
||||
transQueueDestroy(&conn->cliMsgs);
|
||||
|
||||
cliDestroyConnMsgs(conn, true);
|
||||
|
||||
tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
|
||||
transReqQueueClear(&conn->wreqQueue);
|
||||
transDestroyBuffer(&conn->readBuf);
|
||||
|
@ -738,8 +729,6 @@ static void cliSendCb(uv_write_t* req, int status) {
|
|||
}
|
||||
|
||||
void cliSend(SCliConn* pConn) {
|
||||
CONN_HANDLE_BROKEN(pConn);
|
||||
|
||||
assert(!transQueueEmpty(&pConn->cliMsgs));
|
||||
|
||||
SCliMsg* pCliMsg = NULL;
|
||||
|
@ -756,8 +745,8 @@ void cliSend(SCliConn* pConn) {
|
|||
pMsg->pCont = (void*)rpcMallocCont(0);
|
||||
pMsg->contLen = 0;
|
||||
}
|
||||
int msgLen = transMsgLenFromCont(pMsg->contLen);
|
||||
|
||||
int msgLen = transMsgLenFromCont(pMsg->contLen);
|
||||
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
|
||||
pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0;
|
||||
pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0;
|
||||
|
@ -769,8 +758,6 @@ void cliSend(SCliConn* pConn) {
|
|||
pHead->traceId = pMsg->info.traceId;
|
||||
pHead->magicNum = htonl(TRANS_MAGIC_NUM);
|
||||
|
||||
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
|
||||
|
||||
STraceId* trace = &pMsg->info.traceId;
|
||||
tGDebug("%s conn %p %s is sent to %s, local info %s, len:%d", CONN_GET_INST_LABEL(pConn), pConn,
|
||||
TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, pMsg->contLen);
|
||||
|
@ -792,6 +779,8 @@ void cliSend(SCliConn* pConn) {
|
|||
tGTrace("%s conn %p start timer for msg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType));
|
||||
uv_timer_start((uv_timer_t*)pConn->timer, cliReadTimeoutCb, TRANS_READ_TIMEOUT, 0);
|
||||
}
|
||||
|
||||
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
|
||||
uv_write_t* req = transReqQueuePush(&pConn->wreqQueue);
|
||||
uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
|
||||
return;
|
||||
|
@ -807,7 +796,6 @@ void cliConnCb(uv_connect_t* req, int status) {
|
|||
cliHandleExcept(pConn);
|
||||
return;
|
||||
}
|
||||
// int addrlen = sizeof(pConn->addr);
|
||||
struct sockaddr peername, sockname;
|
||||
|
||||
int addrlen = sizeof(peername);
|
||||
|
@ -840,7 +828,7 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) {
|
|||
int64_t refId = (int64_t)(pMsg->msg.info.handle);
|
||||
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
|
||||
if (exh == NULL) {
|
||||
tDebug("%" PRId64 " already release", refId);
|
||||
tDebug("%" PRId64 " already released", refId);
|
||||
destroyCmsg(pMsg);
|
||||
return;
|
||||
}
|
||||
|
@ -856,6 +844,9 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) {
|
|||
return;
|
||||
}
|
||||
cliSend(conn);
|
||||
} else {
|
||||
tError("%s conn %p already released", CONN_GET_INST_LABEL(conn), conn);
|
||||
destroyCmsg(pMsg);
|
||||
}
|
||||
}
|
||||
static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||
|
@ -905,6 +896,27 @@ void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) {
|
||||
if (code != 0) return false;
|
||||
if (pCtx->retryCnt == 0) return false;
|
||||
if (transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet)) return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) {
|
||||
if (pMsg == NULL) return -1;
|
||||
|
||||
memset(pResp, 0, sizeof(STransMsg));
|
||||
|
||||
pResp->code = TSDB_CODE_RPC_BROKEN_LINK;
|
||||
pResp->msgType = pMsg->msg.msgType + 1;
|
||||
pResp->info.ahandle = pMsg->ctx ? pMsg->ctx->ahandle : NULL;
|
||||
pResp->info.traceId = pMsg->msg.info.traceId;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||
STrans* pTransInst = pThrd->pTransInst;
|
||||
STransConnCtx* pCtx = pMsg->ctx;
|
||||
|
@ -920,13 +932,8 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
|||
SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore);
|
||||
if (ignore == true) {
|
||||
// persist conn already release by server
|
||||
STransMsg resp = {0};
|
||||
resp.code = TSDB_CODE_RPC_BROKEN_LINK;
|
||||
resp.msgType = pMsg->msg.msgType + 1;
|
||||
|
||||
resp.info.ahandle = pMsg && pMsg->ctx ? pMsg->ctx->ahandle : NULL;
|
||||
resp.info.traceId = pMsg->msg.info.traceId;
|
||||
|
||||
STransMsg resp;
|
||||
cliBuildExceptResp(pMsg, &resp);
|
||||
pTransInst->cfp(pTransInst->parent, &resp, NULL);
|
||||
destroyCmsg(pMsg);
|
||||
return;
|
||||
|
@ -991,9 +998,6 @@ static void cliAsyncCb(uv_async_t* handle) {
|
|||
QUEUE_REMOVE(h);
|
||||
|
||||
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
|
||||
if (pMsg == NULL) {
|
||||
continue;
|
||||
}
|
||||
(*cliAsyncHandle[pMsg->type])(pMsg, pThrd);
|
||||
count++;
|
||||
}
|
||||
|
@ -1035,24 +1039,58 @@ static void cliPrepareCb(uv_prepare_t* handle) {
|
|||
if (thrd->stopMsg != NULL) cliHandleQuit(thrd->stopMsg, thrd);
|
||||
}
|
||||
|
||||
void cliDestroyConnMsgs(SCliConn* conn, bool destroy) {
|
||||
transCtxCleanup(&conn->ctx);
|
||||
cliReleaseUnfinishedMsg(conn);
|
||||
if (destroy == 1) {
|
||||
transQueueDestroy(&conn->cliMsgs);
|
||||
} else {
|
||||
transQueueClear(&conn->cliMsgs);
|
||||
}
|
||||
}
|
||||
|
||||
void cliIteraConnMsgs(SCliConn* conn) {
|
||||
SCliThrd* pThrd = conn->hostThrd;
|
||||
STrans* pTransInst = pThrd->pTransInst;
|
||||
|
||||
for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) {
|
||||
SCliMsg* cmsg = transQueueGet(&conn->cliMsgs, i);
|
||||
if (cmsg->type == Release || REQUEST_NO_RESP(&cmsg->msg) || cmsg->msg.msgType == TDMT_SCH_DROP_TASK) {
|
||||
continue;
|
||||
}
|
||||
|
||||
STransMsg resp = {0};
|
||||
if (-1 == cliBuildExceptResp(cmsg, &resp)) {
|
||||
continue;
|
||||
}
|
||||
pTransInst->cfp(pTransInst->parent, &resp, NULL);
|
||||
|
||||
cmsg->ctx->ahandle = NULL;
|
||||
}
|
||||
}
|
||||
bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) {
|
||||
if (pHead->release == 1 && (pHead->msgLen) == sizeof(*pHead)) {
|
||||
uint64_t ahandle = pHead->ahandle;
|
||||
tDebug("ahandle = %" PRIu64 "", ahandle);
|
||||
SCliMsg* pMsg = NULL;
|
||||
CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);
|
||||
|
||||
transClearBuffer(&conn->readBuf);
|
||||
transFreeMsg(transContFromHead((char*)pHead));
|
||||
if (transQueueSize(&conn->cliMsgs) > 0 && ahandle == 0) {
|
||||
SCliMsg* cliMsg = transQueueGet(&conn->cliMsgs, 0);
|
||||
if (cliMsg->type == Release) return true;
|
||||
|
||||
for (int i = 0; ahandle == 0 && i < transQueueSize(&conn->cliMsgs); i++) {
|
||||
SCliMsg* cliMsg = transQueueGet(&conn->cliMsgs, i);
|
||||
if (cliMsg->type == Release) {
|
||||
assert(pMsg == NULL);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
cliIteraConnMsgs(conn);
|
||||
|
||||
tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId);
|
||||
if (T_REF_VAL_GET(conn) > 1) {
|
||||
transUnrefCliHandle(conn);
|
||||
}
|
||||
destroyCmsg(pMsg);
|
||||
cliReleaseUnfinishedMsg(conn);
|
||||
transQueueClear(&conn->cliMsgs);
|
||||
|
||||
addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn);
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -492,7 +492,6 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
|
|||
// release handle to rpc init
|
||||
if (msg->type == Quit) {
|
||||
(*transAsyncHandle[msg->type])(msg, pThrd);
|
||||
continue;
|
||||
} else {
|
||||
STransMsg transMsg = msg->msg;
|
||||
|
||||
|
@ -771,7 +770,7 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) {
|
|||
// conn set
|
||||
QUEUE_INIT(&pThrd->conn);
|
||||
|
||||
pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 1, pThrd, uvWorkerAsyncCb);
|
||||
pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 5, pThrd, uvWorkerAsyncCb);
|
||||
uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb);
|
||||
// uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
|
||||
return true;
|
||||
|
|
Loading…
Reference in New Issue