[TD-9] refact create table msg

This commit is contained in:
slguan 2020-03-15 22:07:50 +08:00
parent ab805d418e
commit 63d2e696d2
10 changed files with 78 additions and 69 deletions

View File

@ -2032,6 +2032,7 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
tscClearFieldInfo(&pQueryInfo->fieldsInfo);
msgLen = pMsg - (char*)pCreateTableMsg;
pCreateTableMsg->contLen = htonl(msgLen);
pCmd->payloadLen = msgLen;
pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE;

View File

@ -39,7 +39,7 @@ extern "C" {
#include "ttimer.h"
#include "tutil.h"
typedef struct {
typedef struct {
uint32_t privateIp;
int32_t sid;
uint32_t moduleStatus;
@ -97,6 +97,7 @@ struct _vg_obj;
typedef struct SSuperTableObj {
char tableId[TSDB_TABLE_ID_LEN + 1];
int8_t type;
int8_t dirty;
uint64_t uid;
int32_t sid;
int32_t vgId;
@ -104,7 +105,7 @@ typedef struct SSuperTableObj {
int32_t sversion;
int32_t numOfColumns;
int32_t numOfTags;
int8_t reserved[7];
int8_t reserved[5];
int8_t updateEnd[1];
int32_t numOfTables;
int16_t nextColId;
@ -114,12 +115,13 @@ typedef struct SSuperTableObj {
typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
int8_t type;
int8_t dirty;
uint64_t uid;
int32_t sid;
int32_t vgId;
int64_t createdTime;
char superTableId[TSDB_TABLE_ID_LEN + 1];
int8_t reserved[7];
int8_t reserved[1];
int8_t updateEnd[1];
SSuperTableObj *superTable;
} SChildTableObj;
@ -127,13 +129,14 @@ typedef struct {
typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
int8_t type;
int8_t dirty;
uint64_t uid;
int32_t sid;
int32_t vgId;
int64_t createdTime;
int32_t sversion;
int32_t numOfColumns;
int16_t sqlLen;
int32_t sqlLen;
int8_t reserved[3];
int8_t updateEnd[1];
char* sql; //null-terminated string

View File

@ -264,7 +264,8 @@ typedef struct {
int16_t numOfTags;
int16_t numOfColumns;
int16_t sqlLen; // the length of SQL, it starts after schema , sql is a null-terminated string
int16_t reserved[16];
int32_t contLen;
int8_t reserved[16];
char schema[];
} SCMCreateTableMsg;

View File

@ -30,8 +30,9 @@ int32_t mgmtInitChildTables();
void mgmtCleanUpChildTables();
void * mgmtGetChildTable(char *tableId);
int32_t mgmtCreateChildTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid,
SMDCreateTableMsg **pDCreateOut, STableInfo **pTableOut);
void *mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid);
void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pCreate, SChildTableObj *pTable);
int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable);
int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent);

View File

@ -28,8 +28,9 @@ int32_t mgmtInitNormalTables();
void mgmtCleanUpNormalTables();
void * mgmtGetNormalTable(char *tableId);
int32_t mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid,
SMDCreateTableMsg **pDCreateOut, STableInfo **pTableOut);
void * mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid);
void * mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable);
int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable);
int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int32_t ncols);
int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName);

View File

@ -31,7 +31,7 @@ void mgmtCleanUpSuperTables();
void * mgmtGetSuperTable(char *tableId);
int32_t mgmtCreateSuperTable(SDbObj *pDb, SCMCreateTableMsg *pCreate);
int32_t mgmtCreateSuperTable(SCMCreateTableMsg *pCreate);
int32_t mgmtDropSuperTable(SDbObj *pDb, SSuperTableObj *pTable);
int32_t mgmtAddSuperTableTag(SSuperTableObj *pTable, SSchema schema[], int32_t ntags);
int32_t mgmtDropSuperTableTag(SSuperTableObj *pTable, char *tagName);

View File

@ -272,19 +272,22 @@ void mgmtCleanUpChildTables() {
sdbCloseTable(tsChildTableSdb);
}
static void *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup, void *pTagData, int32_t tagDataLen) {
int32_t totalCols = pTable->superTable->numOfColumns + pTable->superTable->numOfTags;
int32_t contLen = sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema) + tagDataLen;
void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableObj *pTable) {
char *pTagData = pMsg->schema + TSDB_TABLE_ID_LEN + 1;
int32_t tagDataLen = htonl(pMsg->contLen) - sizeof(SCMCreateTableMsg) - TSDB_TABLE_ID_LEN - 1;
int32_t totalCols = pTable->superTable->numOfColumns + pTable->superTable->numOfTags;
int32_t contLen = sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema) + tagDataLen;
SMDCreateTableMsg *pCreate = rpcMallocCont(contLen);
if (pCreate == NULL) {
terrno = TSDB_CODE_SERV_OUT_OF_MEMORY;
return NULL;
}
memcpy(pCreate->tableId, pTable->tableId, TSDB_TABLE_ID_LEN);
memcpy(pCreate->superTableId, pTable->superTable->tableId, TSDB_TABLE_ID_LEN);
memcpy(pCreate->tableId, pTable->tableId, TSDB_TABLE_ID_LEN + 1);
memcpy(pCreate->superTableId, pTable->superTable->tableId, TSDB_TABLE_ID_LEN + 1);
pCreate->contLen = htonl(contLen);
pCreate->vgId = htonl(pVgroup->vgId);
pCreate->vgId = htonl(pTable->vgId);
pCreate->tableType = pTable->type;
pCreate->numOfColumns = htons(pTable->superTable->numOfColumns);
pCreate->numOfTags = htons(pTable->superTable->numOfTags);
@ -305,36 +308,38 @@ static void *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgrou
}
memcpy(pCreate->data + totalCols * sizeof(SSchema), pTagData, tagDataLen);
return pCreate;
}
int32_t mgmtCreateChildTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid,
SMDCreateTableMsg **pMDCreateOut, STableInfo **pTableOut) {
void* mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t tid) {
int32_t numOfTables = sdbGetNumOfRows(tsChildTableSdb);
if (numOfTables >= tsMaxTables) {
mError("table:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, tsMaxTables);
return TSDB_CODE_TOO_MANY_TABLES;
terrno = TSDB_CODE_TOO_MANY_TABLES;
return NULL;
}
char *pTagData = (char *) pCreate->schema; // it is a tag key
SSuperTableObj *pSuperTable = mgmtGetSuperTable(pTagData);
if (pSuperTable == NULL) {
mError("table:%s, corresponding super table does not exist", pCreate->tableId);
return TSDB_CODE_INVALID_TABLE;
terrno = TSDB_CODE_INVALID_TABLE;
return NULL;
}
SChildTableObj *pTable = (SChildTableObj *) calloc(sizeof(SChildTableObj), 1);
if (pTable == NULL) {
mError("table:%s, failed to alloc memory", pCreate->tableId);
return TSDB_CODE_SERV_OUT_OF_MEMORY;
terrno = TSDB_CODE_SERV_OUT_OF_MEMORY;
return NULL;
}
strcpy(pTable->tableId, pCreate->tableId);
strcpy(pTable->superTableId, pSuperTable->tableId);
pTable->type = TSDB_CHILD_TABLE;
pTable->uid = (((uint64_t) pTable->vgId) << 40) + ((((uint64_t) pTable->sid) & ((1ul << 24) - 1ul)) << 16) +
((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul));
pTable->sid = sid;
pTable->sid = tid;
pTable->vgId = pVgroup->vgId;
pTable->createdTime = taosGetTimestampMs();
pTable->superTable = pSuperTable;
@ -342,21 +347,12 @@ int32_t mgmtCreateChildTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgObj
if (sdbInsertRow(tsChildTableSdb, pTable, 0) < 0) {
free(pTable);
mError("table:%s, update sdb error", pCreate->tableId);
return TSDB_CODE_SDB_ERROR;
terrno = TSDB_CODE_SDB_ERROR;
return NULL;
}
pTagData += (TSDB_TABLE_ID_LEN + 1);
int32_t tagDataLen = contLen - sizeof(SCMCreateTableMsg) - TSDB_TABLE_ID_LEN - 1;
*pMDCreateOut = mgmtBuildCreateChildTableMsg(pTable, pVgroup, pTagData, tagDataLen);
if (*pMDCreateOut == NULL) {
mError("table:%s, failed to build create table message", pCreate->tableId);
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
*pTableOut = (STableInfo *) pTable;
mTrace("table:%s, create ctable in vgroup, uid:%" PRIu64 , pTable->tableId, pTable->uid);
return TSDB_CODE_SUCCESS;
return pTable;
}
int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable) {

View File

@ -287,18 +287,19 @@ void mgmtCleanUpNormalTables() {
sdbCloseTable(tsNormalTableSdb);
}
static void *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgroup) {
void *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable) {
int32_t totalCols = pTable->numOfColumns;
int32_t contLen = sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema) + pTable->sqlLen;
SMDCreateTableMsg *pCreate = rpcMallocCont(contLen);
if (pCreate == NULL) {
terrno = TSDB_CODE_SERV_OUT_OF_MEMORY;
return NULL;
}
memcpy(pCreate->tableId, pTable->tableId, TSDB_TABLE_ID_LEN + 1);
pCreate->contLen = htonl(contLen);
pCreate->vgId = htonl(pVgroup->vgId);
pCreate->vgId = htonl(pTable->vgId);
pCreate->tableType = pTable->type;
pCreate->numOfColumns = htons(pTable->numOfColumns);
pCreate->numOfTags = 0;
@ -319,22 +320,22 @@ static void *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgr
}
memcpy(pCreate + sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema), pTable->sql, pTable->sqlLen);
return pCreate;
}
int32_t mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid,
SMDCreateTableMsg **pDCreateOut, STableInfo **pTableOut) {
void *mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid) {
int32_t numOfTables = sdbGetNumOfRows(tsNormalTableSdb);
if (numOfTables >= TSDB_MAX_NORMAL_TABLES) {
mError("table:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, TSDB_MAX_NORMAL_TABLES);
return TSDB_CODE_TOO_MANY_TABLES;
terrno = TSDB_CODE_TOO_MANY_TABLES;
return NULL;
}
SNormalTableObj *pTable = (SNormalTableObj *) calloc(sizeof(SNormalTableObj), 1);
if (pTable == NULL) {
mError("table:%s, failed to alloc memory", pCreate->tableId);
return TSDB_CODE_SERV_OUT_OF_MEMORY;
terrno = TSDB_CODE_SERV_OUT_OF_MEMORY;
return NULL;
}
strcpy(pTable->tableId, pCreate->tableId);
@ -352,7 +353,8 @@ int32_t mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgOb
pTable->schema = (SSchema *) calloc(1, schemaSize);
if (pTable->schema == NULL) {
free(pTable);
return TSDB_CODE_SERV_OUT_OF_MEMORY;
terrno = TSDB_CODE_SERV_OUT_OF_MEMORY;
return NULL;
}
memcpy(pTable->schema, pCreate->schema, numOfCols * sizeof(SSchema));
@ -368,7 +370,8 @@ int32_t mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgOb
pTable->sql = calloc(1, pTable->sqlLen);
if (pTable->sql == NULL) {
free(pTable);
return TSDB_CODE_SERV_OUT_OF_MEMORY;
terrno = TSDB_CODE_SERV_OUT_OF_MEMORY;
return NULL;
}
memcpy(pTable->sql, (char *) (pCreate->schema) + numOfCols * sizeof(SSchema), pTable->sqlLen);
pTable->sql[pTable->sqlLen - 1] = 0;
@ -378,20 +381,12 @@ int32_t mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgOb
if (sdbInsertRow(tsNormalTableSdb, pTable, 0) < 0) {
mError("table:%s, update sdb error", pTable->tableId);
free(pTable);
return TSDB_CODE_SDB_ERROR;
terrno = TSDB_CODE_SDB_ERROR;
return NULL;
}
*pDCreateOut = mgmtBuildCreateNormalTableMsg(pTable, pVgroup);
if (*pDCreateOut == NULL) {
mError("table:%s, failed to build create table message", pTable->tableId);
sdbDeleteRow(tsNormalTableSdb, pTable);
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
*pTableOut = (STableInfo *) pTable;
mTrace("table:%s, create ntable in vgroup, uid:%" PRIu64 , pTable->tableId, pTable->uid);
return TSDB_CODE_SUCCESS;
return pTable;
}
int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable) {

View File

@ -202,7 +202,7 @@ void mgmtCleanUpSuperTables() {
sdbCloseTable(tsSuperTableSdb);
}
int32_t mgmtCreateSuperTable(SDbObj *pDb, SCMCreateTableMsg *pCreate) {
int32_t mgmtCreateSuperTable(SCMCreateTableMsg *pCreate) {
int32_t numOfTables = sdbGetNumOfRows(tsSuperTableSdb);
if (numOfTables >= TSDB_MAX_SUPER_TABLES) {
mError("stable:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, TSDB_MAX_SUPER_TABLES);

View File

@ -82,6 +82,9 @@ int32_t mgmtInitTables() {
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_TABLE, mgmtGetShowTableMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_TABLE, mgmtRetrieveShowTables);
mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP, mgmtProcessCreateTableRsp);
mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_TABLE_RSP, NULL);
mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP, NULL);
mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_STABLE_RSP, NULL);
return TSDB_CODE_SUCCESS;
}
@ -134,10 +137,6 @@ int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, boo
return TSDB_CODE_SUCCESS;
}
static void mgmtCreateTable(SVgObj *pVgroup, SQueuedMsg *pMsg) {
}
int32_t mgmtAlterTable(SDbObj *pDb, SCMAlterTableMsg *pAlter) {
STableInfo *pTable = mgmtGetTable(pAlter->tableId);
if (pTable == NULL) {
@ -401,7 +400,7 @@ void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) {
if (pCreate->numOfTags != 0) {
mTrace("table:%s, is a super table", pCreate->tableId);
code = mgmtCreateSuperTable(pMsg->pDb, pCreate);
code = mgmtCreateSuperTable(pCreate);
mgmtSendSimpleResp(pMsg->thandle, code);
return;
}
@ -434,16 +433,28 @@ void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) {
SMDCreateTableMsg *pMDCreate = NULL;
if (pCreate->numOfColumns == 0) {
mTrace("table:%s, is a child table, vgroup:%d sid:%d ahandle:%p", pCreate->tableId, pVgroup->vgId, sid, pMsg);
code = mgmtCreateChildTable(pCreate, pMsg->contLen, pVgroup, sid, &pMDCreate, &pTable);
pTable = mgmtCreateChildTable(pCreate, pVgroup, sid);
if (pTable == NULL) {
mgmtSendSimpleResp(pMsg->thandle, terrno);
return;
}
pMDCreate = mgmtBuildCreateChildTableMsg(pCreate, pTable);
if (pCreate == NULL) {
mgmtSendSimpleResp(pMsg->thandle, terrno);
return;
}
} else {
mTrace("table:%s, is a normal table, vgroup:%d sid:%d ahandle:%p", pCreate->tableId, pVgroup->vgId, sid, pMsg);
code = mgmtCreateNormalTable(pCreate, pMsg->contLen, pVgroup, sid, &pMDCreate, &pTable);
}
if (code != TSDB_CODE_SUCCESS) {
mTrace("table:%s, failed to create in vgroup:%d", pCreate->tableId, pVgroup->vgId);
mgmtSendSimpleResp(pMsg->thandle, code);
return;
code = mgmtCreateNormalTable(pCreate, pVgroup, sid);
if (pTable == NULL) {
mgmtSendSimpleResp(pMsg->thandle, terrno);
return;
}
pMDCreate = mgmtBuildCreateNormalTableMsg(pTable);
if (pCreate == NULL) {
mgmtSendSimpleResp(pMsg->thandle, terrno);
return;
}
}
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);