This commit is contained in:
Shengliang Guan 2022-03-31 16:05:24 +08:00
parent 873c2373f3
commit fa354b85e5
1 changed files with 8 additions and 6 deletions

View File

@ -140,14 +140,16 @@ static void taosProcDestroySem(SProcQueue *pQueue) {
pQueue->sem = NULL; pQueue->sem = NULL;
} }
} }
#endif
static void taosProcCleanupQueue(SProcQueue *pQueue) { static void taosProcCleanupQueue(SProcQueue *pQueue) {
#if 0
if (pQueue != NULL) { if (pQueue != NULL) {
taosProcDestroyMutex(pQueue); taosProcDestroyMutex(pQueue);
taosProcDestroySem(pQueue); taosProcDestroySem(pQueue);
} }
}
#endif #endif
}
static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen, const char *pBody, static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen, const char *pBody,
int32_t rawBodyLen, ProcFuncType ftype) { int32_t rawBodyLen, ProcFuncType ftype) {
@ -317,7 +319,7 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) {
pProc->pChildQueue = taosProcInitQueue(pCfg->name, pCfg->isChild, (char *)pCfg->shm.ptr + cstart, csize); pProc->pChildQueue = taosProcInitQueue(pCfg->name, pCfg->isChild, (char *)pCfg->shm.ptr + cstart, csize);
pProc->pParentQueue = taosProcInitQueue(pCfg->name, pCfg->isChild, (char *)pCfg->shm.ptr + pstart, psize); pProc->pParentQueue = taosProcInitQueue(pCfg->name, pCfg->isChild, (char *)pCfg->shm.ptr + pstart, psize);
if (pProc->pChildQueue == NULL || pProc->pParentQueue == NULL) { if (pProc->pChildQueue == NULL || pProc->pParentQueue == NULL) {
// taosProcCleanupQueue(pProc->pChildQueue); taosProcCleanupQueue(pProc->pChildQueue);
taosMemoryFree(pProc); taosMemoryFree(pProc);
return NULL; return NULL;
} }
@ -370,7 +372,7 @@ static void taosProcThreadLoop(SProcObj *pProc) {
freeBodyFp = pProc->parentFreeBodyFp; freeBodyFp = pProc->parentFreeBodyFp;
} }
uDebug("proc:%s, start to get msg from queue:%p, isChild:%d", pProc->name, pQueue, pProc->isChild); uDebug("proc:%s, start to get msg from queue:%p", pProc->name, pQueue);
while (1) { while (1) {
int32_t numOfMsgs = taosProcQueuePop(pQueue, &pHead, &headLen, &pBody, &bodyLen, &ftype, mallocHeadFp, freeHeadFp, int32_t numOfMsgs = taosProcQueuePop(pQueue, &pHead, &headLen, &pBody, &bodyLen, &ftype, mallocHeadFp, freeHeadFp,
@ -406,7 +408,7 @@ int32_t taosProcRun(SProcObj *pProc) {
static void taosProcStop(SProcObj *pProc) { static void taosProcStop(SProcObj *pProc) {
if (!taosCheckPthreadValid(pProc->thread)) return; if (!taosCheckPthreadValid(pProc->thread)) return;
uDebug("proc:%s, start to join thread:%" PRId64 ", isChild:%d", pProc->name, pProc->thread, pProc->isChild); uDebug("proc:%s, start to join thread:%" PRId64, pProc->name, pProc->thread);
SProcQueue *pQueue; SProcQueue *pQueue;
if (pProc->isChild) { if (pProc->isChild) {
pQueue = pProc->pChildQueue; pQueue = pProc->pChildQueue;
@ -421,9 +423,9 @@ void taosProcCleanup(SProcObj *pProc) {
if (pProc != NULL) { if (pProc != NULL) {
uDebug("proc:%s, start to clean up", pProc->name); uDebug("proc:%s, start to clean up", pProc->name);
taosProcStop(pProc); taosProcStop(pProc);
taosProcCleanupQueue(pProc->pChildQueue);
taosProcCleanupQueue(pProc->pParentQueue);
uDebug("proc:%s, is cleaned up", pProc->name); uDebug("proc:%s, is cleaned up", pProc->name);
// taosProcCleanupQueue(pProc->pChildQueue);
// taosProcCleanupQueue(pProc->pParentQueue);
taosMemoryFree(pProc); taosMemoryFree(pProc);
} }
} }