refactor compress

This commit is contained in:
Yihao Deng 2024-04-23 06:37:18 +00:00
parent 92a7801d09
commit f3feb9dabe
7 changed files with 127 additions and 109 deletions

View File

@ -86,11 +86,11 @@ bool checkColumnCompressOrSetDefault(uint8_t type, char compress[TSDB_CL_COMPRES
bool checkColumnLevel(char level[TSDB_CL_COMPRESS_OPTION_LEN]); bool checkColumnLevel(char level[TSDB_CL_COMPRESS_OPTION_LEN]);
bool checkColumnLevelOrSetDefault(uint8_t type, char level[TSDB_CL_COMPRESS_OPTION_LEN]); bool checkColumnLevelOrSetDefault(uint8_t type, char level[TSDB_CL_COMPRESS_OPTION_LEN]);
void setColEncode(uint32_t* compress, uint8_t encode); void setColEncode(uint32_t* compress, uint8_t encode);
void setColCompress(uint32_t* compress, uint16_t compressType); void setColCompress(uint32_t* compress, uint16_t compressType);
void setColLevel(uint32_t* compress, uint8_t level); void setColLevel(uint32_t* compress, uint8_t level);
int8_t setColCompressByOption(uint8_t type, uint8_t encode, uint16_t compressType, uint8_t level, bool check, int32_t setColCompressByOption(uint8_t type, uint8_t encode, uint16_t compressType, uint8_t level, bool check,
uint32_t* compress); uint32_t* compress);
int8_t validColCompressLevel(uint8_t type, uint8_t level); int8_t validColCompressLevel(uint8_t type, uint8_t level);
int8_t validColCompress(uint8_t type, uint8_t l2); int8_t validColCompress(uint8_t type, uint8_t l2);

View File

@ -182,6 +182,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TSC_STMT_CACHE_ERROR TAOS_DEF_ERROR_CODE(0, 0X0230) #define TSDB_CODE_TSC_STMT_CACHE_ERROR TAOS_DEF_ERROR_CODE(0, 0X0230)
#define TSDB_CODE_TSC_ENCODE_PARAM_ERROR TAOS_DEF_ERROR_CODE(0, 0X0231) #define TSDB_CODE_TSC_ENCODE_PARAM_ERROR TAOS_DEF_ERROR_CODE(0, 0X0231)
#define TSDB_CODE_TSC_ENCODE_PARAM_NULL TAOS_DEF_ERROR_CODE(0, 0X0232) #define TSDB_CODE_TSC_ENCODE_PARAM_NULL TAOS_DEF_ERROR_CODE(0, 0X0232)
#define TSDB_CODE_TSC_COMPRESS_PARAM_ERROR TAOS_DEF_ERROR_CODE(0, 0X0233)
#define TSDB_CODE_TSC_COMPRESS_LEVEL_ERROR TAOS_DEF_ERROR_CODE(0, 0X0234)
#define TSDB_CODE_TSC_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0X02FF) #define TSDB_CODE_TSC_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0X02FF)
// mnode-common // mnode-common

View File

@ -306,22 +306,22 @@ void setColLevel(uint32_t* compress, uint8_t level) {
return; return;
} }
int8_t setColCompressByOption(uint8_t type, uint8_t encode, uint16_t compressType, uint8_t level, bool check, int32_t setColCompressByOption(uint8_t type, uint8_t encode, uint16_t compressType, uint8_t level, bool check,
uint32_t* compress) { uint32_t* compress) {
if (check && !validColEncode(type, encode)) return 0; if (check && !validColEncode(type, encode)) return TSDB_CODE_TSC_ENCODE_PARAM_ERROR;
setColEncode(compress, encode); setColEncode(compress, encode);
if (compressType == TSDB_COLVAL_COMPRESS_DISABLED) { if (compressType == TSDB_COLVAL_COMPRESS_DISABLED) {
setColCompress(compress, compressType); setColCompress(compress, compressType);
setColLevel(compress, TSDB_COLVAL_LEVEL_DISABLED); setColLevel(compress, TSDB_COLVAL_LEVEL_DISABLED);
} else { } else {
if (check && !validColCompress(type, compressType)) return 0; if (check && !validColCompress(type, compressType)) return TSDB_CODE_TSC_COMPRESS_PARAM_ERROR;
setColCompress(compress, compressType); setColCompress(compress, compressType);
if (check && !validColCompressLevel(type, level)) return 0; if (check && !validColCompressLevel(type, level)) return TSDB_CODE_TSC_COMPRESS_LEVEL_ERROR;
setColLevel(compress, level); setColLevel(compress, level);
} }
return 1; return TSDB_CODE_SUCCESS;
} }
bool useCompress(uint8_t tableType) { return TSDB_SUPER_TABLE == tableType || TSDB_NORMAL_TABLE == tableType; } bool useCompress(uint8_t tableType) { return TSDB_SUPER_TABLE == tableType || TSDB_NORMAL_TABLE == tableType; }

View File

@ -18,10 +18,10 @@
#include "commandInt.h" #include "commandInt.h"
#include "scheduler.h" #include "scheduler.h"
#include "systable.h" #include "systable.h"
#include "taosdef.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tglobal.h" #include "tglobal.h"
#include "tgrant.h" #include "tgrant.h"
#include "taosdef.h"
extern SConfig* tsCfg; extern SConfig* tsCfg;
@ -126,7 +126,8 @@ static int32_t setDescResultIntoDataBlock(bool sysInfoUser, SSDataBlock* pBlock,
pCol7 = taosArrayGet(pBlock->pDataBlock, 6); pCol7 = taosArrayGet(pBlock->pDataBlock, 6);
} }
char buf[DESCRIBE_RESULT_FIELD_LEN] = {0}; int32_t fillTagCol = 0;
char buf[DESCRIBE_RESULT_FIELD_LEN] = {0};
for (int32_t i = 0; i < numOfRows; ++i) { for (int32_t i = 0; i < numOfRows; ++i) {
if (invisibleColumn(sysInfoUser, pMeta->tableType, pMeta->schema[i].flags)) { if (invisibleColumn(sysInfoUser, pMeta->tableType, pMeta->schema[i].flags)) {
continue; continue;
@ -140,6 +141,7 @@ static int32_t setDescResultIntoDataBlock(bool sysInfoUser, SSDataBlock* pBlock,
if (TSDB_VIEW_TABLE != pMeta->tableType) { if (TSDB_VIEW_TABLE != pMeta->tableType) {
if (i >= pMeta->tableInfo.numOfColumns) { if (i >= pMeta->tableInfo.numOfColumns) {
STR_TO_VARSTR(buf, "TAG"); STR_TO_VARSTR(buf, "TAG");
fillTagCol = 1;
} else if (i == 1 && pMeta->schema[i].flags & COL_IS_KEY) { } else if (i == 1 && pMeta->schema[i].flags & COL_IS_KEY) {
STR_TO_VARSTR(buf, "PRIMARY KEY") STR_TO_VARSTR(buf, "PRIMARY KEY")
} else { } else {
@ -158,15 +160,17 @@ static int32_t setDescResultIntoDataBlock(bool sysInfoUser, SSDataBlock* pBlock,
STR_TO_VARSTR(buf, columnLevelStr(COMPRESS_L2_TYPE_LEVEL_U32(pMeta->schemaExt[i].compress))); STR_TO_VARSTR(buf, columnLevelStr(COMPRESS_L2_TYPE_LEVEL_U32(pMeta->schemaExt[i].compress)));
colDataSetVal(pCol7, pBlock->info.rows, buf, false); colDataSetVal(pCol7, pBlock->info.rows, buf, false);
} else { } else {
STR_TO_VARSTR(buf, ""); STR_TO_VARSTR(buf, fillTagCol == 0 ? "" : "disabled");
colDataSetVal(pCol5, pBlock->info.rows, buf, false); colDataSetVal(pCol5, pBlock->info.rows, buf, false);
STR_TO_VARSTR(buf, ""); STR_TO_VARSTR(buf, fillTagCol == 0 ? "" : "disabled");
colDataSetVal(pCol6, pBlock->info.rows, buf, false); colDataSetVal(pCol6, pBlock->info.rows, buf, false);
STR_TO_VARSTR(buf, ""); STR_TO_VARSTR(buf, fillTagCol == 0 ? "" : "disabled");
colDataSetVal(pCol7, pBlock->info.rows, buf, false); colDataSetVal(pCol7, pBlock->info.rows, buf, false);
} }
} }
fillTagCol = 0;
++(pBlock->info.rows); ++(pBlock->info.rows);
} }
if (pMeta->tableType == TSDB_SUPER_TABLE && biMode != 0) { if (pMeta->tableType == TSDB_SUPER_TABLE && biMode != 0) {
@ -355,7 +359,7 @@ static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char* dbName, ch
break; break;
} }
char* retentions = buildRetension(pCfg->pRetensions); char* retentions = buildRetension(pCfg->pRetensions);
int32_t dbFNameLen = strlen(dbFName); int32_t dbFNameLen = strlen(dbFName);
int32_t hashPrefix = 0; int32_t hashPrefix = 0;
if (pCfg->hashPrefix > 0) { if (pCfg->hashPrefix > 0) {
@ -367,17 +371,20 @@ static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char* dbName, ch
if (IS_SYS_DBNAME(dbName)) { if (IS_SYS_DBNAME(dbName)) {
len += sprintf(buf2 + VARSTR_HEADER_SIZE, "CREATE DATABASE `%s`", dbName); len += sprintf(buf2 + VARSTR_HEADER_SIZE, "CREATE DATABASE `%s`", dbName);
} else { } else {
len += sprintf( len += sprintf(buf2 + VARSTR_HEADER_SIZE,
buf2 + VARSTR_HEADER_SIZE, "CREATE DATABASE `%s` BUFFER %d CACHESIZE %d CACHEMODEL '%s' COMP %d DURATION %dm "
"CREATE DATABASE `%s` BUFFER %d CACHESIZE %d CACHEMODEL '%s' COMP %d DURATION %dm " "WAL_FSYNC_PERIOD %d MAXROWS %d MINROWS %d STT_TRIGGER %d KEEP %dm,%dm,%dm PAGES %d PAGESIZE %d "
"WAL_FSYNC_PERIOD %d MAXROWS %d MINROWS %d STT_TRIGGER %d KEEP %dm,%dm,%dm PAGES %d PAGESIZE %d PRECISION '%s' REPLICA %d " "PRECISION '%s' REPLICA %d "
"WAL_LEVEL %d VGROUPS %d SINGLE_STABLE %d TABLE_PREFIX %d TABLE_SUFFIX %d TSDB_PAGESIZE %d " "WAL_LEVEL %d VGROUPS %d SINGLE_STABLE %d TABLE_PREFIX %d TABLE_SUFFIX %d TSDB_PAGESIZE %d "
"WAL_RETENTION_PERIOD %d WAL_RETENTION_SIZE %" PRId64 " KEEP_TIME_OFFSET %d ENCRYPT_ALGORITHM '%s' S3_CHUNKSIZE %d S3_KEEPLOCAL %dm S3_COMPACT %d", "WAL_RETENTION_PERIOD %d WAL_RETENTION_SIZE %" PRId64
dbName, pCfg->buffer, pCfg->cacheSize, cacheModelStr(pCfg->cacheLast), pCfg->compression, pCfg->daysPerFile, " KEEP_TIME_OFFSET %d ENCRYPT_ALGORITHM '%s' S3_CHUNKSIZE %d S3_KEEPLOCAL %dm S3_COMPACT %d",
pCfg->walFsyncPeriod, pCfg->maxRows, pCfg->minRows, pCfg->sstTrigger, pCfg->daysToKeep0, pCfg->daysToKeep1, pCfg->daysToKeep2, dbName, pCfg->buffer, pCfg->cacheSize, cacheModelStr(pCfg->cacheLast), pCfg->compression,
pCfg->pages, pCfg->pageSize, prec, pCfg->replications, pCfg->walLevel, pCfg->numOfVgroups, pCfg->daysPerFile, pCfg->walFsyncPeriod, pCfg->maxRows, pCfg->minRows, pCfg->sstTrigger,
1 == pCfg->numOfStables, hashPrefix, pCfg->hashSuffix, pCfg->tsdbPageSize, pCfg->walRetentionPeriod, pCfg->walRetentionSize, pCfg->daysToKeep0, pCfg->daysToKeep1, pCfg->daysToKeep2, pCfg->pages, pCfg->pageSize, prec,
pCfg->keepTimeOffset, encryptAlgorithmStr(pCfg->encryptAlgorithm), pCfg->s3ChunkSize, pCfg->s3KeepLocal, pCfg->s3Compact); pCfg->replications, pCfg->walLevel, pCfg->numOfVgroups, 1 == pCfg->numOfStables, hashPrefix,
pCfg->hashSuffix, pCfg->tsdbPageSize, pCfg->walRetentionPeriod, pCfg->walRetentionSize,
pCfg->keepTimeOffset, encryptAlgorithmStr(pCfg->encryptAlgorithm), pCfg->s3ChunkSize,
pCfg->s3KeepLocal, pCfg->s3Compact);
if (retentions) { if (retentions) {
len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, " RETENTIONS %s", retentions); len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, " RETENTIONS %s", retentions);
@ -391,7 +398,9 @@ static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char* dbName, ch
colDataSetVal(pCol2, 0, buf2, false); colDataSetVal(pCol2, 0, buf2, false);
} }
#define CHECK_LEADER(n) (row[n] && (fields[n].type == TSDB_DATA_TYPE_VARCHAR && strncasecmp(row[n], "leader", varDataLen((char *)row[n] - VARSTR_HEADER_SIZE)) == 0)) #define CHECK_LEADER(n) \
(row[n] && (fields[n].type == TSDB_DATA_TYPE_VARCHAR && \
strncasecmp(row[n], "leader", varDataLen((char*)row[n] - VARSTR_HEADER_SIZE)) == 0))
// on this row, if have leader return true else return false // on this row, if have leader return true else return false
bool existLeaderRole(TAOS_ROW row, TAOS_FIELD* fields, int nFields) { bool existLeaderRole(TAOS_ROW row, TAOS_FIELD* fields, int nFields) {
// vgroup_id | db_name | tables | v1_dnode | v1_status | v2_dnode | v2_status | v3_dnode | v3_status | v4_dnode | // vgroup_id | db_name | tables | v1_dnode | v1_status | v2_dnode | v2_status | v3_dnode | v3_status | v4_dnode |
@ -548,23 +557,25 @@ static int32_t buildCreateViewResultDataBlock(SSDataBlock** pOutput) {
return code; return code;
} }
void appendColumnFields(char* buf, int32_t* len, STableCfg* pCfg) { void appendColumnFields(char* buf, int32_t* len, STableCfg* pCfg) {
for (int32_t i = 0; i < pCfg->numOfColumns; ++i) { for (int32_t i = 0; i < pCfg->numOfColumns; ++i) {
SSchema* pSchema = pCfg->pSchemas + i; SSchema* pSchema = pCfg->pSchemas + i;
char type[32 + 60]; // 60 byte for compress info char type[32 + 60]; // 60 byte for compress info
sprintf(type, "%s", tDataTypes[pSchema->type].name); sprintf(type, "%s", tDataTypes[pSchema->type].name);
if (TSDB_DATA_TYPE_VARCHAR == pSchema->type || TSDB_DATA_TYPE_VARBINARY == pSchema->type || TSDB_DATA_TYPE_GEOMETRY == pSchema->type) { if (TSDB_DATA_TYPE_VARCHAR == pSchema->type || TSDB_DATA_TYPE_VARBINARY == pSchema->type ||
TSDB_DATA_TYPE_GEOMETRY == pSchema->type) {
sprintf(type + strlen(type), "(%d)", (int32_t)(pSchema->bytes - VARSTR_HEADER_SIZE)); sprintf(type + strlen(type), "(%d)", (int32_t)(pSchema->bytes - VARSTR_HEADER_SIZE));
} else if (TSDB_DATA_TYPE_NCHAR == pSchema->type) { } else if (TSDB_DATA_TYPE_NCHAR == pSchema->type) {
sprintf(type + strlen(type), "(%d)", (int32_t)((pSchema->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)); sprintf(type + strlen(type), "(%d)", (int32_t)((pSchema->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
} }
if (useCompress(pCfg->tableType)) { if (useCompress(pCfg->tableType)) {
sprintf(type + strlen(type), " ENCODE \'%s\'", columnEncodeStr(COMPRESS_L1_TYPE_U32(pCfg->pSchemaExt[i].compress))); sprintf(type + strlen(type), " ENCODE \'%s\'",
sprintf(type + strlen(type), " COMPRESS \'%s\'", columnCompressStr(COMPRESS_L2_TYPE_U32(pCfg->pSchemaExt[i].compress))); columnEncodeStr(COMPRESS_L1_TYPE_U32(pCfg->pSchemaExt[i].compress)));
sprintf(type + strlen(type), " LEVEL \'%s\'", columnLevelStr(COMPRESS_L2_TYPE_LEVEL_U32(pCfg->pSchemaExt[i].compress))); sprintf(type + strlen(type), " COMPRESS \'%s\'",
columnCompressStr(COMPRESS_L2_TYPE_U32(pCfg->pSchemaExt[i].compress)));
sprintf(type + strlen(type), " LEVEL \'%s\'",
columnLevelStr(COMPRESS_L2_TYPE_LEVEL_U32(pCfg->pSchemaExt[i].compress)));
} }
if (!(pSchema->flags & COL_IS_KEY)) { if (!(pSchema->flags & COL_IS_KEY)) {
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s`%s` %s", ((i > 0) ? ", " : ""), pSchema->name, type); *len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s`%s` %s", ((i > 0) ? ", " : ""), pSchema->name, type);
@ -580,7 +591,8 @@ void appendTagFields(char* buf, int32_t* len, STableCfg* pCfg) {
SSchema* pSchema = pCfg->pSchemas + pCfg->numOfColumns + i; SSchema* pSchema = pCfg->pSchemas + pCfg->numOfColumns + i;
char type[32]; char type[32];
sprintf(type, "%s", tDataTypes[pSchema->type].name); sprintf(type, "%s", tDataTypes[pSchema->type].name);
if (TSDB_DATA_TYPE_VARCHAR == pSchema->type || TSDB_DATA_TYPE_VARBINARY == pSchema->type || TSDB_DATA_TYPE_GEOMETRY == pSchema->type) { if (TSDB_DATA_TYPE_VARCHAR == pSchema->type || TSDB_DATA_TYPE_VARBINARY == pSchema->type ||
TSDB_DATA_TYPE_GEOMETRY == pSchema->type) {
sprintf(type + strlen(type), "(%d)", (int32_t)(pSchema->bytes - VARSTR_HEADER_SIZE)); sprintf(type + strlen(type), "(%d)", (int32_t)(pSchema->bytes - VARSTR_HEADER_SIZE));
} else if (TSDB_DATA_TYPE_NCHAR == pSchema->type) { } else if (TSDB_DATA_TYPE_NCHAR == pSchema->type) {
sprintf(type + strlen(type), "(%d)", (int32_t)((pSchema->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)); sprintf(type + strlen(type), "(%d)", (int32_t)((pSchema->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
@ -823,7 +835,8 @@ static int32_t setCreateViewResultIntoDataBlock(SSDataBlock* pBlock, SShowCreate
SViewMeta* pMeta = pStmt->pViewMeta; SViewMeta* pMeta = pStmt->pViewMeta;
ASSERT(pMeta); ASSERT(pMeta);
snprintf(varDataVal(buf2), SHOW_CREATE_VIEW_RESULT_FIELD2_LEN - VARSTR_HEADER_SIZE, "CREATE VIEW `%s`.`%s` AS %s", pStmt->dbName, pStmt->viewName, pMeta->querySql); snprintf(varDataVal(buf2), SHOW_CREATE_VIEW_RESULT_FIELD2_LEN - VARSTR_HEADER_SIZE, "CREATE VIEW `%s`.`%s` AS %s",
pStmt->dbName, pStmt->viewName, pMeta->querySql);
int32_t len = strlen(varDataVal(buf2)); int32_t len = strlen(varDataVal(buf2));
varDataLen(buf2) = (len > 65535) ? 65535 : len; varDataLen(buf2) = (len > 65535) ? 65535 : len;
colDataSetVal(pCol2, 0, buf2, false); colDataSetVal(pCol2, 0, buf2, false);
@ -833,7 +846,6 @@ static int32_t setCreateViewResultIntoDataBlock(SSDataBlock* pBlock, SShowCreate
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t execShowCreateTable(SShowCreateTableStmt* pStmt, SRetrieveTableRsp** pRsp) { static int32_t execShowCreateTable(SShowCreateTableStmt* pStmt, SRetrieveTableRsp** pRsp) {
SSDataBlock* pBlock = NULL; SSDataBlock* pBlock = NULL;
int32_t code = buildCreateTbResultDataBlock(&pBlock); int32_t code = buildCreateTbResultDataBlock(&pBlock);

View File

@ -745,7 +745,6 @@ SNode* createTimeOffsetValueNode(SAstCreateContext* pCxt, const SToken* pLiteral
return (SNode*)val; return (SNode*)val;
} }
SNode* createDefaultDatabaseCondValue(SAstCreateContext* pCxt) { SNode* createDefaultDatabaseCondValue(SAstCreateContext* pCxt) {
CHECK_PARSER_STATUS(pCxt); CHECK_PARSER_STATUS(pCxt);
if (NULL == pCxt->pQueryCxt->db) { if (NULL == pCxt->pQueryCxt->db) {
@ -965,7 +964,8 @@ SNode* createTempTableNode(SAstCreateContext* pCxt, SNode* pSubquery, const STok
return (SNode*)tempTable; return (SNode*)tempTable;
} }
SNode* createJoinTableNode(SAstCreateContext* pCxt, EJoinType type, EJoinSubType stype, SNode* pLeft, SNode* pRight, SNode* pJoinCond) { SNode* createJoinTableNode(SAstCreateContext* pCxt, EJoinType type, EJoinSubType stype, SNode* pLeft, SNode* pRight,
SNode* pJoinCond) {
CHECK_PARSER_STATUS(pCxt); CHECK_PARSER_STATUS(pCxt);
SJoinTableNode* joinTable = (SJoinTableNode*)nodesMakeNode(QUERY_NODE_JOIN_TABLE); SJoinTableNode* joinTable = (SJoinTableNode*)nodesMakeNode(QUERY_NODE_JOIN_TABLE);
CHECK_OUT_OF_MEM(joinTable); CHECK_OUT_OF_MEM(joinTable);
@ -1264,7 +1264,6 @@ SNode* addFillClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pFill) {
return pStmt; return pStmt;
} }
SNode* addJLimitClause(SAstCreateContext* pCxt, SNode* pJoin, SNode* pJLimit) { SNode* addJLimitClause(SAstCreateContext* pCxt, SNode* pJoin, SNode* pJLimit) {
CHECK_PARSER_STATUS(pCxt); CHECK_PARSER_STATUS(pCxt);
if (NULL == pJLimit) { if (NULL == pJLimit) {
@ -1272,11 +1271,10 @@ SNode* addJLimitClause(SAstCreateContext* pCxt, SNode* pJoin, SNode* pJLimit) {
} }
SJoinTableNode* pJoinNode = (SJoinTableNode*)pJoin; SJoinTableNode* pJoinNode = (SJoinTableNode*)pJoin;
pJoinNode->pJLimit = pJLimit; pJoinNode->pJLimit = pJLimit;
return pJoin; return pJoin;
} }
SNode* addWindowOffsetClause(SAstCreateContext* pCxt, SNode* pJoin, SNode* pWinOffset) { SNode* addWindowOffsetClause(SAstCreateContext* pCxt, SNode* pJoin, SNode* pWinOffset) {
CHECK_PARSER_STATUS(pCxt); CHECK_PARSER_STATUS(pCxt);
if (NULL == pWinOffset) { if (NULL == pWinOffset) {
@ -1284,11 +1282,10 @@ SNode* addWindowOffsetClause(SAstCreateContext* pCxt, SNode* pJoin, SNode* pWinO
} }
SJoinTableNode* pJoinNode = (SJoinTableNode*)pJoin; SJoinTableNode* pJoinNode = (SJoinTableNode*)pJoin;
pJoinNode->pWindowOffset = pWinOffset; pJoinNode->pWindowOffset = pWinOffset;
return pJoin; return pJoin;
} }
SNode* createSelectStmt(SAstCreateContext* pCxt, bool isDistinct, SNodeList* pProjectionList, SNode* pTable, SNode* createSelectStmt(SAstCreateContext* pCxt, bool isDistinct, SNodeList* pProjectionList, SNode* pTable,
SNodeList* pHint) { SNodeList* pHint) {
CHECK_PARSER_STATUS(pCxt); CHECK_PARSER_STATUS(pCxt);
@ -1549,10 +1546,10 @@ static SNode* setDatabaseOptionImpl(SAstCreateContext* pCxt, SNode* pOptions, ED
case DB_OPTION_KEEP_TIME_OFFSET: { case DB_OPTION_KEEP_TIME_OFFSET: {
pDbOptions->keepTimeOffset = taosStr2Int32(((SToken*)pVal)->z, NULL, 10); pDbOptions->keepTimeOffset = taosStr2Int32(((SToken*)pVal)->z, NULL, 10);
break; break;
case DB_OPTION_ENCRYPT_ALGORITHM: case DB_OPTION_ENCRYPT_ALGORITHM:
COPY_STRING_FORM_STR_TOKEN(pDbOptions->encryptAlgorithmStr, (SToken*)pVal); COPY_STRING_FORM_STR_TOKEN(pDbOptions->encryptAlgorithmStr, (SToken*)pVal);
pDbOptions->encryptAlgorithm = TSDB_DEFAULT_ENCRYPT_ALGO; pDbOptions->encryptAlgorithm = TSDB_DEFAULT_ENCRYPT_ALGO;
break; break;
} }
default: default:
break; break;
@ -1744,17 +1741,17 @@ SNode* setColumnOptions(SAstCreateContext* pCxt, SNode* pOptions, EColumnOptionT
memset(((SColumnOptions*)pOptions)->compress, 0, TSDB_CL_COMPRESS_OPTION_LEN); memset(((SColumnOptions*)pOptions)->compress, 0, TSDB_CL_COMPRESS_OPTION_LEN);
COPY_STRING_FORM_STR_TOKEN(((SColumnOptions*)pOptions)->compress, (SToken*)pVal); COPY_STRING_FORM_STR_TOKEN(((SColumnOptions*)pOptions)->compress, (SToken*)pVal);
if (0 == strlen(((SColumnOptions*)pOptions)->compress)) { if (0 == strlen(((SColumnOptions*)pOptions)->compress)) {
pCxt->errCode = TSDB_CODE_TSC_ENCODE_PARAM_ERROR; pCxt->errCode = TSDB_CODE_TSC_COMPRESS_PARAM_ERROR;
} }
break; break;
case COLUMN_OPTION_LEVEL: case COLUMN_OPTION_LEVEL:
memset(((SColumnOptions*)pOptions)->compressLevel, 0, TSDB_CL_COMPRESS_OPTION_LEN); memset(((SColumnOptions*)pOptions)->compressLevel, 0, TSDB_CL_COMPRESS_OPTION_LEN);
COPY_STRING_FORM_STR_TOKEN(((SColumnOptions*)pOptions)->compressLevel, (SToken*)pVal); COPY_STRING_FORM_STR_TOKEN(((SColumnOptions*)pOptions)->compressLevel, (SToken*)pVal);
if (0 == strlen(((SColumnOptions*)pOptions)->compressLevel)) { if (0 == strlen(((SColumnOptions*)pOptions)->compressLevel)) {
pCxt->errCode = TSDB_CODE_TSC_ENCODE_PARAM_ERROR; pCxt->errCode = TSDB_CODE_TSC_COMPRESS_LEVEL_ERROR;
} }
break; break;
case COLUMN_OPTION_PRIMARYKEY: case COLUMN_OPTION_PRIMARYKEY:
((SColumnOptions*)pOptions)->bPrimaryKey = true; ((SColumnOptions*)pOptions)->bPrimaryKey = true;
break; break;
default: default:
@ -1789,7 +1786,7 @@ SDataType createDataType(uint8_t type) {
SDataType createVarLenDataType(uint8_t type, const SToken* pLen) { SDataType createVarLenDataType(uint8_t type, const SToken* pLen) {
int32_t len = TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE; int32_t len = TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE;
if (type == TSDB_DATA_TYPE_NCHAR) len /= TSDB_NCHAR_SIZE; if (type == TSDB_DATA_TYPE_NCHAR) len /= TSDB_NCHAR_SIZE;
if(pLen) len = taosStr2Int32(pLen->z, NULL, 10); if (pLen) len = taosStr2Int32(pLen->z, NULL, 10);
SDataType dt = {.type = type, .precision = 0, .scale = 0, .bytes = len}; SDataType dt = {.type = type, .precision = 0, .scale = 0, .bytes = len};
return dt; return dt;
} }
@ -1895,8 +1892,8 @@ SNode* createAlterTableAddModifyCol(SAstCreateContext* pCxt, SNode* pRealTable,
return createAlterTableStmtFinalize(pRealTable, pStmt); return createAlterTableStmtFinalize(pRealTable, pStmt);
} }
SNode* createAlterTableAddModifyColOptions(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType, SToken* pColName, SNode* createAlterTableAddModifyColOptions(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType,
SNode* pOptions) { SToken* pColName, SNode* pOptions) {
CHECK_PARSER_STATUS(pCxt); CHECK_PARSER_STATUS(pCxt);
if (!checkColumnName(pCxt, pColName)) { if (!checkColumnName(pCxt, pColName)) {
return NULL; return NULL;
@ -2965,7 +2962,7 @@ SNode* createTSMAOptions(SAstCreateContext* pCxt, SNodeList* pFuncs) {
CHECK_PARSER_STATUS(pCxt); CHECK_PARSER_STATUS(pCxt);
STSMAOptions* pOptions = (STSMAOptions*)nodesMakeNode(QUERY_NODE_TSMA_OPTIONS); STSMAOptions* pOptions = (STSMAOptions*)nodesMakeNode(QUERY_NODE_TSMA_OPTIONS);
if (!pOptions) { if (!pOptions) {
//nodesDestroyList(pTSMAFuncs); // nodesDestroyList(pTSMAFuncs);
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY; pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, "Out of memory"); snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, "Out of memory");
return NULL; return NULL;

View File

@ -735,7 +735,7 @@ static int32_t getTableTsmas(STranslateContext* pCxt, const SName* pName, SArray
} }
static int32_t getTsma(STranslateContext* pCxt, const SName* pName, STableTSMAInfo** pTsma) { static int32_t getTsma(STranslateContext* pCxt, const SName* pName, STableTSMAInfo** pTsma) {
int32_t code = 0; int32_t code = 0;
SParseContext* pParCxt = pCxt->pParseCxt; SParseContext* pParCxt = pCxt->pParseCxt;
if (pParCxt->async) { if (pParCxt->async) {
code = getTsmaFromCache(pCxt->pMetaCache, pName, pTsma); code = getTsmaFromCache(pCxt->pMetaCache, pName, pTsma);
@ -3676,7 +3676,7 @@ static int32_t setTableTsmas(STranslateContext* pCxt, SName* pName, SRealTableNo
char buf[TSDB_TABLE_FNAME_LEN + TSDB_TABLE_NAME_LEN + 1]; char buf[TSDB_TABLE_FNAME_LEN + TSDB_TABLE_NAME_LEN + 1];
for (int32_t i = 0; i < pRealTable->pTsmas->size; ++i) { for (int32_t i = 0; i < pRealTable->pTsmas->size; ++i) {
STableTSMAInfo* pTsma = taosArrayGetP(pRealTable->pTsmas, i); STableTSMAInfo* pTsma = taosArrayGetP(pRealTable->pTsmas, i);
SName tsmaTargetTbName = {0}; SName tsmaTargetTbName = {0};
toName(pCxt->pParseCxt->acctId, pRealTable->table.dbName, "", &tsmaTargetTbName); toName(pCxt->pParseCxt->acctId, pRealTable->table.dbName, "", &tsmaTargetTbName);
int32_t len = snprintf(buf, TSDB_TABLE_FNAME_LEN + TSDB_TABLE_NAME_LEN, "%s.%s_%s", pTsma->dbFName, pTsma->name, int32_t len = snprintf(buf, TSDB_TABLE_FNAME_LEN + TSDB_TABLE_NAME_LEN, "%s.%s_%s", pTsma->dbFName, pTsma->name,
pRealTable->table.tableName); pRealTable->table.tableName);
@ -3684,7 +3684,7 @@ static int32_t setTableTsmas(STranslateContext* pCxt, SName* pName, SRealTableNo
strncpy(tsmaTargetTbName.tname, buf, MD5_OUTPUT_LEN); strncpy(tsmaTargetTbName.tname, buf, MD5_OUTPUT_LEN);
collectUseTable(&tsmaTargetTbName, pCxt->pTargetTables); collectUseTable(&tsmaTargetTbName, pCxt->pTargetTables);
SVgroupInfo vgInfo = {0}; SVgroupInfo vgInfo = {0};
bool exists = false; bool exists = false;
code = catalogGetCachedTableHashVgroup(pCxt->pParseCxt->pCatalog, &tsmaTargetTbName, &vgInfo, &exists); code = catalogGetCachedTableHashVgroup(pCxt->pParseCxt->pCatalog, &tsmaTargetTbName, &vgInfo, &exists);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
ASSERT(exists); ASSERT(exists);
@ -5695,8 +5695,8 @@ static int32_t setEqualTbnameTableVgroups(STranslateContext* pCxt, SSelectStmt*
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
for (int i = 0; i < taosArrayGetSize(aTables); ++i) { for (int i = 0; i < taosArrayGetSize(aTables); ++i) {
SEqCondTbNameTableInfo* pInfo = taosArrayGet(aTables, i); SEqCondTbNameTableInfo* pInfo = taosArrayGet(aTables, i);
int32_t nTbls = taosArrayGetSize(pInfo->aTbnames); int32_t nTbls = taosArrayGetSize(pInfo->aTbnames);
int32_t numOfVgs = pInfo->pRealTable->pVgroupList->numOfVgroups; int32_t numOfVgs = pInfo->pRealTable->pVgroupList->numOfVgroups;
SVgroupsInfo* vgsInfo = taosMemoryMalloc(sizeof(SVgroupsInfo) + nTbls * sizeof(SVgroupInfo)); SVgroupsInfo* vgsInfo = taosMemoryMalloc(sizeof(SVgroupsInfo) + nTbls * sizeof(SVgroupInfo));
findVgroupsFromEqualTbname(pCxt, pInfo->aTbnames, pInfo->pRealTable->table.dbName, numOfVgs, vgsInfo); findVgroupsFromEqualTbname(pCxt, pInfo->aTbnames, pInfo->pRealTable->table.dbName, numOfVgs, vgsInfo);
@ -5705,7 +5705,7 @@ static int32_t setEqualTbnameTableVgroups(STranslateContext* pCxt, SSelectStmt*
pInfo->pRealTable->pVgroupList = vgsInfo; pInfo->pRealTable->pVgroupList = vgsInfo;
} else { } else {
taosMemoryFree(vgsInfo); taosMemoryFree(vgsInfo);
} }
vgsInfo = NULL; vgsInfo = NULL;
if (pInfo->pRealTable->pTsmas) { if (pInfo->pRealTable->pTsmas) {
@ -5714,12 +5714,12 @@ static int32_t setEqualTbnameTableVgroups(STranslateContext* pCxt, SSelectStmt*
for (int32_t i = 0; i < pInfo->pRealTable->pTsmas->size; ++i) { for (int32_t i = 0; i < pInfo->pRealTable->pTsmas->size; ++i) {
STableTSMAInfo* pTsma = taosArrayGetP(pInfo->pRealTable->pTsmas, i); STableTSMAInfo* pTsma = taosArrayGetP(pInfo->pRealTable->pTsmas, i);
SArray *pTbNames = taosArrayInit(pInfo->aTbnames->size, POINTER_BYTES); SArray* pTbNames = taosArrayInit(pInfo->aTbnames->size, POINTER_BYTES);
if (!pTbNames) return TSDB_CODE_OUT_OF_MEMORY; if (!pTbNames) return TSDB_CODE_OUT_OF_MEMORY;
for (int32_t k = 0; k < pInfo->aTbnames->size; ++k) { for (int32_t k = 0; k < pInfo->aTbnames->size; ++k) {
const char* pTbName = taosArrayGetP(pInfo->aTbnames, k); const char* pTbName = taosArrayGetP(pInfo->aTbnames, k);
char* pNewTbName = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN + TSDB_TABLE_NAME_LEN + 1); char* pNewTbName = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN + TSDB_TABLE_NAME_LEN + 1);
if (!pNewTbName) { if (!pNewTbName) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
break; break;
@ -7225,9 +7225,9 @@ static int32_t checkColumnOptions(SNodeList* pList) {
if (!checkColumnEncodeOrSetDefault(pCol->dataType.type, ((SColumnOptions*)pCol->pOptions)->encode)) if (!checkColumnEncodeOrSetDefault(pCol->dataType.type, ((SColumnOptions*)pCol->pOptions)->encode))
return TSDB_CODE_TSC_ENCODE_PARAM_ERROR; return TSDB_CODE_TSC_ENCODE_PARAM_ERROR;
if (!checkColumnCompressOrSetDefault(pCol->dataType.type, ((SColumnOptions*)pCol->pOptions)->compress)) if (!checkColumnCompressOrSetDefault(pCol->dataType.type, ((SColumnOptions*)pCol->pOptions)->compress))
return TSDB_CODE_TSC_ENCODE_PARAM_ERROR; return TSDB_CODE_TSC_COMPRESS_PARAM_ERROR;
if (!checkColumnLevelOrSetDefault(pCol->dataType.type, ((SColumnOptions*)pCol->pOptions)->compressLevel)) if (!checkColumnLevelOrSetDefault(pCol->dataType.type, ((SColumnOptions*)pCol->pOptions)->compressLevel))
return TSDB_CODE_TSC_ENCODE_PARAM_ERROR; return TSDB_CODE_TSC_COMPRESS_LEVEL_ERROR;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -7245,11 +7245,11 @@ static int32_t columnDefNodeToField(SNodeList* pList, SArray** pArray, bool calB
FOREACH(pNode, pList) { FOREACH(pNode, pList) {
SColumnDefNode* pCol = (SColumnDefNode*)pNode; SColumnDefNode* pCol = (SColumnDefNode*)pNode;
SFieldWithOptions field = {.type = pCol->dataType.type, .bytes = calcTypeBytes(pCol->dataType)}; SFieldWithOptions field = {.type = pCol->dataType.type, .bytes = calcTypeBytes(pCol->dataType)};
if (calBytes) { if (calBytes) {
field.bytes = calcTypeBytes(pCol->dataType); field.bytes = calcTypeBytes(pCol->dataType);
} else { } else {
field.bytes = pCol->dataType.bytes; field.bytes = pCol->dataType.bytes;
} }
strcpy(field.name, pCol->colName); strcpy(field.name, pCol->colName);
if (pCol->pOptions) { if (pCol->pOptions) {
@ -7742,7 +7742,7 @@ static int32_t addWdurationToSampleProjects(SNodeList* pProjectionList) {
return nodesListAppend(pProjectionList, (SNode*)pFunc); return nodesListAppend(pProjectionList, (SNode*)pFunc);
} }
static int32_t buildProjectsForSampleAst(SSampleAstInfo* pInfo, SNodeList** pList, int32_t *pProjectionTotalLen) { static int32_t buildProjectsForSampleAst(SSampleAstInfo* pInfo, SNodeList** pList, int32_t* pProjectionTotalLen) {
SNodeList* pProjectionList = pInfo->pFuncs; SNodeList* pProjectionList = pInfo->pFuncs;
pInfo->pFuncs = NULL; pInfo->pFuncs = NULL;
@ -8118,13 +8118,15 @@ static int32_t buildAlterSuperTableReq(STranslateContext* pCxt, SAlterTableStmt*
TAOS_FIELD field = {0}; TAOS_FIELD field = {0};
strcpy(field.name, pStmt->colName); strcpy(field.name, pStmt->colName);
if (!checkColumnEncode(pStmt->pColOptions->encode)) return TSDB_CODE_TSC_ENCODE_PARAM_ERROR; if (!checkColumnEncode(pStmt->pColOptions->encode)) return TSDB_CODE_TSC_ENCODE_PARAM_ERROR;
if (!checkColumnCompress(pStmt->pColOptions->compress)) return TSDB_CODE_TSC_ENCODE_PARAM_ERROR; if (!checkColumnCompress(pStmt->pColOptions->compress)) return TSDB_CODE_TSC_COMPRESS_PARAM_ERROR;
if (!checkColumnLevel(pStmt->pColOptions->compressLevel)) return TSDB_CODE_TSC_ENCODE_PARAM_ERROR; if (!checkColumnLevel(pStmt->pColOptions->compressLevel)) return TSDB_CODE_TSC_COMPRESS_LEVEL_ERROR;
int8_t valid = int32_t code =
setColCompressByOption(pStmt->dataType.type, columnEncodeVal(pStmt->pColOptions->encode), setColCompressByOption(pStmt->dataType.type, columnEncodeVal(pStmt->pColOptions->encode),
columnCompressVal(pStmt->pColOptions->compress), columnCompressVal(pStmt->pColOptions->compress),
columnLevelVal(pStmt->pColOptions->compressLevel), false, (uint32_t*)&field.bytes); columnLevelVal(pStmt->pColOptions->compressLevel), false, (uint32_t*)&field.bytes);
if (!valid) return TSDB_CODE_TSC_ENCODE_PARAM_ERROR; if (code != TSDB_CODE_SUCCESS) {
return code;
}
taosArrayPush(pAlterReq->pFields, &field); taosArrayPush(pAlterReq->pFields, &field);
break; break;
} }
@ -10708,8 +10710,8 @@ static int32_t compareTsmaFuncWithFuncAndColId(SNode* pNode1, SNode* pNode2) {
// pFuncs are already sorted by funcId and colId // pFuncs are already sorted by funcId and colId
static int32_t deduplicateTsmaFuncs(SNodeList* pFuncs) { static int32_t deduplicateTsmaFuncs(SNodeList* pFuncs) {
SNode* pLast = NULL; SNode* pLast = NULL;
SNode* pFunc = NULL; SNode* pFunc = NULL;
SNodeList* pRes = NULL; SNodeList* pRes = NULL;
FOREACH(pFunc, pFuncs) { FOREACH(pFunc, pFuncs) {
if (pLast) { if (pLast) {
@ -10726,7 +10728,8 @@ static int32_t deduplicateTsmaFuncs(SNodeList* pFuncs) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t buildTSMAAstStreamSubTable(SCreateTSMAStmt* pStmt, SMCreateSmaReq* pReq, const SNode* pTbname, SNode** pSubTable) { static int32_t buildTSMAAstStreamSubTable(SCreateTSMAStmt* pStmt, SMCreateSmaReq* pReq, const SNode* pTbname,
SNode** pSubTable) {
int32_t code = 0; int32_t code = 0;
SFunctionNode* pMd5Func = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION); SFunctionNode* pMd5Func = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
SFunctionNode* pConcatFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION); SFunctionNode* pConcatFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
@ -10768,8 +10771,8 @@ _end:
return code; return code;
} }
static int32_t buildTSMAAst(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMCreateSmaReq* pReq, static int32_t buildTSMAAst(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMCreateSmaReq* pReq, const char* tbName,
const char* tbName, int32_t numOfTags, const SSchema* pTags) { int32_t numOfTags, const SSchema* pTags) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SSampleAstInfo info = {0}; SSampleAstInfo info = {0};
info.createSmaIndex = true; info.createSmaIndex = true;
@ -10813,16 +10816,17 @@ static int32_t buildTSMAAst(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMC
if (!pTagCol) code = TSDB_CODE_OUT_OF_MEMORY; if (!pTagCol) code = TSDB_CODE_OUT_OF_MEMORY;
} }
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
code = buildTSMAAstStreamSubTable(pStmt, pReq, pStmt->pOptions->recursiveTsma ? pTagCol : (SNode*)pTbnameFunc, (SNode**)&pSubTable); code = buildTSMAAstStreamSubTable(pStmt, pReq, pStmt->pOptions->recursiveTsma ? pTagCol : (SNode*)pTbnameFunc,
(SNode**)&pSubTable);
info.pSubTable = (SNode*)pSubTable; info.pSubTable = (SNode*)pSubTable;
} }
if (code == TSDB_CODE_SUCCESS) if (code == TSDB_CODE_SUCCESS)
code = nodesListMakeStrictAppend(&info.pTags, pStmt->pOptions->recursiveTsma ? pTagCol : nodesCloneNode((SNode*)pTbnameFunc)); code = nodesListMakeStrictAppend(
&info.pTags, pStmt->pOptions->recursiveTsma ? pTagCol : nodesCloneNode((SNode*)pTbnameFunc));
} }
} }
if (code == TSDB_CODE_SUCCESS && !pStmt->pOptions->recursiveTsma) if (code == TSDB_CODE_SUCCESS && !pStmt->pOptions->recursiveTsma) code = fmCreateStateFuncs(info.pFuncs);
code = fmCreateStateFuncs(info.pFuncs);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
int32_t pProjectionTotalLen = 0; int32_t pProjectionTotalLen = 0;
@ -10914,7 +10918,8 @@ static int32_t rewriteTSMAFuncs(STranslateContext* pCxt, SCreateTSMAStmt* pStmt,
return code; return code;
} }
static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMCreateSmaReq* pReq, SName* useTbName) { static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMCreateSmaReq* pReq,
SName* useTbName) {
SName name; SName name;
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tsmaName, &name), pReq->name); tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tsmaName, &name), pReq->name);
memset(&name, 0, sizeof(SName)); memset(&name, 0, sizeof(SName));
@ -10924,15 +10929,15 @@ static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStm
pReq->interval = ((SValueNode*)pStmt->pOptions->pInterval)->datum.i; pReq->interval = ((SValueNode*)pStmt->pOptions->pInterval)->datum.i;
pReq->intervalUnit = TIME_UNIT_MILLISECOND; pReq->intervalUnit = TIME_UNIT_MILLISECOND;
#define TSMA_MIN_INTERVAL_MS 1 // 1ms #define TSMA_MIN_INTERVAL_MS 1 // 1ms
#define TSMA_MAX_INTERVAL_MS (60 * 60 * 1000) // 1h #define TSMA_MAX_INTERVAL_MS (60 * 60 * 1000) // 1h
if (pReq->interval > TSMA_MAX_INTERVAL_MS || pReq->interval < TSMA_MIN_INTERVAL_MS) { if (pReq->interval > TSMA_MAX_INTERVAL_MS || pReq->interval < TSMA_MIN_INTERVAL_MS) {
return TSDB_CODE_TSMA_INVALID_INTERVAL; return TSDB_CODE_TSMA_INVALID_INTERVAL;
} }
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
STableMeta* pTableMeta = NULL; STableMeta* pTableMeta = NULL;
STableTSMAInfo* pRecursiveTsma = NULL; STableTSMAInfo* pRecursiveTsma = NULL;
int32_t numOfCols = 0, numOfTags = 0; int32_t numOfCols = 0, numOfTags = 0;
SSchema * pCols = NULL, *pTags = NULL; SSchema * pCols = NULL, *pTags = NULL;
@ -11022,7 +11027,7 @@ static int32_t translateCreateTSMA(STranslateContext* pCxt, SCreateTSMAStmt* pSt
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
code = buildCreateTSMAReq(pCxt, pStmt, pStmt->pReq, &useTbName); code = buildCreateTSMAReq(pCxt, pStmt, pStmt->pReq, &useTbName);
} }
if ( TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = collectUseTable(&useTbName, pCxt->pTargetTables); code = collectUseTable(&useTbName, pCxt->pTargetTables);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
@ -11063,7 +11068,8 @@ int32_t translatePostCreateTSMA(SParseContext* pParseCxt, SQuery* pQuery, SSData
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
if (interval.interval > 0) { if (interval.interval > 0) {
pStmt->pReq->lastTs = taosTimeAdd(taosTimeTruncate(lastTs, &interval), interval.interval, interval.intervalUnit, interval.precision); pStmt->pReq->lastTs = taosTimeAdd(taosTimeTruncate(lastTs, &interval), interval.interval, interval.intervalUnit,
interval.precision);
} else { } else {
pStmt->pReq->lastTs = lastTs + 1; // start key of the next time window pStmt->pReq->lastTs = lastTs + 1; // start key of the next time window
} }
@ -11074,7 +11080,7 @@ int32_t translatePostCreateTSMA(SParseContext* pParseCxt, SQuery* pQuery, SSData
code = setQuery(&cxt, pQuery); code = setQuery(&cxt, pQuery);
} }
if ( TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
SName name = {0}; SName name = {0};
toName(pParseCxt->acctId, pStmt->dbName, pStmt->originalTbName, &name); toName(pParseCxt->acctId, pStmt->dbName, pStmt->originalTbName, &name);
code = collectUseTable(&name, cxt.pTargetTables); code = collectUseTable(&name, cxt.pTargetTables);
@ -11090,7 +11096,7 @@ int32_t translatePostCreateTSMA(SParseContext* pParseCxt, SQuery* pQuery, SSData
} }
static int32_t translateDropTSMA(STranslateContext* pCxt, SDropTSMAStmt* pStmt) { static int32_t translateDropTSMA(STranslateContext* pCxt, SDropTSMAStmt* pStmt) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SMDropSmaReq dropReq = {0}; SMDropSmaReq dropReq = {0};
SName name; SName name;
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tsmaName, &name), dropReq.name); tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tsmaName, &name), dropReq.name);
@ -12033,13 +12039,13 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt*
toSchema(pColDef, index + 1, pScheam); toSchema(pColDef, index + 1, pScheam);
if (pColDef->pOptions) { if (pColDef->pOptions) {
req.colCmpr.pColCmpr[index].id = index + 1; req.colCmpr.pColCmpr[index].id = index + 1;
int8_t valid = setColCompressByOption( int32_t code = setColCompressByOption(
pScheam->type, columnEncodeVal(((SColumnOptions*)pColDef->pOptions)->encode), pScheam->type, columnEncodeVal(((SColumnOptions*)pColDef->pOptions)->encode),
columnCompressVal(((SColumnOptions*)pColDef->pOptions)->compress), columnCompressVal(((SColumnOptions*)pColDef->pOptions)->compress),
columnLevelVal(((SColumnOptions*)pColDef->pOptions)->compressLevel), true, &req.colCmpr.pColCmpr[index].alg); columnLevelVal(((SColumnOptions*)pColDef->pOptions)->compressLevel), true, &req.colCmpr.pColCmpr[index].alg);
if (!valid) { if (code != TSDB_CODE_SUCCESS) {
tdDestroySVCreateTbReq(&req); tdDestroySVCreateTbReq(&req);
return TSDB_CODE_TSC_ENCODE_PARAM_ERROR; return code;
} }
} }
++index; ++index;
@ -12499,7 +12505,6 @@ static int32_t buildDropTableVgroupHashmap(STranslateContext* pCxt, SDropTableCl
goto over; goto over;
} }
SVgroupInfo info = {0}; SVgroupInfo info = {0};
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = getTableHashVgroup(pCxt, pClause->dbName, pClause->tableName, &info); code = getTableHashVgroup(pCxt, pClause->dbName, pClause->tableName, &info);
@ -12586,7 +12591,7 @@ static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) {
taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch); taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);
FOREACH(pNode, pStmt->pTables) { FOREACH(pNode, pStmt->pTables) {
SDropTableClause* pClause = (SDropTableClause*)pNode; SDropTableClause* pClause = (SDropTableClause*)pNode;
SName name; SName name;
toName(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, &name); toName(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, &name);
int32_t code = buildDropTableVgroupHashmap(pCxt, pClause, &name, &tableType, pVgroupHashmap); int32_t code = buildDropTableVgroupHashmap(pCxt, pClause, &name, &tableType, pVgroupHashmap);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
@ -12653,8 +12658,8 @@ static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) {
static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, STableMeta* pTableMeta, static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, STableMeta* pTableMeta,
SVAlterTbReq* pReq) { SVAlterTbReq* pReq) {
SName tbName = {0}; SName tbName = {0};
SArray* pTsmas = NULL; SArray* pTsmas = NULL;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (pCxt->pMetaCache) { if (pCxt->pMetaCache) {
toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &tbName); toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &tbName);
@ -12679,9 +12684,9 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS
pReq->colId = pSchema->colId; pReq->colId = pSchema->colId;
pReq->tagType = pSchema->type; pReq->tagType = pSchema->type;
STag* pTag = NULL; STag* pTag = NULL;
SToken token; SToken token;
char tokenBuf[TSDB_MAX_TAGS_LEN]; char tokenBuf[TSDB_MAX_TAGS_LEN];
const char* tagStr = pStmt->pVal->literal; const char* tagStr = pStmt->pVal->literal;
NEXT_TOKEN_WITH_PREV(tagStr, token); NEXT_TOKEN_WITH_PREV(tagStr, token);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
@ -12879,14 +12884,12 @@ static int buildAlterTableColumnCompress(STranslateContext* pCxt, SAlterTableStm
} }
if (!checkColumnEncode(pStmt->pColOptions->encode)) return TSDB_CODE_TSC_ENCODE_PARAM_ERROR; if (!checkColumnEncode(pStmt->pColOptions->encode)) return TSDB_CODE_TSC_ENCODE_PARAM_ERROR;
if (!checkColumnCompress(pStmt->pColOptions->compress)) return TSDB_CODE_TSC_ENCODE_PARAM_ERROR; if (!checkColumnCompress(pStmt->pColOptions->compress)) return TSDB_CODE_TSC_COMPRESS_PARAM_ERROR;
if (!checkColumnLevel(pStmt->pColOptions->compressLevel)) return TSDB_CODE_TSC_ENCODE_PARAM_ERROR; if (!checkColumnLevel(pStmt->pColOptions->compressLevel)) return TSDB_CODE_TSC_COMPRESS_LEVEL_ERROR;
int8_t valid = setColCompressByOption(pSchema->type, columnEncodeVal(pStmt->pColOptions->encode), int8_t code = setColCompressByOption(pSchema->type, columnEncodeVal(pStmt->pColOptions->encode),
columnCompressVal(pStmt->pColOptions->compress), columnCompressVal(pStmt->pColOptions->compress),
columnLevelVal(pStmt->pColOptions->compressLevel), true, &pReq->compress); columnLevelVal(pStmt->pColOptions->compressLevel), true, &pReq->compress);
if (!valid) return TSDB_CODE_TSC_ENCODE_PARAM_ERROR; return code;
return TSDB_CODE_SUCCESS;
} }
static int32_t buildAlterTbReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, STableMeta* pTableMeta, static int32_t buildAlterTbReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, STableMeta* pTableMeta,

View File

@ -151,8 +151,12 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_QUERY_KILLED, "Query killed")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_EXEC_NODE, "No available execution node in current query policy configuration") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_EXEC_NODE, "No available execution node in current query policy configuration")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NOT_STABLE_ERROR, "Table is not a super table") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NOT_STABLE_ERROR, "Table is not a super table")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_CACHE_ERROR, "Stmt cache error") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_CACHE_ERROR, "Stmt cache error")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_ENCODE_PARAM_ERROR, "Invalid compress param") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_ENCODE_PARAM_ERROR, "Invalid encode param")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_ENCODE_PARAM_NULL, "Not found compress param") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_ENCODE_PARAM_NULL, "Not found compress param")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_COMPRESS_PARAM_ERROR, "Invalid compress param")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_COMPRESS_LEVEL_ERROR, "Invalid compress level param")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INTERNAL_ERROR, "Internal error") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INTERNAL_ERROR, "Internal error")
// mnode-common // mnode-common