Merge branch '3.0' of https://github.com/taosdata/TDengine into enh/TD-30987-3.0-1

This commit is contained in:
Hongze Cheng 2024-07-16 13:18:56 +08:00
commit 6aa0b2ff3b
1 changed files with 25 additions and 16 deletions

View File

@ -46,6 +46,7 @@ static int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid,
static SArray* createDefaultTagColName();
static void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName,
int64_t gid, bool newSubTableRule);
static int32_t doCreateSinkInfo(const char* pDstTableName, STableSinkInfo** pInfo);
int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
const char* pIdStr, bool newSubTableRule) {
@ -270,6 +271,14 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1);
taosArrayPush(reqs.pArray, pCreateTbReq);
STableSinkInfo* pInfo = NULL;
bool alreadyCached = tqGetTableInfo(pTask->outputInfo.tbSink.pTblInfo, gid, &pInfo);
if (!alreadyCached) {
code = doCreateSinkInfo(pCreateTbReq->name, &pInfo);
doPutIntoCache(pTask->outputInfo.tbSink.pTblInfo, pInfo, gid, pTask->id.idStr);
}
tqDebug("s-task:%s build create table:%s msg complete", pTask->id.idStr, pCreateTbReq->name);
}
@ -634,6 +643,18 @@ int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkI
return TSDB_CODE_SUCCESS;
}
int32_t doCreateSinkInfo(const char* pDstTableName, STableSinkInfo** pInfo) {
int32_t nameLen = strlen(pDstTableName);
(*pInfo) = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen + 1);
if (*pInfo == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pInfo)->name.len = nameLen;
memcpy((*pInfo)->name.data, pDstTableName, nameLen);
return TSDB_CODE_SUCCESS;
}
int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName,
SSubmitTbData* pTableData) {
uint64_t groupId = pDataBlock->info.id.groupId;
@ -670,22 +691,15 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
if (pTask->subtableWithoutMd5 != 1 && !isAutoTableName(dstTableName) &&
!alreadyAddGroupId(dstTableName, groupId) && groupId != 0) {
tqDebug("s-task:%s append groupId:%" PRId64 " for generated dstTable:%s", id, groupId, dstTableName);
if(pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER){
if (pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
buildCtbNameAddGroupId(NULL, dstTableName, groupId);
}else if(pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER && stbFullName) {
} else if (pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER && stbFullName) {
buildCtbNameAddGroupId(stbFullName, dstTableName, groupId);
}
}
}
int32_t nameLen = strlen(dstTableName);
pTableSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen + 1);
if (pTableSinkInfo == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pTableSinkInfo->name.len = nameLen;
memcpy(pTableSinkInfo->name.data, dstTableName, nameLen);
int32_t code = doCreateSinkInfo(dstTableName, &pTableSinkInfo);
tqDebug("s-task:%s build new sinkTableInfo to add cache, dstTable:%s", id, dstTableName);
}
@ -693,7 +707,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
pTableData->uid = pTableSinkInfo->uid;
if (pTableData->uid == 0) {
tqTrace("s-task:%s cached tableInfo uid is invalid, acquire it from meta", id);
tqTrace("s-task:%s cached tableInfo:%s uid is invalid, acquire it from meta", id, pTableSinkInfo->name.data);
return doWaitForDstTableCreated(pVnode, pTask, pTableSinkInfo, dstTableName, &pTableData->uid);
} else {
tqTrace("s-task:%s set the dstTable uid from cache:%" PRId64, id, pTableData->uid);
@ -929,11 +943,6 @@ bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks) {
}
int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id) {
if (tSimpleHashGetSize(pSinkTableMap) > MAX_CACHE_TABLE_INFO_NUM) {
taosMemoryFreeClear(pTableSinkInfo); // too many items, failed to cache it
return TSDB_CODE_FAILED;
}
int32_t code = tSimpleHashPut(pSinkTableMap, &groupId, sizeof(uint64_t), &pTableSinkInfo, POINTER_BYTES);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFreeClear(pTableSinkInfo);