add more optim

yihaoDeng 2022-01-29 18:12:11 +08:00
parent 6c6573cc4d
commit a466e82c7b
4 changed files with 63 additions and 32 deletions

File 1 of 4:

@@ -213,6 +213,12 @@ typedef struct SConnBuffer {
 typedef void (*AsyncCB)(uv_async_t* handle);
+typedef struct {
+  void* pThrd;
+  queue qmsg;
+  pthread_mutex_t mtx;  // protect qmsg;
+} SAsyncItem;
 typedef struct {
   int index;
   int nAsync;
@@ -221,7 +227,7 @@ typedef struct {
 SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, void* arg, AsyncCB cb);
 void transDestroyAsyncPool(SAsyncPool* pool);
-int transSendAsync(SAsyncPool* pool);
+int transSendAsync(SAsyncPool* pool, queue* mq);
 int transInitBuffer(SConnBuffer* buf);
 int transClearBuffer(SConnBuffer* buf);
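Taken together, the header now gives every uv_async_t handle its own pending queue and mutex instead of one shared per-thread pair, so producers contend per handle rather than per worker thread. A rough sketch of how the pieces relate; the two-pointer queue typedef and the full SAsyncPool layout are assumptions (only index, nAsync, and the asyncs array are visible in this commit):

    #include <pthread.h>
    #include <uv.h>

    typedef void* queue[2];  // assumed intrusive queue node (prev/next), modeled on libuv's queue.h

    typedef struct {
      void*           pThrd;  // owning worker thread (client or server side)
      queue           qmsg;   // messages pending for this one uv_async_t handle
      pthread_mutex_t mtx;    // protects qmsg only, nothing else
    } SAsyncItem;

    typedef struct {
      int         index;   // round-robin cursor used by transSendAsync
      int         nAsync;  // number of handles in the pool
      uv_async_t* asyncs;  // asyncs[i].data points at that handle's SAsyncItem
    } SAsyncPool;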

File 2 of 4:

@@ -432,14 +432,15 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
   }
 }
 static void clientAsyncCb(uv_async_t* handle) {
-  SCliThrdObj* pThrd = handle->data;
+  SAsyncItem*  item = handle->data;
+  SCliThrdObj* pThrd = item->pThrd;
   SCliMsg* pMsg = NULL;
   queue wq;
   // batch process to avoid to lock/unlock frequently
-  pthread_mutex_lock(&pThrd->msgMtx);
-  QUEUE_MOVE(&pThrd->msg, &wq);
-  pthread_mutex_unlock(&pThrd->msgMtx);
+  pthread_mutex_lock(&item->mtx);
+  QUEUE_MOVE(&item->qmsg, &wq);
+  pthread_mutex_unlock(&item->mtx);
   int count = 0;
   while (!QUEUE_IS_EMPTY(&wq)) {
@@ -548,11 +549,11 @@ static void clientSendQuit(SCliThrdObj* thrd) {
   SCliMsg* msg = calloc(1, sizeof(SCliMsg));
   msg->ctx = NULL;  //
-  pthread_mutex_lock(&thrd->msgMtx);
-  QUEUE_PUSH(&thrd->msg, &msg->q);
-  pthread_mutex_unlock(&thrd->msgMtx);
-  transSendAsync(thrd->asyncPool);
+  // pthread_mutex_lock(&thrd->msgMtx);
+  // QUEUE_PUSH(&thrd->msg, &msg->q);
+  // pthread_mutex_unlock(&thrd->msgMtx);
+  transSendAsync(thrd->asyncPool, &msg->q);
   // uv_async_send(thrd->cliAsync);
 }
 void taosCloseClient(void* arg) {
@@ -598,14 +599,14 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t*
   SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index % pRpc->numOfThreads];
-  pthread_mutex_lock(&thrd->msgMtx);
-  QUEUE_PUSH(&thrd->msg, &cliMsg->q);
-  pthread_mutex_unlock(&thrd->msgMtx);
-  int start = taosGetTimestampUs();
-  transSendAsync(thrd->asyncPool);
+  // pthread_mutex_lock(&thrd->msgMtx);
+  // QUEUE_PUSH(&thrd->msg, &cliMsg->q);
+  // pthread_mutex_unlock(&thrd->msgMtx);
+  // int start = taosGetTimestampUs();
+  transSendAsync(thrd->asyncPool, &(cliMsg->q));
   // uv_async_send(thrd->cliAsync);
-  int end = taosGetTimestampUs() - start;
+  // int end = taosGetTimestampUs() - start;
   // tError("client sent to rpc, time cost: %d", (int)end);
 }
 #endif
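The client callback no longer walks a shared per-thread queue under pThrd->msgMtx; it holds the per-handle lock only long enough to move everything into a local queue, then processes the whole batch unlocked. A self-contained sketch of that drain pattern, using a plain singly linked list instead of the tree's QUEUE_MOVE macro (so it reverses order, which the real intrusive queue does not); Item, Pending, and drain_cb are illustrative names:

    #include <pthread.h>
    #include <stdlib.h>
    #include <uv.h>

    typedef struct Item {
      struct Item* next;
      int          payload;
    } Item;

    typedef struct Pending {
      pthread_mutex_t mtx;
      Item*           head;
    } Pending;

    /* uv_async_t callback: steal the entire pending list under the lock,
     * then process it with the lock already released. */
    static void drain_cb(uv_async_t* handle) {
      Pending* p = handle->data;

      pthread_mutex_lock(&p->mtx);
      Item* batch = p->head;  // O(1) steal of everything queued so far
      p->head = NULL;
      pthread_mutex_unlock(&p->mtx);

      while (batch != NULL) {  // lock is no longer held here
        Item* next = batch->next;
        /* ... handle batch->payload ... */
        free(batch);
        batch = next;
      }
    }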

File 3 of 4:

@@ -247,7 +247,7 @@ int transDestroyBuffer(SConnBuffer* buf) {
 }
 SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, void* arg, AsyncCB cb) {
-  static int sz = 20;
+  static int sz = 10;
   SAsyncPool* pool = calloc(1, sizeof(SAsyncPool));
   pool->index = 0;
@@ -257,24 +257,46 @@ SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, void* arg, AsyncCB cb) {
   for (int i = 0; i < pool->nAsync; i++) {
     uv_async_t* async = &(pool->asyncs[i]);
     uv_async_init(loop, async, cb);
-    async->data = arg;
+    SAsyncItem* item = calloc(1, sizeof(SAsyncItem));
+    item->pThrd = arg;
+    QUEUE_INIT(&item->qmsg);
+    pthread_mutex_init(&item->mtx, NULL);
+    async->data = item;
   }
   return pool;
 }
 void transDestroyAsyncPool(SAsyncPool* pool) {
   for (int i = 0; i < pool->nAsync; i++) {
     uv_async_t* async = &(pool->asyncs[i]);
+    SAsyncItem* item = async->data;
+    pthread_mutex_destroy(&item->mtx);
+    free(item);
   }
   free(pool->asyncs);
   free(pool);
 }
-int transSendAsync(SAsyncPool* pool) {
+int transSendAsync(SAsyncPool* pool, queue* q) {
   int idx = pool->index;
   idx = idx % pool->nAsync;
   // no need mutex here
   if (pool->index++ > pool->nAsync) {
     pool->index = 0;
   }
-  return uv_async_send(&(pool->asyncs[idx]));
+  uv_async_t* async = &(pool->asyncs[idx]);
+  SAsyncItem* item = async->data;
+  int64_t st = taosGetTimestampUs();
+  pthread_mutex_lock(&item->mtx);
+  QUEUE_PUSH(&item->qmsg, q);
+  pthread_mutex_unlock(&item->mtx);
+  int64_t el = taosGetTimestampUs() - st;
+  if (el > 50) {
+    // tInfo("lock and unlock cost: %d", (int)el);
+  }
+  return uv_async_send(async);
 }
 #endif
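On the producer side, transSendAsync now takes the caller's queue node itself: pick a handle round-robin, push under that handle's mutex, then wake its loop; the unsynchronized index++ is a deliberate, benign race that only skews load distribution. A minimal sketch of the same shape, with illustrative names (Msg, HandleItem, Pool, enqueue_and_wake) and a plain linked list standing in for the intrusive queue:

    #include <pthread.h>
    #include <uv.h>

    typedef struct Msg {
      struct Msg* next;
      /* payload ... */
    } Msg;

    typedef struct HandleItem {
      pthread_mutex_t mtx;
      Msg*            head;
    } HandleItem;

    typedef struct Pool {
      int         index;   // round-robin cursor; races here only affect balance
      int         nAsync;
      uv_async_t* asyncs;  // asyncs[i].data == &items[i]
      HandleItem* items;
    } Pool;

    /* Producer: pick a handle round-robin, push under that handle's own lock,
     * then wake the loop that owns it. */
    static int enqueue_and_wake(Pool* pool, Msg* m) {
      int idx = pool->index % pool->nAsync;
      if (pool->index++ >= pool->nAsync) pool->index = 0;

      HandleItem* item = &pool->items[idx];
      pthread_mutex_lock(&item->mtx);   // contention is per handle, not per thread
      m->next = item->head;
      item->head = m;
      pthread_mutex_unlock(&item->mtx);

      // uv_async_send may coalesce several signals into one callback,
      // which is why the consumer drains the whole queue per wakeup.
      return uv_async_send(&pool->asyncs[idx]);
    }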

File 4 of 4:

@@ -376,13 +376,15 @@ static void destroySmsg(SSrvMsg* smsg) {
   free(smsg);
 }
 void uvWorkerAsyncCb(uv_async_t* handle) {
-  SWorkThrdObj* pThrd = handle->data;
+  SAsyncItem*   item = handle->data;
+  SWorkThrdObj* pThrd = item->pThrd;
   SSrvConn* conn = NULL;
   queue wq;
   // batch process to avoid to lock/unlock frequently
-  pthread_mutex_lock(&pThrd->msgMtx);
-  QUEUE_MOVE(&pThrd->msg, &wq);
-  pthread_mutex_unlock(&pThrd->msgMtx);
+  pthread_mutex_lock(&item->mtx);
+  QUEUE_MOVE(&item->qmsg, &wq);
+  pthread_mutex_unlock(&item->mtx);
+  // pthread_mutex_unlock(&mtx);
   while (!QUEUE_IS_EMPTY(&wq)) {
     queue* head = QUEUE_HEAD(&wq);
@@ -539,7 +541,7 @@ static bool addHandleToAcceptloop(void* arg) {
     tError("failed to bind: %s", uv_err_name(err));
     return false;
   }
-  if ((err = uv_listen((uv_stream_t*)&srv->server, 128, uvOnAcceptCb)) != 0) {
+  if ((err = uv_listen((uv_stream_t*)&srv->server, 512, uvOnAcceptCb)) != 0) {
     tError("failed to listen: %s", uv_err_name(err));
     return false;
   }
@@ -671,12 +673,12 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) {
 void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
   SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
-  pthread_mutex_lock(&pThrd->msgMtx);
-  QUEUE_PUSH(&pThrd->msg, &srvMsg->q);
-  pthread_mutex_unlock(&pThrd->msgMtx);
+  // pthread_mutex_lock(&pThrd->msgMtx);
+  // QUEUE_PUSH(&pThrd->msg, &srvMsg->q);
+  // pthread_mutex_unlock(&pThrd->msgMtx);
   tDebug("send quit msg to work thread");
-  transSendAsync(pThrd->asyncPool);
+  transSendAsync(pThrd->asyncPool, &srvMsg->q);
   // uv_async_send(pThrd->workerAsync);
 }
@@ -712,12 +714,12 @@ void rpcSendResponse(const SRpcMsg* pMsg) {
   srvMsg->pConn = pConn;
   srvMsg->msg = *pMsg;
-  pthread_mutex_lock(&pThrd->msgMtx);
-  QUEUE_PUSH(&pThrd->msg, &srvMsg->q);
-  pthread_mutex_unlock(&pThrd->msgMtx);
+  // pthread_mutex_lock(&pThrd->msgMtx);
+  // QUEUE_PUSH(&pThrd->msg, &srvMsg->q);
+  // pthread_mutex_unlock(&pThrd->msgMtx);
   tDebug("conn %p start to send resp", pConn);
-  transSendAsync(pThrd->asyncPool);
+  transSendAsync(pThrd->asyncPool, &srvMsg->q);
   // uv_async_send(pThrd->workerAsync);
 }
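Separately, this file raises the uv_listen accept backlog from 128 to 512. A small sketch of that setup with illustrative names (start_listener, on_accept) rather than the tree's addHandleToAcceptloop/uvOnAcceptCb; note the kernel may still clamp the value, so the larger hint is necessary but not always sufficient:

    #include <uv.h>

    /* illustrative accept callback; the real one is uvOnAcceptCb */
    static void on_accept(uv_stream_t* server, int status) {
      (void)server;
      (void)status;
    }

    static int start_listener(uv_loop_t* loop, uv_tcp_t* server, const struct sockaddr* addr) {
      int err;
      if ((err = uv_tcp_init(loop, server)) != 0) return err;
      if ((err = uv_tcp_bind(server, addr, 0)) != 0) return err;
      // 512 is the backlog hint handed to listen(2); on Linux the effective
      // value is additionally capped by net.core.somaxconn.
      return uv_listen((uv_stream_t*)server, 512, on_accept);
    }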