diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 163ed7cffd..2df27a432c 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3273,6 +3273,12 @@ void tDestroySSubmitRsp2(SSubmitRsp2* pRsp, int32_t flag); #define TSDB_MSG_FLG_ENCODE 0x1 #define TSDB_MSG_FLG_DECODE 0x2 +typedef struct { + void* msgStr; + int32_t msgLen; + int64_t ver; +} SPackedSubmit; + #pragma pack(pop) #ifdef __cplusplus diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index c7db114b73..063c5bd4ed 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -190,7 +190,9 @@ int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts); int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType); -int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq, int64_t ver); +// int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq, int64_t ver); +// +int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedSubmit submit); int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 16cf960724..8721f1ba41 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -103,6 +103,7 @@ typedef struct { int8_t type; } SStreamQueueItem; +#if 0 typedef struct { int8_t type; int64_t ver; @@ -116,6 +117,21 @@ typedef struct { SArray* dataRefs; // SArray SArray* reqs; // SArray } SStreamMergedSubmit; +#endif + +typedef struct { + int8_t type; + int64_t ver; + int32_t* dataRef; + SPackedSubmit submit; +} SStreamDataSubmit2; + +typedef struct { + int8_t type; + int64_t ver; + SArray* dataRefs; // SArray + SArray* submits; // SArray +} SStreamMergedSubmit2; typedef struct { int8_t type; @@ -219,11 +235,11 @@ static FORCE_INLINE void* streamQueueNextItem(SStreamQueue* queue) { } } -SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq); +SStreamDataSubmit2* streamDataSubmitNew(SPackedSubmit submit); -void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit); +void streamDataSubmitRefDec(SStreamDataSubmit2* pDataSubmit); -SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit); +SStreamDataSubmit2* streamSubmitRefClone(SStreamDataSubmit2* pSubmit); typedef struct { char* qmsg; @@ -355,14 +371,15 @@ void tFreeSStreamTask(SStreamTask* pTask); static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) { if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { - SStreamDataSubmit* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit*)pItem); + SStreamDataSubmit2* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit2*)pItem); if (pSubmitClone == NULL) { qDebug("task %d %p submit enqueue failed since out of memory", pTask->taskId, pTask); terrno = TSDB_CODE_OUT_OF_MEMORY; atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); return -1; } - qDebug("task %d %p submit enqueue %p %p %p", pTask->taskId, pTask, pItem, pSubmitClone, pSubmitClone->data); + qDebug("task %d %p submit enqueue %p %p %p %d %" PRId64, pTask->taskId, pTask, pItem, pSubmitClone, + pSubmitClone->submit.msgStr, pSubmitClone->submit.msgLen, pSubmitClone->submit.ver); taosWriteQitem(pTask->inputQueue->queue, pSubmitClone); // qStreamInput(pTask->exec.executor, pSubmitClone); } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE || diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 971ed7c51c..fe48def982 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -220,12 +220,6 @@ typedef struct SSnapContext { bool queryMetaOrData; // true-get meta, false-get data } SSnapContext; -typedef struct { - void *msgStr; - int32_t msgLen; - int64_t ver; -} SPackedSubmit; - typedef struct STqReader { const SSubmitReq *pMsg; // SSubmitBlk *pBlock; @@ -234,8 +228,10 @@ typedef struct STqReader { int64_t ver; SPackedSubmit msg2; - SSubmitReq2 *pSubmit; - int32_t nextBlk; + + int8_t setMsg; + SSubmitReq2 submit; + int32_t nextBlk; int64_t lastBlkUid; diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 2b927ac3e2..817c6dc596 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -182,7 +182,8 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore); // tqSink // void tqSinkToTableMerge(SStreamTask* pTask, void* vnode, int64_t ver, void* data); -void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data); +// void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data); +void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* data); // tqOffset char* tqOffsetBuildFName(const char* path, int32_t fVer); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 79b4f227f5..e1e3366c03 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -180,7 +180,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msg int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen); -int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* data, int64_t ver); +int32_t tqProcessSubmitReq(STQ* pTq, SPackedSubmit submit); int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver); int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index b61f3084f0..cade8b6a7a 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -953,7 +953,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->smaSink.smaSink = smaHandleRes; } else if (pTask->outputType == TASK_OUTPUT__TABLE) { pTask->tbSink.vnode = pTq->pVnode; - pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline; + pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline2; ASSERT(pTask->tbSink.pSchemaWrapper); ASSERT(pTask->tbSink.pSchemaWrapper->pSchema); @@ -1334,12 +1334,12 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { return 0; } -int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* pReq, int64_t ver) { - void* pIter = NULL; - bool failed = false; - SStreamDataSubmit* pSubmit = NULL; +int32_t tqProcessSubmitReq(STQ* pTq, SPackedSubmit submit) { + void* pIter = NULL; + bool failed = false; + SStreamDataSubmit2* pSubmit = NULL; - pSubmit = streamDataSubmitNew(pReq); + pSubmit = streamDataSubmitNew(submit); if (pSubmit == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; tqError("failed to create data submit for stream since out of memory"); @@ -1356,7 +1356,7 @@ int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* pReq, int64_t ver) { continue; } - tqDebug("data submit enqueue stream task: %d, ver: %" PRId64, pTask->taskId, ver); + tqDebug("data submit enqueue stream task: %d, ver: %" PRId64, pTask->taskId, submit.ver); if (!failed) { if (streamTaskInput(pTask, (SStreamQueueItem*)pSubmit) < 0) { diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 02da5d1a4b..3d4419a930 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -213,7 +213,11 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ #endif int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { - tqDebug("vgId:%d, tq push msg ver %" PRId64 ", type: %s", pTq->pVnode->config.vgId, ver, TMSG_INFO(msgType)); + void* pReq = POINTER_SHIFT(msg, sizeof(SMsgHead)); + int32_t len = msgLen - sizeof(SMsgHead); + + tqDebug("vgId:%d, tq push msg ver %" PRId64 ", type: %s, p head %p, p body %p, len %d", pTq->pVnode->config.vgId, ver, + TMSG_INFO(msgType), msg, pReq, len); if (msgType == TDMT_VND_SUBMIT) { // lock push mgr to avoid potential msg lost @@ -222,7 +226,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) if (taosHashGetSize(pTq->pPushMgr) != 0) { SArray* cachedKeys = taosArrayInit(0, sizeof(void*)); SArray* cachedKeyLens = taosArrayInit(0, sizeof(size_t)); - void* data = taosMemoryMalloc(msgLen); + void* data = taosMemoryMalloc(len); if (data == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; tqError("failed to copy data for stream since out of memory"); @@ -230,9 +234,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) taosArrayDestroy(cachedKeyLens); return -1; } - memcpy(data, msg, msgLen); - SSubmitReq* pReq = (SSubmitReq*)data; - pReq->version = ver; + memcpy(data, pReq, len); void* pIter = NULL; while (1) { @@ -256,7 +258,12 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) SMqDataRsp* pRsp = &pPushEntry->dataRsp; // prepare scan mem data - qStreamScanMemData(task, pReq, ver); + SPackedSubmit submit = { + .msgStr = data, + .msgLen = len, + .ver = ver, + }; + qStreamSetScanMemData(task, submit); // exec while (1) { @@ -310,17 +317,22 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) if (vnodeIsRoleLeader(pTq->pVnode)) { if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) return 0; if (msgType == TDMT_VND_SUBMIT) { - void* data = taosMemoryMalloc(msgLen); + void* data = taosMemoryMalloc(len); if (data == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; tqError("failed to copy data for stream since out of memory"); return -1; } - memcpy(data, msg, msgLen); - SSubmitReq* pReq = (SSubmitReq*)data; - pReq->version = ver; + memcpy(data, pReq, len); + SPackedSubmit submit = { + .msgStr = data, + .msgLen = len, + .ver = ver, + }; - tqProcessSubmitReq(pTq, data, ver); + tqDebug("tq copy write msg %p %d %" PRId64 " from %p", data, len, ver, pReq); + + tqProcessSubmitReq(pTq, submit); } if (msgType == TDMT_VND_DELETE) { tqProcessDelReq(pTq, POINTER_SHIFT(msg, sizeof(SMsgHead)), msgLen - sizeof(SMsgHead), ver); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 5a23267827..c05c24db74 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -306,7 +306,7 @@ int32_t tqSeekVer(STqReader* pReader, int64_t ver) { } int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { - bool fromProcessedMsg = pReader->pMsg != NULL; + bool fromProcessedMsg = pReader->msg2.msgStr != NULL; while (1) { if (!fromProcessedMsg) { @@ -381,15 +381,22 @@ int32_t tqReaderSetDataMsg(STqReader* pReader, const SSubmitReq* pMsg, int64_t v int32_t tqReaderSetSubmitReq2(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) { ASSERT(pReader->msg2.msgStr == NULL); + ASSERT(msgStr); + ASSERT(msgLen); pReader->msg2.msgStr = msgStr; pReader->msg2.msgLen = msgLen; pReader->msg2.ver = ver; - if (pReader->pSubmit == NULL) { + tqDebug("tq reader set msg %p", msgStr); + + if (pReader->setMsg == 0) { SDecoder decoder; tDecoderInit(&decoder, pReader->msg2.msgStr, pReader->msg2.msgLen); - tDecodeSSubmitReq2(&decoder, pReader->pSubmit); + if (tDecodeSSubmitReq2(&decoder, &pReader->submit) < 0) { + ASSERT(0); + } tDecoderClear(&decoder); + pReader->setMsg = 1; } return 0; } @@ -422,11 +429,14 @@ bool tqNextDataBlock(STqReader* pReader) { bool tqNextDataBlock2(STqReader* pReader) { if (pReader->msg2.msgStr == NULL) return false; - ASSERT(pReader->pSubmit != NULL); + ASSERT(pReader->setMsg == 1); - int32_t blockSz = taosArrayGetSize(pReader->pSubmit->aSubmitTbData); + tqDebug("tq reader next data block %p, %d %" PRId64 " %d", pReader->msg2.msgStr, pReader->msg2.msgLen, + pReader->msg2.ver, pReader->nextBlk); + + int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData); while (pReader->nextBlk < blockSz) { - SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->pSubmit->aSubmitTbData, pReader->nextBlk); + SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); if (pReader->tbIdHash == NULL) return true; void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)); @@ -435,8 +445,8 @@ bool tqNextDataBlock2(STqReader* pReader) { } } - tDestroySSubmitReq2(pReader->pSubmit, TSDB_MSG_FLG_DECODE); - pReader->pSubmit = NULL; + tDestroySSubmitReq2(&pReader->submit, TSDB_MSG_FLG_DECODE); + pReader->setMsg = 0; pReader->nextBlk = 0; pReader->msg2.msgStr = NULL; @@ -445,11 +455,11 @@ bool tqNextDataBlock2(STqReader* pReader) { bool tqNextDataBlockFilterOut2(STqReader* pReader, SHashObj* filterOutUids) { if (pReader->msg2.msgStr == NULL) return false; - ASSERT(pReader->pSubmit != NULL); + ASSERT(pReader->setMsg == 1); - int32_t blockSz = taosArrayGetSize(pReader->pSubmit->aSubmitTbData); + int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData); while (pReader->nextBlk < blockSz) { - SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->pSubmit->aSubmitTbData, pReader->nextBlk); + SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); if (pReader->tbIdHash == NULL) return true; void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)); @@ -458,8 +468,8 @@ bool tqNextDataBlockFilterOut2(STqReader* pReader, SHashObj* filterOutUids) { } } - tDestroySSubmitReq2(pReader->pSubmit, TSDB_MSG_FLG_DECODE); - pReader->pSubmit = NULL; + tDestroySSubmitReq2(&pReader->submit, TSDB_MSG_FLG_DECODE); + pReader->setMsg = 0; pReader->nextBlk = 0; pReader->msg2.msgStr = NULL; @@ -548,10 +558,12 @@ int32_t tqScanSubmitSplit(SArray* pBlocks, SArray* schemas, STqReader* pReader) #endif int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader) { - int32_t blockSz = taosArrayGetSize(pReader->pSubmit->aSubmitTbData); + int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData); ASSERT(pReader->nextBlk < blockSz); - SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->pSubmit->aSubmitTbData, pReader->nextBlk); + tqDebug("tq reader retrieve data block %p, %d", pReader->msg2.msgStr, pReader->nextBlk); + + SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); pReader->nextBlk++; int32_t sversion = pSubmitTbData->sver; @@ -672,7 +684,7 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader) { } else { val = &colVal.value.val; } - if (colDataAppend(pColData, i, val, colVal.type != TD_VTYPE_NORM) < 0) { + if (colDataAppend(pColData, i, val, !COL_VAL_IS_VALUE(&colVal)) < 0) { goto FAIL; } } @@ -686,14 +698,14 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader) { SArray* pRows = pSubmitTbData->aRowP; for (int32_t i = 0; i < numOfRows; i++) { - SRow* pRow = taosArrayGet(pRows, i); + SRow* pRow = taosArrayGetP(pRows, i); int32_t targetIdx = 0; int32_t sourceIdx = 0; for (int32_t j = 0; j < colActual; j++) { SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j); while (1) { - ASSERT(sourceIdx < numOfRows); + ASSERT(sourceIdx < pTschema->numOfCols); SColVal colVal; tRowGet(pRow, pTschema, sourceIdx, &colVal); @@ -707,7 +719,7 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader) { } else { val = &colVal.value.val; } - if (colDataAppend(pColData, i, val, colVal.type != TD_VTYPE_NORM) < 0) { + if (colDataAppend(pColData, i, val, !COL_VAL_IS_VALUE(&colVal)) < 0) { goto FAIL; } diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index d34c35f6e7..64fbcf65a6 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -957,7 +957,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* if (NULL == pBuf) { goto _end; } - ((SMsgHead*)pBuf)->vgId = htonl(TD_VID(pVnode)); + ((SMsgHead*)pBuf)->vgId = TD_VID(pVnode); ((SMsgHead*)pBuf)->contLen = htonl(len); tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead)); if (tEncodeSSubmitReq2(&encoder, pReq) < 0) { @@ -969,7 +969,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* SRpcMsg msg = { .msgType = TDMT_VND_SUBMIT, .pCont = pBuf, - .contLen = ntohl(len), + .contLen = len, }; if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 0ad86e43a5..b67f21210c 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -871,6 +871,8 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq pRsp->code = TSDB_CODE_SUCCESS; + vDebug("vvvvvvvvvvvv %p, %d", pReq, len); + // decode SDecoder dc = {0}; tDecoderInit(&dc, pReq, len); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 03a31764ba..7417e4e657 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -202,7 +202,7 @@ typedef struct SOperatorFpSet { __optr_fn_t getNextFn; __optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP __optr_close_fn_t closeFn; - __optr_reqBuf_fn_t reqBufFn; // total used buffer for blocking operator + __optr_reqBuf_fn_t reqBufFn; // total used buffer for blocking operator __optr_encode_fn_t encodeResultRow; __optr_decode_fn_t decodeResultRow; __optr_explain_fn_t getExplainFn; @@ -503,8 +503,8 @@ typedef struct STableCountScanOperatorInfo { STableCountScanSupp supp; - int32_t currGrpIdx; - SArray* stbUidList; // when group by db_name and/or stable_name + int32_t currGrpIdx; + SArray* stbUidList; // when group by db_name and/or stable_name } STableCountScanOperatorInfo; typedef struct SOptrBasicInfo { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 0d6b2977a8..fe7c388ffc 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -115,7 +115,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu if (type == STREAM_INPUT__MERGED_SUBMIT) { // ASSERT(numOfBlocks > 1); for (int32_t i = 0; i < numOfBlocks; i++) { - SSubmitReq* pReq = *(void**)POINTER_SHIFT(input, i * sizeof(void*)); + SPackedSubmit* pReq = *(void**)POINTER_SHIFT(input, i * sizeof(SPackedSubmit)); taosArrayPush(pInfo->pBlockLists, &pReq); } pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; @@ -125,8 +125,11 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; } else if (type == STREAM_INPUT__DATA_BLOCK) { for (int32_t i = 0; i < numOfBlocks; ++i) { - SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; - taosArrayPush(pInfo->pBlockLists, &pDataBlock); + SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; + SPackedSubmit tmp = { + .msgStr = pDataBlock, + }; + taosArrayPush(pInfo->pBlockLists, &tmp); } pInfo->blockType = STREAM_INPUT__DATA_BLOCK; } else { @@ -1002,6 +1005,7 @@ int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* s return TSDB_CODE_SUCCESS; } +#if 0 int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq, int64_t scanVer) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE); @@ -1010,6 +1014,7 @@ int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq, int64_t sc pTaskInfo->streamInfo.scanVer = scanVer; return 0; } +#endif int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedSubmit submit) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 9f623ec8fe..eb8d7d2e78 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -768,8 +768,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num); ASSERT(pInfo->base.dataReader == NULL); - int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, - pInfo->pResBlock, (STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo)); + int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock, + (STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } @@ -986,8 +986,8 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU SSDataBlock* pBlock = pTableScanInfo->pResBlock; STsdbReader* pReader = NULL; - int32_t code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock, (STsdbReader**)&pReader, - GET_TASKID(pTaskInfo)); + int32_t code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock, + (STsdbReader**)&pReader, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { terrno = code; T_LONG_JMP(pTaskInfo->env, code); @@ -995,7 +995,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU } if (tsdbNextDataBlock(pReader)) { - /*SSDataBlock* p = */tsdbRetrieveDataBlock(pReader, NULL); + /*SSDataBlock* p = */ tsdbRetrieveDataBlock(pReader, NULL); doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows); pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid); } @@ -1224,7 +1224,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX); SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX); - uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData; + uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData; ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); TSKEY* srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData; TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData; @@ -1526,17 +1526,18 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { qDebug("queue scan called"); if (pTaskInfo->streamInfo.pReq != NULL) { - if (pInfo->tqReader->pMsg == NULL) { - pInfo->tqReader->pMsg = pTaskInfo->streamInfo.pReq; + if (pInfo->tqReader->msg2.msgStr == NULL) { + /*pInfo->tqReader->pMsg = pTaskInfo->streamInfo.pReq;*/ pInfo->tqReader->ver = pTaskInfo->streamInfo.scanVer; - const SSubmitReq* pSubmit = pInfo->tqReader->pMsg; + /*const SSubmitReq* pSubmit = pInfo->tqReader->pMsg;*/ /*if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {*/ /*void* msgStr = pTaskInfo->streamInfo.*/ SPackedSubmit submit = pTaskInfo->streamInfo.submit; if (tqReaderSetSubmitReq2(pInfo->tqReader, submit.msgStr, submit.msgLen, submit.ver) < 0) { - qError("submit msg messed up when initing stream submit block %p", pSubmit); - pInfo->tqReader->pMsg = NULL; + qError("submit msg messed up when initing stream submit block %p", submit.msgStr); + pInfo->tqReader->msg2 = (SPackedSubmit){0}; + pInfo->tqReader->setMsg = 0; pTaskInfo->streamInfo.pReq = NULL; ASSERT(0); } @@ -1561,7 +1562,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { } } - pInfo->tqReader->pMsg = NULL; pTaskInfo->streamInfo.pReq = NULL; return NULL; } @@ -1920,7 +1920,7 @@ FETCH_NEXT_BLOCK: NEXT_SUBMIT_BLK: while (1) { - if (pInfo->tqReader->pMsg == NULL) { + if (pInfo->tqReader->msg2.msgStr == NULL) { if (pInfo->validBlockIndex >= totBlockNum) { updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo); doClearBufferedBlocks(pInfo); @@ -1928,13 +1928,12 @@ FETCH_NEXT_BLOCK: return NULL; } - int32_t current = pInfo->validBlockIndex++; - SSubmitReq* pSubmit = taosArrayGetP(pInfo->pBlockLists, current); + int32_t current = pInfo->validBlockIndex++; + SPackedSubmit* pSubmit = taosArrayGetP(pInfo->pBlockLists, current); /*if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {*/ - if (tqReaderSetSubmitReq2(pInfo->tqReader, pSubmit, 0, 0) < 0) { + if (tqReaderSetSubmitReq2(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) { qError("submit msg messed up when initing stream submit block %p, current %d, total %d", pSubmit, current, totBlockNum); - pInfo->tqReader->pMsg = NULL; continue; } } @@ -1985,7 +1984,6 @@ FETCH_NEXT_BLOCK: if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) { break; } else { - pInfo->tqReader->pMsg = NULL; continue; } /*blockDataCleanup(pInfo->pRes);*/ @@ -2266,7 +2264,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys } } - pInfo->pBlockLists = taosArrayInit(4, POINTER_BYTES); + pInfo->pBlockLists = taosArrayInit(4, sizeof(SPackedSubmit)); if (pInfo->pBlockLists == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _error; @@ -2286,7 +2284,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys if (pHandle->initTableReader) { pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER; pTSInfo->base.dataReader = NULL; - code = tsdbReaderOpen(pHandle->vnode, &pTSInfo->base.cond, pList, num, pTSInfo->pResBlock, &pTSInfo->base.dataReader, NULL); + code = tsdbReaderOpen(pHandle->vnode, &pTSInfo->base.cond, pList, num, pTSInfo->pResBlock, + &pTSInfo->base.dataReader, NULL); if (code != 0) { terrno = code; destroyTableScanOperatorInfo(pTableScanOp); @@ -2356,7 +2355,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); __optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan; - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL); + pOperator->fpSet = + createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL); return pOperator; @@ -2493,7 +2493,8 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi initResultSizeInfo(&pOperator->resultInfo, 4096); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL); + pOperator->fpSet = + createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL); return pOperator; @@ -2514,11 +2515,12 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx); - int64_t st = taosGetTimestampUs(); - void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex); + int64_t st = taosGetTimestampUs(); + void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex); SReadHandle* pHandle = &pInfo->base.readHandle; - int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &pInfo->base.dataReader, GET_TASKID(pTaskInfo)); + int32_t code = + tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &pInfo->base.dataReader, GET_TASKID(pTaskInfo)); if (code != 0) { T_LONG_JMP(pTaskInfo->env, code); } @@ -2916,8 +2918,8 @@ static void buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* SSDataBlock* pRes, char* dbName); static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName); -static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo, - STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName); +static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo, + STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName); static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp, SSDataBlock* pRes); static void buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo, @@ -3042,8 +3044,8 @@ SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableC setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = - createOperatorFpSet(optrDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator, optrDefaultBufFn, NULL); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator, + optrDefaultBufFn, NULL); return pOperator; _error: diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 2fea5b9eca..68e8473c6d 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -66,12 +66,12 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock return 0; } -SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq) { - SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM); +SStreamDataSubmit2* streamDataSubmitNew(SPackedSubmit submit) { + SStreamDataSubmit2* pDataSubmit = (SStreamDataSubmit2*)taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM); if (pDataSubmit == NULL) return NULL; pDataSubmit->dataRef = (int32_t*)taosMemoryMalloc(sizeof(int32_t)); if (pDataSubmit->dataRef == NULL) goto FAIL; - pDataSubmit->data = pReq; + pDataSubmit->submit = submit; *pDataSubmit->dataRef = 1; pDataSubmit->type = STREAM_INPUT__DATA_SUBMIT; return pDataSubmit; @@ -80,47 +80,47 @@ FAIL: return NULL; } -SStreamMergedSubmit* streamMergedSubmitNew() { - SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM); +SStreamMergedSubmit2* streamMergedSubmitNew() { + SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)taosAllocateQitem(sizeof(SStreamMergedSubmit2), DEF_QITEM); if (pMerged == NULL) return NULL; - pMerged->reqs = taosArrayInit(0, sizeof(void*)); + pMerged->submits = taosArrayInit(0, sizeof(SPackedSubmit)); pMerged->dataRefs = taosArrayInit(0, sizeof(void*)); - if (pMerged->dataRefs == NULL || pMerged->reqs == NULL) goto FAIL; + if (pMerged->dataRefs == NULL || pMerged->submits == NULL) goto FAIL; pMerged->type = STREAM_INPUT__MERGED_SUBMIT; return pMerged; FAIL: - if (pMerged->reqs) taosArrayDestroy(pMerged->reqs); + if (pMerged->submits) taosArrayDestroy(pMerged->submits); if (pMerged->dataRefs) taosArrayDestroy(pMerged->dataRefs); taosFreeQitem(pMerged); return NULL; } -int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) { +int32_t streamMergeSubmit(SStreamMergedSubmit2* pMerged, SStreamDataSubmit2* pSubmit) { taosArrayPush(pMerged->dataRefs, &pSubmit->dataRef); - taosArrayPush(pMerged->reqs, &pSubmit->data); + taosArrayPush(pMerged->submits, &pSubmit->submit); pMerged->ver = pSubmit->ver; return 0; } -static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit* pDataSubmit) { +static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit2* pDataSubmit) { atomic_add_fetch_32(pDataSubmit->dataRef, 1); } -SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit) { - SStreamDataSubmit* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM); +SStreamDataSubmit2* streamSubmitRefClone(SStreamDataSubmit2* pSubmit) { + SStreamDataSubmit2* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM); if (pSubmitClone == NULL) { return NULL; } streamDataSubmitRefInc(pSubmit); - memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit)); + memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit2)); return pSubmitClone; } -void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit) { +void streamDataSubmitRefDec(SStreamDataSubmit2* pDataSubmit) { int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1); ASSERT(ref >= 0); if (ref == 0) { - taosMemoryFree(pDataSubmit->data); + taosMemoryFree(pDataSubmit->submit.msgStr); taosMemoryFree(pDataSubmit->dataRef); } } @@ -135,16 +135,16 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* taosFreeQitem(elem); return dst; } else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && elem->type == STREAM_INPUT__DATA_SUBMIT) { - SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)dst; - SStreamDataSubmit* pBlockSrc = (SStreamDataSubmit*)elem; + SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)dst; + SStreamDataSubmit2* pBlockSrc = (SStreamDataSubmit2*)elem; streamMergeSubmit(pMerged, pBlockSrc); taosFreeQitem(elem); return dst; } else if (dst->type == STREAM_INPUT__DATA_SUBMIT && elem->type == STREAM_INPUT__DATA_SUBMIT) { - SStreamMergedSubmit* pMerged = streamMergedSubmitNew(); + SStreamMergedSubmit2* pMerged = streamMergedSubmitNew(); ASSERT(pMerged); - streamMergeSubmit(pMerged, (SStreamDataSubmit*)dst); - streamMergeSubmit(pMerged, (SStreamDataSubmit*)elem); + streamMergeSubmit(pMerged, (SStreamDataSubmit2*)dst); + streamMergeSubmit(pMerged, (SStreamDataSubmit2*)elem); taosFreeQitem(dst); taosFreeQitem(elem); return (SStreamQueueItem*)pMerged; @@ -162,22 +162,22 @@ void streamFreeQitem(SStreamQueueItem* data) { taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)blockDataFreeRes); taosFreeQitem(data); } else if (type == STREAM_INPUT__DATA_SUBMIT) { - streamDataSubmitRefDec((SStreamDataSubmit*)data); + streamDataSubmitRefDec((SStreamDataSubmit2*)data); taosFreeQitem(data); } else if (type == STREAM_INPUT__MERGED_SUBMIT) { - SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data; - int32_t sz = taosArrayGetSize(pMerge->reqs); + SStreamMergedSubmit2* pMerge = (SStreamMergedSubmit2*)data; + int32_t sz = taosArrayGetSize(pMerge->submits); for (int32_t i = 0; i < sz; i++) { int32_t* pRef = taosArrayGetP(pMerge->dataRefs, i); int32_t ref = atomic_sub_fetch_32(pRef, 1); ASSERT(ref >= 0); if (ref == 0) { - void* dataStr = taosArrayGetP(pMerge->reqs, i); - taosMemoryFree(dataStr); + SPackedSubmit* pSubmit = (SPackedSubmit*)taosArrayGet(pMerge->submits, i); + taosMemoryFree(pSubmit->msgStr); taosMemoryFree(pRef); } } - taosArrayDestroy(pMerge->reqs); + taosArrayDestroy(pMerge->submits); taosArrayDestroy(pMerge->dataRefs); taosFreeQitem(pMerge); } else if (type == STREAM_INPUT__REF_DATA_BLOCK) { diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 6a83a9a4da..3d6495b1a9 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -26,17 +26,18 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* qSetMultiStreamInput(exec, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK); } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE); - const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)data; - qDebug("task %d %p set submit input %p %p %d 1", pTask->taskId, pTask, pSubmit, pSubmit->data, *pSubmit->dataRef); - qSetMultiStreamInput(exec, pSubmit->data, 1, STREAM_INPUT__DATA_SUBMIT); + const SStreamDataSubmit2* pSubmit = (const SStreamDataSubmit2*)data; + qDebug("task %d %p set submit input %p %p %d %" PRId64, pTask->taskId, pTask, pSubmit, pSubmit->submit.msgStr, + pSubmit->submit.msgLen, pSubmit->submit.ver); + qSetMultiStreamInput(exec, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { const SStreamDataBlock* pBlock = (const SStreamDataBlock*)data; SArray* blocks = pBlock->blocks; qDebug("task %d %p set ssdata input", pTask->taskId, pTask); qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__DATA_BLOCK); } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { - const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)data; - SArray* blocks = pMerged->reqs; + const SStreamMergedSubmit2* pMerged = (const SStreamMergedSubmit2*)data; + SArray* blocks = pMerged->submits; qDebug("task %d %p set submit input (merged), batch num: %d", pTask->taskId, pTask, (int32_t)blocks->size); qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__MERGED_SUBMIT); } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) { @@ -244,11 +245,11 @@ int32_t streamExecForAll(SStreamTask* pTask) { qRes->blocks = pRes; if (((SStreamQueueItem*)input)->type == STREAM_INPUT__DATA_SUBMIT) { - SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)input; + SStreamDataSubmit2* pSubmit = (SStreamDataSubmit2*)input; qRes->childId = pTask->selfChildId; qRes->sourceVer = pSubmit->ver; } else if (((SStreamQueueItem*)input)->type == STREAM_INPUT__MERGED_SUBMIT) { - SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)input; + SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)input; qRes->childId = pTask->selfChildId; qRes->sourceVer = pMerged->ver; }