Merge branch '3.0' into feat/TD-24834

This commit is contained in:
dm chen 2024-04-16 11:35:49 +08:00 committed by GitHub
commit aedec5ead7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
32 changed files with 774 additions and 249 deletions

View File

@ -952,7 +952,7 @@ FIRST(expr)
**More explanation**:
- FIRST(\*) can be used to get the first non-null value of all columns
- FIRST(\*) can be used to get the first non-null value of all columns; When querying a super table and multiResultFunctionStarReturnTags is set to 0 (default), FIRST(\*) only returns columns of super table; When set to 1, returns columns and tags of the super table.
- NULL will be returned if all the values of the specified column are all NULL
- A result will NOT be returned if all the columns in the result set are all NULL
@ -1014,7 +1014,7 @@ LAST(expr)
**More explanation**:
- LAST(\*) can be used to get the last non-NULL value of all columns
- LAST(\*) can be used to get the last non-NULL value of all columns; When querying a super table and multiResultFunctionStarReturnTags is set to 0 (default), LAST(\*) only returns columns of super table; When set to 1, returns columns and tags of the super table.
- If the values of a column in the result set are all NULL, NULL is returned for that column; if all columns in the result are all NULL, no result will be returned.
- When it's used on a STable, if there are multiple values with the timestamp in the result set, one of them will be returned randomly and it's not guaranteed that the same value is returned if the same query is run multiple times.
@ -1035,6 +1035,7 @@ LAST_ROW(expr)
**More explanations**:
- LAST_ROW(\*) can be used to get the last value of all columns; When querying a super table and multiResultFunctionStarReturnTags is set to 0 (default), LAST_ROW(\*) only returns columns of super table; When set to 1, returns columns and tags of the super table.
- When it's used on a STable, if there are multiple values with the timestamp in the result set, one of them will be returned randomly and it's not guaranteed that the same value is returned if the same query is run multiple times.
- Can't be used with `INTERVAL`.

View File

@ -231,6 +231,16 @@ Please note the `taoskeeper` needs to be installed and running to create the `lo
| Default Value | 0 |
| Notes | When multiple of the above functions act on the same column at the same time and no alias is specified, if the order by clause refers to the column name, column selection ambiguous will occur because the aliases of multiple columns are the same. |
### multiResultFunctionStarReturnTags
| Attribute | Description |
| ------------- | --------------------------------------------------------------------------------------------------------------- |
| Applicable | Client only |
| Meaning | When querying a super table, whether last(\*)/last_row(\*)/first(\*) returns tags is affected by this parameter. When querying a normal table or subtable, this parameter has no effect. |
| Value Range | 0: do not return tags, 1: return tags |
| Default Value | 0 |
| Notes | When this parameter is set to 0, last(\*)/last_row(\*)/first(\*) only returns the columns of the super table; When it is 1, return the columns and tags of the super table. |
## Locale Parameters
### timezone

View File

@ -954,7 +954,7 @@ FIRST(expr)
**使用说明**:
- 如果要返回各个列的首个(时间戳最小)非 NULL 值,可以使用 FIRST(\*)
- 如果要返回各个列的首个(时间戳最小)非 NULL 值,可以使用 FIRST(\*)查询超级表且multiResultFunctionStarReturnTags设置为 0 (默认值) 时FIRST(\*)只返回超级表的普通列;设置为 1 时,返回超级表的普通列和标签列。
- 如果结果集中的某列全部为 NULL 值,则该列的返回结果也是 NULL
- 如果结果集中所有列全部为 NULL 值,则不返回结果。
@ -1006,7 +1006,7 @@ LAST(expr)
**使用说明**:
- 如果要返回各个列的最后(时间戳最大)一个非 NULL 值,可以使用 LAST(\*)
- 如果要返回各个列的最后(时间戳最大)一个非 NULL 值,可以使用 LAST(\*)查询超级表且multiResultFunctionStarReturnTags设置为 0 (默认值) 时LAST(\*)只返回超级表的普通列;设置为 1 时,返回超级表的普通列和标签列。
- 如果结果集中的某列全部为 NULL 值,则该列的返回结果也是 NULL如果结果集中所有列全部为 NULL 值,则不返回结果。
- 在用于超级表时,时间戳完全一样且同为最大的数据行可能有多个,那么会从中随机返回一条,而并不保证多次运行所挑选的数据行必然一致。
@ -1026,7 +1026,7 @@ LAST_ROW(expr)
**适用于**:表和超级表。
**使用说明**
- 如果要返回各个列的最后一条记录(时间戳最大),可以使用 LAST_ROW(\*)查询超级表且multiResultFunctionStarReturnTags设置为 0 (默认值) 时LAST_ROW(\*)只返回超级表的普通列;设置为 1 时,返回超级表的普通列和标签列。
- 在用于超级表时,时间戳完全一样且同为最大的数据行可能有多个,那么会从中随机返回一条,而并不保证多次运行所挑选的数据行必然一致。
- 不能与 INTERVAL 一起使用。

View File

@ -230,6 +230,16 @@ taos -C
| 缺省值 | 1 |
| 补充说明 | 该参数设置为 1 时,如果查询中含有 GROUP BYPARTITION BY 以及 INTERVAL 子句且相应的组或窗口内数据为空或者NULL 对应的组或窗口将不返回查询结果 |
### multiResultFunctionStarReturnTags
| 属性 | 说明 |
| -------- | ---------------------------------------------------------------------------------------------------------------------------------------------- |
| 适用范围 | 仅客户端适用 |
| 含义 | 查询超级表时last(\*)/last_row(\*)/first(\*) 是否返回标签列;查询普通表、子表时,不受该参数影响。 |
| 取值范围 | 0不返回标签列1返回标签列 |
| 缺省值 | 0 |
| 补充说明 | 该参数设置为 0 时last(\*)/last_row(\*)/first(\*) 只返回超级表的普通列;为 1 时,返回超级表的普通列和标签列 |
## 区域相关
### timezone

View File

@ -178,6 +178,7 @@ extern int32_t tsMetaCacheMaxSize;
extern int32_t tsSlowLogThreshold;
extern int32_t tsSlowLogScope;
extern int32_t tsTimeSeriesThreshold;
extern bool tsMultiResultFunctionStarReturnTags;
// client
extern int32_t tsMinSlidingTime;

View File

@ -40,4 +40,6 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg);
int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode);
int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta, void* pVnode);
#endif // TDENGINE_TQ_COMMON_H

View File

@ -304,9 +304,9 @@ typedef struct SStreamTaskId {
typedef struct SCheckpointInfo {
int64_t startTs;
int64_t checkpointId;
int64_t checkpointVer; // latest checkpointId version
int64_t checkpointId; // latest checkpoint id
int64_t checkpointVer; // latest checkpoint offset in wal
int64_t checkpointTime; // latest checkpoint time
int64_t processedVer;
int64_t nextProcessVer; // current offset in WAL, not serialize it
int64_t failedId; // record the latest failed checkpoint id
@ -386,6 +386,9 @@ typedef struct STaskExecStatisInfo {
int64_t created;
int64_t init;
int64_t start;
int64_t startCheckpointId;
int64_t startCheckpointVer;
int64_t step1Start;
double step1El;
int64_t step2Start;
@ -442,6 +445,7 @@ struct SStreamTask {
SCheckpointInfo chkInfo;
STaskExec exec;
SDataRange dataRange;
SVersionRange step2Range;
SHistoryTaskInfo hTaskInfo;
STaskId streamTaskId;
STaskExecStatisInfo execInfo;
@ -672,24 +676,34 @@ typedef struct {
int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pRsp);
int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp);
typedef struct STaskCkptInfo {
int64_t latestId; // saved checkpoint id
int64_t latestVer; // saved checkpoint ver
int64_t latestTime; // latest checkpoint time
int64_t activeId; // current active checkpoint id
int32_t activeTransId; // checkpoint trans id
int8_t failed; // denote if the checkpoint is failed or not
} STaskCkptInfo;
typedef struct STaskStatusEntry {
STaskId id;
int32_t status;
int32_t statusLastDuration; // to record the last duration of current status
int64_t stage;
int32_t nodeId;
int64_t verStart; // start version in WAL, only valid for source task
int64_t verEnd; // end version in WAL, only valid for source task
int64_t processedVer; // only valid for source task
int64_t checkpointId; // current active checkpoint id
int32_t chkpointTransId; // checkpoint trans id
int8_t checkpointFailed; // denote if the checkpoint is failed or not
bool inputQChanging; // inputQ is changing or not
int64_t inputQUnchangeCounter;
double inputQUsed; // in MiB
double inputRate;
double sinkQuota; // existed quota size for sink task
double sinkDataSize; // sink to dst data size
STaskId id;
int32_t status;
int32_t statusLastDuration; // to record the last duration of current status
int64_t stage;
int32_t nodeId;
SVersionRange verRange; // start/end version in WAL, only valid for source task
int64_t processedVer; // only valid for source task
bool inputQChanging; // inputQ is changing or not
int64_t inputQUnchangeCounter;
double inputQUsed; // in MiB
double inputRate;
double sinkQuota; // existed quota size for sink task
double sinkDataSize; // sink to dst data size
int64_t startTime;
int64_t startCheckpointId;
int64_t startCheckpointVer;
int64_t hTaskId;
STaskCkptInfo checkpointInfo;
} STaskStatusEntry;
typedef struct SStreamHbMsg {
@ -888,4 +902,4 @@ void sendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp);
}
#endif
#endif /* ifndef _STREAM_H_ */
#endif /* ifndef _STREAM_H_ */

View File

@ -163,6 +163,8 @@ static const SSysDbTableSchema userStbsSchema[] = {
static const SSysDbTableSchema streamSchema[] = {
{.name = "stream_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "stream_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "history_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "source_db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
@ -184,7 +186,16 @@ static const SSysDbTableSchema streamTaskSchema[] = {
{.name = "stage", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
{.name = "in_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
// {.name = "out_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "info", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "info", .bytes = 35, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "start_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
{.name = "start_ver", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
{.name = "checkpoint_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "checkpoint_id", .bytes = 25, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
{.name = "checkpoint_version", .bytes = 25, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
{.name = "ds_err_info", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "history_task_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "history_task_status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
};
static const SSysDbTableSchema userTblsSchema[] = {

View File

@ -181,6 +181,7 @@ int32_t tsMetaCacheMaxSize = -1; // MB
int32_t tsSlowLogThreshold = 3; // seconds
int32_t tsSlowLogScope = SLOW_LOG_TYPE_ALL;
int32_t tsTimeSeriesThreshold = 50;
bool tsMultiResultFunctionStarReturnTags = false;
/*
* denote if the server needs to compress response message at the application layer to client, including query rsp,
@ -560,6 +561,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, CFG_SCOPE_BOTH, CFG_DYN_BOTH) != 0) return -1;
if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 200000, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1;
if (cfgAddBool(pCfg, "multiResultFunctionStarReturnTags", tsMultiResultFunctionStarReturnTags, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) return -1;
return 0;
}
@ -1130,6 +1133,8 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tsKeepAliveIdle = cfgGetItem(pCfg, "keepAliveIdle")->i32;
tsExperimental = cfgGetItem(pCfg, "experimental")->bval;
tsMultiResultFunctionStarReturnTags = cfgGetItem(pCfg, "multiResultFunctionStarReturnTags")->bval;
return 0;
}
@ -1775,7 +1780,8 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) {
{"shellActivityTimer", &tsShellActivityTimer},
{"slowLogThreshold", &tsSlowLogThreshold},
{"useAdapter", &tsUseAdapter},
{"experimental", &tsExperimental}};
{"experimental", &tsExperimental},
{"multiResultFunctionStarReturnTags", &tsMultiResultFunctionStarReturnTags} };
if (taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true) != 0) {
taosCfgSetOption(options, tListLen(options), pItem, false);

View File

@ -1345,9 +1345,31 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false);
// create time
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->createTime, false);
// stream id
char buf[128] = {0};
int32_t len = tintToHex(pStream->uid, &buf[4]);
buf[2] = '0';
buf[3] = 'x';
varDataSetLen(buf, len + 2);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, buf, false);
// related fill-history stream id
memset(buf, 0, tListLen(buf));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
if (pStream->hTaskUid != 0) {
len = tintToHex(pStream->hTaskUid, &buf[4]);
varDataSetLen(buf, len + 2);
colDataSetVal(pColInfo, numOfRows, buf, false);
} else {
colDataSetVal(pColInfo, numOfRows, buf, true);
}
// related fill-history stream id
char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(sql, pStream->sql, sizeof(sql));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
@ -1510,13 +1532,14 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false);
// info
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
const char *sinkStr = "%.2fMiB";
sprintf(buf, sinkStr, pe->sinkDataSize);
} else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
// offset info
const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]";
sprintf(buf, offsetStr, pe->processedVer, pe->verStart, pe->verEnd);
sprintf(buf, offsetStr, pe->processedVer, pe->verRange.minVer, pe->verRange.maxVer);
}
STR_TO_VARSTR(vbuf, buf);
@ -1524,6 +1547,55 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
// start_time
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char*)&pe->startTime, false);
// start id
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char*)&pe->startCheckpointId, false);
// start ver
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char*)&pe->startCheckpointVer, false);
// checkpoint time
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
if (pe->checkpointInfo.latestTime != 0) {
colDataSetVal(pColInfo, numOfRows, (const char *)&pe->checkpointInfo.latestTime, false);
} else {
colDataSetVal(pColInfo, numOfRows, 0, true);
}
// checkpoint_id
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char*)&pe->checkpointInfo.latestId, false);
// checkpoint info
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char*)&pe->checkpointInfo.latestVer, false);
// ds_err_info
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, 0, true);
// history_task_id
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
if (pe->hTaskId != 0) {
memset(idstr, 0, tListLen(idstr));
len = tintToHex(pe->hTaskId, &idstr[4]);
idstr[2] = '0';
idstr[3] = 'x';
varDataSetLen(idstr, len + 2);
colDataSetVal(pColInfo, numOfRows, idstr, false);
} else {
colDataSetVal(pColInfo, numOfRows, 0, true);
}
// history_task_status
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, 0, true);
return TSDB_CODE_SUCCESS;
}

View File

@ -294,12 +294,14 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
}
streamTaskStatusCopy(pTaskEntry, p);
if ((p->checkpointId != 0) && p->checkpointFailed) {
STaskCkptInfo *pChkInfo = &p->checkpointInfo;
if ((pChkInfo->activeId != 0) && pChkInfo->failed) {
mError("stream task:0x%" PRIx64 " checkpointId:%" PRIx64 " transId:%d failed, kill it", p->id.taskId,
p->checkpointId, p->chkpointTransId);
pChkInfo->activeId, pChkInfo->activeTransId);
SFailedCheckpointInfo info = {
.transId = p->chkpointTransId, .checkpointId = p->checkpointId, .streamUid = p->id.streamId};
.transId = pChkInfo->activeTransId, .checkpointId = pChkInfo->activeId, .streamUid = p->id.streamId};
addIntoCheckpointList(pFailedTasks, &info);
}
}

View File

@ -62,8 +62,8 @@ SRpcMsg buildHbReq() {
entry.id.taskId = 5;
entry.id.streamId = defStreamId;
entry.checkpointId = 1;
entry.checkpointFailed = true;
entry.checkpointInfo.activeId = 1;
entry.checkpointInfo.failed = true;
taosArrayPush(msg.pTaskStatus, &entry);
}

View File

@ -31,7 +31,6 @@ extern "C" {
#endif
struct SSnode {
char* path;
SStreamMeta* pMeta;
SMsgCb msgCb;
};

View File

@ -32,6 +32,7 @@ static STaskId replaceStreamTaskId(SStreamTask *pTask) {
pTask->id.taskId = pTask->streamTaskId.taskId;
return id;
}
static void restoreStreamTaskId(SStreamTask *pTask, STaskId *pId) {
ASSERT(pTask->info.fillHistory);
pTask->id.taskId = pId->taskId;
@ -48,46 +49,23 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer
streamTaskOpenAllUpstreamInput(pTask);
STaskId taskId = {0};
if (pTask->info.fillHistory) {
taskId = replaceStreamTaskId(pTask);
code = tqExpandStreamTask(pTask, pSnode->pMeta, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1);
if (pTask->pState == NULL) {
sndError("s-task:%s failed to open state for task", pTask->id.idStr);
return -1;
} else {
sndDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
}
if (pTask->info.fillHistory) {
restoreStreamTaskId(pTask, &taskId);
}
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList);
SReadHandle handle = {
.checkpointId = pTask->chkInfo.checkpointId,
.vnode = NULL,
.numOfVgroups = numOfVgroups,
.pStateBackend = pTask->pState,
.fillHistory = pTask->info.fillHistory,
.winRange = pTask->dataRange.window,
};
initStreamStateAPI(&handle.api);
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, SNODE_HANDLE, pTask->id.taskId);
ASSERT(pTask->exec.pExecutor);
qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
streamTaskResetUpstreamStageInfo(pTask);
streamSetupScheduleTrigger(pTask);
SCheckpointInfo *pChkInfo = &pTask->chkInfo;
// checkpoint ver is the kept version, handled data should be the next version.
if (pChkInfo->checkpointId != 0) {
pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1;
pChkInfo->processedVer = pChkInfo->checkpointVer;
pTask->execInfo.startCheckpointVer = pChkInfo->nextProcessVer;
pTask->execInfo.startCheckpointId = pChkInfo->checkpointId;
sndInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64,
pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
}
@ -117,11 +95,6 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pSnode->path = taosStrdup(path);
if (pSnode->path == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
pSnode->msgCb = pOption->msgCb;
pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE, taosGetTimestampMs(), tqStartTaskCompleteCallback);
@ -140,7 +113,6 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
return pSnode;
FAIL:
taosMemoryFree(pSnode->path);
taosMemoryFree(pSnode);
return NULL;
}
@ -156,7 +128,6 @@ void sndClose(SSnode *pSnode) {
streamMetaNotifyClose(pSnode->pMeta);
streamMetaCommit(pSnode->pMeta);
streamMetaClose(pSnode->pMeta);
taosMemoryFree(pSnode->path);
taosMemoryFree(pSnode);
}

View File

@ -711,22 +711,6 @@ end:
static void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); }
static STaskId replaceStreamTaskId(SStreamTask* pTask) {
ASSERT(pTask->info.fillHistory);
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
pTask->id.streamId = pTask->streamTaskId.streamId;
pTask->id.taskId = pTask->streamTaskId.taskId;
return id;
}
static void restoreStreamTaskId(SStreamTask* pTask, STaskId* pId) {
ASSERT(pTask->info.fillHistory);
pTask->id.taskId = pId->taskId;
pTask->id.streamId = pId->streamId;
}
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
int32_t vgId = TD_VID(pTq->pVnode);
tqDebug("s-task:0x%x start to expand task", pTask->id.taskId);
@ -736,74 +720,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
return code;
}
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
STaskId taskId = {0};
if (pTask->info.fillHistory) {
taskId = replaceStreamTaskId(pTask);
}
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
if (pTask->pState == NULL) {
tqError("s-task:%s (vgId:%d) failed to open state for task", pTask->id.idStr, vgId);
return -1;
}
tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
if (pTask->info.fillHistory) {
restoreStreamTaskId(pTask, &taskId);
}
SReadHandle handle = {
.checkpointId = pTask->chkInfo.checkpointId,
.vnode = pTq->pVnode,
.initTqReader = 1,
.pStateBackend = pTask->pState,
.fillHistory = pTask->info.fillHistory,
.winRange = pTask->dataRange.window,
};
initStorageAPI(&handle.api);
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId);
if (pTask->exec.pExecutor == NULL) {
return -1;
}
qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
STaskId taskId = {0};
if (pTask->info.fillHistory) {
taskId = replaceStreamTaskId(pTask);
}
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
if (pTask->pState == NULL) {
tqError("s-task:%s (vgId:%d) failed to open state for task", pTask->id.idStr, vgId);
return -1;
} else {
tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
}
if (pTask->info.fillHistory) {
restoreStreamTaskId(pTask, &taskId);
}
SReadHandle handle = {
.checkpointId = pTask->chkInfo.checkpointId,
.vnode = NULL,
.numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList),
.pStateBackend = pTask->pState,
.fillHistory = pTask->info.fillHistory,
.winRange = pTask->dataRange.window,
};
initStorageAPI(&handle.api);
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId);
if (pTask->exec.pExecutor == NULL) {
return -1;
}
qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
code = tqExpandStreamTask(pTask, pTq->pStreamMeta, pTq->pVnode);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// sink
@ -839,12 +758,15 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
streamTaskResetUpstreamStageInfo(pTask);
streamSetupScheduleTrigger(pTask);
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
// checkpoint ver is the kept version, handled data should be the next version.
if (pChkInfo->checkpointId != 0) {
pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1;
pChkInfo->processedVer = pChkInfo->checkpointVer;
pTask->execInfo.startCheckpointVer = pChkInfo->nextProcessVer;
pTask->execInfo.startCheckpointId = pChkInfo->checkpointId;
tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
}
@ -890,33 +812,33 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq) {
const char* id = pTask->id.idStr;
int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer;
SVersionRange* pRange = &pTask->dataRange.range;
SVersionRange* pStep2Range = &pTask->step2Range;
// if it's an source task, extract the last version in wal.
bool done = streamHistoryTaskSetVerRangeStep2(pTask, nextProcessedVer);
pTask->execInfo.step2Start = taosGetTimestampMs();
if (done) {
qDebug("s-task:%s scan wal(step 2) verRange:%" PRId64 "-%" PRId64 " ended, elapsed time:%.2fs", id, pRange->minVer,
pRange->maxVer, 0.0);
qDebug("s-task:%s scan wal(step 2) verRange:%" PRId64 "-%" PRId64 " ended, elapsed time:%.2fs", id, pStep2Range->minVer,
pStep2Range->maxVer, 0.0);
streamTaskPutTranstateIntoInputQ(pTask);
streamExecTask(pTask); // exec directly
} else {
STimeWindow* pWindow = &pTask->dataRange.window;
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64
tqDebug("s-task:%s level:%d verRange:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64
", do secondary scan-history from WAL after halt the related stream task:%s",
id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey,
id, pTask->info.taskLevel, pStep2Range->minVer, pStep2Range->maxVer, pWindow->skey, pWindow->ekey,
pStreamTask->id.idStr);
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
streamSetParamForStreamScannerStep2(pTask, pRange, pWindow);
streamSetParamForStreamScannerStep2(pTask, pStep2Range, pWindow);
int64_t dstVer = pTask->dataRange.range.minVer;
int64_t dstVer =pStep2Range->minVer;
pTask->chkInfo.nextProcessVer = dstVer;
walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer);
tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE);
pStep2Range->maxVer, TASK_SCHED_STATUS__INACTIVE);
/*int8_t status = */ streamTaskSetSchedStatusInactive(pTask);

View File

@ -242,21 +242,23 @@ int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) {
// todo handle memory error
bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) {
const char* id = pTask->id.idStr;
int64_t maxVer = pTask->dataRange.range.maxVer;
int64_t maxVer = pTask->step2Range.maxVer;
if ((pTask->info.fillHistory == 1) && ver > pTask->dataRange.range.maxVer) {
if ((pTask->info.fillHistory == 1) && ver > maxVer) {
if (!pTask->status.appendTranstateBlock) {
qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64
", not scan wal anymore, add transfer-state block into inputQ",
id, ver, maxVer);
double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.0;
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el);
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, range:%" PRId64 "-%" PRId64 ", elapsed time:%.2fs",
id, pTask->step2Range.minVer, maxVer, el);
/*int32_t code = */streamTaskPutTranstateIntoInputQ(pTask);
return true;
} else {
qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64 ", not scan wal",
id, ver, maxVer);
qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the ver range:%" PRId64 "-%" PRId64
", not scan wal",
id, ver, pTask->step2Range.minVer, maxVer);
}
}
@ -389,7 +391,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
}
int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue);
int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->dataRange.range.maxVer : INT64_MAX;
int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->step2Range.maxVer : INT64_MAX;
taosThreadMutexLock(&pTask->lock);

View File

@ -23,6 +23,69 @@ typedef struct STaskUpdateEntry {
int32_t transId;
} STaskUpdateEntry;
static STaskId replaceStreamTaskId(SStreamTask* pTask) {
ASSERT(pTask->info.fillHistory);
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
pTask->id.streamId = pTask->streamTaskId.streamId;
pTask->id.taskId = pTask->streamTaskId.taskId;
return id;
}
static void restoreStreamTaskId(SStreamTask* pTask, STaskId* pId) {
ASSERT(pTask->info.fillHistory);
pTask->id.taskId = pId->taskId;
pTask->id.streamId = pId->streamId;
}
int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta, void* pVnode) {
int32_t vgId = pMeta->vgId;
STaskId taskId = {0};
if (pTask->info.fillHistory) {
taskId = replaceStreamTaskId(pTask);
}
pTask->pState = streamStateOpen(pMeta->path, pTask, false, -1, -1);
if (pTask->pState == NULL) {
tqError("s-task:%s (vgId:%d) failed to open state for task, expand task failed", pTask->id.idStr, vgId);
return -1;
} else {
tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
}
if (pTask->info.fillHistory) {
restoreStreamTaskId(pTask, &taskId);
}
SReadHandle handle = {
.checkpointId = pTask->chkInfo.checkpointId,
.pStateBackend = pTask->pState,
.fillHistory = pTask->info.fillHistory,
.winRange = pTask->dataRange.window,
};
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
handle.vnode = pVnode;
handle.initTqReader = 1;
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
handle.numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList);
}
initStorageAPI(&handle.api);
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG) {
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId);
if (pTask->exec.pExecutor == NULL) {
tqError("s-task:%s failed to create exec taskInfo, failed to expand task", pTask->id.idStr);
return -1;
}
qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
}
return TSDB_CODE_SUCCESS;
}
int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) {
int32_t vgId = pMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);

View File

@ -37,7 +37,6 @@ typedef struct {
int64_t cid;
int64_t now;
TSKEY nextKey;
TSKEY maxDelKey;
int32_t fid;
int32_t expLevel;
SDiskID did;
@ -161,15 +160,13 @@ static int32_t tsdbCommitTombData(SCommitter2 *committer) {
int64_t numRecord = 0;
SMetaInfo info;
if (committer->ctx->fset == NULL && !committer->ctx->hasTSData) {
if (committer->ctx->maxKey < committer->ctx->maxDelKey) {
committer->ctx->nextKey = committer->ctx->maxKey + 1;
} else {
committer->ctx->nextKey = TSKEY_MAX;
}
return 0;
if (committer->tsdb->imem->nDel == 0) {
goto _exit;
}
// do not need to write tomb data if there is no ts data
bool skip = (committer->ctx->fset == NULL && !committer->ctx->hasTSData);
committer->ctx->tbid->suid = 0;
committer->ctx->tbid->uid = 0;
for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->tombIterMerger));) {
@ -196,9 +193,11 @@ static int32_t tsdbCommitTombData(SCommitter2 *committer) {
record->skey = TMAX(record->skey, committer->ctx->minKey);
record->ekey = TMIN(record->ekey, committer->ctx->maxKey);
numRecord++;
code = tsdbFSetWriteTombRecord(committer->writer, record);
TSDB_CHECK_CODE(code, lino, _exit);
if (!skip) {
numRecord++;
code = tsdbFSetWriteTombRecord(committer->writer, record);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
code = tsdbIterMergerNext(committer->tombIterMerger);
@ -480,28 +479,11 @@ static int32_t tsdbOpenCommitter(STsdb *tsdb, SCommitInfo *info, SCommitter2 *co
STbData *tbData = TCONTAINER_OF(node, STbData, rbtn);
for (SDelData *delData = tbData->pHead; delData; delData = delData->pNext) {
if (delData->sKey < committer->ctx->nextKey) {
committer->ctx->nextKey = delData->sKey;
}
committer->ctx->nextKey = TMIN(committer->ctx->nextKey, delData->sKey);
}
}
}
committer->ctx->maxDelKey = TSKEY_MIN;
TSKEY minKey = TSKEY_MAX;
TSKEY maxKey = TSKEY_MIN;
if (TARRAY2_SIZE(committer->fsetArr) > 0) {
STFileSet *fset = TARRAY2_LAST(committer->fsetArr);
tsdbFidKeyRange(fset->fid, committer->minutes, committer->precision, &minKey, &committer->ctx->maxDelKey);
fset = TARRAY2_FIRST(committer->fsetArr);
tsdbFidKeyRange(fset->fid, committer->minutes, committer->precision, &minKey, &maxKey);
}
if (committer->ctx->nextKey < TMIN(tsdb->imem->minKey, minKey)) {
committer->ctx->nextKey = TMIN(tsdb->imem->minKey, minKey);
}
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);

View File

@ -5037,7 +5037,13 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) {
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
int32_t step = asc ? 1 : -1;
int64_t ts = asc ? pReader->info.window.skey - 1 : pReader->info.window.ekey + 1;
int64_t ts = 0;
if (asc) {
ts = (pReader->info.window.skey > INT64_MIN)? pReader->info.window.skey-1:pReader->info.window.skey;
} else {
ts = (pReader->info.window.ekey < INT64_MAX)? pReader->info.window.ekey + 1:pReader->info.window.ekey;
}
resetAllDataBlockScanInfo(pStatus->pTableMap, ts, step);
// no data in files, let's try buffer in memory

View File

@ -2093,12 +2093,14 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, in
tDecoderInit(pCoder, pReq, len);
tDecodeDeleteRes(pCoder, pRes);
for (int32_t iUid = 0; iUid < taosArrayGetSize(pRes->uidList); iUid++) {
uint64_t uid = *(uint64_t *)taosArrayGet(pRes->uidList, iUid);
code = tsdbDeleteTableData(pVnode->pTsdb, ver, pRes->suid, uid, pRes->skey, pRes->ekey);
if (code) goto _err;
code = metaUpdateChangeTimeWithLock(pVnode->pMeta, uid, pRes->ctimeMs);
if (code) goto _err;
if (pRes->affectedRows > 0) {
for (int32_t iUid = 0; iUid < taosArrayGetSize(pRes->uidList); iUid++) {
uint64_t uid = *(uint64_t *)taosArrayGet(pRes->uidList, iUid);
code = tsdbDeleteTableData(pVnode->pTsdb, ver, pRes->suid, uid, pRes->skey, pRes->ekey);
if (code) goto _err;
code = metaUpdateChangeTimeWithLock(pVnode->pMeta, uid, pRes->ctimeMs);
if (code) goto _err;
}
}
code = tdProcessRSmaDelete(pVnode->pSma, ver, pRes, pReq, len);
@ -2296,5 +2298,5 @@ _OVER:
int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
return 0;
}
int32_t tsdbAsyncCompact(STsdb *tsdb, const STimeWindow *tw, bool sync);
int32_t tsdbAsyncCompact(STsdb *tsdb, const STimeWindow *tw, bool sync) { return 0; }
#endif

View File

@ -926,8 +926,8 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan
pStreamInfo->fillHistoryWindow = *pWindow;
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE2;
qDebug("%s step 2. set param for stream scanner scan wal, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64
" - %" PRId64,
qDebug("%s step 2. set param for stream scanner scan wal, verRange:%" PRId64 "-%" PRId64 ", window:%" PRId64
"-%" PRId64,
GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey,
pWindow->ekey);
return 0;

View File

@ -2486,6 +2486,20 @@ static int32_t translateFunctionImpl(STranslateContext* pCxt, SFunctionNode** pF
static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode** pFunc) {
SNode* pParam = NULL;
if (strcmp((*pFunc)->functionName, "tbname") == 0 && (*pFunc)->pParameterList != NULL) {
pParam = nodesListGetNode((*pFunc)->pParameterList, 0);
if(pParam && nodeType(pParam) == QUERY_NODE_VALUE) {
if (pCxt && pCxt->pCurrStmt && pCxt->pCurrStmt->type == QUERY_NODE_SELECT_STMT &&
((SSelectStmt*)pCxt->pCurrStmt)->pFromTable &&
nodeType(((SSelectStmt*)pCxt->pCurrStmt)->pFromTable) == QUERY_NODE_REAL_TABLE) {
SRealTableNode* pRealTable = (SRealTableNode*)((SSelectStmt*)pCxt->pCurrStmt)->pFromTable;
if (strcmp(((SValueNode*)pParam)->literal, pRealTable->table.tableName) == 0) {
NODES_DESTORY_LIST((*pFunc)->pParameterList);
(*pFunc)->pParameterList = NULL;
}
}
}
}
FOREACH(pParam, (*pFunc)->pParameterList) {
if (isMultiResFunc(pParam)) {
pCxt->errCode = TSDB_CODE_FUNC_FUNTION_PARA_NUM;
@ -2732,11 +2746,14 @@ static bool hasTbnameFunction(SNodeList* pPartitionByList) {
return false;
}
static bool fromSubtable(SNode* table) {
static bool fromSingleTable(SNode* table) {
if (NULL == table) return false;
if (table->type == QUERY_NODE_REAL_TABLE && ((SRealTableNode*)table)->pMeta &&
((SRealTableNode*)table)->pMeta->tableType == TSDB_CHILD_TABLE) {
return true;
if (table->type == QUERY_NODE_REAL_TABLE && ((SRealTableNode*)table)->pMeta) {
int8_t type = ((SRealTableNode*)table)->pMeta->tableType;
if(type == TSDB_CHILD_TABLE || type == TSDB_NORMAL_TABLE
|| type == TSDB_SYSTEM_TABLE) {
return true;
}
}
return false;
}
@ -2835,7 +2852,7 @@ static EDealRes doCheckAggColCoexist(SNode** pNode, void* pContext) {
}
SNode* pPartKey = NULL;
bool partionByTbname = false;
if (fromSubtable(((SSelectStmt*)pCxt->pTranslateCxt->pCurrStmt)->pFromTable) ||
if (fromSingleTable(((SSelectStmt*)pCxt->pTranslateCxt->pCurrStmt)->pFromTable) ||
hasTbnameFunction(((SSelectStmt*)pCxt->pTranslateCxt->pCurrStmt)->pPartitionByList)) {
partionByTbname = true;
}
@ -2844,7 +2861,9 @@ static EDealRes doCheckAggColCoexist(SNode** pNode, void* pContext) {
return rewriteExprToGroupKeyFunc(pCxt->pTranslateCxt, pNode);
}
}
if (partionByTbname && QUERY_NODE_COLUMN == nodeType(*pNode) && ((SColumnNode*)*pNode)->colType == COLUMN_TYPE_TAG) {
if (partionByTbname &&
((QUERY_NODE_COLUMN == nodeType(*pNode) && ((SColumnNode*)*pNode)->colType == COLUMN_TYPE_TAG) ||
(QUERY_NODE_FUNCTION == nodeType(*pNode) && FUNCTION_TYPE_TBNAME == ((SFunctionNode*)*pNode)->funcType))) {
return rewriteExprToGroupKeyFunc(pCxt->pTranslateCxt, pNode);
}
if (isScanPseudoColumnFunc(*pNode) || QUERY_NODE_COLUMN == nodeType(*pNode)) {
@ -3429,9 +3448,9 @@ static int32_t createMultiResFuncsParas(STranslateContext* pCxt, SNodeList* pSrc
SNode* pPara = NULL;
FOREACH(pPara, pSrcParas) {
if (nodesIsStar(pPara)) {
code = createAllColumns(pCxt, true, &pExprs);
code = createAllColumns(pCxt, !tsMultiResultFunctionStarReturnTags, &pExprs);
} else if (nodesIsTableStar(pPara)) {
code = createTableAllCols(pCxt, (SColumnNode*)pPara, true, &pExprs);
code = createTableAllCols(pCxt, (SColumnNode*)pPara, !tsMultiResultFunctionStarReturnTags, &pExprs);
} else {
code = nodesListMakeStrictAppend(&pExprs, nodesCloneNode(pPara));
}

View File

@ -2689,7 +2689,8 @@ static bool isNeedSplitCacheLastFunc(SFunctionNode* pFunc, SScanLogicNode* pScan
int32_t funcType = pFunc->funcType;
if ((FUNCTION_TYPE_LAST_ROW != funcType || (FUNCTION_TYPE_LAST_ROW == funcType && TSDB_CACHE_MODEL_LAST_VALUE == pScan->cacheLastMode)) &&
(FUNCTION_TYPE_LAST != funcType || (FUNCTION_TYPE_LAST == funcType && (TSDB_CACHE_MODEL_LAST_ROW == pScan->cacheLastMode ||
QUERY_NODE_OPERATOR == nodeType(nodesListGetNode(pFunc->pParameterList, 0)) || QUERY_NODE_VALUE == nodeType(nodesListGetNode(pFunc->pParameterList, 0))))) &&
QUERY_NODE_OPERATOR == nodeType(nodesListGetNode(pFunc->pParameterList, 0)) || QUERY_NODE_VALUE == nodeType(nodesListGetNode(pFunc->pParameterList, 0)) ||
COLUMN_TYPE_COLUMN != ((SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0))->colType))) &&
FUNCTION_TYPE_SELECT_VALUE != funcType && FUNCTION_TYPE_GROUP_KEY != funcType) {
return true;
}
@ -2709,8 +2710,9 @@ static bool lastRowScanOptCheckFuncList(SLogicNode* pNode, int8_t cacheLastModel
if (FUNCTION_TYPE_LAST == pAggFunc->funcType) {
if (QUERY_NODE_COLUMN == nodeType(pParam)) {
SColumnNode* pCol = (SColumnNode*)pParam;
if (pCol->colType != COLUMN_TYPE_COLUMN) {
return false;
if (pCol->colType != COLUMN_TYPE_COLUMN && TSDB_CACHE_MODEL_LAST_ROW != cacheLastModel) {
needSplitFuncCount++;
*hasOtherFunc = true;
}
if (lastColId != pCol->colId) {
lastColId = pCol->colId;
@ -2991,6 +2993,13 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
}
}
}
FOREACH(pColNode, pScan->pScanPseudoCols) {
if (nodesEqualNode(pParamNode, pColNode)) {
if (funcType != FUNCTION_TYPE_LAST) {
nodesListMakeAppend(&pLastRowCols, nodesCloneNode(pColNode));
}
}
}
}
}
@ -3129,6 +3138,10 @@ static int32_t splitCacheLastFuncOptCreateAggLogicNode(SAggLogicNode** pNewAgg,
if (TSDB_CODE_SUCCESS != code) {
return code;
}
code = nodesCollectColumnsFromNode((SNode*)list, NULL, COLLECT_COL_TYPE_TAG, &pScan->pScanPseudoCols);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
nodesFree(list);
bool found = false;
FOREACH(pNode, pScan->pScanCols) {
@ -3150,6 +3163,10 @@ static int32_t splitCacheLastFuncOptCreateAggLogicNode(SAggLogicNode** pNewAgg,
if (TSDB_CODE_SUCCESS != code) {
return code;
}
code = createColumnByRewriteExprs(pScan->pScanPseudoCols, &pScan->node.pTargets);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
OPTIMIZE_FLAG_CLEAR_MASK(pScan->node.optimizedFlag, OPTIMIZE_FLAG_SCAN_PATH);
}

View File

@ -309,6 +309,7 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) {
pCKInfo->checkpointId = pCKInfo->checkpointingId;
pCKInfo->checkpointVer = pCKInfo->processedVer;
pCKInfo->checkpointTime = pCKInfo->startTs;
streamTaskClearCheckInfo(p, false);
taosThreadMutexUnlock(&p->lock);

View File

@ -591,19 +591,16 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
}
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
tFreeStreamTask(pTask);
return -1;
}
taosArrayPush(pMeta->pTaskList, &pTask->id);
if (streamMetaSaveTask(pMeta, pTask) < 0) {
tFreeStreamTask(pTask);
return -1;
}
if (streamMetaCommit(pMeta) < 0) {
tFreeStreamTask(pTask);
return -1;
}
@ -960,11 +957,18 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
if (tEncodeDouble(pEncoder, ps->sinkQuota) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->sinkDataSize) < 0) return -1;
if (tEncodeI64(pEncoder, ps->processedVer) < 0) return -1;
if (tEncodeI64(pEncoder, ps->verStart) < 0) return -1;
if (tEncodeI64(pEncoder, ps->verEnd) < 0) return -1;
if (tEncodeI64(pEncoder, ps->checkpointId) < 0) return -1;
if (tEncodeI8(pEncoder, ps->checkpointFailed) < 0) return -1;
if (tEncodeI32(pEncoder, ps->chkpointTransId) < 0) return -1;
if (tEncodeI64(pEncoder, ps->verRange.minVer) < 0) return -1;
if (tEncodeI64(pEncoder, ps->verRange.maxVer) < 0) return -1;
if (tEncodeI64(pEncoder, ps->checkpointInfo.activeId) < 0) return -1;
if (tEncodeI8(pEncoder, ps->checkpointInfo.failed) < 0) return -1;
if (tEncodeI32(pEncoder, ps->checkpointInfo.activeTransId) < 0) return -1;
if (tEncodeI64(pEncoder, ps->checkpointInfo.latestId) < 0) return -1;
if (tEncodeI64(pEncoder, ps->checkpointInfo.latestVer) < 0) return -1;
if (tEncodeI64(pEncoder, ps->checkpointInfo.latestTime) < 0) return -1;
if (tEncodeI64(pEncoder, ps->startTime) < 0) return -1;
if (tEncodeI64(pEncoder, ps->startCheckpointId) < 0) return -1;
if (tEncodeI64(pEncoder, ps->startCheckpointVer) < 0) return -1;
if (tEncodeI64(pEncoder, ps->hTaskId) < 0) return -1;
}
int32_t numOfVgs = taosArrayGetSize(pReq->pUpdateNodes);
@ -999,11 +1003,19 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
if (tDecodeDouble(pDecoder, &entry.sinkQuota) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.sinkDataSize) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.processedVer) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.verStart) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.verEnd) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.checkpointId) < 0) return -1;
if (tDecodeI8(pDecoder, &entry.checkpointFailed) < 0) return -1;
if (tDecodeI32(pDecoder, &entry.chkpointTransId) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.verRange.minVer) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.verRange.maxVer) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.checkpointInfo.activeId) < 0) return -1;
if (tDecodeI8(pDecoder, &entry.checkpointInfo.failed) < 0) return -1;
if (tDecodeI32(pDecoder, &entry.checkpointInfo.activeTransId) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestId) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestVer) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestTime) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.startTime) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.startCheckpointId) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.startCheckpointVer) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.hTaskId) < 0) return -1;
entry.id.taskId = taskId;
taosArrayPush(pReq->pTaskStatus, &entry);
@ -1105,7 +1117,16 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
.status = streamTaskGetStatus(*pTask)->state,
.nodeId = hbMsg.vgId,
.stage = pMeta->stage,
.inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)),
.startTime = (*pTask)->execInfo.start,
.checkpointInfo.latestId = (*pTask)->chkInfo.checkpointId,
.checkpointInfo.latestVer = (*pTask)->chkInfo.checkpointVer,
.checkpointInfo.latestTime = (*pTask)->chkInfo.checkpointTime,
.hTaskId = (*pTask)->hTaskInfo.id.taskId,
.startCheckpointId = (*pTask)->execInfo.startCheckpointId,
.startCheckpointVer = (*pTask)->execInfo.startCheckpointVer,
};
entry.inputRate = entry.inputQUsed * 100.0 / (2 * STREAM_TASK_QUEUE_CAPACITY_IN_SIZE);
@ -1115,11 +1136,11 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
}
if ((*pTask)->chkInfo.checkpointingId != 0) {
entry.checkpointFailed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId) ? 1 : 0;
entry.checkpointId = (*pTask)->chkInfo.checkpointingId;
entry.chkpointTransId = (*pTask)->chkInfo.transId;
entry.checkpointInfo.failed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId) ? 1 : 0;
entry.checkpointInfo.activeId = (*pTask)->chkInfo.checkpointingId;
entry.checkpointInfo.activeTransId = (*pTask)->chkInfo.transId;
if (entry.checkpointFailed) {
if (entry.checkpointInfo.failed) {
stInfo("s-task:%s send kill checkpoint trans info, transId:%d", (*pTask)->id.idStr, (*pTask)->chkInfo.transId);
}
}
@ -1130,7 +1151,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
entry.processedVer = (*pTask)->chkInfo.processedVer;
}
walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verStart, &entry.verEnd);
walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verRange.minVer, &entry.verRange.maxVer);
}
addUpdateNodeIntoHbMsg(*pTask, &hbMsg);

View File

@ -44,7 +44,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId);
static void doProcessDownstreamReadyRsp(SStreamTask* pTask);
int32_t streamTaskSetReady(SStreamTask* pTask) {
int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask);
int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask);
SStreamTaskState* p = streamTaskGetStatus(pTask);
if ((p->state == TASK_STATUS__SCAN_HISTORY) && pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
@ -868,8 +868,10 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t nextProcessVe
} else {
// 2. do secondary scan of the history data, the time window remain, and the version range is updated to
// [pTask->dataRange.range.maxVer, ver1]
pRange->minVer = walScanStartVer;
pRange->maxVer = nextProcessVer - 1;
pTask->step2Range.minVer = walScanStartVer;
pTask->step2Range.maxVer = nextProcessVer - 1;
stDebug("s-task:%s set step2 verRange:%" PRId64 "-%" PRId64 ", step1 verRange:%" PRId64 "-%" PRId64, pTask->id.idStr,
pTask->step2Range.minVer, pTask->step2Range.maxVer, pRange->minVer, pRange->maxVer);
return false;
}
}

View File

@ -849,13 +849,15 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc)
pDst->inputQUsed = pSrc->inputQUsed;
pDst->inputRate = pSrc->inputRate;
pDst->processedVer = pSrc->processedVer;
pDst->verStart = pSrc->verStart;
pDst->verEnd = pSrc->verEnd;
pDst->verRange = pSrc->verRange;
pDst->sinkQuota = pSrc->sinkQuota;
pDst->sinkDataSize = pSrc->sinkDataSize;
pDst->checkpointId = pSrc->checkpointId;
pDst->checkpointFailed = pSrc->checkpointFailed;
pDst->chkpointTransId = pSrc->chkpointTransId;
pDst->checkpointInfo = pSrc->checkpointInfo;
pDst->startCheckpointId = pSrc->startCheckpointId;
pDst->startCheckpointVer = pSrc->startCheckpointVer;
pDst->startTime = pSrc->startTime;
pDst->hTaskId = pSrc->hTaskId;
}
static int32_t taskPauseCallback(SStreamTask* pTask, void* param) {

View File

@ -108,6 +108,10 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_cache_scan.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_cache_scan.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_cache_scan.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tbname.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tbname.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tbname.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tbname.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqShow.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDropStb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb0.py

View File

@ -0,0 +1,189 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
sql alter local "multiResultFunctionStarReturnTags" "0";
print step1=====================
sql drop database if exists test;
sql create database test vgroups 4 CACHEMODEL 'both';
sql use test;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create table t3 using st tags(3,3,3);
sql create table t4 using st tags(NULL,4,4);
sql insert into t1 values(1648791211000,1,1,1);
sql insert into t1 values(1648791211001,2,2,2);
sql insert into t2 values(1648791211002,3,3,3);
sql insert into t2 values(1648791211003,4,4,4);
sql insert into t3 values(1648791211004,5,5,5);
sql insert into t3 values(1648791211005,6,6,6);
sql insert into t4 values(1648791211007,NULL,NULL,NULL);
sql select last(*),last_row(*) from st;
if $cols != 8 then
print ======cols=$cols
return -1
endi
sql alter local "multiResultFunctionStarReturnTags" "1";
sql select last(*),last_row(*) from st;
if $cols != 14 then
print ======cols=$cols
return -1
endi
sql select last(*) from st;
if $cols != 7 then
return -1
endi
sql select last_row(*) from st;
if $cols != 7 then
return -1
endi
sql select last(*),last_row(*) from t1;
if $cols != 8 then
return -1
endi
print step2=====================
sql drop database if exists test1;
sql create database test1 vgroups 4 CACHEMODEL 'last_row';
sql use test1;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create table t3 using st tags(3,3,3);
sql create table t4 using st tags(NULL,4,4);
sql insert into t1 values(1648791211000,1,1,1);
sql insert into t1 values(1648791211001,2,2,2);
sql insert into t2 values(1648791211002,3,3,3);
sql insert into t2 values(1648791211003,4,4,4);
sql insert into t3 values(1648791211004,5,5,5);
sql insert into t3 values(1648791211005,6,6,6);
sql insert into t4 values(1648791211007,NULL,NULL,NULL);
sql select last(*),last_row(*) from st;
if $cols != 14 then
return -1
endi
sql select last(*) from st;
if $cols != 7 then
return -1
endi
return -1
sql select last_row(*) from st;
if $cols != 7 then
return -1
endi
sql select last(*),last_row(*) from t1;
if $cols != 8 then
return -1
endi
print step3=====================
sql drop database if exists test2;
sql create database test2 vgroups 4 CACHEMODEL 'last_value';
sql use test2;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create table t3 using st tags(3,3,3);
sql create table t4 using st tags(NULL,4,4);
sql insert into t1 values(1648791211000,1,1,1);
sql insert into t1 values(1648791211001,2,2,2);
sql insert into t2 values(1648791211002,3,3,3);
sql insert into t2 values(1648791211003,4,4,4);
sql insert into t3 values(1648791211004,5,5,5);
sql insert into t3 values(1648791211005,6,6,6);
sql insert into t4 values(1648791211007,NULL,NULL,NULL);
sql select last(*),last_row(*) from st;
if $cols != 14 then
return -1
endi
sql select last(*) from st;
if $cols != 7 then
return -1
endi
sql select last_row(*) from st;
if $cols != 7 then
return -1
endi
sql select last(*),last_row(*) from t1;
if $cols != 8 then
return -1
endi
sql drop database if exists test4;
sql create database test4 vgroups 4;
sql use test4;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create table t3 using st tags(3,3,3);
sql create table t4 using st tags(NULL,4,4);
sql insert into t1 values(1648791211000,1,1,1);
sql insert into t1 values(1648791211001,2,2,2);
sql insert into t2 values(1648791211002,3,3,3);
sql insert into t2 values(1648791211003,4,4,4);
sql insert into t3 values(1648791211004,5,5,5);
sql insert into t3 values(1648791211005,6,6,6);
sql insert into t4 values(1648791211007,NULL,NULL,NULL);
sql select last(*),last_row(*) from st;
if $cols != 14 then
return -1
endi
sql select last(*) from st;
if $cols != 7 then
return -1
endi
sql select last_row(*) from st;
if $cols != 7 then
return -1
endi
sql select last(*),last_row(*) from t1;
if $cols != 8 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -221,7 +221,7 @@ class TDTestCase:
tdSql.checkEqual(20470,len(tdSql.queryResult))
tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'")
tdSql.checkEqual(True, len(tdSql.queryResult) in range(230, 231))
tdSql.checkEqual(True, len(tdSql.queryResult) in range(226, 241))
tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'")
tdSql.checkEqual(54, len(tdSql.queryResult))

View File

@ -147,11 +147,11 @@ class TDTestCase:
tdSql.execute(f'create stream {stream_name} trigger at_once ignore expired 0 into stb as select * from {self.dbname}.{stbname} partition by tbname')
tdSql.query(f'select * from information_schema.ins_streams where stream_name = "{stream_name}"')
print(tdSql.queryResult)
tdSql.checkEqual(tdSql.queryResult[0][2],f'create stream {stream_name} trigger at_once ignore expired 0 into stb as select * from {self.dbname}.{stbname} partition by tbname')
tdSql.checkEqual(tdSql.queryResult[0][4],f'create stream {stream_name} trigger at_once ignore expired 0 into stb as select * from {self.dbname}.{stbname} partition by tbname')
tdSql.execute(f'drop stream {stream_name}')
tdSql.execute(f'create stream {stream_name} trigger at_once ignore expired 0 into stb1 as select * from tb')
tdSql.query(f'select * from information_schema.ins_streams where stream_name = "{stream_name}"')
tdSql.checkEqual(tdSql.queryResult[0][2],f'create stream {stream_name} trigger at_once ignore expired 0 into stb1 as select * from tb')
tdSql.checkEqual(tdSql.queryResult[0][4],f'create stream {stream_name} trigger at_once ignore expired 0 into stb1 as select * from tb')
tdSql.execute(f'drop database {self.dbname}')
def run(self):
self.drop_ntb_check()

View File

@ -0,0 +1,194 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import taos
from util.log import *
from util.cases import *
from util.sql import *
class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
def td29092(self, dbname="db"):
tdSql.execute(f'use {dbname}')
tdSql.execute('CREATE STABLE `st` (`ts` TIMESTAMP, `v1` INT) TAGS (`t1` INT);')
tdSql.execute('CREATE STABLE `st2` (`ts` TIMESTAMP, `v1` INT) TAGS (`t1` INT);')
tdSql.execute('CREATE TABLE `t1` USING `st` (`t1`) TAGS (1);')
tdSql.execute('CREATE TABLE `t2` USING `st` (`t1`) TAGS (2);')
tdSql.execute('CREATE TABLE `t21` USING `st2` (`t1`) TAGS (21);')
tdSql.execute('CREATE TABLE `nt` (`ts` TIMESTAMP, `v1` INT);')
now_time = int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000)
for i in range(3):
tdSql.execute(
f"insert into {dbname}.t1 values ( { now_time + i * 1000 }, {i} )"
)
tdSql.execute(
f"insert into {dbname}.t2 values ( { now_time + i * 1000 }, {i} )"
)
tdSql.execute(
f"insert into {dbname}.nt values ( { now_time + i * 1000 }, {i} )"
)
tdLog.debug(f"-------------- step1: normal table test ------------------")
tdSql.query("select tbname, count(*) from nt;")
tdSql.checkRows(1)
tdSql.checkData(0, 0, "nt")
tdSql.checkData(0, 1, 3)
tdSql.query("select nt.tbname, count(*) from nt;")
tdSql.checkRows(1)
tdSql.checkData(0, 0, "nt")
tdSql.checkData(0, 1, 3)
tdSql.query("select tbname, count(*) from nt group by tbname")
tdSql.checkRows(1)
tdSql.checkData(0, 0, "nt")
tdSql.checkData(0, 1, 3)
tdSql.query("select nt.tbname, count(*) from nt group by tbname")
tdSql.checkRows(1)
tdSql.checkData(0, 0, "nt")
tdSql.checkData(0, 1, 3)
tdSql.query("select nt.tbname, count(*) from nt group by nt.tbname")
tdSql.checkRows(1)
tdSql.checkData(0, 0, "nt")
tdSql.checkData(0, 1, 3)
tdLog.debug(f"-------------- step2: system table test ------------------")
tdSql.query("select tbname, count(*) from information_schema.ins_dnodes")
tdSql.checkRows(1)
tdSql.checkData(0, 1, 1)
tdSql.query("select ins_dnodes.tbname, count(*) from information_schema.ins_dnodes")
tdSql.checkRows(1)
tdSql.checkData(0, 1, 1)
tdSql.query("select tbname, count(*) from information_schema.ins_dnodes group by tbname")
tdSql.checkRows(1)
tdSql.checkData(0, 1, 1)
tdSql.query("select ins_dnodes.tbname, count(*) from information_schema.ins_dnodes group by tbname")
tdSql.checkRows(1)
tdSql.checkData(0, 1, 1)
tdSql.query("select ins_dnodes.tbname, count(*) from information_schema.ins_dnodes group by ins_dnodes.tbname")
tdSql.checkRows(1)
tdSql.checkData(0, 1, 1)
tdLog.debug(f"-------------- step3: subtable test ------------------")
tdSql.query("select tbname, count(*) from t1")
tdSql.checkRows(1)
tdSql.checkData(0, 1, 3)
tdSql.query("select t1.tbname, count(*) from t1")
tdSql.checkRows(1)
tdSql.checkData(0, 1, 3)
tdSql.query("select tbname, count(*) from t1 group by tbname")
tdSql.checkRows(1)
tdSql.checkData(0, 1, 3)
tdSql.query("select t1.tbname, count(*) from t1 group by tbname")
tdSql.checkRows(1)
tdSql.checkData(0, 1, 3)
tdSql.query("select t1.tbname, count(*) from t1 group by t1.tbname")
tdSql.checkRows(1)
tdSql.checkData(0, 1, 3)
tdSql.error("select t1.tbname, count(*) from t2 group by t1.tbname")
tdSql.error("select t1.tbname, count(*) from t1 group by t2.tbname")
tdSql.error("select t2.tbname, count(*) from t1 group by t1.tbname")
tdLog.debug(f"-------------- step4: super table test ------------------")
tdSql.query("select tbname, count(*) from st group by tbname")
tdSql.checkRows(2)
tdSql.checkData(0, 1, 3)
tdSql.checkData(1, 1, 3)
tdSql.query("select tbname, count(*) from st partition by tbname")
tdSql.checkRows(2)
tdSql.checkData(0, 1, 3)
tdSql.checkData(1, 1, 3)
tdSql.query("select ts, t1 from st where st.tbname=\"t1\"")
tdSql.checkRows(3)
tdSql.checkData(0, 1, 1)
tdSql.checkData(1, 1, 1)
tdSql.checkData(2, 1, 1)
tdSql.query("select tbname, ts from st where tbname=\"t2\"")
tdSql.checkRows(3)
tdSql.query("select tbname, ts from st where tbname=\"t2\" order by tbname")
tdSql.checkRows(3)
tdSql.query("select tbname, ts from st where tbname=\"t2\" order by st.tbname")
tdSql.checkRows(3)
tdSql.query("select tbname, count(*) from st where tbname=\"t2\" group by tbname order by tbname")
tdSql.checkRows(1)
tdSql.checkData(0, 1, 3)
tdSql.query("select tbname, count(*) from st group by tbname order by tbname")
tdSql.checkRows(2)
tdSql.checkData(0, 1, 3)
tdSql.checkData(1, 1, 3)
tdSql.query("select tbname, count(*) from st group by st.tbname order by st.tbname")
tdSql.checkRows(2)
tdSql.checkData(0, 1, 3)
tdSql.checkData(1, 1, 3)
tdLog.debug(f"-------------- step4: join test ------------------")
tdSql.query("select t1.tbname, t2.tbname from t1, t2 where t1.ts=t2.ts and t1.tbname!=t2.tbname")
tdSql.checkRows(3)
tdSql.query("select t1.tbname, t2.tbname from t1, t2 where t1.ts=t2.ts and t1.tbname!=t2.tbname order by t1.tbname")
tdSql.checkRows(3)
tdSql.query("select st.tbname, st2.tbname from st, st2 where st.ts=st2.ts and st.tbname!=st2.tbname order by st.tbname")
tdSql.checkRows(0)
tdSql.execute(f"insert into t21 values ( { now_time + 1000 }, 1 )")
tdSql.query("select st.tbname, st2.tbname from st, st2 where st.ts=st2.ts and st.tbname!=st2.tbname order by st.tbname")
tdSql.checkRows(2)
tdSql.query("select t1.tbname, st2.tbname from t1, st2 where t1.ts=st2.ts and t1.tbname!=st2.tbname order by t1.tbname")
tdSql.checkRows(1)
tdSql.query("select nt.ts, st.tbname from nt, st where nt.ts=st.ts order by st.tbname")
tdSql.checkRows(6)
tdSql.query("select nt.ts, t1.tbname from nt, t1 where nt.ts=t1.ts order by t1.tbname")
tdSql.checkRows(3)
def run(self):
tdSql.prepare()
self.td29092()
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())