feat: support batch loading of csv files

This commit is contained in:
Xiaoyu Wang 2022-10-11 17:28:58 +08:00
parent 18fa48ae57
commit 0563eb2475
3 changed files with 134 additions and 117 deletions

View File

@ -922,7 +922,7 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
pRequest->code = code1;
}
if (pRequest->code == TSDB_CODE_SUCCESS && pWrapper->pParseCtx->needMultiParse) {
if (pRequest->code == TSDB_CODE_SUCCESS && NULL != pWrapper->pParseCtx && pWrapper->pParseCtx->needMultiParse) {
code = continueInsertFromCsv(pWrapper, pRequest);
if (TSDB_CODE_SUCCESS == code) {
return;

View File

@ -314,7 +314,8 @@ static int32_t getBytes(uint8_t type, int32_t length){
}
}
static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols, SArray* results, int32_t numOfCols, bool isTag) {
static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols,
SArray *results, int32_t numOfCols, bool isTag) {
for (int j = 0; j < taosArrayGetSize(cols); ++j) {
SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, j);
ESchemaAction action = SCHEMA_ACTION_NULL;
@ -338,9 +339,8 @@ static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashO
// static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SSmlSTableMeta *sTableData,
// int32_t colVer, int32_t tagVer, int8_t source, uint64_t suid){
static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray* pColumns, SArray* pTags,
STableMeta *pTableMeta, ESchemaAction action){
static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns, SArray *pTags, STableMeta *pTableMeta,
ESchemaAction action) {
SRequestObj *pRequest = NULL;
SMCreateStbReq pReq = {0};
int32_t code = TSDB_CODE_SUCCESS;
@ -463,8 +463,8 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
}
info->cost.numOfCreateSTables++;
} else if (code == TSDB_CODE_SUCCESS) {
hashTmp = taosHashInit(pTableMeta->tableInfo.numOfTags,
taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
hashTmp = taosHashInit(pTableMeta->tableInfo.numOfTags, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true,
HASH_NO_LOCK);
for (uint16_t i = pTableMeta->tableInfo.numOfColumns;
i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
@ -476,8 +476,10 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
goto end;
}
if (action != SCHEMA_ACTION_NULL) {
SArray* pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
SArray* pTags = taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField));
SArray *pColumns =
taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
SArray *pTags =
taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField));
for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
SField field = {0};
@ -490,7 +492,8 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
taosArrayPush(pTags, &field);
}
}
smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->tags, pTags, pTableMeta->tableInfo.numOfColumns, true);
smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->tags, pTags,
pTableMeta->tableInfo.numOfColumns, true);
code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action);
if (code != TSDB_CODE_SUCCESS) {
@ -519,8 +522,10 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
goto end;
}
if (action != SCHEMA_ACTION_NULL) {
SArray* pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
SArray* pTags = taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField));
SArray *pColumns =
taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
SArray *pTags =
taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField));
for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
SField field = {0};
@ -534,7 +539,8 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
}
}
smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->cols, pColumns, pTableMeta->tableInfo.numOfColumns, false);
smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->cols, pColumns,
pTableMeta->tableInfo.numOfColumns, false);
code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action);
if (code != TSDB_CODE_SUCCESS) {
@ -1830,7 +1836,8 @@ static int32_t smlConvertJSONString(SSmlKv *pVal, char *typeStr, cJSON *value) {
if (pVal->type == TSDB_DATA_TYPE_BINARY && pVal->length > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) {
return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
}
if (pVal->type == TSDB_DATA_TYPE_NCHAR && pVal->length > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE){
if (pVal->type == TSDB_DATA_TYPE_NCHAR &&
pVal->length > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
}
@ -2314,7 +2321,8 @@ static int32_t smlInsertData(SSmlHandle *info) {
(*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid
code = smlBindData(info->exec, tableData->tags, (*pMeta)->cols, tableData->cols, info->dataFormat,
(*pMeta)->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen, info->msgBuf.buf, info->msgBuf.len);
(*pMeta)->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen,
info->msgBuf.buf, info->msgBuf.len);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " smlBindData failed", info->id);
return code;
@ -2336,7 +2344,12 @@ static int32_t smlInsertData(SSmlHandle *info) {
SAppClusterSummary *pActivity = &info->taos->pAppInfo->summary;
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);
launchAsyncQuery(info->pRequest, info->pQuery, NULL, NULL);
SSqlCallbackWrapper *pWrapper = (SSqlCallbackWrapper *)taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper));
if (pWrapper == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pWrapper->pRequest = info->pRequest;
launchAsyncQuery(info->pRequest, info->pQuery, NULL, pWrapper);
return TSDB_CODE_SUCCESS;
}

View File

@ -215,6 +215,10 @@ int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCata
}
void qDestroyParseContext(SParseContext* pCxt) {
if (NULL == pCxt) {
return;
}
taosArrayDestroy(pCxt->pTableMetaPos);
taosArrayDestroy(pCxt->pTableVgroupPos);
taosMemoryFree(pCxt);