others: merge main.
This commit is contained in:
commit
b22cc82270
|
@ -2,7 +2,7 @@
|
||||||
IF (DEFINED VERNUMBER)
|
IF (DEFINED VERNUMBER)
|
||||||
SET(TD_VER_NUMBER ${VERNUMBER})
|
SET(TD_VER_NUMBER ${VERNUMBER})
|
||||||
ELSE ()
|
ELSE ()
|
||||||
SET(TD_VER_NUMBER "3.0.3.0")
|
SET(TD_VER_NUMBER "3.0.3.1")
|
||||||
ENDIF ()
|
ENDIF ()
|
||||||
|
|
||||||
IF (DEFINED VERCOMPATIBLE)
|
IF (DEFINED VERCOMPATIBLE)
|
||||||
|
|
|
@ -10,7 +10,7 @@
|
||||||
<description>Demo project for TDengine</description>
|
<description>Demo project for TDengine</description>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
<spring.version>5.3.20</spring.version>
|
<spring.version>5.3.26</spring.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
|
|
@ -147,9 +147,9 @@ typedef struct {
|
||||||
} SMonStbInfo;
|
} SMonStbInfo;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t expire_time;
|
uint32_t expire_time;
|
||||||
int64_t timeseries_used;
|
int64_t timeseries_used;
|
||||||
int64_t timeseries_total;
|
int64_t timeseries_total;
|
||||||
} SMonGrantInfo;
|
} SMonGrantInfo;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -107,7 +107,7 @@ elif [ ${testFile} = "tools" ];then
|
||||||
originTdpPath="taosTools-${originversion}"
|
originTdpPath="taosTools-${originversion}"
|
||||||
packageName="${tdPath}-Linux-${cpuType}${packageLite}.${packageType}"
|
packageName="${tdPath}-Linux-${cpuType}${packageLite}.${packageType}"
|
||||||
originPackageName="${originTdpPath}-Linux-${cpuType}${packageLite}.${packageType}"
|
originPackageName="${originTdpPath}-Linux-${cpuType}${packageLite}.${packageType}"
|
||||||
installCmd="install-tools.sh"
|
installCmd="install-taostools.sh"
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
|
||||||
|
@ -333,7 +333,7 @@ if [[ ${packageName} =~ "Lite" ]] || ([[ ${packageName} =~ "x64" ]] && [[ ${p
|
||||||
wgetFile taosTools-2.1.3-Linux-x64.tar.gz v2.1.3 web
|
wgetFile taosTools-2.1.3-Linux-x64.tar.gz v2.1.3 web
|
||||||
tar xf taosTools-2.1.3-Linux-x64.tar.gz
|
tar xf taosTools-2.1.3-Linux-x64.tar.gz
|
||||||
fi
|
fi
|
||||||
cd taosTools-2.1.3 && bash install-tools.sh
|
cd taosTools-2.1.3 && bash install-taostools.sh
|
||||||
elif ([[ ${packageName} =~ "arm64" ]] && [[ ${packageName} =~ "client" ]]);then
|
elif ([[ ${packageName} =~ "arm64" ]] && [[ ${packageName} =~ "client" ]]);then
|
||||||
echoColor G "===== install taos-tools arm when package is arm64-client ====="
|
echoColor G "===== install taos-tools arm when package is arm64-client ====="
|
||||||
cd ${installPath}
|
cd ${installPath}
|
||||||
|
@ -342,7 +342,7 @@ elif ([[ ${packageName} =~ "arm64" ]] && [[ ${packageName} =~ "client" ]]);then
|
||||||
tar xf taosTools-2.1.3-Linux-arm64.tar.gz
|
tar xf taosTools-2.1.3-Linux-arm64.tar.gz
|
||||||
fi
|
fi
|
||||||
|
|
||||||
cd taosTools-2.1.3 && bash install-tools.sh
|
cd taosTools-2.1.3 && bash install-taostools.sh
|
||||||
fi
|
fi
|
||||||
|
|
||||||
echoColor G "===== start TDengine ====="
|
echoColor G "===== start TDengine ====="
|
||||||
|
@ -361,18 +361,18 @@ rm -rf ${installPath}/${tdPath}/
|
||||||
# cd ${installPath}
|
# cd ${installPath}
|
||||||
# wgetFile taosTools-2.1.2-Linux-x64.tar.gz .
|
# wgetFile taosTools-2.1.2-Linux-x64.tar.gz .
|
||||||
# tar xf taosTools-2.1.2-Linux-x64.tar.gz
|
# tar xf taosTools-2.1.2-Linux-x64.tar.gz
|
||||||
# cd taosTools-2.1.2 && bash install-tools.sh
|
# cd taosTools-2.1.2 && bash install-taostools.sh
|
||||||
# elif [[ ${packageName} =~ "Lite" ]] && [[ ${packageName} =~ "deb" ]] ;then
|
# elif [[ ${packageName} =~ "Lite" ]] && [[ ${packageName} =~ "deb" ]] ;then
|
||||||
# echoColor G "===== install taos-tools when package is lite or client ====="
|
# echoColor G "===== install taos-tools when package is lite or client ====="
|
||||||
# cd ${installPath}
|
# cd ${installPath}
|
||||||
# wgetFile taosTools-2.1.2-Linux-x64.tar.gz .
|
# wgetFile taosTools-2.1.2-Linux-x64.tar.gz .
|
||||||
# tar xf taosTools-2.1.2-Linux-x64.tar.gz
|
# tar xf taosTools-2.1.2-Linux-x64.tar.gz
|
||||||
# cd taosTools-2.1.2 && bash install-tools.sh
|
# cd taosTools-2.1.2 && bash install-taostools.sh
|
||||||
# elif [[ ${packageName} =~ "Lite" ]] && [[ ${packageName} =~ "rpm" ]] ;then
|
# elif [[ ${packageName} =~ "Lite" ]] && [[ ${packageName} =~ "rpm" ]] ;then
|
||||||
# echoColor G "===== install taos-tools when package is lite or client ====="
|
# echoColor G "===== install taos-tools when package is lite or client ====="
|
||||||
# cd ${installPath}
|
# cd ${installPath}
|
||||||
# wgetFile taosTools-2.1.2-Linux-x64.tar.gz .
|
# wgetFile taosTools-2.1.2-Linux-x64.tar.gz .
|
||||||
# tar xf taosTools-2.1.2-Linux-x64.tar.gz
|
# tar xf taosTools-2.1.2-Linux-x64.tar.gz
|
||||||
# cd taosTools-2.1.2 && bash install-tools.sh
|
# cd taosTools-2.1.2 && bash install-taostools.sh
|
||||||
# fi
|
# fi
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,33 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
|
||||||
|
<plist version="1.0">
|
||||||
|
<dict>
|
||||||
|
<key>Label</key>
|
||||||
|
<string>com.tdengine.taoskeeper</string>
|
||||||
|
<key>ProgramArguments</key>
|
||||||
|
<array>
|
||||||
|
<string>/usr/local/bin/taoskeeper</string>
|
||||||
|
</array>
|
||||||
|
<key>ProcessType</key>
|
||||||
|
<string>Interactive</string>
|
||||||
|
<key>Disabled</key>
|
||||||
|
<false/>
|
||||||
|
<key>RunAtLoad</key>
|
||||||
|
<false/>
|
||||||
|
<key>LaunchOnlyOnce</key>
|
||||||
|
<false/>
|
||||||
|
<key>SessionCreate</key>
|
||||||
|
<true/>
|
||||||
|
<key>ExitTimeOut</key>
|
||||||
|
<integer>600</integer>
|
||||||
|
<key>KeepAlive</key>
|
||||||
|
<dict>
|
||||||
|
<key>SuccessfulExit</key>
|
||||||
|
<false/>
|
||||||
|
<key>AfterInitialDemand</key>
|
||||||
|
<true/>
|
||||||
|
</dict>
|
||||||
|
<key>Program</key>
|
||||||
|
<string>/usr/local/bin/taoskeeper</string>
|
||||||
|
</dict>
|
||||||
|
</plist>
|
|
@ -53,7 +53,7 @@ if [ -d ${top_dir}/tools/taos-tools/packaging/deb ]; then
|
||||||
cd ${top_dir}/tools/taos-tools/packaging/deb
|
cd ${top_dir}/tools/taos-tools/packaging/deb
|
||||||
[ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0"
|
[ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0"
|
||||||
|
|
||||||
taostools_ver=$(git tag |grep -v taos | sort | tail -1)
|
taostools_ver=$(git for-each-ref --sort=taggerdate --format '%(tag)' refs/tags|grep -v taos | tail -1)
|
||||||
taostools_install_dir="${release_dir}/${clientName2}Tools-${taostools_ver}"
|
taostools_install_dir="${release_dir}/${clientName2}Tools-${taostools_ver}"
|
||||||
|
|
||||||
cd ${curr_dir}
|
cd ${curr_dir}
|
||||||
|
|
|
@ -582,6 +582,11 @@ function install_service_on_launchctl() {
|
||||||
${csudo}cp ${install_main_dir}/service/com.taosdata.taosadapter.plist /Library/LaunchDaemons/com.taosdata.taosadapter.plist || :
|
${csudo}cp ${install_main_dir}/service/com.taosdata.taosadapter.plist /Library/LaunchDaemons/com.taosdata.taosadapter.plist || :
|
||||||
${csudo}launchctl load -w /Library/LaunchDaemons/com.taosdata.taosadapter.plist || :
|
${csudo}launchctl load -w /Library/LaunchDaemons/com.taosdata.taosadapter.plist || :
|
||||||
fi
|
fi
|
||||||
|
if [ -f ${install_main_dir}/service/com.taosdata.taoskeeper.plist ]; then
|
||||||
|
${csudo}launchctl unload -w /Library/LaunchDaemons/com.taosdata.taoskeeper.plist > /dev/null 2>&1 || :
|
||||||
|
${csudo}cp ${install_main_dir}/service/com.taosdata.taoskeeper.plist /Library/LaunchDaemons/com.taosdata.taoskeeper.plist || :
|
||||||
|
${csudo}launchctl load -w /Library/LaunchDaemons/com.taosdata.taoskeeper.plist || :
|
||||||
|
fi
|
||||||
}
|
}
|
||||||
|
|
||||||
function install_taosadapter_service() {
|
function install_taosadapter_service() {
|
||||||
|
|
|
@ -1373,6 +1373,8 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch
|
||||||
uError("WriteRaw:catalogGetTableMeta failed. table name: %s", tbname);
|
uError("WriteRaw:catalogGetTableMeta failed. table name: %s", tbname);
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
// uError("td23101 0vgId:%d, vgId:%d, name:%s, uid:%"PRIu64, vgData.vgId, pTableMeta->vgId, tbname, pTableMeta->uid);
|
||||||
|
|
||||||
pQuery = smlInitHandle();
|
pQuery = smlInitHandle();
|
||||||
if (pQuery == NULL) {
|
if (pQuery == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -1380,6 +1382,7 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch
|
||||||
}
|
}
|
||||||
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||||
taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData));
|
taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData));
|
||||||
|
// uError("td23101 1vgId:%d, numEps:%d, name:%s, uid:%"PRIu64, vgData.vgId, vgData.epSet.numOfEps, tbname, pTableMeta->uid);
|
||||||
|
|
||||||
code = rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false);
|
code = rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -1723,10 +1726,15 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
|
||||||
tDecoderClear(&decoderTmp);
|
tDecoderClear(&decoderTmp);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pCreateReqDst) {
|
SVgroupInfo vg;
|
||||||
|
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vg);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbName);
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pCreateReqDst) { // change stable name to get meta
|
||||||
strcpy(pName.tname, pCreateReqDst->ctb.stbName);
|
strcpy(pName.tname, pCreateReqDst->ctb.stbName);
|
||||||
} else {
|
|
||||||
strcpy(pName.tname, tbName);
|
|
||||||
}
|
}
|
||||||
code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
|
code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
|
||||||
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
|
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
|
||||||
|
@ -1739,13 +1747,6 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
SVgroupInfo vg;
|
|
||||||
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vg);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbName);
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pCreateReqDst) {
|
if (pCreateReqDst) {
|
||||||
pTableMeta->vgId = vg.vgId;
|
pTableMeta->vgId = vg.vgId;
|
||||||
pTableMeta->uid = pCreateReqDst->uid;
|
pTableMeta->uid = pCreateReqDst->uid;
|
||||||
|
|
|
@ -457,6 +457,7 @@ typedef struct {
|
||||||
void* pIter;
|
void* pIter;
|
||||||
SMnode* pMnode;
|
SMnode* pMnode;
|
||||||
STableMetaRsp* pMeta;
|
STableMetaRsp* pMeta;
|
||||||
|
bool restore;
|
||||||
bool sysDbRsp;
|
bool sysDbRsp;
|
||||||
char db[TSDB_DB_FNAME_LEN];
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
char filterTb[TSDB_TABLE_NAME_LEN];
|
char filterTb[TSDB_TABLE_NAME_LEN];
|
||||||
|
|
|
@ -864,7 +864,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
|
||||||
}
|
}
|
||||||
|
|
||||||
// grant info
|
// grant info
|
||||||
pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 86400000.0f;
|
pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 1000;
|
||||||
pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed;
|
pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed;
|
||||||
if (pMnode->grant.expireTimeMS == 0) {
|
if (pMnode->grant.expireTimeMS == 0) {
|
||||||
pGrantInfo->expire_time = INT32_MAX;
|
pGrantInfo->expire_time = INT32_MAX;
|
||||||
|
|
|
@ -324,7 +324,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
|
||||||
pReq->info.rsp = pRsp;
|
pReq->info.rsp = pRsp;
|
||||||
pReq->info.rspLen = size;
|
pReq->info.rspLen = size;
|
||||||
|
|
||||||
if (rowsRead == 0 || rowsRead < rowsToRead) {
|
if (rowsRead == 0 || ((rowsRead < rowsToRead) && !pShow->restore)) {
|
||||||
pRsp->completed = 1;
|
pRsp->completed = 1;
|
||||||
mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id);
|
mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id);
|
||||||
mndReleaseShowObj(pShow, true);
|
mndReleaseShowObj(pShow, true);
|
||||||
|
|
|
@ -3113,9 +3113,18 @@ static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
||||||
|
|
||||||
char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
STR_TO_VARSTR(typeName, "SUPER_TABLE");
|
STR_TO_VARSTR(typeName, "SUPER_TABLE");
|
||||||
|
bool fetch = pShow->restore ? false : true;
|
||||||
|
pShow->restore = false;
|
||||||
while (numOfRows < rows) {
|
while (numOfRows < rows) {
|
||||||
pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
|
if (fetch) {
|
||||||
if (pShow->pIter == NULL) break;
|
pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
|
||||||
|
if (pShow->pIter == NULL) break;
|
||||||
|
} else {
|
||||||
|
fetch = true;
|
||||||
|
void *pKey = taosHashGetKey(pShow->pIter, NULL);
|
||||||
|
pStb = sdbAcquire(pSdb, SDB_STB, pKey);
|
||||||
|
if (!pStb) continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (pDb != NULL && pStb->dbUid != pDb->uid) {
|
if (pDb != NULL && pStb->dbUid != pDb->uid) {
|
||||||
sdbRelease(pSdb, pStb);
|
sdbRelease(pSdb, pStb);
|
||||||
|
@ -3129,6 +3138,17 @@ static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
||||||
sdbRelease(pSdb, pStb);
|
sdbRelease(pSdb, pStb);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ((numOfRows + pStb->numOfColumns) > rows) {
|
||||||
|
pShow->restore = true;
|
||||||
|
if (numOfRows == 0) {
|
||||||
|
mError("mndRetrieveStbCol failed to get stable cols since buf:%d less than result:%d, stable name:%s, db:%s",
|
||||||
|
rows, pStb->numOfColumns, pStb->name, pStb->db);
|
||||||
|
}
|
||||||
|
sdbRelease(pSdb, pStb);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
|
varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
|
||||||
|
|
||||||
mDebug("mndRetrieveStbCol get stable cols, stable name:%s, db:%s", pStb->name, pStb->db);
|
mDebug("mndRetrieveStbCol get stable cols, stable name:%s, db:%s", pStb->name, pStb->db);
|
||||||
|
|
|
@ -157,7 +157,7 @@ typedef struct SMTbCursor SMTbCursor;
|
||||||
SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
|
SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
|
||||||
void metaCloseTbCursor(SMTbCursor *pTbCur);
|
void metaCloseTbCursor(SMTbCursor *pTbCur);
|
||||||
int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType);
|
int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType);
|
||||||
int32_t metaTbCursorPrev(SMTbCursor *pTbCur);
|
int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
|
@ -687,6 +687,8 @@ typedef struct SSttBlockLoadInfo {
|
||||||
STSchema *pSchema;
|
STSchema *pSchema;
|
||||||
int16_t *colIds;
|
int16_t *colIds;
|
||||||
int32_t numOfCols;
|
int32_t numOfCols;
|
||||||
|
bool checkRemainingRow;
|
||||||
|
bool isLast;
|
||||||
bool sttBlockLoaded;
|
bool sttBlockLoaded;
|
||||||
int32_t numOfStt;
|
int32_t numOfStt;
|
||||||
|
|
||||||
|
|
|
@ -336,7 +336,7 @@ int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t metaTbCursorPrev(SMTbCursor *pTbCur) {
|
int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType) {
|
||||||
int ret;
|
int ret;
|
||||||
void *pBuf;
|
void *pBuf;
|
||||||
STbCfg tbCfg;
|
STbCfg tbCfg;
|
||||||
|
@ -350,7 +350,7 @@ int32_t metaTbCursorPrev(SMTbCursor *pTbCur) {
|
||||||
tDecoderClear(&pTbCur->mr.coder);
|
tDecoderClear(&pTbCur->mr.coder);
|
||||||
|
|
||||||
metaGetTableEntryByVersion(&pTbCur->mr, ((SUidIdxVal *)pTbCur->pVal)[0].version, *(tb_uid_t *)pTbCur->pKey);
|
metaGetTableEntryByVersion(&pTbCur->mr, ((SUidIdxVal *)pTbCur->pVal)[0].version, *(tb_uid_t *)pTbCur->pKey);
|
||||||
if (pTbCur->mr.me.type == TSDB_SUPER_TABLE) {
|
if (pTbCur->mr.me.type == jumpTableType) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -571,8 +571,9 @@ static int32_t doPollDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* p
|
||||||
",version:%" PRId64,
|
",version:%" PRId64,
|
||||||
consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid,
|
consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid,
|
||||||
taosxRsp.rspOffset.version);
|
taosxRsp.rspOffset.version);
|
||||||
} else {
|
}
|
||||||
// if (offset.type == TMQ_OFFSET__LOG) {
|
|
||||||
|
if (offset.type == TMQ_OFFSET__LOG) {
|
||||||
int64_t fetchVer = offset.version + 1;
|
int64_t fetchVer = offset.version + 1;
|
||||||
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
|
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
|
||||||
if (pCkHead == NULL) {
|
if (pCkHead == NULL) {
|
||||||
|
|
|
@ -590,6 +590,7 @@ typedef struct {
|
||||||
SDataFReader **pDataFReader;
|
SDataFReader **pDataFReader;
|
||||||
TSDBROW row;
|
TSDBROW row;
|
||||||
|
|
||||||
|
bool checkRemainingRow;
|
||||||
SMergeTree mergeTree;
|
SMergeTree mergeTree;
|
||||||
SMergeTree *pMergeTree;
|
SMergeTree *pMergeTree;
|
||||||
SSttBlockLoadInfo *pLoadInfo;
|
SSttBlockLoadInfo *pLoadInfo;
|
||||||
|
@ -600,7 +601,6 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa
|
||||||
int nCols) {
|
int nCols) {
|
||||||
SFSLastNextRowIter *state = (SFSLastNextRowIter *)iter;
|
SFSLastNextRowIter *state = (SFSLastNextRowIter *)iter;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
bool checkRemainingRow = true;
|
|
||||||
|
|
||||||
switch (state->state) {
|
switch (state->state) {
|
||||||
case SFSLASTNEXTROW_FS:
|
case SFSLASTNEXTROW_FS:
|
||||||
|
@ -633,12 +633,25 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
state->pLoadInfo->colIds = aCols;
|
for (int i = 0; i < state->pLoadInfo->numOfStt; ++i) {
|
||||||
state->pLoadInfo->numOfCols = nCols;
|
state->pLoadInfo[i].colIds = aCols;
|
||||||
|
state->pLoadInfo[i].numOfCols = nCols;
|
||||||
|
state->pLoadInfo[i].isLast = isLast;
|
||||||
|
}
|
||||||
tMergeTreeOpen(&state->mergeTree, 1, *state->pDataFReader, state->suid, state->uid,
|
tMergeTreeOpen(&state->mergeTree, 1, *state->pDataFReader, state->suid, state->uid,
|
||||||
&(STimeWindow){.skey = state->lastTs, .ekey = TSKEY_MAX},
|
&(STimeWindow){.skey = state->lastTs, .ekey = TSKEY_MAX},
|
||||||
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, state->pLoadInfo, false, NULL, true);
|
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, state->pLoadInfo, false, NULL, true);
|
||||||
state->pMergeTree = &state->mergeTree;
|
state->pMergeTree = &state->mergeTree;
|
||||||
|
state->state = SFSLASTNEXTROW_BLOCKROW;
|
||||||
|
}
|
||||||
|
case SFSLASTNEXTROW_BLOCKROW: {
|
||||||
|
if (nCols != state->pLoadInfo->numOfCols) {
|
||||||
|
for (int i = 0; i < state->pLoadInfo->numOfStt; ++i) {
|
||||||
|
state->pLoadInfo[i].numOfCols = nCols;
|
||||||
|
|
||||||
|
state->pLoadInfo[i].checkRemainingRow = state->checkRemainingRow;
|
||||||
|
}
|
||||||
|
}
|
||||||
bool hasVal = tMergeTreeNext(&state->mergeTree);
|
bool hasVal = tMergeTreeNext(&state->mergeTree);
|
||||||
if (!hasVal) {
|
if (!hasVal) {
|
||||||
if (tMergeTreeIgnoreEarlierTs(&state->mergeTree)) {
|
if (tMergeTreeIgnoreEarlierTs(&state->mergeTree)) {
|
||||||
|
@ -649,76 +662,23 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa
|
||||||
state->state = SFSLASTNEXTROW_FILESET;
|
state->state = SFSLASTNEXTROW_FILESET;
|
||||||
goto _next_fileset;
|
goto _next_fileset;
|
||||||
}
|
}
|
||||||
state->state = SFSLASTNEXTROW_BLOCKROW;
|
state->row = tMergeTreeGetRow(&state->mergeTree);
|
||||||
checkRemainingRow = false;
|
*ppRow = &state->row;
|
||||||
}
|
|
||||||
case SFSLASTNEXTROW_BLOCKROW: {
|
|
||||||
bool skipRow = false;
|
|
||||||
do {
|
|
||||||
bool hasVal = false;
|
|
||||||
state->row = tMergeTreeGetRow(&state->mergeTree);
|
|
||||||
*ppRow = &state->row;
|
|
||||||
if (nCols != state->pLoadInfo->numOfCols) {
|
|
||||||
state->pLoadInfo->numOfCols = nCols;
|
|
||||||
}
|
|
||||||
hasVal = tMergeTreeNext(&state->mergeTree);
|
|
||||||
if (TSDBROW_TS(&state->row) <= state->lastTs) {
|
|
||||||
*pIgnoreEarlierTs = true;
|
|
||||||
*ppRow = NULL;
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
*pIgnoreEarlierTs = false;
|
if (TSDBROW_TS(&state->row) <= state->lastTs) {
|
||||||
if (!hasVal) {
|
*pIgnoreEarlierTs = true;
|
||||||
state->state = SFSLASTNEXTROW_FILESET;
|
*ppRow = NULL;
|
||||||
break;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (checkRemainingRow) {
|
*pIgnoreEarlierTs = false;
|
||||||
bool skipBlock = true;
|
if (!hasVal) {
|
||||||
|
state->state = SFSLASTNEXTROW_FILESET;
|
||||||
SBlockData *pBlockData = state->row.pBlockData;
|
}
|
||||||
|
|
||||||
for (int inputColIndex = 0; inputColIndex < nCols; ++inputColIndex) {
|
|
||||||
for (int colIndex = 0; colIndex < pBlockData->nColData; ++colIndex) {
|
|
||||||
SColData *pColData = &pBlockData->aColData[colIndex];
|
|
||||||
int16_t cid = pColData->cid;
|
|
||||||
|
|
||||||
if (cid == aCols[inputColIndex]) {
|
|
||||||
if (isLast && (pColData->flag & HAS_VALUE)) {
|
|
||||||
skipBlock = false;
|
|
||||||
break;
|
|
||||||
} else if (pColData->flag & (HAS_VALUE | HAS_NULL)) {
|
|
||||||
skipBlock = false;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
/*
|
|
||||||
for (int colIndex = 0; colIndex < pBlockData->nColData; ++colIndex) {
|
|
||||||
SColData *pColData = &pBlockData->aColData[colIndex];
|
|
||||||
int16_t cid = pColData->cid;
|
|
||||||
|
|
||||||
if (inputColIndex < nCols && cid == aCols[inputColIndex]) {
|
|
||||||
if (isLast && (pColData->flag & HAS_VALUE)) {
|
|
||||||
skipBlock = false;
|
|
||||||
break;
|
|
||||||
} else if (pColData->flag & (HAS_VALUE | HAS_NULL)) {
|
|
||||||
skipBlock = false;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
++inputColIndex;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
if (skipBlock) {
|
|
||||||
skipRow = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} while (skipRow);
|
|
||||||
|
|
||||||
|
if (!state->checkRemainingRow) {
|
||||||
|
state->checkRemainingRow = true;
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
|
|
|
@ -504,9 +504,34 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
|
||||||
pIter->iRow += step;
|
pIter->iRow += step;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
|
bool skipBlock = false;
|
||||||
|
|
||||||
findNextValidRow(pIter, idStr);
|
findNextValidRow(pIter, idStr);
|
||||||
|
|
||||||
if (pIter->iRow >= pBlockData->nRow || pIter->iRow < 0) {
|
if (pIter->pBlockLoadInfo->checkRemainingRow) {
|
||||||
|
skipBlock = true;
|
||||||
|
int16_t *aCols = pIter->pBlockLoadInfo->colIds;
|
||||||
|
int nCols = pIter->pBlockLoadInfo->numOfCols;
|
||||||
|
bool isLast = pIter->pBlockLoadInfo->isLast;
|
||||||
|
for (int inputColIndex = 0; inputColIndex < nCols; ++inputColIndex) {
|
||||||
|
for (int colIndex = 0; colIndex < pBlockData->nColData; ++colIndex) {
|
||||||
|
SColData *pColData = &pBlockData->aColData[colIndex];
|
||||||
|
int16_t cid = pColData->cid;
|
||||||
|
|
||||||
|
if (cid == aCols[inputColIndex]) {
|
||||||
|
if (isLast && (pColData->flag & HAS_VALUE)) {
|
||||||
|
skipBlock = false;
|
||||||
|
break;
|
||||||
|
} else if (pColData->flag & (HAS_VALUE | HAS_NULL)) {
|
||||||
|
skipBlock = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (skipBlock || pIter->iRow >= pBlockData->nRow || pIter->iRow < 0) {
|
||||||
tLDataIterNextBlock(pIter, idStr);
|
tLDataIterNextBlock(pIter, idStr);
|
||||||
if (pIter->pSttBlk == NULL) { // no more data
|
if (pIter->pSttBlk == NULL) { // no more data
|
||||||
goto _exit;
|
goto _exit;
|
||||||
|
|
|
@ -335,6 +335,7 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
|
||||||
|
|
||||||
// commit json
|
// commit json
|
||||||
if (!rollback) {
|
if (!rollback) {
|
||||||
|
pWriter->info.state.committed = pWriter->ever;
|
||||||
pVnode->config = pWriter->info.config;
|
pVnode->config = pWriter->info.config;
|
||||||
pVnode->state = (SVState){.committed = pWriter->info.state.committed,
|
pVnode->state = (SVState){.committed = pWriter->info.state.committed,
|
||||||
.applied = pWriter->info.state.committed,
|
.applied = pWriter->info.state.committed,
|
||||||
|
|
|
@ -44,7 +44,8 @@ typedef struct SSortSource {
|
||||||
void* param;
|
void* param;
|
||||||
bool onlyRef;
|
bool onlyRef;
|
||||||
};
|
};
|
||||||
|
int64_t fetchUs;
|
||||||
|
int64_t fetchNum;
|
||||||
} SSortSource;
|
} SSortSource;
|
||||||
|
|
||||||
typedef struct SMsortComparParam {
|
typedef struct SMsortComparParam {
|
||||||
|
|
|
@ -161,10 +161,9 @@ static int32_t discardGroupDataBlock(SSDataBlock* pBlock, SLimitInfo* pLimitInfo
|
||||||
if (pLimitInfo->remainGroupOffset > 0) {
|
if (pLimitInfo->remainGroupOffset > 0) {
|
||||||
return PROJECT_RETRIEVE_CONTINUE;
|
return PROJECT_RETRIEVE_CONTINUE;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// set current group id of the project operator
|
pLimitInfo->currentGroupId = 0;
|
||||||
pLimitInfo->currentGroupId = pBlock->info.id.groupId;
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return PROJECT_RETRIEVE_DONE;
|
return PROJECT_RETRIEVE_DONE;
|
||||||
|
@ -175,19 +174,29 @@ static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, S
|
||||||
// here check for a new group data, we need to handle the data of the previous group.
|
// here check for a new group data, we need to handle the data of the previous group.
|
||||||
ASSERT(pLimitInfo->remainGroupOffset == 0 || pLimitInfo->remainGroupOffset == -1);
|
ASSERT(pLimitInfo->remainGroupOffset == 0 || pLimitInfo->remainGroupOffset == -1);
|
||||||
|
|
||||||
if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
|
bool newGroup = false;
|
||||||
|
if (0 == pBlock->info.id.groupId) {
|
||||||
|
pLimitInfo->numOfOutputGroups = 1;
|
||||||
|
} else if (pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
|
||||||
|
pLimitInfo->currentGroupId = pBlock->info.id.groupId;
|
||||||
pLimitInfo->numOfOutputGroups += 1;
|
pLimitInfo->numOfOutputGroups += 1;
|
||||||
if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
|
newGroup = true;
|
||||||
setOperatorCompleted(pOperator);
|
} else {
|
||||||
return PROJECT_RETRIEVE_DONE;
|
return PROJECT_RETRIEVE_CONTINUE;
|
||||||
}
|
|
||||||
|
|
||||||
// reset the value for a new group data
|
|
||||||
// existing rows that belongs to previous group.
|
|
||||||
resetLimitInfoForNextGroup(pLimitInfo);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return PROJECT_RETRIEVE_DONE;
|
if ((pLimitInfo->slimit.limit >= 0) && (pLimitInfo->slimit.limit < pLimitInfo->numOfOutputGroups)) {
|
||||||
|
setOperatorCompleted(pOperator);
|
||||||
|
return PROJECT_RETRIEVE_DONE;
|
||||||
|
}
|
||||||
|
|
||||||
|
// reset the value for a new group data
|
||||||
|
// existing rows that belongs to previous group.
|
||||||
|
if (newGroup) {
|
||||||
|
resetLimitInfoForNextGroup(pLimitInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
return PROJECT_RETRIEVE_CONTINUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo refactor
|
// todo refactor
|
||||||
|
@ -199,7 +208,7 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS
|
||||||
if (pBlock->info.rows == 0) {
|
if (pBlock->info.rows == 0) {
|
||||||
return PROJECT_RETRIEVE_CONTINUE;
|
return PROJECT_RETRIEVE_CONTINUE;
|
||||||
} else {
|
} else {
|
||||||
if (limitReached && (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
|
if (limitReached && (pLimitInfo->slimit.limit >= 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
|
||||||
setOperatorCompleted(pOperator);
|
setOperatorCompleted(pOperator);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,6 +31,7 @@
|
||||||
#include "thash.h"
|
#include "thash.h"
|
||||||
#include "ttypes.h"
|
#include "ttypes.h"
|
||||||
|
|
||||||
|
#define MULTI_READER_MAX_TABLE_NUM 5000
|
||||||
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
|
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
|
||||||
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
|
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
|
||||||
|
|
||||||
|
@ -43,7 +44,9 @@ typedef struct STableMergeScanSortSourceParam {
|
||||||
SOperatorInfo* pOperator;
|
SOperatorInfo* pOperator;
|
||||||
int32_t readerIdx;
|
int32_t readerIdx;
|
||||||
uint64_t uid;
|
uint64_t uid;
|
||||||
SSDataBlock* inputBlock;
|
SSDataBlock* inputBlock;
|
||||||
|
bool multiReader;
|
||||||
|
STsdbReader* dataReader;
|
||||||
} STableMergeScanSortSourceParam;
|
} STableMergeScanSortSourceParam;
|
||||||
|
|
||||||
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
|
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
|
||||||
|
@ -2588,6 +2591,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
int32_t readIdx = source->readerIdx;
|
int32_t readIdx = source->readerIdx;
|
||||||
SSDataBlock* pBlock = source->inputBlock;
|
SSDataBlock* pBlock = source->inputBlock;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx);
|
SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx);
|
||||||
|
|
||||||
|
@ -2595,17 +2599,20 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
|
||||||
void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex);
|
void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex);
|
||||||
SReadHandle* pHandle = &pInfo->base.readHandle;
|
SReadHandle* pHandle = &pInfo->base.readHandle;
|
||||||
|
|
||||||
int32_t code =
|
if (NULL == source->dataReader || !source->multiReader) {
|
||||||
tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &pInfo->base.dataReader, GET_TASKID(pTaskInfo));
|
code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &source->dataReader, GET_TASKID(pTaskInfo));
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pInfo->base.dataReader = source->dataReader;
|
||||||
STsdbReader* reader = pInfo->base.dataReader;
|
STsdbReader* reader = pInfo->base.dataReader;
|
||||||
qTrace("tsdb/read-table-data: %p, enter next reader", reader);
|
qTrace("tsdb/read-table-data: %p, enter next reader", reader);
|
||||||
while (tsdbNextDataBlock(reader)) {
|
while (tsdbNextDataBlock(reader)) {
|
||||||
if (isTaskKilled(pTaskInfo)) {
|
if (isTaskKilled(pTaskInfo)) {
|
||||||
tsdbReleaseDataBlock(reader);
|
tsdbReleaseDataBlock(reader);
|
||||||
|
pInfo->base.dataReader = NULL;
|
||||||
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
|
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2639,14 +2646,18 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
|
||||||
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
|
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
|
||||||
|
|
||||||
qTrace("tsdb/read-table-data: %p, close reader", reader);
|
qTrace("tsdb/read-table-data: %p, close reader", reader);
|
||||||
tsdbReaderClose(pInfo->base.dataReader);
|
if (!source->multiReader) {
|
||||||
|
tsdbReaderClose(pInfo->base.dataReader);
|
||||||
|
source->dataReader = NULL;
|
||||||
|
}
|
||||||
pInfo->base.dataReader = NULL;
|
pInfo->base.dataReader = NULL;
|
||||||
return pBlock;
|
return pBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("8");
|
if (!source->multiReader) {
|
||||||
|
tsdbReaderClose(pInfo->base.dataReader);
|
||||||
tsdbReaderClose(pInfo->base.dataReader);
|
source->dataReader = NULL;
|
||||||
|
}
|
||||||
pInfo->base.dataReader = NULL;
|
pInfo->base.dataReader = NULL;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -2718,6 +2729,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
STableMergeScanSortSourceParam param = {0};
|
STableMergeScanSortSourceParam param = {0};
|
||||||
param.readerIdx = i;
|
param.readerIdx = i;
|
||||||
param.pOperator = pOperator;
|
param.pOperator = pOperator;
|
||||||
|
param.multiReader = (numOfTable <= MULTI_READER_MAX_TABLE_NUM) ? true : false;
|
||||||
param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
|
param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
|
||||||
blockDataEnsureCapacity(param.inputBlock, pOperator->resultInfo.capacity);
|
blockDataEnsureCapacity(param.inputBlock, pOperator->resultInfo.capacity);
|
||||||
|
|
||||||
|
@ -2761,6 +2773,8 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
for (int32_t i = 0; i < numOfTable; ++i) {
|
for (int32_t i = 0; i < numOfTable; ++i) {
|
||||||
STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
|
STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
|
||||||
blockDataDestroy(param->inputBlock);
|
blockDataDestroy(param->inputBlock);
|
||||||
|
tsdbReaderClose(param->dataReader);
|
||||||
|
param->dataReader = NULL;
|
||||||
}
|
}
|
||||||
taosArrayClear(pInfo->sortSourceParams);
|
taosArrayClear(pInfo->sortSourceParams);
|
||||||
|
|
||||||
|
@ -2803,9 +2817,6 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
|
||||||
qDebug("%s get sorted row block, rows:%d, limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows,
|
qDebug("%s get sorted row block, rows:%d, limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows,
|
||||||
pInfo->limitInfo.numOfOutputRows);
|
pInfo->limitInfo.numOfOutputRows);
|
||||||
|
|
||||||
if (limitReached) {
|
|
||||||
resetLimitInfoForNextGroup(&pInfo->limitInfo);
|
|
||||||
}
|
|
||||||
return (pResBlock->info.rows > 0) ? pResBlock : NULL;
|
return (pResBlock->info.rows > 0) ? pResBlock : NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2858,6 +2869,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
|
pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
|
||||||
pInfo->groupId = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->tableStartIndex)->groupId;
|
pInfo->groupId = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->tableStartIndex)->groupId;
|
||||||
startGroupTableMergeScan(pOperator);
|
startGroupTableMergeScan(pOperator);
|
||||||
|
resetLimitInfoForNextGroup(&pInfo->limitInfo);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2873,15 +2885,17 @@ void destroyTableMergeScanOperatorInfo(void* param) {
|
||||||
for (int32_t i = 0; i < numOfTable; i++) {
|
for (int32_t i = 0; i < numOfTable; i++) {
|
||||||
STableMergeScanSortSourceParam* p = taosArrayGet(pTableScanInfo->sortSourceParams, i);
|
STableMergeScanSortSourceParam* p = taosArrayGet(pTableScanInfo->sortSourceParams, i);
|
||||||
blockDataDestroy(p->inputBlock);
|
blockDataDestroy(p->inputBlock);
|
||||||
|
tsdbReaderClose(p->dataReader);
|
||||||
|
p->dataReader = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tsdbReaderClose(pTableScanInfo->base.dataReader);
|
||||||
|
pTableScanInfo->base.dataReader = NULL;
|
||||||
|
|
||||||
taosArrayDestroy(pTableScanInfo->sortSourceParams);
|
taosArrayDestroy(pTableScanInfo->sortSourceParams);
|
||||||
tsortDestroySortHandle(pTableScanInfo->pSortHandle);
|
tsortDestroySortHandle(pTableScanInfo->pSortHandle);
|
||||||
pTableScanInfo->pSortHandle = NULL;
|
pTableScanInfo->pSortHandle = NULL;
|
||||||
|
|
||||||
tsdbReaderClose(pTableScanInfo->base.dataReader);
|
|
||||||
pTableScanInfo->base.dataReader = NULL;
|
|
||||||
|
|
||||||
for (int i = 0; i < taosArrayGetSize(pTableScanInfo->queryConds); i++) {
|
for (int i = 0; i < taosArrayGetSize(pTableScanInfo->queryConds); i++) {
|
||||||
SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i);
|
SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i);
|
||||||
taosMemoryFree(pCond->colList);
|
taosMemoryFree(pCond->colList);
|
||||||
|
@ -2898,8 +2912,6 @@ void destroyTableMergeScanOperatorInfo(void* param) {
|
||||||
taosArrayDestroy(pTableScanInfo->pSortInfo);
|
taosArrayDestroy(pTableScanInfo->pSortInfo);
|
||||||
cleanupExprSupp(&pTableScanInfo->base.pseudoSup);
|
cleanupExprSupp(&pTableScanInfo->base.pseudoSup);
|
||||||
|
|
||||||
tsdbReaderClose(pTableScanInfo->base.dataReader);
|
|
||||||
pTableScanInfo->base.dataReader = NULL;
|
|
||||||
taosLRUCacheCleanup(pTableScanInfo->base.metaCache.pTableMetaEntryCache);
|
taosLRUCacheCleanup(pTableScanInfo->base.metaCache.pTableMetaEntryCache);
|
||||||
|
|
||||||
taosMemoryFreeClear(param);
|
taosMemoryFreeClear(param);
|
||||||
|
|
|
@ -57,9 +57,11 @@ typedef struct SSysTableScanInfo {
|
||||||
const char* pUser;
|
const char* pUser;
|
||||||
bool sysInfo;
|
bool sysInfo;
|
||||||
bool showRewrite;
|
bool showRewrite;
|
||||||
|
bool restore;
|
||||||
SNode* pCondition; // db_name filter condition, to discard data that are not in current database
|
SNode* pCondition; // db_name filter condition, to discard data that are not in current database
|
||||||
SMTbCursor* pCur; // cursor for iterate the local table meta store.
|
SMTbCursor* pCur; // cursor for iterate the local table meta store.
|
||||||
SSysTableIndex* pIdx; // idx for local table meta
|
SSysTableIndex* pIdx; // idx for local table meta
|
||||||
|
SHashObj* pSchema;
|
||||||
SColMatchInfo matchInfo;
|
SColMatchInfo matchInfo;
|
||||||
SName name;
|
SName name;
|
||||||
SSDataBlock* pRes;
|
SSDataBlock* pRes;
|
||||||
|
@ -514,9 +516,23 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
|
||||||
pInfo->pCur = metaOpenTbCursor(pInfo->readHandle.meta);
|
pInfo->pCur = metaOpenTbCursor(pInfo->readHandle.meta);
|
||||||
}
|
}
|
||||||
|
|
||||||
SHashObj* stableSchema = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
if (pInfo->pSchema == NULL) {
|
||||||
taosHashSetFreeFp(stableSchema, tDeleteSSchemaWrapperForHash);
|
pInfo->pSchema = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||||
while ((ret = metaTbCursorNext(pInfo->pCur, TSDB_TABLE_MAX)) == 0) {
|
taosHashSetFreeFp(pInfo->pSchema, tDeleteSSchemaWrapperForHash);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!pInfo->pCur || !pInfo->pSchema) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
qError("sysTableScanUserCols failed since %s", terrstr(terrno));
|
||||||
|
blockDataDestroy(dataBlock);
|
||||||
|
pInfo->loadInfo.totalRows = 0;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t restore = pInfo->restore;
|
||||||
|
pInfo->restore = false;
|
||||||
|
while (restore || ((ret = metaTbCursorNext(pInfo->pCur, TSDB_TABLE_MAX)) == 0)) {
|
||||||
|
if (restore) restore = false;
|
||||||
char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
|
||||||
|
@ -524,33 +540,36 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
if (pInfo->pCur->mr.me.type == TSDB_SUPER_TABLE) {
|
if (pInfo->pCur->mr.me.type == TSDB_SUPER_TABLE) {
|
||||||
qDebug("sysTableScanUserCols cursor get super table");
|
qDebug("sysTableScanUserCols cursor get super table");
|
||||||
void* schema = taosHashGet(stableSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t));
|
void* schema = taosHashGet(pInfo->pSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t));
|
||||||
if (schema == NULL) {
|
if (schema == NULL) {
|
||||||
SSchemaWrapper* schemaWrapper = tCloneSSchemaWrapper(&pInfo->pCur->mr.me.stbEntry.schemaRow);
|
SSchemaWrapper* schemaWrapper = tCloneSSchemaWrapper(&pInfo->pCur->mr.me.stbEntry.schemaRow);
|
||||||
taosHashPut(stableSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t), &schemaWrapper, POINTER_BYTES);
|
taosHashPut(pInfo->pSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t), &schemaWrapper, POINTER_BYTES);
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
} else if (pInfo->pCur->mr.me.type == TSDB_CHILD_TABLE) {
|
} else if (pInfo->pCur->mr.me.type == TSDB_CHILD_TABLE) {
|
||||||
qDebug("sysTableScanUserCols cursor get child table");
|
qDebug("sysTableScanUserCols cursor get child table");
|
||||||
STR_TO_VARSTR(typeName, "CHILD_TABLE");
|
STR_TO_VARSTR(typeName, "CHILD_TABLE");
|
||||||
STR_TO_VARSTR(tableName, pInfo->pCur->mr.me.name);
|
STR_TO_VARSTR(tableName, pInfo->pCur->mr.me.name);
|
||||||
|
|
||||||
int64_t suid = pInfo->pCur->mr.me.ctbEntry.suid;
|
int64_t suid = pInfo->pCur->mr.me.ctbEntry.suid;
|
||||||
void* schema = taosHashGet(stableSchema, &pInfo->pCur->mr.me.ctbEntry.suid, sizeof(int64_t));
|
void* schema = taosHashGet(pInfo->pSchema, &pInfo->pCur->mr.me.ctbEntry.suid, sizeof(int64_t));
|
||||||
if (schema != NULL) {
|
if (schema != NULL) {
|
||||||
schemaRow = *(SSchemaWrapper**)schema;
|
schemaRow = *(SSchemaWrapper**)schema;
|
||||||
} else {
|
} else {
|
||||||
tDecoderClear(&pInfo->pCur->mr.coder);
|
SMetaReader smrSuperTable = {0};
|
||||||
int code = metaGetTableEntryByUid(&pInfo->pCur->mr, suid);
|
metaReaderInit(&smrSuperTable, pInfo->readHandle.meta, 0);
|
||||||
|
int code = metaGetTableEntryByUid(&smrSuperTable, suid);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
// terrno has been set by metaGetTableEntryByName, therefore, return directly
|
// terrno has been set by metaGetTableEntryByName, therefore, return directly
|
||||||
qError("sysTableScanUserCols get meta by suid:%" PRId64 " error, code:%d", suid, code);
|
qError("sysTableScanUserCols get meta by suid:%" PRId64 " error, code:%d", suid, code);
|
||||||
|
metaReaderClear(&smrSuperTable);
|
||||||
blockDataDestroy(dataBlock);
|
blockDataDestroy(dataBlock);
|
||||||
pInfo->loadInfo.totalRows = 0;
|
pInfo->loadInfo.totalRows = 0;
|
||||||
taosHashCleanup(stableSchema);
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
schemaRow = &pInfo->pCur->mr.me.stbEntry.schemaRow;
|
SSchemaWrapper* schemaWrapper = tCloneSSchemaWrapper(&smrSuperTable.me.stbEntry.schemaRow);
|
||||||
|
taosHashPut(pInfo->pSchema, &suid, sizeof(int64_t), &schemaWrapper, POINTER_BYTES);
|
||||||
|
schemaRow = schemaWrapper;
|
||||||
|
metaReaderClear(&smrSuperTable);
|
||||||
}
|
}
|
||||||
} else if (pInfo->pCur->mr.me.type == TSDB_NORMAL_TABLE) {
|
} else if (pInfo->pCur->mr.me.type == TSDB_NORMAL_TABLE) {
|
||||||
qDebug("sysTableScanUserCols cursor get normal table");
|
qDebug("sysTableScanUserCols cursor get normal table");
|
||||||
|
@ -562,20 +581,19 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
sysTableUserColsFillOneTableCols(pInfo, dbname, &numOfRows, dataBlock, tableName, schemaRow, typeName);
|
if ((numOfRows + schemaRow->nCols) > pOperator->resultInfo.capacity) {
|
||||||
|
|
||||||
if (numOfRows >= pOperator->resultInfo.capacity) {
|
|
||||||
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
|
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
|
||||||
numOfRows = 0;
|
numOfRows = 0;
|
||||||
|
pInfo->restore = true;
|
||||||
|
|
||||||
if (pInfo->pRes->info.rows > 0) {
|
if (pInfo->pRes->info.rows > 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
sysTableUserColsFillOneTableCols(pInfo, dbname, &numOfRows, dataBlock, tableName, schemaRow, typeName);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashCleanup(stableSchema);
|
|
||||||
|
|
||||||
if (numOfRows > 0) {
|
if (numOfRows > 0) {
|
||||||
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
|
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
|
||||||
numOfRows = 0;
|
numOfRows = 0;
|
||||||
|
@ -695,7 +713,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((smrSuperTable.me.stbEntry.schemaTag.nCols + numOfRows) > pOperator->resultInfo.capacity) {
|
if ((smrSuperTable.me.stbEntry.schemaTag.nCols + numOfRows) > pOperator->resultInfo.capacity) {
|
||||||
metaTbCursorPrev(pInfo->pCur);
|
metaTbCursorPrev(pInfo->pCur, TSDB_TABLE_MAX);
|
||||||
blockFull = true;
|
blockFull = true;
|
||||||
} else {
|
} else {
|
||||||
sysTableUserTagsFillOneTableTags(pInfo, &smrSuperTable, &pInfo->pCur->mr, dbname, tableName, &numOfRows,
|
sysTableUserTagsFillOneTableTags(pInfo, &smrSuperTable, &pInfo->pCur->mr, dbname, tableName, &numOfRows,
|
||||||
|
@ -1789,6 +1807,11 @@ void destroySysScanOperator(void* param) {
|
||||||
pInfo->pIdx = NULL;
|
pInfo->pIdx = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if(pInfo->pSchema) {
|
||||||
|
taosHashCleanup(pInfo->pSchema);
|
||||||
|
pInfo->pSchema = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
taosArrayDestroy(pInfo->matchInfo.pList);
|
taosArrayDestroy(pInfo->matchInfo.pList);
|
||||||
taosMemoryFreeClear(pInfo->pUser);
|
taosMemoryFreeClear(pInfo->pUser);
|
||||||
|
|
||||||
|
|
|
@ -108,12 +108,18 @@ static int32_t sortComparCleanup(SMsortComparParam* cmpParam) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tsortClearOrderdSource(SArray* pOrderedSource) {
|
void tsortClearOrderdSource(SArray* pOrderedSource, int64_t *fetchUs, int64_t *fetchNum) {
|
||||||
for (size_t i = 0; i < taosArrayGetSize(pOrderedSource); i++) {
|
for (size_t i = 0; i < taosArrayGetSize(pOrderedSource); i++) {
|
||||||
SSortSource** pSource = taosArrayGet(pOrderedSource, i);
|
SSortSource** pSource = taosArrayGet(pOrderedSource, i);
|
||||||
if (NULL == *pSource) {
|
if (NULL == *pSource) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (fetchUs) {
|
||||||
|
*fetchUs += (*pSource)->fetchUs;
|
||||||
|
*fetchNum += (*pSource)->fetchNum;
|
||||||
|
}
|
||||||
|
|
||||||
// release pageIdList
|
// release pageIdList
|
||||||
if ((*pSource)->pageIdList) {
|
if ((*pSource)->pageIdList) {
|
||||||
taosArrayDestroy((*pSource)->pageIdList);
|
taosArrayDestroy((*pSource)->pageIdList);
|
||||||
|
@ -147,7 +153,10 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) {
|
||||||
taosMemoryFreeClear(pSortHandle->idStr);
|
taosMemoryFreeClear(pSortHandle->idStr);
|
||||||
blockDataDestroy(pSortHandle->pDataBlock);
|
blockDataDestroy(pSortHandle->pDataBlock);
|
||||||
|
|
||||||
tsortClearOrderdSource(pSortHandle->pOrderedSource);
|
int64_t fetchUs = 0, fetchNum = 0;
|
||||||
|
tsortClearOrderdSource(pSortHandle->pOrderedSource, &fetchUs, &fetchNum);
|
||||||
|
qError("all source fetch time: %" PRId64 "us num:%" PRId64 " %s", fetchUs, fetchNum, pSortHandle->idStr);
|
||||||
|
|
||||||
taosArrayDestroy(pSortHandle->pOrderedSource);
|
taosArrayDestroy(pSortHandle->pOrderedSource);
|
||||||
taosMemoryFreeClear(pSortHandle);
|
taosMemoryFreeClear(pSortHandle);
|
||||||
}
|
}
|
||||||
|
@ -307,7 +316,7 @@ static int32_t sortComparInit(SMsortComparParam* pParam, SArray* pSources, int32
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t et = taosGetTimestampUs();
|
int64_t et = taosGetTimestampUs();
|
||||||
qDebug("init for merge sort completed, elapsed time:%.2f ms, %s", (et - st) / 1000.0, pHandle->idStr);
|
qError("init for merge sort completed, elapsed time:%.2f ms, %s", (et - st) / 1000.0, pHandle->idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
@ -365,7 +374,10 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT
|
||||||
releaseBufPage(pHandle->pBuf, pPage);
|
releaseBufPage(pHandle->pBuf, pPage);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
int64_t st = taosGetTimestampUs();
|
||||||
pSource->src.pBlock = pHandle->fetchfp(((SSortSource*)pSource)->param);
|
pSource->src.pBlock = pHandle->fetchfp(((SSortSource*)pSource)->param);
|
||||||
|
pSource->fetchUs += taosGetTimestampUs() - st;
|
||||||
|
pSource->fetchNum++;
|
||||||
if (pSource->src.pBlock == NULL) {
|
if (pSource->src.pBlock == NULL) {
|
||||||
(*numOfCompleted) += 1;
|
(*numOfCompleted) += 1;
|
||||||
pSource->src.rowIndex = -1;
|
pSource->src.rowIndex = -1;
|
||||||
|
@ -602,7 +614,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tsortClearOrderdSource(pHandle->pOrderedSource);
|
tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
|
||||||
taosArrayAddAll(pHandle->pOrderedSource, pResList);
|
taosArrayAddAll(pHandle->pOrderedSource, pResList);
|
||||||
taosArrayDestroy(pResList);
|
taosArrayDestroy(pResList);
|
||||||
|
|
||||||
|
@ -644,7 +656,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
|
||||||
SSortSource* source = *pSource;
|
SSortSource* source = *pSource;
|
||||||
*pSource = NULL;
|
*pSource = NULL;
|
||||||
|
|
||||||
tsortClearOrderdSource(pHandle->pOrderedSource);
|
tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
SSDataBlock* pBlock = pHandle->fetchfp(source->param);
|
SSDataBlock* pBlock = pHandle->fetchfp(source->param);
|
||||||
|
|
|
@ -361,11 +361,29 @@ static bool notRefByOrderBy(SColumnNode* pCol, SNodeList* pOrderByList) {
|
||||||
return !cxt.hasThisCol;
|
return !cxt.hasThisCol;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool isSetUselessCol(SSetOperator* pSetOp, int32_t index, SExprNode* pProj) {
|
||||||
|
if (!isUselessCol(pProj)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
SNodeList* pLeftProjs = getChildProjection(pSetOp->pLeft);
|
||||||
|
if (!isUselessCol((SExprNode*)nodesListGetNode(pLeftProjs, index))) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
SNodeList* pRightProjs = getChildProjection(pSetOp->pRight);
|
||||||
|
if (!isUselessCol((SExprNode*)nodesListGetNode(pRightProjs, index))) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t calcConstSetOpProjections(SCalcConstContext* pCxt, SSetOperator* pSetOp, bool subquery) {
|
static int32_t calcConstSetOpProjections(SCalcConstContext* pCxt, SSetOperator* pSetOp, bool subquery) {
|
||||||
int32_t index = 0;
|
int32_t index = 0;
|
||||||
SNode* pProj = NULL;
|
SNode* pProj = NULL;
|
||||||
WHERE_EACH(pProj, pSetOp->pProjectionList) {
|
WHERE_EACH(pProj, pSetOp->pProjectionList) {
|
||||||
if (subquery && notRefByOrderBy((SColumnNode*)pProj, pSetOp->pOrderByList) && isUselessCol((SExprNode*)pProj)) {
|
if (subquery && notRefByOrderBy((SColumnNode*)pProj, pSetOp->pOrderByList) && isSetUselessCol(pSetOp, index, (SExprNode*)pProj)) {
|
||||||
ERASE_NODE(pSetOp->pProjectionList);
|
ERASE_NODE(pSetOp->pProjectionList);
|
||||||
eraseSetOpChildProjection(pSetOp, index);
|
eraseSetOpChildProjection(pSetOp, index);
|
||||||
continue;
|
continue;
|
||||||
|
|
|
@ -404,6 +404,7 @@ static int32_t createVgroupDataCxt(STableDataCxt* pTableCxt, SHashObj* pVgroupHa
|
||||||
int32_t code = taosHashPut(pVgroupHash, &pVgCxt->vgId, sizeof(pVgCxt->vgId), &pVgCxt, POINTER_BYTES);
|
int32_t code = taosHashPut(pVgroupHash, &pVgCxt->vgId, sizeof(pVgCxt->vgId), &pVgCxt, POINTER_BYTES);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
taosArrayPush(pVgroupList, &pVgCxt);
|
taosArrayPush(pVgroupList, &pVgCxt);
|
||||||
|
// uDebug("td23101 2vgId:%d, uid:%" PRIu64, pVgCxt->vgId, pTableCxt->pMeta->uid);
|
||||||
*pOutput = pVgCxt;
|
*pOutput = pVgCxt;
|
||||||
} else {
|
} else {
|
||||||
insDestroyVgroupDataCxt(pVgCxt);
|
insDestroyVgroupDataCxt(pVgCxt);
|
||||||
|
@ -546,6 +547,7 @@ int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList,
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
dst->numOfTables = taosArrayGetSize(src->pData->aSubmitTbData);
|
dst->numOfTables = taosArrayGetSize(src->pData->aSubmitTbData);
|
||||||
code = taosHashGetDup(pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg);
|
code = taosHashGetDup(pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg);
|
||||||
|
// uError("td23101 3vgId:%d, numEps:%d", src->vgId, dst->vg.epSet.numOfEps);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = buildSubmitReq(src->vgId, src->pData, &dst->pData, &dst->size);
|
code = buildSubmitReq(src->vgId, src->pData, &dst->pData, &dst->size);
|
||||||
|
|
|
@ -1422,6 +1422,9 @@ static int32_t translateAggFunc(STranslateContext* pCxt, SFunctionNode* pFunc) {
|
||||||
if (isCountStar(pFunc)) {
|
if (isCountStar(pFunc)) {
|
||||||
return rewriteCountStar(pCxt, pFunc);
|
return rewriteCountStar(pCxt, pFunc);
|
||||||
}
|
}
|
||||||
|
if (isCountNotNullValue(pFunc)) {
|
||||||
|
return rewriteCountNotNullValue(pCxt, pFunc);
|
||||||
|
}
|
||||||
if (isCountTbname(pFunc)) {
|
if (isCountTbname(pFunc)) {
|
||||||
return rewriteCountTbname(pCxt, pFunc);
|
return rewriteCountTbname(pCxt, pFunc);
|
||||||
}
|
}
|
||||||
|
|
|
@ -947,6 +947,12 @@ static int tdbPagerRestore(SPager *pPager, const char *jFileName) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t txnIdCompareDesc(const void *pLeft, const void *pRight) {
|
||||||
|
int64_t lhs = *(int64_t *)pLeft;
|
||||||
|
int64_t rhs = *(int64_t *)pRight;
|
||||||
|
return lhs > rhs ? -1 : 1;
|
||||||
|
}
|
||||||
|
|
||||||
int tdbPagerRestoreJournals(SPager *pPager) {
|
int tdbPagerRestoreJournals(SPager *pPager) {
|
||||||
tdbDirEntryPtr pDirEntry;
|
tdbDirEntryPtr pDirEntry;
|
||||||
tdbDirPtr pDir = taosOpenDir(pPager->pEnv->dbName);
|
tdbDirPtr pDir = taosOpenDir(pPager->pEnv->dbName);
|
||||||
|
@ -955,23 +961,33 @@ int tdbPagerRestoreJournals(SPager *pPager) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SArray *pTxnList = taosArrayInit(16, sizeof(int64_t));
|
||||||
|
|
||||||
while ((pDirEntry = tdbReadDir(pDir)) != NULL) {
|
while ((pDirEntry = tdbReadDir(pDir)) != NULL) {
|
||||||
char *name = tdbDirEntryBaseName(tdbGetDirEntryName(pDirEntry));
|
char *name = tdbDirEntryBaseName(tdbGetDirEntryName(pDirEntry));
|
||||||
if (strncmp(TDB_MAINDB_NAME "-journal", name, 16) == 0) {
|
if (strncmp(TDB_MAINDB_NAME "-journal", name, 16) == 0) {
|
||||||
char jname[TD_PATH_MAX] = {0};
|
int64_t txnId = -1;
|
||||||
int dirLen = strlen(pPager->pEnv->dbName);
|
sscanf(name, TDB_MAINDB_NAME "-journal.%" PRId64, &txnId);
|
||||||
memcpy(jname, pPager->pEnv->dbName, dirLen);
|
taosArrayPush(pTxnList, &txnId);
|
||||||
jname[dirLen] = '/';
|
}
|
||||||
memcpy(jname + dirLen + 1, name, strlen(name));
|
}
|
||||||
if (tdbPagerRestore(pPager, jname) < 0) {
|
taosArraySort(pTxnList, txnIdCompareDesc);
|
||||||
tdbCloseDir(&pDir);
|
for (int i = 0; i < TARRAY_SIZE(pTxnList); ++i) {
|
||||||
|
int64_t *pTxnId = taosArrayGet(pTxnList, i);
|
||||||
|
char jname[TD_PATH_MAX] = {0};
|
||||||
|
int dirLen = strlen(pPager->pEnv->dbName);
|
||||||
|
memcpy(jname, pPager->pEnv->dbName, dirLen);
|
||||||
|
jname[dirLen] = '/';
|
||||||
|
sprintf(jname + dirLen + 1, TDB_MAINDB_NAME "-journal.%" PRId64, *pTxnId);
|
||||||
|
if (tdbPagerRestore(pPager, jname) < 0) {
|
||||||
|
tdbCloseDir(&pDir);
|
||||||
|
|
||||||
tdbError("failed to restore file due to %s. jFileName:%s", strerror(errno), name);
|
tdbError("failed to restore file due to %s. jFileName:%s", strerror(errno), jname);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(pTxnList);
|
||||||
tdbCloseDir(&pDir);
|
tdbCloseDir(&pDir);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -853,6 +853,7 @@
|
||||||
,,y,script,./test.sh -f tsim/parser/topbot.sim
|
,,y,script,./test.sh -f tsim/parser/topbot.sim
|
||||||
,,y,script,./test.sh -f tsim/parser/union_sysinfo.sim
|
,,y,script,./test.sh -f tsim/parser/union_sysinfo.sim
|
||||||
,,y,script,./test.sh -f tsim/parser/slimit_limit.sim
|
,,y,script,./test.sh -f tsim/parser/slimit_limit.sim
|
||||||
|
,,y,script,./test.sh -f tsim/parser/table_merge_limit.sim
|
||||||
,,y,script,./test.sh -f tsim/query/tagLikeFilter.sim
|
,,y,script,./test.sh -f tsim/query/tagLikeFilter.sim
|
||||||
,,y,script,./test.sh -f tsim/query/charScalarFunction.sim
|
,,y,script,./test.sh -f tsim/query/charScalarFunction.sim
|
||||||
,,y,script,./test.sh -f tsim/query/explain.sim
|
,,y,script,./test.sh -f tsim/query/explain.sim
|
||||||
|
|
|
@ -68,8 +68,8 @@ docker run \
|
||||||
-v ${REP_REAL_PATH}/community/contrib/libuv/:${REP_DIR}/community/contrib/libuv \
|
-v ${REP_REAL_PATH}/community/contrib/libuv/:${REP_DIR}/community/contrib/libuv \
|
||||||
-v ${REP_REAL_PATH}/community/contrib/lz4/:${REP_DIR}/community/contrib/lz4 \
|
-v ${REP_REAL_PATH}/community/contrib/lz4/:${REP_DIR}/community/contrib/lz4 \
|
||||||
-v ${REP_REAL_PATH}/community/contrib/zlib/:${REP_DIR}/community/contrib/zlib \
|
-v ${REP_REAL_PATH}/community/contrib/zlib/:${REP_DIR}/community/contrib/zlib \
|
||||||
-v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \
|
--rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_TAOSX=true -DJEMALLOC_ENABLED=true;make -j || exit 1"
|
||||||
--rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_TAOSX=true;make -j || exit 1"
|
# -v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \
|
||||||
|
|
||||||
if [[ -d ${WORKDIR}/debugNoSan ]] ;then
|
if [[ -d ${WORKDIR}/debugNoSan ]] ;then
|
||||||
echo "delete ${WORKDIR}/debugNoSan"
|
echo "delete ${WORKDIR}/debugNoSan"
|
||||||
|
@ -97,7 +97,7 @@ docker run \
|
||||||
-v ${REP_REAL_PATH}/community/contrib/lz4/:${REP_DIR}/community/contrib/lz4 \
|
-v ${REP_REAL_PATH}/community/contrib/lz4/:${REP_DIR}/community/contrib/lz4 \
|
||||||
-v ${REP_REAL_PATH}/community/contrib/zlib/:${REP_DIR}/community/contrib/zlib \
|
-v ${REP_REAL_PATH}/community/contrib/zlib/:${REP_DIR}/community/contrib/zlib \
|
||||||
-v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \
|
-v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \
|
||||||
--rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_SANITIZER=1 -DTOOLS_SANITIZE=true -DTOOLS_BUILD_TYPE=Debug -DBUILD_TAOSX=true;make -j || exit 1 "
|
--rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_SANITIZER=1 -DTOOLS_SANITIZE=true -DTOOLS_BUILD_TYPE=Debug -DBUILD_TAOSX=true -DJEMALLOC_ENABLED=true;make -j || exit 1 "
|
||||||
|
|
||||||
mv ${REP_REAL_PATH}/debug ${WORKDIR}/debugSan
|
mv ${REP_REAL_PATH}/debug ${WORKDIR}/debugSan
|
||||||
|
|
||||||
|
|
|
@ -739,6 +739,11 @@ class TDCom:
|
||||||
else:
|
else:
|
||||||
os.system("unset LD_PRELOAD; pkill %s " % processorName)
|
os.system("unset LD_PRELOAD; pkill %s " % processorName)
|
||||||
|
|
||||||
|
def gen_tag_col_str(self, gen_type, data_type, count):
|
||||||
|
"""
|
||||||
|
gen multi tags or cols by gen_type
|
||||||
|
"""
|
||||||
|
return ','.join(map(lambda i: f'{gen_type}{i} {data_type}', range(count)))
|
||||||
|
|
||||||
def is_json(msg):
|
def is_json(msg):
|
||||||
if isinstance(msg, str):
|
if isinstance(msg, str):
|
||||||
|
@ -775,4 +780,5 @@ def dict2toml(in_dict: dict, file:str):
|
||||||
with open(file, 'w') as f:
|
with open(file, 'w') as f:
|
||||||
toml.dump(in_dict, f)
|
toml.dump(in_dict, f)
|
||||||
|
|
||||||
|
|
||||||
tdCom = TDCom()
|
tdCom = TDCom()
|
||||||
|
|
|
@ -12,6 +12,7 @@ fi
|
||||||
today=`date +"%Y%m%d"`
|
today=`date +"%Y%m%d"`
|
||||||
TDENGINE_DIR=/root/TDengine
|
TDENGINE_DIR=/root/TDengine
|
||||||
JDBC_DIR=/root/taos-connector-jdbc
|
JDBC_DIR=/root/taos-connector-jdbc
|
||||||
|
TAOSKEEPER_DIR=/root/taoskeeper
|
||||||
TDENGINE_COVERAGE_REPORT=$TDENGINE_DIR/tests/coverage-report-$today.log
|
TDENGINE_COVERAGE_REPORT=$TDENGINE_DIR/tests/coverage-report-$today.log
|
||||||
|
|
||||||
# Color setting
|
# Color setting
|
||||||
|
@ -171,6 +172,24 @@ function runJDBCCases() {
|
||||||
echo -e "### JDBC test result: $summary ###" | tee -a $TDENGINE_COVERAGE_REPORT
|
echo -e "### JDBC test result: $summary ###" | tee -a $TDENGINE_COVERAGE_REPORT
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function runTaosKeeperCases() {
|
||||||
|
echo "=== Run taoskeeper cases ==="
|
||||||
|
|
||||||
|
cd $TAOSKEEPER_DIR
|
||||||
|
git checkout -- .
|
||||||
|
git reset --hard HEAD
|
||||||
|
git checkout master
|
||||||
|
git pull
|
||||||
|
|
||||||
|
stopTaosd
|
||||||
|
stopTaosadapter
|
||||||
|
|
||||||
|
taosd -c /etc/taos >> /dev/null 2>&1 &
|
||||||
|
taosadapter >> /dev/null 2>&1 &
|
||||||
|
|
||||||
|
go mod tidy && go test -v ./...
|
||||||
|
}
|
||||||
|
|
||||||
function runTest() {
|
function runTest() {
|
||||||
echo "run Test"
|
echo "run Test"
|
||||||
|
|
||||||
|
@ -182,6 +201,7 @@ function runTest() {
|
||||||
runSimCases
|
runSimCases
|
||||||
runPythonCases
|
runPythonCases
|
||||||
runJDBCCases
|
runJDBCCases
|
||||||
|
runTaosKeeperCases
|
||||||
|
|
||||||
stopTaosd
|
stopTaosd
|
||||||
cd $TDENGINE_DIR/tests/script
|
cd $TDENGINE_DIR/tests/script
|
||||||
|
@ -199,7 +219,7 @@ function lcovFunc {
|
||||||
lcov -d . --capture --rc lcov_branch_coverage=1 --rc genhtml_branch_coverage=1 --no-external -b $TDENGINE_DIR -o coverage.info
|
lcov -d . --capture --rc lcov_branch_coverage=1 --rc genhtml_branch_coverage=1 --no-external -b $TDENGINE_DIR -o coverage.info
|
||||||
|
|
||||||
# remove exclude paths
|
# remove exclude paths
|
||||||
if [ "$branch" == "3.0" ]; then
|
if [ "$branch" == "main" ] ; then
|
||||||
lcov --remove coverage.info \
|
lcov --remove coverage.info \
|
||||||
'*/contrib/*' '*/tests/*' '*/test/*' '*/tools/*' '*/libs/sync/*'\
|
'*/contrib/*' '*/tests/*' '*/test/*' '*/tools/*' '*/libs/sync/*'\
|
||||||
'*/AccessBridgeCalls.c' '*/ttszip.c' '*/dataInserter.c' '*/tlinearhash.c' '*/tsimplehash.c' '*/tsdbDiskData.c'\
|
'*/AccessBridgeCalls.c' '*/ttszip.c' '*/dataInserter.c' '*/tlinearhash.c' '*/tsimplehash.c' '*/tsdbDiskData.c'\
|
||||||
|
@ -209,6 +229,8 @@ function lcovFunc {
|
||||||
'*/tthread.c' '*/tversion.c' '*/ctgDbg.c' '*/schDbg.c' '*/qwDbg.c' '*/tencode.h' '*/catalog.c'\
|
'*/tthread.c' '*/tversion.c' '*/ctgDbg.c' '*/schDbg.c' '*/qwDbg.c' '*/tencode.h' '*/catalog.c'\
|
||||||
'*/tqSnapshot.c' '*/tsdbSnapshot.c''*/metaSnapshot.c' '*/smaSnapshot.c' '*/tqOffsetSnapshot.c'\
|
'*/tqSnapshot.c' '*/tsdbSnapshot.c''*/metaSnapshot.c' '*/smaSnapshot.c' '*/tqOffsetSnapshot.c'\
|
||||||
'*/vnodeSnapshot.c' '*/metaSnapshot.c' '*/tsdbSnapshot.c' '*/mndGrant.c' '*/mndSnode.c' '*/streamRecover.c'\
|
'*/vnodeSnapshot.c' '*/metaSnapshot.c' '*/tsdbSnapshot.c' '*/mndGrant.c' '*/mndSnode.c' '*/streamRecover.c'\
|
||||||
|
'*/osAtomic.c' '*/osDir.c' '*/osFile.c' '*/osMath.c' '*/osSignal.c' '*/osSleep.c' '*/osString.c' '*/osSystem.c'\
|
||||||
|
'*/osThread.c' '*/osTime.c' '*/osTimezone.c' \
|
||||||
--rc lcov_branch_coverage=1 -o coverage.info
|
--rc lcov_branch_coverage=1 -o coverage.info
|
||||||
else
|
else
|
||||||
lcov --remove coverage.info \
|
lcov --remove coverage.info \
|
||||||
|
|
|
@ -484,6 +484,7 @@ if $rows != 2 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
print === select max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9) from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 5 and c1 > 0 and c2 < 9 and c3 > 1 and c4 < 7 and c5 > 4 partition by t1 interval(5m) limit 1 offset 0
|
||||||
sql select max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9) from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 5 and c1 > 0 and c2 < 9 and c3 > 1 and c4 < 7 and c5 > 4 partition by t1 interval(5m) limit 1 offset 0
|
sql select max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9) from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 5 and c1 > 0 and c2 < 9 and c3 > 1 and c4 < 7 and c5 > 4 partition by t1 interval(5m) limit 1 offset 0
|
||||||
if $rows != 3 then
|
if $rows != 3 then
|
||||||
return -1
|
return -1
|
||||||
|
|
|
@ -70,7 +70,7 @@ endi
|
||||||
|
|
||||||
### empty result set
|
### empty result set
|
||||||
sql select count(*) from stb partition by t2,t1 order by t2 asc slimit 0 soffset 0
|
sql select count(*) from stb partition by t2,t1 order by t2 asc slimit 0 soffset 0
|
||||||
if $rows != 9 then
|
if $rows != 0 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,47 @@
|
||||||
|
system sh/stop_dnodes.sh
|
||||||
|
system sh/deploy.sh -n dnode1 -i 1
|
||||||
|
system sh/exec.sh -n dnode1 -s start
|
||||||
|
sql connect
|
||||||
|
|
||||||
|
$dbPrefix = m_fl_db
|
||||||
|
$tbPrefix = m_fl_tb
|
||||||
|
$mtPrefix = m_fl_mt
|
||||||
|
$tbNum = 2
|
||||||
|
$rowNum = 513
|
||||||
|
$totalNum = $tbNum * $rowNum
|
||||||
|
$ts0 = 1537146000000
|
||||||
|
$delta = 600000
|
||||||
|
print ========== fill.sim
|
||||||
|
$i = 0
|
||||||
|
$db = $dbPrefix . $i
|
||||||
|
$mt = $mtPrefix . $i
|
||||||
|
|
||||||
|
sql drop database $db -x step1
|
||||||
|
step1:
|
||||||
|
sql create database $db vgroups 1
|
||||||
|
sql use $db
|
||||||
|
sql create table $mt (ts timestamp, c1 int) tags(tgcol int)
|
||||||
|
|
||||||
|
$i = 0
|
||||||
|
$ts = $ts0
|
||||||
|
while $i < $tbNum
|
||||||
|
$tb = $tbPrefix . $i
|
||||||
|
sql create table $tb using $mt tags( $i )
|
||||||
|
|
||||||
|
$x = 0
|
||||||
|
while $x < $rowNum
|
||||||
|
$xs = $x * $delta
|
||||||
|
$ts = $ts0 + $xs
|
||||||
|
sql insert into $tb values ( $ts , $x )
|
||||||
|
$x = $x + 1
|
||||||
|
endw
|
||||||
|
|
||||||
|
$i = $i + 1
|
||||||
|
endw
|
||||||
|
|
||||||
|
sql select * from $mt order by ts limit 10
|
||||||
|
if $rows != 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
|
@ -28,10 +28,10 @@ import os
|
||||||
NO_FOUND = 0 # not found assert or ASSERT
|
NO_FOUND = 0 # not found assert or ASSERT
|
||||||
FOUND_OK = 1 # found ASSERT and valid usage
|
FOUND_OK = 1 # found ASSERT and valid usage
|
||||||
FOUND_NOIF = 2 # found ASSERT but no if like ASSERT(...)
|
FOUND_NOIF = 2 # found ASSERT but no if like ASSERT(...)
|
||||||
FOUND_LOWER = 3 # found assert write with lower letters
|
FOUND_LOWER = 3 # found assert write with system assert
|
||||||
FOUND_HAVENOT = 4 # found ASSERT have if but have not like if(!ASSERT)
|
FOUND_HAVENOT = 4 # found ASSERT have if but have not like if(!ASSERT)
|
||||||
|
|
||||||
code_strs = ["not found", "valid", "found but no if", "lower assert","found but have not"]
|
code_strs = ["not found", "valid", "found but no if", "system assert","found but have not"]
|
||||||
|
|
||||||
|
|
||||||
#
|
#
|
||||||
|
|
|
@ -93,7 +93,6 @@ class TDTestCase:
|
||||||
tdSql.checkEqual(i[2],len(self.perf_list))
|
tdSql.checkEqual(i[2],len(self.perf_list))
|
||||||
tdSql.execute('create table db1.ntb (ts timestamp,c0 int)')
|
tdSql.execute('create table db1.ntb (ts timestamp,c0 int)')
|
||||||
tdSql.query(f'select db_name, count(*) from information_schema.ins_tables group by db_name')
|
tdSql.query(f'select db_name, count(*) from information_schema.ins_tables group by db_name')
|
||||||
print(tdSql.queryResult)
|
|
||||||
for i in tdSql.queryResult:
|
for i in tdSql.queryResult:
|
||||||
if i[0].lower() == 'information_schema':
|
if i[0].lower() == 'information_schema':
|
||||||
tdSql.checkEqual(i[1],len(self.ins_list))
|
tdSql.checkEqual(i[1],len(self.ins_list))
|
||||||
|
@ -101,9 +100,77 @@ class TDTestCase:
|
||||||
tdSql.checkEqual(i[1],len(self.perf_list))
|
tdSql.checkEqual(i[1],len(self.perf_list))
|
||||||
elif i[0].lower() == self.dbname:
|
elif i[0].lower() == self.dbname:
|
||||||
tdSql.checkEqual(i[1],self.tbnum+1)
|
tdSql.checkEqual(i[1],self.tbnum+1)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def ins_col_check_4096(self):
|
||||||
|
tdSql.execute('create database db3 vgroups 2 replica 1')
|
||||||
|
col_str = tdCom.gen_tag_col_str("col", "int",4094)
|
||||||
|
tdSql.execute(f'create stable if not exists db3.stb (col_ts timestamp, {col_str}) tags (t1 int)')
|
||||||
|
for i in range(100):
|
||||||
|
tdLog.info(f"create table db3.ctb{i} using db3.stb tags({i})")
|
||||||
|
tdSql.execute(f"create table db3.ctb{i} using db3.stb tags({i})")
|
||||||
|
col_value_str = '1, ' * 4093 + '1'
|
||||||
|
tdSql.execute(f"insert into db3.ctb{i} values(now,{col_value_str})(now+1s,{col_value_str})(now+2s,{col_value_str})(now+3s,{col_value_str})")
|
||||||
|
tdSql.query("select * from information_schema.ins_columns")
|
||||||
|
|
||||||
|
tdSql.execute('drop database db3')
|
||||||
|
def ins_stable_check(self):
|
||||||
|
tdSql.execute('create database db3 vgroups 2 replica 1')
|
||||||
|
tbnum = 10
|
||||||
|
ctbnum = 10
|
||||||
|
for i in range(tbnum):
|
||||||
|
tdSql.execute(f'create stable db3.stb_{i} (ts timestamp,c0 int) tags(t0 int)')
|
||||||
|
tdSql.execute(f'create table db3.ntb_{i} (ts timestamp,c0 int)')
|
||||||
|
for j in range(ctbnum):
|
||||||
|
tdSql.execute(f"create table db3.ctb_{i}_{j} using db3.stb_{i} tags({j})")
|
||||||
|
tdSql.query("select stable_name,count(table_name) from information_schema.ins_tables where db_name = 'db3' group by stable_name order by stable_name")
|
||||||
|
result = tdSql.queryResult
|
||||||
|
for i in range(len(result)):
|
||||||
|
if result[i][0] == None:
|
||||||
|
tdSql.checkEqual(result[0][1],tbnum)
|
||||||
|
else:
|
||||||
|
tdSql.checkEqual(result[i][0],f'stb_{i-1}')
|
||||||
|
tdSql.checkEqual(result[i][1],ctbnum)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def ins_columns_check(self):
|
||||||
|
tdSql.execute('drop database if exists db2')
|
||||||
|
tdSql.execute('create database if not exists db2 vgroups 1 replica 1')
|
||||||
|
for i in range (5):
|
||||||
|
self.stb4096 = 'create table db2.stb%d (ts timestamp' % (i)
|
||||||
|
for j in range (4094 - i):
|
||||||
|
self.stb4096 += ', c%d int' % (j)
|
||||||
|
self.stb4096 += ') tags (t1 int)'
|
||||||
|
tdSql.execute(self.stb4096)
|
||||||
|
for k in range(10):
|
||||||
|
tdSql.execute("create table db2.ctb_%d_%dc using db2.stb%d tags(%d)" %(i,k,i,k))
|
||||||
|
for t in range (2):
|
||||||
|
tdSql.query(f'select * from information_schema.ins_columns where db_name="db2" and table_type=="SUPER_TABLE"')
|
||||||
|
tdSql.checkEqual(20465,len(tdSql.queryResult))
|
||||||
|
for t in range (2):
|
||||||
|
tdSql.query(f'select * from information_schema.ins_columns where db_name="db2" and table_type=="CHILD_TABLE"')
|
||||||
|
tdSql.checkEqual(204650,len(tdSql.queryResult))
|
||||||
|
|
||||||
|
for i in range (5):
|
||||||
|
self.ntb4096 = 'create table db2.ntb%d (ts timestamp' % (i)
|
||||||
|
for j in range (4095 - i):
|
||||||
|
self.ntb4096 += ', c%d binary(10)' % (j)
|
||||||
|
self.ntb4096 += ')'
|
||||||
|
tdSql.execute(self.ntb4096)
|
||||||
|
for t in range (2):
|
||||||
|
tdSql.query(f'select * from information_schema.ins_columns where db_name="db2" and table_type=="NORMAL_TABLE"')
|
||||||
|
tdSql.checkEqual(20470,len(tdSql.queryResult))
|
||||||
|
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
self.prepare_data()
|
self.prepare_data()
|
||||||
self.count_check()
|
self.count_check()
|
||||||
|
self.ins_columns_check()
|
||||||
|
# self.ins_col_check_4096()
|
||||||
|
self.ins_stable_check()
|
||||||
|
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
tdSql.close()
|
tdSql.close()
|
||||||
|
|
|
@ -214,13 +214,13 @@ class TDTestCase:
|
||||||
sql=f"SELECT model,count(state_changed) FROM (SELECT _rowts,model,diff(broken_down) AS state_changed FROM (SELECT ts,model,tb,cast(cast(floor(2*(nzs)) as bool) as int) AS broken_down FROM (SELECT _wstart as ts,model,tbname as tb, sum(cast(cast(status as bool) as int))/count(cast(cast(status as bool) as int)) AS nzs FROM {dbname}.diagnostics WHERE ts >= 1451606400000 AND ts < 1451952001000 partition BY tbname,model interval(10m))order by ts) partition BY tb,model ) WHERE state_changed = 1 partition BY model;"
|
sql=f"SELECT model,count(state_changed) FROM (SELECT _rowts,model,diff(broken_down) AS state_changed FROM (SELECT ts,model,tb,cast(cast(floor(2*(nzs)) as bool) as int) AS broken_down FROM (SELECT _wstart as ts,model,tbname as tb, sum(cast(cast(status as bool) as int))/count(cast(cast(status as bool) as int)) AS nzs FROM {dbname}.diagnostics WHERE ts >= 1451606400000 AND ts < 1451952001000 partition BY tbname,model interval(10m))order by ts) partition BY tb,model ) WHERE state_changed = 1 partition BY model;"
|
||||||
tdSql.query(f"{sql}")
|
tdSql.query(f"{sql}")
|
||||||
tdSql.checkRows(46)
|
tdSql.checkRows(46)
|
||||||
# for i in range(2):
|
for i in range(2):
|
||||||
# tdSql.query("%s"%sql)
|
tdSql.query("%s"%sql)
|
||||||
# quertR1=tdSql.queryResult
|
quertR1=tdSql.queryResult
|
||||||
# for j in range(50):
|
for j in range(50):
|
||||||
# tdSql.query("%s"%sql)
|
tdSql.query("%s"%sql)
|
||||||
# quertR2=tdSql.queryResult
|
quertR2=tdSql.queryResult
|
||||||
# assert quertR1 == quertR2 , "%s != %s ,The results of multiple queries are different" %(quertR1,quertR2)
|
assert quertR1 == quertR2 , "%s != %s ,The results of multiple queries are different" %(quertR1,quertR2)
|
||||||
|
|
||||||
|
|
||||||
#it's already supported:
|
#it's already supported:
|
||||||
|
|
|
@ -245,8 +245,8 @@ class TDTestCase:
|
||||||
for i in range(expectRows):
|
for i in range(expectRows):
|
||||||
totalConsumeRows += resultList[i]
|
totalConsumeRows += resultList[i]
|
||||||
|
|
||||||
if totalConsumeRows != expectrowcnt/4:
|
tdLog.info("act consume rows: %d, expect consume rows greater than or equal to: %d"%(totalConsumeRows, expectrowcnt/4))
|
||||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt/4))
|
if totalConsumeRows < expectrowcnt/4:
|
||||||
tdLog.exit("tmq consume rows error!")
|
tdLog.exit("tmq consume rows error!")
|
||||||
|
|
||||||
self.initConsumerInfoTable()
|
self.initConsumerInfoTable()
|
||||||
|
@ -267,8 +267,8 @@ class TDTestCase:
|
||||||
for i in range(expectRows):
|
for i in range(expectRows):
|
||||||
totalConsumeRows += resultList[i]
|
totalConsumeRows += resultList[i]
|
||||||
|
|
||||||
if totalConsumeRows != expectrowcnt:
|
tdLog.info("act consume rows: %d, expect consume rows greater than or equal to: %d"%(totalConsumeRows, expectrowcnt))
|
||||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
if totalConsumeRows < expectrowcnt:
|
||||||
tdLog.exit("tmq consume rows error!")
|
tdLog.exit("tmq consume rows error!")
|
||||||
|
|
||||||
tdSql.query("drop topic %s"%topicFromStb1)
|
tdSql.query("drop topic %s"%topicFromStb1)
|
||||||
|
|
|
@ -323,7 +323,7 @@ class TDTestCase:
|
||||||
|
|
||||||
if self.snapshot == 0:
|
if self.snapshot == 0:
|
||||||
consumerId = 4
|
consumerId = 4
|
||||||
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 + 1/4 + 3/4))
|
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1/4 + 3/4))
|
||||||
elif self.snapshot == 1:
|
elif self.snapshot == 1:
|
||||||
consumerId = 5
|
consumerId = 5
|
||||||
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 - 1/4 + 1/4 + 3/4))
|
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 - 1/4 + 1/4 + 3/4))
|
||||||
|
@ -369,9 +369,14 @@ class TDTestCase:
|
||||||
tdLog.info("act consume rows: %d, act query rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsFromQuery, expectrowcnt))
|
tdLog.info("act consume rows: %d, act query rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsFromQuery, expectrowcnt))
|
||||||
|
|
||||||
if self.snapshot == 0:
|
if self.snapshot == 0:
|
||||||
if totalConsumeRows != expectrowcnt:
|
# If data writing is completed before consumer get snapshot, will consume 7500 from wal;
|
||||||
|
# If data writing has not started before consumer get snapshot, will consume 10000 from wal;
|
||||||
|
minRows = int(expectrowcnt * (1 - 1/4)) # 7500
|
||||||
|
tdLog.info("consume rows should be between %d and %d, "%(minRows, expectrowcnt))
|
||||||
|
if not ((totalConsumeRows >= minRows) and (totalConsumeRows <= expectrowcnt)):
|
||||||
tdLog.exit("tmq consume rows error with snapshot = 0!")
|
tdLog.exit("tmq consume rows error with snapshot = 0!")
|
||||||
elif self.snapshot == 1:
|
elif self.snapshot == 1:
|
||||||
|
tdLog.info("consume rows should be between %d and %d, "%(totalRowsFromQuery, expectrowcnt))
|
||||||
if not ((totalConsumeRows >= totalRowsFromQuery) and (totalConsumeRows <= expectrowcnt)):
|
if not ((totalConsumeRows >= totalRowsFromQuery) and (totalConsumeRows <= expectrowcnt)):
|
||||||
tdLog.exit("tmq consume rows error with snapshot = 1!")
|
tdLog.exit("tmq consume rows error with snapshot = 1!")
|
||||||
|
|
||||||
|
@ -381,7 +386,113 @@ class TDTestCase:
|
||||||
|
|
||||||
tdLog.printNoPrefix("======== test case 3 end ...... ")
|
tdLog.printNoPrefix("======== test case 3 end ...... ")
|
||||||
|
|
||||||
|
def tmqCase4(self):
|
||||||
|
tdLog.printNoPrefix("======== test case 4: ")
|
||||||
|
paraDict = {'dbName': 'dbt',
|
||||||
|
'dropFlag': 1,
|
||||||
|
'event': '',
|
||||||
|
'vgroups': 4,
|
||||||
|
'stbName': 'stb',
|
||||||
|
'colPrefix': 'c',
|
||||||
|
'tagPrefix': 't',
|
||||||
|
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
|
||||||
|
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
|
||||||
|
'ctbPrefix': 'ctb',
|
||||||
|
'ctbStartIdx': 0,
|
||||||
|
'ctbNum': 1,
|
||||||
|
'rowsPerTbl': 10000,
|
||||||
|
'batchNum': 5000,
|
||||||
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
|
'pollDelay': 15,
|
||||||
|
'showMsg': 1,
|
||||||
|
'showRow': 1,
|
||||||
|
'snapshot': 0}
|
||||||
|
|
||||||
|
paraDict['snapshot'] = self.snapshot
|
||||||
|
paraDict['vgroups'] = self.vgroups
|
||||||
|
paraDict['ctbNum'] = self.ctbNum
|
||||||
|
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||||
|
|
||||||
|
tdLog.info("restart taosd to ensure that the data falls into the disk")
|
||||||
|
tdSql.query("flush database %s"%(paraDict['dbName']))
|
||||||
|
|
||||||
|
tmqCom.initConsumerTable()
|
||||||
|
tdLog.info("create topics from stb1")
|
||||||
|
topicFromStb1 = 'topic_stb1'
|
||||||
|
queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
|
||||||
|
sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
|
||||||
|
tdLog.info("create topic sql: %s"%sqlString)
|
||||||
|
tdSql.execute(sqlString)
|
||||||
|
|
||||||
|
# paraDict['ctbNum'] = self.ctbNum
|
||||||
|
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||||
|
consumerId = 1
|
||||||
|
|
||||||
|
if self.snapshot == 0:
|
||||||
|
consumerId = 4
|
||||||
|
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"])
|
||||||
|
elif self.snapshot == 1:
|
||||||
|
consumerId = 5
|
||||||
|
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 - 1/4 + 1/4 + 3/4))
|
||||||
|
|
||||||
|
topicList = topicFromStb1
|
||||||
|
ifcheckdata = 1
|
||||||
|
ifManualCommit = 1
|
||||||
|
keyList = 'group.id:cgrp1,\
|
||||||
|
enable.auto.commit:true,\
|
||||||
|
auto.commit.interval.ms:1000,\
|
||||||
|
auto.offset.reset:earliest'
|
||||||
|
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||||
|
|
||||||
|
# del some data
|
||||||
|
rowsOfDelete = int(self.rowsPerTbl / 4 )
|
||||||
|
paraDict["endTs"] = paraDict["startTs"] + rowsOfDelete - 1
|
||||||
|
# pDeleteThread = self.asyncDeleteData(paraDict)
|
||||||
|
self.threadFunctionForDeletaData(paraDict)
|
||||||
|
|
||||||
|
tdLog.info("start consume processor")
|
||||||
|
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||||
|
|
||||||
|
tmqCom.getStartConsumeNotifyFromTmqsim()
|
||||||
|
|
||||||
|
# update to 1/4 rows and insert 3/4 new rows
|
||||||
|
paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl * 3 / 4)
|
||||||
|
# paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||||
|
# tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"],
|
||||||
|
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||||
|
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||||
|
pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict)
|
||||||
|
|
||||||
|
pInsertThread.join()
|
||||||
|
|
||||||
|
tdLog.info("start to check consume result")
|
||||||
|
expectRows = 1
|
||||||
|
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||||
|
totalConsumeRows = 0
|
||||||
|
for i in range(expectRows):
|
||||||
|
totalConsumeRows += resultList[i]
|
||||||
|
|
||||||
|
tdSql.query(queryString)
|
||||||
|
totalRowsFromQuery = tdSql.getRows()
|
||||||
|
|
||||||
|
tdLog.info("act consume rows: %d, act query rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsFromQuery, expectrowcnt))
|
||||||
|
|
||||||
|
if self.snapshot == 0:
|
||||||
|
tdLog.info("consume rows should be %d"%(expectrowcnt))
|
||||||
|
if (totalConsumeRows != expectrowcnt):
|
||||||
|
tdLog.exit("tmq consume rows error with snapshot = 0!")
|
||||||
|
elif self.snapshot == 1:
|
||||||
|
# If data writing has not started before consumer get snapshot, will consume 10000 from wal, and consumer 7500 from tsdb;
|
||||||
|
tdLog.info("consume rows should be %d, "%(expectrowcnt))
|
||||||
|
if (totalConsumeRows != expectrowcnt):
|
||||||
|
tdLog.exit("tmq consume rows error with snapshot = 1!")
|
||||||
|
|
||||||
|
# tmqCom.checkFileContent(consumerId, queryString)
|
||||||
|
|
||||||
|
tdSql.query("drop topic %s"%topicFromStb1)
|
||||||
|
|
||||||
|
tdLog.printNoPrefix("======== test case 4 end ...... ")
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
# tdSql.prepare()
|
# tdSql.prepare()
|
||||||
tdLog.printNoPrefix("=============================================")
|
tdLog.printNoPrefix("=============================================")
|
||||||
|
@ -409,6 +520,17 @@ class TDTestCase:
|
||||||
self.prepareTestEnv()
|
self.prepareTestEnv()
|
||||||
self.tmqCase3()
|
self.tmqCase3()
|
||||||
|
|
||||||
|
tdLog.printNoPrefix("=============================================")
|
||||||
|
tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
|
||||||
|
self.snapshot = 0
|
||||||
|
self.prepareTestEnv()
|
||||||
|
self.tmqCase4()
|
||||||
|
tdLog.printNoPrefix("====================================================================")
|
||||||
|
tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
|
||||||
|
self.snapshot = 1
|
||||||
|
self.prepareTestEnv()
|
||||||
|
self.tmqCase4()
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
tdSql.close()
|
tdSql.close()
|
||||||
tdLog.success(f"{__file__} successfully executed")
|
tdLog.success(f"{__file__} successfully executed")
|
||||||
|
|
|
@ -136,7 +136,7 @@ class TDTestCase:
|
||||||
tdLog.info("================= restart dnode ===========================")
|
tdLog.info("================= restart dnode ===========================")
|
||||||
tdDnodes.stoptaosd(1)
|
tdDnodes.stoptaosd(1)
|
||||||
tdDnodes.starttaosd(1)
|
tdDnodes.starttaosd(1)
|
||||||
# time.sleep(3)
|
time.sleep(5)
|
||||||
|
|
||||||
tdLog.info(" restart taosd end and wait to check consume result")
|
tdLog.info(" restart taosd end and wait to check consume result")
|
||||||
expectRows = 1
|
expectRows = 1
|
||||||
|
@ -220,7 +220,7 @@ class TDTestCase:
|
||||||
tdLog.info("================= restart dnode ===========================")
|
tdLog.info("================= restart dnode ===========================")
|
||||||
tdDnodes.stoptaosd(1)
|
tdDnodes.stoptaosd(1)
|
||||||
tdDnodes.starttaosd(1)
|
tdDnodes.starttaosd(1)
|
||||||
# time.sleep(3)
|
time.sleep(5)
|
||||||
|
|
||||||
tdLog.info("create some new child table and insert data ")
|
tdLog.info("create some new child table and insert data ")
|
||||||
paraDict["batchNum"] = 100
|
paraDict["batchNum"] = 100
|
||||||
|
|
|
@ -0,0 +1,174 @@
|
||||||
|
|
||||||
|
import taos
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import socket
|
||||||
|
import os
|
||||||
|
import threading
|
||||||
|
import math
|
||||||
|
# from tests.pytest.util.common import TDCom
|
||||||
|
# from tests.pytest.util.log import TDLog
|
||||||
|
|
||||||
|
from util.log import *
|
||||||
|
from util.sql import *
|
||||||
|
from util.cases import *
|
||||||
|
from util.dnodes import *
|
||||||
|
from util.common import *
|
||||||
|
sys.path.append("./7-tmq")
|
||||||
|
from tmqCommon import *
|
||||||
|
|
||||||
|
class TDTestCase:
|
||||||
|
def __init__(self):
|
||||||
|
self.vgroups = 32
|
||||||
|
self.ctbNum = 100
|
||||||
|
self.rowsPerTbl = 1000
|
||||||
|
self.snapshot = 1
|
||||||
|
self.replicaVar = 3
|
||||||
|
|
||||||
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
|
self.replicaVar = int(replicaVar)
|
||||||
|
tdLog.debug(f"start to excute {__file__}")
|
||||||
|
tdSql.init(conn.cursor(), False)
|
||||||
|
|
||||||
|
def prepareTestEnv(self):
|
||||||
|
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
|
||||||
|
paraDict = {'dbName': 'dbt',
|
||||||
|
'dropFlag': 1,
|
||||||
|
'event': '',
|
||||||
|
'vgroups': 4,
|
||||||
|
'stbName': 'stb',
|
||||||
|
'colPrefix': 'c',
|
||||||
|
'tagPrefix': 't',
|
||||||
|
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
|
||||||
|
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
|
||||||
|
'ctbPrefix': 'ctb',
|
||||||
|
'ctbStartIdx': 0,
|
||||||
|
'ctbNum': 10,
|
||||||
|
'rowsPerTbl': 10000,
|
||||||
|
'batchNum': 1000,
|
||||||
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
|
'pollDelay': 10,
|
||||||
|
'showMsg': 1,
|
||||||
|
'showRow': 1,
|
||||||
|
'snapshot': 0}
|
||||||
|
|
||||||
|
paraDict['vgroups'] = self.vgroups
|
||||||
|
paraDict['ctbNum'] = self.ctbNum
|
||||||
|
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||||
|
|
||||||
|
tmqCom.initConsumerTable()
|
||||||
|
tmqCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=self.replicaVar)
|
||||||
|
tdLog.info("create stb")
|
||||||
|
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
|
||||||
|
tdLog.info("create ctb")
|
||||||
|
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
|
||||||
|
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||||
|
tdLog.info("insert data")
|
||||||
|
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
|
||||||
|
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||||
|
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||||
|
|
||||||
|
# tdLog.info("restart taosd to ensure that the data falls into the disk")
|
||||||
|
# tdDnodes.stop(1)
|
||||||
|
# tdDnodes.start(1)
|
||||||
|
# tdSql.query("flush database %s"%(paraDict['dbName']))
|
||||||
|
return
|
||||||
|
|
||||||
|
def tmqCase1(self):
|
||||||
|
tdLog.printNoPrefix("======== test case 1: ")
|
||||||
|
paraDict = {'dbName': 'dbt',
|
||||||
|
'dropFlag': 1,
|
||||||
|
'event': '',
|
||||||
|
'vgroups': 1,
|
||||||
|
'stbName': 'stb',
|
||||||
|
'colPrefix': 'c',
|
||||||
|
'tagPrefix': 't',
|
||||||
|
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
|
||||||
|
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
|
||||||
|
'ctbPrefix': 'ctb',
|
||||||
|
'ctbStartIdx': 0,
|
||||||
|
'ctbNum': 1,
|
||||||
|
'rowsPerTbl': 10000,
|
||||||
|
'batchNum': 1000,
|
||||||
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
|
'pollDelay': 20,
|
||||||
|
'showMsg': 1,
|
||||||
|
'showRow': 1,
|
||||||
|
'snapshot': 1}
|
||||||
|
|
||||||
|
paraDict['vgroups'] = self.vgroups
|
||||||
|
paraDict['ctbNum'] = self.ctbNum
|
||||||
|
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||||
|
paraDict['snapshot'] = self.snapshot
|
||||||
|
|
||||||
|
topicNameList = ['topic1', 'topic2']
|
||||||
|
tmqCom.initConsumerTable()
|
||||||
|
|
||||||
|
tdLog.info("create topics from stb with filter")
|
||||||
|
# queryString = "select ts, acos(c1), ceil(pow(c1,3)) from %s.%s where (sin(c2) >= 0) and (c1 %% 4 == 0) and (ts >= %d) and (t4 like 'shanghai')"%(paraDict['dbName'], paraDict['stbName'], paraDict["startTs"]+9379)
|
||||||
|
queryString = "select ts, acos(c1), ceil(pow(c1,3)) from %s.%s "%(paraDict['dbName'], paraDict['stbName'])
|
||||||
|
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
|
||||||
|
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
|
||||||
|
tdLog.info("create topic sql: %s"%sqlString)
|
||||||
|
tdSql.execute(sqlString)
|
||||||
|
|
||||||
|
sqlString = "create topic %s as %s" %(topicNameList[1], queryString)
|
||||||
|
tdLog.info("create topic sql: %s"%sqlString)
|
||||||
|
tdSql.execute(sqlString)
|
||||||
|
# tdSql.query(queryString)
|
||||||
|
# expectRowsList.append(tdSql.getRows())
|
||||||
|
|
||||||
|
# init consume info, and start tmq_sim, then check consume result
|
||||||
|
tdLog.info("insert consume info to consume processor")
|
||||||
|
consumerId = 0
|
||||||
|
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 4
|
||||||
|
topicList = topicNameList[0] + ',' + topicNameList[0] + ',' + topicNameList[1]
|
||||||
|
ifcheckdata = 0
|
||||||
|
ifManualCommit = 1
|
||||||
|
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
|
||||||
|
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||||
|
|
||||||
|
consumerId = 1
|
||||||
|
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||||
|
|
||||||
|
tdLog.info("start consume processor")
|
||||||
|
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||||
|
tdLog.info("wait the consume result")
|
||||||
|
|
||||||
|
# continue to insert new rows
|
||||||
|
paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl)
|
||||||
|
pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict)
|
||||||
|
pInsertThread.join()
|
||||||
|
|
||||||
|
expectRows = 2
|
||||||
|
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||||
|
actConsumeTotalRows = resultList[0] + resultList[1]
|
||||||
|
|
||||||
|
tdLog.info("act consume rows: %d, expect consume rows: %d"%(actConsumeTotalRows, expectrowcnt))
|
||||||
|
|
||||||
|
if not ((expectrowcnt <= actConsumeTotalRows) or ((resultList[0] == 0) and (resultList[1] >= expectrowcnt)) or ((resultList[1] == 0) and (resultList[0] >= expectrowcnt))):
|
||||||
|
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt, actConsumeTotalRows))
|
||||||
|
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||||
|
|
||||||
|
# tmqCom.checkFileContent(consumerId, queryString)
|
||||||
|
|
||||||
|
time.sleep(10)
|
||||||
|
for i in range(len(topicNameList)):
|
||||||
|
tdSql.query("drop topic %s"%topicNameList[i])
|
||||||
|
|
||||||
|
tdLog.printNoPrefix("======== test case 1 end ...... ")
|
||||||
|
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
tdSql.prepare()
|
||||||
|
self.prepareTestEnv()
|
||||||
|
self.tmqCase1()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
tdSql.close()
|
||||||
|
tdLog.success(f"{__file__} successfully executed")
|
||||||
|
|
||||||
|
event = threading.Event()
|
||||||
|
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
||||||
|
tdCases.addWindows(__file__, TDTestCase())
|
Loading…
Reference in New Issue