feat:add async logic for schemaless
This commit is contained in:
parent
3451ba0c82
commit
1d14725880
|
@ -155,8 +155,17 @@ typedef struct {
|
||||||
int64_t endTime;
|
int64_t endTime;
|
||||||
} SSmlCostInfo;
|
} SSmlCostInfo;
|
||||||
|
|
||||||
|
typedef struct{
|
||||||
|
SRequestObj* request;
|
||||||
|
SCatalog* catalog;
|
||||||
|
tsem_t sem;
|
||||||
|
TdThreadSpinlock lock;
|
||||||
|
} Params;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t id;
|
int64_t id;
|
||||||
|
Params params;
|
||||||
|
bool isLast;
|
||||||
|
|
||||||
SMLProtocolType protocol;
|
SMLProtocolType protocol;
|
||||||
int8_t precision;
|
int8_t precision;
|
||||||
|
@ -1378,6 +1387,7 @@ static void smlDestroyInfo(SSmlHandle* info){
|
||||||
if(!info->dataFormat){
|
if(!info->dataFormat){
|
||||||
taosArrayDestroy(info->colsContainer);
|
taosArrayDestroy(info->colsContainer);
|
||||||
}
|
}
|
||||||
|
destroyRequest(info->pRequest);
|
||||||
taosMemoryFreeClear(info);
|
taosMemoryFreeClear(info);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1405,11 +1415,6 @@ static SSmlHandle* smlBuildSmlInfo(TAOS* taos, SRequestObj* request, SMLProtocol
|
||||||
((SVnodeModifOpStmt*)(info->pQuery->pRoot))->payloadType = PAYLOAD_TYPE_KV;
|
((SVnodeModifOpStmt*)(info->pQuery->pRoot))->payloadType = PAYLOAD_TYPE_KV;
|
||||||
|
|
||||||
info->taos = (STscObj *)taos;
|
info->taos = (STscObj *)taos;
|
||||||
code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog);
|
|
||||||
if(code != TSDB_CODE_SUCCESS){
|
|
||||||
uError("SML:0x%"PRIx64" get catalog error %d", info->id, code);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
info->precision = precision;
|
info->precision = precision;
|
||||||
info->protocol = protocol;
|
info->protocol = protocol;
|
||||||
|
@ -2158,7 +2163,6 @@ end:
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static int32_t smlInsertData(SSmlHandle* info) {
|
static int32_t smlInsertData(SSmlHandle* info) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
@ -2200,10 +2204,12 @@ static int32_t smlInsertData(SSmlHandle* info) {
|
||||||
}
|
}
|
||||||
info->cost.insertRpcTime = taosGetTimestampUs();
|
info->cost.insertRpcTime = taosGetTimestampUs();
|
||||||
|
|
||||||
launchQueryImpl(info->pRequest, info->pQuery, true, NULL);
|
//launchQueryImpl(info->pRequest, info->pQuery, false, NULL);
|
||||||
|
// info->affectedRows = taos_affected_rows(info->pRequest);
|
||||||
|
// return info->pRequest->code;
|
||||||
|
|
||||||
info->affectedRows = taos_affected_rows(info->pRequest);
|
launchAsyncQuery(info->pRequest, info->pQuery);
|
||||||
return info->pRequest->code;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void smlPrintStatisticInfo(SSmlHandle *info){
|
static void smlPrintStatisticInfo(SSmlHandle *info){
|
||||||
|
@ -2284,30 +2290,53 @@ cleanup:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t isSchemalessDb(SSmlHandle* info){
|
static int32_t isSchemalessDb(STscObj *taos, SCatalog *catalog){
|
||||||
SName name;
|
SName name;
|
||||||
tNameSetDbName(&name, info->taos->acctId, info->taos->db, strlen(info->taos->db));
|
tNameSetDbName(&name, taos->acctId, taos->db, strlen(taos->db));
|
||||||
char dbFname[TSDB_DB_FNAME_LEN] = {0};
|
char dbFname[TSDB_DB_FNAME_LEN] = {0};
|
||||||
tNameGetFullDbName(&name, dbFname);
|
tNameGetFullDbName(&name, dbFname);
|
||||||
SDbCfgInfo pInfo = {0};
|
SDbCfgInfo pInfo = {0};
|
||||||
SEpSet ep = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
|
SEpSet ep = getEpSet_s(&taos->pAppInfo->mgmtEp);
|
||||||
|
|
||||||
int32_t code = catalogGetDBCfg(info->pCatalog, info->taos->pAppInfo->pTransporter, &ep, dbFname, &pInfo);
|
int32_t code = catalogGetDBCfg(catalog, taos->pAppInfo->pTransporter, &ep, dbFname, &pInfo);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
info->pRequest->code = code;
|
|
||||||
smlBuildInvalidDataMsg(&info->msgBuf, "catalogGetDBCfg error, code:", tstrerror(code));
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
taosArrayDestroy(pInfo.pRetensions);
|
taosArrayDestroy(pInfo.pRetensions);
|
||||||
|
|
||||||
if (!pInfo.schemaless){
|
if (!pInfo.schemaless){
|
||||||
info->pRequest->code = TSDB_CODE_SML_INVALID_DB_CONF;
|
|
||||||
smlBuildInvalidDataMsg(&info->msgBuf, "can not insert into schemaless db:", dbFname);
|
|
||||||
return TSDB_CODE_SML_INVALID_DB_CONF;
|
return TSDB_CODE_SML_INVALID_DB_CONF;
|
||||||
}
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void smlInsertCallback(void* param, void* res, int32_t code) {
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
uError("failed to execute, reason:%s\n", taos_errstr(res));
|
||||||
|
}
|
||||||
|
SRequestObj *pRequest = (SRequestObj *)res;
|
||||||
|
int32_t rows = taos_affected_rows(pRequest);
|
||||||
|
SSmlHandle* info = (SSmlHandle *)param;
|
||||||
|
|
||||||
|
// lock
|
||||||
|
taosThreadSpinLock(&info->params.lock);
|
||||||
|
info->params.request->body.resInfo.numOfRows += rows;
|
||||||
|
if(code != TSDB_CODE_SUCCESS){
|
||||||
|
info->params.request->code = code;
|
||||||
|
}
|
||||||
|
taosThreadSpinUnlock(&info->params.lock);
|
||||||
|
// unlock
|
||||||
|
|
||||||
|
printf("SML:0x%"PRIx64" insert finished, code: %d, total: %d, insert: %d\n", info->id, code, info->affectedRows, rows);
|
||||||
|
Params pParam = info->params;
|
||||||
|
bool isLast = info->isLast;
|
||||||
|
smlDestroyInfo(info);
|
||||||
|
|
||||||
|
if(isLast){
|
||||||
|
tsem_post(&pParam.sem);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* taos_schemaless_insert() parse and insert data points into database according to
|
* taos_schemaless_insert() parse and insert data points into database according to
|
||||||
* different protocol.
|
* different protocol.
|
||||||
|
@ -2336,48 +2365,92 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSmlHandle* info = smlBuildSmlInfo(taos, request, (SMLProtocolType)protocol, precision);
|
((STscObj *)taos)->schemalessType = 1;
|
||||||
if(!info){
|
SSmlMsgBuf msg = {.buf = request->msgBuf, .len = ERROR_MSG_BUF_DEFAULT_SIZE};
|
||||||
return (TAOS_RES*)request;
|
|
||||||
}
|
|
||||||
|
|
||||||
info->taos->schemalessType = 1;
|
Params params = {.request = request};
|
||||||
if(request->pDb == NULL){
|
tsem_init(¶ms.sem, 0, 0);
|
||||||
request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
|
taosThreadSpinInit(&(params.lock), 0);
|
||||||
smlBuildInvalidDataMsg(&info->msgBuf, "Database not specified", NULL);
|
|
||||||
|
int32_t code = catalogGetHandle(((STscObj *)taos)->pAppInfo->clusterId, ¶ms.catalog);
|
||||||
|
if(code != TSDB_CODE_SUCCESS){
|
||||||
|
uError("SML get catalog error %d", code);
|
||||||
|
request->code = code;
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(isSchemalessDb(info) != TSDB_CODE_SUCCESS){
|
if(request->pDb == NULL){
|
||||||
|
request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
|
||||||
|
smlBuildInvalidDataMsg(&msg, "Database not specified", NULL);
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(isSchemalessDb(taos, params.catalog) != TSDB_CODE_SUCCESS){
|
||||||
request->code = TSDB_CODE_SML_INVALID_DB_CONF;
|
request->code = TSDB_CODE_SML_INVALID_DB_CONF;
|
||||||
smlBuildInvalidDataMsg(&info->msgBuf, "Cannot write data to a non schemaless database", NULL);
|
smlBuildInvalidDataMsg(&msg, "Cannot write data to a non schemaless database", NULL);
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!lines) {
|
if (!lines) {
|
||||||
request->code = TSDB_CODE_SML_INVALID_DATA;
|
request->code = TSDB_CODE_SML_INVALID_DATA;
|
||||||
smlBuildInvalidDataMsg(&info->msgBuf, "lines is null", NULL);
|
smlBuildInvalidDataMsg(&msg, "lines is null", NULL);
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(protocol < TSDB_SML_LINE_PROTOCOL || protocol > TSDB_SML_JSON_PROTOCOL){
|
if(protocol < TSDB_SML_LINE_PROTOCOL || protocol > TSDB_SML_JSON_PROTOCOL){
|
||||||
request->code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE;
|
request->code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE;
|
||||||
smlBuildInvalidDataMsg(&info->msgBuf, "protocol invalidate", NULL);
|
smlBuildInvalidDataMsg(&msg, "protocol invalidate", NULL);
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(protocol == TSDB_SML_LINE_PROTOCOL && (precision < TSDB_SML_TIMESTAMP_NOT_CONFIGURED || precision > TSDB_SML_TIMESTAMP_NANO_SECONDS)){
|
if(protocol == TSDB_SML_LINE_PROTOCOL && (precision < TSDB_SML_TIMESTAMP_NOT_CONFIGURED || precision > TSDB_SML_TIMESTAMP_NANO_SECONDS)){
|
||||||
request->code = TSDB_CODE_SML_INVALID_PRECISION_TYPE;
|
request->code = TSDB_CODE_SML_INVALID_PRECISION_TYPE;
|
||||||
smlBuildInvalidDataMsg(&info->msgBuf, "precision invalidate for line protocol", NULL);
|
smlBuildInvalidDataMsg(&msg, "precision invalidate for line protocol", NULL);
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
info->pRequest->code = smlProcess(info, lines, numLines);
|
int32_t perBatch = 20000;
|
||||||
|
for (int i = 0; i < ceil(((double)numLines)/perBatch); ++i) {
|
||||||
|
SRequestObj* req = (SRequestObj*)createRequest((STscObj *)taos, TSDB_SQL_INSERT);
|
||||||
|
if(!req){
|
||||||
|
request->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
uError("SML:taos_schemaless_insert error request is null");
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
SSmlHandle* info = smlBuildSmlInfo(taos, req, (SMLProtocolType)protocol, precision);
|
||||||
|
if(!info){
|
||||||
|
request->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
uError("SML:taos_schemaless_insert error SSmlHandle is null");
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(numLines >= perBatch){
|
||||||
|
numLines -= perBatch;
|
||||||
|
info->isLast = false;
|
||||||
|
}else{
|
||||||
|
perBatch = numLines;
|
||||||
|
numLines = 0;
|
||||||
|
info->isLast = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
info->params = params;
|
||||||
|
info->pCatalog = params.catalog;
|
||||||
|
info->affectedRows = perBatch;
|
||||||
|
info->pRequest->body.queryFp = smlInsertCallback;
|
||||||
|
info->pRequest->body.param = info;
|
||||||
|
code = smlProcess(info, lines, perBatch);
|
||||||
|
lines += perBatch;
|
||||||
|
if (code != TSDB_CODE_SUCCESS){
|
||||||
|
info->pRequest->body.queryFp(info, req, code);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tsem_wait(¶ms.sem);
|
||||||
|
|
||||||
end:
|
end:
|
||||||
info->taos->schemalessType = 0;
|
taosThreadSpinDestroy(¶ms.lock);
|
||||||
uDebug("result:%s", info->msgBuf.buf);
|
tsem_destroy(¶ms.sem);
|
||||||
smlDestroyInfo(info);
|
((STscObj *)taos)->schemalessType = 0;
|
||||||
|
uDebug("result:%s", request->msgBuf);
|
||||||
return (TAOS_RES*)request;
|
return (TAOS_RES*)request;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue