This commit is contained in:
Hongze Cheng 2021-11-29 11:37:26 +08:00
parent 0d02f7fced
commit e3478a52a5
7 changed files with 103 additions and 54 deletions

View File

@ -74,7 +74,7 @@ typedef struct STbCfg {
SMeta *metaOpen(const char *path, const SMetaCfg *pOptions); SMeta *metaOpen(const char *path, const SMetaCfg *pOptions);
void metaClose(SMeta *pMeta); void metaClose(SMeta *pMeta);
void metaRemove(const char *path); void metaRemove(const char *path);
int metaCreateTable(SMeta *pMeta, const void *pReq, const int len); int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg);
int metaDropTable(SMeta *pMeta, const void *pReq, const int len); int metaDropTable(SMeta *pMeta, const void *pReq, const int len);
int metaCommit(SMeta *pMeta); int metaCommit(SMeta *pMeta);

View File

@ -131,6 +131,11 @@ void vnodeOptionsInit(SVnodeCfg *pOptions);
void vnodeOptionsClear(SVnodeCfg *pOptions); void vnodeOptionsClear(SVnodeCfg *pOptions);
/* ------------------------ REQUESTS ------------------------ */ /* ------------------------ REQUESTS ------------------------ */
typedef struct {
uint64_t ver;
char req[];
} SVnodeReq;
typedef struct { typedef struct {
int err; int err;
char info[]; char info[];

View File

@ -27,6 +27,7 @@ typedef struct SVBufPool SVBufPool;
int vnodeOpenBufPool(SVnode *pVnode); int vnodeOpenBufPool(SVnode *pVnode);
void vnodeCloseBufPool(SVnode *pVnode); void vnodeCloseBufPool(SVnode *pVnode);
void *vnodeMalloc(SVnode *pVnode, uint64_t size);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -16,23 +16,14 @@
#ifndef _TD_VNODE_REQUEST_H_ #ifndef _TD_VNODE_REQUEST_H_
#define _TD_VNODE_REQUEST_H_ #define _TD_VNODE_REQUEST_H_
#include "vnode.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
typedef struct SVnodeReq SVnodeReq; int vnodeBuildCreateTableReq(const SVCreateTableReq *pReq, char *msg, int len);
typedef struct SVnodeRsp SVnodeRsp; int vnodeParseCreateTableReq(const char *msg, int len, SVCreateTableReq *pReq);
typedef enum {} EVReqT;
typedef enum {} EVRspT;
struct SVnodeReq {
EVReqT type;
};
struct SVnodeRsp {
EVRspT type;
};
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -71,6 +71,7 @@ static SListNode * vBufPoolNewNode(uint64_t capacity, EVMemAllocatorT type);
static void vBufPoolFreeNode(SListNode *pNode); static void vBufPoolFreeNode(SListNode *pNode);
static SMemAllocator *vBufPoolCreateMA(SMemAllocatorFactory *pmaf); static SMemAllocator *vBufPoolCreateMA(SMemAllocatorFactory *pmaf);
static void vBufPoolDestroyMA(SMemAllocatorFactory *pmaf, SMemAllocator *pma); static void vBufPoolDestroyMA(SMemAllocatorFactory *pmaf, SMemAllocator *pma);
static void * vBufPoolMalloc(SVMemAllocator *pvma, uint64_t size);
int vnodeOpenBufPool(SVnode *pVnode) { int vnodeOpenBufPool(SVnode *pVnode) {
uint64_t capacity; uint64_t capacity;
@ -129,6 +130,24 @@ void vnodeCloseBufPool(SVnode *pVnode) {
} }
} }
void *vnodeMalloc(SVnode *pVnode, uint64_t size) {
void *ptr;
if (pVnode->pBufPool->inuse == NULL) {
SListNode *pNode;
while ((pNode = tdListPopHead(&(pVnode->pBufPool->free))) == NULL) {
// todo
// tsem_wait();
ASSERT(0);
}
pVnode->pBufPool->inuse = pNode;
}
SVMemAllocator *pvma = (SVMemAllocator *)(pVnode->pBufPool->inuse->data);
return vBufPoolMalloc(pvma, size);
}
/* ------------------------ STATIC METHODS ------------------------ */ /* ------------------------ STATIC METHODS ------------------------ */
static void vArenaAllocatorInit(SVArenaAllocator *pvaa, uint64_t capacity, uint64_t ssize, uint64_t lsize) { /* TODO */ static void vArenaAllocatorInit(SVArenaAllocator *pvaa, uint64_t capacity, uint64_t ssize, uint64_t lsize) { /* TODO */
pvaa->ssize = ssize; pvaa->ssize = ssize;
@ -201,10 +220,8 @@ static void vBufPoolFreeNode(SListNode *pNode) {
free(pNode); free(pNode);
} }
static void *vBufPoolMalloc(SMemAllocator *pma, uint64_t size) { static void *vBufPoolMalloc(SVMemAllocator *pvma, uint64_t size) {
SVMAWrapper * pvmaw = (SVMAWrapper *)(pma->impl); void *ptr = NULL;
SVMemAllocator *pvma = (SVMemAllocator *)(pvmaw->pNode->data);
void * ptr = NULL;
if (pvma->type == E_V_ARENA_ALLOCATOR) { if (pvma->type == E_V_ARENA_ALLOCATOR) {
SVArenaAllocator *pvaa = &(pvma->vaa); SVArenaAllocator *pvaa = &(pvma->vaa);
@ -229,7 +246,7 @@ static void *vBufPoolMalloc(SMemAllocator *pma, uint64_t size) {
/* TODO */ /* TODO */
} }
return NULL; return ptr;
} }
static SMemAllocator *vBufPoolCreateMA(SMemAllocatorFactory *pmaf) { static SMemAllocator *vBufPoolCreateMA(SMemAllocatorFactory *pmaf) {
@ -267,7 +284,7 @@ static SMemAllocator *vBufPoolCreateMA(SMemAllocatorFactory *pmaf) {
pvmaw->pNode = pVnode->pBufPool->inuse; pvmaw->pNode = pVnode->pBufPool->inuse;
pma->impl = pvmaw; pma->impl = pvmaw;
pma->malloc = vBufPoolMalloc; pma->malloc = NULL;
pma->calloc = NULL; /* TODO */ pma->calloc = NULL; /* TODO */
pma->realloc = NULL; /* TODO */ pma->realloc = NULL; /* TODO */
pma->free = NULL; /* TODO */ pma->free = NULL; /* TODO */

View File

@ -16,13 +16,74 @@
#include "vnodeDef.h" #include "vnodeDef.h"
int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
SRpcMsg *pReq; SRpcMsg * pMsg;
SRpcMsg *pRsp; SVnodeReq *pVnodeReq;
for (size_t i = 0; i < taosArrayGetSize(pMsgs); i++) { for (int i = 0; i < taosArrayGetSize(pMsgs); i++) {
pReq = taosArrayGet(pMsgs, i); pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i);
vnodeApplyWMsg(pVnode, pReq, &pRsp); // ser request version
pVnodeReq = (SVnodeReq *)(pMsg->pCont);
pVnodeReq->ver = pVnode->state.processed++;
if (walWrite(pVnode->pWal, pVnodeReq->ver, pVnodeReq->req, pMsg->contLen - sizeof(pVnodeReq->ver)) < 0) {
// TODO: handle error
}
}
walFsync(pVnode->pWal, false);
// Apply each request now
for (int i = 0; i < taosArrayGetSize(pMsgs); i++) {
pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i);
pVnodeReq = (SVnodeReq *)(pMsg->pCont);
SVCreateTableReq ctReq;
// Apply the request
{
void *ptr = vnodeMalloc(pVnode, pMsg->contLen);
if (ptr == NULL) {
// TODO: handle error
}
memcpy(ptr, pVnodeReq, pMsg->contLen);
// todo: change the interface here
if (tqPushMsg(pVnode->pTq, pVnodeReq->req, pVnodeReq->ver) < 0) {
// TODO: handle error
}
switch (pMsg->msgType) {
case TSDB_MSG_TYPE_CREATE_TABLE:
if (vnodeParseCreateTableReq(pVnodeReq->req, pMsg->contLen - sizeof(pVnodeReq->ver), &(ctReq)) < 0) {
// TODO: handle error
}
if (metaCreateTable(pVnode->pMeta, &ctReq) < 0) {
// TODO: handle error
}
// TODO: maybe need to clear the requst struct
break;
case TSDB_MSG_TYPE_DROP_TABLE:
/* code */
break;
case TSDB_MSG_TYPE_SUBMIT:
/* code */
break;
default:
break;
}
}
pVnode->state.applied = pVnodeReq->ver;
// Check if it needs to commit
if (vnodeShouldCommit(pVnode)) {
if (vnodeAsyncCommit(pVnode) < 0) {
// TODO: handle error
}
}
} }
return 0; return 0;
@ -30,31 +91,6 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
// TODO // TODO
int code = 0;
switch (pMsg->msgType) {
case TSDB_MSG_TYPE_CREATE_TABLE:
if (metaCreateTable(pVnode->pMeta, pMsg->pCont, pMsg->contLen) < 0) {
/* TODO */
return -1;
}
break;
case TSDB_MSG_TYPE_DROP_TABLE:
if (metaDropTable(pVnode->pMeta, pMsg->pCont, pMsg->contLen) < 0) {
/* TODO */
return -1;
}
break;
case TSDB_MSG_TYPE_SUBMIT:
if (tsdbInsertData(pVnode->pTsdb, pMsg->pCont, pMsg->contLen) < 0) {
/* TODO */
return -1;
}
break;
default:
break;
}
return 0; return 0;
} }

View File

@ -15,8 +15,7 @@
#include "metaDef.h" #include "metaDef.h"
int metaCreateTable(SMeta *pMeta, const void *pCont, const int len) { int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg) {
STbCfg *pTbCfg = NULL;
// Validate the tbOptions // Validate the tbOptions
if (metaValidateTbOptions(pMeta, pTbCfg) < 0) { if (metaValidateTbOptions(pMeta, pTbCfg) < 0) {
// TODO: handle error // TODO: handle error