optimize the tqueue code
for rpc, set the link as secured for success response before sending response
This commit is contained in:
parent
19836961ce
commit
8882fed667
|
@ -1083,6 +1083,7 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
|
||||||
pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerFqdn,
|
pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerFqdn,
|
||||||
pConn->peerPort, msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
|
pConn->peerPort, msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
|
||||||
} else {
|
} else {
|
||||||
|
if (pHead->code == 0) pConn->secured = 1; // for success response, set link as secured
|
||||||
if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16))
|
if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16))
|
||||||
tTrace( "%s %p, %s is sent to %s:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d",
|
tTrace( "%s %p, %s is sent to %s:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d",
|
||||||
pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerFqdn, pConn->peerPort,
|
pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerFqdn, pConn->peerPort,
|
||||||
|
|
|
@ -94,8 +94,6 @@ void *taosAllocateQitem(int size) {
|
||||||
void taosFreeQitem(void *param) {
|
void taosFreeQitem(void *param) {
|
||||||
if (param == NULL) return;
|
if (param == NULL) return;
|
||||||
|
|
||||||
uTrace("item:%p is freed", param);
|
|
||||||
|
|
||||||
char *temp = (char *)param;
|
char *temp = (char *)param;
|
||||||
temp -= sizeof(STaosQnode);
|
temp -= sizeof(STaosQnode);
|
||||||
free(temp);
|
free(temp);
|
||||||
|
@ -144,7 +142,7 @@ int taosReadQitem(taos_queue param, int *type, void **pitem) {
|
||||||
queue->numOfItems--;
|
queue->numOfItems--;
|
||||||
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
|
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
|
||||||
code = 1;
|
code = 1;
|
||||||
//uTrace("item:%p is read out from queue, items:%d", *pitem, queue->numOfItems);
|
uTrace("item:%p is read out from queue, items:%d", *pitem, queue->numOfItems);
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_mutex_unlock(&queue->mutex);
|
pthread_mutex_unlock(&queue->mutex);
|
||||||
|
@ -309,13 +307,12 @@ int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phand
|
||||||
pthread_mutex_lock(&qset->mutex);
|
pthread_mutex_lock(&qset->mutex);
|
||||||
|
|
||||||
for(int i=0; i<qset->numOfQueues; ++i) {
|
for(int i=0; i<qset->numOfQueues; ++i) {
|
||||||
//pthread_mutex_lock(&qset->mutex);
|
|
||||||
if (qset->current == NULL)
|
if (qset->current == NULL)
|
||||||
qset->current = qset->head;
|
qset->current = qset->head;
|
||||||
STaosQueue *queue = qset->current;
|
STaosQueue *queue = qset->current;
|
||||||
if (queue) qset->current = queue->next;
|
if (queue) qset->current = queue->next;
|
||||||
//pthread_mutex_unlock(&qset->mutex);
|
|
||||||
if (queue == NULL) break;
|
if (queue == NULL) break;
|
||||||
|
if (queue->head == NULL) continue;
|
||||||
|
|
||||||
pthread_mutex_lock(&queue->mutex);
|
pthread_mutex_lock(&queue->mutex);
|
||||||
|
|
||||||
|
@ -351,13 +348,12 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **phandle) {
|
||||||
pthread_mutex_lock(&qset->mutex);
|
pthread_mutex_lock(&qset->mutex);
|
||||||
|
|
||||||
for(int i=0; i<qset->numOfQueues; ++i) {
|
for(int i=0; i<qset->numOfQueues; ++i) {
|
||||||
// pthread_mutex_lock(&qset->mutex);
|
|
||||||
if (qset->current == NULL)
|
if (qset->current == NULL)
|
||||||
qset->current = qset->head;
|
qset->current = qset->head;
|
||||||
queue = qset->current;
|
queue = qset->current;
|
||||||
if (queue) qset->current = queue->next;
|
if (queue) qset->current = queue->next;
|
||||||
// pthread_mutex_unlock(&qset->mutex);
|
|
||||||
if (queue == NULL) break;
|
if (queue == NULL) break;
|
||||||
|
if (queue->head == NULL) continue;
|
||||||
|
|
||||||
pthread_mutex_lock(&queue->mutex);
|
pthread_mutex_lock(&queue->mutex);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue