commit
de12d9c549
|
@ -2324,9 +2324,9 @@ struct SRpcMsg;
|
||||||
struct SEpSet;
|
struct SEpSet;
|
||||||
struct SMgmtWrapper;
|
struct SMgmtWrapper;
|
||||||
typedef int32_t (*PutToQueueFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pReq);
|
typedef int32_t (*PutToQueueFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pReq);
|
||||||
typedef int32_t (*SendReqFp)(struct SMgmtWrapper* pWrapper, struct SEpSet* epSet, struct SRpcMsg* rpcMsg);
|
typedef int32_t (*SendReqFp)(struct SMgmtWrapper* pWrapper, struct SEpSet* epSet, struct SRpcMsg* pReq);
|
||||||
typedef int32_t (*SendMnodeReqFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* rpcMsg);
|
typedef int32_t (*SendMnodeReqFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pReq);
|
||||||
typedef void (*SendRspFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* rpcMsg);
|
typedef void (*SendRspFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pRsp);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,15 +46,18 @@ typedef struct SVnodeTask {
|
||||||
typedef struct SVnodeMgr {
|
typedef struct SVnodeMgr {
|
||||||
td_mode_flag_t vnodeInitFlag;
|
td_mode_flag_t vnodeInitFlag;
|
||||||
// For commit
|
// For commit
|
||||||
bool stop;
|
bool stop;
|
||||||
uint16_t nthreads;
|
uint16_t nthreads;
|
||||||
TdThread* threads;
|
TdThread* threads;
|
||||||
TdThreadMutex mutex;
|
TdThreadMutex mutex;
|
||||||
TdThreadCond hasTask;
|
TdThreadCond hasTask;
|
||||||
TD_DLIST(SVnodeTask) queue;
|
TD_DLIST(SVnodeTask) queue;
|
||||||
// For vnode Mgmt
|
// For vnode Mgmt
|
||||||
PutToQueueFp putToQueryQFp;
|
PutToQueueFp putToQueryQFp;
|
||||||
SendReqFp sendReqFp;
|
PutToQueueFp putToFetchQFp;
|
||||||
|
SendReqFp sendReqFp;
|
||||||
|
SendMnodeReqFp sendMnodeReqFp;
|
||||||
|
SendRspFp sendRspFp;
|
||||||
} SVnodeMgr;
|
} SVnodeMgr;
|
||||||
|
|
||||||
extern SVnodeMgr vnodeMgr;
|
extern SVnodeMgr vnodeMgr;
|
||||||
|
@ -85,7 +88,10 @@ struct SVnode {
|
||||||
int vnodeScheduleTask(SVnodeTask* task);
|
int vnodeScheduleTask(SVnodeTask* task);
|
||||||
|
|
||||||
int32_t vnodePutToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq);
|
int32_t vnodePutToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq);
|
||||||
void vnodeSendReq(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq);
|
int32_t vnodePutToVFetchQ(SVnode* pVnode, struct SRpcMsg* pReq);
|
||||||
|
int32_t vnodeSendReq(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq);
|
||||||
|
int32_t vnodeSendMnodeReq(SVnode* pVnode, struct SRpcMsg* pReq);
|
||||||
|
void vnodeSendRsp(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pRsp);
|
||||||
|
|
||||||
#define vFatal(...) \
|
#define vFatal(...) \
|
||||||
do { \
|
do { \
|
||||||
|
|
|
@ -26,7 +26,10 @@ int vnodeInit(const SVnodeOpt *pOption) {
|
||||||
|
|
||||||
vnodeMgr.stop = false;
|
vnodeMgr.stop = false;
|
||||||
vnodeMgr.putToQueryQFp = pOption->putToQueryQFp;
|
vnodeMgr.putToQueryQFp = pOption->putToQueryQFp;
|
||||||
|
vnodeMgr.putToFetchQFp = pOption->putToFetchQFp;
|
||||||
vnodeMgr.sendReqFp = pOption->sendReqFp;
|
vnodeMgr.sendReqFp = pOption->sendReqFp;
|
||||||
|
vnodeMgr.sendMnodeReqFp = pOption->sendMnodeReqFp;
|
||||||
|
vnodeMgr.sendRspFp = pOption->sendRspFp;
|
||||||
|
|
||||||
// Start commit handers
|
// Start commit handers
|
||||||
if (pOption->nthreads > 0) {
|
if (pOption->nthreads > 0) {
|
||||||
|
@ -90,15 +93,23 @@ int vnodeScheduleTask(SVnodeTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vnodePutToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq) {
|
int32_t vnodePutToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq) {
|
||||||
if (pVnode == NULL || pVnode->pMeta == NULL || vnodeMgr.putToQueryQFp == NULL) {
|
|
||||||
terrno = TSDB_CODE_VND_APP_ERROR;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return (*vnodeMgr.putToQueryQFp)(pVnode->pWrapper, pReq);
|
return (*vnodeMgr.putToQueryQFp)(pVnode->pWrapper, pReq);
|
||||||
}
|
}
|
||||||
|
|
||||||
void vnodeSendReq(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq) {
|
int32_t vnodePutToVFetchQ(SVnode* pVnode, struct SRpcMsg* pReq) {
|
||||||
(*vnodeMgr.sendReqFp)(pVnode->pWrapper, epSet, pReq);
|
return (*vnodeMgr.putToFetchQFp)(pVnode->pWrapper, pReq);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t vnodeSendReq(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq) {
|
||||||
|
return (*vnodeMgr.sendReqFp)(pVnode->pWrapper, epSet, pReq);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t vnodeSendMnodeReq(SVnode* pVnode, struct SRpcMsg* pReq) {
|
||||||
|
return (*vnodeMgr.sendMnodeReqFp)(pVnode->pWrapper, pReq);
|
||||||
|
}
|
||||||
|
|
||||||
|
void vnodeSendRsp(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pRsp) {
|
||||||
|
(*vnodeMgr.sendRspFp)(pVnode->pWrapper, pRsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ------------------------ STATIC METHODS ------------------------ */
|
/* ------------------------ STATIC METHODS ------------------------ */
|
||||||
|
|
Loading…
Reference in New Issue