fix:add log for schmaless

This commit is contained in:
wangmm0220 2023-03-17 19:11:40 +08:00
parent 4fdef7421c
commit d5328b7a2b
4 changed files with 43 additions and 13 deletions

View File

@ -743,6 +743,7 @@ end:
} }
static int32_t smlModifyDBSchemas(SSmlHandle *info) { static int32_t smlModifyDBSchemas(SSmlHandle *info) {
uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas start, format:%d, needModifySchema:%d", info->id, info->dataFormat, info->needModifySchema);
if (info->dataFormat && !info->needModifySchema) { if (info->dataFormat && !info->needModifySchema) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -772,6 +773,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta); code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_STB_NOT_EXIST) { if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_STB_NOT_EXIST) {
uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas create table:%s", info->id, pName.tname);
SArray *pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols), sizeof(SField)); SArray *pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols), sizeof(SField));
SArray *pTags = taosArrayInit(taosArrayGetSize(sTableData->tags), sizeof(SField)); SArray *pTags = taosArrayInit(taosArrayGetSize(sTableData->tags), sizeof(SField));
smlBuildFieldsList(info, NULL, NULL, sTableData->tags, pTags, 0, true); smlBuildFieldsList(info, NULL, NULL, sTableData->tags, pTags, 0, true);
@ -804,6 +806,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
goto end; goto end;
} }
if (action != SCHEMA_ACTION_NULL) { if (action != SCHEMA_ACTION_NULL) {
uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas change table tag, table:%s, action:%d", info->id, pName.tname, action);
SArray *pColumns = SArray *pColumns =
taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField)); taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
SArray *pTags = SArray *pTags =
@ -851,6 +854,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
goto end; goto end;
} }
if (action != SCHEMA_ACTION_NULL) { if (action != SCHEMA_ACTION_NULL) {
uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas change table col, table:%s, action:%d", info->id, pName.tname, action);
SArray *pColumns = SArray *pColumns =
taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField)); taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
SArray *pTags = SArray *pTags =
@ -913,15 +917,19 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
} }
sTableData->tableMeta = pTableMeta; sTableData->tableMeta = pTableMeta;
uDebug("SML:0x%" PRIx64 "modify schema uid:%" PRIu64 ", sversion:%d, tversion:%d", info->id, pTableMeta->uid, pTableMeta->sversion, pTableMeta->tversion)
tmp = (SSmlSTableMeta **)taosHashIterate(info->superTables, tmp); tmp = (SSmlSTableMeta **)taosHashIterate(info->superTables, tmp);
} }
uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas end success, format:%d, needModifySchema:%d", info->id, info->dataFormat, info->needModifySchema);
return 0; return 0;
end: end:
taosHashCleanup(hashTmp); taosHashCleanup(hashTmp);
taosMemoryFreeClear(pTableMeta); taosMemoryFreeClear(pTableMeta);
catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1); catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1);
uError("SML:0x%" PRIx64 " smlModifyDBSchemas end failed:%d:%s, format:%d, needModifySchema:%d", info->id, code, tstrerror(code), info->dataFormat, info->needModifySchema);
return code; return code;
} }
@ -997,8 +1005,9 @@ static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols
} else { } else {
size_t tmp = taosArrayGetSize(metaArray); size_t tmp = taosArrayGetSize(metaArray);
if (tmp > INT16_MAX) { if (tmp > INT16_MAX) {
smlBuildInvalidDataMsg(msg, "too many cols or tags", kv->key);
uError("too many cols or tags"); uError("too many cols or tags");
return -1; return TSDB_CODE_SML_INVALID_DATA;
} }
int16_t size = tmp; int16_t size = tmp;
int ret = taosHashPut(metaHash, kv->key, kv->keyLen, &size, SHORT_BYTES); int ret = taosHashPut(metaHash, kv->key, kv->keyLen, &size, SHORT_BYTES);
@ -1145,6 +1154,7 @@ static int32_t smlPushCols(SArray *colsArray, SArray *cols) {
} }
static int32_t smlParseLineBottom(SSmlHandle *info) { static int32_t smlParseLineBottom(SSmlHandle *info) {
uDebug("SML:0x%" PRIx64 " smlParseLineBottom start, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum);
if (info->dataFormat) return TSDB_CODE_SUCCESS; if (info->dataFormat) return TSDB_CODE_SUCCESS;
for (int32_t i = 0; i < info->lineNum; i++) { for (int32_t i = 0; i < info->lineNum; i++) {
@ -1184,6 +1194,7 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
SSmlSTableMeta **tableMeta = SSmlSTableMeta **tableMeta =
(SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen); (SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen);
if (tableMeta) { // update meta if (tableMeta) { // update meta
uDebug("SML:0x%" PRIx64 " smlParseLineBottom update meta, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum);
ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, elements->colArray, false, &info->msgBuf); ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, elements->colArray, false, &info->msgBuf);
if (ret == TSDB_CODE_SUCCESS) { if (ret == TSDB_CODE_SUCCESS) {
ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, tinfo->tags, true, &info->msgBuf); ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, tinfo->tags, true, &info->msgBuf);
@ -1198,7 +1209,7 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
// uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id); // uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id);
// return ret; // return ret;
// } // }
uDebug("SML:0x%" PRIx64 " smlParseLineBottom add meta, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum);
SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat); SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat);
smlInsertMeta(meta->tagHash, meta->tags, tinfo->tags); smlInsertMeta(meta->tagHash, meta->tags, tinfo->tags);
if(terrno == TSDB_CODE_DUP_KEY){return terrno;} if(terrno == TSDB_CODE_DUP_KEY){return terrno;}
@ -1206,12 +1217,14 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
taosHashPut(info->superTables, elements->measure, elements->measureLen, &meta, POINTER_BYTES); taosHashPut(info->superTables, elements->measure, elements->measureLen, &meta, POINTER_BYTES);
} }
} }
uDebug("SML:0x%" PRIx64 " smlParseLineBottom end, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t smlInsertData(SSmlHandle *info) { static int32_t smlInsertData(SSmlHandle *info) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
uDebug("SML:0x%" PRIx64 " smlInsertData start, format:%d", info->id, info->dataFormat);
if(info->pRequest->dbList == NULL){ if(info->pRequest->dbList == NULL){
info->pRequest->dbList = taosArrayInit(1, TSDB_DB_FNAME_LEN); info->pRequest->dbList = taosArrayInit(1, TSDB_DB_FNAME_LEN);
@ -1256,6 +1269,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
// use tablemeta of stable to save vgid and uid of child table // use tablemeta of stable to save vgid and uid of child table
(*pMeta)->tableMeta->vgId = vg.vgId; (*pMeta)->tableMeta->vgId = vg.vgId;
(*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid (*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid
uDebug("SML:0x%" PRIx64 " smlInsertData table:%s, uid:%" PRIu64 ", format:%d, code:%d,%s", info->id, pName.tname, tableData->uid, info->dataFormat);
code = smlBindData(info->pQuery, info->dataFormat, tableData->tags, (*pMeta)->cols, tableData->cols, code = smlBindData(info->pQuery, info->dataFormat, tableData->tags, (*pMeta)->cols, tableData->cols,
(*pMeta)->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen, (*pMeta)->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen,
@ -1278,16 +1292,18 @@ static int32_t smlInsertData(SSmlHandle *info) {
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1); atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);
launchQueryImpl(info->pRequest, info->pQuery, true, NULL); launchQueryImpl(info->pRequest, info->pQuery, true, NULL);
uDebug("SML:0x%" PRIx64 " smlInsertData end, format:%d, code:%d,%s", info->id, info->dataFormat, info->pRequest->code, tstrerror(info->pRequest->code));
return info->pRequest->code; return info->pRequest->code;
} }
static void smlPrintStatisticInfo(SSmlHandle *info) { static void smlPrintStatisticInfo(SSmlHandle *info) {
uDebug( uDebug(
"SML:0x%" PRIx64 "SML:0x%" PRIx64
" smlInsertLines result, code:%d,lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d,alter stable tag num:%d,alter stable col num:%d \ " smlInsertLines result, code:%d, msg:%s, lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d,alter stable tag num:%d,alter stable col num:%d \
parse cost:%" PRId64 ",schema cost:%" PRId64 ",bind cost:%" PRId64 ",rpc cost:%" PRId64 ",total cost:%" PRId64 parse cost:%" PRId64 ",schema cost:%" PRId64 ",bind cost:%" PRId64 ",rpc cost:%" PRId64 ",total cost:%" PRId64
"", "",
info->id, info->cost.code, info->cost.lineNum, info->cost.numOfSTables, info->cost.numOfCTables, info->id, info->cost.code, tstrerror(info->cost.code), info->cost.lineNum, info->cost.numOfSTables, info->cost.numOfCTables,
info->cost.numOfCreateSTables, info->cost.numOfAlterTagSTables, info->cost.numOfAlterColSTables, info->cost.numOfCreateSTables, info->cost.numOfAlterTagSTables, info->cost.numOfAlterColSTables,
info->cost.schemaTime - info->cost.parseTime, info->cost.insertBindTime - info->cost.schemaTime, info->cost.schemaTime - info->cost.parseTime, info->cost.insertBindTime - info->cost.schemaTime,
info->cost.insertRpcTime - info->cost.insertBindTime, info->cost.endTime - info->cost.insertRpcTime, info->cost.insertRpcTime - info->cost.insertBindTime, info->cost.endTime - info->cost.insertRpcTime,
@ -1332,6 +1348,7 @@ int32_t smlClearForRerun(SSmlHandle *info) {
} }
static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) { static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) {
uDebug("SML:0x%" PRIx64 " smlParseLine start", info->id);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (info->protocol == TSDB_SML_JSON_PROTOCOL) { if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
if (lines) { if (lines) {
@ -1367,8 +1384,16 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char
} }
} }
uDebug("SML:0x%" PRIx64 " smlParseLine israw:%d, len:%d, sql:%s", info->id, info->isRawLine, len, char cTmp = 0; // for print tmp if is raw
(info->isRawLine ? "rawdata" : tmp)); if(info->isRawLine){
cTmp = tmp[len - 1];
tmp[len - 1] = '\0';
}
uDebug("SML:0x%" PRIx64 " smlParseLine israw:%d, numLines:%d, protocol:%d, len:%d, sql:%s", info->id, info->isRawLine, numLines, info->protocol, len, tmp);
if(info->isRawLine){
tmp[len - 1] = cTmp;
}
if (info->protocol == TSDB_SML_LINE_PROTOCOL) { if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
if (info->dataFormat) { if (info->dataFormat) {
@ -1393,6 +1418,7 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char
return code; return code;
} }
if (info->reRun) { if (info->reRun) {
uDebug("SML:0x%" PRIx64 " smlParseLine re run", info->id);
i = 0; i = 0;
rawLine = oldRaw; rawLine = oldRaw;
code = smlClearForRerun(info); code = smlClearForRerun(info);
@ -1403,6 +1429,7 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char
} }
i++; i++;
} }
uDebug("SML:0x%" PRIx64 " smlParseLine end", info->id);
return code; return code;
} }
@ -1515,16 +1542,17 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine,
request->code = code; request->code = code;
info->cost.endTime = taosGetTimestampUs(); info->cost.endTime = taosGetTimestampUs();
info->cost.code = code; info->cost.code = code;
smlPrintStatisticInfo(info); if(code == TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER || code == TSDB_CODE_SDB_OBJ_CREATING
if(code == TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER || code == TSDB_CODE_SDB_OBJ_CREATING){ || code == TSDB_CODE_PAR_VALUE_TOO_LONG || code == TSDB_CODE_MND_TRANS_CONFLICT){
refreshMeta(request->pTscObj, request); refreshMeta(request->pTscObj, request);
uInfo("SML:%"PRIx64" ver is old retry or object is creating code:%d", info->id, code); uInfo("SML:%"PRIx64" ver is old retry or object is creating code:%d, msg:%s", info->id, code, tstrerror(code));
smlDestroyInfo(info); smlDestroyInfo(info);
info = NULL; info = NULL;
taos_free_result(request); taos_free_result(request);
request = NULL; request = NULL;
continue; continue;
} }
smlPrintStatisticInfo(info);
break; break;
} }

View File

@ -582,12 +582,14 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
.i = ts, .i = ts,
.length = (size_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes}; .length = (size_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
if (info->dataFormat) { if (info->dataFormat) {
uDebug("SML:0x%" PRIx64 " smlParseInfluxString format true, ts:%" PRId64, info->id, ts);
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 0); ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 0);
if(ret != TSDB_CODE_SUCCESS){return ret;} if(ret != TSDB_CODE_SUCCESS){return ret;}
ret = smlBuildRow(info->currTableDataCtx); ret = smlBuildRow(info->currTableDataCtx);
if(ret != TSDB_CODE_SUCCESS){return ret;} if(ret != TSDB_CODE_SUCCESS){return ret;}
clearColValArray(info->currTableDataCtx->pValues); clearColValArray(info->currTableDataCtx->pValues);
} else { } else {
uDebug("SML:0x%" PRIx64 " smlParseInfluxString format false, ts:%" PRId64, info->id, ts);
taosArraySet(elements->colArray, 0, &kv); taosArraySet(elements->colArray, 0, &kv);
} }
info->preLine = *elements; info->preLine = *elements;

View File

@ -70,7 +70,7 @@ static int32_t smlBoundColumnData(SArray* cols, SBoundColInfo* pBoundInfo, SSche
SToken sToken = {.n = kv->keyLen, .z = (char*)kv->key}; SToken sToken = {.n = kv->keyLen, .z = (char*)kv->key};
col_id_t t = lastColIdx + 1; col_id_t t = lastColIdx + 1;
col_id_t index = ((t == 0 && !isTag) ? 0 : insFindCol(&sToken, t, pBoundInfo->numOfCols, pSchema)); col_id_t index = ((t == 0 && !isTag) ? 0 : insFindCol(&sToken, t, pBoundInfo->numOfCols, pSchema));
uDebug("SML, index:%d, t:%d, ncols:%d", index, t, pBoundInfo->numOfCols); uTrace("SML, index:%d, t:%d, ncols:%d", index, t, pBoundInfo->numOfCols);
if (index < 0 && t > 0) { if (index < 0 && t > 0) {
index = insFindCol(&sToken, 0, t, pSchema); index = insFindCol(&sToken, 0, t, pSchema);
} }
@ -345,7 +345,7 @@ int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSc
} }
if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)pUcs4, pColSchema->bytes - VARSTR_HEADER_SIZE, &len)) { if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)pUcs4, pColSchema->bytes - VARSTR_HEADER_SIZE, &len)) {
if (errno == E2BIG) { if (errno == E2BIG) {
uError("sml bind taosMbsToUcs4 error, kv->value:%s, kv length:%d, bytes:%d", kv->value, (int)kv->length, pColSchema->bytes); uError("sml bind taosMbsToUcs4 error, kv length:%d, bytes:%d, kv->value:%s", (int)kv->length, pColSchema->bytes, kv->value);
buildInvalidOperationMsg(&pBuf, "value too long"); buildInvalidOperationMsg(&pBuf, "value too long");
ret = TSDB_CODE_PAR_VALUE_TOO_LONG; ret = TSDB_CODE_PAR_VALUE_TOO_LONG;
goto end; goto end;

View File

@ -21,7 +21,7 @@
#include "tjson.h" #include "tjson.h"
#include "tglobal.h" #include "tglobal.h"
#define LOG_MAX_LINE_SIZE (1024) #define LOG_MAX_LINE_SIZE (10024)
#define LOG_MAX_LINE_BUFFER_SIZE (LOG_MAX_LINE_SIZE + 3) #define LOG_MAX_LINE_BUFFER_SIZE (LOG_MAX_LINE_SIZE + 3)
#define LOG_MAX_LINE_DUMP_SIZE (1024 * 1024) #define LOG_MAX_LINE_DUMP_SIZE (1024 * 1024)
#define LOG_MAX_LINE_DUMP_BUFFER_SIZE (LOG_MAX_LINE_DUMP_SIZE + 3) #define LOG_MAX_LINE_DUMP_BUFFER_SIZE (LOG_MAX_LINE_DUMP_SIZE + 3)