stream primary key

This commit is contained in:
54liuyao 2024-03-21 10:50:07 +08:00
parent 81f0ff341c
commit a60f390cdc
26 changed files with 3831 additions and 3690 deletions

View File

@ -2467,6 +2467,8 @@ typedef struct {
int8_t igUpdate;
int64_t lastTs;
SArray* pVgroupVerList;
// 3.3.0.0
SArray* pCols; // array of SField
} SCMCreateStreamReq;
typedef struct {

View File

@ -578,6 +578,7 @@ typedef struct SWindowPhysiNode {
int64_t watermark;
int64_t deleteMark;
int8_t igExpired;
int8_t destHasPrimayKey;
bool mergeDataBlock;
} SWindowPhysiNode;

View File

@ -44,6 +44,8 @@ typedef struct SPlanContext {
const char* pUser;
bool sysInfo;
int64_t allocatorId;
bool destHasPrimaryKey;
bool sourceHasPrimaryKey;
} SPlanContext;
// Create the physical plan for the query, according to the AST.

View File

@ -7311,6 +7311,16 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
if (tEncodeI64(&encoder, p->ver) < 0) return -1;
}
int32_t colSize = taosArrayGetSize(pReq->pCols);
if (tEncodeI32(&encoder, colSize) < 0) return -1;
for (int32_t i = 0; i < colSize; ++i) {
SField *pField = taosArrayGet(pReq->pCols, i);
if (tEncodeI8(&encoder, pField->type) < 0) return -1;
if (tEncodeI8(&encoder, pField->flags) < 0) return -1;
if (tEncodeI32(&encoder, pField->bytes) < 0) return -1;
if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -7416,6 +7426,27 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
}
}
}
int32_t colSize = 0;
if (tDecodeI32(&decoder, &colSize) < 0) return -1;
if (colSize > 0) {
pReq->pCols = taosArrayInit(colSize, sizeof(SField));
if (pReq->pCols == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int32_t i = 0; i < colSize; ++i) {
SField field = {0};
if (tDecodeI8(&decoder, &field.type) < 0) return -1;
if (tDecodeI8(&decoder, &field.flags) < 0) return -1;
if (tDecodeI32(&decoder, &field.bytes) < 0) return -1;
if (tDecodeCStrTo(&decoder, field.name) < 0) return -1;
if (taosArrayPush(pReq->pCols, &field) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
}
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
@ -7496,6 +7527,7 @@ void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
taosArrayDestroy(pReq->pTags);
taosArrayDestroy(pReq->fillNullCols);
taosArrayDestroy(pReq->pVgroupVerList);
taosArrayDestroy(pReq->pCols);
}
int32_t tEncodeSRSmaParam(SEncoder *pCoder, const SRSmaParam *pRSmaParam) {

View File

@ -287,6 +287,45 @@ static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
return 0;
}
static int32_t createSchemaByFields(const SArray* pFields, SSchemaWrapper* pWrapper) {
pWrapper->nCols = taosArrayGetSize(pFields);
pWrapper->pSchema = taosMemoryCalloc(pWrapper->nCols, sizeof(SSchema));
if (NULL == pWrapper->pSchema) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SNode* pNode;
int32_t index = 0;
for(int32_t i = 0; i < pWrapper->nCols; i++) {
SField* pField = (SField*)taosArrayGet(pFields, i);
if (TSDB_DATA_TYPE_NULL == pField->type) {
pWrapper->pSchema[index].type = TSDB_DATA_TYPE_VARCHAR;
pWrapper->pSchema[index].bytes = VARSTR_HEADER_SIZE;
} else {
pWrapper->pSchema[index].type = pField->type;
pWrapper->pSchema[index].bytes = pField->bytes;
}
pWrapper->pSchema[index].colId = index + 1;
strcpy(pWrapper->pSchema[index].name, pField->name);
pWrapper->pSchema[index].flags = pField->flags;
index += 1;
}
return TSDB_CODE_SUCCESS;
}
static bool hasPrimaryKey(SSchemaWrapper* pWrapper) {
if (pWrapper->nCols < 2) {
return false;
}
for (int32_t i = 1; i < pWrapper->nCols; i++) {
if(pWrapper->pSchema[i].flags & COL_IS_KEY) {
return true;
}
}
return false;
}
static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) {
SNode *pAst = NULL;
SQueryPlan *pPlan = NULL;
@ -352,8 +391,8 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
goto FAIL;
}
// extract output schema from ast
if (qExtractResultSchema(pAst, (int32_t *)&pObj->outputSchema.nCols, &pObj->outputSchema.pSchema) != 0) {
// create output schema
if (createSchemaByFields(pCreate->pCols, &pObj->outputSchema) != TSDB_CODE_SUCCESS) {
goto FAIL;
}
@ -389,6 +428,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
pObj->outputSchema.pSchema = pFullSchema;
}
bool hasKey = hasPrimaryKey(&pObj->outputSchema);
SPlanContext cxt = {
.pAstRoot = pAst,
.topicQuery = false,
@ -398,6 +438,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
.igExpired = pObj->conf.igExpired,
.deleteMark = pObj->deleteMark,
.igCheckUpdate = pObj->igCheckUpdate,
.destHasPrimaryKey = hasKey,
};
// using ast and param to build physical plan

View File

@ -602,6 +602,8 @@ typedef struct SStreamIntervalOperatorInfo {
bool recvPullover;
SSDataBlock* pMidPulloverRes;
bool clearState;
SSHashObj* pDeletedMap;
bool destHasPrimaryKey;
} SStreamIntervalOperatorInfo;
typedef struct SDataGroupInfo {
@ -652,6 +654,8 @@ typedef struct SStreamSessionAggOperatorInfo {
SSDataBlock* pCheckpointRes;
bool clearState;
bool recvGetAll;
bool destHasPrimaryKey;
SSHashObj* pPkDeleted;
} SStreamSessionAggOperatorInfo;
typedef struct SStreamStateAggOperatorInfo {
@ -676,6 +680,8 @@ typedef struct SStreamStateAggOperatorInfo {
bool reCkBlock;
SSDataBlock* pCheckpointRes;
bool recvGetAll;
SSHashObj* pPkDeleted;
bool destHasPrimaryKey;
} SStreamStateAggOperatorInfo;
typedef struct SStreamEventAggOperatorInfo {
@ -702,6 +708,8 @@ typedef struct SStreamEventAggOperatorInfo {
SSDataBlock* pCheckpointRes;
SFilterInfo* pStartCondInfo;
SFilterInfo* pEndCondInfo;
SSHashObj* pPkDeleted;
bool destHasPrimaryKey;
} SStreamEventAggOperatorInfo;
typedef struct SStreamCountAggOperatorInfo {
@ -723,6 +731,8 @@ typedef struct SStreamCountAggOperatorInfo {
bool reCkBlock;
bool recvGetAll;
SSDataBlock* pCheckpointRes;
SSHashObj* pPkDeleted;
bool destHasPrimaryKey;
} SStreamCountAggOperatorInfo;
typedef struct SStreamPartitionOperatorInfo {
@ -901,7 +911,7 @@ void initDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* p
void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins);
void initGroupResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList);
void getSessionHashKey(const SSessionKey* pKey, SSessionKey* pHashKey);
void deleteSessionWinState(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SSHashObj* pMapUpdate, SSHashObj* pMapDelete);
void deleteSessionWinState(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SSHashObj* pMapUpdate, SSHashObj* pMapDelete, SSHashObj* pPkDelete, bool needAdd);
int32_t getAllSessionWindow(SSHashObj* pHashMap, SSHashObj* pStUpdated);
int32_t closeSessionWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SSHashObj* pClosed);
int32_t copyUpdateResult(SSHashObj** ppWinUpdated, SArray* pUpdated, __compar_fn_t compar);
@ -951,6 +961,7 @@ bool doDeleteSessionWindow(SStreamAggSupporter* pAggSup, SSessionKey* pK
void saveDeleteInfo(SArray* pWins, SSessionKey key);
void removeSessionResults(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, SArray* pWins);
void copyDeleteWindowInfo(SArray* pResWins, SSHashObj* pStDeleted);
void copyDeleteSessionKey(SSHashObj* source, SSHashObj* dest);
bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo);
bool inCalSlidingWindow(SInterval* pInterval, STimeWindow* pWin, TSKEY calStart, TSKEY calEnd, EStreamType blockType);

View File

@ -24,7 +24,7 @@
#include "tlog.h"
#include "ttime.h"
#define IS_FINAL_COUNT_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_COUNT)
#define IS_NORMAL_COUNT_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT)
#define STREAM_COUNT_OP_STATE_NAME "StreamCountHistoryState"
#define STREAM_COUNT_OP_CHECKPOINT_NAME "StreamCountOperator_Checkpoint"
@ -61,6 +61,8 @@ void destroyStreamCountAggOperatorInfo(void* param) {
taosArrayDestroy(pInfo->historyWins);
blockDataDestroy(pInfo->pCheckpointRes);
tSimpleHashCleanup(pInfo->pPkDeleted);
taosMemoryFreeClear(param);
}
@ -274,16 +276,20 @@ static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
pOperator, 0);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
qError("%s do stream count aggregate impl error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
break;
}
saveSessionOutputBuf(pAggSup, &curWin.winInfo);
if (pInfo->destHasPrimaryKey && curWin.winInfo.isOutput && IS_NORMAL_COUNT_OP(pOperator)) {
saveDeleteRes(pInfo->pPkDeleted, curWin.winInfo.sessionWin);
}
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pStUpdated) {
code = saveResult(curWin.winInfo, pStUpdated);
if (code != TSDB_CODE_SUCCESS) {
qError("%s do stream count aggregate impl, set result error, code %s", GET_TASKID(pTaskInfo),
tstrerror(code));
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
break;
}
}
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
@ -484,7 +490,7 @@ void doDeleteCountWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SAr
}
void deleteCountWinState(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SSHashObj* pMapUpdate,
SSHashObj* pMapDelete) {
SSHashObj* pMapDelete, SSHashObj* pPkDelete, bool needAdd) {
SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
if (isSlidingCountWindow(pAggSup)) {
doDeleteCountWindows(pAggSup, pBlock, pWins);
@ -493,6 +499,9 @@ void deleteCountWinState(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SSHa
}
removeSessionResults(pAggSup, pMapUpdate, pWins);
copyDeleteWindowInfo(pWins, pMapDelete);
if (needAdd) {
copyDeleteWindowInfo(pWins, pPkDelete);
}
taosArrayDestroy(pWins);
}
@ -542,7 +551,8 @@ static SSDataBlock* doStreamCountAgg(SOperatorInfo* pOperator) {
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
deleteCountWinState(&pInfo->streamAggSup, pBlock, pInfo->pStUpdated, pInfo->pStDeleted);
bool add = pInfo->destHasPrimaryKey && IS_NORMAL_COUNT_OP(pOperator);
deleteCountWinState(&pInfo->streamAggSup, pBlock, pInfo->pStUpdated, pInfo->pStDeleted, pInfo->pPkDeleted, add);
continue;
} else if (pBlock->info.type == STREAM_CLEAR) {
doResetCountWindows(&pInfo->streamAggSup, pBlock);
@ -582,6 +592,10 @@ static SSDataBlock* doStreamCountAgg(SOperatorInfo* pOperator) {
pInfo->pUpdated = NULL;
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
if (pInfo->destHasPrimaryKey && IS_NORMAL_COUNT_OP(pOperator)) {
copyDeleteSessionKey(pInfo->pPkDeleted, pInfo->pStDeleted);
}
SSDataBlock* opRes = buildCountResult(pOperator);
if (opRes) {
return opRes;
@ -694,6 +708,8 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
pInfo->recvGetAll = false;
pInfo->pPkDeleted = tSimpleHashInit(64, hashFn);
pInfo->destHasPrimaryKey = pCountNode->window.destHasPrimayKey;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT;
setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT, true,

View File

@ -27,7 +27,7 @@
#include "tlog.h"
#include "ttime.h"
#define IS_FINAL_EVENT_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_EVENT)
#define IS_NORMAL_EVENT_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT)
#define STREAM_EVENT_OP_STATE_NAME "StreamEventHistoryState"
#define STREAM_EVENT_OP_CHECKPOINT_NAME "StreamEventOperator_Checkpoint"
@ -66,6 +66,8 @@ void destroyStreamEventOperatorInfo(void* param) {
taosArrayDestroy(pInfo->historyWins);
blockDataDestroy(pInfo->pCheckpointRes);
tSimpleHashCleanup(pInfo->pPkDeleted);
if (pInfo->pStartCondInfo != NULL) {
filterFreeInfo(pInfo->pStartCondInfo);
pInfo->pStartCondInfo = NULL;
@ -99,12 +101,17 @@ int32_t getEndCondIndex(bool* pEnd, int32_t start, int32_t rows) {
return -1;
}
void setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t groupId, bool* pStart, bool* pEnd, int32_t index, int32_t rows, SEventWindowInfo* pCurWin, SSessionKey* pNextWinKey) {
static bool isWindowIncomplete(SEventWindowInfo* pWinInfo) {
return !(pWinInfo->pWinFlag->startFlag && pWinInfo->pWinFlag->endFlag);
}
void setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t groupId, bool* pStart, bool* pEnd,
int32_t index, int32_t rows, SEventWindowInfo* pCurWin, SSessionKey* pNextWinKey) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t size = pAggSup->resultRowSize;
TSKEY ts = pTs [index];
bool start = pStart[index];
bool end = pEnd[index];
TSKEY ts = pTs[index];
bool start = pStart[index];
bool end = pEnd[index];
pCurWin->winInfo.sessionWin.groupId = groupId;
pCurWin->winInfo.sessionWin.win.skey = ts;
pCurWin->winInfo.sessionWin.win.ekey = ts;
@ -117,7 +124,7 @@ void setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t groupI
bool inWin = isInTimeWindow(&leftWinKey.win, ts, 0);
setEventWindowInfo(pAggSup, &leftWinKey, pVal, pCurWin);
if(inWin || (pCurWin->pWinFlag->startFlag && !pCurWin->pWinFlag->endFlag) ) {
pCurWin->winInfo.isOutput = true;
pCurWin->winInfo.isOutput = !isWindowIncomplete(pCurWin);
goto _end;
}
}
@ -130,7 +137,7 @@ void setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t groupI
int32_t endi = getEndCondIndex(pEnd, index, rows);
if (endi < 0 || pTs[endi] >= rightWinKey.win.skey) {
setEventWindowInfo(pAggSup, &rightWinKey, pVal, pCurWin);
pCurWin->winInfo.isOutput = true;
pCurWin->winInfo.isOutput = !isWindowIncomplete(pCurWin);
goto _end;
}
}
@ -241,10 +248,6 @@ static int32_t compactEventWindow(SOperatorInfo* pOperator, SEventWindowInfo* pC
return winNum;
}
static bool isWindowIncomplete(SEventWindowInfo* pWinInfo) {
return !(pWinInfo->pWinFlag->startFlag && pWinInfo->pWinFlag->endFlag);
}
bool doDeleteEventWindow(SStreamAggSupporter* pAggSup, SSHashObj* pSeUpdated, SSessionKey* pKey) {
pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, pKey);
removeSessionResult(pAggSup, pSeUpdated, pAggSup->pResultRows, pKey);
@ -322,6 +325,9 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
&curWin.winInfo.sessionWin.win.ekey, &uid, &groupId, NULL);
tSimpleHashRemove(pSeUpdated, &curWin.winInfo.sessionWin, sizeof(SSessionKey));
doDeleteEventWindow(pAggSup, pSeUpdated, &curWin.winInfo.sessionWin);
if (pInfo->destHasPrimaryKey && curWin.winInfo.isOutput && IS_NORMAL_EVENT_OP(pOperator) && !isWindowIncomplete(&curWin)) {
saveDeleteRes(pInfo->pPkDeleted, curWin.winInfo.sessionWin);
}
releaseOutputBuf(pAggSup->pState, curWin.winInfo.pStatePos, &pAPI->stateStore);
SSessionKey tmpSeInfo = {0};
getSessionHashKey(&curWin.winInfo.sessionWin, &tmpSeInfo);
@ -331,7 +337,7 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &curWin.winInfo, &pResult, i, winRows, rows, numOfOutput,
pOperator, 0);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
break;
}
compactEventWindow(pOperator, &curWin, pInfo->pSeUpdated, pInfo->pSeDeleted, false);
saveSessionOutputBuf(pAggSup, &curWin.winInfo);
@ -344,6 +350,10 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
continue;
}
if (pInfo->destHasPrimaryKey && curWin.winInfo.isOutput && IS_NORMAL_EVENT_OP(pOperator)) {
saveDeleteRes(pInfo->pPkDeleted, curWin.winInfo.sessionWin);
}
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
code = saveResult(curWin.winInfo, pSeUpdated);
}
@ -516,7 +526,8 @@ static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) {
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_CLEAR) {
deleteSessionWinState(&pInfo->streamAggSup, pBlock, pInfo->pSeUpdated, pInfo->pSeDeleted);
bool add = pInfo->destHasPrimaryKey && IS_NORMAL_EVENT_OP(pOperator);
deleteSessionWinState(&pInfo->streamAggSup, pBlock, pInfo->pSeUpdated, pInfo->pSeDeleted, pInfo->pPkDeleted, add);
continue;
} else if (pBlock->info.type == STREAM_GET_ALL) {
pInfo->recvGetAll = true;
@ -556,6 +567,9 @@ static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) {
getMaxTsWins(pHisWins, pInfo->historyWins);
taosArrayDestroy(pHisWins);
}
if (pInfo->destHasPrimaryKey && IS_NORMAL_EVENT_OP(pOperator)) {
copyDeleteSessionKey(pInfo->pPkDeleted, pInfo->pSeDeleted);
}
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
pInfo->pUpdated = NULL;
@ -737,6 +751,8 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
pInfo->reCkBlock = false;
pInfo->recvGetAll = false;
pInfo->pPkDeleted = tSimpleHashInit(64, hashFn);
pInfo->destHasPrimaryKey = pEventNode->window.destHasPrimayKey;
setOperatorInfo(pOperator, "StreamEventAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT, true, OP_NOT_OPENED,
pInfo, pTaskInfo);

View File

@ -29,7 +29,13 @@
#define IS_FINAL_INTERVAL_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL)
#define IS_MID_INTERVAL_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL)
#define IS_NORMAL_INTERVAL_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || (op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL)
#define IS_FINAL_SESSION_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION)
#define IS_NORMAL_SESSION_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || (op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION)
#define IS_NORMAL_STATE_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE)
#define DEAULT_DELETE_MARK INT64_MAX
#define STREAM_INTERVAL_OP_STATE_NAME "StreamIntervalHistoryState"
#define STREAM_SESSION_OP_STATE_NAME "StreamSessionHistoryState"
@ -426,6 +432,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
tSimpleHashCleanup(pInfo->pUpdatedMap);
pInfo->pUpdatedMap = NULL;
pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated);
tSimpleHashCleanup(pInfo->pDeletedMap);
blockDataDestroy(pInfo->pCheckpointRes);
@ -520,9 +527,7 @@ int32_t setIntervalOutputBuf(void* pState, STimeWindow* win, SRowBuffPos** pResu
char* value = NULL;
int32_t size = pAggSup->resultRowSize;
if (pStore->streamStateAddIfNotExist(pState, &key, (void**)&value, &size) < 0) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = pStore->streamStateAddIfNotExist(pState, &key, (void**)&value, &size);
*pResult = (SRowBuffPos*)value;
SResultRow* res = (SResultRow*)((*pResult)->pRowBuff);
@ -530,7 +535,7 @@ int32_t setIntervalOutputBuf(void* pState, STimeWindow* win, SRowBuffPos** pResu
// set time window for current result
res->win = (*win);
setResultRowInitCtx(res, pCtx, numOfOutput, rowEntryInfoOffset);
return TSDB_CODE_SUCCESS;
return code;
}
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, void* pState, STimeWindowAggSupp* pTwSup,
@ -606,6 +611,7 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB
static bool processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFinalMap, SInterval* pInterval, SArray* pPullWins,
int32_t numOfCh, SOperatorInfo* pOperator) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
TSKEY* tsData = (TSKEY*)pStartCol->pData;
SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
@ -644,6 +650,9 @@ static bool processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFina
// add pull data request
if (savePullWindow(&pull, pPullWins) == TSDB_CODE_SUCCESS) {
addPullWindow(pMap, &winRes, numOfCh);
if (pInfo->destHasPrimaryKey) {
tSimpleHashPut(pInfo->pDeletedMap,&winRes, sizeof(SWinKey), NULL, 0);
}
qDebug("===stream===prepare final retrive for delete %" PRId64 ", size:%d", winRes.ts, numOfCh);
}
}
@ -668,6 +677,9 @@ static void addRetriveWindow(SArray* wins, SStreamIntervalOperatorInfo* pInfo, i
// add pull data request
if (savePullWindow(&pull, pInfo->pPullWins) == TSDB_CODE_SUCCESS) {
addPullWindow(pInfo->pPullDataMap, winKey, pInfo->numOfChild);
if (pInfo->destHasPrimaryKey) {
tSimpleHashPut(pInfo->pDeletedMap,winKey, sizeof(SWinKey), NULL, 0);
}
qDebug("===stream===prepare retrive for delete %" PRId64 ", size:%d", winKey->ts, pInfo->numOfChild);
}
} else {
@ -798,7 +810,7 @@ static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pN
}
static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, uint64_t groupId,
SSHashObj* pUpdatedMap) {
SSHashObj* pUpdatedMap, SSHashObj* pDeletedMap) {
SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperator->info;
pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version);
@ -866,6 +878,9 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat
// add pull data request
if (savePullWindow(&pull, pInfo->pPullWins) == TSDB_CODE_SUCCESS) {
addPullWindow(pInfo->pPullDataMap, &winRes, pInfo->numOfChild);
if (pInfo->destHasPrimaryKey) {
tSimpleHashPut(pInfo->pDeletedMap,&winRes, sizeof(SWinKey), NULL, 0);
}
}
} else {
int32_t index = -1;
@ -893,9 +908,9 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat
int32_t code = setIntervalOutputBuf(pInfo->pState, &nextWin, &pResPos, groupId, pSup->pCtx, numOfOutput,
pSup->rowEntryInfoOffset, &pInfo->aggSup, &pInfo->stateStore);
pResult = (SResultRow*)pResPos->pRowBuff;
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
if (pResult == NULL) {
qError("%s set interval output buff error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
break;
}
if (IS_FINAL_INTERVAL_OP(pOperator)) {
forwardRows = 1;
@ -908,6 +923,11 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat
.ts = pResult->win.skey,
.groupId = groupId,
};
if (pInfo->destHasPrimaryKey && code == TSDB_CODE_SUCCESS && IS_NORMAL_INTERVAL_OP(pOperator)) {
tSimpleHashPut(pDeletedMap,&key, sizeof(SWinKey), NULL, 0);
}
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) {
saveWinResult(&key, pResPos, pUpdatedMap);
}
@ -1165,6 +1185,16 @@ void doStreamIntervalSaveCheckpoint(SOperatorInfo* pOperator) {
taosMemoryFree(buf);
}
static void copyIntervalDeleteKey(SSHashObj* pMap, SArray* pWins) {
void* pIte = NULL;
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(pMap, pIte, &iter)) != NULL) {
void* pKey = tSimpleHashGetKey(pIte, NULL);
taosArrayPush(pWins, pKey);
}
tSimpleHashClear(pMap);
}
static SSDataBlock* buildIntervalResult(SOperatorInfo* pOperator) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -1363,7 +1393,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
}
setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pInfo->pUpdatedMap);
doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pInfo->pUpdatedMap, pInfo->pDeletedMap);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.watermark);
pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.skey);
@ -1373,6 +1403,9 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
if (IS_FINAL_INTERVAL_OP(pOperator)) {
closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval,
pInfo->pPullDataMap, pInfo->pUpdatedMap, pInfo->pDelWins, pOperator);
if (pInfo->destHasPrimaryKey) {
copyIntervalDeleteKey(pInfo->pDeletedMap, pInfo->pDelWins);
}
}
pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs;
@ -1567,6 +1600,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo->pMidRetriveRes = createSpecialDataBlock(STREAM_MID_RETRIEVE);
pInfo->pMidPulloverRes = createSpecialDataBlock(STREAM_MID_RETRIEVE);
pInfo->clearState = false;
pInfo->pDeletedMap = tSimpleHashInit(4096, hashFn);
pInfo->destHasPrimaryKey = pIntervalPhyNode->window.destHasPrimayKey;
pOperator->operatorType = pPhyNode->type;
if (!IS_FINAL_INTERVAL_OP(pOperator) || numOfChild == 0) {
@ -1648,6 +1683,7 @@ void destroyStreamSessionAggOperatorInfo(void* param) {
taosArrayDestroy(pInfo->historyWins);
blockDataDestroy(pInfo->pCheckpointRes);
tSimpleHashCleanup(pInfo->pPkDeleted);
taosMemoryFreeClear(param);
}
@ -1872,10 +1908,10 @@ void removeSessionDeleteResults(SSHashObj* pHashMap, SArray* pWins) {
}
int32_t size = taosArrayGetSize(pWins);
for (int32_t i = 0; i < size; i++) {
SSessionKey* pWin = taosArrayGet(pWins, i);
SResultWindowInfo* pWin = taosArrayGet(pWins, i);
if (!pWin) continue;
SSessionKey key = {0};
getSessionHashKey(pWin, &key);
getSessionHashKey(&pWin->sessionWin, &key);
tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey));
}
}
@ -2116,17 +2152,20 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
pOperator, winDelta);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
qError("%s do stream session aggregate impl error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
break;
}
compactSessionWindow(pOperator, &winInfo, pStUpdated, pStDeleted, addGap);
saveSessionOutputBuf(pAggSup, &winInfo);
if (pInfo->destHasPrimaryKey && winInfo.isOutput && IS_NORMAL_SESSION_OP(pOperator)) {
saveDeleteRes(pInfo->pPkDeleted, winInfo.sessionWin);
}
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pStUpdated) {
code = saveResult(winInfo, pStUpdated);
if (code != TSDB_CODE_SUCCESS) {
qError("%s do stream session aggregate impl, set result error, code %s", GET_TASKID(pTaskInfo),
tstrerror(code));
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
break;
}
}
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
@ -2652,6 +2691,20 @@ void resetUnCloseSessionWinInfo(SSHashObj* winMap) {
}
}
void copyDeleteSessionKey(SSHashObj* source, SSHashObj* dest) {
if (tSimpleHashGetSize(source) == 0) {
return;
}
void* pIte = NULL;
int32_t iter = 0;
size_t keyLen = 0;
while ((pIte = tSimpleHashIterate(source, pIte, &iter)) != NULL) {
SSessionKey* pKey = tSimpleHashGetKey(pIte, &keyLen);
saveDeleteRes(dest, *pKey);
}
tSimpleHashClear(source);
}
static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
SExprSupp* pSup = &pOperator->exprSupp;
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
@ -2712,6 +2765,9 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
rebuildSessionWindow(pOperator, pWins, pInfo->pStUpdated);
}
copyDeleteWindowInfo(pWins, pInfo->pStDeleted);
if (pInfo->destHasPrimaryKey && IS_NORMAL_SESSION_OP(pOperator)) {
copyDeleteWindowInfo(pWins, pInfo->pPkDeleted);
}
taosArrayDestroy(pWins);
continue;
} else if (pBlock->info.type == STREAM_GET_ALL) {
@ -2767,6 +2823,9 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
if (pInfo->isHistoryOp) {
getMaxTsWins(pInfo->pUpdated, pInfo->historyWins);
}
if (pInfo->destHasPrimaryKey && IS_NORMAL_SESSION_OP(pOperator)) {
copyDeleteSessionKey(pInfo->pPkDeleted, pInfo->pStDeleted);
}
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
pInfo->pUpdated = NULL;
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
@ -2992,6 +3051,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
pInfo->clearState = false;
pInfo->recvGetAll = false;
pInfo->destHasPrimaryKey = pSessionNode->window.destHasPrimayKey;
pInfo->pPkDeleted = tSimpleHashInit(64, hashFn);
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true,
@ -3032,11 +3093,14 @@ static void clearStreamSessionOperator(SStreamSessionAggOperatorInfo* pInfo) {
}
void deleteSessionWinState(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SSHashObj* pMapUpdate,
SSHashObj* pMapDelete) {
SSHashObj* pMapDelete, SSHashObj* pPkDelete, bool needAdd) {
SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
doDeleteTimeWindows(pAggSup, pBlock, pWins);
removeSessionResults(pAggSup, pMapUpdate, pWins);
copyDeleteWindowInfo(pWins, pMapDelete);
if (needAdd) {
copyDeleteWindowInfo(pWins, pPkDelete);
}
taosArrayDestroy(pWins);
}
@ -3099,7 +3163,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_CLEAR) {
// gap must be 0
deleteSessionWinState(pAggSup, pBlock, pInfo->pStUpdated, pInfo->pStDeleted);
deleteSessionWinState(pAggSup, pBlock, pInfo->pStUpdated, pInfo->pStDeleted, NULL, false);
pInfo->clearState = true;
break;
} else if (pBlock->info.type == STREAM_GET_ALL) {
@ -3225,6 +3289,7 @@ void destroyStreamStateOperatorInfo(void* param) {
taosArrayDestroy(pInfo->historyWins);
blockDataDestroy(pInfo->pCheckpointRes);
tSimpleHashCleanup(pInfo->pPkDeleted);
taosMemoryFreeClear(param);
}
@ -3466,15 +3531,19 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
pOperator, 0);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
qError("%s do one window aggregate impl error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
break;
}
saveSessionOutputBuf(pAggSup, &curWin.winInfo);
if (pInfo->destHasPrimaryKey && curWin.winInfo.isOutput && IS_NORMAL_STATE_OP(pOperator)) {
saveDeleteRes(pInfo->pPkDeleted, curWin.winInfo.sessionWin);
}
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
code = saveResult(curWin.winInfo, pSeUpdated);
if (code != TSDB_CODE_SUCCESS) {
qError("%s do stream state aggregate impl, set result error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
break;
}
}
@ -3661,7 +3730,8 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_CLEAR) {
deleteSessionWinState(&pInfo->streamAggSup, pBlock, pInfo->pSeUpdated, pInfo->pSeDeleted);
bool add = pInfo->destHasPrimaryKey && IS_NORMAL_STATE_OP(pOperator);
deleteSessionWinState(&pInfo->streamAggSup, pBlock, pInfo->pSeUpdated, pInfo->pSeDeleted, pInfo->pPkDeleted, add);
continue;
} else if (pBlock->info.type == STREAM_GET_ALL) {
pInfo->recvGetAll = true;
@ -3697,6 +3767,9 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
if (pInfo->isHistoryOp) {
getMaxTsWins(pInfo->pUpdated, pInfo->historyWins);
}
if (pInfo->destHasPrimaryKey && IS_NORMAL_STATE_OP(pOperator)) {
copyDeleteSessionKey(pInfo->pPkDeleted, pInfo->pSeDeleted);
}
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
pInfo->pUpdated = NULL;
@ -3878,6 +3951,8 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
pInfo->recvGetAll = false;
pInfo->pPkDeleted = tSimpleHashInit(64, hashFn);
pInfo->destHasPrimaryKey = pStateNode->window.destHasPrimayKey;
setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED,
pInfo, pTaskInfo);
@ -4018,7 +4093,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
}
#endif
doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pInfo->pUpdatedMap);
doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pInfo->pUpdatedMap, pInfo->pDeletedMap);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.skey);
}
@ -4026,6 +4101,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins);
closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL,
pInfo->pUpdatedMap, pInfo->pDelWins, pOperator);
if (pInfo->destHasPrimaryKey && IS_NORMAL_INTERVAL_OP(pOperator)) {
copyIntervalDeleteKey(pInfo->pDeletedMap, pInfo->pDelWins);
}
void* pIte = NULL;
int32_t iter = 0;
@ -4142,6 +4220,10 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->recvGetAll = false;
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pDeletedMap = tSimpleHashInit(4096, hashFn);
pInfo->destHasPrimaryKey = pIntervalPhyNode->window.destHasPrimayKey;
// for stream
void* buff = NULL;
int32_t len = 0;
@ -4220,8 +4302,9 @@ static void doStreamMidIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pS
int32_t code = setIntervalOutputBuf(pInfo->pState, &nextWin, &pResPos, groupId, pSup->pCtx, numOfOutput,
pSup->rowEntryInfoOffset, &pInfo->aggSup, &pInfo->stateStore);
pResult = (SResultRow*)pResPos->pRowBuff;
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
if (pResult == NULL) {
qError("%s set interval output buff error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
break;
}
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {

View File

@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "cmdnodes.h"
#include "nodesUtil.h"
#include "plannodes.h"
#include "querynodes.h"
@ -124,6 +125,15 @@ static int32_t columnNodeCopy(const SColumnNode* pSrc, SColumnNode* pDst) {
return TSDB_CODE_SUCCESS;
}
static int32_t columnDefNodeCopy(const SColumnDefNode* pSrc, SColumnDefNode* pDst) {
COPY_CHAR_ARRAY_FIELD(colName);
COPY_OBJECT_FIELD(dataType, sizeof(SDataType));
COPY_CHAR_ARRAY_FIELD(comments);
COPY_SCALAR_FIELD(sma);
COPY_SCALAR_FIELD(is_pk);
return TSDB_CODE_SUCCESS;
}
static int32_t valueNodeCopy(const SValueNode* pSrc, SValueNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, exprNodeCopy);
COPY_CHAR_POINT_FIELD(literal);
@ -717,6 +727,7 @@ static int32_t physiWindowCopy(const SWindowPhysiNode* pSrc, SWindowPhysiNode* p
COPY_SCALAR_FIELD(triggerType);
COPY_SCALAR_FIELD(watermark);
COPY_SCALAR_FIELD(igExpired);
COPY_SCALAR_FIELD(destHasPrimayKey);
return TSDB_CODE_SUCCESS;
}
@ -834,6 +845,9 @@ SNode* nodesCloneNode(const SNode* pNode) {
case QUERY_NODE_COLUMN:
code = columnNodeCopy((const SColumnNode*)pNode, (SColumnNode*)pDst);
break;
case QUERY_NODE_COLUMN_DEF:
code = columnDefNodeCopy((const SColumnDefNode*)pNode, (SColumnDefNode*)pDst);
break;
case QUERY_NODE_VALUE:
code = valueNodeCopy((const SValueNode*)pNode, (SValueNode*)pDst);
break;

View File

@ -2511,6 +2511,7 @@ static const char* jkWindowPhysiPlanDeleteMark = "DeleteMark";
static const char* jkWindowPhysiPlanIgnoreExpired = "IgnoreExpired";
static const char* jkWindowPhysiPlanInputTsOrder = "InputTsOrder";
static const char* jkWindowPhysiPlanMergeDataBlock = "MergeDataBlock";
static const char* jkWindowPhysiPlanDestHasPrimaryKey = "DestHasPrimaryKey";
static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) {
const SWindowPhysiNode* pNode = (const SWindowPhysiNode*)pObj;
@ -2543,6 +2544,9 @@ static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkWindowPhysiPlanMergeDataBlock, pNode->mergeDataBlock);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkWindowPhysiPlanDestHasPrimaryKey, pNode->destHasPrimayKey);
}
return code;
}
@ -2578,6 +2582,9 @@ static int32_t jsonToPhysiWindowNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkWindowPhysiPlanMergeDataBlock, &pNode->mergeDataBlock);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkWindowPhysiPlanDestHasPrimaryKey, &pNode->destHasPrimayKey);
}
return code;
}

View File

@ -2907,7 +2907,8 @@ enum {
PHY_WINDOW_CODE_IG_EXPIRED,
PHY_WINDOW_CODE_INPUT_TS_ORDER,
PHY_WINDOW_CODE_OUTPUT_TS_ORDER,
PHY_WINDOW_CODE_MERGE_DATA_BLOCK
PHY_WINDOW_CODE_MERGE_DATA_BLOCK,
PHY_WINDOW_CODE_DEST_HAS_PRIMARY_KEY,
};
static int32_t physiWindowNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
@ -2941,6 +2942,9 @@ static int32_t physiWindowNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, PHY_WINDOW_CODE_MERGE_DATA_BLOCK, pNode->mergeDataBlock);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI8(pEncoder, PHY_WINDOW_CODE_DEST_HAS_PRIMARY_KEY, pNode->destHasPrimayKey);
}
return code;
}
@ -2982,6 +2986,9 @@ static int32_t msgToPhysiWindowNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_WINDOW_CODE_MERGE_DATA_BLOCK:
code = tlvDecodeBool(pTlv, &pNode->mergeDataBlock);
break;
case PHY_WINDOW_CODE_DEST_HAS_PRIMARY_KEY:
code = tlvDecodeI8(pTlv, &pNode->destHasPrimayKey);
break;
default:
break;
}

View File

@ -680,13 +680,23 @@ cmd ::= RESUME STREAM exists_opt(A) ignore_opt(C) stream_name(B).
%type col_list_opt { SNodeList* }
%destructor col_list_opt { nodesDestroyList($$); }
col_list_opt(A) ::= . { A = NULL; }
col_list_opt(A) ::= NK_LP col_name_list(B) NK_RP. { A = B; }
col_list_opt(A) ::= NK_LP column_stream_def_list(B) NK_RP. { A = B; }
%type column_stream_def_list { SNodeList* }
%destructor column_stream_def_list { nodesDestroyList($$); }
column_stream_def_list(A) ::= column_stream_def(B). { A = createNodeList(pCxt, B); }
column_stream_def_list(A) ::= column_stream_def_list(B)
NK_COMMA column_stream_def(C). { A = addNodeToList(pCxt, B, C); }
column_stream_def(A) ::= column_name(B). { A = createColumnDefNode(pCxt, &B, createDataType(TSDB_DATA_TYPE_NULL), NULL, false); }
column_stream_def(A) ::= column_name(B) PRIMARY KEY. { A = createColumnDefNode(pCxt, &B, createDataType(TSDB_DATA_TYPE_NULL), NULL, true); }
//column_stream_def(A) ::= column_def(B). { A = B; }
%type tag_def_or_ref_opt { SNodeList* }
%destructor tag_def_or_ref_opt { nodesDestroyList($$); }
tag_def_or_ref_opt(A) ::= . { A = NULL; }
tag_def_or_ref_opt(A) ::= tags_def(B). { A = B; }
tag_def_or_ref_opt(A) ::= TAGS NK_LP col_name_list(B) NK_RP. { A = B; }
tag_def_or_ref_opt(A) ::= TAGS NK_LP column_stream_def_list(B) NK_RP. { A = B; }
stream_options(A) ::= . { A = createStreamOptions(pCxt); }
stream_options(A) ::= stream_options(B) TRIGGER AT_ONCE(C). { A = setStreamOptions(pCxt, B, SOPT_TRIGGER_TYPE_SET, &C, NULL); }

View File

@ -6060,22 +6060,26 @@ static int32_t checkTableColsSchema(STranslateContext* pCxt, SHashObj* pHash, in
return code;
}
static int32_t checkTableSchema(STranslateContext* pCxt, SCreateTableStmt* pStmt) {
SHashObj* pHash = taosHashInit(LIST_LENGTH(pStmt->pTags) + LIST_LENGTH(pStmt->pCols),
static int32_t checkTableSchemaImpl(STranslateContext* pCxt, SNodeList* pTags, SNodeList* pCols, SNodeList* pRollupFuncs) {
SHashObj* pHash = taosHashInit(LIST_LENGTH(pTags) + LIST_LENGTH(pCols),
taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (NULL == pHash) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = checkTableTagsSchema(pCxt, pHash, pStmt->pTags);
int32_t code = checkTableTagsSchema(pCxt, pHash, pTags);
if (TSDB_CODE_SUCCESS == code) {
code = checkTableColsSchema(pCxt, pHash, LIST_LENGTH(pStmt->pTags), pStmt->pCols, pStmt->pOptions->pRollupFuncs);
code = checkTableColsSchema(pCxt, pHash, LIST_LENGTH(pTags), pCols, pRollupFuncs);
}
taosHashCleanup(pHash);
return code;
}
static int32_t checkTableSchema(STranslateContext* pCxt, SCreateTableStmt* pStmt) {
return checkTableSchemaImpl(pCxt, pStmt->pTags, pStmt->pCols, pStmt->pOptions->pRollupFuncs);
}
static int32_t getTableDelayOrWatermarkOption(STranslateContext* pCxt, const char* pName, int64_t minVal,
int64_t maxVal, SValueNode* pVal, int64_t* pMaxDelay) {
int32_t code = (DEAL_RES_ERROR == translateValue(pCxt, pVal) ? pCxt->errCode : TSDB_CODE_SUCCESS);
@ -7693,7 +7697,7 @@ static void getStreamQueryFirstProjectAliasName(SHashObj* pUserAliasSet, char* a
}
static int32_t addWstartTsToCreateStreamQueryImpl(STranslateContext* pCxt, SSelectStmt* pSelect,
SHashObj* pUserAliasSet) {
SHashObj* pUserAliasSet, SNodeList* pCols, SCMCreateStreamReq* pReq) {
SNode* pProj = nodesListGetNode(pSelect->pProjectionList, 0);
if (NULL == pSelect->pWindow ||
(QUERY_NODE_FUNCTION == nodeType(pProj) && 0 == strcmp("_wstart", ((SFunctionNode*)pProj)->functionName))) {
@ -7709,18 +7713,27 @@ static int32_t addWstartTsToCreateStreamQueryImpl(STranslateContext* pCxt, SSele
if (TSDB_CODE_SUCCESS == code) {
code = nodesListPushFront(pSelect->pProjectionList, (SNode*)pFunc);
}
if (TSDB_CODE_SUCCESS == code && STREAM_CREATE_STABLE_TRUE == pReq->createStb) {
SColumnDefNode* pColDef = (SColumnDefNode*)nodesMakeNode(QUERY_NODE_COLUMN_DEF);
strcpy(pColDef->colName, pFunc->node.aliasName);
pColDef->dataType = pFunc->node.resType;
pColDef->sma = true;
pColDef->is_pk = false;
code = nodesListPushFront(pCols, (SNode*)pColDef);
}
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyNode((SNode*)pFunc);
}
return code;
}
static int32_t addWstartTsToCreateStreamQuery(STranslateContext* pCxt, SNode* pStmt) {
static int32_t addWstartTsToCreateStreamQuery(STranslateContext* pCxt, SNode* pStmt, SNodeList* pCols, SCMCreateStreamReq* pReq) {
SSelectStmt* pSelect = (SSelectStmt*)pStmt;
SHashObj* pUserAliasSet = NULL;
int32_t code = checkProjectAlias(pCxt, pSelect->pProjectionList, &pUserAliasSet);
if (TSDB_CODE_SUCCESS == code) {
code = addWstartTsToCreateStreamQueryImpl(pCxt, pSelect, pUserAliasSet);
code = addWstartTsToCreateStreamQueryImpl(pCxt, pSelect, pUserAliasSet, pCols, pReq);
}
taosHashCleanup(pUserAliasSet);
return code;
@ -7838,6 +7851,44 @@ static int32_t addNullTagsToCreateStreamQuery(STranslateContext* pCxt, STableMet
return addNullTagsForExistTable(pCxt, pMeta, (SSelectStmt*)pStmt->pQuery);
}
static int32_t addColDefNodeByProj(SNodeList** ppCols, SNode* pProject, int8_t flags) {
SExprNode* pExpr = (SExprNode*)pProject;
SColumnDefNode* pColDef = (SColumnDefNode*)nodesMakeNode(QUERY_NODE_COLUMN_DEF);
strcpy(pColDef->colName, pExpr->userAlias);
pColDef->dataType = pExpr->resType;
pColDef->sma = flags & COL_SMA_ON;
pColDef->is_pk = flags & COL_IS_KEY;
return nodesListMakeAppend(ppCols, (SNode*)pColDef);
}
static int32_t addColsToCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) {
if ( STREAM_CREATE_STABLE_FALSE == pReq->createStb) {
return TSDB_CODE_SUCCESS;
}
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
SNode* pProject = NULL;
SNode* pNode = NULL;
if (0 != LIST_LENGTH(pStmt->pCols)) {
if (LIST_LENGTH(pStmt->pCols) != LIST_LENGTH(pSelect->pProjectionList)) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMNS_NUM);
}
FORBOTH(pNode, pStmt->pCols, pProject, pSelect->pProjectionList) {
SExprNode* pExpr = (SExprNode*)pProject;
SColumnDefNode* pColDef = (SColumnDefNode*)pNode;
pColDef->dataType = pExpr->resType;
}
return TSDB_CODE_SUCCESS;
}
FOREACH(pProject, pSelect->pProjectionList) {
int32_t code = addColDefNodeByProj(&pStmt->pCols, pProject, 0);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t addSubtableInfoToCreateStreamQuery(STranslateContext* pCxt, STableMeta* pMeta,
SCreateStreamStmt* pStmt) {
int32_t code = TSDB_CODE_SUCCESS;
@ -7944,7 +7995,8 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm
return TSDB_CODE_SUCCESS;
}
static int32_t adjustDataTypeOfProjections(STranslateContext* pCxt, const STableMeta* pMeta, SNodeList* pProjections) {
static int32_t adjustDataTypeOfProjections(STranslateContext* pCxt, const STableMeta* pMeta, SNodeList* pProjections,
SNodeList** ppCols) {
if (getNumOfColumns(pMeta) != LIST_LENGTH(pProjections)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMNS_NUM, "Illegal number of columns");
}
@ -7963,6 +8015,15 @@ static int32_t adjustDataTypeOfProjections(STranslateContext* pCxt, const STable
}
REPLACE_NODE(pFunc);
}
SColumnDefNode* pColDef = (SColumnDefNode*)nodesMakeNode(QUERY_NODE_COLUMN_DEF);
strcpy(pColDef->colName, pSchema->name);
pColDef->dataType = dt;
pColDef->sma = pSchema->flags & COL_SMA_ON;
pColDef->is_pk = pSchema->flags & COL_IS_KEY;
int32_t code = nodesListMakeAppend(ppCols, (SNode*)pColDef);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
}
return TSDB_CODE_SUCCESS;
@ -7970,6 +8031,7 @@ static int32_t adjustDataTypeOfProjections(STranslateContext* pCxt, const STable
typedef struct SProjColPos {
int32_t colId;
int8_t flags;
SNode* pProj;
} SProjColPos;
@ -7993,10 +8055,11 @@ static int32_t addProjToProjColPos(STranslateContext* pCxt, const SSchema* pSche
if (!dataTypeEqual(&dt, &((SExprNode*)pNewProj)->resType)) {
SNode* pFunc = NULL;
code = createCastFunc(pCxt, pNewProj, dt, &pFunc);
strcpy(((SExprNode*)pFunc)->userAlias, ((SExprNode*)pNewProj)->userAlias);
pNewProj = pFunc;
}
if (TSDB_CODE_SUCCESS == code) {
SProjColPos pos = {.colId = pSchema->colId, .pProj = pNewProj};
SProjColPos pos = {.colId = pSchema->colId, .pProj = pNewProj, .flags = pSchema->flags};
code = (NULL == taosArrayPush(pProjColPos, &pos) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS);
}
if (TSDB_CODE_SUCCESS != code) {
@ -8028,13 +8091,13 @@ static int32_t setFillNullCols(SArray* pProjColPos, const STableMeta* pMeta, SCM
return TSDB_CODE_SUCCESS;
}
static int32_t adjustOrderOfProjections(STranslateContext* pCxt, SNodeList* pCols, const STableMeta* pMeta,
static int32_t adjustOrderOfProjections(STranslateContext* pCxt, SNodeList** ppCols, const STableMeta* pMeta,
SNodeList** pProjections, SCMCreateStreamReq* pReq) {
if (LIST_LENGTH(pCols) != LIST_LENGTH(*pProjections)) {
if (LIST_LENGTH((*ppCols)) != LIST_LENGTH(*pProjections)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMNS_NUM, "Illegal number of columns");
}
SArray* pProjColPos = taosArrayInit(LIST_LENGTH(pCols), sizeof(SProjColPos));
SArray* pProjColPos = taosArrayInit(LIST_LENGTH((*ppCols)), sizeof(SProjColPos));
if (NULL == pProjColPos) {
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -8043,10 +8106,10 @@ static int32_t adjustOrderOfProjections(STranslateContext* pCxt, SNodeList* pCol
bool hasPrimaryKey = false;
SNode* pCol = NULL;
SNode* pProj = NULL;
FORBOTH(pCol, pCols, pProj, *pProjections) {
const SSchema* pSchema = getNormalColSchema(pMeta, ((SColumnNode*)pCol)->colName);
FORBOTH(pCol, (*ppCols), pProj, *pProjections) {
const SSchema* pSchema = getNormalColSchema(pMeta, ((SColumnDefNode*)pCol)->colName);
if (NULL == pSchema) {
code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMN, ((SColumnNode*)pCol)->colName);
code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMN, ((SColumnDefNode*)pCol)->colName);
}
if (TSDB_CODE_SUCCESS == code) {
code = addProjToProjColPos(pCxt, pSchema, pProj, pProjColPos);
@ -8065,31 +8128,37 @@ static int32_t adjustOrderOfProjections(STranslateContext* pCxt, SNodeList* pCol
}
SNodeList* pNewProjections = NULL;
SNodeList* pNewCols = NULL;
if (TSDB_CODE_SUCCESS == code) {
taosArraySort(pProjColPos, projColPosCompar);
int32_t num = taosArrayGetSize(pProjColPos);
pNewProjections = nodesMakeList();
pNewCols = nodesMakeList();
if (NULL == pNewProjections) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; TSDB_CODE_SUCCESS == code && i < num; ++i) {
SProjColPos* pPos = taosArrayGet(pProjColPos, i);
code = nodesListStrictAppend(pNewProjections, pPos->pProj);
addColDefNodeByProj(&pNewCols, pPos->pProj, pPos->flags);
pPos->pProj = NULL;
}
}
if (TSDB_CODE_SUCCESS == code && pMeta->tableInfo.numOfColumns > LIST_LENGTH(pCols)) {
if (TSDB_CODE_SUCCESS == code && pMeta->tableInfo.numOfColumns > LIST_LENGTH((*ppCols)) ) {
code = setFillNullCols(pProjColPos, pMeta, pReq);
}
if (TSDB_CODE_SUCCESS == code) {
taosArrayDestroy(pProjColPos);
nodesDestroyList(*pProjections);
nodesDestroyList(*ppCols);
*pProjections = pNewProjections;
*ppCols = pNewCols;
} else {
taosArrayDestroyEx(pProjColPos, projColPosDelete);
nodesDestroyList(pNewProjections);
nodesDestroyList(pNewCols);
}
return code;
@ -8099,9 +8168,9 @@ static int32_t adjustProjectionsForExistTable(STranslateContext* pCxt, SCreateSt
const STableMeta* pMeta, SCMCreateStreamReq* pReq) {
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
if (NULL == pStmt->pCols) {
return adjustDataTypeOfProjections(pCxt, pMeta, pSelect->pProjectionList);
return adjustDataTypeOfProjections(pCxt, pMeta, pSelect->pProjectionList, &pStmt->pCols);
}
return adjustOrderOfProjections(pCxt, pStmt->pCols, pMeta, &pSelect->pProjectionList, pReq);
return adjustOrderOfProjections(pCxt, &pStmt->pCols, pMeta, &pSelect->pProjectionList, pReq);
}
static bool isGroupIdTagStream(const STableMeta* pMeta, SNodeList* pTags) {
@ -8151,9 +8220,9 @@ static int32_t adjustOrderOfTags(STranslateContext* pCxt, SNodeList* pTags, cons
SNode* pTag = NULL;
SNode* pTagExpr = NULL;
FORBOTH(pTag, pTags, pTagExpr, *pTagExprs) {
const SSchema* pSchema = getTagSchema(pMeta, ((SColumnNode*)pTag)->colName);
const SSchema* pSchema = getTagSchema(pMeta, ((SColumnDefNode*)pTag)->colName);
if (NULL == pSchema) {
code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TAG_NAME, ((SColumnNode*)pTag)->colName);
code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TAG_NAME, ((SColumnDefNode*)pTag)->colName);
}
if (TSDB_CODE_SUCCESS == code) {
code = addProjToProjColPos(pCxt, pSchema, pTagExpr, pTagPos);
@ -8250,21 +8319,23 @@ static bool isTagDef(SNodeList* pTags) {
if (NULL == pTags) {
return false;
}
return QUERY_NODE_COLUMN_DEF == nodeType(nodesListGetNode(pTags, 0));
SColumnDefNode* pColDef = (SColumnDefNode*)nodesListGetNode(pTags, 0);
return TSDB_DATA_TYPE_NULL != pColDef->dataType.type;
}
static bool isTagBound(SNodeList* pTags) {
if (NULL == pTags) {
return false;
}
return QUERY_NODE_COLUMN == nodeType(nodesListGetNode(pTags, 0));
SColumnDefNode* pColDef = (SColumnDefNode*)nodesListGetNode(pTags, 0);
return TSDB_DATA_TYPE_NULL == pColDef->dataType.type;
}
static int32_t translateStreamTargetTable(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq,
STableMeta** pMeta) {
int32_t code = getTableMeta(pCxt, pStmt->targetDbName, pStmt->targetTabName, pMeta);
if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
if (NULL != pStmt->pCols || isTagBound(pStmt->pTags)) {
if (isTagBound(pStmt->pTags)) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, pStmt->targetTabName);
}
pReq->createStb = STREAM_CREATE_STABLE_TRUE;
@ -8405,6 +8476,19 @@ static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta
return nodesListAppend((*pSelect1)->pGroupByList, (SNode*)pNode2);
}
static int32_t checkStreamDestTableSchema(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
SNode* pProj = nodesListGetNode(pStmt->pCols, 0);
SColumnDefNode* pCol = (SColumnDefNode*)pProj;
if (pCol && pCol->dataType.type != TSDB_DATA_TYPE_TIMESTAMP) {
pCol->dataType = (SDataType){.type = TSDB_DATA_TYPE_TIMESTAMP,
.precision = 0,
.scale = 0,
.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
}
return checkTableSchemaImpl(pCxt, pStmt->pTags, pStmt->pCols, NULL);
}
static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) {
pCxt->createStream = true;
STableMeta* pMeta = NULL;
@ -8416,7 +8500,10 @@ static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt
code = translateQuery(pCxt, pStmt->pQuery);
}
if (TSDB_CODE_SUCCESS == code) {
code = addWstartTsToCreateStreamQuery(pCxt, pStmt->pQuery);
code = addColsToCreateStreamQuery(pCxt, pStmt, pReq);
}
if (TSDB_CODE_SUCCESS == code) {
code = addWstartTsToCreateStreamQuery(pCxt, pStmt->pQuery, pStmt->pCols, pReq);
}
if (TSDB_CODE_SUCCESS == code) {
code = checkStreamQuery(pCxt, pStmt);
@ -8427,6 +8514,9 @@ static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt
if (TSDB_CODE_SUCCESS == code) {
code = adjustTags(pCxt, pStmt, pMeta, pReq);
}
if (TSDB_CODE_SUCCESS == code) {
code = checkStreamDestTableSchema(pCxt, pStmt);
}
if (TSDB_CODE_SUCCESS == code) {
getSourceDatabase(pStmt->pQuery, pCxt->pParseCxt->acctId, pReq->sourceDB);
code = nodesNodeToString(pStmt->pQuery, false, &pReq->ast, NULL);
@ -8495,6 +8585,7 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt*
columnDefNodeToField(pStmt->pTags, &pReq->pTags);
pReq->numOfTags = LIST_LENGTH(pStmt->pTags);
}
columnDefNodeToField(pStmt->pCols, &pReq->pCols);
pReq->igUpdate = pStmt->pOptions->ignoreUpdate;
}

File diff suppressed because it is too large Load Diff

View File

@ -1573,6 +1573,9 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList*
pWindow->watermark = pWindowLogicNode->watermark;
pWindow->deleteMark = pWindowLogicNode->deleteMark;
pWindow->igExpired = pWindowLogicNode->igExpired;
if (pCxt->pPlanCxt->streamQuery) {
pWindow->destHasPrimayKey = pCxt->pPlanCxt->destHasPrimaryKey;
}
pWindow->mergeDataBlock = (GROUP_ACTION_KEEP == pWindowLogicNode->node.groupAction ? false : true);
pWindow->node.inputTsOrder = pWindowLogicNode->node.inputTsOrder;
pWindow->node.outputTsOrder = pWindowLogicNode->node.outputTsOrder;

View File

@ -278,11 +278,11 @@ int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void*
#ifdef USE_ROCKSDB
void* pVal = NULL;
int32_t len = 0;
int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len);
getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len);
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState);
memcpy(buf + len - rowSize, value, vLen);
return code;
return TSDB_CODE_SUCCESS;
#else
return tdbTbUpsert(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), value, vLen, pState->pTdbState->txn);
#endif

View File

@ -438,6 +438,7 @@ SRowBuffPos* getNewRowPosForWrite(SStreamFileState* pFileState) {
}
int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen) {
int32_t code = TSDB_CODE_SUCCESS;
pFileState->maxTs = TMAX(pFileState->maxTs, pFileState->getTs(pKey));
SRowBuffPos** pos = tSimpleHashGet(pFileState->rowStateBuff, pKey, keyLen);
if (pos) {
@ -445,17 +446,18 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi
*pVal = *pos;
(*pos)->beUsed = true;
(*pos)->beFlushed = false;
return TSDB_CODE_SUCCESS;
return code;
}
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
ASSERT(pNewPos->pRowBuff);
memcpy(pNewPos->pKey, pKey, keyLen);
code = TSDB_CODE_FAILED;
TSKEY ts = pFileState->getTs(pKey);
if (!isDeteled(pFileState, ts) && isFlushedState(pFileState, ts, 0)) {
int32_t len = 0;
void* p = NULL;
int32_t code = streamStateGet_rocksdb(pFileState->pFileStore, pKey, &p, &len);
code = streamStateGet_rocksdb(pFileState->pFileStore, pKey, &p, &len);
qDebug("===stream===get %" PRId64 " from disc, res %d", ts, code);
if (code == TSDB_CODE_SUCCESS) {
memcpy(pNewPos->pRowBuff, p, len);
@ -468,7 +470,7 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi
*pVLen = pFileState->rowSize;
*pVal = pNewPos;
}
return TSDB_CODE_SUCCESS;
return code;
}
int32_t deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen) {

59
tests/script/auto.sh Executable file
View File

@ -0,0 +1,59 @@
./test.sh -f tsim/stream/streamPrimaryKey0.sim
./test.sh -f tsim/stream/streamPrimaryKey1.sim
./test.sh -f tsim/stream/streamPrimaryKey2.sim
./test.sh -f tsim/stream/basic0.sim
./test.sh -f tsim/stream/basic1.sim
./test.sh -f tsim/stream/basic2.sim
./test.sh -f tsim/stream/basic3.sim
./test.sh -f tsim/stream/basic4.sim
./test.sh -f tsim/stream/checkpointInterval0.sim
./test.sh -f tsim/stream/checkpointInterval1.sim
./test.sh -f tsim/stream/checkpointSession0.sim
./test.sh -f tsim/stream/checkpointSession1.sim
./test.sh -f tsim/stream/checkpointState0.sim
./test.sh -f tsim/stream/checkStreamSTable1.sim
./test.sh -f tsim/stream/checkStreamSTable.sim
./test.sh -f tsim/stream/deleteInterval.sim
./test.sh -f tsim/stream/deleteSession.sim
./test.sh -f tsim/stream/deleteState.sim
./test.sh -f tsim/stream/distributeInterval0.sim
./test.sh -f tsim/stream/distributeIntervalRetrive0.sim
./test.sh -f tsim/stream/distributeSession0.sim
./test.sh -f tsim/stream/drop_stream.sim
./test.sh -f tsim/stream/fillHistoryBasic1.sim
./test.sh -f tsim/stream/fillHistoryBasic2.sim
./test.sh -f tsim/stream/fillHistoryBasic3.sim
./test.sh -f tsim/stream/fillHistoryTransform.sim
./test.sh -f tsim/stream/fillIntervalDelete0.sim
./test.sh -f tsim/stream/fillIntervalDelete1.sim
./test.sh -f tsim/stream/fillIntervalLinear.sim
./test.sh -f tsim/stream/fillIntervalPartitionBy.sim
./test.sh -f tsim/stream/fillIntervalPrevNext1.sim
./test.sh -f tsim/stream/fillIntervalPrevNext.sim
./test.sh -f tsim/stream/fillIntervalRange.sim
./test.sh -f tsim/stream/fillIntervalValue.sim
./test.sh -f tsim/stream/ignoreCheckUpdate.sim
./test.sh -f tsim/stream/ignoreExpiredData.sim
./test.sh -f tsim/stream/partitionby1.sim
./test.sh -f tsim/stream/partitionbyColumnInterval.sim
./test.sh -f tsim/stream/partitionbyColumnSession.sim
./test.sh -f tsim/stream/partitionbyColumnState.sim
./test.sh -f tsim/stream/partitionby.sim
./test.sh -f tsim/stream/pauseAndResume.sim
./test.sh -f tsim/stream/schedSnode.sim
./test.sh -f tsim/stream/session0.sim
./test.sh -f tsim/stream/session1.sim
./test.sh -f tsim/stream/sliding.sim
./test.sh -f tsim/stream/state0.sim
./test.sh -f tsim/stream/state1.sim
./test.sh -f tsim/stream/streamPrimaryKey0.sim
./test.sh -f tsim/stream/streamPrimaryKey1.sim
./test.sh -f tsim/stream/streamPrimaryKey2.sim
./test.sh -f tsim/stream/triggerInterval0.sim
./test.sh -f tsim/stream/triggerSession0.sim
./test.sh -f tsim/stream/udTableAndTag0.sim
./test.sh -f tsim/stream/udTableAndTag1.sim
./test.sh -f tsim/stream/udTableAndTag2.sim
./test.sh -f tsim/stream/windowClose.sim

1
tests/script/grep.sh Executable file
View File

@ -0,0 +1 @@
sh auto.sh|grep "success\|fail\|asan error_num"

View File

@ -15,6 +15,8 @@ sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) from st session(ts, 10s);
sleep 1000
sql insert into t1 values(1648791213000,1,2,3,1.0);
sql insert into t2 values(1648791213001,2,2,3,1.1);

View File

@ -0,0 +1,245 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c debugflag -v 135
system sh/cfg.sh -n dnode1 -c streamBufferSize -v 10
system sh/exec.sh -n dnode1 -s start
sleep 500
sql connect
print step1=============
sql create database test vgroups 4;
sql use test;
sql create table st(ts timestamp, a int, b int , c int, d double) tags(ta varchar(100),tb int,tc int);
sql create table t1 using st tags("aa", 1, 2);
sql create table streamt0(ts timestamp, a int primary key, b bigint ) tags(ta varchar(100),tb int,tc int);
sql create table streamt2(ts timestamp, a int primary key, b bigint ) tags(ta varchar(100),tb int,tc int);
sql create stream streams0 trigger at_once ignore expired 0 ignore update 0 into streamt0 as select _wstart, count(*) c1, max(b) from t1 interval(1s);
sql create stream streams2 trigger at_once ignore expired 0 ignore update 0 into streamt2 tags(ta) as select _wstart, count(*) c1, max(b) from st partition by tbname ta interval(1s);
sleep 1000
sql insert into t1 values(1648791210000,1,2,3,1.0);
sleep 500
sql insert into t1 values(1648791210001,4,2,3,4.1);
sql insert into t1 values(1648791220000,2,2,3,1.1);
sql insert into t1 values(1648791230000,3,2,3,2.1);
sql insert into t1 values(1648791240000,4,2,3,3.1);
sql insert into t1 values(1648791250000,4,2,3,3.1);
$loop_count = 0
loop0:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 1 select * from streamt0
sql select * from streamt0;
print $data00 $data01 $data01
print $data10 $data11 $data11
print $data20 $data21 $data21
print $data30 $data31 $data31
print $data40 $data41 $data41
print $data50 $data51 $data51
print $data60 $data61 $data61
print $data70 $data71 $data71
if $rows != 5 then
print =====rows=$rows
goto loop0
endi
if $data01 != 2 then
print =====data01=$data01
goto loop0
endi
$loop_count = 0
loop2:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 1 select * from streamt2
sql select * from streamt2;
print $data00 $data01 $data01
print $data10 $data11 $data11
print $data20 $data21 $data21
print $data30 $data31 $data31
print $data40 $data41 $data41
print $data50 $data51 $data51
print $data60 $data61 $data61
print $data70 $data71 $data71
if $rows != 5 then
print =====rows=$rows
goto loop2
endi
if $data01 != 2 then
print =====data01=$data01
goto loop2
endi
print step2=============
sql create database test1 vgroups 4;
sql use test1;
sql create table st(ts timestamp, a int, b int , c int, d double) tags(ta varchar(100),tb int,tc int);
sql create table t1 using st tags("aa", 1, 2);
sql create table streamt3(ts timestamp, a int primary key, b bigint ) tags(ta varchar(100),tb int,tc int);
sql create table streamt5(ts timestamp, a int primary key, b bigint ) tags(ta varchar(100),tb int,tc int);
sql create stream streams3 trigger at_once ignore expired 0 ignore update 0 into streamt3 as select _wstart, count(*) c1, max(b) from t1 session(ts,1s);
sql create stream streams5 trigger at_once ignore expired 0 ignore update 0 into streamt5 tags(ta) as select _wstart, count(*) c1, max(b) from st partition by tbname ta session(ts,1s);
sleep 1000
sql insert into t1 values(1648791210000,1,2,3,1.0);
sleep 500
sql insert into t1 values(1648791210001,4,2,3,4.1);
sql insert into t1 values(1648791220000,2,2,3,1.1);
sql insert into t1 values(1648791230000,3,2,3,2.1);
sql insert into t1 values(1648791240000,4,2,3,3.1);
sql insert into t1 values(1648791250000,4,2,3,3.1);
$loop_count = 0
loop3:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 1 select * from streamt3
sql select * from streamt3;
print $data00 $data01 $data01
print $data10 $data11 $data11
print $data20 $data21 $data21
print $data30 $data31 $data31
print $data40 $data41 $data41
print $data50 $data51 $data51
print $data60 $data61 $data61
print $data70 $data71 $data71
if $rows != 5 then
print =====rows=$rows
goto loop3
endi
if $data01 != 2 then
print =====data01=$data01
goto loop3
endi
$loop_count = 0
loop5:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 1 select * from streamt5
sql select * from streamt5;
print $data00 $data01 $data01
print $data10 $data11 $data11
print $data20 $data21 $data21
print $data30 $data31 $data31
print $data40 $data41 $data41
print $data50 $data51 $data51
print $data60 $data61 $data61
print $data70 $data71 $data71
if $rows != 5 then
print =====rows=$rows
goto loop5
endi
if $data01 != 2 then
print =====data01=$data01
goto loop5
endi
sql create database test2 vgroups 4;
sql use test2;
sql create table st(ts timestamp, a int, b int , c int, d double) tags(ta varchar(100),tb int,tc int);
sql create table t1 using st tags("aa", 1, 2);
sql create table streamt6(ts timestamp, a int primary key, b bigint ) tags(ta varchar(100),tb int,tc int);
sql create stream streams6 trigger at_once ignore expired 0 ignore update 0 into streamt6 tags(ta) as select _wstart, count(*) c1, max(b) from st partition by tbname ta state_window(a);
sleep 1000
sql insert into t1 values(1648791210000,1,2,3,1.0);
sleep 500
sql insert into t1 values(1648791210001,1,2,3,4.1);
sql insert into t1 values(1648791220000,2,2,3,1.1);
sql insert into t1 values(1648791230000,3,2,3,2.1);
sql insert into t1 values(1648791240000,4,2,3,3.1);
sql insert into t1 values(1648791250000,5,2,3,3.1);
$loop_count = 0
loop6:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 1 select * from streamt6
sql select * from streamt6;
print $data00 $data01 $data01
print $data10 $data11 $data11
print $data20 $data21 $data21
print $data30 $data31 $data31
print $data40 $data41 $data41
print $data50 $data51 $data51
print $data60 $data61 $data61
print $data70 $data71 $data71
if $rows != 5 then
print =====rows=$rows
goto loop6
endi
if $data01 != 2 then
print =====data01=$data01
goto loop6
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -0,0 +1,123 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c debugflag -v 135
system sh/cfg.sh -n dnode1 -c streamBufferSize -v 10
system sh/exec.sh -n dnode1 -s start
sleep 500
sql connect
print step1=============
sql create database test vgroups 4;
sql use test;
sql create table st(ts timestamp, a int, b int , c int, d double) tags(ta varchar(100),tb int,tc int);
sql create table t1 using st tags("aa", 1, 2);
sql create table streamt1(ts timestamp, a int primary key, b bigint ) tags(ta varchar(100),tb int,tc int);
sql create stream streams1 trigger at_once ignore expired 0 ignore update 0 into streamt1 as select _wstart, count(*) c1, max(b) from st interval(1s);
sleep 1000
sql insert into t1 values(1648791210000,1,2,3,1.0);
sleep 500
sql insert into t1 values(1648791210001,4,2,3,4.1);
sql insert into t1 values(1648791220000,2,2,3,1.1);
sql insert into t1 values(1648791230000,3,2,3,2.1);
sql insert into t1 values(1648791240000,4,2,3,3.1);
sql insert into t1 values(1648791250000,4,2,3,3.1);
$loop_count = 0
loop1:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 1 select * from streamt1
sql select * from streamt1;
print $data00 $data01 $data01
print $data10 $data11 $data11
print $data20 $data21 $data21
print $data30 $data31 $data31
print $data40 $data41 $data41
print $data50 $data51 $data51
print $data60 $data61 $data61
print $data70 $data71 $data71
if $rows != 5 then
print =====rows=$rows
goto loop1
endi
if $data01 != 2 then
print =====data00=$data00
goto loop1
endi
print step2=============
sql create database test1 vgroups 4;
sql use test1;
sql create table st(ts timestamp, a int, b int , c int, d double) tags(ta varchar(100),tb int,tc int);
sql create table t1 using st tags("aa", 1, 2);
sql create table streamt3(ts timestamp, a int primary key, b bigint ) tags(ta varchar(100),tb int,tc int);
sql create stream streams3 trigger at_once ignore expired 0 ignore update 0 into streamt3 as select _wstart, count(*) c1, max(b) from st session(ts,1s);
sleep 1000
sql insert into t1 values(1648791210000,1,2,3,1.0);
sleep 500
sql insert into t1 values(1648791210001,4,2,3,4.1);
sql insert into t1 values(1648791220000,2,2,3,1.1);
sql insert into t1 values(1648791230000,3,2,3,2.1);
sql insert into t1 values(1648791240000,4,2,3,3.1);
sql insert into t1 values(1648791250000,4,2,3,3.1);
$loop_count = 0
loop3:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 1 select * from streamt3
sql select * from streamt3;
print $data00 $data01 $data01
print $data10 $data11 $data11
print $data20 $data21 $data21
print $data30 $data31 $data31
print $data40 $data41 $data41
print $data50 $data51 $data51
print $data60 $data61 $data61
print $data70 $data71 $data71
if $rows != 5 then
print =====rows=$rows
goto loop3
endi
if $data01 != 2 then
print =====data00=$data00
goto loop3
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -0,0 +1,141 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c debugflag -v 135
system sh/cfg.sh -n dnode1 -c streamBufferSize -v 10
system sh/exec.sh -n dnode1 -s start
sleep 500
sql connect
print step1=============
sql create database test vgroups 4;
sql use test;
sql create table st(ts timestamp, a int, b int , c int, d double) tags(ta varchar(100),tb int,tc int);
sql create table t1 using st tags("aa", 1, 2);
sql create table streamt1(ts timestamp, a int primary key, b bigint ) tags(ta varchar(100),tb int,tc int);
sql create stream streams1 trigger at_once ignore expired 0 ignore update 0 into streamt1 tags(ta) as select _wstart, count(*) c1, max(b) from st partition by tbname ta EVENT_WINDOW start with a = 1 end with a = 3;
sleep 1000
sql insert into t1 values(1648791210000,1,2,3,1.0);
sql insert into t1 values(1648791210010,3,2,3,1.0);
sleep 500
sql insert into t1 values(1648791210001,2,2,3,1.0);
sql insert into t1 values(1648791220000,1,2,3,1.1);
sql insert into t1 values(1648791220001,3,2,3,1.1);
sql insert into t1 values(1648791230000,1,2,3,2.1);
sql insert into t1 values(1648791230001,3,2,3,2.1);
sql insert into t1 values(1648791240000,1,2,3,3.1);
sql insert into t1 values(1648791240001,3,2,3,3.1);
sql insert into t1 values(1648791250000,1,2,3,3.1);
sql insert into t1 values(1648791250001,3,2,3,3.1);
$loop_count = 0
loop1:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 1 select * from streamt1
sql select * from streamt1;
print $data00 $data01 $data01
print $data10 $data11 $data11
print $data20 $data21 $data21
print $data30 $data31 $data31
print $data40 $data41 $data41
print $data50 $data51 $data51
print $data60 $data61 $data61
print $data70 $data71 $data71
if $rows != 5 then
print =====rows=$rows
goto loop1
endi
if $data01 != 3 then
print =====data01=$data01
goto loop1
endi
print step2=============
sql create database test1 vgroups 4;
sql use test1;
sql create table st(ts timestamp, a int, b int , c int, d double) tags(ta varchar(100),tb int,tc int);
sql create table t1 using st tags("aa", 1, 2);
sql create table streamt3(ts timestamp, a int primary key, b bigint ) tags(ta varchar(100),tb int,tc int);
sql create stream streams3 trigger at_once ignore expired 1 ignore update 0 WATERMARK 1000s into streamt3 tags(ta) as select _wstart, count(*) c1, max(b) from st partition by tbname ta COUNT_WINDOW(2);
sleep 1000
sql insert into t1 values(1648791210000,1,2,3,1.0);
sleep 500
sql insert into t1 values(1648791210001,4,2,3,4.1);
sql insert into t1 values(1648791220000,2,2,3,1.1);
sql insert into t1 values(1648791220001,2,2,3,1.1);
sql insert into t1 values(1648791230000,3,2,3,2.1);
sql insert into t1 values(1648791230001,3,2,3,2.1);
sql insert into t1 values(1648791240000,4,2,3,3.1);
sql insert into t1 values(1648791240001,4,2,3,3.1);
sql insert into t1 values(1648791250000,4,2,3,3.1);
sql insert into t1 values(1648791250001,4,2,3,3.1);
$loop_count = 0
loop3:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 1 select * from streamt3
sql select * from streamt3;
print $data00 $data01 $data01
print $data10 $data11 $data11
print $data20 $data21 $data21
print $data30 $data31 $data31
print $data40 $data41 $data41
print $data50 $data51 $data51
print $data60 $data61 $data61
print $data70 $data71 $data71
if $rows != 5 then
print =====rows=$rows
goto loop3
endi
if $data01 != 2 then
print =====data01=$data01
goto loop3
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -0,0 +1,65 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
print ===== step1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print ===== step2
print ===== table name
sql create database test vgroups 4;
sql use test;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql_error create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1(a, b, c, d) as select _wstart, count(*) c1, max(a) from st interval(10s);
sql_error create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2(a, b) as select _wstart, count(*) c1, max(a) from st interval(10s);
sql create stream streams3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3(a, b) as select count(*) c1, max(a) from st interval(10s);
sleep 1000
sql desc streamt3;
print $data00
print $data10
print $data20
print $data30
print $data40
if $rows != 4 then
return -1
endi
sql create stream streams4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt4(a, b, c) as select _wstart, count(*) c1, max(a) from st interval(10s);
sql create stream streams5 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt5(a, b, c) as select _wstart, count(*) c1, max(a) from st partition by tbname interval(10s);
sql create stream streams6 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt6(a, b, c) tags(tbn varchar(60)) as select _wstart, count(*) c1, max(a) from st partition by tbname tbn interval(10s);
sql create stream streams7 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt7(a, b primary key, c) tags(tbn varchar(60)) as select _wstart, count(*) c1, max(a) from st partition by tbname tbn interval(10s);
sql_error create stream streams8 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt8(a, b, c primary key) as select _wstart, count(*) c1, max(a) from st interval(10s);
sql create stream streams9 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt9(a primary key, b) as select count(*) c1, max(a) from st interval(10s);
sql_error create stream streams10 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt10(a, b primary key, c) as select count(*) c1, max(a), max(b) from st interval(10s);
sql_error create stream streams11 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt11(a, b , a) as select count(*) c1, max(a), max(b) from st interval(10s);
sql_error create stream streams12 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt12(a, b , c) tags(c varchar(60)) as select count(*) c1, max(a), max(b) from st partition by tbname c interval(10s);
sql_error create stream streams13 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt13(a, b , c) tags(tc varchar(60)) as select count(*) c1, max(a) c1, max(b) from st partition by tbname tc interval(10s);
sql_error create stream streams14 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt14 tags(tc varchar(60)) as select count(*) tc, max(a) c1, max(b) from st partition by tbname tc interval(10s);
sql_error create stream streams15 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt15 tags(tc varchar(100) primary key) as select _wstart, count(*) c1, max(a) from st partition by tbname tc interval(10s);
print ======over
system sh/stop_dnodes.sh

View File

@ -339,6 +339,65 @@ if $data22 != t3 then
goto loop8
endi
print ===== step6
sql create database test5 vgroups 4;
sql use test5;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stable streamt5(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create stream streams5 trigger at_once ignore expired 0 ignore update 0 into streamt5(ts,b,a) as select _wstart, count(*), 1000 c1 from t1 interval(10s);
sleep 1000
sql insert into t1 values(1648791213000,1,2,3,1.0);
sql insert into t1 values(1648791223001,2,2,3,1.1);
sql insert into t1 values(1648791233002,3,2,3,2.1);
sql insert into t1 values(1648791243003,4,2,3,3.1);
$loop_count = 0
loop9:
sleep 1000
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from streamt5;
print $data00 $data01 $data02
print $data10 $data11 $data12
print $data20 $data21 $data22
print $data30 $data31 $data32
print $data40 $data41 $data42
if $rows != 4 then
print =====rows=$rows
goto loop9
endi
if $data01 != 1000 then
print =====data01=$data01
goto loop9
endi
if $data02 != 1 then
print =====data02=$data02
goto loop9
endi
if $data31 != 1000 then
print =====data31=$data31
goto loop9
endi
if $data32 != 1 then
print =====data32=$data32
goto loop9
endi
print ======over
system sh/stop_dnodes.sh