diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index aef7d5711f..8229cae281 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -94,22 +94,22 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" ) // message from vnode to dnode // message from mnode to vnode -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STB_IN, "create-stb-in" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STB_IN, "alter-stb-in" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STB_IN, "drop-stb-in" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STB_IN, "create-stb-internal" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STB_IN, "alter-stb-internal" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STB_IN, "drop-stb-internal" ) // message from mnode to mnode // message from mnode to qnode // message from mnode to dnode -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_VNODE_IN, "create-vnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_VNODE_IN, "alter-vnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_VNODE_IN, "drop-vnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_AUTH_VNODE_IN, "auth-vnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SYNC_VNODE_IN, "sync-vnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_COMPACT_VNODE_IN, "compact-vnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_MNODE_IN, "create-mnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_MNODE_IN, "alter-mnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_MNODE_IN, "drop-mnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONFIG_DNODE_IN, "config-dnode" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_VNODE_IN, "create-vnode-internal" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_VNODE_IN, "alter-vnode-internal" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_VNODE_IN, "drop-vnode-internal" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_AUTH_VNODE_IN, "auth-vnode-internal" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SYNC_VNODE_IN, "sync-vnode-internal" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_COMPACT_VNODE_IN, "compact-vnode-internal" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_MNODE_IN, "create-mnode-internal" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_MNODE_IN, "alter-mnode-internal" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_MNODE_IN, "drop-mnode-internal" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONFIG_DNODE_IN, "config-dnode-internal" ) // message from qnode to vnode // message from qnode to mnode @@ -290,37 +290,6 @@ typedef struct SSchema { char name[TSDB_COL_NAME_LEN]; } SSchema; -typedef struct { - int32_t contLen; - int32_t vgId; - int8_t tableType; - int16_t numOfColumns; - int16_t numOfTags; - int32_t tid; - int32_t sversion; - int32_t tversion; - int32_t tagDataLen; - int32_t sqlDataLen; - uint64_t uid; - uint64_t superTableUid; - uint64_t createdTime; - char tableFname[TSDB_TABLE_FNAME_LEN]; - char stbFname[TSDB_TABLE_FNAME_LEN]; - char data[]; -} SMDCreateTableMsg; - -typedef struct { - int32_t len; // one create table message - char tableName[TSDB_TABLE_FNAME_LEN]; - int8_t igExists; - int8_t getMeta; - int16_t numOfTags; - int16_t numOfColumns; - int16_t sqlLen; // the length of SQL, it starts after schema , sql is a null-terminated string - int8_t reserved[16]; - char schema[]; -} SCreateTableMsg; - typedef struct { char name[TSDB_TABLE_FNAME_LEN]; int8_t igExists; @@ -341,16 +310,50 @@ typedef struct { } SAlterStbMsg; typedef struct { - char tableFname[TSDB_TABLE_FNAME_LEN]; - char db[TSDB_FULL_DB_NAME_LEN]; - int16_t type; /* operation type */ - int16_t numOfCols; /* number of schema */ - int32_t tagValLen; - SSchema schema[]; - // tagVal is padded after schema - // char tagVal[]; + SMsgHead head; + char name[TSDB_TABLE_FNAME_LEN]; + uint64_t suid; + int32_t sverson; + uint32_t ttl; + uint32_t keep; + int32_t numOfTags; + int32_t numOfColumns; + SSchema pSchema[]; +} SCreateStbInternalMsg; + +typedef struct { + SMsgHead head; + char name[TSDB_TABLE_FNAME_LEN]; + uint64_t suid; +} SDropStbInternalMsg; + +typedef struct { + SMsgHead head; + char name[TSDB_TABLE_FNAME_LEN]; + char stbFname[TSDB_TABLE_FNAME_LEN]; + int8_t tableType; + uint64_t suid; + int32_t sversion; + int32_t numOfTags; + int32_t numOfColumns; + int32_t tagDataLen; + char data[]; +} SCreateTableMsg; + +typedef struct { + SMsgHead head; + char name[TSDB_TABLE_FNAME_LEN]; + int8_t type; /* operation type */ + int32_t numOfCols; /* number of schema */ + int32_t numOfTags; + char data[]; } SAlterTableMsg; +typedef struct { + SMsgHead head; + char name[TSDB_TABLE_FNAME_LEN]; +} SDropTableMsg; + typedef struct { SMsgHead head; int64_t uid; diff --git a/include/dnode/vnode/vnode.h b/include/dnode/vnode/vnode.h index 44feac87ab..d51ac48f01 100644 --- a/include/dnode/vnode/vnode.h +++ b/include/dnode/vnode/vnode.h @@ -132,6 +132,36 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); */ int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +/** + * @brief Process a query message. + * + * @param pVnode The vnode object. + * @param pMsg The request message + * @param pRsp The response message + * @return int 0 for success, -1 for failure + */ +int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); + +/** + * @brief Process a fetch message. + * + * @param pVnode The vnode object. + * @param pMsg The request message + * @param pRsp The response message + * @return int 0 for success, -1 for failure + */ +int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); + +/** + * @brief Process a consume message. + * + * @param pVnode The vnode object. + * @param pMsg The request message + * @param pRsp The response message + * @return int 0 for success, -1 for failure + */ +int vnodeProcessConsumeReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); + /* ------------------------ SVnodeCfg ------------------------ */ /** * @brief Initialize VNODE options. @@ -211,59 +241,11 @@ void *vnodeParseReq(void *buf, SVnodeReq *pReq, uint8_t type); /* ------------------------ FOR COMPILE ------------------------ */ -#if 1 - -#include "taosmsg.h" -#include "trpc.h" - -// typedef struct { -// char db[TSDB_FULL_DB_NAME_LEN]; -// int32_t cacheBlockSize; // MB -// int32_t totalBlocks; -// int32_t daysPerFile; -// int32_t daysToKeep0; -// int32_t daysToKeep1; -// int32_t daysToKeep2; -// int32_t minRows; -// int32_t maxRows; -// int8_t precision; // time resolution -// int8_t compression; -// int8_t cacheLastRow; -// int8_t update; -// int8_t quorum; -// int8_t replica; -// int8_t selfIndex; -// int8_t walLevel; -// int32_t fsyncPeriod; // millisecond -// SReplica replicas[TSDB_MAX_REPLICA]; -// } SVnodeCfg; - -typedef enum { - VN_MSG_TYPE_WRITE = 1, - VN_MSG_TYPE_APPLY, - VN_MSG_TYPE_SYNC, - VN_MSG_TYPE_QUERY, - VN_MSG_TYPE_FETCH -} EVnMsgType; - -typedef struct { - int32_t curNum; - int32_t allocNum; - SRpcMsg rpcMsg[]; -} SVnodeMsg; - int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg); int32_t vnodeCompact(SVnode *pVnode); int32_t vnodeSync(SVnode *pVnode); int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); -SVnodeMsg *vnodeInitMsg(int32_t msgNum); -int32_t vnodeAppendMsg(SVnodeMsg *pMsg, SRpcMsg *pRpcMsg); -void vnodeCleanupMsg(SVnodeMsg *pMsg); -void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg, EVnMsgType msgType); - -#endif - #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index cc4fc5b7d7..598d6e47be 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -72,8 +72,8 @@ static void dndFreeVnodeWriteQueue(SDnode *pDnode, SVnodeObj *pVnode); static void dndFreeVnodeApplyQueue(SDnode *pDnode, SVnodeObj *pVnode); static void dndFreeVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode); -static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SVnodeMsg *pMsg); -static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SVnodeMsg *pMsg); +static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg); +static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg); static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs); static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs); static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs); @@ -83,7 +83,7 @@ void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pE void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SVnodeMsg *pMsg); +static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg); static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId); static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode); @@ -802,40 +802,70 @@ static void dndProcessVnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) { taosFreeQitem(pMsg); } -static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SVnodeMsg *pMsg) { - vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_QUERY); +static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) { + SRpcMsg *pRsp = NULL; + vnodeProcessQueryReq(pVnode->pImpl, pMsg, &pRsp); } -static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SVnodeMsg *pMsg) { - vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_FETCH); +static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) { + SRpcMsg *pRsp = NULL; + vnodeProcessFetchReq(pVnode->pImpl, pMsg, &pRsp); } static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) { - SVnodeMsg *pMsg = vnodeInitMsg(numOfMsgs); - SRpcMsg *pRpcMsg = NULL; + SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *)); for (int32_t i = 0; i < numOfMsgs; ++i) { - taosGetQitem(qall, (void **)&pRpcMsg); - vnodeAppendMsg(pMsg, pRpcMsg); - taosFreeQitem(pRpcMsg); + SRpcMsg *pMsg = NULL; + taosGetQitem(qall, (void **)&pMsg); + void *ptr = taosArrayPush(pArray, &pMsg); + assert(ptr != NULL); } - vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_WRITE); + vnodeProcessWMsgs(pVnode->pImpl, pArray); + + for (size_t i = 0; i < numOfMsgs; i++) { + SRpcMsg *pRsp = NULL; + SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i); + int32_t code = vnodeApplyWMsg(pVnode->pImpl, pMsg, &pRsp); + if (pRsp != NULL) { + rpcSendResponse(pRsp); + free(pRsp); + } else { + if (code != 0) code = terrno; + SRpcMsg rpcRsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code}; + rpcSendResponse(&rpcRsp); + } + } + + for (size_t i = 0; i < numOfMsgs; i++) { + SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); + } + + taosArrayDestroy(pArray); } static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) { - SVnodeMsg *pMsg = NULL; + SRpcMsg *pMsg = NULL; + for (int32_t i = 0; i < numOfMsgs; ++i) { taosGetQitem(qall, (void **)&pMsg); - vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_APPLY); + + SRpcMsg *pRsp = NULL; + (void)vnodeApplyWMsg(pVnode->pImpl, pMsg, &pRsp); } } static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) { - SVnodeMsg *pMsg = NULL; + SRpcMsg *pMsg = NULL; + for (int32_t i = 0; i < numOfMsgs; ++i) { taosGetQitem(qall, (void **)&pMsg); - vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_SYNC); + + SRpcMsg *pRsp = NULL; + (void)vnodeProcessSyncReq(pVnode->pImpl, pMsg, &pRsp); } } @@ -863,40 +893,14 @@ static int32_t dndWriteRpcMsgToVnodeQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) { } } -static int32_t dndWriteVnodeMsgToVnodeQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) { - int32_t code = 0; - - if (pQueue == NULL) { - code = TSDB_CODE_MSG_NOT_PROCESSED; - } else { - SVnodeMsg *pMsg = vnodeInitMsg(1); - if (pMsg == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - } else { - if (vnodeAppendMsg(pMsg, pRpcMsg) != 0) { - code = terrno; - } else { - if (taosWriteQitem(pQueue, pMsg) != 0) { - code = TSDB_CODE_OUT_OF_MEMORY; - } - } - } - } - - if (code != TSDB_CODE_SUCCESS) { - SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code}; - rpcSendResponse(&rsp); - rpcFreeCont(pRpcMsg->pCont); - } -} - static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) { SMsgHead *pHead = (SMsgHead *)pMsg->pCont; + pHead->contLen = htonl(pHead->contLen); pHead->vgId = htonl(pHead->vgId); SVnodeObj *pVnode = dndAcquireVnode(pDnode, pHead->vgId); if (pVnode == NULL) { - SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno}; + SRpcMsg rsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID}; rpcSendResponse(&rsp); rpcFreeCont(pMsg->pCont); } @@ -920,7 +924,7 @@ void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg); if (pVnode != NULL) { - dndWriteVnodeMsgToVnodeQueue(pVnode->pSyncQ, pMsg); + dndWriteRpcMsgToVnodeQueue(pVnode->pSyncQ, pMsg); dndReleaseVnode(pDnode, pVnode); } } @@ -928,7 +932,7 @@ void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg); if (pVnode != NULL) { - dndWriteVnodeMsgToVnodeQueue(pVnode->pQueryQ, pMsg); + dndWriteRpcMsgToVnodeQueue(pVnode->pQueryQ, pMsg); dndReleaseVnode(pDnode, pVnode); } } @@ -936,12 +940,12 @@ void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg); if (pVnode != NULL) { - dndWriteVnodeMsgToVnodeQueue(pVnode->pFetchQ, pMsg); + dndWriteRpcMsgToVnodeQueue(pVnode->pFetchQ, pMsg); dndReleaseVnode(pDnode, pVnode); } } -static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SVnodeMsg *pMsg) { +static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg) { SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId); if (pVnode == NULL) { return -1; @@ -1106,7 +1110,7 @@ static void dndCleanupVnodeWriteWorker(SDnode *pDnode) { static int32_t dndAllocVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; - pVnode->pSyncQ = tMWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeSyncQueue); + pVnode->pSyncQ = tMWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FProcessItems)dndProcessVnodeSyncQueue); if (pVnode->pSyncQ == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -1117,7 +1121,7 @@ static int32_t dndAllocVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode) { static void dndFreeVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; - tMWorkerFreeQueue(&pMgmt->writePool, pVnode->pSyncQ); + tMWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ); pVnode->pSyncQ = NULL; } diff --git a/source/dnode/mgmt/impl/test/sut/deploy.h b/source/dnode/mgmt/impl/test/sut/deploy.h index 4c082df5f3..88d9b06fbb 100644 --- a/source/dnode/mgmt/impl/test/sut/deploy.h +++ b/source/dnode/mgmt/impl/test/sut/deploy.h @@ -46,3 +46,4 @@ void stopServer(SServer* pServer); SClient* createClient(const char* user, const char* pass, const char* fqdn, uint16_t port); void dropClient(SClient* pClient); void sendMsg(SClient* pClient, SRpcMsg* pMsg); + diff --git a/source/dnode/mnode/impl/inc/mndVgroup.h b/source/dnode/mnode/impl/inc/mndVgroup.h index e9cdedd332..8a3a2c798a 100644 --- a/source/dnode/mnode/impl/inc/mndVgroup.h +++ b/source/dnode/mnode/impl/inc/mndVgroup.h @@ -26,8 +26,9 @@ int32_t mndInitVgroup(SMnode *pMnode); void mndCleanupVgroup(SMnode *pMnode); SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId); void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup); -int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups); SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup); +int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups); +SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup); SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup); SDropVnodeMsg *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup); diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 61a6cf7833..a51312a2a9 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -21,6 +21,7 @@ #include "mndShow.h" #include "mndTrans.h" #include "mndUser.h" +#include "mndVgroup.h" #include "tname.h" #define TSDB_STB_VER_NUMBER 1 @@ -199,7 +200,54 @@ static SDbObj *mndAcquireDbByStb(SMnode *pMnode, char *stbName) { return mndAcquireDb(pMnode, db); } -static int32_t mndCheckStbMsg(SCreateStbMsg *pCreate) { +static SCreateStbInternalMsg *mndBuildCreateStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb) { + int32_t totalCols = pStb->numOfTags + pStb->numOfColumns; + int32_t contLen = totalCols * sizeof(SSchema) + sizeof(SCreateStbInternalMsg); + + SCreateStbInternalMsg *pCreate = calloc(1, contLen); + if (pCreate == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pCreate->head.contLen = htonl(contLen); + pCreate->head.vgId = htonl(pVgroup->vgId); + memcpy(pCreate->name, pStb->name, TSDB_TABLE_FNAME_LEN); + pCreate->suid = htobe64(pStb->uid); + pCreate->sverson = htonl(pStb->version); + pCreate->ttl = 0; + pCreate->keep = 0; + pCreate->numOfTags = htonl(pStb->numOfTags); + pCreate->numOfColumns = htonl(pStb->numOfColumns); + + memcpy(pCreate->pSchema, pStb->pSchema, totalCols * sizeof(SSchema)); + for (int32_t t = 0; t < totalCols; ++t) { + SSchema *pSchema = &pCreate->pSchema[t]; + pSchema->bytes = htonl(pSchema->bytes); + pSchema->colId = htonl(pSchema->colId); + } + + return pCreate; +} + +static SDropStbInternalMsg *mndBuildDropStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb) { + int32_t contLen = sizeof(SDropStbInternalMsg); + + SDropStbInternalMsg *pDrop = calloc(1, contLen); + if (pDrop == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pDrop->head.contLen = htonl(contLen); + pDrop->head.vgId = htonl(pVgroup->vgId); + memcpy(pDrop->name, pStb->name, TSDB_TABLE_FNAME_LEN); + pDrop->suid = htobe64(pStb->uid); + + return pDrop; +} + +static int32_t mndCheckCreateStbMsg(SCreateStbMsg *pCreate) { pCreate->numOfColumns = htonl(pCreate->numOfColumns); pCreate->numOfTags = htonl(pCreate->numOfTags); int32_t totalCols = pCreate->numOfColumns + pCreate->numOfTags; @@ -248,6 +296,103 @@ static int32_t mndCheckStbMsg(SCreateStbMsg *pCreate) { return 0; } +static int32_t mndSetCreateStbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { + SSdbRaw *pRedoRaw = mndStbActionEncode(pStb); + if (pRedoRaw == NULL) return -1; + if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; + if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) return -1; + + return 0; +} + +static int32_t mndSetCreateStbUndoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { + SSdbRaw *pUndoRaw = mndStbActionEncode(pStb); + if (pUndoRaw == NULL) return -1; + if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1; + if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) return -1; + + return 0; +} + +static int32_t mndSetCreateStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { + SSdbRaw *pCommitRaw = mndStbActionEncode(pStb); + if (pCommitRaw == NULL) return -1; + if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; + if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1; + + return 0; +} + +static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { + SSdb *pSdb = pMnode->pSdb; + SVgObj *pVgroup = NULL; + void *pIter = NULL; + + while (1) { + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; + if (pVgroup->dbUid != pDb->uid) continue; + + SCreateStbInternalMsg *pMsg = mndBuildCreateStbMsg(pMnode, pVgroup, pStb); + if (pMsg == NULL) { + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pVgroup); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + STransAction action = {0}; + action.epSet = mndGetVgroupEpset(pMnode, pVgroup); + action.pCont = pMsg; + action.contLen = htonl(pMsg->head.contLen); + action.msgType = TSDB_MSG_TYPE_CREATE_STB_IN; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + free(pMsg); + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pVgroup); + return -1; + } + sdbRelease(pSdb, pVgroup); + } + + return 0; +} + +static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { + SSdb *pSdb = pMnode->pSdb; + SVgObj *pVgroup = NULL; + void *pIter = NULL; + + while (1) { + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; + if (pVgroup->dbUid != pDb->uid) continue; + + SDropStbInternalMsg *pMsg = mndBuildDropStbMsg(pMnode, pVgroup, pStb); + if (pMsg == NULL) { + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pVgroup); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + STransAction action = {0}; + action.epSet = mndGetVgroupEpset(pMnode, pVgroup); + action.pCont = pMsg; + action.contLen = sizeof(SDropStbInternalMsg); + action.msgType = TSDB_MSG_TYPE_DROP_STB_IN; + if (mndTransAppendUndoAction(pTrans, &action) != 0) { + free(pMsg); + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pVgroup); + return -1; + } + sdbRelease(pSdb, pVgroup); + } + + return 0; +} + static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateStbMsg *pCreate, SDbObj *pDb) { SStbObj stbObj = {0}; tstrncpy(stbObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN); @@ -269,6 +414,7 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateStbMsg *pCre } memcpy(stbObj.pSchema, pCreate->pSchema, totalSize); + int32_t code = 0; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); if (pTrans == NULL) { mError("stb:%s, failed to create since %s", pCreate->name, terrstr()); @@ -276,29 +422,30 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateStbMsg *pCre } mDebug("trans:%d, used to create stb:%s", pTrans->id, pCreate->name); - SSdbRaw *pRedoRaw = mndStbActionEncode(&stbObj); - if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { - mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) { + mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); + goto CREATE_STB_OVER; } - sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING); - SSdbRaw *pUndoRaw = mndStbActionEncode(&stbObj); - if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) { - mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + if (mndSetCreateStbUndoLogs(pMnode, pTrans, pDb, &stbObj) != 0) { + mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr()); + goto CREATE_STB_OVER; } - sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED); - SSdbRaw *pCommitRaw = mndStbActionEncode(&stbObj); - if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { - mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + if (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, &stbObj) != 0) { + mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); + goto CREATE_STB_OVER; + } + + if (mndSetCreateStbRedoActions(pMnode, pTrans, pDb, &stbObj) != 0) { + mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); + goto CREATE_STB_OVER; + } + + if (mndSetCreateStbUndoActions(pMnode, pTrans, pDb, &stbObj) != 0) { + mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); + goto CREATE_STB_OVER; } - sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); @@ -306,8 +453,11 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateStbMsg *pCre return -1; } + code = 0; + +CREATE_STB_OVER: mndTransDrop(pTrans); - return 0; + return code; } static int32_t mndProcessCreateStbMsg(SMnodeMsg *pMsg) { @@ -316,7 +466,7 @@ static int32_t mndProcessCreateStbMsg(SMnodeMsg *pMsg) { mDebug("stb:%s, start to create", pCreate->name); - if (mndCheckStbMsg(pCreate) != 0) { + if (mndCheckCreateStbMsg(pCreate) != 0) { mError("stb:%s, failed to create since %s", pCreate->name, terrstr()); return -1; } @@ -353,7 +503,10 @@ static int32_t mndProcessCreateStbMsg(SMnodeMsg *pMsg) { return TSDB_CODE_MND_ACTION_IN_PROGRESS; } -static int32_t mndProcessCreateStbInRsp(SMnodeMsg *pMsg) { return 0; } +static int32_t mndProcessCreateStbInRsp(SMnodeMsg *pMsg) { + mndTransHandleActionRsp(pMsg); + return 0; +} static int32_t mndCheckAlterStbMsg(SAlterStbMsg *pAlter) { SSchema *pSchema = &pAlter->schema; @@ -414,9 +567,44 @@ static int32_t mndProcessAlterStbMsg(SMnodeMsg *pMsg) { return TSDB_CODE_MND_ACTION_IN_PROGRESS; } -static int32_t mndProcessAlterStbInRsp(SMnodeMsg *pMsg) { return 0; } +static int32_t mndProcessAlterStbInRsp(SMnodeMsg *pMsg) { + mndTransHandleActionRsp(pMsg); + return 0; +} + +static int32_t mndSetDropStbRedoLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) { + SSdbRaw *pRedoRaw = mndStbActionEncode(pStb); + if (pRedoRaw == NULL) return -1; + if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; + if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1; + + return 0; +} + +static int32_t mndSetDropStbUndoLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) { + SSdbRaw *pUndoRaw = mndStbActionEncode(pStb); + if (pUndoRaw == NULL) return -1; + if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1; + if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY) != 0) return -1; + + return 0; +} + +static int32_t mndSetDropStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) { + SSdbRaw *pCommitRaw = mndStbActionEncode(pStb); + if (pCommitRaw == NULL) return -1; + if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; + if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1; + + return 0; +} + +static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) { return 0; } + +static int32_t mndSetDropStbUndoActions(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) { return 0; } static int32_t mndDropStb(SMnode *pMnode, SMnodeMsg *pMsg, SStbObj *pStb) { + int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); if (pTrans == NULL) { mError("stb:%s, failed to drop since %s", pStb->name, terrstr()); @@ -424,36 +612,39 @@ static int32_t mndDropStb(SMnode *pMnode, SMnodeMsg *pMsg, SStbObj *pStb) { } mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name); - SSdbRaw *pRedoRaw = mndStbActionEncode(pStb); - if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { - mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) { + mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); + goto DROP_STB_OVER; } - sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING); - SSdbRaw *pUndoRaw = mndStbActionEncode(pStb); - if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) { - mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + if (mndSetDropStbUndoLogs(pMnode, pTrans, pStb) != 0) { + mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr()); + goto DROP_STB_OVER; } - sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY); - SSdbRaw *pCommitRaw = mndStbActionEncode(pStb); - if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { - mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) { + mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); + goto DROP_STB_OVER; + } + + if (mndSetDropStbRedoActions(pMnode, pTrans, pStb) != 0) { + mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); + goto DROP_STB_OVER; + } + + if (mndSetDropStbUndoActions(pMnode, pTrans, pStb) != 0) { + mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); + goto DROP_STB_OVER; } - sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED); if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + goto DROP_STB_OVER; } + code = 0; + +DROP_STB_OVER: mndTransDrop(pTrans); return 0; } @@ -488,7 +679,10 @@ static int32_t mndProcessDropStbMsg(SMnodeMsg *pMsg) { return TSDB_CODE_MND_ACTION_IN_PROGRESS; } -static int32_t mndProcessDropStbInRsp(SMnodeMsg *pMsg) { return 0; } +static int32_t mndProcessDropStbInRsp(SMnodeMsg *pMsg) { + mndTransHandleActionRsp(pMsg); + return 0; +} static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index b84ee5ec77..54cd6ab501 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -465,11 +465,15 @@ static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) { } int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) { - return mndTransAppendAction(pTrans->redoActions, pAction); + int32_t code = mndTransAppendAction(pTrans->redoActions, pAction); + mTrace("trans:%d, msg:%s append to redo actions, code:0x%x", pTrans->id, taosMsg[pAction->msgType], code); + return code; } int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction) { - return mndTransAppendAction(pTrans->undoActions, pAction); + int32_t code = mndTransAppendAction(pTrans->undoActions, pAction); + mTrace("trans:%d, msg:%s append to undo actions, code:0x%x", pTrans->id, taosMsg[pAction->msgType], code); + return code; } int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { @@ -504,7 +508,6 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { return -1; } - mDebug("trans:%d, prepare finished", pNewTrans->id); pNewTrans->rpcHandle = pTrans->rpcHandle; mndTransExecute(pMnode, pNewTrans); mndReleaseTrans(pMnode, pNewTrans); diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index eeca8c9546..c65436efcb 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -311,6 +311,27 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) { return 0; } +SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup) { + SEpSet epset = {0}; + + for (int32_t v = 0; v < pVgroup->replica; ++v) { + SVnodeGid *pVgid = &pVgroup->vnodeGid[v]; + SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); + if (pDnode == NULL) continue; + + if (pVgid->role == TAOS_SYNC_STATE_LEADER) { + epset.inUse = epset.numOfEps; + } + + epset.port[epset.numOfEps] = pDnode->port; + memcpy(&epset.fqdn[epset.numOfEps], pDnode->fqdn, TSDB_FQDN_LEN); + epset.numOfEps++; + mndReleaseDnode(pMnode, pDnode); + } + + return epset; +} + static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pMsg) { mndTransHandleActionRsp(pMsg); return 0; diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c index f63f7236fb..1c8b82fd92 100644 --- a/source/dnode/mnode/sdb/src/sdbHash.c +++ b/source/dnode/mnode/sdb/src/sdbHash.c @@ -240,6 +240,8 @@ void sdbRelease(SSdb *pSdb, void *pObj) { } void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) { + *ppObj = NULL; + SHashObj *hash = sdbGetHash(pSdb, type); if (hash == NULL) return NULL; diff --git a/source/dnode/vnode/impl/src/vnodeInt.c b/source/dnode/vnode/impl/src/vnodeInt.c index 8a6fc8bf5e..5deaffe6d2 100644 --- a/source/dnode/vnode/impl/src/vnodeInt.c +++ b/source/dnode/vnode/impl/src/vnodeInt.c @@ -15,53 +15,31 @@ #define _DEFAULT_SOURCE #include "vnodeInt.h" -#include "tqueue.h" int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg) { return 0; } -SVnode *vnodeCreate(int32_t vgId, const char *path, const SVnodeCfg *pCfg) { return NULL; } -void vnodeDrop(SVnode *pVnode) {} + int32_t vnodeCompact(SVnode *pVnode) { return 0; } + int32_t vnodeSync(SVnode *pVnode) { return 0; } int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { return 0; } -SVnodeMsg *vnodeInitMsg(int32_t msgNum) { - SVnodeMsg *pMsg = taosAllocateQitem(msgNum * sizeof(SRpcMsg *) + sizeof(SVnodeMsg)); - if (pMsg == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } else { - pMsg->allocNum = msgNum; - return pMsg; - } +int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { + vInfo("query message is processed"); + return 0; } -int32_t vnodeAppendMsg(SVnodeMsg *pMsg, SRpcMsg *pRpcMsg) { - if (pMsg->curNum >= pMsg->allocNum) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - pMsg->rpcMsg[pMsg->curNum++] = *pRpcMsg; +int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { + vInfo("fetch message is processed"); + return 0; } -void vnodeCleanupMsg(SVnodeMsg *pMsg) { - for (int32_t i = 0; i < pMsg->curNum; ++i) { - rpcFreeCont(pMsg->rpcMsg[i].pCont); - } - taosFreeQitem(pMsg); +int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { + vInfo("sync message is processed"); + return 0; } -void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg, EVnMsgType msgType) { - switch (msgType) { - case VN_MSG_TYPE_WRITE: - break; - case VN_MSG_TYPE_APPLY: - break; - case VN_MSG_TYPE_SYNC: - break; - case VN_MSG_TYPE_QUERY: - break; - case VN_MSG_TYPE_FETCH: - break; - } -} \ No newline at end of file +int vnodeProcessConsumeReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { + vInfo("consume message is processed"); + return 0; +} diff --git a/source/dnode/vnode/impl/src/vnodeWrite.c b/source/dnode/vnode/impl/src/vnodeWrite.c index f564afd613..5ec03f1fd3 100644 --- a/source/dnode/vnode/impl/src/vnodeWrite.c +++ b/source/dnode/vnode/impl/src/vnodeWrite.c @@ -70,6 +70,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { vnodeParseReq(pMsg->pCont, &vReq, pMsg->msgType); switch (pMsg->msgType) { + case TSDB_MSG_TYPE_CREATE_STB_IN: case TSDB_MSG_TYPE_CREATE_TABLE: if (metaCreateTable(pVnode->pMeta, &(vReq.ctReq)) < 0) { // TODO: handle error @@ -77,6 +78,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { // TODO: maybe need to clear the requst struct break; + case TSDB_MSG_TYPE_DROP_STB_IN: case TSDB_MSG_TYPE_DROP_TABLE: if (metaDropTable(pVnode->pMeta, vReq.dtReq.uid) < 0) { // TODO: handle error