feat(stream): support delete data

This commit is contained in:
Liu Jicong 2022-09-02 15:34:16 +08:00
parent 739e7ae76f
commit d653da7971
10 changed files with 665 additions and 526 deletions

View File

@ -184,7 +184,8 @@ static FORCE_INLINE void colDataAppendDouble(SColumnInfoData* pColumnInfoData, u
int32_t getJsonValueLen(const char* data);
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull);
int32_t colDataAppendNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows);
int32_t colDataAppendNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData,
uint32_t numOfRows);
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity,
const SColumnInfoData* pSource, int32_t numOfRow2);
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows,
@ -232,6 +233,7 @@ SSDataBlock* createDataBlock();
void* blockDataDestroy(SSDataBlock* pBlock);
void blockDataFreeRes(SSDataBlock* pBlock);
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData);
SSDataBlock* createSpecialDataBlock(EStreamType type);
int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData);
@ -249,7 +251,6 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf);
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataBlocks, STSchema* pTSchema, int32_t vgId,
tb_uid_t suid);
char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId);
static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {

View File

@ -140,7 +140,8 @@ int32_t colDataReserve(SColumnInfoData* pColumnInfoData, size_t newSize) {
return TSDB_CODE_SUCCESS;
}
static void doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t currentRow, const char* pData, int32_t itemLen, int32_t numOfRows) {
static void doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t currentRow, const char* pData,
int32_t itemLen, int32_t numOfRows) {
ASSERT(pColumnInfoData->info.bytes >= itemLen);
size_t start = 1;
@ -151,14 +152,16 @@ static void doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t curren
int32_t count = log(numOfRows) / log(2);
while (t < count) {
int32_t xlen = 1 << t;
memcpy(pColumnInfoData->pData + start * itemLen + pColumnInfoData->varmeta.length, pColumnInfoData->pData, xlen * itemLen);
memcpy(pColumnInfoData->pData + start * itemLen + pColumnInfoData->varmeta.length, pColumnInfoData->pData,
xlen * itemLen);
t += 1;
start += xlen;
}
// the tail part
if (numOfRows > start) {
memcpy(pColumnInfoData->pData + start * itemLen + currentRow * itemLen, pColumnInfoData->pData, (numOfRows - start) * itemLen);
memcpy(pColumnInfoData->pData + start * itemLen + currentRow * itemLen, pColumnInfoData->pData,
(numOfRows - start) * itemLen);
}
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
@ -170,7 +173,8 @@ static void doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t curren
}
}
int32_t colDataAppendNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows) {
int32_t colDataAppendNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData,
uint32_t numOfRows) {
ASSERT(pData != NULL && pColumnInfoData != NULL);
int32_t len = pColumnInfoData->info.bytes;
@ -676,7 +680,8 @@ size_t blockDataGetRowSize(SSDataBlock* pBlock) {
* @return
*/
size_t blockDataGetSerialMetaSize(uint32_t numOfCols) {
// | version | total length | total rows | total columns | flag seg| block group id | column schema | each column length |
// | version | total length | total rows | total columns | flag seg| block group id | column schema | each column
// length |
return sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) + sizeof(uint64_t) +
numOfCols * (sizeof(int8_t) + sizeof(int32_t)) + numOfCols * sizeof(int32_t);
}
@ -1302,6 +1307,40 @@ int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
return TSDB_CODE_SUCCESS;
}
SSDataBlock* createSpecialDataBlock(EStreamType type) {
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
pBlock->info.hasVarCol = false;
pBlock->info.groupId = 0;
pBlock->info.rows = 0;
pBlock->info.type = type;
pBlock->info.rowSize =
sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(TSKEY) + sizeof(TSKEY);
pBlock->info.watermark = INT64_MIN;
pBlock->pDataBlock = taosArrayInit(6, sizeof(SColumnInfoData));
SColumnInfoData infoData = {0};
infoData.info.type = TSDB_DATA_TYPE_TIMESTAMP;
infoData.info.bytes = sizeof(TSKEY);
// window start ts
taosArrayPush(pBlock->pDataBlock, &infoData);
// window end ts
taosArrayPush(pBlock->pDataBlock, &infoData);
infoData.info.type = TSDB_DATA_TYPE_UBIGINT;
infoData.info.bytes = sizeof(uint64_t);
// uid
taosArrayPush(pBlock->pDataBlock, &infoData);
// group id
taosArrayPush(pBlock->pDataBlock, &infoData);
// calculate start ts
taosArrayPush(pBlock->pDataBlock, &infoData);
// calculate end ts
taosArrayPush(pBlock->pDataBlock, &infoData);
return pBlock;
}
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
if (pDataBlock == NULL) {
return NULL;

View File

@ -173,7 +173,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
// tq-stream
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data, int64_t ver);
int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* data, int64_t ver);
int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver);
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec);
int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg);

View File

@ -816,7 +816,85 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msg
return streamMetaAddSerializedTask(pTq->pStreamMeta, version, msg, msgLen);
}
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
bool failed = false;
SDecoder* pCoder = &(SDecoder){0};
SDeleteRes* pRes = &(SDeleteRes){0};
pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
if (pRes->uidList == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
failed = true;
}
tDecoderInit(pCoder, pReq, len);
tDecodeDeleteRes(pCoder, pRes);
/*ASSERT(pRes->skey != 0);*/
/*ASSERT(pRes->ekey != 0);*/
tDecoderClear(pCoder);
int32_t sz = taosArrayGetSize(pRes->uidList);
if (sz == 0) {
taosArrayDestroy(pRes->uidList);
return 0;
}
SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
blockDataEnsureCapacity(pDelBlock, sz);
pDelBlock->info.rows = sz;
pDelBlock->info.version = ver;
for (int32_t i = 0; i < sz; i++) {
// start key column
SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
colDataAppend(pStartCol, i, (const char*)&pRes->skey, false); // end key column
SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
colDataAppend(pEndCol, i, (const char*)&pRes->ekey, false);
// uid column
SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
int64_t* pUid = taosArrayGet(pRes->uidList, i);
colDataAppend(pUidCol, i, (const char*)pUid, false);
colDataAppendNULL(taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX), i);
colDataAppendNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX), i);
colDataAppendNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), i);
}
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
if (pIter == NULL) break;
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue;
qDebug("delete req enqueue stream task: %d, ver: %" PRId64, pTask->taskId, ver);
SStreamDataBlock* pStreamBlock = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
pStreamBlock->type = STREAM_INPUT__DATA_BLOCK;
pStreamBlock->blocks = taosArrayInit(0, sizeof(SSDataBlock));
SSDataBlock block = {0};
assignOneDataBlock(&block, pDelBlock);
block.info.type = STREAM_DELETE_DATA;
taosArrayPush(pStreamBlock->blocks, &block);
if (!failed) {
if (streamTaskInput(pTask, (SStreamQueueItem*)pStreamBlock) < 0) {
qError("stream task input del failed, task id %d", pTask->taskId);
continue;
}
if (streamSchedExec(pTask) < 0) {
qError("stream task launch failed, task id %d", pTask->taskId);
continue;
}
} else {
streamTaskInputFail(pTask);
}
}
return 0;
}
int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
void* pIter = NULL;
bool failed = false;
SStreamDataSubmit* pSubmit = NULL;

View File

@ -213,7 +213,8 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
#endif
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
if (vnodeIsRoleLeader(pTq->pVnode) && msgType == TDMT_VND_SUBMIT) {
if (vnodeIsRoleLeader(pTq->pVnode)) {
if (msgType == TDMT_VND_SUBMIT) {
if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) return 0;
void* data = taosMemoryMalloc(msgLen);
@ -226,7 +227,11 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
SSubmitReq* pReq = (SSubmitReq*)data;
pReq->version = ver;
tqProcessStreamTrigger(pTq, data, ver);
tqProcessSubmitReq(pTq, data, ver);
}
if (msgType == TDMT_VND_DELETE) {
tqProcessDelReq(pTq, POINTER_SHIFT(msg, sizeof(SMsgHead)), msgLen - sizeof(SMsgHead), ver);
}
}
return 0;

View File

@ -1107,6 +1107,8 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq
tDecoderInit(pCoder, pReq, len);
tDecodeDeleteRes(pCoder, pRes);
ASSERT(pRes->skey != 0);
ASSERT(pRes->ekey != 0);
for (int32_t iUid = 0; iUid < taosArrayGetSize(pRes->uidList); iUid++) {
code = tsdbDeleteTableData(pVnode->pTsdb, version, pRes->suid, *(uint64_t *)taosArrayGet(pRes->uidList, iUid),

View File

@ -38,11 +38,11 @@ extern "C" {
#include "tlockfree.h"
#include "tmsg.h"
#include "tpagedbuf.h"
#include "tstreamUpdate.h"
#include "tstream.h"
#include "tstreamUpdate.h"
#include "vnode.h"
#include "executorInt.h"
#include "vnode.h"
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
@ -310,7 +310,8 @@ typedef struct SAggSupporter {
} SAggSupporter;
typedef struct {
// if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded.
// if the upstream is an interval operator, the interval info is also kept here to get the time window to check if
// current data block needs to be loaded.
SInterval interval;
SAggSupporter* pAggSup;
SExprSupp* pExprSup; // expr supporter of aggregate operator
@ -899,7 +900,8 @@ void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs);
int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
const char* pkey);
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows);
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf);
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
SDiskbasedBuf* pBuf);
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf);
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo);
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo);
@ -907,7 +909,8 @@ void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLi
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, int32_t offset,
int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput);
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, int32_t numOfOutput, SArray* pColList, char** pNextStart);
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, int32_t numOfOutput, SArray* pColList,
char** pNextStart);
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int32_t numOfRows, int32_t dataLen, int64_t startTs,
SOperatorInfo* pOperator);
@ -933,29 +936,37 @@ SSDataBlock* loadNextDataBlock(void* param);
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset);
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo,
char* pData, int16_t bytes, bool masterscan, uint64_t groupId,
SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup);
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
bool isIntervalQuery, SAggSupporter* pSup);
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode *pScanPhyNode, const char* pUser, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode,
const char* pUser, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SNode* pCondition, SExprInfo* pScalarExprInfo,
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SNode* pCondition, SExprInfo* pScalarExprInfo,
int32_t numOfScalarExpr, bool mergeResult, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode *pNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams,
SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp* pTwAggSupp, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, bool isStream);
STimeWindowAggSupp* pTwAggSupp, SIntervalPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, bool isStream);
SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode,
@ -975,27 +986,33 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode,
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream,
SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream,
SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild);
SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, int32_t numOfChild);
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo);
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
int32_t numOfOutput, SArray* pPseudoList);
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol);
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
int32_t scanFlag, bool createDummyCol);
bool isTaskKilled(SExecTaskInfo* pTaskInfo);
int32_t checkForQueryBuf(size_t numOfTables);
@ -1040,10 +1057,10 @@ int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput,
int32_t size);
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize);
SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY startTs,
TSKEY endTs, uint64_t groupId, int64_t gap, int32_t* pIndex);
SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs,
TSKEY endTs, uint64_t groupId, int64_t gap, int32_t* pIndex);
SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
int64_t gap, int32_t* pIndex);
SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
int64_t gap, int32_t* pIndex);
bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap);
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup);
@ -1054,11 +1071,13 @@ void printDataBlock(SSDataBlock* pBlock, const char* flag);
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId);
int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset,
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs,
const int32_t* rowCellOffset, SSDataBlock* pBlock,
SExecTaskInfo* pTaskInfo);
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, const char* idstr);
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
const char* idstr);
SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode,
SExecTaskInfo* pTaskInfo);
@ -1069,7 +1088,6 @@ void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsCol
bool groupbyTbname(SNodeList* pGroupList);
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey);
SSDataBlock* createSpecialDataBlock(EStreamType type);
void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput);
#ifdef __cplusplus

View File

@ -955,9 +955,7 @@ void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) {
pTableScanInfo->currentGroupId = -1;
}
static void freeArray(void* array) {
taosArrayDestroy(array);
}
static void freeArray(void* array) { taosArrayDestroy(array); }
static void resetTableScanOperator(SOperatorInfo* pTableScanOp) {
STableScanInfo* pTableScanInfo = pTableScanOp->info;
@ -971,7 +969,8 @@ static void resetTableScanOperator(SOperatorInfo* pTableScanOp) {
resetTableScanInfo(pTableScanOp->info, &win);
}
static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs, int64_t maxVersion) {
static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs,
int64_t maxVersion) {
SArray* gpTbls = pTableScanOp->pTaskInfo->tableqinfoList.pGroupList;
taosArrayClear(gpTbls);
STableKeyInfo tblInfo = {.uid = tbUid, .groupId = 0};
@ -1068,8 +1067,8 @@ static STimeWindow getSlidingWindow(TSKEY* tsCol, SInterval* pInterval, SDataBlo
if (hasGroup) {
(*pRowIndex) += 1;
} else {
(*pRowIndex) += getNumOfRowsInTimeWindow(pDataBlockInfo, tsCol, *pRowIndex, endWin.ekey,
binarySearchForKey, NULL, TSDB_ORDER_ASC);
(*pRowIndex) += getNumOfRowsInTimeWindow(pDataBlockInfo, tsCol, *pRowIndex, endWin.ekey, binarySearchForKey, NULL,
TSDB_ORDER_ASC);
}
do {
preWin = endWin;
@ -1277,10 +1276,10 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
bool closedWin = isClosed && isSignleIntervalWindow(pInfo) &&
isDeletedWindow(&win, pBlock->info.groupId, pInfo->windowSup.pIntervalAggSup);
if ((update || closedWin) && out) {
uint64_t gpId = closedWin&&pInfo->partitionSup.needCalc ?
calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pBlock, rowId) : 0;
appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid,
&gpId);
uint64_t gpId = closedWin && pInfo->partitionSup.needCalc
? calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pBlock, rowId)
: 0;
appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid, &gpId);
}
}
if (out && pInfo->pUpdateDataRes->info.rows > 0) {
@ -1515,6 +1514,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
generateScanRange(pInfo, pBlock, pInfo->pUpdateRes);
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
return pInfo->pDeleteDataRes;
} break;
@ -1947,8 +1947,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
pInfo->pCondition = pScanPhyNode->node.pConditions;
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
pInfo->windowSup =
(SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN};
pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN};
pInfo->groupId = 0;
pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE);
pInfo->pStreamScanOp = pOperator;

View File

@ -42,7 +42,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator);
static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo);
static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId);
static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult,
uint64_t groupId);
static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult);
///*
@ -1424,7 +1425,8 @@ bool doDeleteIntervalWindow(SAggSupporter* pAggSup, TSKEY ts, uint64_t groupId)
return true;
}
void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock, SArray* pDelWins, SInterval* pInterval, SHashObj* pUpdatedMap) {
void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock, SArray* pDelWins, SInterval* pInterval,
SHashObj* pUpdatedMap) {
SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* tsStarts = (TSKEY*)pStartCol->pData;
SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
@ -1469,7 +1471,6 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval*
}
static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) {
void* pIte = NULL;
size_t keyLen = 0;
int32_t iter = 0;
@ -1826,7 +1827,8 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt
return needed;
}
void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SAggSupporter* pSup, SInterval* pInterval, int64_t waterMark) {
void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SAggSupporter* pSup, SInterval* pInterval,
int64_t waterMark) {
if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
initIntervalDownStream(downstream->pDownstream[0], type, pSup, pInterval, waterMark);
return;
@ -1921,7 +1923,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
destroyIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
if (nodeType(pPhyNode) == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL) {
initIntervalDownStream(downstream, QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, &pInfo->aggSup, &pInfo->interval, pInfo->twAggSup.waterMark);
initIntervalDownStream(downstream, QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, &pInfo->aggSup, &pInfo->interval,
pInfo->twAggSup.waterMark);
}
code = appendDownstream(pOperator, &downstream, 1);
@ -2248,7 +2251,6 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
if (hasInterp) {
pResBlock->info.rows += 1;
}
}
static int32_t initPrevRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) {
@ -2748,7 +2750,9 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
initBasicInfo(&pInfo->binfo, pResBlock);
initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->twAggSup = (STimeWindowAggSupp){.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType};;
pInfo->twAggSup =
(STimeWindowAggSupp){.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType};
;
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
pInfo->tsSlotId = tsSlotId;
@ -2893,7 +2897,8 @@ STimeWindow getFinalTimeWindow(int64_t ts, SInterval* pInterval) {
}
static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExprSupp* pSup, SArray* pWinArray,
int32_t groupId, int32_t numOfOutput, SExecTaskInfo* pTaskInfo, SHashObj* pUpdatedMap) {
int32_t groupId, int32_t numOfOutput, SExecTaskInfo* pTaskInfo,
SHashObj* pUpdatedMap) {
int32_t size = taosArrayGetSize(pWinArray);
if (!pInfo->pChildren) {
return;
@ -3274,8 +3279,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SStreamFinalIntervalOperatorInfo* pChildInfo = pChildOp->info;
SExprSupp* pChildSup = &pChildOp->exprSupp;
doDeleteSpecifyIntervalWindow(&pChildInfo->aggSup, pBlock, NULL, &pChildInfo->interval, NULL);
rebuildIntervalWindow(pInfo, pSup, delWins, pInfo->binfo.pRes->info.groupId,
pOperator->exprSupp.numOfExprs, pOperator->pTaskInfo, pUpdatedMap);
rebuildIntervalWindow(pInfo, pSup, delWins, pInfo->binfo.pRes->info.groupId, pOperator->exprSupp.numOfExprs,
pOperator->pTaskInfo, pUpdatedMap);
addRetriveWindow(delWins, pInfo);
taosArrayAddAll(pInfo->pDelWins, delWins);
taosArrayDestroy(delWins);
@ -3381,40 +3386,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
return NULL;
}
SSDataBlock* createSpecialDataBlock(EStreamType type) {
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
pBlock->info.hasVarCol = false;
pBlock->info.groupId = 0;
pBlock->info.rows = 0;
pBlock->info.type = type;
pBlock->info.rowSize =
sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(TSKEY) + sizeof(TSKEY);
pBlock->info.watermark = INT64_MIN;
pBlock->pDataBlock = taosArrayInit(6, sizeof(SColumnInfoData));
SColumnInfoData infoData = {0};
infoData.info.type = TSDB_DATA_TYPE_TIMESTAMP;
infoData.info.bytes = sizeof(TSKEY);
// window start ts
taosArrayPush(pBlock->pDataBlock, &infoData);
// window end ts
taosArrayPush(pBlock->pDataBlock, &infoData);
infoData.info.type = TSDB_DATA_TYPE_UBIGINT;
infoData.info.bytes = sizeof(uint64_t);
// uid
taosArrayPush(pBlock->pDataBlock, &infoData);
// group id
taosArrayPush(pBlock->pDataBlock, &infoData);
// calculate start ts
taosArrayPush(pBlock->pDataBlock, &infoData);
// calculate end ts
taosArrayPush(pBlock->pDataBlock, &infoData);
return pBlock;
}
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, int32_t numOfChild) {
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
@ -3709,7 +3680,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
createOperatorFpSet(operatorDummyOpenFn, doStreamSessionAgg, NULL, NULL, destroyStreamSessionAggOperatorInfo,
aggEncodeResultRow, aggDecodeResultRow, NULL);
if (downstream) {
initDownStream(downstream, &pInfo->streamAggSup, pInfo->gap, pInfo->twAggSup.waterMark, pOperator->operatorType, pInfo->primaryTsIndex);
initDownStream(downstream, &pInfo->streamAggSup, pInfo->gap, pInfo->twAggSup.waterMark, pOperator->operatorType,
pInfo->primaryTsIndex);
code = appendDownstream(pOperator, &downstream, 1);
}
return pOperator;
@ -3740,12 +3712,14 @@ bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap) {
bool isInWindow(SResultWindowInfo* pWinInfo, TSKEY ts, int64_t gap) { return isInTimeWindow(&pWinInfo->win, ts, gap); }
static SResultWindowInfo* insertNewSessionWindow(SArray* pWinInfos, TSKEY startTs, TSKEY endTs, int32_t index) {
SResultWindowInfo win = {.pos.offset = -1, .pos.pageId = -1, .win.skey = startTs, .win.ekey = endTs, .isOutput = false};
SResultWindowInfo win = {
.pos.offset = -1, .pos.pageId = -1, .win.skey = startTs, .win.ekey = endTs, .isOutput = false};
return taosArrayInsert(pWinInfos, index, &win);
}
static SResultWindowInfo* addNewSessionWindow(SArray* pWinInfos, TSKEY startTs, TSKEY endTs) {
SResultWindowInfo win = {.pos.offset = -1, .pos.pageId = -1, .win.skey = startTs, .win.ekey = endTs, .isOutput = false};
SResultWindowInfo win = {
.pos.offset = -1, .pos.pageId = -1, .win.skey = startTs, .win.ekey = endTs, .isOutput = false};
return taosArrayPush(pWinInfos, &win);
}
@ -4147,8 +4121,8 @@ void doBuildDeleteDataBlock(SHashObj* pStDeleted, SSDataBlock* pBlock, void** It
}
}
static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWinArray,
int32_t numOfOutput, SOperatorInfo* pOperator, SHashObj* pStUpdated, bool needCreate) {
static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWinArray, int32_t numOfOutput,
SOperatorInfo* pOperator, SHashObj* pStUpdated, bool needCreate) {
SExprSupp* pSup = &pOperator->exprSupp;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -4161,7 +4135,8 @@ static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWin
uint64_t groupId = pParentWin->groupId;
int32_t winIndex = 0;
if (needCreate) {
pParentWin = getSessionTimeWindow(&pInfo->streamAggSup, pParentWin->win.skey, pParentWin->win.ekey, groupId, 0, &winIndex);
pParentWin =
getSessionTimeWindow(&pInfo->streamAggSup, pParentWin->win.skey, pParentWin->win.ekey, groupId, 0, &winIndex);
}
setWindowOutputBuf(pParentWin, &pCurResult, pSup->pCtx, groupId, numOfOutput, pSup->rowEntryInfoOffset,
&pInfo->streamAggSup, pTaskInfo);
@ -4362,8 +4337,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
int32_t childIndex = getChildIndex(pBlock);
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info;
doClearSessionWindows(&pChildInfo->streamAggSup, &pChildOp->exprSupp, pBlock, START_TS_COLUMN_INDEX, pChildOp->exprSupp.numOfExprs,
0, NULL);
doClearSessionWindows(&pChildInfo->streamAggSup, &pChildOp->exprSupp, pBlock, START_TS_COLUMN_INDEX,
pChildOp->exprSupp.numOfExprs, 0, NULL);
rebuildTimeWindow(pInfo, pWins, pOperator->exprSupp.numOfExprs, pOperator, NULL, false);
}
taosArrayDestroy(pWins);
@ -4856,8 +4831,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
i, &allEqual, pStDeleted);
if (!allEqual) {
uint64_t uid = 0;
appendOneRow(pAggSup->pScanBlock, &pCurWin->winInfo.win.skey, &pCurWin->winInfo.win.ekey,
&uid, &groupId);
appendOneRow(pAggSup->pScanBlock, &pCurWin->winInfo.win.skey, &pCurWin->winInfo.win.ekey, &uid, &groupId);
taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition));
deleteWindow(pAggSup->pCurWins, winIndex, destroyStateWinInfo);
continue;
@ -5035,7 +5009,8 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamStateAgg, NULL, NULL,
destroyStreamStateOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
initDownStream(downstream, &pInfo->streamAggSup, 0, pInfo->twAggSup.waterMark, pOperator->operatorType, pInfo->primaryTsIndex);
initDownStream(downstream, &pInfo->streamAggSup, 0, pInfo->twAggSup.waterMark, pOperator->operatorType,
pInfo->primaryTsIndex);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
@ -5064,8 +5039,8 @@ static int32_t outputMergeAlignedIntervalResult(SOperatorInfo* pOperatorInfo, ui
SExprSupp* pSup = &pOperatorInfo->exprSupp;
SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &wstartTs, TSDB_KEYSIZE, tableGroupId);
SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf,
GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(
iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
ASSERT(p1 != NULL);
finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, pSup->pCtx, pSup->pExprInfo, pSup->numOfExprs,
@ -5359,8 +5334,8 @@ static int32_t finalizeWindowResult(SOperatorInfo* pOperatorInfo, uint64_t table
SExprSupp* pExprSup = &pOperatorInfo->exprSupp;
SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &win->skey, TSDB_KEYSIZE, tableGroupId);
SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf,
GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(
iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
ASSERT(p1 != NULL);
finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, pExprSup->pCtx, pExprSup->pExprInfo,
pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset, pResultBlock, pTaskInfo);
@ -5605,7 +5580,6 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
initBasicInfo(&pIntervalInfo->binfo, pResBlock);
initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win);
pIntervalInfo->timeWindowInterpo = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo);
if (pIntervalInfo->timeWindowInterpo) {
pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));

View File

@ -243,6 +243,36 @@ FAIL:
return 0;
}
int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz,
int64_t groupId) {
char* ctbName = buildCtbNameByGroupId(pTask->shuffleDispatcher.stbFullName, groupId);
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
// TODO: get hash function by hashMethod
uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));
taosMemoryFree(ctbName);
bool found = false;
// TODO: optimize search
int32_t j;
for (j = 0; j < vgSz; j++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j);
ASSERT(pVgInfo->vgId > 0);
if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) {
if (streamAddBlockToDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
return -1;
}
if (pReqs[j].blockNum == 0) {
atomic_add_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
}
pReqs[j].blockNum++;
found = true;
break;
}
}
ASSERT(found);
return 0;
}
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) {
int32_t code = -1;
int32_t blockNum = taosArrayGetSize(pData->blocks);
@ -317,20 +347,10 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
for (int32_t i = 0; i < blockNum; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
char* ctbName = buildCtbNameByGroupId(pTask->shuffleDispatcher.stbFullName, pDataBlock->info.groupId);
// TODO: get hash function by hashMethod
uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));
taosMemoryFree(ctbName);
bool found = false;
// TODO: optimize search
int32_t j;
for (j = 0; j < vgSz; j++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j);
ASSERT(pVgInfo->vgId > 0);
if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) {
// TODO: do not use broadcast
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
for (int32_t j = 0; j < vgSz; j++) {
if (streamAddBlockToDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
goto FAIL_SHUFFLE_DISPATCH;
}
@ -338,11 +358,13 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
atomic_add_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
}
pReqs[j].blockNum++;
found = true;
break;
}
continue;
}
if (streamSearchAndAddBlock(pTask, pReqs, pDataBlock, vgSz, pDataBlock->info.groupId) < 0) {
goto FAIL_SHUFFLE_DISPATCH;
}
ASSERT(found);
}
for (int32_t i = 0; i < vgSz; i++) {