change the pthread_mutex to lock
This commit is contained in:
parent
f9302e8a39
commit
3eccf4d22f
|
@ -116,6 +116,7 @@ typedef struct _RpcConn {
|
||||||
int reqMsgLen; // request message length
|
int reqMsgLen; // request message length
|
||||||
SRpcInfo *pRpc; // the associated SRpcInfo
|
SRpcInfo *pRpc; // the associated SRpcInfo
|
||||||
int connType; // connection type
|
int connType; // connection type
|
||||||
|
int64_t lockedBy; // lock for connection
|
||||||
SRpcReqContext *pContext; // request context
|
SRpcReqContext *pContext; // request context
|
||||||
} SRpcConn;
|
} SRpcConn;
|
||||||
|
|
||||||
|
@ -191,6 +192,8 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen);
|
||||||
static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead);
|
static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead);
|
||||||
static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen);
|
static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen);
|
||||||
static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen);
|
static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen);
|
||||||
|
static void rpcLockConn(SRpcConn *pConn);
|
||||||
|
static void rpcUnlockConn(SRpcConn *pConn);
|
||||||
|
|
||||||
void *rpcOpen(SRpcInit *pInit) {
|
void *rpcOpen(SRpcInit *pInit) {
|
||||||
SRpcInfo *pRpc;
|
SRpcInfo *pRpc;
|
||||||
|
@ -361,11 +364,11 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) {
|
||||||
contLen = rpcCompressRpcMsg(pCont, contLen);
|
contLen = rpcCompressRpcMsg(pCont, contLen);
|
||||||
msgLen = rpcMsgLenFromCont(contLen);
|
msgLen = rpcMsgLenFromCont(contLen);
|
||||||
|
|
||||||
pthread_mutex_lock(&pRpc->mutex);
|
rpcLockConn(pConn);
|
||||||
|
|
||||||
if ( pConn->inType == 0 || pConn->user[0] == 0 ) {
|
if ( pConn->inType == 0 || pConn->user[0] == 0 ) {
|
||||||
tTrace("%s %p, connection is already released, rsp wont be sent", pRpc->label, pConn);
|
tTrace("%s %p, connection is already released, rsp wont be sent", pRpc->label, pConn);
|
||||||
pthread_mutex_lock(&pRpc->mutex);
|
rpcUnlockConn(pConn);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -390,7 +393,7 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) {
|
||||||
pConn->rspMsgLen = msgLen;
|
pConn->rspMsgLen = msgLen;
|
||||||
if (pHead->content[0] == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--;
|
if (pHead->content[0] == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--;
|
||||||
|
|
||||||
pthread_mutex_unlock(&pRpc->mutex);
|
rpcUnlockConn(pConn);
|
||||||
|
|
||||||
taosTmrStopA(&pConn->pTimer);
|
taosTmrStopA(&pConn->pTimer);
|
||||||
rpcSendMsgToPeer(pConn, msg, msgLen);
|
rpcSendMsgToPeer(pConn, msg, msgLen);
|
||||||
|
@ -456,7 +459,7 @@ static void rpcCloseConn(void *thandle) {
|
||||||
SRpcConn *pConn = (SRpcConn *)thandle;
|
SRpcConn *pConn = (SRpcConn *)thandle;
|
||||||
SRpcInfo *pRpc = pConn->pRpc;
|
SRpcInfo *pRpc = pConn->pRpc;
|
||||||
|
|
||||||
pthread_mutex_lock(&pRpc->mutex);
|
rpcLockConn(pConn);
|
||||||
|
|
||||||
if (pConn->user[0]) {
|
if (pConn->user[0]) {
|
||||||
pConn->user[0] = 0;
|
pConn->user[0] = 0;
|
||||||
|
@ -485,7 +488,7 @@ static void rpcCloseConn(void *thandle) {
|
||||||
tTrace("%s %p, rpc connection is closed", pRpc->label, pConn);
|
tTrace("%s %p, rpc connection is closed", pRpc->label, pConn);
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_mutex_unlock(&pRpc->mutex);
|
rpcUnlockConn(pConn);
|
||||||
}
|
}
|
||||||
|
|
||||||
static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) {
|
static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) {
|
||||||
|
@ -699,14 +702,15 @@ static SRpcConn *rpcProcessHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
|
||||||
pConn = rpcGetConnObj(pRpc, sid, pHead->user, hashstr);
|
pConn = rpcGetConnObj(pRpc, sid, pHead->user, hashstr);
|
||||||
if (pConn == NULL) return NULL;
|
if (pConn == NULL) return NULL;
|
||||||
|
|
||||||
|
rpcLockConn(pConn);
|
||||||
sid = pConn->sid;
|
sid = pConn->sid;
|
||||||
|
|
||||||
pConn->chandle = pRecv->chandle;
|
pConn->chandle = pRecv->chandle;
|
||||||
if (pConn->peerIp != pRecv->ip) {
|
if (pConn->peerIp != pRecv->ip) {
|
||||||
pConn->peerIp = pRecv->ip;
|
pConn->peerIp = pRecv->ip;
|
||||||
char ipstr[20] = {0};
|
char ipstr[20] = {0};
|
||||||
tinet_ntoa(ipstr, pRecv->ip);
|
tinet_ntoa(ipstr, pRecv->ip);
|
||||||
strcpy(pConn->peerIpstr, ipstr);
|
strcpy(pConn->peerIpstr, ipstr);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRecv->port) pConn->peerPort = pRecv->port;
|
if (pRecv->port) pConn->peerPort = pRecv->port;
|
||||||
|
@ -714,18 +718,20 @@ static SRpcConn *rpcProcessHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
|
||||||
if (pHead->uid) pConn->peerUid = pHead->uid;
|
if (pHead->uid) pConn->peerUid = pHead->uid;
|
||||||
|
|
||||||
terrno = rpcCheckAuthentication(pConn, (char *)pHead, pRecv->msgLen);
|
terrno = rpcCheckAuthentication(pConn, (char *)pHead, pRecv->msgLen);
|
||||||
if (terrno != 0) return pConn;
|
if (terrno == 0) {
|
||||||
|
if (pHead->msgType != TSDB_MSG_TYPE_REG && pHead->encrypt) {
|
||||||
|
// decrypt here
|
||||||
|
}
|
||||||
|
|
||||||
if (pHead->msgType != TSDB_MSG_TYPE_REG && pHead->encrypt) {
|
if ( rpcIsReq(pHead->msgType) ) {
|
||||||
// decrypt here
|
terrno = rpcProcessReqHead(pConn, pHead);
|
||||||
|
pConn->connType = pRecv->connType;
|
||||||
|
} else {
|
||||||
|
terrno = rpcProcessRspHead(pConn, pHead);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if ( rpcIsReq(pHead->msgType) ) {
|
rpcUnlockConn(pConn);
|
||||||
terrno = rpcProcessReqHead(pConn, pHead);
|
|
||||||
pConn->connType = pRecv->connType;
|
|
||||||
} else {
|
|
||||||
terrno = rpcProcessRspHead(pConn, pHead);
|
|
||||||
}
|
|
||||||
|
|
||||||
return pConn;
|
return pConn;
|
||||||
}
|
}
|
||||||
|
@ -762,9 +768,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_mutex_lock(&pRpc->mutex);
|
|
||||||
pConn = rpcProcessHead(pRpc, pRecv);
|
pConn = rpcProcessHead(pRpc, pRecv);
|
||||||
pthread_mutex_unlock(&pRpc->mutex);
|
|
||||||
|
|
||||||
if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) {
|
if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) {
|
||||||
tTrace("%s %p, %s received from 0x%x:%hu, parse code:%x len:%d source:0x%08x dest:0x%08x tranId:%d",
|
tTrace("%s %p, %s received from 0x%x:%hu, parse code:%x len:%d source:0x%08x dest:0x%08x tranId:%d",
|
||||||
|
@ -895,7 +899,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_mutex_lock(&pRpc->mutex);
|
rpcLockConn(pConn);
|
||||||
|
|
||||||
// set the message header
|
// set the message header
|
||||||
pHead->version = 1;
|
pHead->version = 1;
|
||||||
|
@ -918,7 +922,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
|
||||||
pConn->reqMsgLen = msgLen;
|
pConn->reqMsgLen = msgLen;
|
||||||
pConn->pContext = pContext;
|
pConn->pContext = pContext;
|
||||||
|
|
||||||
pthread_mutex_unlock(&pRpc->mutex);
|
rpcUnlockConn(pConn);
|
||||||
|
|
||||||
rpcSendMsgToPeer(pConn, msg, msgLen);
|
rpcSendMsgToPeer(pConn, msg, msgLen);
|
||||||
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
|
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
|
||||||
|
@ -975,7 +979,7 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) {
|
||||||
SRpcInfo *pRpc = pConn->pRpc;
|
SRpcInfo *pRpc = pConn->pRpc;
|
||||||
int reportDisc = 0;
|
int reportDisc = 0;
|
||||||
|
|
||||||
pthread_mutex_lock(&pRpc->mutex);
|
rpcLockConn(pConn);
|
||||||
|
|
||||||
if (pConn->outType && pConn->user[0]) {
|
if (pConn->outType && pConn->user[0]) {
|
||||||
tTrace("%s %p, expected %s is not received", pRpc->label, pConn, taosMsg[(int)pConn->outType + 1]);
|
tTrace("%s %p, expected %s is not received", pRpc->label, pConn, taosMsg[(int)pConn->outType + 1]);
|
||||||
|
@ -997,7 +1001,7 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) {
|
||||||
tTrace("%s %p, retry timer not processed", pRpc->label, pConn);
|
tTrace("%s %p, retry timer not processed", pRpc->label, pConn);
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_mutex_unlock(&pRpc->mutex);
|
rpcUnlockConn(pConn);
|
||||||
|
|
||||||
if (reportDisc && pConn->pContext) {
|
if (reportDisc && pConn->pContext) {
|
||||||
pConn->pContext->code = TSDB_CODE_NETWORK_UNAVAIL;
|
pConn->pContext->code = TSDB_CODE_NETWORK_UNAVAIL;
|
||||||
|
@ -1022,7 +1026,7 @@ static void rpcProcessProgressTimer(void *param, void *tmrId) {
|
||||||
SRpcConn *pConn = (SRpcConn *)param;
|
SRpcConn *pConn = (SRpcConn *)param;
|
||||||
SRpcInfo *pRpc = pConn->pRpc;
|
SRpcInfo *pRpc = pConn->pRpc;
|
||||||
|
|
||||||
pthread_mutex_lock(&pRpc->mutex);
|
rpcLockConn(pConn);
|
||||||
|
|
||||||
if (pConn->inType && pConn->user[0]) {
|
if (pConn->inType && pConn->user[0]) {
|
||||||
tTrace("%s %p, progress timer expired, send progress", pRpc->label, pConn);
|
tTrace("%s %p, progress timer expired, send progress", pRpc->label, pConn);
|
||||||
|
@ -1032,7 +1036,7 @@ static void rpcProcessProgressTimer(void *param, void *tmrId) {
|
||||||
tTrace("%s %p, progress timer:%p not processed", pRpc->label, pConn, tmrId);
|
tTrace("%s %p, progress timer:%p not processed", pRpc->label, pConn, tmrId);
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_mutex_unlock(&pRpc->mutex);
|
rpcUnlockConn(pConn);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void rpcFreeOutMsg(void *msg) {
|
static void rpcFreeOutMsg(void *msg) {
|
||||||
|
@ -1195,4 +1199,20 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void rpcLockConn(SRpcConn *pConn) {
|
||||||
|
int64_t tid = taosGetPthreadId();
|
||||||
|
int i = 0;
|
||||||
|
while (atomic_val_compare_exchange_64(&(pConn->lockedBy), 0, tid) != 0) {
|
||||||
|
if (++i % 1000 == 0) {
|
||||||
|
sched_yield();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void rpcUnlockConn(SRpcConn *pConn) {
|
||||||
|
int64_t tid = taosGetPthreadId();
|
||||||
|
if (atomic_val_compare_exchange_64(&(pConn->lockedBy), tid, 0) != tid) {
|
||||||
|
assert(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue