more
This commit is contained in:
parent
0896bcf715
commit
e74766790a
|
@ -19,5 +19,5 @@ target_link_libraries(
|
||||||
|
|
||||||
# test
|
# test
|
||||||
if(${BUILD_TEST})
|
if(${BUILD_TEST})
|
||||||
#add_subdirectory(test)
|
add_subdirectory(test)
|
||||||
endif(${BUILD_TEST})
|
endif(${BUILD_TEST})
|
|
@ -71,6 +71,7 @@ struct SVnode {
|
||||||
SWal* pWal;
|
SWal* pWal;
|
||||||
SVnodeSync* pSync;
|
SVnodeSync* pSync;
|
||||||
SVnodeFS* pFs;
|
SVnodeFS* pFs;
|
||||||
|
tsem_t canCommit;
|
||||||
};
|
};
|
||||||
|
|
||||||
int vnodeScheduleTask(SVnodeTask* task);
|
int vnodeScheduleTask(SVnodeTask* task);
|
||||||
|
|
|
@ -38,9 +38,10 @@ int vnodeCommit(void *arg) {
|
||||||
|
|
||||||
metaCommit(pVnode->pMeta);
|
metaCommit(pVnode->pMeta);
|
||||||
tqCommit(pVnode->pTq);
|
tqCommit(pVnode->pTq);
|
||||||
tsdbCommit(pVnode->pTq);
|
tsdbCommit(pVnode->pTsdb);
|
||||||
|
|
||||||
vnodeBufPoolRecycle(pVnode);
|
vnodeBufPoolRecycle(pVnode);
|
||||||
|
tsem_post(&(pVnode->canCommit));
|
||||||
// TODO
|
// TODO
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -74,11 +74,14 @@ static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg) {
|
||||||
pVnode->path = strdup(path);
|
pVnode->path = strdup(path);
|
||||||
vnodeOptionsCopy(&(pVnode->config), pVnodeCfg);
|
vnodeOptionsCopy(&(pVnode->config), pVnodeCfg);
|
||||||
|
|
||||||
|
tsem_init(&(pVnode->canCommit), 0, 1);
|
||||||
|
|
||||||
return pVnode;
|
return pVnode;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vnodeFree(SVnode *pVnode) {
|
static void vnodeFree(SVnode *pVnode) {
|
||||||
if (pVnode) {
|
if (pVnode) {
|
||||||
|
tsem_destroy(&(pVnode->canCommit));
|
||||||
tfree(pVnode->path);
|
tfree(pVnode->path);
|
||||||
free(pVnode);
|
free(pVnode);
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,14 +31,14 @@ int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
|
int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
|
||||||
SRpcMsg *pMsg;
|
SRpcMsg * pMsg;
|
||||||
SVnodeReq *pVnodeReq;
|
SVnodeReq *pVnodeReq;
|
||||||
|
|
||||||
for (int i = 0; i < taosArrayGetSize(pMsgs); i++) {
|
for (int i = 0; i < taosArrayGetSize(pMsgs); i++) {
|
||||||
pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i);
|
pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i);
|
||||||
|
|
||||||
// ser request version
|
// ser request version
|
||||||
void *pBuf = pMsg->pCont;
|
void * pBuf = pMsg->pCont;
|
||||||
int64_t ver = pVnode->state.processed++;
|
int64_t ver = pVnode->state.processed++;
|
||||||
taosEncodeFixedU64(&pBuf, ver);
|
taosEncodeFixedU64(&pBuf, ver);
|
||||||
|
|
||||||
|
@ -52,65 +52,64 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
|
||||||
// Apply each request now
|
// Apply each request now
|
||||||
for (int i = 0; i < taosArrayGetSize(pMsgs); i++) {
|
for (int i = 0; i < taosArrayGetSize(pMsgs); i++) {
|
||||||
pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i);
|
pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i);
|
||||||
SVnodeReq vReq;
|
|
||||||
|
|
||||||
// Apply the request
|
// TODO: Now we just need a
|
||||||
{
|
vnodeApplyWMsg(pVnode, pMsg, NULL);
|
||||||
void *ptr = vnodeMalloc(pVnode, pMsg->contLen);
|
|
||||||
if (ptr == NULL) {
|
|
||||||
// TODO: handle error
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: copy here need to be extended
|
|
||||||
memcpy(ptr, pMsg->pCont, pMsg->contLen);
|
|
||||||
|
|
||||||
// todo: change the interface here
|
|
||||||
uint64_t ver;
|
|
||||||
taosDecodeFixedU64(pMsg->pCont, &ver);
|
|
||||||
if (tqPushMsg(pVnode->pTq, ptr, ver) < 0) {
|
|
||||||
// TODO: handle error
|
|
||||||
}
|
|
||||||
|
|
||||||
vnodeParseReq(pMsg->pCont, &vReq, pMsg->msgType);
|
|
||||||
|
|
||||||
switch (pMsg->msgType) {
|
|
||||||
case TSDB_MSG_TYPE_CREATE_TABLE:
|
|
||||||
if (metaCreateTable(pVnode->pMeta, &(vReq.ctReq)) < 0) {
|
|
||||||
// TODO: handle error
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: maybe need to clear the requst struct
|
|
||||||
break;
|
|
||||||
case TSDB_MSG_TYPE_DROP_TABLE:
|
|
||||||
if (metaDropTable(pVnode->pMeta, vReq.dtReq.uid) < 0) {
|
|
||||||
// TODO: handle error
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case TSDB_MSG_TYPE_SUBMIT:
|
|
||||||
if (tsdbInsertData(pVnode->pTsdb, (SSubmitMsg *)ptr) < 0) {
|
|
||||||
// TODO: handle error
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
pVnode->state.applied = ver;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if it needs to commit
|
|
||||||
if (vnodeShouldCommit(pVnode)) {
|
|
||||||
if (vnodeAsyncCommit(pVnode) < 0) {
|
|
||||||
// TODO: handle error
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
// TODO
|
SVnodeReq vReq;
|
||||||
|
void * ptr = vnodeMalloc(pVnode, pMsg->contLen);
|
||||||
|
if (ptr == NULL) {
|
||||||
|
// TODO: handle error
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: copy here need to be extended
|
||||||
|
memcpy(ptr, pMsg->pCont, pMsg->contLen);
|
||||||
|
|
||||||
|
// todo: change the interface here
|
||||||
|
uint64_t ver;
|
||||||
|
taosDecodeFixedU64(pMsg->pCont, &ver);
|
||||||
|
if (tqPushMsg(pVnode->pTq, ptr, ver) < 0) {
|
||||||
|
// TODO: handle error
|
||||||
|
}
|
||||||
|
|
||||||
|
vnodeParseReq(pMsg->pCont, &vReq, pMsg->msgType);
|
||||||
|
|
||||||
|
switch (pMsg->msgType) {
|
||||||
|
case TSDB_MSG_TYPE_CREATE_TABLE:
|
||||||
|
if (metaCreateTable(pVnode->pMeta, &(vReq.ctReq)) < 0) {
|
||||||
|
// TODO: handle error
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: maybe need to clear the requst struct
|
||||||
|
break;
|
||||||
|
case TSDB_MSG_TYPE_DROP_TABLE:
|
||||||
|
if (metaDropTable(pVnode->pMeta, vReq.dtReq.uid) < 0) {
|
||||||
|
// TODO: handle error
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case TSDB_MSG_TYPE_SUBMIT:
|
||||||
|
if (tsdbInsertData(pVnode->pTsdb, (SSubmitMsg *)ptr) < 0) {
|
||||||
|
// TODO: handle error
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
pVnode->state.applied = ver;
|
||||||
|
|
||||||
|
// Check if it needs to commit
|
||||||
|
if (vnodeShouldCommit(pVnode)) {
|
||||||
|
tsem_wait(&(pVnode->canCommit));
|
||||||
|
if (vnodeAsyncCommit(pVnode) < 0) {
|
||||||
|
// TODO: handle error
|
||||||
|
}
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue