Merge branch '3.0' of github.com:taosdata/TDengine into szhou/tms-wc/save-row-simplebuf
This commit is contained in:
commit
fbc1d6244f
|
@ -24,7 +24,7 @@ SELECT [hints] [DISTINCT] [TAGS] select_list
|
||||||
hints: /*+ [hint([hint_param_list])] [hint([hint_param_list])] */
|
hints: /*+ [hint([hint_param_list])] [hint([hint_param_list])] */
|
||||||
|
|
||||||
hint:
|
hint:
|
||||||
BATCH_SCAN | NO_BATCH_SCAN | SORT_FOR_GROUP
|
BATCH_SCAN | NO_BATCH_SCAN | SORT_FOR_GROUP | PARA_TABLES_SORT
|
||||||
|
|
||||||
select_list:
|
select_list:
|
||||||
select_expr [, select_expr] ...
|
select_expr [, select_expr] ...
|
||||||
|
@ -87,12 +87,13 @@ Hints are a means of user control over query optimization for individual stateme
|
||||||
|
|
||||||
The list of currently supported Hints is as follows:
|
The list of currently supported Hints is as follows:
|
||||||
|
|
||||||
| **Hint** | **Params** | **Comment** | **Scopt** |
|
| **Hint** | **Params** | **Comment** | **Scope** |
|
||||||
| :-----------: | -------------- | -------------------------- | -----------------------------------|
|
| :-----------: | -------------- | -------------------------- | -----------------------------------|
|
||||||
| BATCH_SCAN | None | Batch table scan | JOIN statment for stable |
|
| BATCH_SCAN | None | Batch table scan | JOIN statment for stable |
|
||||||
| NO_BATCH_SCAN | None | Sequential table scan | JOIN statment for stable |
|
| NO_BATCH_SCAN | None | Sequential table scan | JOIN statment for stable |
|
||||||
| SORT_FOR_GROUP| None | Use sort for partition, conflict with PARTITION_FIRST | With normal column in partition by list |
|
| SORT_FOR_GROUP| None | Use sort for partition, conflict with PARTITION_FIRST | With normal column in partition by list |
|
||||||
| PARTITION_FIRST| None | Use Partition before aggregate, conflict with SORT_FOR_GROUP | With normal column in partition by list |
|
| PARTITION_FIRST| None | Use Partition before aggregate, conflict with SORT_FOR_GROUP | With normal column in partition by list |
|
||||||
|
| PARA_TABLES_SORT| None | When sorting the supertable rows by timestamp, No temporary disk space is used. When there are numerous tables, each with long rows, the corresponding algorithm associated with this prompt may consume a substantial amount of memory, potentially leading to an Out Of Memory (OOM) situation. | Sorting the supertable rows by timestamp |
|
||||||
|
|
||||||
For example:
|
For example:
|
||||||
|
|
||||||
|
@ -100,6 +101,7 @@ For example:
|
||||||
SELECT /*+ BATCH_SCAN() */ a.ts FROM stable1 a, stable2 b where a.tag0 = b.tag0 and a.ts = b.ts;
|
SELECT /*+ BATCH_SCAN() */ a.ts FROM stable1 a, stable2 b where a.tag0 = b.tag0 and a.ts = b.ts;
|
||||||
SELECT /*+ SORT_FOR_GROUP() */ count(*), c1 FROM stable1 PARTITION BY c1;
|
SELECT /*+ SORT_FOR_GROUP() */ count(*), c1 FROM stable1 PARTITION BY c1;
|
||||||
SELECT /*+ PARTITION_FIRST() */ count(*), c1 FROM stable1 PARTITION BY c1;
|
SELECT /*+ PARTITION_FIRST() */ count(*), c1 FROM stable1 PARTITION BY c1;
|
||||||
|
SELECT /*+ PARA_TABLES_SORT() */ * from stable1 order by ts;
|
||||||
```
|
```
|
||||||
|
|
||||||
## Lists
|
## Lists
|
||||||
|
|
|
@ -24,7 +24,7 @@ SELECT [hints] [DISTINCT] [TAGS] select_list
|
||||||
hints: /*+ [hint([hint_param_list])] [hint([hint_param_list])] */
|
hints: /*+ [hint([hint_param_list])] [hint([hint_param_list])] */
|
||||||
|
|
||||||
hint:
|
hint:
|
||||||
BATCH_SCAN | NO_BATCH_SCAN | SORT_FOR_GROUP
|
BATCH_SCAN | NO_BATCH_SCAN | SORT_FOR_GROUP | PARA_TABLES_SORT
|
||||||
|
|
||||||
select_list:
|
select_list:
|
||||||
select_expr [, select_expr] ...
|
select_expr [, select_expr] ...
|
||||||
|
@ -93,13 +93,14 @@ Hints 是用户控制单个语句查询优化的一种手段,当 Hint 不适
|
||||||
| NO_BATCH_SCAN | 无 | 采用顺序读表的方式 | 超级表 JOIN 语句 |
|
| NO_BATCH_SCAN | 无 | 采用顺序读表的方式 | 超级表 JOIN 语句 |
|
||||||
| SORT_FOR_GROUP| 无 | 采用sort方式进行分组, 与PARTITION_FIRST冲突 | partition by 列表有普通列时 |
|
| SORT_FOR_GROUP| 无 | 采用sort方式进行分组, 与PARTITION_FIRST冲突 | partition by 列表有普通列时 |
|
||||||
| PARTITION_FIRST| 无 | 在聚合之前使用PARTITION计算分组, 与SORT_FOR_GROUP冲突 | partition by 列表有普通列时 |
|
| PARTITION_FIRST| 无 | 在聚合之前使用PARTITION计算分组, 与SORT_FOR_GROUP冲突 | partition by 列表有普通列时 |
|
||||||
|
| PARA_TABLES_SORT| 无 | 超级表的数据按时间戳排序时, 不使用临时磁盘空间, 只使用内存。当子表数量多, 行长比较大时候, 会使用大量内存, 可能发生OOM | 超级表的数据按时间戳排序时 |
|
||||||
举例:
|
举例:
|
||||||
|
|
||||||
```sql
|
```sql
|
||||||
SELECT /*+ BATCH_SCAN() */ a.ts FROM stable1 a, stable2 b where a.tag0 = b.tag0 and a.ts = b.ts;
|
SELECT /*+ BATCH_SCAN() */ a.ts FROM stable1 a, stable2 b where a.tag0 = b.tag0 and a.ts = b.ts;
|
||||||
SELECT /*+ SORT_FOR_GROUP() */ count(*), c1 FROM stable1 PARTITION BY c1;
|
SELECT /*+ SORT_FOR_GROUP() */ count(*), c1 FROM stable1 PARTITION BY c1;
|
||||||
SELECT /*+ PARTITION_FIRST() */ count(*), c1 FROM stable1 PARTITION BY c1;
|
SELECT /*+ PARTITION_FIRST() */ count(*), c1 FROM stable1 PARTITION BY c1;
|
||||||
|
SELECT /*+ PARA_TABLES_SORT() */ * from stable1 order by ts;
|
||||||
```
|
```
|
||||||
|
|
||||||
## 列表
|
## 列表
|
||||||
|
|
|
@ -119,6 +119,7 @@ int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *
|
||||||
int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
||||||
int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *pList);
|
int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *pList);
|
||||||
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
||||||
|
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream);
|
||||||
|
|
||||||
SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream);
|
SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream);
|
||||||
void destroyStreamTaskIter(SStreamTaskIter *pIter);
|
void destroyStreamTaskIter(SStreamTaskIter *pIter);
|
||||||
|
|
|
@ -44,9 +44,8 @@ static bool hasCountWindowNode(SPhysiNode* pNode) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool countWindowStreamTask(SSubplan* pPlan) {
|
static bool isCountWindowStreamTask(SSubplan* pPlan) {
|
||||||
SPhysiNode* pNode = pPlan->pNode;
|
return hasCountWindowNode((SPhysiNode*)pPlan->pNode);
|
||||||
return hasCountWindowNode(pNode);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
|
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
|
||||||
|
@ -342,13 +341,13 @@ static void streamTaskSetDataRange(SStreamTask* pTask, int64_t skey, SArray* pVe
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void haltInitialTaskStatus(SStreamTask* pTask, SSubplan* pPlan) {
|
static void haltInitialTaskStatus(SStreamTask* pTask, SSubplan* pPlan, bool isFillhistoryTask) {
|
||||||
bool hasCountWindowNode = countWindowStreamTask(pPlan);
|
bool hasCountWindowNode = isCountWindowStreamTask(pPlan);
|
||||||
bool isRelStreamTask = (pTask->hTaskInfo.id.taskId != 0);
|
|
||||||
if (hasCountWindowNode && isRelStreamTask) {
|
if (hasCountWindowNode && (!isFillhistoryTask)) {
|
||||||
SStreamStatus* pStatus = &pTask->status;
|
SStreamStatus* pStatus = &pTask->status;
|
||||||
mDebug("s-task:0x%x status is set to %s from %s for count window agg task with fill-history option set",
|
mDebug("s-task:0x%x status set %s from %s for count window agg task with fill-history option set",
|
||||||
pTask->id.taskId, streamTaskGetStatusStr(pStatus->taskStatus), streamTaskGetStatusStr(TASK_STATUS__HALT));
|
pTask->id.taskId, streamTaskGetStatusStr(TASK_STATUS__HALT), streamTaskGetStatusStr(pStatus->taskStatus));
|
||||||
pStatus->taskStatus = TASK_STATUS__HALT;
|
pStatus->taskStatus = TASK_STATUS__HALT;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -398,15 +397,17 @@ static void setHTasksId(SStreamObj* pStream) {
|
||||||
|
|
||||||
static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, int64_t skey,
|
static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, int64_t skey,
|
||||||
SArray* pVerList, SVgObj* pVgroup, bool isFillhistory, bool useTriggerParam) {
|
SArray* pVerList, SVgObj* pVgroup, bool isFillhistory, bool useTriggerParam) {
|
||||||
// new stream task
|
|
||||||
SStreamTask* pTask = buildSourceTask(pStream, pEpset, isFillhistory, useTriggerParam);
|
SStreamTask* pTask = buildSourceTask(pStream, pEpset, isFillhistory, useTriggerParam);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
mDebug("doAddSourceTask taskId:%s, vgId:%d, isFillHistory:%d", pTask->id.idStr, pVgroup->vgId, isFillhistory);
|
mDebug("doAddSourceTask taskId:%s, vgId:%d, isFillHistory:%d", pTask->id.idStr, pVgroup->vgId, isFillhistory);
|
||||||
|
|
||||||
haltInitialTaskStatus(pTask, plan);
|
if (pStream->conf.fillHistory) {
|
||||||
|
haltInitialTaskStatus(pTask, plan, isFillhistory);
|
||||||
|
}
|
||||||
|
|
||||||
streamTaskSetDataRange(pTask, skey, pVerList, pVgroup->vgId);
|
streamTaskSetDataRange(pTask, skey, pVerList, pVgroup->vgId);
|
||||||
|
|
||||||
|
@ -415,6 +416,7 @@ static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStre
|
||||||
terrno = code;
|
terrno = code;
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
return TDB_CODE_SUCCESS;
|
return TDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -69,7 +69,7 @@ static void addIntoCheckpointList(SArray* pList, const SFailedCheckpointInfo* pI
|
||||||
taosArrayPush(pList, pInfo);
|
taosArrayPush(pList, pInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
|
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
|
||||||
STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_TASK_RESET_NAME, " reset from failed checkpoint");
|
STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_TASK_RESET_NAME, " reset from failed checkpoint");
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
return terrno;
|
return terrno;
|
||||||
|
@ -119,7 +119,7 @@ static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, in
|
||||||
} else {
|
} else {
|
||||||
mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, transId:%d, create reset trans", pStream->name,
|
mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, transId:%d, create reset trans", pStream->name,
|
||||||
pStream->uid, transId);
|
pStream->uid, transId);
|
||||||
code = createStreamResetStatusTrans(pMnode, pStream);
|
code = mndCreateStreamResetStatusTrans(pMnode, pStream);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -261,22 +261,30 @@ int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msg
|
||||||
return mndTransAppendRedoAction(pTrans, &action);
|
return mndTransAppendRedoAction(pTrans, &action);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool identicalName(const char* pDb, const char* pParam, int32_t len) {
|
||||||
|
return (strlen(pDb) == len) && (strncmp(pDb, pParam, len) == 0);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) {
|
int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) {
|
||||||
// data in the hash table will be removed automatically, no need to remove it here.
|
void *pIter = NULL;
|
||||||
SStreamTransInfo *pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, pDBName, len);
|
|
||||||
if (pTransInfo == NULL) {
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
// not checkpoint trans, ignore
|
while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) {
|
||||||
if (strcmp(pTransInfo->name, MND_STREAM_CHECKPOINT_NAME) != 0) {
|
SStreamTransInfo *pTransInfo = (SStreamTransInfo *)pIter;
|
||||||
mDebug("not checkpoint trans, not kill it, name:%s, transId:%d", pTransInfo->name, pTransInfo->transId);
|
if (strcmp(pTransInfo->name, MND_STREAM_CHECKPOINT_NAME) != 0) {
|
||||||
return TSDB_CODE_SUCCESS;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
char *pDupDBName = strndup(pDBName, len);
|
SStreamObj *pStream = mndGetStreamObj(pMnode, pTransInfo->streamId);
|
||||||
mndKillTransImpl(pMnode, pTransInfo->transId, pDupDBName);
|
if (pStream != NULL) {
|
||||||
taosMemoryFree(pDupDBName);
|
if (identicalName(pStream->sourceDb, pDBName, len)) {
|
||||||
|
mndKillTransImpl(pMnode, pTransInfo->transId, pStream->sourceDb);
|
||||||
|
} else if (identicalName(pStream->targetDb, pDBName, len)) {
|
||||||
|
mndKillTransImpl(pMnode, pTransInfo->transId, pStream->targetDb);
|
||||||
|
}
|
||||||
|
|
||||||
|
mndReleaseStream(pMnode, pStream);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -231,6 +231,8 @@ static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pT
|
||||||
taosMemoryFree(pReq);
|
taosMemoryFree(pReq);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mDebug("set the resume action for trans:%d", pTrans->id);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
File diff suppressed because one or more lines are too long
|
@ -2543,7 +2543,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr
|
||||||
while (1) {
|
while (1) {
|
||||||
// only check here, since the iterate data in memory is very fast.
|
// only check here, since the iterate data in memory is very fast.
|
||||||
if (pReader->code != TSDB_CODE_SUCCESS) {
|
if (pReader->code != TSDB_CODE_SUCCESS) {
|
||||||
tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr);
|
tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", tstrerror(pReader->code), pReader->idStr);
|
||||||
return pReader->code;
|
return pReader->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2694,7 +2694,7 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) {
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
if (pReader->code != TSDB_CODE_SUCCESS) {
|
if (pReader->code != TSDB_CODE_SUCCESS) {
|
||||||
tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr);
|
tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", tstrerror(pReader->code), pReader->idStr);
|
||||||
return pReader->code;
|
return pReader->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2909,7 +2909,7 @@ static int32_t buildBlockFromBufferSeqForPreFileset(STsdbReader* pReader, int64_
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
if (pReader->code != TSDB_CODE_SUCCESS) {
|
if (pReader->code != TSDB_CODE_SUCCESS) {
|
||||||
tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr);
|
tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", tstrerror(pReader->code), pReader->idStr);
|
||||||
return pReader->code;
|
return pReader->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2951,7 +2951,7 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader, int64_t en
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
if (pReader->code != TSDB_CODE_SUCCESS) {
|
if (pReader->code != TSDB_CODE_SUCCESS) {
|
||||||
tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr);
|
tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", tstrerror(pReader->code), pReader->idStr);
|
||||||
return pReader->code;
|
return pReader->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3424,12 +3424,6 @@ _error:
|
||||||
// table merge scan operator
|
// table merge scan operator
|
||||||
|
|
||||||
// table merge scan operator
|
// table merge scan operator
|
||||||
// TODO: limit / duration optimization
|
|
||||||
// TODO: get block from tsdReader function, with task killed, func_data all filter out, skip, finish
|
|
||||||
// TODO: error processing, memory freeing
|
|
||||||
// TODO: add log for error and perf
|
|
||||||
// TODO: tsdb reader open/close dynamically
|
|
||||||
// TODO: blockdata deep cleanup
|
|
||||||
|
|
||||||
static int32_t subTblRowCompareFn(const void* pLeft, const void* pRight, void* param) {
|
static int32_t subTblRowCompareFn(const void* pLeft, const void* pRight, void* param) {
|
||||||
int32_t left = *(int32_t*)pLeft;
|
int32_t left = *(int32_t*)pLeft;
|
||||||
|
|
|
@ -115,8 +115,16 @@ void setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowInfo* pWinInfo, TSKEY* pTs, int32_t start, int32_t rows, int32_t maxRows,
|
static void removeCountResult(SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey* pKey) {
|
||||||
SSHashObj* pStDeleted, bool* pRebuild) {
|
SSessionKey key = {0};
|
||||||
|
getSessionHashKey(pKey, &key);
|
||||||
|
tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey));
|
||||||
|
tSimpleHashRemove(pResMap, &key, sizeof(SSessionKey));
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowInfo* pWinInfo, TSKEY* pTs,
|
||||||
|
int32_t start, int32_t rows, int32_t maxRows, SSHashObj* pStUpdated,
|
||||||
|
SSHashObj* pStDeleted, bool* pRebuild) {
|
||||||
SSessionKey sWinKey = pWinInfo->winInfo.sessionWin;
|
SSessionKey sWinKey = pWinInfo->winInfo.sessionWin;
|
||||||
int32_t num = 0;
|
int32_t num = 0;
|
||||||
for (int32_t i = start; i < rows; i++) {
|
for (int32_t i = start; i < rows; i++) {
|
||||||
|
@ -148,6 +156,7 @@ static int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowI
|
||||||
|
|
||||||
if (needDelState) {
|
if (needDelState) {
|
||||||
memcpy(pWinInfo->winInfo.pStatePos->pKey, &pWinInfo->winInfo.sessionWin, sizeof(SSessionKey));
|
memcpy(pWinInfo->winInfo.pStatePos->pKey, &pWinInfo->winInfo.sessionWin, sizeof(SSessionKey));
|
||||||
|
removeCountResult(pStUpdated, pAggSup->pResultRows, &sWinKey);
|
||||||
if (pWinInfo->winInfo.pStatePos->needFree) {
|
if (pWinInfo->winInfo.pStatePos->needFree) {
|
||||||
pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &sWinKey);
|
pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &sWinKey);
|
||||||
}
|
}
|
||||||
|
@ -242,7 +251,8 @@ static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
|
||||||
setSessionWinOutputInfo(pStUpdated, &curWin.winInfo);
|
setSessionWinOutputInfo(pStUpdated, &curWin.winInfo);
|
||||||
slidingRows = *curWin.pWindowCount;
|
slidingRows = *curWin.pWindowCount;
|
||||||
if (!buffInfo.rebuildWindow) {
|
if (!buffInfo.rebuildWindow) {
|
||||||
winRows = updateCountWindowInfo(pAggSup, &curWin, startTsCols, i, rows, pAggSup->windowCount, pStDeleted, &buffInfo.rebuildWindow);
|
winRows = updateCountWindowInfo(pAggSup, &curWin, startTsCols, i, rows, pAggSup->windowCount, pStUpdated,
|
||||||
|
pStDeleted, &buffInfo.rebuildWindow);
|
||||||
}
|
}
|
||||||
if (buffInfo.rebuildWindow) {
|
if (buffInfo.rebuildWindow) {
|
||||||
SSessionKey range = {0};
|
SSessionKey range = {0};
|
||||||
|
|
|
@ -698,7 +698,7 @@ static const char* jkScanLogicPlanTagCond = "TagCond";
|
||||||
static const char* jkScanLogicPlanGroupTags = "GroupTags";
|
static const char* jkScanLogicPlanGroupTags = "GroupTags";
|
||||||
static const char* jkScanLogicPlanOnlyMetaCtbIdx = "OnlyMetaCtbIdx";
|
static const char* jkScanLogicPlanOnlyMetaCtbIdx = "OnlyMetaCtbIdx";
|
||||||
static const char* jkScanLogicPlanFilesetDelimited = "FilesetDelimited";
|
static const char* jkScanLogicPlanFilesetDelimited = "FilesetDelimited";
|
||||||
static const char* jkScanLogicPlanparaTablesSort = "paraTablesSort";
|
static const char* jkScanLogicPlanParaTablesSort = "ParaTablesSort";
|
||||||
|
|
||||||
static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
|
static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
const SScanLogicNode* pNode = (const SScanLogicNode*)pObj;
|
const SScanLogicNode* pNode = (const SScanLogicNode*)pObj;
|
||||||
|
@ -747,7 +747,7 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
code = tjsonAddBoolToObject(pJson, jkScanLogicPlanFilesetDelimited, pNode->filesetDelimited);
|
code = tjsonAddBoolToObject(pJson, jkScanLogicPlanFilesetDelimited, pNode->filesetDelimited);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddBoolToObject(pJson, jkScanLogicPlanFilesetDelimited, pNode->paraTablesSort);
|
code = tjsonAddBoolToObject(pJson, jkScanLogicPlanParaTablesSort, pNode->paraTablesSort);
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -800,7 +800,7 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) {
|
||||||
code = tjsonGetBoolValue(pJson, jkScanLogicPlanFilesetDelimited, &pNode->filesetDelimited);
|
code = tjsonGetBoolValue(pJson, jkScanLogicPlanFilesetDelimited, &pNode->filesetDelimited);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonGetBoolValue(pJson, jkScanLogicPlanFilesetDelimited, &pNode->paraTablesSort);
|
code = tjsonGetBoolValue(pJson, jkScanLogicPlanParaTablesSort, &pNode->paraTablesSort);
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1895,7 +1895,7 @@ static const char* jkTableScanPhysiPlanAssignBlockUid = "AssignBlockUid";
|
||||||
static const char* jkTableScanPhysiPlanIgnoreUpdate = "IgnoreUpdate";
|
static const char* jkTableScanPhysiPlanIgnoreUpdate = "IgnoreUpdate";
|
||||||
static const char* jkTableScanPhysiPlanFilesetDelimited = "FilesetDelimited";
|
static const char* jkTableScanPhysiPlanFilesetDelimited = "FilesetDelimited";
|
||||||
static const char* jkTableScanPhysiPlanNeedCountEmptyTable = "NeedCountEmptyTable";
|
static const char* jkTableScanPhysiPlanNeedCountEmptyTable = "NeedCountEmptyTable";
|
||||||
static const char* jkTableScanPhysiPlanparaTablesSort = "paraTablesSort";
|
static const char* jkTableScanPhysiPlanParaTablesSort = "ParaTablesSort";
|
||||||
|
|
||||||
static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
|
static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj;
|
const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj;
|
||||||
|
@ -1971,7 +1971,7 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanNeedCountEmptyTable, pNode->needCountEmptyTable);
|
code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanNeedCountEmptyTable, pNode->needCountEmptyTable);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanparaTablesSort, pNode->paraTablesSort);
|
code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanParaTablesSort, pNode->paraTablesSort);
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2050,7 +2050,7 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
|
||||||
code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanNeedCountEmptyTable, &pNode->needCountEmptyTable);
|
code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanNeedCountEmptyTable, &pNode->needCountEmptyTable);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanparaTablesSort, &pNode->paraTablesSort);
|
code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanParaTablesSort, &pNode->paraTablesSort);
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -736,6 +736,7 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C
|
||||||
void* pRockVal = NULL;
|
void* pRockVal = NULL;
|
||||||
SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileStore, pKey->groupId);
|
SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileStore, pKey->groupId);
|
||||||
code = streamStateSessionGetKVByCur_rocksdb(pCur, pWinKey, &pRockVal, pVLen);
|
code = streamStateSessionGetKVByCur_rocksdb(pCur, pWinKey, &pRockVal, pVLen);
|
||||||
|
streamStateFreeCur(pCur);
|
||||||
if (code == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
|
if (code == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
|
||||||
qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code);
|
qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code);
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
@ -743,7 +744,6 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C
|
||||||
COUNT_TYPE* pWinStateCout = (COUNT_TYPE*)( (char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE)) );
|
COUNT_TYPE* pWinStateCout = (COUNT_TYPE*)( (char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE)) );
|
||||||
if (inSessionWindow(pWinKey, startTs, gap) || (*pWinStateCout) < winCount) {
|
if (inSessionWindow(pWinKey, startTs, gap) || (*pWinStateCout) < winCount) {
|
||||||
(*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen);
|
(*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen);
|
||||||
streamStateFreeCur(pCur);
|
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -751,7 +751,6 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C
|
||||||
pWinKey->win.ekey = endTs;
|
pWinKey->win.ekey = endTs;
|
||||||
(*pVal) = createSessionWinBuff(pFileState, pWinKey, NULL, NULL);
|
(*pVal) = createSessionWinBuff(pFileState, pWinKey, NULL, NULL);
|
||||||
taosMemoryFree(pRockVal);
|
taosMemoryFree(pRockVal);
|
||||||
streamStateFreeCur(pCur);
|
|
||||||
} else {
|
} else {
|
||||||
(*pVal) = addNewSessionWindow(pFileState, pWinStates, pWinKey);
|
(*pVal) = addNewSessionWindow(pFileState, pWinStates, pWinKey);
|
||||||
code = TSDB_CODE_FAILED;
|
code = TSDB_CODE_FAILED;
|
||||||
|
|
|
@ -1329,7 +1329,6 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName) {
|
||||||
char *data = taosMemoryMalloc(compressSize);
|
char *data = taosMemoryMalloc(compressSize);
|
||||||
gzFile dstFp = NULL;
|
gzFile dstFp = NULL;
|
||||||
|
|
||||||
TdFilePtr pFile = NULL;
|
|
||||||
TdFilePtr pSrcFile = NULL;
|
TdFilePtr pSrcFile = NULL;
|
||||||
|
|
||||||
pSrcFile = taosOpenFile(srcFileName, TD_FILE_READ | TD_FILE_STREAM);
|
pSrcFile = taosOpenFile(srcFileName, TD_FILE_READ | TD_FILE_STREAM);
|
||||||
|
@ -1369,8 +1368,8 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName) {
|
||||||
}
|
}
|
||||||
|
|
||||||
cmp_end:
|
cmp_end:
|
||||||
if (pFile) {
|
if (fd >= 0) {
|
||||||
taosCloseFile(&pFile);
|
close(fd);
|
||||||
}
|
}
|
||||||
if (pSrcFile) {
|
if (pSrcFile) {
|
||||||
taosCloseFile(&pSrcFile);
|
taosCloseFile(&pSrcFile);
|
||||||
|
|
Loading…
Reference in New Issue