Merge branch 'szhou/insert-stb-tbname' into szhou/continue-coing
This commit is contained in:
commit
8d2d473cbc
|
@ -0,0 +1,162 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
// TAOS standard API example. The same syntax as MySQL, but only a subset
|
||||||
|
// to compile: gcc -o demo demo.c -ltaos
|
||||||
|
|
||||||
|
#include <inttypes.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <string.h>
|
||||||
|
#include <sys/time.h>
|
||||||
|
|
||||||
|
#include "taos.h" // TAOS header file
|
||||||
|
|
||||||
|
static void executeSql(TAOS *taos, char *command) {
|
||||||
|
int i;
|
||||||
|
TAOS_RES *pSql = NULL;
|
||||||
|
int32_t code = -1;
|
||||||
|
|
||||||
|
for (i = 0; i < 5; i++) {
|
||||||
|
if (NULL != pSql) {
|
||||||
|
taos_free_result(pSql);
|
||||||
|
pSql = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pSql = taos_query(taos, command);
|
||||||
|
code = taos_errno(pSql);
|
||||||
|
if (0 == code) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (code != 0) {
|
||||||
|
fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(pSql));
|
||||||
|
taos_free_result(pSql);
|
||||||
|
taos_close(taos);
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
|
||||||
|
taos_free_result(pSql);
|
||||||
|
}
|
||||||
|
|
||||||
|
void TestInsert(TAOS *taos, char *qstr) {
|
||||||
|
executeSql(taos, "drop database if exists demo2");
|
||||||
|
executeSql(taos, "create database demo2");
|
||||||
|
executeSql(taos, "use demo2");
|
||||||
|
|
||||||
|
executeSql(taos, "create table st (ts timestamp, ti tinyint, si smallint, i int, bi bigint, f float, d double, b binary(10), tbname2 binary(192)) tags(t1 int, t2 float, t3 binary(10))");
|
||||||
|
printf("success to create table\n");
|
||||||
|
|
||||||
|
struct timeval start_time;
|
||||||
|
gettimeofday(&start_time, NULL);
|
||||||
|
|
||||||
|
for (int tblIdx = 0; tblIdx < 10; ++tblIdx) {
|
||||||
|
int len = 0;
|
||||||
|
len += sprintf(qstr+len, "insert into ct%d using st tags(%d, %f, '%s')", tblIdx, tblIdx, (float)tblIdx, "childtable");
|
||||||
|
int batchStart = len;
|
||||||
|
for (int batchIdx = 0; batchIdx < 10000; ++batchIdx) {
|
||||||
|
len = batchStart;
|
||||||
|
len += sprintf(qstr+len, " values");
|
||||||
|
if (batchIdx % 5000 == 1)
|
||||||
|
printf("%s %d\n", qstr, batchIdx);
|
||||||
|
|
||||||
|
for (int rowIdx = 0; rowIdx < 100; ++ rowIdx) {
|
||||||
|
int i = rowIdx + batchIdx * 100 + tblIdx*10000*100;
|
||||||
|
len += sprintf(qstr+len, " (%" PRId64 ", %d, %d, %d, %d, %f, %lf, '%s', 'ct%d')", (uint64_t)(1546300800000 + i), (int8_t)i, (int16_t)i, i, i, i*1.0, i*2.0, "hello", tblIdx);
|
||||||
|
}
|
||||||
|
TAOS_RES *result1 = taos_query(taos, qstr);
|
||||||
|
if (result1 == NULL || taos_errno(result1) != 0) {
|
||||||
|
printf("failed to insert row, reason:%s. qstr: %s\n", taos_errstr(result1), qstr);
|
||||||
|
taos_free_result(result1);
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
taos_free_result(result1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
struct timeval end_time;
|
||||||
|
gettimeofday(&end_time, NULL);
|
||||||
|
double elapsed_time = (end_time.tv_sec - start_time.tv_sec) +
|
||||||
|
(end_time.tv_usec - start_time.tv_usec) / 1000000.0;
|
||||||
|
printf("elapsed time: %.3f\n", elapsed_time);
|
||||||
|
executeSql(taos, "drop database if exists demo2");
|
||||||
|
}
|
||||||
|
|
||||||
|
void TestInsertStb(TAOS *taos, char *qstr) {
|
||||||
|
executeSql(taos, "drop database if exists demo");
|
||||||
|
executeSql(taos, "create database demo");
|
||||||
|
executeSql(taos, "use demo");
|
||||||
|
|
||||||
|
executeSql(taos, "create table st (ts timestamp, ti tinyint, si smallint, i int, bi bigint, f float, d double, b binary(10)) tags(t1 int, t2 float, t3 binary(10))");
|
||||||
|
printf("success to create table\n");
|
||||||
|
|
||||||
|
struct timeval start_time;
|
||||||
|
gettimeofday(&start_time, NULL);
|
||||||
|
|
||||||
|
for (int tblIdx = 0; tblIdx < 10; ++tblIdx) {
|
||||||
|
int len = 0;
|
||||||
|
len += sprintf(qstr+len, "insert into st(tbname, t1, t2, t3, ts, ti, si, i, bi, f, d, b)");
|
||||||
|
int batchStart = len;
|
||||||
|
for (int batchIdx = 0; batchIdx < 10000; ++batchIdx) {
|
||||||
|
len = batchStart;
|
||||||
|
len += sprintf(qstr+len, " values");
|
||||||
|
if (batchIdx % 5000 == 1)
|
||||||
|
printf("%s %d table %d\n", qstr, batchIdx, tblIdx);
|
||||||
|
|
||||||
|
for (int rowIdx = 0; rowIdx < 100; ++rowIdx) {
|
||||||
|
int i = rowIdx + batchIdx * 100 + tblIdx*10000*100;
|
||||||
|
len += sprintf(qstr+len, " ('ct%d', %d, %f, '%s', %" PRId64 ", %d, %d, %d, %d, %f, %lf, '%s')", tblIdx, tblIdx, (float)tblIdx, "childtable",
|
||||||
|
(uint64_t)(1546300800000 + i), (int8_t)i, (int16_t)i, i, i, i*1.0, i*2.0, "hello");
|
||||||
|
}
|
||||||
|
TAOS_RES *result1 = taos_query(taos, qstr);
|
||||||
|
if (result1 == NULL || taos_errno(result1) != 0) {
|
||||||
|
printf("failed to insert row, reason:%s. qstr: %s\n", taos_errstr(result1), qstr);
|
||||||
|
taos_free_result(result1);
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
taos_free_result(result1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
struct timeval end_time;
|
||||||
|
gettimeofday(&end_time, NULL);
|
||||||
|
double elapsed_time = (end_time.tv_sec - start_time.tv_sec) +
|
||||||
|
(end_time.tv_usec - start_time.tv_usec) / 1000000.0;
|
||||||
|
|
||||||
|
printf("elapsed time: %.3f\n", elapsed_time);
|
||||||
|
executeSql(taos, "drop database if exists demo");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int main(int argc, char *argv[]) {
|
||||||
|
|
||||||
|
// connect to server
|
||||||
|
if (argc < 2) {
|
||||||
|
printf("please input server-ip \n");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
TAOS *taos = taos_connect(argv[1], "root", "taosdata", NULL, 0);
|
||||||
|
if (taos == NULL) {
|
||||||
|
printf("failed to connect to server, reason:%s\n", taos_errstr(NULL));
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
char* qstr = malloc(1024*1024);
|
||||||
|
TestInsert(taos, qstr);
|
||||||
|
TestInsertStb(taos, qstr);
|
||||||
|
free(qstr);
|
||||||
|
taos_close(taos);
|
||||||
|
taos_cleanup();
|
||||||
|
}
|
||||||
|
|
|
@ -17,6 +17,7 @@ exe:
|
||||||
gcc $(CFLAGS) ./stream_demo.c -o $(ROOT)stream_demo $(LFLAGS)
|
gcc $(CFLAGS) ./stream_demo.c -o $(ROOT)stream_demo $(LFLAGS)
|
||||||
gcc $(CFLAGS) ./tmq.c -o $(ROOT)tmq $(LFLAGS)
|
gcc $(CFLAGS) ./tmq.c -o $(ROOT)tmq $(LFLAGS)
|
||||||
gcc $(CFLAGS) ./schemaless.c -o $(ROOT)schemaless $(LFLAGS)
|
gcc $(CFLAGS) ./schemaless.c -o $(ROOT)schemaless $(LFLAGS)
|
||||||
|
gcc $(CFLAGS) ./insert_stb.c -o $(ROOT)insert_stb $(LFLAGS)
|
||||||
|
|
||||||
clean:
|
clean:
|
||||||
rm $(ROOT)asyncdemo
|
rm $(ROOT)asyncdemo
|
||||||
|
@ -25,3 +26,4 @@ clean:
|
||||||
rm $(ROOT)stream_demo
|
rm $(ROOT)stream_demo
|
||||||
rm $(ROOT)tmq
|
rm $(ROOT)tmq
|
||||||
rm $(ROOT)schemaless
|
rm $(ROOT)schemaless
|
||||||
|
rm $(ROOT)insert_stb
|
||||||
|
|
|
@ -230,13 +230,12 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, E
|
||||||
token.z = tmpTokenBuf;
|
token.z = tmpTokenBuf;
|
||||||
token.n = strdequote(token.z);
|
token.n = strdequote(token.z);
|
||||||
|
|
||||||
if (boundColsType == BOUND_ALL_AND_TBNAME) {
|
if (boundColsType == BOUND_ALL_AND_TBNAME &&
|
||||||
if (token.n == strlen("tbname") && (strcasecmp(token.z, "tbname") == 0)) {
|
token.n == strlen("tbname") && (strcasecmp(token.z, "tbname") == 0)) {
|
||||||
pBoundInfo->pColIndex[pBoundInfo->numOfBound] = tbnameSchemaIndex;
|
pBoundInfo->pColIndex[pBoundInfo->numOfBound] = tbnameSchemaIndex;
|
||||||
pUseCols[tbnameSchemaIndex] = true;
|
pUseCols[tbnameSchemaIndex] = true;
|
||||||
++pBoundInfo->numOfBound;
|
++pBoundInfo->numOfBound;
|
||||||
continue;
|
continue;
|
||||||
}
|
|
||||||
}
|
}
|
||||||
int16_t t = lastColIdx + 1;
|
int16_t t = lastColIdx + 1;
|
||||||
int16_t index = insFindCol(&token, t, pBoundInfo->numOfCols, pSchema);
|
int16_t index = insFindCol(&token, t, pBoundInfo->numOfCols, pSchema);
|
||||||
|
@ -1567,6 +1566,7 @@ typedef struct SStbRowsDataContext {
|
||||||
STag* pTag;
|
STag* pTag;
|
||||||
STableMeta* pCtbMeta;
|
STableMeta* pCtbMeta;
|
||||||
SVCreateTbReq* pCreateCtbReq;
|
SVCreateTbReq* pCreateCtbReq;
|
||||||
|
bool hasTimestampTag;
|
||||||
} SStbRowsDataContext;
|
} SStbRowsDataContext;
|
||||||
|
|
||||||
typedef union SRowsDataContext{
|
typedef union SRowsDataContext{
|
||||||
|
@ -1574,19 +1574,52 @@ typedef union SRowsDataContext{
|
||||||
SStbRowsDataContext* pStbRowsCxt;
|
SStbRowsDataContext* pStbRowsCxt;
|
||||||
} SRowsDataContext;
|
} SRowsDataContext;
|
||||||
|
|
||||||
|
static int32_t parseTbnameToken(SInsertParseContext* pCxt, SStbRowsDataContext* pStbRowsCxt, SToken* pToken,
|
||||||
|
char* ctbName, bool* pFoundCtbName) {
|
||||||
|
*pFoundCtbName = false;
|
||||||
|
int32_t code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg);
|
||||||
|
if (code == TSDB_CODE_SUCCESS){
|
||||||
|
if (isNullValue(TSDB_DATA_TYPE_BINARY, pToken)) {
|
||||||
|
return buildInvalidOperationMsg(&pCxt->msg, "tbname can not be null value");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pToken->n > 0) {
|
||||||
|
if (pToken->n <= TSDB_TABLE_NAME_LEN - 1) {
|
||||||
|
memcpy(ctbName, pToken->z, pToken->n);
|
||||||
|
ctbName[pToken->n] = '\0';
|
||||||
|
*pFoundCtbName = true;
|
||||||
|
tNameSetDbName(&pStbRowsCxt->ctbName, pStbRowsCxt->stbName.acctId, pStbRowsCxt->stbName.dbname, strlen(pStbRowsCxt->stbName.dbname));
|
||||||
|
tNameAddTbName(&pStbRowsCxt->ctbName, ctbName, pToken->n);
|
||||||
|
} else {
|
||||||
|
return buildInvalidOperationMsg(&pCxt->msg, "tbname is too long");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return buildInvalidOperationMsg(&pCxt->msg, "tbname can not be empty");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t getStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql,
|
static int32_t getStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql,
|
||||||
SStbRowsDataContext* pStbRowsCxt, bool* pGotRow, SToken* pToken) {
|
SStbRowsDataContext* pStbRowsCxt, bool* pGotRow,
|
||||||
|
SToken* pToken, bool *pCtbFirst) {
|
||||||
SBoundColInfo* pCols = &pStbRowsCxt->boundColsInfo;
|
SBoundColInfo* pCols = &pStbRowsCxt->boundColsInfo;
|
||||||
SSchema* pSchemas = getTableColumnSchema(pStbRowsCxt->pStbMeta);
|
SSchema* pSchemas = getTableColumnSchema(pStbRowsCxt->pStbMeta);
|
||||||
|
|
||||||
bool isJsonTag = false;
|
bool isJsonTag = false;
|
||||||
SArray* pTagName = pStbRowsCxt->aTagNames;
|
SArray* pTagNames = pStbRowsCxt->aTagNames;
|
||||||
SArray* pTagVals = pStbRowsCxt->aTagVals;
|
SArray* pTagVals = pStbRowsCxt->aTagVals;
|
||||||
|
|
||||||
bool bFoundTbName = false;
|
bool bFoundTbName = false;
|
||||||
const char* pOrigSql = *ppSql;
|
const char* pOrigSql = *ppSql;
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
bool canParseTagsAfter = !pStbRowsCxt->pTagCond && !pStbRowsCxt->hasTimestampTag;
|
||||||
|
SToken tagTokens[TSDB_MAX_TAGS] = {0};
|
||||||
|
SSchema* tagSchemas[TSDB_MAX_TAGS] = {0};
|
||||||
|
int numOfTagTokens = 0;
|
||||||
|
|
||||||
for (int i = 0; i < pCols->numOfBound && code == TSDB_CODE_SUCCESS; ++i) {
|
for (int i = 0; i < pCols->numOfBound && code == TSDB_CODE_SUCCESS; ++i) {
|
||||||
const char* pTmpSql = *ppSql;
|
const char* pTmpSql = *ppSql;
|
||||||
bool ignoreComma = false;
|
bool ignoreComma = false;
|
||||||
|
@ -1608,25 +1641,20 @@ static int32_t getStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pS
|
||||||
} else if (pCols->pColIndex[i] < getTbnameSchemaIndex(pStbRowsCxt->pStbMeta)) {
|
} else if (pCols->pColIndex[i] < getTbnameSchemaIndex(pStbRowsCxt->pStbMeta)) {
|
||||||
SSchema* pTagSchema = &pSchemas[pCols->pColIndex[i]];
|
SSchema* pTagSchema = &pSchemas[pCols->pColIndex[i]];
|
||||||
isJsonTag = pTagSchema->type == TSDB_DATA_TYPE_JSON;
|
isJsonTag = pTagSchema->type == TSDB_DATA_TYPE_JSON;
|
||||||
code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg);
|
if (canParseTagsAfter) {
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
tagTokens[numOfTagTokens] = *pToken;
|
||||||
code = parseTagValue(pCxt, pStmt, ppSql, pTagSchema, pToken, pTagName, pTagVals, &pStbRowsCxt->pTag);
|
tagSchemas[numOfTagTokens] = pTagSchema;
|
||||||
|
++numOfTagTokens;
|
||||||
|
} else {
|
||||||
|
code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg);
|
||||||
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
code = parseTagValue(pCxt, pStmt, ppSql, pTagSchema, pToken, pTagNames, pTagVals, &pStbRowsCxt->pTag);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (pCols->pColIndex[i] == getTbnameSchemaIndex(pStbRowsCxt->pStbMeta)) {
|
else if (pCols->pColIndex[i] == getTbnameSchemaIndex(pStbRowsCxt->pStbMeta)) {
|
||||||
SColVal tbnameVal = COL_VAL_NONE(-1, TSDB_DATA_TYPE_BINARY);
|
char ctbName[TSDB_TABLE_NAME_LEN];
|
||||||
tbnameVal.value.val = 0;
|
code = parseTbnameToken(pCxt, pStbRowsCxt, pToken, ctbName, &bFoundTbName);
|
||||||
code = parseValueToken(pCxt, ppSql, pToken, (SSchema*)tGetTbnameColumnSchema(),
|
|
||||||
getTableInfo(pStbRowsCxt->pStbMeta).precision, &tbnameVal);
|
|
||||||
if (code == TSDB_CODE_SUCCESS && COL_VAL_IS_VALUE(&tbnameVal) && tbnameVal.value.nData>0) {
|
|
||||||
tNameSetDbName(&pStbRowsCxt->ctbName, pStbRowsCxt->stbName.acctId, pStbRowsCxt->stbName.dbname, strlen(pStbRowsCxt->stbName.dbname));
|
|
||||||
char ctbName[TSDB_TABLE_NAME_LEN];
|
|
||||||
memcpy(ctbName, tbnameVal.value.pData, tbnameVal.value.nData);
|
|
||||||
ctbName[tbnameVal.value.nData] = '\0';
|
|
||||||
tNameAddTbName(&pStbRowsCxt->ctbName, ctbName, tbnameVal.value.nData);
|
|
||||||
bFoundTbName = true;
|
|
||||||
}
|
|
||||||
taosMemoryFreeClear(tbnameVal.value.pData);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code == TSDB_CODE_SUCCESS && i < pCols->numOfBound - 1) {
|
if (code == TSDB_CODE_SUCCESS && i < pCols->numOfBound - 1) {
|
||||||
|
@ -1639,9 +1667,37 @@ static int32_t getStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pS
|
||||||
if (!bFoundTbName) {
|
if (!bFoundTbName) {
|
||||||
code = buildSyntaxErrMsg(&pCxt->msg, "tbname value expected", pOrigSql);
|
code = buildSyntaxErrMsg(&pCxt->msg, "tbname value expected", pOrigSql);
|
||||||
}
|
}
|
||||||
if (code == TSDB_CODE_SUCCESS && !isJsonTag) {
|
|
||||||
code = tTagNew(pTagVals, 1, false, &pStbRowsCxt->pTag);
|
bool ctbFirst = true;
|
||||||
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
char ctbFName[TSDB_TABLE_FNAME_LEN];
|
||||||
|
tNameExtractFullName(&pStbRowsCxt->ctbName, ctbFName);
|
||||||
|
STableMeta** pCtbMeta = taosHashGet(pStmt->pSubTableHashObj, ctbFName, strlen(ctbFName));
|
||||||
|
ctbFirst = *pCtbFirst = (pCtbMeta == NULL);
|
||||||
|
if (!ctbFirst) {
|
||||||
|
pStbRowsCxt->pCtbMeta->uid = (*pCtbMeta)->uid;
|
||||||
|
pStbRowsCxt->pCtbMeta->vgId = (*pCtbMeta)->vgId;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
if (code == TSDB_CODE_SUCCESS && ctbFirst) {
|
||||||
|
for (int32_t i = 0; code == TSDB_CODE_SUCCESS && i < numOfTagTokens; ++i) {
|
||||||
|
SToken* pTagToken = tagTokens + i;
|
||||||
|
SSchema* pTagSchema = tagSchemas[i];
|
||||||
|
code = checkAndTrimValue(pTagToken, pCxt->tmpTokenBuf, &pCxt->msg);
|
||||||
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
code = parseTagValue(pCxt, pStmt, NULL, pTagSchema, pTagToken, pStbRowsCxt->aTagNames, pStbRowsCxt->aTagVals,
|
||||||
|
&pStbRowsCxt->pTag);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (code == TSDB_CODE_SUCCESS && !isJsonTag) {
|
||||||
|
code = tTagNew(pStbRowsCxt->aTagVals, 1, false, &pStbRowsCxt->pTag);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (code == TSDB_CODE_SUCCESS && pStbRowsCxt->pTagCond) {
|
||||||
|
code = checkSubtablePrivilege(pStbRowsCxt->aTagVals, pStbRowsCxt->aTagNames, &pStbRowsCxt->pTagCond);
|
||||||
|
}
|
||||||
|
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
*pGotRow = true;
|
*pGotRow = true;
|
||||||
}
|
}
|
||||||
|
@ -1650,9 +1706,6 @@ static int32_t getStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pS
|
||||||
|
|
||||||
static int32_t processCtbAutoCreationAndCtbMeta(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SStbRowsDataContext* pStbRowsCxt) {
|
static int32_t processCtbAutoCreationAndCtbMeta(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SStbRowsDataContext* pStbRowsCxt) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
if (pStbRowsCxt->pTagCond) {
|
|
||||||
code = checkSubtablePrivilege(pStbRowsCxt->aTagVals, pStbRowsCxt->aTagNames, &pStbRowsCxt->pTagCond);
|
|
||||||
}
|
|
||||||
|
|
||||||
pStbRowsCxt->pCreateCtbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
|
pStbRowsCxt->pCreateCtbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
|
||||||
if (pStbRowsCxt->pCreateCtbReq == NULL) {
|
if (pStbRowsCxt->pCreateCtbReq == NULL) {
|
||||||
|
@ -1666,28 +1719,24 @@ static int32_t processCtbAutoCreationAndCtbMeta(SInsertParseContext* pCxt, SVnod
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
collectUseTable(&pStbRowsCxt->ctbName, pStmt->pTableNameHashObj);
|
|
||||||
|
|
||||||
char ctbFName[TSDB_TABLE_FNAME_LEN];
|
char ctbFName[TSDB_TABLE_FNAME_LEN];
|
||||||
tNameExtractFullName(&pStbRowsCxt->ctbName, ctbFName);
|
tNameExtractFullName(&pStbRowsCxt->ctbName, ctbFName);
|
||||||
STableMeta** pCtbMeta = taosHashGet(pStmt->pSubTableHashObj, ctbFName, strlen(ctbFName));
|
SVgroupInfo vg;
|
||||||
if (NULL != pCtbMeta) {
|
SRequestConnInfo conn = {.pTrans = pCxt->pComCxt->pTransporter,
|
||||||
pStbRowsCxt->pCtbMeta->uid = (*pCtbMeta)->uid;
|
.requestId = pCxt->pComCxt->requestId,
|
||||||
pStbRowsCxt->pCtbMeta->vgId = (*pCtbMeta)->vgId;
|
.requestObjRefId = pCxt->pComCxt->requestRid,
|
||||||
} else {
|
.mgmtEps = pCxt->pComCxt->mgmtEpSet};
|
||||||
SVgroupInfo vg;
|
code = catalogGetTableHashVgroup(pCxt->pComCxt->pCatalog, &conn, &pStmt->targetTableName, &vg);
|
||||||
SRequestConnInfo conn = {.pTrans = pCxt->pComCxt->pTransporter,
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
.requestId = pCxt->pComCxt->requestId,
|
|
||||||
.requestObjRefId = pCxt->pComCxt->requestRid,
|
|
||||||
.mgmtEps = pCxt->pComCxt->mgmtEpSet};
|
|
||||||
code = catalogGetTableHashVgroup(pCxt->pComCxt->pCatalog, &conn, &pStmt->targetTableName, &vg);
|
|
||||||
taosHashPut(pStmt->pVgroupsHashObj, (const char*)(&vg.vgId), sizeof(vg.vgId), &vg, sizeof(vg));
|
taosHashPut(pStmt->pVgroupsHashObj, (const char*)(&vg.vgId), sizeof(vg.vgId), &vg, sizeof(vg));
|
||||||
pStbRowsCxt->pCtbMeta->uid = taosHashGetSize(pStmt->pSubTableHashObj) + 1;
|
pStbRowsCxt->pCtbMeta->uid = taosHashGetSize(pStmt->pSubTableHashObj) + 1;
|
||||||
pStbRowsCxt->pCtbMeta->vgId = vg.vgId;
|
pStbRowsCxt->pCtbMeta->vgId = vg.vgId;
|
||||||
|
|
||||||
STableMeta* pBackup = NULL;
|
STableMeta* pBackup = NULL;
|
||||||
cloneTableMeta(pStmt->pTableMeta, &pBackup);
|
cloneTableMeta(pStbRowsCxt->pCtbMeta, &pBackup);
|
||||||
taosHashPut(pStmt->pSubTableHashObj, ctbFName, strlen(ctbFName), &pBackup, POINTER_BYTES);
|
taosHashPut(pStmt->pSubTableHashObj, ctbFName, strlen(ctbFName), &pBackup, POINTER_BYTES);
|
||||||
}
|
}
|
||||||
|
collectUseTable(&pStbRowsCxt->ctbName, pStmt->pTableNameHashObj);
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1725,12 +1774,13 @@ static void clearStbRowsDataContext(SStbRowsDataContext* pStbRowsCxt) {
|
||||||
static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql,
|
static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql,
|
||||||
SStbRowsDataContext* pStbRowsCxt, bool* pGotRow, SToken* pToken) {
|
SStbRowsDataContext* pStbRowsCxt, bool* pGotRow, SToken* pToken) {
|
||||||
resetStbRowsDataContextPreStbRow(pStbRowsCxt);
|
resetStbRowsDataContextPreStbRow(pStbRowsCxt);
|
||||||
int32_t code = getStbRowValues(pCxt, pStmt, ppSql, pStbRowsCxt, pGotRow, pToken);
|
bool bFirstTable = false;
|
||||||
|
int32_t code = getStbRowValues(pCxt, pStmt, ppSql, pStbRowsCxt, pGotRow, pToken, &bFirstTable);
|
||||||
if (code != TSDB_CODE_SUCCESS || !*pGotRow) {
|
if (code != TSDB_CODE_SUCCESS || !*pGotRow) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS && bFirstTable) {
|
||||||
code = processCtbAutoCreationAndCtbMeta(pCxt, pStmt, pStbRowsCxt);
|
code = processCtbAutoCreationAndCtbMeta(pCxt, pStmt, pStbRowsCxt);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2053,6 +2103,14 @@ static int32_t parseInsertStbClauseBottom(SInsertParseContext* pCxt, SVnodeModif
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
code = parseBoundColumns(pCxt, &pStmt->pBoundCols, BOUND_ALL_AND_TBNAME, pStmt->pTableMeta,
|
code = parseBoundColumns(pCxt, &pStmt->pBoundCols, BOUND_ALL_AND_TBNAME, pStmt->pTableMeta,
|
||||||
&pStbRowsCxt->boundColsInfo);
|
&pStbRowsCxt->boundColsInfo);
|
||||||
|
pStbRowsCxt->hasTimestampTag = false;
|
||||||
|
for (int32_t i = 0; i < pStbRowsCxt->boundColsInfo.numOfBound; ++i) {
|
||||||
|
int16_t schemaIndex = pStbRowsCxt->boundColsInfo.pColIndex[i];
|
||||||
|
if (schemaIndex != getTbnameSchemaIndex(pStmt->pTableMeta) &&
|
||||||
|
schemaIndex >= getNumOfColumns(pStmt->pTableMeta) && pStmt->pTableMeta->schema[schemaIndex].type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||||
|
pStbRowsCxt->hasTimestampTag = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
pStmt->pStbRowsCxt = pStbRowsCxt;
|
pStmt->pStbRowsCxt = pStbRowsCxt;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue