Merge pull request #16999 from taosdata/feature/TD-14761
fix:modify end condition for schemaless
This commit is contained in:
commit
de2b9fdd6e
|
@ -151,13 +151,14 @@ typedef struct {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SRequestObj *request;
|
SRequestObj *request;
|
||||||
tsem_t sem;
|
tsem_t sem;
|
||||||
|
int32_t cnt;
|
||||||
|
int32_t total;
|
||||||
TdThreadSpinlock lock;
|
TdThreadSpinlock lock;
|
||||||
} Params;
|
} Params;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t id;
|
int64_t id;
|
||||||
Params *params;
|
Params *params;
|
||||||
bool isLast;
|
|
||||||
|
|
||||||
SMLProtocolType protocol;
|
SMLProtocolType protocol;
|
||||||
int8_t precision;
|
int8_t precision;
|
||||||
|
@ -2449,28 +2450,26 @@ static void smlInsertCallback(void *param, void *res, int32_t code) {
|
||||||
int32_t rows = taos_affected_rows(pRequest);
|
int32_t rows = taos_affected_rows(pRequest);
|
||||||
|
|
||||||
uDebug("SML:0x%" PRIx64 " result. code:%d, msg:%s", info->id, pRequest->code, pRequest->msgBuf);
|
uDebug("SML:0x%" PRIx64 " result. code:%d, msg:%s", info->id, pRequest->code, pRequest->msgBuf);
|
||||||
// lock
|
|
||||||
taosThreadSpinLock(&info->params->lock);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
info->params->request->code = code;
|
|
||||||
info->params->request->body.resInfo.numOfRows += rows;
|
|
||||||
}else{
|
|
||||||
info->params->request->body.resInfo.numOfRows += info->affectedRows;
|
|
||||||
}
|
|
||||||
taosThreadSpinUnlock(&info->params->lock);
|
|
||||||
// unlock
|
|
||||||
|
|
||||||
uDebug("SML:0x%" PRIx64 " insert finished, code: %d, rows: %d, total: %d", info->id, code, rows, info->affectedRows);
|
|
||||||
Params *pParam = info->params;
|
Params *pParam = info->params;
|
||||||
bool isLast = info->isLast;
|
// lock
|
||||||
|
taosThreadSpinLock(&pParam->lock);
|
||||||
|
pParam->cnt++;
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
pParam->request->code = code;
|
||||||
|
pParam->request->body.resInfo.numOfRows += rows;
|
||||||
|
}else{
|
||||||
|
pParam->request->body.resInfo.numOfRows += info->affectedRows;
|
||||||
|
}
|
||||||
|
if (pParam->cnt == pParam->total) {
|
||||||
|
tsem_post(&pParam->sem);
|
||||||
|
}
|
||||||
|
taosThreadSpinUnlock(&pParam->lock);
|
||||||
|
// unlock
|
||||||
|
uDebug("SML:0x%" PRIx64 " insert finished, code: %d, rows: %d, total: %d", info->id, code, rows, info->affectedRows);
|
||||||
info->cost.endTime = taosGetTimestampUs();
|
info->cost.endTime = taosGetTimestampUs();
|
||||||
info->cost.code = code;
|
info->cost.code = code;
|
||||||
smlPrintStatisticInfo(info);
|
smlPrintStatisticInfo(info);
|
||||||
smlDestroyInfo(info);
|
smlDestroyInfo(info);
|
||||||
|
|
||||||
if (isLast) {
|
|
||||||
tsem_post(&pParam->sem);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -2512,7 +2511,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
|
||||||
pTscObj->schemalessType = 1;
|
pTscObj->schemalessType = 1;
|
||||||
SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
|
SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
|
||||||
|
|
||||||
Params params;
|
Params params = {0};
|
||||||
params.request = request;
|
params.request = request;
|
||||||
tsem_init(¶ms.sem, 0, 0);
|
tsem_init(¶ms.sem, 0, 0);
|
||||||
taosThreadSpinInit(&(params.lock), 0);
|
taosThreadSpinInit(&(params.lock), 0);
|
||||||
|
@ -2557,6 +2556,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
|
||||||
}
|
}
|
||||||
|
|
||||||
batchs = ceil(((double)numLines) / LINE_BATCH);
|
batchs = ceil(((double)numLines) / LINE_BATCH);
|
||||||
|
params.total = batchs;
|
||||||
for (int i = 0; i < batchs; ++i) {
|
for (int i = 0; i < batchs; ++i) {
|
||||||
SRequestObj* req = (SRequestObj*)createRequest(pTscObj->id, TSDB_SQL_INSERT);
|
SRequestObj* req = (SRequestObj*)createRequest(pTscObj->id, TSDB_SQL_INSERT);
|
||||||
if(!req){
|
if(!req){
|
||||||
|
@ -2575,11 +2575,9 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
|
||||||
|
|
||||||
if (numLines > perBatch) {
|
if (numLines > perBatch) {
|
||||||
numLines -= perBatch;
|
numLines -= perBatch;
|
||||||
info->isLast = false;
|
|
||||||
} else {
|
} else {
|
||||||
perBatch = numLines;
|
perBatch = numLines;
|
||||||
numLines = 0;
|
numLines = 0;
|
||||||
info->isLast = true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
info->params = ¶ms;
|
info->params = ¶ms;
|
||||||
|
|
Loading…
Reference in New Issue