[TD-98]
This commit is contained in:
parent
aa9abd4dad
commit
b9ebb7d33d
|
@ -1291,7 +1291,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
|
||||||
return doParseInsertSql(pSql, pSql->sqlstr + index);
|
return doParseInsertSql(pSql, pSql->sqlstr + index);
|
||||||
}
|
}
|
||||||
|
|
||||||
int tsParseSql(SSqlObj *pSql, bool initalParse) {
|
int tsParseSql(SSqlObj *pSql, bool initialParse) {
|
||||||
int32_t ret = TSDB_CODE_SUCCESS;
|
int32_t ret = TSDB_CODE_SUCCESS;
|
||||||
tscTrace("continue parse sql: %s", pSql->asyncTblPos);
|
tscTrace("continue parse sql: %s", pSql->asyncTblPos);
|
||||||
|
|
||||||
|
@ -1301,10 +1301,11 @@ int tsParseSql(SSqlObj *pSql, bool initalParse) {
|
||||||
* Set the fp before parse the sql string, in case of getmetermeta failed, in which
|
* Set the fp before parse the sql string, in case of getmetermeta failed, in which
|
||||||
* the error handle callback function can rightfully restore the user defined function (fp)
|
* the error handle callback function can rightfully restore the user defined function (fp)
|
||||||
*/
|
*/
|
||||||
if (pSql->fp != NULL && initalParse) {
|
if (initialParse) {
|
||||||
pSql->fetchFp = pSql->fp;
|
tscFreeSqlCmdData(&pSql->cmd);
|
||||||
|
|
||||||
// replace user defined callback function with multi-insert proxy function
|
// replace user defined callback function with multi-insert proxy function
|
||||||
|
pSql->fetchFp = pSql->fp;
|
||||||
pSql->fp = (void(*)())tscHandleMultivnodeInsert;
|
pSql->fp = (void(*)())tscHandleMultivnodeInsert;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1315,7 +1316,7 @@ int tsParseSql(SSqlObj *pSql, bool initalParse) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (initalParse) {
|
if (initialParse) {
|
||||||
tscFreeSqlCmdData(&pSql->cmd);
|
tscFreeSqlCmdData(&pSql->cmd);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1449,57 +1450,6 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
|
||||||
return numOfRows;
|
return numOfRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* multi-vnodes insertion in sync query model
|
|
||||||
*
|
|
||||||
* modify history
|
|
||||||
* 2019.05.10 lihui
|
|
||||||
* Remove the code for importing records from files
|
|
||||||
*/
|
|
||||||
void tscProcessMultiVnodesInsert(SSqlObj *pSql) {
|
|
||||||
SSqlCmd *pCmd = &pSql->cmd;
|
|
||||||
|
|
||||||
// not insert/import, return directly
|
|
||||||
if (pCmd->command != TSDB_SQL_INSERT) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// SSqlCmd may have been released
|
|
||||||
if (pCmd->pDataBlocks == NULL) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
STableDataBlocks *pDataBlock = NULL;
|
|
||||||
STableMetaInfo * pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
|
|
||||||
assert(pCmd->numOfClause == 1);
|
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
|
||||||
|
|
||||||
/* the first block has been sent to server in processSQL function */
|
|
||||||
assert(pTableMetaInfo->vnodeIndex >= 1 && pCmd->pDataBlocks != NULL);
|
|
||||||
|
|
||||||
if (pTableMetaInfo->vnodeIndex < pCmd->pDataBlocks->nSize) {
|
|
||||||
SDataBlockList *pDataBlocks = pCmd->pDataBlocks;
|
|
||||||
|
|
||||||
for (int32_t i = pTableMetaInfo->vnodeIndex; i < pDataBlocks->nSize; ++i) {
|
|
||||||
pDataBlock = pDataBlocks->pData[i];
|
|
||||||
if (pDataBlock == NULL) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ((code = tscCopyDataBlockToPayload(pSql, pDataBlock)) != TSDB_CODE_SUCCESS) {
|
|
||||||
tscTrace("%p build submit data block failed, vnodeIdx:%d, total:%d", pSql, pTableMetaInfo->vnodeIndex,
|
|
||||||
pDataBlocks->nSize);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
tscProcessSql(pSql);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// all data have been submit to vnode, release data blocks
|
|
||||||
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
|
|
||||||
}
|
|
||||||
|
|
||||||
// multi-vnodes insertion in sync query model
|
// multi-vnodes insertion in sync query model
|
||||||
void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql) {
|
void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql) {
|
||||||
SSqlCmd *pCmd = &pSql->cmd;
|
SSqlCmd *pCmd = &pSql->cmd;
|
||||||
|
|
|
@ -297,11 +297,6 @@ int taos_query(TAOS *taos, const char *sqlstr) {
|
||||||
|
|
||||||
// wait for the callback function to post the semaphore
|
// wait for the callback function to post the semaphore
|
||||||
sem_wait(&pSql->rspSem);
|
sem_wait(&pSql->rspSem);
|
||||||
|
|
||||||
if (pSql->res.code != TSDB_CODE_SUCCESS) {
|
|
||||||
tscFreeSqlCmdData(&pSql->cmd);
|
|
||||||
}
|
|
||||||
|
|
||||||
return pSql->res.code;
|
return pSql->res.code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue