diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index fa6e4c3ae8..cf6d4ba2b0 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -267,7 +267,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pData bool alreadyAddGroupId(char* ctbName); bool isAutoTableName(char* ctbName); -void buildCtbNameAddGroupId(char* ctbName, uint64_t groupId); +void buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId); char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId); int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 5f3761d7b7..e7c6491b9d 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -61,7 +61,7 @@ typedef struct SStreamTask SStreamTask; typedef struct SStreamQueue SStreamQueue; typedef struct SStreamTaskSM SStreamTaskSM; -#define SSTREAM_TASK_VER 3 +#define SSTREAM_TASK_VER 4 #define SSTREAM_TASK_INCOMPATIBLE_VER 1 #define SSTREAM_TASK_NEED_CONVERT_VER 2 #define SSTREAM_TASK_SUBTABLE_CHANGED_VER 3 diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index f4455be206..dc1c27b123 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2141,10 +2141,14 @@ _end: return TSDB_CODE_SUCCESS; } -void buildCtbNameAddGroupId(char* ctbName, uint64_t groupId){ +void buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId){ char tmp[TSDB_TABLE_NAME_LEN] = {0}; - snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%"PRIu64, groupId); - ctbName[TSDB_TABLE_NAME_LEN - strlen(tmp) - 1] = 0; // put groupId to the end + if (stbName == NULL){ + snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%"PRIu64, groupId); + }else{ + snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%s_%"PRIu64, stbName, groupId); + } + ctbName[TSDB_TABLE_NAME_LEN - strlen(tmp) - 1] = 0; // put stbname + groupId to the end strcat(ctbName, tmp); } @@ -2154,6 +2158,7 @@ bool isAutoTableName(char* ctbName) { return (strlen(ctbName) == 34 && ctbName[0 bool alreadyAddGroupId(char* ctbName) { size_t len = strlen(ctbName); + if (len == 0) return false; size_t _location = len - 1; while (_location > 0) { if (ctbName[_location] < '0' || ctbName[_location] > '9') { diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 3f69c7def3..091edc6ab0 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -72,7 +72,9 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { if (tEncodeI32(pEncoder, innerSz) < 0) return -1; for (int32_t j = 0; j < innerSz; j++) { SStreamTask *pTask = taosArrayGetP(pArray, j); - pTask->ver = SSTREAM_TASK_VER; + if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER){ + pTask->ver = SSTREAM_TASK_VER; + } if (tEncodeStreamTask(pEncoder, pTask) < 0) return -1; } } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 6067af199e..ff05db417e 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -434,7 +434,9 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) { SEncoder encoder; tEncoderInit(&encoder, NULL, 0); - pTask->ver = SSTREAM_TASK_VER; + if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER){ + pTask->ver = SSTREAM_TASK_VER; + } tEncodeStreamTask(&encoder, pTask); int32_t size = encoder.pos; diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 5374b9aa78..1e0ae7a854 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -71,8 +71,8 @@ int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* p if (varTbName != NULL && varTbName != (void*)-1) { name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); memcpy(name, varDataVal(varTbName), varDataLen(varTbName)); - if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name) && groupId != 0) { - buildCtbNameAddGroupId(name, groupId); + if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name) && groupId != 0 && stbFullName) { + buildCtbNameAddGroupId(stbFullName, name, groupId); } } else if (stbFullName) { name = buildCtbNameByGroupId(stbFullName, groupId); @@ -182,10 +182,10 @@ void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDa int64_t gid, bool newSubTableRule) { if (pDataBlock->info.parTbName[0]) { if (newSubTableRule && !isAutoTableName(pDataBlock->info.parTbName) && - !alreadyAddGroupId(pDataBlock->info.parTbName) && gid != 0) { + !alreadyAddGroupId(pDataBlock->info.parTbName) && gid != 0 && stbFullName) { pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); strcpy(pCreateTableReq->name, pDataBlock->info.parTbName); - buildCtbNameAddGroupId(pCreateTableReq->name, gid); + buildCtbNameAddGroupId(stbFullName, pCreateTableReq->name, gid); // tqDebug("gen name from:%s", pDataBlock->info.parTbName); } else { pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName); @@ -671,10 +671,14 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat memset(dstTableName, 0, TSDB_TABLE_NAME_LEN); buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName); } else { - if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1 && - !isAutoTableName(dstTableName) && !alreadyAddGroupId(dstTableName) && groupId != 0) { + if (pTask->subtableWithoutMd5 != 1 && !isAutoTableName(dstTableName) && + !alreadyAddGroupId(dstTableName) && groupId != 0) { tqDebug("s-task:%s append groupId:%" PRId64 " for generated dstTable:%s", id, groupId, dstTableName); - buildCtbNameAddGroupId(dstTableName, groupId); + if(pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER){ + buildCtbNameAddGroupId(NULL, dstTableName, groupId); + }else if(pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER && stbFullName) { + buildCtbNameAddGroupId(stbFullName, dstTableName, groupId); + } } } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 0af664f1e1..9542009d72 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -580,12 +580,15 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S } else { char ctbName[TSDB_TABLE_FNAME_LEN] = {0}; if (pDataBlock->info.parTbName[0]) { - if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && - pTask->subtableWithoutMd5 != 1 && + if(pTask->subtableWithoutMd5 != 1 && !isAutoTableName(pDataBlock->info.parTbName) && !alreadyAddGroupId(pDataBlock->info.parTbName) && groupId != 0){ - buildCtbNameAddGroupId(pDataBlock->info.parTbName, groupId); + if(pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER){ + buildCtbNameAddGroupId(NULL, pDataBlock->info.parTbName, groupId); + }else if(pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER) { + buildCtbNameAddGroupId(pTask->outputInfo.shuffleDispatcher.stbFullName, pDataBlock->info.parTbName, groupId); + } } } else { buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index c24763c024..aae3594905 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -542,7 +542,6 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { void* buf = NULL; int32_t len; int32_t code; - pTask->ver = SSTREAM_TASK_VER; tEncodeSize(tEncodeStreamTask, pTask, len, code); if (code < 0) { return -1; @@ -552,6 +551,9 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { return -1; } + if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER){ + pTask->ver = SSTREAM_TASK_VER; + } SEncoder encoder = {0}; tEncoderInit(&encoder, buf, len); tEncodeStreamTask(&encoder, pTask); diff --git a/tests/system-test/8-stream/stream_basic.py b/tests/system-test/8-stream/stream_basic.py index 3ebc255114..f08cca1e13 100644 --- a/tests/system-test/8-stream/stream_basic.py +++ b/tests/system-test/8-stream/stream_basic.py @@ -78,8 +78,30 @@ class TDTestCase: tdLog.info(cmd) os.system(cmd) + def case1(self): + + tdSql.execute(f'create database if not exists d1 vgroups 1') + tdSql.execute(f'use d1') + tdSql.execute(f'create table st(ts timestamp, i int) tags(t int)') + tdSql.execute(f'insert into t1 using st tags(1) values(now, 1) (now+1s, 2)') + tdSql.execute(f'insert into t2 using st tags(2) values(now, 1) (now+1s, 2)') + tdSql.execute(f'insert into t3 using st tags(3) values(now, 1) (now+1s, 2)') + + tdSql.execute("create stream stream1 fill_history 1 into sta subtable(concat('new-', tname)) AS SELECT " + "_wstart, count(*), avg(i) FROM st PARTITION BY tbname tname INTERVAL(1m)", show=True) + + tdSql.execute("create stream stream2 fill_history 1 into sta subtable(concat('new-', tname)) AS SELECT " + "_wstart, count(*), avg(i) FROM st PARTITION BY tbname tname INTERVAL(1m)", show=True) + + time.sleep(2) + tdSql.query("select * from sta") + tdSql.checkRows(3) + + tdSql.query("select * from stb") + tdSql.checkRows(3) # run def run(self): + self.case1() # gen data random.seed(int(time.time())) self.taosBenchmark(" -d db -t 2 -v 2 -n 1000000 -y")