enh: insert client optimize
This commit is contained in:
parent
4290582249
commit
555a575c58
|
@ -236,12 +236,6 @@ static int32_t getTableVgroup(SInsertParseContext* pCxt, int32_t tbNo, SName* pT
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t getTableMetaImpl(SInsertParseContext* pCxt, int32_t tbNo, SName* name, char* dbFname, bool isStb) {
|
static int32_t getTableMetaImpl(SInsertParseContext* pCxt, int32_t tbNo, SName* name, char* dbFname, bool isStb) {
|
||||||
// bool pass = false;
|
|
||||||
// CHECK_CODE(checkAuth(pCxt, dbFname, &pass));
|
|
||||||
// if (!pass) {
|
|
||||||
// return TSDB_CODE_PAR_PERMISSION_DENIED;
|
|
||||||
// }
|
|
||||||
|
|
||||||
CHECK_CODE(getTableSchema(pCxt, tbNo, name, isStb, &pCxt->pTableMeta));
|
CHECK_CODE(getTableSchema(pCxt, tbNo, name, isStb, &pCxt->pTableMeta));
|
||||||
if (!isStb) {
|
if (!isStb) {
|
||||||
SVgroupInfo vg;
|
SVgroupInfo vg;
|
||||||
|
@ -1230,8 +1224,6 @@ static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlo
|
||||||
}
|
}
|
||||||
pCxt->pSql += index;
|
pCxt->pSql += index;
|
||||||
|
|
||||||
// int64_t memStart = taosGetTimestampMs();
|
|
||||||
|
|
||||||
if ((*numOfRows) >= maxRows || pDataBlock->size + extendedRowSize >= pDataBlock->nAllocSize) {
|
if ((*numOfRows) >= maxRows || pDataBlock->size + extendedRowSize >= pDataBlock->nAllocSize) {
|
||||||
int32_t tSize;
|
int32_t tSize;
|
||||||
CHECK_CODE(allocateMemIfNeed(pDataBlock, extendedRowSize, &tSize));
|
CHECK_CODE(allocateMemIfNeed(pDataBlock, extendedRowSize, &tSize));
|
||||||
|
@ -1239,17 +1231,12 @@ static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlo
|
||||||
maxRows = tSize;
|
maxRows = tSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
// pCxt->memElapsed += taosGetTimestampMs() - memStart;
|
|
||||||
// int64_t parRowStart = taosGetTimestampMs();
|
|
||||||
|
|
||||||
bool gotRow = false;
|
bool gotRow = false;
|
||||||
CHECK_CODE(parseOneRow(pCxt, pDataBlock, tinfo.precision, &gotRow, pCxt->tmpTokenBuf));
|
CHECK_CODE(parseOneRow(pCxt, pDataBlock, tinfo.precision, &gotRow, pCxt->tmpTokenBuf));
|
||||||
if (gotRow) {
|
if (gotRow) {
|
||||||
pDataBlock->size += extendedRowSize; // len;
|
pDataBlock->size += extendedRowSize; // len;
|
||||||
}
|
}
|
||||||
|
|
||||||
// pCxt->parRowElapsed += taosGetTimestampMs() - parRowStart;
|
|
||||||
|
|
||||||
NEXT_VALID_TOKEN(pCxt->pSql, sToken);
|
NEXT_VALID_TOKEN(pCxt->pSql, sToken);
|
||||||
if (TK_NK_COMMA == sToken.type) {
|
if (TK_NK_COMMA == sToken.type) {
|
||||||
return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM);
|
return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM);
|
||||||
|
@ -1269,13 +1256,9 @@ static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlo
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t parseValuesClause(SInsertParseContext* pCxt, STableDataBlocks* dataBuf) {
|
static int32_t parseValuesClause(SInsertParseContext* pCxt, STableDataBlocks* dataBuf) {
|
||||||
// int64_t memStart = taosGetTimestampMs();
|
|
||||||
|
|
||||||
int32_t maxNumOfRows;
|
int32_t maxNumOfRows;
|
||||||
CHECK_CODE(allocateMemIfNeed(dataBuf, getExtendedRowSize(dataBuf), &maxNumOfRows));
|
CHECK_CODE(allocateMemIfNeed(dataBuf, getExtendedRowSize(dataBuf), &maxNumOfRows));
|
||||||
|
|
||||||
// pCxt->memElapsed += taosGetTimestampMs() - memStart;
|
|
||||||
|
|
||||||
int32_t numOfRows = 0;
|
int32_t numOfRows = 0;
|
||||||
CHECK_CODE(parseValues(pCxt, dataBuf, maxNumOfRows, &numOfRows));
|
CHECK_CODE(parseValues(pCxt, dataBuf, maxNumOfRows, &numOfRows));
|
||||||
|
|
||||||
|
@ -1396,16 +1379,8 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
||||||
char dbFName[TSDB_DB_FNAME_LEN];
|
char dbFName[TSDB_DB_FNAME_LEN];
|
||||||
bool autoCreateTbl = false;
|
bool autoCreateTbl = false;
|
||||||
|
|
||||||
// int64_t parBodyStart = taosGetTimestampMs();
|
|
||||||
// int64_t parTableElapsed = 0;
|
|
||||||
// int64_t getTableElapsed = 0;
|
|
||||||
// int64_t getDataBufElapsed = 0;
|
|
||||||
// int64_t parValueElapsed = 0;
|
|
||||||
|
|
||||||
// for each table
|
// for each table
|
||||||
while (1) {
|
while (1) {
|
||||||
// int64_t parTableStart = taosGetTimestampMs();
|
|
||||||
|
|
||||||
SToken sToken;
|
SToken sToken;
|
||||||
char* tbName = NULL;
|
char* tbName = NULL;
|
||||||
|
|
||||||
|
@ -1444,7 +1419,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
||||||
SToken tbnameToken = sToken;
|
SToken tbnameToken = sToken;
|
||||||
NEXT_TOKEN(pCxt->pSql, sToken);
|
NEXT_TOKEN(pCxt->pSql, sToken);
|
||||||
|
|
||||||
if (!pCxt->pComCxt->async) {
|
if (!pCxt->pComCxt->async || TK_USING == sToken.type) {
|
||||||
CHECK_CODE(createSName(&name, &tbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));
|
CHECK_CODE(createSName(&name, &tbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));
|
||||||
|
|
||||||
tNameExtractFullName(&name, tbFName);
|
tNameExtractFullName(&name, tbFName);
|
||||||
|
@ -1471,9 +1446,6 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
||||||
NEXT_TOKEN(pCxt->pSql, sToken);
|
NEXT_TOKEN(pCxt->pSql, sToken);
|
||||||
}
|
}
|
||||||
|
|
||||||
// parTableElapsed += taosGetTimestampMs() - parTableStart;
|
|
||||||
// int64_t getTableStart = taosGetTimestampMs();
|
|
||||||
|
|
||||||
if (TK_USING == sToken.type) {
|
if (TK_USING == sToken.type) {
|
||||||
CHECK_CODE(parseUsingClause(pCxt, tbNum, &name, tbFName));
|
CHECK_CODE(parseUsingClause(pCxt, tbNum, &name, tbFName));
|
||||||
NEXT_TOKEN(pCxt->pSql, sToken);
|
NEXT_TOKEN(pCxt->pSql, sToken);
|
||||||
|
@ -1482,9 +1454,6 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
||||||
CHECK_CODE(getTableMeta(pCxt, tbNum, &name, dbFName));
|
CHECK_CODE(getTableMeta(pCxt, tbNum, &name, dbFName));
|
||||||
}
|
}
|
||||||
|
|
||||||
// getTableElapsed += taosGetTimestampMs() - getTableStart;
|
|
||||||
// int64_t getDataBufStart = taosGetTimestampMs();
|
|
||||||
|
|
||||||
STableDataBlocks* dataBuf = NULL;
|
STableDataBlocks* dataBuf = NULL;
|
||||||
CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, &pCxt->pTableMeta->uid, sizeof(pCxt->pTableMeta->uid),
|
CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, &pCxt->pTableMeta->uid, sizeof(pCxt->pTableMeta->uid),
|
||||||
TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk),
|
TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk),
|
||||||
|
@ -1498,13 +1467,9 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
||||||
pCxt->pSql = pCurrPos;
|
pCxt->pSql = pCurrPos;
|
||||||
}
|
}
|
||||||
|
|
||||||
// getDataBufElapsed += taosGetTimestampMs() - getDataBufStart;
|
|
||||||
|
|
||||||
if (TK_VALUES == sToken.type) {
|
if (TK_VALUES == sToken.type) {
|
||||||
// pSql -> (field1_value, ...) [(field1_value2, ...) ...]
|
// pSql -> (field1_value, ...) [(field1_value2, ...) ...]
|
||||||
// int64_t parValueStart = taosGetTimestampMs();
|
|
||||||
CHECK_CODE(parseValuesClause(pCxt, dataBuf));
|
CHECK_CODE(parseValuesClause(pCxt, dataBuf));
|
||||||
// parValueElapsed += taosGetTimestampMs() - parValueStart;
|
|
||||||
TSDB_QUERY_SET_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_INSERT);
|
TSDB_QUERY_SET_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_INSERT);
|
||||||
|
|
||||||
tbNum++;
|
tbNum++;
|
||||||
|
@ -1528,12 +1493,6 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
||||||
return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", sToken.z);
|
return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", sToken.z);
|
||||||
}
|
}
|
||||||
|
|
||||||
// printf(
|
|
||||||
// "parse %d sql %d tables %d rows elapsed parBodyElapsed=%ldms parTableElapsed=%ldms getTableElapsed=%ldms "
|
|
||||||
// "getDataBufElapsed=%ldms parValueElapsed=%ldms(memElapsed=%ldms parRowElapsed=%ldms)\n",
|
|
||||||
// pCxt->pComCxt->sqlLen, tbNum, pCxt->totalNum, taosGetTimestampMs() - parBodyStart, parTableElapsed,
|
|
||||||
// getTableElapsed, getDataBufElapsed, parValueElapsed, pCxt->memElapsed, pCxt->parRowElapsed);
|
|
||||||
|
|
||||||
qDebug("0x%" PRIx64 " insert input rows: %d", pCxt->pComCxt->requestId, pCxt->totalNum);
|
qDebug("0x%" PRIx64 " insert input rows: %d", pCxt->pComCxt->requestId, pCxt->totalNum);
|
||||||
|
|
||||||
if (TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT)) {
|
if (TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT)) {
|
||||||
|
@ -1700,9 +1659,6 @@ static int32_t skipUsingClause(SInsertParseSyntaxCxt* pCxt) {
|
||||||
static int32_t collectTableMetaKey(SInsertParseSyntaxCxt* pCxt, bool isStable, int32_t tableNo, SToken* pTbToken) {
|
static int32_t collectTableMetaKey(SInsertParseSyntaxCxt* pCxt, bool isStable, int32_t tableNo, SToken* pTbToken) {
|
||||||
SName name;
|
SName name;
|
||||||
CHECK_CODE(createSName(&name, pTbToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));
|
CHECK_CODE(createSName(&name, pTbToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));
|
||||||
// CHECK_CODE(reserveUserAuthInCacheExt(pCxt->pComCxt->pUser, &name, AUTH_TYPE_WRITE, pCxt->pMetaCache));
|
|
||||||
// CHECK_CODE(reserveTableMetaInCacheExt(&name, pCxt->pMetaCache));
|
|
||||||
// CHECK_CODE(reserveTableVgroupInCacheExt(&name, pCxt->pMetaCache));
|
|
||||||
CHECK_CODE(reserveTableMetaInCacheForInsert(&name, isStable ? CATALOG_REQ_TYPE_META : CATALOG_REQ_TYPE_BOTH, tableNo,
|
CHECK_CODE(reserveTableMetaInCacheForInsert(&name, isStable ? CATALOG_REQ_TYPE_META : CATALOG_REQ_TYPE_BOTH, tableNo,
|
||||||
pCxt->pMetaCache));
|
pCxt->pMetaCache));
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -1766,7 +1722,7 @@ static int32_t parseInsertBodySyntax(SInsertParseSyntaxCxt* pCxt) {
|
||||||
CHECK_CODE(collectTableMetaKey(pCxt, true, tableNo, &sToken));
|
CHECK_CODE(collectTableMetaKey(pCxt, true, tableNo, &sToken));
|
||||||
CHECK_CODE(skipUsingClause(pCxt));
|
CHECK_CODE(skipUsingClause(pCxt));
|
||||||
NEXT_TOKEN(pCxt->pSql, sToken);
|
NEXT_TOKEN(pCxt->pSql, sToken);
|
||||||
} else {
|
} else if (!existedUsing) {
|
||||||
CHECK_CODE(collectTableMetaKey(pCxt, false, tableNo, &tbnameToken));
|
CHECK_CODE(collectTableMetaKey(pCxt, false, tableNo, &tbnameToken));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1803,9 +1759,7 @@ int32_t parseInsertSyntax(SParseContext* pContext, SQuery** pQuery, SParseMetaCa
|
||||||
.pMetaCache = pMetaCache};
|
.pMetaCache = pMetaCache};
|
||||||
int32_t code = skipInsertInto(&context.pSql, &context.msg);
|
int32_t code = skipInsertInto(&context.pSql, &context.msg);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
// int64_t parSyntaxStart = taosGetTimestampMs();
|
|
||||||
code = parseInsertBodySyntax(&context);
|
code = parseInsertBodySyntax(&context);
|
||||||
// printf("parse %d sql elapsed=%ldms\n", pContext->sqlLen, taosGetTimestampMs() - parSyntaxStart);
|
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
*pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
|
*pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
|
||||||
|
|
|
@ -54,6 +54,7 @@ class InsertTest : public Test {
|
||||||
cxt_.sqlLen = strlen(sql);
|
cxt_.sqlLen = strlen(sql);
|
||||||
sqlBuf_[cxt_.sqlLen] = '\0';
|
sqlBuf_[cxt_.sqlLen] = '\0';
|
||||||
cxt_.pSql = sqlBuf_;
|
cxt_.pSql = sqlBuf_;
|
||||||
|
cxt_.pUser = "root";
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t run() {
|
int32_t run() {
|
||||||
|
@ -77,7 +78,7 @@ class InsertTest : public Test {
|
||||||
|
|
||||||
unique_ptr<SCatalogReq, void (*)(SCatalogReq*)> catalogReq(new SCatalogReq(),
|
unique_ptr<SCatalogReq, void (*)(SCatalogReq*)> catalogReq(new SCatalogReq(),
|
||||||
MockCatalogService::destoryCatalogReq);
|
MockCatalogService::destoryCatalogReq);
|
||||||
code_ = buildCatalogReq(metaCache.get(), catalogReq.get());
|
code_ = buildCatalogReq(&cxt_, metaCache.get(), catalogReq.get());
|
||||||
if (code_ != TSDB_CODE_SUCCESS) {
|
if (code_ != TSDB_CODE_SUCCESS) {
|
||||||
cout << "buildCatalogReq code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
|
cout << "buildCatalogReq code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
|
||||||
return code_;
|
return code_;
|
||||||
|
@ -88,7 +89,7 @@ class InsertTest : public Test {
|
||||||
|
|
||||||
metaCache.reset(new SParseMetaCache());
|
metaCache.reset(new SParseMetaCache());
|
||||||
request = false;
|
request = false;
|
||||||
code_ = putMetaDataToCache(catalogReq.get(), metaData.get(), metaCache.get());
|
code_ = putMetaDataToCache(catalogReq.get(), metaData.get(), metaCache.get(), true);
|
||||||
if (code_ != TSDB_CODE_SUCCESS) {
|
if (code_ != TSDB_CODE_SUCCESS) {
|
||||||
cout << "putMetaDataToCache code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
|
cout << "putMetaDataToCache code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
|
||||||
return code_;
|
return code_;
|
||||||
|
|
|
@ -225,8 +225,8 @@ class ParserTestBaseImpl {
|
||||||
DO_WITH_THROW(collectMetaKey, pCxt, pQuery, pMetaCache);
|
DO_WITH_THROW(collectMetaKey, pCxt, pQuery, pMetaCache);
|
||||||
}
|
}
|
||||||
|
|
||||||
void doBuildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) {
|
void doBuildCatalogReq(SParseContext* pCxt, const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) {
|
||||||
DO_WITH_THROW(buildCatalogReq, pMetaCache, pCatalogReq);
|
DO_WITH_THROW(buildCatalogReq, pCxt, pMetaCache, pCatalogReq);
|
||||||
}
|
}
|
||||||
|
|
||||||
void doGetAllMeta(const SCatalogReq* pCatalogReq, SMetaData* pMetaData) {
|
void doGetAllMeta(const SCatalogReq* pCatalogReq, SMetaData* pMetaData) {
|
||||||
|
@ -234,7 +234,7 @@ class ParserTestBaseImpl {
|
||||||
}
|
}
|
||||||
|
|
||||||
void doPutMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache) {
|
void doPutMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache) {
|
||||||
DO_WITH_THROW(putMetaDataToCache, pCatalogReq, pMetaData, pMetaCache);
|
DO_WITH_THROW(putMetaDataToCache, pCatalogReq, pMetaData, pMetaCache, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
void doAuthenticate(SParseContext* pCxt, SQuery* pQuery, SParseMetaCache* pMetaCache) {
|
void doAuthenticate(SParseContext* pCxt, SQuery* pQuery, SParseMetaCache* pMetaCache) {
|
||||||
|
@ -348,7 +348,7 @@ class ParserTestBaseImpl {
|
||||||
|
|
||||||
unique_ptr<SCatalogReq, void (*)(SCatalogReq*)> catalogReq(new SCatalogReq(),
|
unique_ptr<SCatalogReq, void (*)(SCatalogReq*)> catalogReq(new SCatalogReq(),
|
||||||
MockCatalogService::destoryCatalogReq);
|
MockCatalogService::destoryCatalogReq);
|
||||||
doBuildCatalogReq(metaCache.get(), catalogReq.get());
|
doBuildCatalogReq(&cxt, metaCache.get(), catalogReq.get());
|
||||||
|
|
||||||
string err;
|
string err;
|
||||||
thread t1([&]() {
|
thread t1([&]() {
|
||||||
|
|
Loading…
Reference in New Issue