diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index f2493f6c57..bf060db5e2 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -151,13 +151,14 @@ typedef struct { typedef struct { SRequestObj *request; tsem_t sem; + int32_t cnt; + int32_t total; TdThreadSpinlock lock; } Params; typedef struct { int64_t id; Params *params; - bool isLast; SMLProtocolType protocol; int8_t precision; @@ -2449,28 +2450,26 @@ static void smlInsertCallback(void *param, void *res, int32_t code) { int32_t rows = taos_affected_rows(pRequest); uDebug("SML:0x%" PRIx64 " result. code:%d, msg:%s", info->id, pRequest->code, pRequest->msgBuf); - // lock - taosThreadSpinLock(&info->params->lock); - if (code != TSDB_CODE_SUCCESS) { - info->params->request->code = code; - info->params->request->body.resInfo.numOfRows += rows; - }else{ - info->params->request->body.resInfo.numOfRows += info->affectedRows; - } - taosThreadSpinUnlock(&info->params->lock); - // unlock - - uDebug("SML:0x%" PRIx64 " insert finished, code: %d, rows: %d, total: %d", info->id, code, rows, info->affectedRows); Params *pParam = info->params; - bool isLast = info->isLast; + // lock + taosThreadSpinLock(&pParam->lock); + pParam->cnt++; + if (code != TSDB_CODE_SUCCESS) { + pParam->request->code = code; + pParam->request->body.resInfo.numOfRows += rows; + }else{ + pParam->request->body.resInfo.numOfRows += info->affectedRows; + } + if (pParam->cnt == pParam->total) { + tsem_post(&pParam->sem); + } + taosThreadSpinUnlock(&pParam->lock); + // unlock + uDebug("SML:0x%" PRIx64 " insert finished, code: %d, rows: %d, total: %d", info->id, code, rows, info->affectedRows); info->cost.endTime = taosGetTimestampUs(); info->cost.code = code; smlPrintStatisticInfo(info); smlDestroyInfo(info); - - if (isLast) { - tsem_post(&pParam->sem); - } } /** @@ -2512,7 +2511,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr pTscObj->schemalessType = 1; SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf}; - Params params; + Params params = {0}; params.request = request; tsem_init(¶ms.sem, 0, 0); taosThreadSpinInit(&(params.lock), 0); @@ -2557,6 +2556,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr } batchs = ceil(((double)numLines) / LINE_BATCH); + params.total = batchs; for (int i = 0; i < batchs; ++i) { SRequestObj* req = (SRequestObj*)createRequest(pTscObj->id, TSDB_SQL_INSERT); if(!req){ @@ -2575,11 +2575,9 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr if (numLines > perBatch) { numLines -= perBatch; - info->isLast = false; } else { perBatch = numLines; numLines = 0; - info->isLast = true; } info->params = ¶ms;