fix:change async to sync in schemaless
This commit is contained in:
parent
ac9cdf6c58
commit
28db09a127
|
@ -149,17 +149,8 @@ typedef struct {
|
||||||
int64_t endTime;
|
int64_t endTime;
|
||||||
} SSmlCostInfo;
|
} SSmlCostInfo;
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
SRequestObj *request;
|
|
||||||
tsem_t sem;
|
|
||||||
int32_t cnt;
|
|
||||||
int32_t total;
|
|
||||||
TdThreadSpinlock lock;
|
|
||||||
} Params;
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t id;
|
int64_t id;
|
||||||
Params *params;
|
|
||||||
|
|
||||||
SMLProtocolType protocol;
|
SMLProtocolType protocol;
|
||||||
int8_t precision;
|
int8_t precision;
|
||||||
|
@ -178,7 +169,6 @@ typedef struct {
|
||||||
SQuery *pQuery;
|
SQuery *pQuery;
|
||||||
|
|
||||||
SSmlCostInfo cost;
|
SSmlCostInfo cost;
|
||||||
int32_t affectedRows;
|
|
||||||
SSmlMsgBuf msgBuf;
|
SSmlMsgBuf msgBuf;
|
||||||
SHashObj *dumplicateKey; // for dumplicate key
|
SHashObj *dumplicateKey; // for dumplicate key
|
||||||
SArray *colsContainer; // for cols parse, if dataFormat == false
|
SArray *colsContainer; // for cols parse, if dataFormat == false
|
||||||
|
@ -1513,7 +1503,6 @@ static void smlDestroyInfo(SSmlHandle *info) {
|
||||||
if (!info->dataFormat) {
|
if (!info->dataFormat) {
|
||||||
taosArrayDestroy(info->colsContainer);
|
taosArrayDestroy(info->colsContainer);
|
||||||
}
|
}
|
||||||
destroyRequest(info->pRequest);
|
|
||||||
|
|
||||||
cJSON_Delete(info->root);
|
cJSON_Delete(info->root);
|
||||||
taosMemoryFreeClear(info);
|
taosMemoryFreeClear(info);
|
||||||
|
@ -2351,20 +2340,11 @@ static int32_t smlInsertData(SSmlHandle *info) {
|
||||||
}
|
}
|
||||||
info->cost.insertRpcTime = taosGetTimestampUs();
|
info->cost.insertRpcTime = taosGetTimestampUs();
|
||||||
|
|
||||||
// launchQueryImpl(info->pRequest, info->pQuery, false, NULL);
|
|
||||||
// info->affectedRows = taos_affected_rows(info->pRequest);
|
|
||||||
// return info->pRequest->code;
|
|
||||||
|
|
||||||
SAppClusterSummary *pActivity = &info->taos->pAppInfo->summary;
|
SAppClusterSummary *pActivity = &info->taos->pAppInfo->summary;
|
||||||
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);
|
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);
|
||||||
|
|
||||||
SSqlCallbackWrapper *pWrapper = (SSqlCallbackWrapper *)taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper));
|
launchQueryImpl(info->pRequest, info->pQuery, true, NULL);
|
||||||
if (pWrapper == NULL) {
|
return info->pRequest->code;
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
}
|
|
||||||
pWrapper->pRequest = info->pRequest;
|
|
||||||
launchAsyncQuery(info->pRequest, info->pQuery, NULL, pWrapper);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void smlPrintStatisticInfo(SSmlHandle *info) {
|
static void smlPrintStatisticInfo(SSmlHandle *info) {
|
||||||
|
@ -2498,48 +2478,13 @@ static int32_t isSchemalessDb(STscObj *taos, SRequestObj *request) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void smlInsertCallback(void *param, void *res, int32_t code) {
|
|
||||||
SRequestObj *pRequest = (SRequestObj *)res;
|
|
||||||
SSmlHandle *info = (SSmlHandle *)param;
|
|
||||||
int32_t rows = taos_affected_rows(pRequest);
|
|
||||||
|
|
||||||
uDebug("SML:0x%" PRIx64 " result. code:%d, msg:%s", info->id, pRequest->code, pRequest->msgBuf);
|
|
||||||
Params *pParam = info->params;
|
|
||||||
// lock
|
|
||||||
taosThreadSpinLock(&pParam->lock);
|
|
||||||
pParam->cnt++;
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
pParam->request->code = code;
|
|
||||||
pParam->request->body.resInfo.numOfRows += rows;
|
|
||||||
} else {
|
|
||||||
pParam->request->body.resInfo.numOfRows += info->affectedRows;
|
|
||||||
}
|
|
||||||
// unlock
|
|
||||||
taosThreadSpinUnlock(&pParam->lock);
|
|
||||||
|
|
||||||
if (pParam->cnt == pParam->total) {
|
|
||||||
tsem_post(&pParam->sem);
|
|
||||||
}
|
|
||||||
uDebug("SML:0x%" PRIx64 " insert finished, code: %d, rows: %d, total: %d", info->id, code, rows, info->affectedRows);
|
|
||||||
info->cost.endTime = taosGetTimestampUs();
|
|
||||||
info->cost.code = code;
|
|
||||||
smlPrintStatisticInfo(info);
|
|
||||||
smlDestroyInfo(info);
|
|
||||||
}
|
|
||||||
|
|
||||||
TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char *rawLine, char *rawLineEnd,
|
TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char *rawLine, char *rawLineEnd,
|
||||||
int numLines, int protocol, int precision, int32_t ttl) {
|
int numLines, int protocol, int precision, int32_t ttl) {
|
||||||
int batchs = 0;
|
|
||||||
STscObj *pTscObj = request->pTscObj;
|
STscObj *pTscObj = request->pTscObj;
|
||||||
|
SSmlHandle *info = NULL;
|
||||||
pTscObj->schemalessType = 1;
|
pTscObj->schemalessType = 1;
|
||||||
SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
|
SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
|
||||||
|
|
||||||
Params params = {0};
|
|
||||||
params.request = request;
|
|
||||||
tsem_init(¶ms.sem, 0, 0);
|
|
||||||
taosThreadSpinInit(&(params.lock), 0);
|
|
||||||
|
|
||||||
if (request->pDb == NULL) {
|
if (request->pDb == NULL) {
|
||||||
request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
|
request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
|
||||||
smlBuildInvalidDataMsg(&msg, "Database not specified", NULL);
|
smlBuildInvalidDataMsg(&msg, "Database not specified", NULL);
|
||||||
|
@ -2573,65 +2518,24 @@ TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
batchs = ceil(((double)numLines) / tsSmlBatchSize);
|
info = smlBuildSmlInfo(pTscObj, request, (SMLProtocolType)protocol, precision);
|
||||||
params.total = batchs;
|
if (!info) {
|
||||||
for (int i = 0; i < batchs; ++i) {
|
request->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
SRequestObj *req = (SRequestObj *)createRequest(pTscObj->id, TSDB_SQL_INSERT, 0);
|
uError("SML:taos_schemaless_insert error SSmlHandle is null");
|
||||||
if (!req) {
|
goto end;
|
||||||
request->code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
uError("SML:taos_schemaless_insert error request is null");
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
SSmlHandle *info = smlBuildSmlInfo(pTscObj, req, (SMLProtocolType)protocol, precision);
|
|
||||||
if (!info) {
|
|
||||||
request->code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
uError("SML:taos_schemaless_insert error SSmlHandle is null");
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
|
|
||||||
info->isRawLine = (rawLine == NULL);
|
|
||||||
info->ttl = ttl;
|
|
||||||
|
|
||||||
int32_t perBatch = tsSmlBatchSize;
|
|
||||||
|
|
||||||
if (numLines > perBatch) {
|
|
||||||
numLines -= perBatch;
|
|
||||||
} else {
|
|
||||||
perBatch = numLines;
|
|
||||||
numLines = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
info->params = ¶ms;
|
|
||||||
info->affectedRows = perBatch;
|
|
||||||
info->pRequest->body.queryFp = smlInsertCallback;
|
|
||||||
info->pRequest->body.param = info;
|
|
||||||
int32_t code = smlProcess(info, lines, rawLine, rawLineEnd, perBatch);
|
|
||||||
if (lines) {
|
|
||||||
lines += perBatch;
|
|
||||||
}
|
|
||||||
if (rawLine) {
|
|
||||||
int num = 0;
|
|
||||||
while (rawLine < rawLineEnd) {
|
|
||||||
if (*(rawLine++) == '\n') {
|
|
||||||
num++;
|
|
||||||
}
|
|
||||||
if (num == perBatch) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
info->pRequest->body.queryFp(info, req, code);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
tsem_wait(¶ms.sem);
|
|
||||||
|
info->isRawLine = (rawLine == NULL);
|
||||||
|
info->ttl = ttl;
|
||||||
|
request->code = smlProcess(info, lines, rawLine, rawLineEnd, numLines);
|
||||||
|
|
||||||
end:
|
end:
|
||||||
taosThreadSpinDestroy(¶ms.lock);
|
uDebug("SML:0x%" PRIx64 " insert finished, code: %d", info->id, request->code);
|
||||||
tsem_destroy(¶ms.sem);
|
info->cost.endTime = taosGetTimestampUs();
|
||||||
// ((STscObj *)taos)->schemalessType = 0;
|
info->cost.code = request->code;
|
||||||
pTscObj->schemalessType = 1;
|
smlPrintStatisticInfo(info);
|
||||||
uDebug("resultend:%s", request->msgBuf);
|
smlDestroyInfo(info);
|
||||||
|
|
||||||
return (TAOS_RES *)request;
|
return (TAOS_RES *)request;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue