From 0f5dc110957a7f075ee5683742af85882dd7b575 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Thu, 9 Dec 2021 21:14:42 -0500 Subject: [PATCH] TD-11819 Parsing insert statement and assembling binary objects. --- include/libs/parser/parser.h | 10 ++++++++-- source/libs/parser/src/dataBlockMgt.c | 15 ++++----------- source/libs/parser/src/insertParser.c | 1 - 3 files changed, 12 insertions(+), 14 deletions(-) diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index ea8caa691d..e2817baeae 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -158,9 +158,15 @@ typedef enum { PAYLOAD_TYPE_RAW = 1, } EPayloadType; +typedef struct SVgDataBlocks { + int64_t vgId; // virtual group id + int32_t numOfTables; // number of tables in current submit block + uint32_t size; + char *pData; +} SVgDataBlocks; + typedef struct SInsertStmtInfo { - // SHashObj* pTableBlockHashList; // data block for each table - SArray* pDataBlocks; // SArray. Merged submit block for each vgroup + SArray* pDataBlocks; // data block for each vgroup, SArray. int8_t schemaAttache; // denote if submit block is built with table schema or not uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert uint32_t insertType; // insert data from [file|sql statement| bound statement] diff --git a/source/libs/parser/src/dataBlockMgt.c b/source/libs/parser/src/dataBlockMgt.c index a08f4fb15b..e8bf9da793 100644 --- a/source/libs/parser/src/dataBlockMgt.c +++ b/source/libs/parser/src/dataBlockMgt.c @@ -507,22 +507,16 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t const int INSERT_HEAD_SIZE = sizeof(SMsgDesc) + sizeof(SSubmitMsg); int code = 0; bool isRawPayload = IS_RAW_PAYLOAD(payloadType); - void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); + SHashObj* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES); STableDataBlocks** p = taosHashIterate(pHashObj, NULL); - STableDataBlocks* pOneTableBlock = *p; - SBlockKeyInfo blkKeyInfo = {0}; // share by pOneTableBlock - - while(pOneTableBlock) { + while (pOneTableBlock) { SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData; if (pBlocks->numOfRows > 0) { - // the maximum expanded size in byte when a row-wise data is converted to SDataRow format - int32_t expandSize = isRawPayload ? getRowExpandSize(pOneTableBlock->pTableMeta) : 0; STableDataBlocks* dataBuf = NULL; - int32_t ret = getDataBlockFromList(pVnodeDataBlockHashList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE, INSERT_HEAD_SIZE, 0, &pOneTableBlock->tableName, pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList); if (ret != TSDB_CODE_SUCCESS) { @@ -532,22 +526,21 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t return ret; } + // the maximum expanded size in byte when a row-wise data is converted to SDataRow format + int32_t expandSize = isRawPayload ? getRowExpandSize(pOneTableBlock->pTableMeta) : 0; int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize + sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta); if (dataBuf->nAllocSize < destSize) { dataBuf->nAllocSize = (uint32_t)(destSize * 1.5); - char* tmp = realloc(dataBuf->pData, dataBuf->nAllocSize); if (tmp != NULL) { dataBuf->pData = tmp; - //memset(dataBuf->pData + dataBuf->size, 0, dataBuf->nAllocSize - dataBuf->size); } else { // failed to allocate memory, free already allocated memory and return error code taosHashCleanup(pVnodeDataBlockHashList); destroyBlockArrayList(pVnodeDataBlockList); tfree(dataBuf->pData); tfree(blkKeyInfo.pKeyTuple); - return TSDB_CODE_TSC_OUT_OF_MEMORY; } } diff --git a/source/libs/parser/src/insertParser.c b/source/libs/parser/src/insertParser.c index 61f92eebe0..1748588427 100644 --- a/source/libs/parser/src/insertParser.c +++ b/source/libs/parser/src/insertParser.c @@ -233,7 +233,6 @@ static int32_t checkTimestamp(STableDataBlocks *pDataBlocks, const char *start) if (k <= pDataBlocks->prevTS && (pDataBlocks->tsSource == TSDB_USE_CLI_TS)) { pDataBlocks->ordered = false; - // tscWarn("NOT ordered input timestamp"); } pDataBlocks->prevTS = k;