diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index c438196e71..984670ec1d 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -1213,6 +1213,11 @@ static int32_t smlParseLineBottom(SSmlHandle *info) { static int32_t smlInsertData(SSmlHandle *info) { int32_t code = TSDB_CODE_SUCCESS; + if(info->pRequest->dbList == NULL){ + info->pRequest->dbList = taosArrayInit(1, TSDB_DB_FNAME_LEN); + } + taosArrayPush(info->pRequest->dbList, info->pRequest->pDb); + SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, NULL); while (oneTable) { SSmlTableInfo *tableData = *oneTable; @@ -1221,6 +1226,11 @@ static int32_t smlInsertData(SSmlHandle *info) { tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname)); memcpy(pName.tname, tableData->childTableName, strlen(tableData->childTableName)); + if(info->pRequest->tableList == NULL){ + info->pRequest->tableList = taosArrayInit(1, sizeof(SName)); + } + taosArrayPush(info->pRequest->tableList, &pName); + SRequestConnInfo conn = {0}; conn.pTrans = info->taos->pAppInfo->pTransporter; conn.requestId = info->pRequest->requestId; @@ -1422,6 +1432,7 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL do { code = smlModifyDBSchemas(info); if (code == 0) break; + taosMsleep(200); } while (retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES); if (code != 0) { @@ -1446,62 +1457,75 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine, terrno = TSDB_CODE_TSC_DISCONNECTED; return NULL; } + SRequestObj *request = NULL; + SSmlHandle *info = NULL; + while(1){ + request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid); + if (request == NULL) { + uError("SML:taos_schemaless_insert error request is null"); + return NULL; + } - SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid); - if (request == NULL) { - uError("SML:taos_schemaless_insert error request is null"); - return NULL; + info = smlBuildSmlInfo(taos); + if (info == NULL) { + request->code = TSDB_CODE_OUT_OF_MEMORY; + uError("SML:taos_schemaless_insert error SSmlHandle is null"); + return (TAOS_RES *)request; + } + info->pRequest = request; + info->isRawLine = rawLine != NULL; + info->ttl = ttl; + info->precision = precision; + info->protocol = (TSDB_SML_PROTOCOL_TYPE)protocol; + info->msgBuf.buf = info->pRequest->msgBuf; + info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE; + info->lineNum = numLines; + + SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf}; + if (request->pDb == NULL) { + request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; + smlBuildInvalidDataMsg(&msg, "Database not specified", NULL); + goto end; + } + + if (protocol < TSDB_SML_LINE_PROTOCOL || protocol > TSDB_SML_JSON_PROTOCOL) { + request->code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE; + smlBuildInvalidDataMsg(&msg, "protocol invalidate", NULL); + goto end; + } + + if (protocol == TSDB_SML_LINE_PROTOCOL && + (precision < TSDB_SML_TIMESTAMP_NOT_CONFIGURED || precision > TSDB_SML_TIMESTAMP_NANO_SECONDS)) { + request->code = TSDB_CODE_SML_INVALID_PRECISION_TYPE; + smlBuildInvalidDataMsg(&msg, "precision invalidate for line protocol", NULL); + goto end; + } + + if (protocol == TSDB_SML_JSON_PROTOCOL) { + numLines = 1; + } else if (numLines <= 0) { + request->code = TSDB_CODE_SML_INVALID_DATA; + smlBuildInvalidDataMsg(&msg, "line num is invalid", NULL); + goto end; + } + + code = smlProcess(info, lines, rawLine, rawLineEnd, numLines); + request->code = code; + info->cost.endTime = taosGetTimestampUs(); + info->cost.code = code; + smlPrintStatisticInfo(info); + if(code == TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER || code == TSDB_CODE_SDB_OBJ_CREATING){ + refreshMeta(request->pTscObj, request); + uInfo("SML:%"PRIx64" ver is old retry or object is creating:%d", info->id, code); + smlDestroyInfo(info); + info = NULL; + taos_free_result(request); + request = NULL; + continue; + } + break; } - SSmlHandle *info = smlBuildSmlInfo(taos); - if (info == NULL) { - request->code = TSDB_CODE_OUT_OF_MEMORY; - uError("SML:taos_schemaless_insert error SSmlHandle is null"); - return (TAOS_RES *)request; - } - info->pRequest = request; - info->isRawLine = rawLine != NULL; - info->ttl = ttl; - info->precision = precision; - info->protocol = (TSDB_SML_PROTOCOL_TYPE)protocol; - info->msgBuf.buf = info->pRequest->msgBuf; - info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE; - info->lineNum = numLines; - - SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf}; - if (request->pDb == NULL) { - request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; - smlBuildInvalidDataMsg(&msg, "Database not specified", NULL); - goto end; - } - - if (protocol < TSDB_SML_LINE_PROTOCOL || protocol > TSDB_SML_JSON_PROTOCOL) { - request->code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE; - smlBuildInvalidDataMsg(&msg, "protocol invalidate", NULL); - goto end; - } - - if (protocol == TSDB_SML_LINE_PROTOCOL && - (precision < TSDB_SML_TIMESTAMP_NOT_CONFIGURED || precision > TSDB_SML_TIMESTAMP_NANO_SECONDS)) { - request->code = TSDB_CODE_SML_INVALID_PRECISION_TYPE; - smlBuildInvalidDataMsg(&msg, "precision invalidate for line protocol", NULL); - goto end; - } - - if (protocol == TSDB_SML_JSON_PROTOCOL) { - numLines = 1; - } else if (numLines <= 0) { - request->code = TSDB_CODE_SML_INVALID_DATA; - smlBuildInvalidDataMsg(&msg, "line num is invalid", NULL); - goto end; - } - - code = smlProcess(info, lines, rawLine, rawLineEnd, numLines); - request->code = code; - info->cost.endTime = taosGetTimestampUs(); - info->cost.code = code; - smlPrintStatisticInfo(info); - end: smlDestroyInfo(info); return (TAOS_RES *)request; diff --git a/source/client/src/clientSmlLine.c b/source/client/src/clientSmlLine.c index 66f1316cd5..f5ae077b5d 100644 --- a/source/client/src/clientSmlLine.c +++ b/source/client/src/clientSmlLine.c @@ -436,7 +436,7 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin // bind data ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, cnt + 1); if (unlikely(ret != TSDB_CODE_SUCCESS)) { - uError("smlBuildCol error, retry"); + uDebug("smlBuildCol error, retry"); info->dataFormat = false; info->reRun = true; return TSDB_CODE_SUCCESS; @@ -582,8 +582,10 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine .i = ts, .length = (size_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes}; if (info->dataFormat) { - smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 0); - smlBuildRow(info->currTableDataCtx); + ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 0); + if(ret != TSDB_CODE_SUCCESS){return ret;} + ret = smlBuildRow(info->currTableDataCtx); + if(ret != TSDB_CODE_SUCCESS){return ret;} clearColValArray(info->currTableDataCtx->pValues); } else { taosArraySet(elements->colArray, 0, &kv); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 04d54a95ae..ac78ddc23c 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1110,11 +1110,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList); -#ifndef NDEBUG - qDebug("switch to next table %" PRId64 " (cursor %d), %" PRId64 " rows returned", uid, - pTableScanInfo->currentTable, pInfo->pTableScanOp->resultInfo.totalRows); + qDebug("switch to next table %" PRId64 " ts %" PRId64 "% "PRId64 " rows returned", uid, ts, pInfo->pTableScanOp->resultInfo.totalRows); pInfo->pTableScanOp->resultInfo.totalRows = 0; -#endif bool found = false; for (int32_t i = 0; i < numOfTables; i++) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index fccb53f0cd..40b9597643 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1608,8 +1608,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); if (pResult && pResult->info.rows > 0) { - qDebug("queue scan tsdb return %d rows min:%" PRId64 " max:%" PRId64, pResult->info.rows, - pResult->info.window.skey, pResult->info.window.ekey); qDebug("queue scan tsdb return %d rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64, pResult->info.rows, pResult->info.window.skey, pResult->info.window.ekey, pInfo->tqReader->pWalReader->curVersion); pTaskInfo->streamInfo.returned = 1; diff --git a/source/libs/parser/src/parInsertSml.c b/source/libs/parser/src/parInsertSml.c index 4742921d08..d6c1072669 100644 --- a/source/libs/parser/src/parInsertSml.c +++ b/source/libs/parser/src/parInsertSml.c @@ -201,6 +201,7 @@ int32_t smlBuildCol(STableDataCxt* pTableCxt, SSchema* schema, void* data, int32 SSmlKv* kv = (SSmlKv*)data; if(kv->keyLen != strlen(pColSchema->name) || memcmp(kv->key, pColSchema->name, kv->keyLen) != 0 || kv->type != pColSchema->type){ ret = TSDB_CODE_SML_INVALID_DATA; + uError("SML smlBuildCol error col not same %s", pColSchema->name); goto end; } if (kv->type == TSDB_DATA_TYPE_NCHAR) {