shm
This commit is contained in:
parent
474939ee49
commit
d7e13ef739
|
@ -51,6 +51,7 @@ void taosProcCleanup(SProcObj *pProc);
|
||||||
int32_t taosProcRun(SProcObj *pProc);
|
int32_t taosProcRun(SProcObj *pProc);
|
||||||
void taosProcStop(SProcObj *pProc);
|
void taosProcStop(SProcObj *pProc);
|
||||||
bool taosProcIsChild(SProcObj *pProc);
|
bool taosProcIsChild(SProcObj *pProc);
|
||||||
|
int32_t taosProcChildId(SProcObj *pProc);
|
||||||
|
|
||||||
int32_t taosProcPutToChildQueue(SProcObj *pProc, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen);
|
int32_t taosProcPutToChildQueue(SProcObj *pProc, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen);
|
||||||
int32_t taosProcPutToParentQueue(SProcObj *pProc, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen);
|
int32_t taosProcPutToParentQueue(SProcObj *pProc, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen);
|
||||||
|
|
|
@ -20,7 +20,7 @@ static void dndResetLog(SMgmtWrapper *pMgmt) {
|
||||||
char logname[24] = {0};
|
char logname[24] = {0};
|
||||||
snprintf(logname, sizeof(logname), "%slog", pMgmt->name);
|
snprintf(logname, sizeof(logname), "%slog", pMgmt->name);
|
||||||
|
|
||||||
dInfo("node:%s, reset log to %s", pMgmt->name, logname);
|
dInfo("node:%s, reset log to %s in child process", pMgmt->name, logname);
|
||||||
taosCloseLog();
|
taosCloseLog();
|
||||||
taosInitLog(logname, 1);
|
taosInitLog(logname, 1);
|
||||||
}
|
}
|
||||||
|
@ -51,6 +51,7 @@ int32_t dndOpenNode(SMgmtWrapper *pWrapper) {
|
||||||
|
|
||||||
void dndCloseNode(SMgmtWrapper *pWrapper) {
|
void dndCloseNode(SMgmtWrapper *pWrapper) {
|
||||||
dDebug("node:%s, start to close", pWrapper->name);
|
dDebug("node:%s, start to close", pWrapper->name);
|
||||||
|
pWrapper->required = false;
|
||||||
taosWLockLatch(&pWrapper->latch);
|
taosWLockLatch(&pWrapper->latch);
|
||||||
if (pWrapper->deployed) {
|
if (pWrapper->deployed) {
|
||||||
(*pWrapper->fp.closeFp)(pWrapper);
|
(*pWrapper->fp.closeFp)(pWrapper);
|
||||||
|
@ -199,7 +200,7 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) {
|
||||||
dInfo("node:%s, will be initialized in child process", pWrapper->name);
|
dInfo("node:%s, will be initialized in child process", pWrapper->name);
|
||||||
dndOpenNode(pWrapper);
|
dndOpenNode(pWrapper);
|
||||||
} else {
|
} else {
|
||||||
dInfo("node:%s, will not start in parent process", pWrapper->name);
|
dInfo("node:%s, will not start in parent process, child pid:%d", pWrapper->name, taosProcChildId(pProc));
|
||||||
pWrapper->procType = PROC_PARENT;
|
pWrapper->procType = PROC_PARENT;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -209,16 +210,20 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
dndSetStatus(pDnode, DND_STAT_RUNNING);
|
||||||
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, DNODE);
|
|
||||||
if (pWrapper->procType == PROC_PARENT && dmStart(pWrapper->pMgmt) != 0) {
|
for (ENodeType n = 0; n < NODE_MAX; ++n) {
|
||||||
dndReleaseWrapper(pWrapper);
|
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
||||||
dError("failed to start dnode worker since %s", terrstr());
|
if (!pWrapper->required) continue;
|
||||||
|
if (pWrapper->fp.startFp == NULL) continue;
|
||||||
|
if (pWrapper->procType == PROC_PARENT && n != DNODE) continue;
|
||||||
|
if (pWrapper->procType == PROC_CHILD && n == DNODE) continue;
|
||||||
|
if ((*pWrapper->fp.startFp)(pWrapper) != 0) {
|
||||||
|
dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
dndReleaseWrapper(pWrapper);
|
|
||||||
#endif
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -175,7 +175,7 @@ int32_t dndMarkWrapper(SMgmtWrapper *pWrapper) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
taosRLockLatch(&pWrapper->latch);
|
taosRLockLatch(&pWrapper->latch);
|
||||||
if (pWrapper->deployed) {
|
if (pWrapper->deployed || (pWrapper->procType == PROC_PARENT && pWrapper->required)) {
|
||||||
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
|
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
|
||||||
dTrace("node:%s, is marked, refCount:%d", pWrapper->name, refCount);
|
dTrace("node:%s, is marked, refCount:%d", pWrapper->name, refCount);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -208,6 +208,11 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHea
|
||||||
const int32_t bodyLen = CEIL8(rawBodyLen);
|
const int32_t bodyLen = CEIL8(rawBodyLen);
|
||||||
const int32_t fullLen = headLen + bodyLen + 8;
|
const int32_t fullLen = headLen + bodyLen + 8;
|
||||||
|
|
||||||
|
if (headLen <= 0 || bodyLen <= 0) {
|
||||||
|
terrno = TSDB_CODE_INVALID_PARA;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
taosThreadMutexLock(pQueue->mutex);
|
taosThreadMutexLock(pQueue->mutex);
|
||||||
if (fullLen > pQueue->avail) {
|
if (fullLen > pQueue->avail) {
|
||||||
taosThreadMutexUnlock(pQueue->mutex);
|
taosThreadMutexUnlock(pQueue->mutex);
|
||||||
|
@ -259,7 +264,7 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHea
|
||||||
taosThreadMutexUnlock(pQueue->mutex);
|
taosThreadMutexUnlock(pQueue->mutex);
|
||||||
tsem_post(&pQueue->sem);
|
tsem_post(&pQueue->sem);
|
||||||
|
|
||||||
uTrace("proc:%s, push msg:%p:%d cont:%p:%d to queue:%p", pQueue->name, pHead, rawHeadLen, pBody, rawBodyLen, pQueue);
|
uTrace("proc:%s, push msg:%p:%d cont:%p:%d to queue:%p", pQueue->name, pHead, headLen, pBody, bodyLen, pQueue);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -376,7 +381,6 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) {
|
||||||
pProc->pid = fork();
|
pProc->pid = fork();
|
||||||
if (pProc->pid == 0) {
|
if (pProc->pid == 0) {
|
||||||
pProc->isChild = 1;
|
pProc->isChild = 1;
|
||||||
uInfo("this is child process, pid:%d", pProc->pid);
|
|
||||||
} else {
|
} else {
|
||||||
pProc->isChild = 0;
|
pProc->isChild = 0;
|
||||||
uInfo("this is parent process, child pid:%d", pProc->pid);
|
uInfo("this is parent process, child pid:%d", pProc->pid);
|
||||||
|
@ -439,6 +443,8 @@ void taosProcStop(SProcObj *pProc) {
|
||||||
|
|
||||||
bool taosProcIsChild(SProcObj *pProc) { return pProc->isChild; }
|
bool taosProcIsChild(SProcObj *pProc) { return pProc->isChild; }
|
||||||
|
|
||||||
|
int32_t taosProcChildId(SProcObj *pProc) { return pProc->pid; }
|
||||||
|
|
||||||
void taosProcCleanup(SProcObj *pProc) {
|
void taosProcCleanup(SProcObj *pProc) {
|
||||||
if (pProc != NULL) {
|
if (pProc != NULL) {
|
||||||
uDebug("proc:%s, clean up", pProc->name);
|
uDebug("proc:%s, clean up", pProc->name);
|
||||||
|
|
Loading…
Reference in New Issue