shm
This commit is contained in:
parent
1e7cbee2b3
commit
3e30f72068
|
@ -75,6 +75,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_REPEAT_INIT TAOS_DEF_ERROR_CODE(0, 0x010B)
|
||||
#define TSDB_CODE_CFG_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x010C)
|
||||
#define TSDB_CODE_INVALID_CFG TAOS_DEF_ERROR_CODE(0, 0x010D)
|
||||
#define TSDB_CODE_OUT_OF_SHM_MEM TAOS_DEF_ERROR_CODE(0, 0x010E)
|
||||
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0110)
|
||||
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0111)
|
||||
#define TSDB_CODE_REF_ID_REMOVED TAOS_DEF_ERROR_CODE(0, 0x0112)
|
||||
|
|
|
@ -0,0 +1,63 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_UTIL_PROCESS_H_
|
||||
#define _TD_UTIL_PROCESS_H_
|
||||
|
||||
#include "os.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
typedef struct {
|
||||
int32_t contLen;
|
||||
char pCont[];
|
||||
} SBlockItem;
|
||||
|
||||
typedef void *(*ProcFp)(void *parent, SBlockItem *pItem);
|
||||
|
||||
typedef struct SProcQueue SProcQueue;
|
||||
|
||||
typedef struct {
|
||||
int32_t childQueueSize;
|
||||
int32_t parentQueueSize;
|
||||
ProcFp childFp;
|
||||
ProcFp parentFp;
|
||||
} SProcCfg;
|
||||
|
||||
typedef struct {
|
||||
int32_t pid;
|
||||
SProcCfg cfg;
|
||||
SProcQueue *pChildQueue;
|
||||
SProcQueue *pParentQueue;
|
||||
pthread_t childThread;
|
||||
pthread_t parentThread;
|
||||
void *pParent;
|
||||
bool stopFlag;
|
||||
bool testFlag;
|
||||
} SProcObj;
|
||||
|
||||
SProcObj *taosProcInit(const SProcCfg *pCfg);
|
||||
int32_t taosProcStart(SProcObj *pProc);
|
||||
void taosProcStop(SProcObj *pProc);
|
||||
void taosProcCleanup(SProcObj *pProc);
|
||||
int32_t taosProcPushChild(SProcObj *pProc, void *pCont, int32_t contLen);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_UTIL_PROCESS_H_*/
|
|
@ -26,6 +26,8 @@ pthread_t* taosCreateThread(void* (*__start_routine)(void*), void* param);
|
|||
bool taosDestoryThread(pthread_t* pthread);
|
||||
bool taosThreadRunning(pthread_t* pthread);
|
||||
|
||||
typedef void *(*ThreadFp)(void *param);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -74,6 +74,10 @@ typedef struct {
|
|||
int8_t replica;
|
||||
int8_t selfIndex;
|
||||
SReplica replicas[TSDB_MAX_REPLICA];
|
||||
|
||||
//
|
||||
bool multiProcess;
|
||||
SProcObj *pProcess;
|
||||
} SMnodeMgmt;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -47,6 +47,8 @@ extern "C" {
|
|||
#include "vnode.h"
|
||||
#include "tfs.h"
|
||||
|
||||
#include "tprocess.h"
|
||||
|
||||
#define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("DND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }}
|
||||
#define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("DND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }}
|
||||
#define dWarn(...) { if (dDebugFlag & DEBUG_WARN) { taosPrintLog("DND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }}
|
||||
|
|
|
@ -364,6 +364,41 @@ static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void dndMnodeProcessChildQueue(SDnode *pDnode, SBlockItem *pBlock) {
|
||||
SRpcMsg *pMsg = (SRpcMsg*)pBlock->pCont;
|
||||
dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pMsg);
|
||||
free(pBlock);
|
||||
}
|
||||
|
||||
static void dndMnodeProcessParentQueue(SMnodeMgmt *pMgmt, SBlockItem *pItem) {
|
||||
|
||||
}
|
||||
|
||||
static int32_t dndMnodeOpen(SDnode *pDnode, SMnodeOpt *pOption) {
|
||||
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
||||
pMgmt->multiProcess = true;
|
||||
|
||||
int32_t code = dndOpenMnode(pDnode, pOption);
|
||||
|
||||
if (code == 0 && pMgmt->multiProcess) {
|
||||
SProcCfg cfg = {0};
|
||||
cfg.childFp = (ProcFp)dndMnodeProcessChildQueue;
|
||||
cfg.parentFp = (ProcFp)dndMnodeProcessParentQueue;
|
||||
cfg.childQueueSize = 1024 * 1024;
|
||||
cfg.parentQueueSize = 1024 * 1024;
|
||||
|
||||
pMgmt->pProcess = taosProcInit(&cfg);
|
||||
if (pMgmt->pProcess == NULL) {
|
||||
return -1;
|
||||
}
|
||||
pMgmt->pProcess->pParent = pDnode;
|
||||
pMgmt->pProcess->testFlag = true;
|
||||
return taosProcStart(pMgmt->pProcess);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOpt *pOption) {
|
||||
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
||||
|
||||
|
@ -557,16 +592,23 @@ static void dndWriteMnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpc
|
|||
}
|
||||
}
|
||||
|
||||
static void dndMnodeWriteToChildQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
taosProcPushChild(pMgmt->pProcess, pMsg, sizeof(SRpcMsg));
|
||||
}
|
||||
|
||||
void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pMsg);
|
||||
dndMnodeWriteToChildQueue(&pDnode->mmgmt, pMsg);
|
||||
// dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pMsg);
|
||||
}
|
||||
|
||||
void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.syncWorker, pMsg);
|
||||
dndMnodeWriteToChildQueue(&pDnode->mmgmt, pMsg);
|
||||
// dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.syncWorker, pMsg);
|
||||
}
|
||||
|
||||
void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.readWorker, pMsg);
|
||||
dndMnodeWriteToChildQueue(&pDnode->mmgmt, pMsg);
|
||||
// dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.readWorker, pMsg);
|
||||
}
|
||||
|
||||
int32_t dndInitMnode(SDnode *pDnode) {
|
||||
|
@ -594,12 +636,12 @@ int32_t dndInitMnode(SDnode *pDnode) {
|
|||
dInfo("start to deploy mnode");
|
||||
SMnodeOpt option = {0};
|
||||
dndBuildMnodeDeployOption(pDnode, &option);
|
||||
return dndOpenMnode(pDnode, &option);
|
||||
return dndMnodeOpen(pDnode, &option);
|
||||
} else {
|
||||
dInfo("start to open mnode");
|
||||
SMnodeOpt option = {0};
|
||||
dndBuildMnodeOpenOption(pDnode, &option);
|
||||
return dndOpenMnode(pDnode, &option);
|
||||
return dndMnodeOpen(pDnode, &option);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -84,6 +84,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_PARA, "Invalid parameters")
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_REPEAT_INIT, "Repeat initialization")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_CFG_NOT_FOUND, "Config not found")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CFG, "Invalid config option")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_SHM_MEM, "Out of Share memory")
|
||||
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs")
|
||||
|
|
|
@ -0,0 +1,266 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "tprocess.h"
|
||||
#include "taoserror.h"
|
||||
#include "tlog.h"
|
||||
|
||||
#define SHM_DEFAULT_SIZE (20 * 1024 * 1024)
|
||||
#define CEIL4(n) (ceil((float)(n) / 4) * 4)
|
||||
|
||||
typedef struct SProcQueue {
|
||||
int32_t head;
|
||||
int32_t tail;
|
||||
int32_t total;
|
||||
int32_t avail;
|
||||
int32_t items;
|
||||
char *pBuffer;
|
||||
tsem_t sem;
|
||||
pthread_mutex_t mutex;
|
||||
} SProcQueue;
|
||||
|
||||
static SProcQueue *taosProcQueueInit(int32_t size) {
|
||||
int32_t bufSize = CEIL4(size);
|
||||
int32_t headSize = CEIL4(sizeof(SProcQueue));
|
||||
|
||||
SProcQueue *pQueue = malloc(bufSize + headSize);
|
||||
if (pQueue == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pQueue->total = bufSize;
|
||||
pQueue->avail = bufSize;
|
||||
pQueue->head = 0;
|
||||
pQueue->tail = 0;
|
||||
pQueue->items = 0;
|
||||
pQueue->pBuffer = (char *)pQueue + headSize;
|
||||
|
||||
if (pthread_mutex_init(&pQueue->mutex, NULL) != 0) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
tsem_init(&pQueue->sem, 0, 0);
|
||||
return pQueue;
|
||||
}
|
||||
|
||||
static void taosProcQueueCleanup(SProcQueue *pQueue) {
|
||||
pthread_mutex_destroy(&pQueue->mutex);
|
||||
tsem_destroy(&pQueue->sem);
|
||||
free(pQueue);
|
||||
}
|
||||
|
||||
static int32_t taosProcQueuePush(SProcQueue *pQueue, void *pItem, int32_t itemLen) {
|
||||
char *pHead = NULL;
|
||||
char *pBody1 = NULL;
|
||||
char *pBody2 = NULL;
|
||||
int32_t body1Len = 0;
|
||||
int32_t body2Len = 0;
|
||||
int32_t fullLen = CEIL4(itemLen) + 4;
|
||||
|
||||
pthread_mutex_lock(&pQueue->mutex);
|
||||
if (fullLen > pQueue->avail) {
|
||||
pthread_mutex_unlock(&pQueue->mutex);
|
||||
terrno = TSDB_CODE_OUT_OF_SHM_MEM;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pQueue->tail < pQueue->head) {
|
||||
pHead = pQueue->pBuffer + pQueue->tail;
|
||||
pBody1 = pQueue->pBuffer + pQueue->tail + 4;
|
||||
body1Len = itemLen;
|
||||
pQueue->tail += fullLen;
|
||||
} else {
|
||||
int32_t remain = pQueue->total - pQueue->tail;
|
||||
if (remain >= fullLen) {
|
||||
pHead = pQueue->pBuffer + pQueue->tail;
|
||||
pBody1 = pQueue->pBuffer + pQueue->tail + 4;
|
||||
body1Len = itemLen;
|
||||
pQueue->tail += fullLen;
|
||||
} else {
|
||||
if (remain == 0) {
|
||||
pHead = pQueue->pBuffer;
|
||||
pBody1 = pQueue->pBuffer + 4;
|
||||
body1Len = itemLen;
|
||||
pQueue->tail = fullLen;
|
||||
} else if (remain == 4) {
|
||||
pHead = pQueue->pBuffer + pQueue->tail;
|
||||
pBody1 = pQueue->pBuffer;
|
||||
body1Len = itemLen;
|
||||
pQueue->tail = fullLen - 4;
|
||||
} else {
|
||||
pHead = pQueue->pBuffer + pQueue->tail;
|
||||
pBody1 = pQueue->pBuffer + pQueue->tail + 4;
|
||||
body1Len = remain - 4;
|
||||
pBody2 = pQueue->pBuffer;
|
||||
body2Len = itemLen - body1Len;
|
||||
pQueue->tail = fullLen - body1Len;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
*(int32_t *)(pHead) = fullLen;
|
||||
memcpy(pBody1, pItem, body1Len);
|
||||
if (pBody2 && body2Len != 0) {
|
||||
memcpy(pBody1, pItem + body1Len, body2Len);
|
||||
}
|
||||
|
||||
pQueue->avail -= fullLen;
|
||||
pQueue->items++;
|
||||
|
||||
pthread_mutex_unlock(&pQueue->mutex);
|
||||
tsem_post(&pQueue->sem);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t taosProcQueuePop(SProcQueue *pQueue, SBlockItem **ppItem) {
|
||||
tsem_wait(&pQueue->sem);
|
||||
|
||||
pthread_mutex_lock(&pQueue->mutex);
|
||||
if (pQueue->total - pQueue->avail <= 0) {
|
||||
pthread_mutex_unlock(&pQueue->mutex);
|
||||
tsem_post(&pQueue->sem);
|
||||
terrno = TSDB_CODE_OUT_OF_SHM_MEM;
|
||||
return -1;
|
||||
}
|
||||
|
||||
SBlockItem *pBlock = (SBlockItem *)(pQueue->pBuffer + pQueue->head);
|
||||
|
||||
SBlockItem *pItem = malloc(pBlock->contLen);
|
||||
if (pItem == NULL) {
|
||||
pthread_mutex_unlock(&pQueue->mutex);
|
||||
tsem_post(&pQueue->sem);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pQueue->head < pQueue->tail) {
|
||||
memcpy(pItem, pQueue->pBuffer + pQueue->head, pBlock->contLen);
|
||||
pQueue->head += pBlock->contLen;
|
||||
} else {
|
||||
int32_t remain = pQueue->total - pQueue->head;
|
||||
if (remain >= pBlock->contLen) {
|
||||
memcpy(pItem, pQueue->pBuffer + pQueue->head, pBlock->contLen);
|
||||
pQueue->head += pBlock->contLen;
|
||||
} else {
|
||||
memcpy(pItem, pQueue->pBuffer + pQueue->head, remain);
|
||||
memcpy(pItem + remain, pQueue->pBuffer, pBlock->contLen - remain);
|
||||
pQueue->head = pBlock->contLen - remain;
|
||||
}
|
||||
}
|
||||
|
||||
pQueue->avail += pBlock->contLen;
|
||||
pQueue->items--;
|
||||
|
||||
pItem->contLen = pBlock->contLen - 4;
|
||||
*ppItem = pItem;
|
||||
pthread_mutex_unlock(&pQueue->mutex);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
SProcObj *taosProcInit(const SProcCfg *pCfg) {
|
||||
SProcObj *pProc = calloc(1, sizeof(SProcObj));
|
||||
if (pProc == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pProc->cfg = *pCfg;
|
||||
|
||||
if (pProc->cfg.childQueueSize <= 0) {
|
||||
pProc->cfg.childQueueSize = SHM_DEFAULT_SIZE;
|
||||
}
|
||||
|
||||
if (pProc->cfg.parentQueueSize <= 0) {
|
||||
pProc->cfg.parentQueueSize = SHM_DEFAULT_SIZE;
|
||||
}
|
||||
|
||||
pProc->pChildQueue = taosProcQueueInit(pProc->cfg.childQueueSize);
|
||||
pProc->pParentQueue = taosProcQueueInit(pProc->cfg.parentQueueSize);
|
||||
|
||||
return pProc;
|
||||
}
|
||||
|
||||
static bool taosProcIsChild(SProcObj *pProc) { return pProc->pid == 0; }
|
||||
|
||||
static void taosProcThreadLoop(SProcQueue *pQueue, ProcFp procFp, void *pParent) {
|
||||
SBlockItem *pItem = NULL;
|
||||
|
||||
while (1) {
|
||||
int32_t code = taosProcQueuePop(pQueue, &pItem);
|
||||
if (code < 0) {
|
||||
uDebug("queue:%p, got no message and exiting", pQueue);
|
||||
break;
|
||||
} else if (code < 0) {
|
||||
uTrace("queue:%p, got no message since %s", pQueue, terrstr());
|
||||
taosMsleep(1);
|
||||
continue;
|
||||
} else {
|
||||
(*procFp)(pParent, pItem);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void *taosProcThreadChildLoop(void *param) {
|
||||
SProcObj *pProc = param;
|
||||
taosProcThreadLoop(pProc->pChildQueue, pProc->cfg.childFp, pProc->pParent);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void *taosProcThreadParentLoop(void *param) {
|
||||
SProcObj *pProc = param;
|
||||
taosProcThreadLoop(pProc->pParentQueue, pProc->cfg.parentFp, pProc->pParent);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int32_t taosProcStart(SProcObj *pProc) {
|
||||
pthread_attr_t thAttr = {0};
|
||||
pthread_attr_init(&thAttr);
|
||||
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||
|
||||
bool isChild = taosProcIsChild(pProc);
|
||||
if (isChild || !pProc->testFlag) {
|
||||
if (pthread_create(&pProc->childThread, &thAttr, taosProcThreadChildLoop, pProc) != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
uError("failed to create thread since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (!isChild || !pProc->testFlag) {
|
||||
if (pthread_create(&pProc->parentThread, &thAttr, taosProcThreadParentLoop, pProc) != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
uError("failed to create thread since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void taosProcStop(SProcObj *pProc) {
|
||||
pProc->stopFlag = true;
|
||||
// todo join
|
||||
}
|
||||
|
||||
void taosProcCleanup(SProcObj *pProc) {}
|
||||
|
||||
int32_t taosProcPushChild(SProcObj *pProc, void *pCont, int32_t contLen) {
|
||||
SProcQueue *pQueue = pProc->pChildQueue;
|
||||
taosProcQueuePush(pQueue, pCont, contLen);
|
||||
}
|
|
@ -14,6 +14,9 @@
|
|||
#include "os.h"
|
||||
#include "tqueue.h"
|
||||
|
||||
#include <sys/shm.h>
|
||||
#include <sys/wait.h>
|
||||
|
||||
class UtilTestQueue : public ::testing::Test {
|
||||
public:
|
||||
void SetUp() override {}
|
||||
|
@ -24,6 +27,104 @@ class UtilTestQueue : public ::testing::Test {
|
|||
static void TearDownTestSuite() {}
|
||||
};
|
||||
|
||||
TEST_F(UtilTestQueue, 01_ReadQitemFromQsetByThread) {
|
||||
EXPECT_EQ(0, 0);
|
||||
TEST_F(UtilTestQueue, 01_fork) {
|
||||
pid_t pid;
|
||||
int shmid;
|
||||
int* shmptr;
|
||||
int* tmp;
|
||||
|
||||
int err;
|
||||
pthread_mutexattr_t mattr;
|
||||
if ((err = pthread_mutexattr_init(&mattr)) < 0) {
|
||||
printf("mutex addr init error:%s\n", strerror(err));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
if ((err = pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED)) < 0) {
|
||||
printf("mutex addr get shared error:%s\n", strerror(err));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
pthread_mutex_t* m;
|
||||
int mid = shmget(IPC_PRIVATE, sizeof(pthread_mutex_t), 0600);
|
||||
m = (pthread_mutex_t*)shmat(mid, NULL, 0);
|
||||
|
||||
if ((err = pthread_mutex_init(m, &mattr)) < 0) {
|
||||
printf("mutex mutex init error:%s\n", strerror(err));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
if ((shmid = shmget(IPC_PRIVATE, 1000, IPC_CREAT | 0600)) < 0) {
|
||||
perror("shmget error");
|
||||
exit(1);
|
||||
}
|
||||
|
||||
if ((shmptr = (int*)shmat(shmid, 0, 0)) == (void*)-1) {
|
||||
perror("shmat error");
|
||||
exit(1);
|
||||
}
|
||||
|
||||
tmp = shmptr;
|
||||
|
||||
int shmid2;
|
||||
int** shmptr2;
|
||||
if ((shmid2 = shmget(IPC_PRIVATE, 20, IPC_CREAT | 0600)) < 0) {
|
||||
perror("shmget2 error");
|
||||
exit(1);
|
||||
}
|
||||
|
||||
if ((shmptr2 = (int**)shmat(shmid2, 0, 0)) == (void*)-1) {
|
||||
perror("shmat2 error");
|
||||
exit(1);
|
||||
}
|
||||
|
||||
*shmptr2 = shmptr;
|
||||
|
||||
if ((pid = fork()) < 0) {
|
||||
perror("fork error");
|
||||
exit(1);
|
||||
} else if (pid == 0) {
|
||||
if ((err = pthread_mutex_lock(m)) < 0) {
|
||||
printf("lock error:%s\n", strerror(err));
|
||||
exit(1);
|
||||
}
|
||||
for (int i = 0; i < 30; ++i) {
|
||||
**shmptr2 = i;
|
||||
(*shmptr2)++;
|
||||
}
|
||||
|
||||
if ((err = pthread_mutex_unlock(m)) < 0) {
|
||||
printf("unlock error:%s\n", strerror(err));
|
||||
exit(1);
|
||||
}
|
||||
exit(0);
|
||||
|
||||
} else {
|
||||
if ((err = pthread_mutex_lock(m)) < 0) {
|
||||
printf("lock error:%s\n", strerror(err));
|
||||
exit(1);
|
||||
}
|
||||
for (int i = 10; i < 42; ++i) {
|
||||
**shmptr2 = i;
|
||||
(*shmptr2)++;
|
||||
}
|
||||
if ((err = pthread_mutex_unlock(m)) < 0) {
|
||||
printf("unlock error:%s\n", strerror(err));
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
wait(NULL);
|
||||
|
||||
for (int i = 0; i < 70; ++i) {
|
||||
printf("%d ", tmp[i]);
|
||||
}
|
||||
|
||||
printf("\n");
|
||||
|
||||
pthread_mutexattr_destroy(&mattr);
|
||||
//销毁mutex
|
||||
pthread_mutex_destroy(m);
|
||||
|
||||
exit(0);
|
||||
}
|
Loading…
Reference in New Issue