From 3ec1560faadbfa61643c9c917202f672a24186ee Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Thu, 12 Jan 2023 18:02:09 +0800 Subject: [PATCH] create stream and use existing super table --- include/common/tmsg.h | 7 +- include/libs/qcom/query.h | 6 + source/common/src/tmsg.c | 2 + source/dnode/mnode/impl/src/mndStream.c | 38 ++- source/dnode/vnode/src/tq/tqSink.c | 81 +++-- source/libs/parser/src/parTranslater.c | 8 +- tests/parallel_test/cases.task | 4 +- .../script/tsim/stream/checkStreamSTable.sim | 310 ++++++++++++++++++ .../{tableAndTag0.sim => udTableAndTag0.sim} | 99 ++++++ .../{tableAndTag1.sim => udTableAndTag1.sim} | 98 ++++++ 10 files changed, 611 insertions(+), 42 deletions(-) create mode 100644 tests/script/tsim/stream/checkStreamSTable.sim rename tests/script/tsim/stream/{tableAndTag0.sim => udTableAndTag0.sim} (73%) rename tests/script/tsim/stream/{tableAndTag1.sim => udTableAndTag1.sim} (74%) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 800f9e2eb7..cf57165e54 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -343,7 +343,8 @@ void tFreeSSubmitRsp(SSubmitRsp* pRsp); #define COL_IS_SET(FLG) (((FLG) & (COL_SET_VAL | COL_SET_NULL)) != 0) #define COL_CLR_SET(FLG) ((FLG) &= (~(COL_SET_VAL | COL_SET_NULL))) -#define IS_BSMA_ON(s) (((s)->flags & 0x01) == COL_SMA_ON) +#define IS_BSMA_ON(s) (((s)->flags & 0x01) == COL_SMA_ON) +#define IS_SET_NULL(s) (((s)->flags & COL_SET_NULL) == COL_SET_NULL) #define SSCHMEA_TYPE(s) ((s)->type) #define SSCHMEA_FLAGS(s) ((s)->flags) @@ -1771,7 +1772,9 @@ typedef struct { // 3.0.20 int64_t checkpointFreq; // ms // 3.0.2.3 - int8_t createStb; + int8_t createStb; + uint64_t targetStbUid; + SArray* fillNullCols; } SCMCreateStreamReq; typedef struct { diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index bbf332c4d4..57ddeb657c 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -207,6 +207,12 @@ typedef struct SQueryNodeStat { int32_t tableNum; // vg table number, unit is TSDB_TABLE_NUM_UNIT } SQueryNodeStat; +typedef struct SColLocation { + int16_t slotId; + col_id_t colId; + int8_t type; +} SColLocation; + int32_t initTaskQueue(); int32_t cleanupTaskQueue(); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 83f447fd0e..dde7d50c32 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -5425,6 +5425,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS if (tEncodeCStr(&encoder, pField->name) < 0) return -1; } if (tEncodeI8(&encoder, pReq->createStb) < 0) return -1; + if (tEncodeU64(&encoder, pReq->targetStbUid) < 0) return -1; tEndEncode(&encoder); @@ -5486,6 +5487,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea } } if (tDecodeI8(&decoder, &pReq->createStb) < 0) return -1; + if (tDecodeU64(&decoder, &pReq->targetStbUid) < 0) return -1; tEndDecode(&decoder); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 8a435c4887..6b54a36a6f 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -314,7 +314,11 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, } tstrncpy(pObj->targetDb, pTargetDb->name, TSDB_DB_FNAME_LEN); - pObj->targetStbUid = mndGenerateUid(pObj->targetSTbName, TSDB_TABLE_FNAME_LEN); + if (pCreate->createStb == STREAM_CREATE_STABLE_TRUE) { + pObj->targetStbUid = mndGenerateUid(pObj->targetSTbName, TSDB_TABLE_FNAME_LEN); + } else { + pObj->targetStbUid = pCreate->targetStbUid; + } pObj->targetDbUid = pTargetDb->uid; mndReleaseDb(pMnode, pTargetDb); @@ -334,6 +338,38 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, goto FAIL; } + int32_t numOfNULL = taosArrayGetSize(pCreate->fillNullCols); + if(numOfNULL > 0) { + pObj->outputSchema.nCols += numOfNULL; + SSchema* pFullSchema = taosMemoryCalloc(pObj->outputSchema.nCols, sizeof(SSchema)); + if (!pFullSchema) { + goto FAIL; + } + + int32_t nullIndex = 0; + int32_t dataIndex = 0; + for (int16_t i = 0; i < pObj->outputSchema.nCols; i++) { + SColLocation* pos = taosArrayGet(pCreate->fillNullCols, nullIndex); + if (i < pos->slotId) { + pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes; + pFullSchema[i].colId = i + 1; // pObj->outputSchema.pSchema[dataIndex].colId; + pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags; + strcpy(pFullSchema[i].name, pObj->outputSchema.pSchema[dataIndex].name); + pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type; + dataIndex++; + } else { + pFullSchema[i].bytes = 0; + pFullSchema[i].colId = pos->colId; + pFullSchema[i].flags = COL_SET_NULL; + memset(pFullSchema[i].name, 0, TSDB_COL_NAME_LEN); + pFullSchema[i].type = pos->type; + nullIndex++; + } + } + taosMemoryFree(pObj->outputSchema.pSchema); + pObj->outputSchema.pSchema = pFullSchema; + } + SPlanContext cxt = { .pAstRoot = pAst, .topicQuery = false, diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index cc60283c58..f1103ad48a 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -323,19 +323,10 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d taosArrayDestroy(tagArray); } -static int32_t encodeCreateChildTableForRPC(SVCreateTbReq* req, int32_t vgId, void** pBuf, int32_t* contLen) { +static int32_t encodeCreateChildTableForRPC(SVCreateTbBatchReq* pReqs, int32_t vgId, void** pBuf, int32_t* contLen) { int32_t ret = 0; - SVCreateTbBatchReq reqs = {0}; - reqs.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq)); - if (NULL == reqs.pArray) { - ret = -1; - goto end; - } - taosArrayPush(reqs.pArray, req); - reqs.nReqs = 1; - - tEncodeSize(tEncodeSVCreateTbBatchReq, &reqs, *contLen, ret); + tEncodeSize(tEncodeSVCreateTbBatchReq, pReqs, *contLen, ret); if (ret < 0) { ret = -1; goto end; @@ -350,7 +341,7 @@ static int32_t encodeCreateChildTableForRPC(SVCreateTbReq* req, int32_t vgId, vo ((SMsgHead*)(*pBuf))->contLen = htonl(*contLen); SEncoder coder = {0}; tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), (*contLen) - sizeof(SMsgHead) ); - if (tEncodeSVCreateTbBatchReq(&coder, &reqs) < 0) { + if (tEncodeSVCreateTbBatchReq(&coder, pReqs) < 0) { rpcFreeCont(*pBuf); *pBuf = NULL; *contLen = 0; @@ -361,14 +352,13 @@ static int32_t encodeCreateChildTableForRPC(SVCreateTbReq* req, int32_t vgId, vo tEncoderClear(&coder); end: - taosArrayDestroy(reqs.pArray); return ret; } -int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbReq* pReq) { +int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) { void* buf = NULL; int32_t tlen = 0; - encodeCreateChildTableForRPC(pReq, TD_VID(pVnode), &buf, &tlen); + encodeCreateChildTableForRPC(pReqs, TD_VID(pVnode), &buf, &tlen); SRpcMsg msg = { .msgType = TDMT_VND_CREATE_TABLE, @@ -387,6 +377,7 @@ _error: tqError("failed to encode submit req since %s", terrstr()); return TSDB_CODE_OUT_OF_MEMORY; } + void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { const SArray* pBlocks = (const SArray*)data; SVnode* pVnode = (SVnode*)vnode; @@ -402,6 +393,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* void* pBuf = NULL; SArray* tagArray = NULL; SArray* pVals = NULL; + SArray* crTblArray = NULL; for (int32_t i = 0; i < blockSz; i++) { SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); @@ -442,8 +434,14 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* tqDebug("failed to put delete req into write-queue since %s", terrstr()); } } else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) { + SVCreateTbBatchReq reqs = {0}; + crTblArray = reqs.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq)); + if (NULL == reqs.pArray) { + goto _end; + } for (int32_t rowId = 0; rowId < rows; rowId++) { - SVCreateTbReq* pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)); + SVCreateTbReq createTbReq = {0}; + SVCreateTbReq* pCreateTbReq = &createTbReq; if (!pCreateTbReq) { goto _end; } @@ -511,6 +509,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* terrno = TSDB_CODE_OUT_OF_MEMORY; goto _end; } + pCreateTbReq->ctb.pTag = (uint8_t*)pTag; // set table name @@ -524,13 +523,15 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* pCreateTbReq->name = taosMemoryCalloc(1, varDataLen(pTbData) + 1); memcpy(pCreateTbReq->name, varDataVal(pTbData), varDataLen(pTbData)); } - - if (tqPutReqToQueue(pVnode, pCreateTbReq) != TSDB_CODE_SUCCESS) { - goto _end; - } - tdDestroySVCreateTbReq(pCreateTbReq); - taosMemoryFreeClear(pCreateTbReq); + taosArrayPush(reqs.pArray, pCreateTbReq); } + reqs.nReqs = taosArrayGetSize(reqs.pArray); + if (tqPutReqToQueue(pVnode, &reqs) != TSDB_CODE_SUCCESS) { + goto _end; + } + tagArray = taosArrayDestroy(tagArray); + taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq); + crTblArray = NULL; } else { SSubmitTbData tbData = {0}; tqDebug("tq sink pipe2, convert block1 %d, rows: %d", i, rows); @@ -579,7 +580,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* goto _end; } STagVal tagVal = { - .cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1, + .cid = pTSchema->numOfCols + 1, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = (int64_t)pDataBlock->info.id.groupId, }; @@ -638,28 +639,37 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* for (int32_t j = 0; j < rows; j++) { taosArrayClear(pVals); + int32_t dataIndex = 0; for (int32_t k = 0; k < pTSchema->numOfCols; k++) { const STColumn* pCol = &pTSchema->columns[k]; - SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k); if (k == 0) { + SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex); void* colData = colDataGetData(pColData, j); tqDebug("tq sink pipe2, row %d, col %d ts %" PRId64, j, k, *(int64_t*)colData); } - if (colDataIsNull_s(pColData, j)) { + if (IS_SET_NULL(pCol)) { SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); taosArrayPush(pVals, &cv); - } else { - void* colData = colDataGetData(pColData, j); - if (IS_STR_DATA_TYPE(pCol->type)) { - SValue sv = - (SValue){.nData = varDataLen(colData), .pData = varDataVal(colData)}; // address copy, no value - SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv); + } else{ + SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex); + if (colDataIsNull_s(pColData, j)) { + SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); taosArrayPush(pVals, &cv); + dataIndex++; } else { - SValue sv; - memcpy(&sv.val, colData, tDataTypes[pCol->type].bytes); - SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv); - taosArrayPush(pVals, &cv); + void* colData = colDataGetData(pColData, j); + if (IS_STR_DATA_TYPE(pCol->type)) { + SValue sv = + (SValue){.nData = varDataLen(colData), .pData = varDataVal(colData)}; // address copy, no value + SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv); + taosArrayPush(pVals, &cv); + } else { + SValue sv; + memcpy(&sv.val, colData, tDataTypes[pCol->type].bytes); + SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv); + taosArrayPush(pVals, &cv); + } + dataIndex++; } } } @@ -716,5 +726,6 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* _end: taosArrayDestroy(tagArray); taosArrayDestroy(pVals); + taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq); // TODO: change } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 8d1660be72..df04d92599 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -5740,7 +5740,7 @@ static int32_t adjustDataTypeOfProjections(STranslateContext* pCxt, const STable int32_t index = 0; SNode* pProj = NULL; FOREACH(pProj, pProjections) { - SSchema* pSchema = pSchemas + index; + SSchema* pSchema = pSchemas + index++; SDataType dt = {.type = pSchema->type, .bytes = pSchema->bytes}; if (!dataTypeEqual(&dt, &((SExprNode*)pProj)->resType)) { SNode* pFunc = NULL; @@ -5761,7 +5761,7 @@ typedef struct SProjColPos { } SProjColPos; static int32_t projColPosCompar(const void* l, const void* r) { - return ((SProjColPos*)l)->colId < ((SProjColPos*)r)->colId; + return ((SProjColPos*)l)->colId > ((SProjColPos*)r)->colId; } static void projColPosDelete(void* p) { taosMemoryFree(((SProjColPos*)p)->pProj); } @@ -5856,7 +5856,11 @@ static int32_t adjustStreamQueryForExistTable(STranslateContext* pCxt, SCreateSt return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, pStmt->targetTabName); } pReq->createStb = STREAM_CREATE_STABLE_TRUE; + pReq->targetStbUid = 0; return TSDB_CODE_SUCCESS; + } else { + pReq->createStb = STREAM_CREATE_STABLE_FALSE; + pReq->targetStbUid = pMeta->suid; } if (TSDB_CODE_SUCCESS == code) { code = adjustStreamQueryForExistTableImpl(pCxt, pStmt, pMeta); diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 66a71bfae1..0946655de6 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -247,8 +247,8 @@ ,,y,script,./test.sh -f tsim/stream/fillIntervalPartitionBy.sim ,,y,script,./test.sh -f tsim/stream/fillIntervalPrevNext.sim ,,y,script,./test.sh -f tsim/stream/fillIntervalValue.sim -,,y,script,./test.sh -f tsim/stream/tableAndTag0.sim -,,y,script,./test.sh -f tsim/stream/tableAndTag1.sim +,,y,script,./test.sh -f tsim/stream/udTableAndTag0.sim +,,y,script,./test.sh -f tsim/stream/udTableAndTag1.sim ,,y,script,./test.sh -f tsim/trans/lossdata1.sim ,,y,script,./test.sh -f tsim/trans/create_db.sim ,,y,script,./test.sh -f tsim/tmq/basic1.sim diff --git a/tests/script/tsim/stream/checkStreamSTable.sim b/tests/script/tsim/stream/checkStreamSTable.sim new file mode 100644 index 0000000000..2ed6958196 --- /dev/null +++ b/tests/script/tsim/stream/checkStreamSTable.sim @@ -0,0 +1,310 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 + +print ===== step1 + +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print ===== step2 + +sql create database result vgroups 1; + +sql create database test vgroups 4; +sql use test; + + +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql create stable result.streamt0(ts timestamp,a int,b int) tags(ta int,tb int,tc int); + +sql create stream streams0 trigger at_once into result.streamt0 as select _wstart, count(*) c1, max(a) c2 from st partition by tbname interval(10s); +sql insert into t1 values(1648791213000,1,2,3); +sql insert into t2 values(1648791213000,2,2,3); + +$loop_count = 0 + +sql select _wstart, count(*) c1, max(a) c2 from st partition by tbname interval(10s); +print $data00, $data01, $data02 +print $data10, $data11, $data12 +print $data20, $data21, $data22 + +loop0: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from result.streamt0 order by ta; + +if $rows != 2 then + print =====rows=$rows + print $data00, $data01, $data02 + print $data10, $data11, $data12 + print $data20, $data21, $data22 + goto loop0 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop0 +endi + +if $data02 != 1 then + print =====data02=$data02 + goto loop0 +endi + +if $data11 != 1 then + print =====data11=$data11 + goto loop0 +endi + +if $data12 != 2 then + print =====data12=$data12 + goto loop0 +endi + +print ===== step3 + +sql create database result1 vgroups 1; + +sql create database test1 vgroups 4; +sql use test1; + + +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql create stable result1.streamt1(ts timestamp,a int,b int,c int) tags(ta bigint unsigned,tb int,tc int); + +sql create stream streams1 trigger at_once into result1.streamt1(ts,c,a,b) as select _wstart, count(*) c1, max(a),min(b) c2 from st partition by tbname interval(10s); +sql insert into t1 values(1648791213000,10,20,30); +sql insert into t2 values(1648791213000,40,50,60); + +$loop_count = 0 + +sql select _wstart, count(*) c1, max(a),min(b) c2 from st partition by tbname interval(10s); +print $data00, $data01, $data02, $data03 +print $data10, $data11, $data12, $data13 +print $data20, $data21, $data22, $data23 + +loop1: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from result1.streamt1 order by ta; + +if $rows != 2 then + print =====rows=$rows + print $data00, $data01, $data02, $data03 + print $data10, $data11, $data12, $data13 + print $data20, $data21, $data22, $data23 + goto loop1 +endi + +if $data01 != 10 then + print =====data01=$data01 + goto loop1 +endi + +if $data02 != 20 then + print =====data02=$data02 + goto loop1 +endi + +if $data03 != 1 then + print =====data03=$data03 + goto loop1 +endi + +if $data11 != 40 then + print =====data11=$data11 + goto loop1 +endi + +if $data12 != 50 then + print =====data12=$data12 + goto loop1 +endi + +if $data13 != 1 then + print =====data13=$data13 + goto loop1 +endi + + +print ===== step4 + +sql create database result2 vgroups 1; + +sql create database test2 vgroups 4; +sql use test2; + + +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql create stable result2.streamt2(ts timestamp, a int , b int) tags(ta varchar(20)); + +# tag dest 1, source 2 +##sql_error create stream streams2 trigger at_once into result2.streamt2 TAGS(aa varchar(100), ta int) as select _wstart, count(*) c1, max(a) from st partition by tbname as aa, ta interval(10s); + +# column dest 3, source 4 +sql_error create stream streams2 trigger at_once into result2.streamt2 as select _wstart, count(*) c1, max(a), max(b) from st partition by tbname interval(10s); + +# column dest 3, source 4 +sql_error create stream streams2 trigger at_once into result2.streamt2(ts, a, b) as select _wstart, count(*) c1, max(a), max(b) from st partition by tbname interval(10s); + +# column dest 3, source 2 +sql_error create stream streams2 trigger at_once into result2.streamt2 as select _wstart, count(*) c1 from st partition by tbname interval(10s); + +# column dest 3, source 2 +sql create stream streams2 trigger at_once into result2.streamt2(ts, a) as select _wstart, count(*) c1 from st partition by tbname interval(10s); + + +print ===== step5 + +sql create database result3 vgroups 1; + +sql create database test3 vgroups 4; +sql use test3; + + +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,2,3); +sql create table t2 using st tags(4,5,6); + +sql create stable result3.streamt3(ts timestamp,a int,b int,c int, d int) tags(ta int,tb int,tc int); + +sql create stream streams3 trigger at_once into result3.streamt3(ts,c,a,b) as select _wstart, count(*) c1, max(a),min(b) c2 from st interval(10s); + +sql insert into t1 values(1648791213000,10,20,30); +sql insert into t2 values(1648791213000,40,50,60); + +$loop_count = 0 + +sql select _wstart, count(*) c1, max(a),min(b) c2 from st interval(10s); +print $data00, $data01, $data02, $data03, $data04 +print $data10, $data11, $data12, $data13, $data14 +print $data20, $data21, $data22, $data23, $data24 + +loop2: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from result3.streamt3; + +if $rows != 1 then + print =====rows=$rows + print $data00, $data01, $data02, $data03 + print $data10, $data11, $data12, $data13 + print $data20, $data21, $data22, $data23 + goto loop2 +endi + +if $data01 != 40 then + print =====data01=$data01 + goto loop2 +endi + +if $data02 != 20 then + print =====data02=$data02 + goto loop2 +endi + +if $data03 != 2 then + print =====data03=$data03 + goto loop2 +endi + +if $data04 != NULL then + print =====data04=$data04 + goto loop2 +endi + +print ===== step6 + +sql create database result4 vgroups 1; + +sql create database test4 vgroups 4; +sql use test4; + +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,2,3); +sql create table t2 using st tags(4,5,6); + +sql create stable result4.streamt4(ts timestamp,a int,b int,c int, d int) tags(ta int,tb int,tc int); + +sql create stream streams4 trigger at_once into result4.streamt4(ts,c,a,b) tags(tg2 int, tg3 varchar(100), tg1 bigint) subtable(concat("tbl-", tg1)) as select _wstart, count(*) c1, max(a),min(b) c2 from st partition by ta+1 as tg1, cast(tb as bigint) as tg2, tc as tg3 interval(10s); + +sql insert into t1 values(1648791213000,10,20,30); +sql insert into t2 values(1648791213000,40,50,60); + +$loop_count = 0 + +sql select _wstart, count(*) c1, max(a),min(b) c2 from st interval(10s); +print $data00, $data01, $data02, $data03 +print $data10, $data11, $data12, $data13 +print $data20, $data21, $data22, $data23 + +loop2: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from result4.streamt4; + +if $rows != 2 then + print =====rows=$rows + print $data00, $data01, $data02, $data03 + print $data10, $data11, $data12, $data13 + print $data20, $data21, $data22, $data23 + goto loop2 +endi + +if $data01 != 40 then + print =====data01=$data01 + goto loop2 +endi + +if $data02 != 20 then + print =====data02=$data02 + goto loop2 +endi + +if $data03 != 2 then + print =====data03=$data03 + goto loop2 +endi + +if $data04 != NULL then + print =====data04=$data04 + goto loop2 +endi + +print ======over + +system sh/stop_dnodes.sh diff --git a/tests/script/tsim/stream/tableAndTag0.sim b/tests/script/tsim/stream/udTableAndTag0.sim similarity index 73% rename from tests/script/tsim/stream/tableAndTag0.sim rename to tests/script/tsim/stream/udTableAndTag0.sim index 5e02171bee..86feca1918 100644 --- a/tests/script/tsim/stream/tableAndTag0.sim +++ b/tests/script/tsim/stream/udTableAndTag0.sim @@ -268,6 +268,105 @@ if $data10 != tbn-t2 then endi +print ===== step5 +print ===== tag name + table name + +sql create database result4 vgroups 1; + +sql create database test4 vgroups 4; +sql use test4; + + +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create table t3 using st tags(3,3,3); + +sql create stream streams4 trigger at_once into result4.streamt4 TAGS(dd varchar(100)) SUBTABLE(concat("tbn-", tbname)) as select _wstart, count(*) c1 from st partition by concat("tag-", tbname) as dd, tbname interval(10s); +sql insert into t1 values(1648791213000,1,1,1) t2 values(1648791213000,2,2,2) t3 values(1648791213000,3,3,3); + + +$loop_count = 0 +loop7: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select table_name from information_schema.ins_tables where db_name="result4" order by 1; + +if $rows != 3 then + print =====rows=$rows + print $data00 $data10 + goto loop7 +endi + +if $data00 != tbn-t1 then + print =====data00=$data00 + goto loop7 +endi + +if $data10 != tbn-t2 then + print =====data10=$data10 + goto loop7 +endi + +if $data20 != tbn-t3 then + print =====data20=$data20 + goto loop7 +endi + +$loop_count = 0 +loop8: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from result4.streamt4 order by 3; + +if $rows != 3 then + print =====rows=$rows + print $data00 $data10 + goto loop8 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop8 +endi + +if $data02 != tag-t1 then + print =====data02=$data02 + goto loop8 +endi + +if $data11 != 1 then + print =====data11=$data11 + goto loop8 +endi + +if $data12 != tag-t2 then + print =====data12=$data12 + goto loop8 +endi + +if $data21 != 1 then + print =====data21=$data21 + goto loop8 +endi + +if $data22 != tag-t3 then + print =====data22=$data22 + goto loop8 +endi + print ======over system sh/stop_dnodes.sh diff --git a/tests/script/tsim/stream/tableAndTag1.sim b/tests/script/tsim/stream/udTableAndTag1.sim similarity index 74% rename from tests/script/tsim/stream/tableAndTag1.sim rename to tests/script/tsim/stream/udTableAndTag1.sim index 74f67c1fb3..a0393a03cd 100644 --- a/tests/script/tsim/stream/tableAndTag1.sim +++ b/tests/script/tsim/stream/udTableAndTag1.sim @@ -269,6 +269,104 @@ if $data10 != tbn-2 then goto loop6 endi +print ===== step5 +print ===== tag name + table name + +sql create database result4 vgroups 1; + +sql create database test4 vgroups 4; +sql use test4; + + +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create table t3 using st tags(3,3,3); + +sql create stream streams4 trigger at_once into result4.streamt4 TAGS(dd varchar(100)) SUBTABLE(concat("tbn-", dd)) as select _wstart, count(*) c1 from st partition by concat("t", cast(a as varchar(10) ) ) as dd interval(10s); +sql insert into t1 values(1648791213000,1,1,1) t2 values(1648791213000,2,2,2) t3 values(1648791213000,3,3,3); + + +$loop_count = 0 +loop7: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select table_name from information_schema.ins_tables where db_name="result4" order by 1; + +if $rows != 3 then + print =====rows=$rows + print $data00 $data10 + goto loop7 +endi + +if $data00 != tbn-t1 then + print =====data00=$data00 + goto loop7 +endi + +if $data10 != tbn-t2 then + print =====data10=$data10 + goto loop7 +endi + +if $data20 != tbn-t3 then + print =====data20=$data20 + goto loop7 +endi + +$loop_count = 0 +loop8: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from result4.streamt4 order by 3; + +if $rows != 3 then + print =====rows=$rows + print $data00 $data10 + goto loop8 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop8 +endi + +if $data02 != t1 then + print =====data02=$data02 + goto loop8 +endi + +if $data11 != 1 then + print =====data11=$data11 + goto loop8 +endi + +if $data12 != t2 then + print =====data12=$data12 + goto loop8 +endi + +if $data21 != 1 then + print =====data21=$data21 + goto loop8 +endi + +if $data22 != t3 then + print =====data22=$data22 + goto loop8 +endi print ======over