Merge branch '3.0' into coverage/TD-28602-3.0
This commit is contained in:
commit
ee7070445e
|
@ -24,7 +24,7 @@ SELECT [hints] [DISTINCT] [TAGS] select_list
|
||||||
hints: /*+ [hint([hint_param_list])] [hint([hint_param_list])] */
|
hints: /*+ [hint([hint_param_list])] [hint([hint_param_list])] */
|
||||||
|
|
||||||
hint:
|
hint:
|
||||||
BATCH_SCAN | NO_BATCH_SCAN | SORT_FOR_GROUP
|
BATCH_SCAN | NO_BATCH_SCAN | SORT_FOR_GROUP | PARA_TABLES_SORT
|
||||||
|
|
||||||
select_list:
|
select_list:
|
||||||
select_expr [, select_expr] ...
|
select_expr [, select_expr] ...
|
||||||
|
@ -87,12 +87,13 @@ Hints are a means of user control over query optimization for individual stateme
|
||||||
|
|
||||||
The list of currently supported Hints is as follows:
|
The list of currently supported Hints is as follows:
|
||||||
|
|
||||||
| **Hint** | **Params** | **Comment** | **Scopt** |
|
| **Hint** | **Params** | **Comment** | **Scope** |
|
||||||
| :-----------: | -------------- | -------------------------- | -----------------------------------|
|
| :-----------: | -------------- | -------------------------- | -----------------------------------|
|
||||||
| BATCH_SCAN | None | Batch table scan | JOIN statment for stable |
|
| BATCH_SCAN | None | Batch table scan | JOIN statment for stable |
|
||||||
| NO_BATCH_SCAN | None | Sequential table scan | JOIN statment for stable |
|
| NO_BATCH_SCAN | None | Sequential table scan | JOIN statment for stable |
|
||||||
| SORT_FOR_GROUP| None | Use sort for partition, conflict with PARTITION_FIRST | With normal column in partition by list |
|
| SORT_FOR_GROUP| None | Use sort for partition, conflict with PARTITION_FIRST | With normal column in partition by list |
|
||||||
| PARTITION_FIRST| None | Use Partition before aggregate, conflict with SORT_FOR_GROUP | With normal column in partition by list |
|
| PARTITION_FIRST| None | Use Partition before aggregate, conflict with SORT_FOR_GROUP | With normal column in partition by list |
|
||||||
|
| PARA_TABLES_SORT| None | When sorting the supertable rows by timestamp, No temporary disk space is used. When there are numerous tables, each with long rows, the corresponding algorithm associated with this prompt may consume a substantial amount of memory, potentially leading to an Out Of Memory (OOM) situation. | Sorting the supertable rows by timestamp |
|
||||||
|
|
||||||
For example:
|
For example:
|
||||||
|
|
||||||
|
@ -100,6 +101,7 @@ For example:
|
||||||
SELECT /*+ BATCH_SCAN() */ a.ts FROM stable1 a, stable2 b where a.tag0 = b.tag0 and a.ts = b.ts;
|
SELECT /*+ BATCH_SCAN() */ a.ts FROM stable1 a, stable2 b where a.tag0 = b.tag0 and a.ts = b.ts;
|
||||||
SELECT /*+ SORT_FOR_GROUP() */ count(*), c1 FROM stable1 PARTITION BY c1;
|
SELECT /*+ SORT_FOR_GROUP() */ count(*), c1 FROM stable1 PARTITION BY c1;
|
||||||
SELECT /*+ PARTITION_FIRST() */ count(*), c1 FROM stable1 PARTITION BY c1;
|
SELECT /*+ PARTITION_FIRST() */ count(*), c1 FROM stable1 PARTITION BY c1;
|
||||||
|
SELECT /*+ PARA_TABLES_SORT() */ * from stable1 order by ts;
|
||||||
```
|
```
|
||||||
|
|
||||||
## Lists
|
## Lists
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -24,7 +24,7 @@ SELECT [hints] [DISTINCT] [TAGS] select_list
|
||||||
hints: /*+ [hint([hint_param_list])] [hint([hint_param_list])] */
|
hints: /*+ [hint([hint_param_list])] [hint([hint_param_list])] */
|
||||||
|
|
||||||
hint:
|
hint:
|
||||||
BATCH_SCAN | NO_BATCH_SCAN | SORT_FOR_GROUP
|
BATCH_SCAN | NO_BATCH_SCAN | SORT_FOR_GROUP | PARA_TABLES_SORT
|
||||||
|
|
||||||
select_list:
|
select_list:
|
||||||
select_expr [, select_expr] ...
|
select_expr [, select_expr] ...
|
||||||
|
@ -93,13 +93,14 @@ Hints 是用户控制单个语句查询优化的一种手段,当 Hint 不适
|
||||||
| NO_BATCH_SCAN | 无 | 采用顺序读表的方式 | 超级表 JOIN 语句 |
|
| NO_BATCH_SCAN | 无 | 采用顺序读表的方式 | 超级表 JOIN 语句 |
|
||||||
| SORT_FOR_GROUP| 无 | 采用sort方式进行分组, 与PARTITION_FIRST冲突 | partition by 列表有普通列时 |
|
| SORT_FOR_GROUP| 无 | 采用sort方式进行分组, 与PARTITION_FIRST冲突 | partition by 列表有普通列时 |
|
||||||
| PARTITION_FIRST| 无 | 在聚合之前使用PARTITION计算分组, 与SORT_FOR_GROUP冲突 | partition by 列表有普通列时 |
|
| PARTITION_FIRST| 无 | 在聚合之前使用PARTITION计算分组, 与SORT_FOR_GROUP冲突 | partition by 列表有普通列时 |
|
||||||
|
| PARA_TABLES_SORT| 无 | 超级表的数据按时间戳排序时, 不使用临时磁盘空间, 只使用内存。当子表数量多, 行长比较大时候, 会使用大量内存, 可能发生OOM | 超级表的数据按时间戳排序时 |
|
||||||
举例:
|
举例:
|
||||||
|
|
||||||
```sql
|
```sql
|
||||||
SELECT /*+ BATCH_SCAN() */ a.ts FROM stable1 a, stable2 b where a.tag0 = b.tag0 and a.ts = b.ts;
|
SELECT /*+ BATCH_SCAN() */ a.ts FROM stable1 a, stable2 b where a.tag0 = b.tag0 and a.ts = b.ts;
|
||||||
SELECT /*+ SORT_FOR_GROUP() */ count(*), c1 FROM stable1 PARTITION BY c1;
|
SELECT /*+ SORT_FOR_GROUP() */ count(*), c1 FROM stable1 PARTITION BY c1;
|
||||||
SELECT /*+ PARTITION_FIRST() */ count(*), c1 FROM stable1 PARTITION BY c1;
|
SELECT /*+ PARTITION_FIRST() */ count(*), c1 FROM stable1 PARTITION BY c1;
|
||||||
|
SELECT /*+ PARA_TABLES_SORT() */ * from stable1 order by ts;
|
||||||
```
|
```
|
||||||
|
|
||||||
## 列表
|
## 列表
|
||||||
|
|
|
@ -379,6 +379,7 @@
|
||||||
#define TK_NO_BATCH_SCAN 607
|
#define TK_NO_BATCH_SCAN 607
|
||||||
#define TK_SORT_FOR_GROUP 608
|
#define TK_SORT_FOR_GROUP 608
|
||||||
#define TK_PARTITION_FIRST 609
|
#define TK_PARTITION_FIRST 609
|
||||||
|
#define TK_PARA_TABLES_SORT 610
|
||||||
|
|
||||||
|
|
||||||
#define TK_NK_NIL 65535
|
#define TK_NK_NIL 65535
|
||||||
|
|
|
@ -26,6 +26,7 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||||
int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||||
int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader);
|
int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader);
|
||||||
int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||||
|
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||||
int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen,
|
int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen,
|
||||||
bool isLeader, bool restored);
|
bool isLeader, bool restored);
|
||||||
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen);
|
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen);
|
||||||
|
|
|
@ -121,6 +121,7 @@ typedef struct SScanLogicNode {
|
||||||
bool filesetDelimited; // returned blocks delimited by fileset
|
bool filesetDelimited; // returned blocks delimited by fileset
|
||||||
bool isCountByTag; // true if selectstmt hasCountFunc & part by tag/tbname
|
bool isCountByTag; // true if selectstmt hasCountFunc & part by tag/tbname
|
||||||
SArray* pFuncTypes; // for last, last_row
|
SArray* pFuncTypes; // for last, last_row
|
||||||
|
bool paraTablesSort; // for table merge scan
|
||||||
} SScanLogicNode;
|
} SScanLogicNode;
|
||||||
|
|
||||||
typedef struct SJoinLogicNode {
|
typedef struct SJoinLogicNode {
|
||||||
|
@ -443,6 +444,7 @@ typedef struct STableScanPhysiNode {
|
||||||
int8_t igCheckUpdate;
|
int8_t igCheckUpdate;
|
||||||
bool filesetDelimited;
|
bool filesetDelimited;
|
||||||
bool needCountEmptyTable;
|
bool needCountEmptyTable;
|
||||||
|
bool paraTablesSort;
|
||||||
} STableScanPhysiNode;
|
} STableScanPhysiNode;
|
||||||
|
|
||||||
typedef STableScanPhysiNode STableSeqScanPhysiNode;
|
typedef STableScanPhysiNode STableSeqScanPhysiNode;
|
||||||
|
|
|
@ -128,6 +128,7 @@ typedef enum EHintOption {
|
||||||
HINT_BATCH_SCAN,
|
HINT_BATCH_SCAN,
|
||||||
HINT_SORT_FOR_GROUP,
|
HINT_SORT_FOR_GROUP,
|
||||||
HINT_PARTITION_FIRST,
|
HINT_PARTITION_FIRST,
|
||||||
|
HINT_PARA_TABLES_SORT
|
||||||
} EHintOption;
|
} EHintOption;
|
||||||
|
|
||||||
typedef struct SHintNode {
|
typedef struct SHintNode {
|
||||||
|
|
|
@ -631,7 +631,6 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
|
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
|
||||||
SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i);
|
SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i);
|
||||||
|
|
||||||
for (int32_t j = startIndex; j < (startIndex + rowCount); ++j) {
|
for (int32_t j = startIndex; j < (startIndex + rowCount); ++j) {
|
||||||
bool isNull = false;
|
bool isNull = false;
|
||||||
if (pBlock->pBlockAgg == NULL) {
|
if (pBlock->pBlockAgg == NULL) {
|
||||||
|
|
|
@ -88,6 +88,7 @@ SArray *smGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
_OVER:
|
_OVER:
|
||||||
|
|
|
@ -837,6 +837,7 @@ SArray *vmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
|
@ -119,6 +119,7 @@ int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *
|
||||||
int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
||||||
int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *pList);
|
int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *pList);
|
||||||
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
||||||
|
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream);
|
||||||
|
|
||||||
SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream);
|
SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream);
|
||||||
void destroyStreamTaskIter(SStreamTaskIter *pIter);
|
void destroyStreamTaskIter(SStreamTaskIter *pIter);
|
||||||
|
|
|
@ -44,9 +44,8 @@ static bool hasCountWindowNode(SPhysiNode* pNode) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool countWindowStreamTask(SSubplan* pPlan) {
|
static bool isCountWindowStreamTask(SSubplan* pPlan) {
|
||||||
SPhysiNode* pNode = pPlan->pNode;
|
return hasCountWindowNode((SPhysiNode*)pPlan->pNode);
|
||||||
return hasCountWindowNode(pNode);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
|
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
|
||||||
|
@ -342,13 +341,13 @@ static void streamTaskSetDataRange(SStreamTask* pTask, int64_t skey, SArray* pVe
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void haltInitialTaskStatus(SStreamTask* pTask, SSubplan* pPlan) {
|
static void haltInitialTaskStatus(SStreamTask* pTask, SSubplan* pPlan, bool isFillhistoryTask) {
|
||||||
bool hasCountWindowNode = countWindowStreamTask(pPlan);
|
bool hasCountWindowNode = isCountWindowStreamTask(pPlan);
|
||||||
bool isRelStreamTask = (pTask->hTaskInfo.id.taskId != 0);
|
|
||||||
if (hasCountWindowNode && isRelStreamTask) {
|
if (hasCountWindowNode && (!isFillhistoryTask)) {
|
||||||
SStreamStatus* pStatus = &pTask->status;
|
SStreamStatus* pStatus = &pTask->status;
|
||||||
mDebug("s-task:0x%x status is set to %s from %s for count window agg task with fill-history option set",
|
mDebug("s-task:0x%x status set %s from %s for count window agg task with fill-history option set",
|
||||||
pTask->id.taskId, streamTaskGetStatusStr(pStatus->taskStatus), streamTaskGetStatusStr(TASK_STATUS__HALT));
|
pTask->id.taskId, streamTaskGetStatusStr(TASK_STATUS__HALT), streamTaskGetStatusStr(pStatus->taskStatus));
|
||||||
pStatus->taskStatus = TASK_STATUS__HALT;
|
pStatus->taskStatus = TASK_STATUS__HALT;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -398,15 +397,17 @@ static void setHTasksId(SStreamObj* pStream) {
|
||||||
|
|
||||||
static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, int64_t skey,
|
static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, int64_t skey,
|
||||||
SArray* pVerList, SVgObj* pVgroup, bool isFillhistory, bool useTriggerParam) {
|
SArray* pVerList, SVgObj* pVgroup, bool isFillhistory, bool useTriggerParam) {
|
||||||
// new stream task
|
|
||||||
SStreamTask* pTask = buildSourceTask(pStream, pEpset, isFillhistory, useTriggerParam);
|
SStreamTask* pTask = buildSourceTask(pStream, pEpset, isFillhistory, useTriggerParam);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
mDebug("doAddSourceTask taskId:%s, vgId:%d, isFillHistory:%d", pTask->id.idStr, pVgroup->vgId, isFillhistory);
|
mDebug("doAddSourceTask taskId:%s, vgId:%d, isFillHistory:%d", pTask->id.idStr, pVgroup->vgId, isFillhistory);
|
||||||
|
|
||||||
haltInitialTaskStatus(pTask, plan);
|
if (pStream->conf.fillHistory) {
|
||||||
|
haltInitialTaskStatus(pTask, plan, isFillhistory);
|
||||||
|
}
|
||||||
|
|
||||||
streamTaskSetDataRange(pTask, skey, pVerList, pVgroup->vgId);
|
streamTaskSetDataRange(pTask, skey, pVerList, pVgroup->vgId);
|
||||||
|
|
||||||
|
@ -415,6 +416,7 @@ static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStre
|
||||||
terrno = code;
|
terrno = code;
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
return TDB_CODE_SUCCESS;
|
return TDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -29,7 +29,7 @@
|
||||||
|
|
||||||
#define MND_STREAM_MAX_NUM 60
|
#define MND_STREAM_MAX_NUM 60
|
||||||
|
|
||||||
typedef struct SMStreamNodeCheckMsg {
|
typedef struct {
|
||||||
int8_t placeHolder; // // to fix windows compile error, define place holder
|
int8_t placeHolder; // // to fix windows compile error, define place holder
|
||||||
} SMStreamNodeCheckMsg;
|
} SMStreamNodeCheckMsg;
|
||||||
|
|
||||||
|
@ -152,7 +152,7 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
|
||||||
goto STREAM_DECODE_OVER;
|
goto STREAM_DECODE_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (sver != MND_STREAM_VER_NUMBER) {
|
if (sver < 1 || sver > MND_STREAM_VER_NUMBER) {
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER);
|
mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER);
|
||||||
goto STREAM_DECODE_OVER;
|
goto STREAM_DECODE_OVER;
|
||||||
|
@ -1545,6 +1545,8 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mInfo("stream:%s,%"PRId64 " start to pause stream", pauseReq.name, pStream->uid);
|
||||||
|
|
||||||
if (pStream->status == STREAM_STATUS__PAUSE) {
|
if (pStream->status == STREAM_STATUS__PAUSE) {
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -16,6 +16,10 @@
|
||||||
#include "mndStream.h"
|
#include "mndStream.h"
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
SMsgHead head;
|
||||||
|
} SMStreamHbRspMsg;
|
||||||
|
|
||||||
typedef struct SFailedCheckpointInfo {
|
typedef struct SFailedCheckpointInfo {
|
||||||
int64_t streamUid;
|
int64_t streamUid;
|
||||||
int64_t checkpointId;
|
int64_t checkpointId;
|
||||||
|
@ -65,7 +69,7 @@ static void addIntoCheckpointList(SArray* pList, const SFailedCheckpointInfo* pI
|
||||||
taosArrayPush(pList, pInfo);
|
taosArrayPush(pList, pInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
|
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
|
||||||
STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_TASK_RESET_NAME, " reset from failed checkpoint");
|
STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_TASK_RESET_NAME, " reset from failed checkpoint");
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
return terrno;
|
return terrno;
|
||||||
|
@ -115,7 +119,7 @@ static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, in
|
||||||
} else {
|
} else {
|
||||||
mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, transId:%d, create reset trans", pStream->name,
|
mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, transId:%d, create reset trans", pStream->name,
|
||||||
pStream->uid, transId);
|
pStream->uid, transId);
|
||||||
code = createStreamResetStatusTrans(pMnode, pStream);
|
code = mndCreateStreamResetStatusTrans(pMnode, pStream);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -211,7 +215,7 @@ int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo* info){
|
||||||
};
|
};
|
||||||
|
|
||||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||||
mInfo("receive pause stream:%s, %s, %p, because grant expired", pStream->name, reqPause.name, reqPause.name);
|
mInfo("receive pause stream:%s, %s, %"PRId64 ", because grant expired", pStream->name, reqPause.name, pStream->uid);
|
||||||
}
|
}
|
||||||
|
|
||||||
sdbRelease(pSdb, pStream);
|
sdbRelease(pSdb, pStream);
|
||||||
|
@ -222,11 +226,11 @@ int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo* info){
|
||||||
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
SStreamHbMsg req = {0};
|
SStreamHbMsg req = {0};
|
||||||
SArray *pFailedTasks = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
|
SArray *pFailedTasks = NULL;
|
||||||
SArray *pOrphanTasks = taosArrayInit(3, sizeof(SOrphanTask));
|
SArray *pOrphanTasks = NULL;
|
||||||
|
|
||||||
if(grantCheckExpire(TSDB_GRANT_STREAMS) < 0){
|
if (grantCheckExpire(TSDB_GRANT_STREAMS) < 0) {
|
||||||
if(suspendAllStreams(pMnode, &pReq->info) < 0){
|
if (suspendAllStreams(pMnode, &pReq->info) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -244,6 +248,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
|
|
||||||
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);
|
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);
|
||||||
|
|
||||||
|
pFailedTasks = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
|
||||||
|
pOrphanTasks = taosArrayInit(3, sizeof(SOrphanTask));
|
||||||
|
|
||||||
taosThreadMutexLock(&execInfo.lock);
|
taosThreadMutexLock(&execInfo.lock);
|
||||||
|
|
||||||
// extract stream task list
|
// extract stream task list
|
||||||
|
@ -349,5 +356,16 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
taosArrayDestroy(pFailedTasks);
|
taosArrayDestroy(pFailedTasks);
|
||||||
taosArrayDestroy(pOrphanTasks);
|
taosArrayDestroy(pOrphanTasks);
|
||||||
|
|
||||||
|
{
|
||||||
|
SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamHbRspMsg)};
|
||||||
|
rsp.pCont = rpcMallocCont(rsp.contLen);
|
||||||
|
SMsgHead* pHead = rsp.pCont;
|
||||||
|
pHead->vgId = htonl(req.vgId);
|
||||||
|
|
||||||
|
tmsgSendRsp(&rsp);
|
||||||
|
|
||||||
|
pReq->info.handle = NULL; // disable auto rsp
|
||||||
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -261,22 +261,30 @@ int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msg
|
||||||
return mndTransAppendRedoAction(pTrans, &action);
|
return mndTransAppendRedoAction(pTrans, &action);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool identicalName(const char* pDb, const char* pParam, int32_t len) {
|
||||||
|
return (strlen(pDb) == len) && (strncmp(pDb, pParam, len) == 0);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) {
|
int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) {
|
||||||
// data in the hash table will be removed automatically, no need to remove it here.
|
void *pIter = NULL;
|
||||||
SStreamTransInfo *pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, pDBName, len);
|
|
||||||
if (pTransInfo == NULL) {
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
// not checkpoint trans, ignore
|
while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) {
|
||||||
if (strcmp(pTransInfo->name, MND_STREAM_CHECKPOINT_NAME) != 0) {
|
SStreamTransInfo *pTransInfo = (SStreamTransInfo *)pIter;
|
||||||
mDebug("not checkpoint trans, not kill it, name:%s, transId:%d", pTransInfo->name, pTransInfo->transId);
|
if (strcmp(pTransInfo->name, MND_STREAM_CHECKPOINT_NAME) != 0) {
|
||||||
return TSDB_CODE_SUCCESS;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
char *pDupDBName = strndup(pDBName, len);
|
SStreamObj *pStream = mndGetStreamObj(pMnode, pTransInfo->streamId);
|
||||||
mndKillTransImpl(pMnode, pTransInfo->transId, pDupDBName);
|
if (pStream != NULL) {
|
||||||
taosMemoryFree(pDupDBName);
|
if (identicalName(pStream->sourceDb, pDBName, len)) {
|
||||||
|
mndKillTransImpl(pMnode, pTransInfo->transId, pStream->sourceDb);
|
||||||
|
} else if (identicalName(pStream->targetDb, pDBName, len)) {
|
||||||
|
mndKillTransImpl(pMnode, pTransInfo->transId, pStream->targetDb);
|
||||||
|
}
|
||||||
|
|
||||||
|
mndReleaseStream(pMnode, pStream);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -231,6 +231,8 @@ static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pT
|
||||||
taosMemoryFree(pReq);
|
taosMemoryFree(pReq);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mDebug("set the resume action for trans:%d", pTrans->id);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2484,7 +2484,7 @@ static int32_t mndRetrievePrivileges(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)privilege, false);
|
colDataSetVal(pColInfo, numOfRows, (const char *)privilege, false);
|
||||||
|
|
||||||
char objName[20] = {0};
|
char objName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
STR_WITH_MAXSIZE_TO_VARSTR(objName, "all", pShow->pMeta->pSchemas[cols].bytes);
|
STR_WITH_MAXSIZE_TO_VARSTR(objName, "all", pShow->pMeta->pSchemas[cols].bytes);
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)objName, false);
|
colDataSetVal(pColInfo, numOfRows, (const char *)objName, false);
|
||||||
|
|
File diff suppressed because one or more lines are too long
|
@ -178,6 +178,8 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||||
return tqStreamTaskProcessCheckRsp(pSnode->pMeta, pMsg, true);
|
return tqStreamTaskProcessCheckRsp(pSnode->pMeta, pMsg, true);
|
||||||
case TDMT_STREAM_TASK_CHECKPOINT_READY:
|
case TDMT_STREAM_TASK_CHECKPOINT_READY:
|
||||||
return tqStreamTaskProcessCheckpointReadyMsg(pSnode->pMeta, pMsg);
|
return tqStreamTaskProcessCheckpointReadyMsg(pSnode->pMeta, pMsg);
|
||||||
|
case TDMT_MND_STREAM_HEARTBEAT_RSP:
|
||||||
|
return tqStreamProcessStreamHbRsp(pSnode->pMeta, pMsg);
|
||||||
default:
|
default:
|
||||||
sndError("invalid snode msg:%d", pMsg->msgType);
|
sndError("invalid snode msg:%d", pMsg->msgType);
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
|
|
@ -242,6 +242,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
||||||
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
|
int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||||
|
|
||||||
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
|
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
|
||||||
int32_t tqScanWal(STQ* pTq);
|
int32_t tqScanWal(STQ* pTq);
|
||||||
|
|
|
@ -1220,3 +1220,8 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
|
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
return tqStreamTaskProcessTaskResetReq(pTq->pStreamMeta, pMsg);
|
return tqStreamTaskProcessTaskResetReq(pTq->pStreamMeta, pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// this function is needed, do not try to remove it.
|
||||||
|
int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
return tqStreamProcessStreamHbRsp(pTq->pStreamMeta, pMsg);
|
||||||
|
}
|
||||||
|
|
|
@ -612,9 +612,7 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMetaWLock(pMeta);
|
streamTaskClearHTaskAttr(pTask, pReq->resetRelHalt, true);
|
||||||
streamTaskClearHTaskAttr(pTask, pReq->resetRelHalt, false);
|
|
||||||
streamMetaWUnLock(pMeta);
|
|
||||||
|
|
||||||
// drop the stream task now
|
// drop the stream task now
|
||||||
streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId);
|
streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId);
|
||||||
|
@ -938,4 +936,11 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m
|
||||||
|
|
||||||
int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta) {
|
int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta) {
|
||||||
return taosArrayGetSize(pMeta->pTaskList);
|
return taosArrayGetSize(pMeta->pTaskList);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
pMsg->pCont = NULL;
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@
|
||||||
|
|
||||||
#define ROCKS_BATCH_SIZE (4096)
|
#define ROCKS_BATCH_SIZE (4096)
|
||||||
|
|
||||||
|
#if 0
|
||||||
static int32_t tsdbOpenBICache(STsdb *pTsdb) {
|
static int32_t tsdbOpenBICache(STsdb *pTsdb) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SLRUCache *pCache = taosLRUCacheInit(10 * 1024 * 1024, 0, .5);
|
SLRUCache *pCache = taosLRUCacheInit(10 * 1024 * 1024, 0, .5);
|
||||||
|
@ -52,6 +53,7 @@ static void tsdbCloseBICache(STsdb *pTsdb) {
|
||||||
taosThreadMutexDestroy(&pTsdb->biMutex);
|
taosThreadMutexDestroy(&pTsdb->biMutex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
static int32_t tsdbOpenBCache(STsdb *pTsdb) {
|
static int32_t tsdbOpenBCache(STsdb *pTsdb) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -1627,11 +1629,13 @@ int32_t tsdbOpenCache(STsdb *pTsdb) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
code = tsdbOpenBICache(pTsdb);
|
code = tsdbOpenBICache(pTsdb);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
code = tsdbOpenBCache(pTsdb);
|
code = tsdbOpenBCache(pTsdb);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -1673,7 +1677,9 @@ void tsdbCloseCache(STsdb *pTsdb) {
|
||||||
taosThreadMutexDestroy(&pTsdb->lruMutex);
|
taosThreadMutexDestroy(&pTsdb->lruMutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
tsdbCloseBICache(pTsdb);
|
tsdbCloseBICache(pTsdb);
|
||||||
|
#endif
|
||||||
tsdbCloseBCache(pTsdb);
|
tsdbCloseBCache(pTsdb);
|
||||||
tsdbClosePgCache(pTsdb);
|
tsdbClosePgCache(pTsdb);
|
||||||
tsdbCloseRocksCache(pTsdb);
|
tsdbCloseRocksCache(pTsdb);
|
||||||
|
@ -3447,6 +3453,7 @@ int32_t tsdbCacheGetElems(SVnode *pVnode) {
|
||||||
return elems;
|
return elems;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
static void getBICacheKey(int32_t fid, int64_t commitID, char *key, int *len) {
|
static void getBICacheKey(int32_t fid, int64_t commitID, char *key, int *len) {
|
||||||
struct {
|
struct {
|
||||||
int32_t fid;
|
int32_t fid;
|
||||||
|
@ -3523,7 +3530,6 @@ int32_t tsdbCacheGetBlockIdx(SLRUCache *pCache, SDataFReader *pFileReader, LRUHa
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef BUILD_NO_CALL
|
|
||||||
int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h) {
|
int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
|
|
@ -2543,7 +2543,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr
|
||||||
while (1) {
|
while (1) {
|
||||||
// only check here, since the iterate data in memory is very fast.
|
// only check here, since the iterate data in memory is very fast.
|
||||||
if (pReader->code != TSDB_CODE_SUCCESS) {
|
if (pReader->code != TSDB_CODE_SUCCESS) {
|
||||||
tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr);
|
tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", tstrerror(pReader->code), pReader->idStr);
|
||||||
return pReader->code;
|
return pReader->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2694,7 +2694,7 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) {
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
if (pReader->code != TSDB_CODE_SUCCESS) {
|
if (pReader->code != TSDB_CODE_SUCCESS) {
|
||||||
tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr);
|
tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", tstrerror(pReader->code), pReader->idStr);
|
||||||
return pReader->code;
|
return pReader->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2909,7 +2909,7 @@ static int32_t buildBlockFromBufferSeqForPreFileset(STsdbReader* pReader, int64_
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
if (pReader->code != TSDB_CODE_SUCCESS) {
|
if (pReader->code != TSDB_CODE_SUCCESS) {
|
||||||
tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr);
|
tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", tstrerror(pReader->code), pReader->idStr);
|
||||||
return pReader->code;
|
return pReader->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2951,7 +2951,7 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader, int64_t en
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
if (pReader->code != TSDB_CODE_SUCCESS) {
|
if (pReader->code != TSDB_CODE_SUCCESS) {
|
||||||
tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr);
|
tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", tstrerror(pReader->code), pReader->idStr);
|
||||||
return pReader->code;
|
return pReader->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -796,6 +796,8 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
|
||||||
return tqProcessTaskScanHistory(pVnode->pTq, pMsg);
|
return tqProcessTaskScanHistory(pVnode->pTq, pMsg);
|
||||||
case TDMT_STREAM_TASK_CHECKPOINT_READY:
|
case TDMT_STREAM_TASK_CHECKPOINT_READY:
|
||||||
return tqProcessTaskCheckpointReadyMsg(pVnode->pTq, pMsg);
|
return tqProcessTaskCheckpointReadyMsg(pVnode->pTq, pMsg);
|
||||||
|
case TDMT_MND_STREAM_HEARTBEAT_RSP:
|
||||||
|
return tqProcessStreamHbRsp(pVnode->pTq, pMsg);
|
||||||
default:
|
default:
|
||||||
vError("unknown msg type:%d in stream queue", pMsg->msgType);
|
vError("unknown msg type:%d in stream queue", pMsg->msgType);
|
||||||
return TSDB_CODE_APP_ERROR;
|
return TSDB_CODE_APP_ERROR;
|
||||||
|
|
|
@ -283,6 +283,42 @@ typedef struct STableScanInfo {
|
||||||
bool needCountEmptyTable;
|
bool needCountEmptyTable;
|
||||||
} STableScanInfo;
|
} STableScanInfo;
|
||||||
|
|
||||||
|
typedef enum ESubTableInputType {
|
||||||
|
SUB_TABLE_MEM_BLOCK,
|
||||||
|
SUB_TABLE_EXT_PAGES,
|
||||||
|
} ESubTableInputType;
|
||||||
|
|
||||||
|
typedef struct STmsSubTableInput {
|
||||||
|
STsdbReader* pReader;
|
||||||
|
SQueryTableDataCond tblCond;
|
||||||
|
STableKeyInfo* pKeyInfo;
|
||||||
|
bool bInMemReader;
|
||||||
|
ESubTableInputType type;
|
||||||
|
SSDataBlock* pReaderBlock;
|
||||||
|
|
||||||
|
SArray* aBlockPages;
|
||||||
|
SSDataBlock* pPageBlock;
|
||||||
|
int32_t pageIdx;
|
||||||
|
|
||||||
|
int32_t rowIdx;
|
||||||
|
int64_t* aTs;
|
||||||
|
} STmsSubTableInput;
|
||||||
|
|
||||||
|
typedef struct SBlockOrderInfo SBlockOrderInfo;
|
||||||
|
typedef struct STmsSubTablesMergeInfo {
|
||||||
|
SBlockOrderInfo* pOrderInfo;
|
||||||
|
|
||||||
|
int32_t numSubTables;
|
||||||
|
STmsSubTableInput* aInputs;
|
||||||
|
SMultiwayMergeTreeInfo* pTree;
|
||||||
|
int32_t numSubTablesCompleted;
|
||||||
|
|
||||||
|
int32_t numTableBlocksInMem;
|
||||||
|
SDiskbasedBuf* pBlocksBuf;
|
||||||
|
|
||||||
|
int32_t numInMemReaders;
|
||||||
|
} STmsSubTablesMergeInfo;
|
||||||
|
|
||||||
typedef struct STableMergeScanInfo {
|
typedef struct STableMergeScanInfo {
|
||||||
int32_t tableStartIndex;
|
int32_t tableStartIndex;
|
||||||
int32_t tableEndIndex;
|
int32_t tableEndIndex;
|
||||||
|
@ -296,7 +332,6 @@ typedef struct STableMergeScanInfo {
|
||||||
SSDataBlock* pSortInputBlock;
|
SSDataBlock* pSortInputBlock;
|
||||||
SSDataBlock* pReaderBlock;
|
SSDataBlock* pReaderBlock;
|
||||||
int64_t startTs; // sort start time
|
int64_t startTs; // sort start time
|
||||||
SArray* sortSourceParams;
|
|
||||||
SLimitInfo limitInfo;
|
SLimitInfo limitInfo;
|
||||||
int64_t numOfRows;
|
int64_t numOfRows;
|
||||||
SScanInfo scanInfo;
|
SScanInfo scanInfo;
|
||||||
|
@ -317,6 +352,8 @@ typedef struct STableMergeScanInfo {
|
||||||
SSDataBlock* nextDurationBlocks[2];
|
SSDataBlock* nextDurationBlocks[2];
|
||||||
bool rtnNextDurationBlocks;
|
bool rtnNextDurationBlocks;
|
||||||
int32_t nextDurationBlocksIdx;
|
int32_t nextDurationBlocksIdx;
|
||||||
|
|
||||||
|
STmsSubTablesMergeInfo* pSubTablesMergeInfo;
|
||||||
} STableMergeScanInfo;
|
} STableMergeScanInfo;
|
||||||
|
|
||||||
typedef struct STagScanFilterContext {
|
typedef struct STagScanFilterContext {
|
||||||
|
|
|
@ -3421,6 +3421,414 @@ _error:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// table merge scan operator
|
||||||
|
|
||||||
|
// table merge scan operator
|
||||||
|
|
||||||
|
static int32_t subTblRowCompareFn(const void* pLeft, const void* pRight, void* param) {
|
||||||
|
int32_t left = *(int32_t*)pLeft;
|
||||||
|
int32_t right = *(int32_t*)pRight;
|
||||||
|
STmsSubTablesMergeInfo* pInfo = (STmsSubTablesMergeInfo*)param;
|
||||||
|
|
||||||
|
int32_t leftIdx = pInfo->aInputs[left].rowIdx;
|
||||||
|
int32_t rightIdx = pInfo->aInputs[right].rowIdx;
|
||||||
|
|
||||||
|
if (leftIdx == -1) {
|
||||||
|
return 1;
|
||||||
|
} else if (rightIdx == -1) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t leftTs = pInfo->aInputs[left].aTs[leftIdx];
|
||||||
|
int64_t rightTs = pInfo->aInputs[right].aTs[rightIdx];
|
||||||
|
int32_t ret = leftTs>rightTs ? 1 : ((leftTs < rightTs) ? -1 : 0);
|
||||||
|
if (pInfo->pOrderInfo->order == TSDB_ORDER_DESC) {
|
||||||
|
ret = -1 * ret;
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* dst) {
|
||||||
|
memcpy((void*)dst, (void*)src, sizeof(SQueryTableDataCond));
|
||||||
|
dst->colList = taosMemoryCalloc(src->numOfCols, sizeof(SColumnInfo));
|
||||||
|
for (int i = 0; i < src->numOfCols; i++) {
|
||||||
|
dst->colList[i] = src->colList[i];
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSubTableInput* pInput, bool* pSubTableHasBlock) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
STableMergeScanInfo* pInfo = pOperator->info;
|
||||||
|
SReadHandle* pHandle = &pInfo->base.readHandle;
|
||||||
|
STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo;
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
const SStorageAPI* pAPI= &pTaskInfo->storageAPI;
|
||||||
|
|
||||||
|
blockDataCleanup(pInput->pReaderBlock);
|
||||||
|
if (!pInput->bInMemReader) {
|
||||||
|
code = pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInput->tblCond, pInput->pKeyInfo, 1, pInput->pReaderBlock,
|
||||||
|
(void**)&pInput->pReader, GET_TASKID(pTaskInfo), NULL);
|
||||||
|
if (code != 0) {
|
||||||
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->base.dataReader = pInput->pReader;
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
bool hasNext = false;
|
||||||
|
int32_t code = pAPI->tsdReader.tsdNextDataBlock(pInfo->base.dataReader, &hasNext);
|
||||||
|
if (code != 0) {
|
||||||
|
pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->base.dataReader);
|
||||||
|
pInfo->base.dataReader = NULL;
|
||||||
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
if (!hasNext || isTaskKilled(pTaskInfo)) {
|
||||||
|
if (isTaskKilled(pTaskInfo)) {
|
||||||
|
pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->base.dataReader);
|
||||||
|
pInfo->base.dataReader = NULL;
|
||||||
|
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
|
||||||
|
}
|
||||||
|
*pSubTableHasBlock = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pInput->tblCond.order == TSDB_ORDER_ASC) {
|
||||||
|
pInput->tblCond.twindows.skey = pInput->pReaderBlock->info.window.ekey + 1;
|
||||||
|
} else {
|
||||||
|
pInput->tblCond.twindows.ekey = pInput->pReaderBlock->info.window.skey - 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint32_t status = 0;
|
||||||
|
code = loadDataBlock(pOperator, &pInfo->base, pInput->pReaderBlock, &status);
|
||||||
|
if (code != 0) {
|
||||||
|
pInfo->base.dataReader = NULL;
|
||||||
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) {
|
||||||
|
*pSubTableHasBlock = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (status == FUNC_DATA_REQUIRED_FILTEROUT || pInput->pReaderBlock->info.rows == 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
*pSubTableHasBlock = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (*pSubTableHasBlock) {
|
||||||
|
pInput->pReaderBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pInput->pReaderBlock->info.id.uid);
|
||||||
|
pOperator->resultInfo.totalRows += pInput->pReaderBlock->info.rows;
|
||||||
|
}
|
||||||
|
if (!pInput->bInMemReader || !*pSubTableHasBlock) {
|
||||||
|
pAPI->tsdReader.tsdReaderClose(pInput->pReader);
|
||||||
|
pInput->pReader = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->base.dataReader = NULL;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void setGroupStartEndIndex(STableMergeScanInfo* pInfo) {
|
||||||
|
pInfo->bGroupProcessed = false;
|
||||||
|
|
||||||
|
size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
|
||||||
|
int32_t i = pInfo->tableStartIndex + 1;
|
||||||
|
for (; i < numOfTables; ++i) {
|
||||||
|
STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i);
|
||||||
|
if (tableKeyInfo->groupId != pInfo->groupId) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pInfo->tableEndIndex = i - 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t openSubTablesMergeSort(STmsSubTablesMergeInfo* pSubTblsInfo) {
|
||||||
|
for (int32_t i = 0; i < pSubTblsInfo->numSubTables; ++i) {
|
||||||
|
STmsSubTableInput * pInput = pSubTblsInfo->aInputs + i;
|
||||||
|
if (pInput->rowIdx == -1) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (pInput->type == SUB_TABLE_MEM_BLOCK) {
|
||||||
|
pInput->rowIdx = 0;
|
||||||
|
pInput->pageIdx = -1;
|
||||||
|
}
|
||||||
|
SSDataBlock* pInputBlock = (pInput->type == SUB_TABLE_MEM_BLOCK) ? pInput->pReaderBlock : pInput->pPageBlock;
|
||||||
|
SColumnInfoData* col = taosArrayGet(pInputBlock->pDataBlock, pSubTblsInfo->pOrderInfo->slotId);
|
||||||
|
pInput->aTs = (int64_t*)col->pData;
|
||||||
|
}
|
||||||
|
tMergeTreeCreate(&pSubTblsInfo->pTree, pSubTblsInfo->numSubTables, pSubTblsInfo, subTblRowCompareFn);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t initSubTablesMergeInfo(STableMergeScanInfo* pInfo) {
|
||||||
|
setGroupStartEndIndex(pInfo);
|
||||||
|
STmsSubTablesMergeInfo* pSubTblsInfo = taosMemoryCalloc(1, sizeof(STmsSubTablesMergeInfo));
|
||||||
|
if (pSubTblsInfo == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
pSubTblsInfo->pOrderInfo = taosArrayGet(pInfo->pSortInfo, 0);
|
||||||
|
pSubTblsInfo->numSubTables = pInfo->tableEndIndex - pInfo->tableStartIndex + 1;
|
||||||
|
pSubTblsInfo->aInputs = taosMemoryCalloc(pSubTblsInfo->numSubTables, sizeof(STmsSubTableInput));
|
||||||
|
if (pSubTblsInfo->aInputs == NULL) {
|
||||||
|
taosMemoryFree(pSubTblsInfo);
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
int32_t bufPageSize = pInfo->bufPageSize;
|
||||||
|
int32_t inMemSize = (pSubTblsInfo->numSubTables - pSubTblsInfo->numTableBlocksInMem) * bufPageSize;
|
||||||
|
int32_t code =
|
||||||
|
createDiskbasedBuf(&pSubTblsInfo->pBlocksBuf, pInfo->bufPageSize, inMemSize, "blocksExternalBuf", tsTempDir);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
taosMemoryFree(pSubTblsInfo->aInputs);
|
||||||
|
taosMemoryFree(pSubTblsInfo);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
pSubTblsInfo->numTableBlocksInMem = pSubTblsInfo->numSubTables;
|
||||||
|
pSubTblsInfo->numInMemReaders = pSubTblsInfo->numSubTables;
|
||||||
|
|
||||||
|
pInfo->pSubTablesMergeInfo = pSubTblsInfo;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t initSubTableInputs(SOperatorInfo* pOperator, STableMergeScanInfo* pInfo) {
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
SReadHandle* pHandle = &pInfo->base.readHandle;
|
||||||
|
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||||
|
|
||||||
|
STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pSubTblsInfo->numSubTables; ++i) {
|
||||||
|
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + i;
|
||||||
|
pInput->type = SUB_TABLE_MEM_BLOCK;
|
||||||
|
dumpQueryTableCond(&pInfo->base.cond, &pInput->tblCond);
|
||||||
|
pInput->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false);
|
||||||
|
pInput->pPageBlock = createOneDataBlock(pInfo->pResBlock, false);
|
||||||
|
STableKeyInfo* keyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i + pInfo->tableStartIndex);
|
||||||
|
pInput->pKeyInfo = keyInfo;
|
||||||
|
|
||||||
|
if (isTaskKilled(pTaskInfo)) {
|
||||||
|
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (i + 1 < pSubTblsInfo->numInMemReaders) {
|
||||||
|
pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInput->tblCond, keyInfo, 1, pInput->pReaderBlock,
|
||||||
|
(void**)&pInput->pReader, GET_TASKID(pTaskInfo), NULL);
|
||||||
|
pInput->bInMemReader = true;
|
||||||
|
} else {
|
||||||
|
pInput->pReader = NULL;
|
||||||
|
pInput->bInMemReader = false;
|
||||||
|
}
|
||||||
|
bool hasNext = true;
|
||||||
|
fetchNextSubTableBlockFromReader(pOperator, pInput, &hasNext);
|
||||||
|
if (!hasNext) {
|
||||||
|
pInput->rowIdx = -1;
|
||||||
|
++pSubTblsInfo->numSubTablesCompleted;
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
pInput->rowIdx = 0;
|
||||||
|
pInput->pageIdx = -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t adjustSubTableFromMemBlock(SOperatorInfo* pOperatorInfo, STmsSubTablesMergeInfo* pSubTblsInfo) {
|
||||||
|
STableMergeScanInfo* pInfo = pOperatorInfo->info;
|
||||||
|
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree);
|
||||||
|
bool hasNext = true;
|
||||||
|
fetchNextSubTableBlockFromReader(pOperatorInfo, pInput, &hasNext);
|
||||||
|
if (!hasNext) {
|
||||||
|
pInput->rowIdx = -1;
|
||||||
|
++pSubTblsInfo->numSubTablesCompleted;
|
||||||
|
} else {
|
||||||
|
pInput->rowIdx = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t adjustSubTableForNextRow(SOperatorInfo* pOperatorInfo, STmsSubTablesMergeInfo* pSubTblsInfo) {
|
||||||
|
STableMergeScanInfo* pInfo = pOperatorInfo->info;
|
||||||
|
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree);
|
||||||
|
|
||||||
|
SSDataBlock* pInputBlock = (pInput->type == SUB_TABLE_MEM_BLOCK) ? pInput->pReaderBlock : pInput->pPageBlock;
|
||||||
|
if (pInput->rowIdx < pInputBlock->info.rows - 1) {
|
||||||
|
++pInput->rowIdx;
|
||||||
|
} else if (pInput->rowIdx == pInputBlock->info.rows -1 ) {
|
||||||
|
if (pInput->type == SUB_TABLE_MEM_BLOCK) {
|
||||||
|
adjustSubTableFromMemBlock(pOperatorInfo, pSubTblsInfo);
|
||||||
|
}
|
||||||
|
if (pInput->rowIdx != -1) {
|
||||||
|
SColumnInfoData* col = taosArrayGet(pInputBlock->pDataBlock, pSubTblsInfo->pOrderInfo->slotId);
|
||||||
|
pInput->aTs = (int64_t*)col->pData;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tMergeTreeAdjust(pSubTblsInfo->pTree, tMergeTreeGetAdjustIndex(pSubTblsInfo->pTree));
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t appendChosenRowToDataBlock(STmsSubTablesMergeInfo* pSubTblsInfo, SSDataBlock* pBlock) {
|
||||||
|
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree);
|
||||||
|
SSDataBlock* pInputBlock = (pInput->type == SUB_TABLE_MEM_BLOCK) ? pInput->pReaderBlock : pInput->pPageBlock;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
|
||||||
|
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
|
||||||
|
|
||||||
|
SColumnInfoData* pSrcColInfo = taosArrayGet(pInputBlock->pDataBlock, i);
|
||||||
|
bool isNull = colDataIsNull(pSrcColInfo, pInputBlock->info.rows, pInput->rowIdx, NULL);
|
||||||
|
|
||||||
|
if (isNull) {
|
||||||
|
colDataSetVal(pColInfo, pBlock->info.rows, NULL, true);
|
||||||
|
} else {
|
||||||
|
if (pSrcColInfo->pData != NULL) {
|
||||||
|
char* pData = colDataGetData(pSrcColInfo, pInput->rowIdx);
|
||||||
|
colDataSetVal(pColInfo, pBlock->info.rows, pData, false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pBlock->info.dataLoad = 1;
|
||||||
|
pBlock->info.scanFlag = pInputBlock->info.scanFlag;
|
||||||
|
pBlock->info.rows += 1;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static SSDataBlock* getSubTablesSortedBlock(SOperatorInfo* pOperator, SSDataBlock* pResBlock, int32_t capacity) {
|
||||||
|
STableMergeScanInfo* pInfo = pOperator->info;
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo;
|
||||||
|
|
||||||
|
blockDataCleanup(pResBlock);
|
||||||
|
bool finished = false;
|
||||||
|
while (true) {
|
||||||
|
while (1) {
|
||||||
|
if (pSubTblsInfo->numSubTablesCompleted >= pSubTblsInfo->numSubTables) {
|
||||||
|
finished = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
appendChosenRowToDataBlock(pSubTblsInfo, pResBlock);
|
||||||
|
adjustSubTableForNextRow(pOperator, pSubTblsInfo);
|
||||||
|
|
||||||
|
if (pResBlock->info.rows >= capacity) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (isTaskKilled(pTaskInfo)) {
|
||||||
|
T_LONG_JMP(pOperator->pTaskInfo->env, pTaskInfo->code);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo);
|
||||||
|
if (finished || limitReached || pResBlock->info.rows > 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return (pResBlock->info.rows > 0) ? pResBlock : NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t startSubTablesTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
|
STableMergeScanInfo* pInfo = pOperator->info;
|
||||||
|
|
||||||
|
initSubTablesMergeInfo(pInfo);
|
||||||
|
|
||||||
|
initSubTableInputs(pOperator, pInfo);
|
||||||
|
|
||||||
|
openSubTablesMergeSort(pInfo->pSubTablesMergeInfo);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t stopSubTablesTableMergeScan(STableMergeScanInfo* pInfo) {
|
||||||
|
STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo;
|
||||||
|
if (pSubTblsInfo != NULL) {
|
||||||
|
tMergeTreeDestroy(&pSubTblsInfo->pTree);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pSubTblsInfo->numSubTables; ++i) {
|
||||||
|
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + i;
|
||||||
|
taosMemoryFree(pInput->tblCond.colList);
|
||||||
|
blockDataDestroy(pInput->pReaderBlock);
|
||||||
|
blockDataDestroy(pInput->pPageBlock);
|
||||||
|
taosArrayDestroy(pInput->aBlockPages);
|
||||||
|
pInfo->base.readerAPI.tsdReaderClose(pInput->pReader);
|
||||||
|
pInput->pReader = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
destroyDiskbasedBuf(pSubTblsInfo->pBlocksBuf);
|
||||||
|
taosMemoryFree(pSubTblsInfo->aInputs);
|
||||||
|
|
||||||
|
taosMemoryFree(pSubTblsInfo);
|
||||||
|
pInfo->pSubTablesMergeInfo = NULL;
|
||||||
|
}
|
||||||
|
taosMemoryTrim(0);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSDataBlock* doTableMergeScanParaSubTables(SOperatorInfo* pOperator) {
|
||||||
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
STableMergeScanInfo* pInfo = pOperator->info;
|
||||||
|
|
||||||
|
int32_t code = pOperator->fpSet._openFn(pOperator);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t tableListSize = tableListGetSize(pInfo->base.pTableListInfo);
|
||||||
|
if (!pInfo->hasGroupId) {
|
||||||
|
pInfo->hasGroupId = true;
|
||||||
|
|
||||||
|
if (tableListSize == 0) {
|
||||||
|
setOperatorCompleted(pOperator);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
pInfo->tableStartIndex = 0;
|
||||||
|
pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex))->groupId;
|
||||||
|
startSubTablesTableMergeScan(pOperator);
|
||||||
|
}
|
||||||
|
|
||||||
|
SSDataBlock* pBlock = NULL;
|
||||||
|
while (pInfo->tableStartIndex < tableListSize) {
|
||||||
|
if (isTaskKilled(pTaskInfo)) {
|
||||||
|
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
|
||||||
|
}
|
||||||
|
|
||||||
|
pBlock = getSubTablesSortedBlock(pOperator, pInfo->pResBlock, pOperator->resultInfo.capacity);
|
||||||
|
if (pBlock == NULL && !pInfo->bGroupProcessed && pInfo->needCountEmptyTable) {
|
||||||
|
STableKeyInfo* tbInfo = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex);
|
||||||
|
pBlock = getOneRowResultBlock(pTaskInfo, &pInfo->base, pInfo->pResBlock, tbInfo);
|
||||||
|
}
|
||||||
|
if (pBlock != NULL) {
|
||||||
|
pBlock->info.id.groupId = pInfo->groupId;
|
||||||
|
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||||
|
pInfo->bGroupProcessed = true;
|
||||||
|
return pBlock;
|
||||||
|
} else {
|
||||||
|
// Data of this group are all dumped, let's try the next group
|
||||||
|
stopSubTablesTableMergeScan(pInfo);
|
||||||
|
if (pInfo->tableEndIndex >= tableListSize - 1) {
|
||||||
|
setOperatorCompleted(pOperator);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
|
||||||
|
pInfo->groupId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex)->groupId;
|
||||||
|
startSubTablesTableMergeScan(pOperator);
|
||||||
|
resetLimitInfoForNextGroup(&pInfo->limitInfo);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return pBlock;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static void tableMergeScanDoSkipTable(uint64_t uid, void* pTableMergeScanInfo) {
|
static void tableMergeScanDoSkipTable(uint64_t uid, void* pTableMergeScanInfo) {
|
||||||
STableMergeScanInfo* pInfo = pTableMergeScanInfo;
|
STableMergeScanInfo* pInfo = pTableMergeScanInfo;
|
||||||
if (pInfo->mSkipTables == NULL) {
|
if (pInfo->mSkipTables == NULL) {
|
||||||
|
@ -3575,15 +3983,6 @@ SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) {
|
||||||
return pList;
|
return pList;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* dst) {
|
|
||||||
memcpy((void*)dst, (void*)src, sizeof(SQueryTableDataCond));
|
|
||||||
dst->colList = taosMemoryCalloc(src->numOfCols, sizeof(SColumnInfo));
|
|
||||||
for (int i = 0; i < src->numOfCols; i++) {
|
|
||||||
dst->colList[i] = src->colList[i];
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param) {
|
void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param) {
|
||||||
STableMergeScanInfo* pTmsInfo = param;
|
STableMergeScanInfo* pTmsInfo = param;
|
||||||
if (type == TSD_READER_NOTIFY_DURATION_START) {
|
if (type == TSD_READER_NOTIFY_DURATION_START) {
|
||||||
|
@ -3671,8 +4070,6 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
int32_t tableStartIdx = pInfo->tableStartIndex;
|
int32_t tableStartIdx = pInfo->tableStartIndex;
|
||||||
int32_t tableEndIdx = pInfo->tableEndIndex;
|
int32_t tableEndIdx = pInfo->tableEndIndex;
|
||||||
|
|
||||||
tSimpleHashClear(pInfo->mTableNumRows);
|
|
||||||
|
|
||||||
int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
|
int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
|
||||||
STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx);
|
STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx);
|
||||||
pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock,
|
pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock,
|
||||||
|
@ -3823,10 +4220,8 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
void destroyTableMergeScanOperatorInfo(void* param) {
|
void destroyTableMergeScanOperatorInfo(void* param) {
|
||||||
STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
|
STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
|
||||||
cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
|
|
||||||
|
|
||||||
int32_t numOfTable = taosArrayGetSize(pTableScanInfo->sortSourceParams);
|
|
||||||
|
|
||||||
|
// start one reader variable
|
||||||
pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
|
pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
|
||||||
pTableScanInfo->base.dataReader = NULL;
|
pTableScanInfo->base.dataReader = NULL;
|
||||||
|
|
||||||
|
@ -3837,18 +4232,22 @@ void destroyTableMergeScanOperatorInfo(void* param) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(pTableScanInfo->sortSourceParams);
|
|
||||||
tsortDestroySortHandle(pTableScanInfo->pSortHandle);
|
tsortDestroySortHandle(pTableScanInfo->pSortHandle);
|
||||||
pTableScanInfo->pSortHandle = NULL;
|
pTableScanInfo->pSortHandle = NULL;
|
||||||
taosHashCleanup(pTableScanInfo->mSkipTables);
|
taosHashCleanup(pTableScanInfo->mSkipTables);
|
||||||
pTableScanInfo->mSkipTables = NULL;
|
pTableScanInfo->mSkipTables = NULL;
|
||||||
|
pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);
|
||||||
|
// end one reader variable
|
||||||
|
|
||||||
|
cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
|
||||||
destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI);
|
destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI);
|
||||||
|
|
||||||
pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
|
pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
|
||||||
pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);
|
|
||||||
pTableScanInfo->pReaderBlock = blockDataDestroy(pTableScanInfo->pReaderBlock);
|
pTableScanInfo->pReaderBlock = blockDataDestroy(pTableScanInfo->pReaderBlock);
|
||||||
|
|
||||||
taosArrayDestroy(pTableScanInfo->pSortInfo);
|
taosArrayDestroy(pTableScanInfo->pSortInfo);
|
||||||
|
|
||||||
|
stopSubTablesTableMergeScan(pTableScanInfo);
|
||||||
|
|
||||||
taosMemoryFreeClear(param);
|
taosMemoryFreeClear(param);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3922,14 +4321,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
initResultSizeInfo(&pOperator->resultInfo, 1024);
|
|
||||||
pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
|
|
||||||
blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
|
|
||||||
|
|
||||||
pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));
|
|
||||||
|
|
||||||
pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
|
|
||||||
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
|
|
||||||
initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
|
initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
|
||||||
|
|
||||||
pInfo->mergeLimit = -1;
|
pInfo->mergeLimit = -1;
|
||||||
|
@ -3938,24 +4329,37 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
||||||
pInfo->mergeLimit = pInfo->limitInfo.limit.limit + pInfo->limitInfo.limit.offset;
|
pInfo->mergeLimit = pInfo->limitInfo.limit.limit + pInfo->limitInfo.limit.offset;
|
||||||
pInfo->mSkipTables = NULL;
|
pInfo->mSkipTables = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
initResultSizeInfo(&pOperator->resultInfo, 1024);
|
||||||
|
pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
|
||||||
|
blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
|
||||||
|
|
||||||
|
pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
|
||||||
pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false);
|
pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false);
|
||||||
|
|
||||||
|
pInfo->needCountEmptyTable = tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable;
|
||||||
|
|
||||||
int32_t rowSize = pInfo->pResBlock->info.rowSize;
|
int32_t rowSize = pInfo->pResBlock->info.rowSize;
|
||||||
uint32_t nCols = taosArrayGetSize(pInfo->pResBlock->pDataBlock);
|
uint32_t nCols = taosArrayGetSize(pInfo->pResBlock->pDataBlock);
|
||||||
pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols);
|
pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols);
|
||||||
|
|
||||||
|
//start one reader variable
|
||||||
|
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
|
||||||
|
|
||||||
if (!tsExperimental) {
|
if (!tsExperimental) {
|
||||||
pInfo->filesetDelimited = false;
|
pInfo->filesetDelimited = false;
|
||||||
} else {
|
} else {
|
||||||
pInfo->filesetDelimited = pTableScanNode->filesetDelimited;
|
pInfo->filesetDelimited = pTableScanNode->filesetDelimited;
|
||||||
}
|
}
|
||||||
pInfo->needCountEmptyTable = tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable;
|
// end one reader variable
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED,
|
setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED,
|
||||||
pInfo, pTaskInfo);
|
pInfo, pTaskInfo);
|
||||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableMergeScan, NULL, destroyTableMergeScanOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(
|
||||||
optrDefaultBufFn, getTableMergeScanExplainExecInfo, optrDefaultGetNextExtFn, NULL);
|
optrDummyOpenFn, pTableScanNode->paraTablesSort ? doTableMergeScanParaSubTables : doTableMergeScan, NULL,
|
||||||
|
destroyTableMergeScanOperatorInfo, optrDefaultBufFn, getTableMergeScanExplainExecInfo, optrDefaultGetNextExtFn,
|
||||||
|
NULL);
|
||||||
pOperator->cost.openCost = 0;
|
pOperator->cost.openCost = 0;
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
|
|
|
@ -115,8 +115,16 @@ void setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowInfo* pWinInfo, TSKEY* pTs, int32_t start, int32_t rows, int32_t maxRows,
|
static void removeCountResult(SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey* pKey) {
|
||||||
SSHashObj* pStDeleted, bool* pRebuild) {
|
SSessionKey key = {0};
|
||||||
|
getSessionHashKey(pKey, &key);
|
||||||
|
tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey));
|
||||||
|
tSimpleHashRemove(pResMap, &key, sizeof(SSessionKey));
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowInfo* pWinInfo, TSKEY* pTs,
|
||||||
|
int32_t start, int32_t rows, int32_t maxRows, SSHashObj* pStUpdated,
|
||||||
|
SSHashObj* pStDeleted, bool* pRebuild) {
|
||||||
SSessionKey sWinKey = pWinInfo->winInfo.sessionWin;
|
SSessionKey sWinKey = pWinInfo->winInfo.sessionWin;
|
||||||
int32_t num = 0;
|
int32_t num = 0;
|
||||||
for (int32_t i = start; i < rows; i++) {
|
for (int32_t i = start; i < rows; i++) {
|
||||||
|
@ -148,6 +156,7 @@ static int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowI
|
||||||
|
|
||||||
if (needDelState) {
|
if (needDelState) {
|
||||||
memcpy(pWinInfo->winInfo.pStatePos->pKey, &pWinInfo->winInfo.sessionWin, sizeof(SSessionKey));
|
memcpy(pWinInfo->winInfo.pStatePos->pKey, &pWinInfo->winInfo.sessionWin, sizeof(SSessionKey));
|
||||||
|
removeCountResult(pStUpdated, pAggSup->pResultRows, &sWinKey);
|
||||||
if (pWinInfo->winInfo.pStatePos->needFree) {
|
if (pWinInfo->winInfo.pStatePos->needFree) {
|
||||||
pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &sWinKey);
|
pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &sWinKey);
|
||||||
}
|
}
|
||||||
|
@ -164,7 +173,7 @@ void getCountWinRange(SStreamAggSupporter* pAggSup, const SSessionKey* pKey, ESt
|
||||||
} else {
|
} else {
|
||||||
pCur = pAggSup->stateStore.streamStateSessionSeekKeyCurrentNext(pAggSup->pState, pKey);
|
pCur = pAggSup->stateStore.streamStateSessionSeekKeyCurrentNext(pAggSup->pState, pKey);
|
||||||
}
|
}
|
||||||
SSessionKey tmpKey = {0};
|
SSessionKey tmpKey = {.groupId = pKey->groupId, .win.ekey = INT64_MIN, .win.skey = INT64_MIN};
|
||||||
int32_t code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &tmpKey, NULL, 0);
|
int32_t code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &tmpKey, NULL, 0);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
pAggSup->stateStore.streamStateFreeCur(pCur);
|
pAggSup->stateStore.streamStateFreeCur(pCur);
|
||||||
|
@ -242,7 +251,8 @@ static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
|
||||||
setSessionWinOutputInfo(pStUpdated, &curWin.winInfo);
|
setSessionWinOutputInfo(pStUpdated, &curWin.winInfo);
|
||||||
slidingRows = *curWin.pWindowCount;
|
slidingRows = *curWin.pWindowCount;
|
||||||
if (!buffInfo.rebuildWindow) {
|
if (!buffInfo.rebuildWindow) {
|
||||||
winRows = updateCountWindowInfo(pAggSup, &curWin, startTsCols, i, rows, pAggSup->windowCount, pStDeleted, &buffInfo.rebuildWindow);
|
winRows = updateCountWindowInfo(pAggSup, &curWin, startTsCols, i, rows, pAggSup->windowCount, pStUpdated,
|
||||||
|
pStDeleted, &buffInfo.rebuildWindow);
|
||||||
}
|
}
|
||||||
if (buffInfo.rebuildWindow) {
|
if (buffInfo.rebuildWindow) {
|
||||||
SSessionKey range = {0};
|
SSessionKey range = {0};
|
||||||
|
|
|
@ -459,7 +459,7 @@ static void idxInterRsltDestroy(SArray* results) {
|
||||||
|
|
||||||
static int idxMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray* out) {
|
static int idxMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray* out) {
|
||||||
// refactor, merge interResults into fResults by oType
|
// refactor, merge interResults into fResults by oType
|
||||||
for (int i = 0; i < taosArrayGetSize(in); i--) {
|
for (int i = 0; i < taosArrayGetSize(in); i++) {
|
||||||
SArray* t = taosArrayGetP(in, i);
|
SArray* t = taosArrayGetP(in, i);
|
||||||
taosArraySort(t, uidCompare);
|
taosArraySort(t, uidCompare);
|
||||||
taosArrayRemoveDuplicate(t, uidCompare, NULL);
|
taosArrayRemoveDuplicate(t, uidCompare, NULL);
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
#include "index.h"
|
#include "index.h"
|
||||||
#include "indexComm.h"
|
#include "indexComm.h"
|
||||||
#include "indexInt.h"
|
#include "indexInt.h"
|
||||||
|
#include "indexUtil.h"
|
||||||
#include "nodes.h"
|
#include "nodes.h"
|
||||||
#include "querynodes.h"
|
#include "querynodes.h"
|
||||||
#include "scalar.h"
|
#include "scalar.h"
|
||||||
|
@ -77,15 +78,15 @@ typedef struct SIFParam {
|
||||||
char dbName[TSDB_DB_NAME_LEN];
|
char dbName[TSDB_DB_NAME_LEN];
|
||||||
char colName[TSDB_COL_NAME_LEN * 2 + 4];
|
char colName[TSDB_COL_NAME_LEN * 2 + 4];
|
||||||
|
|
||||||
SIndexMetaArg arg;
|
SIndexMetaArg arg;
|
||||||
SMetaDataFilterAPI api;
|
SMetaDataFilterAPI api;
|
||||||
} SIFParam;
|
} SIFParam;
|
||||||
|
|
||||||
typedef struct SIFCtx {
|
typedef struct SIFCtx {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
SHashObj *pRes; /* element is SIFParam */
|
SHashObj *pRes; /* element is SIFParam */
|
||||||
bool noExec; // true: just iterate condition tree, and add hint to executor plan
|
bool noExec; // true: just iterate condition tree, and add hint to executor plan
|
||||||
SIndexMetaArg arg;
|
SIndexMetaArg arg;
|
||||||
SMetaDataFilterAPI *pAPI;
|
SMetaDataFilterAPI *pAPI;
|
||||||
} SIFCtx;
|
} SIFCtx;
|
||||||
|
|
||||||
|
@ -669,6 +670,10 @@ static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFP
|
||||||
if (sifSetFltParam(left, right, &typedata, ¶m) != 0) return -1;
|
if (sifSetFltParam(left, right, &typedata, ¶m) != 0) return -1;
|
||||||
}
|
}
|
||||||
ret = left->api.metaFilterTableIds(arg->metaEx, ¶m, output->result);
|
ret = left->api.metaFilterTableIds(arg->metaEx, ¶m, output->result);
|
||||||
|
if (ret == 0) {
|
||||||
|
taosArraySort(output->result, uidCompare);
|
||||||
|
taosArrayRemoveDuplicate(output->result, uidCompare, NULL);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
@ -875,8 +880,8 @@ static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *ou
|
||||||
} else if (node->condType == LOGIC_COND_TYPE_NOT) {
|
} else if (node->condType == LOGIC_COND_TYPE_NOT) {
|
||||||
// taosArrayAddAll(output->result, params[m].result);
|
// taosArrayAddAll(output->result, params[m].result);
|
||||||
}
|
}
|
||||||
taosArraySort(output->result, idxUidCompare);
|
taosArraySort(output->result, uidCompare);
|
||||||
taosArrayRemoveDuplicate(output->result, idxUidCompare, NULL);
|
taosArrayRemoveDuplicate(output->result, uidCompare, NULL);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
for (int32_t m = 0; m < node->pParameterList->length; m++) {
|
for (int32_t m = 0; m < node->pParameterList->length; m++) {
|
||||||
|
@ -1016,7 +1021,7 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status, SMetaDataFilterAPI* pAPI) {
|
static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status, SMetaDataFilterAPI *pAPI) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
if (pNode == NULL) {
|
if (pNode == NULL) {
|
||||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
|
@ -1054,7 +1059,8 @@ static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status, SMetaDataFilte
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doFilterTag(SNode *pFilterNode, SIndexMetaArg *metaArg, SArray *result, SIdxFltStatus *status, SMetaDataFilterAPI* pAPI) {
|
int32_t doFilterTag(SNode *pFilterNode, SIndexMetaArg *metaArg, SArray *result, SIdxFltStatus *status,
|
||||||
|
SMetaDataFilterAPI *pAPI) {
|
||||||
SIdxFltStatus st = idxGetFltStatus(pFilterNode, pAPI);
|
SIdxFltStatus st = idxGetFltStatus(pFilterNode, pAPI);
|
||||||
if (st == SFLT_NOT_INDEX) {
|
if (st == SFLT_NOT_INDEX) {
|
||||||
*status = st;
|
*status = st;
|
||||||
|
@ -1081,7 +1087,7 @@ int32_t doFilterTag(SNode *pFilterNode, SIndexMetaArg *metaArg, SArray *result,
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
SIdxFltStatus idxGetFltStatus(SNode *pFilterNode, SMetaDataFilterAPI* pAPI) {
|
SIdxFltStatus idxGetFltStatus(SNode *pFilterNode, SMetaDataFilterAPI *pAPI) {
|
||||||
SIdxFltStatus st = SFLT_NOT_INDEX;
|
SIdxFltStatus st = SFLT_NOT_INDEX;
|
||||||
if (pFilterNode == NULL) {
|
if (pFilterNode == NULL) {
|
||||||
return SFLT_NOT_INDEX;
|
return SFLT_NOT_INDEX;
|
||||||
|
|
|
@ -456,6 +456,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
|
||||||
COPY_SCALAR_FIELD(filesetDelimited);
|
COPY_SCALAR_FIELD(filesetDelimited);
|
||||||
COPY_SCALAR_FIELD(isCountByTag);
|
COPY_SCALAR_FIELD(isCountByTag);
|
||||||
CLONE_OBJECT_FIELD(pFuncTypes, functParamClone);
|
CLONE_OBJECT_FIELD(pFuncTypes, functParamClone);
|
||||||
|
COPY_SCALAR_FIELD(paraTablesSort);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -688,6 +689,7 @@ static int32_t physiTableScanCopy(const STableScanPhysiNode* pSrc, STableScanPhy
|
||||||
COPY_SCALAR_FIELD(igExpired);
|
COPY_SCALAR_FIELD(igExpired);
|
||||||
COPY_SCALAR_FIELD(filesetDelimited);
|
COPY_SCALAR_FIELD(filesetDelimited);
|
||||||
COPY_SCALAR_FIELD(needCountEmptyTable);
|
COPY_SCALAR_FIELD(needCountEmptyTable);
|
||||||
|
COPY_SCALAR_FIELD(paraTablesSort);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -698,6 +698,7 @@ static const char* jkScanLogicPlanTagCond = "TagCond";
|
||||||
static const char* jkScanLogicPlanGroupTags = "GroupTags";
|
static const char* jkScanLogicPlanGroupTags = "GroupTags";
|
||||||
static const char* jkScanLogicPlanOnlyMetaCtbIdx = "OnlyMetaCtbIdx";
|
static const char* jkScanLogicPlanOnlyMetaCtbIdx = "OnlyMetaCtbIdx";
|
||||||
static const char* jkScanLogicPlanFilesetDelimited = "FilesetDelimited";
|
static const char* jkScanLogicPlanFilesetDelimited = "FilesetDelimited";
|
||||||
|
static const char* jkScanLogicPlanParaTablesSort = "ParaTablesSort";
|
||||||
|
|
||||||
static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
|
static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
const SScanLogicNode* pNode = (const SScanLogicNode*)pObj;
|
const SScanLogicNode* pNode = (const SScanLogicNode*)pObj;
|
||||||
|
@ -745,6 +746,9 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddBoolToObject(pJson, jkScanLogicPlanFilesetDelimited, pNode->filesetDelimited);
|
code = tjsonAddBoolToObject(pJson, jkScanLogicPlanFilesetDelimited, pNode->filesetDelimited);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonAddBoolToObject(pJson, jkScanLogicPlanParaTablesSort, pNode->paraTablesSort);
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -795,6 +799,9 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonGetBoolValue(pJson, jkScanLogicPlanFilesetDelimited, &pNode->filesetDelimited);
|
code = tjsonGetBoolValue(pJson, jkScanLogicPlanFilesetDelimited, &pNode->filesetDelimited);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonGetBoolValue(pJson, jkScanLogicPlanParaTablesSort, &pNode->paraTablesSort);
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1888,6 +1895,7 @@ static const char* jkTableScanPhysiPlanAssignBlockUid = "AssignBlockUid";
|
||||||
static const char* jkTableScanPhysiPlanIgnoreUpdate = "IgnoreUpdate";
|
static const char* jkTableScanPhysiPlanIgnoreUpdate = "IgnoreUpdate";
|
||||||
static const char* jkTableScanPhysiPlanFilesetDelimited = "FilesetDelimited";
|
static const char* jkTableScanPhysiPlanFilesetDelimited = "FilesetDelimited";
|
||||||
static const char* jkTableScanPhysiPlanNeedCountEmptyTable = "NeedCountEmptyTable";
|
static const char* jkTableScanPhysiPlanNeedCountEmptyTable = "NeedCountEmptyTable";
|
||||||
|
static const char* jkTableScanPhysiPlanParaTablesSort = "ParaTablesSort";
|
||||||
|
|
||||||
static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
|
static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj;
|
const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj;
|
||||||
|
@ -1962,6 +1970,9 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanNeedCountEmptyTable, pNode->needCountEmptyTable);
|
code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanNeedCountEmptyTable, pNode->needCountEmptyTable);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanParaTablesSort, pNode->paraTablesSort);
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2038,6 +2049,9 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanNeedCountEmptyTable, &pNode->needCountEmptyTable);
|
code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanNeedCountEmptyTable, &pNode->needCountEmptyTable);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanParaTablesSort, &pNode->paraTablesSort);
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2185,6 +2185,9 @@ static int32_t physiTableScanNodeInlineToMsg(const void* pObj, STlvEncoder* pEnc
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tlvEncodeValueBool(pEncoder, pNode->needCountEmptyTable);
|
code = tlvEncodeValueBool(pEncoder, pNode->needCountEmptyTable);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tlvEncodeValueBool(pEncoder, pNode->paraTablesSort);
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2269,6 +2272,9 @@ static int32_t msgToPhysiTableScanNodeInline(STlvDecoder* pDecoder, void* pObj)
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tlvDecodeValueBool(pDecoder, &pNode->needCountEmptyTable);
|
code = tlvDecodeValueBool(pDecoder, &pNode->needCountEmptyTable);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tlvDecodeValueBool(pDecoder, &pNode->paraTablesSort);
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -401,6 +401,9 @@ bool addHintNodeToList(SAstCreateContext* pCxt, SNodeList** ppHintList, EHintOpt
|
||||||
case HINT_PARTITION_FIRST:
|
case HINT_PARTITION_FIRST:
|
||||||
if (paramNum > 0 || hasHint(*ppHintList, HINT_SORT_FOR_GROUP)) return true;
|
if (paramNum > 0 || hasHint(*ppHintList, HINT_SORT_FOR_GROUP)) return true;
|
||||||
break;
|
break;
|
||||||
|
case HINT_PARA_TABLES_SORT:
|
||||||
|
if (paramNum > 0 || hasHint(*ppHintList, HINT_PARA_TABLES_SORT)) return true;
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -479,6 +482,14 @@ SNodeList* createHintNodeList(SAstCreateContext* pCxt, const SToken* pLiteral) {
|
||||||
}
|
}
|
||||||
opt = HINT_PARTITION_FIRST;
|
opt = HINT_PARTITION_FIRST;
|
||||||
break;
|
break;
|
||||||
|
case TK_PARA_TABLES_SORT:
|
||||||
|
lastComma = false;
|
||||||
|
if (0 != opt || inParamList) {
|
||||||
|
quit = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
opt = HINT_PARA_TABLES_SORT;
|
||||||
|
break;
|
||||||
case TK_NK_LP:
|
case TK_NK_LP:
|
||||||
lastComma = false;
|
lastComma = false;
|
||||||
if (0 == opt || inParamList) {
|
if (0 == opt || inParamList) {
|
||||||
|
|
|
@ -173,6 +173,7 @@ static SKeyword keywordTable[] = {
|
||||||
{"OUTPUTTYPE", TK_OUTPUTTYPE},
|
{"OUTPUTTYPE", TK_OUTPUTTYPE},
|
||||||
{"PAGES", TK_PAGES},
|
{"PAGES", TK_PAGES},
|
||||||
{"PAGESIZE", TK_PAGESIZE},
|
{"PAGESIZE", TK_PAGESIZE},
|
||||||
|
{"PARA_TABLES_SORT", TK_PARA_TABLES_SORT},
|
||||||
{"PARTITION", TK_PARTITION},
|
{"PARTITION", TK_PARTITION},
|
||||||
{"PARTITION_FIRST", TK_PARTITION_FIRST},
|
{"PARTITION_FIRST", TK_PARTITION_FIRST},
|
||||||
{"PASS", TK_PASS},
|
{"PASS", TK_PASS},
|
||||||
|
|
|
@ -47,6 +47,7 @@ int32_t validateQueryPlan(SPlanContext* pCxt, SQueryPlan* pPlan);
|
||||||
|
|
||||||
bool getBatchScanOptionFromHint(SNodeList* pList);
|
bool getBatchScanOptionFromHint(SNodeList* pList);
|
||||||
bool getSortForGroupOptHint(SNodeList* pList);
|
bool getSortForGroupOptHint(SNodeList* pList);
|
||||||
|
bool getparaTablesSortOptHint(SNodeList* pList);
|
||||||
bool getOptHint(SNodeList* pList, EHintOption hint);
|
bool getOptHint(SNodeList* pList, EHintOption hint);
|
||||||
SLogicNode* getLogicNodeRootNode(SLogicNode* pCurr);
|
SLogicNode* getLogicNodeRootNode(SLogicNode* pCurr);
|
||||||
int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes);
|
int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes);
|
||||||
|
|
|
@ -501,7 +501,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
|
||||||
} else {
|
} else {
|
||||||
nodesDestroyNode((SNode*)pScan);
|
nodesDestroyNode((SNode*)pScan);
|
||||||
}
|
}
|
||||||
|
pScan->paraTablesSort = getparaTablesSortOptHint(pSelect->pHint);
|
||||||
pCxt->hasScan = true;
|
pCxt->hasScan = true;
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -651,6 +651,7 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
|
||||||
pTableScan->assignBlockUid = pCxt->pPlanCxt->rSmaQuery ? true : false;
|
pTableScan->assignBlockUid = pCxt->pPlanCxt->rSmaQuery ? true : false;
|
||||||
pTableScan->filesetDelimited = pScanLogicNode->filesetDelimited;
|
pTableScan->filesetDelimited = pScanLogicNode->filesetDelimited;
|
||||||
pTableScan->needCountEmptyTable = pScanLogicNode->isCountByTag;
|
pTableScan->needCountEmptyTable = pScanLogicNode->isCountByTag;
|
||||||
|
pTableScan->paraTablesSort = pScanLogicNode->paraTablesSort;
|
||||||
|
|
||||||
int32_t code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode);
|
int32_t code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
|
|
@ -466,6 +466,18 @@ bool getOptHint(SNodeList* pList, EHintOption hint) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool getparaTablesSortOptHint(SNodeList* pList) {
|
||||||
|
if (!pList) return false;
|
||||||
|
SNode* pNode;
|
||||||
|
FOREACH(pNode, pList) {
|
||||||
|
SHintNode* pHint = (SHintNode*)pNode;
|
||||||
|
if (pHint->option == HINT_PARA_TABLES_SORT) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes) {
|
int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SLogicNode* pCurr = (SLogicNode*)pNode;
|
SLogicNode* pCurr = (SLogicNode*)pNode;
|
||||||
|
|
|
@ -60,6 +60,7 @@ extern "C" int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t
|
||||||
int64_t insertJobRefId = 0;
|
int64_t insertJobRefId = 0;
|
||||||
int64_t queryJobRefId = 0;
|
int64_t queryJobRefId = 0;
|
||||||
|
|
||||||
|
bool schtJobDone = false;
|
||||||
uint64_t schtMergeTemplateId = 0x4;
|
uint64_t schtMergeTemplateId = 0x4;
|
||||||
uint64_t schtFetchTaskId = 0;
|
uint64_t schtFetchTaskId = 0;
|
||||||
uint64_t schtQueryId = 1;
|
uint64_t schtQueryId = 1;
|
||||||
|
@ -450,6 +451,8 @@ void *schtSendRsp(void *param) {
|
||||||
|
|
||||||
schReleaseJob(job);
|
schReleaseJob(job);
|
||||||
|
|
||||||
|
schtJobDone = true;
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1028,6 +1031,8 @@ TEST(insertTest, normalCase) {
|
||||||
TdThreadAttr thattr;
|
TdThreadAttr thattr;
|
||||||
taosThreadAttrInit(&thattr);
|
taosThreadAttrInit(&thattr);
|
||||||
|
|
||||||
|
schtJobDone = false;
|
||||||
|
|
||||||
TdThread thread1;
|
TdThread thread1;
|
||||||
taosThreadCreate(&(thread1), &thattr, schtSendRsp, &insertJobRefId);
|
taosThreadCreate(&(thread1), &thattr, schtSendRsp, &insertJobRefId);
|
||||||
|
|
||||||
|
@ -1045,6 +1050,14 @@ TEST(insertTest, normalCase) {
|
||||||
code = schedulerExecJob(&req, &insertJobRefId);
|
code = schedulerExecJob(&req, &insertJobRefId);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
if (schtJobDone) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosUsleep(10000);
|
||||||
|
}
|
||||||
|
|
||||||
schedulerFreeJob(&insertJobRefId, 0);
|
schedulerFreeJob(&insertJobRefId, 0);
|
||||||
|
|
||||||
schedulerDestroy();
|
schedulerDestroy();
|
||||||
|
|
|
@ -1160,7 +1160,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
|
||||||
}
|
}
|
||||||
tEncoderClear(&encoder);
|
tEncoderClear(&encoder);
|
||||||
|
|
||||||
SRpcMsg msg = {.info.noResp = 1};
|
SRpcMsg msg = {0};
|
||||||
initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen);
|
initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen);
|
||||||
|
|
||||||
pMeta->pHbInfo->hbCount += 1;
|
pMeta->pHbInfo->hbCount += 1;
|
||||||
|
|
|
@ -736,6 +736,7 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C
|
||||||
void* pRockVal = NULL;
|
void* pRockVal = NULL;
|
||||||
SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileStore, pKey->groupId);
|
SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileStore, pKey->groupId);
|
||||||
code = streamStateSessionGetKVByCur_rocksdb(pCur, pWinKey, &pRockVal, pVLen);
|
code = streamStateSessionGetKVByCur_rocksdb(pCur, pWinKey, &pRockVal, pVLen);
|
||||||
|
streamStateFreeCur(pCur);
|
||||||
if (code == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
|
if (code == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
|
||||||
qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code);
|
qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code);
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
@ -743,7 +744,6 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C
|
||||||
COUNT_TYPE* pWinStateCout = (COUNT_TYPE*)( (char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE)) );
|
COUNT_TYPE* pWinStateCout = (COUNT_TYPE*)( (char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE)) );
|
||||||
if (inSessionWindow(pWinKey, startTs, gap) || (*pWinStateCout) < winCount) {
|
if (inSessionWindow(pWinKey, startTs, gap) || (*pWinStateCout) < winCount) {
|
||||||
(*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen);
|
(*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen);
|
||||||
streamStateFreeCur(pCur);
|
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -751,7 +751,6 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C
|
||||||
pWinKey->win.ekey = endTs;
|
pWinKey->win.ekey = endTs;
|
||||||
(*pVal) = createSessionWinBuff(pFileState, pWinKey, NULL, NULL);
|
(*pVal) = createSessionWinBuff(pFileState, pWinKey, NULL, NULL);
|
||||||
taosMemoryFree(pRockVal);
|
taosMemoryFree(pRockVal);
|
||||||
streamStateFreeCur(pCur);
|
|
||||||
} else {
|
} else {
|
||||||
(*pVal) = addNewSessionWindow(pFileState, pWinStates, pWinKey);
|
(*pVal) = addNewSessionWindow(pFileState, pWinStates, pWinKey);
|
||||||
code = TSDB_CODE_FAILED;
|
code = TSDB_CODE_FAILED;
|
||||||
|
|
|
@ -756,14 +756,18 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt, bool metaLock) {
|
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt, bool metaLock) {
|
||||||
|
if (pTask == NULL) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
SStreamMeta* pMeta = pTask->pMeta;
|
SStreamMeta* pMeta = pTask->pMeta;
|
||||||
STaskId sTaskId = {.streamId = pTask->streamTaskId.streamId, .taskId = pTask->streamTaskId.taskId};
|
STaskId sTaskId = {.streamId = pTask->streamTaskId.streamId, .taskId = pTask->streamTaskId.taskId};
|
||||||
if (pTask->info.fillHistory == 0) {
|
if (pTask->info.fillHistory == 0) {
|
||||||
return 0;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (metaLock) {
|
if (metaLock) {
|
||||||
streamMetaWLock(pTask->pMeta);
|
streamMetaWLock(pMeta);
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &sTaskId, sizeof(sTaskId));
|
SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &sTaskId, sizeof(sTaskId));
|
||||||
|
@ -784,7 +788,7 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt, bool
|
||||||
}
|
}
|
||||||
|
|
||||||
if (metaLock) {
|
if (metaLock) {
|
||||||
streamMetaWUnLock(pTask->pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -961,7 +961,7 @@ static void cliSendCb(uv_write_t* req, int status) {
|
||||||
tTrace("%s conn %p send cost:%dus ", CONN_GET_INST_LABEL(pConn), pConn, (int)cost);
|
tTrace("%s conn %p send cost:%dus ", CONN_GET_INST_LABEL(pConn), pConn, (int)cost);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (pMsg->msg.contLen == 0 && pMsg->msg.pCont != 0) {
|
if (pMsg != NULL && pMsg->msg.contLen == 0 && pMsg->msg.pCont != 0) {
|
||||||
rpcFreeCont(pMsg->msg.pCont);
|
rpcFreeCont(pMsg->msg.pCont);
|
||||||
pMsg->msg.pCont = 0;
|
pMsg->msg.pCont = 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1329,7 +1329,6 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName) {
|
||||||
char *data = taosMemoryMalloc(compressSize);
|
char *data = taosMemoryMalloc(compressSize);
|
||||||
gzFile dstFp = NULL;
|
gzFile dstFp = NULL;
|
||||||
|
|
||||||
TdFilePtr pFile = NULL;
|
|
||||||
TdFilePtr pSrcFile = NULL;
|
TdFilePtr pSrcFile = NULL;
|
||||||
|
|
||||||
pSrcFile = taosOpenFile(srcFileName, TD_FILE_READ | TD_FILE_STREAM);
|
pSrcFile = taosOpenFile(srcFileName, TD_FILE_READ | TD_FILE_STREAM);
|
||||||
|
@ -1369,8 +1368,8 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName) {
|
||||||
}
|
}
|
||||||
|
|
||||||
cmp_end:
|
cmp_end:
|
||||||
if (pFile) {
|
if (fd >= 0) {
|
||||||
taosCloseFile(&pFile);
|
close(fd);
|
||||||
}
|
}
|
||||||
if (pSrcFile) {
|
if (pSrcFile) {
|
||||||
taosCloseFile(&pSrcFile);
|
taosCloseFile(&pSrcFile);
|
||||||
|
|
|
@ -57,6 +57,8 @@
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/hint.py -Q 2
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/hint.py -Q 2
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/hint.py -Q 3
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/hint.py -Q 3
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/hint.py -Q 4
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/hint.py -Q 4
|
||||||
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/para_tms.py
|
||||||
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/para_tms2.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py
|
||||||
|
|
|
@ -656,7 +656,9 @@ if $data31 != 4 then
|
||||||
endi
|
endi
|
||||||
|
|
||||||
sql_error select irate(c) from st where t1="1" and ts >= '2020-03-27 04:11:17.732' and ts < '2020-03-27 05:11:17.732' interval(1m) sliding(15s) group by tbname,c;
|
sql_error select irate(c) from st where t1="1" and ts >= '2020-03-27 04:11:17.732' and ts < '2020-03-27 05:11:17.732' interval(1m) sliding(15s) group by tbname,c;
|
||||||
|
print select _wstart, irate(c), tbname, t1, t2 from st where t1=1 and ts >= '2020-03-27 04:11:17.732' and ts < '2020-03-27 05:11:17.732' partition by tbname,t1,t2 interval(1m) sliding(15s) order by tbname;
|
||||||
sql select _wstart, irate(c), tbname, t1, t2 from st where t1=1 and ts >= '2020-03-27 04:11:17.732' and ts < '2020-03-27 05:11:17.732' partition by tbname,t1,t2 interval(1m) sliding(15s) order by tbname;
|
sql select _wstart, irate(c), tbname, t1, t2 from st where t1=1 and ts >= '2020-03-27 04:11:17.732' and ts < '2020-03-27 05:11:17.732' partition by tbname,t1,t2 interval(1m) sliding(15s) order by tbname;
|
||||||
|
print $rows
|
||||||
if $rows != 40 then
|
if $rows != 40 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue