diff --git a/cmake/cmake.version b/cmake/cmake.version index d38ac40b90..de85025a8c 100644 --- a/cmake/cmake.version +++ b/cmake/cmake.version @@ -2,7 +2,7 @@ IF (DEFINED VERNUMBER) SET(TD_VER_NUMBER ${VERNUMBER}) ELSE () - SET(TD_VER_NUMBER "3.0.3.0") + SET(TD_VER_NUMBER "3.0.3.1") ENDIF () IF (DEFINED VERCOMPATIBLE) diff --git a/examples/rust/wrapper.h b/examples/rust/wrapper.h new file mode 100644 index 0000000000..78857597a9 --- /dev/null +++ b/examples/rust/wrapper.h @@ -0,0 +1 @@ +#include diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 691aae5593..d721ddd07e 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -214,7 +214,6 @@ enum { typedef struct { int8_t fetchType; - STqOffsetVal offset; union { SSDataBlock data; void* meta; diff --git a/include/libs/monitor/monitor.h b/include/libs/monitor/monitor.h index 3dee59ab14..1da801be0d 100644 --- a/include/libs/monitor/monitor.h +++ b/include/libs/monitor/monitor.h @@ -147,9 +147,9 @@ typedef struct { } SMonStbInfo; typedef struct { - int32_t expire_time; - int64_t timeseries_used; - int64_t timeseries_total; + uint32_t expire_time; + int64_t timeseries_used; + int64_t timeseries_total; } SMonGrantInfo; typedef struct { diff --git a/packaging/testpackage.sh b/packaging/testpackage.sh index 9959d290e7..97226a86b5 100755 --- a/packaging/testpackage.sh +++ b/packaging/testpackage.sh @@ -107,7 +107,7 @@ elif [ ${testFile} = "tools" ];then originTdpPath="taosTools-${originversion}" packageName="${tdPath}-Linux-${cpuType}${packageLite}.${packageType}" originPackageName="${originTdpPath}-Linux-${cpuType}${packageLite}.${packageType}" - installCmd="install-tools.sh" + installCmd="install-taostools.sh" fi @@ -333,7 +333,7 @@ if [[ ${packageName} =~ "Lite" ]] || ([[ ${packageName} =~ "x64" ]] && [[ ${p wgetFile taosTools-2.1.3-Linux-x64.tar.gz v2.1.3 web tar xf taosTools-2.1.3-Linux-x64.tar.gz fi - cd taosTools-2.1.3 && bash install-tools.sh + cd taosTools-2.1.3 && bash install-taostools.sh elif ([[ ${packageName} =~ "arm64" ]] && [[ ${packageName} =~ "client" ]]);then echoColor G "===== install taos-tools arm when package is arm64-client =====" cd ${installPath} @@ -342,7 +342,7 @@ elif ([[ ${packageName} =~ "arm64" ]] && [[ ${packageName} =~ "client" ]]);then tar xf taosTools-2.1.3-Linux-arm64.tar.gz fi - cd taosTools-2.1.3 && bash install-tools.sh + cd taosTools-2.1.3 && bash install-taostools.sh fi echoColor G "===== start TDengine =====" @@ -361,18 +361,18 @@ rm -rf ${installPath}/${tdPath}/ # cd ${installPath} # wgetFile taosTools-2.1.2-Linux-x64.tar.gz . # tar xf taosTools-2.1.2-Linux-x64.tar.gz -# cd taosTools-2.1.2 && bash install-tools.sh +# cd taosTools-2.1.2 && bash install-taostools.sh # elif [[ ${packageName} =~ "Lite" ]] && [[ ${packageName} =~ "deb" ]] ;then # echoColor G "===== install taos-tools when package is lite or client =====" # cd ${installPath} # wgetFile taosTools-2.1.2-Linux-x64.tar.gz . # tar xf taosTools-2.1.2-Linux-x64.tar.gz -# cd taosTools-2.1.2 && bash install-tools.sh +# cd taosTools-2.1.2 && bash install-taostools.sh # elif [[ ${packageName} =~ "Lite" ]] && [[ ${packageName} =~ "rpm" ]] ;then # echoColor G "===== install taos-tools when package is lite or client =====" # cd ${installPath} # wgetFile taosTools-2.1.2-Linux-x64.tar.gz . # tar xf taosTools-2.1.2-Linux-x64.tar.gz -# cd taosTools-2.1.2 && bash install-tools.sh +# cd taosTools-2.1.2 && bash install-taostools.sh # fi diff --git a/packaging/tools/com.taosdata.taoskeeper.plist b/packaging/tools/com.taosdata.taoskeeper.plist new file mode 100644 index 0000000000..338b5d8e79 --- /dev/null +++ b/packaging/tools/com.taosdata.taoskeeper.plist @@ -0,0 +1,33 @@ + + + + + Label + com.tdengine.taoskeeper + ProgramArguments + + /usr/local/bin/taoskeeper + + ProcessType + Interactive + Disabled + + RunAtLoad + + LaunchOnlyOnce + + SessionCreate + + ExitTimeOut + 600 + KeepAlive + + SuccessfulExit + + AfterInitialDemand + + + Program + /usr/local/bin/taoskeeper + + \ No newline at end of file diff --git a/packaging/tools/makepkg.sh b/packaging/tools/makepkg.sh index d71d5df47c..29160238ce 100755 --- a/packaging/tools/makepkg.sh +++ b/packaging/tools/makepkg.sh @@ -53,7 +53,7 @@ if [ -d ${top_dir}/tools/taos-tools/packaging/deb ]; then cd ${top_dir}/tools/taos-tools/packaging/deb [ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0" - taostools_ver=$(git tag |grep -v taos | sort | tail -1) + taostools_ver=$(git for-each-ref --sort=taggerdate --format '%(tag)' refs/tags|grep -v taos | tail -1) taostools_install_dir="${release_dir}/${clientName2}Tools-${taostools_ver}" cd ${curr_dir} diff --git a/packaging/tools/post.sh b/packaging/tools/post.sh index 9861806677..c9fab51b28 100755 --- a/packaging/tools/post.sh +++ b/packaging/tools/post.sh @@ -582,6 +582,11 @@ function install_service_on_launchctl() { ${csudo}cp ${install_main_dir}/service/com.taosdata.taosadapter.plist /Library/LaunchDaemons/com.taosdata.taosadapter.plist || : ${csudo}launchctl load -w /Library/LaunchDaemons/com.taosdata.taosadapter.plist || : fi + if [ -f ${install_main_dir}/service/com.taosdata.taoskeeper.plist ]; then + ${csudo}launchctl unload -w /Library/LaunchDaemons/com.taosdata.taoskeeper.plist > /dev/null 2>&1 || : + ${csudo}cp ${install_main_dir}/service/com.taosdata.taoskeeper.plist /Library/LaunchDaemons/com.taosdata.taoskeeper.plist || : + ${csudo}launchctl load -w /Library/LaunchDaemons/com.taosdata.taoskeeper.plist || : + fi } function install_taosadapter_service() { diff --git a/source/client/inc/clientSml.h b/source/client/inc/clientSml.h index d8d41d8be6..92896e6f23 100644 --- a/source/client/inc/clientSml.h +++ b/source/client/inc/clientSml.h @@ -70,7 +70,7 @@ extern "C" { #define VALUE_LEN 6 #define OTD_JSON_FIELDS_NUM 4 -#define MAX_RETRY_TIMES 5 +#define MAX_RETRY_TIMES 100 typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType; typedef enum { diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index e3e20ee85d..de08ba66cc 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -543,7 +543,7 @@ void taos_init_imp(void) { if (taosCreateLog("taoslog", 10, configDir, NULL, NULL, NULL, NULL, 1) != 0) { // ignore create log failed, only print - printf(" WARING: Create taoslog failed. configDir=%s\n", configDir); + printf(" WARING: Create taoslog failed:%s. configDir=%s\n", strerror(errno), configDir); } if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 1) != 0) { diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index cfd1005181..6bd8b01842 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1373,6 +1373,8 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch uError("WriteRaw:catalogGetTableMeta failed. table name: %s", tbname); goto end; } +// uError("td23101 0vgId:%d, vgId:%d, name:%s, uid:%"PRIu64, vgData.vgId, pTableMeta->vgId, tbname, pTableMeta->uid); + pQuery = smlInitHandle(); if (pQuery == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -1380,6 +1382,7 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch } pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)); +// uError("td23101 1vgId:%d, numEps:%d, name:%s, uid:%"PRIu64, vgData.vgId, vgData.epSet.numOfEps, tbname, pTableMeta->uid); code = rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false); if (code != TSDB_CODE_SUCCESS) { @@ -1723,10 +1726,15 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) tDecoderClear(&decoderTmp); } - if (pCreateReqDst) { + SVgroupInfo vg; + code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vg); + if (code != TSDB_CODE_SUCCESS) { + uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbName); + goto end; + } + + if (pCreateReqDst) { // change stable name to get meta strcpy(pName.tname, pCreateReqDst->ctb.stbName); - } else { - strcpy(pName.tname, tbName); } code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta); if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) { @@ -1739,13 +1747,6 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) goto end; } - SVgroupInfo vg; - code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vg); - if (code != TSDB_CODE_SUCCESS) { - uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbName); - goto end; - } - if (pCreateReqDst) { pTableMeta->vgId = vg.vgId; pTableMeta->uid = pCreateReqDst->uid; diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 7f58b579fa..1719759822 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -750,6 +750,7 @@ end: } static int32_t smlModifyDBSchemas(SSmlHandle *info) { + uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas start, format:%d, needModifySchema:%d", info->id, info->dataFormat, info->needModifySchema); if (info->dataFormat && !info->needModifySchema) { return TSDB_CODE_SUCCESS; } @@ -779,6 +780,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta); if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_STB_NOT_EXIST) { + uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas create table:%s", info->id, pName.tname); SArray *pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols), sizeof(SField)); SArray *pTags = taosArrayInit(taosArrayGetSize(sTableData->tags), sizeof(SField)); code = smlBuildFieldsList(info, NULL, NULL, sTableData->tags, pTags, 0, true); @@ -818,6 +820,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { goto end; } if (action != SCHEMA_ACTION_NULL) { + uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas change table tag, table:%s, action:%d", info->id, pName.tname, action); SArray *pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField)); SArray *pTags = @@ -869,6 +872,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { goto end; } if (action != SCHEMA_ACTION_NULL) { + uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas change table col, table:%s, action:%d", info->id, pName.tname, action); SArray *pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField)); SArray *pTags = @@ -935,15 +939,19 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { } sTableData->tableMeta = pTableMeta; - + uDebug("SML:0x%" PRIx64 "modify schema uid:%" PRIu64 ", sversion:%d, tversion:%d", info->id, pTableMeta->uid, pTableMeta->sversion, pTableMeta->tversion) tmp = (SSmlSTableMeta **)taosHashIterate(info->superTables, tmp); } + uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas end success, format:%d, needModifySchema:%d", info->id, info->dataFormat, info->needModifySchema); + return 0; end: taosHashCleanup(hashTmp); taosMemoryFreeClear(pTableMeta); - // catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1); + catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1); + uError("SML:0x%" PRIx64 " smlModifyDBSchemas end failed:%d:%s, format:%d, needModifySchema:%d", info->id, code, tstrerror(code), info->dataFormat, info->needModifySchema); + return code; } @@ -1019,8 +1027,9 @@ static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols } else { size_t tmp = taosArrayGetSize(metaArray); if (tmp > INT16_MAX) { + smlBuildInvalidDataMsg(msg, "too many cols or tags", kv->key); uError("too many cols or tags"); - return -1; + return TSDB_CODE_SML_INVALID_DATA; } int16_t size = tmp; int ret = taosHashPut(metaHash, kv->key, kv->keyLen, &size, SHORT_BYTES); @@ -1170,6 +1179,7 @@ static int32_t smlPushCols(SArray *colsArray, SArray *cols) { } static int32_t smlParseLineBottom(SSmlHandle *info) { + uDebug("SML:0x%" PRIx64 " smlParseLineBottom start, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum); if (info->dataFormat) return TSDB_CODE_SUCCESS; for (int32_t i = 0; i < info->lineNum; i++) { @@ -1212,6 +1222,7 @@ static int32_t smlParseLineBottom(SSmlHandle *info) { SSmlSTableMeta **tableMeta = (SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen); if (tableMeta) { // update meta + uDebug("SML:0x%" PRIx64 " smlParseLineBottom update meta, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum); ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, elements->colArray, false, &info->msgBuf); if (ret == TSDB_CODE_SUCCESS) { ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, tinfo->tags, true, &info->msgBuf); @@ -1226,7 +1237,7 @@ static int32_t smlParseLineBottom(SSmlHandle *info) { // uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id); // return ret; // } - + uDebug("SML:0x%" PRIx64 " smlParseLineBottom add meta, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum); SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat); smlInsertMeta(meta->tagHash, meta->tags, tinfo->tags); if(terrno == TSDB_CODE_DUP_KEY){return terrno;} @@ -1234,12 +1245,14 @@ static int32_t smlParseLineBottom(SSmlHandle *info) { taosHashPut(info->superTables, elements->measure, elements->measureLen, &meta, POINTER_BYTES); } } + uDebug("SML:0x%" PRIx64 " smlParseLineBottom end, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum); return TSDB_CODE_SUCCESS; } static int32_t smlInsertData(SSmlHandle *info) { int32_t code = TSDB_CODE_SUCCESS; + uDebug("SML:0x%" PRIx64 " smlInsertData start, format:%d", info->id, info->dataFormat); if(info->pRequest->dbList == NULL){ info->pRequest->dbList = taosArrayInit(1, TSDB_DB_FNAME_LEN); @@ -1284,6 +1297,7 @@ static int32_t smlInsertData(SSmlHandle *info) { // use tablemeta of stable to save vgid and uid of child table (*pMeta)->tableMeta->vgId = vg.vgId; (*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid + uDebug("SML:0x%" PRIx64 " smlInsertData table:%s, uid:%" PRIu64 ", format:%d", info->id, pName.tname, tableData->uid, info->dataFormat); code = smlBindData(info->pQuery, info->dataFormat, tableData->tags, (*pMeta)->cols, tableData->cols, (*pMeta)->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen, @@ -1306,16 +1320,18 @@ static int32_t smlInsertData(SSmlHandle *info) { atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1); launchQueryImpl(info->pRequest, info->pQuery, true, NULL); + uDebug("SML:0x%" PRIx64 " smlInsertData end, format:%d, code:%d,%s", info->id, info->dataFormat, info->pRequest->code, tstrerror(info->pRequest->code)); + return info->pRequest->code; } static void smlPrintStatisticInfo(SSmlHandle *info) { uDebug( "SML:0x%" PRIx64 - " smlInsertLines result, code:%d,lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d,alter stable tag num:%d,alter stable col num:%d \ + " smlInsertLines result, code:%d, msg:%s, lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d,alter stable tag num:%d,alter stable col num:%d \ parse cost:%" PRId64 ",schema cost:%" PRId64 ",bind cost:%" PRId64 ",rpc cost:%" PRId64 ",total cost:%" PRId64 "", - info->id, info->cost.code, info->cost.lineNum, info->cost.numOfSTables, info->cost.numOfCTables, + info->id, info->cost.code, tstrerror(info->cost.code), info->cost.lineNum, info->cost.numOfSTables, info->cost.numOfCTables, info->cost.numOfCreateSTables, info->cost.numOfAlterTagSTables, info->cost.numOfAlterColSTables, info->cost.schemaTime - info->cost.parseTime, info->cost.insertBindTime - info->cost.schemaTime, info->cost.insertRpcTime - info->cost.insertBindTime, info->cost.endTime - info->cost.insertRpcTime, @@ -1360,6 +1376,7 @@ int32_t smlClearForRerun(SSmlHandle *info) { } static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) { + uDebug("SML:0x%" PRIx64 " smlParseLine start", info->id); int32_t code = TSDB_CODE_SUCCESS; if (info->protocol == TSDB_SML_JSON_PROTOCOL) { if (lines) { @@ -1395,8 +1412,16 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char } } - uDebug("SML:0x%" PRIx64 " smlParseLine israw:%d, len:%d, sql:%s", info->id, info->isRawLine, len, - (info->isRawLine ? "rawdata" : tmp)); + char cTmp = 0; // for print tmp if is raw + if(info->isRawLine){ + cTmp = tmp[len - 1]; + tmp[len - 1] = '\0'; + } + + uDebug("SML:0x%" PRIx64 " smlParseLine israw:%d, numLines:%d, protocol:%d, len:%d, sql:%s", info->id, info->isRawLine, numLines, info->protocol, len, tmp); + if(info->isRawLine){ + tmp[len - 1] = cTmp; + } if (info->protocol == TSDB_SML_LINE_PROTOCOL) { if (info->dataFormat) { @@ -1421,6 +1446,7 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char return code; } if (info->reRun) { + uDebug("SML:0x%" PRIx64 " smlParseLine re run", info->id); i = 0; rawLine = oldRaw; code = smlClearForRerun(info); @@ -1431,6 +1457,7 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char } i++; } + uDebug("SML:0x%" PRIx64 " smlParseLine end", info->id); return code; } @@ -1461,7 +1488,8 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL do { code = smlModifyDBSchemas(info); if (code == 0) break; - taosMsleep(200); + taosMsleep(500); + uInfo("SML:0x%" PRIx64 " smlModifyDBSchemas retry code:%s, times:%d", info->id, tstrerror(code), retryNum); } while (retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES); if (code != 0) { @@ -1488,6 +1516,7 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine, } SRequestObj *request = NULL; SSmlHandle *info = NULL; + int cnt = 0; while(1){ request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid); if (request == NULL) { @@ -1542,16 +1571,22 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine, request->code = code; info->cost.endTime = taosGetTimestampUs(); info->cost.code = code; - smlPrintStatisticInfo(info); - if(code == TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER || code == TSDB_CODE_SDB_OBJ_CREATING){ + if(code == TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER || code == TSDB_CODE_SDB_OBJ_CREATING + || code == TSDB_CODE_PAR_VALUE_TOO_LONG || code == TSDB_CODE_MND_TRANS_CONFLICT){ + if(cnt++ >= 10){ + uInfo("SML:%"PRIx64" retry:%d/10 end code:%d, msg:%s", info->id, cnt, code, tstrerror(code)); + break; + } + taosMsleep(100); refreshMeta(request->pTscObj, request); - uInfo("SML:%"PRIx64" ver is old retry or object is creating code:%d", info->id, code); + uInfo("SML:%"PRIx64" retry:%d/10,ver is old retry or object is creating code:%d, msg:%s", info->id, cnt, code, tstrerror(code)); smlDestroyInfo(info); info = NULL; taos_free_result(request); request = NULL; continue; } + smlPrintStatisticInfo(info); break; } diff --git a/source/client/src/clientSmlLine.c b/source/client/src/clientSmlLine.c index b1aea1bfaa..335e3a1dc7 100644 --- a/source/client/src/clientSmlLine.c +++ b/source/client/src/clientSmlLine.c @@ -583,12 +583,14 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine .i = ts, .length = (size_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes}; if (info->dataFormat) { + uDebug("SML:0x%" PRIx64 " smlParseInfluxString format true, ts:%" PRId64, info->id, ts); ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 0); if(ret != TSDB_CODE_SUCCESS){return ret;} ret = smlBuildRow(info->currTableDataCtx); if(ret != TSDB_CODE_SUCCESS){return ret;} clearColValArray(info->currTableDataCtx->pValues); } else { + uDebug("SML:0x%" PRIx64 " smlParseInfluxString format false, ts:%" PRId64, info->id, ts); taosArraySet(elements->colArray, 0, &kv); } info->preLine = *elements; diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 79139a5fe5..77bbd0be1a 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -24,7 +24,7 @@ #include "tref.h" #include "ttimer.h" -#define EMPTY_BLOCK_POLL_IDLE_DURATION 100 +#define EMPTY_BLOCK_POLL_IDLE_DURATION 10 #define DEFAULT_AUTO_COMMIT_INTERVAL 5000 struct SMqMgmt { @@ -148,7 +148,8 @@ typedef struct { typedef struct { int8_t tmqRspType; - int32_t epoch; + int32_t epoch; // epoch can be used to guard the vgHandle + int32_t vgId; SMqClientVg* vgHandle; SMqClientTopic* topicHandle; uint64_t reqId; @@ -1329,6 +1330,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { pRspWrapper->topicHandle = pTopic; pRspWrapper->reqId = requestId; pRspWrapper->pEpset = pMsg->pEpSet; + pRspWrapper->vgId = pVg->vgId; pMsg->pEpSet = NULL; if (rspType == TMQ_MSG_TYPE__POLL_RSP) { @@ -1747,7 +1749,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { for (int j = 0; j < numOfVg; j++) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 100ms - tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 100ms before start next poll", tmq->consumerId, tmq->epoch, + tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId, tmq->epoch, pVg->vgId); continue; } @@ -1864,9 +1866,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { return pRsp; } } else { - SMqClientVg* pVg = pollRspWrapper->vgHandle; tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", - tmq->consumerId, pVg->vgId, pDataRsp->head.epoch, consumerEpoch); + tmq->consumerId, pollRspWrapper->vgId, pDataRsp->head.epoch, consumerEpoch); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); } @@ -1886,7 +1887,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { return pRsp; } else { tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", - tmq->consumerId, pollRspWrapper->vgHandle->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch); + tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); } @@ -1933,7 +1934,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } else { tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", - tmq->consumerId, pollRspWrapper->vgHandle->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch); + tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); } @@ -1955,7 +1956,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { void* rspObj; int64_t startTime = taosGetTimestampMs(); - tscDebug("consumer:0x%" PRIx64 " start to poll at %" PRId64, tmq->consumerId, startTime); + tscDebug("consumer:0x%" PRIx64 " start to poll at %"PRId64", timeout:%" PRId64, tmq->consumerId, startTime, timeout); #if 0 tmqHandleAllDelayedTask(tmq); @@ -2017,37 +2018,43 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { } } +static void displayConsumeStatistics(const tmq_t* pTmq) { + int32_t numOfTopics = taosArrayGetSize(pTmq->clientTopics); + tscDebug("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d", + pTmq->consumerId, pTmq->pollCnt, pTmq->totalRows, numOfTopics, pTmq->epoch); + + tscDebug("consumer:0x%" PRIx64 " rows dist begin: ", pTmq->consumerId); + for (int32_t i = 0; i < numOfTopics; ++i) { + SMqClientTopic* pTopics = taosArrayGet(pTmq->clientTopics, i); + + tscDebug("consumer:0x%" PRIx64 " topic:%d", pTmq->consumerId, i); + int32_t numOfVgs = taosArrayGetSize(pTopics->vgs); + for (int32_t j = 0; j < numOfVgs; ++j) { + SMqClientVg* pVg = taosArrayGet(pTopics->vgs, j); + tscDebug("topic:%s, %d. vgId:%d rows:%" PRId64, pTopics->topicName, j, pVg->vgId, pVg->numOfRows); + } + } + + tscDebug("consumer:0x%" PRIx64 " rows dist end", pTmq->consumerId); +} + int32_t tmq_consumer_close(tmq_t* tmq) { tscDebug("consumer:0x%" PRIx64" start to close consumer, status:%d", tmq->consumerId, tmq->status); + displayConsumeStatistics(tmq); if (tmq->status == TMQ_CONSUMER_STATUS__READY) { - int32_t rsp = tmq_commit_sync(tmq, NULL); - if (rsp != 0) { - return rsp; - } - - int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); - tscDebug("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d", - tmq->consumerId, tmq->pollCnt, tmq->totalRows, numOfTopics, tmq->epoch); - - tscDebug("consumer:0x%" PRIx64 " rows dist begin: ", tmq->consumerId); - for (int32_t i = 0; i < numOfTopics; ++i) { - SMqClientTopic* pTopics = taosArrayGet(tmq->clientTopics, i); - - tscDebug("consumer:0x%" PRIx64 " topic:%d", tmq->consumerId, i); - int32_t numOfVgs = taosArrayGetSize(pTopics->vgs); - for (int32_t j = 0; j < numOfVgs; ++j) { - SMqClientVg* pVg = taosArrayGet(pTopics->vgs, j); - tscDebug("topic:%s, %d. vgId:%d rows:%" PRId64, pTopics->topicName, j, pVg->vgId, pVg->numOfRows); + // if auto commit is set, commit before close consumer. Otherwise, do nothing. + if (tmq->autoCommit) { + int32_t rsp = tmq_commit_sync(tmq, NULL); + if (rsp != 0) { + return rsp; } } - tscDebug("consumer:0x%" PRIx64 " rows dist end", tmq->consumerId); - int32_t retryCnt = 0; tmq_list_t* lst = tmq_list_new(); while (1) { - rsp = tmq_subscribe(tmq, lst); + int32_t rsp = tmq_subscribe(tmq, lst); if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) { break; } else { @@ -2057,6 +2064,8 @@ int32_t tmq_consumer_close(tmq_t* tmq) { } tmq_list_destroy(lst); + } else { + tscWarn("consumer:0x%" PRIx64" not in ready state, close it directly", tmq->consumerId); } taosRemoveRef(tmqMgmt.rsetId, tmq->refId); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 0b4c7b88d7..aeeec1d61c 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -1238,13 +1238,13 @@ int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDi } if (taosLoadCfg(pCfg, envCmd, cfgDir, envFile, apolloUrl) != 0) { - uError("failed to load cfg since %s", terrstr()); + printf("failed to load cfg since %s", terrstr()); cfgCleanup(pCfg); return -1; } if (cfgLoadFromArray(pCfg, pArgs) != 0) { - uError("failed to load cfg from array since %s", terrstr()); + printf("failed to load cfg from array since %s", terrstr()); cfgCleanup(pCfg); return -1; } @@ -1260,13 +1260,13 @@ int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDi if (taosMulModeMkDir(tsLogDir, 0777) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to create dir:%s since %s", tsLogDir, terrstr()); + printf("failed to create dir:%s since %s", tsLogDir, terrstr()); cfgCleanup(pCfg); return -1; } if (taosInitLog(logname, logFileNum) != 0) { - uError("failed to init log file since %s", terrstr()); + printf("failed to init log file since %s", terrstr()); cfgCleanup(pCfg); return -1; } diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index dfc3b3fde8..e891eef1d8 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -457,6 +457,7 @@ typedef struct { void* pIter; SMnode* pMnode; STableMetaRsp* pMeta; + bool restore; bool sysDbRsp; char db[TSDB_DB_FNAME_LEN]; char filterTb[TSDB_TABLE_NAME_LEN]; diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index c32212dfc1..a096e0341e 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -864,7 +864,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr } // grant info - pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 86400000.0f; + pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 1000; pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed; if (pMnode->grant.expireTimeMS == 0) { pGrantInfo->expire_time = INT32_MAX; diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index 7fe08514f6..c50b205f37 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -324,7 +324,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) { pReq->info.rsp = pRsp; pReq->info.rspLen = size; - if (rowsRead == 0 || rowsRead < rowsToRead) { + if (rowsRead == 0 || ((rowsRead < rowsToRead) && !pShow->restore)) { pRsp->completed = 1; mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id); mndReleaseShowObj(pShow, true); diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 2a369a863a..c577097644 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -3113,9 +3113,18 @@ static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; STR_TO_VARSTR(typeName, "SUPER_TABLE"); + bool fetch = pShow->restore ? false : true; + pShow->restore = false; while (numOfRows < rows) { - pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb); - if (pShow->pIter == NULL) break; + if (fetch) { + pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb); + if (pShow->pIter == NULL) break; + } else { + fetch = true; + void *pKey = taosHashGetKey(pShow->pIter, NULL); + pStb = sdbAcquire(pSdb, SDB_STB, pKey); + if (!pStb) continue; + } if (pDb != NULL && pStb->dbUid != pDb->uid) { sdbRelease(pSdb, pStb); @@ -3129,6 +3138,17 @@ static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB sdbRelease(pSdb, pStb); continue; } + + if ((numOfRows + pStb->numOfColumns) > rows) { + pShow->restore = true; + if (numOfRows == 0) { + mError("mndRetrieveStbCol failed to get stable cols since buf:%d less than result:%d, stable name:%s, db:%s", + rows, pStb->numOfColumns, pStb->name, pStb->db); + } + sdbRelease(pSdb, pStb); + break; + } + varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE])); mDebug("mndRetrieveStbCol get stable cols, stable name:%s, db:%s", pStb->name, pStb->db); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index ed4facb061..b0feb974c1 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -157,7 +157,7 @@ typedef struct SMTbCursor SMTbCursor; SMTbCursor *metaOpenTbCursor(SMeta *pMeta); void metaCloseTbCursor(SMTbCursor *pTbCur); int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType); -int32_t metaTbCursorPrev(SMTbCursor *pTbCur); +int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType); #endif diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index ee2bd007ce..c2b38f5cd1 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -141,7 +141,7 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle); // tqRead int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset); int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset); -int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum); +int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum, uint64_t reqId); // tqExec int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp); diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 6ab322d26a..3376150d26 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -336,7 +336,7 @@ int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType) { return 0; } -int32_t metaTbCursorPrev(SMTbCursor *pTbCur) { +int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType) { int ret; void *pBuf; STbCfg tbCfg; @@ -350,7 +350,7 @@ int32_t metaTbCursorPrev(SMTbCursor *pTbCur) { tDecoderClear(&pTbCur->mr.coder); metaGetTableEntryByVersion(&pTbCur->mr, ((SUidIdxVal *)pTbCur->pVal)[0].version, *(tb_uid_t *)pTbCur->pKey); - if (pTbCur->mr.me.type == TSDB_SUPER_TABLE) { + if (pTbCur->mr.me.type == jumpTableType) { continue; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 646abc5d6a..8e3f1aa82a 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -453,6 +453,15 @@ static int32_t processSubColumn(STQ* pTq, STqHandle* pHandle, const SMqPollReq* dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); taosWUnLockLatch(&pTq->pushLock); + code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP); + + // NOTE: this pHandle->consumerId may have been changed already. + tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64 + ", ts:%" PRId64", reqId:0x%"PRIx64, + pRequest->consumerId, pHandle->subKey, vgId, dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid, + dataRsp.rspOffset.ts, pRequest->reqId); + + tDeleteSMqDataRsp(&dataRsp); return code; } @@ -484,8 +493,7 @@ static int32_t processSubDbOrTable(STQ* pTq, STqHandle* pHandle, const SMqPollRe if (metaRsp.metaRspLen > 0) { code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp); - tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 - ",ts:%" PRId64, + tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 ",ts:%" PRId64, pRequest->consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid, metaRsp.rspOffset.ts); taosMemoryFree(metaRsp.metaRsp); @@ -522,7 +530,7 @@ static int32_t processSubDbOrTable(STQ* pTq, STqHandle* pHandle, const SMqPollRe break; } - if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) { + if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); tDeleteSTaosxRsp(&taosxRsp); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 9579eb3407..b002140a6c 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -183,23 +183,24 @@ end: return tbSuid == realTbSuid; } -int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) { +int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead, uint64_t reqId) { int32_t code = 0; + int32_t vgId = TD_VID(pTq->pVnode); taosThreadMutexLock(&pHandle->pWalReader->mutex); int64_t offset = *fetchOffset; while (1) { if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) { - tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return", - pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode), offset); + tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return, reqId:0x%"PRIx64, + pHandle->consumerId, pHandle->epoch, vgId, offset, reqId); *fetchOffset = offset - 1; code = -1; goto END; } - tqDebug("vgId:%d, taosx get msg ver %" PRId64 ", type: %s", pTq->pVnode->config.vgId, offset, - TMSG_INFO((*ppCkHead)->head.msgType)); + tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s, reqId:0x%" PRIx64, vgId, + pHandle->consumerId, offset, TMSG_INFO((*ppCkHead)->head.msgType), reqId); if ((*ppCkHead)->head.msgType == TDMT_VND_SUBMIT) { code = walFetchBody(pHandle->pWalReader, ppCkHead); @@ -305,14 +306,9 @@ void tqNextBlock(STqReader* pReader, SFetchRet* ret) { while (1) { if (pReader->msg2.msgStr == NULL) { if (walNextValidMsg(pReader->pWalReader) < 0) { -// pReader->ver = pReader->pWalReader->curVersion; - ret->offset.type = TMQ_OFFSET__LOG; - ret->offset.version = pReader->pWalReader->curVersion; ret->fetchType = FETCH_TYPE__NONE; - tqInfo("wal return none, offset %" PRId64 ", no more valid msg in wal", ret->offset.version); return; } - void* body = POINTER_SHIFT(pReader->pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg)); int32_t bodyLen = pReader->pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg); int64_t ver = pReader->pWalReader->pHead->head.version; @@ -327,7 +323,6 @@ void tqNextBlock(STqReader* pReader, SFetchRet* ret) { continue; } ret->fetchType = FETCH_TYPE__DATA; - tqDebug("wal return data rows %d", ret->data.info.rows); return; } } diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index ad93cc567c..95981c2f08 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -134,7 +134,6 @@ static int32_t setTableSchema(SCacheRowsReader* p, uint64_t suid, const char* id // all queried tables have been dropped already, return immediately. if (p->pSchema == NULL) { - taosMemoryFree(p); tsdbWarn("all queried tables has been dropped, try next group, %s", idstr); return TSDB_CODE_PAR_TABLE_NOT_EXIST; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index e5f3c7406d..5ad9276c6c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -922,7 +922,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN pBlockNum->numOfBlocks += 1; } - if ((pScanInfo->pBlockList != NULL) && (taosArrayGetSize(pScanInfo->pBlockList) > 0)) { + if (taosArrayGetSize(pScanInfo->pBlockList) > 0) { numOfQTable += 1; } } @@ -4220,12 +4220,8 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) { if (pStatus->loadFromFile) { SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); if (pBlockInfo != NULL) { - pBlockScanInfo = - *(STableBlockScanInfo**)taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); + pBlockScanInfo = getTableBlockScanInfo(pStatus->pTableMap, pBlockInfo->uid, pReader->idStr); if (pBlockScanInfo == NULL) { - code = TSDB_CODE_INVALID_PARA; - tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid, - taosHashGetSize(pReader->status.pTableMap), pReader->idStr); goto _err; } } else { diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index cff568aebc..d51a24bb43 100644 --- a/source/libs/executor/inc/tsort.h +++ b/source/libs/executor/inc/tsort.h @@ -44,7 +44,8 @@ typedef struct SSortSource { void* param; bool onlyRef; }; - + int64_t fetchUs; + int64_t fetchNum; } SSortSource; typedef struct SMsortComparParam { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 61ad44f1e6..75d05feb67 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1070,7 +1070,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.currentOffset)) { return 0; } - pTaskInfo->streamInfo.currentOffset = *pOffset; if (subType == TOPIC_SUB_TYPE__COLUMN) { pOperator->status = OP_OPENED; @@ -1213,6 +1212,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT pInfo->dataReader = NULL; qDebug("tmqsnap qStreamPrepareScan snapshot log"); } + pTaskInfo->streamInfo.currentOffset = *pOffset; + return 0; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 4e6cd148d1..ef0f391709 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -31,6 +31,7 @@ #include "thash.h" #include "ttypes.h" +#define MULTI_READER_MAX_TABLE_NUM 5000 #define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN) #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) @@ -43,7 +44,9 @@ typedef struct STableMergeScanSortSourceParam { SOperatorInfo* pOperator; int32_t readerIdx; uint64_t uid; - SSDataBlock* inputBlock; + SSDataBlock* inputBlock; + bool multiReader; + STsdbReader* dataReader; } STableMergeScanSortSourceParam; static bool processBlockWithProbability(const SSampleExecInfo* pInfo); @@ -1195,6 +1198,8 @@ static int32_t getPreSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, if (code != TSDB_CODE_SUCCESS) { SET_SESSION_WIN_KEY_INVALID(pKey); } + + taosMemoryFree(pCur); return code; } @@ -1623,16 +1628,18 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { while (1) { SFetchRet ret = {0}; tqNextBlock(pInfo->tqReader, &ret); + tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pInfo->tqReader->pWalReader->curVersion); if (ret.fetchType == FETCH_TYPE__DATA) { + qDebug("doQueueScan get data from log %d rows, version:%" PRId64, pInfo->pRes->info.rows, pTaskInfo->streamInfo.currentOffset.version); blockDataCleanup(pInfo->pRes); setBlockIntoRes(pInfo, &ret.data, true); if (pInfo->pRes->info.rows > 0) { - qDebug("queue scan log return %d rows", pInfo->pRes->info.rows); + qDebug("doQueueScan get data from log %d rows, return, version:%" PRId64, pInfo->pRes->info.rows, pTaskInfo->streamInfo.currentOffset.version); return pInfo->pRes; } }else if(ret.fetchType == FETCH_TYPE__NONE){ - pTaskInfo->streamInfo.currentOffset = ret.offset; + qDebug("doQueueScan get none from log, return, version:%" PRId64, pTaskInfo->streamInfo.currentOffset.version); return NULL; } } @@ -2546,6 +2553,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; int32_t readIdx = source->readerIdx; SSDataBlock* pBlock = source->inputBlock; + int32_t code = 0; SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx); @@ -2553,17 +2561,20 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex); SReadHandle* pHandle = &pInfo->base.readHandle; - int32_t code = - tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &pInfo->base.dataReader, GET_TASKID(pTaskInfo)); - if (code != 0) { - T_LONG_JMP(pTaskInfo->env, code); + if (NULL == source->dataReader || !source->multiReader) { + code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &source->dataReader, GET_TASKID(pTaskInfo)); + if (code != 0) { + T_LONG_JMP(pTaskInfo->env, code); + } } - + + pInfo->base.dataReader = source->dataReader; STsdbReader* reader = pInfo->base.dataReader; qTrace("tsdb/read-table-data: %p, enter next reader", reader); while (tsdbNextDataBlock(reader)) { if (isTaskKilled(pTaskInfo)) { tsdbReleaseDataBlock(reader); + pInfo->base.dataReader = NULL; T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } @@ -2597,14 +2608,18 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; qTrace("tsdb/read-table-data: %p, close reader", reader); - tsdbReaderClose(pInfo->base.dataReader); + if (!source->multiReader) { + tsdbReaderClose(pInfo->base.dataReader); + source->dataReader = NULL; + } pInfo->base.dataReader = NULL; return pBlock; } - qDebug("8"); - - tsdbReaderClose(pInfo->base.dataReader); + if (!source->multiReader) { + tsdbReaderClose(pInfo->base.dataReader); + source->dataReader = NULL; + } pInfo->base.dataReader = NULL; return NULL; } @@ -2676,6 +2691,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { STableMergeScanSortSourceParam param = {0}; param.readerIdx = i; param.pOperator = pOperator; + param.multiReader = (numOfTable <= MULTI_READER_MAX_TABLE_NUM) ? true : false; param.inputBlock = createOneDataBlock(pInfo->pResBlock, false); blockDataEnsureCapacity(param.inputBlock, pOperator->resultInfo.capacity); @@ -2719,6 +2735,8 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) { for (int32_t i = 0; i < numOfTable; ++i) { STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i); blockDataDestroy(param->inputBlock); + tsdbReaderClose(param->dataReader); + param->dataReader = NULL; } taosArrayClear(pInfo->sortSourceParams); @@ -2761,9 +2779,6 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* qDebug("%s get sorted row block, rows:%d, limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows, pInfo->limitInfo.numOfOutputRows); - if (limitReached) { - resetLimitInfoForNextGroup(&pInfo->limitInfo); - } return (pResBlock->info.rows > 0) ? pResBlock : NULL; } @@ -2816,6 +2831,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { pInfo->tableStartIndex = pInfo->tableEndIndex + 1; pInfo->groupId = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->tableStartIndex)->groupId; startGroupTableMergeScan(pOperator); + resetLimitInfoForNextGroup(&pInfo->limitInfo); } } @@ -2831,15 +2847,17 @@ void destroyTableMergeScanOperatorInfo(void* param) { for (int32_t i = 0; i < numOfTable; i++) { STableMergeScanSortSourceParam* p = taosArrayGet(pTableScanInfo->sortSourceParams, i); blockDataDestroy(p->inputBlock); + tsdbReaderClose(p->dataReader); + p->dataReader = NULL; } + tsdbReaderClose(pTableScanInfo->base.dataReader); + pTableScanInfo->base.dataReader = NULL; + taosArrayDestroy(pTableScanInfo->sortSourceParams); tsortDestroySortHandle(pTableScanInfo->pSortHandle); pTableScanInfo->pSortHandle = NULL; - tsdbReaderClose(pTableScanInfo->base.dataReader); - pTableScanInfo->base.dataReader = NULL; - for (int i = 0; i < taosArrayGetSize(pTableScanInfo->queryConds); i++) { SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i); taosMemoryFree(pCond->colList); @@ -2856,8 +2874,6 @@ void destroyTableMergeScanOperatorInfo(void* param) { taosArrayDestroy(pTableScanInfo->pSortInfo); cleanupExprSupp(&pTableScanInfo->base.pseudoSup); - tsdbReaderClose(pTableScanInfo->base.dataReader); - pTableScanInfo->base.dataReader = NULL; taosLRUCacheCleanup(pTableScanInfo->base.metaCache.pTableMetaEntryCache); taosMemoryFreeClear(param); diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 24f42ff178..f24d3523c8 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -57,9 +57,11 @@ typedef struct SSysTableScanInfo { const char* pUser; bool sysInfo; bool showRewrite; + bool restore; SNode* pCondition; // db_name filter condition, to discard data that are not in current database SMTbCursor* pCur; // cursor for iterate the local table meta store. SSysTableIndex* pIdx; // idx for local table meta + SHashObj* pSchema; SColMatchInfo matchInfo; SName name; SSDataBlock* pRes; @@ -514,9 +516,23 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) { pInfo->pCur = metaOpenTbCursor(pInfo->readHandle.meta); } - SHashObj* stableSchema = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); - taosHashSetFreeFp(stableSchema, tDeleteSSchemaWrapperForHash); - while ((ret = metaTbCursorNext(pInfo->pCur, TSDB_TABLE_MAX)) == 0) { + if (pInfo->pSchema == NULL) { + pInfo->pSchema = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); + taosHashSetFreeFp(pInfo->pSchema, tDeleteSSchemaWrapperForHash); + } + + if (!pInfo->pCur || !pInfo->pSchema) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + qError("sysTableScanUserCols failed since %s", terrstr(terrno)); + blockDataDestroy(dataBlock); + pInfo->loadInfo.totalRows = 0; + return NULL; + } + + int32_t restore = pInfo->restore; + pInfo->restore = false; + while (restore || ((ret = metaTbCursorNext(pInfo->pCur, TSDB_TABLE_MAX)) == 0)) { + if (restore) restore = false; char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; @@ -524,33 +540,36 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) { if (pInfo->pCur->mr.me.type == TSDB_SUPER_TABLE) { qDebug("sysTableScanUserCols cursor get super table"); - void* schema = taosHashGet(stableSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t)); + void* schema = taosHashGet(pInfo->pSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t)); if (schema == NULL) { SSchemaWrapper* schemaWrapper = tCloneSSchemaWrapper(&pInfo->pCur->mr.me.stbEntry.schemaRow); - taosHashPut(stableSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t), &schemaWrapper, POINTER_BYTES); + taosHashPut(pInfo->pSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t), &schemaWrapper, POINTER_BYTES); } continue; } else if (pInfo->pCur->mr.me.type == TSDB_CHILD_TABLE) { qDebug("sysTableScanUserCols cursor get child table"); STR_TO_VARSTR(typeName, "CHILD_TABLE"); STR_TO_VARSTR(tableName, pInfo->pCur->mr.me.name); - int64_t suid = pInfo->pCur->mr.me.ctbEntry.suid; - void* schema = taosHashGet(stableSchema, &pInfo->pCur->mr.me.ctbEntry.suid, sizeof(int64_t)); + void* schema = taosHashGet(pInfo->pSchema, &pInfo->pCur->mr.me.ctbEntry.suid, sizeof(int64_t)); if (schema != NULL) { schemaRow = *(SSchemaWrapper**)schema; } else { - tDecoderClear(&pInfo->pCur->mr.coder); - int code = metaGetTableEntryByUid(&pInfo->pCur->mr, suid); + SMetaReader smrSuperTable = {0}; + metaReaderInit(&smrSuperTable, pInfo->readHandle.meta, 0); + int code = metaGetTableEntryByUid(&smrSuperTable, suid); if (code != TSDB_CODE_SUCCESS) { // terrno has been set by metaGetTableEntryByName, therefore, return directly qError("sysTableScanUserCols get meta by suid:%" PRId64 " error, code:%d", suid, code); + metaReaderClear(&smrSuperTable); blockDataDestroy(dataBlock); pInfo->loadInfo.totalRows = 0; - taosHashCleanup(stableSchema); return NULL; } - schemaRow = &pInfo->pCur->mr.me.stbEntry.schemaRow; + SSchemaWrapper* schemaWrapper = tCloneSSchemaWrapper(&smrSuperTable.me.stbEntry.schemaRow); + taosHashPut(pInfo->pSchema, &suid, sizeof(int64_t), &schemaWrapper, POINTER_BYTES); + schemaRow = schemaWrapper; + metaReaderClear(&smrSuperTable); } } else if (pInfo->pCur->mr.me.type == TSDB_NORMAL_TABLE) { qDebug("sysTableScanUserCols cursor get normal table"); @@ -562,20 +581,19 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) { continue; } - sysTableUserColsFillOneTableCols(pInfo, dbname, &numOfRows, dataBlock, tableName, schemaRow, typeName); - - if (numOfRows >= pOperator->resultInfo.capacity) { + if ((numOfRows + schemaRow->nCols) > pOperator->resultInfo.capacity) { relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo); numOfRows = 0; + pInfo->restore = true; if (pInfo->pRes->info.rows > 0) { break; } + } else { + sysTableUserColsFillOneTableCols(pInfo, dbname, &numOfRows, dataBlock, tableName, schemaRow, typeName); } } - taosHashCleanup(stableSchema); - if (numOfRows > 0) { relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo); numOfRows = 0; @@ -695,7 +713,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { } if ((smrSuperTable.me.stbEntry.schemaTag.nCols + numOfRows) > pOperator->resultInfo.capacity) { - metaTbCursorPrev(pInfo->pCur); + metaTbCursorPrev(pInfo->pCur, TSDB_TABLE_MAX); blockFull = true; } else { sysTableUserTagsFillOneTableTags(pInfo, &smrSuperTable, &pInfo->pCur->mr, dbname, tableName, &numOfRows, @@ -1789,6 +1807,11 @@ void destroySysScanOperator(void* param) { pInfo->pIdx = NULL; } + if(pInfo->pSchema) { + taosHashCleanup(pInfo->pSchema); + pInfo->pSchema = NULL; + } + taosArrayDestroy(pInfo->matchInfo.pList); taosMemoryFreeClear(pInfo->pUser); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 291b0cf515..6c8e581b3f 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -108,12 +108,18 @@ static int32_t sortComparCleanup(SMsortComparParam* cmpParam) { return TSDB_CODE_SUCCESS; } -void tsortClearOrderdSource(SArray* pOrderedSource) { +void tsortClearOrderdSource(SArray* pOrderedSource, int64_t *fetchUs, int64_t *fetchNum) { for (size_t i = 0; i < taosArrayGetSize(pOrderedSource); i++) { SSortSource** pSource = taosArrayGet(pOrderedSource, i); if (NULL == *pSource) { continue; } + + if (fetchUs) { + *fetchUs += (*pSource)->fetchUs; + *fetchNum += (*pSource)->fetchNum; + } + // release pageIdList if ((*pSource)->pageIdList) { taosArrayDestroy((*pSource)->pageIdList); @@ -147,7 +153,10 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) { taosMemoryFreeClear(pSortHandle->idStr); blockDataDestroy(pSortHandle->pDataBlock); - tsortClearOrderdSource(pSortHandle->pOrderedSource); + int64_t fetchUs = 0, fetchNum = 0; + tsortClearOrderdSource(pSortHandle->pOrderedSource, &fetchUs, &fetchNum); + qError("all source fetch time: %" PRId64 "us num:%" PRId64 " %s", fetchUs, fetchNum, pSortHandle->idStr); + taosArrayDestroy(pSortHandle->pOrderedSource); taosMemoryFreeClear(pSortHandle); } @@ -307,7 +316,7 @@ static int32_t sortComparInit(SMsortComparParam* pParam, SArray* pSources, int32 } int64_t et = taosGetTimestampUs(); - qDebug("init for merge sort completed, elapsed time:%.2f ms, %s", (et - st) / 1000.0, pHandle->idStr); + qError("init for merge sort completed, elapsed time:%.2f ms, %s", (et - st) / 1000.0, pHandle->idStr); } return code; @@ -365,7 +374,10 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT releaseBufPage(pHandle->pBuf, pPage); } } else { + int64_t st = taosGetTimestampUs(); pSource->src.pBlock = pHandle->fetchfp(((SSortSource*)pSource)->param); + pSource->fetchUs += taosGetTimestampUs() - st; + pSource->fetchNum++; if (pSource->src.pBlock == NULL) { (*numOfCompleted) += 1; pSource->src.rowIndex = -1; @@ -602,7 +614,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { } } - tsortClearOrderdSource(pHandle->pOrderedSource); + tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL); taosArrayAddAll(pHandle->pOrderedSource, pResList); taosArrayDestroy(pResList); @@ -644,7 +656,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) { SSortSource* source = *pSource; *pSource = NULL; - tsortClearOrderdSource(pHandle->pOrderedSource); + tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL); while (1) { SSDataBlock* pBlock = pHandle->fetchfp(source->param); diff --git a/source/libs/parser/src/parInsertSml.c b/source/libs/parser/src/parInsertSml.c index 3c2b6499a4..106ee641af 100644 --- a/source/libs/parser/src/parInsertSml.c +++ b/source/libs/parser/src/parInsertSml.c @@ -70,7 +70,7 @@ static int32_t smlBoundColumnData(SArray* cols, SBoundColInfo* pBoundInfo, SSche SToken sToken = {.n = kv->keyLen, .z = (char*)kv->key}; col_id_t t = lastColIdx + 1; col_id_t index = ((t == 0 && !isTag) ? 0 : insFindCol(&sToken, t, pBoundInfo->numOfCols, pSchema)); - uDebug("SML, index:%d, t:%d, ncols:%d", index, t, pBoundInfo->numOfCols); + uTrace("SML, index:%d, t:%d, ncols:%d", index, t, pBoundInfo->numOfCols); if (index < 0 && t > 0) { index = insFindCol(&sToken, 0, t, pSchema); } @@ -345,7 +345,7 @@ int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSc } if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)pUcs4, pColSchema->bytes - VARSTR_HEADER_SIZE, &len)) { if (errno == E2BIG) { - uError("sml bind taosMbsToUcs4 error, kv length:%d, bytes:%d", (int)kv->length, pColSchema->bytes); + uError("sml bind taosMbsToUcs4 error, kv length:%d, bytes:%d, kv->value:%s", (int)kv->length, pColSchema->bytes, kv->value); buildInvalidOperationMsg(&pBuf, "value too long"); ret = TSDB_CODE_PAR_VALUE_TOO_LONG; goto end; diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index 66977f206a..b0ab9ddedf 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -404,6 +404,7 @@ static int32_t createVgroupDataCxt(STableDataCxt* pTableCxt, SHashObj* pVgroupHa int32_t code = taosHashPut(pVgroupHash, &pVgCxt->vgId, sizeof(pVgCxt->vgId), &pVgCxt, POINTER_BYTES); if (TSDB_CODE_SUCCESS == code) { taosArrayPush(pVgroupList, &pVgCxt); +// uDebug("td23101 2vgId:%d, uid:%" PRIu64, pVgCxt->vgId, pTableCxt->pMeta->uid); *pOutput = pVgCxt; } else { insDestroyVgroupDataCxt(pVgCxt); @@ -546,6 +547,7 @@ int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList, if (TSDB_CODE_SUCCESS == code) { dst->numOfTables = taosArrayGetSize(src->pData->aSubmitTbData); code = taosHashGetDup(pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg); +// uError("td23101 3vgId:%d, numEps:%d", src->vgId, dst->vg.epSet.numOfEps); } if (TSDB_CODE_SUCCESS == code) { code = buildSubmitReq(src->vgId, src->pData, &dst->pData, &dst->size); diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index 89dd51a892..bd9ea058b4 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -21,7 +21,7 @@ #include "tjson.h" #include "tglobal.h" -#define LOG_MAX_LINE_SIZE (1024) +#define LOG_MAX_LINE_SIZE (10024) #define LOG_MAX_LINE_BUFFER_SIZE (LOG_MAX_LINE_SIZE + 3) #define LOG_MAX_LINE_DUMP_SIZE (1024 * 1024) #define LOG_MAX_LINE_DUMP_BUFFER_SIZE (LOG_MAX_LINE_DUMP_SIZE + 3) diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index cf967c9553..8c8d41d5b6 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -853,6 +853,7 @@ ,,y,script,./test.sh -f tsim/parser/topbot.sim ,,y,script,./test.sh -f tsim/parser/union_sysinfo.sim ,,y,script,./test.sh -f tsim/parser/slimit_limit.sim +,,y,script,./test.sh -f tsim/parser/table_merge_limit.sim ,,y,script,./test.sh -f tsim/query/tagLikeFilter.sim ,,y,script,./test.sh -f tsim/query/charScalarFunction.sim ,,y,script,./test.sh -f tsim/query/explain.sim diff --git a/tests/parallel_test/container_build.sh b/tests/parallel_test/container_build.sh index edb9b4ab3c..80236cf604 100755 --- a/tests/parallel_test/container_build.sh +++ b/tests/parallel_test/container_build.sh @@ -68,8 +68,8 @@ docker run \ -v ${REP_REAL_PATH}/community/contrib/libuv/:${REP_DIR}/community/contrib/libuv \ -v ${REP_REAL_PATH}/community/contrib/lz4/:${REP_DIR}/community/contrib/lz4 \ -v ${REP_REAL_PATH}/community/contrib/zlib/:${REP_DIR}/community/contrib/zlib \ - -v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \ - --rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_TAOSX=true;make -j || exit 1" + --rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_TAOSX=true -DJEMALLOC_ENABLED=true;make -j || exit 1" + # -v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \ if [[ -d ${WORKDIR}/debugNoSan ]] ;then echo "delete ${WORKDIR}/debugNoSan" @@ -97,7 +97,7 @@ docker run \ -v ${REP_REAL_PATH}/community/contrib/lz4/:${REP_DIR}/community/contrib/lz4 \ -v ${REP_REAL_PATH}/community/contrib/zlib/:${REP_DIR}/community/contrib/zlib \ -v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \ - --rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_SANITIZER=1 -DTOOLS_SANITIZE=true -DTOOLS_BUILD_TYPE=Debug -DBUILD_TAOSX=true;make -j || exit 1 " + --rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_SANITIZER=1 -DTOOLS_SANITIZE=true -DTOOLS_BUILD_TYPE=Debug -DBUILD_TAOSX=true -DJEMALLOC_ENABLED=true;make -j || exit 1 " mv ${REP_REAL_PATH}/debug ${WORKDIR}/debugSan diff --git a/tests/pytest/util/common.py b/tests/pytest/util/common.py index 5b73989d6f..6d813a4166 100644 --- a/tests/pytest/util/common.py +++ b/tests/pytest/util/common.py @@ -739,6 +739,11 @@ class TDCom: else: os.system("unset LD_PRELOAD; pkill %s " % processorName) + def gen_tag_col_str(self, gen_type, data_type, count): + """ + gen multi tags or cols by gen_type + """ + return ','.join(map(lambda i: f'{gen_type}{i} {data_type}', range(count))) def is_json(msg): if isinstance(msg, str): @@ -775,4 +780,5 @@ def dict2toml(in_dict: dict, file:str): with open(file, 'w') as f: toml.dump(in_dict, f) + tdCom = TDCom() diff --git a/tests/script/tsim/parser/table_merge_limit.sim b/tests/script/tsim/parser/table_merge_limit.sim new file mode 100644 index 0000000000..7d67b6d3a2 --- /dev/null +++ b/tests/script/tsim/parser/table_merge_limit.sim @@ -0,0 +1,47 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sql connect + +$dbPrefix = m_fl_db +$tbPrefix = m_fl_tb +$mtPrefix = m_fl_mt +$tbNum = 2 +$rowNum = 513 +$totalNum = $tbNum * $rowNum +$ts0 = 1537146000000 +$delta = 600000 +print ========== fill.sim +$i = 0 +$db = $dbPrefix . $i +$mt = $mtPrefix . $i + +sql drop database $db -x step1 +step1: +sql create database $db vgroups 1 +sql use $db +sql create table $mt (ts timestamp, c1 int) tags(tgcol int) + +$i = 0 +$ts = $ts0 +while $i < $tbNum + $tb = $tbPrefix . $i + sql create table $tb using $mt tags( $i ) + + $x = 0 + while $x < $rowNum + $xs = $x * $delta + $ts = $ts0 + $xs + sql insert into $tb values ( $ts , $x ) + $x = $x + 1 + endw + + $i = $i + 1 +endw + +sql select * from $mt order by ts limit 10 +if $rows != 10 then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/system-test/0-others/information_schema.py b/tests/system-test/0-others/information_schema.py index 720eab74c4..3c4a71c3e4 100644 --- a/tests/system-test/0-others/information_schema.py +++ b/tests/system-test/0-others/information_schema.py @@ -93,7 +93,6 @@ class TDTestCase: tdSql.checkEqual(i[2],len(self.perf_list)) tdSql.execute('create table db1.ntb (ts timestamp,c0 int)') tdSql.query(f'select db_name, count(*) from information_schema.ins_tables group by db_name') - print(tdSql.queryResult) for i in tdSql.queryResult: if i[0].lower() == 'information_schema': tdSql.checkEqual(i[1],len(self.ins_list)) @@ -101,9 +100,77 @@ class TDTestCase: tdSql.checkEqual(i[1],len(self.perf_list)) elif i[0].lower() == self.dbname: tdSql.checkEqual(i[1],self.tbnum+1) + + + + def ins_col_check_4096(self): + tdSql.execute('create database db3 vgroups 2 replica 1') + col_str = tdCom.gen_tag_col_str("col", "int",4094) + tdSql.execute(f'create stable if not exists db3.stb (col_ts timestamp, {col_str}) tags (t1 int)') + for i in range(100): + tdLog.info(f"create table db3.ctb{i} using db3.stb tags({i})") + tdSql.execute(f"create table db3.ctb{i} using db3.stb tags({i})") + col_value_str = '1, ' * 4093 + '1' + tdSql.execute(f"insert into db3.ctb{i} values(now,{col_value_str})(now+1s,{col_value_str})(now+2s,{col_value_str})(now+3s,{col_value_str})") + tdSql.query("select * from information_schema.ins_columns") + + tdSql.execute('drop database db3') + def ins_stable_check(self): + tdSql.execute('create database db3 vgroups 2 replica 1') + tbnum = 10 + ctbnum = 10 + for i in range(tbnum): + tdSql.execute(f'create stable db3.stb_{i} (ts timestamp,c0 int) tags(t0 int)') + tdSql.execute(f'create table db3.ntb_{i} (ts timestamp,c0 int)') + for j in range(ctbnum): + tdSql.execute(f"create table db3.ctb_{i}_{j} using db3.stb_{i} tags({j})") + tdSql.query("select stable_name,count(table_name) from information_schema.ins_tables where db_name = 'db3' group by stable_name order by stable_name") + result = tdSql.queryResult + for i in range(len(result)): + if result[i][0] == None: + tdSql.checkEqual(result[0][1],tbnum) + else: + tdSql.checkEqual(result[i][0],f'stb_{i-1}') + tdSql.checkEqual(result[i][1],ctbnum) + + + + def ins_columns_check(self): + tdSql.execute('drop database if exists db2') + tdSql.execute('create database if not exists db2 vgroups 1 replica 1') + for i in range (5): + self.stb4096 = 'create table db2.stb%d (ts timestamp' % (i) + for j in range (4094 - i): + self.stb4096 += ', c%d int' % (j) + self.stb4096 += ') tags (t1 int)' + tdSql.execute(self.stb4096) + for k in range(10): + tdSql.execute("create table db2.ctb_%d_%dc using db2.stb%d tags(%d)" %(i,k,i,k)) + for t in range (2): + tdSql.query(f'select * from information_schema.ins_columns where db_name="db2" and table_type=="SUPER_TABLE"') + tdSql.checkEqual(20465,len(tdSql.queryResult)) + for t in range (2): + tdSql.query(f'select * from information_schema.ins_columns where db_name="db2" and table_type=="CHILD_TABLE"') + tdSql.checkEqual(204650,len(tdSql.queryResult)) + + for i in range (5): + self.ntb4096 = 'create table db2.ntb%d (ts timestamp' % (i) + for j in range (4095 - i): + self.ntb4096 += ', c%d binary(10)' % (j) + self.ntb4096 += ')' + tdSql.execute(self.ntb4096) + for t in range (2): + tdSql.query(f'select * from information_schema.ins_columns where db_name="db2" and table_type=="NORMAL_TABLE"') + tdSql.checkEqual(20470,len(tdSql.queryResult)) + + def run(self): self.prepare_data() self.count_check() + self.ins_columns_check() + # self.ins_col_check_4096() + self.ins_stable_check() + def stop(self): tdSql.close() diff --git a/tests/system-test/2-query/tsbsQuery.py b/tests/system-test/2-query/tsbsQuery.py index e64b3ed0c7..0e2975cdba 100644 --- a/tests/system-test/2-query/tsbsQuery.py +++ b/tests/system-test/2-query/tsbsQuery.py @@ -214,13 +214,13 @@ class TDTestCase: sql=f"SELECT model,count(state_changed) FROM (SELECT _rowts,model,diff(broken_down) AS state_changed FROM (SELECT ts,model,tb,cast(cast(floor(2*(nzs)) as bool) as int) AS broken_down FROM (SELECT _wstart as ts,model,tbname as tb, sum(cast(cast(status as bool) as int))/count(cast(cast(status as bool) as int)) AS nzs FROM {dbname}.diagnostics WHERE ts >= 1451606400000 AND ts < 1451952001000 partition BY tbname,model interval(10m))order by ts) partition BY tb,model ) WHERE state_changed = 1 partition BY model;" tdSql.query(f"{sql}") tdSql.checkRows(46) - # for i in range(2): - # tdSql.query("%s"%sql) - # quertR1=tdSql.queryResult - # for j in range(50): - # tdSql.query("%s"%sql) - # quertR2=tdSql.queryResult - # assert quertR1 == quertR2 , "%s != %s ,The results of multiple queries are different" %(quertR1,quertR2) + for i in range(2): + tdSql.query("%s"%sql) + quertR1=tdSql.queryResult + for j in range(50): + tdSql.query("%s"%sql) + quertR2=tdSql.queryResult + assert quertR1 == quertR2 , "%s != %s ,The results of multiple queries are different" %(quertR1,quertR2) #it's already supported: diff --git a/tests/system-test/7-tmq/subscribeStb1.py b/tests/system-test/7-tmq/subscribeStb1.py index edbe1bc3c6..3941e3a9a8 100644 --- a/tests/system-test/7-tmq/subscribeStb1.py +++ b/tests/system-test/7-tmq/subscribeStb1.py @@ -245,7 +245,7 @@ class TDTestCase: for i in range(expectRows): totalConsumeRows += resultList[i] - if totalConsumeRows != expectrowcnt/4: + if totalConsumeRows < expectrowcnt/4: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt/4)) tdLog.exit("tmq consume rows error!") @@ -267,7 +267,7 @@ class TDTestCase: for i in range(expectRows): totalConsumeRows += resultList[i] - if totalConsumeRows != expectrowcnt: + if totalConsumeRows < expectrowcnt: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") diff --git a/utils/test/c/tmqSim.c b/utils/test/c/tmqSim.c index cb7b501298..69debe7ab5 100644 --- a/utils/test/c/tmqSim.c +++ b/utils/test/c/tmqSim.c @@ -688,16 +688,17 @@ int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) { } static int32_t g_once_commit_flag = 0; -static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { - taosFprintfFile(g_fp, "tmq_commit_cb_print() commit %d\n", code); - if (0 == g_once_commit_flag) { - g_once_commit_flag = 1; - notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT); +static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { + taosFprintfFile(g_fp, "tmq_commit_cb_print() commit %d\n", code); + + if (0 == g_once_commit_flag) { + g_once_commit_flag = 1; + notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT); } - char tmpString[128]; - taosFprintfFile(g_fp, "%s tmq_commit_cb_print() be called\n", getCurrentTimeString(tmpString)); + char tmpString[128]; + taosFprintfFile(g_fp, "%s tmq_commit_cb_print() be called\n", getCurrentTimeString(tmpString)); } void build_consumer(SThreadInfo* pInfo) {