feat(stream):user define tag
This commit is contained in:
parent
b9b8a7ed35
commit
d5ade95b31
|
@ -162,6 +162,7 @@ typedef enum EStreamType {
|
|||
STREAM_PULL_DATA,
|
||||
STREAM_PULL_OVER,
|
||||
STREAM_FILL_OVER,
|
||||
STREAM_CREATE_CHILD_TABLE,
|
||||
} EStreamType;
|
||||
|
||||
#pragma pack(push, 1)
|
||||
|
@ -205,8 +206,6 @@ typedef struct SDataBlockInfo {
|
|||
TSKEY watermark; // used for stream
|
||||
|
||||
char parTbName[TSDB_TABLE_NAME_LEN]; // used for stream partition
|
||||
int32_t tagLen;
|
||||
void* pTag; // used for stream partition
|
||||
} SDataBlockInfo;
|
||||
|
||||
typedef struct SSDataBlock {
|
||||
|
@ -379,6 +378,11 @@ typedef struct SSortExecInfo {
|
|||
#define CALCULATE_END_TS_COLUMN_INDEX 5
|
||||
#define TABLE_NAME_COLUMN_INDEX 6
|
||||
|
||||
// stream create table block column
|
||||
#define UD_TABLE_NAME_COLUMN_INDEX 0
|
||||
#define UD_GROUPID_COLUMN_INDEX 1
|
||||
#define UD_TAG_COLUMN_INDEX 2
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -1293,7 +1293,6 @@ void blockDataFreeRes(SSDataBlock* pBlock) {
|
|||
taosArrayDestroy(pBlock->pDataBlock);
|
||||
pBlock->pDataBlock = NULL;
|
||||
taosMemoryFreeClear(pBlock->pBlockAgg);
|
||||
taosMemoryFree(pBlock->info.pTag);
|
||||
memset(&pBlock->info, 0, sizeof(SDataBlockInfo));
|
||||
}
|
||||
|
||||
|
@ -1961,10 +1960,10 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
|
|||
int32_t len = 0;
|
||||
len += snprintf(dumpBuf + len, size - len,
|
||||
"===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%" PRId64
|
||||
"|rows:%d|version:%" PRIu64 "|cal start:%" PRIu64 "|cal end:%" PRIu64 "\n",
|
||||
"|rows:%d|version:%" PRIu64 "|cal start:%" PRIu64 "|cal end:%" PRIu64 "|tbl:%s\n",
|
||||
flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.id.groupId,
|
||||
pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version,
|
||||
pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey);
|
||||
pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey, pDataBlock->info.parTbName);
|
||||
if (len >= size - 1) return dumpBuf;
|
||||
|
||||
for (int32_t j = 0; j < rows; j++) {
|
||||
|
|
|
@ -1331,9 +1331,9 @@ int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag) {
|
|||
}
|
||||
n += tPutTagVal(p + n, (STagVal *)taosArrayGet(pArray, iTag), isJson);
|
||||
}
|
||||
#ifdef TD_DEBUG_PRINT_TAG
|
||||
// #ifdef TD_DEBUG_PRINT_TAG
|
||||
debugPrintSTag(*ppTag, __func__, __LINE__);
|
||||
#endif
|
||||
// #endif
|
||||
|
||||
return code;
|
||||
|
||||
|
|
|
@ -167,6 +167,10 @@ void tFreeStreamObj(SStreamObj *pStream) {
|
|||
taosArrayDestroy(pLevel);
|
||||
}
|
||||
taosArrayDestroy(pStream->tasks);
|
||||
// tagSchema.pSchema
|
||||
if (pStream->tagSchema.nCols > 0) {
|
||||
taosMemoryFree(pStream->tagSchema.pSchema);
|
||||
}
|
||||
}
|
||||
|
||||
SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) {
|
||||
|
|
|
@ -465,7 +465,6 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
|
|||
SMCreateStbReq createReq = {0};
|
||||
tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
|
||||
createReq.numOfColumns = pStream->outputSchema.nCols;
|
||||
createReq.numOfTags = 1; // group id
|
||||
createReq.pColumns = taosArrayInit(createReq.numOfColumns, sizeof(SField));
|
||||
// build fields
|
||||
taosArraySetSize(createReq.pColumns, createReq.numOfColumns);
|
||||
|
@ -476,14 +475,29 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
|
|||
pField->type = pStream->outputSchema.pSchema[i].type;
|
||||
pField->bytes = pStream->outputSchema.pSchema[i].bytes;
|
||||
}
|
||||
createReq.pTags = taosArrayInit(createReq.numOfTags, sizeof(SField));
|
||||
taosArraySetSize(createReq.pTags, 1);
|
||||
// build tags
|
||||
SField *pField = taosArrayGet(createReq.pTags, 0);
|
||||
strcpy(pField->name, "group_id");
|
||||
pField->type = TSDB_DATA_TYPE_UBIGINT;
|
||||
pField->flags = 0;
|
||||
pField->bytes = 8;
|
||||
|
||||
if (pStream->tagSchema.nCols == 0) {
|
||||
createReq.numOfTags = 1;
|
||||
createReq.pTags = taosArrayInit(createReq.numOfTags, sizeof(SField));
|
||||
taosArraySetSize(createReq.pTags, createReq.numOfTags);
|
||||
// build tags
|
||||
SField *pField = taosArrayGet(createReq.pTags, 0);
|
||||
strcpy(pField->name, "group_id");
|
||||
pField->type = TSDB_DATA_TYPE_UBIGINT;
|
||||
pField->flags = 0;
|
||||
pField->bytes = 8;
|
||||
} else {
|
||||
createReq.numOfTags = pStream->tagSchema.nCols;
|
||||
createReq.pTags = taosArrayInit(createReq.numOfTags, sizeof(SField));
|
||||
taosArraySetSize(createReq.pTags, createReq.numOfTags);
|
||||
for (int32_t i = 0; i < createReq.numOfTags; i++) {
|
||||
SField *pField = taosArrayGet(createReq.pTags, i);
|
||||
pField->bytes = pStream->tagSchema.pSchema[i].bytes;
|
||||
pField->flags = pStream->tagSchema.pSchema[i].flags;
|
||||
pField->type = pStream->tagSchema.pSchema[i].type;
|
||||
tstrncpy(pField->name, pStream->tagSchema.pSchema[i].name, TSDB_COL_NAME_LEN);
|
||||
}
|
||||
}
|
||||
|
||||
if (mndCheckCreateStbReq(&createReq) != 0) {
|
||||
goto _OVER;
|
||||
|
|
|
@ -323,6 +323,70 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
|
|||
taosArrayDestroy(tagArray);
|
||||
}
|
||||
|
||||
static int32_t encodeCreateChildTableForRPC(SVCreateTbReq* req, int32_t vgId, void** pBuf, int32_t* contLen) {
|
||||
int32_t ret = 0;
|
||||
SVCreateTbBatchReq reqs = {0};
|
||||
|
||||
reqs.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
|
||||
if (NULL == reqs.pArray) {
|
||||
ret = -1;
|
||||
goto end;
|
||||
}
|
||||
taosArrayPush(reqs.pArray, req);
|
||||
reqs.nReqs = 1;
|
||||
|
||||
tEncodeSize(tEncodeSVCreateTbBatchReq, &reqs, *contLen, ret);
|
||||
if (ret < 0) {
|
||||
ret = -1;
|
||||
goto end;
|
||||
}
|
||||
*contLen += sizeof(SMsgHead);
|
||||
*pBuf = rpcMallocCont(*contLen);
|
||||
if (NULL == *pBuf) {
|
||||
ret = -1;
|
||||
goto end;
|
||||
}
|
||||
((SMsgHead*)(*pBuf))->vgId = vgId;
|
||||
((SMsgHead*)(*pBuf))->contLen = htonl(*contLen);
|
||||
SEncoder coder = {0};
|
||||
tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), (*contLen) - sizeof(SMsgHead) );
|
||||
if (tEncodeSVCreateTbBatchReq(&coder, &reqs) < 0) {
|
||||
rpcFreeCont(*pBuf);
|
||||
*pBuf = NULL;
|
||||
*contLen = 0;
|
||||
tEncoderClear(&coder);
|
||||
ret = -1;
|
||||
goto end;
|
||||
}
|
||||
tEncoderClear(&coder);
|
||||
|
||||
end:
|
||||
taosArrayDestroy(reqs.pArray);
|
||||
return ret;
|
||||
}
|
||||
|
||||
int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbReq* pReq) {
|
||||
void* buf = NULL;
|
||||
int32_t tlen = 0;
|
||||
encodeCreateChildTableForRPC(pReq, TD_VID(pVnode), &buf, &tlen);
|
||||
|
||||
SRpcMsg msg = {
|
||||
.msgType = TDMT_VND_CREATE_TABLE,
|
||||
.pCont = buf,
|
||||
.contLen = tlen,
|
||||
};
|
||||
|
||||
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
|
||||
tqError("failed to put into write-queue since %s", terrstr());
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_error:
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
tqError("failed to encode submit req since %s", terrstr());
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
|
||||
const SArray* pBlocks = (const SArray*)data;
|
||||
SVnode* pVnode = (SVnode*)vnode;
|
||||
|
@ -339,12 +403,9 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
|
|||
SArray* tagArray = NULL;
|
||||
SArray* pVals = NULL;
|
||||
|
||||
if (!(tagArray = taosArrayInit(1, sizeof(STagVal)))) {
|
||||
goto _end;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < blockSz; i++) {
|
||||
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
|
||||
int32_t rows = pDataBlock->info.rows;
|
||||
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
|
||||
SBatchDeleteReq deleteReq = {0};
|
||||
deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
|
||||
|
@ -380,9 +441,98 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
|
|||
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
|
||||
tqDebug("failed to put delete req into write-queue since %s", terrstr());
|
||||
}
|
||||
} else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
|
||||
for (int32_t rowId = 0; rowId < rows; rowId++) {
|
||||
SVCreateTbReq* pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq));
|
||||
if (!pCreateTbReq) {
|
||||
goto _end;
|
||||
}
|
||||
|
||||
// set const
|
||||
pCreateTbReq->flags = 0;
|
||||
pCreateTbReq->type = TSDB_CHILD_TABLE;
|
||||
pCreateTbReq->ctb.suid = suid;
|
||||
|
||||
// set super table name
|
||||
SName name = {0};
|
||||
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
pCreateTbReq->ctb.stbName = strdup((char*)tNameGetTableName(&name)); // strdup(stbFullName);
|
||||
|
||||
// set tag content
|
||||
int32_t size = taosArrayGetSize(pDataBlock->pDataBlock);
|
||||
if (size == 2) {
|
||||
tagArray = taosArrayInit(1, sizeof(STagVal));
|
||||
if (!tagArray) {
|
||||
goto _end;
|
||||
}
|
||||
STagVal tagVal = {
|
||||
.cid = pTSchema->numOfCols + 1,
|
||||
.type = TSDB_DATA_TYPE_UBIGINT,
|
||||
.i64 = (int64_t)pDataBlock->info.id.groupId,
|
||||
};
|
||||
taosArrayPush(tagArray, &tagVal);
|
||||
|
||||
// set tag name
|
||||
SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
|
||||
char tagNameStr[TSDB_COL_NAME_LEN] = "group_id";
|
||||
taosArrayPush(tagName, tagNameStr);
|
||||
pCreateTbReq->ctb.tagName = tagName;
|
||||
} else {
|
||||
tagArray = taosArrayInit(size - 1, sizeof(STagVal));
|
||||
if (!tagArray) {
|
||||
goto _end;
|
||||
}
|
||||
for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) {
|
||||
SColumnInfoData* pTagData = taosArrayGet(pDataBlock->pDataBlock, tagId);
|
||||
STagVal tagVal = {
|
||||
.cid = pTSchema->numOfCols + step,
|
||||
.type = pTagData->info.type,
|
||||
};
|
||||
void* pData = colDataGetData(pTagData, rowId);
|
||||
if (colDataIsNull_s(pTagData, rowId)) {
|
||||
tagVal.type = TSDB_DATA_TYPE_NULL;
|
||||
tagVal.pData = NULL;
|
||||
tagVal.nData = 0;
|
||||
} else if (IS_VAR_DATA_TYPE(pTagData->info.type)) {
|
||||
tagVal.nData = varDataLen(pData);
|
||||
tagVal.pData = varDataVal(pData);
|
||||
} else {
|
||||
memcpy(&tagVal.i64, pData, pTagData->info.bytes);
|
||||
}
|
||||
taosArrayPush(tagArray, &tagVal);
|
||||
}
|
||||
}
|
||||
pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray);
|
||||
|
||||
STag* pTag = NULL;
|
||||
tTagNew(tagArray, 1, false, &pTag);
|
||||
tagArray = taosArrayDestroy(tagArray);
|
||||
if (pTag == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _end;
|
||||
}
|
||||
pCreateTbReq->ctb.pTag = (uint8_t*)pTag;
|
||||
|
||||
// set table name
|
||||
SColumnInfoData* pTbColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
|
||||
if (colDataIsNull_s(pTbColInfo, rowId)) {
|
||||
SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
|
||||
void* pGpIdData = colDataGetData(pGpIdColInfo, rowId);
|
||||
pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, *(uint64_t*)pGpIdData);
|
||||
} else {
|
||||
void* pTbData = colDataGetData(pTbColInfo, rowId);
|
||||
pCreateTbReq->name = taosMemoryCalloc(1, varDataLen(pTbData) + 1);
|
||||
memcpy(pCreateTbReq->name, varDataVal(pTbData), varDataLen(pTbData));
|
||||
}
|
||||
|
||||
if (tqPutReqToQueue(pVnode, pCreateTbReq) != TSDB_CODE_SUCCESS) {
|
||||
goto _end;
|
||||
}
|
||||
tdDestroySVCreateTbReq(pCreateTbReq);
|
||||
taosMemoryFreeClear(pCreateTbReq);
|
||||
}
|
||||
} else {
|
||||
SSubmitTbData tbData = {0};
|
||||
int32_t rows = pDataBlock->info.rows;
|
||||
tqDebug("tq sink pipe2, convert block1 %d, rows: %d", i, rows);
|
||||
|
||||
if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) {
|
||||
|
@ -394,6 +544,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
|
|||
tbData.sver = pTSchema->version;
|
||||
|
||||
char* ctbName = NULL;
|
||||
tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), pDataBlock->info.parTbName);
|
||||
if (pDataBlock->info.parTbName[0]) {
|
||||
ctbName = strdup(pDataBlock->info.parTbName);
|
||||
} else {
|
||||
|
@ -423,7 +574,10 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
|
|||
pCreateTbReq->ctb.stbName = strdup((char*)tNameGetTableName(&name)); // strdup(stbFullName);
|
||||
|
||||
// set tag content
|
||||
taosArrayClear(tagArray);
|
||||
tagArray = taosArrayInit(1, sizeof(STagVal));
|
||||
if (!tagArray) {
|
||||
goto _end;
|
||||
}
|
||||
STagVal tagVal = {
|
||||
.cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
|
||||
.type = TSDB_DATA_TYPE_UBIGINT,
|
||||
|
@ -434,6 +588,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
|
|||
|
||||
STag* pTag = NULL;
|
||||
tTagNew(tagArray, 1, false, &pTag);
|
||||
tagArray = taosArrayDestroy(tagArray);
|
||||
if (pTag == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _end;
|
||||
|
|
|
@ -474,9 +474,11 @@ typedef struct SStreamScanInfo {
|
|||
SNode* pTagIndexCond;
|
||||
|
||||
// recover
|
||||
int32_t blockRecoverContiCnt;
|
||||
int32_t blockRecoverTotCnt;
|
||||
int32_t blockRecoverContiCnt;
|
||||
int32_t blockRecoverTotCnt;
|
||||
SSDataBlock* pRecoverRes;
|
||||
|
||||
SSDataBlock* pCreateTbRes;
|
||||
} SStreamScanInfo;
|
||||
|
||||
typedef struct {
|
||||
|
@ -567,6 +569,8 @@ typedef struct SStreamIntervalOperatorInfo {
|
|||
SStreamState* pState;
|
||||
SWinKey delKey;
|
||||
uint64_t numOfDatapack;
|
||||
SArray* pUpdated;
|
||||
SHashObj* pUpdatedMap;
|
||||
} SStreamIntervalOperatorInfo;
|
||||
|
||||
typedef struct SDataGroupInfo {
|
||||
|
@ -613,6 +617,8 @@ typedef struct SStreamSessionAggOperatorInfo {
|
|||
SPhysiNode* pPhyNode; // create new child
|
||||
bool isFinal;
|
||||
bool ignoreExpiredData;
|
||||
SArray* pUpdated;
|
||||
SSHashObj* pStUpdated;
|
||||
} SStreamSessionAggOperatorInfo;
|
||||
|
||||
typedef struct SStreamStateAggOperatorInfo {
|
||||
|
@ -628,6 +634,8 @@ typedef struct SStreamStateAggOperatorInfo {
|
|||
void* pDelIterator;
|
||||
SArray* pChildren; // cache for children's result;
|
||||
bool ignoreExpiredData;
|
||||
SArray* pUpdated;
|
||||
SSHashObj* pSeUpdated;
|
||||
} SStreamStateAggOperatorInfo;
|
||||
|
||||
typedef struct SStreamPartitionOperatorInfo {
|
||||
|
@ -638,9 +646,11 @@ typedef struct SStreamPartitionOperatorInfo {
|
|||
SExprSupp tagCalSup;
|
||||
SHashObj* pPartitions;
|
||||
void* parIte;
|
||||
void* pTbNameIte;
|
||||
SSDataBlock* pInputDataBlock;
|
||||
int32_t tsColIndex;
|
||||
SSDataBlock* pDelRes;
|
||||
SSDataBlock* pCreateTbRes;
|
||||
} SStreamPartitionOperatorInfo;
|
||||
|
||||
typedef struct SStreamFillSupporter {
|
||||
|
@ -762,7 +772,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_
|
|||
|
||||
SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, bool isStream);
|
||||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
|
@ -838,7 +848,6 @@ bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pS
|
|||
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
|
||||
uint64_t* pGp, void* pTbName);
|
||||
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId);
|
||||
void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock);
|
||||
|
||||
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
|
||||
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
|
||||
|
@ -857,6 +866,11 @@ void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t ord
|
|||
int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo);
|
||||
int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos,
|
||||
int32_t order, int64_t* pData);
|
||||
void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, int64_t groupId,
|
||||
SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock);
|
||||
|
||||
SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag);
|
||||
SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -1424,6 +1424,18 @@ void createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode) {
|
|||
createExprFromOneNode(pExp, pTargetNode->pExpr, pTargetNode->slotId);
|
||||
}
|
||||
|
||||
SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs) {
|
||||
*numOfExprs = LIST_LENGTH(pNodeList);
|
||||
SExprInfo* pExprs = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo));
|
||||
|
||||
for (int32_t i = 0; i < (*numOfExprs); ++i) {
|
||||
SExprInfo* pExp = &pExprs[i];
|
||||
createExprFromOneNode(pExp, nodesListGetNode(pNodeList, i), i + UD_TAG_COLUMN_INDEX);
|
||||
}
|
||||
|
||||
return pExprs;
|
||||
}
|
||||
|
||||
SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs) {
|
||||
int32_t numOfFuncs = LIST_LENGTH(pNodeList);
|
||||
int32_t numOfGroupKeys = 0;
|
||||
|
|
|
@ -2260,9 +2260,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
}
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type) {
|
||||
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
|
||||
|
||||
bool isStream = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type);
|
||||
pOptr = createIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo, isStream);
|
||||
pOptr = createIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) {
|
||||
pOptr = createStreamIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) {
|
||||
|
|
|
@ -917,6 +917,7 @@ uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, S
|
|||
}
|
||||
|
||||
static bool hasRemainPartion(SStreamPartitionOperatorInfo* pInfo) { return pInfo->parIte != NULL; }
|
||||
static bool hasRemainTbName(SStreamPartitionOperatorInfo* pInfo) { return pInfo->pTbNameIte != NULL; }
|
||||
|
||||
static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
|
||||
SStreamPartitionOperatorInfo* pInfo = pOperator->info;
|
||||
|
@ -937,40 +938,13 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
|
|||
colDataAppend(pDestCol, pDest->info.rows, pSrcData, isNull);
|
||||
}
|
||||
pDest->info.rows++;
|
||||
if (pInfo->tbnameCalSup.numOfExprs > 0 && i == 0) {
|
||||
void* tbname = NULL;
|
||||
if (streamStateGetParName(pOperator->pTaskInfo->streamInfo.pState, pParInfo->groupId, &tbname) == 0) {
|
||||
memcpy(pDest->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
|
||||
tdbFree(tbname);
|
||||
} else {
|
||||
SSDataBlock* pTmpBlock = blockCopyOneRow(pSrc, rowIndex);
|
||||
SSDataBlock* pResBlock = createDataBlock();
|
||||
pResBlock->info.rowSize = TSDB_TABLE_NAME_LEN;
|
||||
SColumnInfoData data = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, TSDB_TABLE_NAME_LEN, 0);
|
||||
taosArrayPush(pResBlock->pDataBlock, &data);
|
||||
blockDataEnsureCapacity(pResBlock, 1);
|
||||
projectApplyFunctions(pInfo->tbnameCalSup.pExprInfo, pResBlock, pTmpBlock, pInfo->tbnameCalSup.pCtx, 1, NULL);
|
||||
ASSERT(pResBlock->info.rows == 1);
|
||||
ASSERT(taosArrayGetSize(pResBlock->pDataBlock) == 1);
|
||||
SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, 0);
|
||||
ASSERT(pCol->info.type == TSDB_DATA_TYPE_VARCHAR);
|
||||
void* pData = colDataGetVarData(pCol, 0);
|
||||
// TODO check tbname validity
|
||||
if (pData != (void*)-1) {
|
||||
memset(pDest->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
|
||||
int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1);
|
||||
memcpy(pDest->info.parTbName, varDataVal(pData), len);
|
||||
/*pDest->info.parTbName[len + 1] = 0;*/
|
||||
} else {
|
||||
pDest->info.parTbName[0] = 0;
|
||||
}
|
||||
if (pParInfo->groupId && pDest->info.parTbName[0]) {
|
||||
streamStatePutParName(pOperator->pTaskInfo->streamInfo.pState, pParInfo->groupId, pDest->info.parTbName);
|
||||
}
|
||||
/*printf("\n\n set name %s\n\n", pDest->info.parTbName);*/
|
||||
blockDataDestroy(pTmpBlock);
|
||||
blockDataDestroy(pResBlock);
|
||||
}
|
||||
}
|
||||
pDest->info.parTbName[0] = 0;
|
||||
if (pInfo->tbnameCalSup.numOfExprs > 0) {
|
||||
void* tbname = NULL;
|
||||
if (streamStateGetParName(pOperator->pTaskInfo->streamInfo.pState, pParInfo->groupId, &tbname) == 0) {
|
||||
memcpy(pDest->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
|
||||
tdbFree(tbname);
|
||||
}
|
||||
}
|
||||
taosArrayDestroy(pParInfo->rowIds);
|
||||
|
@ -986,6 +960,60 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
|
|||
return pDest;
|
||||
}
|
||||
|
||||
void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, int64_t groupId,
|
||||
SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock) {
|
||||
void* pValue = NULL;
|
||||
if (groupId != 0 && streamStateGetParName(pState, groupId, &pValue) != 0) {
|
||||
SSDataBlock* pTmpBlock = blockCopyOneRow(pSrcBlock, rowId);
|
||||
if (pTableSup->numOfExprs > 0) {
|
||||
projectApplyFunctions(pTableSup->pExprInfo, pDestBlock, pTmpBlock, pTableSup->pCtx, pTableSup->numOfExprs, NULL);
|
||||
SColumnInfoData* pTbCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
|
||||
void* pData = colDataGetVarData(pTbCol, pDestBlock->info.rows - 1);
|
||||
char* tbName = pSrcBlock->info.parTbName;
|
||||
memset(tbName, 0, TSDB_TABLE_NAME_LEN);
|
||||
int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1);
|
||||
memcpy(tbName, varDataVal(pData), len);
|
||||
streamStatePutParName(pState, groupId, tbName);
|
||||
pDestBlock->info.rows--;
|
||||
} else {
|
||||
void* pTbNameCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
|
||||
colDataAppendNULL(pTbNameCol, pDestBlock->info.rows);
|
||||
pSrcBlock->info.parTbName[0] = 0;
|
||||
}
|
||||
|
||||
if (pTagSup->numOfExprs > 0) {
|
||||
projectApplyFunctions(pTagSup->pExprInfo, pDestBlock, pTmpBlock, pTagSup->pCtx, pTagSup->numOfExprs, NULL);
|
||||
pDestBlock->info.rows--;
|
||||
}
|
||||
|
||||
void* pGpIdCol = taosArrayGet(pDestBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
|
||||
colDataAppend(pGpIdCol, pDestBlock->info.rows, (const char*)&groupId, false);
|
||||
|
||||
pDestBlock->info.rows++;
|
||||
blockDataDestroy(pTmpBlock);
|
||||
}
|
||||
streamStateReleaseBuf(pState, NULL, pValue);
|
||||
}
|
||||
|
||||
static SSDataBlock* buildStreamCreateTableResult(SOperatorInfo* pOperator) {
|
||||
SStreamPartitionOperatorInfo* pInfo = pOperator->info;
|
||||
if ( (pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0) || taosHashGetSize(pInfo->pPartitions) == 0) {
|
||||
return NULL;
|
||||
}
|
||||
blockDataCleanup(pInfo->pCreateTbRes);
|
||||
blockDataEnsureCapacity(pInfo->pCreateTbRes, taosHashGetSize(pInfo->pPartitions));
|
||||
SSDataBlock* pSrc = pInfo->pInputDataBlock;
|
||||
|
||||
while (pInfo->pTbNameIte != NULL) {
|
||||
SPartitionDataInfo* pParInfo = (SPartitionDataInfo*)pInfo->pTbNameIte;
|
||||
int32_t rowId = *(int32_t*) taosArrayGet(pParInfo->rowIds, 0);
|
||||
appendCreateTableRow(pOperator->pTaskInfo->streamInfo.pState, &pInfo->tbnameCalSup, &pInfo->tagCalSup,
|
||||
pParInfo->groupId, pSrc, rowId, pInfo->pCreateTbRes);
|
||||
pInfo->pTbNameIte = taosHashIterate(pInfo->pPartitions, pInfo->pTbNameIte);
|
||||
}
|
||||
return pInfo->pCreateTbRes->info.rows > 0 ? pInfo->pCreateTbRes : NULL;
|
||||
}
|
||||
|
||||
static void doStreamHashPartitionImpl(SStreamPartitionOperatorInfo* pInfo, SSDataBlock* pBlock) {
|
||||
pInfo->pInputDataBlock = pBlock;
|
||||
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
|
||||
|
@ -1012,6 +1040,15 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
|
|||
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SStreamPartitionOperatorInfo* pInfo = pOperator->info;
|
||||
SSDataBlock* pCtRes = NULL;
|
||||
|
||||
if (hasRemainTbName(pInfo)) {
|
||||
pCtRes = buildStreamCreateTableResult(pOperator);
|
||||
if (pCtRes != NULL) {
|
||||
return pCtRes;
|
||||
}
|
||||
}
|
||||
|
||||
if (hasRemainPartion(pInfo)) {
|
||||
return buildStreamPartitionResult(pOperator);
|
||||
}
|
||||
|
@ -1039,6 +1076,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
|
|||
return pInfo->pDelRes;
|
||||
} break;
|
||||
default:
|
||||
ASSERTS(pBlock->info.type == STREAM_CREATE_CHILD_TABLE, "invalid SSDataBlock type");
|
||||
return pBlock;
|
||||
}
|
||||
|
||||
|
@ -1056,6 +1094,11 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
|
|||
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
|
||||
|
||||
pInfo->parIte = taosHashIterate(pInfo->pPartitions, NULL);
|
||||
pInfo->pTbNameIte = taosHashIterate(pInfo->pPartitions, NULL);
|
||||
pCtRes = buildStreamCreateTableResult(pOperator);
|
||||
if (pCtRes != NULL) {
|
||||
return pCtRes;
|
||||
}
|
||||
return buildStreamPartitionResult(pOperator);
|
||||
}
|
||||
|
||||
|
@ -1076,6 +1119,7 @@ static void destroyStreamPartitionOperatorInfo(void* param) {
|
|||
cleanupExprSupp(&pInfo->tagCalSup);
|
||||
blockDataDestroy(pInfo->pDelRes);
|
||||
taosHashCleanup(pInfo->pPartitions);
|
||||
blockDataDestroy(pInfo->pCreateTbRes);
|
||||
taosMemoryFreeClear(param);
|
||||
}
|
||||
|
||||
|
@ -1091,6 +1135,46 @@ void initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup
|
|||
}
|
||||
}
|
||||
|
||||
SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag) {
|
||||
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
||||
pBlock->info.hasVarCol = false;
|
||||
pBlock->info.id.groupId = 0;
|
||||
pBlock->info.rows = 0;
|
||||
pBlock->info.type = STREAM_CREATE_CHILD_TABLE;
|
||||
pBlock->info.watermark = INT64_MIN;
|
||||
|
||||
pBlock->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
|
||||
SColumnInfoData infoData = {0};
|
||||
infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
|
||||
if (tbName->numOfExprs > 0) {
|
||||
infoData.info.bytes = tbName->pExprInfo->base.resSchema.bytes;
|
||||
} else {
|
||||
infoData.info.bytes = 1;
|
||||
}
|
||||
pBlock->info.rowSize += infoData.info.bytes;
|
||||
// sub table name
|
||||
taosArrayPush(pBlock->pDataBlock, &infoData);
|
||||
|
||||
SColumnInfoData gpIdData = {0};
|
||||
gpIdData.info.type = TSDB_DATA_TYPE_UBIGINT;
|
||||
gpIdData.info.bytes = 8;
|
||||
pBlock->info.rowSize += gpIdData.info.bytes;
|
||||
// group id
|
||||
taosArrayPush(pBlock->pDataBlock, &gpIdData);
|
||||
|
||||
for (int32_t i = 0; i < tag->numOfExprs; i++) {
|
||||
SColumnInfoData tagCol = {0};
|
||||
tagCol.info.type = tag->pExprInfo[i].base.resSchema.type;
|
||||
tagCol.info.bytes = tag->pExprInfo[i].base.resSchema.bytes;
|
||||
tagCol.info.precision = tag->pExprInfo[i].base.resSchema.precision;
|
||||
// tag info
|
||||
taosArrayPush(pBlock->pDataBlock, &tagCol);
|
||||
pBlock->info.rowSize += tagCol.info.bytes;
|
||||
}
|
||||
|
||||
return pBlock;
|
||||
}
|
||||
|
||||
SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
SStreamPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamPartitionOperatorInfo));
|
||||
|
@ -1110,6 +1194,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
|
|||
}
|
||||
}
|
||||
|
||||
pInfo->tbnameCalSup.numOfExprs = 0;
|
||||
if (pPartNode->pSubtable != NULL) {
|
||||
SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo));
|
||||
if (pSubTableExpr == NULL) {
|
||||
|
@ -1124,9 +1209,10 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
|
|||
}
|
||||
}
|
||||
|
||||
pInfo->tagCalSup.numOfExprs = 0;
|
||||
if (pPartNode->pTags != NULL) {
|
||||
int32_t numOfTags;
|
||||
SExprInfo* pTagExpr = createExprInfo(pPartNode->pTags, NULL, &numOfTags);
|
||||
SExprInfo* pTagExpr = createExpr(pPartNode->pTags, &numOfTags);
|
||||
if (pTagExpr == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _error;
|
||||
|
@ -1137,6 +1223,12 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
|
|||
}
|
||||
}
|
||||
|
||||
if (pInfo->tbnameCalSup.numOfExprs != 0 || pInfo->tagCalSup.numOfExprs != 0) {
|
||||
pInfo->pCreateTbRes = buildCreateTableBlock(&pInfo->tbnameCalSup, &pInfo->tagCalSup);
|
||||
} else {
|
||||
pInfo->pCreateTbRes = NULL;
|
||||
}
|
||||
|
||||
int32_t keyLen = 0;
|
||||
code = initGroupOptrInfo(&pInfo->partitionSup.pGroupColVals, &keyLen, &pInfo->partitionSup.keyBuf,
|
||||
pInfo->partitionSup.pGroupCols);
|
||||
|
@ -1153,6 +1245,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
|
|||
blockDataEnsureCapacity(pInfo->binfo.pRes, 4096);
|
||||
|
||||
pInfo->parIte = NULL;
|
||||
pInfo->pTbNameIte = NULL;
|
||||
pInfo->pInputDataBlock = NULL;
|
||||
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
|
|
|
@ -278,7 +278,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
|||
|
||||
// for stream interval
|
||||
if (pBlock->info.type == STREAM_RETRIEVE || pBlock->info.type == STREAM_DELETE_RESULT ||
|
||||
pBlock->info.type == STREAM_DELETE_DATA) {
|
||||
pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
|
||||
// printDataBlock1(pBlock, "project1");
|
||||
return pBlock;
|
||||
}
|
||||
|
|
|
@ -1361,54 +1361,16 @@ void calBlockTag(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
|
|||
}
|
||||
#endif
|
||||
|
||||
void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
|
||||
static void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
|
||||
SExprSupp* pTbNameCalSup = &pInfo->tbnameCalSup;
|
||||
SStreamState* pState = pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState;
|
||||
if (pTbNameCalSup == NULL || pTbNameCalSup->numOfExprs == 0) return;
|
||||
if (pBlock == NULL || pBlock->info.rows == 0) return;
|
||||
|
||||
void* tbname = NULL;
|
||||
if (streamStateGetParName(pState, pBlock->info.id.groupId, &tbname) == 0) {
|
||||
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
|
||||
tdbFree(tbname);
|
||||
return;
|
||||
} else {
|
||||
blockDataCleanup(pInfo->pCreateTbRes);
|
||||
if (pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0) {
|
||||
pBlock->info.parTbName[0] = 0;
|
||||
}
|
||||
tdbFree(tbname);
|
||||
|
||||
SSDataBlock* pSrcBlock = blockCopyOneRow(pBlock, 0);
|
||||
ASSERT(pSrcBlock->info.rows == 1);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlock();
|
||||
pResBlock->info.rowSize = VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN;
|
||||
SColumnInfoData data = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, TSDB_TABLE_NAME_LEN, 0);
|
||||
taosArrayPush(pResBlock->pDataBlock, &data);
|
||||
blockDataEnsureCapacity(pResBlock, 1);
|
||||
|
||||
projectApplyFunctions(pTbNameCalSup->pExprInfo, pResBlock, pSrcBlock, pTbNameCalSup->pCtx, 1, NULL);
|
||||
ASSERT(pResBlock->info.rows == 1);
|
||||
ASSERT(taosArrayGetSize(pResBlock->pDataBlock) == 1);
|
||||
SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, 0);
|
||||
ASSERT(pCol->info.type == TSDB_DATA_TYPE_VARCHAR);
|
||||
|
||||
void* pData = colDataGetData(pCol, 0);
|
||||
// TODO check tbname validation
|
||||
if (pData != (void*)-1 && pData != NULL) {
|
||||
memset(pBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
|
||||
int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1);
|
||||
memcpy(pBlock->info.parTbName, varDataVal(pData), len);
|
||||
/*pBlock->info.parTbName[len + 1] = 0;*/
|
||||
} else {
|
||||
pBlock->info.parTbName[0] = 0;
|
||||
appendCreateTableRow(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, &pInfo->tbnameCalSup, &pInfo->tagCalSup,
|
||||
pBlock->info.id.groupId, pBlock, 0, pInfo->pCreateTbRes);
|
||||
}
|
||||
|
||||
if (pBlock->info.id.groupId && pBlock->info.parTbName[0]) {
|
||||
streamStatePutParName(pState, pBlock->info.id.groupId, pBlock->info.parTbName);
|
||||
}
|
||||
|
||||
blockDataDestroy(pSrcBlock);
|
||||
blockDataDestroy(pResBlock);
|
||||
}
|
||||
|
||||
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
|
||||
|
@ -1710,47 +1672,30 @@ static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
|
|||
}
|
||||
}
|
||||
|
||||
static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey) {
|
||||
if (pInfo->pUpdateInfo) {
|
||||
checkUpdateData(pInfo, true, pInfo->pRes, true);
|
||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, endKey);
|
||||
if (pInfo->pUpdateDataRes->info.rows > 0) {
|
||||
pInfo->updateResIndex = 0;
|
||||
if (pInfo->pUpdateDataRes->info.type == STREAM_CLEAR) {
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
|
||||
} else if (pInfo->pUpdateDataRes->info.type == STREAM_INVERT) {
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_RES;
|
||||
// return pInfo->pUpdateDataRes;
|
||||
} else if (pInfo->pUpdateDataRes->info.type == STREAM_DELETE_DATA) {
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_DELETE_DATA;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||
// NOTE: this operator does never check if current status is done or not
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SStreamScanInfo* pInfo = pOperator->info;
|
||||
|
||||
qDebug("stream scan called");
|
||||
#if 0
|
||||
SStreamState* pState = pTaskInfo->streamInfo.pState;
|
||||
if (pState) {
|
||||
printf(">>>>>>>> stream write backend\n");
|
||||
SWinKey key = {
|
||||
.ts = 1,
|
||||
.groupId = 2,
|
||||
};
|
||||
char tmp[100] = "abcdefg1";
|
||||
if (streamStatePut(pState, &key, &tmp, strlen(tmp) + 1) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
key.ts = 2;
|
||||
char tmp2[100] = "abcdefg2";
|
||||
if (streamStatePut(pState, &key, &tmp2, strlen(tmp2) + 1) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
key.groupId = 5;
|
||||
key.ts = 1;
|
||||
char tmp3[100] = "abcdefg3";
|
||||
if (streamStatePut(pState, &key, &tmp3, strlen(tmp3) + 1) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
char* val2 = NULL;
|
||||
int32_t sz;
|
||||
if (streamStateGet(pState, &key, (void**)&val2, &sz) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
printf("stream read %s %d\n", val2, sz);
|
||||
streamFreeVal(val2);
|
||||
}
|
||||
#endif
|
||||
|
||||
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 ||
|
||||
pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) {
|
||||
|
@ -1784,17 +1729,32 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
pInfo->blockRecoverContiCnt = 0;
|
||||
return NULL;
|
||||
}
|
||||
SSDataBlock* pBlock = doTableScan(pInfo->pTableScanOp);
|
||||
if (pBlock != NULL) {
|
||||
|
||||
switch (pInfo->scanMode) {
|
||||
case STREAM_SCAN_FROM_RES: {
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||
printDataBlock(pInfo->pRecoverRes, "scan recover");
|
||||
return pInfo->pRecoverRes;
|
||||
} break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
pInfo->pRecoverRes = doTableScan(pInfo->pTableScanOp);
|
||||
if (pInfo->pRecoverRes != NULL) {
|
||||
pInfo->blockRecoverContiCnt++;
|
||||
calBlockTbName(pInfo, pBlock);
|
||||
calBlockTbName(pInfo, pInfo->pRecoverRes);
|
||||
if (pInfo->pUpdateInfo) {
|
||||
TSKEY maxTs = updateInfoFillBlockData(pInfo->pUpdateInfo, pBlock, pInfo->primaryTsIndex);
|
||||
TSKEY maxTs = updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex);
|
||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
|
||||
}
|
||||
qDebug("stream recover scan get block, rows %d", pBlock->info.rows);
|
||||
printDataBlock(pBlock, "scan recover");
|
||||
return pBlock;
|
||||
if (pInfo->pCreateTbRes->info.rows > 0) {
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_RES;
|
||||
return pInfo->pCreateTbRes;
|
||||
}
|
||||
qDebug("stream recover scan get block, rows %d", pInfo->pRecoverRes->info.rows);
|
||||
printDataBlock(pInfo->pRecoverRes, "scan recover");
|
||||
return pInfo->pRecoverRes;
|
||||
}
|
||||
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
|
||||
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
|
||||
|
@ -1896,8 +1856,11 @@ FETCH_NEXT_BLOCK:
|
|||
qDebug("scan mode %d", pInfo->scanMode);
|
||||
switch (pInfo->scanMode) {
|
||||
case STREAM_SCAN_FROM_RES: {
|
||||
blockDataDestroy(pInfo->pUpdateRes);
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||
doCheckUpdate(pInfo, pInfo->pRes->info.window.ekey);
|
||||
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
|
||||
pInfo->pRes->info.dataLoad = 1;
|
||||
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
|
||||
return pInfo->pRes;
|
||||
} break;
|
||||
case STREAM_SCAN_FROM_DELETE_DATA: {
|
||||
|
@ -1988,22 +1951,12 @@ FETCH_NEXT_BLOCK:
|
|||
continue;
|
||||
}
|
||||
|
||||
if (pInfo->pUpdateInfo) {
|
||||
checkUpdateData(pInfo, true, pInfo->pRes, true);
|
||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlockInfo->window.ekey);
|
||||
if (pInfo->pUpdateDataRes->info.rows > 0) {
|
||||
pInfo->updateResIndex = 0;
|
||||
if (pInfo->pUpdateDataRes->info.type == STREAM_CLEAR) {
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
|
||||
} else if (pInfo->pUpdateDataRes->info.type == STREAM_INVERT) {
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_RES;
|
||||
return pInfo->pUpdateDataRes;
|
||||
} else if (pInfo->pUpdateDataRes->info.type == STREAM_DELETE_DATA) {
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_DELETE_DATA;
|
||||
}
|
||||
}
|
||||
if (pInfo->pCreateTbRes->info.rows > 0) {
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_RES;
|
||||
return pInfo->pCreateTbRes;
|
||||
}
|
||||
|
||||
doCheckUpdate(pInfo, pBlockInfo->window.ekey);
|
||||
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
|
||||
pInfo->pRes->info.dataLoad = 1;
|
||||
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
|
||||
|
@ -2017,7 +1970,6 @@ FETCH_NEXT_BLOCK:
|
|||
} else {
|
||||
continue;
|
||||
}
|
||||
/*blockDataCleanup(pInfo->pRes);*/
|
||||
}
|
||||
|
||||
// record the scan action.
|
||||
|
@ -2223,6 +2175,7 @@ static void destroyStreamScanOperatorInfo(void* param) {
|
|||
}
|
||||
|
||||
cleanupExprSupp(&pStreamScan->tbnameCalSup);
|
||||
cleanupExprSupp(&pStreamScan->tagCalSup);
|
||||
|
||||
updateInfoDestroy(pStreamScan->pUpdateInfo);
|
||||
blockDataDestroy(pStreamScan->pRes);
|
||||
|
@ -2230,6 +2183,7 @@ static void destroyStreamScanOperatorInfo(void* param) {
|
|||
blockDataDestroy(pStreamScan->pPullDataRes);
|
||||
blockDataDestroy(pStreamScan->pDeleteDataRes);
|
||||
blockDataDestroy(pStreamScan->pUpdateDataRes);
|
||||
blockDataDestroy(pStreamScan->pCreateTbRes);
|
||||
taosArrayDestroy(pStreamScan->pBlockLists);
|
||||
taosMemoryFree(pStreamScan);
|
||||
}
|
||||
|
@ -2285,7 +2239,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
|
||||
if (pTableScanNode->pTags != NULL) {
|
||||
int32_t numOfTags;
|
||||
SExprInfo* pTagExpr = createExprInfo(pTableScanNode->pTags, NULL, &numOfTags);
|
||||
SExprInfo* pTagExpr = createExpr(pTableScanNode->pTags, &numOfTags);
|
||||
if (pTagExpr == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _error;
|
||||
|
@ -2343,6 +2297,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
pInfo->readHandle = *pHandle;
|
||||
pInfo->tableUid = pScanPhyNode->uid;
|
||||
pTaskInfo->streamInfo.snapshotVer = pHandle->version;
|
||||
pInfo->pCreateTbRes = buildCreateTableBlock(&pInfo->tbnameCalSup, &pInfo->tagCalSup);
|
||||
blockDataEnsureCapacity(pInfo->pCreateTbRes, 8);
|
||||
|
||||
// set the extract column id to streamHandle
|
||||
tqReaderSetColIdList(pInfo->tqReader, pColIds);
|
||||
|
|
|
@ -1740,7 +1740,7 @@ void initStreamFunciton(SqlFunctionCtx* pCtx, int32_t numOfExpr) {
|
|||
}
|
||||
|
||||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode,
|
||||
SExecTaskInfo* pTaskInfo, bool isStream) {
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
|
@ -2493,12 +2493,8 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
|
|||
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
TSKEY maxTs = INT64_MIN;
|
||||
TSKEY minTs = INT64_MAX;
|
||||
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
|
||||
qDebug("interval status %d %s", pOperator->status, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
|
||||
|
||||
|
@ -2554,9 +2550,14 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
}
|
||||
}
|
||||
|
||||
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES);
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
|
||||
if (!pInfo->pUpdated) {
|
||||
pInfo->pUpdated = taosArrayInit(4, POINTER_BYTES);
|
||||
}
|
||||
if (!pInfo->pUpdatedMap) {
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
|
||||
}
|
||||
|
||||
while (1) {
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||
if (pBlock == NULL) {
|
||||
|
@ -2574,35 +2575,39 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
||||
pBlock->info.type == STREAM_CLEAR) {
|
||||
SArray* delWins = taosArrayInit(8, sizeof(SWinKey));
|
||||
doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pUpdatedMap);
|
||||
doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap);
|
||||
if (IS_FINAL_OP(pInfo)) {
|
||||
int32_t childIndex = getChildIndex(pBlock);
|
||||
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
|
||||
SStreamIntervalOperatorInfo* pChildInfo = pChildOp->info;
|
||||
SExprSupp* pChildSup = &pChildOp->exprSupp;
|
||||
doDeleteWindows(pChildOp, &pChildInfo->interval, pBlock, NULL, NULL);
|
||||
rebuildIntervalWindow(pOperator, delWins, pUpdatedMap);
|
||||
rebuildIntervalWindow(pOperator, delWins, pInfo->pUpdatedMap);
|
||||
addRetriveWindow(delWins, pInfo);
|
||||
taosArrayAddAll(pInfo->pDelWins, delWins);
|
||||
taosArrayDestroy(delWins);
|
||||
continue;
|
||||
}
|
||||
removeResults(delWins, pUpdatedMap);
|
||||
removeResults(delWins, pInfo->pUpdatedMap);
|
||||
taosArrayAddAll(pInfo->pDelWins, delWins);
|
||||
taosArrayDestroy(delWins);
|
||||
break;
|
||||
} else if (pBlock->info.type == STREAM_GET_ALL && IS_FINAL_OP(pInfo)) {
|
||||
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap);
|
||||
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap);
|
||||
continue;
|
||||
} else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) {
|
||||
doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pUpdatedMap);
|
||||
if (taosArrayGetSize(pUpdated) > 0) {
|
||||
doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pInfo->pUpdatedMap);
|
||||
if (taosArrayGetSize(pInfo->pUpdated) > 0) {
|
||||
break;
|
||||
}
|
||||
continue;
|
||||
} else if (pBlock->info.type == STREAM_PULL_OVER && IS_FINAL_OP(pInfo)) {
|
||||
processPullOver(pBlock, pInfo->pPullDataMap, &pInfo->interval);
|
||||
continue;
|
||||
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
|
||||
return pBlock;
|
||||
} else {
|
||||
ASSERTS(pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
|
||||
}
|
||||
|
||||
if (pInfo->scalarSupp.pExprInfo != NULL) {
|
||||
|
@ -2610,7 +2615,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
|
||||
}
|
||||
setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
|
||||
doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pUpdatedMap);
|
||||
doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pInfo->pUpdatedMap);
|
||||
if (IS_FINAL_OP(pInfo)) {
|
||||
int32_t chIndex = getChildIndex(pBlock);
|
||||
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
||||
|
@ -2630,29 +2635,29 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
setInputDataBlock(&pChildOp->exprSupp, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
|
||||
doStreamIntervalAggImpl(pChildOp, pBlock, pBlock->info.id.groupId, NULL);
|
||||
}
|
||||
maxTs = TMAX(maxTs, pBlock->info.window.ekey);
|
||||
maxTs = TMAX(maxTs, pBlock->info.watermark);
|
||||
minTs = TMIN(minTs, pBlock->info.window.skey);
|
||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
|
||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.watermark);
|
||||
pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.skey);
|
||||
}
|
||||
|
||||
removeDeleteResults(pUpdatedMap, pInfo->pDelWins);
|
||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
|
||||
pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs);
|
||||
removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins);
|
||||
if (IS_FINAL_OP(pInfo)) {
|
||||
closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval,
|
||||
pInfo->pPullDataMap, pUpdatedMap, pInfo->pDelWins, pOperator);
|
||||
pInfo->pPullDataMap, pInfo->pUpdatedMap, pInfo->pDelWins, pOperator);
|
||||
closeChildIntervalWindow(pOperator, pInfo->pChildren, pInfo->twAggSup.maxTs);
|
||||
}
|
||||
pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs;
|
||||
|
||||
void* pIte = NULL;
|
||||
while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) {
|
||||
taosArrayPush(pUpdated, pIte);
|
||||
while ((pIte = taosHashIterate(pInfo->pUpdatedMap, pIte)) != NULL) {
|
||||
taosArrayPush(pInfo->pUpdated, pIte);
|
||||
}
|
||||
taosHashCleanup(pUpdatedMap);
|
||||
taosArraySort(pUpdated, resultrowComparAsc);
|
||||
taosHashCleanup(pInfo->pUpdatedMap);
|
||||
pInfo->pUpdatedMap = NULL;
|
||||
taosArraySort(pInfo->pUpdated, resultrowComparAsc);
|
||||
|
||||
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
|
||||
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
|
||||
pInfo->pUpdated = NULL;
|
||||
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||
|
||||
doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes);
|
||||
|
@ -2790,6 +2795,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
|||
pInfo->delKey.ts = INT64_MAX;
|
||||
pInfo->delKey.groupId = 0;
|
||||
pInfo->numOfDatapack = 0;
|
||||
pInfo->pUpdated = NULL;
|
||||
pInfo->pUpdatedMap = NULL;
|
||||
|
||||
pOperator->operatorType = pPhyNode->type;
|
||||
pOperator->blocking = true;
|
||||
|
@ -3419,7 +3426,6 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
|||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||
SOptrBasicInfo* pBInfo = &pInfo->binfo;
|
||||
TSKEY maxTs = INT64_MIN;
|
||||
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
|
@ -3439,10 +3445,14 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
SSHashObj* pStUpdated = tSimpleHashInit(64, hashFn);
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
SArray* pUpdated = taosArrayInit(16, sizeof(SSessionKey)); // SResKeyPos
|
||||
if (!pInfo->pUpdated) {
|
||||
pInfo->pUpdated = taosArrayInit(16, sizeof(SSessionKey));
|
||||
}
|
||||
if (!pInfo->pStUpdated) {
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pStUpdated = tSimpleHashInit(64, hashFn);
|
||||
}
|
||||
while (1) {
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||
if (pBlock == NULL) {
|
||||
|
@ -3455,21 +3465,25 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
|||
SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
|
||||
// gap must be 0
|
||||
doDeleteTimeWindows(pAggSup, pBlock, pWins);
|
||||
removeSessionResults(pStUpdated, pWins);
|
||||
removeSessionResults(pInfo->pStUpdated, pWins);
|
||||
if (IS_FINAL_OP(pInfo)) {
|
||||
int32_t childIndex = getChildIndex(pBlock);
|
||||
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
|
||||
SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info;
|
||||
// gap must be 0
|
||||
doDeleteTimeWindows(&pChildInfo->streamAggSup, pBlock, NULL);
|
||||
rebuildSessionWindow(pOperator, pWins, pStUpdated);
|
||||
rebuildSessionWindow(pOperator, pWins, pInfo->pStUpdated);
|
||||
}
|
||||
copyDeleteWindowInfo(pWins, pInfo->pStDeleted);
|
||||
taosArrayDestroy(pWins);
|
||||
continue;
|
||||
} else if (pBlock->info.type == STREAM_GET_ALL) {
|
||||
getAllSessionWindow(pAggSup->pResultRows, pStUpdated);
|
||||
getAllSessionWindow(pAggSup->pResultRows, pInfo->pStUpdated);
|
||||
continue;
|
||||
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
|
||||
return pBlock;
|
||||
} else {
|
||||
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
|
||||
}
|
||||
|
||||
if (pInfo->scalarSupp.pExprInfo != NULL) {
|
||||
|
@ -3478,7 +3492,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
|||
}
|
||||
// the pDataBlock are always the same one, no need to call this again
|
||||
setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
|
||||
doStreamSessionAggImpl(pOperator, pBlock, pStUpdated, pInfo->pStDeleted, IS_FINAL_OP(pInfo));
|
||||
doStreamSessionAggImpl(pOperator, pBlock, pInfo->pStUpdated, pInfo->pStDeleted, IS_FINAL_OP(pInfo));
|
||||
if (IS_FINAL_OP(pInfo)) {
|
||||
int32_t chIndex = getChildIndex(pBlock);
|
||||
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
||||
|
@ -3495,20 +3509,20 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
|||
setInputDataBlock(&pChildOp->exprSupp, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
|
||||
doStreamSessionAggImpl(pChildOp, pBlock, NULL, NULL, true);
|
||||
}
|
||||
maxTs = TMAX(maxTs, pBlock->info.window.ekey);
|
||||
maxTs = TMAX(maxTs, pBlock->info.watermark);
|
||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
|
||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.watermark);
|
||||
}
|
||||
|
||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
|
||||
// restore the value
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
|
||||
closeSessionWindow(pAggSup->pResultRows, &pInfo->twAggSup, pStUpdated);
|
||||
closeSessionWindow(pAggSup->pResultRows, &pInfo->twAggSup, pInfo->pStUpdated);
|
||||
closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs);
|
||||
copyUpdateResult(pStUpdated, pUpdated);
|
||||
removeSessionResults(pInfo->pStDeleted, pUpdated);
|
||||
tSimpleHashCleanup(pStUpdated);
|
||||
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
|
||||
copyUpdateResult(pInfo->pStUpdated, pInfo->pUpdated);
|
||||
removeSessionResults(pInfo->pStDeleted, pInfo->pUpdated);
|
||||
tSimpleHashCleanup(pInfo->pStUpdated);
|
||||
pInfo->pStUpdated = NULL;
|
||||
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
|
||||
pInfo->pUpdated = NULL;
|
||||
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||
|
||||
#if 0
|
||||
|
@ -3593,6 +3607,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
|
|||
pInfo->isFinal = false;
|
||||
pInfo->pPhyNode = pPhyNode;
|
||||
pInfo->ignoreExpiredData = pSessionNode->window.igExpired;
|
||||
pInfo->pUpdated = NULL;
|
||||
pInfo->pStUpdated = NULL;
|
||||
|
||||
setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true,
|
||||
OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||
|
@ -3653,10 +3669,14 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
|||
}
|
||||
}
|
||||
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
SSHashObj* pStUpdated = tSimpleHashInit(64, hashFn);
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
SArray* pUpdated = taosArrayInit(16, sizeof(SSessionKey));
|
||||
if (!pInfo->pUpdated) {
|
||||
pInfo->pUpdated = taosArrayInit(16, sizeof(SSessionKey));
|
||||
}
|
||||
if (!pInfo->pStUpdated) {
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pStUpdated = tSimpleHashInit(64, hashFn);
|
||||
}
|
||||
while (1) {
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||
if (pBlock == NULL) {
|
||||
|
@ -3671,13 +3691,17 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
|||
// gap must be 0
|
||||
SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
|
||||
doDeleteTimeWindows(&pInfo->streamAggSup, pBlock, pWins);
|
||||
removeSessionResults(pStUpdated, pWins);
|
||||
removeSessionResults(pInfo->pStUpdated, pWins);
|
||||
copyDeleteWindowInfo(pWins, pInfo->pStDeleted);
|
||||
taosArrayDestroy(pWins);
|
||||
break;
|
||||
} else if (pBlock->info.type == STREAM_GET_ALL) {
|
||||
getAllSessionWindow(pInfo->streamAggSup.pResultRows, pStUpdated);
|
||||
getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pStUpdated);
|
||||
continue;
|
||||
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
|
||||
return pBlock;
|
||||
} else {
|
||||
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
|
||||
}
|
||||
|
||||
if (pInfo->scalarSupp.pExprInfo != NULL) {
|
||||
|
@ -3686,18 +3710,20 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
|||
}
|
||||
// the pDataBlock are always the same one, no need to call this again
|
||||
setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
|
||||
doStreamSessionAggImpl(pOperator, pBlock, pStUpdated, NULL, false);
|
||||
doStreamSessionAggImpl(pOperator, pBlock, pInfo->pStUpdated, NULL, false);
|
||||
maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
|
||||
}
|
||||
|
||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
|
||||
pBInfo->pRes->info.watermark = pInfo->twAggSup.maxTs;
|
||||
|
||||
copyUpdateResult(pStUpdated, pUpdated);
|
||||
removeSessionResults(pInfo->pStDeleted, pUpdated);
|
||||
tSimpleHashCleanup(pStUpdated);
|
||||
copyUpdateResult(pInfo->pStUpdated, pInfo->pUpdated);
|
||||
removeSessionResults(pInfo->pStDeleted, pInfo->pUpdated);
|
||||
tSimpleHashCleanup(pInfo->pStUpdated);
|
||||
pInfo->pStUpdated = NULL;
|
||||
|
||||
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
|
||||
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
|
||||
pInfo->pUpdated = NULL;
|
||||
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
|
||||
|
||||
#if 0
|
||||
|
@ -3957,7 +3983,6 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
|||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
|
||||
SOptrBasicInfo* pBInfo = &pInfo->binfo;
|
||||
int64_t maxTs = INT64_MIN;
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
|
||||
if (pInfo->pDelRes->info.rows > 0) {
|
||||
|
@ -3975,10 +4000,14 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
SSHashObj* pSeUpdated = tSimpleHashInit(64, hashFn);
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
SArray* pUpdated = taosArrayInit(16, sizeof(SSessionKey));
|
||||
if (!pInfo->pUpdated) {
|
||||
pInfo->pUpdated = taosArrayInit(16, sizeof(SSessionKey));
|
||||
}
|
||||
if (!pInfo->pSeUpdated) {
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pSeUpdated = tSimpleHashInit(64, hashFn);
|
||||
}
|
||||
while (1) {
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||
if (pBlock == NULL) {
|
||||
|
@ -3990,13 +4019,17 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
|||
pBlock->info.type == STREAM_CLEAR) {
|
||||
SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
|
||||
doDeleteTimeWindows(&pInfo->streamAggSup, pBlock, pWins);
|
||||
removeSessionResults(pSeUpdated, pWins);
|
||||
removeSessionResults(pInfo->pSeUpdated, pWins);
|
||||
copyDeleteWindowInfo(pWins, pInfo->pSeDeleted);
|
||||
taosArrayDestroy(pWins);
|
||||
continue;
|
||||
} else if (pBlock->info.type == STREAM_GET_ALL) {
|
||||
getAllSessionWindow(pInfo->streamAggSup.pResultRows, pSeUpdated);
|
||||
getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pSeUpdated);
|
||||
continue;
|
||||
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
|
||||
return pBlock;
|
||||
} else {
|
||||
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
|
||||
}
|
||||
|
||||
if (pInfo->scalarSupp.pExprInfo != NULL) {
|
||||
|
@ -4005,19 +4038,20 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
|||
}
|
||||
// the pDataBlock are always the same one, no need to call this again
|
||||
setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
|
||||
doStreamStateAggImpl(pOperator, pBlock, pSeUpdated, pInfo->pSeDeleted);
|
||||
maxTs = TMAX(maxTs, pBlock->info.window.ekey);
|
||||
doStreamStateAggImpl(pOperator, pBlock, pInfo->pSeUpdated, pInfo->pSeDeleted);
|
||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
|
||||
}
|
||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
|
||||
// restore the value
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
|
||||
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pSeUpdated);
|
||||
copyUpdateResult(pSeUpdated, pUpdated);
|
||||
removeSessionResults(pInfo->pSeDeleted, pUpdated);
|
||||
tSimpleHashCleanup(pSeUpdated);
|
||||
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pInfo->pSeUpdated);
|
||||
copyUpdateResult(pInfo->pSeUpdated, pInfo->pUpdated);
|
||||
removeSessionResults(pInfo->pSeDeleted, pInfo->pUpdated);
|
||||
tSimpleHashCleanup(pInfo->pSeUpdated);
|
||||
pInfo->pSeUpdated = NULL;
|
||||
|
||||
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
|
||||
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
|
||||
pInfo->pUpdated = NULL;
|
||||
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||
|
||||
#if 0
|
||||
|
@ -4098,6 +4132,8 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
|
||||
pInfo->pChildren = NULL;
|
||||
pInfo->ignoreExpiredData = pStateNode->window.igExpired;
|
||||
pInfo->pUpdated = NULL;
|
||||
pInfo->pSeUpdated = NULL;
|
||||
|
||||
setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED,
|
||||
pInfo, pTaskInfo);
|
||||
|
@ -4711,8 +4747,6 @@ _error:
|
|||
static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
int64_t maxTs = INT64_MIN;
|
||||
int64_t minTs = INT64_MAX;
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
|
@ -4740,9 +4774,14 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
|
||||
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); // SResKeyPos
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
|
||||
if (!pInfo->pUpdated) {
|
||||
pInfo->pUpdated = taosArrayInit(4, POINTER_BYTES);
|
||||
}
|
||||
if (!pInfo->pUpdatedMap) {
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
|
||||
}
|
||||
|
||||
|
||||
while (1) {
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||
|
@ -4756,11 +4795,15 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
|
||||
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
||||
pBlock->info.type == STREAM_CLEAR) {
|
||||
doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pUpdatedMap);
|
||||
doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pInfo->pUpdatedMap);
|
||||
continue;
|
||||
} else if (pBlock->info.type == STREAM_GET_ALL) {
|
||||
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap);
|
||||
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap);
|
||||
continue;
|
||||
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
|
||||
return pBlock;
|
||||
} else {
|
||||
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
|
||||
}
|
||||
|
||||
if (pBlock->info.type == STREAM_NORMAL && pBlock->info.version != 0) {
|
||||
|
@ -4781,27 +4824,27 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
setInverFunction(pSup->pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.type);
|
||||
}
|
||||
|
||||
maxTs = TMAX(maxTs, pBlock->info.window.ekey);
|
||||
minTs = TMIN(minTs, pBlock->info.window.skey);
|
||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
|
||||
pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.skey);
|
||||
|
||||
doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pUpdatedMap);
|
||||
doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pInfo->pUpdatedMap);
|
||||
}
|
||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
|
||||
pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs);
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
removeDeleteResults(pUpdatedMap, pInfo->pDelWins);
|
||||
closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdatedMap,
|
||||
removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins);
|
||||
closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pInfo->pUpdatedMap,
|
||||
pInfo->pDelWins, pOperator);
|
||||
|
||||
void* pIte = NULL;
|
||||
while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) {
|
||||
taosArrayPush(pUpdated, pIte);
|
||||
while ((pIte = taosHashIterate(pInfo->pUpdatedMap, pIte)) != NULL) {
|
||||
taosArrayPush(pInfo->pUpdated, pIte);
|
||||
}
|
||||
taosArraySort(pUpdated, resultrowComparAsc);
|
||||
taosArraySort(pInfo->pUpdated, resultrowComparAsc);
|
||||
|
||||
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
|
||||
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
|
||||
pInfo->pUpdated = NULL;
|
||||
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||
taosHashCleanup(pUpdatedMap);
|
||||
taosHashCleanup(pInfo->pUpdatedMap);
|
||||
pInfo->pUpdatedMap = NULL;
|
||||
|
||||
doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
|
||||
if (pInfo->pDelRes->info.rows > 0) {
|
||||
|
@ -4902,6 +4945,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
pInfo->delKey.ts = INT64_MAX;
|
||||
pInfo->delKey.groupId = 0;
|
||||
pInfo->numOfDatapack = 0;
|
||||
pInfo->pUpdated = NULL;
|
||||
pInfo->pUpdatedMap = NULL;
|
||||
|
||||
setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED,
|
||||
pInfo, pTaskInfo);
|
||||
|
@ -4922,4 +4967,3 @@ _error:
|
|||
pTaskInfo->code = code;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
|
|
@ -860,13 +860,6 @@ int32_t setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STu
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void releaseSource(STuplePos* pPos) {
|
||||
if (pPos->pageId == -1) {
|
||||
return;
|
||||
}
|
||||
// Todo(liuyao) relase row
|
||||
}
|
||||
|
||||
// This function append the selectivity to subsidiaries function context directly, without fetching data
|
||||
// from intermediate disk based buf page
|
||||
void appendSelectivityValue(SqlFunctionCtx* pCtx, int32_t rowIndex, int32_t pos) {
|
||||
|
@ -899,7 +892,6 @@ void appendSelectivityValue(SqlFunctionCtx* pCtx, int32_t rowIndex, int32_t pos)
|
|||
}
|
||||
|
||||
void replaceTupleData(STuplePos* pDestPos, STuplePos* pSourcePos) {
|
||||
releaseSource(pDestPos);
|
||||
*pDestPos = *pSourcePos;
|
||||
}
|
||||
|
||||
|
|
|
@ -85,9 +85,7 @@ static int64_t adjustWatermark(int64_t adjInterval, int64_t originInt, int64_t w
|
|||
watermark = TMAX(originInt / adjInterval, 1) * adjInterval;
|
||||
} else if (watermark > MAX_NUM_SCALABLE_BF * adjInterval) {
|
||||
watermark = MAX_NUM_SCALABLE_BF * adjInterval;
|
||||
}/* else if (watermark < MIN_NUM_SCALABLE_BF * adjInterval) {
|
||||
watermark = MIN_NUM_SCALABLE_BF * adjInterval;
|
||||
}*/ // Todo(liuyao) save window info to tdb
|
||||
}
|
||||
return watermark;
|
||||
}
|
||||
|
||||
|
|
|
@ -247,6 +247,8 @@
|
|||
,,y,script,./test.sh -f tsim/stream/fillIntervalPartitionBy.sim
|
||||
,,y,script,./test.sh -f tsim/stream/fillIntervalPrevNext.sim
|
||||
,,y,script,./test.sh -f tsim/stream/fillIntervalValue.sim
|
||||
,,y,script,./test.sh -f tsim/stream/tableAndTag0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/tableAndTag1.sim
|
||||
,,y,script,./test.sh -f tsim/trans/lossdata1.sim
|
||||
,,y,script,./test.sh -f tsim/trans/create_db.sim
|
||||
,,y,script,./test.sh -f tsim/tmq/basic1.sim
|
||||
|
|
|
@ -0,0 +1,273 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
|
||||
print ===== step1
|
||||
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sleep 50
|
||||
sql connect
|
||||
|
||||
print ===== step2
|
||||
print ===== table name
|
||||
|
||||
sql create database result vgroups 1;
|
||||
|
||||
sql create database test vgroups 4;
|
||||
sql use test;
|
||||
|
||||
|
||||
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
|
||||
sql_error create stream streams1 trigger at_once into result.streamt SUBTABLE("aaa") as select _wstart, count(*) c1 from st interval(10s);
|
||||
sql create stream streams1 trigger at_once into result.streamt SUBTABLE(concat("aaa-", tbname)) as select _wstart, count(*) c1 from st partition by tbname interval(10s);
|
||||
sql insert into t1 values(1648791213000,1,2,3);
|
||||
sql insert into t2 values(1648791213000,1,2,3);
|
||||
|
||||
$loop_count = 0
|
||||
loop0:
|
||||
|
||||
sleep 300
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select table_name from information_schema.ins_tables where db_name="result" order by 1;
|
||||
|
||||
if $rows != 2 then
|
||||
print =====rows=$rows
|
||||
print $data00 $data10
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
if $data00 != aaa-t1 then
|
||||
print =====data00=$data00
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
if $data10 != aaa-t2 then
|
||||
print =====data10=$data10
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
$loop_count = 0
|
||||
loop1:
|
||||
|
||||
sleep 300
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from result.streamt;
|
||||
|
||||
if $rows != 2 then
|
||||
print =====rows=$rows
|
||||
print $data00 $data10
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
|
||||
print ===== step3
|
||||
print ===== tag name
|
||||
|
||||
sql create database result2 vgroups 1;
|
||||
|
||||
sql create database test2 vgroups 4;
|
||||
sql use test2;
|
||||
|
||||
|
||||
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
|
||||
sql create stream streams2 trigger at_once into result2.streamt2 TAGS(cc varchar(100)) as select _wstart, count(*) c1 from st partition by concat("tag-", tbname) as cc interval(10s);
|
||||
sql insert into t1 values(1648791213000,1,2,3);
|
||||
sql insert into t2 values(1648791213000,1,2,3);
|
||||
|
||||
|
||||
$loop_count = 0
|
||||
loop2:
|
||||
|
||||
sleep 300
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select tag_name from information_schema.ins_tags where db_name="result2" and stable_name = "streamt2" order by 1;
|
||||
|
||||
if $rows != 2 then
|
||||
print =====rows=$rows
|
||||
print $data00 $data10
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
if $data00 != cc then
|
||||
print data00 != cc
|
||||
print =====data00=$data00
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
if $data10 != cc then
|
||||
print =====data10=$data10
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
sql select cc from result2.streamt2 order by 1;
|
||||
|
||||
if $rows != 2 then
|
||||
print =====rows=$rows
|
||||
print $data00 $data10
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
if $data00 != tag-t1 then
|
||||
print data00 != tag-t1
|
||||
print =====data00=$data00
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
if $data10 != tag-t2 then
|
||||
print =====data10=$data10
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
$loop_count = 0
|
||||
loop3:
|
||||
|
||||
sleep 300
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from result2.streamt2;
|
||||
|
||||
if $rows != 2 then
|
||||
print =====rows=$rows
|
||||
print $data00 $data10
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
|
||||
print ===== step4
|
||||
print ===== tag name + table name
|
||||
|
||||
sql create database result3 vgroups 1;
|
||||
|
||||
sql create database test3 vgroups 4;
|
||||
sql use test3;
|
||||
|
||||
|
||||
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
|
||||
sql create stream streams3 trigger at_once into result3.streamt3 TAGS(dd varchar(100)) SUBTABLE(concat("tbn-", tbname)) as select _wstart, count(*) c1 from st partition by concat("tag-", tbname) as dd, tbname interval(10s);
|
||||
sql insert into t1 values(1648791213000,1,2,3);
|
||||
sql insert into t2 values(1648791213000,1,2,3);
|
||||
|
||||
|
||||
$loop_count = 0
|
||||
loop4:
|
||||
|
||||
sleep 300
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select tag_name from information_schema.ins_tags where db_name="result3" and stable_name = "streamt3" order by 1;
|
||||
|
||||
if $rows != 2 then
|
||||
print =====rows=$rows
|
||||
print $data00 $data10
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data00 != dd then
|
||||
print =====data00=$data00
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data10 != dd then
|
||||
print =====data10=$data10
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
sql select dd from result3.streamt3 order by 1;
|
||||
|
||||
if $rows != 2 then
|
||||
print =====rows=$rows
|
||||
print $data00 $data10
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data00 != tag-t1 then
|
||||
print =====data00=$data00
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data10 != tag-t2 then
|
||||
print =====data10=$data10
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
$loop_count = 0
|
||||
loop5:
|
||||
|
||||
sleep 300
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from result3.streamt3;
|
||||
|
||||
if $rows != 2 then
|
||||
print =====rows=$rows
|
||||
print $data00 $data10
|
||||
goto loop5
|
||||
endi
|
||||
|
||||
$loop_count = 0
|
||||
loop6:
|
||||
|
||||
sleep 300
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select table_name from information_schema.ins_tables where db_name="result3" order by 1;
|
||||
|
||||
if $rows != 2 then
|
||||
print =====rows=$rows
|
||||
print $data00 $data10
|
||||
goto loop6
|
||||
endi
|
||||
|
||||
if $data00 != tbn-t1 then
|
||||
print =====data00=$data00
|
||||
goto loop6
|
||||
endi
|
||||
|
||||
if $data10 != tbn-t2 then
|
||||
print =====data10=$data10
|
||||
goto loop6
|
||||
endi
|
||||
|
||||
|
||||
print ======over
|
||||
|
||||
system sh/stop_dnodes.sh
|
|
@ -0,0 +1,275 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
|
||||
print ===== step1
|
||||
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sleep 50
|
||||
sql connect
|
||||
|
||||
print ===== step2
|
||||
print ===== table name
|
||||
|
||||
sql create database result vgroups 1;
|
||||
|
||||
sql create database test vgroups 4;
|
||||
sql use test;
|
||||
|
||||
|
||||
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
|
||||
sql_error create stream streams1 trigger at_once into result.streamt SUBTABLE("aaa") as select _wstart, count(*) c1 from st interval(10s);
|
||||
sql create stream streams1 trigger at_once into result.streamt SUBTABLE( concat("aaa-", cast(a as varchar(10) ) ) ) as select _wstart, count(*) c1 from st partition by a interval(10s);
|
||||
print ===== insert into 1
|
||||
sql insert into t1 values(1648791213000,1,2,3);
|
||||
sql insert into t2 values(1648791213000,2,2,3);
|
||||
|
||||
$loop_count = 0
|
||||
loop0:
|
||||
|
||||
sleep 300
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select table_name from information_schema.ins_tables where db_name="result" order by 1;
|
||||
|
||||
if $rows != 2 then
|
||||
print =====rows=$rows
|
||||
print $data00 $data10
|
||||
print $data20 $data30
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
if $data00 != aaa-1 then
|
||||
print =====data00=$data00
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
if $data10 != aaa-2 then
|
||||
print =====data10=$data10
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
$loop_count = 0
|
||||
loop1:
|
||||
|
||||
sleep 300
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from result.streamt;
|
||||
|
||||
if $rows != 2 then
|
||||
print =====rows=$rows
|
||||
print $data00 $data10
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
|
||||
print ===== step3
|
||||
print ===== column name
|
||||
|
||||
sql create database result2 vgroups 1;
|
||||
|
||||
sql create database test2 vgroups 4;
|
||||
sql use test2;
|
||||
|
||||
|
||||
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
|
||||
sql create stream streams2 trigger at_once into result2.streamt2 TAGS(cc varchar(100)) as select _wstart, count(*) c1 from st partition by concat("col-", cast(a as varchar(10) ) ) as cc interval(10s);
|
||||
print ===== insert into 2
|
||||
sql insert into t1 values(1648791213000,1,2,3);
|
||||
sql insert into t2 values(1648791213000,2,2,3);
|
||||
|
||||
|
||||
$loop_count = 0
|
||||
loop2:
|
||||
|
||||
sleep 300
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select tag_name from information_schema.ins_tags where db_name="result2" and stable_name = "streamt2" order by 1;
|
||||
|
||||
if $rows != 2 then
|
||||
print =====rows=$rows
|
||||
print $data00 $data10
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
if $data00 != cc then
|
||||
print =====data00=$data00
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
if $data10 != cc then
|
||||
print =====data10=$data10
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
sql select cc from result2.streamt2 order by 1;
|
||||
|
||||
if $rows != 2 then
|
||||
print =====rows=$rows
|
||||
print $data00 $data10
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
if $data00 != col-1 then
|
||||
print =====data00=$data00
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
if $data10 != col-2 then
|
||||
print =====data10=$data10
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
$loop_count = 0
|
||||
loop3:
|
||||
|
||||
sleep 300
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from result2.streamt2;
|
||||
|
||||
if $rows != 2 then
|
||||
print =====rows=$rows
|
||||
print $data00 $data10
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
|
||||
print ===== step4
|
||||
print ===== column name + table name
|
||||
|
||||
sql create database result3 vgroups 1;
|
||||
|
||||
sql create database test3 vgroups 4;
|
||||
sql use test3;
|
||||
|
||||
|
||||
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
|
||||
sql create stream streams3 trigger at_once into result3.streamt3 TAGS(dd varchar(100)) SUBTABLE(concat("tbn-", cast(a as varchar(10) ) ) ) as select _wstart, count(*) c1 from st partition by concat("col-", cast(a as varchar(10) ) ) as dd, a interval(10s);
|
||||
print ===== insert into 3
|
||||
sql insert into t1 values(1648791213000,1,2,3);
|
||||
sql insert into t2 values(1648791213000,2,2,3);
|
||||
|
||||
|
||||
$loop_count = 0
|
||||
loop4:
|
||||
|
||||
sleep 300
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select tag_name from information_schema.ins_tags where db_name="result3" and stable_name = "streamt3" order by 1;
|
||||
|
||||
if $rows != 2 then
|
||||
print =====rows=$rows
|
||||
print $data00 $data10
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data00 != dd then
|
||||
print =====data00=$data00
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data10 != dd then
|
||||
print =====data10=$data10
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
sql select dd from result3.streamt3 order by 1;
|
||||
|
||||
if $rows != 2 then
|
||||
print =====rows=$rows
|
||||
print $data00 $data10
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data00 != col-1 then
|
||||
print =====data00=$data00
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data10 != col-2 then
|
||||
print =====data10=$data10
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
$loop_count = 0
|
||||
loop5:
|
||||
|
||||
sleep 300
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from result3.streamt3;
|
||||
|
||||
if $rows != 2 then
|
||||
print =====rows=$rows
|
||||
print $data00 $data10
|
||||
goto loop5
|
||||
endi
|
||||
|
||||
$loop_count = 0
|
||||
loop6:
|
||||
|
||||
sleep 300
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select table_name from information_schema.ins_tables where db_name="result3" order by 1;
|
||||
|
||||
if $rows != 2 then
|
||||
print =====rows=$rows
|
||||
print $data00 $data10
|
||||
goto loop6
|
||||
endi
|
||||
|
||||
if $data00 != tbn-1 then
|
||||
print =====data00=$data00
|
||||
goto loop6
|
||||
endi
|
||||
|
||||
if $data10 != tbn-2 then
|
||||
print =====data10=$data10
|
||||
goto loop6
|
||||
endi
|
||||
|
||||
|
||||
print ======over
|
||||
|
||||
system sh/stop_dnodes.sh
|
|
@ -68,7 +68,7 @@ sql select * from streamt2;
|
|||
|
||||
if $rows != 1 then
|
||||
print ======streamt2=$rows
|
||||
return -1
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
sql select * from streamt3;
|
||||
|
@ -80,7 +80,7 @@ endi
|
|||
sql select * from streamt4;
|
||||
if $rows != 1 then
|
||||
print ======streamt4=$rows
|
||||
return -1
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
sql select * from streamt5;
|
||||
|
@ -92,7 +92,7 @@ endi
|
|||
sql select * from streamt6;
|
||||
if $rows != 1 then
|
||||
print ======streamt6=$rows
|
||||
return -1
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
sql select * from streamt7;
|
||||
|
@ -104,7 +104,7 @@ endi
|
|||
sql select * from streamt8;
|
||||
if $rows != 1 then
|
||||
print ======streamt8=$rows
|
||||
return -1
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
sql select * from streamt9;
|
||||
|
@ -116,7 +116,7 @@ endi
|
|||
sql select * from streamt10;
|
||||
if $rows != 1 then
|
||||
print ======streamt10=$rows
|
||||
return -1
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
sql select * from streamt11;
|
||||
|
@ -125,4 +125,6 @@ if $rows != 2 then
|
|||
goto loop1
|
||||
endi
|
||||
|
||||
print ======over
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
Loading…
Reference in New Issue