diff --git a/examples/c/insert_stb.c b/examples/c/insert_stb.c new file mode 100644 index 0000000000..dbe34e3bfb --- /dev/null +++ b/examples/c/insert_stb.c @@ -0,0 +1,162 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +// TAOS standard API example. The same syntax as MySQL, but only a subset +// to compile: gcc -o demo demo.c -ltaos + +#include +#include +#include +#include +#include + +#include "taos.h" // TAOS header file + +static void executeSql(TAOS *taos, char *command) { + int i; + TAOS_RES *pSql = NULL; + int32_t code = -1; + + for (i = 0; i < 5; i++) { + if (NULL != pSql) { + taos_free_result(pSql); + pSql = NULL; + } + + pSql = taos_query(taos, command); + code = taos_errno(pSql); + if (0 == code) { + break; + } + } + + if (code != 0) { + fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(pSql)); + taos_free_result(pSql); + taos_close(taos); + exit(EXIT_FAILURE); + } + + taos_free_result(pSql); +} + +void TestInsert(TAOS *taos, char *qstr) { + executeSql(taos, "drop database if exists demo2"); + executeSql(taos, "create database demo2"); + executeSql(taos, "use demo2"); + + executeSql(taos, "create table st (ts timestamp, ti tinyint, si smallint, i int, bi bigint, f float, d double, b binary(10), tbname2 binary(192)) tags(t1 int, t2 float, t3 binary(10))"); + printf("success to create table\n"); + + struct timeval start_time; + gettimeofday(&start_time, NULL); + + for (int tblIdx = 0; tblIdx < 10; ++tblIdx) { + int len = 0; + len += sprintf(qstr+len, "insert into ct%d using st tags(%d, %f, '%s')", tblIdx, tblIdx, (float)tblIdx, "childtable"); + int batchStart = len; + for (int batchIdx = 0; batchIdx < 10000; ++batchIdx) { + len = batchStart; + len += sprintf(qstr+len, " values"); + if (batchIdx % 5000 == 1) + printf("%s %d\n", qstr, batchIdx); + + for (int rowIdx = 0; rowIdx < 100; ++ rowIdx) { + int i = rowIdx + batchIdx * 100 + tblIdx*10000*100; + len += sprintf(qstr+len, " (%" PRId64 ", %d, %d, %d, %d, %f, %lf, '%s', 'ct%d')", (uint64_t)(1546300800000 + i), (int8_t)i, (int16_t)i, i, i, i*1.0, i*2.0, "hello", tblIdx); + } + TAOS_RES *result1 = taos_query(taos, qstr); + if (result1 == NULL || taos_errno(result1) != 0) { + printf("failed to insert row, reason:%s. qstr: %s\n", taos_errstr(result1), qstr); + taos_free_result(result1); + exit(1); + } + taos_free_result(result1); + } + } + struct timeval end_time; + gettimeofday(&end_time, NULL); + double elapsed_time = (end_time.tv_sec - start_time.tv_sec) + + (end_time.tv_usec - start_time.tv_usec) / 1000000.0; + printf("elapsed time: %.3f\n", elapsed_time); + executeSql(taos, "drop database if exists demo2"); +} + +void TestInsertStb(TAOS *taos, char *qstr) { + executeSql(taos, "drop database if exists demo"); + executeSql(taos, "create database demo"); + executeSql(taos, "use demo"); + + executeSql(taos, "create table st (ts timestamp, ti tinyint, si smallint, i int, bi bigint, f float, d double, b binary(10)) tags(t1 int, t2 float, t3 binary(10))"); + printf("success to create table\n"); + + struct timeval start_time; + gettimeofday(&start_time, NULL); + + for (int tblIdx = 0; tblIdx < 10; ++tblIdx) { + int len = 0; + len += sprintf(qstr+len, "insert into st(tbname, t1, t2, t3, ts, ti, si, i, bi, f, d, b)"); + int batchStart = len; + for (int batchIdx = 0; batchIdx < 10000; ++batchIdx) { + len = batchStart; + len += sprintf(qstr+len, " values"); + if (batchIdx % 5000 == 1) + printf("%s %d table %d\n", qstr, batchIdx, tblIdx); + + for (int rowIdx = 0; rowIdx < 100; ++rowIdx) { + int i = rowIdx + batchIdx * 100 + tblIdx*10000*100; + len += sprintf(qstr+len, " ('ct%d', %d, %f, '%s', %" PRId64 ", %d, %d, %d, %d, %f, %lf, '%s')", tblIdx, tblIdx, (float)tblIdx, "childtable", + (uint64_t)(1546300800000 + i), (int8_t)i, (int16_t)i, i, i, i*1.0, i*2.0, "hello"); + } + TAOS_RES *result1 = taos_query(taos, qstr); + if (result1 == NULL || taos_errno(result1) != 0) { + printf("failed to insert row, reason:%s. qstr: %s\n", taos_errstr(result1), qstr); + taos_free_result(result1); + exit(1); + } + taos_free_result(result1); + } + } + struct timeval end_time; + gettimeofday(&end_time, NULL); + double elapsed_time = (end_time.tv_sec - start_time.tv_sec) + + (end_time.tv_usec - start_time.tv_usec) / 1000000.0; + + printf("elapsed time: %.3f\n", elapsed_time); + executeSql(taos, "drop database if exists demo"); +} + + +int main(int argc, char *argv[]) { + + // connect to server + if (argc < 2) { + printf("please input server-ip \n"); + return 0; + } + + TAOS *taos = taos_connect(argv[1], "root", "taosdata", NULL, 0); + if (taos == NULL) { + printf("failed to connect to server, reason:%s\n", taos_errstr(NULL)); + exit(1); + } + char* qstr = malloc(1024*1024); + TestInsert(taos, qstr); + TestInsertStb(taos, qstr); + free(qstr); + taos_close(taos); + taos_cleanup(); +} + diff --git a/examples/c/makefile b/examples/c/makefile index 244d13fad7..5fc590f424 100644 --- a/examples/c/makefile +++ b/examples/c/makefile @@ -17,6 +17,7 @@ exe: gcc $(CFLAGS) ./stream_demo.c -o $(ROOT)stream_demo $(LFLAGS) gcc $(CFLAGS) ./tmq.c -o $(ROOT)tmq $(LFLAGS) gcc $(CFLAGS) ./schemaless.c -o $(ROOT)schemaless $(LFLAGS) + gcc $(CFLAGS) ./insert_stb.c -o $(ROOT)insert_stb $(LFLAGS) clean: rm $(ROOT)asyncdemo @@ -25,3 +26,4 @@ clean: rm $(ROOT)stream_demo rm $(ROOT)tmq rm $(ROOT)schemaless + rm $(ROOT)insert_stb diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index f8cb019ced..567d050836 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -230,13 +230,12 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, E token.z = tmpTokenBuf; token.n = strdequote(token.z); - if (boundColsType == BOUND_ALL_AND_TBNAME) { - if (token.n == strlen("tbname") && (strcasecmp(token.z, "tbname") == 0)) { - pBoundInfo->pColIndex[pBoundInfo->numOfBound] = tbnameSchemaIndex; - pUseCols[tbnameSchemaIndex] = true; - ++pBoundInfo->numOfBound; - continue; - } + if (boundColsType == BOUND_ALL_AND_TBNAME && + token.n == strlen("tbname") && (strcasecmp(token.z, "tbname") == 0)) { + pBoundInfo->pColIndex[pBoundInfo->numOfBound] = tbnameSchemaIndex; + pUseCols[tbnameSchemaIndex] = true; + ++pBoundInfo->numOfBound; + continue; } int16_t t = lastColIdx + 1; int16_t index = insFindCol(&token, t, pBoundInfo->numOfCols, pSchema); @@ -1567,6 +1566,7 @@ typedef struct SStbRowsDataContext { STag* pTag; STableMeta* pCtbMeta; SVCreateTbReq* pCreateCtbReq; + bool hasTimestampTag; } SStbRowsDataContext; typedef union SRowsDataContext{ @@ -1574,19 +1574,52 @@ typedef union SRowsDataContext{ SStbRowsDataContext* pStbRowsCxt; } SRowsDataContext; +static int32_t parseTbnameToken(SInsertParseContext* pCxt, SStbRowsDataContext* pStbRowsCxt, SToken* pToken, + char* ctbName, bool* pFoundCtbName) { + *pFoundCtbName = false; + int32_t code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg); + if (code == TSDB_CODE_SUCCESS){ + if (isNullValue(TSDB_DATA_TYPE_BINARY, pToken)) { + return buildInvalidOperationMsg(&pCxt->msg, "tbname can not be null value"); + } + + if (pToken->n > 0) { + if (pToken->n <= TSDB_TABLE_NAME_LEN - 1) { + memcpy(ctbName, pToken->z, pToken->n); + ctbName[pToken->n] = '\0'; + *pFoundCtbName = true; + tNameSetDbName(&pStbRowsCxt->ctbName, pStbRowsCxt->stbName.acctId, pStbRowsCxt->stbName.dbname, strlen(pStbRowsCxt->stbName.dbname)); + tNameAddTbName(&pStbRowsCxt->ctbName, ctbName, pToken->n); + } else { + return buildInvalidOperationMsg(&pCxt->msg, "tbname is too long"); + } + } else { + return buildInvalidOperationMsg(&pCxt->msg, "tbname can not be empty"); + } + } + return code; +} + static int32_t getStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql, - SStbRowsDataContext* pStbRowsCxt, bool* pGotRow, SToken* pToken) { + SStbRowsDataContext* pStbRowsCxt, bool* pGotRow, + SToken* pToken, bool *pCtbFirst) { SBoundColInfo* pCols = &pStbRowsCxt->boundColsInfo; SSchema* pSchemas = getTableColumnSchema(pStbRowsCxt->pStbMeta); bool isJsonTag = false; - SArray* pTagName = pStbRowsCxt->aTagNames; + SArray* pTagNames = pStbRowsCxt->aTagNames; SArray* pTagVals = pStbRowsCxt->aTagVals; bool bFoundTbName = false; const char* pOrigSql = *ppSql; int32_t code = TSDB_CODE_SUCCESS; + + bool canParseTagsAfter = !pStbRowsCxt->pTagCond && !pStbRowsCxt->hasTimestampTag; + SToken tagTokens[TSDB_MAX_TAGS] = {0}; + SSchema* tagSchemas[TSDB_MAX_TAGS] = {0}; + int numOfTagTokens = 0; + for (int i = 0; i < pCols->numOfBound && code == TSDB_CODE_SUCCESS; ++i) { const char* pTmpSql = *ppSql; bool ignoreComma = false; @@ -1608,25 +1641,20 @@ static int32_t getStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pS } else if (pCols->pColIndex[i] < getTbnameSchemaIndex(pStbRowsCxt->pStbMeta)) { SSchema* pTagSchema = &pSchemas[pCols->pColIndex[i]]; isJsonTag = pTagSchema->type == TSDB_DATA_TYPE_JSON; - code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg); - if (code == TSDB_CODE_SUCCESS) { - code = parseTagValue(pCxt, pStmt, ppSql, pTagSchema, pToken, pTagName, pTagVals, &pStbRowsCxt->pTag); + if (canParseTagsAfter) { + tagTokens[numOfTagTokens] = *pToken; + tagSchemas[numOfTagTokens] = pTagSchema; + ++numOfTagTokens; + } else { + code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg); + if (code == TSDB_CODE_SUCCESS) { + code = parseTagValue(pCxt, pStmt, ppSql, pTagSchema, pToken, pTagNames, pTagVals, &pStbRowsCxt->pTag); + } } } else if (pCols->pColIndex[i] == getTbnameSchemaIndex(pStbRowsCxt->pStbMeta)) { - SColVal tbnameVal = COL_VAL_NONE(-1, TSDB_DATA_TYPE_BINARY); - tbnameVal.value.val = 0; - code = parseValueToken(pCxt, ppSql, pToken, (SSchema*)tGetTbnameColumnSchema(), - getTableInfo(pStbRowsCxt->pStbMeta).precision, &tbnameVal); - if (code == TSDB_CODE_SUCCESS && COL_VAL_IS_VALUE(&tbnameVal) && tbnameVal.value.nData>0) { - tNameSetDbName(&pStbRowsCxt->ctbName, pStbRowsCxt->stbName.acctId, pStbRowsCxt->stbName.dbname, strlen(pStbRowsCxt->stbName.dbname)); - char ctbName[TSDB_TABLE_NAME_LEN]; - memcpy(ctbName, tbnameVal.value.pData, tbnameVal.value.nData); - ctbName[tbnameVal.value.nData] = '\0'; - tNameAddTbName(&pStbRowsCxt->ctbName, ctbName, tbnameVal.value.nData); - bFoundTbName = true; - } - taosMemoryFreeClear(tbnameVal.value.pData); + char ctbName[TSDB_TABLE_NAME_LEN]; + code = parseTbnameToken(pCxt, pStbRowsCxt, pToken, ctbName, &bFoundTbName); } if (code == TSDB_CODE_SUCCESS && i < pCols->numOfBound - 1) { @@ -1639,9 +1667,37 @@ static int32_t getStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pS if (!bFoundTbName) { code = buildSyntaxErrMsg(&pCxt->msg, "tbname value expected", pOrigSql); } - if (code == TSDB_CODE_SUCCESS && !isJsonTag) { - code = tTagNew(pTagVals, 1, false, &pStbRowsCxt->pTag); + + bool ctbFirst = true; + if (code == TSDB_CODE_SUCCESS) { + char ctbFName[TSDB_TABLE_FNAME_LEN]; + tNameExtractFullName(&pStbRowsCxt->ctbName, ctbFName); + STableMeta** pCtbMeta = taosHashGet(pStmt->pSubTableHashObj, ctbFName, strlen(ctbFName)); + ctbFirst = *pCtbFirst = (pCtbMeta == NULL); + if (!ctbFirst) { + pStbRowsCxt->pCtbMeta->uid = (*pCtbMeta)->uid; + pStbRowsCxt->pCtbMeta->vgId = (*pCtbMeta)->vgId; + } } + if (code == TSDB_CODE_SUCCESS && ctbFirst) { + for (int32_t i = 0; code == TSDB_CODE_SUCCESS && i < numOfTagTokens; ++i) { + SToken* pTagToken = tagTokens + i; + SSchema* pTagSchema = tagSchemas[i]; + code = checkAndTrimValue(pTagToken, pCxt->tmpTokenBuf, &pCxt->msg); + if (code == TSDB_CODE_SUCCESS) { + code = parseTagValue(pCxt, pStmt, NULL, pTagSchema, pTagToken, pStbRowsCxt->aTagNames, pStbRowsCxt->aTagVals, + &pStbRowsCxt->pTag); + } + } + if (code == TSDB_CODE_SUCCESS && !isJsonTag) { + code = tTagNew(pStbRowsCxt->aTagVals, 1, false, &pStbRowsCxt->pTag); + } + } + + if (code == TSDB_CODE_SUCCESS && pStbRowsCxt->pTagCond) { + code = checkSubtablePrivilege(pStbRowsCxt->aTagVals, pStbRowsCxt->aTagNames, &pStbRowsCxt->pTagCond); + } + if (code == TSDB_CODE_SUCCESS) { *pGotRow = true; } @@ -1650,9 +1706,6 @@ static int32_t getStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pS static int32_t processCtbAutoCreationAndCtbMeta(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SStbRowsDataContext* pStbRowsCxt) { int32_t code = TSDB_CODE_SUCCESS; - if (pStbRowsCxt->pTagCond) { - code = checkSubtablePrivilege(pStbRowsCxt->aTagVals, pStbRowsCxt->aTagNames, &pStbRowsCxt->pTagCond); - } pStbRowsCxt->pCreateCtbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq)); if (pStbRowsCxt->pCreateCtbReq == NULL) { @@ -1666,28 +1719,24 @@ static int32_t processCtbAutoCreationAndCtbMeta(SInsertParseContext* pCxt, SVnod } if (code == TSDB_CODE_SUCCESS) { - collectUseTable(&pStbRowsCxt->ctbName, pStmt->pTableNameHashObj); - char ctbFName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(&pStbRowsCxt->ctbName, ctbFName); - STableMeta** pCtbMeta = taosHashGet(pStmt->pSubTableHashObj, ctbFName, strlen(ctbFName)); - if (NULL != pCtbMeta) { - pStbRowsCxt->pCtbMeta->uid = (*pCtbMeta)->uid; - pStbRowsCxt->pCtbMeta->vgId = (*pCtbMeta)->vgId; - } else { - SVgroupInfo vg; - SRequestConnInfo conn = {.pTrans = pCxt->pComCxt->pTransporter, - .requestId = pCxt->pComCxt->requestId, - .requestObjRefId = pCxt->pComCxt->requestRid, - .mgmtEps = pCxt->pComCxt->mgmtEpSet}; - code = catalogGetTableHashVgroup(pCxt->pComCxt->pCatalog, &conn, &pStmt->targetTableName, &vg); + SVgroupInfo vg; + SRequestConnInfo conn = {.pTrans = pCxt->pComCxt->pTransporter, + .requestId = pCxt->pComCxt->requestId, + .requestObjRefId = pCxt->pComCxt->requestRid, + .mgmtEps = pCxt->pComCxt->mgmtEpSet}; + code = catalogGetTableHashVgroup(pCxt->pComCxt->pCatalog, &conn, &pStmt->targetTableName, &vg); + if (code == TSDB_CODE_SUCCESS) { taosHashPut(pStmt->pVgroupsHashObj, (const char*)(&vg.vgId), sizeof(vg.vgId), &vg, sizeof(vg)); pStbRowsCxt->pCtbMeta->uid = taosHashGetSize(pStmt->pSubTableHashObj) + 1; pStbRowsCxt->pCtbMeta->vgId = vg.vgId; + STableMeta* pBackup = NULL; - cloneTableMeta(pStmt->pTableMeta, &pBackup); + cloneTableMeta(pStbRowsCxt->pCtbMeta, &pBackup); taosHashPut(pStmt->pSubTableHashObj, ctbFName, strlen(ctbFName), &pBackup, POINTER_BYTES); } + collectUseTable(&pStbRowsCxt->ctbName, pStmt->pTableNameHashObj); } return code; } @@ -1724,13 +1773,14 @@ static void clearStbRowsDataContext(SStbRowsDataContext* pStbRowsCxt) { static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql, SStbRowsDataContext* pStbRowsCxt, bool* pGotRow, SToken* pToken) { - resetStbRowsDataContextPreStbRow(pStbRowsCxt); - int32_t code = getStbRowValues(pCxt, pStmt, ppSql, pStbRowsCxt, pGotRow, pToken); + resetStbRowsDataContextPreStbRow(pStbRowsCxt); + bool bFirstTable = false; + int32_t code = getStbRowValues(pCxt, pStmt, ppSql, pStbRowsCxt, pGotRow, pToken, &bFirstTable); if (code != TSDB_CODE_SUCCESS || !*pGotRow) { return code; } - if (code == TSDB_CODE_SUCCESS) { + if (code == TSDB_CODE_SUCCESS && bFirstTable) { code = processCtbAutoCreationAndCtbMeta(pCxt, pStmt, pStbRowsCxt); } @@ -2053,6 +2103,14 @@ static int32_t parseInsertStbClauseBottom(SInsertParseContext* pCxt, SVnodeModif if (code == TSDB_CODE_SUCCESS) { code = parseBoundColumns(pCxt, &pStmt->pBoundCols, BOUND_ALL_AND_TBNAME, pStmt->pTableMeta, &pStbRowsCxt->boundColsInfo); + pStbRowsCxt->hasTimestampTag = false; + for (int32_t i = 0; i < pStbRowsCxt->boundColsInfo.numOfBound; ++i) { + int16_t schemaIndex = pStbRowsCxt->boundColsInfo.pColIndex[i]; + if (schemaIndex != getTbnameSchemaIndex(pStmt->pTableMeta) && + schemaIndex >= getNumOfColumns(pStmt->pTableMeta) && pStmt->pTableMeta->schema[schemaIndex].type == TSDB_DATA_TYPE_TIMESTAMP) { + pStbRowsCxt->hasTimestampTag = true; + } + } pStmt->pStbRowsCxt = pStbRowsCxt; }