refactor: support submitreq2

This commit is contained in:
Liu Jicong 2022-12-07 15:19:34 +08:00
parent a039353854
commit b133a56219
16 changed files with 180 additions and 124 deletions

View File

@ -3273,6 +3273,12 @@ void tDestroySSubmitRsp2(SSubmitRsp2* pRsp, int32_t flag);
#define TSDB_MSG_FLG_ENCODE 0x1 #define TSDB_MSG_FLG_ENCODE 0x1
#define TSDB_MSG_FLG_DECODE 0x2 #define TSDB_MSG_FLG_DECODE 0x2
typedef struct {
void* msgStr;
int32_t msgLen;
int64_t ver;
} SPackedSubmit;
#pragma pack(pop) #pragma pack(pop)
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -190,7 +190,9 @@ int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts);
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType); int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType);
int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq, int64_t ver); // int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq, int64_t ver);
//
int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedSubmit submit);
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset); int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);

View File

@ -103,6 +103,7 @@ typedef struct {
int8_t type; int8_t type;
} SStreamQueueItem; } SStreamQueueItem;
#if 0
typedef struct { typedef struct {
int8_t type; int8_t type;
int64_t ver; int64_t ver;
@ -116,6 +117,21 @@ typedef struct {
SArray* dataRefs; // SArray<int32_t*> SArray* dataRefs; // SArray<int32_t*>
SArray* reqs; // SArray<SSubmitReq*> SArray* reqs; // SArray<SSubmitReq*>
} SStreamMergedSubmit; } SStreamMergedSubmit;
#endif
typedef struct {
int8_t type;
int64_t ver;
int32_t* dataRef;
SPackedSubmit submit;
} SStreamDataSubmit2;
typedef struct {
int8_t type;
int64_t ver;
SArray* dataRefs; // SArray<int32_t*>
SArray* submits; // SArray<SPackedSubmit>
} SStreamMergedSubmit2;
typedef struct { typedef struct {
int8_t type; int8_t type;
@ -219,11 +235,11 @@ static FORCE_INLINE void* streamQueueNextItem(SStreamQueue* queue) {
} }
} }
SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq); SStreamDataSubmit2* streamDataSubmitNew(SPackedSubmit submit);
void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit); void streamDataSubmitRefDec(SStreamDataSubmit2* pDataSubmit);
SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit); SStreamDataSubmit2* streamSubmitRefClone(SStreamDataSubmit2* pSubmit);
typedef struct { typedef struct {
char* qmsg; char* qmsg;
@ -355,14 +371,15 @@ void tFreeSStreamTask(SStreamTask* pTask);
static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) { static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) {
if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit*)pItem); SStreamDataSubmit2* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit2*)pItem);
if (pSubmitClone == NULL) { if (pSubmitClone == NULL) {
qDebug("task %d %p submit enqueue failed since out of memory", pTask->taskId, pTask); qDebug("task %d %p submit enqueue failed since out of memory", pTask->taskId, pTask);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
return -1; return -1;
} }
qDebug("task %d %p submit enqueue %p %p %p", pTask->taskId, pTask, pItem, pSubmitClone, pSubmitClone->data); qDebug("task %d %p submit enqueue %p %p %p %d %" PRId64, pTask->taskId, pTask, pItem, pSubmitClone,
pSubmitClone->submit.msgStr, pSubmitClone->submit.msgLen, pSubmitClone->submit.ver);
taosWriteQitem(pTask->inputQueue->queue, pSubmitClone); taosWriteQitem(pTask->inputQueue->queue, pSubmitClone);
// qStreamInput(pTask->exec.executor, pSubmitClone); // qStreamInput(pTask->exec.executor, pSubmitClone);
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE || } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE ||

View File

@ -220,12 +220,6 @@ typedef struct SSnapContext {
bool queryMetaOrData; // true-get meta, false-get data bool queryMetaOrData; // true-get meta, false-get data
} SSnapContext; } SSnapContext;
typedef struct {
void *msgStr;
int32_t msgLen;
int64_t ver;
} SPackedSubmit;
typedef struct STqReader { typedef struct STqReader {
const SSubmitReq *pMsg; const SSubmitReq *pMsg;
// SSubmitBlk *pBlock; // SSubmitBlk *pBlock;
@ -234,7 +228,9 @@ typedef struct STqReader {
int64_t ver; int64_t ver;
SPackedSubmit msg2; SPackedSubmit msg2;
SSubmitReq2 *pSubmit;
int8_t setMsg;
SSubmitReq2 submit;
int32_t nextBlk; int32_t nextBlk;
int64_t lastBlkUid; int64_t lastBlkUid;

View File

@ -182,7 +182,8 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore);
// tqSink // tqSink
// void tqSinkToTableMerge(SStreamTask* pTask, void* vnode, int64_t ver, void* data); // void tqSinkToTableMerge(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data); // void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
// tqOffset // tqOffset
char* tqOffsetBuildFName(const char* path, int32_t fVer); char* tqOffsetBuildFName(const char* path, int32_t fVer);

View File

@ -180,7 +180,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msg
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* data, int64_t ver); int32_t tqProcessSubmitReq(STQ* pTq, SPackedSubmit submit);
int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver); int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver);
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec); int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec);

View File

@ -953,7 +953,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pTask->smaSink.smaSink = smaHandleRes; pTask->smaSink.smaSink = smaHandleRes;
} else if (pTask->outputType == TASK_OUTPUT__TABLE) { } else if (pTask->outputType == TASK_OUTPUT__TABLE) {
pTask->tbSink.vnode = pTq->pVnode; pTask->tbSink.vnode = pTq->pVnode;
pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline; pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline2;
ASSERT(pTask->tbSink.pSchemaWrapper); ASSERT(pTask->tbSink.pSchemaWrapper);
ASSERT(pTask->tbSink.pSchemaWrapper->pSchema); ASSERT(pTask->tbSink.pSchemaWrapper->pSchema);
@ -1334,12 +1334,12 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
return 0; return 0;
} }
int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* pReq, int64_t ver) { int32_t tqProcessSubmitReq(STQ* pTq, SPackedSubmit submit) {
void* pIter = NULL; void* pIter = NULL;
bool failed = false; bool failed = false;
SStreamDataSubmit* pSubmit = NULL; SStreamDataSubmit2* pSubmit = NULL;
pSubmit = streamDataSubmitNew(pReq); pSubmit = streamDataSubmitNew(submit);
if (pSubmit == NULL) { if (pSubmit == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to create data submit for stream since out of memory"); tqError("failed to create data submit for stream since out of memory");
@ -1356,7 +1356,7 @@ int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
continue; continue;
} }
tqDebug("data submit enqueue stream task: %d, ver: %" PRId64, pTask->taskId, ver); tqDebug("data submit enqueue stream task: %d, ver: %" PRId64, pTask->taskId, submit.ver);
if (!failed) { if (!failed) {
if (streamTaskInput(pTask, (SStreamQueueItem*)pSubmit) < 0) { if (streamTaskInput(pTask, (SStreamQueueItem*)pSubmit) < 0) {

View File

@ -213,7 +213,11 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
#endif #endif
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
tqDebug("vgId:%d, tq push msg ver %" PRId64 ", type: %s", pTq->pVnode->config.vgId, ver, TMSG_INFO(msgType)); void* pReq = POINTER_SHIFT(msg, sizeof(SMsgHead));
int32_t len = msgLen - sizeof(SMsgHead);
tqDebug("vgId:%d, tq push msg ver %" PRId64 ", type: %s, p head %p, p body %p, len %d", pTq->pVnode->config.vgId, ver,
TMSG_INFO(msgType), msg, pReq, len);
if (msgType == TDMT_VND_SUBMIT) { if (msgType == TDMT_VND_SUBMIT) {
// lock push mgr to avoid potential msg lost // lock push mgr to avoid potential msg lost
@ -222,7 +226,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
if (taosHashGetSize(pTq->pPushMgr) != 0) { if (taosHashGetSize(pTq->pPushMgr) != 0) {
SArray* cachedKeys = taosArrayInit(0, sizeof(void*)); SArray* cachedKeys = taosArrayInit(0, sizeof(void*));
SArray* cachedKeyLens = taosArrayInit(0, sizeof(size_t)); SArray* cachedKeyLens = taosArrayInit(0, sizeof(size_t));
void* data = taosMemoryMalloc(msgLen); void* data = taosMemoryMalloc(len);
if (data == NULL) { if (data == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to copy data for stream since out of memory"); tqError("failed to copy data for stream since out of memory");
@ -230,9 +234,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
taosArrayDestroy(cachedKeyLens); taosArrayDestroy(cachedKeyLens);
return -1; return -1;
} }
memcpy(data, msg, msgLen); memcpy(data, pReq, len);
SSubmitReq* pReq = (SSubmitReq*)data;
pReq->version = ver;
void* pIter = NULL; void* pIter = NULL;
while (1) { while (1) {
@ -256,7 +258,12 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
SMqDataRsp* pRsp = &pPushEntry->dataRsp; SMqDataRsp* pRsp = &pPushEntry->dataRsp;
// prepare scan mem data // prepare scan mem data
qStreamScanMemData(task, pReq, ver); SPackedSubmit submit = {
.msgStr = data,
.msgLen = len,
.ver = ver,
};
qStreamSetScanMemData(task, submit);
// exec // exec
while (1) { while (1) {
@ -310,17 +317,22 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
if (vnodeIsRoleLeader(pTq->pVnode)) { if (vnodeIsRoleLeader(pTq->pVnode)) {
if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) return 0; if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) return 0;
if (msgType == TDMT_VND_SUBMIT) { if (msgType == TDMT_VND_SUBMIT) {
void* data = taosMemoryMalloc(msgLen); void* data = taosMemoryMalloc(len);
if (data == NULL) { if (data == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to copy data for stream since out of memory"); tqError("failed to copy data for stream since out of memory");
return -1; return -1;
} }
memcpy(data, msg, msgLen); memcpy(data, pReq, len);
SSubmitReq* pReq = (SSubmitReq*)data; SPackedSubmit submit = {
pReq->version = ver; .msgStr = data,
.msgLen = len,
.ver = ver,
};
tqProcessSubmitReq(pTq, data, ver); tqDebug("tq copy write msg %p %d %" PRId64 " from %p", data, len, ver, pReq);
tqProcessSubmitReq(pTq, submit);
} }
if (msgType == TDMT_VND_DELETE) { if (msgType == TDMT_VND_DELETE) {
tqProcessDelReq(pTq, POINTER_SHIFT(msg, sizeof(SMsgHead)), msgLen - sizeof(SMsgHead), ver); tqProcessDelReq(pTq, POINTER_SHIFT(msg, sizeof(SMsgHead)), msgLen - sizeof(SMsgHead), ver);

View File

@ -306,7 +306,7 @@ int32_t tqSeekVer(STqReader* pReader, int64_t ver) {
} }
int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
bool fromProcessedMsg = pReader->pMsg != NULL; bool fromProcessedMsg = pReader->msg2.msgStr != NULL;
while (1) { while (1) {
if (!fromProcessedMsg) { if (!fromProcessedMsg) {
@ -381,15 +381,22 @@ int32_t tqReaderSetDataMsg(STqReader* pReader, const SSubmitReq* pMsg, int64_t v
int32_t tqReaderSetSubmitReq2(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) { int32_t tqReaderSetSubmitReq2(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) {
ASSERT(pReader->msg2.msgStr == NULL); ASSERT(pReader->msg2.msgStr == NULL);
ASSERT(msgStr);
ASSERT(msgLen);
pReader->msg2.msgStr = msgStr; pReader->msg2.msgStr = msgStr;
pReader->msg2.msgLen = msgLen; pReader->msg2.msgLen = msgLen;
pReader->msg2.ver = ver; pReader->msg2.ver = ver;
if (pReader->pSubmit == NULL) { tqDebug("tq reader set msg %p", msgStr);
if (pReader->setMsg == 0) {
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, pReader->msg2.msgStr, pReader->msg2.msgLen); tDecoderInit(&decoder, pReader->msg2.msgStr, pReader->msg2.msgLen);
tDecodeSSubmitReq2(&decoder, pReader->pSubmit); if (tDecodeSSubmitReq2(&decoder, &pReader->submit) < 0) {
ASSERT(0);
}
tDecoderClear(&decoder); tDecoderClear(&decoder);
pReader->setMsg = 1;
} }
return 0; return 0;
} }
@ -422,11 +429,14 @@ bool tqNextDataBlock(STqReader* pReader) {
bool tqNextDataBlock2(STqReader* pReader) { bool tqNextDataBlock2(STqReader* pReader) {
if (pReader->msg2.msgStr == NULL) return false; if (pReader->msg2.msgStr == NULL) return false;
ASSERT(pReader->pSubmit != NULL); ASSERT(pReader->setMsg == 1);
int32_t blockSz = taosArrayGetSize(pReader->pSubmit->aSubmitTbData); tqDebug("tq reader next data block %p, %d %" PRId64 " %d", pReader->msg2.msgStr, pReader->msg2.msgLen,
pReader->msg2.ver, pReader->nextBlk);
int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
while (pReader->nextBlk < blockSz) { while (pReader->nextBlk < blockSz) {
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->pSubmit->aSubmitTbData, pReader->nextBlk); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
if (pReader->tbIdHash == NULL) return true; if (pReader->tbIdHash == NULL) return true;
void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)); void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
@ -435,8 +445,8 @@ bool tqNextDataBlock2(STqReader* pReader) {
} }
} }
tDestroySSubmitReq2(pReader->pSubmit, TSDB_MSG_FLG_DECODE); tDestroySSubmitReq2(&pReader->submit, TSDB_MSG_FLG_DECODE);
pReader->pSubmit = NULL; pReader->setMsg = 0;
pReader->nextBlk = 0; pReader->nextBlk = 0;
pReader->msg2.msgStr = NULL; pReader->msg2.msgStr = NULL;
@ -445,11 +455,11 @@ bool tqNextDataBlock2(STqReader* pReader) {
bool tqNextDataBlockFilterOut2(STqReader* pReader, SHashObj* filterOutUids) { bool tqNextDataBlockFilterOut2(STqReader* pReader, SHashObj* filterOutUids) {
if (pReader->msg2.msgStr == NULL) return false; if (pReader->msg2.msgStr == NULL) return false;
ASSERT(pReader->pSubmit != NULL); ASSERT(pReader->setMsg == 1);
int32_t blockSz = taosArrayGetSize(pReader->pSubmit->aSubmitTbData); int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
while (pReader->nextBlk < blockSz) { while (pReader->nextBlk < blockSz) {
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->pSubmit->aSubmitTbData, pReader->nextBlk); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
if (pReader->tbIdHash == NULL) return true; if (pReader->tbIdHash == NULL) return true;
void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)); void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
@ -458,8 +468,8 @@ bool tqNextDataBlockFilterOut2(STqReader* pReader, SHashObj* filterOutUids) {
} }
} }
tDestroySSubmitReq2(pReader->pSubmit, TSDB_MSG_FLG_DECODE); tDestroySSubmitReq2(&pReader->submit, TSDB_MSG_FLG_DECODE);
pReader->pSubmit = NULL; pReader->setMsg = 0;
pReader->nextBlk = 0; pReader->nextBlk = 0;
pReader->msg2.msgStr = NULL; pReader->msg2.msgStr = NULL;
@ -548,10 +558,12 @@ int32_t tqScanSubmitSplit(SArray* pBlocks, SArray* schemas, STqReader* pReader)
#endif #endif
int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader) { int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader) {
int32_t blockSz = taosArrayGetSize(pReader->pSubmit->aSubmitTbData); int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
ASSERT(pReader->nextBlk < blockSz); ASSERT(pReader->nextBlk < blockSz);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->pSubmit->aSubmitTbData, pReader->nextBlk); tqDebug("tq reader retrieve data block %p, %d", pReader->msg2.msgStr, pReader->nextBlk);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
pReader->nextBlk++; pReader->nextBlk++;
int32_t sversion = pSubmitTbData->sver; int32_t sversion = pSubmitTbData->sver;
@ -672,7 +684,7 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader) {
} else { } else {
val = &colVal.value.val; val = &colVal.value.val;
} }
if (colDataAppend(pColData, i, val, colVal.type != TD_VTYPE_NORM) < 0) { if (colDataAppend(pColData, i, val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
goto FAIL; goto FAIL;
} }
} }
@ -686,14 +698,14 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader) {
SArray* pRows = pSubmitTbData->aRowP; SArray* pRows = pSubmitTbData->aRowP;
for (int32_t i = 0; i < numOfRows; i++) { for (int32_t i = 0; i < numOfRows; i++) {
SRow* pRow = taosArrayGet(pRows, i); SRow* pRow = taosArrayGetP(pRows, i);
int32_t targetIdx = 0; int32_t targetIdx = 0;
int32_t sourceIdx = 0; int32_t sourceIdx = 0;
for (int32_t j = 0; j < colActual; j++) { for (int32_t j = 0; j < colActual; j++) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j); SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j);
while (1) { while (1) {
ASSERT(sourceIdx < numOfRows); ASSERT(sourceIdx < pTschema->numOfCols);
SColVal colVal; SColVal colVal;
tRowGet(pRow, pTschema, sourceIdx, &colVal); tRowGet(pRow, pTschema, sourceIdx, &colVal);
@ -707,7 +719,7 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader) {
} else { } else {
val = &colVal.value.val; val = &colVal.value.val;
} }
if (colDataAppend(pColData, i, val, colVal.type != TD_VTYPE_NORM) < 0) { if (colDataAppend(pColData, i, val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
goto FAIL; goto FAIL;
} }

View File

@ -957,7 +957,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
if (NULL == pBuf) { if (NULL == pBuf) {
goto _end; goto _end;
} }
((SMsgHead*)pBuf)->vgId = htonl(TD_VID(pVnode)); ((SMsgHead*)pBuf)->vgId = TD_VID(pVnode);
((SMsgHead*)pBuf)->contLen = htonl(len); ((SMsgHead*)pBuf)->contLen = htonl(len);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead)); tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead));
if (tEncodeSSubmitReq2(&encoder, pReq) < 0) { if (tEncodeSSubmitReq2(&encoder, pReq) < 0) {
@ -969,7 +969,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
SRpcMsg msg = { SRpcMsg msg = {
.msgType = TDMT_VND_SUBMIT, .msgType = TDMT_VND_SUBMIT,
.pCont = pBuf, .pCont = pBuf,
.contLen = ntohl(len), .contLen = len,
}; };
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {

View File

@ -871,6 +871,8 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
pRsp->code = TSDB_CODE_SUCCESS; pRsp->code = TSDB_CODE_SUCCESS;
vDebug("vvvvvvvvvvvv %p, %d", pReq, len);
// decode // decode
SDecoder dc = {0}; SDecoder dc = {0};
tDecoderInit(&dc, pReq, len); tDecoderInit(&dc, pReq, len);

View File

@ -115,7 +115,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
if (type == STREAM_INPUT__MERGED_SUBMIT) { if (type == STREAM_INPUT__MERGED_SUBMIT) {
// ASSERT(numOfBlocks > 1); // ASSERT(numOfBlocks > 1);
for (int32_t i = 0; i < numOfBlocks; i++) { for (int32_t i = 0; i < numOfBlocks; i++) {
SSubmitReq* pReq = *(void**)POINTER_SHIFT(input, i * sizeof(void*)); SPackedSubmit* pReq = *(void**)POINTER_SHIFT(input, i * sizeof(SPackedSubmit));
taosArrayPush(pInfo->pBlockLists, &pReq); taosArrayPush(pInfo->pBlockLists, &pReq);
} }
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
@ -126,7 +126,10 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
} else if (type == STREAM_INPUT__DATA_BLOCK) { } else if (type == STREAM_INPUT__DATA_BLOCK) {
for (int32_t i = 0; i < numOfBlocks; ++i) { for (int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
taosArrayPush(pInfo->pBlockLists, &pDataBlock); SPackedSubmit tmp = {
.msgStr = pDataBlock,
};
taosArrayPush(pInfo->pBlockLists, &tmp);
} }
pInfo->blockType = STREAM_INPUT__DATA_BLOCK; pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
} else { } else {
@ -1002,6 +1005,7 @@ int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* s
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
#if 0
int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq, int64_t scanVer) { int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq, int64_t scanVer) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE); ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
@ -1010,6 +1014,7 @@ int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq, int64_t sc
pTaskInfo->streamInfo.scanVer = scanVer; pTaskInfo->streamInfo.scanVer = scanVer;
return 0; return 0;
} }
#endif
int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedSubmit submit) { int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedSubmit submit) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;

View File

@ -768,8 +768,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num); tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num);
ASSERT(pInfo->base.dataReader == NULL); ASSERT(pInfo->base.dataReader == NULL);
int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
pInfo->pResBlock, (STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo)); (STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
@ -986,8 +986,8 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
SSDataBlock* pBlock = pTableScanInfo->pResBlock; SSDataBlock* pBlock = pTableScanInfo->pResBlock;
STsdbReader* pReader = NULL; STsdbReader* pReader = NULL;
int32_t code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock, (STsdbReader**)&pReader, int32_t code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock,
GET_TASKID(pTaskInfo)); (STsdbReader**)&pReader, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code; terrno = code;
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
@ -995,7 +995,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
} }
if (tsdbNextDataBlock(pReader)) { if (tsdbNextDataBlock(pReader)) {
/*SSDataBlock* p = */tsdbRetrieveDataBlock(pReader, NULL); /*SSDataBlock* p = */ tsdbRetrieveDataBlock(pReader, NULL);
doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows); doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows);
pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid); pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);
} }
@ -1526,17 +1526,18 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
qDebug("queue scan called"); qDebug("queue scan called");
if (pTaskInfo->streamInfo.pReq != NULL) { if (pTaskInfo->streamInfo.pReq != NULL) {
if (pInfo->tqReader->pMsg == NULL) { if (pInfo->tqReader->msg2.msgStr == NULL) {
pInfo->tqReader->pMsg = pTaskInfo->streamInfo.pReq; /*pInfo->tqReader->pMsg = pTaskInfo->streamInfo.pReq;*/
pInfo->tqReader->ver = pTaskInfo->streamInfo.scanVer; pInfo->tqReader->ver = pTaskInfo->streamInfo.scanVer;
const SSubmitReq* pSubmit = pInfo->tqReader->pMsg; /*const SSubmitReq* pSubmit = pInfo->tqReader->pMsg;*/
/*if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {*/ /*if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {*/
/*void* msgStr = pTaskInfo->streamInfo.*/ /*void* msgStr = pTaskInfo->streamInfo.*/
SPackedSubmit submit = pTaskInfo->streamInfo.submit; SPackedSubmit submit = pTaskInfo->streamInfo.submit;
if (tqReaderSetSubmitReq2(pInfo->tqReader, submit.msgStr, submit.msgLen, submit.ver) < 0) { if (tqReaderSetSubmitReq2(pInfo->tqReader, submit.msgStr, submit.msgLen, submit.ver) < 0) {
qError("submit msg messed up when initing stream submit block %p", pSubmit); qError("submit msg messed up when initing stream submit block %p", submit.msgStr);
pInfo->tqReader->pMsg = NULL; pInfo->tqReader->msg2 = (SPackedSubmit){0};
pInfo->tqReader->setMsg = 0;
pTaskInfo->streamInfo.pReq = NULL; pTaskInfo->streamInfo.pReq = NULL;
ASSERT(0); ASSERT(0);
} }
@ -1561,7 +1562,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
} }
} }
pInfo->tqReader->pMsg = NULL;
pTaskInfo->streamInfo.pReq = NULL; pTaskInfo->streamInfo.pReq = NULL;
return NULL; return NULL;
} }
@ -1920,7 +1920,7 @@ FETCH_NEXT_BLOCK:
NEXT_SUBMIT_BLK: NEXT_SUBMIT_BLK:
while (1) { while (1) {
if (pInfo->tqReader->pMsg == NULL) { if (pInfo->tqReader->msg2.msgStr == NULL) {
if (pInfo->validBlockIndex >= totBlockNum) { if (pInfo->validBlockIndex >= totBlockNum) {
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo); updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
doClearBufferedBlocks(pInfo); doClearBufferedBlocks(pInfo);
@ -1929,12 +1929,11 @@ FETCH_NEXT_BLOCK:
} }
int32_t current = pInfo->validBlockIndex++; int32_t current = pInfo->validBlockIndex++;
SSubmitReq* pSubmit = taosArrayGetP(pInfo->pBlockLists, current); SPackedSubmit* pSubmit = taosArrayGetP(pInfo->pBlockLists, current);
/*if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {*/ /*if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {*/
if (tqReaderSetSubmitReq2(pInfo->tqReader, pSubmit, 0, 0) < 0) { if (tqReaderSetSubmitReq2(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) {
qError("submit msg messed up when initing stream submit block %p, current %d, total %d", pSubmit, current, qError("submit msg messed up when initing stream submit block %p, current %d, total %d", pSubmit, current,
totBlockNum); totBlockNum);
pInfo->tqReader->pMsg = NULL;
continue; continue;
} }
} }
@ -1985,7 +1984,6 @@ FETCH_NEXT_BLOCK:
if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) { if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
break; break;
} else { } else {
pInfo->tqReader->pMsg = NULL;
continue; continue;
} }
/*blockDataCleanup(pInfo->pRes);*/ /*blockDataCleanup(pInfo->pRes);*/
@ -2266,7 +2264,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
} }
} }
pInfo->pBlockLists = taosArrayInit(4, POINTER_BYTES); pInfo->pBlockLists = taosArrayInit(4, sizeof(SPackedSubmit));
if (pInfo->pBlockLists == NULL) { if (pInfo->pBlockLists == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _error; goto _error;
@ -2286,7 +2284,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
if (pHandle->initTableReader) { if (pHandle->initTableReader) {
pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER; pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
pTSInfo->base.dataReader = NULL; pTSInfo->base.dataReader = NULL;
code = tsdbReaderOpen(pHandle->vnode, &pTSInfo->base.cond, pList, num, pTSInfo->pResBlock, &pTSInfo->base.dataReader, NULL); code = tsdbReaderOpen(pHandle->vnode, &pTSInfo->base.cond, pList, num, pTSInfo->pResBlock,
&pTSInfo->base.dataReader, NULL);
if (code != 0) { if (code != 0) {
terrno = code; terrno = code;
destroyTableScanOperatorInfo(pTableScanOp); destroyTableScanOperatorInfo(pTableScanOp);
@ -2356,7 +2355,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
__optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan; __optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan;
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL); pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL);
return pOperator; return pOperator;
@ -2493,7 +2493,8 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL); pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL);
return pOperator; return pOperator;
@ -2518,7 +2519,8 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex); void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex);
SReadHandle* pHandle = &pInfo->base.readHandle; SReadHandle* pHandle = &pInfo->base.readHandle;
int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &pInfo->base.dataReader, GET_TASKID(pTaskInfo)); int32_t code =
tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &pInfo->base.dataReader, GET_TASKID(pTaskInfo));
if (code != 0) { if (code != 0) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
@ -3042,8 +3044,8 @@ SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableC
setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED, setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED,
pInfo, pTaskInfo); pInfo, pTaskInfo);
pOperator->fpSet = pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator,
createOperatorFpSet(optrDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator, optrDefaultBufFn, NULL); optrDefaultBufFn, NULL);
return pOperator; return pOperator;
_error: _error:

View File

@ -66,12 +66,12 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock
return 0; return 0;
} }
SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq) { SStreamDataSubmit2* streamDataSubmitNew(SPackedSubmit submit) {
SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM); SStreamDataSubmit2* pDataSubmit = (SStreamDataSubmit2*)taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM);
if (pDataSubmit == NULL) return NULL; if (pDataSubmit == NULL) return NULL;
pDataSubmit->dataRef = (int32_t*)taosMemoryMalloc(sizeof(int32_t)); pDataSubmit->dataRef = (int32_t*)taosMemoryMalloc(sizeof(int32_t));
if (pDataSubmit->dataRef == NULL) goto FAIL; if (pDataSubmit->dataRef == NULL) goto FAIL;
pDataSubmit->data = pReq; pDataSubmit->submit = submit;
*pDataSubmit->dataRef = 1; *pDataSubmit->dataRef = 1;
pDataSubmit->type = STREAM_INPUT__DATA_SUBMIT; pDataSubmit->type = STREAM_INPUT__DATA_SUBMIT;
return pDataSubmit; return pDataSubmit;
@ -80,47 +80,47 @@ FAIL:
return NULL; return NULL;
} }
SStreamMergedSubmit* streamMergedSubmitNew() { SStreamMergedSubmit2* streamMergedSubmitNew() {
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM); SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)taosAllocateQitem(sizeof(SStreamMergedSubmit2), DEF_QITEM);
if (pMerged == NULL) return NULL; if (pMerged == NULL) return NULL;
pMerged->reqs = taosArrayInit(0, sizeof(void*)); pMerged->submits = taosArrayInit(0, sizeof(SPackedSubmit));
pMerged->dataRefs = taosArrayInit(0, sizeof(void*)); pMerged->dataRefs = taosArrayInit(0, sizeof(void*));
if (pMerged->dataRefs == NULL || pMerged->reqs == NULL) goto FAIL; if (pMerged->dataRefs == NULL || pMerged->submits == NULL) goto FAIL;
pMerged->type = STREAM_INPUT__MERGED_SUBMIT; pMerged->type = STREAM_INPUT__MERGED_SUBMIT;
return pMerged; return pMerged;
FAIL: FAIL:
if (pMerged->reqs) taosArrayDestroy(pMerged->reqs); if (pMerged->submits) taosArrayDestroy(pMerged->submits);
if (pMerged->dataRefs) taosArrayDestroy(pMerged->dataRefs); if (pMerged->dataRefs) taosArrayDestroy(pMerged->dataRefs);
taosFreeQitem(pMerged); taosFreeQitem(pMerged);
return NULL; return NULL;
} }
int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) { int32_t streamMergeSubmit(SStreamMergedSubmit2* pMerged, SStreamDataSubmit2* pSubmit) {
taosArrayPush(pMerged->dataRefs, &pSubmit->dataRef); taosArrayPush(pMerged->dataRefs, &pSubmit->dataRef);
taosArrayPush(pMerged->reqs, &pSubmit->data); taosArrayPush(pMerged->submits, &pSubmit->submit);
pMerged->ver = pSubmit->ver; pMerged->ver = pSubmit->ver;
return 0; return 0;
} }
static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit* pDataSubmit) { static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit2* pDataSubmit) {
atomic_add_fetch_32(pDataSubmit->dataRef, 1); atomic_add_fetch_32(pDataSubmit->dataRef, 1);
} }
SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit) { SStreamDataSubmit2* streamSubmitRefClone(SStreamDataSubmit2* pSubmit) {
SStreamDataSubmit* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM); SStreamDataSubmit2* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM);
if (pSubmitClone == NULL) { if (pSubmitClone == NULL) {
return NULL; return NULL;
} }
streamDataSubmitRefInc(pSubmit); streamDataSubmitRefInc(pSubmit);
memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit)); memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit2));
return pSubmitClone; return pSubmitClone;
} }
void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit) { void streamDataSubmitRefDec(SStreamDataSubmit2* pDataSubmit) {
int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1); int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1);
ASSERT(ref >= 0); ASSERT(ref >= 0);
if (ref == 0) { if (ref == 0) {
taosMemoryFree(pDataSubmit->data); taosMemoryFree(pDataSubmit->submit.msgStr);
taosMemoryFree(pDataSubmit->dataRef); taosMemoryFree(pDataSubmit->dataRef);
} }
} }
@ -135,16 +135,16 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem*
taosFreeQitem(elem); taosFreeQitem(elem);
return dst; return dst;
} else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && elem->type == STREAM_INPUT__DATA_SUBMIT) { } else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && elem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)dst; SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)dst;
SStreamDataSubmit* pBlockSrc = (SStreamDataSubmit*)elem; SStreamDataSubmit2* pBlockSrc = (SStreamDataSubmit2*)elem;
streamMergeSubmit(pMerged, pBlockSrc); streamMergeSubmit(pMerged, pBlockSrc);
taosFreeQitem(elem); taosFreeQitem(elem);
return dst; return dst;
} else if (dst->type == STREAM_INPUT__DATA_SUBMIT && elem->type == STREAM_INPUT__DATA_SUBMIT) { } else if (dst->type == STREAM_INPUT__DATA_SUBMIT && elem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamMergedSubmit* pMerged = streamMergedSubmitNew(); SStreamMergedSubmit2* pMerged = streamMergedSubmitNew();
ASSERT(pMerged); ASSERT(pMerged);
streamMergeSubmit(pMerged, (SStreamDataSubmit*)dst); streamMergeSubmit(pMerged, (SStreamDataSubmit2*)dst);
streamMergeSubmit(pMerged, (SStreamDataSubmit*)elem); streamMergeSubmit(pMerged, (SStreamDataSubmit2*)elem);
taosFreeQitem(dst); taosFreeQitem(dst);
taosFreeQitem(elem); taosFreeQitem(elem);
return (SStreamQueueItem*)pMerged; return (SStreamQueueItem*)pMerged;
@ -162,22 +162,22 @@ void streamFreeQitem(SStreamQueueItem* data) {
taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)blockDataFreeRes); taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)blockDataFreeRes);
taosFreeQitem(data); taosFreeQitem(data);
} else if (type == STREAM_INPUT__DATA_SUBMIT) { } else if (type == STREAM_INPUT__DATA_SUBMIT) {
streamDataSubmitRefDec((SStreamDataSubmit*)data); streamDataSubmitRefDec((SStreamDataSubmit2*)data);
taosFreeQitem(data); taosFreeQitem(data);
} else if (type == STREAM_INPUT__MERGED_SUBMIT) { } else if (type == STREAM_INPUT__MERGED_SUBMIT) {
SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data; SStreamMergedSubmit2* pMerge = (SStreamMergedSubmit2*)data;
int32_t sz = taosArrayGetSize(pMerge->reqs); int32_t sz = taosArrayGetSize(pMerge->submits);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
int32_t* pRef = taosArrayGetP(pMerge->dataRefs, i); int32_t* pRef = taosArrayGetP(pMerge->dataRefs, i);
int32_t ref = atomic_sub_fetch_32(pRef, 1); int32_t ref = atomic_sub_fetch_32(pRef, 1);
ASSERT(ref >= 0); ASSERT(ref >= 0);
if (ref == 0) { if (ref == 0) {
void* dataStr = taosArrayGetP(pMerge->reqs, i); SPackedSubmit* pSubmit = (SPackedSubmit*)taosArrayGet(pMerge->submits, i);
taosMemoryFree(dataStr); taosMemoryFree(pSubmit->msgStr);
taosMemoryFree(pRef); taosMemoryFree(pRef);
} }
} }
taosArrayDestroy(pMerge->reqs); taosArrayDestroy(pMerge->submits);
taosArrayDestroy(pMerge->dataRefs); taosArrayDestroy(pMerge->dataRefs);
taosFreeQitem(pMerge); taosFreeQitem(pMerge);
} else if (type == STREAM_INPUT__REF_DATA_BLOCK) { } else if (type == STREAM_INPUT__REF_DATA_BLOCK) {

View File

@ -26,17 +26,18 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
qSetMultiStreamInput(exec, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK); qSetMultiStreamInput(exec, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE); ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)data; const SStreamDataSubmit2* pSubmit = (const SStreamDataSubmit2*)data;
qDebug("task %d %p set submit input %p %p %d 1", pTask->taskId, pTask, pSubmit, pSubmit->data, *pSubmit->dataRef); qDebug("task %d %p set submit input %p %p %d %" PRId64, pTask->taskId, pTask, pSubmit, pSubmit->submit.msgStr,
qSetMultiStreamInput(exec, pSubmit->data, 1, STREAM_INPUT__DATA_SUBMIT); pSubmit->submit.msgLen, pSubmit->submit.ver);
qSetMultiStreamInput(exec, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
const SStreamDataBlock* pBlock = (const SStreamDataBlock*)data; const SStreamDataBlock* pBlock = (const SStreamDataBlock*)data;
SArray* blocks = pBlock->blocks; SArray* blocks = pBlock->blocks;
qDebug("task %d %p set ssdata input", pTask->taskId, pTask); qDebug("task %d %p set ssdata input", pTask->taskId, pTask);
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__DATA_BLOCK); qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)data; const SStreamMergedSubmit2* pMerged = (const SStreamMergedSubmit2*)data;
SArray* blocks = pMerged->reqs; SArray* blocks = pMerged->submits;
qDebug("task %d %p set submit input (merged), batch num: %d", pTask->taskId, pTask, (int32_t)blocks->size); qDebug("task %d %p set submit input (merged), batch num: %d", pTask->taskId, pTask, (int32_t)blocks->size);
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__MERGED_SUBMIT); qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__MERGED_SUBMIT);
} else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) { } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
@ -244,11 +245,11 @@ int32_t streamExecForAll(SStreamTask* pTask) {
qRes->blocks = pRes; qRes->blocks = pRes;
if (((SStreamQueueItem*)input)->type == STREAM_INPUT__DATA_SUBMIT) { if (((SStreamQueueItem*)input)->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)input; SStreamDataSubmit2* pSubmit = (SStreamDataSubmit2*)input;
qRes->childId = pTask->selfChildId; qRes->childId = pTask->selfChildId;
qRes->sourceVer = pSubmit->ver; qRes->sourceVer = pSubmit->ver;
} else if (((SStreamQueueItem*)input)->type == STREAM_INPUT__MERGED_SUBMIT) { } else if (((SStreamQueueItem*)input)->type == STREAM_INPUT__MERGED_SUBMIT) {
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)input; SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)input;
qRes->childId = pTask->selfChildId; qRes->childId = pTask->selfChildId;
qRes->sourceVer = pMerged->ver; qRes->sourceVer = pMerged->ver;
} }