From 607f8163d3da22c1587d207a76146a06a8376abb Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sun, 10 Jul 2022 18:54:58 +0800 Subject: [PATCH] feat:add logic for get_tmq_meta_json --- include/client/taos.h | 4 +- include/common/tdataformat.h | 3 +- include/common/tmsg.h | 6 + include/libs/parser/parser.h | 6 +- source/client/src/clientSml.c | 2 +- source/client/src/clientStmt.c | 11 +- source/client/src/tmq.c | 233 ++++++++++++++---- source/common/src/tmsg.c | 16 ++ source/dnode/mnode/impl/src/mndStb.c | 12 +- source/libs/parser/src/parInsert.c | 38 +-- source/libs/parser/src/parTranslater.c | 18 +- source/libs/parser/src/parUtil.c | 3 +- source/libs/scalar/src/sclfunc.c | 2 +- .../libs/scalar/test/scalar/scalarTests.cpp | 4 +- 14 files changed, 261 insertions(+), 97 deletions(-) diff --git a/include/client/taos.h b/include/client/taos.h index 362782b420..690c473986 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -265,7 +265,9 @@ typedef struct tmq_raw_data tmq_raw_data; DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res); DLL_EXPORT tmq_raw_data *tmq_get_raw_meta(TAOS_RES *res); DLL_EXPORT int32_t taos_write_raw_meta(TAOS *taos, tmq_raw_data *raw_meta); -DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res); // Returning null means error. Returned result need to be freed. +DLL_EXPORT void tmq_free_raw_meta(tmq_raw_data *rawMeta); +DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res); // Returning null means error. Returned result need to be freed by tmq_free_json_meta +DLL_EXPORT void tmq_free_json_meta(char* jsonMeta); DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res); DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res); DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index e70008e4ef..95ab12b94e 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -78,7 +78,7 @@ int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag); int32_t tDecodeTag(SDecoder *pDecoder, STag **ppTag); int32_t tTagToValArray(const STag *pTag, SArray **ppArray); void debugPrintSTag(STag *pTag, const char *tag, int32_t ln); // TODO: remove -int32_t parseJsontoTagData(const char* json, SArray* pTagVals, STag** ppTag, void* pMsgBuf); +int32_t parseJsontoTagData(const char* json, SArray* pTagVals, STag** ppTag, void* pMsgBuf, const char* colName); // STRUCT ================= struct STColumn { @@ -147,6 +147,7 @@ struct SColVal { #pragma pack(push, 1) struct STagVal { + char colName[TSDB_COL_NAME_LEN]; // only used for tmq_get_meta union { int16_t cid; char *pKey; diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 896ea27598..6140d30771 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1915,6 +1915,8 @@ typedef struct SVCreateStbReq { SSchemaWrapper schemaRow; SSchemaWrapper schemaTag; SRSmaParam rsmaParam; + int32_t alterOriDataLen; + void* alterOriData; } SVCreateStbReq; int tEncodeSVCreateStbReq(SEncoder* pCoder, const SVCreateStbReq* pReq); @@ -1942,6 +1944,7 @@ typedef struct SVCreateTbReq { int8_t type; union { struct { + char* name; tb_uid_t suid; uint8_t* pTag; } ctb; @@ -1959,6 +1962,7 @@ static FORCE_INLINE void tdDestroySVCreateTbReq(SVCreateTbReq* req) { taosMemoryFreeClear(req->comment); if (req->type == TSDB_CHILD_TABLE) { taosMemoryFreeClear(req->ctb.pTag); + taosMemoryFreeClear(req->ctb.name); } else if (req->type == TSDB_NORMAL_TABLE) { taosMemoryFreeClear(req->ntb.schemaRow.pSchema); } @@ -2042,12 +2046,14 @@ typedef struct { int32_t bytes; // TSDB_ALTER_TABLE_DROP_COLUMN // TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES + int8_t colModType; int32_t colModBytes; // TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME char* colNewName; // TSDB_ALTER_TABLE_UPDATE_TAG_VAL char* tagName; int8_t isNull; + int8_t tagType; uint32_t nTagVal; uint8_t* pTagVal; // TSDB_ALTER_TABLE_UPDATE_OPTIONS diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index a4aec72d4f..a3de9164a2 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -29,7 +29,7 @@ struct SMetaData; typedef struct SStmtCallback { TAOS_STMT* pStmt; int32_t (*getTbNameFn)(TAOS_STMT*, char**); - int32_t (*setInfoFn)(TAOS_STMT*, STableMeta*, void*, char*, bool, SHashObj*, SHashObj*); + int32_t (*setInfoFn)(TAOS_STMT*, STableMeta*, void*, char*, bool, SHashObj*, SHashObj*, const char*); int32_t (*getExecInfoFn)(TAOS_STMT*, SHashObj**, SHashObj**); } SStmtCallback; @@ -84,7 +84,7 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu int32_t rowNum); int32_t qBuildStmtColFields(void* pDataBlock, int32_t* fieldNum, TAOS_FIELD_E** fields); int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TAOS_FIELD_E** fields); -int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, char* tName, TAOS_MULTI_BIND* bind, +int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const char* sTableName, char* tName, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen); void destroyBoundColumnInfo(void* pBoundInfo); int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf, @@ -93,7 +93,7 @@ int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* void* smlInitHandle(SQuery* pQuery); void smlDestroyHandle(void* pHandle); int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta, - char* tableName, char* msgBuf, int16_t msgBufLen); + char* tableName, const char* sTableName, int32_t sTableNameLen, char* msgBuf, int16_t msgBufLen); int32_t smlBuildOutput(void* handle, SHashObj* pVgHash); int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray); diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index b6972e4670..df62f25168 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -2256,7 +2256,7 @@ static int32_t smlInsertData(SSmlHandle *info) { (*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid code = smlBindData(info->exec, tableData->tags, (*pMeta)->cols, tableData->cols, info->dataFormat, - (*pMeta)->tableMeta, tableData->childTableName, info->msgBuf.buf, info->msgBuf.len); + (*pMeta)->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen, info->msgBuf.buf, info->msgBuf.len); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%" PRIx64 " smlBindData failed", info->id); return code; diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index 1e0f30695d..7287c46796 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -128,7 +128,7 @@ int32_t stmtRestoreQueryFields(STscStmt* pStmt) { return TSDB_CODE_SUCCESS; } -int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, char* tbFName) { +int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, char* tbFName, const char* sTableName) { STscStmt* pStmt = (STscStmt*)stmt; strncpy(pStmt->bInfo.tbFName, tbFName, sizeof(pStmt->bInfo.tbFName) - 1); @@ -139,6 +139,7 @@ int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, pStmt->bInfo.tbType = pTableMeta->tableType; pStmt->bInfo.boundTags = tags; pStmt->bInfo.tagsCached = false; + strcpy(pStmt->bInfo.stbFName, sTableName); return TSDB_CODE_SUCCESS; } @@ -154,10 +155,10 @@ int32_t stmtUpdateExecInfo(TAOS_STMT* stmt, SHashObj* pVgHash, SHashObj* pBlockH } int32_t stmtUpdateInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, char* tbFName, bool autoCreateTbl, - SHashObj* pVgHash, SHashObj* pBlockHash) { + SHashObj* pVgHash, SHashObj* pBlockHash, const char* sTableName) { STscStmt* pStmt = (STscStmt*)stmt; - STMT_ERR_RET(stmtUpdateBindInfo(stmt, pTableMeta, tags, tbFName)); + STMT_ERR_RET(stmtUpdateBindInfo(stmt, pTableMeta, tags, tbFName, sTableName)); STMT_ERR_RET(stmtUpdateExecInfo(stmt, pVgHash, pBlockHash, autoCreateTbl)); pStmt->sql.autoCreateTbl = autoCreateTbl; @@ -247,7 +248,7 @@ int32_t stmtCleanBindInfo(STscStmt* pStmt) { destroyBoundColumnInfo(pStmt->bInfo.boundTags); taosMemoryFreeClear(pStmt->bInfo.boundTags); } - + memset(pStmt->bInfo.stbFName, 0, TSDB_TABLE_FNAME_LEN); return TSDB_CODE_SUCCESS; } @@ -568,7 +569,7 @@ int stmtSetTbTags(TAOS_STMT* stmt, TAOS_MULTI_BIND* tags) { STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } - STMT_ERR_RET(qBindStmtTagsValue(*pDataBlock, pStmt->bInfo.boundTags, pStmt->bInfo.tbSuid, pStmt->bInfo.sname.tname, + STMT_ERR_RET(qBindStmtTagsValue(*pDataBlock, pStmt->bInfo.boundTags, pStmt->bInfo.tbSuid, pStmt->bInfo.stbFName, pStmt->bInfo.sname.tname, tags, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen)); return TSDB_CODE_SUCCESS; diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 907691698c..fa9f715be3 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -1867,10 +1867,10 @@ static char *buildCreateTableJson(SSchemaWrapper *schemaRow, SSchemaWrapper* sch cJSON* type = cJSON_CreateString("create"); cJSON_AddItemToObject(json, "type", type); - char uid[32] = {0}; - sprintf(uid, "%"PRIi64, id); - cJSON* id_ = cJSON_CreateString(uid); - cJSON_AddItemToObject(json, "id", id_); +// char uid[32] = {0}; +// sprintf(uid, "%"PRIi64, id); +// cJSON* id_ = cJSON_CreateString(uid); +// cJSON_AddItemToObject(json, "id", id_); cJSON* tableName = cJSON_CreateString(name); cJSON_AddItemToObject(json, "tableName", tableName); cJSON* tableType = cJSON_CreateString(t == TSDB_NORMAL_TABLE ? "normal" : "super"); @@ -1925,6 +1925,98 @@ static char *buildCreateTableJson(SSchemaWrapper *schemaRow, SSchemaWrapper* sch return string; } +static char *buildAlterSTableJson(void* alterData, int32_t alterDataLen){ + SMAlterStbReq req = {0}; + cJSON* json = NULL; + char* string = NULL; + + if (tDeserializeSMAlterStbReq(alterData, alterDataLen, &req) != 0) { + goto end; + } + + json = cJSON_CreateObject(); + if (json == NULL) { + goto end; + } + cJSON* type = cJSON_CreateString("alter"); + cJSON_AddItemToObject(json, "type", type); +// cJSON* uid = cJSON_CreateNumber(id); +// cJSON_AddItemToObject(json, "uid", uid); + SName name = {0}; + tNameFromString(&name, req.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + cJSON* tableName = cJSON_CreateString(name.tname); + cJSON_AddItemToObject(json, "tableName", tableName); + cJSON* tableType = cJSON_CreateString("super"); + cJSON_AddItemToObject(json, "tableType", tableType); + + cJSON* alterType = cJSON_CreateNumber(req.alterType); + cJSON_AddItemToObject(json, "alterType", alterType); + switch (req.alterType) { + case TSDB_ALTER_TABLE_ADD_TAG: + case TSDB_ALTER_TABLE_ADD_COLUMN: { + TAOS_FIELD *field = taosArrayGet(req.pFields, 0); + cJSON* colName = cJSON_CreateString(field->name); + cJSON_AddItemToObject(json, "colName", colName); + cJSON* colType = cJSON_CreateNumber(field->type); + cJSON_AddItemToObject(json, "colType", colType); + + if(field->type == TSDB_DATA_TYPE_BINARY){ + int32_t length = field->bytes - VARSTR_HEADER_SIZE; + cJSON* cbytes = cJSON_CreateNumber(length); + cJSON_AddItemToObject(json, "colLength", cbytes); + }else if (field->type == TSDB_DATA_TYPE_NCHAR){ + int32_t length = (field->bytes - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE; + cJSON* cbytes = cJSON_CreateNumber(length); + cJSON_AddItemToObject(json, "colLength", cbytes); + } + break; + } + case TSDB_ALTER_TABLE_DROP_TAG: + case TSDB_ALTER_TABLE_DROP_COLUMN:{ + TAOS_FIELD *field = taosArrayGet(req.pFields, 0); + cJSON* colName = cJSON_CreateString(field->name); + cJSON_AddItemToObject(json, "colName", colName); + break; + } + case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES: + case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:{ + TAOS_FIELD *field = taosArrayGet(req.pFields, 0); + cJSON* colName = cJSON_CreateString(field->name); + cJSON_AddItemToObject(json, "colName", colName); + cJSON* colType = cJSON_CreateNumber(field->type); + cJSON_AddItemToObject(json, "colType", colType); + if(field->type == TSDB_DATA_TYPE_BINARY){ + int32_t length = field->bytes - VARSTR_HEADER_SIZE; + cJSON* cbytes = cJSON_CreateNumber(length); + cJSON_AddItemToObject(json, "colLength", cbytes); + }else if (field->type == TSDB_DATA_TYPE_NCHAR){ + int32_t length = (field->bytes - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE; + cJSON* cbytes = cJSON_CreateNumber(length); + cJSON_AddItemToObject(json, "colLength", cbytes); + } + break; + } + case TSDB_ALTER_TABLE_UPDATE_TAG_NAME: + case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME:{ + TAOS_FIELD *oldField = taosArrayGet(req.pFields, 0); + TAOS_FIELD *newField = taosArrayGet(req.pFields, 1); + cJSON* colName = cJSON_CreateString(oldField->name); + cJSON_AddItemToObject(json, "colName", colName); + cJSON* colNewName = cJSON_CreateString(newField->name); + cJSON_AddItemToObject(json, "colNewName", colNewName); + break; + } + default: + break; + } + string = cJSON_PrintUnformatted(json); + +end: + cJSON_Delete(json); + tFreeSMAltertbReq(&req); + return string; +} + static char *processCreateStb(SMqMetaRsp *metaRsp){ SVCreateStbReq req = {0}; SDecoder coder; @@ -1947,53 +2039,74 @@ _err: return string; } -static char *buildCreateCTableJson(STag* pTag, int64_t sid, char* name, int64_t id){ +static char *processAlterStb(SMqMetaRsp *metaRsp){ + SVCreateStbReq req = {0}; + SDecoder coder; char* string = NULL; + + // decode and process req + void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); + int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead); + tDecoderInit(&coder, data, len); + + if (tDecodeSVCreateStbReq(&coder, &req) < 0) { + goto _err; + } + string = buildAlterSTableJson(req.alterOriData, req.alterOriDataLen); + tDecoderClear(&coder); + return string; + + _err: + tDecoderClear(&coder); + return string; +} + +static char *buildCreateCTableJson(STag* pTag, char* sname, char* name, int64_t id){ + char* string = NULL; + SArray* pTagVals = NULL; cJSON* json = cJSON_CreateObject(); if (json == NULL) { return string; } cJSON* type = cJSON_CreateString("create"); cJSON_AddItemToObject(json, "type", type); - char cid[32] = {0}; - sprintf(cid, "%"PRIi64, id); - cJSON* cid_ = cJSON_CreateString(cid); - cJSON_AddItemToObject(json, "id", cid_); +// char cid[32] = {0}; +// sprintf(cid, "%"PRIi64, id); +// cJSON* cid_ = cJSON_CreateString(cid); +// cJSON_AddItemToObject(json, "id", cid_); cJSON* tableName = cJSON_CreateString(name); cJSON_AddItemToObject(json, "tableName", tableName); cJSON* tableType = cJSON_CreateString("child"); cJSON_AddItemToObject(json, "tableType", tableType); - - char sid_[32] = {0}; - sprintf(sid_, "%"PRIi64, sid); - cJSON* using = cJSON_CreateString(sid_); + cJSON* using = cJSON_CreateString(sname); cJSON_AddItemToObject(json, "using", using); // cJSON* version = cJSON_CreateNumber(1); // cJSON_AddItemToObject(json, "version", version); cJSON* tags = cJSON_CreateArray(); + int32_t code = tTagToValArray(pTag, &pTagVals); + if (code) { + goto end; + } - if (tTagIsJson(pTag)) { // todo + if (tTagIsJson(pTag)) { + STag* p = (STag*)pTag; + if(p->nTag == 0){ + goto end; + } char* pJson = parseTagDatatoJson(pTag); - cJSON* tag = cJSON_CreateObject(); - cJSON* tname = cJSON_CreateString("unknown"); // todo + STagVal* pTagVal = taosArrayGet(pTagVals, 0); + + cJSON* tname = cJSON_CreateString(pTagVal->colName); cJSON_AddItemToObject(tag, "name", tname); cJSON* ttype = cJSON_CreateNumber(TSDB_DATA_TYPE_JSON); cJSON_AddItemToObject(tag, "type", ttype); cJSON* tvalue = cJSON_CreateString(pJson); cJSON_AddItemToObject(tag, "value", tvalue); cJSON_AddItemToArray(tags, tag); - cJSON_AddItemToObject(json, "tags", tags); - - string = cJSON_PrintUnformatted(json); - goto end; - } - - SArray* pTagVals = NULL; - int32_t code = tTagToValArray(pTag, &pTagVals); - if (code) { + taosMemoryFree(pJson); goto end; } @@ -2001,8 +2114,7 @@ static char *buildCreateCTableJson(STag* pTag, int64_t sid, char* name, int64_t STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i); cJSON* tag = cJSON_CreateObject(); -// cJSON* tname = cJSON_CreateNumber(pTagVal->cid); - cJSON* tname = cJSON_CreateString("unkonwn"); // todo + cJSON* tname = cJSON_CreateString(pTagVal->colName); cJSON_AddItemToObject(tag, "name", tname); cJSON* ttype = cJSON_CreateNumber(pTagVal->type); cJSON_AddItemToObject(tag, "type", ttype); @@ -2021,12 +2133,12 @@ static char *buildCreateCTableJson(STag* pTag, int64_t sid, char* name, int64_t cJSON_AddItemToObject(tag, "value", tvalue); cJSON_AddItemToArray(tags, tag); } + + end: cJSON_AddItemToObject(json, "tags", tags); string = cJSON_PrintUnformatted(json); - -end: - cJSON_Delete(json); + taosArrayDestroy(pTagVals); return string; } @@ -2047,7 +2159,7 @@ static char *processCreateTable(SMqMetaRsp *metaRsp){ for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { pCreateReq = req.pReqs + iReq; if(pCreateReq->type == TSDB_CHILD_TABLE){ - string = buildCreateCTableJson((STag*)pCreateReq->ctb.pTag, pCreateReq->ctb.suid, pCreateReq->name, pCreateReq->uid); + string = buildCreateCTableJson((STag*)pCreateReq->ctb.pTag, pCreateReq->ctb.name, pCreateReq->name, pCreateReq->uid); }else if(pCreateReq->type == TSDB_NORMAL_TABLE){ string = buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE); } @@ -2085,11 +2197,11 @@ static char *processAlterTable(SMqMetaRsp *metaRsp){ cJSON_AddItemToObject(json, "tableName", tableName); cJSON* tableType = cJSON_CreateString(vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL ? "child" : "normal"); cJSON_AddItemToObject(json, "tableType", tableType); + cJSON* alterType = cJSON_CreateNumber(vAlterTbReq.action); + cJSON_AddItemToObject(json, "alterType", alterType); switch (vAlterTbReq.action) { case TSDB_ALTER_TABLE_ADD_COLUMN: { - cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_ADD_COLUMN); - cJSON_AddItemToObject(json, "alterType", alterType); cJSON* colName = cJSON_CreateString(vAlterTbReq.colName); cJSON_AddItemToObject(json, "colName", colName); cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type); @@ -2107,33 +2219,27 @@ static char *processAlterTable(SMqMetaRsp *metaRsp){ break; } case TSDB_ALTER_TABLE_DROP_COLUMN:{ - cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_DROP_COLUMN); - cJSON_AddItemToObject(json, "alterType", alterType); cJSON* colName = cJSON_CreateString(vAlterTbReq.colName); cJSON_AddItemToObject(json, "colName", colName); break; } case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:{ - cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES); - cJSON_AddItemToObject(json, "alterType", alterType); cJSON* colName = cJSON_CreateString(vAlterTbReq.colName); cJSON_AddItemToObject(json, "colName", colName); - cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type); + cJSON* colType = cJSON_CreateNumber(vAlterTbReq.colModType); cJSON_AddItemToObject(json, "colType", colType); - if(vAlterTbReq.type == TSDB_DATA_TYPE_BINARY){ - int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE; + if(vAlterTbReq.colModType == TSDB_DATA_TYPE_BINARY){ + int32_t length = vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE; cJSON* cbytes = cJSON_CreateNumber(length); cJSON_AddItemToObject(json, "colLength", cbytes); - }else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR){ - int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE; + }else if (vAlterTbReq.colModType == TSDB_DATA_TYPE_NCHAR){ + int32_t length = (vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE; cJSON* cbytes = cJSON_CreateNumber(length); cJSON_AddItemToObject(json, "colLength", cbytes); } break; } case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME:{ - cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME); - cJSON_AddItemToObject(json, "alterType", alterType); cJSON* colName = cJSON_CreateString(vAlterTbReq.colName); cJSON_AddItemToObject(json, "colName", colName); cJSON* colNewName = cJSON_CreateString(vAlterTbReq.colNewName); @@ -2141,12 +2247,25 @@ static char *processAlterTable(SMqMetaRsp *metaRsp){ break; } case TSDB_ALTER_TABLE_UPDATE_TAG_VAL:{ - cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_UPDATE_TAG_VAL); - cJSON_AddItemToObject(json, "alterType", alterType); cJSON* tagName = cJSON_CreateString(vAlterTbReq.tagName); cJSON_AddItemToObject(json, "colName", tagName); - cJSON* colValue = cJSON_CreateString("invalid, todo"); // todo - cJSON_AddItemToObject(json, "colValue", colValue); + + if (!vAlterTbReq.isNull){ + char* buf = NULL; + + if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) { + ASSERT(tTagIsJson(vAlterTbReq.pTagVal) == true); + buf = parseTagDatatoJson(vAlterTbReq.pTagVal); + } else { + buf = taosMemoryCalloc(vAlterTbReq.nTagVal + 1, 1); + dataConverToStr(buf, vAlterTbReq.tagType, vAlterTbReq.pTagVal, vAlterTbReq.nTagVal, NULL); + } + + cJSON* colValue = cJSON_CreateString(buf); + cJSON_AddItemToObject(json, "colValue", colValue); + taosMemoryFree(buf); + } + cJSON* isNull = cJSON_CreateBool(vAlterTbReq.isNull); cJSON_AddItemToObject(json, "colValueNull", isNull); break; @@ -2180,10 +2299,6 @@ static char *processDropSTable(SMqMetaRsp *metaRsp){ } cJSON* type = cJSON_CreateString("drop"); cJSON_AddItemToObject(json, "type", type); - char uid[32] = {0}; - sprintf(uid, "%"PRIi64, req.suid); - cJSON* id = cJSON_CreateString(uid); - cJSON_AddItemToObject(json, "id", id); cJSON* tableName = cJSON_CreateString(req.name); cJSON_AddItemToObject(json, "tableName", tableName); cJSON* tableType = cJSON_CreateString("super"); @@ -2224,7 +2339,7 @@ static char *processDropTable(SMqMetaRsp *metaRsp){ for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { SVDropTbReq* pDropTbReq = req.pReqs + iReq; - cJSON* tableName = cJSON_CreateString(pDropTbReq->name); // todo + cJSON* tableName = cJSON_CreateString(pDropTbReq->name); cJSON_AddItemToArray(tableNameList, tableName); } cJSON_AddItemToObject(json, "tableNameList", tableNameList); @@ -2245,7 +2360,7 @@ char *tmq_get_json_meta(TAOS_RES *res){ if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_STB){ return processCreateStb(&pMetaRspObj->metaRsp); }else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_STB){ - return processCreateStb(&pMetaRspObj->metaRsp); + return processAlterStb(&pMetaRspObj->metaRsp); }else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_STB){ return processDropSTable(&pMetaRspObj->metaRsp); }else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_TABLE){ @@ -2258,6 +2373,10 @@ char *tmq_get_json_meta(TAOS_RES *res){ return NULL; } +void tmq_free_json_meta(char* jsonMeta){ + taosMemoryFreeClear(jsonMeta); +} + static int32_t taosCreateStb(TAOS *taos, void *meta, int32_t metaLen){ SVCreateStbReq req = {0}; SDecoder coder; @@ -2310,6 +2429,7 @@ static int32_t taosCreateStb(TAOS *taos, void *meta, int32_t metaLen){ pReq.commentLen = -1; pReq.suid = req.suid; pReq.source = 1; + pReq.igExists = true; SName tableName; tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name); @@ -2747,6 +2867,7 @@ end: } int32_t taos_write_raw_meta(TAOS *taos, tmq_raw_data *raw_meta){ + return 0; if (!taos || !raw_meta) { return TSDB_CODE_INVALID_PARA; } @@ -2767,6 +2888,10 @@ int32_t taos_write_raw_meta(TAOS *taos, tmq_raw_data *raw_meta){ return TSDB_CODE_INVALID_PARA; } +void tmq_free_raw_meta(tmq_raw_data *rawMeta){ + taosMemoryFreeClear(rawMeta); +} + void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) { tmqCommitInner2(tmq, msg, 0, 1, cb, param); } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 165e8631c4..0ffb4f6d2a 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4803,6 +4803,11 @@ int tEncodeSVCreateStbReq(SEncoder *pCoder, const SVCreateStbReq *pReq) { if (tEncodeSRSmaParam(pCoder, &pReq->rsmaParam) < 0) return -1; } + if (tEncodeI32(pCoder, pReq->alterOriDataLen) < 0) return -1; + if (pReq->alterOriDataLen > 0) { + if (tEncodeBinary(pCoder, pReq->alterOriData, pReq->alterOriDataLen) < 0) return -1; + } + tEndEncode(pCoder); return 0; } @@ -4819,6 +4824,11 @@ int tDecodeSVCreateStbReq(SDecoder *pCoder, SVCreateStbReq *pReq) { if (tDecodeSRSmaParam(pCoder, &pReq->rsmaParam) < 0) return -1; } + if (tDecodeI32(pCoder, &pReq->alterOriDataLen) < 0) return -1; + if (pReq->alterOriDataLen > 0) { + if (tDecodeBinary(pCoder, (uint8_t **)&pReq->alterOriData, NULL) < 0) return -1; + } + tEndDecode(pCoder); return 0; } @@ -4862,6 +4872,7 @@ int tEncodeSVCreateTbReq(SEncoder *pCoder, const SVCreateTbReq *pReq) { } if (pReq->type == TSDB_CHILD_TABLE) { + if (tEncodeCStr(pCoder, pReq->ctb.name) < 0) return -1; if (tEncodeI64(pCoder, pReq->ctb.suid) < 0) return -1; if (tEncodeTag(pCoder, (const STag *)pReq->ctb.pTag) < 0) return -1; } else if (pReq->type == TSDB_NORMAL_TABLE) { @@ -4891,6 +4902,7 @@ int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) { } if (pReq->type == TSDB_CHILD_TABLE) { + if (tDecodeCStr(pCoder, &pReq->ctb.name) < 0) return -1; if (tDecodeI64(pCoder, &pReq->ctb.suid) < 0) return -1; if (tDecodeTag(pCoder, (STag **)&pReq->ctb.pTag) < 0) return -1; } else if (pReq->type == TSDB_NORMAL_TABLE) { @@ -5224,6 +5236,7 @@ int32_t tEncodeSVAlterTbReq(SEncoder *pEncoder, const SVAlterTbReq *pReq) { break; case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: if (tEncodeCStr(pEncoder, pReq->colName) < 0) return -1; + if (tEncodeI8(pEncoder, pReq->colModType) < 0) return -1; if (tEncodeI32v(pEncoder, pReq->colModBytes) < 0) return -1; break; case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: @@ -5233,6 +5246,7 @@ int32_t tEncodeSVAlterTbReq(SEncoder *pEncoder, const SVAlterTbReq *pReq) { case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: if (tEncodeCStr(pEncoder, pReq->tagName) < 0) return -1; if (tEncodeI8(pEncoder, pReq->isNull) < 0) return -1; + if (tEncodeI8(pEncoder, pReq->tagType) < 0) return -1; if (!pReq->isNull) { if (tEncodeBinary(pEncoder, pReq->pTagVal, pReq->nTagVal) < 0) return -1; } @@ -5272,6 +5286,7 @@ int32_t tDecodeSVAlterTbReq(SDecoder *pDecoder, SVAlterTbReq *pReq) { break; case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: if (tDecodeCStr(pDecoder, &pReq->colName) < 0) return -1; + if (tDecodeI8(pDecoder, &pReq->colModType) < 0) return -1; if (tDecodeI32v(pDecoder, &pReq->colModBytes) < 0) return -1; break; case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: @@ -5281,6 +5296,7 @@ int32_t tDecodeSVAlterTbReq(SDecoder *pDecoder, SVAlterTbReq *pReq) { case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: if (tDecodeCStr(pDecoder, &pReq->tagName) < 0) return -1; if (tDecodeI8(pDecoder, &pReq->isNull) < 0) return -1; + if (tDecodeI8(pDecoder, &pReq->tagType) < 0) return -1; if (!pReq->isNull) { if (tDecodeBinary(pDecoder, &pReq->pTagVal, &pReq->nTagVal) < 0) return -1; } diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 6d1cc05ace..17885de60b 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -409,7 +409,7 @@ static FORCE_INLINE int32_t schemaExColIdCompare(const void *colId, const void * return 0; } -static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) { +static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen, void* alterOriData, int32_t alterOriDataLen) { SEncoder encoder = {0}; int32_t contLen; SName name = {0}; @@ -422,6 +422,8 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt req.name = (char *)tNameGetTableName(&name); req.suid = pStb->uid; req.rollup = pStb->ast1Len > 0 ? 1 : 0; + req.alterOriData = alterOriData; + req.alterOriDataLen = alterOriDataLen; // todo req.schemaRow.nCols = pStb->numOfColumns; req.schemaRow.version = pStb->colVer; @@ -626,7 +628,7 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj continue; } - void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen); + void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen, NULL, 0); if (pReq == NULL) { sdbCancelFetch(pSdb, pIter); sdbRelease(pSdb, pVgroup); @@ -1278,7 +1280,7 @@ static int32_t mndSetAlterStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj * return 0; } -static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { +static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb, void* alterOriData, int32_t alterOriDataLen) { SSdb *pSdb = pMnode->pSdb; SVgObj *pVgroup = NULL; void *pIter = NULL; @@ -1292,7 +1294,7 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj continue; } - void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen); + void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen, alterOriData, alterOriDataLen); if (pReq == NULL) { sdbCancelFetch(pSdb, pIter); sdbRelease(pSdb, pVgroup); @@ -1575,7 +1577,7 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto _OVER; if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto _OVER; - if (mndSetAlterStbRedoActions(pMnode, pTrans, pDb, &stbObj) != 0) goto _OVER; + if (mndSetAlterStbRedoActions(pMnode, pTrans, pDb, &stbObj, pReq->pCont, pReq->contLen) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; code = 0; diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index a5cf755a74..576dc6d8d3 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -71,6 +71,7 @@ typedef struct SInsertParseContext { SVnodeModifOpStmt* pOutput; SStmtCallback* pStmtCb; SParseMetaCache* pMetaCache; + char sTableName[TSDB_TABLE_NAME_LEN]; } SInsertParseContext; typedef struct SInsertParseSyntaxCxt { @@ -811,10 +812,11 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo* return TSDB_CODE_SUCCESS; } -static void buildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid) { +static void buildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid, const char* sname) { pTbReq->type = TD_CHILD_TABLE; pTbReq->name = strdup(tname); pTbReq->ctb.suid = suid; + if(sname) pTbReq->ctb.name = strdup(sname); pTbReq->ctb.pTag = (uint8_t*)pTag; pTbReq->commentLen = -1; @@ -835,6 +837,7 @@ static int32_t parseTagToken(char** end, SToken* pToken, SSchema* pSchema, int16 return TSDB_CODE_SUCCESS; } + strcpy(val->colName, pSchema->name); val->cid = pSchema->colId; val->type = pSchema->type; @@ -1051,7 +1054,7 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint if (isNullStr(&sToken)) { code = tTagNew(pTagVals, 1, true, &pTag); } else { - code = parseJsontoTagData(sToken.z, pTagVals, &pTag, &pCxt->msg); + code = parseJsontoTagData(sToken.z, pTagVals, &pTag, &pCxt->msg, pTagSchema->name); } taosMemoryFree(tmpTokenBuf); if (code != TSDB_CODE_SUCCESS) { @@ -1081,7 +1084,7 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint goto end; } - buildCreateTbReq(&pCxt->createTblReq, tName, pTag, pCxt->pTableMeta->suid); + buildCreateTbReq(&pCxt->createTblReq, tName, pTag, pCxt->pTableMeta->suid, pCxt->sTableName); end: for (int i = 0; i < taosArrayGetSize(pTagVals); ++i) { @@ -1166,6 +1169,7 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SName* name, char* tb createSName(&sname, &sToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg); char dbFName[TSDB_DB_FNAME_LEN]; tNameGetFullDbName(&sname, dbFName); + strcpy(pCxt->sTableName, sname.tname); CHECK_CODE(getSTableMeta(pCxt, &sname, dbFName)); if (TSDB_SUPER_TABLE != pCxt->pTableMeta->tableType) { @@ -1402,15 +1406,10 @@ static int32_t parseDataFromFile(SInsertParseContext* pCxt, SToken filePath, STa return TSDB_CODE_SUCCESS; } -void destroyCreateSubTbReq(SVCreateTbReq* pReq) { - taosMemoryFreeClear(pReq->name); - taosMemoryFreeClear(pReq->ctb.pTag); -} - static void destroyInsertParseContextForTable(SInsertParseContext* pCxt) { taosMemoryFreeClear(pCxt->pTableMeta); destroyBoundColumnInfo(&pCxt->tags); - destroyCreateSubTbReq(&pCxt->createTblReq); + tdDestroySVCreateTbReq(&pCxt->createTblReq); } static void destroySubTableHashElem(void* p) { taosMemoryFree(*(STableMeta**)p); } @@ -1556,7 +1555,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { } memcpy(tags, &pCxt->tags, sizeof(pCxt->tags)); (*pCxt->pStmtCb->setInfoFn)(pCxt->pStmtCb->pStmt, pCxt->pTableMeta, tags, tbFName, autoCreateTbl, - pCxt->pVgroupsHashObj, pCxt->pTableBlockHashObj); + pCxt->pVgroupsHashObj, pCxt->pTableBlockHashObj, pCxt->sTableName); memset(&pCxt->tags, 0, sizeof(pCxt->tags)); pCxt->pVgroupsHashObj = NULL; @@ -1865,7 +1864,7 @@ int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash return TSDB_CODE_SUCCESS; } -int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, char* tName, TAOS_MULTI_BIND* bind, +int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const char* sTableName, char* tName, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) { STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock; SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; @@ -1904,13 +1903,14 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, char* tN isJson = true; char* tmp = taosMemoryCalloc(1, colLen + 1); memcpy(tmp, bind[c].buffer, colLen); - code = parseJsontoTagData(tmp, pTagArray, &pTag, &pBuf); + code = parseJsontoTagData(tmp, pTagArray, &pTag, &pBuf, pTagSchema->name); taosMemoryFree(tmp); if (code != TSDB_CODE_SUCCESS) { goto end; } } else { STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type}; + strcpy(val.colName, pTagSchema->name); if (pTagSchema->type == TSDB_DATA_TYPE_BINARY) { val.pData = (uint8_t*)bind[c].buffer; val.nData = colLen; @@ -1947,9 +1947,9 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, char* tN } SVCreateTbReq tbReq = {0}; - buildCreateTbReq(&tbReq, tName, pTag, suid); + buildCreateTbReq(&tbReq, tName, pTag, suid, sTableName); code = buildCreateTbMsg(pDataBlock, &tbReq); - destroyCreateSubTbReq(&tbReq); + tdDestroySVCreateTbReq(&tbReq); end: for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) { @@ -2213,7 +2213,7 @@ typedef struct SmlExecHandle { static void smlDestroyTableHandle(void* pHandle) { SmlExecTableHandle* handle = (SmlExecTableHandle*)pHandle; destroyBoundColumnInfo(&handle->tags); - destroyCreateSubTbReq(&handle->createTblReq); + tdDestroySVCreateTbReq(&handle->createTblReq); } static int32_t smlBoundColumnData(SArray* cols, SParsedDataColInfo* pColList, SSchema* pSchema) { @@ -2311,6 +2311,7 @@ static int32_t smlBuildTagRow(SArray* cols, SParsedDataColInfo* tags, SSchema* p SSmlKv* kv = taosArrayGetP(cols, i); STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type}; + strcpy(val.colName, pTagSchema->name); if (pTagSchema->type == TSDB_DATA_TYPE_BINARY) { val.pData = (uint8_t*)kv->value; val.nData = kv->length; @@ -2354,7 +2355,7 @@ end: } int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta, - char* tableName, char* msgBuf, int16_t msgBufLen) { + char* tableName, const char* sTableName, int32_t sTableNameLen, char* msgBuf, int16_t msgBufLen) { SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; SSmlExecHandle* smlHandle = (SSmlExecHandle*)handle; @@ -2372,7 +2373,10 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols return ret; } - buildCreateTbReq(&smlHandle->tableExecHandle.createTblReq, tableName, pTag, pTableMeta->suid); + buildCreateTbReq(&smlHandle->tableExecHandle.createTblReq, tableName, pTag, pTableMeta->suid, NULL); + smlHandle->tableExecHandle.createTblReq.ctb.name = taosMemoryMalloc(sTableNameLen + 1); + memcpy(smlHandle->tableExecHandle.createTblReq.ctb.name, sTableName, sTableNameLen); + smlHandle->tableExecHandle.createTblReq.ctb.name[sTableNameLen] = 0; STableDataBlocks* pDataBlock = NULL; ret = getDataBlockFromList(smlHandle->pBlockHash, &pTableMeta->uid, sizeof(pTableMeta->uid), diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index ed6d3c1c93..5ea06ca892 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -5369,7 +5369,7 @@ static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) { } static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, SCreateSubTableClause* pStmt, - const STag* pTag, uint64_t suid, SVgroupInfo* pVgInfo) { + const STag* pTag, uint64_t suid, const char* sTableNmae, SVgroupInfo* pVgInfo) { // char dbFName[TSDB_DB_FNAME_LEN] = {0}; // SName name = {.type = TSDB_DB_NAME_T, .acctId = acctId}; // strcpy(name.dbname, pStmt->dbName); @@ -5386,6 +5386,7 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, S req.commentLen = -1; } req.ctb.suid = suid; + req.ctb.name = strdup(sTableNmae); req.ctb.pTag = (uint8_t*)pTag; if (pStmt->ignoreExists) { req.flags |= TD_CREATE_IF_NOT_EXISTS; @@ -5471,13 +5472,14 @@ static int32_t buildJsonTagVal(STranslateContext* pCxt, SSchema* pTagSchema, SVa return buildSyntaxErrMsg(&pCxt->msgBuf, "json string too long than 4095", pVal->literal); } - return parseJsontoTagData(pVal->literal, pTagArray, ppTag, &pCxt->msgBuf); + return parseJsontoTagData(pVal->literal, pTagArray, ppTag, &pCxt->msgBuf, pTagSchema->name); } static int32_t buildNormalTagVal(STranslateContext* pCxt, SSchema* pTagSchema, SValueNode* pVal, SArray* pTagArray) { if (pVal->node.resType.type != TSDB_DATA_TYPE_NULL) { void* nodeVal = nodesGetValueFromNode(pVal); STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type}; + strcpy(val.colName, pTagSchema->name); if (IS_VAR_DATA_TYPE(pTagSchema->type)) { val.pData = varDataVal(nodeVal); val.nData = varDataLen(nodeVal); @@ -5571,6 +5573,7 @@ static int32_t buildKVRowForAllTags(STranslateContext* pCxt, SCreateSubTableClau } else if (pVal->node.resType.type != TSDB_DATA_TYPE_NULL && !pVal->isNull) { char* tmpVal = nodesGetValueFromNode(pVal); STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type}; + strcpy(val.colName, pTagSchema->name); if (IS_VAR_DATA_TYPE(pTagSchema->type)) { val.pData = varDataVal(tmpVal); val.nData = varDataLen(tmpVal); @@ -5627,7 +5630,7 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla code = getTableHashVgroup(pCxt, pStmt->dbName, pStmt->tableName, &info); } if (TSDB_CODE_SUCCESS == code) { - addCreateTbReqIntoVgroup(pCxt->pParseCxt->acctId, pVgroupHashmap, pStmt, pTag, pSuperTableMeta->uid, &info); + addCreateTbReqIntoVgroup(pCxt->pParseCxt->acctId, pVgroupHashmap, pStmt, pTag, pSuperTableMeta->uid, pStmt->useTableName, &info); } taosMemoryFreeClear(pSuperTableMeta); @@ -5845,8 +5848,8 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS } pReq->isNull = (TSDB_DATA_TYPE_NULL == pStmt->pVal->node.resType.type); + pReq->tagType = targetDt.type; if (targetDt.type == TSDB_DATA_TYPE_JSON) { - pReq->isNull = 0; if (pStmt->pVal->literal && strlen(pStmt->pVal->literal) > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) { return buildSyntaxErrMsg(&pCxt->msgBuf, "json string too long than 4095", pStmt->pVal->literal); @@ -5855,7 +5858,7 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS int32_t code = TSDB_CODE_SUCCESS; STag* pTag = NULL; do { - code = parseJsontoTagData(pStmt->pVal->literal, pTagVals, &pTag, &pCxt->msgBuf); + code = parseJsontoTagData(pStmt->pVal->literal, pTagVals, &pTag, &pCxt->msgBuf, pReq->tagName); if (TSDB_CODE_SUCCESS != code) { break; } @@ -5870,6 +5873,9 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS if (code != TSDB_CODE_SUCCESS) { return code; } + if(pTag->nTag == 0){ + pReq->isNull = true; + } pReq->nTagVal = pTag->len; pReq->pTagVal = (uint8_t*)pTag; pStmt->pVal->datum.p = (char*)pTag; // for free @@ -5927,7 +5933,7 @@ static int32_t buildDropColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, static int32_t buildUpdateColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, STableMeta* pTableMeta, SVAlterTbReq* pReq) { pReq->colModBytes = calcTypeBytes(pStmt->dataType); - + pReq->colModType = pStmt->dataType.type; SSchema* pSchema = getColSchema(pTableMeta, pStmt->colName); if (NULL == pSchema) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMN, pStmt->colName); diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index 7a23338035..2f12c9adf6 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -341,7 +341,7 @@ static bool isValidateTag(char* input) { return true; } -int32_t parseJsontoTagData(const char* json, SArray* pTagVals, STag** ppTag, void* pMsgBuf) { +int32_t parseJsontoTagData(const char* json, SArray* pTagVals, STag** ppTag, void* pMsgBuf, const char* colName) { int32_t retCode = TSDB_CODE_SUCCESS; cJSON* root = NULL; SHashObj* keyHash = NULL; @@ -389,6 +389,7 @@ int32_t parseJsontoTagData(const char* json, SArray* pTagVals, STag** ppTag, voi continue; } STagVal val = {0}; + strcpy(val.colName, colName); val.pKey = jsonKey; taosHashPut(keyHash, jsonKey, keyLen, &keyLen, CHAR_BYTES); // add key to hash to remove dumplicate, value is useless diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index df5df127f0..dd0a60dced 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -1150,7 +1150,7 @@ int32_t toJsonFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu } memcpy(tmp, varDataVal(input), varDataLen(input)); tmp[varDataLen(input)] = 0; - if(parseJsontoTagData(tmp, pTagVals, &pTag, NULL)){ + if(parseJsontoTagData(tmp, pTagVals, &pTag, NULL, "")){ tTagNew(pTagVals, 1, true, &pTag); } } diff --git a/source/libs/scalar/test/scalar/scalarTests.cpp b/source/libs/scalar/test/scalar/scalarTests.cpp index 07440e7435..8fcc76f51f 100644 --- a/source/libs/scalar/test/scalar/scalarTests.cpp +++ b/source/libs/scalar/test/scalar/scalarTests.cpp @@ -1114,7 +1114,7 @@ TEST(columnTest, json_column_arith_op) { memcpy(rightv, rightvTmp, strlen(rightvTmp)); SArray *tags = taosArrayInit(1, sizeof(STagVal)); STag* row = NULL; - parseJsontoTagData(rightv, tags, &row, NULL); + parseJsontoTagData(rightv, tags, &row, NULL, ""); const int32_t len = 8; EOperatorType op[len] = {OP_TYPE_ADD, OP_TYPE_SUB, OP_TYPE_MULTI, OP_TYPE_DIV, @@ -1262,7 +1262,7 @@ TEST(columnTest, json_column_logic_op) { memcpy(rightv, rightvTmp, strlen(rightvTmp)); SArray *tags = taosArrayInit(1, sizeof(STagVal)); STag* row = NULL; - parseJsontoTagData(rightv, tags, &row, NULL); + parseJsontoTagData(rightv, tags, &row, NULL, ""); const int32_t len0 = 6; const int32_t len = 9;