[td-90] add protocol to update the tags value of tables.

This commit is contained in:
Haojun Liao 2020-06-02 15:47:24 +08:00
parent 152c8267e8
commit 7e4c7e7c51
8 changed files with 77 additions and 26 deletions

View File

@ -56,6 +56,7 @@ typedef struct STableMeta {
STableComInfo tableInfo; STableComInfo tableInfo;
uint8_t tableType; uint8_t tableType;
int16_t sversion; int16_t sversion;
int16_t tversion;
SCMVgroupInfo vgroupInfo; SCMVgroupInfo vgroupInfo;
int32_t sid; // the index of one table in a virtual node int32_t sid; // the index of one table in a virtual node
uint64_t uid; // unique id of a table uint64_t uid; // unique id of a table

View File

@ -4402,7 +4402,9 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
} else if (pAlterSQL->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) { } else if (pAlterSQL->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) {
// Note: update can only be applied to table not super table. // Note: update can only be applied to table not super table.
// the following is handle display tags value for meters created according to super table // the following is used to handle tags value for table created according to super table
pCmd->command = TSDB_SQL_UPDATE_TAGS_VAL;
tVariantList* pVarList = pAlterSQL->varList; tVariantList* pVarList = pAlterSQL->varList;
tVariant* pTagName = &pVarList->a[0].pVar; tVariant* pTagName = &pVarList->a[0].pVar;
@ -4425,15 +4427,38 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
// validate the length of binary // validate the length of binary
if ((pTagsSchema->type == TSDB_DATA_TYPE_BINARY || pTagsSchema->type == TSDB_DATA_TYPE_NCHAR) && if ((pTagsSchema->type == TSDB_DATA_TYPE_BINARY || pTagsSchema->type == TSDB_DATA_TYPE_NCHAR) &&
pVarList->a[1].pVar.nLen > pTagsSchema->bytes) { (pVarList->a[1].pVar.nLen + VARSTR_HEADER_SIZE) > pTagsSchema->bytes) {
return invalidSqlErrMsg(pQueryInfo->msg, msg14); return invalidSqlErrMsg(pQueryInfo->msg, msg14);
} }
char name1[128] = {0}; int32_t size = sizeof(SUpdateTableTagValMsg) + pTagsSchema->bytes + TSDB_EXTRA_PAYLOAD_SIZE;
strncpy(name1, pTagName->pz, pTagName->nLen); if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
tscError("%p failed to malloc for alter table msg", pSql);
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
TAOS_FIELD f = tscCreateField(TSDB_DATA_TYPE_INT, name1, tDataTypeDesc[TSDB_DATA_TYPE_INT].nSize); SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) (pCmd->payload + tsRpcHeadSize);
tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); pUpdateMsg->head.vgId = htonl(pTableMeta->vgroupInfo.vgId);
pUpdateMsg->tid = htonl(pTableMeta->sid);
pUpdateMsg->uid = htobe64(pTableMeta->uid);
pUpdateMsg->colId = htons(pTagsSchema->colId);
pUpdateMsg->type = htons(pTagsSchema->type);
pUpdateMsg->bytes = htons(pTagsSchema->bytes);
pUpdateMsg->tversion = htons(pTableMeta->tversion);
tVariantDump(&pVarList->a[1].pVar, pUpdateMsg->data, pTagsSchema->type, true);
int32_t len = 0;
if (pTagsSchema->type != TSDB_DATA_TYPE_BINARY && pTagsSchema->type != TSDB_DATA_TYPE_NCHAR) {
len = tDataTypeDesc[pTagsSchema->type].nSize;
} else {
len = varDataLen(pUpdateMsg->data);
}
pUpdateMsg->tagValLen = htonl(len); // length may be changed after dump data
int32_t total = sizeof(SUpdateTableTagValMsg) + len;
pUpdateMsg->head.contLen = htonl(total);
} else if (pAlterSQL->type == TSDB_ALTER_TABLE_ADD_COLUMN) { } else if (pAlterSQL->type == TSDB_ALTER_TABLE_ADD_COLUMN) {
tFieldList* pFieldList = pAlterSQL->pAddColumns; tFieldList* pFieldList = pAlterSQL->pAddColumns;

View File

@ -168,6 +168,8 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
pTableMeta->sid = pTableMetaMsg->sid; pTableMeta->sid = pTableMetaMsg->sid;
pTableMeta->uid = pTableMetaMsg->uid; pTableMeta->uid = pTableMetaMsg->uid;
pTableMeta->vgroupInfo = pTableMetaMsg->vgroup; pTableMeta->vgroupInfo = pTableMetaMsg->vgroup;
pTableMeta->sversion = pTableMetaMsg->sversion;
pTableMeta->tversion = pTableMetaMsg->tversion;
memcpy(pTableMeta->schema, pTableMetaMsg->schema, schemaSize); memcpy(pTableMeta->schema, pTableMetaMsg->schema, schemaSize);

View File

@ -1260,28 +1260,24 @@ int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
} }
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SCMAlterTableMsg *pAlterTableMsg;
char *pMsg; char *pMsg;
int msgLen = 0; int msgLen = 0;
int size = 0;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
size = tscEstimateAlterTableMsgLength(pCmd); SAlterTableSQL *pAlterInfo = pInfo->pAlterInfo;
int size = tscEstimateAlterTableMsgLength(pCmd);
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) { if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
tscError("%p failed to malloc for alter table msg", pSql); tscError("%p failed to malloc for alter table msg", pSql);
return -1; return -1;
} }
pAlterTableMsg = (SCMAlterTableMsg *)pCmd->payload; SCMAlterTableMsg *pAlterTableMsg = (SCMAlterTableMsg *)pCmd->payload;
tscGetDBInfoFromMeterId(pTableMetaInfo->name, pAlterTableMsg->db); tscGetDBInfoFromMeterId(pTableMetaInfo->name, pAlterTableMsg->db);
SAlterTableSQL *pAlterInfo = pInfo->pAlterInfo;
strcpy(pAlterTableMsg->tableId, pTableMetaInfo->name); strcpy(pAlterTableMsg->tableId, pTableMetaInfo->name);
pAlterTableMsg->type = htons(pAlterInfo->type); pAlterTableMsg->type = htons(pAlterInfo->type);
@ -1302,6 +1298,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg += pAlterInfo->tagData.dataLen; pMsg += pAlterInfo->tagData.dataLen;
msgLen = pMsg - (char*)pAlterTableMsg; msgLen = pMsg - (char*)pAlterTableMsg;
pCmd->payloadLen = msgLen; pCmd->payloadLen = msgLen;
pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_TABLE; pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_TABLE;
@ -1310,6 +1307,16 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
SSqlCmd* pCmd = &pSql->cmd;
pCmd->msgType = TSDB_MSG_TYPE_UPDATE_TAG_VAL;
SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) (pCmd->payload + tsRpcHeadSize);
pCmd->payloadLen = htonl(pUpdateMsg->head.contLen);
return TSDB_CODE_SUCCESS;
}
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
pCmd->payloadLen = sizeof(SCMAlterDbMsg); pCmd->payloadLen = sizeof(SCMAlterDbMsg);
@ -1804,7 +1811,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
pMetaMsg->sid = htonl(pMetaMsg->sid); pMetaMsg->sid = htonl(pMetaMsg->sid);
pMetaMsg->sversion = htons(pMetaMsg->sversion); pMetaMsg->sversion = htons(pMetaMsg->sversion);
pMetaMsg->tversion = htons(pMetaMsg->tversion);
pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId); pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId);
pMetaMsg->uid = htobe64(pMetaMsg->uid); pMetaMsg->uid = htobe64(pMetaMsg->uid);
@ -2552,6 +2559,7 @@ void tscInitMsgsFp() {
tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg; tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg; tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg; tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
tscBuildMsg[TSDB_SQL_UPDATE_TAGS_VAL] = tscBuildUpdateTagMsg;
tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg; tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;
tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg; tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;

View File

@ -35,6 +35,7 @@ enum {
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_SELECT, "select" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_SELECT, "select" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_FETCH, "fetch" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_FETCH, "fetch" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_INSERT, "insert" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_INSERT, "insert" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_UPDATE_TAGS_VAL, "update-tag-val" )
// the SQL below is for mgmt node // the SQL below is for mgmt node
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MGMT, "mgmt" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MGMT, "mgmt" )

View File

@ -40,6 +40,7 @@ int32_t dnodeInitShell() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVnodeWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVnodeWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVnodeReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVnodeReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVnodeReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVnodeReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVnodeWriteQueue;
// the following message shall be treated as mnode write // the following message shall be treated as mnode write
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = dnodeDispatchToMnodeWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = dnodeDispatchToMnodeWriteQueue;

View File

@ -43,7 +43,7 @@ enum {
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SUBMIT, "submit" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SUBMIT, "submit" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY0, "dummy0" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_UPDATE_TAG_VAL, "update-tag-val" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY2, "dummy2" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY2, "dummy2" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY3, "dummy3" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY3, "dummy3" )
@ -276,6 +276,18 @@ typedef struct {
// char tagVal[]; // char tagVal[];
} SCMAlterTableMsg; } SCMAlterTableMsg;
typedef struct {
SMsgHead head;
int64_t uid;
int32_t tid;
int16_t tversion;
int16_t colId;
int16_t type;
int16_t bytes;
int32_t tagValLen;
char data[];
} SUpdateTableTagValMsg;
typedef struct { typedef struct {
char clientVersion[TSDB_VERSION_LEN]; char clientVersion[TSDB_VERSION_LEN];
char msgVersion[TSDB_VERSION_LEN]; char msgVersion[TSDB_VERSION_LEN];

View File

@ -95,7 +95,7 @@ int main(int argc, char *argv[]) {
return 0; return 0;
} }
taos_options(TSDB_OPTION_CONFIGDIR, "/home/lisa/Documents/workspace/TDinternal/community/sim/tsim/cfg"); taos_options(TSDB_OPTION_CONFIGDIR, "~/sec/cfg");
// init TAOS // init TAOS
taos_init(); taos_init();
@ -108,7 +108,8 @@ int main(int argc, char *argv[]) {
printf("success to connect to server\n"); printf("success to connect to server\n");
// multiThreadTest(1, taos); // multiThreadTest(1, taos);
doQuery(taos, "select max(c1), min(c2), sum(c3), avg(c4), first(c7), last(c8), first(c9) from lm2_db0.lm2_stb0 where ts >= 1537146000000 and ts <= 1543145400000 interval(5m) fill(value, -1, -2) group by t1 limit 2 offset 10;"); doQuery(taos, "use test");
doQuery(taos, "alter table tm99 set tag a=99");
// for(int32_t i = 0; i < 100000; ++i) { // for(int32_t i = 0; i < 100000; ++i) {
// doQuery(taos, "insert into t1 values(now, 2)"); // doQuery(taos, "insert into t1 values(now, 2)");
// } // }