message for drop table
This commit is contained in:
parent
a05776b0f7
commit
683ef75fda
|
@ -43,7 +43,7 @@ int32_t dnodeCreateTable(SDCreateTableMsg *pTable);
|
||||||
/*
|
/*
|
||||||
* Remove table from local repository
|
* Remove table from local repository
|
||||||
*/
|
*/
|
||||||
int32_t dnodeDropTable(int32_t vnode, int32_t sid, uint64_t uid);
|
int32_t dnodeDropTable(SDRemoveTableMsg *pTable);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Create stream
|
* Create stream
|
||||||
|
|
|
@ -33,7 +33,7 @@ void (*dnodeProcessStatusRspFp)(int8_t *pCont, int32_t contLen, int8_t msgTyp
|
||||||
void (*dnodeSendMsgToMnodeFp)(int8_t msgType, void *pCont, int32_t contLen) = NULL;
|
void (*dnodeSendMsgToMnodeFp)(int8_t msgType, void *pCont, int32_t contLen) = NULL;
|
||||||
void (*dnodeSendRspToMnodeFp)(void *handle, int32_t code, void *pCont, int contLen) = NULL;
|
void (*dnodeSendRspToMnodeFp)(void *handle, int32_t code, void *pCont, int contLen) = NULL;
|
||||||
|
|
||||||
static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn);
|
static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(void *pCont, int32_t contLen, int8_t msgType, void *pConn);
|
||||||
static void dnodeInitProcessShellMsg();
|
static void dnodeInitProcessShellMsg();
|
||||||
|
|
||||||
static void dnodeSendMsgToMnodeQueueFp(SSchedMsg *sched) {
|
static void dnodeSendMsgToMnodeQueueFp(SSchedMsg *sched) {
|
||||||
|
@ -121,26 +121,25 @@ void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void
|
||||||
//rpcFreeCont(pCont);
|
//rpcFreeCont(pCont);
|
||||||
}
|
}
|
||||||
|
|
||||||
void dnodeProcessTableCfgRsp(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) {
|
void dnodeProcessTableCfgRsp(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
|
||||||
int32_t code = htonl(*((int32_t *) pCont));
|
int32_t code = htonl(*((int32_t *) pCont));
|
||||||
|
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
SDCreateTableMsg *table = (SDCreateTableMsg *) (pCont + sizeof(int32_t));
|
SDCreateTableMsg *table = (SDCreateTableMsg *) (pCont + sizeof(int32_t));
|
||||||
dnodeCreateTable(table);
|
dnodeCreateTable(table);
|
||||||
} else if (code == TSDB_CODE_INVALID_TABLE_ID) {
|
} else if (code == TSDB_CODE_INVALID_TABLE_ID) {
|
||||||
SDRemoveTableMsg *table = (SDRemoveTableMsg *) (pCont + sizeof(int32_t));
|
SDRemoveTableMsg *pTable = (SDRemoveTableMsg *) (pCont + sizeof(int32_t));
|
||||||
int32_t vnode = htonl(table->vnode);
|
pTable->sid = htonl(pTable->sid);
|
||||||
int32_t sid = htonl(table->sid);
|
pTable->uid = htobe64(pTable->uid);
|
||||||
uint64_t uid = htobe64(table->uid);
|
dError("table:%s, sid:%d table is not configured, remove it", pTable->tableId, pTable->sid);
|
||||||
dError("vnode:%d, sid:%d table is not configured, remove it", vnode, sid);
|
dnodeDropTable(pTable);
|
||||||
dnodeDropTable(vnode, sid, uid);
|
|
||||||
} else {
|
} else {
|
||||||
dError("code:%d invalid message", code);
|
dError("code:%d invalid message", code);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void dnodeProcessCreateTableRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) {
|
void dnodeProcessCreateTableRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
|
||||||
SDCreateTableMsg *pTable = (SDCreateTableMsg *) pCont;
|
SDCreateTableMsg *pTable = pCont;
|
||||||
pTable->numOfColumns = htons(pTable->numOfColumns);
|
pTable->numOfColumns = htons(pTable->numOfColumns);
|
||||||
pTable->numOfTags = htons(pTable->numOfTags);
|
pTable->numOfTags = htons(pTable->numOfTags);
|
||||||
pTable->sid = htonl(pTable->sid);
|
pTable->sid = htonl(pTable->sid);
|
||||||
|
@ -170,24 +169,28 @@ void dnodeProcessCreateTableRequest(int8_t *pCont, int32_t contLen, int8_t msgTy
|
||||||
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
|
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
void dnodeProcessAlterStreamRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) {
|
void dnodeProcessAlterStreamRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
|
||||||
SAlterStreamMsg *stream = (SAlterStreamMsg *) pCont;
|
SAlterStreamMsg *stream = (SAlterStreamMsg *) pCont;
|
||||||
int32_t code = dnodeCreateStream(stream);
|
int32_t code = dnodeCreateStream(stream);
|
||||||
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
|
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
void dnodeProcessRemoveTableRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) {
|
void dnodeProcessRemoveTableRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
|
||||||
SDRemoveTableMsg *table = (SDRemoveTableMsg *) pCont;
|
SDRemoveTableMsg *pTable = pCont;
|
||||||
int32_t vnode = htonl(table->vnode);
|
pTable->sid = htonl(pTable->sid);
|
||||||
int32_t sid = htonl(table->sid);
|
pTable->numOfVPeers = htonl(pTable->numOfVPeers);
|
||||||
uint64_t uid = htobe64(table->uid);
|
pTable->uid = htobe64(pTable->uid);
|
||||||
|
|
||||||
dPrint("vnode:%d, sid:%d table is not configured, remove it", vnode, sid);
|
for (int i = 0; i < pTable->numOfVPeers; ++i) {
|
||||||
int32_t code = dnodeDropTable(vnode, sid, uid);
|
pTable->vpeerDesc[i].ip = htonl(pTable->vpeerDesc[i].ip);
|
||||||
|
pTable->vpeerDesc[i].vnode = htonl(pTable->vpeerDesc[i].vnode);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = dnodeDropTable(pTable);
|
||||||
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
|
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
void dnodeProcessVPeerCfgRsp(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) {
|
void dnodeProcessVPeerCfgRsp(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
|
||||||
int32_t code = htonl(*((int32_t *) pCont));
|
int32_t code = htonl(*((int32_t *) pCont));
|
||||||
|
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
@ -204,7 +207,7 @@ void dnodeProcessVPeerCfgRsp(int8_t *pCont, int32_t contLen, int8_t msgType, voi
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void dnodeProcessVPeersMsg(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) {
|
void dnodeProcessVPeersMsg(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
|
||||||
SVPeersMsg *vpeer = (SVPeersMsg *) pCont;
|
SVPeersMsg *vpeer = (SVPeersMsg *) pCont;
|
||||||
int32_t vnode = htonl(vpeer->vnode);
|
int32_t vnode = htonl(vpeer->vnode);
|
||||||
|
|
||||||
|
@ -214,7 +217,7 @@ void dnodeProcessVPeersMsg(int8_t *pCont, int32_t contLen, int8_t msgType, void
|
||||||
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
|
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
void dnodeProcessFreeVnodeRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) {
|
void dnodeProcessFreeVnodeRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
|
||||||
SFreeVnodeMsg *vpeer = (SFreeVnodeMsg *) pCont;
|
SFreeVnodeMsg *vpeer = (SFreeVnodeMsg *) pCont;
|
||||||
int32_t vnode = htonl(vpeer->vnode);
|
int32_t vnode = htonl(vpeer->vnode);
|
||||||
|
|
||||||
|
@ -224,7 +227,7 @@ void dnodeProcessFreeVnodeRequest(int8_t *pCont, int32_t contLen, int8_t msgType
|
||||||
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
|
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
void dnodeProcessDnodeCfgRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) {
|
void dnodeProcessDnodeCfgRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
|
||||||
SCfgDnodeMsg *pCfg = (SCfgDnodeMsg *)pCont;
|
SCfgDnodeMsg *pCfg = (SCfgDnodeMsg *)pCont;
|
||||||
int32_t code = tsCfgDynamicOptions(pCfg->config);
|
int32_t code = tsCfgDynamicOptions(pCfg->config);
|
||||||
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
|
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
|
||||||
|
|
|
@ -69,7 +69,8 @@ int32_t dnodeCreateTable(SDCreateTableMsg *pTable) {
|
||||||
/*
|
/*
|
||||||
* Remove table from local repository
|
* Remove table from local repository
|
||||||
*/
|
*/
|
||||||
int32_t dnodeDropTable(int32_t vnode, int32_t sid, uint64_t uid) {
|
int32_t dnodeDropTable(SDRemoveTableMsg *pTable) {
|
||||||
|
dPrint("table:%s, sid:%d will be removed", pTable->tableId, pTable->sid);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -329,10 +329,11 @@ typedef struct {
|
||||||
} SMgmtHead;
|
} SMgmtHead;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
short vnode;
|
int32_t sid;
|
||||||
int32_t sid;
|
int32_t numOfVPeers;
|
||||||
uint64_t uid;
|
uint64_t uid;
|
||||||
char tableId[TSDB_TABLE_ID_LEN + 1];
|
SVPeerDesc vpeerDesc[TSDB_MAX_MPEERS];
|
||||||
|
char tableId[TSDB_TABLE_ID_LEN + 1];
|
||||||
} SDRemoveTableMsg;
|
} SDRemoveTableMsg;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -28,13 +28,11 @@ extern "C" {
|
||||||
|
|
||||||
int32_t mgmtInitChildTables();
|
int32_t mgmtInitChildTables();
|
||||||
void mgmtCleanUpChildTables();
|
void mgmtCleanUpChildTables();
|
||||||
|
|
||||||
void * mgmtGetChildTable(char *tableId);
|
void * mgmtGetChildTable(char *tableId);
|
||||||
|
|
||||||
int32_t mgmtCreateChildTable(SCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid,
|
int32_t mgmtCreateChildTable(SCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid,
|
||||||
SDCreateTableMsg **pDCreateOut, STableInfo **pTableOut);
|
SDCreateTableMsg **pDCreateOut, STableInfo **pTableOut);
|
||||||
int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable);
|
int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable);
|
||||||
int32_t mgmtAlterChildTable(SDbObj *pDb, SAlterTableMsg *pAlter);
|
|
||||||
int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent);
|
int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent);
|
||||||
|
|
||||||
int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMeta *pMeta, bool usePublicIp);
|
int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMeta *pMeta, bool usePublicIp);
|
||||||
|
|
|
@ -27,10 +27,10 @@ extern "C" {
|
||||||
extern void *mgmtStatusTimer;
|
extern void *mgmtStatusTimer;
|
||||||
|
|
||||||
void mgmtSendCreateTableMsg(SDCreateTableMsg *pCreate, SRpcIpSet *ipSet, void *ahandle);
|
void mgmtSendCreateTableMsg(SDCreateTableMsg *pCreate, SRpcIpSet *ipSet, void *ahandle);
|
||||||
void mgmtSendRemoveTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle);
|
void mgmtSendRemoveTableMsg(SDRemoveTableMsg *pRemove, SRpcIpSet *ipSet, void *ahandle);
|
||||||
void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle);
|
void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle);
|
||||||
void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *ahandle);
|
void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *ahandle);
|
||||||
void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *table_info);
|
void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle);
|
||||||
void mgmtSendOneFreeVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *ahandle);
|
void mgmtSendOneFreeVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *ahandle);
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -26,14 +26,13 @@ extern "C" {
|
||||||
|
|
||||||
int32_t mgmtInitNormalTables();
|
int32_t mgmtInitNormalTables();
|
||||||
void mgmtCleanUpNormalTables();
|
void mgmtCleanUpNormalTables();
|
||||||
|
|
||||||
void * mgmtGetNormalTable(char *tableId);
|
void * mgmtGetNormalTable(char *tableId);
|
||||||
|
|
||||||
int32_t mgmtCreateNormalTable(SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid, SDCreateTableMsg **pDCreateOut, STableInfo **pTableOut);
|
int32_t mgmtCreateNormalTable(SCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid,
|
||||||
|
SDCreateTableMsg **pDCreateOut, STableInfo **pTableOut);
|
||||||
int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable);
|
int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable);
|
||||||
int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int32_t ncols);
|
int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int32_t ncols);
|
||||||
int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName);
|
int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName);
|
||||||
int8_t *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable);
|
|
||||||
|
|
||||||
int32_t mgmtGetNormalTableMeta(SDbObj *pDb, SNormalTableObj *pTable, STableMeta *pMeta, bool usePublicIp);
|
int32_t mgmtGetNormalTableMeta(SDbObj *pDb, SNormalTableObj *pTable, STableMeta *pMeta, bool usePublicIp);
|
||||||
|
|
||||||
|
|
|
@ -46,7 +46,6 @@ void * mgmtGetSuperTableVgroup(SSuperTableObj *pStable);
|
||||||
|
|
||||||
int32_t mgmtFindSuperTableTagIndex(SSuperTableObj *pTable, const char *tagName);
|
int32_t mgmtFindSuperTableTagIndex(SSuperTableObj *pTable, const char *tagName);
|
||||||
int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable);
|
int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable);
|
||||||
int32_t mgmtGetTagsLength(SSuperTableObj *pSuperTable, int32_t col);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,7 +31,7 @@ STableInfo* mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid);
|
||||||
int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, bool usePublicIp);
|
int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, bool usePublicIp);
|
||||||
|
|
||||||
int32_t mgmtRetrieveMetricMeta(void *pConn, char **pStart, SSuperTableMetaMsg *pInfo);
|
int32_t mgmtRetrieveMetricMeta(void *pConn, char **pStart, SSuperTableMetaMsg *pInfo);
|
||||||
void mgmtCreateTable(SCreateTableMsg *pCreate, int32_t contLen, void *thandle);
|
int32_t mgmtCreateTable(SCreateTableMsg *pCreate, int32_t contLen, void *thandle);
|
||||||
int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore);
|
int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore);
|
||||||
int32_t mgmtAlterTable(SDbObj *pDb, SAlterTableMsg *pAlter);
|
int32_t mgmtAlterTable(SDbObj *pDb, SAlterTableMsg *pAlter);
|
||||||
int32_t mgmtGetShowTableMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
|
int32_t mgmtGetShowTableMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
|
||||||
|
@ -45,6 +45,8 @@ void mgmtSetTableDirty(STableInfo *pTable, bool isDirty);
|
||||||
SDRemoveTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable);
|
SDRemoveTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable);
|
||||||
SDRemoveSuperTableMsg *mgmtBuildRemoveSuperTableMsg(STableInfo *pTable);
|
SDRemoveSuperTableMsg *mgmtBuildRemoveSuperTableMsg(STableInfo *pTable);
|
||||||
|
|
||||||
|
void mgmtProcessCreateTable(SVgObj *pVgroup, SCreateTableMsg *pCreate, int32_t contLen, void *thandle);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -370,18 +370,38 @@ int32_t mgmtCreateChildTable(SCreateTableMsg *pCreate, int32_t contLen, SVgObj *
|
||||||
int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable) {
|
int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable) {
|
||||||
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
|
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
|
||||||
if (pVgroup == NULL) {
|
if (pVgroup == NULL) {
|
||||||
|
mError("table:%s, failed to drop child table, vgroup not exist", pTable->tableId);
|
||||||
return TSDB_CODE_OTHERS;
|
return TSDB_CODE_OTHERS;
|
||||||
}
|
}
|
||||||
|
|
||||||
// mgmtSendRemoveTableMsg((STableInfo *) pTable, pVgroup);
|
SDRemoveTableMsg *pRemove = rpcMallocCont(sizeof(SDRemoveTableMsg));
|
||||||
|
if (pRemove == NULL) {
|
||||||
|
mError("table:%s, failed to drop child table, no enough memory", pTable->tableId);
|
||||||
|
return TSDB_CODE_SERV_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
sdbDeleteRow(tsChildTableSdb, pTable);
|
pRemove->sid = htonl(pTable->sid);
|
||||||
|
pRemove->uid = htobe64(pTable->uid);
|
||||||
|
|
||||||
|
pRemove->numOfVPeers = htonl(pVgroup->numOfVnodes);
|
||||||
|
for (int i = 0; i < pVgroup->numOfVnodes; ++i) {
|
||||||
|
pRemove->vpeerDesc[i].ip = htonl(pVgroup->vnodeGid[i].ip);
|
||||||
|
pRemove->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
|
||||||
|
}
|
||||||
|
|
||||||
|
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
|
||||||
|
mgmtSendRemoveTableMsg(pRemove, &ipSet, NULL);
|
||||||
|
|
||||||
|
if (sdbDeleteRow(tsChildTableSdb, pTable) < 0) {
|
||||||
|
mError("table:%s, update ctables sdb error", pTable->tableId);
|
||||||
|
return TSDB_CODE_SDB_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
if (pVgroup->numOfTables <= 0) {
|
if (pVgroup->numOfTables <= 0) {
|
||||||
mgmtDropVgroup(pDb, pVgroup);
|
mgmtDropVgroup(pDb, pVgroup);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* mgmtGetChildTable(char *tableId) {
|
void* mgmtGetChildTable(char *tableId) {
|
||||||
|
|
|
@ -149,6 +149,7 @@ static void mgmtProcessCreateTableRsp(int8_t msgType, int8_t *pCont, int32_t con
|
||||||
}
|
}
|
||||||
|
|
||||||
rpcSendResponse(info->thandle, code, NULL, 0);
|
rpcSendResponse(info->thandle, code, NULL, 0);
|
||||||
|
free(info);
|
||||||
}
|
}
|
||||||
|
|
||||||
void mgmtSendCreateTableMsg(SDCreateTableMsg *pCreate, SRpcIpSet *ipSet, void *ahandle) {
|
void mgmtSendCreateTableMsg(SDCreateTableMsg *pCreate, SRpcIpSet *ipSet, void *ahandle) {
|
||||||
|
@ -157,58 +158,49 @@ void mgmtSendCreateTableMsg(SDCreateTableMsg *pCreate, SRpcIpSet *ipSet, void *a
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mgmtProcessRemoveTableRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) {
|
static void mgmtProcessRemoveTableRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) {
|
||||||
mTrace("remove table rsp received, handle:%p code:%d", thandle, code);
|
mTrace("remove table rsp received, thandle:%p code:%d", thandle, code);
|
||||||
|
}
|
||||||
|
|
||||||
|
void mgmtSendRemoveTableMsg(SDRemoveTableMsg *pRemove, SRpcIpSet *ipSet, void *ahandle) {
|
||||||
|
mTrace("table:%s, sid:%d send remove table msg, ahandle:%p", pRemove->tableId, htonl(pRemove->sid), ahandle);
|
||||||
|
if (pRemove != NULL) {
|
||||||
|
mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_REMOVE_TABLE, pRemove, sizeof(SDRemoveTableMsg), ahandle);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mgmtProcessFreeVnodeRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) {
|
static void mgmtProcessFreeVnodeRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) {
|
||||||
mTrace("free vnode rsp received, handle:%p code:%d", thandle, code);
|
mTrace("free vnode rsp received, handle:%p code:%d", thandle, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mgmtProcessCreateVnodeRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *ahandle, int32_t code) {
|
static void mgmtProcessCreateVnodeRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) {
|
||||||
SProcessInfo *vgprocess = ahandle;
|
mTrace("create vnode rsp received, thandle:%p code:%d", thandle, code);
|
||||||
mTrace("create vnode received, vgprocess:%p code:%d", vgprocess, code);
|
if (thandle == NULL) return;
|
||||||
|
|
||||||
if (vgprocess == NULL) {
|
SProcessInfo *info = thandle;
|
||||||
rpcFreeCont(pCont);
|
assert(info->type == TSDB_PROCESS_CREATE_VGROUP);
|
||||||
return;
|
info->received++;
|
||||||
|
SVgObj *pVgroup = info->ahandle;
|
||||||
|
|
||||||
|
mTrace("vgroup:%d, received:%d numOfVnodes:%d", pVgroup->vgId, info->received, pVgroup->numOfVnodes);
|
||||||
|
if (info->received == pVgroup->numOfVnodes) {
|
||||||
|
mgmtProcessCreateTable(pVgroup, info->cont, info->contLen, info->thandle);
|
||||||
|
free(info);
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(vgprocess->type == TSDB_PROCESS_CREATE_VGROUP);
|
|
||||||
|
|
||||||
vgprocess->received++;
|
|
||||||
SVgObj *pVgroup = vgprocess->ahandle;
|
|
||||||
|
|
||||||
mTrace("vgroup:%d, received:%d numOfVnodes:%d vgprocess:%p tbprocess::%p",
|
|
||||||
pVgroup->vgId, vgprocess->received, pVgroup->numOfVnodes, vgprocess->thandle);
|
|
||||||
if (vgprocess->received == pVgroup->numOfVnodes) {
|
|
||||||
|
|
||||||
STableInfo *pTable = vgprocess->ahandle;
|
|
||||||
SProcessInfo *tbprocess = calloc(1, sizeof(SProcessInfo));
|
|
||||||
tbprocess->type = TSDB_PROCESS_CREATE_TABLE;
|
|
||||||
tbprocess->thandle = tbprocess->thandle;
|
|
||||||
tbprocess->ahandle = pTable;
|
|
||||||
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
|
|
||||||
mgmtSendCreateTableMsg(pTable, &ipSet, tbprocess);
|
|
||||||
|
|
||||||
free(vgprocess);
|
|
||||||
}
|
|
||||||
|
|
||||||
rpcFreeCont(pCont);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *table_info) {
|
void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) {
|
||||||
mTrace("vgroup:%d, send create all vnodes msg, table_info:%p", pVgroup->vgId, table_info);
|
mTrace("vgroup:%d, send create all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle);
|
||||||
for (int i = 0; i < pVgroup->numOfVnodes; ++i) {
|
for (int i = 0; i < pVgroup->numOfVnodes; ++i) {
|
||||||
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip);
|
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip);
|
||||||
mgmtSendCreateVnodeMsg(pVgroup, pVgroup->vnodeGid[i].vnode, &ipSet, table_info);
|
mgmtSendCreateVnodeMsg(pVgroup, pVgroup->vnodeGid[i].vnode, &ipSet, ahandle);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *table_info) {
|
void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *ahandle) {
|
||||||
mTrace("vgroup:%d, send create vnode:%d msg, table_info:%p", pVgroup->vgId, vnode, table_info);
|
mTrace("vgroup:%d, send create vnode:%d msg, ahandle:%p", pVgroup->vgId, vnode, ahandle);
|
||||||
SVPeersMsg *pVpeer = mgmtBuildVpeersMsg(pVgroup, vnode);
|
SVPeersMsg *pVpeer = mgmtBuildVpeersMsg(pVgroup, vnode);
|
||||||
if (pVpeer != NULL) {
|
if (pVpeer != NULL) {
|
||||||
mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_VPEERS, pVpeer, sizeof(SVPeersMsg), table_info);
|
mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_VPEERS, pVpeer, sizeof(SVPeersMsg), ahandle);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -242,14 +234,6 @@ void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *p
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void mgmtSendRemoveTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle) {
|
|
||||||
mTrace("table:%s, sid:%d send remove table msg, handle:%p", pTable->tableId, pTable->sid);
|
|
||||||
|
|
||||||
SDRemoveTableMsg *pRemove = mgmtBuildRemoveTableMsg(pTable);
|
|
||||||
if (pRemove != NULL) {
|
|
||||||
mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_REMOVE_TABLE, pRemove, sizeof(SDRemoveTableMsg), ahandle);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle) {
|
void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle) {
|
||||||
mTrace("table:%s, sid:%d send alter stream msg, handle:%p", pTable->tableId, pTable->sid);
|
mTrace("table:%s, sid:%d send alter stream msg, handle:%p", pTable->tableId, pTable->sid);
|
||||||
|
|
|
@ -291,43 +291,58 @@ void mgmtCleanUpNormalTables() {
|
||||||
sdbCloseTable(tsNormalTableSdb);
|
sdbCloseTable(tsNormalTableSdb);
|
||||||
}
|
}
|
||||||
|
|
||||||
int8_t *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable) {
|
static void *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgroup) {
|
||||||
// int8_t *pMsg = NULL;
|
int32_t totalCols = pTable->numOfColumns;
|
||||||
// SDCreateTableMsg *pCreateTable = (SDCreateTableMsg *) pMsg;
|
int32_t contLen = sizeof(SCreateTableMsg) + totalCols * sizeof(SSchema) + pTable->sqlLen;
|
||||||
// memcpy(pCreateTable->tableId, pTable->tableId, TSDB_TABLE_ID_LEN);
|
|
||||||
// pCreateTable->vnode = htobe32(vnode);
|
|
||||||
// pCreateTable->sid = htobe32(pTable->sid);
|
|
||||||
// pCreateTable->uid = htobe64(pTable->uid);
|
|
||||||
// pCreateTable->createdTime = htobe64(pTable->createdTime);
|
|
||||||
// pCreateTable->sversion = htobe32(pTable->sversion);
|
|
||||||
// pCreateTable->numOfColumns = htobe16(pTable->numOfColumns);
|
|
||||||
//
|
|
||||||
// SSchema *pSchema = pTable->schema;
|
|
||||||
// int32_t totalCols = pCreateTable->numOfColumns;
|
|
||||||
|
|
||||||
// for (int32_t col = 0; col < totalCols; ++col) {
|
SDCreateTableMsg *pCreateTable = rpcMallocCont(contLen);
|
||||||
// SMColumn *colData = &((SMColumn *) (pCreateTable->data))[col];
|
if (pCreateTable == NULL) {
|
||||||
// colData->type = pSchema[col].type;
|
return NULL;
|
||||||
// colData->bytes = htons(pSchema[col].bytes);
|
}
|
||||||
// colData->colId = htons(pSchema[col].colId);
|
|
||||||
// }
|
|
||||||
|
|
||||||
// int32_t totalColsSize = sizeof(SMColumn *) * totalCols;
|
memcpy(pCreateTable->tableId, pTable->tableId, TSDB_TABLE_ID_LEN);
|
||||||
// pMsg = pCreateTable->data + totalColsSize;
|
pCreateTable->tableType = pTable->type;
|
||||||
|
pCreateTable->numOfColumns = htons(pTable->numOfColumns);
|
||||||
|
pCreateTable->numOfTags = htons(0);
|
||||||
|
pCreateTable->sid = htonl(pTable->sid);
|
||||||
|
pCreateTable->sversion = htonl(pTable->sversion);
|
||||||
|
pCreateTable->tagDataLen = htonl(0);
|
||||||
|
pCreateTable->sqlDataLen = htonl(pTable->sqlLen);
|
||||||
|
pCreateTable->contLen = htonl(contLen);
|
||||||
|
pCreateTable->numOfVPeers = htonl(pVgroup->numOfVnodes);
|
||||||
|
pCreateTable->uid = htobe64(pTable->uid);
|
||||||
|
pCreateTable->superTableUid = htobe64(0);
|
||||||
|
pCreateTable->createdTime = htobe64(pTable->createdTime);
|
||||||
|
|
||||||
// return pMsg;
|
for (int i = 0; i < pVgroup->numOfVnodes; ++i) {
|
||||||
return NULL;
|
pCreateTable->vpeerDesc[i].ip = htonl(pVgroup->vnodeGid[i].ip);
|
||||||
|
pCreateTable->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
|
||||||
|
}
|
||||||
|
|
||||||
|
SSchema *pSchema = (SSchema *) pCreateTable->data;
|
||||||
|
memcpy(pSchema, pTable->schema, totalCols * sizeof(SSchema));
|
||||||
|
for (int32_t col = 0; col < totalCols; ++col) {
|
||||||
|
pSchema->bytes = htons(pSchema->bytes);
|
||||||
|
pSchema->colId = htons(pSchema->colId);
|
||||||
|
pSchema++;
|
||||||
|
}
|
||||||
|
|
||||||
|
memcpy(pCreateTable->data + totalCols * sizeof(SSchema), pTable->sql, pTable->sqlLen);
|
||||||
|
|
||||||
|
return pCreateTable;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mgmtCreateNormalTable(SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid, SDCreateTableMsg **pDCreateOut, STableInfo **pTableOut) {
|
int32_t mgmtCreateNormalTable(SCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid,
|
||||||
|
SDCreateTableMsg **pDCreateOut, STableInfo **pTableOut) {
|
||||||
int32_t numOfTables = sdbGetNumOfRows(tsNormalTableSdb);
|
int32_t numOfTables = sdbGetNumOfRows(tsNormalTableSdb);
|
||||||
if (numOfTables >= TSDB_MAX_NORMAL_TABLES) {
|
if (numOfTables >= TSDB_MAX_NORMAL_TABLES) {
|
||||||
mError("normal table:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, TSDB_MAX_NORMAL_TABLES);
|
mError("table:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, TSDB_MAX_NORMAL_TABLES);
|
||||||
return TSDB_CODE_TOO_MANY_TABLES;
|
return TSDB_CODE_TOO_MANY_TABLES;
|
||||||
}
|
}
|
||||||
|
|
||||||
SNormalTableObj *pTable = (SNormalTableObj *) calloc(sizeof(SNormalTableObj), 1);
|
SNormalTableObj *pTable = (SNormalTableObj *) calloc(sizeof(SNormalTableObj), 1);
|
||||||
if (pTable == NULL) {
|
if (pTable == NULL) {
|
||||||
|
mError("table:%s, failed to alloc memory", pCreate->tableId);
|
||||||
return TSDB_CODE_SERV_OUT_OF_MEMORY;
|
return TSDB_CODE_SERV_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -339,6 +354,7 @@ int32_t mgmtCreateNormalTable(SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t
|
||||||
pTable->uid = (((uint64_t) pTable->createdTime) << 16) + ((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul));
|
pTable->uid = (((uint64_t) pTable->createdTime) << 16) + ((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul));
|
||||||
pTable->sversion = 0;
|
pTable->sversion = 0;
|
||||||
pTable->numOfColumns = pCreate->numOfColumns;
|
pTable->numOfColumns = pCreate->numOfColumns;
|
||||||
|
pTable->sqlLen = pTable->sqlLen;
|
||||||
|
|
||||||
int32_t numOfCols = pCreate->numOfColumns;
|
int32_t numOfCols = pCreate->numOfColumns;
|
||||||
int32_t schemaSize = numOfCols * sizeof(SSchema);
|
int32_t schemaSize = numOfCols * sizeof(SSchema);
|
||||||
|
@ -373,6 +389,14 @@ int32_t mgmtCreateNormalTable(SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t
|
||||||
return TSDB_CODE_SDB_ERROR;
|
return TSDB_CODE_SDB_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
*pDCreateOut = mgmtBuildCreateNormalTableMsg(pTable, pVgroup);
|
||||||
|
if (*pDCreateOut == NULL) {
|
||||||
|
mError("table:%s, failed to build create table message", pCreate->tableId);
|
||||||
|
return TSDB_CODE_SERV_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
*pTableOut = pTable;
|
||||||
|
|
||||||
mTrace("table:%s, create table in vgroup, vgroup:%d sid:%d vnode:%d uid:%" PRIu64 ,
|
mTrace("table:%s, create table in vgroup, vgroup:%d sid:%d vnode:%d uid:%" PRIu64 ,
|
||||||
pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid);
|
pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid);
|
||||||
|
|
||||||
|
@ -382,18 +406,38 @@ int32_t mgmtCreateNormalTable(SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t
|
||||||
int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable) {
|
int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable) {
|
||||||
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
|
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
|
||||||
if (pVgroup == NULL) {
|
if (pVgroup == NULL) {
|
||||||
|
mError("table:%s, failed to drop normal table, vgroup not exist", pTable->tableId);
|
||||||
return TSDB_CODE_OTHERS;
|
return TSDB_CODE_OTHERS;
|
||||||
}
|
}
|
||||||
|
|
||||||
// mgmtSendRemoveTableMsg((STableInfo *) pTable, pVgroup);
|
SDRemoveTableMsg *pRemove = rpcMallocCont(sizeof(SDRemoveTableMsg));
|
||||||
|
if (pRemove == NULL) {
|
||||||
|
mError("table:%s, failed to drop normal table, no enough memory", pTable->tableId);
|
||||||
|
return TSDB_CODE_SERV_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
sdbDeleteRow(tsNormalTableSdb, pTable);
|
pRemove->sid = htonl(pTable->sid);
|
||||||
|
pRemove->uid = htobe64(pTable->uid);
|
||||||
|
|
||||||
|
pRemove->numOfVPeers = htonl(pVgroup->numOfVnodes);
|
||||||
|
for (int i = 0; i < pVgroup->numOfVnodes; ++i) {
|
||||||
|
pRemove->vpeerDesc[i].ip = htonl(pVgroup->vnodeGid[i].ip);
|
||||||
|
pRemove->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
|
||||||
|
}
|
||||||
|
|
||||||
|
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
|
||||||
|
mgmtSendRemoveTableMsg(pRemove, &ipSet, NULL);
|
||||||
|
|
||||||
|
if (sdbDeleteRow(tsNormalTableSdb, pTable) < 0) {
|
||||||
|
mError("table:%s, update ntables sdb error", pTable->tableId);
|
||||||
|
return TSDB_CODE_SDB_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
if (pVgroup->numOfTables <= 0) {
|
if (pVgroup->numOfTables <= 0) {
|
||||||
mgmtDropVgroup(pDb, pVgroup);
|
mgmtDropVgroup(pDb, pVgroup);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* mgmtGetNormalTable(char *tableId) {
|
void* mgmtGetNormalTable(char *tableId) {
|
||||||
|
|
|
@ -782,38 +782,42 @@ void mgmtProcessCreateTableMsg(void *pCont, int32_t contLen, void *ahandle) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
mgmtCreateTable(pCreate, contLen, ahandle);
|
int32_t code = mgmtCreateTable(pCreate, contLen, ahandle);
|
||||||
|
if (code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
|
rpcSendResponse(ahandle, code, NULL, 0);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void mgmtProcessDropTableMsg(void *pCont, int32_t contLen, void *ahandle) {
|
void mgmtProcessDropTableMsg(void *pCont, int32_t contLen, void *ahandle) {
|
||||||
|
SDropTableMsg *pDrop = (SDropTableMsg *) pCont;
|
||||||
|
|
||||||
if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) {
|
if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) {
|
||||||
|
mError("table:%s, failed to drop table, need redirect message", pDrop->tableId);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
SUserObj *pUser = mgmtGetUserFromConn(ahandle);
|
SUserObj *pUser = mgmtGetUserFromConn(ahandle);
|
||||||
if (pUser == NULL) {
|
if (pUser == NULL) {
|
||||||
|
mError("table:%s, failed to drop table, invalid user", pDrop->tableId);
|
||||||
rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
|
rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
SDropTableMsg *pDrop = (SDropTableMsg *) pCont;
|
|
||||||
int32_t code;
|
|
||||||
|
|
||||||
if (!pUser->writeAuth) {
|
if (!pUser->writeAuth) {
|
||||||
code = TSDB_CODE_NO_RIGHTS;
|
mError("table:%s, failed to drop table, no rights", pDrop->tableId);
|
||||||
} else {
|
rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0);
|
||||||
SDbObj *pDb = mgmtGetDb(pDrop->db);
|
return;
|
||||||
if (pDb) {
|
|
||||||
code = mgmtDropTable(pDb, pDrop->tableId, pDrop->igNotExists);
|
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
|
||||||
mTrace("table:%s is dropped by user:%s", pDrop->tableId, pUser->user);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
code = TSDB_CODE_DB_NOT_SELECTED;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
SDbObj *pDb = mgmtGetDb(pDrop->db);
|
||||||
|
if (pDb == NULL) {
|
||||||
|
mError("table:%s, failed to drop table, db:%s not selected", pDrop->tableId, pDrop->db);
|
||||||
|
rpcSendResponse(ahandle, TSDB_CODE_DB_NOT_SELECTED, NULL, 0);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = mgmtDropTable(pDb, pDrop->tableId, pDrop->igNotExists);
|
||||||
|
if (code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
rpcSendResponse(ahandle, code, NULL, 0);
|
rpcSendResponse(ahandle, code, NULL, 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -608,17 +608,6 @@ void mgmtRemoveTableFromSuperTable(SSuperTableObj *pStable) {
|
||||||
pStable->numOfTables--;
|
pStable->numOfTables--;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mgmtGetTagsLength(SSuperTableObj* pSuperTable, int32_t col) { // length before column col
|
|
||||||
int32_t len = 0;
|
|
||||||
int32_t tagColumnIndexOffset = pSuperTable->numOfColumns;
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < pSuperTable->numOfTags && i < col; ++i) {
|
|
||||||
len += ((SSchema*)pSuperTable->schema)[tagColumnIndexOffset + i].bytes;
|
|
||||||
}
|
|
||||||
|
|
||||||
return len;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable) {
|
int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable) {
|
||||||
int32_t numOfCols = pTable->numOfColumns + pTable->numOfTags;
|
int32_t numOfCols = pTable->numOfColumns + pTable->numOfTags;
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
|
|
@ -164,7 +164,7 @@ void mgmtProcessCreateTable(SVgObj *pVgroup, SCreateTableMsg *pCreate, int32_t c
|
||||||
code = mgmtCreateChildTable(pCreate, contLen, pVgroup, sid, &pDCreate, &pTable);
|
code = mgmtCreateChildTable(pCreate, contLen, pVgroup, sid, &pDCreate, &pTable);
|
||||||
} else {
|
} else {
|
||||||
mTrace("table:%s, start to create normal table, vgroup:%d sid:%d", pCreate->tableId, pVgroup->vgId, sid);
|
mTrace("table:%s, start to create normal table, vgroup:%d sid:%d", pCreate->tableId, pVgroup->vgId, sid);
|
||||||
code = mgmtCreateNormalTable(pCreate, pVgroup, sid, &pDCreate, &pTable);
|
code = mgmtCreateNormalTable(pCreate, contLen, pVgroup, sid, &pDCreate, &pTable);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -178,31 +178,29 @@ void mgmtProcessCreateTable(SVgObj *pVgroup, SCreateTableMsg *pCreate, int32_t c
|
||||||
|
|
||||||
SProcessInfo *info = calloc(1, sizeof(SProcessInfo));
|
SProcessInfo *info = calloc(1, sizeof(SProcessInfo));
|
||||||
info->type = TSDB_PROCESS_CREATE_TABLE;
|
info->type = TSDB_PROCESS_CREATE_TABLE;
|
||||||
info->ahandle = pTable;
|
|
||||||
info->thandle = thandle;
|
info->thandle = thandle;
|
||||||
|
info->ahandle = pTable;
|
||||||
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
|
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
|
||||||
|
|
||||||
mgmtSendCreateTableMsg(pDCreate, &ipSet, info);
|
mgmtSendCreateTableMsg(pDCreate, &ipSet, info);
|
||||||
}
|
}
|
||||||
|
|
||||||
void mgmtCreateTable(SCreateTableMsg *pCreate, int32_t contLen, void *thandle) {
|
int32_t mgmtCreateTable(SCreateTableMsg *pCreate, int32_t contLen, void *thandle) {
|
||||||
SDbObj *pDb = mgmtGetDb(pCreate->db);
|
SDbObj *pDb = mgmtGetDb(pCreate->db);
|
||||||
if (pDb == NULL) {
|
if (pDb == NULL) {
|
||||||
mError("table:%s, failed to create table, db not selected", pCreate->tableId);
|
mError("table:%s, failed to create table, db not selected", pCreate->tableId);
|
||||||
rpcSendResponse(thandle, TSDB_CODE_DB_NOT_SELECTED, NULL, 0);
|
return TSDB_CODE_DB_NOT_SELECTED;
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
STableInfo *pTable = mgmtGetTable(pCreate->tableId);
|
STableInfo *pTable = mgmtGetTable(pCreate->tableId);
|
||||||
if (pTable != NULL) {
|
if (pTable != NULL) {
|
||||||
if (pCreate->igExists) {
|
if (pCreate->igExists) {
|
||||||
mTrace("table:%s, table is alredy exist, think it success", pCreate->tableId);
|
mTrace("table:%s, table is already exist, think it success", pCreate->tableId);
|
||||||
rpcSendResponse(thandle, TSDB_CODE_SUCCESS, NULL, 0);
|
return TSDB_CODE_SUCCESS;
|
||||||
} else {
|
} else {
|
||||||
mError("table:%s, failed to create table, table already exist", pCreate->tableId);
|
mError("table:%s, failed to create table, table already exist", pCreate->tableId);
|
||||||
rpcSendResponse(thandle, TSDB_CODE_TABLE_ALREADY_EXIST, NULL, 0);
|
return TSDB_CODE_TABLE_ALREADY_EXIST;
|
||||||
}
|
}
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
|
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
|
||||||
|
@ -211,60 +209,70 @@ void mgmtCreateTable(SCreateTableMsg *pCreate, int32_t contLen, void *thandle) {
|
||||||
int32_t code = mgmtCheckTableLimit(pAcct, pCreate);
|
int32_t code = mgmtCheckTableLimit(pAcct, pCreate);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
mError("table:%s, failed to create table, table num exceed the limit", pCreate->tableId);
|
mError("table:%s, failed to create table, table num exceed the limit", pCreate->tableId);
|
||||||
rpcSendResponse(thandle, code, NULL, 0);
|
return code;
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mgmtCheckExpired()) {
|
if (mgmtCheckExpired()) {
|
||||||
mError("table:%s, failed to create table, grant expired", pCreate->tableId);
|
mError("table:%s, failed to create table, grant expired", pCreate->tableId);
|
||||||
rpcSendResponse(thandle, TSDB_CODE_GRANT_EXPIRED, NULL, 0);
|
return TSDB_CODE_GRANT_EXPIRED;
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pCreate->numOfTags != 0) {
|
if (pCreate->numOfTags != 0) {
|
||||||
mTrace("table:%s, start to create super table, tags:%d columns:%d", pCreate->tableId, pCreate->numOfTags, pCreate->numOfColumns);
|
mTrace("table:%s, start to create super table, tags:%d columns:%d",
|
||||||
mgmtCreateSuperTable(pDb, pCreate);
|
pCreate->tableId, pCreate->numOfTags, pCreate->numOfColumns);
|
||||||
return;
|
return mgmtCreateSuperTable(pDb, pCreate);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = mgmtCheckTimeSeries(pCreate->numOfColumns);
|
code = mgmtCheckTimeSeries(pCreate->numOfColumns);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
mError("table:%s, failed to create table, timeseries exceed the limit", pCreate->tableId);
|
mError("table:%s, failed to create table, timeseries exceed the limit", pCreate->tableId);
|
||||||
return;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
SVgObj *pVgroup = mgmtGetAvailableVgroup(pDb);
|
SVgObj *pVgroup = mgmtGetAvailableVgroup(pDb);
|
||||||
if (pVgroup == NULL) {
|
if (pVgroup == NULL) {
|
||||||
mTrace("table:%s, no avaliable vgroup, start to create a new one", pCreate->tableId, pVgroup->vgId);
|
mTrace("table:%s, no avaliable vgroup, start to create a new one", pCreate->tableId);
|
||||||
mgmtProcessCreateVgroup(pCreate, contLen, thandle);
|
mgmtProcessCreateVgroup(pCreate, contLen, thandle);
|
||||||
} else {
|
} else {
|
||||||
mTrace("table:%s, try to create table in vgroup:%d", pCreate->tableId, pVgroup->vgId);
|
mTrace("table:%s, try to create table in vgroup:%d", pCreate->tableId, pVgroup->vgId);
|
||||||
mgmtProcessCreateTable(pVgroup, pCreate, contLen, thandle);
|
mgmtProcessCreateTable(pVgroup, pCreate, contLen, thandle);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore) {
|
int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore) {
|
||||||
STableInfo *pTable = mgmtGetTable(tableId);
|
STableInfo *pTable = mgmtGetTable(tableId);
|
||||||
if (pTable == NULL) {
|
if (pTable == NULL) {
|
||||||
if (ignore) {
|
if (ignore) {
|
||||||
|
mTrace("table:%s, table is not exist, think it success", tableId);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
} else {
|
} else {
|
||||||
|
mError("table:%s, failed to create table, table not exist", tableId);
|
||||||
return TSDB_CODE_INVALID_TABLE;
|
return TSDB_CODE_INVALID_TABLE;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) {
|
if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) {
|
||||||
|
mError("table:%s, failed to create table, in monitor database", tableId);
|
||||||
return TSDB_CODE_MONITOR_DB_FORBIDDEN;
|
return TSDB_CODE_MONITOR_DB_FORBIDDEN;
|
||||||
}
|
}
|
||||||
|
|
||||||
switch (pTable->type) {
|
switch (pTable->type) {
|
||||||
case TSDB_TABLE_TYPE_SUPER_TABLE:
|
case TSDB_TABLE_TYPE_SUPER_TABLE:
|
||||||
|
mTrace("table:%s, start to drop super table", tableId);
|
||||||
return mgmtDropSuperTable(pDb, (SSuperTableObj *) pTable);
|
return mgmtDropSuperTable(pDb, (SSuperTableObj *) pTable);
|
||||||
case TSDB_TABLE_TYPE_CHILD_TABLE:
|
case TSDB_TABLE_TYPE_CHILD_TABLE:
|
||||||
|
mTrace("table:%s, start to drop child table", tableId);
|
||||||
return mgmtDropChildTable(pDb, (SChildTableObj *) pTable);
|
return mgmtDropChildTable(pDb, (SChildTableObj *) pTable);
|
||||||
case TSDB_TABLE_TYPE_NORMAL_TABLE:
|
case TSDB_TABLE_TYPE_NORMAL_TABLE:
|
||||||
|
mTrace("table:%s, start to drop normal table", tableId);
|
||||||
|
return mgmtDropNormalTable(pDb, (SNormalTableObj *) pTable);
|
||||||
|
case TSDB_TABLE_TYPE_STREAM_TABLE:
|
||||||
|
mTrace("table:%s, start to drop stream table", tableId);
|
||||||
return mgmtDropNormalTable(pDb, (SNormalTableObj *) pTable);
|
return mgmtDropNormalTable(pDb, (SNormalTableObj *) pTable);
|
||||||
default:
|
default:
|
||||||
|
mError("table:%s, invalid table type:%d", tableId, pTable->type);
|
||||||
return TSDB_CODE_INVALID_TABLE;
|
return TSDB_CODE_INVALID_TABLE;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue