Merge pull request #20554 from taosdata/fix/TS-2828
fix: add retry if insert error with meta change
This commit is contained in:
commit
ea0cc2c584
|
@ -0,0 +1 @@
|
||||||
|
#include<taos.h>
|
|
@ -70,7 +70,7 @@ extern "C" {
|
||||||
#define VALUE_LEN 6
|
#define VALUE_LEN 6
|
||||||
|
|
||||||
#define OTD_JSON_FIELDS_NUM 4
|
#define OTD_JSON_FIELDS_NUM 4
|
||||||
#define MAX_RETRY_TIMES 5
|
#define MAX_RETRY_TIMES 100
|
||||||
typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType;
|
typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType;
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
|
|
|
@ -543,7 +543,7 @@ void taos_init_imp(void) {
|
||||||
|
|
||||||
if (taosCreateLog("taoslog", 10, configDir, NULL, NULL, NULL, NULL, 1) != 0) {
|
if (taosCreateLog("taoslog", 10, configDir, NULL, NULL, NULL, NULL, 1) != 0) {
|
||||||
// ignore create log failed, only print
|
// ignore create log failed, only print
|
||||||
printf(" WARING: Create taoslog failed. configDir=%s\n", configDir);
|
printf(" WARING: Create taoslog failed:%s. configDir=%s\n", strerror(errno), configDir);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 1) != 0) {
|
if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 1) != 0) {
|
||||||
|
|
|
@ -750,6 +750,7 @@ end:
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t smlModifyDBSchemas(SSmlHandle *info) {
|
static int32_t smlModifyDBSchemas(SSmlHandle *info) {
|
||||||
|
uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas start, format:%d, needModifySchema:%d", info->id, info->dataFormat, info->needModifySchema);
|
||||||
if (info->dataFormat && !info->needModifySchema) {
|
if (info->dataFormat && !info->needModifySchema) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -779,6 +780,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
|
||||||
code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
|
code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
|
||||||
|
|
||||||
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_STB_NOT_EXIST) {
|
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_STB_NOT_EXIST) {
|
||||||
|
uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas create table:%s", info->id, pName.tname);
|
||||||
SArray *pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols), sizeof(SField));
|
SArray *pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols), sizeof(SField));
|
||||||
SArray *pTags = taosArrayInit(taosArrayGetSize(sTableData->tags), sizeof(SField));
|
SArray *pTags = taosArrayInit(taosArrayGetSize(sTableData->tags), sizeof(SField));
|
||||||
code = smlBuildFieldsList(info, NULL, NULL, sTableData->tags, pTags, 0, true);
|
code = smlBuildFieldsList(info, NULL, NULL, sTableData->tags, pTags, 0, true);
|
||||||
|
@ -818,6 +820,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
if (action != SCHEMA_ACTION_NULL) {
|
if (action != SCHEMA_ACTION_NULL) {
|
||||||
|
uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas change table tag, table:%s, action:%d", info->id, pName.tname, action);
|
||||||
SArray *pColumns =
|
SArray *pColumns =
|
||||||
taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
|
taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
|
||||||
SArray *pTags =
|
SArray *pTags =
|
||||||
|
@ -869,6 +872,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
if (action != SCHEMA_ACTION_NULL) {
|
if (action != SCHEMA_ACTION_NULL) {
|
||||||
|
uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas change table col, table:%s, action:%d", info->id, pName.tname, action);
|
||||||
SArray *pColumns =
|
SArray *pColumns =
|
||||||
taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
|
taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
|
||||||
SArray *pTags =
|
SArray *pTags =
|
||||||
|
@ -935,15 +939,19 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
|
||||||
}
|
}
|
||||||
|
|
||||||
sTableData->tableMeta = pTableMeta;
|
sTableData->tableMeta = pTableMeta;
|
||||||
|
uDebug("SML:0x%" PRIx64 "modify schema uid:%" PRIu64 ", sversion:%d, tversion:%d", info->id, pTableMeta->uid, pTableMeta->sversion, pTableMeta->tversion)
|
||||||
tmp = (SSmlSTableMeta **)taosHashIterate(info->superTables, tmp);
|
tmp = (SSmlSTableMeta **)taosHashIterate(info->superTables, tmp);
|
||||||
}
|
}
|
||||||
|
uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas end success, format:%d, needModifySchema:%d", info->id, info->dataFormat, info->needModifySchema);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
end:
|
end:
|
||||||
taosHashCleanup(hashTmp);
|
taosHashCleanup(hashTmp);
|
||||||
taosMemoryFreeClear(pTableMeta);
|
taosMemoryFreeClear(pTableMeta);
|
||||||
// catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1);
|
catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1);
|
||||||
|
uError("SML:0x%" PRIx64 " smlModifyDBSchemas end failed:%d:%s, format:%d, needModifySchema:%d", info->id, code, tstrerror(code), info->dataFormat, info->needModifySchema);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1019,8 +1027,9 @@ static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols
|
||||||
} else {
|
} else {
|
||||||
size_t tmp = taosArrayGetSize(metaArray);
|
size_t tmp = taosArrayGetSize(metaArray);
|
||||||
if (tmp > INT16_MAX) {
|
if (tmp > INT16_MAX) {
|
||||||
|
smlBuildInvalidDataMsg(msg, "too many cols or tags", kv->key);
|
||||||
uError("too many cols or tags");
|
uError("too many cols or tags");
|
||||||
return -1;
|
return TSDB_CODE_SML_INVALID_DATA;
|
||||||
}
|
}
|
||||||
int16_t size = tmp;
|
int16_t size = tmp;
|
||||||
int ret = taosHashPut(metaHash, kv->key, kv->keyLen, &size, SHORT_BYTES);
|
int ret = taosHashPut(metaHash, kv->key, kv->keyLen, &size, SHORT_BYTES);
|
||||||
|
@ -1170,6 +1179,7 @@ static int32_t smlPushCols(SArray *colsArray, SArray *cols) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t smlParseLineBottom(SSmlHandle *info) {
|
static int32_t smlParseLineBottom(SSmlHandle *info) {
|
||||||
|
uDebug("SML:0x%" PRIx64 " smlParseLineBottom start, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum);
|
||||||
if (info->dataFormat) return TSDB_CODE_SUCCESS;
|
if (info->dataFormat) return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
for (int32_t i = 0; i < info->lineNum; i++) {
|
for (int32_t i = 0; i < info->lineNum; i++) {
|
||||||
|
@ -1212,6 +1222,7 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
|
||||||
SSmlSTableMeta **tableMeta =
|
SSmlSTableMeta **tableMeta =
|
||||||
(SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen);
|
(SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen);
|
||||||
if (tableMeta) { // update meta
|
if (tableMeta) { // update meta
|
||||||
|
uDebug("SML:0x%" PRIx64 " smlParseLineBottom update meta, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum);
|
||||||
ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, elements->colArray, false, &info->msgBuf);
|
ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, elements->colArray, false, &info->msgBuf);
|
||||||
if (ret == TSDB_CODE_SUCCESS) {
|
if (ret == TSDB_CODE_SUCCESS) {
|
||||||
ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, tinfo->tags, true, &info->msgBuf);
|
ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, tinfo->tags, true, &info->msgBuf);
|
||||||
|
@ -1226,7 +1237,7 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
|
||||||
// uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id);
|
// uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id);
|
||||||
// return ret;
|
// return ret;
|
||||||
// }
|
// }
|
||||||
|
uDebug("SML:0x%" PRIx64 " smlParseLineBottom add meta, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum);
|
||||||
SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat);
|
SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat);
|
||||||
smlInsertMeta(meta->tagHash, meta->tags, tinfo->tags);
|
smlInsertMeta(meta->tagHash, meta->tags, tinfo->tags);
|
||||||
if(terrno == TSDB_CODE_DUP_KEY){return terrno;}
|
if(terrno == TSDB_CODE_DUP_KEY){return terrno;}
|
||||||
|
@ -1234,12 +1245,14 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
|
||||||
taosHashPut(info->superTables, elements->measure, elements->measureLen, &meta, POINTER_BYTES);
|
taosHashPut(info->superTables, elements->measure, elements->measureLen, &meta, POINTER_BYTES);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
uDebug("SML:0x%" PRIx64 " smlParseLineBottom end, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t smlInsertData(SSmlHandle *info) {
|
static int32_t smlInsertData(SSmlHandle *info) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
uDebug("SML:0x%" PRIx64 " smlInsertData start, format:%d", info->id, info->dataFormat);
|
||||||
|
|
||||||
if(info->pRequest->dbList == NULL){
|
if(info->pRequest->dbList == NULL){
|
||||||
info->pRequest->dbList = taosArrayInit(1, TSDB_DB_FNAME_LEN);
|
info->pRequest->dbList = taosArrayInit(1, TSDB_DB_FNAME_LEN);
|
||||||
|
@ -1284,6 +1297,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
|
||||||
// use tablemeta of stable to save vgid and uid of child table
|
// use tablemeta of stable to save vgid and uid of child table
|
||||||
(*pMeta)->tableMeta->vgId = vg.vgId;
|
(*pMeta)->tableMeta->vgId = vg.vgId;
|
||||||
(*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid
|
(*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid
|
||||||
|
uDebug("SML:0x%" PRIx64 " smlInsertData table:%s, uid:%" PRIu64 ", format:%d", info->id, pName.tname, tableData->uid, info->dataFormat);
|
||||||
|
|
||||||
code = smlBindData(info->pQuery, info->dataFormat, tableData->tags, (*pMeta)->cols, tableData->cols,
|
code = smlBindData(info->pQuery, info->dataFormat, tableData->tags, (*pMeta)->cols, tableData->cols,
|
||||||
(*pMeta)->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen,
|
(*pMeta)->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen,
|
||||||
|
@ -1306,16 +1320,18 @@ static int32_t smlInsertData(SSmlHandle *info) {
|
||||||
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);
|
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);
|
||||||
|
|
||||||
launchQueryImpl(info->pRequest, info->pQuery, true, NULL);
|
launchQueryImpl(info->pRequest, info->pQuery, true, NULL);
|
||||||
|
uDebug("SML:0x%" PRIx64 " smlInsertData end, format:%d, code:%d,%s", info->id, info->dataFormat, info->pRequest->code, tstrerror(info->pRequest->code));
|
||||||
|
|
||||||
return info->pRequest->code;
|
return info->pRequest->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void smlPrintStatisticInfo(SSmlHandle *info) {
|
static void smlPrintStatisticInfo(SSmlHandle *info) {
|
||||||
uDebug(
|
uDebug(
|
||||||
"SML:0x%" PRIx64
|
"SML:0x%" PRIx64
|
||||||
" smlInsertLines result, code:%d,lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d,alter stable tag num:%d,alter stable col num:%d \
|
" smlInsertLines result, code:%d, msg:%s, lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d,alter stable tag num:%d,alter stable col num:%d \
|
||||||
parse cost:%" PRId64 ",schema cost:%" PRId64 ",bind cost:%" PRId64 ",rpc cost:%" PRId64 ",total cost:%" PRId64
|
parse cost:%" PRId64 ",schema cost:%" PRId64 ",bind cost:%" PRId64 ",rpc cost:%" PRId64 ",total cost:%" PRId64
|
||||||
"",
|
"",
|
||||||
info->id, info->cost.code, info->cost.lineNum, info->cost.numOfSTables, info->cost.numOfCTables,
|
info->id, info->cost.code, tstrerror(info->cost.code), info->cost.lineNum, info->cost.numOfSTables, info->cost.numOfCTables,
|
||||||
info->cost.numOfCreateSTables, info->cost.numOfAlterTagSTables, info->cost.numOfAlterColSTables,
|
info->cost.numOfCreateSTables, info->cost.numOfAlterTagSTables, info->cost.numOfAlterColSTables,
|
||||||
info->cost.schemaTime - info->cost.parseTime, info->cost.insertBindTime - info->cost.schemaTime,
|
info->cost.schemaTime - info->cost.parseTime, info->cost.insertBindTime - info->cost.schemaTime,
|
||||||
info->cost.insertRpcTime - info->cost.insertBindTime, info->cost.endTime - info->cost.insertRpcTime,
|
info->cost.insertRpcTime - info->cost.insertBindTime, info->cost.endTime - info->cost.insertRpcTime,
|
||||||
|
@ -1360,6 +1376,7 @@ int32_t smlClearForRerun(SSmlHandle *info) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) {
|
static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) {
|
||||||
|
uDebug("SML:0x%" PRIx64 " smlParseLine start", info->id);
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
|
if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
|
||||||
if (lines) {
|
if (lines) {
|
||||||
|
@ -1395,8 +1412,16 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
uDebug("SML:0x%" PRIx64 " smlParseLine israw:%d, len:%d, sql:%s", info->id, info->isRawLine, len,
|
char cTmp = 0; // for print tmp if is raw
|
||||||
(info->isRawLine ? "rawdata" : tmp));
|
if(info->isRawLine){
|
||||||
|
cTmp = tmp[len - 1];
|
||||||
|
tmp[len - 1] = '\0';
|
||||||
|
}
|
||||||
|
|
||||||
|
uDebug("SML:0x%" PRIx64 " smlParseLine israw:%d, numLines:%d, protocol:%d, len:%d, sql:%s", info->id, info->isRawLine, numLines, info->protocol, len, tmp);
|
||||||
|
if(info->isRawLine){
|
||||||
|
tmp[len - 1] = cTmp;
|
||||||
|
}
|
||||||
|
|
||||||
if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
|
if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
|
||||||
if (info->dataFormat) {
|
if (info->dataFormat) {
|
||||||
|
@ -1421,6 +1446,7 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
if (info->reRun) {
|
if (info->reRun) {
|
||||||
|
uDebug("SML:0x%" PRIx64 " smlParseLine re run", info->id);
|
||||||
i = 0;
|
i = 0;
|
||||||
rawLine = oldRaw;
|
rawLine = oldRaw;
|
||||||
code = smlClearForRerun(info);
|
code = smlClearForRerun(info);
|
||||||
|
@ -1431,6 +1457,7 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char
|
||||||
}
|
}
|
||||||
i++;
|
i++;
|
||||||
}
|
}
|
||||||
|
uDebug("SML:0x%" PRIx64 " smlParseLine end", info->id);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1461,7 +1488,8 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL
|
||||||
do {
|
do {
|
||||||
code = smlModifyDBSchemas(info);
|
code = smlModifyDBSchemas(info);
|
||||||
if (code == 0) break;
|
if (code == 0) break;
|
||||||
taosMsleep(200);
|
taosMsleep(500);
|
||||||
|
uInfo("SML:0x%" PRIx64 " smlModifyDBSchemas retry code:%s, times:%d", info->id, tstrerror(code), retryNum);
|
||||||
} while (retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES);
|
} while (retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES);
|
||||||
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
@ -1488,6 +1516,7 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine,
|
||||||
}
|
}
|
||||||
SRequestObj *request = NULL;
|
SRequestObj *request = NULL;
|
||||||
SSmlHandle *info = NULL;
|
SSmlHandle *info = NULL;
|
||||||
|
int cnt = 0;
|
||||||
while(1){
|
while(1){
|
||||||
request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid);
|
request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid);
|
||||||
if (request == NULL) {
|
if (request == NULL) {
|
||||||
|
@ -1542,16 +1571,22 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine,
|
||||||
request->code = code;
|
request->code = code;
|
||||||
info->cost.endTime = taosGetTimestampUs();
|
info->cost.endTime = taosGetTimestampUs();
|
||||||
info->cost.code = code;
|
info->cost.code = code;
|
||||||
smlPrintStatisticInfo(info);
|
if(code == TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER || code == TSDB_CODE_SDB_OBJ_CREATING
|
||||||
if(code == TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER || code == TSDB_CODE_SDB_OBJ_CREATING){
|
|| code == TSDB_CODE_PAR_VALUE_TOO_LONG || code == TSDB_CODE_MND_TRANS_CONFLICT){
|
||||||
|
if(cnt++ >= 10){
|
||||||
|
uInfo("SML:%"PRIx64" retry:%d/10 end code:%d, msg:%s", info->id, cnt, code, tstrerror(code));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
taosMsleep(100);
|
||||||
refreshMeta(request->pTscObj, request);
|
refreshMeta(request->pTscObj, request);
|
||||||
uInfo("SML:%"PRIx64" ver is old retry or object is creating code:%d", info->id, code);
|
uInfo("SML:%"PRIx64" retry:%d/10,ver is old retry or object is creating code:%d, msg:%s", info->id, cnt, code, tstrerror(code));
|
||||||
smlDestroyInfo(info);
|
smlDestroyInfo(info);
|
||||||
info = NULL;
|
info = NULL;
|
||||||
taos_free_result(request);
|
taos_free_result(request);
|
||||||
request = NULL;
|
request = NULL;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
smlPrintStatisticInfo(info);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -583,12 +583,14 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
|
||||||
.i = ts,
|
.i = ts,
|
||||||
.length = (size_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
|
.length = (size_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
|
||||||
if (info->dataFormat) {
|
if (info->dataFormat) {
|
||||||
|
uDebug("SML:0x%" PRIx64 " smlParseInfluxString format true, ts:%" PRId64, info->id, ts);
|
||||||
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 0);
|
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 0);
|
||||||
if(ret != TSDB_CODE_SUCCESS){return ret;}
|
if(ret != TSDB_CODE_SUCCESS){return ret;}
|
||||||
ret = smlBuildRow(info->currTableDataCtx);
|
ret = smlBuildRow(info->currTableDataCtx);
|
||||||
if(ret != TSDB_CODE_SUCCESS){return ret;}
|
if(ret != TSDB_CODE_SUCCESS){return ret;}
|
||||||
clearColValArray(info->currTableDataCtx->pValues);
|
clearColValArray(info->currTableDataCtx->pValues);
|
||||||
} else {
|
} else {
|
||||||
|
uDebug("SML:0x%" PRIx64 " smlParseInfluxString format false, ts:%" PRId64, info->id, ts);
|
||||||
taosArraySet(elements->colArray, 0, &kv);
|
taosArraySet(elements->colArray, 0, &kv);
|
||||||
}
|
}
|
||||||
info->preLine = *elements;
|
info->preLine = *elements;
|
||||||
|
|
|
@ -1238,13 +1238,13 @@ int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDi
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosLoadCfg(pCfg, envCmd, cfgDir, envFile, apolloUrl) != 0) {
|
if (taosLoadCfg(pCfg, envCmd, cfgDir, envFile, apolloUrl) != 0) {
|
||||||
uError("failed to load cfg since %s", terrstr());
|
printf("failed to load cfg since %s", terrstr());
|
||||||
cfgCleanup(pCfg);
|
cfgCleanup(pCfg);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (cfgLoadFromArray(pCfg, pArgs) != 0) {
|
if (cfgLoadFromArray(pCfg, pArgs) != 0) {
|
||||||
uError("failed to load cfg from array since %s", terrstr());
|
printf("failed to load cfg from array since %s", terrstr());
|
||||||
cfgCleanup(pCfg);
|
cfgCleanup(pCfg);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -1260,13 +1260,13 @@ int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDi
|
||||||
|
|
||||||
if (taosMulModeMkDir(tsLogDir, 0777) != 0) {
|
if (taosMulModeMkDir(tsLogDir, 0777) != 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
uError("failed to create dir:%s since %s", tsLogDir, terrstr());
|
printf("failed to create dir:%s since %s", tsLogDir, terrstr());
|
||||||
cfgCleanup(pCfg);
|
cfgCleanup(pCfg);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosInitLog(logname, logFileNum) != 0) {
|
if (taosInitLog(logname, logFileNum) != 0) {
|
||||||
uError("failed to init log file since %s", terrstr());
|
printf("failed to init log file since %s", terrstr());
|
||||||
cfgCleanup(pCfg);
|
cfgCleanup(pCfg);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -637,9 +637,9 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
|
||||||
if (metaRsp.metaRspLen > 0) {
|
if (metaRsp.metaRspLen > 0) {
|
||||||
code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp);
|
code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp);
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64
|
tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64
|
||||||
",version:%" PRId64,
|
",ts:%" PRId64,
|
||||||
consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
|
consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
|
||||||
metaRsp.rspOffset.version);
|
metaRsp.rspOffset.ts);
|
||||||
taosMemoryFree(metaRsp.metaRsp);
|
taosMemoryFree(metaRsp.metaRsp);
|
||||||
tDeleteSTaosxRsp(&taosxRsp);
|
tDeleteSTaosxRsp(&taosxRsp);
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -185,13 +185,12 @@ end:
|
||||||
|
|
||||||
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) {
|
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
taosThreadMutexLock(&pHandle->pWalReader->mutex);
|
taosThreadMutexLock(&pHandle->pWalReader->mutex);
|
||||||
int64_t offset = *fetchOffset;
|
int64_t offset = *fetchOffset;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) {
|
if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) {
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return",
|
tqDebug("tmq poll: consumer:%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return",
|
||||||
pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode), offset);
|
pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode), offset);
|
||||||
*fetchOffset = offset - 1;
|
*fetchOffset = offset - 1;
|
||||||
code = -1;
|
code = -1;
|
||||||
|
|
|
@ -70,7 +70,7 @@ static int32_t smlBoundColumnData(SArray* cols, SBoundColInfo* pBoundInfo, SSche
|
||||||
SToken sToken = {.n = kv->keyLen, .z = (char*)kv->key};
|
SToken sToken = {.n = kv->keyLen, .z = (char*)kv->key};
|
||||||
col_id_t t = lastColIdx + 1;
|
col_id_t t = lastColIdx + 1;
|
||||||
col_id_t index = ((t == 0 && !isTag) ? 0 : insFindCol(&sToken, t, pBoundInfo->numOfCols, pSchema));
|
col_id_t index = ((t == 0 && !isTag) ? 0 : insFindCol(&sToken, t, pBoundInfo->numOfCols, pSchema));
|
||||||
uDebug("SML, index:%d, t:%d, ncols:%d", index, t, pBoundInfo->numOfCols);
|
uTrace("SML, index:%d, t:%d, ncols:%d", index, t, pBoundInfo->numOfCols);
|
||||||
if (index < 0 && t > 0) {
|
if (index < 0 && t > 0) {
|
||||||
index = insFindCol(&sToken, 0, t, pSchema);
|
index = insFindCol(&sToken, 0, t, pSchema);
|
||||||
}
|
}
|
||||||
|
@ -345,7 +345,7 @@ int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSc
|
||||||
}
|
}
|
||||||
if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)pUcs4, pColSchema->bytes - VARSTR_HEADER_SIZE, &len)) {
|
if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)pUcs4, pColSchema->bytes - VARSTR_HEADER_SIZE, &len)) {
|
||||||
if (errno == E2BIG) {
|
if (errno == E2BIG) {
|
||||||
uError("sml bind taosMbsToUcs4 error, kv length:%d, bytes:%d", (int)kv->length, pColSchema->bytes);
|
uError("sml bind taosMbsToUcs4 error, kv length:%d, bytes:%d, kv->value:%s", (int)kv->length, pColSchema->bytes, kv->value);
|
||||||
buildInvalidOperationMsg(&pBuf, "value too long");
|
buildInvalidOperationMsg(&pBuf, "value too long");
|
||||||
ret = TSDB_CODE_PAR_VALUE_TOO_LONG;
|
ret = TSDB_CODE_PAR_VALUE_TOO_LONG;
|
||||||
goto end;
|
goto end;
|
||||||
|
|
|
@ -21,7 +21,7 @@
|
||||||
#include "tjson.h"
|
#include "tjson.h"
|
||||||
#include "tglobal.h"
|
#include "tglobal.h"
|
||||||
|
|
||||||
#define LOG_MAX_LINE_SIZE (1024)
|
#define LOG_MAX_LINE_SIZE (10024)
|
||||||
#define LOG_MAX_LINE_BUFFER_SIZE (LOG_MAX_LINE_SIZE + 3)
|
#define LOG_MAX_LINE_BUFFER_SIZE (LOG_MAX_LINE_SIZE + 3)
|
||||||
#define LOG_MAX_LINE_DUMP_SIZE (1024 * 1024)
|
#define LOG_MAX_LINE_DUMP_SIZE (1024 * 1024)
|
||||||
#define LOG_MAX_LINE_DUMP_BUFFER_SIZE (LOG_MAX_LINE_DUMP_SIZE + 3)
|
#define LOG_MAX_LINE_DUMP_BUFFER_SIZE (LOG_MAX_LINE_DUMP_SIZE + 3)
|
||||||
|
|
Loading…
Reference in New Issue