fix(stream): add explicit create table into sink cache.

This commit is contained in:
Haojun Liao 2024-07-15 18:51:58 +08:00
parent ba2718b6de
commit 8dde576c6f
1 changed files with 25 additions and 16 deletions

View File

@ -46,6 +46,7 @@ static int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid,
static SArray* createDefaultTagColName(); static SArray* createDefaultTagColName();
static void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName, static void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName,
int64_t gid, bool newSubTableRule); int64_t gid, bool newSubTableRule);
static int32_t doCreateSinkInfo(const char* pDstTableName, STableSinkInfo** pInfo);
int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
const char* pIdStr, bool newSubTableRule) { const char* pIdStr, bool newSubTableRule) {
@ -270,6 +271,14 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1); pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1);
taosArrayPush(reqs.pArray, pCreateTbReq); taosArrayPush(reqs.pArray, pCreateTbReq);
STableSinkInfo* pInfo = NULL;
bool alreadyCached = tqGetTableInfo(pTask->outputInfo.tbSink.pTblInfo, gid, &pInfo);
if (!alreadyCached) {
code = doCreateSinkInfo(pCreateTbReq->name, &pInfo);
doPutIntoCache(pTask->outputInfo.tbSink.pTblInfo, pInfo, gid, pTask->id.idStr);
}
tqDebug("s-task:%s build create table:%s msg complete", pTask->id.idStr, pCreateTbReq->name); tqDebug("s-task:%s build create table:%s msg complete", pTask->id.idStr, pCreateTbReq->name);
} }
@ -634,6 +643,18 @@ int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkI
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t doCreateSinkInfo(const char* pDstTableName, STableSinkInfo** pInfo) {
int32_t nameLen = strlen(pDstTableName);
(*pInfo) = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen + 1);
if (*pInfo == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pInfo)->name.len = nameLen;
memcpy((*pInfo)->name.data, pDstTableName, nameLen);
return TSDB_CODE_SUCCESS;
}
int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName, int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName,
SSubmitTbData* pTableData) { SSubmitTbData* pTableData) {
uint64_t groupId = pDataBlock->info.id.groupId; uint64_t groupId = pDataBlock->info.id.groupId;
@ -670,22 +691,15 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
if (pTask->subtableWithoutMd5 != 1 && !isAutoTableName(dstTableName) && if (pTask->subtableWithoutMd5 != 1 && !isAutoTableName(dstTableName) &&
!alreadyAddGroupId(dstTableName, groupId) && groupId != 0) { !alreadyAddGroupId(dstTableName, groupId) && groupId != 0) {
tqDebug("s-task:%s append groupId:%" PRId64 " for generated dstTable:%s", id, groupId, dstTableName); tqDebug("s-task:%s append groupId:%" PRId64 " for generated dstTable:%s", id, groupId, dstTableName);
if(pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER){ if (pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
buildCtbNameAddGroupId(NULL, dstTableName, groupId); buildCtbNameAddGroupId(NULL, dstTableName, groupId);
}else if(pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER && stbFullName) { } else if (pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER && stbFullName) {
buildCtbNameAddGroupId(stbFullName, dstTableName, groupId); buildCtbNameAddGroupId(stbFullName, dstTableName, groupId);
} }
} }
} }
int32_t nameLen = strlen(dstTableName); int32_t code = doCreateSinkInfo(dstTableName, &pTableSinkInfo);
pTableSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen + 1);
if (pTableSinkInfo == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pTableSinkInfo->name.len = nameLen;
memcpy(pTableSinkInfo->name.data, dstTableName, nameLen);
tqDebug("s-task:%s build new sinkTableInfo to add cache, dstTable:%s", id, dstTableName); tqDebug("s-task:%s build new sinkTableInfo to add cache, dstTable:%s", id, dstTableName);
} }
@ -693,7 +707,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
pTableData->uid = pTableSinkInfo->uid; pTableData->uid = pTableSinkInfo->uid;
if (pTableData->uid == 0) { if (pTableData->uid == 0) {
tqTrace("s-task:%s cached tableInfo uid is invalid, acquire it from meta", id); tqTrace("s-task:%s cached tableInfo:%s uid is invalid, acquire it from meta", id, pTableSinkInfo->name.data);
return doWaitForDstTableCreated(pVnode, pTask, pTableSinkInfo, dstTableName, &pTableData->uid); return doWaitForDstTableCreated(pVnode, pTask, pTableSinkInfo, dstTableName, &pTableData->uid);
} else { } else {
tqTrace("s-task:%s set the dstTable uid from cache:%" PRId64, id, pTableData->uid); tqTrace("s-task:%s set the dstTable uid from cache:%" PRId64, id, pTableData->uid);
@ -929,11 +943,6 @@ bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks) {
} }
int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id) { int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id) {
if (tSimpleHashGetSize(pSinkTableMap) > MAX_CACHE_TABLE_INFO_NUM) {
taosMemoryFreeClear(pTableSinkInfo); // too many items, failed to cache it
return TSDB_CODE_FAILED;
}
int32_t code = tSimpleHashPut(pSinkTableMap, &groupId, sizeof(uint64_t), &pTableSinkInfo, POINTER_BYTES); int32_t code = tSimpleHashPut(pSinkTableMap, &groupId, sizeof(uint64_t), &pTableSinkInfo, POINTER_BYTES);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosMemoryFreeClear(pTableSinkInfo); taosMemoryFreeClear(pTableSinkInfo);