fix:modify end condition for schemaless

This commit is contained in:
wangmm0220 2022-09-22 11:46:14 +08:00
parent 048d0440b3
commit 09dbaffa9d
1 changed files with 6 additions and 6 deletions

View File

@ -151,13 +151,14 @@ typedef struct {
typedef struct { typedef struct {
SRequestObj *request; SRequestObj *request;
tsem_t sem; tsem_t sem;
int32_t cnt;
int32_t total;
TdThreadSpinlock lock; TdThreadSpinlock lock;
} Params; } Params;
typedef struct { typedef struct {
int64_t id; int64_t id;
Params *params; Params *params;
bool isLast;
SMLProtocolType protocol; SMLProtocolType protocol;
int8_t precision; int8_t precision;
@ -2451,6 +2452,7 @@ static void smlInsertCallback(void *param, void *res, int32_t code) {
uDebug("SML:0x%" PRIx64 " result. code:%d, msg:%s", info->id, pRequest->code, pRequest->msgBuf); uDebug("SML:0x%" PRIx64 " result. code:%d, msg:%s", info->id, pRequest->code, pRequest->msgBuf);
// lock // lock
taosThreadSpinLock(&info->params->lock); taosThreadSpinLock(&info->params->lock);
info->params->cnt++;
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
info->params->request->code = code; info->params->request->code = code;
info->params->request->body.resInfo.numOfRows += rows; info->params->request->body.resInfo.numOfRows += rows;
@ -2462,13 +2464,12 @@ static void smlInsertCallback(void *param, void *res, int32_t code) {
uDebug("SML:0x%" PRIx64 " insert finished, code: %d, rows: %d, total: %d", info->id, code, rows, info->affectedRows); uDebug("SML:0x%" PRIx64 " insert finished, code: %d, rows: %d, total: %d", info->id, code, rows, info->affectedRows);
Params *pParam = info->params; Params *pParam = info->params;
bool isLast = info->isLast;
info->cost.endTime = taosGetTimestampUs(); info->cost.endTime = taosGetTimestampUs();
info->cost.code = code; info->cost.code = code;
smlPrintStatisticInfo(info); smlPrintStatisticInfo(info);
smlDestroyInfo(info); smlDestroyInfo(info);
if (isLast) { if (info->params->cnt == info->params->total) {
tsem_post(&pParam->sem); tsem_post(&pParam->sem);
} }
} }
@ -2512,7 +2513,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
pTscObj->schemalessType = 1; pTscObj->schemalessType = 1;
SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf}; SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
Params params; Params params = {0};
params.request = request; params.request = request;
tsem_init(&params.sem, 0, 0); tsem_init(&params.sem, 0, 0);
taosThreadSpinInit(&(params.lock), 0); taosThreadSpinInit(&(params.lock), 0);
@ -2557,6 +2558,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
} }
batchs = ceil(((double)numLines) / LINE_BATCH); batchs = ceil(((double)numLines) / LINE_BATCH);
params.total = batchs;
for (int i = 0; i < batchs; ++i) { for (int i = 0; i < batchs; ++i) {
SRequestObj* req = (SRequestObj*)createRequest(pTscObj->id, TSDB_SQL_INSERT); SRequestObj* req = (SRequestObj*)createRequest(pTscObj->id, TSDB_SQL_INSERT);
if(!req){ if(!req){
@ -2575,11 +2577,9 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
if (numLines > perBatch) { if (numLines > perBatch) {
numLines -= perBatch; numLines -= perBatch;
info->isLast = false;
} else { } else {
perBatch = numLines; perBatch = numLines;
numLines = 0; numLines = 0;
info->isLast = true;
} }
info->params = &params; info->params = &params;