enh: support config batch rows number when import data from csv file
This commit is contained in:
parent
09a99142cb
commit
5bedf6b19a
|
@ -122,7 +122,7 @@ extern bool tsUseAdapter;
|
|||
// client
|
||||
extern int32_t tsMinSlidingTime;
|
||||
extern int32_t tsMinIntervalTime;
|
||||
extern int32_t tsMaxMemUsedByInsert;
|
||||
extern int32_t tsMaxInsertBatchRows;
|
||||
|
||||
// build info
|
||||
extern char version[];
|
||||
|
|
|
@ -148,8 +148,8 @@ int32_t tsMaxNumOfDistinctResults = 1000 * 10000;
|
|||
// 1 database precision unit for interval time range, changed accordingly
|
||||
int32_t tsMinIntervalTime = 1;
|
||||
|
||||
// maximum memory allowed to be allocated for a single csv load (in MB)
|
||||
int32_t tsMaxMemUsedByInsert = 1024;
|
||||
// maximum batch rows numbers imported from a single csv load
|
||||
int32_t tsMaxInsertBatchRows = 1000000;
|
||||
|
||||
float tsSelectivityRatio = 1.0;
|
||||
int32_t tsTagFilterResCacheSize = 1024 * 10;
|
||||
|
@ -340,7 +340,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
|
|||
if (cfgAddString(pCfg, "smlTagName", tsSmlTagName, 1) != 0) return -1;
|
||||
// if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, 1) != 0) return -1;
|
||||
// if (cfgAddInt32(pCfg, "smlBatchSize", tsSmlBatchSize, 1, INT32_MAX, true) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "maxMemUsedByInsert", tsMaxMemUsedByInsert, 1, INT32_MAX, true) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "maxInsertBatchRows", tsMaxInsertBatchRows, 1, INT32_MAX, true) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "maxRetryWaitTime", tsMaxRetryWaitTime, 0, 86400000, 0) != 0) return -1;
|
||||
if (cfgAddBool(pCfg, "useAdapter", tsUseAdapter, true) != 0) return -1;
|
||||
if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, true) != 0) return -1;
|
||||
|
@ -725,7 +725,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
|
|||
// tsSmlDataFormat = cfgGetItem(pCfg, "smlDataFormat")->bval;
|
||||
|
||||
// tsSmlBatchSize = cfgGetItem(pCfg, "smlBatchSize")->i32;
|
||||
tsMaxMemUsedByInsert = cfgGetItem(pCfg, "maxMemUsedByInsert")->i32;
|
||||
tsMaxInsertBatchRows = cfgGetItem(pCfg, "maxInsertBatchRows")->i32;
|
||||
|
||||
tsShellActivityTimer = cfgGetItem(pCfg, "shellActivityTimer")->i32;
|
||||
tsCompressMsgSize = cfgGetItem(pCfg, "compressMsgSize")->i32;
|
||||
|
@ -987,7 +987,7 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
|
|||
} else if (strcasecmp("maxNumOfDistinctRes", name) == 0) {
|
||||
tsMaxNumOfDistinctResults = cfgGetItem(pCfg, "maxNumOfDistinctRes")->i32;
|
||||
} else if (strcasecmp("maxMemUsedByInsert", name) == 0) {
|
||||
tsMaxMemUsedByInsert = cfgGetItem(pCfg, "maxMemUsedByInsert")->i32;
|
||||
tsMaxInsertBatchRows = cfgGetItem(pCfg, "maxInsertBatchRows")->i32;
|
||||
} else if (strcasecmp("maxRetryWaitTime", name) == 0) {
|
||||
tsMaxRetryWaitTime = cfgGetItem(pCfg, "maxRetryWaitTime")->i32;
|
||||
}
|
||||
|
|
|
@ -1552,7 +1552,7 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt
|
|||
(*pNumOfRows)++;
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && (*pNumOfRows) > tsMaxMemUsedByInsert * 1024 * 1024) {
|
||||
if (TSDB_CODE_SUCCESS == code && (*pNumOfRows) > tsMaxInsertBatchRows) {
|
||||
pStmt->fileProcessing = true;
|
||||
break;
|
||||
}
|
||||
|
@ -1561,6 +1561,8 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt
|
|||
}
|
||||
taosMemoryFree(pLine);
|
||||
|
||||
parserDebug("0x%" PRIx64 " %d rows have been parsed", pCxt->pComCxt->requestId, *pNumOfRows);
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && 0 == (*pNumOfRows) &&
|
||||
(!TSDB_QUERY_HAS_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT)) && !pStmt->fileProcessing) {
|
||||
code = buildSyntaxErrMsg(&pCxt->msg, "no any data points", NULL);
|
||||
|
|
|
@ -272,6 +272,41 @@ static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreat
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t rebuildTableData(SSubmitTbData* pSrc, SSubmitTbData** pDst) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SSubmitTbData* pTmp = taosMemoryCalloc(1, sizeof(SSubmitTbData));
|
||||
if (NULL == pTmp) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
} else {
|
||||
pTmp->flags = pSrc->flags;
|
||||
pTmp->suid = pSrc->suid;
|
||||
pTmp->uid = pSrc->uid;
|
||||
pTmp->sver = pSrc->sver;
|
||||
pTmp->pCreateTbReq = pSrc->pCreateTbReq;
|
||||
if (pTmp->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
|
||||
pTmp->aCol = taosArrayInit(128, sizeof(SColData));
|
||||
if (NULL == pTmp->aCol) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosMemoryFree(pTmp);
|
||||
}
|
||||
} else {
|
||||
pTmp->aRowP = taosArrayInit(128, POINTER_BYTES);
|
||||
if (NULL == pTmp->aRowP) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosMemoryFree(pTmp);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
taosMemoryFree(pSrc);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pDst = pTmp;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
||||
static void resetColValues(SArray* pValues) {
|
||||
int32_t num = taosArrayGetSize(pValues);
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
|
@ -381,7 +416,7 @@ static int32_t fillVgroupDataCxt(STableDataCxt* pTableCxt, SVgroupDataCxt* pVgCx
|
|||
}
|
||||
}
|
||||
taosArrayPush(pVgCxt->pData->aSubmitTbData, pTableCxt->pData);
|
||||
taosMemoryFreeClear(pTableCxt->pData);
|
||||
rebuildTableData(pTableCxt->pData, &pTableCxt->pData);
|
||||
|
||||
qDebug("add tableDataCxt uid:%" PRId64 " to vgId:%d", pTableCxt->pMeta->uid, pVgCxt->vgId);
|
||||
|
||||
|
|
Loading…
Reference in New Issue