diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 370103c222..27084700b0 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -239,6 +239,7 @@ int32_t tqReaderSetDataMsg(STqReader *pReader, const SSubmitReq *pMsg, int64_t v bool tqNextDataBlock(STqReader *pReader); bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader); +int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas); int32_t vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 092128fe6e..5e35e34b87 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -671,7 +671,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { SSubmitReq* pCont = (SSubmitReq*)&pHead->body; if (tqTaosxScanLog(pTq, pHandle, pCont, &taosxRsp) < 0) { - /*ASSERT(0);*/ } if (taosxRsp.blockNum > 0 /* threshold */) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index 48c14bc758..3887f72740 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -44,7 +44,7 @@ static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, SMqDataRsp* pRs return 0; } -static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) { +static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, int32_t n) { SMetaReader mr = {0}; metaReaderInit(&mr, pTq->pVnode->pMeta, 0); // TODO add reference to gurantee success @@ -52,8 +52,10 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) { metaReaderClear(&mr); return -1; } - char* tbName = strdup(mr.me.name); - taosArrayPush(pRsp->blockTbName, &tbName); + for (int32_t i = 0; i < n; i++) { + char* tbName = strdup(mr.me.name); + taosArrayPush(pRsp->blockTbName, &tbName); + } metaReaderClear(&mr); return 0; } @@ -111,7 +113,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs if (pRsp->withTbName) { if (pRsp->rspOffset.type == TMQ_OFFSET__LOG) { int64_t uid = pExec->pExecReader->msgIter.uid; - tqAddTbNameToRsp(pTq, uid, pRsp); + tqAddTbNameToRsp(pTq, uid, pRsp, 1); } else { pRsp->withTbName = false; } @@ -155,7 +157,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta int64_t uid = 0; if (pOffset->type == TMQ_OFFSET__LOG) { uid = pExec->pExecReader->msgIter.uid; - if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp) < 0) { + if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp, 1) < 0) { continue; } } else { @@ -225,18 +227,30 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp STqExecHandle* pExec = &pHandle->execHandle; ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN); + SArray* pBlocks = taosArrayInit(0, sizeof(SSDataBlock)); + SArray* pSchemas = taosArrayInit(0, sizeof(void*)); + if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { STqReader* pReader = pExec->pExecReader; tqReaderSetDataMsg(pReader, pReq, 0); while (tqNextDataBlock(pReader)) { - SSDataBlock block = {0}; - if (tqRetrieveDataBlock(&block, pReader) < 0) { + /*SSDataBlock block = {0};*/ + /*if (tqRetrieveDataBlock(&block, pReader) < 0) {*/ + /*if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;*/ + /*}*/ + + taosArrayClear(pBlocks); + taosArrayClear(pSchemas); + if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas) < 0) { if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; } if (pRsp->withTbName) { int64_t uid = pExec->pExecReader->msgIter.uid; - if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp) < 0) { - blockDataFreeRes(&block); + if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlocks)) < 0) { + taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes); + taosArrayDestroyP(pSchemas, (FDelete)tDeleteSSchemaWrapper); + pBlocks = taosArrayInit(0, sizeof(SSDataBlock)); + pSchemas = taosArrayInit(0, sizeof(void*)); continue; } } @@ -255,25 +269,37 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp pRsp->createTableNum++; } } - tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock), - pTq->pVnode->config.tsdbCfg.precision); - blockDataFreeRes(&block); - tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp); - pRsp->blockNum++; + for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) { + SSDataBlock* pBlock = taosArrayGet(pBlocks, i); + tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock), + pTq->pVnode->config.tsdbCfg.precision); + blockDataFreeRes(pBlock); + SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i); + taosArrayPush(pRsp->blockSchema, &pSW); + pRsp->blockNum++; + } } } else if (pExec->subType == TOPIC_SUB_TYPE__DB) { STqReader* pReader = pExec->pExecReader; tqReaderSetDataMsg(pReader, pReq, 0); while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) { - SSDataBlock block = {0}; - if (tqRetrieveDataBlock(&block, pReader) < 0) { + /*SSDataBlock block = {0};*/ + /*if (tqRetrieveDataBlock(&block, pReader) < 0) {*/ + /*if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;*/ + /*}*/ + taosArrayClear(pBlocks); + taosArrayClear(pSchemas); + if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas) < 0) { if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; } if (pRsp->withTbName) { int64_t uid = pExec->pExecReader->msgIter.uid; - if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp) < 0) { - blockDataFreeRes(&block); - continue; + if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlocks)) < 0) { + taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes); + taosArrayDestroyP(pSchemas, (FDelete)tDeleteSSchemaWrapper); + pBlocks = taosArrayInit(0, sizeof(SSDataBlock)); + pSchemas = taosArrayInit(0, sizeof(void*)); + return -1; } } if (pHandle->fetchMeta) { @@ -291,14 +317,26 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp pRsp->createTableNum++; } } - tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock), - pTq->pVnode->config.tsdbCfg.precision); - blockDataFreeRes(&block); - tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp); - pRsp->blockNum++; + /*tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock),*/ + /*pTq->pVnode->config.tsdbCfg.precision);*/ + /*blockDataFreeRes(&block);*/ + /*tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp);*/ + /*pRsp->blockNum++;*/ + for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) { + SSDataBlock* pBlock = taosArrayGet(pBlocks, i); + tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock), + pTq->pVnode->config.tsdbCfg.precision); + blockDataFreeRes(pBlock); + SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i); + taosArrayPush(pRsp->blockSchema, &pSW); + pRsp->blockNum++; + } } } + taosArrayDestroy(pBlocks); + taosArrayDestroy(pSchemas); + if (pRsp->blockNum == 0) { return -1; } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 5c5ba1205e..afb7ac39de 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -556,7 +556,7 @@ FAIL: return -1; } -int32_t tqSplitRetrieveDataBlock(STqReader* pReader, SArray* blocks, SArray* schemas) { +int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas) { int32_t sversion = htonl(pReader->pBlock->sversion); if (pReader->cachedSchemaSuid == 0 || pReader->cachedSchemaVer != sversion || @@ -592,9 +592,10 @@ int32_t tqSplitRetrieveDataBlock(STqReader* pReader, SArray* blocks, SArray* sch int32_t colAtMost = pSchemaWrapper->nCols; int32_t curRow = 0; + int32_t lastRow = 0; char* assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols); - if (assigned) return -1; + if (assigned == NULL) return -1; tInitSubmitBlkIter(&pReader->msgIter, pReader->pBlock, &pReader->blkIter); STSRowIter iter = {0}; @@ -605,11 +606,13 @@ int32_t tqSplitRetrieveDataBlock(STqReader* pReader, SArray* blocks, SArray* sch bool buildNew = false; tdSTSRowIterReset(&iter, row); + tqDebug("vgId:%d, row of block %d", pReader->pWalReader->pWal->cfg.vgId, curRow); for (int32_t i = 0; i < colAtMost; i++) { SCellVal sVal = {0}; if (!tdSTSRowIterFetch(&iter, pSchemaWrapper->pSchema[i].colId, pSchemaWrapper->pSchema[i].type, &sVal)) { break; } + tqDebug("vgId:%d, %d col, type %d", pReader->pWalReader->pWal->cfg.vgId, i, sVal.valType); if (curRow == 0) { assigned[i] = sVal.valType != TD_VTYPE_NONE; buildNew = true; @@ -623,27 +626,42 @@ int32_t tqSplitRetrieveDataBlock(STqReader* pReader, SArray* blocks, SArray* sch } if (buildNew) { - SSDataBlock block; - SSchemaWrapper sw; - if (tqMaskBlock(&sw, &block, pSchemaWrapper, assigned) < 0) { + if (taosArrayGetSize(blocks) > 0) { + SSDataBlock* pLastBlock = taosArrayGetLast(blocks); + pLastBlock->info.rows = curRow - lastRow; + lastRow = curRow; + } + SSDataBlock* pBlock = createDataBlock(); + SSchemaWrapper* pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); + if (tqMaskBlock(pSW, pBlock, pSchemaWrapper, assigned) < 0) { + blockDataDestroy(pBlock); goto FAIL; } + SSDataBlock block = {0}; + assignOneDataBlock(&block, pBlock); + blockDataDestroy(pBlock); + + tqDebug("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId, + (int32_t)taosArrayGetSize(block.pDataBlock)); taosArrayPush(blocks, &block); - taosArrayPush(schemas, &sw); + taosArrayPush(schemas, &pSW); } SSDataBlock* pBlock = taosArrayGetLast(blocks); pBlock->info.uid = pReader->msgIter.uid; - pBlock->info.rows = pReader->msgIter.numOfRows; + pBlock->info.rows = 0; pBlock->info.version = pReader->pMsg->version; + tqDebug("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId, + (int32_t)taosArrayGetSize(blocks)); + if (blockDataEnsureCapacity(pBlock, pReader->msgIter.numOfRows - curRow) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto FAIL; } - tdSTSRowIterInit(&iter, pTschema); + tdSTSRowIterReset(&iter, row); for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) { SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i); SCellVal sVal = {0}; @@ -654,12 +672,16 @@ int32_t tqSplitRetrieveDataBlock(STqReader* pReader, SArray* blocks, SArray* sch ASSERT(sVal.valType != TD_VTYPE_NONE); - if (colDataAppend(pColData, curRow, sVal.val, sVal.valType != TD_VTYPE_NORM) < 0) { + if (colDataAppend(pColData, curRow, sVal.val, sVal.valType == TD_VTYPE_NULL) < 0) { goto FAIL; } + tqDebug("vgId:%d, row %d col %d append %d", pReader->pWalReader->pWal->cfg.vgId, curRow, i, + sVal.valType == TD_VTYPE_NULL); } curRow++; } + SSDataBlock* pLastBlock = taosArrayGetLast(blocks); + pLastBlock->info.rows = curRow - lastRow; taosMemoryFree(assigned); return 0; diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index b2624d1bc1..27bfea0534 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -349,7 +349,6 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d .contLen = len + sizeof(SMsgHead), }; if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { - rpcFreeCont(serializedDeleteReq); tqDebug("failed to put delete req into write-queue since %s", terrstr()); } } else { @@ -476,12 +475,12 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen; - SSubmitReq* ret = rpcMallocCont(cap); - ret->header.vgId = pVnode->config.vgId; - ret->length = sizeof(SSubmitReq); - ret->numOfBlocks = htonl(1); + SSubmitReq* pSubmit = rpcMallocCont(cap); + pSubmit->header.vgId = pVnode->config.vgId; + pSubmit->length = sizeof(SSubmitReq); + pSubmit->numOfBlocks = htonl(1); - SSubmitBlk* blkHead = POINTER_SHIFT(ret, sizeof(SSubmitReq)); + SSubmitBlk* blkHead = POINTER_SHIFT(pSubmit, sizeof(SSubmitReq)); blkHead->numOfRows = htonl(pDataBlock->info.rows); blkHead->sversion = htonl(pTSchema->version); @@ -531,17 +530,16 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d } blkHead->dataLen = htonl(dataLen); - ret->length += sizeof(SSubmitBlk) + schemaLen + dataLen; - ret->length = htonl(ret->length); + pSubmit->length += sizeof(SSubmitBlk) + schemaLen + dataLen; + pSubmit->length = htonl(pSubmit->length); SRpcMsg msg = { .msgType = TDMT_VND_SUBMIT, - .pCont = ret, - .contLen = ntohl(ret->length), + .pCont = pSubmit, + .contLen = ntohl(pSubmit->length), }; if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { - rpcFreeCont(ret); tqDebug("failed to put into write-queue since %s", terrstr()); } }