TD-10431 send create stable msg to vnode
This commit is contained in:
parent
a09c553ab8
commit
82a8ed2fa0
|
@ -813,19 +813,38 @@ static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) {
|
static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) {
|
||||||
SRpcMsg *pRpcMsg = NULL;
|
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *));
|
||||||
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg));
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||||
taosGetQitem(qall, (void **)&pRpcMsg);
|
SRpcMsg *pMsg = NULL;
|
||||||
|
taosGetQitem(qall, (void **)&pMsg);
|
||||||
void *ptr = taosArrayPush(pArray, pRpcMsg);
|
void *ptr = taosArrayPush(pArray, &pMsg);
|
||||||
assert(ptr != NULL);
|
assert(ptr != NULL);
|
||||||
|
|
||||||
taosFreeQitem(pRpcMsg);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
vnodeProcessWMsgs(pVnode->pImpl, pArray);
|
vnodeProcessWMsgs(pVnode->pImpl, pArray);
|
||||||
|
|
||||||
|
for (size_t i = 0; i < numOfMsgs; i++) {
|
||||||
|
SRpcMsg *pRsp = NULL;
|
||||||
|
SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
|
||||||
|
int32_t code = vnodeApplyWMsg(pVnode->pImpl, pMsg, &pRsp);
|
||||||
|
if (pRsp != NULL) {
|
||||||
|
rpcSendResponse(pRsp);
|
||||||
|
free(pRsp);
|
||||||
|
} else {
|
||||||
|
if (code != 0) code = terrno;
|
||||||
|
SRpcMsg rpcRsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code};
|
||||||
|
rpcSendResponse(&rpcRsp);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (size_t i = 0; i < numOfMsgs; i++) {
|
||||||
|
SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
taosFreeQitem(pMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(pArray);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) {
|
static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) {
|
||||||
|
@ -876,6 +895,7 @@ static int32_t dndWriteRpcMsgToVnodeQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) {
|
||||||
|
|
||||||
static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) {
|
static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) {
|
||||||
SMsgHead *pHead = (SMsgHead *)pMsg->pCont;
|
SMsgHead *pHead = (SMsgHead *)pMsg->pCont;
|
||||||
|
pHead->contLen = htonl(pHead->contLen);
|
||||||
pHead->vgId = htonl(pHead->vgId);
|
pHead->vgId = htonl(pHead->vgId);
|
||||||
|
|
||||||
SVnodeObj *pVnode = dndAcquireVnode(pDnode, pHead->vgId);
|
SVnodeObj *pVnode = dndAcquireVnode(pDnode, pHead->vgId);
|
||||||
|
|
|
@ -270,8 +270,6 @@ TEST_F(DndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
|
||||||
SRpcMsg* pMsg = pClient->pRsp;
|
SRpcMsg* pMsg = pClient->pRsp;
|
||||||
ASSERT_NE(pMsg, nullptr);
|
ASSERT_NE(pMsg, nullptr);
|
||||||
ASSERT_EQ(pMsg->code, 0);
|
ASSERT_EQ(pMsg->code, 0);
|
||||||
|
|
||||||
taosMsleep(100000);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_STB, "show stables", 4, "1.d1");
|
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_STB, "show stables", 4, "1.d1");
|
||||||
|
|
|
@ -344,7 +344,7 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
||||||
STransAction action = {0};
|
STransAction action = {0};
|
||||||
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||||
action.pCont = pMsg;
|
action.pCont = pMsg;
|
||||||
action.contLen = sizeof(SCreateStbInternalMsg);
|
action.contLen = htonl(pMsg->head.contLen);
|
||||||
action.msgType = TSDB_MSG_TYPE_CREATE_STB_IN;
|
action.msgType = TSDB_MSG_TYPE_CREATE_STB_IN;
|
||||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
free(pMsg);
|
free(pMsg);
|
||||||
|
|
|
@ -70,6 +70,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
vnodeParseReq(pMsg->pCont, &vReq, pMsg->msgType);
|
vnodeParseReq(pMsg->pCont, &vReq, pMsg->msgType);
|
||||||
|
|
||||||
switch (pMsg->msgType) {
|
switch (pMsg->msgType) {
|
||||||
|
case TSDB_MSG_TYPE_CREATE_STB_IN:
|
||||||
case TSDB_MSG_TYPE_CREATE_TABLE:
|
case TSDB_MSG_TYPE_CREATE_TABLE:
|
||||||
if (metaCreateTable(pVnode->pMeta, &(vReq.ctReq)) < 0) {
|
if (metaCreateTable(pVnode->pMeta, &(vReq.ctReq)) < 0) {
|
||||||
// TODO: handle error
|
// TODO: handle error
|
||||||
|
@ -77,6 +78,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
|
|
||||||
// TODO: maybe need to clear the requst struct
|
// TODO: maybe need to clear the requst struct
|
||||||
break;
|
break;
|
||||||
|
case TSDB_MSG_TYPE_DROP_STB_IN:
|
||||||
case TSDB_MSG_TYPE_DROP_TABLE:
|
case TSDB_MSG_TYPE_DROP_TABLE:
|
||||||
if (metaDropTable(pVnode->pMeta, vReq.dtReq.uid) < 0) {
|
if (metaDropTable(pVnode->pMeta, vReq.dtReq.uid) < 0) {
|
||||||
// TODO: handle error
|
// TODO: handle error
|
||||||
|
|
Loading…
Reference in New Issue