[td-719]
This commit is contained in:
parent
5d87d141eb
commit
afcc90a4b4
|
@ -171,11 +171,7 @@ typedef struct STableDataBlocks {
|
||||||
* to avoid it to be removed from cache
|
* to avoid it to be removed from cache
|
||||||
*/
|
*/
|
||||||
STableMeta *pTableMeta;
|
STableMeta *pTableMeta;
|
||||||
|
char *pData;
|
||||||
union {
|
|
||||||
char *filename;
|
|
||||||
char *pData;
|
|
||||||
};
|
|
||||||
|
|
||||||
// for parameter ('?') binding
|
// for parameter ('?') binding
|
||||||
uint32_t numOfAllocedParams;
|
uint32_t numOfAllocedParams;
|
||||||
|
@ -398,7 +394,7 @@ void waitForQueryRsp(void *param, TAOS_RES *tres, int code) ;
|
||||||
int doAsyncParseSql(SSqlObj* pSql);
|
int doAsyncParseSql(SSqlObj* pSql);
|
||||||
void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, void (*fp)(), void *param, const char *sqlstr, size_t sqlLen);
|
void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, void (*fp)(), void *param, const char *sqlstr, size_t sqlLen);
|
||||||
|
|
||||||
void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql);
|
void tscProcessMultiVnodesImportFromFile(SSqlObj *pSql);
|
||||||
void tscKillSTableQuery(SSqlObj *pSql);
|
void tscKillSTableQuery(SSqlObj *pSql);
|
||||||
void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen);
|
void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen);
|
||||||
bool tscIsUpdateQuery(SSqlObj* pSql);
|
bool tscIsUpdateQuery(SSqlObj* pSql);
|
||||||
|
|
|
@ -1153,29 +1153,19 @@ int tsParseInsertSql(SSqlObj *pSql) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
char fname[PATH_MAX] = {0};
|
strncpy(pCmd->payload, sToken.z, sToken.n);
|
||||||
strncpy(fname, sToken.z, sToken.n);
|
strdequote(pCmd->payload);
|
||||||
strdequote(fname);
|
|
||||||
|
|
||||||
|
// todo refactor extract method
|
||||||
wordexp_t full_path;
|
wordexp_t full_path;
|
||||||
if (wordexp(fname, &full_path, 0) != 0) {
|
if (wordexp(pCmd->payload, &full_path, 0) != 0) {
|
||||||
code = tscInvalidSQLErrMsg(pCmd->payload, "invalid filename", sToken.z);
|
code = tscInvalidSQLErrMsg(pCmd->payload, "invalid filename", sToken.z);
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
strcpy(fname, full_path.we_wordv[0]);
|
|
||||||
|
tstrncpy(pCmd->payload, full_path.we_wordv[0], pCmd->allocSize);
|
||||||
wordfree(&full_path);
|
wordfree(&full_path);
|
||||||
|
|
||||||
STableDataBlocks *pDataBlock = NULL;
|
|
||||||
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
|
|
||||||
|
|
||||||
int32_t ret = tscCreateDataBlock(PATH_MAX, tinfo.rowSize, sizeof(SSubmitBlk), pTableMetaInfo->name,
|
|
||||||
pTableMeta, &pDataBlock);
|
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
|
||||||
goto _error;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosArrayPush(pCmd->pDataBlocks, &pDataBlock);
|
|
||||||
strcpy(pDataBlock->filename, fname);
|
|
||||||
} else if (sToken.type == TK_LP) {
|
} else if (sToken.type == TK_LP) {
|
||||||
/* insert into tablename(col1, col2,..., coln) values(v1, v2,... vn); */
|
/* insert into tablename(col1, col2,..., coln) values(v1, v2,... vn); */
|
||||||
STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0)->pTableMeta;
|
STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0)->pTableMeta;
|
||||||
|
@ -1424,10 +1414,18 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) {
|
||||||
int32_t count = 0;
|
int32_t count = 0;
|
||||||
int32_t maxRows = 0;
|
int32_t maxRows = 0;
|
||||||
|
|
||||||
STableDataBlocks *pTableDataBlock = taosArrayGetP(pSql->cmd.pDataBlocks, 0);
|
tscDestroyBlockArrayList(pSql->cmd.pDataBlocks);
|
||||||
pTableDataBlock->size = pTableDataBlock->headerSize;
|
pCmd->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
|
||||||
|
|
||||||
|
STableDataBlocks *pTableDataBlock = NULL;
|
||||||
|
int32_t ret = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, tinfo.rowSize, sizeof(SSubmitBlk), pTableMetaInfo->name, pTableMeta, &pTableDataBlock);
|
||||||
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
|
// return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayPush(pCmd->pDataBlocks, &pTableDataBlock);
|
||||||
tscAllocateMemIfNeed(pTableDataBlock, tinfo.rowSize, &maxRows);
|
tscAllocateMemIfNeed(pTableDataBlock, tinfo.rowSize, &maxRows);
|
||||||
|
|
||||||
char *tokenBuf = calloc(1, 4096);
|
char *tokenBuf = calloc(1, 4096);
|
||||||
|
|
||||||
while ((readLen = getline(&line, &n, fp)) != -1) {
|
while ((readLen = getline(&line, &n, fp)) != -1) {
|
||||||
|
@ -1559,52 +1557,33 @@ static UNUSED_FUNC int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpT
|
||||||
return numOfRows;
|
return numOfRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql) {
|
void tscProcessMultiVnodesImportFromFile(SSqlObj *pSql) {
|
||||||
SSqlCmd *pCmd = &pSql->cmd;
|
SSqlCmd *pCmd = &pSql->cmd;
|
||||||
if (pCmd->command != TSDB_SQL_INSERT) {
|
if (pCmd->command != TSDB_SQL_INSERT) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
assert(pCmd->dataSourceType == DATA_FROM_DATA_FILE && strlen(pCmd->payload) != 0);
|
||||||
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
|
||||||
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
|
|
||||||
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
|
|
||||||
|
|
||||||
assert(pCmd->dataSourceType == DATA_FROM_DATA_FILE/* && pCmd->pDataBlocks != NULL*/);
|
|
||||||
SArray *pDataBlockList = pCmd->pDataBlocks;
|
|
||||||
STableDataBlocks* pDataBlock = taosArrayGetP(pDataBlockList, 0);
|
|
||||||
|
|
||||||
char path[PATH_MAX] = {0};
|
|
||||||
|
|
||||||
SImportFileSupport* pSupporter = calloc(1, sizeof(SImportFileSupport));
|
|
||||||
pSupporter->pSql = pSql;
|
|
||||||
|
|
||||||
|
SImportFileSupport *pSupporter = calloc(1, sizeof(SImportFileSupport));
|
||||||
SSqlObj *pNew = createSubqueryObj(pSql, 0, parseFileSendDataBlock, pSupporter, TSDB_SQL_INSERT, NULL);
|
SSqlObj *pNew = createSubqueryObj(pSql, 0, parseFileSendDataBlock, pSupporter, TSDB_SQL_INSERT, NULL);
|
||||||
|
|
||||||
pNew->cmd.pDataBlocks = taosArrayInit(4, POINTER_BYTES);
|
pNew->cmd.pDataBlocks = taosArrayInit(4, POINTER_BYTES);
|
||||||
|
pCmd->count = 1;
|
||||||
|
|
||||||
STableDataBlocks *pTableDataBlock = NULL;
|
FILE *fp = fopen(pCmd->payload, "r");
|
||||||
int32_t ret = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, tinfo.rowSize, sizeof(SSubmitBlk), pTableMetaInfo->name, pTableMeta, &pTableDataBlock);
|
if (fp == NULL) {
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
pSql->res.code = TAOS_SYSTEM_ERROR(errno);
|
||||||
// return ret;
|
tscError("%p failed to open file %s to load data from file, code:%s", pSql, pCmd->payload, tstrerror(pSql->res.code));
|
||||||
|
|
||||||
|
tfree(pSupporter)
|
||||||
|
tscQueueAsyncRes(pSql);
|
||||||
|
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayPush(pNew->cmd.pDataBlocks, &pTableDataBlock);
|
pSupporter->pSql = pSql;
|
||||||
|
pSupporter->fp = fp;
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_PAYLOAD_SIZE)) {
|
parseFileSendDataBlock(pSupporter, pNew, 0);
|
||||||
tscError("%p failed to malloc when insert file", pSql);
|
|
||||||
}
|
|
||||||
pCmd->count = 1;
|
|
||||||
|
|
||||||
tstrncpy(path, pDataBlock->filename, sizeof(path));
|
|
||||||
|
|
||||||
FILE *fp = fopen(path, "r");
|
|
||||||
if (fp == NULL) {
|
|
||||||
tscError("%p failed to open file %s to load data from file, reason:%s", pSql, path, strerror(errno));
|
|
||||||
// continue;// todo handle error
|
|
||||||
}
|
|
||||||
|
|
||||||
pSupporter->fp = fp;
|
|
||||||
|
|
||||||
parseFileSendDataBlock(pSupporter, pNew, 0);
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1880,7 +1880,7 @@ void tscDoQuery(SSqlObj* pSql) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pCmd->dataSourceType == DATA_FROM_DATA_FILE) {
|
if (pCmd->dataSourceType == DATA_FROM_DATA_FILE) {
|
||||||
tscProcessMultiVnodesInsertFromFile(pSql);
|
tscProcessMultiVnodesImportFromFile(pSql);
|
||||||
} else {
|
} else {
|
||||||
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
||||||
uint16_t type = pQueryInfo->type;
|
uint16_t type = pQueryInfo->type;
|
||||||
|
|
|
@ -245,7 +245,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
|
||||||
#define TSDB_DEFAULT_PKT_SIZE 65480 //same as RPC_MAX_UDP_SIZE
|
#define TSDB_DEFAULT_PKT_SIZE 65480 //same as RPC_MAX_UDP_SIZE
|
||||||
|
|
||||||
#define TSDB_PAYLOAD_SIZE TSDB_DEFAULT_PKT_SIZE
|
#define TSDB_PAYLOAD_SIZE TSDB_DEFAULT_PKT_SIZE
|
||||||
#define TSDB_DEFAULT_PAYLOAD_SIZE 2048 // default payload size
|
#define TSDB_DEFAULT_PAYLOAD_SIZE 5120 // default payload size, greater than PATH_MAX value
|
||||||
#define TSDB_EXTRA_PAYLOAD_SIZE 128 // extra bytes for auth
|
#define TSDB_EXTRA_PAYLOAD_SIZE 128 // extra bytes for auth
|
||||||
#define TSDB_CQ_SQL_SIZE 1024
|
#define TSDB_CQ_SQL_SIZE 1024
|
||||||
#define TSDB_MAX_VNODES 256
|
#define TSDB_MAX_VNODES 256
|
||||||
|
|
Loading…
Reference in New Issue