From 183e444e959e7e584cf904a68a93c259504f8b4d Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Sun, 15 Mar 2020 13:44:07 +0800 Subject: [PATCH 1/3] change tqueue --- src/dnode/inc/dnodeWrite.h | 2 +- src/dnode/src/dnodeMgmt.c | 8 ++--- src/dnode/src/dnodeRead.c | 47 ++++++++++++++--------------- src/dnode/src/dnodeWrite.c | 60 +++++++++++++++++++------------------- src/util/inc/tlog.h | 2 +- src/util/inc/tqueue.h | 16 +++++----- src/util/src/tqueue.c | 55 ++++++++++++++++++++-------------- src/util/src/tsocket.c | 2 +- 8 files changed, 103 insertions(+), 89 deletions(-) diff --git a/src/dnode/inc/dnodeWrite.h b/src/dnode/inc/dnodeWrite.h index 2b1edf9e40..f5904555fb 100644 --- a/src/dnode/inc/dnodeWrite.h +++ b/src/dnode/inc/dnodeWrite.h @@ -23,7 +23,7 @@ extern "C" { int32_t dnodeInitWrite(); void dnodeCleanupWrite(); void dnodeWrite(void *pMsg); -void * dnodeAllocateWriteWorker(); +void * dnodeAllocateWriteWorker(void *pVnode); void dnodeFreeWriteWorker(void *worker); #ifdef __cplusplus diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index e8fe94f358..a4cb752413 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -152,8 +152,8 @@ static int32_t dnodeOpenVnode(int32_t vgId) { vnodeObj.status = TSDB_VN_STATUS_NOT_READY; vnodeObj.refCount = 1; vnodeObj.version = 0; - vnodeObj.wworker = dnodeAllocateWriteWorker(); - vnodeObj.rworker = dnodeAllocateReadWorker(); + vnodeObj.wworker = dnodeAllocateWriteWorker(&vnodeObj); + vnodeObj.rworker = dnodeAllocateReadWorker(&vnodeObj); vnodeObj.wal = NULL; vnodeObj.tsdb = pTsdb; vnodeObj.replica = NULL; @@ -217,8 +217,8 @@ static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) { vnodeObj.status = TSDB_VN_STATUS_NOT_READY; vnodeObj.refCount = 1; vnodeObj.version = 0; - vnodeObj.wworker = dnodeAllocateWriteWorker(); - vnodeObj.rworker = dnodeAllocateReadWorker(); + vnodeObj.wworker = dnodeAllocateWriteWorker(&vnodeObj); + vnodeObj.rworker = dnodeAllocateReadWorker(&vnodeObj); vnodeObj.wal = NULL; vnodeObj.tsdb = pTsdb; vnodeObj.replica = NULL; diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index dacc93ffc2..1bbc65ef5c 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -33,16 +33,15 @@ typedef struct { void *pCont; int32_t contLen; SRpcMsg rpcMsg; - void *pVnode; SRpcContext *pRpcContext; // RPC message context } SReadMsg; static void *dnodeProcessReadQueue(void *param); -static void dnodeProcessReadResult(SReadMsg *pRead); +static void dnodeProcessReadResult(void *pVnode, SReadMsg *pRead); static void dnodeHandleIdleReadWorker(); -static void dnodeProcessQueryMsg(SReadMsg *pMsg); -static void dnodeProcessRetrieveMsg(SReadMsg *pMsg); -static void(*dnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SReadMsg *pNode); +static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg); +static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg); +static void(*dnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(void *pVnode, SReadMsg *pNode); // module global variable static taos_qset readQset; @@ -93,15 +92,14 @@ void dnodeRead(void *rpcMsg) { } // put message into queue - SReadMsg readMsg; - readMsg.rpcMsg = *pMsg; - readMsg.pCont = pCont; - readMsg.contLen = contLen; - readMsg.pRpcContext = pRpcContext; - readMsg.pVnode = pVnode; + SReadMsg *pReadMsg = taosAllocateQitem(sizeof(SReadMsg)); + pReadMsg->rpcMsg = *pMsg; + pReadMsg->pCont = pCont; + pReadMsg->contLen = contLen; + pReadMsg->pRpcContext = pRpcContext; taos_queue queue = dnodeGetVnodeRworker(pVnode); - taosWriteQitem(queue, &readMsg); + taosWriteQitem(queue, 0, pReadMsg); // next vnode leftLen -= contLen; @@ -111,11 +109,11 @@ void dnodeRead(void *rpcMsg) { } } -void *dnodeAllocateReadWorker() { +void *dnodeAllocateReadWorker(void *pVnode) { taos_queue *queue = taosOpenQueue(sizeof(SReadMsg)); if (queue == NULL) return NULL; - taosAddIntoQset(readQset, queue); + taosAddIntoQset(readQset, queue, pVnode); // spawn a thread to process queue if (threads < maxThreads) { @@ -140,22 +138,25 @@ void dnodeFreeReadWorker(void *rqueue) { static void *dnodeProcessReadQueue(void *param) { taos_qset qset = (taos_qset)param; - SReadMsg readMsg; + SReadMsg *pReadMsg; + int type; + void *pVnode; while (1) { - if (taosReadQitemFromQset(qset, &readMsg) <= 0) { + if (taosReadQitemFromQset(qset, &type, &pReadMsg, &pVnode) == 0) { dnodeHandleIdleReadWorker(); continue; } terrno = 0; - if (dnodeProcessReadMsgFp[readMsg.rpcMsg.msgType]) { - (*dnodeProcessReadMsgFp[readMsg.rpcMsg.msgType]) (&readMsg); + if (dnodeProcessReadMsgFp[pReadMsg->rpcMsg.msgType]) { + (*dnodeProcessReadMsgFp[pReadMsg->rpcMsg.msgType]) (pVnode, pReadMsg); } else { terrno = TSDB_CODE_MSG_NOT_PROCESSED; } - dnodeProcessReadResult(&readMsg); + dnodeProcessReadResult(pVnode, pReadMsg); + taosFreeQitem(pReadMsg); } return NULL; @@ -173,11 +174,11 @@ static void dnodeHandleIdleReadWorker() { } } -static void dnodeProcessReadResult(SReadMsg *pRead) { +static void dnodeProcessReadResult(void *pVnode, SReadMsg *pRead) { SRpcContext *pRpcContext = pRead->pRpcContext; int32_t code = 0; - dnodeReleaseVnode(pRead->pVnode); + dnodeReleaseVnode(pVnode); if (pRpcContext) { if (terrno) { @@ -204,10 +205,10 @@ static void dnodeProcessReadResult(SReadMsg *pRead) { rpcFreeCont(pRead->rpcMsg.pCont); // free the received message } -static void dnodeProcessQueryMsg(SReadMsg *pMsg) { +static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg) { } -static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) { +static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) { } diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index 9d38a5c207..9e4acd6e1a 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -33,7 +33,6 @@ typedef struct _write { void *pCont; int32_t contLen; SRpcMsg rpcMsg; - void *pVnode; // pointer to vnode SRpcContext *pRpcContext; // RPC message context } SWriteMsg; @@ -49,20 +48,20 @@ typedef struct _thread_obj { SWriteWorker *writeWorker; } SWriteWorkerPool; -static void (*dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SWriteMsg *); +static void (*dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(void *, SWriteMsg *); static void *dnodeProcessWriteQueue(void *param); static void dnodeHandleIdleWorker(SWriteWorker *pWorker); -static void dnodeProcessWriteResult(SWriteMsg *pWrite); -static void dnodeProcessSubmitMsg(SWriteMsg *pMsg); -static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg); -static void dnodeProcessDropTableMsg(SWriteMsg *pMsg); +static void dnodeProcessWriteResult(void *pVnode, SWriteMsg *pWrite); +static void dnodeProcessSubmitMsg(void *pVnode, SWriteMsg *pMsg); +static void dnodeProcessCreateTableMsg(void *pVnode, SWriteMsg *pMsg); +static void dnodeProcessDropTableMsg(void *pVnode, SWriteMsg *pMsg); SWriteWorkerPool wWorkerPool; int32_t dnodeInitWrite() { - dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeProcessSubmitMsg; + dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeProcessSubmitMsg; dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = dnodeProcessCreateTableMsg; - dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeProcessDropTableMsg; + dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeProcessDropTableMsg; wWorkerPool.max = tsNumOfCores; wWorkerPool.writeWorker = (SWriteWorker *)calloc(sizeof(SWriteWorker), wWorkerPool.max); @@ -107,15 +106,14 @@ void dnodeWrite(void *rpcMsg) { } // put message into queue - SWriteMsg writeMsg; - writeMsg.rpcMsg = *pMsg; - writeMsg.pCont = pCont; - writeMsg.contLen = contLen; - writeMsg.pRpcContext = pRpcContext; - writeMsg.pVnode = pVnode; // pVnode shall be saved for usage later + SWriteMsg *pWriteMsg = taosAllocateQitem(sizeof(SWriteMsg)); + pWriteMsg->rpcMsg = *pMsg; + pWriteMsg->pCont = pCont; + pWriteMsg->contLen = contLen; + pWriteMsg->pRpcContext = pRpcContext; taos_queue queue = dnodeGetVnodeWworker(pVnode); - taosWriteQitem(queue, &writeMsg); + taosWriteQitem(queue, 0, pWriteMsg); // next vnode leftLen -= contLen; @@ -123,7 +121,7 @@ void dnodeWrite(void *rpcMsg) { } } -void *dnodeAllocateWriteWorker() { +void *dnodeAllocateWriteWorker(void *pVnode) { SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId; if (pWorker->qset == NULL) { @@ -140,9 +138,9 @@ void *dnodeAllocateWriteWorker() { } } - taos_queue *queue = taosOpenQueue(sizeof(SWriteMsg)); + taos_queue *queue = taosOpenQueue(); if (queue) { - taosAddIntoQset(pWorker->qset, queue); + taosAddIntoQset(pWorker->qset, queue, pVnode); wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max; } @@ -158,11 +156,13 @@ void dnodeFreeWriteWorker(void *wqueue) { static void *dnodeProcessWriteQueue(void *param) { SWriteWorker *pWorker = (SWriteWorker *)param; taos_qall qall; - SWriteMsg writeMsg; + SWriteMsg *pWriteMsg; int32_t numOfMsgs; + int type; + void *pVnode; while (1) { - numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, &qall); + numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, &qall, &pVnode); if (numOfMsgs <=0) { dnodeHandleIdleWorker(pWorker); // thread exit if no queues anymore continue; @@ -170,7 +170,7 @@ static void *dnodeProcessWriteQueue(void *param) { for (int32_t i=0; iwhandle, writeMsg.rpcMsg.msgType, writeMsg.pCont, writeMsg.contLen); } @@ -181,16 +181,16 @@ static void *dnodeProcessWriteQueue(void *param) { // browse all items, and process them one by one taosResetQitems(qall); for (int32_t i = 0; i < numOfMsgs; ++i) { - taosGetQitem(qall, &writeMsg); + taosGetQitem(qall, &type, &pWriteMsg); terrno = 0; - if (dnodeProcessWriteMsgFp[writeMsg.rpcMsg.msgType]) { - (*dnodeProcessWriteMsgFp[writeMsg.rpcMsg.msgType]) (&writeMsg); + if (dnodeProcessWriteMsgFp[pWriteMsg->rpcMsg.msgType]) { + (*dnodeProcessWriteMsgFp[pWriteMsg->rpcMsg.msgType]) (pVnode, pWriteMsg); } else { terrno = TSDB_CODE_MSG_NOT_PROCESSED; } - dnodeProcessWriteResult(&writeMsg); + dnodeProcessWriteResult(pVnode, pWriteMsg); } // free the Qitems; @@ -200,11 +200,11 @@ static void *dnodeProcessWriteQueue(void *param) { return NULL; } -static void dnodeProcessWriteResult(SWriteMsg *pWrite) { +static void dnodeProcessWriteResult(void *pVnode, SWriteMsg *pWrite) { SRpcContext *pRpcContext = pWrite->pRpcContext; int32_t code = 0; - dnodeReleaseVnode(pWrite->pVnode); + dnodeReleaseVnode(pVnode); if (pRpcContext) { if (terrno) { @@ -244,14 +244,14 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) { } } -static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) { +static void dnodeProcessSubmitMsg(void *param, SWriteMsg *pMsg) { } -static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) { +static void dnodeProcessCreateTableMsg(void *param, SWriteMsg *pMsg) { } -static void dnodeProcessDropTableMsg(SWriteMsg *pMsg) { +static void dnodeProcessDropTableMsg(void *param, SWriteMsg *pMsg) { } diff --git a/src/util/inc/tlog.h b/src/util/inc/tlog.h index 350a2c700e..e1c5f8a06a 100644 --- a/src/util/inc/tlog.h +++ b/src/util/inc/tlog.h @@ -78,7 +78,7 @@ void taosResetLogFile(); // utility log function #define pError(...) \ if (uDebugFlag & DEBUG_ERROR) { \ - tprintf("ERROR UTL ", 255, __VA_ARGS__); \ + tprintf("ERROR UTL ", uDebugFlag, __VA_ARGS__); \ } #define pWarn(...) \ if (uDebugFlag & DEBUG_WARN) { \ diff --git a/src/util/inc/tqueue.h b/src/util/inc/tqueue.h index 97408110d4..4920d08e29 100644 --- a/src/util/inc/tqueue.h +++ b/src/util/inc/tqueue.h @@ -24,24 +24,26 @@ typedef void* taos_queue; typedef void* taos_qset; typedef void* taos_qall; -taos_queue taosOpenQueue(int itemSize); +taos_queue taosOpenQueue(); void taosCloseQueue(taos_queue); -int taosWriteQitem(taos_queue, void *item); -int taosReadQitem(taos_queue, void *item); +void *taosAllocateQitem(int size); +void taosFreeQitem(void *item); +int taosWriteQitem(taos_queue, int type, void *item); +int taosReadQitem(taos_queue, int *type, void **pitem); int taosReadAllQitems(taos_queue, taos_qall *); -int taosGetQitem(taos_qall, void *item); +int taosGetQitem(taos_qall, int *type, void **pitem); void taosResetQitems(taos_qall); void taosFreeQitems(taos_qall); taos_qset taosOpenQset(); void taosCloseQset(); -int taosAddIntoQset(taos_qset, taos_queue); +int taosAddIntoQset(taos_qset, taos_queue, void *ahandle); void taosRemoveFromQset(taos_qset, taos_queue); int taosGetQueueNumber(taos_qset); -int taosReadQitemFromQset(taos_qset, void *item); -int taosReadAllQitemsFromQset(taos_qset, taos_qall *); +int taosReadQitemFromQset(taos_qset, int *type, void **pitem, void **handle); +int taosReadAllQitemsFromQset(taos_qset, taos_qall *, void **handle); int taosGetQueueItemsNumber(taos_queue param); int taosGetQsetItemsNumber(taos_qset param); diff --git a/src/util/src/tqueue.c b/src/util/src/tqueue.c index cb218126a8..ed69d7e7c4 100644 --- a/src/util/src/tqueue.c +++ b/src/util/src/tqueue.c @@ -19,6 +19,7 @@ #include "tqueue.h" typedef struct _taos_qnode { + int type; struct _taos_qnode *next; char item[]; } STaosQnode; @@ -30,6 +31,7 @@ typedef struct _taos_q { struct _taos_qnode *tail; struct _taos_q *next; // for queue set struct _taos_qset *qset; // for queue set + void *ahandle; // for queue set pthread_mutex_t mutex; } STaosQueue; @@ -48,7 +50,7 @@ typedef struct _taos_qall { int32_t numOfItems; } STaosQall; -taos_queue taosOpenQueue(int itemSize) { +taos_queue taosOpenQueue() { STaosQueue *queue = (STaosQueue *) calloc(sizeof(STaosQueue), 1); if (queue == NULL) { @@ -57,8 +59,6 @@ taos_queue taosOpenQueue(int itemSize) { } pthread_mutex_init(&queue->mutex, NULL); - queue->itemSize = (int32_t)itemSize; - return queue; } @@ -83,16 +83,24 @@ void taosCloseQueue(taos_queue param) { free(queue); } -int taosWriteQitem(taos_queue param, void *item) { +void *taosAllocateQitem(int size) { + STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + size, 1); + if (pNode == NULL) return NULL; + return (void *)pNode->item; +} + +void taosFreeQitem(void *param) { + if (param == NULL) return; + + char *temp = (char *)param; + temp -= sizeof(STaosQnode); + free(temp); +} + +int taosWriteQitem(taos_queue param, int type, void *item) { STaosQueue *queue = (STaosQueue *)param; - - STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + queue->itemSize, 1); - if ( pNode == NULL ) { - terrno = TSDB_CODE_NO_RESOURCE; - return -1; - } - - memcpy(pNode->item, item, queue->itemSize); + STaosQnode *pNode = (STaosQnode *)((char *)item - sizeof(STaosQnode)); + pNode->type = type; pthread_mutex_lock(&queue->mutex); @@ -112,7 +120,7 @@ int taosWriteQitem(taos_queue param, void *item) { return 0; } -int taosReadQitem(taos_queue param, void *item) { +int taosReadQitem(taos_queue param, int *type, void **pitem) { STaosQueue *queue = (STaosQueue *)param; STaosQnode *pNode = NULL; int code = 0; @@ -121,11 +129,11 @@ int taosReadQitem(taos_queue param, void *item) { if (queue->head) { pNode = queue->head; - memcpy(item, pNode->item, queue->itemSize); + *pitem = pNode->item; + *type = pNode->type; queue->head = pNode->next; if (queue->head == NULL) queue->tail = NULL; - free(pNode); queue->numOfItems--; if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1); code = 1; @@ -168,7 +176,7 @@ int taosReadAllQitems(taos_queue param, taos_qall *res) { return code; } -int taosGetQitem(taos_qall param, void *item) { +int taosGetQitem(taos_qall param, int *type, void **pitem) { STaosQall *qall = (STaosQall *)param; STaosQnode *pNode; int num = 0; @@ -178,7 +186,7 @@ int taosGetQitem(taos_qall param, void *item) { qall->current = pNode->next; if (pNode) { - memcpy(item, pNode->item, qall->itemSize); + *pitem = pNode->item; num = 1; } @@ -221,7 +229,7 @@ void taosCloseQset(taos_qset param) { free(qset); } -int taosAddIntoQset(taos_qset p1, taos_queue p2) { +int taosAddIntoQset(taos_qset p1, taos_queue p2, void *ahandle) { STaosQueue *queue = (STaosQueue *)p2; STaosQset *qset = (STaosQset *)p1; @@ -230,6 +238,7 @@ int taosAddIntoQset(taos_qset p1, taos_queue p2) { pthread_mutex_lock(&qset->mutex); queue->next = qset->head; + queue->ahandle = ahandle; qset->head = queue; qset->numOfQueues++; @@ -283,7 +292,7 @@ int taosGetQueueNumber(taos_qset param) { return ((STaosQset *)param)->numOfQueues; } -int taosReadQitemFromQset(taos_qset param, void *item) { +int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phandle) { STaosQset *qset = (STaosQset *)param; STaosQnode *pNode = NULL; int code = 0; @@ -301,11 +310,12 @@ int taosReadQitemFromQset(taos_qset param, void *item) { if (queue->head) { pNode = queue->head; - memcpy(item, pNode->item, queue->itemSize); + *pitem = pNode->item; + *type = pNode->type; + *phandle = queue->ahandle; queue->head = pNode->next; if (queue->head == NULL) queue->tail = NULL; - free(pNode); queue->numOfItems--; atomic_sub_fetch_32(&qset->numOfItems, 1); code = 1; @@ -318,7 +328,7 @@ int taosReadQitemFromQset(taos_qset param, void *item) { return code; } -int taosReadAllQitemsFromQset(taos_qset param, taos_qall *res) { +int taosReadAllQitemsFromQset(taos_qset param, taos_qall *res, void **phandle) { STaosQset *qset = (STaosQset *)param; STaosQueue *queue; STaosQall *qall = NULL; @@ -346,6 +356,7 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall *res) { qall->numOfItems = queue->numOfItems; qall->itemSize = queue->itemSize; code = qall->numOfItems; + *phandle = queue->ahandle; queue->head = NULL; queue->tail = NULL; diff --git a/src/util/src/tsocket.c b/src/util/src/tsocket.c index 7ab004646e..30c3ada637 100644 --- a/src/util/src/tsocket.c +++ b/src/util/src/tsocket.c @@ -330,7 +330,7 @@ int taosOpenTcpClientSocket(char *destIp, uint16_t destPort, char *clientIp) { struct sockaddr_in serverAddr, clientAddr; int ret; - pTrace("open tcp client socket:%s:%d", destIp, destPort); + pTrace("open tcp client socket:%s:%d, local Ip:%s", destIp, destPort, clientIp); sockFd = (int)socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); From c427033a132f2e0eb5aa6e3891c371ca43a7f6fe Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Tue, 17 Mar 2020 22:43:24 +0800 Subject: [PATCH 2/3] minor bugs --- src/rpc/src/rpcMain.c | 2 +- src/util/src/tqueue.c | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 505fba94ce..7ce3b287fb 100755 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -287,7 +287,7 @@ void rpcClose(void *param) { (*taosCleanUpConn[pRpc->connType])(pRpc->udphandle); for (int i = 0; i < pRpc->sessions; ++i) { - if (pRpc->connList[i].user[0]) { + if (pRpc->connList && pRpc->connList[i].user[0]) { rpcCloseConn((void *)(pRpc->connList + i)); } } diff --git a/src/util/src/tqueue.c b/src/util/src/tqueue.c index ed69d7e7c4..f1db24b8ce 100644 --- a/src/util/src/tqueue.c +++ b/src/util/src/tqueue.c @@ -99,7 +99,7 @@ void taosFreeQitem(void *param) { int taosWriteQitem(taos_queue param, int type, void *item) { STaosQueue *queue = (STaosQueue *)param; - STaosQnode *pNode = (STaosQnode *)((char *)item - sizeof(STaosQnode)); + STaosQnode *pNode = (STaosQnode *)(((char *)item) - sizeof(STaosQnode)); pNode->type = type; pthread_mutex_lock(&queue->mutex); @@ -187,6 +187,7 @@ int taosGetQitem(taos_qall param, int *type, void **pitem) { if (pNode) { *pitem = pNode->item; + *type = pNode->type; num = 1; } From eabc5511d13d9135d5d09f324999c97b82a2a131 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Sun, 22 Mar 2020 15:53:21 +0800 Subject: [PATCH 3/3] change the code to accomodate the changes by tqueue --- src/dnode/src/dnodeRead.c | 19 ++++---- src/dnode/src/dnodeWrite.c | 28 ++++++------ src/rpc/test/rserver.c | 32 +++++++++----- src/util/inc/tqueue.h | 11 +++-- src/util/src/tqueue.c | 90 +++++++++++++++++--------------------- src/util/src/tsocket.c | 4 +- 6 files changed, 95 insertions(+), 89 deletions(-) diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index 11cb845798..a870a9e159 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -90,7 +90,7 @@ void dnodeRead(SRpcMsg *pMsg) { while (leftLen > 0) { SMsgHead *pHead = (SMsgHead *) pCont; - pHead->vgId = 1;//htonl(pHead->vgId); + pHead->vgId = 1; //htonl(pHead->vgId); pHead->contLen = pMsg->contLen; //htonl(pHead->contLen); void *pVnode = dnodeGetVnode(pHead->vgId); @@ -101,22 +101,19 @@ void dnodeRead(SRpcMsg *pMsg) { } // put message into queue - SReadMsg readMsg; - readMsg.rpcMsg = *pMsg; - readMsg.pCont = pCont; - readMsg.contLen = pHead->contLen; - readMsg.pRpcContext = pRpcContext; - readMsg.pVnode = pVnode; + SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); + pRead->rpcMsg = *pMsg; + pRead->pCont = pCont; + pRead->contLen = pHead->contLen; + pRead->pRpcContext = pRpcContext; taos_queue queue = dnodeGetVnodeRworker(pVnode); - taosWriteQitem(queue, 0, pReadMsg); + taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead); // next vnode leftLen -= pHead->contLen; pCont -= pHead->contLen; queuedMsgNum++; - - dnodeReleaseVnode(pVnode); } if (queuedMsgNum == 0) { @@ -179,6 +176,8 @@ static void *dnodeProcessReadQueue(void *param) { dnodeProcessReadResult(pVnode, pReadMsg); taosFreeQitem(pReadMsg); + + dnodeReleaseVnode(pVnode); } return NULL; diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index 8e5bcc5844..5dd075e4df 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -115,15 +115,14 @@ void dnodeWrite(SRpcMsg *pMsg) { } // put message into queue - SWriteMsg writeMsg; - writeMsg.rpcMsg = *pMsg; - writeMsg.pCont = pCont; - writeMsg.contLen = pHead->contLen; - writeMsg.pRpcContext = pRpcContext; - writeMsg.pVnode = pVnode; // pVnode shall be saved for usage later + SWriteMsg *pWrite = (SWriteMsg *)taosAllocateQitem(sizeof(SWriteMsg)); + pWrite->rpcMsg = *pMsg; + pWrite->pCont = pCont; + pWrite->contLen = pHead->contLen; + pWrite->pRpcContext = pRpcContext; taos_queue queue = dnodeGetVnodeWworker(pVnode); - taosWriteQitem(queue, &writeMsg); + taosWriteQitem(queue, TAOS_QTYPE_RPC, pWrite); // next vnode leftLen -= pHead->contLen; @@ -145,14 +144,14 @@ void dnodeWrite(SRpcMsg *pMsg) { void *dnodeAllocateWriteWorker(void *pVnode) { SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId; - taos_queue *queue = taosOpenQueue(sizeof(SWriteMsg)); + taos_queue *queue = taosOpenQueue(); if (queue == NULL) return NULL; if (pWorker->qset == NULL) { pWorker->qset = taosOpenQset(); if (pWorker->qset == NULL) return NULL; - taosAddIntoQset(pWorker->qset, queue); + taosAddIntoQset(pWorker->qset, queue, pVnode); wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max; pthread_attr_t thAttr; @@ -164,7 +163,7 @@ void *dnodeAllocateWriteWorker(void *pVnode) { taosCloseQset(pWorker->qset); } } else { - taosAddIntoQset(pWorker->qset, queue); + taosAddIntoQset(pWorker->qset, queue, pVnode); wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max; } @@ -185,8 +184,10 @@ static void *dnodeProcessWriteQueue(void *param) { int type; void *pVnode; + qall = taosAllocateQall(); + while (1) { - numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, &qall, &pVnode); + numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, qall, &pVnode); if (numOfMsgs <=0) { dnodeHandleIdleWorker(pWorker); // thread exit if no queues anymore continue; @@ -215,12 +216,13 @@ static void *dnodeProcessWriteQueue(void *param) { } dnodeProcessWriteResult(pVnode, pWriteMsg); + taosFreeQitem(pWriteMsg); } - // free the Qitems; - taosFreeQitems(qall); } + taosFreeQall(qall); + return NULL; } diff --git a/src/rpc/test/rserver.c b/src/rpc/test/rserver.c index d9e5da51a6..6c5b320809 100644 --- a/src/rpc/test/rserver.c +++ b/src/rpc/test/rserver.c @@ -28,10 +28,13 @@ void *qhandle = NULL; void processShellMsg() { static int num = 0; taos_qall qall; - SRpcMsg rpcMsg; + SRpcMsg *pRpcMsg, rpcMsg; + int type; + + qall = taosAllocateQall(); while (1) { - int numOfMsgs = taosReadAllQitems(qhandle, &qall); + int numOfMsgs = taosReadAllQitems(qhandle, qall); if (numOfMsgs <= 0) { usleep(1000); continue; @@ -40,10 +43,10 @@ void processShellMsg() { tTrace("%d shell msgs are received", numOfMsgs); for (int i=0; i=0) { - if ( write(dataFd, rpcMsg.pCont, rpcMsg.contLen) <0 ) { + if ( write(dataFd, pRpcMsg->pCont, pRpcMsg->contLen) <0 ) { tPrint("failed to write data file, reason:%s", strerror(errno)); } } @@ -62,19 +65,22 @@ void processShellMsg() { taosResetQitems(qall); for (int i=0; ipCont); + rpcMsg.pCont = rpcMallocCont(msgSize); rpcMsg.contLen = msgSize; - rpcMsg.handle = rpcMsg.handle; + rpcMsg.handle = pRpcMsg->handle; rpcMsg.code = 1; rpcSendResponse(&rpcMsg); + + taosFreeQitem(pRpcMsg); } - taosFreeQitems(qall); } + taosFreeQall(qall); /* SRpcIpSet ipSet; ipSet.numOfIps = 1; @@ -108,8 +114,13 @@ int retrieveAuthInfo(char *meterId, char *spi, char *encrypt, char *secret, char } void processRequestMsg(SRpcMsg *pMsg) { - tTrace("request is received, type:%d, contLen:%d", pMsg->msgType, pMsg->contLen); - taosWriteQitem(qhandle, pMsg); + SRpcMsg *pTemp; + + pTemp = taosAllocateQitem(sizeof(SRpcMsg)); + memcpy(pTemp, pMsg, sizeof(SRpcMsg)); + + tTrace("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp); + taosWriteQitem(qhandle, TAOS_QTYPE_RPC, pTemp); } int main(int argc, char *argv[]) { @@ -143,6 +154,7 @@ int main(int argc, char *argv[]) { commit = atoi(argv[++i]); } else if (strcmp(argv[i], "-d")==0 && i < argc-1) { rpcDebugFlag = atoi(argv[++i]); + ddebugFlag = rpcDebugFlag; uDebugFlag = rpcDebugFlag; } else { printf("\nusage: %s [options] \n", argv[0]); diff --git a/src/util/inc/tqueue.h b/src/util/inc/tqueue.h index 4920d08e29..c45eb10518 100644 --- a/src/util/inc/tqueue.h +++ b/src/util/inc/tqueue.h @@ -20,6 +20,10 @@ extern "C" { #endif +#define TAOS_QTYPE_RPC 0 +#define TAOS_QTYPE_FWD 1 +#define TAOS_QTYPE_WAL 2 + typedef void* taos_queue; typedef void* taos_qset; typedef void* taos_qall; @@ -31,10 +35,11 @@ void taosFreeQitem(void *item); int taosWriteQitem(taos_queue, int type, void *item); int taosReadQitem(taos_queue, int *type, void **pitem); -int taosReadAllQitems(taos_queue, taos_qall *); +taos_qall taosAllocateQall(); +void taosFreeQall(taos_qall); +int taosReadAllQitems(taos_queue, taos_qall); int taosGetQitem(taos_qall, int *type, void **pitem); void taosResetQitems(taos_qall); -void taosFreeQitems(taos_qall); taos_qset taosOpenQset(); void taosCloseQset(); @@ -43,7 +48,7 @@ void taosRemoveFromQset(taos_qset, taos_queue); int taosGetQueueNumber(taos_qset); int taosReadQitemFromQset(taos_qset, int *type, void **pitem, void **handle); -int taosReadAllQitemsFromQset(taos_qset, taos_qall *, void **handle); +int taosReadAllQitemsFromQset(taos_qset, taos_qall, void **handle); int taosGetQueueItemsNumber(taos_queue param); int taosGetQsetItemsNumber(taos_qset param); diff --git a/src/util/src/tqueue.c b/src/util/src/tqueue.c index f1db24b8ce..1ac05556d6 100644 --- a/src/util/src/tqueue.c +++ b/src/util/src/tqueue.c @@ -92,6 +92,8 @@ void *taosAllocateQitem(int size) { void taosFreeQitem(void *param) { if (param == NULL) return; + //pTrace("item:%p is freed", param); + char *temp = (char *)param; temp -= sizeof(STaosQnode); free(temp); @@ -115,6 +117,8 @@ int taosWriteQitem(taos_queue param, int type, void *item) { queue->numOfItems++; if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1); + //pTrace("item:%p is put into queue, items:%d", item, queue->numOfItems); + pthread_mutex_unlock(&queue->mutex); return 0; @@ -137,6 +141,7 @@ int taosReadQitem(taos_queue param, int *type, void **pitem) { queue->numOfItems--; if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1); code = 1; + //pTrace("item:%p is read out from queue, items:%d", *pitem, queue->numOfItems); } pthread_mutex_unlock(&queue->mutex); @@ -144,35 +149,38 @@ int taosReadQitem(taos_queue param, int *type, void **pitem) { return code; } -int taosReadAllQitems(taos_queue param, taos_qall *res) { +void *taosAllocateQall() { + void *p = malloc(sizeof(STaosQall)); + return p; +} + +void taosFreeQall(void *param) { + free(param); +} + +int taosReadAllQitems(taos_queue param, taos_qall p2) { STaosQueue *queue = (STaosQueue *)param; - STaosQall *qall = NULL; + STaosQall *qall = (STaosQall *)p2; int code = 0; pthread_mutex_lock(&queue->mutex); if (queue->head) { - qall = (STaosQall *) calloc(sizeof(STaosQall), 1); - if ( qall == NULL ) { - terrno = TSDB_CODE_NO_RESOURCE; - code = -1; - } else { - qall->current = queue->head; - qall->start = queue->head; - qall->numOfItems = queue->numOfItems; - qall->itemSize = queue->itemSize; - code = qall->numOfItems; + memset(qall, 0, sizeof(STaosQall)); + qall->current = queue->head; + qall->start = queue->head; + qall->numOfItems = queue->numOfItems; + qall->itemSize = queue->itemSize; + code = qall->numOfItems; - queue->head = NULL; - queue->tail = NULL; - queue->numOfItems = 0; - if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems); - } + queue->head = NULL; + queue->tail = NULL; + queue->numOfItems = 0; + if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems); } pthread_mutex_unlock(&queue->mutex); - *res = qall; return code; } @@ -189,6 +197,7 @@ int taosGetQitem(taos_qall param, int *type, void **pitem) { *pitem = pNode->item; *type = pNode->type; num = 1; + //pTrace("item:%p is fetched", *pitem); } return num; @@ -199,19 +208,6 @@ void taosResetQitems(taos_qall param) { qall->current = qall->start; } -void taosFreeQitems(taos_qall param) { - STaosQall *qall = (STaosQall *)param; - STaosQnode *pNode; - - while (qall->current) { - pNode = qall->current; - qall->current = pNode->next; - free(pNode); - } - - free(qall); -} - taos_qset taosOpenQset() { STaosQset *qset = (STaosQset *) calloc(sizeof(STaosQset), 1); @@ -329,10 +325,10 @@ int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phand return code; } -int taosReadAllQitemsFromQset(taos_qset param, taos_qall *res, void **phandle) { +int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **phandle) { STaosQset *qset = (STaosQset *)param; STaosQueue *queue; - STaosQall *qall = NULL; + STaosQall *qall = (STaosQall *)p2; int code = 0; for(int i=0; inumOfQueues; ++i) { @@ -347,23 +343,17 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall *res, void **phandle) { pthread_mutex_lock(&queue->mutex); if (queue->head) { - qall = (STaosQall *) calloc(sizeof(STaosQall), 1); - if (qall == NULL) { - terrno = TSDB_CODE_NO_RESOURCE; - code = -1; - } else { - qall->current = queue->head; - qall->start = queue->head; - qall->numOfItems = queue->numOfItems; - qall->itemSize = queue->itemSize; - code = qall->numOfItems; - *phandle = queue->ahandle; + qall->current = queue->head; + qall->start = queue->head; + qall->numOfItems = queue->numOfItems; + qall->itemSize = queue->itemSize; + code = qall->numOfItems; + *phandle = queue->ahandle; - queue->head = NULL; - queue->tail = NULL; - queue->numOfItems = 0; - atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems); - } + queue->head = NULL; + queue->tail = NULL; + queue->numOfItems = 0; + atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems); } pthread_mutex_unlock(&queue->mutex); @@ -371,8 +361,6 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall *res, void **phandle) { if (code != 0) break; } - *res = qall; - return code; } diff --git a/src/util/src/tsocket.c b/src/util/src/tsocket.c index 30c3ada637..86be428af7 100644 --- a/src/util/src/tsocket.c +++ b/src/util/src/tsocket.c @@ -330,7 +330,7 @@ int taosOpenTcpClientSocket(char *destIp, uint16_t destPort, char *clientIp) { struct sockaddr_in serverAddr, clientAddr; int ret; - pTrace("open tcp client socket:%s:%d, local Ip:%s", destIp, destPort, clientIp); + // pTrace("open tcp client socket:%s:%d, local Ip:%s", destIp, destPort, clientIp); sockFd = (int)socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); @@ -362,7 +362,7 @@ int taosOpenTcpClientSocket(char *destIp, uint16_t destPort, char *clientIp) { ret = connect(sockFd, (struct sockaddr *)&serverAddr, sizeof(serverAddr)); if (ret != 0) { - pError("failed to connect socket, ip:%s, port:%hu, reason: %s", destIp, destPort, strerror(errno)); + //pError("failed to connect socket, ip:%s, port:%hu, reason: %s", destIp, destPort, strerror(errno)); taosCloseSocket(sockFd); sockFd = -1; }