From f7d94f73f51976fd405de5e9053986c535b0da7f Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 30 Nov 2022 19:08:48 +0800 Subject: [PATCH] enh: support column mode write --- include/libs/parser/parser.h | 2 +- source/client/src/clientStmt.c | 16 ++++++++-------- source/libs/parser/inc/parInsertUtil.h | 3 ++- source/libs/parser/src/parInsertSql.c | 4 ++-- source/libs/parser/src/parInsertStmt.c | 10 +++++----- source/libs/parser/src/parInsertUtil.c | 9 +-------- 6 files changed, 19 insertions(+), 25 deletions(-) diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 7edde76ae5..912763314b 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -86,7 +86,7 @@ void qCleanupKeywordsTable(); int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash); int32_t qResetStmtDataBlock(void* block, bool keepBuf); -int32_t qCloneStmtDataBlock(void** pDst, void* pSrc); +int32_t qCloneStmtDataBlock(void** pDst, void* pSrc, bool reset); int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc, uint64_t uid, int32_t vgId); void qDestroyStmtDataBlock(void* pBlock); STableMeta* qGetTableMetaInDataBlock(void* pDataBlock); diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index 0e90858089..3af9bb2160 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -214,16 +214,16 @@ int32_t stmtCacheBlock(STscStmt* pStmt) { return TSDB_CODE_SUCCESS; } - STableDataBlocks** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)); + STableDataCxt** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)); if (!pSrc) { return TSDB_CODE_OUT_OF_MEMORY; } - STableDataBlocks* pDst = NULL; + STableDataCxt* pDst = NULL; - STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc)); + STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc, true)); SStmtTableCache cache = { - .pDataBlock = pDst, + .pDataCtx = pDst, .boundTags = pStmt->bInfo.boundTags, }; @@ -289,7 +289,7 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) { size_t keyLen = 0; void* pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL); while (pIter) { - STableDataCxt* pBlocks = *(STableDataCxt*)pIter; + STableDataCxt* pBlocks = *(STableDataCxt**)pIter; char* key = taosHashGetKey(pIter, &keyLen); STableMeta* pMeta = qGetTableMetaInDataBlock(pBlocks); @@ -333,7 +333,7 @@ int32_t stmtCleanSQLInfo(STscStmt* pStmt) { while (pIter) { SStmtTableCache* pCache = (SStmtTableCache*)pIter; - qDestroyStmtDataBlock(pCache->pDataBlock); + qDestroyStmtDataBlock(pCache->pDataCtx); destroyBoundColumnInfo(pCache->boundTags); taosMemoryFreeClear(pCache->boundTags); @@ -775,9 +775,9 @@ int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp* pRsp) { int32_t code = 0; int32_t finalCode = 0; size_t keyLen = 0; - STableDataCxt** pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL); + void* pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL); while (pIter) { - STableDataBlocks* pBlock = *pIter; + STableDataCxt* pBlock = *(STableDataCxt**)pIter; char* key = taosHashGetKey(pIter, &keyLen); STableMeta* pMeta = qGetTableMetaInDataBlock(pBlock); diff --git a/source/libs/parser/inc/parInsertUtil.h b/source/libs/parser/inc/parInsertUtil.h index d5b743b9c2..d64d1ce0c0 100644 --- a/source/libs/parser/inc/parInsertUtil.h +++ b/source/libs/parser/inc/parInsertUtil.h @@ -133,7 +133,7 @@ int32_t insMergeTableDataBlocks(SHashObj *pHashObj, SArray **pVgDataBlocks); int32_t insBuildCreateTbMsg(STableDataBlocks *pBlocks, SVCreateTbReq *pCreateTbReq); int32_t insAllocateMemForSize(STableDataBlocks *pDataBlock, int32_t allSize); int32_t insCreateSName(SName *pName, struct SToken *pTableName, int32_t acctId, const char *dbName, SMsgBuf *pMsgBuf); -int32_t insFindCol(struct SToken *pColname, int32_t start, int32_t end, SSchema *pSchema); +int16_t insFindCol(struct SToken *pColname, int16_t start, int16_t end, SSchema *pSchema); void insBuildCreateTbReq(SVCreateTbReq *pTbReq, const char *tname, STag *pTag, int64_t suid, const char *sname, SArray *tagName, uint8_t tagNum, int32_t ttl); int32_t insMemRowAppend(SMsgBuf *pMsgBuf, const void *value, int32_t len, void *param); @@ -173,5 +173,6 @@ void insDestroyTableDataCxtHashMap(SHashObj *pTableCxtHash); void insDestroyVgroupDataCxt(SVgroupDataCxt *pVgCxt); void insDestroyVgroupDataCxtList(SArray *pVgCxtList); void insDestroyVgroupDataCxtHashMap(SHashObj *pVgCxtHash); +void insDestroyTableDataCxt(STableDataCxt* pTableCxt); #endif // TDENGINE_PAR_INSERT_UTIL_H diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index 2c56d691c3..2b01ad4a37 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -928,7 +928,7 @@ int32_t initTableColSubmitData(STableDataCxt* pTableCxt) { } for (int32_t i = 0; i < pTableCxt->boundColsInfo.numOfBound; ++i) { - SSchema* pSchema = pTableCxt->pMeta->schema[pTableCxt->boundColsInfo.pColIndex[i]]; + SSchema* pSchema = &pTableCxt->pMeta->schema[pTableCxt->boundColsInfo.pColIndex[i]]; SColData* pCol = taosArrayReserve(pTableCxt->pData->aCol, 1); if (NULL == pCol) { return TSDB_CODE_OUT_OF_MEMORY; @@ -1193,7 +1193,7 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataC } } - if (TSDB_CODE_SUCCESS == code) { + if (TSDB_CODE_SUCCESS == code && !isParseBindParam) { SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1); code = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow); if (TSDB_CODE_SUCCESS == code) { diff --git a/source/libs/parser/src/parInsertStmt.c b/source/libs/parser/src/parInsertStmt.c index 476d28a51d..0727b3bcf4 100644 --- a/source/libs/parser/src/parInsertStmt.c +++ b/source/libs/parser/src/parInsertStmt.c @@ -141,7 +141,7 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const ch goto end; } - insBuildCreateTbReq(pDataBlock->pData->pCreateTbReq, tName, pTag, suid, sTableName, tagName, pDataBlock->pMeta->tableInfo.numOfTags); + insBuildCreateTbReq(pDataBlock->pData->pCreateTbReq, tName, pTag, suid, sTableName, tagName, pDataBlock->pMeta->tableInfo.numOfTags, TSDB_DEFAULT_TABLE_TTL); end: for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) { @@ -203,7 +203,7 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu return TSDB_CODE_SUCCESS; } -int32_t buildBoundFields(int32_t numOfBound, int16_t boundColumns, SSchema* pSchema, int32_t* fieldNum, TAOS_FIELD_E** fields, +int32_t buildBoundFields(int32_t numOfBound, int16_t* boundColumns, SSchema* pSchema, int32_t* fieldNum, TAOS_FIELD_E** fields, uint8_t timePrec) { if (fields) { *fields = taosMemoryCalloc(numOfBound, sizeof(TAOS_FIELD)); @@ -265,7 +265,7 @@ int32_t qBuildStmtColFields(void* pBlock, int32_t* fieldNum, TAOS_FIELD_E** fiel return TSDB_CODE_SUCCESS; } - CHECK_CODE(buildBoundFields(&pDataBlock->boundColsInfo.numOfBound, pDataBlock->boundColsInfo.pColIndex, pSchema, fieldNum, fields, + CHECK_CODE(buildBoundFields(pDataBlock->boundColsInfo.numOfBound, pDataBlock->boundColsInfo.pColIndex, pSchema, fieldNum, fields, pDataBlock->pMeta->tableInfo.precision)); return TSDB_CODE_SUCCESS; @@ -338,12 +338,12 @@ int32_t qCloneStmtDataBlock(void** pDst, void* pSrc, bool reset) { insDestroyTableDataCxt(*pDst); return TSDB_CODE_OUT_OF_MEMORY; } + + pNewCxt->pData = pNewTb; if (reset) { code = qResetStmtDataBlock(*pDst, true); } - - pNewCxt->pData = pNewTb; } diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index d2f5ec4f61..487593470f 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -1230,14 +1230,7 @@ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) { if (colFormat) { taosArraySort(pTableCxt->pData->aCol, insColDataComp); - int32_t colNum = taosArrayGetSize(pTableCxt->pData->aCol); - for (int32_t i = 0; i < colNum; ++i) { - SColData *pCol = taosArrayGet(pTableCxt->pData->aCol, i); - code = tColDataSortMerge(pCol); - if (code) { - break; - } - } + tColDataSortMerge(pTableCxt->pData->aCol); } else { if (!pTableCxt->ordered) { tRowSort(pTableCxt->pData->aRowP);