diff --git a/source/dnode/mgmt/bnode/inc/bmInt.h b/source/dnode/mgmt/bnode/inc/bmInt.h index 9988bedb52..368f47e0e2 100644 --- a/source/dnode/mgmt/bnode/inc/bmInt.h +++ b/source/dnode/mgmt/bnode/inc/bmInt.h @@ -16,7 +16,7 @@ #ifndef _TD_DND_BNODE_INT_H_ #define _TD_DND_BNODE_INT_H_ -#include "mm.h" +#include "bm.h" #include "dm.h" #ifdef __cplusplus @@ -35,43 +35,21 @@ typedef struct SBnodeMgmt { SDnodeWorker writeWorker; } SBnodeMgmt; -// mmFile.c +// bmFile.c int32_t bmReadFile(SBnodeMgmt *pMgmt); int32_t bmWriteFile(SBnodeMgmt *pMgmt); SBnode *bmAcquire(SBnodeMgmt *pMgmt); void bmRelease(SBnodeMgmt *pMgmt, SBnode *pBnode); -// SBnode *mmAcquire(SMnodeMgmt *pMgmt); -// void mmRelease(SMnodeMgmt *pMgmt, SBnode *pMnode); -// int32_t mmOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption); -// int32_t mmAlter(SMnodeMgmt *pMgmt, SMnodeOpt *pOption); -// int32_t mmDrop(SMnodeMgmt *pMgmt); +// bmInt.c +int32_t bmOpen(SBnodeMgmt *pMgmt); +int32_t bmDrop(SBnodeMgmt *pMgmt); - -// void bmGetMgmtFp(SMgmtWrapper *pMgmt); - -// int32_t dndInitBnode(SDnode *pDnode); -// void dndCleanupBnode(SDnode *pDnode); - -// void dndProcessBnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -// int32_t bmProcessCreateReq(SBnodeMgmt *pMgmt, SNodeMsg *pRpcMsg); -// int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SNodeMsg *pRpcMsg); - -// void bmInitMsgHandles(SMgmtWrapper *pWrapper); - -// int32_t bmStartWorker(SDnode *pDnode); -// void bmStopWorker(SDnode *pDnode); -// void bmInitMsgFp(SMnodeMgmt *pMgmt); -// void bmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -// int32_t bmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); -// int32_t bmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); -// void bmConsumeChildQueue(SDnode *pDnode, SNodeMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); -// void bmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); - -// void bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -// void bmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -// void bmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +// bmWorker.c +int32_t bmStartWorker(SBnodeMgmt *pMgmt); +void bmStopWorker(SBnodeMgmt *pMgmt); +int32_t bmProcessWriteMsg(SBnodeMgmt *pMgmt, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/bnode/src/bmFile.c b/source/dnode/mgmt/bnode/src/bmFile.c index d5f989fc6e..a0e76ae627 100644 --- a/source/dnode/mgmt/bnode/src/bmFile.c +++ b/source/dnode/mgmt/bnode/src/bmFile.c @@ -98,7 +98,7 @@ int32_t bmWriteFile(SBnodeMgmt *pMgmt) { free(content); char realfile[PATH_MAX]; - snprintf(realfile, sizeof(realfile), "%s%sbnode.json", pMgmt->path); + snprintf(realfile, sizeof(realfile), "%s%sbnode.json", pMgmt->path, TD_DIRSEP); if (taosRenameFile(file, realfile) != 0) { terrno = TSDB_CODE_DND_BNODE_WRITE_FILE_ERROR; diff --git a/source/dnode/mgmt/bnode/src/bmInt.c b/source/dnode/mgmt/bnode/src/bmInt.c index 7ffeb1a8da..dbfcfd15f8 100644 --- a/source/dnode/mgmt/bnode/src/bmInt.c +++ b/source/dnode/mgmt/bnode/src/bmInt.c @@ -47,7 +47,7 @@ void bmRelease(SBnodeMgmt *pMgmt, SBnode *pBnode) { static bool bmRequire(SMgmtWrapper *pWrapper) { SBnodeMgmt mgmt = {0}; mgmt.path = pWrapper->path; - if (mmReadFile(&mgmt) != 0) { + if (bmReadFile(&mgmt) != 0) { return false; } @@ -59,18 +59,12 @@ static bool bmRequire(SMgmtWrapper *pWrapper) { if (mgmt.deployed) { dInfo("bnode has been deployed"); - return true; } - bool required = mmDeployRequired(pWrapper->pDnode); - if (required) { - dInfo("bnode need to be deployed"); - } - - return required; + return mgmt.deployed; } -static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) { +void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) { SDnode *pDnode = pMgmt->pDnode; pOption->pWrapper = pMgmt->pWrapper; @@ -80,24 +74,25 @@ static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) { pOption->clusterId = pDnode->clusterId; } -int32_t bmOpen(SBnodeMgmt *pMgmt, SMnodeOpt *pOption) { - SDnode *pDnode = pMgmt->pDnode; +int32_t bmOpen(SBnodeMgmt *pMgmt) { + SBnodeOpt option = {0}; + bmInitOption(pMgmt, &option); - SBnode *pBnode = bmAcquire(pDnode); + SBnode *pBnode = bmAcquire(pMgmt); if (pBnode != NULL) { - bmRelease(pDnode, pBnode); + bmRelease(pMgmt, pBnode); terrno = TSDB_CODE_DND_BNODE_ALREADY_DEPLOYED; dError("failed to create bnode since %s", terrstr()); return -1; } - pBnode = bndOpen(pMgmt->path, pOption); + pBnode = bndOpen(pMgmt->path, &option); if (pBnode == NULL) { dError("failed to open bnode since %s", terrstr()); return -1; } - if (bmStartWorker(pDnode) != 0) { + if (bmStartWorker(pMgmt) != 0) { dError("failed to start bnode worker since %s", terrstr()); bndClose(pBnode); bndDestroy(pMgmt->path); @@ -105,10 +100,10 @@ int32_t bmOpen(SBnodeMgmt *pMgmt, SMnodeOpt *pOption) { } pMgmt->deployed = 1; - if (bmWriteFile(pDnode) != 0) { + if (bmWriteFile(pMgmt) != 0) { dError("failed to write bnode file since %s", terrstr()); pMgmt->deployed = 0; - bmStopWorker(pDnode); + bmStopWorker(pMgmt); bndClose(pBnode); bndDestroy(pMgmt->path); return -1; @@ -155,41 +150,6 @@ int32_t bmDrop(SBnodeMgmt *pMgmt) { return 0; } -static int32_t bmInit(SMgmtWrapper *pWrapper) { - SDnode *pDnode = pWrapper->pDnode; - SBnodeMgmt *pMgmt = calloc(1, sizeof(SBnodeMgmt)); - int32_t code = -1; - SBnodeOpt option = {0}; - - dInfo("bnode-mgmt start to init"); - if (pMgmt == NULL) goto _OVER; - - pMgmt->path = pWrapper->path; - pMgmt->pDnode = pWrapper->pDnode; - pMgmt->pWrapper = pWrapper; - taosInitRWLatch(&pMgmt->latch); - - if (bmReadFile(pMgmt) != 0) { - dError("failed to read file since %s", terrstr()); - goto _OVER; - } - - dInfo("bnode start to open"); - bmInitOption(pDnode, &option); - code = bmOpen(pMgmt, &option); - -_OVER: - if (code == 0) { - pWrapper->pMgmt = pMgmt; - dInfo("bnode-mgmt is initialized"); - } else { - dError("failed to init bnode-mgmt since %s", terrstr()); - bmCleanup(pWrapper); - } - - return code; -} - static void bmCleanup(SMgmtWrapper *pWrapper) { SBnodeMgmt *pMgmt = pWrapper->pMgmt; if (pMgmt == NULL) return; @@ -205,6 +165,39 @@ static void bmCleanup(SMgmtWrapper *pWrapper) { dInfo("bnode-mgmt is cleaned up"); } +static int32_t bmInit(SMgmtWrapper *pWrapper) { + SDnode *pDnode = pWrapper->pDnode; + SBnodeMgmt *pMgmt = calloc(1, sizeof(SBnodeMgmt)); + int32_t code = -1; + + dInfo("bnode-mgmt start to init"); + if (pMgmt == NULL) goto _OVER; + + pMgmt->path = pWrapper->path; + pMgmt->pDnode = pWrapper->pDnode; + pMgmt->pWrapper = pWrapper; + taosInitRWLatch(&pMgmt->latch); + + if (bmReadFile(pMgmt) != 0) { + dError("failed to read file since %s", terrstr()); + goto _OVER; + } + + dInfo("bnode start to open"); + code = bmOpen(pMgmt); + +_OVER: + if (code == 0) { + pWrapper->pMgmt = pMgmt; + dInfo("bnode-mgmt is initialized"); + } else { + dError("failed to init bnode-mgmt since %s", terrstr()); + bmCleanup(pWrapper); + } + + return code; +} + void bmGetMgmtFp(SMgmtWrapper *pWrapper) { SMgmtFp mgmtFp = {0}; mgmtFp.openFp = bmInit; diff --git a/source/dnode/mgmt/bnode/src/bmMsg.c b/source/dnode/mgmt/bnode/src/bmMsg.c index 4a804e6a2c..b0990a493f 100644 --- a/source/dnode/mgmt/bnode/src/bmMsg.c +++ b/source/dnode/mgmt/bnode/src/bmMsg.c @@ -31,7 +31,7 @@ int32_t bmProcessCreateReq(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) { dError("failed to create bnode since %s", terrstr()); return -1; } else { - return bmOpen(pDnode); + return bmOpen(pMgmt); } } @@ -50,7 +50,7 @@ int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) { dError("failed to drop bnode since %s", terrstr()); return -1; } else { - return bmDrop(pDnode); + return bmDrop(pMgmt); } } diff --git a/source/dnode/mgmt/bnode/src/bmWorker.c b/source/dnode/mgmt/bnode/src/bmWorker.c index 60f2527657..7b6506bcd3 100644 --- a/source/dnode/mgmt/bnode/src/bmWorker.c +++ b/source/dnode/mgmt/bnode/src/bmWorker.c @@ -14,17 +14,12 @@ */ #define _DEFAULT_SOURCE -// #include "dndBnode.h" -// #include "dndTransport.h" -// #include "dndWorker.h" +#include "bmInt.h" -#if 0 -static void dndProcessBnodeQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs); +static void bmProcessQueue(SBnodeMgmt *pMgmt, STaosQall *qall, int32_t numOfMsgs); - -static int32_t bmStartWorker(SDnode *pDnode) { - SBnodeMgmt *pMgmt = &pDnode->bmgmt; - if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_MULTI, "bnode-write", 0, 1, dndProcessBnodeQueue) != 0) { +int32_t bmStartWorker(SBnodeMgmt *pMgmt) { + if (dndInitWorker(pMgmt, &pMgmt->writeWorker, DND_WORKER_MULTI, "bnode-write", 0, 1, bmProcessQueue) != 0) { dError("failed to start bnode write worker since %s", terrstr()); return -1; } @@ -32,9 +27,7 @@ static int32_t bmStartWorker(SDnode *pDnode) { return 0; } -static void bmStopWorker(SDnode *pDnode) { - SBnodeMgmt *pMgmt = &pDnode->bmgmt; - +void bmStopWorker(SBnodeMgmt *pMgmt) { taosWLockLatch(&pMgmt->latch); pMgmt->deployed = 0; taosWUnLockLatch(&pMgmt->latch); @@ -46,103 +39,68 @@ static void bmStopWorker(SDnode *pDnode) { dndCleanupWorker(&pMgmt->writeWorker); } -static void dndSendBnodeErrorRsp(SRpcMsg *pMsg, int32_t code) { - SRpcMsg rpcRsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code}; - rpcSendResponse(&rpcRsp); - rpcFreeCont(pMsg->pCont); +static void bmSendErrorRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { + SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code}; + dndSendRsp(pWrapper, &rpcRsp); + rpcFreeCont(pMsg->rpcMsg.pCont); taosFreeQitem(pMsg); } -static void dndSendBnodeErrorRsps(STaosQall *qall, int32_t numOfMsgs, int32_t code) { +static void bmSendErrorRsps(SMgmtWrapper *pWrapper, STaosQall *qall, int32_t numOfMsgs, int32_t code) { for (int32_t i = 0; i < numOfMsgs; ++i) { - SRpcMsg *pMsg = NULL; + SNodeMsg *pMsg = NULL; taosGetQitem(qall, (void **)&pMsg); - dndSendBnodeErrorRsp(pMsg, code); + bmSendErrorRsp(pWrapper, pMsg, code); } } -static void dndProcessBnodeQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs) { - SBnode *pBnode = bmAcquire(pDnode); +static void bmProcessQueue(SBnodeMgmt *pMgmt, STaosQall *qall, int32_t numOfMsgs) { + SMgmtWrapper *pWrapper = pMgmt->pWrapper; + + SBnode *pBnode = bmAcquire(pMgmt); if (pBnode == NULL) { - dndSendBnodeErrorRsps(qall, numOfMsgs, TSDB_CODE_OUT_OF_MEMORY); + bmSendErrorRsps(pWrapper, qall, numOfMsgs, TSDB_CODE_OUT_OF_MEMORY); return; } - SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *)); + SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *)); if (pArray == NULL) { - bmRelease(pDnode, pBnode); - dndSendBnodeErrorRsps(qall, numOfMsgs, TSDB_CODE_OUT_OF_MEMORY); + bmRelease(pMgmt, pBnode); + bmSendErrorRsps(pWrapper, qall, numOfMsgs, TSDB_CODE_OUT_OF_MEMORY); return; } for (int32_t i = 0; i < numOfMsgs; ++i) { - SRpcMsg *pMsg = NULL; + SNodeMsg *pMsg = NULL; taosGetQitem(qall, (void **)&pMsg); void *ptr = taosArrayPush(pArray, &pMsg); if (ptr == NULL) { - dndSendBnodeErrorRsp(pMsg, TSDB_CODE_OUT_OF_MEMORY); + bmRelease(pMgmt, pBnode); + bmSendErrorRsp(pWrapper, pMsg, TSDB_CODE_OUT_OF_MEMORY); } } bndProcessWMsgs(pBnode, pArray); for (size_t i = 0; i < numOfMsgs; i++) { - SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i); - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); + SNodeMsg *pNodeMsg = *(SNodeMsg **)taosArrayGet(pArray, i); + rpcFreeCont(pNodeMsg->rpcMsg.pCont); + taosFreeQitem(pNodeMsg); } taosArrayDestroy(pArray); - bmRelease(pDnode, pBnode); + bmRelease(pMgmt, pBnode); } -static void dndWriteBnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pMsg) { - int32_t code = TSDB_CODE_DND_BNODE_NOT_DEPLOYED; +static int32_t bmPutMsgToWorker(SBnodeMgmt *pMgmt, SDnodeWorker *pWorker, SNodeMsg *pMsg) { + SBnode *pBnode = bmAcquire(pMgmt); + if (pBnode == NULL) return -1; - SBnode *pBnode = bmAcquire(pDnode); - if (pBnode != NULL) { - code = dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg)); - } - bmRelease(pDnode, pBnode); - - if (code != 0) { - if (pMsg->msgType & 1u) { - SRpcMsg rsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code}; - rpcSendResponse(&rsp); - } - rpcFreeCont(pMsg->pCont); - } + dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); + int32_t code = dndWriteMsgToWorker(pWorker, pMsg, 0); + bmRelease(pMgmt, pBnode); + return code; } -void dndProcessBnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - dndWriteBnodeMsgToWorker(pDnode, &pDnode->bmgmt.writeWorker, pMsg); -} - -int32_t dndInitBnode(SDnode *pDnode) { - SBnodeMgmt *pMgmt = &pDnode->bmgmt; - taosInitRWLatch(&pMgmt->latch); - - if (dndReadBnodeFile(pDnode) != 0) { - return -1; - } - - if (pMgmt->dropped) { - dInfo("bnode has been deployed and needs to be deleted"); - bndDestroy(pDnode->dir.bnode); - return 0; - } - - if (!pMgmt->deployed) return 0; - - return bmOpen(pDnode); -} - -void dndCleanupBnode(SDnode *pDnode) { - SBnodeMgmt *pMgmt = &pDnode->bmgmt; - if (pMgmt->pBnode) { - bmStopWorker(pDnode); - bndClose(pMgmt->pBnode); - pMgmt->pBnode = NULL; - } -} - -#endif \ No newline at end of file +int32_t bmProcessWriteMsg(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) { + return bmPutMsgToWorker(pMgmt, &pMgmt->writeWorker, pMsg); +} \ No newline at end of file diff --git a/source/dnode/mgmt/container/inc/dndInt.h b/source/dnode/mgmt/container/inc/dndInt.h index f87c4de244..aa00774603 100644 --- a/source/dnode/mgmt/container/inc/dndInt.h +++ b/source/dnode/mgmt/container/inc/dndInt.h @@ -18,7 +18,7 @@ #include "dnd.h" -#include "bmInt.h" +#include "bm.h" #include "dm.h" #include "dndInt.h" #include "mm.h" diff --git a/source/dnode/mgmt/dnode/src/dmWorker.c b/source/dnode/mgmt/dnode/src/dmWorker.c index b64d1ea813..80a6b374aa 100644 --- a/source/dnode/mgmt/dnode/src/dmWorker.c +++ b/source/dnode/mgmt/dnode/src/dmWorker.c @@ -14,7 +14,7 @@ */ #define _DEFAULT_SOURCE -#include "bmInt.h" +#include "bm.h" #include "dmInt.h" #include "mm.h" #include "qmInt.h" diff --git a/source/dnode/mgmt/mnode/src/mmInt.c b/source/dnode/mgmt/mnode/src/mmInt.c index f1698f71b1..d6c3ef9a96 100644 --- a/source/dnode/mgmt/mnode/src/mmInt.c +++ b/source/dnode/mgmt/mnode/src/mmInt.c @@ -59,7 +59,7 @@ int32_t mmOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { return -1; } - SMnode *pMnode = mndOpen(pMgmt->path, pOption); + pMnode = mndOpen(pMgmt->path, pOption); if (pMnode == NULL) { dError("failed to open mnode since %s", terrstr()); return -1; diff --git a/source/dnode/mgmt/vnode/src/vmWorker.c b/source/dnode/mgmt/vnode/src/vmWorker.c index 017c127a6d..14e57e2949 100644 --- a/source/dnode/mgmt/vnode/src/vmWorker.c +++ b/source/dnode/mgmt/vnode/src/vmWorker.c @@ -21,10 +21,10 @@ static void vmProcessQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) { vnodeProcess static void vmProcessFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) { vnodeProcessFetchMsg(pVnode->pImpl, pMsg); } static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { - SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *)); + SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *)); for (int32_t i = 0; i < numOfMsgs; ++i) { - SRpcMsg *pMsg = NULL; + SNodeMsg *pMsg = NULL; taosGetQitem(qall, (void **)&pMsg); void *ptr = taosArrayPush(pArray, &pMsg); assert(ptr != NULL); @@ -33,9 +33,10 @@ static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numO vnodeProcessWMsgs(pVnode->pImpl, pArray); for (size_t i = 0; i < numOfMsgs; i++) { - SRpcMsg *pRsp = NULL; - SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i); - int32_t code = vnodeApplyWMsg(pVnode->pImpl, pMsg, &pRsp); + SRpcMsg *pRsp = NULL; + SNodeMsg *pNodeMsg = *(SNodeMsg **)taosArrayGet(pArray, i); + SRpcMsg *pMsg = &pNodeMsg->rpcMsg; + int32_t code = vnodeApplyWMsg(pVnode->pImpl, pMsg, &pRsp); if (pRsp != NULL) { pRsp->ahandle = pMsg->ahandle; rpcSendResponse(pRsp); @@ -48,9 +49,9 @@ static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numO } for (size_t i = 0; i < numOfMsgs; i++) { - SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i); - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); + SNodeMsg *pNodeMsg = *(SNodeMsg **)taosArrayGet(pArray, i); + rpcFreeCont(pNodeMsg->rpcMsg.pCont); + taosFreeQitem(pNodeMsg); } taosArrayDestroy(pArray);