From 14c63859082c4f63fd21a11d91c0661b8a2cd3c3 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Tue, 25 Feb 2020 15:26:29 +0800 Subject: [PATCH] :add queue support --- src/rpc/inc/tqueue.h | 39 +++++++++ src/rpc/inc/tsched.h | 39 +++++++++ src/rpc/src/tqueue.c | 190 +++++++++++++++++++++++++++++++++++++++++ src/rpc/test/rserver.c | 43 +++++++--- 4 files changed, 300 insertions(+), 11 deletions(-) create mode 100644 src/rpc/inc/tqueue.h create mode 100644 src/rpc/inc/tsched.h create mode 100644 src/rpc/src/tqueue.c diff --git a/src/rpc/inc/tqueue.h b/src/rpc/inc/tqueue.h new file mode 100644 index 0000000000..09a25e7e93 --- /dev/null +++ b/src/rpc/inc/tqueue.h @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_TSCHED_H +#define TDENGINE_TSCHED_H + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct _sched_msg { + void *msg; + int msgLen; + int8_t type; + int32_t code; + void *handle; +} SRpcMsg; + +void *taosInitMsgQueue(int queueSize, void (*fp)(int num, SRpcMsg *), const char *label); +int taosPutIntoMsgQueue(void *qhandle, SRpcMsg *pMsg); +void taosCleanUpMsgQueue(void *param); + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_TSCHED_H diff --git a/src/rpc/inc/tsched.h b/src/rpc/inc/tsched.h new file mode 100644 index 0000000000..9b6eb92462 --- /dev/null +++ b/src/rpc/inc/tsched.h @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_TSCHED_H +#define TDENGINE_TSCHED_H + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct _sched_msg { + void *msg; + int msgLen; + int8_t type; + int32_t code + void handle; +} SRpcMsg; + +void *rpcInitMsgQueue(int queueSize, void (*fp)(int num, SRpcQueue *),const char *label); +int rpcPutIntoMsgQueue(void *qhandle, SRpcMsg *pMsg); +void rpcCleanUpMsgQueue(void *param); + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_TSCHED_H diff --git a/src/rpc/src/tqueue.c b/src/rpc/src/tqueue.c new file mode 100644 index 0000000000..2f6f9ac106 --- /dev/null +++ b/src/rpc/src/tqueue.c @@ -0,0 +1,190 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "os.h" +#include "tlog.h" +#include "tqueue.h" + +#define DUMP_SCHEDULER_TIME_WINDOW 30000 //every 30sec, take a snap shot of task queue. + +typedef struct { + char label[16]; + int num; + tsem_t emptySem; + tsem_t fullSem; + pthread_mutex_t queueMutex; + int fullSlot; + int emptySlot; + int queueSize; + SRpcMsg *queue; + SRpcMsg *oqueue; + pthread_t qthread; + void (*fp)(int num, SRpcMsg *); +} SRpcQueue; + +static void *taosProcessMsgQueue(void *param); + +void *taosInitMsgQueue(int queueSize, void (*fp)(int num, SRpcMsg *), const char *label) { + pthread_attr_t attr; + SRpcQueue * pQueue = (SRpcQueue *)malloc(sizeof(SRpcQueue)); + if (pQueue == NULL) { + pError("%s: no enough memory for pQueue, reason: %s", label, strerror(errno)); + goto _error; + } + + memset(pQueue, 0, sizeof(SRpcQueue)); + pQueue->queueSize = queueSize; + strncpy(pQueue->label, label, sizeof(pQueue->label)); // fix buffer overflow + pQueue->label[sizeof(pQueue->label)-1] = '\0'; + pQueue->fp = fp; + + if (pthread_mutex_init(&pQueue->queueMutex, NULL) < 0) { + pError("init %s:queueMutex failed, reason:%s", pQueue->label, strerror(errno)); + goto _error; + } + + if (tsem_init(&pQueue->emptySem, 0, (unsigned int)pQueue->queueSize) != 0) { + pError("init %s:empty semaphore failed, reason:%s", pQueue->label, strerror(errno)); + goto _error; + } + + if (tsem_init(&pQueue->fullSem, 0, 0) != 0) { + pError("init %s:full semaphore failed, reason:%s", pQueue->label, strerror(errno)); + goto _error; + } + + if ((pQueue->queue = (SRpcMsg *)malloc((size_t)pQueue->queueSize * sizeof(SRpcMsg))) == NULL) { + pError("%s: no enough memory for queue, reason:%s", pQueue->label, strerror(errno)); + goto _error; + } + + memset(pQueue->queue, 0, (size_t)pQueue->queueSize * sizeof(SRpcMsg)); + + if ((pQueue->oqueue = (SRpcMsg *)malloc((size_t)pQueue->queueSize * sizeof(SRpcMsg))) == NULL) { + pError("%s: no enough memory for queue, reason:%s", pQueue->label, strerror(errno)); + goto _error; + } + + memset(pQueue->oqueue, 0, (size_t)pQueue->queueSize * sizeof(SRpcMsg)); + + pQueue->fullSlot = 0; + pQueue->fullSlot = 0; + pQueue->emptySlot = 0; + + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + + if (pthread_create(&pQueue->qthread, &attr, taosProcessMsgQueue, (void *)pQueue) != 0) { + pError("%s: failed to create taos thread, reason:%s", pQueue->label, strerror(errno)); + goto _error; + } + + pTrace("%s RPC msg queue is initialized", pQueue->label); + + return (void *)pQueue; + +_error: + taosCleanUpMsgQueue(pQueue); + return NULL; +} + +void *taosProcessMsgQueue(void *param) { + SRpcQueue *pQueue = (SRpcQueue *)param; + int num = 0; + + while (1) { + if (tsem_wait(&pQueue->fullSem) != 0) { + if (errno == EINTR) { + /* sem_wait is interrupted by interrupt, ignore and continue */ + pTrace("wait %s fullSem was interrupted", pQueue->label); + continue; + } + pError("wait %s fullSem failed, errno:%d, reason:%s", pQueue->label, errno, strerror(errno)); + } + + if (pthread_mutex_lock(&pQueue->queueMutex) != 0) + pError("lock %s queueMutex failed, reason:%s", pQueue->label, strerror(errno)); + + num = 0; + do { + pQueue->oqueue[num] = pQueue->queue[pQueue->fullSlot]; + pQueue->fullSlot = (pQueue->fullSlot + 1) % pQueue->queueSize; + ++num; + pQueue->num--; + } while (pQueue->fullSlot != pQueue->emptySlot); + + if (pthread_mutex_unlock(&pQueue->queueMutex) != 0) + pError("unlock %s queueMutex failed, reason:%s\n", pQueue->label, strerror(errno)); + + for (int i= 0; iemptySem) != 0) + pError("post %s emptySem failed, reason:%s\n", pQueue->label, strerror(errno)); + } + + for (int i=0; ifullSem) != 0) + pError("wait %s fullSem failed, reason:%s\n", pQueue->label, strerror(errno)); + } + + (*pQueue->fp)(num, pQueue->oqueue); + + } + + return NULL; +} + +int taosPutIntoMsgQueue(void *qhandle, SRpcMsg *pMsg) { + SRpcQueue *pQueue = (SRpcQueue *)qhandle; + if (pQueue == NULL) { + pError("sched is not ready, msg:%p is dropped", pMsg); + return 0; + } + + while (tsem_wait(&pQueue->emptySem) != 0) { + if (errno != EINTR) { + pError("wait %s emptySem failed, reason:%s", pQueue->label, strerror(errno)); + break; + } + } + + if (pthread_mutex_lock(&pQueue->queueMutex) != 0) + pError("lock %s queueMutex failed, reason:%s", pQueue->label, strerror(errno)); + + pQueue->queue[pQueue->emptySlot] = *pMsg; + pQueue->emptySlot = (pQueue->emptySlot + 1) % pQueue->queueSize; + pQueue->num++; + + if (pthread_mutex_unlock(&pQueue->queueMutex) != 0) + pError("unlock %s queueMutex failed, reason:%s", pQueue->label, strerror(errno)); + + if (tsem_post(&pQueue->fullSem) != 0) pError("post %s fullSem failed, reason:%s", pQueue->label, strerror(errno)); + + return 0; +} + +void taosCleanUpMsgQueue(void *param) { + SRpcQueue *pQueue = (SRpcQueue *)param; + if (pQueue == NULL) return; + + pthread_cancel(pQueue->qthread); + + tsem_destroy(&pQueue->emptySem); + tsem_destroy(&pQueue->fullSem); + pthread_mutex_destroy(&pQueue->queueMutex); + + free(pQueue->queue); + free(pQueue); +} + diff --git a/src/rpc/test/rserver.c b/src/rpc/test/rserver.c index 89e3221a41..7e418444fc 100644 --- a/src/rpc/test/rserver.c +++ b/src/rpc/test/rserver.c @@ -17,24 +17,35 @@ #include "os.h" #include "tlog.h" #include "trpc.h" +#include "tqueue.h" #include int msgSize = 128; int commit = 0; int dataFd = -1; +void *qhandle = NULL; -void processRequestMsg(char type, void *pCont, int contLen, void *thandle, int32_t code) { +void processShellMsg(int numOfMsgs, SRpcMsg *pMsg) { static int num = 0; - tTrace("request is received, type:%d, contLen:%d", type, contLen); - if (dataFd >=0) { - if ( write(dataFd, pCont, contLen) <0 ) { - tPrint("failed to write data file, reason:%s", strerror(errno)); + tTrace("%d shell msgs are received", numOfMsgs); + + for (int i=0; i=0) { + if ( write(dataFd, pMsg->msg, pMsg->msgLen) <0 ) { + tPrint("failed to write data file, reason:%s", strerror(errno)); + } } + + void *rsp = rpcMallocCont(msgSize); + rpcSendResponse(pMsg->handle, 1, rsp, msgSize); + rpcFreeCont(pMsg->msg); + pMsg++; } - + if (commit >=2) { - ++num; + num += numOfMsgs; if ( fsync(dataFd) < 0 ) { tPrint("failed to flush data to file, reason:%s", strerror(errno)); } @@ -44,9 +55,6 @@ void processRequestMsg(char type, void *pCont, int contLen, void *thandle, int32 } } - void *rsp = rpcMallocCont(msgSize); - - rpcSendResponse(thandle, 1, rsp, msgSize); /* SRpcIpSet ipSet; @@ -58,7 +66,17 @@ void processRequestMsg(char type, void *pCont, int contLen, void *thandle, int32 rpcSendRedirectRsp(ahandle, &ipSet); */ - rpcFreeCont(pCont); +} + +void processRequestMsg(char type, void *pCont, int contLen, void *thandle, int32_t code) { + tTrace("request is received, type:%d, contLen:%d", type, contLen); + SRpcMsg rpcMsg; + rpcMsg.msg = pCont; + rpcMsg.msgLen = contLen; + rpcMsg.code = code; + rpcMsg.handle = thandle; + rpcMsg.type = type; + taosPutIntoMsgQueue(qhandle, &rpcMsg); } int main(int argc, char *argv[]) { @@ -91,6 +109,7 @@ int main(int argc, char *argv[]) { commit = atoi(argv[++i]); } else if (strcmp(argv[i], "-d")==0 && i < argc-1) { rpcDebugFlag = atoi(argv[++i]); + uDebugFlag = rpcDebugFlag; } else { printf("\nusage: %s [options] \n", argv[0]); printf(" [-i ip]: server IP address, default is:%s\n", rpcInit.localIp); @@ -125,6 +144,8 @@ int main(int argc, char *argv[]) { tPrint("failed to open data file, reason:%s", strerror(errno)); } + qhandle = taosInitMsgQueue(1000, processShellMsg, "SER"); + // loop forever while(1) { sleep(1);