Merge pull request #10003 from taosdata/feature/trans

handle close except
This commit is contained in:
Yihao Deng 2022-01-25 09:31:02 +08:00 committed by GitHub
commit 49a4aa3fc0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 33 additions and 4 deletions

View File

@ -182,7 +182,7 @@ typedef struct {
#define TRANS_RESERVE_SIZE (sizeof(STranConnCtx)) #define TRANS_RESERVE_SIZE (sizeof(STranConnCtx))
#define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead) + sizeof(STransDigestMsg)) #define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead))
#define transHeadFromCont(cont) ((STransMsgHead*)((char*)cont - sizeof(STransMsgHead))) #define transHeadFromCont(cont) ((STransMsgHead*)((char*)cont - sizeof(STransMsgHead)))
#define transContFromHead(msg) (msg + sizeof(STransMsgHead)) #define transContFromHead(msg) (msg + sizeof(STransMsgHead))
#define transMsgLenFromCont(contLen) (contLen + sizeof(STransMsgHead)) #define transMsgLenFromCont(contLen) (contLen + sizeof(STransMsgHead))
@ -201,6 +201,7 @@ bool transDecompressMsg(char* msg, int32_t len, int32_t* flen);
void transConnCtxDestroy(STransConnCtx* ctx); void transConnCtxDestroy(STransConnCtx* ctx);
void transFreeMsg(void* msg);
typedef struct SConnBuffer { typedef struct SConnBuffer {
char* buf; char* buf;
int len; int len;

View File

@ -107,8 +107,17 @@ static void clientHandleResp(SCliConn* conn) {
SRpcInfo* pRpc = pCtx->pTransInst; SRpcInfo* pRpc = pCtx->pTransInst;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
rpcMsg.pCont = conn->readBuf.buf; STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf);
rpcMsg.contLen = conn->readBuf.len; pHead->code = htonl(pHead->code);
pHead->msgLen = htonl(pHead->msgLen);
rpcMsg.contLen = transContLenFromMsg(pHead->msgLen);
rpcMsg.pCont = transContFromHead(pHead);
rpcMsg.code = pHead->code;
rpcMsg.msgType = pHead->msgType;
// rpcMsg.pCont = conn->readBuf.buf;
// rpcMsg.contLen = conn->readBuf.len;
rpcMsg.ahandle = pCtx->ahandle; rpcMsg.ahandle = pCtx->ahandle;
(pRpc->cfp)(NULL, &rpcMsg, NULL); (pRpc->cfp)(NULL, &rpcMsg, NULL);
conn->notifyCount += 1; conn->notifyCount += 1;
@ -128,8 +137,12 @@ static void clientHandleExcept(SCliConn* pConn) {
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg->ctx;
SRpcInfo* pRpc = pCtx->pTransInst; SRpcInfo* pRpc = pCtx->pTransInst;
SRpcMsg rpcMsg; transFreeMsg((pMsg->msg.pCont));
pMsg->msg.pCont = NULL;
SRpcMsg rpcMsg = {0};
rpcMsg.ahandle = pCtx->ahandle; rpcMsg.ahandle = pCtx->ahandle;
rpcMsg.code = -1;
// SRpcInfo* pRpc = pMsg->ctx->pRpc; // SRpcInfo* pRpc = pMsg->ctx->pRpc;
(pRpc->cfp)(NULL, &rpcMsg, NULL); (pRpc->cfp)(NULL, &rpcMsg, NULL);
@ -306,6 +319,10 @@ static void clientDestroy(uv_handle_t* handle) {
static void clientWriteCb(uv_write_t* req, int status) { static void clientWriteCb(uv_write_t* req, int status) {
SCliConn* pConn = req->data; SCliConn* pConn = req->data;
SCliMsg* pMsg = pConn->data;
transFreeMsg((pMsg->msg.pCont));
pMsg->msg.pCont = NULL;
if (status == 0) { if (status == 0) {
tDebug("conn %p data already was written out", pConn); tDebug("conn %p data already was written out", pConn);
} else { } else {

View File

@ -191,4 +191,11 @@ void transConnCtxDestroy(STransConnCtx* ctx) {
free(ctx->ip); free(ctx->ip);
free(ctx); free(ctx);
} }
void transFreeMsg(void* msg) {
if (msg == NULL) {
return;
}
free((char*)msg - sizeof(STransMsgHead));
}
#endif #endif

View File

@ -321,6 +321,10 @@ void uvOnWriteCb(uv_write_t* req, int status) {
buf->len = 0; buf->len = 0;
memset(buf->buf, 0, buf->cap); memset(buf->buf, 0, buf->cap);
buf->left = -1; buf->left = -1;
SRpcMsg* pMsg = &conn->sendMsg;
transFreeMsg(pMsg->pCont);
if (status == 0) { if (status == 0) {
tDebug("conn %p data already was written on stream", conn); tDebug("conn %p data already was written on stream", conn);
} else { } else {