From 28db09a127a090364ce7ed503d37e48652c1eaf4 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 8 Feb 2023 17:10:09 +0800 Subject: [PATCH] fix:change async to sync in schemaless --- source/client/src/clientSml.c | 132 +++++----------------------------- 1 file changed, 18 insertions(+), 114 deletions(-) diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index c24aa536c2..0694f4c457 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -149,17 +149,8 @@ typedef struct { int64_t endTime; } SSmlCostInfo; -typedef struct { - SRequestObj *request; - tsem_t sem; - int32_t cnt; - int32_t total; - TdThreadSpinlock lock; -} Params; - typedef struct { int64_t id; - Params *params; SMLProtocolType protocol; int8_t precision; @@ -178,7 +169,6 @@ typedef struct { SQuery *pQuery; SSmlCostInfo cost; - int32_t affectedRows; SSmlMsgBuf msgBuf; SHashObj *dumplicateKey; // for dumplicate key SArray *colsContainer; // for cols parse, if dataFormat == false @@ -1513,7 +1503,6 @@ static void smlDestroyInfo(SSmlHandle *info) { if (!info->dataFormat) { taosArrayDestroy(info->colsContainer); } - destroyRequest(info->pRequest); cJSON_Delete(info->root); taosMemoryFreeClear(info); @@ -2351,20 +2340,11 @@ static int32_t smlInsertData(SSmlHandle *info) { } info->cost.insertRpcTime = taosGetTimestampUs(); - // launchQueryImpl(info->pRequest, info->pQuery, false, NULL); - // info->affectedRows = taos_affected_rows(info->pRequest); - // return info->pRequest->code; - SAppClusterSummary *pActivity = &info->taos->pAppInfo->summary; atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1); - SSqlCallbackWrapper *pWrapper = (SSqlCallbackWrapper *)taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper)); - if (pWrapper == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - pWrapper->pRequest = info->pRequest; - launchAsyncQuery(info->pRequest, info->pQuery, NULL, pWrapper); - return TSDB_CODE_SUCCESS; + launchQueryImpl(info->pRequest, info->pQuery, true, NULL); + return info->pRequest->code; } static void smlPrintStatisticInfo(SSmlHandle *info) { @@ -2498,48 +2478,13 @@ static int32_t isSchemalessDb(STscObj *taos, SRequestObj *request) { return TSDB_CODE_SUCCESS; } -static void smlInsertCallback(void *param, void *res, int32_t code) { - SRequestObj *pRequest = (SRequestObj *)res; - SSmlHandle *info = (SSmlHandle *)param; - int32_t rows = taos_affected_rows(pRequest); - - uDebug("SML:0x%" PRIx64 " result. code:%d, msg:%s", info->id, pRequest->code, pRequest->msgBuf); - Params *pParam = info->params; - // lock - taosThreadSpinLock(&pParam->lock); - pParam->cnt++; - if (code != TSDB_CODE_SUCCESS) { - pParam->request->code = code; - pParam->request->body.resInfo.numOfRows += rows; - } else { - pParam->request->body.resInfo.numOfRows += info->affectedRows; - } - // unlock - taosThreadSpinUnlock(&pParam->lock); - - if (pParam->cnt == pParam->total) { - tsem_post(&pParam->sem); - } - uDebug("SML:0x%" PRIx64 " insert finished, code: %d, rows: %d, total: %d", info->id, code, rows, info->affectedRows); - info->cost.endTime = taosGetTimestampUs(); - info->cost.code = code; - smlPrintStatisticInfo(info); - smlDestroyInfo(info); -} - TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char *rawLine, char *rawLineEnd, int numLines, int protocol, int precision, int32_t ttl) { - int batchs = 0; STscObj *pTscObj = request->pTscObj; - + SSmlHandle *info = NULL; pTscObj->schemalessType = 1; SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf}; - Params params = {0}; - params.request = request; - tsem_init(¶ms.sem, 0, 0); - taosThreadSpinInit(&(params.lock), 0); - if (request->pDb == NULL) { request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; smlBuildInvalidDataMsg(&msg, "Database not specified", NULL); @@ -2573,65 +2518,24 @@ TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char goto end; } - batchs = ceil(((double)numLines) / tsSmlBatchSize); - params.total = batchs; - for (int i = 0; i < batchs; ++i) { - SRequestObj *req = (SRequestObj *)createRequest(pTscObj->id, TSDB_SQL_INSERT, 0); - if (!req) { - request->code = TSDB_CODE_OUT_OF_MEMORY; - uError("SML:taos_schemaless_insert error request is null"); - goto end; - } - SSmlHandle *info = smlBuildSmlInfo(pTscObj, req, (SMLProtocolType)protocol, precision); - if (!info) { - request->code = TSDB_CODE_OUT_OF_MEMORY; - uError("SML:taos_schemaless_insert error SSmlHandle is null"); - goto end; - } - - info->isRawLine = (rawLine == NULL); - info->ttl = ttl; - - int32_t perBatch = tsSmlBatchSize; - - if (numLines > perBatch) { - numLines -= perBatch; - } else { - perBatch = numLines; - numLines = 0; - } - - info->params = ¶ms; - info->affectedRows = perBatch; - info->pRequest->body.queryFp = smlInsertCallback; - info->pRequest->body.param = info; - int32_t code = smlProcess(info, lines, rawLine, rawLineEnd, perBatch); - if (lines) { - lines += perBatch; - } - if (rawLine) { - int num = 0; - while (rawLine < rawLineEnd) { - if (*(rawLine++) == '\n') { - num++; - } - if (num == perBatch) { - break; - } - } - } - if (code != TSDB_CODE_SUCCESS) { - info->pRequest->body.queryFp(info, req, code); - } + info = smlBuildSmlInfo(pTscObj, request, (SMLProtocolType)protocol, precision); + if (!info) { + request->code = TSDB_CODE_OUT_OF_MEMORY; + uError("SML:taos_schemaless_insert error SSmlHandle is null"); + goto end; } - tsem_wait(¶ms.sem); + + info->isRawLine = (rawLine == NULL); + info->ttl = ttl; + request->code = smlProcess(info, lines, rawLine, rawLineEnd, numLines); end: - taosThreadSpinDestroy(¶ms.lock); - tsem_destroy(¶ms.sem); - // ((STscObj *)taos)->schemalessType = 0; - pTscObj->schemalessType = 1; - uDebug("resultend:%s", request->msgBuf); + uDebug("SML:0x%" PRIx64 " insert finished, code: %d", info->id, request->code); + info->cost.endTime = taosGetTimestampUs(); + info->cost.code = request->code; + smlPrintStatisticInfo(info); + smlDestroyInfo(info); + return (TAOS_RES *)request; }