[td-11818] refactor.
This commit is contained in:
parent
09fe692b85
commit
7128b3cdab
|
@ -154,14 +154,14 @@ typedef struct SVgDataBlocks {
|
||||||
char *pData; // SMsgDesc + SSubmitMsg + SSubmitBlk + ...
|
char *pData; // SMsgDesc + SSubmitMsg + SSubmitBlk + ...
|
||||||
} SVgDataBlocks;
|
} SVgDataBlocks;
|
||||||
|
|
||||||
typedef struct SInsertStmtInfo {
|
typedef struct SVnodeModifOpStmtInfo {
|
||||||
int16_t nodeType;
|
int16_t nodeType;
|
||||||
SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>.
|
SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>.
|
||||||
int8_t schemaAttache; // denote if submit block is built with table schema or not
|
int8_t schemaAttache; // denote if submit block is built with table schema or not
|
||||||
uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
|
uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
|
||||||
uint32_t insertType; // insert data from [file|sql statement| bound statement]
|
uint32_t insertType; // insert data from [file|sql statement| bound statement]
|
||||||
const char* sql; // current sql statement position
|
const char* sql; // current sql statement position
|
||||||
} SInsertStmtInfo;
|
} SVnodeModifOpStmtInfo;
|
||||||
|
|
||||||
typedef struct SDclStmtInfo {
|
typedef struct SDclStmtInfo {
|
||||||
int16_t nodeType;
|
int16_t nodeType;
|
||||||
|
|
|
@ -22,7 +22,7 @@ extern "C" {
|
||||||
|
|
||||||
#include "parser.h"
|
#include "parser.h"
|
||||||
|
|
||||||
int32_t parseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo);
|
int32_t parseInsertSql(SParseContext* pContext, SVnodeModifOpStmtInfo** pInfo);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,7 +70,7 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pSqlInfo, SQ
|
||||||
*/
|
*/
|
||||||
SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen);
|
SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen);
|
||||||
|
|
||||||
SInsertStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen);
|
SVnodeModifOpStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Evaluate the numeric and timestamp arithmetic expression in the WHERE clause.
|
* Evaluate the numeric and timestamp arithmetic expression in the WHERE clause.
|
||||||
|
|
|
@ -548,11 +548,12 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
|
||||||
taosArrayPush(pBufArray, &pVgData);
|
taosArrayPush(pBufArray, &pVgData);
|
||||||
} while (true);
|
} while (true);
|
||||||
|
|
||||||
SInsertStmtInfo* pStmtInfo = calloc(1, sizeof(SInsertStmtInfo));
|
SVnodeModifOpStmtInfo* pStmtInfo = calloc(1, sizeof(SVnodeModifOpStmtInfo));
|
||||||
pStmtInfo->nodeType = TSDB_SQL_CREATE_TABLE;
|
pStmtInfo->nodeType = TSDB_SQL_CREATE_TABLE;
|
||||||
pStmtInfo->pDataBlocks = pBufArray;
|
pStmtInfo->pDataBlocks = pBufArray;
|
||||||
*pOutput = pStmtInfo;
|
|
||||||
*len = sizeof(SInsertStmtInfo);
|
*pOutput = (char*) pStmtInfo;
|
||||||
|
*len = sizeof(SVnodeModifOpStmtInfo);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -823,14 +824,14 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, c
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SInsertStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen) {
|
SVnodeModifOpStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen) {
|
||||||
SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo;
|
SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo;
|
||||||
assert(pCreateTable->type == TSQL_CREATE_CTABLE);
|
assert(pCreateTable->type == TSQL_CREATE_CTABLE);
|
||||||
|
|
||||||
SMsgBuf m = {.buf = msgBuf, .len = msgBufLen};
|
SMsgBuf m = {.buf = msgBuf, .len = msgBufLen};
|
||||||
SMsgBuf* pMsgBuf = &m;
|
SMsgBuf* pMsgBuf = &m;
|
||||||
|
|
||||||
SInsertStmtInfo* pInsertStmt = NULL;
|
SVnodeModifOpStmtInfo* pInsertStmt = NULL;
|
||||||
|
|
||||||
int32_t msgLen = 0;
|
int32_t msgLen = 0;
|
||||||
int32_t code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, (char**) &pInsertStmt, &msgLen);
|
int32_t code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, (char**) &pInsertStmt, &msgLen);
|
||||||
|
|
|
@ -61,7 +61,7 @@ typedef struct SInsertParseContext {
|
||||||
SArray* pTableDataBlocks; // global
|
SArray* pTableDataBlocks; // global
|
||||||
SArray* pVgDataBlocks; // global
|
SArray* pVgDataBlocks; // global
|
||||||
int32_t totalNum;
|
int32_t totalNum;
|
||||||
SInsertStmtInfo* pOutput;
|
SVnodeModifOpStmtInfo* pOutput;
|
||||||
} SInsertParseContext;
|
} SInsertParseContext;
|
||||||
|
|
||||||
static int32_t skipInsertInto(SInsertParseContext* pCxt) {
|
static int32_t skipInsertInto(SInsertParseContext* pCxt) {
|
||||||
|
@ -611,7 +611,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
||||||
// [(field1_name, ...)]
|
// [(field1_name, ...)]
|
||||||
// VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
|
// VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
|
||||||
// [...];
|
// [...];
|
||||||
int32_t parseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo) {
|
int32_t parseInsertSql(SParseContext* pContext, SVnodeModifOpStmtInfo** pInfo) {
|
||||||
SInsertParseContext context = {
|
SInsertParseContext context = {
|
||||||
.pComCxt = pContext,
|
.pComCxt = pContext,
|
||||||
.pSql = (char*) pContext->pSql,
|
.pSql = (char*) pContext->pSql,
|
||||||
|
@ -620,7 +620,7 @@ int32_t parseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo) {
|
||||||
.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false),
|
.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false),
|
||||||
.pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false),
|
.pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false),
|
||||||
.totalNum = 0,
|
.totalNum = 0,
|
||||||
.pOutput = calloc(1, sizeof(SInsertStmtInfo))
|
.pOutput = calloc(1, sizeof(SVnodeModifOpStmtInfo))
|
||||||
};
|
};
|
||||||
|
|
||||||
if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pOutput) {
|
if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pOutput) {
|
||||||
|
|
|
@ -53,7 +53,7 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (toVnode) {
|
if (toVnode) {
|
||||||
SInsertStmtInfo *pInsertInfo = qParserValidateCreateTbSqlNode(&info, &pCxt->ctx, pCxt->pMsg, pCxt->msgLen);
|
SVnodeModifOpStmtInfo *pInsertInfo = qParserValidateCreateTbSqlNode(&info, &pCxt->ctx, pCxt->pMsg, pCxt->msgLen);
|
||||||
if (pInsertInfo == NULL) {
|
if (pInsertInfo == NULL) {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
@ -87,7 +87,7 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
|
||||||
|
|
||||||
int32_t qParseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
|
int32_t qParseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
|
||||||
if (isInsertSql(pCxt->pSql, pCxt->sqlLen)) {
|
if (isInsertSql(pCxt->pSql, pCxt->sqlLen)) {
|
||||||
return parseInsertSql(pCxt, (SInsertStmtInfo**)pQuery);
|
return parseInsertSql(pCxt, (SVnodeModifOpStmtInfo**)pQuery);
|
||||||
} else {
|
} else {
|
||||||
return parseQuerySql(pCxt, pQuery);
|
return parseQuerySql(pCxt, pQuery);
|
||||||
}
|
}
|
||||||
|
|
|
@ -60,7 +60,7 @@ protected:
|
||||||
return code_;
|
return code_;
|
||||||
}
|
}
|
||||||
|
|
||||||
SInsertStmtInfo* reslut() {
|
SVnodeModifOpStmtInfo* reslut() {
|
||||||
return res_;
|
return res_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -128,7 +128,7 @@ private:
|
||||||
char sqlBuf_[max_sql_len];
|
char sqlBuf_[max_sql_len];
|
||||||
SParseContext cxt_;
|
SParseContext cxt_;
|
||||||
int32_t code_;
|
int32_t code_;
|
||||||
SInsertStmtInfo* res_;
|
SVnodeModifOpStmtInfo* res_;
|
||||||
};
|
};
|
||||||
|
|
||||||
// INSERT INTO tb_name VALUES (field1_value, ...)
|
// INSERT INTO tb_name VALUES (field1_value, ...)
|
||||||
|
|
|
@ -38,7 +38,7 @@ int32_t optimizeQueryPlan(struct SQueryPlanNode* pQueryNode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t createModificationOpPlan(const SQueryNode* pNode, SQueryPlanNode** pQueryPlan) {
|
static int32_t createModificationOpPlan(const SQueryNode* pNode, SQueryPlanNode** pQueryPlan) {
|
||||||
SInsertStmtInfo* pInsert = (SInsertStmtInfo*)pNode;
|
SVnodeModifOpStmtInfo* pInsert = (SVnodeModifOpStmtInfo*)pNode;
|
||||||
|
|
||||||
*pQueryPlan = calloc(1, sizeof(SQueryPlanNode));
|
*pQueryPlan = calloc(1, sizeof(SQueryPlanNode));
|
||||||
SArray* blocks = taosArrayInit(taosArrayGetSize(pInsert->pDataBlocks), POINTER_BYTES);
|
SArray* blocks = taosArrayInit(taosArrayGetSize(pInsert->pDataBlocks), POINTER_BYTES);
|
||||||
|
|
Loading…
Reference in New Issue