feat(stream): auto create ctb
This commit is contained in:
parent
db3cbbf22d
commit
d17b478e96
|
@ -232,7 +232,7 @@ void blockDebugShowData(const SArray* dataBlocks);
|
|||
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
|
||||
tb_uid_t uid, tb_uid_t suid);
|
||||
|
||||
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid);
|
||||
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid, int32_t vgId);
|
||||
|
||||
static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
|
||||
return blockDataGetSerialMetaSize(pBlock) + blockDataGetSize(pBlock);
|
||||
|
|
|
@ -25,6 +25,8 @@ extern "C" {
|
|||
#ifndef _TSTREAM_H_
|
||||
#define _TSTREAM_H_
|
||||
|
||||
typedef struct SStreamTask SStreamTask;
|
||||
|
||||
enum {
|
||||
STREAM_TASK_STATUS__RUNNING = 1,
|
||||
STREAM_TASK_STATUS__STOP,
|
||||
|
@ -69,7 +71,7 @@ typedef struct {
|
|||
SUseDbRsp dbInfo;
|
||||
} STaskDispatcherShuffle;
|
||||
|
||||
typedef void FTbSink(void* vnode, int64_t ver, const SArray* data);
|
||||
typedef void FTbSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
|
||||
|
||||
typedef struct {
|
||||
int64_t stbUid;
|
||||
|
@ -119,7 +121,7 @@ enum {
|
|||
TASK_SINK__FETCH,
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
struct SStreamTask {
|
||||
int64_t streamId;
|
||||
int32_t taskId;
|
||||
int8_t status;
|
||||
|
@ -154,8 +156,7 @@ typedef struct {
|
|||
|
||||
// application storage
|
||||
void* ahandle;
|
||||
|
||||
} SStreamTask;
|
||||
};
|
||||
|
||||
SStreamTask* tNewSStreamTask(int64_t streamId);
|
||||
int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
|
||||
|
|
|
@ -1596,7 +1596,8 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid) {
|
||||
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid,
|
||||
int32_t vgId) {
|
||||
SSubmitReq* ret = NULL;
|
||||
|
||||
// cal size
|
||||
|
@ -1634,9 +1635,11 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
|
|||
}
|
||||
|
||||
// assign data
|
||||
ret = taosMemoryCalloc(1, cap);
|
||||
ret = taosMemoryCalloc(1, cap + 46);
|
||||
ret = POINTER_SHIFT(ret, 46);
|
||||
ret->header.vgId = vgId;
|
||||
ret->version = htonl(1);
|
||||
ret->length = htonl(cap - sizeof(SSubmitReq));
|
||||
ret->length = sizeof(SSubmitReq);
|
||||
ret->numOfBlocks = htonl(sz);
|
||||
|
||||
void* submitBlk = POINTER_SHIFT(ret, sizeof(SSubmitReq));
|
||||
|
@ -1703,11 +1706,14 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
|
|||
rowData = POINTER_SHIFT(rowData, rowLen);
|
||||
blkHead->dataLen += rowLen;
|
||||
}
|
||||
int32_t len = blkHead->dataLen;
|
||||
blkHead->dataLen = htonl(len);
|
||||
blkHead = POINTER_SHIFT(blkHead, len);
|
||||
int32_t dataLen = blkHead->dataLen;
|
||||
blkHead->dataLen = htonl(dataLen);
|
||||
|
||||
ret->length += sizeof(SSubmitBlk) + schemaLen + dataLen;
|
||||
blkHead = POINTER_SHIFT(blkHead, schemaLen + dataLen);
|
||||
/*submitBlk = blkHead;*/
|
||||
}
|
||||
|
||||
ret->length = htonl(ret->length);
|
||||
return ret;
|
||||
}
|
||||
|
|
|
@ -106,6 +106,7 @@ int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode);
|
|||
void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode);
|
||||
|
||||
int32_t vmPutMsgToSyncQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc); // sync integration
|
||||
int32_t vmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc);
|
||||
int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
||||
int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
||||
int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
||||
|
|
|
@ -177,6 +177,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
|||
|
||||
SMsgCb msgCb = pMgmt->pDnode->data.msgCb;
|
||||
msgCb.pWrapper = pMgmt->pWrapper;
|
||||
msgCb.queueFps[WRITE_QUEUE] = vmPutMsgToWriteQueue;
|
||||
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
|
||||
msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue;
|
||||
msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue;
|
||||
|
|
|
@ -135,6 +135,7 @@ static void *vmOpenVnodeFunc(void *param) {
|
|||
|
||||
SMsgCb msgCb = pMgmt->pDnode->data.msgCb;
|
||||
msgCb.pWrapper = pMgmt->pWrapper;
|
||||
msgCb.queueFps[WRITE_QUEUE] = vmPutMsgToWriteQueue;
|
||||
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
|
||||
msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue;
|
||||
msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue;
|
||||
|
@ -147,7 +148,7 @@ static void *vmOpenVnodeFunc(void *param) {
|
|||
pThread->failed++;
|
||||
} else {
|
||||
vmOpenVnode(pMgmt, pCfg, pImpl);
|
||||
//vnodeStart(pImpl);
|
||||
// vnodeStart(pImpl);
|
||||
dDebug("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
|
||||
pThread->opened++;
|
||||
}
|
||||
|
|
|
@ -357,6 +357,10 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueT
|
|||
pMsg->rpcMsg = *pRpc;
|
||||
// if (pMsg->rpcMsg.handle != NULL) assert(pMsg->rpcMsg.refId != 0);
|
||||
switch (qtype) {
|
||||
case WRITE_QUEUE:
|
||||
dTrace("msg:%p, will be put into vnode-write queue", pMsg);
|
||||
taosWriteQitem(pVnode->pWriteQ, pMsg);
|
||||
break;
|
||||
case QUERY_QUEUE:
|
||||
dTrace("msg:%p, will be put into vnode-query queue", pMsg);
|
||||
taosWriteQitem(pVnode->pQueryQ, pMsg);
|
||||
|
@ -387,6 +391,10 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueT
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t vmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
|
||||
return vmPutRpcMsgToQueue(pWrapper, pRpc, WRITE_QUEUE);
|
||||
}
|
||||
|
||||
int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
|
||||
return vmPutRpcMsgToQueue(pWrapper, pRpc, QUERY_QUEUE);
|
||||
}
|
||||
|
|
|
@ -884,11 +884,21 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
}
|
||||
}
|
||||
|
||||
void tqTableSink(void* vnode, int64_t ver, const SArray* data) {
|
||||
//
|
||||
SVnode* pVnode = (SVnode*)vnode;
|
||||
void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
|
||||
const SArray* pRes = (const SArray*)data;
|
||||
SVnode* pVnode = (SVnode*)vnode;
|
||||
|
||||
ASSERT(pTask->tbSink.pTSchema);
|
||||
SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid, pVnode->config.vgId);
|
||||
tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);
|
||||
// build write msg
|
||||
//
|
||||
SRpcMsg msg = {
|
||||
.msgType = TDMT_VND_SUBMIT,
|
||||
.pCont = pReq,
|
||||
.contLen = ntohl(pReq->length),
|
||||
};
|
||||
|
||||
ASSERT(tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) == 0);
|
||||
}
|
||||
|
||||
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
#include "ttime.h"
|
||||
#include "tdatablock.h"
|
||||
#include "executorimpl.h"
|
||||
#include "functionMgt.h"
|
||||
#include "tdatablock.h"
|
||||
#include "ttime.h"
|
||||
|
||||
typedef enum SResultTsInterpType {
|
||||
RESULT_ROW_START_INTERP = 1,
|
||||
|
@ -545,7 +545,6 @@ static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlock, SqlFunctionCtx* pCtx,
|
||||
SResultRow* pResult, STimeWindow* win, int32_t startPos, int32_t forwardStep,
|
||||
int32_t order, bool timeWindowInterpo) {
|
||||
|
@ -759,10 +758,10 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SIntervalAggOperatorInfo* pInfo = pOperator->info;
|
||||
|
||||
int32_t order = TSDB_ORDER_ASC;
|
||||
int32_t order = TSDB_ORDER_ASC;
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
|
||||
while (1) {
|
||||
|
@ -808,7 +807,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock) {
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
SColumnInfoData* pStateColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->colIndex);
|
||||
int64_t gid = pBlock->info.groupId;
|
||||
|
@ -932,7 +931,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
|
|||
|
||||
static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
|
||||
SIntervalAggOperatorInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
|
@ -981,10 +980,10 @@ static void finalizeUpdatedResult(int32_t numOfOutput, SDiskbasedBuf* pBuf, SArr
|
|||
}
|
||||
}
|
||||
static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type) {
|
||||
for ( int i = 0; i < num; i++) {
|
||||
for (int i = 0; i < num; i++) {
|
||||
if (type == STREAM_INVERT) {
|
||||
fmSetInvertFunc(pCtx[i].functionId, &(pCtx[i].fpSet));
|
||||
} else if (type == STREAM_NORMAL){
|
||||
} else if (type == STREAM_NORMAL) {
|
||||
fmSetNormalFunc(pCtx[i].functionId, &(pCtx[i].fpSet));
|
||||
}
|
||||
}
|
||||
|
@ -992,7 +991,7 @@ static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type
|
|||
|
||||
static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
||||
SIntervalAggOperatorInfo* pInfo = pOperator->info;
|
||||
int32_t order = TSDB_ORDER_ASC;
|
||||
int32_t order = TSDB_ORDER_ASC;
|
||||
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
|
@ -1038,7 +1037,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||
doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
|
||||
|
||||
ASSERT(pInfo->binfo.pRes->info.rows > 0);
|
||||
// TODO: remove for stream
|
||||
/*ASSERT(pInfo->binfo.pRes->info.rows > 0);*/
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
|
||||
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
|
||||
|
@ -1070,17 +1070,17 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
STimeWindowAggSupp* pTwAggSupp, const STableGroupInfo* pTableGroupInfo,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->order = TSDB_ORDER_ASC;
|
||||
pInfo->interval = *pInterval;
|
||||
pInfo->order = TSDB_ORDER_ASC;
|
||||
pInfo->interval = *pInterval;
|
||||
// pInfo->execModel = OPTR_EXEC_MODEL_STREAM;
|
||||
pInfo->execModel = pTaskInfo->execModel;
|
||||
pInfo->win = pTaskInfo->window;
|
||||
pInfo->twAggSup = *pTwAggSupp;
|
||||
pInfo->execModel = pTaskInfo->execModel;
|
||||
pInfo->win = pTaskInfo->window;
|
||||
pInfo->twAggSup = *pTwAggSupp;
|
||||
pInfo->primaryTsIndex = primaryTsSlotId;
|
||||
|
||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
|
@ -1099,14 +1099,14 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
|
||||
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1);
|
||||
|
||||
pOperator->name = "TimeIntervalAggOperator";
|
||||
pOperator->name = "TimeIntervalAggOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERVAL;
|
||||
pOperator->blocking = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->pExpr = pExprInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->numOfExprs = numOfCols;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->blocking = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->pExpr = pExprInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->numOfExprs = numOfCols;
|
||||
pOperator->info = pInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, doStreamIntervalAgg, NULL,
|
||||
destroyIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
|
||||
|
@ -1118,7 +1118,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
_error:
|
||||
destroyIntervalOperatorInfo(pInfo, numOfCols);
|
||||
taosMemoryFreeClear(pInfo);
|
||||
taosMemoryFreeClear(pOperator);
|
||||
|
@ -1131,7 +1131,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr
|
|||
STimeWindowAggSupp* pTwAggSupp, const STableGroupInfo* pTableGroupInfo,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
goto _error;
|
||||
}
|
||||
|
@ -1177,7 +1177,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr
|
|||
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
_error:
|
||||
destroyIntervalOperatorInfo(pInfo, numOfCols);
|
||||
taosMemoryFreeClear(pInfo);
|
||||
taosMemoryFreeClear(pOperator);
|
||||
|
@ -1321,7 +1321,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) {
|
|||
return pSliceInfo->binfo.pRes;
|
||||
}
|
||||
|
||||
int32_t order = TSDB_ORDER_ASC;
|
||||
int32_t order = TSDB_ORDER_ASC;
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
|
||||
while (1) {
|
||||
|
@ -1379,7 +1379,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
_error:
|
||||
taosMemoryFree(pInfo);
|
||||
taosMemoryFree(pOperator);
|
||||
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -1402,18 +1402,18 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
|
|||
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExpr, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
|
||||
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
|
||||
|
||||
pInfo->twAggSup = *pTwAggSup;
|
||||
pInfo->twAggSup = *pTwAggSup;
|
||||
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
||||
|
||||
pInfo->tsSlotId = tsSlotId;
|
||||
pOperator->name = "StateWindowOperator";
|
||||
pInfo->tsSlotId = tsSlotId;
|
||||
pOperator->name = "StateWindowOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW;
|
||||
pOperator->blocking = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->pExpr = pExpr;
|
||||
pOperator->numOfExprs = numOfCols;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->pExpr = pExpr;
|
||||
pOperator->numOfExprs = numOfCols;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->info = pInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStateWindowAgg, NULL, NULL,
|
||||
destroyStateWindowOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
|
||||
|
@ -1421,7 +1421,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
|
|||
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
_error:
|
||||
pTaskInfo->code = TSDB_CODE_SUCCESS;
|
||||
return NULL;
|
||||
}
|
||||
|
@ -1432,8 +1432,8 @@ void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
}
|
||||
|
||||
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId,
|
||||
STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) {
|
||||
SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
|
@ -1453,18 +1453,18 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
|
|||
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
|
||||
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
||||
|
||||
pInfo->tsSlotId = tsSlotId;
|
||||
pInfo->gap = gap;
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
pInfo->winSup.prevTs = INT64_MIN;
|
||||
pInfo->reptScan = false;
|
||||
pOperator->name = "SessionWindowAggOperator";
|
||||
pInfo->tsSlotId = tsSlotId;
|
||||
pInfo->gap = gap;
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
pInfo->winSup.prevTs = INT64_MIN;
|
||||
pInfo->reptScan = false;
|
||||
pOperator->name = "SessionWindowAggOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW;
|
||||
pOperator->blocking = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->pExpr = pExprInfo;
|
||||
pOperator->numOfExprs = numOfCols;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->pExpr = pExprInfo;
|
||||
pOperator->numOfExprs = numOfCols;
|
||||
pOperator->info = pInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL,
|
||||
destroySWindowOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
|
||||
|
@ -1473,7 +1473,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
|
|||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
_error:
|
||||
if (pInfo != NULL) {
|
||||
destroySWindowOperatorInfo(pInfo, numOfCols);
|
||||
}
|
||||
|
@ -1482,4 +1482,4 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
|
|||
taosMemoryFreeClear(pOperator);
|
||||
pTaskInfo->code = code;
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -150,12 +150,12 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
|
|||
pRes = (SArray*)input;
|
||||
}
|
||||
|
||||
if (pRes == NULL || taosArrayGetSize(pRes) == 0) return 0;
|
||||
|
||||
// sink
|
||||
if (pTask->sinkType == TASK_SINK__TABLE) {
|
||||
/*blockDebugShowData(pRes);*/
|
||||
ASSERT(pTask->tbSink.pTSchema);
|
||||
SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, false, pTask->tbSink.stbUid);
|
||||
tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);
|
||||
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pRes);
|
||||
} else if (pTask->sinkType == TASK_SINK__SMA) {
|
||||
pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pRes);
|
||||
//
|
||||
|
|
Loading…
Reference in New Issue