diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 292e9a3181..fa6e4c3ae8 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -267,7 +267,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pData bool alreadyAddGroupId(char* ctbName); bool isAutoTableName(char* ctbName); -void buildCtbNameAddGruopId(char* ctbName, uint64_t groupId); +void buildCtbNameAddGroupId(char* ctbName, uint64_t groupId); char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId); int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 9a39812bc9..9f55b67ea3 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2118,28 +2118,28 @@ _end: return TSDB_CODE_SUCCESS; } -void buildCtbNameAddGruopId(char* ctbName, uint64_t groupId){ +void buildCtbNameAddGroupId(char* ctbName, uint64_t groupId){ char tmp[TSDB_TABLE_NAME_LEN] = {0}; snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%"PRIu64, groupId); ctbName[TSDB_TABLE_NAME_LEN - strlen(tmp) - 1] = 0; // put groupId to the end strcat(ctbName, tmp); } -bool isAutoTableName(char* ctbName){ - return (strlen(ctbName) == 34 && ctbName[0] == 't' && ctbName[1] == '_'); -} +// auto stream subtable name starts with 't_', followed by the first segment of MD5 digest for group vals. +// the total length is fixed to be 34 bytes. +bool isAutoTableName(char* ctbName) { return (strlen(ctbName) == 34 && ctbName[0] == 't' && ctbName[1] == '_'); } -bool alreadyAddGroupId(char* ctbName){ +bool alreadyAddGroupId(char* ctbName) { size_t len = strlen(ctbName); size_t _location = len - 1; - while(_location > 0){ - if(ctbName[_location] < '0' || ctbName[_location] > '9'){ + while (_location > 0) { + if (ctbName[_location] < '0' || ctbName[_location] > '9') { break; } _location--; } - return ctbName[_location] == '_' && len - 1 - _location > 15; //15 means the min length of groupid + return ctbName[_location] == '_' && len - 1 - _location >= 15; // 15 means the min length of groupid } char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) { diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 42bd1bc59c..9ff4e3ba62 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -72,7 +72,7 @@ int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* p name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); memcpy(name, varDataVal(varTbName), varDataLen(varTbName)); if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name) && groupId != 0) { - buildCtbNameAddGruopId(name, groupId); + buildCtbNameAddGroupId(name, groupId); } } else if (stbFullName) { name = buildCtbNameByGroupId(stbFullName, groupId); @@ -185,23 +185,26 @@ void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDa !alreadyAddGroupId(pDataBlock->info.parTbName) && gid != 0) { pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); strcpy(pCreateTableReq->name, pDataBlock->info.parTbName); - buildCtbNameAddGruopId(pCreateTableReq->name, gid); + buildCtbNameAddGroupId(pCreateTableReq->name, gid); +// tqDebug("gen name from:%s", pDataBlock->info.parTbName); } else { pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName); +// tqDebug("copy name:%s", pDataBlock->info.parTbName); } } else { pCreateTableReq->name = buildCtbNameByGroupId(stbFullName, gid); +// tqDebug("gen name from stbFullName:%s gid:%"PRId64, stbFullName, gid); } } static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask, int64_t suid) { - tqDebug("s-task:%s build create table msg", pTask->id.idStr); - STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema; int32_t rows = pDataBlock->info.rows; SArray* tagArray = taosArrayInit(4, sizeof(STagVal)); - ; + + tqDebug("s-task:%s build create %d table(s) msg", pTask->id.idStr, rows); + int32_t code = 0; SVCreateTbBatchReq reqs = {0}; @@ -670,7 +673,8 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat } else { if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1 && !isAutoTableName(dstTableName) && !alreadyAddGroupId(dstTableName) && groupId != 0) { - buildCtbNameAddGruopId(dstTableName, groupId); + tqDebug("s-task:%s append groupId:%" PRId64 " for generated dstTable:%s", id, groupId, dstTableName); + buildCtbNameAddGroupId(dstTableName, groupId); } } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 9383383dc0..60c545b9e5 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -575,7 +575,7 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S !isAutoTableName(pDataBlock->info.parTbName) && !alreadyAddGroupId(pDataBlock->info.parTbName) && groupId != 0){ - buildCtbNameAddGruopId(pDataBlock->info.parTbName, groupId); + buildCtbNameAddGroupId(pDataBlock->info.parTbName, groupId); } } else { buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName);