diff --git a/cmake/cmake.version b/cmake/cmake.version
index d38ac40b90..de85025a8c 100644
--- a/cmake/cmake.version
+++ b/cmake/cmake.version
@@ -2,7 +2,7 @@
IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER})
ELSE ()
- SET(TD_VER_NUMBER "3.0.3.0")
+ SET(TD_VER_NUMBER "3.0.3.1")
ENDIF ()
IF (DEFINED VERCOMPATIBLE)
diff --git a/examples/JDBC/taosdemo/pom.xml b/examples/JDBC/taosdemo/pom.xml
index 68224bbad5..4731d8e237 100644
--- a/examples/JDBC/taosdemo/pom.xml
+++ b/examples/JDBC/taosdemo/pom.xml
@@ -10,7 +10,7 @@
Demo project for TDengine
- 5.3.20
+ 5.3.26
diff --git a/include/libs/monitor/monitor.h b/include/libs/monitor/monitor.h
index 3dee59ab14..1da801be0d 100644
--- a/include/libs/monitor/monitor.h
+++ b/include/libs/monitor/monitor.h
@@ -147,9 +147,9 @@ typedef struct {
} SMonStbInfo;
typedef struct {
- int32_t expire_time;
- int64_t timeseries_used;
- int64_t timeseries_total;
+ uint32_t expire_time;
+ int64_t timeseries_used;
+ int64_t timeseries_total;
} SMonGrantInfo;
typedef struct {
diff --git a/packaging/testpackage.sh b/packaging/testpackage.sh
index 9959d290e7..97226a86b5 100755
--- a/packaging/testpackage.sh
+++ b/packaging/testpackage.sh
@@ -107,7 +107,7 @@ elif [ ${testFile} = "tools" ];then
originTdpPath="taosTools-${originversion}"
packageName="${tdPath}-Linux-${cpuType}${packageLite}.${packageType}"
originPackageName="${originTdpPath}-Linux-${cpuType}${packageLite}.${packageType}"
- installCmd="install-tools.sh"
+ installCmd="install-taostools.sh"
fi
@@ -333,7 +333,7 @@ if [[ ${packageName} =~ "Lite" ]] || ([[ ${packageName} =~ "x64" ]] && [[ ${p
wgetFile taosTools-2.1.3-Linux-x64.tar.gz v2.1.3 web
tar xf taosTools-2.1.3-Linux-x64.tar.gz
fi
- cd taosTools-2.1.3 && bash install-tools.sh
+ cd taosTools-2.1.3 && bash install-taostools.sh
elif ([[ ${packageName} =~ "arm64" ]] && [[ ${packageName} =~ "client" ]]);then
echoColor G "===== install taos-tools arm when package is arm64-client ====="
cd ${installPath}
@@ -342,7 +342,7 @@ elif ([[ ${packageName} =~ "arm64" ]] && [[ ${packageName} =~ "client" ]]);then
tar xf taosTools-2.1.3-Linux-arm64.tar.gz
fi
- cd taosTools-2.1.3 && bash install-tools.sh
+ cd taosTools-2.1.3 && bash install-taostools.sh
fi
echoColor G "===== start TDengine ====="
@@ -361,18 +361,18 @@ rm -rf ${installPath}/${tdPath}/
# cd ${installPath}
# wgetFile taosTools-2.1.2-Linux-x64.tar.gz .
# tar xf taosTools-2.1.2-Linux-x64.tar.gz
-# cd taosTools-2.1.2 && bash install-tools.sh
+# cd taosTools-2.1.2 && bash install-taostools.sh
# elif [[ ${packageName} =~ "Lite" ]] && [[ ${packageName} =~ "deb" ]] ;then
# echoColor G "===== install taos-tools when package is lite or client ====="
# cd ${installPath}
# wgetFile taosTools-2.1.2-Linux-x64.tar.gz .
# tar xf taosTools-2.1.2-Linux-x64.tar.gz
-# cd taosTools-2.1.2 && bash install-tools.sh
+# cd taosTools-2.1.2 && bash install-taostools.sh
# elif [[ ${packageName} =~ "Lite" ]] && [[ ${packageName} =~ "rpm" ]] ;then
# echoColor G "===== install taos-tools when package is lite or client ====="
# cd ${installPath}
# wgetFile taosTools-2.1.2-Linux-x64.tar.gz .
# tar xf taosTools-2.1.2-Linux-x64.tar.gz
-# cd taosTools-2.1.2 && bash install-tools.sh
+# cd taosTools-2.1.2 && bash install-taostools.sh
# fi
diff --git a/packaging/tools/com.taosdata.taoskeeper.plist b/packaging/tools/com.taosdata.taoskeeper.plist
new file mode 100644
index 0000000000..338b5d8e79
--- /dev/null
+++ b/packaging/tools/com.taosdata.taoskeeper.plist
@@ -0,0 +1,33 @@
+
+
+
+
+ Label
+ com.tdengine.taoskeeper
+ ProgramArguments
+
+ /usr/local/bin/taoskeeper
+
+ ProcessType
+ Interactive
+ Disabled
+
+ RunAtLoad
+
+ LaunchOnlyOnce
+
+ SessionCreate
+
+ ExitTimeOut
+ 600
+ KeepAlive
+
+ SuccessfulExit
+
+ AfterInitialDemand
+
+
+ Program
+ /usr/local/bin/taoskeeper
+
+
\ No newline at end of file
diff --git a/packaging/tools/makepkg.sh b/packaging/tools/makepkg.sh
index d71d5df47c..29160238ce 100755
--- a/packaging/tools/makepkg.sh
+++ b/packaging/tools/makepkg.sh
@@ -53,7 +53,7 @@ if [ -d ${top_dir}/tools/taos-tools/packaging/deb ]; then
cd ${top_dir}/tools/taos-tools/packaging/deb
[ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0"
- taostools_ver=$(git tag |grep -v taos | sort | tail -1)
+ taostools_ver=$(git for-each-ref --sort=taggerdate --format '%(tag)' refs/tags|grep -v taos | tail -1)
taostools_install_dir="${release_dir}/${clientName2}Tools-${taostools_ver}"
cd ${curr_dir}
diff --git a/packaging/tools/post.sh b/packaging/tools/post.sh
index 9861806677..c9fab51b28 100755
--- a/packaging/tools/post.sh
+++ b/packaging/tools/post.sh
@@ -582,6 +582,11 @@ function install_service_on_launchctl() {
${csudo}cp ${install_main_dir}/service/com.taosdata.taosadapter.plist /Library/LaunchDaemons/com.taosdata.taosadapter.plist || :
${csudo}launchctl load -w /Library/LaunchDaemons/com.taosdata.taosadapter.plist || :
fi
+ if [ -f ${install_main_dir}/service/com.taosdata.taoskeeper.plist ]; then
+ ${csudo}launchctl unload -w /Library/LaunchDaemons/com.taosdata.taoskeeper.plist > /dev/null 2>&1 || :
+ ${csudo}cp ${install_main_dir}/service/com.taosdata.taoskeeper.plist /Library/LaunchDaemons/com.taosdata.taoskeeper.plist || :
+ ${csudo}launchctl load -w /Library/LaunchDaemons/com.taosdata.taoskeeper.plist || :
+ fi
}
function install_taosadapter_service() {
diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c
index cfd1005181..6bd8b01842 100644
--- a/source/client/src/clientRawBlockWrite.c
+++ b/source/client/src/clientRawBlockWrite.c
@@ -1373,6 +1373,8 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch
uError("WriteRaw:catalogGetTableMeta failed. table name: %s", tbname);
goto end;
}
+// uError("td23101 0vgId:%d, vgId:%d, name:%s, uid:%"PRIu64, vgData.vgId, pTableMeta->vgId, tbname, pTableMeta->uid);
+
pQuery = smlInitHandle();
if (pQuery == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
@@ -1380,6 +1382,7 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch
}
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData));
+// uError("td23101 1vgId:%d, numEps:%d, name:%s, uid:%"PRIu64, vgData.vgId, vgData.epSet.numOfEps, tbname, pTableMeta->uid);
code = rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false);
if (code != TSDB_CODE_SUCCESS) {
@@ -1723,10 +1726,15 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
tDecoderClear(&decoderTmp);
}
- if (pCreateReqDst) {
+ SVgroupInfo vg;
+ code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vg);
+ if (code != TSDB_CODE_SUCCESS) {
+ uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbName);
+ goto end;
+ }
+
+ if (pCreateReqDst) { // change stable name to get meta
strcpy(pName.tname, pCreateReqDst->ctb.stbName);
- } else {
- strcpy(pName.tname, tbName);
}
code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
@@ -1739,13 +1747,6 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
goto end;
}
- SVgroupInfo vg;
- code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vg);
- if (code != TSDB_CODE_SUCCESS) {
- uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbName);
- goto end;
- }
-
if (pCreateReqDst) {
pTableMeta->vgId = vg.vgId;
pTableMeta->uid = pCreateReqDst->uid;
diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h
index dfc3b3fde8..e891eef1d8 100644
--- a/source/dnode/mnode/impl/inc/mndDef.h
+++ b/source/dnode/mnode/impl/inc/mndDef.h
@@ -457,6 +457,7 @@ typedef struct {
void* pIter;
SMnode* pMnode;
STableMetaRsp* pMeta;
+ bool restore;
bool sysDbRsp;
char db[TSDB_DB_FNAME_LEN];
char filterTb[TSDB_TABLE_NAME_LEN];
diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c
index c32212dfc1..a096e0341e 100644
--- a/source/dnode/mnode/impl/src/mndMain.c
+++ b/source/dnode/mnode/impl/src/mndMain.c
@@ -864,7 +864,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
}
// grant info
- pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 86400000.0f;
+ pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 1000;
pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed;
if (pMnode->grant.expireTimeMS == 0) {
pGrantInfo->expire_time = INT32_MAX;
diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c
index 7fe08514f6..c50b205f37 100644
--- a/source/dnode/mnode/impl/src/mndShow.c
+++ b/source/dnode/mnode/impl/src/mndShow.c
@@ -324,7 +324,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
pReq->info.rsp = pRsp;
pReq->info.rspLen = size;
- if (rowsRead == 0 || rowsRead < rowsToRead) {
+ if (rowsRead == 0 || ((rowsRead < rowsToRead) && !pShow->restore)) {
pRsp->completed = 1;
mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id);
mndReleaseShowObj(pShow, true);
diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c
index 2a369a863a..c577097644 100644
--- a/source/dnode/mnode/impl/src/mndStb.c
+++ b/source/dnode/mnode/impl/src/mndStb.c
@@ -3113,9 +3113,18 @@ static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(typeName, "SUPER_TABLE");
+ bool fetch = pShow->restore ? false : true;
+ pShow->restore = false;
while (numOfRows < rows) {
- pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
- if (pShow->pIter == NULL) break;
+ if (fetch) {
+ pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
+ if (pShow->pIter == NULL) break;
+ } else {
+ fetch = true;
+ void *pKey = taosHashGetKey(pShow->pIter, NULL);
+ pStb = sdbAcquire(pSdb, SDB_STB, pKey);
+ if (!pStb) continue;
+ }
if (pDb != NULL && pStb->dbUid != pDb->uid) {
sdbRelease(pSdb, pStb);
@@ -3129,6 +3138,17 @@ static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
sdbRelease(pSdb, pStb);
continue;
}
+
+ if ((numOfRows + pStb->numOfColumns) > rows) {
+ pShow->restore = true;
+ if (numOfRows == 0) {
+ mError("mndRetrieveStbCol failed to get stable cols since buf:%d less than result:%d, stable name:%s, db:%s",
+ rows, pStb->numOfColumns, pStb->name, pStb->db);
+ }
+ sdbRelease(pSdb, pStb);
+ break;
+ }
+
varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
mDebug("mndRetrieveStbCol get stable cols, stable name:%s, db:%s", pStb->name, pStb->db);
diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h
index 29d1d0aafa..2d053d04ae 100644
--- a/source/dnode/vnode/inc/vnode.h
+++ b/source/dnode/vnode/inc/vnode.h
@@ -157,7 +157,7 @@ typedef struct SMTbCursor SMTbCursor;
SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
void metaCloseTbCursor(SMTbCursor *pTbCur);
int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType);
-int32_t metaTbCursorPrev(SMTbCursor *pTbCur);
+int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType);
#endif
diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h
index 0b38ce6d24..452b1f6c0b 100644
--- a/source/dnode/vnode/src/inc/tsdb.h
+++ b/source/dnode/vnode/src/inc/tsdb.h
@@ -687,6 +687,8 @@ typedef struct SSttBlockLoadInfo {
STSchema *pSchema;
int16_t *colIds;
int32_t numOfCols;
+ bool checkRemainingRow;
+ bool isLast;
bool sttBlockLoaded;
int32_t numOfStt;
diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c
index 6ab322d26a..3376150d26 100644
--- a/source/dnode/vnode/src/meta/metaQuery.c
+++ b/source/dnode/vnode/src/meta/metaQuery.c
@@ -336,7 +336,7 @@ int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType) {
return 0;
}
-int32_t metaTbCursorPrev(SMTbCursor *pTbCur) {
+int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType) {
int ret;
void *pBuf;
STbCfg tbCfg;
@@ -350,7 +350,7 @@ int32_t metaTbCursorPrev(SMTbCursor *pTbCur) {
tDecoderClear(&pTbCur->mr.coder);
metaGetTableEntryByVersion(&pTbCur->mr, ((SUidIdxVal *)pTbCur->pVal)[0].version, *(tb_uid_t *)pTbCur->pKey);
- if (pTbCur->mr.me.type == TSDB_SUPER_TABLE) {
+ if (pTbCur->mr.me.type == jumpTableType) {
continue;
}
diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c
index 8279ee7aeb..de2732fcb5 100644
--- a/source/dnode/vnode/src/tq/tq.c
+++ b/source/dnode/vnode/src/tq/tq.c
@@ -571,8 +571,9 @@ static int32_t doPollDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* p
",version:%" PRId64,
consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid,
taosxRsp.rspOffset.version);
- } else {
- // if (offset.type == TMQ_OFFSET__LOG) {
+ }
+
+ if (offset.type == TMQ_OFFSET__LOG) {
int64_t fetchVer = offset.version + 1;
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
if (pCkHead == NULL) {
diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c
index 3d01184e78..6fc8ad8be6 100644
--- a/source/dnode/vnode/src/tsdb/tsdbCache.c
+++ b/source/dnode/vnode/src/tsdb/tsdbCache.c
@@ -590,6 +590,7 @@ typedef struct {
SDataFReader **pDataFReader;
TSDBROW row;
+ bool checkRemainingRow;
SMergeTree mergeTree;
SMergeTree *pMergeTree;
SSttBlockLoadInfo *pLoadInfo;
@@ -600,7 +601,6 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa
int nCols) {
SFSLastNextRowIter *state = (SFSLastNextRowIter *)iter;
int32_t code = 0;
- bool checkRemainingRow = true;
switch (state->state) {
case SFSLASTNEXTROW_FS:
@@ -633,12 +633,25 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa
if (code) goto _err;
}
- state->pLoadInfo->colIds = aCols;
- state->pLoadInfo->numOfCols = nCols;
+ for (int i = 0; i < state->pLoadInfo->numOfStt; ++i) {
+ state->pLoadInfo[i].colIds = aCols;
+ state->pLoadInfo[i].numOfCols = nCols;
+ state->pLoadInfo[i].isLast = isLast;
+ }
tMergeTreeOpen(&state->mergeTree, 1, *state->pDataFReader, state->suid, state->uid,
&(STimeWindow){.skey = state->lastTs, .ekey = TSKEY_MAX},
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, state->pLoadInfo, false, NULL, true);
state->pMergeTree = &state->mergeTree;
+ state->state = SFSLASTNEXTROW_BLOCKROW;
+ }
+ case SFSLASTNEXTROW_BLOCKROW: {
+ if (nCols != state->pLoadInfo->numOfCols) {
+ for (int i = 0; i < state->pLoadInfo->numOfStt; ++i) {
+ state->pLoadInfo[i].numOfCols = nCols;
+
+ state->pLoadInfo[i].checkRemainingRow = state->checkRemainingRow;
+ }
+ }
bool hasVal = tMergeTreeNext(&state->mergeTree);
if (!hasVal) {
if (tMergeTreeIgnoreEarlierTs(&state->mergeTree)) {
@@ -649,76 +662,23 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa
state->state = SFSLASTNEXTROW_FILESET;
goto _next_fileset;
}
- state->state = SFSLASTNEXTROW_BLOCKROW;
- checkRemainingRow = false;
- }
- case SFSLASTNEXTROW_BLOCKROW: {
- bool skipRow = false;
- do {
- bool hasVal = false;
- state->row = tMergeTreeGetRow(&state->mergeTree);
- *ppRow = &state->row;
- if (nCols != state->pLoadInfo->numOfCols) {
- state->pLoadInfo->numOfCols = nCols;
- }
- hasVal = tMergeTreeNext(&state->mergeTree);
- if (TSDBROW_TS(&state->row) <= state->lastTs) {
- *pIgnoreEarlierTs = true;
- *ppRow = NULL;
- return code;
- }
+ state->row = tMergeTreeGetRow(&state->mergeTree);
+ *ppRow = &state->row;
- *pIgnoreEarlierTs = false;
- if (!hasVal) {
- state->state = SFSLASTNEXTROW_FILESET;
- break;
- }
+ if (TSDBROW_TS(&state->row) <= state->lastTs) {
+ *pIgnoreEarlierTs = true;
+ *ppRow = NULL;
+ return code;
+ }
- if (checkRemainingRow) {
- bool skipBlock = true;
-
- SBlockData *pBlockData = state->row.pBlockData;
-
- for (int inputColIndex = 0; inputColIndex < nCols; ++inputColIndex) {
- for (int colIndex = 0; colIndex < pBlockData->nColData; ++colIndex) {
- SColData *pColData = &pBlockData->aColData[colIndex];
- int16_t cid = pColData->cid;
-
- if (cid == aCols[inputColIndex]) {
- if (isLast && (pColData->flag & HAS_VALUE)) {
- skipBlock = false;
- break;
- } else if (pColData->flag & (HAS_VALUE | HAS_NULL)) {
- skipBlock = false;
- break;
- }
- }
- }
- }
- /*
- for (int colIndex = 0; colIndex < pBlockData->nColData; ++colIndex) {
- SColData *pColData = &pBlockData->aColData[colIndex];
- int16_t cid = pColData->cid;
-
- if (inputColIndex < nCols && cid == aCols[inputColIndex]) {
- if (isLast && (pColData->flag & HAS_VALUE)) {
- skipBlock = false;
- break;
- } else if (pColData->flag & (HAS_VALUE | HAS_NULL)) {
- skipBlock = false;
- break;
- }
-
- ++inputColIndex;
- }
- }
- */
- if (skipBlock) {
- skipRow = true;
- }
- }
- } while (skipRow);
+ *pIgnoreEarlierTs = false;
+ if (!hasVal) {
+ state->state = SFSLASTNEXTROW_FILESET;
+ }
+ if (!state->checkRemainingRow) {
+ state->checkRemainingRow = true;
+ }
return code;
}
default:
diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c
index 943b16116c..fa8870835c 100644
--- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c
+++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c
@@ -504,9 +504,34 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
pIter->iRow += step;
while (1) {
+ bool skipBlock = false;
+
findNextValidRow(pIter, idStr);
- if (pIter->iRow >= pBlockData->nRow || pIter->iRow < 0) {
+ if (pIter->pBlockLoadInfo->checkRemainingRow) {
+ skipBlock = true;
+ int16_t *aCols = pIter->pBlockLoadInfo->colIds;
+ int nCols = pIter->pBlockLoadInfo->numOfCols;
+ bool isLast = pIter->pBlockLoadInfo->isLast;
+ for (int inputColIndex = 0; inputColIndex < nCols; ++inputColIndex) {
+ for (int colIndex = 0; colIndex < pBlockData->nColData; ++colIndex) {
+ SColData *pColData = &pBlockData->aColData[colIndex];
+ int16_t cid = pColData->cid;
+
+ if (cid == aCols[inputColIndex]) {
+ if (isLast && (pColData->flag & HAS_VALUE)) {
+ skipBlock = false;
+ break;
+ } else if (pColData->flag & (HAS_VALUE | HAS_NULL)) {
+ skipBlock = false;
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ if (skipBlock || pIter->iRow >= pBlockData->nRow || pIter->iRow < 0) {
tLDataIterNextBlock(pIter, idStr);
if (pIter->pSttBlk == NULL) { // no more data
goto _exit;
diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c
index 71458acce2..052e4ab2c1 100644
--- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c
+++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c
@@ -335,6 +335,7 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
// commit json
if (!rollback) {
+ pWriter->info.state.committed = pWriter->ever;
pVnode->config = pWriter->info.config;
pVnode->state = (SVState){.committed = pWriter->info.state.committed,
.applied = pWriter->info.state.committed,
diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h
index cff568aebc..d51a24bb43 100644
--- a/source/libs/executor/inc/tsort.h
+++ b/source/libs/executor/inc/tsort.h
@@ -44,7 +44,8 @@ typedef struct SSortSource {
void* param;
bool onlyRef;
};
-
+ int64_t fetchUs;
+ int64_t fetchNum;
} SSortSource;
typedef struct SMsortComparParam {
diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c
index 9fff7a4943..4e2e105d14 100644
--- a/source/libs/executor/src/projectoperator.c
+++ b/source/libs/executor/src/projectoperator.c
@@ -161,10 +161,9 @@ static int32_t discardGroupDataBlock(SSDataBlock* pBlock, SLimitInfo* pLimitInfo
if (pLimitInfo->remainGroupOffset > 0) {
return PROJECT_RETRIEVE_CONTINUE;
}
- }
- // set current group id of the project operator
- pLimitInfo->currentGroupId = pBlock->info.id.groupId;
+ pLimitInfo->currentGroupId = 0;
+ }
}
return PROJECT_RETRIEVE_DONE;
@@ -175,19 +174,29 @@ static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, S
// here check for a new group data, we need to handle the data of the previous group.
ASSERT(pLimitInfo->remainGroupOffset == 0 || pLimitInfo->remainGroupOffset == -1);
- if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
+ bool newGroup = false;
+ if (0 == pBlock->info.id.groupId) {
+ pLimitInfo->numOfOutputGroups = 1;
+ } else if (pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
+ pLimitInfo->currentGroupId = pBlock->info.id.groupId;
pLimitInfo->numOfOutputGroups += 1;
- if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
- setOperatorCompleted(pOperator);
- return PROJECT_RETRIEVE_DONE;
- }
-
- // reset the value for a new group data
- // existing rows that belongs to previous group.
- resetLimitInfoForNextGroup(pLimitInfo);
+ newGroup = true;
+ } else {
+ return PROJECT_RETRIEVE_CONTINUE;
}
- return PROJECT_RETRIEVE_DONE;
+ if ((pLimitInfo->slimit.limit >= 0) && (pLimitInfo->slimit.limit < pLimitInfo->numOfOutputGroups)) {
+ setOperatorCompleted(pOperator);
+ return PROJECT_RETRIEVE_DONE;
+ }
+
+ // reset the value for a new group data
+ // existing rows that belongs to previous group.
+ if (newGroup) {
+ resetLimitInfoForNextGroup(pLimitInfo);
+ }
+
+ return PROJECT_RETRIEVE_CONTINUE;
}
// todo refactor
@@ -199,7 +208,7 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS
if (pBlock->info.rows == 0) {
return PROJECT_RETRIEVE_CONTINUE;
} else {
- if (limitReached && (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
+ if (limitReached && (pLimitInfo->slimit.limit >= 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
setOperatorCompleted(pOperator);
}
}
diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c
index ee92805d27..ce647014ae 100644
--- a/source/libs/executor/src/scanoperator.c
+++ b/source/libs/executor/src/scanoperator.c
@@ -31,6 +31,7 @@
#include "thash.h"
#include "ttypes.h"
+#define MULTI_READER_MAX_TABLE_NUM 5000
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
@@ -43,7 +44,9 @@ typedef struct STableMergeScanSortSourceParam {
SOperatorInfo* pOperator;
int32_t readerIdx;
uint64_t uid;
- SSDataBlock* inputBlock;
+ SSDataBlock* inputBlock;
+ bool multiReader;
+ STsdbReader* dataReader;
} STableMergeScanSortSourceParam;
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
@@ -2588,6 +2591,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t readIdx = source->readerIdx;
SSDataBlock* pBlock = source->inputBlock;
+ int32_t code = 0;
SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx);
@@ -2595,17 +2599,20 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex);
SReadHandle* pHandle = &pInfo->base.readHandle;
- int32_t code =
- tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &pInfo->base.dataReader, GET_TASKID(pTaskInfo));
- if (code != 0) {
- T_LONG_JMP(pTaskInfo->env, code);
+ if (NULL == source->dataReader || !source->multiReader) {
+ code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &source->dataReader, GET_TASKID(pTaskInfo));
+ if (code != 0) {
+ T_LONG_JMP(pTaskInfo->env, code);
+ }
}
-
+
+ pInfo->base.dataReader = source->dataReader;
STsdbReader* reader = pInfo->base.dataReader;
qTrace("tsdb/read-table-data: %p, enter next reader", reader);
while (tsdbNextDataBlock(reader)) {
if (isTaskKilled(pTaskInfo)) {
tsdbReleaseDataBlock(reader);
+ pInfo->base.dataReader = NULL;
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
@@ -2639,14 +2646,18 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
qTrace("tsdb/read-table-data: %p, close reader", reader);
- tsdbReaderClose(pInfo->base.dataReader);
+ if (!source->multiReader) {
+ tsdbReaderClose(pInfo->base.dataReader);
+ source->dataReader = NULL;
+ }
pInfo->base.dataReader = NULL;
return pBlock;
}
- qDebug("8");
-
- tsdbReaderClose(pInfo->base.dataReader);
+ if (!source->multiReader) {
+ tsdbReaderClose(pInfo->base.dataReader);
+ source->dataReader = NULL;
+ }
pInfo->base.dataReader = NULL;
return NULL;
}
@@ -2718,6 +2729,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
STableMergeScanSortSourceParam param = {0};
param.readerIdx = i;
param.pOperator = pOperator;
+ param.multiReader = (numOfTable <= MULTI_READER_MAX_TABLE_NUM) ? true : false;
param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
blockDataEnsureCapacity(param.inputBlock, pOperator->resultInfo.capacity);
@@ -2761,6 +2773,8 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
for (int32_t i = 0; i < numOfTable; ++i) {
STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
blockDataDestroy(param->inputBlock);
+ tsdbReaderClose(param->dataReader);
+ param->dataReader = NULL;
}
taosArrayClear(pInfo->sortSourceParams);
@@ -2803,9 +2817,6 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
qDebug("%s get sorted row block, rows:%d, limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows,
pInfo->limitInfo.numOfOutputRows);
- if (limitReached) {
- resetLimitInfoForNextGroup(&pInfo->limitInfo);
- }
return (pResBlock->info.rows > 0) ? pResBlock : NULL;
}
@@ -2858,6 +2869,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
pInfo->groupId = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->tableStartIndex)->groupId;
startGroupTableMergeScan(pOperator);
+ resetLimitInfoForNextGroup(&pInfo->limitInfo);
}
}
@@ -2873,15 +2885,17 @@ void destroyTableMergeScanOperatorInfo(void* param) {
for (int32_t i = 0; i < numOfTable; i++) {
STableMergeScanSortSourceParam* p = taosArrayGet(pTableScanInfo->sortSourceParams, i);
blockDataDestroy(p->inputBlock);
+ tsdbReaderClose(p->dataReader);
+ p->dataReader = NULL;
}
+ tsdbReaderClose(pTableScanInfo->base.dataReader);
+ pTableScanInfo->base.dataReader = NULL;
+
taosArrayDestroy(pTableScanInfo->sortSourceParams);
tsortDestroySortHandle(pTableScanInfo->pSortHandle);
pTableScanInfo->pSortHandle = NULL;
- tsdbReaderClose(pTableScanInfo->base.dataReader);
- pTableScanInfo->base.dataReader = NULL;
-
for (int i = 0; i < taosArrayGetSize(pTableScanInfo->queryConds); i++) {
SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i);
taosMemoryFree(pCond->colList);
@@ -2898,8 +2912,6 @@ void destroyTableMergeScanOperatorInfo(void* param) {
taosArrayDestroy(pTableScanInfo->pSortInfo);
cleanupExprSupp(&pTableScanInfo->base.pseudoSup);
- tsdbReaderClose(pTableScanInfo->base.dataReader);
- pTableScanInfo->base.dataReader = NULL;
taosLRUCacheCleanup(pTableScanInfo->base.metaCache.pTableMetaEntryCache);
taosMemoryFreeClear(param);
diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c
index 24f42ff178..f24d3523c8 100644
--- a/source/libs/executor/src/sysscanoperator.c
+++ b/source/libs/executor/src/sysscanoperator.c
@@ -57,9 +57,11 @@ typedef struct SSysTableScanInfo {
const char* pUser;
bool sysInfo;
bool showRewrite;
+ bool restore;
SNode* pCondition; // db_name filter condition, to discard data that are not in current database
SMTbCursor* pCur; // cursor for iterate the local table meta store.
SSysTableIndex* pIdx; // idx for local table meta
+ SHashObj* pSchema;
SColMatchInfo matchInfo;
SName name;
SSDataBlock* pRes;
@@ -514,9 +516,23 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
pInfo->pCur = metaOpenTbCursor(pInfo->readHandle.meta);
}
- SHashObj* stableSchema = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
- taosHashSetFreeFp(stableSchema, tDeleteSSchemaWrapperForHash);
- while ((ret = metaTbCursorNext(pInfo->pCur, TSDB_TABLE_MAX)) == 0) {
+ if (pInfo->pSchema == NULL) {
+ pInfo->pSchema = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
+ taosHashSetFreeFp(pInfo->pSchema, tDeleteSSchemaWrapperForHash);
+ }
+
+ if (!pInfo->pCur || !pInfo->pSchema) {
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
+ qError("sysTableScanUserCols failed since %s", terrstr(terrno));
+ blockDataDestroy(dataBlock);
+ pInfo->loadInfo.totalRows = 0;
+ return NULL;
+ }
+
+ int32_t restore = pInfo->restore;
+ pInfo->restore = false;
+ while (restore || ((ret = metaTbCursorNext(pInfo->pCur, TSDB_TABLE_MAX)) == 0)) {
+ if (restore) restore = false;
char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
@@ -524,33 +540,36 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
if (pInfo->pCur->mr.me.type == TSDB_SUPER_TABLE) {
qDebug("sysTableScanUserCols cursor get super table");
- void* schema = taosHashGet(stableSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t));
+ void* schema = taosHashGet(pInfo->pSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t));
if (schema == NULL) {
SSchemaWrapper* schemaWrapper = tCloneSSchemaWrapper(&pInfo->pCur->mr.me.stbEntry.schemaRow);
- taosHashPut(stableSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t), &schemaWrapper, POINTER_BYTES);
+ taosHashPut(pInfo->pSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t), &schemaWrapper, POINTER_BYTES);
}
continue;
} else if (pInfo->pCur->mr.me.type == TSDB_CHILD_TABLE) {
qDebug("sysTableScanUserCols cursor get child table");
STR_TO_VARSTR(typeName, "CHILD_TABLE");
STR_TO_VARSTR(tableName, pInfo->pCur->mr.me.name);
-
int64_t suid = pInfo->pCur->mr.me.ctbEntry.suid;
- void* schema = taosHashGet(stableSchema, &pInfo->pCur->mr.me.ctbEntry.suid, sizeof(int64_t));
+ void* schema = taosHashGet(pInfo->pSchema, &pInfo->pCur->mr.me.ctbEntry.suid, sizeof(int64_t));
if (schema != NULL) {
schemaRow = *(SSchemaWrapper**)schema;
} else {
- tDecoderClear(&pInfo->pCur->mr.coder);
- int code = metaGetTableEntryByUid(&pInfo->pCur->mr, suid);
+ SMetaReader smrSuperTable = {0};
+ metaReaderInit(&smrSuperTable, pInfo->readHandle.meta, 0);
+ int code = metaGetTableEntryByUid(&smrSuperTable, suid);
if (code != TSDB_CODE_SUCCESS) {
// terrno has been set by metaGetTableEntryByName, therefore, return directly
qError("sysTableScanUserCols get meta by suid:%" PRId64 " error, code:%d", suid, code);
+ metaReaderClear(&smrSuperTable);
blockDataDestroy(dataBlock);
pInfo->loadInfo.totalRows = 0;
- taosHashCleanup(stableSchema);
return NULL;
}
- schemaRow = &pInfo->pCur->mr.me.stbEntry.schemaRow;
+ SSchemaWrapper* schemaWrapper = tCloneSSchemaWrapper(&smrSuperTable.me.stbEntry.schemaRow);
+ taosHashPut(pInfo->pSchema, &suid, sizeof(int64_t), &schemaWrapper, POINTER_BYTES);
+ schemaRow = schemaWrapper;
+ metaReaderClear(&smrSuperTable);
}
} else if (pInfo->pCur->mr.me.type == TSDB_NORMAL_TABLE) {
qDebug("sysTableScanUserCols cursor get normal table");
@@ -562,20 +581,19 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
continue;
}
- sysTableUserColsFillOneTableCols(pInfo, dbname, &numOfRows, dataBlock, tableName, schemaRow, typeName);
-
- if (numOfRows >= pOperator->resultInfo.capacity) {
+ if ((numOfRows + schemaRow->nCols) > pOperator->resultInfo.capacity) {
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
numOfRows = 0;
+ pInfo->restore = true;
if (pInfo->pRes->info.rows > 0) {
break;
}
+ } else {
+ sysTableUserColsFillOneTableCols(pInfo, dbname, &numOfRows, dataBlock, tableName, schemaRow, typeName);
}
}
- taosHashCleanup(stableSchema);
-
if (numOfRows > 0) {
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
numOfRows = 0;
@@ -695,7 +713,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
}
if ((smrSuperTable.me.stbEntry.schemaTag.nCols + numOfRows) > pOperator->resultInfo.capacity) {
- metaTbCursorPrev(pInfo->pCur);
+ metaTbCursorPrev(pInfo->pCur, TSDB_TABLE_MAX);
blockFull = true;
} else {
sysTableUserTagsFillOneTableTags(pInfo, &smrSuperTable, &pInfo->pCur->mr, dbname, tableName, &numOfRows,
@@ -1789,6 +1807,11 @@ void destroySysScanOperator(void* param) {
pInfo->pIdx = NULL;
}
+ if(pInfo->pSchema) {
+ taosHashCleanup(pInfo->pSchema);
+ pInfo->pSchema = NULL;
+ }
+
taosArrayDestroy(pInfo->matchInfo.pList);
taosMemoryFreeClear(pInfo->pUser);
diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c
index 291b0cf515..6c8e581b3f 100644
--- a/source/libs/executor/src/tsort.c
+++ b/source/libs/executor/src/tsort.c
@@ -108,12 +108,18 @@ static int32_t sortComparCleanup(SMsortComparParam* cmpParam) {
return TSDB_CODE_SUCCESS;
}
-void tsortClearOrderdSource(SArray* pOrderedSource) {
+void tsortClearOrderdSource(SArray* pOrderedSource, int64_t *fetchUs, int64_t *fetchNum) {
for (size_t i = 0; i < taosArrayGetSize(pOrderedSource); i++) {
SSortSource** pSource = taosArrayGet(pOrderedSource, i);
if (NULL == *pSource) {
continue;
}
+
+ if (fetchUs) {
+ *fetchUs += (*pSource)->fetchUs;
+ *fetchNum += (*pSource)->fetchNum;
+ }
+
// release pageIdList
if ((*pSource)->pageIdList) {
taosArrayDestroy((*pSource)->pageIdList);
@@ -147,7 +153,10 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) {
taosMemoryFreeClear(pSortHandle->idStr);
blockDataDestroy(pSortHandle->pDataBlock);
- tsortClearOrderdSource(pSortHandle->pOrderedSource);
+ int64_t fetchUs = 0, fetchNum = 0;
+ tsortClearOrderdSource(pSortHandle->pOrderedSource, &fetchUs, &fetchNum);
+ qError("all source fetch time: %" PRId64 "us num:%" PRId64 " %s", fetchUs, fetchNum, pSortHandle->idStr);
+
taosArrayDestroy(pSortHandle->pOrderedSource);
taosMemoryFreeClear(pSortHandle);
}
@@ -307,7 +316,7 @@ static int32_t sortComparInit(SMsortComparParam* pParam, SArray* pSources, int32
}
int64_t et = taosGetTimestampUs();
- qDebug("init for merge sort completed, elapsed time:%.2f ms, %s", (et - st) / 1000.0, pHandle->idStr);
+ qError("init for merge sort completed, elapsed time:%.2f ms, %s", (et - st) / 1000.0, pHandle->idStr);
}
return code;
@@ -365,7 +374,10 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT
releaseBufPage(pHandle->pBuf, pPage);
}
} else {
+ int64_t st = taosGetTimestampUs();
pSource->src.pBlock = pHandle->fetchfp(((SSortSource*)pSource)->param);
+ pSource->fetchUs += taosGetTimestampUs() - st;
+ pSource->fetchNum++;
if (pSource->src.pBlock == NULL) {
(*numOfCompleted) += 1;
pSource->src.rowIndex = -1;
@@ -602,7 +614,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
}
}
- tsortClearOrderdSource(pHandle->pOrderedSource);
+ tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
taosArrayAddAll(pHandle->pOrderedSource, pResList);
taosArrayDestroy(pResList);
@@ -644,7 +656,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
SSortSource* source = *pSource;
*pSource = NULL;
- tsortClearOrderdSource(pHandle->pOrderedSource);
+ tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
while (1) {
SSDataBlock* pBlock = pHandle->fetchfp(source->param);
diff --git a/source/libs/parser/src/parCalcConst.c b/source/libs/parser/src/parCalcConst.c
index b2fc88add1..33b8a686ae 100644
--- a/source/libs/parser/src/parCalcConst.c
+++ b/source/libs/parser/src/parCalcConst.c
@@ -361,11 +361,29 @@ static bool notRefByOrderBy(SColumnNode* pCol, SNodeList* pOrderByList) {
return !cxt.hasThisCol;
}
+static bool isSetUselessCol(SSetOperator* pSetOp, int32_t index, SExprNode* pProj) {
+ if (!isUselessCol(pProj)) {
+ return false;
+ }
+
+ SNodeList* pLeftProjs = getChildProjection(pSetOp->pLeft);
+ if (!isUselessCol((SExprNode*)nodesListGetNode(pLeftProjs, index))) {
+ return false;
+ }
+
+ SNodeList* pRightProjs = getChildProjection(pSetOp->pRight);
+ if (!isUselessCol((SExprNode*)nodesListGetNode(pRightProjs, index))) {
+ return false;
+ }
+
+ return true;
+}
+
static int32_t calcConstSetOpProjections(SCalcConstContext* pCxt, SSetOperator* pSetOp, bool subquery) {
int32_t index = 0;
SNode* pProj = NULL;
WHERE_EACH(pProj, pSetOp->pProjectionList) {
- if (subquery && notRefByOrderBy((SColumnNode*)pProj, pSetOp->pOrderByList) && isUselessCol((SExprNode*)pProj)) {
+ if (subquery && notRefByOrderBy((SColumnNode*)pProj, pSetOp->pOrderByList) && isSetUselessCol(pSetOp, index, (SExprNode*)pProj)) {
ERASE_NODE(pSetOp->pProjectionList);
eraseSetOpChildProjection(pSetOp, index);
continue;
diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c
index 52d3569dcd..704f381afa 100644
--- a/source/libs/parser/src/parInsertUtil.c
+++ b/source/libs/parser/src/parInsertUtil.c
@@ -404,6 +404,7 @@ static int32_t createVgroupDataCxt(STableDataCxt* pTableCxt, SHashObj* pVgroupHa
int32_t code = taosHashPut(pVgroupHash, &pVgCxt->vgId, sizeof(pVgCxt->vgId), &pVgCxt, POINTER_BYTES);
if (TSDB_CODE_SUCCESS == code) {
taosArrayPush(pVgroupList, &pVgCxt);
+// uDebug("td23101 2vgId:%d, uid:%" PRIu64, pVgCxt->vgId, pTableCxt->pMeta->uid);
*pOutput = pVgCxt;
} else {
insDestroyVgroupDataCxt(pVgCxt);
@@ -546,6 +547,7 @@ int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList,
if (TSDB_CODE_SUCCESS == code) {
dst->numOfTables = taosArrayGetSize(src->pData->aSubmitTbData);
code = taosHashGetDup(pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg);
+// uError("td23101 3vgId:%d, numEps:%d", src->vgId, dst->vg.epSet.numOfEps);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildSubmitReq(src->vgId, src->pData, &dst->pData, &dst->size);
diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c
index a3c79b5757..68c93f8ca9 100644
--- a/source/libs/parser/src/parTranslater.c
+++ b/source/libs/parser/src/parTranslater.c
@@ -1422,6 +1422,9 @@ static int32_t translateAggFunc(STranslateContext* pCxt, SFunctionNode* pFunc) {
if (isCountStar(pFunc)) {
return rewriteCountStar(pCxt, pFunc);
}
+ if (isCountNotNullValue(pFunc)) {
+ return rewriteCountNotNullValue(pCxt, pFunc);
+ }
if (isCountTbname(pFunc)) {
return rewriteCountTbname(pCxt, pFunc);
}
diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c
index 29b7fa740c..5ea9be63db 100644
--- a/source/libs/tdb/src/db/tdbPager.c
+++ b/source/libs/tdb/src/db/tdbPager.c
@@ -947,6 +947,12 @@ static int tdbPagerRestore(SPager *pPager, const char *jFileName) {
return 0;
}
+static int32_t txnIdCompareDesc(const void *pLeft, const void *pRight) {
+ int64_t lhs = *(int64_t *)pLeft;
+ int64_t rhs = *(int64_t *)pRight;
+ return lhs > rhs ? -1 : 1;
+}
+
int tdbPagerRestoreJournals(SPager *pPager) {
tdbDirEntryPtr pDirEntry;
tdbDirPtr pDir = taosOpenDir(pPager->pEnv->dbName);
@@ -955,23 +961,33 @@ int tdbPagerRestoreJournals(SPager *pPager) {
return -1;
}
+ SArray *pTxnList = taosArrayInit(16, sizeof(int64_t));
+
while ((pDirEntry = tdbReadDir(pDir)) != NULL) {
char *name = tdbDirEntryBaseName(tdbGetDirEntryName(pDirEntry));
if (strncmp(TDB_MAINDB_NAME "-journal", name, 16) == 0) {
- char jname[TD_PATH_MAX] = {0};
- int dirLen = strlen(pPager->pEnv->dbName);
- memcpy(jname, pPager->pEnv->dbName, dirLen);
- jname[dirLen] = '/';
- memcpy(jname + dirLen + 1, name, strlen(name));
- if (tdbPagerRestore(pPager, jname) < 0) {
- tdbCloseDir(&pDir);
+ int64_t txnId = -1;
+ sscanf(name, TDB_MAINDB_NAME "-journal.%" PRId64, &txnId);
+ taosArrayPush(pTxnList, &txnId);
+ }
+ }
+ taosArraySort(pTxnList, txnIdCompareDesc);
+ for (int i = 0; i < TARRAY_SIZE(pTxnList); ++i) {
+ int64_t *pTxnId = taosArrayGet(pTxnList, i);
+ char jname[TD_PATH_MAX] = {0};
+ int dirLen = strlen(pPager->pEnv->dbName);
+ memcpy(jname, pPager->pEnv->dbName, dirLen);
+ jname[dirLen] = '/';
+ sprintf(jname + dirLen + 1, TDB_MAINDB_NAME "-journal.%" PRId64, *pTxnId);
+ if (tdbPagerRestore(pPager, jname) < 0) {
+ tdbCloseDir(&pDir);
- tdbError("failed to restore file due to %s. jFileName:%s", strerror(errno), name);
- return -1;
- }
+ tdbError("failed to restore file due to %s. jFileName:%s", strerror(errno), jname);
+ return -1;
}
}
+ taosArrayDestroy(pTxnList);
tdbCloseDir(&pDir);
return 0;
diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task
index cf967c9553..8c8d41d5b6 100644
--- a/tests/parallel_test/cases.task
+++ b/tests/parallel_test/cases.task
@@ -853,6 +853,7 @@
,,y,script,./test.sh -f tsim/parser/topbot.sim
,,y,script,./test.sh -f tsim/parser/union_sysinfo.sim
,,y,script,./test.sh -f tsim/parser/slimit_limit.sim
+,,y,script,./test.sh -f tsim/parser/table_merge_limit.sim
,,y,script,./test.sh -f tsim/query/tagLikeFilter.sim
,,y,script,./test.sh -f tsim/query/charScalarFunction.sim
,,y,script,./test.sh -f tsim/query/explain.sim
diff --git a/tests/parallel_test/container_build.sh b/tests/parallel_test/container_build.sh
index edb9b4ab3c..80236cf604 100755
--- a/tests/parallel_test/container_build.sh
+++ b/tests/parallel_test/container_build.sh
@@ -68,8 +68,8 @@ docker run \
-v ${REP_REAL_PATH}/community/contrib/libuv/:${REP_DIR}/community/contrib/libuv \
-v ${REP_REAL_PATH}/community/contrib/lz4/:${REP_DIR}/community/contrib/lz4 \
-v ${REP_REAL_PATH}/community/contrib/zlib/:${REP_DIR}/community/contrib/zlib \
- -v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \
- --rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_TAOSX=true;make -j || exit 1"
+ --rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_TAOSX=true -DJEMALLOC_ENABLED=true;make -j || exit 1"
+ # -v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \
if [[ -d ${WORKDIR}/debugNoSan ]] ;then
echo "delete ${WORKDIR}/debugNoSan"
@@ -97,7 +97,7 @@ docker run \
-v ${REP_REAL_PATH}/community/contrib/lz4/:${REP_DIR}/community/contrib/lz4 \
-v ${REP_REAL_PATH}/community/contrib/zlib/:${REP_DIR}/community/contrib/zlib \
-v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \
- --rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_SANITIZER=1 -DTOOLS_SANITIZE=true -DTOOLS_BUILD_TYPE=Debug -DBUILD_TAOSX=true;make -j || exit 1 "
+ --rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_SANITIZER=1 -DTOOLS_SANITIZE=true -DTOOLS_BUILD_TYPE=Debug -DBUILD_TAOSX=true -DJEMALLOC_ENABLED=true;make -j || exit 1 "
mv ${REP_REAL_PATH}/debug ${WORKDIR}/debugSan
diff --git a/tests/pytest/util/common.py b/tests/pytest/util/common.py
index 5b73989d6f..6d813a4166 100644
--- a/tests/pytest/util/common.py
+++ b/tests/pytest/util/common.py
@@ -739,6 +739,11 @@ class TDCom:
else:
os.system("unset LD_PRELOAD; pkill %s " % processorName)
+ def gen_tag_col_str(self, gen_type, data_type, count):
+ """
+ gen multi tags or cols by gen_type
+ """
+ return ','.join(map(lambda i: f'{gen_type}{i} {data_type}', range(count)))
def is_json(msg):
if isinstance(msg, str):
@@ -775,4 +780,5 @@ def dict2toml(in_dict: dict, file:str):
with open(file, 'w') as f:
toml.dump(in_dict, f)
+
tdCom = TDCom()
diff --git a/tests/script/coverage_test.sh b/tests/script/coverage_test.sh
index 3f99dde544..ad87528f9c 100755
--- a/tests/script/coverage_test.sh
+++ b/tests/script/coverage_test.sh
@@ -12,6 +12,7 @@ fi
today=`date +"%Y%m%d"`
TDENGINE_DIR=/root/TDengine
JDBC_DIR=/root/taos-connector-jdbc
+TAOSKEEPER_DIR=/root/taoskeeper
TDENGINE_COVERAGE_REPORT=$TDENGINE_DIR/tests/coverage-report-$today.log
# Color setting
@@ -171,6 +172,24 @@ function runJDBCCases() {
echo -e "### JDBC test result: $summary ###" | tee -a $TDENGINE_COVERAGE_REPORT
}
+function runTaosKeeperCases() {
+ echo "=== Run taoskeeper cases ==="
+
+ cd $TAOSKEEPER_DIR
+ git checkout -- .
+ git reset --hard HEAD
+ git checkout master
+ git pull
+
+ stopTaosd
+ stopTaosadapter
+
+ taosd -c /etc/taos >> /dev/null 2>&1 &
+ taosadapter >> /dev/null 2>&1 &
+
+ go mod tidy && go test -v ./...
+}
+
function runTest() {
echo "run Test"
@@ -182,6 +201,7 @@ function runTest() {
runSimCases
runPythonCases
runJDBCCases
+ runTaosKeeperCases
stopTaosd
cd $TDENGINE_DIR/tests/script
@@ -199,7 +219,7 @@ function lcovFunc {
lcov -d . --capture --rc lcov_branch_coverage=1 --rc genhtml_branch_coverage=1 --no-external -b $TDENGINE_DIR -o coverage.info
# remove exclude paths
- if [ "$branch" == "3.0" ]; then
+ if [ "$branch" == "main" ] ; then
lcov --remove coverage.info \
'*/contrib/*' '*/tests/*' '*/test/*' '*/tools/*' '*/libs/sync/*'\
'*/AccessBridgeCalls.c' '*/ttszip.c' '*/dataInserter.c' '*/tlinearhash.c' '*/tsimplehash.c' '*/tsdbDiskData.c'\
@@ -209,6 +229,8 @@ function lcovFunc {
'*/tthread.c' '*/tversion.c' '*/ctgDbg.c' '*/schDbg.c' '*/qwDbg.c' '*/tencode.h' '*/catalog.c'\
'*/tqSnapshot.c' '*/tsdbSnapshot.c''*/metaSnapshot.c' '*/smaSnapshot.c' '*/tqOffsetSnapshot.c'\
'*/vnodeSnapshot.c' '*/metaSnapshot.c' '*/tsdbSnapshot.c' '*/mndGrant.c' '*/mndSnode.c' '*/streamRecover.c'\
+ '*/osAtomic.c' '*/osDir.c' '*/osFile.c' '*/osMath.c' '*/osSignal.c' '*/osSleep.c' '*/osString.c' '*/osSystem.c'\
+ '*/osThread.c' '*/osTime.c' '*/osTimezone.c' \
--rc lcov_branch_coverage=1 -o coverage.info
else
lcov --remove coverage.info \
diff --git a/tests/script/tsim/parser/limit1_stb.sim b/tests/script/tsim/parser/limit1_stb.sim
index 1a5d57efbc..731a218de5 100644
--- a/tests/script/tsim/parser/limit1_stb.sim
+++ b/tests/script/tsim/parser/limit1_stb.sim
@@ -484,6 +484,7 @@ if $rows != 2 then
return -1
endi
+print === select max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9) from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 5 and c1 > 0 and c2 < 9 and c3 > 1 and c4 < 7 and c5 > 4 partition by t1 interval(5m) limit 1 offset 0
sql select max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9) from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 5 and c1 > 0 and c2 < 9 and c3 > 1 and c4 < 7 and c5 > 4 partition by t1 interval(5m) limit 1 offset 0
if $rows != 3 then
return -1
diff --git a/tests/script/tsim/parser/slimit1_query.sim b/tests/script/tsim/parser/slimit1_query.sim
index 9a27d5523b..1167fe0b3d 100644
--- a/tests/script/tsim/parser/slimit1_query.sim
+++ b/tests/script/tsim/parser/slimit1_query.sim
@@ -70,7 +70,7 @@ endi
### empty result set
sql select count(*) from stb partition by t2,t1 order by t2 asc slimit 0 soffset 0
-if $rows != 9 then
+if $rows != 0 then
return -1
endi
diff --git a/tests/script/tsim/parser/table_merge_limit.sim b/tests/script/tsim/parser/table_merge_limit.sim
new file mode 100644
index 0000000000..7d67b6d3a2
--- /dev/null
+++ b/tests/script/tsim/parser/table_merge_limit.sim
@@ -0,0 +1,47 @@
+system sh/stop_dnodes.sh
+system sh/deploy.sh -n dnode1 -i 1
+system sh/exec.sh -n dnode1 -s start
+sql connect
+
+$dbPrefix = m_fl_db
+$tbPrefix = m_fl_tb
+$mtPrefix = m_fl_mt
+$tbNum = 2
+$rowNum = 513
+$totalNum = $tbNum * $rowNum
+$ts0 = 1537146000000
+$delta = 600000
+print ========== fill.sim
+$i = 0
+$db = $dbPrefix . $i
+$mt = $mtPrefix . $i
+
+sql drop database $db -x step1
+step1:
+sql create database $db vgroups 1
+sql use $db
+sql create table $mt (ts timestamp, c1 int) tags(tgcol int)
+
+$i = 0
+$ts = $ts0
+while $i < $tbNum
+ $tb = $tbPrefix . $i
+ sql create table $tb using $mt tags( $i )
+
+ $x = 0
+ while $x < $rowNum
+ $xs = $x * $delta
+ $ts = $ts0 + $xs
+ sql insert into $tb values ( $ts , $x )
+ $x = $x + 1
+ endw
+
+ $i = $i + 1
+endw
+
+sql select * from $mt order by ts limit 10
+if $rows != 10 then
+ return -1
+endi
+
+system sh/exec.sh -n dnode1 -s stop -x SIGINT
diff --git a/tests/system-test/0-others/check_assert.py b/tests/system-test/0-others/check_assert.py
index 59fb223528..ff69b9eeec 100644
--- a/tests/system-test/0-others/check_assert.py
+++ b/tests/system-test/0-others/check_assert.py
@@ -28,10 +28,10 @@ import os
NO_FOUND = 0 # not found assert or ASSERT
FOUND_OK = 1 # found ASSERT and valid usage
FOUND_NOIF = 2 # found ASSERT but no if like ASSERT(...)
-FOUND_LOWER = 3 # found assert write with lower letters
+FOUND_LOWER = 3 # found assert write with system assert
FOUND_HAVENOT = 4 # found ASSERT have if but have not like if(!ASSERT)
-code_strs = ["not found", "valid", "found but no if", "lower assert","found but have not"]
+code_strs = ["not found", "valid", "found but no if", "system assert","found but have not"]
#
diff --git a/tests/system-test/0-others/information_schema.py b/tests/system-test/0-others/information_schema.py
index 720eab74c4..3c4a71c3e4 100644
--- a/tests/system-test/0-others/information_schema.py
+++ b/tests/system-test/0-others/information_schema.py
@@ -93,7 +93,6 @@ class TDTestCase:
tdSql.checkEqual(i[2],len(self.perf_list))
tdSql.execute('create table db1.ntb (ts timestamp,c0 int)')
tdSql.query(f'select db_name, count(*) from information_schema.ins_tables group by db_name')
- print(tdSql.queryResult)
for i in tdSql.queryResult:
if i[0].lower() == 'information_schema':
tdSql.checkEqual(i[1],len(self.ins_list))
@@ -101,9 +100,77 @@ class TDTestCase:
tdSql.checkEqual(i[1],len(self.perf_list))
elif i[0].lower() == self.dbname:
tdSql.checkEqual(i[1],self.tbnum+1)
+
+
+
+ def ins_col_check_4096(self):
+ tdSql.execute('create database db3 vgroups 2 replica 1')
+ col_str = tdCom.gen_tag_col_str("col", "int",4094)
+ tdSql.execute(f'create stable if not exists db3.stb (col_ts timestamp, {col_str}) tags (t1 int)')
+ for i in range(100):
+ tdLog.info(f"create table db3.ctb{i} using db3.stb tags({i})")
+ tdSql.execute(f"create table db3.ctb{i} using db3.stb tags({i})")
+ col_value_str = '1, ' * 4093 + '1'
+ tdSql.execute(f"insert into db3.ctb{i} values(now,{col_value_str})(now+1s,{col_value_str})(now+2s,{col_value_str})(now+3s,{col_value_str})")
+ tdSql.query("select * from information_schema.ins_columns")
+
+ tdSql.execute('drop database db3')
+ def ins_stable_check(self):
+ tdSql.execute('create database db3 vgroups 2 replica 1')
+ tbnum = 10
+ ctbnum = 10
+ for i in range(tbnum):
+ tdSql.execute(f'create stable db3.stb_{i} (ts timestamp,c0 int) tags(t0 int)')
+ tdSql.execute(f'create table db3.ntb_{i} (ts timestamp,c0 int)')
+ for j in range(ctbnum):
+ tdSql.execute(f"create table db3.ctb_{i}_{j} using db3.stb_{i} tags({j})")
+ tdSql.query("select stable_name,count(table_name) from information_schema.ins_tables where db_name = 'db3' group by stable_name order by stable_name")
+ result = tdSql.queryResult
+ for i in range(len(result)):
+ if result[i][0] == None:
+ tdSql.checkEqual(result[0][1],tbnum)
+ else:
+ tdSql.checkEqual(result[i][0],f'stb_{i-1}')
+ tdSql.checkEqual(result[i][1],ctbnum)
+
+
+
+ def ins_columns_check(self):
+ tdSql.execute('drop database if exists db2')
+ tdSql.execute('create database if not exists db2 vgroups 1 replica 1')
+ for i in range (5):
+ self.stb4096 = 'create table db2.stb%d (ts timestamp' % (i)
+ for j in range (4094 - i):
+ self.stb4096 += ', c%d int' % (j)
+ self.stb4096 += ') tags (t1 int)'
+ tdSql.execute(self.stb4096)
+ for k in range(10):
+ tdSql.execute("create table db2.ctb_%d_%dc using db2.stb%d tags(%d)" %(i,k,i,k))
+ for t in range (2):
+ tdSql.query(f'select * from information_schema.ins_columns where db_name="db2" and table_type=="SUPER_TABLE"')
+ tdSql.checkEqual(20465,len(tdSql.queryResult))
+ for t in range (2):
+ tdSql.query(f'select * from information_schema.ins_columns where db_name="db2" and table_type=="CHILD_TABLE"')
+ tdSql.checkEqual(204650,len(tdSql.queryResult))
+
+ for i in range (5):
+ self.ntb4096 = 'create table db2.ntb%d (ts timestamp' % (i)
+ for j in range (4095 - i):
+ self.ntb4096 += ', c%d binary(10)' % (j)
+ self.ntb4096 += ')'
+ tdSql.execute(self.ntb4096)
+ for t in range (2):
+ tdSql.query(f'select * from information_schema.ins_columns where db_name="db2" and table_type=="NORMAL_TABLE"')
+ tdSql.checkEqual(20470,len(tdSql.queryResult))
+
+
def run(self):
self.prepare_data()
self.count_check()
+ self.ins_columns_check()
+ # self.ins_col_check_4096()
+ self.ins_stable_check()
+
def stop(self):
tdSql.close()
diff --git a/tests/system-test/2-query/tsbsQuery.py b/tests/system-test/2-query/tsbsQuery.py
index e64b3ed0c7..0e2975cdba 100644
--- a/tests/system-test/2-query/tsbsQuery.py
+++ b/tests/system-test/2-query/tsbsQuery.py
@@ -214,13 +214,13 @@ class TDTestCase:
sql=f"SELECT model,count(state_changed) FROM (SELECT _rowts,model,diff(broken_down) AS state_changed FROM (SELECT ts,model,tb,cast(cast(floor(2*(nzs)) as bool) as int) AS broken_down FROM (SELECT _wstart as ts,model,tbname as tb, sum(cast(cast(status as bool) as int))/count(cast(cast(status as bool) as int)) AS nzs FROM {dbname}.diagnostics WHERE ts >= 1451606400000 AND ts < 1451952001000 partition BY tbname,model interval(10m))order by ts) partition BY tb,model ) WHERE state_changed = 1 partition BY model;"
tdSql.query(f"{sql}")
tdSql.checkRows(46)
- # for i in range(2):
- # tdSql.query("%s"%sql)
- # quertR1=tdSql.queryResult
- # for j in range(50):
- # tdSql.query("%s"%sql)
- # quertR2=tdSql.queryResult
- # assert quertR1 == quertR2 , "%s != %s ,The results of multiple queries are different" %(quertR1,quertR2)
+ for i in range(2):
+ tdSql.query("%s"%sql)
+ quertR1=tdSql.queryResult
+ for j in range(50):
+ tdSql.query("%s"%sql)
+ quertR2=tdSql.queryResult
+ assert quertR1 == quertR2 , "%s != %s ,The results of multiple queries are different" %(quertR1,quertR2)
#it's already supported:
diff --git a/tests/system-test/7-tmq/subscribeStb1.py b/tests/system-test/7-tmq/subscribeStb1.py
index edbe1bc3c6..3dc3528d04 100644
--- a/tests/system-test/7-tmq/subscribeStb1.py
+++ b/tests/system-test/7-tmq/subscribeStb1.py
@@ -245,8 +245,8 @@ class TDTestCase:
for i in range(expectRows):
totalConsumeRows += resultList[i]
- if totalConsumeRows != expectrowcnt/4:
- tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt/4))
+ tdLog.info("act consume rows: %d, expect consume rows greater than or equal to: %d"%(totalConsumeRows, expectrowcnt/4))
+ if totalConsumeRows < expectrowcnt/4:
tdLog.exit("tmq consume rows error!")
self.initConsumerInfoTable()
@@ -267,8 +267,8 @@ class TDTestCase:
for i in range(expectRows):
totalConsumeRows += resultList[i]
- if totalConsumeRows != expectrowcnt:
- tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
+ tdLog.info("act consume rows: %d, expect consume rows greater than or equal to: %d"%(totalConsumeRows, expectrowcnt))
+ if totalConsumeRows < expectrowcnt:
tdLog.exit("tmq consume rows error!")
tdSql.query("drop topic %s"%topicFromStb1)
diff --git a/tests/system-test/7-tmq/tmqDelete-1ctb.py b/tests/system-test/7-tmq/tmqDelete-1ctb.py
index b09efdd1e6..69d2f5e347 100644
--- a/tests/system-test/7-tmq/tmqDelete-1ctb.py
+++ b/tests/system-test/7-tmq/tmqDelete-1ctb.py
@@ -323,7 +323,7 @@ class TDTestCase:
if self.snapshot == 0:
consumerId = 4
- expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 + 1/4 + 3/4))
+ expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1/4 + 3/4))
elif self.snapshot == 1:
consumerId = 5
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 - 1/4 + 1/4 + 3/4))
@@ -369,9 +369,14 @@ class TDTestCase:
tdLog.info("act consume rows: %d, act query rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsFromQuery, expectrowcnt))
if self.snapshot == 0:
- if totalConsumeRows != expectrowcnt:
+ # If data writing is completed before consumer get snapshot, will consume 7500 from wal;
+ # If data writing has not started before consumer get snapshot, will consume 10000 from wal;
+ minRows = int(expectrowcnt * (1 - 1/4)) # 7500
+ tdLog.info("consume rows should be between %d and %d, "%(minRows, expectrowcnt))
+ if not ((totalConsumeRows >= minRows) and (totalConsumeRows <= expectrowcnt)):
tdLog.exit("tmq consume rows error with snapshot = 0!")
elif self.snapshot == 1:
+ tdLog.info("consume rows should be between %d and %d, "%(totalRowsFromQuery, expectrowcnt))
if not ((totalConsumeRows >= totalRowsFromQuery) and (totalConsumeRows <= expectrowcnt)):
tdLog.exit("tmq consume rows error with snapshot = 1!")
@@ -381,7 +386,113 @@ class TDTestCase:
tdLog.printNoPrefix("======== test case 3 end ...... ")
+ def tmqCase4(self):
+ tdLog.printNoPrefix("======== test case 4: ")
+ paraDict = {'dbName': 'dbt',
+ 'dropFlag': 1,
+ 'event': '',
+ 'vgroups': 4,
+ 'stbName': 'stb',
+ 'colPrefix': 'c',
+ 'tagPrefix': 't',
+ 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
+ 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
+ 'ctbPrefix': 'ctb',
+ 'ctbStartIdx': 0,
+ 'ctbNum': 1,
+ 'rowsPerTbl': 10000,
+ 'batchNum': 5000,
+ 'startTs': 1640966400000, # 2022-01-01 00:00:00.000
+ 'pollDelay': 15,
+ 'showMsg': 1,
+ 'showRow': 1,
+ 'snapshot': 0}
+ paraDict['snapshot'] = self.snapshot
+ paraDict['vgroups'] = self.vgroups
+ paraDict['ctbNum'] = self.ctbNum
+ paraDict['rowsPerTbl'] = self.rowsPerTbl
+
+ tdLog.info("restart taosd to ensure that the data falls into the disk")
+ tdSql.query("flush database %s"%(paraDict['dbName']))
+
+ tmqCom.initConsumerTable()
+ tdLog.info("create topics from stb1")
+ topicFromStb1 = 'topic_stb1'
+ queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
+ sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
+ tdLog.info("create topic sql: %s"%sqlString)
+ tdSql.execute(sqlString)
+
+ # paraDict['ctbNum'] = self.ctbNum
+ paraDict['rowsPerTbl'] = self.rowsPerTbl
+ consumerId = 1
+
+ if self.snapshot == 0:
+ consumerId = 4
+ expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"])
+ elif self.snapshot == 1:
+ consumerId = 5
+ expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 - 1/4 + 1/4 + 3/4))
+
+ topicList = topicFromStb1
+ ifcheckdata = 1
+ ifManualCommit = 1
+ keyList = 'group.id:cgrp1,\
+ enable.auto.commit:true,\
+ auto.commit.interval.ms:1000,\
+ auto.offset.reset:earliest'
+ tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
+
+ # del some data
+ rowsOfDelete = int(self.rowsPerTbl / 4 )
+ paraDict["endTs"] = paraDict["startTs"] + rowsOfDelete - 1
+ # pDeleteThread = self.asyncDeleteData(paraDict)
+ self.threadFunctionForDeletaData(paraDict)
+
+ tdLog.info("start consume processor")
+ tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
+
+ tmqCom.getStartConsumeNotifyFromTmqsim()
+
+ # update to 1/4 rows and insert 3/4 new rows
+ paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl * 3 / 4)
+ # paraDict['rowsPerTbl'] = self.rowsPerTbl
+ # tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"],
+ # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
+ # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
+ pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict)
+
+ pInsertThread.join()
+
+ tdLog.info("start to check consume result")
+ expectRows = 1
+ resultList = tmqCom.selectConsumeResult(expectRows)
+ totalConsumeRows = 0
+ for i in range(expectRows):
+ totalConsumeRows += resultList[i]
+
+ tdSql.query(queryString)
+ totalRowsFromQuery = tdSql.getRows()
+
+ tdLog.info("act consume rows: %d, act query rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsFromQuery, expectrowcnt))
+
+ if self.snapshot == 0:
+ tdLog.info("consume rows should be %d"%(expectrowcnt))
+ if (totalConsumeRows != expectrowcnt):
+ tdLog.exit("tmq consume rows error with snapshot = 0!")
+ elif self.snapshot == 1:
+ # If data writing has not started before consumer get snapshot, will consume 10000 from wal, and consumer 7500 from tsdb;
+ tdLog.info("consume rows should be %d, "%(expectrowcnt))
+ if (totalConsumeRows != expectrowcnt):
+ tdLog.exit("tmq consume rows error with snapshot = 1!")
+
+ # tmqCom.checkFileContent(consumerId, queryString)
+
+ tdSql.query("drop topic %s"%topicFromStb1)
+
+ tdLog.printNoPrefix("======== test case 4 end ...... ")
+
def run(self):
# tdSql.prepare()
tdLog.printNoPrefix("=============================================")
@@ -409,6 +520,17 @@ class TDTestCase:
self.prepareTestEnv()
self.tmqCase3()
+ tdLog.printNoPrefix("=============================================")
+ tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
+ self.snapshot = 0
+ self.prepareTestEnv()
+ self.tmqCase4()
+ tdLog.printNoPrefix("====================================================================")
+ tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
+ self.snapshot = 1
+ self.prepareTestEnv()
+ self.tmqCase4()
+
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
diff --git a/tests/system-test/7-tmq/tmqDnodeRestart.py b/tests/system-test/7-tmq/tmqDnodeRestart.py
index a44ff916e5..8c19377c0b 100644
--- a/tests/system-test/7-tmq/tmqDnodeRestart.py
+++ b/tests/system-test/7-tmq/tmqDnodeRestart.py
@@ -136,7 +136,7 @@ class TDTestCase:
tdLog.info("================= restart dnode ===========================")
tdDnodes.stoptaosd(1)
tdDnodes.starttaosd(1)
- # time.sleep(3)
+ time.sleep(5)
tdLog.info(" restart taosd end and wait to check consume result")
expectRows = 1
@@ -220,7 +220,7 @@ class TDTestCase:
tdLog.info("================= restart dnode ===========================")
tdDnodes.stoptaosd(1)
tdDnodes.starttaosd(1)
- # time.sleep(3)
+ time.sleep(5)
tdLog.info("create some new child table and insert data ")
paraDict["batchNum"] = 100
diff --git a/tests/system-test/7-tmq/tmqMultiConsumer.py b/tests/system-test/7-tmq/tmqMultiConsumer.py
new file mode 100644
index 0000000000..cc217e0c4c
--- /dev/null
+++ b/tests/system-test/7-tmq/tmqMultiConsumer.py
@@ -0,0 +1,174 @@
+
+import taos
+import sys
+import time
+import socket
+import os
+import threading
+import math
+# from tests.pytest.util.common import TDCom
+# from tests.pytest.util.log import TDLog
+
+from util.log import *
+from util.sql import *
+from util.cases import *
+from util.dnodes import *
+from util.common import *
+sys.path.append("./7-tmq")
+from tmqCommon import *
+
+class TDTestCase:
+ def __init__(self):
+ self.vgroups = 32
+ self.ctbNum = 100
+ self.rowsPerTbl = 1000
+ self.snapshot = 1
+ self.replicaVar = 3
+
+ def init(self, conn, logSql, replicaVar=1):
+ self.replicaVar = int(replicaVar)
+ tdLog.debug(f"start to excute {__file__}")
+ tdSql.init(conn.cursor(), False)
+
+ def prepareTestEnv(self):
+ tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
+ paraDict = {'dbName': 'dbt',
+ 'dropFlag': 1,
+ 'event': '',
+ 'vgroups': 4,
+ 'stbName': 'stb',
+ 'colPrefix': 'c',
+ 'tagPrefix': 't',
+ 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
+ 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
+ 'ctbPrefix': 'ctb',
+ 'ctbStartIdx': 0,
+ 'ctbNum': 10,
+ 'rowsPerTbl': 10000,
+ 'batchNum': 1000,
+ 'startTs': 1640966400000, # 2022-01-01 00:00:00.000
+ 'pollDelay': 10,
+ 'showMsg': 1,
+ 'showRow': 1,
+ 'snapshot': 0}
+
+ paraDict['vgroups'] = self.vgroups
+ paraDict['ctbNum'] = self.ctbNum
+ paraDict['rowsPerTbl'] = self.rowsPerTbl
+
+ tmqCom.initConsumerTable()
+ tmqCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=self.replicaVar)
+ tdLog.info("create stb")
+ tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
+ tdLog.info("create ctb")
+ tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
+ ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
+ tdLog.info("insert data")
+ tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
+ ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
+ startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
+
+ # tdLog.info("restart taosd to ensure that the data falls into the disk")
+ # tdDnodes.stop(1)
+ # tdDnodes.start(1)
+ # tdSql.query("flush database %s"%(paraDict['dbName']))
+ return
+
+ def tmqCase1(self):
+ tdLog.printNoPrefix("======== test case 1: ")
+ paraDict = {'dbName': 'dbt',
+ 'dropFlag': 1,
+ 'event': '',
+ 'vgroups': 1,
+ 'stbName': 'stb',
+ 'colPrefix': 'c',
+ 'tagPrefix': 't',
+ 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
+ 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
+ 'ctbPrefix': 'ctb',
+ 'ctbStartIdx': 0,
+ 'ctbNum': 1,
+ 'rowsPerTbl': 10000,
+ 'batchNum': 1000,
+ 'startTs': 1640966400000, # 2022-01-01 00:00:00.000
+ 'pollDelay': 20,
+ 'showMsg': 1,
+ 'showRow': 1,
+ 'snapshot': 1}
+
+ paraDict['vgroups'] = self.vgroups
+ paraDict['ctbNum'] = self.ctbNum
+ paraDict['rowsPerTbl'] = self.rowsPerTbl
+ paraDict['snapshot'] = self.snapshot
+
+ topicNameList = ['topic1', 'topic2']
+ tmqCom.initConsumerTable()
+
+ tdLog.info("create topics from stb with filter")
+ # queryString = "select ts, acos(c1), ceil(pow(c1,3)) from %s.%s where (sin(c2) >= 0) and (c1 %% 4 == 0) and (ts >= %d) and (t4 like 'shanghai')"%(paraDict['dbName'], paraDict['stbName'], paraDict["startTs"]+9379)
+ queryString = "select ts, acos(c1), ceil(pow(c1,3)) from %s.%s "%(paraDict['dbName'], paraDict['stbName'])
+ # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
+ sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
+ tdLog.info("create topic sql: %s"%sqlString)
+ tdSql.execute(sqlString)
+
+ sqlString = "create topic %s as %s" %(topicNameList[1], queryString)
+ tdLog.info("create topic sql: %s"%sqlString)
+ tdSql.execute(sqlString)
+ # tdSql.query(queryString)
+ # expectRowsList.append(tdSql.getRows())
+
+ # init consume info, and start tmq_sim, then check consume result
+ tdLog.info("insert consume info to consume processor")
+ consumerId = 0
+ expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 4
+ topicList = topicNameList[0] + ',' + topicNameList[0] + ',' + topicNameList[1]
+ ifcheckdata = 0
+ ifManualCommit = 1
+ keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
+ tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
+
+ consumerId = 1
+ tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
+
+ tdLog.info("start consume processor")
+ tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
+ tdLog.info("wait the consume result")
+
+ # continue to insert new rows
+ paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl)
+ pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict)
+ pInsertThread.join()
+
+ expectRows = 2
+ resultList = tmqCom.selectConsumeResult(expectRows)
+ actConsumeTotalRows = resultList[0] + resultList[1]
+
+ tdLog.info("act consume rows: %d, expect consume rows: %d"%(actConsumeTotalRows, expectrowcnt))
+
+ if not ((expectrowcnt <= actConsumeTotalRows) or ((resultList[0] == 0) and (resultList[1] >= expectrowcnt)) or ((resultList[1] == 0) and (resultList[0] >= expectrowcnt))):
+ tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt, actConsumeTotalRows))
+ tdLog.exit("%d tmq consume rows error!"%consumerId)
+
+ # tmqCom.checkFileContent(consumerId, queryString)
+
+ time.sleep(10)
+ for i in range(len(topicNameList)):
+ tdSql.query("drop topic %s"%topicNameList[i])
+
+ tdLog.printNoPrefix("======== test case 1 end ...... ")
+
+
+ def run(self):
+ tdSql.prepare()
+ self.prepareTestEnv()
+ self.tmqCase1()
+
+ def stop(self):
+ tdSql.close()
+ tdLog.success(f"{__file__} successfully executed")
+
+event = threading.Event()
+
+tdCases.addLinux(__file__, TDTestCase())
+tdCases.addWindows(__file__, TDTestCase())