TD-10674 insert unit test with multiple scenarios
This commit is contained in:
parent
84d102b5e7
commit
e90ef66049
|
@ -136,6 +136,7 @@ typedef struct SParseContext {
|
|||
const char* pDbname;
|
||||
void *pRpc;
|
||||
const char* pClusterId;
|
||||
struct SCatalog* pCatalog;
|
||||
const SEpSet* pEpSet;
|
||||
int64_t id; // query id, generated by uuid generator
|
||||
int8_t schemaAttached; // denote if submit block is built with table schema or not
|
||||
|
|
|
@ -170,10 +170,11 @@ int32_t schemaIdxCompar(const void *lhs, const void *rhs);
|
|||
int32_t boundIdxCompar(const void *lhs, const void *rhs);
|
||||
void setBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, int32_t numOfCols);
|
||||
void destroyBoundColumnInfo(SParsedDataColInfo* pColList);
|
||||
void destroyBlockArrayList(SArray* pDataBlockList);
|
||||
int32_t initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCols, uint32_t nBoundCols, int32_t allNullLen);
|
||||
int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows);
|
||||
int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize,
|
||||
const STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList);
|
||||
int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t payloadType, bool freeBlockMap);
|
||||
int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t payloadType, SArray** pVgDataBlocks);
|
||||
|
||||
#endif // TDENGINE_DATABLOCKMGT_H
|
||||
|
|
|
@ -269,19 +269,17 @@ void destroyDataBlock(STableDataBlocks* pDataBlock) {
|
|||
tfree(pDataBlock);
|
||||
}
|
||||
|
||||
void* destroyBlockArrayList(SArray* pDataBlockList) {
|
||||
void destroyBlockArrayList(SArray* pDataBlockList) {
|
||||
if (pDataBlockList == NULL) {
|
||||
return NULL;
|
||||
return;
|
||||
}
|
||||
|
||||
size_t size = taosArrayGetSize(pDataBlockList);
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
void* d = taosArrayGetP(pDataBlockList, i);
|
||||
destroyDataBlock(d);
|
||||
destroyDataBlock(taosArrayGetP(pDataBlockList, i));
|
||||
}
|
||||
|
||||
taosArrayDestroy(pDataBlockList);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// data block is disordered, sort it in ascending order
|
||||
|
@ -298,6 +296,7 @@ void sortRemoveDataBlockDupRowsRaw(STableDataBlocks *dataBuf) {
|
|||
int32_t i = 0;
|
||||
int32_t j = 1;
|
||||
|
||||
// delete rows with timestamp conflicts
|
||||
while (j < pBlocks->numOfRows) {
|
||||
TSKEY ti = *(TSKEY *)(pBlockData + dataBuf->rowSize * i);
|
||||
TSKEY tj = *(TSKEY *)(pBlockData + dataBuf->rowSize * j);
|
||||
|
@ -430,7 +429,7 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SB
|
|||
|
||||
char* p = pTableDataBlock->pData + sizeof(SSubmitBlk);
|
||||
pBlock->dataLen = 0;
|
||||
int32_t numOfRows = htons(pBlock->numOfRows);
|
||||
int32_t numOfRows = pBlock->numOfRows;
|
||||
|
||||
if (isRawPayload) {
|
||||
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||
|
@ -467,18 +466,10 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SB
|
|||
}
|
||||
}
|
||||
|
||||
int32_t len = pBlock->dataLen + pBlock->schemaLen;
|
||||
pBlock->dataLen = htonl(pBlock->dataLen);
|
||||
pBlock->schemaLen = htonl(pBlock->schemaLen);
|
||||
|
||||
return len;
|
||||
return pBlock->dataLen + pBlock->schemaLen;
|
||||
}
|
||||
|
||||
static void extractTableNameList(SHashObj* pHashObj, bool freeBlockMap) {
|
||||
// todo
|
||||
}
|
||||
|
||||
int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t payloadType, bool freeBlockMap) {
|
||||
int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t payloadType, SArray** pVgDataBlocks) {
|
||||
const int INSERT_HEAD_SIZE = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
|
||||
int code = 0;
|
||||
bool isRawPayload = IS_RAW_PAYLOAD(payloadType);
|
||||
|
@ -537,24 +528,13 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t
|
|||
(isRawPayload ? (pOneTableBlock->rowSize + expandSize) : getExtendedRowSize(pOneTableBlock)) +
|
||||
sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta);
|
||||
|
||||
pBlocks->tid = htonl(pBlocks->tid);
|
||||
pBlocks->uid = htobe64(pBlocks->uid);
|
||||
pBlocks->sversion = htonl(pBlocks->sversion);
|
||||
pBlocks->numOfRows = htons(pBlocks->numOfRows);
|
||||
pBlocks->schemaLen = 0;
|
||||
|
||||
// erase the empty space reserved for binary data
|
||||
int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, blkKeyInfo.pKeyTuple, schemaAttached, isRawPayload);
|
||||
assert(finalLen <= len);
|
||||
|
||||
dataBuf->size += (finalLen + sizeof(SSubmitBlk));
|
||||
assert(dataBuf->size <= dataBuf->nAllocSize);
|
||||
|
||||
// the length does not include the SSubmitBlk structure
|
||||
pBlocks->dataLen = htonl(finalLen);
|
||||
dataBuf->numOfTables += 1;
|
||||
|
||||
pBlocks->numOfRows = 0;
|
||||
}
|
||||
|
||||
p = taosHashIterate(pHashObj, p);
|
||||
|
@ -565,12 +545,10 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t
|
|||
pOneTableBlock = *p;
|
||||
}
|
||||
|
||||
extractTableNameList(pHashObj, freeBlockMap);
|
||||
|
||||
// free the table data blocks;
|
||||
taosHashCleanup(pVnodeDataBlockHashList);
|
||||
tfree(blkKeyInfo.pKeyTuple);
|
||||
|
||||
*pVgDataBlocks = pVnodeDataBlockList;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -31,33 +31,16 @@
|
|||
pSql += index; \
|
||||
} while (0)
|
||||
|
||||
#define NEXT_TOKEN_KEEP_SQL(pSql, sToken, index) \
|
||||
do { \
|
||||
sToken = tStrGetToken(pSql, &index, false); \
|
||||
} while (0)
|
||||
|
||||
#define CHECK_CODE(expr) \
|
||||
do { \
|
||||
int32_t code = expr; \
|
||||
if (TSDB_CODE_SUCCESS != code) { \
|
||||
terrno = code; \
|
||||
return terrno; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define CHECK_CODE_1(expr, destroy) \
|
||||
do { \
|
||||
int32_t code = expr; \
|
||||
if (TSDB_CODE_SUCCESS != code) { \
|
||||
(void)destroy; \
|
||||
terrno = code; \
|
||||
return terrno; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define CHECK_CODE_2(expr, destroy1, destroy2) \
|
||||
do { \
|
||||
int32_t code = expr; \
|
||||
if (TSDB_CODE_SUCCESS != code) { \
|
||||
(void)destroy1; \
|
||||
(void)destroy2; \
|
||||
terrno = code; \
|
||||
return terrno; \
|
||||
return code; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
|
@ -70,9 +53,12 @@ typedef struct SInsertParseContext {
|
|||
SParseContext* pComCxt;
|
||||
const char* pSql;
|
||||
SMsgBuf msg;
|
||||
struct SCatalog* pCatalog;
|
||||
STableMeta* pTableMeta;
|
||||
SHashObj* pTableBlockHashObj; // data block for each table. need release
|
||||
SParsedDataColInfo tags;
|
||||
SKVRowBuilder tagsBuilder;
|
||||
SHashObj* pTableBlockHashObj;
|
||||
SArray* pTableDataBlocks;
|
||||
SArray* pVgDataBlocks;
|
||||
int32_t totalNum;
|
||||
SInsertStmtInfo* pOutput;
|
||||
} SInsertParseContext;
|
||||
|
@ -151,14 +137,6 @@ static int32_t toInt64(const char* z, int16_t type, int32_t n, int64_t* value, b
|
|||
return ret;
|
||||
}
|
||||
|
||||
static int32_t createInsertStmtInfo(SInsertStmtInfo **pInsertInfo) {
|
||||
*pInsertInfo = calloc(1, sizeof(SQueryStmtInfo));
|
||||
if (NULL == *pInsertInfo) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t skipInsertInto(SInsertParseContext* pCxt) {
|
||||
SToken sToken;
|
||||
NEXT_TOKEN(pCxt->pSql, sToken);
|
||||
|
@ -194,7 +172,7 @@ static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) {
|
|||
char fullDbName[TSDB_FULL_DB_NAME_LEN] = {0};
|
||||
char tableName[TSDB_TABLE_NAME_LEN] = {0};
|
||||
CHECK_CODE(buildName(pCxt, pTname, fullDbName, tableName));
|
||||
CHECK_CODE(catalogGetTableMeta(pCxt->pCatalog, pCxt->pComCxt->pRpc, pCxt->pComCxt->pEpSet, fullDbName, tableName, &pCxt->pTableMeta));
|
||||
CHECK_CODE(catalogGetTableMeta(pCxt->pComCxt->pCatalog, pCxt->pComCxt->pRpc, pCxt->pComCxt->pEpSet, fullDbName, tableName, &pCxt->pTableMeta));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -209,6 +187,51 @@ static int32_t findCol(SToken* pColname, int32_t start, int32_t end, SSchema* pS
|
|||
return -1;
|
||||
}
|
||||
|
||||
static void fillMsgHeader(SVgDataBlocks* dst) {
|
||||
SMsgDesc* desc = (SMsgDesc*)dst->pData;
|
||||
desc->numOfVnodes = htonl(1);
|
||||
SSubmitMsg* submit = (SSubmitMsg*)(desc + 1);
|
||||
submit->header.vgId = htonl(dst->vgId);
|
||||
submit->header.contLen = htonl(dst->size - sizeof(SMsgDesc));
|
||||
submit->length = submit->header.contLen;
|
||||
submit->numOfBlocks = htonl(dst->numOfTables);
|
||||
SSubmitBlk* blk = (SSubmitBlk*)(submit + 1);
|
||||
int32_t numOfBlocks = dst->numOfTables;
|
||||
while (numOfBlocks--) {
|
||||
int32_t dataLen = blk->dataLen;
|
||||
blk->uid = htobe64(blk->uid);
|
||||
blk->tid = htonl(blk->tid);
|
||||
blk->padding = htonl(blk->padding);
|
||||
blk->sversion = htonl(blk->sversion);
|
||||
blk->dataLen = htonl(blk->dataLen);
|
||||
blk->schemaLen = htonl(blk->schemaLen);
|
||||
blk->numOfRows = htons(blk->numOfRows);
|
||||
blk = (SSubmitBlk*)(blk->data + dataLen);
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t buildOutput(SInsertParseContext* pCxt) {
|
||||
size_t numOfVg = taosArrayGetSize(pCxt->pVgDataBlocks);
|
||||
pCxt->pOutput->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
|
||||
if (NULL == pCxt->pOutput->pDataBlocks) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
for (size_t i = 0; i < numOfVg; ++i) {
|
||||
STableDataBlocks* src = taosArrayGetP(pCxt->pVgDataBlocks, i);
|
||||
SVgDataBlocks* dst = calloc(1, sizeof(SVgDataBlocks));
|
||||
if (NULL == dst) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
dst->vgId = src->vgId;
|
||||
dst->numOfTables = src->numOfTables;
|
||||
dst->size = src->size;
|
||||
SWAP(dst->pData, src->pData, char*);
|
||||
fillMsgHeader(dst);
|
||||
taosArrayPush(pCxt->pOutput->pDataBlocks, &dst);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t checkTimestamp(STableDataBlocks *pDataBlocks, const char *start) {
|
||||
// once the data block is disordered, we do NOT keep previous timestamp any more
|
||||
if (!pDataBlocks->ordered) {
|
||||
|
@ -219,16 +242,14 @@ static int32_t checkTimestamp(STableDataBlocks *pDataBlocks, const char *start)
|
|||
|
||||
if (k == INT64_MIN) {
|
||||
if (pDataBlocks->tsSource == TSDB_USE_CLI_TS) {
|
||||
return -1;
|
||||
} else if (pDataBlocks->tsSource == -1) {
|
||||
pDataBlocks->tsSource = TSDB_USE_SERVER_TS;
|
||||
return TSDB_CODE_FAILED; // client time/server time can not be mixed
|
||||
}
|
||||
pDataBlocks->tsSource = TSDB_USE_SERVER_TS;
|
||||
} else {
|
||||
if (pDataBlocks->tsSource == TSDB_USE_SERVER_TS) {
|
||||
return -1; // client time/server time can not be mixed
|
||||
} else if (pDataBlocks->tsSource == -1) {
|
||||
pDataBlocks->tsSource = TSDB_USE_CLI_TS;
|
||||
return TSDB_CODE_FAILED; // client time/server time can not be mixed
|
||||
}
|
||||
pDataBlocks->tsSource = TSDB_USE_CLI_TS;
|
||||
}
|
||||
|
||||
if (k <= pDataBlocks->prevTS && (pDataBlocks->tsSource == TSDB_USE_CLI_TS)) {
|
||||
|
@ -608,26 +629,24 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo*
|
|||
}
|
||||
|
||||
// pSql -> tag1_value, ...)
|
||||
static int32_t parseTagsClause(SInsertParseContext* pCxt, SParsedDataColInfo* pSpd, SSchema* pTagsSchema, uint8_t precision) {
|
||||
SKVRowBuilder kvRowBuilder = {0};
|
||||
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
|
||||
destroyBoundColumnInfo(pSpd);
|
||||
static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pTagsSchema, uint8_t precision) {
|
||||
if (tdInitKVRowBuilder(&pCxt->tagsBuilder) < 0) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
SKvParam param = {.builder = &kvRowBuilder};
|
||||
SKvParam param = {.builder = &pCxt->tagsBuilder};
|
||||
SToken sToken;
|
||||
char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \"
|
||||
for (int i = 0; i < pSpd->numOfBound; ++i) {
|
||||
for (int i = 0; i < pCxt->tags.numOfBound; ++i) {
|
||||
NEXT_TOKEN(pCxt->pSql, sToken);
|
||||
SSchema* pSchema = &pTagsSchema[pSpd->boundedColumns[i]];
|
||||
SSchema* pSchema = &pTagsSchema[pCxt->tags.boundedColumns[i]];
|
||||
param.schema = pSchema;
|
||||
CHECK_CODE_2(parseOneValue(pCxt, &sToken, pSchema, precision, tmpTokenBuf, KvRowAppend, ¶m), tdDestroyKVRowBuilder(&kvRowBuilder), destroyBoundColumnInfo(pSpd));
|
||||
CHECK_CODE(parseOneValue(pCxt, &sToken, pSchema, precision, tmpTokenBuf, KvRowAppend, ¶m));
|
||||
}
|
||||
|
||||
destroyBoundColumnInfo(pSpd);
|
||||
SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder);
|
||||
tdDestroyKVRowBuilder(&kvRowBuilder);
|
||||
destroyBoundColumnInfo(&pCxt->tags);
|
||||
SKVRow row = tdGetKVRowFromBuilder(&pCxt->tagsBuilder);
|
||||
tdDestroyKVRowBuilder(&pCxt->tagsBuilder);
|
||||
if (NULL == row) {
|
||||
return buildInvalidOperationMsg(&pCxt->msg, "tag value expected");
|
||||
}
|
||||
|
@ -650,13 +669,12 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken)
|
|||
}
|
||||
|
||||
SSchema* pTagsSchema = getTableTagSchema(pCxt->pTableMeta);
|
||||
SParsedDataColInfo spd = {0};
|
||||
setBoundColumnInfo(&spd, pTagsSchema, getNumOfTags(pCxt->pTableMeta));
|
||||
setBoundColumnInfo(&pCxt->tags, pTagsSchema, getNumOfTags(pCxt->pTableMeta));
|
||||
|
||||
// pSql -> [(tag1_name, ...)] TAGS (tag1_value, ...)
|
||||
NEXT_TOKEN(pCxt->pSql, sToken);
|
||||
if (TK_LP == sToken.type) {
|
||||
CHECK_CODE_1(parseBoundColumns(pCxt, &spd, pTagsSchema), destroyBoundColumnInfo(&spd));
|
||||
CHECK_CODE(parseBoundColumns(pCxt, &pCxt->tags, pTagsSchema));
|
||||
NEXT_TOKEN(pCxt->pSql, sToken);
|
||||
}
|
||||
|
||||
|
@ -668,7 +686,7 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken)
|
|||
if (TK_LP != sToken.type) {
|
||||
return buildSyntaxErrMsg(&pCxt->msg, "( is expected", sToken.z);
|
||||
}
|
||||
CHECK_CODE(parseTagsClause(pCxt, &spd, pTagsSchema, getTableInfo(pCxt->pTableMeta).precision));
|
||||
CHECK_CODE(parseTagsClause(pCxt, pTagsSchema, getTableInfo(pCxt->pTableMeta).precision));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -732,10 +750,12 @@ static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlo
|
|||
char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \"
|
||||
SToken sToken;
|
||||
while (1) {
|
||||
NEXT_TOKEN(pCxt->pSql, sToken);
|
||||
int32_t index = 0;
|
||||
NEXT_TOKEN_KEEP_SQL(pCxt->pSql, sToken, index);
|
||||
if (TK_LP != sToken.type) {
|
||||
break;
|
||||
}
|
||||
pCxt->pSql += index;
|
||||
|
||||
if ((*numOfRows) >= maxRows || pDataBlock->size + extendedRowSize >= pDataBlock->nAllocSize) {
|
||||
int32_t tSize;
|
||||
|
@ -815,7 +835,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
|||
|
||||
if (TK_LP == sToken.type) {
|
||||
// pSql -> field1_name, ...)
|
||||
CHECK_CODE_1(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(pCxt->pTableMeta)), destroyBoundColumnInfo(&dataBuf->boundColumnInfo));
|
||||
CHECK_CODE(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(pCxt->pTableMeta)));
|
||||
NEXT_TOKEN(pCxt->pSql, sToken);
|
||||
}
|
||||
|
||||
|
@ -842,9 +862,18 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
|||
}
|
||||
// merge according to vgId
|
||||
if (!TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT) && taosHashGetSize(pCxt->pTableBlockHashObj) > 0) {
|
||||
CHECK_CODE(mergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->schemaAttache, pCxt->pOutput->payloadType, true));
|
||||
CHECK_CODE(mergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->schemaAttache, pCxt->pOutput->payloadType, &pCxt->pVgDataBlocks));
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return buildOutput(pCxt);
|
||||
}
|
||||
|
||||
static void destroyInsertParseContext(SInsertParseContext* pCxt) {
|
||||
tfree(pCxt->pTableMeta);
|
||||
destroyBoundColumnInfo(&pCxt->tags);
|
||||
tdDestroyKVRowBuilder(&pCxt->tagsBuilder);
|
||||
taosHashCleanup(pCxt->pTableBlockHashObj);
|
||||
destroyBlockArrayList(pCxt->pTableDataBlocks);
|
||||
destroyBlockArrayList(pCxt->pVgDataBlocks);
|
||||
}
|
||||
|
||||
// INSERT INTO
|
||||
|
@ -854,26 +883,30 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
|||
// VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
|
||||
// [...];
|
||||
int32_t parseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo) {
|
||||
CHECK_CODE(createInsertStmtInfo(pInfo));
|
||||
|
||||
SInsertParseContext context = {
|
||||
.pComCxt = pContext,
|
||||
.pSql = pContext->pSql,
|
||||
.msg = {.buf = pContext->pMsg, .len = pContext->msgLen},
|
||||
.pCatalog = NULL,
|
||||
.pTableMeta = NULL,
|
||||
.pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false),
|
||||
.totalNum = 0,
|
||||
.pOutput = *pInfo
|
||||
.pOutput = calloc(1, sizeof(SInsertStmtInfo))
|
||||
};
|
||||
|
||||
if (NULL == context.pTableBlockHashObj) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
if (NULL == context.pTableBlockHashObj || NULL == context.pOutput) {
|
||||
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
CHECK_CODE(catalogGetHandle(pContext->pClusterId, &context.pCatalog));
|
||||
CHECK_CODE(skipInsertInto(&context));
|
||||
CHECK_CODE(parseInsertBody(&context));
|
||||
*pInfo = context.pOutput;
|
||||
context.pOutput->schemaAttache = pContext->schemaAttached;
|
||||
context.pOutput->payloadType = PAYLOAD_TYPE_KV;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
int32_t code = skipInsertInto(&context);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = parseInsertBody(&context);
|
||||
}
|
||||
destroyInsertParseContext(&context);
|
||||
terrno = code;
|
||||
return (TSDB_CODE_SUCCESS == code ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED);
|
||||
}
|
||||
|
|
|
@ -17,5 +17,3 @@ TARGET_LINK_LIBRARIES(
|
|||
parserTest
|
||||
PUBLIC os util common parser catalog transport gtest function planner qcom
|
||||
)
|
||||
|
||||
TARGET_LINK_OPTIONS(parserTest PRIVATE -Wl,-wrap,malloc)
|
||||
|
|
|
@ -0,0 +1,182 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include "insertParser.h"
|
||||
// #include "mockCatalog.h"
|
||||
|
||||
using namespace std;
|
||||
using namespace testing;
|
||||
|
||||
namespace {
|
||||
string toString(int32_t code) {
|
||||
return tstrerror(code);
|
||||
}
|
||||
}
|
||||
|
||||
// syntax:
|
||||
// INSERT INTO
|
||||
// tb_name
|
||||
// [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
|
||||
// [(field1_name, ...)]
|
||||
// VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
|
||||
// [...];
|
||||
class InsertTest : public Test {
|
||||
protected:
|
||||
void setDatabase(const string& acctId, const string& db) {
|
||||
acctId_ = acctId;
|
||||
db_ = db;
|
||||
}
|
||||
|
||||
void bind(const char* sql) {
|
||||
reset();
|
||||
cxt_.pAcctId = acctId_.c_str();
|
||||
cxt_.pDbname = db_.c_str();
|
||||
strcpy(sqlBuf_, sql);
|
||||
cxt_.sqlLen = strlen(sql);
|
||||
sqlBuf_[cxt_.sqlLen] = '\0';
|
||||
cxt_.pSql = sqlBuf_;
|
||||
|
||||
}
|
||||
|
||||
int32_t run() {
|
||||
code_ = parseInsertSql(&cxt_, &res_);
|
||||
if (code_ != TSDB_CODE_SUCCESS) {
|
||||
cout << "code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
|
||||
}
|
||||
return code_;
|
||||
}
|
||||
|
||||
SInsertStmtInfo* reslut() {
|
||||
return res_;
|
||||
}
|
||||
|
||||
void dumpReslut() {
|
||||
size_t num = taosArrayGetSize(res_->pDataBlocks);
|
||||
cout << "schemaAttache:" << (int32_t)res_->schemaAttache << ", payloadType:" << (int32_t)res_->payloadType << ", insertType:" << res_->insertType << ", numOfVgs:" << num << endl;
|
||||
for (size_t i = 0; i < num; ++i) {
|
||||
SVgDataBlocks* vg = (SVgDataBlocks*)taosArrayGetP(res_->pDataBlocks, i);
|
||||
cout << "vgId:" << vg->vgId << ", numOfTables:" << vg->numOfTables << ", dataSize:" << vg->size << endl;
|
||||
SMsgDesc* desc = (SMsgDesc*)(vg->pData);
|
||||
cout << "numOfVnodes:" << ntohl(desc->numOfVnodes) << endl;
|
||||
SSubmitMsg* submit = (SSubmitMsg*)(desc + 1);
|
||||
cout << "length:" << ntohl(submit->length) << ", numOfBlocks:" << ntohl(submit->numOfBlocks) << endl;
|
||||
int32_t numOfBlocks = ntohl(submit->numOfBlocks);
|
||||
SSubmitBlk* blk = (SSubmitBlk*)(submit + 1);
|
||||
for (int32_t i = 0; i < numOfBlocks; ++i) {
|
||||
cout << "Block:" << i << endl;
|
||||
cout << "\tuid:" << be64toh(blk->uid) << ", tid:" << ntohl(blk->tid) << ", padding:" << ntohl(blk->padding) << ", sversion:" << ntohl(blk->sversion)
|
||||
<< ", dataLen:" << ntohl(blk->dataLen) << ", schemaLen:" << ntohl(blk->schemaLen) << ", numOfRows:" << ntohs(blk->numOfRows) << endl;
|
||||
blk = (SSubmitBlk*)(blk->data + ntohl(blk->dataLen));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void checkReslut(int32_t numOfTables, int16_t numOfRows1, int16_t numOfRows2 = -1) {
|
||||
ASSERT_EQ(res_->schemaAttache, 0);
|
||||
ASSERT_EQ(res_->payloadType, PAYLOAD_TYPE_KV);
|
||||
ASSERT_EQ(res_->insertType, TSDB_QUERY_TYPE_INSERT);
|
||||
size_t num = taosArrayGetSize(res_->pDataBlocks);
|
||||
ASSERT_GE(num, 0);
|
||||
for (size_t i = 0; i < num; ++i) {
|
||||
SVgDataBlocks* vg = (SVgDataBlocks*)taosArrayGetP(res_->pDataBlocks, i);
|
||||
ASSERT_EQ(vg->numOfTables, numOfTables);
|
||||
ASSERT_GE(vg->size, 0);
|
||||
SMsgDesc* desc = (SMsgDesc*)(vg->pData);
|
||||
ASSERT_EQ(ntohl(desc->numOfVnodes), 1);
|
||||
SSubmitMsg* submit = (SSubmitMsg*)(desc + 1);
|
||||
ASSERT_GE(ntohl(submit->length), 0);
|
||||
ASSERT_GE(ntohl(submit->numOfBlocks), 0);
|
||||
int32_t numOfBlocks = ntohl(submit->numOfBlocks);
|
||||
SSubmitBlk* blk = (SSubmitBlk*)(submit + 1);
|
||||
for (int32_t i = 0; i < numOfBlocks; ++i) {
|
||||
ASSERT_EQ(ntohs(blk->numOfRows), (0 == i ? numOfRows1 : (numOfRows2 > 0 ? numOfRows2 : numOfRows1)));
|
||||
blk = (SSubmitBlk*)(blk->data + ntohl(blk->dataLen));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
static const int max_err_len = 1024;
|
||||
static const int max_sql_len = 1024 * 1024;
|
||||
|
||||
void reset() {
|
||||
memset(&cxt_, 0, sizeof(cxt_));
|
||||
memset(errMagBuf_, 0, max_err_len);
|
||||
cxt_.pMsg = errMagBuf_;
|
||||
cxt_.msgLen = max_err_len;
|
||||
code_ = TSDB_CODE_SUCCESS;
|
||||
res_ = nullptr;
|
||||
}
|
||||
|
||||
string acctId_;
|
||||
string db_;
|
||||
char errMagBuf_[max_err_len];
|
||||
char sqlBuf_[max_sql_len];
|
||||
SParseContext cxt_;
|
||||
int32_t code_;
|
||||
SInsertStmtInfo* res_;
|
||||
};
|
||||
|
||||
// INSERT INTO tb_name VALUES (field1_value, ...)
|
||||
TEST_F(InsertTest, singleTableSingleRowTest) {
|
||||
setDatabase("root", "test");
|
||||
|
||||
bind("insert into t1 values (now, 1, \"beijing\")");
|
||||
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
|
||||
dumpReslut();
|
||||
checkReslut(1, 1);
|
||||
}
|
||||
|
||||
// INSERT INTO tb_name VALUES (field1_value, ...)(field1_value, ...)
|
||||
TEST_F(InsertTest, singleTableMultiRowTest) {
|
||||
setDatabase("root", "test");
|
||||
|
||||
bind("insert into t1 values (now, 1, \"beijing\")(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")");
|
||||
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
|
||||
dumpReslut();
|
||||
checkReslut(1, 3);
|
||||
}
|
||||
|
||||
// INSERT INTO tb1_name VALUES (field1_value, ...) tb2_name VALUES (field1_value, ...)
|
||||
TEST_F(InsertTest, multiTableSingleRowTest) {
|
||||
setDatabase("root", "test");
|
||||
|
||||
bind("insert into st1s1 values (now, 1, \"beijing\") st1s2 values (now, 10, \"131028\")");
|
||||
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
|
||||
dumpReslut();
|
||||
checkReslut(2, 1);
|
||||
}
|
||||
|
||||
// INSERT INTO tb1_name VALUES (field1_value, ...) tb2_name VALUES (field1_value, ...)
|
||||
TEST_F(InsertTest, multiTableMultiRowTest) {
|
||||
setDatabase("root", "test");
|
||||
|
||||
bind("insert into st1s1 values (now, 1, \"beijing\")(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")"
|
||||
" st1s2 values (now, 10, \"131028\")(now+1s, 20, \"132028\")");
|
||||
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
|
||||
dumpReslut();
|
||||
checkReslut(2, 3, 2);
|
||||
}
|
||||
|
||||
TEST_F(InsertTest, toleranceTest) {
|
||||
setDatabase("root", "test");
|
||||
|
||||
bind("insert into");
|
||||
ASSERT_NE(run(), TSDB_CODE_SUCCESS);
|
||||
bind("insert into t");
|
||||
ASSERT_NE(run(), TSDB_CODE_SUCCESS);
|
||||
}
|
|
@ -1,129 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include "insertParser.h"
|
||||
#include "mockCatalog.h"
|
||||
|
||||
using namespace std;
|
||||
using namespace testing;
|
||||
|
||||
namespace {
|
||||
string toString(int32_t code) {
|
||||
return tstrerror(code);
|
||||
}
|
||||
}
|
||||
|
||||
extern "C" {
|
||||
|
||||
#include <execinfo.h>
|
||||
|
||||
void *__real_malloc(size_t);
|
||||
|
||||
void *__wrap_malloc(size_t c) {
|
||||
// printf("My MALLOC called: %d\n", c);
|
||||
// void *array[32];
|
||||
// int size = backtrace(array, 32);
|
||||
// char **symbols = backtrace_symbols(array, size);
|
||||
// for (int i = 0; i < size; ++i) {
|
||||
// cout << symbols[i] << endl;
|
||||
// }
|
||||
// free(symbols);
|
||||
|
||||
return __real_malloc(c);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// syntax:
|
||||
// INSERT INTO
|
||||
// tb_name
|
||||
// [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
|
||||
// [(field1_name, ...)]
|
||||
// VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
|
||||
// [...];
|
||||
class InsertTest : public Test {
|
||||
protected:
|
||||
void setDatabase(const string& acctId, const string& db) {
|
||||
acctId_ = acctId;
|
||||
db_ = db;
|
||||
}
|
||||
|
||||
void bind(const char* sql) {
|
||||
reset();
|
||||
cxt_.pAcctId = acctId_.c_str();
|
||||
cxt_.pDbname = db_.c_str();
|
||||
strcpy(sqlBuf_, sql);
|
||||
cxt_.sqlLen = strlen(sql);
|
||||
sqlBuf_[cxt_.sqlLen] = '\0';
|
||||
cxt_.pSql = sqlBuf_;
|
||||
|
||||
}
|
||||
|
||||
int32_t run() {
|
||||
code_ = parseInsertSql(&cxt_, &res_);
|
||||
if (code_ != TSDB_CODE_SUCCESS) {
|
||||
cout << "code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
|
||||
}
|
||||
return code_;
|
||||
}
|
||||
|
||||
SInsertStmtInfo* reslut() {
|
||||
return res_;
|
||||
}
|
||||
|
||||
private:
|
||||
static const int max_err_len = 1024;
|
||||
static const int max_sql_len = 1024 * 1024;
|
||||
|
||||
void reset() {
|
||||
memset(&cxt_, 0, sizeof(cxt_));
|
||||
memset(errMagBuf_, 0, max_err_len);
|
||||
cxt_.pMsg = errMagBuf_;
|
||||
cxt_.msgLen = max_err_len;
|
||||
code_ = TSDB_CODE_SUCCESS;
|
||||
res_ = nullptr;
|
||||
}
|
||||
|
||||
string acctId_;
|
||||
string db_;
|
||||
char errMagBuf_[max_err_len];
|
||||
char sqlBuf_[max_sql_len];
|
||||
SParseContext cxt_;
|
||||
int32_t code_;
|
||||
SInsertStmtInfo* res_;
|
||||
};
|
||||
|
||||
// INSERT INTO tb_name VALUES (field1_value, ...)
|
||||
TEST_F(InsertTest, simpleTest) {
|
||||
setDatabase("root", "test");
|
||||
|
||||
bind("insert into t1 values (now, 1, \"wxy\")");
|
||||
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
|
||||
SInsertStmtInfo* res = reslut();
|
||||
// todo check
|
||||
ASSERT_EQ(res->insertType, TSDB_QUERY_TYPE_INSERT);
|
||||
// ASSERT_EQ(taosArrayGetSize(res->pDataBlocks), 1);
|
||||
}
|
||||
|
||||
TEST_F(InsertTest, toleranceTest) {
|
||||
setDatabase("root", "test");
|
||||
|
||||
bind("insert into");
|
||||
ASSERT_NE(run(), TSDB_CODE_SUCCESS);
|
||||
bind("insert into t");
|
||||
ASSERT_NE(run(), TSDB_CODE_SUCCESS);
|
||||
}
|
|
@ -25,15 +25,15 @@ namespace {
|
|||
void generateTestT1(MockCatalogService* mcs) {
|
||||
ITableBuilder& builder = mcs->createTableBuilder("root.test", "t1", TSDB_NORMAL_TABLE, 3)
|
||||
.setPrecision(TSDB_TIME_PRECISION_MILLI).setVgid(1).addColumn("ts", TSDB_DATA_TYPE_TIMESTAMP)
|
||||
.addColumn("c1", TSDB_DATA_TYPE_INT).addColumn("c2", TSDB_DATA_TYPE_BINARY, 10);
|
||||
.addColumn("c1", TSDB_DATA_TYPE_INT).addColumn("c2", TSDB_DATA_TYPE_BINARY, 20);
|
||||
builder.done();
|
||||
}
|
||||
|
||||
void generateTestST1(MockCatalogService* mcs) {
|
||||
ITableBuilder& builder = mcs->createTableBuilder("root.test", "st1", TSDB_SUPER_TABLE, 3, 2)
|
||||
.setPrecision(TSDB_TIME_PRECISION_MILLI).addColumn("ts", TSDB_DATA_TYPE_TIMESTAMP)
|
||||
.addTag("tag1", TSDB_DATA_TYPE_INT).addTag("tag2", TSDB_DATA_TYPE_BINARY, 10)
|
||||
.addColumn("c1", TSDB_DATA_TYPE_INT).addColumn("c2", TSDB_DATA_TYPE_BINARY, 10);
|
||||
.addTag("tag1", TSDB_DATA_TYPE_INT).addTag("tag2", TSDB_DATA_TYPE_BINARY, 20)
|
||||
.addColumn("c1", TSDB_DATA_TYPE_INT).addColumn("c2", TSDB_DATA_TYPE_BINARY, 20);
|
||||
builder.done();
|
||||
mcs->createSubTable("root.test", "st1", "st1s1", 1);
|
||||
mcs->createSubTable("root.test", "st1", "st1s2", 2);
|
||||
|
|
|
@ -22,8 +22,4 @@ void initMetaDataEnv();
|
|||
void generateMetaData();
|
||||
void destroyMetaDataEnv();
|
||||
|
||||
// mock
|
||||
// int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle);
|
||||
// int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta);
|
||||
|
||||
#endif // MOCK_CATALOG_H
|
||||
|
|
|
@ -33,6 +33,7 @@ public:
|
|||
col->colId = colId_++;
|
||||
col->bytes = bytes;
|
||||
strcpy(col->name, name.c_str());
|
||||
rowsize_ += bytes;
|
||||
return *this;
|
||||
}
|
||||
|
||||
|
@ -147,11 +148,11 @@ public:
|
|||
|
||||
for (const auto& db : meta_) {
|
||||
std::cout << "Databse:" << db.first << std::endl;
|
||||
std::cout << SH("Table") << SH("Type") << SH("Precision") << IH("Vgid") << std::endl;
|
||||
std::cout << SH("Table") << SH("Type") << SH("Precision") << IH("Vgid") << IH("RowSize") << std::endl;
|
||||
std::cout << SL(3, 1) << std::endl;
|
||||
for (const auto& table : db.second) {
|
||||
const auto& schema = table.second->schema;
|
||||
std::cout << SF(table.first) << SF(ttToString(schema->tableType)) << SF(pToString(schema->tableInfo.precision)) << IF(schema->vgId) << std::endl;
|
||||
std::cout << SF(table.first) << SF(ttToString(schema->tableType)) << SF(pToString(schema->tableInfo.precision)) << IF(schema->vgId) << IF(schema->tableInfo.rowSize) << std::endl;
|
||||
}
|
||||
std::cout << std::endl;
|
||||
}
|
||||
|
|
|
@ -221,7 +221,7 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
|
|||
node->pChildren = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
|
||||
size_t size = taosArrayGetSize(pPlanNode->pChildren);
|
||||
for(int32_t i = 0; i < size; ++i) {
|
||||
SPhyNode* child = createPhyNode(pCxt, taosArrayGet(pPlanNode->pChildren, i));
|
||||
SPhyNode* child = createPhyNode(pCxt, taosArrayGetP(pPlanNode->pChildren, i));
|
||||
child->pParent = node;
|
||||
taosArrayPush(node->pChildren, &child);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue