[TBASE-1360]
This commit is contained in:
parent
3f19b39492
commit
14349874f4
|
@ -386,6 +386,8 @@ typedef struct _sql_obj {
|
||||||
SSqlRes res;
|
SSqlRes res;
|
||||||
|
|
||||||
char numOfSubs;
|
char numOfSubs;
|
||||||
|
char* asyncTblPos;
|
||||||
|
void* pTableHashList;
|
||||||
struct _sql_obj **pSubs;
|
struct _sql_obj **pSubs;
|
||||||
struct _sql_obj * prev, *next;
|
struct _sql_obj * prev, *next;
|
||||||
} SSqlObj;
|
} SSqlObj;
|
||||||
|
|
|
@ -840,9 +840,17 @@ int doParserInsertSql(SSqlObj *pSql, char *str) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *pTableHashList = taosInitIntHash(128, POINTER_BYTES, taosHashInt);
|
if ((NULL == pSql->asyncTblPos) && (NULL == pSql->pTableHashList)) {
|
||||||
|
pSql->pTableHashList = taosInitIntHash(128, POINTER_BYTES, taosHashInt);
|
||||||
pSql->cmd.pDataBlocks = tscCreateBlockArrayList();
|
pSql->cmd.pDataBlocks = tscCreateBlockArrayList();
|
||||||
|
if (NULL == pSql->pTableHashList || NULL == pSql->cmd.pDataBlocks) {
|
||||||
|
code = TSDB_CODE_CLI_OUT_OF_MEMORY;
|
||||||
|
goto _error_clean;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
str = pSql->asyncTblPos;
|
||||||
|
}
|
||||||
|
|
||||||
tscTrace("%p create data block list for submit data, %p", pSql, pSql->cmd.pDataBlocks);
|
tscTrace("%p create data block list for submit data, %p", pSql, pSql->cmd.pDataBlocks);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -861,6 +869,8 @@ int doParserInsertSql(SSqlObj *pSql, char *str) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pSql->asyncTblPos = sToken.z;
|
||||||
|
|
||||||
// Check if the table name available or not
|
// Check if the table name available or not
|
||||||
if (validateTableName(sToken.z, sToken.n) != TSDB_CODE_SUCCESS) {
|
if (validateTableName(sToken.z, sToken.n) != TSDB_CODE_SUCCESS) {
|
||||||
code = tscInvalidSQLErrMsg(pCmd->payload, "table name invalid", sToken.z);
|
code = tscInvalidSQLErrMsg(pCmd->payload, "table name invalid", sToken.z);
|
||||||
|
@ -875,7 +885,8 @@ int doParserInsertSql(SSqlObj *pSql, char *str) {
|
||||||
void *fp = pSql->fp;
|
void *fp = pSql->fp;
|
||||||
if ((code = tscParseSqlForCreateTableOnDemand(&str, pSql)) != TSDB_CODE_SUCCESS) {
|
if ((code = tscParseSqlForCreateTableOnDemand(&str, pSql)) != TSDB_CODE_SUCCESS) {
|
||||||
if (fp != NULL) {
|
if (fp != NULL) {
|
||||||
goto _clean;
|
//goto _clean;
|
||||||
|
return code;
|
||||||
} else {
|
} else {
|
||||||
/*
|
/*
|
||||||
* for async insert, the free data block operations, which is tscDestroyBlockArrayList,
|
* for async insert, the free data block operations, which is tscDestroyBlockArrayList,
|
||||||
|
@ -918,7 +929,7 @@ int doParserInsertSql(SSqlObj *pSql, char *str) {
|
||||||
* app here insert data in different vnodes, so we need to set the following
|
* app here insert data in different vnodes, so we need to set the following
|
||||||
* data in another submit procedure using async insert routines
|
* data in another submit procedure using async insert routines
|
||||||
*/
|
*/
|
||||||
code = doParseInsertStatement(pSql, pTableHashList, &str, &spd, &totalNum);
|
code = doParseInsertStatement(pSql, pSql->pTableHashList, &str, &spd, &totalNum);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error_clean;
|
goto _error_clean;
|
||||||
}
|
}
|
||||||
|
@ -1033,7 +1044,7 @@ int doParserInsertSql(SSqlObj *pSql, char *str) {
|
||||||
goto _error_clean;
|
goto _error_clean;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = doParseInsertStatement(pSql, pTableHashList, &str, &spd, &totalNum);
|
code = doParseInsertStatement(pSql, pSql->pTableHashList, &str, &spd, &totalNum);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error_clean;
|
goto _error_clean;
|
||||||
}
|
}
|
||||||
|
@ -1073,7 +1084,8 @@ _error_clean:
|
||||||
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
|
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
|
||||||
|
|
||||||
_clean:
|
_clean:
|
||||||
taosCleanUpIntHash(pTableHashList);
|
taosCleanUpIntHash(pSql->pTableHashList);
|
||||||
|
pSql->pTableHashList = NULL;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1108,7 +1120,11 @@ int tsParseSql(SSqlObj *pSql, char *acct, char *db, bool multiVnodeInsertion) {
|
||||||
|
|
||||||
// must before clean the sqlcmd object
|
// must before clean the sqlcmd object
|
||||||
tscRemoveAllMeterMetaInfo(&pSql->cmd, false);
|
tscRemoveAllMeterMetaInfo(&pSql->cmd, false);
|
||||||
tscCleanSqlCmd(&pSql->cmd);
|
|
||||||
|
if (NULL == pSql->asyncTblPos) {
|
||||||
|
tscTrace("continue parse sql: %s", pSql->asyncTblPos);
|
||||||
|
tscCleanSqlCmd(&pSql->cmd);
|
||||||
|
}
|
||||||
|
|
||||||
if (tscIsInsertOrImportData(pSql->sqlstr)) {
|
if (tscIsInsertOrImportData(pSql->sqlstr)) {
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -3581,9 +3581,9 @@ int tscGetMeterMeta(SSqlObj *pSql, char *meterId, int32_t index) {
|
||||||
* for async insert operation, release data block buffer before issue new object to get metermeta
|
* for async insert operation, release data block buffer before issue new object to get metermeta
|
||||||
* because in metermeta callback function, the tscParse function will generate the submit data blocks
|
* because in metermeta callback function, the tscParse function will generate the submit data blocks
|
||||||
*/
|
*/
|
||||||
if (pSql->fp != NULL && pSql->pStream == NULL) {
|
//if (pSql->fp != NULL && pSql->pStream == NULL) {
|
||||||
tscFreeSqlCmdData(pCmd);
|
// tscFreeSqlCmdData(pCmd);
|
||||||
}
|
//}
|
||||||
|
|
||||||
return tscDoGetMeterMeta(pSql, meterId, index);
|
return tscDoGetMeterMeta(pSql, meterId, index);
|
||||||
}
|
}
|
||||||
|
|
|
@ -230,6 +230,12 @@ int taos_query_imp(STscObj* pObj, SSqlObj* pSql) {
|
||||||
|
|
||||||
pRes->numOfRows = 1;
|
pRes->numOfRows = 1;
|
||||||
pRes->numOfTotal = 0;
|
pRes->numOfTotal = 0;
|
||||||
|
pSql->asyncTblPos = NULL;
|
||||||
|
if (NULL != pSql->pTableHashList) {
|
||||||
|
taosCleanUpIntHash(pSql->pTableHashList);
|
||||||
|
pSql->pTableHashList = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
tscTrace("%p SQL: %s pObj:%p", pSql, pSql->sqlstr, pObj);
|
tscTrace("%p SQL: %s pObj:%p", pSql, pSql->sqlstr, pObj);
|
||||||
|
|
||||||
pRes->code = (uint8_t)tsParseSql(pSql, pObj->acctId, pObj->db, false);
|
pRes->code = (uint8_t)tsParseSql(pSql, pObj->acctId, pObj->db, false);
|
||||||
|
@ -940,6 +946,12 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
|
||||||
|
|
||||||
strtolower(pSql->sqlstr, sql);
|
strtolower(pSql->sqlstr, sql);
|
||||||
|
|
||||||
|
pSql->asyncTblPos = NULL;
|
||||||
|
if (NULL != pSql->pTableHashList) {
|
||||||
|
taosCleanUpIntHash(pSql->pTableHashList);
|
||||||
|
pSql->pTableHashList = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
pRes->code = (uint8_t)tsParseSql(pSql, pObj->acctId, pObj->db, false);
|
pRes->code = (uint8_t)tsParseSql(pSql, pObj->acctId, pObj->db, false);
|
||||||
int code = pRes->code;
|
int code = pRes->code;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue