From 3343bef1b794b88a6b7205547b88c76db54481a1 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 11 Mar 2022 21:51:34 +0800 Subject: [PATCH] shm --- include/util/tprocess.h | 1 + source/common/src/tglobal.c | 1 - source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c | 5 +- .../dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c | 14 +- source/util/src/tprocess.c | 201 ++++++++++++++---- 5 files changed, 181 insertions(+), 41 deletions(-) diff --git a/include/util/tprocess.h b/include/util/tprocess.h index eddf1ac72e..1ac8155827 100644 --- a/include/util/tprocess.h +++ b/include/util/tprocess.h @@ -43,6 +43,7 @@ typedef struct { ProcFreeFp parentFreeBodyFp; bool testFlag; void *pParent; + const char *name; } SProcCfg; SProcObj *taosProcInit(const SProcCfg *pCfg); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 9a0309e024..e7180f2560 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -309,7 +309,6 @@ static int32_t taosAddSystemCfg(SConfig *pCfg) { if (cfgAddString(pCfg, "os release", info.release, 1) != 0) return -1; if (cfgAddString(pCfg, "os version", info.version, 1) != 0) return -1; if (cfgAddString(pCfg, "os machine", info.machine, 1) != 0) return -1; - if (cfgAddString(pCfg, "os sysname", info.sysname, 1) != 0) return -1; if (cfgAddString(pCfg, "version", version, 1) != 0) return -1; if (cfgAddString(pCfg, "compatible_version", compatible_version, 1) != 0) return -1; diff --git a/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c b/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c index 8a072ce63f..6bd504014c 100644 --- a/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c +++ b/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c @@ -134,8 +134,9 @@ int32_t mmOpen(SDnode *pDnode, SMnodeOpt *pOption) { .parentFreeHeadFp = (ProcFreeFp)free, .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, - .testFlag = true, - .pParent = pDnode}; + .testFlag = 0, + .pParent = pDnode, + .name = "mnode"}; pMgmt->pProcess = taosProcInit(&cfg); if (pMgmt->pProcess == NULL) { diff --git a/source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c b/source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c index 22ff2088a5..8d400cf867 100644 --- a/source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c +++ b/source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c @@ -159,7 +159,6 @@ static int32_t mmBuildMsg(SMndMsg *pMsg, SRpcMsg *pRpc) { pMsg->rpcMsg = *pRpc; pMsg->createdTime = taosGetTimestampSec(); - dTrace("msg:%p, is created, app:%p RPC:%p user:%s", pMsg, pRpc->ahandle, pRpc->handle, pMsg->user); return 0; } @@ -183,6 +182,8 @@ void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { goto _OVER; } + dTrace("msg:%p, is created, app:%p RPC:%p user:%s", pMsg, pRpc->ahandle, pRpc->handle, pMsg->user); + if (pMgmt->singleProc) { code = (*msgFp)(pDnode, pMsg); } else { @@ -193,6 +194,7 @@ _OVER: if (code == 0) { if (!pMgmt->singleProc) { + dTrace("msg:%p, is freed", pMsg); taosFreeQitem(pMsg); rpcFreeCont(pRpc->pCont); } @@ -202,6 +204,7 @@ _OVER: SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno}; mmSendRpcRsp(pDnode, &rsp); } + dTrace("msg:%p, is freed", pMsg); taosFreeQitem(pMsg); rpcFreeCont(pRpc->pCont); } @@ -231,6 +234,7 @@ static int32_t mmPutMndMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMndMs SMnode *pMnode = mmAcquire(pDnode); if (pMnode == NULL) return -1; + dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); int32_t code = dndWriteMsgToWorker(pWorker, pMsg, 0); mmRelease(pDnode, pMnode); return code; @@ -242,11 +246,13 @@ static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMs return -1; } + dTrace("msg:%p, is created", pMsg); pMsg->rpcMsg = *pRpc; pMsg->createdTime = taosGetTimestampSec(); int32_t code = mmPutMndMsgToWorker(pDnode, pWorker, pMsg); if (code != 0) { + dTrace("msg:%p, is freed", pMsg); taosFreeQitem(pMsg); rpcFreeCont(pRpc->pCont); } @@ -271,6 +277,7 @@ void mmPutRpcRspToWorker(SDnode *pDnode, SRpcMsg *pRpc) { } void mmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) { + dTrace("msg:%p, get from child queue", pMsg); SMndMgmt *pMgmt = &pDnode->mmgmt; SRpcMsg *pRpc = &pMsg->rpcMsg; @@ -285,18 +292,22 @@ void mmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pC SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno}; mmPutRpcRspToWorker(pDnode, &rsp); } + + dTrace("msg:%p, is freed", pMsg); taosFreeQitem(pMsg); rpcFreeCont(pCont); } } void mmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) { + dTrace("msg:%p, get from parent queue", pMsg); pMsg->pCont = pCont; mmSendRpcRsp(pDnode, pMsg); free(pMsg); } static void mmConsumeMsgQueue(SDnode *pDnode, SMndMsg *pMsg) { + dTrace("msg:%p, get from msg queue", pMsg); SMnode *pMnode = mmAcquire(pDnode); SRpcMsg *pRpc = &pMsg->rpcMsg; bool isReq = (pRpc->msgType & 1U); @@ -321,6 +332,7 @@ static void mmConsumeMsgQueue(SDnode *pDnode, SMndMsg *pMsg) { } } + dTrace("msg:%p, is freed", pMsg); rpcFreeCont(pRpc->pCont); taosFreeQitem(pMsg); } diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index 2876cfba01..7a1f0d7900 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -19,25 +19,32 @@ #include "tlog.h" #include "tqueue.h" +// todo +#include +#include + #define SHM_DEFAULT_SIZE (20 * 1024 * 1024) #define CEIL8(n) (ceil((float)(n) / 8) * 8) typedef void *(*ProcThreadFp)(void *param); typedef struct SProcQueue { - int32_t head; - int32_t tail; - int32_t total; - int32_t avail; - int32_t items; - char *pBuffer; - ProcMallocFp mallocHeadFp; - ProcFreeFp freeHeadFp; - ProcMallocFp mallocBodyFp; - ProcFreeFp freeBodyFp; - ProcConsumeFp consumeFp; - void *pParent; - tsem_t sem; - pthread_mutex_t mutex; + int32_t head; + int32_t tail; + int32_t total; + int32_t avail; + int32_t items; + char *pBuffer; + ProcMallocFp mallocHeadFp; + ProcFreeFp freeHeadFp; + ProcMallocFp mallocBodyFp; + ProcFreeFp freeBodyFp; + ProcConsumeFp consumeFp; + void *pParent; + tsem_t sem; + pthread_mutex_t *mutex; + int32_t mutexShmid; + int32_t bufferShmid; + const char *name; } SProcQueue; typedef struct SProcObj { @@ -45,34 +52,132 @@ typedef struct SProcObj { SProcQueue *pChildQueue; pthread_t parentThread; SProcQueue *pParentQueue; + const char *name; int32_t pid; bool isChild; bool stopFlag; bool testFlag; } SProcObj; +static int32_t taosProcInitMutex(pthread_mutex_t **ppMutex, int32_t *pShmid) { + pthread_mutex_t *pMutex = NULL; + pthread_mutexattr_t mattr = {0}; + int32_t shmid = -1; + int32_t code = -1; + + if (pthread_mutexattr_init(&mattr) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + uError("failed to init mutex while init attr since %s", terrstr()); + goto _OVER; + } + + if (pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + uError("failed to init mutex while set shared since %s", terrstr()); + goto _OVER; + } + + shmid = shmget(IPC_PRIVATE, sizeof(pthread_mutex_t), 0600); + if (shmid <= 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + uError("failed to init mutex while shmget since %s", terrstr()); + goto _OVER; + } + + pMutex = (pthread_mutex_t *)shmat(shmid, NULL, 0); + if (pMutex == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + uError("failed to init mutex while shmat since %s", terrstr()); + goto _OVER; + } + + if (pthread_mutex_init(pMutex, &mattr) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + uError("failed to init mutex since %s", terrstr()); + goto _OVER; + } + + code = 0; + +_OVER: + if (code != 0) { + pthread_mutex_destroy(pMutex); + shmctl(shmid, IPC_RMID, NULL); + } else { + *ppMutex = pMutex; + *pShmid = shmid; + } + + pthread_mutexattr_destroy(&mattr); + return code; +} + +static void taosProcDestroyMutex(pthread_mutex_t *pMutex, int32_t *pShmid) { + if (pMutex != NULL) { + pthread_mutex_destroy(pMutex); + } + if (*pShmid > 0) { + shmctl(*pShmid, IPC_RMID, NULL); + } +} + +static int32_t taosProcInitBuffer(void **ppBuffer, int32_t size) { + int32_t shmid = shmget(IPC_PRIVATE, size, IPC_CREAT | 0600); + if (shmid <= 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + uError("failed to init buffer while shmget since %s", terrstr()); + return -1; + } + + void *shmptr = (pthread_mutex_t *)shmat(shmid, NULL, 0); + if (shmptr == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + uError("failed to init buffer while shmat since %s", terrstr()); + shmctl(shmid, IPC_RMID, NULL); + return -1; + } + + *ppBuffer = shmptr; + return shmid; +} + +static void taosProcDestroyBuffer(void *pBuffer, int32_t *pShmid) { + if (*pShmid > 0) { + shmctl(*pShmid, IPC_RMID, NULL); + } +} + static SProcQueue *taosProcQueueInit(int32_t size) { if (size <= 0) size = SHM_DEFAULT_SIZE; int32_t bufSize = CEIL8(size); int32_t headSize = CEIL8(sizeof(SProcQueue)); - SProcQueue *pQueue = malloc(bufSize + headSize); - if (pQueue == NULL) { + SProcQueue *pQueue = NULL; + int32_t shmId = taosProcInitBuffer((void **)&pQueue, bufSize + headSize); + if (shmId <= 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - if (pthread_mutex_init(&pQueue->mutex, NULL) != 0) { + pQueue->bufferShmid = shmId; + + if (taosProcInitMutex(&pQueue->mutex, &pQueue->mutexShmid) != 0) { + free(pQueue); + return NULL; + } + + if (tsem_init(&pQueue->sem, 1, 0) != 0) { + taosProcDestroyMutex(pQueue->mutex, &pQueue->mutexShmid); free(pQueue); terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - if (tsem_init(&pQueue->sem, 0, 0) != 0) { - pthread_mutex_destroy(&pQueue->mutex); + if (taosProcInitMutex(&pQueue->mutex, &pQueue->mutexShmid) != 0) { + taosProcDestroyMutex(pQueue->mutex, &pQueue->mutexShmid); + tsem_destroy(&pQueue->sem); free(pQueue); - terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } @@ -87,7 +192,8 @@ static SProcQueue *taosProcQueueInit(int32_t size) { static void taosProcQueueCleanup(SProcQueue *pQueue) { if (pQueue != NULL) { - pthread_mutex_destroy(&pQueue->mutex); + uDebug("proc:%s, queue:%p clean up", pQueue->name, pQueue); + taosProcDestroyMutex(pQueue->mutex, &pQueue->mutexShmid); tsem_destroy(&pQueue->sem); free(pQueue); } @@ -98,9 +204,9 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHea const int32_t bodyLen = CEIL8(rawBodyLen); const int32_t fullLen = headLen + bodyLen + 8; - pthread_mutex_lock(&pQueue->mutex); + pthread_mutex_lock(pQueue->mutex); if (fullLen > pQueue->avail) { - pthread_mutex_unlock(&pQueue->mutex); + pthread_mutex_unlock(pQueue->mutex); terrno = TSDB_CODE_OUT_OF_SHM_MEM; return -1; } @@ -146,11 +252,10 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHea pQueue->avail -= fullLen; pQueue->items++; - pthread_mutex_unlock(&pQueue->mutex); + pthread_mutex_unlock(pQueue->mutex); tsem_post(&pQueue->sem); - // (*pQueue->freeHeadFp)(pHead); - // (*pQueue->freeBodyFp)(pBody); + uTrace("proc:%s, push msg:%p:%d cont:%p:%d to queue:%p", pQueue->name, pHead, rawHeadLen, pBody, rawBodyLen, pQueue); return 0; } @@ -158,9 +263,9 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int32_t *pHea int32_t *pBodyLen) { tsem_wait(&pQueue->sem); - pthread_mutex_lock(&pQueue->mutex); + pthread_mutex_lock(pQueue->mutex); if (pQueue->total - pQueue->avail <= 0) { - pthread_mutex_unlock(&pQueue->mutex); + pthread_mutex_unlock(pQueue->mutex); tsem_post(&pQueue->sem); terrno = TSDB_CODE_OUT_OF_SHM_MEM; return -1; @@ -179,7 +284,7 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int32_t *pHea void *pHead = (*pQueue->mallocHeadFp)(headLen); void *pBody = (*pQueue->mallocBodyFp)(bodyLen); if (pHead == NULL || pBody == NULL) { - pthread_mutex_unlock(&pQueue->mutex); + pthread_mutex_unlock(pQueue->mutex); tsem_post(&pQueue->sem); (*pQueue->freeHeadFp)(pHead); (*pQueue->freeBodyFp)(pBody); @@ -220,12 +325,14 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int32_t *pHea pQueue->avail = pQueue->avail + headLen + bodyLen + 8; pQueue->items--; - pthread_mutex_unlock(&pQueue->mutex); + pthread_mutex_unlock(pQueue->mutex); *ppHead = pHead; *ppBody = pBody; *pHeadLen = headLen; *pBodyLen = bodyLen; + + uTrace("proc:%s, get msg:%p:%d cont:%p:%d from queue:%p", pQueue->name, pHead, headLen, pBody, bodyLen, pQueue); return 0; } @@ -236,22 +343,25 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) { return NULL; } + pProc->name = pCfg->name; + pProc->testFlag = pCfg->testFlag; + pProc->pChildQueue = taosProcQueueInit(pCfg->childQueueSize); pProc->pParentQueue = taosProcQueueInit(pCfg->parentQueueSize); - if (pProc->pChildQueue == NULL || pProc->pParentQueue == NULL) { + if (pProc->pChildQueue == NULL || pProc->pParentQueue == NULL) { taosProcQueueCleanup(pProc->pChildQueue); - taosProcQueueCleanup(pProc->pParentQueue); free(pProc); return NULL; } - pProc->testFlag = pCfg->testFlag; + pProc->pChildQueue->name = pCfg->name; pProc->pChildQueue->pParent = pCfg->pParent; pProc->pChildQueue->mallocHeadFp = pCfg->childMallocHeadFp; pProc->pChildQueue->freeHeadFp = pCfg->childFreeHeadFp; pProc->pChildQueue->mallocBodyFp = pCfg->childMallocBodyFp; pProc->pChildQueue->freeBodyFp = pCfg->childFreeBodyFp; pProc->pChildQueue->consumeFp = pCfg->childConsumeFp; + pProc->pParentQueue->name = pCfg->name; pProc->pParentQueue->pParent = pCfg->pParent; pProc->pParentQueue->mallocHeadFp = pCfg->parentdMallocHeadFp; pProc->pParentQueue->freeHeadFp = pCfg->parentFreeHeadFp; @@ -259,8 +369,20 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) { pProc->pParentQueue->freeBodyFp = pCfg->parentFreeBodyFp; pProc->pParentQueue->consumeFp = pCfg->parentConsumeFp; - // todo - pProc->isChild = 0; + uDebug("proc:%s, initialized, child queue:%p parent queue:%p", pProc->name, pProc->pChildQueue, pProc->pParentQueue); + + if (!pProc->testFlag) { + pProc->pid = fork(); + if (pProc->pid == 0) { + tsLogInited = 0; + taosInitLog("mnodelog", 1); + pProc->isChild = 1; + uInfo("this is child process, pid:%d", pProc->pid); + } else { + pProc->isChild = 0; + uInfo("this is parent process, pid:%d", pProc->pid); + } + } return pProc; } @@ -271,13 +393,15 @@ static void taosProcThreadLoop(SProcQueue *pQueue) { void *pHead, *pBody; int32_t headLen, bodyLen; + uDebug("proc:%s, start to get message from queue:%p", pQueue->name, pQueue); + while (1) { int32_t code = taosProcQueuePop(pQueue, &pHead, &headLen, &pBody, &bodyLen); if (code < 0) { - uDebug("queue:%p, got no message and exiting", pQueue); + uDebug("proc:%s, get no message from queue:%p and exiting", pQueue->name, pQueue); break; } else if (code < 0) { - uTrace("queue:%p, got no message since %s", pQueue, terrstr()); + uTrace("proc:%s, get no message from queue:%p since %s", pQueue->name, pQueue, terrstr()); taosMsleep(1); continue; } else { @@ -297,6 +421,7 @@ int32_t taosProcStart(SProcObj *pProc) { uError("failed to create thread since %s", terrstr()); return -1; } + uDebug("proc:%s, child start to consume queue:%p", pProc->name, pProc->pChildQueue); } if (!pProc->isChild || pProc->testFlag) { @@ -305,6 +430,7 @@ int32_t taosProcStart(SProcObj *pProc) { uError("failed to create thread since %s", terrstr()); return -1; } + uDebug("proc:%s, parent start to consume queue:%p", pProc->name, pProc->pParentQueue); } return 0; @@ -318,6 +444,7 @@ void taosProcStop(SProcObj *pProc) { void taosProcCleanup(SProcObj *pProc) { if (pProc != NULL) { + uDebug("proc:%s, clean up", pProc->name); taosProcQueueCleanup(pProc->pChildQueue); taosProcQueueCleanup(pProc->pParentQueue); free(pProc);