From 7e4c7e7c517620b66bdff2c78d1c5df2f201dec9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 2 Jun 2020 15:47:24 +0800 Subject: [PATCH] [td-90] add protocol to update the tags value of tables. --- src/client/inc/tsclient.h | 1 + src/client/src/tscSQLParser.c | 39 ++++++++++++++++++++++++++++------ src/client/src/tscSchemaUtil.c | 2 ++ src/client/src/tscServer.c | 36 +++++++++++++++++++------------ src/common/inc/qsqltype.h | 3 ++- src/dnode/src/dnodeShell.c | 3 ++- src/inc/taosmsg.h | 14 +++++++++++- tests/examples/c/demo.c | 5 +++-- 8 files changed, 77 insertions(+), 26 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index f1b620176d..7bf5b05923 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -56,6 +56,7 @@ typedef struct STableMeta { STableComInfo tableInfo; uint8_t tableType; int16_t sversion; + int16_t tversion; SCMVgroupInfo vgroupInfo; int32_t sid; // the index of one table in a virtual node uint64_t uid; // unique id of a table diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 965e85efbd..4e05e71a0b 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -4402,7 +4402,9 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); } else if (pAlterSQL->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) { // Note: update can only be applied to table not super table. - // the following is handle display tags value for meters created according to super table + // the following is used to handle tags value for table created according to super table + pCmd->command = TSDB_SQL_UPDATE_TAGS_VAL; + tVariantList* pVarList = pAlterSQL->varList; tVariant* pTagName = &pVarList->a[0].pVar; @@ -4425,15 +4427,38 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { // validate the length of binary if ((pTagsSchema->type == TSDB_DATA_TYPE_BINARY || pTagsSchema->type == TSDB_DATA_TYPE_NCHAR) && - pVarList->a[1].pVar.nLen > pTagsSchema->bytes) { + (pVarList->a[1].pVar.nLen + VARSTR_HEADER_SIZE) > pTagsSchema->bytes) { return invalidSqlErrMsg(pQueryInfo->msg, msg14); } - - char name1[128] = {0}; - strncpy(name1, pTagName->pz, pTagName->nLen); - TAOS_FIELD f = tscCreateField(TSDB_DATA_TYPE_INT, name1, tDataTypeDesc[TSDB_DATA_TYPE_INT].nSize); - tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); + int32_t size = sizeof(SUpdateTableTagValMsg) + pTagsSchema->bytes + TSDB_EXTRA_PAYLOAD_SIZE; + if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) { + tscError("%p failed to malloc for alter table msg", pSql); + return TSDB_CODE_CLI_OUT_OF_MEMORY; + } + + SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) (pCmd->payload + tsRpcHeadSize); + pUpdateMsg->head.vgId = htonl(pTableMeta->vgroupInfo.vgId); + pUpdateMsg->tid = htonl(pTableMeta->sid); + pUpdateMsg->uid = htobe64(pTableMeta->uid); + pUpdateMsg->colId = htons(pTagsSchema->colId); + pUpdateMsg->type = htons(pTagsSchema->type); + pUpdateMsg->bytes = htons(pTagsSchema->bytes); + pUpdateMsg->tversion = htons(pTableMeta->tversion); + + tVariantDump(&pVarList->a[1].pVar, pUpdateMsg->data, pTagsSchema->type, true); + + int32_t len = 0; + if (pTagsSchema->type != TSDB_DATA_TYPE_BINARY && pTagsSchema->type != TSDB_DATA_TYPE_NCHAR) { + len = tDataTypeDesc[pTagsSchema->type].nSize; + } else { + len = varDataLen(pUpdateMsg->data); + } + + pUpdateMsg->tagValLen = htonl(len); // length may be changed after dump data + + int32_t total = sizeof(SUpdateTableTagValMsg) + len; + pUpdateMsg->head.contLen = htonl(total); } else if (pAlterSQL->type == TSDB_ALTER_TABLE_ADD_COLUMN) { tFieldList* pFieldList = pAlterSQL->pAddColumns; diff --git a/src/client/src/tscSchemaUtil.c b/src/client/src/tscSchemaUtil.c index 35bc3c4a80..ce36a73bfc 100644 --- a/src/client/src/tscSchemaUtil.c +++ b/src/client/src/tscSchemaUtil.c @@ -168,6 +168,8 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size pTableMeta->sid = pTableMetaMsg->sid; pTableMeta->uid = pTableMetaMsg->uid; pTableMeta->vgroupInfo = pTableMetaMsg->vgroup; + pTableMeta->sversion = pTableMetaMsg->sversion; + pTableMeta->tversion = pTableMetaMsg->tversion; memcpy(pTableMeta->schema, pTableMetaMsg->schema, schemaSize); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 8ca590a1f6..86f9eac11d 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1260,28 +1260,24 @@ int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) { } int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { - SCMAlterTableMsg *pAlterTableMsg; - char * pMsg; - int msgLen = 0; - int size = 0; + char *pMsg; + int msgLen = 0; - SSqlCmd * pCmd = &pSql->cmd; + SSqlCmd *pCmd = &pSql->cmd; SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - - size = tscEstimateAlterTableMsgLength(pCmd); + + SAlterTableSQL *pAlterInfo = pInfo->pAlterInfo; + int size = tscEstimateAlterTableMsgLength(pCmd); if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) { tscError("%p failed to malloc for alter table msg", pSql); return -1; } - - pAlterTableMsg = (SCMAlterTableMsg *)pCmd->payload; - + + SCMAlterTableMsg *pAlterTableMsg = (SCMAlterTableMsg *)pCmd->payload; tscGetDBInfoFromMeterId(pTableMetaInfo->name, pAlterTableMsg->db); - SAlterTableSQL *pAlterInfo = pInfo->pAlterInfo; - strcpy(pAlterTableMsg->tableId, pTableMetaInfo->name); pAlterTableMsg->type = htons(pAlterInfo->type); @@ -1289,7 +1285,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSchema *pSchema = pAlterTableMsg->schema; for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) { TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); - + pSchema->type = pField->type; strcpy(pSchema->name, pField->name); pSchema->bytes = htons(pField->bytes); @@ -1302,6 +1298,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pMsg += pAlterInfo->tagData.dataLen; msgLen = pMsg - (char*)pAlterTableMsg; + pCmd->payloadLen = msgLen; pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_TABLE; @@ -1310,6 +1307,16 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { return TSDB_CODE_SUCCESS; } +int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) { + SSqlCmd* pCmd = &pSql->cmd; + pCmd->msgType = TSDB_MSG_TYPE_UPDATE_TAG_VAL; + + SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) (pCmd->payload + tsRpcHeadSize); + pCmd->payloadLen = htonl(pUpdateMsg->head.contLen); + + return TSDB_CODE_SUCCESS; +} + int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlCmd *pCmd = &pSql->cmd; pCmd->payloadLen = sizeof(SCMAlterDbMsg); @@ -1804,7 +1811,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { pMetaMsg->sid = htonl(pMetaMsg->sid); pMetaMsg->sversion = htons(pMetaMsg->sversion); - + pMetaMsg->tversion = htons(pMetaMsg->tversion); pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId); pMetaMsg->uid = htobe64(pMetaMsg->uid); @@ -2552,6 +2559,7 @@ void tscInitMsgsFp() { tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg; tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg; tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg; + tscBuildMsg[TSDB_SQL_UPDATE_TAGS_VAL] = tscBuildUpdateTagMsg; tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg; tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg; diff --git a/src/common/inc/qsqltype.h b/src/common/inc/qsqltype.h index d1e5bcaa52..6f6493d17c 100644 --- a/src/common/inc/qsqltype.h +++ b/src/common/inc/qsqltype.h @@ -35,7 +35,8 @@ enum { TSDB_DEFINE_SQL_TYPE( TSDB_SQL_SELECT, "select" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_FETCH, "fetch" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_INSERT, "insert" ) - + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_UPDATE_TAGS_VAL, "update-tag-val" ) + // the SQL below is for mgmt node TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MGMT, "mgmt" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_DB, "create-db" ) diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index b09e14e283..074f21f972 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -40,7 +40,8 @@ int32_t dnodeInitShell() { dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVnodeWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVnodeReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVnodeReadQueue; - + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVnodeWriteQueue; + // the following message shall be treated as mnode write dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = dnodeDispatchToMnodeWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_ACCT] = dnodeDispatchToMnodeWriteQueue; diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index da6d0847ec..b5d4ee7758 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -43,7 +43,7 @@ enum { TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SUBMIT, "submit" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY0, "dummy0" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_UPDATE_TAG_VAL, "update-tag-val" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY2, "dummy2" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY3, "dummy3" ) @@ -276,6 +276,18 @@ typedef struct { // char tagVal[]; } SCMAlterTableMsg; +typedef struct { + SMsgHead head; + int64_t uid; + int32_t tid; + int16_t tversion; + int16_t colId; + int16_t type; + int16_t bytes; + int32_t tagValLen; + char data[]; +} SUpdateTableTagValMsg; + typedef struct { char clientVersion[TSDB_VERSION_LEN]; char msgVersion[TSDB_VERSION_LEN]; diff --git a/tests/examples/c/demo.c b/tests/examples/c/demo.c index e27c73891f..b25e3c32fe 100644 --- a/tests/examples/c/demo.c +++ b/tests/examples/c/demo.c @@ -95,7 +95,7 @@ int main(int argc, char *argv[]) { return 0; } - taos_options(TSDB_OPTION_CONFIGDIR, "/home/lisa/Documents/workspace/TDinternal/community/sim/tsim/cfg"); + taos_options(TSDB_OPTION_CONFIGDIR, "~/sec/cfg"); // init TAOS taos_init(); @@ -108,7 +108,8 @@ int main(int argc, char *argv[]) { printf("success to connect to server\n"); // multiThreadTest(1, taos); - doQuery(taos, "select max(c1), min(c2), sum(c3), avg(c4), first(c7), last(c8), first(c9) from lm2_db0.lm2_stb0 where ts >= 1537146000000 and ts <= 1543145400000 interval(5m) fill(value, -1, -2) group by t1 limit 2 offset 10;"); + doQuery(taos, "use test"); + doQuery(taos, "alter table tm99 set tag a=99"); // for(int32_t i = 0; i < 100000; ++i) { // doQuery(taos, "insert into t1 values(now, 2)"); // }