Merge branch '3.0' of github.com:taosdata/TDengine into 3.0
This commit is contained in:
commit
77db022fdf
|
@ -24,6 +24,8 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
typedef enum { VND_WRITE_QUEUE, VND_QUERY_QUEUE, VND_FETCH_QUEUE, VND_APPLY_QUEUE, VND_SYNC_QUEUE } EVndQueueType;
|
||||||
|
|
||||||
typedef struct SVnodesMgmt {
|
typedef struct SVnodesMgmt {
|
||||||
SHashObj *hash;
|
SHashObj *hash;
|
||||||
SRWLatch latch;
|
SRWLatch latch;
|
||||||
|
@ -102,7 +104,8 @@ int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode);
|
||||||
void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode);
|
void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode);
|
||||||
|
|
||||||
int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
||||||
int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, int32_t vgId, SRpcMsg *pMsg);
|
int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
||||||
|
int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
||||||
|
|
||||||
int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg);
|
int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg);
|
||||||
int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg);
|
int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg);
|
||||||
|
|
|
@ -296,7 +296,10 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) {
|
||||||
|
|
||||||
vnodeOpt.nthreads = tsNumOfCommitThreads;
|
vnodeOpt.nthreads = tsNumOfCommitThreads;
|
||||||
vnodeOpt.putToQueryQFp = vmPutMsgToQueryQueue;
|
vnodeOpt.putToQueryQFp = vmPutMsgToQueryQueue;
|
||||||
|
vnodeOpt.putToFetchQFp = vmPutMsgToQueryQueue;
|
||||||
vnodeOpt.sendReqFp = dndSendReqToDnode;
|
vnodeOpt.sendReqFp = dndSendReqToDnode;
|
||||||
|
vnodeOpt.sendMnodeReqFp = dndSendReqToMnode;
|
||||||
|
vnodeOpt.sendRspFp = dndSendRsp;
|
||||||
if (vnodeInit(&vnodeOpt) != 0) {
|
if (vnodeInit(&vnodeOpt) != 0) {
|
||||||
dError("failed to init vnode since %s", terrstr());
|
dError("failed to init vnode since %s", terrstr());
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
|
|
|
@ -16,46 +16,109 @@
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "vmInt.h"
|
#include "vmInt.h"
|
||||||
|
|
||||||
|
static void vmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) {
|
||||||
|
SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code};
|
||||||
|
dndSendRsp(pWrapper, &rsp);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||||
|
int32_t code = -1;
|
||||||
|
tmsg_t msgType = pMsg->rpcMsg.msgType;
|
||||||
|
dTrace("msg:%p, will be processed in vnode-mgmt queue", pMsg);
|
||||||
|
|
||||||
|
switch (msgType) {
|
||||||
|
case TDMT_DND_CREATE_VNODE:
|
||||||
|
code = vmProcessCreateVnodeReq(pMgmt, pMsg);
|
||||||
|
break;
|
||||||
|
case TDMT_DND_ALTER_VNODE:
|
||||||
|
code = vmProcessAlterVnodeReq(pMgmt, pMsg);
|
||||||
|
break;
|
||||||
|
case TDMT_DND_DROP_VNODE:
|
||||||
|
code = vmProcessDropVnodeReq(pMgmt, pMsg);
|
||||||
|
break;
|
||||||
|
case TDMT_DND_SYNC_VNODE:
|
||||||
|
code = vmProcessSyncVnodeReq(pMgmt, pMsg);
|
||||||
|
break;
|
||||||
|
case TDMT_DND_COMPACT_VNODE:
|
||||||
|
code = vmProcessCompactVnodeReq(pMgmt, pMsg);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||||
|
dError("msg:%p, not processed in vnode-mgmt queue", pMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (msgType & 1u) {
|
||||||
|
if (code != 0 && terrno != 0) code = terrno;
|
||||||
|
vmSendRsp(pMgmt->pWrapper, pMsg, code);
|
||||||
|
}
|
||||||
|
|
||||||
|
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
|
||||||
|
rpcFreeCont(pMsg->rpcMsg.pCont);
|
||||||
|
taosFreeQitem(pMsg);
|
||||||
|
}
|
||||||
|
|
||||||
static void vmProcessQueryQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) {
|
static void vmProcessQueryQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) {
|
||||||
dTrace("msg:%p, will be processed in vnode query queue", pMsg);
|
dTrace("msg:%p, will be processed in vnode-query queue", pMsg);
|
||||||
vnodeProcessQueryMsg(pVnode->pImpl, &pMsg->rpcMsg);
|
int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, &pMsg->rpcMsg);
|
||||||
|
if (code != 0) {
|
||||||
|
vmSendRsp(pVnode->pWrapper, pMsg, code);
|
||||||
|
}
|
||||||
|
|
||||||
|
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
|
||||||
|
rpcFreeCont(pMsg->rpcMsg.pCont);
|
||||||
|
taosFreeQitem(pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vmProcessFetchQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) {
|
static void vmProcessFetchQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) {
|
||||||
dTrace("msg:%p, will be processed in vnode fetch queue", pMsg);
|
dTrace("msg:%p, will be processed in vnode-fetch queue", pMsg);
|
||||||
vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg);
|
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg);
|
||||||
|
if (code != 0) {
|
||||||
|
vmSendRsp(pVnode->pWrapper, pMsg, code);
|
||||||
|
}
|
||||||
|
|
||||||
|
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
|
||||||
|
rpcFreeCont(pMsg->rpcMsg.pCont);
|
||||||
|
taosFreeQitem(pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
|
static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
|
||||||
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *));
|
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *));
|
||||||
|
if (pArray == NULL) {
|
||||||
|
dError("failed to process %d msgs in write-queue since %s", numOfMsgs, terrstr());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||||
SNodeMsg *pMsg = NULL;
|
SNodeMsg *pMsg = NULL;
|
||||||
taosGetQitem(qall, (void **)&pMsg);
|
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
|
||||||
dTrace("msg:%p, will be processed in vnode write queue", pMsg);
|
|
||||||
void *ptr = taosArrayPush(pArray, &pMsg);
|
dTrace("msg:%p, will be processed in vnode-write queue", pMsg);
|
||||||
assert(ptr != NULL);
|
if (taosArrayPush(pArray, &pMsg) == NULL) {
|
||||||
|
dTrace("msg:%p, failed to process since %s", pMsg, terrstr());
|
||||||
|
vmSendRsp(pVnode->pWrapper, pMsg, TSDB_CODE_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
vnodeProcessWMsgs(pVnode->pImpl, pArray);
|
vnodeProcessWMsgs(pVnode->pImpl, pArray);
|
||||||
|
|
||||||
for (size_t i = 0; i < numOfMsgs; i++) {
|
numOfMsgs = taosArrayGetSize(pArray);
|
||||||
SRpcMsg *pRsp = NULL;
|
for (int32_t i = 0; i < numOfMsgs; i++) {
|
||||||
SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
|
SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
|
||||||
SRpcMsg *pRpc = &pMsg->rpcMsg;
|
SRpcMsg *pRpc = &pMsg->rpcMsg;
|
||||||
int32_t code = vnodeApplyWMsg(pVnode->pImpl, pRpc, &pRsp);
|
SRpcMsg *pRsp = NULL;
|
||||||
|
|
||||||
|
int32_t code = vnodeApplyWMsg(pVnode->pImpl, pRpc, &pRsp);
|
||||||
if (pRsp != NULL) {
|
if (pRsp != NULL) {
|
||||||
pRsp->ahandle = pRpc->ahandle;
|
pRsp->ahandle = pRpc->ahandle;
|
||||||
dndSendRsp(pVnode->pWrapper, pRsp);
|
dndSendRsp(pVnode->pWrapper, pRsp);
|
||||||
free(pRsp);
|
free(pRsp);
|
||||||
} else {
|
} else {
|
||||||
if (code != 0) code = terrno;
|
if (code != 0 && terrno != 0) code = terrno;
|
||||||
SRpcMsg rpcRsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = code};
|
vmSendRsp(pVnode->pWrapper, pMsg, code);
|
||||||
dndSendRsp(pVnode->pWrapper, &rpcRsp);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (size_t i = 0; i < numOfMsgs; i++) {
|
for (int32_t i = 0; i < numOfMsgs; i++) {
|
||||||
SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
|
SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
|
||||||
dTrace("msg:%p, is freed", pMsg);
|
dTrace("msg:%p, is freed", pMsg);
|
||||||
rpcFreeCont(pMsg->rpcMsg.pCont);
|
rpcFreeCont(pMsg->rpcMsg.pCont);
|
||||||
|
@ -89,93 +152,112 @@ static void vmProcessSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOf
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static SVnodeObj *vmAcquireFromMsg(SVnodesMgmt *pMgmt, SNodeMsg *pNodeMsg) {
|
static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EVndQueueType qtype) {
|
||||||
SRpcMsg *pMsg = &pNodeMsg->rpcMsg;
|
SRpcMsg *pRpc = &pMsg->rpcMsg;
|
||||||
|
int32_t code = -1;
|
||||||
|
|
||||||
SMsgHead *pHead = pMsg->pCont;
|
SMsgHead *pHead = pRpc->pCont;
|
||||||
pHead->contLen = htonl(pHead->contLen);
|
pHead->contLen = htonl(pHead->contLen);
|
||||||
pHead->vgId = htonl(pHead->vgId);
|
pHead->vgId = htonl(pHead->vgId);
|
||||||
|
|
||||||
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
|
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
|
||||||
if (pVnode == NULL) {
|
if (pVnode == NULL) {
|
||||||
dError("vgId:%d, failed to acquire vnode while process req", pHead->vgId);
|
dError("vgId:%d, failed to write msg:%p to queue since %s", pHead->vgId, pMsg, terrstr());
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
return pVnode;
|
switch (qtype) {
|
||||||
}
|
case VND_QUERY_QUEUE:
|
||||||
|
dTrace("msg:%p, will be written into vnode-query queue", pMsg);
|
||||||
|
code = taosWriteQitem(pVnode->pQueryQ, pMsg);
|
||||||
|
break;
|
||||||
|
case VND_FETCH_QUEUE:
|
||||||
|
dTrace("msg:%p, will be written into vnode-fetch queue", pMsg);
|
||||||
|
code = taosWriteQitem(pVnode->pFetchQ, pMsg);
|
||||||
|
break;
|
||||||
|
case VND_WRITE_QUEUE:
|
||||||
|
dTrace("msg:%p, will be written into vnode-write queue", pMsg);
|
||||||
|
code = taosWriteQitem(pVnode->pWriteQ, pMsg);
|
||||||
|
case VND_SYNC_QUEUE:
|
||||||
|
dTrace("msg:%p, will be written into vnode-sync queue", pMsg);
|
||||||
|
code = taosWriteQitem(pVnode->pSyncQ, pMsg);
|
||||||
|
default:
|
||||||
|
terrno = TSDB_CODE_INVALID_PARA;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
|
||||||
SVnodeObj *pVnode = vmAcquireFromMsg(pMgmt, pMsg);
|
|
||||||
if (pVnode == NULL) return -1;
|
|
||||||
|
|
||||||
int32_t code = taosWriteQitem(pVnode->pWriteQ, pMsg);
|
|
||||||
vmReleaseVnode(pMgmt, pVnode);
|
vmReleaseVnode(pMgmt, pVnode);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||||
SVnodeObj *pVnode = vmAcquireFromMsg(pMgmt, pMsg);
|
return vmPutNodeMsgToQueue(pMgmt, pMsg, VND_SYNC_QUEUE);
|
||||||
if (pVnode == NULL) return -1;
|
}
|
||||||
|
|
||||||
int32_t code = taosWriteQitem(pVnode->pSyncQ, pMsg);
|
int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||||
vmReleaseVnode(pMgmt, pVnode);
|
return vmPutNodeMsgToQueue(pMgmt, pMsg, VND_WRITE_QUEUE);
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||||
SVnodeObj *pVnode = vmAcquireFromMsg(pMgmt, pMsg);
|
return vmPutNodeMsgToQueue(pMgmt, pMsg, VND_QUERY_QUEUE);
|
||||||
if (pVnode == NULL) return -1;
|
|
||||||
|
|
||||||
int32_t code = taosWriteQitem(pVnode->pQueryQ, pMsg);
|
|
||||||
vmReleaseVnode(pMgmt, pVnode);
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||||
SVnodeObj *pVnode = vmAcquireFromMsg(pMgmt, pMsg);
|
return vmPutNodeMsgToQueue(pMgmt, pMsg, VND_FETCH_QUEUE);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||||
|
SDnodeWorker *pWorker = &pMgmt->mgmtWorker;
|
||||||
|
dTrace("msg:%p, will be written to vnode-mgmt queue, worker:%s", pMsg, pWorker->name);
|
||||||
|
return dndWriteMsgToWorker(pWorker, pMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EVndQueueType qtype) {
|
||||||
|
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
|
||||||
|
int32_t code = -1;
|
||||||
|
SMsgHead *pHead = pRpc->pCont;
|
||||||
|
|
||||||
|
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
|
||||||
if (pVnode == NULL) return -1;
|
if (pVnode == NULL) return -1;
|
||||||
|
|
||||||
int32_t code = taosWriteQitem(pVnode->pFetchQ, pMsg);
|
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
|
||||||
|
if (pMsg != NULL) {
|
||||||
|
dTrace("msg:%p, is created, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
|
||||||
|
pMsg->rpcMsg = *pRpc;
|
||||||
|
switch (qtype) {
|
||||||
|
case VND_QUERY_QUEUE:
|
||||||
|
dTrace("msg:%p, will be put into vnode-query queue", pMsg);
|
||||||
|
code = taosWriteQitem(pVnode->pQueryQ, pMsg);
|
||||||
|
break;
|
||||||
|
case VND_FETCH_QUEUE:
|
||||||
|
dTrace("msg:%p, will be put into vnode-fetch queue", pMsg);
|
||||||
|
code = taosWriteQitem(pVnode->pFetchQ, pMsg);
|
||||||
|
break;
|
||||||
|
case VND_APPLY_QUEUE:
|
||||||
|
dTrace("msg:%p, will be put into vnode-apply queue", pMsg);
|
||||||
|
code = taosWriteQitem(pVnode->pApplyQ, pMsg);
|
||||||
|
break;
|
||||||
|
case VND_WRITE_QUEUE:
|
||||||
|
case VND_SYNC_QUEUE:
|
||||||
|
default:
|
||||||
|
terrno = TSDB_CODE_INVALID_PARA;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
vmReleaseVnode(pMgmt, pVnode);
|
vmReleaseVnode(pMgmt, pVnode);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
|
int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
|
||||||
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
|
return vmPutRpcMsgToQueue(pWrapper, pRpc, VND_QUERY_QUEUE);
|
||||||
|
|
||||||
int32_t code = -1;
|
|
||||||
SMsgHead *pHead = pRpc->pCont;
|
|
||||||
// pHead->vgId = htonl(pHead->vgId);
|
|
||||||
|
|
||||||
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
|
|
||||||
if (pVnode == NULL) return -1;
|
|
||||||
|
|
||||||
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
|
|
||||||
if (pMsg != NULL) {
|
|
||||||
pMsg->rpcMsg = *pRpc;
|
|
||||||
code = taosWriteQitem(pVnode->pQueryQ, pMsg);
|
|
||||||
}
|
|
||||||
vmReleaseVnode(pMgmt, pVnode);
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, int32_t vgId, SRpcMsg *pRpc) {
|
int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
|
||||||
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
|
return vmPutRpcMsgToQueue(pWrapper, pRpc, VND_FETCH_QUEUE);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t code = -1;
|
int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
|
||||||
SMsgHead *pHead = pRpc->pCont;
|
return vmPutRpcMsgToQueue(pWrapper, pRpc, VND_APPLY_QUEUE);
|
||||||
// pHead->vgId = htonl(pHead->vgId);
|
|
||||||
|
|
||||||
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
|
|
||||||
if (pVnode == NULL) return -1;
|
|
||||||
|
|
||||||
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
|
|
||||||
if (pMsg != NULL) {
|
|
||||||
pMsg->rpcMsg = *pRpc;
|
|
||||||
code = taosWriteQitem(pVnode->pApplyQ, pMsg);
|
|
||||||
}
|
|
||||||
vmReleaseVnode(pMgmt, pVnode);
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
|
int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
|
||||||
|
@ -191,6 +273,7 @@ int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
dDebug("vgId:%d, vnode queue is alloced", pVnode->vgId);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -205,43 +288,7 @@ void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
|
||||||
pVnode->pSyncQ = NULL;
|
pVnode->pSyncQ = NULL;
|
||||||
pVnode->pFetchQ = NULL;
|
pVnode->pFetchQ = NULL;
|
||||||
pVnode->pQueryQ = NULL;
|
pVnode->pQueryQ = NULL;
|
||||||
}
|
dDebug("vgId:%d, vnode queue is freed", pVnode->vgId);
|
||||||
|
|
||||||
static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
|
||||||
int32_t code = -1;
|
|
||||||
tmsg_t msgType = pMsg->rpcMsg.msgType;
|
|
||||||
dTrace("msg:%p, will be processed in vnode mgmt queue", pMsg);
|
|
||||||
|
|
||||||
switch (msgType) {
|
|
||||||
case TDMT_DND_CREATE_VNODE:
|
|
||||||
code = vmProcessCreateVnodeReq(pMgmt, pMsg);
|
|
||||||
break;
|
|
||||||
case TDMT_DND_ALTER_VNODE:
|
|
||||||
code = vmProcessAlterVnodeReq(pMgmt, pMsg);
|
|
||||||
break;
|
|
||||||
case TDMT_DND_DROP_VNODE:
|
|
||||||
code = vmProcessDropVnodeReq(pMgmt, pMsg);
|
|
||||||
break;
|
|
||||||
case TDMT_DND_SYNC_VNODE:
|
|
||||||
code = vmProcessSyncVnodeReq(pMgmt, pMsg);
|
|
||||||
break;
|
|
||||||
case TDMT_DND_COMPACT_VNODE:
|
|
||||||
code = vmProcessCompactVnodeReq(pMgmt, pMsg);
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
|
||||||
dError("msg:%p, not processed in mgmt queue", pMsg);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (msgType & 1u) {
|
|
||||||
if (code != 0) code = terrno;
|
|
||||||
SRpcMsg rsp = {.code = code, .handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle};
|
|
||||||
dndSendRsp(pMgmt->pWrapper, &rsp);
|
|
||||||
}
|
|
||||||
|
|
||||||
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
|
|
||||||
rpcFreeCont(pMsg->rpcMsg.pCont);
|
|
||||||
taosFreeQitem(pMsg);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
|
int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
|
||||||
|
@ -275,7 +322,7 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
|
||||||
if (tWWorkerInit(pWPool) != 0) return -1;
|
if (tWWorkerInit(pWPool) != 0) return -1;
|
||||||
|
|
||||||
if (dndInitWorker(pMgmt, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "vnode-mgmt", 1, 1, vmProcessMgmtQueue) != 0) {
|
if (dndInitWorker(pMgmt, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "vnode-mgmt", 1, 1, vmProcessMgmtQueue) != 0) {
|
||||||
dError("failed to start dnode mgmt worker since %s", terrstr());
|
dError("failed to start vnode-mgmt worker since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -291,9 +338,3 @@ void vmStopWorker(SVnodesMgmt *pMgmt) {
|
||||||
tWWorkerCleanup(&pMgmt->syncPool);
|
tWWorkerCleanup(&pMgmt->syncPool);
|
||||||
dDebug("vnode workers is closed");
|
dDebug("vnode workers is closed");
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
|
||||||
SDnodeWorker *pWorker = &pMgmt->mgmtWorker;
|
|
||||||
dTrace("msg:%p, will be written to worker %s", pMsg, pWorker->name);
|
|
||||||
return dndWriteMsgToWorker(pWorker, pMsg);
|
|
||||||
}
|
|
|
@ -62,6 +62,7 @@ typedef struct {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO)
|
uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO)
|
||||||
PutToQueueFp putToQueryQFp;
|
PutToQueueFp putToQueryQFp;
|
||||||
|
PutToQueueFp putToFetchQFp;
|
||||||
SendReqFp sendReqFp;
|
SendReqFp sendReqFp;
|
||||||
SendMnodeReqFp sendMnodeReqFp;
|
SendMnodeReqFp sendMnodeReqFp;
|
||||||
SendRspFp sendRspFp;
|
SendRspFp sendRspFp;
|
||||||
|
@ -125,9 +126,8 @@ void vnodeDestroy(const char *path);
|
||||||
*
|
*
|
||||||
* @param pVnode The vnode object.
|
* @param pVnode The vnode object.
|
||||||
* @param pMsgs The array of SRpcMsg
|
* @param pMsgs The array of SRpcMsg
|
||||||
* @return int 0 for success, -1 for failure
|
|
||||||
*/
|
*/
|
||||||
int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs);
|
void vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Apply a write request message.
|
* @brief Apply a write request message.
|
||||||
|
|
|
@ -1289,7 +1289,7 @@ static int tsdbRestoreCurrent(STsdb *pRepo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsdbSaveFSStatus(pRepo, pRepo->fs->cstatus) < 0) {
|
if (tsdbSaveFSStatus(pRepo, pRepo->fs->cstatus) < 0) {
|
||||||
tsdbError("vgId:%d failed to restore corrent since %s", REPO_ID(pRepo), tstrerror(terrno));
|
tsdbError("vgId:%d failed to restore current since %s", REPO_ID(pRepo), tstrerror(terrno));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
#include "tq.h"
|
#include "tq.h"
|
||||||
#include "vnd.h"
|
#include "vnd.h"
|
||||||
|
|
||||||
int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
|
void vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
|
||||||
SNodeMsg *pMsg;
|
SNodeMsg *pMsg;
|
||||||
SRpcMsg *pRpc;
|
SRpcMsg *pRpc;
|
||||||
|
|
||||||
|
@ -40,7 +40,8 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
|
||||||
|
|
||||||
// TODO: Integrate RAFT module here
|
// TODO: Integrate RAFT module here
|
||||||
|
|
||||||
return 0;
|
// No results are returned because error handling is difficult
|
||||||
|
// return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
|
|
Loading…
Reference in New Issue