fix:error in optimize consume logic

This commit is contained in:
wangmm0220 2023-03-22 17:04:02 +08:00
commit dd4cf0597b
42 changed files with 446 additions and 165 deletions

View File

@ -2,7 +2,7 @@
IF (DEFINED VERNUMBER) IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER}) SET(TD_VER_NUMBER ${VERNUMBER})
ELSE () ELSE ()
SET(TD_VER_NUMBER "3.0.3.0") SET(TD_VER_NUMBER "3.0.3.1")
ENDIF () ENDIF ()
IF (DEFINED VERCOMPATIBLE) IF (DEFINED VERCOMPATIBLE)

1
examples/rust/wrapper.h Normal file
View File

@ -0,0 +1 @@
#include<taos.h>

View File

@ -214,7 +214,6 @@ enum {
typedef struct { typedef struct {
int8_t fetchType; int8_t fetchType;
STqOffsetVal offset;
union { union {
SSDataBlock data; SSDataBlock data;
void* meta; void* meta;

View File

@ -147,7 +147,7 @@ typedef struct {
} SMonStbInfo; } SMonStbInfo;
typedef struct { typedef struct {
int32_t expire_time; uint32_t expire_time;
int64_t timeseries_used; int64_t timeseries_used;
int64_t timeseries_total; int64_t timeseries_total;
} SMonGrantInfo; } SMonGrantInfo;

View File

@ -107,7 +107,7 @@ elif [ ${testFile} = "tools" ];then
originTdpPath="taosTools-${originversion}" originTdpPath="taosTools-${originversion}"
packageName="${tdPath}-Linux-${cpuType}${packageLite}.${packageType}" packageName="${tdPath}-Linux-${cpuType}${packageLite}.${packageType}"
originPackageName="${originTdpPath}-Linux-${cpuType}${packageLite}.${packageType}" originPackageName="${originTdpPath}-Linux-${cpuType}${packageLite}.${packageType}"
installCmd="install-tools.sh" installCmd="install-taostools.sh"
fi fi
@ -333,7 +333,7 @@ if [[ ${packageName} =~ "Lite" ]] || ([[ ${packageName} =~ "x64" ]] && [[ ${p
wgetFile taosTools-2.1.3-Linux-x64.tar.gz v2.1.3 web wgetFile taosTools-2.1.3-Linux-x64.tar.gz v2.1.3 web
tar xf taosTools-2.1.3-Linux-x64.tar.gz tar xf taosTools-2.1.3-Linux-x64.tar.gz
fi fi
cd taosTools-2.1.3 && bash install-tools.sh cd taosTools-2.1.3 && bash install-taostools.sh
elif ([[ ${packageName} =~ "arm64" ]] && [[ ${packageName} =~ "client" ]]);then elif ([[ ${packageName} =~ "arm64" ]] && [[ ${packageName} =~ "client" ]]);then
echoColor G "===== install taos-tools arm when package is arm64-client =====" echoColor G "===== install taos-tools arm when package is arm64-client ====="
cd ${installPath} cd ${installPath}
@ -342,7 +342,7 @@ elif ([[ ${packageName} =~ "arm64" ]] && [[ ${packageName} =~ "client" ]]);then
tar xf taosTools-2.1.3-Linux-arm64.tar.gz tar xf taosTools-2.1.3-Linux-arm64.tar.gz
fi fi
cd taosTools-2.1.3 && bash install-tools.sh cd taosTools-2.1.3 && bash install-taostools.sh
fi fi
echoColor G "===== start TDengine =====" echoColor G "===== start TDengine ====="
@ -361,18 +361,18 @@ rm -rf ${installPath}/${tdPath}/
# cd ${installPath} # cd ${installPath}
# wgetFile taosTools-2.1.2-Linux-x64.tar.gz . # wgetFile taosTools-2.1.2-Linux-x64.tar.gz .
# tar xf taosTools-2.1.2-Linux-x64.tar.gz # tar xf taosTools-2.1.2-Linux-x64.tar.gz
# cd taosTools-2.1.2 && bash install-tools.sh # cd taosTools-2.1.2 && bash install-taostools.sh
# elif [[ ${packageName} =~ "Lite" ]] && [[ ${packageName} =~ "deb" ]] ;then # elif [[ ${packageName} =~ "Lite" ]] && [[ ${packageName} =~ "deb" ]] ;then
# echoColor G "===== install taos-tools when package is lite or client =====" # echoColor G "===== install taos-tools when package is lite or client ====="
# cd ${installPath} # cd ${installPath}
# wgetFile taosTools-2.1.2-Linux-x64.tar.gz . # wgetFile taosTools-2.1.2-Linux-x64.tar.gz .
# tar xf taosTools-2.1.2-Linux-x64.tar.gz # tar xf taosTools-2.1.2-Linux-x64.tar.gz
# cd taosTools-2.1.2 && bash install-tools.sh # cd taosTools-2.1.2 && bash install-taostools.sh
# elif [[ ${packageName} =~ "Lite" ]] && [[ ${packageName} =~ "rpm" ]] ;then # elif [[ ${packageName} =~ "Lite" ]] && [[ ${packageName} =~ "rpm" ]] ;then
# echoColor G "===== install taos-tools when package is lite or client =====" # echoColor G "===== install taos-tools when package is lite or client ====="
# cd ${installPath} # cd ${installPath}
# wgetFile taosTools-2.1.2-Linux-x64.tar.gz . # wgetFile taosTools-2.1.2-Linux-x64.tar.gz .
# tar xf taosTools-2.1.2-Linux-x64.tar.gz # tar xf taosTools-2.1.2-Linux-x64.tar.gz
# cd taosTools-2.1.2 && bash install-tools.sh # cd taosTools-2.1.2 && bash install-taostools.sh
# fi # fi

View File

@ -0,0 +1,33 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>Label</key>
<string>com.tdengine.taoskeeper</string>
<key>ProgramArguments</key>
<array>
<string>/usr/local/bin/taoskeeper</string>
</array>
<key>ProcessType</key>
<string>Interactive</string>
<key>Disabled</key>
<false/>
<key>RunAtLoad</key>
<false/>
<key>LaunchOnlyOnce</key>
<false/>
<key>SessionCreate</key>
<true/>
<key>ExitTimeOut</key>
<integer>600</integer>
<key>KeepAlive</key>
<dict>
<key>SuccessfulExit</key>
<false/>
<key>AfterInitialDemand</key>
<true/>
</dict>
<key>Program</key>
<string>/usr/local/bin/taoskeeper</string>
</dict>
</plist>

View File

@ -53,7 +53,7 @@ if [ -d ${top_dir}/tools/taos-tools/packaging/deb ]; then
cd ${top_dir}/tools/taos-tools/packaging/deb cd ${top_dir}/tools/taos-tools/packaging/deb
[ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0" [ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0"
taostools_ver=$(git tag |grep -v taos | sort | tail -1) taostools_ver=$(git for-each-ref --sort=taggerdate --format '%(tag)' refs/tags|grep -v taos | tail -1)
taostools_install_dir="${release_dir}/${clientName2}Tools-${taostools_ver}" taostools_install_dir="${release_dir}/${clientName2}Tools-${taostools_ver}"
cd ${curr_dir} cd ${curr_dir}

View File

@ -582,6 +582,11 @@ function install_service_on_launchctl() {
${csudo}cp ${install_main_dir}/service/com.taosdata.taosadapter.plist /Library/LaunchDaemons/com.taosdata.taosadapter.plist || : ${csudo}cp ${install_main_dir}/service/com.taosdata.taosadapter.plist /Library/LaunchDaemons/com.taosdata.taosadapter.plist || :
${csudo}launchctl load -w /Library/LaunchDaemons/com.taosdata.taosadapter.plist || : ${csudo}launchctl load -w /Library/LaunchDaemons/com.taosdata.taosadapter.plist || :
fi fi
if [ -f ${install_main_dir}/service/com.taosdata.taoskeeper.plist ]; then
${csudo}launchctl unload -w /Library/LaunchDaemons/com.taosdata.taoskeeper.plist > /dev/null 2>&1 || :
${csudo}cp ${install_main_dir}/service/com.taosdata.taoskeeper.plist /Library/LaunchDaemons/com.taosdata.taoskeeper.plist || :
${csudo}launchctl load -w /Library/LaunchDaemons/com.taosdata.taoskeeper.plist || :
fi
} }
function install_taosadapter_service() { function install_taosadapter_service() {

View File

@ -70,7 +70,7 @@ extern "C" {
#define VALUE_LEN 6 #define VALUE_LEN 6
#define OTD_JSON_FIELDS_NUM 4 #define OTD_JSON_FIELDS_NUM 4
#define MAX_RETRY_TIMES 5 #define MAX_RETRY_TIMES 100
typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType; typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType;
typedef enum { typedef enum {

View File

@ -543,7 +543,7 @@ void taos_init_imp(void) {
if (taosCreateLog("taoslog", 10, configDir, NULL, NULL, NULL, NULL, 1) != 0) { if (taosCreateLog("taoslog", 10, configDir, NULL, NULL, NULL, NULL, 1) != 0) {
// ignore create log failed, only print // ignore create log failed, only print
printf(" WARING: Create taoslog failed. configDir=%s\n", configDir); printf(" WARING: Create taoslog failed:%s. configDir=%s\n", strerror(errno), configDir);
} }
if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 1) != 0) { if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 1) != 0) {

View File

@ -1373,6 +1373,8 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch
uError("WriteRaw:catalogGetTableMeta failed. table name: %s", tbname); uError("WriteRaw:catalogGetTableMeta failed. table name: %s", tbname);
goto end; goto end;
} }
// uError("td23101 0vgId:%d, vgId:%d, name:%s, uid:%"PRIu64, vgData.vgId, pTableMeta->vgId, tbname, pTableMeta->uid);
pQuery = smlInitHandle(); pQuery = smlInitHandle();
if (pQuery == NULL) { if (pQuery == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
@ -1380,6 +1382,7 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch
} }
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)); taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData));
// uError("td23101 1vgId:%d, numEps:%d, name:%s, uid:%"PRIu64, vgData.vgId, vgData.epSet.numOfEps, tbname, pTableMeta->uid);
code = rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false); code = rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -1723,10 +1726,15 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
tDecoderClear(&decoderTmp); tDecoderClear(&decoderTmp);
} }
if (pCreateReqDst) { SVgroupInfo vg;
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vg);
if (code != TSDB_CODE_SUCCESS) {
uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbName);
goto end;
}
if (pCreateReqDst) { // change stable name to get meta
strcpy(pName.tname, pCreateReqDst->ctb.stbName); strcpy(pName.tname, pCreateReqDst->ctb.stbName);
} else {
strcpy(pName.tname, tbName);
} }
code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta); code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) { if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
@ -1739,13 +1747,6 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
goto end; goto end;
} }
SVgroupInfo vg;
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vg);
if (code != TSDB_CODE_SUCCESS) {
uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbName);
goto end;
}
if (pCreateReqDst) { if (pCreateReqDst) {
pTableMeta->vgId = vg.vgId; pTableMeta->vgId = vg.vgId;
pTableMeta->uid = pCreateReqDst->uid; pTableMeta->uid = pCreateReqDst->uid;

View File

@ -750,6 +750,7 @@ end:
} }
static int32_t smlModifyDBSchemas(SSmlHandle *info) { static int32_t smlModifyDBSchemas(SSmlHandle *info) {
uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas start, format:%d, needModifySchema:%d", info->id, info->dataFormat, info->needModifySchema);
if (info->dataFormat && !info->needModifySchema) { if (info->dataFormat && !info->needModifySchema) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -779,6 +780,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta); code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_STB_NOT_EXIST) { if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_STB_NOT_EXIST) {
uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas create table:%s", info->id, pName.tname);
SArray *pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols), sizeof(SField)); SArray *pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols), sizeof(SField));
SArray *pTags = taosArrayInit(taosArrayGetSize(sTableData->tags), sizeof(SField)); SArray *pTags = taosArrayInit(taosArrayGetSize(sTableData->tags), sizeof(SField));
code = smlBuildFieldsList(info, NULL, NULL, sTableData->tags, pTags, 0, true); code = smlBuildFieldsList(info, NULL, NULL, sTableData->tags, pTags, 0, true);
@ -818,6 +820,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
goto end; goto end;
} }
if (action != SCHEMA_ACTION_NULL) { if (action != SCHEMA_ACTION_NULL) {
uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas change table tag, table:%s, action:%d", info->id, pName.tname, action);
SArray *pColumns = SArray *pColumns =
taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField)); taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
SArray *pTags = SArray *pTags =
@ -869,6 +872,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
goto end; goto end;
} }
if (action != SCHEMA_ACTION_NULL) { if (action != SCHEMA_ACTION_NULL) {
uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas change table col, table:%s, action:%d", info->id, pName.tname, action);
SArray *pColumns = SArray *pColumns =
taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField)); taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
SArray *pTags = SArray *pTags =
@ -935,15 +939,19 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
} }
sTableData->tableMeta = pTableMeta; sTableData->tableMeta = pTableMeta;
uDebug("SML:0x%" PRIx64 "modify schema uid:%" PRIu64 ", sversion:%d, tversion:%d", info->id, pTableMeta->uid, pTableMeta->sversion, pTableMeta->tversion)
tmp = (SSmlSTableMeta **)taosHashIterate(info->superTables, tmp); tmp = (SSmlSTableMeta **)taosHashIterate(info->superTables, tmp);
} }
uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas end success, format:%d, needModifySchema:%d", info->id, info->dataFormat, info->needModifySchema);
return 0; return 0;
end: end:
taosHashCleanup(hashTmp); taosHashCleanup(hashTmp);
taosMemoryFreeClear(pTableMeta); taosMemoryFreeClear(pTableMeta);
// catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1); catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1);
uError("SML:0x%" PRIx64 " smlModifyDBSchemas end failed:%d:%s, format:%d, needModifySchema:%d", info->id, code, tstrerror(code), info->dataFormat, info->needModifySchema);
return code; return code;
} }
@ -1019,8 +1027,9 @@ static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols
} else { } else {
size_t tmp = taosArrayGetSize(metaArray); size_t tmp = taosArrayGetSize(metaArray);
if (tmp > INT16_MAX) { if (tmp > INT16_MAX) {
smlBuildInvalidDataMsg(msg, "too many cols or tags", kv->key);
uError("too many cols or tags"); uError("too many cols or tags");
return -1; return TSDB_CODE_SML_INVALID_DATA;
} }
int16_t size = tmp; int16_t size = tmp;
int ret = taosHashPut(metaHash, kv->key, kv->keyLen, &size, SHORT_BYTES); int ret = taosHashPut(metaHash, kv->key, kv->keyLen, &size, SHORT_BYTES);
@ -1170,6 +1179,7 @@ static int32_t smlPushCols(SArray *colsArray, SArray *cols) {
} }
static int32_t smlParseLineBottom(SSmlHandle *info) { static int32_t smlParseLineBottom(SSmlHandle *info) {
uDebug("SML:0x%" PRIx64 " smlParseLineBottom start, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum);
if (info->dataFormat) return TSDB_CODE_SUCCESS; if (info->dataFormat) return TSDB_CODE_SUCCESS;
for (int32_t i = 0; i < info->lineNum; i++) { for (int32_t i = 0; i < info->lineNum; i++) {
@ -1212,6 +1222,7 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
SSmlSTableMeta **tableMeta = SSmlSTableMeta **tableMeta =
(SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen); (SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen);
if (tableMeta) { // update meta if (tableMeta) { // update meta
uDebug("SML:0x%" PRIx64 " smlParseLineBottom update meta, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum);
ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, elements->colArray, false, &info->msgBuf); ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, elements->colArray, false, &info->msgBuf);
if (ret == TSDB_CODE_SUCCESS) { if (ret == TSDB_CODE_SUCCESS) {
ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, tinfo->tags, true, &info->msgBuf); ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, tinfo->tags, true, &info->msgBuf);
@ -1226,7 +1237,7 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
// uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id); // uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id);
// return ret; // return ret;
// } // }
uDebug("SML:0x%" PRIx64 " smlParseLineBottom add meta, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum);
SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat); SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat);
smlInsertMeta(meta->tagHash, meta->tags, tinfo->tags); smlInsertMeta(meta->tagHash, meta->tags, tinfo->tags);
if(terrno == TSDB_CODE_DUP_KEY){return terrno;} if(terrno == TSDB_CODE_DUP_KEY){return terrno;}
@ -1234,12 +1245,14 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
taosHashPut(info->superTables, elements->measure, elements->measureLen, &meta, POINTER_BYTES); taosHashPut(info->superTables, elements->measure, elements->measureLen, &meta, POINTER_BYTES);
} }
} }
uDebug("SML:0x%" PRIx64 " smlParseLineBottom end, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t smlInsertData(SSmlHandle *info) { static int32_t smlInsertData(SSmlHandle *info) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
uDebug("SML:0x%" PRIx64 " smlInsertData start, format:%d", info->id, info->dataFormat);
if(info->pRequest->dbList == NULL){ if(info->pRequest->dbList == NULL){
info->pRequest->dbList = taosArrayInit(1, TSDB_DB_FNAME_LEN); info->pRequest->dbList = taosArrayInit(1, TSDB_DB_FNAME_LEN);
@ -1284,6 +1297,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
// use tablemeta of stable to save vgid and uid of child table // use tablemeta of stable to save vgid and uid of child table
(*pMeta)->tableMeta->vgId = vg.vgId; (*pMeta)->tableMeta->vgId = vg.vgId;
(*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid (*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid
uDebug("SML:0x%" PRIx64 " smlInsertData table:%s, uid:%" PRIu64 ", format:%d", info->id, pName.tname, tableData->uid, info->dataFormat);
code = smlBindData(info->pQuery, info->dataFormat, tableData->tags, (*pMeta)->cols, tableData->cols, code = smlBindData(info->pQuery, info->dataFormat, tableData->tags, (*pMeta)->cols, tableData->cols,
(*pMeta)->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen, (*pMeta)->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen,
@ -1306,16 +1320,18 @@ static int32_t smlInsertData(SSmlHandle *info) {
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1); atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);
launchQueryImpl(info->pRequest, info->pQuery, true, NULL); launchQueryImpl(info->pRequest, info->pQuery, true, NULL);
uDebug("SML:0x%" PRIx64 " smlInsertData end, format:%d, code:%d,%s", info->id, info->dataFormat, info->pRequest->code, tstrerror(info->pRequest->code));
return info->pRequest->code; return info->pRequest->code;
} }
static void smlPrintStatisticInfo(SSmlHandle *info) { static void smlPrintStatisticInfo(SSmlHandle *info) {
uDebug( uDebug(
"SML:0x%" PRIx64 "SML:0x%" PRIx64
" smlInsertLines result, code:%d,lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d,alter stable tag num:%d,alter stable col num:%d \ " smlInsertLines result, code:%d, msg:%s, lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d,alter stable tag num:%d,alter stable col num:%d \
parse cost:%" PRId64 ",schema cost:%" PRId64 ",bind cost:%" PRId64 ",rpc cost:%" PRId64 ",total cost:%" PRId64 parse cost:%" PRId64 ",schema cost:%" PRId64 ",bind cost:%" PRId64 ",rpc cost:%" PRId64 ",total cost:%" PRId64
"", "",
info->id, info->cost.code, info->cost.lineNum, info->cost.numOfSTables, info->cost.numOfCTables, info->id, info->cost.code, tstrerror(info->cost.code), info->cost.lineNum, info->cost.numOfSTables, info->cost.numOfCTables,
info->cost.numOfCreateSTables, info->cost.numOfAlterTagSTables, info->cost.numOfAlterColSTables, info->cost.numOfCreateSTables, info->cost.numOfAlterTagSTables, info->cost.numOfAlterColSTables,
info->cost.schemaTime - info->cost.parseTime, info->cost.insertBindTime - info->cost.schemaTime, info->cost.schemaTime - info->cost.parseTime, info->cost.insertBindTime - info->cost.schemaTime,
info->cost.insertRpcTime - info->cost.insertBindTime, info->cost.endTime - info->cost.insertRpcTime, info->cost.insertRpcTime - info->cost.insertBindTime, info->cost.endTime - info->cost.insertRpcTime,
@ -1360,6 +1376,7 @@ int32_t smlClearForRerun(SSmlHandle *info) {
} }
static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) { static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) {
uDebug("SML:0x%" PRIx64 " smlParseLine start", info->id);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (info->protocol == TSDB_SML_JSON_PROTOCOL) { if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
if (lines) { if (lines) {
@ -1395,8 +1412,16 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char
} }
} }
uDebug("SML:0x%" PRIx64 " smlParseLine israw:%d, len:%d, sql:%s", info->id, info->isRawLine, len, char cTmp = 0; // for print tmp if is raw
(info->isRawLine ? "rawdata" : tmp)); if(info->isRawLine){
cTmp = tmp[len - 1];
tmp[len - 1] = '\0';
}
uDebug("SML:0x%" PRIx64 " smlParseLine israw:%d, numLines:%d, protocol:%d, len:%d, sql:%s", info->id, info->isRawLine, numLines, info->protocol, len, tmp);
if(info->isRawLine){
tmp[len - 1] = cTmp;
}
if (info->protocol == TSDB_SML_LINE_PROTOCOL) { if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
if (info->dataFormat) { if (info->dataFormat) {
@ -1421,6 +1446,7 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char
return code; return code;
} }
if (info->reRun) { if (info->reRun) {
uDebug("SML:0x%" PRIx64 " smlParseLine re run", info->id);
i = 0; i = 0;
rawLine = oldRaw; rawLine = oldRaw;
code = smlClearForRerun(info); code = smlClearForRerun(info);
@ -1431,6 +1457,7 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char
} }
i++; i++;
} }
uDebug("SML:0x%" PRIx64 " smlParseLine end", info->id);
return code; return code;
} }
@ -1461,7 +1488,8 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL
do { do {
code = smlModifyDBSchemas(info); code = smlModifyDBSchemas(info);
if (code == 0) break; if (code == 0) break;
taosMsleep(200); taosMsleep(500);
uInfo("SML:0x%" PRIx64 " smlModifyDBSchemas retry code:%s, times:%d", info->id, tstrerror(code), retryNum);
} while (retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES); } while (retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES);
if (code != 0) { if (code != 0) {
@ -1488,6 +1516,7 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine,
} }
SRequestObj *request = NULL; SRequestObj *request = NULL;
SSmlHandle *info = NULL; SSmlHandle *info = NULL;
int cnt = 0;
while(1){ while(1){
request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid); request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid);
if (request == NULL) { if (request == NULL) {
@ -1542,16 +1571,22 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine,
request->code = code; request->code = code;
info->cost.endTime = taosGetTimestampUs(); info->cost.endTime = taosGetTimestampUs();
info->cost.code = code; info->cost.code = code;
smlPrintStatisticInfo(info); if(code == TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER || code == TSDB_CODE_SDB_OBJ_CREATING
if(code == TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER || code == TSDB_CODE_SDB_OBJ_CREATING){ || code == TSDB_CODE_PAR_VALUE_TOO_LONG || code == TSDB_CODE_MND_TRANS_CONFLICT){
if(cnt++ >= 10){
uInfo("SML:%"PRIx64" retry:%d/10 end code:%d, msg:%s", info->id, cnt, code, tstrerror(code));
break;
}
taosMsleep(100);
refreshMeta(request->pTscObj, request); refreshMeta(request->pTscObj, request);
uInfo("SML:%"PRIx64" ver is old retry or object is creating code:%d", info->id, code); uInfo("SML:%"PRIx64" retry:%d/10,ver is old retry or object is creating code:%d, msg:%s", info->id, cnt, code, tstrerror(code));
smlDestroyInfo(info); smlDestroyInfo(info);
info = NULL; info = NULL;
taos_free_result(request); taos_free_result(request);
request = NULL; request = NULL;
continue; continue;
} }
smlPrintStatisticInfo(info);
break; break;
} }

View File

@ -583,12 +583,14 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
.i = ts, .i = ts,
.length = (size_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes}; .length = (size_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
if (info->dataFormat) { if (info->dataFormat) {
uDebug("SML:0x%" PRIx64 " smlParseInfluxString format true, ts:%" PRId64, info->id, ts);
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 0); ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 0);
if(ret != TSDB_CODE_SUCCESS){return ret;} if(ret != TSDB_CODE_SUCCESS){return ret;}
ret = smlBuildRow(info->currTableDataCtx); ret = smlBuildRow(info->currTableDataCtx);
if(ret != TSDB_CODE_SUCCESS){return ret;} if(ret != TSDB_CODE_SUCCESS){return ret;}
clearColValArray(info->currTableDataCtx->pValues); clearColValArray(info->currTableDataCtx->pValues);
} else { } else {
uDebug("SML:0x%" PRIx64 " smlParseInfluxString format false, ts:%" PRId64, info->id, ts);
taosArraySet(elements->colArray, 0, &kv); taosArraySet(elements->colArray, 0, &kv);
} }
info->preLine = *elements; info->preLine = *elements;

View File

@ -24,7 +24,7 @@
#include "tref.h" #include "tref.h"
#include "ttimer.h" #include "ttimer.h"
#define EMPTY_BLOCK_POLL_IDLE_DURATION 100 #define EMPTY_BLOCK_POLL_IDLE_DURATION 10
#define DEFAULT_AUTO_COMMIT_INTERVAL 5000 #define DEFAULT_AUTO_COMMIT_INTERVAL 5000
struct SMqMgmt { struct SMqMgmt {
@ -148,7 +148,8 @@ typedef struct {
typedef struct { typedef struct {
int8_t tmqRspType; int8_t tmqRspType;
int32_t epoch; int32_t epoch; // epoch can be used to guard the vgHandle
int32_t vgId;
SMqClientVg* vgHandle; SMqClientVg* vgHandle;
SMqClientTopic* topicHandle; SMqClientTopic* topicHandle;
uint64_t reqId; uint64_t reqId;
@ -1329,6 +1330,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
pRspWrapper->topicHandle = pTopic; pRspWrapper->topicHandle = pTopic;
pRspWrapper->reqId = requestId; pRspWrapper->reqId = requestId;
pRspWrapper->pEpset = pMsg->pEpSet; pRspWrapper->pEpset = pMsg->pEpSet;
pRspWrapper->vgId = pVg->vgId;
pMsg->pEpSet = NULL; pMsg->pEpSet = NULL;
if (rspType == TMQ_MSG_TYPE__POLL_RSP) { if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
@ -1747,7 +1749,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
for (int j = 0; j < numOfVg; j++) { for (int j = 0; j < numOfVg; j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 100ms if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 100ms
tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 100ms before start next poll", tmq->consumerId, tmq->epoch, tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId, tmq->epoch,
pVg->vgId); pVg->vgId);
continue; continue;
} }
@ -1864,9 +1866,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
return pRsp; return pRsp;
} }
} else { } else {
SMqClientVg* pVg = pollRspWrapper->vgHandle;
tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tmq->consumerId, pVg->vgId, pDataRsp->head.epoch, consumerEpoch); tmq->consumerId, pollRspWrapper->vgId, pDataRsp->head.epoch, consumerEpoch);
pRspWrapper = tmqFreeRspWrapper(pRspWrapper); pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
} }
@ -1886,7 +1887,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
return pRsp; return pRsp;
} else { } else {
tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tmq->consumerId, pollRspWrapper->vgHandle->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch); tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
pRspWrapper = tmqFreeRspWrapper(pRspWrapper); pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
} }
@ -1933,7 +1934,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
} else { } else {
tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tmq->consumerId, pollRspWrapper->vgHandle->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch); tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
pRspWrapper = tmqFreeRspWrapper(pRspWrapper); pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
} }
@ -1955,7 +1956,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
void* rspObj; void* rspObj;
int64_t startTime = taosGetTimestampMs(); int64_t startTime = taosGetTimestampMs();
tscDebug("consumer:0x%" PRIx64 " start to poll at %" PRId64, tmq->consumerId, startTime); tscDebug("consumer:0x%" PRIx64 " start to poll at %"PRId64", timeout:%" PRId64, tmq->consumerId, startTime, timeout);
#if 0 #if 0
tmqHandleAllDelayedTask(tmq); tmqHandleAllDelayedTask(tmq);
@ -2017,24 +2018,16 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
} }
} }
int32_t tmq_consumer_close(tmq_t* tmq) { static void displayConsumeStatistics(const tmq_t* pTmq) {
tscDebug("consumer:0x%" PRIx64" start to close consumer, status:%d", tmq->consumerId, tmq->status); int32_t numOfTopics = taosArrayGetSize(pTmq->clientTopics);
if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
int32_t rsp = tmq_commit_sync(tmq, NULL);
if (rsp != 0) {
return rsp;
}
int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
tscDebug("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d", tscDebug("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d",
tmq->consumerId, tmq->pollCnt, tmq->totalRows, numOfTopics, tmq->epoch); pTmq->consumerId, pTmq->pollCnt, pTmq->totalRows, numOfTopics, pTmq->epoch);
tscDebug("consumer:0x%" PRIx64 " rows dist begin: ", tmq->consumerId); tscDebug("consumer:0x%" PRIx64 " rows dist begin: ", pTmq->consumerId);
for (int32_t i = 0; i < numOfTopics; ++i) { for (int32_t i = 0; i < numOfTopics; ++i) {
SMqClientTopic* pTopics = taosArrayGet(tmq->clientTopics, i); SMqClientTopic* pTopics = taosArrayGet(pTmq->clientTopics, i);
tscDebug("consumer:0x%" PRIx64 " topic:%d", tmq->consumerId, i); tscDebug("consumer:0x%" PRIx64 " topic:%d", pTmq->consumerId, i);
int32_t numOfVgs = taosArrayGetSize(pTopics->vgs); int32_t numOfVgs = taosArrayGetSize(pTopics->vgs);
for (int32_t j = 0; j < numOfVgs; ++j) { for (int32_t j = 0; j < numOfVgs; ++j) {
SMqClientVg* pVg = taosArrayGet(pTopics->vgs, j); SMqClientVg* pVg = taosArrayGet(pTopics->vgs, j);
@ -2042,12 +2035,26 @@ int32_t tmq_consumer_close(tmq_t* tmq) {
} }
} }
tscDebug("consumer:0x%" PRIx64 " rows dist end", tmq->consumerId); tscDebug("consumer:0x%" PRIx64 " rows dist end", pTmq->consumerId);
}
int32_t tmq_consumer_close(tmq_t* tmq) {
tscDebug("consumer:0x%" PRIx64" start to close consumer, status:%d", tmq->consumerId, tmq->status);
displayConsumeStatistics(tmq);
if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
// if auto commit is set, commit before close consumer. Otherwise, do nothing.
if (tmq->autoCommit) {
int32_t rsp = tmq_commit_sync(tmq, NULL);
if (rsp != 0) {
return rsp;
}
}
int32_t retryCnt = 0; int32_t retryCnt = 0;
tmq_list_t* lst = tmq_list_new(); tmq_list_t* lst = tmq_list_new();
while (1) { while (1) {
rsp = tmq_subscribe(tmq, lst); int32_t rsp = tmq_subscribe(tmq, lst);
if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) { if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
break; break;
} else { } else {
@ -2057,6 +2064,8 @@ int32_t tmq_consumer_close(tmq_t* tmq) {
} }
tmq_list_destroy(lst); tmq_list_destroy(lst);
} else {
tscWarn("consumer:0x%" PRIx64" not in ready state, close it directly", tmq->consumerId);
} }
taosRemoveRef(tmqMgmt.rsetId, tmq->refId); taosRemoveRef(tmqMgmt.rsetId, tmq->refId);

View File

@ -1238,13 +1238,13 @@ int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDi
} }
if (taosLoadCfg(pCfg, envCmd, cfgDir, envFile, apolloUrl) != 0) { if (taosLoadCfg(pCfg, envCmd, cfgDir, envFile, apolloUrl) != 0) {
uError("failed to load cfg since %s", terrstr()); printf("failed to load cfg since %s", terrstr());
cfgCleanup(pCfg); cfgCleanup(pCfg);
return -1; return -1;
} }
if (cfgLoadFromArray(pCfg, pArgs) != 0) { if (cfgLoadFromArray(pCfg, pArgs) != 0) {
uError("failed to load cfg from array since %s", terrstr()); printf("failed to load cfg from array since %s", terrstr());
cfgCleanup(pCfg); cfgCleanup(pCfg);
return -1; return -1;
} }
@ -1260,13 +1260,13 @@ int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDi
if (taosMulModeMkDir(tsLogDir, 0777) != 0) { if (taosMulModeMkDir(tsLogDir, 0777) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to create dir:%s since %s", tsLogDir, terrstr()); printf("failed to create dir:%s since %s", tsLogDir, terrstr());
cfgCleanup(pCfg); cfgCleanup(pCfg);
return -1; return -1;
} }
if (taosInitLog(logname, logFileNum) != 0) { if (taosInitLog(logname, logFileNum) != 0) {
uError("failed to init log file since %s", terrstr()); printf("failed to init log file since %s", terrstr());
cfgCleanup(pCfg); cfgCleanup(pCfg);
return -1; return -1;
} }

View File

@ -457,6 +457,7 @@ typedef struct {
void* pIter; void* pIter;
SMnode* pMnode; SMnode* pMnode;
STableMetaRsp* pMeta; STableMetaRsp* pMeta;
bool restore;
bool sysDbRsp; bool sysDbRsp;
char db[TSDB_DB_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
char filterTb[TSDB_TABLE_NAME_LEN]; char filterTb[TSDB_TABLE_NAME_LEN];

View File

@ -864,7 +864,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
} }
// grant info // grant info
pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 86400000.0f; pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 1000;
pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed; pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed;
if (pMnode->grant.expireTimeMS == 0) { if (pMnode->grant.expireTimeMS == 0) {
pGrantInfo->expire_time = INT32_MAX; pGrantInfo->expire_time = INT32_MAX;

View File

@ -324,7 +324,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
pReq->info.rsp = pRsp; pReq->info.rsp = pRsp;
pReq->info.rspLen = size; pReq->info.rspLen = size;
if (rowsRead == 0 || rowsRead < rowsToRead) { if (rowsRead == 0 || ((rowsRead < rowsToRead) && !pShow->restore)) {
pRsp->completed = 1; pRsp->completed = 1;
mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id); mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id);
mndReleaseShowObj(pShow, true); mndReleaseShowObj(pShow, true);

View File

@ -3113,9 +3113,18 @@ static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(typeName, "SUPER_TABLE"); STR_TO_VARSTR(typeName, "SUPER_TABLE");
bool fetch = pShow->restore ? false : true;
pShow->restore = false;
while (numOfRows < rows) { while (numOfRows < rows) {
if (fetch) {
pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb); pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
if (pShow->pIter == NULL) break; if (pShow->pIter == NULL) break;
} else {
fetch = true;
void *pKey = taosHashGetKey(pShow->pIter, NULL);
pStb = sdbAcquire(pSdb, SDB_STB, pKey);
if (!pStb) continue;
}
if (pDb != NULL && pStb->dbUid != pDb->uid) { if (pDb != NULL && pStb->dbUid != pDb->uid) {
sdbRelease(pSdb, pStb); sdbRelease(pSdb, pStb);
@ -3129,6 +3138,17 @@ static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
sdbRelease(pSdb, pStb); sdbRelease(pSdb, pStb);
continue; continue;
} }
if ((numOfRows + pStb->numOfColumns) > rows) {
pShow->restore = true;
if (numOfRows == 0) {
mError("mndRetrieveStbCol failed to get stable cols since buf:%d less than result:%d, stable name:%s, db:%s",
rows, pStb->numOfColumns, pStb->name, pStb->db);
}
sdbRelease(pSdb, pStb);
break;
}
varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE])); varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
mDebug("mndRetrieveStbCol get stable cols, stable name:%s, db:%s", pStb->name, pStb->db); mDebug("mndRetrieveStbCol get stable cols, stable name:%s, db:%s", pStb->name, pStb->db);

View File

@ -157,7 +157,7 @@ typedef struct SMTbCursor SMTbCursor;
SMTbCursor *metaOpenTbCursor(SMeta *pMeta); SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
void metaCloseTbCursor(SMTbCursor *pTbCur); void metaCloseTbCursor(SMTbCursor *pTbCur);
int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType); int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType);
int32_t metaTbCursorPrev(SMTbCursor *pTbCur); int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType);
#endif #endif

View File

@ -141,7 +141,7 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle);
// tqRead // tqRead
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset); int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset);
int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset); int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset);
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum); int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum, uint64_t reqId);
// tqExec // tqExec
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp); int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp);

View File

@ -336,7 +336,7 @@ int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType) {
return 0; return 0;
} }
int32_t metaTbCursorPrev(SMTbCursor *pTbCur) { int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType) {
int ret; int ret;
void *pBuf; void *pBuf;
STbCfg tbCfg; STbCfg tbCfg;
@ -350,7 +350,7 @@ int32_t metaTbCursorPrev(SMTbCursor *pTbCur) {
tDecoderClear(&pTbCur->mr.coder); tDecoderClear(&pTbCur->mr.coder);
metaGetTableEntryByVersion(&pTbCur->mr, ((SUidIdxVal *)pTbCur->pVal)[0].version, *(tb_uid_t *)pTbCur->pKey); metaGetTableEntryByVersion(&pTbCur->mr, ((SUidIdxVal *)pTbCur->pVal)[0].version, *(tb_uid_t *)pTbCur->pKey);
if (pTbCur->mr.me.type == TSDB_SUPER_TABLE) { if (pTbCur->mr.me.type == jumpTableType) {
continue; continue;
} }

View File

@ -453,6 +453,15 @@ static int32_t processSubColumn(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
taosWUnLockLatch(&pTq->pushLock); taosWUnLockLatch(&pTq->pushLock);
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP);
// NOTE: this pHandle->consumerId may have been changed already.
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64
", ts:%" PRId64", reqId:0x%"PRIx64,
pRequest->consumerId, pHandle->subKey, vgId, dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid,
dataRsp.rspOffset.ts, pRequest->reqId);
tDeleteSMqDataRsp(&dataRsp);
return code; return code;
} }
@ -484,8 +493,7 @@ static int32_t processSubDbOrTable(STQ* pTq, STqHandle* pHandle, const SMqPollRe
if (metaRsp.metaRspLen > 0) { if (metaRsp.metaRspLen > 0) {
code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp); code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp);
tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 ",ts:%" PRId64,
",ts:%" PRId64,
pRequest->consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid, pRequest->consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
metaRsp.rspOffset.ts); metaRsp.rspOffset.ts);
taosMemoryFree(metaRsp.metaRsp); taosMemoryFree(metaRsp.metaRsp);
@ -522,7 +530,7 @@ static int32_t processSubDbOrTable(STQ* pTq, STqHandle* pHandle, const SMqPollRe
break; break;
} }
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) { if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
tDeleteSTaosxRsp(&taosxRsp); tDeleteSTaosxRsp(&taosxRsp);

View File

@ -183,23 +183,24 @@ end:
return tbSuid == realTbSuid; return tbSuid == realTbSuid;
} }
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) { int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead, uint64_t reqId) {
int32_t code = 0; int32_t code = 0;
int32_t vgId = TD_VID(pTq->pVnode);
taosThreadMutexLock(&pHandle->pWalReader->mutex); taosThreadMutexLock(&pHandle->pWalReader->mutex);
int64_t offset = *fetchOffset; int64_t offset = *fetchOffset;
while (1) { while (1) {
if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) { if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) {
tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return", tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return, reqId:0x%"PRIx64,
pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode), offset); pHandle->consumerId, pHandle->epoch, vgId, offset, reqId);
*fetchOffset = offset - 1; *fetchOffset = offset - 1;
code = -1; code = -1;
goto END; goto END;
} }
tqDebug("vgId:%d, taosx get msg ver %" PRId64 ", type: %s", pTq->pVnode->config.vgId, offset, tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s, reqId:0x%" PRIx64, vgId,
TMSG_INFO((*ppCkHead)->head.msgType)); pHandle->consumerId, offset, TMSG_INFO((*ppCkHead)->head.msgType), reqId);
if ((*ppCkHead)->head.msgType == TDMT_VND_SUBMIT) { if ((*ppCkHead)->head.msgType == TDMT_VND_SUBMIT) {
code = walFetchBody(pHandle->pWalReader, ppCkHead); code = walFetchBody(pHandle->pWalReader, ppCkHead);
@ -305,14 +306,9 @@ void tqNextBlock(STqReader* pReader, SFetchRet* ret) {
while (1) { while (1) {
if (pReader->msg2.msgStr == NULL) { if (pReader->msg2.msgStr == NULL) {
if (walNextValidMsg(pReader->pWalReader) < 0) { if (walNextValidMsg(pReader->pWalReader) < 0) {
// pReader->ver = pReader->pWalReader->curVersion;
ret->offset.type = TMQ_OFFSET__LOG;
ret->offset.version = pReader->pWalReader->curVersion;
ret->fetchType = FETCH_TYPE__NONE; ret->fetchType = FETCH_TYPE__NONE;
tqInfo("wal return none, offset %" PRId64 ", no more valid msg in wal", ret->offset.version);
return; return;
} }
void* body = POINTER_SHIFT(pReader->pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg)); void* body = POINTER_SHIFT(pReader->pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
int32_t bodyLen = pReader->pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg); int32_t bodyLen = pReader->pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
int64_t ver = pReader->pWalReader->pHead->head.version; int64_t ver = pReader->pWalReader->pHead->head.version;
@ -327,7 +323,6 @@ void tqNextBlock(STqReader* pReader, SFetchRet* ret) {
continue; continue;
} }
ret->fetchType = FETCH_TYPE__DATA; ret->fetchType = FETCH_TYPE__DATA;
tqDebug("wal return data rows %d", ret->data.info.rows);
return; return;
} }
} }

View File

@ -134,7 +134,6 @@ static int32_t setTableSchema(SCacheRowsReader* p, uint64_t suid, const char* id
// all queried tables have been dropped already, return immediately. // all queried tables have been dropped already, return immediately.
if (p->pSchema == NULL) { if (p->pSchema == NULL) {
taosMemoryFree(p);
tsdbWarn("all queried tables has been dropped, try next group, %s", idstr); tsdbWarn("all queried tables has been dropped, try next group, %s", idstr);
return TSDB_CODE_PAR_TABLE_NOT_EXIST; return TSDB_CODE_PAR_TABLE_NOT_EXIST;
} }

View File

@ -922,7 +922,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
pBlockNum->numOfBlocks += 1; pBlockNum->numOfBlocks += 1;
} }
if ((pScanInfo->pBlockList != NULL) && (taosArrayGetSize(pScanInfo->pBlockList) > 0)) { if (taosArrayGetSize(pScanInfo->pBlockList) > 0) {
numOfQTable += 1; numOfQTable += 1;
} }
} }
@ -4220,12 +4220,8 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) {
if (pStatus->loadFromFile) { if (pStatus->loadFromFile) {
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
if (pBlockInfo != NULL) { if (pBlockInfo != NULL) {
pBlockScanInfo = pBlockScanInfo = getTableBlockScanInfo(pStatus->pTableMap, pBlockInfo->uid, pReader->idStr);
*(STableBlockScanInfo**)taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
if (pBlockScanInfo == NULL) { if (pBlockScanInfo == NULL) {
code = TSDB_CODE_INVALID_PARA;
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
goto _err; goto _err;
} }
} else { } else {

View File

@ -44,7 +44,8 @@ typedef struct SSortSource {
void* param; void* param;
bool onlyRef; bool onlyRef;
}; };
int64_t fetchUs;
int64_t fetchNum;
} SSortSource; } SSortSource;
typedef struct SMsortComparParam { typedef struct SMsortComparParam {

View File

@ -1070,7 +1070,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.currentOffset)) { if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.currentOffset)) {
return 0; return 0;
} }
pTaskInfo->streamInfo.currentOffset = *pOffset;
if (subType == TOPIC_SUB_TYPE__COLUMN) { if (subType == TOPIC_SUB_TYPE__COLUMN) {
pOperator->status = OP_OPENED; pOperator->status = OP_OPENED;
@ -1213,6 +1212,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
pInfo->dataReader = NULL; pInfo->dataReader = NULL;
qDebug("tmqsnap qStreamPrepareScan snapshot log"); qDebug("tmqsnap qStreamPrepareScan snapshot log");
} }
pTaskInfo->streamInfo.currentOffset = *pOffset;
return 0; return 0;
} }

View File

@ -31,6 +31,7 @@
#include "thash.h" #include "thash.h"
#include "ttypes.h" #include "ttypes.h"
#define MULTI_READER_MAX_TABLE_NUM 5000
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN) #define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
@ -44,6 +45,8 @@ typedef struct STableMergeScanSortSourceParam {
int32_t readerIdx; int32_t readerIdx;
uint64_t uid; uint64_t uid;
SSDataBlock* inputBlock; SSDataBlock* inputBlock;
bool multiReader;
STsdbReader* dataReader;
} STableMergeScanSortSourceParam; } STableMergeScanSortSourceParam;
static bool processBlockWithProbability(const SSampleExecInfo* pInfo); static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
@ -1195,6 +1198,8 @@ static int32_t getPreSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs,
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
SET_SESSION_WIN_KEY_INVALID(pKey); SET_SESSION_WIN_KEY_INVALID(pKey);
} }
taosMemoryFree(pCur);
return code; return code;
} }
@ -1623,16 +1628,18 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
while (1) { while (1) {
SFetchRet ret = {0}; SFetchRet ret = {0};
tqNextBlock(pInfo->tqReader, &ret); tqNextBlock(pInfo->tqReader, &ret);
tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pInfo->tqReader->pWalReader->curVersion);
if (ret.fetchType == FETCH_TYPE__DATA) { if (ret.fetchType == FETCH_TYPE__DATA) {
qDebug("doQueueScan get data from log %d rows, version:%" PRId64, pInfo->pRes->info.rows, pTaskInfo->streamInfo.currentOffset.version);
blockDataCleanup(pInfo->pRes); blockDataCleanup(pInfo->pRes);
setBlockIntoRes(pInfo, &ret.data, true); setBlockIntoRes(pInfo, &ret.data, true);
if (pInfo->pRes->info.rows > 0) { if (pInfo->pRes->info.rows > 0) {
qDebug("queue scan log return %d rows", pInfo->pRes->info.rows); qDebug("doQueueScan get data from log %d rows, return, version:%" PRId64, pInfo->pRes->info.rows, pTaskInfo->streamInfo.currentOffset.version);
return pInfo->pRes; return pInfo->pRes;
} }
}else if(ret.fetchType == FETCH_TYPE__NONE){ }else if(ret.fetchType == FETCH_TYPE__NONE){
pTaskInfo->streamInfo.currentOffset = ret.offset; qDebug("doQueueScan get none from log, return, version:%" PRId64, pTaskInfo->streamInfo.currentOffset.version);
return NULL; return NULL;
} }
} }
@ -2546,6 +2553,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t readIdx = source->readerIdx; int32_t readIdx = source->readerIdx;
SSDataBlock* pBlock = source->inputBlock; SSDataBlock* pBlock = source->inputBlock;
int32_t code = 0;
SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx); SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx);
@ -2553,17 +2561,20 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex); void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex);
SReadHandle* pHandle = &pInfo->base.readHandle; SReadHandle* pHandle = &pInfo->base.readHandle;
int32_t code = if (NULL == source->dataReader || !source->multiReader) {
tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &pInfo->base.dataReader, GET_TASKID(pTaskInfo)); code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &source->dataReader, GET_TASKID(pTaskInfo));
if (code != 0) { if (code != 0) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
}
pInfo->base.dataReader = source->dataReader;
STsdbReader* reader = pInfo->base.dataReader; STsdbReader* reader = pInfo->base.dataReader;
qTrace("tsdb/read-table-data: %p, enter next reader", reader); qTrace("tsdb/read-table-data: %p, enter next reader", reader);
while (tsdbNextDataBlock(reader)) { while (tsdbNextDataBlock(reader)) {
if (isTaskKilled(pTaskInfo)) { if (isTaskKilled(pTaskInfo)) {
tsdbReleaseDataBlock(reader); tsdbReleaseDataBlock(reader);
pInfo->base.dataReader = NULL;
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
} }
@ -2597,14 +2608,18 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
qTrace("tsdb/read-table-data: %p, close reader", reader); qTrace("tsdb/read-table-data: %p, close reader", reader);
if (!source->multiReader) {
tsdbReaderClose(pInfo->base.dataReader); tsdbReaderClose(pInfo->base.dataReader);
source->dataReader = NULL;
}
pInfo->base.dataReader = NULL; pInfo->base.dataReader = NULL;
return pBlock; return pBlock;
} }
qDebug("8"); if (!source->multiReader) {
tsdbReaderClose(pInfo->base.dataReader); tsdbReaderClose(pInfo->base.dataReader);
source->dataReader = NULL;
}
pInfo->base.dataReader = NULL; pInfo->base.dataReader = NULL;
return NULL; return NULL;
} }
@ -2676,6 +2691,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
STableMergeScanSortSourceParam param = {0}; STableMergeScanSortSourceParam param = {0};
param.readerIdx = i; param.readerIdx = i;
param.pOperator = pOperator; param.pOperator = pOperator;
param.multiReader = (numOfTable <= MULTI_READER_MAX_TABLE_NUM) ? true : false;
param.inputBlock = createOneDataBlock(pInfo->pResBlock, false); param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
blockDataEnsureCapacity(param.inputBlock, pOperator->resultInfo.capacity); blockDataEnsureCapacity(param.inputBlock, pOperator->resultInfo.capacity);
@ -2719,6 +2735,8 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
for (int32_t i = 0; i < numOfTable; ++i) { for (int32_t i = 0; i < numOfTable; ++i) {
STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i); STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
blockDataDestroy(param->inputBlock); blockDataDestroy(param->inputBlock);
tsdbReaderClose(param->dataReader);
param->dataReader = NULL;
} }
taosArrayClear(pInfo->sortSourceParams); taosArrayClear(pInfo->sortSourceParams);
@ -2761,9 +2779,6 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
qDebug("%s get sorted row block, rows:%d, limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows, qDebug("%s get sorted row block, rows:%d, limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows,
pInfo->limitInfo.numOfOutputRows); pInfo->limitInfo.numOfOutputRows);
if (limitReached) {
resetLimitInfoForNextGroup(&pInfo->limitInfo);
}
return (pResBlock->info.rows > 0) ? pResBlock : NULL; return (pResBlock->info.rows > 0) ? pResBlock : NULL;
} }
@ -2816,6 +2831,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
pInfo->tableStartIndex = pInfo->tableEndIndex + 1; pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
pInfo->groupId = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->tableStartIndex)->groupId; pInfo->groupId = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->tableStartIndex)->groupId;
startGroupTableMergeScan(pOperator); startGroupTableMergeScan(pOperator);
resetLimitInfoForNextGroup(&pInfo->limitInfo);
} }
} }
@ -2831,15 +2847,17 @@ void destroyTableMergeScanOperatorInfo(void* param) {
for (int32_t i = 0; i < numOfTable; i++) { for (int32_t i = 0; i < numOfTable; i++) {
STableMergeScanSortSourceParam* p = taosArrayGet(pTableScanInfo->sortSourceParams, i); STableMergeScanSortSourceParam* p = taosArrayGet(pTableScanInfo->sortSourceParams, i);
blockDataDestroy(p->inputBlock); blockDataDestroy(p->inputBlock);
tsdbReaderClose(p->dataReader);
p->dataReader = NULL;
} }
tsdbReaderClose(pTableScanInfo->base.dataReader);
pTableScanInfo->base.dataReader = NULL;
taosArrayDestroy(pTableScanInfo->sortSourceParams); taosArrayDestroy(pTableScanInfo->sortSourceParams);
tsortDestroySortHandle(pTableScanInfo->pSortHandle); tsortDestroySortHandle(pTableScanInfo->pSortHandle);
pTableScanInfo->pSortHandle = NULL; pTableScanInfo->pSortHandle = NULL;
tsdbReaderClose(pTableScanInfo->base.dataReader);
pTableScanInfo->base.dataReader = NULL;
for (int i = 0; i < taosArrayGetSize(pTableScanInfo->queryConds); i++) { for (int i = 0; i < taosArrayGetSize(pTableScanInfo->queryConds); i++) {
SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i); SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i);
taosMemoryFree(pCond->colList); taosMemoryFree(pCond->colList);
@ -2856,8 +2874,6 @@ void destroyTableMergeScanOperatorInfo(void* param) {
taosArrayDestroy(pTableScanInfo->pSortInfo); taosArrayDestroy(pTableScanInfo->pSortInfo);
cleanupExprSupp(&pTableScanInfo->base.pseudoSup); cleanupExprSupp(&pTableScanInfo->base.pseudoSup);
tsdbReaderClose(pTableScanInfo->base.dataReader);
pTableScanInfo->base.dataReader = NULL;
taosLRUCacheCleanup(pTableScanInfo->base.metaCache.pTableMetaEntryCache); taosLRUCacheCleanup(pTableScanInfo->base.metaCache.pTableMetaEntryCache);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);

View File

@ -57,9 +57,11 @@ typedef struct SSysTableScanInfo {
const char* pUser; const char* pUser;
bool sysInfo; bool sysInfo;
bool showRewrite; bool showRewrite;
bool restore;
SNode* pCondition; // db_name filter condition, to discard data that are not in current database SNode* pCondition; // db_name filter condition, to discard data that are not in current database
SMTbCursor* pCur; // cursor for iterate the local table meta store. SMTbCursor* pCur; // cursor for iterate the local table meta store.
SSysTableIndex* pIdx; // idx for local table meta SSysTableIndex* pIdx; // idx for local table meta
SHashObj* pSchema;
SColMatchInfo matchInfo; SColMatchInfo matchInfo;
SName name; SName name;
SSDataBlock* pRes; SSDataBlock* pRes;
@ -514,9 +516,23 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
pInfo->pCur = metaOpenTbCursor(pInfo->readHandle.meta); pInfo->pCur = metaOpenTbCursor(pInfo->readHandle.meta);
} }
SHashObj* stableSchema = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); if (pInfo->pSchema == NULL) {
taosHashSetFreeFp(stableSchema, tDeleteSSchemaWrapperForHash); pInfo->pSchema = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
while ((ret = metaTbCursorNext(pInfo->pCur, TSDB_TABLE_MAX)) == 0) { taosHashSetFreeFp(pInfo->pSchema, tDeleteSSchemaWrapperForHash);
}
if (!pInfo->pCur || !pInfo->pSchema) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
qError("sysTableScanUserCols failed since %s", terrstr(terrno));
blockDataDestroy(dataBlock);
pInfo->loadInfo.totalRows = 0;
return NULL;
}
int32_t restore = pInfo->restore;
pInfo->restore = false;
while (restore || ((ret = metaTbCursorNext(pInfo->pCur, TSDB_TABLE_MAX)) == 0)) {
if (restore) restore = false;
char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
@ -524,33 +540,36 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
if (pInfo->pCur->mr.me.type == TSDB_SUPER_TABLE) { if (pInfo->pCur->mr.me.type == TSDB_SUPER_TABLE) {
qDebug("sysTableScanUserCols cursor get super table"); qDebug("sysTableScanUserCols cursor get super table");
void* schema = taosHashGet(stableSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t)); void* schema = taosHashGet(pInfo->pSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t));
if (schema == NULL) { if (schema == NULL) {
SSchemaWrapper* schemaWrapper = tCloneSSchemaWrapper(&pInfo->pCur->mr.me.stbEntry.schemaRow); SSchemaWrapper* schemaWrapper = tCloneSSchemaWrapper(&pInfo->pCur->mr.me.stbEntry.schemaRow);
taosHashPut(stableSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t), &schemaWrapper, POINTER_BYTES); taosHashPut(pInfo->pSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t), &schemaWrapper, POINTER_BYTES);
} }
continue; continue;
} else if (pInfo->pCur->mr.me.type == TSDB_CHILD_TABLE) { } else if (pInfo->pCur->mr.me.type == TSDB_CHILD_TABLE) {
qDebug("sysTableScanUserCols cursor get child table"); qDebug("sysTableScanUserCols cursor get child table");
STR_TO_VARSTR(typeName, "CHILD_TABLE"); STR_TO_VARSTR(typeName, "CHILD_TABLE");
STR_TO_VARSTR(tableName, pInfo->pCur->mr.me.name); STR_TO_VARSTR(tableName, pInfo->pCur->mr.me.name);
int64_t suid = pInfo->pCur->mr.me.ctbEntry.suid; int64_t suid = pInfo->pCur->mr.me.ctbEntry.suid;
void* schema = taosHashGet(stableSchema, &pInfo->pCur->mr.me.ctbEntry.suid, sizeof(int64_t)); void* schema = taosHashGet(pInfo->pSchema, &pInfo->pCur->mr.me.ctbEntry.suid, sizeof(int64_t));
if (schema != NULL) { if (schema != NULL) {
schemaRow = *(SSchemaWrapper**)schema; schemaRow = *(SSchemaWrapper**)schema;
} else { } else {
tDecoderClear(&pInfo->pCur->mr.coder); SMetaReader smrSuperTable = {0};
int code = metaGetTableEntryByUid(&pInfo->pCur->mr, suid); metaReaderInit(&smrSuperTable, pInfo->readHandle.meta, 0);
int code = metaGetTableEntryByUid(&smrSuperTable, suid);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
// terrno has been set by metaGetTableEntryByName, therefore, return directly // terrno has been set by metaGetTableEntryByName, therefore, return directly
qError("sysTableScanUserCols get meta by suid:%" PRId64 " error, code:%d", suid, code); qError("sysTableScanUserCols get meta by suid:%" PRId64 " error, code:%d", suid, code);
metaReaderClear(&smrSuperTable);
blockDataDestroy(dataBlock); blockDataDestroy(dataBlock);
pInfo->loadInfo.totalRows = 0; pInfo->loadInfo.totalRows = 0;
taosHashCleanup(stableSchema);
return NULL; return NULL;
} }
schemaRow = &pInfo->pCur->mr.me.stbEntry.schemaRow; SSchemaWrapper* schemaWrapper = tCloneSSchemaWrapper(&smrSuperTable.me.stbEntry.schemaRow);
taosHashPut(pInfo->pSchema, &suid, sizeof(int64_t), &schemaWrapper, POINTER_BYTES);
schemaRow = schemaWrapper;
metaReaderClear(&smrSuperTable);
} }
} else if (pInfo->pCur->mr.me.type == TSDB_NORMAL_TABLE) { } else if (pInfo->pCur->mr.me.type == TSDB_NORMAL_TABLE) {
qDebug("sysTableScanUserCols cursor get normal table"); qDebug("sysTableScanUserCols cursor get normal table");
@ -562,20 +581,19 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
continue; continue;
} }
sysTableUserColsFillOneTableCols(pInfo, dbname, &numOfRows, dataBlock, tableName, schemaRow, typeName); if ((numOfRows + schemaRow->nCols) > pOperator->resultInfo.capacity) {
if (numOfRows >= pOperator->resultInfo.capacity) {
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo); relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
numOfRows = 0; numOfRows = 0;
pInfo->restore = true;
if (pInfo->pRes->info.rows > 0) { if (pInfo->pRes->info.rows > 0) {
break; break;
} }
} else {
sysTableUserColsFillOneTableCols(pInfo, dbname, &numOfRows, dataBlock, tableName, schemaRow, typeName);
} }
} }
taosHashCleanup(stableSchema);
if (numOfRows > 0) { if (numOfRows > 0) {
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo); relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
numOfRows = 0; numOfRows = 0;
@ -695,7 +713,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
} }
if ((smrSuperTable.me.stbEntry.schemaTag.nCols + numOfRows) > pOperator->resultInfo.capacity) { if ((smrSuperTable.me.stbEntry.schemaTag.nCols + numOfRows) > pOperator->resultInfo.capacity) {
metaTbCursorPrev(pInfo->pCur); metaTbCursorPrev(pInfo->pCur, TSDB_TABLE_MAX);
blockFull = true; blockFull = true;
} else { } else {
sysTableUserTagsFillOneTableTags(pInfo, &smrSuperTable, &pInfo->pCur->mr, dbname, tableName, &numOfRows, sysTableUserTagsFillOneTableTags(pInfo, &smrSuperTable, &pInfo->pCur->mr, dbname, tableName, &numOfRows,
@ -1789,6 +1807,11 @@ void destroySysScanOperator(void* param) {
pInfo->pIdx = NULL; pInfo->pIdx = NULL;
} }
if(pInfo->pSchema) {
taosHashCleanup(pInfo->pSchema);
pInfo->pSchema = NULL;
}
taosArrayDestroy(pInfo->matchInfo.pList); taosArrayDestroy(pInfo->matchInfo.pList);
taosMemoryFreeClear(pInfo->pUser); taosMemoryFreeClear(pInfo->pUser);

View File

@ -108,12 +108,18 @@ static int32_t sortComparCleanup(SMsortComparParam* cmpParam) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void tsortClearOrderdSource(SArray* pOrderedSource) { void tsortClearOrderdSource(SArray* pOrderedSource, int64_t *fetchUs, int64_t *fetchNum) {
for (size_t i = 0; i < taosArrayGetSize(pOrderedSource); i++) { for (size_t i = 0; i < taosArrayGetSize(pOrderedSource); i++) {
SSortSource** pSource = taosArrayGet(pOrderedSource, i); SSortSource** pSource = taosArrayGet(pOrderedSource, i);
if (NULL == *pSource) { if (NULL == *pSource) {
continue; continue;
} }
if (fetchUs) {
*fetchUs += (*pSource)->fetchUs;
*fetchNum += (*pSource)->fetchNum;
}
// release pageIdList // release pageIdList
if ((*pSource)->pageIdList) { if ((*pSource)->pageIdList) {
taosArrayDestroy((*pSource)->pageIdList); taosArrayDestroy((*pSource)->pageIdList);
@ -147,7 +153,10 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) {
taosMemoryFreeClear(pSortHandle->idStr); taosMemoryFreeClear(pSortHandle->idStr);
blockDataDestroy(pSortHandle->pDataBlock); blockDataDestroy(pSortHandle->pDataBlock);
tsortClearOrderdSource(pSortHandle->pOrderedSource); int64_t fetchUs = 0, fetchNum = 0;
tsortClearOrderdSource(pSortHandle->pOrderedSource, &fetchUs, &fetchNum);
qError("all source fetch time: %" PRId64 "us num:%" PRId64 " %s", fetchUs, fetchNum, pSortHandle->idStr);
taosArrayDestroy(pSortHandle->pOrderedSource); taosArrayDestroy(pSortHandle->pOrderedSource);
taosMemoryFreeClear(pSortHandle); taosMemoryFreeClear(pSortHandle);
} }
@ -307,7 +316,7 @@ static int32_t sortComparInit(SMsortComparParam* pParam, SArray* pSources, int32
} }
int64_t et = taosGetTimestampUs(); int64_t et = taosGetTimestampUs();
qDebug("init for merge sort completed, elapsed time:%.2f ms, %s", (et - st) / 1000.0, pHandle->idStr); qError("init for merge sort completed, elapsed time:%.2f ms, %s", (et - st) / 1000.0, pHandle->idStr);
} }
return code; return code;
@ -365,7 +374,10 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT
releaseBufPage(pHandle->pBuf, pPage); releaseBufPage(pHandle->pBuf, pPage);
} }
} else { } else {
int64_t st = taosGetTimestampUs();
pSource->src.pBlock = pHandle->fetchfp(((SSortSource*)pSource)->param); pSource->src.pBlock = pHandle->fetchfp(((SSortSource*)pSource)->param);
pSource->fetchUs += taosGetTimestampUs() - st;
pSource->fetchNum++;
if (pSource->src.pBlock == NULL) { if (pSource->src.pBlock == NULL) {
(*numOfCompleted) += 1; (*numOfCompleted) += 1;
pSource->src.rowIndex = -1; pSource->src.rowIndex = -1;
@ -602,7 +614,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
} }
} }
tsortClearOrderdSource(pHandle->pOrderedSource); tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
taosArrayAddAll(pHandle->pOrderedSource, pResList); taosArrayAddAll(pHandle->pOrderedSource, pResList);
taosArrayDestroy(pResList); taosArrayDestroy(pResList);
@ -644,7 +656,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
SSortSource* source = *pSource; SSortSource* source = *pSource;
*pSource = NULL; *pSource = NULL;
tsortClearOrderdSource(pHandle->pOrderedSource); tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
while (1) { while (1) {
SSDataBlock* pBlock = pHandle->fetchfp(source->param); SSDataBlock* pBlock = pHandle->fetchfp(source->param);

View File

@ -70,7 +70,7 @@ static int32_t smlBoundColumnData(SArray* cols, SBoundColInfo* pBoundInfo, SSche
SToken sToken = {.n = kv->keyLen, .z = (char*)kv->key}; SToken sToken = {.n = kv->keyLen, .z = (char*)kv->key};
col_id_t t = lastColIdx + 1; col_id_t t = lastColIdx + 1;
col_id_t index = ((t == 0 && !isTag) ? 0 : insFindCol(&sToken, t, pBoundInfo->numOfCols, pSchema)); col_id_t index = ((t == 0 && !isTag) ? 0 : insFindCol(&sToken, t, pBoundInfo->numOfCols, pSchema));
uDebug("SML, index:%d, t:%d, ncols:%d", index, t, pBoundInfo->numOfCols); uTrace("SML, index:%d, t:%d, ncols:%d", index, t, pBoundInfo->numOfCols);
if (index < 0 && t > 0) { if (index < 0 && t > 0) {
index = insFindCol(&sToken, 0, t, pSchema); index = insFindCol(&sToken, 0, t, pSchema);
} }
@ -345,7 +345,7 @@ int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSc
} }
if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)pUcs4, pColSchema->bytes - VARSTR_HEADER_SIZE, &len)) { if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)pUcs4, pColSchema->bytes - VARSTR_HEADER_SIZE, &len)) {
if (errno == E2BIG) { if (errno == E2BIG) {
uError("sml bind taosMbsToUcs4 error, kv length:%d, bytes:%d", (int)kv->length, pColSchema->bytes); uError("sml bind taosMbsToUcs4 error, kv length:%d, bytes:%d, kv->value:%s", (int)kv->length, pColSchema->bytes, kv->value);
buildInvalidOperationMsg(&pBuf, "value too long"); buildInvalidOperationMsg(&pBuf, "value too long");
ret = TSDB_CODE_PAR_VALUE_TOO_LONG; ret = TSDB_CODE_PAR_VALUE_TOO_LONG;
goto end; goto end;

View File

@ -404,6 +404,7 @@ static int32_t createVgroupDataCxt(STableDataCxt* pTableCxt, SHashObj* pVgroupHa
int32_t code = taosHashPut(pVgroupHash, &pVgCxt->vgId, sizeof(pVgCxt->vgId), &pVgCxt, POINTER_BYTES); int32_t code = taosHashPut(pVgroupHash, &pVgCxt->vgId, sizeof(pVgCxt->vgId), &pVgCxt, POINTER_BYTES);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
taosArrayPush(pVgroupList, &pVgCxt); taosArrayPush(pVgroupList, &pVgCxt);
// uDebug("td23101 2vgId:%d, uid:%" PRIu64, pVgCxt->vgId, pTableCxt->pMeta->uid);
*pOutput = pVgCxt; *pOutput = pVgCxt;
} else { } else {
insDestroyVgroupDataCxt(pVgCxt); insDestroyVgroupDataCxt(pVgCxt);
@ -546,6 +547,7 @@ int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList,
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
dst->numOfTables = taosArrayGetSize(src->pData->aSubmitTbData); dst->numOfTables = taosArrayGetSize(src->pData->aSubmitTbData);
code = taosHashGetDup(pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg); code = taosHashGetDup(pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg);
// uError("td23101 3vgId:%d, numEps:%d", src->vgId, dst->vg.epSet.numOfEps);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = buildSubmitReq(src->vgId, src->pData, &dst->pData, &dst->size); code = buildSubmitReq(src->vgId, src->pData, &dst->pData, &dst->size);

View File

@ -21,7 +21,7 @@
#include "tjson.h" #include "tjson.h"
#include "tglobal.h" #include "tglobal.h"
#define LOG_MAX_LINE_SIZE (1024) #define LOG_MAX_LINE_SIZE (10024)
#define LOG_MAX_LINE_BUFFER_SIZE (LOG_MAX_LINE_SIZE + 3) #define LOG_MAX_LINE_BUFFER_SIZE (LOG_MAX_LINE_SIZE + 3)
#define LOG_MAX_LINE_DUMP_SIZE (1024 * 1024) #define LOG_MAX_LINE_DUMP_SIZE (1024 * 1024)
#define LOG_MAX_LINE_DUMP_BUFFER_SIZE (LOG_MAX_LINE_DUMP_SIZE + 3) #define LOG_MAX_LINE_DUMP_BUFFER_SIZE (LOG_MAX_LINE_DUMP_SIZE + 3)

View File

@ -853,6 +853,7 @@
,,y,script,./test.sh -f tsim/parser/topbot.sim ,,y,script,./test.sh -f tsim/parser/topbot.sim
,,y,script,./test.sh -f tsim/parser/union_sysinfo.sim ,,y,script,./test.sh -f tsim/parser/union_sysinfo.sim
,,y,script,./test.sh -f tsim/parser/slimit_limit.sim ,,y,script,./test.sh -f tsim/parser/slimit_limit.sim
,,y,script,./test.sh -f tsim/parser/table_merge_limit.sim
,,y,script,./test.sh -f tsim/query/tagLikeFilter.sim ,,y,script,./test.sh -f tsim/query/tagLikeFilter.sim
,,y,script,./test.sh -f tsim/query/charScalarFunction.sim ,,y,script,./test.sh -f tsim/query/charScalarFunction.sim
,,y,script,./test.sh -f tsim/query/explain.sim ,,y,script,./test.sh -f tsim/query/explain.sim

View File

@ -68,8 +68,8 @@ docker run \
-v ${REP_REAL_PATH}/community/contrib/libuv/:${REP_DIR}/community/contrib/libuv \ -v ${REP_REAL_PATH}/community/contrib/libuv/:${REP_DIR}/community/contrib/libuv \
-v ${REP_REAL_PATH}/community/contrib/lz4/:${REP_DIR}/community/contrib/lz4 \ -v ${REP_REAL_PATH}/community/contrib/lz4/:${REP_DIR}/community/contrib/lz4 \
-v ${REP_REAL_PATH}/community/contrib/zlib/:${REP_DIR}/community/contrib/zlib \ -v ${REP_REAL_PATH}/community/contrib/zlib/:${REP_DIR}/community/contrib/zlib \
-v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \ --rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_TAOSX=true -DJEMALLOC_ENABLED=true;make -j || exit 1"
--rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_TAOSX=true;make -j || exit 1" # -v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \
if [[ -d ${WORKDIR}/debugNoSan ]] ;then if [[ -d ${WORKDIR}/debugNoSan ]] ;then
echo "delete ${WORKDIR}/debugNoSan" echo "delete ${WORKDIR}/debugNoSan"
@ -97,7 +97,7 @@ docker run \
-v ${REP_REAL_PATH}/community/contrib/lz4/:${REP_DIR}/community/contrib/lz4 \ -v ${REP_REAL_PATH}/community/contrib/lz4/:${REP_DIR}/community/contrib/lz4 \
-v ${REP_REAL_PATH}/community/contrib/zlib/:${REP_DIR}/community/contrib/zlib \ -v ${REP_REAL_PATH}/community/contrib/zlib/:${REP_DIR}/community/contrib/zlib \
-v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \ -v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \
--rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_SANITIZER=1 -DTOOLS_SANITIZE=true -DTOOLS_BUILD_TYPE=Debug -DBUILD_TAOSX=true;make -j || exit 1 " --rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_SANITIZER=1 -DTOOLS_SANITIZE=true -DTOOLS_BUILD_TYPE=Debug -DBUILD_TAOSX=true -DJEMALLOC_ENABLED=true;make -j || exit 1 "
mv ${REP_REAL_PATH}/debug ${WORKDIR}/debugSan mv ${REP_REAL_PATH}/debug ${WORKDIR}/debugSan

View File

@ -739,6 +739,11 @@ class TDCom:
else: else:
os.system("unset LD_PRELOAD; pkill %s " % processorName) os.system("unset LD_PRELOAD; pkill %s " % processorName)
def gen_tag_col_str(self, gen_type, data_type, count):
"""
gen multi tags or cols by gen_type
"""
return ','.join(map(lambda i: f'{gen_type}{i} {data_type}', range(count)))
def is_json(msg): def is_json(msg):
if isinstance(msg, str): if isinstance(msg, str):
@ -775,4 +780,5 @@ def dict2toml(in_dict: dict, file:str):
with open(file, 'w') as f: with open(file, 'w') as f:
toml.dump(in_dict, f) toml.dump(in_dict, f)
tdCom = TDCom() tdCom = TDCom()

View File

@ -0,0 +1,47 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
$dbPrefix = m_fl_db
$tbPrefix = m_fl_tb
$mtPrefix = m_fl_mt
$tbNum = 2
$rowNum = 513
$totalNum = $tbNum * $rowNum
$ts0 = 1537146000000
$delta = 600000
print ========== fill.sim
$i = 0
$db = $dbPrefix . $i
$mt = $mtPrefix . $i
sql drop database $db -x step1
step1:
sql create database $db vgroups 1
sql use $db
sql create table $mt (ts timestamp, c1 int) tags(tgcol int)
$i = 0
$ts = $ts0
while $i < $tbNum
$tb = $tbPrefix . $i
sql create table $tb using $mt tags( $i )
$x = 0
while $x < $rowNum
$xs = $x * $delta
$ts = $ts0 + $xs
sql insert into $tb values ( $ts , $x )
$x = $x + 1
endw
$i = $i + 1
endw
sql select * from $mt order by ts limit 10
if $rows != 10 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -93,7 +93,6 @@ class TDTestCase:
tdSql.checkEqual(i[2],len(self.perf_list)) tdSql.checkEqual(i[2],len(self.perf_list))
tdSql.execute('create table db1.ntb (ts timestamp,c0 int)') tdSql.execute('create table db1.ntb (ts timestamp,c0 int)')
tdSql.query(f'select db_name, count(*) from information_schema.ins_tables group by db_name') tdSql.query(f'select db_name, count(*) from information_schema.ins_tables group by db_name')
print(tdSql.queryResult)
for i in tdSql.queryResult: for i in tdSql.queryResult:
if i[0].lower() == 'information_schema': if i[0].lower() == 'information_schema':
tdSql.checkEqual(i[1],len(self.ins_list)) tdSql.checkEqual(i[1],len(self.ins_list))
@ -101,9 +100,77 @@ class TDTestCase:
tdSql.checkEqual(i[1],len(self.perf_list)) tdSql.checkEqual(i[1],len(self.perf_list))
elif i[0].lower() == self.dbname: elif i[0].lower() == self.dbname:
tdSql.checkEqual(i[1],self.tbnum+1) tdSql.checkEqual(i[1],self.tbnum+1)
def ins_col_check_4096(self):
tdSql.execute('create database db3 vgroups 2 replica 1')
col_str = tdCom.gen_tag_col_str("col", "int",4094)
tdSql.execute(f'create stable if not exists db3.stb (col_ts timestamp, {col_str}) tags (t1 int)')
for i in range(100):
tdLog.info(f"create table db3.ctb{i} using db3.stb tags({i})")
tdSql.execute(f"create table db3.ctb{i} using db3.stb tags({i})")
col_value_str = '1, ' * 4093 + '1'
tdSql.execute(f"insert into db3.ctb{i} values(now,{col_value_str})(now+1s,{col_value_str})(now+2s,{col_value_str})(now+3s,{col_value_str})")
tdSql.query("select * from information_schema.ins_columns")
tdSql.execute('drop database db3')
def ins_stable_check(self):
tdSql.execute('create database db3 vgroups 2 replica 1')
tbnum = 10
ctbnum = 10
for i in range(tbnum):
tdSql.execute(f'create stable db3.stb_{i} (ts timestamp,c0 int) tags(t0 int)')
tdSql.execute(f'create table db3.ntb_{i} (ts timestamp,c0 int)')
for j in range(ctbnum):
tdSql.execute(f"create table db3.ctb_{i}_{j} using db3.stb_{i} tags({j})")
tdSql.query("select stable_name,count(table_name) from information_schema.ins_tables where db_name = 'db3' group by stable_name order by stable_name")
result = tdSql.queryResult
for i in range(len(result)):
if result[i][0] == None:
tdSql.checkEqual(result[0][1],tbnum)
else:
tdSql.checkEqual(result[i][0],f'stb_{i-1}')
tdSql.checkEqual(result[i][1],ctbnum)
def ins_columns_check(self):
tdSql.execute('drop database if exists db2')
tdSql.execute('create database if not exists db2 vgroups 1 replica 1')
for i in range (5):
self.stb4096 = 'create table db2.stb%d (ts timestamp' % (i)
for j in range (4094 - i):
self.stb4096 += ', c%d int' % (j)
self.stb4096 += ') tags (t1 int)'
tdSql.execute(self.stb4096)
for k in range(10):
tdSql.execute("create table db2.ctb_%d_%dc using db2.stb%d tags(%d)" %(i,k,i,k))
for t in range (2):
tdSql.query(f'select * from information_schema.ins_columns where db_name="db2" and table_type=="SUPER_TABLE"')
tdSql.checkEqual(20465,len(tdSql.queryResult))
for t in range (2):
tdSql.query(f'select * from information_schema.ins_columns where db_name="db2" and table_type=="CHILD_TABLE"')
tdSql.checkEqual(204650,len(tdSql.queryResult))
for i in range (5):
self.ntb4096 = 'create table db2.ntb%d (ts timestamp' % (i)
for j in range (4095 - i):
self.ntb4096 += ', c%d binary(10)' % (j)
self.ntb4096 += ')'
tdSql.execute(self.ntb4096)
for t in range (2):
tdSql.query(f'select * from information_schema.ins_columns where db_name="db2" and table_type=="NORMAL_TABLE"')
tdSql.checkEqual(20470,len(tdSql.queryResult))
def run(self): def run(self):
self.prepare_data() self.prepare_data()
self.count_check() self.count_check()
self.ins_columns_check()
# self.ins_col_check_4096()
self.ins_stable_check()
def stop(self): def stop(self):
tdSql.close() tdSql.close()

View File

@ -214,13 +214,13 @@ class TDTestCase:
sql=f"SELECT model,count(state_changed) FROM (SELECT _rowts,model,diff(broken_down) AS state_changed FROM (SELECT ts,model,tb,cast(cast(floor(2*(nzs)) as bool) as int) AS broken_down FROM (SELECT _wstart as ts,model,tbname as tb, sum(cast(cast(status as bool) as int))/count(cast(cast(status as bool) as int)) AS nzs FROM {dbname}.diagnostics WHERE ts >= 1451606400000 AND ts < 1451952001000 partition BY tbname,model interval(10m))order by ts) partition BY tb,model ) WHERE state_changed = 1 partition BY model;" sql=f"SELECT model,count(state_changed) FROM (SELECT _rowts,model,diff(broken_down) AS state_changed FROM (SELECT ts,model,tb,cast(cast(floor(2*(nzs)) as bool) as int) AS broken_down FROM (SELECT _wstart as ts,model,tbname as tb, sum(cast(cast(status as bool) as int))/count(cast(cast(status as bool) as int)) AS nzs FROM {dbname}.diagnostics WHERE ts >= 1451606400000 AND ts < 1451952001000 partition BY tbname,model interval(10m))order by ts) partition BY tb,model ) WHERE state_changed = 1 partition BY model;"
tdSql.query(f"{sql}") tdSql.query(f"{sql}")
tdSql.checkRows(46) tdSql.checkRows(46)
# for i in range(2): for i in range(2):
# tdSql.query("%s"%sql) tdSql.query("%s"%sql)
# quertR1=tdSql.queryResult quertR1=tdSql.queryResult
# for j in range(50): for j in range(50):
# tdSql.query("%s"%sql) tdSql.query("%s"%sql)
# quertR2=tdSql.queryResult quertR2=tdSql.queryResult
# assert quertR1 == quertR2 , "%s != %s ,The results of multiple queries are different" %(quertR1,quertR2) assert quertR1 == quertR2 , "%s != %s ,The results of multiple queries are different" %(quertR1,quertR2)
#it's already supported: #it's already supported:

View File

@ -245,7 +245,7 @@ class TDTestCase:
for i in range(expectRows): for i in range(expectRows):
totalConsumeRows += resultList[i] totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt/4: if totalConsumeRows < expectrowcnt/4:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt/4)) tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt/4))
tdLog.exit("tmq consume rows error!") tdLog.exit("tmq consume rows error!")
@ -267,7 +267,7 @@ class TDTestCase:
for i in range(expectRows): for i in range(expectRows):
totalConsumeRows += resultList[i] totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt: if totalConsumeRows < expectrowcnt:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
tdLog.exit("tmq consume rows error!") tdLog.exit("tmq consume rows error!")

View File

@ -688,6 +688,7 @@ int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) {
} }
static int32_t g_once_commit_flag = 0; static int32_t g_once_commit_flag = 0;
static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
taosFprintfFile(g_fp, "tmq_commit_cb_print() commit %d\n", code); taosFprintfFile(g_fp, "tmq_commit_cb_print() commit %d\n", code);