fix(stream): set correct user-define table name in sink table info cache.

This commit is contained in:
Haojun Liao 2023-08-30 10:52:39 +08:00
parent 3a7a220d43
commit dec4d69529
1 changed files with 14 additions and 5 deletions

View File

@ -157,6 +157,8 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) {
if (pDataBlock->info.type == STREAM_DELETE_RESULT) { if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
code = doSinkDeleteBlock(pVnode, stbFullName, pDataBlock, pTask, suid); code = doSinkDeleteBlock(pVnode, stbFullName, pDataBlock, pTask, suid);
} else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) { } else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
tqDebug("s-task:%s build create table msg", pTask->id.idStr);
SVCreateTbBatchReq reqs = {0}; SVCreateTbBatchReq reqs = {0};
crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq)); crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq));
if (NULL == reqs.pArray) { if (NULL == reqs.pArray) {
@ -164,8 +166,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) {
} }
for (int32_t rowId = 0; rowId < rows; rowId++) { for (int32_t rowId = 0; rowId < rows; rowId++) {
SVCreateTbReq createTbReq = {0}; SVCreateTbReq* pCreateTbReq = &((SVCreateTbReq){0});
SVCreateTbReq* pCreateTbReq = &createTbReq;
// set const // set const
pCreateTbReq->flags = 0; pCreateTbReq->flags = 0;
@ -181,6 +182,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) {
int32_t size = taosArrayGetSize(pDataBlock->pDataBlock); int32_t size = taosArrayGetSize(pDataBlock->pDataBlock);
if (size == 2) { if (size == 2) {
tagArray = taosArrayInit(1, sizeof(STagVal)); tagArray = taosArrayInit(1, sizeof(STagVal));
if (!tagArray) { if (!tagArray) {
tdDestroySVCreateTbReq(pCreateTbReq); tdDestroySVCreateTbReq(pCreateTbReq);
goto _end; goto _end;
@ -202,6 +204,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) {
tdDestroySVCreateTbReq(pCreateTbReq); tdDestroySVCreateTbReq(pCreateTbReq);
goto _end; goto _end;
} }
for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) { for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) {
SColumnInfoData* pTagData = taosArrayGet(pDataBlock->pDataBlock, tagId); SColumnInfoData* pTagData = taosArrayGet(pDataBlock->pDataBlock, tagId);
@ -211,12 +214,13 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) {
continue; continue;
} else if (IS_VAR_DATA_TYPE(pTagData->info.type)) { } else if (IS_VAR_DATA_TYPE(pTagData->info.type)) {
tagVal.nData = varDataLen(pData); tagVal.nData = varDataLen(pData);
tagVal.pData = varDataVal(pData); tagVal.pData = (uint8_t*) varDataVal(pData);
} else { } else {
memcpy(&tagVal.i64, pData, pTagData->info.bytes); memcpy(&tagVal.i64, pData, pTagData->info.bytes);
} }
taosArrayPush(tagArray, &tagVal); taosArrayPush(tagArray, &tagVal);
} }
} }
pCreateTbReq->ctb.tagNum = TMAX(size - UD_TAG_COLUMN_INDEX, 1); pCreateTbReq->ctb.tagNum = TMAX(size - UD_TAG_COLUMN_INDEX, 1);
@ -239,8 +243,11 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) {
} else { } else {
pCreateTbReq->name = taosStrdup(pDataBlock->info.parTbName); pCreateTbReq->name = taosStrdup(pDataBlock->info.parTbName);
} }
taosArrayPush(reqs.pArray, pCreateTbReq); taosArrayPush(reqs.pArray, pCreateTbReq);
tqDebug("s-task:%s build create table:%s msg complete", pTask->id.idStr, pCreateTbReq->name);
} }
reqs.nReqs = taosArrayGetSize(reqs.pArray); reqs.nReqs = taosArrayGetSize(reqs.pArray);
if (tqPutReqToQueue(pVnode, &reqs) != TSDB_CODE_SUCCESS) { if (tqPutReqToQueue(pVnode, &reqs) != TSDB_CODE_SUCCESS) {
goto _end; goto _end;
@ -438,8 +445,10 @@ int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName,
} }
} }
} else { // not exist } else { // not exist
memset(dstTableName, 0, TSDB_TABLE_NAME_LEN); if (dstTableName[0] == 0) {
buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName); memset(dstTableName, 0, TSDB_TABLE_NAME_LEN);
buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName);
}
int32_t nameLen = strlen(dstTableName); int32_t nameLen = strlen(dstTableName);
pTableSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen); pTableSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen);