From 34662ea10a9d6dfbb11c52efab767772a2db96b4 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 30 Nov 2022 16:57:24 +0800 Subject: [PATCH] enh: support column mode write --- include/common/tdataformat.h | 1 + include/libs/parser/parser.h | 1 - source/client/inc/clientStmt.h | 6 +- source/client/src/clientStmt.c | 46 ++-- source/common/src/tdataformat.c | 8 + source/libs/parser/inc/parInsertUtil.h | 4 +- source/libs/parser/src/parInsertSql.c | 44 +++- source/libs/parser/src/parInsertStmt.c | 352 +++++++++---------------- source/libs/parser/src/parInsertUtil.c | 62 ++++- 9 files changed, 242 insertions(+), 282 deletions(-) diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index f3c1bc22c2..60a43f768e 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -113,6 +113,7 @@ int32_t parseJsontoTagData(const char *json, SArray *pTagVals, STag **ppTag, voi void tColDataDestroy(void *ph); void tColDataInit(SColData *pColData, int16_t cid, int8_t type, int8_t smaOn); void tColDataClear(SColData *pColData); +void tColDataDeepClear(SColData *pColData); int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal); void tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal); uint8_t tColDataGetBitValue(const SColData *pColData, int32_t iVal); diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index b72d85243b..0d1af5b608 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -87,7 +87,6 @@ void qCleanupKeywordsTable(); int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash); int32_t qResetStmtDataBlock(void* block, bool keepBuf); int32_t qCloneStmtDataBlock(void** pDst, void* pSrc); -void qFreeStmtDataBlock(void* pDataBlock); int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc, uint64_t uid, int32_t vgId); void qDestroyStmtDataBlock(void* pBlock); STableMeta* qGetTableMetaInDataBlock(void* pDataBlock); diff --git a/source/client/inc/clientStmt.h b/source/client/inc/clientStmt.h index e5507ccf81..1bda1684c4 100644 --- a/source/client/inc/clientStmt.h +++ b/source/client/inc/clientStmt.h @@ -21,7 +21,7 @@ extern "C" { #endif #include "catalog.h" -typedef void STableDataBlocks; +typedef void STableDataCxt; typedef enum { STMT_TYPE_INSERT = 1, @@ -43,8 +43,8 @@ typedef enum { } STMT_STATUS; typedef struct SStmtTableCache { - STableDataBlocks *pDataBlock; - void *boundTags; + STableDataCxt *pDataCtx; + void *boundTags; } SStmtTableCache; typedef struct SStmtQueryResInfo { diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index 291e0f6ae3..0e90858089 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -289,22 +289,18 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) { size_t keyLen = 0; void* pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL); while (pIter) { - STableDataBlocks* pBlocks = *(STableDataBlocks**)pIter; - char* key = taosHashGetKey(pIter, &keyLen); - STableMeta* pMeta = qGetTableMetaInDataBlock(pBlocks); + STableDataCxt* pBlocks = *(STableDataCxt*)pIter; + char* key = taosHashGetKey(pIter, &keyLen); + STableMeta* pMeta = qGetTableMetaInDataBlock(pBlocks); if (keepTable && (strlen(pStmt->bInfo.tbFName) == keyLen) && strncmp(pStmt->bInfo.tbFName, key, keyLen) == 0) { - STMT_ERR_RET(qResetStmtDataBlock(pBlocks, true)); + STMT_ERR_RET(qResetStmtDataBlock(pBlocks, false)); pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter); continue; } - if (STMT_TYPE_MULTI_INSERT == pStmt->sql.type) { - qFreeStmtDataBlock(pBlocks); - } else { - qDestroyStmtDataBlock(pBlocks); - } + qDestroyStmtDataBlock(pBlocks); taosHashRemove(pStmt->exec.pBlockHash, key, keyLen); pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter); @@ -354,7 +350,7 @@ int32_t stmtCleanSQLInfo(STscStmt* pStmt) { return TSDB_CODE_SUCCESS; } -int32_t stmtRebuildDataBlock(STscStmt* pStmt, STableDataBlocks* pDataBlock, STableDataBlocks** newBlock, uint64_t uid) { +int32_t stmtRebuildDataBlock(STscStmt* pStmt, STableDataCxt* pDataBlock, STableDataCxt** newBlock, uint64_t uid) { SEpSet ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp); SVgroupInfo vgInfo = {0}; SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter, @@ -375,9 +371,9 @@ int32_t stmtGetFromCache(STscStmt* pStmt) { pStmt->bInfo.needParse = true; pStmt->bInfo.inExecCache = false; - STableDataBlocks* pBlockInExec = + STableDataCxt* pCxtInExec = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)); - if (pBlockInExec) { + if (pCxtInExec) { pStmt->bInfo.needParse = false; pStmt->bInfo.inExecCache = true; @@ -411,8 +407,8 @@ int32_t stmtGetFromCache(STscStmt* pStmt) { pStmt->bInfo.tbUid = 0; - STableDataBlocks* pNewBlock = NULL; - STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataBlock, &pNewBlock, 0)); + STableDataCxt* pNewBlock = NULL; + STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, 0)); if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock, POINTER_BYTES)) { @@ -489,8 +485,8 @@ int32_t stmtGetFromCache(STscStmt* pStmt) { pStmt->bInfo.boundTags = pCache->boundTags; pStmt->bInfo.tagsCached = true; - STableDataBlocks* pNewBlock = NULL; - STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataBlock, &pNewBlock, uid)); + STableDataCxt* pNewBlock = NULL; + STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, uid)); if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock, POINTER_BYTES)) { @@ -614,8 +610,8 @@ int stmtSetTbTags(TAOS_STMT* stmt, TAOS_MULTI_BIND* tags) { return TSDB_CODE_SUCCESS; } - STableDataBlocks** pDataBlock = - (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)); + STableDataCxt** pDataBlock = + (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)); if (NULL == pDataBlock) { tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName); STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR); @@ -637,8 +633,8 @@ int stmtFetchTagFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR); } - STableDataBlocks** pDataBlock = - (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)); + STableDataCxt** pDataBlock = + (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)); if (NULL == pDataBlock) { tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName); STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR); @@ -655,8 +651,8 @@ int stmtFetchColFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR); } - STableDataBlocks** pDataBlock = - (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)); + STableDataCxt** pDataBlock = + (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)); if (NULL == pDataBlock) { tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName); STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR); @@ -729,8 +725,8 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) { return TSDB_CODE_SUCCESS; } - STableDataBlocks** pDataBlock = - (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)); + STableDataCxt** pDataBlock = + (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)); if (NULL == pDataBlock) { tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName); STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR); @@ -779,7 +775,7 @@ int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp* pRsp) { int32_t code = 0; int32_t finalCode = 0; size_t keyLen = 0; - STableDataBlocks** pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL); + STableDataCxt** pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL); while (pIter) { STableDataBlocks* pBlock = *pIter; char* key = taosHashGetKey(pIter, &keyLen); diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 07ece3b8a2..4e7563b93a 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -1547,6 +1547,14 @@ void tColDataClear(SColData *pColData) { pColData->nData = 0; } +void tColDataDeepClear(SColData *pColData) { + pColData->pBitMap = NULL; + pColData->aOffset = NULL; + pColData->pData = NULL; + + tColDataClear(pColData); +} + static FORCE_INLINE int32_t tColDataPutValue(SColData *pColData, uint8_t *pData, uint32_t nData) { int32_t code = 0; diff --git a/source/libs/parser/inc/parInsertUtil.h b/source/libs/parser/inc/parInsertUtil.h index ce14f822e4..4c41129946 100644 --- a/source/libs/parser/inc/parInsertUtil.h +++ b/source/libs/parser/inc/parInsertUtil.h @@ -142,7 +142,7 @@ int32_t insBuildOutput(SHashObj *pVgroupsHashObj, SArray *pVgDataBlocks, SArray void insDestroyDataBlock(STableDataBlocks *pDataBlock); typedef struct SBoundColInfo { - int32_t *pColIndex; // bound index => schema index + int16_t *pColIndex; // bound index => schema index int32_t numOfCols; int32_t numOfBound; } SBoundColInfo; @@ -162,7 +162,7 @@ typedef struct SVgroupDataCxt { int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo *pInfo); int32_t insGetTableDataCxt(SHashObj *pHash, void *id, int32_t idLen, STableMeta *pTableMeta, - SVCreateTbReq **pCreateTbReq, STableDataCxt **pTableCxt); + SVCreateTbReq **pCreateTbReq, STableDataCxt **pTableCxt, bool colMode); int32_t insMergeTableDataCxt(SHashObj *pTableHash, SArray **pVgDataBlocks); int32_t insBuildVgDataBlocks(SHashObj *pVgroupsHashObj, SArray *pVgDataBlocks, SArray **pDataBlocks); void insDestroyTableDataCxtHashMap(SHashObj *pTableCxtHash); diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index 03068ba83e..d886ba915f 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -181,7 +181,7 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, b pBoundInfo->numOfBound = 0; - int32_t lastColIdx = -1; // last column found + int16_t lastColIdx = -1; // last column found int32_t code = TSDB_CODE_SUCCESS; while (TSDB_CODE_SUCCESS == code) { SToken token; @@ -196,8 +196,8 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, b token.z = tmpTokenBuf; token.n = strdequote(token.z); - int32_t t = lastColIdx + 1; - int32_t index = insFindCol(&token, t, pBoundInfo->numOfCols, pSchema); + int16_t t = lastColIdx + 1; + int16_t index = insFindCol(&token, t, pBoundInfo->numOfCols, pSchema); if (index < 0 && t > 0) { index = insFindCol(&token, 0, t, pSchema); } @@ -886,17 +886,17 @@ static int32_t preParseBoundColumnsClause(SInsertParseContext* pCxt, SVnodeModif static int32_t getTableDataCxt(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataCxt** pTableCxt) { if (pCxt->pComCxt->async) { - uint64_t uid = pStmt->pTableMeta->uid; - if (pStmt->usingTableProcessing) { - pStmt->pTableMeta->uid = 0; - } - return insGetTableDataCxt(pStmt->pTableBlockHashObj, &uid, sizeof(pStmt->pTableMeta->uid), pStmt->pTableMeta, - &pStmt->pCreateTblReq, pTableCxt); + return insGetTableDataCxt(pStmt->pTableBlockHashObj, &pStmt->pTableMeta->uid, sizeof(pStmt->pTableMeta->uid), pStmt->pTableMeta, + &pStmt->pCreateTblReq, pTableCxt, false); } + char tbFName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(&pStmt->targetTableName, tbFName); + if (pStmt->usingTableProcessing) { + pStmt->pTableMeta->uid = 0; + } return insGetTableDataCxt(pStmt->pTableBlockHashObj, tbFName, strlen(tbFName), pStmt->pTableMeta, - &pStmt->pCreateTblReq, pTableCxt); + &pStmt->pCreateTblReq, pTableCxt, NULL != pCxt->pComCxt->pStmtCb); } static int32_t parseBoundColumnsClause(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataCxt* pTableCxt) { @@ -921,6 +921,23 @@ static int32_t parseBoundColumnsClause(SInsertParseContext* pCxt, SVnodeModifOpS return TSDB_CODE_SUCCESS; } +int32_t initTableColSubmitData(STableDataCxt* pTableCxt) { + if (0 == (pTableCxt->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT)) { + return TSDB_CODE_SUCCESS; + } + + for (int32_t i = 0; i < pTableCxt->boundColsInfo.numOfBound; ++i) { + SSchema* pSchema = pTableCxt->pMeta->schema[pTableCxt->boundColsInfo.pColIndex[i]]; + SColData* pCol = taosArrayReserve(pTableCxt->pData->aCol, 1); + if (NULL == pCol) { + return TSDB_CODE_OUT_OF_MEMORY; + } + tColDataInit(pCol, pSchema->colId, pSchema->type, 0); + } + + return TSDB_CODE_SUCCESS; +} + // input pStmt->pSql: // 1. [(tag1_name, ...)] ... // 2. VALUES ... | FILE ... @@ -933,6 +950,9 @@ static int32_t parseSchemaClauseBottom(SInsertParseContext* pCxt, SVnodeModifOpS if (TSDB_CODE_SUCCESS == code) { code = parseBoundColumnsClause(pCxt, pStmt, *pTableCxt); } + if (TSDB_CODE_SUCCESS == code) { + code = initTableColSubmitData(*pTableCxt); + } return code; } @@ -1411,7 +1431,6 @@ static int32_t checkTableClauseFirstToken(SInsertParseContext* pCxt, SVnodeModif } static int32_t setStmtInfo(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { -#if 0 SParsedDataColInfo* tags = taosMemoryMalloc(sizeof(pCxt->tags)); if (NULL == tags) { return TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -1426,9 +1445,6 @@ static int32_t setStmtInfo(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) pStmt->pVgroupsHashObj = NULL; pStmt->pTableBlockHashObj = NULL; return code; -#else - return TSDB_CODE_FAILED; -#endif } static int32_t parseInsertBodyBottom(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { diff --git a/source/libs/parser/src/parInsertStmt.c b/source/libs/parser/src/parInsertStmt.c index bcfafdda87..476d28a51d 100644 --- a/source/libs/parser/src/parInsertStmt.c +++ b/source/libs/parser/src/parInsertStmt.c @@ -32,20 +32,25 @@ typedef struct SKvParam { int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash) { int32_t code = TSDB_CODE_SUCCESS; SArray* pVgDataBlocks = NULL; + SVnodeModifOpStmt *pStmt = (SVnodeModifOpStmt*)pQuery->pRoot; + // merge according to vgId if (taosHashGetSize(pBlockHash) > 0) { - code = insMergeTableDataBlocks(pBlockHash, &pVgDataBlocks); + code = insMergeTableDataCxt(pBlockHash, &pVgDataBlocks); } if (TSDB_CODE_SUCCESS == code) { - code = insBuildOutput(pVgHash, pVgDataBlocks, &((SVnodeModifOpStmt*)pQuery->pRoot)->pDataBlocks); + code = insBuildVgDataBlocks(pVgHash, pVgDataBlocks, &pStmt->pDataBlocks); + } + + if (pStmt->freeArrayFunc) { + pStmt->freeArrayFunc(pVgDataBlocks); } - insDestroyBlockArrayList(pVgDataBlocks); return code; } int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const char* sTableName, char* tName, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) { - STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock; + STableDataCxt* pDataBlock = (STableDataCxt*)pBlock; SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; int32_t code = TSDB_CODE_SUCCESS; SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags; @@ -64,7 +69,7 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const ch goto end; } - SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta); + SSchema* pSchema = getTableTagSchema(pDataBlock->pMeta); bool isJson = false; STag* pTag = NULL; @@ -136,10 +141,7 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const ch goto end; } - SVCreateTbReq tbReq = {0}; - insBuildCreateTbReq(&tbReq, tName, pTag, suid, sTableName, tagName, pDataBlock->pTableMeta->tableInfo.numOfTags); - code = insBuildCreateTbMsg(pDataBlock, &tbReq); - tdDestroySVCreateTbReq(&tbReq); + insBuildCreateTbReq(pDataBlock->pData->pCreateTbReq, tName, pTag, suid, sTableName, tagName, pDataBlock->pMeta->tableInfo.numOfTags); end: for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) { @@ -155,198 +157,90 @@ end: } int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) { - STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock; - SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta); - int32_t extendedRowSize = insGetExtendedRowSize(pDataBlock); - SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo; - SRowBuilder* pBuilder = &pDataBlock->rowBuilder; - SMemParam param = {.rb = pBuilder}; + STableDataCxt* pDataBlock = (STableDataCxt*)pBlock; + SSchema* pSchema = getTableColumnSchema(pDataBlock->pMeta); + SBoundColInfo* boundInfo = &pDataBlock->boundColsInfo; SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; int32_t rowNum = bind->num; - CHECK_CODE( - insInitRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo)); + for (int c = 0; c < boundInfo->numOfBound; ++c) { + SSchema* pColSchema = &pSchema[boundInfo->pColIndex[c]]; + SColData* pCol = taosArrayGet(pDataBlock->pData->aCol, c); - CHECK_CODE(insAllocateMemForSize(pDataBlock, extendedRowSize * bind->num)); - - for (int32_t r = 0; r < bind->num; ++r) { - STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header - tdSRowResetBuf(pBuilder, row); - - for (int c = 0; c < spd->numOfBound; ++c) { - SSchema* pColSchema = &pSchema[spd->boundColumns[c]]; - - if (bind[c].num != rowNum) { - return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same"); - } - - param.schema = pColSchema; - insGetSTSRowAppendInfo(pBuilder->rowType, spd, c, ¶m.toffset, ¶m.colIdx); - - if (bind[c].is_null && bind[c].is_null[r]) { - if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { - return buildInvalidOperationMsg(&pBuf, "primary timestamp should not be NULL"); - } - - CHECK_CODE(insMemRowAppend(&pBuf, NULL, 0, ¶m)); - } else { - if (bind[c].buffer_type != pColSchema->type) { - return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type"); - } - - int32_t colLen = pColSchema->bytes; - if (IS_VAR_DATA_TYPE(pColSchema->type)) { - colLen = bind[c].length[r]; - } - - CHECK_CODE(insMemRowAppend(&pBuf, (char*)bind[c].buffer + bind[c].buffer_length * r, colLen, ¶m)); - } - - if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) { - TSKEY tsKey = TD_ROW_KEY(row); - insCheckTimestamp(pDataBlock, (const char*)&tsKey); - } - } - // set the null value for the columns that do not assign values - if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) { - pBuilder->hasNone = true; - } - tdSRowEnd(pBuilder); -#ifdef TD_DEBUG_PRINT_ROW - STSchema* pSTSchema = tBuildTSchema(pSchema, spd->numOfCols, 1); - tdSRowPrint(row, pSTSchema, __func__); - taosMemoryFree(pSTSchema); -#endif - pDataBlock->size += extendedRowSize; - } - - SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData); - return insSetBlockInfo(pBlocks, pDataBlock, bind->num, &pBuf); -} - -int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx, - int32_t rowNum) { - STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock; - SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta); - int32_t extendedRowSize = insGetExtendedRowSize(pDataBlock); - SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo; - SRowBuilder* pBuilder = &pDataBlock->rowBuilder; - SMemParam param = {.rb = pBuilder}; - SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; - bool rowStart = (0 == colIdx); - bool rowEnd = ((colIdx + 1) == spd->numOfBound); - - if (rowStart) { - CHECK_CODE( - insInitRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo)); - CHECK_CODE(insAllocateMemForSize(pDataBlock, extendedRowSize * bind->num)); - } - - for (int32_t r = 0; r < bind->num; ++r) { - STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size + extendedRowSize * r); // skip the SSubmitBlk header - if (rowStart) { - tdSRowResetBuf(pBuilder, row); - } else { - tdSRowGetBuf(pBuilder, row); - } - - SSchema* pColSchema = &pSchema[spd->boundColumns[colIdx]]; - - if (bind->num != rowNum) { + if (bind[c].num != rowNum) { return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same"); } - param.schema = pColSchema; - insGetSTSRowAppendInfo(pBuilder->rowType, spd, colIdx, ¶m.toffset, ¶m.colIdx); - - if (bind->is_null && bind->is_null[r]) { - if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { - return buildInvalidOperationMsg(&pBuf, "primary timestamp should not be NULL"); - } - - CHECK_CODE(insMemRowAppend(&pBuf, NULL, 0, ¶m)); - } else { - if (bind->buffer_type != pColSchema->type) { - return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type"); - } - - int32_t colLen = pColSchema->bytes; - if (IS_VAR_DATA_TYPE(pColSchema->type)) { - colLen = bind->length[r]; - } - - CHECK_CODE(insMemRowAppend(&pBuf, (char*)bind->buffer + bind->buffer_length * r, colLen, ¶m)); + if (bind[c].buffer_type != pColSchema->type) { + return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type"); } - if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) { - TSKEY tsKey = TD_ROW_KEY(row); - insCheckTimestamp(pDataBlock, (const char*)&tsKey); - } - - // set the null value for the columns that do not assign values - if (rowEnd && (spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) { - pBuilder->hasNone = true; - } - if (rowEnd) { - tdSRowEnd(pBuilder); - } -#ifdef TD_DEBUG_PRINT_ROW - if (rowEnd) { - STSchema* pSTSchema = tBuildTSchema(pSchema, spd->numOfCols, 1); - tdSRowPrint(row, pSTSchema, __func__); - taosMemoryFree(pSTSchema); - } -#endif - } - - if (rowEnd) { - pDataBlock->size += extendedRowSize * bind->num; - - SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData); - CHECK_CODE(insSetBlockInfo(pBlocks, pDataBlock, bind->num, &pBuf)); + tColDataAddValueByBind(pCol, bind + c); } return TSDB_CODE_SUCCESS; } -int32_t buildBoundFields(SParsedDataColInfo* boundInfo, SSchema* pSchema, int32_t* fieldNum, TAOS_FIELD_E** fields, +int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx, + int32_t rowNum) { + STableDataCxt* pDataBlock = (STableDataCxt*)pBlock; + SSchema* pSchema = getTableColumnSchema(pDataBlock->pMeta); + SBoundColInfo* boundInfo = &pDataBlock->boundColsInfo; + SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; + SSchema* pColSchema = &pSchema[boundInfo->pColIndex[colIdx]]; + SColData* pCol = taosArrayGet(pDataBlock->pData->aCol, colIdx); + + if (bind[colIdx].num != rowNum) { + return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same"); + } + + if (bind[colIdx].buffer_type != pColSchema->type) { + return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type"); + } + + tColDataAddValueByBind(pCol, bind); + + return TSDB_CODE_SUCCESS; +} + +int32_t buildBoundFields(int32_t numOfBound, int16_t boundColumns, SSchema* pSchema, int32_t* fieldNum, TAOS_FIELD_E** fields, uint8_t timePrec) { if (fields) { - *fields = taosMemoryCalloc(boundInfo->numOfBound, sizeof(TAOS_FIELD)); + *fields = taosMemoryCalloc(numOfBound, sizeof(TAOS_FIELD)); if (NULL == *fields) { return TSDB_CODE_OUT_OF_MEMORY; } - SSchema* schema = &pSchema[boundInfo->boundColumns[0]]; + SSchema* schema = &pSchema[boundColumns[0]]; if (TSDB_DATA_TYPE_TIMESTAMP == schema->type) { (*fields)[0].precision = timePrec; } - for (int32_t i = 0; i < boundInfo->numOfBound; ++i) { - schema = &pSchema[boundInfo->boundColumns[i]]; + for (int32_t i = 0; i < numOfBound; ++i) { + schema = &pSchema[boundColumns[i]]; strcpy((*fields)[i].name, schema->name); (*fields)[i].type = schema->type; (*fields)[i].bytes = schema->bytes; } } - *fieldNum = boundInfo->numOfBound; + *fieldNum = numOfBound; return TSDB_CODE_SUCCESS; } int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TAOS_FIELD_E** fields) { - STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock; + STableDataCxt* pDataBlock = (STableDataCxt*)pBlock; SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags; if (NULL == tags) { return TSDB_CODE_QRY_APP_ERROR; } - if (pDataBlock->pTableMeta->tableType != TSDB_SUPER_TABLE && pDataBlock->pTableMeta->tableType != TSDB_CHILD_TABLE) { + if (pDataBlock->pMeta->tableType != TSDB_SUPER_TABLE && pDataBlock->pMeta->tableType != TSDB_CHILD_TABLE) { return TSDB_CODE_TSC_STMT_API_ERROR; } - SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta); + SSchema* pSchema = getTableTagSchema(pDataBlock->pMeta); if (tags->numOfBound <= 0) { *fieldNum = 0; *fields = NULL; @@ -354,15 +248,15 @@ int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TA return TSDB_CODE_SUCCESS; } - CHECK_CODE(buildBoundFields(tags, pSchema, fieldNum, fields, 0)); + CHECK_CODE(buildBoundFields(tags->numOfBound, tags->boundColumns, pSchema, fieldNum, fields, 0)); return TSDB_CODE_SUCCESS; } int32_t qBuildStmtColFields(void* pBlock, int32_t* fieldNum, TAOS_FIELD_E** fields) { - STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock; - SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta); - if (pDataBlock->boundColumnInfo.numOfBound <= 0) { + STableDataCxt* pDataBlock = (STableDataCxt*)pBlock; + SSchema* pSchema = getTableColumnSchema(pDataBlock->pMeta); + if (pDataBlock->boundColsInfo.numOfBound <= 0) { *fieldNum = 0; if (fields) { *fields = NULL; @@ -371,107 +265,113 @@ int32_t qBuildStmtColFields(void* pBlock, int32_t* fieldNum, TAOS_FIELD_E** fiel return TSDB_CODE_SUCCESS; } - CHECK_CODE(buildBoundFields(&pDataBlock->boundColumnInfo, pSchema, fieldNum, fields, - pDataBlock->pTableMeta->tableInfo.precision)); + CHECK_CODE(buildBoundFields(&pDataBlock->boundColsInfo.numOfBound, pDataBlock->boundColsInfo.pColIndex, pSchema, fieldNum, fields, + pDataBlock->pMeta->tableInfo.precision)); return TSDB_CODE_SUCCESS; } -int32_t qResetStmtDataBlock(void* block, bool keepBuf) { - STableDataBlocks* pBlock = (STableDataBlocks*)block; +int32_t qResetStmtDataBlock(void* block, bool deepClear) { + STableDataCxt* pBlock = (STableDataCxt*)block; + int32_t colNum = taosArrayGetSize(pBlock->pData->aCol); - if (keepBuf) { - taosMemoryFreeClear(pBlock->pData); - pBlock->pData = taosMemoryMalloc(TSDB_PAYLOAD_SIZE); - if (NULL == pBlock->pData) { - return TSDB_CODE_OUT_OF_MEMORY; + for (int32_t i = 0; i < colNum; ++i) { + SColData *pCol = (SColData*)taosArrayGet(pBlock->pData->aCol, i); + if (deepClear) { + tColDataDeepClear(pCol); + } else { + tColDataClear(pCol); } - memset(pBlock->pData, 0, sizeof(SSubmitBlk)); - } else { - pBlock->pData = NULL; } - pBlock->ordered = true; - pBlock->prevTS = INT64_MIN; - pBlock->size = sizeof(SSubmitBlk); - pBlock->tsSource = -1; - pBlock->numOfTables = 1; - pBlock->nAllocSize = TSDB_PAYLOAD_SIZE; - pBlock->headerSize = pBlock->size; - pBlock->createTbReqLen = 0; - - memset(&pBlock->rowBuilder, 0, sizeof(pBlock->rowBuilder)); - return TSDB_CODE_SUCCESS; } -int32_t qCloneStmtDataBlock(void** pDst, void* pSrc) { - *pDst = taosMemoryMalloc(sizeof(STableDataBlocks)); +int32_t qCloneStmtDataBlock(void** pDst, void* pSrc, bool reset) { + int32_t code = 0; + + *pDst = taosMemoryCalloc(1, sizeof(STableDataCxt)); if (NULL == *pDst) { return TSDB_CODE_OUT_OF_MEMORY; } - memcpy(*pDst, pSrc, sizeof(STableDataBlocks)); - ((STableDataBlocks*)(*pDst))->cloned = true; + STableDataCxt* pNewCxt = (STableDataCxt*)*pDst; + STableDataCxt* pCxt = (STableDataCxt*)pSrc; + pNewCxt->pSchema = NULL; + pNewCxt->pValues = NULL; - STableDataBlocks* pBlock = (STableDataBlocks*)(*pDst); - if (pBlock->pTableMeta) { - void* pNewMeta = taosMemoryMalloc(TABLE_META_SIZE(pBlock->pTableMeta)); + if (pCxt->pMeta) { + void* pNewMeta = taosMemoryMalloc(TABLE_META_SIZE(pCxt->pMeta)); if (NULL == pNewMeta) { - taosMemoryFreeClear(*pDst); + insDestroyTableDataCxt(*pDst); return TSDB_CODE_OUT_OF_MEMORY; } - memcpy(pNewMeta, pBlock->pTableMeta, TABLE_META_SIZE(pBlock->pTableMeta)); - pBlock->pTableMeta = pNewMeta; + memcpy(pNewMeta, pCxt->pMeta, TABLE_META_SIZE(pCxt->pMeta)); + pNewCxt->pMeta = pNewMeta; } - return qResetStmtDataBlock(*pDst, false); + memcpy(&pNewCxt->boundColsInfo, &pCxt->boundColsInfo, sizeof(pCxt->boundColsInfo)); + pNewCxt->boundColsInfo.pColIndex = NULL; + + if (pCxt->boundColsInfo.pColIndex) { + void* pNewColIdx = taosMemoryMalloc(pCxt->boundColsInfo.numOfBound * sizeof(*pCxt->boundColsInfo.pColIndex)); + if (NULL == pNewColIdx) { + insDestroyTableDataCxt(*pDst); + return TSDB_CODE_OUT_OF_MEMORY; + } + memcpy(pNewColIdx, pCxt->boundColsInfo.pColIndex, pCxt->boundColsInfo.numOfBound * sizeof(*pCxt->boundColsInfo.pColIndex)); + pNewCxt->boundColsInfo.pColIndex = pNewColIdx; + } + + if (pCxt->pData) { + SSubmitTbData *pNewTb = (SSubmitTbData*)taosMemoryMalloc(sizeof(SSubmitTbData)); + if (NULL == pNewTb) { + insDestroyTableDataCxt(*pDst); + return TSDB_CODE_OUT_OF_MEMORY; + } + + memcpy(pNewTb, pCxt->pData, sizeof(*pCxt->pData)); + pNewTb->pCreateTbReq = NULL; + + pNewTb->aCol = taosArrayDup(pCxt->pData->aCol, NULL); + if (NULL == pNewTb) { + insDestroyTableDataCxt(*pDst); + return TSDB_CODE_OUT_OF_MEMORY; + } + + if (reset) { + code = qResetStmtDataBlock(*pDst, true); + } + + pNewCxt->pData = pNewTb; + } + + + return code; } int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc, uint64_t uid, int32_t vgId) { - int32_t code = qCloneStmtDataBlock(pDst, pSrc); + int32_t code = qCloneStmtDataBlock(pDst, pSrc, false); if (code) { return code; } - STableDataBlocks* pBlock = (STableDataBlocks*)*pDst; - pBlock->pData = taosMemoryMalloc(pBlock->nAllocSize); - if (NULL == pBlock->pData) { - qFreeStmtDataBlock(pBlock); - return TSDB_CODE_OUT_OF_MEMORY; + STableDataCxt* pBlock = (STableDataCxt*)*pDst; + if (pBlock->pMeta) { + pBlock->pMeta->uid = uid; + pBlock->pMeta->vgId = vgId; } - pBlock->vgId = vgId; - - if (pBlock->pTableMeta) { - pBlock->pTableMeta->uid = uid; - pBlock->pTableMeta->vgId = vgId; - } - - memset(pBlock->pData, 0, sizeof(SSubmitBlk)); - return TSDB_CODE_SUCCESS; } -STableMeta* qGetTableMetaInDataBlock(void* pDataBlock) { return ((STableDataBlocks*)pDataBlock)->pTableMeta; } - -void qFreeStmtDataBlock(void* pDataBlock) { - if (pDataBlock == NULL) { - return; - } - - taosMemoryFreeClear(((STableDataBlocks*)pDataBlock)->pTableMeta); - taosMemoryFreeClear(((STableDataBlocks*)pDataBlock)->pData); - taosMemoryFreeClear(pDataBlock); -} +STableMeta* qGetTableMetaInDataBlock(void* pDataBlock) { return ((STableDataCxt*)pDataBlock)->pMeta; } void qDestroyStmtDataBlock(void* pBlock) { if (pBlock == NULL) { return; } - STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock; - - pDataBlock->cloned = false; - insDestroyDataBlock(pDataBlock); + STableDataCxt* pDataBlock = (STableDataCxt*)pBlock; + insDestroyTableDataCxt(pDataBlock); } diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index 1592f9d7e1..ad3e9d169f 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -839,7 +839,7 @@ int32_t insCreateSName(SName* pName, SToken* pTableName, int32_t acctId, const c return code; } -int32_t insFindCol(SToken* pColname, int32_t start, int32_t end, SSchema* pSchema) { +int16_t insFindCol(SToken* pColname, int16_t start, int16_t end, SSchema* pSchema) { while (start < end) { if (strlen(pSchema[start].name) == pColname->n && strncmp(pColname->z, pSchema[start].name, pColname->n) == 0) { return start; @@ -956,7 +956,7 @@ int32_t insBuildOutput(SHashObj* pVgroupsHashObj, SArray* pVgDataBlocks, SArray* return TSDB_CODE_SUCCESS; } -static void initBoundCols(int32_t ncols, int32_t* pBoundCols) { +static void initBoundCols(int32_t ncols, int16_t* pBoundCols) { for (int32_t i = 0; i < ncols; ++i) { pBoundCols[i] = i; } @@ -973,7 +973,7 @@ static void initColValues(STableMeta* pTableMeta, SArray* pValues) { int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo* pInfo) { pInfo->numOfCols = numOfBound; pInfo->numOfBound = numOfBound; - pInfo->pColIndex = taosMemoryCalloc(numOfBound, sizeof(int32_t)); + pInfo->pColIndex = taosMemoryCalloc(numOfBound, sizeof(int16_t)); if (NULL == pInfo->pColIndex) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -983,7 +983,7 @@ int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo* pInfo) { static void destroyBoundColInfo(SBoundColInfo* pInfo) { taosMemoryFreeClear(pInfo->pColIndex); } -static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreateTbReq, STableDataCxt** pOutput) { +static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreateTbReq, STableDataCxt** pOutput, bool colMode) { STableDataCxt* pTableCxt = taosMemoryCalloc(1, sizeof(STableDataCxt)); if (NULL == pTableCxt) { return TSDB_CODE_OUT_OF_MEMORY; @@ -1019,14 +1019,22 @@ static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreat code = TSDB_CODE_OUT_OF_MEMORY; } else { pTableCxt->pData->flags = NULL != *pCreateTbReq ? SUBMIT_REQ_AUTO_CREATE_TABLE : 0; + pTableCxt->pData->flags |= colMode ? SUBMIT_REQ_COLUMN_DATA_FORMAT : 0; pTableCxt->pData->suid = pTableMeta->suid; pTableCxt->pData->uid = pTableMeta->uid; pTableCxt->pData->sver = pTableMeta->sversion; pTableCxt->pData->pCreateTbReq = *pCreateTbReq; *pCreateTbReq = NULL; - pTableCxt->pData->aRowP = taosArrayInit(128, POINTER_BYTES); - if (NULL == pTableCxt->pData->aRowP) { - code = TSDB_CODE_OUT_OF_MEMORY; + if (pTableCxt->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { + pTableCxt->pData->aCol = taosArrayInit(128, sizeof(SColData)); + if (NULL == pTableCxt->pData->aCol) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } else { + pTableCxt->pData->aRowP = taosArrayInit(128, POINTER_BYTES); + if (NULL == pTableCxt->pData->aRowP) { + code = TSDB_CODE_OUT_OF_MEMORY; + } } } } @@ -1041,12 +1049,12 @@ static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreat } int32_t insGetTableDataCxt(SHashObj* pHash, void* id, int32_t idLen, STableMeta* pTableMeta, - SVCreateTbReq** pCreateTbReq, STableDataCxt** pTableCxt) { + SVCreateTbReq** pCreateTbReq, STableDataCxt** pTableCxt, bool colMode) { *pTableCxt = taosHashGet(pHash, id, idLen); if (NULL != *pTableCxt) { return TSDB_CODE_SUCCESS; } - int32_t code = createTableDataCxt(pTableMeta, pCreateTbReq, pTableCxt); + int32_t code = createTableDataCxt(pTableMeta, pCreateTbReq, pTableCxt, colMode); if (TSDB_CODE_SUCCESS == code) { code = taosHashPut(pHash, id, idLen, pTableCxt, POINTER_BYTES); } @@ -1162,6 +1170,18 @@ static int32_t createVgroupDataCxt(STableDataCxt* pTableCxt, SHashObj* pVgroupHa return code; } +int insColDataComp(const void* lp, const void* rp) { + SColData* pLeft = (SColData*)lp; + SColData* pRight = (SColData*)rp; + if (pLeft->cid < pRight->cid) { + return -1; + } else if (pLeft->cid > pRight->cid) { + return 1; + } + + return 0; +} + int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) { SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false); SArray* pVgroupList = taosArrayInit(8, POINTER_BYTES); @@ -1172,11 +1192,31 @@ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) { } int32_t code = TSDB_CODE_SUCCESS; - + bool colFormat = false; + void* p = taosHashIterate(pTableHash, NULL); + if (p) { + STableDataCxt* pTableCxt = *(STableDataCxt**)p; + colFormat = (0 != (pTableCxt->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT)); + } + while (TSDB_CODE_SUCCESS == code && NULL != p) { STableDataCxt* pTableCxt = *(STableDataCxt**)p; - code = tRowMergeSort(pTableCxt->pData->aRowP, pTableCxt->pSchema, 0); + if (colFormat) { + taosArraySort(pTableCxt->pData->aCol, insColDataComp); + + int32_t colNum = taosArrayGetSize(pTableCxt->pData->aCol); + for (int32_t i = 0; i < colNum; ++i) { + SColData *pCol = taosArrayGet(pTableCxt->pData->aCol, i); + code = tColDataSortMerge(pCol); + if (code) { + break; + } + } + } else { + code = tRowMergeSort(pTableCxt->pData->aRowP, pTableCxt->pSchema, 0); + } + if (TSDB_CODE_SUCCESS == code) { SVgroupDataCxt* pVgCxt = NULL; int32_t vgId = pTableCxt->pMeta->vgId;