From dec4d69529bcdc1c47c95652edadf05658d7ff07 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 30 Aug 2023 10:52:39 +0800 Subject: [PATCH] fix(stream): set correct user-define table name in sink table info cache. --- source/dnode/vnode/src/tq/tqSink.c | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 32838e5bc6..f7132ff6c4 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -157,6 +157,8 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) { if (pDataBlock->info.type == STREAM_DELETE_RESULT) { code = doSinkDeleteBlock(pVnode, stbFullName, pDataBlock, pTask, suid); } else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) { + tqDebug("s-task:%s build create table msg", pTask->id.idStr); + SVCreateTbBatchReq reqs = {0}; crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq)); if (NULL == reqs.pArray) { @@ -164,8 +166,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) { } for (int32_t rowId = 0; rowId < rows; rowId++) { - SVCreateTbReq createTbReq = {0}; - SVCreateTbReq* pCreateTbReq = &createTbReq; + SVCreateTbReq* pCreateTbReq = &((SVCreateTbReq){0}); // set const pCreateTbReq->flags = 0; @@ -181,6 +182,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) { int32_t size = taosArrayGetSize(pDataBlock->pDataBlock); if (size == 2) { tagArray = taosArrayInit(1, sizeof(STagVal)); + if (!tagArray) { tdDestroySVCreateTbReq(pCreateTbReq); goto _end; @@ -202,6 +204,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) { tdDestroySVCreateTbReq(pCreateTbReq); goto _end; } + for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) { SColumnInfoData* pTagData = taosArrayGet(pDataBlock->pDataBlock, tagId); @@ -211,12 +214,13 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) { continue; } else if (IS_VAR_DATA_TYPE(pTagData->info.type)) { tagVal.nData = varDataLen(pData); - tagVal.pData = varDataVal(pData); + tagVal.pData = (uint8_t*) varDataVal(pData); } else { memcpy(&tagVal.i64, pData, pTagData->info.bytes); } taosArrayPush(tagArray, &tagVal); } + } pCreateTbReq->ctb.tagNum = TMAX(size - UD_TAG_COLUMN_INDEX, 1); @@ -239,8 +243,11 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) { } else { pCreateTbReq->name = taosStrdup(pDataBlock->info.parTbName); } + taosArrayPush(reqs.pArray, pCreateTbReq); + tqDebug("s-task:%s build create table:%s msg complete", pTask->id.idStr, pCreateTbReq->name); } + reqs.nReqs = taosArrayGetSize(reqs.pArray); if (tqPutReqToQueue(pVnode, &reqs) != TSDB_CODE_SUCCESS) { goto _end; @@ -438,8 +445,10 @@ int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, } } } else { // not exist - memset(dstTableName, 0, TSDB_TABLE_NAME_LEN); - buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName); + if (dstTableName[0] == 0) { + memset(dstTableName, 0, TSDB_TABLE_NAME_LEN); + buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName); + } int32_t nameLen = strlen(dstTableName); pTableSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen);