more
This commit is contained in:
parent
018661e177
commit
e861bdeb0a
|
@ -144,9 +144,17 @@ void vnodeOptionsInit(SVnodeCfg *pOptions);
|
||||||
void vnodeOptionsClear(SVnodeCfg *pOptions);
|
void vnodeOptionsClear(SVnodeCfg *pOptions);
|
||||||
|
|
||||||
/* ------------------------ REQUESTS ------------------------ */
|
/* ------------------------ REQUESTS ------------------------ */
|
||||||
|
typedef STbCfg SVCreateTableReq;
|
||||||
|
typedef struct {
|
||||||
|
tb_uid_t uid;
|
||||||
|
} SVDropTableReq;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
uint64_t ver;
|
uint64_t ver;
|
||||||
char req[];
|
union {
|
||||||
|
SVCreateTableReq ctReq;
|
||||||
|
SVDropTableReq dtReq;
|
||||||
|
};
|
||||||
} SVnodeReq;
|
} SVnodeReq;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -154,25 +162,22 @@ typedef struct {
|
||||||
char info[];
|
char info[];
|
||||||
} SVnodeRsp;
|
} SVnodeRsp;
|
||||||
|
|
||||||
/// Create table request
|
#define VNODE_INIT_CREATE_STB_REQ(VER, NAME, TTL, KEEP, SUID, PSCHEMA, PTAGSCHEMA) \
|
||||||
typedef STbCfg SVCreateTableReq;
|
{ .ver = (VER), .ctReq = META_INIT_STB_CFG(NAME, TTL, KEEP, SUID, PSCHEMA, PTAGSCHEMA) }
|
||||||
|
|
||||||
|
#define VNODE_INIT_CREATE_CTB_REQ(VER, NAME, TTL, KEEP, SUID, PTAG) \
|
||||||
|
{ .ver = (VER), .ctReq = META_INIT_CTB_CFG(NAME, TTL, KEEP, SUID, PTAG) }
|
||||||
|
|
||||||
|
#define VNODE_INIT_CREATE_NTB_REQ(VER, NAME, TTL, KEEP, SUID, PSCHEMA) \
|
||||||
|
{ .ver = (VER), .ctReq = META_INIT_NTB_CFG(NAME, TTL, KEEP, SUID, PSCHEMA) }
|
||||||
|
|
||||||
|
int vnodeBuildReq(void **buf, const SVnodeReq *pReq, uint8_t type);
|
||||||
|
void *vnodeParseReq(void *buf, SVnodeReq *pReq, uint8_t type);
|
||||||
|
|
||||||
|
// TODO
|
||||||
int vnodeBuildCreateTableReq(void **buf, const SVCreateTableReq *pReq);
|
int vnodeBuildCreateTableReq(void **buf, const SVCreateTableReq *pReq);
|
||||||
void *vnodeParseCreateTableReq(void *buf, SVCreateTableReq *pReq);
|
void *vnodeParseCreateTableReq(void *buf, SVCreateTableReq *pReq);
|
||||||
|
|
||||||
/// Drop table request
|
|
||||||
typedef struct {
|
|
||||||
tb_uid_t uid;
|
|
||||||
} SVDropTableReq;
|
|
||||||
/// Alter table request
|
|
||||||
typedef struct {
|
|
||||||
// TODO
|
|
||||||
} SVAlterTableReq;
|
|
||||||
|
|
||||||
int vnodeCreateTable(SVnode *pVnode, SVCreateTableReq *pReq, SVnodeRsp *pRsp);
|
|
||||||
int vnodeDropTable(SVnode *pVnode, SVDropTableReq *pReq, SVnodeRsp *pRsp);
|
|
||||||
int vnodeAlterTable(SVnode *pVnode, SVAlterTableReq *pReq, SVnodeRsp *pRsp);
|
|
||||||
|
|
||||||
/* ------------------------ FOR COMPILE ------------------------ */
|
/* ------------------------ FOR COMPILE ------------------------ */
|
||||||
|
|
||||||
#if 1
|
#if 1
|
||||||
|
|
|
@ -15,6 +15,39 @@
|
||||||
|
|
||||||
#include "vnodeDef.h"
|
#include "vnodeDef.h"
|
||||||
|
|
||||||
|
int vnodeBuildReq(void **buf, const SVnodeReq *pReq, uint8_t type) {
|
||||||
|
int tsize = 0;
|
||||||
|
|
||||||
|
tsize += taosEncodeFixedU64(buf, pReq->ver);
|
||||||
|
switch (type) {
|
||||||
|
case TSDB_MSG_TYPE_CREATE_TABLE:
|
||||||
|
tsize += vnodeBuildCreateTableReq(buf, &(pReq->ctReq));
|
||||||
|
/* code */
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
/* TODO */
|
||||||
|
return tsize;
|
||||||
|
}
|
||||||
|
|
||||||
|
void *vnodeParseReq(void *buf, SVnodeReq *pReq, uint8_t type) {
|
||||||
|
buf = taosDecodeFixedU64(buf, &(pReq->ver));
|
||||||
|
|
||||||
|
switch (type) {
|
||||||
|
case TSDB_MSG_TYPE_CREATE_TABLE:
|
||||||
|
buf = vnodeParseCreateTableReq(buf, &(pReq->ctReq));
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
int vnodeBuildCreateTableReq(void **buf, const SVCreateTableReq *pReq) {
|
int vnodeBuildCreateTableReq(void **buf, const SVCreateTableReq *pReq) {
|
||||||
int tsize = 0;
|
int tsize = 0;
|
||||||
|
|
||||||
|
@ -69,11 +102,11 @@ void *vnodeParseCreateTableReq(void *buf, SVCreateTableReq *pReq) {
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
int vnodeBuildDropTableReq(void **buf, const SVDropTableReq *pReq) {
|
int vnodeBuildDropTableReq(void **buf, const SVDropTableReq *pReq) {
|
||||||
// TODO
|
// TODO
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *vnodeParseDropTableReq(void *buf, SVDropTableReq *pReq) {
|
void *vnodeParseDropTableReq(void *buf, SVDropTableReq *pReq) {
|
||||||
// TODO
|
// TODO
|
||||||
}
|
}
|
|
@ -23,10 +23,10 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
|
||||||
pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i);
|
pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i);
|
||||||
|
|
||||||
// ser request version
|
// ser request version
|
||||||
pVnodeReq = (SVnodeReq *)(pMsg->pCont);
|
void **pBuf = &(pMsg->pCont);
|
||||||
pVnodeReq->ver = pVnode->state.processed++;
|
taosEncodeFixedU64(pBuf, pVnode->state.processed++);
|
||||||
|
|
||||||
if (walWrite(pVnode->pWal, pVnodeReq->ver, pVnodeReq->req, pMsg->contLen - sizeof(pVnodeReq->ver)) < 0) {
|
if (walWrite(pVnode->pWal, pVnodeReq->ver, pMsg->pCont, pMsg->contLen) < 0) {
|
||||||
// TODO: handle error
|
// TODO: handle error
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -36,9 +36,7 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
|
||||||
// Apply each request now
|
// Apply each request now
|
||||||
for (int i = 0; i < taosArrayGetSize(pMsgs); i++) {
|
for (int i = 0; i < taosArrayGetSize(pMsgs); i++) {
|
||||||
pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i);
|
pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i);
|
||||||
pVnodeReq = (SVnodeReq *)(pMsg->pCont);
|
SVnodeReq vReq;
|
||||||
SVCreateTableReq ctReq;
|
|
||||||
SVDropTableReq dtReq;
|
|
||||||
|
|
||||||
// Apply the request
|
// Apply the request
|
||||||
{
|
{
|
||||||
|
@ -47,29 +45,28 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
|
||||||
// TODO: handle error
|
// TODO: handle error
|
||||||
}
|
}
|
||||||
|
|
||||||
memcpy(ptr, pVnodeReq, pMsg->contLen);
|
// TODO: copy here need to be extended
|
||||||
|
memcpy(ptr, pMsg->pCont, pMsg->contLen);
|
||||||
|
|
||||||
// todo: change the interface here
|
// // todo: change the interface here
|
||||||
if (tqPushMsg(pVnode->pTq, ptr, pVnodeReq->ver) < 0) {
|
uint64_t ver;
|
||||||
|
taosDecodeFixedU64(pMsg->pCont, &ver);
|
||||||
|
if (tqPushMsg(pVnode->pTq, ptr, ver) < 0) {
|
||||||
// TODO: handle error
|
// TODO: handle error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
vnodeParseReq(pMsg->pCont, &vReq, pMsg->msgType);
|
||||||
|
|
||||||
switch (pMsg->msgType) {
|
switch (pMsg->msgType) {
|
||||||
case TSDB_MSG_TYPE_CREATE_TABLE:
|
case TSDB_MSG_TYPE_CREATE_TABLE:
|
||||||
vnodeParseCreateTableReq(pVnodeReq->req, &(ctReq));
|
if (metaCreateTable(pVnode->pMeta, &(vReq.ctReq)) < 0) {
|
||||||
|
|
||||||
if (metaCreateTable(pVnode->pMeta, &ctReq) < 0) {
|
|
||||||
// TODO: handle error
|
// TODO: handle error
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: maybe need to clear the requst struct
|
// TODO: maybe need to clear the requst struct
|
||||||
break;
|
break;
|
||||||
case TSDB_MSG_TYPE_DROP_TABLE:
|
case TSDB_MSG_TYPE_DROP_TABLE:
|
||||||
if (vnodeParseDropTableReq(pVnodeReq->req, &(dtReq)) < 0) {
|
if (metaDropTable(pVnode->pMeta, vReq.dtReq.uid) < 0) {
|
||||||
// TODO: handle error
|
|
||||||
}
|
|
||||||
|
|
||||||
if (metaDropTable(pVnode->pMeta, dtReq.uid) < 0) {
|
|
||||||
// TODO: handle error
|
// TODO: handle error
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -17,18 +17,18 @@ TEST(vnodeApiTest, vnodeOpen_vnodeClose_test) {
|
||||||
STSchema *pTagSchema = NULL;
|
STSchema *pTagSchema = NULL;
|
||||||
char tbname[128] = "st";
|
char tbname[128] = "st";
|
||||||
|
|
||||||
SArray *pMsgs = (SArray *)taosArrayInit(1, sizeof(SRpcMsg *));
|
SArray * pMsgs = (SArray *)taosArrayInit(1, sizeof(SRpcMsg *));
|
||||||
STbCfg stbCfg = META_INIT_STB_CFG(tbname, UINT32_MAX, UINT32_MAX, suid, pSchema, pTagSchema);
|
SVnodeReq vCreateSTbReq = VNODE_INIT_CREATE_STB_REQ(0, tbname, UINT32_MAX, UINT32_MAX, suid, pSchema, pTagSchema);
|
||||||
|
|
||||||
int zs = vnodeBuildCreateTableReq(NULL, &stbCfg);
|
int zs = vnodeBuildReq(NULL, &vCreateSTbReq, TSDB_MSG_TYPE_CREATE_TABLE);
|
||||||
SRpcMsg *pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg) + zs);
|
SRpcMsg *pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg) + zs);
|
||||||
pMsg->contLen = zs;
|
pMsg->contLen = zs;
|
||||||
pMsg->pCont = POINTER_SHIFT(pMsg, sizeof(SRpcMsg));
|
pMsg->pCont = POINTER_SHIFT(pMsg, sizeof(SRpcMsg));
|
||||||
|
|
||||||
void **pBuf = &(pMsg->pCont);
|
void **pBuf = &(pMsg->pCont);
|
||||||
|
|
||||||
vnodeBuildCreateTableReq(pBuf, &stbCfg);
|
vnodeBuildReq(pBuf, &vCreateSTbReq, TSDB_MSG_TYPE_CREATE_TABLE);
|
||||||
META_CLEAR_TB_CFG(&stbCfg);
|
META_CLEAR_TB_CFG(&vCreateSTbReq);
|
||||||
|
|
||||||
taosArrayPush(pMsgs, &(pMsg));
|
taosArrayPush(pMsgs, &(pMsg));
|
||||||
|
|
||||||
|
@ -48,16 +48,16 @@ TEST(vnodeApiTest, vnodeOpen_vnodeClose_test) {
|
||||||
SRow *pTag = NULL;
|
SRow *pTag = NULL;
|
||||||
char tbname[128];
|
char tbname[128];
|
||||||
sprintf(tbname, "tb%d", i * batch + j);
|
sprintf(tbname, "tb%d", i * batch + j);
|
||||||
STbCfg ctbCfg = META_INIT_CTB_CFG(tbname, UINT32_MAX, UINT32_MAX, suid, pTag);
|
SVnodeReq vCreateCTbReq = VNODE_INIT_CREATE_CTB_REQ(0, tbname, UINT32_MAX, UINT32_MAX, suid, pTag);
|
||||||
|
|
||||||
int tz = vnodeBuildCreateTableReq(NULL, &ctbCfg);
|
int tz = vnodeBuildReq(NULL, &vCreateCTbReq, TSDB_MSG_TYPE_CREATE_TABLE);
|
||||||
SRpcMsg *pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg) + tz);
|
SRpcMsg *pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg) + tz);
|
||||||
pMsg->contLen = tz;
|
pMsg->contLen = tz;
|
||||||
pMsg->pCont = POINTER_SHIFT(pMsg, sizeof(*pMsg));
|
pMsg->pCont = POINTER_SHIFT(pMsg, sizeof(*pMsg));
|
||||||
void **pBuf = &(pMsg->pCont);
|
void **pBuf = &(pMsg->pCont);
|
||||||
|
|
||||||
vnodeBuildCreateTableReq(pBuf, &ctbCfg);
|
vnodeBuildReq(pBuf, &vCreateCTbReq, TSDB_MSG_TYPE_CREATE_TABLE);
|
||||||
META_CLEAR_TB_CFG(&ctbCfg);
|
META_CLEAR_TB_CFG(&vCreateCTbReq);
|
||||||
}
|
}
|
||||||
|
|
||||||
vnodeProcessWMsgs(pVnode, pMsgs);
|
vnodeProcessWMsgs(pVnode, pMsgs);
|
||||||
|
|
Loading…
Reference in New Issue