Merge branch '3.0' of https://github.com/taosdata/TDengine into test/3.0/TD-24125

This commit is contained in:
chenhaoran 2023-10-17 20:35:31 +08:00
commit aba7143994
29 changed files with 438 additions and 164 deletions

View File

@ -93,6 +93,8 @@ ELSE()
set(VAR_TSZ "TSZ" CACHE INTERNAL "global variant tsz" )
ENDIF()
# force set all platform to JEMALLOC_ENABLED = false
SET(JEMALLOC_ENABLED OFF)
IF (TD_WINDOWS)
MESSAGE("${Yellow} set compiler flag for Windows! ${ColourReset}")
SET(COMMON_FLAGS "/w /D_WIN32 /DWIN32 /Zi /MTd")
@ -116,8 +118,6 @@ IF (TD_WINDOWS)
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${COMMON_FLAGS}")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${COMMON_FLAGS}")
SET(JEMALLOC_ENABLED OFF)
ELSE ()
IF (${TD_DARWIN})
set(CMAKE_MACOSX_RPATH 0)

View File

@ -2,7 +2,7 @@
IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER})
ELSE ()
SET(TD_VER_NUMBER "3.2.0.0.alpha")
SET(TD_VER_NUMBER "3.2.1.0.alpha")
ENDIF ()
IF (DEFINED VERCOMPATIBLE)

Binary file not shown.

Binary file not shown.

View File

@ -12,7 +12,7 @@ The FQDN of all hosts must be setup properly. For e.g. FQDNs may have to be conf
### Step 1
If any previous version of TDengine has been installed and configured on any host, the installation needs to be removed and the data needs to be cleaned up. For details about uninstalling please refer to [Install and Uninstall](../../operation/pkg-install). To clean up the data, please use `rm -rf /var/lib/taos/\*` assuming the `dataDir` is configured as `/var/lib/taos`.
If any previous version of TDengine has been installed and configured on any host, the installation needs to be removed and the data needs to be cleaned up. To clean up the data, please use `rm -rf /var/lib/taos/\*` assuming the `dataDir` is configured as `/var/lib/taos`.
:::note
FQDN information is written to file. If you have started TDengine without configuring or changing the FQDN, ensure that data is backed up or no longer needed before running the `rm -rf /var/lib\taos/\*` command.

View File

@ -41,8 +41,6 @@ An existing Grafana Notification Channel can be specified with parameter `-E`, t
Launch `TDinsight.sh` with the command above and restart Grafana, then open Dashboard `http://localhost:3000/d/tdinsight`.
For more use cases and restrictions please refer to [TDinsight](/reference/tdinsight/).
## log database
The data of tdinsight dashboard is stored in `log` database (default. You can change it in taoskeeper's config file. For more infrmation, please reference to [taoskeeper document](/reference/taosKeeper)). The taoskeeper will create log database on taoskeeper startup.

View File

@ -13,7 +13,7 @@ taosBenchmark (formerly taosdemo ) is a tool for testing the performance of TDen
There are two ways to install taosBenchmark:
- Installing the official TDengine installer will automatically install taosBenchmark. Please refer to [TDengine installation](../../operation/pkg-install) for details.
- Installing the official TDengine installer will automatically install taosBenchmark.
- Compile taos-tools separately and install them. Please refer to the [taos-tools](https://github.com/taosdata/taos-tools) repository for details.

View File

@ -16,7 +16,7 @@ taosKeeper is a tool for TDengine that exports monitoring metrics. With taosKeep
There are two ways to install taosKeeper:
Methods of installing taosKeeper:
- Installing the official TDengine installer will automatically install taosKeeper. Please refer to [TDengine installation](../../operation/pkg-install) for details.
- Installing the official TDengine installer will automatically install taosKeeper.
- You can compile taosKeeper separately and install it. Please refer to the [taosKeeper](https://github.com/taosdata/taoskeeper) repository for details.
## Configuration and Launch

View File

@ -21,7 +21,7 @@ TDengine Source Connector is used to read data from TDengine in real-time and se
1. Linux operating system
2. Java 8 and Maven installed
3. Git/curl/vi is installed
4. TDengine is installed and started. If not, please refer to [Installation and Uninstallation](../../operation/pkg-install)
4. TDengine is installed and started.
## Install Kafka

View File

@ -10,6 +10,10 @@ For TDengine 2.x installation packages by version, please visit [here](https://t
import Release from "/components/ReleaseV3";
## 3.2.0.0
<Release type="tdengine" version="3.2.0.0" />
## 3.1.1.0
<Release type="tdengine" version="3.1.1.0" />

View File

@ -9,8 +9,6 @@ TDengine 通过 [taosKeeper](/reference/taosKeeper/) 将服务器的 CPU、内
## TDinsight - 使用监控数据库 + Grafana 对 TDengine 进行监控的解决方案
监控数据库将提供更多的监控项,您可以从 [TDinsight Grafana Dashboard](/reference/tdinsight/) 了解如何使用 TDinsight 方案对 TDengine 进行监控。
我们提供了一个自动化脚本 `TDinsight.sh` 对 TDinsight 进行部署。
下载 `TDinsight.sh`
@ -37,8 +35,6 @@ chmod +x TDinsight.sh
运行程序并重启 Grafana 服务,打开面板:`http://localhost:3000/d/tdinsight`。
更多使用场景和限制请参考[TDinsight](/reference/tdinsight/) 文档。
## log 库
TDinsight dashboard 数据来源于 log 库存放监控数据的默认db可以在 taoskeeper 配置文件中修改,具体参考 [taoskeeper 文档](/reference/taosKeeper)。taoskeeper 启动后会自动创建 log 库,并将监控数据写入到该数据库中。

View File

@ -10,6 +10,10 @@ TDengine 2.x 各版本安装包请访问[这里](https://www.taosdata.com/all-do
import Release from "/components/ReleaseV3";
## 3.2.0.0
<Release type="tdengine" version="3.2.0.0" />
## 3.1.1.0
<Release type="tdengine" version="3.1.1.0" />

View File

@ -225,7 +225,10 @@ void syslog(int unused, const char *format, ...);
#endif
#else
// Windows
#define setThreadName(name)
#define setThreadName(name) \
do { \
pthread_setname_np(taosThreadSelf(), (name)); \
} while (0)
#endif
#if defined(_WIN32)

View File

@ -719,6 +719,21 @@ int taos_init() {
int taos_options_imp(TSDB_OPTION option, const char *str) {
if (option == TSDB_OPTION_CONFIGDIR) {
#ifndef WINDOWS
char newstr[PATH_MAX];
int len = strlen(str);
if (len > 1 && str[0] != '"' && str[0] != '\'') {
if (len + 2 >= PATH_MAX) {
tscError("Too long path %s", str);
return -1;
}
newstr[0] = '"';
strncpy(newstr+1, str, len);
newstr[len + 1] = '"';
newstr[len + 2] = '\0';
str = newstr;
}
#endif
tstrncpy(configDir, str, PATH_MAX);
tscInfo("set cfg:%s to %s", configDir, str);
return 0;

View File

@ -1366,7 +1366,7 @@ END:
taosReleaseRef(tmqMgmt.rsetId, refId);
FAIL:
tsem_post(&tmq->rspSem);
if(tmq) tsem_post(&tmq->rspSem);
taosMemoryFree(pParam);
if(pMsg) taosMemoryFreeClear(pMsg->pData);
if(pMsg) taosMemoryFreeClear(pMsg->pEpSet);

View File

@ -722,13 +722,18 @@ typedef struct SSttBlockLoadCostInfo {
double statisElapsedTime;
} SSttBlockLoadCostInfo;
typedef struct SBlockDataInfo {
SBlockData data;
bool pin;
int32_t sttBlockIndex;
} SBlockDataInfo;
typedef struct SSttBlockLoadInfo {
SBlockData blockData[2]; // buffered block data
SBlockDataInfo blockData[2]; // buffered block data
int32_t statisBlockIndex; // buffered statistics block index
void *statisBlock; // buffered statistics block data
void *pSttStatisBlkArray;
SArray *aSttBlk;
int32_t blockIndex[2]; // to denote the loaded block in the corresponding position.
int32_t currentLoadBlockIndex;
STSchema *pSchema;
int16_t *colIds;
@ -744,8 +749,7 @@ typedef struct SMergeTree {
int8_t backward;
SRBTree rbt;
SLDataIter *pIter;
bool destroyLoadInfo;
SSttBlockLoadInfo *pLoadInfo;
SLDataIter *pPinnedBlockIter;
const char *idStr;
bool ignoreEarlierTs;
} SMergeTree;
@ -791,7 +795,7 @@ struct SDiskDataBuilder {
typedef struct SLDataIter {
SRBTreeNode node;
SSttBlk *pSttBlk;
int32_t iStt; // for debug purpose
int64_t cid; // for debug purpose
int8_t backward;
int32_t iSttBlk;
int32_t iRow;
@ -805,9 +809,6 @@ typedef struct SLDataIter {
} SLDataIter;
#define tMergeTreeGetRow(_t) (&((_t)->pIter->rInfo.row))
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter *pLDataIter);
struct SSttFileReader;
typedef int32_t (*_load_tomb_fn)(STsdbReader *pReader, struct SSttFileReader *pSttFileReader,
@ -830,10 +831,13 @@ typedef struct {
void *pReader;
void *idstr;
} SMergeTreeConf;
int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf);
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
bool tMergeTreeNext(SMergeTree *pMTree);
void tMergeTreePinSttBlock(SMergeTree* pMTree);
void tMergeTreeUnpinSttBlock(SMergeTree* pMTree);
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree);
void tMergeTreeClose(SMergeTree *pMTree);

View File

@ -198,8 +198,6 @@ int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback) {
taosWLockLatch(&pTq->pStreamMeta->lock);
tqDebug("vgId:%d, vnode stream-task snapshot writer closed", TD_VID(pTq->pVnode));
taosWLockLatch(&pTq->pStreamMeta->lock);
if (rollback) {
tdbAbort(pTq->pStreamMeta->db, pTq->pStreamMeta->txn);
} else {
@ -208,12 +206,6 @@ int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback) {
code = tdbPostCommit(pTq->pStreamMeta->db, pTq->pStreamMeta->txn);
if (code) goto _err;
}
if (tdbBegin(pTq->pStreamMeta->db, &pTq->pStreamMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
code = -1;
goto _err;
}
taosWUnLockLatch(&pTq->pStreamMeta->lock);
if (tdbBegin(pTq->pStreamMeta->db, &pTq->pStreamMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
code = -1;

View File

@ -29,16 +29,17 @@ SSttBlockLoadInfo *tCreateOneLastBlockLoadInfo(STSchema *pSchema, int16_t *colLi
return NULL;
}
pLoadInfo->blockIndex[0] = -1;
pLoadInfo->blockIndex[1] = -1;
pLoadInfo->blockData[0].sttBlockIndex = -1;
pLoadInfo->blockData[1].sttBlockIndex = -1;
pLoadInfo->currentLoadBlockIndex = 1;
int32_t code = tBlockDataCreate(&pLoadInfo->blockData[0]);
int32_t code = tBlockDataCreate(&pLoadInfo->blockData[0].data);
if (code) {
terrno = code;
}
code = tBlockDataCreate(&pLoadInfo->blockData[1]);
code = tBlockDataCreate(&pLoadInfo->blockData[1].data);
if (code) {
terrno = code;
}
@ -66,11 +67,16 @@ void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
}
pLoadInfo->currentLoadBlockIndex = 1;
pLoadInfo->blockIndex[0] = -1;
pLoadInfo->blockIndex[1] = -1;
tBlockDataDestroy(&pLoadInfo->blockData[0]);
tBlockDataDestroy(&pLoadInfo->blockData[1]);
SBlockDataInfo* pInfo = &pLoadInfo->blockData[0];
tBlockDataDestroy(&pInfo->data);
pInfo->sttBlockIndex = -1;
pInfo->pin = false;
pInfo = &pLoadInfo->blockData[1];
tBlockDataDestroy(&pInfo->data);
pInfo->sttBlockIndex = -1;
pInfo->pin = false;
taosArrayDestroy(pLoadInfo->aSttBlk);
taosMemoryFree(pLoadInfo);
@ -109,37 +115,48 @@ void *destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo* pLoa
return NULL;
}
// choose the unpinned slot to load next data block
static void updateBlockLoadSlot(SSttBlockLoadInfo* pLoadInfo) {
int32_t nextSlotIndex = pLoadInfo->currentLoadBlockIndex ^ 1;
if (pLoadInfo->blockData[nextSlotIndex].pin) {
nextSlotIndex = nextSlotIndex ^ 1;
}
pLoadInfo->currentLoadBlockIndex = nextSlotIndex;
}
static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
int32_t code = 0;
SSttBlockLoadInfo *pInfo = pIter->pBlockLoadInfo;
if (pInfo->blockIndex[0] == pIter->iSttBlk) {
if (pInfo->blockData[0].sttBlockIndex == pIter->iSttBlk) {
if (pInfo->currentLoadBlockIndex != 0) {
tsdbDebug("current load index is set to 0, block index:%d, file index:%d, due to uid:%" PRIu64 ", load data, %s",
pIter->iSttBlk, pIter->iStt, pIter->uid, idStr);
tsdbDebug("current load index is set to 0, block index:%d, fileVer:%" PRId64 ", due to uid:%" PRIu64
", load data, %s",
pIter->iSttBlk, pIter->cid, pIter->uid, idStr);
pInfo->currentLoadBlockIndex = 0;
}
return &pInfo->blockData[0];
return &pInfo->blockData[0].data;
}
if (pInfo->blockIndex[1] == pIter->iSttBlk) {
if (pInfo->blockData[1].sttBlockIndex == pIter->iSttBlk) {
if (pInfo->currentLoadBlockIndex != 1) {
tsdbDebug("current load index is set to 1, block index:%d, file index:%d, due to uid:%" PRIu64 ", load data, %s",
pIter->iSttBlk, pIter->iStt, pIter->uid, idStr);
tsdbDebug("current load index is set to 1, block index:%d, fileVer:%" PRId64 ", due to uid:%" PRIu64
", load data, %s",
pIter->iSttBlk, pIter->cid, pIter->uid, idStr);
pInfo->currentLoadBlockIndex = 1;
}
return &pInfo->blockData[1];
return &pInfo->blockData[1].data;
}
if (pIter->pSttBlk == NULL || pInfo->pSchema == NULL) {
return NULL;
}
// current block not loaded yet
pInfo->currentLoadBlockIndex ^= 1;
updateBlockLoadSlot(pInfo);
int64_t st = taosGetTimestampUs();
SBlockData *pBlock = &pInfo->blockData[pInfo->currentLoadBlockIndex];
SBlockData *pBlock = &pInfo->blockData[pInfo->currentLoadBlockIndex].data;
code = tsdbSttFileReadBlockDataByColumn(pIter->pReader, pIter->pSttBlk, pBlock, pInfo->pSchema, &pInfo->colIds[1],
pInfo->numOfCols - 1);
if (code != TSDB_CODE_SUCCESS) {
@ -150,17 +167,19 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
pInfo->cost.blockElapsedTime += el;
pInfo->cost.loadBlocks += 1;
tsdbDebug("read last block, total load:%"PRId64", trigger by uid:%" PRIu64
", last file index:%d, last block index:%d, entry:%d, rows:%d, %p, elapsed time:%.2f ms, %s",
pInfo->cost.loadBlocks, pIter->uid, pIter->iStt, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlock->nRow,
pBlock, el, idStr);
pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iSttBlk;
pIter->iRow = (pIter->backward) ? pInfo->blockData[pInfo->currentLoadBlockIndex].nRow : -1;
tsdbDebug("last block index list:%d, %d, rowIndex:%d %s", pInfo->blockIndex[0], pInfo->blockIndex[1], pIter->iRow,
tsdbDebug("read last block, total load:%" PRId64 ", trigger by uid:%" PRIu64 ", stt-fileVer:%" PRId64
", last block index:%d, entry:%d, rows:%d, uidRange:%" PRId64 "-%" PRId64 " tsRange:%" PRId64 "-%" PRId64
" %p, elapsed time:%.2f ms, %s",
pInfo->cost.loadBlocks, pIter->uid, pIter->cid, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlock->nRow,
pIter->pSttBlk->minUid, pIter->pSttBlk->maxUid, pIter->pSttBlk->minKey, pIter->pSttBlk->maxKey, pBlock, el,
idStr);
return &pInfo->blockData[pInfo->currentLoadBlockIndex];
pInfo->blockData[pInfo->currentLoadBlockIndex].sttBlockIndex = pIter->iSttBlk;
pIter->iRow = (pIter->backward) ? pInfo->blockData[pInfo->currentLoadBlockIndex].data.nRow : -1;
tsdbDebug("last block index list:%d, %d, rowIndex:%d %s", pInfo->blockData[0].sttBlockIndex,
pInfo->blockData[1].sttBlockIndex, pIter->iRow, idStr);
return &pInfo->blockData[pInfo->currentLoadBlockIndex].data;
_exit:
if (code != TSDB_CODE_SUCCESS) {
@ -424,14 +443,14 @@ static int32_t doLoadSttFilesBlk(SSttBlockLoadInfo *pBlockLoadInfo, SLDataIter *
return code;
}
int32_t tLDataIterOpen2(SLDataIter *pIter, SSttFileReader *pSttFileReader, int32_t iStt, int8_t backward,
int32_t tLDataIterOpen2(SLDataIter *pIter, SSttFileReader *pSttFileReader, int32_t cid, int8_t backward,
uint64_t suid, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange,
SSttBlockLoadInfo *pBlockLoadInfo, const char *idStr, bool strictTimeRange,
_load_tomb_fn loadTombFn, void *pReader1) {
int32_t code = TSDB_CODE_SUCCESS;
pIter->uid = uid;
pIter->iStt = iStt;
pIter->cid = cid;
pIter->backward = backward;
pIter->verRange.minVer = pRange->minVer;
pIter->verRange.maxVer = pRange->maxVer;
@ -538,12 +557,15 @@ void tLDataIterNextBlock(SLDataIter *pIter, const char *idStr) {
pIter->pSttBlk = NULL;
if (index != -1) {
SSttBlk *p = taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, index);
pIter->iSttBlk = index;
pIter->pSttBlk = (SSttBlk *)taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, pIter->iSttBlk);
tsdbDebug("try next last file block:%d from stt fileIdx:%d, trigger by uid:%" PRIu64 ", file index:%d, %s",
pIter->iSttBlk, oldIndex, pIter->uid, pIter->iStt, idStr);
tsdbDebug("try next stt-file block:%d from %d, trigger by uid:%" PRIu64 ", stt-fileVer:%" PRId64
", uidRange:%" PRId64 "-%" PRId64 " %s",
pIter->iSttBlk, oldIndex, pIter->uid, pIter->cid, p->minUid, p->maxUid, idStr);
} else {
tsdbDebug("no more last block qualified, uid:%" PRIu64 ", file index:%d, %s", pIter->uid, oldIndex, idStr);
tsdbDebug("no more last block qualified, uid:%" PRIu64 ", stt-file block:%d, %s", pIter->uid, oldIndex, idStr);
}
}
@ -787,7 +809,9 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) {
}
memset(pIter, 0, sizeof(SLDataIter));
code = tLDataIterOpen2(pIter, pSttFileReader, i, pMTree->backward, pConf->suid, pConf->uid, &pConf->timewindow,
int64_t cid = pSttLevel->fobjArr->data[i]->f->cid;
code = tLDataIterOpen2(pIter, pSttFileReader, cid, pMTree->backward, pConf->suid, pConf->uid, &pConf->timewindow,
&pConf->verRange, pLoadInfo, pMTree->idStr, pConf->strictTimeRange, pConf->loadTombFn,
pConf->pReader);
if (code != TSDB_CODE_SUCCESS) {
@ -816,8 +840,66 @@ void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter) { tRBTreePut(&pMTr
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree) { return pMTree->ignoreEarlierTs; }
static void tLDataIterPinSttBlock(SLDataIter* pIter, const char* id) {
SSttBlockLoadInfo* pInfo = pIter->pBlockLoadInfo;
if (pInfo->blockData[0].sttBlockIndex == pIter->iSttBlk) {
pInfo->blockData[0].pin = true;
ASSERT(!pInfo->blockData[1].pin);
tsdbDebug("pin stt-block, blockIndex:%d, stt-fileVer:%" PRId64 " %s", pIter->iSttBlk, pIter->cid, id);
return;
}
if (pInfo->blockData[1].sttBlockIndex == pIter->iSttBlk) {
pInfo->blockData[1].pin = true;
ASSERT(!pInfo->blockData[0].pin);
tsdbDebug("pin stt-block, blockIndex:%d, stt-fileVer:%"PRId64" %s", pIter->iSttBlk, pIter->cid, id);
return;
}
tsdbError("failed to pin any stt block, sttBlock:%d stt-fileVer:%"PRId64" %s", pIter->iSttBlk, pIter->cid, id);
}
static void tLDataIterUnpinSttBlock(SLDataIter* pIter, const char* id) {
SSttBlockLoadInfo* pInfo = pIter->pBlockLoadInfo;
if (pInfo->blockData[0].pin) {
ASSERT(!pInfo->blockData[1].pin);
pInfo->blockData[0].pin = false;
tsdbTrace("unpin stt-block:%d, stt-fileVer:%" PRId64 " %s", pInfo->blockData[0].sttBlockIndex, pIter->cid, id);
return;
}
if (pInfo->blockData[1].pin) {
ASSERT(!pInfo->blockData[0].pin);
pInfo->blockData[1].pin = false;
tsdbTrace("unpin stt-block:%d, stt-fileVer:%" PRId64 " %s", pInfo->blockData[1].sttBlockIndex, pIter->cid, id);
return;
}
tsdbError("failed to unpin any stt block, sttBlock:%d stt-fileVer:%" PRId64 " %s", pIter->iSttBlk, pIter->cid, id);
}
void tMergeTreePinSttBlock(SMergeTree *pMTree) {
if (pMTree->pIter == NULL) {
return;
}
SLDataIter *pIter = pMTree->pIter;
pMTree->pPinnedBlockIter = pIter;
tLDataIterPinSttBlock(pIter, pMTree->idStr);
}
void tMergeTreeUnpinSttBlock(SMergeTree *pMTree) {
if (pMTree->pPinnedBlockIter == NULL) {
return;
}
SLDataIter* pIter = pMTree->pPinnedBlockIter;
pMTree->pPinnedBlockIter = NULL;
tLDataIterUnpinSttBlock(pIter, pMTree->idStr);
}
bool tMergeTreeNext(SMergeTree *pMTree) {
int32_t code = TSDB_CODE_SUCCESS;
if (pMTree->pIter) {
SLDataIter *pIter = pMTree->pIter;
@ -851,8 +933,5 @@ bool tMergeTreeNext(SMergeTree *pMTree) {
void tMergeTreeClose(SMergeTree *pMTree) {
pMTree->pIter = NULL;
if (pMTree->destroyLoadInfo) {
pMTree->pLoadInfo = destroyLastBlockLoadInfo(pMTree->pLoadInfo);
pMTree->destroyLoadInfo = false;
}
pMTree->pPinnedBlockIter = NULL;
}

View File

@ -1420,6 +1420,14 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc
}
}
static void doPinSttBlock(SLastBlockReader* pLastBlockReader) {
tMergeTreePinSttBlock(&pLastBlockReader->mergeTree);
}
static void doUnpinSttBlock(SLastBlockReader* pLastBlockReader) {
tMergeTreeUnpinSttBlock(&pLastBlockReader->mergeTree);
}
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader,
STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader,
bool* copied) {
@ -1427,7 +1435,10 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLas
*copied = false;
// avoid the fetch next row replace the referenced stt block in buffer
doPinSttBlock(pLastBlockReader);
bool hasVal = nextRowFromLastBlocks(pLastBlockReader, pScanInfo, &pReader->info.verRange);
doUnpinSttBlock(pLastBlockReader);
if (hasVal) {
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
if (next1 != ts) {
@ -4071,13 +4082,6 @@ void tsdbReaderClose2(STsdbReader* pReader) {
tsdbDataFileReaderClose(&pReader->pFileReader);
}
qTrace("tsdb/reader-close: %p, untake snapshot", pReader);
tsdbUntakeReadSnap2(pReader, pReader->pReadSnap, true);
pReader->pReadSnap = NULL;
tsdbReleaseReader(pReader);
tsdbUninitReaderLock(pReader);
SCostSummary* pCost = &pReader->cost;
SFilesetIter* pFilesetIter = &pReader->status.fileIter;
if (pFilesetIter->pLastBlockReader != NULL) {
@ -4089,6 +4093,13 @@ void tsdbReaderClose2(STsdbReader* pReader) {
destroySttBlockReader(pReader->status.pLDataIterArray, &pCost->sttCost);
taosMemoryFreeClear(pReader->status.uidList.tableUidList);
qTrace("tsdb/reader-close: %p, untake snapshot", pReader);
tsdbUntakeReadSnap2(pReader, pReader->pReadSnap, true);
pReader->pReadSnap = NULL;
tsdbReleaseReader(pReader);
tsdbUninitReaderLock(pReader);
tsdbDebug(
"%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
" SMA-time:%.2f ms, fileBlocks:%" PRId64

View File

@ -146,7 +146,6 @@ typedef struct SLastBlockReader {
int32_t order;
uint64_t uid;
SMergeTree mergeTree;
SSttBlockLoadInfo* pInfo;
int64_t currentKey;
} SLastBlockReader;

View File

@ -307,7 +307,6 @@ typedef struct STagScanInfo {
SSDataBlock* pRes;
SColMatchInfo matchInfo;
int32_t curPos;
SLimitNode* pSlimit;
SReadHandle readHandle;
STableListInfo* pTableListInfo;
uint64_t suid;
@ -318,6 +317,7 @@ typedef struct STagScanInfo {
SArray* aUidTags; // SArray<STUidTagInfo>
SArray* aFilterIdxs; // SArray<int32_t>
SStorageAPI* pStorageAPI;
SLimitInfo limitInfo;
} STagScanInfo;
typedef enum EStreamScanMode {

View File

@ -3060,7 +3060,12 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) {
setOperatorCompleted(pOperator);
}
pRes->info.rows = count;
pOperator->resultInfo.totalRows += count;
bool bLimitReached = applyLimitOffset(&pInfo->limitInfo, pRes, pTaskInfo);
if (bLimitReached) {
setOperatorCompleted(pOperator);
}
pOperator->resultInfo.totalRows += pRes->info.rows;
return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
}
@ -3094,28 +3099,20 @@ static SSDataBlock* doTagScanFromMetaEntry(SOperatorInfo* pOperator) {
if (++pInfo->curPos >= size) {
setOperatorCompleted(pOperator);
}
// each table with tbname is a group, hence its own block, but only group when slimit exists for performance reason.
if (pInfo->pSlimit != NULL) {
if (pInfo->curPos < pInfo->pSlimit->offset) {
continue;
}
pInfo->pRes->info.id.groupId = calcGroupId(mr.me.name, strlen(mr.me.name));
if (pInfo->curPos >= (pInfo->pSlimit->offset + pInfo->pSlimit->limit) - 1) {
setOperatorCompleted(pOperator);
}
break;
}
}
pRes->info.rows = count;
pAPI->metaReaderFn.clearReader(&mr);
bool bLimitReached = applyLimitOffset(&pInfo->limitInfo, pRes, pTaskInfo);
if (bLimitReached) {
setOperatorCompleted(pOperator);
}
// qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_TASKID(pRuntimeEnv), count);
if (pOperator->status == OP_EXEC_DONE) {
setTaskStatus(pTaskInfo, TASK_COMPLETED);
}
pRes->info.rows = count;
pOperator->resultInfo.totalRows += count;
pOperator->resultInfo.totalRows += pRes->info.rows;
return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
}
@ -3169,8 +3166,8 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
pInfo->pRes = createDataBlockFromDescNode(pDescNode);
pInfo->readHandle = *pReadHandle;
pInfo->curPos = 0;
pInfo->pSlimit = (SLimitNode*)pPhyNode->node.pSlimit; //TODO: slimit now only indicate group
initLimitInfo(pPhyNode->node.pLimit, pPhyNode->node.pSlimit, &pInfo->limitInfo);
setOperatorInfo(pOperator, "TagScanOperator", QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, false, OP_NOT_OPENED, pInfo,
pTaskInfo);
initResultSizeInfo(&pOperator->resultInfo, 4096);

View File

@ -2730,36 +2730,6 @@ static bool tagScanOptShouldBeOptimized(SLogicNode* pNode) {
return true;
}
static SLogicNode* tagScanOptFindAncestorWithSlimit(SLogicNode* pTableScanNode) {
SLogicNode* pNode = pTableScanNode->pParent;
while (NULL != pNode) {
if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode) || QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) ||
QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode) || QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pNode)) {
return NULL;
}
if (NULL != pNode->pSlimit) {
return pNode;
}
pNode = pNode->pParent;
}
return NULL;
}
static void tagScanOptCloneAncestorSlimit(SLogicNode* pTableScanNode) {
if (NULL != pTableScanNode->pSlimit) {
return;
}
SLogicNode* pNode = tagScanOptFindAncestorWithSlimit(pTableScanNode);
if (NULL != pNode) {
// TODO: only set the slimit now. push down slimit later
pTableScanNode->pSlimit = nodesCloneNode(pNode->pSlimit);
((SLimitNode*)pTableScanNode->pSlimit)->limit += ((SLimitNode*)pTableScanNode->pSlimit)->offset;
((SLimitNode*)pTableScanNode->pSlimit)->offset = 0;
}
return;
}
static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
SScanLogicNode* pScanNode = (SScanLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, tagScanOptShouldBeOptimized);
if (NULL == pScanNode) {
@ -2795,13 +2765,6 @@ static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubp
pScanNode->node.pTargets = pScanTargets;
}
int32_t code = replaceLogicNode(pLogicSubplan, pAgg, (SLogicNode*)pScanNode);
if (TSDB_CODE_SUCCESS == code) {
NODES_CLEAR_LIST(pAgg->pChildren);
}
nodesDestroyNode((SNode*)pAgg);
tagScanOptCloneAncestorSlimit((SLogicNode*)pScanNode);
pScanNode->onlyMetaCtbIdx = false;
pCxt->optimized = true;

View File

@ -691,9 +691,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
tFreeStreamTask(pTask);
stError(
"vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild "
"stream "
"manually",
vgId, tsDataDir);
"stream manually", vgId, tsDataDir);
return -1;
}
tDecoderClear(&decoder);

View File

@ -19,6 +19,7 @@
#include "streamBackendRocksdb.h"
#include "streamInt.h"
#include "tcommon.h"
#include "streamInt.h"
enum SBackendFileType {
ROCKSDB_OPTIONS_TYPE = 1,

View File

@ -2860,11 +2860,12 @@ int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str){
}
int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
int32_t code = -1;
if (pEntry->dataLen < sizeof(SMsgHead)) {
sError("vgId:%d, cannot append an invalid client request with no msg head. type:%s, dataLen:%d", ths->vgId,
TMSG_INFO(pEntry->originalRpcType), pEntry->dataLen);
syncEntryDestroy(pEntry);
return -1;
goto _out;
}
// append to log buffer
@ -2873,9 +2874,11 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
ASSERT(terrno != 0);
(void)syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno, false);
syncEntryDestroy(pEntry);
return -1;
goto _out;
}
code = 0;
_out:;
// proceed match index, with replicating on needed
SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL, "Append");
@ -2886,7 +2889,7 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
// multi replica
if (ths->replicaNum > 1) {
return 0;
return code;
}
// single replica
@ -2894,10 +2897,10 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
sError("vgId:%d, failed to commit until commitIndex:%" PRId64 "", ths->vgId, ths->commitIndex);
return -1;
code = -1;
}
return 0;
return code;
}
bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {

View File

@ -0,0 +1,206 @@
import sys
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import tdDnodes
from math import inf
class TDTestCase:
def caseDescription(self):
'''
case1<shenglian zhou>: [TD-11204]Difference improvement that can ignore negative
'''
return
def init(self, conn, logSql, replicaVer=1):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), True)
self._conn = conn
def restartTaosd(self, index=1, dbname="db"):
tdDnodes.stop(index)
tdDnodes.startWithoutSleep(index)
tdSql.execute(f"use tagscan")
def runSingleVgroup(self):
print("running {}".format(__file__))
tdSql.execute("drop database if exists tagscan2")
tdSql.execute("create database if not exists tagscan2 vgroups 1")
tdSql.execute('use tagscan2')
tdSql.execute('create table stb1 (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint, c6 float, c7 double, c8 binary(10), c9 nchar(10), c10 tinyint unsigned, c11 smallint unsigned, c12 int unsigned, c13 bigint unsigned) TAGS(t1 int, t2 binary(10), t3 double);')
tdSql.execute("create table tb1 using stb1 tags(1,'1',1.0);")
tdSql.execute("create table tb2 using stb1 tags(2,'2',2.0);")
tdSql.execute("create table tb3 using stb1 tags(3,'3',3.0);")
tdSql.execute("create table tb4 using stb1 tags(4,'4',4.0);")
tdSql.execute("create table tb5 using stb1 tags(5,'5',5.0);")
tdSql.execute("create table tb6 using stb1 tags(5,'5',5.0);")
tdSql.execute('insert into tb1 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);')
tdSql.execute('insert into tb2 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);')
tdSql.execute('insert into tb3 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);')
tdSql.execute('insert into tb4 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);')
tdSql.execute('insert into tb5 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);')
tdSql.execute('insert into tb6 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);')
tdSql.query('select tags t1,t2 from stb1 order by t1,t2;')
tdSql.checkRows(6)
tdSql.checkData(0, 0, 1)
tdSql.checkData(0, 1, '1')
tdSql.checkData(1, 0, 2)
tdSql.checkData(1, 1, '2')
tdSql.checkData(2, 0, 3)
tdSql.checkData(2, 1, '3')
tdSql.checkData(3, 0, 4)
tdSql.checkData(3, 1, '4')
tdSql.checkData(4, 0, 5)
tdSql.checkData(4, 1, '5')
tdSql.checkData(5, 0, 5)
tdSql.checkData(5, 1, '5')
tdSql.query('select * from (select tags t1,t2 from stb1 group by t1,t2 slimit 2,3) order by t1,t2;')
tdSql.checkRows(3)
tdSql.query('select * from (select tags tbname tn from stb1 group by tbname slimit 2,3) order by tn;')
tdSql.checkRows(3)
tdSql.query('select * from (select tbname tn from stb1 group by tbname slimit 2,3) order by tn;')
tdSql.checkRows(3)
tdSql.query('select * from (select tbname tn from stb1 group by tbname order by tbname limit 2,3) order by tn;')
tdSql.checkRows(3)
tdSql.checkData(0, 0, 'tb3')
tdSql.query('select * from (select distinct tbname tn from stb1 limit 2,3) order by tn;')
tdSql.checkRows(3)
tdSql.query('select * from (select distinct tbname tn, t1,t2 from stb1 limit 2,3) order by tn;')
tdSql.checkRows(3)
tdSql.query('select * from (select tags t1,t2 from stb1 order by t1, t2 limit 2,3) order by t1, t2;')
tdSql.checkRows(3)
tdSql.checkData(0, 0, 3)
tdSql.checkData(0, 1, '3')
tdSql.checkData(1, 0, 4)
tdSql.checkData(1, 1, '4')
tdSql.checkData(2, 0, 5)
tdSql.checkData(2, 1, '5')
tdSql.query('select * from (select tbname tn, t1,t2 from stb1 partition by tbname slimit 2,3) order by tn;')
tdSql.checkRows(3)
tdSql.query('select * from (select tbname tn, t1,t2 from stb1 group by tbname, t1,t2 slimit 2,3) order by tn;')
tdSql.checkRows(3)
tdSql.query('select * from (select tags tbname tn, t1,t2 from stb1 group by tbname, t1,t2 slimit 2,3) order by tn;')
tdSql.checkRows(3)
tdSql.execute('drop database tagscan2')
def runMultiVgroups(self):
print("running {}".format(__file__))
tdSql.execute("drop database if exists tagscan")
tdSql.execute("create database if not exists tagscan")
tdSql.execute('use tagscan')
tdSql.execute('create table stb1 (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint, c6 float, c7 double, c8 binary(10), c9 nchar(10), c10 tinyint unsigned, c11 smallint unsigned, c12 int unsigned, c13 bigint unsigned) TAGS(t1 int, t2 binary(10), t3 double);')
tdSql.execute("create table tb1 using stb1 tags(1,'1',1.0);")
tdSql.execute("create table tb2 using stb1 tags(2,'2',2.0);")
tdSql.execute("create table tb3 using stb1 tags(3,'3',3.0);")
tdSql.execute("create table tb4 using stb1 tags(4,'4',4.0);")
tdSql.execute("create table tb5 using stb1 tags(5,'5',5.0);")
tdSql.execute("create table tb6 using stb1 tags(5,'5',5.0);")
tdSql.execute('insert into tb1 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);')
tdSql.execute('insert into tb2 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);')
tdSql.execute('insert into tb3 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);')
tdSql.execute('insert into tb4 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);')
tdSql.execute('insert into tb5 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);')
tdSql.execute('insert into tb6 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);')
tdSql.query('select tags t1,t2 from stb1 order by t1,t2;')
tdSql.checkRows(6)
tdSql.checkData(0, 0, 1)
tdSql.checkData(0, 1, '1')
tdSql.checkData(1, 0, 2)
tdSql.checkData(1, 1, '2')
tdSql.checkData(2, 0, 3)
tdSql.checkData(2, 1, '3')
tdSql.checkData(3, 0, 4)
tdSql.checkData(3, 1, '4')
tdSql.checkData(4, 0, 5)
tdSql.checkData(4, 1, '5')
tdSql.checkData(5, 0, 5)
tdSql.checkData(5, 1, '5')
tdSql.query('select * from (select tags t1,t2 from stb1 group by t1,t2 slimit 2,3) order by t1,t2;')
tdSql.checkRows(3)
tdSql.query('select * from (select tags tbname tn from stb1 group by tbname slimit 2,3) order by tn;')
tdSql.checkRows(3)
tdSql.query('select * from (select tbname tn from stb1 group by tbname slimit 2,3) order by tn;')
tdSql.checkRows(3)
tdSql.query('select * from (select tbname tn from stb1 group by tbname order by tbname limit 2,3) order by tn;')
tdSql.checkRows(3)
tdSql.checkData(0, 0, 'tb3')
tdSql.query('select * from (select distinct tbname tn from stb1 limit 2,3) order by tn;')
tdSql.checkRows(3)
tdSql.query('select * from (select distinct tbname tn, t1,t2 from stb1 limit 2,3) order by tn;')
tdSql.checkRows(3)
tdSql.query('select * from (select tags t1,t2 from stb1 order by t1, t2 limit 2,3) order by t1, t2;')
tdSql.checkRows(3)
tdSql.checkData(0, 0, 3)
tdSql.checkData(0, 1, '3')
tdSql.checkData(1, 0, 4)
tdSql.checkData(1, 1, '4')
tdSql.checkData(2, 0, 5)
tdSql.checkData(2, 1, '5')
tdSql.query('select * from (select tbname tn, t1,t2 from stb1 partition by tbname slimit 2,3) order by tn;')
tdSql.checkRows(3)
tdSql.query('select * from (select tbname tn, t1,t2 from stb1 group by tbname, t1,t2 slimit 2,3) order by tn;')
tdSql.checkRows(3)
tdSql.query('select * from (select tags tbname tn, t1,t2 from stb1 group by tbname, t1,t2 slimit 2,3) order by tn;')
tdSql.checkRows(3)
tdSql.execute('drop database tagscan')
def run(self):
self.runMultiVgroups()
self.runSingleVgroup()
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -1278,6 +1278,7 @@ e
#develop test
,,n,develop-test,python3 ./test.py -f 2-query/table_count_scan.py
,,n,develop-test,python3 ./test.py -f 2-query/ts-range.py
,,n,develop-test,python3 ./test.py -f 2-query/tag_scan.py
,,n,develop-test,python3 ./test.py -f 2-query/show_create_db.py
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/auto_create_table_json.py
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/custom_col_tag.py