Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/TD-30837
This commit is contained in:
commit
1e5a9870ed
|
@ -2,7 +2,7 @@
|
|||
IF (DEFINED VERNUMBER)
|
||||
SET(TD_VER_NUMBER ${VERNUMBER})
|
||||
ELSE ()
|
||||
SET(TD_VER_NUMBER "3.3.3.0.alpha")
|
||||
SET(TD_VER_NUMBER "3.3.4.0.alpha")
|
||||
ENDIF ()
|
||||
|
||||
IF (DEFINED VERCOMPATIBLE)
|
||||
|
|
|
@ -19,7 +19,7 @@ After TDengine server or client installation, `taos.h` is located at
|
|||
The dynamic libraries for the TDengine client driver are located in.
|
||||
|
||||
- Linux: `/usr/local/taos/driver/libtaos.so`
|
||||
- Windows: `C:\TDengine\taos.dll`
|
||||
- Windows: `C:\TDengine\driver\taos.dll`
|
||||
- macOS: `/usr/local/lib/libtaos.dylib`
|
||||
|
||||
## Supported platforms
|
||||
|
|
|
@ -20,6 +20,10 @@ For TDengine 2.x installation packages by version, please visit [here](https://t
|
|||
|
||||
import Release from "/components/ReleaseV3";
|
||||
|
||||
## 3.3.3.0
|
||||
|
||||
<Release type="tdengine" version="3.3.3.0" />
|
||||
|
||||
## 3.3.2.0
|
||||
|
||||
<Release type="tdengine" version="3.3.2.0" />
|
||||
|
|
|
@ -239,7 +239,7 @@ d4,2017-07-14T10:40:00.006+08:00,-2.740636,10,-0.893545,7,California.LosAngles
|
|||
|
||||
- `plugins_home`:外部数据源连接器所在目录。
|
||||
- `data_dir`:数据文件存放目录。
|
||||
- `instanceId`:当前 explorer 服务的实例 ID,如果同一台机器上启动了多个 explorer 实例,必须保证各个实例的实例 ID 互不相同。
|
||||
- `instanceId`:当前 taosX 服务的实例 ID,如果同一台机器上启动了多个 taosX 实例,必须保证各个实例的实例 ID 互不相同。
|
||||
- `logs_home`:日志文件存放目录,`taosX` 日志文件的前缀为 `taosx.log`,外部数据源有自己的日志文件名前缀。已弃用,请使用 `log.path` 代替。
|
||||
- `log_level`:日志等级,可选级别包括 `error`、`warn`、`info`、`debug`、`trace`,默认值为 `info`。已弃用,请使用 `log.level` 代替。
|
||||
- `log_keep_days`:日志的最大存储天数,`taosX` 日志将按天划分为不同的文件。已弃用,请使用 `log.keepDays` 代替。
|
||||
|
|
|
@ -11,6 +11,7 @@ sidebar_label: taosX-Agent
|
|||
|
||||
- `endpoint`: 必填,`taosX` 的 GRPC 服务地址。
|
||||
- `token`: 必填,在 `Explorer` 上创建 `Agent` 时,产生的 Token。
|
||||
- `instanceId`:当前 taosx-agent 服务的实例 ID,如果同一台机器上启动了多个 taosx-agent 实例,必须保证各个实例的实例 ID 互不相同。
|
||||
- `compression`: 非必填,可配置为 `ture` 或 `false`, 默认为 `false`。配置为`true`, 则开启 `Agent` 和 `taosX` 通信数据压缩。
|
||||
- `log_level`: 非必填,日志级别,默认为 `info`, 同 `taosX` 一样,支持 `error`,`warn`,`info`,`debug`,`trace` 五级。已弃用,请使用 `log.level` 代替。
|
||||
- `log_keep_days`:非必填,日志保存天数,默认为 `30` 天。已弃用,请使用 `log.keepDays` 代替。
|
||||
|
@ -37,7 +38,7 @@ sidebar_label: taosX-Agent
|
|||
# server instance id
|
||||
#
|
||||
# The instanceId of each instance is unique on the host
|
||||
# instanceId = 64
|
||||
# instanceId = 48
|
||||
|
||||
# enable communication data compression between Agent and taosX
|
||||
#
|
||||
|
|
|
@ -132,7 +132,7 @@ cors = true
|
|||
- `cluster`:TDengine 集群的 taosAdapter 地址。
|
||||
- `cluster_native`:TDengine 集群的原生连接地址,默认关闭。
|
||||
- `x_api`:taosX 的 gRPC 地址。
|
||||
- `grpc`:taosX 代理向 taosX 建立连接的 gRPC 地址.
|
||||
- `grpc`:taosX 代理向 taosX 建立连接的 gRPC 地址。
|
||||
- `cors`:CORS 配置开关,默认为 `false`。当为 `true` 时,允许跨域访问。
|
||||
- `ssl.certificate`:SSL 证书(如果同时设置了 certificate 与 certificate_key 两个参数,则启用 HTTPS 服务,否则不启用)。
|
||||
- `ssl.certificate_key`:SSL 证书密钥。
|
||||
|
|
|
@ -27,7 +27,7 @@ TDengine 服务端或客户端安装后,`taosws.h` 位于:
|
|||
TDengine 客户端驱动的动态库位于:
|
||||
|
||||
- Linux: `/usr/local/taos/driver/libtaosws.so`
|
||||
- Windows: `C:\TDengine\taosws.dll`
|
||||
- Windows: `C:\TDengine\driver\taosws.dll`
|
||||
- macOS: `/usr/local/lib/libtaosws.dylib`
|
||||
|
||||
### 支持的平台
|
||||
|
@ -626,7 +626,7 @@ TDengine 服务端或客户端安装后,`taos.h` 位于:
|
|||
TDengine 客户端驱动的动态库位于:
|
||||
|
||||
- Linux: `/usr/local/taos/driver/libtaos.so`
|
||||
- Windows: `C:\TDengine\taos.dll`
|
||||
- Windows: `C:\TDengine\driver\taos.dll`
|
||||
- macOS: `/usr/local/lib/libtaos.dylib`
|
||||
|
||||
### 支持的平台
|
||||
|
|
|
@ -24,6 +24,10 @@ TDengine 3.x 各版本安装包下载链接如下:
|
|||
|
||||
import Release from "/components/ReleaseV3";
|
||||
|
||||
## 3.3.3.0
|
||||
|
||||
<Release type="tdengine" version="3.3.3.0" />
|
||||
|
||||
## 3.3.2.0
|
||||
|
||||
<Release type="tdengine" version="3.3.2.0" />
|
||||
|
|
|
@ -616,8 +616,8 @@ function update_TDengine() {
|
|||
[ -f ${installDir}/bin/taosadapter ] && \
|
||||
echo -e "${GREEN_DARK}To start Adapter ${NC}: taosadapter &${NC}"
|
||||
else
|
||||
echo -e "${GREEN_DARK}To start service ${NC}: launchctl start com.tdengine.taosd${NC}"
|
||||
echo -e "${GREEN_DARK}To start Adapter ${NC}: launchctl start com.tdengine.taosadapter${NC}"
|
||||
echo -e "${GREEN_DARK}To start service ${NC}: sudo launchctl start com.tdengine.taosd${NC}"
|
||||
echo -e "${GREEN_DARK}To start Adapter ${NC}: sudo launchctl start com.tdengine.taosadapter${NC}"
|
||||
fi
|
||||
fi
|
||||
|
||||
|
@ -668,8 +668,8 @@ function install_TDengine() {
|
|||
[ -f ${installDir}/bin/taosadapter ] && \
|
||||
echo -e "${GREEN_DARK}To start Adapter ${NC}: taosadapter &${NC}"
|
||||
else
|
||||
echo -e "${GREEN_DARK}To start service ${NC}: launchctl start com.tdengine.taosd${NC}"
|
||||
echo -e "${GREEN_DARK}To start Adapter ${NC}: launchctl start com.tdengine.taosadapter${NC}"
|
||||
echo -e "${GREEN_DARK}To start service ${NC}: sudo launchctl start com.tdengine.taosd${NC}"
|
||||
echo -e "${GREEN_DARK}To start Adapter ${NC}: sudo launchctl start com.tdengine.taosadapter${NC}"
|
||||
fi
|
||||
fi
|
||||
|
||||
|
|
|
@ -906,7 +906,7 @@ int32_t ctgEnqueue(SCatalog *pCtg, SCtgCacheOperation *operation) {
|
|||
if (gCtgMgmt.queue.stopQueue) {
|
||||
ctgFreeQNode(node);
|
||||
CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);
|
||||
CTG_RET(TSDB_CODE_CTG_EXIT);
|
||||
CTG_ERR_JRET(TSDB_CODE_CTG_EXIT);
|
||||
}
|
||||
|
||||
gCtgMgmt.queue.tail->next = node;
|
||||
|
@ -924,7 +924,7 @@ int32_t ctgEnqueue(SCatalog *pCtg, SCtgCacheOperation *operation) {
|
|||
code = tsem_post(&gCtgMgmt.queue.reqSem);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
qError("tsem_post failed, code:%x", code);
|
||||
CTG_RET(code);
|
||||
CTG_ERR_JRET(code);
|
||||
}
|
||||
|
||||
if (syncOp) {
|
||||
|
@ -935,9 +935,15 @@ int32_t ctgEnqueue(SCatalog *pCtg, SCtgCacheOperation *operation) {
|
|||
if (!operation->unLocked) {
|
||||
CTG_LOCK(CTG_READ, &gCtgMgmt.lock);
|
||||
}
|
||||
taosMemoryFree(operation);
|
||||
TAOS_UNUSED(tsem_destroy(&operation->rspSem));
|
||||
taosMemoryFreeClear(operation);
|
||||
}
|
||||
return code;
|
||||
|
||||
_return:
|
||||
if (syncOp && operation) {
|
||||
TAOS_UNUSED(tsem_destroy(&operation->rspSem));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -273,10 +273,18 @@ static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
|
|||
int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle,
|
||||
void* pParam) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (pParam == NULL) {
|
||||
code = TSDB_CODE_QRY_INVALID_INPUT;
|
||||
qError("invalid input param in creating data deleter, code%s", tstrerror(code));
|
||||
goto _end;
|
||||
}
|
||||
|
||||
SDeleterParam* pDeleterParam = (SDeleterParam*)pParam;
|
||||
|
||||
SDataDeleterHandle* deleter = taosMemoryCalloc(1, sizeof(SDataDeleterHandle));
|
||||
if (NULL == deleter) {
|
||||
code = terrno;
|
||||
taosArrayDestroy(pDeleterParam->pUidList);
|
||||
taosMemoryFree(pParam);
|
||||
goto _end;
|
||||
}
|
||||
|
@ -292,12 +300,6 @@ int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pData
|
|||
deleter->pDeleter = pDeleterNode;
|
||||
deleter->pSchema = pDataSink->pInputDataBlockDesc;
|
||||
|
||||
if (pParam == NULL) {
|
||||
code = TSDB_CODE_QRY_INVALID_INPUT;
|
||||
qError("invalid input param in creating data deleter, code%s", tstrerror(code));
|
||||
goto _end;
|
||||
}
|
||||
|
||||
deleter->pParam = pParam;
|
||||
deleter->status = DS_BUF_EMPTY;
|
||||
deleter->queryEnd = false;
|
||||
|
|
|
@ -904,7 +904,7 @@ static int32_t doChkptStatusCheck(SStreamTask* pTask) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
if ((pTmrInfo->launchChkptId != pActiveInfo->activeId) || (pActiveInfo->activeId == 0)) {
|
||||
if (pTmrInfo->launchChkptId != pActiveInfo->activeId) {
|
||||
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
|
||||
stWarn("s-task:%s vgId:%d checkpoint-trigger retrieve by previous checkpoint procedure, checkpointId:%" PRId64
|
||||
", quit, ref:%d",
|
||||
|
@ -1055,7 +1055,8 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
|
|||
numOfNotSend = taosArrayGetSize(pNotSendList);
|
||||
if (numOfNotSend > 0) {
|
||||
stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id);
|
||||
streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor");
|
||||
streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
|
||||
"trigger-recv-monitor");
|
||||
} else {
|
||||
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
|
||||
stDebug("s-task:%s all checkpoint-trigger recved, quit from monitor checkpoint-trigger tmr, ref:%d", id, ref);
|
||||
|
|
|
@ -526,6 +526,7 @@ static void doMonitorDispatchData(void* param, void* tmrId) {
|
|||
int32_t msgId = pMsgInfo->msgId;
|
||||
int32_t code = 0;
|
||||
int64_t now = taosGetTimestampMs();
|
||||
bool inDispatch = true;
|
||||
|
||||
stDebug("s-task:%s start monitor dispatch data", id);
|
||||
|
||||
|
@ -550,12 +551,15 @@ static void doMonitorDispatchData(void* param, void* tmrId) {
|
|||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||
stDebug("s-task:%s not in dispatch procedure, abort from timer, ref:%d", pTask->id.idStr, ref);
|
||||
|
||||
pTask->msgInfo.inMonitor = 0;
|
||||
streamMutexUnlock(&pMsgInfo->lock);
|
||||
return;
|
||||
pMsgInfo->inMonitor = 0;
|
||||
inDispatch = false;
|
||||
}
|
||||
streamMutexUnlock(&pMsgInfo->lock);
|
||||
|
||||
if (!inDispatch) {
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t numOfFailed = getFailedDispatchInfo(pMsgInfo, now);
|
||||
if (numOfFailed == 0) {
|
||||
stDebug("s-task:%s no error occurs, check again in %dms", id, DISPATCH_RETRY_INTERVAL_MS);
|
||||
|
@ -638,15 +642,54 @@ void streamStartMonitorDispatchData(SStreamTask* pTask, int64_t waitDuration) {
|
|||
"dispatch-monitor");
|
||||
}
|
||||
|
||||
static int32_t doAddDispatchBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock,
|
||||
SArray* vgInfo, uint32_t hashValue, int64_t now, bool* pFound) {
|
||||
size_t numOfVgroups = taosArrayGetSize(vgInfo);
|
||||
int32_t code = 0;
|
||||
|
||||
*pFound = false;
|
||||
|
||||
for (int32_t j = 0; j < numOfVgroups; j++) {
|
||||
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j);
|
||||
if (pVgInfo == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) {
|
||||
if ((code = streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j])) < 0) {
|
||||
stError("s-task:%s failed to add dispatch block, code:%s", pTask->id.idStr, tstrerror(terrno));
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pReqs[j].blockNum == 0) {
|
||||
SVgroupInfo* pDstVgroupInfo = taosArrayGet(vgInfo, j);
|
||||
if (pDstVgroupInfo != NULL) {
|
||||
addDispatchEntry(&pTask->msgInfo, pDstVgroupInfo->vgId, now, false);
|
||||
}
|
||||
}
|
||||
|
||||
pReqs[j].blockNum++;
|
||||
*pFound = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int64_t groupId,
|
||||
int64_t now) {
|
||||
bool found = false;
|
||||
uint32_t hashValue = 0;
|
||||
int32_t numOfVgroups = 0;
|
||||
int32_t code = 0;
|
||||
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||
|
||||
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||
if (pTask->pNameMap == NULL) {
|
||||
pTask->pNameMap = tSimpleHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
|
||||
if (pTask->pNameMap == NULL) {
|
||||
stError("s-task:%s failed to init the name map, code:%s", pTask->id.idStr, tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
}
|
||||
|
||||
void* pVal = tSimpleHashGet(pTask->pNameMap, &groupId, sizeof(int64_t));
|
||||
|
@ -669,11 +712,11 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
|
|||
}
|
||||
}
|
||||
} else {
|
||||
int32_t code = buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId,
|
||||
pDataBlock->info.parTbName);
|
||||
code = buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId,
|
||||
pDataBlock->info.parTbName);
|
||||
if (code) {
|
||||
stError("s-task:%s failed to build child table name for group:%" PRId64 ", code:%s", pTask->id.idStr,
|
||||
groupId, tstrerror(code));
|
||||
stError("s-task:%s failed to build child table name for group:%" PRId64 ", code:%s", pTask->id.idStr, groupId,
|
||||
tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -688,44 +731,21 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
|
|||
memcpy(bln.parTbName, pDataBlock->info.parTbName, strlen(pDataBlock->info.parTbName));
|
||||
|
||||
// failed to put into name buffer, no need to do anything
|
||||
if (tSimpleHashGetSize(pTask->pNameMap) < MAX_BLOCK_NAME_NUM) { // allow error, and do nothing
|
||||
int32_t code = tSimpleHashPut(pTask->pNameMap, &groupId, sizeof(int64_t), &bln, sizeof(SBlockName));
|
||||
if (tSimpleHashGetSize(pTask->pNameMap) < MAX_BLOCK_NAME_NUM) { // allow error, and do nothing
|
||||
code = tSimpleHashPut(pTask->pNameMap, &groupId, sizeof(int64_t), &bln, sizeof(SBlockName));
|
||||
}
|
||||
}
|
||||
|
||||
numOfVgroups = taosArrayGetSize(vgInfo);
|
||||
|
||||
// TODO: optimize search
|
||||
streamMutexLock(&pTask->msgInfo.lock);
|
||||
code = doAddDispatchBlock(pTask, pReqs, pDataBlock, vgInfo, hashValue, now, &found);
|
||||
streamMutexUnlock(&pTask->msgInfo.lock);
|
||||
|
||||
for (int32_t j = 0; j < numOfVgroups; j++) {
|
||||
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j);
|
||||
if (pVgInfo == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) {
|
||||
if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
|
||||
streamMutexUnlock(&pTask->msgInfo.lock);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pReqs[j].blockNum == 0) {
|
||||
SVgroupInfo* pDstVgroupInfo = taosArrayGet(vgInfo, j);
|
||||
if (pDstVgroupInfo != NULL) {
|
||||
addDispatchEntry(&pTask->msgInfo, pDstVgroupInfo->vgId, now, false);
|
||||
}
|
||||
}
|
||||
|
||||
pReqs[j].blockNum++;
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
streamMutexUnlock(&pTask->msgInfo.lock);
|
||||
if (!found) {
|
||||
stError("s-task:%s not found req hash value:%u", pTask->id.idStr, hashValue);
|
||||
stError("s-task:%s not found req hash value:%u, failed to add dispatch block", pTask->id.idStr, hashValue);
|
||||
return TSDB_CODE_STREAM_INTERNAL_ERROR;
|
||||
} else {
|
||||
return 0;
|
||||
|
@ -919,7 +939,7 @@ static int32_t doTaskChkptStatusCheck(SStreamTask* pTask, int32_t num) {
|
|||
}
|
||||
|
||||
static int32_t doFindNotConfirmUpstream(SArray** ppNotRspList, SArray* pList, int32_t num, int32_t vgId, int32_t level,
|
||||
const char* id) {
|
||||
const char* id) {
|
||||
SArray* pTmp = taosArrayInit(4, sizeof(int32_t));
|
||||
if (pTmp == NULL) {
|
||||
return terrno;
|
||||
|
@ -940,8 +960,8 @@ static int32_t doFindNotConfirmUpstream(SArray** ppNotRspList, SArray* pList, in
|
|||
stError("s-task:%s vgId:%d failed to record not rsp task, code: out of memory", id, vgId);
|
||||
return terrno;
|
||||
} else {
|
||||
stDebug("s-task:%s vgId:%d level:%d checkpoint-ready rsp from upstream:0x%x not confirmed yet", id, vgId,
|
||||
level, pInfo->upstreamTaskId);
|
||||
stDebug("s-task:%s vgId:%d level:%d checkpoint-ready rsp from upstream:0x%x not confirmed yet", id, vgId, level,
|
||||
pInfo->upstreamTaskId);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -987,13 +1007,48 @@ static void doSendChkptReadyMsg(SStreamTask* pTask, SArray* pNotRspList, int64_t
|
|||
}
|
||||
}
|
||||
|
||||
static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
|
||||
static int32_t chkptReadyMsgSendHelper(SStreamTask* pTask, SArray* pNotRspList) {
|
||||
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
|
||||
SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr;
|
||||
SArray* pList = pActiveInfo->pReadyMsgList;
|
||||
int32_t num = taosArrayGetSize(pList);
|
||||
int32_t vgId = pTask->pMeta->vgId;
|
||||
int32_t checkpointId = pActiveInfo->activeId;
|
||||
const char* id = pTask->id.idStr;
|
||||
int32_t notRsp = 0;
|
||||
|
||||
int32_t code = doTaskChkptStatusCheck(pTask, num);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
code = doFindNotConfirmUpstream(&pNotRspList, pList, num, vgId, pTask->info.taskLevel, id);
|
||||
if (code) {
|
||||
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
|
||||
stError("s-task:%s failed to find not rsp checkpoint-ready downstream, code:%s, out of tmr, ref:%d", id,
|
||||
tstrerror(code), ref);
|
||||
return code;
|
||||
}
|
||||
|
||||
notRsp = taosArrayGetSize(pNotRspList);
|
||||
if (notRsp == 0) {
|
||||
streamClearChkptReadyMsg(pActiveInfo);
|
||||
} else {
|
||||
doSendChkptReadyMsg(pTask, pNotRspList, checkpointId, pList);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static void chkptReadyMsgSendMonitorFn(void* param, void* tmrId) {
|
||||
SStreamTask* pTask = param;
|
||||
int32_t vgId = pTask->pMeta->vgId;
|
||||
const char* id = pTask->id.idStr;
|
||||
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
|
||||
SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr;
|
||||
SArray* pNotRspList = NULL;
|
||||
int32_t code = 0;
|
||||
int32_t notRsp = 0;
|
||||
|
||||
// check the status every 100ms
|
||||
if (streamTaskShouldStop(pTask)) {
|
||||
|
@ -1004,7 +1059,7 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
|
|||
}
|
||||
|
||||
if (++pTmrInfo->activeCounter < 50) {
|
||||
streamTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
|
||||
streamTmrStart(chkptReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
|
||||
"chkpt-ready-monitor");
|
||||
return;
|
||||
}
|
||||
|
@ -1027,45 +1082,26 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
|
|||
}
|
||||
|
||||
streamMutexLock(&pActiveInfo->lock);
|
||||
code = chkptReadyMsgSendHelper(pTask, pNotRspList);
|
||||
streamMutexUnlock(&pActiveInfo->lock);
|
||||
|
||||
SArray* pList = pActiveInfo->pReadyMsgList;
|
||||
int32_t num = taosArrayGetSize(pList);
|
||||
int32_t code = doTaskChkptStatusCheck(pTask, num);
|
||||
if (code) {
|
||||
streamMutexUnlock(&pActiveInfo->lock);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
return;
|
||||
}
|
||||
|
||||
code = doFindNotConfirmUpstream(&pNotRspList, pList, num, vgId, pTask->info.taskLevel, id);
|
||||
if (code) {
|
||||
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
|
||||
stError("s-task:%s failed to find not rsp checkpoint-ready downstream, code:%s, out of tmr, ref:%d", id,
|
||||
tstrerror(code), ref);
|
||||
streamMutexUnlock(&pActiveInfo->lock);
|
||||
streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
|
||||
taosArrayDestroy(pNotRspList);
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t checkpointId = pActiveInfo->activeId;
|
||||
int32_t notRsp = taosArrayGetSize(pNotRspList);
|
||||
doSendChkptReadyMsg(pTask, pNotRspList, checkpointId, pList);
|
||||
|
||||
notRsp = taosArrayGetSize(pNotRspList);
|
||||
if (notRsp > 0) { // send checkpoint-ready msg again
|
||||
streamTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
|
||||
stDebug("s-task:%s start to monitor checkpoint-ready msg recv status in 10s", id);
|
||||
streamTmrStart(chkptReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
|
||||
"chkpt-ready-monitor");
|
||||
streamMutexUnlock(&pActiveInfo->lock);
|
||||
} else {
|
||||
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
|
||||
stDebug(
|
||||
"s-task:%s vgId:%d checkpoint-ready msg confirmed by all upstream task(s), clear checkpoint-ready msg and quit "
|
||||
"from timer, ref:%d",
|
||||
id, vgId, ref);
|
||||
|
||||
streamClearChkptReadyMsg(pActiveInfo);
|
||||
streamMutexUnlock(&pActiveInfo->lock);
|
||||
// release should be the last execution, since pTask may be destroy after it immidiately.
|
||||
streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
}
|
||||
|
@ -1124,7 +1160,7 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
|
|||
stDebug("s-task:%s start checkpoint-ready monitor in 10s, ref:%d ", pTask->id.idStr, ref);
|
||||
streamMetaAcquireOneTask(pTask);
|
||||
|
||||
streamTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
|
||||
streamTmrStart(chkptReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
|
||||
"chkpt-ready-monitor");
|
||||
|
||||
// mark the timer monitor checkpointId
|
||||
|
@ -1190,6 +1226,7 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch
|
|||
taosMemoryFree(buf);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
SET_PAYLOAD_LEN(pRetrieve->data, actualLen, actualLen);
|
||||
|
||||
int32_t payloadLen = actualLen + PAYLOAD_PREFIX_LEN;
|
||||
|
@ -1359,29 +1396,11 @@ void initCheckpointReadyInfo(STaskCheckpointReadyInfo* pReadyInfo, int32_t upstr
|
|||
pReadyInfo->childId = childId;
|
||||
}
|
||||
|
||||
int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId, int32_t index, int64_t checkpointId) {
|
||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SStreamUpstreamEpInfo* pInfo = NULL;
|
||||
streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId, &pInfo);
|
||||
if (pInfo == NULL) {
|
||||
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
||||
}
|
||||
|
||||
STaskCheckpointReadyInfo info = {0};
|
||||
initCheckpointReadyInfo(&info, pInfo->nodeId, pInfo->taskId, pInfo->childId, &pInfo->epSet, checkpointId);
|
||||
|
||||
stDebug("s-task:%s (level:%d) prepare checkpoint-ready msg to upstream s-task:0x%" PRIx64 "-0x%x (vgId:%d) idx:%d",
|
||||
pTask->id.idStr, pTask->info.taskLevel, pTask->id.streamId, pInfo->taskId, pInfo->nodeId, index);
|
||||
|
||||
static int32_t doAddChkptReadyMsg(SStreamTask* pTask, STaskCheckpointReadyInfo* pInfo) {
|
||||
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
|
||||
|
||||
streamMutexLock(&pActiveInfo->lock);
|
||||
void* px = taosArrayPush(pActiveInfo->pReadyMsgList, &info);
|
||||
void* px = taosArrayPush(pActiveInfo->pReadyMsgList, pInfo);
|
||||
if (px == NULL) {
|
||||
streamMutexUnlock(&pActiveInfo->lock);
|
||||
stError("s-task:%s failed to add readyMsg info, code: out of memory", pTask->id.idStr);
|
||||
return terrno;
|
||||
}
|
||||
|
@ -1395,10 +1414,36 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId,
|
|||
stDebug("s-task:%s %d/%d checkpoint-trigger recv", pTask->id.idStr, numOfRecv, total);
|
||||
}
|
||||
|
||||
streamMutexUnlock(&pActiveInfo->lock);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId, int32_t index, int64_t checkpointId) {
|
||||
int32_t code = 0;
|
||||
STaskCheckpointReadyInfo info = {0};
|
||||
|
||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SStreamUpstreamEpInfo* pInfo = NULL;
|
||||
streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId, &pInfo);
|
||||
if (pInfo == NULL) {
|
||||
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
||||
}
|
||||
|
||||
initCheckpointReadyInfo(&info, pInfo->nodeId, pInfo->taskId, pInfo->childId, &pInfo->epSet, checkpointId);
|
||||
|
||||
stDebug("s-task:%s (level:%d) prepare checkpoint-ready msg to upstream s-task:0x%" PRIx64 "-0x%x (vgId:%d) idx:%d",
|
||||
pTask->id.idStr, pTask->info.taskLevel, pTask->id.streamId, pInfo->taskId, pInfo->nodeId, index);
|
||||
|
||||
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
|
||||
|
||||
streamMutexLock(&pActiveInfo->lock);
|
||||
code = doAddChkptReadyMsg(pTask, &info);
|
||||
streamMutexUnlock(&pActiveInfo->lock);
|
||||
return code;
|
||||
}
|
||||
|
||||
void streamClearChkptReadyMsg(SActiveCheckpointInfo* pActiveInfo) {
|
||||
if (pActiveInfo == NULL) {
|
||||
return;
|
||||
|
|
|
@ -117,12 +117,14 @@ void streamTaskResumeInFuture(SStreamTask* pTask) {
|
|||
|
||||
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
void streamTaskResumeHelper(void* param, void* tmrId) {
|
||||
SStreamTask* pTask = (SStreamTask*)param;
|
||||
SStreamTaskId* pId = &pTask->id;
|
||||
SStreamTaskState p = streamTaskGetStatus(pTask);
|
||||
SStreamTask* pTask = (SStreamTask*)param;
|
||||
SStreamTaskId* pId = &pTask->id;
|
||||
SStreamTaskState p = streamTaskGetStatus(pTask);
|
||||
int32_t code = 0;
|
||||
|
||||
if (p.state == TASK_STATUS__DROPPING || p.state == TASK_STATUS__STOP) {
|
||||
int8_t status = streamTaskSetSchedStatusInactive(pTask);
|
||||
TAOS_UNUSED(status);
|
||||
|
||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||
stDebug("s-task:%s status:%s not resume task, ref:%d", pId->idStr, p.name, ref);
|
||||
|
@ -131,13 +133,12 @@ void streamTaskResumeHelper(void* param, void* tmrId) {
|
|||
return;
|
||||
}
|
||||
|
||||
int32_t code = streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pId->streamId, pId->taskId, STREAM_EXEC_T_RESUME_TASK);
|
||||
code = streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pId->streamId, pId->taskId, STREAM_EXEC_T_RESUME_TASK);
|
||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||
if (code) {
|
||||
stError("s-task:%s sched task failed, code:%s, ref:%d", pId->idStr, tstrerror(code), ref);
|
||||
} else {
|
||||
stDebug("trigger to resume s-task:%s after being idled for %dms, ref:%d", pId->idStr, pTask->status.schedIdleTime,
|
||||
ref);
|
||||
stDebug("trigger to resume s-task:%s after idled for %dms, ref:%d", pId->idStr, pTask->status.schedIdleTime, ref);
|
||||
|
||||
// release the task ref count
|
||||
streamTaskClearSchedIdleInfo(pTask);
|
||||
|
|
|
@ -64,7 +64,6 @@ static int32_t streamTaskSetReady(SStreamTask* pTask) {
|
|||
|
||||
int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) {
|
||||
SStreamScanHistoryReq req;
|
||||
int32_t code = 0;
|
||||
initScanHistoryReq(pTask, &req, igUntreated);
|
||||
|
||||
int32_t len = sizeof(SStreamScanHistoryReq);
|
||||
|
@ -173,7 +172,7 @@ int32_t streamTaskOnScanHistoryTaskReady(SStreamTask* pTask) {
|
|||
code = streamTaskStartScanHistory(pTask);
|
||||
}
|
||||
|
||||
// NOTE: there will be an deadlock if launch fill history here.
|
||||
// NOTE: there will be a deadlock if launch fill history here.
|
||||
// start the related fill-history task, when current task is ready
|
||||
// if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||
// streamLaunchFillHistoryTask(pTask);
|
||||
|
@ -219,7 +218,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
|
|||
|
||||
stDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", idStr, hStreamId, hTaskId);
|
||||
|
||||
// Set the execute conditions, including the query time window and the version range
|
||||
// Set the execution conditions, including the query time window and the version range
|
||||
streamMetaRLock(pMeta);
|
||||
SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id));
|
||||
streamMetaRUnLock(pMeta);
|
||||
|
|
|
@ -742,6 +742,20 @@ char *tz_win[554][2] = {{"Asia/Shanghai", "China Standard Time"},
|
|||
|
||||
static int isdst_now = 0;
|
||||
|
||||
void parseTimeStr(char *p, char to[5]) {
|
||||
for (int i = 0; i < 5; ++i) {
|
||||
if (strlen(p) > i) {
|
||||
to[i] = p[i];
|
||||
} else {
|
||||
to[i] = '0';
|
||||
}
|
||||
}
|
||||
if (strlen(p) == 2) {
|
||||
to[1] = '0';
|
||||
to[2] = p[1];
|
||||
}
|
||||
}
|
||||
|
||||
int32_t taosSetSystemTimezone(const char *inTimezoneStr, char *outTimezoneStr, int8_t *outDaylight,
|
||||
enum TdTimezone *tsTimezone) {
|
||||
if (inTimezoneStr == NULL || inTimezoneStr[0] == 0) {
|
||||
|
@ -798,7 +812,9 @@ int32_t taosSetSystemTimezone(const char *inTimezoneStr, char *outTimezoneStr, i
|
|||
memcpy(&winStr[3], pp, ppp - pp);
|
||||
indexStr = ppp - pp + 3;
|
||||
}
|
||||
sprintf(&winStr[indexStr], "%c%c%c:%c%c:00", (p[0] == '+' ? '+' : '-'), p[1], p[2], p[3], p[4]);
|
||||
char to[5];
|
||||
parseTimeStr(p, to);
|
||||
sprintf(&winStr[indexStr], "%c%c%c:%c%c:00", (to[0] == '+' ? '+' : '-'), to[1], to[2], to[3], to[4]);
|
||||
*tsTimezone = -taosStr2Int32(p, NULL, 10);
|
||||
} else {
|
||||
*tsTimezone = 0;
|
||||
|
@ -806,7 +822,9 @@ int32_t taosSetSystemTimezone(const char *inTimezoneStr, char *outTimezoneStr, i
|
|||
}
|
||||
_putenv(winStr);
|
||||
_tzset();
|
||||
strcpy(outTimezoneStr, inTimezoneStr);
|
||||
if (outTimezoneStr != inTimezoneStr) {
|
||||
strcpy(outTimezoneStr, inTimezoneStr);
|
||||
}
|
||||
*outDaylight = 0;
|
||||
|
||||
#elif defined(_TD_DARWIN_64)
|
||||
|
|
|
@ -0,0 +1,378 @@
|
|||
from frame.log import *
|
||||
from frame.cases import *
|
||||
from frame.sql import *
|
||||
from frame.caseBase import *
|
||||
from frame import *
|
||||
from frame.eos import *
|
||||
import random
|
||||
import string
|
||||
|
||||
"""
|
||||
TD-32198: https://jira.taosdata.com:18080/browse/TD-32198
|
||||
Having关键字的专项测试,主要覆盖以下 4 种场景:
|
||||
1、普通聚合查询
|
||||
2、关联查询
|
||||
3、窗口切分查询
|
||||
4、流计算中的窗口切分查询
|
||||
"""
|
||||
|
||||
|
||||
class TDTestCase(TBase):
|
||||
|
||||
def init(self, conn, logSql, replicaVar=1):
|
||||
self.replicaVar = int(replicaVar)
|
||||
tdLog.debug("start to execute %s" % __file__)
|
||||
tdSql.init(conn.cursor())
|
||||
|
||||
def prepare_global_data(self):
|
||||
tdSql.execute("DROP DATABASE IF EXISTS db_td32198;")
|
||||
tdSql.execute("create database db_td32198;")
|
||||
tdSql.execute("use db_td32198;")
|
||||
|
||||
def prepare_agg_data(self):
|
||||
# database for case TD-32198
|
||||
# super table
|
||||
tdSql.execute("DROP STABLE IF EXISTS meters")
|
||||
tdSql.execute("CREATE STABLE `meters` (`ts` TIMESTAMP , `current` FLOAT , `voltage` INT , `phase` FLOAT ) \
|
||||
TAGS (`groupid` TINYINT, `location` VARCHAR(16));")
|
||||
|
||||
# child table
|
||||
tdSql.execute("CREATE TABLE `ct_1` USING `meters` (`groupid`, `location`) TAGS (1, 'beijing');")
|
||||
# tdSql.execute("CREATE TABLE `ct_2` USING `meters` (`groupid`, `location`) TAGS (2, 'shanghai');")
|
||||
|
||||
data = [
|
||||
('2020-06-01 00:00:00.000', 0.1000000, 12, 0.0200000),
|
||||
('2020-06-01 00:15:00.000', 0.3614670, 18, 0.1071560),
|
||||
('2020-06-01 00:30:00.000', 0.5209450, 18, 0.1736480),
|
||||
('2020-06-01 00:45:00.000', 0.8764570, 18, 0.2588190),
|
||||
('2020-06-01 01:00:00.000', 1.0260600, 14, 0.3620200),
|
||||
('2020-06-01 01:15:00.000', 1.3678550, 0, 0.4226180),
|
||||
('2020-06-01 01:30:00.000', 1.6000000, 12, 0.5200000),
|
||||
('2020-06-01 01:45:00.000', 1.8207290, 2, 0.5835760),
|
||||
('2020-06-01 02:00:00.000', 1.9283630, 18, 0.6527880),
|
||||
('2020-06-01 02:15:00.000', 2.1213200, 18, 0.7271070),
|
||||
('2020-06-01 02:30:00.000', 2.3981330, 12, 0.7760440),
|
||||
('2020-06-01 02:45:00.000', 2.4574561, 14, 0.8291520),
|
||||
('2020-06-01 03:00:00.000', 2.6980760, 14, 0.8760250),
|
||||
('2020-06-01 03:15:00.000', 2.8189230, 10, 0.9063080),
|
||||
('2020-06-01 03:30:00.000', 2.8190780, 6, 0.9396930),
|
||||
('2020-06-01 03:45:00.000', 2.8977780, 10, 0.9859260),
|
||||
('2020-06-01 04:00:00.000', 2.9544230, 4, 1.0048079),
|
||||
('2020-06-01 04:15:00.000', 2.9885840, 14, 1.0061949),
|
||||
('2020-06-01 04:30:00.000', 3.0999999, 6, 1.0200000),
|
||||
('2020-06-01 04:45:00.000', 3.0885839, 10, 1.0161951),
|
||||
('2020-06-01 05:00:00.000', 2.9544230, 18, 0.9848080),
|
||||
('2020-06-01 05:15:00.000', 2.9977770, 2, 0.9859260),
|
||||
('2020-06-01 05:30:00.000', 2.8190780, 0, 0.9496930),
|
||||
('2020-06-01 05:45:00.000', 2.7189231, 18, 0.9163080),
|
||||
('2020-06-01 06:00:00.000', 2.5980761, 10, 0.8860250)
|
||||
]
|
||||
|
||||
sql = "insert into ct_1 values";
|
||||
for t in data:
|
||||
sql += "('{}', {}, {}, {}),".format(t[0], t[1], t[2], t[3])
|
||||
sql += ";"
|
||||
tdSql.execute(sql)
|
||||
tdLog.debug("sql: %s" % sql)
|
||||
|
||||
def test_agg_having(self):
|
||||
tdSql.query("select voltage, sum(voltage), count(*) from ct_1 group by voltage;")
|
||||
tdSql.checkRows(8)
|
||||
tdSql.checkData(7, 2, 7)
|
||||
tdSql.checkData(7, 1, 126)
|
||||
|
||||
tdSql.query("select voltage, sum(voltage), count(*) from ct_1 group by voltage having count(voltage)>=4;");
|
||||
tdSql.checkRows(3)
|
||||
tdSql.checkData(2, 2, 7)
|
||||
tdSql.checkData(2, 1, 126)
|
||||
|
||||
tdSql.query("select voltage, sum(voltage), count(*) from ct_1 group by voltage having count(current)>=4;");
|
||||
tdSql.checkRows(3)
|
||||
tdSql.checkData(2, 2, 7)
|
||||
tdSql.checkData(2, 1, 126)
|
||||
|
||||
tdSql.query("select voltage, sum(voltage), count(*) from ct_1 group by voltage having voltage >=14;");
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkData(0, 2, 4)
|
||||
tdSql.checkData(1, 1, 126)
|
||||
|
||||
tdSql.error("select voltage, count(*) from ct_1 group by voltage having current >1.0260600;");
|
||||
|
||||
def prepare_join_data(self):
|
||||
# super table
|
||||
tdSql.execute("DROP STABLE IF EXISTS meters")
|
||||
tdSql.execute("CREATE STABLE `meters` (`ts` TIMESTAMP , `current` FLOAT , `voltage` INT , `phase` FLOAT ) \
|
||||
TAGS (`groupid` TINYINT, `location` VARCHAR(16));")
|
||||
|
||||
# child table
|
||||
tdSql.execute("CREATE TABLE `ct_join_1` USING `meters` (`groupid`, `location`) TAGS (1, 'beijing');")
|
||||
tdSql.execute("CREATE TABLE `ct_join_2` USING `meters` (`groupid`, `location`) TAGS (2, 'shanghai');")
|
||||
|
||||
# insert data for ts4806
|
||||
data_join_1 = [
|
||||
('2020-06-01 00:00:00.000', 0.1000000, 12, 0.0200000),
|
||||
('2020-06-01 00:10:00.000', 1.2278550, 9, 0.4226180),
|
||||
('2020-06-01 00:15:00.000', 0.3614670, 18, 0.1071560),
|
||||
('2020-06-01 00:30:00.000', 0.5209450, 18, 0.1736480),
|
||||
('2020-06-01 00:40:00.000', 1.5230000, 10, 0.5200000),
|
||||
('2020-06-01 00:45:00.000', 0.8764570, 18, 0.2588190),
|
||||
('2020-06-01 00:50:00.000', 1.6507290, 11, 0.5835760),
|
||||
('2020-06-01 01:00:00.000', 1.0260600, 14, 0.3620200),
|
||||
('2020-06-01 01:15:00.000', 1.3678550, 0, 0.4226180),
|
||||
('2020-06-01 01:20:00.000', 1.1213200, 13, 0.7271070),
|
||||
('2020-06-01 01:30:00.000', 1.6000000, 12, 0.5200000),
|
||||
('2020-06-01 01:45:00.000', 1.8207290, 2, 0.5835760),
|
||||
('2020-06-01 02:00:00.000', 1.9283630, 18, 0.6527880),
|
||||
('2020-06-01 02:05:00.000', 0.9283630, 6, 0.6527880),
|
||||
('2020-06-01 02:15:00.000', 2.1213200, 18, 0.7271070)
|
||||
]
|
||||
|
||||
data_join_2 = [
|
||||
('2020-06-01 00:00:00.000', 0.3614670, 9, 0.0200000),
|
||||
('2020-06-01 00:15:00.000', 0.1000000, 12, 0.1071560),
|
||||
('2020-06-01 00:30:00.000', 0.5209450, 15, 0.1736480),
|
||||
('2020-06-01 00:45:00.000', 0.8764570, 18, 0.2588190),
|
||||
('2020-06-01 01:00:00.000', 1.0260600, 15, 0.3620200),
|
||||
('2020-06-01 01:15:00.000', 1.3678550, 7, 0.4226180),
|
||||
('2020-06-01 01:30:00.000', 1.6000000, 12, 0.5200000),
|
||||
('2020-06-01 01:45:00.000', 1.8207290, 7, 0.5835760),
|
||||
('2020-06-01 02:00:00.000', 1.0260600, 13, 0.6527880),
|
||||
('2020-06-01 02:15:00.000', 0.5209450, 18, 0.7271070)
|
||||
]
|
||||
|
||||
sql = "insert into ct_join_1 values";
|
||||
for t in data_join_1:
|
||||
sql += "('{}', {}, {}, {}),".format(t[0], t[1], t[2], t[3])
|
||||
sql += ";"
|
||||
tdSql.execute(sql)
|
||||
tdLog.debug("ct_join_1 sql: %s" % sql)
|
||||
|
||||
sql = "insert into ct_join_2 values";
|
||||
for t in data_join_2:
|
||||
sql += "('{}', {}, {}, {}),".format(t[0], t[1], t[2], t[3])
|
||||
sql += ";"
|
||||
tdSql.execute(sql)
|
||||
tdLog.debug("ct_join_2 sql: %s" % sql)
|
||||
|
||||
def test_join_having(self):
|
||||
tdSql.query("SELECT a.voltage, count(*) FROM ct_join_1 a JOIN ct_join_2 b ON a.ts = b.ts \
|
||||
group by a.voltage having count(*) > 4;")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 1, 5)
|
||||
tdSql.checkData(0, 0, 18)
|
||||
|
||||
tdSql.error("SELECT a.voltage, count(*) FROM ct_join_1 a JOIN ct_join_2 b ON a.ts = b.ts \
|
||||
group by a.voltage having b.voltage > 14;")
|
||||
|
||||
tdSql.query("SELECT a.voltage, count(*) FROM ct_join_1 a left JOIN ct_join_2 b ON a.ts = b.ts \
|
||||
group by a.voltage having count(*) > 4;");
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 1, 5)
|
||||
tdSql.checkData(0, 0, 18)
|
||||
|
||||
tdSql.error("SELECT a.voltage, count(*) FROM ct_join_1 a left JOIN ct_join_2 b ON a.ts = b.ts \
|
||||
group by a.voltage having b.voltage > 14;");
|
||||
|
||||
tdSql.query("SELECT a.ts, a.voltage, avg(b.voltage) FROM ct_join_2 a LEFT WINDOW JOIN ct_join_1 b \
|
||||
WINDOW_OFFSET(-15m, 15m) where a.voltage >=18 and b.voltage > 11 having avg(b.voltage) > 17;");
|
||||
tdSql.checkRows(1)
|
||||
|
||||
tdSql.error("SELECT a.ts, a.voltage, avg(b.voltage) FROM ct_join_2 a LEFT WINDOW JOIN ct_join_1 b \
|
||||
WINDOW_OFFSET(-15m, 15m) where a.voltage >=18 and b.voltage > 11 having b.voltage > 17;");
|
||||
|
||||
def prepare_window_data(self):
|
||||
# super table
|
||||
tdSql.execute("DROP STABLE IF EXISTS meters")
|
||||
tdSql.execute("CREATE STABLE `meters` (`ts` TIMESTAMP , `current` FLOAT , `voltage` INT , `phase` FLOAT ) \
|
||||
TAGS (`groupid` TINYINT, `location` VARCHAR(16));")
|
||||
|
||||
# child table
|
||||
tdSql.execute("CREATE TABLE `ct_win` USING `meters` (`groupid`, `location`) TAGS (1, 'beijing');")
|
||||
|
||||
# insert data for ts4806
|
||||
data_win = [
|
||||
('2020-06-01 00:00:00.000', 0.1000000, 12, 0.0200000),
|
||||
('2020-06-01 00:10:00.000', 1.2278550, 9, 0.4226180),
|
||||
('2020-06-01 00:15:00.000', 0.3614670, 18, 0.1071560),
|
||||
('2020-06-01 00:30:00.000', 0.5209450, 18, 0.1736480),
|
||||
('2020-06-01 00:40:00.000', 1.5230000, 18, 0.5200000),
|
||||
('2020-06-01 00:45:00.000', 0.8764570, 18, 0.2588190),
|
||||
('2020-06-01 00:50:00.000', 1.6507290, 11, 0.5835760),
|
||||
('2020-06-01 01:00:00.000', 1.0260600, 14, 0.3620200),
|
||||
('2020-06-01 01:15:00.000', 1.3678550, 14, 0.4226180),
|
||||
('2020-06-01 01:20:00.000', 1.1213200, 13, 0.7271070),
|
||||
('2020-06-01 01:30:00.000', 1.6000000, 12, 0.5200000),
|
||||
('2020-06-01 01:45:00.000', 1.8207290, 12, 0.5835760),
|
||||
('2020-06-01 02:00:00.000', 1.9283630, 18, 0.6527880),
|
||||
('2020-06-01 02:05:00.000', 0.9283630, 18, 0.6527880),
|
||||
('2020-06-01 02:15:00.000', 2.1213200, 18, 0.7271070)
|
||||
]
|
||||
|
||||
sql = "insert into ct_win values";
|
||||
for t in data_win:
|
||||
sql += "('{}', {}, {}, {}),".format(t[0], t[1], t[2], t[3])
|
||||
sql += ";"
|
||||
tdSql.execute(sql)
|
||||
tdLog.debug("data_win sql: %s" % sql)
|
||||
|
||||
def test_window_having(self):
|
||||
tdSql.query("SELECT _WSTART, _WEND, COUNT(*) FROM ct_win INTERVAL(15m) having count(*) > 1;")
|
||||
tdSql.checkRows(5)
|
||||
tdSql.checkData(0, 2, 2)
|
||||
|
||||
tdSql.error("SELECT _WSTART, _WEND, COUNT(*) FROM ct_win INTERVAL(15m) having voltage > 12;");
|
||||
|
||||
tdSql.query("SELECT _wstart, _wend, COUNT(*) AS cnt, FIRST(ts) AS fst, voltage FROM ct_win \
|
||||
STATE_WINDOW(voltage) having count(*) > 3;");
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 2, 4)
|
||||
|
||||
tdSql.error("SELECT _wstart, _wend, COUNT(*) AS cnt, FIRST(ts) AS fst, voltage FROM ct_win \
|
||||
STATE_WINDOW(voltage) having phase > 0.26;");
|
||||
|
||||
tdSql.query("SELECT _wstart, _wend, COUNT(*), FIRST(ts) FROM ct_win SESSION(ts, 10m) having count(*) > 3;");
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 2, 5)
|
||||
|
||||
tdSql.error("SELECT _wstart, _wend, COUNT(*), FIRST(ts) FROM ct_win SESSION(ts, 10m) having voltage > 12;");
|
||||
|
||||
tdSql.query("select _wstart, _wend, count(*), first(voltage), last(voltage) from ct_win \
|
||||
event_window start with voltage <= 12 end with voltage >= 17 having count(*) > 3;");
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 2, 7)
|
||||
tdSql.checkData(0, 3, 11)
|
||||
tdSql.checkData(0, 4, 18)
|
||||
|
||||
tdSql.error("select _wstart, _wend, count(*) from ct_win \
|
||||
event_window start with voltage <=12 end with voltage >= 17 having phase > 0.2;");
|
||||
|
||||
tdSql.query(
|
||||
"select _wstart, _wend, count(*), sum(voltage) from ct_win count_window(4) having sum(voltage) > 57;");
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 2, 4)
|
||||
tdSql.checkData(0, 3, 61)
|
||||
|
||||
tdSql.error("select _wstart, _wend, count(*), sum(voltage) from ct_win count_window(4) having voltage > 12;");
|
||||
|
||||
|
||||
def prepare_stream_window_data(self):
|
||||
# super table
|
||||
tdSql.execute("DROP STABLE IF EXISTS meters")
|
||||
tdSql.execute("CREATE STABLE `meters` (`ts` TIMESTAMP , `current` FLOAT , `voltage` INT , `phase` FLOAT ) \
|
||||
TAGS (`groupid` TINYINT, `location` VARCHAR(16));")
|
||||
|
||||
# child table
|
||||
tdSql.execute("CREATE TABLE `ct_steam_win` USING `meters` (`groupid`, `location`) TAGS (1, 'beijing');")
|
||||
|
||||
# insert data for ts4806
|
||||
data_win = [
|
||||
('2020-06-01 00:00:00.000', 0.1000000, 12, 0.0200000),
|
||||
('2020-06-01 00:10:00.000', 1.2278550, 9, 0.4226180),
|
||||
('2020-06-01 00:15:00.000', 0.3614670, 18, 0.1071560),
|
||||
('2020-06-01 00:30:00.000', 0.5209450, 18, 0.1736480),
|
||||
('2020-06-01 00:40:00.000', 1.5230000, 18, 0.5200000),
|
||||
('2020-06-01 00:45:00.000', 0.8764570, 18, 0.2588190),
|
||||
('2020-06-01 00:50:00.000', 1.6507290, 11, 0.5835760),
|
||||
('2020-06-01 01:00:00.000', 1.0260600, 14, 0.3620200),
|
||||
('2020-06-01 01:15:00.000', 1.3678550, 14, 0.4226180),
|
||||
('2020-06-01 01:20:00.000', 1.1213200, 13, 0.7271070),
|
||||
('2020-06-01 01:30:00.000', 1.6000000, 12, 0.5200000),
|
||||
('2020-06-01 01:45:00.000', 1.8207290, 12, 0.5835760),
|
||||
('2020-06-01 02:00:00.000', 1.9283630, 18, 0.6527880),
|
||||
('2020-06-01 02:05:00.000', 0.9283630, 18, 0.6527880),
|
||||
('2020-06-01 02:15:00.000', 2.1213200, 18, 0.7271070)
|
||||
]
|
||||
|
||||
sql = "insert into ct_win values";
|
||||
for t in data_win:
|
||||
sql += "('{}', {}, {}, {}),".format(t[0], t[1], t[2], t[3])
|
||||
sql += ";"
|
||||
tdSql.execute(sql)
|
||||
tdLog.debug("data_win sql: %s" % sql)
|
||||
|
||||
# 支持会话窗口、状态窗口、滑动窗口、事件窗口和计数窗口,
|
||||
# 其中,状态窗口、事件窗口 和 计数窗口 搭配超级表时必须与 partition by tbname 一起使用
|
||||
def test_stream_window_having(self):
|
||||
tdSql.execute("CREATE STREAM streams0 fill_history 1 INTO streamt0 AS \
|
||||
SELECT _WSTART, _WEND, COUNT(*) FROM meters PARTITION BY tbname INTERVAL(15m) having count(*) > 1;")
|
||||
tdSql.query("select * from streamt0;");
|
||||
tdSql.checkRows(5)
|
||||
tdSql.checkData(0, 2, 2)
|
||||
|
||||
tdSql.error("CREATE STREAM streams10 fill_history 1 INTO streamt10 AS SELECT _WSTART, _WEND, COUNT(*) \
|
||||
FROM meters PARTITION BY tbname INTERVAL(15m) having voltage > 12;");
|
||||
|
||||
|
||||
tdSql.execute("CREATE STREAM streams1 fill_history 1 INTO streamt1 AS \
|
||||
SELECT _wstart, _wend, COUNT(*) AS cnt, FIRST(ts) AS fst, voltage FROM meters PARTITION BY tbname \
|
||||
STATE_WINDOW(voltage) having count(*) > 3;");
|
||||
tdSql.query("select * from streamt1;");
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 2, 4)
|
||||
|
||||
tdSql.error("CREATE STREAM streams11 fill_history 1 INTO streamt11 AS \
|
||||
SELECT _wstart, _wend, COUNT(*) AS cnt, FIRST(ts) AS fst, voltage FROM meters PARTITION BY tbname \
|
||||
STATE_WINDOW(voltage) having phase > 0.26;");
|
||||
|
||||
|
||||
tdSql.execute("CREATE STREAM streams2 fill_history 1 INTO streamt2 AS \
|
||||
SELECT _wstart, _wend, COUNT(*), FIRST(ts) FROM meters SESSION(ts, 10m) having count(*) > 3;");
|
||||
tdSql.query("select * from streamt2;");
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 2, 5)
|
||||
|
||||
tdSql.error("CREATE STREAM streams12 fill_history 1 INTO streamt12 AS \
|
||||
SELECT _wstart, _wend, COUNT(*), FIRST(ts) FROM meters SESSION(ts, 10m) having voltage > 12;");
|
||||
|
||||
tdSql.execute("CREATE STREAM streams3 fill_history 1 INTO streamt3 AS \
|
||||
select _wstart, _wend, count(*), first(voltage), last(voltage) from meters PARTITION BY tbname \
|
||||
event_window start with voltage <= 12 end with voltage >= 17 having count(*) > 3;");
|
||||
tdSql.query("select * from streamt3;");
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 2, 7)
|
||||
tdSql.checkData(0, 3, 11)
|
||||
tdSql.checkData(0, 4, 18)
|
||||
|
||||
tdSql.error("CREATE STREAM streams13 fill_history 1 INTO streamt13 AS \
|
||||
select _wstart, _wend, count(*), first(voltage), last(voltage) from meters PARTITION BY tbname \
|
||||
event_window start with voltage <= 12 end with voltage >= 17 having phase > 0.2;");
|
||||
|
||||
tdSql.execute("CREATE STREAM streams4 fill_history 1 INTO streamt4 AS \
|
||||
select _wstart, _wend, count(*), sum(voltage) from meters PARTITION BY tbname \
|
||||
count_window(4) having sum(voltage) > 57;");
|
||||
tdSql.query("select * from streamt4;");
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 2, 4)
|
||||
tdSql.checkData(0, 3, 61)
|
||||
|
||||
tdSql.error("CREATE STREAM streams14 fill_history 1 INTO streamt14 AS \
|
||||
select _wstart, _wend, count(*), sum(voltage) from meters PARTITION BY tbname \
|
||||
count_window(4) having voltage > 12;");
|
||||
|
||||
|
||||
|
||||
def run(self):
|
||||
self.prepare_global_data()
|
||||
|
||||
self.prepare_agg_data()
|
||||
self.test_agg_having()
|
||||
|
||||
self.prepare_join_data()
|
||||
self.test_join_having()
|
||||
|
||||
self.prepare_window_data()
|
||||
self.test_window_having()
|
||||
|
||||
'''
|
||||
self.prepare_stream_window_data()
|
||||
self.test_stream_window_having()
|
||||
'''
|
||||
|
||||
def stop(self):
|
||||
tdSql.execute("drop database db_td32198;")
|
||||
tdSql.close()
|
||||
tdLog.success("%s successfully executed" % __file__)
|
||||
|
||||
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
|
@ -46,6 +46,7 @@
|
|||
,,y,army,./pytest.sh python3 ./test.py -f query/last/test_last.py
|
||||
,,y,army,./pytest.sh python3 ./test.py -f query/window/base.py
|
||||
,,y,army,./pytest.sh python3 ./test.py -f query/sys/tb_perf_queries_exist_test.py -N 3
|
||||
,,y,army,./pytest.sh python3 ./test.py -f query/test_having.py
|
||||
|
||||
#
|
||||
# system test
|
||||
|
|
Loading…
Reference in New Issue