From 1ad2c3029a9189d24f9d8c2b60f4e0a11201bae4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 24 Oct 2023 09:48:10 +0800 Subject: [PATCH] refactor(stream): do some internal refactor. --- source/dnode/vnode/src/inc/tq.h | 2 +- source/dnode/vnode/src/sma/smaTimeRange.c | 11 ++++------ source/dnode/vnode/src/tq/tqSink.c | 25 ++++++++++------------- 3 files changed, 16 insertions(+), 22 deletions(-) diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 84f3b3e85a..24c31b95c8 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -175,7 +175,7 @@ int32_t setDstTableDataPayload(uint64_t suid, const STSchema* pTSchema, int32_t int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id); SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols, - SSDataBlock* pDataBlock); + SSDataBlock* pDataBlock, SArray* pTagArray); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index f18843bc35..b7f7d16ddd 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -162,17 +162,15 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * int32_t len = 0; SSubmitReq2 *pReq = NULL; SArray *tagArray = NULL; - SArray *createTbArray = NULL; SArray *pVals = NULL; int32_t numOfBlocks = taosArrayGetSize(pBlocks); tagArray = taosArrayInit(1, sizeof(STagVal)); - createTbArray = taosArrayInit(numOfBlocks, POINTER_BYTES); pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)); pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)); - if (!tagArray || !createTbArray || !pReq || !pReq->aSubmitTbData) { + if (!tagArray || !pReq || !pReq->aSubmitTbData) { code = terrno == TSDB_CODE_SUCCESS ? TSDB_CODE_OUT_OF_MEMORY : terrno; TSDB_CHECK_CODE(code, lino, _exit); } @@ -191,10 +189,9 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * } SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = SUBMIT_REQ_AUTO_CREATE_TABLE,}; - int32_t cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1; - tbData.pCreateTbReq = - buildAutoCreateTableReq(stbFullName, suid, taosArrayGetSize(pDataBlock->pDataBlock) + 1, pDataBlock); + int32_t cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1; + tbData.pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, cid, pDataBlock, tagArray); { uint64_t groupId = pDataBlock->info.id.groupId; @@ -248,8 +245,8 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * } tEncoderClear(&encoder); } + _exit: - taosArrayDestroy(createTbArray); taosArrayDestroy(tagArray); taosArrayDestroy(pVals); if (pReq) { diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 65484f4842..97e3376663 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -394,29 +394,21 @@ bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbNam return true; } -SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols, SSDataBlock* pDataBlock) { +SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols, + SSDataBlock* pDataBlock, SArray* pTagArray) { SVCreateTbReq* pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq)); if (pCreateTbReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - // set tag content - SArray* tagArray = taosArrayInit(1, sizeof(STagVal)); - if (tagArray == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tdDestroySVCreateTbReq(pCreateTbReq); - taosMemoryFreeClear(pCreateTbReq); - return NULL; - } - + taosArrayClear(pTagArray); initCreateTableMsg(pCreateTbReq, suid, stbFullName, 1); STagVal tagVal = { .cid = numOfCols, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId}; - taosArrayPush(tagArray, &tagVal); + taosArrayPush(pTagArray, &tagVal); - tTagNew(tagArray, 1, false, (STag**) &pCreateTbReq->ctb.pTag); - taosArrayDestroy(tagArray); + tTagNew(pTagArray, 1, false, (STag**) &pCreateTbReq->ctb.pTag); if (pCreateTbReq->ctb.pTag == NULL) { tdDestroySVCreateTbReq(pCreateTbReq); @@ -678,8 +670,13 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat tqDebug("s-task:%s stream write into table:%s, table auto created", id, dstTableName); + SArray* pTagArray = taosArrayInit(pTSchema->numOfCols + 1, sizeof(STagVal)); + pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE; - pTableData->pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock); + pTableData->pCreateTbReq = + buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock, pTagArray); + taosArrayDestroy(pTagArray); + if (pTableData->pCreateTbReq == NULL) { tqError("s-task:%s failed to build auto create table req, code:%s", id, tstrerror(terrno)); taosMemoryFree(pTableSinkInfo);