From 8add4dc17774d155dbfec3f0dfdf74c1a9d1ec9d Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 2 Jan 2024 18:37:29 +0800 Subject: [PATCH 01/11] fix:[TD-28032]concat subtable and hash val as new subtable name to avoid data from multi time line write to one time line --- include/common/tdatablock.h | 1 + include/libs/stream/tstream.h | 3 ++- source/common/src/tdatablock.c | 7 +++++++ source/dnode/vnode/src/inc/tq.h | 2 +- source/dnode/vnode/src/sma/smaTimeRange.c | 2 +- source/dnode/vnode/src/tq/tqSink.c | 24 ++++++++++++++++------- source/libs/stream/src/streamDispatch.c | 3 +++ source/libs/stream/src/streamMeta.c | 4 +--- source/libs/stream/src/streamTask.c | 6 +++--- 9 files changed, 36 insertions(+), 16 deletions(-) diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index b2aff6fd7e..68d7914366 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -262,6 +262,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf, c int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pDataBlocks, const STSchema* pTSchema, int64_t uid, int32_t vgId, tb_uid_t suid); +void buildCtbNameAddGruopId(char* ctbName, uint64_t groupId); char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId); int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 2d2db5c1dc..0f93366ced 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -59,9 +59,10 @@ typedef struct SStreamTask SStreamTask; typedef struct SStreamQueue SStreamQueue; typedef struct SStreamTaskSM SStreamTaskSM; -#define SSTREAM_TASK_VER 2 +#define SSTREAM_TASK_VER 3 #define SSTREAM_TASK_INCOMPATIBLE_VER 1 #define SSTREAM_TASK_NEED_CONVERT_VER 2 +#define SSTREAM_TASK_SUBTABLE_CHANGED_VER 3 enum { STREAM_STATUS__NORMAL = 0, diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 054cff560f..27955d6d69 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2118,6 +2118,13 @@ _end: return TSDB_CODE_SUCCESS; } +void buildCtbNameAddGruopId(char* ctbName, uint64_t groupId){ + char tmp[TSDB_TABLE_NAME_LEN] = {0}; + snprintf(tmp, TSDB_TABLE_NAME_LEN, "-%"PRIu64, groupId); + ctbName[TSDB_TABLE_NAME_LEN - strlen(tmp) - 1] = 0; // put groupId to the end + strcat(ctbName, tmp); +} + char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) { char* pBuf = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1); if (!pBuf) { diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 0ef29fcb3a..91149f8bfa 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -164,7 +164,7 @@ int32_t setDstTableDataPayload(uint64_t suid, const STSchema* pTSchema, int32_t int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id); SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols, - SSDataBlock* pDataBlock, SArray* pTagArray); + SSDataBlock* pDataBlock, SArray* pTagArray, bool newSubTableRule); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index 289986e01f..fceeca4af9 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -196,7 +196,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = SUBMIT_REQ_AUTO_CREATE_TABLE,}; int32_t cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1; - tbData.pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, cid, pDataBlock, tagArray); + tbData.pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, cid, pDataBlock, tagArray, true); { uint64_t groupId = pDataBlock->info.id.groupId; diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index c2e48d5d92..a1964026e9 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -41,7 +41,7 @@ static bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const ch static int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName, int32_t numOfTags); static SArray* createDefaultTagColName(); static void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName, - int64_t gid); + int64_t gid, bool newSubTableRule); int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, const char* pIdStr) { @@ -173,9 +173,15 @@ SArray* createDefaultTagColName() { } void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName, - int64_t gid) { + int64_t gid, bool newSubTableRule) { if (pDataBlock->info.parTbName[0]) { - pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName); + if(newSubTableRule) { + pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); + strcpy(pCreateTableReq->name, pDataBlock->info.parTbName); + buildCtbNameAddGruopId(pCreateTableReq->name, gid); + }else{ + pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName); + } } else { pCreateTableReq->name = buildCtbNameByGroupId(stbFullName, gid); } @@ -247,7 +253,7 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S ASSERT(gid == *(int64_t*)pGpIdData); } - setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid); + setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid, pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER); taosArrayPush(reqs.pArray, pCreateTbReq); tqDebug("s-task:%s build create table:%s msg complete", pTask->id.idStr, pCreateTbReq->name); @@ -411,7 +417,7 @@ bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbNam } SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols, - SSDataBlock* pDataBlock, SArray* pTagArray) { + SSDataBlock* pDataBlock, SArray* pTagArray, bool newSubTableRule) { SVCreateTbReq* pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq)); if (pCreateTbReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -436,7 +442,7 @@ SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, in pCreateTbReq->ctb.tagName = createDefaultTagColName(); // set table name - setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, pDataBlock->info.id.groupId); + setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, pDataBlock->info.id.groupId, newSubTableRule); return pCreateTbReq; } @@ -649,6 +655,10 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat if (dstTableName[0] == 0) { memset(dstTableName, 0, TSDB_TABLE_NAME_LEN); buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName); + }else{ + if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) { + buildCtbNameAddGruopId(dstTableName, groupId); + } } int32_t nameLen = strlen(dstTableName); @@ -690,7 +700,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE; pTableData->pCreateTbReq = - buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock, pTagArray); + buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock, pTagArray, pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER); taosArrayDestroy(pTagArray); if (pTableData->pCreateTbReq == NULL) { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 00a8940b6a..7252c258de 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -550,6 +550,9 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S } if (pDataBlock->info.parTbName[0]) { + if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER){ + buildCtbNameAddGruopId(pDataBlock->info.parTbName, groupId); + } snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName); } else { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index a658c46c28..390be253df 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -172,10 +172,8 @@ int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) { } if (info.msgVer <= SSTREAM_TASK_INCOMPATIBLE_VER) { ret = STREAM_STATA_NO_COMPATIBLE; - } else if (info.msgVer == SSTREAM_TASK_NEED_CONVERT_VER) { + } else if (info.msgVer >= SSTREAM_TASK_NEED_CONVERT_VER) { ret = STREAM_STATA_NEED_CONVERT; - } else if (info.msgVer == SSTREAM_TASK_VER) { - ret = STREAM_STATA_COMPATIBLE; } tDecoderClear(&decoder); break; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 4d343a484d..c6078e196e 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -164,7 +164,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->ver) < 0) return -1; - if (pTask->ver != SSTREAM_TASK_VER) return -1; + if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER) return -1; if (tDecodeI64(pDecoder, &pTask->id.streamId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->id.taskId) < 0) return -1; @@ -251,7 +251,7 @@ int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pChkpInfo->msgVer) < 0) return -1; - // if (ver != SSTREAM_TASK_VER) return -1; + // if (ver <= SSTREAM_TASK_INCOMPATIBLE_VER) return -1; if (tDecodeI64(pDecoder, &skip64) < 0) return -1; if (tDecodeI32(pDecoder, &skip32) < 0) return -1; @@ -279,7 +279,7 @@ int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId) { int64_t ver; if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &ver) < 0) return -1; - if (ver != SSTREAM_TASK_VER) return -1; + if (ver <= SSTREAM_TASK_INCOMPATIBLE_VER) return -1; if (tDecodeI64(pDecoder, &pTaskId->streamId) < 0) return -1; From 9ba2f71487507de283e4b6cc6c4450bad7e7a68c Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 3 Jan 2024 10:16:43 +0800 Subject: [PATCH 02/11] fix:[TD-28032]concat subtable and hash val as new subtable name to avoid data from multi time line write to one time line --- source/dnode/vnode/src/tq/tqSink.c | 33 ++++++++++++++++----- source/libs/stream/src/streamDispatch.c | 39 ++++++------------------- 2 files changed, 34 insertions(+), 38 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index a1964026e9..d721c7f659 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -627,13 +627,23 @@ int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkI int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName, SSubmitTbData* pTableData) { uint64_t groupId = pDataBlock->info.id.groupId; - char* dstTableName = pDataBlock->info.parTbName; int32_t numOfRows = pDataBlock->info.rows; const char* id = pTask->id.idStr; int64_t suid = pTask->outputInfo.tbSink.stbUid; STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema; int32_t vgId = TD_VID(pVnode); STableSinkInfo* pTableSinkInfo = NULL; + char* dstTableName = NULL; + int32_t code = 0; + if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER){ + dstTableName = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); + if(dstTableName == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + strcpy(dstTableName, pDataBlock->info.parTbName); + }else{ + dstTableName = pDataBlock->info.parTbName; + } bool alreadyCached = tqGetTableInfo(pTask->outputInfo.tbSink.pTblInfo, groupId, &pTableSinkInfo); @@ -664,7 +674,8 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat int32_t nameLen = strlen(dstTableName); pTableSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen); if (pTableSinkInfo == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; + goto END; } pTableSinkInfo->name.len = nameLen; @@ -677,7 +688,8 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat if (pTableData->uid == 0) { tqTrace("s-task:%s cached tableInfo uid is invalid, acquire it from meta", id); - return doWaitForDstTableCreated(pVnode, pTask, pTableSinkInfo, dstTableName, &pTableData->uid); + code = doWaitForDstTableCreated(pVnode, pTask, pTableSinkInfo, dstTableName, &pTableData->uid); + goto END; } else { tqTrace("s-task:%s set the dstTable uid from cache:%"PRId64, id, pTableData->uid); } @@ -705,8 +717,8 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat if (pTableData->pCreateTbReq == NULL) { tqError("s-task:%s failed to build auto create table req, code:%s", id, tstrerror(terrno)); - taosMemoryFree(pTableSinkInfo); - return terrno; + code = terrno; + goto END; } pTableSinkInfo->uid = 0; @@ -715,10 +727,10 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat bool isValid = isValidDstChildTable(&mr, vgId, dstTableName, suid); if (!isValid) { metaReaderClear(&mr); - taosMemoryFree(pTableSinkInfo); tqError("s-task:%s vgId:%d table:%s already exists, but not child table, stream results is discarded", id, vgId, dstTableName); - return terrno; + code = terrno; + goto END; } else { pTableData->uid = mr.me.uid; pTableSinkInfo->uid = mr.me.uid; @@ -729,7 +741,12 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat } } - return TSDB_CODE_SUCCESS; +END: + if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER){ + taosMemoryFree(dstTableName); + } + taosMemoryFree(pTableSinkInfo); + return code; } int32_t setDstTableDataPayload(uint64_t suid, const STSchema *pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock, diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 7252c258de..a9b83a148e 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -18,11 +18,6 @@ #include "trpc.h" #include "ttimer.h" -typedef struct SBlockName { - uint32_t hashValue; - char parTbName[TSDB_TABLE_NAME_LEN]; -} SBlockName; - typedef struct { int32_t upStreamTaskId; SEpSet upstreamNodeEpset; @@ -537,40 +532,24 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S void* pVal = tSimpleHashGet(pTask->pNameMap, &groupId, sizeof(int64_t)); if (pVal) { - SBlockName* pBln = (SBlockName*)pVal; - hashValue = pBln->hashValue; - if (!pDataBlock->info.parTbName[0]) { - memset(pDataBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN); - memcpy(pDataBlock->info.parTbName, pBln->parTbName, strlen(pBln->parTbName)); - } + hashValue = *(uint32_t*)pVal; } else { - char* ctbName = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN); - if (ctbName == NULL) { - return -1; - } - + char ctbName[TSDB_TABLE_FNAME_LEN] = {0}; + char parTbName[TSDB_TABLE_NAME_LEN] = {0}; if (pDataBlock->info.parTbName[0]) { if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER){ - buildCtbNameAddGruopId(pDataBlock->info.parTbName, groupId); + strcpy(parTbName, pDataBlock->info.parTbName); + buildCtbNameAddGruopId(parTbName, groupId); } - snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db, - pDataBlock->info.parTbName); } else { - buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName); - snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db, - pDataBlock->info.parTbName); + buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, parTbName); } - + snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db, parTbName); /*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/ SUseDbRsp* pDbInfo = &pTask->outputInfo.shuffleDispatcher.dbInfo; - hashValue = - taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix); - taosMemoryFree(ctbName); - SBlockName bln = {0}; - bln.hashValue = hashValue; - memcpy(bln.parTbName, pDataBlock->info.parTbName, strlen(pDataBlock->info.parTbName)); + hashValue = taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix); if (tSimpleHashGetSize(pTask->pNameMap) < MAX_BLOCK_NAME_NUM) { - tSimpleHashPut(pTask->pNameMap, &groupId, sizeof(int64_t), &bln, sizeof(SBlockName)); + tSimpleHashPut(pTask->pNameMap, &groupId, sizeof(int64_t), &hashValue, sizeof(uint32_t)); } } From ab262550d10a6913fa06701b8a9141de63ff9430 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 3 Jan 2024 10:37:05 +0800 Subject: [PATCH 03/11] fix:[TD-28032]concat subtable and hash val as new subtable name to avoid data from multi time line write to one time line --- source/dnode/vnode/src/tq/tqSink.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index d721c7f659..4b7bca3c77 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -717,6 +717,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat if (pTableData->pCreateTbReq == NULL) { tqError("s-task:%s failed to build auto create table req, code:%s", id, tstrerror(terrno)); + taosMemoryFree(pTableSinkInfo); code = terrno; goto END; } @@ -727,6 +728,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat bool isValid = isValidDstChildTable(&mr, vgId, dstTableName, suid); if (!isValid) { metaReaderClear(&mr); + taosMemoryFree(pTableSinkInfo); tqError("s-task:%s vgId:%d table:%s already exists, but not child table, stream results is discarded", id, vgId, dstTableName); code = terrno; @@ -745,7 +747,6 @@ END: if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER){ taosMemoryFree(dstTableName); } - taosMemoryFree(pTableSinkInfo); return code; } From 317f0c6a39d3dc89de344e62508de2b97ad3b063 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 3 Jan 2024 18:48:54 +0800 Subject: [PATCH 04/11] fix:[TD-28032]concat subtable and hash val as new subtable name to avoid data from multi time line write to one time line --- include/common/tdatablock.h | 1 + source/common/src/tdatablock.c | 4 +++ source/dnode/vnode/src/tq/tqSink.c | 34 ++++++------------------- source/libs/stream/src/streamDispatch.c | 27 ++++++++++++++------ 4 files changed, 32 insertions(+), 34 deletions(-) diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 68d7914366..07b9064cd9 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -262,6 +262,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf, c int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pDataBlocks, const STSchema* pTSchema, int64_t uid, int32_t vgId, tb_uid_t suid); +bool isAutoTableName(char* ctbName); void buildCtbNameAddGruopId(char* ctbName, uint64_t groupId); char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId); int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 27955d6d69..de9d47a931 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2125,6 +2125,10 @@ void buildCtbNameAddGruopId(char* ctbName, uint64_t groupId){ strcat(ctbName, tmp); } +bool isAutoTableName(char* ctbName){ + return (strlen(ctbName) == 34 && ctbName[0] == 't' && ctbName[1] == '_'); +} + char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) { char* pBuf = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1); if (!pBuf) { diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 4b7bca3c77..3536beaf49 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -175,7 +175,7 @@ SArray* createDefaultTagColName() { void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName, int64_t gid, bool newSubTableRule) { if (pDataBlock->info.parTbName[0]) { - if(newSubTableRule) { + if(newSubTableRule && !isAutoTableName(pDataBlock->info.parTbName)) { pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); strcpy(pCreateTableReq->name, pDataBlock->info.parTbName); buildCtbNameAddGruopId(pCreateTableReq->name, gid); @@ -627,23 +627,13 @@ int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkI int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName, SSubmitTbData* pTableData) { uint64_t groupId = pDataBlock->info.id.groupId; + char* dstTableName = pDataBlock->info.parTbName; int32_t numOfRows = pDataBlock->info.rows; const char* id = pTask->id.idStr; int64_t suid = pTask->outputInfo.tbSink.stbUid; STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema; int32_t vgId = TD_VID(pVnode); STableSinkInfo* pTableSinkInfo = NULL; - char* dstTableName = NULL; - int32_t code = 0; - if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER){ - dstTableName = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); - if(dstTableName == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - strcpy(dstTableName, pDataBlock->info.parTbName); - }else{ - dstTableName = pDataBlock->info.parTbName; - } bool alreadyCached = tqGetTableInfo(pTask->outputInfo.tbSink.pTblInfo, groupId, &pTableSinkInfo); @@ -666,7 +656,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat memset(dstTableName, 0, TSDB_TABLE_NAME_LEN); buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName); }else{ - if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) { + if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && !isAutoTableName(dstTableName)) { buildCtbNameAddGruopId(dstTableName, groupId); } } @@ -674,8 +664,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat int32_t nameLen = strlen(dstTableName); pTableSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen); if (pTableSinkInfo == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto END; + return TSDB_CODE_OUT_OF_MEMORY; } pTableSinkInfo->name.len = nameLen; @@ -688,8 +677,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat if (pTableData->uid == 0) { tqTrace("s-task:%s cached tableInfo uid is invalid, acquire it from meta", id); - code = doWaitForDstTableCreated(pVnode, pTask, pTableSinkInfo, dstTableName, &pTableData->uid); - goto END; + return doWaitForDstTableCreated(pVnode, pTask, pTableSinkInfo, dstTableName, &pTableData->uid); } else { tqTrace("s-task:%s set the dstTable uid from cache:%"PRId64, id, pTableData->uid); } @@ -718,8 +706,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat if (pTableData->pCreateTbReq == NULL) { tqError("s-task:%s failed to build auto create table req, code:%s", id, tstrerror(terrno)); taosMemoryFree(pTableSinkInfo); - code = terrno; - goto END; + return terrno; } pTableSinkInfo->uid = 0; @@ -731,8 +718,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat taosMemoryFree(pTableSinkInfo); tqError("s-task:%s vgId:%d table:%s already exists, but not child table, stream results is discarded", id, vgId, dstTableName); - code = terrno; - goto END; + return terrno; } else { pTableData->uid = mr.me.uid; pTableSinkInfo->uid = mr.me.uid; @@ -743,11 +729,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat } } -END: - if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER){ - taosMemoryFree(dstTableName); - } - return code; + return TDB_CODE_SUCCESS; } int32_t setDstTableDataPayload(uint64_t suid, const STSchema *pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock, diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index a9b83a148e..4e96768ad6 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -18,6 +18,11 @@ #include "trpc.h" #include "ttimer.h" +typedef struct SBlockName { + uint32_t hashValue; + char parTbName[TSDB_TABLE_NAME_LEN]; +} SBlockName; + typedef struct { int32_t upStreamTaskId; SEpSet upstreamNodeEpset; @@ -532,24 +537,30 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S void* pVal = tSimpleHashGet(pTask->pNameMap, &groupId, sizeof(int64_t)); if (pVal) { - hashValue = *(uint32_t*)pVal; + SBlockName* pBln = (SBlockName*)pVal; + hashValue = pBln->hashValue; + if (!pDataBlock->info.parTbName[0]) { + memset(pDataBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN); + memcpy(pDataBlock->info.parTbName, pBln->parTbName, strlen(pBln->parTbName)); + } } else { char ctbName[TSDB_TABLE_FNAME_LEN] = {0}; - char parTbName[TSDB_TABLE_NAME_LEN] = {0}; if (pDataBlock->info.parTbName[0]) { - if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER){ - strcpy(parTbName, pDataBlock->info.parTbName); - buildCtbNameAddGruopId(parTbName, groupId); + if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && !isAutoTableName(pDataBlock->info.parTbName)){ + buildCtbNameAddGruopId(pDataBlock->info.parTbName, groupId); } } else { - buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, parTbName); + buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName); } - snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db, parTbName); + snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName); /*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/ SUseDbRsp* pDbInfo = &pTask->outputInfo.shuffleDispatcher.dbInfo; hashValue = taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix); + SBlockName bln = {0}; + bln.hashValue = hashValue; + memcpy(bln.parTbName, pDataBlock->info.parTbName, strlen(pDataBlock->info.parTbName)); if (tSimpleHashGetSize(pTask->pNameMap) < MAX_BLOCK_NAME_NUM) { - tSimpleHashPut(pTask->pNameMap, &groupId, sizeof(int64_t), &hashValue, sizeof(uint32_t)); + tSimpleHashPut(pTask->pNameMap, &groupId, sizeof(int64_t), &bln, sizeof(SBlockName)); } } From b200e6807f739376d396aa4edfecda3896efcaca Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 3 Jan 2024 19:44:30 +0800 Subject: [PATCH 05/11] fix:[TD-28032]concat subtable and hash val as new subtable name to avoid data from multi time line write to one time line --- include/common/tdatablock.h | 1 + source/common/src/tdatablock.c | 14 +++++++++++++- source/dnode/vnode/src/tq/tqSink.c | 8 ++++++-- source/libs/stream/src/streamDispatch.c | 4 +++- 4 files changed, 23 insertions(+), 4 deletions(-) diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 07b9064cd9..bcb40f4175 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -262,6 +262,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf, c int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pDataBlocks, const STSchema* pTSchema, int64_t uid, int32_t vgId, tb_uid_t suid); +bool alreadyAddGroupId(char* ctbName); bool isAutoTableName(char* ctbName); void buildCtbNameAddGruopId(char* ctbName, uint64_t groupId); char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index de9d47a931..31f7bce0ea 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2120,7 +2120,7 @@ _end: void buildCtbNameAddGruopId(char* ctbName, uint64_t groupId){ char tmp[TSDB_TABLE_NAME_LEN] = {0}; - snprintf(tmp, TSDB_TABLE_NAME_LEN, "-%"PRIu64, groupId); + snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%"PRIu64, groupId); ctbName[TSDB_TABLE_NAME_LEN - strlen(tmp) - 1] = 0; // put groupId to the end strcat(ctbName, tmp); } @@ -2129,6 +2129,18 @@ bool isAutoTableName(char* ctbName){ return (strlen(ctbName) == 34 && ctbName[0] == 't' && ctbName[1] == '_'); } +bool alreadyAddGroupId(char* ctbName){ + size_t len = strlen(ctbName); + size_t _location = len - 1; + for(; _location >= 0; _location--){ + if(ctbName[_location] < '0' && ctbName[_location] > '9'){ + break; + } + } + + return ctbName[_location] == '_' && _location < len -1; +} + char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) { char* pBuf = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1); if (!pBuf) { diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 3536beaf49..5093d868c1 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -175,7 +175,9 @@ SArray* createDefaultTagColName() { void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName, int64_t gid, bool newSubTableRule) { if (pDataBlock->info.parTbName[0]) { - if(newSubTableRule && !isAutoTableName(pDataBlock->info.parTbName)) { + if(newSubTableRule && + !isAutoTableName(pDataBlock->info.parTbName) && + !alreadyAddGroupId(pDataBlock->info.parTbName)) { pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); strcpy(pCreateTableReq->name, pDataBlock->info.parTbName); buildCtbNameAddGruopId(pCreateTableReq->name, gid); @@ -656,7 +658,9 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat memset(dstTableName, 0, TSDB_TABLE_NAME_LEN); buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName); }else{ - if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && !isAutoTableName(dstTableName)) { + if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && + !isAutoTableName(dstTableName) && + !alreadyAddGroupId(dstTableName)) { buildCtbNameAddGruopId(dstTableName, groupId); } } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 4e96768ad6..5212d4ae0d 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -546,7 +546,9 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S } else { char ctbName[TSDB_TABLE_FNAME_LEN] = {0}; if (pDataBlock->info.parTbName[0]) { - if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && !isAutoTableName(pDataBlock->info.parTbName)){ + if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && + !isAutoTableName(pDataBlock->info.parTbName) && + !alreadyAddGroupId(pDataBlock->info.parTbName)){ buildCtbNameAddGruopId(pDataBlock->info.parTbName, groupId); } } else { From df70a6ef575a440568ec7b7e7681637250381bef Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 3 Jan 2024 21:11:26 +0800 Subject: [PATCH 06/11] fix:[TD-28032]concat subtable and hash val as new subtable name to avoid data from multi time line write to one time line --- source/common/src/tdatablock.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 31f7bce0ea..ad69a3ad63 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2132,10 +2132,11 @@ bool isAutoTableName(char* ctbName){ bool alreadyAddGroupId(char* ctbName){ size_t len = strlen(ctbName); size_t _location = len - 1; - for(; _location >= 0; _location--){ - if(ctbName[_location] < '0' && ctbName[_location] > '9'){ + while(_location > 0){ + if(ctbName[_location] < '0' || ctbName[_location] > '9'){ break; } + _location--; } return ctbName[_location] == '_' && _location < len -1; From 02c7bdd4f0dacb6e171249be52fc8c9c19c85e82 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 10 Jan 2024 14:57:16 +0800 Subject: [PATCH 07/11] fix:[TD-28032]fsdfasd --- source/common/src/tdatablock.c | 2 +- source/dnode/vnode/src/inc/tq.h | 2 +- source/dnode/vnode/src/sma/smaRollup.c | 2 +- source/dnode/vnode/src/sma/smaTimeRange.c | 2 +- source/dnode/vnode/src/tq/tqSink.c | 13 ++++++++++--- 5 files changed, 14 insertions(+), 7 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index ad69a3ad63..f0d1630480 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2139,7 +2139,7 @@ bool alreadyAddGroupId(char* ctbName){ _location--; } - return ctbName[_location] == '_' && _location < len -1; + return ctbName[_location] == '_' && len - 1 - _location > 15; //15 means the min length of groupid } char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) { diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 91149f8bfa..cded4ddd7c 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -145,7 +145,7 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore); // tqSink int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, - const char* pIdStr); + const char* pIdStr, bool newSubTableRule); void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data); // tqOffset diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 4963a0aa3b..70bd803ac0 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -703,7 +703,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma code = terrno; TSDB_CHECK_CODE(code, lino, _exit); } - code = tqBuildDeleteReq(pSma->pVnode->pTq, NULL, output, &deleteReq, ""); + code = tqBuildDeleteReq(pSma->pVnode->pTq, NULL, output, &deleteReq, "", true); TSDB_CHECK_CODE(code, lino, _exit); code = tdRSmaProcessDelReq(pSma, suid, pItem->level, &deleteReq); TSDB_CHECK_CODE(code, lino, _exit); diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index fceeca4af9..f537ede8c1 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -188,7 +188,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * if (pDataBlock->info.type == STREAM_DELETE_RESULT) { pDeleteReq->suid = suid; pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); - code = tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, pDeleteReq, ""); + code = tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, pDeleteReq, "", true); TSDB_CHECK_CODE(code, lino, _exit); continue; } diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 5093d868c1..f14ed26b9a 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -44,7 +44,7 @@ static void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSData int64_t gid, bool newSubTableRule); int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, - const char* pIdStr) { + const char* pIdStr, bool newSubTableRule) { int32_t totalRows = pDataBlock->info.rows; SColumnInfoData* pStartTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pEndTsCol = taosArrayGet(pDataBlock->pDataBlock, END_TS_COLUMN_INDEX); @@ -68,6 +68,11 @@ int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* p if (varTbName != NULL && varTbName != (void*)-1) { name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); memcpy(name, varDataVal(varTbName), varDataLen(varTbName)); + if(newSubTableRule && + !isAutoTableName(name) && + !alreadyAddGroupId(name)) { + buildCtbNameAddGruopId(name, groupId); + } } else if (stbFullName) { name = buildCtbNameByGroupId(stbFullName, groupId); } else { @@ -365,7 +370,8 @@ int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* int64_t suid) { SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))}; - int32_t code = tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, &deleteReq, pTask->id.idStr); + int32_t code = tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, &deleteReq, pTask->id.idStr, + pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -645,6 +651,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat tqDebug("s-task:%s vgId:%d, gropuId:%" PRIu64 " datablock table name is null, set name:%s", id, vgId, groupId, dstTableName); } else { + tstrncpy(dstTableName, pTableSinkInfo->name.data, pTableSinkInfo->name.len + 1); if (pTableSinkInfo->uid != 0) { tqDebug("s-task:%s write %d rows into groupId:%" PRIu64 " dstTable:%s(uid:%" PRIu64 ")", id, numOfRows, groupId, dstTableName, pTableSinkInfo->uid); @@ -666,7 +673,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat } int32_t nameLen = strlen(dstTableName); - pTableSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen); + pTableSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen + 1); if (pTableSinkInfo == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } From 83121055d98fbc0b724e0bdb0c25d5d24fc4f53b Mon Sep 17 00:00:00 2001 From: jiajingbin Date: Thu, 11 Jan 2024 18:59:42 +0800 Subject: [PATCH 08/11] test: update stream for subtable --- tests/pytest/util/common.py | 17 +++++++++++ .../system-test/8-stream/at_once_interval.py | 28 ++++++++++++------- .../8-stream/at_once_interval_ext.py | 12 +++++--- tests/system-test/8-stream/at_once_session.py | 24 ++++++++++------ .../8-stream/at_once_state_window.py | 25 +++++++++++------ .../system-test/8-stream/pause_resume_test.py | 24 ++++++++++------ .../8-stream/window_close_interval.py | 18 ++++++++---- 7 files changed, 103 insertions(+), 45 deletions(-) diff --git a/tests/pytest/util/common.py b/tests/pytest/util/common.py index 9c45c09715..df09314bd9 100644 --- a/tests/pytest/util/common.py +++ b/tests/pytest/util/common.py @@ -1830,6 +1830,23 @@ class TDCom: if i == 1: self.record_history_ts = ts_value + def get_subtable(self, tbname_pre): + tdSql.query(f'show tables') + tbname_list = list(map(lambda x:x[0], tdSql.queryResult)) + for tbname in tbname_list: + if tbname_pre in tbname: + return tbname + + def get_subtable_wait(self, tbname_pre): + tbname = self.get_subtable(tbname_pre) + latency = 0 + while tbname is None: + tbname = self.get_subtable(tbname_pre) + if latency < self.stream_timeout: + latency += 1 + time.sleep(1) + return tbname + def is_json(msg): if isinstance(msg, str): try: diff --git a/tests/system-test/8-stream/at_once_interval.py b/tests/system-test/8-stream/at_once_interval.py index 8f5438be37..763b88bc69 100644 --- a/tests/system-test/8-stream/at_once_interval.py +++ b/tests/system-test/8-stream/at_once_interval.py @@ -106,14 +106,18 @@ class TDTestCase: ptn_counter = 0 for c1_value in tdSql.queryResult: if partition == "c1": - tdSql.query(f'select count(*) from `{tname}_{self.tdCom.subtable_prefix}{abs(c1_value[1])}{self.tdCom.subtable_suffix}`;') + tbname = self.tdCom.get_subtable_wait(f'{tname}_{self.tdCom.subtable_prefix}{abs(c1_value[1])}{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`') elif partition is None: - tdSql.query(f'select count(*) from `{tname}_{self.tdCom.subtable_prefix}no_partition{self.tdCom.subtable_suffix}`;') + tbname = self.tdCom.get_subtable_wait(f'{tname}_{self.tdCom.subtable_prefix}no_partition{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`') elif partition == "abs(c1)": abs_c1_value = abs(c1_value[1]) - tdSql.query(f'select count(*) from `{tname}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}`;') + tbname = self.tdCom.get_subtable_wait(f'{tname}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`') elif partition == "tbname" and ptn_counter == 0: - tdSql.query(f'select count(*) from `{tname}_{self.tdCom.subtable_prefix}{self.ctb_name}{self.tdCom.subtable_suffix}`;') + tbname = self.tdCom.get_subtable_wait(f'{tname}_{self.tdCom.subtable_prefix}{self.ctb_name}{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`') ptn_counter += 1 tdSql.checkEqual(tdSql.queryResult[0][0] > 0, True) @@ -121,14 +125,18 @@ class TDTestCase: ptn_counter = 0 for c1_value in tdSql.queryResult: if partition == "c1": - tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}{abs(c1_value[1])}{self.tdCom.subtable_suffix}`;') + tbname = self.tdCom.get_subtable_wait(f'{self.tb_name}_{self.tdCom.subtable_prefix}{abs(c1_value[1])}{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`') elif partition is None: - tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}no_partition{self.tdCom.subtable_suffix}`;') + tbname = self.tdCom.get_subtable_wait(f'{self.tb_name}_{self.tdCom.subtable_prefix}no_partition{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`') elif partition == "abs(c1)": abs_c1_value = abs(c1_value[1]) - tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}`;') + tbname = self.tdCom.get_subtable_wait(f'{self.tb_name}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`') elif partition == "tbname" and ptn_counter == 0: - tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}{self.tb_name}{self.tdCom.subtable_suffix}`;') + tbname = self.tdCom.get_subtable_wait(f'{self.tb_name}_{self.tdCom.subtable_prefix}{self.tb_name}{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`') ptn_counter += 1 tdSql.checkEqual(tdSql.queryResult[0][0] > 0, True) @@ -208,8 +216,8 @@ class TDTestCase: self.at_once_interval(interval=random.randint(10, 15), partition=None, delete=True) self.at_once_interval(interval=random.randint(10, 15), partition=self.tdCom.stream_case_when_tbname, case_when=f'case when {self.tdCom.stream_case_when_tbname} = tbname then {self.tdCom.partition_tbname_alias} else tbname end') self.at_once_interval(interval=random.randint(10, 15), partition="tbname", fill_history_value=1, fill_value="NULL") - # for fill_value in ["NULL", "PREV", "NEXT", "LINEAR", "VALUE,1,2,3,4,5,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11"]: - for fill_value in ["PREV", "NEXT", "LINEAR", "VALUE,1,2,3,4,5,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11"]: + for fill_value in ["NULL", "PREV", "NEXT", "LINEAR", "VALUE,1,2,3,4,5,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11"]: + # for fill_value in ["PREV", "NEXT", "LINEAR", "VALUE,1,2,3,4,5,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11"]: self.at_once_interval(interval=random.randint(10, 15), partition="tbname", fill_value=fill_value) self.at_once_interval(interval=random.randint(10, 15), partition="tbname", fill_value=fill_value, delete=True) diff --git a/tests/system-test/8-stream/at_once_interval_ext.py b/tests/system-test/8-stream/at_once_interval_ext.py index e1dc057448..cbb4d7ca5a 100644 --- a/tests/system-test/8-stream/at_once_interval_ext.py +++ b/tests/system-test/8-stream/at_once_interval_ext.py @@ -153,12 +153,15 @@ class TDTestCase: ptn_counter = 0 for c1_value in tdSql.queryResult: if partition == "c1": - tdSql.query(f'select count(*) from `{tname}_{self.tdCom.subtable_prefix}{abs(c1_value[1])}{self.tdCom.subtable_suffix}`;') + tbname = self.tdCom.get_subtable_wait(f'{tname}_{self.tdCom.subtable_prefix}{abs(c1_value[1])}{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`') elif partition == "abs(c1)": abs_c1_value = abs(c1_value[1]) - tdSql.query(f'select count(*) from `{tname}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}`;') + tbname = self.tdCom.get_subtable_wait(f'{tname}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`') elif partition == "tbname" and ptn_counter == 0: - tdSql.query(f'select count(*) from `{tname}_{self.tdCom.subtable_prefix}{self.ctb_name}{self.tdCom.subtable_suffix}`;') + tbname = self.tdCom.get_subtable_wait(f'{tname}_{self.tdCom.subtable_prefix}{self.ctb_name}{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`') ptn_counter += 1 else: tdSql.query(f'select cast(cast(cast({c1_value[1]} as int unsigned) as bigint) as varchar(100))') @@ -166,7 +169,8 @@ class TDTestCase: if subtable == "constant": return else: - tdSql.query(f'select count(*) from `{tname}_{self.tdCom.subtable_prefix}{subtable_value}{self.tdCom.subtable_suffix}`;') + tbname = self.tdCom.get_subtable_wait(f'{tname}_{self.tdCom.subtable_prefix}{subtable_value}{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`') tdSql.checkEqual(tdSql.queryResult[0][0] > 0, True) def run(self): diff --git a/tests/system-test/8-stream/at_once_session.py b/tests/system-test/8-stream/at_once_session.py index 9a253a187f..7a51ad245f 100644 --- a/tests/system-test/8-stream/at_once_session.py +++ b/tests/system-test/8-stream/at_once_session.py @@ -167,15 +167,19 @@ class TDTestCase: for c1_value in tdSql.queryResult: if c1_value[1] is not None: if partition == "c1": - tdSql.query(f'select count(*) from `{self.ctb_name}_{self.tdCom.subtable_prefix}{c1_value[1]}{self.tdCom.subtable_suffix}`;') + tbname = self.tdCom.get_subtable_wait(f'{self.ctb_name}_{self.tdCom.subtable_prefix}{c1_value[1]}{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`') elif partition == "abs(c1)": if subtable: abs_c1_value = abs(c1_value[1]) - tdSql.query(f'select count(*) from `{self.ctb_name}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}`;') + tbname = self.tdCom.get_subtable_wait(f'{self.ctb_name}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`') else: - tdSql.query(f'select count(*) from `{self.ctb_name}_{self.tdCom.subtable_prefix}{partition_elm_alias}{self.tdCom.subtable_suffix}`;') + tbname = self.tdCom.get_subtable_wait(f'{self.ctb_name}_{self.tdCom.subtable_prefix}{partition_elm_alias}{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`') elif partition == "tbname" and ptn_counter == 0: - tdSql.query(f'select count(*) from `{self.ctb_name}_{self.tdCom.subtable_prefix}{self.ctb_name}{self.tdCom.subtable_suffix}`;') + tbname = self.tdCom.get_subtable_wait(f'{self.ctb_name}_{self.tdCom.subtable_prefix}{self.ctb_name}{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`') ptn_counter += 1 tdSql.checkEqual(tdSql.queryResult[0][0] > 0, True) @@ -184,15 +188,19 @@ class TDTestCase: for c1_value in tdSql.queryResult: if c1_value[1] is not None: if partition == "c1": - tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}{c1_value[1]}{self.tdCom.subtable_suffix}`;') + tbname = self.tdCom.get_subtable_wait(f'{self.tb_name}_{self.tdCom.subtable_prefix}{c1_value[1]}{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`') elif partition == "abs(c1)": if subtable: abs_c1_value = abs(c1_value[1]) - tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}`;') + tbname = self.tdCom.get_subtable_wait(f'{self.tb_name}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`') else: - tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}{partition_elm_alias}{self.tdCom.subtable_suffix}`;') + tbname = self.tdCom.get_subtable_wait(f'{self.tb_name}_{self.tdCom.subtable_prefix}{partition_elm_alias}{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`') elif partition == "tbname" and ptn_counter == 0: - tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}{self.tb_name}{self.tdCom.subtable_suffix}`;') + tbname = self.tdCom.get_subtable_wait(f'{self.tb_name}_{self.tdCom.subtable_prefix}{self.tb_name}{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`') ptn_counter += 1 tdSql.checkEqual(tdSql.queryResult[0][0] > 0, True) diff --git a/tests/system-test/8-stream/at_once_state_window.py b/tests/system-test/8-stream/at_once_state_window.py index fa9f4ddd78..ea54a51737 100644 --- a/tests/system-test/8-stream/at_once_state_window.py +++ b/tests/system-test/8-stream/at_once_state_window.py @@ -91,15 +91,19 @@ class TDTestCase: for c1_value in tdSql.queryResult: if partition == "c1": if subtable: - tdSql.query(f'select count(*) from `{self.ctb_name}_{self.tdCom.subtable_prefix}{c1_value[1]}{self.tdCom.subtable_suffix}`;') + tbname = self.tdCom.get_subtable_wait(f'{self.ctb_name}_{self.tdCom.subtable_prefix}{c1_value[1]}{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`') else: - tdSql.query(f'select count(*) from `{self.ctb_name}_{self.tdCom.subtable_prefix}{partition_elm_alias}{self.tdCom.subtable_suffix}`;') + tbname = self.tdCom.get_subtable_wait(f'{self.ctb_name}_{self.tdCom.subtable_prefix}{partition_elm_alias}{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`') return elif partition == "abs(c1)": abs_c1_value = abs(c1_value[1]) - tdSql.query(f'select count(*) from `{self.ctb_name}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}`;') + tbname = self.tdCom.get_subtable_wait(f'{self.ctb_name}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`') elif partition == "tbname" and ptn_counter == 0: - tdSql.query(f'select count(*) from `{self.ctb_name}_{self.tdCom.subtable_prefix}{self.ctb_name}{self.tdCom.subtable_suffix}`;') + tbname = self.tdCom.get_subtable_wait(f'{self.ctb_name}_{self.tdCom.subtable_prefix}{self.ctb_name}{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`') ptn_counter += 1 tdSql.checkEqual(tdSql.queryResult[0][0] > 0, True) @@ -108,17 +112,20 @@ class TDTestCase: for c1_value in tdSql.queryResult: if partition == "c1": if subtable: - tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}{c1_value[1]}{self.tdCom.subtable_suffix}`;') + tbname = self.tdCom.get_subtable_wait(f'{self.tb_name}_{self.tdCom.subtable_prefix}{c1_value[1]}{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`') else: - tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}{partition_elm_alias}{self.tdCom.subtable_suffix}`;') + tbname = self.tdCom.get_subtable_wait(f'{self.tb_name}_{self.tdCom.subtable_prefix}{partition_elm_alias}{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`') return elif partition == "abs(c1)": abs_c1_value = abs(c1_value[1]) - tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}`;') + tbname = self.tdCom.get_subtable_wait(f'{self.tb_name}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`') elif partition == "tbname" and ptn_counter == 0: - tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}{self.tb_name}{self.tdCom.subtable_suffix}`;') + tbname = self.tdCom.get_subtable_wait(f'{self.tb_name}_{self.tdCom.subtable_prefix}{self.tb_name}{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`') ptn_counter += 1 - tdSql.checkEqual(tdSql.queryResult[0][0] > 0, True) diff --git a/tests/system-test/8-stream/pause_resume_test.py b/tests/system-test/8-stream/pause_resume_test.py index 484383f1ce..8b71466f01 100644 --- a/tests/system-test/8-stream/pause_resume_test.py +++ b/tests/system-test/8-stream/pause_resume_test.py @@ -117,14 +117,18 @@ class TDTestCase: ptn_counter = 0 for c1_value in tdSql.queryResult: if partition == "c1": - tdSql.query(f'select count(*) from `{tname}_{self.tdCom.subtable_prefix}{abs(c1_value[1])}{self.tdCom.subtable_suffix}`;') + tbname = self.tdCom.get_subtable_wait(f'{tname}_{self.tdCom.subtable_prefix}{abs(c1_value[1])}{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`') elif partition is None: - tdSql.query(f'select count(*) from `{tname}_{self.tdCom.subtable_prefix}no_partition{self.tdCom.subtable_suffix}`;') + tbname = self.tdCom.get_subtable_wait(f'{tname}_{self.tdCom.subtable_prefix}no_partition{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`') elif partition == "abs(c1)": abs_c1_value = abs(c1_value[1]) - tdSql.query(f'select count(*) from `{tname}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}`;') + tbname = self.tdCom.get_subtable_wait(f'{tname}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`') elif partition == "tbname" and ptn_counter == 0: - tdSql.query(f'select count(*) from `{tname}_{self.tdCom.subtable_prefix}{self.ctb_name}{self.tdCom.subtable_suffix}`;') + tbname = self.tdCom.get_subtable_wait(f'{tname}_{self.tdCom.subtable_prefix}{self.ctb_name}{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`') ptn_counter += 1 tdSql.checkEqual(tdSql.queryResult[0][0] > 0, True) @@ -132,14 +136,18 @@ class TDTestCase: ptn_counter = 0 for c1_value in tdSql.queryResult: if partition == "c1": - tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}{abs(c1_value[1])}{self.tdCom.subtable_suffix}`;') + tbname = self.tdCom.get_subtable_wait(f'{self.tb_name}_{self.tdCom.subtable_prefix}{abs(c1_value[1])}{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`') elif partition is None: - tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}no_partition{self.tdCom.subtable_suffix}`;') + tbname = self.tdCom.get_subtable_wait(f'{self.tb_name}_{self.tdCom.subtable_prefix}no_partition{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`') elif partition == "abs(c1)": abs_c1_value = abs(c1_value[1]) - tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}`;') + tbname = self.tdCom.get_subtable_wait(f'{self.tb_name}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`') elif partition == "tbname" and ptn_counter == 0: - tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}{self.tb_name}{self.tdCom.subtable_suffix}`;') + tbname = self.tdCom.get_subtable_wait(f'{self.tb_name}_{self.tdCom.subtable_prefix}{self.tb_name}{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`') ptn_counter += 1 tdSql.checkEqual(tdSql.queryResult[0][0] > 0, True) diff --git a/tests/system-test/8-stream/window_close_interval.py b/tests/system-test/8-stream/window_close_interval.py index 31a566a0f8..1ac7a61d2f 100644 --- a/tests/system-test/8-stream/window_close_interval.py +++ b/tests/system-test/8-stream/window_close_interval.py @@ -146,12 +146,15 @@ class TDTestCase: ptn_counter = 0 for c1_value in tdSql.queryResult: if partition == "c1": - tdSql.query(f'select count(*) from `{tname}_{self.tdCom.subtable_prefix}{c1_value[1]}{self.tdCom.subtable_suffix}`;', count_expected_res=self.tdCom.range_count) + tbname = self.tdCom.get_subtable_wait(f'{tname}_{self.tdCom.subtable_prefix}{c1_value[1]}{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`', count_expected_res=self.tdCom.range_count) elif partition == "abs(c1)": abs_c1_value = abs(c1_value[1]) - tdSql.query(f'select count(*) from `{tname}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}`;', count_expected_res=self.tdCom.range_count) + tbname = self.tdCom.get_subtable_wait(f'{tname}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`', count_expected_res=self.tdCom.range_count) elif partition == "tbname" and ptn_counter == 0: - tdSql.query(f'select count(*) from `{tname}_{self.tdCom.subtable_prefix}{self.ctb_name}{self.tdCom.subtable_suffix}`;', count_expected_res=self.tdCom.range_count) + tbname = self.tdCom.get_subtable_wait(f'{tname}_{self.tdCom.subtable_prefix}{self.ctb_name}{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`', count_expected_res=self.tdCom.range_count) ptn_counter += 1 tdSql.checkEqual(tdSql.queryResult[0][0] , self.tdCom.range_count) @@ -161,12 +164,15 @@ class TDTestCase: ptn_counter = 0 for c1_value in tdSql.queryResult: if partition == "c1": - tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}{c1_value[1]}{self.tdCom.subtable_suffix}`;') + tbname = self.tdCom.get_subtable_wait(f'{self.tb_name}_{self.tdCom.subtable_prefix}{c1_value[1]}{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`') elif partition == "abs(c1)": abs_c1_value = abs(c1_value[1]) - tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}`;') + tbname = self.tdCom.get_subtable_wait(f'{self.tb_name}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`') elif partition == "tbname" and ptn_counter == 0: - tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}{self.tb_name}{self.tdCom.subtable_suffix}`;') + tbname = self.tdCom.get_subtable_wait(f'{self.tb_name}_{self.tdCom.subtable_prefix}{self.tb_name}{self.tdCom.subtable_suffix}') + tdSql.query(f'select count(*) from `{tbname}`') ptn_counter += 1 tdSql.checkEqual(tdSql.queryResult[0][0] > 0, True) From 00e0da15a347ab1e5efa0b8d91596bff6380d903 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 12 Jan 2024 17:37:22 +0800 Subject: [PATCH 09/11] fix:[TD-28032]groupid = 0 & modify test case in sim --- source/dnode/vnode/src/tq/tqSink.c | 9 ++++-- source/libs/stream/src/streamDispatch.c | 3 +- tests/script/tsim/stream/udTableAndTag0.sim | 36 --------------------- tests/script/tsim/stream/udTableAndTag1.sim | 35 -------------------- tests/script/tsim/stream/udTableAndTag2.sim | 5 --- 5 files changed, 8 insertions(+), 80 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index f14ed26b9a..7fcb86d84a 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -70,7 +70,8 @@ int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* p memcpy(name, varDataVal(varTbName), varDataLen(varTbName)); if(newSubTableRule && !isAutoTableName(name) && - !alreadyAddGroupId(name)) { + !alreadyAddGroupId(name) && + groupId != 0) { buildCtbNameAddGruopId(name, groupId); } } else if (stbFullName) { @@ -182,7 +183,8 @@ void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDa if (pDataBlock->info.parTbName[0]) { if(newSubTableRule && !isAutoTableName(pDataBlock->info.parTbName) && - !alreadyAddGroupId(pDataBlock->info.parTbName)) { + !alreadyAddGroupId(pDataBlock->info.parTbName) && + gid != 0) { pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); strcpy(pCreateTableReq->name, pDataBlock->info.parTbName); buildCtbNameAddGruopId(pCreateTableReq->name, gid); @@ -667,7 +669,8 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat }else{ if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && !isAutoTableName(dstTableName) && - !alreadyAddGroupId(dstTableName)) { + !alreadyAddGroupId(dstTableName) && + groupId != 0) { buildCtbNameAddGruopId(dstTableName, groupId); } } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 5212d4ae0d..2425c465a6 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -548,7 +548,8 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S if (pDataBlock->info.parTbName[0]) { if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && !isAutoTableName(pDataBlock->info.parTbName) && - !alreadyAddGroupId(pDataBlock->info.parTbName)){ + !alreadyAddGroupId(pDataBlock->info.parTbName) && + groupId != 0){ buildCtbNameAddGruopId(pDataBlock->info.parTbName, groupId); } } else { diff --git a/tests/script/tsim/stream/udTableAndTag0.sim b/tests/script/tsim/stream/udTableAndTag0.sim index c81927abcb..e8b51a3846 100644 --- a/tests/script/tsim/stream/udTableAndTag0.sim +++ b/tests/script/tsim/stream/udTableAndTag0.sim @@ -48,16 +48,6 @@ if $rows != 2 then goto loop0 endi -if $data00 != aaa-t1 then - print =====data00=$data00 - goto loop0 -endi - -if $data10 != aaa-t2 then - print =====data10=$data10 - goto loop0 -endi - $loop_count = 0 loop1: @@ -264,17 +254,6 @@ if $rows != 2 then goto loop6 endi -if $data00 != tbn-t1 then - print =====data00=$data00 - goto loop6 -endi - -if $data10 != tbn-t2 then - print =====data10=$data10 - goto loop6 -endi - - print ===== step5 print ===== tag name + table name @@ -312,21 +291,6 @@ if $rows != 3 then goto loop7 endi -if $data00 != tbn-t1 then - print =====data00=$data00 - goto loop7 -endi - -if $data10 != tbn-t2 then - print =====data10=$data10 - goto loop7 -endi - -if $data20 != tbn-t3 then - print =====data20=$data20 - goto loop7 -endi - $loop_count = 0 loop8: diff --git a/tests/script/tsim/stream/udTableAndTag1.sim b/tests/script/tsim/stream/udTableAndTag1.sim index e9dfbaabcf..e6427aab10 100644 --- a/tests/script/tsim/stream/udTableAndTag1.sim +++ b/tests/script/tsim/stream/udTableAndTag1.sim @@ -47,16 +47,6 @@ if $rows != 2 then goto loop0 endi -if $data00 != aaa-1 then - print =====data00=$data00 - goto loop0 -endi - -if $data10 != aaa-2 then - print =====data10=$data10 - goto loop0 -endi - $loop_count = 0 loop1: @@ -264,16 +254,6 @@ if $rows != 2 then goto loop6 endi -if $data00 != tbn-1 then - print =====data00=$data00 - goto loop6 -endi - -if $data10 != tbn-2 then - print =====data10=$data10 - goto loop6 -endi - print ===== step5 print ===== tag name + table name @@ -311,21 +291,6 @@ if $rows != 3 then goto loop7 endi -if $data00 != tbn-t1 then - print =====data00=$data00 - goto loop7 -endi - -if $data10 != tbn-t2 then - print =====data10=$data10 - goto loop7 -endi - -if $data20 != tbn-t3 then - print =====data20=$data20 - goto loop7 -endi - $loop_count = 0 loop8: diff --git a/tests/script/tsim/stream/udTableAndTag2.sim b/tests/script/tsim/stream/udTableAndTag2.sim index 973c55b9ef..8977dc219b 100644 --- a/tests/script/tsim/stream/udTableAndTag2.sim +++ b/tests/script/tsim/stream/udTableAndTag2.sim @@ -415,11 +415,6 @@ if $data00 != aaa then goto loop9 endi -if $data10 != aaa-1 then - print =====data00=$data00 - goto loop9 -endi - $loop_count = 0 loop10: From 8ab763fb9f48a12939c3a685690d8252e202cff5 Mon Sep 17 00:00:00 2001 From: jiajingbin Date: Mon, 15 Jan 2024 10:26:21 +0800 Subject: [PATCH 10/11] test: update logic with subtable=None in at_once_session.py --- tests/system-test/8-stream/at_once_session.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system-test/8-stream/at_once_session.py b/tests/system-test/8-stream/at_once_session.py index 7a51ad245f..9c53d81fc5 100644 --- a/tests/system-test/8-stream/at_once_session.py +++ b/tests/system-test/8-stream/at_once_session.py @@ -203,7 +203,7 @@ class TDTestCase: tdSql.query(f'select count(*) from `{tbname}`') ptn_counter += 1 - tdSql.checkEqual(tdSql.queryResult[0][0] > 0, True) + tdSql.checkEqual(tdSql.queryResult[0][0] > 0, True) if subtable is not None else tdSql.checkEqual(tdSql.queryResult[0][0] >= 0, True) From da648eb1150bfea3685fa52a7242bb7b2485d9e2 Mon Sep 17 00:00:00 2001 From: jiajingbin Date: Tue, 16 Jan 2024 15:15:56 +0800 Subject: [PATCH 11/11] test: update --- tests/system-test/8-stream/at_once_session.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/system-test/8-stream/at_once_session.py b/tests/system-test/8-stream/at_once_session.py index 0fbcd52fe9..6f25e5ad97 100644 --- a/tests/system-test/8-stream/at_once_session.py +++ b/tests/system-test/8-stream/at_once_session.py @@ -185,7 +185,7 @@ class TDTestCase: tbname = self.tdCom.get_subtable_wait(f'{self.ctb_name}_{self.tdCom.subtable_prefix}{self.ctb_name}{self.tdCom.subtable_suffix}') tdSql.query(f'select count(*) from `{tbname}`') ptn_counter += 1 - tdSql.checkEqual(tdSql.queryResult[0][0] > 0, True) + tdSql.checkEqual(tdSql.queryResult[0][0] > 0, True) if subtable is not None else tdSql.checkEqual(tdSql.queryResult[0][0] >= 0, True) tdSql.query(f'select * from {self.tb_name}') ptn_counter = 0 @@ -232,4 +232,4 @@ class TDTestCase: event = threading.Event() tdCases.addLinux(__file__, TDTestCase()) -tdCases.addWindows(__file__, TDTestCase()) \ No newline at end of file +tdCases.addWindows(__file__, TDTestCase())