fix:[TD-28032]concat subtable and hash val as new subtable name to avoid data from multi time line write to one time line

This commit is contained in:
wangmm0220 2024-01-03 18:48:54 +08:00
parent ab262550d1
commit 317f0c6a39
4 changed files with 32 additions and 34 deletions

View File

@ -262,6 +262,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf, c
int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pDataBlocks, const STSchema* pTSchema, int64_t uid, int32_t vgId, int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pDataBlocks, const STSchema* pTSchema, int64_t uid, int32_t vgId,
tb_uid_t suid); tb_uid_t suid);
bool isAutoTableName(char* ctbName);
void buildCtbNameAddGruopId(char* ctbName, uint64_t groupId); void buildCtbNameAddGruopId(char* ctbName, uint64_t groupId);
char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId); char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId);
int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf); int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf);

View File

@ -2125,6 +2125,10 @@ void buildCtbNameAddGruopId(char* ctbName, uint64_t groupId){
strcat(ctbName, tmp); strcat(ctbName, tmp);
} }
bool isAutoTableName(char* ctbName){
return (strlen(ctbName) == 34 && ctbName[0] == 't' && ctbName[1] == '_');
}
char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) { char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) {
char* pBuf = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1); char* pBuf = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1);
if (!pBuf) { if (!pBuf) {

View File

@ -175,7 +175,7 @@ SArray* createDefaultTagColName() {
void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName, void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName,
int64_t gid, bool newSubTableRule) { int64_t gid, bool newSubTableRule) {
if (pDataBlock->info.parTbName[0]) { if (pDataBlock->info.parTbName[0]) {
if(newSubTableRule) { if(newSubTableRule && !isAutoTableName(pDataBlock->info.parTbName)) {
pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
strcpy(pCreateTableReq->name, pDataBlock->info.parTbName); strcpy(pCreateTableReq->name, pDataBlock->info.parTbName);
buildCtbNameAddGruopId(pCreateTableReq->name, gid); buildCtbNameAddGruopId(pCreateTableReq->name, gid);
@ -627,23 +627,13 @@ int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkI
int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName, int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName,
SSubmitTbData* pTableData) { SSubmitTbData* pTableData) {
uint64_t groupId = pDataBlock->info.id.groupId; uint64_t groupId = pDataBlock->info.id.groupId;
char* dstTableName = pDataBlock->info.parTbName;
int32_t numOfRows = pDataBlock->info.rows; int32_t numOfRows = pDataBlock->info.rows;
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
int64_t suid = pTask->outputInfo.tbSink.stbUid; int64_t suid = pTask->outputInfo.tbSink.stbUid;
STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema; STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema;
int32_t vgId = TD_VID(pVnode); int32_t vgId = TD_VID(pVnode);
STableSinkInfo* pTableSinkInfo = NULL; STableSinkInfo* pTableSinkInfo = NULL;
char* dstTableName = NULL;
int32_t code = 0;
if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER){
dstTableName = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
if(dstTableName == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
strcpy(dstTableName, pDataBlock->info.parTbName);
}else{
dstTableName = pDataBlock->info.parTbName;
}
bool alreadyCached = tqGetTableInfo(pTask->outputInfo.tbSink.pTblInfo, groupId, &pTableSinkInfo); bool alreadyCached = tqGetTableInfo(pTask->outputInfo.tbSink.pTblInfo, groupId, &pTableSinkInfo);
@ -666,7 +656,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
memset(dstTableName, 0, TSDB_TABLE_NAME_LEN); memset(dstTableName, 0, TSDB_TABLE_NAME_LEN);
buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName); buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName);
}else{ }else{
if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) { if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && !isAutoTableName(dstTableName)) {
buildCtbNameAddGruopId(dstTableName, groupId); buildCtbNameAddGruopId(dstTableName, groupId);
} }
} }
@ -674,8 +664,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
int32_t nameLen = strlen(dstTableName); int32_t nameLen = strlen(dstTableName);
pTableSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen); pTableSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen);
if (pTableSinkInfo == NULL) { if (pTableSinkInfo == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
goto END;
} }
pTableSinkInfo->name.len = nameLen; pTableSinkInfo->name.len = nameLen;
@ -688,8 +677,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
if (pTableData->uid == 0) { if (pTableData->uid == 0) {
tqTrace("s-task:%s cached tableInfo uid is invalid, acquire it from meta", id); tqTrace("s-task:%s cached tableInfo uid is invalid, acquire it from meta", id);
code = doWaitForDstTableCreated(pVnode, pTask, pTableSinkInfo, dstTableName, &pTableData->uid); return doWaitForDstTableCreated(pVnode, pTask, pTableSinkInfo, dstTableName, &pTableData->uid);
goto END;
} else { } else {
tqTrace("s-task:%s set the dstTable uid from cache:%"PRId64, id, pTableData->uid); tqTrace("s-task:%s set the dstTable uid from cache:%"PRId64, id, pTableData->uid);
} }
@ -718,8 +706,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
if (pTableData->pCreateTbReq == NULL) { if (pTableData->pCreateTbReq == NULL) {
tqError("s-task:%s failed to build auto create table req, code:%s", id, tstrerror(terrno)); tqError("s-task:%s failed to build auto create table req, code:%s", id, tstrerror(terrno));
taosMemoryFree(pTableSinkInfo); taosMemoryFree(pTableSinkInfo);
code = terrno; return terrno;
goto END;
} }
pTableSinkInfo->uid = 0; pTableSinkInfo->uid = 0;
@ -731,8 +718,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
taosMemoryFree(pTableSinkInfo); taosMemoryFree(pTableSinkInfo);
tqError("s-task:%s vgId:%d table:%s already exists, but not child table, stream results is discarded", id, vgId, tqError("s-task:%s vgId:%d table:%s already exists, but not child table, stream results is discarded", id, vgId,
dstTableName); dstTableName);
code = terrno; return terrno;
goto END;
} else { } else {
pTableData->uid = mr.me.uid; pTableData->uid = mr.me.uid;
pTableSinkInfo->uid = mr.me.uid; pTableSinkInfo->uid = mr.me.uid;
@ -743,11 +729,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
} }
} }
END: return TDB_CODE_SUCCESS;
if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER){
taosMemoryFree(dstTableName);
}
return code;
} }
int32_t setDstTableDataPayload(uint64_t suid, const STSchema *pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock, int32_t setDstTableDataPayload(uint64_t suid, const STSchema *pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock,

View File

@ -18,6 +18,11 @@
#include "trpc.h" #include "trpc.h"
#include "ttimer.h" #include "ttimer.h"
typedef struct SBlockName {
uint32_t hashValue;
char parTbName[TSDB_TABLE_NAME_LEN];
} SBlockName;
typedef struct { typedef struct {
int32_t upStreamTaskId; int32_t upStreamTaskId;
SEpSet upstreamNodeEpset; SEpSet upstreamNodeEpset;
@ -532,24 +537,30 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
void* pVal = tSimpleHashGet(pTask->pNameMap, &groupId, sizeof(int64_t)); void* pVal = tSimpleHashGet(pTask->pNameMap, &groupId, sizeof(int64_t));
if (pVal) { if (pVal) {
hashValue = *(uint32_t*)pVal; SBlockName* pBln = (SBlockName*)pVal;
hashValue = pBln->hashValue;
if (!pDataBlock->info.parTbName[0]) {
memset(pDataBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
memcpy(pDataBlock->info.parTbName, pBln->parTbName, strlen(pBln->parTbName));
}
} else { } else {
char ctbName[TSDB_TABLE_FNAME_LEN] = {0}; char ctbName[TSDB_TABLE_FNAME_LEN] = {0};
char parTbName[TSDB_TABLE_NAME_LEN] = {0};
if (pDataBlock->info.parTbName[0]) { if (pDataBlock->info.parTbName[0]) {
if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER){ if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && !isAutoTableName(pDataBlock->info.parTbName)){
strcpy(parTbName, pDataBlock->info.parTbName); buildCtbNameAddGruopId(pDataBlock->info.parTbName, groupId);
buildCtbNameAddGruopId(parTbName, groupId);
} }
} else { } else {
buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, parTbName); buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName);
} }
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db, parTbName); snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName);
/*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/ /*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/
SUseDbRsp* pDbInfo = &pTask->outputInfo.shuffleDispatcher.dbInfo; SUseDbRsp* pDbInfo = &pTask->outputInfo.shuffleDispatcher.dbInfo;
hashValue = taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix); hashValue = taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix);
SBlockName bln = {0};
bln.hashValue = hashValue;
memcpy(bln.parTbName, pDataBlock->info.parTbName, strlen(pDataBlock->info.parTbName));
if (tSimpleHashGetSize(pTask->pNameMap) < MAX_BLOCK_NAME_NUM) { if (tSimpleHashGetSize(pTask->pNameMap) < MAX_BLOCK_NAME_NUM) {
tSimpleHashPut(pTask->pNameMap, &groupId, sizeof(int64_t), &hashValue, sizeof(uint32_t)); tSimpleHashPut(pTask->pNameMap, &groupId, sizeof(int64_t), &bln, sizeof(SBlockName));
} }
} }