Merge pull request #10983 from taosdata/feature/shm

adjust logs and refcount
This commit is contained in:
Shengliang Guan 2022-03-25 10:51:46 +08:00 committed by GitHub
commit 33164d4faf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 117 additions and 57 deletions

View File

@ -180,6 +180,10 @@ static int32_t taosSetTfsCfg(SConfig *pCfg) {
memcpy(&tsDiskCfg[index], pCfg, sizeof(SDiskCfg)); memcpy(&tsDiskCfg[index], pCfg, sizeof(SDiskCfg));
if (pCfg->level == 0 && pCfg->primary == 1) { if (pCfg->level == 0 && pCfg->primary == 1) {
tstrncpy(tsDataDir, pCfg->dir, PATH_MAX); tstrncpy(tsDataDir, pCfg->dir, PATH_MAX);
if (taosMkDir(tsDataDir) != 0) {
uError("failed to create dataDir:%s since %s", tsDataDir, terrstr());
return -1;
}
} }
if (taosMkDir(pCfg->dir) != 0) { if (taosMkDir(pCfg->dir) != 0) {
uError("failed to create tfsDir:%s since %s", tsDataDir, terrstr()); uError("failed to create tfsDir:%s since %s", tsDataDir, terrstr());

View File

@ -77,7 +77,11 @@ int32_t bmStartWorker(SBnodeMgmt *pMgmt) {
return -1; return -1;
} }
dDebug("bnode workers are initialized");
return 0; return 0;
} }
void bmStopWorker(SBnodeMgmt *pMgmt) { tMultiWorkerCleanup(&pMgmt->writeWorker); } void bmStopWorker(SBnodeMgmt *pMgmt) {
tMultiWorkerCleanup(&pMgmt->writeWorker);
dDebug("bnode workers are closed");
}

View File

@ -19,7 +19,6 @@
#include "os.h" #include "os.h"
#include "cJSON.h" #include "cJSON.h"
#include "monitor.h"
#include "tcache.h" #include "tcache.h"
#include "tcrc32c.h" #include "tcrc32c.h"
#include "tdatablock.h" #include "tdatablock.h"
@ -36,8 +35,7 @@
#include "tworker.h" #include "tworker.h"
#include "dnode.h" #include "dnode.h"
#include "tfs.h" #include "monitor.h"
#include "wal.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -53,7 +51,6 @@ extern "C" {
typedef enum { DNODE, VNODES, QNODE, SNODE, MNODE, BNODE, NODE_MAX } ENodeType; typedef enum { DNODE, VNODES, QNODE, SNODE, MNODE, BNODE, NODE_MAX } ENodeType;
typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EDndStatus; typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EDndStatus;
typedef enum { DND_ENV_INIT, DND_ENV_READY, DND_ENV_CLEANUP } EEnvStatus; typedef enum { DND_ENV_INIT, DND_ENV_READY, DND_ENV_CLEANUP } EEnvStatus;
typedef enum { DND_WORKER_SINGLE, DND_WORKER_MULTI } EWorkerType;
typedef enum { PROC_SINGLE, PROC_CHILD, PROC_PARENT } EProcType; typedef enum { PROC_SINGLE, PROC_CHILD, PROC_PARENT } EProcType;
typedef struct SMgmtFp SMgmtFp; typedef struct SMgmtFp SMgmtFp;
@ -127,29 +124,30 @@ typedef struct SDnode {
bool dropped; bool dropped;
EDndStatus status; EDndStatus status;
EDndEvent event; EDndEvent event;
EProcType procType;
SStartupReq startup; SStartupReq startup;
TdFilePtr pLockFile; TdFilePtr pLockFile;
STransMgmt trans; STransMgmt trans;
SMgmtWrapper wrappers[NODE_MAX]; SMgmtWrapper wrappers[NODE_MAX];
} SDnode; } SDnode;
EDndStatus dndGetStatus(SDnode *pDnode); EDndStatus dndGetStatus(SDnode *pDnode);
void dndSetStatus(SDnode *pDnode, EDndStatus stat); void dndSetStatus(SDnode *pDnode, EDndStatus stat);
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType); void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp, int32_t vgId);
void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp, int32_t vgId); void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc);
void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc); void dndSendMonitorReport(SDnode *pDnode);
void dndSendMonitorReport(SDnode *pDnode);
int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, SEpSet *pEpSet, SRpcMsg *pMsg); int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, SEpSet *pEpSet, SRpcMsg *pMsg);
void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp); void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp);
int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg); int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg);
int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed); int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed);
int32_t dndWriteFile(SMgmtWrapper *pWrapper, bool deployed); int32_t dndWriteFile(SMgmtWrapper *pWrapper, bool deployed);
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType);
int32_t dndMarkWrapper(SMgmtWrapper *pWrapper);
void dndReleaseWrapper(SMgmtWrapper *pWrapper);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -34,7 +34,7 @@ int32_t dndInit();
void dndCleanup(); void dndCleanup();
const char *dndStatStr(EDndStatus stat); const char *dndStatStr(EDndStatus stat);
void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup); void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup);
TdFilePtr dndCheckRunning(char *dataDir); TdFilePtr dndCheckRunning(const char *dataDir);
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg); void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg);
// dndMsg.c // dndMsg.c
@ -50,10 +50,6 @@ SDnode *dndCreate(const SDnodeOpt *pOption);
void dndClose(SDnode *pDnode); void dndClose(SDnode *pDnode);
void dndHandleEvent(SDnode *pDnode, EDndEvent event); void dndHandleEvent(SDnode *pDnode, EDndEvent event);
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType);
int32_t dndMarkWrapper(SMgmtWrapper *pWrapper);
void dndReleaseWrapper(SMgmtWrapper *pWrapper);
// dndTransport.c // dndTransport.c
int32_t dndInitServer(SDnode *pDnode); int32_t dndInitServer(SDnode *pDnode);
void dndCleanupServer(SDnode *pDnode); void dndCleanupServer(SDnode *pDnode);

View File

@ -50,16 +50,23 @@ int32_t dndOpenNode(SMgmtWrapper *pWrapper) {
} }
void dndCloseNode(SMgmtWrapper *pWrapper) { void dndCloseNode(SMgmtWrapper *pWrapper) {
dDebug("node:%s, start to close", pWrapper->name);
taosWLockLatch(&pWrapper->latch); taosWLockLatch(&pWrapper->latch);
if (pWrapper->deployed) { if (pWrapper->deployed) {
(*pWrapper->fp.closeFp)(pWrapper); (*pWrapper->fp.closeFp)(pWrapper);
pWrapper->deployed = false; pWrapper->deployed = false;
} }
taosWUnLockLatch(&pWrapper->latch);
while (pWrapper->refCount > 0) {
taosMsleep(10);
}
if (pWrapper->pProc) { if (pWrapper->pProc) {
taosProcCleanup(pWrapper->pProc); taosProcCleanup(pWrapper->pProc);
pWrapper->pProc = NULL; pWrapper->pProc = NULL;
} }
taosWUnLockLatch(&pWrapper->latch); dDebug("node:%s, has been closed", pWrapper->name);
} }
static int32_t dndRunInSingleProcess(SDnode *pDnode) { static int32_t dndRunInSingleProcess(SDnode *pDnode) {

View File

@ -15,11 +15,12 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dndInt.h" #include "dndInt.h"
#include "wal.h"
static int8_t once = DND_ENV_INIT; static int8_t once = DND_ENV_INIT;
int32_t dndInit() { int32_t dndInit() {
dDebug("start to init dnode env"); dInfo("start to init dnode env");
if (atomic_val_compare_exchange_8(&once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) { if (atomic_val_compare_exchange_8(&once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) {
terrno = TSDB_CODE_REPEAT_INIT; terrno = TSDB_CODE_REPEAT_INIT;
dError("failed to init dnode env since %s", terrstr()); dError("failed to init dnode env since %s", terrstr());
@ -52,7 +53,7 @@ int32_t dndInit() {
} }
void dndCleanup() { void dndCleanup() {
dDebug("start to cleanup dnode env"); dInfo("start to cleanup dnode env");
if (atomic_val_compare_exchange_8(&once, DND_ENV_READY, DND_ENV_CLEANUP) != DND_ENV_READY) { if (atomic_val_compare_exchange_8(&once, DND_ENV_READY, DND_ENV_CLEANUP) != DND_ENV_READY) {
dError("dnode env is already cleaned up"); dError("dnode env is already cleaned up");
return; return;
@ -92,7 +93,7 @@ const char *dndStatStr(EDndStatus status) {
} }
} }
void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc) { void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc) {
SStartupReq *pStartup = &pDnode->startup; SStartupReq *pStartup = &pDnode->startup;
tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN); tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN);
tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN); tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN);
@ -104,21 +105,21 @@ void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup) {
pStartup->finished = (dndGetStatus(pDnode) == DND_STAT_RUNNING); pStartup->finished = (dndGetStatus(pDnode) == DND_STAT_RUNNING);
} }
TdFilePtr dndCheckRunning(char *dataDir) { TdFilePtr dndCheckRunning(const char *dataDir) {
char filepath[PATH_MAX] = {0}; char filepath[PATH_MAX] = {0};
snprintf(filepath, sizeof(filepath), "%s/.running", dataDir); snprintf(filepath, sizeof(filepath), "%s/.running", dataDir);
TdFilePtr pFile = taosOpenFile(filepath, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); TdFilePtr pFile = taosOpenFile(filepath, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) { if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to lock file:%s since %s, quit", filepath, terrstr()); dError("failed to lock file:%s since %s", filepath, terrstr());
return NULL; return NULL;
} }
int32_t ret = taosLockFile(pFile); int32_t ret = taosLockFile(pFile);
if (ret != 0) { if (ret != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to lock file:%s since %s, quit", filepath, terrstr()); dError("failed to lock file:%s since %s", filepath, terrstr());
taosCloseFile(&pFile); taosCloseFile(&pFile);
return NULL; return NULL;
} }
@ -129,12 +130,10 @@ TdFilePtr dndCheckRunning(char *dataDir) {
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) { void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
dDebug("startup req is received"); dDebug("startup req is received");
SStartupReq *pStartup = rpcMallocCont(sizeof(SStartupReq)); SStartupReq *pStartup = rpcMallocCont(sizeof(SStartupReq));
dndGetStartup(pDnode, pStartup); dndGetStartup(pDnode, pStartup);
dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished); dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished);
SRpcMsg rpcRsp = {.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq)}; SRpcMsg rpcRsp = {.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq)};
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} }

View File

@ -22,7 +22,12 @@ static int32_t dndGetMonitorDiskInfo(SDnode *pDnode, SMonDiskInfo *pInfo) {
tstrncpy(pInfo->tempdir.name, tsTempDir, sizeof(pInfo->tempdir.name)); tstrncpy(pInfo->tempdir.name, tsTempDir, sizeof(pInfo->tempdir.name));
pInfo->tempdir.size = tsTempSpace.size; pInfo->tempdir.size = tsTempSpace.size;
return vmMonitorTfsInfo(dndAcquireWrapper(pDnode, VNODES), pInfo); SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, VNODES);
if (pWrapper != NULL) {
vmMonitorTfsInfo(pWrapper, pInfo);
dndReleaseWrapper(pWrapper);
}
return 0;
} }
static void dndGetMonitorBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) { static void dndGetMonitorBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) {
@ -45,8 +50,17 @@ static void dndGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) {
taosGetCardInfo(&pInfo->net_in, &pInfo->net_out); taosGetCardInfo(&pInfo->net_in, &pInfo->net_out);
taosGetProcIO(&pInfo->io_read, &pInfo->io_write, &pInfo->io_read_disk, &pInfo->io_write_disk); taosGetProcIO(&pInfo->io_read, &pInfo->io_write, &pInfo->io_read_disk, &pInfo->io_write_disk);
vmMonitorVnodeReqs(dndAcquireWrapper(pDnode, VNODES), pInfo); SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, VNODES);
pInfo->has_mnode = (dndAcquireWrapper(pDnode, MNODE)->required); if (pWrapper != NULL) {
vmMonitorVnodeReqs(pWrapper, pInfo);
dndReleaseWrapper(pWrapper);
}
pWrapper = dndAcquireWrapper(pDnode, MNODE);
if (pWrapper != NULL) {
pInfo->has_mnode = pWrapper->required;
dndReleaseWrapper(pWrapper);
}
} }
void dndSendMonitorReport(SDnode *pDnode) { void dndSendMonitorReport(SDnode *pDnode) {
@ -63,10 +77,15 @@ void dndSendMonitorReport(SDnode *pDnode) {
SMonClusterInfo clusterInfo = {0}; SMonClusterInfo clusterInfo = {0};
SMonVgroupInfo vgroupInfo = {0}; SMonVgroupInfo vgroupInfo = {0};
SMonGrantInfo grantInfo = {0}; SMonGrantInfo grantInfo = {0};
if (mmMonitorMnodeInfo(dndAcquireWrapper(pDnode, MNODE), &clusterInfo, &vgroupInfo, &grantInfo) == 0) {
monSetClusterInfo(pMonitor, &clusterInfo); SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, MNODE);
monSetVgroupInfo(pMonitor, &vgroupInfo); if (pWrapper != NULL) {
monSetGrantInfo(pMonitor, &grantInfo); if (mmMonitorMnodeInfo(pWrapper, &clusterInfo, &vgroupInfo, &grantInfo) == 0) {
monSetClusterInfo(pMonitor, &clusterInfo);
monSetVgroupInfo(pMonitor, &vgroupInfo);
monSetGrantInfo(pMonitor, &grantInfo);
}
dndReleaseWrapper(pWrapper);
} }
SMonDnodeInfo dnodeInfo = {0}; SMonDnodeInfo dnodeInfo = {0};

View File

@ -20,8 +20,8 @@ static void dndUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, DNODE); SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, DNODE);
if (pWrapper != NULL) { if (pWrapper != NULL) {
dmUpdateMnodeEpSet(pWrapper->pMgmt, pEpSet); dmUpdateMnodeEpSet(pWrapper->pMgmt, pEpSet);
dndReleaseWrapper(pWrapper);
} }
dndReleaseWrapper(pWrapper);
} }
static inline NodeMsgFp dndGetMsgFp(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { static inline NodeMsgFp dndGetMsgFp(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {

View File

@ -58,7 +58,7 @@ static void dndClearMemory(SDnode *pDnode) {
SDnode *dndCreate(const SDnodeOpt *pOption) { SDnode *dndCreate(const SDnodeOpt *pOption) {
dInfo("start to create dnode object"); dInfo("start to create dnode object");
int32_t code = -1; int32_t code = -1;
char path[PATH_MAX]; char path[PATH_MAX] = {0};
SDnode *pDnode = NULL; SDnode *pDnode = NULL;
pDnode = calloc(1, sizeof(SDnode)); pDnode = calloc(1, sizeof(SDnode));

View File

@ -146,9 +146,14 @@ static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) {
static void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pRpcMsg, SRpcMsg *pRpcRsp) { static void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pRpcMsg, SRpcMsg *pRpcRsp) {
STransMgmt *pMgmt = &pDnode->trans; STransMgmt *pMgmt = &pDnode->trans;
SEpSet epSet = {0};
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, DNODE);
if (pWrapper != NULL) {
dmGetMnodeEpSet(pWrapper->pMgmt, &epSet);
dndReleaseWrapper(pWrapper);
}
SEpSet epSet = {0};
dmGetMnodeEpSet(dndAcquireWrapper(pDnode, DNODE)->pMgmt, &epSet);
rpcSendRecv(pMgmt->clientRpc, &epSet, pRpcMsg, pRpcRsp); rpcSendRecv(pMgmt->clientRpc, &epSet, pRpcMsg, pRpcRsp);
} }
@ -182,9 +187,14 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char
return 0; return 0;
} }
if (mmGetUserAuth(dndAcquireWrapper(pDnode, MNODE), user, spi, encrypt, secret, ckey) == 0) { SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, MNODE);
dTrace("user:%s, get auth from mnode, spi:%d encrypt:%d", user, *spi, *encrypt); if (pWrapper != NULL) {
return 0; if (mmGetUserAuth(pWrapper, user, spi, encrypt, secret, ckey) == 0) {
dndReleaseWrapper(pWrapper);
dTrace("user:%s, get auth from mnode, spi:%d encrypt:%d", user, *spi, *encrypt);
return 0;
}
dndReleaseWrapper(pWrapper);
} }
if (terrno != TSDB_CODE_APP_NOT_READY) { if (terrno != TSDB_CODE_APP_NOT_READY) {
@ -271,7 +281,7 @@ int32_t dndInitMsgHandle(SDnode *pDnode) {
int32_t vgId = pWrapper->msgVgIds[msgIndex]; int32_t vgId = pWrapper->msgVgIds[msgIndex];
if (msgFp == NULL) continue; if (msgFp == NULL) continue;
dTrace("msg:%s will be processed by %s, vgId:%d", tMsgInfo[msgIndex], pWrapper->name, vgId); // dTrace("msg:%s will be processed by %s, vgId:%d", tMsgInfo[msgIndex], pWrapper->name, vgId);
SMsgHandle *pHandle = &pMgmt->msgHandles[msgIndex]; SMsgHandle *pHandle = &pMgmt->msgHandles[msgIndex];
if (vgId == QND_VGID) { if (vgId == QND_VGID) {
@ -328,7 +338,12 @@ int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) {
SDnode *pDnode = pWrapper->pDnode; SDnode *pDnode = pWrapper->pDnode;
STransMgmt *pTrans = &pDnode->trans; STransMgmt *pTrans = &pDnode->trans;
SEpSet epSet = {0}; SEpSet epSet = {0};
dmGetMnodeEpSet(dndAcquireWrapper(pDnode, DNODE)->pMgmt, &epSet);
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, DNODE);
if (pWrapper != NULL) {
dmGetMnodeEpSet(pWrapper->pMgmt, &epSet);
dndReleaseWrapper(pWrapper);
}
return dndSendRpcReq(pTrans, &epSet, pReq); return dndSendRpcReq(pTrans, &epSet, pReq);
} }
} }
@ -336,7 +351,12 @@ int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) {
void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) {
if (pRsp->code == TSDB_CODE_APP_NOT_READY) { if (pRsp->code == TSDB_CODE_APP_NOT_READY) {
SMgmtWrapper *pDnodeWrapper = dndAcquireWrapper(pWrapper->pDnode, DNODE); SMgmtWrapper *pDnodeWrapper = dndAcquireWrapper(pWrapper->pDnode, DNODE);
dmSendRedirectRsp(pDnodeWrapper->pMgmt, pRsp); if (pDnodeWrapper != NULL) {
dmSendRedirectRsp(pDnodeWrapper->pMgmt, pRsp);
dndReleaseWrapper(pDnodeWrapper);
} else {
rpcSendResponse(pRsp);
}
} else { } else {
rpcSendResponse(pRsp); rpcSendResponse(pRsp);
} }

View File

@ -209,7 +209,7 @@ int32_t dmWriteFile(SDnodeMgmt *pMgmt) {
} }
pMgmt->updateTime = taosGetTimestampMs(); pMgmt->updateTime = taosGetTimestampMs();
dDebug("successed to write %s", file); dDebug("successed to write %s", realfile);
return 0; return 0;
} }

View File

@ -74,14 +74,14 @@ void dmSendRedirectRsp(SDnodeMgmt *pMgmt, SRpcMsg *pReq) {
} }
static int32_t dmStart(SMgmtWrapper *pWrapper) { static int32_t dmStart(SMgmtWrapper *pWrapper) {
dDebug("dnode mgmt start to run"); dDebug("dnode-mgmt start to run");
return dmStartThread(pWrapper->pMgmt); return dmStartThread(pWrapper->pMgmt);
} }
int32_t dmInit(SMgmtWrapper *pWrapper) { int32_t dmInit(SMgmtWrapper *pWrapper) {
SDnode *pDnode = pWrapper->pDnode; SDnode *pDnode = pWrapper->pDnode;
SDnodeMgmt *pMgmt = calloc(1, sizeof(SDnodeMgmt)); SDnodeMgmt *pMgmt = calloc(1, sizeof(SDnodeMgmt));
dInfo("dnode-mgmt is initialized"); dInfo("dnode-mgmt start to init");
pDnode->dnodeId = 0; pDnode->dnodeId = 0;
pDnode->dropped = 0; pDnode->dropped = 0;

View File

@ -41,8 +41,12 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN); memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN);
taosRUnLockLatch(&pMgmt->latch); taosRUnLockLatch(&pMgmt->latch);
req.pVloads = taosArrayInit(TSDB_MAX_VNODES, sizeof(SVnodeLoad)); SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, VNODES);
vmMonitorVnodeLoads(dndAcquireWrapper(pDnode, VNODES), req.pVloads); if (pWrapper != NULL) {
req.pVloads = taosArrayInit(TSDB_MAX_VNODES, sizeof(SVnodeLoad));
vmMonitorVnodeLoads(pWrapper, req.pVloads);
dndReleaseWrapper(pWrapper);
}
int32_t contLen = tSerializeSStatusReq(NULL, 0, &req); int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
void *pHead = rpcMallocCont(contLen); void *pHead = rpcMallocCont(contLen);

View File

@ -114,6 +114,7 @@ int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
return -1; return -1;
} }
dDebug("dnode workers are initialized");
return 0; return 0;
} }
@ -136,6 +137,7 @@ void dmStopWorker(SDnodeMgmt *pMgmt) {
taosDestoryThread(pMgmt->threadId); taosDestoryThread(pMgmt->threadId);
pMgmt->threadId = NULL; pMgmt->threadId = NULL;
} }
dDebug("dnode workers are closed");
} }
int32_t dmProcessMgmtMsg(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { int32_t dmProcessMgmtMsg(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
@ -144,6 +146,6 @@ int32_t dmProcessMgmtMsg(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
pWorker = &pMgmt->statusWorker; pWorker = &pMgmt->statusWorker;
} }
dTrace("msg:%p, will be written to worker %s", pMsg, pWorker->name); dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
return taosWriteQitem(pWorker->queue, pMsg); return taosWriteQitem(pWorker->queue, pMsg);
} }

View File

@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mmInt.h" #include "mmInt.h"
#include "wal.h"
static bool mmDeployRequired(SDnode *pDnode) { static bool mmDeployRequired(SDnode *pDnode) {
if (pDnode->dnodeId > 0) return false; if (pDnode->dnodeId > 0) return false;
@ -226,7 +227,7 @@ static int32_t mmOpen(SMgmtWrapper *pWrapper) {
} }
static int32_t mmStart(SMgmtWrapper *pWrapper) { static int32_t mmStart(SMgmtWrapper *pWrapper) {
dDebug("mnode mgmt start to run"); dDebug("mnode-mgmt start to run");
SMnodeMgmt *pMgmt = pWrapper->pMgmt; SMnodeMgmt *pMgmt = pWrapper->pMgmt;
return mndStart(pMgmt->pMnode); return mndStart(pMgmt->pMnode);
} }

View File

@ -108,6 +108,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
return -1; return -1;
} }
dDebug("mnode workers are initialized");
return 0; return 0;
} }
@ -115,4 +116,5 @@ void mmStopWorker(SMnodeMgmt *pMgmt) {
tSingleWorkerCleanup(&pMgmt->readWorker); tSingleWorkerCleanup(&pMgmt->readWorker);
tSingleWorkerCleanup(&pMgmt->writeWorker); tSingleWorkerCleanup(&pMgmt->writeWorker);
tSingleWorkerCleanup(&pMgmt->syncWorker); tSingleWorkerCleanup(&pMgmt->syncWorker);
dDebug("mnode workers are closed");
} }

View File

@ -132,10 +132,12 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
return -1; return -1;
} }
dDebug("qnode workers are initialized");
return 0; return 0;
} }
void qmStopWorker(SQnodeMgmt *pMgmt) { void qmStopWorker(SQnodeMgmt *pMgmt) {
tSingleWorkerCleanup(&pMgmt->queryWorker); tSingleWorkerCleanup(&pMgmt->queryWorker);
tSingleWorkerCleanup(&pMgmt->fetchWorker); tSingleWorkerCleanup(&pMgmt->fetchWorker);
dDebug("qnode workers are closed");
} }

View File

@ -80,6 +80,7 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) {
return -1; return -1;
} }
dDebug("snode workers are initialized");
return 0; return 0;
} }
@ -90,6 +91,7 @@ void smStopWorker(SSnodeMgmt *pMgmt) {
} }
taosArrayDestroy(pMgmt->uniqueWorkers); taosArrayDestroy(pMgmt->uniqueWorkers);
tSingleWorkerCleanup(&pMgmt->sharedWorker); tSingleWorkerCleanup(&pMgmt->sharedWorker);
dDebug("snode workers are closed");
} }
static FORCE_INLINE int32_t smGetSWIdFromMsg(SRpcMsg *pMsg) { static FORCE_INLINE int32_t smGetSWIdFromMsg(SRpcMsg *pMsg) {

View File

@ -257,14 +257,14 @@ static void vmCleanup(SMgmtWrapper *pWrapper) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt; SVnodesMgmt *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return; if (pMgmt == NULL) return;
dInfo("vnodes-mgmt start to cleanup"); dInfo("vnode-mgmt start to cleanup");
vmCloseVnodes(pMgmt); vmCloseVnodes(pMgmt);
vmStopWorker(pMgmt); vmStopWorker(pMgmt);
vnodeCleanup(); vnodeCleanup();
// walCleanUp(); // walCleanUp();
free(pMgmt); free(pMgmt);
pWrapper->pMgmt = NULL; pWrapper->pMgmt = NULL;
dInfo("vnodes-mgmt is cleaned up"); dInfo("vnode-mgmt is cleaned up");
} }
static int32_t vmInit(SMgmtWrapper *pWrapper) { static int32_t vmInit(SMgmtWrapper *pWrapper) {
@ -272,7 +272,7 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) {
SVnodesMgmt *pMgmt = calloc(1, sizeof(SVnodesMgmt)); SVnodesMgmt *pMgmt = calloc(1, sizeof(SVnodesMgmt));
int32_t code = -1; int32_t code = -1;
dInfo("vnodes-mgmt start to init"); dInfo("vnode-mgmt start to init");
if (pMgmt == NULL) goto _OVER; if (pMgmt == NULL) goto _OVER;
pMgmt->path = pWrapper->path; pMgmt->path = pWrapper->path;
@ -312,7 +312,7 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) {
} }
if (vmOpenVnodes(pMgmt) != 0) { if (vmOpenVnodes(pMgmt) != 0) {
dError("failed to open vnodes since %s", terrstr()); dError("failed to open vnode since %s", terrstr());
return -1; return -1;
} }

View File

@ -356,7 +356,7 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
return -1; return -1;
} }
dDebug("vnode workers is initialized"); dDebug("vnode workers are initialized");
return 0; return 0;
} }
@ -366,5 +366,5 @@ void vmStopWorker(SVnodesMgmt *pMgmt) {
tQWorkerCleanup(&pMgmt->queryPool); tQWorkerCleanup(&pMgmt->queryPool);
tWWorkerCleanup(&pMgmt->writePool); tWWorkerCleanup(&pMgmt->writePool);
tWWorkerCleanup(&pMgmt->syncPool); tWWorkerCleanup(&pMgmt->syncPool);
dDebug("vnode workers is closed"); dDebug("vnode workers are closed");
} }