From df2488431e40e6edd887b075a8e4bb51d8a496a4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 24 Feb 2024 00:40:26 +0800 Subject: [PATCH 1/5] fix(stream): add logs. --- source/common/src/tdatablock.c | 16 ++++++++-------- source/dnode/vnode/src/tq/tqSink.c | 13 +++++++------ source/libs/stream/src/streamDispatch.c | 2 +- 3 files changed, 16 insertions(+), 15 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 9439c172c4..35c886dd90 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2118,28 +2118,28 @@ _end: return TSDB_CODE_SUCCESS; } -void buildCtbNameAddGruopId(char* ctbName, uint64_t groupId){ +void buildCtbNameAddGroupId(char* ctbName, uint64_t groupId){ char tmp[TSDB_TABLE_NAME_LEN] = {0}; snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%"PRIu64, groupId); ctbName[TSDB_TABLE_NAME_LEN - strlen(tmp) - 1] = 0; // put groupId to the end strcat(ctbName, tmp); } -bool isAutoTableName(char* ctbName){ - return (strlen(ctbName) == 34 && ctbName[0] == 't' && ctbName[1] == '_'); -} +// auto stream subtable name starts with 't_', followed by the first segment of MD5 digest for group vals. +// the total length is fixed to be 34 bytes. +bool isAutoTableName(char* ctbName) { return (strlen(ctbName) == 34 && ctbName[0] == 't' && ctbName[1] == '_'); } -bool alreadyAddGroupId(char* ctbName){ +bool alreadyAddGroupId(char* ctbName) { size_t len = strlen(ctbName); size_t _location = len - 1; - while(_location > 0){ - if(ctbName[_location] < '0' || ctbName[_location] > '9'){ + while (_location > 0) { + if (ctbName[_location] < '0' || ctbName[_location] > '9') { break; } _location--; } - return ctbName[_location] == '_' && len - 1 - _location > 15; //15 means the min length of groupid + return ctbName[_location] == '_' && len - 1 - _location > 15; // 15 means the min length of groupid } char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) { diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index b56bf3e0fe..70519a5cd2 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -72,7 +72,7 @@ int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* p name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); memcpy(name, varDataVal(varTbName), varDataLen(varTbName)); if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name) && groupId != 0) { - buildCtbNameAddGruopId(name, groupId); + buildCtbNameAddGroupId(name, groupId); } } else if (stbFullName) { name = buildCtbNameByGroupId(stbFullName, groupId); @@ -185,7 +185,7 @@ void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDa !alreadyAddGroupId(pDataBlock->info.parTbName) && gid != 0) { pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); strcpy(pCreateTableReq->name, pDataBlock->info.parTbName); - buildCtbNameAddGruopId(pCreateTableReq->name, gid); + buildCtbNameAddGroupId(pCreateTableReq->name, gid); } else { pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName); } @@ -196,12 +196,12 @@ void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDa static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask, int64_t suid) { - tqDebug("s-task:%s build create table msg", pTask->id.idStr); - STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema; int32_t rows = pDataBlock->info.rows; SArray* tagArray = taosArrayInit(4, sizeof(STagVal)); - ; + + tqDebug("s-task:%s build create %d table(s) msg", pTask->id.idStr, rows); + int32_t code = 0; SVCreateTbBatchReq reqs = {0}; @@ -670,7 +670,8 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat } else { if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1 && !isAutoTableName(dstTableName) && !alreadyAddGroupId(dstTableName) && groupId != 0) { - buildCtbNameAddGruopId(dstTableName, groupId); + tqDebug("s-task:%s append groupId:%" PRId64 " for generated dstTable:%s", id, groupId, dstTableName); + buildCtbNameAddGroupId(dstTableName, groupId); } } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 9383383dc0..60c545b9e5 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -575,7 +575,7 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S !isAutoTableName(pDataBlock->info.parTbName) && !alreadyAddGroupId(pDataBlock->info.parTbName) && groupId != 0){ - buildCtbNameAddGruopId(pDataBlock->info.parTbName, groupId); + buildCtbNameAddGroupId(pDataBlock->info.parTbName, groupId); } } else { buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName); From c4562b5d2620ec886f55188cec7c1f30bc8e6a7a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 24 Feb 2024 00:54:57 +0800 Subject: [PATCH 2/5] fix(stream): fix syntax error. --- include/common/tdatablock.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index bcb40f4175..bac3031a0b 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -264,7 +264,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pData bool alreadyAddGroupId(char* ctbName); bool isAutoTableName(char* ctbName); -void buildCtbNameAddGruopId(char* ctbName, uint64_t groupId); +void buildCtbNameAddGroupId(char* ctbName, uint64_t groupId); char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId); int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf); From bb31b0c12175ee3f5cb1e1f8adebd7cfe47f2f4f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 24 Feb 2024 02:08:49 +0800 Subject: [PATCH 3/5] fix(stream): add logs. --- source/dnode/vnode/src/tq/tqSink.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 800b84df61..c9df5cfce0 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -186,11 +186,14 @@ void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDa pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); strcpy(pCreateTableReq->name, pDataBlock->info.parTbName); buildCtbNameAddGroupId(pCreateTableReq->name, gid); + tqDebug("gen name from:%s", pDataBlock->info.parTbName); } else { pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName); + tqDebug("copy name:%s", pDataBlock->info.parTbName); } } else { pCreateTableReq->name = buildCtbNameByGroupId(stbFullName, gid); + tqDebug("gen name from stbFullName:%s gid:%"PRId64, stbFullName, gid); } } From 3cd158f681c9952091832159e5d7b1c7f586a000 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 24 Feb 2024 02:19:40 +0800 Subject: [PATCH 4/5] fix(stream): fix the invalid check for subtable name. --- source/common/src/tdatablock.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 033791a572..9f55b67ea3 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2139,7 +2139,7 @@ bool alreadyAddGroupId(char* ctbName) { _location--; } - return ctbName[_location] == '_' && len - 1 - _location > 15; // 15 means the min length of groupid + return ctbName[_location] == '_' && len - 1 - _location >= 15; // 15 means the min length of groupid } char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) { From 59ad27fcae94668887dc7448bd47eb5897acce5b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 24 Feb 2024 22:50:17 +0800 Subject: [PATCH 5/5] other(stream): comment some logs. --- source/dnode/vnode/src/tq/tqSink.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index c9df5cfce0..9ff4e3ba62 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -186,14 +186,14 @@ void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDa pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); strcpy(pCreateTableReq->name, pDataBlock->info.parTbName); buildCtbNameAddGroupId(pCreateTableReq->name, gid); - tqDebug("gen name from:%s", pDataBlock->info.parTbName); +// tqDebug("gen name from:%s", pDataBlock->info.parTbName); } else { pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName); - tqDebug("copy name:%s", pDataBlock->info.parTbName); +// tqDebug("copy name:%s", pDataBlock->info.parTbName); } } else { pCreateTableReq->name = buildCtbNameByGroupId(stbFullName, gid); - tqDebug("gen name from stbFullName:%s gid:%"PRId64, stbFullName, gid); +// tqDebug("gen name from stbFullName:%s gid:%"PRId64, stbFullName, gid); } }