Merge remote-tracking branch 'origin/enh/triggerCheckPoint2' into enh/triggerCheckPoint2

This commit is contained in:
Haojun Liao 2023-08-25 17:32:50 +08:00
commit 43a3f9f535
12 changed files with 1468 additions and 214 deletions

View File

@@ -9,7 +9,7 @@ description: This document describes how to query data in TDengine.
```sql
SELECT {DATABASE() | CLIENT_VERSION() | SERVER_VERSION() | SERVER_STATUS() | NOW() | TODAY() | TIMEZONE()}
SELECT [hints] [DISTINCT] select_list
SELECT [hints] [DISTINCT] [TAGS] select_list
from_clause
[WHERE condition]
[partition_by_clause]
@@ -225,6 +225,14 @@ The \_IROWTS pseudocolumn can only be used with INTERP function. This pseudocolu
select _irowts, interp(current) from meters range('2020-01-01 10:00:00', '2020-01-01 10:30:00') every(1s) fill(linear);
```
### TAGS Query
When the select list contains only tag columns, the TAGS keyword returns the tag columns of all child tables of the supertable. One row of tag values is returned for each child table.
```sql
SELECT TAGS tag_name [, tag_name ...] FROM stb_name
```
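For instance, a minimal sketch against the `meters` supertable used elsewhere on this page, assuming it defines tags named `location` and `groupid` (the tag names are illustrative, not taken from the schema shown here):
```sql
SELECT TAGS location, groupid FROM meters;
```
One row of `location`/`groupid` values would be returned per child table of `meters`.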
## Query Objects
`FROM` can be followed by one or more tables or supertables, or by a subquery.
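A brief sketch of a subquery as the query object, assuming the `meters` supertable and its `current` column:
```sql
SELECT AVG(c) FROM (SELECT current AS c FROM meters);
```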

View File

@@ -9,7 +9,7 @@ description: Detailed syntax for querying data
```sql
SELECT {DATABASE() | CLIENT_VERSION() | SERVER_VERSION() | SERVER_STATUS() | NOW() | TODAY() | TIMEZONE()}
SELECT [hints] [DISTINCT] select_list
SELECT [hints] [DISTINCT] [TAGS] select_list
from_clause
[WHERE condition]
[partition_by_clause]
@@ -160,6 +160,16 @@ SELECT DISTINCT col_name [, col_name ...] FROM tb_name;
:::
### Tag Query
When the queried columns are only tag columns, the `TAGS` keyword specifies that the tag columns of all child tables be returned. Only one row of tag values is returned for each child table.
Return the tag columns of all child tables:
```sql
SELECT TAGS tag_name [, tag_name ...] FROM stb_name
```
### Result Set Column Names
In the `SELECT` clause, if the column names of the result set are not specified, the result set uses the expression names in the `SELECT` clause as the column names by default. In addition, users can use `AS` to rename the columns in the result set. For example:
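A minimal sketch, assuming the `meters` table with `current` and `voltage` columns (the aliases are illustrative):
```sql
SELECT current AS load_current, voltage AS line_voltage FROM meters;
```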

View File

@@ -356,9 +356,6 @@
#define TK_WAL 338
#define TK_NK_SPACE 600
#define TK_NK_COMMENT 601
#define TK_NK_ILLEGAL 602

View File

@@ -189,8 +189,8 @@ int32_t streamInit();
void streamCleanUp();
SStreamQueue* streamQueueOpen(int64_t cap);
void streamQueueClose(SStreamQueue* pQueue);
void streamQueueCleanup(SStreamQueue* pQueue);
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId);
static FORCE_INLINE void streamQueueProcessSuccess(SStreamQueue* queue) {
ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING);

View File

@@ -296,53 +296,70 @@ int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
}
int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, const char* id) {
int32_t code = walNextValidMsg(pReader);
if (code != TSDB_CODE_SUCCESS) {
int32_t code = 0;
while(1) {
code = walNextValidMsg(pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
SWalCont* pCont = &pReader->pHead->head;
int64_t ver = pCont->version;
if (ver > maxVer) {
tqDebug("maxVer in WAL:%" PRId64 " reached current:%" PRId64 ", do not scan wal anymore, %s", maxVer, ver, id);
return TSDB_CODE_SUCCESS;
}
if (pCont->msgType == TDMT_VND_SUBMIT) {
void* pBody = POINTER_SHIFT(pCont->body, sizeof(SSubmitReq2Msg));
int32_t len = pCont->bodyLen - sizeof(SSubmitReq2Msg);
void* data = taosMemoryMalloc(len);
if (data == NULL) {
// todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry
code = TSDB_CODE_OUT_OF_MEMORY;
terrno = code;
tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0);
return code;
}
memcpy(data, pBody, len);
SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data};
*pItem = (SStreamQueueItem*)streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT);
if (*pItem == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
terrno = code;
tqError("%s failed to create data submit for stream since out of memory", id);
return code;
}
} else if (pCont->msgType == TDMT_VND_DELETE) {
void* pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
int32_t len = pCont->bodyLen - sizeof(SMsgHead);
code = extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem);
if (code == TSDB_CODE_SUCCESS) {
if (*pItem == NULL) {
tqDebug("s-task:%s empty delete msg, discard it, len:%d, ver:%" PRId64, id, len, ver);
// we need to continue checking the next record in the WAL files.
continue;
} else {
tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%" PRId64, id, len, ver);
}
} else {
terrno = code;
tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code));
return code;
}
} else {
ASSERT(0);
}
return code;
}
int64_t ver = pReader->pHead->head.version;
if (ver > maxVer) {
tqDebug("maxVer in WAL:%"PRId64" reached current:%"PRId64", do not scan wal anymore, %s", maxVer, ver, id);
return TSDB_CODE_SUCCESS;
}
if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SSubmitReq2Msg));
int32_t len = pReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
void* data = taosMemoryMalloc(len);
if (data == NULL) {
// todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0);
return -1;
}
memcpy(data, pBody, len);
SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data};
*pItem = (SStreamQueueItem*)streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT);
if (*pItem == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("%s failed to create data submit for stream since out of memory", id);
return terrno;
}
} else if (pReader->pHead->head.msgType == TDMT_VND_DELETE) {
void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SMsgHead));
int32_t len = pReader->pHead->head.bodyLen - sizeof(SMsgHead);
code = extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem);
if (code != TSDB_CODE_SUCCESS) {
tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code));
} else {
tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%"PRId64, id, len, ver);
}
} else {
ASSERT(0);
}
return 0;
}
// todo ignore the error in wal?

View File

@@ -16,23 +16,12 @@
#include "tsdb.h"
#include "vndCos.h"
// =============== PAGE-WISE FILE ===============
int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD) {
int32_t code = 0;
STsdbFD *pFD = NULL;
static int32_t tsdbOpenFileImpl(STsdbFD *pFD) {
int32_t code = 0;
const char *path = pFD->path;
int32_t szPage = pFD->szPage;
int32_t flag = pFD->flag;
*ppFD = NULL;
pFD = (STsdbFD *)taosMemoryCalloc(1, sizeof(*pFD) + strlen(path) + 1);
if (pFD == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
pFD->path = (char *)&pFD[1];
strcpy(pFD->path, path);
pFD->szPage = szPage;
pFD->flag = flag;
pFD->pFD = taosOpenFile(path, flag);
if (pFD->pFD == NULL) {
int errsv = errno;
@@ -55,8 +44,7 @@ int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **p
goto _exit;
}
}
pFD->szPage = szPage;
pFD->pgno = 0;
pFD->pBuf = taosMemoryCalloc(1, szPage);
if (pFD->pBuf == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
@@ -79,6 +67,30 @@ int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **p
pFD->szFile = pFD->szFile / szPage;
}
_exit:
return code;
}
// =============== PAGE-WISE FILE ===============
int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD) {
int32_t code = 0;
STsdbFD *pFD = NULL;
*ppFD = NULL;
pFD = (STsdbFD *)taosMemoryCalloc(1, sizeof(*pFD) + strlen(path) + 1);
if (pFD == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
pFD->path = (char *)&pFD[1];
strcpy(pFD->path, path);
pFD->szPage = szPage;
pFD->flag = flag;
pFD->szPage = szPage;
pFD->pgno = 0;
*ppFD = pFD;
_exit:
@@ -98,6 +110,13 @@ void tsdbCloseFile(STsdbFD **ppFD) {
static int32_t tsdbWriteFilePage(STsdbFD *pFD) {
int32_t code = 0;
if (!pFD->pFD) {
code = tsdbOpenFileImpl(pFD);
if (code) {
goto _exit;
}
}
if (pFD->pgno > 0) {
int64_t n = taosLSeekFile(pFD->pFD, PAGE_OFFSET(pFD->pgno, pFD->szPage), SEEK_SET);
if (n < 0) {
@@ -127,6 +146,12 @@ static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno) {
int32_t code = 0;
// ASSERT(pgno <= pFD->szFile);
if (!pFD->pFD) {
code = tsdbOpenFileImpl(pFD);
if (code) {
goto _exit;
}
}
// seek
int64_t offset = PAGE_OFFSET(pgno, pFD->szPage);

View File

@@ -1009,7 +1009,7 @@ join_type(A) ::= INNER.
/************************************************ query_specification *************************************************/
query_specification(A) ::=
SELECT hint_list(M) tag_mode_opt(N) set_quantifier_opt(B) select_list(C) from_clause_opt(D)
SELECT hint_list(M) set_quantifier_opt(B) tag_mode_opt(N) select_list(C) from_clause_opt(D)
where_clause_opt(E) partition_by_clause_opt(F) range_opt(J) every_opt(K)
fill_opt(L) twindow_clause_opt(G) group_by_clause_opt(H) having_clause_opt(I). {
A = createSelectStmt(pCxt, B, C, D, M);

File diff suppressed because it is too large

View File

@@ -979,6 +979,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
}
streamFreeQitem(pTask->msgInfo.pData);
pTask->msgInfo.pData = NULL;
return TSDB_CODE_SUCCESS;
}

View File

@@ -40,20 +40,25 @@ SStreamQueue* streamQueueOpen(int64_t cap) {
return pQueue;
}
void streamQueueClose(SStreamQueue* pQueue) {
streamQueueCleanup(pQueue);
// void streamQueueClose(SStreamQueue* pQueue) {
// streamQueueCleanup(pQueue);
taosFreeQall(pQueue->qall);
taosCloseQueue(pQueue->queue);
taosMemoryFree(pQueue);
}
// taosFreeQall(pQueue->qall);
// taosCloseQueue(pQueue->queue);
// taosMemoryFree(pQueue);
// }
void streamQueueCleanup(SStreamQueue* pQueue) {
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) {
qDebug("s-task:0x%x free the queue:%p, items in queue:%d", taskId, pQueue->queue, taosQueueItemSize(pQueue->queue));
void* qItem = NULL;
while ((qItem = streamQueueNextItem(pQueue)) != NULL) {
streamFreeQitem(qItem);
}
pQueue->status = STREAM_QUEUE__SUCESS;
taosFreeQall(pQueue->qall);
taosCloseQueue(pQueue->queue);
taosMemoryFree(pQueue);
}
#if 0
@@ -114,14 +119,13 @@ SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue) {
}
#endif
// todo refactor:
// read data from input queue
typedef struct SQueueReader {
SStreamQueue* pQueue;
int32_t taskLevel;
int32_t maxBlocks; // maximum block in one batch
int32_t waitDuration; // maximum wait time to format several block into a batch to process, unit: ms
int32_t taskLevel;
int32_t maxBlocks; // maximum block in one batch
int32_t waitDuration; // maximum wait time to format several block into a batch to process, unit: ms
} SQueueReader;
#if 0
@@ -221,7 +225,6 @@ int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, i
if (qItem->type == STREAM_INPUT__CHECKPOINT || qItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
qItem->type == STREAM_INPUT__TRANS_STATE) {
if (*pInput == NULL) {
char* p = NULL;
if (qItem->type == STREAM_INPUT__CHECKPOINT) {
p = "checkpoint";
@@ -237,7 +240,8 @@ int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, i
return TSDB_CODE_SUCCESS;
} else {
// previously existing blocks need to be handled before handling the checkpoint msg block
qDebug("s-task:%s checkpoint/transtate msg extracted, handle previous blocks, numOfBlocks:%d", id, *numOfBlocks);
qDebug("s-task:%s checkpoint/transtate msg extracted, handle previous blocks, numOfBlocks:%d", id,
*numOfBlocks);
streamQueueProcessFail(pTask->inputQueue);
return TSDB_CODE_SUCCESS;
}

View File

@@ -283,11 +283,11 @@ void tFreeStreamTask(SStreamTask* pTask) {
int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus));
if (pTask->inputQueue) {
streamQueueClose(pTask->inputQueue);
streamQueueClose(pTask->inputQueue, pTask->id.taskId);
}
if (pTask->outputInfo.queue) {
streamQueueClose(pTask->outputInfo.queue);
streamQueueClose(pTask->outputInfo.queue, pTask->id.taskId);
}
if (pTask->exec.qmsg) {
@@ -319,6 +319,10 @@ void tFreeStreamTask(SStreamTask* pTask) {
pTask->pReadyMsgList = taosArrayDestroy(pTask->pReadyMsgList);
taosThreadMutexDestroy(&pTask->lock);
if (pTask->msgInfo.pData != NULL) {
destroyStreamDataBlock(pTask->msgInfo.pData);
pTask->msgInfo.pData = NULL;
}
if (pTask->id.idStr != NULL) {
taosMemoryFree((void*)pTask->id.idStr);

View File

@@ -384,7 +384,7 @@ static void taosGetLogFileName(char *fn) {
}
static int32_t taosOpenLogFile(char *fn, int32_t maxLines, int32_t maxFileNum) {
#ifdef WINDOWS
#ifdef WINDOWS_STASH
/*
* always set maxFileNum to 1
* means client log filename is unique in windows