diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 3039f93a30..25d15ab11e 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -69,6 +69,7 @@ for (int i = 1; i < keyLen; ++i) { \ #define NCHAR_ADD_LEN 3 // L"nchar" 3 means L" " #define MAX_RETRY_TIMES 5 +#define LINE_BATCH 20 //================================================================================================= typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType; @@ -164,7 +165,7 @@ typedef struct{ typedef struct { int64_t id; - Params params; + Params *params; bool isLast; SMLProtocolType protocol; @@ -2311,29 +2312,24 @@ static int32_t isSchemalessDb(STscObj *taos, SCatalog *catalog){ } static void smlInsertCallback(void* param, void* res, int32_t code) { - if (code != TSDB_CODE_SUCCESS) { - uError("failed to execute, reason:%s\n", taos_errstr(res)); - } SRequestObj *pRequest = (SRequestObj *)res; - int32_t rows = taos_affected_rows(pRequest); SSmlHandle* info = (SSmlHandle *)param; // lock - taosThreadSpinLock(&info->params.lock); - info->params.request->body.resInfo.numOfRows += rows; if(code != TSDB_CODE_SUCCESS){ - info->params.request->code = code; + taosThreadSpinLock(&info->params->lock); + info->params->request->code = code; + taosThreadSpinUnlock(&info->params->lock); } - taosThreadSpinUnlock(&info->params.lock); // unlock - printf("SML:0x%"PRIx64" insert finished, code: %d, total: %d, insert: %d\n", info->id, code, info->affectedRows, rows); - Params pParam = info->params; + printf("SML:0x%"PRIx64" insert finished, code: %d, total: %d\n", info->id, code, info->affectedRows); + Params *pParam = info->params; bool isLast = info->isLast; smlDestroyInfo(info); if(isLast){ - tsem_post(&pParam.sem); + tsem_post(&pParam->sem); } } @@ -2366,8 +2362,9 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr } ((STscObj *)taos)->schemalessType = 1; - SSmlMsgBuf msg = {.buf = request->msgBuf, .len = ERROR_MSG_BUF_DEFAULT_SIZE}; + SSmlMsgBuf msg = {.len = ERROR_MSG_BUF_DEFAULT_SIZE, .buf = request->msgBuf}; + int cnt = ceil(((double)numLines)/LINE_BATCH); Params params = {.request = request}; tsem_init(¶ms.sem, 0, 0); taosThreadSpinInit(&(params.lock), 0); @@ -2385,7 +2382,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr goto end; } - if(isSchemalessDb(taos, params.catalog) != TSDB_CODE_SUCCESS){ + if(isSchemalessDb(((STscObj *)taos), params.catalog) != TSDB_CODE_SUCCESS){ request->code = TSDB_CODE_SML_INVALID_DB_CONF; smlBuildInvalidDataMsg(&msg, "Cannot write data to a non schemaless database", NULL); goto end; @@ -2409,8 +2406,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr goto end; } - int32_t perBatch = 20000; - for (int i = 0; i < ceil(((double)numLines)/perBatch); ++i) { + for (int i = 0; i < cnt; ++i) { SRequestObj* req = (SRequestObj*)createRequest((STscObj *)taos, TSDB_SQL_INSERT); if(!req){ request->code = TSDB_CODE_OUT_OF_MEMORY; @@ -2424,7 +2420,9 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr goto end; } - if(numLines >= perBatch){ + int32_t perBatch = LINE_BATCH; + + if(numLines > perBatch){ numLines -= perBatch; info->isLast = false; }else{ @@ -2433,7 +2431,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr info->isLast = true; } - info->params = params; + info->params = ¶ms; info->pCatalog = params.catalog; info->affectedRows = perBatch; info->pRequest->body.queryFp = smlInsertCallback; diff --git a/source/client/test/smlTest.cpp b/source/client/test/smlTest.cpp index 8137583978..25bf13a113 100644 --- a/source/client/test/smlTest.cpp +++ b/source/client/test/smlTest.cpp @@ -1325,7 +1325,7 @@ TEST(testCase, sml_oom_Test) { pRes = taos_query(taos, "use oom"); taos_free_result(pRes); - TAOS_RES* res = taos_schemaless_insert(taos, (char**)sql, 100, TSDB_SML_LINE_PROTOCOL, 0); + TAOS_RES* res = taos_schemaless_insert(taos, (char**)sql, sizeof(sql)/sizeof(sql[0]), TSDB_SML_LINE_PROTOCOL, 0); ASSERT_EQ(taos_errno(res), 0); taos_free_result(pRes); }