Merge branch 'main' into test/TS-2908
This commit is contained in:
commit
c35fc05d66
|
@ -0,0 +1 @@
|
||||||
|
#include<taos.h>
|
|
@ -147,9 +147,9 @@ typedef struct {
|
||||||
} SMonStbInfo;
|
} SMonStbInfo;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t expire_time;
|
uint32_t expire_time;
|
||||||
int64_t timeseries_used;
|
int64_t timeseries_used;
|
||||||
int64_t timeseries_total;
|
int64_t timeseries_total;
|
||||||
} SMonGrantInfo;
|
} SMonGrantInfo;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -0,0 +1,33 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
|
||||||
|
<plist version="1.0">
|
||||||
|
<dict>
|
||||||
|
<key>Label</key>
|
||||||
|
<string>com.tdengine.taoskeeper</string>
|
||||||
|
<key>ProgramArguments</key>
|
||||||
|
<array>
|
||||||
|
<string>/usr/local/bin/taoskeeper</string>
|
||||||
|
</array>
|
||||||
|
<key>ProcessType</key>
|
||||||
|
<string>Interactive</string>
|
||||||
|
<key>Disabled</key>
|
||||||
|
<false/>
|
||||||
|
<key>RunAtLoad</key>
|
||||||
|
<false/>
|
||||||
|
<key>LaunchOnlyOnce</key>
|
||||||
|
<false/>
|
||||||
|
<key>SessionCreate</key>
|
||||||
|
<true/>
|
||||||
|
<key>ExitTimeOut</key>
|
||||||
|
<integer>600</integer>
|
||||||
|
<key>KeepAlive</key>
|
||||||
|
<dict>
|
||||||
|
<key>SuccessfulExit</key>
|
||||||
|
<false/>
|
||||||
|
<key>AfterInitialDemand</key>
|
||||||
|
<true/>
|
||||||
|
</dict>
|
||||||
|
<key>Program</key>
|
||||||
|
<string>/usr/local/bin/taoskeeper</string>
|
||||||
|
</dict>
|
||||||
|
</plist>
|
|
@ -53,7 +53,7 @@ if [ -d ${top_dir}/tools/taos-tools/packaging/deb ]; then
|
||||||
cd ${top_dir}/tools/taos-tools/packaging/deb
|
cd ${top_dir}/tools/taos-tools/packaging/deb
|
||||||
[ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0"
|
[ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0"
|
||||||
|
|
||||||
taostools_ver=$(git tag |grep -v taos | sort | tail -1)
|
taostools_ver=$(git for-each-ref --sort=taggerdate --format '%(tag)' refs/tags|grep -v taos | tail -1)
|
||||||
taostools_install_dir="${release_dir}/${clientName2}Tools-${taostools_ver}"
|
taostools_install_dir="${release_dir}/${clientName2}Tools-${taostools_ver}"
|
||||||
|
|
||||||
cd ${curr_dir}
|
cd ${curr_dir}
|
||||||
|
|
|
@ -582,6 +582,11 @@ function install_service_on_launchctl() {
|
||||||
${csudo}cp ${install_main_dir}/service/com.taosdata.taosadapter.plist /Library/LaunchDaemons/com.taosdata.taosadapter.plist || :
|
${csudo}cp ${install_main_dir}/service/com.taosdata.taosadapter.plist /Library/LaunchDaemons/com.taosdata.taosadapter.plist || :
|
||||||
${csudo}launchctl load -w /Library/LaunchDaemons/com.taosdata.taosadapter.plist || :
|
${csudo}launchctl load -w /Library/LaunchDaemons/com.taosdata.taosadapter.plist || :
|
||||||
fi
|
fi
|
||||||
|
if [ -f ${install_main_dir}/service/com.taosdata.taoskeeper.plist ]; then
|
||||||
|
${csudo}launchctl unload -w /Library/LaunchDaemons/com.taosdata.taoskeeper.plist > /dev/null 2>&1 || :
|
||||||
|
${csudo}cp ${install_main_dir}/service/com.taosdata.taoskeeper.plist /Library/LaunchDaemons/com.taosdata.taoskeeper.plist || :
|
||||||
|
${csudo}launchctl load -w /Library/LaunchDaemons/com.taosdata.taoskeeper.plist || :
|
||||||
|
fi
|
||||||
}
|
}
|
||||||
|
|
||||||
function install_taosadapter_service() {
|
function install_taosadapter_service() {
|
||||||
|
|
|
@ -70,7 +70,7 @@ extern "C" {
|
||||||
#define VALUE_LEN 6
|
#define VALUE_LEN 6
|
||||||
|
|
||||||
#define OTD_JSON_FIELDS_NUM 4
|
#define OTD_JSON_FIELDS_NUM 4
|
||||||
#define MAX_RETRY_TIMES 5
|
#define MAX_RETRY_TIMES 100
|
||||||
typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType;
|
typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType;
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
|
|
|
@ -543,7 +543,7 @@ void taos_init_imp(void) {
|
||||||
|
|
||||||
if (taosCreateLog("taoslog", 10, configDir, NULL, NULL, NULL, NULL, 1) != 0) {
|
if (taosCreateLog("taoslog", 10, configDir, NULL, NULL, NULL, NULL, 1) != 0) {
|
||||||
// ignore create log failed, only print
|
// ignore create log failed, only print
|
||||||
printf(" WARING: Create taoslog failed. configDir=%s\n", configDir);
|
printf(" WARING: Create taoslog failed:%s. configDir=%s\n", strerror(errno), configDir);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 1) != 0) {
|
if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 1) != 0) {
|
||||||
|
|
|
@ -750,6 +750,7 @@ end:
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t smlModifyDBSchemas(SSmlHandle *info) {
|
static int32_t smlModifyDBSchemas(SSmlHandle *info) {
|
||||||
|
uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas start, format:%d, needModifySchema:%d", info->id, info->dataFormat, info->needModifySchema);
|
||||||
if (info->dataFormat && !info->needModifySchema) {
|
if (info->dataFormat && !info->needModifySchema) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -779,6 +780,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
|
||||||
code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
|
code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
|
||||||
|
|
||||||
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_STB_NOT_EXIST) {
|
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_STB_NOT_EXIST) {
|
||||||
|
uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas create table:%s", info->id, pName.tname);
|
||||||
SArray *pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols), sizeof(SField));
|
SArray *pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols), sizeof(SField));
|
||||||
SArray *pTags = taosArrayInit(taosArrayGetSize(sTableData->tags), sizeof(SField));
|
SArray *pTags = taosArrayInit(taosArrayGetSize(sTableData->tags), sizeof(SField));
|
||||||
code = smlBuildFieldsList(info, NULL, NULL, sTableData->tags, pTags, 0, true);
|
code = smlBuildFieldsList(info, NULL, NULL, sTableData->tags, pTags, 0, true);
|
||||||
|
@ -818,6 +820,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
if (action != SCHEMA_ACTION_NULL) {
|
if (action != SCHEMA_ACTION_NULL) {
|
||||||
|
uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas change table tag, table:%s, action:%d", info->id, pName.tname, action);
|
||||||
SArray *pColumns =
|
SArray *pColumns =
|
||||||
taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
|
taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
|
||||||
SArray *pTags =
|
SArray *pTags =
|
||||||
|
@ -869,6 +872,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
if (action != SCHEMA_ACTION_NULL) {
|
if (action != SCHEMA_ACTION_NULL) {
|
||||||
|
uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas change table col, table:%s, action:%d", info->id, pName.tname, action);
|
||||||
SArray *pColumns =
|
SArray *pColumns =
|
||||||
taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
|
taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
|
||||||
SArray *pTags =
|
SArray *pTags =
|
||||||
|
@ -935,15 +939,19 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
|
||||||
}
|
}
|
||||||
|
|
||||||
sTableData->tableMeta = pTableMeta;
|
sTableData->tableMeta = pTableMeta;
|
||||||
|
uDebug("SML:0x%" PRIx64 "modify schema uid:%" PRIu64 ", sversion:%d, tversion:%d", info->id, pTableMeta->uid, pTableMeta->sversion, pTableMeta->tversion)
|
||||||
tmp = (SSmlSTableMeta **)taosHashIterate(info->superTables, tmp);
|
tmp = (SSmlSTableMeta **)taosHashIterate(info->superTables, tmp);
|
||||||
}
|
}
|
||||||
|
uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas end success, format:%d, needModifySchema:%d", info->id, info->dataFormat, info->needModifySchema);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
end:
|
end:
|
||||||
taosHashCleanup(hashTmp);
|
taosHashCleanup(hashTmp);
|
||||||
taosMemoryFreeClear(pTableMeta);
|
taosMemoryFreeClear(pTableMeta);
|
||||||
// catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1);
|
catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1);
|
||||||
|
uError("SML:0x%" PRIx64 " smlModifyDBSchemas end failed:%d:%s, format:%d, needModifySchema:%d", info->id, code, tstrerror(code), info->dataFormat, info->needModifySchema);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1019,8 +1027,9 @@ static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols
|
||||||
} else {
|
} else {
|
||||||
size_t tmp = taosArrayGetSize(metaArray);
|
size_t tmp = taosArrayGetSize(metaArray);
|
||||||
if (tmp > INT16_MAX) {
|
if (tmp > INT16_MAX) {
|
||||||
|
smlBuildInvalidDataMsg(msg, "too many cols or tags", kv->key);
|
||||||
uError("too many cols or tags");
|
uError("too many cols or tags");
|
||||||
return -1;
|
return TSDB_CODE_SML_INVALID_DATA;
|
||||||
}
|
}
|
||||||
int16_t size = tmp;
|
int16_t size = tmp;
|
||||||
int ret = taosHashPut(metaHash, kv->key, kv->keyLen, &size, SHORT_BYTES);
|
int ret = taosHashPut(metaHash, kv->key, kv->keyLen, &size, SHORT_BYTES);
|
||||||
|
@ -1170,6 +1179,7 @@ static int32_t smlPushCols(SArray *colsArray, SArray *cols) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t smlParseLineBottom(SSmlHandle *info) {
|
static int32_t smlParseLineBottom(SSmlHandle *info) {
|
||||||
|
uDebug("SML:0x%" PRIx64 " smlParseLineBottom start, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum);
|
||||||
if (info->dataFormat) return TSDB_CODE_SUCCESS;
|
if (info->dataFormat) return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
for (int32_t i = 0; i < info->lineNum; i++) {
|
for (int32_t i = 0; i < info->lineNum; i++) {
|
||||||
|
@ -1212,6 +1222,7 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
|
||||||
SSmlSTableMeta **tableMeta =
|
SSmlSTableMeta **tableMeta =
|
||||||
(SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen);
|
(SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen);
|
||||||
if (tableMeta) { // update meta
|
if (tableMeta) { // update meta
|
||||||
|
uDebug("SML:0x%" PRIx64 " smlParseLineBottom update meta, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum);
|
||||||
ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, elements->colArray, false, &info->msgBuf);
|
ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, elements->colArray, false, &info->msgBuf);
|
||||||
if (ret == TSDB_CODE_SUCCESS) {
|
if (ret == TSDB_CODE_SUCCESS) {
|
||||||
ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, tinfo->tags, true, &info->msgBuf);
|
ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, tinfo->tags, true, &info->msgBuf);
|
||||||
|
@ -1226,7 +1237,7 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
|
||||||
// uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id);
|
// uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id);
|
||||||
// return ret;
|
// return ret;
|
||||||
// }
|
// }
|
||||||
|
uDebug("SML:0x%" PRIx64 " smlParseLineBottom add meta, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum);
|
||||||
SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat);
|
SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat);
|
||||||
smlInsertMeta(meta->tagHash, meta->tags, tinfo->tags);
|
smlInsertMeta(meta->tagHash, meta->tags, tinfo->tags);
|
||||||
if(terrno == TSDB_CODE_DUP_KEY){return terrno;}
|
if(terrno == TSDB_CODE_DUP_KEY){return terrno;}
|
||||||
|
@ -1234,12 +1245,14 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
|
||||||
taosHashPut(info->superTables, elements->measure, elements->measureLen, &meta, POINTER_BYTES);
|
taosHashPut(info->superTables, elements->measure, elements->measureLen, &meta, POINTER_BYTES);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
uDebug("SML:0x%" PRIx64 " smlParseLineBottom end, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t smlInsertData(SSmlHandle *info) {
|
static int32_t smlInsertData(SSmlHandle *info) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
uDebug("SML:0x%" PRIx64 " smlInsertData start, format:%d", info->id, info->dataFormat);
|
||||||
|
|
||||||
if(info->pRequest->dbList == NULL){
|
if(info->pRequest->dbList == NULL){
|
||||||
info->pRequest->dbList = taosArrayInit(1, TSDB_DB_FNAME_LEN);
|
info->pRequest->dbList = taosArrayInit(1, TSDB_DB_FNAME_LEN);
|
||||||
|
@ -1284,6 +1297,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
|
||||||
// use tablemeta of stable to save vgid and uid of child table
|
// use tablemeta of stable to save vgid and uid of child table
|
||||||
(*pMeta)->tableMeta->vgId = vg.vgId;
|
(*pMeta)->tableMeta->vgId = vg.vgId;
|
||||||
(*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid
|
(*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid
|
||||||
|
uDebug("SML:0x%" PRIx64 " smlInsertData table:%s, uid:%" PRIu64 ", format:%d", info->id, pName.tname, tableData->uid, info->dataFormat);
|
||||||
|
|
||||||
code = smlBindData(info->pQuery, info->dataFormat, tableData->tags, (*pMeta)->cols, tableData->cols,
|
code = smlBindData(info->pQuery, info->dataFormat, tableData->tags, (*pMeta)->cols, tableData->cols,
|
||||||
(*pMeta)->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen,
|
(*pMeta)->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen,
|
||||||
|
@ -1306,16 +1320,18 @@ static int32_t smlInsertData(SSmlHandle *info) {
|
||||||
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);
|
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);
|
||||||
|
|
||||||
launchQueryImpl(info->pRequest, info->pQuery, true, NULL);
|
launchQueryImpl(info->pRequest, info->pQuery, true, NULL);
|
||||||
|
uDebug("SML:0x%" PRIx64 " smlInsertData end, format:%d, code:%d,%s", info->id, info->dataFormat, info->pRequest->code, tstrerror(info->pRequest->code));
|
||||||
|
|
||||||
return info->pRequest->code;
|
return info->pRequest->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void smlPrintStatisticInfo(SSmlHandle *info) {
|
static void smlPrintStatisticInfo(SSmlHandle *info) {
|
||||||
uDebug(
|
uDebug(
|
||||||
"SML:0x%" PRIx64
|
"SML:0x%" PRIx64
|
||||||
" smlInsertLines result, code:%d,lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d,alter stable tag num:%d,alter stable col num:%d \
|
" smlInsertLines result, code:%d, msg:%s, lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d,alter stable tag num:%d,alter stable col num:%d \
|
||||||
parse cost:%" PRId64 ",schema cost:%" PRId64 ",bind cost:%" PRId64 ",rpc cost:%" PRId64 ",total cost:%" PRId64
|
parse cost:%" PRId64 ",schema cost:%" PRId64 ",bind cost:%" PRId64 ",rpc cost:%" PRId64 ",total cost:%" PRId64
|
||||||
"",
|
"",
|
||||||
info->id, info->cost.code, info->cost.lineNum, info->cost.numOfSTables, info->cost.numOfCTables,
|
info->id, info->cost.code, tstrerror(info->cost.code), info->cost.lineNum, info->cost.numOfSTables, info->cost.numOfCTables,
|
||||||
info->cost.numOfCreateSTables, info->cost.numOfAlterTagSTables, info->cost.numOfAlterColSTables,
|
info->cost.numOfCreateSTables, info->cost.numOfAlterTagSTables, info->cost.numOfAlterColSTables,
|
||||||
info->cost.schemaTime - info->cost.parseTime, info->cost.insertBindTime - info->cost.schemaTime,
|
info->cost.schemaTime - info->cost.parseTime, info->cost.insertBindTime - info->cost.schemaTime,
|
||||||
info->cost.insertRpcTime - info->cost.insertBindTime, info->cost.endTime - info->cost.insertRpcTime,
|
info->cost.insertRpcTime - info->cost.insertBindTime, info->cost.endTime - info->cost.insertRpcTime,
|
||||||
|
@ -1360,6 +1376,7 @@ int32_t smlClearForRerun(SSmlHandle *info) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) {
|
static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) {
|
||||||
|
uDebug("SML:0x%" PRIx64 " smlParseLine start", info->id);
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
|
if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
|
||||||
if (lines) {
|
if (lines) {
|
||||||
|
@ -1395,8 +1412,16 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
uDebug("SML:0x%" PRIx64 " smlParseLine israw:%d, len:%d, sql:%s", info->id, info->isRawLine, len,
|
char cTmp = 0; // for print tmp if is raw
|
||||||
(info->isRawLine ? "rawdata" : tmp));
|
if(info->isRawLine){
|
||||||
|
cTmp = tmp[len - 1];
|
||||||
|
tmp[len - 1] = '\0';
|
||||||
|
}
|
||||||
|
|
||||||
|
uDebug("SML:0x%" PRIx64 " smlParseLine israw:%d, numLines:%d, protocol:%d, len:%d, sql:%s", info->id, info->isRawLine, numLines, info->protocol, len, tmp);
|
||||||
|
if(info->isRawLine){
|
||||||
|
tmp[len - 1] = cTmp;
|
||||||
|
}
|
||||||
|
|
||||||
if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
|
if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
|
||||||
if (info->dataFormat) {
|
if (info->dataFormat) {
|
||||||
|
@ -1421,6 +1446,7 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
if (info->reRun) {
|
if (info->reRun) {
|
||||||
|
uDebug("SML:0x%" PRIx64 " smlParseLine re run", info->id);
|
||||||
i = 0;
|
i = 0;
|
||||||
rawLine = oldRaw;
|
rawLine = oldRaw;
|
||||||
code = smlClearForRerun(info);
|
code = smlClearForRerun(info);
|
||||||
|
@ -1431,6 +1457,7 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char
|
||||||
}
|
}
|
||||||
i++;
|
i++;
|
||||||
}
|
}
|
||||||
|
uDebug("SML:0x%" PRIx64 " smlParseLine end", info->id);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1461,7 +1488,8 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL
|
||||||
do {
|
do {
|
||||||
code = smlModifyDBSchemas(info);
|
code = smlModifyDBSchemas(info);
|
||||||
if (code == 0) break;
|
if (code == 0) break;
|
||||||
taosMsleep(200);
|
taosMsleep(500);
|
||||||
|
uInfo("SML:0x%" PRIx64 " smlModifyDBSchemas retry code:%s, times:%d", info->id, tstrerror(code), retryNum);
|
||||||
} while (retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES);
|
} while (retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES);
|
||||||
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
@ -1488,6 +1516,7 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine,
|
||||||
}
|
}
|
||||||
SRequestObj *request = NULL;
|
SRequestObj *request = NULL;
|
||||||
SSmlHandle *info = NULL;
|
SSmlHandle *info = NULL;
|
||||||
|
int cnt = 0;
|
||||||
while(1){
|
while(1){
|
||||||
request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid);
|
request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid);
|
||||||
if (request == NULL) {
|
if (request == NULL) {
|
||||||
|
@ -1542,16 +1571,22 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine,
|
||||||
request->code = code;
|
request->code = code;
|
||||||
info->cost.endTime = taosGetTimestampUs();
|
info->cost.endTime = taosGetTimestampUs();
|
||||||
info->cost.code = code;
|
info->cost.code = code;
|
||||||
smlPrintStatisticInfo(info);
|
if(code == TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER || code == TSDB_CODE_SDB_OBJ_CREATING
|
||||||
if(code == TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER || code == TSDB_CODE_SDB_OBJ_CREATING){
|
|| code == TSDB_CODE_PAR_VALUE_TOO_LONG || code == TSDB_CODE_MND_TRANS_CONFLICT){
|
||||||
|
if(cnt++ >= 10){
|
||||||
|
uInfo("SML:%"PRIx64" retry:%d/10 end code:%d, msg:%s", info->id, cnt, code, tstrerror(code));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
taosMsleep(100);
|
||||||
refreshMeta(request->pTscObj, request);
|
refreshMeta(request->pTscObj, request);
|
||||||
uInfo("SML:%"PRIx64" ver is old retry or object is creating code:%d", info->id, code);
|
uInfo("SML:%"PRIx64" retry:%d/10,ver is old retry or object is creating code:%d, msg:%s", info->id, cnt, code, tstrerror(code));
|
||||||
smlDestroyInfo(info);
|
smlDestroyInfo(info);
|
||||||
info = NULL;
|
info = NULL;
|
||||||
taos_free_result(request);
|
taos_free_result(request);
|
||||||
request = NULL;
|
request = NULL;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
smlPrintStatisticInfo(info);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -583,12 +583,14 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
|
||||||
.i = ts,
|
.i = ts,
|
||||||
.length = (size_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
|
.length = (size_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
|
||||||
if (info->dataFormat) {
|
if (info->dataFormat) {
|
||||||
|
uDebug("SML:0x%" PRIx64 " smlParseInfluxString format true, ts:%" PRId64, info->id, ts);
|
||||||
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 0);
|
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 0);
|
||||||
if(ret != TSDB_CODE_SUCCESS){return ret;}
|
if(ret != TSDB_CODE_SUCCESS){return ret;}
|
||||||
ret = smlBuildRow(info->currTableDataCtx);
|
ret = smlBuildRow(info->currTableDataCtx);
|
||||||
if(ret != TSDB_CODE_SUCCESS){return ret;}
|
if(ret != TSDB_CODE_SUCCESS){return ret;}
|
||||||
clearColValArray(info->currTableDataCtx->pValues);
|
clearColValArray(info->currTableDataCtx->pValues);
|
||||||
} else {
|
} else {
|
||||||
|
uDebug("SML:0x%" PRIx64 " smlParseInfluxString format false, ts:%" PRId64, info->id, ts);
|
||||||
taosArraySet(elements->colArray, 0, &kv);
|
taosArraySet(elements->colArray, 0, &kv);
|
||||||
}
|
}
|
||||||
info->preLine = *elements;
|
info->preLine = *elements;
|
||||||
|
|
|
@ -24,7 +24,7 @@
|
||||||
#include "tref.h"
|
#include "tref.h"
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
|
|
||||||
#define EMPTY_BLOCK_POLL_IDLE_DURATION 100
|
#define EMPTY_BLOCK_POLL_IDLE_DURATION 10
|
||||||
#define DEFAULT_AUTO_COMMIT_INTERVAL 5000
|
#define DEFAULT_AUTO_COMMIT_INTERVAL 5000
|
||||||
|
|
||||||
struct SMqMgmt {
|
struct SMqMgmt {
|
||||||
|
@ -148,7 +148,8 @@ typedef struct {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t tmqRspType;
|
int8_t tmqRspType;
|
||||||
int32_t epoch;
|
int32_t epoch; // epoch can be used to guard the vgHandle
|
||||||
|
int32_t vgId;
|
||||||
SMqClientVg* vgHandle;
|
SMqClientVg* vgHandle;
|
||||||
SMqClientTopic* topicHandle;
|
SMqClientTopic* topicHandle;
|
||||||
uint64_t reqId;
|
uint64_t reqId;
|
||||||
|
@ -1329,6 +1330,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
pRspWrapper->topicHandle = pTopic;
|
pRspWrapper->topicHandle = pTopic;
|
||||||
pRspWrapper->reqId = requestId;
|
pRspWrapper->reqId = requestId;
|
||||||
pRspWrapper->pEpset = pMsg->pEpSet;
|
pRspWrapper->pEpset = pMsg->pEpSet;
|
||||||
|
pRspWrapper->vgId = pVg->vgId;
|
||||||
|
|
||||||
pMsg->pEpSet = NULL;
|
pMsg->pEpSet = NULL;
|
||||||
if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
|
if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
|
||||||
|
@ -1747,7 +1749,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
|
||||||
for (int j = 0; j < numOfVg; j++) {
|
for (int j = 0; j < numOfVg; j++) {
|
||||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
||||||
if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 100ms
|
if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 100ms
|
||||||
tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 100ms before start next poll", tmq->consumerId, tmq->epoch,
|
tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId, tmq->epoch,
|
||||||
pVg->vgId);
|
pVg->vgId);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -1864,9 +1866,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
return pRsp;
|
return pRsp;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
SMqClientVg* pVg = pollRspWrapper->vgHandle;
|
|
||||||
tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
||||||
tmq->consumerId, pVg->vgId, pDataRsp->head.epoch, consumerEpoch);
|
tmq->consumerId, pollRspWrapper->vgId, pDataRsp->head.epoch, consumerEpoch);
|
||||||
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
|
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pollRspWrapper);
|
||||||
}
|
}
|
||||||
|
@ -1886,7 +1887,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
return pRsp;
|
return pRsp;
|
||||||
} else {
|
} else {
|
||||||
tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
||||||
tmq->consumerId, pollRspWrapper->vgHandle->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
|
tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
|
||||||
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
|
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pollRspWrapper);
|
||||||
}
|
}
|
||||||
|
@ -1933,7 +1934,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
||||||
tmq->consumerId, pollRspWrapper->vgHandle->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
|
tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
|
||||||
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
|
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pollRspWrapper);
|
||||||
}
|
}
|
||||||
|
@ -1955,7 +1956,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
||||||
void* rspObj;
|
void* rspObj;
|
||||||
int64_t startTime = taosGetTimestampMs();
|
int64_t startTime = taosGetTimestampMs();
|
||||||
|
|
||||||
tscDebug("consumer:0x%" PRIx64 " start to poll at %" PRId64, tmq->consumerId, startTime);
|
tscDebug("consumer:0x%" PRIx64 " start to poll at %"PRId64", timeout:%" PRId64, tmq->consumerId, startTime, timeout);
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
tmqHandleAllDelayedTask(tmq);
|
tmqHandleAllDelayedTask(tmq);
|
||||||
|
@ -2017,37 +2018,43 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void displayConsumeStatistics(const tmq_t* pTmq) {
|
||||||
|
int32_t numOfTopics = taosArrayGetSize(pTmq->clientTopics);
|
||||||
|
tscDebug("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d",
|
||||||
|
pTmq->consumerId, pTmq->pollCnt, pTmq->totalRows, numOfTopics, pTmq->epoch);
|
||||||
|
|
||||||
|
tscDebug("consumer:0x%" PRIx64 " rows dist begin: ", pTmq->consumerId);
|
||||||
|
for (int32_t i = 0; i < numOfTopics; ++i) {
|
||||||
|
SMqClientTopic* pTopics = taosArrayGet(pTmq->clientTopics, i);
|
||||||
|
|
||||||
|
tscDebug("consumer:0x%" PRIx64 " topic:%d", pTmq->consumerId, i);
|
||||||
|
int32_t numOfVgs = taosArrayGetSize(pTopics->vgs);
|
||||||
|
for (int32_t j = 0; j < numOfVgs; ++j) {
|
||||||
|
SMqClientVg* pVg = taosArrayGet(pTopics->vgs, j);
|
||||||
|
tscDebug("topic:%s, %d. vgId:%d rows:%" PRId64, pTopics->topicName, j, pVg->vgId, pVg->numOfRows);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tscDebug("consumer:0x%" PRIx64 " rows dist end", pTmq->consumerId);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tmq_consumer_close(tmq_t* tmq) {
|
int32_t tmq_consumer_close(tmq_t* tmq) {
|
||||||
tscDebug("consumer:0x%" PRIx64" start to close consumer, status:%d", tmq->consumerId, tmq->status);
|
tscDebug("consumer:0x%" PRIx64" start to close consumer, status:%d", tmq->consumerId, tmq->status);
|
||||||
|
displayConsumeStatistics(tmq);
|
||||||
|
|
||||||
if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
|
if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
|
||||||
int32_t rsp = tmq_commit_sync(tmq, NULL);
|
// if auto commit is set, commit before close consumer. Otherwise, do nothing.
|
||||||
if (rsp != 0) {
|
if (tmq->autoCommit) {
|
||||||
return rsp;
|
int32_t rsp = tmq_commit_sync(tmq, NULL);
|
||||||
}
|
if (rsp != 0) {
|
||||||
|
return rsp;
|
||||||
int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
|
|
||||||
tscDebug("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d",
|
|
||||||
tmq->consumerId, tmq->pollCnt, tmq->totalRows, numOfTopics, tmq->epoch);
|
|
||||||
|
|
||||||
tscDebug("consumer:0x%" PRIx64 " rows dist begin: ", tmq->consumerId);
|
|
||||||
for (int32_t i = 0; i < numOfTopics; ++i) {
|
|
||||||
SMqClientTopic* pTopics = taosArrayGet(tmq->clientTopics, i);
|
|
||||||
|
|
||||||
tscDebug("consumer:0x%" PRIx64 " topic:%d", tmq->consumerId, i);
|
|
||||||
int32_t numOfVgs = taosArrayGetSize(pTopics->vgs);
|
|
||||||
for (int32_t j = 0; j < numOfVgs; ++j) {
|
|
||||||
SMqClientVg* pVg = taosArrayGet(pTopics->vgs, j);
|
|
||||||
tscDebug("topic:%s, %d. vgId:%d rows:%" PRId64, pTopics->topicName, j, pVg->vgId, pVg->numOfRows);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tscDebug("consumer:0x%" PRIx64 " rows dist end", tmq->consumerId);
|
|
||||||
|
|
||||||
int32_t retryCnt = 0;
|
int32_t retryCnt = 0;
|
||||||
tmq_list_t* lst = tmq_list_new();
|
tmq_list_t* lst = tmq_list_new();
|
||||||
while (1) {
|
while (1) {
|
||||||
rsp = tmq_subscribe(tmq, lst);
|
int32_t rsp = tmq_subscribe(tmq, lst);
|
||||||
if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
|
if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
|
@ -2057,6 +2064,8 @@ int32_t tmq_consumer_close(tmq_t* tmq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tmq_list_destroy(lst);
|
tmq_list_destroy(lst);
|
||||||
|
} else {
|
||||||
|
tscWarn("consumer:0x%" PRIx64" not in ready state, close it directly", tmq->consumerId);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
|
taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
|
||||||
|
|
|
@ -1238,13 +1238,13 @@ int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDi
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosLoadCfg(pCfg, envCmd, cfgDir, envFile, apolloUrl) != 0) {
|
if (taosLoadCfg(pCfg, envCmd, cfgDir, envFile, apolloUrl) != 0) {
|
||||||
uError("failed to load cfg since %s", terrstr());
|
printf("failed to load cfg since %s", terrstr());
|
||||||
cfgCleanup(pCfg);
|
cfgCleanup(pCfg);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (cfgLoadFromArray(pCfg, pArgs) != 0) {
|
if (cfgLoadFromArray(pCfg, pArgs) != 0) {
|
||||||
uError("failed to load cfg from array since %s", terrstr());
|
printf("failed to load cfg from array since %s", terrstr());
|
||||||
cfgCleanup(pCfg);
|
cfgCleanup(pCfg);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -1260,13 +1260,13 @@ int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDi
|
||||||
|
|
||||||
if (taosMulModeMkDir(tsLogDir, 0777) != 0) {
|
if (taosMulModeMkDir(tsLogDir, 0777) != 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
uError("failed to create dir:%s since %s", tsLogDir, terrstr());
|
printf("failed to create dir:%s since %s", tsLogDir, terrstr());
|
||||||
cfgCleanup(pCfg);
|
cfgCleanup(pCfg);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosInitLog(logname, logFileNum) != 0) {
|
if (taosInitLog(logname, logFileNum) != 0) {
|
||||||
uError("failed to init log file since %s", terrstr());
|
printf("failed to init log file since %s", terrstr());
|
||||||
cfgCleanup(pCfg);
|
cfgCleanup(pCfg);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -457,6 +457,7 @@ typedef struct {
|
||||||
void* pIter;
|
void* pIter;
|
||||||
SMnode* pMnode;
|
SMnode* pMnode;
|
||||||
STableMetaRsp* pMeta;
|
STableMetaRsp* pMeta;
|
||||||
|
bool restore;
|
||||||
bool sysDbRsp;
|
bool sysDbRsp;
|
||||||
char db[TSDB_DB_FNAME_LEN];
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
char filterTb[TSDB_TABLE_NAME_LEN];
|
char filterTb[TSDB_TABLE_NAME_LEN];
|
||||||
|
|
|
@ -864,7 +864,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
|
||||||
}
|
}
|
||||||
|
|
||||||
// grant info
|
// grant info
|
||||||
pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 86400000.0f;
|
pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 1000;
|
||||||
pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed;
|
pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed;
|
||||||
if (pMnode->grant.expireTimeMS == 0) {
|
if (pMnode->grant.expireTimeMS == 0) {
|
||||||
pGrantInfo->expire_time = INT32_MAX;
|
pGrantInfo->expire_time = INT32_MAX;
|
||||||
|
|
|
@ -324,7 +324,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
|
||||||
pReq->info.rsp = pRsp;
|
pReq->info.rsp = pRsp;
|
||||||
pReq->info.rspLen = size;
|
pReq->info.rspLen = size;
|
||||||
|
|
||||||
if (rowsRead == 0 || rowsRead < rowsToRead) {
|
if (rowsRead == 0 || ((rowsRead < rowsToRead) && !pShow->restore)) {
|
||||||
pRsp->completed = 1;
|
pRsp->completed = 1;
|
||||||
mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id);
|
mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id);
|
||||||
mndReleaseShowObj(pShow, true);
|
mndReleaseShowObj(pShow, true);
|
||||||
|
|
|
@ -3113,9 +3113,18 @@ static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
||||||
|
|
||||||
char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
STR_TO_VARSTR(typeName, "SUPER_TABLE");
|
STR_TO_VARSTR(typeName, "SUPER_TABLE");
|
||||||
|
bool fetch = pShow->restore ? false : true;
|
||||||
|
pShow->restore = false;
|
||||||
while (numOfRows < rows) {
|
while (numOfRows < rows) {
|
||||||
pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
|
if (fetch) {
|
||||||
if (pShow->pIter == NULL) break;
|
pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
|
||||||
|
if (pShow->pIter == NULL) break;
|
||||||
|
} else {
|
||||||
|
fetch = true;
|
||||||
|
void *pKey = taosHashGetKey(pShow->pIter, NULL);
|
||||||
|
pStb = sdbAcquire(pSdb, SDB_STB, pKey);
|
||||||
|
if (!pStb) continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (pDb != NULL && pStb->dbUid != pDb->uid) {
|
if (pDb != NULL && pStb->dbUid != pDb->uid) {
|
||||||
sdbRelease(pSdb, pStb);
|
sdbRelease(pSdb, pStb);
|
||||||
|
@ -3129,6 +3138,17 @@ static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
||||||
sdbRelease(pSdb, pStb);
|
sdbRelease(pSdb, pStb);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ((numOfRows + pStb->numOfColumns) > rows) {
|
||||||
|
pShow->restore = true;
|
||||||
|
if (numOfRows == 0) {
|
||||||
|
mError("mndRetrieveStbCol failed to get stable cols since buf:%d less than result:%d, stable name:%s, db:%s",
|
||||||
|
rows, pStb->numOfColumns, pStb->name, pStb->db);
|
||||||
|
}
|
||||||
|
sdbRelease(pSdb, pStb);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
|
varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
|
||||||
|
|
||||||
mDebug("mndRetrieveStbCol get stable cols, stable name:%s, db:%s", pStb->name, pStb->db);
|
mDebug("mndRetrieveStbCol get stable cols, stable name:%s, db:%s", pStb->name, pStb->db);
|
||||||
|
|
|
@ -157,7 +157,7 @@ typedef struct SMTbCursor SMTbCursor;
|
||||||
SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
|
SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
|
||||||
void metaCloseTbCursor(SMTbCursor *pTbCur);
|
void metaCloseTbCursor(SMTbCursor *pTbCur);
|
||||||
int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType);
|
int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType);
|
||||||
int32_t metaTbCursorPrev(SMTbCursor *pTbCur);
|
int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
|
@ -141,7 +141,7 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle);
|
||||||
// tqRead
|
// tqRead
|
||||||
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset);
|
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset);
|
||||||
int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset);
|
int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset);
|
||||||
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum);
|
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum, uint64_t reqId);
|
||||||
|
|
||||||
// tqExec
|
// tqExec
|
||||||
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp);
|
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp);
|
||||||
|
|
|
@ -336,7 +336,7 @@ int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t metaTbCursorPrev(SMTbCursor *pTbCur) {
|
int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType) {
|
||||||
int ret;
|
int ret;
|
||||||
void *pBuf;
|
void *pBuf;
|
||||||
STbCfg tbCfg;
|
STbCfg tbCfg;
|
||||||
|
@ -350,7 +350,7 @@ int32_t metaTbCursorPrev(SMTbCursor *pTbCur) {
|
||||||
tDecoderClear(&pTbCur->mr.coder);
|
tDecoderClear(&pTbCur->mr.coder);
|
||||||
|
|
||||||
metaGetTableEntryByVersion(&pTbCur->mr, ((SUidIdxVal *)pTbCur->pVal)[0].version, *(tb_uid_t *)pTbCur->pKey);
|
metaGetTableEntryByVersion(&pTbCur->mr, ((SUidIdxVal *)pTbCur->pVal)[0].version, *(tb_uid_t *)pTbCur->pKey);
|
||||||
if (pTbCur->mr.me.type == TSDB_SUPER_TABLE) {
|
if (pTbCur->mr.me.type == jumpTableType) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -309,70 +309,6 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
//int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const STaosxRsp* pRsp) {
|
|
||||||
//#if 0
|
|
||||||
// A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
|
|
||||||
// A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);
|
|
||||||
//
|
|
||||||
// if (pRsp->withSchema) {
|
|
||||||
// A(taosArrayGetSize(pRsp->blockSchema) == pRsp->blockNum);
|
|
||||||
// } else {
|
|
||||||
// A(taosArrayGetSize(pRsp->blockSchema) == 0);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
|
|
||||||
// if (pRsp->blockNum > 0) {
|
|
||||||
// A(pRsp->rspOffset.version > pRsp->reqOffset.version);
|
|
||||||
// } else {
|
|
||||||
// A(pRsp->rspOffset.version >= pRsp->reqOffset.version);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//#endif
|
|
||||||
//
|
|
||||||
// int32_t len = 0;
|
|
||||||
// int32_t code = 0;
|
|
||||||
// tEncodeSize(tEncodeSTaosxRsp, pRsp, len, code);
|
|
||||||
// if (code < 0) {
|
|
||||||
// return -1;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// int32_t tlen = sizeof(SMqRspHead) + len;
|
|
||||||
// void* buf = rpcMallocCont(tlen);
|
|
||||||
// if (buf == NULL) {
|
|
||||||
// terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
// return -1;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__TAOSX_RSP;
|
|
||||||
// ((SMqRspHead*)buf)->epoch = pReq->epoch;
|
|
||||||
// ((SMqRspHead*)buf)->consumerId = pReq->consumerId;
|
|
||||||
//
|
|
||||||
// void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
|
||||||
//
|
|
||||||
// SEncoder encoder = {0};
|
|
||||||
// tEncoderInit(&encoder, abuf, len);
|
|
||||||
// tEncodeSTaosxRsp(&encoder, pRsp);
|
|
||||||
// tEncoderClear(&encoder);
|
|
||||||
//
|
|
||||||
// SRpcMsg rsp = {
|
|
||||||
// .info = pMsg->info,
|
|
||||||
// .pCont = buf,
|
|
||||||
// .contLen = tlen,
|
|
||||||
// .code = 0,
|
|
||||||
// };
|
|
||||||
//
|
|
||||||
// tmsgSendRsp(&rsp);
|
|
||||||
//
|
|
||||||
// char buf1[80] = {0};
|
|
||||||
// char buf2[80] = {0};
|
|
||||||
// tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
|
||||||
// tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
|
||||||
//
|
|
||||||
// tqDebug("taosx rsp, vgId:%d, consumer:0x%" PRIx64 " (epoch %d) send rsp, numOfBlks:%d, req:%s, rsp:%s",
|
|
||||||
// TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
|
|
||||||
// return 0;
|
|
||||||
//}
|
|
||||||
|
|
||||||
static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
|
static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
|
||||||
return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
|
return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
|
||||||
pLeft->val.version <= pRight->val.version;
|
pLeft->val.version <= pRight->val.version;
|
||||||
|
@ -615,9 +551,9 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
|
||||||
|
|
||||||
// NOTE: this pHandle->consumerId may have been changed already.
|
// NOTE: this pHandle->consumerId may have been changed already.
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64
|
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64
|
||||||
", ts:%" PRId64,
|
", ts:%" PRId64", reqId:0x%"PRIx64,
|
||||||
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid,
|
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid,
|
||||||
dataRsp.rspOffset.ts);
|
dataRsp.rspOffset.ts, pRequest->reqId);
|
||||||
|
|
||||||
tDeleteSMqDataRsp(&dataRsp);
|
tDeleteSMqDataRsp(&dataRsp);
|
||||||
return code;
|
return code;
|
||||||
|
@ -637,9 +573,9 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
|
||||||
if (metaRsp.metaRspLen > 0) {
|
if (metaRsp.metaRspLen > 0) {
|
||||||
code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp);
|
code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp);
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64
|
tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64
|
||||||
",version:%" PRId64,
|
",ts:%" PRId64,
|
||||||
consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
|
consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
|
||||||
metaRsp.rspOffset.version);
|
metaRsp.rspOffset.ts);
|
||||||
taosMemoryFree(metaRsp.metaRsp);
|
taosMemoryFree(metaRsp.metaRsp);
|
||||||
tDeleteSTaosxRsp(&taosxRsp);
|
tDeleteSTaosxRsp(&taosxRsp);
|
||||||
return code;
|
return code;
|
||||||
|
@ -680,17 +616,12 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) {
|
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) {
|
||||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
||||||
// if (terrno == 0) { // failed to seek to given ver, but no errors happen.
|
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
||||||
// code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, (SMqDataRsp*) &taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
tDeleteSTaosxRsp(&taosxRsp);
|
||||||
// return code;
|
taosMemoryFreeClear(pCkHead);
|
||||||
// } else { // error happens, return to consumers
|
return code;
|
||||||
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
|
||||||
tDeleteSTaosxRsp(&taosxRsp);
|
|
||||||
taosMemoryFreeClear(pCkHead);
|
|
||||||
return code;
|
|
||||||
// }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SWalCont* pHead = &pCkHead->head;
|
SWalCont* pHead = &pCkHead->head;
|
||||||
|
|
|
@ -183,23 +183,24 @@ end:
|
||||||
return tbSuid == realTbSuid;
|
return tbSuid == realTbSuid;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) {
|
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead, uint64_t reqId) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
|
|
||||||
taosThreadMutexLock(&pHandle->pWalReader->mutex);
|
taosThreadMutexLock(&pHandle->pWalReader->mutex);
|
||||||
int64_t offset = *fetchOffset;
|
int64_t offset = *fetchOffset;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) {
|
if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) {
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return",
|
tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return, reqId:0x%"PRIx64,
|
||||||
pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode), offset);
|
pHandle->consumerId, pHandle->epoch, vgId, offset, reqId);
|
||||||
*fetchOffset = offset - 1;
|
*fetchOffset = offset - 1;
|
||||||
code = -1;
|
code = -1;
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("vgId:%d, taosx get msg ver %" PRId64 ", type: %s", pTq->pVnode->config.vgId, offset,
|
tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s, reqId:0x%" PRIx64, vgId,
|
||||||
TMSG_INFO((*ppCkHead)->head.msgType));
|
pHandle->consumerId, offset, TMSG_INFO((*ppCkHead)->head.msgType), reqId);
|
||||||
|
|
||||||
if ((*ppCkHead)->head.msgType == TDMT_VND_SUBMIT) {
|
if ((*ppCkHead)->head.msgType == TDMT_VND_SUBMIT) {
|
||||||
code = walFetchBody(pHandle->pWalReader, ppCkHead);
|
code = walFetchBody(pHandle->pWalReader, ppCkHead);
|
||||||
|
|
|
@ -134,7 +134,6 @@ static int32_t setTableSchema(SCacheRowsReader* p, uint64_t suid, const char* id
|
||||||
|
|
||||||
// all queried tables have been dropped already, return immediately.
|
// all queried tables have been dropped already, return immediately.
|
||||||
if (p->pSchema == NULL) {
|
if (p->pSchema == NULL) {
|
||||||
taosMemoryFree(p);
|
|
||||||
tsdbWarn("all queried tables has been dropped, try next group, %s", idstr);
|
tsdbWarn("all queried tables has been dropped, try next group, %s", idstr);
|
||||||
return TSDB_CODE_PAR_TABLE_NOT_EXIST;
|
return TSDB_CODE_PAR_TABLE_NOT_EXIST;
|
||||||
}
|
}
|
||||||
|
|
|
@ -922,7 +922,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
|
||||||
pBlockNum->numOfBlocks += 1;
|
pBlockNum->numOfBlocks += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((pScanInfo->pBlockList != NULL) && (taosArrayGetSize(pScanInfo->pBlockList) > 0)) {
|
if (taosArrayGetSize(pScanInfo->pBlockList) > 0) {
|
||||||
numOfQTable += 1;
|
numOfQTable += 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4220,12 +4220,8 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) {
|
||||||
if (pStatus->loadFromFile) {
|
if (pStatus->loadFromFile) {
|
||||||
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
|
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
|
||||||
if (pBlockInfo != NULL) {
|
if (pBlockInfo != NULL) {
|
||||||
pBlockScanInfo =
|
pBlockScanInfo = getTableBlockScanInfo(pStatus->pTableMap, pBlockInfo->uid, pReader->idStr);
|
||||||
*(STableBlockScanInfo**)taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
|
|
||||||
if (pBlockScanInfo == NULL) {
|
if (pBlockScanInfo == NULL) {
|
||||||
code = TSDB_CODE_INVALID_PARA;
|
|
||||||
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
|
|
||||||
taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
|
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -1194,6 +1194,8 @@ static int32_t getPreSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs,
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
SET_SESSION_WIN_KEY_INVALID(pKey);
|
SET_SESSION_WIN_KEY_INVALID(pKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosMemoryFree(pCur);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -57,9 +57,11 @@ typedef struct SSysTableScanInfo {
|
||||||
const char* pUser;
|
const char* pUser;
|
||||||
bool sysInfo;
|
bool sysInfo;
|
||||||
bool showRewrite;
|
bool showRewrite;
|
||||||
|
bool restore;
|
||||||
SNode* pCondition; // db_name filter condition, to discard data that are not in current database
|
SNode* pCondition; // db_name filter condition, to discard data that are not in current database
|
||||||
SMTbCursor* pCur; // cursor for iterate the local table meta store.
|
SMTbCursor* pCur; // cursor for iterate the local table meta store.
|
||||||
SSysTableIndex* pIdx; // idx for local table meta
|
SSysTableIndex* pIdx; // idx for local table meta
|
||||||
|
SHashObj* pSchema;
|
||||||
SColMatchInfo matchInfo;
|
SColMatchInfo matchInfo;
|
||||||
SName name;
|
SName name;
|
||||||
SSDataBlock* pRes;
|
SSDataBlock* pRes;
|
||||||
|
@ -514,9 +516,23 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
|
||||||
pInfo->pCur = metaOpenTbCursor(pInfo->readHandle.meta);
|
pInfo->pCur = metaOpenTbCursor(pInfo->readHandle.meta);
|
||||||
}
|
}
|
||||||
|
|
||||||
SHashObj* stableSchema = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
if (pInfo->pSchema == NULL) {
|
||||||
taosHashSetFreeFp(stableSchema, tDeleteSSchemaWrapperForHash);
|
pInfo->pSchema = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||||
while ((ret = metaTbCursorNext(pInfo->pCur, TSDB_TABLE_MAX)) == 0) {
|
taosHashSetFreeFp(pInfo->pSchema, tDeleteSSchemaWrapperForHash);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!pInfo->pCur || !pInfo->pSchema) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
qError("sysTableScanUserCols failed since %s", terrstr(terrno));
|
||||||
|
blockDataDestroy(dataBlock);
|
||||||
|
pInfo->loadInfo.totalRows = 0;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t restore = pInfo->restore;
|
||||||
|
pInfo->restore = false;
|
||||||
|
while (restore || ((ret = metaTbCursorNext(pInfo->pCur, TSDB_TABLE_MAX)) == 0)) {
|
||||||
|
if (restore) restore = false;
|
||||||
char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
|
||||||
|
@ -524,33 +540,36 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
if (pInfo->pCur->mr.me.type == TSDB_SUPER_TABLE) {
|
if (pInfo->pCur->mr.me.type == TSDB_SUPER_TABLE) {
|
||||||
qDebug("sysTableScanUserCols cursor get super table");
|
qDebug("sysTableScanUserCols cursor get super table");
|
||||||
void* schema = taosHashGet(stableSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t));
|
void* schema = taosHashGet(pInfo->pSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t));
|
||||||
if (schema == NULL) {
|
if (schema == NULL) {
|
||||||
SSchemaWrapper* schemaWrapper = tCloneSSchemaWrapper(&pInfo->pCur->mr.me.stbEntry.schemaRow);
|
SSchemaWrapper* schemaWrapper = tCloneSSchemaWrapper(&pInfo->pCur->mr.me.stbEntry.schemaRow);
|
||||||
taosHashPut(stableSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t), &schemaWrapper, POINTER_BYTES);
|
taosHashPut(pInfo->pSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t), &schemaWrapper, POINTER_BYTES);
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
} else if (pInfo->pCur->mr.me.type == TSDB_CHILD_TABLE) {
|
} else if (pInfo->pCur->mr.me.type == TSDB_CHILD_TABLE) {
|
||||||
qDebug("sysTableScanUserCols cursor get child table");
|
qDebug("sysTableScanUserCols cursor get child table");
|
||||||
STR_TO_VARSTR(typeName, "CHILD_TABLE");
|
STR_TO_VARSTR(typeName, "CHILD_TABLE");
|
||||||
STR_TO_VARSTR(tableName, pInfo->pCur->mr.me.name);
|
STR_TO_VARSTR(tableName, pInfo->pCur->mr.me.name);
|
||||||
|
|
||||||
int64_t suid = pInfo->pCur->mr.me.ctbEntry.suid;
|
int64_t suid = pInfo->pCur->mr.me.ctbEntry.suid;
|
||||||
void* schema = taosHashGet(stableSchema, &pInfo->pCur->mr.me.ctbEntry.suid, sizeof(int64_t));
|
void* schema = taosHashGet(pInfo->pSchema, &pInfo->pCur->mr.me.ctbEntry.suid, sizeof(int64_t));
|
||||||
if (schema != NULL) {
|
if (schema != NULL) {
|
||||||
schemaRow = *(SSchemaWrapper**)schema;
|
schemaRow = *(SSchemaWrapper**)schema;
|
||||||
} else {
|
} else {
|
||||||
tDecoderClear(&pInfo->pCur->mr.coder);
|
SMetaReader smrSuperTable = {0};
|
||||||
int code = metaGetTableEntryByUid(&pInfo->pCur->mr, suid);
|
metaReaderInit(&smrSuperTable, pInfo->readHandle.meta, 0);
|
||||||
|
int code = metaGetTableEntryByUid(&smrSuperTable, suid);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
// terrno has been set by metaGetTableEntryByName, therefore, return directly
|
// terrno has been set by metaGetTableEntryByName, therefore, return directly
|
||||||
qError("sysTableScanUserCols get meta by suid:%" PRId64 " error, code:%d", suid, code);
|
qError("sysTableScanUserCols get meta by suid:%" PRId64 " error, code:%d", suid, code);
|
||||||
|
metaReaderClear(&smrSuperTable);
|
||||||
blockDataDestroy(dataBlock);
|
blockDataDestroy(dataBlock);
|
||||||
pInfo->loadInfo.totalRows = 0;
|
pInfo->loadInfo.totalRows = 0;
|
||||||
taosHashCleanup(stableSchema);
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
schemaRow = &pInfo->pCur->mr.me.stbEntry.schemaRow;
|
SSchemaWrapper* schemaWrapper = tCloneSSchemaWrapper(&smrSuperTable.me.stbEntry.schemaRow);
|
||||||
|
taosHashPut(pInfo->pSchema, &suid, sizeof(int64_t), &schemaWrapper, POINTER_BYTES);
|
||||||
|
schemaRow = schemaWrapper;
|
||||||
|
metaReaderClear(&smrSuperTable);
|
||||||
}
|
}
|
||||||
} else if (pInfo->pCur->mr.me.type == TSDB_NORMAL_TABLE) {
|
} else if (pInfo->pCur->mr.me.type == TSDB_NORMAL_TABLE) {
|
||||||
qDebug("sysTableScanUserCols cursor get normal table");
|
qDebug("sysTableScanUserCols cursor get normal table");
|
||||||
|
@ -562,20 +581,19 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
sysTableUserColsFillOneTableCols(pInfo, dbname, &numOfRows, dataBlock, tableName, schemaRow, typeName);
|
if ((numOfRows + schemaRow->nCols) > pOperator->resultInfo.capacity) {
|
||||||
|
|
||||||
if (numOfRows >= pOperator->resultInfo.capacity) {
|
|
||||||
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
|
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
|
||||||
numOfRows = 0;
|
numOfRows = 0;
|
||||||
|
pInfo->restore = true;
|
||||||
|
|
||||||
if (pInfo->pRes->info.rows > 0) {
|
if (pInfo->pRes->info.rows > 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
sysTableUserColsFillOneTableCols(pInfo, dbname, &numOfRows, dataBlock, tableName, schemaRow, typeName);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashCleanup(stableSchema);
|
|
||||||
|
|
||||||
if (numOfRows > 0) {
|
if (numOfRows > 0) {
|
||||||
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
|
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
|
||||||
numOfRows = 0;
|
numOfRows = 0;
|
||||||
|
@ -695,7 +713,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((smrSuperTable.me.stbEntry.schemaTag.nCols + numOfRows) > pOperator->resultInfo.capacity) {
|
if ((smrSuperTable.me.stbEntry.schemaTag.nCols + numOfRows) > pOperator->resultInfo.capacity) {
|
||||||
metaTbCursorPrev(pInfo->pCur);
|
metaTbCursorPrev(pInfo->pCur, TSDB_TABLE_MAX);
|
||||||
blockFull = true;
|
blockFull = true;
|
||||||
} else {
|
} else {
|
||||||
sysTableUserTagsFillOneTableTags(pInfo, &smrSuperTable, &pInfo->pCur->mr, dbname, tableName, &numOfRows,
|
sysTableUserTagsFillOneTableTags(pInfo, &smrSuperTable, &pInfo->pCur->mr, dbname, tableName, &numOfRows,
|
||||||
|
@ -1789,6 +1807,11 @@ void destroySysScanOperator(void* param) {
|
||||||
pInfo->pIdx = NULL;
|
pInfo->pIdx = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if(pInfo->pSchema) {
|
||||||
|
taosHashCleanup(pInfo->pSchema);
|
||||||
|
pInfo->pSchema = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
taosArrayDestroy(pInfo->matchInfo.pList);
|
taosArrayDestroy(pInfo->matchInfo.pList);
|
||||||
taosMemoryFreeClear(pInfo->pUser);
|
taosMemoryFreeClear(pInfo->pUser);
|
||||||
|
|
||||||
|
|
|
@ -70,7 +70,7 @@ static int32_t smlBoundColumnData(SArray* cols, SBoundColInfo* pBoundInfo, SSche
|
||||||
SToken sToken = {.n = kv->keyLen, .z = (char*)kv->key};
|
SToken sToken = {.n = kv->keyLen, .z = (char*)kv->key};
|
||||||
col_id_t t = lastColIdx + 1;
|
col_id_t t = lastColIdx + 1;
|
||||||
col_id_t index = ((t == 0 && !isTag) ? 0 : insFindCol(&sToken, t, pBoundInfo->numOfCols, pSchema));
|
col_id_t index = ((t == 0 && !isTag) ? 0 : insFindCol(&sToken, t, pBoundInfo->numOfCols, pSchema));
|
||||||
uDebug("SML, index:%d, t:%d, ncols:%d", index, t, pBoundInfo->numOfCols);
|
uTrace("SML, index:%d, t:%d, ncols:%d", index, t, pBoundInfo->numOfCols);
|
||||||
if (index < 0 && t > 0) {
|
if (index < 0 && t > 0) {
|
||||||
index = insFindCol(&sToken, 0, t, pSchema);
|
index = insFindCol(&sToken, 0, t, pSchema);
|
||||||
}
|
}
|
||||||
|
@ -345,7 +345,7 @@ int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSc
|
||||||
}
|
}
|
||||||
if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)pUcs4, pColSchema->bytes - VARSTR_HEADER_SIZE, &len)) {
|
if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)pUcs4, pColSchema->bytes - VARSTR_HEADER_SIZE, &len)) {
|
||||||
if (errno == E2BIG) {
|
if (errno == E2BIG) {
|
||||||
uError("sml bind taosMbsToUcs4 error, kv length:%d, bytes:%d", (int)kv->length, pColSchema->bytes);
|
uError("sml bind taosMbsToUcs4 error, kv length:%d, bytes:%d, kv->value:%s", (int)kv->length, pColSchema->bytes, kv->value);
|
||||||
buildInvalidOperationMsg(&pBuf, "value too long");
|
buildInvalidOperationMsg(&pBuf, "value too long");
|
||||||
ret = TSDB_CODE_PAR_VALUE_TOO_LONG;
|
ret = TSDB_CODE_PAR_VALUE_TOO_LONG;
|
||||||
goto end;
|
goto end;
|
||||||
|
|
|
@ -21,7 +21,7 @@
|
||||||
#include "tjson.h"
|
#include "tjson.h"
|
||||||
#include "tglobal.h"
|
#include "tglobal.h"
|
||||||
|
|
||||||
#define LOG_MAX_LINE_SIZE (1024)
|
#define LOG_MAX_LINE_SIZE (10024)
|
||||||
#define LOG_MAX_LINE_BUFFER_SIZE (LOG_MAX_LINE_SIZE + 3)
|
#define LOG_MAX_LINE_BUFFER_SIZE (LOG_MAX_LINE_SIZE + 3)
|
||||||
#define LOG_MAX_LINE_DUMP_SIZE (1024 * 1024)
|
#define LOG_MAX_LINE_DUMP_SIZE (1024 * 1024)
|
||||||
#define LOG_MAX_LINE_DUMP_BUFFER_SIZE (LOG_MAX_LINE_DUMP_SIZE + 3)
|
#define LOG_MAX_LINE_DUMP_BUFFER_SIZE (LOG_MAX_LINE_DUMP_SIZE + 3)
|
||||||
|
|
|
@ -100,15 +100,9 @@ class TDTestCase:
|
||||||
tdSql.checkEqual(i[1],len(self.perf_list))
|
tdSql.checkEqual(i[1],len(self.perf_list))
|
||||||
elif i[0].lower() == self.dbname:
|
elif i[0].lower() == self.dbname:
|
||||||
tdSql.checkEqual(i[1],self.tbnum+1)
|
tdSql.checkEqual(i[1],self.tbnum+1)
|
||||||
def ins_columns_check(self):
|
|
||||||
tdSql.execute('create database db2 vgroups 2 replica 1')
|
|
||||||
tdSql.execute('create table db2.stb2 (ts timestamp,c0 int,c1 int, c2 double, c3 float, c4 binary(1000), c5 nchar(100),c7 bigint, c8 bool, c9 smallint) tags(t0 int)')
|
|
||||||
for i in range(2000):
|
|
||||||
tdSql.execute("create table db2.ctb%d using db2.stb2 tags(%d)" %(i,i))
|
|
||||||
tdSql.query(f'select * from information_schema.ins_columns where db_name="db2" and table_type="CHILD_TABLE"')
|
|
||||||
tdSql.checkEqual(20000,len(tdSql.queryResult))
|
|
||||||
print("number of ins_columns of child table in db2 is %s" % len(tdSql.queryResult))
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def ins_col_check_4096(self):
|
def ins_col_check_4096(self):
|
||||||
tdSql.execute('create database db3 vgroups 2 replica 1')
|
tdSql.execute('create database db3 vgroups 2 replica 1')
|
||||||
col_str = tdCom.gen_tag_col_str("col", "int",4094)
|
col_str = tdCom.gen_tag_col_str("col", "int",4094)
|
||||||
|
@ -141,6 +135,35 @@ class TDTestCase:
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def ins_columns_check(self):
|
||||||
|
tdSql.execute('drop database if exists db2')
|
||||||
|
tdSql.execute('create database if not exists db2 vgroups 1 replica 1')
|
||||||
|
for i in range (5):
|
||||||
|
self.stb4096 = 'create table db2.stb%d (ts timestamp' % (i)
|
||||||
|
for j in range (4094 - i):
|
||||||
|
self.stb4096 += ', c%d int' % (j)
|
||||||
|
self.stb4096 += ') tags (t1 int)'
|
||||||
|
tdSql.execute(self.stb4096)
|
||||||
|
for k in range(10):
|
||||||
|
tdSql.execute("create table db2.ctb_%d_%dc using db2.stb%d tags(%d)" %(i,k,i,k))
|
||||||
|
for t in range (2):
|
||||||
|
tdSql.query(f'select * from information_schema.ins_columns where db_name="db2" and table_type=="SUPER_TABLE"')
|
||||||
|
tdSql.checkEqual(20465,len(tdSql.queryResult))
|
||||||
|
for t in range (2):
|
||||||
|
tdSql.query(f'select * from information_schema.ins_columns where db_name="db2" and table_type=="CHILD_TABLE"')
|
||||||
|
tdSql.checkEqual(204650,len(tdSql.queryResult))
|
||||||
|
|
||||||
|
for i in range (5):
|
||||||
|
self.ntb4096 = 'create table db2.ntb%d (ts timestamp' % (i)
|
||||||
|
for j in range (4095 - i):
|
||||||
|
self.ntb4096 += ', c%d binary(10)' % (j)
|
||||||
|
self.ntb4096 += ')'
|
||||||
|
tdSql.execute(self.ntb4096)
|
||||||
|
for t in range (2):
|
||||||
|
tdSql.query(f'select * from information_schema.ins_columns where db_name="db2" and table_type=="NORMAL_TABLE"')
|
||||||
|
tdSql.checkEqual(20470,len(tdSql.queryResult))
|
||||||
|
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
self.prepare_data()
|
self.prepare_data()
|
||||||
self.count_check()
|
self.count_check()
|
||||||
|
@ -148,6 +171,7 @@ class TDTestCase:
|
||||||
# self.ins_col_check_4096()
|
# self.ins_col_check_4096()
|
||||||
self.ins_stable_check()
|
self.ins_stable_check()
|
||||||
|
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
tdSql.close()
|
tdSql.close()
|
||||||
tdLog.success("%s successfully executed" % __file__)
|
tdLog.success("%s successfully executed" % __file__)
|
||||||
|
|
|
@ -688,16 +688,17 @@ int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t g_once_commit_flag = 0;
|
static int32_t g_once_commit_flag = 0;
|
||||||
static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
|
|
||||||
taosFprintfFile(g_fp, "tmq_commit_cb_print() commit %d\n", code);
|
|
||||||
|
|
||||||
if (0 == g_once_commit_flag) {
|
static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
|
||||||
g_once_commit_flag = 1;
|
taosFprintfFile(g_fp, "tmq_commit_cb_print() commit %d\n", code);
|
||||||
notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT);
|
|
||||||
|
if (0 == g_once_commit_flag) {
|
||||||
|
g_once_commit_flag = 1;
|
||||||
|
notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT);
|
||||||
}
|
}
|
||||||
|
|
||||||
char tmpString[128];
|
char tmpString[128];
|
||||||
taosFprintfFile(g_fp, "%s tmq_commit_cb_print() be called\n", getCurrentTimeString(tmpString));
|
taosFprintfFile(g_fp, "%s tmq_commit_cb_print() be called\n", getCurrentTimeString(tmpString));
|
||||||
}
|
}
|
||||||
|
|
||||||
void build_consumer(SThreadInfo* pInfo) {
|
void build_consumer(SThreadInfo* pInfo) {
|
||||||
|
|
Loading…
Reference in New Issue