Merge pull request #18308 from taosdata/feature/stream

enh(taosx): split block when the set of NONE columns does not match
This commit is contained in:
Shengliang Guan 2022-11-21 22:33:06 +08:00 committed by GitHub
commit e6f3ea1e3e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 103 additions and 45 deletions

View File

@ -239,6 +239,7 @@ int32_t tqReaderSetDataMsg(STqReader *pReader, const SSubmitReq *pMsg, int64_t v
bool tqNextDataBlock(STqReader *pReader);
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader);
int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas);
int32_t vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg);

View File

@ -671,7 +671,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
if (tqTaosxScanLog(pTq, pHandle, pCont, &taosxRsp) < 0) {
/*ASSERT(0);*/
}
if (taosxRsp.blockNum > 0 /* threshold */) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);

View File

@ -44,7 +44,7 @@ static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, SMqDataRsp* pRs
return 0;
}
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) {
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, int32_t n) {
SMetaReader mr = {0};
metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
// TODO: add reference to guarantee success
@ -52,8 +52,10 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) {
metaReaderClear(&mr);
return -1;
}
char* tbName = strdup(mr.me.name);
taosArrayPush(pRsp->blockTbName, &tbName);
for (int32_t i = 0; i < n; i++) {
char* tbName = strdup(mr.me.name);
taosArrayPush(pRsp->blockTbName, &tbName);
}
metaReaderClear(&mr);
return 0;
}
@ -111,7 +113,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
if (pRsp->withTbName) {
if (pRsp->rspOffset.type == TMQ_OFFSET__LOG) {
int64_t uid = pExec->pExecReader->msgIter.uid;
tqAddTbNameToRsp(pTq, uid, pRsp);
tqAddTbNameToRsp(pTq, uid, pRsp, 1);
} else {
pRsp->withTbName = false;
}
@ -155,7 +157,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
int64_t uid = 0;
if (pOffset->type == TMQ_OFFSET__LOG) {
uid = pExec->pExecReader->msgIter.uid;
if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp) < 0) {
if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp, 1) < 0) {
continue;
}
} else {
@ -225,18 +227,30 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp
STqExecHandle* pExec = &pHandle->execHandle;
ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN);
SArray* pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
SArray* pSchemas = taosArrayInit(0, sizeof(void*));
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
STqReader* pReader = pExec->pExecReader;
tqReaderSetDataMsg(pReader, pReq, 0);
while (tqNextDataBlock(pReader)) {
SSDataBlock block = {0};
if (tqRetrieveDataBlock(&block, pReader) < 0) {
/*SSDataBlock block = {0};*/
/*if (tqRetrieveDataBlock(&block, pReader) < 0) {*/
/*if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;*/
/*}*/
taosArrayClear(pBlocks);
taosArrayClear(pSchemas);
if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
}
if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader->msgIter.uid;
if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp) < 0) {
blockDataFreeRes(&block);
if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlocks)) < 0) {
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSSchemaWrapper);
pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
pSchemas = taosArrayInit(0, sizeof(void*));
continue;
}
}
@ -255,25 +269,37 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp
pRsp->createTableNum++;
}
}
tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock),
pTq->pVnode->config.tsdbCfg.precision);
blockDataFreeRes(&block);
tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp);
pRsp->blockNum++;
for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock),
pTq->pVnode->config.tsdbCfg.precision);
blockDataFreeRes(pBlock);
SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i);
taosArrayPush(pRsp->blockSchema, &pSW);
pRsp->blockNum++;
}
}
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
STqReader* pReader = pExec->pExecReader;
tqReaderSetDataMsg(pReader, pReq, 0);
while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) {
SSDataBlock block = {0};
if (tqRetrieveDataBlock(&block, pReader) < 0) {
/*SSDataBlock block = {0};*/
/*if (tqRetrieveDataBlock(&block, pReader) < 0) {*/
/*if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;*/
/*}*/
taosArrayClear(pBlocks);
taosArrayClear(pSchemas);
if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
}
if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader->msgIter.uid;
if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp) < 0) {
blockDataFreeRes(&block);
continue;
if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlocks)) < 0) {
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSSchemaWrapper);
pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
pSchemas = taosArrayInit(0, sizeof(void*));
return -1;
}
}
if (pHandle->fetchMeta) {
@ -291,14 +317,26 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp
pRsp->createTableNum++;
}
}
tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock),
pTq->pVnode->config.tsdbCfg.precision);
blockDataFreeRes(&block);
tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp);
pRsp->blockNum++;
/*tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock),*/
/*pTq->pVnode->config.tsdbCfg.precision);*/
/*blockDataFreeRes(&block);*/
/*tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp);*/
/*pRsp->blockNum++;*/
for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock),
pTq->pVnode->config.tsdbCfg.precision);
blockDataFreeRes(pBlock);
SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i);
taosArrayPush(pRsp->blockSchema, &pSW);
pRsp->blockNum++;
}
}
}
taosArrayDestroy(pBlocks);
taosArrayDestroy(pSchemas);
if (pRsp->blockNum == 0) {
return -1;
}

View File

@ -556,7 +556,7 @@ FAIL:
return -1;
}
int32_t tqSplitRetrieveDataBlock(STqReader* pReader, SArray* blocks, SArray* schemas) {
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas) {
int32_t sversion = htonl(pReader->pBlock->sversion);
if (pReader->cachedSchemaSuid == 0 || pReader->cachedSchemaVer != sversion ||
@ -592,9 +592,10 @@ int32_t tqSplitRetrieveDataBlock(STqReader* pReader, SArray* blocks, SArray* sch
int32_t colAtMost = pSchemaWrapper->nCols;
int32_t curRow = 0;
int32_t lastRow = 0;
char* assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
if (assigned) return -1;
if (assigned == NULL) return -1;
tInitSubmitBlkIter(&pReader->msgIter, pReader->pBlock, &pReader->blkIter);
STSRowIter iter = {0};
@ -605,11 +606,13 @@ int32_t tqSplitRetrieveDataBlock(STqReader* pReader, SArray* blocks, SArray* sch
bool buildNew = false;
tdSTSRowIterReset(&iter, row);
tqDebug("vgId:%d, row of block %d", pReader->pWalReader->pWal->cfg.vgId, curRow);
for (int32_t i = 0; i < colAtMost; i++) {
SCellVal sVal = {0};
if (!tdSTSRowIterFetch(&iter, pSchemaWrapper->pSchema[i].colId, pSchemaWrapper->pSchema[i].type, &sVal)) {
break;
}
tqDebug("vgId:%d, %d col, type %d", pReader->pWalReader->pWal->cfg.vgId, i, sVal.valType);
if (curRow == 0) {
assigned[i] = sVal.valType != TD_VTYPE_NONE;
buildNew = true;
@ -623,27 +626,42 @@ int32_t tqSplitRetrieveDataBlock(STqReader* pReader, SArray* blocks, SArray* sch
}
if (buildNew) {
SSDataBlock block;
SSchemaWrapper sw;
if (tqMaskBlock(&sw, &block, pSchemaWrapper, assigned) < 0) {
if (taosArrayGetSize(blocks) > 0) {
SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
pLastBlock->info.rows = curRow - lastRow;
lastRow = curRow;
}
SSDataBlock* pBlock = createDataBlock();
SSchemaWrapper* pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if (tqMaskBlock(pSW, pBlock, pSchemaWrapper, assigned) < 0) {
blockDataDestroy(pBlock);
goto FAIL;
}
SSDataBlock block = {0};
assignOneDataBlock(&block, pBlock);
blockDataDestroy(pBlock);
tqDebug("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
(int32_t)taosArrayGetSize(block.pDataBlock));
taosArrayPush(blocks, &block);
taosArrayPush(schemas, &sw);
taosArrayPush(schemas, &pSW);
}
SSDataBlock* pBlock = taosArrayGetLast(blocks);
pBlock->info.uid = pReader->msgIter.uid;
pBlock->info.rows = pReader->msgIter.numOfRows;
pBlock->info.rows = 0;
pBlock->info.version = pReader->pMsg->version;
tqDebug("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
(int32_t)taosArrayGetSize(blocks));
if (blockDataEnsureCapacity(pBlock, pReader->msgIter.numOfRows - curRow) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
tdSTSRowIterInit(&iter, pTschema);
tdSTSRowIterReset(&iter, row);
for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
SCellVal sVal = {0};
@ -654,12 +672,16 @@ int32_t tqSplitRetrieveDataBlock(STqReader* pReader, SArray* blocks, SArray* sch
ASSERT(sVal.valType != TD_VTYPE_NONE);
if (colDataAppend(pColData, curRow, sVal.val, sVal.valType != TD_VTYPE_NORM) < 0) {
if (colDataAppend(pColData, curRow, sVal.val, sVal.valType == TD_VTYPE_NULL) < 0) {
goto FAIL;
}
tqDebug("vgId:%d, row %d col %d append %d", pReader->pWalReader->pWal->cfg.vgId, curRow, i,
sVal.valType == TD_VTYPE_NULL);
}
curRow++;
}
SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
pLastBlock->info.rows = curRow - lastRow;
taosMemoryFree(assigned);
return 0;

View File

@ -349,7 +349,6 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
.contLen = len + sizeof(SMsgHead),
};
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
rpcFreeCont(serializedDeleteReq);
tqDebug("failed to put delete req into write-queue since %s", terrstr());
}
} else {
@ -476,12 +475,12 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen;
SSubmitReq* ret = rpcMallocCont(cap);
ret->header.vgId = pVnode->config.vgId;
ret->length = sizeof(SSubmitReq);
ret->numOfBlocks = htonl(1);
SSubmitReq* pSubmit = rpcMallocCont(cap);
pSubmit->header.vgId = pVnode->config.vgId;
pSubmit->length = sizeof(SSubmitReq);
pSubmit->numOfBlocks = htonl(1);
SSubmitBlk* blkHead = POINTER_SHIFT(ret, sizeof(SSubmitReq));
SSubmitBlk* blkHead = POINTER_SHIFT(pSubmit, sizeof(SSubmitReq));
blkHead->numOfRows = htonl(pDataBlock->info.rows);
blkHead->sversion = htonl(pTSchema->version);
@ -531,17 +530,16 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
}
blkHead->dataLen = htonl(dataLen);
ret->length += sizeof(SSubmitBlk) + schemaLen + dataLen;
ret->length = htonl(ret->length);
pSubmit->length += sizeof(SSubmitBlk) + schemaLen + dataLen;
pSubmit->length = htonl(pSubmit->length);
SRpcMsg msg = {
.msgType = TDMT_VND_SUBMIT,
.pCont = ret,
.contLen = ntohl(ret->length),
.pCont = pSubmit,
.contLen = ntohl(pSubmit->length),
};
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
rpcFreeCont(ret);
tqDebug("failed to put into write-queue since %s", terrstr());
}
}