fix:[TD-28032]concat subtable and hash val as new subtable name to avoid data from multi time line write to one time line

This commit is contained in:
wangmm0220 2024-01-03 19:44:30 +08:00
parent 317f0c6a39
commit b200e6807f
4 changed files with 23 additions and 4 deletions

View File

@ -262,6 +262,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf, c
int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pDataBlocks, const STSchema* pTSchema, int64_t uid, int32_t vgId, int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pDataBlocks, const STSchema* pTSchema, int64_t uid, int32_t vgId,
tb_uid_t suid); tb_uid_t suid);
bool alreadyAddGroupId(char* ctbName);
bool isAutoTableName(char* ctbName); bool isAutoTableName(char* ctbName);
void buildCtbNameAddGruopId(char* ctbName, uint64_t groupId); void buildCtbNameAddGruopId(char* ctbName, uint64_t groupId);
char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId); char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId);

View File

@ -2120,7 +2120,7 @@ _end:
void buildCtbNameAddGruopId(char* ctbName, uint64_t groupId){ void buildCtbNameAddGruopId(char* ctbName, uint64_t groupId){
char tmp[TSDB_TABLE_NAME_LEN] = {0}; char tmp[TSDB_TABLE_NAME_LEN] = {0};
snprintf(tmp, TSDB_TABLE_NAME_LEN, "-%"PRIu64, groupId); snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%"PRIu64, groupId);
ctbName[TSDB_TABLE_NAME_LEN - strlen(tmp) - 1] = 0; // put groupId to the end ctbName[TSDB_TABLE_NAME_LEN - strlen(tmp) - 1] = 0; // put groupId to the end
strcat(ctbName, tmp); strcat(ctbName, tmp);
} }
@ -2129,6 +2129,18 @@ bool isAutoTableName(char* ctbName){
return (strlen(ctbName) == 34 && ctbName[0] == 't' && ctbName[1] == '_'); return (strlen(ctbName) == 34 && ctbName[0] == 't' && ctbName[1] == '_');
} }
bool alreadyAddGroupId(char* ctbName){
size_t len = strlen(ctbName);
size_t _location = len - 1;
for(; _location >= 0; _location--){
if(ctbName[_location] < '0' && ctbName[_location] > '9'){
break;
}
}
return ctbName[_location] == '_' && _location < len -1;
}
char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) { char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) {
char* pBuf = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1); char* pBuf = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1);
if (!pBuf) { if (!pBuf) {

View File

@ -175,7 +175,9 @@ SArray* createDefaultTagColName() {
void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName, void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName,
int64_t gid, bool newSubTableRule) { int64_t gid, bool newSubTableRule) {
if (pDataBlock->info.parTbName[0]) { if (pDataBlock->info.parTbName[0]) {
if(newSubTableRule && !isAutoTableName(pDataBlock->info.parTbName)) { if(newSubTableRule &&
!isAutoTableName(pDataBlock->info.parTbName) &&
!alreadyAddGroupId(pDataBlock->info.parTbName)) {
pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
strcpy(pCreateTableReq->name, pDataBlock->info.parTbName); strcpy(pCreateTableReq->name, pDataBlock->info.parTbName);
buildCtbNameAddGruopId(pCreateTableReq->name, gid); buildCtbNameAddGruopId(pCreateTableReq->name, gid);
@ -656,7 +658,9 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
memset(dstTableName, 0, TSDB_TABLE_NAME_LEN); memset(dstTableName, 0, TSDB_TABLE_NAME_LEN);
buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName); buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName);
}else{ }else{
if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && !isAutoTableName(dstTableName)) { if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER &&
!isAutoTableName(dstTableName) &&
!alreadyAddGroupId(dstTableName)) {
buildCtbNameAddGruopId(dstTableName, groupId); buildCtbNameAddGruopId(dstTableName, groupId);
} }
} }

View File

@ -546,7 +546,9 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
} else { } else {
char ctbName[TSDB_TABLE_FNAME_LEN] = {0}; char ctbName[TSDB_TABLE_FNAME_LEN] = {0};
if (pDataBlock->info.parTbName[0]) { if (pDataBlock->info.parTbName[0]) {
if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && !isAutoTableName(pDataBlock->info.parTbName)){ if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER &&
!isAutoTableName(pDataBlock->info.parTbName) &&
!alreadyAddGroupId(pDataBlock->info.parTbName)){
buildCtbNameAddGruopId(pDataBlock->info.parTbName, groupId); buildCtbNameAddGruopId(pDataBlock->info.parTbName, groupId);
} }
} else { } else {