enh(taosx): split block when none column not match

This commit is contained in:
Liu Jicong 2022-11-21 15:32:27 +08:00
parent 9a603e5663
commit 5d471a0ceb
4 changed files with 68 additions and 36 deletions

View File

@ -239,6 +239,7 @@ int32_t tqReaderSetDataMsg(STqReader *pReader, const SSubmitReq *pMsg, int64_t v
bool tqNextDataBlock(STqReader *pReader); bool tqNextDataBlock(STqReader *pReader);
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader); int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader);
int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas);
int32_t vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg);

View File

@ -44,7 +44,7 @@ static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, SMqDataRsp* pRs
return 0; return 0;
} }
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) { static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, int32_t n) {
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderInit(&mr, pTq->pVnode->pMeta, 0); metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
// TODO add reference to gurantee success // TODO add reference to gurantee success
@ -52,8 +52,10 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) {
metaReaderClear(&mr); metaReaderClear(&mr);
return -1; return -1;
} }
char* tbName = strdup(mr.me.name); for (int32_t i = 0; i < n; i++) {
taosArrayPush(pRsp->blockTbName, &tbName); char* tbName = strdup(mr.me.name);
taosArrayPush(pRsp->blockTbName, &tbName);
}
metaReaderClear(&mr); metaReaderClear(&mr);
return 0; return 0;
} }
@ -111,7 +113,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
if (pRsp->withTbName) { if (pRsp->withTbName) {
if (pRsp->rspOffset.type == TMQ_OFFSET__LOG) { if (pRsp->rspOffset.type == TMQ_OFFSET__LOG) {
int64_t uid = pExec->pExecReader->msgIter.uid; int64_t uid = pExec->pExecReader->msgIter.uid;
tqAddTbNameToRsp(pTq, uid, pRsp); tqAddTbNameToRsp(pTq, uid, pRsp, 1);
} else { } else {
pRsp->withTbName = false; pRsp->withTbName = false;
} }
@ -155,7 +157,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
int64_t uid = 0; int64_t uid = 0;
if (pOffset->type == TMQ_OFFSET__LOG) { if (pOffset->type == TMQ_OFFSET__LOG) {
uid = pExec->pExecReader->msgIter.uid; uid = pExec->pExecReader->msgIter.uid;
if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp) < 0) { if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp, 1) < 0) {
continue; continue;
} }
} else { } else {
@ -225,18 +227,27 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp
STqExecHandle* pExec = &pHandle->execHandle; STqExecHandle* pExec = &pHandle->execHandle;
ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN); ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN);
SArray* pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
SArray* pSchemas = taosArrayInit(0, sizeof(SSchemaWrapper));
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
STqReader* pReader = pExec->pExecReader; STqReader* pReader = pExec->pExecReader;
tqReaderSetDataMsg(pReader, pReq, 0); tqReaderSetDataMsg(pReader, pReq, 0);
while (tqNextDataBlock(pReader)) { while (tqNextDataBlock(pReader)) {
SSDataBlock block = {0}; /*SSDataBlock block = {0};*/
if (tqRetrieveDataBlock(&block, pReader) < 0) { /*if (tqRetrieveDataBlock(&block, pReader) < 0) {*/
/*if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;*/
/*}*/
taosArrayClear(pBlocks);
taosArrayClear(pSchemas);
if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
} }
if (pRsp->withTbName) { if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader->msgIter.uid; int64_t uid = pExec->pExecReader->msgIter.uid;
if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp) < 0) { if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlocks)) < 0) {
blockDataFreeRes(&block); taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
continue; continue;
} }
} }
@ -255,24 +266,34 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp
pRsp->createTableNum++; pRsp->createTableNum++;
} }
} }
tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock), for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
pTq->pVnode->config.tsdbCfg.precision); SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
blockDataFreeRes(&block); tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock),
tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp); pTq->pVnode->config.tsdbCfg.precision);
pRsp->blockNum++; blockDataFreeRes(pBlock);
SSchemaWrapper* pSW = taosArrayGet(pSchemas, i);
taosArrayPush(pRsp->blockSchema, &pSW);
pRsp->blockNum++;
}
} }
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) { } else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
STqReader* pReader = pExec->pExecReader; STqReader* pReader = pExec->pExecReader;
tqReaderSetDataMsg(pReader, pReq, 0); tqReaderSetDataMsg(pReader, pReq, 0);
while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) { while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) {
SSDataBlock block = {0}; /*SSDataBlock block = {0};*/
if (tqRetrieveDataBlock(&block, pReader) < 0) { /*if (tqRetrieveDataBlock(&block, pReader) < 0) {*/
/*if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;*/
/*}*/
taosArrayClear(pBlocks);
taosArrayClear(pSchemas);
if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
} }
if (pRsp->withTbName) { if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader->msgIter.uid; int64_t uid = pExec->pExecReader->msgIter.uid;
if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp) < 0) { if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlocks)) < 0) {
blockDataFreeRes(&block); taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
/*blockDataFreeRes(&block);*/
continue; continue;
} }
} }
@ -291,14 +312,26 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp
pRsp->createTableNum++; pRsp->createTableNum++;
} }
} }
tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock), /*tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock),*/
pTq->pVnode->config.tsdbCfg.precision); /*pTq->pVnode->config.tsdbCfg.precision);*/
blockDataFreeRes(&block); /*blockDataFreeRes(&block);*/
tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp); /*tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp);*/
pRsp->blockNum++; /*pRsp->blockNum++;*/
for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock),
pTq->pVnode->config.tsdbCfg.precision);
blockDataFreeRes(pBlock);
SSchemaWrapper* pSW = taosArrayGet(pSchemas, i);
taosArrayPush(pRsp->blockSchema, &pSW);
pRsp->blockNum++;
}
} }
} }
taosArrayDestroy(pBlocks);
taosArrayDestroy(pSchemas);
if (pRsp->blockNum == 0) { if (pRsp->blockNum == 0) {
return -1; return -1;
} }

View File

@ -556,7 +556,7 @@ FAIL:
return -1; return -1;
} }
int32_t tqSplitRetrieveDataBlock(STqReader* pReader, SArray* blocks, SArray* schemas) { int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas) {
int32_t sversion = htonl(pReader->pBlock->sversion); int32_t sversion = htonl(pReader->pBlock->sversion);
if (pReader->cachedSchemaSuid == 0 || pReader->cachedSchemaVer != sversion || if (pReader->cachedSchemaSuid == 0 || pReader->cachedSchemaVer != sversion ||
@ -594,7 +594,7 @@ int32_t tqSplitRetrieveDataBlock(STqReader* pReader, SArray* blocks, SArray* sch
int32_t curRow = 0; int32_t curRow = 0;
char* assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols); char* assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
if (assigned) return -1; if (assigned == NULL) return -1;
tInitSubmitBlkIter(&pReader->msgIter, pReader->pBlock, &pReader->blkIter); tInitSubmitBlkIter(&pReader->msgIter, pReader->pBlock, &pReader->blkIter);
STSRowIter iter = {0}; STSRowIter iter = {0};

View File

@ -349,7 +349,6 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
.contLen = len + sizeof(SMsgHead), .contLen = len + sizeof(SMsgHead),
}; };
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
rpcFreeCont(serializedDeleteReq);
tqDebug("failed to put delete req into write-queue since %s", terrstr()); tqDebug("failed to put delete req into write-queue since %s", terrstr());
} }
} else { } else {
@ -476,12 +475,12 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen; cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen;
SSubmitReq* ret = rpcMallocCont(cap); SSubmitReq* pSubmit = rpcMallocCont(cap);
ret->header.vgId = pVnode->config.vgId; pSubmit->header.vgId = pVnode->config.vgId;
ret->length = sizeof(SSubmitReq); pSubmit->length = sizeof(SSubmitReq);
ret->numOfBlocks = htonl(1); pSubmit->numOfBlocks = htonl(1);
SSubmitBlk* blkHead = POINTER_SHIFT(ret, sizeof(SSubmitReq)); SSubmitBlk* blkHead = POINTER_SHIFT(pSubmit, sizeof(SSubmitReq));
blkHead->numOfRows = htonl(pDataBlock->info.rows); blkHead->numOfRows = htonl(pDataBlock->info.rows);
blkHead->sversion = htonl(pTSchema->version); blkHead->sversion = htonl(pTSchema->version);
@ -531,17 +530,16 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
} }
blkHead->dataLen = htonl(dataLen); blkHead->dataLen = htonl(dataLen);
ret->length += sizeof(SSubmitBlk) + schemaLen + dataLen; pSubmit->length += sizeof(SSubmitBlk) + schemaLen + dataLen;
ret->length = htonl(ret->length); pSubmit->length = htonl(pSubmit->length);
SRpcMsg msg = { SRpcMsg msg = {
.msgType = TDMT_VND_SUBMIT, .msgType = TDMT_VND_SUBMIT,
.pCont = ret, .pCont = pSubmit,
.contLen = ntohl(ret->length), .contLen = ntohl(pSubmit->length),
}; };
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
rpcFreeCont(ret);
tqDebug("failed to put into write-queue since %s", terrstr()); tqDebug("failed to put into write-queue since %s", terrstr());
} }
} }