enh: insert optimize
This commit is contained in:
parent
379cdc0566
commit
4290582249
|
@ -53,6 +53,8 @@ typedef struct SParseContext {
|
|||
int8_t schemalessType;
|
||||
const char* svrVer;
|
||||
bool nodeOffline;
|
||||
SArray* pTableMetaPos; // sql table pos => catalog data pos
|
||||
SArray* pTableVgroupPos; // sql table pos => catalog data pos
|
||||
} SParseContext;
|
||||
|
||||
int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery);
|
||||
|
@ -84,8 +86,8 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu
|
|||
int32_t rowNum);
|
||||
int32_t qBuildStmtColFields(void* pDataBlock, int32_t* fieldNum, TAOS_FIELD_E** fields);
|
||||
int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TAOS_FIELD_E** fields);
|
||||
int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const char* sTableName, char* tName, TAOS_MULTI_BIND* bind,
|
||||
char* msgBuf, int32_t msgBufLen);
|
||||
int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const char* sTableName, char* tName,
|
||||
TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen);
|
||||
void destroyBoundColumnInfo(void* pBoundInfo);
|
||||
int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf,
|
||||
int32_t msgBufLen);
|
||||
|
|
|
@ -689,11 +689,11 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
|
|||
TDMT_VND_CREATE_TABLE == pRequest->type) {
|
||||
pRequest->body.resInfo.numOfRows = res.numOfRows;
|
||||
if (TDMT_VND_SUBMIT == pRequest->type) {
|
||||
STscObj *pTscObj = pRequest->pTscObj;
|
||||
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
|
||||
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertRows, res.numOfRows);
|
||||
STscObj* pTscObj = pRequest->pTscObj;
|
||||
SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
|
||||
atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, res.numOfRows);
|
||||
}
|
||||
|
||||
|
||||
schedulerFreeJob(&pRequest->body.queryJob, 0);
|
||||
}
|
||||
|
||||
|
@ -800,8 +800,8 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
|
|||
break;
|
||||
}
|
||||
case TDMT_VND_SUBMIT: {
|
||||
atomic_add_fetch_64((int64_t *)&pAppInfo->summary.insertBytes, pRes->numOfBytes);
|
||||
|
||||
atomic_add_fetch_64((int64_t*)&pAppInfo->summary.insertBytes, pRes->numOfBytes);
|
||||
|
||||
code = handleSubmitExecRes(pRequest, pRes->res, pCatalog, &epset);
|
||||
break;
|
||||
}
|
||||
|
@ -832,9 +832,9 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
|
|||
if (pResult) {
|
||||
pRequest->body.resInfo.numOfRows = pResult->numOfRows;
|
||||
if (TDMT_VND_SUBMIT == pRequest->type) {
|
||||
STscObj *pTscObj = pRequest->pTscObj;
|
||||
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
|
||||
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertRows, pResult->numOfRows);
|
||||
STscObj* pTscObj = pRequest->pTscObj;
|
||||
SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
|
||||
atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, pResult->numOfRows);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -877,14 +877,14 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue
|
|||
if (pQuery->pRoot) {
|
||||
pRequest->stmtType = pQuery->pRoot->type;
|
||||
}
|
||||
|
||||
|
||||
if (pQuery->pRoot && !pRequest->inRetry) {
|
||||
STscObj *pTscObj = pRequest->pTscObj;
|
||||
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
|
||||
STscObj* pTscObj = pRequest->pTscObj;
|
||||
SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
|
||||
if (QUERY_NODE_VNODE_MODIF_STMT == pQuery->pRoot->type) {
|
||||
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);
|
||||
atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertsReq, 1);
|
||||
} else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
|
||||
atomic_add_fetch_64((int64_t *)&pActivity->numOfQueryReq, 1);
|
||||
atomic_add_fetch_64((int64_t*)&pActivity->numOfQueryReq, 1);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1467,9 +1467,9 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4)
|
|||
tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
|
||||
pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId);
|
||||
|
||||
STscObj *pTscObj = pRequest->pTscObj;
|
||||
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
|
||||
atomic_add_fetch_64((int64_t *)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
|
||||
STscObj* pTscObj = pRequest->pTscObj;
|
||||
SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
|
||||
atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
|
||||
|
||||
if (pResultInfo->numOfRows == 0) {
|
||||
return NULL;
|
||||
|
@ -1983,7 +1983,7 @@ int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName,
|
|||
|
||||
bool inEscape = false;
|
||||
int32_t code = 0;
|
||||
void *pIter = NULL;
|
||||
void* pIter = NULL;
|
||||
|
||||
int32_t vIdx = 0;
|
||||
int32_t vPos[2];
|
||||
|
|
|
@ -22,6 +22,7 @@ extern "C" {
|
|||
|
||||
#include "catalog.h"
|
||||
#include "os.h"
|
||||
#include "parser.h"
|
||||
#include "query.h"
|
||||
|
||||
#define parserFatal(param, ...) qFatal("PARSER: " param, ##__VA_ARGS__)
|
||||
|
@ -44,18 +45,37 @@ typedef struct SParseTablesMetaReq {
|
|||
SHashObj* pTables;
|
||||
} SParseTablesMetaReq;
|
||||
|
||||
typedef enum ECatalogReqType {
|
||||
CATALOG_REQ_TYPE_META = 1,
|
||||
CATALOG_REQ_TYPE_VGROUP,
|
||||
CATALOG_REQ_TYPE_BOTH
|
||||
} ECatalogReqType;
|
||||
|
||||
typedef struct SInsertTablesMetaReq {
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
SArray* pTableMetaPos;
|
||||
SArray* pTableMetaReq; // element is SName
|
||||
SArray* pTableVgroupPos;
|
||||
SArray* pTableVgroupReq; // element is SName
|
||||
} SInsertTablesMetaReq;
|
||||
|
||||
typedef struct SParseMetaCache {
|
||||
SHashObj* pTableMeta; // key is tbFName, element is STableMeta*
|
||||
SHashObj* pDbVgroup; // key is dbFName, element is SArray<SVgroupInfo>*
|
||||
SHashObj* pTableVgroup; // key is tbFName, element is SVgroupInfo*
|
||||
SHashObj* pDbCfg; // key is tbFName, element is SDbCfgInfo*
|
||||
SHashObj* pDbInfo; // key is tbFName, element is SDbInfo*
|
||||
SHashObj* pUserAuth; // key is SUserAuthInfo serialized string, element is bool indicating whether or not to pass
|
||||
SHashObj* pUdf; // key is funcName, element is SFuncInfo*
|
||||
SHashObj* pTableIndex; // key is tbFName, element is SArray<STableIndexInfo>*
|
||||
SHashObj* pTableCfg; // key is tbFName, element is STableCfg*
|
||||
SArray* pDnodes; // element is SEpSet
|
||||
bool dnodeRequired;
|
||||
SHashObj* pTableMeta; // key is tbFName, element is STableMeta*
|
||||
SHashObj* pDbVgroup; // key is dbFName, element is SArray<SVgroupInfo>*
|
||||
SHashObj* pTableVgroup; // key is tbFName, element is SVgroupInfo*
|
||||
SHashObj* pDbCfg; // key is tbFName, element is SDbCfgInfo*
|
||||
SHashObj* pDbInfo; // key is tbFName, element is SDbInfo*
|
||||
SHashObj* pUserAuth; // key is SUserAuthInfo serialized string, element is bool indicating whether or not to pass
|
||||
SHashObj* pUdf; // key is funcName, element is SFuncInfo*
|
||||
SHashObj* pTableIndex; // key is tbFName, element is SArray<STableIndexInfo>*
|
||||
SHashObj* pTableCfg; // key is tbFName, element is STableCfg*
|
||||
SArray* pDnodes; // element is SEpSet
|
||||
bool dnodeRequired;
|
||||
SHashObj* pInsertTables; // key is dbName, element is SInsertTablesMetaReq*, for insert
|
||||
const char* pUser;
|
||||
const SArray* pTableMetaData; // pRes = STableMeta*
|
||||
const SArray* pTableVgroupData; // pRes = SVgroupInfo*
|
||||
int32_t sqlTableNum;
|
||||
} SParseMetaCache;
|
||||
|
||||
int32_t generateSyntaxErrMsg(SMsgBuf* pBuf, int32_t errCode, ...);
|
||||
|
@ -72,8 +92,9 @@ STableMeta* tableMetaDup(const STableMeta* pTableMeta);
|
|||
|
||||
int32_t trimString(const char* src, int32_t len, char* dst, int32_t dlen);
|
||||
|
||||
int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq);
|
||||
int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache);
|
||||
int32_t buildCatalogReq(SParseContext* pCxt, const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq);
|
||||
int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache,
|
||||
bool insertValuesStmt);
|
||||
int32_t reserveTableMetaInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache);
|
||||
int32_t reserveTableMetaInCacheExt(const SName* pName, SParseMetaCache* pMetaCache);
|
||||
int32_t reserveDbVgInfoInCache(int32_t acctId, const char* pDb, SParseMetaCache* pMetaCache);
|
||||
|
@ -100,6 +121,12 @@ int32_t getUdfInfoFromCache(SParseMetaCache* pMetaCache, const char* pFunc, SFun
|
|||
int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName, SArray** pIndexes);
|
||||
int32_t getTableCfgFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableCfg** pOutput);
|
||||
int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes);
|
||||
int32_t reserveTableMetaInCacheForInsert(const SName* pName, ECatalogReqType reqType, int32_t tableNo,
|
||||
SParseMetaCache* pMetaCache);
|
||||
int32_t getTableMetaFromCacheForInsert(SArray* pTableMetaPos, SParseMetaCache* pMetaCache, int32_t tableNo,
|
||||
STableMeta** pMeta);
|
||||
int32_t getTableVgroupFromCacheForInsert(SArray* pTableVgroupPos, SParseMetaCache* pMetaCache, int32_t tableNo,
|
||||
SVgroupInfo* pVgroup);
|
||||
void destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request);
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -18,10 +18,10 @@
|
|||
#include "parInt.h"
|
||||
#include "parToken.h"
|
||||
#include "parUtil.h"
|
||||
#include "query.h"
|
||||
#include "tglobal.h"
|
||||
#include "ttime.h"
|
||||
#include "ttypes.h"
|
||||
#include "query.h"
|
||||
|
||||
#define NEXT_TOKEN(pSql, sToken) \
|
||||
do { \
|
||||
|
@ -73,6 +73,9 @@ typedef struct SInsertParseContext {
|
|||
SStmtCallback* pStmtCb;
|
||||
SParseMetaCache* pMetaCache;
|
||||
char sTableName[TSDB_TABLE_NAME_LEN];
|
||||
char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW];
|
||||
int64_t memElapsed;
|
||||
int64_t parRowElapsed;
|
||||
} SInsertParseContext;
|
||||
|
||||
typedef struct SInsertParseSyntaxCxt {
|
||||
|
@ -203,10 +206,11 @@ static int32_t checkAuth(SInsertParseContext* pCxt, char* pDbFname, bool* pPass)
|
|||
return catalogChkAuth(pBasicCtx->pCatalog, &conn, pBasicCtx->pUser, pDbFname, AUTH_TYPE_WRITE, pPass);
|
||||
}
|
||||
|
||||
static int32_t getTableSchema(SInsertParseContext* pCxt, SName* pTbName, bool isStb, STableMeta** pTableMeta) {
|
||||
static int32_t getTableSchema(SInsertParseContext* pCxt, int32_t tbNo, SName* pTbName, bool isStb,
|
||||
STableMeta** pTableMeta) {
|
||||
SParseContext* pBasicCtx = pCxt->pComCxt;
|
||||
if (pBasicCtx->async) {
|
||||
return getTableMetaFromCache(pCxt->pMetaCache, pTbName, pTableMeta);
|
||||
return getTableMetaFromCacheForInsert(pBasicCtx->pTableMetaPos, pCxt->pMetaCache, tbNo, pTableMeta);
|
||||
}
|
||||
SRequestConnInfo conn = {.pTrans = pBasicCtx->pTransporter,
|
||||
.requestId = pBasicCtx->requestId,
|
||||
|
@ -219,10 +223,10 @@ static int32_t getTableSchema(SInsertParseContext* pCxt, SName* pTbName, bool is
|
|||
return catalogGetTableMeta(pBasicCtx->pCatalog, &conn, pTbName, pTableMeta);
|
||||
}
|
||||
|
||||
static int32_t getTableVgroup(SInsertParseContext* pCxt, SName* pTbName, SVgroupInfo* pVg) {
|
||||
static int32_t getTableVgroup(SInsertParseContext* pCxt, int32_t tbNo, SName* pTbName, SVgroupInfo* pVg) {
|
||||
SParseContext* pBasicCtx = pCxt->pComCxt;
|
||||
if (pBasicCtx->async) {
|
||||
return getTableVgroupFromCache(pCxt->pMetaCache, pTbName, pVg);
|
||||
return getTableVgroupFromCacheForInsert(pBasicCtx->pTableVgroupPos, pCxt->pMetaCache, tbNo, pVg);
|
||||
}
|
||||
SRequestConnInfo conn = {.pTrans = pBasicCtx->pTransporter,
|
||||
.requestId = pBasicCtx->requestId,
|
||||
|
@ -231,28 +235,28 @@ static int32_t getTableVgroup(SInsertParseContext* pCxt, SName* pTbName, SVgroup
|
|||
return catalogGetTableHashVgroup(pBasicCtx->pCatalog, &conn, pTbName, pVg);
|
||||
}
|
||||
|
||||
static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SName* name, char* dbFname, bool isStb) {
|
||||
bool pass = false;
|
||||
CHECK_CODE(checkAuth(pCxt, dbFname, &pass));
|
||||
if (!pass) {
|
||||
return TSDB_CODE_PAR_PERMISSION_DENIED;
|
||||
}
|
||||
static int32_t getTableMetaImpl(SInsertParseContext* pCxt, int32_t tbNo, SName* name, char* dbFname, bool isStb) {
|
||||
// bool pass = false;
|
||||
// CHECK_CODE(checkAuth(pCxt, dbFname, &pass));
|
||||
// if (!pass) {
|
||||
// return TSDB_CODE_PAR_PERMISSION_DENIED;
|
||||
// }
|
||||
|
||||
CHECK_CODE(getTableSchema(pCxt, name, isStb, &pCxt->pTableMeta));
|
||||
CHECK_CODE(getTableSchema(pCxt, tbNo, name, isStb, &pCxt->pTableMeta));
|
||||
if (!isStb) {
|
||||
SVgroupInfo vg;
|
||||
CHECK_CODE(getTableVgroup(pCxt, name, &vg));
|
||||
CHECK_CODE(getTableVgroup(pCxt, tbNo, name, &vg));
|
||||
CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t getTableMeta(SInsertParseContext* pCxt, SName* name, char* dbFname) {
|
||||
return getTableMetaImpl(pCxt, name, dbFname, false);
|
||||
static int32_t getTableMeta(SInsertParseContext* pCxt, int32_t tbNo, SName* name, char* dbFname) {
|
||||
return getTableMetaImpl(pCxt, tbNo, name, dbFname, false);
|
||||
}
|
||||
|
||||
static int32_t getSTableMeta(SInsertParseContext* pCxt, SName* name, char* dbFname) {
|
||||
return getTableMetaImpl(pCxt, name, dbFname, true);
|
||||
static int32_t getSTableMeta(SInsertParseContext* pCxt, int32_t tbNo, SName* name, char* dbFname) {
|
||||
return getTableMetaImpl(pCxt, tbNo, name, dbFname, true);
|
||||
}
|
||||
|
||||
static int32_t getDBCfg(SInsertParseContext* pCxt, const char* pDbFName, SDbCfgInfo* pInfo) {
|
||||
|
@ -1028,10 +1032,10 @@ end:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t storeTableMeta(SInsertParseContext* pCxt, SHashObj* pHash, SName* pTableName, const char* pName,
|
||||
int32_t len, STableMeta* pMeta) {
|
||||
static int32_t storeTableMeta(SInsertParseContext* pCxt, SHashObj* pHash, int32_t tbNo, SName* pTableName,
|
||||
const char* pName, int32_t len, STableMeta* pMeta) {
|
||||
SVgroupInfo vg;
|
||||
CHECK_CODE(getTableVgroup(pCxt, pTableName, &vg));
|
||||
CHECK_CODE(getTableVgroup(pCxt, tbNo, pTableName, &vg));
|
||||
CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
|
||||
|
||||
pMeta->uid = 0;
|
||||
|
@ -1084,7 +1088,7 @@ static int32_t ignoreAutoCreateTableClause(SInsertParseContext* pCxt) {
|
|||
}
|
||||
|
||||
// pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
|
||||
static int32_t parseUsingClause(SInsertParseContext* pCxt, SName* name, char* tbFName) {
|
||||
static int32_t parseUsingClause(SInsertParseContext* pCxt, int32_t tbNo, SName* name, char* tbFName) {
|
||||
int32_t len = strlen(tbFName);
|
||||
STableMeta** pMeta = taosHashGet(pCxt->pSubTableHashObj, tbFName, len);
|
||||
if (NULL != pMeta) {
|
||||
|
@ -1102,11 +1106,11 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SName* name, char* tb
|
|||
tNameGetFullDbName(&sname, dbFName);
|
||||
strcpy(pCxt->sTableName, sname.tname);
|
||||
|
||||
CHECK_CODE(getSTableMeta(pCxt, &sname, dbFName));
|
||||
CHECK_CODE(getSTableMeta(pCxt, tbNo, &sname, dbFName));
|
||||
if (TSDB_SUPER_TABLE != pCxt->pTableMeta->tableType) {
|
||||
return buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed");
|
||||
}
|
||||
CHECK_CODE(storeTableMeta(pCxt, pCxt->pSubTableHashObj, name, tbFName, len, pCxt->pTableMeta));
|
||||
CHECK_CODE(storeTableMeta(pCxt, pCxt->pSubTableHashObj, tbNo, name, tbFName, len, pCxt->pTableMeta));
|
||||
|
||||
SSchema* pTagsSchema = getTableTagSchema(pCxt->pTableMeta);
|
||||
setBoundColumnInfo(&pCxt->tags, pTagsSchema, getNumOfTags(pCxt->pTableMeta));
|
||||
|
@ -1216,7 +1220,7 @@ static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlo
|
|||
CHECK_CODE(initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));
|
||||
|
||||
(*numOfRows) = 0;
|
||||
char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \"
|
||||
// char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \"
|
||||
SToken sToken;
|
||||
while (1) {
|
||||
int32_t index = 0;
|
||||
|
@ -1226,6 +1230,8 @@ static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlo
|
|||
}
|
||||
pCxt->pSql += index;
|
||||
|
||||
// int64_t memStart = taosGetTimestampMs();
|
||||
|
||||
if ((*numOfRows) >= maxRows || pDataBlock->size + extendedRowSize >= pDataBlock->nAllocSize) {
|
||||
int32_t tSize;
|
||||
CHECK_CODE(allocateMemIfNeed(pDataBlock, extendedRowSize, &tSize));
|
||||
|
@ -1233,12 +1239,17 @@ static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlo
|
|||
maxRows = tSize;
|
||||
}
|
||||
|
||||
// pCxt->memElapsed += taosGetTimestampMs() - memStart;
|
||||
// int64_t parRowStart = taosGetTimestampMs();
|
||||
|
||||
bool gotRow = false;
|
||||
CHECK_CODE(parseOneRow(pCxt, pDataBlock, tinfo.precision, &gotRow, tmpTokenBuf));
|
||||
CHECK_CODE(parseOneRow(pCxt, pDataBlock, tinfo.precision, &gotRow, pCxt->tmpTokenBuf));
|
||||
if (gotRow) {
|
||||
pDataBlock->size += extendedRowSize; // len;
|
||||
}
|
||||
|
||||
// pCxt->parRowElapsed += taosGetTimestampMs() - parRowStart;
|
||||
|
||||
NEXT_VALID_TOKEN(pCxt->pSql, sToken);
|
||||
if (TK_NK_COMMA == sToken.type) {
|
||||
return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM);
|
||||
|
@ -1258,15 +1269,20 @@ static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlo
|
|||
}
|
||||
|
||||
static int32_t parseValuesClause(SInsertParseContext* pCxt, STableDataBlocks* dataBuf) {
|
||||
// int64_t memStart = taosGetTimestampMs();
|
||||
|
||||
int32_t maxNumOfRows;
|
||||
CHECK_CODE(allocateMemIfNeed(dataBuf, getExtendedRowSize(dataBuf), &maxNumOfRows));
|
||||
|
||||
// pCxt->memElapsed += taosGetTimestampMs() - memStart;
|
||||
|
||||
int32_t numOfRows = 0;
|
||||
CHECK_CODE(parseValues(pCxt, dataBuf, maxNumOfRows, &numOfRows));
|
||||
|
||||
SSubmitBlk* pBlocks = (SSubmitBlk*)(dataBuf->pData);
|
||||
if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, dataBuf, numOfRows)) {
|
||||
return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than INT32_MAX");
|
||||
return buildInvalidOperationMsg(&pCxt->msg,
|
||||
"too many rows in sql, total number of rows should be less than INT32_MAX");
|
||||
}
|
||||
|
||||
dataBuf->numOfTables = 1;
|
||||
|
@ -1338,7 +1354,8 @@ static int32_t parseDataFromFile(SInsertParseContext* pCxt, SToken filePath, STa
|
|||
|
||||
SSubmitBlk* pBlocks = (SSubmitBlk*)(dataBuf->pData);
|
||||
if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, dataBuf, numOfRows)) {
|
||||
return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than INT32_MAX");
|
||||
return buildInvalidOperationMsg(&pCxt->msg,
|
||||
"too many rows in sql, total number of rows should be less than INT32_MAX");
|
||||
}
|
||||
|
||||
dataBuf->numOfTables = 1;
|
||||
|
@ -1347,7 +1364,9 @@ static int32_t parseDataFromFile(SInsertParseContext* pCxt, SToken filePath, STa
|
|||
}
|
||||
|
||||
static void destroyInsertParseContextForTable(SInsertParseContext* pCxt) {
|
||||
taosMemoryFreeClear(pCxt->pTableMeta);
|
||||
if (!pCxt->pComCxt->async) {
|
||||
taosMemoryFreeClear(pCxt->pTableMeta);
|
||||
}
|
||||
destroyBoundColumnInfo(&pCxt->tags);
|
||||
tdDestroySVCreateTbReq(&pCxt->createTblReq);
|
||||
}
|
||||
|
@ -1372,11 +1391,21 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) {
|
|||
// [...];
|
||||
static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
||||
int32_t tbNum = 0;
|
||||
SName name;
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
bool autoCreateTbl = false;
|
||||
|
||||
// int64_t parBodyStart = taosGetTimestampMs();
|
||||
// int64_t parTableElapsed = 0;
|
||||
// int64_t getTableElapsed = 0;
|
||||
// int64_t getDataBufElapsed = 0;
|
||||
// int64_t parValueElapsed = 0;
|
||||
|
||||
// for each table
|
||||
while (1) {
|
||||
// int64_t parTableStart = taosGetTimestampMs();
|
||||
|
||||
SToken sToken;
|
||||
char* tbName = NULL;
|
||||
|
||||
|
@ -1415,20 +1444,21 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
|||
SToken tbnameToken = sToken;
|
||||
NEXT_TOKEN(pCxt->pSql, sToken);
|
||||
|
||||
SName name;
|
||||
CHECK_CODE(createSName(&name, &tbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));
|
||||
if (!pCxt->pComCxt->async) {
|
||||
CHECK_CODE(createSName(&name, &tbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));
|
||||
|
||||
tNameExtractFullName(&name, tbFName);
|
||||
CHECK_CODE(taosHashPut(pCxt->pTableNameHashObj, tbFName, strlen(tbFName), &name, sizeof(SName)));
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
tNameGetFullDbName(&name, dbFName);
|
||||
CHECK_CODE(taosHashPut(pCxt->pDbFNameHashObj, dbFName, strlen(dbFName), dbFName, sizeof(dbFName)));
|
||||
tNameExtractFullName(&name, tbFName);
|
||||
CHECK_CODE(taosHashPut(pCxt->pTableNameHashObj, tbFName, strlen(tbFName), &name, sizeof(SName)));
|
||||
|
||||
tNameGetFullDbName(&name, dbFName);
|
||||
CHECK_CODE(taosHashPut(pCxt->pDbFNameHashObj, dbFName, strlen(dbFName), dbFName, sizeof(dbFName)));
|
||||
}
|
||||
|
||||
bool existedUsing = false;
|
||||
// USING clause
|
||||
if (TK_USING == sToken.type) {
|
||||
existedUsing = true;
|
||||
CHECK_CODE(parseUsingClause(pCxt, &name, tbFName));
|
||||
CHECK_CODE(parseUsingClause(pCxt, tbNum, &name, tbFName));
|
||||
NEXT_TOKEN(pCxt->pSql, sToken);
|
||||
autoCreateTbl = true;
|
||||
}
|
||||
|
@ -1438,22 +1468,28 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
|||
// pSql -> field1_name, ...)
|
||||
pBoundColsStart = pCxt->pSql;
|
||||
CHECK_CODE(ignoreBoundColumns(pCxt));
|
||||
// CHECK_CODE(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(pCxt->pTableMeta)));
|
||||
NEXT_TOKEN(pCxt->pSql, sToken);
|
||||
}
|
||||
|
||||
// parTableElapsed += taosGetTimestampMs() - parTableStart;
|
||||
// int64_t getTableStart = taosGetTimestampMs();
|
||||
|
||||
if (TK_USING == sToken.type) {
|
||||
CHECK_CODE(parseUsingClause(pCxt, &name, tbFName));
|
||||
CHECK_CODE(parseUsingClause(pCxt, tbNum, &name, tbFName));
|
||||
NEXT_TOKEN(pCxt->pSql, sToken);
|
||||
autoCreateTbl = true;
|
||||
} else if (!existedUsing) {
|
||||
CHECK_CODE(getTableMeta(pCxt, &name, dbFName));
|
||||
CHECK_CODE(getTableMeta(pCxt, tbNum, &name, dbFName));
|
||||
}
|
||||
|
||||
// getTableElapsed += taosGetTimestampMs() - getTableStart;
|
||||
// int64_t getDataBufStart = taosGetTimestampMs();
|
||||
|
||||
STableDataBlocks* dataBuf = NULL;
|
||||
CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, tbFName, strlen(tbFName), TSDB_DEFAULT_PAYLOAD_SIZE,
|
||||
sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta,
|
||||
&dataBuf, NULL, &pCxt->createTblReq));
|
||||
CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, &pCxt->pTableMeta->uid, sizeof(pCxt->pTableMeta->uid),
|
||||
TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk),
|
||||
getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta, &dataBuf, NULL,
|
||||
&pCxt->createTblReq));
|
||||
|
||||
if (NULL != pBoundColsStart) {
|
||||
char* pCurrPos = pCxt->pSql;
|
||||
|
@ -1462,9 +1498,13 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
|||
pCxt->pSql = pCurrPos;
|
||||
}
|
||||
|
||||
// getDataBufElapsed += taosGetTimestampMs() - getDataBufStart;
|
||||
|
||||
if (TK_VALUES == sToken.type) {
|
||||
// pSql -> (field1_value, ...) [(field1_value2, ...) ...]
|
||||
// int64_t parValueStart = taosGetTimestampMs();
|
||||
CHECK_CODE(parseValuesClause(pCxt, dataBuf));
|
||||
// parValueElapsed += taosGetTimestampMs() - parValueStart;
|
||||
TSDB_QUERY_SET_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_INSERT);
|
||||
|
||||
tbNum++;
|
||||
|
@ -1488,6 +1528,12 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
|||
return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", sToken.z);
|
||||
}
|
||||
|
||||
// printf(
|
||||
// "parse %d sql %d tables %d rows elapsed parBodyElapsed=%ldms parTableElapsed=%ldms getTableElapsed=%ldms "
|
||||
// "getDataBufElapsed=%ldms parValueElapsed=%ldms(memElapsed=%ldms parRowElapsed=%ldms)\n",
|
||||
// pCxt->pComCxt->sqlLen, tbNum, pCxt->totalNum, taosGetTimestampMs() - parBodyStart, parTableElapsed,
|
||||
// getTableElapsed, getDataBufElapsed, parValueElapsed, pCxt->memElapsed, pCxt->parRowElapsed);
|
||||
|
||||
qDebug("0x%" PRIx64 " insert input rows: %d", pCxt->pComCxt->requestId, pCxt->totalNum);
|
||||
|
||||
if (TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT)) {
|
||||
|
@ -1532,7 +1578,9 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery, SParseMetaCache
|
|||
.totalNum = 0,
|
||||
.pOutput = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT),
|
||||
.pStmtCb = pContext->pStmtCb,
|
||||
.pMetaCache = pMetaCache};
|
||||
.pMetaCache = pMetaCache,
|
||||
.memElapsed = 0,
|
||||
.parRowElapsed = 0};
|
||||
|
||||
if (pContext->pStmtCb && *pQuery) {
|
||||
(*pContext->pStmtCb->getExecInfoFn)(pContext->pStmtCb->pStmt, &context.pVgroupsHashObj,
|
||||
|
@ -1540,7 +1588,7 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery, SParseMetaCache
|
|||
} else {
|
||||
context.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||
context.pTableBlockHashObj =
|
||||
taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||
}
|
||||
|
||||
if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pSubTableHashObj ||
|
||||
|
@ -1649,24 +1697,27 @@ static int32_t skipUsingClause(SInsertParseSyntaxCxt* pCxt) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t collectTableMetaKey(SInsertParseSyntaxCxt* pCxt, SToken* pTbToken) {
|
||||
static int32_t collectTableMetaKey(SInsertParseSyntaxCxt* pCxt, bool isStable, int32_t tableNo, SToken* pTbToken) {
|
||||
SName name;
|
||||
CHECK_CODE(createSName(&name, pTbToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));
|
||||
CHECK_CODE(reserveUserAuthInCacheExt(pCxt->pComCxt->pUser, &name, AUTH_TYPE_WRITE, pCxt->pMetaCache));
|
||||
CHECK_CODE(reserveTableMetaInCacheExt(&name, pCxt->pMetaCache));
|
||||
CHECK_CODE(reserveTableVgroupInCacheExt(&name, pCxt->pMetaCache));
|
||||
// CHECK_CODE(reserveUserAuthInCacheExt(pCxt->pComCxt->pUser, &name, AUTH_TYPE_WRITE, pCxt->pMetaCache));
|
||||
// CHECK_CODE(reserveTableMetaInCacheExt(&name, pCxt->pMetaCache));
|
||||
// CHECK_CODE(reserveTableVgroupInCacheExt(&name, pCxt->pMetaCache));
|
||||
CHECK_CODE(reserveTableMetaInCacheForInsert(&name, isStable ? CATALOG_REQ_TYPE_META : CATALOG_REQ_TYPE_BOTH, tableNo,
|
||||
pCxt->pMetaCache));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t collectAutoCreateTableMetaKey(SInsertParseSyntaxCxt* pCxt, SToken* pTbToken) {
|
||||
static int32_t collectAutoCreateTableMetaKey(SInsertParseSyntaxCxt* pCxt, int32_t tableNo, SToken* pTbToken) {
|
||||
SName name;
|
||||
CHECK_CODE(createSName(&name, pTbToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));
|
||||
CHECK_CODE(reserveTableVgroupInCacheExt(&name, pCxt->pMetaCache));
|
||||
CHECK_CODE(reserveTableMetaInCacheForInsert(&name, CATALOG_REQ_TYPE_VGROUP, tableNo, pCxt->pMetaCache));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t parseInsertBodySyntax(SInsertParseSyntaxCxt* pCxt) {
|
||||
bool hasData = false;
|
||||
bool hasData = false;
|
||||
int32_t tableNo = 0;
|
||||
// for each table
|
||||
while (1) {
|
||||
SToken sToken;
|
||||
|
@ -1695,9 +1746,9 @@ static int32_t parseInsertBodySyntax(SInsertParseSyntaxCxt* pCxt) {
|
|||
// USING clause
|
||||
if (TK_USING == sToken.type) {
|
||||
existedUsing = true;
|
||||
CHECK_CODE(collectAutoCreateTableMetaKey(pCxt, &tbnameToken));
|
||||
CHECK_CODE(collectAutoCreateTableMetaKey(pCxt, tableNo, &tbnameToken));
|
||||
NEXT_TOKEN(pCxt->pSql, sToken);
|
||||
CHECK_CODE(collectTableMetaKey(pCxt, &sToken));
|
||||
CHECK_CODE(collectTableMetaKey(pCxt, true, tableNo, &sToken));
|
||||
CHECK_CODE(skipUsingClause(pCxt));
|
||||
NEXT_TOKEN(pCxt->pSql, sToken);
|
||||
}
|
||||
|
@ -1710,15 +1761,17 @@ static int32_t parseInsertBodySyntax(SInsertParseSyntaxCxt* pCxt) {
|
|||
|
||||
if (TK_USING == sToken.type && !existedUsing) {
|
||||
existedUsing = true;
|
||||
CHECK_CODE(collectAutoCreateTableMetaKey(pCxt, &tbnameToken));
|
||||
CHECK_CODE(collectAutoCreateTableMetaKey(pCxt, tableNo, &tbnameToken));
|
||||
NEXT_TOKEN(pCxt->pSql, sToken);
|
||||
CHECK_CODE(collectTableMetaKey(pCxt, &sToken));
|
||||
CHECK_CODE(collectTableMetaKey(pCxt, true, tableNo, &sToken));
|
||||
CHECK_CODE(skipUsingClause(pCxt));
|
||||
NEXT_TOKEN(pCxt->pSql, sToken);
|
||||
} else {
|
||||
CHECK_CODE(collectTableMetaKey(pCxt, &tbnameToken));
|
||||
CHECK_CODE(collectTableMetaKey(pCxt, false, tableNo, &tbnameToken));
|
||||
}
|
||||
|
||||
++tableNo;
|
||||
|
||||
if (TK_VALUES == sToken.type) {
|
||||
// pSql -> (field1_value, ...) [(field1_value2, ...) ...]
|
||||
CHECK_CODE(skipValuesClause(pCxt));
|
||||
|
@ -1750,7 +1803,9 @@ int32_t parseInsertSyntax(SParseContext* pContext, SQuery** pQuery, SParseMetaCa
|
|||
.pMetaCache = pMetaCache};
|
||||
int32_t code = skipInsertInto(&context.pSql, &context.msg);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
// int64_t parSyntaxStart = taosGetTimestampMs();
|
||||
code = parseInsertBodySyntax(&context);
|
||||
// printf("parse %d sql elapsed=%ldms\n", pContext->sqlLen, taosGetTimestampMs() - parSyntaxStart);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
|
||||
|
@ -2073,7 +2128,8 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu
|
|||
|
||||
SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
|
||||
if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, bind->num)) {
|
||||
return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than INT32_MAX");
|
||||
return buildInvalidOperationMsg(&pBuf,
|
||||
"too many rows in sql, total number of rows should be less than INT32_MAX");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -476,9 +476,11 @@ static int32_t buildDbReq(SHashObj* pDbsHash, SArray** pDbs) {
|
|||
|
||||
static int32_t buildTableReqFromDb(SHashObj* pDbsHash, SArray** pDbs) {
|
||||
if (NULL != pDbsHash) {
|
||||
*pDbs = taosArrayInit(taosHashGetSize(pDbsHash), sizeof(STablesReq));
|
||||
if (NULL == *pDbs) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
*pDbs = taosArrayInit(taosHashGetSize(pDbsHash), sizeof(STablesReq));
|
||||
if (NULL == *pDbs) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
SParseTablesMetaReq* p = taosHashIterate(pDbsHash, NULL);
|
||||
while (NULL != p) {
|
||||
|
@ -530,7 +532,62 @@ static int32_t buildUdfReq(SHashObj* pUdfHash, SArray** pUdf) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) {
|
||||
static int32_t buildCatalogReqForInsert(SParseContext* pCxt, const SParseMetaCache* pMetaCache,
|
||||
SCatalogReq* pCatalogReq) {
|
||||
int32_t ndbs = taosHashGetSize(pMetaCache->pInsertTables);
|
||||
pCatalogReq->pTableMeta = taosArrayInit(ndbs, sizeof(STablesReq));
|
||||
if (NULL == pCatalogReq->pTableMeta) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pCatalogReq->pTableHash = taosArrayInit(ndbs, sizeof(STablesReq));
|
||||
if (NULL == pCatalogReq->pTableHash) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pCatalogReq->pUser = taosArrayInit(ndbs, sizeof(SUserAuthInfo));
|
||||
if (NULL == pCatalogReq->pUser) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pCxt->pTableMetaPos = taosArrayInit(pMetaCache->sqlTableNum, sizeof(int32_t));
|
||||
pCxt->pTableVgroupPos = taosArrayInit(pMetaCache->sqlTableNum, sizeof(int32_t));
|
||||
|
||||
int32_t metaReqNo = 0;
|
||||
int32_t vgroupReqNo = 0;
|
||||
SInsertTablesMetaReq* p = taosHashIterate(pMetaCache->pInsertTables, NULL);
|
||||
while (NULL != p) {
|
||||
STablesReq req = {0};
|
||||
strcpy(req.dbFName, p->dbFName);
|
||||
TSWAP(req.pTables, p->pTableMetaReq);
|
||||
taosArrayPush(pCatalogReq->pTableMeta, &req);
|
||||
|
||||
req.pTables = NULL;
|
||||
TSWAP(req.pTables, p->pTableVgroupReq);
|
||||
taosArrayPush(pCatalogReq->pTableHash, &req);
|
||||
|
||||
int32_t ntables = taosArrayGetSize(p->pTableMetaPos);
|
||||
for (int32_t i = 0; i < ntables; ++i) {
|
||||
taosArrayInsert(pCxt->pTableMetaPos, *(int32_t*)taosArrayGet(p->pTableMetaPos, i), &metaReqNo);
|
||||
++metaReqNo;
|
||||
}
|
||||
|
||||
ntables = taosArrayGetSize(p->pTableVgroupPos);
|
||||
for (int32_t i = 0; i < ntables; ++i) {
|
||||
taosArrayInsert(pCxt->pTableVgroupPos, *(int32_t*)taosArrayGet(p->pTableVgroupPos, i), &vgroupReqNo);
|
||||
++vgroupReqNo;
|
||||
}
|
||||
|
||||
SUserAuthInfo auth = {0};
|
||||
strcpy(auth.user, pCxt->pUser);
|
||||
strcpy(auth.dbFName, p->dbFName);
|
||||
auth.type = AUTH_TYPE_WRITE;
|
||||
taosArrayPush(pCatalogReq->pUser, &auth);
|
||||
|
||||
p = taosHashIterate(pMetaCache->pInsertTables, p);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t buildCatalogReqForQuery(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) {
|
||||
int32_t code = buildTableReqFromDb(pMetaCache->pTableMeta, &pCatalogReq->pTableMeta);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = buildDbReq(pMetaCache->pDbVgroup, &pCatalogReq->pDbVgroup);
|
||||
|
@ -560,6 +617,13 @@ int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalog
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t buildCatalogReq(SParseContext* pCxt, const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) {
|
||||
if (NULL != pMetaCache->pInsertTables) {
|
||||
return buildCatalogReqForInsert(pCxt, pMetaCache, pCatalogReq);
|
||||
}
|
||||
return buildCatalogReqForQuery(pMetaCache, pCatalogReq);
|
||||
}
|
||||
|
||||
static int32_t putMetaDataToHash(const char* pKey, int32_t len, const SArray* pData, int32_t index, SHashObj** pHash) {
|
||||
if (NULL == *pHash) {
|
||||
*pHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
|
@ -647,7 +711,8 @@ static int32_t putUdfToCache(const SArray* pUdfReq, const SArray* pUdfData, SHas
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache) {
|
||||
int32_t putMetaDataToCacheForQuery(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData,
|
||||
SParseMetaCache* pMetaCache) {
|
||||
int32_t code = putDbTableDataToCache(pCatalogReq->pTableMeta, pMetaData->pTableMeta, &pMetaCache->pTableMeta);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = putDbDataToCache(pCatalogReq->pDbVgroup, pMetaData->pDbVgroup, &pMetaCache->pDbVgroup);
|
||||
|
@ -677,6 +742,27 @@ int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMet
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t putMetaDataToCacheForInsert(const SMetaData* pMetaData, SParseMetaCache* pMetaCache) {
|
||||
int32_t ndbs = taosArrayGetSize(pMetaData->pUser);
|
||||
for (int32_t i = 0; i < ndbs; ++i) {
|
||||
SMetaRes* pRes = taosArrayGet(pMetaData->pUser, i);
|
||||
if (!(*(bool*)pRes->pRes)) {
|
||||
return TSDB_CODE_PAR_PERMISSION_DENIED;
|
||||
}
|
||||
}
|
||||
pMetaCache->pTableMetaData = pMetaData->pTableMeta;
|
||||
pMetaCache->pTableVgroupData = pMetaData->pTableHash;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache,
|
||||
bool insertValuesStmt) {
|
||||
if (insertValuesStmt) {
|
||||
return putMetaDataToCacheForInsert(pMetaData, pMetaCache);
|
||||
}
|
||||
return putMetaDataToCacheForQuery(pCatalogReq, pMetaData, pMetaCache);
|
||||
}
|
||||
|
||||
static int32_t reserveTableReqInCacheImpl(const char* pTbFName, int32_t len, SHashObj** pTables) {
|
||||
if (NULL == *pTables) {
|
||||
*pTables = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
|
@ -977,6 +1063,82 @@ int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t reserveTableReqInCacheForInsert(const SName* pName, ECatalogReqType reqType, int32_t tableNo,
|
||||
SInsertTablesMetaReq* pReq) {
|
||||
switch (reqType) {
|
||||
case CATALOG_REQ_TYPE_META:
|
||||
taosArrayPush(pReq->pTableMetaReq, pName);
|
||||
taosArrayPush(pReq->pTableMetaPos, &tableNo);
|
||||
break;
|
||||
case CATALOG_REQ_TYPE_VGROUP:
|
||||
taosArrayPush(pReq->pTableVgroupReq, pName);
|
||||
taosArrayPush(pReq->pTableVgroupPos, &tableNo);
|
||||
break;
|
||||
case CATALOG_REQ_TYPE_BOTH:
|
||||
taosArrayPush(pReq->pTableMetaReq, pName);
|
||||
taosArrayPush(pReq->pTableMetaPos, &tableNo);
|
||||
taosArrayPush(pReq->pTableVgroupReq, pName);
|
||||
taosArrayPush(pReq->pTableVgroupPos, &tableNo);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t reserveTableReqInDbCacheForInsert(const SName* pName, ECatalogReqType reqType, int32_t tableNo,
|
||||
SHashObj* pDbs) {
|
||||
SInsertTablesMetaReq req = {.pTableMetaReq = taosArrayInit(4, sizeof(SName)),
|
||||
.pTableMetaPos = taosArrayInit(4, sizeof(int32_t)),
|
||||
.pTableVgroupReq = taosArrayInit(4, sizeof(SName)),
|
||||
.pTableVgroupPos = taosArrayInit(4, sizeof(int32_t))};
|
||||
tNameGetFullDbName(pName, req.dbFName);
|
||||
int32_t code = reserveTableReqInCacheForInsert(pName, reqType, tableNo, &req);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = taosHashPut(pDbs, pName->dbname, strlen(pName->dbname), &req, sizeof(SInsertTablesMetaReq));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t reserveTableMetaInCacheForInsert(const SName* pName, ECatalogReqType reqType, int32_t tableNo,
|
||||
SParseMetaCache* pMetaCache) {
|
||||
if (NULL == pMetaCache->pInsertTables) {
|
||||
pMetaCache->pInsertTables = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
if (NULL == pMetaCache->pInsertTables) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
pMetaCache->sqlTableNum = tableNo;
|
||||
SInsertTablesMetaReq* pReq = taosHashGet(pMetaCache->pInsertTables, pName->dbname, strlen(pName->dbname));
|
||||
if (NULL == pReq) {
|
||||
return reserveTableReqInDbCacheForInsert(pName, reqType, tableNo, pMetaCache->pInsertTables);
|
||||
}
|
||||
return reserveTableReqInCacheForInsert(pName, reqType, tableNo, pReq);
|
||||
}
|
||||
|
||||
int32_t getTableMetaFromCacheForInsert(SArray* pTableMetaPos, SParseMetaCache* pMetaCache, int32_t tableNo,
|
||||
STableMeta** pMeta) {
|
||||
int32_t reqIndex = *(int32_t*)taosArrayGet(pTableMetaPos, tableNo);
|
||||
SMetaRes* pRes = taosArrayGet(pMetaCache->pTableMetaData, reqIndex);
|
||||
if (TSDB_CODE_SUCCESS == pRes->code) {
|
||||
*pMeta = pRes->pRes;
|
||||
if (NULL == *pMeta) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
return pRes->code;
|
||||
}
|
||||
|
||||
int32_t getTableVgroupFromCacheForInsert(SArray* pTableVgroupPos, SParseMetaCache* pMetaCache, int32_t tableNo,
|
||||
SVgroupInfo* pVgroup) {
|
||||
int32_t reqIndex = *(int32_t*)taosArrayGet(pTableVgroupPos, tableNo);
|
||||
SMetaRes* pRes = taosArrayGet(pMetaCache->pTableVgroupData, reqIndex);
|
||||
if (TSDB_CODE_SUCCESS == pRes->code) {
|
||||
memcpy(pVgroup, pRes->pRes, sizeof(SVgroupInfo));
|
||||
}
|
||||
return pRes->code;
|
||||
}
|
||||
|
||||
void destoryParseTablesMetaReqHash(SHashObj* pHash) {
|
||||
SParseTablesMetaReq* p = taosHashIterate(pHash, NULL);
|
||||
while (NULL != p) {
|
||||
|
|
|
@ -185,7 +185,7 @@ int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq
|
|||
code = parseSqlSyntax(pCxt, pQuery, &metaCache);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = buildCatalogReq(&metaCache, pCatalogReq);
|
||||
code = buildCatalogReq(pCxt, &metaCache, pCatalogReq);
|
||||
}
|
||||
destoryParseMetaCache(&metaCache, true);
|
||||
terrno = code;
|
||||
|
@ -195,7 +195,7 @@ int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq
|
|||
int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCatalogReq,
|
||||
const struct SMetaData* pMetaData, SQuery* pQuery) {
|
||||
SParseMetaCache metaCache = {0};
|
||||
int32_t code = putMetaDataToCache(pCatalogReq, pMetaData, &metaCache);
|
||||
int32_t code = putMetaDataToCache(pCatalogReq, pMetaData, &metaCache, NULL == pQuery->pRoot);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (NULL == pQuery->pRoot) {
|
||||
code = parseInsertSql(pCxt, &pQuery, &metaCache);
|
||||
|
|
Loading…
Reference in New Issue