[TD-10] fix bugs while drop normal table
This commit is contained in:
parent
58118cfa39
commit
13b5cd4c3a
|
@ -425,7 +425,7 @@ static void dnodeProcessDropStableMsg(SWriteMsg *pMsg) {
|
||||||
SMDDropSTableMsg *pTable = pMsg->rpcMsg.pCont;
|
SMDDropSTableMsg *pTable = pMsg->rpcMsg.pCont;
|
||||||
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
|
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
|
||||||
|
|
||||||
dTrace("stable:%s, start to drop in dnode, vgroup:%d", pTable->tableId, pTable->vgId);
|
dTrace("stable:%s, start to it drop in dnode, vgroup:%d", pTable->tableId, pTable->vgId);
|
||||||
pTable->uid = htobe64(pTable->uid);
|
pTable->uid = htobe64(pTable->uid);
|
||||||
|
|
||||||
// TODO: drop stable in vvnode
|
// TODO: drop stable in vvnode
|
||||||
|
|
|
@ -77,6 +77,7 @@ typedef struct {
|
||||||
} SDnodeObj;
|
} SDnodeObj;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
int32_t dnodeId;
|
||||||
uint32_t ip;
|
uint32_t ip;
|
||||||
uint32_t publicIp;
|
uint32_t publicIp;
|
||||||
int32_t vnode;
|
int32_t vnode;
|
||||||
|
|
|
@ -352,8 +352,7 @@ typedef struct {
|
||||||
} SMDDropSTableMsg;
|
} SMDDropSTableMsg;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
int32_t vnode;
|
|
||||||
} SMDDropVnodeMsg;
|
} SMDDropVnodeMsg;
|
||||||
|
|
||||||
typedef struct SColIndexEx {
|
typedef struct SColIndexEx {
|
||||||
|
|
|
@ -33,7 +33,7 @@ void * mgmtGetChildTable(char *tableId);
|
||||||
void *mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid);
|
void *mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid);
|
||||||
void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pCreate, SChildTableObj *pTable);
|
void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pCreate, SChildTableObj *pTable);
|
||||||
|
|
||||||
int32_t mgmtDropChildTable(SChildTableObj *pTable);
|
int32_t mgmtDropChildTable(SQueuedMsg *newMsg, SChildTableObj *pTable);
|
||||||
int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent);
|
int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent);
|
||||||
|
|
||||||
int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMeta *pMeta, bool usePublicIp);
|
int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMeta *pMeta, bool usePublicIp);
|
||||||
|
|
|
@ -31,7 +31,7 @@ void * mgmtGetNormalTable(char *tableId);
|
||||||
void * mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid);
|
void * mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid);
|
||||||
void * mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable);
|
void * mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable);
|
||||||
|
|
||||||
int32_t mgmtDropNormalTable(SNormalTableObj *pTable);
|
int32_t mgmtDropNormalTable(SQueuedMsg *newMsg, SNormalTableObj *pTable);
|
||||||
int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int32_t ncols);
|
int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int32_t ncols);
|
||||||
int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName);
|
int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName);
|
||||||
|
|
||||||
|
|
|
@ -32,7 +32,7 @@ void mgmtCleanUpSuperTables();
|
||||||
void * mgmtGetSuperTable(char *tableId);
|
void * mgmtGetSuperTable(char *tableId);
|
||||||
|
|
||||||
int32_t mgmtCreateSuperTable(SCMCreateTableMsg *pCreate);
|
int32_t mgmtCreateSuperTable(SCMCreateTableMsg *pCreate);
|
||||||
int32_t mgmtDropSuperTable(SDbObj *pDb, SSuperTableObj *pTable);
|
int32_t mgmtDropSuperTable(SQueuedMsg *newMsg, SDbObj *pDb, SSuperTableObj *pTable);
|
||||||
int32_t mgmtAddSuperTableTag(SSuperTableObj *pTable, SSchema schema[], int32_t ntags);
|
int32_t mgmtAddSuperTableTag(SSuperTableObj *pTable, SSchema schema[], int32_t ntags);
|
||||||
int32_t mgmtDropSuperTableTag(SSuperTableObj *pTable, char *tagName);
|
int32_t mgmtDropSuperTableTag(SSuperTableObj *pTable, char *tagName);
|
||||||
int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pTable, char *oldTagName, char *newTagName);
|
int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pTable, char *oldTagName, char *newTagName);
|
||||||
|
|
|
@ -41,7 +41,6 @@ void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, STableInfo *pTable);
|
||||||
|
|
||||||
SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode);
|
SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode);
|
||||||
void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *ahandle);
|
void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *ahandle);
|
||||||
void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle);
|
|
||||||
|
|
||||||
SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup);
|
SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup);
|
||||||
SRpcIpSet mgmtGetIpSetFromIp(uint32_t ip);
|
SRpcIpSet mgmtGetIpSetFromIp(uint32_t ip);
|
||||||
|
|
|
@ -337,11 +337,11 @@ void* mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t
|
||||||
strcpy(pTable->tableId, pCreate->tableId);
|
strcpy(pTable->tableId, pCreate->tableId);
|
||||||
strcpy(pTable->superTableId, pSuperTable->tableId);
|
strcpy(pTable->superTableId, pSuperTable->tableId);
|
||||||
pTable->type = TSDB_CHILD_TABLE;
|
pTable->type = TSDB_CHILD_TABLE;
|
||||||
|
pTable->createdTime = taosGetTimestampMs();
|
||||||
pTable->uid = (((uint64_t) pTable->vgId) << 40) + ((((uint64_t) pTable->sid) & ((1ul << 24) - 1ul)) << 16) +
|
pTable->uid = (((uint64_t) pTable->vgId) << 40) + ((((uint64_t) pTable->sid) & ((1ul << 24) - 1ul)) << 16) +
|
||||||
((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul));
|
((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul));
|
||||||
pTable->sid = tid;
|
pTable->sid = tid;
|
||||||
pTable->vgId = pVgroup->vgId;
|
pTable->vgId = pVgroup->vgId;
|
||||||
pTable->createdTime = taosGetTimestampMs();
|
|
||||||
pTable->superTable = pSuperTable;
|
pTable->superTable = pSuperTable;
|
||||||
|
|
||||||
if (sdbInsertRow(tsChildTableSdb, pTable, 0) < 0) {
|
if (sdbInsertRow(tsChildTableSdb, pTable, 0) < 0) {
|
||||||
|
@ -355,7 +355,7 @@ void* mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t
|
||||||
return pTable;
|
return pTable;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mgmtDropChildTable(SChildTableObj *pTable) {
|
int32_t mgmtDropChildTable(SQueuedMsg *newMsg, SChildTableObj *pTable) {
|
||||||
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
|
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
|
||||||
if (pVgroup == NULL) {
|
if (pVgroup == NULL) {
|
||||||
mError("table:%s, failed to drop child table, vgroup not exist", pTable->tableId);
|
mError("table:%s, failed to drop child table, vgroup not exist", pTable->tableId);
|
||||||
|
@ -378,13 +378,14 @@ int32_t mgmtDropChildTable(SChildTableObj *pTable) {
|
||||||
|
|
||||||
mTrace("table:%s, send drop table msg", pDrop->tableId);
|
mTrace("table:%s, send drop table msg", pDrop->tableId);
|
||||||
SRpcMsg rpcMsg = {
|
SRpcMsg rpcMsg = {
|
||||||
.handle = 0,
|
.handle = newMsg,
|
||||||
.pCont = pDrop,
|
.pCont = pDrop,
|
||||||
.contLen = sizeof(SMDDropTableMsg),
|
.contLen = sizeof(SMDDropTableMsg),
|
||||||
.code = 0,
|
.code = 0,
|
||||||
.msgType = TSDB_MSG_TYPE_MD_DROP_TABLE
|
.msgType = TSDB_MSG_TYPE_MD_DROP_TABLE
|
||||||
};
|
};
|
||||||
|
|
||||||
|
newMsg->ahandle = pTable;
|
||||||
mgmtSendMsgToDnode(&ipSet, &rpcMsg);
|
mgmtSendMsgToDnode(&ipSet, &rpcMsg);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -962,7 +962,6 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) {
|
||||||
|
|
||||||
int32_t code;
|
int32_t code;
|
||||||
if (pMsg->pUser->superAuth) {
|
if (pMsg->pUser->superAuth) {
|
||||||
code = TSDB_CODE_OPS_NOT_SUPPORT;
|
|
||||||
//SCMDropDbMsg *pDrop = rpcMsg->pCont;
|
//SCMDropDbMsg *pDrop = rpcMsg->pCont;
|
||||||
//rpcRsp.code = mgmtDropDbByName(pUser->pAcct, pDrop->db, pDrop->ignoreNotExists);
|
//rpcRsp.code = mgmtDropDbByName(pUser->pAcct, pDrop->db, pDrop->ignoreNotExists);
|
||||||
//if (rpcRsp.code == TSDB_CODE_SUCCESS) {
|
//if (rpcRsp.code == TSDB_CODE_SUCCESS) {
|
||||||
|
|
|
@ -341,9 +341,9 @@ void *mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t
|
||||||
strcpy(pTable->tableId, pCreate->tableId);
|
strcpy(pTable->tableId, pCreate->tableId);
|
||||||
pTable->type = TSDB_NORMAL_TABLE;
|
pTable->type = TSDB_NORMAL_TABLE;
|
||||||
pTable->vgId = pVgroup->vgId;
|
pTable->vgId = pVgroup->vgId;
|
||||||
|
pTable->createdTime = taosGetTimestampMs();
|
||||||
pTable->uid = (((uint64_t) pTable->createdTime) << 16) + ((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul));
|
pTable->uid = (((uint64_t) pTable->createdTime) << 16) + ((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul));
|
||||||
pTable->sid = sid;
|
pTable->sid = sid;
|
||||||
pTable->createdTime = taosGetTimestampMs();
|
|
||||||
pTable->sversion = 0;
|
pTable->sversion = 0;
|
||||||
pTable->numOfColumns = htons(pCreate->numOfColumns);
|
pTable->numOfColumns = htons(pCreate->numOfColumns);
|
||||||
pTable->sqlLen = htons(pCreate->sqlLen);
|
pTable->sqlLen = htons(pCreate->sqlLen);
|
||||||
|
@ -389,7 +389,7 @@ void *mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t
|
||||||
return pTable;
|
return pTable;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mgmtDropNormalTable(SNormalTableObj *pTable) {
|
int32_t mgmtDropNormalTable(SQueuedMsg *newMsg, SNormalTableObj *pTable) {
|
||||||
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
|
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
|
||||||
if (pVgroup == NULL) {
|
if (pVgroup == NULL) {
|
||||||
mError("table:%s, failed to drop normal table, vgroup not exist", pTable->tableId);
|
mError("table:%s, failed to drop normal table, vgroup not exist", pTable->tableId);
|
||||||
|
@ -411,13 +411,14 @@ int32_t mgmtDropNormalTable(SNormalTableObj *pTable) {
|
||||||
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
|
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
|
||||||
mTrace("table:%s, send drop table msg", pDrop->tableId);
|
mTrace("table:%s, send drop table msg", pDrop->tableId);
|
||||||
SRpcMsg rpcMsg = {
|
SRpcMsg rpcMsg = {
|
||||||
.handle = 0,
|
.handle = newMsg,
|
||||||
.pCont = pDrop,
|
.pCont = pDrop,
|
||||||
.contLen = sizeof(SMDDropTableMsg),
|
.contLen = sizeof(SMDDropTableMsg),
|
||||||
.code = 0,
|
.code = 0,
|
||||||
.msgType = TSDB_MSG_TYPE_MD_DROP_TABLE
|
.msgType = TSDB_MSG_TYPE_MD_DROP_TABLE
|
||||||
};
|
};
|
||||||
|
|
||||||
|
newMsg->ahandle = pTable;
|
||||||
mgmtSendMsgToDnode(&ipSet, &rpcMsg);
|
mgmtSendMsgToDnode(&ipSet, &rpcMsg);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -250,7 +250,7 @@ int32_t mgmtCreateSuperTable(SCMCreateTableMsg *pCreate) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mgmtDropSuperTable(SDbObj *pDb, SSuperTableObj *pStable) {
|
int32_t mgmtDropSuperTable(SQueuedMsg *newMsg, SDbObj *pDb, SSuperTableObj *pStable) {
|
||||||
if (pStable->numOfTables != 0) {
|
if (pStable->numOfTables != 0) {
|
||||||
mError("stable:%s, numOfTables:%d not 0", pStable->tableId, pStable->numOfTables);
|
mError("stable:%s, numOfTables:%d not 0", pStable->tableId, pStable->numOfTables);
|
||||||
return TSDB_CODE_OTHERS;
|
return TSDB_CODE_OTHERS;
|
||||||
|
|
|
@ -515,23 +515,27 @@ void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SQueuedMsg *newMsg = malloc(sizeof(SQueuedMsg));
|
||||||
|
memcpy(newMsg, pMsg, sizeof(SQueuedMsg));
|
||||||
|
pMsg->pCont = NULL;
|
||||||
int32_t code;
|
int32_t code;
|
||||||
|
|
||||||
switch (pTable->type) {
|
switch (pTable->type) {
|
||||||
case TSDB_SUPER_TABLE:
|
case TSDB_SUPER_TABLE:
|
||||||
mTrace("table:%s, start to drop super table", pDrop->tableId);
|
mTrace("table:%s, start to drop super table", pDrop->tableId);
|
||||||
code = mgmtDropSuperTable(pDb, (SSuperTableObj *) pTable);
|
code = mgmtDropSuperTable(newMsg, pDb, (SSuperTableObj *) pTable);
|
||||||
break;
|
break;
|
||||||
case TSDB_CHILD_TABLE:
|
case TSDB_CHILD_TABLE:
|
||||||
mTrace("table:%s, start to drop child table", pDrop->tableId);
|
mTrace("table:%s, start to drop child table", pDrop->tableId);
|
||||||
code = mgmtDropChildTable((SChildTableObj *) pTable);
|
code = mgmtDropChildTable(newMsg, (SChildTableObj *) pTable);
|
||||||
break;
|
break;
|
||||||
case TSDB_NORMAL_TABLE:
|
case TSDB_NORMAL_TABLE:
|
||||||
mTrace("table:%s, start to drop normal table", pDrop->tableId);
|
mTrace("table:%s, start to drop normal table", pDrop->tableId);
|
||||||
code = mgmtDropNormalTable((SNormalTableObj *) pTable);
|
code = mgmtDropNormalTable(newMsg, (SNormalTableObj *) pTable);
|
||||||
break;
|
break;
|
||||||
case TSDB_STREAM_TABLE:
|
case TSDB_STREAM_TABLE:
|
||||||
mTrace("table:%s, start to drop stream table", pDrop->tableId);
|
mTrace("table:%s, start to drop stream table", pDrop->tableId);
|
||||||
code = mgmtDropNormalTable((SNormalTableObj *) pTable);
|
code = mgmtDropNormalTable(newMsg, (SNormalTableObj *) pTable);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
code = TSDB_CODE_INVALID_TABLE_TYPE;
|
code = TSDB_CODE_INVALID_TABLE_TYPE;
|
||||||
|
@ -539,6 +543,7 @@ void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
free(newMsg);
|
||||||
mgmtSendSimpleResp(pMsg->thandle, code);
|
mgmtSendSimpleResp(pMsg->thandle, code);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -788,39 +793,48 @@ static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg) {
|
||||||
static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) {
|
static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) {
|
||||||
if (rpcMsg->handle == NULL) return;
|
if (rpcMsg->handle == NULL) return;
|
||||||
|
|
||||||
STableInfo *pTable = rpcMsg->handle;
|
SQueuedMsg *queueMsg = rpcMsg->handle;
|
||||||
mTrace("table:%s, drop table rsp received, thandle:%p result:%s", pTable->tableId, rpcMsg->handle, tstrerror(rpcMsg->code));
|
queueMsg->received++;
|
||||||
|
|
||||||
|
STableInfo *pTable = queueMsg->ahandle;
|
||||||
|
mTrace("table:%s, drop table rsp received, thandle:%p result:%s", pTable->tableId, queueMsg->thandle, tstrerror(rpcMsg->code));
|
||||||
|
|
||||||
if (rpcMsg->code != TSDB_CODE_SUCCESS) {
|
if (rpcMsg->code != TSDB_CODE_SUCCESS) {
|
||||||
mError("table:%s, failed to drop in dnode, reason:%s", pTable->tableId, tstrerror(rpcMsg->code));
|
mError("table:%s, failed to drop in dnode, reason:%s", pTable->tableId, tstrerror(rpcMsg->code));
|
||||||
mgmtSendSimpleResp(rpcMsg->handle, rpcMsg->code);
|
mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code);
|
||||||
|
free(queueMsg);
|
||||||
return;
|
return;
|
||||||
} else {
|
}
|
||||||
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
|
|
||||||
if (pVgroup == NULL) {
|
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
|
||||||
mError("table:%s, failed to get vgroup", pTable->tableId);
|
if (pVgroup == NULL) {
|
||||||
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_VGROUP_ID);
|
mError("table:%s, failed to get vgroup", pTable->tableId);
|
||||||
|
mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_INVALID_VGROUP_ID);
|
||||||
|
free(queueMsg);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pTable->type == TSDB_CHILD_TABLE) {
|
||||||
|
if (sdbDeleteRow(tsChildTableSdb, pTable) < 0) {
|
||||||
|
mError("table:%s, update ctables sdb error", pTable->tableId);
|
||||||
|
mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SDB_ERROR);
|
||||||
|
free(queueMsg);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
} else if (pTable->type == TSDB_NORMAL_TABLE){
|
||||||
if (pTable->type == TSDB_CHILD_TABLE) {
|
if (sdbDeleteRow(tsNormalTableSdb, pTable) < 0) {
|
||||||
if (sdbDeleteRow(tsChildTableSdb, pTable) < 0) {
|
mError("table:%s, update ntables sdb error", pTable->tableId);
|
||||||
mError("table:%s, update ctables sdb error", pTable->tableId);
|
mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SDB_ERROR);
|
||||||
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_SDB_ERROR);
|
free(queueMsg);
|
||||||
return;
|
return;
|
||||||
}
|
|
||||||
} else if (pTable->type == TSDB_NORMAL_TABLE){
|
|
||||||
if (sdbDeleteRow(tsNormalTableSdb, pTable) < 0) {
|
|
||||||
mError("table:%s, update ntables sdb error", pTable->tableId);
|
|
||||||
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_SDB_ERROR);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pVgroup->numOfTables <= 0) {
|
|
||||||
mgmtDropVgroup(pVgroup);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_SUCCESS);
|
if (pVgroup->numOfTables <= 0) {
|
||||||
|
mPrint("vgroup:%d, all tables is dropped, drop vgroup", pVgroup->vgId);
|
||||||
|
mgmtDropVgroup(pVgroup);
|
||||||
|
}
|
||||||
|
|
||||||
|
mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SUCCESS);
|
||||||
|
free(queueMsg);
|
||||||
}
|
}
|
||||||
|
|
|
@ -44,8 +44,10 @@ static void *mgmtVgroupActionDestroy(void *row, char *str, int32_t size, int32_t
|
||||||
static int32_t mgmtGetVgroupMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
|
static int32_t mgmtGetVgroupMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
|
||||||
static int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn);
|
static int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn);
|
||||||
static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg);
|
static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg);
|
||||||
|
static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg);
|
||||||
|
|
||||||
void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle);
|
static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle);
|
||||||
|
static void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle);
|
||||||
|
|
||||||
static void mgmtVgroupActionInit() {
|
static void mgmtVgroupActionInit() {
|
||||||
SVgObj tObj;
|
SVgObj tObj;
|
||||||
|
@ -119,6 +121,7 @@ int32_t mgmtInitVgroups() {
|
||||||
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_VGROUP, mgmtGetVgroupMeta);
|
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_VGROUP, mgmtGetVgroupMeta);
|
||||||
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_VGROUP, mgmtRetrieveVgroups);
|
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_VGROUP, mgmtRetrieveVgroups);
|
||||||
mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mgmtProcessCreateVnodeRsp);
|
mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mgmtProcessCreateVnodeRsp);
|
||||||
|
mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_VNODE_RSP, mgmtProcessDropVnodeRsp);
|
||||||
|
|
||||||
mTrace("vgroup is initialized");
|
mTrace("vgroup is initialized");
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -186,7 +189,7 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mgmtDropVgroup(SVgObj *pVgroup) {
|
int32_t mgmtDropVgroup(SVgObj *pVgroup) {
|
||||||
STableInfo *pTable;
|
// STableInfo *pTable;
|
||||||
|
|
||||||
if (pVgroup->numOfTables > 0) {
|
if (pVgroup->numOfTables > 0) {
|
||||||
// for (int32_t i = 0; i < pDb->cfg.maxSessions; ++i) {
|
// for (int32_t i = 0; i < pDb->cfg.maxSessions; ++i) {
|
||||||
|
@ -197,12 +200,9 @@ int32_t mgmtDropVgroup(SVgObj *pVgroup) {
|
||||||
// }
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
mTrace("vgroup:%d, replica:%d is deleted", pVgroup->vgId, pVgroup->numOfVnodes);
|
mTrace("vgroup:%d, replica:%d is deleting from sdb", pVgroup->vgId, pVgroup->numOfVnodes);
|
||||||
|
mgmtSendDropVgroupMsg(pVgroup, NULL);
|
||||||
//mgmtSendDropVgroupMsg(pVgroup, NULL);
|
|
||||||
|
|
||||||
sdbDeleteRow(tsVgroupSdb, pVgroup);
|
sdbDeleteRow(tsVgroupSdb, pVgroup);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -633,4 +633,37 @@ static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
free(queueMsg);
|
free(queueMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
static SMDDropVnodeMsg *mgmtBuildDropVnodeMsg(SVgObj *pVgroup) {
|
||||||
|
SMDDropVnodeMsg *pDrop = rpcMallocCont(sizeof(SMDDropVnodeMsg));
|
||||||
|
if (pDrop == NULL) return NULL;
|
||||||
|
|
||||||
|
pDrop->vgId = htonl(pVgroup->vgId);
|
||||||
|
return pDrop;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void mgmtSendDropVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) {
|
||||||
|
mTrace("vgroup:%d, send drop vnode msg, ahandle:%p", pVgroup->vgId, ahandle);
|
||||||
|
SMDDropVnodeMsg *pDrop = mgmtBuildDropVnodeMsg(pVgroup);
|
||||||
|
SRpcMsg rpcMsg = {
|
||||||
|
.handle = ahandle,
|
||||||
|
.pCont = pDrop,
|
||||||
|
.contLen = pDrop ? sizeof(SMDDropVnodeMsg) : 0,
|
||||||
|
.code = 0,
|
||||||
|
.msgType = TSDB_MSG_TYPE_MD_DROP_VNODE
|
||||||
|
};
|
||||||
|
mgmtSendMsgToDnode(ipSet, &rpcMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) {
|
||||||
|
mTrace("vgroup:%d, send drop all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle);
|
||||||
|
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
|
||||||
|
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip);
|
||||||
|
mgmtSendDropVnodeMsg(pVgroup, &ipSet, ahandle);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) {
|
||||||
|
mTrace("drop vnode msg is received");
|
||||||
}
|
}
|
|
@ -15,5 +15,5 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
|
||||||
TARGET_LINK_LIBRARIES(tsdb common tutil)
|
TARGET_LINK_LIBRARIES(tsdb common tutil)
|
||||||
|
|
||||||
# Someone has no gtest directory, so comment it
|
# Someone has no gtest directory, so comment it
|
||||||
ADD_SUBDIRECTORY(tests)
|
# ADD_SUBDIRECTORY(tests)
|
||||||
ENDIF ()
|
ENDIF ()
|
||||||
|
|
Loading…
Reference in New Issue