diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 370103c222..27084700b0 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -239,6 +239,7 @@ int32_t tqReaderSetDataMsg(STqReader *pReader, const SSubmitReq *pMsg, int64_t v bool tqNextDataBlock(STqReader *pReader); bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader); +int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas); int32_t vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index 48c14bc758..97d3acba93 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -44,7 +44,7 @@ static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, SMqDataRsp* pRs return 0; } -static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) { +static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, int32_t n) { SMetaReader mr = {0}; metaReaderInit(&mr, pTq->pVnode->pMeta, 0); // TODO add reference to gurantee success @@ -52,8 +52,10 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) { metaReaderClear(&mr); return -1; } - char* tbName = strdup(mr.me.name); - taosArrayPush(pRsp->blockTbName, &tbName); + for (int32_t i = 0; i < n; i++) { + char* tbName = strdup(mr.me.name); + taosArrayPush(pRsp->blockTbName, &tbName); + } metaReaderClear(&mr); return 0; } @@ -111,7 +113,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs if (pRsp->withTbName) { if (pRsp->rspOffset.type == TMQ_OFFSET__LOG) { int64_t uid = pExec->pExecReader->msgIter.uid; - tqAddTbNameToRsp(pTq, uid, pRsp); + tqAddTbNameToRsp(pTq, uid, pRsp, 1); } else { pRsp->withTbName = false; } @@ -155,7 +157,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta int64_t uid = 0; if (pOffset->type == TMQ_OFFSET__LOG) { uid = pExec->pExecReader->msgIter.uid; - if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp) < 0) { + if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp, 1) < 0) { continue; } } else { @@ -225,18 +227,27 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp STqExecHandle* pExec = &pHandle->execHandle; ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN); + SArray* pBlocks = taosArrayInit(0, sizeof(SSDataBlock)); + SArray* pSchemas = taosArrayInit(0, sizeof(SSchemaWrapper)); + if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { STqReader* pReader = pExec->pExecReader; tqReaderSetDataMsg(pReader, pReq, 0); while (tqNextDataBlock(pReader)) { - SSDataBlock block = {0}; - if (tqRetrieveDataBlock(&block, pReader) < 0) { + /*SSDataBlock block = {0};*/ + /*if (tqRetrieveDataBlock(&block, pReader) < 0) {*/ + /*if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;*/ + /*}*/ + + taosArrayClear(pBlocks); + taosArrayClear(pSchemas); + if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas) < 0) { if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; } if (pRsp->withTbName) { int64_t uid = pExec->pExecReader->msgIter.uid; - if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp) < 0) { - blockDataFreeRes(&block); + if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlocks)) < 0) { + taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes); continue; } } @@ -255,24 +266,34 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp pRsp->createTableNum++; } } - tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock), - pTq->pVnode->config.tsdbCfg.precision); - blockDataFreeRes(&block); - tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp); - pRsp->blockNum++; + for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) { + SSDataBlock* pBlock = taosArrayGet(pBlocks, i); + tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock), + pTq->pVnode->config.tsdbCfg.precision); + blockDataFreeRes(pBlock); + SSchemaWrapper* pSW = taosArrayGet(pSchemas, i); + taosArrayPush(pRsp->blockSchema, &pSW); + pRsp->blockNum++; + } } } else if (pExec->subType == TOPIC_SUB_TYPE__DB) { STqReader* pReader = pExec->pExecReader; tqReaderSetDataMsg(pReader, pReq, 0); while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) { - SSDataBlock block = {0}; - if (tqRetrieveDataBlock(&block, pReader) < 0) { + /*SSDataBlock block = {0};*/ + /*if (tqRetrieveDataBlock(&block, pReader) < 0) {*/ + /*if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;*/ + /*}*/ + taosArrayClear(pBlocks); + taosArrayClear(pSchemas); + if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas) < 0) { if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; } if (pRsp->withTbName) { int64_t uid = pExec->pExecReader->msgIter.uid; - if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp) < 0) { - blockDataFreeRes(&block); + if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlocks)) < 0) { + taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes); + /*blockDataFreeRes(&block);*/ continue; } } @@ -291,14 +312,26 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp pRsp->createTableNum++; } } - tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock), - pTq->pVnode->config.tsdbCfg.precision); - blockDataFreeRes(&block); - tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp); - pRsp->blockNum++; + /*tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock),*/ + /*pTq->pVnode->config.tsdbCfg.precision);*/ + /*blockDataFreeRes(&block);*/ + /*tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp);*/ + /*pRsp->blockNum++;*/ + for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) { + SSDataBlock* pBlock = taosArrayGet(pBlocks, i); + tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock), + pTq->pVnode->config.tsdbCfg.precision); + blockDataFreeRes(pBlock); + SSchemaWrapper* pSW = taosArrayGet(pSchemas, i); + taosArrayPush(pRsp->blockSchema, &pSW); + pRsp->blockNum++; + } } } + taosArrayDestroy(pBlocks); + taosArrayDestroy(pSchemas); + if (pRsp->blockNum == 0) { return -1; } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 5c5ba1205e..30897cbe48 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -556,7 +556,7 @@ FAIL: return -1; } -int32_t tqSplitRetrieveDataBlock(STqReader* pReader, SArray* blocks, SArray* schemas) { +int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas) { int32_t sversion = htonl(pReader->pBlock->sversion); if (pReader->cachedSchemaSuid == 0 || pReader->cachedSchemaVer != sversion || @@ -594,7 +594,7 @@ int32_t tqSplitRetrieveDataBlock(STqReader* pReader, SArray* blocks, SArray* sch int32_t curRow = 0; char* assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols); - if (assigned) return -1; + if (assigned == NULL) return -1; tInitSubmitBlkIter(&pReader->msgIter, pReader->pBlock, &pReader->blkIter); STSRowIter iter = {0}; diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index b2624d1bc1..27bfea0534 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -349,7 +349,6 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d .contLen = len + sizeof(SMsgHead), }; if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { - rpcFreeCont(serializedDeleteReq); tqDebug("failed to put delete req into write-queue since %s", terrstr()); } } else { @@ -476,12 +475,12 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen; - SSubmitReq* ret = rpcMallocCont(cap); - ret->header.vgId = pVnode->config.vgId; - ret->length = sizeof(SSubmitReq); - ret->numOfBlocks = htonl(1); + SSubmitReq* pSubmit = rpcMallocCont(cap); + pSubmit->header.vgId = pVnode->config.vgId; + pSubmit->length = sizeof(SSubmitReq); + pSubmit->numOfBlocks = htonl(1); - SSubmitBlk* blkHead = POINTER_SHIFT(ret, sizeof(SSubmitReq)); + SSubmitBlk* blkHead = POINTER_SHIFT(pSubmit, sizeof(SSubmitReq)); blkHead->numOfRows = htonl(pDataBlock->info.rows); blkHead->sversion = htonl(pTSchema->version); @@ -531,17 +530,16 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d } blkHead->dataLen = htonl(dataLen); - ret->length += sizeof(SSubmitBlk) + schemaLen + dataLen; - ret->length = htonl(ret->length); + pSubmit->length += sizeof(SSubmitBlk) + schemaLen + dataLen; + pSubmit->length = htonl(pSubmit->length); SRpcMsg msg = { .msgType = TDMT_VND_SUBMIT, - .pCont = ret, - .contLen = ntohl(ret->length), + .pCont = pSubmit, + .contLen = ntohl(pSubmit->length), }; if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { - rpcFreeCont(ret); tqDebug("failed to put into write-queue since %s", terrstr()); } }