feat:add async logic for schemaless

This commit is contained in:
wangmm0220 2022-06-10 13:54:44 +08:00
parent 1d14725880
commit a4fba1c70b
2 changed files with 17 additions and 19 deletions

View File

@ -69,6 +69,7 @@ for (int i = 1; i < keyLen; ++i) { \
#define NCHAR_ADD_LEN 3 // L"nchar" 3 means L" " #define NCHAR_ADD_LEN 3 // L"nchar" 3 means L" "
#define MAX_RETRY_TIMES 5 #define MAX_RETRY_TIMES 5
#define LINE_BATCH 20
//================================================================================================= //=================================================================================================
typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType; typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType;
@ -164,7 +165,7 @@ typedef struct{
typedef struct { typedef struct {
int64_t id; int64_t id;
Params params; Params *params;
bool isLast; bool isLast;
SMLProtocolType protocol; SMLProtocolType protocol;
@ -2311,29 +2312,24 @@ static int32_t isSchemalessDb(STscObj *taos, SCatalog *catalog){
} }
static void smlInsertCallback(void* param, void* res, int32_t code) { static void smlInsertCallback(void* param, void* res, int32_t code) {
if (code != TSDB_CODE_SUCCESS) {
uError("failed to execute, reason:%s\n", taos_errstr(res));
}
SRequestObj *pRequest = (SRequestObj *)res; SRequestObj *pRequest = (SRequestObj *)res;
int32_t rows = taos_affected_rows(pRequest);
SSmlHandle* info = (SSmlHandle *)param; SSmlHandle* info = (SSmlHandle *)param;
// lock // lock
taosThreadSpinLock(&info->params.lock);
info->params.request->body.resInfo.numOfRows += rows;
if(code != TSDB_CODE_SUCCESS){ if(code != TSDB_CODE_SUCCESS){
info->params.request->code = code; taosThreadSpinLock(&info->params->lock);
info->params->request->code = code;
taosThreadSpinUnlock(&info->params->lock);
} }
taosThreadSpinUnlock(&info->params.lock);
// unlock // unlock
printf("SML:0x%"PRIx64" insert finished, code: %d, total: %d, insert: %d\n", info->id, code, info->affectedRows, rows); printf("SML:0x%"PRIx64" insert finished, code: %d, total: %d\n", info->id, code, info->affectedRows);
Params pParam = info->params; Params *pParam = info->params;
bool isLast = info->isLast; bool isLast = info->isLast;
smlDestroyInfo(info); smlDestroyInfo(info);
if(isLast){ if(isLast){
tsem_post(&pParam.sem); tsem_post(&pParam->sem);
} }
} }
@ -2366,8 +2362,9 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
} }
((STscObj *)taos)->schemalessType = 1; ((STscObj *)taos)->schemalessType = 1;
SSmlMsgBuf msg = {.buf = request->msgBuf, .len = ERROR_MSG_BUF_DEFAULT_SIZE}; SSmlMsgBuf msg = {.len = ERROR_MSG_BUF_DEFAULT_SIZE, .buf = request->msgBuf};
int cnt = ceil(((double)numLines)/LINE_BATCH);
Params params = {.request = request}; Params params = {.request = request};
tsem_init(&params.sem, 0, 0); tsem_init(&params.sem, 0, 0);
taosThreadSpinInit(&(params.lock), 0); taosThreadSpinInit(&(params.lock), 0);
@ -2385,7 +2382,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
goto end; goto end;
} }
if(isSchemalessDb(taos, params.catalog) != TSDB_CODE_SUCCESS){ if(isSchemalessDb(((STscObj *)taos), params.catalog) != TSDB_CODE_SUCCESS){
request->code = TSDB_CODE_SML_INVALID_DB_CONF; request->code = TSDB_CODE_SML_INVALID_DB_CONF;
smlBuildInvalidDataMsg(&msg, "Cannot write data to a non schemaless database", NULL); smlBuildInvalidDataMsg(&msg, "Cannot write data to a non schemaless database", NULL);
goto end; goto end;
@ -2409,8 +2406,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
goto end; goto end;
} }
int32_t perBatch = 20000; for (int i = 0; i < cnt; ++i) {
for (int i = 0; i < ceil(((double)numLines)/perBatch); ++i) {
SRequestObj* req = (SRequestObj*)createRequest((STscObj *)taos, TSDB_SQL_INSERT); SRequestObj* req = (SRequestObj*)createRequest((STscObj *)taos, TSDB_SQL_INSERT);
if(!req){ if(!req){
request->code = TSDB_CODE_OUT_OF_MEMORY; request->code = TSDB_CODE_OUT_OF_MEMORY;
@ -2424,7 +2420,9 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
goto end; goto end;
} }
if(numLines >= perBatch){ int32_t perBatch = LINE_BATCH;
if(numLines > perBatch){
numLines -= perBatch; numLines -= perBatch;
info->isLast = false; info->isLast = false;
}else{ }else{
@ -2433,7 +2431,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
info->isLast = true; info->isLast = true;
} }
info->params = params; info->params = &params;
info->pCatalog = params.catalog; info->pCatalog = params.catalog;
info->affectedRows = perBatch; info->affectedRows = perBatch;
info->pRequest->body.queryFp = smlInsertCallback; info->pRequest->body.queryFp = smlInsertCallback;

View File

@ -1325,7 +1325,7 @@ TEST(testCase, sml_oom_Test) {
pRes = taos_query(taos, "use oom"); pRes = taos_query(taos, "use oom");
taos_free_result(pRes); taos_free_result(pRes);
TAOS_RES* res = taos_schemaless_insert(taos, (char**)sql, 100, TSDB_SML_LINE_PROTOCOL, 0); TAOS_RES* res = taos_schemaless_insert(taos, (char**)sql, sizeof(sql)/sizeof(sql[0]), TSDB_SML_LINE_PROTOCOL, 0);
ASSERT_EQ(taos_errno(res), 0); ASSERT_EQ(taos_errno(res), 0);
taos_free_result(pRes); taos_free_result(pRes);
} }