diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 7501c8fce0..4b624e333f 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1134,11 +1134,11 @@ void tFreeSMAlterStbRsp(SMAlterStbRsp* pRsp); int32_t tSerializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp); int32_t tDeserializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp); void tFreeSTableMetaRsp(STableMetaRsp* pRsp); -void tFreeSTableIndexRsp(void *info); +void tFreeSTableIndexRsp(void* info); typedef struct { - SArray* pMetaRsp; // Array of STableMetaRsp - SArray* pIndexRsp; // Array of STableIndexRsp; + SArray* pMetaRsp; // Array of STableMetaRsp + SArray* pIndexRsp; // Array of STableIndexRsp; } SSTbHbRsp; int32_t tSerializeSSTbHbRsp(void* buf, int32_t bufLen, SSTbHbRsp* pRsp); @@ -1305,8 +1305,9 @@ int32_t tSerializeSSetStandbyReq(void* buf, int32_t bufLen, SSetStandbyReq* pReq int32_t tDeserializeSSetStandbyReq(void* buf, int32_t bufLen, SSetStandbyReq* pReq); typedef struct { - int32_t connId; - int32_t queryId; + int32_t connId; // todo remove + int32_t queryId; // todo remove + char queryStrId[TSDB_QUERY_ID_LEN]; } SKillQueryReq; int32_t tSerializeSKillQueryReq(void* buf, int32_t bufLen, SKillQueryReq* pReq); diff --git a/include/common/ttokendef.h b/include/common/ttokendef.h index f5495db5ae..061c734484 100644 --- a/include/common/ttokendef.h +++ b/include/common/ttokendef.h @@ -78,7 +78,7 @@ #define TK_BUFFER 60 #define TK_CACHELAST 61 #define TK_COMP 62 -#define TK_DAYS 63 +#define TK_DURATION 63 #define TK_NK_VARIABLE 64 #define TK_FSYNC 65 #define TK_MAXROWS 66 diff --git a/include/libs/nodes/cmdnodes.h b/include/libs/nodes/cmdnodes.h index 8d4e94663f..1282836425 100644 --- a/include/libs/nodes/cmdnodes.h +++ b/include/libs/nodes/cmdnodes.h @@ -89,6 +89,7 @@ typedef struct STableOptions { ENodeType type; char comment[TSDB_TB_COMMENT_LEN]; double filesFactor; + int32_t delay; SNodeList* pRollupFuncs; int32_t ttl; SNodeList* pSma; @@ -286,6 +287,11 @@ typedef struct SKillStmt { int32_t targetId; } SKillStmt; +typedef struct SKillQueryStmt { + ENodeType type; + char queryId[TSDB_QUERY_ID_LEN]; +} SKillQueryStmt; + typedef struct SStreamOptions { ENodeType type; int8_t triggerType; diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 3937fc13f4..3c25d8add4 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -204,6 +204,7 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, + QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, QUERY_NODE_PHYSICAL_PLAN_PROJECT, diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 1a3bfdbb04..5d3e0cd142 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -34,7 +34,13 @@ typedef struct SLogicNode { uint8_t precision; } SLogicNode; -typedef enum EScanType { SCAN_TYPE_TAG = 1, SCAN_TYPE_TABLE, SCAN_TYPE_SYSTEM_TABLE, SCAN_TYPE_STREAM } EScanType; +typedef enum EScanType { + SCAN_TYPE_TAG = 1, + SCAN_TYPE_TABLE, + SCAN_TYPE_SYSTEM_TABLE, + SCAN_TYPE_STREAM, + SCAN_TYPE_TABLE_MERGE +} EScanType; typedef struct SScanLogicNode { SLogicNode node; @@ -262,6 +268,7 @@ typedef struct STableScanPhysiNode { } STableScanPhysiNode; typedef STableScanPhysiNode STableSeqScanPhysiNode; +typedef STableScanPhysiNode STableMergeScanPhysiNode; typedef STableScanPhysiNode SStreamScanPhysiNode; typedef struct SProjectPhysiNode { diff --git a/include/util/tdef.h b/include/util/tdef.h index 1fa8ad1912..1a8ba6a620 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -222,6 +222,8 @@ typedef enum ELogicConditionType { #define TSDB_APP_NAME_LEN TSDB_UNI_LEN #define TSDB_TB_COMMENT_LEN 1025 +#define TSDB_QUERY_ID_LEN 26 + /** * In some scenarios uint16_t (0~65535) is used to store the row len. * - Firstly, we use 65531(65535 - 4), as the SDataRow/SKVRow contains 4 bits header. @@ -341,6 +343,9 @@ typedef enum ELogicConditionType { #define TSDB_DB_SCHEMALESS_OFF 0 #define TSDB_DEFAULT_DB_SCHEMALESS TSDB_DB_SCHEMALESS_OFF +// #define TSDB_MIN_ROLLUP_DELAY 1 +// #define TSDB_MAX_ROLLUP_DELAY 10 +// #define TSDB_DEFAULT_ROLLUP_DELAY 1 #define TSDB_MIN_ROLLUP_FILE_FACTOR 0 #define TSDB_MAX_ROLLUP_FILE_FACTOR 10 #define TSDB_DEFAULT_ROLLUP_FILE_FACTOR 0.1 diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index aafeca0b39..420ed8dcb2 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -3369,8 +3369,7 @@ int32_t tSerializeSKillQueryReq(void *buf, int32_t bufLen, SKillQueryReq *pReq) tEncoderInit(&encoder, buf, bufLen); if (tStartEncode(&encoder) < 0) return -1; - if (tEncodeI32(&encoder, pReq->connId) < 0) return -1; - if (tEncodeI32(&encoder, pReq->queryId) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->queryStrId) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -3383,8 +3382,7 @@ int32_t tDeserializeSKillQueryReq(void *buf, int32_t bufLen, SKillQueryReq *pReq tDecoderInit(&decoder, buf, bufLen); if (tStartDecode(&decoder) < 0) return -1; - if (tDecodeI32(&decoder, &pReq->connId) < 0) return -1; - if (tDecodeI32(&decoder, &pReq->queryId) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->queryStrId) < 0) return -1; tEndDecode(&decoder); tDecoderClear(&decoder); diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index be1929d554..d73434f3f0 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1480,14 +1480,6 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) { return code; } -static int32_t physiStreamScanNodeToJson(const void* pObj, SJson* pJson) { - return physiTableScanNodeToJson(pObj, pJson); -} - -static int32_t jsonToPhysiStreamScanNode(const SJson* pJson, void* pObj) { - return jsonToPhysiTableScanNode(pJson, pObj); -} - static const char* jkSysTableScanPhysiPlanMnodeEpSet = "MnodeEpSet"; static const char* jkSysTableScanPhysiPlanShowRewrite = "ShowRewrite"; static const char* jkSysTableScanPhysiPlanAccountId = "AccountId"; @@ -3964,9 +3956,9 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: return physiTagScanNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: - return physiTableScanNodeToJson(pObj, pJson); + case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: - return physiStreamScanNodeToJson(pObj, pJson); + return physiTableScanNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN: return physiSysTableScanNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_PROJECT: @@ -4097,9 +4089,9 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: return jsonToPhysiTagScanNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: - return jsonToPhysiTableScanNode(pJson, pObj); + case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: - return jsonToPhysiStreamScanNode(pJson, pObj); + return jsonToPhysiTableScanNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN: return jsonToPhysiSysTableScanNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_PROJECT: diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 72666f833d..3bff010d43 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -209,9 +209,10 @@ SNode* nodesMakeNode(ENodeType type) { case QUERY_NODE_SHOW_CREATE_STABLE_STMT: case QUERY_NODE_SHOW_TRANSACTIONS_STMT: return makeNode(type, sizeof(SShowStmt)); - case QUERY_NODE_KILL_CONNECTION_STMT: case QUERY_NODE_KILL_QUERY_STMT: + return makeNode(type, sizeof(SKillQueryStmt)); case QUERY_NODE_KILL_TRANSACTION_STMT: + case QUERY_NODE_KILL_CONNECTION_STMT: return makeNode(type, sizeof(SKillStmt)); case QUERY_NODE_DELETE_STMT: return makeNode(type, sizeof(SDeleteStmt)); @@ -251,6 +252,8 @@ SNode* nodesMakeNode(ENodeType type) { return makeNode(type, sizeof(STableScanPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN: return makeNode(type, sizeof(STableSeqScanPhysiNode)); + case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN: + return makeNode(type, sizeof(STableMergeScanPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: return makeNode(type, sizeof(SStreamScanPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN: diff --git a/source/libs/parser/inc/parAst.h b/source/libs/parser/inc/parAst.h index 0c6663d29a..b8c6ffc843 100644 --- a/source/libs/parser/inc/parAst.h +++ b/source/libs/parser/inc/parAst.h @@ -60,6 +60,7 @@ typedef enum EDatabaseOptionType { typedef enum ETableOptionType { TABLE_OPTION_COMMENT = 1, TABLE_OPTION_FILE_FACTOR, + TABLE_OPTION_DELAY, TABLE_OPTION_ROLLUP, TABLE_OPTION_TTL, TABLE_OPTION_SMA @@ -187,6 +188,7 @@ SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, const SNode* pOptions, SNode* pQuery); SNode* createDropStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pStreamName); SNode* createKillStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pId); +SNode* createKillQueryStmt(SAstCreateContext* pCxt, const SToken* pQueryId); SNode* createBalanceVgroupStmt(SAstCreateContext* pCxt); SNode* createMergeVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId1, const SToken* pVgId2); SNode* createRedistributeVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId, SNodeList* pDnodes); diff --git a/source/libs/parser/inc/sql.y b/source/libs/parser/inc/sql.y index 83ad41aac0..0d9e6e4a73 100644 --- a/source/libs/parser/inc/sql.y +++ b/source/libs/parser/inc/sql.y @@ -168,8 +168,8 @@ db_options(A) ::= . db_options(A) ::= db_options(B) BUFFER NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_BUFFER, &C); } db_options(A) ::= db_options(B) CACHELAST NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_CACHELAST, &C); } db_options(A) ::= db_options(B) COMP NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_COMP, &C); } -db_options(A) ::= db_options(B) DAYS NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_DAYS, &C); } -db_options(A) ::= db_options(B) DAYS NK_VARIABLE(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_DAYS, &C); } +db_options(A) ::= db_options(B) DURATION NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_DAYS, &C); } +db_options(A) ::= db_options(B) DURATION NK_VARIABLE(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_DAYS, &C); } db_options(A) ::= db_options(B) FSYNC NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_FSYNC, &C); } db_options(A) ::= db_options(B) MAXROWS NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_MAXROWS, &C); } db_options(A) ::= db_options(B) MINROWS NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_MINROWS, &C); } @@ -317,8 +317,9 @@ tags_def(A) ::= TAGS NK_LP column_def_list(B) NK_RP. table_options(A) ::= . { A = createDefaultTableOptions(pCxt); } table_options(A) ::= table_options(B) COMMENT NK_STRING(C). { A = setTableOption(pCxt, B, TABLE_OPTION_COMMENT, &C); } +//table_options(A) ::= table_options(B) DELAY NK_INTEGER(C). { A = setTableOption(pCxt, B, TABLE_OPTION_DELAY, &C); } table_options(A) ::= table_options(B) FILE_FACTOR NK_FLOAT(C). { A = setTableOption(pCxt, B, TABLE_OPTION_FILE_FACTOR, &C); } -table_options(A) ::= table_options(B) ROLLUP NK_LP func_name_list(C) NK_RP. { A = setTableOption(pCxt, B, TABLE_OPTION_ROLLUP, C); } +table_options(A) ::= table_options(B) ROLLUP NK_LP rollup_func_list(C) NK_RP. { A = setTableOption(pCxt, B, TABLE_OPTION_ROLLUP, C); } table_options(A) ::= table_options(B) TTL NK_INTEGER(C). { A = setTableOption(pCxt, B, TABLE_OPTION_TTL, &C); } table_options(A) ::= table_options(B) SMA NK_LP col_name_list(C) NK_RP. { A = setTableOption(pCxt, B, TABLE_OPTION_SMA, C); } @@ -330,6 +331,15 @@ alter_table_options(A) ::= alter_table_options(B) alter_table_option(C). alter_table_option(A) ::= COMMENT NK_STRING(B). { A.type = TABLE_OPTION_COMMENT; A.val = B; } alter_table_option(A) ::= TTL NK_INTEGER(B). { A.type = TABLE_OPTION_TTL; A.val = B; } +%type rollup_func_list { SNodeList* } +%destructor rollup_func_list { nodesDestroyList($$); } +rollup_func_list(A) ::= rollup_func_name(B). { A = createNodeList(pCxt, B); } +rollup_func_list(A) ::= rollup_func_list(B) NK_COMMA rollup_func_name(C). { A = addNodeToList(pCxt, B, C); } + +rollup_func_name(A) ::= function_name(B). { A = createFunctionNode(pCxt, &B, NULL); } +rollup_func_name(A) ::= FIRST(B). { A = createFunctionNode(pCxt, &B, NULL); } +rollup_func_name(A) ::= LAST(B). { A = createFunctionNode(pCxt, &B, NULL); } + %type col_name_list { SNodeList* } %destructor col_name_list { nodesDestroyList($$); } col_name_list(A) ::= col_name(B). { A = createNodeList(pCxt, B); } @@ -378,13 +388,6 @@ table_name_cond(A) ::= table_name(B). from_db_opt(A) ::= . { A = createDefaultDatabaseCondValue(pCxt); } from_db_opt(A) ::= FROM db_name(B). { A = createValueNode(pCxt, TSDB_DATA_TYPE_BINARY, &B); } -%type func_name_list { SNodeList* } -%destructor func_name_list { nodesDestroyList($$); } -func_name_list(A) ::= func_name(B). { A = createNodeList(pCxt, B); } -func_name_list(A) ::= func_name_list(B) NK_COMMA func_name(C). { A = addNodeToList(pCxt, B, C); } - -func_name(A) ::= function_name(B). { A = createFunctionNode(pCxt, &B, NULL); } - /************************************************ create index ********************************************************/ cmd ::= CREATE SMA INDEX not_exists_opt(D) index_name(A) ON table_name(B) index_options(C). { pCxt->pRootNode = createCreateIndexStmt(pCxt, INDEX_TYPE_SMA, D, &A, &B, NULL, C); } @@ -466,7 +469,7 @@ stream_options(A) ::= stream_options(B) WATERMARK duration_literal(C). /************************************************ kill connection/query ***********************************************/ cmd ::= KILL CONNECTION NK_INTEGER(A). { pCxt->pRootNode = createKillStmt(pCxt, QUERY_NODE_KILL_CONNECTION_STMT, &A); } -cmd ::= KILL QUERY NK_INTEGER(A). { pCxt->pRootNode = createKillStmt(pCxt, QUERY_NODE_KILL_QUERY_STMT, &A); } +cmd ::= KILL QUERY NK_STRING(A). { pCxt->pRootNode = createKillQueryStmt(pCxt, &A); } cmd ::= KILL TRANSACTION NK_INTEGER(A). { pCxt->pRootNode = createKillStmt(pCxt, QUERY_NODE_KILL_TRANSACTION_STMT, &A); } /************************************************ merge/redistribute/ vgroup ******************************************/ diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index c75696cede..8a7987e4ec 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -804,10 +804,10 @@ SNode* setDatabaseOption(SAstCreateContext* pCxt, SNode* pOptions, EDatabaseOpti case DB_OPTION_RETENTIONS: ((SDatabaseOptions*)pOptions)->pRetentions = pVal; break; -// case DB_OPTION_SCHEMALESS: -// ((SDatabaseOptions*)pOptions)->schemaless = taosStr2Int8(((SToken*)pVal)->z, NULL, 10); -// ((SDatabaseOptions*)pOptions)->schemaless = 0; -// break; + // case DB_OPTION_SCHEMALESS: + // ((SDatabaseOptions*)pOptions)->schemaless = taosStr2Int8(((SToken*)pVal)->z, NULL, 10); + // ((SDatabaseOptions*)pOptions)->schemaless = 0; + // break; default: break; } @@ -867,6 +867,7 @@ SNode* createDefaultTableOptions(SAstCreateContext* pCxt) { CHECK_PARSER_STATUS(pCxt); STableOptions* pOptions = (STableOptions*)nodesMakeNode(QUERY_NODE_TABLE_OPTIONS); CHECK_OUT_OF_MEM(pOptions); + // pOptions->delay = TSDB_DEFAULT_ROLLUP_DELAY; pOptions->filesFactor = TSDB_DEFAULT_ROLLUP_FILE_FACTOR; pOptions->ttl = TSDB_DEFAULT_TABLE_TTL; return (SNode*)pOptions; @@ -876,7 +877,7 @@ SNode* createAlterTableOptions(SAstCreateContext* pCxt) { CHECK_PARSER_STATUS(pCxt); STableOptions* pOptions = (STableOptions*)nodesMakeNode(QUERY_NODE_TABLE_OPTIONS); CHECK_OUT_OF_MEM(pOptions); - pOptions->filesFactor = -1; + pOptions->delay = -1; pOptions->ttl = -1; return (SNode*)pOptions; } @@ -890,8 +891,8 @@ SNode* setTableOption(SAstCreateContext* pCxt, SNode* pOptions, ETableOptionType sizeof(((STableOptions*)pOptions)->comment)); } break; - case TABLE_OPTION_FILE_FACTOR: - ((STableOptions*)pOptions)->filesFactor = taosStr2Double(((SToken*)pVal)->z, NULL); + case TABLE_OPTION_DELAY: + ((STableOptions*)pOptions)->delay = taosStr2Int32(((SToken*)pVal)->z, NULL, 10); break; case TABLE_OPTION_ROLLUP: ((STableOptions*)pOptions)->pRollupFuncs = pVal; @@ -1431,7 +1432,7 @@ SNode* createDropStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const CHECK_PARSER_STATUS(pCxt); SDropStreamStmt* pStmt = (SDropStreamStmt*)nodesMakeNode(QUERY_NODE_DROP_STREAM_STMT); CHECK_OUT_OF_MEM(pStmt); - strncpy(pStmt->streamName, pStreamName->z, pStreamName->n); + strncpy(pStmt->streamName, pStreamName->z, TMIN(pStreamName->n, sizeof(pStmt->streamName) - 1)); pStmt->ignoreNotExists = ignoreNotExists; return (SNode*)pStmt; } @@ -1444,6 +1445,14 @@ SNode* createKillStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pId return (SNode*)pStmt; } +SNode* createKillQueryStmt(SAstCreateContext* pCxt, const SToken* pQueryId) { + CHECK_PARSER_STATUS(pCxt); + SKillQueryStmt* pStmt = (SKillQueryStmt*)nodesMakeNode(QUERY_NODE_KILL_QUERY_STMT); + CHECK_OUT_OF_MEM(pStmt); + strncpy(pStmt->queryId, pQueryId->z, TMIN(pQueryId->n, sizeof(pStmt->queryId) - 1)); + return (SNode*)pStmt; +} + SNode* createBalanceVgroupStmt(SAstCreateContext* pCxt) { CHECK_PARSER_STATUS(pCxt); SBalanceVgroupStmt* pStmt = (SBalanceVgroupStmt*)nodesMakeNode(QUERY_NODE_BALANCE_VGROUP_STMT); diff --git a/source/libs/parser/src/parTokenizer.c b/source/libs/parser/src/parTokenizer.c index a9d730e1ad..55ea7171fa 100644 --- a/source/libs/parser/src/parTokenizer.c +++ b/source/libs/parser/src/parTokenizer.c @@ -68,7 +68,7 @@ static SKeyword keywordTable[] = { {"CONTAINS", TK_CONTAINS}, {"DATABASE", TK_DATABASE}, {"DATABASES", TK_DATABASES}, - {"DAYS", TK_DAYS}, + // {"DAYS", TK_DAYS}, {"DBS", TK_DBS}, {"DELETE", TK_DELETE}, {"DESC", TK_DESC}, @@ -78,6 +78,7 @@ static SKeyword keywordTable[] = { {"DNODES", TK_DNODES}, {"DOUBLE", TK_DOUBLE}, {"DROP", TK_DROP}, + {"DURATION", TK_DURATION}, {"EXISTS", TK_EXISTS}, {"EXPLAIN", TK_EXPLAIN}, {"FILE_FACTOR", TK_FILE_FACTOR}, diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 52abeadac9..bfb58d2cd1 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -2839,6 +2839,9 @@ static int32_t checkCreateTable(STranslateContext* pCxt, SCreateTableStmt* pStmt if (TSDB_CODE_SUCCESS == code) { code = checTableFactorOption(pCxt, pStmt->pOptions->filesFactor); } + // if (TSDB_CODE_SUCCESS == code) { + // code = checkRangeOption(pCxt, "delay", pStmt->pOptions->delay, TSDB_MIN_ROLLUP_DELAY, TSDB_MAX_ROLLUP_DELAY); + // } if (TSDB_CODE_SUCCESS == code) { code = checkTableRollupOption(pCxt, pStmt->pOptions->pRollupFuncs); } @@ -3081,6 +3084,7 @@ static int32_t buildRollupAst(STranslateContext* pCxt, SCreateTableStmt* pStmt, static int32_t buildCreateStbReq(STranslateContext* pCxt, SCreateTableStmt* pStmt, SMCreateStbReq* pReq) { pReq->igExists = pStmt->ignoreExists; + // pReq->delay = pStmt->pOptions->delay; pReq->xFilesFactor = pStmt->pOptions->filesFactor; pReq->ttl = pStmt->pOptions->ttl; columnDefNodeToField(pStmt->pCols, &pReq->pColumns); @@ -3626,9 +3630,9 @@ static int32_t translateKillConnection(STranslateContext* pCxt, SKillStmt* pStmt return buildCmdMsg(pCxt, TDMT_MND_KILL_CONN, (FSerializeFunc)tSerializeSKillQueryReq, &killReq); } -static int32_t translateKillQuery(STranslateContext* pCxt, SKillStmt* pStmt) { +static int32_t translateKillQuery(STranslateContext* pCxt, SKillQueryStmt* pStmt) { SKillQueryReq killReq = {0}; - killReq.queryId = pStmt->targetId; + strcpy(killReq.queryStrId, pStmt->queryId); return buildCmdMsg(pCxt, TDMT_MND_KILL_QUERY, (FSerializeFunc)tSerializeSKillQueryReq, &killReq); } @@ -3970,7 +3974,7 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) { code = translateKillConnection(pCxt, (SKillStmt*)pNode); break; case QUERY_NODE_KILL_QUERY_STMT: - code = translateKillQuery(pCxt, (SKillStmt*)pNode); + code = translateKillQuery(pCxt, (SKillQueryStmt*)pNode); break; case QUERY_NODE_KILL_TRANSACTION_STMT: code = translateKillTransaction(pCxt, (SKillStmt*)pNode); @@ -4793,6 +4797,7 @@ static int32_t buildDropTableVgroupHashmap(STranslateContext* pCxt, SDropTableCl if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code && pClause->ignoreNotExists) { code = TSDB_CODE_SUCCESS; + goto over; } *pIsSuperTable = false; diff --git a/source/libs/parser/src/sql.c b/source/libs/parser/src/sql.c index b2de892f30..e81610c1b4 100644 --- a/source/libs/parser/src/sql.c +++ b/source/libs/parser/src/sql.c @@ -564,7 +564,7 @@ static const YYCODETYPE yy_lookahead[] = { /* 1260 */ 0, 0, 40, 0, 271, 72, 0, 47, 175, 175, /* 1270 */ 47, 47, 279, 310, 47, 0, 313, 314, 315, 316, /* 1280 */ 317, 318, 289, 320, 47, 47, 293, 243, 175, 0, - /* 1290 */ 175, 0, 47, 0, 47, 0, 243, 47, 0, 81, + /* 1290 */ 175, 0, 47, 0, 22, 0, 243, 47, 0, 81, /* 1300 */ 113, 160, 156, 310, 159, 0, 313, 314, 315, 316, /* 1310 */ 317, 318, 0, 320, 152, 271, 323, 151, 0, 356, /* 1320 */ 357, 328, 0, 279, 271, 44, 0, 0, 0, 0, @@ -693,7 +693,7 @@ static const unsigned short int yy_shift_ofst[] = { /* 310 */ 929, 931, 826, 875, 934, 952, 962, 965, 974, 976, /* 320 */ 859, 935, 1260, 1261, 1222, 1263, 1193, 1266, 1220, 1093, /* 330 */ 1223, 1224, 1227, 1094, 1275, 1237, 1238, 1113, 1289, 1115, - /* 340 */ 1291, 1245, 1293, 1247, 1295, 1250, 1298, 1218, 1141, 1145, + /* 340 */ 1291, 1245, 1293, 1272, 1295, 1250, 1298, 1218, 1141, 1145, /* 350 */ 1187, 1146, 1305, 1312, 1162, 1166, 1318, 1322, 1281, 1326, /* 360 */ 1327, 1328, 1329, 1330, 1331, 1334, 1335, 1336, 1338, 1339, /* 370 */ 1340, 1341, 1343, 1344, 1345, 1347, 1348, 1309, 1351, 1352, @@ -898,7 +898,7 @@ static const YYCODETYPE yyFallback[] = { 0, /* BUFFER => nothing */ 0, /* CACHELAST => nothing */ 0, /* COMP => nothing */ - 0, /* DAYS => nothing */ + 0, /* DURATION => nothing */ 0, /* NK_VARIABLE => nothing */ 0, /* FSYNC => nothing */ 0, /* MAXROWS => nothing */ @@ -1225,7 +1225,7 @@ static const char *const yyTokenName[] = { /* 60 */ "BUFFER", /* 61 */ "CACHELAST", /* 62 */ "COMP", - /* 63 */ "DAYS", + /* 63 */ "DURATION", /* 64 */ "NK_VARIABLE", /* 65 */ "FSYNC", /* 66 */ "MAXROWS", @@ -1600,8 +1600,8 @@ static const char *const yyRuleName[] = { /* 68 */ "db_options ::= db_options BUFFER NK_INTEGER", /* 69 */ "db_options ::= db_options CACHELAST NK_INTEGER", /* 70 */ "db_options ::= db_options COMP NK_INTEGER", - /* 71 */ "db_options ::= db_options DAYS NK_INTEGER", - /* 72 */ "db_options ::= db_options DAYS NK_VARIABLE", + /* 71 */ "db_options ::= db_options DURATION NK_INTEGER", + /* 72 */ "db_options ::= db_options DURATION NK_VARIABLE", /* 73 */ "db_options ::= db_options FSYNC NK_INTEGER", /* 74 */ "db_options ::= db_options MAXROWS NK_INTEGER", /* 75 */ "db_options ::= db_options MINROWS NK_INTEGER", @@ -1783,7 +1783,7 @@ static const char *const yyRuleName[] = { /* 251 */ "stream_options ::= stream_options TRIGGER MAX_DELAY duration_literal", /* 252 */ "stream_options ::= stream_options WATERMARK duration_literal", /* 253 */ "cmd ::= KILL CONNECTION NK_INTEGER", - /* 254 */ "cmd ::= KILL QUERY NK_INTEGER", + /* 254 */ "cmd ::= KILL QUERY NK_STRING", /* 255 */ "cmd ::= KILL TRANSACTION NK_INTEGER", /* 256 */ "cmd ::= BALANCE VGROUP", /* 257 */ "cmd ::= MERGE VGROUP NK_INTEGER NK_INTEGER", @@ -2646,8 +2646,8 @@ static const struct { { 254, -3 }, /* (68) db_options ::= db_options BUFFER NK_INTEGER */ { 254, -3 }, /* (69) db_options ::= db_options CACHELAST NK_INTEGER */ { 254, -3 }, /* (70) db_options ::= db_options COMP NK_INTEGER */ - { 254, -3 }, /* (71) db_options ::= db_options DAYS NK_INTEGER */ - { 254, -3 }, /* (72) db_options ::= db_options DAYS NK_VARIABLE */ + { 254, -3 }, /* (71) db_options ::= db_options DURATION NK_INTEGER */ + { 254, -3 }, /* (72) db_options ::= db_options DURATION NK_VARIABLE */ { 254, -3 }, /* (73) db_options ::= db_options FSYNC NK_INTEGER */ { 254, -3 }, /* (74) db_options ::= db_options MAXROWS NK_INTEGER */ { 254, -3 }, /* (75) db_options ::= db_options MINROWS NK_INTEGER */ @@ -2829,7 +2829,7 @@ static const struct { { 305, -4 }, /* (251) stream_options ::= stream_options TRIGGER MAX_DELAY duration_literal */ { 305, -3 }, /* (252) stream_options ::= stream_options WATERMARK duration_literal */ { 240, -3 }, /* (253) cmd ::= KILL CONNECTION NK_INTEGER */ - { 240, -3 }, /* (254) cmd ::= KILL QUERY NK_INTEGER */ + { 240, -3 }, /* (254) cmd ::= KILL QUERY NK_STRING */ { 240, -3 }, /* (255) cmd ::= KILL TRANSACTION NK_INTEGER */ { 240, -2 }, /* (256) cmd ::= BALANCE VGROUP */ { 240, -4 }, /* (257) cmd ::= MERGE VGROUP NK_INTEGER NK_INTEGER */ @@ -3326,8 +3326,8 @@ static YYACTIONTYPE yy_reduce( { yylhsminor.yy632 = setDatabaseOption(pCxt, yymsp[-2].minor.yy632, DB_OPTION_COMP, &yymsp[0].minor.yy0); } yymsp[-2].minor.yy632 = yylhsminor.yy632; break; - case 71: /* db_options ::= db_options DAYS NK_INTEGER */ - case 72: /* db_options ::= db_options DAYS NK_VARIABLE */ yytestcase(yyruleno==72); + case 71: /* db_options ::= db_options DURATION NK_INTEGER */ + case 72: /* db_options ::= db_options DURATION NK_VARIABLE */ yytestcase(yyruleno==72); { yylhsminor.yy632 = setDatabaseOption(pCxt, yymsp[-2].minor.yy632, DB_OPTION_DAYS, &yymsp[0].minor.yy0); } yymsp[-2].minor.yy632 = yylhsminor.yy632; break; @@ -3907,8 +3907,8 @@ static YYACTIONTYPE yy_reduce( case 253: /* cmd ::= KILL CONNECTION NK_INTEGER */ { pCxt->pRootNode = createKillStmt(pCxt, QUERY_NODE_KILL_CONNECTION_STMT, &yymsp[0].minor.yy0); } break; - case 254: /* cmd ::= KILL QUERY NK_INTEGER */ -{ pCxt->pRootNode = createKillStmt(pCxt, QUERY_NODE_KILL_QUERY_STMT, &yymsp[0].minor.yy0); } + case 254: /* cmd ::= KILL QUERY NK_STRING */ +{ pCxt->pRootNode = createKillQueryStmt(pCxt, &yymsp[0].minor.yy0); } break; case 255: /* cmd ::= KILL TRANSACTION NK_INTEGER */ { pCxt->pRootNode = createKillStmt(pCxt, QUERY_NODE_KILL_TRANSACTION_STMT, &yymsp[0].minor.yy0); } diff --git a/source/libs/parser/test/parInitialCTest.cpp b/source/libs/parser/test/parInitialCTest.cpp index 3a6c78d808..8b2c2aade7 100644 --- a/source/libs/parser/test/parInitialCTest.cpp +++ b/source/libs/parser/test/parInitialCTest.cpp @@ -46,7 +46,7 @@ TEST_F(ParserInitialCTest, createBnode) { * BUFFER value * | CACHELAST value * | COMP {0 | 1 | 2} - * | DAYS value + * | DURATION value * | FSYNC value * | MAXROWS value * | MINROWS value @@ -155,7 +155,7 @@ TEST_F(ParserInitialCTest, createDatabase) { ASSERT_EQ(req.replications, expect.replications); ASSERT_EQ(req.strict, expect.strict); ASSERT_EQ(req.cacheLastRow, expect.cacheLastRow); - //ASSERT_EQ(req.schemaless, expect.schemaless); + // ASSERT_EQ(req.schemaless, expect.schemaless); ASSERT_EQ(req.ignoreExist, expect.ignoreExist); ASSERT_EQ(req.numOfRetensions, expect.numOfRetensions); if (expect.numOfRetensions > 0) { @@ -202,7 +202,7 @@ TEST_F(ParserInitialCTest, createDatabase) { "BUFFER 64 " "CACHELAST 2 " "COMP 1 " - "DAYS 100 " + "DURATION 100 " "FSYNC 100 " "MAXROWS 1000 " "MINROWS 100 " @@ -223,7 +223,7 @@ TEST_F(ParserInitialCTest, createDatabase) { setDbDaysFunc(100); setDbKeepFunc(1440, 300 * 60, 400 * 1440); run("CREATE DATABASE IF NOT EXISTS wxy_db " - "DAYS 100m " + "DURATION 100m " "KEEP 1440m,300h,400d "); clearCreateDbReq(); } diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 8502ffc04a..618ff9acc6 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -181,7 +181,7 @@ static int16_t getUnsetSlotId(const SArray* pSlotIdsInfo) { } static int32_t addDataBlockSlotsImpl(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc, - const char* pStmtName, bool output, bool reserve) { + const char* pStmtName, bool output, bool reserve) { if (NULL == pList) { return TSDB_CODE_SUCCESS; } @@ -463,10 +463,24 @@ static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubpla return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTagScan, pPhyNode); } +static ENodeType getScanOperatorType(EScanType scanType) { + switch (scanType) { + case SCAN_TYPE_TABLE: + return QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; + case SCAN_TYPE_STREAM: + return QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; + case SCAN_TYPE_TABLE_MERGE: + return QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN; + default: + break; + } + return QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; +} + static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) { - STableScanPhysiNode* pTableScan = - (STableScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN); + STableScanPhysiNode* pTableScan = (STableScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, + getScanOperatorType(pScanLogicNode->scanType)); if (NULL == pTableScan) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -528,12 +542,12 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* static int32_t createStreamScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) { - int32_t res = createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode); - if (res == TSDB_CODE_SUCCESS) { - ENodeType type = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; - setNodeType(*pPhyNode, type); - } - return res; + return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode); +} + +static int32_t createTableMergeScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, + SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) { + return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode); } static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode, @@ -547,6 +561,8 @@ static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, return createSystemTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode); case SCAN_TYPE_STREAM: return createStreamScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode); + case SCAN_TYPE_TABLE_MERGE: + return createTableMergeScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode); default: break; } diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 4b372bb7d4..4c202d457f 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -170,8 +170,8 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) { switch (nodeType(pNode)) { case QUERY_NODE_LOGIC_PLAN_SCAN: return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode); - // case QUERY_NODE_LOGIC_PLAN_JOIN: - // return !(((SJoinLogicNode*)pNode)->isSingleTableJoin); + case QUERY_NODE_LOGIC_PLAN_JOIN: + return !(((SJoinLogicNode*)pNode)->isSingleTableJoin); case QUERY_NODE_LOGIC_PLAN_AGG: return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode); case QUERY_NODE_LOGIC_PLAN_WINDOW: { @@ -642,6 +642,8 @@ static int32_t stbSplSplitScanNodeForJoin(SSplitContext* pCxt, SLogicSubplan* pS code = nodesListMakeStrictAppend(&pSubplan->pChildren, (SNode*)splCreateScanSubplan(pCxt, (SLogicNode*)pScan, SPLIT_FLAG_STABLE_SPLIT)); } + pScan->scanType = SCAN_TYPE_TABLE_MERGE; + ++(pCxt->groupId); return code; } @@ -703,7 +705,6 @@ static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { break; } - ++(pCxt->groupId); pCxt->split = true; return code; } diff --git a/tests/script/tsim/db/alter_option.sim b/tests/script/tsim/db/alter_option.sim index 12babea097..4692b9d91b 100644 --- a/tests/script/tsim/db/alter_option.sim +++ b/tests/script/tsim/db/alter_option.sim @@ -66,7 +66,7 @@ print ============= create database # | REPLICA value [1 | 3] # | WAL value [1 | 2] -sql create database db CACHELAST 3 COMP 0 DAYS 240 FSYNC 1000 MAXROWS 8000 MINROWS 10 KEEP 1000 PRECISION 'ns' REPLICA 3 WAL 2 VGROUPS 6 SINGLE_STABLE 1 +sql create database db CACHELAST 3 COMP 0 DURATION 240 FSYNC 1000 MAXROWS 8000 MINROWS 10 KEEP 1000 PRECISION 'ns' REPLICA 3 WAL 2 VGROUPS 6 SINGLE_STABLE 1 sql show databases print rows: $rows print $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09 diff --git a/tests/script/tsim/db/create_all_options.sim b/tests/script/tsim/db/create_all_options.sim index fac385a9a6..03068698e2 100644 --- a/tests/script/tsim/db/create_all_options.sim +++ b/tests/script/tsim/db/create_all_options.sim @@ -63,7 +63,7 @@ print ============= create database with all options # | PAGESIZE value [1~16384, default: 4] # | CACHELAST value [0, 1, 2, 3, default: 0] # | COMP [0 | 1 | 2, default: 2] -# | DAYS value [60m ~ min(3650d,keep), default: 10d, unit may be minut/hour/day] +# | DURATION value [60m ~ min(3650d,keep), default: 10d, unit may be minut/hour/day] # | FSYNC value [0 ~ 180000 ms, default: 3000] # | MAXROWS value [200~10000, default: 4096] # | MINROWS value [10~1000, default: 100] @@ -234,9 +234,9 @@ sql drop database db sql_error create database db COMP 3 sql_error create database db COMP -1 -#print ====> DAYS value [60m ~ min(3650d,keep), default: 10d, unit may be minut/hour/day] +#print ====> DURATION value [60m ~ min(3650d,keep), default: 10d, unit may be minut/hour/day] #print ====> KEEP value [max(1d ~ 365000d), default: 1d, unit may be minut/hour/day] -#sql create database db DAYS 60m KEEP 60m +#sql create database db DURATION 60m KEEP 60m #sql show databases #print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db #if $data6_db != 60 then @@ -246,7 +246,7 @@ sql_error create database db COMP -1 # return -1 #endi #sql drop database db -#sql create database db DAYS 60m KEEP 1d +#sql create database db DURATION 60m KEEP 1d #sql show databases #print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db #if $data6_db != 60 then @@ -255,7 +255,7 @@ sql_error create database db COMP -1 #if $data7_db != 1440,1440,1440 then # return -1 #endi -#sql create database db DAYS 3650d KEEP 365000d +#sql create database db DURATION 3650d KEEP 365000d #sql show databases #print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db #if $data6_db != 5256000 then @@ -265,10 +265,10 @@ sql_error create database db COMP -1 # return -1 #endi #sql drop database db -#sql_error create database db DAYS -59m -#sql_error create database db DAYS 59m -#sql_error create database db DAYS 5256001m -#sql_error create database db DAYS 3651d +#sql_error create database db DURATION -59m +#sql_error create database db DURATION 59m +#sql_error create database db DURATION 5256001m +#sql_error create database db DURATION 3651d #sql_error create database db KEEP -59m #sql_error create database db KEEP 14399m #sql_error create database db KEEP 525600001m