fix:same subtable same partition by leads to same table name in stream
This commit is contained in:
parent
411ea39649
commit
8e2a2414c0
|
@ -267,7 +267,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pData
|
||||||
|
|
||||||
bool alreadyAddGroupId(char* ctbName);
|
bool alreadyAddGroupId(char* ctbName);
|
||||||
bool isAutoTableName(char* ctbName);
|
bool isAutoTableName(char* ctbName);
|
||||||
void buildCtbNameAddGroupId(char* ctbName, uint64_t groupId);
|
void buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId);
|
||||||
char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId);
|
char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId);
|
||||||
int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf);
|
int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf);
|
||||||
|
|
||||||
|
|
|
@ -61,7 +61,7 @@ typedef struct SStreamTask SStreamTask;
|
||||||
typedef struct SStreamQueue SStreamQueue;
|
typedef struct SStreamQueue SStreamQueue;
|
||||||
typedef struct SStreamTaskSM SStreamTaskSM;
|
typedef struct SStreamTaskSM SStreamTaskSM;
|
||||||
|
|
||||||
#define SSTREAM_TASK_VER 3
|
#define SSTREAM_TASK_VER 4
|
||||||
#define SSTREAM_TASK_INCOMPATIBLE_VER 1
|
#define SSTREAM_TASK_INCOMPATIBLE_VER 1
|
||||||
#define SSTREAM_TASK_NEED_CONVERT_VER 2
|
#define SSTREAM_TASK_NEED_CONVERT_VER 2
|
||||||
#define SSTREAM_TASK_SUBTABLE_CHANGED_VER 3
|
#define SSTREAM_TASK_SUBTABLE_CHANGED_VER 3
|
||||||
|
|
|
@ -2141,10 +2141,14 @@ _end:
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
void buildCtbNameAddGroupId(char* ctbName, uint64_t groupId){
|
void buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId){
|
||||||
char tmp[TSDB_TABLE_NAME_LEN] = {0};
|
char tmp[TSDB_TABLE_NAME_LEN] = {0};
|
||||||
snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%"PRIu64, groupId);
|
if (stbName == NULL){
|
||||||
ctbName[TSDB_TABLE_NAME_LEN - strlen(tmp) - 1] = 0; // put groupId to the end
|
snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%"PRIu64, groupId);
|
||||||
|
}else{
|
||||||
|
snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%s_%"PRIu64, stbName, groupId);
|
||||||
|
}
|
||||||
|
ctbName[TSDB_TABLE_NAME_LEN - strlen(tmp) - 1] = 0; // put stbname + groupId to the end
|
||||||
strcat(ctbName, tmp);
|
strcat(ctbName, tmp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2154,6 +2158,7 @@ bool isAutoTableName(char* ctbName) { return (strlen(ctbName) == 34 && ctbName[0
|
||||||
|
|
||||||
bool alreadyAddGroupId(char* ctbName) {
|
bool alreadyAddGroupId(char* ctbName) {
|
||||||
size_t len = strlen(ctbName);
|
size_t len = strlen(ctbName);
|
||||||
|
if (len == 0) return false;
|
||||||
size_t _location = len - 1;
|
size_t _location = len - 1;
|
||||||
while (_location > 0) {
|
while (_location > 0) {
|
||||||
if (ctbName[_location] < '0' || ctbName[_location] > '9') {
|
if (ctbName[_location] < '0' || ctbName[_location] > '9') {
|
||||||
|
|
|
@ -72,7 +72,9 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
|
||||||
if (tEncodeI32(pEncoder, innerSz) < 0) return -1;
|
if (tEncodeI32(pEncoder, innerSz) < 0) return -1;
|
||||||
for (int32_t j = 0; j < innerSz; j++) {
|
for (int32_t j = 0; j < innerSz; j++) {
|
||||||
SStreamTask *pTask = taosArrayGetP(pArray, j);
|
SStreamTask *pTask = taosArrayGetP(pArray, j);
|
||||||
pTask->ver = SSTREAM_TASK_VER;
|
if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER){
|
||||||
|
pTask->ver = SSTREAM_TASK_VER;
|
||||||
|
}
|
||||||
if (tEncodeStreamTask(pEncoder, pTask) < 0) return -1;
|
if (tEncodeStreamTask(pEncoder, pTask) < 0) return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -434,7 +434,9 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
|
||||||
SEncoder encoder;
|
SEncoder encoder;
|
||||||
tEncoderInit(&encoder, NULL, 0);
|
tEncoderInit(&encoder, NULL, 0);
|
||||||
|
|
||||||
pTask->ver = SSTREAM_TASK_VER;
|
if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER){
|
||||||
|
pTask->ver = SSTREAM_TASK_VER;
|
||||||
|
}
|
||||||
tEncodeStreamTask(&encoder, pTask);
|
tEncodeStreamTask(&encoder, pTask);
|
||||||
|
|
||||||
int32_t size = encoder.pos;
|
int32_t size = encoder.pos;
|
||||||
|
|
|
@ -71,8 +71,8 @@ int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* p
|
||||||
if (varTbName != NULL && varTbName != (void*)-1) {
|
if (varTbName != NULL && varTbName != (void*)-1) {
|
||||||
name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
|
name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
|
||||||
memcpy(name, varDataVal(varTbName), varDataLen(varTbName));
|
memcpy(name, varDataVal(varTbName), varDataLen(varTbName));
|
||||||
if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name) && groupId != 0) {
|
if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name) && groupId != 0 && stbFullName) {
|
||||||
buildCtbNameAddGroupId(name, groupId);
|
buildCtbNameAddGroupId(stbFullName, name, groupId);
|
||||||
}
|
}
|
||||||
} else if (stbFullName) {
|
} else if (stbFullName) {
|
||||||
name = buildCtbNameByGroupId(stbFullName, groupId);
|
name = buildCtbNameByGroupId(stbFullName, groupId);
|
||||||
|
@ -182,10 +182,10 @@ void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDa
|
||||||
int64_t gid, bool newSubTableRule) {
|
int64_t gid, bool newSubTableRule) {
|
||||||
if (pDataBlock->info.parTbName[0]) {
|
if (pDataBlock->info.parTbName[0]) {
|
||||||
if (newSubTableRule && !isAutoTableName(pDataBlock->info.parTbName) &&
|
if (newSubTableRule && !isAutoTableName(pDataBlock->info.parTbName) &&
|
||||||
!alreadyAddGroupId(pDataBlock->info.parTbName) && gid != 0) {
|
!alreadyAddGroupId(pDataBlock->info.parTbName) && gid != 0 && stbFullName) {
|
||||||
pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
|
pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
|
||||||
strcpy(pCreateTableReq->name, pDataBlock->info.parTbName);
|
strcpy(pCreateTableReq->name, pDataBlock->info.parTbName);
|
||||||
buildCtbNameAddGroupId(pCreateTableReq->name, gid);
|
buildCtbNameAddGroupId(stbFullName, pCreateTableReq->name, gid);
|
||||||
// tqDebug("gen name from:%s", pDataBlock->info.parTbName);
|
// tqDebug("gen name from:%s", pDataBlock->info.parTbName);
|
||||||
} else {
|
} else {
|
||||||
pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName);
|
pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName);
|
||||||
|
@ -671,10 +671,14 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
|
||||||
memset(dstTableName, 0, TSDB_TABLE_NAME_LEN);
|
memset(dstTableName, 0, TSDB_TABLE_NAME_LEN);
|
||||||
buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName);
|
buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName);
|
||||||
} else {
|
} else {
|
||||||
if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1 &&
|
if (pTask->subtableWithoutMd5 != 1 && !isAutoTableName(dstTableName) &&
|
||||||
!isAutoTableName(dstTableName) && !alreadyAddGroupId(dstTableName) && groupId != 0) {
|
!alreadyAddGroupId(dstTableName) && groupId != 0) {
|
||||||
tqDebug("s-task:%s append groupId:%" PRId64 " for generated dstTable:%s", id, groupId, dstTableName);
|
tqDebug("s-task:%s append groupId:%" PRId64 " for generated dstTable:%s", id, groupId, dstTableName);
|
||||||
buildCtbNameAddGroupId(dstTableName, groupId);
|
if(pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER){
|
||||||
|
buildCtbNameAddGroupId(NULL, dstTableName, groupId);
|
||||||
|
}else if(pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER && stbFullName) {
|
||||||
|
buildCtbNameAddGroupId(stbFullName, dstTableName, groupId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -580,12 +580,15 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
|
||||||
} else {
|
} else {
|
||||||
char ctbName[TSDB_TABLE_FNAME_LEN] = {0};
|
char ctbName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||||
if (pDataBlock->info.parTbName[0]) {
|
if (pDataBlock->info.parTbName[0]) {
|
||||||
if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER &&
|
if(pTask->subtableWithoutMd5 != 1 &&
|
||||||
pTask->subtableWithoutMd5 != 1 &&
|
|
||||||
!isAutoTableName(pDataBlock->info.parTbName) &&
|
!isAutoTableName(pDataBlock->info.parTbName) &&
|
||||||
!alreadyAddGroupId(pDataBlock->info.parTbName) &&
|
!alreadyAddGroupId(pDataBlock->info.parTbName) &&
|
||||||
groupId != 0){
|
groupId != 0){
|
||||||
buildCtbNameAddGroupId(pDataBlock->info.parTbName, groupId);
|
if(pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER){
|
||||||
|
buildCtbNameAddGroupId(NULL, pDataBlock->info.parTbName, groupId);
|
||||||
|
}else if(pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
|
||||||
|
buildCtbNameAddGroupId(pTask->outputInfo.shuffleDispatcher.stbFullName, pDataBlock->info.parTbName, groupId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName);
|
buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName);
|
||||||
|
|
|
@ -542,7 +542,6 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
|
||||||
void* buf = NULL;
|
void* buf = NULL;
|
||||||
int32_t len;
|
int32_t len;
|
||||||
int32_t code;
|
int32_t code;
|
||||||
pTask->ver = SSTREAM_TASK_VER;
|
|
||||||
tEncodeSize(tEncodeStreamTask, pTask, len, code);
|
tEncodeSize(tEncodeStreamTask, pTask, len, code);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -552,6 +551,9 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER){
|
||||||
|
pTask->ver = SSTREAM_TASK_VER;
|
||||||
|
}
|
||||||
SEncoder encoder = {0};
|
SEncoder encoder = {0};
|
||||||
tEncoderInit(&encoder, buf, len);
|
tEncoderInit(&encoder, buf, len);
|
||||||
tEncodeStreamTask(&encoder, pTask);
|
tEncodeStreamTask(&encoder, pTask);
|
||||||
|
|
|
@ -78,8 +78,30 @@ class TDTestCase:
|
||||||
tdLog.info(cmd)
|
tdLog.info(cmd)
|
||||||
os.system(cmd)
|
os.system(cmd)
|
||||||
|
|
||||||
|
def case1(self):
|
||||||
|
|
||||||
|
tdSql.execute(f'create database if not exists d1 vgroups 1')
|
||||||
|
tdSql.execute(f'use d1')
|
||||||
|
tdSql.execute(f'create table st(ts timestamp, i int) tags(t int)')
|
||||||
|
tdSql.execute(f'insert into t1 using st tags(1) values(now, 1) (now+1s, 2)')
|
||||||
|
tdSql.execute(f'insert into t2 using st tags(2) values(now, 1) (now+1s, 2)')
|
||||||
|
tdSql.execute(f'insert into t3 using st tags(3) values(now, 1) (now+1s, 2)')
|
||||||
|
|
||||||
|
tdSql.execute("create stream stream1 fill_history 1 into sta subtable(concat('new-', tname)) AS SELECT "
|
||||||
|
"_wstart, count(*), avg(i) FROM st PARTITION BY tbname tname INTERVAL(1m)", show=True)
|
||||||
|
|
||||||
|
tdSql.execute("create stream stream2 fill_history 1 into sta subtable(concat('new-', tname)) AS SELECT "
|
||||||
|
"_wstart, count(*), avg(i) FROM st PARTITION BY tbname tname INTERVAL(1m)", show=True)
|
||||||
|
|
||||||
|
time.sleep(2)
|
||||||
|
tdSql.query("select * from sta")
|
||||||
|
tdSql.checkRows(3)
|
||||||
|
|
||||||
|
tdSql.query("select * from stb")
|
||||||
|
tdSql.checkRows(3)
|
||||||
# run
|
# run
|
||||||
def run(self):
|
def run(self):
|
||||||
|
self.case1()
|
||||||
# gen data
|
# gen data
|
||||||
random.seed(int(time.time()))
|
random.seed(int(time.time()))
|
||||||
self.taosBenchmark(" -d db -t 2 -v 2 -n 1000000 -y")
|
self.taosBenchmark(" -d db -t 2 -v 2 -n 1000000 -y")
|
||||||
|
|
Loading…
Reference in New Issue