diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 8e26fa98c9..df2170423f 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -136,6 +136,7 @@ typedef struct SParseContext { const char* pDbname; void *pRpc; const char* pClusterId; + struct SCatalog* pCatalog; const SEpSet* pEpSet; int64_t id; // query id, generated by uuid generator int8_t schemaAttached; // denote if submit block is built with table schema or not diff --git a/source/libs/parser/inc/dataBlockMgt.h b/source/libs/parser/inc/dataBlockMgt.h index 7d2e6e3aec..fbd92b89f8 100644 --- a/source/libs/parser/inc/dataBlockMgt.h +++ b/source/libs/parser/inc/dataBlockMgt.h @@ -170,10 +170,11 @@ int32_t schemaIdxCompar(const void *lhs, const void *rhs); int32_t boundIdxCompar(const void *lhs, const void *rhs); void setBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, int32_t numOfCols); void destroyBoundColumnInfo(SParsedDataColInfo* pColList); +void destroyBlockArrayList(SArray* pDataBlockList); int32_t initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCols, uint32_t nBoundCols, int32_t allNullLen); int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows); int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, const STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList); -int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t payloadType, bool freeBlockMap); +int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t payloadType, SArray** pVgDataBlocks); #endif // TDENGINE_DATABLOCKMGT_H diff --git a/source/libs/parser/src/dataBlockMgt.c b/source/libs/parser/src/dataBlockMgt.c index 3138f0c1d8..d37ac74d21 100644 --- a/source/libs/parser/src/dataBlockMgt.c +++ b/source/libs/parser/src/dataBlockMgt.c @@ -269,19 +269,17 @@ void destroyDataBlock(STableDataBlocks* pDataBlock) { tfree(pDataBlock); } -void* destroyBlockArrayList(SArray* pDataBlockList) { +void destroyBlockArrayList(SArray* pDataBlockList) { if (pDataBlockList == NULL) { - return NULL; + return; } size_t size = taosArrayGetSize(pDataBlockList); for (int32_t i = 0; i < size; i++) { - void* d = taosArrayGetP(pDataBlockList, i); - destroyDataBlock(d); + destroyDataBlock(taosArrayGetP(pDataBlockList, i)); } taosArrayDestroy(pDataBlockList); - return NULL; } // data block is disordered, sort it in ascending order @@ -298,6 +296,7 @@ void sortRemoveDataBlockDupRowsRaw(STableDataBlocks *dataBuf) { int32_t i = 0; int32_t j = 1; + // delete rows with timestamp conflicts while (j < pBlocks->numOfRows) { TSKEY ti = *(TSKEY *)(pBlockData + dataBuf->rowSize * i); TSKEY tj = *(TSKEY *)(pBlockData + dataBuf->rowSize * j); @@ -430,7 +429,7 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SB char* p = pTableDataBlock->pData + sizeof(SSubmitBlk); pBlock->dataLen = 0; - int32_t numOfRows = htons(pBlock->numOfRows); + int32_t numOfRows = pBlock->numOfRows; if (isRawPayload) { for (int32_t i = 0; i < numOfRows; ++i) { @@ -467,18 +466,10 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SB } } - int32_t len = pBlock->dataLen + pBlock->schemaLen; - pBlock->dataLen = htonl(pBlock->dataLen); - pBlock->schemaLen = htonl(pBlock->schemaLen); - - return len; + return pBlock->dataLen + pBlock->schemaLen; } -static void extractTableNameList(SHashObj* pHashObj, bool freeBlockMap) { - // todo -} - -int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t payloadType, bool freeBlockMap) { +int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t payloadType, SArray** pVgDataBlocks) { const int INSERT_HEAD_SIZE = sizeof(SMsgDesc) + sizeof(SSubmitMsg); int code = 0; bool isRawPayload = IS_RAW_PAYLOAD(payloadType); @@ -537,24 +528,13 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t (isRawPayload ? (pOneTableBlock->rowSize + expandSize) : getExtendedRowSize(pOneTableBlock)) + sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta); - pBlocks->tid = htonl(pBlocks->tid); - pBlocks->uid = htobe64(pBlocks->uid); - pBlocks->sversion = htonl(pBlocks->sversion); - pBlocks->numOfRows = htons(pBlocks->numOfRows); - pBlocks->schemaLen = 0; - // erase the empty space reserved for binary data int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, blkKeyInfo.pKeyTuple, schemaAttached, isRawPayload); assert(finalLen <= len); dataBuf->size += (finalLen + sizeof(SSubmitBlk)); assert(dataBuf->size <= dataBuf->nAllocSize); - - // the length does not include the SSubmitBlk structure - pBlocks->dataLen = htonl(finalLen); dataBuf->numOfTables += 1; - - pBlocks->numOfRows = 0; } p = taosHashIterate(pHashObj, p); @@ -565,12 +545,10 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t pOneTableBlock = *p; } - extractTableNameList(pHashObj, freeBlockMap); - // free the table data blocks; taosHashCleanup(pVnodeDataBlockHashList); tfree(blkKeyInfo.pKeyTuple); - + *pVgDataBlocks = pVnodeDataBlockList; return TSDB_CODE_SUCCESS; } diff --git a/source/libs/parser/src/insertParser.c b/source/libs/parser/src/insertParser.c index 877ecb2c85..2481896f09 100644 --- a/source/libs/parser/src/insertParser.c +++ b/source/libs/parser/src/insertParser.c @@ -31,33 +31,16 @@ pSql += index; \ } while (0) +#define NEXT_TOKEN_KEEP_SQL(pSql, sToken, index) \ + do { \ + sToken = tStrGetToken(pSql, &index, false); \ + } while (0) + #define CHECK_CODE(expr) \ do { \ int32_t code = expr; \ if (TSDB_CODE_SUCCESS != code) { \ - terrno = code; \ - return terrno; \ - } \ - } while (0) - -#define CHECK_CODE_1(expr, destroy) \ - do { \ - int32_t code = expr; \ - if (TSDB_CODE_SUCCESS != code) { \ - (void)destroy; \ - terrno = code; \ - return terrno; \ - } \ - } while (0) - -#define CHECK_CODE_2(expr, destroy1, destroy2) \ - do { \ - int32_t code = expr; \ - if (TSDB_CODE_SUCCESS != code) { \ - (void)destroy1; \ - (void)destroy2; \ - terrno = code; \ - return terrno; \ + return code; \ } \ } while (0) @@ -70,9 +53,12 @@ typedef struct SInsertParseContext { SParseContext* pComCxt; const char* pSql; SMsgBuf msg; - struct SCatalog* pCatalog; STableMeta* pTableMeta; - SHashObj* pTableBlockHashObj; // data block for each table. need release + SParsedDataColInfo tags; + SKVRowBuilder tagsBuilder; + SHashObj* pTableBlockHashObj; + SArray* pTableDataBlocks; + SArray* pVgDataBlocks; int32_t totalNum; SInsertStmtInfo* pOutput; } SInsertParseContext; @@ -151,14 +137,6 @@ static int32_t toInt64(const char* z, int16_t type, int32_t n, int64_t* value, b return ret; } -static int32_t createInsertStmtInfo(SInsertStmtInfo **pInsertInfo) { - *pInsertInfo = calloc(1, sizeof(SQueryStmtInfo)); - if (NULL == *pInsertInfo) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - return TSDB_CODE_SUCCESS; -} - static int32_t skipInsertInto(SInsertParseContext* pCxt) { SToken sToken; NEXT_TOKEN(pCxt->pSql, sToken); @@ -194,7 +172,7 @@ static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) { char fullDbName[TSDB_FULL_DB_NAME_LEN] = {0}; char tableName[TSDB_TABLE_NAME_LEN] = {0}; CHECK_CODE(buildName(pCxt, pTname, fullDbName, tableName)); - CHECK_CODE(catalogGetTableMeta(pCxt->pCatalog, pCxt->pComCxt->pRpc, pCxt->pComCxt->pEpSet, fullDbName, tableName, &pCxt->pTableMeta)); + CHECK_CODE(catalogGetTableMeta(pCxt->pComCxt->pCatalog, pCxt->pComCxt->pRpc, pCxt->pComCxt->pEpSet, fullDbName, tableName, &pCxt->pTableMeta)); return TSDB_CODE_SUCCESS; } @@ -209,6 +187,51 @@ static int32_t findCol(SToken* pColname, int32_t start, int32_t end, SSchema* pS return -1; } +static void fillMsgHeader(SVgDataBlocks* dst) { + SMsgDesc* desc = (SMsgDesc*)dst->pData; + desc->numOfVnodes = htonl(1); + SSubmitMsg* submit = (SSubmitMsg*)(desc + 1); + submit->header.vgId = htonl(dst->vgId); + submit->header.contLen = htonl(dst->size - sizeof(SMsgDesc)); + submit->length = submit->header.contLen; + submit->numOfBlocks = htonl(dst->numOfTables); + SSubmitBlk* blk = (SSubmitBlk*)(submit + 1); + int32_t numOfBlocks = dst->numOfTables; + while (numOfBlocks--) { + int32_t dataLen = blk->dataLen; + blk->uid = htobe64(blk->uid); + blk->tid = htonl(blk->tid); + blk->padding = htonl(blk->padding); + blk->sversion = htonl(blk->sversion); + blk->dataLen = htonl(blk->dataLen); + blk->schemaLen = htonl(blk->schemaLen); + blk->numOfRows = htons(blk->numOfRows); + blk = (SSubmitBlk*)(blk->data + dataLen); + } +} + +static int32_t buildOutput(SInsertParseContext* pCxt) { + size_t numOfVg = taosArrayGetSize(pCxt->pVgDataBlocks); + pCxt->pOutput->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES); + if (NULL == pCxt->pOutput->pDataBlocks) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + for (size_t i = 0; i < numOfVg; ++i) { + STableDataBlocks* src = taosArrayGetP(pCxt->pVgDataBlocks, i); + SVgDataBlocks* dst = calloc(1, sizeof(SVgDataBlocks)); + if (NULL == dst) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + dst->vgId = src->vgId; + dst->numOfTables = src->numOfTables; + dst->size = src->size; + SWAP(dst->pData, src->pData, char*); + fillMsgHeader(dst); + taosArrayPush(pCxt->pOutput->pDataBlocks, &dst); + } + return TSDB_CODE_SUCCESS; +} + static int32_t checkTimestamp(STableDataBlocks *pDataBlocks, const char *start) { // once the data block is disordered, we do NOT keep previous timestamp any more if (!pDataBlocks->ordered) { @@ -219,16 +242,14 @@ static int32_t checkTimestamp(STableDataBlocks *pDataBlocks, const char *start) if (k == INT64_MIN) { if (pDataBlocks->tsSource == TSDB_USE_CLI_TS) { - return -1; - } else if (pDataBlocks->tsSource == -1) { - pDataBlocks->tsSource = TSDB_USE_SERVER_TS; + return TSDB_CODE_FAILED; // client time/server time can not be mixed } + pDataBlocks->tsSource = TSDB_USE_SERVER_TS; } else { if (pDataBlocks->tsSource == TSDB_USE_SERVER_TS) { - return -1; // client time/server time can not be mixed - } else if (pDataBlocks->tsSource == -1) { - pDataBlocks->tsSource = TSDB_USE_CLI_TS; + return TSDB_CODE_FAILED; // client time/server time can not be mixed } + pDataBlocks->tsSource = TSDB_USE_CLI_TS; } if (k <= pDataBlocks->prevTS && (pDataBlocks->tsSource == TSDB_USE_CLI_TS)) { @@ -608,26 +629,24 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo* } // pSql -> tag1_value, ...) -static int32_t parseTagsClause(SInsertParseContext* pCxt, SParsedDataColInfo* pSpd, SSchema* pTagsSchema, uint8_t precision) { - SKVRowBuilder kvRowBuilder = {0}; - if (tdInitKVRowBuilder(&kvRowBuilder) < 0) { - destroyBoundColumnInfo(pSpd); +static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pTagsSchema, uint8_t precision) { + if (tdInitKVRowBuilder(&pCxt->tagsBuilder) < 0) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } - SKvParam param = {.builder = &kvRowBuilder}; + SKvParam param = {.builder = &pCxt->tagsBuilder}; SToken sToken; char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \" - for (int i = 0; i < pSpd->numOfBound; ++i) { + for (int i = 0; i < pCxt->tags.numOfBound; ++i) { NEXT_TOKEN(pCxt->pSql, sToken); - SSchema* pSchema = &pTagsSchema[pSpd->boundedColumns[i]]; + SSchema* pSchema = &pTagsSchema[pCxt->tags.boundedColumns[i]]; param.schema = pSchema; - CHECK_CODE_2(parseOneValue(pCxt, &sToken, pSchema, precision, tmpTokenBuf, KvRowAppend, ¶m), tdDestroyKVRowBuilder(&kvRowBuilder), destroyBoundColumnInfo(pSpd)); + CHECK_CODE(parseOneValue(pCxt, &sToken, pSchema, precision, tmpTokenBuf, KvRowAppend, ¶m)); } - destroyBoundColumnInfo(pSpd); - SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder); - tdDestroyKVRowBuilder(&kvRowBuilder); + destroyBoundColumnInfo(&pCxt->tags); + SKVRow row = tdGetKVRowFromBuilder(&pCxt->tagsBuilder); + tdDestroyKVRowBuilder(&pCxt->tagsBuilder); if (NULL == row) { return buildInvalidOperationMsg(&pCxt->msg, "tag value expected"); } @@ -650,13 +669,12 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken) } SSchema* pTagsSchema = getTableTagSchema(pCxt->pTableMeta); - SParsedDataColInfo spd = {0}; - setBoundColumnInfo(&spd, pTagsSchema, getNumOfTags(pCxt->pTableMeta)); + setBoundColumnInfo(&pCxt->tags, pTagsSchema, getNumOfTags(pCxt->pTableMeta)); // pSql -> [(tag1_name, ...)] TAGS (tag1_value, ...) NEXT_TOKEN(pCxt->pSql, sToken); if (TK_LP == sToken.type) { - CHECK_CODE_1(parseBoundColumns(pCxt, &spd, pTagsSchema), destroyBoundColumnInfo(&spd)); + CHECK_CODE(parseBoundColumns(pCxt, &pCxt->tags, pTagsSchema)); NEXT_TOKEN(pCxt->pSql, sToken); } @@ -668,7 +686,7 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken) if (TK_LP != sToken.type) { return buildSyntaxErrMsg(&pCxt->msg, "( is expected", sToken.z); } - CHECK_CODE(parseTagsClause(pCxt, &spd, pTagsSchema, getTableInfo(pCxt->pTableMeta).precision)); + CHECK_CODE(parseTagsClause(pCxt, pTagsSchema, getTableInfo(pCxt->pTableMeta).precision)); return TSDB_CODE_SUCCESS; } @@ -732,10 +750,12 @@ static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlo char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \" SToken sToken; while (1) { - NEXT_TOKEN(pCxt->pSql, sToken); + int32_t index = 0; + NEXT_TOKEN_KEEP_SQL(pCxt->pSql, sToken, index); if (TK_LP != sToken.type) { break; } + pCxt->pSql += index; if ((*numOfRows) >= maxRows || pDataBlock->size + extendedRowSize >= pDataBlock->nAllocSize) { int32_t tSize; @@ -815,7 +835,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { if (TK_LP == sToken.type) { // pSql -> field1_name, ...) - CHECK_CODE_1(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(pCxt->pTableMeta)), destroyBoundColumnInfo(&dataBuf->boundColumnInfo)); + CHECK_CODE(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(pCxt->pTableMeta))); NEXT_TOKEN(pCxt->pSql, sToken); } @@ -842,9 +862,18 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { } // merge according to vgId if (!TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT) && taosHashGetSize(pCxt->pTableBlockHashObj) > 0) { - CHECK_CODE(mergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->schemaAttache, pCxt->pOutput->payloadType, true)); + CHECK_CODE(mergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->schemaAttache, pCxt->pOutput->payloadType, &pCxt->pVgDataBlocks)); } - return TSDB_CODE_SUCCESS; + return buildOutput(pCxt); +} + +static void destroyInsertParseContext(SInsertParseContext* pCxt) { + tfree(pCxt->pTableMeta); + destroyBoundColumnInfo(&pCxt->tags); + tdDestroyKVRowBuilder(&pCxt->tagsBuilder); + taosHashCleanup(pCxt->pTableBlockHashObj); + destroyBlockArrayList(pCxt->pTableDataBlocks); + destroyBlockArrayList(pCxt->pVgDataBlocks); } // INSERT INTO @@ -854,26 +883,30 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { // VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path // [...]; int32_t parseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo) { - CHECK_CODE(createInsertStmtInfo(pInfo)); - SInsertParseContext context = { .pComCxt = pContext, .pSql = pContext->pSql, .msg = {.buf = pContext->pMsg, .len = pContext->msgLen}, - .pCatalog = NULL, .pTableMeta = NULL, .pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false), .totalNum = 0, - .pOutput = *pInfo + .pOutput = calloc(1, sizeof(SInsertStmtInfo)) }; - if (NULL == context.pTableBlockHashObj) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; + if (NULL == context.pTableBlockHashObj || NULL == context.pOutput) { + terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; } - CHECK_CODE(catalogGetHandle(pContext->pClusterId, &context.pCatalog)); - CHECK_CODE(skipInsertInto(&context)); - CHECK_CODE(parseInsertBody(&context)); + *pInfo = context.pOutput; + context.pOutput->schemaAttache = pContext->schemaAttached; + context.pOutput->payloadType = PAYLOAD_TYPE_KV; - return TSDB_CODE_SUCCESS; + int32_t code = skipInsertInto(&context); + if (TSDB_CODE_SUCCESS == code) { + code = parseInsertBody(&context); + } + destroyInsertParseContext(&context); + terrno = code; + return (TSDB_CODE_SUCCESS == code ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED); } diff --git a/source/libs/parser/test/CMakeLists.txt b/source/libs/parser/test/CMakeLists.txt index feae008273..e722b873b6 100644 --- a/source/libs/parser/test/CMakeLists.txt +++ b/source/libs/parser/test/CMakeLists.txt @@ -17,5 +17,3 @@ TARGET_LINK_LIBRARIES( parserTest PUBLIC os util common parser catalog transport gtest function planner qcom ) - -TARGET_LINK_OPTIONS(parserTest PRIVATE -Wl,-wrap,malloc) diff --git a/source/libs/parser/test/insertParserTest.cpp b/source/libs/parser/test/insertParserTest.cpp new file mode 100644 index 0000000000..433d189a45 --- /dev/null +++ b/source/libs/parser/test/insertParserTest.cpp @@ -0,0 +1,182 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include + +#include "insertParser.h" +// #include "mockCatalog.h" + +using namespace std; +using namespace testing; + +namespace { + string toString(int32_t code) { + return tstrerror(code); + } +} + +// syntax: +// INSERT INTO +// tb_name +// [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)] +// [(field1_name, ...)] +// VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path +// [...]; +class InsertTest : public Test { +protected: + void setDatabase(const string& acctId, const string& db) { + acctId_ = acctId; + db_ = db; + } + + void bind(const char* sql) { + reset(); + cxt_.pAcctId = acctId_.c_str(); + cxt_.pDbname = db_.c_str(); + strcpy(sqlBuf_, sql); + cxt_.sqlLen = strlen(sql); + sqlBuf_[cxt_.sqlLen] = '\0'; + cxt_.pSql = sqlBuf_; + + } + + int32_t run() { + code_ = parseInsertSql(&cxt_, &res_); + if (code_ != TSDB_CODE_SUCCESS) { + cout << "code:" << toString(code_) << ", msg:" << errMagBuf_ << endl; + } + return code_; + } + + SInsertStmtInfo* reslut() { + return res_; + } + + void dumpReslut() { + size_t num = taosArrayGetSize(res_->pDataBlocks); + cout << "schemaAttache:" << (int32_t)res_->schemaAttache << ", payloadType:" << (int32_t)res_->payloadType << ", insertType:" << res_->insertType << ", numOfVgs:" << num << endl; + for (size_t i = 0; i < num; ++i) { + SVgDataBlocks* vg = (SVgDataBlocks*)taosArrayGetP(res_->pDataBlocks, i); + cout << "vgId:" << vg->vgId << ", numOfTables:" << vg->numOfTables << ", dataSize:" << vg->size << endl; + SMsgDesc* desc = (SMsgDesc*)(vg->pData); + cout << "numOfVnodes:" << ntohl(desc->numOfVnodes) << endl; + SSubmitMsg* submit = (SSubmitMsg*)(desc + 1); + cout << "length:" << ntohl(submit->length) << ", numOfBlocks:" << ntohl(submit->numOfBlocks) << endl; + int32_t numOfBlocks = ntohl(submit->numOfBlocks); + SSubmitBlk* blk = (SSubmitBlk*)(submit + 1); + for (int32_t i = 0; i < numOfBlocks; ++i) { + cout << "Block:" << i << endl; + cout << "\tuid:" << be64toh(blk->uid) << ", tid:" << ntohl(blk->tid) << ", padding:" << ntohl(blk->padding) << ", sversion:" << ntohl(blk->sversion) + << ", dataLen:" << ntohl(blk->dataLen) << ", schemaLen:" << ntohl(blk->schemaLen) << ", numOfRows:" << ntohs(blk->numOfRows) << endl; + blk = (SSubmitBlk*)(blk->data + ntohl(blk->dataLen)); + } + } + } + + void checkReslut(int32_t numOfTables, int16_t numOfRows1, int16_t numOfRows2 = -1) { + ASSERT_EQ(res_->schemaAttache, 0); + ASSERT_EQ(res_->payloadType, PAYLOAD_TYPE_KV); + ASSERT_EQ(res_->insertType, TSDB_QUERY_TYPE_INSERT); + size_t num = taosArrayGetSize(res_->pDataBlocks); + ASSERT_GE(num, 0); + for (size_t i = 0; i < num; ++i) { + SVgDataBlocks* vg = (SVgDataBlocks*)taosArrayGetP(res_->pDataBlocks, i); + ASSERT_EQ(vg->numOfTables, numOfTables); + ASSERT_GE(vg->size, 0); + SMsgDesc* desc = (SMsgDesc*)(vg->pData); + ASSERT_EQ(ntohl(desc->numOfVnodes), 1); + SSubmitMsg* submit = (SSubmitMsg*)(desc + 1); + ASSERT_GE(ntohl(submit->length), 0); + ASSERT_GE(ntohl(submit->numOfBlocks), 0); + int32_t numOfBlocks = ntohl(submit->numOfBlocks); + SSubmitBlk* blk = (SSubmitBlk*)(submit + 1); + for (int32_t i = 0; i < numOfBlocks; ++i) { + ASSERT_EQ(ntohs(blk->numOfRows), (0 == i ? numOfRows1 : (numOfRows2 > 0 ? numOfRows2 : numOfRows1))); + blk = (SSubmitBlk*)(blk->data + ntohl(blk->dataLen)); + } + } + } + +private: + static const int max_err_len = 1024; + static const int max_sql_len = 1024 * 1024; + + void reset() { + memset(&cxt_, 0, sizeof(cxt_)); + memset(errMagBuf_, 0, max_err_len); + cxt_.pMsg = errMagBuf_; + cxt_.msgLen = max_err_len; + code_ = TSDB_CODE_SUCCESS; + res_ = nullptr; + } + + string acctId_; + string db_; + char errMagBuf_[max_err_len]; + char sqlBuf_[max_sql_len]; + SParseContext cxt_; + int32_t code_; + SInsertStmtInfo* res_; +}; + +// INSERT INTO tb_name VALUES (field1_value, ...) +TEST_F(InsertTest, singleTableSingleRowTest) { + setDatabase("root", "test"); + + bind("insert into t1 values (now, 1, \"beijing\")"); + ASSERT_EQ(run(), TSDB_CODE_SUCCESS); + dumpReslut(); + checkReslut(1, 1); +} + +// INSERT INTO tb_name VALUES (field1_value, ...)(field1_value, ...) +TEST_F(InsertTest, singleTableMultiRowTest) { + setDatabase("root", "test"); + + bind("insert into t1 values (now, 1, \"beijing\")(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")"); + ASSERT_EQ(run(), TSDB_CODE_SUCCESS); + dumpReslut(); + checkReslut(1, 3); +} + +// INSERT INTO tb1_name VALUES (field1_value, ...) tb2_name VALUES (field1_value, ...) +TEST_F(InsertTest, multiTableSingleRowTest) { + setDatabase("root", "test"); + + bind("insert into st1s1 values (now, 1, \"beijing\") st1s2 values (now, 10, \"131028\")"); + ASSERT_EQ(run(), TSDB_CODE_SUCCESS); + dumpReslut(); + checkReslut(2, 1); +} + +// INSERT INTO tb1_name VALUES (field1_value, ...) tb2_name VALUES (field1_value, ...) +TEST_F(InsertTest, multiTableMultiRowTest) { + setDatabase("root", "test"); + + bind("insert into st1s1 values (now, 1, \"beijing\")(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")" + " st1s2 values (now, 10, \"131028\")(now+1s, 20, \"132028\")"); + ASSERT_EQ(run(), TSDB_CODE_SUCCESS); + dumpReslut(); + checkReslut(2, 3, 2); +} + +TEST_F(InsertTest, toleranceTest) { + setDatabase("root", "test"); + + bind("insert into"); + ASSERT_NE(run(), TSDB_CODE_SUCCESS); + bind("insert into t"); + ASSERT_NE(run(), TSDB_CODE_SUCCESS); +} diff --git a/source/libs/parser/test/insertTest.cpp b/source/libs/parser/test/insertTest.cpp deleted file mode 100644 index 85db46d7bf..0000000000 --- a/source/libs/parser/test/insertTest.cpp +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include - -#include "insertParser.h" -#include "mockCatalog.h" - -using namespace std; -using namespace testing; - -namespace { - string toString(int32_t code) { - return tstrerror(code); - } -} - -extern "C" { - -#include - -void *__real_malloc(size_t); - -void *__wrap_malloc(size_t c) { - // printf("My MALLOC called: %d\n", c); - // void *array[32]; - // int size = backtrace(array, 32); - // char **symbols = backtrace_symbols(array, size); - // for (int i = 0; i < size; ++i) { - // cout << symbols[i] << endl; - // } - // free(symbols); - - return __real_malloc(c); -} - -} - -// syntax: -// INSERT INTO -// tb_name -// [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)] -// [(field1_name, ...)] -// VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path -// [...]; -class InsertTest : public Test { -protected: - void setDatabase(const string& acctId, const string& db) { - acctId_ = acctId; - db_ = db; - } - - void bind(const char* sql) { - reset(); - cxt_.pAcctId = acctId_.c_str(); - cxt_.pDbname = db_.c_str(); - strcpy(sqlBuf_, sql); - cxt_.sqlLen = strlen(sql); - sqlBuf_[cxt_.sqlLen] = '\0'; - cxt_.pSql = sqlBuf_; - - } - - int32_t run() { - code_ = parseInsertSql(&cxt_, &res_); - if (code_ != TSDB_CODE_SUCCESS) { - cout << "code:" << toString(code_) << ", msg:" << errMagBuf_ << endl; - } - return code_; - } - - SInsertStmtInfo* reslut() { - return res_; - } - -private: - static const int max_err_len = 1024; - static const int max_sql_len = 1024 * 1024; - - void reset() { - memset(&cxt_, 0, sizeof(cxt_)); - memset(errMagBuf_, 0, max_err_len); - cxt_.pMsg = errMagBuf_; - cxt_.msgLen = max_err_len; - code_ = TSDB_CODE_SUCCESS; - res_ = nullptr; - } - - string acctId_; - string db_; - char errMagBuf_[max_err_len]; - char sqlBuf_[max_sql_len]; - SParseContext cxt_; - int32_t code_; - SInsertStmtInfo* res_; -}; - -// INSERT INTO tb_name VALUES (field1_value, ...) -TEST_F(InsertTest, simpleTest) { - setDatabase("root", "test"); - - bind("insert into t1 values (now, 1, \"wxy\")"); - ASSERT_EQ(run(), TSDB_CODE_SUCCESS); - SInsertStmtInfo* res = reslut(); - // todo check - ASSERT_EQ(res->insertType, TSDB_QUERY_TYPE_INSERT); - // ASSERT_EQ(taosArrayGetSize(res->pDataBlocks), 1); -} - -TEST_F(InsertTest, toleranceTest) { - setDatabase("root", "test"); - - bind("insert into"); - ASSERT_NE(run(), TSDB_CODE_SUCCESS); - bind("insert into t"); - ASSERT_NE(run(), TSDB_CODE_SUCCESS); -} diff --git a/source/libs/parser/test/mockCatalog.cpp b/source/libs/parser/test/mockCatalog.cpp index dc341f6af7..fdcb6e0433 100644 --- a/source/libs/parser/test/mockCatalog.cpp +++ b/source/libs/parser/test/mockCatalog.cpp @@ -25,15 +25,15 @@ namespace { void generateTestT1(MockCatalogService* mcs) { ITableBuilder& builder = mcs->createTableBuilder("root.test", "t1", TSDB_NORMAL_TABLE, 3) .setPrecision(TSDB_TIME_PRECISION_MILLI).setVgid(1).addColumn("ts", TSDB_DATA_TYPE_TIMESTAMP) - .addColumn("c1", TSDB_DATA_TYPE_INT).addColumn("c2", TSDB_DATA_TYPE_BINARY, 10); + .addColumn("c1", TSDB_DATA_TYPE_INT).addColumn("c2", TSDB_DATA_TYPE_BINARY, 20); builder.done(); } void generateTestST1(MockCatalogService* mcs) { ITableBuilder& builder = mcs->createTableBuilder("root.test", "st1", TSDB_SUPER_TABLE, 3, 2) .setPrecision(TSDB_TIME_PRECISION_MILLI).addColumn("ts", TSDB_DATA_TYPE_TIMESTAMP) - .addTag("tag1", TSDB_DATA_TYPE_INT).addTag("tag2", TSDB_DATA_TYPE_BINARY, 10) - .addColumn("c1", TSDB_DATA_TYPE_INT).addColumn("c2", TSDB_DATA_TYPE_BINARY, 10); + .addTag("tag1", TSDB_DATA_TYPE_INT).addTag("tag2", TSDB_DATA_TYPE_BINARY, 20) + .addColumn("c1", TSDB_DATA_TYPE_INT).addColumn("c2", TSDB_DATA_TYPE_BINARY, 20); builder.done(); mcs->createSubTable("root.test", "st1", "st1s1", 1); mcs->createSubTable("root.test", "st1", "st1s2", 2); diff --git a/source/libs/parser/test/mockCatalog.h b/source/libs/parser/test/mockCatalog.h index 35eb1ddcd9..f8e80a48aa 100644 --- a/source/libs/parser/test/mockCatalog.h +++ b/source/libs/parser/test/mockCatalog.h @@ -22,8 +22,4 @@ void initMetaDataEnv(); void generateMetaData(); void destroyMetaDataEnv(); -// mock -// int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle); -// int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta); - #endif // MOCK_CATALOG_H diff --git a/source/libs/parser/test/mockCatalogService.cpp b/source/libs/parser/test/mockCatalogService.cpp index e7271c3cb2..ad366e8f74 100644 --- a/source/libs/parser/test/mockCatalogService.cpp +++ b/source/libs/parser/test/mockCatalogService.cpp @@ -33,6 +33,7 @@ public: col->colId = colId_++; col->bytes = bytes; strcpy(col->name, name.c_str()); + rowsize_ += bytes; return *this; } @@ -147,11 +148,11 @@ public: for (const auto& db : meta_) { std::cout << "Databse:" << db.first << std::endl; - std::cout << SH("Table") << SH("Type") << SH("Precision") << IH("Vgid") << std::endl; + std::cout << SH("Table") << SH("Type") << SH("Precision") << IH("Vgid") << IH("RowSize") << std::endl; std::cout << SL(3, 1) << std::endl; for (const auto& table : db.second) { const auto& schema = table.second->schema; - std::cout << SF(table.first) << SF(ttToString(schema->tableType)) << SF(pToString(schema->tableInfo.precision)) << IF(schema->vgId) << std::endl; + std::cout << SF(table.first) << SF(ttToString(schema->tableType)) << SF(pToString(schema->tableInfo.precision)) << IF(schema->vgId) << IF(schema->tableInfo.rowSize) << std::endl; } std::cout << std::endl; } diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index 1d29e48e30..0696a5b24e 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -221,7 +221,7 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { node->pChildren = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES); size_t size = taosArrayGetSize(pPlanNode->pChildren); for(int32_t i = 0; i < size; ++i) { - SPhyNode* child = createPhyNode(pCxt, taosArrayGet(pPlanNode->pChildren, i)); + SPhyNode* child = createPhyNode(pCxt, taosArrayGetP(pPlanNode->pChildren, i)); child->pParent = node; taosArrayPush(node->pChildren, &child); }