Merge pull request #1991 from taosdata/patch/td-376
create/alter table: only send actual tag data to reduce message size
This commit is contained in:
commit
bf4c6d0eb1
|
@ -779,7 +779,6 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
|
|||
|
||||
STagData *pTag = (STagData *)pCmd->payload;
|
||||
memset(pTag, 0, sizeof(STagData));
|
||||
pCmd->payloadLen = sizeof(STagData);
|
||||
|
||||
/*
|
||||
* the source super table is moved to the secondary position of the pTableMetaInfo list
|
||||
|
@ -928,6 +927,14 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
|
|||
}
|
||||
}
|
||||
|
||||
// 3. calculate the actual data size of STagData
|
||||
pCmd->payloadLen = sizeof(pTag->name) + sizeof(pTag->dataLen);
|
||||
for (int32_t t = 0; t < numOfTags; ++t) {
|
||||
pTag->dataLen += pTagSchema[t].bytes;
|
||||
pCmd->payloadLen += pTagSchema[t].bytes;
|
||||
}
|
||||
pTag->dataLen = htonl(pTag->dataLen);
|
||||
|
||||
if (tscValidateName(&tableToken) != TSDB_CODE_SUCCESS) {
|
||||
return tscInvalidSQLErrMsg(pCmd->payload, "invalid table name", *sqlstr);
|
||||
}
|
||||
|
|
|
@ -4416,6 +4416,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
|||
TSDB_CODE_SUCCESS) {
|
||||
return invalidSqlErrMsg(pQueryInfo->msg, msg13);
|
||||
}
|
||||
pAlterSQL->tagData.dataLen = pTagsSchema->bytes;
|
||||
|
||||
// validate the length of binary
|
||||
if ((pTagsSchema->type == TSDB_DATA_TYPE_BINARY || pTagsSchema->type == TSDB_DATA_TYPE_NCHAR) &&
|
||||
|
@ -5550,11 +5551,11 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
|
|||
// too long tag values will return invalid sql, not be truncated automatically
|
||||
SSchema* pTagSchema = tscGetTableTagSchema(pStableMeterMetaInfo->pTableMeta);
|
||||
|
||||
char* tagVal = pCreateTable->usingInfo.tagdata.data;
|
||||
STagData* pTag = &pCreateTable->usingInfo.tagdata;
|
||||
char* tagVal = pTag->data;
|
||||
int32_t ret = TSDB_CODE_SUCCESS;
|
||||
|
||||
for (int32_t i = 0; i < pList->nExpr; ++i) {
|
||||
|
||||
if (pTagSchema[i].type == TSDB_DATA_TYPE_BINARY || pTagSchema[i].type == TSDB_DATA_TYPE_NCHAR) {
|
||||
// validate the length of binary
|
||||
if (pList->a[i].pVar.nLen + VARSTR_HEADER_SIZE > pTagSchema[i].bytes) {
|
||||
|
@ -5593,6 +5594,7 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
|
|||
return ret;
|
||||
}
|
||||
|
||||
pTag->dataLen = tagVal - pTag->data;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -1213,8 +1213,13 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
|
||||
int8_t type = pInfo->pCreateTableInfo->type;
|
||||
if (type == TSQL_CREATE_TABLE_FROM_STABLE) { // create by using super table, tags value
|
||||
memcpy(pMsg, &pInfo->pCreateTableInfo->usingInfo.tagdata, sizeof(STagData));
|
||||
pMsg += sizeof(STagData);
|
||||
STagData* pTag = &pInfo->pCreateTableInfo->usingInfo.tagdata;
|
||||
*(int32_t*)pMsg = htonl(pTag->dataLen);
|
||||
pMsg += sizeof(int32_t);
|
||||
memcpy(pMsg, pTag->name, sizeof(pTag->name));
|
||||
pMsg += sizeof(pTag->name);
|
||||
memcpy(pMsg, pTag->data, pTag->dataLen);
|
||||
pMsg += pTag->dataLen;
|
||||
} else { // create (super) table
|
||||
pSchema = (SSchema *)pCreateTableMsg->schema;
|
||||
|
||||
|
@ -1281,9 +1286,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
strcpy(pAlterTableMsg->tableId, pTableMetaInfo->name);
|
||||
pAlterTableMsg->type = htons(pAlterInfo->type);
|
||||
|
||||
pAlterTableMsg->numOfCols = tscNumOfFields(pQueryInfo);
|
||||
memcpy(pAlterTableMsg->tagVal, pAlterInfo->tagData.data, TSDB_MAX_TAGS_LEN);
|
||||
|
||||
pAlterTableMsg->numOfCols = htons(tscNumOfFields(pQueryInfo));
|
||||
SSchema *pSchema = pAlterTableMsg->schema;
|
||||
for (int i = 0; i < pAlterTableMsg->numOfCols; ++i) {
|
||||
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
|
||||
|
@ -1295,6 +1298,9 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
}
|
||||
|
||||
pMsg = (char *)pSchema;
|
||||
pAlterTableMsg->tagValLen = htonl(pAlterInfo->tagData.dataLen);
|
||||
memcpy(pMsg, pAlterInfo->tagData.data, pAlterInfo->tagData.dataLen);
|
||||
pMsg += pAlterInfo->tagData.dataLen;
|
||||
|
||||
msgLen = pMsg - (char*)pAlterTableMsg;
|
||||
pCmd->payloadLen = msgLen;
|
||||
|
|
|
@ -269,9 +269,11 @@ typedef struct {
|
|||
char tableId[TSDB_TABLE_ID_LEN + 1];
|
||||
char db[TSDB_DB_NAME_LEN + 1];
|
||||
int16_t type; /* operation type */
|
||||
char tagVal[TSDB_MAX_BYTES_PER_ROW];
|
||||
int8_t numOfCols; /* number of schema */
|
||||
int16_t numOfCols; /* number of schema */
|
||||
int32_t tagValLen;
|
||||
SSchema schema[];
|
||||
// tagVal is padded after schema
|
||||
// char tagVal[];
|
||||
} SCMAlterTableMsg;
|
||||
|
||||
typedef struct {
|
||||
|
@ -647,6 +649,7 @@ typedef struct SMultiTableMeta {
|
|||
} SMultiTableMeta;
|
||||
|
||||
typedef struct {
|
||||
int32_t dataLen;
|
||||
char name[TSDB_TABLE_ID_LEN + 1];
|
||||
char data[TSDB_MAX_TAGS_LEN];
|
||||
} STagData;
|
||||
|
|
|
@ -1334,13 +1334,13 @@ static void mgmtProcessDropSuperTableRsp(SRpcMsg *rpcMsg) {
|
|||
}
|
||||
|
||||
static void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableObj *pTable) {
|
||||
char * pTagData = NULL;
|
||||
STagData * pTagData = NULL;
|
||||
int32_t tagDataLen = 0;
|
||||
int32_t totalCols = 0;
|
||||
int32_t contLen = 0;
|
||||
if (pTable->info.type == TSDB_CHILD_TABLE && pMsg != NULL) {
|
||||
pTagData = pMsg->schema + TSDB_TABLE_ID_LEN + 1;
|
||||
tagDataLen = htonl(pMsg->contLen) - sizeof(SCMCreateTableMsg) - TSDB_TABLE_ID_LEN - 1;
|
||||
pTagData = (STagData*)pMsg->schema;
|
||||
tagDataLen = ntohl(pTagData->dataLen);
|
||||
totalCols = pTable->superTable->numOfColumns + pTable->superTable->numOfTags;
|
||||
contLen = sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema) + tagDataLen + pTable->sqlLen;
|
||||
} else {
|
||||
|
@ -1393,7 +1393,7 @@ static void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableOb
|
|||
}
|
||||
|
||||
if (pTable->info.type == TSDB_CHILD_TABLE && pMsg != NULL) {
|
||||
memcpy(pCreate->data + totalCols * sizeof(SSchema), pTagData, tagDataLen);
|
||||
memcpy(pCreate->data + totalCols * sizeof(SSchema), pTagData->data, tagDataLen);
|
||||
memcpy(pCreate->data + totalCols * sizeof(SSchema) + tagDataLen, pTable->sql, pTable->sqlLen);
|
||||
}
|
||||
|
||||
|
@ -1420,10 +1420,10 @@ static SChildTableObj* mgmtDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj
|
|||
pTable->vgId = pVgroup->vgId;
|
||||
|
||||
if (pTable->info.type == TSDB_CHILD_TABLE) {
|
||||
char *pTagData = (char *) pCreate->schema; // it is a tag key
|
||||
SSuperTableObj *pSuperTable = mgmtGetSuperTable(pTagData);
|
||||
STagData *pTagData = (STagData *) pCreate->schema; // it is a tag key
|
||||
SSuperTableObj *pSuperTable = mgmtGetSuperTable(pTagData->name);
|
||||
if (pSuperTable == NULL) {
|
||||
mError("table:%s, corresponding super table:%s does not exist", pCreate->tableId, pTagData);
|
||||
mError("table:%s, corresponding super table:%s does not exist", pCreate->tableId, pTagData->name);
|
||||
free(pTable);
|
||||
terrno = TSDB_CODE_INVALID_TABLE;
|
||||
return NULL;
|
||||
|
@ -1742,7 +1742,9 @@ static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) {
|
|||
|
||||
static void mgmtAutoCreateChildTable(SQueuedMsg *pMsg) {
|
||||
SCMTableInfoMsg *pInfo = pMsg->pCont;
|
||||
int32_t contLen = sizeof(SCMCreateTableMsg) + sizeof(STagData);
|
||||
STagData* pTag = (STagData*)pInfo->tags;
|
||||
|
||||
int32_t contLen = sizeof(SCMCreateTableMsg) + offsetof(STagData, data) + ntohl(pTag->dataLen);
|
||||
SCMCreateTableMsg *pCreateMsg = rpcMallocCont(contLen);
|
||||
if (pCreateMsg == NULL) {
|
||||
mError("table:%s, failed to create table while get meta info, no enough memory", pInfo->tableId);
|
||||
|
@ -1756,14 +1758,9 @@ static void mgmtAutoCreateChildTable(SQueuedMsg *pMsg) {
|
|||
pCreateMsg->getMeta = 1;
|
||||
pCreateMsg->contLen = htonl(contLen);
|
||||
|
||||
contLen = sizeof(STagData);
|
||||
if (contLen > pMsg->contLen - sizeof(SCMTableInfoMsg)) {
|
||||
contLen = pMsg->contLen - sizeof(SCMTableInfoMsg);
|
||||
}
|
||||
memcpy(pCreateMsg->schema, pInfo->tags, contLen);
|
||||
memcpy(pCreateMsg->schema, pInfo->tags, contLen - sizeof(SCMCreateTableMsg));
|
||||
|
||||
SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg);
|
||||
pMsg->pCont = newMsg->pCont;
|
||||
newMsg->msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE;
|
||||
newMsg->pCont = pCreateMsg;
|
||||
|
||||
|
@ -2201,6 +2198,8 @@ static void mgmtProcessAlterTableMsg(SQueuedMsg *pMsg) {
|
|||
}
|
||||
|
||||
pAlter->type = htons(pAlter->type);
|
||||
pAlter->numOfCols = htons(pAlter->numOfCols);
|
||||
pAlter->tagValLen = htonl(pAlter->tagValLen);
|
||||
|
||||
if (pAlter->numOfCols > 2) {
|
||||
mError("table:%s, error numOfCols:%d in alter table", pAlter->tableId, pAlter->numOfCols);
|
||||
|
@ -2232,7 +2231,8 @@ static void mgmtProcessAlterTableMsg(SQueuedMsg *pMsg) {
|
|||
mTrace("table:%s, start to alter ctable", pAlter->tableId);
|
||||
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
|
||||
if (pAlter->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) {
|
||||
code = mgmtModifyChildTableTagValue(pTable, pAlter->schema[0].name, pAlter->tagVal);
|
||||
char *tagVal = (char*)(pAlter->schema + pAlter->numOfCols);
|
||||
code = mgmtModifyChildTableTagValue(pTable, pAlter->schema[0].name, tagVal);
|
||||
} else if (pAlter->type == TSDB_ALTER_TABLE_ADD_COLUMN) {
|
||||
code = mgmtAddNormalTableColumn(pMsg->pDb, pTable, pAlter->schema, 1);
|
||||
} else if (pAlter->type == TSDB_ALTER_TABLE_DROP_COLUMN) {
|
||||
|
|
Loading…
Reference in New Issue