From 948502be2533c25162b451537e286a4dfaaf63e6 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Sun, 27 Nov 2022 17:09:02 +0800 Subject: [PATCH 1/2] enh: replace row format --- include/common/tmsg.h | 4 + include/libs/nodes/querynodes.h | 50 +-- source/libs/nodes/src/nodesUtilFuncs.c | 3 +- source/libs/parser/inc/parInsertUtil.h | 34 +- source/libs/parser/src/parInsertSql.c | 452 +++++++++---------------- source/libs/parser/src/parInsertUtil.c | 274 +++++++++++++++ 6 files changed, 505 insertions(+), 312 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 434febbe22..1a4049a8a2 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2080,6 +2080,10 @@ int tEncodeSVCreateTbReq(SEncoder* pCoder, const SVCreateTbReq* pReq); int tDecodeSVCreateTbReq(SDecoder* pCoder, SVCreateTbReq* pReq); static FORCE_INLINE void tdDestroySVCreateTbReq(SVCreateTbReq* req) { + if (NULL == req) { + return; + } + taosMemoryFreeClear(req->name); taosMemoryFreeClear(req->comment); if (req->type == TSDB_CHILD_TABLE) { diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 5291130e4f..db6570dd34 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -354,33 +354,33 @@ typedef struct SVgDataBlocks { void* pData; // SSubmitReq + SSubmitBlk + ... } SVgDataBlocks; -typedef void (*FFreeDataBlockHash)(SHashObj*); -typedef void (*FFreeDataBlockArray)(SArray*); +typedef void (*FFreeTableBlockHash)(SHashObj*); +typedef void (*FFreeVgourpBlockArray)(SArray*); typedef struct SVnodeModifOpStmt { - ENodeType nodeType; - ENodeType sqlNodeType; - SArray* pDataBlocks; // data block for each vgroup, SArray. - uint32_t insertType; // insert data from [file|sql statement| bound statement] - const char* pSql; // current sql statement position - int32_t totalRowsNum; - int32_t totalTbNum; - SName targetTableName; - SName usingTableName; - const char* pBoundCols; - struct STableMeta* pTableMeta; - SHashObj* pVgroupsHashObj; - SHashObj* pTableBlockHashObj; - SHashObj* pSubTableHashObj; - SHashObj* pTableNameHashObj; - SHashObj* pDbFNameHashObj; - SArray* pVgDataBlocks; - SVCreateTbReq createTblReq; - TdFilePtr fp; - FFreeDataBlockHash freeHashFunc; - FFreeDataBlockArray freeArrayFunc; - bool usingTableProcessing; - bool fileProcessing; + ENodeType nodeType; + ENodeType sqlNodeType; + SArray* pDataBlocks; // data block for each vgroup, SArray. + uint32_t insertType; // insert data from [file|sql statement| bound statement] + const char* pSql; // current sql statement position + int32_t totalRowsNum; + int32_t totalTbNum; + SName targetTableName; + SName usingTableName; + const char* pBoundCols; + struct STableMeta* pTableMeta; + SHashObj* pVgroupsHashObj; + SHashObj* pTableBlockHashObj; // SHashObj + SHashObj* pSubTableHashObj; + SHashObj* pTableNameHashObj; + SHashObj* pDbFNameHashObj; + SArray* pVgDataBlocks; // SArray + SVCreateTbReq* pCreateTblReq; + TdFilePtr fp; + FFreeTableBlockHash freeHashFunc; + FFreeVgourpBlockArray freeArrayFunc; + bool usingTableProcessing; + bool fileProcessing; } SVnodeModifOpStmt; typedef struct SExplainOptions { diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 8c1a85b101..5e35f63240 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -807,7 +807,8 @@ void nodesDestroyNode(SNode* pNode) { if (pStmt->freeArrayFunc) { pStmt->freeArrayFunc(pStmt->pVgDataBlocks); } - tdDestroySVCreateTbReq(&pStmt->createTblReq); + tdDestroySVCreateTbReq(pStmt->pCreateTblReq); + taosMemoryFreeClear(pStmt->pCreateTblReq); taosCloseFile(&pStmt->fp); break; } diff --git a/source/libs/parser/inc/parInsertUtil.h b/source/libs/parser/inc/parInsertUtil.h index 09d55d369f..0aa8c1d248 100644 --- a/source/libs/parser/inc/parInsertUtil.h +++ b/source/libs/parser/inc/parInsertUtil.h @@ -20,8 +20,6 @@ struct SToken; -#define IS_DATA_COL_ORDERED(spd) ((spd->orderStatus) == (int8_t)ORDER_STATUS_ORDERED) - #define NEXT_TOKEN(pSql, sToken) \ do { \ int32_t index = 0; \ @@ -37,6 +35,8 @@ struct SToken; } \ } while (0) +#define IS_DATA_COL_ORDERED(spd) ((spd->orderStatus) == (int8_t)ORDER_STATUS_ORDERED) + typedef enum EOrderStatus { ORDER_STATUS_UNKNOWN = 0, ORDER_STATUS_ORDERED = 1, @@ -141,4 +141,34 @@ int32_t insCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start); int32_t insBuildOutput(SHashObj *pVgroupsHashObj, SArray *pVgDataBlocks, SArray **pDataBlocks); void insDestroyDataBlock(STableDataBlocks *pDataBlock); +typedef struct SBoundColInfo { + int32_t *pColIndex; // bound index => schema index + int32_t numOfCols; + int32_t numOfBound; +} SBoundColInfo; + +typedef struct STableDataCxt { + STableMeta *pMeta; + STSchema *pSchema; + SBoundColInfo boundColsInfo; + SArray *pValues; + SVCreateTbReq *pCreateTblReq; + SSubmitTbData data; +} STableDataCxt; + +typedef struct SVgroupDataCxt { + int32_t vgId; + SSubmitReq2 data; +} SVgroupDataCxt; + +int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo *pInfo); +int32_t insGetTableDataCxt(SHashObj *pHash, void *id, int32_t idLen, STableMeta *pTableMeta, + SVCreateTbReq **pCreateTbReq, STableDataCxt **pTableCxt); +int32_t insMergeTableDataCxt(SHashObj *pTableHash, SArray **pVgDataBlocks); +int32_t insBuildVgDataBlocks(SHashObj *pVgroupsHashObj, SArray *pVgDataBlocks, SArray **pDataBlocks); +void insDestroyTableDataCxtHashMap(SHashObj *pTableCxtHash); +void insDestroyVgroupDataCxt(SVgroupDataCxt *pVgCxt); +void insDestroyVgroupDataCxtList(SArray *pVgCxtList); +void insDestroyVgroupDataCxtHashMap(SHashObj *pVgCxtHash); + #endif // TDENGINE_PAR_INSERT_UTIL_H diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index 541d4c0563..b2cd213723 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -38,12 +38,12 @@ } while (TK_NK_SPACE == (token).type) typedef struct SInsertParseContext { - SParseContext* pComCxt; - SMsgBuf msg; - char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW]; - SParsedDataColInfo tags; // for stmt - bool missCache; - bool usingDuplicateTable; + SParseContext* pComCxt; + SMsgBuf msg; + char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW]; + SBoundColInfo tags; // for stmt + bool missCache; + bool usingDuplicateTable; } SInsertParseContext; typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param); @@ -172,21 +172,19 @@ static int32_t parseDuplicateUsingClause(SInsertParseContext* pCxt, SVnodeModifO } // pStmt->pSql -> field1_name, ...) -static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, bool isTags, - SParsedDataColInfo* pColList, SSchema* pSchema) { - col_id_t nCols = pColList->numOfCols; - - pColList->numOfBound = 0; - pColList->boundNullLen = 0; - memset(pColList->boundColumns, 0, sizeof(col_id_t) * nCols); - for (col_id_t i = 0; i < nCols; ++i) { - pColList->cols[i].valStat = VAL_STAT_NONE; +static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, bool isTags, SSchema* pSchema, + SBoundColInfo* pBoundInfo) { + bool* pUseCols = taosMemoryCalloc(pBoundInfo->numOfCols, sizeof(bool)); + if (NULL == pUseCols) { + return TSDB_CODE_OUT_OF_MEMORY; } - SToken token; - bool isOrdered = true; - col_id_t lastColIdx = -1; // last column found - while (1) { + pBoundInfo->numOfBound = 0; + + int32_t lastColIdx = -1; // last column found + int32_t code = TSDB_CODE_SUCCESS; + while (TSDB_CODE_SUCCESS == code) { + SToken token; NEXT_TOKEN(*pSql, token); if (TK_NK_RP == token.type) { @@ -198,64 +196,30 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, b token.z = tmpTokenBuf; token.n = strdequote(token.z); - col_id_t t = lastColIdx + 1; - col_id_t index = insFindCol(&token, t, nCols, pSchema); + int32_t t = lastColIdx + 1; + int32_t index = insFindCol(&token, t, pBoundInfo->numOfCols, pSchema); if (index < 0 && t > 0) { index = insFindCol(&token, 0, t, pSchema); - isOrdered = false; } if (index < 0) { - return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMN, token.z); - } - if (pColList->cols[index].valStat == VAL_STAT_HAS) { - return buildSyntaxErrMsg(&pCxt->msg, "duplicated column name", token.z); - } - lastColIdx = index; - pColList->cols[index].valStat = VAL_STAT_HAS; - pColList->boundColumns[pColList->numOfBound] = index; - ++pColList->numOfBound; - switch (pSchema[t].type) { - case TSDB_DATA_TYPE_BINARY: - pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + CHAR_BYTES); - break; - case TSDB_DATA_TYPE_NCHAR: - pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE); - break; - default: - pColList->boundNullLen += TYPE_BYTES[pSchema[t].type]; - break; + code = generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMN, token.z); + } else if (pUseCols[index]) { + code = buildSyntaxErrMsg(&pCxt->msg, "duplicated column name", token.z); + } else { + lastColIdx = index; + pUseCols[index] = true; + pBoundInfo->pColIndex[pBoundInfo->numOfBound] = index; + ++pBoundInfo->numOfBound; } } - if (!isTags && pColList->cols[0].valStat == VAL_STAT_NONE) { - return buildInvalidOperationMsg(&pCxt->msg, "primary timestamp column can not be null"); + if (TSDB_CODE_SUCCESS == code && !isTags && !pUseCols[0]) { + code = buildInvalidOperationMsg(&pCxt->msg, "primary timestamp column can not be null"); } - pColList->orderStatus = isOrdered ? ORDER_STATUS_ORDERED : ORDER_STATUS_DISORDERED; + taosMemoryFree(pUseCols); - if (!isOrdered) { - pColList->colIdxInfo = taosMemoryCalloc(pColList->numOfBound, sizeof(SBoundIdxInfo)); - if (NULL == pColList->colIdxInfo) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - SBoundIdxInfo* pColIdx = pColList->colIdxInfo; - for (col_id_t i = 0; i < pColList->numOfBound; ++i) { - pColIdx[i].schemaColIdx = pColList->boundColumns[i]; - pColIdx[i].boundIdx = i; - } - taosSort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), insSchemaIdxCompar); - for (col_id_t i = 0; i < pColList->numOfBound; ++i) { - pColIdx[i].finalIdx = i; - } - taosSort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), insBoundIdxCompar); - } - - if (pColList->numOfCols > pColList->numOfBound) { - memset(&pColList->boundColumns[pColList->numOfBound], 0, - sizeof(col_id_t) * (pColList->numOfCols - pColList->numOfBound)); - } - - return TSDB_CODE_SUCCESS; + return code; } static int parseTime(const char** end, SToken* pToken, int16_t timePrec, int64_t* time, SMsgBuf* pMsgBuf) { @@ -518,8 +482,7 @@ static int32_t parseTagToken(const char** end, SToken* pToken, SSchema* pSchema, // input pStmt->pSql: [(tag1_name, ...)] TAGS (tag1_value, ...) ... // output pStmt->pSql: TAGS (tag1_value, ...) ... static int32_t parseBoundTagsClause(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { - SSchema* pTagsSchema = getTableTagSchema(pStmt->pTableMeta); - insSetBoundColumnInfo(&pCxt->tags, pTagsSchema, getNumOfTags(pStmt->pTableMeta)); + insInitBoundColsInfo(getNumOfTags(pStmt->pTableMeta), &pCxt->tags); SToken token; int32_t index = 0; @@ -529,7 +492,7 @@ static int32_t parseBoundTagsClause(SInsertParseContext* pCxt, SVnodeModifOpStmt } pStmt->pSql += index; - return parseBoundColumns(pCxt, &pStmt->pSql, true, &pCxt->tags, pTagsSchema); + return parseBoundColumns(pCxt, &pStmt->pSql, true, getTableTagSchema(pStmt->pTableMeta), &pCxt->tags); } static int32_t parseTagValue(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, SSchema* pTagSchema, SToken* pToken, @@ -561,7 +524,7 @@ static int32_t parseTagValue(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt } static void buildCreateTbReq(SVnodeModifOpStmt* pStmt, STag* pTag, SArray* pTagName) { - insBuildCreateTbReq(&pStmt->createTblReq, pStmt->targetTableName.tname, pTag, pStmt->pTableMeta->suid, + insBuildCreateTbReq(pStmt->pCreateTblReq, pStmt->targetTableName.tname, pTag, pStmt->pTableMeta->suid, pStmt->usingTableName.tname, pTagName, pStmt->pTableMeta->tableInfo.numOfTags); } @@ -616,7 +579,7 @@ static int32_t parseTagsClauseImpl(SInsertParseContext* pCxt, SVnodeModifOpStmt* break; } - SSchema* pTagSchema = &pSchema[pCxt->tags.boundColumns[i]]; + SSchema* pTagSchema = &pSchema[pCxt->tags.pColIndex[i]]; isJson = pTagSchema->type == TSDB_DATA_TYPE_JSON; code = checkAndTrimValue(&token, pCxt->tmpTokenBuf, &pCxt->msg); if (TSDB_CODE_SUCCESS == code) { @@ -697,8 +660,8 @@ static int32_t parseTableOptions(SInsertParseContext* pCxt, SVnodeModifOpStmt* p if (TK_NK_INTEGER != token.type) { return buildSyntaxErrMsg(&pCxt->msg, "Invalid option ttl", token.z); } - pStmt->createTblReq.ttl = taosStr2Int32(token.z, NULL, 10); - if (pStmt->createTblReq.ttl < 0) { + pStmt->pCreateTblReq->ttl = taosStr2Int32(token.z, NULL, 10); + if (pStmt->pCreateTblReq->ttl < 0) { return buildSyntaxErrMsg(&pCxt->msg, "Invalid option ttl", token.z); } } else if (TK_COMMENT == token.type) { @@ -711,11 +674,11 @@ static int32_t parseTableOptions(SInsertParseContext* pCxt, SVnodeModifOpStmt* p return buildSyntaxErrMsg(&pCxt->msg, "comment too long", token.z); } int32_t len = trimString(token.z, token.n, pCxt->tmpTokenBuf, TSDB_TB_COMMENT_LEN); - pStmt->createTblReq.comment = strndup(pCxt->tmpTokenBuf, len); - if (NULL == pStmt->createTblReq.comment) { + pStmt->pCreateTblReq->comment = strndup(pCxt->tmpTokenBuf, len); + if (NULL == pStmt->pCreateTblReq->comment) { return TSDB_CODE_OUT_OF_MEMORY; } - pStmt->createTblReq.commentLen = len; + pStmt->pCreateTblReq->commentLen = len; } else { break; } @@ -916,22 +879,18 @@ static int32_t preParseBoundColumnsClause(SInsertParseContext* pCxt, SVnodeModif return skipParentheses(pCxt, &pStmt->pSql); } -static int32_t getTableDataBlocks(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataBlocks** pDataBuf) { +static int32_t getTableDataCxt(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataCxt** pTableCxt) { if (pCxt->pComCxt->async) { - return insGetDataBlockFromList(pStmt->pTableBlockHashObj, &pStmt->pTableMeta->uid, sizeof(pStmt->pTableMeta->uid), - TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk), - getTableInfo(pStmt->pTableMeta).rowSize, pStmt->pTableMeta, pDataBuf, NULL, - &pStmt->createTblReq); + return insGetTableDataCxt(pStmt->pTableBlockHashObj, &pStmt->pTableMeta->uid, sizeof(pStmt->pTableMeta->uid), + pStmt->pTableMeta, &pStmt->pCreateTblReq, pTableCxt); } char tbFName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(&pStmt->targetTableName, tbFName); - return insGetDataBlockFromList(pStmt->pTableBlockHashObj, tbFName, strlen(tbFName), TSDB_DEFAULT_PAYLOAD_SIZE, - sizeof(SSubmitBlk), getTableInfo(pStmt->pTableMeta).rowSize, pStmt->pTableMeta, - pDataBuf, NULL, &pStmt->createTblReq); + return insGetTableDataCxt(pStmt->pTableBlockHashObj, tbFName, strlen(tbFName), pStmt->pTableMeta, + &pStmt->pCreateTblReq, pTableCxt); } -static int32_t parseBoundColumnsClause(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, - STableDataBlocks* pDataBuf) { +static int32_t parseBoundColumnsClause(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataCxt* pTableCxt) { SToken token; int32_t index = 0; NEXT_TOKEN_KEEP_SQL(pStmt->pSql, token, index); @@ -941,13 +900,13 @@ static int32_t parseBoundColumnsClause(SInsertParseContext* pCxt, SVnodeModifOpS return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", token.z); } // pStmt->pSql -> field1_name, ...) - return parseBoundColumns(pCxt, &pStmt->pSql, false, &pDataBuf->boundColumnInfo, - getTableColumnSchema(pStmt->pTableMeta)); + return parseBoundColumns(pCxt, &pStmt->pSql, false, getTableColumnSchema(pStmt->pTableMeta), + &pTableCxt->boundColsInfo); } if (NULL != pStmt->pBoundCols) { - return parseBoundColumns(pCxt, &pStmt->pBoundCols, false, &pDataBuf->boundColumnInfo, - getTableColumnSchema(pStmt->pTableMeta)); + return parseBoundColumns(pCxt, &pStmt->pBoundCols, false, getTableColumnSchema(pStmt->pTableMeta), + &pTableCxt->boundColsInfo); } return TSDB_CODE_SUCCESS; @@ -957,14 +916,13 @@ static int32_t parseBoundColumnsClause(SInsertParseContext* pCxt, SVnodeModifOpS // 1. [(tag1_name, ...)] ... // 2. VALUES ... | FILE ... // output pStmt->pSql: VALUES ... | FILE ... -static int32_t parseSchemaClauseBottom(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, - STableDataBlocks** pDataBuf) { +static int32_t parseSchemaClauseBottom(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataCxt** pTableCxt) { int32_t code = parseUsingClauseBottom(pCxt, pStmt); if (TSDB_CODE_SUCCESS == code) { - code = getTableDataBlocks(pCxt, pStmt, pDataBuf); + code = getTableDataCxt(pCxt, pStmt, pTableCxt); } if (TSDB_CODE_SUCCESS == code) { - code = parseBoundColumnsClause(pCxt, pStmt, *pDataBuf); + code = parseBoundColumnsClause(pCxt, pStmt, *pTableCxt); } return code; } @@ -987,108 +945,88 @@ static int32_t parseSchemaClauseTop(SInsertParseContext* pCxt, SVnodeModifOpStmt } static int32_t parseValueTokenImpl(SInsertParseContext* pCxt, const char** pSql, SToken* pToken, SSchema* pSchema, - int16_t timePrec, _row_append_fn_t func, void* param) { - int64_t iv; - uint64_t uv; - char* endptr = NULL; - + int16_t timePrec, SColVal* pVal) { switch (pSchema->type) { case TSDB_DATA_TYPE_BOOL: { if ((pToken->type == TK_NK_BOOL || pToken->type == TK_NK_STRING) && (pToken->n != 0)) { if (strncmp(pToken->z, "true", pToken->n) == 0) { - return func(&pCxt->msg, &TRUE_VALUE, pSchema->bytes, param); + pVal->value.val = TRUE_VALUE; } else if (strncmp(pToken->z, "false", pToken->n) == 0) { - return func(&pCxt->msg, &FALSE_VALUE, pSchema->bytes, param); + pVal->value.val = FALSE_VALUE; } else { return buildSyntaxErrMsg(&pCxt->msg, "invalid bool data", pToken->z); } } else if (pToken->type == TK_NK_INTEGER) { - return func(&pCxt->msg, ((taosStr2Int64(pToken->z, NULL, 10) == 0) ? &FALSE_VALUE : &TRUE_VALUE), - pSchema->bytes, param); + pVal->value.val = ((taosStr2Int64(pToken->z, NULL, 10) == 0) ? FALSE_VALUE : TRUE_VALUE); } else if (pToken->type == TK_NK_FLOAT) { - return func(&pCxt->msg, ((taosStr2Double(pToken->z, NULL) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, - param); + pVal->value.val = ((taosStr2Double(pToken->z, NULL) == 0) ? FALSE_VALUE : TRUE_VALUE); } else { return buildSyntaxErrMsg(&pCxt->msg, "invalid bool data", pToken->z); } + break; } - case TSDB_DATA_TYPE_TINYINT: { - if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) { + if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &pVal->value.val)) { return buildSyntaxErrMsg(&pCxt->msg, "invalid tinyint data", pToken->z); - } else if (!IS_VALID_TINYINT(iv)) { + } else if (!IS_VALID_TINYINT(pVal->value.val)) { return buildSyntaxErrMsg(&pCxt->msg, "tinyint data overflow", pToken->z); } - - uint8_t tmpVal = (uint8_t)iv; - return func(&pCxt->msg, &tmpVal, pSchema->bytes, param); + break; } - case TSDB_DATA_TYPE_UTINYINT: { - if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) { + if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &pVal->value.val)) { return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned tinyint data", pToken->z); - } else if (uv > UINT8_MAX) { + } else if (pVal->value.val > UINT8_MAX) { return buildSyntaxErrMsg(&pCxt->msg, "unsigned tinyint data overflow", pToken->z); } - uint8_t tmpVal = (uint8_t)uv; - return func(&pCxt->msg, &tmpVal, pSchema->bytes, param); + break; } - case TSDB_DATA_TYPE_SMALLINT: { - if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) { + if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &pVal->value.val)) { return buildSyntaxErrMsg(&pCxt->msg, "invalid smallint data", pToken->z); - } else if (!IS_VALID_SMALLINT(iv)) { + } else if (!IS_VALID_SMALLINT(pVal->value.val)) { return buildSyntaxErrMsg(&pCxt->msg, "smallint data overflow", pToken->z); } - int16_t tmpVal = (int16_t)iv; - return func(&pCxt->msg, &tmpVal, pSchema->bytes, param); + break; } - case TSDB_DATA_TYPE_USMALLINT: { - if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) { + if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &pVal->value.val)) { return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned smallint data", pToken->z); - } else if (uv > UINT16_MAX) { + } else if (pVal->value.val > UINT16_MAX) { return buildSyntaxErrMsg(&pCxt->msg, "unsigned smallint data overflow", pToken->z); } - uint16_t tmpVal = (uint16_t)uv; - return func(&pCxt->msg, &tmpVal, pSchema->bytes, param); + break; } - case TSDB_DATA_TYPE_INT: { - if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) { + if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &pVal->value.val)) { return buildSyntaxErrMsg(&pCxt->msg, "invalid int data", pToken->z); - } else if (!IS_VALID_INT(iv)) { + } else if (!IS_VALID_INT(pVal->value.val)) { return buildSyntaxErrMsg(&pCxt->msg, "int data overflow", pToken->z); } - int32_t tmpVal = (int32_t)iv; - return func(&pCxt->msg, &tmpVal, pSchema->bytes, param); + break; } - case TSDB_DATA_TYPE_UINT: { - if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) { + if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &pVal->value.val)) { return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned int data", pToken->z); - } else if (uv > UINT32_MAX) { + } else if (pVal->value.val > UINT32_MAX) { return buildSyntaxErrMsg(&pCxt->msg, "unsigned int data overflow", pToken->z); } - uint32_t tmpVal = (uint32_t)uv; - return func(&pCxt->msg, &tmpVal, pSchema->bytes, param); + break; } - case TSDB_DATA_TYPE_BIGINT: { - if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) { + if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &pVal->value.val)) { return buildSyntaxErrMsg(&pCxt->msg, "invalid bigint data", pToken->z); } - return func(&pCxt->msg, &iv, pSchema->bytes, param); + break; } - case TSDB_DATA_TYPE_UBIGINT: { - if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) { + if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &pVal->value.val)) { return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned bigint data", pToken->z); } - return func(&pCxt->msg, &uv, pSchema->bytes, param); + break; } - case TSDB_DATA_TYPE_FLOAT: { + char* endptr = NULL; double dv; if (TK_NK_ILLEGAL == toDouble(pToken, &dv, &endptr)) { return buildSyntaxErrMsg(&pCxt->msg, "illegal float data", pToken->z); @@ -1097,11 +1035,11 @@ static int32_t parseValueTokenImpl(SInsertParseContext* pCxt, const char** pSql, isnan(dv)) { return buildSyntaxErrMsg(&pCxt->msg, "illegal float data", pToken->z); } - float tmpVal = (float)dv; - return func(&pCxt->msg, &tmpVal, pSchema->bytes, param); + pVal->value.val = *(int64_t*)&dv; + break; } - case TSDB_DATA_TYPE_DOUBLE: { + char* endptr = NULL; double dv; if (TK_NK_ILLEGAL == toDouble(pToken, &dv, &endptr)) { return buildSyntaxErrMsg(&pCxt->msg, "illegal double data", pToken->z); @@ -1109,49 +1047,67 @@ static int32_t parseValueTokenImpl(SInsertParseContext* pCxt, const char** pSql, if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) { return buildSyntaxErrMsg(&pCxt->msg, "illegal double data", pToken->z); } - return func(&pCxt->msg, &dv, pSchema->bytes, param); + pVal->value.val = *(int64_t*)&dv; + break; } - case TSDB_DATA_TYPE_BINARY: { // Too long values will raise the invalid sql error message if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name); } - - return func(&pCxt->msg, pToken->z, pToken->n, param); + pVal->value.pData = pToken->z; + pVal->value.nData = pToken->n; + break; } - case TSDB_DATA_TYPE_NCHAR: { - return func(&pCxt->msg, pToken->z, pToken->n, param); + // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long' + int32_t len = 0; + char* pUcs4 = taosMemoryCalloc(1, pSchema->bytes - VARSTR_HEADER_SIZE); + if (NULL == pUcs4) { + return TSDB_CODE_OUT_OF_MEMORY; + } + if (!taosMbsToUcs4(pToken->z, pToken->n, (TdUcs4*)pUcs4, pSchema->bytes - VARSTR_HEADER_SIZE, &len)) { + if (errno == E2BIG) { + return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name); + } + char buf[512] = {0}; + snprintf(buf, tListLen(buf), "%s", strerror(errno)); + return buildSyntaxErrMsg(&pCxt->msg, buf, pToken->z); + } + pVal->value.pData = pUcs4; + pVal->value.nData = len; + break; } case TSDB_DATA_TYPE_JSON: { if (pToken->n > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) { return buildSyntaxErrMsg(&pCxt->msg, "json string too long than 4095", pToken->z); } - return func(&pCxt->msg, pToken->z, pToken->n, param); + pVal->value.pData = pToken->z; + pVal->value.nData = pToken->n; + break; } case TSDB_DATA_TYPE_TIMESTAMP: { - int64_t tmpVal; - if (parseTime(pSql, pToken, timePrec, &tmpVal, &pCxt->msg) != TSDB_CODE_SUCCESS) { + if (parseTime(pSql, pToken, timePrec, &pVal->value.val, &pCxt->msg) != TSDB_CODE_SUCCESS) { return buildSyntaxErrMsg(&pCxt->msg, "invalid timestamp", pToken->z); } - - return func(&pCxt->msg, &tmpVal, pSchema->bytes, param); + break; } + default: + return TSDB_CODE_FAILED; } - return TSDB_CODE_FAILED; + return TSDB_CODE_SUCCESS; } static int32_t parseValueToken(SInsertParseContext* pCxt, const char** pSql, SToken* pToken, SSchema* pSchema, - int16_t timePrec, _row_append_fn_t func, void* param) { + int16_t timePrec, SColVal* pVal) { int32_t code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg); if (TSDB_CODE_SUCCESS == code && isNullValue(pSchema->type, pToken)) { if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) { return buildSyntaxErrMsg(&pCxt->msg, "primary timestamp should not be null", pToken->z); } - - return func(&pCxt->msg, NULL, 0, param); + pVal->flag = CV_FLAG_NULL; + return TSDB_CODE_SUCCESS; } if (TSDB_CODE_SUCCESS == code && IS_NUMERIC_TYPE(pSchema->type) && pToken->n == 0) { @@ -1159,26 +1115,24 @@ static int32_t parseValueToken(SInsertParseContext* pCxt, const char** pSql, STo } if (TSDB_CODE_SUCCESS == code) { - code = parseValueTokenImpl(pCxt, pSql, pToken, pSchema, timePrec, func, param); + code = parseValueTokenImpl(pCxt, pSql, pToken, pSchema, timePrec, pVal); } return code; } -static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataBlocks* pDataBuf, bool* pGotRow, +static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataCxt* pTableCxt, bool* pGotRow, SToken* pToken) { - SRowBuilder* pBuilder = &pDataBuf->rowBuilder; - STSRow* row = (STSRow*)(pDataBuf->pData + pDataBuf->size); // skip the SSubmitBlk header - SParsedDataColInfo* pCols = &pDataBuf->boundColumnInfo; - bool isParseBindParam = false; - SSchema* pSchemas = getTableColumnSchema(pDataBuf->pTableMeta); - SMemParam param = {.rb = pBuilder}; + SBoundColInfo* pCols = &pTableCxt->boundColsInfo; + bool isParseBindParam = false; + SSchema* pSchemas = getTableColumnSchema(pTableCxt->pMeta); - int32_t code = tdSRowResetBuf(pBuilder, row); + int32_t code = TSDB_CODE_SUCCESS; // 1. set the parsed value from sql string for (int i = 0; i < pCols->numOfBound && TSDB_CODE_SUCCESS == code; ++i) { NEXT_TOKEN_WITH_PREV(*pSql, *pToken); - SSchema* pSchema = &pSchemas[pCols->boundColumns[i]]; + SSchema* pSchema = &pSchemas[pCols->pColIndex[i]]; + SColVal* pVal = taosArrayGet(pTableCxt->pValues, pCols->pColIndex[i]); if (pToken->type == TK_NK_QUESTION) { isParseBindParam = true; @@ -1197,10 +1151,7 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataB } if (TSDB_CODE_SUCCESS == code) { - param.schema = pSchema; - insGetSTSRowAppendInfo(pBuilder->rowType, pCols, i, ¶m.toffset, ¶m.colIdx); - code = parseValueToken(pCxt, pSql, pToken, pSchema, getTableInfo(pDataBuf->pTableMeta).precision, insMemRowAppend, - ¶m); + code = parseValueToken(pCxt, pSql, pToken, pSchema, getTableInfo(pTableCxt->pMeta).precision, pVal); } if (TSDB_CODE_SUCCESS == code && i < pCols->numOfBound - 1) { @@ -1212,64 +1163,22 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataB } if (TSDB_CODE_SUCCESS == code) { - TSKEY tsKey = TD_ROW_KEY(row); - code = insCheckTimestamp(pDataBuf, (const char*)&tsKey); + SRow** pRow = taosArrayReserve(pTableCxt->data.aRowP, 1); + code = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow); } if (TSDB_CODE_SUCCESS == code && !isParseBindParam) { - // set the null value for the columns that do not assign values - if ((pCols->numOfBound < pCols->numOfCols) && TD_IS_TP_ROW(row)) { - pBuilder->hasNone = true; - } - - tdSRowEnd(pBuilder); - *pGotRow = true; - -#ifdef TD_DEBUG_PRINT_ROW - STSchema* pSTSchema = tBuildTSchema(schema, spd->numOfCols, 1); - tdSRowPrint(row, pSTSchema, __func__); - taosMemoryFree(pSTSchema); -#endif } return code; } -static int32_t allocateMemIfNeed(STableDataBlocks* pDataBlock, int32_t rowSize, int32_t* numOfRows) { - size_t remain = pDataBlock->nAllocSize - pDataBlock->size; - const int factor = 5; - uint32_t nAllocSizeOld = pDataBlock->nAllocSize; - - // expand the allocated size - if (remain < rowSize * factor) { - while (remain < rowSize * factor) { - pDataBlock->nAllocSize = (uint32_t)(pDataBlock->nAllocSize * 1.5); - remain = pDataBlock->nAllocSize - pDataBlock->size; - } - - char* tmp = taosMemoryRealloc(pDataBlock->pData, (size_t)pDataBlock->nAllocSize); - if (tmp != NULL) { - pDataBlock->pData = tmp; - memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size); - } else { - // do nothing, if allocate more memory failed - pDataBlock->nAllocSize = nAllocSizeOld; - *numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize; - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - } - - *numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize; - return TSDB_CODE_SUCCESS; -} - // pSql -> (field1_value, ...) [(field1_value2, ...) ...] -static int32_t parseValues(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataBlocks* pDataBuf, - int32_t maxRows, int32_t* pNumOfRows, SToken* pToken) { - int32_t code = insInitRowBuilder(&pDataBuf->rowBuilder, pDataBuf->pTableMeta->sversion, &pDataBuf->boundColumnInfo); +static int32_t parseValues(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataCxt* pTableCxt, + int32_t* pNumOfRows, SToken* pToken) { + int32_t code = TSDB_CODE_SUCCESS; - int32_t extendedRowSize = insGetExtendedRowSize(pDataBuf); (*pNumOfRows) = 0; while (TSDB_CODE_SUCCESS == code) { int32_t index = 0; @@ -1279,13 +1188,9 @@ static int32_t parseValues(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, } pStmt->pSql += index; - if ((*pNumOfRows) >= maxRows || pDataBuf->size + extendedRowSize >= pDataBuf->nAllocSize) { - code = allocateMemIfNeed(pDataBuf, extendedRowSize, &maxRows); - } - bool gotRow = false; if (TSDB_CODE_SUCCESS == code) { - code = parseOneRow(pCxt, &pStmt->pSql, pDataBuf, &gotRow, pToken); + code = parseOneRow(pCxt, &pStmt->pSql, pTableCxt, &gotRow, pToken); } if (TSDB_CODE_SUCCESS == code) { @@ -1298,7 +1203,6 @@ static int32_t parseValues(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, } if (TSDB_CODE_SUCCESS == code && gotRow) { - pDataBuf->size += extendedRowSize; (*pNumOfRows)++; } } @@ -1311,19 +1215,11 @@ static int32_t parseValues(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, } // VALUES (field1_value, ...) [(field1_value2, ...) ...] -static int32_t parseValuesClause(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataBlocks* pDataBuf, +static int32_t parseValuesClause(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataCxt* pTableCxt, SToken* pToken) { - int32_t maxNumOfRows = 0; int32_t numOfRows = 0; - int32_t code = allocateMemIfNeed(pDataBuf, insGetExtendedRowSize(pDataBuf), &maxNumOfRows); + int32_t code = parseValues(pCxt, pStmt, pTableCxt, &numOfRows, pToken); if (TSDB_CODE_SUCCESS == code) { - code = parseValues(pCxt, pStmt, pDataBuf, maxNumOfRows, &numOfRows, pToken); - } - if (TSDB_CODE_SUCCESS == code) { - code = insSetBlockInfo((SSubmitBlk*)(pDataBuf->pData), pDataBuf, numOfRows, &pCxt->msg); - } - if (TSDB_CODE_SUCCESS == code) { - pDataBuf->numOfTables = 1; pStmt->totalRowsNum += numOfRows; pStmt->totalTbNum += 1; TSDB_QUERY_SET_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_INSERT); @@ -1331,11 +1227,9 @@ static int32_t parseValuesClause(SInsertParseContext* pCxt, SVnodeModifOpStmt* p return code; } -static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataBlocks* pDataBuf, - int maxRows, int32_t* pNumOfRows) { - int32_t code = insInitRowBuilder(&pDataBuf->rowBuilder, pDataBuf->pTableMeta->sversion, &pDataBuf->boundColumnInfo); - - int32_t extendedRowSize = insGetExtendedRowSize(pDataBuf); +static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataCxt* pTableCxt, + int32_t* pNumOfRows) { + int32_t code = TSDB_CODE_SUCCESS; (*pNumOfRows) = 0; char* pLine = NULL; int64_t readLen = 0; @@ -1349,24 +1243,19 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, continue; } - if ((*pNumOfRows) >= maxRows || pDataBuf->size + extendedRowSize >= pDataBuf->nAllocSize) { - code = allocateMemIfNeed(pDataBuf, extendedRowSize, &maxRows); - } - bool gotRow = false; if (TSDB_CODE_SUCCESS == code) { SToken token; strtolower(pLine, pLine); const char* pRow = pLine; - code = parseOneRow(pCxt, (const char**)&pRow, pDataBuf, &gotRow, &token); + code = parseOneRow(pCxt, (const char**)&pRow, pTableCxt, &gotRow, &token); } if (TSDB_CODE_SUCCESS == code && gotRow) { - pDataBuf->size += extendedRowSize; (*pNumOfRows)++; } - if (TSDB_CODE_SUCCESS == code && pDataBuf->nAllocSize > tsMaxMemUsedByInsert * 1024 * 1024) { + if (TSDB_CODE_SUCCESS == code && (*pNumOfRows) > tsMaxMemUsedByInsert * 1024 * 1024) { pStmt->fileProcessing = true; break; } @@ -1380,18 +1269,10 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, return code; } -static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataBlocks* pDataBuf) { - int32_t maxNumOfRows = 0; +static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataCxt* pTableCxt) { int32_t numOfRows = 0; - int32_t code = allocateMemIfNeed(pDataBuf, insGetExtendedRowSize(pDataBuf), &maxNumOfRows); + int32_t code = parseCsvFile(pCxt, pStmt, pTableCxt, &numOfRows); if (TSDB_CODE_SUCCESS == code) { - code = parseCsvFile(pCxt, pStmt, pDataBuf, maxNumOfRows, &numOfRows); - } - if (TSDB_CODE_SUCCESS == code) { - code = insSetBlockInfo((SSubmitBlk*)(pDataBuf->pData), pDataBuf, numOfRows, &pCxt->msg); - } - if (TSDB_CODE_SUCCESS == code) { - pDataBuf->numOfTables = 1; pStmt->totalRowsNum += numOfRows; pStmt->totalTbNum += 1; TSDB_QUERY_SET_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_FILE_INSERT); @@ -1405,7 +1286,7 @@ static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifOpStm } static int32_t parseDataFromFile(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, SToken* pFilePath, - STableDataBlocks* pDataBuf) { + STableDataCxt* pTableCxt) { char filePathStr[TSDB_FILENAME_LEN] = {0}; if (TK_NK_STRING == pFilePath->type) { trimString(pFilePath->z, pFilePath->n, filePathStr, sizeof(filePathStr)); @@ -1417,27 +1298,27 @@ static int32_t parseDataFromFile(SInsertParseContext* pCxt, SVnodeModifOpStmt* p return TAOS_SYSTEM_ERROR(errno); } - return parseDataFromFileImpl(pCxt, pStmt, pDataBuf); + return parseDataFromFileImpl(pCxt, pStmt, pTableCxt); } -static int32_t parseFileClause(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataBlocks* pDataBuf, +static int32_t parseFileClause(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataCxt* pTableCxt, SToken* pToken) { NEXT_TOKEN(pStmt->pSql, *pToken); if (0 == pToken->n || (TK_NK_STRING != pToken->type && TK_NK_ID != pToken->type)) { return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", pToken->z); } - return parseDataFromFile(pCxt, pStmt, pToken, pDataBuf); + return parseDataFromFile(pCxt, pStmt, pToken, pTableCxt); } // VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path -static int32_t parseDataClause(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataBlocks* pDataBuf) { +static int32_t parseDataClause(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataCxt* pTableCxt) { SToken token; NEXT_TOKEN(pStmt->pSql, token); switch (token.type) { case TK_VALUES: - return parseValuesClause(pCxt, pStmt, pDataBuf, &token); + return parseValuesClause(pCxt, pStmt, pTableCxt, &token); case TK_FILE: - return parseFileClause(pCxt, pStmt, pDataBuf, &token); + return parseFileClause(pCxt, pStmt, pTableCxt, &token); default: break; } @@ -1448,18 +1329,19 @@ static int32_t parseDataClause(SInsertParseContext* pCxt, SVnodeModifOpStmt* pSt // 1. [(tag1_name, ...)] ... // 2. VALUES ... | FILE ... static int32_t parseInsertTableClauseBottom(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { - STableDataBlocks* pDataBuf = NULL; - int32_t code = parseSchemaClauseBottom(pCxt, pStmt, &pDataBuf); + STableDataCxt* pTableCxt = NULL; + int32_t code = parseSchemaClauseBottom(pCxt, pStmt, &pTableCxt); if (TSDB_CODE_SUCCESS == code) { - code = parseDataClause(pCxt, pStmt, pDataBuf); + code = parseDataClause(pCxt, pStmt, pTableCxt); } return code; } static void resetEnvPreTable(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { - destroyBoundColumnInfo(&pCxt->tags); + // destroyBoundColumnInfo(&pCxt->tags); taosMemoryFreeClear(pStmt->pTableMeta); - tdDestroySVCreateTbReq(&pStmt->createTblReq); + tdDestroySVCreateTbReq(pStmt->pCreateTblReq); + taosMemoryFreeClear(pStmt->pCreateTblReq); pCxt->missCache = false; pCxt->usingDuplicateTable = false; pStmt->pBoundCols = NULL; @@ -1517,6 +1399,7 @@ static int32_t checkTableClauseFirstToken(SInsertParseContext* pCxt, SVnodeModif } static int32_t setStmtInfo(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { +#if 0 SParsedDataColInfo* tags = taosMemoryMalloc(sizeof(pCxt->tags)); if (NULL == tags) { return TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -1533,6 +1416,9 @@ static int32_t setStmtInfo(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) pStmt->pVgroupsHashObj = NULL; pStmt->pTableBlockHashObj = NULL; return code; +#else + return TSDB_CODE_FAILED; +#endif } static int32_t parseInsertBodyBottom(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { @@ -1541,13 +1427,11 @@ static int32_t parseInsertBodyBottom(SInsertParseContext* pCxt, SVnodeModifOpStm } // merge according to vgId - int32_t code = TSDB_CODE_SUCCESS; - if (taosHashGetSize(pStmt->pTableBlockHashObj) > 0) { - code = insMergeTableDataBlocks(pStmt->pTableBlockHashObj, &pStmt->pVgDataBlocks); - } + int32_t code = insMergeTableDataCxt(pStmt->pTableBlockHashObj, &pStmt->pVgDataBlocks); if (TSDB_CODE_SUCCESS == code) { - code = insBuildOutput(pStmt->pVgroupsHashObj, pStmt->pVgDataBlocks, &pStmt->pDataBlocks); + code = insBuildVgDataBlocks(pStmt->pVgroupsHashObj, pStmt->pVgDataBlocks, &pStmt->pDataBlocks); } + return code; } @@ -1588,8 +1472,8 @@ static int32_t createVnodeModifOpStmt(SInsertParseContext* pCxt, bool reentry, S TSDB_QUERY_SET_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT); } pStmt->pSql = pCxt->pComCxt->pSql; - pStmt->freeHashFunc = insDestroyBlockHashmap; - pStmt->freeArrayFunc = insDestroyBlockArrayList; + pStmt->freeHashFunc = insDestroyTableDataCxtHashMap; + pStmt->freeArrayFunc = insDestroyVgroupDataCxtList; if (!reentry) { pStmt->pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); @@ -1792,10 +1676,10 @@ static int32_t parseInsertSqlFromStart(SInsertParseContext* pCxt, SVnodeModifOpS } static int32_t parseInsertSqlFromCsv(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { - STableDataBlocks* pDataBuf = NULL; - int32_t code = getTableDataBlocks(pCxt, pStmt, &pDataBuf); + STableDataCxt* pTableCxt = NULL; + int32_t code = getTableDataCxt(pCxt, pStmt, &pTableCxt); if (TSDB_CODE_SUCCESS == code) { - code = parseDataFromFileImpl(pCxt, pStmt, pDataBuf); + code = parseDataFromFileImpl(pCxt, pStmt, pTableCxt); } if (TSDB_CODE_SUCCESS == code) { @@ -1919,6 +1803,6 @@ int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatal QUERY_EXEC_STAGE_SCHEDULE == (*pQuery)->execStage) { code = setRefreshMate(*pQuery); } - destroyBoundColumnInfo(&context.tags); + // destroyBoundColumnInfo(&context.tags); return code; } diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index f6fb8e27d6..cc9076a114 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -955,3 +955,277 @@ int32_t insBuildOutput(SHashObj* pVgroupsHashObj, SArray* pVgDataBlocks, SArray* } return TSDB_CODE_SUCCESS; } + +static void initBoundCols(int32_t ncols, int32_t* pBoundCols) { + for (int32_t i = 0; i < ncols; ++i) { + pBoundCols[i] = i; + } +} + +static void initColValues(STableMeta* pTableMeta, SArray* pValues) { + SSchema* pSchemas = getTableColumnSchema(pTableMeta); + for (int32_t i = 0; i < pTableMeta->tableInfo.numOfColumns; ++i) { + SColVal val = COL_VAL_NONE(pSchemas[i].colId, pSchemas[i].type); + taosArrayPush(pValues, &val); + } +} + +int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo* pInfo) { + pInfo->numOfCols = numOfBound; + pInfo->numOfBound = numOfBound; + pInfo->pColIndex = taosMemoryCalloc(numOfBound, sizeof(int32_t)); + if (NULL == pInfo->pColIndex) { + return TSDB_CODE_OUT_OF_MEMORY; + } + initBoundCols(numOfBound, pInfo->pColIndex); + return TSDB_CODE_SUCCESS; +} + +static void destroyBoundColInfo(SBoundColInfo* pInfo) { taosMemoryFreeClear(pInfo->pColIndex); } + +static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreateTbReq, STableDataCxt** pOutput) { + STableDataCxt* pTableCxt = taosMemoryCalloc(1, sizeof(STableDataCxt)); + if (NULL == pTableCxt) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + int32_t code = TSDB_CODE_SUCCESS; + + pTableCxt->pMeta = tableMetaDup(pTableMeta); + if (NULL == pTableCxt->pMeta) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + if (TSDB_CODE_SUCCESS == code) { + pTableCxt->pSchema = + tBuildTSchema(getTableColumnSchema(pTableMeta), pTableMeta->tableInfo.numOfColumns, pTableMeta->sversion); + if (NULL == pTableCxt->pSchema) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + if (TSDB_CODE_SUCCESS == code) { + code = insInitBoundColsInfo(pTableMeta->tableInfo.numOfColumns, &pTableCxt->boundColsInfo); + } + if (TSDB_CODE_SUCCESS == code) { + pTableCxt->pValues = taosArrayInit(pTableMeta->tableInfo.numOfColumns, sizeof(SColVal)); + if (NULL == pTableCxt->pValues) { + code = TSDB_CODE_OUT_OF_MEMORY; + } else { + initColValues(pTableMeta, pTableCxt->pValues); + } + } + if (TSDB_CODE_SUCCESS == code) { + pTableCxt->data.aRowP = taosArrayInit(128, POINTER_BYTES); + if (NULL == pTableCxt->data.aRowP) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + + if (TSDB_CODE_SUCCESS == code) { + pTableCxt->pCreateTblReq = *pCreateTbReq; + *pCreateTbReq = NULL; + *pOutput = pTableCxt; + } else { + taosMemoryFree(pTableCxt); + } + + return code; +} + +int32_t insGetTableDataCxt(SHashObj* pHash, void* id, int32_t idLen, STableMeta* pTableMeta, + SVCreateTbReq** pCreateTbReq, STableDataCxt** pTableCxt) { + *pTableCxt = taosHashGet(pHash, id, idLen); + if (NULL != *pTableCxt) { + return TSDB_CODE_SUCCESS; + } + int32_t code = createTableDataCxt(pTableMeta, pCreateTbReq, pTableCxt); + if (TSDB_CODE_SUCCESS == code) { + code = taosHashPut(pHash, id, idLen, pTableCxt, POINTER_BYTES); + } + return code; +} + +void insDestroyTableDataCxt(STableDataCxt* pTableCxt) { + if (NULL == pTableCxt) { + return; + } + + taosMemoryFreeClear(pTableCxt->pMeta); + tDestroyTSchema(pTableCxt->pSchema); + destroyBoundColInfo(&pTableCxt->boundColsInfo); + taosArrayDestroyEx(pTableCxt->pValues, NULL /*todo*/); + tdDestroySVCreateTbReq(pTableCxt->pCreateTblReq); + taosMemoryFreeClear(pTableCxt->pCreateTblReq); + // todo free SSubmitTbData +} + +void insDestroyVgroupDataCxt(SVgroupDataCxt* pVgCxt) { + if (NULL == pVgCxt) { + return; + } + + tDestroySSubmitReq2(&pVgCxt->data); +} + +void insDestroyVgroupDataCxtList(SArray* pVgCxtList) { + if (NULL == pVgCxtList) { + return; + } + + size_t size = taosArrayGetSize(pVgCxtList); + for (int32_t i = 0; i < size; i++) { + void* p = taosArrayGetP(pVgCxtList, i); + insDestroyVgroupDataCxt(p); + } + + taosArrayDestroy(pVgCxtList); +} + +void insDestroyVgroupDataCxtHashMap(SHashObj* pVgCxtHash) { + if (NULL == pVgCxtHash) { + return; + } + + void** p = taosHashIterate(pVgCxtHash, NULL); + while (p) { + insDestroyVgroupDataCxt(*(SVgroupDataCxt**)p); + + p = taosHashIterate(pVgCxtHash, p); + } + + taosHashCleanup(pVgCxtHash); +} + +void insDestroyTableDataCxtHashMap(SHashObj* pTableCxtHash) { + if (NULL == pTableCxtHash) { + return; + } + + void** p = taosHashIterate(pTableCxtHash, NULL); + while (p) { + insDestroyTableDataCxt(*(STableDataCxt**)p); + + p = taosHashIterate(pTableCxtHash, p); + } + + taosHashCleanup(pTableCxtHash); +} + +static int32_t fillVgroupDataCxt(STableDataCxt* pTableCxt, SVgroupDataCxt* pVgCxt) { + if (NULL != pTableCxt->pCreateTblReq) { + if (NULL == pVgCxt->data.aCreateTbReq) { + pVgCxt->data.aCreateTbReq = taosArrayInit(128, sizeof(SVCreateTbReq)); + if (NULL == pVgCxt->data.aCreateTbReq) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + taosArrayPush(pVgCxt->data.aCreateTbReq, pTableCxt->pCreateTblReq); + } + + if (NULL == pVgCxt->data.aSubmitTbData) { + pVgCxt->data.aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData)); + if (NULL == pVgCxt->data.aSubmitTbData) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + taosArrayPush(pVgCxt->data.aSubmitTbData, &pTableCxt->data); + + return TSDB_CODE_SUCCESS; +} + +int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) { + SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false); + SArray* pVgroupList = taosArrayInit(8, POINTER_BYTES); + if (NULL == pVgroupHash || NULL == pVgroupList) { + taosHashCleanup(pVgroupHash); + taosArrayDestroy(pVgroupList); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + int32_t code = TSDB_CODE_SUCCESS; + + void* p = taosHashIterate(pTableHash, NULL); + while (TSDB_CODE_SUCCESS == code && NULL != p) { + STableDataCxt* pTableCxt = *(STableDataCxt**)p; + code = tRowMergeSort(pTableCxt->data.aRowP, pTableCxt->pSchema); + if (TSDB_CODE_SUCCESS == code) { + int32_t vgId = pTableCxt->pMeta->vgId; + SVgroupDataCxt* pVgCxt = taosHashGet(pVgroupHash, &vgId, sizeof(vgId)); + if (NULL == pVgCxt) { + SVgroupDataCxt vgCxt = {.vgId = vgId}; + code = fillVgroupDataCxt(pTableCxt, &vgCxt); + if (TSDB_CODE_SUCCESS == code) { + code = taosHashPut(pVgroupHash, &vgId, sizeof(vgId), &vgCxt, sizeof(vgCxt)); + } + } else { + code = fillVgroupDataCxt(pTableCxt, pVgCxt); + } + } + if (TSDB_CODE_SUCCESS == code) { + p = taosHashIterate(pTableHash, p); + } + } + + taosHashCleanup(pVgroupHash); + if (TSDB_CODE_SUCCESS == code) { + *pVgDataBlocks = pVgroupList; + } else { + taosArrayDestroy(pVgroupList); + } + + return code; +} + +static int32_t buildSubmitReq(SSubmitReq2* pReq, void** pData, uint32_t* pLen) { + int32_t code = TSDB_CODE_SUCCESS; + tEncodeSize(tEncodeSSubmitReq2, pReq, *pLen, code); + if (TSDB_CODE_SUCCESS == code) { + SEncoder encoder; + *pData = taosMemoryMalloc(*pLen); + if (NULL == *pData) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + tEncoderInit(&encoder, *pData, *pLen); + code = tEncodeSSubmitReq2(&encoder, pReq); + tEncoderClear(&encoder); + } + if (TSDB_CODE_SUCCESS != code) { + taosMemoryFreeClear(*pData); + *pLen = 0; + } + return code; +} + +int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList, SArray** pVgDataBlocks) { + size_t numOfVg = taosArrayGetSize(pVgDataCxtList); + SArray* pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES); + if (NULL == pDataBlocks) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + int32_t code = TSDB_CODE_SUCCESS; + for (size_t i = 0; TSDB_CODE_SUCCESS == code && i < numOfVg; ++i) { + SVgroupDataCxt* src = taosArrayGetP(pVgDataCxtList, i); + SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks)); + if (NULL == dst) { + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + } + if (TSDB_CODE_SUCCESS == code) { + dst->numOfTables = taosArrayGetSize(src->data.aSubmitTbData); + code = taosHashGetDup(pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg); + } + if (TSDB_CODE_SUCCESS == code) { + code = buildSubmitReq(&src->data, &dst->pData, &dst->size); + } + if (TSDB_CODE_SUCCESS == code) { + code = (NULL == taosArrayPush(pDataBlocks, &dst) ? TSDB_CODE_TSC_OUT_OF_MEMORY : TSDB_CODE_SUCCESS); + } + } + + if (TSDB_CODE_SUCCESS == code) { + *pVgDataBlocks = pDataBlocks; + } else { + // todo + } + + return code; +} From faf156105d1313967be90dd99ca5f0b7074296e0 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Mon, 28 Nov 2022 00:08:41 +0800 Subject: [PATCH 2/2] enh: replace row format --- include/common/tmsg.h | 1 + source/common/src/tmsg.c | 4 ++ source/libs/parser/inc/parInsertUtil.h | 6 +- source/libs/parser/src/parInsertSql.c | 3 +- source/libs/parser/src/parInsertUtil.c | 97 ++++++++++++++++++-------- 5 files changed, 77 insertions(+), 34 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 1a4049a8a2..2b1af66cc6 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3250,6 +3250,7 @@ typedef struct { int32_t tEncodeSSubmitReq2(SEncoder* pCoder, const SSubmitReq2* pReq); int32_t tDecodeSSubmitReq2(SDecoder* pCoder, SSubmitReq2** ppReq); +void tDestroySSubmitTbData(SSubmitTbData* pTbData); void tDestroySSubmitReq2(SSubmitReq2* pReq); #pragma pack(pop) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index ed253294be..17083a682a 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -6830,6 +6830,10 @@ _exit: return 0; } +void tDestroySSubmitTbData(SSubmitTbData *pTbData) { + // todo +} + void tDestroySSubmitReq2(SSubmitReq2 *pReq) { if (NULL == pReq) return; diff --git a/source/libs/parser/inc/parInsertUtil.h b/source/libs/parser/inc/parInsertUtil.h index 0aa8c1d248..8d02aa0997 100644 --- a/source/libs/parser/inc/parInsertUtil.h +++ b/source/libs/parser/inc/parInsertUtil.h @@ -153,12 +153,12 @@ typedef struct STableDataCxt { SBoundColInfo boundColsInfo; SArray *pValues; SVCreateTbReq *pCreateTblReq; - SSubmitTbData data; + SSubmitTbData *pData; } STableDataCxt; typedef struct SVgroupDataCxt { - int32_t vgId; - SSubmitReq2 data; + int32_t vgId; + SSubmitReq2 *pData; } SVgroupDataCxt; int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo *pInfo); diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index 1e374a8880..a0c715c406 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -1100,6 +1100,7 @@ static int32_t parseValueTokenImpl(SInsertParseContext* pCxt, const char** pSql, return TSDB_CODE_FAILED; } + pVal->flag = CV_FLAG_VALUE; return TSDB_CODE_SUCCESS; } @@ -1167,7 +1168,7 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataC } if (TSDB_CODE_SUCCESS == code) { - SRow** pRow = taosArrayReserve(pTableCxt->data.aRowP, 1); + SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1); code = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow); } diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index b646de67e3..04eb8b123c 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -1014,9 +1014,17 @@ static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreat } } if (TSDB_CODE_SUCCESS == code) { - pTableCxt->data.aRowP = taosArrayInit(128, POINTER_BYTES); - if (NULL == pTableCxt->data.aRowP) { + pTableCxt->pData = taosMemoryCalloc(1, sizeof(SSubmitTbData)); + if (NULL == pTableCxt->pData) { code = TSDB_CODE_OUT_OF_MEMORY; + } else { + pTableCxt->pData->suid = pTableMeta->suid; + pTableCxt->pData->uid = pTableMeta->uid; + pTableCxt->pData->sver = pTableMeta->sversion; + pTableCxt->pData->aRowP = taosArrayInit(128, POINTER_BYTES); + if (NULL == pTableCxt->pData->aRowP) { + code = TSDB_CODE_OUT_OF_MEMORY; + } } } @@ -1055,7 +1063,7 @@ void insDestroyTableDataCxt(STableDataCxt* pTableCxt) { taosArrayDestroyEx(pTableCxt->pValues, NULL /*todo*/); tdDestroySVCreateTbReq(pTableCxt->pCreateTblReq); taosMemoryFreeClear(pTableCxt->pCreateTblReq); - // todo free SSubmitTbData + tDestroySSubmitTbData(pTableCxt->pData); } void insDestroyVgroupDataCxt(SVgroupDataCxt* pVgCxt) { @@ -1063,7 +1071,7 @@ void insDestroyVgroupDataCxt(SVgroupDataCxt* pVgCxt) { return; } - tDestroySSubmitReq2(&pVgCxt->data); + tDestroySSubmitReq2(pVgCxt->pData); } void insDestroyVgroupDataCxtList(SArray* pVgCxtList) { @@ -1112,26 +1120,50 @@ void insDestroyTableDataCxtHashMap(SHashObj* pTableCxtHash) { static int32_t fillVgroupDataCxt(STableDataCxt* pTableCxt, SVgroupDataCxt* pVgCxt) { if (NULL != pTableCxt->pCreateTblReq) { - if (NULL == pVgCxt->data.aCreateTbReq) { - pVgCxt->data.aCreateTbReq = taosArrayInit(128, sizeof(SVCreateTbReq)); - if (NULL == pVgCxt->data.aCreateTbReq) { + if (NULL == pVgCxt->pData->aCreateTbReq) { + pVgCxt->pData->aCreateTbReq = taosArrayInit(128, sizeof(SVCreateTbReq)); + if (NULL == pVgCxt->pData->aCreateTbReq) { return TSDB_CODE_OUT_OF_MEMORY; } } - taosArrayPush(pVgCxt->data.aCreateTbReq, pTableCxt->pCreateTblReq); + taosArrayPush(pVgCxt->pData->aCreateTbReq, pTableCxt->pCreateTblReq); } - if (NULL == pVgCxt->data.aSubmitTbData) { - pVgCxt->data.aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData)); - if (NULL == pVgCxt->data.aSubmitTbData) { + if (NULL == pVgCxt->pData->aSubmitTbData) { + pVgCxt->pData->aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData)); + if (NULL == pVgCxt->pData->aSubmitTbData) { return TSDB_CODE_OUT_OF_MEMORY; } } - taosArrayPush(pVgCxt->data.aSubmitTbData, &pTableCxt->data); + taosArrayPush(pVgCxt->pData->aSubmitTbData, pTableCxt->pData); + pTableCxt->pData = NULL; return TSDB_CODE_SUCCESS; } +static int32_t createVgroupDataCxt(STableDataCxt* pTableCxt, SHashObj* pVgroupHash, SArray* pVgroupList, + SVgroupDataCxt** pOutput) { + SVgroupDataCxt* pVgCxt = taosMemoryCalloc(1, sizeof(SVgroupDataCxt)); + if (NULL == pVgCxt) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pVgCxt->pData = taosMemoryCalloc(1, sizeof(SSubmitReq2)); + if (NULL == pVgCxt->pData) { + insDestroyVgroupDataCxt(pVgCxt); + return TSDB_CODE_OUT_OF_MEMORY; + } + + pVgCxt->vgId = pTableCxt->pMeta->vgId; + int32_t code = taosHashPut(pVgroupHash, &pVgCxt->vgId, sizeof(pVgCxt->vgId), &pVgCxt, POINTER_BYTES); + if (TSDB_CODE_SUCCESS == code) { + taosArrayPush(pVgroupList, &pVgCxt); + *pOutput = pVgCxt; + } else { + insDestroyVgroupDataCxt(pVgCxt); + } + return code; +} + int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) { SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false); SArray* pVgroupList = taosArrayInit(8, POINTER_BYTES); @@ -1146,17 +1178,14 @@ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) { void* p = taosHashIterate(pTableHash, NULL); while (TSDB_CODE_SUCCESS == code && NULL != p) { STableDataCxt* pTableCxt = *(STableDataCxt**)p; - code = tRowMergeSort(pTableCxt->data.aRowP, pTableCxt->pSchema, 0); + code = tRowMergeSort(pTableCxt->pData->aRowP, pTableCxt->pSchema, 0); if (TSDB_CODE_SUCCESS == code) { int32_t vgId = pTableCxt->pMeta->vgId; SVgroupDataCxt* pVgCxt = taosHashGet(pVgroupHash, &vgId, sizeof(vgId)); if (NULL == pVgCxt) { - SVgroupDataCxt vgCxt = {.vgId = vgId}; - code = fillVgroupDataCxt(pTableCxt, &vgCxt); - if (TSDB_CODE_SUCCESS == code) { - code = taosHashPut(pVgroupHash, &vgId, sizeof(vgId), &vgCxt, sizeof(vgCxt)); - } - } else { + code = createVgroupDataCxt(pTableCxt, pVgroupHash, pVgroupList, &pVgCxt); + } + if (TSDB_CODE_SUCCESS == code) { code = fillVgroupDataCxt(pTableCxt, pVgCxt); } } @@ -1175,22 +1204,30 @@ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) { return code; } -static int32_t buildSubmitReq(SSubmitReq2* pReq, void** pData, uint32_t* pLen) { - int32_t code = TSDB_CODE_SUCCESS; - tEncodeSize(tEncodeSSubmitReq2, pReq, *pLen, code); +static int32_t buildSubmitReq(int32_t vgId, SSubmitReq2* pReq, void** pData, uint32_t* pLen) { + int32_t code = TSDB_CODE_SUCCESS; + uint32_t len = 0; + void* pBuf = NULL; + tEncodeSize(tEncodeSSubmitReq2, pReq, len, code); if (TSDB_CODE_SUCCESS == code) { SEncoder encoder; - *pData = taosMemoryMalloc(*pLen); - if (NULL == *pData) { + len += sizeof(SMsgHead); + pBuf = taosMemoryMalloc(len); + if (NULL == pBuf) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } - tEncoderInit(&encoder, *pData, *pLen); + ((SMsgHead*)pBuf)->vgId = htonl(vgId); + ((SMsgHead*)pBuf)->contLen = htonl(len); + tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead)); code = tEncodeSSubmitReq2(&encoder, pReq); tEncoderClear(&encoder); } - if (TSDB_CODE_SUCCESS != code) { - taosMemoryFreeClear(*pData); - *pLen = 0; + + if (TSDB_CODE_SUCCESS == code) { + *pData = pBuf; + *pLen = len; + } else { + taosMemoryFree(pBuf); } return code; } @@ -1210,11 +1247,11 @@ int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList, code = TSDB_CODE_TSC_OUT_OF_MEMORY; } if (TSDB_CODE_SUCCESS == code) { - dst->numOfTables = taosArrayGetSize(src->data.aSubmitTbData); + dst->numOfTables = taosArrayGetSize(src->pData->aSubmitTbData); code = taosHashGetDup(pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg); } if (TSDB_CODE_SUCCESS == code) { - code = buildSubmitReq(&src->data, &dst->pData, &dst->size); + code = buildSubmitReq(src->vgId, src->pData, &dst->pData, &dst->size); } if (TSDB_CODE_SUCCESS == code) { code = (NULL == taosArrayPush(pDataBlocks, &dst) ? TSDB_CODE_TSC_OUT_OF_MEMORY : TSDB_CODE_SUCCESS);