From 58607175eef0a90eb48f5ccda257a1c6478b4b2f Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Sun, 5 Apr 2020 23:16:28 +0800 Subject: [PATCH 1/4] optimize the code in dnodeMgmt, so it can close vnode one by one when dnode cleans up remove the RPC warnings --- src/dnode/src/dnodeMgmt.c | 49 ++++++++++++++++++++++++---------- src/inc/vnode.h | 7 ++--- src/rpc/src/rpcMain.c | 14 +++++----- src/vnode/main/src/vnodeMain.c | 29 +++++++++++--------- 4 files changed, 61 insertions(+), 38 deletions(-) diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index f92ba36845..0dfadd7404 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -32,6 +32,7 @@ #include "vnode.h" static int32_t dnodeOpenVnodes(); +static void dnodeCloseVnodes(); static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg); @@ -64,10 +65,6 @@ int32_t dnodeInitMgmt() { return -1; } - if ( vnodeInitModule() != TSDB_CODE_SUCCESS) { - return -1; - } - int32_t code = dnodeOpenVnodes(); if (code != TSDB_CODE_SUCCESS) { return -1; @@ -88,7 +85,7 @@ void dnodeCleanupMgmt() { tsDnodeTmr = NULL; } - vnodeCleanupModule(); + dnodeCloseVnodes(); } void dnodeMgmt(SRpcMsg *pMsg) { @@ -107,7 +104,7 @@ void dnodeMgmt(SRpcMsg *pMsg) { rpcFreeCont(pMsg->pCont); } -static int32_t dnodeOpenVnodes() { +static int dnodeGetVnodeList(int32_t vnodeList[]) { DIR *dir = opendir(tsVnodeDir); if (dir == NULL) { return TSDB_CODE_NO_WRITE_ACCESS; @@ -122,18 +119,42 @@ static int32_t dnodeOpenVnodes() { int32_t vnode = atoi(de->d_name + 5); if (vnode == 0) continue; - char vnodeDir[TSDB_FILENAME_LEN * 3]; - snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/%s", tsVnodeDir, de->d_name); - int32_t code = vnodeOpen(vnode, vnodeDir); - if (code == 0) { - numOfVnodes++; - } + vnodeList[numOfVnodes] = vnode; + numOfVnodes++; } } closedir(dir); - dPrint("dnode mgmt is opened, vnodes:%d", numOfVnodes); - return TSDB_CODE_SUCCESS; + return numOfVnodes; +} + +static int32_t dnodeOpenVnodes() { + char vnodeDir[TSDB_FILENAME_LEN * 3]; + int failed = 0; + + int32_t *vnodeList = (int32_t *) malloc(sizeof(int32_t) * 10000); + int numOfVnodes = dnodeGetVnodeList(vnodeList); + + for (int i=0; iconnType == TAOS_CONN_SERVER) { -// pRpc->hash = taosInitStrHash(pRpc->sessions, sizeof(pRpc), taosHashString); pRpc->hash = taosHashInit(pRpc->sessions, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true); if (pRpc->hash == NULL) { tError("%s failed to init string hash", pRpc->label); @@ -535,8 +534,7 @@ static void rpcCloseConn(void *thandle) { if ( pRpc->connType == TAOS_CONN_SERVER) { char hashstr[40] = {0}; size_t size = sprintf(hashstr, "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId, pConn->connType); -// taosDeleteStrHash(pRpc->hash, hashstr); -// taosHashRemove(pRpc->hash, hashstr, size); + taosHashRemove(pRpc->hash, hashstr, size); rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg pConn->pRspMsg = NULL; @@ -588,7 +586,6 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { size_t size = sprintf(hashstr, "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType); // check if it is already allocated -// SRpcConn **ppConn = (SRpcConn **)(taosGetStrHashData(pRpc->hash, hashstr)); SRpcConn **ppConn = (SRpcConn **)(taosHashGet(pRpc->hash, hashstr, size)); if (ppConn) pConn = *ppConn; if (pConn) return pConn; @@ -621,7 +618,6 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { pConn->localPort = (pRpc->localPort + pRpc->index); } -// taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn); taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES); tTrace("%s %p, rpc connection is allocated, sid:%d id:%s port:%u", @@ -834,13 +830,15 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) { if (pConn->inType) { // if there are pending request, notify the app tTrace("%s %p, connection is gone, notify the app", pRpc->label, pConn); +/* SRpcMsg rpcMsg; rpcMsg.pCont = NULL; rpcMsg.contLen = 0; rpcMsg.handle = pConn; rpcMsg.msgType = pConn->inType; rpcMsg.code = TSDB_CODE_NETWORK_UNAVAIL; - // (*(pRpc->cfp))(&rpcMsg); + (*(pRpc->cfp))(&rpcMsg); +*/ } rpcCloseConn(pConn); @@ -1163,13 +1161,15 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) { if (pConn->inType && pRpc->cfp) { // if there are pending request, notify the app tTrace("%s %p, notify the app, connection is gone", pRpc->label, pConn); +/* SRpcMsg rpcMsg; rpcMsg.pCont = NULL; rpcMsg.contLen = 0; rpcMsg.handle = pConn; rpcMsg.msgType = pConn->inType; rpcMsg.code = TSDB_CODE_NETWORK_UNAVAIL; - // (*(pRpc->cfp))(&rpcMsg); + (*(pRpc->cfp))(&rpcMsg); +*/ } rpcCloseConn(pConn); } else { diff --git a/src/vnode/main/src/vnodeMain.c b/src/vnode/main/src/vnodeMain.c index 5bb5ef55ef..1ac57089e7 100644 --- a/src/vnode/main/src/vnodeMain.c +++ b/src/vnode/main/src/vnodeMain.c @@ -33,27 +33,22 @@ static void *tsDnodeVnodesHash; static void vnodeCleanUp(SVnodeObj *pVnode); static void vnodeBuildVloadMsg(char *pNode, void * param); -int32_t vnodeInitModule() { +static int tsOpennedVnodes; +static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT; + +static void vnodeInit() { vnodeInitWriteFp(); tsDnodeVnodesHash = taosInitIntHash(TSDB_MAX_VNODES, sizeof(SVnodeObj), taosHashInt); if (tsDnodeVnodesHash == NULL) { dError("failed to init vnode list"); - return -1; } - - return 0; -} - -typedef void (*CleanupFp)(char *); -void vnodeCleanupModule() { - taosCleanUpIntHashWithFp(tsDnodeVnodesHash, (CleanupFp)vnodeClose); - taosCleanUpIntHash(tsDnodeVnodesHash); } int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { int32_t code; + pthread_once(&vnodeModuleInit, vnodeInit); SVnodeObj *pTemp = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pVnodeCfg->cfg.vgId); @@ -116,6 +111,7 @@ int32_t vnodeDrop(int32_t vgId) { int32_t vnodeOpen(int32_t vnode, char *rootDir) { char temp[TSDB_FILENAME_LEN]; + pthread_once(&vnodeModuleInit, vnodeInit); SVnodeObj vnodeObj = {0}; vnodeObj.vgId = vnode; @@ -147,11 +143,14 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pVnode->status = VN_STATUS_READY; dTrace("pVnode:%p vgId:%d, vnode is opened in %s", pVnode, pVnode->vgId, rootDir); + tsOpennedVnodes++; return TSDB_CODE_SUCCESS; } -int32_t vnodeClose(void *param) { - SVnodeObj *pVnode = (SVnodeObj *)param; +int32_t vnodeClose(int32_t vgId) { + + SVnodeObj *pVnode = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, vgId); + if (pVnode == NULL) return 0; dTrace("pVnode:%p vgId:%d, vnode will be closed", pVnode, pVnode->vgId); pVnode->status = VN_STATUS_CLOSING; @@ -183,6 +182,12 @@ void vnodeRelease(void *pVnodeRaw) { } dTrace("pVnode:%p vgId:%d, vnode is released", pVnode, pVnode->vgId); + + tsOpennedVnodes--; + if (tsOpennedVnodes <= 0) { + taosCleanUpIntHash(tsDnodeVnodesHash); + vnodeModuleInit = PTHREAD_ONCE_INIT; + } } void *vnodeGetVnode(int32_t vgId) { From 8dabfc562e5a09857819fe4a49a5a039f5e8ba25 Mon Sep 17 00:00:00 2001 From: slguan Date: Mon, 6 Apr 2020 00:56:31 +0800 Subject: [PATCH 2/4] handle a reference counting bug --- src/dnode/src/dnodeRead.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index 1e92b40977..682aee4c0b 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -291,7 +291,7 @@ static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) { dnodeContinueExecuteQuery(pVnode, pQInfo, pMsg); } else { // no further execution invoked, release the ref to vnode dnodeProcessReadResult(pVnode, pMsg); - vnodeRelease(pVnode); + //vnodeRelease(pVnode); } } From f4f7dfa65e398fba9705a4d12600c4fee18e701e Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Mon, 6 Apr 2020 07:59:42 +0800 Subject: [PATCH 3/4] remove the status checking on cleanup --- src/vnode/main/src/vnodeMain.c | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/vnode/main/src/vnodeMain.c b/src/vnode/main/src/vnodeMain.c index 1ac57089e7..d852d41561 100644 --- a/src/vnode/main/src/vnodeMain.c +++ b/src/vnode/main/src/vnodeMain.c @@ -187,6 +187,7 @@ void vnodeRelease(void *pVnodeRaw) { if (tsOpennedVnodes <= 0) { taosCleanUpIntHash(tsDnodeVnodesHash); vnodeModuleInit = PTHREAD_ONCE_INIT; + tsDnodeVnodesHash = NULL; } } @@ -240,10 +241,7 @@ static void vnodeBuildVloadMsg(char *pNode, void * param) { } static void vnodeCleanUp(SVnodeObj *pVnode) { - if (pVnode->status == VN_STATUS_DELETING) { - // fix deadlock occured while close system - taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); - } + taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); //syncStop(pVnode->sync); tsdbCloseRepo(pVnode->tsdb); From d6cda29863c14261c47e8f5ad1f69cbd8793a58f Mon Sep 17 00:00:00 2001 From: slguan Date: Mon, 6 Apr 2020 10:49:45 +0800 Subject: [PATCH 4/4] pass clog parameters to dnode --- src/client/src/tscSQLParser.c | 4 ++-- src/dnode/src/dnodeMgmt.c | 1 + src/mnode/src/mgmtDb.c | 4 ++-- src/mnode/src/mgmtVgroup.c | 1 + src/util/src/tglobalcfg.c | 2 +- src/vnode/main/src/vnodeMain.c | 2 +- 6 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 4e7a8ed6bd..40e70c247e 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -5280,8 +5280,8 @@ int32_t doLocalQueryProcess(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) { int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCMCreateDbMsg* pCreate) { char msg[512] = {0}; - if (pCreate->commitLog != -1 && (pCreate->commitLog < 0 || pCreate->commitLog > 1)) { - snprintf(msg, tListLen(msg), "invalid db option commitLog: %d, only 0 or 1 allowed", pCreate->commitLog); + if (pCreate->commitLog != -1 && (pCreate->commitLog < 0 || pCreate->commitLog > 2)) { + snprintf(msg, tListLen(msg), "invalid db option commitLog: %d, only 0-2 allowed", pCreate->commitLog); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg); } diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 0dfadd7404..8e523eaf46 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -163,6 +163,7 @@ static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { pCreate->cfg.vgId = htonl(pCreate->cfg.vgId); pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions); pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile); + pCreate->cfg.commitLog = pCreate->cfg.commitLog; return vnodeCreate(pCreate); } diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index 6a53b0d001..7d13451f7e 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -166,8 +166,8 @@ SDbObj *mgmtGetDbByTableId(char *tableId) { } static int32_t mgmtCheckDBParams(SCMCreateDbMsg *pCreate) { - if (pCreate->commitLog < 0 || pCreate->commitLog > 1) { - mError("invalid db option commitLog: %d, only 0 or 1 allowed", pCreate->commitLog); + if (pCreate->commitLog < 0 || pCreate->commitLog > 2) { + mError("invalid db option commitLog: %d, only 0-2 allowed", pCreate->commitLog); return TSDB_CODE_INVALID_OPTION; } diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index b16b82cb4a..87f3872b0a 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -490,6 +490,7 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) { pCfg->daysToKeep2 = htonl(pCfg->daysToKeep2); pCfg->daysToKeep = htonl(pCfg->daysToKeep); pCfg->commitTime = htonl(pCfg->commitTime); + pCfg->commitLog = pCfg->commitLog; pCfg->blocksPerTable = htons(pCfg->blocksPerTable); pCfg->replications = (char) pVgroup->numOfVnodes; pCfg->rowsInFileBlock = htonl(pCfg->rowsInFileBlock); diff --git a/src/util/src/tglobalcfg.c b/src/util/src/tglobalcfg.c index 0287285f4d..8a0d66068e 100644 --- a/src/util/src/tglobalcfg.c +++ b/src/util/src/tglobalcfg.c @@ -632,7 +632,7 @@ static void doInitGlobalConfig() { tsInitConfigOption(cfg++, "clog", &tsCommitLog, TSDB_CFG_VTYPE_SHORT, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW, - 0, 1, 0, TSDB_CFG_UTYPE_NONE); + 0, 2, 0, TSDB_CFG_UTYPE_NONE); tsInitConfigOption(cfg++, "comp", &tsCompression, TSDB_CFG_VTYPE_SHORT, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW, 0, 2, 0, TSDB_CFG_UTYPE_NONE); diff --git a/src/vnode/main/src/vnodeMain.c b/src/vnode/main/src/vnodeMain.c index d852d41561..1be9bbb64b 100644 --- a/src/vnode/main/src/vnodeMain.c +++ b/src/vnode/main/src/vnodeMain.c @@ -88,7 +88,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { return code; } - dPrint("vgId:%d, vnode is created", pVnodeCfg->cfg.vgId); + dPrint("vgId:%d, vnode is created, clog:%d", pVnodeCfg->cfg.vgId, pVnodeCfg->cfg.commitLog); code = vnodeOpen(pVnodeCfg->cfg.vgId, rootDir); return code;