feat: insert from csv
This commit is contained in:
parent
99563fc136
commit
6141bb03da
|
@ -848,14 +848,10 @@ set_quantifier_opt(A) ::= ALL.
|
||||||
|
|
||||||
%type select_list { SNodeList* }
|
%type select_list { SNodeList* }
|
||||||
%destructor select_list { nodesDestroyList($$); }
|
%destructor select_list { nodesDestroyList($$); }
|
||||||
select_list(A) ::= NK_STAR. { A = NULL; }
|
select_list(A) ::= select_item(B). { A = createNodeList(pCxt, B); }
|
||||||
select_list(A) ::= select_sublist(B). { A = B; }
|
select_list(A) ::= select_list(B) NK_COMMA select_item(C). { A = addNodeToList(pCxt, B, C); }
|
||||||
|
|
||||||
%type select_sublist { SNodeList* }
|
|
||||||
%destructor select_sublist { nodesDestroyList($$); }
|
|
||||||
select_sublist(A) ::= select_item(B). { A = createNodeList(pCxt, B); }
|
|
||||||
select_sublist(A) ::= select_sublist(B) NK_COMMA select_item(C). { A = addNodeToList(pCxt, B, C); }
|
|
||||||
|
|
||||||
|
select_item(A) ::= NK_STAR(B). { A = createColumnNode(pCxt, NULL, &B); }
|
||||||
select_item(A) ::= common_expression(B). { A = releaseRawExprNode(pCxt, B); }
|
select_item(A) ::= common_expression(B). { A = releaseRawExprNode(pCxt, B); }
|
||||||
select_item(A) ::= common_expression(B) column_alias(C). { A = setProjectionAlias(pCxt, releaseRawExprNode(pCxt, B), &C); }
|
select_item(A) ::= common_expression(B) column_alias(C). { A = setProjectionAlias(pCxt, releaseRawExprNode(pCxt, B), &C); }
|
||||||
select_item(A) ::= common_expression(B) AS column_alias(C). { A = setProjectionAlias(pCxt, releaseRawExprNode(pCxt, B), &C); }
|
select_item(A) ::= common_expression(B) AS column_alias(C). { A = setProjectionAlias(pCxt, releaseRawExprNode(pCxt, B), &C); }
|
||||||
|
|
|
@ -1302,6 +1302,74 @@ static int32_t parseValuesClause(SInsertParseContext* pCxt, STableDataBlocks* da
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t parseCsvFile(SInsertParseContext* pCxt, TdFilePtr fp, STableDataBlocks* pDataBlock, int maxRows,
|
||||||
|
int32_t* numOfRows) {
|
||||||
|
STableComInfo tinfo = getTableInfo(pDataBlock->pTableMeta);
|
||||||
|
int32_t extendedRowSize = getExtendedRowSize(pDataBlock);
|
||||||
|
CHECK_CODE(initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));
|
||||||
|
|
||||||
|
(*numOfRows) = 0;
|
||||||
|
char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \"
|
||||||
|
char* pLine = NULL;
|
||||||
|
int64_t readLen = 0;
|
||||||
|
while ((readLen = taosGetLineFile(fp, &pLine)) != -1) {
|
||||||
|
if (('\r' == pLine[readLen - 1]) || ('\n' == pLine[readLen - 1])) {
|
||||||
|
pLine[--readLen] = '\0';
|
||||||
|
}
|
||||||
|
|
||||||
|
if (readLen == 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((*numOfRows) >= maxRows || pDataBlock->size + extendedRowSize >= pDataBlock->nAllocSize) {
|
||||||
|
int32_t tSize;
|
||||||
|
CHECK_CODE(allocateMemIfNeed(pDataBlock, extendedRowSize, &tSize));
|
||||||
|
ASSERT(tSize >= maxRows);
|
||||||
|
maxRows = tSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
strtolower(pLine, pLine);
|
||||||
|
char* pRawSql = pCxt->pSql;
|
||||||
|
pCxt->pSql = pLine;
|
||||||
|
bool gotRow = false;
|
||||||
|
CHECK_CODE(parseOneRow(pCxt, pDataBlock, tinfo.precision, &gotRow, tmpTokenBuf));
|
||||||
|
if (gotRow) {
|
||||||
|
pDataBlock->size += extendedRowSize; // len;
|
||||||
|
(*numOfRows)++;
|
||||||
|
}
|
||||||
|
pCxt->pSql = pRawSql;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (0 == (*numOfRows) && (!TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT))) {
|
||||||
|
return buildSyntaxErrMsg(&pCxt->msg, "no any data points", NULL);
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t parseDataFromFile(SInsertParseContext* pCxt, SToken filePath, STableDataBlocks* dataBuf) {
|
||||||
|
char filePathStr[TSDB_FILENAME_LEN] = {0};
|
||||||
|
strncpy(filePathStr, filePath.z, filePath.n);
|
||||||
|
TdFilePtr fp = taosOpenFile(filePathStr, TD_FILE_READ | TD_FILE_STREAM);
|
||||||
|
if (NULL == fp) {
|
||||||
|
return TAOS_SYSTEM_ERROR(errno);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t maxNumOfRows;
|
||||||
|
CHECK_CODE(allocateMemIfNeed(dataBuf, getExtendedRowSize(dataBuf), &maxNumOfRows));
|
||||||
|
|
||||||
|
int32_t numOfRows = 0;
|
||||||
|
CHECK_CODE(parseCsvFile(pCxt, fp, dataBuf, maxNumOfRows, &numOfRows));
|
||||||
|
|
||||||
|
SSubmitBlk* pBlocks = (SSubmitBlk*)(dataBuf->pData);
|
||||||
|
if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, dataBuf, numOfRows)) {
|
||||||
|
return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than 32767");
|
||||||
|
}
|
||||||
|
|
||||||
|
dataBuf->numOfTables = 1;
|
||||||
|
pCxt->totalNum += numOfRows;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
void destroyCreateSubTbReq(SVCreateTbReq* pReq) {
|
void destroyCreateSubTbReq(SVCreateTbReq* pReq) {
|
||||||
taosMemoryFreeClear(pReq->name);
|
taosMemoryFreeClear(pReq->name);
|
||||||
taosMemoryFreeClear(pReq->ctb.pTag);
|
taosMemoryFreeClear(pReq->ctb.pTag);
|
||||||
|
@ -1421,7 +1489,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
||||||
if (0 == sToken.n || (TK_NK_STRING != sToken.type && TK_NK_ID != sToken.type)) {
|
if (0 == sToken.n || (TK_NK_STRING != sToken.type && TK_NK_ID != sToken.type)) {
|
||||||
return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", sToken.z);
|
return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", sToken.z);
|
||||||
}
|
}
|
||||||
// todo
|
CHECK_CODE(parseDataFromFile(pCxt, sToken, dataBuf));
|
||||||
pCxt->pOutput->insertType = TSDB_QUERY_TYPE_FILE_INSERT;
|
pCxt->pOutput->insertType = TSDB_QUERY_TYPE_FILE_INSERT;
|
||||||
|
|
||||||
tbNum++;
|
tbNum++;
|
||||||
|
|
|
@ -86,6 +86,7 @@ static SKeyword keywordTable[] = {
|
||||||
{"EXISTS", TK_EXISTS},
|
{"EXISTS", TK_EXISTS},
|
||||||
{"EXPLAIN", TK_EXPLAIN},
|
{"EXPLAIN", TK_EXPLAIN},
|
||||||
{"EVERY", TK_EVERY},
|
{"EVERY", TK_EVERY},
|
||||||
|
{"FILE", TK_FILE},
|
||||||
{"FILL", TK_FILL},
|
{"FILL", TK_FILL},
|
||||||
{"FIRST", TK_FIRST},
|
{"FIRST", TK_FIRST},
|
||||||
{"FLOAT", TK_FLOAT},
|
{"FLOAT", TK_FLOAT},
|
||||||
|
|
|
@ -1984,13 +1984,18 @@ static int32_t createMultiResFuncsFromStar(STranslateContext* pCxt, SFunctionNod
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t translateStar(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
static int32_t translateStar(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
||||||
if (NULL == pSelect->pProjectionList) { // select * ...
|
|
||||||
return createAllColumns(pCxt, &pSelect->pProjectionList);
|
|
||||||
} else {
|
|
||||||
SNode* pNode = NULL;
|
SNode* pNode = NULL;
|
||||||
WHERE_EACH(pNode, pSelect->pProjectionList) {
|
WHERE_EACH(pNode, pSelect->pProjectionList) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
if (isMultiResFunc(pNode)) {
|
if (isStar(pNode)) {
|
||||||
|
SNodeList* pCols = NULL;
|
||||||
|
code = createAllColumns(pCxt, &pCols);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
INSERT_LIST(pSelect->pProjectionList, pCols);
|
||||||
|
ERASE_NODE(pSelect->pProjectionList);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
} else if (isMultiResFunc(pNode)) {
|
||||||
SNodeList* pFuncs = NULL;
|
SNodeList* pFuncs = NULL;
|
||||||
code = createMultiResFuncsFromStar(pCxt, (SFunctionNode*)pNode, &pFuncs);
|
code = createMultiResFuncsFromStar(pCxt, (SFunctionNode*)pNode, &pFuncs);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
@ -2012,7 +2017,6 @@ static int32_t translateStar(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
||||||
}
|
}
|
||||||
WHERE_NEXT;
|
WHERE_NEXT;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -364,7 +364,7 @@ int32_t shellDumpResultToFile(const char *fname, TAOS_RES *tres) {
|
||||||
int32_t *length = taos_fetch_lengths(tres);
|
int32_t *length = taos_fetch_lengths(tres);
|
||||||
for (int32_t i = 0; i < num_fields; i++) {
|
for (int32_t i = 0; i < num_fields; i++) {
|
||||||
if (i > 0) {
|
if (i > 0) {
|
||||||
taosFprintfFile(pFile, "\n");
|
taosFprintfFile(pFile, ",");
|
||||||
}
|
}
|
||||||
shellDumpFieldToFile(pFile, (const char *)row[i], fields + i, length[i], precision);
|
shellDumpFieldToFile(pFile, (const char *)row[i], fields + i, length[i], precision);
|
||||||
}
|
}
|
||||||
|
@ -394,9 +394,9 @@ void shellPrintNChar(const char *str, int32_t length, int32_t width) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
int w = 0;
|
int w = 0;
|
||||||
if(*(str + pos) == '\t' || *(str + pos) == '\n' || *(str + pos) == '\r'){
|
if (*(str + pos) == '\t' || *(str + pos) == '\n' || *(str + pos) == '\r') {
|
||||||
w = bytes;
|
w = bytes;
|
||||||
}else{
|
} else {
|
||||||
w = taosWcharWidth(wc);
|
w = taosWcharWidth(wc);
|
||||||
}
|
}
|
||||||
pos += bytes;
|
pos += bytes;
|
||||||
|
@ -513,7 +513,7 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t
|
||||||
}
|
}
|
||||||
|
|
||||||
bool shellIsLimitQuery(const char *sql) {
|
bool shellIsLimitQuery(const char *sql) {
|
||||||
//todo refactor
|
// todo refactor
|
||||||
if (taosStrCaseStr(sql, " limit ") != NULL) {
|
if (taosStrCaseStr(sql, " limit ") != NULL) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -522,7 +522,7 @@ bool shellIsLimitQuery(const char *sql) {
|
||||||
}
|
}
|
||||||
|
|
||||||
bool shellIsShowQuery(const char *sql) {
|
bool shellIsShowQuery(const char *sql) {
|
||||||
//todo refactor
|
// todo refactor
|
||||||
if (taosStrCaseStr(sql, "show ") != NULL) {
|
if (taosStrCaseStr(sql, "show ") != NULL) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -530,7 +530,6 @@ bool shellIsShowQuery(const char *sql) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql) {
|
int32_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql) {
|
||||||
TAOS_ROW row = taos_fetch_row(tres);
|
TAOS_ROW row = taos_fetch_row(tres);
|
||||||
if (row == NULL) {
|
if (row == NULL) {
|
||||||
|
|
Loading…
Reference in New Issue