enh: insert optimize
This commit is contained in:
parent
7484ff430c
commit
ac2ce21414
|
@ -43,6 +43,7 @@ typedef struct SInsertParseContext {
|
|||
char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW];
|
||||
SParsedDataColInfo tags; // for stmt
|
||||
bool missCache;
|
||||
bool usingDuplicateTable;
|
||||
} SInsertParseContext;
|
||||
|
||||
typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param);
|
||||
|
@ -725,7 +726,7 @@ static int32_t parseTableOptions(SInsertParseContext* pCxt, SVnodeModifOpStmt* p
|
|||
// 1. [(field1_name, ...)]
|
||||
// 2. VALUES ... | FILE ...
|
||||
static int32_t parseUsingClauseBottom(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) {
|
||||
if ('\0' == pStmt->usingTableName.tname[0]) {
|
||||
if (!pStmt->usingTableProcessing || pCxt->usingDuplicateTable) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -854,7 +855,7 @@ static int32_t parseUsingTableNameImpl(SInsertParseContext* pCxt, SVnodeModifOpS
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = getUsingTableSchema(pCxt, pStmt);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
||||
code = storeTableMeta(pCxt, pStmt);
|
||||
}
|
||||
return code;
|
||||
|
@ -874,11 +875,11 @@ static int32_t parseUsingTableName(SInsertParseContext* pCxt, SVnodeModifOpStmt*
|
|||
return getTargetTableSchema(pCxt, pStmt);
|
||||
}
|
||||
|
||||
pStmt->usingTableProcessing = true;
|
||||
// pStmt->pSql -> stb_name [(tag1_name, ...)
|
||||
pStmt->pSql += index;
|
||||
bool duplicate = false;
|
||||
int32_t code = parseDuplicateUsingClause(pCxt, pStmt, &duplicate);
|
||||
if (TSDB_CODE_SUCCESS == code && !duplicate) {
|
||||
int32_t code = parseDuplicateUsingClause(pCxt, pStmt, &pCxt->usingDuplicateTable);
|
||||
if (TSDB_CODE_SUCCESS == code && !pCxt->usingDuplicateTable) {
|
||||
return parseUsingTableNameImpl(pCxt, pStmt);
|
||||
}
|
||||
return code;
|
||||
|
@ -1331,6 +1332,7 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt,
|
|||
(*pNumOfRows) = 0;
|
||||
char* pLine = NULL;
|
||||
int64_t readLen = 0;
|
||||
pStmt->fileProcessing = false;
|
||||
while (TSDB_CODE_SUCCESS == code && (readLen = taosGetLineFile(pStmt->fp, &pLine)) != -1) {
|
||||
if (('\r' == pLine[readLen - 1]) || ('\n' == pLine[readLen - 1])) {
|
||||
pLine[--readLen] = '\0';
|
||||
|
@ -1348,7 +1350,8 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt,
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
SToken token;
|
||||
strtolower(pLine, pLine);
|
||||
code = parseOneRow(pCxt, (const char**)&pLine, pDataBuf, &gotRow, &token);
|
||||
const char* pRow = pLine;
|
||||
code = parseOneRow(pCxt, (const char**)&pRow, pDataBuf, &gotRow, &token);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && gotRow) {
|
||||
|
@ -1357,12 +1360,13 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt,
|
|||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && pDataBuf->nAllocSize > tsMaxMemUsedByInsert * 1024 * 1024) {
|
||||
pStmt->fileProcessing = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && 0 == (*pNumOfRows) &&
|
||||
(!TSDB_QUERY_HAS_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT))) {
|
||||
(!TSDB_QUERY_HAS_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT)) && !pStmt->fileProcessing) {
|
||||
code = buildSyntaxErrMsg(&pCxt->msg, "no any data points", NULL);
|
||||
}
|
||||
return code;
|
||||
|
@ -1383,13 +1387,13 @@ static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifOpStm
|
|||
pStmt->totalRowsNum += numOfRows;
|
||||
pStmt->totalTbNum += 1;
|
||||
TSDB_QUERY_SET_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_FILE_INSERT);
|
||||
if (taosEOFFile(pStmt->fp)) {
|
||||
if (!pStmt->fileProcessing) {
|
||||
taosCloseFile(&pStmt->fp);
|
||||
} else {
|
||||
parserDebug("0x%" PRIx64 " insert from csv. File is too large, do it in batches.", pCxt->pComCxt->requestId);
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t parseDataFromFile(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, SToken* pFilePath,
|
||||
|
@ -1534,6 +1538,10 @@ static void destroyEnvPreTable(SInsertParseContext* pCxt, SVnodeModifOpStmt* pSt
|
|||
destroyBoundColumnInfo(&pCxt->tags);
|
||||
taosMemoryFreeClear(pStmt->pTableMeta);
|
||||
tdDestroySVCreateTbReq(&pStmt->createTblReq);
|
||||
pCxt->missCache = false;
|
||||
pCxt->usingDuplicateTable = false;
|
||||
pStmt->usingTableProcessing = false;
|
||||
pStmt->fileProcessing = false;
|
||||
}
|
||||
|
||||
// tb_name
|
||||
|
@ -1546,7 +1554,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt, SVnodeModifOpStmt* pSt
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
bool hasData = true;
|
||||
// for each table
|
||||
while (TSDB_CODE_SUCCESS == code && hasData && !pCxt->missCache) {
|
||||
while (TSDB_CODE_SUCCESS == code && hasData && !pCxt->missCache && !pStmt->fileProcessing) {
|
||||
destroyEnvPreTable(pCxt, pStmt);
|
||||
// pStmt->pSql -> tb_name ...
|
||||
NEXT_TOKEN(pStmt->pSql, token);
|
||||
|
@ -1556,8 +1564,6 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt, SVnodeModifOpStmt* pSt
|
|||
}
|
||||
}
|
||||
|
||||
parserDebug("0x%" PRIx64 " insert input rows: %d", pCxt->pComCxt->requestId, pStmt->totalRowsNum);
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
||||
code = parseInsertBodyBottom(pCxt, pStmt);
|
||||
}
|
||||
|
@ -1670,7 +1676,24 @@ static int32_t getTableSchemaFromMetaData(const SMetaData* pMetaData, SVnodeModi
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t setVnodeModifOpStmt(SParseContext* pCxt, const SMetaData* pMetaData, SVnodeModifOpStmt* pStmt) {
|
||||
static void destoryTablesReq(void* p) {
|
||||
STablesReq* pRes = (STablesReq*)p;
|
||||
taosArrayDestroy(pRes->pTables);
|
||||
}
|
||||
|
||||
static void clearCatalogReq(SCatalogReq* pCatalogReq) {
|
||||
taosArrayDestroyEx(pCatalogReq->pTableMeta, destoryTablesReq);
|
||||
pCatalogReq->pTableMeta = NULL;
|
||||
taosArrayDestroyEx(pCatalogReq->pTableHash, destoryTablesReq);
|
||||
pCatalogReq->pTableHash = NULL;
|
||||
taosArrayDestroy(pCatalogReq->pUser);
|
||||
pCatalogReq->pUser = NULL;
|
||||
}
|
||||
|
||||
static int32_t setVnodeModifOpStmt(SParseContext* pCxt, SCatalogReq* pCatalogReq, const SMetaData* pMetaData,
|
||||
SVnodeModifOpStmt* pStmt) {
|
||||
clearCatalogReq(pCatalogReq);
|
||||
|
||||
if (pCxt->pStmtCb) {
|
||||
(*pCxt->pStmtCb->getExecInfoFn)(pCxt->pStmtCb->pStmt, &pStmt->pVgroupsHashObj, &pStmt->pTableBlockHashObj);
|
||||
if (NULL == pStmt->pVgroupsHashObj) {
|
||||
|
@ -1690,11 +1713,17 @@ static int32_t setVnodeModifOpStmt(SParseContext* pCxt, const SMetaData* pMetaDa
|
|||
return getTableSchemaFromMetaData(pMetaData, pStmt, false);
|
||||
}
|
||||
|
||||
static int32_t initInsertQuery(SParseContext* pCxt, const SMetaData* pMetaData, SQuery** pQuery) {
|
||||
static int32_t initInsertQuery(SParseContext* pCxt, SCatalogReq* pCatalogReq, const SMetaData* pMetaData,
|
||||
SQuery** pQuery) {
|
||||
if (NULL == *pQuery) {
|
||||
return createInsertQuery(pCxt, pQuery);
|
||||
}
|
||||
return setVnodeModifOpStmt(pCxt, pMetaData, (SVnodeModifOpStmt*)(*pQuery)->pRoot);
|
||||
|
||||
if (!((SVnodeModifOpStmt*)(*pQuery)->pRoot)->fileProcessing) {
|
||||
return setVnodeModifOpStmt(pCxt, pCatalogReq, pMetaData, (SVnodeModifOpStmt*)(*pQuery)->pRoot);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t setRefreshMate(SQuery* pQuery) {
|
||||
|
@ -1735,8 +1764,6 @@ static int32_t parseInsertSqlFromCsv(SInsertParseContext* pCxt, SVnodeModifOpStm
|
|||
code = parseDataFromFileImpl(pCxt, pStmt, pDataBuf);
|
||||
}
|
||||
|
||||
parserDebug("0x%" PRIx64 " insert again input rows: %d", pCxt->pComCxt->requestId, pStmt->totalRowsNum);
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (pStmt->fileProcessing) {
|
||||
code = parseInsertBodyBottom(pCxt, pStmt);
|
||||
|
@ -1825,10 +1852,16 @@ static int32_t buildInsertCatalogReq(SInsertParseContext* pCxt, SVnodeModifOpStm
|
|||
|
||||
static int32_t setNextStageInfo(SInsertParseContext* pCxt, SQuery* pQuery, SCatalogReq* pCatalogReq) {
|
||||
if (pCxt->missCache) {
|
||||
parserDebug("0x%" PRIx64 " %d rows have been inserted before cache miss", pCxt->pComCxt->requestId,
|
||||
((SVnodeModifOpStmt*)pQuery->pRoot)->totalRowsNum);
|
||||
|
||||
pQuery->execStage = QUERY_EXEC_STAGE_PARSE;
|
||||
return buildInsertCatalogReq(pCxt, (SVnodeModifOpStmt*)pQuery->pRoot, pCatalogReq);
|
||||
}
|
||||
|
||||
parserDebug("0x%" PRIx64 " %d rows have been inserted", pCxt->pComCxt->requestId,
|
||||
((SVnodeModifOpStmt*)pQuery->pRoot)->totalRowsNum);
|
||||
|
||||
pQuery->execStage = QUERY_EXEC_STAGE_SCHEDULE;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -1837,9 +1870,11 @@ int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatal
|
|||
SInsertParseContext context = {
|
||||
.pComCxt = pCxt,
|
||||
.msg = {.buf = pCxt->pMsg, .len = pCxt->msgLen},
|
||||
.missCache = false,
|
||||
.usingDuplicateTable = false,
|
||||
};
|
||||
|
||||
int32_t code = initInsertQuery(pCxt, pMetaData, pQuery);
|
||||
int32_t code = initInsertQuery(pCxt, pCatalogReq, pMetaData, pQuery);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = parseInsertSqlImpl(&context, (SVnodeModifOpStmt*)(*pQuery)->pRoot);
|
||||
}
|
||||
|
|
|
@ -228,8 +228,7 @@ int32_t __catalogGetTableMeta(struct SCatalog* pCatalog, SRequestConnInfo* pConn
|
|||
return g_mockCatalogService->catalogGetTableMeta(pTableName, pTableMeta);
|
||||
}
|
||||
|
||||
int32_t __catalogGetCachedTableMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName,
|
||||
STableMeta** pTableMeta) {
|
||||
int32_t __catalogGetCachedTableMeta(SCatalog* pCtg, const SName* pTableName, STableMeta** pTableMeta) {
|
||||
return g_mockCatalogService->catalogGetTableMeta(pTableName, pTableMeta, true);
|
||||
}
|
||||
|
||||
|
@ -238,8 +237,7 @@ int32_t __catalogGetTableHashVgroup(struct SCatalog* pCatalog, SRequestConnInfo*
|
|||
return g_mockCatalogService->catalogGetTableHashVgroup(pTableName, vgInfo);
|
||||
}
|
||||
|
||||
int32_t __catalogGetCachedTableHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName,
|
||||
SVgroupInfo* pVgroup, bool* exists) {
|
||||
int32_t __catalogGetCachedTableHashVgroup(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, bool* exists) {
|
||||
int32_t code = g_mockCatalogService->catalogGetTableHashVgroup(pTableName, pVgroup, true);
|
||||
*exists = 0 != pVgroup->vgId;
|
||||
return code;
|
||||
|
@ -269,6 +267,13 @@ int32_t __catalogChkAuth(SCatalog* pCtg, SRequestConnInfo* pConn, const char* us
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t __catalogChkAuthFromCache(SCatalog* pCtg, const char* user, const char* dbFName, AUTH_TYPE type, bool* pass,
|
||||
bool* exists) {
|
||||
*pass = true;
|
||||
*exists = true;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t __catalogGetUdfInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const char* funcName, SFuncInfo* pInfo) {
|
||||
return g_mockCatalogService->catalogGetUdfInfo(funcName, pInfo);
|
||||
}
|
||||
|
@ -311,6 +316,7 @@ void initMetaDataEnv() {
|
|||
stub.set(catalogGetDBVgList, __catalogGetDBVgList);
|
||||
stub.set(catalogGetDBCfg, __catalogGetDBCfg);
|
||||
stub.set(catalogChkAuth, __catalogChkAuth);
|
||||
stub.set(catalogChkAuthFromCache, __catalogChkAuthFromCache);
|
||||
stub.set(catalogGetUdfInfo, __catalogGetUdfInfo);
|
||||
stub.set(catalogRefreshGetTableMeta, __catalogRefreshGetTableMeta);
|
||||
stub.set(catalogRemoveTableMeta, __catalogRemoveTableMeta);
|
||||
|
|
Loading…
Reference in New Issue