fix crash
This commit is contained in:
parent
e2d4c81cdc
commit
a0804f191a
|
@ -35,6 +35,7 @@ void* rpcOpen(const SRpcInit* pInit) {
|
|||
pRpc->connType = pInit->connType;
|
||||
pRpc->idleTime = pInit->idleTime;
|
||||
pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
|
||||
pRpc->parent = pInit->parent;
|
||||
|
||||
return pRpc;
|
||||
}
|
||||
|
|
|
@ -124,10 +124,10 @@ static void clientHandleResp(SCliConn* conn) {
|
|||
rpcMsg.msgType = pHead->msgType;
|
||||
rpcMsg.ahandle = pCtx->ahandle;
|
||||
if (pCtx->pSem == NULL) {
|
||||
tDebug("conn %p handle resp", conn);
|
||||
(pRpc->cfp)(NULL, &rpcMsg, NULL);
|
||||
tDebug("client conn %p handle resp, ", conn);
|
||||
(pRpc->cfp)(pRpc->parent, &rpcMsg, NULL);
|
||||
} else {
|
||||
tDebug("conn %p handle resp", conn);
|
||||
tDebug("client conn(sync) %p handle resp", conn);
|
||||
memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg));
|
||||
tsem_post(pCtx->pSem);
|
||||
}
|
||||
|
@ -154,7 +154,7 @@ static void clientHandleExcept(SCliConn* pConn) {
|
|||
clientConnDestroy(pConn, true);
|
||||
return;
|
||||
}
|
||||
tDebug("conn %p start to destroy", pConn);
|
||||
tDebug("client conn %p start to destroy", pConn);
|
||||
SCliMsg* pMsg = pConn->data;
|
||||
|
||||
destroyUserdata(&pMsg->msg);
|
||||
|
@ -166,7 +166,7 @@ static void clientHandleExcept(SCliConn* pConn) {
|
|||
rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
||||
if (pCtx->pSem == NULL) {
|
||||
// SRpcInfo* pRpc = pMsg->ctx->pRpc;
|
||||
(pCtx->pTransInst->cfp)(NULL, &rpcMsg, NULL);
|
||||
(pCtx->pTransInst->cfp)(pCtx->pTransInst->parent, &rpcMsg, NULL);
|
||||
} else {
|
||||
memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg));
|
||||
// SRpcMsg rpcMsg
|
||||
|
@ -184,7 +184,7 @@ static void clientTimeoutCb(uv_timer_t* handle) {
|
|||
SCliThrdObj* pThrd = handle->data;
|
||||
SRpcInfo* pRpc = pThrd->pTransInst;
|
||||
int64_t currentTime = pThrd->nextTimeout;
|
||||
tDebug("timeout, try to remove expire conn from conn pool");
|
||||
tDebug("client conn timeout, try to remove expire conn from conn pool");
|
||||
|
||||
SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL);
|
||||
while (p != NULL) {
|
||||
|
@ -253,7 +253,7 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) {
|
|||
|
||||
tstrncpy(key, ip, strlen(ip));
|
||||
tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));
|
||||
tDebug("conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap);
|
||||
tDebug("client conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap);
|
||||
|
||||
SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst;
|
||||
|
||||
|
@ -294,10 +294,10 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
|
|||
pBuf->len += nread;
|
||||
if (clientReadComplete(pBuf)) {
|
||||
uv_read_stop((uv_stream_t*)conn->stream);
|
||||
tDebug("conn %p read complete", conn);
|
||||
tDebug("client conn %p read complete", conn);
|
||||
clientHandleResp(conn);
|
||||
} else {
|
||||
tDebug("conn %p read partial packet, continue to read", conn);
|
||||
tDebug("client conn %p read partial packet, continue to read", conn);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
@ -309,7 +309,7 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
|
|||
return;
|
||||
}
|
||||
if (nread < 0 || nread == UV_EOF) {
|
||||
tError("conn %p read error: %s", conn, uv_err_name(nread));
|
||||
tError("client conn %p read error: %s", conn, uv_err_name(nread));
|
||||
clientHandleExcept(conn);
|
||||
}
|
||||
// tDebug("Read error %s\n", uv_err_name(nread));
|
||||
|
@ -320,9 +320,9 @@ static void clientConnDestroy(SCliConn* conn, bool clear) {
|
|||
//
|
||||
conn->ref--;
|
||||
if (conn->ref == 0) {
|
||||
tDebug("conn %p remove from conn pool", conn);
|
||||
tDebug("client conn %p remove from conn pool", conn);
|
||||
QUEUE_REMOVE(&conn->conn);
|
||||
tDebug("conn %p remove from conn pool successfully", conn);
|
||||
tDebug("client conn %p remove from conn pool successfully", conn);
|
||||
if (clear) {
|
||||
uv_close((uv_handle_t*)conn->stream, clientDestroy);
|
||||
}
|
||||
|
@ -334,7 +334,7 @@ static void clientDestroy(uv_handle_t* handle) {
|
|||
|
||||
free(conn->stream);
|
||||
free(conn->writeReq);
|
||||
tDebug("conn %p destroy successfully", conn);
|
||||
tDebug("client conn %p destroy successfully", conn);
|
||||
free(conn);
|
||||
|
||||
// clientConnDestroy(conn, false);
|
||||
|
@ -343,7 +343,7 @@ static void clientDestroy(uv_handle_t* handle) {
|
|||
static void clientWriteCb(uv_write_t* req, int status) {
|
||||
SCliConn* pConn = req->data;
|
||||
if (status == 0) {
|
||||
tDebug("conn %p data already was written out", pConn);
|
||||
tDebug("client conn %p data already was written out", pConn);
|
||||
SCliMsg* pMsg = pConn->data;
|
||||
if (pMsg == NULL) {
|
||||
// handle
|
||||
|
@ -351,7 +351,7 @@ static void clientWriteCb(uv_write_t* req, int status) {
|
|||
}
|
||||
destroyUserdata(&pMsg->msg);
|
||||
} else {
|
||||
tError("conn %p failed to write: %s", pConn, uv_err_name(status));
|
||||
tError("client conn %p failed to write: %s", pConn, uv_err_name(status));
|
||||
clientHandleExcept(pConn);
|
||||
return;
|
||||
}
|
||||
|
@ -370,7 +370,7 @@ static void clientWrite(SCliConn* pConn) {
|
|||
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
|
||||
|
||||
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
|
||||
tDebug("conn %p data write out, msgType : %d, len: %d", pConn, pHead->msgType, msgLen);
|
||||
tDebug("client conn %p data write out, msgType : %s, len: %d", pConn, TMSG_INFO(pHead->msgType), msgLen);
|
||||
uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb);
|
||||
}
|
||||
static void clientConnCb(uv_connect_t* req, int status) {
|
||||
|
@ -378,11 +378,11 @@ static void clientConnCb(uv_connect_t* req, int status) {
|
|||
SCliConn* pConn = req->data;
|
||||
if (status != 0) {
|
||||
// tError("failed to connect server(%s, %d), errmsg: %s", pCtx->ip, pCtx->port, uv_strerror(status));
|
||||
tError("conn %p failed to connect server: %s", pConn, uv_strerror(status));
|
||||
tError("client conn %p failed to connect server: %s", pConn, uv_strerror(status));
|
||||
clientHandleExcept(pConn);
|
||||
return;
|
||||
}
|
||||
tDebug("conn %p create", pConn);
|
||||
tDebug("client conn %p create", pConn);
|
||||
|
||||
assert(pConn->stream == req->handle);
|
||||
clientWrite(pConn);
|
||||
|
@ -400,14 +400,14 @@ static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
|||
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||
uint64_t et = taosGetTimestampUs();
|
||||
uint64_t el = et - pMsg->st;
|
||||
tDebug("msg tran time cost: %" PRIu64 "", el);
|
||||
tDebug("client msg tran time cost: %" PRIu64 "", el);
|
||||
et = taosGetTimestampUs();
|
||||
|
||||
STransConnCtx* pCtx = pMsg->ctx;
|
||||
SCliConn* conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
|
||||
if (conn != NULL) {
|
||||
// impl later
|
||||
tDebug("conn %p get from conn pool", conn);
|
||||
tDebug("client get conn %p from pool", conn);
|
||||
conn->data = pMsg;
|
||||
conn->writeReq->data = conn;
|
||||
transDestroyBuffer(&conn->readBuf);
|
||||
|
|
|
@ -266,6 +266,7 @@ static void uvHandleReq(SSrvConn* pConn) {
|
|||
|
||||
transClearBuffer(&pConn->readBuf);
|
||||
pConn->ref++;
|
||||
tDebug("%s received on %p", TMSG_INFO(rpcMsg.msgType), pConn);
|
||||
(*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL);
|
||||
// uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
|
||||
// auth
|
||||
|
@ -278,7 +279,7 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
|
|||
SConnBuffer* pBuf = &conn->readBuf;
|
||||
if (nread > 0) {
|
||||
pBuf->len += nread;
|
||||
tDebug("conn %p read summroy, total read: %d, current read: %d", conn, pBuf->len, (int)nread);
|
||||
tDebug("conn %p read summary, total read: %d, current read: %d", conn, pBuf->len, (int)nread);
|
||||
if (readComplete(pBuf)) {
|
||||
tDebug("conn %p alread read complete packet", conn);
|
||||
uvHandleReq(conn);
|
||||
|
@ -717,6 +718,9 @@ void taosCloseServer(void* arg) {
|
|||
}
|
||||
|
||||
void rpcSendResponse(const SRpcMsg* pMsg) {
|
||||
if (pMsg->handle == NULL) {
|
||||
return;
|
||||
}
|
||||
SSrvConn* pConn = pMsg->handle;
|
||||
SWorkThrdObj* pThrd = pConn->hostThrd;
|
||||
|
||||
|
|
Loading…
Reference in New Issue