insert using implement
This commit is contained in:
parent
c70eb15bb9
commit
ba5503da68
|
@ -277,7 +277,6 @@ typedef struct SVnodeModifOpStmt {
|
|||
ENodeType nodeType;
|
||||
ENodeType sqlNodeType;
|
||||
SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>.
|
||||
int8_t schemaAttache; // denote if submit block is built with table schema or not
|
||||
uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
|
||||
uint32_t insertType; // insert data from [file|sql statement| bound statement]
|
||||
const char* sql; // current sql statement position
|
||||
|
|
|
@ -307,7 +307,7 @@ typedef enum ELogicConditionType {
|
|||
#define TSDB_MAX_TOTAL_BLOCKS 10000
|
||||
#define TSDB_DEFAULT_TOTAL_BLOCKS 6
|
||||
|
||||
#define TSDB_MIN_DAYS_PER_FILE (1 * 1440) // unit minute
|
||||
#define TSDB_MIN_DAYS_PER_FILE 60 // unit minute
|
||||
#define TSDB_MAX_DAYS_PER_FILE (3650 * 1440)
|
||||
#define TSDB_DEFAULT_DAYS_PER_FILE (10 * 1440)
|
||||
|
||||
|
|
|
@ -16,7 +16,16 @@
|
|||
#include "command.h"
|
||||
#include "tdatablock.h"
|
||||
|
||||
// #define SET_VARSTR(pData, val, pOffset)
|
||||
static int32_t getSchemaBytes(const SSchema* pSchema) {
|
||||
switch (pSchema->type) {
|
||||
case TSDB_DATA_TYPE_BINARY:
|
||||
return (pSchema->bytes - VARSTR_HEADER_SIZE);
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
return (pSchema->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
|
||||
default:
|
||||
return pSchema->bytes;
|
||||
}
|
||||
}
|
||||
|
||||
static void buildRspData(const STableMeta* pMeta, char* pData) {
|
||||
int32_t* pColSizes = (int32_t*)pData;
|
||||
|
@ -50,7 +59,7 @@ static void buildRspData(const STableMeta* pMeta, char* pData) {
|
|||
// Length
|
||||
pData += BitmapLen(numOfRows);
|
||||
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||
*(int32_t*)pData = pMeta->schema[i].bytes;
|
||||
*(int32_t*)pData = getSchemaBytes(pMeta->schema + i);
|
||||
pData += sizeof(int32_t);
|
||||
}
|
||||
pColSizes[2] = sizeof(int32_t) * numOfRows;
|
||||
|
|
|
@ -77,7 +77,7 @@ typedef struct STableDataBlocks {
|
|||
STableMeta *pTableMeta; // the tableMeta of current table, the table meta will be used during submit, keep a ref to avoid to be removed from cache
|
||||
char *pData;
|
||||
bool cloned;
|
||||
|
||||
int32_t createTbReqLen;
|
||||
SParsedDataColInfo boundColumnInfo;
|
||||
SRowBuilder rowBuilder;
|
||||
} STableDataBlocks;
|
||||
|
@ -118,6 +118,7 @@ static FORCE_INLINE int32_t setBlockInfo(SSubmitBlk *pBlocks, STableDataBlocks*
|
|||
pBlocks->suid = (TSDB_NORMAL_TABLE == dataBuf->pTableMeta->tableType ? dataBuf->pTableMeta->uid : dataBuf->pTableMeta->suid);
|
||||
pBlocks->uid = dataBuf->pTableMeta->uid;
|
||||
pBlocks->sversion = dataBuf->pTableMeta->sversion;
|
||||
pBlocks->schemaLen = dataBuf->createTbReqLen;
|
||||
|
||||
if (pBlocks->numOfRows + numOfRows >= INT16_MAX) {
|
||||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||
|
@ -136,7 +137,7 @@ void destroyBlockHashmap(SHashObj* pDataBlockHash);
|
|||
int initRowBuilder(SRowBuilder *pBuilder, int16_t schemaVer, SParsedDataColInfo *pColInfo);
|
||||
int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows);
|
||||
int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize,
|
||||
const STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList);
|
||||
int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t payloadType, SArray** pVgDataBlocks);
|
||||
const STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList, SVCreateTbReq* pCreateTbReq);
|
||||
int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** pVgDataBlocks);
|
||||
|
||||
#endif // TDENGINE_DATABLOCKMGT_H
|
||||
|
|
|
@ -753,7 +753,7 @@ static int32_t buildCreateTbReq(SInsertParseContext* pCxt, const SName* pName, S
|
|||
}
|
||||
|
||||
// pSql -> tag1_value, ...)
|
||||
static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pTagsSchema, uint8_t precision, const SName* pName) {
|
||||
static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint8_t precision, const SName* pName) {
|
||||
if (tdInitKVRowBuilder(&pCxt->tagsBuilder) < 0) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
@ -763,9 +763,9 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pTagsSchema,
|
|||
char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \"
|
||||
for (int i = 0; i < pCxt->tags.numOfBound; ++i) {
|
||||
NEXT_TOKEN_WITH_PREV(pCxt->pSql, sToken);
|
||||
SSchema* pSchema = &pTagsSchema[pCxt->tags.boundColumns[i]];
|
||||
param.schema = pSchema;
|
||||
CHECK_CODE(parseValueToken(&pCxt->pSql, &sToken, pSchema, precision, tmpTokenBuf, KvRowAppend, ¶m, &pCxt->msg));
|
||||
SSchema* pTagSchema = &pSchema[pCxt->tags.boundColumns[i] - 1]; // colId starts with 1
|
||||
param.schema = pTagSchema;
|
||||
CHECK_CODE(parseValueToken(&pCxt->pSql, &sToken, pTagSchema, precision, tmpTokenBuf, KvRowAppend, ¶m, &pCxt->msg));
|
||||
}
|
||||
|
||||
SKVRow row = tdGetKVRowFromBuilder(&pCxt->tagsBuilder);
|
||||
|
@ -791,6 +791,7 @@ static int32_t storeTableMeta(SHashObj* pHash, const char* pName, int32_t len, S
|
|||
if (TSDB_CODE_SUCCESS != cloneTableMeta(pMeta, &pBackup)) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
pBackup->uid = tGenIdPI64();
|
||||
return taosHashPut(pHash, pName, len, &pBackup, POINTER_BYTES);
|
||||
}
|
||||
|
||||
|
@ -833,7 +834,11 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken)
|
|||
if (TK_NK_LP != sToken.type) {
|
||||
return buildSyntaxErrMsg(&pCxt->msg, "( is expected", sToken.z);
|
||||
}
|
||||
CHECK_CODE(parseTagsClause(pCxt, pTagsSchema, getTableInfo(pCxt->pTableMeta).precision, &name));
|
||||
CHECK_CODE(parseTagsClause(pCxt, pCxt->pTableMeta->schema, getTableInfo(pCxt->pTableMeta).precision, &name));
|
||||
NEXT_TOKEN(pCxt->pSql, sToken);
|
||||
if (TK_NK_RP != sToken.type) {
|
||||
return buildSyntaxErrMsg(&pCxt->msg, ") is expected", sToken.z);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -1015,7 +1020,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
|||
|
||||
STableDataBlocks *dataBuf = NULL;
|
||||
CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, pCxt->pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE,
|
||||
sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta, &dataBuf, NULL));
|
||||
sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta, &dataBuf, NULL, &pCxt->createTblReq));
|
||||
|
||||
if (TK_NK_LP == sToken.type) {
|
||||
// pSql -> field1_name, ...)
|
||||
|
@ -1046,7 +1051,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
|||
}
|
||||
// merge according to vgId
|
||||
if (!TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT) && taosHashGetSize(pCxt->pTableBlockHashObj) > 0) {
|
||||
CHECK_CODE(mergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->schemaAttache, pCxt->pOutput->payloadType, &pCxt->pVgDataBlocks));
|
||||
CHECK_CODE(mergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->payloadType, &pCxt->pVgDataBlocks));
|
||||
}
|
||||
return buildOutput(pCxt);
|
||||
}
|
||||
|
|
|
@ -149,8 +149,28 @@ static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t star
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq) {
|
||||
int32_t len = tSerializeSVCreateTbReq(NULL, pCreateTbReq);
|
||||
if (pBlocks->nAllocSize - pBlocks->size < len) {
|
||||
pBlocks->nAllocSize += len + pBlocks->rowSize;
|
||||
char* pTmp = taosMemoryRealloc(pBlocks->pData, pBlocks->nAllocSize);
|
||||
if (pTmp != NULL) {
|
||||
pBlocks->pData = pTmp;
|
||||
memset(pBlocks->pData + pBlocks->size, 0, pBlocks->nAllocSize - pBlocks->size);
|
||||
} else {
|
||||
pBlocks->nAllocSize -= len + pBlocks->rowSize;
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
char* pBuf = pBlocks->pData + pBlocks->size;
|
||||
tSerializeSVCreateTbReq((void**)&pBuf, pCreateTbReq);
|
||||
pBlocks->size += len;
|
||||
pBlocks->createTbReqLen = len;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize,
|
||||
const STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList) {
|
||||
const STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList, SVCreateTbReq* pCreateTbReq) {
|
||||
*dataBlocks = NULL;
|
||||
STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)&id, sizeof(id));
|
||||
if (t1 != NULL) {
|
||||
|
@ -163,6 +183,13 @@ int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int3
|
|||
return ret;
|
||||
}
|
||||
|
||||
if (NULL != pCreateTbReq && NULL != pCreateTbReq->ctbCfg.pTag) {
|
||||
ret = buildCreateTbMsg(*dataBlocks, pCreateTbReq);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
taosHashPut(pHashList, (const char*)&id, sizeof(int64_t), (char*)dataBlocks, POINTER_BYTES);
|
||||
if (pBlockList) {
|
||||
taosArrayPush(pBlockList, dataBlocks);
|
||||
|
@ -294,7 +321,7 @@ int sortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlkKey
|
|||
|
||||
int32_t extendedRowSize = getExtendedRowSize(dataBuf);
|
||||
SBlockKeyTuple *pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
|
||||
char * pBlockData = pBlocks->data;
|
||||
char * pBlockData = pBlocks->data + pBlocks->schemaLen;
|
||||
int n = 0;
|
||||
while (n < nRows) {
|
||||
pBlkKeyTuple->skey = TD_ROW_KEY((STSRow *)pBlockData);
|
||||
|
@ -340,44 +367,26 @@ int sortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlkKey
|
|||
}
|
||||
|
||||
// Erase the empty space reserved for binary data
|
||||
static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SBlockKeyTuple* blkKeyTuple, int8_t schemaAttached, bool isRawPayload) {
|
||||
static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SBlockKeyTuple* blkKeyTuple, bool isRawPayload) {
|
||||
// TODO: optimize this function, handle the case while binary is not presented
|
||||
STableMeta* pTableMeta = pTableDataBlock->pTableMeta;
|
||||
STableComInfo tinfo = getTableInfo(pTableMeta);
|
||||
SSchema* pSchema = getTableColumnSchema(pTableMeta);
|
||||
|
||||
int32_t nonDataLen = sizeof(SSubmitBlk) + pTableDataBlock->createTbReqLen;
|
||||
SSubmitBlk* pBlock = pDataBlock;
|
||||
memcpy(pDataBlock, pTableDataBlock->pData, sizeof(SSubmitBlk));
|
||||
pDataBlock = (char*)pDataBlock + sizeof(SSubmitBlk);
|
||||
memcpy(pDataBlock, pTableDataBlock->pData, nonDataLen);
|
||||
pDataBlock = (char*)pDataBlock + nonDataLen;
|
||||
|
||||
int32_t flen = 0; // original total length of row
|
||||
|
||||
// schema needs to be included into the submit data block
|
||||
if (schemaAttached) {
|
||||
int32_t numOfCols = getNumOfColumns(pTableDataBlock->pTableMeta);
|
||||
for(int32_t j = 0; j < numOfCols; ++j) {
|
||||
STColumn* pCol = (STColumn*) pDataBlock;
|
||||
pCol->colId = htons(pSchema[j].colId);
|
||||
pCol->type = pSchema[j].type;
|
||||
pCol->bytes = htons(pSchema[j].bytes);
|
||||
pCol->offset = 0;
|
||||
|
||||
pDataBlock = (char*)pDataBlock + sizeof(STColumn);
|
||||
if (isRawPayload) {
|
||||
for (int32_t j = 0; j < tinfo.numOfColumns; ++j) {
|
||||
flen += TYPE_BYTES[pSchema[j].type];
|
||||
}
|
||||
|
||||
int32_t schemaSize = sizeof(STColumn) * numOfCols;
|
||||
pBlock->schemaLen = schemaSize;
|
||||
} else {
|
||||
if (isRawPayload) {
|
||||
for (int32_t j = 0; j < tinfo.numOfColumns; ++j) {
|
||||
flen += TYPE_BYTES[pSchema[j].type];
|
||||
}
|
||||
}
|
||||
pBlock->schemaLen = 0;
|
||||
}
|
||||
pBlock->schemaLen = pTableDataBlock->createTbReqLen;
|
||||
|
||||
char* p = pTableDataBlock->pData + sizeof(SSubmitBlk);
|
||||
char* p = pTableDataBlock->pData + nonDataLen;
|
||||
pBlock->dataLen = 0;
|
||||
int32_t numOfRows = pBlock->numOfRows;
|
||||
|
||||
|
@ -414,7 +423,7 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SB
|
|||
return pBlock->dataLen + pBlock->schemaLen;
|
||||
}
|
||||
|
||||
int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t payloadType, SArray** pVgDataBlocks) {
|
||||
int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** pVgDataBlocks) {
|
||||
const int INSERT_HEAD_SIZE = sizeof(SSubmitReq);
|
||||
int code = 0;
|
||||
bool isRawPayload = IS_RAW_PAYLOAD(payloadType);
|
||||
|
@ -429,7 +438,7 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t
|
|||
if (pBlocks->numOfRows > 0) {
|
||||
STableDataBlocks* dataBuf = NULL;
|
||||
int32_t ret = getDataBlockFromList(pVnodeDataBlockHashList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE,
|
||||
INSERT_HEAD_SIZE, 0, pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList);
|
||||
INSERT_HEAD_SIZE, 0, pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList, NULL);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
taosHashCleanup(pVnodeDataBlockHashList);
|
||||
destroyBlockArrayList(pVnodeDataBlockList);
|
||||
|
@ -474,7 +483,7 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t
|
|||
sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta);
|
||||
|
||||
// erase the empty space reserved for binary data
|
||||
int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, blkKeyInfo.pKeyTuple, schemaAttached, isRawPayload);
|
||||
int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, blkKeyInfo.pKeyTuple, isRawPayload);
|
||||
assert(finalLen <= len);
|
||||
|
||||
dataBuf->size += (finalLen + sizeof(SSubmitBlk));
|
||||
|
|
|
@ -62,7 +62,7 @@ protected:
|
|||
void dumpReslut() {
|
||||
SVnodeModifOpStmt* pStmt = getVnodeModifStmt(res_);
|
||||
size_t num = taosArrayGetSize(pStmt->pDataBlocks);
|
||||
cout << "schemaAttache:" << (int32_t)pStmt->schemaAttache << ", payloadType:" << (int32_t)pStmt->payloadType << ", insertType:" << pStmt->insertType << ", numOfVgs:" << num << endl;
|
||||
cout << "payloadType:" << (int32_t)pStmt->payloadType << ", insertType:" << pStmt->insertType << ", numOfVgs:" << num << endl;
|
||||
for (size_t i = 0; i < num; ++i) {
|
||||
SVgDataBlocks* vg = (SVgDataBlocks*)taosArrayGetP(pStmt->pDataBlocks, i);
|
||||
cout << "vgId:" << vg->vg.vgId << ", numOfTables:" << vg->numOfTables << ", dataSize:" << vg->size << endl;
|
||||
|
@ -81,7 +81,6 @@ protected:
|
|||
|
||||
void checkReslut(int32_t numOfTables, int16_t numOfRows1, int16_t numOfRows2 = -1) {
|
||||
SVnodeModifOpStmt* pStmt = getVnodeModifStmt(res_);
|
||||
ASSERT_EQ(pStmt->schemaAttache, 0);
|
||||
ASSERT_EQ(pStmt->payloadType, PAYLOAD_TYPE_KV);
|
||||
ASSERT_EQ(pStmt->insertType, TSDB_QUERY_TYPE_INSERT);
|
||||
size_t num = taosArrayGetSize(pStmt->pDataBlocks);
|
||||
|
@ -168,6 +167,18 @@ TEST_F(InsertTest, multiTableMultiRowTest) {
|
|||
checkReslut(2, 3, 2);
|
||||
}
|
||||
|
||||
// INSERT INTO
|
||||
// tb1_name USING st1_name [(tag1_name, ...)] TAGS (tag1_value, ...) VALUES (field1_value, ...)
|
||||
// tb2_name USING st2_name [(tag1_name, ...)] TAGS (tag1_value, ...) VALUES (field1_value, ...)
|
||||
TEST_F(InsertTest, autoCreateTableTest) {
|
||||
setDatabase("root", "test");
|
||||
|
||||
bind("insert into st1s1 using st1 tags(1, 'wxy') values (now, 1, \"beijing\")(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")");
|
||||
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
|
||||
dumpReslut();
|
||||
checkReslut(1, 3);
|
||||
}
|
||||
|
||||
TEST_F(InsertTest, toleranceTest) {
|
||||
setDatabase("root", "test");
|
||||
|
||||
|
|
Loading…
Reference in New Issue