diff --git a/examples/rust/wrapper.h b/examples/rust/wrapper.h new file mode 100644 index 0000000000..78857597a9 --- /dev/null +++ b/examples/rust/wrapper.h @@ -0,0 +1 @@ +#include diff --git a/source/client/inc/clientSml.h b/source/client/inc/clientSml.h index d8d41d8be6..92896e6f23 100644 --- a/source/client/inc/clientSml.h +++ b/source/client/inc/clientSml.h @@ -70,7 +70,7 @@ extern "C" { #define VALUE_LEN 6 #define OTD_JSON_FIELDS_NUM 4 -#define MAX_RETRY_TIMES 5 +#define MAX_RETRY_TIMES 100 typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType; typedef enum { diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index e3e20ee85d..de08ba66cc 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -543,7 +543,7 @@ void taos_init_imp(void) { if (taosCreateLog("taoslog", 10, configDir, NULL, NULL, NULL, NULL, 1) != 0) { // ignore create log failed, only print - printf(" WARING: Create taoslog failed. configDir=%s\n", configDir); + printf(" WARING: Create taoslog failed:%s. configDir=%s\n", strerror(errno), configDir); } if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 1) != 0) { diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 7f58b579fa..1719759822 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -750,6 +750,7 @@ end: } static int32_t smlModifyDBSchemas(SSmlHandle *info) { + uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas start, format:%d, needModifySchema:%d", info->id, info->dataFormat, info->needModifySchema); if (info->dataFormat && !info->needModifySchema) { return TSDB_CODE_SUCCESS; } @@ -779,6 +780,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta); if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_STB_NOT_EXIST) { + uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas create table:%s", info->id, pName.tname); SArray *pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols), sizeof(SField)); SArray *pTags = taosArrayInit(taosArrayGetSize(sTableData->tags), sizeof(SField)); code = smlBuildFieldsList(info, NULL, NULL, sTableData->tags, pTags, 0, true); @@ -818,6 +820,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { goto end; } if (action != SCHEMA_ACTION_NULL) { + uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas change table tag, table:%s, action:%d", info->id, pName.tname, action); SArray *pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField)); SArray *pTags = @@ -869,6 +872,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { goto end; } if (action != SCHEMA_ACTION_NULL) { + uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas change table col, table:%s, action:%d", info->id, pName.tname, action); SArray *pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField)); SArray *pTags = @@ -935,15 +939,19 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { } sTableData->tableMeta = pTableMeta; - + uDebug("SML:0x%" PRIx64 "modify schema uid:%" PRIu64 ", sversion:%d, tversion:%d", info->id, pTableMeta->uid, pTableMeta->sversion, pTableMeta->tversion) tmp = (SSmlSTableMeta **)taosHashIterate(info->superTables, tmp); } + uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas end success, format:%d, needModifySchema:%d", info->id, info->dataFormat, info->needModifySchema); + return 0; end: taosHashCleanup(hashTmp); taosMemoryFreeClear(pTableMeta); - // catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1); + catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1); + uError("SML:0x%" PRIx64 " smlModifyDBSchemas end failed:%d:%s, format:%d, needModifySchema:%d", info->id, code, tstrerror(code), info->dataFormat, info->needModifySchema); + return code; } @@ -1019,8 +1027,9 @@ static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols } else { size_t tmp = taosArrayGetSize(metaArray); if (tmp > INT16_MAX) { + smlBuildInvalidDataMsg(msg, "too many cols or tags", kv->key); uError("too many cols or tags"); - return -1; + return TSDB_CODE_SML_INVALID_DATA; } int16_t size = tmp; int ret = taosHashPut(metaHash, kv->key, kv->keyLen, &size, SHORT_BYTES); @@ -1170,6 +1179,7 @@ static int32_t smlPushCols(SArray *colsArray, SArray *cols) { } static int32_t smlParseLineBottom(SSmlHandle *info) { + uDebug("SML:0x%" PRIx64 " smlParseLineBottom start, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum); if (info->dataFormat) return TSDB_CODE_SUCCESS; for (int32_t i = 0; i < info->lineNum; i++) { @@ -1212,6 +1222,7 @@ static int32_t smlParseLineBottom(SSmlHandle *info) { SSmlSTableMeta **tableMeta = (SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen); if (tableMeta) { // update meta + uDebug("SML:0x%" PRIx64 " smlParseLineBottom update meta, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum); ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, elements->colArray, false, &info->msgBuf); if (ret == TSDB_CODE_SUCCESS) { ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, tinfo->tags, true, &info->msgBuf); @@ -1226,7 +1237,7 @@ static int32_t smlParseLineBottom(SSmlHandle *info) { // uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id); // return ret; // } - + uDebug("SML:0x%" PRIx64 " smlParseLineBottom add meta, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum); SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat); smlInsertMeta(meta->tagHash, meta->tags, tinfo->tags); if(terrno == TSDB_CODE_DUP_KEY){return terrno;} @@ -1234,12 +1245,14 @@ static int32_t smlParseLineBottom(SSmlHandle *info) { taosHashPut(info->superTables, elements->measure, elements->measureLen, &meta, POINTER_BYTES); } } + uDebug("SML:0x%" PRIx64 " smlParseLineBottom end, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum); return TSDB_CODE_SUCCESS; } static int32_t smlInsertData(SSmlHandle *info) { int32_t code = TSDB_CODE_SUCCESS; + uDebug("SML:0x%" PRIx64 " smlInsertData start, format:%d", info->id, info->dataFormat); if(info->pRequest->dbList == NULL){ info->pRequest->dbList = taosArrayInit(1, TSDB_DB_FNAME_LEN); @@ -1284,6 +1297,7 @@ static int32_t smlInsertData(SSmlHandle *info) { // use tablemeta of stable to save vgid and uid of child table (*pMeta)->tableMeta->vgId = vg.vgId; (*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid + uDebug("SML:0x%" PRIx64 " smlInsertData table:%s, uid:%" PRIu64 ", format:%d", info->id, pName.tname, tableData->uid, info->dataFormat); code = smlBindData(info->pQuery, info->dataFormat, tableData->tags, (*pMeta)->cols, tableData->cols, (*pMeta)->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen, @@ -1306,16 +1320,18 @@ static int32_t smlInsertData(SSmlHandle *info) { atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1); launchQueryImpl(info->pRequest, info->pQuery, true, NULL); + uDebug("SML:0x%" PRIx64 " smlInsertData end, format:%d, code:%d,%s", info->id, info->dataFormat, info->pRequest->code, tstrerror(info->pRequest->code)); + return info->pRequest->code; } static void smlPrintStatisticInfo(SSmlHandle *info) { uDebug( "SML:0x%" PRIx64 - " smlInsertLines result, code:%d,lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d,alter stable tag num:%d,alter stable col num:%d \ + " smlInsertLines result, code:%d, msg:%s, lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d,alter stable tag num:%d,alter stable col num:%d \ parse cost:%" PRId64 ",schema cost:%" PRId64 ",bind cost:%" PRId64 ",rpc cost:%" PRId64 ",total cost:%" PRId64 "", - info->id, info->cost.code, info->cost.lineNum, info->cost.numOfSTables, info->cost.numOfCTables, + info->id, info->cost.code, tstrerror(info->cost.code), info->cost.lineNum, info->cost.numOfSTables, info->cost.numOfCTables, info->cost.numOfCreateSTables, info->cost.numOfAlterTagSTables, info->cost.numOfAlterColSTables, info->cost.schemaTime - info->cost.parseTime, info->cost.insertBindTime - info->cost.schemaTime, info->cost.insertRpcTime - info->cost.insertBindTime, info->cost.endTime - info->cost.insertRpcTime, @@ -1360,6 +1376,7 @@ int32_t smlClearForRerun(SSmlHandle *info) { } static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) { + uDebug("SML:0x%" PRIx64 " smlParseLine start", info->id); int32_t code = TSDB_CODE_SUCCESS; if (info->protocol == TSDB_SML_JSON_PROTOCOL) { if (lines) { @@ -1395,8 +1412,16 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char } } - uDebug("SML:0x%" PRIx64 " smlParseLine israw:%d, len:%d, sql:%s", info->id, info->isRawLine, len, - (info->isRawLine ? "rawdata" : tmp)); + char cTmp = 0; // for print tmp if is raw + if(info->isRawLine){ + cTmp = tmp[len - 1]; + tmp[len - 1] = '\0'; + } + + uDebug("SML:0x%" PRIx64 " smlParseLine israw:%d, numLines:%d, protocol:%d, len:%d, sql:%s", info->id, info->isRawLine, numLines, info->protocol, len, tmp); + if(info->isRawLine){ + tmp[len - 1] = cTmp; + } if (info->protocol == TSDB_SML_LINE_PROTOCOL) { if (info->dataFormat) { @@ -1421,6 +1446,7 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char return code; } if (info->reRun) { + uDebug("SML:0x%" PRIx64 " smlParseLine re run", info->id); i = 0; rawLine = oldRaw; code = smlClearForRerun(info); @@ -1431,6 +1457,7 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char } i++; } + uDebug("SML:0x%" PRIx64 " smlParseLine end", info->id); return code; } @@ -1461,7 +1488,8 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL do { code = smlModifyDBSchemas(info); if (code == 0) break; - taosMsleep(200); + taosMsleep(500); + uInfo("SML:0x%" PRIx64 " smlModifyDBSchemas retry code:%s, times:%d", info->id, tstrerror(code), retryNum); } while (retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES); if (code != 0) { @@ -1488,6 +1516,7 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine, } SRequestObj *request = NULL; SSmlHandle *info = NULL; + int cnt = 0; while(1){ request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid); if (request == NULL) { @@ -1542,16 +1571,22 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine, request->code = code; info->cost.endTime = taosGetTimestampUs(); info->cost.code = code; - smlPrintStatisticInfo(info); - if(code == TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER || code == TSDB_CODE_SDB_OBJ_CREATING){ + if(code == TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER || code == TSDB_CODE_SDB_OBJ_CREATING + || code == TSDB_CODE_PAR_VALUE_TOO_LONG || code == TSDB_CODE_MND_TRANS_CONFLICT){ + if(cnt++ >= 10){ + uInfo("SML:%"PRIx64" retry:%d/10 end code:%d, msg:%s", info->id, cnt, code, tstrerror(code)); + break; + } + taosMsleep(100); refreshMeta(request->pTscObj, request); - uInfo("SML:%"PRIx64" ver is old retry or object is creating code:%d", info->id, code); + uInfo("SML:%"PRIx64" retry:%d/10,ver is old retry or object is creating code:%d, msg:%s", info->id, cnt, code, tstrerror(code)); smlDestroyInfo(info); info = NULL; taos_free_result(request); request = NULL; continue; } + smlPrintStatisticInfo(info); break; } diff --git a/source/client/src/clientSmlLine.c b/source/client/src/clientSmlLine.c index b1aea1bfaa..335e3a1dc7 100644 --- a/source/client/src/clientSmlLine.c +++ b/source/client/src/clientSmlLine.c @@ -583,12 +583,14 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine .i = ts, .length = (size_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes}; if (info->dataFormat) { + uDebug("SML:0x%" PRIx64 " smlParseInfluxString format true, ts:%" PRId64, info->id, ts); ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 0); if(ret != TSDB_CODE_SUCCESS){return ret;} ret = smlBuildRow(info->currTableDataCtx); if(ret != TSDB_CODE_SUCCESS){return ret;} clearColValArray(info->currTableDataCtx->pValues); } else { + uDebug("SML:0x%" PRIx64 " smlParseInfluxString format false, ts:%" PRId64, info->id, ts); taosArraySet(elements->colArray, 0, &kv); } info->preLine = *elements; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 0b4c7b88d7..aeeec1d61c 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -1238,13 +1238,13 @@ int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDi } if (taosLoadCfg(pCfg, envCmd, cfgDir, envFile, apolloUrl) != 0) { - uError("failed to load cfg since %s", terrstr()); + printf("failed to load cfg since %s", terrstr()); cfgCleanup(pCfg); return -1; } if (cfgLoadFromArray(pCfg, pArgs) != 0) { - uError("failed to load cfg from array since %s", terrstr()); + printf("failed to load cfg from array since %s", terrstr()); cfgCleanup(pCfg); return -1; } @@ -1260,13 +1260,13 @@ int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDi if (taosMulModeMkDir(tsLogDir, 0777) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to create dir:%s since %s", tsLogDir, terrstr()); + printf("failed to create dir:%s since %s", tsLogDir, terrstr()); cfgCleanup(pCfg); return -1; } if (taosInitLog(logname, logFileNum) != 0) { - uError("failed to init log file since %s", terrstr()); + printf("failed to init log file since %s", terrstr()); cfgCleanup(pCfg); return -1; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 1ee799a8cf..327d233d30 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -637,9 +637,9 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* if (metaRsp.metaRspLen > 0) { code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp); tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 - ",version:%" PRId64, + ",ts:%" PRId64, consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid, - metaRsp.rspOffset.version); + metaRsp.rspOffset.ts); taosMemoryFree(metaRsp.metaRsp); tDeleteSTaosxRsp(&taosxRsp); return code; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 1c8088be7e..69fb9e4c3d 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -185,13 +185,12 @@ end: int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) { int32_t code = 0; - taosThreadMutexLock(&pHandle->pWalReader->mutex); int64_t offset = *fetchOffset; while (1) { if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) { - tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return", + tqDebug("tmq poll: consumer:%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return", pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode), offset); *fetchOffset = offset - 1; code = -1; diff --git a/source/libs/parser/src/parInsertSml.c b/source/libs/parser/src/parInsertSml.c index 3c2b6499a4..106ee641af 100644 --- a/source/libs/parser/src/parInsertSml.c +++ b/source/libs/parser/src/parInsertSml.c @@ -70,7 +70,7 @@ static int32_t smlBoundColumnData(SArray* cols, SBoundColInfo* pBoundInfo, SSche SToken sToken = {.n = kv->keyLen, .z = (char*)kv->key}; col_id_t t = lastColIdx + 1; col_id_t index = ((t == 0 && !isTag) ? 0 : insFindCol(&sToken, t, pBoundInfo->numOfCols, pSchema)); - uDebug("SML, index:%d, t:%d, ncols:%d", index, t, pBoundInfo->numOfCols); + uTrace("SML, index:%d, t:%d, ncols:%d", index, t, pBoundInfo->numOfCols); if (index < 0 && t > 0) { index = insFindCol(&sToken, 0, t, pSchema); } @@ -345,7 +345,7 @@ int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSc } if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)pUcs4, pColSchema->bytes - VARSTR_HEADER_SIZE, &len)) { if (errno == E2BIG) { - uError("sml bind taosMbsToUcs4 error, kv length:%d, bytes:%d", (int)kv->length, pColSchema->bytes); + uError("sml bind taosMbsToUcs4 error, kv length:%d, bytes:%d, kv->value:%s", (int)kv->length, pColSchema->bytes, kv->value); buildInvalidOperationMsg(&pBuf, "value too long"); ret = TSDB_CODE_PAR_VALUE_TOO_LONG; goto end; diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index 89dd51a892..bd9ea058b4 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -21,7 +21,7 @@ #include "tjson.h" #include "tglobal.h" -#define LOG_MAX_LINE_SIZE (1024) +#define LOG_MAX_LINE_SIZE (10024) #define LOG_MAX_LINE_BUFFER_SIZE (LOG_MAX_LINE_SIZE + 3) #define LOG_MAX_LINE_DUMP_SIZE (1024 * 1024) #define LOG_MAX_LINE_DUMP_BUFFER_SIZE (LOG_MAX_LINE_DUMP_SIZE + 3)