[TD-10] drop stable message
This commit is contained in:
parent
63d2e696d2
commit
58118cfa39
|
@ -93,17 +93,14 @@ void dnodeWrite(SRpcMsg *pMsg) {
|
|||
char *pCont = (char *) pMsg->pCont;
|
||||
SRpcContext *pRpcContext = NULL;
|
||||
|
||||
int32_t numOfVnodes = 0;
|
||||
if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) {
|
||||
// TODO parse head, get number of vnodes;
|
||||
numOfVnodes = 1;
|
||||
} else {
|
||||
numOfVnodes = 1;
|
||||
}
|
||||
|
||||
if (numOfVnodes > 1) {
|
||||
pRpcContext = calloc(sizeof(SRpcContext), 1);
|
||||
pRpcContext->numOfVnodes = numOfVnodes;
|
||||
if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT || pMsg->msgType == TSDB_MSG_TYPE_MD_DROP_STABLE) {
|
||||
SWriteMsgDesc *pDesc = pCont;
|
||||
pDesc->numOfVnodes = htonl(pDesc->numOfVnodes);
|
||||
pCont += sizeof(SWriteMsgDesc);
|
||||
if (pDesc->numOfVnodes > 1) {
|
||||
pRpcContext = calloc(sizeof(SRpcContext), 1);
|
||||
pRpcContext->numOfVnodes = pDesc->numOfVnodes;
|
||||
}
|
||||
}
|
||||
|
||||
while (leftLen > 0) {
|
||||
|
@ -291,26 +288,9 @@ static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) {
|
|||
|
||||
static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) {
|
||||
SMDCreateTableMsg *pTable = pMsg->rpcMsg.pCont;
|
||||
dTrace("table:%s, start to create in dnode, vgroup:%d", pTable->tableId, pTable->vgId);
|
||||
|
||||
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
|
||||
void *pVnode = dnodeGetVnode(pTable->vgId);
|
||||
if (pVnode == NULL) {
|
||||
rpcRsp.code = TSDB_CODE_INVALID_VGROUP_ID;
|
||||
dTrace("table:%s, failed to create in vgroup:%d, reason:%s", pTable->tableId, pTable->vgId, tstrerror(rpcRsp.code));
|
||||
rpcSendResponse(&rpcRsp);
|
||||
return;
|
||||
}
|
||||
|
||||
void *pTsdb = dnodeGetVnodeTsdb(pVnode);
|
||||
if (pTsdb == NULL) {
|
||||
dnodeReleaseVnode(pVnode);
|
||||
rpcRsp.code = TSDB_CODE_NOT_ACTIVE_VNODE;
|
||||
dTrace("table:%s, failed to create in vgroup:%d, reason:%s", pTable->tableId, pTable->vgId, tstrerror(rpcRsp.code));
|
||||
rpcSendResponse(&rpcRsp);
|
||||
return;
|
||||
}
|
||||
|
||||
dTrace("table:%s, start to create in dnode, vgroup:%d", pTable->tableId, pTable->vgId);
|
||||
pTable->numOfColumns = htons(pTable->numOfColumns);
|
||||
pTable->numOfTags = htons(pTable->numOfTags);
|
||||
pTable->sid = htonl(pTable->sid);
|
||||
|
@ -344,7 +324,6 @@ static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) {
|
|||
}
|
||||
tsdbTableSetSchema(&tCfg, pDestTagSchema, false);
|
||||
|
||||
// TODO: add data row
|
||||
char *pTagData = pTable->data + totalCols * sizeof(SSchema);
|
||||
int accumBytes = 0;
|
||||
SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema);
|
||||
|
@ -356,50 +335,107 @@ static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) {
|
|||
tsdbTableSetTagValue(&tCfg, dataRow, false);
|
||||
}
|
||||
|
||||
rpcRsp.code = tsdbCreateTable(pTsdb, &tCfg);
|
||||
dnodeReleaseVnode(pVnode);
|
||||
void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode);
|
||||
|
||||
if (rpcRsp.code != TSDB_CODE_SUCCESS) {
|
||||
dError("table:%s, failed to create in vgroup:%d, reason:%s", pTable->tableId, pTable->vgId, tstrerror(rpcRsp.code));
|
||||
rpcSendResponse(&rpcRsp);
|
||||
} else {
|
||||
dTrace("table:%s, created in dnode", pTable->tableId);
|
||||
rpcSendResponse(&rpcRsp);
|
||||
}
|
||||
rpcRsp.code = tsdbCreateTable(pTsdb, &tCfg);
|
||||
dnodeReleaseVnode(pMsg->pVnode);
|
||||
|
||||
dTrace("table:%s, create table result:%s", pTable->tableId, tstrerror(rpcRsp.code));
|
||||
rpcSendResponse(&rpcRsp);
|
||||
}
|
||||
|
||||
static void dnodeProcessDropTableMsg(SWriteMsg *pMsg) {
|
||||
SMDDropTableMsg *pTable = pMsg->rpcMsg.pCont;
|
||||
dPrint("table:%s, sid:%d is dropped", pTable->tableId, pTable->sid);
|
||||
|
||||
// pTable->sid = htonl(pTable->sid);
|
||||
// pTable->numOfVPeers = htonl(pTable->numOfVPeers);
|
||||
// pTable->uid = htobe64(pTable->uid);
|
||||
//
|
||||
// for (int i = 0; i < pTable->numOfVPeers; ++i) {
|
||||
// pTable->vpeerDesc[i].ip = htonl(pTable->vpeerDesc[i].ip);
|
||||
// pTable->vpeerDesc[i].vnode = htonl(pTable->vpeerDesc[i].vnode);
|
||||
// }
|
||||
//
|
||||
// int32_t code = dnodeDropTable(pTable);
|
||||
//
|
||||
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
|
||||
|
||||
dTrace("table:%s, start to drop in dnode, vgroup:%d", pTable->tableId, pTable->vgId);
|
||||
STableId tableId = {
|
||||
.uid = htobe64(pTable->uid),
|
||||
.tid = htonl(pTable->sid)
|
||||
};
|
||||
|
||||
void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode);
|
||||
|
||||
rpcRsp.code = tsdbDropTable(pTsdb, tableId);
|
||||
dnodeReleaseVnode(pMsg->pVnode);
|
||||
|
||||
dTrace("table:%s, drop table result:%s", pTable->tableId, tstrerror(rpcRsp.code));
|
||||
rpcSendResponse(&rpcRsp);
|
||||
}
|
||||
|
||||
static void dnodeProcessAlterTableMsg(SWriteMsg *pMsg) {
|
||||
SMDCreateTableMsg *pTable = pMsg->rpcMsg.pCont;
|
||||
dPrint("table:%s, sid:%d is alterd", pTable->tableId, pTable->sid);
|
||||
|
||||
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
|
||||
|
||||
dTrace("table:%s, start to alter in dnode, vgroup:%d", pTable->tableId, pTable->vgId);
|
||||
pTable->numOfColumns = htons(pTable->numOfColumns);
|
||||
pTable->numOfTags = htons(pTable->numOfTags);
|
||||
pTable->sid = htonl(pTable->sid);
|
||||
pTable->sversion = htonl(pTable->sversion);
|
||||
pTable->tagDataLen = htonl(pTable->tagDataLen);
|
||||
pTable->sqlDataLen = htonl(pTable->sqlDataLen);
|
||||
pTable->uid = htobe64(pTable->uid);
|
||||
pTable->superTableUid = htobe64(pTable->superTableUid);
|
||||
pTable->createdTime = htobe64(pTable->createdTime);
|
||||
SSchema *pSchema = (SSchema *) pTable->data;
|
||||
|
||||
int totalCols = pTable->numOfColumns + pTable->numOfTags;
|
||||
for (int i = 0; i < totalCols; i++) {
|
||||
pSchema[i].colId = htons(pSchema[i].colId);
|
||||
pSchema[i].bytes = htons(pSchema[i].bytes);
|
||||
}
|
||||
|
||||
STableCfg tCfg;
|
||||
tsdbInitTableCfg(&tCfg, pTable->tableType, pTable->uid, pTable->sid);
|
||||
|
||||
STSchema *pDestSchema = tdNewSchema(pTable->numOfColumns);
|
||||
for (int i = 0; i < pTable->numOfColumns; i++) {
|
||||
tdSchemaAppendCol(pDestSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes);
|
||||
}
|
||||
tsdbTableSetSchema(&tCfg, pDestSchema, false);
|
||||
|
||||
if (pTable->numOfTags != 0) {
|
||||
STSchema *pDestTagSchema = tdNewSchema(pTable->numOfTags);
|
||||
for (int i = pTable->numOfColumns; i < totalCols; i++) {
|
||||
tdSchemaAppendCol(pDestTagSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes);
|
||||
}
|
||||
tsdbTableSetSchema(&tCfg, pDestTagSchema, false);
|
||||
|
||||
char *pTagData = pTable->data + totalCols * sizeof(SSchema);
|
||||
int accumBytes = 0;
|
||||
SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema);
|
||||
|
||||
for (int i = 0; i < pTable->numOfTags; i++) {
|
||||
tdAppendColVal(dataRow, pTagData + accumBytes, pDestTagSchema->columns + i);
|
||||
accumBytes += pSchema[i + pTable->numOfColumns].bytes;
|
||||
}
|
||||
tsdbTableSetTagValue(&tCfg, dataRow, false);
|
||||
}
|
||||
|
||||
void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode);
|
||||
|
||||
rpcRsp.code = tsdbAlterTable(pTsdb, &tCfg);
|
||||
dnodeReleaseVnode(pMsg->pVnode);
|
||||
|
||||
dTrace("table:%s, alter table result:%s", pTable->tableId, tstrerror(rpcRsp.code));
|
||||
rpcSendResponse(&rpcRsp);
|
||||
}
|
||||
|
||||
static void dnodeProcessDropStableMsg(SWriteMsg *pMsg) {
|
||||
SMDDropSTableMsg *pTable = pMsg->rpcMsg.pCont;
|
||||
dPrint("stable:%s, is dropped", pTable->tableId);
|
||||
|
||||
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
|
||||
|
||||
dTrace("stable:%s, start to drop in dnode, vgroup:%d", pTable->tableId, pTable->vgId);
|
||||
pTable->uid = htobe64(pTable->uid);
|
||||
|
||||
// TODO: drop stable in vvnode
|
||||
//void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode);
|
||||
//rpcRsp.code = tsdbDropSTable(pTsdb, pTable->uid);
|
||||
|
||||
rpcRsp.code = TSDB_CODE_SUCCESS;
|
||||
dnodeReleaseVnode(pMsg->pVnode);
|
||||
|
||||
dTrace("stable:%s, drop stable result:%s", pTable->tableId, tstrerror(rpcRsp.code));
|
||||
rpcSendResponse(&rpcRsp);
|
||||
}
|
||||
|
||||
|
|
|
@ -234,6 +234,10 @@ typedef struct {
|
|||
uint32_t ip;
|
||||
} SVnodeDesc;
|
||||
|
||||
typedef struct {
|
||||
int32_t numOfVnodes;
|
||||
} SWriteMsgDesc;
|
||||
|
||||
typedef struct {
|
||||
int32_t contLen;
|
||||
int32_t vgId;
|
||||
|
@ -341,8 +345,10 @@ typedef struct {
|
|||
} SMDDropTableMsg;
|
||||
|
||||
typedef struct {
|
||||
char tableId[TSDB_TABLE_ID_LEN + 1];
|
||||
int32_t contLen;
|
||||
int32_t vgId;
|
||||
int64_t uid;
|
||||
char tableId[TSDB_TABLE_ID_LEN + 1];
|
||||
} SMDDropSTableMsg;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -33,7 +33,7 @@ void * mgmtGetChildTable(char *tableId);
|
|||
void *mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid);
|
||||
void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pCreate, SChildTableObj *pTable);
|
||||
|
||||
int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable);
|
||||
int32_t mgmtDropChildTable(SChildTableObj *pTable);
|
||||
int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent);
|
||||
|
||||
int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMeta *pMeta, bool usePublicIp);
|
||||
|
|
|
@ -31,7 +31,7 @@ void * mgmtGetNormalTable(char *tableId);
|
|||
void * mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid);
|
||||
void * mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable);
|
||||
|
||||
int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable);
|
||||
int32_t mgmtDropNormalTable(SNormalTableObj *pTable);
|
||||
int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int32_t ncols);
|
||||
int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName);
|
||||
|
||||
|
|
|
@ -30,7 +30,7 @@ SVgObj *mgmtGetVgroup(int32_t vgId);
|
|||
SVgObj *mgmtGetVgroupByVnode(uint32_t dnode, int32_t vnode);
|
||||
|
||||
void mgmtCreateVgroup(SQueuedMsg *pMsg);
|
||||
int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup);
|
||||
int32_t mgmtDropVgroup(SVgObj *pVgroup);
|
||||
void mgmtUpdateVgroup(SVgObj *pVgroup);
|
||||
|
||||
void mgmtSetVgroupIdPool();
|
||||
|
|
|
@ -355,46 +355,38 @@ void* mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t
|
|||
return pTable;
|
||||
}
|
||||
|
||||
int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable) {
|
||||
int32_t mgmtDropChildTable(SChildTableObj *pTable) {
|
||||
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
|
||||
if (pVgroup == NULL) {
|
||||
mError("table:%s, failed to drop child table, vgroup not exist", pTable->tableId);
|
||||
return TSDB_CODE_OTHERS;
|
||||
}
|
||||
|
||||
SMDDropTableMsg *pRemove = rpcMallocCont(sizeof(SMDDropTableMsg));
|
||||
if (pRemove == NULL) {
|
||||
SMDDropTableMsg *pDrop = rpcMallocCont(sizeof(SMDDropTableMsg));
|
||||
if (pDrop == NULL) {
|
||||
mError("table:%s, failed to drop child table, no enough memory", pTable->tableId);
|
||||
return TSDB_CODE_SERV_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
strcpy(pRemove->tableId, pTable->tableId);
|
||||
pRemove->vgId = htonl(pTable->vgId);
|
||||
pRemove->contLen = htonl(sizeof(SMDDropTableMsg));
|
||||
pRemove->sid = htonl(pTable->sid);
|
||||
pRemove->uid = htobe64(pTable->uid);
|
||||
strcpy(pDrop->tableId, pTable->tableId);
|
||||
pDrop->vgId = htonl(pTable->vgId);
|
||||
pDrop->contLen = htonl(sizeof(SMDDropTableMsg));
|
||||
pDrop->sid = htonl(pTable->sid);
|
||||
pDrop->uid = htobe64(pTable->uid);
|
||||
|
||||
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
|
||||
|
||||
mTrace("table:%s, send drop table msg", pRemove->tableId);
|
||||
mTrace("table:%s, send drop table msg", pDrop->tableId);
|
||||
SRpcMsg rpcMsg = {
|
||||
.handle = 0,
|
||||
.pCont = pRemove,
|
||||
.pCont = pDrop,
|
||||
.contLen = sizeof(SMDDropTableMsg),
|
||||
.code = 0,
|
||||
.msgType = TSDB_MSG_TYPE_MD_DROP_TABLE
|
||||
};
|
||||
|
||||
mgmtSendMsgToDnode(&ipSet, &rpcMsg);
|
||||
|
||||
if (sdbDeleteRow(tsChildTableSdb, pTable) < 0) {
|
||||
mError("table:%s, update ctables sdb error", pTable->tableId);
|
||||
return TSDB_CODE_SDB_ERROR;
|
||||
}
|
||||
|
||||
if (pVgroup->numOfTables <= 0) {
|
||||
mgmtDropVgroup(pDb, pVgroup);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -368,7 +368,7 @@ static bool mgmtCheckDropDbFinished(SDbObj *pDb) {
|
|||
}
|
||||
|
||||
static void mgmtDropDbFromSdb(SDbObj *pDb) {
|
||||
while (pDb->pHead) mgmtDropVgroup(pDb, pDb->pHead);
|
||||
while (pDb->pHead) mgmtDropVgroup(pDb->pHead);
|
||||
|
||||
// SSuperTableObj *pMetric = pDb->pSTable;
|
||||
// while (pMetric) {
|
||||
|
|
|
@ -389,43 +389,36 @@ void *mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t
|
|||
return pTable;
|
||||
}
|
||||
|
||||
int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable) {
|
||||
int32_t mgmtDropNormalTable(SNormalTableObj *pTable) {
|
||||
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
|
||||
if (pVgroup == NULL) {
|
||||
mError("table:%s, failed to drop normal table, vgroup not exist", pTable->tableId);
|
||||
return TSDB_CODE_OTHERS;
|
||||
}
|
||||
|
||||
SMDDropTableMsg *pRemove = rpcMallocCont(sizeof(SMDDropTableMsg));
|
||||
if (pRemove == NULL) {
|
||||
SMDDropTableMsg *pDrop = rpcMallocCont(sizeof(SMDDropTableMsg));
|
||||
if (pDrop == NULL) {
|
||||
mError("table:%s, failed to drop normal table, no enough memory", pTable->tableId);
|
||||
return TSDB_CODE_SERV_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
strcpy(pRemove->tableId, pTable->tableId);
|
||||
pRemove->sid = htonl(pTable->sid);
|
||||
pRemove->uid = htobe64(pTable->uid);
|
||||
strcpy(pDrop->tableId, pTable->tableId);
|
||||
pDrop->contLen = htonl(sizeof(SMDDropTableMsg));
|
||||
pDrop->vgId = htonl(pVgroup->vgId);
|
||||
pDrop->sid = htonl(pTable->sid);
|
||||
pDrop->uid = htobe64(pTable->uid);
|
||||
|
||||
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
|
||||
mTrace("table:%s, send drop table msg", pRemove->tableId);
|
||||
mTrace("table:%s, send drop table msg", pDrop->tableId);
|
||||
SRpcMsg rpcMsg = {
|
||||
.handle = 0,
|
||||
.pCont = pRemove,
|
||||
.pCont = pDrop,
|
||||
.contLen = sizeof(SMDDropTableMsg),
|
||||
.code = 0,
|
||||
.msgType = TSDB_MSG_TYPE_MD_DROP_TABLE
|
||||
};
|
||||
|
||||
mgmtSendMsgToDnode(&ipSet, &rpcMsg);
|
||||
|
||||
if (sdbDeleteRow(tsNormalTableSdb, pTable) < 0) {
|
||||
mError("table:%s, update ntables sdb error", pTable->tableId);
|
||||
return TSDB_CODE_SDB_ERROR;
|
||||
}
|
||||
|
||||
if (pVgroup->numOfTables <= 0) {
|
||||
mgmtDropVgroup(pDb, pVgroup);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -250,11 +250,16 @@ int32_t mgmtCreateSuperTable(SCMCreateTableMsg *pCreate) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t mgmtDropSuperTable(SDbObj *pDb, SSuperTableObj *pSuperTable) {
|
||||
//TODO drop all child tables
|
||||
|
||||
mgmtRemoveSuperTableFromDb(pDb);
|
||||
return sdbDeleteRow(tsSuperTableSdb, pSuperTable);
|
||||
int32_t mgmtDropSuperTable(SDbObj *pDb, SSuperTableObj *pStable) {
|
||||
if (pStable->numOfTables != 0) {
|
||||
mError("stable:%s, numOfTables:%d not 0", pStable->tableId, pStable->numOfTables);
|
||||
return TSDB_CODE_OTHERS;
|
||||
} else {
|
||||
//TODO: drop child tables
|
||||
mError("stable:%s, is dropped from sdb", pStable->tableId);
|
||||
mgmtRemoveSuperTableFromDb(pDb);
|
||||
return TSDB_CODE_OTHERS;
|
||||
}
|
||||
}
|
||||
|
||||
void* mgmtGetSuperTable(char *tableId) {
|
||||
|
|
|
@ -51,6 +51,7 @@ static void mgmtProcessTableMetaMsg(SQueuedMsg *queueMsg);
|
|||
static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *queueMsg);
|
||||
static void mgmtProcessSuperTableMetaMsg(SQueuedMsg *queueMsg);
|
||||
static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg);
|
||||
static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg);
|
||||
static int32_t mgmtGetShowTableMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
|
||||
static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn);
|
||||
static void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle);
|
||||
|
@ -82,7 +83,7 @@ int32_t mgmtInitTables() {
|
|||
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_TABLE, mgmtGetShowTableMeta);
|
||||
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_TABLE, mgmtRetrieveShowTables);
|
||||
mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP, mgmtProcessCreateTableRsp);
|
||||
mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_TABLE_RSP, NULL);
|
||||
mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_TABLE_RSP, mgmtProcessDropTableRsp);
|
||||
mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP, NULL);
|
||||
mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_STABLE_RSP, NULL);
|
||||
|
||||
|
@ -439,19 +440,19 @@ void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) {
|
|||
return;
|
||||
}
|
||||
pMDCreate = mgmtBuildCreateChildTableMsg(pCreate, pTable);
|
||||
if (pCreate == NULL) {
|
||||
if (pMDCreate == NULL) {
|
||||
mgmtSendSimpleResp(pMsg->thandle, terrno);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
mTrace("table:%s, is a normal table, vgroup:%d sid:%d ahandle:%p", pCreate->tableId, pVgroup->vgId, sid, pMsg);
|
||||
code = mgmtCreateNormalTable(pCreate, pVgroup, sid);
|
||||
pTable = mgmtCreateNormalTable(pCreate, pVgroup, sid);
|
||||
if (pTable == NULL) {
|
||||
mgmtSendSimpleResp(pMsg->thandle, terrno);
|
||||
return;
|
||||
}
|
||||
pMDCreate = mgmtBuildCreateNormalTableMsg(pTable);
|
||||
if (pCreate == NULL) {
|
||||
if (pMDCreate == NULL) {
|
||||
mgmtSendSimpleResp(pMsg->thandle, terrno);
|
||||
return;
|
||||
}
|
||||
|
@ -509,31 +510,36 @@ void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) {
|
|||
}
|
||||
|
||||
if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) {
|
||||
mError("table:%s, failed to create table, in monitor database", pDrop->tableId);
|
||||
mError("table:%s, failed to drop table, in monitor database", pDrop->tableId);
|
||||
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MONITOR_DB_FORBIDDEN);
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t code;
|
||||
switch (pTable->type) {
|
||||
case TSDB_SUPER_TABLE:
|
||||
mTrace("table:%s, start to drop super table", pDrop->tableId);
|
||||
mgmtDropSuperTable(pDb, (SSuperTableObj *) pTable);
|
||||
code = mgmtDropSuperTable(pDb, (SSuperTableObj *) pTable);
|
||||
break;
|
||||
case TSDB_CHILD_TABLE:
|
||||
mTrace("table:%s, start to drop child table", pDrop->tableId);
|
||||
mgmtDropChildTable(pDb, (SChildTableObj *) pTable);
|
||||
code = mgmtDropChildTable((SChildTableObj *) pTable);
|
||||
break;
|
||||
case TSDB_NORMAL_TABLE:
|
||||
mTrace("table:%s, start to drop normal table", pDrop->tableId);
|
||||
mgmtDropNormalTable(pDb, (SNormalTableObj *) pTable);
|
||||
code = mgmtDropNormalTable((SNormalTableObj *) pTable);
|
||||
break;
|
||||
case TSDB_STREAM_TABLE:
|
||||
mTrace("table:%s, start to drop stream table", pDrop->tableId);
|
||||
mgmtDropNormalTable(pDb, (SNormalTableObj *) pTable);
|
||||
code = mgmtDropNormalTable((SNormalTableObj *) pTable);
|
||||
break;
|
||||
default:
|
||||
code = TSDB_CODE_INVALID_TABLE_TYPE;
|
||||
mError("table:%s, invalid table type:%d", pDrop->tableId, pTable->type);
|
||||
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE);
|
||||
}
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
mgmtSendSimpleResp(pMsg->thandle, code);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -778,3 +784,43 @@ static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg) {
|
|||
|
||||
free(queueMsg);
|
||||
}
|
||||
|
||||
static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) {
|
||||
if (rpcMsg->handle == NULL) return;
|
||||
|
||||
STableInfo *pTable = rpcMsg->handle;
|
||||
mTrace("table:%s, drop table rsp received, thandle:%p result:%s", pTable->tableId, rpcMsg->handle, tstrerror(rpcMsg->code));
|
||||
|
||||
if (rpcMsg->code != TSDB_CODE_SUCCESS) {
|
||||
mError("table:%s, failed to drop in dnode, reason:%s", pTable->tableId, tstrerror(rpcMsg->code));
|
||||
mgmtSendSimpleResp(rpcMsg->handle, rpcMsg->code);
|
||||
return;
|
||||
} else {
|
||||
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
|
||||
if (pVgroup == NULL) {
|
||||
mError("table:%s, failed to get vgroup", pTable->tableId);
|
||||
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_VGROUP_ID);
|
||||
return;
|
||||
}
|
||||
|
||||
if (pTable->type == TSDB_CHILD_TABLE) {
|
||||
if (sdbDeleteRow(tsChildTableSdb, pTable) < 0) {
|
||||
mError("table:%s, update ctables sdb error", pTable->tableId);
|
||||
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_SDB_ERROR);
|
||||
return;
|
||||
}
|
||||
} else if (pTable->type == TSDB_NORMAL_TABLE){
|
||||
if (sdbDeleteRow(tsNormalTableSdb, pTable) < 0) {
|
||||
mError("table:%s, update ntables sdb error", pTable->tableId);
|
||||
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_SDB_ERROR);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (pVgroup->numOfTables <= 0) {
|
||||
mgmtDropVgroup(pVgroup);
|
||||
}
|
||||
}
|
||||
|
||||
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
|
|
@ -185,7 +185,7 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg) {
|
|||
mgmtSendCreateVgroupMsg(pVgroup, pMsg);
|
||||
}
|
||||
|
||||
int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup) {
|
||||
int32_t mgmtDropVgroup(SVgObj *pVgroup) {
|
||||
STableInfo *pTable;
|
||||
|
||||
if (pVgroup->numOfTables > 0) {
|
||||
|
@ -197,7 +197,7 @@ int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup) {
|
|||
// }
|
||||
}
|
||||
|
||||
mTrace("vgroup:%d, db:%s replica:%d is deleted", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes);
|
||||
mTrace("vgroup:%d, replica:%d is deleted", pVgroup->vgId, pVgroup->numOfVnodes);
|
||||
|
||||
//mgmtSendDropVgroupMsg(pVgroup, NULL);
|
||||
|
||||
|
|
Loading…
Reference in New Issue