Merge branch 'main' into test/TD-23715

This commit is contained in:
Ping Xiao 2023-09-06 16:19:45 +08:00
commit 5c74d5f103
37 changed files with 255 additions and 111 deletions

View File

@ -1,7 +1,7 @@
# cos
ExternalProject_Add(mxml
GIT_REPOSITORY https://github.com/michaelrsweet/mxml.git
GIT_TAG release-2.12
GIT_TAG v2.12
SOURCE_DIR "${TD_CONTRIB_DIR}/mxml"
#BINARY_DIR ""
BUILD_IN_SOURCE TRUE

View File

@ -292,11 +292,11 @@ CONCAT_WS(separator_expr, expr1, expr2 [, expr] ...)
LENGTH(expr)
```
**Description**: The length in bytes of a string
**Description**: The length in bytes
**Return value type**: Bigint
**Applicable data types**: VARCHAR and NCHAR fields or columns
**Applicable data types**: VARCHAR and NCHAR and VARBINARY
**Nested query**: It can be used in both the outer query and inner query in a nested query.

View File

@ -10,7 +10,7 @@ TDengine 是一款开源、高性能、云原生的[时序数据库](https://tde
## 主要产品
TDengine 有三个主要产品TDengine Pro (即 TDengine 企业版TDengine Cloud和 TDengine OSS关于它们的具体定义请参考
TDengine 有三个主要产品TDengine Enterprise (即 TDengine 企业版TDengine Cloud和 TDengine OSS关于它们的具体定义请参考
- [TDengine 企业版](https://www.taosdata.com/tdengine-pro)
- [TDengine 云服务](https://cloud.taosdata.com/?utm_source=menu&utm_medium=webcn)
- [TDengine 开源版](https://www.taosdata.com/tdengine-oss)

View File

@ -1004,7 +1004,7 @@ TaosConsumer consumer = new TaosConsumer<>(config);
- httpConnectTimeout: 创建连接超时参数,单位 ms默认为 5000 ms。仅在 WebSocket 连接下有效。
- messageWaitTimeout: 数据传输超时参数,单位 ms默认为 10000 ms。仅在 WebSocket 连接下有效。
- httpPoolSize: 同一个连接下最大并行请求数。仅在 WebSocket 连接下有效。
其他参数请参考:[Consumer 参数列表](../../../develop/tmq#创建-consumer-以及consumer-group)
其他参数请参考:[Consumer 参数列表](../../develop/tmq#创建-consumer-以及consumer-group)
#### 订阅消费数据
@ -1082,7 +1082,7 @@ consumer.unsubscribe();
consumer.close()
```
详情请参考:[数据订阅](../../../develop/tmq)
详情请参考:[数据订阅](../../develop/tmq)
#### 完整示例
@ -1373,7 +1373,7 @@ public static void main(String[] args) throws Exception {
**解决方法** 更换 taos-jdbcdriver 3.0.2+ 版本。
其它问题请参考 [FAQ](../../../train-faq/faq)
其它问题请参考 [FAQ](../../train-faq/faq)
## API 参考

View File

@ -352,7 +352,7 @@ client.put(&sml_data)?
### 数据订阅
TDengine 通过消息队列 [TMQ](../../../taos-sql/tmq/) 启动一个订阅。
TDengine 通过消息队列 [TMQ](../../taos-sql/tmq/) 启动一个订阅。
#### 创建 Topic
@ -491,7 +491,7 @@ let taos = pool.get()?;
## 常见问题
请参考 [FAQ](../../../train-faq/faq)
请参考 [FAQ](../../train-faq/faq)
## API 参考

View File

@ -292,11 +292,11 @@ CONCAT_WS(separator_expr, expr1, expr2 [, expr] ...)
LENGTH(expr)
```
**功能说明**:以字节计数的字符串长度。
**功能说明**:以字节计数的长度。
**返回结果类型**BIGINT。
**适用数据类型**输入参数是 VARCHAR 类型或者 NCHAR 类型的字符串或者列
**适用数据类型**VARCHAR, NCHAR, VARBINARY
**嵌套子查询支持**:适用于内层查询和外层查询。

View File

@ -13,7 +13,7 @@ taosBenchmark (曾用名 taosdemo ) 是一个用于测试 TDengine 产品性能
taosBenchmark 有两种安装方式:
- 安装 TDengine 官方安装包的同时会自动安装 taosBenchmark, 详情请参考[ TDengine 安装](../../operation/pkg-install)。
- 安装 TDengine 官方安装包的同时会自动安装 taosBenchmark, 详情请参考 [TDengine 安装](../../get-started/)。
- 单独编译 taos-tools 并安装, 详情请参考 [taos-tools](https://github.com/taosdata/taos-tools) 仓库。

View File

@ -16,7 +16,7 @@ taosKeeper 是 TDengine 3.0 版本监控指标的导出工具,通过简单的
taosKeeper 有两种安装方式:
taosKeeper 安装方式:
- 安装 TDengine 官方安装包的同时会自动安装 taosKeeper, 详情请参考[ TDengine 安装](../../operation/pkg-install)。
- 安装 TDengine 官方安装包的同时会自动安装 taosKeeper, 详情请参考[ TDengine 安装](../../get-started/)。
- 单独编译 taosKeeper 并安装,详情请参考 [taosKeeper](https://github.com/taosdata/taoskeeper) 仓库。

View File

@ -23,7 +23,7 @@ TDengine Source Connector 用于把数据实时地从 TDengine 读出来发送
1. Linux 操作系统
2. 已安装 Java 8 和 Maven
3. 已安装 Git、curl、vi
4. 已安装并启动 TDengine。如果还没有可参考[安装和卸载](../../operation/pkg-install)
4. 已安装并启动 TDengine。如果还没有可参考[安装和卸载](../../get-started/)
## 安装 Kafka

View File

@ -41,16 +41,16 @@ enum {
STREAM_STATUS__PAUSE,
};
enum {
typedef enum ETaskStatus {
TASK_STATUS__NORMAL = 0,
TASK_STATUS__DROPPING,
TASK_STATUS__FAIL,
TASK_STATUS__UNINIT, // not used, an placeholder
TASK_STATUS__STOP,
TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner
TASK_STATUS__HALT, // pause, but not be manipulated by user command
TASK_STATUS__PAUSE, // pause
TASK_STATUS__CK, // stream task is in checkpoint status, no data are allowed to put into inputQ anymore
};
} ETaskStatus;
enum {
TASK_SCHED_STATUS__INACTIVE = 1,

View File

@ -338,7 +338,7 @@ if [ "$verMode" == "cluster" ]; then
tmp_pwd=`pwd`
cd ${install_dir}/connector
if [ ! -d taos-connector-jdbc ];then
git clone -b 3.2.1 --depth=1 https://github.com/taosdata/taos-connector-jdbc.git ||:
git clone -b main --depth=1 https://github.com/taosdata/taos-connector-jdbc.git ||:
fi
cd taos-connector-jdbc
mvn clean package -Dmaven.test.skip=true

View File

@ -2342,9 +2342,9 @@ static int32_t mndProcessAlterStbReq(SRpcMsg *pReq) {
alterReq.alterType, alterReq.numOfFields, alterReq.ttl);
SName name = {0};
tNameFromString(&name, pDb->name, T_NAME_ACCT | T_NAME_DB);
tNameFromString(&name, alterReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
auditRecord(pReq, pMnode->clusterId, "alterStb", name.dbname, alterReq.name, detail);
auditRecord(pReq, pMnode->clusterId, "alterStb", name.dbname, name.tname, detail);
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {

View File

@ -868,6 +868,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
mndTransDrop(pTrans);
taosThreadMutexLock(&execNodeList.lock);
mDebug("register to stream task node list");
keepStreamTasksInBuf(&streamObj, &execNodeList);
taosThreadMutexUnlock(&execNodeList.lock);
@ -876,8 +877,8 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
char detail[2000] = {0};
sprintf(detail,
"checkpointFreq:%" PRId64 ", createStb:%d, deleteMark:%" PRId64
", fillHistory:%d, igExists:%d, igExpired:%d, igUpdate:%d, lastTs:%" PRId64
", maxDelay:%" PRId64 ", numOfTags:%d, sourceDB:%s, targetStbFullName:%s, triggerType:%d, watermark:%" PRId64,
", fillHistory:%d, igExists:%d, igExpired:%d, igUpdate:%d, lastTs:%" PRId64 ", maxDelay:%" PRId64
", numOfTags:%d, sourceDB:%s, targetStbFullName:%s, triggerType:%d, watermark:%" PRId64,
createStreamReq.checkpointFreq, createStreamReq.createStb, createStreamReq.deleteMark,
createStreamReq.fillHistory, createStreamReq.igExists, createStreamReq.igExpired, createStreamReq.igUpdate,
createStreamReq.lastTs, createStreamReq.maxDelay, createStreamReq.numOfTags, createStreamReq.sourceDB,
@ -1574,8 +1575,8 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
} else if (taskStatus == TASK_STATUS__DROPPING) {
memcpy(varDataVal(status), "dropping", 8);
varDataSetLen(status, 8);
} else if (taskStatus == TASK_STATUS__FAIL) {
memcpy(varDataVal(status), "fail", 4);
} else if (taskStatus == TASK_STATUS__UNINIT) {
memcpy(varDataVal(status), "uninit", 6);
varDataSetLen(status, 4);
} else if (taskStatus == TASK_STATUS__STOP) {
memcpy(varDataVal(status), "stop", 4);
@ -2016,14 +2017,11 @@ static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgr
static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent) {
const SEp *pEp = GET_ACTIVE_EP(pPrevEpset);
const SEp* p = GET_ACTIVE_EP(pCurrent);
for (int32_t i = 0; i < pCurrent->numOfEps; ++i) {
const SEp *p = &(pCurrent->eps[i]);
if (pEp->port == p->port && strncmp(pEp->fqdn, p->fqdn, TSDB_FQDN_LEN) == 0) {
return false;
}
if (pEp->port == p->port && strncmp(pEp->fqdn, p->fqdn, TSDB_FQDN_LEN) == 0) {
return false;
}
return true;
}
@ -2120,6 +2118,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
mDebug("stream:0x%" PRIx64 " involved node changed, create update trans", pStream->uid);
int32_t code = createStreamUpdateTrans(pMnode, pStream, pChangeInfo);
if (code != TSDB_CODE_SUCCESS) {
sdbCancelFetch(pSdb, pIter);
return code;
}
}
@ -2223,18 +2222,22 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot);
if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) {
code = mndProcessVgroupChange(pMnode, &changeInfo);
// keep the new vnode snapshot
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
mDebug("create trans successfully, update cached node list");
taosArrayDestroy(execNodeList.pNodeEntryList);
execNodeList.pNodeEntryList = pNodeSnapshot;
execNodeList.ts = ts;
}
} else {
mDebug("no update found in nodeList");
taosArrayDestroy(pNodeSnapshot);
}
taosArrayDestroy(changeInfo.pUpdateNodeList);
taosHashCleanup(changeInfo.pDBMap);
// keep the new vnode snapshot
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
taosArrayDestroy(execNodeList.pNodeEntryList);
execNodeList.pNodeEntryList = pNodeSnapshot;
execNodeList.ts = ts;
}
mDebug("end to do stream task node change checking");
atomic_store_32(&mndNodeCheckSentinel, 0);
return 0;
@ -2284,7 +2287,6 @@ static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *p
// todo: this process should be executed by the write queue worker of the mnode
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamHbMsg req = {0};
int32_t code = TSDB_CODE_SUCCESS;
@ -2309,8 +2311,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
for (int32_t i = 0; i < req.numOfTasks; ++i) {
STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
int64_t k[2] = {p->streamId, p->taskId};
int64_t k[2] = {p->streamId, p->taskId};
int32_t *index = taosHashGet(execNodeList.pTaskMap, &k, sizeof(k));
if (index == NULL) {
continue;

View File

@ -165,6 +165,7 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
int32_t tqScanWal(STQ* pTq);
int32_t tqCheckAndRunStreamTask(STQ* pTq);
int32_t tqStartStreamTasks(STQ* pTq);
int32_t tqStopStreamTasks(STQ* pTq);
// tq util

View File

@ -1416,7 +1416,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
}
int8_t status = pTask->status.taskStatus;
if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__SCAN_HISTORY) {
if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__CK) {
// no lock needs to secure the access of the version
if (igUntreated && level == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) {
// discard all the data when the stream task is suspended.
@ -1700,20 +1700,47 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
tqDebug("s-task:%s receive task nodeEp update msg from mnode", pTask->id.idStr);
streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
streamSetStatusNormal(pTask);
SStreamTask** ppHTask = NULL;
if (pTask->historyTaskId.taskId != 0) {
keys[0] = pTask->historyTaskId.streamId;
keys[1] = pTask->historyTaskId.taskId;
ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
if (ppHTask == NULL || *ppHTask == NULL) {
tqError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already",
pMeta->vgId, req.taskId);
} else {
tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr);
streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList);
}
}
{
streamSetStatusNormal(pTask);
streamMetaSaveTask(pMeta, pTask);
if (ppHTask != NULL) {
streamMetaSaveTask(pMeta, *ppHTask);
}
if (streamMetaCommit(pMeta) < 0) {
// persist to disk
}
}
streamTaskStop(pTask);
if (ppHTask != NULL) {
streamTaskStop(*ppHTask);
}
tqDebug("s-task:%s task nodeEp update completed", pTask->id.idStr);
pMeta->closedTask += 1;
if (ppHTask != NULL) {
pMeta->closedTask += 1;
}
// possibly only handle the stream task.
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
bool allStopped = (pMeta->closedTask == numOfTasks);
if (allStopped) {
@ -1752,6 +1779,7 @@ _end:
taosWUnLockLatch(&pMeta->lock);
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
vInfo("vgId:%d, restart all stream tasks", vgId);
tqStartStreamTasks(pTq);
tqCheckAndRunStreamTaskAsync(pTq);
}
}

View File

@ -51,7 +51,7 @@ int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
SStreamSnapReader* pSnapReader = NULL;
if (streamSnapReaderOpen(pTq, sver, chkpId, pTq->path, &pSnapReader) == 0) {
if (streamSnapReaderOpen(meta, sver, chkpId, pTq->path, &pSnapReader) == 0) {
pReader->complete = 1;
} else {
code = -1;

View File

@ -224,6 +224,35 @@ int32_t tqStopStreamTasks(STQ* pTq) {
return 0;
}
int32_t tqStartStreamTasks(STQ* pTq) {
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t vgId = TD_VID(pTq->pVnode);
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
tqDebug("vgId:%d start to stop all %d stream task(s)", vgId, numOfTasks);
if (numOfTasks == 0) {
return TSDB_CODE_SUCCESS;
}
taosWLockLatch(&pMeta->lock);
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
int64_t key[2] = {pTaskId->streamId, pTaskId->taskId};
SStreamTask** pTask = taosHashGet(pMeta->pTasks, key, sizeof(key));
int8_t status = (*pTask)->status.taskStatus;
if (status == TASK_STATUS__STOP) {
streamSetStatusNormal(*pTask);
}
}
taosWUnLockLatch(&pMeta->lock);
return 0;
}
int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) {
// seek the stored version and extract data from WAL
int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader);

View File

@ -560,6 +560,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
vInfo("vgId:%d, not launch stream tasks, since stream tasks are disabled", vgId);
} else {
vInfo("vgId:%d start to launch stream tasks", pVnode->config.vgId);
tqStartStreamTasks(pVnode->pTq);
tqCheckAndRunStreamTaskAsync(pVnode->pTq);
}
} else {

View File

@ -669,8 +669,13 @@ static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle*
p->info.id.groupId = tupleGroupId;
pInfo->groupId = tupleGroupId;
} else {
pInfo->prefetchedTuple = pTupleHandle;
break;
if (p->info.rows == 0) {
appendOneRowToDataBlock(p, pTupleHandle);
p->info.id.groupId = pInfo->groupId = tupleGroupId;
} else {
pInfo->prefetchedTuple = pTupleHandle;
break;
}
}
} else {
appendOneRowToDataBlock(p, pTupleHandle);
@ -715,14 +720,9 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
resetLimitInfoForNextGroup(&pInfo->limitInfo);
}
bool limitReached = applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo);
// if limit is reached within a group, do not clear limiInfo otherwise the next block
// will be processed.
if (newgroup && limitReached) {
resetLimitInfoForNextGroup(&pInfo->limitInfo);
}
applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo);
if (p->info.rows > 0 || limitReached) {
if (p->info.rows > 0) {
break;
}
}

View File

@ -1839,10 +1839,6 @@ static int32_t translateLength(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
if (TSDB_DATA_TYPE_VARBINARY == ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
return TSDB_CODE_SUCCESS;
}

View File

@ -2902,7 +2902,7 @@ static SNode* createMultiResFunc(SFunctionNode* pSrcFunc, SExprNode* pExpr) {
taosCreateMD5Hash(buf, len);
strncpy(pFunc->node.aliasName, buf, TSDB_COL_NAME_LEN - 1);
len = snprintf(buf, sizeof(buf) - 1, "%s(%s)", pSrcFunc->functionName, pCol->colName);
taosCreateMD5Hash(buf, len);
// note: userAlias could be truncated here
strncpy(pFunc->node.userAlias, buf, TSDB_COL_NAME_LEN - 1);
}
} else {
@ -2910,7 +2910,7 @@ static SNode* createMultiResFunc(SFunctionNode* pSrcFunc, SExprNode* pExpr) {
taosCreateMD5Hash(buf, len);
strncpy(pFunc->node.aliasName, buf, TSDB_COL_NAME_LEN - 1);
len = snprintf(buf, sizeof(buf) - 1, "%s(%s)", pSrcFunc->functionName, pExpr->userAlias);
taosCreateMD5Hash(buf, len);
// note: userAlias could be truncated here
strncpy(pFunc->node.userAlias, buf, TSDB_COL_NAME_LEN - 1);
}

View File

@ -391,8 +391,8 @@ static void doRetryDispatchData(void* param, void* tmrId) {
SStreamTask* pTask = param;
if (streamTaskShouldStop(&pTask->status)) {
atomic_sub_fetch_8(&pTask->status.timerActive, 1);
qDebug("s-task:%s should stop, abort from timer", pTask->id.idStr);
int8_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1);
qDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref);
return;
}
@ -409,17 +409,22 @@ static void doRetryDispatchData(void* param, void* tmrId) {
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
}
} else {
atomic_sub_fetch_8(&pTask->status.timerActive, 1);
qDebug("s-task:%s should stop, abort from timer", pTask->id.idStr);
int32_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1);
qDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref);
}
} else {
atomic_sub_fetch_8(&pTask->status.timerActive, 1);
int8_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1);
qDebug("s-task:%s send success, jump out of timer, ref:%d", pTask->id.idStr, ref);
}
}
void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration) {
qError("s-task:%s dispatch data in %" PRId64 "ms", pTask->id.idStr, waitDuration);
taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->launchTaskTimer);
qWarn("s-task:%s dispatch data in %" PRId64 "ms, in timer", pTask->id.idStr, waitDuration);
if (pTask->launchTaskTimer != NULL) {
taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->launchTaskTimer);
} else {
pTask->launchTaskTimer = taosTmrStart(doRetryDispatchData, waitDuration, pTask, streamEnv.timer);
}
}
int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz,
@ -540,8 +545,10 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
}
if (++retryCount > MAX_CONTINUE_RETRY_COUNT) { // add to timer to retry
qDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms",
pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS);
int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);
qDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms, ref:%d",
pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref);
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
break;
}
@ -995,8 +1002,9 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
pTask->inputInfo.status = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream
pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time
qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 " wait for %dms and retry dispatch data",
id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS);
int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);
qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 " wait for %dms and retry dispatch data, ref:%d",
id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS, ref);
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
} else { // pipeline send data in output queue
// this message has been sent successfully, let's try next one.

View File

@ -666,6 +666,8 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId};
void* p = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
if (p == NULL) {
// pTask->chkInfo.checkpointVer may be 0, when a follower is become a leader
// In this case, we try not to start fill-history task anymore.
if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer) < 0) {
doClear(pKey, pVal, pCur, pRecycleList);
tFreeStreamTask(pTask);

View File

@ -235,7 +235,13 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {
qDebug("s-task:%s enter into scan-history data stage, status:%s", id, str);
streamTaskLaunchScanHistory(pTask);
} else {
qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str);
if (pTask->info.fillHistory == 1) {
qDebug("s-task:%s fill-history is set normal when start it, try to remove it,set it task to be dropping", id);
pTask->status.taskStatus = TASK_STATUS__DROPPING;
ASSERT(pTask->historyTaskId.taskId == 0);
} else {
qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str);
}
}
// when current stream task is ready, check the related fill history task.
@ -579,19 +585,17 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
// todo fix the bug: 2. race condition
// an fill history task needs to be started.
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
int32_t tId = pTask->historyTaskId.taskId;
if (tId == 0) {
SStreamMeta* pMeta = pTask->pMeta;
int32_t hTaskId = pTask->historyTaskId.taskId;
if (hTaskId == 0) {
return TSDB_CODE_SUCCESS;
}
ASSERT(pTask->status.downstreamReady == 1);
qDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", pTask->id.idStr,
pTask->historyTaskId.streamId, tId);
pTask->historyTaskId.streamId, hTaskId);
SStreamMeta* pMeta = pTask->pMeta;
int32_t hTaskId = pTask->historyTaskId.taskId;
int64_t keys[2] = {pTask->historyTaskId.streamId, pTask->historyTaskId.taskId};
int64_t keys[2] = {pTask->historyTaskId.streamId, hTaskId};
// Set the execute conditions, including the query time window and the version range
SStreamTask** pHTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
@ -610,11 +614,12 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
// todo failed to create timer
taosMemoryFree(pInfo);
} else {
atomic_add_fetch_8(&pTask->status.timerActive, 1);// timer is active
int32_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);// timer is active
ASSERT(ref == 1);
qDebug("s-task:%s set timer active flag", pTask->id.idStr);
}
} else { // timer exists
ASSERT(pTask->status.timerActive > 0);
ASSERT(pTask->status.timerActive == 1);
qDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr);
taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer);
}
@ -918,6 +923,13 @@ void streamTaskHalt(SStreamTask* pTask) {
return;
}
// wait for checkpoint completed
while(pTask->status.taskStatus == TASK_STATUS__CK) {
qDebug("s-task:%s status:%s during generating checkpoint, wait for 1sec and retry set status:halt", pTask->id.idStr,
streamGetTaskStatusStr(TASK_STATUS__CK));
taosMsleep(1000);
}
// upgrade to halt status
if (status == TASK_STATUS__PAUSE) {
qDebug("s-task:%s upgrade status to %s from %s", pTask->id.idStr, streamGetTaskStatusStr(TASK_STATUS__HALT),

View File

@ -59,6 +59,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/ins_topics_test.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqMaxTopic.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqParamsTest.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqParamsTest.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqClientConsLog.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqMaxGroupIds.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsumeDiscontinuousData.py

View File

@ -0,0 +1,20 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
sql create database test
sql use test
sql CREATE TABLE `tb` (`ts` TIMESTAMP, `c0` INT, `c1` FLOAT, `c2` BINARY(10))
sql insert into tb values("2022-05-15 00:01:08.000", 1, 1.0, "abc")
sql insert into tb values("2022-05-16 00:01:08.000", 2, 2.0, "bcd")
sql insert into tb values("2022-05-17 00:01:08.000", 3, 3.0, "cde")
#sleep 10000000
system taos -P7100 -s 'source tsim/query/t/multires_func.sql' | grep -v 'Query OK' | grep -v 'Client Version' > /tmp/multires_func.result
system echo ----------------------diff start-----------------------
system git diff --exit-code --color tsim/query/r/multires_func.result /tmp/multires_func.result
system echo ----------------------diff succeed-----------------------

View File

@ -0,0 +1,31 @@
Copyright (c) 2022 by TDengine, all rights reserved.
taos> source tsim/query/t/multires_func.sql
taos> use test;
Database changed.
taos> select count(*) from tb\G;
*************************** 1.row ***************************
count(*): 3
taos> select last(*) from tb\G;
*************************** 1.row ***************************
ts: 2022-05-17 00:01:08.000
c0: 3
c1: 3.0000000
c2: cde
taos> select last_row(*) from tb\G;
*************************** 1.row ***************************
ts: 2022-05-17 00:01:08.000
c0: 3
c1: 3.0000000
c2: cde
taos> select first(*) from tb\G;
*************************** 1.row ***************************
ts: 2022-05-15 00:01:08.000
c0: 1
c1: 1.0000000
c2: abc

View File

@ -0,0 +1,5 @@
use test;
select count(*) from tb\G;
select last(*) from tb\G;
select last_row(*) from tb\G;
select first(*) from tb\G;

View File

@ -198,7 +198,7 @@ class TDTestCase:
# init
def init(self, conn, logSql, replicaVar=1):
seed = time.clock_gettime(time.CLOCK_REALTIME)
seed = time.time() % 10000
random.seed(seed)
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")

View File

@ -210,7 +210,7 @@ class TDTestCase:
# init
def init(self, conn, logSql, replicaVar=1):
seed = time.clock_gettime(time.CLOCK_REALTIME)
seed = time.time() % 10000
random.seed(seed)
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")

View File

@ -220,7 +220,7 @@ class TDTestCase:
# init
def init(self, conn, logSql, replicaVar=1):
seed = time.clock_gettime(time.CLOCK_REALTIME)
seed = time.time() % 10000
random.seed(seed)
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")

View File

@ -251,10 +251,19 @@ class TDTestCase:
tdSql.checkData(2, 4, 9)
tdSql.checkData(3, 4, 9)
def test_partition_by_limit_no_agg(self):
sql_template = 'select t1 from meters partition by t1 limit %d'
for i in range(1, 5000, 1000):
tdSql.query(sql_template % i)
tdSql.checkRows(5 * i)
def run(self):
self.prepareTestEnv()
self.test_interval_limit_offset()
self.test_interval_partition_by_slimit_limit()
self.test_partition_by_limit_no_agg()
def stop(self):
tdSql.close()

View File

@ -220,7 +220,7 @@ class TDTestCase:
# init
def init(self, conn, logSql, replicaVar=1):
seed = time.clock_gettime(time.CLOCK_REALTIME)
seed = time.time() % 10000
random.seed(seed)
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")

View File

@ -269,7 +269,7 @@ class TDTestCase:
# init
def init(self, conn, logSql, replicaVar=1):
seed = time.clock_gettime(time.CLOCK_REALTIME)
seed = time.time() % 10000
random.seed(seed)
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")

View File

@ -233,7 +233,7 @@ class TMQCom:
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
for i in range(ctbNum):
rowsBatched = 0
sql += " %s%d values "%(stbName,i)
sql += " %s.%s%d values "%(dbName, stbName, i)
for j in range(rowsPerTbl):
sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j)
rowsBatched += 1
@ -241,7 +241,7 @@ class TMQCom:
tsql.execute(sql)
rowsBatched = 0
if j < rowsPerTbl - 1:
sql = "insert into %s%d values " %(stbName,i)
sql = "insert into %s.%s%d values " %(dbName, stbName,i)
else:
sql = "insert into "
#end sql
@ -263,7 +263,7 @@ class TMQCom:
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
for i in range(ctbNum):
rowsBatched = 0
sql += " %s%d values "%(ctbPrefix,i)
sql += " %s.%s%d values "%(dbName, ctbPrefix,i)
for j in range(rowsPerTbl):
if (j % 2 == 0):
sql += "(%d, %d, %d, 'tmqrow_%d') "%(startTs + j, j, j, j)
@ -274,7 +274,7 @@ class TMQCom:
tsql.execute(sql)
rowsBatched = 0
if j < rowsPerTbl - 1:
sql = "insert into %s%d values " %(ctbPrefix,i)
sql = "insert into %s.%s%d values " %(dbName, ctbPrefix, i)
else:
sql = "insert into "
#end sql
@ -296,7 +296,7 @@ class TMQCom:
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
for i in range(ctbNum):
rowsBatched = 0
sql += " %s%d values "%(ctbPrefix,i+ctbStartIdx)
sql += " %s.%s%d values "%(dbName, ctbPrefix, i+ctbStartIdx)
for j in range(rowsPerTbl):
if (j % 2 == 0):
sql += "(%d, %d, %d, 'tmqrow_%d', now) "%(startTs + j, j, j, j)
@ -307,7 +307,7 @@ class TMQCom:
tsql.execute(sql)
rowsBatched = 0
if j < rowsPerTbl - 1:
sql = "insert into %s%d values " %(ctbPrefix,i+ctbStartIdx)
sql = "insert into %s.%s%d values " %(dbName, ctbPrefix, i+ctbStartIdx)
else:
sql = "insert into "
#end sql

View File

@ -28,12 +28,12 @@ static void shellRecordCommandToHistory(char *command);
static int32_t shellRunCommand(char *command, bool recordHistory);
static void shellRunSingleCommandImp(char *command);
static char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision);
static int32_t shellDumpResultToFile(const char *fname, TAOS_RES *tres);
static int64_t shellDumpResultToFile(const char *fname, TAOS_RES *tres);
static void shellPrintNChar(const char *str, int32_t length, int32_t width);
static void shellPrintGeometry(const unsigned char *str, int32_t length, int32_t width);
static int32_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql);
static int32_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql);
static int32_t shellDumpResult(TAOS_RES *tres, char *fname, int32_t *error_no, bool vertical, const char *sql);
static int64_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql);
static int64_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql);
static int64_t shellDumpResult(TAOS_RES *tres, char *fname, int32_t *error_no, bool vertical, const char *sql);
static void shellReadHistory();
static void shellWriteHistory();
static void shellPrintError(TAOS_RES *tres, int64_t st);
@ -238,14 +238,14 @@ void shellRunSingleCommandImp(char *command) {
if (pFields != NULL) { // select and show kinds of commands
int32_t error_no = 0;
int32_t numOfRows = shellDumpResult(pSql, fname, &error_no, printMode, command);
int64_t numOfRows = shellDumpResult(pSql, fname, &error_no, printMode, command);
if (numOfRows < 0) return;
et = taosGetTimestampUs();
if (error_no == 0) {
printf("Query OK, %d row(s) in set (%.6fs)\r\n", numOfRows, (et - st) / 1E6);
printf("Query OK, %"PRId64 " row(s) in set (%.6fs)\r\n", numOfRows, (et - st) / 1E6);
} else {
printf("Query interrupted (%s), %d row(s) in set (%.6fs)\r\n", taos_errstr(pSql), numOfRows, (et - st) / 1E6);
printf("Query interrupted (%s), %"PRId64 " row(s) in set (%.6fs)\r\n", taos_errstr(pSql), numOfRows, (et - st) / 1E6);
}
taos_free_result(pSql);
} else {
@ -430,7 +430,7 @@ void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, i
}
}
int32_t shellDumpResultToFile(const char *fname, TAOS_RES *tres) {
int64_t shellDumpResultToFile(const char *fname, TAOS_RES *tres) {
char fullname[PATH_MAX] = {0};
if (taosExpandDir(fname, fullname, PATH_MAX) != 0) {
tstrncpy(fullname, fname, PATH_MAX);
@ -459,7 +459,7 @@ int32_t shellDumpResultToFile(const char *fname, TAOS_RES *tres) {
}
taosFprintfFile(pFile, "\r\n");
int32_t numOfRows = 0;
int64_t numOfRows = 0;
do {
int32_t *length = taos_fetch_lengths(tres);
for (int32_t i = 0; i < num_fields; i++) {
@ -702,7 +702,7 @@ bool shellIsShowQuery(const char *sql) {
return false;
}
int32_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql) {
int64_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql) {
TAOS_ROW row = taos_fetch_row(tres);
if (row == NULL) {
return 0;
@ -726,11 +726,11 @@ int32_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql) {
resShowMaxNum = SHELL_DEFAULT_RES_SHOW_NUM;
}
int32_t numOfRows = 0;
int64_t numOfRows = 0;
int32_t showMore = 1;
do {
if (numOfRows < resShowMaxNum) {
printf("*************************** %d.row ***************************\r\n", numOfRows + 1);
printf("*************************** %"PRId64".row ***************************\r\n", numOfRows + 1);
int32_t *length = taos_fetch_lengths(tres);
@ -856,7 +856,7 @@ void shellPrintHeader(TAOS_FIELD *fields, int32_t *width, int32_t num_fields) {
putchar('\n');
}
int32_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql) {
int64_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql) {
TAOS_ROW row = taos_fetch_row(tres);
if (row == NULL) {
return 0;
@ -879,7 +879,7 @@ int32_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql) {
resShowMaxNum = SHELL_DEFAULT_RES_SHOW_NUM;
}
int32_t numOfRows = 0;
int64_t numOfRows = 0;
int32_t showMore = 1;
do {
@ -915,8 +915,8 @@ int32_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql) {
return numOfRows;
}
int32_t shellDumpResult(TAOS_RES *tres, char *fname, int32_t *error_no, bool vertical, const char *sql) {
int32_t numOfRows = 0;
int64_t shellDumpResult(TAOS_RES *tres, char *fname, int32_t *error_no, bool vertical, const char *sql) {
int64_t numOfRows = 0;
if (fname != NULL) {
numOfRows = shellDumpResultToFile(fname, tres);
} else if (vertical) {

View File

@ -157,10 +157,6 @@ void varbinary_sql_test() {
taos_free_result(pRes);
// string function test, not support
pRes = taos_query(taos, "select length(c2) from stb");
ASSERT(taos_errno(pRes) != 0);
taos_free_result(pRes);
pRes = taos_query(taos, "select ltrim(c2) from stb");
ASSERT(taos_errno(pRes) != 0);
taos_free_result(pRes);
@ -190,7 +186,7 @@ void varbinary_sql_test() {
ASSERT(taos_errno(pRes) != 0);
taos_free_result(pRes);
// support first/last/last_row/count/hyperloglog/sample/tail/mode
// support first/last/last_row/count/hyperloglog/sample/tail/mode/length
pRes = taos_query(taos, "select first(c2) from stb");
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
@ -207,6 +203,10 @@ void varbinary_sql_test() {
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
pRes = taos_query(taos, "select length(c2) from stb where c2 = '\\x7F8290'");
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
pRes = taos_query(taos, "select cast(t2 as varbinary(16)) from stb order by ts");
while ((row = taos_fetch_row(pRes)) != NULL) {
int32_t* length = taos_fetch_lengths(pRes);