From ab805d418e8c237672a0dc010be51d3b9fb8e6c3 Mon Sep 17 00:00:00 2001 From: slguan Date: Sun, 15 Mar 2020 21:08:51 +0800 Subject: [PATCH 1/5] [TD-10] refact create table --- src/dnode/src/dnodeWrite.c | 1 + src/inc/taosmsg.h | 3 +- src/mnode/src/mgmtChildTable.c | 12 +-- src/mnode/src/mgmtNormalTable.c | 6 -- src/mnode/src/mgmtTable.c | 172 +++++++++++++++----------------- src/mnode/src/mgmtVgroup.c | 12 +-- 6 files changed, 94 insertions(+), 112 deletions(-) diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index 1b6c6c148f..b59c2882f2 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -402,3 +402,4 @@ static void dnodeProcessDropStableMsg(SWriteMsg *pMsg) { SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; rpcSendResponse(&rpcRsp); } + diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 94692fb15f..6e15da71d4 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -332,11 +332,10 @@ typedef struct { } SMgmtHead; typedef struct { + int32_t contLen; int32_t vgId; int32_t sid; - int32_t numOfVPeers; uint64_t uid; - SVnodeDesc vpeerDesc[TSDB_MAX_MPEERS]; char tableId[TSDB_TABLE_ID_LEN + 1]; } SMDDropTableMsg; diff --git a/src/mnode/src/mgmtChildTable.c b/src/mnode/src/mgmtChildTable.c index 79b2280c17..386d4fecd3 100644 --- a/src/mnode/src/mgmtChildTable.c +++ b/src/mnode/src/mgmtChildTable.c @@ -373,14 +373,10 @@ int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable) { } strcpy(pRemove->tableId, pTable->tableId); - pRemove->sid = htonl(pTable->sid); - pRemove->uid = htobe64(pTable->uid); - - pRemove->numOfVPeers = htonl(pVgroup->numOfVnodes); - for (int i = 0; i < pVgroup->numOfVnodes; ++i) { - pRemove->vpeerDesc[i].ip = htonl(pVgroup->vnodeGid[i].ip); - pRemove->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); - } + pRemove->vgId = htonl(pTable->vgId); + pRemove->contLen = htonl(sizeof(SMDDropTableMsg)); + pRemove->sid = htonl(pTable->sid); + pRemove->uid = htobe64(pTable->uid); SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); diff --git a/src/mnode/src/mgmtNormalTable.c b/src/mnode/src/mgmtNormalTable.c index eb511bcd44..d1556562e5 100644 --- a/src/mnode/src/mgmtNormalTable.c +++ b/src/mnode/src/mgmtNormalTable.c @@ -411,12 +411,6 @@ int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable) { pRemove->sid = htonl(pTable->sid); pRemove->uid = htobe64(pTable->uid); - pRemove->numOfVPeers = htonl(pVgroup->numOfVnodes); - for (int i = 0; i < pVgroup->numOfVnodes; ++i) { - pRemove->vpeerDesc[i].ip = htonl(pVgroup->vnodeGid[i].ip); - pRemove->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); - } - SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); mTrace("table:%s, send drop table msg", pRemove->tableId); SRpcMsg rpcMsg = { diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index 49b0d3bca0..bfdd5d57a8 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -135,80 +135,7 @@ int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, boo } static void mgmtCreateTable(SVgObj *pVgroup, SQueuedMsg *pMsg) { - SCMCreateTableMsg *pCreate = pMsg->pCont; - int32_t sid = taosAllocateId(pVgroup->idPool); - if (sid < 0) { - mTrace("tables:%s, no enough sid in vgroup:%d", pVgroup->vgId); - mgmtCreateVgroup(pMsg); - return; - } - - int32_t code; - STableInfo *pTable; - SMDCreateTableMsg *pMDCreate = NULL; - - if (pCreate->numOfColumns == 0) { - mTrace("table:%s, is a child table, vgroup:%d sid:%d ahandle:%p", pCreate->tableId, pVgroup->vgId, sid, pMsg); - code = mgmtCreateChildTable(pCreate, pMsg->contLen, pVgroup, sid, &pMDCreate, &pTable); - } else { - mTrace("table:%s, is a normal table, vgroup:%d sid:%d ahandle:%p", pCreate->tableId, pVgroup->vgId, sid, pMsg); - code = mgmtCreateNormalTable(pCreate, pMsg->contLen, pVgroup, sid, &pMDCreate, &pTable); - } - - if (code != TSDB_CODE_SUCCESS) { - mTrace("table:%s, failed to create in vgroup:%d", pCreate->tableId, pVgroup->vgId); - mgmtSendSimpleResp(pMsg->thandle, code); - return; - } - - SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); - SRpcMsg rpcMsg = { - .handle = pMsg, - .pCont = pMDCreate, - .contLen = htonl(pMDCreate->contLen), - .code = 0, - .msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE - }; - - pMsg->ahandle = pTable; - mgmtSendMsgToDnode(&ipSet, &rpcMsg); -} - -int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore) { - STableInfo *pTable = mgmtGetTable(tableId); - if (pTable == NULL) { - if (ignore) { - mTrace("table:%s, table is not exist, think it success", tableId); - return TSDB_CODE_SUCCESS; - } else { - mError("table:%s, failed to create table, table not exist", tableId); - return TSDB_CODE_INVALID_TABLE; - } - } - - if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { - mError("table:%s, failed to create table, in monitor database", tableId); - return TSDB_CODE_MONITOR_DB_FORBIDDEN; - } - - switch (pTable->type) { - case TSDB_SUPER_TABLE: - mTrace("table:%s, start to drop super table", tableId); - return mgmtDropSuperTable(pDb, (SSuperTableObj *) pTable); - case TSDB_CHILD_TABLE: - mTrace("table:%s, start to drop child table", tableId); - return mgmtDropChildTable(pDb, (SChildTableObj *) pTable); - case TSDB_NORMAL_TABLE: - mTrace("table:%s, start to drop normal table", tableId); - return mgmtDropNormalTable(pDb, (SNormalTableObj *) pTable); - case TSDB_STREAM_TABLE: - mTrace("table:%s, start to drop stream table", tableId); - return mgmtDropNormalTable(pDb, (SNormalTableObj *) pTable); - default: - mError("table:%s, invalid table type:%d", tableId, pTable->type); - return TSDB_CODE_INVALID_TABLE; - } } int32_t mgmtAlterTable(SDbObj *pDb, SCMAlterTableMsg *pAlter) { @@ -494,29 +421,58 @@ void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { if (pVgroup == NULL) { mTrace("table:%s, start to create a new vgroup", pCreate->tableId); mgmtCreateVgroup(newMsg); - } else { - mTrace("table:%s, vgroup:%d is selected", pCreate->tableId, pVgroup->vgId); - mgmtCreateTable(pVgroup, newMsg); + return; } + + int32_t sid = taosAllocateId(pVgroup->idPool); + if (sid < 0) { + mTrace("tables:%s, no enough sid in vgroup:%d", pVgroup->vgId); + mgmtCreateVgroup(newMsg); + return; + } + + SMDCreateTableMsg *pMDCreate = NULL; + if (pCreate->numOfColumns == 0) { + mTrace("table:%s, is a child table, vgroup:%d sid:%d ahandle:%p", pCreate->tableId, pVgroup->vgId, sid, pMsg); + code = mgmtCreateChildTable(pCreate, pMsg->contLen, pVgroup, sid, &pMDCreate, &pTable); + } else { + mTrace("table:%s, is a normal table, vgroup:%d sid:%d ahandle:%p", pCreate->tableId, pVgroup->vgId, sid, pMsg); + code = mgmtCreateNormalTable(pCreate, pMsg->contLen, pVgroup, sid, &pMDCreate, &pTable); + } + + if (code != TSDB_CODE_SUCCESS) { + mTrace("table:%s, failed to create in vgroup:%d", pCreate->tableId, pVgroup->vgId); + mgmtSendSimpleResp(pMsg->thandle, code); + return; + } + + SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); + SRpcMsg rpcMsg = { + .handle = newMsg, + .pCont = pMDCreate, + .contLen = htonl(pMDCreate->contLen), + .code = 0, + .msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE + }; + + newMsg->ahandle = pTable; + mgmtSendMsgToDnode(&ipSet, &rpcMsg); } void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) { + if (mgmtCheckRedirect(pMsg->thandle)) return; + SCMDropTableMsg *pDrop = pMsg->pCont; + mTrace("table:%s, drop msg is received from thandle:%p", pDrop->tableId, pMsg->thandle); - if (mgmtCheckRedirect(pMsg->thandle)) { - mError("thandle:%p, failed to drop table:%s, need redirect message", pMsg->thandle, pDrop->tableId); + if (mgmtCheckExpired()) { + mError("table:%s, failed to drop, grant expired", pDrop->tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_GRANT_EXPIRED); return; } - SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle); - if (pUser == NULL) { - mError("table:%s, failed to drop table, invalid user", pDrop->tableId); - mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_USER); - return; - } - - if (!pUser->writeAuth) { - mError("table:%s, failed to drop table, no rights", pDrop->tableId); + if (!pMsg->pUser->writeAuth) { + mError("table:%s, failed to drop, no rights", pDrop->tableId); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS); return; } @@ -528,9 +484,45 @@ void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) { return; } - int32_t code = mgmtDropTable(pDb, pDrop->tableId, pDrop->igNotExists); - if (code != TSDB_CODE_ACTION_IN_PROGRESS) { - mgmtSendSimpleResp(pMsg->thandle, code); + STableInfo *pTable = mgmtGetTable(pDrop->tableId); + if (pTable == NULL) { + if (pDrop->igNotExists) { + mTrace("table:%s, table is not exist, think drop success", pDrop->tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS); + return; + } else { + mError("table:%s, failed to drop table, table not exist", pDrop->tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE); + return; + } + } + + if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { + mError("table:%s, failed to create table, in monitor database", pDrop->tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MONITOR_DB_FORBIDDEN); + return; + } + + switch (pTable->type) { + case TSDB_SUPER_TABLE: + mTrace("table:%s, start to drop super table", pDrop->tableId); + mgmtDropSuperTable(pDb, (SSuperTableObj *) pTable); + break; + case TSDB_CHILD_TABLE: + mTrace("table:%s, start to drop child table", pDrop->tableId); + mgmtDropChildTable(pDb, (SChildTableObj *) pTable); + break; + case TSDB_NORMAL_TABLE: + mTrace("table:%s, start to drop normal table", pDrop->tableId); + mgmtDropNormalTable(pDb, (SNormalTableObj *) pTable); + break; + case TSDB_STREAM_TABLE: + mTrace("table:%s, start to drop stream table", pDrop->tableId); + mgmtDropNormalTable(pDb, (SNormalTableObj *) pTable); + break; + default: + mError("table:%s, invalid table type:%d", pDrop->tableId, pTable->type); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE); } } diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index a8c701a213..000bb79d7b 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -189,12 +189,12 @@ int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup) { STableInfo *pTable; if (pVgroup->numOfTables > 0) { - for (int32_t i = 0; i < pDb->cfg.maxSessions; ++i) { - if (pVgroup->tableList != NULL) { - pTable = pVgroup->tableList[i]; - if (pTable) mgmtDropTable(pDb, pTable->tableId, 0); - } - } +// for (int32_t i = 0; i < pDb->cfg.maxSessions; ++i) { +// if (pVgroup->tableList != NULL) { +// pTable = pVgroup->tableList[i]; +// if (pTable) mgmtDropTable(pDb, pTable->tableId, 0); +// } +// } } mTrace("vgroup:%d, db:%s replica:%d is deleted", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes); From 63d2e696d2eb2b2f43618dc4737bc5c4e592e8dd Mon Sep 17 00:00:00 2001 From: slguan Date: Sun, 15 Mar 2020 22:07:50 +0800 Subject: [PATCH 2/5] [TD-9] refact create table msg --- src/client/src/tscServer.c | 1 + src/inc/mnode.h | 11 +++++--- src/inc/taosmsg.h | 3 ++- src/mnode/inc/mgmtChildTable.h | 5 ++-- src/mnode/inc/mgmtNormalTable.h | 5 ++-- src/mnode/inc/mgmtSuperTable.h | 2 +- src/mnode/src/mgmtChildTable.c | 46 +++++++++++++++------------------ src/mnode/src/mgmtNormalTable.c | 35 +++++++++++-------------- src/mnode/src/mgmtSuperTable.c | 2 +- src/mnode/src/mgmtTable.c | 37 ++++++++++++++++---------- 10 files changed, 78 insertions(+), 69 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index ed905a8d54..deb1719527 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -2032,6 +2032,7 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { tscClearFieldInfo(&pQueryInfo->fieldsInfo); msgLen = pMsg - (char*)pCreateTableMsg; + pCreateTableMsg->contLen = htonl(msgLen); pCmd->payloadLen = msgLen; pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE; diff --git a/src/inc/mnode.h b/src/inc/mnode.h index b96905ff82..7a07a81d26 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -39,7 +39,7 @@ extern "C" { #include "ttimer.h" #include "tutil.h" - typedef struct { +typedef struct { uint32_t privateIp; int32_t sid; uint32_t moduleStatus; @@ -97,6 +97,7 @@ struct _vg_obj; typedef struct SSuperTableObj { char tableId[TSDB_TABLE_ID_LEN + 1]; int8_t type; + int8_t dirty; uint64_t uid; int32_t sid; int32_t vgId; @@ -104,7 +105,7 @@ typedef struct SSuperTableObj { int32_t sversion; int32_t numOfColumns; int32_t numOfTags; - int8_t reserved[7]; + int8_t reserved[5]; int8_t updateEnd[1]; int32_t numOfTables; int16_t nextColId; @@ -114,12 +115,13 @@ typedef struct SSuperTableObj { typedef struct { char tableId[TSDB_TABLE_ID_LEN + 1]; int8_t type; + int8_t dirty; uint64_t uid; int32_t sid; int32_t vgId; int64_t createdTime; char superTableId[TSDB_TABLE_ID_LEN + 1]; - int8_t reserved[7]; + int8_t reserved[1]; int8_t updateEnd[1]; SSuperTableObj *superTable; } SChildTableObj; @@ -127,13 +129,14 @@ typedef struct { typedef struct { char tableId[TSDB_TABLE_ID_LEN + 1]; int8_t type; + int8_t dirty; uint64_t uid; int32_t sid; int32_t vgId; int64_t createdTime; int32_t sversion; int32_t numOfColumns; - int16_t sqlLen; + int32_t sqlLen; int8_t reserved[3]; int8_t updateEnd[1]; char* sql; //null-terminated string diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 6e15da71d4..4b5068aee4 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -264,7 +264,8 @@ typedef struct { int16_t numOfTags; int16_t numOfColumns; int16_t sqlLen; // the length of SQL, it starts after schema , sql is a null-terminated string - int16_t reserved[16]; + int32_t contLen; + int8_t reserved[16]; char schema[]; } SCMCreateTableMsg; diff --git a/src/mnode/inc/mgmtChildTable.h b/src/mnode/inc/mgmtChildTable.h index 5012ae8f17..680c48dc23 100644 --- a/src/mnode/inc/mgmtChildTable.h +++ b/src/mnode/inc/mgmtChildTable.h @@ -30,8 +30,9 @@ int32_t mgmtInitChildTables(); void mgmtCleanUpChildTables(); void * mgmtGetChildTable(char *tableId); -int32_t mgmtCreateChildTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid, - SMDCreateTableMsg **pDCreateOut, STableInfo **pTableOut); +void *mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid); +void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pCreate, SChildTableObj *pTable); + int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable); int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent); diff --git a/src/mnode/inc/mgmtNormalTable.h b/src/mnode/inc/mgmtNormalTable.h index f740765ed1..c612c1325a 100644 --- a/src/mnode/inc/mgmtNormalTable.h +++ b/src/mnode/inc/mgmtNormalTable.h @@ -28,8 +28,9 @@ int32_t mgmtInitNormalTables(); void mgmtCleanUpNormalTables(); void * mgmtGetNormalTable(char *tableId); -int32_t mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid, - SMDCreateTableMsg **pDCreateOut, STableInfo **pTableOut); +void * mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid); +void * mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable); + int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable); int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int32_t ncols); int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName); diff --git a/src/mnode/inc/mgmtSuperTable.h b/src/mnode/inc/mgmtSuperTable.h index e9da9e546d..ba66ebe582 100644 --- a/src/mnode/inc/mgmtSuperTable.h +++ b/src/mnode/inc/mgmtSuperTable.h @@ -31,7 +31,7 @@ void mgmtCleanUpSuperTables(); void * mgmtGetSuperTable(char *tableId); -int32_t mgmtCreateSuperTable(SDbObj *pDb, SCMCreateTableMsg *pCreate); +int32_t mgmtCreateSuperTable(SCMCreateTableMsg *pCreate); int32_t mgmtDropSuperTable(SDbObj *pDb, SSuperTableObj *pTable); int32_t mgmtAddSuperTableTag(SSuperTableObj *pTable, SSchema schema[], int32_t ntags); int32_t mgmtDropSuperTableTag(SSuperTableObj *pTable, char *tagName); diff --git a/src/mnode/src/mgmtChildTable.c b/src/mnode/src/mgmtChildTable.c index 386d4fecd3..0b577f710f 100644 --- a/src/mnode/src/mgmtChildTable.c +++ b/src/mnode/src/mgmtChildTable.c @@ -272,19 +272,22 @@ void mgmtCleanUpChildTables() { sdbCloseTable(tsChildTableSdb); } -static void *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup, void *pTagData, int32_t tagDataLen) { - int32_t totalCols = pTable->superTable->numOfColumns + pTable->superTable->numOfTags; - int32_t contLen = sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema) + tagDataLen; +void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableObj *pTable) { + char *pTagData = pMsg->schema + TSDB_TABLE_ID_LEN + 1; + int32_t tagDataLen = htonl(pMsg->contLen) - sizeof(SCMCreateTableMsg) - TSDB_TABLE_ID_LEN - 1; + int32_t totalCols = pTable->superTable->numOfColumns + pTable->superTable->numOfTags; + int32_t contLen = sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema) + tagDataLen; SMDCreateTableMsg *pCreate = rpcMallocCont(contLen); if (pCreate == NULL) { + terrno = TSDB_CODE_SERV_OUT_OF_MEMORY; return NULL; } - memcpy(pCreate->tableId, pTable->tableId, TSDB_TABLE_ID_LEN); - memcpy(pCreate->superTableId, pTable->superTable->tableId, TSDB_TABLE_ID_LEN); + memcpy(pCreate->tableId, pTable->tableId, TSDB_TABLE_ID_LEN + 1); + memcpy(pCreate->superTableId, pTable->superTable->tableId, TSDB_TABLE_ID_LEN + 1); pCreate->contLen = htonl(contLen); - pCreate->vgId = htonl(pVgroup->vgId); + pCreate->vgId = htonl(pTable->vgId); pCreate->tableType = pTable->type; pCreate->numOfColumns = htons(pTable->superTable->numOfColumns); pCreate->numOfTags = htons(pTable->superTable->numOfTags); @@ -305,36 +308,38 @@ static void *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgrou } memcpy(pCreate->data + totalCols * sizeof(SSchema), pTagData, tagDataLen); - return pCreate; } -int32_t mgmtCreateChildTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid, - SMDCreateTableMsg **pMDCreateOut, STableInfo **pTableOut) { +void* mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t tid) { int32_t numOfTables = sdbGetNumOfRows(tsChildTableSdb); if (numOfTables >= tsMaxTables) { mError("table:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, tsMaxTables); - return TSDB_CODE_TOO_MANY_TABLES; + terrno = TSDB_CODE_TOO_MANY_TABLES; + return NULL; } char *pTagData = (char *) pCreate->schema; // it is a tag key SSuperTableObj *pSuperTable = mgmtGetSuperTable(pTagData); if (pSuperTable == NULL) { mError("table:%s, corresponding super table does not exist", pCreate->tableId); - return TSDB_CODE_INVALID_TABLE; + terrno = TSDB_CODE_INVALID_TABLE; + return NULL; } SChildTableObj *pTable = (SChildTableObj *) calloc(sizeof(SChildTableObj), 1); if (pTable == NULL) { mError("table:%s, failed to alloc memory", pCreate->tableId); - return TSDB_CODE_SERV_OUT_OF_MEMORY; + terrno = TSDB_CODE_SERV_OUT_OF_MEMORY; + return NULL; } + strcpy(pTable->tableId, pCreate->tableId); strcpy(pTable->superTableId, pSuperTable->tableId); pTable->type = TSDB_CHILD_TABLE; pTable->uid = (((uint64_t) pTable->vgId) << 40) + ((((uint64_t) pTable->sid) & ((1ul << 24) - 1ul)) << 16) + ((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul)); - pTable->sid = sid; + pTable->sid = tid; pTable->vgId = pVgroup->vgId; pTable->createdTime = taosGetTimestampMs(); pTable->superTable = pSuperTable; @@ -342,21 +347,12 @@ int32_t mgmtCreateChildTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgObj if (sdbInsertRow(tsChildTableSdb, pTable, 0) < 0) { free(pTable); mError("table:%s, update sdb error", pCreate->tableId); - return TSDB_CODE_SDB_ERROR; + terrno = TSDB_CODE_SDB_ERROR; + return NULL; } - pTagData += (TSDB_TABLE_ID_LEN + 1); - int32_t tagDataLen = contLen - sizeof(SCMCreateTableMsg) - TSDB_TABLE_ID_LEN - 1; - *pMDCreateOut = mgmtBuildCreateChildTableMsg(pTable, pVgroup, pTagData, tagDataLen); - if (*pMDCreateOut == NULL) { - mError("table:%s, failed to build create table message", pCreate->tableId); - return TSDB_CODE_SERV_OUT_OF_MEMORY; - } - - *pTableOut = (STableInfo *) pTable; - mTrace("table:%s, create ctable in vgroup, uid:%" PRIu64 , pTable->tableId, pTable->uid); - return TSDB_CODE_SUCCESS; + return pTable; } int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable) { diff --git a/src/mnode/src/mgmtNormalTable.c b/src/mnode/src/mgmtNormalTable.c index d1556562e5..8c6645b6c2 100644 --- a/src/mnode/src/mgmtNormalTable.c +++ b/src/mnode/src/mgmtNormalTable.c @@ -287,18 +287,19 @@ void mgmtCleanUpNormalTables() { sdbCloseTable(tsNormalTableSdb); } -static void *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgroup) { +void *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable) { int32_t totalCols = pTable->numOfColumns; int32_t contLen = sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema) + pTable->sqlLen; SMDCreateTableMsg *pCreate = rpcMallocCont(contLen); if (pCreate == NULL) { + terrno = TSDB_CODE_SERV_OUT_OF_MEMORY; return NULL; } memcpy(pCreate->tableId, pTable->tableId, TSDB_TABLE_ID_LEN + 1); pCreate->contLen = htonl(contLen); - pCreate->vgId = htonl(pVgroup->vgId); + pCreate->vgId = htonl(pTable->vgId); pCreate->tableType = pTable->type; pCreate->numOfColumns = htons(pTable->numOfColumns); pCreate->numOfTags = 0; @@ -319,22 +320,22 @@ static void *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgr } memcpy(pCreate + sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema), pTable->sql, pTable->sqlLen); - return pCreate; } -int32_t mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid, - SMDCreateTableMsg **pDCreateOut, STableInfo **pTableOut) { +void *mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid) { int32_t numOfTables = sdbGetNumOfRows(tsNormalTableSdb); if (numOfTables >= TSDB_MAX_NORMAL_TABLES) { mError("table:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, TSDB_MAX_NORMAL_TABLES); - return TSDB_CODE_TOO_MANY_TABLES; + terrno = TSDB_CODE_TOO_MANY_TABLES; + return NULL; } SNormalTableObj *pTable = (SNormalTableObj *) calloc(sizeof(SNormalTableObj), 1); if (pTable == NULL) { mError("table:%s, failed to alloc memory", pCreate->tableId); - return TSDB_CODE_SERV_OUT_OF_MEMORY; + terrno = TSDB_CODE_SERV_OUT_OF_MEMORY; + return NULL; } strcpy(pTable->tableId, pCreate->tableId); @@ -352,7 +353,8 @@ int32_t mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgOb pTable->schema = (SSchema *) calloc(1, schemaSize); if (pTable->schema == NULL) { free(pTable); - return TSDB_CODE_SERV_OUT_OF_MEMORY; + terrno = TSDB_CODE_SERV_OUT_OF_MEMORY; + return NULL; } memcpy(pTable->schema, pCreate->schema, numOfCols * sizeof(SSchema)); @@ -368,7 +370,8 @@ int32_t mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgOb pTable->sql = calloc(1, pTable->sqlLen); if (pTable->sql == NULL) { free(pTable); - return TSDB_CODE_SERV_OUT_OF_MEMORY; + terrno = TSDB_CODE_SERV_OUT_OF_MEMORY; + return NULL; } memcpy(pTable->sql, (char *) (pCreate->schema) + numOfCols * sizeof(SSchema), pTable->sqlLen); pTable->sql[pTable->sqlLen - 1] = 0; @@ -378,20 +381,12 @@ int32_t mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgOb if (sdbInsertRow(tsNormalTableSdb, pTable, 0) < 0) { mError("table:%s, update sdb error", pTable->tableId); free(pTable); - return TSDB_CODE_SDB_ERROR; + terrno = TSDB_CODE_SDB_ERROR; + return NULL; } - *pDCreateOut = mgmtBuildCreateNormalTableMsg(pTable, pVgroup); - if (*pDCreateOut == NULL) { - mError("table:%s, failed to build create table message", pTable->tableId); - sdbDeleteRow(tsNormalTableSdb, pTable); - return TSDB_CODE_SERV_OUT_OF_MEMORY; - } - - *pTableOut = (STableInfo *) pTable; - mTrace("table:%s, create ntable in vgroup, uid:%" PRIu64 , pTable->tableId, pTable->uid); - return TSDB_CODE_SUCCESS; + return pTable; } int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable) { diff --git a/src/mnode/src/mgmtSuperTable.c b/src/mnode/src/mgmtSuperTable.c index 7778534424..34665d1db7 100644 --- a/src/mnode/src/mgmtSuperTable.c +++ b/src/mnode/src/mgmtSuperTable.c @@ -202,7 +202,7 @@ void mgmtCleanUpSuperTables() { sdbCloseTable(tsSuperTableSdb); } -int32_t mgmtCreateSuperTable(SDbObj *pDb, SCMCreateTableMsg *pCreate) { +int32_t mgmtCreateSuperTable(SCMCreateTableMsg *pCreate) { int32_t numOfTables = sdbGetNumOfRows(tsSuperTableSdb); if (numOfTables >= TSDB_MAX_SUPER_TABLES) { mError("stable:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, TSDB_MAX_SUPER_TABLES); diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index bfdd5d57a8..97f2542a78 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -82,6 +82,9 @@ int32_t mgmtInitTables() { mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_TABLE, mgmtGetShowTableMeta); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_TABLE, mgmtRetrieveShowTables); mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP, mgmtProcessCreateTableRsp); + mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_TABLE_RSP, NULL); + mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP, NULL); + mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_STABLE_RSP, NULL); return TSDB_CODE_SUCCESS; } @@ -134,10 +137,6 @@ int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, boo return TSDB_CODE_SUCCESS; } -static void mgmtCreateTable(SVgObj *pVgroup, SQueuedMsg *pMsg) { - -} - int32_t mgmtAlterTable(SDbObj *pDb, SCMAlterTableMsg *pAlter) { STableInfo *pTable = mgmtGetTable(pAlter->tableId); if (pTable == NULL) { @@ -401,7 +400,7 @@ void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { if (pCreate->numOfTags != 0) { mTrace("table:%s, is a super table", pCreate->tableId); - code = mgmtCreateSuperTable(pMsg->pDb, pCreate); + code = mgmtCreateSuperTable(pCreate); mgmtSendSimpleResp(pMsg->thandle, code); return; } @@ -434,16 +433,28 @@ void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { SMDCreateTableMsg *pMDCreate = NULL; if (pCreate->numOfColumns == 0) { mTrace("table:%s, is a child table, vgroup:%d sid:%d ahandle:%p", pCreate->tableId, pVgroup->vgId, sid, pMsg); - code = mgmtCreateChildTable(pCreate, pMsg->contLen, pVgroup, sid, &pMDCreate, &pTable); + pTable = mgmtCreateChildTable(pCreate, pVgroup, sid); + if (pTable == NULL) { + mgmtSendSimpleResp(pMsg->thandle, terrno); + return; + } + pMDCreate = mgmtBuildCreateChildTableMsg(pCreate, pTable); + if (pCreate == NULL) { + mgmtSendSimpleResp(pMsg->thandle, terrno); + return; + } } else { mTrace("table:%s, is a normal table, vgroup:%d sid:%d ahandle:%p", pCreate->tableId, pVgroup->vgId, sid, pMsg); - code = mgmtCreateNormalTable(pCreate, pMsg->contLen, pVgroup, sid, &pMDCreate, &pTable); - } - - if (code != TSDB_CODE_SUCCESS) { - mTrace("table:%s, failed to create in vgroup:%d", pCreate->tableId, pVgroup->vgId); - mgmtSendSimpleResp(pMsg->thandle, code); - return; + code = mgmtCreateNormalTable(pCreate, pVgroup, sid); + if (pTable == NULL) { + mgmtSendSimpleResp(pMsg->thandle, terrno); + return; + } + pMDCreate = mgmtBuildCreateNormalTableMsg(pTable); + if (pCreate == NULL) { + mgmtSendSimpleResp(pMsg->thandle, terrno); + return; + } } SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); From 58118cfa39051db559f7e940c9e7d341eac8b81d Mon Sep 17 00:00:00 2001 From: slguan Date: Mon, 16 Mar 2020 14:14:12 +0800 Subject: [PATCH 3/5] [TD-10] drop stable message --- src/dnode/src/dnodeWrite.c | 148 ++++++++++++++++++++------------ src/inc/taosmsg.h | 8 +- src/mnode/inc/mgmtChildTable.h | 2 +- src/mnode/inc/mgmtNormalTable.h | 2 +- src/mnode/inc/mgmtVgroup.h | 2 +- src/mnode/src/mgmtChildTable.c | 30 +++---- src/mnode/src/mgmtDb.c | 2 +- src/mnode/src/mgmtNormalTable.c | 29 +++---- src/mnode/src/mgmtSuperTable.c | 15 ++-- src/mnode/src/mgmtTable.c | 66 +++++++++++--- src/mnode/src/mgmtVgroup.c | 4 +- 11 files changed, 193 insertions(+), 115 deletions(-) diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index b59c2882f2..25430b91e4 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -93,17 +93,14 @@ void dnodeWrite(SRpcMsg *pMsg) { char *pCont = (char *) pMsg->pCont; SRpcContext *pRpcContext = NULL; - int32_t numOfVnodes = 0; - if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) { - // TODO parse head, get number of vnodes; - numOfVnodes = 1; - } else { - numOfVnodes = 1; - } - - if (numOfVnodes > 1) { - pRpcContext = calloc(sizeof(SRpcContext), 1); - pRpcContext->numOfVnodes = numOfVnodes; + if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT || pMsg->msgType == TSDB_MSG_TYPE_MD_DROP_STABLE) { + SWriteMsgDesc *pDesc = pCont; + pDesc->numOfVnodes = htonl(pDesc->numOfVnodes); + pCont += sizeof(SWriteMsgDesc); + if (pDesc->numOfVnodes > 1) { + pRpcContext = calloc(sizeof(SRpcContext), 1); + pRpcContext->numOfVnodes = pDesc->numOfVnodes; + } } while (leftLen > 0) { @@ -291,26 +288,9 @@ static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) { static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) { SMDCreateTableMsg *pTable = pMsg->rpcMsg.pCont; - dTrace("table:%s, start to create in dnode, vgroup:%d", pTable->tableId, pTable->vgId); - SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - void *pVnode = dnodeGetVnode(pTable->vgId); - if (pVnode == NULL) { - rpcRsp.code = TSDB_CODE_INVALID_VGROUP_ID; - dTrace("table:%s, failed to create in vgroup:%d, reason:%s", pTable->tableId, pTable->vgId, tstrerror(rpcRsp.code)); - rpcSendResponse(&rpcRsp); - return; - } - - void *pTsdb = dnodeGetVnodeTsdb(pVnode); - if (pTsdb == NULL) { - dnodeReleaseVnode(pVnode); - rpcRsp.code = TSDB_CODE_NOT_ACTIVE_VNODE; - dTrace("table:%s, failed to create in vgroup:%d, reason:%s", pTable->tableId, pTable->vgId, tstrerror(rpcRsp.code)); - rpcSendResponse(&rpcRsp); - return; - } + dTrace("table:%s, start to create in dnode, vgroup:%d", pTable->tableId, pTable->vgId); pTable->numOfColumns = htons(pTable->numOfColumns); pTable->numOfTags = htons(pTable->numOfTags); pTable->sid = htonl(pTable->sid); @@ -344,7 +324,6 @@ static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) { } tsdbTableSetSchema(&tCfg, pDestTagSchema, false); - // TODO: add data row char *pTagData = pTable->data + totalCols * sizeof(SSchema); int accumBytes = 0; SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema); @@ -356,50 +335,107 @@ static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) { tsdbTableSetTagValue(&tCfg, dataRow, false); } - rpcRsp.code = tsdbCreateTable(pTsdb, &tCfg); - dnodeReleaseVnode(pVnode); + void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode); - if (rpcRsp.code != TSDB_CODE_SUCCESS) { - dError("table:%s, failed to create in vgroup:%d, reason:%s", pTable->tableId, pTable->vgId, tstrerror(rpcRsp.code)); - rpcSendResponse(&rpcRsp); - } else { - dTrace("table:%s, created in dnode", pTable->tableId); - rpcSendResponse(&rpcRsp); - } + rpcRsp.code = tsdbCreateTable(pTsdb, &tCfg); + dnodeReleaseVnode(pMsg->pVnode); + + dTrace("table:%s, create table result:%s", pTable->tableId, tstrerror(rpcRsp.code)); + rpcSendResponse(&rpcRsp); } static void dnodeProcessDropTableMsg(SWriteMsg *pMsg) { SMDDropTableMsg *pTable = pMsg->rpcMsg.pCont; - dPrint("table:%s, sid:%d is dropped", pTable->tableId, pTable->sid); - -// pTable->sid = htonl(pTable->sid); -// pTable->numOfVPeers = htonl(pTable->numOfVPeers); -// pTable->uid = htobe64(pTable->uid); -// -// for (int i = 0; i < pTable->numOfVPeers; ++i) { -// pTable->vpeerDesc[i].ip = htonl(pTable->vpeerDesc[i].ip); -// pTable->vpeerDesc[i].vnode = htonl(pTable->vpeerDesc[i].vnode); -// } -// -// int32_t code = dnodeDropTable(pTable); -// SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + + dTrace("table:%s, start to drop in dnode, vgroup:%d", pTable->tableId, pTable->vgId); + STableId tableId = { + .uid = htobe64(pTable->uid), + .tid = htonl(pTable->sid) + }; + + void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode); + + rpcRsp.code = tsdbDropTable(pTsdb, tableId); + dnodeReleaseVnode(pMsg->pVnode); + + dTrace("table:%s, drop table result:%s", pTable->tableId, tstrerror(rpcRsp.code)); rpcSendResponse(&rpcRsp); } static void dnodeProcessAlterTableMsg(SWriteMsg *pMsg) { SMDCreateTableMsg *pTable = pMsg->rpcMsg.pCont; - dPrint("table:%s, sid:%d is alterd", pTable->tableId, pTable->sid); - SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + + dTrace("table:%s, start to alter in dnode, vgroup:%d", pTable->tableId, pTable->vgId); + pTable->numOfColumns = htons(pTable->numOfColumns); + pTable->numOfTags = htons(pTable->numOfTags); + pTable->sid = htonl(pTable->sid); + pTable->sversion = htonl(pTable->sversion); + pTable->tagDataLen = htonl(pTable->tagDataLen); + pTable->sqlDataLen = htonl(pTable->sqlDataLen); + pTable->uid = htobe64(pTable->uid); + pTable->superTableUid = htobe64(pTable->superTableUid); + pTable->createdTime = htobe64(pTable->createdTime); + SSchema *pSchema = (SSchema *) pTable->data; + + int totalCols = pTable->numOfColumns + pTable->numOfTags; + for (int i = 0; i < totalCols; i++) { + pSchema[i].colId = htons(pSchema[i].colId); + pSchema[i].bytes = htons(pSchema[i].bytes); + } + + STableCfg tCfg; + tsdbInitTableCfg(&tCfg, pTable->tableType, pTable->uid, pTable->sid); + + STSchema *pDestSchema = tdNewSchema(pTable->numOfColumns); + for (int i = 0; i < pTable->numOfColumns; i++) { + tdSchemaAppendCol(pDestSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes); + } + tsdbTableSetSchema(&tCfg, pDestSchema, false); + + if (pTable->numOfTags != 0) { + STSchema *pDestTagSchema = tdNewSchema(pTable->numOfTags); + for (int i = pTable->numOfColumns; i < totalCols; i++) { + tdSchemaAppendCol(pDestTagSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes); + } + tsdbTableSetSchema(&tCfg, pDestTagSchema, false); + + char *pTagData = pTable->data + totalCols * sizeof(SSchema); + int accumBytes = 0; + SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema); + + for (int i = 0; i < pTable->numOfTags; i++) { + tdAppendColVal(dataRow, pTagData + accumBytes, pDestTagSchema->columns + i); + accumBytes += pSchema[i + pTable->numOfColumns].bytes; + } + tsdbTableSetTagValue(&tCfg, dataRow, false); + } + + void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode); + + rpcRsp.code = tsdbAlterTable(pTsdb, &tCfg); + dnodeReleaseVnode(pMsg->pVnode); + + dTrace("table:%s, alter table result:%s", pTable->tableId, tstrerror(rpcRsp.code)); rpcSendResponse(&rpcRsp); } static void dnodeProcessDropStableMsg(SWriteMsg *pMsg) { SMDDropSTableMsg *pTable = pMsg->rpcMsg.pCont; - dPrint("stable:%s, is dropped", pTable->tableId); - SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + + dTrace("stable:%s, start to drop in dnode, vgroup:%d", pTable->tableId, pTable->vgId); + pTable->uid = htobe64(pTable->uid); + + // TODO: drop stable in vvnode + //void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode); + //rpcRsp.code = tsdbDropSTable(pTsdb, pTable->uid); + + rpcRsp.code = TSDB_CODE_SUCCESS; + dnodeReleaseVnode(pMsg->pVnode); + + dTrace("stable:%s, drop stable result:%s", pTable->tableId, tstrerror(rpcRsp.code)); rpcSendResponse(&rpcRsp); } diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 4b5068aee4..6a96b658bc 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -234,6 +234,10 @@ typedef struct { uint32_t ip; } SVnodeDesc; +typedef struct { + int32_t numOfVnodes; +} SWriteMsgDesc; + typedef struct { int32_t contLen; int32_t vgId; @@ -341,8 +345,10 @@ typedef struct { } SMDDropTableMsg; typedef struct { - char tableId[TSDB_TABLE_ID_LEN + 1]; + int32_t contLen; + int32_t vgId; int64_t uid; + char tableId[TSDB_TABLE_ID_LEN + 1]; } SMDDropSTableMsg; typedef struct { diff --git a/src/mnode/inc/mgmtChildTable.h b/src/mnode/inc/mgmtChildTable.h index 680c48dc23..7847357d65 100644 --- a/src/mnode/inc/mgmtChildTable.h +++ b/src/mnode/inc/mgmtChildTable.h @@ -33,7 +33,7 @@ void * mgmtGetChildTable(char *tableId); void *mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid); void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pCreate, SChildTableObj *pTable); -int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable); +int32_t mgmtDropChildTable(SChildTableObj *pTable); int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent); int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMeta *pMeta, bool usePublicIp); diff --git a/src/mnode/inc/mgmtNormalTable.h b/src/mnode/inc/mgmtNormalTable.h index c612c1325a..0577647d22 100644 --- a/src/mnode/inc/mgmtNormalTable.h +++ b/src/mnode/inc/mgmtNormalTable.h @@ -31,7 +31,7 @@ void * mgmtGetNormalTable(char *tableId); void * mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid); void * mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable); -int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable); +int32_t mgmtDropNormalTable(SNormalTableObj *pTable); int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int32_t ncols); int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName); diff --git a/src/mnode/inc/mgmtVgroup.h b/src/mnode/inc/mgmtVgroup.h index 7ac97076c7..10f404f386 100644 --- a/src/mnode/inc/mgmtVgroup.h +++ b/src/mnode/inc/mgmtVgroup.h @@ -30,7 +30,7 @@ SVgObj *mgmtGetVgroup(int32_t vgId); SVgObj *mgmtGetVgroupByVnode(uint32_t dnode, int32_t vnode); void mgmtCreateVgroup(SQueuedMsg *pMsg); -int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup); +int32_t mgmtDropVgroup(SVgObj *pVgroup); void mgmtUpdateVgroup(SVgObj *pVgroup); void mgmtSetVgroupIdPool(); diff --git a/src/mnode/src/mgmtChildTable.c b/src/mnode/src/mgmtChildTable.c index 0b577f710f..6544d969a8 100644 --- a/src/mnode/src/mgmtChildTable.c +++ b/src/mnode/src/mgmtChildTable.c @@ -355,46 +355,38 @@ void* mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t return pTable; } -int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable) { +int32_t mgmtDropChildTable(SChildTableObj *pTable) { SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { mError("table:%s, failed to drop child table, vgroup not exist", pTable->tableId); return TSDB_CODE_OTHERS; } - SMDDropTableMsg *pRemove = rpcMallocCont(sizeof(SMDDropTableMsg)); - if (pRemove == NULL) { + SMDDropTableMsg *pDrop = rpcMallocCont(sizeof(SMDDropTableMsg)); + if (pDrop == NULL) { mError("table:%s, failed to drop child table, no enough memory", pTable->tableId); return TSDB_CODE_SERV_OUT_OF_MEMORY; } - strcpy(pRemove->tableId, pTable->tableId); - pRemove->vgId = htonl(pTable->vgId); - pRemove->contLen = htonl(sizeof(SMDDropTableMsg)); - pRemove->sid = htonl(pTable->sid); - pRemove->uid = htobe64(pTable->uid); + strcpy(pDrop->tableId, pTable->tableId); + pDrop->vgId = htonl(pTable->vgId); + pDrop->contLen = htonl(sizeof(SMDDropTableMsg)); + pDrop->sid = htonl(pTable->sid); + pDrop->uid = htobe64(pTable->uid); SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); - mTrace("table:%s, send drop table msg", pRemove->tableId); + mTrace("table:%s, send drop table msg", pDrop->tableId); SRpcMsg rpcMsg = { .handle = 0, - .pCont = pRemove, + .pCont = pDrop, .contLen = sizeof(SMDDropTableMsg), .code = 0, .msgType = TSDB_MSG_TYPE_MD_DROP_TABLE }; + mgmtSendMsgToDnode(&ipSet, &rpcMsg); - if (sdbDeleteRow(tsChildTableSdb, pTable) < 0) { - mError("table:%s, update ctables sdb error", pTable->tableId); - return TSDB_CODE_SDB_ERROR; - } - - if (pVgroup->numOfTables <= 0) { - mgmtDropVgroup(pDb, pVgroup); - } - return TSDB_CODE_SUCCESS; } diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index 12c34ad057..92699b69a6 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -368,7 +368,7 @@ static bool mgmtCheckDropDbFinished(SDbObj *pDb) { } static void mgmtDropDbFromSdb(SDbObj *pDb) { - while (pDb->pHead) mgmtDropVgroup(pDb, pDb->pHead); + while (pDb->pHead) mgmtDropVgroup(pDb->pHead); // SSuperTableObj *pMetric = pDb->pSTable; // while (pMetric) { diff --git a/src/mnode/src/mgmtNormalTable.c b/src/mnode/src/mgmtNormalTable.c index 8c6645b6c2..5328945bb9 100644 --- a/src/mnode/src/mgmtNormalTable.c +++ b/src/mnode/src/mgmtNormalTable.c @@ -389,43 +389,36 @@ void *mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t return pTable; } -int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable) { +int32_t mgmtDropNormalTable(SNormalTableObj *pTable) { SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { mError("table:%s, failed to drop normal table, vgroup not exist", pTable->tableId); return TSDB_CODE_OTHERS; } - SMDDropTableMsg *pRemove = rpcMallocCont(sizeof(SMDDropTableMsg)); - if (pRemove == NULL) { + SMDDropTableMsg *pDrop = rpcMallocCont(sizeof(SMDDropTableMsg)); + if (pDrop == NULL) { mError("table:%s, failed to drop normal table, no enough memory", pTable->tableId); return TSDB_CODE_SERV_OUT_OF_MEMORY; } - strcpy(pRemove->tableId, pTable->tableId); - pRemove->sid = htonl(pTable->sid); - pRemove->uid = htobe64(pTable->uid); + strcpy(pDrop->tableId, pTable->tableId); + pDrop->contLen = htonl(sizeof(SMDDropTableMsg)); + pDrop->vgId = htonl(pVgroup->vgId); + pDrop->sid = htonl(pTable->sid); + pDrop->uid = htobe64(pTable->uid); SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); - mTrace("table:%s, send drop table msg", pRemove->tableId); + mTrace("table:%s, send drop table msg", pDrop->tableId); SRpcMsg rpcMsg = { .handle = 0, - .pCont = pRemove, + .pCont = pDrop, .contLen = sizeof(SMDDropTableMsg), .code = 0, .msgType = TSDB_MSG_TYPE_MD_DROP_TABLE }; + mgmtSendMsgToDnode(&ipSet, &rpcMsg); - - if (sdbDeleteRow(tsNormalTableSdb, pTable) < 0) { - mError("table:%s, update ntables sdb error", pTable->tableId); - return TSDB_CODE_SDB_ERROR; - } - - if (pVgroup->numOfTables <= 0) { - mgmtDropVgroup(pDb, pVgroup); - } - return TSDB_CODE_SUCCESS; } diff --git a/src/mnode/src/mgmtSuperTable.c b/src/mnode/src/mgmtSuperTable.c index 34665d1db7..2f3debf158 100644 --- a/src/mnode/src/mgmtSuperTable.c +++ b/src/mnode/src/mgmtSuperTable.c @@ -250,11 +250,16 @@ int32_t mgmtCreateSuperTable(SCMCreateTableMsg *pCreate) { return TSDB_CODE_SUCCESS; } -int32_t mgmtDropSuperTable(SDbObj *pDb, SSuperTableObj *pSuperTable) { - //TODO drop all child tables - - mgmtRemoveSuperTableFromDb(pDb); - return sdbDeleteRow(tsSuperTableSdb, pSuperTable); +int32_t mgmtDropSuperTable(SDbObj *pDb, SSuperTableObj *pStable) { + if (pStable->numOfTables != 0) { + mError("stable:%s, numOfTables:%d not 0", pStable->tableId, pStable->numOfTables); + return TSDB_CODE_OTHERS; + } else { + //TODO: drop child tables + mError("stable:%s, is dropped from sdb", pStable->tableId); + mgmtRemoveSuperTableFromDb(pDb); + return TSDB_CODE_OTHERS; + } } void* mgmtGetSuperTable(char *tableId) { diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index 97f2542a78..50d1697cd8 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -51,6 +51,7 @@ static void mgmtProcessTableMetaMsg(SQueuedMsg *queueMsg); static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *queueMsg); static void mgmtProcessSuperTableMetaMsg(SQueuedMsg *queueMsg); static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg); +static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg); static int32_t mgmtGetShowTableMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); static void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle); @@ -82,7 +83,7 @@ int32_t mgmtInitTables() { mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_TABLE, mgmtGetShowTableMeta); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_TABLE, mgmtRetrieveShowTables); mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP, mgmtProcessCreateTableRsp); - mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_TABLE_RSP, NULL); + mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_TABLE_RSP, mgmtProcessDropTableRsp); mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP, NULL); mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_STABLE_RSP, NULL); @@ -439,19 +440,19 @@ void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { return; } pMDCreate = mgmtBuildCreateChildTableMsg(pCreate, pTable); - if (pCreate == NULL) { + if (pMDCreate == NULL) { mgmtSendSimpleResp(pMsg->thandle, terrno); return; } } else { mTrace("table:%s, is a normal table, vgroup:%d sid:%d ahandle:%p", pCreate->tableId, pVgroup->vgId, sid, pMsg); - code = mgmtCreateNormalTable(pCreate, pVgroup, sid); + pTable = mgmtCreateNormalTable(pCreate, pVgroup, sid); if (pTable == NULL) { mgmtSendSimpleResp(pMsg->thandle, terrno); return; } pMDCreate = mgmtBuildCreateNormalTableMsg(pTable); - if (pCreate == NULL) { + if (pMDCreate == NULL) { mgmtSendSimpleResp(pMsg->thandle, terrno); return; } @@ -509,31 +510,36 @@ void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) { } if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { - mError("table:%s, failed to create table, in monitor database", pDrop->tableId); + mError("table:%s, failed to drop table, in monitor database", pDrop->tableId); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MONITOR_DB_FORBIDDEN); return; } + int32_t code; switch (pTable->type) { case TSDB_SUPER_TABLE: mTrace("table:%s, start to drop super table", pDrop->tableId); - mgmtDropSuperTable(pDb, (SSuperTableObj *) pTable); + code = mgmtDropSuperTable(pDb, (SSuperTableObj *) pTable); break; case TSDB_CHILD_TABLE: mTrace("table:%s, start to drop child table", pDrop->tableId); - mgmtDropChildTable(pDb, (SChildTableObj *) pTable); + code = mgmtDropChildTable((SChildTableObj *) pTable); break; case TSDB_NORMAL_TABLE: mTrace("table:%s, start to drop normal table", pDrop->tableId); - mgmtDropNormalTable(pDb, (SNormalTableObj *) pTable); + code = mgmtDropNormalTable((SNormalTableObj *) pTable); break; case TSDB_STREAM_TABLE: mTrace("table:%s, start to drop stream table", pDrop->tableId); - mgmtDropNormalTable(pDb, (SNormalTableObj *) pTable); + code = mgmtDropNormalTable((SNormalTableObj *) pTable); break; default: + code = TSDB_CODE_INVALID_TABLE_TYPE; mError("table:%s, invalid table type:%d", pDrop->tableId, pTable->type); - mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE); + } + + if (code != TSDB_CODE_SUCCESS) { + mgmtSendSimpleResp(pMsg->thandle, code); } } @@ -778,3 +784,43 @@ static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg) { free(queueMsg); } + +static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) { + if (rpcMsg->handle == NULL) return; + + STableInfo *pTable = rpcMsg->handle; + mTrace("table:%s, drop table rsp received, thandle:%p result:%s", pTable->tableId, rpcMsg->handle, tstrerror(rpcMsg->code)); + + if (rpcMsg->code != TSDB_CODE_SUCCESS) { + mError("table:%s, failed to drop in dnode, reason:%s", pTable->tableId, tstrerror(rpcMsg->code)); + mgmtSendSimpleResp(rpcMsg->handle, rpcMsg->code); + return; + } else { + SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); + if (pVgroup == NULL) { + mError("table:%s, failed to get vgroup", pTable->tableId); + mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_VGROUP_ID); + return; + } + + if (pTable->type == TSDB_CHILD_TABLE) { + if (sdbDeleteRow(tsChildTableSdb, pTable) < 0) { + mError("table:%s, update ctables sdb error", pTable->tableId); + mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_SDB_ERROR); + return; + } + } else if (pTable->type == TSDB_NORMAL_TABLE){ + if (sdbDeleteRow(tsNormalTableSdb, pTable) < 0) { + mError("table:%s, update ntables sdb error", pTable->tableId); + mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_SDB_ERROR); + return; + } + } + + if (pVgroup->numOfTables <= 0) { + mgmtDropVgroup(pVgroup); + } + } + + mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_SUCCESS); +} diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index 000bb79d7b..7b3fb02e8a 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -185,7 +185,7 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg) { mgmtSendCreateVgroupMsg(pVgroup, pMsg); } -int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup) { +int32_t mgmtDropVgroup(SVgObj *pVgroup) { STableInfo *pTable; if (pVgroup->numOfTables > 0) { @@ -197,7 +197,7 @@ int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup) { // } } - mTrace("vgroup:%d, db:%s replica:%d is deleted", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes); + mTrace("vgroup:%d, replica:%d is deleted", pVgroup->vgId, pVgroup->numOfVnodes); //mgmtSendDropVgroupMsg(pVgroup, NULL); From 13b5cd4c3ab6758b21abb521e88410f5eb51b401 Mon Sep 17 00:00:00 2001 From: slguan Date: Mon, 16 Mar 2020 15:28:01 +0800 Subject: [PATCH 4/5] [TD-10] fix bugs while drop normal table --- src/dnode/src/dnodeWrite.c | 2 +- src/inc/mnode.h | 1 + src/inc/taosmsg.h | 3 +- src/mnode/inc/mgmtChildTable.h | 2 +- src/mnode/inc/mgmtNormalTable.h | 2 +- src/mnode/inc/mgmtSuperTable.h | 2 +- src/mnode/inc/mgmtVgroup.h | 1 - src/mnode/src/mgmtChildTable.c | 7 ++-- src/mnode/src/mgmtDb.c | 1 - src/mnode/src/mgmtNormalTable.c | 7 ++-- src/mnode/src/mgmtSuperTable.c | 2 +- src/mnode/src/mgmtTable.c | 74 ++++++++++++++++++++------------- src/mnode/src/mgmtVgroup.c | 47 +++++++++++++++++---- src/vnode/tsdb/CMakeLists.txt | 2 +- 14 files changed, 100 insertions(+), 53 deletions(-) diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index 25430b91e4..8d35e6afd9 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -425,7 +425,7 @@ static void dnodeProcessDropStableMsg(SWriteMsg *pMsg) { SMDDropSTableMsg *pTable = pMsg->rpcMsg.pCont; SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - dTrace("stable:%s, start to drop in dnode, vgroup:%d", pTable->tableId, pTable->vgId); + dTrace("stable:%s, start to it drop in dnode, vgroup:%d", pTable->tableId, pTable->vgId); pTable->uid = htobe64(pTable->uid); // TODO: drop stable in vvnode diff --git a/src/inc/mnode.h b/src/inc/mnode.h index 7a07a81d26..dd996c7401 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -77,6 +77,7 @@ typedef struct { } SDnodeObj; typedef struct { + int32_t dnodeId; uint32_t ip; uint32_t publicIp; int32_t vnode; diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 6a96b658bc..ca8ce136f8 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -352,8 +352,7 @@ typedef struct { } SMDDropSTableMsg; typedef struct { - int32_t vgId; - int32_t vnode; + int32_t vgId; } SMDDropVnodeMsg; typedef struct SColIndexEx { diff --git a/src/mnode/inc/mgmtChildTable.h b/src/mnode/inc/mgmtChildTable.h index 7847357d65..20f6ca9c8d 100644 --- a/src/mnode/inc/mgmtChildTable.h +++ b/src/mnode/inc/mgmtChildTable.h @@ -33,7 +33,7 @@ void * mgmtGetChildTable(char *tableId); void *mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid); void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pCreate, SChildTableObj *pTable); -int32_t mgmtDropChildTable(SChildTableObj *pTable); +int32_t mgmtDropChildTable(SQueuedMsg *newMsg, SChildTableObj *pTable); int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent); int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMeta *pMeta, bool usePublicIp); diff --git a/src/mnode/inc/mgmtNormalTable.h b/src/mnode/inc/mgmtNormalTable.h index 0577647d22..6f7bd5321b 100644 --- a/src/mnode/inc/mgmtNormalTable.h +++ b/src/mnode/inc/mgmtNormalTable.h @@ -31,7 +31,7 @@ void * mgmtGetNormalTable(char *tableId); void * mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid); void * mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable); -int32_t mgmtDropNormalTable(SNormalTableObj *pTable); +int32_t mgmtDropNormalTable(SQueuedMsg *newMsg, SNormalTableObj *pTable); int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int32_t ncols); int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName); diff --git a/src/mnode/inc/mgmtSuperTable.h b/src/mnode/inc/mgmtSuperTable.h index ba66ebe582..2acf34144f 100644 --- a/src/mnode/inc/mgmtSuperTable.h +++ b/src/mnode/inc/mgmtSuperTable.h @@ -32,7 +32,7 @@ void mgmtCleanUpSuperTables(); void * mgmtGetSuperTable(char *tableId); int32_t mgmtCreateSuperTable(SCMCreateTableMsg *pCreate); -int32_t mgmtDropSuperTable(SDbObj *pDb, SSuperTableObj *pTable); +int32_t mgmtDropSuperTable(SQueuedMsg *newMsg, SDbObj *pDb, SSuperTableObj *pTable); int32_t mgmtAddSuperTableTag(SSuperTableObj *pTable, SSchema schema[], int32_t ntags); int32_t mgmtDropSuperTableTag(SSuperTableObj *pTable, char *tagName); int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pTable, char *oldTagName, char *newTagName); diff --git a/src/mnode/inc/mgmtVgroup.h b/src/mnode/inc/mgmtVgroup.h index 10f404f386..524cc87460 100644 --- a/src/mnode/inc/mgmtVgroup.h +++ b/src/mnode/inc/mgmtVgroup.h @@ -41,7 +41,6 @@ void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, STableInfo *pTable); SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode); void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *ahandle); -void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle); SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup); SRpcIpSet mgmtGetIpSetFromIp(uint32_t ip); diff --git a/src/mnode/src/mgmtChildTable.c b/src/mnode/src/mgmtChildTable.c index 6544d969a8..0c6d964ae7 100644 --- a/src/mnode/src/mgmtChildTable.c +++ b/src/mnode/src/mgmtChildTable.c @@ -337,11 +337,11 @@ void* mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t strcpy(pTable->tableId, pCreate->tableId); strcpy(pTable->superTableId, pSuperTable->tableId); pTable->type = TSDB_CHILD_TABLE; + pTable->createdTime = taosGetTimestampMs(); pTable->uid = (((uint64_t) pTable->vgId) << 40) + ((((uint64_t) pTable->sid) & ((1ul << 24) - 1ul)) << 16) + ((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul)); pTable->sid = tid; pTable->vgId = pVgroup->vgId; - pTable->createdTime = taosGetTimestampMs(); pTable->superTable = pSuperTable; if (sdbInsertRow(tsChildTableSdb, pTable, 0) < 0) { @@ -355,7 +355,7 @@ void* mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t return pTable; } -int32_t mgmtDropChildTable(SChildTableObj *pTable) { +int32_t mgmtDropChildTable(SQueuedMsg *newMsg, SChildTableObj *pTable) { SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { mError("table:%s, failed to drop child table, vgroup not exist", pTable->tableId); @@ -378,13 +378,14 @@ int32_t mgmtDropChildTable(SChildTableObj *pTable) { mTrace("table:%s, send drop table msg", pDrop->tableId); SRpcMsg rpcMsg = { - .handle = 0, + .handle = newMsg, .pCont = pDrop, .contLen = sizeof(SMDDropTableMsg), .code = 0, .msgType = TSDB_MSG_TYPE_MD_DROP_TABLE }; + newMsg->ahandle = pTable; mgmtSendMsgToDnode(&ipSet, &rpcMsg); return TSDB_CODE_SUCCESS; diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index 92699b69a6..ca441bfb2e 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -962,7 +962,6 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) { int32_t code; if (pMsg->pUser->superAuth) { - code = TSDB_CODE_OPS_NOT_SUPPORT; //SCMDropDbMsg *pDrop = rpcMsg->pCont; //rpcRsp.code = mgmtDropDbByName(pUser->pAcct, pDrop->db, pDrop->ignoreNotExists); //if (rpcRsp.code == TSDB_CODE_SUCCESS) { diff --git a/src/mnode/src/mgmtNormalTable.c b/src/mnode/src/mgmtNormalTable.c index 5328945bb9..4ff99308a0 100644 --- a/src/mnode/src/mgmtNormalTable.c +++ b/src/mnode/src/mgmtNormalTable.c @@ -341,9 +341,9 @@ void *mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t strcpy(pTable->tableId, pCreate->tableId); pTable->type = TSDB_NORMAL_TABLE; pTable->vgId = pVgroup->vgId; + pTable->createdTime = taosGetTimestampMs(); pTable->uid = (((uint64_t) pTable->createdTime) << 16) + ((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul)); pTable->sid = sid; - pTable->createdTime = taosGetTimestampMs(); pTable->sversion = 0; pTable->numOfColumns = htons(pCreate->numOfColumns); pTable->sqlLen = htons(pCreate->sqlLen); @@ -389,7 +389,7 @@ void *mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t return pTable; } -int32_t mgmtDropNormalTable(SNormalTableObj *pTable) { +int32_t mgmtDropNormalTable(SQueuedMsg *newMsg, SNormalTableObj *pTable) { SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { mError("table:%s, failed to drop normal table, vgroup not exist", pTable->tableId); @@ -411,13 +411,14 @@ int32_t mgmtDropNormalTable(SNormalTableObj *pTable) { SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); mTrace("table:%s, send drop table msg", pDrop->tableId); SRpcMsg rpcMsg = { - .handle = 0, + .handle = newMsg, .pCont = pDrop, .contLen = sizeof(SMDDropTableMsg), .code = 0, .msgType = TSDB_MSG_TYPE_MD_DROP_TABLE }; + newMsg->ahandle = pTable; mgmtSendMsgToDnode(&ipSet, &rpcMsg); return TSDB_CODE_SUCCESS; } diff --git a/src/mnode/src/mgmtSuperTable.c b/src/mnode/src/mgmtSuperTable.c index 2f3debf158..02cb466d05 100644 --- a/src/mnode/src/mgmtSuperTable.c +++ b/src/mnode/src/mgmtSuperTable.c @@ -250,7 +250,7 @@ int32_t mgmtCreateSuperTable(SCMCreateTableMsg *pCreate) { return TSDB_CODE_SUCCESS; } -int32_t mgmtDropSuperTable(SDbObj *pDb, SSuperTableObj *pStable) { +int32_t mgmtDropSuperTable(SQueuedMsg *newMsg, SDbObj *pDb, SSuperTableObj *pStable) { if (pStable->numOfTables != 0) { mError("stable:%s, numOfTables:%d not 0", pStable->tableId, pStable->numOfTables); return TSDB_CODE_OTHERS; diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index 50d1697cd8..d77d9cbb50 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -515,23 +515,27 @@ void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) { return; } + SQueuedMsg *newMsg = malloc(sizeof(SQueuedMsg)); + memcpy(newMsg, pMsg, sizeof(SQueuedMsg)); + pMsg->pCont = NULL; int32_t code; + switch (pTable->type) { case TSDB_SUPER_TABLE: mTrace("table:%s, start to drop super table", pDrop->tableId); - code = mgmtDropSuperTable(pDb, (SSuperTableObj *) pTable); + code = mgmtDropSuperTable(newMsg, pDb, (SSuperTableObj *) pTable); break; case TSDB_CHILD_TABLE: mTrace("table:%s, start to drop child table", pDrop->tableId); - code = mgmtDropChildTable((SChildTableObj *) pTable); + code = mgmtDropChildTable(newMsg, (SChildTableObj *) pTable); break; case TSDB_NORMAL_TABLE: mTrace("table:%s, start to drop normal table", pDrop->tableId); - code = mgmtDropNormalTable((SNormalTableObj *) pTable); + code = mgmtDropNormalTable(newMsg, (SNormalTableObj *) pTable); break; case TSDB_STREAM_TABLE: mTrace("table:%s, start to drop stream table", pDrop->tableId); - code = mgmtDropNormalTable((SNormalTableObj *) pTable); + code = mgmtDropNormalTable(newMsg, (SNormalTableObj *) pTable); break; default: code = TSDB_CODE_INVALID_TABLE_TYPE; @@ -539,6 +543,7 @@ void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) { } if (code != TSDB_CODE_SUCCESS) { + free(newMsg); mgmtSendSimpleResp(pMsg->thandle, code); } } @@ -788,39 +793,48 @@ static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg) { static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) { if (rpcMsg->handle == NULL) return; - STableInfo *pTable = rpcMsg->handle; - mTrace("table:%s, drop table rsp received, thandle:%p result:%s", pTable->tableId, rpcMsg->handle, tstrerror(rpcMsg->code)); + SQueuedMsg *queueMsg = rpcMsg->handle; + queueMsg->received++; + + STableInfo *pTable = queueMsg->ahandle; + mTrace("table:%s, drop table rsp received, thandle:%p result:%s", pTable->tableId, queueMsg->thandle, tstrerror(rpcMsg->code)); if (rpcMsg->code != TSDB_CODE_SUCCESS) { mError("table:%s, failed to drop in dnode, reason:%s", pTable->tableId, tstrerror(rpcMsg->code)); - mgmtSendSimpleResp(rpcMsg->handle, rpcMsg->code); + mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code); + free(queueMsg); return; - } else { - SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); - if (pVgroup == NULL) { - mError("table:%s, failed to get vgroup", pTable->tableId); - mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_VGROUP_ID); + } + + SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); + if (pVgroup == NULL) { + mError("table:%s, failed to get vgroup", pTable->tableId); + mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_INVALID_VGROUP_ID); + free(queueMsg); + return; + } + + if (pTable->type == TSDB_CHILD_TABLE) { + if (sdbDeleteRow(tsChildTableSdb, pTable) < 0) { + mError("table:%s, update ctables sdb error", pTable->tableId); + mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SDB_ERROR); + free(queueMsg); return; } - - if (pTable->type == TSDB_CHILD_TABLE) { - if (sdbDeleteRow(tsChildTableSdb, pTable) < 0) { - mError("table:%s, update ctables sdb error", pTable->tableId); - mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_SDB_ERROR); - return; - } - } else if (pTable->type == TSDB_NORMAL_TABLE){ - if (sdbDeleteRow(tsNormalTableSdb, pTable) < 0) { - mError("table:%s, update ntables sdb error", pTable->tableId); - mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_SDB_ERROR); - return; - } - } - - if (pVgroup->numOfTables <= 0) { - mgmtDropVgroup(pVgroup); + } else if (pTable->type == TSDB_NORMAL_TABLE){ + if (sdbDeleteRow(tsNormalTableSdb, pTable) < 0) { + mError("table:%s, update ntables sdb error", pTable->tableId); + mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SDB_ERROR); + free(queueMsg); + return; } } - mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_SUCCESS); + if (pVgroup->numOfTables <= 0) { + mPrint("vgroup:%d, all tables is dropped, drop vgroup", pVgroup->vgId); + mgmtDropVgroup(pVgroup); + } + + mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SUCCESS); + free(queueMsg); } diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index 7b3fb02e8a..e700fcd877 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -44,8 +44,10 @@ static void *mgmtVgroupActionDestroy(void *row, char *str, int32_t size, int32_t static int32_t mgmtGetVgroupMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn); static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg); +static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg); -void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle); +static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle); +static void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle); static void mgmtVgroupActionInit() { SVgObj tObj; @@ -119,6 +121,7 @@ int32_t mgmtInitVgroups() { mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_VGROUP, mgmtGetVgroupMeta); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_VGROUP, mgmtRetrieveVgroups); mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mgmtProcessCreateVnodeRsp); + mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_VNODE_RSP, mgmtProcessDropVnodeRsp); mTrace("vgroup is initialized"); return 0; @@ -186,7 +189,7 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg) { } int32_t mgmtDropVgroup(SVgObj *pVgroup) { - STableInfo *pTable; +// STableInfo *pTable; if (pVgroup->numOfTables > 0) { // for (int32_t i = 0; i < pDb->cfg.maxSessions; ++i) { @@ -197,12 +200,9 @@ int32_t mgmtDropVgroup(SVgObj *pVgroup) { // } } - mTrace("vgroup:%d, replica:%d is deleted", pVgroup->vgId, pVgroup->numOfVnodes); - - //mgmtSendDropVgroupMsg(pVgroup, NULL); - + mTrace("vgroup:%d, replica:%d is deleting from sdb", pVgroup->vgId, pVgroup->numOfVnodes); + mgmtSendDropVgroupMsg(pVgroup, NULL); sdbDeleteRow(tsVgroupSdb, pVgroup); - return TSDB_CODE_SUCCESS; } @@ -633,4 +633,37 @@ static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { } free(queueMsg); +} + +static SMDDropVnodeMsg *mgmtBuildDropVnodeMsg(SVgObj *pVgroup) { + SMDDropVnodeMsg *pDrop = rpcMallocCont(sizeof(SMDDropVnodeMsg)); + if (pDrop == NULL) return NULL; + + pDrop->vgId = htonl(pVgroup->vgId); + return pDrop; +} + +static void mgmtSendDropVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) { + mTrace("vgroup:%d, send drop vnode msg, ahandle:%p", pVgroup->vgId, ahandle); + SMDDropVnodeMsg *pDrop = mgmtBuildDropVnodeMsg(pVgroup); + SRpcMsg rpcMsg = { + .handle = ahandle, + .pCont = pDrop, + .contLen = pDrop ? sizeof(SMDDropVnodeMsg) : 0, + .code = 0, + .msgType = TSDB_MSG_TYPE_MD_DROP_VNODE + }; + mgmtSendMsgToDnode(ipSet, &rpcMsg); +} + +static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) { + mTrace("vgroup:%d, send drop all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle); + for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { + SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip); + mgmtSendDropVnodeMsg(pVgroup, &ipSet, ahandle); + } +} + +static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) { + mTrace("drop vnode msg is received"); } \ No newline at end of file diff --git a/src/vnode/tsdb/CMakeLists.txt b/src/vnode/tsdb/CMakeLists.txt index 8a7c7a1a51..b2154969d6 100644 --- a/src/vnode/tsdb/CMakeLists.txt +++ b/src/vnode/tsdb/CMakeLists.txt @@ -15,5 +15,5 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) TARGET_LINK_LIBRARIES(tsdb common tutil) # Someone has no gtest directory, so comment it - ADD_SUBDIRECTORY(tests) + # ADD_SUBDIRECTORY(tests) ENDIF () From 562967f1bd4280a56f821461a2eacd383edf8926 Mon Sep 17 00:00:00 2001 From: slguan Date: Mon, 16 Mar 2020 22:26:44 +0800 Subject: [PATCH 5/5] [TD-9] add code for drop database --- src/client/src/tscServer.c | 6 +- src/dnode/src/dnodeRead.c | 44 ++++-- src/dnode/src/dnodeWrite.c | 8 +- src/inc/mnode.h | 7 +- src/inc/taosmsg.h | 4 +- src/mnode/inc/mgmtChildTable.h | 2 + src/mnode/inc/mgmtNormalTable.h | 2 + src/mnode/inc/mgmtSuperTable.h | 2 + src/mnode/inc/mgmtTable.h | 7 - src/mnode/inc/mgmtVgroup.h | 2 +- src/mnode/src/mgmtChildTable.c | 29 +++- src/mnode/src/mgmtDb.c | 242 +++++++++++--------------------- src/mnode/src/mgmtNormalTable.c | 26 +++- src/mnode/src/mgmtShell.c | 2 + src/mnode/src/mgmtSuperTable.c | 24 ++++ src/mnode/src/mgmtTable.c | 15 +- src/mnode/src/mgmtVgroup.c | 63 +++++---- 17 files changed, 245 insertions(+), 240 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index deb1719527..cc416e1cb7 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -180,7 +180,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { return TSDB_CODE_CLI_OUT_OF_MEMORY; } - pSql->ipList->ip[0] = inet_addr("192.168.0.1"); + pSql->ipList->ip[0] = inet_addr(tsPrivateIp); if (pSql->cmd.command < TSDB_SQL_MGMT) { pSql->ipList->port = tsDnodeShellPort; @@ -197,7 +197,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { rpcSendRequest(pVnodeConn, pSql->ipList, &rpcMsg); } else { pSql->ipList->port = tsMnodeShellPort; - tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port); + tscTrace("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port); memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); SRpcMsg rpcMsg = { .msgType = pSql->cmd.msgType, @@ -213,7 +213,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { } void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { - tscPrint("response:%s is received, len:%d error:%s", taosMsg[rpcMsg->msgType], rpcMsg->contLen, tstrerror(rpcMsg->code)); + tscTrace("response:%s is received, len:%d error:%s", taosMsg[rpcMsg->msgType], rpcMsg->contLen, tstrerror(rpcMsg->code)); SSqlObj *pSql = (SSqlObj *)rpcMsg->handle; if (pSql == NULL || pSql->signature != pSql) { tscError("%p sql is already released, signature:%p", pSql, pSql->signature); diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index 20fc948844..9ba6e343dc 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -70,25 +70,31 @@ void dnodeCleanupRead() { } void dnodeRead(SRpcMsg *pMsg) { + int32_t queuedMsgNum = 0; int32_t leftLen = pMsg->contLen; char *pCont = (char *) pMsg->pCont; - int32_t contLen = 0; - int32_t numOfVnodes = 0; - int32_t vgId = 0; SRpcContext *pRpcContext = NULL; - // parse head, get number of vnodes; - if ( numOfVnodes > 1) { - pRpcContext = calloc(sizeof(SRpcContext), 1); - pRpcContext->numOfVnodes = 1; +// SMsgDesc *pDesc = pCont; +// pDesc->numOfVnodes = htonl(pDesc->numOfVnodes); +// pCont += sizeof(SMsgDesc); +// if (pDesc->numOfVnodes > 1) { +// pRpcContext = calloc(sizeof(SRpcContext), 1); +// pRpcContext->numOfVnodes = pDesc->numOfVnodes; +// } + if (pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE) { + queuedMsgNum = 0; } while (leftLen > 0) { - // todo: parse head, get vgId, contLen + SMsgHead *pHead = (SMsgHead *) pCont; + pHead->vgId = 1; //htonl(pHead->vgId); + pHead->contLen = pMsg->contLen; //htonl(pHead->contLen); - // get pVnode from vgId - void *pVnode = dnodeGetVnode(vgId); + void *pVnode = dnodeGetVnode(pHead->vgId); if (pVnode == NULL) { + leftLen -= pHead->contLen; + pCont -= pHead->contLen; continue; } @@ -96,7 +102,7 @@ void dnodeRead(SRpcMsg *pMsg) { SReadMsg readMsg; readMsg.rpcMsg = *pMsg; readMsg.pCont = pCont; - readMsg.contLen = contLen; + readMsg.contLen = pHead->contLen; readMsg.pRpcContext = pRpcContext; readMsg.pVnode = pVnode; @@ -104,11 +110,23 @@ void dnodeRead(SRpcMsg *pMsg) { taosWriteQitem(queue, &readMsg); // next vnode - leftLen -= contLen; - pCont -= contLen; + leftLen -= pHead->contLen; + pCont -= pHead->contLen; + queuedMsgNum++; dnodeReleaseVnode(pVnode); } + + if (queuedMsgNum == 0) { + SRpcMsg rpcRsp = { + .handle = pMsg->handle, + .pCont = NULL, + .contLen = 0, + .code = TSDB_CODE_INVALID_VGROUP_ID, + .msgType = 0 + }; + rpcSendResponse(&rpcRsp); + } } void *dnodeAllocateReadWorker() { diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index 8d35e6afd9..b10ca16467 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -94,9 +94,9 @@ void dnodeWrite(SRpcMsg *pMsg) { SRpcContext *pRpcContext = NULL; if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT || pMsg->msgType == TSDB_MSG_TYPE_MD_DROP_STABLE) { - SWriteMsgDesc *pDesc = pCont; + SMsgDesc *pDesc = pCont; pDesc->numOfVnodes = htonl(pDesc->numOfVnodes); - pCont += sizeof(SWriteMsgDesc); + pCont += sizeof(SMsgDesc); if (pDesc->numOfVnodes > 1) { pRpcContext = calloc(sizeof(SRpcContext), 1); pRpcContext->numOfVnodes = pDesc->numOfVnodes; @@ -104,7 +104,7 @@ void dnodeWrite(SRpcMsg *pMsg) { } while (leftLen > 0) { - SWriteMsgHead *pHead = (SWriteMsgHead *) pCont; + SMsgHead *pHead = (SMsgHead *) pCont; pHead->vgId = htonl(pHead->vgId); pHead->contLen = htonl(pHead->contLen); @@ -322,7 +322,7 @@ static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) { for (int i = pTable->numOfColumns; i < totalCols; i++) { tdSchemaAppendCol(pDestTagSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes); } - tsdbTableSetSchema(&tCfg, pDestTagSchema, false); + tsdbTableSetTagSchema(&tCfg, pDestTagSchema, false); char *pTagData = pTable->data + totalCols * sizeof(SSchema); int accumBytes = 0; diff --git a/src/inc/mnode.h b/src/inc/mnode.h index dd996c7401..48aeb2dfe6 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -166,6 +166,7 @@ typedef struct _vg_obj { typedef struct _db_obj { char name[TSDB_DB_NAME_LEN + 1]; + int8_t dirty; int64_t createdTime; SDbCfg cfg; int8_t dropStatus; @@ -175,10 +176,8 @@ typedef struct _db_obj { int32_t numOfVgroups; int32_t numOfTables; int32_t numOfSuperTables; - int32_t vgStatus; - SVgObj *pHead; // empty vgroup first - SVgObj *pTail; // empty vgroup end - void * vgTimer; + SVgObj *pHead; + SVgObj *pTail; } SDbObj; struct _acctObj; diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index ca8ce136f8..e4b083bb9d 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -236,12 +236,12 @@ typedef struct { typedef struct { int32_t numOfVnodes; -} SWriteMsgDesc; +} SMsgDesc; typedef struct { int32_t contLen; int32_t vgId; -} SWriteMsgHead; +} SMsgHead; typedef struct { int32_t contLen; diff --git a/src/mnode/inc/mgmtChildTable.h b/src/mnode/inc/mgmtChildTable.h index 20f6ca9c8d..9ba5da5024 100644 --- a/src/mnode/inc/mgmtChildTable.h +++ b/src/mnode/inc/mgmtChildTable.h @@ -38,6 +38,8 @@ int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMeta *pMeta, bool usePublicIp); +void mgmtDropAllChildTables(SDbObj *pDropDb); + #ifdef __cplusplus } #endif diff --git a/src/mnode/inc/mgmtNormalTable.h b/src/mnode/inc/mgmtNormalTable.h index 6f7bd5321b..c78d9fce43 100644 --- a/src/mnode/inc/mgmtNormalTable.h +++ b/src/mnode/inc/mgmtNormalTable.h @@ -37,6 +37,8 @@ int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName); int32_t mgmtGetNormalTableMeta(SDbObj *pDb, SNormalTableObj *pTable, STableMeta *pMeta, bool usePublicIp); +void mgmtDropAllNormalTables(SDbObj *pDropDb); + #ifdef __cplusplus } #endif diff --git a/src/mnode/inc/mgmtSuperTable.h b/src/mnode/inc/mgmtSuperTable.h index 2acf34144f..4b0599b359 100644 --- a/src/mnode/inc/mgmtSuperTable.h +++ b/src/mnode/inc/mgmtSuperTable.h @@ -45,6 +45,8 @@ void * mgmtGetSuperTableVgroup(SSuperTableObj *pStable); int32_t mgmtFindSuperTableTagIndex(SSuperTableObj *pTable, const char *tagName); int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable); +void mgmtDropAllSuperTables(SDbObj *pDropDb); + #ifdef __cplusplus } #endif diff --git a/src/mnode/inc/mgmtTable.h b/src/mnode/inc/mgmtTable.h index 9cd28c379e..a6c537a49b 100644 --- a/src/mnode/inc/mgmtTable.h +++ b/src/mnode/inc/mgmtTable.h @@ -32,16 +32,9 @@ STableInfo* mgmtGetTable(char *tableId); STableInfo* mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid); int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, bool usePublicIp); -int32_t mgmtRetrieveMetricMeta(void *pConn, char **pStart, SSuperTableMetaMsg *pInfo); -int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore); -int32_t mgmtAlterTable(SDbObj *pDb, SCMAlterTableMsg *pAlter); - void mgmtAddTableIntoSuperTable(SSuperTableObj *pStable); void mgmtRemoveTableFromSuperTable(SSuperTableObj *pStable); -SMDDropTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable); -SMDDropSTableMsg *mgmtBuildRemoveSuperTableMsg(STableInfo *pTable); - #ifdef __cplusplus } #endif diff --git a/src/mnode/inc/mgmtVgroup.h b/src/mnode/inc/mgmtVgroup.h index 524cc87460..56a8fd054f 100644 --- a/src/mnode/inc/mgmtVgroup.h +++ b/src/mnode/inc/mgmtVgroup.h @@ -30,7 +30,7 @@ SVgObj *mgmtGetVgroup(int32_t vgId); SVgObj *mgmtGetVgroupByVnode(uint32_t dnode, int32_t vnode); void mgmtCreateVgroup(SQueuedMsg *pMsg); -int32_t mgmtDropVgroup(SVgObj *pVgroup); +void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle); void mgmtUpdateVgroup(SVgObj *pVgroup); void mgmtSetVgroupIdPool(); diff --git a/src/mnode/src/mgmtChildTable.c b/src/mnode/src/mgmtChildTable.c index 0c6d964ae7..1e9e55e381 100644 --- a/src/mnode/src/mgmtChildTable.c +++ b/src/mnode/src/mgmtChildTable.c @@ -123,7 +123,6 @@ void *mgmtChildTableActionDelete(void *row, char *str, int32_t size, int32_t *ss SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { - mError("id:%s not in vgroup:%d", pTable->tableId, pTable->vgId); return NULL; } @@ -462,13 +461,37 @@ int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMeta *p for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) { if (usePublicIp) { pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp; - pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); } else { pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].ip; - pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); } + pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); + pMeta->vpeerDesc[i].vgId = htonl(pVgroup->vgId); } pMeta->numOfVpeers = pVgroup->numOfVnodes; return TSDB_CODE_SUCCESS; } + +void mgmtDropAllChildTables(SDbObj *pDropDb) { + void *pNode = NULL; + void *pLastNode = NULL; + int32_t numOfTables = 0; + int32_t dbNameLen = strlen(pDropDb->name); + SChildTableObj *pTable = NULL; + + while (1) { + pNode = sdbFetchRow(tsChildTableSdb, pNode, (void **)&pTable); + if (pTable == NULL) { + break; + } + + if (strncmp(pDropDb->name, pTable->tableId, dbNameLen) == 0) { + sdbDeleteRow(tsChildTableSdb, pTable); + pNode = pLastNode; + numOfTables ++; + continue; + } + } + + mTrace("db:%s, all child tables:%d is dropped", pDropDb->name, numOfTables); +} \ No newline at end of file diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index ca441bfb2e..10a60cf927 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -27,6 +27,9 @@ #include "mgmtMnode.h" #include "mgmtGrant.h" #include "mgmtShell.h" +#include "mgmtNormalTable.h" +#include "mgmtChildTable.h" +#include "mgmtSuperTable.h" #include "mgmtTable.h" #include "mgmtUser.h" #include "mgmtVgroup.h" @@ -34,10 +37,9 @@ static void *tsDbSdb = NULL; static int32_t tsDbUpdateSize; -static int32_t mgmtUpdateDb(SDbObj *pDb); static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate); -static int32_t mgmtDropDbByName(SAcctObj *pAcct, char *name, short ignoreNotExists); -static int32_t mgmtDropDb(SDbObj *pDb); +static void mgmtDropDb(void *handle, void *tmrId); +static void mgmtSetDbDirty(SDbObj *pDb); static int32_t mgmtGetDbMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *pConn); @@ -60,7 +62,7 @@ static void mgmtDbActionInit() { mgmtDbActionFp[SDB_TYPE_UPDATE] = mgmtDbActionUpdate; mgmtDbActionFp[SDB_TYPE_ENCODE] = mgmtDbActionEncode; mgmtDbActionFp[SDB_TYPE_DECODE] = mgmtDbActionDecode; - mgmtDbActionFp[SDB_TYPE_RESET] = mgmtDbActionReset; + mgmtDbActionFp[SDB_TYPE_RESET] = mgmtDbActionReset; mgmtDbActionFp[SDB_TYPE_DESTROY] = mgmtDbActionDestroy; } @@ -98,8 +100,6 @@ int32_t mgmtInitDbs() { pDb->numOfTables = 0; pDb->numOfVgroups = 0; pDb->numOfSuperTables = 0; - pDb->vgStatus = TSDB_VG_STATUS_READY; - pDb->vgTimer = NULL; pAcct = mgmtGetAcct(pDb->cfg.acct); if (pAcct != NULL) mgmtAddDbIntoAcct(pAcct, pDb); @@ -293,135 +293,6 @@ static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) { return code; } -static int32_t mgmtUpdateDb(SDbObj *pDb) { - return sdbUpdateRow(tsDbSdb, pDb, tsDbUpdateSize, 1); -} - -static int32_t mgmtSetDbDropping(SDbObj *pDb) { - if (pDb->dropStatus == TSDB_DB_STATUS_DROP_FROM_SDB) return 0; - - SVgObj *pVgroup = pDb->pHead; - while (pVgroup != NULL) { - for (int32_t i = 0; i < pVgroup->numOfVnodes; i++) { - SVnodeGid *pVnodeGid = pVgroup->vnodeGid + i; - SDnodeObj *pDnode = mgmtGetDnode(pVnodeGid->ip); - if (pDnode == NULL) continue; - - SVnodeLoad *pVload = &pDnode->vload[pVnodeGid->vnode]; - if (pVload->dropStatus != TSDB_VN_DROP_STATUS_DROPPING) { - pVload->dropStatus = TSDB_VN_DROP_STATUS_DROPPING; - - mPrint("dnode:%s vnode:%d db:%s set to dropping status", taosIpStr(pDnode->privateIp), pVnodeGid->vnode, pDb->name); - if (mgmtUpdateDnode(pDnode) < 0) { - mError("db:%s drop failed, dnode sdb update error", pDb->name); - return TSDB_CODE_SDB_ERROR; - } - } - } - - //void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) { - // mTrace("vgroup:%d send free vgroup msg, ahandle:%p", pVgroup->vgId, ahandle); - // - // for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { - // SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip); - // mgmtSendDropVnodeMsg(pVgroup->vnodeGid[i].vnode, &ipSet, ahandle); - // } - //} - // -// mgmtSendDropVgroupMsg(pVgroup); - pVgroup = pVgroup->next; - } - - if (pDb->dropStatus == TSDB_DB_STATUS_DROPPING) return 0; - - pDb->dropStatus = TSDB_DB_STATUS_DROPPING; - if (mgmtUpdateDb(pDb) < 0) { - mError("db:%s drop failed, db sdb update error", pDb->name); - return TSDB_CODE_SDB_ERROR; - } - - mPrint("db:%s set to dropping status", pDb->name); - return 0; -} - -static bool mgmtCheckDropDbFinished(SDbObj *pDb) { - SVgObj *pVgroup = pDb->pHead; - while (pVgroup) { - for (int32_t i = 0; i < pVgroup->numOfVnodes; i++) { - SVnodeGid *pVnodeGid = pVgroup->vnodeGid + i; - SDnodeObj *pDnode = mgmtGetDnode(pVnodeGid->ip); - - if (pDnode == NULL) continue; - if (pDnode->status == TSDB_DN_STATUS_OFFLINE) continue; - - SVnodeLoad *pVload = &pDnode->vload[pVnodeGid->vnode]; - if (pVload->dropStatus == TSDB_VN_DROP_STATUS_DROPPING) { - mTrace("dnode:%s, vnode:%d db:%s wait dropping", taosIpStr(pDnode->privateIp), pVnodeGid->vnode, pDb->name); - return false; - } - } - pVgroup = pVgroup->next; - } - - mPrint("db:%s all vnodes drop finished", pDb->name); - return true; -} - -static void mgmtDropDbFromSdb(SDbObj *pDb) { - while (pDb->pHead) mgmtDropVgroup(pDb->pHead); - -// SSuperTableObj *pMetric = pDb->pSTable; -// while (pMetric) { -// SSuperTableObj *pNext = pMetric->next; -// mgmtDropTable(pDb, pMetric->tableId, 0); -// pMetric = pNext; -// } - - mPrint("db:%s all meters drop finished", pDb->name); - sdbDeleteRow(tsDbSdb, pDb); - mPrint("db:%s database drop finished", pDb->name); -} - -static int32_t mgmtDropDb(SDbObj *pDb) { - - if (pDb->dropStatus == TSDB_DB_STATUS_DROPPING) { - bool finished = mgmtCheckDropDbFinished(pDb); - if (!finished) { - SVgObj *pVgroup = pDb->pHead; - while (pVgroup != NULL) { - //mgmtSendDropVgroupMsg(pVgroup, NULL); - pVgroup = pVgroup->next; - } - return TSDB_CODE_ACTION_IN_PROGRESS; - } - - // don't sync this action - pDb->dropStatus = TSDB_DB_STATUS_DROP_FROM_SDB; - mgmtDropDbFromSdb(pDb); - return 0; - } else { - int32_t code = mgmtSetDbDropping(pDb); - if (code != 0) return code; - return TSDB_CODE_ACTION_IN_PROGRESS; - } -} - -UNUSED_FUNC -static int32_t mgmtDropDbByName(SAcctObj *pAcct, char *name, short ignoreNotExists) { - SDbObj *pDb = (SDbObj *)sdbGetRow(tsDbSdb, name); - if (pDb == NULL) { - if (ignoreNotExists) return TSDB_CODE_SUCCESS; - mWarn("db:%s is not there", name); - return TSDB_CODE_INVALID_DB; - } - - if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { - return TSDB_CODE_MONITOR_DB_FORBIDDEN; - } - - return mgmtDropDb(pDb); -} - bool mgmtCheckIsMonitorDB(char *db, char *monitordb) { char dbName[TSDB_DB_NAME_LEN + 1] = {0}; extractDBName(db, dbName); @@ -430,20 +301,6 @@ bool mgmtCheckIsMonitorDB(char *db, char *monitordb) { return (strncasecmp(dbName, monitordb, len) == 0 && len == strlen(monitordb)); } -UNUSED_FUNC -static void mgmtMonitorDbDrop(void *unused, void *unusedt) { - void * pNode = NULL; - SDbObj *pDb = NULL; - - while (1) { - pNode = sdbFetchRow(tsDbSdb, pNode, (void **)&pDb); - if (pDb == NULL) break; - if (pDb->dropStatus != TSDB_DB_STATUS_DROPPING) continue; - mgmtDropDb(pDb); - break; - } -} - static int32_t mgmtAlterDb(SAcctObj *pAcct, SCMAlterDbMsg *pAlter) { return 0; // int32_t code = TSDB_CODE_SUCCESS; @@ -840,7 +697,6 @@ void *mgmtDbActionInsert(void *row, char *str, int32_t size, int32_t *ssize) { pDb->pTail = NULL; pDb->numOfVgroups = 0; pDb->numOfTables = 0; - pDb->vgTimer = NULL; mgmtAddDbIntoAcct(pAcct, pDb); return NULL; @@ -851,6 +707,10 @@ void *mgmtDbActionDelete(void *row, char *str, int32_t size, int32_t *ssize) { SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); mgmtRemoveDbFromAcct(pAcct, pDb); + mgmtDropAllNormalTables(pDb); + mgmtDropAllChildTables(pDb); + mgmtDropAllSuperTables(pDb); + return NULL; } @@ -906,6 +766,10 @@ void mgmtRemoveTableFromDb(SDbObj *pDb) { atomic_add_fetch_32(&pDb->numOfTables, -1); } +static void mgmtSetDbDirty(SDbObj *pDb) { + pDb->dirty = true; +} + static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg) { if (mgmtCheckRedirect(pMsg->thandle)) return; @@ -919,7 +783,6 @@ static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg) { pCreate->commitTime = htonl(pCreate->commitTime); pCreate->blocksPerTable = htons(pCreate->blocksPerTable); pCreate->rowsInFileBlock = htonl(pCreate->rowsInFileBlock); - // pCreate->cacheNumOfBlocks = htonl(pCreate->cacheNumOfBlocks); int32_t code; if (mgmtCheckExpired()) { @@ -957,21 +820,76 @@ static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) { mgmtSendSimpleResp(pMsg->thandle, code); } +static void mgmtDropDb(void *handle, void *tmrId) { + SQueuedMsg *newMsg = handle; + SDbObj *pDb = newMsg->ahandle; + mPrint("db:%s, drop db from sdb", pDb->name); + + int32_t code = sdbDeleteRow(tsDbSdb, pDb); + if (code != 0) { + code = TSDB_CODE_SDB_ERROR; + } + + mgmtSendSimpleResp(newMsg->thandle, code); + rpcFreeCont(newMsg->pCont); + free(newMsg); +} + static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) { if (mgmtCheckRedirect(pMsg->thandle)) return; - int32_t code; - if (pMsg->pUser->superAuth) { - //SCMDropDbMsg *pDrop = rpcMsg->pCont; - //rpcRsp.code = mgmtDropDbByName(pUser->pAcct, pDrop->db, pDrop->ignoreNotExists); - //if (rpcRsp.code == TSDB_CODE_SUCCESS) { - // mLPrint("DB:%s is dropped by %s", pDrop->db, pUser->user); - //} - } else { - code = TSDB_CODE_NO_RIGHTS; + SCMDropDbMsg *pDrop = pMsg->pCont; + mTrace("db:%s, drop db msg is received from thandle:%p", pDrop->db, pMsg->thandle); + + if (mgmtCheckExpired()) { + mError("db:%s, failed to drop, grant expired", pDrop->db); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_GRANT_EXPIRED); + return; } - if (code != TSDB_CODE_SUCCESS) { - mgmtSendSimpleResp(pMsg->thandle, code); + if (!pMsg->pUser->writeAuth) { + mError("db:%s, failed to drop, no rights", pDrop->db); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS); + return; } + + SDbObj *pDb = mgmtGetDb(pDrop->db); + if (pDb == NULL) { + if (pDrop->ignoreNotExists) { + mTrace("db:%s, db is not exist, think drop success", pDrop->db); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS); + return; + } else { + mError("db:%s, failed to drop, invalid db", pDrop->db); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_DB); + return; + } + } + + if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { + mError("db:%s, can't drop monitor database", pDrop->db); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MONITOR_DB_FORBIDDEN); + return; + } + + mgmtSetDbDirty(pDb); + + SQueuedMsg *newMsg = malloc(sizeof(SQueuedMsg)); + memcpy(newMsg, pMsg, sizeof(SQueuedMsg)); + pMsg->pCont = NULL; + + SVgObj *pVgroup = pDb->pHead; + if (pVgroup != NULL) { + mPrint("vgroup:%d, will be dropped", pVgroup->vgId); + newMsg->ahandle = pVgroup; + newMsg->expected = pVgroup->numOfVnodes; + mgmtDropVgroup(pVgroup, newMsg); + return; + } + + mTrace("db:%s, all vgroups is dropped", pDb->name); + + void *tmpTmr; + newMsg->ahandle = pDb; + taosTmrReset(mgmtDropDb, 10, newMsg, tsMgmtTmr, &tmpTmr); } diff --git a/src/mnode/src/mgmtNormalTable.c b/src/mnode/src/mgmtNormalTable.c index 4ff99308a0..799d123ac3 100644 --- a/src/mnode/src/mgmtNormalTable.c +++ b/src/mnode/src/mgmtNormalTable.c @@ -125,7 +125,6 @@ void *mgmtNormalTableActionDelete(void *row, char *str, int32_t size, int32_t *s SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { - mError("id:%s not in vgroup:%d", pTable->tableId, pTable->vgId); return NULL; } @@ -540,14 +539,35 @@ int32_t mgmtGetNormalTableMeta(SDbObj *pDb, SNormalTableObj *pTable, STableMeta for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) { if (usePublicIp) { pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp; - pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); } else { pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].ip; - pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); } + pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); + pMeta->vpeerDesc[i].vgId = htonl(pVgroup->vgId); } pMeta->numOfVpeers = pVgroup->numOfVnodes; return TSDB_CODE_SUCCESS; } +void mgmtDropAllNormalTables(SDbObj *pDropDb) { + void *pNode = NULL; + void *pLastNode = NULL; + int32_t numOfTables = 0; + int32_t dbNameLen = strlen(pDropDb->name); + SNormalTableObj *pTable = NULL; + + while (1) { + pNode = sdbFetchRow(tsNormalTableSdb, pNode, (void **)&pTable); + if (pTable == NULL) break; + + if (strncmp(pDropDb->name, pTable->tableId, dbNameLen) == 0) { + sdbDeleteRow(tsNormalTableSdb, pTable); + pNode = pLastNode; + numOfTables ++; + continue; + } + } + + mTrace("db:%s, all normal tables:%d is dropped", pDropDb->name, numOfTables); +} diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index 4970d8794a..223e67ebfb 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -242,6 +242,8 @@ static void mgmtProcessRetrieveMsg(SQueuedMsg *pMsg) { } SShowObj *pShow = (SShowObj *)pRetrieve->qhandle; + mTrace("show:%p, type:%s, retrieve data", pShow, taosGetShowTypeStr(pShow->type)); + if (!mgmtCheckQhandle(pRetrieve->qhandle)) { mError("pShow:%p, query memory is corrupted", pShow); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MEMORY_CORRUPTED); diff --git a/src/mnode/src/mgmtSuperTable.c b/src/mnode/src/mgmtSuperTable.c index 02cb466d05..a4539f7b9f 100644 --- a/src/mnode/src/mgmtSuperTable.c +++ b/src/mnode/src/mgmtSuperTable.c @@ -612,6 +612,30 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v return numOfRows; } +void mgmtDropAllSuperTables(SDbObj *pDropDb) { + void *pNode = NULL; + void *pLastNode = NULL; + int32_t numOfTables = 0; + int32_t dbNameLen = strlen(pDropDb->name); + SSuperTableObj *pTable = NULL; + + while (1) { + pNode = sdbFetchRow(tsSuperTableSdb, pNode, (void **)&pTable); + if (pTable == NULL) { + break; + } + + if (strncmp(pDropDb->name, pTable->tableId, dbNameLen) == 0) { + sdbDeleteRow(tsSuperTableSdb, pTable); + pNode = pLastNode; + numOfTables ++; + continue; + } + } + + mTrace("db:%s, all super tables:%d is dropped", pDropDb->name, numOfTables); +} + void mgmtAddTableIntoSuperTable(SSuperTableObj *pStable) { pStable->numOfTables++; } diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index d77d9cbb50..abf1d9162c 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -346,13 +346,6 @@ int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void * return numOfRows; } -SMDDropTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable) { - SMDDropTableMsg *pRemove = NULL; - - - return pRemove; -} - void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { if (mgmtCheckRedirect(pMsg->thandle)) return; @@ -439,7 +432,7 @@ void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { mgmtSendSimpleResp(pMsg->thandle, terrno); return; } - pMDCreate = mgmtBuildCreateChildTableMsg(pCreate, pTable); + pMDCreate = mgmtBuildCreateChildTableMsg(pCreate, (SChildTableObj *) pTable); if (pMDCreate == NULL) { mgmtSendSimpleResp(pMsg->thandle, terrno); return; @@ -451,7 +444,7 @@ void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { mgmtSendSimpleResp(pMsg->thandle, terrno); return; } - pMDCreate = mgmtBuildCreateNormalTableMsg(pTable); + pMDCreate = mgmtBuildCreateNormalTableMsg((SNormalTableObj *) pTable); if (pMDCreate == NULL) { mgmtSendSimpleResp(pMsg->thandle, terrno); return; @@ -475,7 +468,7 @@ void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) { if (mgmtCheckRedirect(pMsg->thandle)) return; SCMDropTableMsg *pDrop = pMsg->pCont; - mTrace("table:%s, drop msg is received from thandle:%p", pDrop->tableId, pMsg->thandle); + mTrace("table:%s, drop table msg is received from thandle:%p", pDrop->tableId, pMsg->thandle); if (mgmtCheckExpired()) { mError("table:%s, failed to drop, grant expired", pDrop->tableId); @@ -832,7 +825,7 @@ static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) { if (pVgroup->numOfTables <= 0) { mPrint("vgroup:%d, all tables is dropped, drop vgroup", pVgroup->vgId); - mgmtDropVgroup(pVgroup); + mgmtDropVgroup(pVgroup, NULL); } mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SUCCESS); diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index e700fcd877..c2f3e19951 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -138,18 +138,6 @@ SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb) { return pDb->pHead; } -void mgmtProcessVgTimer(void *handle, void *tmrId) { - SDbObj *pDb = (SDbObj *)handle; - if (pDb == NULL) return; - - if (pDb->vgStatus > TSDB_VG_STATUS_IN_PROGRESS) { - mTrace("db:%s, set vgroup status from %d to ready", pDb->name, pDb->vgStatus); - pDb->vgStatus = TSDB_VG_STATUS_READY; - } - - pDb->vgTimer = NULL; -} - void mgmtCreateVgroup(SQueuedMsg *pMsg) { SDbObj *pDb = pMsg->pDb; if (pDb == NULL) { @@ -188,22 +176,14 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg) { mgmtSendCreateVgroupMsg(pVgroup, pMsg); } -int32_t mgmtDropVgroup(SVgObj *pVgroup) { -// STableInfo *pTable; - - if (pVgroup->numOfTables > 0) { -// for (int32_t i = 0; i < pDb->cfg.maxSessions; ++i) { -// if (pVgroup->tableList != NULL) { -// pTable = pVgroup->tableList[i]; -// if (pTable) mgmtDropTable(pDb, pTable->tableId, 0); -// } -// } +void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle) { + if (ahandle != NULL) { + mgmtSendDropVgroupMsg(pVgroup, ahandle); + } else { + mTrace("vgroup:%d, replica:%d is deleting from sdb", pVgroup->vgId, pVgroup->numOfVnodes); + mgmtSendDropVgroupMsg(pVgroup, NULL); + sdbDeleteRow(tsVgroupSdb, pVgroup); } - - mTrace("vgroup:%d, replica:%d is deleting from sdb", pVgroup->vgId, pVgroup->numOfVnodes); - mgmtSendDropVgroupMsg(pVgroup, NULL); - sdbDeleteRow(tsVgroupSdb, pVgroup); - return TSDB_CODE_SUCCESS; } void mgmtSetVgroupIdPool() { @@ -666,4 +646,33 @@ static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) { static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) { mTrace("drop vnode msg is received"); + if (rpcMsg->handle == NULL) return; + + SQueuedMsg *queueMsg = rpcMsg->handle; + queueMsg->received++; + if (rpcMsg->code == TSDB_CODE_SUCCESS) { + queueMsg->code = rpcMsg->code; + queueMsg->successed++; + } + + SVgObj *pVgroup = queueMsg->ahandle; + mTrace("vgroup:%d, drop vnode rsp received, result:%s received:%d successed:%d expected:%d, thandle:%p ahandle:%p", + pVgroup->vgId, tstrerror(rpcMsg->code), queueMsg->received, queueMsg->successed, queueMsg->expected, + queueMsg->thandle, rpcMsg->handle); + + if (queueMsg->received != queueMsg->expected) return; + + sdbDeleteRow(tsVgroupSdb, pVgroup); + + SQueuedMsg *newMsg = calloc(1, sizeof(SQueuedMsg)); + newMsg->msgType = queueMsg->msgType; + newMsg->thandle = queueMsg->thandle; + newMsg->pDb = queueMsg->pDb; + newMsg->pUser = queueMsg->pUser; + newMsg->contLen = queueMsg->contLen; + newMsg->pCont = rpcMallocCont(newMsg->contLen); + memcpy(newMsg->pCont, queueMsg->pCont, newMsg->contLen); + mgmtAddToShellQueue(newMsg); + + free(queueMsg); } \ No newline at end of file