Merge pull request #26536 from taosdata/enh/TD-30856-3.0

enh: 'create table' parses csv file per tsMaxInsertBatchRows
This commit is contained in:
Hongze Cheng 2024-07-15 13:54:48 +08:00 committed by GitHub
commit 77e2582cc1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 5367 additions and 5175 deletions

View File

@ -132,16 +132,16 @@
#define TK_TABLE 114
#define TK_NK_LP 115
#define TK_NK_RP 116
#define TK_STABLE 117
#define TK_COLUMN 118
#define TK_MODIFY 119
#define TK_RENAME 120
#define TK_TAG 121
#define TK_SET 122
#define TK_NK_EQ 123
#define TK_USING 124
#define TK_TAGS 125
#define TK_FILE 126
#define TK_USING 117
#define TK_FILE 118
#define TK_STABLE 119
#define TK_COLUMN 120
#define TK_MODIFY 121
#define TK_RENAME 122
#define TK_TAG 123
#define TK_SET 124
#define TK_NK_EQ 125
#define TK_TAGS 126
#define TK_BOOL 127
#define TK_TINYINT 128
#define TK_SMALLINT 129

View File

@ -221,9 +221,6 @@ typedef struct SCreateSubTableFromFileClause {
bool ignoreExists;
SNodeList* pSpecificTags;
char filePath[PATH_MAX];
TdFilePtr fp;
SArray* aCreateTbData;
SArray* aTagIndexs;
} SCreateSubTableFromFileClause;
typedef struct SCreateMultiTablesStmt {

View File

@ -504,6 +504,10 @@ typedef void (*FFreeTableBlockHash)(SHashObj*);
typedef void (*FFreeVgourpBlockArray)(SArray*);
struct SStbRowsDataContext;
typedef void (*FFreeStbRowsDataContext)(struct SStbRowsDataContext*);
struct SCreateTbInfo;
struct SParseFileContext;
typedef void (*FDestroyParseFileContext)(struct SParseFileContext**);
typedef struct SVnodeModifyOpStmt {
ENodeType nodeType;
ENodeType sqlNodeType;
@ -535,6 +539,10 @@ typedef struct SVnodeModifyOpStmt {
bool stbSyntax;
struct SStbRowsDataContext* pStbRowsCxt;
FFreeStbRowsDataContext freeStbRowsCxtFunc;
struct SCreateTbInfo* pCreateTbInfo;
struct SParseFileContext* pParFileCxt;
FDestroyParseFileContext destroyParseFileCxt;
} SVnodeModifyOpStmt;
typedef struct SExplainOptions {

View File

@ -1061,6 +1061,13 @@ void nodesDestroyNode(SNode* pNode) {
pStmt->freeStbRowsCxtFunc(pStmt->pStbRowsCxt);
}
taosMemoryFreeClear(pStmt->pStbRowsCxt);
taosMemoryFreeClear(pStmt->pCreateTbInfo);
if (pStmt->destroyParseFileCxt) {
pStmt->destroyParseFileCxt(&pStmt->pParFileCxt);
}
taosCloseFile(&pStmt->fp);
break;
}
@ -1093,15 +1100,6 @@ void nodesDestroyNode(SNode* pNode) {
}
case QUERY_NODE_CREATE_SUBTABLE_FROM_FILE_CLAUSE: {
SCreateSubTableFromFileClause* pStmt = (SCreateSubTableFromFileClause*)pNode;
if (pStmt->aCreateTbData) {
taosArrayDestroy(pStmt->aCreateTbData);
}
if (pStmt->aTagIndexs) {
taosArrayDestroy(pStmt->aTagIndexs);
}
if (pStmt->fp) {
taosCloseFile(&pStmt->fp);
}
nodesDestroyList(pStmt->pSpecificTags);
break;
}

View File

@ -29,6 +29,7 @@ extern "C" {
#define QUERY_SMA_OPTIMIZE_ENABLE 1
int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatalogReq, const SMetaData* pMetaData);
int32_t continueCreateTbFromFile(SParseContext* pCxt, SQuery** pQuery);
int32_t parse(SParseContext* pParseCxt, SQuery** pQuery);
int32_t collectMetaKey(SParseContext* pParseCxt, SQuery* pQuery, SParseMetaCache* pMetaCache);
int32_t authenticate(SParseContext* pParseCxt, SQuery* pQuery, SParseMetaCache* pMetaCache);

View File

@ -353,6 +353,8 @@ end_opt(A) ::= END WITH TIMESTAMP NK_STRING(B).
cmd ::= CREATE TABLE not_exists_opt(A) full_table_name(B)
NK_LP column_def_list(C) NK_RP tags_def_opt(D) table_options(E). { pCxt->pRootNode = createCreateTableStmt(pCxt, A, B, C, D, E); }
cmd ::= CREATE TABLE multi_create_clause(A). { pCxt->pRootNode = createCreateMultiTableStmt(pCxt, A); }
cmd ::= CREATE TABLE not_exists_opt(B) USING full_table_name(C)
NK_LP tag_list_opt(D) NK_RP FILE NK_STRING(E). { pCxt->pRootNode = createCreateSubTableFromFileClause(pCxt, B, C, D, &E); }
cmd ::= CREATE STABLE not_exists_opt(A) full_table_name(B)
NK_LP column_def_list(C) NK_RP tags_def(D) table_options(E). { pCxt->pRootNode = createCreateTableStmt(pCxt, A, B, C, D, E); }
cmd ::= DROP TABLE multi_drop_clause(A). { pCxt->pRootNode = createDropTableStmt(pCxt, A); }
@ -385,15 +387,11 @@ alter_table_clause(A) ::=
%destructor multi_create_clause { nodesDestroyList($$); }
multi_create_clause(A) ::= create_subtable_clause(B). { A = createNodeList(pCxt, B); }
multi_create_clause(A) ::= multi_create_clause(B) create_subtable_clause(C). { A = addNodeToList(pCxt, B, C); }
multi_create_clause(A) ::= create_from_file_clause(B). { A = createNodeList(pCxt, B); }
create_subtable_clause(A) ::=
not_exists_opt(B) full_table_name(C) USING full_table_name(D)
specific_cols_opt(E) TAGS NK_LP tags_literal_list(F) NK_RP table_options(G). { A = createCreateSubTableClause(pCxt, B, C, D, E, F, G); }
create_from_file_clause(A) ::= not_exists_opt(B) USING full_table_name(C)
NK_LP tag_list_opt(D) NK_RP FILE NK_STRING(E). { A = createCreateSubTableFromFileClause(pCxt, B, C, D, &E); }
%type multi_drop_clause { SNodeList* }
%destructor multi_drop_clause { nodesDestroyList($$); }
multi_drop_clause(A) ::= drop_table_clause(B). { A = createNodeList(pCxt, B); }

View File

@ -310,6 +310,24 @@ static int32_t collectMetaKeyFromCreateMultiTable(SCollectMetaKeyCxt* pCxt, SCre
return code;
}
static int32_t collectMetaKeyFromCreateSubTableFromFile(SCollectMetaKeyCxt* pCxt,
SCreateSubTableFromFileClause* pClause) {
int32_t code = TSDB_CODE_SUCCESS;
SNode* pNode = NULL;
code = reserveDbCfgInCache(pCxt->pParseCxt->acctId, pClause->useDbName, pCxt->pMetaCache);
if (TSDB_CODE_SUCCESS == code) {
code =
reserveTableMetaInCache(pCxt->pParseCxt->acctId, pClause->useDbName, pClause->useTableName, pCxt->pMetaCache);
}
if (TSDB_CODE_SUCCESS == code) {
code = reserveUserAuthInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->pUser, pClause->useDbName, NULL,
AUTH_TYPE_WRITE, pCxt->pMetaCache);
}
return code;
}
static int32_t collectMetaKeyFromDropTable(SCollectMetaKeyCxt* pCxt, SDropTableStmt* pStmt) {
int32_t code = TSDB_CODE_SUCCESS;
SNode* pNode = NULL;
@ -866,6 +884,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
return collectMetaKeyFromCreateTable(pCxt, (SCreateTableStmt*)pStmt);
case QUERY_NODE_CREATE_MULTI_TABLES_STMT:
return collectMetaKeyFromCreateMultiTable(pCxt, (SCreateMultiTablesStmt*)pStmt);
case QUERY_NODE_CREATE_SUBTABLE_FROM_FILE_CLAUSE:
return collectMetaKeyFromCreateSubTableFromFile(pCxt, (SCreateSubTableFromFileClause*)pStmt);
case QUERY_NODE_DROP_TABLE_STMT:
return collectMetaKeyFromDropTable(pCxt, (SDropTableStmt*)pStmt);
case QUERY_NODE_DROP_SUPER_TABLE_STMT:

View File

@ -11889,7 +11889,8 @@ static int32_t createOperatorNode(EOperatorType opType, const char* pColName, SN
return TSDB_CODE_SUCCESS;
}
static int32_t createParOperatorNode(EOperatorType opType, const char* pLeftCol, const char* pRightCol, SNode** ppResOp) {
static int32_t createParOperatorNode(EOperatorType opType, const char* pLeftCol, const char* pRightCol,
SNode** ppResOp) {
SOperatorNode* pOper = (SOperatorNode*)nodesMakeNode(QUERY_NODE_OPERATOR);
CHECK_POINTER_OUT_OF_MEM(pOper);
@ -12694,7 +12695,7 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla
}
static int32_t buildTagIndexForBindTags(SMsgBuf* pMsgBuf, SCreateSubTableFromFileClause* pStmt,
STableMeta* pSuperTableMeta) {
STableMeta* pSuperTableMeta, SArray* aTagIndexs) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t numOfTags = getNumOfTags(pSuperTableMeta);
@ -12757,7 +12758,7 @@ static int32_t buildTagIndexForBindTags(SMsgBuf* pMsgBuf, SCreateSubTableFromFil
goto _OUT;
}
if (NULL == taosArrayPush(pStmt->aTagIndexs, &idx)) {
if (NULL == taosArrayPush(aTagIndexs, &idx)) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _OUT;
}
@ -12772,21 +12773,19 @@ _OUT:
return code;
}
typedef struct {
// refer
STableMeta* pSuperTableMeta;
SArray* pTagIndexs;
TdFilePtr fp;
// containers
typedef struct SParseFileContext {
SHashObj* pTbNameHash;
SArray* aTagNames;
bool tagNameFilled;
SArray* aTagVals;
STableMeta* pStbMeta;
SArray* aTagIndexs;
char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW];
SArray* aCreateTbData;
// per line
const char* pSql;
SArray* aTagVals;
STag* pTag;
SName ctbName;
SVgroupInfo vg;
@ -12810,48 +12809,48 @@ static int32_t fillVgroupInfo(SParseContext* pParseCxt, const SName* pName, SVgr
return code;
}
static int32_t parseOneStbRow(SMsgBuf* pMsgBuf, SParseFileContext* pParFileCtx) {
static int32_t parseOneStbRow(SMsgBuf* pMsgBuf, SParseFileContext* pParFileCxt) {
int32_t code = TSDB_CODE_SUCCESS;
int sz = taosArrayGetSize(pParFileCtx->pTagIndexs);
int32_t numOfTags = getNumOfTags(pParFileCtx->pSuperTableMeta);
uint8_t precision = getTableInfo(pParFileCtx->pSuperTableMeta).precision;
SSchema* pSchemas = getTableTagSchema(pParFileCtx->pSuperTableMeta);
int sz = taosArrayGetSize(pParFileCxt->aTagIndexs);
int32_t numOfTags = getNumOfTags(pParFileCxt->pStbMeta);
uint8_t precision = getTableInfo(pParFileCxt->pStbMeta).precision;
SSchema* pSchemas = getTableTagSchema(pParFileCxt->pStbMeta);
for (int i = 0; i < sz; i++) {
const char* pSql = pParFileCtx->pSql;
const char* pSql = pParFileCxt->pSql;
int32_t pos = 0;
SToken token = tStrGetToken(pSql, &pos, true, NULL);
pParFileCtx->pSql += pos;
pParFileCxt->pSql += pos;
if (TK_NK_RP == token.type) {
code = generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_COLUMNS_NUM);
break;
}
int16_t index = *(int16_t*)taosArrayGet(pParFileCtx->pTagIndexs, i);
int16_t index = *(int16_t*)taosArrayGet(pParFileCxt->aTagIndexs, i);
if (index < numOfTags) {
// parse tag
const SSchema* pTagSchema = &pSchemas[index];
code = checkAndTrimValue(&token, pParFileCtx->tmpTokenBuf, pMsgBuf, pTagSchema->type);
code = checkAndTrimValue(&token, pParFileCxt->tmpTokenBuf, pMsgBuf, pTagSchema->type);
if (TSDB_CODE_SUCCESS == code && TK_NK_VARIABLE == token.type) {
code = buildInvalidOperationMsg(pMsgBuf, "not expected row value");
}
if (TSDB_CODE_SUCCESS == code) {
SArray* aTagNames = pParFileCtx->tagNameFilled ? NULL : pParFileCtx->aTagNames;
code = parseTagValue(pMsgBuf, &pParFileCtx->pSql, precision, (SSchema*)pTagSchema, &token,
aTagNames, pParFileCtx->aTagVals, &pParFileCtx->pTag);
SArray* aTagNames = pParFileCxt->tagNameFilled ? NULL : pParFileCxt->aTagNames;
code = parseTagValue(pMsgBuf, &pParFileCxt->pSql, precision, (SSchema*)pTagSchema, &token, aTagNames,
pParFileCxt->aTagVals, &pParFileCxt->pTag);
}
} else {
// parse tbname
code = checkAndTrimValue(&token, pParFileCtx->tmpTokenBuf, pMsgBuf, TSDB_DATA_TYPE_BINARY);
code = checkAndTrimValue(&token, pParFileCxt->tmpTokenBuf, pMsgBuf, TSDB_DATA_TYPE_BINARY);
if (TK_NK_VARIABLE == token.type) {
code = buildInvalidOperationMsg(pMsgBuf, "not expected tbname");
}
if (TSDB_CODE_SUCCESS == code) {
bool bFoundTbName = false;
code = parseTbnameToken(pMsgBuf, pParFileCtx->ctbName.tname, &token, &bFoundTbName);
code = parseTbnameToken(pMsgBuf, pParFileCxt->ctbName.tname, &token, &bFoundTbName);
}
}
@ -12859,8 +12858,8 @@ static int32_t parseOneStbRow(SMsgBuf* pMsgBuf, SParseFileContext* pParFileCtx)
}
if (TSDB_CODE_SUCCESS == code) {
pParFileCtx->tagNameFilled = true;
code = tTagNew(pParFileCtx->aTagVals, 1, false, &pParFileCtx->pTag);
pParFileCxt->tagNameFilled = true;
code = tTagNew(pParFileCxt->aTagVals, 1, false, &pParFileCxt->pTag);
}
return code;
@ -12885,159 +12884,250 @@ static void clearCreateTbArrayFp(void *data) {
taosMemoryFreeClear(p->pTag);
}
static int32_t parseCsvFile(SMsgBuf* pMsgBuf, SParseContext* pParseCxt, SParseFileContext* pParseFileCtx,
SArray* aCreateTbData) {
static int32_t parseCsvFile(SMsgBuf* pMsgBuf, SParseContext* pParseCxt, SParseFileContext* pParFileCxt, TdFilePtr fp,
int32_t maxLineCount) {
int32_t code = TSDB_CODE_SUCCESS;
char* pLine = NULL;
int64_t readLen = 0;
while (TSDB_CODE_SUCCESS == code && (readLen = taosGetLineFile(pParseFileCtx->fp, &pLine)) != -1) {
int32_t lineCount = 0;
while (TSDB_CODE_SUCCESS == code && (readLen = taosGetLineFile(fp, &pLine)) != -1) {
if (('\r' == pLine[readLen - 1]) || ('\n' == pLine[readLen - 1])) {
pLine[--readLen] = '\0';
}
if (readLen == 0) continue;
if (pLine[0] == '#') continue;
if (pLine[0] == '#') continue; // ignore comment line begins with '#'
strtolower(pLine, pLine);
pParseFileCtx->pSql = pLine;
pParFileCxt->pSql = pLine;
code = parseOneStbRow(pMsgBuf, pParseFileCtx);
code = parseOneStbRow(pMsgBuf, pParFileCxt);
if (TSDB_CODE_SUCCESS == code) {
if (taosHashGet(pParseFileCtx->pTbNameHash, pParseFileCtx->ctbName.tname,
strlen(pParseFileCtx->ctbName.tname) + 1) != NULL) {
taosMemoryFreeClear(pParseFileCtx->pTag);
code = generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_TBNAME_DUPLICATED, pParseFileCtx->ctbName.tname);
if (taosHashGet(pParFileCxt->pTbNameHash, pParFileCxt->ctbName.tname, strlen(pParFileCxt->ctbName.tname) + 1) !=
NULL) {
taosMemoryFreeClear(pParFileCxt->pTag);
code = generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_TBNAME_DUPLICATED, pParFileCxt->ctbName.tname);
break;
}
code = taosHashPut(pParseFileCtx->pTbNameHash, pParseFileCtx->ctbName.tname,
strlen(pParseFileCtx->ctbName.tname) + 1, NULL, 0);
code = taosHashPut(pParFileCxt->pTbNameHash, pParFileCxt->ctbName.tname, strlen(pParFileCxt->ctbName.tname) + 1,
NULL, 0);
}
if (TSDB_CODE_SUCCESS == code) {
code = fillVgroupInfo(pParseCxt, &pParseFileCtx->ctbName, &pParseFileCtx->vg);
code = fillVgroupInfo(pParseCxt, &pParFileCxt->ctbName, &pParFileCxt->vg);
}
if (TSDB_CODE_SUCCESS == code) {
SCreateTableData data = {.ctbName = pParseFileCtx->ctbName,
.aTagNames = pParseFileCtx->aTagNames,
.pTag = pParseFileCtx->pTag,
.vg = pParseFileCtx->vg};
SCreateTableData data = {.ctbName = pParFileCxt->ctbName,
.aTagNames = pParFileCxt->aTagNames,
.pTag = pParFileCxt->pTag,
.vg = pParFileCxt->vg};
taosArrayPush(aCreateTbData, &data);
taosArrayPush(pParFileCxt->aCreateTbData, &data);
} else {
taosMemoryFreeClear(pParseFileCtx->pTag);
taosMemoryFreeClear(pParFileCxt->pTag);
}
pParseFileCtx->pTag = NULL;
taosArrayClearEx(pParseFileCtx->aTagVals, clearTagValArrayFp);
pParFileCxt->pTag = NULL;
taosArrayClearEx(pParFileCxt->aTagVals, clearTagValArrayFp);
lineCount++;
if (lineCount == maxLineCount) break;
}
if (TSDB_CODE_SUCCESS != code) {
taosArrayClearEx(aCreateTbData, clearCreateTbArrayFp);
taosArrayClearEx(pParFileCxt->aCreateTbData, clearCreateTbArrayFp);
}
taosMemoryFree(pLine);
return code;
}
static int32_t prepareReadFromFile(SCreateSubTableFromFileClause* pStmt) {
static void destructParseFileContext(SParseFileContext** ppParFileCxt) {
if (NULL == ppParFileCxt || NULL == *ppParFileCxt) {
return;
}
SParseFileContext* pParFileCxt = *ppParFileCxt;
taosHashCleanup(pParFileCxt->pTbNameHash);
taosArrayDestroy(pParFileCxt->aTagNames);
taosMemoryFreeClear(pParFileCxt->pStbMeta);
taosArrayDestroy(pParFileCxt->aTagIndexs);
taosArrayDestroy(pParFileCxt->aCreateTbData);
taosArrayDestroy(pParFileCxt->aTagVals);
taosMemoryFree(pParFileCxt);
*ppParFileCxt = NULL;
return;
}
static int32_t constructParseFileContext(SCreateSubTableFromFileClause* pStmt, STableMeta* pSuperTableMeta,
int32_t acctId, SParseFileContext** ppParFileCxt) {
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == pStmt->fp) {
pStmt->fp = taosOpenFile(pStmt->filePath, TD_FILE_READ | TD_FILE_STREAM);
if (NULL == pStmt->fp) {
SParseFileContext* pParFileCxt = taosMemoryCalloc(1, sizeof(SParseFileContext));
pParFileCxt->pStbMeta = pSuperTableMeta;
pParFileCxt->tagNameFilled = false;
pParFileCxt->pTag = NULL;
pParFileCxt->ctbName.type = TSDB_TABLE_NAME_T;
pParFileCxt->ctbName.acctId = acctId;
strcpy(pParFileCxt->ctbName.dbname, pStmt->useDbName);
if (NULL == pParFileCxt->pTbNameHash) {
pParFileCxt->pTbNameHash =
taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
if (!pParFileCxt->pTbNameHash) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _ERR;
}
}
if (NULL == pParFileCxt->aTagNames) {
pParFileCxt->aTagNames = taosArrayInit(8, TSDB_COL_NAME_LEN);
if (NULL == pParFileCxt->aTagNames) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _ERR;
}
}
if (NULL == pParFileCxt->aCreateTbData) {
pParFileCxt->aCreateTbData = taosArrayInit(16, sizeof(SCreateTableData));
if (NULL == pParFileCxt->aCreateTbData) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _ERR;
}
}
if (NULL == pParFileCxt->aTagIndexs) {
pParFileCxt->aTagIndexs = taosArrayInit(pStmt->pSpecificTags->length, sizeof(int16_t));
if (!pParFileCxt->aTagIndexs) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _ERR;
}
}
if (NULL == pParFileCxt->aTagVals) {
pParFileCxt->aTagVals = taosArrayInit(8, sizeof(STagVal));
if (!pParFileCxt->aTagVals) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _ERR;
}
}
*ppParFileCxt = pParFileCxt;
return code;
_ERR:
destructParseFileContext(&pParFileCxt);
return code;
}
typedef struct SCreateTbInfo {
bool ignoreExists;
char useDbName[TSDB_DB_NAME_LEN];
char useTableName[TSDB_TABLE_NAME_LEN];
} SCreateTbInfo;
static int32_t prepareReadCsvFile(STranslateContext* pCxt, SCreateSubTableFromFileClause* pCreateStmt,
SVnodeModifyOpStmt* pModifyStmt) {
int32_t code = 0;
TdFilePtr fp = NULL;
SCreateTbInfo* pCreateInfo = NULL;
SParseFileContext* pParFileCxt = NULL;
if (NULL == pModifyStmt->fp) {
fp = taosOpenFile(pCreateStmt->filePath, TD_FILE_READ | TD_FILE_STREAM);
if (NULL == fp) {
code = TAOS_SYSTEM_ERROR(errno);
goto _ERR;
}
}
if (NULL == pStmt->aCreateTbData) {
pStmt->aCreateTbData = taosArrayInit(16, sizeof(SCreateTableData));
if (NULL == pStmt->aCreateTbData) {
{
pCreateInfo = taosMemoryCalloc(1, sizeof(SCreateTbInfo));
if (NULL == pCreateInfo) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _ERR;
}
}
if (NULL == pStmt->aTagIndexs) {
pStmt->aTagIndexs = taosArrayInit(pStmt->pSpecificTags->length, sizeof(int16_t));
if (!pStmt->aTagIndexs) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _ERR;
pCreateInfo->ignoreExists = pCreateStmt->ignoreExists;
strncpy(pCreateInfo->useDbName, pCreateStmt->useDbName, TSDB_DB_NAME_LEN);
strncpy(pCreateInfo->useTableName, pCreateStmt->useTableName, TSDB_TABLE_NAME_LEN);
}
{
STableMeta* pSuperTableMeta = NULL;
code = getTableMeta(pCxt, pCreateStmt->useDbName, pCreateStmt->useTableName, &pSuperTableMeta);
if (TSDB_CODE_SUCCESS != code) goto _ERR;
code = constructParseFileContext(pCreateStmt, pSuperTableMeta, pCxt->pParseCxt->acctId, &pParFileCxt);
if (TSDB_CODE_SUCCESS != code) goto _ERR;
code = buildTagIndexForBindTags(&pCxt->msgBuf, pCreateStmt, pParFileCxt->pStbMeta, pParFileCxt->aTagIndexs);
if (TSDB_CODE_SUCCESS != code) goto _ERR;
}
pModifyStmt->fp = fp;
pModifyStmt->fileProcessing = false;
pModifyStmt->pCreateTbInfo = pCreateInfo;
pModifyStmt->pParFileCxt = pParFileCxt;
return code;
_ERR:
taosCloseFile(&pStmt->fp);
taosArrayDestroy(pStmt->aCreateTbData);
taosArrayDestroy(pStmt->aTagIndexs);
taosCloseFile(&fp);
taosMemoryFreeClear(pCreateInfo);
destructParseFileContext(&pParFileCxt);
return code;
}
static int32_t rewriteCreateSubTableFromFile(STranslateContext* pCxt, SCreateSubTableFromFileClause* pStmt,
SHashObj* pVgroupHashmap) {
static int32_t resetParseFileContext(SParseFileContext* pParFileCxt) {
taosArrayClear(pParFileCxt->aCreateTbData);
taosArrayClearEx(pParFileCxt->aTagVals, clearTagValArrayFp);
return TSDB_CODE_SUCCESS;
}
static int32_t createSubTableFromFile(SMsgBuf* pMsgBuf, SParseContext* pParseCxt, SVnodeModifyOpStmt* pModifyStmt) {
int32_t code = 0;
STableMeta* pSuperTableMeta = NULL;
if (TSDB_CODE_SUCCESS == code) {
code = getTableMeta(pCxt, pStmt->useDbName, pStmt->useTableName, &pSuperTableMeta);
}
SCreateTbInfo* pCreateInfo = pModifyStmt->pCreateTbInfo;
SParseFileContext* pParFileCxt = pModifyStmt->pParFileCxt;
if (TSDB_CODE_SUCCESS == code) {
code = prepareReadFromFile(pStmt);
code = parseCsvFile(pMsgBuf, pParseCxt, pParFileCxt, pModifyStmt->fp, tsMaxInsertBatchRows);
}
STableMeta* pSuperTableMeta = pParFileCxt->pStbMeta;
if (TSDB_CODE_SUCCESS == code) {
code = buildTagIndexForBindTags(&pCxt->msgBuf, pStmt, pSuperTableMeta);
}
SParseFileContext parseFileCtx = {
.pSuperTableMeta = pSuperTableMeta, .fp = pStmt->fp, .pTagIndexs = pStmt->aTagIndexs};
parseFileCtx.pTbNameHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
parseFileCtx.aTagNames = taosArrayInit(8, TSDB_COL_NAME_LEN);
parseFileCtx.tagNameFilled = false;
parseFileCtx.aTagVals = taosArrayInit(8, sizeof(STagVal));
parseFileCtx.pTag = NULL;
parseFileCtx.ctbName.type = TSDB_TABLE_NAME_T;
parseFileCtx.ctbName.acctId = pCxt->pParseCxt->acctId;
strcpy(parseFileCtx.ctbName.dbname, pStmt->useDbName);
if (NULL == parseFileCtx.aTagNames || NULL == parseFileCtx.aTagVals || NULL == parseFileCtx.pTbNameHash) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _OUT;
}
if (TSDB_CODE_SUCCESS == code) {
code = parseCsvFile(&pCxt->msgBuf, pCxt->pParseCxt, &parseFileCtx, pStmt->aCreateTbData);
}
if (TSDB_CODE_SUCCESS == code) {
int sz = taosArrayGetSize(pStmt->aCreateTbData);
int sz = taosArrayGetSize(pParFileCxt->aCreateTbData);
for (int i = 0; i < sz; i++) {
SCreateTableData* pData = taosArrayGet(pStmt->aCreateTbData, i);
SCreateTableData* pData = taosArrayGet(pParFileCxt->aCreateTbData, i);
code = collectUseTable(&pData->ctbName, pCxt->pTargetTables);
if (TSDB_CODE_SUCCESS != code) {
taosMemoryFree(pData->pTag);
// code = collectUseTable(&pData->ctbName, pCxt->pTargetTables);
// if (TSDB_CODE_SUCCESS != code) {
// taosMemoryFree(pData->pTag);
// }
code = addCreateTbReqIntoVgroup(pModifyStmt->pVgroupsHashObj, pCreateInfo->useDbName, pSuperTableMeta->uid,
pCreateInfo->useTableName, pData->ctbName.tname, pData->aTagNames,
pSuperTableMeta->tableInfo.numOfTags, pData->pTag, TSDB_DEFAULT_TABLE_TTL, NULL,
pCreateInfo->ignoreExists, &pData->vg);
}
code = addCreateTbReqIntoVgroup(pVgroupHashmap, pStmt->useDbName, pSuperTableMeta->uid, pStmt->useTableName,
pData->ctbName.tname, pData->aTagNames, pSuperTableMeta->tableInfo.numOfTags,
pData->pTag, TSDB_DEFAULT_TABLE_TTL, NULL, pStmt->ignoreExists, &pData->vg);
if (TSDB_CODE_SUCCESS == code) {
pModifyStmt->fileProcessing = (sz == tsMaxInsertBatchRows);
}
}
_OUT:
taosMemoryFreeClear(pSuperTableMeta);
taosHashCleanup(parseFileCtx.pTbNameHash);
taosArrayDestroy(parseFileCtx.aTagNames);
taosArrayDestroy(parseFileCtx.aTagVals);
(void)resetParseFileContext(pModifyStmt->pParFileCxt);
return code;
}
@ -13074,13 +13164,8 @@ static int32_t rewriteCreateMultiTable(STranslateContext* pCxt, SQuery* pQuery)
int32_t code = TSDB_CODE_SUCCESS;
SNode* pNode;
FOREACH(pNode, pStmt->pSubTables) {
if (pNode->type == QUERY_NODE_CREATE_SUBTABLE_CLAUSE) {
SCreateSubTableClause* pClause = (SCreateSubTableClause*)pNode;
code = rewriteCreateSubTable(pCxt, pClause, pVgroupHashmap);
} else {
SCreateSubTableFromFileClause* pClause = (SCreateSubTableFromFileClause*)pNode;
code = rewriteCreateSubTableFromFile(pCxt, pClause, pVgroupHashmap);
}
if (TSDB_CODE_SUCCESS != code) {
taosHashCleanup(pVgroupHashmap);
return code;
@ -13096,6 +13181,75 @@ static int32_t rewriteCreateMultiTable(STranslateContext* pCxt, SQuery* pQuery)
return rewriteToVnodeModifyOpStmt(pQuery, pBufArray);
}
static int32_t rewriteCreateTableFromFile(STranslateContext* pCxt, SQuery* pQuery) {
SVnodeModifyOpStmt* pModifyStmt = (SVnodeModifyOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIFY_STMT);
if (pModifyStmt == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pModifyStmt->sqlNodeType = nodeType(pQuery->pRoot);
pModifyStmt->fileProcessing = false;
pModifyStmt->destroyParseFileCxt = destructParseFileContext;
pModifyStmt->pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
if (NULL == pModifyStmt->pVgroupsHashObj) {
return TSDB_CODE_OUT_OF_MEMORY;
}
taosHashSetFreeFp(pModifyStmt->pVgroupsHashObj, destroyCreateTbReqBatch);
SCreateSubTableFromFileClause* pCreateStmt = (SCreateSubTableFromFileClause*)pQuery->pRoot;
int32_t code = prepareReadCsvFile(pCxt, pCreateStmt, pModifyStmt);
if (TSDB_CODE_SUCCESS != code) {
taosHashCleanup(pModifyStmt->pVgroupsHashObj);
return code;
}
code = createSubTableFromFile(&pCxt->msgBuf, pCxt->pParseCxt, pModifyStmt);
if (TSDB_CODE_SUCCESS != code) {
taosHashCleanup(pModifyStmt->pVgroupsHashObj);
return code;
}
SArray* pBufArray = serializeVgroupsCreateTableBatch(pModifyStmt->pVgroupsHashObj);
taosHashClear(pModifyStmt->pVgroupsHashObj);
if (NULL == pBufArray) {
taosHashCleanup(pModifyStmt->pVgroupsHashObj);
return TSDB_CODE_OUT_OF_MEMORY;
}
pModifyStmt->pDataBlocks = pBufArray;
nodesDestroyNode(pQuery->pRoot);
pQuery->pRoot = (SNode*)pModifyStmt;
return TSDB_CODE_SUCCESS;
}
int32_t continueCreateTbFromFile(SParseContext* pParseCxt, SQuery** pQuery) {
SVnodeModifyOpStmt* pModifyStmt = (SVnodeModifyOpStmt*)(*pQuery)->pRoot;
SMsgBuf tmpBuf = {0};
tmpBuf.buf = taosMemoryMalloc(1024);
int32_t code = createSubTableFromFile(&tmpBuf, pParseCxt, pModifyStmt);
if (TSDB_CODE_SUCCESS != code) goto _OUT;
SArray* pBufArray = serializeVgroupsCreateTableBatch(pModifyStmt->pVgroupsHashObj);
taosHashClear(pModifyStmt->pVgroupsHashObj);
if (NULL == pBufArray) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _OUT;
}
pModifyStmt->pDataBlocks = pBufArray;
(*pQuery)->execStage = QUERY_EXEC_STAGE_SCHEDULE;
if (!pModifyStmt->fileProcessing) {
(*pQuery)->execMode = QUERY_EXEC_MODE_EMPTY_RESULT;
}
code = TSDB_CODE_SUCCESS;
_OUT:
taosMemoryFreeClear(tmpBuf.buf);
return code;
}
typedef struct SVgroupDropTableBatch {
SVDropTbBatchReq req;
SVgroupInfo info;
@ -13798,7 +13952,8 @@ static int32_t createParWhenThenNode(SNode* pWhen, SNode* pThen, SNode** ppResWh
return TSDB_CODE_SUCCESS;
}
static int32_t createParCaseWhenNode(SNode* pCase, SNodeList* pWhenThenList, SNode* pElse, const char* pAias, SNode** ppResCaseWhen) {
static int32_t createParCaseWhenNode(SNode* pCase, SNodeList* pWhenThenList, SNode* pElse, const char* pAias,
SNode** ppResCaseWhen) {
SCaseWhenNode* pCaseWhen = (SCaseWhenNode*)nodesMakeNode(QUERY_NODE_CASE_WHEN);
CHECK_POINTER_OUT_OF_MEM(pCaseWhen);
@ -13813,7 +13968,8 @@ static int32_t createParCaseWhenNode(SNode* pCase, SNodeList* pWhenThenList, SNo
return TSDB_CODE_SUCCESS;
}
static int32_t createParFunctionNode(const char* pFunName, const char* pAias, SNodeList* pParameterList, SNode** ppResFunc) {
static int32_t createParFunctionNode(const char* pFunName, const char* pAias, SNodeList* pParameterList,
SNode** ppResFunc) {
SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
CHECK_POINTER_OUT_OF_MEM(pFunc);
strcpy(pFunc->functionName, pFunName);
@ -13885,14 +14041,14 @@ static int32_t rewriteShowAliveStmt(STranslateContext* pCxt, SQuery* pQuery) {
SNode* pElse = nodesMakeValueNodeFromInt32(0);
CHECK_POINTER_OUT_OF_MEM(pElse);
// case when (v1_status = "leader" or v2_status = "lead er" or v3_status = "leader" or v4_status = "leader") then 1 else 0 end
// case when (v1_status = "leader" or v2_status = "lead er" or v3_status = "leader" or v4_status = "leader") then 1
// else 0 end
SNode* pCaseWhen = NULL;
CHECK_RES_OUT_OF_MEM(createParCaseWhenNode(NULL, pWhenThenlist, pElse, NULL, &pCaseWhen));
SNodeList* pParaList = NULL;
CHECK_RES_OUT_OF_MEM(createParListNode(pCaseWhen, &pParaList));
// sum( case when ... end) as leader_col
SNode* pSumFun = NULL;
const char* pSumColAlias = "leader_col";
@ -13914,11 +14070,13 @@ static int32_t rewriteShowAliveStmt(STranslateContext* pCxt, SQuery* pQuery) {
SSelectStmt* pSubSelect = NULL;
// select sum( case when .... end) as leader_col, count(*) as count_col from information_schema.ins_vgroups
CHECK_RES_OUT_OF_MEM(createSimpleSelectStmtFromProjList(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_VGROUPS, pProjList, &pSubSelect));
CHECK_RES_OUT_OF_MEM(
createSimpleSelectStmtFromProjList(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_VGROUPS, pProjList, &pSubSelect));
if (pDbName && pDbName[0] != 0) {
// for show db.alive
// select sum( case when .... end) as leader_col, count(*) as count_col from information_schema.ins_vgroups where db_name = "..."
// select sum( case when .... end) as leader_col, count(*) as count_col from information_schema.ins_vgroups where
// db_name = "..."
SNode* pDbCond = NULL;
pValNode = nodesMakeValueNodeFromString(pDbName);
CHECK_RES_OUT_OF_MEM(createOperatorNode(OP_TYPE_EQUAL, "db_name", (SNode*)pValNode, &pDbCond));
@ -13928,8 +14086,6 @@ static int32_t rewriteShowAliveStmt(STranslateContext* pCxt, SQuery* pQuery) {
pSubSelect->pWhere = pDbCond;
}
pCond1 = NULL;
CHECK_RES_OUT_OF_MEM(createParOperatorNode(OP_TYPE_EQUAL, pSumColAlias, pCountColAlias, &pCond1));
pCond2 = NULL;
@ -13961,7 +14117,8 @@ static int32_t rewriteShowAliveStmt(STranslateContext* pCxt, SQuery* pQuery) {
CHECK_RES_OUT_OF_MEM(createParWhenThenNode(pTemp2, pThen, &pWhenThen));
CHECK_RES_OUT_OF_MEM(nodesListStrictAppend(pWhenThenlist, pWhenThen));
// case when leader_col = count_col and count_col > 0 then 1 when leader_col < count_col and count_col > 0 then 2 else 0 end as status
// case when leader_col = count_col and count_col > 0 then 1 when leader_col < count_col and count_col > 0 then 2 else
// 0 end as status
pCaseWhen = NULL;
pElse = nodesMakeValueNodeFromInt32(0);
CHECK_POINTER_OUT_OF_MEM(pElse);
@ -13973,7 +14130,6 @@ static int32_t rewriteShowAliveStmt(STranslateContext* pCxt, SQuery* pQuery) {
SNode* pTempTblNode = NULL;
CHECK_RES_OUT_OF_MEM(createParTempTableNode(pSubSelect, &pTempTblNode));
SSelectStmt* pStmt = (SSelectStmt*)nodesMakeNode(QUERY_NODE_SELECT_STMT);
CHECK_POINTER_OUT_OF_MEM(pStmt);
pStmt->pProjectionList = pProjList;
@ -14047,6 +14203,9 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
case QUERY_NODE_CREATE_MULTI_TABLES_STMT:
code = rewriteCreateMultiTable(pCxt, pQuery);
break;
case QUERY_NODE_CREATE_SUBTABLE_FROM_FILE_CLAUSE:
code = rewriteCreateTableFromFile(pCxt, pQuery);
break;
case QUERY_NODE_DROP_TABLE_STMT:
code = rewriteDropTable(pCxt, pQuery);
break;

View File

@ -48,6 +48,33 @@ bool qIsInsertValuesSql(const char* pStr, size_t length) {
return false;
}
bool qIsCreateTbFromFileSql(const char* pStr, size_t length) {
if (NULL == pStr) {
return false;
}
const char* pSql = pStr;
int32_t index = 0;
SToken t = tStrGetToken((char*)pStr, &index, false, NULL);
if (TK_CREATE != t.type) {
return false;
}
do {
pStr += index;
index = 0;
t = tStrGetToken((char*)pStr, &index, false, NULL);
if (TK_FILE == t.type) {
return true;
}
if (0 == t.type || 0 == t.n) {
break;
}
} while (pStr - pSql < length);
return false;
}
bool qParseDbName(const char* pStr, size_t length, char** pDbName) {
(void) length;
int32_t index = 0;
@ -239,11 +266,19 @@ static int32_t parseQuerySyntax(SParseContext* pCxt, SQuery** pQuery, struct SCa
return code;
}
static int32_t parseCreateTbFromFileSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq) {
if (NULL == *pQuery) return parseQuerySyntax(pCxt, pQuery, pCatalogReq);
return continueCreateTbFromFile(pCxt, pQuery);
}
int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq) {
int32_t code = nodesAcquireAllocator(pCxt->allocatorId);
if (TSDB_CODE_SUCCESS == code) {
if (qIsInsertValuesSql(pCxt->pSql, pCxt->sqlLen)) {
code = parseInsertSql(pCxt, pQuery, pCatalogReq, NULL);
} else if (qIsCreateTbFromFileSql(pCxt->pSql, pCxt->sqlLen)) {
code = parseCreateTbFromFileSyntax(pCxt, pQuery, pCatalogReq);
} else {
code = parseQuerySyntax(pCxt, pQuery, pCatalogReq);
}

File diff suppressed because it is too large Load Diff

View File

@ -1708,6 +1708,7 @@ static int32_t getMsgType(ENodeType sqlType) {
switch (sqlType) {
case QUERY_NODE_CREATE_TABLE_STMT:
case QUERY_NODE_CREATE_MULTI_TABLES_STMT:
case QUERY_NODE_CREATE_SUBTABLE_FROM_FILE_CLAUSE:
return TDMT_VND_CREATE_TABLE;
case QUERY_NODE_DROP_TABLE_STMT:
return TDMT_VND_DROP_TABLE;