Merge pull request #16032 from taosdata/feature/3.0_insert_perf_wxy
enh: insert client optimize
This commit is contained in:
commit
68318d574c
|
@ -53,6 +53,8 @@ typedef struct SParseContext {
|
|||
int8_t schemalessType;
|
||||
const char* svrVer;
|
||||
bool nodeOffline;
|
||||
SArray* pTableMetaPos; // sql table pos => catalog data pos
|
||||
SArray* pTableVgroupPos; // sql table pos => catalog data pos
|
||||
} SParseContext;
|
||||
|
||||
int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery);
|
||||
|
@ -84,8 +86,8 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu
|
|||
int32_t rowNum);
|
||||
int32_t qBuildStmtColFields(void* pDataBlock, int32_t* fieldNum, TAOS_FIELD_E** fields);
|
||||
int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TAOS_FIELD_E** fields);
|
||||
int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const char* sTableName, char* tName, TAOS_MULTI_BIND* bind,
|
||||
char* msgBuf, int32_t msgBufLen);
|
||||
int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const char* sTableName, char* tName,
|
||||
TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen);
|
||||
void destroyBoundColumnInfo(void* pBoundInfo);
|
||||
int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf,
|
||||
int32_t msgBufLen);
|
||||
|
|
|
@ -689,11 +689,11 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
|
|||
TDMT_VND_CREATE_TABLE == pRequest->type) {
|
||||
pRequest->body.resInfo.numOfRows = res.numOfRows;
|
||||
if (TDMT_VND_SUBMIT == pRequest->type) {
|
||||
STscObj *pTscObj = pRequest->pTscObj;
|
||||
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
|
||||
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertRows, res.numOfRows);
|
||||
STscObj* pTscObj = pRequest->pTscObj;
|
||||
SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
|
||||
atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, res.numOfRows);
|
||||
}
|
||||
|
||||
|
||||
schedulerFreeJob(&pRequest->body.queryJob, 0);
|
||||
}
|
||||
|
||||
|
@ -800,8 +800,8 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
|
|||
break;
|
||||
}
|
||||
case TDMT_VND_SUBMIT: {
|
||||
atomic_add_fetch_64((int64_t *)&pAppInfo->summary.insertBytes, pRes->numOfBytes);
|
||||
|
||||
atomic_add_fetch_64((int64_t*)&pAppInfo->summary.insertBytes, pRes->numOfBytes);
|
||||
|
||||
code = handleSubmitExecRes(pRequest, pRes->res, pCatalog, &epset);
|
||||
break;
|
||||
}
|
||||
|
@ -832,9 +832,9 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
|
|||
if (pResult) {
|
||||
pRequest->body.resInfo.numOfRows = pResult->numOfRows;
|
||||
if (TDMT_VND_SUBMIT == pRequest->type) {
|
||||
STscObj *pTscObj = pRequest->pTscObj;
|
||||
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
|
||||
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertRows, pResult->numOfRows);
|
||||
STscObj* pTscObj = pRequest->pTscObj;
|
||||
SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
|
||||
atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, pResult->numOfRows);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -877,14 +877,14 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue
|
|||
if (pQuery->pRoot) {
|
||||
pRequest->stmtType = pQuery->pRoot->type;
|
||||
}
|
||||
|
||||
|
||||
if (pQuery->pRoot && !pRequest->inRetry) {
|
||||
STscObj *pTscObj = pRequest->pTscObj;
|
||||
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
|
||||
STscObj* pTscObj = pRequest->pTscObj;
|
||||
SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
|
||||
if (QUERY_NODE_VNODE_MODIF_STMT == pQuery->pRoot->type) {
|
||||
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);
|
||||
atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertsReq, 1);
|
||||
} else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
|
||||
atomic_add_fetch_64((int64_t *)&pActivity->numOfQueryReq, 1);
|
||||
atomic_add_fetch_64((int64_t*)&pActivity->numOfQueryReq, 1);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1467,9 +1467,9 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4)
|
|||
tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
|
||||
pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId);
|
||||
|
||||
STscObj *pTscObj = pRequest->pTscObj;
|
||||
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
|
||||
atomic_add_fetch_64((int64_t *)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
|
||||
STscObj* pTscObj = pRequest->pTscObj;
|
||||
SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
|
||||
atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
|
||||
|
||||
if (pResultInfo->numOfRows == 0) {
|
||||
return NULL;
|
||||
|
@ -2006,7 +2006,7 @@ int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName,
|
|||
|
||||
bool inEscape = false;
|
||||
int32_t code = 0;
|
||||
void *pIter = NULL;
|
||||
void* pIter = NULL;
|
||||
|
||||
int32_t vIdx = 0;
|
||||
int32_t vPos[2];
|
||||
|
|
|
@ -22,6 +22,7 @@ extern "C" {
|
|||
|
||||
#include "catalog.h"
|
||||
#include "os.h"
|
||||
#include "parser.h"
|
||||
#include "query.h"
|
||||
|
||||
#define parserFatal(param, ...) qFatal("PARSER: " param, ##__VA_ARGS__)
|
||||
|
@ -44,18 +45,37 @@ typedef struct SParseTablesMetaReq {
|
|||
SHashObj* pTables;
|
||||
} SParseTablesMetaReq;
|
||||
|
||||
typedef enum ECatalogReqType {
|
||||
CATALOG_REQ_TYPE_META = 1,
|
||||
CATALOG_REQ_TYPE_VGROUP,
|
||||
CATALOG_REQ_TYPE_BOTH
|
||||
} ECatalogReqType;
|
||||
|
||||
typedef struct SInsertTablesMetaReq {
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
SArray* pTableMetaPos;
|
||||
SArray* pTableMetaReq; // element is SName
|
||||
SArray* pTableVgroupPos;
|
||||
SArray* pTableVgroupReq; // element is SName
|
||||
} SInsertTablesMetaReq;
|
||||
|
||||
typedef struct SParseMetaCache {
|
||||
SHashObj* pTableMeta; // key is tbFName, element is STableMeta*
|
||||
SHashObj* pDbVgroup; // key is dbFName, element is SArray<SVgroupInfo>*
|
||||
SHashObj* pTableVgroup; // key is tbFName, element is SVgroupInfo*
|
||||
SHashObj* pDbCfg; // key is tbFName, element is SDbCfgInfo*
|
||||
SHashObj* pDbInfo; // key is tbFName, element is SDbInfo*
|
||||
SHashObj* pUserAuth; // key is SUserAuthInfo serialized string, element is bool indicating whether or not to pass
|
||||
SHashObj* pUdf; // key is funcName, element is SFuncInfo*
|
||||
SHashObj* pTableIndex; // key is tbFName, element is SArray<STableIndexInfo>*
|
||||
SHashObj* pTableCfg; // key is tbFName, element is STableCfg*
|
||||
SArray* pDnodes; // element is SEpSet
|
||||
bool dnodeRequired;
|
||||
SHashObj* pTableMeta; // key is tbFName, element is STableMeta*
|
||||
SHashObj* pDbVgroup; // key is dbFName, element is SArray<SVgroupInfo>*
|
||||
SHashObj* pTableVgroup; // key is tbFName, element is SVgroupInfo*
|
||||
SHashObj* pDbCfg; // key is tbFName, element is SDbCfgInfo*
|
||||
SHashObj* pDbInfo; // key is tbFName, element is SDbInfo*
|
||||
SHashObj* pUserAuth; // key is SUserAuthInfo serialized string, element is bool indicating whether or not to pass
|
||||
SHashObj* pUdf; // key is funcName, element is SFuncInfo*
|
||||
SHashObj* pTableIndex; // key is tbFName, element is SArray<STableIndexInfo>*
|
||||
SHashObj* pTableCfg; // key is tbFName, element is STableCfg*
|
||||
SArray* pDnodes; // element is SEpSet
|
||||
bool dnodeRequired;
|
||||
SHashObj* pInsertTables; // key is dbName, element is SInsertTablesMetaReq*, for insert
|
||||
const char* pUser;
|
||||
const SArray* pTableMetaData; // pRes = STableMeta*
|
||||
const SArray* pTableVgroupData; // pRes = SVgroupInfo*
|
||||
int32_t sqlTableNum;
|
||||
} SParseMetaCache;
|
||||
|
||||
int32_t generateSyntaxErrMsg(SMsgBuf* pBuf, int32_t errCode, ...);
|
||||
|
@ -72,8 +92,9 @@ STableMeta* tableMetaDup(const STableMeta* pTableMeta);
|
|||
|
||||
int32_t trimString(const char* src, int32_t len, char* dst, int32_t dlen);
|
||||
|
||||
int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq);
|
||||
int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache);
|
||||
int32_t buildCatalogReq(SParseContext* pCxt, const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq);
|
||||
int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache,
|
||||
bool insertValuesStmt);
|
||||
int32_t reserveTableMetaInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache);
|
||||
int32_t reserveTableMetaInCacheExt(const SName* pName, SParseMetaCache* pMetaCache);
|
||||
int32_t reserveDbVgInfoInCache(int32_t acctId, const char* pDb, SParseMetaCache* pMetaCache);
|
||||
|
@ -100,6 +121,12 @@ int32_t getUdfInfoFromCache(SParseMetaCache* pMetaCache, const char* pFunc, SFun
|
|||
int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName, SArray** pIndexes);
|
||||
int32_t getTableCfgFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableCfg** pOutput);
|
||||
int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes);
|
||||
int32_t reserveTableMetaInCacheForInsert(const SName* pName, ECatalogReqType reqType, int32_t tableNo,
|
||||
SParseMetaCache* pMetaCache);
|
||||
int32_t getTableMetaFromCacheForInsert(SArray* pTableMetaPos, SParseMetaCache* pMetaCache, int32_t tableNo,
|
||||
STableMeta** pMeta);
|
||||
int32_t getTableVgroupFromCacheForInsert(SArray* pTableVgroupPos, SParseMetaCache* pMetaCache, int32_t tableNo,
|
||||
SVgroupInfo* pVgroup);
|
||||
void destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request);
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -73,6 +73,9 @@ typedef struct SInsertParseContext {
|
|||
SStmtCallback* pStmtCb;
|
||||
SParseMetaCache* pMetaCache;
|
||||
char sTableName[TSDB_TABLE_NAME_LEN];
|
||||
char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW];
|
||||
int64_t memElapsed;
|
||||
int64_t parRowElapsed;
|
||||
} SInsertParseContext;
|
||||
|
||||
typedef struct SInsertParseSyntaxCxt {
|
||||
|
@ -203,10 +206,11 @@ static int32_t checkAuth(SInsertParseContext* pCxt, char* pDbFname, bool* pPass)
|
|||
return catalogChkAuth(pBasicCtx->pCatalog, &conn, pBasicCtx->pUser, pDbFname, AUTH_TYPE_WRITE, pPass);
|
||||
}
|
||||
|
||||
static int32_t getTableSchema(SInsertParseContext* pCxt, SName* pTbName, bool isStb, STableMeta** pTableMeta) {
|
||||
static int32_t getTableSchema(SInsertParseContext* pCxt, int32_t tbNo, SName* pTbName, bool isStb,
|
||||
STableMeta** pTableMeta) {
|
||||
SParseContext* pBasicCtx = pCxt->pComCxt;
|
||||
if (pBasicCtx->async) {
|
||||
return getTableMetaFromCache(pCxt->pMetaCache, pTbName, pTableMeta);
|
||||
return getTableMetaFromCacheForInsert(pBasicCtx->pTableMetaPos, pCxt->pMetaCache, tbNo, pTableMeta);
|
||||
}
|
||||
SRequestConnInfo conn = {.pTrans = pBasicCtx->pTransporter,
|
||||
.requestId = pBasicCtx->requestId,
|
||||
|
@ -219,10 +223,10 @@ static int32_t getTableSchema(SInsertParseContext* pCxt, SName* pTbName, bool is
|
|||
return catalogGetTableMeta(pBasicCtx->pCatalog, &conn, pTbName, pTableMeta);
|
||||
}
|
||||
|
||||
static int32_t getTableVgroup(SInsertParseContext* pCxt, SName* pTbName, SVgroupInfo* pVg) {
|
||||
static int32_t getTableVgroup(SInsertParseContext* pCxt, int32_t tbNo, SName* pTbName, SVgroupInfo* pVg) {
|
||||
SParseContext* pBasicCtx = pCxt->pComCxt;
|
||||
if (pBasicCtx->async) {
|
||||
return getTableVgroupFromCache(pCxt->pMetaCache, pTbName, pVg);
|
||||
return getTableVgroupFromCacheForInsert(pBasicCtx->pTableVgroupPos, pCxt->pMetaCache, tbNo, pVg);
|
||||
}
|
||||
SRequestConnInfo conn = {.pTrans = pBasicCtx->pTransporter,
|
||||
.requestId = pBasicCtx->requestId,
|
||||
|
@ -231,28 +235,22 @@ static int32_t getTableVgroup(SInsertParseContext* pCxt, SName* pTbName, SVgroup
|
|||
return catalogGetTableHashVgroup(pBasicCtx->pCatalog, &conn, pTbName, pVg);
|
||||
}
|
||||
|
||||
static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SName* name, char* dbFname, bool isStb) {
|
||||
bool pass = false;
|
||||
CHECK_CODE(checkAuth(pCxt, dbFname, &pass));
|
||||
if (!pass) {
|
||||
return TSDB_CODE_PAR_PERMISSION_DENIED;
|
||||
}
|
||||
|
||||
CHECK_CODE(getTableSchema(pCxt, name, isStb, &pCxt->pTableMeta));
|
||||
static int32_t getTableMetaImpl(SInsertParseContext* pCxt, int32_t tbNo, SName* name, char* dbFname, bool isStb) {
|
||||
CHECK_CODE(getTableSchema(pCxt, tbNo, name, isStb, &pCxt->pTableMeta));
|
||||
if (!isStb) {
|
||||
SVgroupInfo vg;
|
||||
CHECK_CODE(getTableVgroup(pCxt, name, &vg));
|
||||
CHECK_CODE(getTableVgroup(pCxt, tbNo, name, &vg));
|
||||
CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t getTableMeta(SInsertParseContext* pCxt, SName* name, char* dbFname) {
|
||||
return getTableMetaImpl(pCxt, name, dbFname, false);
|
||||
static int32_t getTableMeta(SInsertParseContext* pCxt, int32_t tbNo, SName* name, char* dbFname) {
|
||||
return getTableMetaImpl(pCxt, tbNo, name, dbFname, false);
|
||||
}
|
||||
|
||||
static int32_t getSTableMeta(SInsertParseContext* pCxt, SName* name, char* dbFname) {
|
||||
return getTableMetaImpl(pCxt, name, dbFname, true);
|
||||
static int32_t getSTableMeta(SInsertParseContext* pCxt, int32_t tbNo, SName* name, char* dbFname) {
|
||||
return getTableMetaImpl(pCxt, tbNo, name, dbFname, true);
|
||||
}
|
||||
|
||||
static int32_t getDBCfg(SInsertParseContext* pCxt, const char* pDbFName, SDbCfgInfo* pInfo) {
|
||||
|
@ -1028,13 +1026,13 @@ end:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t storeTableMeta(SInsertParseContext* pCxt, SHashObj* pHash, SName* pTableName, const char* pName,
|
||||
int32_t len, STableMeta* pMeta) {
|
||||
static int32_t storeTableMeta(SInsertParseContext* pCxt, SHashObj* pHash, int32_t tbNo, SName* pTableName,
|
||||
const char* pName, int32_t len, STableMeta* pMeta) {
|
||||
SVgroupInfo vg;
|
||||
CHECK_CODE(getTableVgroup(pCxt, pTableName, &vg));
|
||||
CHECK_CODE(getTableVgroup(pCxt, tbNo, pTableName, &vg));
|
||||
CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
|
||||
|
||||
pMeta->uid = 0;
|
||||
pMeta->uid = tbNo;
|
||||
pMeta->vgId = vg.vgId;
|
||||
pMeta->tableType = TSDB_CHILD_TABLE;
|
||||
|
||||
|
@ -1084,7 +1082,7 @@ static int32_t ignoreAutoCreateTableClause(SInsertParseContext* pCxt) {
|
|||
}
|
||||
|
||||
// pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
|
||||
static int32_t parseUsingClause(SInsertParseContext* pCxt, SName* name, char* tbFName) {
|
||||
static int32_t parseUsingClause(SInsertParseContext* pCxt, int32_t tbNo, SName* name, char* tbFName) {
|
||||
int32_t len = strlen(tbFName);
|
||||
STableMeta** pMeta = taosHashGet(pCxt->pSubTableHashObj, tbFName, len);
|
||||
if (NULL != pMeta) {
|
||||
|
@ -1102,11 +1100,11 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SName* name, char* tb
|
|||
tNameGetFullDbName(&sname, dbFName);
|
||||
strcpy(pCxt->sTableName, sname.tname);
|
||||
|
||||
CHECK_CODE(getSTableMeta(pCxt, &sname, dbFName));
|
||||
CHECK_CODE(getSTableMeta(pCxt, tbNo, &sname, dbFName));
|
||||
if (TSDB_SUPER_TABLE != pCxt->pTableMeta->tableType) {
|
||||
return buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed");
|
||||
}
|
||||
CHECK_CODE(storeTableMeta(pCxt, pCxt->pSubTableHashObj, name, tbFName, len, pCxt->pTableMeta));
|
||||
CHECK_CODE(storeTableMeta(pCxt, pCxt->pSubTableHashObj, tbNo, name, tbFName, len, pCxt->pTableMeta));
|
||||
|
||||
SSchema* pTagsSchema = getTableTagSchema(pCxt->pTableMeta);
|
||||
setBoundColumnInfo(&pCxt->tags, pTagsSchema, getNumOfTags(pCxt->pTableMeta));
|
||||
|
@ -1195,7 +1193,7 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks,
|
|||
tdSRowEnd(pBuilder);
|
||||
|
||||
*gotRow = true;
|
||||
|
||||
|
||||
#ifdef TD_DEBUG_PRINT_ROW
|
||||
STSchema* pSTSchema = tdGetSTSChemaFromSSChema(schema, spd->numOfCols, 1);
|
||||
tdSRowPrint(row, pSTSchema, __func__);
|
||||
|
@ -1214,7 +1212,7 @@ static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlo
|
|||
CHECK_CODE(initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));
|
||||
|
||||
(*numOfRows) = 0;
|
||||
char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \"
|
||||
// char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \"
|
||||
SToken sToken;
|
||||
while (1) {
|
||||
int32_t index = 0;
|
||||
|
@ -1232,7 +1230,7 @@ static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlo
|
|||
}
|
||||
|
||||
bool gotRow = false;
|
||||
CHECK_CODE(parseOneRow(pCxt, pDataBlock, tinfo.precision, &gotRow, tmpTokenBuf));
|
||||
CHECK_CODE(parseOneRow(pCxt, pDataBlock, tinfo.precision, &gotRow, pCxt->tmpTokenBuf));
|
||||
if (gotRow) {
|
||||
pDataBlock->size += extendedRowSize; // len;
|
||||
}
|
||||
|
@ -1347,7 +1345,9 @@ static int32_t parseDataFromFile(SInsertParseContext* pCxt, SToken filePath, STa
|
|||
}
|
||||
|
||||
static void destroyInsertParseContextForTable(SInsertParseContext* pCxt) {
|
||||
taosMemoryFreeClear(pCxt->pTableMeta);
|
||||
if (!pCxt->pComCxt->async) {
|
||||
taosMemoryFreeClear(pCxt->pTableMeta);
|
||||
}
|
||||
destroyBoundColumnInfo(&pCxt->tags);
|
||||
tdDestroySVCreateTbReq(&pCxt->createTblReq);
|
||||
}
|
||||
|
@ -1365,6 +1365,20 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) {
|
|||
destroyBlockArrayList(pCxt->pVgDataBlocks);
|
||||
}
|
||||
|
||||
static int32_t parseTableName(SInsertParseContext* pCxt, SToken* pTbnameToken, SName* pName, char* pDbFName,
|
||||
char* pTbFName) {
|
||||
int32_t code = createSName(pName, pTbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
tNameExtractFullName(pName, pTbFName);
|
||||
code = taosHashPut(pCxt->pTableNameHashObj, pTbFName, strlen(pTbFName), pName, sizeof(SName));
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
tNameGetFullDbName(pName, pDbFName);
|
||||
code = taosHashPut(pCxt->pDbFNameHashObj, pDbFName, strlen(pDbFName), pDbFName, TSDB_DB_FNAME_LEN);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
// tb_name
|
||||
// [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
|
||||
// [(field1_name, ...)]
|
||||
|
@ -1372,7 +1386,9 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) {
|
|||
// [...];
|
||||
static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
||||
int32_t tbNum = 0;
|
||||
SName name;
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
bool autoCreateTbl = false;
|
||||
|
||||
// for each table
|
||||
|
@ -1415,20 +1431,15 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
|||
SToken tbnameToken = sToken;
|
||||
NEXT_TOKEN(pCxt->pSql, sToken);
|
||||
|
||||
SName name;
|
||||
CHECK_CODE(createSName(&name, &tbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));
|
||||
|
||||
tNameExtractFullName(&name, tbFName);
|
||||
CHECK_CODE(taosHashPut(pCxt->pTableNameHashObj, tbFName, strlen(tbFName), &name, sizeof(SName)));
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
tNameGetFullDbName(&name, dbFName);
|
||||
CHECK_CODE(taosHashPut(pCxt->pDbFNameHashObj, dbFName, strlen(dbFName), dbFName, sizeof(dbFName)));
|
||||
if (!pCxt->pComCxt->async || TK_USING == sToken.type) {
|
||||
CHECK_CODE(parseTableName(pCxt, &tbnameToken, &name, dbFName, tbFName));
|
||||
}
|
||||
|
||||
bool existedUsing = false;
|
||||
// USING clause
|
||||
if (TK_USING == sToken.type) {
|
||||
existedUsing = true;
|
||||
CHECK_CODE(parseUsingClause(pCxt, &name, tbFName));
|
||||
CHECK_CODE(parseUsingClause(pCxt, tbNum, &name, tbFName));
|
||||
NEXT_TOKEN(pCxt->pSql, sToken);
|
||||
autoCreateTbl = true;
|
||||
}
|
||||
|
@ -1438,22 +1449,31 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
|||
// pSql -> field1_name, ...)
|
||||
pBoundColsStart = pCxt->pSql;
|
||||
CHECK_CODE(ignoreBoundColumns(pCxt));
|
||||
// CHECK_CODE(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(pCxt->pTableMeta)));
|
||||
NEXT_TOKEN(pCxt->pSql, sToken);
|
||||
}
|
||||
|
||||
if (TK_USING == sToken.type) {
|
||||
CHECK_CODE(parseUsingClause(pCxt, &name, tbFName));
|
||||
if (pCxt->pComCxt->async) {
|
||||
CHECK_CODE(parseTableName(pCxt, &tbnameToken, &name, dbFName, tbFName));
|
||||
}
|
||||
CHECK_CODE(parseUsingClause(pCxt, tbNum, &name, tbFName));
|
||||
NEXT_TOKEN(pCxt->pSql, sToken);
|
||||
autoCreateTbl = true;
|
||||
} else if (!existedUsing) {
|
||||
CHECK_CODE(getTableMeta(pCxt, &name, dbFName));
|
||||
CHECK_CODE(getTableMeta(pCxt, tbNum, &name, dbFName));
|
||||
}
|
||||
|
||||
STableDataBlocks* dataBuf = NULL;
|
||||
CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, tbFName, strlen(tbFName), TSDB_DEFAULT_PAYLOAD_SIZE,
|
||||
sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta,
|
||||
&dataBuf, NULL, &pCxt->createTblReq));
|
||||
if (pCxt->pComCxt->async) {
|
||||
CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, &pCxt->pTableMeta->uid, sizeof(pCxt->pTableMeta->uid),
|
||||
TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk),
|
||||
getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta, &dataBuf, NULL,
|
||||
&pCxt->createTblReq));
|
||||
} else {
|
||||
CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, tbFName, strlen(tbFName), TSDB_DEFAULT_PAYLOAD_SIZE,
|
||||
sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta,
|
||||
&dataBuf, NULL, &pCxt->createTblReq));
|
||||
}
|
||||
|
||||
if (NULL != pBoundColsStart) {
|
||||
char* pCurrPos = pCxt->pSql;
|
||||
|
@ -1532,7 +1552,9 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery, SParseMetaCache
|
|||
.totalNum = 0,
|
||||
.pOutput = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT),
|
||||
.pStmtCb = pContext->pStmtCb,
|
||||
.pMetaCache = pMetaCache};
|
||||
.pMetaCache = pMetaCache,
|
||||
.memElapsed = 0,
|
||||
.parRowElapsed = 0};
|
||||
|
||||
if (pContext->pStmtCb && *pQuery) {
|
||||
(*pContext->pStmtCb->getExecInfoFn)(pContext->pStmtCb->pStmt, &context.pVgroupsHashObj,
|
||||
|
@ -1547,7 +1569,7 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery, SParseMetaCache
|
|||
} else {
|
||||
context.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||
context.pTableBlockHashObj =
|
||||
taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||
}
|
||||
|
||||
if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pSubTableHashObj ||
|
||||
|
@ -1656,24 +1678,24 @@ static int32_t skipUsingClause(SInsertParseSyntaxCxt* pCxt) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t collectTableMetaKey(SInsertParseSyntaxCxt* pCxt, SToken* pTbToken) {
|
||||
static int32_t collectTableMetaKey(SInsertParseSyntaxCxt* pCxt, bool isStable, int32_t tableNo, SToken* pTbToken) {
|
||||
SName name;
|
||||
CHECK_CODE(createSName(&name, pTbToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));
|
||||
CHECK_CODE(reserveUserAuthInCacheExt(pCxt->pComCxt->pUser, &name, AUTH_TYPE_WRITE, pCxt->pMetaCache));
|
||||
CHECK_CODE(reserveTableMetaInCacheExt(&name, pCxt->pMetaCache));
|
||||
CHECK_CODE(reserveTableVgroupInCacheExt(&name, pCxt->pMetaCache));
|
||||
CHECK_CODE(reserveTableMetaInCacheForInsert(&name, isStable ? CATALOG_REQ_TYPE_META : CATALOG_REQ_TYPE_BOTH, tableNo,
|
||||
pCxt->pMetaCache));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t collectAutoCreateTableMetaKey(SInsertParseSyntaxCxt* pCxt, SToken* pTbToken) {
|
||||
static int32_t collectAutoCreateTableMetaKey(SInsertParseSyntaxCxt* pCxt, int32_t tableNo, SToken* pTbToken) {
|
||||
SName name;
|
||||
CHECK_CODE(createSName(&name, pTbToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));
|
||||
CHECK_CODE(reserveTableVgroupInCacheExt(&name, pCxt->pMetaCache));
|
||||
CHECK_CODE(reserveTableMetaInCacheForInsert(&name, CATALOG_REQ_TYPE_VGROUP, tableNo, pCxt->pMetaCache));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t parseInsertBodySyntax(SInsertParseSyntaxCxt* pCxt) {
|
||||
bool hasData = false;
|
||||
bool hasData = false;
|
||||
int32_t tableNo = 0;
|
||||
// for each table
|
||||
while (1) {
|
||||
SToken sToken;
|
||||
|
@ -1702,9 +1724,9 @@ static int32_t parseInsertBodySyntax(SInsertParseSyntaxCxt* pCxt) {
|
|||
// USING clause
|
||||
if (TK_USING == sToken.type) {
|
||||
existedUsing = true;
|
||||
CHECK_CODE(collectAutoCreateTableMetaKey(pCxt, &tbnameToken));
|
||||
CHECK_CODE(collectAutoCreateTableMetaKey(pCxt, tableNo, &tbnameToken));
|
||||
NEXT_TOKEN(pCxt->pSql, sToken);
|
||||
CHECK_CODE(collectTableMetaKey(pCxt, &sToken));
|
||||
CHECK_CODE(collectTableMetaKey(pCxt, true, tableNo, &sToken));
|
||||
CHECK_CODE(skipUsingClause(pCxt));
|
||||
NEXT_TOKEN(pCxt->pSql, sToken);
|
||||
}
|
||||
|
@ -1717,15 +1739,17 @@ static int32_t parseInsertBodySyntax(SInsertParseSyntaxCxt* pCxt) {
|
|||
|
||||
if (TK_USING == sToken.type && !existedUsing) {
|
||||
existedUsing = true;
|
||||
CHECK_CODE(collectAutoCreateTableMetaKey(pCxt, &tbnameToken));
|
||||
CHECK_CODE(collectAutoCreateTableMetaKey(pCxt, tableNo, &tbnameToken));
|
||||
NEXT_TOKEN(pCxt->pSql, sToken);
|
||||
CHECK_CODE(collectTableMetaKey(pCxt, &sToken));
|
||||
CHECK_CODE(collectTableMetaKey(pCxt, true, tableNo, &sToken));
|
||||
CHECK_CODE(skipUsingClause(pCxt));
|
||||
NEXT_TOKEN(pCxt->pSql, sToken);
|
||||
} else {
|
||||
CHECK_CODE(collectTableMetaKey(pCxt, &tbnameToken));
|
||||
} else if (!existedUsing) {
|
||||
CHECK_CODE(collectTableMetaKey(pCxt, false, tableNo, &tbnameToken));
|
||||
}
|
||||
|
||||
++tableNo;
|
||||
|
||||
if (TK_VALUES == sToken.type) {
|
||||
// pSql -> (field1_value, ...) [(field1_value2, ...) ...]
|
||||
CHECK_CODE(skipValuesClause(pCxt));
|
||||
|
|
|
@ -476,9 +476,11 @@ static int32_t buildDbReq(SHashObj* pDbsHash, SArray** pDbs) {
|
|||
|
||||
static int32_t buildTableReqFromDb(SHashObj* pDbsHash, SArray** pDbs) {
|
||||
if (NULL != pDbsHash) {
|
||||
*pDbs = taosArrayInit(taosHashGetSize(pDbsHash), sizeof(STablesReq));
|
||||
if (NULL == *pDbs) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
*pDbs = taosArrayInit(taosHashGetSize(pDbsHash), sizeof(STablesReq));
|
||||
if (NULL == *pDbs) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
SParseTablesMetaReq* p = taosHashIterate(pDbsHash, NULL);
|
||||
while (NULL != p) {
|
||||
|
@ -530,7 +532,62 @@ static int32_t buildUdfReq(SHashObj* pUdfHash, SArray** pUdf) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) {
|
||||
static int32_t buildCatalogReqForInsert(SParseContext* pCxt, const SParseMetaCache* pMetaCache,
|
||||
SCatalogReq* pCatalogReq) {
|
||||
int32_t ndbs = taosHashGetSize(pMetaCache->pInsertTables);
|
||||
pCatalogReq->pTableMeta = taosArrayInit(ndbs, sizeof(STablesReq));
|
||||
if (NULL == pCatalogReq->pTableMeta) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pCatalogReq->pTableHash = taosArrayInit(ndbs, sizeof(STablesReq));
|
||||
if (NULL == pCatalogReq->pTableHash) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pCatalogReq->pUser = taosArrayInit(ndbs, sizeof(SUserAuthInfo));
|
||||
if (NULL == pCatalogReq->pUser) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pCxt->pTableMetaPos = taosArrayInit(pMetaCache->sqlTableNum, sizeof(int32_t));
|
||||
pCxt->pTableVgroupPos = taosArrayInit(pMetaCache->sqlTableNum, sizeof(int32_t));
|
||||
|
||||
int32_t metaReqNo = 0;
|
||||
int32_t vgroupReqNo = 0;
|
||||
SInsertTablesMetaReq* p = taosHashIterate(pMetaCache->pInsertTables, NULL);
|
||||
while (NULL != p) {
|
||||
STablesReq req = {0};
|
||||
strcpy(req.dbFName, p->dbFName);
|
||||
TSWAP(req.pTables, p->pTableMetaReq);
|
||||
taosArrayPush(pCatalogReq->pTableMeta, &req);
|
||||
|
||||
req.pTables = NULL;
|
||||
TSWAP(req.pTables, p->pTableVgroupReq);
|
||||
taosArrayPush(pCatalogReq->pTableHash, &req);
|
||||
|
||||
int32_t ntables = taosArrayGetSize(p->pTableMetaPos);
|
||||
for (int32_t i = 0; i < ntables; ++i) {
|
||||
taosArrayInsert(pCxt->pTableMetaPos, *(int32_t*)taosArrayGet(p->pTableMetaPos, i), &metaReqNo);
|
||||
++metaReqNo;
|
||||
}
|
||||
|
||||
ntables = taosArrayGetSize(p->pTableVgroupPos);
|
||||
for (int32_t i = 0; i < ntables; ++i) {
|
||||
taosArrayInsert(pCxt->pTableVgroupPos, *(int32_t*)taosArrayGet(p->pTableVgroupPos, i), &vgroupReqNo);
|
||||
++vgroupReqNo;
|
||||
}
|
||||
|
||||
SUserAuthInfo auth = {0};
|
||||
strcpy(auth.user, pCxt->pUser);
|
||||
strcpy(auth.dbFName, p->dbFName);
|
||||
auth.type = AUTH_TYPE_WRITE;
|
||||
taosArrayPush(pCatalogReq->pUser, &auth);
|
||||
|
||||
p = taosHashIterate(pMetaCache->pInsertTables, p);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t buildCatalogReqForQuery(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) {
|
||||
int32_t code = buildTableReqFromDb(pMetaCache->pTableMeta, &pCatalogReq->pTableMeta);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = buildDbReq(pMetaCache->pDbVgroup, &pCatalogReq->pDbVgroup);
|
||||
|
@ -560,6 +617,13 @@ int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalog
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t buildCatalogReq(SParseContext* pCxt, const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) {
|
||||
if (NULL != pMetaCache->pInsertTables) {
|
||||
return buildCatalogReqForInsert(pCxt, pMetaCache, pCatalogReq);
|
||||
}
|
||||
return buildCatalogReqForQuery(pMetaCache, pCatalogReq);
|
||||
}
|
||||
|
||||
static int32_t putMetaDataToHash(const char* pKey, int32_t len, const SArray* pData, int32_t index, SHashObj** pHash) {
|
||||
if (NULL == *pHash) {
|
||||
*pHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
|
@ -647,7 +711,8 @@ static int32_t putUdfToCache(const SArray* pUdfReq, const SArray* pUdfData, SHas
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache) {
|
||||
int32_t putMetaDataToCacheForQuery(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData,
|
||||
SParseMetaCache* pMetaCache) {
|
||||
int32_t code = putDbTableDataToCache(pCatalogReq->pTableMeta, pMetaData->pTableMeta, &pMetaCache->pTableMeta);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = putDbDataToCache(pCatalogReq->pDbVgroup, pMetaData->pDbVgroup, &pMetaCache->pDbVgroup);
|
||||
|
@ -677,6 +742,30 @@ int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMet
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t putMetaDataToCacheForInsert(const SMetaData* pMetaData, SParseMetaCache* pMetaCache) {
|
||||
int32_t ndbs = taosArrayGetSize(pMetaData->pUser);
|
||||
for (int32_t i = 0; i < ndbs; ++i) {
|
||||
SMetaRes* pRes = taosArrayGet(pMetaData->pUser, i);
|
||||
if (TSDB_CODE_SUCCESS != pRes->code) {
|
||||
return pRes->code;
|
||||
}
|
||||
if (!(*(bool*)pRes->pRes)) {
|
||||
return TSDB_CODE_PAR_PERMISSION_DENIED;
|
||||
}
|
||||
}
|
||||
pMetaCache->pTableMetaData = pMetaData->pTableMeta;
|
||||
pMetaCache->pTableVgroupData = pMetaData->pTableHash;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache,
|
||||
bool insertValuesStmt) {
|
||||
if (insertValuesStmt) {
|
||||
return putMetaDataToCacheForInsert(pMetaData, pMetaCache);
|
||||
}
|
||||
return putMetaDataToCacheForQuery(pCatalogReq, pMetaData, pMetaCache);
|
||||
}
|
||||
|
||||
static int32_t reserveTableReqInCacheImpl(const char* pTbFName, int32_t len, SHashObj** pTables) {
|
||||
if (NULL == *pTables) {
|
||||
*pTables = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
|
@ -977,6 +1066,82 @@ int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t reserveTableReqInCacheForInsert(const SName* pName, ECatalogReqType reqType, int32_t tableNo,
|
||||
SInsertTablesMetaReq* pReq) {
|
||||
switch (reqType) {
|
||||
case CATALOG_REQ_TYPE_META:
|
||||
taosArrayPush(pReq->pTableMetaReq, pName);
|
||||
taosArrayPush(pReq->pTableMetaPos, &tableNo);
|
||||
break;
|
||||
case CATALOG_REQ_TYPE_VGROUP:
|
||||
taosArrayPush(pReq->pTableVgroupReq, pName);
|
||||
taosArrayPush(pReq->pTableVgroupPos, &tableNo);
|
||||
break;
|
||||
case CATALOG_REQ_TYPE_BOTH:
|
||||
taosArrayPush(pReq->pTableMetaReq, pName);
|
||||
taosArrayPush(pReq->pTableMetaPos, &tableNo);
|
||||
taosArrayPush(pReq->pTableVgroupReq, pName);
|
||||
taosArrayPush(pReq->pTableVgroupPos, &tableNo);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t reserveTableReqInDbCacheForInsert(const SName* pName, ECatalogReqType reqType, int32_t tableNo,
|
||||
SHashObj* pDbs) {
|
||||
SInsertTablesMetaReq req = {.pTableMetaReq = taosArrayInit(4, sizeof(SName)),
|
||||
.pTableMetaPos = taosArrayInit(4, sizeof(int32_t)),
|
||||
.pTableVgroupReq = taosArrayInit(4, sizeof(SName)),
|
||||
.pTableVgroupPos = taosArrayInit(4, sizeof(int32_t))};
|
||||
tNameGetFullDbName(pName, req.dbFName);
|
||||
int32_t code = reserveTableReqInCacheForInsert(pName, reqType, tableNo, &req);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = taosHashPut(pDbs, pName->dbname, strlen(pName->dbname), &req, sizeof(SInsertTablesMetaReq));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t reserveTableMetaInCacheForInsert(const SName* pName, ECatalogReqType reqType, int32_t tableNo,
|
||||
SParseMetaCache* pMetaCache) {
|
||||
if (NULL == pMetaCache->pInsertTables) {
|
||||
pMetaCache->pInsertTables = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
if (NULL == pMetaCache->pInsertTables) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
pMetaCache->sqlTableNum = tableNo;
|
||||
SInsertTablesMetaReq* pReq = taosHashGet(pMetaCache->pInsertTables, pName->dbname, strlen(pName->dbname));
|
||||
if (NULL == pReq) {
|
||||
return reserveTableReqInDbCacheForInsert(pName, reqType, tableNo, pMetaCache->pInsertTables);
|
||||
}
|
||||
return reserveTableReqInCacheForInsert(pName, reqType, tableNo, pReq);
|
||||
}
|
||||
|
||||
int32_t getTableMetaFromCacheForInsert(SArray* pTableMetaPos, SParseMetaCache* pMetaCache, int32_t tableNo,
|
||||
STableMeta** pMeta) {
|
||||
int32_t reqIndex = *(int32_t*)taosArrayGet(pTableMetaPos, tableNo);
|
||||
SMetaRes* pRes = taosArrayGet(pMetaCache->pTableMetaData, reqIndex);
|
||||
if (TSDB_CODE_SUCCESS == pRes->code) {
|
||||
*pMeta = pRes->pRes;
|
||||
if (NULL == *pMeta) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
return pRes->code;
|
||||
}
|
||||
|
||||
int32_t getTableVgroupFromCacheForInsert(SArray* pTableVgroupPos, SParseMetaCache* pMetaCache, int32_t tableNo,
|
||||
SVgroupInfo* pVgroup) {
|
||||
int32_t reqIndex = *(int32_t*)taosArrayGet(pTableVgroupPos, tableNo);
|
||||
SMetaRes* pRes = taosArrayGet(pMetaCache->pTableVgroupData, reqIndex);
|
||||
if (TSDB_CODE_SUCCESS == pRes->code) {
|
||||
memcpy(pVgroup, pRes->pRes, sizeof(SVgroupInfo));
|
||||
}
|
||||
return pRes->code;
|
||||
}
|
||||
|
||||
void destoryParseTablesMetaReqHash(SHashObj* pHash) {
|
||||
SParseTablesMetaReq* p = taosHashIterate(pHash, NULL);
|
||||
while (NULL != p) {
|
||||
|
|
|
@ -185,7 +185,7 @@ int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq
|
|||
code = parseSqlSyntax(pCxt, pQuery, &metaCache);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = buildCatalogReq(&metaCache, pCatalogReq);
|
||||
code = buildCatalogReq(pCxt, &metaCache, pCatalogReq);
|
||||
}
|
||||
destoryParseMetaCache(&metaCache, true);
|
||||
terrno = code;
|
||||
|
@ -195,7 +195,7 @@ int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq
|
|||
int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCatalogReq,
|
||||
const struct SMetaData* pMetaData, SQuery* pQuery) {
|
||||
SParseMetaCache metaCache = {0};
|
||||
int32_t code = putMetaDataToCache(pCatalogReq, pMetaData, &metaCache);
|
||||
int32_t code = putMetaDataToCache(pCatalogReq, pMetaData, &metaCache, NULL == pQuery->pRoot);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (NULL == pQuery->pRoot) {
|
||||
code = parseInsertSql(pCxt, &pQuery, &metaCache);
|
||||
|
|
|
@ -13,21 +13,13 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <functional>
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include "mockCatalogService.h"
|
||||
#include "os.h"
|
||||
#include "parInt.h"
|
||||
#include "parTestUtil.h"
|
||||
|
||||
using namespace std;
|
||||
using namespace std::placeholders;
|
||||
using namespace testing;
|
||||
|
||||
namespace {
|
||||
string toString(int32_t code) { return tstrerror(code); }
|
||||
} // namespace
|
||||
namespace ParserTest {
|
||||
|
||||
// syntax:
|
||||
// INSERT INTO
|
||||
|
@ -36,259 +28,60 @@ string toString(int32_t code) { return tstrerror(code); }
|
|||
// [(field1_name, ...)]
|
||||
// VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
|
||||
// [...];
|
||||
class InsertTest : public Test {
|
||||
protected:
|
||||
InsertTest() : res_(nullptr) {}
|
||||
~InsertTest() { reset(); }
|
||||
|
||||
void setDatabase(const string& acctId, const string& db) {
|
||||
acctId_ = acctId;
|
||||
db_ = db;
|
||||
}
|
||||
|
||||
void bind(const char* sql) {
|
||||
reset();
|
||||
cxt_.acctId = atoi(acctId_.c_str());
|
||||
cxt_.db = (char*)db_.c_str();
|
||||
strcpy(sqlBuf_, sql);
|
||||
cxt_.sqlLen = strlen(sql);
|
||||
sqlBuf_[cxt_.sqlLen] = '\0';
|
||||
cxt_.pSql = sqlBuf_;
|
||||
}
|
||||
|
||||
int32_t run() {
|
||||
code_ = parseInsertSql(&cxt_, &res_, nullptr);
|
||||
if (code_ != TSDB_CODE_SUCCESS) {
|
||||
cout << "code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
|
||||
}
|
||||
return code_;
|
||||
}
|
||||
|
||||
int32_t runAsync() {
|
||||
cxt_.async = true;
|
||||
bool request = true;
|
||||
unique_ptr<SParseMetaCache, function<void(SParseMetaCache*)> > metaCache(
|
||||
new SParseMetaCache(), std::bind(_destoryParseMetaCache, _1, cref(request)));
|
||||
code_ = parseInsertSyntax(&cxt_, &res_, metaCache.get());
|
||||
if (code_ != TSDB_CODE_SUCCESS) {
|
||||
cout << "parseInsertSyntax code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
|
||||
return code_;
|
||||
}
|
||||
|
||||
unique_ptr<SCatalogReq, void (*)(SCatalogReq*)> catalogReq(new SCatalogReq(),
|
||||
MockCatalogService::destoryCatalogReq);
|
||||
code_ = buildCatalogReq(metaCache.get(), catalogReq.get());
|
||||
if (code_ != TSDB_CODE_SUCCESS) {
|
||||
cout << "buildCatalogReq code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
|
||||
return code_;
|
||||
}
|
||||
|
||||
unique_ptr<SMetaData, void (*)(SMetaData*)> metaData(new SMetaData(), MockCatalogService::destoryMetaData);
|
||||
g_mockCatalogService->catalogGetAllMeta(catalogReq.get(), metaData.get());
|
||||
|
||||
metaCache.reset(new SParseMetaCache());
|
||||
request = false;
|
||||
code_ = putMetaDataToCache(catalogReq.get(), metaData.get(), metaCache.get());
|
||||
if (code_ != TSDB_CODE_SUCCESS) {
|
||||
cout << "putMetaDataToCache code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
|
||||
return code_;
|
||||
}
|
||||
|
||||
code_ = parseInsertSql(&cxt_, &res_, metaCache.get());
|
||||
if (code_ != TSDB_CODE_SUCCESS) {
|
||||
cout << "parseInsertSql code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
|
||||
return code_;
|
||||
}
|
||||
|
||||
return code_;
|
||||
}
|
||||
|
||||
void dumpReslut() {
|
||||
SVnodeModifOpStmt* pStmt = getVnodeModifStmt(res_);
|
||||
size_t num = taosArrayGetSize(pStmt->pDataBlocks);
|
||||
cout << "payloadType:" << (int32_t)pStmt->payloadType << ", insertType:" << pStmt->insertType
|
||||
<< ", numOfVgs:" << num << endl;
|
||||
for (size_t i = 0; i < num; ++i) {
|
||||
SVgDataBlocks* vg = (SVgDataBlocks*)taosArrayGetP(pStmt->pDataBlocks, i);
|
||||
cout << "vgId:" << vg->vg.vgId << ", numOfTables:" << vg->numOfTables << ", dataSize:" << vg->size << endl;
|
||||
SSubmitReq* submit = (SSubmitReq*)vg->pData;
|
||||
cout << "length:" << ntohl(submit->length) << ", numOfBlocks:" << ntohl(submit->numOfBlocks) << endl;
|
||||
int32_t numOfBlocks = ntohl(submit->numOfBlocks);
|
||||
SSubmitBlk* blk = (SSubmitBlk*)(submit + 1);
|
||||
for (int32_t i = 0; i < numOfBlocks; ++i) {
|
||||
cout << "Block:" << i << endl;
|
||||
cout << "\tuid:" << be64toh(blk->uid) << ", tid:" << be64toh(blk->suid) << ", sversion:" << ntohl(blk->sversion)
|
||||
<< ", dataLen:" << ntohl(blk->dataLen) << ", schemaLen:" << ntohl(blk->schemaLen)
|
||||
<< ", numOfRows:" << ntohl(blk->numOfRows) << endl;
|
||||
blk = (SSubmitBlk*)(blk->data + ntohl(blk->dataLen));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void checkReslut(int32_t numOfTables, int32_t numOfRows1, int32_t numOfRows2 = -1) {
|
||||
SVnodeModifOpStmt* pStmt = getVnodeModifStmt(res_);
|
||||
ASSERT_EQ(pStmt->payloadType, PAYLOAD_TYPE_KV);
|
||||
ASSERT_EQ(pStmt->insertType, TSDB_QUERY_TYPE_INSERT);
|
||||
size_t num = taosArrayGetSize(pStmt->pDataBlocks);
|
||||
ASSERT_GE(num, 0);
|
||||
for (size_t i = 0; i < num; ++i) {
|
||||
SVgDataBlocks* vg = (SVgDataBlocks*)taosArrayGetP(pStmt->pDataBlocks, i);
|
||||
ASSERT_EQ(vg->numOfTables, numOfTables);
|
||||
ASSERT_GE(vg->size, 0);
|
||||
SSubmitReq* submit = (SSubmitReq*)vg->pData;
|
||||
ASSERT_GE(ntohl(submit->length), 0);
|
||||
ASSERT_GE(ntohl(submit->numOfBlocks), 0);
|
||||
int32_t numOfBlocks = ntohl(submit->numOfBlocks);
|
||||
SSubmitBlk* blk = (SSubmitBlk*)(submit + 1);
|
||||
for (int32_t i = 0; i < numOfBlocks; ++i) {
|
||||
ASSERT_EQ(ntohl(blk->numOfRows), (0 == i ? numOfRows1 : (numOfRows2 > 0 ? numOfRows2 : numOfRows1)));
|
||||
blk = (SSubmitBlk*)(blk->data + ntohl(blk->dataLen));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
static const int max_err_len = 1024;
|
||||
static const int max_sql_len = 1024 * 1024;
|
||||
|
||||
static void _destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request) {
|
||||
destoryParseMetaCache(pMetaCache, request);
|
||||
delete pMetaCache;
|
||||
}
|
||||
|
||||
void reset() {
|
||||
memset(&cxt_, 0, sizeof(cxt_));
|
||||
memset(errMagBuf_, 0, max_err_len);
|
||||
cxt_.pMsg = errMagBuf_;
|
||||
cxt_.msgLen = max_err_len;
|
||||
code_ = TSDB_CODE_SUCCESS;
|
||||
qDestroyQuery(res_);
|
||||
res_ = nullptr;
|
||||
}
|
||||
|
||||
SVnodeModifOpStmt* getVnodeModifStmt(SQuery* pQuery) { return (SVnodeModifOpStmt*)pQuery->pRoot; }
|
||||
|
||||
string acctId_;
|
||||
string db_;
|
||||
char errMagBuf_[max_err_len];
|
||||
char sqlBuf_[max_sql_len];
|
||||
SParseContext cxt_;
|
||||
int32_t code_;
|
||||
SQuery* res_;
|
||||
};
|
||||
class ParserInsertTest : public ParserTestBase {};
|
||||
|
||||
// INSERT INTO tb_name [(field1_name, ...)] VALUES (field1_value, ...)
|
||||
TEST_F(InsertTest, singleTableSingleRowTest) {
|
||||
setDatabase("root", "test");
|
||||
TEST_F(ParserInsertTest, singleTableSingleRowTest) {
|
||||
useDb("root", "test");
|
||||
|
||||
bind("insert into t1 values (now, 1, 'beijing', 3, 4, 5)");
|
||||
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
|
||||
dumpReslut();
|
||||
checkReslut(1, 1);
|
||||
run("INSERT INTO t1 VALUES (now, 1, 'beijing', 3, 4, 5)");
|
||||
|
||||
bind("insert into t1 (ts, c1, c2, c3, c4, c5) values (now, 1, 'beijing', 3, 4, 5)");
|
||||
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
|
||||
|
||||
bind("insert into t1 values (now, 1, 'beijing', 3, 4, 5)");
|
||||
ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
|
||||
dumpReslut();
|
||||
checkReslut(1, 1);
|
||||
|
||||
bind("insert into t1 (ts, c1, c2, c3, c4, c5) values (now, 1, 'beijing', 3, 4, 5)");
|
||||
ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
|
||||
run("INSERT INTO t1 (ts, c1, c2, c3, c4, c5) VALUES (now, 1, 'beijing', 3, 4, 5)");
|
||||
}
|
||||
|
||||
// INSERT INTO tb_name VALUES (field1_value, ...)(field1_value, ...)
|
||||
TEST_F(InsertTest, singleTableMultiRowTest) {
|
||||
setDatabase("root", "test");
|
||||
TEST_F(ParserInsertTest, singleTableMultiRowTest) {
|
||||
useDb("root", "test");
|
||||
|
||||
bind(
|
||||
"insert into t1 values (now, 1, 'beijing', 3, 4, 5)(now+1s, 2, 'shanghai', 6, 7, 8)"
|
||||
run("INSERT INTO t1 VALUES (now, 1, 'beijing', 3, 4, 5)"
|
||||
"(now+1s, 2, 'shanghai', 6, 7, 8)"
|
||||
"(now+2s, 3, 'guangzhou', 9, 10, 11)");
|
||||
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
|
||||
dumpReslut();
|
||||
checkReslut(1, 3);
|
||||
|
||||
bind(
|
||||
"insert into t1 values (now, 1, 'beijing', 3, 4, 5)(now+1s, 2, 'shanghai', 6, 7, 8)"
|
||||
"(now+2s, 3, 'guangzhou', 9, 10, 11)");
|
||||
ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
// INSERT INTO tb1_name VALUES (field1_value, ...) tb2_name VALUES (field1_value, ...)
|
||||
TEST_F(InsertTest, multiTableSingleRowTest) {
|
||||
setDatabase("root", "test");
|
||||
TEST_F(ParserInsertTest, multiTableSingleRowTest) {
|
||||
useDb("root", "test");
|
||||
|
||||
bind("insert into st1s1 values (now, 1, \"beijing\") st1s2 values (now, 10, \"131028\")");
|
||||
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
|
||||
dumpReslut();
|
||||
checkReslut(2, 1);
|
||||
|
||||
bind("insert into st1s1 values (now, 1, \"beijing\") st1s2 values (now, 10, \"131028\")");
|
||||
ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
|
||||
run("INSERT INTO st1s1 VALUES (now, 1, 'beijing') st1s2 VALUES (now, 10, '131028')");
|
||||
}
|
||||
|
||||
// INSERT INTO tb1_name VALUES (field1_value, ...) tb2_name VALUES (field1_value, ...)
|
||||
TEST_F(InsertTest, multiTableMultiRowTest) {
|
||||
setDatabase("root", "test");
|
||||
TEST_F(ParserInsertTest, multiTableMultiRowTest) {
|
||||
useDb("root", "test");
|
||||
|
||||
bind(
|
||||
"insert into st1s1 values (now, 1, \"beijing\")(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")"
|
||||
" st1s2 values (now, 10, \"131028\")(now+1s, 20, \"132028\")");
|
||||
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
|
||||
dumpReslut();
|
||||
checkReslut(2, 3, 2);
|
||||
|
||||
bind(
|
||||
"insert into st1s1 values (now, 1, \"beijing\")(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")"
|
||||
" st1s2 values (now, 10, \"131028\")(now+1s, 20, \"132028\")");
|
||||
ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
|
||||
run("INSERT INTO "
|
||||
"st1s1 VALUES (now, 1, 'beijing')(now+1s, 2, 'shanghai')(now+2s, 3, 'guangzhou') "
|
||||
"st1s2 VALUES (now, 10, '131028')(now+1s, 20, '132028')");
|
||||
}
|
||||
|
||||
// INSERT INTO
|
||||
// tb1_name USING st1_name [(tag1_name, ...)] TAGS (tag1_value, ...) VALUES (field1_value, ...)
|
||||
// tb2_name USING st2_name [(tag1_name, ...)] TAGS (tag1_value, ...) VALUES (field1_value, ...)
|
||||
TEST_F(InsertTest, autoCreateTableTest) {
|
||||
setDatabase("root", "test");
|
||||
TEST_F(ParserInsertTest, autoCreateTableTest) {
|
||||
useDb("root", "test");
|
||||
|
||||
bind(
|
||||
"insert into st1s1 using st1 tags(1, 'wxy', now) "
|
||||
"values (now, 1, \"beijing\")(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")");
|
||||
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
|
||||
dumpReslut();
|
||||
checkReslut(1, 3);
|
||||
run("INSERT INTO st1s1 USING st1 TAGS(1, 'wxy', now) "
|
||||
"VALUES (now, 1, 'beijing')(now+1s, 2, 'shanghai')(now+2s, 3, 'guangzhou')");
|
||||
|
||||
bind(
|
||||
"insert into st1s1 using st1 (tag1, tag2) tags(1, 'wxy') values (now, 1, \"beijing\")"
|
||||
"(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")");
|
||||
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
|
||||
run("INSERT INTO st1s1 USING st1 (tag1, tag2) TAGS(1, 'wxy') (ts, c1, c2) "
|
||||
"VALUES (now, 1, 'beijing')(now+1s, 2, 'shanghai')(now+2s, 3, 'guangzhou')");
|
||||
|
||||
bind(
|
||||
"insert into st1s1 using st1 tags(1, 'wxy', now) "
|
||||
"values (now, 1, \"beijing\")(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")");
|
||||
ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
|
||||
run("INSERT INTO st1s1 (ts, c1, c2) USING st1 (tag1, tag2) TAGS(1, 'wxy') "
|
||||
"VALUES (now, 1, 'beijing')(now+1s, 2, 'shanghai')(now+2s, 3, 'guangzhou')");
|
||||
|
||||
bind(
|
||||
"insert into st1s1 using st1 (tag1, tag2) tags(1, 'wxy') values (now, 1, \"beijing\")"
|
||||
"(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")");
|
||||
ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
|
||||
|
||||
bind(
|
||||
"insert into st1s1 using st1 tags(1, 'wxy', now) values (now, 1, \"beijing\")"
|
||||
"st1s1 using st1 tags(1, 'wxy', now) values (now+1s, 2, \"shanghai\")");
|
||||
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
|
||||
run("INSERT INTO "
|
||||
"st1s1 USING st1 (tag1, tag2) TAGS(1, 'wxy') (ts, c1, c2) VALUES (now, 1, 'beijing') "
|
||||
"st1s2 (ts, c1, c2) USING st1 TAGS(2, 'abc', now) VALUES (now+1s, 2, 'shanghai')");
|
||||
}
|
||||
|
||||
TEST_F(InsertTest, toleranceTest) {
|
||||
setDatabase("root", "test");
|
||||
|
||||
bind("insert into");
|
||||
ASSERT_NE(run(), TSDB_CODE_SUCCESS);
|
||||
bind("insert into t");
|
||||
ASSERT_NE(run(), TSDB_CODE_SUCCESS);
|
||||
|
||||
bind("insert into");
|
||||
ASSERT_NE(runAsync(), TSDB_CODE_SUCCESS);
|
||||
bind("insert into t");
|
||||
ASSERT_NE(runAsync(), TSDB_CODE_SUCCESS);
|
||||
}
|
||||
} // namespace ParserTest
|
||||
|
|
|
@ -225,16 +225,17 @@ class ParserTestBaseImpl {
|
|||
DO_WITH_THROW(collectMetaKey, pCxt, pQuery, pMetaCache);
|
||||
}
|
||||
|
||||
void doBuildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) {
|
||||
DO_WITH_THROW(buildCatalogReq, pMetaCache, pCatalogReq);
|
||||
void doBuildCatalogReq(SParseContext* pCxt, const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) {
|
||||
DO_WITH_THROW(buildCatalogReq, pCxt, pMetaCache, pCatalogReq);
|
||||
}
|
||||
|
||||
void doGetAllMeta(const SCatalogReq* pCatalogReq, SMetaData* pMetaData) {
|
||||
DO_WITH_THROW(g_mockCatalogService->catalogGetAllMeta, pCatalogReq, pMetaData);
|
||||
}
|
||||
|
||||
void doPutMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache) {
|
||||
DO_WITH_THROW(putMetaDataToCache, pCatalogReq, pMetaData, pMetaCache);
|
||||
void doPutMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache,
|
||||
bool isInsertValues) {
|
||||
DO_WITH_THROW(putMetaDataToCache, pCatalogReq, pMetaData, pMetaCache, isInsertValues);
|
||||
}
|
||||
|
||||
void doAuthenticate(SParseContext* pCxt, SQuery* pQuery, SParseMetaCache* pMetaCache) {
|
||||
|
@ -261,7 +262,9 @@ class ParserTestBaseImpl {
|
|||
void doParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatalogReq) {
|
||||
DO_WITH_THROW(qParseSqlSyntax, pCxt, pQuery, pCatalogReq);
|
||||
ASSERT_NE(*pQuery, nullptr);
|
||||
res_.parsedAst_ = toString((*pQuery)->pRoot);
|
||||
if (nullptr != (*pQuery)->pRoot) {
|
||||
res_.parsedAst_ = toString((*pQuery)->pRoot);
|
||||
}
|
||||
}
|
||||
|
||||
void doAnalyseSqlSemantic(SParseContext* pCxt, const SCatalogReq* pCatalogReq, const SMetaData* pMetaData,
|
||||
|
@ -270,6 +273,17 @@ class ParserTestBaseImpl {
|
|||
res_.calcConstAst_ = toString(pQuery->pRoot);
|
||||
}
|
||||
|
||||
void doParseInsertSql(SParseContext* pCxt, SQuery** pQuery, SParseMetaCache* pMetaCache) {
|
||||
DO_WITH_THROW(parseInsertSql, pCxt, pQuery, pMetaCache);
|
||||
ASSERT_NE(*pQuery, nullptr);
|
||||
res_.parsedAst_ = toString((*pQuery)->pRoot);
|
||||
}
|
||||
|
||||
void doParseInsertSyntax(SParseContext* pCxt, SQuery** pQuery, SParseMetaCache* pMetaCache) {
|
||||
DO_WITH_THROW(parseInsertSyntax, pCxt, pQuery, pMetaCache);
|
||||
ASSERT_NE(*pQuery, nullptr);
|
||||
}
|
||||
|
||||
string toString(const SNode* pRoot) {
|
||||
char* pStr = NULL;
|
||||
int32_t len = 0;
|
||||
|
@ -287,15 +301,20 @@ class ParserTestBaseImpl {
|
|||
SParseContext cxt = {0};
|
||||
setParseContext(sql, &cxt);
|
||||
|
||||
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery);
|
||||
doParse(&cxt, query.get());
|
||||
SQuery* pQuery = *(query.get());
|
||||
if (qIsInsertValuesSql(cxt.pSql, cxt.sqlLen)) {
|
||||
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery);
|
||||
doParseInsertSql(&cxt, query.get(), nullptr);
|
||||
} else {
|
||||
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery);
|
||||
doParse(&cxt, query.get());
|
||||
SQuery* pQuery = *(query.get());
|
||||
|
||||
doAuthenticate(&cxt, pQuery, nullptr);
|
||||
doAuthenticate(&cxt, pQuery, nullptr);
|
||||
|
||||
doTranslate(&cxt, pQuery, nullptr);
|
||||
doTranslate(&cxt, pQuery, nullptr);
|
||||
|
||||
doCalculateConstant(&cxt, pQuery);
|
||||
doCalculateConstant(&cxt, pQuery);
|
||||
}
|
||||
|
||||
if (g_dump) {
|
||||
dump();
|
||||
|
@ -338,17 +357,22 @@ class ParserTestBaseImpl {
|
|||
setParseContext(sql, &cxt, true);
|
||||
|
||||
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery);
|
||||
doParse(&cxt, query.get());
|
||||
SQuery* pQuery = *(query.get());
|
||||
|
||||
bool request = true;
|
||||
bool request = true;
|
||||
unique_ptr<SParseMetaCache, function<void(SParseMetaCache*)> > metaCache(
|
||||
new SParseMetaCache(), bind(_destoryParseMetaCache, _1, cref(request)));
|
||||
doCollectMetaKey(&cxt, pQuery, metaCache.get());
|
||||
bool isInsertValues = qIsInsertValuesSql(cxt.pSql, cxt.sqlLen);
|
||||
if (isInsertValues) {
|
||||
doParseInsertSyntax(&cxt, query.get(), metaCache.get());
|
||||
} else {
|
||||
doParse(&cxt, query.get());
|
||||
doCollectMetaKey(&cxt, *(query.get()), metaCache.get());
|
||||
}
|
||||
|
||||
SQuery* pQuery = *(query.get());
|
||||
|
||||
unique_ptr<SCatalogReq, void (*)(SCatalogReq*)> catalogReq(new SCatalogReq(),
|
||||
MockCatalogService::destoryCatalogReq);
|
||||
doBuildCatalogReq(metaCache.get(), catalogReq.get());
|
||||
doBuildCatalogReq(&cxt, metaCache.get(), catalogReq.get());
|
||||
|
||||
string err;
|
||||
thread t1([&]() {
|
||||
|
@ -358,13 +382,17 @@ class ParserTestBaseImpl {
|
|||
|
||||
metaCache.reset(new SParseMetaCache());
|
||||
request = false;
|
||||
doPutMetaDataToCache(catalogReq.get(), metaData.get(), metaCache.get());
|
||||
doPutMetaDataToCache(catalogReq.get(), metaData.get(), metaCache.get(), isInsertValues);
|
||||
|
||||
doAuthenticate(&cxt, pQuery, metaCache.get());
|
||||
if (isInsertValues) {
|
||||
doParseInsertSql(&cxt, query.get(), metaCache.get());
|
||||
} else {
|
||||
doAuthenticate(&cxt, pQuery, metaCache.get());
|
||||
|
||||
doTranslate(&cxt, pQuery, metaCache.get());
|
||||
doTranslate(&cxt, pQuery, metaCache.get());
|
||||
|
||||
doCalculateConstant(&cxt, pQuery);
|
||||
doCalculateConstant(&cxt, pQuery);
|
||||
}
|
||||
} catch (const TerminateFlag& e) {
|
||||
// success and terminate
|
||||
} catch (const runtime_error& e) {
|
||||
|
|
Loading…
Reference in New Issue