fix:[TD-28032]concat subtable and hash val as new subtable name to avoid data from multi time line write to one time line

This commit is contained in:
wangmm0220 2024-01-02 18:37:29 +08:00
parent 0e9a1b22ed
commit 8add4dc177
9 changed files with 36 additions and 16 deletions

View File

@ -262,6 +262,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf, c
int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pDataBlocks, const STSchema* pTSchema, int64_t uid, int32_t vgId, int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pDataBlocks, const STSchema* pTSchema, int64_t uid, int32_t vgId,
tb_uid_t suid); tb_uid_t suid);
void buildCtbNameAddGruopId(char* ctbName, uint64_t groupId);
char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId); char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId);
int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf); int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf);

View File

@ -59,9 +59,10 @@ typedef struct SStreamTask SStreamTask;
typedef struct SStreamQueue SStreamQueue; typedef struct SStreamQueue SStreamQueue;
typedef struct SStreamTaskSM SStreamTaskSM; typedef struct SStreamTaskSM SStreamTaskSM;
#define SSTREAM_TASK_VER 2 #define SSTREAM_TASK_VER 3
#define SSTREAM_TASK_INCOMPATIBLE_VER 1 #define SSTREAM_TASK_INCOMPATIBLE_VER 1
#define SSTREAM_TASK_NEED_CONVERT_VER 2 #define SSTREAM_TASK_NEED_CONVERT_VER 2
#define SSTREAM_TASK_SUBTABLE_CHANGED_VER 3
enum { enum {
STREAM_STATUS__NORMAL = 0, STREAM_STATUS__NORMAL = 0,

View File

@ -2118,6 +2118,13 @@ _end:
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void buildCtbNameAddGruopId(char* ctbName, uint64_t groupId){
char tmp[TSDB_TABLE_NAME_LEN] = {0};
snprintf(tmp, TSDB_TABLE_NAME_LEN, "-%"PRIu64, groupId);
ctbName[TSDB_TABLE_NAME_LEN - strlen(tmp) - 1] = 0; // put groupId to the end
strcat(ctbName, tmp);
}
char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) { char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) {
char* pBuf = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1); char* pBuf = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1);
if (!pBuf) { if (!pBuf) {

View File

@ -164,7 +164,7 @@ int32_t setDstTableDataPayload(uint64_t suid, const STSchema* pTSchema, int32_t
int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id); int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id);
SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols, SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols,
SSDataBlock* pDataBlock, SArray* pTagArray); SSDataBlock* pDataBlock, SArray* pTagArray, bool newSubTableRule);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -196,7 +196,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = SUBMIT_REQ_AUTO_CREATE_TABLE,}; SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = SUBMIT_REQ_AUTO_CREATE_TABLE,};
int32_t cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1; int32_t cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1;
tbData.pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, cid, pDataBlock, tagArray); tbData.pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, cid, pDataBlock, tagArray, true);
{ {
uint64_t groupId = pDataBlock->info.id.groupId; uint64_t groupId = pDataBlock->info.id.groupId;

View File

@ -41,7 +41,7 @@ static bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const ch
static int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName, int32_t numOfTags); static int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName, int32_t numOfTags);
static SArray* createDefaultTagColName(); static SArray* createDefaultTagColName();
static void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName, static void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName,
int64_t gid); int64_t gid, bool newSubTableRule);
int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
const char* pIdStr) { const char* pIdStr) {
@ -173,9 +173,15 @@ SArray* createDefaultTagColName() {
} }
void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName, void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName,
int64_t gid) { int64_t gid, bool newSubTableRule) {
if (pDataBlock->info.parTbName[0]) { if (pDataBlock->info.parTbName[0]) {
pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName); if(newSubTableRule) {
pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
strcpy(pCreateTableReq->name, pDataBlock->info.parTbName);
buildCtbNameAddGruopId(pCreateTableReq->name, gid);
}else{
pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName);
}
} else { } else {
pCreateTableReq->name = buildCtbNameByGroupId(stbFullName, gid); pCreateTableReq->name = buildCtbNameByGroupId(stbFullName, gid);
} }
@ -247,7 +253,7 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
ASSERT(gid == *(int64_t*)pGpIdData); ASSERT(gid == *(int64_t*)pGpIdData);
} }
setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid); setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid, pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER);
taosArrayPush(reqs.pArray, pCreateTbReq); taosArrayPush(reqs.pArray, pCreateTbReq);
tqDebug("s-task:%s build create table:%s msg complete", pTask->id.idStr, pCreateTbReq->name); tqDebug("s-task:%s build create table:%s msg complete", pTask->id.idStr, pCreateTbReq->name);
@ -411,7 +417,7 @@ bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbNam
} }
SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols, SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols,
SSDataBlock* pDataBlock, SArray* pTagArray) { SSDataBlock* pDataBlock, SArray* pTagArray, bool newSubTableRule) {
SVCreateTbReq* pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq)); SVCreateTbReq* pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
if (pCreateTbReq == NULL) { if (pCreateTbReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -436,7 +442,7 @@ SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, in
pCreateTbReq->ctb.tagName = createDefaultTagColName(); pCreateTbReq->ctb.tagName = createDefaultTagColName();
// set table name // set table name
setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, pDataBlock->info.id.groupId); setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, pDataBlock->info.id.groupId, newSubTableRule);
return pCreateTbReq; return pCreateTbReq;
} }
@ -649,6 +655,10 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
if (dstTableName[0] == 0) { if (dstTableName[0] == 0) {
memset(dstTableName, 0, TSDB_TABLE_NAME_LEN); memset(dstTableName, 0, TSDB_TABLE_NAME_LEN);
buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName); buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName);
}else{
if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
buildCtbNameAddGruopId(dstTableName, groupId);
}
} }
int32_t nameLen = strlen(dstTableName); int32_t nameLen = strlen(dstTableName);
@ -690,7 +700,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE; pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
pTableData->pCreateTbReq = pTableData->pCreateTbReq =
buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock, pTagArray); buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock, pTagArray, pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER);
taosArrayDestroy(pTagArray); taosArrayDestroy(pTagArray);
if (pTableData->pCreateTbReq == NULL) { if (pTableData->pCreateTbReq == NULL) {

View File

@ -550,6 +550,9 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
} }
if (pDataBlock->info.parTbName[0]) { if (pDataBlock->info.parTbName[0]) {
if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER){
buildCtbNameAddGruopId(pDataBlock->info.parTbName, groupId);
}
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db, snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db,
pDataBlock->info.parTbName); pDataBlock->info.parTbName);
} else { } else {

View File

@ -172,10 +172,8 @@ int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) {
} }
if (info.msgVer <= SSTREAM_TASK_INCOMPATIBLE_VER) { if (info.msgVer <= SSTREAM_TASK_INCOMPATIBLE_VER) {
ret = STREAM_STATA_NO_COMPATIBLE; ret = STREAM_STATA_NO_COMPATIBLE;
} else if (info.msgVer == SSTREAM_TASK_NEED_CONVERT_VER) { } else if (info.msgVer >= SSTREAM_TASK_NEED_CONVERT_VER) {
ret = STREAM_STATA_NEED_CONVERT; ret = STREAM_STATA_NEED_CONVERT;
} else if (info.msgVer == SSTREAM_TASK_VER) {
ret = STREAM_STATA_COMPATIBLE;
} }
tDecoderClear(&decoder); tDecoderClear(&decoder);
break; break;

View File

@ -164,7 +164,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tStartDecode(pDecoder) < 0) return -1; if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->ver) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->ver) < 0) return -1;
if (pTask->ver != SSTREAM_TASK_VER) return -1; if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER) return -1;
if (tDecodeI64(pDecoder, &pTask->id.streamId) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->id.streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->id.taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->id.taskId) < 0) return -1;
@ -251,7 +251,7 @@ int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo)
if (tStartDecode(pDecoder) < 0) return -1; if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pChkpInfo->msgVer) < 0) return -1; if (tDecodeI64(pDecoder, &pChkpInfo->msgVer) < 0) return -1;
// if (ver != SSTREAM_TASK_VER) return -1; // if (ver <= SSTREAM_TASK_INCOMPATIBLE_VER) return -1;
if (tDecodeI64(pDecoder, &skip64) < 0) return -1; if (tDecodeI64(pDecoder, &skip64) < 0) return -1;
if (tDecodeI32(pDecoder, &skip32) < 0) return -1; if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
@ -279,7 +279,7 @@ int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId) {
int64_t ver; int64_t ver;
if (tStartDecode(pDecoder) < 0) return -1; if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &ver) < 0) return -1; if (tDecodeI64(pDecoder, &ver) < 0) return -1;
if (ver != SSTREAM_TASK_VER) return -1; if (ver <= SSTREAM_TASK_INCOMPATIBLE_VER) return -1;
if (tDecodeI64(pDecoder, &pTaskId->streamId) < 0) return -1; if (tDecodeI64(pDecoder, &pTaskId->streamId) < 0) return -1;