refactor(stream): do some internal refactor.

This commit is contained in:
Haojun Liao 2023-10-24 09:48:10 +08:00
parent 7311f9d2fe
commit 1ad2c3029a
3 changed files with 16 additions and 22 deletions

View File

@ -175,7 +175,7 @@ int32_t setDstTableDataPayload(uint64_t suid, const STSchema* pTSchema, int32_t
int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id); int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id);
SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols, SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols,
SSDataBlock* pDataBlock); SSDataBlock* pDataBlock, SArray* pTagArray);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -162,17 +162,15 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
int32_t len = 0; int32_t len = 0;
SSubmitReq2 *pReq = NULL; SSubmitReq2 *pReq = NULL;
SArray *tagArray = NULL; SArray *tagArray = NULL;
SArray *createTbArray = NULL;
SArray *pVals = NULL; SArray *pVals = NULL;
int32_t numOfBlocks = taosArrayGetSize(pBlocks); int32_t numOfBlocks = taosArrayGetSize(pBlocks);
tagArray = taosArrayInit(1, sizeof(STagVal)); tagArray = taosArrayInit(1, sizeof(STagVal));
createTbArray = taosArrayInit(numOfBlocks, POINTER_BYTES);
pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)); pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2));
pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)); pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData));
if (!tagArray || !createTbArray || !pReq || !pReq->aSubmitTbData) { if (!tagArray || !pReq || !pReq->aSubmitTbData) {
code = terrno == TSDB_CODE_SUCCESS ? TSDB_CODE_OUT_OF_MEMORY : terrno; code = terrno == TSDB_CODE_SUCCESS ? TSDB_CODE_OUT_OF_MEMORY : terrno;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
@ -191,10 +189,9 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
} }
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = SUBMIT_REQ_AUTO_CREATE_TABLE,}; SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = SUBMIT_REQ_AUTO_CREATE_TABLE,};
int32_t cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1;
tbData.pCreateTbReq = int32_t cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1;
buildAutoCreateTableReq(stbFullName, suid, taosArrayGetSize(pDataBlock->pDataBlock) + 1, pDataBlock); tbData.pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, cid, pDataBlock, tagArray);
{ {
uint64_t groupId = pDataBlock->info.id.groupId; uint64_t groupId = pDataBlock->info.id.groupId;
@ -248,8 +245,8 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
} }
tEncoderClear(&encoder); tEncoderClear(&encoder);
} }
_exit: _exit:
taosArrayDestroy(createTbArray);
taosArrayDestroy(tagArray); taosArrayDestroy(tagArray);
taosArrayDestroy(pVals); taosArrayDestroy(pVals);
if (pReq) { if (pReq) {

View File

@ -394,29 +394,21 @@ bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbNam
return true; return true;
} }
SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols, SSDataBlock* pDataBlock) { SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols,
SSDataBlock* pDataBlock, SArray* pTagArray) {
SVCreateTbReq* pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq)); SVCreateTbReq* pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
if (pCreateTbReq == NULL) { if (pCreateTbReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
// set tag content taosArrayClear(pTagArray);
SArray* tagArray = taosArrayInit(1, sizeof(STagVal));
if (tagArray == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tdDestroySVCreateTbReq(pCreateTbReq);
taosMemoryFreeClear(pCreateTbReq);
return NULL;
}
initCreateTableMsg(pCreateTbReq, suid, stbFullName, 1); initCreateTableMsg(pCreateTbReq, suid, stbFullName, 1);
STagVal tagVal = { .cid = numOfCols, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId}; STagVal tagVal = { .cid = numOfCols, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId};
taosArrayPush(tagArray, &tagVal); taosArrayPush(pTagArray, &tagVal);
tTagNew(tagArray, 1, false, (STag**) &pCreateTbReq->ctb.pTag); tTagNew(pTagArray, 1, false, (STag**) &pCreateTbReq->ctb.pTag);
taosArrayDestroy(tagArray);
if (pCreateTbReq->ctb.pTag == NULL) { if (pCreateTbReq->ctb.pTag == NULL) {
tdDestroySVCreateTbReq(pCreateTbReq); tdDestroySVCreateTbReq(pCreateTbReq);
@ -678,8 +670,13 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
tqDebug("s-task:%s stream write into table:%s, table auto created", id, dstTableName); tqDebug("s-task:%s stream write into table:%s, table auto created", id, dstTableName);
SArray* pTagArray = taosArrayInit(pTSchema->numOfCols + 1, sizeof(STagVal));
pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE; pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
pTableData->pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock); pTableData->pCreateTbReq =
buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock, pTagArray);
taosArrayDestroy(pTagArray);
if (pTableData->pCreateTbReq == NULL) { if (pTableData->pCreateTbReq == NULL) {
tqError("s-task:%s failed to build auto create table req, code:%s", id, tstrerror(terrno)); tqError("s-task:%s failed to build auto create table req, code:%s", id, tstrerror(terrno));
taosMemoryFree(pTableSinkInfo); taosMemoryFree(pTableSinkInfo);