registerBrokenLinkArgFp
This commit is contained in:
parent
dffa1a7285
commit
f82afcfe4d
|
@ -41,14 +41,16 @@ typedef int32_t (*GetQueueSizeFp)(SMgmtWrapper* pWrapper, int32_t vgId, EQueueTy
|
||||||
typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, SEpSet* epSet, SRpcMsg* pReq);
|
typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, SEpSet* epSet, SRpcMsg* pReq);
|
||||||
typedef int32_t (*SendMnodeReqFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq);
|
typedef int32_t (*SendMnodeReqFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq);
|
||||||
typedef void (*SendRspFp)(SMgmtWrapper* pWrapper, SRpcMsg* pRsp);
|
typedef void (*SendRspFp)(SMgmtWrapper* pWrapper, SRpcMsg* pRsp);
|
||||||
|
typedef void (*RegisterBrokenLinkArgFp)(SMgmtWrapper* pWrapper, SRpcMsg *pMsg);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SMgmtWrapper* pWrapper;
|
SMgmtWrapper* pWrapper;
|
||||||
PutToQueueFp queueFps[QUEUE_MAX];
|
PutToQueueFp queueFps[QUEUE_MAX];
|
||||||
GetQueueSizeFp qsizeFp;
|
GetQueueSizeFp qsizeFp;
|
||||||
SendReqFp sendReqFp;
|
SendReqFp sendReqFp;
|
||||||
SendMnodeReqFp sendMnodeReqFp;
|
SendMnodeReqFp sendMnodeReqFp;
|
||||||
SendRspFp sendRspFp;
|
SendRspFp sendRspFp;
|
||||||
|
RegisterBrokenLinkArgFp registerBrokenLinkArgFp;
|
||||||
} SMsgCb;
|
} SMsgCb;
|
||||||
|
|
||||||
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq);
|
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq);
|
||||||
|
@ -56,6 +58,7 @@ int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype);
|
||||||
int32_t tmsgSendReq(const SMsgCb* pMsgCb, SEpSet* epSet, SRpcMsg* pReq);
|
int32_t tmsgSendReq(const SMsgCb* pMsgCb, SEpSet* epSet, SRpcMsg* pReq);
|
||||||
int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq);
|
int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq);
|
||||||
void tmsgSendRsp(const SMsgCb* pMsgCb, SRpcMsg* pRsp);
|
void tmsgSendRsp(const SMsgCb* pMsgCb, SRpcMsg* pRsp);
|
||||||
|
void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,11 +22,14 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
typedef enum { PROC_REQ, PROC_RSP, PROC_REGISTER } ProcFuncType;
|
||||||
|
|
||||||
typedef struct SProcQueue SProcQueue;
|
typedef struct SProcQueue SProcQueue;
|
||||||
typedef struct SProcObj SProcObj;
|
typedef struct SProcObj SProcObj;
|
||||||
typedef void *(*ProcMallocFp)(int32_t contLen);
|
typedef void *(*ProcMallocFp)(int32_t contLen);
|
||||||
typedef void *(*ProcFreeFp)(void *pCont);
|
typedef void *(*ProcFreeFp)(void *pCont);
|
||||||
typedef void *(*ProcConsumeFp)(void *pParent, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen);
|
typedef void *(*ProcConsumeFp)(void *pParent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen,
|
||||||
|
ProcFuncType ftype);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t childQueueSize;
|
int32_t childQueueSize;
|
||||||
|
@ -52,9 +55,10 @@ int32_t taosProcRun(SProcObj *pProc);
|
||||||
void taosProcStop(SProcObj *pProc);
|
void taosProcStop(SProcObj *pProc);
|
||||||
bool taosProcIsChild(SProcObj *pProc);
|
bool taosProcIsChild(SProcObj *pProc);
|
||||||
int32_t taosProcChildId(SProcObj *pProc);
|
int32_t taosProcChildId(SProcObj *pProc);
|
||||||
|
int32_t taosProcPutToChildQ(SProcObj *pProc, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen,
|
||||||
int32_t taosProcPutToChildQueue(SProcObj *pProc, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen);
|
ProcFuncType ftype);
|
||||||
int32_t taosProcPutToParentQueue(SProcObj *pProc, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen);
|
int32_t taosProcPutToParentQ(SProcObj *pProc, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen,
|
||||||
|
ProcFuncType ftype);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,3 +33,7 @@ int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void tmsgSendRsp(const SMsgCb* pMsgCb, SRpcMsg* pRsp) { return (*pMsgCb->sendRspFp)(pMsgCb->pWrapper, pRsp); }
|
void tmsgSendRsp(const SMsgCb* pMsgCb, SRpcMsg* pRsp) { return (*pMsgCb->sendRspFp)(pMsgCb->pWrapper, pRsp); }
|
||||||
|
|
||||||
|
void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg) {
|
||||||
|
(*pMsgCb->registerBrokenLinkArgFp)(pMsgCb->pWrapper, pMsg);
|
||||||
|
}
|
|
@ -24,6 +24,7 @@ static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) {
|
||||||
msgCb.sendReqFp = dndSendReqToDnode;
|
msgCb.sendReqFp = dndSendReqToDnode;
|
||||||
msgCb.sendMnodeReqFp = dndSendReqToMnode;
|
msgCb.sendMnodeReqFp = dndSendReqToMnode;
|
||||||
msgCb.sendRspFp = dndSendRsp;
|
msgCb.sendRspFp = dndSendRsp;
|
||||||
|
msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg;
|
||||||
pOption->msgCb = msgCb;
|
pOption->msgCb = msgCb;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -139,6 +139,7 @@ void dndSendMonitorReport(SDnode *pDnode);
|
||||||
int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
||||||
int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, SEpSet *pEpSet, SRpcMsg *pMsg);
|
int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, SEpSet *pEpSet, SRpcMsg *pMsg);
|
||||||
void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp);
|
void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp);
|
||||||
|
void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
||||||
|
|
||||||
int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg);
|
int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg);
|
||||||
int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed);
|
int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed);
|
||||||
|
|
|
@ -56,6 +56,7 @@ void dndCleanupServer(SDnode *pDnode);
|
||||||
int32_t dndInitClient(SDnode *pDnode);
|
int32_t dndInitClient(SDnode *pDnode);
|
||||||
void dndCleanupClient(SDnode *pDnode);
|
void dndCleanupClient(SDnode *pDnode);
|
||||||
int32_t dndInitMsgHandle(SDnode *pDnode);
|
int32_t dndInitMsgHandle(SDnode *pDnode);
|
||||||
|
void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -116,11 +116,12 @@ static void dndClearNodesExecpt(SDnode *pDnode, ENodeType except) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) {
|
static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
|
||||||
|
ProcFuncType ftype) {
|
||||||
SRpcMsg *pRpc = &pMsg->rpcMsg;
|
SRpcMsg *pRpc = &pMsg->rpcMsg;
|
||||||
pRpc->pCont = pCont;
|
pRpc->pCont = pCont;
|
||||||
dTrace("msg:%p, get from child process queue, type:%s handle:%p app:%p", pMsg, TMSG_INFO(pRpc->msgType),
|
dTrace("msg:%p, get from child process queue, type:%s handle:%p app:%p", pMsg, TMSG_INFO(pRpc->msgType), pRpc->handle,
|
||||||
pRpc->handle, pRpc->ahandle);
|
pRpc->ahandle);
|
||||||
|
|
||||||
NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)];
|
NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)];
|
||||||
int32_t code = (*msgFp)(pWrapper, pMsg);
|
int32_t code = (*msgFp)(pWrapper, pMsg);
|
||||||
|
@ -138,13 +139,21 @@ static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, int32_t msgLen, void *pCont, int32_t contLen) {
|
static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
|
||||||
pRpc->pCont = pCont;
|
ProcFuncType ftype) {
|
||||||
dTrace("msg:%p, get from parent process queue, type:%s handle:%p app:%p", pRpc, TMSG_INFO(pRpc->msgType),
|
pMsg->pCont = pCont;
|
||||||
pRpc->handle, pRpc->ahandle);
|
dTrace("msg:%p, get from parent process queue, type:%s handle:%p app:%p", pMsg, TMSG_INFO(pMsg->msgType),
|
||||||
|
pMsg->handle, pMsg->ahandle);
|
||||||
|
|
||||||
dndSendRsp(pWrapper, pRpc);
|
switch (ftype) {
|
||||||
taosMemoryFree(pRpc);
|
case PROC_REGISTER:
|
||||||
|
rpcRegisterBrokenLinkArg(pMsg);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
dndSendRpcRsp(pWrapper, pMsg);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
taosMemoryFree(pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t dndRunInMultiProcess(SDnode *pDnode) {
|
static int32_t dndRunInMultiProcess(SDnode *pDnode) {
|
||||||
|
|
|
@ -68,7 +68,7 @@ void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
||||||
} else if (pWrapper->procType == PROC_PARENT) {
|
} else if (pWrapper->procType == PROC_PARENT) {
|
||||||
dTrace("msg:%p, is created and will put into child queue, handle:%p app:%p user:%s", pMsg, pRpc->handle,
|
dTrace("msg:%p, is created and will put into child queue, handle:%p app:%p user:%s", pMsg, pRpc->handle,
|
||||||
pRpc->ahandle, pMsg->user);
|
pRpc->ahandle, pMsg->user);
|
||||||
code = taosProcPutToChildQueue(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen);
|
code = taosProcPutToChildQ(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen, PROC_REQ);
|
||||||
} else {
|
} else {
|
||||||
dTrace("msg:%p, should not processed in child process, handle:%p app:%p user:%s", pMsg, pRpc->handle, pRpc->ahandle,
|
dTrace("msg:%p, should not processed in child process, handle:%p app:%p user:%s", pMsg, pRpc->handle, pRpc->ahandle,
|
||||||
pMsg->user);
|
pMsg->user);
|
||||||
|
|
|
@ -348,7 +348,7 @@ int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) {
|
void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) {
|
||||||
if (pRsp->code == TSDB_CODE_APP_NOT_READY) {
|
if (pRsp->code == TSDB_CODE_APP_NOT_READY) {
|
||||||
SMgmtWrapper *pDnodeWrapper = dndAcquireWrapper(pWrapper->pDnode, DNODE);
|
SMgmtWrapper *pDnodeWrapper = dndAcquireWrapper(pWrapper->pDnode, DNODE);
|
||||||
if (pDnodeWrapper != NULL) {
|
if (pDnodeWrapper != NULL) {
|
||||||
|
@ -366,7 +366,7 @@ void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) {
|
||||||
if (pWrapper->procType == PROC_CHILD) {
|
if (pWrapper->procType == PROC_CHILD) {
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
do {
|
do {
|
||||||
code = taosProcPutToParentQueue(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen);
|
code = taosProcPutToParentQ(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_RSP);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
taosMsleep(10);
|
taosMsleep(10);
|
||||||
}
|
}
|
||||||
|
@ -375,3 +375,17 @@ void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) {
|
||||||
dndSendRpcRsp(pWrapper, pRsp);
|
dndSendRpcRsp(pWrapper, pRsp);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
|
||||||
|
if (pWrapper->procType == PROC_CHILD) {
|
||||||
|
int32_t code = -1;
|
||||||
|
do {
|
||||||
|
code = taosProcPutToParentQ(pWrapper->pProc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, PROC_REGISTER);
|
||||||
|
if (code != 0) {
|
||||||
|
taosMsleep(10);
|
||||||
|
}
|
||||||
|
} while (code != 0);
|
||||||
|
} else {
|
||||||
|
rpcRegisterBrokenLinkArg(pMsg);
|
||||||
|
}
|
||||||
|
}
|
|
@ -52,6 +52,7 @@ static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
|
||||||
msgCb.sendReqFp = dndSendReqToDnode;
|
msgCb.sendReqFp = dndSendReqToDnode;
|
||||||
msgCb.sendMnodeReqFp = dndSendReqToMnode;
|
msgCb.sendMnodeReqFp = dndSendReqToMnode;
|
||||||
msgCb.sendRspFp = dndSendRsp;
|
msgCb.sendRspFp = dndSendRsp;
|
||||||
|
msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg;
|
||||||
pOption->msgCb = msgCb;
|
pOption->msgCb = msgCb;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -27,6 +27,7 @@ static void qmInitOption(SQnodeMgmt *pMgmt, SQnodeOpt *pOption) {
|
||||||
msgCb.sendReqFp = dndSendReqToDnode;
|
msgCb.sendReqFp = dndSendReqToDnode;
|
||||||
msgCb.sendMnodeReqFp = dndSendReqToMnode;
|
msgCb.sendMnodeReqFp = dndSendReqToMnode;
|
||||||
msgCb.sendRspFp = dndSendRsp;
|
msgCb.sendRspFp = dndSendRsp;
|
||||||
|
msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg;
|
||||||
pOption->msgCb = msgCb;
|
pOption->msgCb = msgCb;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -24,6 +24,7 @@ static void smInitOption(SSnodeMgmt *pMgmt, SSnodeOpt *pOption) {
|
||||||
msgCb.sendReqFp = dndSendReqToDnode;
|
msgCb.sendReqFp = dndSendReqToDnode;
|
||||||
msgCb.sendMnodeReqFp = dndSendReqToMnode;
|
msgCb.sendMnodeReqFp = dndSendReqToMnode;
|
||||||
msgCb.sendRspFp = dndSendRsp;
|
msgCb.sendRspFp = dndSendRsp;
|
||||||
|
msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg;
|
||||||
pOption->msgCb = msgCb;
|
pOption->msgCb = msgCb;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -137,6 +137,7 @@ static void *vmOpenVnodeFunc(void *param) {
|
||||||
msgCb.sendReqFp = dndSendReqToDnode;
|
msgCb.sendReqFp = dndSendReqToDnode;
|
||||||
msgCb.sendMnodeReqFp = dndSendReqToMnode;
|
msgCb.sendMnodeReqFp = dndSendReqToMnode;
|
||||||
msgCb.sendRspFp = dndSendRsp;
|
msgCb.sendRspFp = dndSendRsp;
|
||||||
|
msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg;
|
||||||
SVnodeCfg cfg = {.msgCb = msgCb, .pTfs = pMgmt->pTfs, .vgId = pCfg->vgId, .dbId = pCfg->dbUid};
|
SVnodeCfg cfg = {.msgCb = msgCb, .pTfs = pMgmt->pTfs, .vgId = pCfg->vgId, .dbId = pCfg->dbUid};
|
||||||
SVnode *pImpl = vnodeOpen(pCfg->path, &cfg);
|
SVnode *pImpl = vnodeOpen(pCfg->path, &cfg);
|
||||||
if (pImpl == NULL) {
|
if (pImpl == NULL) {
|
||||||
|
|
|
@ -91,6 +91,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||||
msgCb.sendReqFp = dndSendReqToDnode;
|
msgCb.sendReqFp = dndSendReqToDnode;
|
||||||
msgCb.sendMnodeReqFp = dndSendReqToMnode;
|
msgCb.sendMnodeReqFp = dndSendReqToMnode;
|
||||||
msgCb.sendRspFp = dndSendRsp;
|
msgCb.sendRspFp = dndSendRsp;
|
||||||
|
msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg;
|
||||||
|
|
||||||
vnodeCfg.msgCb = msgCb;
|
vnodeCfg.msgCb = msgCb;
|
||||||
vnodeCfg.pTfs = pMgmt->pTfs;
|
vnodeCfg.pTfs = pMgmt->pTfs;
|
||||||
|
|
|
@ -287,7 +287,7 @@ int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
|
||||||
.code = TSDB_CODE_RPC_NETWORK_UNAVAIL,
|
.code = TSDB_CODE_RPC_NETWORK_UNAVAIL,
|
||||||
};
|
};
|
||||||
|
|
||||||
rpcRegisterBrokenLinkArg(&pMsg);
|
tmsgRegisterBrokenLinkArg(&mgmt->msgCb, &pMsg);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -207,7 +207,8 @@ static void taosProcCleanupQueue(SProcQueue *pQueue) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHeadLen, char *pBody, int32_t rawBodyLen) {
|
static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int16_t rawHeadLen, char *pBody, int32_t rawBodyLen,
|
||||||
|
ProcFuncType funcType) {
|
||||||
const int32_t headLen = CEIL8(rawHeadLen);
|
const int32_t headLen = CEIL8(rawHeadLen);
|
||||||
const int32_t bodyLen = CEIL8(rawBodyLen);
|
const int32_t bodyLen = CEIL8(rawBodyLen);
|
||||||
const int32_t fullLen = headLen + bodyLen + 8;
|
const int32_t fullLen = headLen + bodyLen + 8;
|
||||||
|
@ -225,10 +226,12 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHea
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pQueue->tail < pQueue->total) {
|
if (pQueue->tail < pQueue->total) {
|
||||||
*(int32_t *)(pQueue->pBuffer + pQueue->head) = headLen;
|
*(int16_t *)(pQueue->pBuffer + pQueue->head) = headLen;
|
||||||
|
*(int8_t *)(pQueue->pBuffer + pQueue->head + 2) = (int8_t)funcType;
|
||||||
*(int32_t *)(pQueue->pBuffer + pQueue->head + 4) = bodyLen;
|
*(int32_t *)(pQueue->pBuffer + pQueue->head + 4) = bodyLen;
|
||||||
} else {
|
} else {
|
||||||
*(int32_t *)(pQueue->pBuffer) = headLen;
|
*(int16_t *)(pQueue->pBuffer) = headLen;
|
||||||
|
*(int8_t *)(pQueue->pBuffer + pQueue->head + 2) = (int8_t)funcType;
|
||||||
*(int32_t *)(pQueue->pBuffer + 4) = bodyLen;
|
*(int32_t *)(pQueue->pBuffer + 4) = bodyLen;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -268,13 +271,13 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHea
|
||||||
taosThreadMutexUnlock(pQueue->mutex);
|
taosThreadMutexUnlock(pQueue->mutex);
|
||||||
tsem_post(&pQueue->sem);
|
tsem_post(&pQueue->sem);
|
||||||
|
|
||||||
uTrace("proc:%s, push msg to queue:%p remains:%d, head:%d:%p body:%d:%p", pQueue->name, pQueue, pQueue->items,
|
uTrace("proc:%s, push msg to queue:%p remains:%d, head:%d:%p body:%d:%p ftype:%d", pQueue->name, pQueue, pQueue->items,
|
||||||
headLen, pHead, bodyLen, pBody);
|
headLen, pHead, bodyLen, pBody, funcType);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int32_t *pHeadLen, void **ppBody,
|
static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHeadLen, void **ppBody,
|
||||||
int32_t *pBodyLen) {
|
int32_t *pBodyLen, ProcFuncType *pFuncType) {
|
||||||
tsem_wait(&pQueue->sem);
|
tsem_wait(&pQueue->sem);
|
||||||
|
|
||||||
taosThreadMutexLock(pQueue->mutex);
|
taosThreadMutexLock(pQueue->mutex);
|
||||||
|
@ -285,13 +288,16 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int32_t *pHea
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t headLen = 0;
|
int16_t headLen = 0;
|
||||||
|
int8_t ftype = 0;
|
||||||
int32_t bodyLen = 0;
|
int32_t bodyLen = 0;
|
||||||
if (pQueue->head < pQueue->total) {
|
if (pQueue->head < pQueue->total) {
|
||||||
headLen = *(int32_t *)(pQueue->pBuffer + pQueue->head);
|
headLen = *(int16_t *)(pQueue->pBuffer + pQueue->head);
|
||||||
|
ftype = *(int8_t *)(pQueue->pBuffer + pQueue->head + 2);
|
||||||
bodyLen = *(int32_t *)(pQueue->pBuffer + pQueue->head + 4);
|
bodyLen = *(int32_t *)(pQueue->pBuffer + pQueue->head + 4);
|
||||||
} else {
|
} else {
|
||||||
headLen = *(int32_t *)(pQueue->pBuffer);
|
headLen = *(int16_t *)(pQueue->pBuffer);
|
||||||
|
headLen = *(int8_t *)(pQueue->pBuffer + 2);
|
||||||
bodyLen = *(int32_t *)(pQueue->pBuffer + 4);
|
bodyLen = *(int32_t *)(pQueue->pBuffer + 4);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -345,9 +351,10 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int32_t *pHea
|
||||||
*ppBody = pBody;
|
*ppBody = pBody;
|
||||||
*pHeadLen = headLen;
|
*pHeadLen = headLen;
|
||||||
*pBodyLen = bodyLen;
|
*pBodyLen = bodyLen;
|
||||||
|
*pFuncType = (ProcFuncType)ftype;
|
||||||
|
|
||||||
uTrace("proc:%s, pop msg from queue:%p remains:%d, head:%d:%p body:%d:%p", pQueue->name, pQueue, pQueue->items,
|
uTrace("proc:%s, pop msg from queue:%p remains:%d, head:%d:%p body:%d:%p ftype:%d", pQueue->name, pQueue, pQueue->items,
|
||||||
headLen, pHead, bodyLen, pBody);
|
headLen, pHead, bodyLen, pBody, ftype);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -400,12 +407,14 @@ static void taosProcThreadLoop(SProcQueue *pQueue) {
|
||||||
ProcConsumeFp consumeFp = pQueue->consumeFp;
|
ProcConsumeFp consumeFp = pQueue->consumeFp;
|
||||||
void *pParent = pQueue->pParent;
|
void *pParent = pQueue->pParent;
|
||||||
void *pHead, *pBody;
|
void *pHead, *pBody;
|
||||||
int32_t headLen, bodyLen;
|
int16_t headLen;
|
||||||
|
ProcFuncType ftype;
|
||||||
|
int32_t bodyLen;
|
||||||
|
|
||||||
uDebug("proc:%s, start to get msg from queue:%p", pQueue->name, pQueue);
|
uDebug("proc:%s, start to get msg from queue:%p", pQueue->name, pQueue);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
int32_t numOfMsgs = taosProcQueuePop(pQueue, &pHead, &headLen, &pBody, &bodyLen);
|
int32_t numOfMsgs = taosProcQueuePop(pQueue, &pHead, &headLen, &pBody, &bodyLen, &ftype);
|
||||||
if (numOfMsgs == 0) {
|
if (numOfMsgs == 0) {
|
||||||
uDebug("proc:%s, get no msg from queue:%p and exit the proc thread", pQueue->name, pQueue);
|
uDebug("proc:%s, get no msg from queue:%p and exit the proc thread", pQueue->name, pQueue);
|
||||||
break;
|
break;
|
||||||
|
@ -414,7 +423,7 @@ static void taosProcThreadLoop(SProcQueue *pQueue) {
|
||||||
taosMsleep(1);
|
taosMsleep(1);
|
||||||
continue;
|
continue;
|
||||||
} else {
|
} else {
|
||||||
(*consumeFp)(pParent, pHead, headLen, pBody, bodyLen);
|
(*consumeFp)(pParent, pHead, headLen, pBody, bodyLen, ftype);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -462,10 +471,12 @@ void taosProcCleanup(SProcObj *pProc) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t taosProcPutToChildQueue(SProcObj *pProc, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen) {
|
int32_t taosProcPutToChildQ(SProcObj *pProc, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen,
|
||||||
return taosProcQueuePush(pProc->pChildQueue, pHead, headLen, pBody, bodyLen);
|
ProcFuncType funcType) {
|
||||||
|
return taosProcQueuePush(pProc->pChildQueue, pHead, headLen, pBody, bodyLen, funcType);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t taosProcPutToParentQueue(SProcObj *pProc, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen) {
|
int32_t taosProcPutToParentQ(SProcObj *pProc, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen,
|
||||||
return taosProcQueuePush(pProc->pParentQueue, pHead, headLen, pBody, bodyLen);
|
ProcFuncType funcType) {
|
||||||
|
return taosProcQueuePush(pProc->pParentQueue, pHead, headLen, pBody, bodyLen, funcType);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue