refactor(tmq): disable parallel

This commit is contained in:
Liu Jicong 2022-07-21 20:15:52 +08:00
parent b794b91023
commit eaf11ca1ac
8 changed files with 59 additions and 67 deletions

View File

@ -68,7 +68,7 @@ typedef struct {
typedef struct { typedef struct {
char* qmsg; char* qmsg;
qTaskInfo_t task[5]; qTaskInfo_t task;
} STqExecCol; } STqExecCol;
typedef struct { typedef struct {
@ -82,7 +82,7 @@ typedef struct {
typedef struct { typedef struct {
int8_t subType; int8_t subType;
STqReader* pExecReader[5]; STqReader* pExecReader;
union { union {
STqExecCol execCol; STqExecCol execCol;
STqExecTb execTb; STqExecTb execTb;
@ -138,8 +138,7 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum); int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum);
// tqExec // tqExec
int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp, int32_t workerId); int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp);
int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal offset, int32_t workerId);
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp); int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp);
// tqMeta // tqMeta

View File

@ -146,7 +146,7 @@ int32_t tqCheckColModifiable(STQ* pTq, int32_t colId);
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data); int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data);

View File

@ -262,7 +262,7 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su
static int32_t tqInitMetaRsp(SMqMetaRsp* pRsp, const SMqPollReq* pReq) { return 0; } static int32_t tqInitMetaRsp(SMqMetaRsp* pRsp, const SMqPollReq* pReq) { return 0; }
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
SMqPollReq* pReq = pMsg->pCont; SMqPollReq* pReq = pMsg->pCont;
int64_t consumerId = pReq->consumerId; int64_t consumerId = pReq->consumerId;
int64_t timeout = pReq->timeout; int64_t timeout = pReq->timeout;
@ -271,9 +271,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
STqOffsetVal reqOffset = pReq->reqOffset; STqOffsetVal reqOffset = pReq->reqOffset;
STqOffsetVal fetchOffsetNew; STqOffsetVal fetchOffsetNew;
// todo
workerId = 0;
// 1.find handle // 1.find handle
STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey)); STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey));
/*ASSERT(pHandle);*/ /*ASSERT(pHandle);*/
@ -405,7 +402,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
if (pHead->msgType == TDMT_VND_SUBMIT) { if (pHead->msgType == TDMT_VND_SUBMIT) {
SSubmitReq* pCont = (SSubmitReq*)&pHead->body; SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
if (tqLogScanExec(pTq, &pHandle->execHandle, pCont, &dataRsp, workerId) < 0) { if (tqLogScanExec(pTq, &pHandle->execHandle, pCont, &dataRsp) < 0) {
/*ASSERT(0);*/ /*ASSERT(0);*/
} }
// TODO batch optimization: // TODO batch optimization:
@ -518,27 +515,23 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pHandle->execHandle.execCol.qmsg = req.qmsg; pHandle->execHandle.execCol.qmsg = req.qmsg;
pHandle->snapshotVer = ver; pHandle->snapshotVer = ver;
req.qmsg = NULL; req.qmsg = NULL;
for (int32_t i = 0; i < 5; i++) { SReadHandle handle = {
SReadHandle handle = { .meta = pTq->pVnode->pMeta,
.meta = pTq->pVnode->pMeta, .vnode = pTq->pVnode,
.vnode = pTq->pVnode, .initTableReader = true,
.initTableReader = true, .initTqReader = true,
.initTqReader = true, .version = ver,
.version = ver, };
}; pHandle->execHandle.execCol.task =
pHandle->execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols);
qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols); ASSERT(pHandle->execHandle.execCol.task);
ASSERT(pHandle->execHandle.execCol.task[i]); void* scanner = NULL;
void* scanner = NULL; qExtractStreamScanner(pHandle->execHandle.execCol.task, &scanner);
qExtractStreamScanner(pHandle->execHandle.execCol.task[i], &scanner); ASSERT(scanner);
ASSERT(scanner); pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
pHandle->execHandle.pExecReader[i] = qExtractReaderFromStreamScanner(scanner); ASSERT(pHandle->execHandle.pExecReader);
ASSERT(pHandle->execHandle.pExecReader[i]);
}
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) { } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
for (int32_t i = 0; i < 5; i++) { pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
pHandle->execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);
}
pHandle->execHandle.execDb.pFilterOutTbUid = pHandle->execHandle.execDb.pFilterOutTbUid =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
@ -550,10 +543,8 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i); int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
tqDebug("vgId:%d, idx %d, uid:%" PRId64, TD_VID(pTq->pVnode), i, tbUid); tqDebug("vgId:%d, idx %d, uid:%" PRId64, TD_VID(pTq->pVnode), i, tbUid);
} }
for (int32_t i = 0; i < 5; i++) { pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
pHandle->execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode); tqReaderSetTbUidList(pHandle->execHandle.pExecReader, tbUidList);
tqReaderSetTbUidList(pHandle->execHandle.pExecReader[i], tbUidList);
}
taosArrayDestroy(tbUidList); taosArrayDestroy(tbUidList);
} }
taosHashPut(pTq->handles, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); taosHashPut(pTq->handles, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));

View File

@ -37,8 +37,8 @@ static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp,
return 0; return 0;
} }
static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, int32_t workerId, SMqDataRsp* pRsp) { static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, SMqDataRsp* pRsp) {
SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader[workerId]->pSchemaWrapper); SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader->pSchemaWrapper);
if (pSW == NULL) { if (pSW == NULL) {
return -1; return -1;
} }
@ -61,7 +61,7 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) {
int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) { int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
const STqExecHandle* pExec = &pHandle->execHandle; const STqExecHandle* pExec = &pHandle->execHandle;
qTaskInfo_t task = pExec->execCol.task[0]; qTaskInfo_t task = pExec->execCol.task;
if (qStreamPrepareScan(task, pOffset) < 0) { if (qStreamPrepareScan(task, pOffset) < 0) {
if (pOffset->type == TMQ_OFFSET__LOG) { if (pOffset->type == TMQ_OFFSET__LOG) {
@ -89,7 +89,7 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa
if (pDataBlock != NULL) { if (pDataBlock != NULL) {
if (pRsp->withTbName) { if (pRsp->withTbName) {
if (pOffset->type == TMQ_OFFSET__LOG) { if (pOffset->type == TMQ_OFFSET__LOG) {
int64_t uid = pExec->pExecReader[0]->msgIter.uid; int64_t uid = pExec->pExecReader->msgIter.uid;
if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) { if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
continue; continue;
} }
@ -184,12 +184,12 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S
} }
#endif #endif
int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp, int32_t workerId) { int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp) {
ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN); ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN);
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
pRsp->withSchema = 1; pRsp->withSchema = 1;
STqReader* pReader = pExec->pExecReader[workerId]; STqReader* pReader = pExec->pExecReader;
tqReaderSetDataMsg(pReader, pReq, 0); tqReaderSetDataMsg(pReader, pReq, 0);
while (tqNextDataBlock(pReader)) { while (tqNextDataBlock(pReader)) {
SSDataBlock block = {0}; SSDataBlock block = {0};
@ -197,18 +197,18 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
} }
if (pRsp->withTbName) { if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader[workerId]->msgIter.uid; int64_t uid = pExec->pExecReader->msgIter.uid;
if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) { if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
continue; continue;
} }
} }
tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock)); tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock));
tqAddBlockSchemaToRsp(pExec, workerId, pRsp); tqAddBlockSchemaToRsp(pExec, pRsp);
pRsp->blockNum++; pRsp->blockNum++;
} }
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) { } else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
pRsp->withSchema = 1; pRsp->withSchema = 1;
STqReader* pReader = pExec->pExecReader[workerId]; STqReader* pReader = pExec->pExecReader;
tqReaderSetDataMsg(pReader, pReq, 0); tqReaderSetDataMsg(pReader, pReq, 0);
while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) { while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) {
SSDataBlock block = {0}; SSDataBlock block = {0};
@ -216,13 +216,13 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
} }
if (pRsp->withTbName) { if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader[workerId]->msgIter.uid; int64_t uid = pExec->pExecReader->msgIter.uid;
if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) { if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
continue; continue;
} }
} }
tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock)); tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock));
tqAddBlockSchemaToRsp(pExec, workerId, pRsp); tqAddBlockSchemaToRsp(pExec, pRsp);
pRsp->blockNum++; pRsp->blockNum++;
} }
} }

View File

@ -84,23 +84,22 @@ int32_t tqMetaOpen(STQ* pTq) {
/*handle.execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);*/ /*handle.execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);*/
/*}*/ /*}*/
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
for (int32_t i = 0; i < 5; i++) { SReadHandle reader = {
SReadHandle reader = { .meta = pTq->pVnode->pMeta,
.meta = pTq->pVnode->pMeta, .vnode = pTq->pVnode,
.vnode = pTq->pVnode, .initTableReader = true,
.initTableReader = true, .initTqReader = true,
.initTqReader = true, .version = handle.snapshotVer,
.version = handle.snapshotVer, };
};
handle.execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols); handle.execHandle.execCol.task =
ASSERT(handle.execHandle.execCol.task[i]); qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols);
void* scanner = NULL; ASSERT(handle.execHandle.execCol.task);
qExtractStreamScanner(handle.execHandle.execCol.task[i], &scanner); void* scanner = NULL;
ASSERT(scanner); qExtractStreamScanner(handle.execHandle.execCol.task, &scanner);
handle.execHandle.pExecReader[i] = qExtractReaderFromStreamScanner(scanner); ASSERT(scanner);
ASSERT(handle.execHandle.pExecReader[i]); handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
} ASSERT(handle.execHandle.pExecReader);
} else { } else {
handle.execHandle.execDb.pFilterOutTbUid = handle.execHandle.execDb.pFilterOutTbUid =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);

View File

@ -394,10 +394,8 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
if (pIter == NULL) break; if (pIter == NULL) break;
STqHandle* pExec = (STqHandle*)pIter; STqHandle* pExec = (STqHandle*)pIter;
if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
for (int32_t i = 0; i < 5; i++) { int32_t code = qUpdateQualifiedTableId(pExec->execHandle.execCol.task, tbUidList, isAdd);
int32_t code = qUpdateQualifiedTableId(pExec->execHandle.execCol.task[i], tbUidList, isAdd); ASSERT(code == 0);
ASSERT(code == 0);
}
} else if (pExec->execHandle.subType == TOPIC_SUB_TYPE__DB) { } else if (pExec->execHandle.subType == TOPIC_SUB_TYPE__DB) {
if (!isAdd) { if (!isAdd) {
int32_t sz = taosArrayGetSize(tbUidList); int32_t sz = taosArrayGetSize(tbUidList);

View File

@ -127,6 +127,8 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
int32_t rows = pDataBlock->info.rows; int32_t rows = pDataBlock->info.rows;
tqDebug("tq sink, convert block %d, rows: %d", i, rows);
int32_t dataLen = 0; int32_t dataLen = 0;
void* blkSchema = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk)); void* blkSchema = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk));
@ -178,11 +180,14 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
const SArray* pRes = (const SArray*)data; const SArray* pRes = (const SArray*)data;
SVnode* pVnode = (SVnode*)vnode; SVnode* pVnode = (SVnode*)vnode;
tqDebug("task write into table, vgId %d, block num: %d", pVnode->config.vgId, (int32_t)pRes->size); tqDebug("vgId:%d, task %d write into table, block num: %d", TD_VID(pVnode), pTask->taskId, (int32_t)pRes->size);
ASSERT(pTask->tbSink.pTSchema); ASSERT(pTask->tbSink.pTSchema);
SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid, SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid,
pTask->tbSink.stbFullName, pVnode->config.vgId); pTask->tbSink.stbFullName, pVnode->config.vgId);
tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId);
/*tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);*/ /*tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);*/
// build write msg // build write msg
SRpcMsg msg = { SRpcMsg msg = {

View File

@ -316,7 +316,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
case TDMT_VND_TABLE_CFG: case TDMT_VND_TABLE_CFG:
return vnodeGetTableCfg(pVnode, pMsg); return vnodeGetTableCfg(pVnode, pMsg);
case TDMT_VND_CONSUME: case TDMT_VND_CONSUME:
return tqProcessPollReq(pVnode->pTq, pMsg, pInfo->workerId); return tqProcessPollReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_RUN: case TDMT_STREAM_TASK_RUN:
return tqProcessTaskRunReq(pVnode->pTq, pMsg); return tqProcessTaskRunReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_DISPATCH: case TDMT_STREAM_TASK_DISPATCH: