diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index b2aff6fd7e..68d7914366 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -262,6 +262,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf, c int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pDataBlocks, const STSchema* pTSchema, int64_t uid, int32_t vgId, tb_uid_t suid); +void buildCtbNameAddGruopId(char* ctbName, uint64_t groupId); char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId); int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 2d2db5c1dc..0f93366ced 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -59,9 +59,10 @@ typedef struct SStreamTask SStreamTask; typedef struct SStreamQueue SStreamQueue; typedef struct SStreamTaskSM SStreamTaskSM; -#define SSTREAM_TASK_VER 2 +#define SSTREAM_TASK_VER 3 #define SSTREAM_TASK_INCOMPATIBLE_VER 1 #define SSTREAM_TASK_NEED_CONVERT_VER 2 +#define SSTREAM_TASK_SUBTABLE_CHANGED_VER 3 enum { STREAM_STATUS__NORMAL = 0, diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 054cff560f..27955d6d69 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2118,6 +2118,13 @@ _end: return TSDB_CODE_SUCCESS; } +void buildCtbNameAddGruopId(char* ctbName, uint64_t groupId){ + char tmp[TSDB_TABLE_NAME_LEN] = {0}; + snprintf(tmp, TSDB_TABLE_NAME_LEN, "-%"PRIu64, groupId); + ctbName[TSDB_TABLE_NAME_LEN - strlen(tmp) - 1] = 0; // put groupId to the end + strcat(ctbName, tmp); +} + char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) { char* pBuf = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1); if (!pBuf) { diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 0ef29fcb3a..91149f8bfa 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -164,7 +164,7 @@ int32_t setDstTableDataPayload(uint64_t suid, const STSchema* pTSchema, int32_t int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id); SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols, - SSDataBlock* pDataBlock, SArray* pTagArray); + SSDataBlock* pDataBlock, SArray* pTagArray, bool newSubTableRule); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index 289986e01f..fceeca4af9 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -196,7 +196,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = SUBMIT_REQ_AUTO_CREATE_TABLE,}; int32_t cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1; - tbData.pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, cid, pDataBlock, tagArray); + tbData.pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, cid, pDataBlock, tagArray, true); { uint64_t groupId = pDataBlock->info.id.groupId; diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index c2e48d5d92..a1964026e9 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -41,7 +41,7 @@ static bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const ch static int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName, int32_t numOfTags); static SArray* createDefaultTagColName(); static void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName, - int64_t gid); + int64_t gid, bool newSubTableRule); int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, const char* pIdStr) { @@ -173,9 +173,15 @@ SArray* createDefaultTagColName() { } void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName, - int64_t gid) { + int64_t gid, bool newSubTableRule) { if (pDataBlock->info.parTbName[0]) { - pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName); + if(newSubTableRule) { + pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); + strcpy(pCreateTableReq->name, pDataBlock->info.parTbName); + buildCtbNameAddGruopId(pCreateTableReq->name, gid); + }else{ + pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName); + } } else { pCreateTableReq->name = buildCtbNameByGroupId(stbFullName, gid); } @@ -247,7 +253,7 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S ASSERT(gid == *(int64_t*)pGpIdData); } - setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid); + setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid, pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER); taosArrayPush(reqs.pArray, pCreateTbReq); tqDebug("s-task:%s build create table:%s msg complete", pTask->id.idStr, pCreateTbReq->name); @@ -411,7 +417,7 @@ bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbNam } SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols, - SSDataBlock* pDataBlock, SArray* pTagArray) { + SSDataBlock* pDataBlock, SArray* pTagArray, bool newSubTableRule) { SVCreateTbReq* pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq)); if (pCreateTbReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -436,7 +442,7 @@ SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, in pCreateTbReq->ctb.tagName = createDefaultTagColName(); // set table name - setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, pDataBlock->info.id.groupId); + setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, pDataBlock->info.id.groupId, newSubTableRule); return pCreateTbReq; } @@ -649,6 +655,10 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat if (dstTableName[0] == 0) { memset(dstTableName, 0, TSDB_TABLE_NAME_LEN); buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName); + }else{ + if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) { + buildCtbNameAddGruopId(dstTableName, groupId); + } } int32_t nameLen = strlen(dstTableName); @@ -690,7 +700,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE; pTableData->pCreateTbReq = - buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock, pTagArray); + buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock, pTagArray, pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER); taosArrayDestroy(pTagArray); if (pTableData->pCreateTbReq == NULL) { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 00a8940b6a..7252c258de 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -550,6 +550,9 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S } if (pDataBlock->info.parTbName[0]) { + if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER){ + buildCtbNameAddGruopId(pDataBlock->info.parTbName, groupId); + } snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName); } else { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index a658c46c28..390be253df 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -172,10 +172,8 @@ int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) { } if (info.msgVer <= SSTREAM_TASK_INCOMPATIBLE_VER) { ret = STREAM_STATA_NO_COMPATIBLE; - } else if (info.msgVer == SSTREAM_TASK_NEED_CONVERT_VER) { + } else if (info.msgVer >= SSTREAM_TASK_NEED_CONVERT_VER) { ret = STREAM_STATA_NEED_CONVERT; - } else if (info.msgVer == SSTREAM_TASK_VER) { - ret = STREAM_STATA_COMPATIBLE; } tDecoderClear(&decoder); break; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 4d343a484d..c6078e196e 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -164,7 +164,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->ver) < 0) return -1; - if (pTask->ver != SSTREAM_TASK_VER) return -1; + if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER) return -1; if (tDecodeI64(pDecoder, &pTask->id.streamId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->id.taskId) < 0) return -1; @@ -251,7 +251,7 @@ int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pChkpInfo->msgVer) < 0) return -1; - // if (ver != SSTREAM_TASK_VER) return -1; + // if (ver <= SSTREAM_TASK_INCOMPATIBLE_VER) return -1; if (tDecodeI64(pDecoder, &skip64) < 0) return -1; if (tDecodeI32(pDecoder, &skip32) < 0) return -1; @@ -279,7 +279,7 @@ int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId) { int64_t ver; if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &ver) < 0) return -1; - if (ver != SSTREAM_TASK_VER) return -1; + if (ver <= SSTREAM_TASK_INCOMPATIBLE_VER) return -1; if (tDecodeI64(pDecoder, &pTaskId->streamId) < 0) return -1;