fix:[TD-28032]fsdfasd

This commit is contained in:
wangmm0220 2024-01-10 14:57:16 +08:00
parent df70a6ef57
commit 02c7bdd4f0
5 changed files with 14 additions and 7 deletions

View File

@ -2139,7 +2139,7 @@ bool alreadyAddGroupId(char* ctbName){
_location--; _location--;
} }
return ctbName[_location] == '_' && _location < len -1; return ctbName[_location] == '_' && len - 1 - _location > 15; //15 means the min length of groupid
} }
char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) { char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) {

View File

@ -145,7 +145,7 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore);
// tqSink // tqSink
int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
const char* pIdStr); const char* pIdStr, bool newSubTableRule);
void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data); void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data);
// tqOffset // tqOffset

View File

@ -703,7 +703,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
code = terrno; code = terrno;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
code = tqBuildDeleteReq(pSma->pVnode->pTq, NULL, output, &deleteReq, ""); code = tqBuildDeleteReq(pSma->pVnode->pTq, NULL, output, &deleteReq, "", true);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
code = tdRSmaProcessDelReq(pSma, suid, pItem->level, &deleteReq); code = tdRSmaProcessDelReq(pSma, suid, pItem->level, &deleteReq);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);

View File

@ -188,7 +188,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
if (pDataBlock->info.type == STREAM_DELETE_RESULT) { if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
pDeleteReq->suid = suid; pDeleteReq->suid = suid;
pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
code = tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, pDeleteReq, ""); code = tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, pDeleteReq, "", true);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
continue; continue;
} }

View File

@ -44,7 +44,7 @@ static void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSData
int64_t gid, bool newSubTableRule); int64_t gid, bool newSubTableRule);
int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
const char* pIdStr) { const char* pIdStr, bool newSubTableRule) {
int32_t totalRows = pDataBlock->info.rows; int32_t totalRows = pDataBlock->info.rows;
SColumnInfoData* pStartTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pStartTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pEndTsCol = taosArrayGet(pDataBlock->pDataBlock, END_TS_COLUMN_INDEX); SColumnInfoData* pEndTsCol = taosArrayGet(pDataBlock->pDataBlock, END_TS_COLUMN_INDEX);
@ -68,6 +68,11 @@ int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* p
if (varTbName != NULL && varTbName != (void*)-1) { if (varTbName != NULL && varTbName != (void*)-1) {
name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
memcpy(name, varDataVal(varTbName), varDataLen(varTbName)); memcpy(name, varDataVal(varTbName), varDataLen(varTbName));
if(newSubTableRule &&
!isAutoTableName(name) &&
!alreadyAddGroupId(name)) {
buildCtbNameAddGruopId(name, groupId);
}
} else if (stbFullName) { } else if (stbFullName) {
name = buildCtbNameByGroupId(stbFullName, groupId); name = buildCtbNameByGroupId(stbFullName, groupId);
} else { } else {
@ -365,7 +370,8 @@ int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock*
int64_t suid) { int64_t suid) {
SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))}; SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))};
int32_t code = tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, &deleteReq, pTask->id.idStr); int32_t code = tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, &deleteReq, pTask->id.idStr,
pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
@ -645,6 +651,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
tqDebug("s-task:%s vgId:%d, gropuId:%" PRIu64 " datablock table name is null, set name:%s", id, vgId, groupId, tqDebug("s-task:%s vgId:%d, gropuId:%" PRIu64 " datablock table name is null, set name:%s", id, vgId, groupId,
dstTableName); dstTableName);
} else { } else {
tstrncpy(dstTableName, pTableSinkInfo->name.data, pTableSinkInfo->name.len + 1);
if (pTableSinkInfo->uid != 0) { if (pTableSinkInfo->uid != 0) {
tqDebug("s-task:%s write %d rows into groupId:%" PRIu64 " dstTable:%s(uid:%" PRIu64 ")", id, numOfRows, groupId, tqDebug("s-task:%s write %d rows into groupId:%" PRIu64 " dstTable:%s(uid:%" PRIu64 ")", id, numOfRows, groupId,
dstTableName, pTableSinkInfo->uid); dstTableName, pTableSinkInfo->uid);
@ -666,7 +673,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
} }
int32_t nameLen = strlen(dstTableName); int32_t nameLen = strlen(dstTableName);
pTableSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen); pTableSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen + 1);
if (pTableSinkInfo == NULL) { if (pTableSinkInfo == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }