diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 6dc6a82d09..f70bcb936a 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -43,7 +43,7 @@ int16_t tsNumOfTotalVnodes = TSDB_INVALID_VNODE_NUM; int32_t tsNumOfMnodes = 3; // common -int32_t tsRpcTimer = 300; +int32_t tsRpcTimer = 1000; int32_t tsRpcMaxTime = 600; // seconds; int32_t tsMaxShellConns = 5000; int32_t tsMaxConnections = 5000; diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 3ab0dddb5c..ad1e971453 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -665,6 +665,12 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { return pConn; } + // if code is not 0, it means it is simple reqhead, just ignore + if (pHead->code != 0) { + terrno = TSDB_CODE_RPC_ALREADY_PROCESSED; + return NULL; + } + int sid = taosAllocateId(pRpc->idPool); if (sid <= 0) { tError("%s maximum number of sessions:%d is reached", pRpc->label, pRpc->sessions); diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index ff8d055fd4..def1fd9383 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -419,7 +419,7 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) { tError("%s %p TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen); return -1; } else { - // tDebug("malloc mem: %p", buffer); + tDebug("TCP malloc mem: %p", buffer); } msg = buffer + tsRpcOverhead; diff --git a/src/rpc/src/rpcUdp.c b/src/rpc/src/rpcUdp.c index 91324da474..decd7a8307 100644 --- a/src/rpc/src/rpcUdp.c +++ b/src/rpc/src/rpcUdp.c @@ -212,7 +212,7 @@ static void *taosRecvUdpData(void *param) { tError("%s failed to allocate memory, size:%ld", pConn->label, dataLen); continue; } else { - // tTrace("malloc mem: %p", tmsg); + tDebug("UDP malloc mem: %p", tmsg); } tmsg += tsRpcOverhead; // overhead for SRpcReqContext diff --git a/src/util/src/tqueue.c b/src/util/src/tqueue.c index 34396c9c29..b499f34060 100644 --- a/src/util/src/tqueue.c +++ b/src/util/src/tqueue.c @@ -68,10 +68,15 @@ void taosCloseQueue(taos_queue param) { if (param == NULL) return; STaosQueue *queue = (STaosQueue *)param; STaosQnode *pTemp; + STaosQset *qset; + + pthread_mutex_lock(&queue->mutex); STaosQnode *pNode = queue->head; queue->head = NULL; + qset = queue->qset; + pthread_mutex_unlock(&queue->mutex); - if (queue->qset) taosRemoveFromQset(queue->qset, queue); + if (queue->qset) taosRemoveFromQset(qset, queue); pthread_mutex_lock(&queue->mutex);