From faf156105d1313967be90dd99ca5f0b7074296e0 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Mon, 28 Nov 2022 00:08:41 +0800 Subject: [PATCH] enh: replace row format --- include/common/tmsg.h | 1 + source/common/src/tmsg.c | 4 ++ source/libs/parser/inc/parInsertUtil.h | 6 +- source/libs/parser/src/parInsertSql.c | 3 +- source/libs/parser/src/parInsertUtil.c | 97 ++++++++++++++++++-------- 5 files changed, 77 insertions(+), 34 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 1a4049a8a2..2b1af66cc6 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3250,6 +3250,7 @@ typedef struct { int32_t tEncodeSSubmitReq2(SEncoder* pCoder, const SSubmitReq2* pReq); int32_t tDecodeSSubmitReq2(SDecoder* pCoder, SSubmitReq2** ppReq); +void tDestroySSubmitTbData(SSubmitTbData* pTbData); void tDestroySSubmitReq2(SSubmitReq2* pReq); #pragma pack(pop) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index ed253294be..17083a682a 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -6830,6 +6830,10 @@ _exit: return 0; } +void tDestroySSubmitTbData(SSubmitTbData *pTbData) { + // todo +} + void tDestroySSubmitReq2(SSubmitReq2 *pReq) { if (NULL == pReq) return; diff --git a/source/libs/parser/inc/parInsertUtil.h b/source/libs/parser/inc/parInsertUtil.h index 0aa8c1d248..8d02aa0997 100644 --- a/source/libs/parser/inc/parInsertUtil.h +++ b/source/libs/parser/inc/parInsertUtil.h @@ -153,12 +153,12 @@ typedef struct STableDataCxt { SBoundColInfo boundColsInfo; SArray *pValues; SVCreateTbReq *pCreateTblReq; - SSubmitTbData data; + SSubmitTbData *pData; } STableDataCxt; typedef struct SVgroupDataCxt { - int32_t vgId; - SSubmitReq2 data; + int32_t vgId; + SSubmitReq2 *pData; } SVgroupDataCxt; int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo *pInfo); diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index 1e374a8880..a0c715c406 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -1100,6 +1100,7 @@ static int32_t parseValueTokenImpl(SInsertParseContext* pCxt, const char** pSql, return TSDB_CODE_FAILED; } + pVal->flag = CV_FLAG_VALUE; return TSDB_CODE_SUCCESS; } @@ -1167,7 +1168,7 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataC } if (TSDB_CODE_SUCCESS == code) { - SRow** pRow = taosArrayReserve(pTableCxt->data.aRowP, 1); + SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1); code = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow); } diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index b646de67e3..04eb8b123c 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -1014,9 +1014,17 @@ static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreat } } if (TSDB_CODE_SUCCESS == code) { - pTableCxt->data.aRowP = taosArrayInit(128, POINTER_BYTES); - if (NULL == pTableCxt->data.aRowP) { + pTableCxt->pData = taosMemoryCalloc(1, sizeof(SSubmitTbData)); + if (NULL == pTableCxt->pData) { code = TSDB_CODE_OUT_OF_MEMORY; + } else { + pTableCxt->pData->suid = pTableMeta->suid; + pTableCxt->pData->uid = pTableMeta->uid; + pTableCxt->pData->sver = pTableMeta->sversion; + pTableCxt->pData->aRowP = taosArrayInit(128, POINTER_BYTES); + if (NULL == pTableCxt->pData->aRowP) { + code = TSDB_CODE_OUT_OF_MEMORY; + } } } @@ -1055,7 +1063,7 @@ void insDestroyTableDataCxt(STableDataCxt* pTableCxt) { taosArrayDestroyEx(pTableCxt->pValues, NULL /*todo*/); tdDestroySVCreateTbReq(pTableCxt->pCreateTblReq); taosMemoryFreeClear(pTableCxt->pCreateTblReq); - // todo free SSubmitTbData + tDestroySSubmitTbData(pTableCxt->pData); } void insDestroyVgroupDataCxt(SVgroupDataCxt* pVgCxt) { @@ -1063,7 +1071,7 @@ void insDestroyVgroupDataCxt(SVgroupDataCxt* pVgCxt) { return; } - tDestroySSubmitReq2(&pVgCxt->data); + tDestroySSubmitReq2(pVgCxt->pData); } void insDestroyVgroupDataCxtList(SArray* pVgCxtList) { @@ -1112,26 +1120,50 @@ void insDestroyTableDataCxtHashMap(SHashObj* pTableCxtHash) { static int32_t fillVgroupDataCxt(STableDataCxt* pTableCxt, SVgroupDataCxt* pVgCxt) { if (NULL != pTableCxt->pCreateTblReq) { - if (NULL == pVgCxt->data.aCreateTbReq) { - pVgCxt->data.aCreateTbReq = taosArrayInit(128, sizeof(SVCreateTbReq)); - if (NULL == pVgCxt->data.aCreateTbReq) { + if (NULL == pVgCxt->pData->aCreateTbReq) { + pVgCxt->pData->aCreateTbReq = taosArrayInit(128, sizeof(SVCreateTbReq)); + if (NULL == pVgCxt->pData->aCreateTbReq) { return TSDB_CODE_OUT_OF_MEMORY; } } - taosArrayPush(pVgCxt->data.aCreateTbReq, pTableCxt->pCreateTblReq); + taosArrayPush(pVgCxt->pData->aCreateTbReq, pTableCxt->pCreateTblReq); } - if (NULL == pVgCxt->data.aSubmitTbData) { - pVgCxt->data.aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData)); - if (NULL == pVgCxt->data.aSubmitTbData) { + if (NULL == pVgCxt->pData->aSubmitTbData) { + pVgCxt->pData->aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData)); + if (NULL == pVgCxt->pData->aSubmitTbData) { return TSDB_CODE_OUT_OF_MEMORY; } } - taosArrayPush(pVgCxt->data.aSubmitTbData, &pTableCxt->data); + taosArrayPush(pVgCxt->pData->aSubmitTbData, pTableCxt->pData); + pTableCxt->pData = NULL; return TSDB_CODE_SUCCESS; } +static int32_t createVgroupDataCxt(STableDataCxt* pTableCxt, SHashObj* pVgroupHash, SArray* pVgroupList, + SVgroupDataCxt** pOutput) { + SVgroupDataCxt* pVgCxt = taosMemoryCalloc(1, sizeof(SVgroupDataCxt)); + if (NULL == pVgCxt) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pVgCxt->pData = taosMemoryCalloc(1, sizeof(SSubmitReq2)); + if (NULL == pVgCxt->pData) { + insDestroyVgroupDataCxt(pVgCxt); + return TSDB_CODE_OUT_OF_MEMORY; + } + + pVgCxt->vgId = pTableCxt->pMeta->vgId; + int32_t code = taosHashPut(pVgroupHash, &pVgCxt->vgId, sizeof(pVgCxt->vgId), &pVgCxt, POINTER_BYTES); + if (TSDB_CODE_SUCCESS == code) { + taosArrayPush(pVgroupList, &pVgCxt); + *pOutput = pVgCxt; + } else { + insDestroyVgroupDataCxt(pVgCxt); + } + return code; +} + int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) { SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false); SArray* pVgroupList = taosArrayInit(8, POINTER_BYTES); @@ -1146,17 +1178,14 @@ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) { void* p = taosHashIterate(pTableHash, NULL); while (TSDB_CODE_SUCCESS == code && NULL != p) { STableDataCxt* pTableCxt = *(STableDataCxt**)p; - code = tRowMergeSort(pTableCxt->data.aRowP, pTableCxt->pSchema, 0); + code = tRowMergeSort(pTableCxt->pData->aRowP, pTableCxt->pSchema, 0); if (TSDB_CODE_SUCCESS == code) { int32_t vgId = pTableCxt->pMeta->vgId; SVgroupDataCxt* pVgCxt = taosHashGet(pVgroupHash, &vgId, sizeof(vgId)); if (NULL == pVgCxt) { - SVgroupDataCxt vgCxt = {.vgId = vgId}; - code = fillVgroupDataCxt(pTableCxt, &vgCxt); - if (TSDB_CODE_SUCCESS == code) { - code = taosHashPut(pVgroupHash, &vgId, sizeof(vgId), &vgCxt, sizeof(vgCxt)); - } - } else { + code = createVgroupDataCxt(pTableCxt, pVgroupHash, pVgroupList, &pVgCxt); + } + if (TSDB_CODE_SUCCESS == code) { code = fillVgroupDataCxt(pTableCxt, pVgCxt); } } @@ -1175,22 +1204,30 @@ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) { return code; } -static int32_t buildSubmitReq(SSubmitReq2* pReq, void** pData, uint32_t* pLen) { - int32_t code = TSDB_CODE_SUCCESS; - tEncodeSize(tEncodeSSubmitReq2, pReq, *pLen, code); +static int32_t buildSubmitReq(int32_t vgId, SSubmitReq2* pReq, void** pData, uint32_t* pLen) { + int32_t code = TSDB_CODE_SUCCESS; + uint32_t len = 0; + void* pBuf = NULL; + tEncodeSize(tEncodeSSubmitReq2, pReq, len, code); if (TSDB_CODE_SUCCESS == code) { SEncoder encoder; - *pData = taosMemoryMalloc(*pLen); - if (NULL == *pData) { + len += sizeof(SMsgHead); + pBuf = taosMemoryMalloc(len); + if (NULL == pBuf) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } - tEncoderInit(&encoder, *pData, *pLen); + ((SMsgHead*)pBuf)->vgId = htonl(vgId); + ((SMsgHead*)pBuf)->contLen = htonl(len); + tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead)); code = tEncodeSSubmitReq2(&encoder, pReq); tEncoderClear(&encoder); } - if (TSDB_CODE_SUCCESS != code) { - taosMemoryFreeClear(*pData); - *pLen = 0; + + if (TSDB_CODE_SUCCESS == code) { + *pData = pBuf; + *pLen = len; + } else { + taosMemoryFree(pBuf); } return code; } @@ -1210,11 +1247,11 @@ int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList, code = TSDB_CODE_TSC_OUT_OF_MEMORY; } if (TSDB_CODE_SUCCESS == code) { - dst->numOfTables = taosArrayGetSize(src->data.aSubmitTbData); + dst->numOfTables = taosArrayGetSize(src->pData->aSubmitTbData); code = taosHashGetDup(pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg); } if (TSDB_CODE_SUCCESS == code) { - code = buildSubmitReq(&src->data, &dst->pData, &dst->size); + code = buildSubmitReq(src->vgId, src->pData, &dst->pData, &dst->size); } if (TSDB_CODE_SUCCESS == code) { code = (NULL == taosArrayPush(pDataBlocks, &dst) ? TSDB_CODE_TSC_OUT_OF_MEMORY : TSDB_CODE_SUCCESS);