Merge branch '3.0' into feature/TD-18581-D

This commit is contained in:
Cary Xu 2022-09-20 11:32:06 +08:00
commit e4dd4cc313
5 changed files with 26 additions and 18 deletions

View File

@ -66,7 +66,7 @@ order_expr:
A query can be performed on some or all columns. Data and tag columns can all be included in the SELECT list.
## Wildcards
### Wildcards
You can use an asterisk (\*) as a wildcard character to indicate all columns. For standard tables, the asterisk indicates only data columns. For supertables and subtables, tag columns are also included.
@ -136,6 +136,8 @@ taos> SELECT ts, ts AS primary_key_ts FROM d1001;
### Pseudocolumns
**Pseudocolumn:** A pseudo-column behaves like a table column but is not actually stored in the table. You can select from pseudo-columns, but you cannot insert, update, or delete their values. A pseudo-column is also similar to a function without arguments. This section describes these pseudo-columns:
**TBNAME**
The TBNAME pseudocolumn in a supertable contains the names of subtables within the supertable.

View File

@ -137,6 +137,8 @@ taos> SELECT ts, ts AS primary_key_ts FROM d1001;
### 伪列
**伪列**: 伪列的行为表现与普通数据列相似但其并不实际存储在表中。可以查询伪列,但不能对其做插入、更新和删除的操作。伪列有点像没有参数的函数。下面介绍是可用的伪列:
**TBNAME**
`TBNAME` 可以视为超级表中一个特殊的标签,代表子表的表名。

View File

@ -1064,6 +1064,7 @@ bool functionNeedToExecute(SqlFunctionCtx* pCtx);
bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup);
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup);
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SOperatorInfo* pOperator, STimeWindowAggSupp* pTwSup);
void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, uint64_t* pGp);
void printDataBlock(SSDataBlock* pBlock, const char* flag);
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId);

View File

@ -1331,7 +1331,7 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
// must check update info first.
bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]);
bool closedWin = isClosed && isSignleIntervalWindow(pInfo) &&
isDeletedWindow(&win, pBlock->info.groupId, pInfo->windowSup.pIntervalAggSup);
isDeletedStreamWindow(&win, pBlock->info.groupId, pInfo->pTableScanOp, &pInfo->twAggSup);
if ((update || closedWin) && out) {
qDebug("stream update check not pass, update %d, closedWin %d", update, closedWin);
uint64_t gpId = closedWin && pInfo->partitionSup.needCalc
@ -1931,11 +1931,6 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->pTagCond = pTagCond;
pInfo->pGroupTags = pTableScanNode->pGroupTags;
pInfo->twAggSup = (STimeWindowAggSupp){
.waterMark = pTableScanNode->watermark,
.calTrigger = pTableScanNode->triggerType,
.maxTs = INT64_MIN,
};
int32_t numOfCols = 0;
pInfo->pColMatchInfo = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
@ -1985,7 +1980,6 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->pUpdateInfo = NULL;
pInfo->pTableScanOp = pTableScanOp;
pInfo->interval = pTSInfo->pdInfo.interval;
pInfo->readHandle = *pHandle;
pInfo->tableUid = pScanPhyNode->uid;

View File

@ -1753,16 +1753,17 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt
}
void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SAggSupporter* pSup, SInterval* pInterval,
int64_t waterMark) {
STimeWindowAggSupp* pTwSup) {
if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
initIntervalDownStream(downstream->pDownstream[0], type, pSup, pInterval, waterMark);
initIntervalDownStream(downstream->pDownstream[0], type, pSup, pInterval, pTwSup);
return;
}
SStreamScanInfo* pScanInfo = downstream->info;
pScanInfo->windowSup.parentType = type;
pScanInfo->windowSup.pIntervalAggSup = pSup;
pScanInfo->pUpdateInfo = updateInfoInitP(pInterval, waterMark);
pScanInfo->pUpdateInfo = updateInfoInitP(pInterval, pTwSup->waterMark);
pScanInfo->interval = *pInterval;
pScanInfo->twAggSup = *pTwSup;
}
void initStreamFunciton(SqlFunctionCtx* pCtx, int32_t numOfExpr) {
@ -1847,11 +1848,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, NULL,
destroyIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
if (nodeType(pPhyNode) == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL) {
initIntervalDownStream(downstream, QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, &pInfo->aggSup, &pInfo->interval,
pInfo->twAggSup.waterMark);
}
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
@ -2868,6 +2864,19 @@ bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) {
return p1 == NULL;
}
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SOperatorInfo* pOperator, STimeWindowAggSupp* pTwSup) {
if (pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark) {
SWinKey key = {.ts = pWin->skey, .groupId = groupId};
void* pVal = NULL;
int32_t size = 0;
if (streamStateGet(pOperator->pTaskInfo->streamInfo.pState, &key, &pVal, &size) < 0) {
return false;
}
streamStateReleaseBuf(pOperator->pTaskInfo->streamInfo.pState, &key, pVal);
}
return false;
}
int32_t getNexWindowPos(SInterval* pInterval, SDataBlockInfo* pBlockInfo, TSKEY* tsCols, int32_t startPos, TSKEY eKey,
STimeWindow* pNextWin) {
int32_t forwardRows =
@ -3425,7 +3434,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL, destroyStreamFinalIntervalOperatorInfo,
aggEncodeResultRow, aggDecodeResultRow, NULL);
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, pInfo->twAggSup.waterMark);
initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup);
}
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
@ -5944,7 +5953,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
createOperatorFpSet(operatorDummyOpenFn, doStreamIntervalAgg, NULL, NULL, destroyStreamIntervalOperatorInfo,
aggEncodeResultRow, aggDecodeResultRow, NULL);
initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, pInfo->twAggSup.waterMark);
initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;