From 79936567c5cc20dda71f43babf7b5c8975237b89 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 30 Mar 2022 19:54:41 +0800 Subject: [PATCH] shm --- include/os/osShm.h | 5 +- include/util/tprocess.h | 6 +- source/dnode/mgmt/main/exe/dndMain.c | 4 + source/dnode/mgmt/main/src/dndExec.c | 10 +- source/dnode/mgmt/main/src/dndFile.c | 30 +-- source/dnode/mgmt/main/src/dndObj.c | 1 + source/os/src/osShm.c | 25 +- source/util/src/tprocess.c | 372 ++++++++++++--------------- 8 files changed, 204 insertions(+), 249 deletions(-) diff --git a/include/os/osShm.h b/include/os/osShm.h index a5d6716d0d..d26a99e277 100644 --- a/include/os/osShm.h +++ b/include/os/osShm.h @@ -22,14 +22,13 @@ extern "C" { typedef struct { int32_t id; - int64_t size; + int32_t size; void* ptr; } SShm; -int32_t taosCreateShm(SShm *pShm, int64_t shmsize) ; +int32_t taosCreateShm(SShm *pShm, int32_t shmsize) ; void taosDropShm(SShm *pShm); int32_t taosAttachShm(SShm *pShm); -void taosDetachShm(SShm *pShm); #ifdef __cplusplus } diff --git a/include/util/tprocess.h b/include/util/tprocess.h index 090762b340..3a47450eec 100644 --- a/include/util/tprocess.h +++ b/include/util/tprocess.h @@ -38,21 +38,19 @@ typedef struct { ProcMallocFp childMallocBodyFp; ProcFreeFp childFreeBodyFp; ProcConsumeFp parentConsumeFp; - ProcMallocFp parentdMallocHeadFp; + ProcMallocFp parentMallocHeadFp; ProcFreeFp parentFreeHeadFp; ProcMallocFp parentMallocBodyFp; ProcFreeFp parentFreeBodyFp; SShm shm; void *pParent; const char *name; + bool isChild; } SProcCfg; SProcObj *taosProcInit(const SProcCfg *pCfg); void taosProcCleanup(SProcObj *pProc); int32_t taosProcRun(SProcObj *pProc); -void taosProcStop(SProcObj *pProc); -bool taosProcIsChild(SProcObj *pProc); -int32_t taosProcChildId(SProcObj *pProc); int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, ProcFuncType ftype); int32_t taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, diff --git a/source/dnode/mgmt/main/exe/dndMain.c b/source/dnode/mgmt/main/exe/dndMain.c index 525b26d967..5acab06216 100644 --- a/source/dnode/mgmt/main/exe/dndMain.c +++ b/source/dnode/mgmt/main/exe/dndMain.c @@ -83,6 +83,10 @@ static int32_t dndParseArgs(int32_t argc, char const *argv[]) { global.generateGrant = true; } else if (strcmp(argv[i], "-n") == 0) { global.ntype = atoi(argv[++i]); + if (global.ntype <= DNODE || global.ntype > NODE_MAX) { + printf("'-n' range is [1-5], default is 0\n"); + return -1; + } } else if (strcmp(argv[i], "-C") == 0) { global.dumpConfig = true; } else if (strcmp(argv[i], "-V") == 0) { diff --git a/source/dnode/mgmt/main/src/dndExec.c b/source/dnode/mgmt/main/src/dndExec.c index d4dfae2d69..d2a203107a 100644 --- a/source/dnode/mgmt/main/src/dndExec.c +++ b/source/dnode/mgmt/main/src/dndExec.c @@ -166,14 +166,16 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) { .childMallocBodyFp = (ProcMallocFp)rpcMallocCont, .childFreeBodyFp = (ProcFreeFp)rpcFreeCont, .parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue, - .parentdMallocHeadFp = (ProcMallocFp)taosMemoryMalloc, + .parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc, .parentFreeHeadFp = (ProcFreeFp)taosMemoryFree, .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, .shm = pWrapper->shm, .pParent = pWrapper, + .isChild = false, .name = pWrapper->name}; + pWrapper->procType = PROC_PARENT; pWrapper->pProc = taosProcInit(&cfg); if (pWrapper->pProc == NULL) { dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr()); @@ -193,7 +195,6 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) { dInfo("node:%s, will not start in parent process", pWrapper->name); // exec new node - pWrapper->procType = PROC_PARENT; if (taosProcRun(pWrapper->pProc) != 0) { dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr()); return -1; @@ -226,21 +227,22 @@ static int32_t dndRunInChildProcess(SDnode *pDnode) { .childMallocBodyFp = (ProcMallocFp)rpcMallocCont, .childFreeBodyFp = (ProcFreeFp)rpcFreeCont, .parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue, - .parentdMallocHeadFp = (ProcMallocFp)taosMemoryMalloc, + .parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc, .parentFreeHeadFp = (ProcFreeFp)taosMemoryFree, .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, .shm = pWrapper->shm, .pParent = pWrapper, + .isChild = true, .name = pWrapper->name}; + pWrapper->procType = PROC_CHILD; pWrapper->pProc = taosProcInit(&cfg); if (pWrapper->pProc == NULL) { dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr()); return -1; } - pWrapper->procType = PROC_CHILD; if (taosProcRun(pWrapper->pProc) != 0) { dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr()); return -1; diff --git a/source/dnode/mgmt/main/src/dndFile.c b/source/dnode/mgmt/main/src/dndFile.c index bcfb90af13..bbd1cd3b92 100644 --- a/source/dnode/mgmt/main/src/dndFile.c +++ b/source/dnode/mgmt/main/src/dndFile.c @@ -167,23 +167,23 @@ int32_t dndReadShmFile(SDnode *pDnode) { for (ENodeType ntype = DNODE + 1; ntype < NODE_MAX; ++ntype) { snprintf(itemName, sizeof(itemName), "%s_shmid", dndNodeProcStr(ntype)); cJSON *shmid = cJSON_GetObjectItem(root, itemName); - if (shmid && shmid->type == cJSON_String) { - pDnode->wrappers[ntype].shm.id = atoi(shmid->valuestring); + if (shmid && shmid->type == cJSON_Number) { + pDnode->wrappers[ntype].shm.id = shmid->valueint; } snprintf(itemName, sizeof(itemName), "%s_shmsize", dndNodeProcStr(ntype)); cJSON *shmsize = cJSON_GetObjectItem(root, itemName); - if (shmsize && shmsize->type == cJSON_String) { - pDnode->wrappers[ntype].shm.size = atoll(shmsize->valuestring); + if (shmsize && shmsize->type == cJSON_Number) { + pDnode->wrappers[ntype].shm.size = shmsize->valueint; } } } - if (tsMultiProcess || pDnode->ntype == DNODE) { + if (!tsMultiProcess || pDnode->ntype == DNODE) { for (ENodeType ntype = DNODE; ntype < NODE_MAX; ++ntype) { - SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype]; - if (pWrapper->shm.id > 0) { - dDebug("shmid:%d, is closed, size:%" PRId64, pWrapper->shm.id, pWrapper->shm.size); + SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; + if (pWrapper->shm.id >= 0) { + dDebug("shmid:%d, is closed, size:%d", pWrapper->shm.id, pWrapper->shm.size); taosDropShm(&pWrapper->shm); } } @@ -194,7 +194,7 @@ int32_t dndReadShmFile(SDnode *pDnode) { dError("shmid:%d, failed to attach since %s", pWrapper->shm.id, terrstr()); goto _OVER; } - dDebug("shmid:%d, is attached, size:%" PRId64, pWrapper->shm.id, pWrapper->shm.size); + dDebug("shmid:%d, is attached, size:%d", pWrapper->shm.id, pWrapper->shm.size); } dDebug("successed to open %s", file); @@ -227,14 +227,12 @@ int32_t dndWriteShmFile(SDnode *pDnode) { len += snprintf(content + len, MAXLEN - len, "{\n"); for (ENodeType ntype = DNODE + 1; ntype < NODE_MAX; ++ntype) { - SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype]; - len += snprintf(content + len, MAXLEN - len, " \"%s_shmid\": \"%d\",\n", dndNodeProcStr(ntype), pWrapper->shm.id); + SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; + len += snprintf(content + len, MAXLEN - len, " \"%s_shmid\":%d,\n", dndNodeProcStr(ntype), pWrapper->shm.id); if (ntype == NODE_MAX - 1) { - len += snprintf(content + len, MAXLEN - len, " \"%s_shmsize\": \"%" PRId64 "\"\n", dndNodeProcStr(ntype), - pWrapper->shm.size); + len += snprintf(content + len, MAXLEN - len, " \"%s_shmsize\":%d\n", dndNodeProcStr(ntype), pWrapper->shm.size); } else { - len += snprintf(content + len, MAXLEN - len, " \"%s_shmsize\": \"%" PRId64 "\",\n", dndNodeProcStr(ntype), - pWrapper->shm.size); + len += snprintf(content + len, MAXLEN - len, " \"%s_shmsize\":%d,\n", dndNodeProcStr(ntype), pWrapper->shm.size); } } len += snprintf(content + len, MAXLEN - len, "}\n"); @@ -259,7 +257,7 @@ int32_t dndWriteShmFile(SDnode *pDnode) { return -1; } - dDebug("successed to write %s", realfile); + dInfo("successed to write %s", realfile); code = 0; _OVER: diff --git a/source/dnode/mgmt/main/src/dndObj.c b/source/dnode/mgmt/main/src/dndObj.c index 91f2cb233b..387efca846 100644 --- a/source/dnode/mgmt/main/src/dndObj.c +++ b/source/dnode/mgmt/main/src/dndObj.c @@ -91,6 +91,7 @@ SDnode *dndCreate(const SDnodeOpt *pOption) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; snprintf(path, sizeof(path), "%s%s%s", pDnode->dataDir, TD_DIRSEP, pWrapper->name); pWrapper->path = strdup(path); + pWrapper->shm.id = -1; pWrapper->pDnode = pDnode; if (pWrapper->path == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/os/src/osShm.c b/source/os/src/osShm.c index cb80aeb5f3..74717878a0 100644 --- a/source/os/src/osShm.c +++ b/source/os/src/osShm.c @@ -17,8 +17,10 @@ #define _DEFAULT_SOURCE #include "os.h" -int32_t taosCreateShm(SShm* pShm, int64_t shmsize) { - int32_t shmid = shmget(IPC_PRIVATE, (size_t)shmsize, IPC_CREAT | 0600); +int32_t taosCreateShm(SShm* pShm, int32_t shmsize) { + pShm->id = -1; + + int32_t shmid = shmget(0X95279527, shmsize, IPC_CREAT | 0600); if (shmid < 0) { return -1; } @@ -35,19 +37,19 @@ int32_t taosCreateShm(SShm* pShm, int64_t shmsize) { } void taosDropShm(SShm* pShm) { - if (pShm->id > 0) { + if (pShm->id >= 0) { if (pShm->ptr != NULL) { shmdt(pShm->ptr); } shmctl(pShm->id, IPC_RMID, NULL); } - pShm->id = 0; + pShm->id = -1; pShm->size = 0; pShm->ptr = NULL; } int32_t taosAttachShm(SShm* pShm) { - if (pShm->id > 0 && pShm->size > 0) { + if (pShm->id >= 0) { pShm->ptr = shmat(pShm->id, NULL, 0); if (pShm->ptr != NULL) { return 0; @@ -56,16 +58,3 @@ int32_t taosAttachShm(SShm* pShm) { return -1; } - -void taosDetachShm(SShm* pShm) { - if (pShm->id > 0) { - if (pShm->ptr != NULL) { - shmdt(pShm->ptr); - pShm->ptr = NULL; - } - } - - pShm->id = 0; - pShm->size = 0; - pShm->ptr = NULL; -} diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index 7afbe56587..1d565083e1 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -23,34 +23,36 @@ typedef void *(*ProcThreadFp)(void *param); typedef struct SProcQueue { - int32_t head; - int32_t tail; - int32_t total; - int32_t avail; - int32_t items; - char *pBuffer; - ProcMallocFp mallocHeadFp; - ProcFreeFp freeHeadFp; - ProcMallocFp mallocBodyFp; - ProcFreeFp freeBodyFp; - ProcConsumeFp consumeFp; - void *pParent; - tsem_t sem; - TdThreadMutex *mutex; - int32_t mutexShmid; - int32_t bufferShmid; - const char *name; + int32_t head; + int32_t tail; + int32_t total; + int32_t avail; + int32_t items; + char name[8]; + TdThreadMutex mutex; + tsem_t sem; + char pBuffer[]; } SProcQueue; typedef struct SProcObj { - TdThread childThread; - SProcQueue *pChildQueue; - TdThread parentThread; - SProcQueue *pParentQueue; - const char *name; - int32_t pid; - bool isChild; - bool stopFlag; + TdThread thread; + SProcQueue *pChildQueue; + SProcQueue *pParentQueue; + ProcConsumeFp childConsumeFp; + ProcMallocFp childMallocHeadFp; + ProcFreeFp childFreeHeadFp; + ProcMallocFp childMallocBodyFp; + ProcFreeFp childFreeBodyFp; + ProcConsumeFp parentConsumeFp; + ProcMallocFp parentMallocHeadFp; + ProcFreeFp parentFreeHeadFp; + ProcMallocFp parentMallocBodyFp; + ProcFreeFp parentFreeBodyFp; + void *pParent; + const char *name; + int32_t pid; + bool isChild; + bool stopFlag; } SProcObj; static inline int32_t CEIL8(int32_t v) { @@ -58,150 +60,95 @@ static inline int32_t CEIL8(int32_t v) { return c < 8 ? 8 : c; } -static int32_t taosProcInitMutex(TdThreadMutex **ppMutex, int32_t *pShmid) { - TdThreadMutex *pMutex = NULL; +static int32_t taosProcInitMutex(SProcQueue *pQueue) { TdThreadMutexAttr mattr = {0}; - int32_t shmid = -1; - int32_t code = -1; if (pthread_mutexattr_init(&mattr) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); uError("failed to init mutex while init attr since %s", terrstr()); - goto _OVER; + return -1; } if (pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED) != 0) { + pthread_mutexattr_destroy(&mattr); terrno = TAOS_SYSTEM_ERROR(errno); uError("failed to init mutex while set shared since %s", terrstr()); - goto _OVER; + return -1; } - shmid = shmget(IPC_PRIVATE, sizeof(TdThreadMutex), IPC_CREAT | 0600); - if (shmid <= 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to init mutex while shmget since %s", terrstr()); - goto _OVER; - } - - pMutex = (TdThreadMutex *)shmat(shmid, NULL, 0); - if (pMutex == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to init mutex while shmat since %s", terrstr()); - goto _OVER; - } - - if (taosThreadMutexInit(pMutex, &mattr) != 0) { + if (taosThreadMutexInit(&pQueue->mutex, &mattr) != 0) { + pthread_mutexattr_destroy(&mattr); terrno = TAOS_SYSTEM_ERROR(errno); uError("failed to init mutex since %s", terrstr()); - goto _OVER; - } - - code = 0; - -_OVER: - if (code != 0) { - if (pMutex != NULL) { - taosThreadMutexDestroy(pMutex); - shmdt(pMutex); - } - if (shmid >= 0) { - shmctl(shmid, IPC_RMID, NULL); - } - } else { - *ppMutex = pMutex; - *pShmid = shmid; + return -1; } pthread_mutexattr_destroy(&mattr); - return code; + return 0; } -static void taosProcDestroyMutex(TdThreadMutex *pMutex, int32_t shmid) { - if (pMutex != NULL) { - taosThreadMutexDestroy(pMutex); - } - if (shmid >= 0) { - shmctl(shmid, IPC_RMID, NULL); - } -} - -static int32_t taosProcInitBuffer(void **ppBuffer, int32_t size) { - int32_t shmid = shmget(IPC_PRIVATE, size, IPC_CREAT | 0600); - if (shmid <= 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to init buffer while shmget since %s", terrstr()); - return -1; - } - - void *shmptr = shmat(shmid, NULL, 0); - if (shmptr == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to init buffer while shmat since %s", terrstr()); - shmctl(shmid, IPC_RMID, NULL); - return -1; - } - - *ppBuffer = shmptr; - return shmid; -} - -static void taosProcDestroyBuffer(void *pBuffer, int32_t shmid) { - if (shmid > 0) { - shmdt(pBuffer); - shmctl(shmid, IPC_RMID, NULL); - } -} - -static SProcQueue *taosProcInitQueue(int32_t size) { - if (size <= 0) size = SHM_DEFAULT_SIZE; - - int32_t bufSize = CEIL8(size); - int32_t headSize = CEIL8(sizeof(SProcQueue)); - - SProcQueue *pQueue = NULL; - int32_t shmId = taosProcInitBuffer((void **)&pQueue, bufSize + headSize); - if (shmId < 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - pQueue->bufferShmid = shmId; - - if (taosProcInitMutex(&pQueue->mutex, &pQueue->mutexShmid) != 0) { - taosProcDestroyBuffer(pQueue, pQueue->bufferShmid); - return NULL; - } - +static int32_t taosProcInitSem(SProcQueue *pQueue) { if (tsem_init(&pQueue->sem, 1, 0) != 0) { - taosProcDestroyMutex(pQueue->mutex, pQueue->mutexShmid); - taosProcDestroyBuffer(pQueue, pQueue->bufferShmid); + terrno = TAOS_SYSTEM_ERROR(errno); + uError("failed to init sem"); + return -1; + } + + return 0; +} + +static SProcQueue *taosProcInitQueue(const char *name, bool isChild, char *ptr, int32_t size) { + int32_t bufSize = size - CEIL8(sizeof(SProcQueue)); + if (bufSize <= 1024) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - if (taosProcInitMutex(&pQueue->mutex, &pQueue->mutexShmid) != 0) { - tsem_destroy(&pQueue->sem); - taosProcDestroyMutex(pQueue->mutex, pQueue->mutexShmid); - taosProcDestroyBuffer(pQueue, pQueue->bufferShmid); - return NULL; + SProcQueue *pQueue = (SProcQueue *)(ptr); + + if (!isChild) { + if (taosProcInitMutex(pQueue) != 0) { + return NULL; + } + + if (taosProcInitSem(pQueue) != 0) { + return NULL; + } + + tstrncpy(pQueue->name, name, sizeof(pQueue->name)); + pQueue->head = 0; + pQueue->tail = 0; + pQueue->total = bufSize; + pQueue->avail = bufSize; + pQueue->items = 0; } - pQueue->head = 0; - pQueue->tail = 0; - pQueue->total = bufSize; - pQueue->avail = bufSize; - pQueue->items = 0; - pQueue->pBuffer = (char *)pQueue + headSize; return pQueue; } +#if 0 +static void taosProcDestroyMutex(SProcQueue *pQueue) { + if (pQueue->mutex != NULL) { + taosThreadMutexDestroy(pQueue->mutex); + pQueue->mutex = NULL; + } +} + +static void taosProcDestroySem(SProcQueue *pQueue) { + if (pQueue->sem != NULL) { + tsem_destroy(pQueue->sem); + pQueue->sem = NULL; + } + +} + static void taosProcCleanupQueue(SProcQueue *pQueue) { if (pQueue != NULL) { - uDebug("proc:%s, queue:%p clean up", pQueue->name, pQueue); - tsem_destroy(&pQueue->sem); - taosProcDestroyMutex(pQueue->mutex, pQueue->mutexShmid); - taosProcDestroyBuffer(pQueue, pQueue->bufferShmid); + taosProcDestroyMutex(pQueue); + taosProcDestroySem(pQueue); } } +#endif static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen, const char *pBody, int32_t rawBodyLen, ProcFuncType ftype) { @@ -209,9 +156,9 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t const int32_t bodyLen = CEIL8(rawBodyLen); const int32_t fullLen = headLen + bodyLen + 8; - taosThreadMutexLock(pQueue->mutex); + taosThreadMutexLock(&pQueue->mutex); if (fullLen > pQueue->avail) { - taosThreadMutexUnlock(pQueue->mutex); + taosThreadMutexUnlock(&pQueue->mutex); terrno = TSDB_CODE_OUT_OF_SHM_MEM; return -1; } @@ -260,7 +207,7 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t pQueue->avail -= fullLen; pQueue->items++; - taosThreadMutexUnlock(pQueue->mutex); + taosThreadMutexUnlock(&pQueue->mutex); tsem_post(&pQueue->sem); uTrace("proc:%s, push msg at pos:%d ftype:%d remain:%d, head:%d %p body:%d %p", pQueue->name, pos, ftype, @@ -268,13 +215,14 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t return 0; } -static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHeadLen, void **ppBody, - int32_t *pBodyLen, ProcFuncType *pFuncType) { +static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHeadLen, void **ppBody, int32_t *pBodyLen, + ProcFuncType *pFuncType, ProcMallocFp mallocHeadFp, ProcFreeFp freeHeadFp, + ProcMallocFp mallocBodyFp, ProcFreeFp freeBodyFp) { tsem_wait(&pQueue->sem); - taosThreadMutexLock(pQueue->mutex); + taosThreadMutexLock(&pQueue->mutex); if (pQueue->total - pQueue->avail <= 0) { - taosThreadMutexUnlock(pQueue->mutex); + taosThreadMutexUnlock(&pQueue->mutex); tsem_post(&pQueue->sem); terrno = TSDB_CODE_OUT_OF_SHM_MEM; return 0; @@ -293,13 +241,13 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea bodyLen = *(int32_t *)(pQueue->pBuffer + 4); } - void *pHead = (*pQueue->mallocHeadFp)(headLen); - void *pBody = (*pQueue->mallocBodyFp)(bodyLen); + void *pHead = (*mallocHeadFp)(headLen); + void *pBody = (*mallocBodyFp)(bodyLen); if (pHead == NULL || pBody == NULL) { - taosThreadMutexUnlock(pQueue->mutex); + taosThreadMutexUnlock(&pQueue->mutex); tsem_post(&pQueue->sem); - (*pQueue->freeHeadFp)(pHead); - (*pQueue->freeBodyFp)(pBody); + (*freeHeadFp)(pHead); + (*freeBodyFp)(pBody); terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -338,7 +286,7 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea pQueue->avail = pQueue->avail + headLen + bodyLen + 8; pQueue->items--; - taosThreadMutexUnlock(pQueue->mutex); + taosThreadMutexUnlock(&pQueue->mutex); *ppHead = pHead; *ppBody = pBody; @@ -358,65 +306,85 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) { return NULL; } + int32_t cstart = 0; + int32_t csize = CEIL8(pCfg->shm.size / 2); + int32_t pstart = csize; + int32_t psize = CEIL8(pCfg->shm.size - pstart); + if (pstart + psize > pCfg->shm.size) { + psize -= 8; + } + pProc->name = pCfg->name; - pProc->pChildQueue = taosProcInitQueue(pCfg->shm.size / 2); - pProc->pParentQueue = taosProcInitQueue(pCfg->shm.size / 2); + pProc->pChildQueue = taosProcInitQueue(pCfg->name, pCfg->isChild, (char *)pCfg->shm.ptr + cstart, csize); + pProc->pParentQueue = taosProcInitQueue(pCfg->name, pCfg->isChild, (char *)pCfg->shm.ptr + pstart, psize); if (pProc->pChildQueue == NULL || pProc->pParentQueue == NULL) { taosProcCleanupQueue(pProc->pChildQueue); taosMemoryFree(pProc); return NULL; } - pProc->pChildQueue->name = pCfg->name; - pProc->pChildQueue->pParent = pCfg->pParent; - pProc->pChildQueue->mallocHeadFp = pCfg->childMallocHeadFp; - pProc->pChildQueue->freeHeadFp = pCfg->childFreeHeadFp; - pProc->pChildQueue->mallocBodyFp = pCfg->childMallocBodyFp; - pProc->pChildQueue->freeBodyFp = pCfg->childFreeBodyFp; - pProc->pChildQueue->consumeFp = pCfg->childConsumeFp; - pProc->pParentQueue->name = pCfg->name; - pProc->pParentQueue->pParent = pCfg->pParent; - pProc->pParentQueue->mallocHeadFp = pCfg->parentdMallocHeadFp; - pProc->pParentQueue->freeHeadFp = pCfg->parentFreeHeadFp; - pProc->pParentQueue->mallocBodyFp = pCfg->parentMallocBodyFp; - pProc->pParentQueue->freeBodyFp = pCfg->parentFreeBodyFp; - pProc->pParentQueue->consumeFp = pCfg->parentConsumeFp; + pProc->name = pCfg->name; + pProc->pParent = pCfg->pParent; + pProc->childMallocHeadFp = pCfg->childMallocHeadFp; + pProc->childFreeHeadFp = pCfg->childFreeHeadFp; + pProc->childMallocBodyFp = pCfg->childMallocBodyFp; + pProc->childFreeBodyFp = pCfg->childFreeBodyFp; + pProc->childConsumeFp = pCfg->childConsumeFp; + pProc->parentMallocHeadFp = pCfg->parentMallocHeadFp; + pProc->parentFreeHeadFp = pCfg->parentFreeHeadFp; + pProc->parentMallocBodyFp = pCfg->parentMallocBodyFp; + pProc->parentFreeBodyFp = pCfg->parentFreeBodyFp; + pProc->parentConsumeFp = pCfg->parentConsumeFp; + pProc->isChild = pCfg->isChild; - uDebug("proc:%s, is initialized, child queue:%p parent queue:%p", pProc->name, pProc->pChildQueue, pProc->pParentQueue); - - pProc->pid = fork(); - if (pProc->pid == 0) { - pProc->isChild = 1; - prctl(PR_SET_NAME, pProc->name, NULL, NULL, NULL); - } else { - pProc->isChild = 0; - uInfo("this is parent process, child pid:%d", pProc->pid); - } + uDebug("proc:%s, is initialized, child:%d child queue:%p parent queue:%p", pProc->name, pProc->isChild, + pProc->pChildQueue, pProc->pParentQueue); return pProc; } -static void taosProcThreadLoop(SProcQueue *pQueue) { - ProcConsumeFp consumeFp = pQueue->consumeFp; - void *pParent = pQueue->pParent; +static void taosProcThreadLoop(SProcObj *pProc) { void *pHead, *pBody; int16_t headLen; ProcFuncType ftype; int32_t bodyLen; + SProcQueue *pQueue; + ProcConsumeFp consumeFp; + ProcMallocFp mallocHeadFp; + ProcFreeFp freeHeadFp; + ProcMallocFp mallocBodyFp; + ProcFreeFp freeBodyFp; - uDebug("proc:%s, start to get msg from queue:%p", pQueue->name, pQueue); + if (pProc->isChild) { + pQueue = pProc->pChildQueue; + consumeFp = pProc->childConsumeFp; + mallocHeadFp = pProc->childMallocHeadFp; + freeHeadFp = pProc->childFreeHeadFp; + mallocBodyFp = pProc->childMallocBodyFp; + freeBodyFp = pProc->childFreeBodyFp; + } else { + pQueue = pProc->pParentQueue; + consumeFp = pProc->parentConsumeFp; + mallocHeadFp = pProc->parentMallocHeadFp; + freeHeadFp = pProc->parentFreeHeadFp; + mallocBodyFp = pProc->parentMallocBodyFp; + freeBodyFp = pProc->parentFreeBodyFp; + } + + uDebug("proc:%s, start to get msg from queue:%p", pProc->name, pQueue); while (1) { - int32_t numOfMsgs = taosProcQueuePop(pQueue, &pHead, &headLen, &pBody, &bodyLen, &ftype); + int32_t numOfMsgs = taosProcQueuePop(pQueue, &pHead, &headLen, &pBody, &bodyLen, &ftype, mallocHeadFp, freeHeadFp, + mallocBodyFp, freeBodyFp); if (numOfMsgs == 0) { - uDebug("proc:%s, get no msg from queue:%p and exit the proc thread", pQueue->name, pQueue); + uInfo("proc:%s, get no msg from queue:%p and exit the proc thread", pProc->name, pQueue); break; } else if (numOfMsgs < 0) { - uTrace("proc:%s, get no msg from queue:%p since %s", pQueue->name, pQueue, terrstr()); + uTrace("proc:%s, get no msg from queue:%p since %s", pProc->name, pQueue, terrstr()); taosMsleep(1); continue; } else { - (*consumeFp)(pParent, pHead, headLen, pBody, bodyLen, ftype); + (*consumeFp)(pProc->pParent, pHead, headLen, pBody, bodyLen, ftype); } } } @@ -426,34 +394,30 @@ int32_t taosProcRun(SProcObj *pProc) { taosThreadAttrInit(&thAttr); taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); - if (pProc->isChild) { - if (taosThreadCreate(&pProc->childThread, &thAttr, (ProcThreadFp)taosProcThreadLoop, pProc->pChildQueue) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to create thread since %s", terrstr()); - return -1; - } - uDebug("proc:%s, child start to consume queue:%p", pProc->name, pProc->pChildQueue); - } else { - if (taosThreadCreate(&pProc->parentThread, &thAttr, (ProcThreadFp)taosProcThreadLoop, pProc->pParentQueue) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to create thread since %s", terrstr()); - return -1; - } - uDebug("proc:%s, parent start to consume queue:%p", pProc->name, pProc->pParentQueue); + if (taosThreadCreate(&pProc->thread, &thAttr, (ProcThreadFp)taosProcThreadLoop, pProc) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + uError("failed to create thread since %s", terrstr()); + return -1; } + uDebug("proc:%s, start to consume queue:%p", pProc->name, pProc->pChildQueue); return 0; } -void taosProcStop(SProcObj *pProc) { - pProc->stopFlag = true; - // todo join +static void taosProcStop(SProcObj *pProc) { + if (!taosCheckPthreadValid(pProc->thread)) return; + + uDebug("proc:%s, start to join thread", pProc->name); + SProcQueue *pQueue; + if (pProc->isChild) { + pQueue = pProc->pParentQueue; + } else { + pQueue = pProc->pChildQueue; + } + tsem_post(&pQueue->sem); + taosThreadJoin(pProc->thread, NULL); } -bool taosProcIsChild(SProcObj *pProc) { return pProc->isChild; } - -int32_t taosProcChildId(SProcObj *pProc) { return pProc->pid; } - void taosProcCleanup(SProcObj *pProc) { if (pProc != NULL) { uDebug("proc:%s, clean up", pProc->name);