[TD-147] optimize refcount while drop db
This commit is contained in:
parent
9ffcbcc279
commit
37b20dc766
|
@ -32,6 +32,7 @@ int32_t mgmtInitDbs();
|
||||||
void mgmtCleanUpDbs();
|
void mgmtCleanUpDbs();
|
||||||
SDbObj *mgmtGetDb(char *db);
|
SDbObj *mgmtGetDb(char *db);
|
||||||
SDbObj *mgmtGetDbByTableId(char *db);
|
SDbObj *mgmtGetDbByTableId(char *db);
|
||||||
|
void * mgmtGetNextDb(void *pNode, SDbObj **pDb);
|
||||||
void mgmtIncDbRef(SDbObj *pDb);
|
void mgmtIncDbRef(SDbObj *pDb);
|
||||||
void mgmtDecDbRef(SDbObj *pDb);
|
void mgmtDecDbRef(SDbObj *pDb);
|
||||||
bool mgmtCheckIsMonitorDB(char *db, char *monitordb);
|
bool mgmtCheckIsMonitorDB(char *db, char *monitordb);
|
||||||
|
|
|
@ -237,7 +237,6 @@ typedef struct {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
uint8_t msgType;
|
uint8_t msgType;
|
||||||
int8_t usePublicIp;
|
|
||||||
int8_t received;
|
int8_t received;
|
||||||
int8_t successed;
|
int8_t successed;
|
||||||
int8_t expected;
|
int8_t expected;
|
||||||
|
|
|
@ -33,16 +33,19 @@ void mgmtCleanupMnodes();
|
||||||
|
|
||||||
int32_t mgmtAddMnode(int32_t dnodeId);
|
int32_t mgmtAddMnode(int32_t dnodeId);
|
||||||
int32_t mgmtDropMnode(int32_t dnodeId);
|
int32_t mgmtDropMnode(int32_t dnodeId);
|
||||||
|
void mgmtDropMnodeLocal(int32_t dnodeId);
|
||||||
|
|
||||||
void * mgmtGetMnode(int32_t mnodeId);
|
void * mgmtGetMnode(int32_t mnodeId);
|
||||||
int32_t mgmtGetMnodesNum();
|
int32_t mgmtGetMnodesNum();
|
||||||
void * mgmtGetNextMnode(void *pNode, struct SMnodeObj **pMnode);
|
void * mgmtGetNextMnode(void *pNode, struct SMnodeObj **pMnode);
|
||||||
void mgmtReleaseMnode(struct SMnodeObj *pMnode);
|
void mgmtIncMnodeRef(struct SMnodeObj *pMnode);
|
||||||
|
void mgmtDecMnodeRef(struct SMnodeObj *pMnode);
|
||||||
|
|
||||||
char * mgmtGetMnodeRoleStr();
|
char * mgmtGetMnodeRoleStr();
|
||||||
void mgmtGetMnodeIpSet(SRpcIpSet *ipSet);
|
void mgmtGetMnodeIpSet(SRpcIpSet *ipSet);
|
||||||
void mgmtGetMnodeInfos(void *mnodes);
|
void mgmtGetMnodeInfos(void *mnodes);
|
||||||
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -22,13 +22,15 @@ extern "C" {
|
||||||
|
|
||||||
#include "mgmtDef.h"
|
#include "mgmtDef.h"
|
||||||
|
|
||||||
int32_t mgmtInitTables();
|
int32_t mgmtInitTables();
|
||||||
void mgmtCleanUpTables();
|
void mgmtCleanUpTables();
|
||||||
STableObj* mgmtGetTable(char* tableId);
|
void * mgmtGetTable(char *tableId);
|
||||||
void mgmtIncTableRef(void *pTable);
|
void mgmtIncTableRef(void *pTable);
|
||||||
void mgmtDecTableRef(void *pTable);
|
void mgmtDecTableRef(void *pTable);
|
||||||
void mgmtDropAllChildTables(SDbObj *pDropDb);
|
void * mgmtGetNextChildTable(void *pNode, SChildTableObj **pTable);
|
||||||
void mgmtDropAllSuperTables(SDbObj *pDropDb);
|
void * mgmtGetNextSuperTable(void *pNode, SSuperTableObj **pTable);
|
||||||
|
void mgmtDropAllChildTables(SDbObj *pDropDb);
|
||||||
|
void mgmtDropAllSuperTables(SDbObj *pDropDb);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,7 +27,7 @@ SUserObj *mgmtGetUser(char *name);
|
||||||
void * mgmtGetNextUser(void *pNode, SUserObj **pUser);
|
void * mgmtGetNextUser(void *pNode, SUserObj **pUser);
|
||||||
void mgmtIncUserRef(SUserObj *pUser);
|
void mgmtIncUserRef(SUserObj *pUser);
|
||||||
void mgmtDecUserRef(SUserObj *pUser);
|
void mgmtDecUserRef(SUserObj *pUser);
|
||||||
SUserObj *mgmtGetUserFromConn(void *pConn, bool *usePublicIp);
|
SUserObj *mgmtGetUserFromConn(void *pConn);
|
||||||
int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass);
|
int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass);
|
||||||
void mgmtDropAllUsers(SAcctObj *pAcct);
|
void mgmtDropAllUsers(SAcctObj *pAcct);
|
||||||
|
|
||||||
|
|
|
@ -32,7 +32,8 @@ void mgmtCleanUpVgroups();
|
||||||
SVgObj *mgmtGetVgroup(int32_t vgId);
|
SVgObj *mgmtGetVgroup(int32_t vgId);
|
||||||
void mgmtIncVgroupRef(SVgObj *pVgroup);
|
void mgmtIncVgroupRef(SVgObj *pVgroup);
|
||||||
void mgmtDecVgroupRef(SVgObj *pVgroup);
|
void mgmtDecVgroupRef(SVgObj *pVgroup);
|
||||||
void mgmtDropAllVgroups(SDbObj *pDropDb);
|
void mgmtDropAllDbVgroups(SDbObj *pDropDb);
|
||||||
|
void mgmtDropAllDnodeVgroups(SDnodeObj *pDropDnode);
|
||||||
|
|
||||||
void * mgmtGetNextVgroup(void *pNode, SVgObj **pVgroup);
|
void * mgmtGetNextVgroup(void *pNode, SVgObj **pVgroup);
|
||||||
void mgmtUpdateVgroup(SVgObj *pVgroup);
|
void mgmtUpdateVgroup(SVgObj *pVgroup);
|
||||||
|
|
|
@ -27,8 +27,8 @@
|
||||||
#include "mgmtUser.h"
|
#include "mgmtUser.h"
|
||||||
|
|
||||||
void * tsAcctSdb = NULL;
|
void * tsAcctSdb = NULL;
|
||||||
int32_t tsAcctUpdateSize;
|
static int32_t tsAcctUpdateSize;
|
||||||
static void mgmtCreateRootAcct();
|
static void mgmtCreateRootAcct();
|
||||||
|
|
||||||
static int32_t mgmtActionAcctDestroy(SSdbOper *pOper) {
|
static int32_t mgmtActionAcctDestroy(SSdbOper *pOper) {
|
||||||
SAcctObj *pAcct = pOper->pObj;
|
SAcctObj *pAcct = pOper->pObj;
|
||||||
|
|
|
@ -36,7 +36,7 @@
|
||||||
#include "mgmtUser.h"
|
#include "mgmtUser.h"
|
||||||
#include "mgmtVgroup.h"
|
#include "mgmtVgroup.h"
|
||||||
|
|
||||||
void * tsDbSdb = NULL;
|
static void * tsDbSdb = NULL;
|
||||||
static int32_t tsDbUpdateSize;
|
static int32_t tsDbUpdateSize;
|
||||||
|
|
||||||
static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate);
|
static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate);
|
||||||
|
@ -82,7 +82,7 @@ static int32_t mgmtDbActionDelete(SSdbOper *pOper) {
|
||||||
mgmtDropDbFromAcct(pAcct, pDb);
|
mgmtDropDbFromAcct(pAcct, pDb);
|
||||||
mgmtDropAllChildTables(pDb);
|
mgmtDropAllChildTables(pDb);
|
||||||
mgmtDropAllSuperTables(pDb);
|
mgmtDropAllSuperTables(pDb);
|
||||||
mgmtDropAllVgroups(pDb);
|
mgmtDropAllDbVgroups(pDb);
|
||||||
mgmtDecAcctRef(pAcct);
|
mgmtDecAcctRef(pAcct);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -95,6 +95,7 @@ static int32_t mgmtDbActionUpdate(SSdbOper *pOper) {
|
||||||
memcpy(pSaved, pDb, pOper->rowSize);
|
memcpy(pSaved, pDb, pOper->rowSize);
|
||||||
free(pDb);
|
free(pDb);
|
||||||
}
|
}
|
||||||
|
mgmtDecDbRef(pSaved);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -154,6 +155,10 @@ int32_t mgmtInitDbs() {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void *mgmtGetNextDb(void *pNode, SDbObj **pDb) {
|
||||||
|
return sdbFetchRow(tsDbSdb, pNode, (void **)pDb);
|
||||||
|
}
|
||||||
|
|
||||||
SDbObj *mgmtGetDb(char *db) {
|
SDbObj *mgmtGetDb(char *db) {
|
||||||
return (SDbObj *)sdbGetRow(tsDbSdb, db);
|
return (SDbObj *)sdbGetRow(tsDbSdb, db);
|
||||||
}
|
}
|
||||||
|
@ -174,7 +179,7 @@ SDbObj *mgmtGetDbByTableId(char *tableId) {
|
||||||
memset(db, 0, sizeof(db));
|
memset(db, 0, sizeof(db));
|
||||||
strncpy(db, tableId, pos - tableId);
|
strncpy(db, tableId, pos - tableId);
|
||||||
|
|
||||||
return (SDbObj *)sdbGetRow(tsDbSdb, db);
|
return mgmtGetDb(db);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mgmtCheckDbCfg(SDbCfg *pCfg) {
|
static int32_t mgmtCheckDbCfg(SDbCfg *pCfg) {
|
||||||
|
@ -397,7 +402,7 @@ static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn)
|
||||||
int32_t cols = 0;
|
int32_t cols = 0;
|
||||||
|
|
||||||
SSchema *pSchema = pMeta->schema;
|
SSchema *pSchema = pMeta->schema;
|
||||||
SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL);
|
SUserObj *pUser = mgmtGetUserFromConn(pConn);
|
||||||
if (pUser == NULL) return 0;
|
if (pUser == NULL) return 0;
|
||||||
|
|
||||||
pShow->bytes[cols] = TSDB_DB_NAME_LEN;
|
pShow->bytes[cols] = TSDB_DB_NAME_LEN;
|
||||||
|
@ -545,11 +550,11 @@ static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *
|
||||||
SDbObj *pDb = NULL;
|
SDbObj *pDb = NULL;
|
||||||
char * pWrite;
|
char * pWrite;
|
||||||
int32_t cols = 0;
|
int32_t cols = 0;
|
||||||
SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL);
|
SUserObj *pUser = mgmtGetUserFromConn(pConn);
|
||||||
if (pUser == NULL) return 0;
|
if (pUser == NULL) return 0;
|
||||||
|
|
||||||
while (numOfRows < rows) {
|
while (numOfRows < rows) {
|
||||||
pShow->pNode = sdbFetchRow(tsDbSdb, pShow->pNode, (void **) &pDb);
|
pShow->pNode = mgmtGetNextDb(pShow->pNode, &pDb);
|
||||||
if (pDb == NULL) break;
|
if (pDb == NULL) break;
|
||||||
|
|
||||||
cols = 0;
|
cols = 0;
|
||||||
|
@ -674,8 +679,7 @@ static int32_t mgmtSetDbDropping(SDbObj *pDb) {
|
||||||
SSdbOper oper = {
|
SSdbOper oper = {
|
||||||
.type = SDB_OPER_GLOBAL,
|
.type = SDB_OPER_GLOBAL,
|
||||||
.table = tsDbSdb,
|
.table = tsDbSdb,
|
||||||
.pObj = pDb,
|
.pObj = pDb
|
||||||
.rowSize = tsDbUpdateSize
|
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t code = sdbUpdateRow(&oper);
|
int32_t code = sdbUpdateRow(&oper);
|
||||||
|
@ -803,8 +807,7 @@ static int32_t mgmtAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
|
||||||
SSdbOper oper = {
|
SSdbOper oper = {
|
||||||
.type = SDB_OPER_GLOBAL,
|
.type = SDB_OPER_GLOBAL,
|
||||||
.table = tsDbSdb,
|
.table = tsDbSdb,
|
||||||
.pObj = pDb,
|
.pObj = pDb
|
||||||
.rowSize = tsDbUpdateSize
|
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t code = sdbUpdateRow(&oper);
|
int32_t code = sdbUpdateRow(&oper);
|
||||||
|
@ -839,21 +842,21 @@ static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
SDbObj *pDb = pMsg->pDb = mgmtGetDb(pAlter->db);
|
if (pMsg->pDb == NULL) pMsg->pDb = mgmtGetDb(pAlter->db);
|
||||||
if (pDb == NULL) {
|
if (pMsg->pDb == NULL) {
|
||||||
mError("db:%s, failed to alter, invalid db", pAlter->db);
|
mError("db:%s, failed to alter, invalid db", pAlter->db);
|
||||||
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_DB);
|
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_DB);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = mgmtAlterDb(pDb, pAlter);
|
int32_t code = mgmtAlterDb(pMsg->pDb, pAlter);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
mError("db:%s, failed to alter, invalid db option", pAlter->db);
|
mError("db:%s, failed to alter, invalid db option", pAlter->db);
|
||||||
mgmtSendSimpleResp(pMsg->thandle, code);
|
mgmtSendSimpleResp(pMsg->thandle, code);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
mTrace("db:%s, all vgroups is altered", pDb->name);
|
mTrace("db:%s, all vgroups is altered", pMsg->pDb->name);
|
||||||
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS);
|
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -884,8 +887,8 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
SDbObj *pDb = pMsg->pDb = mgmtGetDb(pDrop->db);
|
if (pMsg->pDb == NULL) pMsg->pDb = mgmtGetDb(pDrop->db);
|
||||||
if (pDb == NULL) {
|
if (pMsg->pDb == NULL) {
|
||||||
if (pDrop->ignoreNotExists) {
|
if (pDrop->ignoreNotExists) {
|
||||||
mTrace("db:%s, db is not exist, think drop success", pDrop->db);
|
mTrace("db:%s, db is not exist, think drop success", pDrop->db);
|
||||||
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS);
|
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS);
|
||||||
|
@ -897,20 +900,20 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) {
|
if (mgmtCheckIsMonitorDB(pMsg->pDb->name, tsMonitorDbName)) {
|
||||||
mError("db:%s, can't drop monitor database", pDrop->db);
|
mError("db:%s, can't drop monitor database", pDrop->db);
|
||||||
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MONITOR_DB_FORBIDDEN);
|
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MONITOR_DB_FORBIDDEN);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = mgmtSetDbDropping(pDb);
|
int32_t code = mgmtSetDbDropping(pMsg->pDb);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
mError("db:%s, failed to drop, reason:%s", pDrop->db, tstrerror(code));
|
mError("db:%s, failed to drop, reason:%s", pDrop->db, tstrerror(code));
|
||||||
mgmtSendSimpleResp(pMsg->thandle, code);
|
mgmtSendSimpleResp(pMsg->thandle, code);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
SVgObj *pVgroup = pDb->pHead;
|
SVgObj *pVgroup = pMsg->pDb->pHead;
|
||||||
if (pVgroup != NULL) {
|
if (pVgroup != NULL) {
|
||||||
mPrint("vgroup:%d, will be dropped", pVgroup->vgId);
|
mPrint("vgroup:%d, will be dropped", pVgroup->vgId);
|
||||||
SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg);
|
SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg);
|
||||||
|
@ -920,7 +923,7 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
mTrace("db:%s, all vgroups is dropped", pDb->name);
|
mTrace("db:%s, all vgroups is dropped", pMsg->pDb->name);
|
||||||
mgmtDropDb(pMsg);
|
mgmtDropDb(pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -932,7 +935,7 @@ void mgmtDropAllDbs(SAcctObj *pAcct) {
|
||||||
mPrint("acct:%s, all dbs will be dropped from sdb", pAcct->user);
|
mPrint("acct:%s, all dbs will be dropped from sdb", pAcct->user);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
pNode = sdbFetchRow(tsDbSdb, pNode, (void **)&pDb);
|
pNode = mgmtGetNextDb(pNode, &pDb);
|
||||||
if (pDb == NULL) break;
|
if (pDb == NULL) break;
|
||||||
|
|
||||||
if (pDb->pAcct == pAcct) {
|
if (pDb->pAcct == pAcct) {
|
||||||
|
|
|
@ -36,9 +36,9 @@
|
||||||
#include "mgmtUser.h"
|
#include "mgmtUser.h"
|
||||||
#include "mgmtVgroup.h"
|
#include "mgmtVgroup.h"
|
||||||
|
|
||||||
void *tsDnodeSdb = NULL;
|
|
||||||
int32_t tsDnodeUpdateSize = 0;
|
|
||||||
int32_t tsAccessSquence = 0;
|
int32_t tsAccessSquence = 0;
|
||||||
|
static void *tsDnodeSdb = NULL;
|
||||||
|
static int32_t tsDnodeUpdateSize = 0;
|
||||||
extern void * tsMnodeSdb;
|
extern void * tsMnodeSdb;
|
||||||
extern void * tsVgroupSdb;
|
extern void * tsVgroupSdb;
|
||||||
|
|
||||||
|
@ -73,39 +73,12 @@ static int32_t mgmtDnodeActionInsert(SSdbOper *pOper) {
|
||||||
|
|
||||||
static int32_t mgmtDnodeActionDelete(SSdbOper *pOper) {
|
static int32_t mgmtDnodeActionDelete(SSdbOper *pOper) {
|
||||||
SDnodeObj *pDnode = pOper->pObj;
|
SDnodeObj *pDnode = pOper->pObj;
|
||||||
void * pNode = NULL;
|
|
||||||
void * pLastNode = NULL;
|
mgmtDropAllDnodeVgroups(pDnode);
|
||||||
SVgObj * pVgroup = NULL;
|
mgmtDropMnodeLocal(pDnode->dnodeId);
|
||||||
int32_t numOfVgroups = 0;
|
|
||||||
|
|
||||||
while (1) {
|
|
||||||
pLastNode = pNode;
|
|
||||||
pNode = sdbFetchRow(tsVgroupSdb, pNode, (void **)&pVgroup);
|
|
||||||
if (pVgroup == NULL) break;
|
|
||||||
|
|
||||||
if (pVgroup->vnodeGid[0].dnodeId == pDnode->dnodeId) {
|
|
||||||
SSdbOper oper = {
|
|
||||||
.type = SDB_OPER_LOCAL,
|
|
||||||
.table = tsVgroupSdb,
|
|
||||||
.pObj = pVgroup,
|
|
||||||
};
|
|
||||||
sdbDeleteRow(&oper);
|
|
||||||
pNode = pLastNode;
|
|
||||||
numOfVgroups++;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
SMnodeObj *pMnode = mgmtGetMnode(pDnode->dnodeId);
|
|
||||||
if (pMnode != NULL) {
|
|
||||||
SSdbOper oper = {.type = SDB_OPER_LOCAL, .table = tsMnodeSdb, .pObj = pMnode};
|
|
||||||
sdbDeleteRow(&oper);
|
|
||||||
mgmtReleaseMnode(pMnode);
|
|
||||||
}
|
|
||||||
|
|
||||||
balanceNotify();
|
balanceNotify();
|
||||||
|
|
||||||
mTrace("dnode:%d, all vgroups:%d is dropped from sdb", pDnode->dnodeId, numOfVgroups);
|
mTrace("dnode:%d, all vgroups is dropped from sdb", pDnode->dnodeId);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -116,6 +89,7 @@ static int32_t mgmtDnodeActionUpdate(SSdbOper *pOper) {
|
||||||
memcpy(pSaved, pDnode, pOper->rowSize);
|
memcpy(pSaved, pDnode, pOper->rowSize);
|
||||||
free(pDnode);
|
free(pDnode);
|
||||||
}
|
}
|
||||||
|
mgmtDecDnodeRef(pSaved);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -212,7 +186,7 @@ void *mgmtGetDnodeByIp(char *ep) {
|
||||||
void * pNode = NULL;
|
void * pNode = NULL;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
pNode = sdbFetchRow(tsDnodeSdb, pNode, (void**)&pDnode);
|
pNode = mgmtGetNextDnode(pNode, &pDnode);
|
||||||
if (pDnode == NULL) break;
|
if (pDnode == NULL) break;
|
||||||
if (strcmp(ep, pDnode->dnodeEp) == 0) {
|
if (strcmp(ep, pDnode->dnodeEp) == 0) {
|
||||||
return pDnode;
|
return pDnode;
|
||||||
|
@ -235,8 +209,7 @@ void mgmtUpdateDnode(SDnodeObj *pDnode) {
|
||||||
SSdbOper oper = {
|
SSdbOper oper = {
|
||||||
.type = SDB_OPER_GLOBAL,
|
.type = SDB_OPER_GLOBAL,
|
||||||
.table = tsDnodeSdb,
|
.table = tsDnodeSdb,
|
||||||
.pObj = pDnode,
|
.pObj = pDnode
|
||||||
.rowSize = tsDnodeUpdateSize
|
|
||||||
};
|
};
|
||||||
|
|
||||||
sdbUpdateRow(&oper);
|
sdbUpdateRow(&oper);
|
||||||
|
@ -387,6 +360,7 @@ static int32_t mgmtCreateDnode(char *ep) {
|
||||||
|
|
||||||
SDnodeObj *pDnode = mgmtGetDnodeByIp(ep);
|
SDnodeObj *pDnode = mgmtGetDnodeByIp(ep);
|
||||||
if (pDnode != NULL) {
|
if (pDnode != NULL) {
|
||||||
|
mgmtDecDnodeRef(pDnode);
|
||||||
mError("dnode:%d is alredy exist, %s:%d", pDnode->dnodeId, pDnode->dnodeFqdn, pDnode->dnodePort);
|
mError("dnode:%d is alredy exist, %s:%d", pDnode->dnodeId, pDnode->dnodeFqdn, pDnode->dnodePort);
|
||||||
return TSDB_CODE_DNODE_ALREADY_EXIST;
|
return TSDB_CODE_DNODE_ALREADY_EXIST;
|
||||||
}
|
}
|
||||||
|
@ -440,6 +414,7 @@ static int32_t mgmtDropDnodeByIp(char *ep) {
|
||||||
return TSDB_CODE_DNODE_NOT_EXIST;
|
return TSDB_CODE_DNODE_NOT_EXIST;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mgmtDecDnodeRef(pDnode);
|
||||||
if (strcmp(pDnode->dnodeEp, dnodeGetMnodeMasterEp()) == 0) {
|
if (strcmp(pDnode->dnodeEp, dnodeGetMnodeMasterEp()) == 0) {
|
||||||
mError("dnode:%d, can't drop dnode:%s which is master", pDnode->dnodeId, ep);
|
mError("dnode:%d, can't drop dnode:%s which is master", pDnode->dnodeId, ep);
|
||||||
return TSDB_CODE_NO_REMOVE_MASTER;
|
return TSDB_CODE_NO_REMOVE_MASTER;
|
||||||
|
@ -464,6 +439,7 @@ static void mgmtProcessCreateDnodeMsg(SQueuedMsg *pMsg) {
|
||||||
if (rpcRsp.code == TSDB_CODE_SUCCESS) {
|
if (rpcRsp.code == TSDB_CODE_SUCCESS) {
|
||||||
SDnodeObj *pDnode = mgmtGetDnodeByIp(pCreate->ep);
|
SDnodeObj *pDnode = mgmtGetDnodeByIp(pCreate->ep);
|
||||||
mLPrint("dnode:%d, %s is created by %s", pDnode->dnodeId, pCreate->ep, pMsg->pUser->user);
|
mLPrint("dnode:%d, %s is created by %s", pDnode->dnodeId, pCreate->ep, pMsg->pUser->user);
|
||||||
|
mgmtDecDnodeRef(pDnode);
|
||||||
} else {
|
} else {
|
||||||
mError("failed to create dnode:%s, reason:%s", pCreate->ep, tstrerror(rpcRsp.code));
|
mError("failed to create dnode:%s, reason:%s", pCreate->ep, tstrerror(rpcRsp.code));
|
||||||
}
|
}
|
||||||
|
@ -492,7 +468,7 @@ static void mgmtProcessDropDnodeMsg(SQueuedMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mgmtGetDnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
|
static int32_t mgmtGetDnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
|
||||||
SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL);
|
SUserObj *pUser = mgmtGetUserFromConn(pConn);
|
||||||
if (pUser == NULL) return 0;
|
if (pUser == NULL) return 0;
|
||||||
|
|
||||||
if (strcmp(pUser->pAcct->user, "root") != 0) {
|
if (strcmp(pUser->pAcct->user, "root") != 0) {
|
||||||
|
@ -609,7 +585,7 @@ static bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int32_t moduleType) {
|
||||||
static int32_t mgmtGetModuleMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
|
static int32_t mgmtGetModuleMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
|
||||||
int32_t cols = 0;
|
int32_t cols = 0;
|
||||||
|
|
||||||
SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL);
|
SUserObj *pUser = mgmtGetUserFromConn(pConn);
|
||||||
if (pUser == NULL) return 0;
|
if (pUser == NULL) return 0;
|
||||||
|
|
||||||
if (strcmp(pUser->user, "root") != 0) {
|
if (strcmp(pUser->user, "root") != 0) {
|
||||||
|
@ -719,7 +695,7 @@ static bool mgmtCheckConfigShow(SGlobalCfg *cfg) {
|
||||||
static int32_t mgmtGetConfigMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
|
static int32_t mgmtGetConfigMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
|
||||||
int32_t cols = 0;
|
int32_t cols = 0;
|
||||||
|
|
||||||
SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL);
|
SUserObj *pUser = mgmtGetUserFromConn(pConn);
|
||||||
if (pUser == NULL) return 0;
|
if (pUser == NULL) return 0;
|
||||||
|
|
||||||
if (strcmp(pUser->user, "root") != 0) {
|
if (strcmp(pUser->user, "root") != 0) {
|
||||||
|
@ -806,7 +782,7 @@ static int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, vo
|
||||||
|
|
||||||
static int32_t mgmtGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
|
static int32_t mgmtGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
|
||||||
int32_t cols = 0;
|
int32_t cols = 0;
|
||||||
SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL);
|
SUserObj *pUser = mgmtGetUserFromConn(pConn);
|
||||||
if (pUser == NULL) return 0;
|
if (pUser == NULL) return 0;
|
||||||
|
|
||||||
if (strcmp(pUser->user, "root") != 0) {
|
if (strcmp(pUser->user, "root") != 0) {
|
||||||
|
|
|
@ -30,7 +30,7 @@
|
||||||
#include "mgmtShell.h"
|
#include "mgmtShell.h"
|
||||||
#include "mgmtUser.h"
|
#include "mgmtUser.h"
|
||||||
|
|
||||||
void * tsMnodeSdb = NULL;
|
static void * tsMnodeSdb = NULL;
|
||||||
static int32_t tsMnodeUpdateSize = 0;
|
static int32_t tsMnodeUpdateSize = 0;
|
||||||
static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
|
static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
|
||||||
static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn);
|
static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn);
|
||||||
|
@ -71,7 +71,7 @@ static int32_t mgmtMnodeActionUpdate(SSdbOper *pOper) {
|
||||||
memcpy(pSaved, pMnode, pOper->rowSize);
|
memcpy(pSaved, pMnode, pOper->rowSize);
|
||||||
free(pMnode);
|
free(pMnode);
|
||||||
}
|
}
|
||||||
|
mgmtDecMnodeRef(pSaved);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -97,7 +97,7 @@ static int32_t mgmtMnodeActionRestored() {
|
||||||
mgmtGetNextMnode(NULL, &pMnode);
|
mgmtGetNextMnode(NULL, &pMnode);
|
||||||
if (pMnode != NULL) {
|
if (pMnode != NULL) {
|
||||||
pMnode->role = TAOS_SYNC_ROLE_MASTER;
|
pMnode->role = TAOS_SYNC_ROLE_MASTER;
|
||||||
mgmtReleaseMnode(pMnode);
|
mgmtDecMnodeRef(pMnode);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -148,7 +148,11 @@ void *mgmtGetMnode(int32_t mnodeId) {
|
||||||
return sdbGetRow(tsMnodeSdb, &mnodeId);
|
return sdbGetRow(tsMnodeSdb, &mnodeId);
|
||||||
}
|
}
|
||||||
|
|
||||||
void mgmtReleaseMnode(SMnodeObj *pMnode) {
|
void mgmtIncMnodeRef(SMnodeObj *pMnode) {
|
||||||
|
sdbIncRef(tsMnodeSdb, pMnode);
|
||||||
|
}
|
||||||
|
|
||||||
|
void mgmtDecMnodeRef(SMnodeObj *pMnode) {
|
||||||
sdbDecRef(tsMnodeSdb, pMnode);
|
sdbDecRef(tsMnodeSdb, pMnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -187,7 +191,7 @@ void mgmtGetMnodeIpSet(SRpcIpSet *ipSet) {
|
||||||
|
|
||||||
ipSet->numOfIps++;
|
ipSet->numOfIps++;
|
||||||
|
|
||||||
mgmtReleaseMnode(pMnode);
|
mgmtDecMnodeRef(pMnode);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -209,7 +213,7 @@ void mgmtGetMnodeInfos(void *param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
index++;
|
index++;
|
||||||
mgmtReleaseMnode(pMnode);
|
mgmtDecMnodeRef(pMnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
mnodes->nodeNum = index;
|
mnodes->nodeNum = index;
|
||||||
|
@ -235,8 +239,17 @@ int32_t mgmtAddMnode(int32_t dnodeId) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void mgmtDropMnodeLocal(int32_t dnodeId) {
|
||||||
|
SMnodeObj *pMnode = mgmtGetMnode(dnodeId);
|
||||||
|
if (pMnode != NULL) {
|
||||||
|
SSdbOper oper = {.type = SDB_OPER_LOCAL, .table = tsMnodeSdb, .pObj = pMnode};
|
||||||
|
sdbDeleteRow(&oper);
|
||||||
|
mgmtDecMnodeRef(pMnode);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t mgmtDropMnode(int32_t dnodeId) {
|
int32_t mgmtDropMnode(int32_t dnodeId) {
|
||||||
SMnodeObj *pMnode = sdbGetRow(tsMnodeSdb, &dnodeId);
|
SMnodeObj *pMnode = mgmtGetMnode(dnodeId);
|
||||||
if (pMnode == NULL) {
|
if (pMnode == NULL) {
|
||||||
return TSDB_CODE_DNODE_NOT_EXIST;
|
return TSDB_CODE_DNODE_NOT_EXIST;
|
||||||
}
|
}
|
||||||
|
@ -258,7 +271,7 @@ int32_t mgmtDropMnode(int32_t dnodeId) {
|
||||||
|
|
||||||
static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
|
static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
|
||||||
sdbUpdateMnodeRoles();
|
sdbUpdateMnodeRoles();
|
||||||
SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL);
|
SUserObj *pUser = mgmtGetUserFromConn(pConn);
|
||||||
if (pUser == NULL) return 0;
|
if (pUser == NULL) return 0;
|
||||||
|
|
||||||
if (strcmp(pUser->pAcct->user, "root") != 0) {
|
if (strcmp(pUser->pAcct->user, "root") != 0) {
|
||||||
|
@ -339,7 +352,7 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi
|
||||||
|
|
||||||
numOfRows++;
|
numOfRows++;
|
||||||
|
|
||||||
mgmtReleaseMnode(pMnode);
|
mgmtDecMnodeRef(pMnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
pShow->numOfReads += numOfRows;
|
pShow->numOfReads += numOfRows;
|
||||||
|
|
|
@ -675,7 +675,7 @@ int32_t mgmtRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pConn
|
||||||
void mgmtProcessKillQueryMsg(SQueuedMsg *pMsg) {
|
void mgmtProcessKillQueryMsg(SQueuedMsg *pMsg) {
|
||||||
SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
|
SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
|
||||||
|
|
||||||
SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle, NULL);
|
SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle);
|
||||||
if (pUser == NULL) {
|
if (pUser == NULL) {
|
||||||
rpcRsp.code = TSDB_CODE_INVALID_USER;
|
rpcRsp.code = TSDB_CODE_INVALID_USER;
|
||||||
rpcSendResponse(&rpcRsp);
|
rpcSendResponse(&rpcRsp);
|
||||||
|
@ -699,7 +699,7 @@ void mgmtProcessKillQueryMsg(SQueuedMsg *pMsg) {
|
||||||
void mgmtProcessKillStreamMsg(SQueuedMsg *pMsg) {
|
void mgmtProcessKillStreamMsg(SQueuedMsg *pMsg) {
|
||||||
SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
|
SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
|
||||||
|
|
||||||
SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle, NULL);
|
SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle);
|
||||||
if (pUser == NULL) {
|
if (pUser == NULL) {
|
||||||
rpcRsp.code = TSDB_CODE_INVALID_USER;
|
rpcRsp.code = TSDB_CODE_INVALID_USER;
|
||||||
rpcSendResponse(&rpcRsp);
|
rpcSendResponse(&rpcRsp);
|
||||||
|
@ -723,7 +723,7 @@ void mgmtProcessKillStreamMsg(SQueuedMsg *pMsg) {
|
||||||
void mgmtProcessKillConnectionMsg(SQueuedMsg *pMsg) {
|
void mgmtProcessKillConnectionMsg(SQueuedMsg *pMsg) {
|
||||||
SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
|
SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
|
||||||
|
|
||||||
SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle, NULL);
|
SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle);
|
||||||
if (pUser == NULL) {
|
if (pUser == NULL) {
|
||||||
rpcRsp.code = TSDB_CODE_INVALID_USER;
|
rpcRsp.code = TSDB_CODE_INVALID_USER;
|
||||||
rpcSendResponse(&rpcRsp);
|
rpcSendResponse(&rpcRsp);
|
||||||
|
|
|
@ -184,7 +184,7 @@ void sdbUpdateMnodeRoles() {
|
||||||
if (pMnode != NULL) {
|
if (pMnode != NULL) {
|
||||||
pMnode->role = roles.role[i];
|
pMnode->role = roles.role[i];
|
||||||
sdbPrint("mnode:%d, role:%s", pMnode->mnodeId, mgmtGetMnodeRoleStr(pMnode->role));
|
sdbPrint("mnode:%d, role:%s", pMnode->mnodeId, mgmtGetMnodeRoleStr(pMnode->role));
|
||||||
mgmtReleaseMnode(pMnode);
|
mgmtDecMnodeRef(pMnode);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -252,7 +252,7 @@ void sdbUpdateSync() {
|
||||||
strcpy(syncCfg.nodeInfo[index].nodeFqdn, pMnode->pDnode->dnodeEp);
|
strcpy(syncCfg.nodeInfo[index].nodeFqdn, pMnode->pDnode->dnodeEp);
|
||||||
index++;
|
index++;
|
||||||
|
|
||||||
mgmtReleaseMnode(pMnode);
|
mgmtDecMnodeRef(pMnode);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -421,6 +421,7 @@ static void mgmtProcessConnectMsg(SQueuedMsg *pMsg) {
|
||||||
code = TSDB_CODE_INVALID_DB;
|
code = TSDB_CODE_INVALID_DB;
|
||||||
goto connect_over;
|
goto connect_over;
|
||||||
}
|
}
|
||||||
|
mgmtDecDbRef(pDb);
|
||||||
}
|
}
|
||||||
|
|
||||||
SCMConnectRsp *pConnectRsp = rpcMallocCont(sizeof(SCMConnectRsp));
|
SCMConnectRsp *pConnectRsp = rpcMallocCont(sizeof(SCMConnectRsp));
|
||||||
|
@ -454,9 +455,8 @@ static void mgmtProcessUseMsg(SQueuedMsg *pMsg) {
|
||||||
SCMUseDbMsg *pUseDbMsg = pMsg->pCont;
|
SCMUseDbMsg *pUseDbMsg = pMsg->pCont;
|
||||||
|
|
||||||
// todo check for priority of current user
|
// todo check for priority of current user
|
||||||
pMsg->pDb = mgmtGetDb(pUseDbMsg->db);
|
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
if (pMsg->pDb == NULL) pMsg->pDb = mgmtGetDb(pUseDbMsg->db);
|
||||||
if (pMsg->pDb == NULL) {
|
if (pMsg->pDb == NULL) {
|
||||||
code = TSDB_CODE_INVALID_DB;
|
code = TSDB_CODE_INVALID_DB;
|
||||||
}
|
}
|
||||||
|
@ -470,7 +470,7 @@ static void mgmtProcessUseMsg(SQueuedMsg *pMsg) {
|
||||||
*/
|
*/
|
||||||
static bool mgmtCheckTableMetaMsgReadOnly(SQueuedMsg *pMsg) {
|
static bool mgmtCheckTableMetaMsgReadOnly(SQueuedMsg *pMsg) {
|
||||||
SCMTableInfoMsg *pInfo = pMsg->pCont;
|
SCMTableInfoMsg *pInfo = pMsg->pCont;
|
||||||
pMsg->pTable = mgmtGetTable(pInfo->tableId);
|
if (pMsg->pTable == NULL) pMsg->pTable = mgmtGetTable(pInfo->tableId);
|
||||||
if (pMsg->pTable != NULL) return true;
|
if (pMsg->pTable != NULL) return true;
|
||||||
|
|
||||||
// If table does not exists and autoCreate flag is set, we add the handler into task queue
|
// If table does not exists and autoCreate flag is set, we add the handler into task queue
|
||||||
|
@ -551,8 +551,7 @@ void mgmtFreeQhandle(void *qhandle, bool forceRemove) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void *mgmtMallocQueuedMsg(SRpcMsg *rpcMsg) {
|
void *mgmtMallocQueuedMsg(SRpcMsg *rpcMsg) {
|
||||||
bool usePublicIp = false;
|
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle);
|
||||||
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle, &usePublicIp);
|
|
||||||
if (pUser == NULL) {
|
if (pUser == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -563,7 +562,6 @@ void *mgmtMallocQueuedMsg(SRpcMsg *rpcMsg) {
|
||||||
pMsg->contLen = rpcMsg->contLen;
|
pMsg->contLen = rpcMsg->contLen;
|
||||||
pMsg->pCont = rpcMsg->pCont;
|
pMsg->pCont = rpcMsg->pCont;
|
||||||
pMsg->pUser = pUser;
|
pMsg->pUser = pUser;
|
||||||
pMsg->usePublicIp = usePublicIp;
|
|
||||||
|
|
||||||
return pMsg;
|
return pMsg;
|
||||||
}
|
}
|
||||||
|
@ -591,8 +589,7 @@ void* mgmtCloneQueuedMsg(SQueuedMsg *pSrcMsg) {
|
||||||
pDestMsg->retry = pSrcMsg->retry;
|
pDestMsg->retry = pSrcMsg->retry;
|
||||||
pDestMsg->maxRetry= pSrcMsg->maxRetry;
|
pDestMsg->maxRetry= pSrcMsg->maxRetry;
|
||||||
pDestMsg->pUser = pSrcMsg->pUser;
|
pDestMsg->pUser = pSrcMsg->pUser;
|
||||||
pDestMsg->usePublicIp = pSrcMsg->usePublicIp;
|
|
||||||
|
|
||||||
pSrcMsg->pCont = NULL;
|
pSrcMsg->pCont = NULL;
|
||||||
pSrcMsg->pUser = NULL;
|
pSrcMsg->pUser = NULL;
|
||||||
|
|
||||||
|
|
|
@ -41,8 +41,8 @@
|
||||||
#include "mgmtVgroup.h"
|
#include "mgmtVgroup.h"
|
||||||
#include "tcompare.h"
|
#include "tcompare.h"
|
||||||
|
|
||||||
void * tsChildTableSdb;
|
static void * tsChildTableSdb;
|
||||||
void * tsSuperTableSdb;
|
static void * tsSuperTableSdb;
|
||||||
static int32_t tsChildTableUpdateSize;
|
static int32_t tsChildTableUpdateSize;
|
||||||
static int32_t tsSuperTableUpdateSize;
|
static int32_t tsSuperTableUpdateSize;
|
||||||
static void * mgmtGetChildTable(char *tableId);
|
static void * mgmtGetChildTable(char *tableId);
|
||||||
|
@ -117,6 +117,7 @@ static int32_t mgmtChildTableActionInsert(SSdbOper *pOper) {
|
||||||
mgmtDecAcctRef(pAcct);
|
mgmtDecAcctRef(pAcct);
|
||||||
|
|
||||||
if (pTable->info.type == TSDB_CHILD_TABLE) {
|
if (pTable->info.type == TSDB_CHILD_TABLE) {
|
||||||
|
// add ref
|
||||||
pTable->superTable = mgmtGetSuperTable(pTable->superTableId);
|
pTable->superTable = mgmtGetSuperTable(pTable->superTableId);
|
||||||
mgmtAddTableIntoStable(pTable->superTable, pTable);
|
mgmtAddTableIntoStable(pTable->superTable, pTable);
|
||||||
grantAdd(TSDB_GRANT_TIMESERIES, pTable->superTable->numOfColumns - 1);
|
grantAdd(TSDB_GRANT_TIMESERIES, pTable->superTable->numOfColumns - 1);
|
||||||
|
@ -186,6 +187,7 @@ static int32_t mgmtChildTableActionUpdate(SSdbOper *pOper) {
|
||||||
free(oldSql);
|
free(oldSql);
|
||||||
free(oldSchema);
|
free(oldSchema);
|
||||||
}
|
}
|
||||||
|
mgmtDecTableRef(pTable);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -250,7 +252,7 @@ static int32_t mgmtChildTableActionRestored() {
|
||||||
while (1) {
|
while (1) {
|
||||||
pLastNode = pNode;
|
pLastNode = pNode;
|
||||||
mgmtDecTableRef(pTable);
|
mgmtDecTableRef(pTable);
|
||||||
pNode = sdbFetchRow(tsChildTableSdb, pNode, (void **)&pTable);
|
pNode = mgmtGetNextChildTable(pNode, &pTable);
|
||||||
if (pTable == NULL) break;
|
if (pTable == NULL) break;
|
||||||
|
|
||||||
SDbObj *pDb = mgmtGetDbByTableId(pTable->info.tableId);
|
SDbObj *pDb = mgmtGetDbByTableId(pTable->info.tableId);
|
||||||
|
@ -435,7 +437,7 @@ static int32_t mgmtSuperTableActionUpdate(SSdbOper *pOper) {
|
||||||
free(pNew);
|
free(pNew);
|
||||||
free(oldSchema);
|
free(oldSchema);
|
||||||
}
|
}
|
||||||
|
mgmtDecTableRef(pTable);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -558,20 +560,28 @@ static void *mgmtGetSuperTable(char *tableId) {
|
||||||
return sdbGetRow(tsSuperTableSdb, tableId);
|
return sdbGetRow(tsSuperTableSdb, tableId);
|
||||||
}
|
}
|
||||||
|
|
||||||
STableObj *mgmtGetTable(char *tableId) {
|
void *mgmtGetTable(char *tableId) {
|
||||||
STableObj *tableInfo = sdbGetRow(tsSuperTableSdb, tableId);
|
void *pTable = mgmtGetSuperTable(tableId);
|
||||||
if (tableInfo != NULL) {
|
if (pTable != NULL) {
|
||||||
return tableInfo;
|
return pTable;
|
||||||
}
|
}
|
||||||
|
|
||||||
tableInfo = sdbGetRow(tsChildTableSdb, tableId);
|
pTable = mgmtGetChildTable(tableId);
|
||||||
if (tableInfo != NULL) {
|
if (pTable != NULL) {
|
||||||
return tableInfo;
|
return pTable;
|
||||||
}
|
}
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void *mgmtGetNextChildTable(void *pNode, SChildTableObj **pTable) {
|
||||||
|
return sdbFetchRow(tsChildTableSdb, pNode, (void **)pTable);
|
||||||
|
}
|
||||||
|
|
||||||
|
void *mgmtGetNextSuperTable(void *pNode, SSuperTableObj **pTable) {
|
||||||
|
return sdbFetchRow(tsSuperTableSdb, pNode, (void **)pTable);
|
||||||
|
}
|
||||||
|
|
||||||
void mgmtIncTableRef(void *p1) {
|
void mgmtIncTableRef(void *p1) {
|
||||||
STableObj *pTable = (STableObj *)p1;
|
STableObj *pTable = (STableObj *)p1;
|
||||||
if (pTable->type == TSDB_SUPER_TABLE) {
|
if (pTable->type == TSDB_SUPER_TABLE) {
|
||||||
|
@ -787,8 +797,6 @@ static void mgmtProcessDropSuperTableMsg(SQueuedMsg *pMsg) {
|
||||||
mgmtDecVgroupRef(pVgroup);
|
mgmtDecVgroupRef(pVgroup);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
//mError("stable:%s, numOfTables:%d not 0", pStable->info.tableId, pStable->numOfTables);
|
|
||||||
//mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_OTHERS);
|
|
||||||
} else {
|
} else {
|
||||||
SSdbOper oper = {
|
SSdbOper oper = {
|
||||||
.type = SDB_OPER_GLOBAL,
|
.type = SDB_OPER_GLOBAL,
|
||||||
|
@ -846,8 +854,7 @@ static int32_t mgmtAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], i
|
||||||
SSdbOper oper = {
|
SSdbOper oper = {
|
||||||
.type = SDB_OPER_GLOBAL,
|
.type = SDB_OPER_GLOBAL,
|
||||||
.table = tsSuperTableSdb,
|
.table = tsSuperTableSdb,
|
||||||
.pObj = pStable,
|
.pObj = pStable
|
||||||
.rowSize = tsSuperTableUpdateSize
|
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t code = sdbUpdateRow(&oper);
|
int32_t code = sdbUpdateRow(&oper);
|
||||||
|
@ -874,8 +881,7 @@ static int32_t mgmtDropSuperTableTag(SSuperTableObj *pStable, char *tagName) {
|
||||||
SSdbOper oper = {
|
SSdbOper oper = {
|
||||||
.type = SDB_OPER_GLOBAL,
|
.type = SDB_OPER_GLOBAL,
|
||||||
.table = tsSuperTableSdb,
|
.table = tsSuperTableSdb,
|
||||||
.pObj = pStable,
|
.pObj = pStable
|
||||||
.rowSize = tsSuperTableUpdateSize
|
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t code = sdbUpdateRow(&oper);
|
int32_t code = sdbUpdateRow(&oper);
|
||||||
|
@ -911,8 +917,7 @@ static int32_t mgmtModifySuperTableTagName(SSuperTableObj *pStable, char *oldTag
|
||||||
SSdbOper oper = {
|
SSdbOper oper = {
|
||||||
.type = SDB_OPER_GLOBAL,
|
.type = SDB_OPER_GLOBAL,
|
||||||
.table = tsSuperTableSdb,
|
.table = tsSuperTableSdb,
|
||||||
.pObj = pStable,
|
.pObj = pStable
|
||||||
.rowSize = tsSuperTableUpdateSize
|
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t code = sdbUpdateRow(&oper);
|
int32_t code = sdbUpdateRow(&oper);
|
||||||
|
@ -977,8 +982,7 @@ static int32_t mgmtAddSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, SSc
|
||||||
SSdbOper oper = {
|
SSdbOper oper = {
|
||||||
.type = SDB_OPER_GLOBAL,
|
.type = SDB_OPER_GLOBAL,
|
||||||
.table = tsSuperTableSdb,
|
.table = tsSuperTableSdb,
|
||||||
.pObj = pStable,
|
.pObj = pStable
|
||||||
.rowSize = tsSuperTableUpdateSize
|
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t code = sdbUpdateRow(&oper);
|
int32_t code = sdbUpdateRow(&oper);
|
||||||
|
@ -1015,8 +1019,7 @@ static int32_t mgmtDropSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, ch
|
||||||
SSdbOper oper = {
|
SSdbOper oper = {
|
||||||
.type = SDB_OPER_GLOBAL,
|
.type = SDB_OPER_GLOBAL,
|
||||||
.table = tsSuperTableSdb,
|
.table = tsSuperTableSdb,
|
||||||
.pObj = pStable,
|
.pObj = pStable
|
||||||
.rowSize = tsSuperTableUpdateSize
|
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t code = sdbUpdateRow(&oper);
|
int32_t code = sdbUpdateRow(&oper);
|
||||||
|
@ -1099,7 +1102,8 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v
|
||||||
char stableName[TSDB_TABLE_NAME_LEN] = {0};
|
char stableName[TSDB_TABLE_NAME_LEN] = {0};
|
||||||
|
|
||||||
while (numOfRows < rows) {
|
while (numOfRows < rows) {
|
||||||
pShow->pNode = sdbFetchRow(tsSuperTableSdb, pShow->pNode, (void **) &pTable);
|
mgmtDecTableRef(pTable);
|
||||||
|
pShow->pNode = mgmtGetNextSuperTable(pShow->pNode, &pTable);
|
||||||
if (pTable == NULL) break;
|
if (pTable == NULL) break;
|
||||||
if (strncmp(pTable->info.tableId, prefix, prefixLen)) {
|
if (strncmp(pTable->info.tableId, prefix, prefixLen)) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -1135,8 +1139,6 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v
|
||||||
cols++;
|
cols++;
|
||||||
|
|
||||||
numOfRows++;
|
numOfRows++;
|
||||||
mgmtDecTableRef(pTable);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pShow->numOfReads += numOfRows;
|
pShow->numOfReads += numOfRows;
|
||||||
|
@ -1155,7 +1157,7 @@ void mgmtDropAllSuperTables(SDbObj *pDropDb) {
|
||||||
mPrint("db:%s, all super tables will be dropped from sdb", pDropDb->name);
|
mPrint("db:%s, all super tables will be dropped from sdb", pDropDb->name);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
pNode = sdbFetchRow(tsSuperTableSdb, pNode, (void **)&pTable);
|
pNode = mgmtGetNextSuperTable(pNode, &pTable);
|
||||||
if (pTable == NULL) break;
|
if (pTable == NULL) break;
|
||||||
|
|
||||||
if (strncmp(pDropDb->name, pTable->info.tableId, dbNameLen) == 0) {
|
if (strncmp(pDropDb->name, pTable->info.tableId, dbNameLen) == 0) {
|
||||||
|
@ -1213,14 +1215,13 @@ static void mgmtGetSuperTableMeta(SQueuedMsg *pMsg) {
|
||||||
|
|
||||||
static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *pMsg) {
|
static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *pMsg) {
|
||||||
SCMSTableVgroupMsg *pInfo = pMsg->pCont;
|
SCMSTableVgroupMsg *pInfo = pMsg->pCont;
|
||||||
SSuperTableObj *pTable = mgmtGetSuperTable(pInfo->tableId);
|
if (pMsg->pTable == NULL) pMsg->pTable = mgmtGetSuperTable(pInfo->tableId);
|
||||||
|
|
||||||
pMsg->pTable = (STableObj *)pTable;
|
|
||||||
if (pMsg->pTable == NULL) {
|
if (pMsg->pTable == NULL) {
|
||||||
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE);
|
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SSuperTableObj *pTable = (SSuperTableObj *)pMsg->pTable;
|
||||||
int32_t contLen = sizeof(SCMSTableVgroupRspMsg) + sizeof(SCMVgroupInfo) * pTable->vgLen;
|
int32_t contLen = sizeof(SCMSTableVgroupRspMsg) + sizeof(SCMVgroupInfo) * pTable->vgLen;
|
||||||
SCMSTableVgroupRspMsg *pRsp = rpcMallocCont(contLen);
|
SCMSTableVgroupRspMsg *pRsp = rpcMallocCont(contLen);
|
||||||
if (pRsp == NULL) {
|
if (pRsp == NULL) {
|
||||||
|
@ -1437,10 +1438,14 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pMsg->retry == 0) {
|
if (pMsg->retry == 0) {
|
||||||
pMsg->pTable = (STableObj *)mgmtDoCreateChildTable(pCreate, pVgroup, sid);
|
if (pMsg->pTable == NULL) {
|
||||||
|
pMsg->pTable = (STableObj *)mgmtDoCreateChildTable(pCreate, pVgroup, sid);
|
||||||
|
mgmtIncTableRef(pMsg->pTable);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
pMsg->pTable = mgmtGetTable(pCreate->tableId);
|
if (pMsg->pTable == NULL) pMsg->pTable = mgmtGetTable(pCreate->tableId);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pMsg->pTable == NULL) {
|
if (pMsg->pTable == NULL) {
|
||||||
mgmtSendSimpleResp(pMsg->thandle, terrno);
|
mgmtSendSimpleResp(pMsg->thandle, terrno);
|
||||||
return;
|
return;
|
||||||
|
@ -1456,7 +1461,6 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) {
|
||||||
SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg);
|
SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg);
|
||||||
newMsg->ahandle = pMsg->pTable;
|
newMsg->ahandle = pMsg->pTable;
|
||||||
newMsg->maxRetry = 5;
|
newMsg->maxRetry = 5;
|
||||||
mgmtIncTableRef(pMsg->pTable);
|
|
||||||
SRpcMsg rpcMsg = {
|
SRpcMsg rpcMsg = {
|
||||||
.handle = newMsg,
|
.handle = newMsg,
|
||||||
.pCont = pMDCreate,
|
.pCont = pMDCreate,
|
||||||
|
@ -1470,8 +1474,8 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) {
|
||||||
|
|
||||||
static void mgmtProcessDropChildTableMsg(SQueuedMsg *pMsg) {
|
static void mgmtProcessDropChildTableMsg(SQueuedMsg *pMsg) {
|
||||||
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
|
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
|
||||||
SVgObj *pVgroup = pMsg->pVgroup = mgmtGetVgroup(pTable->vgId);
|
if (pMsg->pVgroup == NULL) pMsg->pVgroup = mgmtGetVgroup(pTable->vgId);
|
||||||
if (pVgroup == NULL) {
|
if (pMsg->pVgroup == NULL) {
|
||||||
mError("table:%s, failed to drop ctable, vgroup not exist", pTable->info.tableId);
|
mError("table:%s, failed to drop ctable, vgroup not exist", pTable->info.tableId);
|
||||||
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_OTHERS);
|
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_OTHERS);
|
||||||
return;
|
return;
|
||||||
|
@ -1490,7 +1494,7 @@ static void mgmtProcessDropChildTableMsg(SQueuedMsg *pMsg) {
|
||||||
pDrop->sid = htonl(pTable->sid);
|
pDrop->sid = htonl(pTable->sid);
|
||||||
pDrop->uid = htobe64(pTable->uid);
|
pDrop->uid = htobe64(pTable->uid);
|
||||||
|
|
||||||
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
|
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pMsg->pVgroup);
|
||||||
|
|
||||||
mTrace("table:%s, send drop ctable msg", pDrop->tableId);
|
mTrace("table:%s, send drop ctable msg", pDrop->tableId);
|
||||||
SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg);
|
SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg);
|
||||||
|
@ -1556,8 +1560,7 @@ static int32_t mgmtAddNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, SSc
|
||||||
SSdbOper oper = {
|
SSdbOper oper = {
|
||||||
.type = SDB_OPER_GLOBAL,
|
.type = SDB_OPER_GLOBAL,
|
||||||
.table = tsChildTableSdb,
|
.table = tsChildTableSdb,
|
||||||
.pObj = pTable,
|
.pObj = pTable
|
||||||
.rowSize = tsChildTableUpdateSize
|
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t code = sdbUpdateRow(&oper);
|
int32_t code = sdbUpdateRow(&oper);
|
||||||
|
@ -1589,8 +1592,7 @@ static int32_t mgmtDropNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, ch
|
||||||
SSdbOper oper = {
|
SSdbOper oper = {
|
||||||
.type = SDB_OPER_GLOBAL,
|
.type = SDB_OPER_GLOBAL,
|
||||||
.table = tsChildTableSdb,
|
.table = tsChildTableSdb,
|
||||||
.pObj = pTable,
|
.pObj = pTable
|
||||||
.rowSize = tsChildTableUpdateSize
|
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t code = sdbUpdateRow(&oper);
|
int32_t code = sdbUpdateRow(&oper);
|
||||||
|
@ -1638,21 +1640,21 @@ static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) {
|
||||||
pMeta->contLen = sizeof(STableMetaMsg) + mgmtSetSchemaFromNormalTable(pMeta->schema, pTable);
|
pMeta->contLen = sizeof(STableMetaMsg) + mgmtSetSchemaFromNormalTable(pMeta->schema, pTable);
|
||||||
}
|
}
|
||||||
|
|
||||||
SVgObj *pVgroup = pMsg->pVgroup = mgmtGetVgroup(pTable->vgId);
|
if (pMsg->pVgroup == NULL) pMsg->pVgroup = mgmtGetVgroup(pTable->vgId);
|
||||||
if (pVgroup == NULL) {
|
if (pMsg->pVgroup == NULL) {
|
||||||
mError("table:%s, failed to get table meta, db not selected", pTable->info.tableId);
|
mError("table:%s, failed to get table meta, db not selected", pTable->info.tableId);
|
||||||
return TSDB_CODE_INVALID_VGROUP_ID;
|
return TSDB_CODE_INVALID_VGROUP_ID;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
|
for (int32_t i = 0; i < pMsg->pVgroup->numOfVnodes; ++i) {
|
||||||
SDnodeObj *pDnode = mgmtGetDnode(pVgroup->vnodeGid[i].dnodeId);
|
SDnodeObj *pDnode = mgmtGetDnode(pMsg->pVgroup->vnodeGid[i].dnodeId);
|
||||||
if (pDnode == NULL) break;
|
if (pDnode == NULL) break;
|
||||||
strcpy(pMeta->vgroup.ipAddr[i].fqdn, pDnode->dnodeFqdn);
|
strcpy(pMeta->vgroup.ipAddr[i].fqdn, pDnode->dnodeFqdn);
|
||||||
pMeta->vgroup.ipAddr[i].port = htons(pDnode->dnodePort + TSDB_PORT_DNODESHELL);
|
pMeta->vgroup.ipAddr[i].port = htons(pDnode->dnodePort + TSDB_PORT_DNODESHELL);
|
||||||
pMeta->vgroup.numOfIps++;
|
pMeta->vgroup.numOfIps++;
|
||||||
mgmtDecDnodeRef(pDnode);
|
mgmtDecDnodeRef(pDnode);
|
||||||
}
|
}
|
||||||
pMeta->vgroup.vgId = htonl(pVgroup->vgId);
|
pMeta->vgroup.vgId = htonl(pMsg->pVgroup->vgId);
|
||||||
|
|
||||||
mTrace("table:%s, uid:%" PRIu64 " table meta is retrieved", pTable->info.tableId, pTable->uid);
|
mTrace("table:%s, uid:%" PRIu64 " table meta is retrieved", pTable->info.tableId, pTable->uid);
|
||||||
|
|
||||||
|
@ -1714,7 +1716,7 @@ void mgmtDropAllChildTables(SDbObj *pDropDb) {
|
||||||
mPrint("db:%s, all child tables will be dropped from sdb", pDropDb->name);
|
mPrint("db:%s, all child tables will be dropped from sdb", pDropDb->name);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
pNode = sdbFetchRow(tsChildTableSdb, pNode, (void **)&pTable);
|
pNode = mgmtGetNextChildTable(pNode, &pTable);
|
||||||
if (pTable == NULL) break;
|
if (pTable == NULL) break;
|
||||||
|
|
||||||
if (strncmp(pDropDb->name, pTable->info.tableId, dbNameLen) == 0) {
|
if (strncmp(pDropDb->name, pTable->info.tableId, dbNameLen) == 0) {
|
||||||
|
@ -1742,7 +1744,7 @@ static void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable) {
|
||||||
mPrint("stable:%s, all child tables will dropped from sdb", pStable->info.tableId, numOfTables);
|
mPrint("stable:%s, all child tables will dropped from sdb", pStable->info.tableId, numOfTables);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
pNode = sdbFetchRow(tsChildTableSdb, pNode, (void **)&pTable);
|
pNode = mgmtGetNextChildTable(pNode, &pTable);
|
||||||
if (pTable == NULL) break;
|
if (pTable == NULL) break;
|
||||||
|
|
||||||
if (pTable->superTable == pStable) {
|
if (pTable->superTable == pStable) {
|
||||||
|
@ -1762,16 +1764,13 @@ static void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable) {
|
||||||
mPrint("stable:%s, all child tables:%d is dropped from sdb", pStable->info.tableId, numOfTables);
|
mPrint("stable:%s, all child tables:%d is dropped from sdb", pStable->info.tableId, numOfTables);
|
||||||
}
|
}
|
||||||
|
|
||||||
static SChildTableObj* mgmtGetTableByPos(uint32_t dnodeId, int32_t vnode, int32_t sid) {
|
static SChildTableObj* mgmtGetTableByPos(int32_t vnode, int32_t sid) {
|
||||||
SDnodeObj *pObj = mgmtGetDnode(dnodeId);
|
|
||||||
SVgObj *pVgroup = mgmtGetVgroup(vnode);
|
SVgObj *pVgroup = mgmtGetVgroup(vnode);
|
||||||
|
if (pVgroup == NULL) return NULL;
|
||||||
if (pObj == NULL || pVgroup == NULL) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
SChildTableObj *pTable = pVgroup->tableList[sid];
|
SChildTableObj *pTable = pVgroup->tableList[sid];
|
||||||
mgmtIncTableRef((STableObj *)pTable);
|
mgmtIncTableRef((STableObj *)pTable);
|
||||||
|
|
||||||
mgmtDecVgroupRef(pVgroup);
|
mgmtDecVgroupRef(pVgroup);
|
||||||
return pTable;
|
return pTable;
|
||||||
}
|
}
|
||||||
|
@ -1783,7 +1782,7 @@ static void mgmtProcessTableCfgMsg(SRpcMsg *rpcMsg) {
|
||||||
pCfg->sid = htonl(pCfg->sid);
|
pCfg->sid = htonl(pCfg->sid);
|
||||||
mTrace("dnode:%s, vnode:%d, sid:%d, receive table config msg", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid);
|
mTrace("dnode:%s, vnode:%d, sid:%d, receive table config msg", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid);
|
||||||
|
|
||||||
SChildTableObj *pTable = mgmtGetTableByPos(pCfg->dnode, pCfg->vnode, pCfg->sid);
|
SChildTableObj *pTable = mgmtGetTableByPos(pCfg->vnode, pCfg->sid);
|
||||||
if (pTable == NULL) {
|
if (pTable == NULL) {
|
||||||
mError("dnode:%s, vnode:%d, sid:%d, table not found", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid);
|
mError("dnode:%s, vnode:%d, sid:%d, table not found", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid);
|
||||||
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_NOT_ACTIVE_TABLE);
|
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_NOT_ACTIVE_TABLE);
|
||||||
|
@ -1798,6 +1797,7 @@ static void mgmtProcessTableCfgMsg(SRpcMsg *rpcMsg) {
|
||||||
mgmtDecTableRef(pTable);
|
mgmtDecTableRef(pTable);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
SDnodeObj *pDnode = mgmtGetDnode(pCfg->dnode);
|
SDnodeObj *pDnode = mgmtGetDnode(pCfg->dnode);
|
||||||
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pDnode->dnodeEp);
|
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pDnode->dnodeEp);
|
||||||
SRpcMsg rpcRsp = {
|
SRpcMsg rpcRsp = {
|
||||||
|
@ -1808,7 +1808,9 @@ static void mgmtProcessTableCfgMsg(SRpcMsg *rpcMsg) {
|
||||||
.msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE
|
.msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE
|
||||||
};
|
};
|
||||||
mgmtSendMsgToDnode(&ipSet, &rpcRsp);
|
mgmtSendMsgToDnode(&ipSet, &rpcRsp);
|
||||||
|
|
||||||
mgmtDecTableRef(pTable);
|
mgmtDecTableRef(pTable);
|
||||||
|
mgmtDecDnodeRef(pDnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
// handle drop child response
|
// handle drop child response
|
||||||
|
@ -1829,8 +1831,8 @@ static void mgmtProcessDropChildTableRsp(SRpcMsg *rpcMsg) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
SVgObj *pVgroup = queueMsg->pVgroup = mgmtGetVgroup(pTable->vgId);
|
if (queueMsg->pVgroup == NULL) queueMsg->pVgroup = mgmtGetVgroup(pTable->vgId);
|
||||||
if (pVgroup == NULL) {
|
if (queueMsg->pVgroup == NULL) {
|
||||||
mError("table:%s, failed to get vgroup", pTable->info.tableId);
|
mError("table:%s, failed to get vgroup", pTable->info.tableId);
|
||||||
mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_INVALID_VGROUP_ID);
|
mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_INVALID_VGROUP_ID);
|
||||||
return;
|
return;
|
||||||
|
@ -1849,9 +1851,9 @@ static void mgmtProcessDropChildTableRsp(SRpcMsg *rpcMsg) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pVgroup->numOfTables <= 0) {
|
if (queueMsg->pVgroup->numOfTables <= 0) {
|
||||||
mPrint("vgroup:%d, all tables is dropped, drop vgroup", pVgroup->vgId);
|
mPrint("vgroup:%d, all tables is dropped, drop vgroup", queueMsg->pVgroup->vgId);
|
||||||
mgmtDropVgroup(pVgroup, NULL);
|
mgmtDropVgroup(queueMsg->pVgroup, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SUCCESS);
|
mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SUCCESS);
|
||||||
|
@ -1928,8 +1930,8 @@ static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *pMsg) {
|
||||||
SChildTableObj *pTable = mgmtGetChildTable(tableId);
|
SChildTableObj *pTable = mgmtGetChildTable(tableId);
|
||||||
if (pTable == NULL) continue;
|
if (pTable == NULL) continue;
|
||||||
|
|
||||||
SDbObj *pDb = mgmtGetDbByTableId(tableId);
|
if (pMsg->pDb == NULL) pMsg->pDb = mgmtGetDbByTableId(tableId);
|
||||||
if (pDb == NULL) continue;
|
if (pMsg->pDb == NULL) continue;
|
||||||
|
|
||||||
int availLen = totalMallocLen - pMultiMeta->contLen;
|
int availLen = totalMallocLen - pMultiMeta->contLen;
|
||||||
if (availLen <= sizeof(STableMetaMsg) + sizeof(SSchema) * TSDB_MAX_COLUMNS) {
|
if (availLen <= sizeof(STableMetaMsg) + sizeof(SSchema) * TSDB_MAX_COLUMNS) {
|
||||||
|
@ -2028,7 +2030,8 @@ static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows,
|
||||||
int32_t prefixLen = strlen(prefix);
|
int32_t prefixLen = strlen(prefix);
|
||||||
|
|
||||||
while (numOfRows < rows) {
|
while (numOfRows < rows) {
|
||||||
pShow->pNode = sdbFetchRow(tsChildTableSdb, pShow->pNode, (void **) &pTable);
|
mgmtDecTableRef(pTable);
|
||||||
|
pShow->pNode = mgmtGetNextChildTable(pShow->pNode, &pTable);
|
||||||
if (pTable == NULL) break;
|
if (pTable == NULL) break;
|
||||||
|
|
||||||
// not belong to current db
|
// not belong to current db
|
||||||
|
@ -2072,7 +2075,6 @@ static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows,
|
||||||
cols++;
|
cols++;
|
||||||
|
|
||||||
numOfRows++;
|
numOfRows++;
|
||||||
mgmtDecTableRef(pTable);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pShow->numOfReads += numOfRows;
|
pShow->numOfReads += numOfRows;
|
||||||
|
@ -2088,7 +2090,7 @@ static void mgmtProcessAlterTableMsg(SQueuedMsg *pMsg) {
|
||||||
SCMAlterTableMsg *pAlter = pMsg->pCont;
|
SCMAlterTableMsg *pAlter = pMsg->pCont;
|
||||||
mTrace("table:%s, alter table msg is received from thandle:%p", pAlter->tableId, pMsg->thandle);
|
mTrace("table:%s, alter table msg is received from thandle:%p", pAlter->tableId, pMsg->thandle);
|
||||||
|
|
||||||
pMsg->pDb = mgmtGetDbByTableId(pAlter->tableId);
|
if (pMsg->pDb == NULL) pMsg->pDb = mgmtGetDbByTableId(pAlter->tableId);
|
||||||
if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) {
|
if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) {
|
||||||
mError("table:%s, failed to alter table, db not selected", pAlter->tableId);
|
mError("table:%s, failed to alter table, db not selected", pAlter->tableId);
|
||||||
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED);
|
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED);
|
||||||
|
@ -2101,7 +2103,7 @@ static void mgmtProcessAlterTableMsg(SQueuedMsg *pMsg) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
pMsg->pTable = mgmtGetTable(pAlter->tableId);
|
if (pMsg->pTable == NULL) pMsg->pTable = mgmtGetTable(pAlter->tableId);
|
||||||
if (pMsg->pTable == NULL) {
|
if (pMsg->pTable == NULL) {
|
||||||
mError("table:%s, failed to alter table, table not exist", pMsg->pTable->tableId);
|
mError("table:%s, failed to alter table, table not exist", pMsg->pTable->tableId);
|
||||||
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE);
|
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE);
|
||||||
|
|
|
@ -168,8 +168,7 @@ static int32_t mgmtUpdateUser(SUserObj *pUser) {
|
||||||
SSdbOper oper = {
|
SSdbOper oper = {
|
||||||
.type = SDB_OPER_GLOBAL,
|
.type = SDB_OPER_GLOBAL,
|
||||||
.table = tsUserSdb,
|
.table = tsUserSdb,
|
||||||
.pObj = pUser,
|
.pObj = pUser
|
||||||
.rowSize = tsUserUpdateSize
|
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t code = sdbUpdateRow(&oper);
|
int32_t code = sdbUpdateRow(&oper);
|
||||||
|
@ -249,7 +248,7 @@ static int32_t mgmtDropUser(SUserObj *pUser) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mgmtGetUserMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
|
static int32_t mgmtGetUserMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
|
||||||
SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL);
|
SUserObj *pUser = mgmtGetUserFromConn(pConn);
|
||||||
if (pUser == NULL) {
|
if (pUser == NULL) {
|
||||||
return TSDB_CODE_NO_USER_FROM_CONN;
|
return TSDB_CODE_NO_USER_FROM_CONN;
|
||||||
}
|
}
|
||||||
|
@ -298,7 +297,7 @@ static int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, void
|
||||||
char *pWrite;
|
char *pWrite;
|
||||||
|
|
||||||
while (numOfRows < rows) {
|
while (numOfRows < rows) {
|
||||||
pShow->pNode = sdbFetchRow(tsUserSdb, pShow->pNode, (void **) &pUser);
|
pShow->pNode = mgmtGetNextUser(pShow->pNode, &pUser);
|
||||||
if (pUser == NULL) break;
|
if (pUser == NULL) break;
|
||||||
|
|
||||||
cols = 0;
|
cols = 0;
|
||||||
|
@ -329,12 +328,9 @@ static int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, void
|
||||||
return numOfRows;
|
return numOfRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
SUserObj *mgmtGetUserFromConn(void *pConn, bool *usePublicIp) {
|
SUserObj *mgmtGetUserFromConn(void *pConn) {
|
||||||
SRpcConnInfo connInfo;
|
SRpcConnInfo connInfo;
|
||||||
if (rpcGetConnInfo(pConn, &connInfo) == 0) {
|
if (rpcGetConnInfo(pConn, &connInfo) == 0) {
|
||||||
if (usePublicIp) {
|
|
||||||
*usePublicIp = (connInfo.serverIp == tsPublicIpInt);
|
|
||||||
}
|
|
||||||
return mgmtGetUser(connInfo.user);
|
return mgmtGetUser(connInfo.user);
|
||||||
} else {
|
} else {
|
||||||
mError("can not get user from conn:%p", pConn);
|
mError("can not get user from conn:%p", pConn);
|
||||||
|
@ -510,7 +506,7 @@ void mgmtDropAllUsers(SAcctObj *pAcct) {
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
pLastNode = pNode;
|
pLastNode = pNode;
|
||||||
pNode = sdbFetchRow(tsUserSdb, pNode, (void **)&pUser);
|
pNode = mgmtGetNextUser(pNode, &pUser);
|
||||||
if (pUser == NULL) break;
|
if (pUser == NULL) break;
|
||||||
|
|
||||||
if (strncmp(pUser->acct, pAcct->user, acctNameLen) == 0) {
|
if (strncmp(pUser->acct, pAcct->user, acctNameLen) == 0) {
|
||||||
|
|
|
@ -36,8 +36,8 @@
|
||||||
#include "mgmtTable.h"
|
#include "mgmtTable.h"
|
||||||
#include "mgmtVgroup.h"
|
#include "mgmtVgroup.h"
|
||||||
|
|
||||||
void *tsVgroupSdb = NULL;
|
static void *tsVgroupSdb = NULL;
|
||||||
int32_t tsVgUpdateSize = 0;
|
static int32_t tsVgUpdateSize = 0;
|
||||||
|
|
||||||
static int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
|
static int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
|
||||||
static int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn);
|
static int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn);
|
||||||
|
@ -62,6 +62,8 @@ static int32_t mgmtVgroupActionDestroy(SSdbOper *pOper) {
|
||||||
|
|
||||||
static int32_t mgmtVgroupActionInsert(SSdbOper *pOper) {
|
static int32_t mgmtVgroupActionInsert(SSdbOper *pOper) {
|
||||||
SVgObj *pVgroup = pOper->pObj;
|
SVgObj *pVgroup = pOper->pObj;
|
||||||
|
|
||||||
|
// refer to db
|
||||||
SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
|
SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
|
||||||
if (pDb == NULL) {
|
if (pDb == NULL) {
|
||||||
return TSDB_CODE_INVALID_DB;
|
return TSDB_CODE_INVALID_DB;
|
||||||
|
@ -140,6 +142,7 @@ static int32_t mgmtVgroupActionUpdate(SSdbOper *pOper) {
|
||||||
if (pDnode != NULL) {
|
if (pDnode != NULL) {
|
||||||
atomic_add_fetch_32(&pDnode->openVnodes, 1);
|
atomic_add_fetch_32(&pDnode->openVnodes, 1);
|
||||||
}
|
}
|
||||||
|
mgmtDecDnodeRef(pDnode);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -154,6 +157,7 @@ static int32_t mgmtVgroupActionUpdate(SSdbOper *pOper) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mgmtDecVgroupRef(pVgroup);
|
||||||
mTrace("vgroup:%d, is updated, tables:%d numOfVnode:%d", pVgroup->vgId, pDb->cfg.maxTables, pVgroup->numOfVnodes);
|
mTrace("vgroup:%d, is updated, tables:%d numOfVnode:%d", pVgroup->vgId, pDb->cfg.maxTables, pVgroup->numOfVnodes);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -237,8 +241,7 @@ void mgmtUpdateVgroup(SVgObj *pVgroup) {
|
||||||
SSdbOper oper = {
|
SSdbOper oper = {
|
||||||
.type = SDB_OPER_GLOBAL,
|
.type = SDB_OPER_GLOBAL,
|
||||||
.table = tsVgroupSdb,
|
.table = tsVgroupSdb,
|
||||||
.pObj = pVgroup,
|
.pObj = pVgroup
|
||||||
.rowSize = tsVgUpdateSize
|
|
||||||
};
|
};
|
||||||
|
|
||||||
sdbUpdateRow(&oper);
|
sdbUpdateRow(&oper);
|
||||||
|
@ -379,6 +382,7 @@ int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
|
||||||
if (pShow->payloadLen > 0 ) {
|
if (pShow->payloadLen > 0 ) {
|
||||||
pTable = mgmtGetTable(pShow->payload);
|
pTable = mgmtGetTable(pShow->payload);
|
||||||
if (NULL == pTable || pTable->type == TSDB_SUPER_TABLE) {
|
if (NULL == pTable || pTable->type == TSDB_SUPER_TABLE) {
|
||||||
|
mgmtDecTableRef(pTable);
|
||||||
return TSDB_CODE_INVALID_TABLE_ID;
|
return TSDB_CODE_INVALID_TABLE_ID;
|
||||||
}
|
}
|
||||||
mgmtDecTableRef(pTable);
|
mgmtDecTableRef(pTable);
|
||||||
|
@ -736,7 +740,33 @@ static void mgmtProcessVnodeCfgMsg(SRpcMsg *rpcMsg) {
|
||||||
mgmtSendCreateVnodeMsg(pVgroup, &ipSet, NULL);
|
mgmtSendCreateVnodeMsg(pVgroup, &ipSet, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
void mgmtDropAllVgroups(SDbObj *pDropDb) {
|
void mgmtDropAllDnodeVgroups(SDnodeObj *pDropDnode) {
|
||||||
|
void * pNode = NULL;
|
||||||
|
void * pLastNode = NULL;
|
||||||
|
SVgObj *pVgroup = NULL;
|
||||||
|
int32_t numOfVgroups = 0;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
pLastNode = pNode;
|
||||||
|
pNode = mgmtGetNextVgroup(pNode, &pVgroup);
|
||||||
|
if (pVgroup == NULL) break;
|
||||||
|
|
||||||
|
if (pVgroup->vnodeGid[0].dnodeId == pDropDnode->dnodeId) {
|
||||||
|
SSdbOper oper = {
|
||||||
|
.type = SDB_OPER_LOCAL,
|
||||||
|
.table = tsVgroupSdb,
|
||||||
|
.pObj = pVgroup,
|
||||||
|
};
|
||||||
|
sdbDeleteRow(&oper);
|
||||||
|
pNode = pLastNode;
|
||||||
|
numOfVgroups++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
mgmtDecVgroupRef(pVgroup);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void mgmtDropAllDbVgroups(SDbObj *pDropDb) {
|
||||||
void *pNode = NULL;
|
void *pNode = NULL;
|
||||||
void *pLastNode = NULL;
|
void *pLastNode = NULL;
|
||||||
int32_t numOfVgroups = 0;
|
int32_t numOfVgroups = 0;
|
||||||
|
@ -744,7 +774,7 @@ void mgmtDropAllVgroups(SDbObj *pDropDb) {
|
||||||
|
|
||||||
mPrint("db:%s, all vgroups will be dropped from sdb", pDropDb->name);
|
mPrint("db:%s, all vgroups will be dropped from sdb", pDropDb->name);
|
||||||
while (1) {
|
while (1) {
|
||||||
pNode = sdbFetchRow(tsVgroupSdb, pNode, (void **)&pVgroup);
|
pNode = mgmtGetNextVgroup(pNode, &pVgroup);
|
||||||
if (pVgroup == NULL) break;
|
if (pVgroup == NULL) break;
|
||||||
|
|
||||||
if (pVgroup->pDb == pDropDb) {
|
if (pVgroup->pDb == pDropDb) {
|
||||||
|
|
Loading…
Reference in New Issue