Merge branch '3.0' into enh/triggerCheckPoint2
This commit is contained in:
commit
60d1df0e25
|
@ -9,7 +9,7 @@ description: This document describes how to query data in TDengine.
|
|||
```sql
|
||||
SELECT {DATABASE() | CLIENT_VERSION() | SERVER_VERSION() | SERVER_STATUS() | NOW() | TODAY() | TIMEZONE()}
|
||||
|
||||
SELECT [hints] [DISTINCT] select_list
|
||||
SELECT [hints] [DISTINCT] [TAGS] select_list
|
||||
from_clause
|
||||
[WHERE condition]
|
||||
[partition_by_clause]
|
||||
|
@ -225,6 +225,14 @@ The \_IROWTS pseudocolumn can only be used with INTERP function. This pseudocolu
|
|||
select _irowts, interp(current) from meters range('2020-01-01 10:00:00', '2020-01-01 10:30:00') every(1s) fill(linear);
|
||||
```
|
||||
|
||||
### TAGS Query
|
||||
|
||||
The TAGS keyword returns only tag columns from all child tables when only tag columns are specified. One row containing tag columns is returned for each child table.
|
||||
|
||||
```sql
|
||||
SELECT TAGS tag_name [, tag_name ...] FROM stb_name
|
||||
```
|
||||
|
||||
## Query Objects
|
||||
|
||||
`FROM` can be followed by a number of tables or super tables, or can be followed by a sub-query.
|
||||
|
|
|
@ -9,7 +9,7 @@ description: 查询数据的详细语法
|
|||
```sql
|
||||
SELECT {DATABASE() | CLIENT_VERSION() | SERVER_VERSION() | SERVER_STATUS() | NOW() | TODAY() | TIMEZONE()}
|
||||
|
||||
SELECT [hints] [DISTINCT] select_list
|
||||
SELECT [hints] [DISTINCT] [TAGS] select_list
|
||||
from_clause
|
||||
[WHERE condition]
|
||||
[partition_by_clause]
|
||||
|
@ -160,6 +160,16 @@ SELECT DISTINCT col_name [, col_name ...] FROM tb_name;
|
|||
|
||||
:::
|
||||
|
||||
### 标签查询
|
||||
|
||||
当查询的列只有标签列时,`TAGS` 关键字可以指定返回所有子表的标签列。每个子表只返回一行标签列。
|
||||
|
||||
返回所有子表的标签列:
|
||||
|
||||
```sql
|
||||
SELECT TAGS tag_name [, tag_name ...] FROM stb_name
|
||||
```
|
||||
|
||||
### 结果集列名
|
||||
|
||||
`SELECT`子句中,如果不指定返回结果集合的列名,结果集列名称默认使用`SELECT`子句中的表达式名称作为列名称。此外,用户可使用`AS`来重命名返回结果集合中列的名称。例如:
|
||||
|
|
|
@ -356,9 +356,6 @@
|
|||
#define TK_WAL 338
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
#define TK_NK_SPACE 600
|
||||
#define TK_NK_COMMENT 601
|
||||
#define TK_NK_ILLEGAL 602
|
||||
|
|
|
@ -189,8 +189,8 @@ int32_t streamInit();
|
|||
void streamCleanUp();
|
||||
|
||||
SStreamQueue* streamQueueOpen(int64_t cap);
|
||||
void streamQueueClose(SStreamQueue* pQueue);
|
||||
void streamQueueCleanup(SStreamQueue* pQueue);
|
||||
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId);
|
||||
|
||||
static FORCE_INLINE void streamQueueProcessSuccess(SStreamQueue* queue) {
|
||||
ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING);
|
||||
|
|
|
@ -296,27 +296,33 @@ int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
|
|||
}
|
||||
|
||||
int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, const char* id) {
|
||||
int32_t code = walNextValidMsg(pReader);
|
||||
int32_t code = 0;
|
||||
|
||||
while(1) {
|
||||
code = walNextValidMsg(pReader);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
int64_t ver = pReader->pHead->head.version;
|
||||
SWalCont* pCont = &pReader->pHead->head;
|
||||
int64_t ver = pCont->version;
|
||||
if (ver > maxVer) {
|
||||
tqDebug("maxVer in WAL:%" PRId64 " reached current:%" PRId64 ", do not scan wal anymore, %s", maxVer, ver, id);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
|
||||
void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SSubmitReq2Msg));
|
||||
int32_t len = pReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
|
||||
if (pCont->msgType == TDMT_VND_SUBMIT) {
|
||||
void* pBody = POINTER_SHIFT(pCont->body, sizeof(SSubmitReq2Msg));
|
||||
int32_t len = pCont->bodyLen - sizeof(SSubmitReq2Msg);
|
||||
|
||||
void* data = taosMemoryMalloc(len);
|
||||
if (data == NULL) {
|
||||
// todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
terrno = code;
|
||||
|
||||
tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0);
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
memcpy(data, pBody, len);
|
||||
|
@ -324,25 +330,36 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
|
|||
|
||||
*pItem = (SStreamQueueItem*)streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT);
|
||||
if (*pItem == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
terrno = code;
|
||||
tqError("%s failed to create data submit for stream since out of memory", id);
|
||||
return terrno;
|
||||
return code;
|
||||
}
|
||||
} else if (pReader->pHead->head.msgType == TDMT_VND_DELETE) {
|
||||
void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SMsgHead));
|
||||
int32_t len = pReader->pHead->head.bodyLen - sizeof(SMsgHead);
|
||||
} else if (pCont->msgType == TDMT_VND_DELETE) {
|
||||
void* pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
|
||||
int32_t len = pCont->bodyLen - sizeof(SMsgHead);
|
||||
|
||||
code = extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code));
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
if (*pItem == NULL) {
|
||||
tqDebug("s-task:%s empty delete msg, discard it, len:%d, ver:%" PRId64, id, len, ver);
|
||||
// we need to continue check next data in the wal files.
|
||||
continue;
|
||||
} else {
|
||||
tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%" PRId64, id, len, ver);
|
||||
}
|
||||
} else {
|
||||
terrno = code;
|
||||
tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
return 0;
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
// todo ignore the error in wal?
|
||||
|
|
|
@ -16,23 +16,12 @@
|
|||
#include "tsdb.h"
|
||||
#include "vndCos.h"
|
||||
|
||||
// =============== PAGE-WISE FILE ===============
|
||||
int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD) {
|
||||
static int32_t tsdbOpenFileImpl(STsdbFD *pFD) {
|
||||
int32_t code = 0;
|
||||
STsdbFD *pFD = NULL;
|
||||
const char *path = pFD->path;
|
||||
int32_t szPage = pFD->szPage;
|
||||
int32_t flag = pFD->flag;
|
||||
|
||||
*ppFD = NULL;
|
||||
|
||||
pFD = (STsdbFD *)taosMemoryCalloc(1, sizeof(*pFD) + strlen(path) + 1);
|
||||
if (pFD == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
pFD->path = (char *)&pFD[1];
|
||||
strcpy(pFD->path, path);
|
||||
pFD->szPage = szPage;
|
||||
pFD->flag = flag;
|
||||
pFD->pFD = taosOpenFile(path, flag);
|
||||
if (pFD->pFD == NULL) {
|
||||
int errsv = errno;
|
||||
|
@ -55,8 +44,7 @@ int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **p
|
|||
goto _exit;
|
||||
}
|
||||
}
|
||||
pFD->szPage = szPage;
|
||||
pFD->pgno = 0;
|
||||
|
||||
pFD->pBuf = taosMemoryCalloc(1, szPage);
|
||||
if (pFD->pBuf == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -79,6 +67,30 @@ int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **p
|
|||
pFD->szFile = pFD->szFile / szPage;
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
// =============== PAGE-WISE FILE ===============
|
||||
int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD) {
|
||||
int32_t code = 0;
|
||||
STsdbFD *pFD = NULL;
|
||||
|
||||
*ppFD = NULL;
|
||||
|
||||
pFD = (STsdbFD *)taosMemoryCalloc(1, sizeof(*pFD) + strlen(path) + 1);
|
||||
if (pFD == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
pFD->path = (char *)&pFD[1];
|
||||
strcpy(pFD->path, path);
|
||||
pFD->szPage = szPage;
|
||||
pFD->flag = flag;
|
||||
pFD->szPage = szPage;
|
||||
pFD->pgno = 0;
|
||||
|
||||
*ppFD = pFD;
|
||||
|
||||
_exit:
|
||||
|
@ -98,6 +110,13 @@ void tsdbCloseFile(STsdbFD **ppFD) {
|
|||
static int32_t tsdbWriteFilePage(STsdbFD *pFD) {
|
||||
int32_t code = 0;
|
||||
|
||||
if (!pFD->pFD) {
|
||||
code = tsdbOpenFileImpl(pFD);
|
||||
if (code) {
|
||||
goto _exit;
|
||||
}
|
||||
}
|
||||
|
||||
if (pFD->pgno > 0) {
|
||||
int64_t n = taosLSeekFile(pFD->pFD, PAGE_OFFSET(pFD->pgno, pFD->szPage), SEEK_SET);
|
||||
if (n < 0) {
|
||||
|
@ -127,6 +146,12 @@ static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno) {
|
|||
int32_t code = 0;
|
||||
|
||||
// ASSERT(pgno <= pFD->szFile);
|
||||
if (!pFD->pFD) {
|
||||
code = tsdbOpenFileImpl(pFD);
|
||||
if (code) {
|
||||
goto _exit;
|
||||
}
|
||||
}
|
||||
|
||||
// seek
|
||||
int64_t offset = PAGE_OFFSET(pgno, pFD->szPage);
|
||||
|
|
|
@ -1009,7 +1009,7 @@ join_type(A) ::= INNER.
|
|||
|
||||
/************************************************ query_specification *************************************************/
|
||||
query_specification(A) ::=
|
||||
SELECT hint_list(M) tag_mode_opt(N) set_quantifier_opt(B) select_list(C) from_clause_opt(D)
|
||||
SELECT hint_list(M) set_quantifier_opt(B) tag_mode_opt(N) select_list(C) from_clause_opt(D)
|
||||
where_clause_opt(E) partition_by_clause_opt(F) range_opt(J) every_opt(K)
|
||||
fill_opt(L) twindow_clause_opt(G) group_by_clause_opt(H) having_clause_opt(I). {
|
||||
A = createSelectStmt(pCxt, B, C, D, M);
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -979,6 +979,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
|||
}
|
||||
|
||||
streamFreeQitem(pTask->msgInfo.pData);
|
||||
pTask->msgInfo.pData = NULL;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -48,12 +48,17 @@ void streamQueueClose(SStreamQueue* pQueue) {
|
|||
taosMemoryFree(pQueue);
|
||||
}
|
||||
|
||||
void streamQueueCleanup(SStreamQueue* pQueue) {
|
||||
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) {
|
||||
qDebug("s-task:0x%x free the queue:%p, items in queue:%d", taskId, pQueue->queue, taosQueueItemSize(pQueue->queue));
|
||||
void* qItem = NULL;
|
||||
while ((qItem = streamQueueNextItem(pQueue)) != NULL) {
|
||||
streamFreeQitem(qItem);
|
||||
}
|
||||
|
||||
pQueue->status = STREAM_QUEUE__SUCESS;
|
||||
taosFreeQall(pQueue->qall);
|
||||
taosCloseQueue(pQueue->queue);
|
||||
taosMemoryFree(pQueue);
|
||||
}
|
||||
|
||||
#if 0
|
||||
|
|
|
@ -283,11 +283,11 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
|||
|
||||
int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus));
|
||||
if (pTask->inputQueue) {
|
||||
streamQueueClose(pTask->inputQueue);
|
||||
streamQueueClose(pTask->inputQueue, pTask->id.taskId);
|
||||
}
|
||||
|
||||
if (pTask->outputInfo.queue) {
|
||||
streamQueueClose(pTask->outputInfo.queue);
|
||||
streamQueueClose(pTask->outputInfo.queue, pTask->id.taskId);
|
||||
}
|
||||
|
||||
if (pTask->exec.qmsg) {
|
||||
|
@ -319,6 +319,10 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
|||
|
||||
pTask->pReadyMsgList = taosArrayDestroy(pTask->pReadyMsgList);
|
||||
taosThreadMutexDestroy(&pTask->lock);
|
||||
if (pTask->msgInfo.pData != NULL) {
|
||||
destroyStreamDataBlock(pTask->msgInfo.pData);
|
||||
pTask->msgInfo.pData = NULL;
|
||||
}
|
||||
|
||||
if (pTask->id.idStr != NULL) {
|
||||
taosMemoryFree((void*)pTask->id.idStr);
|
||||
|
|
|
@ -384,7 +384,7 @@ static void taosGetLogFileName(char *fn) {
|
|||
}
|
||||
|
||||
static int32_t taosOpenLogFile(char *fn, int32_t maxLines, int32_t maxFileNum) {
|
||||
#ifdef WINDOWS
|
||||
#ifdef WINDOWS_STASH
|
||||
/*
|
||||
* always set maxFileNum to 1
|
||||
* means client log filename is unique in windows
|
||||
|
|
Loading…
Reference in New Issue