fix: 修复mqueue问题

【背景】
1.mqueue用例关于NFILE错误码压力测试中,不符合预期结果
2.mq_unlink对于fork出的mqueue不起效
3.已打开的mqueue,在fork后两进程共用一份mqpersonal不合理
【修改方案】
1. 确认是内核关于mqueue的fd_set定义位置不合理导致的,
将fd_set定义位置由mqarray结构体调未全局变量后,问题解决
2.不合理的unlink_ref++导致的,去除相关操作,使用mq_personal
链表判断何时需要删除
3.fork时内核复制一份mqpersonal
【影响】
对现有的产品编译不会有影响。

re #I43P4T

Signed-off-by: lanleinan <lanleinan@163.com>
Change-Id: I09f183cc3a88e5a65201dbc1fc4f4806f78971be
This commit is contained in:
lnlan 2021-08-10 06:40:14 +00:00
parent 024a8f2771
commit 26ee8b836e
3 changed files with 62 additions and 65 deletions

View File

@ -95,7 +95,6 @@ struct mqarray {
mode_s mode_data; /* mode data of mqueue */ mode_s mode_data; /* mode data of mqueue */
uid_t euid; /* euid of mqueue */ uid_t euid; /* euid of mqueue */
gid_t egid; /* egid of mqueue */ gid_t egid; /* egid of mqueue */
fd_set mq_fdset; /* mqueue sysFd bit map */
struct mqnotify mq_notify; struct mqnotify mq_notify;
LosQueueCB *mqcb; LosQueueCB *mqcb;
struct mqpersonal *mq_personal; struct mqpersonal *mq_personal;
@ -423,7 +422,7 @@ extern int mq_timedsend(mqd_t personal, const char *msg, size_t msgLen,
extern ssize_t mq_timedreceive(mqd_t personal, char *msg, size_t msgLen, extern ssize_t mq_timedreceive(mqd_t personal, char *msg, size_t msgLen,
unsigned int *msgPrio, const struct timespec *absTimeout); unsigned int *msgPrio, const struct timespec *absTimeout);
extern void mqueue_refer(int sysFd); extern void MqueueRefer(int sysFd);
extern int OsMqNotify(mqd_t personal, const struct sigevent *sigev); extern int OsMqNotify(mqd_t personal, const struct sigevent *sigev);
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -49,6 +49,7 @@
#endif #endif
/* GLOBALS */ /* GLOBALS */
STATIC fd_set g_queueFdSet;
STATIC struct mqarray g_queueTable[LOSCFG_BASE_IPC_QUEUE_LIMIT]; STATIC struct mqarray g_queueTable[LOSCFG_BASE_IPC_QUEUE_LIMIT];
STATIC pthread_mutex_t g_mqueueMutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP; STATIC pthread_mutex_t g_mqueueMutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
STATIC struct mqpersonal *g_mqPrivBuf[MAX_MQ_FD]; STATIC struct mqpersonal *g_mqPrivBuf[MAX_MQ_FD];
@ -247,6 +248,44 @@ ERROUT:
return (struct mqpersonal *)-1; return (struct mqpersonal *)-1;
} }
STATIC INT32 DoMqueueClose(struct mqpersonal *privateMqPersonal)
{
struct mqarray *mqueueCB = NULL;
struct mqpersonal *tmp = NULL;
mqueueCB = privateMqPersonal->mq_posixdes;
if (mqueueCB == NULL || mqueueCB->mq_personal == NULL) {
errno = EBADF;
return LOS_NOK;
}
/* find the personal and remove */
if (mqueueCB->mq_personal == privateMqPersonal) {
mqueueCB->mq_personal = privateMqPersonal->mq_next;
} else {
for (tmp = mqueueCB->mq_personal; tmp->mq_next != NULL; tmp = tmp->mq_next) {
if (tmp->mq_next == privateMqPersonal) {
break;
}
}
if (tmp->mq_next == NULL) {
errno = EBADF;
return LOS_NOK;
}
tmp->mq_next = privateMqPersonal->mq_next;
}
/* flag no use */
privateMqPersonal->mq_status = 0;
/* free the personal */
(VOID)LOS_MemFree(OS_SYS_MEM_ADDR, privateMqPersonal);
if ((mqueueCB->unlinkflag == TRUE) && (mqueueCB->mq_personal == NULL)) {
return DoMqueueDelete(mqueueCB);
}
return LOS_OK;
}
/* Translate a sysFd into privateMqPersonal */ /* Translate a sysFd into privateMqPersonal */
STATIC struct mqpersonal *MqGetPrivDataBuff(mqd_t personal) STATIC struct mqpersonal *MqGetPrivDataBuff(mqd_t personal)
{ {
@ -272,29 +311,24 @@ STATIC struct mqpersonal *MqGetPrivDataBuff(mqd_t personal)
STATIC INT32 MqAllocSysFd(int maxfdp, struct mqpersonal *privateMqPersonal) STATIC INT32 MqAllocSysFd(int maxfdp, struct mqpersonal *privateMqPersonal)
{ {
INT32 i; INT32 i;
struct mqarray *mqueueCB = privateMqPersonal->mq_posixdes; fd_set *fdset = &g_queueFdSet;
fd_set *fdset = &mqueueCB->mq_fdset;
for (i = 0; i < maxfdp; i++) { for (i = 0; i < maxfdp; i++) {
/* sysFd: used bit setting, and get the index of swtmrID buffer */ /* sysFd: used bit setting, and get the index of swtmrID buffer */
if (!(fdset && FD_ISSET(i + MQUEUE_FD_OFFSET, fdset))) { if (fdset && !(FD_ISSET(i + MQUEUE_FD_OFFSET, fdset))) {
FD_SET(i + MQUEUE_FD_OFFSET, fdset); FD_SET(i + MQUEUE_FD_OFFSET, fdset);
if (!g_mqPrivBuf[i]) { if (!g_mqPrivBuf[i]) {
g_mqPrivBuf[i] = mqueueCB->mq_personal; g_mqPrivBuf[i] = privateMqPersonal;
return i + MQUEUE_FD_OFFSET; return i + MQUEUE_FD_OFFSET;
} }
} }
} }
/* there are no more mq sysFd to use, free the personal */
LOS_MemFree(OS_SYS_MEM_ADDR, privateMqPersonal);
privateMqPersonal = NULL;
mqueueCB->mq_personal = NULL;
return -1; return -1;
} }
STATIC VOID MqFreeSysFd(struct mqarray *mqueueCB, mqd_t personal) STATIC VOID MqFreeSysFd(mqd_t personal)
{ {
INT32 sysFd = (INT32)personal; INT32 sysFd = (INT32)personal;
fd_set *fdset = &mqueueCB->mq_fdset; fd_set *fdset = &g_queueFdSet;
if (fdset && FD_ISSET(sysFd, fdset)) { if (fdset && FD_ISSET(sysFd, fdset)) {
FD_CLR(sysFd, fdset); FD_CLR(sysFd, fdset);
g_mqPrivBuf[sysFd - MQUEUE_FD_OFFSET] = NULL; g_mqPrivBuf[sysFd - MQUEUE_FD_OFFSET] = NULL;
@ -302,7 +336,7 @@ STATIC VOID MqFreeSysFd(struct mqarray *mqueueCB, mqd_t personal)
} }
/* Mqueue fd reference count */ /* Mqueue fd reference count */
void mqueue_refer(int sysFd) void MqueueRefer(int sysFd)
{ {
struct mqarray *mqueueCB = NULL; struct mqarray *mqueueCB = NULL;
struct mqpersonal *privateMqPersonal = NULL; struct mqpersonal *privateMqPersonal = NULL;
@ -317,10 +351,11 @@ void mqueue_refer(int sysFd)
if (mqueueCB == NULL) { if (mqueueCB == NULL) {
goto OUT_UNLOCK; goto OUT_UNLOCK;
} }
privateMqPersonal->mq_refcount++; privateMqPersonal->mq_refcount++;
mqueueCB->unlink_ref++;
OUT_UNLOCK: OUT_UNLOCK:
(VOID)pthread_mutex_unlock(&g_mqueueMutex); (VOID)pthread_mutex_unlock(&g_mqueueMutex);
return;
} }
STATIC INT32 MqTryClose(struct mqpersonal *privateMqPersonal) STATIC INT32 MqTryClose(struct mqpersonal *privateMqPersonal)
@ -339,15 +374,6 @@ STATIC INT32 MqTryClose(struct mqpersonal *privateMqPersonal)
return FALSE; return FALSE;
} }
STATIC INT32 MqTryUnlink(struct mqarray *mqueueCB)
{
if (mqueueCB->unlink_ref == 0) {
return TRUE;
}
mqueueCB->unlink_ref--;
return FALSE;
}
/* Set the mode data bit,for consumer's mode comparing. */ /* Set the mode data bit,for consumer's mode comparing. */
STATIC INT32 MqueueModeAnalysisSet(struct mqpersonal *privateMqPersonal) STATIC INT32 MqueueModeAnalysisSet(struct mqpersonal *privateMqPersonal)
{ {
@ -494,6 +520,9 @@ mqd_t mq_open(const char *mqName, int openFlag, ...)
} }
/* Set mode data bit ,just for the first node */ /* Set mode data bit ,just for the first node */
if (MqueueModeAnalysisSet(privateMqPersonal)) { if (MqueueModeAnalysisSet(privateMqPersonal)) {
if ((INT32)(UINTPTR)privateMqPersonal > 0) {
(VOID)DoMqueueClose(privateMqPersonal);
}
goto OUT; goto OUT;
} }
} else { } else {
@ -502,15 +531,18 @@ mqd_t mq_open(const char *mqName, int openFlag, ...)
} }
privateMqPersonal = DoMqueueOpen(mqueueCB, openFlag); privateMqPersonal = DoMqueueOpen(mqueueCB, openFlag);
} }
OUT:
if ((INT32)(UINTPTR)privateMqPersonal > 0) { if ((INT32)(UINTPTR)privateMqPersonal > 0) {
/* alloc sysFd */ /* alloc sysFd */
sysFd = MqAllocSysFd(MAX_MQ_FD, privateMqPersonal); sysFd = MqAllocSysFd(MAX_MQ_FD, privateMqPersonal);
if (sysFd == -1) { if (sysFd == -1) {
/* there are no more mq sysFd to use, close the personal */
(VOID)DoMqueueClose(privateMqPersonal);
errno = ENFILE; errno = ENFILE;
} }
mqFd = (mqd_t)sysFd; mqFd = (mqd_t)sysFd;
} }
OUT:
(VOID)pthread_mutex_unlock(&g_mqueueMutex); (VOID)pthread_mutex_unlock(&g_mqueueMutex);
return mqFd; return mqFd;
} }
@ -518,9 +550,7 @@ OUT:
int mq_close(mqd_t personal) int mq_close(mqd_t personal)
{ {
INT32 ret = -1; INT32 ret = -1;
struct mqarray *mqueueCB = NULL;
struct mqpersonal *privateMqPersonal = NULL; struct mqpersonal *privateMqPersonal = NULL;
struct mqpersonal *tmp = NULL;
(VOID)pthread_mutex_lock(&g_mqueueMutex); (VOID)pthread_mutex_lock(&g_mqueueMutex);
@ -534,47 +564,18 @@ int mq_close(mqd_t personal)
errno = EBADF; errno = EBADF;
goto OUT_UNLOCK; goto OUT_UNLOCK;
} }
/* there have other thread used the fd */
if (!MqTryClose(privateMqPersonal)) { if (!MqTryClose(privateMqPersonal)) {
ret = 0; ret = 0;
goto OUT_UNLOCK; goto OUT_UNLOCK;
} }
mqueueCB = privateMqPersonal->mq_posixdes;
if (mqueueCB->mq_personal == NULL) { ret = DoMqueueClose(privateMqPersonal);
errno = EBADF; if (ret < 0) {
goto OUT_UNLOCK; goto OUT_UNLOCK;
} }
MqFreeSysFd(personal);
/* find the personal and remove */
if (mqueueCB->mq_personal == privateMqPersonal) {
mqueueCB->mq_personal = privateMqPersonal->mq_next;
} else {
for (tmp = mqueueCB->mq_personal; tmp->mq_next != NULL; tmp = tmp->mq_next) {
if (tmp->mq_next == privateMqPersonal) {
break;
}
}
if (tmp->mq_next == NULL) {
errno = EBADF;
goto OUT_UNLOCK;
}
tmp->mq_next = privateMqPersonal->mq_next;
}
/* flag no use */
privateMqPersonal->mq_status = 0;
MqFreeSysFd(mqueueCB, personal);
/* free the personal */
ret = LOS_MemFree(OS_SYS_MEM_ADDR, privateMqPersonal);
if (ret != LOS_OK) {
errno = EFAULT;
ret = -1;
goto OUT_UNLOCK;
}
if ((mqueueCB->unlinkflag == TRUE) && (mqueueCB->mq_personal == NULL)) {
ret = DoMqueueDelete(mqueueCB);
}
OUT_UNLOCK: OUT_UNLOCK:
(VOID)pthread_mutex_unlock(&g_mqueueMutex); (VOID)pthread_mutex_unlock(&g_mqueueMutex);
return ret; return ret;
@ -667,10 +668,7 @@ int mq_unlink(const char *mqName)
errno = ENOENT; errno = ENOENT;
goto ERROUT_UNLOCK; goto ERROUT_UNLOCK;
} }
if (!MqTryUnlink(mqueueCB)) {
(VOID)pthread_mutex_unlock(&g_mqueueMutex);
return 0;
}
if (mqueueCB->mq_personal != NULL) { if (mqueueCB->mq_personal != NULL) {
mqueueCB->unlinkflag = TRUE; mqueueCB->unlinkflag = TRUE;
} else if (mqueueCB->unlink_ref == 0) { } else if (mqueueCB->unlink_ref == 0) {

View File

@ -317,7 +317,7 @@ static void FdRefer(int sysFd)
#endif #endif
#if defined(LOSCFG_COMPAT_POSIX) #if defined(LOSCFG_COMPAT_POSIX)
if ((sysFd >= MQUEUE_FD_OFFSET) && (sysFd < (MQUEUE_FD_OFFSET + CONFIG_NQUEUE_DESCRIPTORS))) { if ((sysFd >= MQUEUE_FD_OFFSET) && (sysFd < (MQUEUE_FD_OFFSET + CONFIG_NQUEUE_DESCRIPTORS))) {
mqueue_refer(sysFd); MqueueRefer(sysFd);
} }
#endif #endif
} }