diff --git a/example/src/demoapi.c b/example/src/demoapi.c index b1efdb21b9..c38e481b96 100644 --- a/example/src/demoapi.c +++ b/example/src/demoapi.c @@ -30,7 +30,7 @@ fprintf(stderr, "\033[0m"); } while(0) int64_t g_num_of_tb = 2; -int64_t g_num_of_rec = 2; +int64_t g_num_of_rec = 3; static struct argp_option options[] = { {"tables", 't', "NUMBER", 0, "Number of child tables, default is 10000."}, @@ -42,10 +42,18 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { switch (key) { case 't': g_num_of_tb = atoll(arg); + if (g_num_of_tb < 1) { + warnPrint("minimal g_num_of_tb is %d\n", 1); + g_num_of_tb = 1; + } break; case 'n': g_num_of_rec = atoll(arg); + if (g_num_of_rec < 2) { + warnPrint("minimal g_num_of_rec is %d\n", 2); + g_num_of_rec = 2; + } break; } @@ -65,15 +73,32 @@ static void prepare_data(TAOS* taos) { usleep(100000); taos_select_db(taos, "test"); - res = taos_query(taos, "create table meters(ts timestamp, f float, n int, b binary(20), c nchar(20)) tags(area int, city binary(20), dist nchar(20));"); + char command[1024] = {0}; + sprintf(command, "%s", "create table meters(ts timestamp, f float, n int, bin1 binary(20), c nchar(20), bin2 binary(20)) tags(area int, city binary(20), dist nchar(20), street binary(20));"); + res = taos_query(taos, command); + if ((res) && (0 == taos_errno(res))) { + okPrint("%s created\n", "meters"); + } else { + errorPrint("%s() LN%d: %s\n", + __func__, __LINE__, taos_errstr(res)); + taos_free_result(res); + exit(1); + } taos_free_result(res); - char command[1024] = {0}; for (int64_t i = 0; i < g_num_of_tb; i ++) { -// sprintf(command, "create table t%"PRId64" using meters tags(%"PRId64", '%s', '%s');", -// i, i, (i%2)?"beijing":"shanghai", (i%2)?"朝阳区":"黄浦区"); - sprintf(command, "create table t%"PRId64" using meters tags(%"PRId64", '%s', '%s');", - i, i, (i%2)?"beijing":"shanghai", (i%2)?"chaoyang":"huangpu"); + sprintf(command, "create table t%"PRId64" using meters " + "tags(%"PRId64", '%s', '%s', '%s');", + i, i, (i%2)?"beijing":"shanghai", + (i%2)?"朝阳区":"黄浦区", + (i%2)?"长安街":"中山路"); +/* sprintf(command, "create table t%"PRId64" using meters " + "tags(%"PRId64", '%s', '%s', '%s');", + i, i, + (i%2)?"beijing":"shanghai", + (i%2)?"chaoyang":"huangpu", + (i%2?"changan street":"jianguo rd")); + */ res = taos_query(taos, command); if ((res) && (0 == taos_errno(res))) { okPrint("t%" PRId64 " created\n", i); @@ -86,11 +111,15 @@ static void prepare_data(TAOS* taos) { int64_t j = 0; int64_t total = 0; int64_t affected; - for (; j < g_num_of_rec -1; j ++) { + for (; j < g_num_of_rec -2; j ++) { sprintf(command, "insert into t%"PRId64" " - "values(%" PRId64 ", %f, %"PRId64", '%c%d', '%c%d')", - i, 1650000000000+j, (float)j, j, 'a'+(int)j%25, rand(), - 'z' - (int)j%25, rand()); + "values(%" PRId64 ", %f, %"PRId64", " + "'%c%d', '%s%c%d', '%c%d')", + i, 1650000000000+j, (float)j, j, + 'a'+(int)j%25, rand(), + "涛思", 'z' - (int)j%25, rand(), + 'b' - (int)j%25, rand() + ); res = taos_query(taos, command); if ((res) && (0 == taos_errno(res))) { affected = taos_affected_rows(res); @@ -101,8 +130,25 @@ static void prepare_data(TAOS* taos) { } taos_free_result(res); } - sprintf(command, "insert into t%"PRId64" values(%" PRId64 ", NULL, NULL, NULL, NULL)", - i, 1650000000000+j+1); + sprintf(command, "insert into t%"PRId64" values(%" PRId64 ", " + "NULL, NULL, NULL, NULL, NULL)", + i, 1650000000000+j); + res = taos_query(taos, command); + if ((res) && (0 == taos_errno(res))) { + affected = taos_affected_rows(res); + total += affected; + } else { + errorPrint("%s() LN%d: %s\n", + __func__, __LINE__, taos_errstr(res)); + } + sprintf(command, "insert into t%"PRId64" " + "values(%" PRId64 ", %f, %"PRId64", " + "'%c%d', '%s%c%d', '%c%d')", + i, 1650000000000+j+1, (float)j, j, + 'a'+(int)j%25, rand(), + "数据", 'z' - (int)j%25, rand(), + 'b' - (int)j%25, rand() + ); res = taos_query(taos, command); if ((res) && (0 == taos_errno(res))) { affected = taos_affected_rows(res); @@ -113,7 +159,8 @@ static void prepare_data(TAOS* taos) { } taos_free_result(res); - printf("insert %"PRId64" records into t%"PRId64", total affected rows: %"PRId64"\n", j, i, total); + okPrint("insert %"PRId64" records into t%"PRId64", " + "total affected rows: %"PRId64"\n", j, i, total); } } @@ -127,29 +174,63 @@ static int print_result(char *tbname, TAOS_RES* res, int block) { printf("fields[%d].name=%s, fields[%d].type=%d, fields[%d].bytes=%d\n", f, fields[f].name, f, fields[f].type, f, fields[f].bytes); } + if (block) { - warnPrint("%s() LN%d, call taos_fetch_block()\n", __func__, __LINE__); + warnPrint("%s", "call taos_fetch_block()\n"); int rows = 0; while ((rows = taos_fetch_block(res, &row))) { + for (int f = 0; f < num_fields; f++) { + if ((fields[f].type != TSDB_DATA_TYPE_VARCHAR) + && (fields[f].type != TSDB_DATA_TYPE_NCHAR) + && (fields[f].type != TSDB_DATA_TYPE_JSON)) { + printf("col%d type is %d, no need get offset\n", + f, fields[f].type); + continue; + } + + int *offsets = taos_get_column_data_offset(res, f); + if (offsets) { + for (int c = 0; c < rows; c++) { + if (offsets[c] != -1) { + int length = *(int16_t*)(row[f] + offsets[c]); + char *buf = calloc(1, length + 1); + strncpy(buf, (char *)(row[f] + offsets[c] + 2), length); + printf("row: %d, col: %d, offset: %d, length: %d, content: %s\n", + c, f, offsets[c], length, buf); + free(buf); + } else { + printf("row: %d, col: %d, offset: -1, means content is NULL\n", + c, f); + } + } + } else { + errorPrint("%s() LN%d: col%d's lengths is NULL\n", + __func__, __LINE__, f); + } + } num_rows += rows; } } else { - warnPrint("%s() LN%d, call taos_fetch_rows()\n", __func__, __LINE__); + warnPrint("%s", "call taos_fetch_rows()\n"); while ((row = taos_fetch_row(res))) { char temp[256] = {0}; taos_print_row(temp, row, fields, num_fields); puts(temp); - num_rows ++; int* lengths = taos_fetch_lengths(res); if (lengths) { for (int c = 0; c < num_fields; c++) { - printf("length of column %d is %d\n", c, lengths[c]); + printf("row: %"PRId64", col: %d, is_null: %s, length of column %d is %d\n", + num_rows, c, + taos_is_null(res, num_rows, c)?"True":"False", + c, lengths[c]); } } else { errorPrint("%s() LN%d: %s's lengths is NULL\n", __func__, __LINE__, tbname); } + + num_rows ++; } } @@ -172,8 +253,8 @@ static void verify_query(TAOS* taos) { int field_count = taos_field_count(res); printf("field_count: %d\n", field_count); int64_t rows = print_result(tbname, res, i % 2); - printf("rows is: %"PRId64"\n", rows); - + okPrint("total query %s result rows is: %"PRId64"\n", + tbname, rows); } else { errorPrint("%s() LN%d: %s\n", __func__, __LINE__, taos_errstr(res)); @@ -207,7 +288,7 @@ int main(int argc, char *argv[]) { verify_query(taos); taos_close(taos); - printf("done\n"); + okPrint("%s", "done\n"); return 0; } diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 4e89beabcb..cab8fea374 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1209,12 +1209,17 @@ typedef struct { int32_t code; } STaskDropRsp; +#define STREAM_TRIGGER_AT_ONCE 1 +#define STREAM_TRIGGER_WINDOW_CLOSE 2 + typedef struct { char name[TSDB_TOPIC_FNAME_LEN]; char outputSTbName[TSDB_TABLE_FNAME_LEN]; int8_t igExists; char* sql; char* ast; + int8_t triggerType; + int64_t watermark; } SCMCreateStreamReq; typedef struct { diff --git a/include/common/trow.h b/include/common/trow.h index 539bda078d..282921087c 100644 --- a/include/common/trow.h +++ b/include/common/trow.h @@ -162,38 +162,39 @@ typedef struct { int32_t extendedRowSize; } SRowBuilder; -#define TD_ROW_HEAD_LEN (sizeof(STSRow)) +#define TD_ROW_HEAD_LEN (sizeof(STSRow)) #define TD_ROW_NCOLS_LEN (sizeof(col_id_t)) -#define TD_ROW_INFO(r) ((r)->info) -#define TD_ROW_TYPE(r) ((r)->type) -#define TD_ROW_DELETE(r) ((r)->del) -#define TD_ROW_ENDIAN(r) ((r)->endian) -#define TD_ROW_SVER(r) ((r)->sver) -#define TD_ROW_NCOLS(r) ((r)->data) // only valid for SKvRow -#define TD_ROW_DATA(r) ((r)->data) -#define TD_ROW_LEN(r) ((r)->len) -#define TD_ROW_KEY(r) ((r)->ts) +#define TD_ROW_INFO(r) ((r)->info) +#define TD_ROW_TYPE(r) ((r)->type) +#define TD_ROW_DELETE(r) ((r)->del) +#define TD_ROW_ENDIAN(r) ((r)->endian) +#define TD_ROW_SVER(r) ((r)->sver) +#define TD_ROW_NCOLS(r) ((r)->data) // only valid for SKvRow +#define TD_ROW_DATA(r) ((r)->data) +#define TD_ROW_LEN(r) ((r)->len) +#define TD_ROW_KEY(r) ((r)->ts) +#define TD_ROW_VER(r) ((r)->ver) #define TD_ROW_KEY_ADDR(r) (r) // N.B. If without STSchema, getExtendedRowSize() is used to get the rowMaxBytes and // (int32_t)ceil((double)nCols/TD_VTYPE_PARTS) should be added if TD_SUPPORT_BITMAP defined. #define TD_ROW_MAX_BYTES_FROM_SCHEMA(s) (schemaTLen(s) + TD_ROW_HEAD_LEN) -#define TD_ROW_SET_INFO(r, i) (TD_ROW_INFO(r) = (i)) -#define TD_ROW_SET_TYPE(r, t) (TD_ROW_TYPE(r) = (t)) -#define TD_ROW_SET_DELETE(r) (TD_ROW_DELETE(r) = 1) -#define TD_ROW_SET_SVER(r, v) (TD_ROW_SVER(r) = (v)) -#define TD_ROW_SET_LEN(r, l) (TD_ROW_LEN(r) = (l)) +#define TD_ROW_SET_INFO(r, i) (TD_ROW_INFO(r) = (i)) +#define TD_ROW_SET_TYPE(r, t) (TD_ROW_TYPE(r) = (t)) +#define TD_ROW_SET_DELETE(r) (TD_ROW_DELETE(r) = 1) +#define TD_ROW_SET_SVER(r, v) (TD_ROW_SVER(r) = (v)) +#define TD_ROW_SET_LEN(r, l) (TD_ROW_LEN(r) = (l)) #define TD_ROW_SET_NCOLS(r, n) (*(col_id_t *)TD_ROW_NCOLS(r) = (n)) #define TD_ROW_IS_DELETED(r) (TD_ROW_DELETE(r) == 1) -#define TD_IS_TP_ROW(r) (TD_ROW_TYPE(r) == TD_ROW_TP) -#define TD_IS_KV_ROW(r) (TD_ROW_TYPE(r) == TD_ROW_KV) -#define TD_IS_TP_ROW_T(t) ((t) == TD_ROW_TP) -#define TD_IS_KV_ROW_T(t) ((t) == TD_ROW_KV) +#define TD_IS_TP_ROW(r) (TD_ROW_TYPE(r) == TD_ROW_TP) +#define TD_IS_KV_ROW(r) (TD_ROW_TYPE(r) == TD_ROW_KV) +#define TD_IS_TP_ROW_T(t) ((t) == TD_ROW_TP) +#define TD_IS_KV_ROW_T(t) ((t) == TD_ROW_KV) -#define TD_BOOL_STR(b) ((b) ? "true" : "false") +#define TD_BOOL_STR(b) ((b) ? "true" : "false") #define isUtilizeKVRow(k, d) ((k) < ((d)*KVRatioConvert)) #define TD_ROW_COL_IDX(r) POINTER_SHIFT(TD_ROW_DATA(r), sizeof(col_id_t)) diff --git a/include/libs/nodes/cmdnodes.h b/include/libs/nodes/cmdnodes.h index 6810b63f4e..b62555d549 100644 --- a/include/libs/nodes/cmdnodes.h +++ b/include/libs/nodes/cmdnodes.h @@ -272,14 +272,9 @@ typedef struct SKillStmt { int32_t targetId; } SKillStmt; -typedef enum EStreamTriggerType { - STREAM_TRIGGER_AT_ONCE = 1, - STREAM_TRIGGER_WINDOW_CLOSE -} EStreamTriggerType; - typedef struct SStreamOptions { ENodeType type; - EStreamTriggerType triggerType; + int8_t triggerType; SNode* pWatermark; } SStreamOptions; diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 3b5f9abe81..9d4f554c2c 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -109,6 +109,8 @@ typedef struct SWindowLogicNode { int64_t sessionGap; SNode* pTspk; SNode* pStateExpr; + int8_t triggerType; + int64_t watermark; } SWindowLogicNode; typedef struct SSortLogicNode { @@ -251,6 +253,8 @@ typedef struct SWinodwPhysiNode { SNodeList* pExprs; // these are expression list of parameter expression of function SNodeList* pFuncs; SNode* pTspk; // timestamp primary key + int8_t triggerType; + int64_t watermark; } SWinodwPhysiNode; typedef struct SIntervalPhysiNode { diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index 8db78fccf5..0b164bf43f 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -30,6 +30,8 @@ typedef struct SPlanContext { bool topicQuery; bool streamQuery; bool showRewrite; + int8_t triggerType; + int64_t watermark; } SPlanContext; // Create the physical plan for the query, according to the AST. diff --git a/include/util/taoserror.h b/include/util/taoserror.h index e31eea1b15..d51217874a 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -597,6 +597,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_PAR_GROUPBY_WINDOW_COEXIST TAOS_DEF_ERROR_CODE(0, 0x2624) #define TSDB_CODE_PAR_INVALID_OPTION_UNIT TAOS_DEF_ERROR_CODE(0, 0x2625) #define TSDB_CODE_PAR_INVALID_KEEP_UNIT TAOS_DEF_ERROR_CODE(0, 0x2626) +#define TSDB_CODE_PAR_AGG_FUNC_NESTING TAOS_DEF_ERROR_CODE(0, 0x2627) //planner #define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 3e37c52c26..3adb0102ce 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -3381,6 +3381,8 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1; if (tEncodeI32(&encoder, sqlLen) < 0) return -1; if (tEncodeI32(&encoder, astLen) < 0) return -1; + if (tEncodeI8(&encoder, pReq->triggerType) < 0) return -1; + if (tEncodeI64(&encoder, pReq->watermark) < 0) return -1; if (sqlLen > 0 && tEncodeCStr(&encoder, pReq->sql) < 0) return -1; if (astLen > 0 && tEncodeCStr(&encoder, pReq->ast) < 0) return -1; @@ -3404,6 +3406,8 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1; if (tDecodeI32(&decoder, &sqlLen) < 0) return -1; if (tDecodeI32(&decoder, &astLen) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->triggerType) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->watermark) < 0) return -1; if (sqlLen > 0) { pReq->sql = taosMemoryCalloc(1, sqlLen + 1); diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index b5d22cb7a5..15cd9fa043 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -31,7 +31,7 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream); SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); -int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans); +int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, int8_t triggerType, int64_t watermark, STrans *pTrans); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 6c797b9044..13fbcdf9b3 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -106,6 +106,7 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb) { SDB_SET_INT8(pRaw, dataPos, pDb->cfg.cacheLastRow, DB_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pDb->cfg.numOfRetensions, DB_ENCODE_OVER) for (int32_t i = 0; i < pDb->cfg.numOfRetensions; ++i) { + TASSERT(taosArrayGetSize(pDb->cfg.pRetensions) == pDb->cfg.numOfRetensions); SRetention *pRetension = taosArrayGet(pDb->cfg.pRetensions, i); SDB_SET_INT32(pRaw, dataPos, pRetension->freq, DB_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pRetension->keep, DB_ENCODE_OVER) diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 1e68e8e9c5..a474ccc5b6 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -429,7 +429,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER; - if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) goto _OVER; + if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, STREAM_TRIGGER_AT_ONCE, 0, pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; code = 0; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index a0ee8ff44c..58e5b6c65b 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -218,7 +218,7 @@ static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) { return 0; } -static int32_t mndStreamGetPlanString(const char *ast, char **pStr) { +static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64_t watermark, char **pStr) { if (NULL == ast) { return TSDB_CODE_SUCCESS; } @@ -232,6 +232,8 @@ static int32_t mndStreamGetPlanString(const char *ast, char **pStr) { .pAstRoot = pAst, .topicQuery = false, .streamQuery = true, + .triggerType = triggerType, + .watermark = watermark, }; code = qCreateQueryPlan(&cxt, &pPlan, NULL); } @@ -245,7 +247,7 @@ static int32_t mndStreamGetPlanString(const char *ast, char **pStr) { return code; } -int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans) { +int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, int8_t triggerType, int64_t watermark, STrans *pTrans) { SNode *pAst = NULL; if (nodesStringToNode(ast, &pAst) < 0) { @@ -265,7 +267,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast #endif - if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(ast, &pStream->physicalPlan)) { + if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(ast, triggerType, watermark, &pStream->physicalPlan)) { mError("topic:%s, failed to get plan since %s", pStream->name, terrstr()); return -1; } @@ -313,7 +315,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe } mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name); - if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) { + if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pCreate->triggerType, pCreate->watermark, pTrans) != 0) { mError("trans:%d, failed to add stream since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); return -1; diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index 7abdf22073..b98fe8936a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -314,8 +314,7 @@ static FORCE_INLINE void tsdbSmaStatSetDropped(SSmaStatItem *pStatItem) { } static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]) { - snprintf(dirName, TSDB_FILENAME_LEN, "vnode%svnode%d%stsdb%s%s", TD_DIRSEP, vgId, TD_DIRSEP, TD_DIRSEP, - TSDB_SMA_DNAME[smaType]); + snprintf(dirName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s", TD_DIRSEP, vgId, TD_DIRSEP, TSDB_SMA_DNAME[smaType]); } static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path, SDiskID did) { diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index ce74748c0c..2c8a938a5a 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1104,6 +1104,8 @@ static int32_t jsonToPhysiSortNode(const SJson* pJson, void* pObj) { static const char* jkWindowPhysiPlanExprs = "Exprs"; static const char* jkWindowPhysiPlanFuncs = "Funcs"; static const char* jkWindowPhysiPlanTsPk = "TsPk"; +static const char* jkWindowPhysiPlanTriggerType = "TriggerType"; +static const char* jkWindowPhysiPlanWatermark = "Watermark"; static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) { const SWinodwPhysiNode* pNode = (const SWinodwPhysiNode*)pObj; @@ -1118,6 +1120,12 @@ static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkWindowPhysiPlanTsPk, nodeToJson, pNode->pTspk); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkWindowPhysiPlanTriggerType, pNode->triggerType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkWindowPhysiPlanWatermark, pNode->watermark); + } return code; } @@ -1135,6 +1143,12 @@ static int32_t jsonToPhysiWindowNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkWindowPhysiPlanTsPk, (SNode**)&pNode->pTspk); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetNumberValue(pJson, jkWindowPhysiPlanTriggerType, pNode->triggerType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetNumberValue(pJson, jkWindowPhysiPlanWatermark, pNode->watermark); + } return code; } diff --git a/source/libs/parser/inc/sql.y b/source/libs/parser/inc/sql.y index 91ac6ce2d2..bd7c5d16b1 100644 --- a/source/libs/parser/inc/sql.y +++ b/source/libs/parser/inc/sql.y @@ -427,7 +427,7 @@ bufsize_opt(A) ::= BUFSIZE NK_INTEGER(B). /************************************************ create/drop stream **************************************************/ cmd ::= CREATE STREAM not_exists_opt(E) stream_name(A) - stream_options(B) into_opt(C) AS query_expression(D). { pCxt->pRootNode = createCreateStreamStmt(pCxt, E, &A, B, C, D); } + stream_options(B) into_opt(C) AS query_expression(D). { pCxt->pRootNode = createCreateStreamStmt(pCxt, E, &A, C, B, D); } cmd ::= DROP STREAM exists_opt(A) stream_name(B). { pCxt->pRootNode = createDropStreamStmt(pCxt, A, &B); } into_opt(A) ::= . { A = NULL; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index dc1338be68..324f9c17d2 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -481,6 +481,14 @@ static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) { return DEAL_RES_CONTINUE; } +static EDealRes haveAggFunction(SNode* pNode, void* pContext) { + if (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsAggFunc(((SFunctionNode*)pNode)->funcId)) { + *((bool*)pContext) = true; + return DEAL_RES_END; + } + return DEAL_RES_CONTINUE; +} + static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc) { if (TSDB_CODE_SUCCESS != fmGetFuncInfo(pFunc->functionName, &pFunc->funcId, &pFunc->funcType)) { return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_FUNTION, pFunc->functionName); @@ -492,6 +500,11 @@ static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc) if (fmIsAggFunc(pFunc->funcId) && beforeHaving(pCxt->currClause)) { return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_ILLEGAL_USE_AGG_FUNCTION); } + bool haveAggFunc = false; + nodesWalkExprs(pFunc->pParameterList, haveAggFunction, &haveAggFunc); + if (haveAggFunc) { + return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_AGG_FUNC_NESTING); + } return DEAL_RES_CONTINUE; } @@ -2173,6 +2186,14 @@ static int32_t translateCreateStream(STranslateContext* pCxt, SCreateStreamStmt* } } + if (TSDB_CODE_SUCCESS == code && NULL != pStmt->pOptions->pWatermark) { + code = (DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pStmt->pOptions->pWatermark)) ? pCxt->errCode : TSDB_CODE_SUCCESS; + } + if (TSDB_CODE_SUCCESS == code) { + createReq.triggerType = pStmt->pOptions->triggerType; + createReq.watermark = (NULL != pStmt->pOptions->pWatermark ? ((SValueNode*)pStmt->pOptions->pWatermark)->datum.i : 0); + } + if (TSDB_CODE_SUCCESS == code) { code = buildCmdMsg(pCxt, TDMT_MND_CREATE_STREAM, (FSerializeFunc)tSerializeSCMCreateStreamReq, &createReq); } diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index efc807850f..549f448ff4 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -91,6 +91,8 @@ static char* getSyntaxErrFormat(int32_t errCode) { return "Invalid option %s unit: %c, only m, h, d allowed"; case TSDB_CODE_PAR_INVALID_KEEP_UNIT: return "Invalid option keep unit: %c, %c, %c, only m, h, d allowed"; + case TSDB_CODE_PAR_AGG_FUNC_NESTING: + return "Aggregate functions do not support nesting"; case TSDB_CODE_OUT_OF_MEMORY: return "Out of memory"; default: diff --git a/source/libs/parser/src/sql.c b/source/libs/parser/src/sql.c index 80aad1dc66..e9ec9c987c 100644 --- a/source/libs/parser/src/sql.c +++ b/source/libs/parser/src/sql.c @@ -3452,7 +3452,7 @@ static YYACTIONTYPE yy_reduce( { yymsp[-1].minor.yy100 = strtol(yymsp[0].minor.yy0.z, NULL, 10); } break; case 234: /* cmd ::= CREATE STREAM not_exists_opt stream_name stream_options into_opt AS query_expression */ -{ pCxt->pRootNode = createCreateStreamStmt(pCxt, yymsp[-5].minor.yy659, &yymsp[-4].minor.yy479, yymsp[-3].minor.yy452, yymsp[-2].minor.yy452, yymsp[0].minor.yy452); } +{ pCxt->pRootNode = createCreateStreamStmt(pCxt, yymsp[-5].minor.yy659, &yymsp[-4].minor.yy479, yymsp[-2].minor.yy452, yymsp[-3].minor.yy452, yymsp[0].minor.yy452); } break; case 235: /* cmd ::= DROP STREAM exists_opt stream_name */ { pCxt->pRootNode = createDropStreamStmt(pCxt, yymsp[-1].minor.yy659, &yymsp[0].minor.yy479); } diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index fd0d2758bf..5427749d09 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -463,6 +463,11 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SWindowLogicNode* pWindow, SLogicNode** pLogicNode) { int32_t code = nodesCollectFuncs(pSelect, fmIsWindowClauseFunc, &pWindow->pFuncs); + if (pCxt->pPlanCxt->streamQuery) { + pWindow->triggerType = pCxt->pPlanCxt->triggerType; + pWindow->watermark = pCxt->pPlanCxt->watermark; + } + if (TSDB_CODE_SUCCESS == code) { code = rewriteExpr(pWindow->pFuncs, pSelect, SQL_CLAUSE_WINDOW); } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 8a766ae7e8..656caaf645 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -56,6 +56,20 @@ typedef enum ECondAction { // after supporting outer join, there are other possibilities } ECondAction; +EDealRes haveNormalColImpl(SNode* pNode, void* pContext) { + if (QUERY_NODE_COLUMN == nodeType(pNode)) { + *((bool*)pContext) = (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType); + return *((bool*)pContext) ? DEAL_RES_END : DEAL_RES_IGNORE_CHILD; + } + return DEAL_RES_CONTINUE; +} + +static bool haveNormalCol(SNodeList* pList) { + bool res = false; + nodesWalkExprsPostOrder(pList, haveNormalColImpl, &res); + return res; +} + static bool osdMayBeOptimized(SLogicNode* pNode) { if (OPTIMIZE_FLAG_TEST_MASK(pNode->optimizedFlag, OPTIMIZE_FLAG_OSD)) { return false; @@ -67,7 +81,10 @@ static bool osdMayBeOptimized(SLogicNode* pNode) { (QUERY_NODE_LOGIC_PLAN_WINDOW != nodeType(pNode->pParent) && QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode->pParent))) { return false; } - return true; + if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent)) { + return (WINDOW_TYPE_INTERVAL == ((SWindowLogicNode*)pNode->pParent)->winType); + } + return !haveNormalCol(((SAggLogicNode*)pNode->pParent)->pGroupKeys); } static SLogicNode* osdFindPossibleScanNode(SLogicNode* pNode) { diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index a45fabd828..f9f6d503a1 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -796,6 +796,9 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList* } } + pWindow->triggerType = pWindowLogicNode->triggerType; + pWindow->watermark = pWindowLogicNode->watermark; + if (TSDB_CODE_SUCCESS == code) { *pPhyNode = (SPhysiNode*)pWindow; } else { diff --git a/source/libs/planner/test/plannerTest.cpp b/source/libs/planner/test/plannerTest.cpp index 14fa36a958..291b4c9cc7 100644 --- a/source/libs/planner/test/plannerTest.cpp +++ b/source/libs/planner/test/plannerTest.cpp @@ -45,7 +45,7 @@ protected: int32_t code = qParseQuerySql(&cxt_, &query_); if (code != TSDB_CODE_SUCCESS) { - cout << "sql:[" << cxt_.pSql << "] parser code:" << code << ", strerror:" << tstrerror(code) << ", msg:" << errMagBuf_ << endl; + cout << "sql:[" << cxt_.pSql << "] qParseQuerySql code:" << code << ", strerror:" << tstrerror(code) << ", msg:" << errMagBuf_ << endl; return false; } @@ -123,6 +123,12 @@ private: tDeserializeSMCreateSmaReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req); nodesStringToNode(req.ast, &pCxt->pAstRoot); pCxt->streamQuery = true; + } else if (QUERY_NODE_CREATE_STREAM_STMT == nodeType(pQuery->pRoot)) { + SCreateStreamStmt* pStmt = (SCreateStreamStmt*)pQuery->pRoot; + pCxt->pAstRoot = pStmt->pQuery; + pCxt->streamQuery = true; + pCxt->triggerType = pStmt->pOptions->triggerType; + pCxt->watermark = (NULL != pStmt->pOptions->pWatermark ? ((SValueNode*)pStmt->pOptions->pWatermark)->datum.i : 0); } else { pCxt->pAstRoot = pQuery->pRoot; } @@ -353,11 +359,11 @@ TEST_F(PlannerTest, createTopic) { ASSERT_TRUE(run()); } -TEST_F(PlannerTest, stream) { +TEST_F(PlannerTest, createStream) { setDatabase("root", "test"); - bind("SELECT sum(c1) FROM st1"); - ASSERT_TRUE(run(true)); + bind("create stream if not exists s1 trigger window_close watermark 10s into st1 as select count(*) from t1 interval(10s)"); + ASSERT_TRUE(run()); } TEST_F(PlannerTest, createSmaIndex) { diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 4e59e1bea6..6cc435fee4 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -2735,10 +2735,12 @@ void schedulerDestroy(void) { if (schMgmt.jobRef) { SSchJob *pJob = taosIterateRef(schMgmt.jobRef, 0); int64_t refId = 0; - + while (pJob) { refId = pJob->refId; - + if (refId == 0) { + break; + } taosRemoveRef(schMgmt.jobRef, pJob->refId); pJob = taosIterateRef(schMgmt.jobRef, refId); diff --git a/tests/pytest/smoketest.sh b/tests/pytest/smoketest.sh index 7ac5d4f6d3..32f5f00c51 100755 --- a/tests/pytest/smoketest.sh +++ b/tests/pytest/smoketest.sh @@ -58,4 +58,4 @@ python3 ./test.py $1 -f client/client.py python3 ./test.py $1 -s && sleep 1 # connector -python3 ./test.py $1 -f connector/lua.py +# python3 ./test.py $1 -f connector/lua.py diff --git a/tests/script/sh/stop_dnodes.sh b/tests/script/sh/stop_dnodes.sh index 4c6d8e0351..b431c0627c 100755 --- a/tests/script/sh/stop_dnodes.sh +++ b/tests/script/sh/stop_dnodes.sh @@ -12,7 +12,8 @@ fi PID=`ps -ef|grep -w taosd | grep -v grep | awk '{print $2}'` while [ -n "$PID" ]; do echo kill -9 $PID - pkill -9 taosd + #pkill -9 taosd + kill -9 $PID echo "Killing processes locking on port 6030" if [ "$OS_TYPE" != "Darwin" ]; then fuser -k -n tcp 6030 diff --git a/tests/script/tsim/db/alter_option.sim b/tests/script/tsim/db/alter_option.sim index 9c7b2f5424..e9795bd8d2 100644 --- a/tests/script/tsim/db/alter_option.sim +++ b/tests/script/tsim/db/alter_option.sim @@ -350,4 +350,6 @@ sql_error alter database db precision 'ns' sql_error alter database db precision 'ys' sql_error alter database db prec 'xs' -#system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode2 -s stop -x SIGINT +system sh/exec.sh -n dnode3 -s stop -x SIGINT diff --git a/tests/script/tsim/db/create_all_options.sim b/tests/script/tsim/db/create_all_options.sim index 4dda6cd00f..2baba8fb74 100644 --- a/tests/script/tsim/db/create_all_options.sim +++ b/tests/script/tsim/db/create_all_options.sim @@ -484,4 +484,6 @@ sql drop database db sql_error create database db STREAM_MODE 2 sql_error create database db STREAM_MODE -1 -#system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode2 -s stop -x SIGINT +system sh/exec.sh -n dnode3 -s stop -x SIGINT diff --git a/tests/script/tsim/insert/backquote.sim b/tests/script/tsim/insert/backquote.sim index 819b1aea13..ba50e70afa 100644 --- a/tests/script/tsim/insert/backquote.sim +++ b/tests/script/tsim/insert/backquote.sim @@ -353,4 +353,4 @@ while $dbCnt < 2 endw -#system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/insert/null.sim b/tests/script/tsim/insert/null.sim index fab5335ac5..98a494c960 100644 --- a/tests/script/tsim/insert/null.sim +++ b/tests/script/tsim/insert/null.sim @@ -471,4 +471,4 @@ endi # return -1 #endi -#system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/parser/fourArithmetic-basic.sim b/tests/script/tsim/parser/fourArithmetic-basic.sim index ebe20924be..eee294dc80 100644 --- a/tests/script/tsim/parser/fourArithmetic-basic.sim +++ b/tests/script/tsim/parser/fourArithmetic-basic.sim @@ -138,4 +138,4 @@ if $loop_test == 0 then goto loop_test_pos endi -#system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/query/charScalarFunction.sim b/tests/script/tsim/query/charScalarFunction.sim index 0468125997..18ee93f610 100644 --- a/tests/script/tsim/query/charScalarFunction.sim +++ b/tests/script/tsim/query/charScalarFunction.sim @@ -727,4 +727,4 @@ if $loop_test == 0 then goto loop_test_pos endi -#system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/query/complex_limit.sim b/tests/script/tsim/query/complex_limit.sim index 1691f2d443..4942fec4ee 100644 --- a/tests/script/tsim/query/complex_limit.sim +++ b/tests/script/tsim/query/complex_limit.sim @@ -527,4 +527,4 @@ if $rows != 1 then return -1 endi -#system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file +system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file diff --git a/tests/script/tsim/query/complex_select.sim b/tests/script/tsim/query/complex_select.sim index 1ebcb2f49a..1f41783383 100644 --- a/tests/script/tsim/query/complex_select.sim +++ b/tests/script/tsim/query/complex_select.sim @@ -577,4 +577,4 @@ if $data00 != 33 then endi -#system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file +system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file diff --git a/tests/script/tsim/query/complex_where.sim b/tests/script/tsim/query/complex_where.sim index 7cd576400f..8e22a12fcf 100644 --- a/tests/script/tsim/query/complex_where.sim +++ b/tests/script/tsim/query/complex_where.sim @@ -688,4 +688,4 @@ if $rows != 1 then return -1 endi -#system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file +system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file diff --git a/tests/script/tsim/query/interval-offset.sim b/tests/script/tsim/query/interval-offset.sim index e222077fa4..6c736e9daa 100644 --- a/tests/script/tsim/query/interval-offset.sim +++ b/tests/script/tsim/query/interval-offset.sim @@ -318,4 +318,4 @@ endi #sql select count(*) from car where ts > '2019-05-14 00:00:00' interval(1y, 5d) -#system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file +system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file diff --git a/tests/script/tsim/query/interval.sim b/tests/script/tsim/query/interval.sim index 384008c887..9d7104c3de 100644 --- a/tests/script/tsim/query/interval.sim +++ b/tests/script/tsim/query/interval.sim @@ -180,4 +180,4 @@ print =============== clear # return -1 #endi -#system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file +system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file diff --git a/tests/script/tsim/query/scalarFunction.sim b/tests/script/tsim/query/scalarFunction.sim index 9e6d378bd0..be75e1a21c 100644 --- a/tests/script/tsim/query/scalarFunction.sim +++ b/tests/script/tsim/query/scalarFunction.sim @@ -481,4 +481,4 @@ if $loop_test == 0 then goto loop_test_pos endi -#system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/query/session.sim b/tests/script/tsim/query/session.sim index d4920ea255..5062c556a5 100644 --- a/tests/script/tsim/query/session.sim +++ b/tests/script/tsim/query/session.sim @@ -347,4 +347,4 @@ if $loop_test == 0 then goto loop_test_pos endi -#system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/show/basic.sim b/tests/script/tsim/show/basic.sim index 0c0670ff7f..ca6cd1c11a 100644 --- a/tests/script/tsim/show/basic.sim +++ b/tests/script/tsim/show/basic.sim @@ -212,4 +212,5 @@ if $rows != 3 then return -1 endi -#system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode2 -s stop -x SIGINT \ No newline at end of file diff --git a/tests/script/tsim/stable/dnode3.sim b/tests/script/tsim/stable/dnode3.sim index e388bd9b31..706c4aa499 100644 --- a/tests/script/tsim/stable/dnode3.sim +++ b/tests/script/tsim/stable/dnode3.sim @@ -211,3 +211,6 @@ if $rows != 2 then endi system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode2 -s stop -x SIGINT +system sh/exec.sh -n dnode3 -s stop -x SIGINT +system sh/exec.sh -n dnode4 -s stop -x SIGINT diff --git a/tests/script/tsim/tmq/basic.sim b/tests/script/tsim/tmq/basic.sim index 9f55847965..db2c7fd9a3 100644 --- a/tests/script/tsim/tmq/basic.sim +++ b/tests/script/tsim/tmq/basic.sim @@ -76,4 +76,4 @@ if $data00 != 10000 then return -1 endi -#system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/tmq/basic1.sim b/tests/script/tsim/tmq/basic1.sim index ec33e89e84..52e8279322 100644 --- a/tests/script/tsim/tmq/basic1.sim +++ b/tests/script/tsim/tmq/basic1.sim @@ -196,4 +196,4 @@ $dbNamme = d1 sql create database $dbNamme vgroups 4 -#system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/tmq/main2Con1Cgrp1TopicFrCtb.sim b/tests/script/tsim/tmq/main2Con1Cgrp1TopicFrCtb.sim index 68c2d5a891..16e37e0e12 100644 --- a/tests/script/tsim/tmq/main2Con1Cgrp1TopicFrCtb.sim +++ b/tests/script/tsim/tmq/main2Con1Cgrp1TopicFrCtb.sim @@ -43,6 +43,35 @@ loop_vgroups: print =============== create database $dbNamme vgroups $vgroups sql create database $dbNamme vgroups $vgroups sql show databases +print $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09 +print $data10 $data11 $data12 $data13 $data14 $data15 $data16 $data17 $data18 $data19 +print $data20 $data21 $data22 $data23 $data24 $data25 $data26 $data27 $data28 $data29 + +if $loop_cnt == 0 then + if $rows != 2 then + return -1 + endi + if $data02 != 2 then # vgroups + print vgroups: $data02 + return -1 + endi +else + if $rows != 3 then + return -1 + endi + if $data00 == d1 then + if $data02 != 4 then # vgroups + print vgroups: $data02 + return -1 + endi + else + if $data12 != 4 then # vgroups + print vgroups: $data12 + return -1 + endi + endi +endi + sql use $dbNamme print =============== create super table @@ -233,4 +262,4 @@ if $loop_cnt == 0 then goto loop_vgroups endi -#system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/tmq/main2Con1Cgrp1TopicFrStb.sim b/tests/script/tsim/tmq/main2Con1Cgrp1TopicFrStb.sim index 1b98bcdd5d..9b24e4870d 100644 --- a/tests/script/tsim/tmq/main2Con1Cgrp1TopicFrStb.sim +++ b/tests/script/tsim/tmq/main2Con1Cgrp1TopicFrStb.sim @@ -204,27 +204,27 @@ print expectMsgCntFromStb: $expectMsgCntFromStb #endi $expect_result = @{consume success: @ -$expect_result = $expect_result . $expectConsumeMsgCnt +$expect_result = $expect_result . $expectMsgCntFromStb $expect_result = $expect_result . @, @ $expect_result = $expect_result . 0} print expect_result----> $expect_result -print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_column" -k1 "group.id:tg2" -t "topic_stb_column" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb -system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_column" -k1 "group.id:tg2" -t "topic_stb_column" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb +print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_column" -k1 "group.id:tg2" -t "topic_stb_column" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb -j 0 +system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_column" -k1 "group.id:tg2" -t "topic_stb_column" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb -j 0 print cmd result----> $system_content if $system_content != success then return -1 endi -#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_all" -k1 "group.id:tg2" -t "topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb -#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_all" -k1 "group.id:tg2" -t "topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb +#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_all" -k1 "group.id:tg2" -t "topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb -j 0 +#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_all" -k1 "group.id:tg2" -t "topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb -j 0 #print cmd result----> $system_content ##if $system_content != @{consume success: 10000, 0}@ then #if $system_content != success then # return -1 #endi -print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_function" -k1 "group.id:tg2" -t "topic_stb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb -system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_function" -k1 "group.id:tg2" -t "topic_stb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb +print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_function" -k1 "group.id:tg2" -t "topic_stb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb -j 0 +system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_function" -k1 "group.id:tg2" -t "topic_stb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb -j 0 print cmd result----> $system_content #if $system_content != @{consume success: 10000, 0}@ then if $system_content != success then @@ -237,4 +237,4 @@ if $loop_cnt == 0 then goto loop_vgroups endi -#system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/tmq/main2Con1Cgrp2TopicFrCtb.sim b/tests/script/tsim/tmq/main2Con1Cgrp2TopicFrCtb.sim index 9f2b204b60..8c0b3934b1 100644 --- a/tests/script/tsim/tmq/main2Con1Cgrp2TopicFrCtb.sim +++ b/tests/script/tsim/tmq/main2Con1Cgrp2TopicFrCtb.sim @@ -232,4 +232,4 @@ if $loop_cnt == 0 then goto loop_vgroups endi -#system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/tmq/main2Con1Cgrp2TopicFrStb.sim b/tests/script/tsim/tmq/main2Con1Cgrp2TopicFrStb.sim index 45dd4fd187..853d842a44 100644 --- a/tests/script/tsim/tmq/main2Con1Cgrp2TopicFrStb.sim +++ b/tests/script/tsim/tmq/main2Con1Cgrp2TopicFrStb.sim @@ -237,4 +237,4 @@ if $loop_cnt == 0 then goto loop_vgroups endi -#system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/tmq/mainConsumerInMultiTopic.sim b/tests/script/tsim/tmq/mainConsumerInMultiTopic.sim index 2e2534d104..e9e24d06c6 100644 --- a/tests/script/tsim/tmq/mainConsumerInMultiTopic.sim +++ b/tests/script/tsim/tmq/mainConsumerInMultiTopic.sim @@ -210,4 +210,4 @@ if $loop_cnt == 0 then endi -#system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/tmq/mainConsumerInOneTopic.sim b/tests/script/tsim/tmq/mainConsumerInOneTopic.sim index d307723878..24f15ab46d 100644 --- a/tests/script/tsim/tmq/mainConsumerInOneTopic.sim +++ b/tests/script/tsim/tmq/mainConsumerInOneTopic.sim @@ -239,4 +239,4 @@ if $loop_cnt == 0 then goto loop_vgroups endi -#system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/tmq/multiTopic.sim b/tests/script/tsim/tmq/multiTopic.sim index 0ce6304799..ea5d7e3e65 100644 --- a/tests/script/tsim/tmq/multiTopic.sim +++ b/tests/script/tsim/tmq/multiTopic.sim @@ -221,4 +221,4 @@ if $loop_cnt == 0 then endi -#system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/tmq/oneTopic.sim b/tests/script/tsim/tmq/oneTopic.sim index e3f9d727b9..54b79bc490 100644 --- a/tests/script/tsim/tmq/oneTopic.sim +++ b/tests/script/tsim/tmq/oneTopic.sim @@ -261,4 +261,4 @@ if $loop_cnt == 0 then endi -#system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/tmq/overlapTopic2Con1Cgrp.sim b/tests/script/tsim/tmq/overlapTopic2Con1Cgrp.sim index 62ec3149be..943e139196 100644 --- a/tests/script/tsim/tmq/overlapTopic2Con1Cgrp.sim +++ b/tests/script/tsim/tmq/overlapTopic2Con1Cgrp.sim @@ -39,6 +39,7 @@ sql connect $loop_cnt = 0 $vgroups = 1 $dbNamme = d0 + loop_vgroups: print =============== create database $dbNamme vgroups $vgroups sql create database $dbNamme vgroups $vgroups @@ -48,15 +49,15 @@ print $data10 $data11 $data12 $data13 $data14 $data15 $data16 $data17 $data18 $d print $data20 $data21 $data22 $data23 $data24 $data25 $data26 $data27 $data28 $data29 if $loop_cnt == 0 then - if $rows != 2 then + if $rows != 3 then return -1 endi - if $data02 != 1 then # vgroups + if $data22 != 1 then # vgroups print vgroups: $data02 return -1 endi else - if $rows != 3 then + if $rows != 4 then return -1 endi if $data00 == d1 then @@ -153,7 +154,7 @@ endw # -g showMsgFlag, default is 0 # -$consumeDelay = 2 +$consumeDelay = 3 $expectMsgCntFromCtb = $rowNum $expectMsgCntFromStb = $rowNum * $tbNum @@ -179,8 +180,18 @@ $expect_result = $expect_result . $totalMsgCntOfmultiTopics $expect_result = $expect_result . @, @ $expect_result = $expect_result . 0} print expect_result----> $expect_result -print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_column, topic_stb_function" -k1 "group.id:tg2" -t "topic_stb_function, topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb -j 2 -system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_column, topic_stb_function" -k1 "group.id:tg2" -t "topic_stb_function, topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb -j 2 + +$check_mode = 0 +if $loop_cnt == 0 then + $check_mode = 0 +else + $check_mode = 2 +endi + +$expectMsgCntFromStb0 = 2001 +$expectMsgCntFromStb1 = 2001 +print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_column, topic_stb_function" -k1 "group.id:tg2" -t "topic_stb_function, topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb0 -m1 $expectMsgCntFromStb1 -j 2 +system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_column, topic_stb_function" -k1 "group.id:tg2" -t "topic_stb_function, topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb0 -m1 $expectMsgCntFromStb1 -j 2 print cmd result----> $system_content if $system_content != success then return -1 @@ -237,4 +248,4 @@ if $loop_cnt == 0 then endi -#system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode1 -s stop -x SIGINT