commit
26a3ff9b0f
|
@ -94,22 +94,22 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" )
|
|||
// message from vnode to dnode
|
||||
|
||||
// message from mnode to vnode
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STB_IN, "create-stb-in" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STB_IN, "alter-stb-in" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STB_IN, "drop-stb-in" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STB_IN, "create-stb-internal" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STB_IN, "alter-stb-internal" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STB_IN, "drop-stb-internal" )
|
||||
// message from mnode to mnode
|
||||
// message from mnode to qnode
|
||||
// message from mnode to dnode
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_VNODE_IN, "create-vnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_VNODE_IN, "alter-vnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_VNODE_IN, "drop-vnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_AUTH_VNODE_IN, "auth-vnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SYNC_VNODE_IN, "sync-vnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_COMPACT_VNODE_IN, "compact-vnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_MNODE_IN, "create-mnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_MNODE_IN, "alter-mnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_MNODE_IN, "drop-mnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONFIG_DNODE_IN, "config-dnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_VNODE_IN, "create-vnode-internal" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_VNODE_IN, "alter-vnode-internal" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_VNODE_IN, "drop-vnode-internal" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_AUTH_VNODE_IN, "auth-vnode-internal" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SYNC_VNODE_IN, "sync-vnode-internal" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_COMPACT_VNODE_IN, "compact-vnode-internal" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_MNODE_IN, "create-mnode-internal" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_MNODE_IN, "alter-mnode-internal" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_MNODE_IN, "drop-mnode-internal" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONFIG_DNODE_IN, "config-dnode-internal" )
|
||||
|
||||
// message from qnode to vnode
|
||||
// message from qnode to mnode
|
||||
|
@ -290,37 +290,6 @@ typedef struct SSchema {
|
|||
char name[TSDB_COL_NAME_LEN];
|
||||
} SSchema;
|
||||
|
||||
typedef struct {
|
||||
int32_t contLen;
|
||||
int32_t vgId;
|
||||
int8_t tableType;
|
||||
int16_t numOfColumns;
|
||||
int16_t numOfTags;
|
||||
int32_t tid;
|
||||
int32_t sversion;
|
||||
int32_t tversion;
|
||||
int32_t tagDataLen;
|
||||
int32_t sqlDataLen;
|
||||
uint64_t uid;
|
||||
uint64_t superTableUid;
|
||||
uint64_t createdTime;
|
||||
char tableFname[TSDB_TABLE_FNAME_LEN];
|
||||
char stbFname[TSDB_TABLE_FNAME_LEN];
|
||||
char data[];
|
||||
} SMDCreateTableMsg;
|
||||
|
||||
typedef struct {
|
||||
int32_t len; // one create table message
|
||||
char tableName[TSDB_TABLE_FNAME_LEN];
|
||||
int8_t igExists;
|
||||
int8_t getMeta;
|
||||
int16_t numOfTags;
|
||||
int16_t numOfColumns;
|
||||
int16_t sqlLen; // the length of SQL, it starts after schema , sql is a null-terminated string
|
||||
int8_t reserved[16];
|
||||
char schema[];
|
||||
} SCreateTableMsg;
|
||||
|
||||
typedef struct {
|
||||
char name[TSDB_TABLE_FNAME_LEN];
|
||||
int8_t igExists;
|
||||
|
@ -341,16 +310,50 @@ typedef struct {
|
|||
} SAlterStbMsg;
|
||||
|
||||
typedef struct {
|
||||
char tableFname[TSDB_TABLE_FNAME_LEN];
|
||||
char db[TSDB_FULL_DB_NAME_LEN];
|
||||
int16_t type; /* operation type */
|
||||
int16_t numOfCols; /* number of schema */
|
||||
int32_t tagValLen;
|
||||
SSchema schema[];
|
||||
// tagVal is padded after schema
|
||||
// char tagVal[];
|
||||
SMsgHead head;
|
||||
char name[TSDB_TABLE_FNAME_LEN];
|
||||
uint64_t suid;
|
||||
int32_t sverson;
|
||||
uint32_t ttl;
|
||||
uint32_t keep;
|
||||
int32_t numOfTags;
|
||||
int32_t numOfColumns;
|
||||
SSchema pSchema[];
|
||||
} SCreateStbInternalMsg;
|
||||
|
||||
typedef struct {
|
||||
SMsgHead head;
|
||||
char name[TSDB_TABLE_FNAME_LEN];
|
||||
uint64_t suid;
|
||||
} SDropStbInternalMsg;
|
||||
|
||||
typedef struct {
|
||||
SMsgHead head;
|
||||
char name[TSDB_TABLE_FNAME_LEN];
|
||||
char stbFname[TSDB_TABLE_FNAME_LEN];
|
||||
int8_t tableType;
|
||||
uint64_t suid;
|
||||
int32_t sversion;
|
||||
int32_t numOfTags;
|
||||
int32_t numOfColumns;
|
||||
int32_t tagDataLen;
|
||||
char data[];
|
||||
} SCreateTableMsg;
|
||||
|
||||
typedef struct {
|
||||
SMsgHead head;
|
||||
char name[TSDB_TABLE_FNAME_LEN];
|
||||
int8_t type; /* operation type */
|
||||
int32_t numOfCols; /* number of schema */
|
||||
int32_t numOfTags;
|
||||
char data[];
|
||||
} SAlterTableMsg;
|
||||
|
||||
typedef struct {
|
||||
SMsgHead head;
|
||||
char name[TSDB_TABLE_FNAME_LEN];
|
||||
} SDropTableMsg;
|
||||
|
||||
typedef struct {
|
||||
SMsgHead head;
|
||||
int64_t uid;
|
||||
|
|
|
@ -132,6 +132,36 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
|||
*/
|
||||
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||
|
||||
/**
|
||||
* @brief Process a query message.
|
||||
*
|
||||
* @param pVnode The vnode object.
|
||||
* @param pMsg The request message
|
||||
* @param pRsp The response message
|
||||
* @return int 0 for success, -1 for failure
|
||||
*/
|
||||
int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||
|
||||
/**
|
||||
* @brief Process a fetch message.
|
||||
*
|
||||
* @param pVnode The vnode object.
|
||||
* @param pMsg The request message
|
||||
* @param pRsp The response message
|
||||
* @return int 0 for success, -1 for failure
|
||||
*/
|
||||
int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||
|
||||
/**
|
||||
* @brief Process a consume message.
|
||||
*
|
||||
* @param pVnode The vnode object.
|
||||
* @param pMsg The request message
|
||||
* @param pRsp The response message
|
||||
* @return int 0 for success, -1 for failure
|
||||
*/
|
||||
int vnodeProcessConsumeReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||
|
||||
/* ------------------------ SVnodeCfg ------------------------ */
|
||||
/**
|
||||
* @brief Initialize VNODE options.
|
||||
|
@ -211,59 +241,11 @@ void *vnodeParseReq(void *buf, SVnodeReq *pReq, uint8_t type);
|
|||
|
||||
/* ------------------------ FOR COMPILE ------------------------ */
|
||||
|
||||
#if 1
|
||||
|
||||
#include "taosmsg.h"
|
||||
#include "trpc.h"
|
||||
|
||||
// typedef struct {
|
||||
// char db[TSDB_FULL_DB_NAME_LEN];
|
||||
// int32_t cacheBlockSize; // MB
|
||||
// int32_t totalBlocks;
|
||||
// int32_t daysPerFile;
|
||||
// int32_t daysToKeep0;
|
||||
// int32_t daysToKeep1;
|
||||
// int32_t daysToKeep2;
|
||||
// int32_t minRows;
|
||||
// int32_t maxRows;
|
||||
// int8_t precision; // time resolution
|
||||
// int8_t compression;
|
||||
// int8_t cacheLastRow;
|
||||
// int8_t update;
|
||||
// int8_t quorum;
|
||||
// int8_t replica;
|
||||
// int8_t selfIndex;
|
||||
// int8_t walLevel;
|
||||
// int32_t fsyncPeriod; // millisecond
|
||||
// SReplica replicas[TSDB_MAX_REPLICA];
|
||||
// } SVnodeCfg;
|
||||
|
||||
typedef enum {
|
||||
VN_MSG_TYPE_WRITE = 1,
|
||||
VN_MSG_TYPE_APPLY,
|
||||
VN_MSG_TYPE_SYNC,
|
||||
VN_MSG_TYPE_QUERY,
|
||||
VN_MSG_TYPE_FETCH
|
||||
} EVnMsgType;
|
||||
|
||||
typedef struct {
|
||||
int32_t curNum;
|
||||
int32_t allocNum;
|
||||
SRpcMsg rpcMsg[];
|
||||
} SVnodeMsg;
|
||||
|
||||
int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg);
|
||||
int32_t vnodeCompact(SVnode *pVnode);
|
||||
int32_t vnodeSync(SVnode *pVnode);
|
||||
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
|
||||
|
||||
SVnodeMsg *vnodeInitMsg(int32_t msgNum);
|
||||
int32_t vnodeAppendMsg(SVnodeMsg *pMsg, SRpcMsg *pRpcMsg);
|
||||
void vnodeCleanupMsg(SVnodeMsg *pMsg);
|
||||
void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg, EVnMsgType msgType);
|
||||
|
||||
#endif
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -72,8 +72,8 @@ static void dndFreeVnodeWriteQueue(SDnode *pDnode, SVnodeObj *pVnode);
|
|||
static void dndFreeVnodeApplyQueue(SDnode *pDnode, SVnodeObj *pVnode);
|
||||
static void dndFreeVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode);
|
||||
|
||||
static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SVnodeMsg *pMsg);
|
||||
static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SVnodeMsg *pMsg);
|
||||
static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg);
|
||||
static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg);
|
||||
static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs);
|
||||
static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs);
|
||||
static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs);
|
||||
|
@ -83,7 +83,7 @@ void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pE
|
|||
void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||
void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||
void dndProcessVnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||
static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SVnodeMsg *pMsg);
|
||||
static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg);
|
||||
|
||||
static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId);
|
||||
static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode);
|
||||
|
@ -802,40 +802,70 @@ static void dndProcessVnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) {
|
|||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
||||
static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SVnodeMsg *pMsg) {
|
||||
vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_QUERY);
|
||||
static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) {
|
||||
SRpcMsg *pRsp = NULL;
|
||||
vnodeProcessQueryReq(pVnode->pImpl, pMsg, &pRsp);
|
||||
}
|
||||
|
||||
static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SVnodeMsg *pMsg) {
|
||||
vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_FETCH);
|
||||
static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) {
|
||||
SRpcMsg *pRsp = NULL;
|
||||
vnodeProcessFetchReq(pVnode->pImpl, pMsg, &pRsp);
|
||||
}
|
||||
|
||||
static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) {
|
||||
SVnodeMsg *pMsg = vnodeInitMsg(numOfMsgs);
|
||||
SRpcMsg *pRpcMsg = NULL;
|
||||
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *));
|
||||
|
||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
taosGetQitem(qall, (void **)&pRpcMsg);
|
||||
vnodeAppendMsg(pMsg, pRpcMsg);
|
||||
taosFreeQitem(pRpcMsg);
|
||||
SRpcMsg *pMsg = NULL;
|
||||
taosGetQitem(qall, (void **)&pMsg);
|
||||
void *ptr = taosArrayPush(pArray, &pMsg);
|
||||
assert(ptr != NULL);
|
||||
}
|
||||
|
||||
vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_WRITE);
|
||||
vnodeProcessWMsgs(pVnode->pImpl, pArray);
|
||||
|
||||
for (size_t i = 0; i < numOfMsgs; i++) {
|
||||
SRpcMsg *pRsp = NULL;
|
||||
SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
|
||||
int32_t code = vnodeApplyWMsg(pVnode->pImpl, pMsg, &pRsp);
|
||||
if (pRsp != NULL) {
|
||||
rpcSendResponse(pRsp);
|
||||
free(pRsp);
|
||||
} else {
|
||||
if (code != 0) code = terrno;
|
||||
SRpcMsg rpcRsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code};
|
||||
rpcSendResponse(&rpcRsp);
|
||||
}
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < numOfMsgs; i++) {
|
||||
SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pArray);
|
||||
}
|
||||
|
||||
static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) {
|
||||
SVnodeMsg *pMsg = NULL;
|
||||
SRpcMsg *pMsg = NULL;
|
||||
|
||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
taosGetQitem(qall, (void **)&pMsg);
|
||||
vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_APPLY);
|
||||
|
||||
SRpcMsg *pRsp = NULL;
|
||||
(void)vnodeApplyWMsg(pVnode->pImpl, pMsg, &pRsp);
|
||||
}
|
||||
}
|
||||
|
||||
static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) {
|
||||
SVnodeMsg *pMsg = NULL;
|
||||
SRpcMsg *pMsg = NULL;
|
||||
|
||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
taosGetQitem(qall, (void **)&pMsg);
|
||||
vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_SYNC);
|
||||
|
||||
SRpcMsg *pRsp = NULL;
|
||||
(void)vnodeProcessSyncReq(pVnode->pImpl, pMsg, &pRsp);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -863,40 +893,14 @@ static int32_t dndWriteRpcMsgToVnodeQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t dndWriteVnodeMsgToVnodeQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) {
|
||||
int32_t code = 0;
|
||||
|
||||
if (pQueue == NULL) {
|
||||
code = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||
} else {
|
||||
SVnodeMsg *pMsg = vnodeInitMsg(1);
|
||||
if (pMsg == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
} else {
|
||||
if (vnodeAppendMsg(pMsg, pRpcMsg) != 0) {
|
||||
code = terrno;
|
||||
} else {
|
||||
if (taosWriteQitem(pQueue, pMsg) != 0) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code};
|
||||
rpcSendResponse(&rsp);
|
||||
rpcFreeCont(pRpcMsg->pCont);
|
||||
}
|
||||
}
|
||||
|
||||
static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) {
|
||||
SMsgHead *pHead = (SMsgHead *)pMsg->pCont;
|
||||
pHead->contLen = htonl(pHead->contLen);
|
||||
pHead->vgId = htonl(pHead->vgId);
|
||||
|
||||
SVnodeObj *pVnode = dndAcquireVnode(pDnode, pHead->vgId);
|
||||
if (pVnode == NULL) {
|
||||
SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno};
|
||||
SRpcMsg rsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID};
|
||||
rpcSendResponse(&rsp);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
}
|
||||
|
@ -920,7 +924,7 @@ void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
|||
void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg);
|
||||
if (pVnode != NULL) {
|
||||
dndWriteVnodeMsgToVnodeQueue(pVnode->pSyncQ, pMsg);
|
||||
dndWriteRpcMsgToVnodeQueue(pVnode->pSyncQ, pMsg);
|
||||
dndReleaseVnode(pDnode, pVnode);
|
||||
}
|
||||
}
|
||||
|
@ -928,7 +932,7 @@ void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
|||
void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg);
|
||||
if (pVnode != NULL) {
|
||||
dndWriteVnodeMsgToVnodeQueue(pVnode->pQueryQ, pMsg);
|
||||
dndWriteRpcMsgToVnodeQueue(pVnode->pQueryQ, pMsg);
|
||||
dndReleaseVnode(pDnode, pVnode);
|
||||
}
|
||||
}
|
||||
|
@ -936,12 +940,12 @@ void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
|||
void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg);
|
||||
if (pVnode != NULL) {
|
||||
dndWriteVnodeMsgToVnodeQueue(pVnode->pFetchQ, pMsg);
|
||||
dndWriteRpcMsgToVnodeQueue(pVnode->pFetchQ, pMsg);
|
||||
dndReleaseVnode(pDnode, pVnode);
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SVnodeMsg *pMsg) {
|
||||
static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg) {
|
||||
SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId);
|
||||
if (pVnode == NULL) {
|
||||
return -1;
|
||||
|
@ -1106,7 +1110,7 @@ static void dndCleanupVnodeWriteWorker(SDnode *pDnode) {
|
|||
|
||||
static int32_t dndAllocVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode) {
|
||||
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
|
||||
pVnode->pSyncQ = tMWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeSyncQueue);
|
||||
pVnode->pSyncQ = tMWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FProcessItems)dndProcessVnodeSyncQueue);
|
||||
if (pVnode->pSyncQ == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
|
@ -1117,7 +1121,7 @@ static int32_t dndAllocVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode) {
|
|||
|
||||
static void dndFreeVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode) {
|
||||
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
|
||||
tMWorkerFreeQueue(&pMgmt->writePool, pVnode->pSyncQ);
|
||||
tMWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ);
|
||||
pVnode->pSyncQ = NULL;
|
||||
}
|
||||
|
||||
|
|
|
@ -46,3 +46,4 @@ void stopServer(SServer* pServer);
|
|||
SClient* createClient(const char* user, const char* pass, const char* fqdn, uint16_t port);
|
||||
void dropClient(SClient* pClient);
|
||||
void sendMsg(SClient* pClient, SRpcMsg* pMsg);
|
||||
|
||||
|
|
|
@ -26,8 +26,9 @@ int32_t mndInitVgroup(SMnode *pMnode);
|
|||
void mndCleanupVgroup(SMnode *pMnode);
|
||||
SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId);
|
||||
void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup);
|
||||
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups);
|
||||
SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup);
|
||||
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups);
|
||||
SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup);
|
||||
|
||||
SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup);
|
||||
SDropVnodeMsg *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup);
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
#include "mndShow.h"
|
||||
#include "mndTrans.h"
|
||||
#include "mndUser.h"
|
||||
#include "mndVgroup.h"
|
||||
#include "tname.h"
|
||||
|
||||
#define TSDB_STB_VER_NUMBER 1
|
||||
|
@ -199,7 +200,54 @@ static SDbObj *mndAcquireDbByStb(SMnode *pMnode, char *stbName) {
|
|||
return mndAcquireDb(pMnode, db);
|
||||
}
|
||||
|
||||
static int32_t mndCheckStbMsg(SCreateStbMsg *pCreate) {
|
||||
static SCreateStbInternalMsg *mndBuildCreateStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb) {
|
||||
int32_t totalCols = pStb->numOfTags + pStb->numOfColumns;
|
||||
int32_t contLen = totalCols * sizeof(SSchema) + sizeof(SCreateStbInternalMsg);
|
||||
|
||||
SCreateStbInternalMsg *pCreate = calloc(1, contLen);
|
||||
if (pCreate == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pCreate->head.contLen = htonl(contLen);
|
||||
pCreate->head.vgId = htonl(pVgroup->vgId);
|
||||
memcpy(pCreate->name, pStb->name, TSDB_TABLE_FNAME_LEN);
|
||||
pCreate->suid = htobe64(pStb->uid);
|
||||
pCreate->sverson = htonl(pStb->version);
|
||||
pCreate->ttl = 0;
|
||||
pCreate->keep = 0;
|
||||
pCreate->numOfTags = htonl(pStb->numOfTags);
|
||||
pCreate->numOfColumns = htonl(pStb->numOfColumns);
|
||||
|
||||
memcpy(pCreate->pSchema, pStb->pSchema, totalCols * sizeof(SSchema));
|
||||
for (int32_t t = 0; t < totalCols; ++t) {
|
||||
SSchema *pSchema = &pCreate->pSchema[t];
|
||||
pSchema->bytes = htonl(pSchema->bytes);
|
||||
pSchema->colId = htonl(pSchema->colId);
|
||||
}
|
||||
|
||||
return pCreate;
|
||||
}
|
||||
|
||||
static SDropStbInternalMsg *mndBuildDropStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb) {
|
||||
int32_t contLen = sizeof(SDropStbInternalMsg);
|
||||
|
||||
SDropStbInternalMsg *pDrop = calloc(1, contLen);
|
||||
if (pDrop == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pDrop->head.contLen = htonl(contLen);
|
||||
pDrop->head.vgId = htonl(pVgroup->vgId);
|
||||
memcpy(pDrop->name, pStb->name, TSDB_TABLE_FNAME_LEN);
|
||||
pDrop->suid = htobe64(pStb->uid);
|
||||
|
||||
return pDrop;
|
||||
}
|
||||
|
||||
static int32_t mndCheckCreateStbMsg(SCreateStbMsg *pCreate) {
|
||||
pCreate->numOfColumns = htonl(pCreate->numOfColumns);
|
||||
pCreate->numOfTags = htonl(pCreate->numOfTags);
|
||||
int32_t totalCols = pCreate->numOfColumns + pCreate->numOfTags;
|
||||
|
@ -248,6 +296,103 @@ static int32_t mndCheckStbMsg(SCreateStbMsg *pCreate) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetCreateStbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
||||
SSdbRaw *pRedoRaw = mndStbActionEncode(pStb);
|
||||
if (pRedoRaw == NULL) return -1;
|
||||
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetCreateStbUndoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
||||
SSdbRaw *pUndoRaw = mndStbActionEncode(pStb);
|
||||
if (pUndoRaw == NULL) return -1;
|
||||
if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetCreateStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
||||
SSdbRaw *pCommitRaw = mndStbActionEncode(pStb);
|
||||
if (pCommitRaw == NULL) return -1;
|
||||
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SVgObj *pVgroup = NULL;
|
||||
void *pIter = NULL;
|
||||
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||
if (pIter == NULL) break;
|
||||
if (pVgroup->dbUid != pDb->uid) continue;
|
||||
|
||||
SCreateStbInternalMsg *pMsg = mndBuildCreateStbMsg(pMnode, pVgroup, pStb);
|
||||
if (pMsg == NULL) {
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
STransAction action = {0};
|
||||
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||
action.pCont = pMsg;
|
||||
action.contLen = htonl(pMsg->head.contLen);
|
||||
action.msgType = TSDB_MSG_TYPE_CREATE_STB_IN;
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
free(pMsg);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
return -1;
|
||||
}
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SVgObj *pVgroup = NULL;
|
||||
void *pIter = NULL;
|
||||
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||
if (pIter == NULL) break;
|
||||
if (pVgroup->dbUid != pDb->uid) continue;
|
||||
|
||||
SDropStbInternalMsg *pMsg = mndBuildDropStbMsg(pMnode, pVgroup, pStb);
|
||||
if (pMsg == NULL) {
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
STransAction action = {0};
|
||||
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||
action.pCont = pMsg;
|
||||
action.contLen = sizeof(SDropStbInternalMsg);
|
||||
action.msgType = TSDB_MSG_TYPE_DROP_STB_IN;
|
||||
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
|
||||
free(pMsg);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
return -1;
|
||||
}
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateStbMsg *pCreate, SDbObj *pDb) {
|
||||
SStbObj stbObj = {0};
|
||||
tstrncpy(stbObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||
|
@ -269,6 +414,7 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateStbMsg *pCre
|
|||
}
|
||||
memcpy(stbObj.pSchema, pCreate->pSchema, totalSize);
|
||||
|
||||
int32_t code = 0;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
|
||||
if (pTrans == NULL) {
|
||||
mError("stb:%s, failed to create since %s", pCreate->name, terrstr());
|
||||
|
@ -276,29 +422,30 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateStbMsg *pCre
|
|||
}
|
||||
mDebug("trans:%d, used to create stb:%s", pTrans->id, pCreate->name);
|
||||
|
||||
SSdbRaw *pRedoRaw = mndStbActionEncode(&stbObj);
|
||||
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
|
||||
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) {
|
||||
mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr());
|
||||
goto CREATE_STB_OVER;
|
||||
}
|
||||
sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING);
|
||||
|
||||
SSdbRaw *pUndoRaw = mndStbActionEncode(&stbObj);
|
||||
if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) {
|
||||
mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
if (mndSetCreateStbUndoLogs(pMnode, pTrans, pDb, &stbObj) != 0) {
|
||||
mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr());
|
||||
goto CREATE_STB_OVER;
|
||||
}
|
||||
sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED);
|
||||
|
||||
SSdbRaw *pCommitRaw = mndStbActionEncode(&stbObj);
|
||||
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
if (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, &stbObj) != 0) {
|
||||
mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
|
||||
goto CREATE_STB_OVER;
|
||||
}
|
||||
|
||||
if (mndSetCreateStbRedoActions(pMnode, pTrans, pDb, &stbObj) != 0) {
|
||||
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
|
||||
goto CREATE_STB_OVER;
|
||||
}
|
||||
|
||||
if (mndSetCreateStbUndoActions(pMnode, pTrans, pDb, &stbObj) != 0) {
|
||||
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
|
||||
goto CREATE_STB_OVER;
|
||||
}
|
||||
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
|
@ -306,8 +453,11 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateStbMsg *pCre
|
|||
return -1;
|
||||
}
|
||||
|
||||
code = 0;
|
||||
|
||||
CREATE_STB_OVER:
|
||||
mndTransDrop(pTrans);
|
||||
return 0;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndProcessCreateStbMsg(SMnodeMsg *pMsg) {
|
||||
|
@ -316,7 +466,7 @@ static int32_t mndProcessCreateStbMsg(SMnodeMsg *pMsg) {
|
|||
|
||||
mDebug("stb:%s, start to create", pCreate->name);
|
||||
|
||||
if (mndCheckStbMsg(pCreate) != 0) {
|
||||
if (mndCheckCreateStbMsg(pCreate) != 0) {
|
||||
mError("stb:%s, failed to create since %s", pCreate->name, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
@ -353,7 +503,10 @@ static int32_t mndProcessCreateStbMsg(SMnodeMsg *pMsg) {
|
|||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
static int32_t mndProcessCreateStbInRsp(SMnodeMsg *pMsg) { return 0; }
|
||||
static int32_t mndProcessCreateStbInRsp(SMnodeMsg *pMsg) {
|
||||
mndTransHandleActionRsp(pMsg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndCheckAlterStbMsg(SAlterStbMsg *pAlter) {
|
||||
SSchema *pSchema = &pAlter->schema;
|
||||
|
@ -414,9 +567,44 @@ static int32_t mndProcessAlterStbMsg(SMnodeMsg *pMsg) {
|
|||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
static int32_t mndProcessAlterStbInRsp(SMnodeMsg *pMsg) { return 0; }
|
||||
static int32_t mndProcessAlterStbInRsp(SMnodeMsg *pMsg) {
|
||||
mndTransHandleActionRsp(pMsg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetDropStbRedoLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
|
||||
SSdbRaw *pRedoRaw = mndStbActionEncode(pStb);
|
||||
if (pRedoRaw == NULL) return -1;
|
||||
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetDropStbUndoLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
|
||||
SSdbRaw *pUndoRaw = mndStbActionEncode(pStb);
|
||||
if (pUndoRaw == NULL) return -1;
|
||||
if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY) != 0) return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetDropStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
|
||||
SSdbRaw *pCommitRaw = mndStbActionEncode(pStb);
|
||||
if (pCommitRaw == NULL) return -1;
|
||||
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) { return 0; }
|
||||
|
||||
static int32_t mndSetDropStbUndoActions(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) { return 0; }
|
||||
|
||||
static int32_t mndDropStb(SMnode *pMnode, SMnodeMsg *pMsg, SStbObj *pStb) {
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
|
||||
if (pTrans == NULL) {
|
||||
mError("stb:%s, failed to drop since %s", pStb->name, terrstr());
|
||||
|
@ -424,36 +612,39 @@ static int32_t mndDropStb(SMnode *pMnode, SMnodeMsg *pMsg, SStbObj *pStb) {
|
|||
}
|
||||
mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name);
|
||||
|
||||
SSdbRaw *pRedoRaw = mndStbActionEncode(pStb);
|
||||
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
|
||||
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) {
|
||||
mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr());
|
||||
goto DROP_STB_OVER;
|
||||
}
|
||||
sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING);
|
||||
|
||||
SSdbRaw *pUndoRaw = mndStbActionEncode(pStb);
|
||||
if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) {
|
||||
mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
if (mndSetDropStbUndoLogs(pMnode, pTrans, pStb) != 0) {
|
||||
mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr());
|
||||
goto DROP_STB_OVER;
|
||||
}
|
||||
sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY);
|
||||
|
||||
SSdbRaw *pCommitRaw = mndStbActionEncode(pStb);
|
||||
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) {
|
||||
mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
|
||||
goto DROP_STB_OVER;
|
||||
}
|
||||
|
||||
if (mndSetDropStbRedoActions(pMnode, pTrans, pStb) != 0) {
|
||||
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
|
||||
goto DROP_STB_OVER;
|
||||
}
|
||||
|
||||
if (mndSetDropStbUndoActions(pMnode, pTrans, pStb) != 0) {
|
||||
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
|
||||
goto DROP_STB_OVER;
|
||||
}
|
||||
sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
goto DROP_STB_OVER;
|
||||
}
|
||||
|
||||
code = 0;
|
||||
|
||||
DROP_STB_OVER:
|
||||
mndTransDrop(pTrans);
|
||||
return 0;
|
||||
}
|
||||
|
@ -488,7 +679,10 @@ static int32_t mndProcessDropStbMsg(SMnodeMsg *pMsg) {
|
|||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
static int32_t mndProcessDropStbInRsp(SMnodeMsg *pMsg) { return 0; }
|
||||
static int32_t mndProcessDropStbInRsp(SMnodeMsg *pMsg) {
|
||||
mndTransHandleActionRsp(pMsg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->pMnode;
|
||||
|
|
|
@ -465,11 +465,15 @@ static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) {
|
|||
}
|
||||
|
||||
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) {
|
||||
return mndTransAppendAction(pTrans->redoActions, pAction);
|
||||
int32_t code = mndTransAppendAction(pTrans->redoActions, pAction);
|
||||
mTrace("trans:%d, msg:%s append to redo actions, code:0x%x", pTrans->id, taosMsg[pAction->msgType], code);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction) {
|
||||
return mndTransAppendAction(pTrans->undoActions, pAction);
|
||||
int32_t code = mndTransAppendAction(pTrans->undoActions, pAction);
|
||||
mTrace("trans:%d, msg:%s append to undo actions, code:0x%x", pTrans->id, taosMsg[pAction->msgType], code);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
|
||||
|
@ -504,7 +508,6 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
mDebug("trans:%d, prepare finished", pNewTrans->id);
|
||||
pNewTrans->rpcHandle = pTrans->rpcHandle;
|
||||
mndTransExecute(pMnode, pNewTrans);
|
||||
mndReleaseTrans(pMnode, pNewTrans);
|
||||
|
|
|
@ -311,6 +311,27 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup) {
|
||||
SEpSet epset = {0};
|
||||
|
||||
for (int32_t v = 0; v < pVgroup->replica; ++v) {
|
||||
SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
|
||||
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
|
||||
if (pDnode == NULL) continue;
|
||||
|
||||
if (pVgid->role == TAOS_SYNC_STATE_LEADER) {
|
||||
epset.inUse = epset.numOfEps;
|
||||
}
|
||||
|
||||
epset.port[epset.numOfEps] = pDnode->port;
|
||||
memcpy(&epset.fqdn[epset.numOfEps], pDnode->fqdn, TSDB_FQDN_LEN);
|
||||
epset.numOfEps++;
|
||||
mndReleaseDnode(pMnode, pDnode);
|
||||
}
|
||||
|
||||
return epset;
|
||||
}
|
||||
|
||||
static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pMsg) {
|
||||
mndTransHandleActionRsp(pMsg);
|
||||
return 0;
|
||||
|
|
|
@ -240,6 +240,8 @@ void sdbRelease(SSdb *pSdb, void *pObj) {
|
|||
}
|
||||
|
||||
void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) {
|
||||
*ppObj = NULL;
|
||||
|
||||
SHashObj *hash = sdbGetHash(pSdb, type);
|
||||
if (hash == NULL) return NULL;
|
||||
|
||||
|
|
|
@ -15,53 +15,31 @@
|
|||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "vnodeInt.h"
|
||||
#include "tqueue.h"
|
||||
|
||||
int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg) { return 0; }
|
||||
SVnode *vnodeCreate(int32_t vgId, const char *path, const SVnodeCfg *pCfg) { return NULL; }
|
||||
void vnodeDrop(SVnode *pVnode) {}
|
||||
|
||||
int32_t vnodeCompact(SVnode *pVnode) { return 0; }
|
||||
|
||||
int32_t vnodeSync(SVnode *pVnode) { return 0; }
|
||||
|
||||
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { return 0; }
|
||||
|
||||
SVnodeMsg *vnodeInitMsg(int32_t msgNum) {
|
||||
SVnodeMsg *pMsg = taosAllocateQitem(msgNum * sizeof(SRpcMsg *) + sizeof(SVnodeMsg));
|
||||
if (pMsg == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
} else {
|
||||
pMsg->allocNum = msgNum;
|
||||
return pMsg;
|
||||
}
|
||||
int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||
vInfo("query message is processed");
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t vnodeAppendMsg(SVnodeMsg *pMsg, SRpcMsg *pRpcMsg) {
|
||||
if (pMsg->curNum >= pMsg->allocNum) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pMsg->rpcMsg[pMsg->curNum++] = *pRpcMsg;
|
||||
int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||
vInfo("fetch message is processed");
|
||||
return 0;
|
||||
}
|
||||
|
||||
void vnodeCleanupMsg(SVnodeMsg *pMsg) {
|
||||
for (int32_t i = 0; i < pMsg->curNum; ++i) {
|
||||
rpcFreeCont(pMsg->rpcMsg[i].pCont);
|
||||
}
|
||||
taosFreeQitem(pMsg);
|
||||
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||
vInfo("sync message is processed");
|
||||
return 0;
|
||||
}
|
||||
|
||||
void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg, EVnMsgType msgType) {
|
||||
switch (msgType) {
|
||||
case VN_MSG_TYPE_WRITE:
|
||||
break;
|
||||
case VN_MSG_TYPE_APPLY:
|
||||
break;
|
||||
case VN_MSG_TYPE_SYNC:
|
||||
break;
|
||||
case VN_MSG_TYPE_QUERY:
|
||||
break;
|
||||
case VN_MSG_TYPE_FETCH:
|
||||
break;
|
||||
}
|
||||
int vnodeProcessConsumeReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||
vInfo("consume message is processed");
|
||||
return 0;
|
||||
}
|
|
@ -70,6 +70,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
|||
vnodeParseReq(pMsg->pCont, &vReq, pMsg->msgType);
|
||||
|
||||
switch (pMsg->msgType) {
|
||||
case TSDB_MSG_TYPE_CREATE_STB_IN:
|
||||
case TSDB_MSG_TYPE_CREATE_TABLE:
|
||||
if (metaCreateTable(pVnode->pMeta, &(vReq.ctReq)) < 0) {
|
||||
// TODO: handle error
|
||||
|
@ -77,6 +78,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
|||
|
||||
// TODO: maybe need to clear the requst struct
|
||||
break;
|
||||
case TSDB_MSG_TYPE_DROP_STB_IN:
|
||||
case TSDB_MSG_TYPE_DROP_TABLE:
|
||||
if (metaDropTable(pVnode->pMeta, vReq.dtReq.uid) < 0) {
|
||||
// TODO: handle error
|
||||
|
|
Loading…
Reference in New Issue