From cbb3b39e747943ce37dc58b100927f46146c4995 Mon Sep 17 00:00:00 2001 From: bryanchang0603 Date: Mon, 7 Jun 2021 11:29:51 +0800 Subject: [PATCH 01/10] append alter_keep.py into the branch --- tests/pytest/alter/alter_keep.py | 181 +++++++++++++++++++++++++++++++ 1 file changed, 181 insertions(+) create mode 100644 tests/pytest/alter/alter_keep.py diff --git a/tests/pytest/alter/alter_keep.py b/tests/pytest/alter/alter_keep.py new file mode 100644 index 0000000000..00efa742b4 --- /dev/null +++ b/tests/pytest/alter/alter_keep.py @@ -0,0 +1,181 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +from util.log import * +from util.cases import * +from util.sql import * + + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + + def alterKeepCommunity(self): + ## community accepts both 1 paramater, 2 parmaters and 3 paramaters + ## but paramaters other than paramater 1 will be ignored + ## only paramater 1 will be used + tdSql.query('show databases') + tdSql.checkData(0,7,'3650,3650,3650') + + tdSql.execute('alter database db keep 10') + tdSql.query('show databases') + tdSql.checkData(0,7,'10,10,10') + + tdSql.execute('alter database db keep 50') + tdSql.query('show databases') + tdSql.checkData(0,7,'50,50,50') + + tdSql.execute('alter database db keep 20') + tdSql.query('show databases') + tdSql.checkData(0,7,'20,20,20') + + tdSql.execute('alter database db keep 100, 98 ,99') + tdSql.query('show databases') + tdSql.checkData(0,7,'100,100,100') + + tdSql.execute('alter database db keep 99, 100 ,101') + tdSql.query('show databases') + tdSql.checkData(0,7,'99,99,99') + + tdSql.execute('alter database db keep 200, 199 ,198') + tdSql.query('show databases') + tdSql.checkData(0,7,'200,200,200') + + tdSql.execute('alter database db keep 4000,4001') + tdSql.query('show databases') + tdSql.checkData(0,7,'4000,4000,4000') + + tdSql.execute('alter database db keep 5000,50') + tdSql.query('show databases') + tdSql.checkData(0,7,'5000,5000,5000') + + tdSql.execute('alter database db keep 50,5000') + tdSql.query('show databases') + tdSql.checkData(0,7,'50,50,50') + + + def alterKeepEnterprise(self): + ## enterprise only accept three inputs + ## does not accept 1 paramaters nor 3 paramaters + tdSql.query('show databases') + tdSql.checkData(0,7,'3650,3650,3650') + + tdSql.error('alter database db keep 10') + tdSql.query('show databases') + tdSql.checkData(0,7,'3650,3650,3650') + + ## the order for altering keep is keep(D), keep0, keep1. + ## if the order is changed, please modify the following test + ## to make sure the the test is accurate + + tdSql.execute('alter database db keep 10, 10 ,10') + tdSql.query('show databases') + tdSql.checkData(0,7,'10,10,10') + + tdSql.execute('alter database db keep 100, 98 ,99') + tdSql.query('show databases') + tdSql.checkData(0,7,'98,99,100') + + tdSql.execute('alter database db keep 200, 200 ,200') + tdSql.query('show databases') + tdSql.checkData(0,7,'200,200,200') + + tdSql.error('alter database db keep 198, 199 ,200') + tdSql.query('show databases') + tdSql.checkData(0,7,'200,200,200') + + # tdSql.execute('alter database db keep 3650,3650,3650') + # tdSql.error('alter database db keep 4000,3640') + # tdSql.error('alter database db keep 10,10') + # tdSql.query('show databases') + # tdSql.checkData(0,7,'3650,3650,3650') + + def run(self): + tdSql.prepare() + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + tdLog.debug('running enterprise test') + self.alterKeepEnterprise() + else: + tdLog.debug('running community test') + self.alterKeepCommunity() + + + ##TODO: need to wait for TD-4445 to implement the following + ## tests + + ## preset the keep + tdSql.prepare() + tdSql.execute('create table tb (ts timestamp, speed int)') + tdSql.execute('alter database db keep 10,10,10') + tdSql.execute('insert into tb values (now, 10)') + tdSql.execute('insert into tb values (now + 10m, 10)') + tdSql.query('select * from tb') + tdSql.checkRows(2) + + + #after alter from small to large, check if the alter if functioning + #test if change through test.py is consistent with change from taos client + #test case for TD-4459 and TD-4445 + tdSql.execute('alter database db keep 40,40,40') + tdSql.query('show databases') + tdSql.checkData(0,7,'40,40,40') + tdSql.error('insert into tb values (now-60d, 10)') + tdSql.execute('insert into tb values (now-30d, 10)') + tdSql.query('select * from tb') + tdSql.checkRows(3) + + rowNum = 3 + for i in range(30): + rowNum += 1 + tdSql.execute('alter database db keep 20,20,20') + tdSql.execute('alter database db keep 40,40,40') + tdSql.query('show databases') + tdSql.checkData(0,7,'40,40,40') + tdSql.error('insert into tb values (now-60d, 10)') + tdSql.execute('insert into tb values (now-30d, 10)') + tdSql.query('select * from tb') + tdSql.checkRows(rowNum) + + tdSql.execute('alter database db keep 10,10,10') + tdSql.query('show databases') + tdSql.checkData(0,7,'10,10,10') + + # if uncomment these three lines, timestamp out of range error will appear + # tdSql.execute('alter database db keep 15,15,15') + # tdSql.query('show databases') + # tdSql.checkData(0,7,'15,15,15') + tdSql.execute('insert into tb values (now-15d, 10)') + tdSql.query('select * from tb') + tdSql.checkRows(rowNum + 1) + + # tdSql.execute('alter database db keep 20,20,20') + # tdSql.query('show databases') + # tdSql.checkData(0,7,'20,20,20') + # tdSql.error('insert into tb values (now-30d, 10)') + # tdSql.query('show databases') + # tdSql.checkData(0,7,'20,20,20') + # tdSql.query('select * from tb') + # tdSql.checkRows(rowNum) + + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) From cd54930352b186a969d33568e5735a39d18c131b Mon Sep 17 00:00:00 2001 From: bryanchang0603 Date: Mon, 7 Jun 2021 11:32:35 +0800 Subject: [PATCH 02/10] adding comments to alter_test.py --- tests/pytest/alter/alter_keep.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/pytest/alter/alter_keep.py b/tests/pytest/alter/alter_keep.py index 00efa742b4..d38d9fa879 100644 --- a/tests/pytest/alter/alter_keep.py +++ b/tests/pytest/alter/alter_keep.py @@ -158,6 +158,9 @@ class TDTestCase: # tdSql.execute('alter database db keep 15,15,15') # tdSql.query('show databases') # tdSql.checkData(0,7,'15,15,15') + + # the following line should generate an error, but the insert was a success + # the time now-15d is out of range of now -10d tdSql.execute('insert into tb values (now-15d, 10)') tdSql.query('select * from tb') tdSql.checkRows(rowNum + 1) From 68c7fc493da1a7bd74c8fa6a1de787c0c32775bd Mon Sep 17 00:00:00 2001 From: lichuang Date: Tue, 8 Jun 2021 10:54:51 +0800 Subject: [PATCH 03/10] [TD-3963]alter db rpc return when vnode response --- src/dnode/src/dnodeVMgmt.c | 1 - src/mnode/inc/mnodeVgroup.h | 2 +- src/mnode/src/mnodeDb.c | 54 +++++++++++++++++++++++++++----- src/mnode/src/mnodeVgroup.c | 38 +++++++++++++++------- tests/pytest/alter/alter_keep.py | 4 +-- 5 files changed, 75 insertions(+), 24 deletions(-) diff --git a/src/dnode/src/dnodeVMgmt.c b/src/dnode/src/dnodeVMgmt.c index 66c94bf675..ef99e70bd1 100644 --- a/src/dnode/src/dnodeVMgmt.c +++ b/src/dnode/src/dnodeVMgmt.c @@ -154,7 +154,6 @@ static SCreateVnodeMsg* dnodeParseVnodeMsg(SRpcMsg *rpcMsg) { static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { SCreateVnodeMsg *pCreate = dnodeParseVnodeMsg(rpcMsg); - void *pVnode = vnodeAcquire(pCreate->cfg.vgId); if (pVnode != NULL) { dDebug("vgId:%d, already exist, return success", pCreate->cfg.vgId); diff --git a/src/mnode/inc/mnodeVgroup.h b/src/mnode/inc/mnodeVgroup.h index 73b0e6ae1b..c796365843 100644 --- a/src/mnode/inc/mnodeVgroup.h +++ b/src/mnode/inc/mnodeVgroup.h @@ -49,7 +49,7 @@ int32_t mnodeAddTableIntoVgroup(SVgObj *pVgroup, SCTableObj *pTable, bool needCh void mnodeRemoveTableFromVgroup(SVgObj *pVgroup, SCTableObj *pTable); void mnodeSendDropVnodeMsg(int32_t vgId, SRpcEpSet *epSet, void *ahandle); void mnodeSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle); -void mnodeSendAlterVgroupMsg(SVgObj *pVgroup); +void mnodeSendAlterVgroupMsg(SVgObj *pVgroup,SMnodeMsg *pMsg); void mnodeSendSyncVgroupMsg(SVgObj *pVgroup); SRpcEpSet mnodeGetEpSetFromVgroup(SVgObj *pVgroup); diff --git a/src/mnode/src/mnodeDb.c b/src/mnode/src/mnodeDb.c index b0ac1192bb..8b0faaf75c 100644 --- a/src/mnode/src/mnodeDb.c +++ b/src/mnode/src/mnodeDb.c @@ -24,12 +24,14 @@ #include "tdataformat.h" #include "tp.h" #include "mnode.h" +#include "dnode.h" #include "mnodeDef.h" #include "mnodeInt.h" #include "mnodeAcct.h" #include "mnodeDb.h" #include "mnodeDnode.h" #include "mnodeMnode.h" +#include "mnodePeer.h" #include "mnodeProfile.h" #include "mnodeWrite.h" #include "mnodeSdb.h" @@ -43,6 +45,8 @@ int64_t tsDbRid = -1; void * tsDbSdb = NULL; static int32_t tsDbUpdateSize; +#define ALTER_CDB_RETRY_TIMES 3 + static int32_t mnodeCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate, SMnodeMsg *pMsg); static int32_t mnodeDropDb(SMnodeMsg *newMsg); static int32_t mnodeSetDbDropping(SDbObj *pDb); @@ -51,6 +55,7 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessSyncDbMsg(SMnodeMsg *pMsg); +static void mnodeProcessAlterDbRsp(SRpcMsg *rpcMsg); int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg); #ifndef _TOPIC @@ -198,6 +203,7 @@ int32_t mnodeInitDbs() { mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_ALTER_DB, mnodeProcessAlterDbMsg); mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_DROP_DB, mnodeProcessDropDbMsg); mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_SYNC_DB, mnodeProcessSyncDbMsg); + mnodeAddPeerRspHandle(TSDB_MSG_TYPE_CM_ALTER_DB_RSP, mnodeProcessAlterDbRsp); mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_DB, mnodeGetDbMeta); mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_DB, mnodeRetrieveDbs); mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_DB, mnodeCancelGetNextDb); @@ -1070,27 +1076,30 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SAlterDbMsg *pAlter) { return newCfg; } -static int32_t mnodeAlterDbCb(SMnodeMsg *pMsg, int32_t code) { - if (code != TSDB_CODE_SUCCESS) return code; +static int32_t mnodeAlterDbFp(SMnodeMsg *pMsg) { SDbObj *pDb = pMsg->pDb; void *pIter = NULL; SVgObj *pVgroup = NULL; - while (1) { + while (1) { pIter = mnodeGetNextVgroup(pIter, &pVgroup); if (pVgroup == NULL) break; if (pVgroup->pDb == pDb) { - mnodeSendAlterVgroupMsg(pVgroup); + mnodeSendAlterVgroupMsg(pVgroup,pMsg); + pMsg->expected += 1; } mnodeDecVgroupRef(pVgroup); } - + // in case there is no vnode(no db in vnode) + if (pMsg->expected == 0) { + return TSDB_CODE_SUCCESS; + } mDebug("db:%s, all vgroups is altered", pDb->name); mLInfo("db:%s, is alterd by %s", pDb->name, mnodeGetUserFromMsg(pMsg)); - bnNotify(); + //bnNotify(); - return TSDB_CODE_SUCCESS; + return TSDB_CODE_MND_ACTION_IN_PROGRESS; } static int32_t mnodeAlterDb(SDbObj *pDb, SAlterDbMsg *pAlter, void *pMsg) { @@ -1114,7 +1123,7 @@ static int32_t mnodeAlterDb(SDbObj *pDb, SAlterDbMsg *pAlter, void *pMsg) { .pTable = tsDbSdb, .pObj = pDb, .pMsg = pMsg, - .fpRsp = mnodeAlterDbCb + .fpReq = mnodeAlterDbFp }; code = sdbUpdateRow(&row); @@ -1279,6 +1288,35 @@ void mnodeDropAllDbs(SAcctObj *pAcct) { mInfo("acct:%s, all dbs:%d is dropped from sdb", pAcct->user, numOfDbs); } +static void mnodeProcessAlterDbRsp(SRpcMsg *rpcMsg) { + if (rpcMsg->ahandle == NULL) return; + + SMnodeMsg *pMsg = rpcMsg->ahandle; + pMsg->received++; + + SDbObj *pDb = (SDbObj *)pMsg->pDb; + assert(pDb); + + if (rpcMsg->code == TSDB_CODE_SUCCESS) { + mDebug("msg:%p, app:%p db:%s, altered in dnode, thandle:%p result:%s", pMsg, pMsg->rpcMsg.ahandle, + pDb->name, pMsg->rpcMsg.handle, tstrerror(rpcMsg->code)); + + dnodeSendRpcMWriteRsp(pMsg, TSDB_CODE_SUCCESS); + } else { + if (pMsg->retry++ < ALTER_CDB_RETRY_TIMES) { + mDebug("msg:%p, app:%p db:%s, alter table rsp received, need retry, times:%d result:%s thandle:%p", + pMsg->rpcMsg.ahandle, pMsg, pDb->name, pMsg->retry, tstrerror(rpcMsg->code), + pMsg->rpcMsg.handle); + + dnodeDelayReprocessMWriteMsg(pMsg); + } else { + mError("msg:%p, app:%p db:%s, failed to alter in dnode, result:%s thandle:%p", pMsg, pMsg->rpcMsg.ahandle, + pDb->name, tstrerror(rpcMsg->code), pMsg->rpcMsg.handle); + dnodeSendRpcMWriteRsp(pMsg, rpcMsg->code); + } + } +} + int32_t mnodeCompactDbs() { void *pIter = NULL; SDbObj *pDb = NULL; diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index a64d256484..47773dcd90 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -60,7 +60,6 @@ static int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn); static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg); static void mnodeProcessAlterVnodeRsp(SRpcMsg *rpcMsg); -static void mnodeProcessSyncVnodeRsp(SRpcMsg *rpcMsg); static void mnodeProcessDropVnodeRsp(SRpcMsg *rpcMsg); static int32_t mnodeProcessVnodeCfgMsg(SMnodeMsg *pMsg) ; static void mnodeSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle); @@ -237,7 +236,6 @@ int32_t mnodeInitVgroups() { mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_VGROUP, mnodeCancelGetNextVgroup); mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mnodeProcessCreateVnodeRsp); mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP, mnodeProcessAlterVnodeRsp); - mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP, mnodeProcessSyncVnodeRsp); mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_DROP_VNODE_RSP, mnodeProcessDropVnodeRsp); mnodeAddPeerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_VNODE, mnodeProcessVnodeCfgMsg); @@ -271,7 +269,7 @@ void mnodeUpdateVgroup(SVgObj *pVgroup) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("vgId:%d, failed to update vgroup", pVgroup->vgId); } - mnodeSendAlterVgroupMsg(pVgroup); + mnodeSendAlterVgroupMsg(pVgroup,NULL); } /* @@ -350,7 +348,7 @@ void mnodeUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVl mError("dnode:%d, vgId:%d, vnode cfgVersion:%d:%d repica:%d not match with mnode cfgVersion:%d:%d replica:%d", pDnode->dnodeId, pVload->vgId, pVload->dbCfgVersion, pVload->vgCfgVersion, pVload->replica, pVgroup->pDb->dbCfgVersion, pVgroup->vgCfgVersion, pVgroup->numOfVnodes); - mnodeSendAlterVgroupMsg(pVgroup); + mnodeSendAlterVgroupMsg(pVgroup,NULL); } } @@ -946,10 +944,10 @@ SRpcEpSet mnodeGetEpSetFromIp(char *ep) { return epSet; } -static void mnodeSendAlterVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet) { +static void mnodeSendAlterVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet, SMnodeMsg *pMsg) { SAlterVnodeMsg *pAlter = mnodeBuildVnodeMsg(pVgroup); SRpcMsg rpcMsg = { - .ahandle = NULL, + .ahandle = pMsg, .pCont = pAlter, .contLen = pAlter ? sizeof(SAlterVnodeMsg) : 0, .code = 0, @@ -958,14 +956,17 @@ static void mnodeSendAlterVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet) { dnodeSendMsgToDnode(epSet, &rpcMsg); } -void mnodeSendAlterVgroupMsg(SVgObj *pVgroup) { +void mnodeSendAlterVgroupMsg(SVgObj *pVgroup,SMnodeMsg *pMsg) { mDebug("vgId:%d, send alter all vnodes msg, numOfVnodes:%d db:%s", pVgroup->vgId, pVgroup->numOfVnodes, pVgroup->dbName); + if (pMsg) { + pMsg->pVgroup = pVgroup; + } for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { SRpcEpSet epSet = mnodeGetEpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp); mDebug("vgId:%d, index:%d, send alter vnode msg to dnode %s", pVgroup->vgId, i, pVgroup->vnodeGid[i].pDnode->dnodeEp); - mnodeSendAlterVnodeMsg(pVgroup, &epSet); + mnodeSendAlterVnodeMsg(pVgroup, &epSet,pMsg); } } @@ -1026,11 +1027,24 @@ void mnodeSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) { } static void mnodeProcessAlterVnodeRsp(SRpcMsg *rpcMsg) { - mDebug("alter vnode rsp received"); -} + mDebug("alter vnode rsp is received, handle:%p", rpcMsg->ahandle); + if (rpcMsg->ahandle == NULL) return; -static void mnodeProcessSyncVnodeRsp(SRpcMsg *rpcMsg) { - mDebug("sync vnode rsp received"); + SMnodeMsg *mnodeMsg = rpcMsg->ahandle; + mnodeMsg->received++; + if (rpcMsg->code == TSDB_CODE_SUCCESS) { + mnodeMsg->code = rpcMsg->code; + mnodeMsg->successed++; + } + + SVgObj *pVgroup = mnodeMsg->pVgroup; + mDebug("vgId:%d, alter vnode rsp received, result:%s received:%d successed:%d expected:%d, thandle:%p ahandle:%p", + pVgroup->vgId, tstrerror(rpcMsg->code), mnodeMsg->received, mnodeMsg->successed, mnodeMsg->expected, + mnodeMsg->rpcMsg.handle, rpcMsg->ahandle); + + if (mnodeMsg->received != mnodeMsg->expected) return; + + dnodeReprocessMWriteMsg(mnodeMsg); } static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { diff --git a/tests/pytest/alter/alter_keep.py b/tests/pytest/alter/alter_keep.py index d38d9fa879..2135936a4f 100644 --- a/tests/pytest/alter/alter_keep.py +++ b/tests/pytest/alter/alter_keep.py @@ -161,9 +161,9 @@ class TDTestCase: # the following line should generate an error, but the insert was a success # the time now-15d is out of range of now -10d - tdSql.execute('insert into tb values (now-15d, 10)') + tdSql.error('insert into tb values (now-15d, 10)') tdSql.query('select * from tb') - tdSql.checkRows(rowNum + 1) + tdSql.checkRows(rowNum) # tdSql.execute('alter database db keep 20,20,20') # tdSql.query('show databases') From 54a1cf26fcf41f05d0b4f92347e32b436eff707b Mon Sep 17 00:00:00 2001 From: lichuang Date: Tue, 8 Jun 2021 23:14:58 +0800 Subject: [PATCH 04/10] [TD-3963]fix alter vgroup bugs --- src/mnode/inc/mnodeDb.h | 1 + src/mnode/src/mnodeDb.c | 11 +++++++++++ src/mnode/src/mnodeVgroup.c | 5 ++++- 3 files changed, 16 insertions(+), 1 deletion(-) diff --git a/src/mnode/inc/mnodeDb.h b/src/mnode/inc/mnodeDb.h index 0fa1a15e2d..8f014ae9c8 100644 --- a/src/mnode/inc/mnodeDb.h +++ b/src/mnode/inc/mnodeDb.h @@ -40,6 +40,7 @@ void mnodeIncDbRef(SDbObj *pDb); void mnodeDecDbRef(SDbObj *pDb); bool mnodeCheckIsMonitorDB(char *db, char *monitordb); void mnodeDropAllDbs(SAcctObj *pAcct); +int mnodeInsertAlterRow(SDbObj *pDb, void *pMsg); int32_t mnodeCompactDbs(); diff --git a/src/mnode/src/mnodeDb.c b/src/mnode/src/mnodeDb.c index 8b0faaf75c..0c03cbb5a0 100644 --- a/src/mnode/src/mnodeDb.c +++ b/src/mnode/src/mnodeDb.c @@ -1102,6 +1102,17 @@ static int32_t mnodeAlterDbFp(SMnodeMsg *pMsg) { return TSDB_CODE_MND_ACTION_IN_PROGRESS; } +int mnodeInsertAlterRow(SDbObj *pDb, void *pMsg) { + SSdbRow desc = { + .type = SDB_OPER_GLOBAL, + .pTable = tsDbSdb, + .pObj = pDb, + .pMsg = pMsg, + }; + + return sdbInsertRowToQueue(&desc); +} + static int32_t mnodeAlterDb(SDbObj *pDb, SAlterDbMsg *pAlter, void *pMsg) { mDebug("db:%s, type:%d do alter operation", pDb->name, pDb->cfg.dbType); diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index 47773dcd90..9eb736f0bb 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -961,6 +961,7 @@ void mnodeSendAlterVgroupMsg(SVgObj *pVgroup,SMnodeMsg *pMsg) { pVgroup->dbName); if (pMsg) { pMsg->pVgroup = pVgroup; + mnodeIncVgroupRef(pVgroup); } for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { SRpcEpSet epSet = mnodeGetEpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp); @@ -1044,7 +1045,9 @@ static void mnodeProcessAlterVnodeRsp(SRpcMsg *rpcMsg) { if (mnodeMsg->received != mnodeMsg->expected) return; - dnodeReprocessMWriteMsg(mnodeMsg); + mnodeInsertAlterRow(pVgroup->pDb, mnodeMsg); + + dnodeSendRpcMWriteRsp(mnodeMsg, TSDB_CODE_SUCCESS); } static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { From e141eb96428cbcd10805c8ca17608064dc3d357b Mon Sep 17 00:00:00 2001 From: lichuang Date: Wed, 9 Jun 2021 10:26:06 +0800 Subject: [PATCH 05/10] [TD-3963]fix alter vgroup bugs --- src/mnode/inc/mnodeDb.h | 2 +- src/mnode/src/mnodeDb.c | 35 ++--------------------------------- src/mnode/src/mnodeVgroup.c | 13 +++++++++---- 3 files changed, 12 insertions(+), 38 deletions(-) diff --git a/src/mnode/inc/mnodeDb.h b/src/mnode/inc/mnodeDb.h index 8f014ae9c8..40c5afc2c8 100644 --- a/src/mnode/inc/mnodeDb.h +++ b/src/mnode/inc/mnodeDb.h @@ -40,7 +40,7 @@ void mnodeIncDbRef(SDbObj *pDb); void mnodeDecDbRef(SDbObj *pDb); bool mnodeCheckIsMonitorDB(char *db, char *monitordb); void mnodeDropAllDbs(SAcctObj *pAcct); -int mnodeInsertAlterRow(SDbObj *pDb, void *pMsg); +int mnodeInsertAlterDbRow(SDbObj *pDb, void *pMsg); int32_t mnodeCompactDbs(); diff --git a/src/mnode/src/mnodeDb.c b/src/mnode/src/mnodeDb.c index 0c03cbb5a0..95a5532b42 100644 --- a/src/mnode/src/mnodeDb.c +++ b/src/mnode/src/mnodeDb.c @@ -55,7 +55,6 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessSyncDbMsg(SMnodeMsg *pMsg); -static void mnodeProcessAlterDbRsp(SRpcMsg *rpcMsg); int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg); #ifndef _TOPIC @@ -203,7 +202,6 @@ int32_t mnodeInitDbs() { mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_ALTER_DB, mnodeProcessAlterDbMsg); mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_DROP_DB, mnodeProcessDropDbMsg); mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_SYNC_DB, mnodeProcessSyncDbMsg); - mnodeAddPeerRspHandle(TSDB_MSG_TYPE_CM_ALTER_DB_RSP, mnodeProcessAlterDbRsp); mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_DB, mnodeGetDbMeta); mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_DB, mnodeRetrieveDbs); mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_DB, mnodeCancelGetNextDb); @@ -1102,7 +1100,7 @@ static int32_t mnodeAlterDbFp(SMnodeMsg *pMsg) { return TSDB_CODE_MND_ACTION_IN_PROGRESS; } -int mnodeInsertAlterRow(SDbObj *pDb, void *pMsg) { +int mnodeInsertAlterDbRow(SDbObj *pDb, void *pMsg) { SSdbRow desc = { .type = SDB_OPER_GLOBAL, .pTable = tsDbSdb, @@ -1110,7 +1108,7 @@ int mnodeInsertAlterRow(SDbObj *pDb, void *pMsg) { .pMsg = pMsg, }; - return sdbInsertRowToQueue(&desc); + return sdbUpdateRow(&desc); } static int32_t mnodeAlterDb(SDbObj *pDb, SAlterDbMsg *pAlter, void *pMsg) { @@ -1299,35 +1297,6 @@ void mnodeDropAllDbs(SAcctObj *pAcct) { mInfo("acct:%s, all dbs:%d is dropped from sdb", pAcct->user, numOfDbs); } -static void mnodeProcessAlterDbRsp(SRpcMsg *rpcMsg) { - if (rpcMsg->ahandle == NULL) return; - - SMnodeMsg *pMsg = rpcMsg->ahandle; - pMsg->received++; - - SDbObj *pDb = (SDbObj *)pMsg->pDb; - assert(pDb); - - if (rpcMsg->code == TSDB_CODE_SUCCESS) { - mDebug("msg:%p, app:%p db:%s, altered in dnode, thandle:%p result:%s", pMsg, pMsg->rpcMsg.ahandle, - pDb->name, pMsg->rpcMsg.handle, tstrerror(rpcMsg->code)); - - dnodeSendRpcMWriteRsp(pMsg, TSDB_CODE_SUCCESS); - } else { - if (pMsg->retry++ < ALTER_CDB_RETRY_TIMES) { - mDebug("msg:%p, app:%p db:%s, alter table rsp received, need retry, times:%d result:%s thandle:%p", - pMsg->rpcMsg.ahandle, pMsg, pDb->name, pMsg->retry, tstrerror(rpcMsg->code), - pMsg->rpcMsg.handle); - - dnodeDelayReprocessMWriteMsg(pMsg); - } else { - mError("msg:%p, app:%p db:%s, failed to alter in dnode, result:%s thandle:%p", pMsg, pMsg->rpcMsg.ahandle, - pDb->name, tstrerror(rpcMsg->code), pMsg->rpcMsg.handle); - dnodeSendRpcMWriteRsp(pMsg, rpcMsg->code); - } - } -} - int32_t mnodeCompactDbs() { void *pIter = NULL; SDbObj *pDb = NULL; diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index 9eb736f0bb..93b897cb1e 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -1044,10 +1044,15 @@ static void mnodeProcessAlterVnodeRsp(SRpcMsg *rpcMsg) { mnodeMsg->rpcMsg.handle, rpcMsg->ahandle); if (mnodeMsg->received != mnodeMsg->expected) return; - - mnodeInsertAlterRow(pVgroup->pDb, mnodeMsg); - - dnodeSendRpcMWriteRsp(mnodeMsg, TSDB_CODE_SUCCESS); + uint8_t msgType = mnodeMsg->rpcMsg.msgType; + if (msgType == TSDB_MSG_TYPE_CM_ALTER_DB || msgType == TSDB_MSG_TYPE_CM_CREATE_TP || msgType == TSDB_MSG_TYPE_CM_ALTER_TP) { + int32_t code = mnodeInsertAlterDbRow(pVgroup->pDb, mnodeMsg); + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + dnodeSendRpcMWriteRsp(mnodeMsg, code); + } + } else { + dnodeSendRpcMWriteRsp(mnodeMsg, TSDB_CODE_SUCCESS); + } } static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { From ee334d6f78d247c88b9d28461691111b250e9be1 Mon Sep 17 00:00:00 2001 From: lichuang Date: Wed, 9 Jun 2021 15:19:10 +0800 Subject: [PATCH 06/10] [TD-3963]fix alter db bug when there is no vgroup case --- src/mnode/src/mnodeDb.c | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/mnode/src/mnodeDb.c b/src/mnode/src/mnodeDb.c index 95a5532b42..0ab4baedb2 100644 --- a/src/mnode/src/mnodeDb.c +++ b/src/mnode/src/mnodeDb.c @@ -1090,7 +1090,14 @@ static int32_t mnodeAlterDbFp(SMnodeMsg *pMsg) { } // in case there is no vnode(no db in vnode) if (pMsg->expected == 0) { - return TSDB_CODE_SUCCESS; + SSdbRow row = { + .type = SDB_OPER_GLOBAL, + .pTable = tsDbSdb, + .pObj = pDb, + .pMsg = pMsg, + }; + + return sdbUpdateRow(&row); } mDebug("db:%s, all vgroups is altered", pDb->name); mLInfo("db:%s, is alterd by %s", pDb->name, mnodeGetUserFromMsg(pMsg)); From 3cc8280846397faab5d84e2ca4cf3402b406c5cc Mon Sep 17 00:00:00 2001 From: lichuang Date: Wed, 9 Jun 2021 17:37:30 +0800 Subject: [PATCH 07/10] [TD-3963]fix alter db bug when there is no vgroup case --- src/mnode/src/mnodeDb.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/mnode/src/mnodeDb.c b/src/mnode/src/mnodeDb.c index 0ab4baedb2..ffce3a197b 100644 --- a/src/mnode/src/mnodeDb.c +++ b/src/mnode/src/mnodeDb.c @@ -45,8 +45,6 @@ int64_t tsDbRid = -1; void * tsDbSdb = NULL; static int32_t tsDbUpdateSize; -#define ALTER_CDB_RETRY_TIMES 3 - static int32_t mnodeCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate, SMnodeMsg *pMsg); static int32_t mnodeDropDb(SMnodeMsg *newMsg); static int32_t mnodeSetDbDropping(SDbObj *pDb); @@ -1088,6 +1086,10 @@ static int32_t mnodeAlterDbFp(SMnodeMsg *pMsg) { } mnodeDecVgroupRef(pVgroup); } + + mDebug("db:%s, all vgroups is altered", pDb->name); + mLInfo("db:%s, is alterd by %s", pDb->name, mnodeGetUserFromMsg(pMsg)); + // in case there is no vnode(no db in vnode) if (pMsg->expected == 0) { SSdbRow row = { @@ -1099,8 +1101,6 @@ static int32_t mnodeAlterDbFp(SMnodeMsg *pMsg) { return sdbUpdateRow(&row); } - mDebug("db:%s, all vgroups is altered", pDb->name); - mLInfo("db:%s, is alterd by %s", pDb->name, mnodeGetUserFromMsg(pMsg)); //bnNotify(); @@ -1146,7 +1146,7 @@ static int32_t mnodeAlterDb(SDbObj *pDb, SAlterDbMsg *pAlter, void *pMsg) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("db:%s, failed to alter, reason:%s", pDb->name, tstrerror(code)); } - } +} return code; } From b84c533244b25689a922d5afb38a87605644864c Mon Sep 17 00:00:00 2001 From: lichuang Date: Thu, 10 Jun 2021 10:16:35 +0800 Subject: [PATCH 08/10] [TD-3963]sleep more time to see test result --- tests/pytest/stream/table_1.py | 2 +- tests/pytest/tag_lite/unsignedInt.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/pytest/stream/table_1.py b/tests/pytest/stream/table_1.py index a9fd573931..b205491fad 100644 --- a/tests/pytest/stream/table_1.py +++ b/tests/pytest/stream/table_1.py @@ -51,7 +51,7 @@ class TDTestCase: tdSql.execute( "insert into tb%d values (now - %dm, %d, %d)" % (i, 1440 - j, j, j)) - time.sleep(0.1) + time.sleep(1) self.createFuncStream("count(*)", "c1", rowNum) self.createFuncStream("count(tbcol)", "c2", rowNum) diff --git a/tests/pytest/tag_lite/unsignedInt.py b/tests/pytest/tag_lite/unsignedInt.py index 6efe12edf1..2d8474ff54 100644 --- a/tests/pytest/tag_lite/unsignedInt.py +++ b/tests/pytest/tag_lite/unsignedInt.py @@ -4,7 +4,7 @@ import sys from util.log import * from util.cases import * from util.sql import * - +import time class TDTestCase: def init(self, conn, logSql): @@ -114,6 +114,7 @@ class TDTestCase: # TSIM: endw # TSIM: # TSIM: print =============== step2 + time.sleep(1) tdLog.info('=============== step2') # TSIM: sleep 100 # TSIM: sql select * from $tb From 3759aeacd4de1d3347391f2a14973369cc907e7d Mon Sep 17 00:00:00 2001 From: lichuang Date: Thu, 10 Jun 2021 13:13:35 +0800 Subject: [PATCH 09/10] [TD-3963]fix alter db core in cluster case --- src/mnode/src/mnodeDb.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/mnode/src/mnodeDb.c b/src/mnode/src/mnodeDb.c index ea6d019a46..334450f70b 100644 --- a/src/mnode/src/mnodeDb.c +++ b/src/mnode/src/mnodeDb.c @@ -1104,12 +1104,13 @@ static int32_t mnodeAlterDbFp(SMnodeMsg *pMsg) { void *pIter = NULL; SVgObj *pVgroup = NULL; + pMsg->expected = 0; while (1) { pIter = mnodeGetNextVgroup(pIter, &pVgroup); if (pVgroup == NULL) break; if (pVgroup->pDb == pDb) { + pMsg->expected += pVgroup->numOfVnodes; mnodeSendAlterVgroupMsg(pVgroup,pMsg); - pMsg->expected += 1; } mnodeDecVgroupRef(pVgroup); } @@ -1117,7 +1118,7 @@ static int32_t mnodeAlterDbFp(SMnodeMsg *pMsg) { mDebug("db:%s, all vgroups is altered", pDb->name); mLInfo("db:%s, is alterd by %s", pDb->name, mnodeGetUserFromMsg(pMsg)); - // in case there is no vnode(no db in vnode) + // in case there is no vnode for this db currently(no table in db,etc.) if (pMsg->expected == 0) { SSdbRow row = { .type = SDB_OPER_GLOBAL, From 899c21302a84bed1902a3f6357f3922f23e37519 Mon Sep 17 00:00:00 2001 From: lichuang Date: Thu, 10 Jun 2021 15:19:24 +0800 Subject: [PATCH 10/10] [TD-3963]fix alter vgroup rsp handle --- src/mnode/src/mnodeVgroup.c | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index 593b094dc2..af816d7ae7 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -1044,14 +1044,10 @@ static void mnodeProcessAlterVnodeRsp(SRpcMsg *rpcMsg) { mnodeMsg->rpcMsg.handle, rpcMsg->ahandle); if (mnodeMsg->received != mnodeMsg->expected) return; - uint8_t msgType = mnodeMsg->rpcMsg.msgType; - if (msgType == TSDB_MSG_TYPE_CM_ALTER_DB || msgType == TSDB_MSG_TYPE_CM_CREATE_TP || msgType == TSDB_MSG_TYPE_CM_ALTER_TP) { - int32_t code = mnodeInsertAlterDbRow(pVgroup->pDb, mnodeMsg); - if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { - dnodeSendRpcMWriteRsp(mnodeMsg, code); - } - } else { - dnodeSendRpcMWriteRsp(mnodeMsg, TSDB_CODE_SUCCESS); + + int32_t code = mnodeInsertAlterDbRow(pVgroup->pDb, mnodeMsg); + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + dnodeSendRpcMWriteRsp(mnodeMsg, code); } }