From 317f0c6a39d3dc89de344e62508de2b97ad3b063 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 3 Jan 2024 18:48:54 +0800 Subject: [PATCH] fix:[TD-28032]concat subtable and hash val as new subtable name to avoid data from multi time line write to one time line --- include/common/tdatablock.h | 1 + source/common/src/tdatablock.c | 4 +++ source/dnode/vnode/src/tq/tqSink.c | 34 ++++++------------------- source/libs/stream/src/streamDispatch.c | 27 ++++++++++++++------ 4 files changed, 32 insertions(+), 34 deletions(-) diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 68d7914366..07b9064cd9 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -262,6 +262,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf, c int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pDataBlocks, const STSchema* pTSchema, int64_t uid, int32_t vgId, tb_uid_t suid); +bool isAutoTableName(char* ctbName); void buildCtbNameAddGruopId(char* ctbName, uint64_t groupId); char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId); int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 27955d6d69..de9d47a931 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2125,6 +2125,10 @@ void buildCtbNameAddGruopId(char* ctbName, uint64_t groupId){ strcat(ctbName, tmp); } +bool isAutoTableName(char* ctbName){ + return (strlen(ctbName) == 34 && ctbName[0] == 't' && ctbName[1] == '_'); +} + char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) { char* pBuf = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1); if (!pBuf) { diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 4b7bca3c77..3536beaf49 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -175,7 +175,7 @@ SArray* createDefaultTagColName() { void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName, int64_t gid, bool newSubTableRule) { if (pDataBlock->info.parTbName[0]) { - if(newSubTableRule) { + if(newSubTableRule && !isAutoTableName(pDataBlock->info.parTbName)) { pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); strcpy(pCreateTableReq->name, pDataBlock->info.parTbName); buildCtbNameAddGruopId(pCreateTableReq->name, gid); @@ -627,23 +627,13 @@ int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkI int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName, SSubmitTbData* pTableData) { uint64_t groupId = pDataBlock->info.id.groupId; + char* dstTableName = pDataBlock->info.parTbName; int32_t numOfRows = pDataBlock->info.rows; const char* id = pTask->id.idStr; int64_t suid = pTask->outputInfo.tbSink.stbUid; STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema; int32_t vgId = TD_VID(pVnode); STableSinkInfo* pTableSinkInfo = NULL; - char* dstTableName = NULL; - int32_t code = 0; - if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER){ - dstTableName = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); - if(dstTableName == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - strcpy(dstTableName, pDataBlock->info.parTbName); - }else{ - dstTableName = pDataBlock->info.parTbName; - } bool alreadyCached = tqGetTableInfo(pTask->outputInfo.tbSink.pTblInfo, groupId, &pTableSinkInfo); @@ -666,7 +656,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat memset(dstTableName, 0, TSDB_TABLE_NAME_LEN); buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName); }else{ - if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) { + if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && !isAutoTableName(dstTableName)) { buildCtbNameAddGruopId(dstTableName, groupId); } } @@ -674,8 +664,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat int32_t nameLen = strlen(dstTableName); pTableSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen); if (pTableSinkInfo == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto END; + return TSDB_CODE_OUT_OF_MEMORY; } pTableSinkInfo->name.len = nameLen; @@ -688,8 +677,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat if (pTableData->uid == 0) { tqTrace("s-task:%s cached tableInfo uid is invalid, acquire it from meta", id); - code = doWaitForDstTableCreated(pVnode, pTask, pTableSinkInfo, dstTableName, &pTableData->uid); - goto END; + return doWaitForDstTableCreated(pVnode, pTask, pTableSinkInfo, dstTableName, &pTableData->uid); } else { tqTrace("s-task:%s set the dstTable uid from cache:%"PRId64, id, pTableData->uid); } @@ -718,8 +706,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat if (pTableData->pCreateTbReq == NULL) { tqError("s-task:%s failed to build auto create table req, code:%s", id, tstrerror(terrno)); taosMemoryFree(pTableSinkInfo); - code = terrno; - goto END; + return terrno; } pTableSinkInfo->uid = 0; @@ -731,8 +718,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat taosMemoryFree(pTableSinkInfo); tqError("s-task:%s vgId:%d table:%s already exists, but not child table, stream results is discarded", id, vgId, dstTableName); - code = terrno; - goto END; + return terrno; } else { pTableData->uid = mr.me.uid; pTableSinkInfo->uid = mr.me.uid; @@ -743,11 +729,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat } } -END: - if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER){ - taosMemoryFree(dstTableName); - } - return code; + return TDB_CODE_SUCCESS; } int32_t setDstTableDataPayload(uint64_t suid, const STSchema *pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock, diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index a9b83a148e..4e96768ad6 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -18,6 +18,11 @@ #include "trpc.h" #include "ttimer.h" +typedef struct SBlockName { + uint32_t hashValue; + char parTbName[TSDB_TABLE_NAME_LEN]; +} SBlockName; + typedef struct { int32_t upStreamTaskId; SEpSet upstreamNodeEpset; @@ -532,24 +537,30 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S void* pVal = tSimpleHashGet(pTask->pNameMap, &groupId, sizeof(int64_t)); if (pVal) { - hashValue = *(uint32_t*)pVal; + SBlockName* pBln = (SBlockName*)pVal; + hashValue = pBln->hashValue; + if (!pDataBlock->info.parTbName[0]) { + memset(pDataBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN); + memcpy(pDataBlock->info.parTbName, pBln->parTbName, strlen(pBln->parTbName)); + } } else { char ctbName[TSDB_TABLE_FNAME_LEN] = {0}; - char parTbName[TSDB_TABLE_NAME_LEN] = {0}; if (pDataBlock->info.parTbName[0]) { - if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER){ - strcpy(parTbName, pDataBlock->info.parTbName); - buildCtbNameAddGruopId(parTbName, groupId); + if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && !isAutoTableName(pDataBlock->info.parTbName)){ + buildCtbNameAddGruopId(pDataBlock->info.parTbName, groupId); } } else { - buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, parTbName); + buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName); } - snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db, parTbName); + snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName); /*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/ SUseDbRsp* pDbInfo = &pTask->outputInfo.shuffleDispatcher.dbInfo; hashValue = taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix); + SBlockName bln = {0}; + bln.hashValue = hashValue; + memcpy(bln.parTbName, pDataBlock->info.parTbName, strlen(pDataBlock->info.parTbName)); if (tSimpleHashGetSize(pTask->pNameMap) < MAX_BLOCK_NAME_NUM) { - tSimpleHashPut(pTask->pNameMap, &groupId, sizeof(int64_t), &hashValue, sizeof(uint32_t)); + tSimpleHashPut(pTask->pNameMap, &groupId, sizeof(int64_t), &bln, sizeof(SBlockName)); } }