TD-10431 refact vnode msg processor
This commit is contained in:
parent
27118082a4
commit
a09c553ab8
|
@ -100,16 +100,16 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STB_IN, "drop-stb-internal" )
|
|||
// message from mnode to mnode
|
||||
// message from mnode to qnode
|
||||
// message from mnode to dnode
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_VNODE_IN, "create-vnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_VNODE_IN, "alter-vnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_VNODE_IN, "drop-vnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_AUTH_VNODE_IN, "auth-vnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SYNC_VNODE_IN, "sync-vnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_COMPACT_VNODE_IN, "compact-vnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_MNODE_IN, "create-mnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_MNODE_IN, "alter-mnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_MNODE_IN, "drop-mnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONFIG_DNODE_IN, "config-dnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_VNODE_IN, "create-vnode-internal" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_VNODE_IN, "alter-vnode-internal" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_VNODE_IN, "drop-vnode-internal" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_AUTH_VNODE_IN, "auth-vnode-internal" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SYNC_VNODE_IN, "sync-vnode-internal" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_COMPACT_VNODE_IN, "compact-vnode-internal" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_MNODE_IN, "create-mnode-internal" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_MNODE_IN, "alter-mnode-internal" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_MNODE_IN, "drop-mnode-internal" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONFIG_DNODE_IN, "config-dnode-internal" )
|
||||
|
||||
// message from qnode to vnode
|
||||
// message from qnode to mnode
|
||||
|
|
|
@ -140,7 +140,7 @@ int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
|||
* @param pRsp The response message
|
||||
* @return int 0 for success, -1 for failure
|
||||
*/
|
||||
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||
int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||
|
||||
/**
|
||||
* @brief Process a fetch message.
|
||||
|
@ -150,7 +150,7 @@ int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
|||
* @param pRsp The response message
|
||||
* @return int 0 for success, -1 for failure
|
||||
*/
|
||||
int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||
int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||
|
||||
/**
|
||||
* @brief Process a consume message.
|
||||
|
@ -160,7 +160,7 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
|||
* @param pRsp The response message
|
||||
* @return int 0 for success, -1 for failure
|
||||
*/
|
||||
int vnodeProcessConsumeMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||
int vnodeProcessConsumeReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||
|
||||
/* ------------------------ SVnodeCfg ------------------------ */
|
||||
/**
|
||||
|
@ -241,34 +241,11 @@ void *vnodeParseReq(void *buf, SVnodeReq *pReq, uint8_t type);
|
|||
|
||||
/* ------------------------ FOR COMPILE ------------------------ */
|
||||
|
||||
#if 1
|
||||
|
||||
typedef enum {
|
||||
VN_MSG_TYPE_WRITE = 1,
|
||||
VN_MSG_TYPE_APPLY,
|
||||
VN_MSG_TYPE_SYNC,
|
||||
VN_MSG_TYPE_QUERY,
|
||||
VN_MSG_TYPE_FETCH
|
||||
} EVnMsgType;
|
||||
|
||||
typedef struct {
|
||||
int32_t curNum;
|
||||
int32_t allocNum;
|
||||
SRpcMsg rpcMsg[];
|
||||
} SVnodeMsg;
|
||||
|
||||
int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg);
|
||||
int32_t vnodeCompact(SVnode *pVnode);
|
||||
int32_t vnodeSync(SVnode *pVnode);
|
||||
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
|
||||
|
||||
SVnodeMsg *vnodeInitMsg(int32_t msgNum);
|
||||
int32_t vnodeAppendMsg(SVnodeMsg *pMsg, SRpcMsg *pRpcMsg);
|
||||
void vnodeCleanupMsg(SVnodeMsg *pMsg);
|
||||
void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg, EVnMsgType msgType);
|
||||
|
||||
#endif
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -72,8 +72,8 @@ static void dndFreeVnodeWriteQueue(SDnode *pDnode, SVnodeObj *pVnode);
|
|||
static void dndFreeVnodeApplyQueue(SDnode *pDnode, SVnodeObj *pVnode);
|
||||
static void dndFreeVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode);
|
||||
|
||||
static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SVnodeMsg *pMsg);
|
||||
static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SVnodeMsg *pMsg);
|
||||
static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg);
|
||||
static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg);
|
||||
static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs);
|
||||
static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs);
|
||||
static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs);
|
||||
|
@ -83,7 +83,7 @@ void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pE
|
|||
void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||
void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||
void dndProcessVnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||
static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SVnodeMsg *pMsg);
|
||||
static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg);
|
||||
|
||||
static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId);
|
||||
static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode);
|
||||
|
@ -802,40 +802,51 @@ static void dndProcessVnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) {
|
|||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
||||
static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SVnodeMsg *pMsg) {
|
||||
vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_QUERY);
|
||||
static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) {
|
||||
SRpcMsg *pRsp = NULL;
|
||||
vnodeProcessQueryReq(pVnode->pImpl, pMsg, &pRsp);
|
||||
}
|
||||
|
||||
static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SVnodeMsg *pMsg) {
|
||||
vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_FETCH);
|
||||
static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) {
|
||||
SRpcMsg *pRsp = NULL;
|
||||
vnodeProcessFetchReq(pVnode->pImpl, pMsg, &pRsp);
|
||||
}
|
||||
|
||||
static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) {
|
||||
SVnodeMsg *pMsg = vnodeInitMsg(numOfMsgs);
|
||||
SRpcMsg *pRpcMsg = NULL;
|
||||
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg));
|
||||
|
||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
taosGetQitem(qall, (void **)&pRpcMsg);
|
||||
vnodeAppendMsg(pMsg, pRpcMsg);
|
||||
|
||||
void *ptr = taosArrayPush(pArray, pRpcMsg);
|
||||
assert(ptr != NULL);
|
||||
|
||||
taosFreeQitem(pRpcMsg);
|
||||
}
|
||||
|
||||
vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_WRITE);
|
||||
vnodeProcessWMsgs(pVnode->pImpl, pArray);
|
||||
}
|
||||
|
||||
static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) {
|
||||
SVnodeMsg *pMsg = NULL;
|
||||
SRpcMsg *pMsg = NULL;
|
||||
|
||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
taosGetQitem(qall, (void **)&pMsg);
|
||||
vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_APPLY);
|
||||
|
||||
SRpcMsg *pRsp = NULL;
|
||||
(void)vnodeApplyWMsg(pVnode->pImpl, pMsg, &pRsp);
|
||||
}
|
||||
}
|
||||
|
||||
static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) {
|
||||
SVnodeMsg *pMsg = NULL;
|
||||
SRpcMsg *pMsg = NULL;
|
||||
|
||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
taosGetQitem(qall, (void **)&pMsg);
|
||||
vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_SYNC);
|
||||
|
||||
SRpcMsg *pRsp = NULL;
|
||||
(void)vnodeProcessSyncReq(pVnode->pImpl, pMsg, &pRsp);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -863,40 +874,13 @@ static int32_t dndWriteRpcMsgToVnodeQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t dndWriteVnodeMsgToVnodeQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) {
|
||||
int32_t code = 0;
|
||||
|
||||
if (pQueue == NULL) {
|
||||
code = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||
} else {
|
||||
SVnodeMsg *pMsg = vnodeInitMsg(1);
|
||||
if (pMsg == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
} else {
|
||||
if (vnodeAppendMsg(pMsg, pRpcMsg) != 0) {
|
||||
code = terrno;
|
||||
} else {
|
||||
if (taosWriteQitem(pQueue, pMsg) != 0) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code};
|
||||
rpcSendResponse(&rsp);
|
||||
rpcFreeCont(pRpcMsg->pCont);
|
||||
}
|
||||
}
|
||||
|
||||
static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) {
|
||||
SMsgHead *pHead = (SMsgHead *)pMsg->pCont;
|
||||
pHead->vgId = htonl(pHead->vgId);
|
||||
|
||||
SVnodeObj *pVnode = dndAcquireVnode(pDnode, pHead->vgId);
|
||||
if (pVnode == NULL) {
|
||||
SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno};
|
||||
SRpcMsg rsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID};
|
||||
rpcSendResponse(&rsp);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
}
|
||||
|
@ -920,7 +904,7 @@ void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
|||
void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg);
|
||||
if (pVnode != NULL) {
|
||||
dndWriteVnodeMsgToVnodeQueue(pVnode->pSyncQ, pMsg);
|
||||
dndWriteRpcMsgToVnodeQueue(pVnode->pSyncQ, pMsg);
|
||||
dndReleaseVnode(pDnode, pVnode);
|
||||
}
|
||||
}
|
||||
|
@ -928,7 +912,7 @@ void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
|||
void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg);
|
||||
if (pVnode != NULL) {
|
||||
dndWriteVnodeMsgToVnodeQueue(pVnode->pQueryQ, pMsg);
|
||||
dndWriteRpcMsgToVnodeQueue(pVnode->pQueryQ, pMsg);
|
||||
dndReleaseVnode(pDnode, pVnode);
|
||||
}
|
||||
}
|
||||
|
@ -936,12 +920,12 @@ void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
|||
void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg);
|
||||
if (pVnode != NULL) {
|
||||
dndWriteVnodeMsgToVnodeQueue(pVnode->pFetchQ, pMsg);
|
||||
dndWriteRpcMsgToVnodeQueue(pVnode->pFetchQ, pMsg);
|
||||
dndReleaseVnode(pDnode, pVnode);
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SVnodeMsg *pMsg) {
|
||||
static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg) {
|
||||
SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId);
|
||||
if (pVnode == NULL) {
|
||||
return -1;
|
||||
|
@ -1106,7 +1090,7 @@ static void dndCleanupVnodeWriteWorker(SDnode *pDnode) {
|
|||
|
||||
static int32_t dndAllocVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode) {
|
||||
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
|
||||
pVnode->pSyncQ = tMWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeSyncQueue);
|
||||
pVnode->pSyncQ = tMWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FProcessItems)dndProcessVnodeSyncQueue);
|
||||
if (pVnode->pSyncQ == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
|
@ -1117,7 +1101,7 @@ static int32_t dndAllocVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode) {
|
|||
|
||||
static void dndFreeVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode) {
|
||||
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
|
||||
tMWorkerFreeQueue(&pMgmt->writePool, pVnode->pSyncQ);
|
||||
tMWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ);
|
||||
pVnode->pSyncQ = NULL;
|
||||
}
|
||||
|
||||
|
|
|
@ -15,53 +15,31 @@
|
|||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "vnodeInt.h"
|
||||
#include "tqueue.h"
|
||||
|
||||
int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg) { return 0; }
|
||||
SVnode *vnodeCreate(int32_t vgId, const char *path, const SVnodeCfg *pCfg) { return NULL; }
|
||||
void vnodeDrop(SVnode *pVnode) {}
|
||||
|
||||
int32_t vnodeCompact(SVnode *pVnode) { return 0; }
|
||||
|
||||
int32_t vnodeSync(SVnode *pVnode) { return 0; }
|
||||
|
||||
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { return 0; }
|
||||
|
||||
SVnodeMsg *vnodeInitMsg(int32_t msgNum) {
|
||||
SVnodeMsg *pMsg = taosAllocateQitem(msgNum * sizeof(SRpcMsg *) + sizeof(SVnodeMsg));
|
||||
if (pMsg == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
} else {
|
||||
pMsg->allocNum = msgNum;
|
||||
return pMsg;
|
||||
}
|
||||
int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||
vInfo("query message is processed");
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t vnodeAppendMsg(SVnodeMsg *pMsg, SRpcMsg *pRpcMsg) {
|
||||
if (pMsg->curNum >= pMsg->allocNum) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pMsg->rpcMsg[pMsg->curNum++] = *pRpcMsg;
|
||||
int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||
vInfo("fetch message is processed");
|
||||
return 0;
|
||||
}
|
||||
|
||||
void vnodeCleanupMsg(SVnodeMsg *pMsg) {
|
||||
for (int32_t i = 0; i < pMsg->curNum; ++i) {
|
||||
rpcFreeCont(pMsg->rpcMsg[i].pCont);
|
||||
}
|
||||
taosFreeQitem(pMsg);
|
||||
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||
vInfo("sync message is processed");
|
||||
return 0;
|
||||
}
|
||||
|
||||
void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg, EVnMsgType msgType) {
|
||||
switch (msgType) {
|
||||
case VN_MSG_TYPE_WRITE:
|
||||
break;
|
||||
case VN_MSG_TYPE_APPLY:
|
||||
break;
|
||||
case VN_MSG_TYPE_SYNC:
|
||||
break;
|
||||
case VN_MSG_TYPE_QUERY:
|
||||
break;
|
||||
case VN_MSG_TYPE_FETCH:
|
||||
break;
|
||||
}
|
||||
int vnodeProcessConsumeReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||
vInfo("consume message is processed");
|
||||
return 0;
|
||||
}
|
Loading…
Reference in New Issue