fix: fix rpc quit problem
This commit is contained in:
parent
aaad925d35
commit
ad29b90cc2
|
@ -45,7 +45,7 @@ typedef struct SRpcHandleInfo {
|
||||||
int32_t noResp; // has response or not(default 0, 0: resp, 1: no resp);
|
int32_t noResp; // has response or not(default 0, 0: resp, 1: no resp);
|
||||||
int32_t persistHandle; // persist handle or not
|
int32_t persistHandle; // persist handle or not
|
||||||
STraceId traceId;
|
STraceId traceId;
|
||||||
// int64_t traceId;
|
int8_t hasEpSet;
|
||||||
|
|
||||||
// app info
|
// app info
|
||||||
void *ahandle; // app handle set by client
|
void *ahandle; // app handle set by client
|
||||||
|
|
|
@ -148,6 +148,7 @@ typedef struct {
|
||||||
char release : 2;
|
char release : 2;
|
||||||
char secured : 2;
|
char secured : 2;
|
||||||
char spi : 2;
|
char spi : 2;
|
||||||
|
char hasEpSet : 2; // contain epset or not, 0(default): no epset, 1: contain epset
|
||||||
|
|
||||||
char user[TSDB_UNI_LEN];
|
char user[TSDB_UNI_LEN];
|
||||||
STraceId traceId;
|
STraceId traceId;
|
||||||
|
|
|
@ -312,6 +312,7 @@ void cliHandleResp(SCliConn* conn) {
|
||||||
transMsg.msgType = pHead->msgType;
|
transMsg.msgType = pHead->msgType;
|
||||||
transMsg.info.ahandle = NULL;
|
transMsg.info.ahandle = NULL;
|
||||||
transMsg.info.traceId = pHead->traceId;
|
transMsg.info.traceId = pHead->traceId;
|
||||||
|
transMsg.info.hasEpSet = pHead->hasEpSet;
|
||||||
|
|
||||||
SCliMsg* pMsg = NULL;
|
SCliMsg* pMsg = NULL;
|
||||||
STransConnCtx* pCtx = NULL;
|
STransConnCtx* pCtx = NULL;
|
||||||
|
@ -1014,6 +1015,23 @@ void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) {
|
||||||
*val = newVal;
|
*val = newVal;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool cliTryToExtractEpSet(STransMsg* pResp, SEpSet* dst) {
|
||||||
|
if (pResp == NULL || pResp->info.hasEpSet == 0) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
tDeserializeSEpSet(pResp->pCont, pResp->contLen, dst);
|
||||||
|
int32_t tlen = tSerializeSEpSet(NULL, 0, dst);
|
||||||
|
|
||||||
|
int32_t bufLen = pResp->contLen - tlen;
|
||||||
|
char* buf = rpcMallocCont(bufLen);
|
||||||
|
|
||||||
|
memcpy(buf, pResp->pCont + tlen, bufLen);
|
||||||
|
|
||||||
|
pResp->pCont = buf;
|
||||||
|
pResp->contLen = bufLen;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
||||||
SCliThrd* pThrd = pConn->hostThrd;
|
SCliThrd* pThrd = pConn->hostThrd;
|
||||||
STrans* pTransInst = pThrd->pTransInst;
|
STrans* pTransInst = pThrd->pTransInst;
|
||||||
|
@ -1058,6 +1076,12 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
STraceId* trace = &pResp->info.traceId;
|
STraceId* trace = &pResp->info.traceId;
|
||||||
|
|
||||||
|
if (cliTryToExtractEpSet(pResp, &pCtx->epSet)) {
|
||||||
|
char tbuf[256] = {0};
|
||||||
|
EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
|
||||||
|
tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
|
||||||
|
}
|
||||||
if (pCtx->pSem != NULL) {
|
if (pCtx->pSem != NULL) {
|
||||||
tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn);
|
tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn);
|
||||||
if (pCtx->pRsp == NULL) {
|
if (pCtx->pRsp == NULL) {
|
||||||
|
|
|
@ -144,15 +144,6 @@ static void (*transAsyncHandle[])(SSvrMsg* msg, SWorkThrd* thrd) = {uvHandleResp
|
||||||
|
|
||||||
static int32_t exHandlesMgt;
|
static int32_t exHandlesMgt;
|
||||||
|
|
||||||
// void uvInitEnv();
|
|
||||||
// void uvOpenExHandleMgt(int size);
|
|
||||||
// void uvCloseExHandleMgt();
|
|
||||||
// int64_t uvAddExHandle(void* p);
|
|
||||||
// int32_t uvRemoveExHandle(int64_t refId);
|
|
||||||
// int32_t uvReleaseExHandle(int64_t refId);
|
|
||||||
// void uvDestoryExHandle(void* handle);
|
|
||||||
// SExHandle* uvAcquireExHandle(int64_t refId);
|
|
||||||
|
|
||||||
static void uvDestroyConn(uv_handle_t* handle);
|
static void uvDestroyConn(uv_handle_t* handle);
|
||||||
|
|
||||||
// server and worker thread
|
// server and worker thread
|
||||||
|
@ -350,16 +341,8 @@ void uvOnSendCb(uv_write_t* req, int status) {
|
||||||
tTrace("conn %p data already was written on stream", conn);
|
tTrace("conn %p data already was written on stream", conn);
|
||||||
if (!transQueueEmpty(&conn->srvMsgs)) {
|
if (!transQueueEmpty(&conn->srvMsgs)) {
|
||||||
SSvrMsg* msg = transQueuePop(&conn->srvMsgs);
|
SSvrMsg* msg = transQueuePop(&conn->srvMsgs);
|
||||||
// if (msg->type == Release && conn->status != ConnNormal) {
|
|
||||||
// conn->status = ConnNormal;
|
|
||||||
// transUnrefSrvHandle(conn);
|
|
||||||
// reallocConnRef(conn);
|
|
||||||
// destroySmsg(msg);
|
|
||||||
// transQueueClear(&conn->srvMsgs);
|
|
||||||
// return;
|
|
||||||
//}
|
|
||||||
destroySmsg(msg);
|
destroySmsg(msg);
|
||||||
// send second data, just use for push
|
// send cached data
|
||||||
if (!transQueueEmpty(&conn->srvMsgs)) {
|
if (!transQueueEmpty(&conn->srvMsgs)) {
|
||||||
msg = (SSvrMsg*)transQueueGet(&conn->srvMsgs, 0);
|
msg = (SSvrMsg*)transQueueGet(&conn->srvMsgs, 0);
|
||||||
if (msg->type == Register && conn->status == ConnAcquire) {
|
if (msg->type == Register && conn->status == ConnAcquire) {
|
||||||
|
@ -396,7 +379,6 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) {
|
||||||
tError("fail to dispatch conn to work thread");
|
tError("fail to dispatch conn to work thread");
|
||||||
}
|
}
|
||||||
uv_close((uv_handle_t*)req->data, uvFreeCb);
|
uv_close((uv_handle_t*)req->data, uvFreeCb);
|
||||||
// taosMemoryFree(req->data);
|
|
||||||
taosMemoryFree(req);
|
taosMemoryFree(req);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -410,6 +392,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
|
||||||
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
|
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
|
||||||
pHead->ahandle = (uint64_t)pMsg->info.ahandle;
|
pHead->ahandle = (uint64_t)pMsg->info.ahandle;
|
||||||
pHead->traceId = pMsg->info.traceId;
|
pHead->traceId = pMsg->info.traceId;
|
||||||
|
pHead->hasEpSet = pMsg->info.hasEpSet;
|
||||||
|
|
||||||
if (pConn->status == ConnNormal) {
|
if (pConn->status == ConnNormal) {
|
||||||
pHead->msgType = pConn->inType + 1;
|
pHead->msgType = pConn->inType + 1;
|
||||||
|
@ -422,6 +405,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
|
||||||
transUnrefSrvHandle(pConn);
|
transUnrefSrvHandle(pConn);
|
||||||
} else {
|
} else {
|
||||||
pHead->msgType = pMsg->msgType;
|
pHead->msgType = pMsg->msgType;
|
||||||
|
// set up resp msg type
|
||||||
if (pHead->msgType == 0 && transMsgLenFromCont(pMsg->contLen) == sizeof(STransMsgHead))
|
if (pHead->msgType == 0 && transMsgLenFromCont(pMsg->contLen) == sizeof(STransMsgHead))
|
||||||
pHead->msgType = pConn->inType + 1;
|
pHead->msgType = pConn->inType + 1;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue