diff --git a/include/util/tprocess.h b/include/util/tprocess.h index 11aafe710e..eddf1ac72e 100644 --- a/include/util/tprocess.h +++ b/include/util/tprocess.h @@ -24,15 +24,25 @@ extern "C" { typedef struct SProcQueue SProcQueue; typedef struct SProcObj SProcObj; -typedef void *(*ProcFp)(void *pParent, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen); +typedef void *(*ProcMallocFp)(int32_t contLen); +typedef void *(*ProcFreeFp)(void *pCont); +typedef void *(*ProcConsumeFp)(void *pParent, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen); typedef struct { - int32_t childQueueSize; - int32_t parentQueueSize; - ProcFp childFp; - ProcFp parentFp; - void *pParent; - bool testFlag; + int32_t childQueueSize; + ProcConsumeFp childConsumeFp; + ProcMallocFp childMallocHeadFp; + ProcFreeFp childFreeHeadFp; + ProcMallocFp childMallocBodyFp; + ProcFreeFp childFreeBodyFp; + int32_t parentQueueSize; + ProcConsumeFp parentConsumeFp; + ProcMallocFp parentdMallocHeadFp; + ProcFreeFp parentFreeHeadFp; + ProcMallocFp parentMallocBodyFp; + ProcFreeFp parentFreeBodyFp; + bool testFlag; + void *pParent; } SProcCfg; SProcObj *taosProcInit(const SProcCfg *pCfg); diff --git a/source/dnode/mgmt/impl/inc/dndEnv.h b/source/dnode/mgmt/impl/inc/dndEnv.h index 32f17ab5a5..091c1f28e8 100644 --- a/source/dnode/mgmt/impl/inc/dndEnv.h +++ b/source/dnode/mgmt/impl/inc/dndEnv.h @@ -81,7 +81,7 @@ typedef struct { MndMsgFp msgFp[TDMT_MAX]; SProcObj *pProcess; bool singleProc; -} SMnodeMgmt; +} SMndMgmt; typedef struct { int32_t refCount; @@ -144,7 +144,7 @@ typedef struct SDnode { SDnodeDir dir; TdFilePtr pLockFile; SDnodeMgmt dmgmt; - SMnodeMgmt mmgmt; + SMndMgmt mmgmt; SQnodeMgmt qmgmt; SSnodeMgmt smgmt; SBnodeMgmt bmgmt; diff --git a/source/dnode/mgmt/impl/mnodeMgmt/inc/mm.h b/source/dnode/mgmt/impl/mnodeMgmt/inc/mm.h index 88a0d2483a..57bb95880d 100644 --- a/source/dnode/mgmt/impl/mnodeMgmt/inc/mm.h +++ b/source/dnode/mgmt/impl/mnodeMgmt/inc/mm.h @@ -48,7 +48,7 @@ int32_t mmBuildOptionFromReq(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeRe // mmWorker int32_t mmStartWorker(SDnode *pDnode); void mmStopWorker(SDnode *pDnode); -void mmInitMsgFp(SMnodeMgmt *pMgmt); +void mmInitMsgFp(SMndMgmt *pMgmt); void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); int32_t mmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); diff --git a/source/dnode/mgmt/impl/mnodeMgmt/src/mmFile.c b/source/dnode/mgmt/impl/mnodeMgmt/src/mmFile.c index 89d7eefab2..42158f631c 100644 --- a/source/dnode/mgmt/impl/mnodeMgmt/src/mmFile.c +++ b/source/dnode/mgmt/impl/mnodeMgmt/src/mmFile.c @@ -17,7 +17,7 @@ #include "mm.h" int32_t mmReadFile(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; + SMndMgmt *pMgmt = &pDnode->mmgmt; int32_t code = TSDB_CODE_DND_MNODE_READ_FILE_ERROR; int32_t len = 0; @@ -115,7 +115,7 @@ PRASE_MNODE_OVER: } int32_t mmWriteFile(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; + SMndMgmt *pMgmt = &pDnode->mmgmt; char file[PATH_MAX]; snprintf(file, sizeof(file), "%s%smnode.json.bak", pDnode->dir.dnode, TD_DIRSEP); diff --git a/source/dnode/mgmt/impl/mnodeMgmt/src/mmHandle.c b/source/dnode/mgmt/impl/mnodeMgmt/src/mmHandle.c index 34fe71eef7..4aab5aa40d 100644 --- a/source/dnode/mgmt/impl/mnodeMgmt/src/mmHandle.c +++ b/source/dnode/mgmt/impl/mnodeMgmt/src/mmHandle.c @@ -122,7 +122,7 @@ int32_t mmGetMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgro } int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; + SMndMgmt *pMgmt = &pDnode->mmgmt; SMnode *pMnode = mmAcquire(pDnode); if (pMnode == NULL) { diff --git a/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c b/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c index 13443a94b6..e03384f9ad 100644 --- a/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c +++ b/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c @@ -29,7 +29,7 @@ int32_t mmInit(SDnode *pDnode) { dInfo("mnode mgmt start to init"); int32_t code = -1; - SMnodeMgmt *pMgmt = &pDnode->mmgmt; + SMndMgmt *pMgmt = &pDnode->mmgmt; taosInitRWLatch(&pMgmt->latch); mmInitMsgFp(pMgmt); @@ -76,7 +76,7 @@ _OVER: void mmCleanup(SDnode *pDnode) { dInfo("mnode mgmt start to clean up"); - SMnodeMgmt *pMgmt = &pDnode->mmgmt; + SMndMgmt *pMgmt = &pDnode->mmgmt; if (pMgmt->pMnode) { mmStopWorker(pDnode); mndClose(pMgmt->pMnode); @@ -86,7 +86,7 @@ void mmCleanup(SDnode *pDnode) { } SMnode *mmAcquire(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; + SMndMgmt *pMgmt = &pDnode->mmgmt; SMnode *pMnode = NULL; int32_t refCount = 0; @@ -108,7 +108,7 @@ SMnode *mmAcquire(SDnode *pDnode) { void mmRelease(SDnode *pDnode, SMnode *pMnode) { if (pMnode == NULL) return; - SMnodeMgmt *pMgmt = &pDnode->mmgmt; + SMndMgmt *pMgmt = &pDnode->mmgmt; taosRLockLatch(&pMgmt->latch); int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1); taosRUnLockLatch(&pMgmt->latch); @@ -116,19 +116,26 @@ void mmRelease(SDnode *pDnode, SMnode *pMnode) { } int32_t mmOpen(SDnode *pDnode, SMnodeOpt *pOption) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; + SMndMgmt *pMgmt = &pDnode->mmgmt; pMgmt->singleProc = false; int32_t code = mmOpenImp(pDnode, pOption); if (code == 0 && !pMgmt->singleProc) { - SProcCfg cfg = {0}; - cfg.childFp = (ProcFp)mmConsumeChildQueue; - cfg.parentFp = (ProcFp)mmConsumeParentQueue; - cfg.childQueueSize = 1024 * 1024; - cfg.parentQueueSize = 1024 * 1024; - cfg.testFlag = true; - cfg.pParent = pDnode; + SProcCfg cfg = {.childQueueSize = 1024 * 1024, + .childConsumeFp = (ProcConsumeFp)mmConsumeChildQueue, + .childMallocHeadFp = (ProcMallocFp)taosAllocateQitem, + .childFreeHeadFp = (ProcFreeFp)taosFreeQitem, + .childMallocBodyFp = (ProcMallocFp)rpcMallocCont, + .childFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .parentQueueSize = 1024 * 1024, + .parentConsumeFp = (ProcConsumeFp)mmConsumeParentQueue, + .parentdMallocHeadFp = (ProcMallocFp)taosAllocateQitem, + .parentFreeHeadFp = (ProcFreeFp)taosFreeQitem, + .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, + .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .testFlag = true, + .pParent = pDnode}; pMgmt->pProcess = taosProcInit(&cfg); if (pMgmt->pProcess == NULL) { @@ -142,7 +149,7 @@ int32_t mmOpen(SDnode *pDnode, SMnodeOpt *pOption) { } int32_t mmAlter(SDnode *pDnode, SMnodeOpt *pOption) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; + SMndMgmt *pMgmt = &pDnode->mmgmt; SMnode *pMnode = mmAcquire(pDnode); if (pMnode == NULL) { @@ -161,7 +168,7 @@ int32_t mmAlter(SDnode *pDnode, SMnodeOpt *pOption) { } int32_t mmDrop(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; + SMndMgmt *pMgmt = &pDnode->mmgmt; SMnode *pMnode = mmAcquire(pDnode); if (pMnode == NULL) { @@ -230,7 +237,7 @@ static void mmBuildOptionForDeploy(SDnode *pDnode, SMnodeOpt *pOption) { pReplica->port = pDnode->cfg.serverPort; memcpy(pReplica->fqdn, pDnode->cfg.localFqdn, TSDB_FQDN_LEN); - SMnodeMgmt *pMgmt = &pDnode->mmgmt; + SMndMgmt *pMgmt = &pDnode->mmgmt; pMgmt->selfIndex = pOption->selfIndex; pMgmt->replica = pOption->replica; memcpy(&pMgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); @@ -238,7 +245,7 @@ static void mmBuildOptionForDeploy(SDnode *pDnode, SMnodeOpt *pOption) { static void mmBuildOptionForOpen(SDnode *pDnode, SMnodeOpt *pOption) { mmInitOption(pDnode, pOption); - SMnodeMgmt *pMgmt = &pDnode->mmgmt; + SMndMgmt *pMgmt = &pDnode->mmgmt; pOption->selfIndex = pMgmt->selfIndex; pOption->replica = pMgmt->replica; memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); @@ -266,7 +273,7 @@ int32_t mmBuildOptionFromReq(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeRe return -1; } - SMnodeMgmt *pMgmt = &pDnode->mmgmt; + SMndMgmt *pMgmt = &pDnode->mmgmt; pMgmt->selfIndex = pOption->selfIndex; pMgmt->replica = pOption->replica; memcpy(&pMgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); @@ -274,7 +281,7 @@ int32_t mmBuildOptionFromReq(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeRe } static int32_t mmOpenImp(SDnode *pDnode, SMnodeOpt *pOption) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; + SMndMgmt *pMgmt = &pDnode->mmgmt; SMnode *pMnode = mndOpen(pDnode->dir.mnode, pOption); if (pMnode == NULL) { diff --git a/source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c b/source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c index 8cac511a4d..3369d90bd5 100644 --- a/source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c +++ b/source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c @@ -28,7 +28,7 @@ static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMs static void mmConsumeQueue(SDnode *pDnode, SMndMsg *pMsg); int32_t mmStartWorker(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; + SMndMgmt *pMgmt = &pDnode->mmgmt; if (dndInitWorker(pDnode, &pMgmt->readWorker, DND_WORKER_SINGLE, "mnode-read", 0, 1, mmConsumeQueue) != 0) { dError("failed to start mnode read worker since %s", terrstr()); return -1; @@ -48,7 +48,7 @@ int32_t mmStartWorker(SDnode *pDnode) { } void mmStopWorker(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; + SMndMgmt *pMgmt = &pDnode->mmgmt; taosWLockLatch(&pMgmt->latch); pMgmt->deployed = 0; @@ -63,7 +63,7 @@ void mmStopWorker(SDnode *pDnode) { dndCleanupWorker(&pMgmt->syncWorker); } -void mmInitMsgFp(SMnodeMgmt *pMgmt) { +void mmInitMsgFp(SMndMgmt *pMgmt) { // Requests handled by DNODE pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE_RSP)] = mmProcessWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_MNODE_RSP)] = mmProcessWriteMsg; @@ -146,25 +146,19 @@ static int32_t mmBuildMsg(SMndMsg *pMsg, SRpcMsg *pRpc) { dError("failed to create msg since %s, app:%p RPC:%p", terrstr(), pRpc->ahandle, pRpc->handle); return -1; } + memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN); - pMsg->rpcMsg = *pRpc; pMsg->createdTime = taosGetTimestampSec(); - char *pCont = (char *)pMsg + sizeof(SMndMsg); - memcpy(pCont, pRpc->pCont, pRpc->contLen); - pMsg->rpcMsg = *pRpc; - pMsg->rpcMsg.pCont = pCont; - pMsg->createdTime = taosGetTimestampSec(); - dTrace("msg:%p, is created, app:%p RPC:%p user:%s", pMsg, pRpc->ahandle, pRpc->handle, pMsg->user); return 0; } void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - int32_t code = -1; - SMndMsg *pMsg = NULL; + SMndMgmt *pMgmt = &pDnode->mmgmt; + int32_t code = -1; + SMndMsg *pMsg = NULL; MndMsgFp msgFp = pMgmt->msgFp[TMSG_INDEX(pRpc->msgType)]; if (msgFp == NULL) { @@ -172,8 +166,7 @@ void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { goto _OVER; } - int32_t contLen = sizeof(SMndMsg) + pRpc->contLen; - pMsg = taosAllocateQitem(contLen); + pMsg = taosAllocateQitem(sizeof(SMndMsg)); if (pMsg == NULL) { goto _OVER; } @@ -185,18 +178,13 @@ void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { if (pMgmt->singleProc) { code = (*msgFp)(pDnode, pMsg); } else { - code = taosProcPutToChildQueue(pMgmt->pProcess, pMsg, sizeof(pMsg), pRpc->pCont, pRpc->contLen); + code = taosProcPutToChildQueue(pMgmt->pProcess, pMsg, sizeof(SMndMsg), pRpc->pCont, pRpc->contLen); } _OVER: - if (code == 0) { - if (!pMgmt->singleProc) { - taosFreeQitem(pMsg); - } - } else { + if (code != 0) { bool isReq = (pRpc->msgType & 1U); - if (isReq) { if (terrno == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || terrno == TSDB_CODE_APP_NOT_READY) { dndSendRedirectRsp(pDnode, pRpc); @@ -206,9 +194,8 @@ _OVER: } } taosFreeQitem(pMsg); + rpcFreeCont(pRpc->pCont); } - - rpcFreeCont(pRpc->pCont); } int32_t mmProcessWriteMsg(SDnode *pDnode, SMndMsg *pMsg) { @@ -235,64 +222,61 @@ static int32_t mmPutMndMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMndMs SMnode *pMnode = mmAcquire(pDnode); if (pMnode == NULL) return -1; - pMsg->pMnode = pMnode; int32_t code = dndWriteMsgToWorker(pWorker, pMsg, 0); - mmRelease(pDnode, pMnode); return code; } static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpc) { - int32_t contLen = sizeof(SMndMsg) + pRpc->contLen; - SMndMsg *pMsg = taosAllocateQitem(contLen); + SMndMsg *pMsg = taosAllocateQitem(sizeof(SMndMsg)); if (pMsg == NULL) { return -1; } pMsg->rpcMsg = *pRpc; - pMsg->rpcMsg.pCont = (char *)pMsg + sizeof(SMndMsg); - memcpy(pMsg->rpcMsg.pCont, pRpc->pCont, pRpc->contLen); - rpcFreeCont(pRpc->pCont); + pMsg->createdTime = taosGetTimestampSec(); int32_t code = mmPutMndMsgToWorker(pDnode, pWorker, pMsg); if (code != 0) { taosFreeQitem(pMsg); + rpcFreeCont(pRpc->pCont); } return code; } void mmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; + SMndMgmt *pMgmt = &pDnode->mmgmt; SRpcMsg *pRpc = &pMsg->rpcMsg; - pRpc->pCont = (char *)pMsg + sizeof(SMndMsg); + pRpc->pCont = pCont; MndMsgFp msgFp = pMgmt->msgFp[TMSG_INDEX(pRpc->msgType)]; int32_t code = (*msgFp)(pDnode, pMsg); - if (code == 0) return; - - bool isReq = (pRpc->msgType & 1U); - - if (isReq) { - if (terrno == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || terrno == TSDB_CODE_APP_NOT_READY) { - dndSendRedirectRsp(pDnode, pRpc); - } else { - SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno}; - rpcSendResponse(&rsp); + if (code != 0) { + bool isReq = (pRpc->msgType & 1U); + if (isReq) { + if (terrno == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || terrno == TSDB_CODE_APP_NOT_READY) { + dndSendRedirectRsp(pDnode, pRpc); + } else { + SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno}; + rpcSendResponse(&rsp); + } } + taosFreeQitem(pMsg); + rpcFreeCont(pCont); } - taosFreeQitem(pMsg); } void mmConsumeParentQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) {} static void mmConsumeQueue(SDnode *pDnode, SMndMsg *pMsg) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; + SMndMgmt *pMgmt = &pDnode->mmgmt; SMnode *pMnode = mmAcquire(pDnode); if (pMnode != NULL) { + pMsg->pMnode = pMnode; mndProcessMsg(pMsg); mmRelease(pDnode, pMnode); } else { @@ -300,4 +284,5 @@ static void mmConsumeQueue(SDnode *pDnode, SMndMsg *pMsg) { } taosFreeQitem(pMsg); + rpcFreeCont(pMsg->rpcMsg.pCont); } diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index 3f5c4c7477..9eca5915db 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -21,6 +21,7 @@ #define SHM_DEFAULT_SIZE (20 * 1024 * 1024) #define CEIL8(n) (ceil((float)(n) / 8) * 8) +typedef void *(*ProcThreadFp)(void *param); typedef struct SProcQueue { int32_t head; @@ -29,18 +30,21 @@ typedef struct SProcQueue { int32_t avail; int32_t items; char *pBuffer; + ProcMallocFp mallocHeadFp; + ProcFreeFp freeHeadFp; + ProcMallocFp mallocBodyFp; + ProcFreeFp freeBodyFp; + ProcConsumeFp consumeFp; + void *pParent; tsem_t sem; pthread_mutex_t mutex; } SProcQueue; typedef struct SProcObj { - SProcQueue *pChildQueue; - SProcQueue *pParentQueue; pthread_t childThread; + SProcQueue *pChildQueue; pthread_t parentThread; - ProcFp childFp; - ProcFp parentFp; - void *pParent; + SProcQueue *pParentQueue; int32_t pid; bool isChild; bool stopFlag; @@ -144,6 +148,9 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHea pQueue->items++; pthread_mutex_unlock(&pQueue->mutex); tsem_post(&pQueue->sem); + + (*pQueue->freeHeadFp)(pHead); + (*pQueue->freeBodyFp)(pBody); return 0; } @@ -169,13 +176,13 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int32_t *pHea bodyLen = *(int32_t *)(pQueue->pBuffer + 4); } - void *pHead = taosAllocateQitem(headLen); - void *pBody = malloc(bodyLen); + void *pHead = (*pQueue->mallocHeadFp)(headLen); + void *pBody = (*pQueue->mallocBodyFp)(bodyLen); if (pHead == NULL || pBody == NULL) { pthread_mutex_unlock(&pQueue->mutex); tsem_post(&pQueue->sem); - taosFreeQitem(pHead); - free(pBody); + (*pQueue->freeHeadFp)(pHead); + (*pQueue->freeBodyFp)(pBody); terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -229,13 +236,8 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) { return NULL; } - pProc->pParent = pCfg->pParent; - pProc->childFp = pCfg->childFp; - pProc->parentFp = pCfg->parentFp; - pProc->testFlag = pCfg->testFlag; pProc->pChildQueue = taosProcQueueInit(pCfg->childQueueSize); pProc->pParentQueue = taosProcQueueInit(pCfg->parentQueueSize); - if (pProc->pChildQueue == NULL || pProc->pParentQueue == NULL) { taosProcQueueCleanup(pProc->pChildQueue); taosProcQueueCleanup(pProc->pParentQueue); @@ -243,17 +245,31 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) { return NULL; } + pProc->testFlag = pCfg->testFlag; + pProc->pChildQueue->pParent = pCfg->pParent; + pProc->pChildQueue->mallocHeadFp = pCfg->childMallocHeadFp; + pProc->pChildQueue->freeHeadFp = pCfg->childFreeHeadFp; + pProc->pChildQueue->mallocBodyFp = pCfg->childMallocBodyFp; + pProc->pChildQueue->freeBodyFp = pCfg->childFreeBodyFp; + pProc->pChildQueue->consumeFp = pCfg->childConsumeFp; + pProc->pParentQueue->pParent = pCfg->pParent; + pProc->pParentQueue->mallocHeadFp = pCfg->parentdMallocHeadFp; + pProc->pParentQueue->freeHeadFp = pCfg->parentFreeHeadFp; + pProc->pParentQueue->mallocBodyFp = pCfg->parentMallocBodyFp; + pProc->pParentQueue->freeBodyFp = pCfg->parentFreeBodyFp; + pProc->pParentQueue->consumeFp = pCfg->parentConsumeFp; + // todo pProc->isChild = 0; return pProc; } -static void taosProcThreadLoop(SProcQueue *pQueue, ProcFp procFp, void *pParent) { - void *pHead; - void *pBody; - int32_t headLen; - int32_t bodyLen; +static void taosProcThreadLoop(SProcQueue *pQueue) { + ProcConsumeFp consumeFp = pQueue->consumeFp; + void *pParent = pQueue->pParent; + void *pHead, *pBody; + int32_t headLen, bodyLen; while (1) { int32_t code = taosProcQueuePop(pQueue, &pHead, &headLen, &pBody, &bodyLen); @@ -265,30 +281,18 @@ static void taosProcThreadLoop(SProcQueue *pQueue, ProcFp procFp, void *pParent) taosMsleep(1); continue; } else { - (*procFp)(pParent, pHead, headLen, pBody, bodyLen); + (*consumeFp)(pParent, pHead, headLen, pBody, bodyLen); } } } -static void *taosProcThreadChildLoop(void *param) { - SProcObj *pProc = param; - taosProcThreadLoop(pProc->pChildQueue, pProc->childFp, pProc->pParent); - return NULL; -} - -static void *taosProcThreadParentLoop(void *param) { - SProcObj *pProc = param; - taosProcThreadLoop(pProc->pParentQueue, pProc->parentFp, pProc->pParent); - return NULL; -} - int32_t taosProcStart(SProcObj *pProc) { pthread_attr_t thAttr = {0}; pthread_attr_init(&thAttr); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); if (pProc->isChild || pProc->testFlag) { - if (pthread_create(&pProc->childThread, &thAttr, taosProcThreadChildLoop, pProc) != 0) { + if (pthread_create(&pProc->childThread, &thAttr, (ProcThreadFp)taosProcThreadLoop, pProc->pChildQueue) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); uError("failed to create thread since %s", terrstr()); return -1; @@ -296,7 +300,7 @@ int32_t taosProcStart(SProcObj *pProc) { } if (!pProc->isChild || pProc->testFlag) { - if (pthread_create(&pProc->parentThread, &thAttr, taosProcThreadParentLoop, pProc) != 0) { + if (pthread_create(&pProc->parentThread, &thAttr, (ProcThreadFp)taosProcThreadLoop, pProc->pParentQueue) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); uError("failed to create thread since %s", terrstr()); return -1;