merge 3.0
This commit is contained in:
parent
87c65f3b68
commit
3005fd642c
|
@ -2226,7 +2226,7 @@ FETCH_NEXT_BLOCK:
|
||||||
if (pBlock->info.type == STREAM_CHECKPOINT) {
|
if (pBlock->info.type == STREAM_CHECKPOINT) {
|
||||||
streamScanOperatorSaveCheckpoint(pInfo);
|
streamScanOperatorSaveCheckpoint(pInfo);
|
||||||
}
|
}
|
||||||
printDataBlock(pBlock, "stream scan ck");
|
// printDataBlock(pBlock, "stream scan ck");
|
||||||
return pInfo->pCheckpointRes;
|
return pInfo->pCheckpointRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
#include "functionMgt.h"
|
#include "functionMgt.h"
|
||||||
#include "operator.h"
|
#include "operator.h"
|
||||||
#include "querytask.h"
|
#include "querytask.h"
|
||||||
|
#include "tchecksum.h"
|
||||||
#include "tcommon.h"
|
#include "tcommon.h"
|
||||||
#include "tcompare.h"
|
#include "tcompare.h"
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
|
@ -26,12 +27,15 @@
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
#include "ttime.h"
|
#include "ttime.h"
|
||||||
|
|
||||||
#define IS_FINAL_INTERVAL_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL)
|
#define IS_FINAL_INTERVAL_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL)
|
||||||
#define IS_FINAL_SESSION_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION)
|
#define IS_FINAL_SESSION_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION)
|
||||||
#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL);
|
#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL);
|
||||||
#define STREAM_INTERVAL_OP_STATE_NAME "StreamIntervalHistoryState"
|
#define STREAM_INTERVAL_OP_STATE_NAME "StreamIntervalHistoryState"
|
||||||
#define STREAM_SESSION_OP_STATE_NAME "StreamSessionHistoryState"
|
#define STREAM_SESSION_OP_STATE_NAME "StreamSessionHistoryState"
|
||||||
#define STREAM_STATE_OP_STATE_NAME "StreamStateHistoryState"
|
#define STREAM_STATE_OP_STATE_NAME "StreamStateHistoryState"
|
||||||
|
#define STREAM_INTERVAL_OP_CHECKPOINT_NAME "StreamIntervalOperator_Checkpoint"
|
||||||
|
#define STREAM_SESSION_OP_CHECKPOINT_NAME "StreamSessionOperator_Checkpoint"
|
||||||
|
#define STREAM_STATE_OP_CHECKPOINT_NAME "StreamStateOperator_Checkpoint"
|
||||||
|
|
||||||
typedef struct SStateWindowInfo {
|
typedef struct SStateWindowInfo {
|
||||||
SResultWindowInfo winInfo;
|
SResultWindowInfo winInfo;
|
||||||
|
@ -353,7 +357,7 @@ static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWin
|
||||||
for (int32_t i = *index; i < size; i++) {
|
for (int32_t i = *index; i < size; i++) {
|
||||||
SWinKey* pWin = taosArrayGet(pWins, i);
|
SWinKey* pWin = taosArrayGet(pWins, i);
|
||||||
void* tbname = NULL;
|
void* tbname = NULL;
|
||||||
pInfo->statestore.streamStateGetParName(pInfo->pState, pWin->groupId, &tbname);
|
pInfo->stateStore.streamStateGetParName(pInfo->pState, pWin->groupId, &tbname);
|
||||||
if (tbname == NULL) {
|
if (tbname == NULL) {
|
||||||
appendOneRowToStreamSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, NULL);
|
appendOneRowToStreamSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, NULL);
|
||||||
} else {
|
} else {
|
||||||
|
@ -361,7 +365,7 @@ static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWin
|
||||||
STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
|
STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
|
||||||
appendOneRowToStreamSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, parTbName);
|
appendOneRowToStreamSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, parTbName);
|
||||||
}
|
}
|
||||||
pInfo->statestore.streamStateFreeVal(tbname);
|
pInfo->stateStore.streamStateFreeVal(tbname);
|
||||||
(*index)++;
|
(*index)++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -381,7 +385,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
|
||||||
blockDataDestroy(pInfo->pPullDataRes);
|
blockDataDestroy(pInfo->pPullDataRes);
|
||||||
taosArrayDestroy(pInfo->pDelWins);
|
taosArrayDestroy(pInfo->pDelWins);
|
||||||
blockDataDestroy(pInfo->pDelRes);
|
blockDataDestroy(pInfo->pDelRes);
|
||||||
pInfo->statestore.streamFileStateDestroy(pInfo->pState->pFileState);
|
pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState);
|
||||||
taosMemoryFreeClear(pInfo->pState);
|
taosMemoryFreeClear(pInfo->pState);
|
||||||
|
|
||||||
nodesDestroyNode((SNode*)pInfo->pPhyNode);
|
nodesDestroyNode((SNode*)pInfo->pPhyNode);
|
||||||
|
@ -392,6 +396,8 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
|
||||||
pInfo->pUpdatedMap = NULL;
|
pInfo->pUpdatedMap = NULL;
|
||||||
pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated);
|
pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated);
|
||||||
|
|
||||||
|
blockDataDestroy(pInfo->pCheckpointRes);
|
||||||
|
|
||||||
taosMemoryFreeClear(param);
|
taosMemoryFreeClear(param);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -416,7 +422,8 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamInt
|
||||||
pScanInfo->windowSup.parentType = type;
|
pScanInfo->windowSup.parentType = type;
|
||||||
pScanInfo->windowSup.pIntervalAggSup = &pInfo->aggSup;
|
pScanInfo->windowSup.pIntervalAggSup = &pInfo->aggSup;
|
||||||
if (!pScanInfo->pUpdateInfo) {
|
if (!pScanInfo->pUpdateInfo) {
|
||||||
pScanInfo->pUpdateInfo = pAPI->updateInfoInitP(&pInfo->interval, pInfo->twAggSup.waterMark);
|
pScanInfo->pUpdateInfo =
|
||||||
|
pAPI->updateInfoInitP(&pInfo->interval, pInfo->twAggSup.waterMark, pScanInfo->igCheckUpdate);
|
||||||
}
|
}
|
||||||
|
|
||||||
pScanInfo->interval = pInfo->interval;
|
pScanInfo->interval = pInfo->interval;
|
||||||
|
@ -513,7 +520,7 @@ static void clearStreamIntervalOperator(SStreamIntervalOperatorInfo* pInfo) {
|
||||||
clearDiskbasedBuf(pInfo->aggSup.pResultBuf);
|
clearDiskbasedBuf(pInfo->aggSup.pResultBuf);
|
||||||
initResultRowInfo(&pInfo->binfo.resultRowInfo);
|
initResultRowInfo(&pInfo->binfo.resultRowInfo);
|
||||||
pInfo->aggSup.currentPageId = -1;
|
pInfo->aggSup.currentPageId = -1;
|
||||||
pInfo->statestore.streamStateClear(pInfo->pState);
|
pInfo->stateStore.streamStateClear(pInfo->pState);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void clearSpecialDataBlock(SSDataBlock* pBlock) {
|
static void clearSpecialDataBlock(SSDataBlock* pBlock) {
|
||||||
|
@ -745,11 +752,6 @@ static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pN
|
||||||
return startPos;
|
return startPos;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void setStreamDataVersion(SExecTaskInfo* pTaskInfo, int64_t version, int64_t ckId) {
|
|
||||||
pTaskInfo->streamInfo.dataVersion = version;
|
|
||||||
pTaskInfo->streamInfo.checkPointId = ckId;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, uint64_t groupId,
|
static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, uint64_t groupId,
|
||||||
SSHashObj* pUpdatedMap) {
|
SSHashObj* pUpdatedMap) {
|
||||||
SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperator->info;
|
SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperator->info;
|
||||||
|
@ -794,7 +796,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat
|
||||||
.groupId = groupId,
|
.groupId = groupId,
|
||||||
};
|
};
|
||||||
void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey));
|
void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey));
|
||||||
if (isDeletedStreamWindow(&nextWin, groupId, pInfo->pState, &pInfo->twAggSup, &pInfo->statestore) && isClosed &&
|
if (isDeletedStreamWindow(&nextWin, groupId, pInfo->pState, &pInfo->twAggSup, &pInfo->stateStore) && isClosed &&
|
||||||
!chIds) {
|
!chIds) {
|
||||||
SPullWindowInfo pull = {
|
SPullWindowInfo pull = {
|
||||||
.window = nextWin, .groupId = groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey};
|
.window = nextWin, .groupId = groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey};
|
||||||
|
@ -826,7 +828,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = setIntervalOutputBuf(pInfo->pState, &nextWin, &pResPos, groupId, pSup->pCtx, numOfOutput,
|
int32_t code = setIntervalOutputBuf(pInfo->pState, &nextWin, &pResPos, groupId, pSup->pCtx, numOfOutput,
|
||||||
pSup->rowEntryInfoOffset, &pInfo->aggSup, &pInfo->statestore);
|
pSup->rowEntryInfoOffset, &pInfo->aggSup, &pInfo->stateStore);
|
||||||
pResult = (SResultRow*)pResPos->pRowBuff;
|
pResult = (SResultRow*)pResPos->pRowBuff;
|
||||||
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
|
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
|
||||||
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
|
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
|
||||||
|
@ -914,6 +916,214 @@ static void resetUnCloseWinInfo(SSHashObj* winMap) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t encodeSWinKey(void** buf, SWinKey* key) {
|
||||||
|
int32_t tlen = 0;
|
||||||
|
tlen += taosEncodeFixedI64(buf, key->ts);
|
||||||
|
tlen += taosEncodeFixedU64(buf, key->groupId);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* decodeSWinKey(void* buf, SWinKey* key) {
|
||||||
|
buf = taosDecodeFixedI64(buf, &key->ts);
|
||||||
|
buf = taosDecodeFixedU64(buf, &key->groupId);
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t encodeSRowBuffPos(void** buf, SRowBuffPos* pos) {
|
||||||
|
int32_t tlen = 0;
|
||||||
|
tlen += encodeSWinKey(buf, pos->pKey);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* decodeSRowBuffPos(void* buf, SRowBuffPos* pos) {
|
||||||
|
buf = decodeSWinKey(buf, pos->pKey);
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t encodeSTimeWindowAggSupp(void** buf, STimeWindowAggSupp* pTwAggSup) {
|
||||||
|
int32_t tlen = 0;
|
||||||
|
tlen += taosEncodeFixedI64(buf, pTwAggSup->minTs);
|
||||||
|
tlen += taosEncodeFixedI64(buf, pTwAggSup->maxTs);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* decodeSTimeWindowAggSupp(void* buf, STimeWindowAggSupp* pTwAggSup) {
|
||||||
|
buf = taosDecodeFixedI64(buf, &pTwAggSup->minTs);
|
||||||
|
buf = taosDecodeFixedI64(buf, &pTwAggSup->maxTs);
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t encodeSTimeWindow(void** buf, STimeWindow* pWin) {
|
||||||
|
int32_t tlen = 0;
|
||||||
|
tlen += taosEncodeFixedI64(buf, pWin->skey);
|
||||||
|
tlen += taosEncodeFixedI64(buf, pWin->ekey);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* decodeSTimeWindow(void* buf, STimeWindow* pWin) {
|
||||||
|
buf = taosDecodeFixedI64(buf, &pWin->skey);
|
||||||
|
buf = taosDecodeFixedI64(buf, &pWin->ekey);
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t encodeSPullWindowInfo(void** buf, SPullWindowInfo* pPullInfo) {
|
||||||
|
int32_t tlen = 0;
|
||||||
|
tlen += encodeSTimeWindow(buf, &pPullInfo->calWin);
|
||||||
|
tlen += taosEncodeFixedU64(buf, pPullInfo->groupId);
|
||||||
|
tlen += encodeSTimeWindow(buf, &pPullInfo->window);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* decodeSPullWindowInfo(void* buf, SPullWindowInfo* pPullInfo) {
|
||||||
|
buf = decodeSTimeWindow(buf, &pPullInfo->calWin);
|
||||||
|
buf = taosDecodeFixedU64(buf, &pPullInfo->groupId);
|
||||||
|
buf = decodeSTimeWindow(buf, &pPullInfo->window);
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t encodeSPullWindowInfoArray(void** buf, SArray* pPullInfos) {
|
||||||
|
int32_t tlen = 0;
|
||||||
|
int32_t size = taosArrayGetSize(pPullInfos);
|
||||||
|
tlen += taosEncodeFixedI32(buf, size);
|
||||||
|
for (int32_t i = 0; i < size; i++) {
|
||||||
|
void* pItem = taosArrayGet(pPullInfos, i);
|
||||||
|
tlen += encodeSPullWindowInfo(buf, pItem);
|
||||||
|
}
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* decodeSPullWindowInfoArray(void* buf, SArray* pPullInfos) {
|
||||||
|
int32_t size = 0;
|
||||||
|
buf = taosDecodeFixedI32(buf, &size);
|
||||||
|
for (int32_t i = 0; i < size; i++) {
|
||||||
|
SPullWindowInfo item = {0};
|
||||||
|
buf = decodeSPullWindowInfo(buf, &item);
|
||||||
|
taosArrayPush(pPullInfos, &item);
|
||||||
|
}
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t doStreamIntervalEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOperator) {
|
||||||
|
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||||
|
if (!pInfo) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* pData = (buf == NULL) ? NULL : *buf;
|
||||||
|
|
||||||
|
// 1.pResultRowHashTable
|
||||||
|
int32_t tlen = 0;
|
||||||
|
int32_t mapSize = tSimpleHashGetSize(pInfo->aggSup.pResultRowHashTable);
|
||||||
|
tlen += taosEncodeFixedI32(buf, mapSize);
|
||||||
|
void* pIte = NULL;
|
||||||
|
size_t keyLen = 0;
|
||||||
|
int32_t iter = 0;
|
||||||
|
while ((pIte = tSimpleHashIterate(pInfo->aggSup.pResultRowHashTable, pIte, &iter)) != NULL) {
|
||||||
|
void* key = taosHashGetKey(pIte, &keyLen);
|
||||||
|
tlen += encodeSWinKey(buf, key);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2.twAggSup
|
||||||
|
tlen += encodeSTimeWindowAggSupp(buf, &pInfo->twAggSup);
|
||||||
|
|
||||||
|
// 3.pPullDataMap
|
||||||
|
int32_t size = taosHashGetSize(pInfo->pPullDataMap);
|
||||||
|
tlen += taosEncodeFixedI32(buf, size);
|
||||||
|
pIte = NULL;
|
||||||
|
keyLen = 0;
|
||||||
|
while ((pIte = taosHashIterate(pInfo->pPullDataMap, pIte)) != NULL) {
|
||||||
|
void* key = taosHashGetKey(pIte, &keyLen);
|
||||||
|
tlen += encodeSWinKey(buf, key);
|
||||||
|
SArray* pArray = (SArray*)pIte;
|
||||||
|
int32_t chSize = taosArrayGetSize(pArray);
|
||||||
|
tlen += taosEncodeFixedI32(buf, chSize);
|
||||||
|
for (int32_t i = 0; i < chSize; i++) {
|
||||||
|
void* pChItem = taosArrayGet(pArray, i);
|
||||||
|
tlen += taosEncodeFixedI32(buf, *(int32_t*)pChItem);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4.pPullWins
|
||||||
|
tlen += encodeSPullWindowInfoArray(buf, pInfo->pPullWins);
|
||||||
|
|
||||||
|
// 5.dataVersion
|
||||||
|
tlen += taosEncodeFixedI64(buf, pInfo->dataVersion);
|
||||||
|
|
||||||
|
// 6.checksum
|
||||||
|
if (buf) {
|
||||||
|
uint32_t cksum = taosCalcChecksum(0, pData, len - sizeof(uint32_t));
|
||||||
|
tlen += taosEncodeFixedU32(buf, cksum);
|
||||||
|
} else {
|
||||||
|
tlen += sizeof(uint32_t);
|
||||||
|
}
|
||||||
|
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
void doStreamIntervalDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperator) {
|
||||||
|
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||||
|
if (!pInfo) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 6.checksum
|
||||||
|
int32_t dataLen = len - sizeof(uint32_t);
|
||||||
|
void* pCksum = POINTER_SHIFT(buf, dataLen);
|
||||||
|
if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) {
|
||||||
|
ASSERT(0); // debug
|
||||||
|
qError("stream interval state is invalid");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 1.pResultRowHashTable
|
||||||
|
int32_t mapSize = 0;
|
||||||
|
buf = taosDecodeFixedI32(buf, &mapSize);
|
||||||
|
for (int32_t i = 0; i < mapSize; i++) {
|
||||||
|
SWinKey key = {0};
|
||||||
|
buf = decodeSWinKey(buf, &key);
|
||||||
|
SRowBuffPos* pPos = NULL;
|
||||||
|
int32_t resSize = pInfo->aggSup.resultRowSize;
|
||||||
|
pInfo->stateStore.streamStateAddIfNotExist(pInfo->pState, &key, (void**)&pPos, &resSize);
|
||||||
|
tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), &pPos, POINTER_BYTES);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2.twAggSup
|
||||||
|
buf = decodeSTimeWindowAggSupp(buf, &pInfo->twAggSup);
|
||||||
|
|
||||||
|
// 3.pPullDataMap
|
||||||
|
int32_t size = 0;
|
||||||
|
buf = taosDecodeFixedI32(buf, &size);
|
||||||
|
for (int32_t i = 0; i < size; i++) {
|
||||||
|
SWinKey key = {0};
|
||||||
|
SArray* pArray = taosArrayInit(0, sizeof(int32_t));
|
||||||
|
buf = decodeSWinKey(buf, &key);
|
||||||
|
int32_t chSize = 0;
|
||||||
|
buf = taosDecodeFixedI32(buf, &chSize);
|
||||||
|
for (int32_t i = 0; i < chSize; i++) {
|
||||||
|
int32_t chId = 0;
|
||||||
|
buf = taosDecodeFixedI32(buf, &chId);
|
||||||
|
taosArrayPush(pArray, &chId);
|
||||||
|
}
|
||||||
|
taosHashPut(pInfo->pPullDataMap, &key, sizeof(SWinKey), &pArray, POINTER_BYTES);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4.pPullWins
|
||||||
|
buf = decodeSPullWindowInfoArray(buf, pInfo->pPullWins);
|
||||||
|
|
||||||
|
// 5.dataVersion
|
||||||
|
buf = taosDecodeFixedI64(buf, &pInfo->dataVersion);
|
||||||
|
}
|
||||||
|
|
||||||
|
void doStreamIntervalSaveCheckpoint(SOperatorInfo* pOperator) {
|
||||||
|
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||||
|
int32_t len = doStreamIntervalEncodeOpState(NULL, 0, pOperator);
|
||||||
|
void* buf = taosMemoryCalloc(1, len);
|
||||||
|
void* pBuf = buf;
|
||||||
|
len = doStreamIntervalEncodeOpState(&pBuf, len, pOperator);
|
||||||
|
pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME,
|
||||||
|
strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), buf, len);
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
}
|
||||||
|
|
||||||
static SSDataBlock* buildIntervalResult(SOperatorInfo* pOperator) {
|
static SSDataBlock* buildIntervalResult(SOperatorInfo* pOperator) {
|
||||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||||
|
@ -966,21 +1176,18 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
resetUnCloseWinInfo(pInfo->aggSup.pResultRowHashTable);
|
resetUnCloseWinInfo(pInfo->aggSup.pResultRowHashTable);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pInfo->reCkBlock) {
|
||||||
|
pInfo->reCkBlock = false;
|
||||||
|
printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
|
||||||
|
return pInfo->pCheckpointRes;
|
||||||
|
}
|
||||||
|
|
||||||
setOperatorCompleted(pOperator);
|
setOperatorCompleted(pOperator);
|
||||||
if (!IS_FINAL_INTERVAL_OP(pOperator)) {
|
if (!IS_FINAL_INTERVAL_OP(pOperator)) {
|
||||||
clearFunctionContext(&pOperator->exprSupp);
|
clearFunctionContext(&pOperator->exprSupp);
|
||||||
// semi interval operator clear disk buffer
|
// semi interval operator clear disk buffer
|
||||||
clearStreamIntervalOperator(pInfo);
|
clearStreamIntervalOperator(pInfo);
|
||||||
setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId);
|
qDebug("===stream===clear semi operator");
|
||||||
qDebug("stask:%s ===stream===%s clear", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType));
|
|
||||||
} else {
|
|
||||||
if (pInfo->twAggSup.maxTs > 0 &&
|
|
||||||
pInfo->twAggSup.maxTs - pInfo->twAggSup.checkPointInterval > pInfo->twAggSup.checkPointTs) {
|
|
||||||
pAPI->stateStore.streamStateCommit(pInfo->pState);
|
|
||||||
pAPI->stateStore.streamStateDeleteCheckPoint(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark);
|
|
||||||
pInfo->twAggSup.checkPointTs = pInfo->twAggSup.maxTs;
|
|
||||||
}
|
|
||||||
qDebug("stask:%s ===stream===%s close", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType));
|
|
||||||
}
|
}
|
||||||
return NULL;
|
return NULL;
|
||||||
} else {
|
} else {
|
||||||
|
@ -1075,6 +1282,11 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
continue;
|
continue;
|
||||||
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
|
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
|
||||||
return pBlock;
|
return pBlock;
|
||||||
|
} else if (pBlock->info.type == STREAM_CHECKPOINT) {
|
||||||
|
pAPI->stateStore.streamStateCommit(pInfo->pState);
|
||||||
|
doStreamIntervalSaveCheckpoint(pOperator);
|
||||||
|
copyDataBlock(pInfo->pCheckpointRes, pBlock);
|
||||||
|
continue;
|
||||||
} else {
|
} else {
|
||||||
ASSERTS(pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
|
ASSERTS(pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
|
||||||
}
|
}
|
||||||
|
@ -1155,7 +1367,7 @@ static void streamIntervalReleaseState(SOperatorInfo* pOperator) {
|
||||||
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
|
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
|
||||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||||
int32_t resSize = sizeof(TSKEY);
|
int32_t resSize = sizeof(TSKEY);
|
||||||
pInfo->statestore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME,
|
pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME,
|
||||||
strlen(STREAM_INTERVAL_OP_STATE_NAME), &pInfo->twAggSup.maxTs, resSize);
|
strlen(STREAM_INTERVAL_OP_STATE_NAME), &pInfo->twAggSup.maxTs, resSize);
|
||||||
}
|
}
|
||||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||||
|
@ -1172,12 +1384,12 @@ void streamIntervalReloadState(SOperatorInfo* pOperator) {
|
||||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||||
int32_t size = 0;
|
int32_t size = 0;
|
||||||
void* pBuf = NULL;
|
void* pBuf = NULL;
|
||||||
int32_t code = pInfo->statestore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME,
|
int32_t code = pInfo->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME,
|
||||||
strlen(STREAM_INTERVAL_OP_STATE_NAME), &pBuf, &size);
|
strlen(STREAM_INTERVAL_OP_STATE_NAME), &pBuf, &size);
|
||||||
TSKEY ts = *(TSKEY*)pBuf;
|
TSKEY ts = *(TSKEY*)pBuf;
|
||||||
taosMemoryFree(pBuf);
|
taosMemoryFree(pBuf);
|
||||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts);
|
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts);
|
||||||
pInfo->statestore.streamStateReloadInfo(pInfo->pState, ts);
|
pInfo->stateStore.streamStateReloadInfo(pInfo->pState, ts);
|
||||||
}
|
}
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
if (downstream->fpSet.reloadStreamStateFn) {
|
if (downstream->fpSet.reloadStreamStateFn) {
|
||||||
|
@ -1186,7 +1398,8 @@ void streamIntervalReloadState(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
||||||
SExecTaskInfo* pTaskInfo, int32_t numOfChild) {
|
SExecTaskInfo* pTaskInfo, int32_t numOfChild,
|
||||||
|
SReadHandle* pHandle) {
|
||||||
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
|
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
|
||||||
SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo));
|
SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
|
@ -1211,9 +1424,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
||||||
.deleteMark = getDeleteMark(pIntervalPhyNode),
|
.deleteMark = getDeleteMark(pIntervalPhyNode),
|
||||||
.deleteMarkSaved = 0,
|
.deleteMarkSaved = 0,
|
||||||
.calTriggerSaved = 0,
|
.calTriggerSaved = 0,
|
||||||
.checkPointTs = 0,
|
|
||||||
.checkPointInterval =
|
|
||||||
convertTimePrecision(tsCheckpointInterval, TSDB_TIME_PRECISION_MILLI, pInfo->interval.precision),
|
|
||||||
};
|
};
|
||||||
ASSERTS(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay");
|
ASSERTS(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay");
|
||||||
pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
|
pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
|
||||||
|
@ -1266,12 +1476,13 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
||||||
pInfo->pUpdated = NULL;
|
pInfo->pUpdated = NULL;
|
||||||
pInfo->pUpdatedMap = NULL;
|
pInfo->pUpdatedMap = NULL;
|
||||||
int32_t funResSize = getMaxFunResSize(&pOperator->exprSupp, numOfCols);
|
int32_t funResSize = getMaxFunResSize(&pOperator->exprSupp, numOfCols);
|
||||||
pInfo->pState->pFileState =
|
pInfo->pState->pFileState = pAPI->stateStore.streamFileStateInit(
|
||||||
pAPI->stateStore.streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize,
|
tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState,
|
||||||
compareTs, pInfo->pState, pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo));
|
pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId);
|
||||||
pInfo->dataVersion = 0;
|
pInfo->dataVersion = 0;
|
||||||
pInfo->statestore = pTaskInfo->storageAPI.stateStore;
|
pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
|
||||||
pInfo->recvGetAll = false;
|
pInfo->recvGetAll = false;
|
||||||
|
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
|
||||||
|
|
||||||
pOperator->operatorType = pPhyNode->type;
|
pOperator->operatorType = pPhyNode->type;
|
||||||
if (!IS_FINAL_INTERVAL_OP(pOperator) || numOfChild == 0) {
|
if (!IS_FINAL_INTERVAL_OP(pOperator) || numOfChild == 0) {
|
||||||
|
@ -1293,6 +1504,16 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// for stream
|
||||||
|
void* buff = NULL;
|
||||||
|
int32_t len = 0;
|
||||||
|
int32_t res = pAPI->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME,
|
||||||
|
strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), &buff, &len);
|
||||||
|
if (res == TSDB_CODE_SUCCESS) {
|
||||||
|
doStreamIntervalDecodeOpState(buff, len, pOperator);
|
||||||
|
taosMemoryFree(buff);
|
||||||
|
}
|
||||||
|
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
_error:
|
_error:
|
||||||
|
@ -1327,11 +1548,12 @@ void destroyStreamSessionAggOperatorInfo(void* param) {
|
||||||
colDataDestroy(&pInfo->twAggSup.timeWindowData);
|
colDataDestroy(&pInfo->twAggSup.timeWindowData);
|
||||||
blockDataDestroy(pInfo->pDelRes);
|
blockDataDestroy(pInfo->pDelRes);
|
||||||
blockDataDestroy(pInfo->pWinBlock);
|
blockDataDestroy(pInfo->pWinBlock);
|
||||||
blockDataDestroy(pInfo->pUpdateRes);
|
|
||||||
tSimpleHashCleanup(pInfo->pStUpdated);
|
tSimpleHashCleanup(pInfo->pStUpdated);
|
||||||
tSimpleHashCleanup(pInfo->pStDeleted);
|
tSimpleHashCleanup(pInfo->pStDeleted);
|
||||||
|
|
||||||
taosArrayDestroy(pInfo->historyWins);
|
taosArrayDestroy(pInfo->historyWins);
|
||||||
|
blockDataDestroy(pInfo->pCheckpointRes);
|
||||||
|
|
||||||
taosMemoryFreeClear(param);
|
taosMemoryFreeClear(param);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1374,7 +1596,8 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin
|
||||||
pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type};
|
pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type};
|
||||||
pScanInfo->pState = pAggSup->pState;
|
pScanInfo->pState = pAggSup->pState;
|
||||||
if (!pScanInfo->pUpdateInfo) {
|
if (!pScanInfo->pUpdateInfo) {
|
||||||
pScanInfo->pUpdateInfo = pAggSup->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark);
|
pScanInfo->pUpdateInfo = pAggSup->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark,
|
||||||
|
pScanInfo->igCheckUpdate);
|
||||||
}
|
}
|
||||||
pScanInfo->twAggSup = *pTwSup;
|
pScanInfo->twAggSup = *pTwSup;
|
||||||
}
|
}
|
||||||
|
@ -2003,6 +2226,137 @@ void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t encodeSSessionKey(void** buf, SSessionKey* key) {
|
||||||
|
int32_t tlen = 0;
|
||||||
|
tlen += encodeSTimeWindow(buf, &key->win);
|
||||||
|
tlen += taosEncodeFixedU64(buf, key->groupId);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* decodeSSessionKey(void* buf, SSessionKey* key) {
|
||||||
|
buf = decodeSTimeWindow(buf, &key->win);
|
||||||
|
buf = taosDecodeFixedU64(buf, &key->groupId);
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t encodeSResultWindowInfo(void** buf, SResultWindowInfo* key, int32_t outLen) {
|
||||||
|
int32_t tlen = 0;
|
||||||
|
tlen += taosEncodeFixedBool(buf, key->isOutput);
|
||||||
|
tlen += encodeSSessionKey(buf, &key->sessionWin);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* decodeSResultWindowInfo(void* buf, SResultWindowInfo* key, int32_t outLen) {
|
||||||
|
buf = taosDecodeFixedBool(buf, &key->isOutput);
|
||||||
|
key->pOutputBuf = NULL;
|
||||||
|
buf = decodeSSessionKey(buf, &key->sessionWin);
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t doStreamSessionEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOperator, bool isParent) {
|
||||||
|
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||||
|
if (!pInfo) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* pData = (buf == NULL) ? NULL : *buf;
|
||||||
|
|
||||||
|
// 1.streamAggSup.pResultRows
|
||||||
|
int32_t tlen = 0;
|
||||||
|
int32_t mapSize = tSimpleHashGetSize(pInfo->streamAggSup.pResultRows);
|
||||||
|
tlen += taosEncodeFixedI32(buf, mapSize);
|
||||||
|
void* pIte = NULL;
|
||||||
|
size_t keyLen = 0;
|
||||||
|
int32_t iter = 0;
|
||||||
|
while ((pIte = tSimpleHashIterate(pInfo->streamAggSup.pResultRows, pIte, &iter)) != NULL) {
|
||||||
|
void* key = taosHashGetKey(pIte, &keyLen);
|
||||||
|
tlen += encodeSSessionKey(buf, key);
|
||||||
|
tlen += encodeSResultWindowInfo(buf, pIte, pInfo->streamAggSup.resultRowSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2.twAggSup
|
||||||
|
tlen += encodeSTimeWindowAggSupp(buf, &pInfo->twAggSup);
|
||||||
|
|
||||||
|
// 3.pChildren
|
||||||
|
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
||||||
|
tlen += taosEncodeFixedI32(buf, size);
|
||||||
|
for (int32_t i = 0; i < size; i++) {
|
||||||
|
SOperatorInfo* pChOp = taosArrayGetP(pInfo->pChildren, i);
|
||||||
|
tlen += doStreamSessionEncodeOpState(buf, 0, pChOp, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4.dataVersion
|
||||||
|
tlen += taosEncodeFixedI32(buf, pInfo->dataVersion);
|
||||||
|
|
||||||
|
// 5.checksum
|
||||||
|
if (isParent) {
|
||||||
|
if (buf) {
|
||||||
|
uint32_t cksum = taosCalcChecksum(0, pData, len - sizeof(uint32_t));
|
||||||
|
tlen += taosEncodeFixedU32(buf, cksum);
|
||||||
|
} else {
|
||||||
|
tlen += sizeof(uint32_t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperator, bool isParent) {
|
||||||
|
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||||
|
if (!pInfo) {
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 5.checksum
|
||||||
|
if (isParent) {
|
||||||
|
int32_t dataLen = len - sizeof(uint32_t);
|
||||||
|
void* pCksum = POINTER_SHIFT(buf, dataLen);
|
||||||
|
if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) {
|
||||||
|
ASSERT(0); // debug
|
||||||
|
qError("stream interval state is invalid");
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 1.streamAggSup.pResultRows
|
||||||
|
int32_t mapSize = 0;
|
||||||
|
buf = taosDecodeFixedI32(buf, &mapSize);
|
||||||
|
for (int32_t i = 0; i < mapSize; i++) {
|
||||||
|
SSessionKey key = {0};
|
||||||
|
SResultWindowInfo winfo = {0};
|
||||||
|
buf = decodeSSessionKey(buf, &key);
|
||||||
|
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
||||||
|
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2.twAggSup
|
||||||
|
buf = decodeSTimeWindowAggSupp(buf, &pInfo->twAggSup);
|
||||||
|
|
||||||
|
// 3.pChildren
|
||||||
|
int32_t size = 0;
|
||||||
|
buf = taosDecodeFixedI32(buf, &size);
|
||||||
|
ASSERT(size <= taosArrayGetSize(pInfo->pChildren));
|
||||||
|
for (int32_t i = 0; i < size; i++) {
|
||||||
|
SOperatorInfo* pChOp = taosArrayGetP(pInfo->pChildren, i);
|
||||||
|
buf = doStreamSessionDecodeOpState(buf, 0, pChOp, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4.dataVersion
|
||||||
|
buf = taosDecodeFixedI64(buf, &pInfo->dataVersion);
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
void doStreamSessionSaveCheckpoint(SOperatorInfo* pOperator) {
|
||||||
|
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||||
|
int32_t len = doStreamSessionEncodeOpState(NULL, 0, pOperator, true);
|
||||||
|
void* buf = taosMemoryCalloc(1, len);
|
||||||
|
void* pBuf = buf;
|
||||||
|
len = doStreamSessionEncodeOpState(&pBuf, len, pOperator, true);
|
||||||
|
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME,
|
||||||
|
strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), buf, len);
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
}
|
||||||
|
|
||||||
static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
||||||
SExprSupp* pSup = &pOperator->exprSupp;
|
SExprSupp* pSup = &pOperator->exprSupp;
|
||||||
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||||
|
@ -2058,6 +2412,11 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
||||||
continue;
|
continue;
|
||||||
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
|
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
|
||||||
return pBlock;
|
return pBlock;
|
||||||
|
} else if (pBlock->info.type == STREAM_CHECKPOINT) {
|
||||||
|
pAggSup->stateStore.streamStateCommit(pAggSup->pState);
|
||||||
|
doStreamSessionSaveCheckpoint(pOperator);
|
||||||
|
copyDataBlock(pInfo->pCheckpointRes, pBlock);
|
||||||
|
continue;
|
||||||
} else {
|
} else {
|
||||||
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
|
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
|
||||||
}
|
}
|
||||||
|
@ -2249,7 +2608,19 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
|
||||||
if (pHandle) {
|
if (pHandle) {
|
||||||
pInfo->isHistoryOp = pHandle->fillHistory;
|
pInfo->isHistoryOp = pHandle->fillHistory;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
|
||||||
|
// for stream
|
||||||
|
void* buff = NULL;
|
||||||
|
int32_t len = 0;
|
||||||
|
int32_t res =
|
||||||
|
pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME,
|
||||||
|
strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), &buff, &len);
|
||||||
|
if (res == TSDB_CODE_SUCCESS) {
|
||||||
|
doStreamSessionDecodeOpState(buff, len, pOperator, true);
|
||||||
|
taosMemoryFree(buff);
|
||||||
|
}
|
||||||
setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true,
|
setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true,
|
||||||
OP_NOT_OPENED, pInfo, pTaskInfo);
|
OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo,
|
||||||
|
@ -2316,7 +2687,6 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
||||||
while (1) {
|
while (1) {
|
||||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||||
if (pBlock == NULL) {
|
if (pBlock == NULL) {
|
||||||
clearSpecialDataBlock(pInfo->pUpdateRes);
|
|
||||||
pOperator->status = OP_RES_TO_RETURN;
|
pOperator->status = OP_RES_TO_RETURN;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -2336,6 +2706,10 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
||||||
continue;
|
continue;
|
||||||
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
|
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
|
||||||
return pBlock;
|
return pBlock;
|
||||||
|
} else if (pBlock->info.type == STREAM_CHECKPOINT) {
|
||||||
|
pAggSup->stateStore.streamStateCommit(pAggSup->pState);
|
||||||
|
doStreamSessionSaveCheckpoint(pOperator);
|
||||||
|
continue;
|
||||||
} else {
|
} else {
|
||||||
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
|
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
|
||||||
}
|
}
|
||||||
|
@ -2387,12 +2761,11 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
|
||||||
pOperator->operatorType = pPhyNode->type;
|
pOperator->operatorType = pPhyNode->type;
|
||||||
|
|
||||||
if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
|
if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
|
||||||
pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
|
|
||||||
blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
|
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAgg, NULL,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAgg, NULL,
|
||||||
destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL);
|
destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL);
|
||||||
}
|
}
|
||||||
setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), pPhyNode->type, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), pPhyNode->type, false, OP_NOT_OPENED, pInfo,
|
||||||
|
pTaskInfo);
|
||||||
|
|
||||||
if (numOfChild > 0) {
|
if (numOfChild > 0) {
|
||||||
pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*));
|
pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*));
|
||||||
|
@ -2441,6 +2814,8 @@ void destroyStreamStateOperatorInfo(void* param) {
|
||||||
taosArrayDestroy(pInfo->historyWins);
|
taosArrayDestroy(pInfo->historyWins);
|
||||||
tSimpleHashCleanup(pInfo->pSeUpdated);
|
tSimpleHashCleanup(pInfo->pSeUpdated);
|
||||||
tSimpleHashCleanup(pInfo->pSeDeleted);
|
tSimpleHashCleanup(pInfo->pSeDeleted);
|
||||||
|
blockDataDestroy(pInfo->pCheckpointRes);
|
||||||
|
|
||||||
taosMemoryFreeClear(param);
|
taosMemoryFreeClear(param);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2648,6 +3023,109 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t doStreamStateEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOperator, bool isParent) {
|
||||||
|
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
|
||||||
|
if (!pInfo) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* pData = (buf == NULL) ? NULL : *buf;
|
||||||
|
|
||||||
|
// 1.streamAggSup.pResultRows
|
||||||
|
int32_t tlen = 0;
|
||||||
|
int32_t mapSize = tSimpleHashGetSize(pInfo->streamAggSup.pResultRows);
|
||||||
|
tlen += taosEncodeFixedI32(buf, mapSize);
|
||||||
|
void* pIte = NULL;
|
||||||
|
size_t keyLen = 0;
|
||||||
|
int32_t iter = 0;
|
||||||
|
while ((pIte = tSimpleHashIterate(pInfo->streamAggSup.pResultRows, pIte, &iter)) != NULL) {
|
||||||
|
void* key = taosHashGetKey(pIte, &keyLen);
|
||||||
|
tlen += encodeSSessionKey(buf, key);
|
||||||
|
tlen += encodeSResultWindowInfo(buf, pIte, pInfo->streamAggSup.resultRowSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2.twAggSup
|
||||||
|
tlen += encodeSTimeWindowAggSupp(buf, &pInfo->twAggSup);
|
||||||
|
|
||||||
|
// 3.pChildren
|
||||||
|
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
||||||
|
tlen += taosEncodeFixedI32(buf, size);
|
||||||
|
for (int32_t i = 0; i < size; i++) {
|
||||||
|
SOperatorInfo* pChOp = taosArrayGetP(pInfo->pChildren, i);
|
||||||
|
tlen += doStreamStateEncodeOpState(buf, 0, pChOp, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4.dataVersion
|
||||||
|
tlen += taosEncodeFixedI32(buf, pInfo->dataVersion);
|
||||||
|
|
||||||
|
// 5.checksum
|
||||||
|
if (isParent) {
|
||||||
|
if (buf) {
|
||||||
|
uint32_t cksum = taosCalcChecksum(0, pData, len - sizeof(uint32_t));
|
||||||
|
tlen += taosEncodeFixedU32(buf, cksum);
|
||||||
|
} else {
|
||||||
|
tlen += sizeof(uint32_t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperator, bool isParent) {
|
||||||
|
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
|
||||||
|
if (!pInfo) {
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 5.checksum
|
||||||
|
if (isParent) {
|
||||||
|
int32_t dataLen = len - sizeof(uint32_t);
|
||||||
|
void* pCksum = POINTER_SHIFT(buf, dataLen);
|
||||||
|
if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) {
|
||||||
|
ASSERT(0); // debug
|
||||||
|
qError("stream interval state is invalid");
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 1.streamAggSup.pResultRows
|
||||||
|
int32_t mapSize = 0;
|
||||||
|
buf = taosDecodeFixedI32(buf, &mapSize);
|
||||||
|
for (int32_t i = 0; i < mapSize; i++) {
|
||||||
|
SSessionKey key = {0};
|
||||||
|
SResultWindowInfo winfo = {0};
|
||||||
|
buf = decodeSSessionKey(buf, &key);
|
||||||
|
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
||||||
|
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2.twAggSup
|
||||||
|
buf = decodeSTimeWindowAggSupp(buf, &pInfo->twAggSup);
|
||||||
|
|
||||||
|
// 3.pChildren
|
||||||
|
int32_t size = 0;
|
||||||
|
buf = taosDecodeFixedI32(buf, &size);
|
||||||
|
ASSERT(size <= taosArrayGetSize(pInfo->pChildren));
|
||||||
|
for (int32_t i = 0; i < size; i++) {
|
||||||
|
SOperatorInfo* pChOp = taosArrayGetP(pInfo->pChildren, i);
|
||||||
|
buf = doStreamStateDecodeOpState(buf, 0, pChOp, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4.dataVersion
|
||||||
|
buf = taosDecodeFixedI64(buf, &pInfo->dataVersion);
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
void doStreamStateSaveCheckpoint(SOperatorInfo* pOperator) {
|
||||||
|
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
|
||||||
|
int32_t len = doStreamStateEncodeOpState(NULL, 0, pOperator, true);
|
||||||
|
void* buf = taosMemoryCalloc(1, len);
|
||||||
|
void* pBuf = buf;
|
||||||
|
len = doStreamStateEncodeOpState(&pBuf, len, pOperator, true);
|
||||||
|
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_CHECKPOINT_NAME,
|
||||||
|
strlen(STREAM_STATE_OP_CHECKPOINT_NAME), buf, len);
|
||||||
|
}
|
||||||
|
|
||||||
static SSDataBlock* buildStateResult(SOperatorInfo* pOperator) {
|
static SSDataBlock* buildStateResult(SOperatorInfo* pOperator) {
|
||||||
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
|
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
|
||||||
SOptrBasicInfo* pBInfo = &pInfo->binfo;
|
SOptrBasicInfo* pBInfo = &pInfo->binfo;
|
||||||
|
@ -2700,7 +3178,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
||||||
if (pBlock == NULL) {
|
if (pBlock == NULL) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
printDataBlock(pBlock, "single state recv", GET_TASKID(pTaskInfo));
|
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
|
||||||
|
|
||||||
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
||||||
pBlock->info.type == STREAM_CLEAR) {
|
pBlock->info.type == STREAM_CLEAR) {
|
||||||
|
@ -2715,6 +3193,11 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
||||||
continue;
|
continue;
|
||||||
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
|
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
|
||||||
return pBlock;
|
return pBlock;
|
||||||
|
} else if (pBlock->info.type == STREAM_CHECKPOINT) {
|
||||||
|
pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState);
|
||||||
|
doStreamSessionSaveCheckpoint(pOperator);
|
||||||
|
copyDataBlock(pInfo->pCheckpointRes, pBlock);
|
||||||
|
continue;
|
||||||
} else {
|
} else {
|
||||||
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
|
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
|
||||||
}
|
}
|
||||||
|
@ -2926,6 +3409,19 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
pInfo->isHistoryOp = pHandle->fillHistory;
|
pInfo->isHistoryOp = pHandle->fillHistory;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
|
||||||
|
|
||||||
|
// for stream
|
||||||
|
void* buff = NULL;
|
||||||
|
int32_t len = 0;
|
||||||
|
int32_t res =
|
||||||
|
pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_CHECKPOINT_NAME,
|
||||||
|
strlen(STREAM_STATE_OP_CHECKPOINT_NAME), &buff, &len);
|
||||||
|
if (res == TSDB_CODE_SUCCESS) {
|
||||||
|
doStreamStateDecodeOpState(buff, len, pOperator, true);
|
||||||
|
taosMemoryFree(buff);
|
||||||
|
}
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED,
|
setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED,
|
||||||
pInfo, pTaskInfo);
|
pInfo, pTaskInfo);
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo,
|
||||||
|
@ -2984,14 +3480,13 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
resetUnCloseWinInfo(pInfo->aggSup.pResultRowHashTable);
|
resetUnCloseWinInfo(pInfo->aggSup.pResultRowHashTable);
|
||||||
}
|
}
|
||||||
|
|
||||||
setOperatorCompleted(pOperator);
|
if (pInfo->reCkBlock) {
|
||||||
if (pInfo->twAggSup.maxTs > 0 &&
|
pInfo->reCkBlock = false;
|
||||||
pInfo->twAggSup.maxTs - pInfo->twAggSup.checkPointInterval > pInfo->twAggSup.checkPointTs) {
|
// printDataBlock(pInfo->pCheckpointRes, "single interval ck");
|
||||||
pAPI->stateStore.streamStateCommit(pInfo->pState);
|
return pInfo->pCheckpointRes;
|
||||||
pAPI->stateStore.streamStateDeleteCheckPoint(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark);
|
|
||||||
setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId);
|
|
||||||
pInfo->twAggSup.checkPointTs = pInfo->twAggSup.maxTs;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
setOperatorCompleted(pOperator);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3030,6 +3525,12 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
|
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
|
||||||
printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
|
printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
|
||||||
return pBlock;
|
return pBlock;
|
||||||
|
} else if (pBlock->info.type == STREAM_CHECKPOINT) {
|
||||||
|
pAPI->stateStore.streamStateCommit(pInfo->pState);
|
||||||
|
doStreamIntervalSaveCheckpoint(pOperator);
|
||||||
|
pInfo->reCkBlock = true;
|
||||||
|
copyDataBlock(pInfo->pCheckpointRes, pBlock);
|
||||||
|
continue;
|
||||||
} else {
|
} else {
|
||||||
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
|
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
|
||||||
}
|
}
|
||||||
|
@ -3078,7 +3579,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
||||||
SExecTaskInfo* pTaskInfo) {
|
SExecTaskInfo* pTaskInfo, SReadHandle* pHandle) {
|
||||||
SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo));
|
SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
if (pInfo == NULL || pOperator == NULL) {
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
|
@ -3100,16 +3601,11 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
.precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision,
|
.precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision,
|
||||||
};
|
};
|
||||||
|
|
||||||
pInfo->twAggSup = (STimeWindowAggSupp){
|
pInfo->twAggSup = (STimeWindowAggSupp){.waterMark = pIntervalPhyNode->window.watermark,
|
||||||
.waterMark = pIntervalPhyNode->window.watermark,
|
.calTrigger = pIntervalPhyNode->window.triggerType,
|
||||||
.calTrigger = pIntervalPhyNode->window.triggerType,
|
.maxTs = INT64_MIN,
|
||||||
.maxTs = INT64_MIN,
|
.minTs = INT64_MAX,
|
||||||
.minTs = INT64_MAX,
|
.deleteMark = getDeleteMark(pIntervalPhyNode)};
|
||||||
.deleteMark = getDeleteMark(pIntervalPhyNode),
|
|
||||||
.checkPointTs = 0,
|
|
||||||
.checkPointInterval =
|
|
||||||
convertTimePrecision(tsCheckpointInterval, TSDB_TIME_PRECISION_MILLI, pInfo->interval.precision),
|
|
||||||
};
|
|
||||||
|
|
||||||
ASSERTS(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay");
|
ASSERTS(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay");
|
||||||
|
|
||||||
|
@ -3168,7 +3664,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
|
|
||||||
pInfo->pState->pFileState = pTaskInfo->storageAPI.stateStore.streamFileStateInit(
|
pInfo->pState->pFileState = pTaskInfo->storageAPI.stateStore.streamFileStateInit(
|
||||||
tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState,
|
tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState,
|
||||||
pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo));
|
pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId);
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED,
|
setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED,
|
||||||
pInfo, pTaskInfo);
|
pInfo, pTaskInfo);
|
||||||
|
@ -3176,8 +3672,19 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL);
|
destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL);
|
||||||
setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState);
|
setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState);
|
||||||
|
|
||||||
pInfo->statestore = pTaskInfo->storageAPI.stateStore;
|
pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
|
||||||
pInfo->recvGetAll = false;
|
pInfo->recvGetAll = false;
|
||||||
|
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
|
||||||
|
|
||||||
|
// for stream
|
||||||
|
void* buff = NULL;
|
||||||
|
int32_t len = 0;
|
||||||
|
int32_t res = pAPI->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME,
|
||||||
|
strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), &buff, &len);
|
||||||
|
if (res == TSDB_CODE_SUCCESS) {
|
||||||
|
doStreamIntervalDecodeOpState(buff, len, pOperator);
|
||||||
|
taosMemoryFree(buff);
|
||||||
|
}
|
||||||
|
|
||||||
initIntervalDownStream(downstream, pPhyNode->type, pInfo);
|
initIntervalDownStream(downstream, pPhyNode->type, pInfo);
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
|
|
Loading…
Reference in New Issue