commit
fbdb47fe6a
|
@ -213,6 +213,12 @@ typedef struct SConnBuffer {
|
|||
|
||||
typedef void (*AsyncCB)(uv_async_t* handle);
|
||||
|
||||
typedef struct {
|
||||
void* pThrd;
|
||||
queue qmsg;
|
||||
pthread_mutex_t mtx; // protect qmsg;
|
||||
} SAsyncItem;
|
||||
|
||||
typedef struct {
|
||||
int index;
|
||||
int nAsync;
|
||||
|
@ -221,7 +227,7 @@ typedef struct {
|
|||
|
||||
SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, void* arg, AsyncCB cb);
|
||||
void transDestroyAsyncPool(SAsyncPool* pool);
|
||||
int transSendAsync(SAsyncPool* pool);
|
||||
int transSendAsync(SAsyncPool* pool, queue* mq);
|
||||
|
||||
int transInitBuffer(SConnBuffer* buf);
|
||||
int transClearBuffer(SConnBuffer* buf);
|
||||
|
|
|
@ -432,14 +432,15 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
|||
}
|
||||
}
|
||||
static void clientAsyncCb(uv_async_t* handle) {
|
||||
SCliThrdObj* pThrd = handle->data;
|
||||
SAsyncItem* item = handle->data;
|
||||
SCliThrdObj* pThrd = item->pThrd;
|
||||
SCliMsg* pMsg = NULL;
|
||||
queue wq;
|
||||
|
||||
// batch process to avoid to lock/unlock frequently
|
||||
pthread_mutex_lock(&pThrd->msgMtx);
|
||||
QUEUE_MOVE(&pThrd->msg, &wq);
|
||||
pthread_mutex_unlock(&pThrd->msgMtx);
|
||||
pthread_mutex_lock(&item->mtx);
|
||||
QUEUE_MOVE(&item->qmsg, &wq);
|
||||
pthread_mutex_unlock(&item->mtx);
|
||||
|
||||
int count = 0;
|
||||
while (!QUEUE_IS_EMPTY(&wq)) {
|
||||
|
@ -548,11 +549,11 @@ static void clientSendQuit(SCliThrdObj* thrd) {
|
|||
SCliMsg* msg = calloc(1, sizeof(SCliMsg));
|
||||
msg->ctx = NULL; //
|
||||
|
||||
pthread_mutex_lock(&thrd->msgMtx);
|
||||
QUEUE_PUSH(&thrd->msg, &msg->q);
|
||||
pthread_mutex_unlock(&thrd->msgMtx);
|
||||
// pthread_mutex_lock(&thrd->msgMtx);
|
||||
// QUEUE_PUSH(&thrd->msg, &msg->q);
|
||||
// pthread_mutex_unlock(&thrd->msgMtx);
|
||||
|
||||
transSendAsync(thrd->asyncPool);
|
||||
transSendAsync(thrd->asyncPool, &msg->q);
|
||||
// uv_async_send(thrd->cliAsync);
|
||||
}
|
||||
void taosCloseClient(void* arg) {
|
||||
|
@ -598,14 +599,14 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t*
|
|||
|
||||
SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index % pRpc->numOfThreads];
|
||||
|
||||
pthread_mutex_lock(&thrd->msgMtx);
|
||||
QUEUE_PUSH(&thrd->msg, &cliMsg->q);
|
||||
pthread_mutex_unlock(&thrd->msgMtx);
|
||||
// pthread_mutex_lock(&thrd->msgMtx);
|
||||
// QUEUE_PUSH(&thrd->msg, &cliMsg->q);
|
||||
// pthread_mutex_unlock(&thrd->msgMtx);
|
||||
|
||||
int start = taosGetTimestampUs();
|
||||
transSendAsync(thrd->asyncPool);
|
||||
// int start = taosGetTimestampUs();
|
||||
transSendAsync(thrd->asyncPool, &(cliMsg->q));
|
||||
// uv_async_send(thrd->cliAsync);
|
||||
int end = taosGetTimestampUs() - start;
|
||||
// int end = taosGetTimestampUs() - start;
|
||||
// tError("client sent to rpc, time cost: %d", (int)end);
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -247,7 +247,7 @@ int transDestroyBuffer(SConnBuffer* buf) {
|
|||
}
|
||||
|
||||
SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, void* arg, AsyncCB cb) {
|
||||
static int sz = 20;
|
||||
static int sz = 10;
|
||||
|
||||
SAsyncPool* pool = calloc(1, sizeof(SAsyncPool));
|
||||
pool->index = 0;
|
||||
|
@ -257,24 +257,46 @@ SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, void* arg, AsyncCB cb) {
|
|||
for (int i = 0; i < pool->nAsync; i++) {
|
||||
uv_async_t* async = &(pool->asyncs[i]);
|
||||
uv_async_init(loop, async, cb);
|
||||
async->data = arg;
|
||||
|
||||
SAsyncItem* item = calloc(1, sizeof(SAsyncItem));
|
||||
item->pThrd = arg;
|
||||
QUEUE_INIT(&item->qmsg);
|
||||
pthread_mutex_init(&item->mtx, NULL);
|
||||
|
||||
async->data = item;
|
||||
}
|
||||
return pool;
|
||||
}
|
||||
void transDestroyAsyncPool(SAsyncPool* pool) {
|
||||
for (int i = 0; i < pool->nAsync; i++) {
|
||||
uv_async_t* async = &(pool->asyncs[i]);
|
||||
|
||||
SAsyncItem* item = async->data;
|
||||
pthread_mutex_destroy(&item->mtx);
|
||||
free(item);
|
||||
}
|
||||
free(pool->asyncs);
|
||||
free(pool);
|
||||
}
|
||||
int transSendAsync(SAsyncPool* pool) {
|
||||
int transSendAsync(SAsyncPool* pool, queue* q) {
|
||||
int idx = pool->index;
|
||||
idx = idx % pool->nAsync;
|
||||
// no need mutex here
|
||||
if (pool->index++ > pool->nAsync) {
|
||||
pool->index = 0;
|
||||
}
|
||||
return uv_async_send(&(pool->asyncs[idx]));
|
||||
uv_async_t* async = &(pool->asyncs[idx]);
|
||||
SAsyncItem* item = async->data;
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
pthread_mutex_lock(&item->mtx);
|
||||
QUEUE_PUSH(&item->qmsg, q);
|
||||
pthread_mutex_unlock(&item->mtx);
|
||||
int64_t el = taosGetTimestampUs() - st;
|
||||
if (el > 50) {
|
||||
// tInfo("lock and unlock cost: %d", (int)el);
|
||||
}
|
||||
|
||||
return uv_async_send(async);
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -376,13 +376,15 @@ static void destroySmsg(SSrvMsg* smsg) {
|
|||
free(smsg);
|
||||
}
|
||||
void uvWorkerAsyncCb(uv_async_t* handle) {
|
||||
SWorkThrdObj* pThrd = handle->data;
|
||||
SAsyncItem* item = handle->data;
|
||||
SWorkThrdObj* pThrd = item->pThrd;
|
||||
SSrvConn* conn = NULL;
|
||||
queue wq;
|
||||
// batch process to avoid to lock/unlock frequently
|
||||
pthread_mutex_lock(&pThrd->msgMtx);
|
||||
QUEUE_MOVE(&pThrd->msg, &wq);
|
||||
pthread_mutex_unlock(&pThrd->msgMtx);
|
||||
pthread_mutex_lock(&item->mtx);
|
||||
QUEUE_MOVE(&item->qmsg, &wq);
|
||||
pthread_mutex_unlock(&item->mtx);
|
||||
// pthread_mutex_unlock(&mtx);
|
||||
|
||||
while (!QUEUE_IS_EMPTY(&wq)) {
|
||||
queue* head = QUEUE_HEAD(&wq);
|
||||
|
@ -539,7 +541,7 @@ static bool addHandleToAcceptloop(void* arg) {
|
|||
tError("failed to bind: %s", uv_err_name(err));
|
||||
return false;
|
||||
}
|
||||
if ((err = uv_listen((uv_stream_t*)&srv->server, 128, uvOnAcceptCb)) != 0) {
|
||||
if ((err = uv_listen((uv_stream_t*)&srv->server, 512, uvOnAcceptCb)) != 0) {
|
||||
tError("failed to listen: %s", uv_err_name(err));
|
||||
return false;
|
||||
}
|
||||
|
@ -671,12 +673,12 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) {
|
|||
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
|
||||
SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
|
||||
|
||||
pthread_mutex_lock(&pThrd->msgMtx);
|
||||
QUEUE_PUSH(&pThrd->msg, &srvMsg->q);
|
||||
pthread_mutex_unlock(&pThrd->msgMtx);
|
||||
// pthread_mutex_lock(&pThrd->msgMtx);
|
||||
// QUEUE_PUSH(&pThrd->msg, &srvMsg->q);
|
||||
// pthread_mutex_unlock(&pThrd->msgMtx);
|
||||
tDebug("send quit msg to work thread");
|
||||
|
||||
transSendAsync(pThrd->asyncPool);
|
||||
transSendAsync(pThrd->asyncPool, &srvMsg->q);
|
||||
// uv_async_send(pThrd->workerAsync);
|
||||
}
|
||||
|
||||
|
@ -712,12 +714,12 @@ void rpcSendResponse(const SRpcMsg* pMsg) {
|
|||
srvMsg->pConn = pConn;
|
||||
srvMsg->msg = *pMsg;
|
||||
|
||||
pthread_mutex_lock(&pThrd->msgMtx);
|
||||
QUEUE_PUSH(&pThrd->msg, &srvMsg->q);
|
||||
pthread_mutex_unlock(&pThrd->msgMtx);
|
||||
// pthread_mutex_lock(&pThrd->msgMtx);
|
||||
// QUEUE_PUSH(&pThrd->msg, &srvMsg->q);
|
||||
// pthread_mutex_unlock(&pThrd->msgMtx);
|
||||
|
||||
tDebug("conn %p start to send resp", pConn);
|
||||
transSendAsync(pThrd->asyncPool);
|
||||
transSendAsync(pThrd->asyncPool, &srvMsg->q);
|
||||
// uv_async_send(pThrd->workerAsync);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue