From 94e7c79ca0fd10c962a6e182256f7e0809771902 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 1 Nov 2022 18:18:26 +0800 Subject: [PATCH] fix(tmq): handle adding subtable for subscribing stb --- source/dnode/vnode/src/tq/tq.c | 2 +- source/dnode/vnode/src/tq/tqRead.c | 36 +++++++++++++++- source/libs/executor/src/scanoperator.c | 17 ++++---- source/libs/wal/src/walRead.c | 57 +++++++++++++++++++++---- 4 files changed, 93 insertions(+), 19 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 47a40ea495..094db9ebd0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1268,7 +1268,7 @@ int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* pReq, int64_t ver) { if (pIter == NULL) break; SStreamTask* pTask = *(SStreamTask**)pIter; if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue; - if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__RECOVER1) { + if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE) { tqDebug("skip push task %d, task status %d", pTask->taskId, pTask->taskStatus); continue; } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 2f6ec0c39f..392c724888 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -199,6 +199,9 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea goto END; } + tqDebug("vgId:%d, taosx get msg ver %" PRId64 ", type: %s", pTq->pVnode->config.vgId, offset, + TMSG_INFO((*ppCkHead)->head.msgType)); + if ((*ppCkHead)->head.msgType == TDMT_VND_SUBMIT) { code = walFetchBody(pHandle->pWalReader, ppCkHead); @@ -586,8 +589,39 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { taosHashPut(pExec->execHandle.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0); } } + } else if (pExec->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { + if (isAdd) { + SArray* qa = taosArrayInit(4, sizeof(tb_uid_t)); + SMetaReader mr = {0}; + metaReaderInit(&mr, pTq->pVnode->pMeta, 0); + for (int32_t i = 0; i < taosArrayGetSize(tbUidList); ++i) { + uint64_t* id = (uint64_t*)taosArrayGet(tbUidList, i); + + int32_t code = metaGetTableEntryByUid(&mr, *id); + if (code != TSDB_CODE_SUCCESS) { + qError("failed to get table meta, uid:%" PRIu64 " code:%s", *id, tstrerror(terrno)); + continue; + } + + tDecoderClear(&mr.coder); + + if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != pExec->execHandle.execTb.suid) { + tqDebug("table uid %" PRId64 " does not add to tq handle", *id); + continue; + } + tqDebug("table uid %" PRId64 " add to tq handle", *id); + taosArrayPush(qa, id); + } + metaReaderClear(&mr); + if (taosArrayGetSize(qa) > 0) { + tqReaderAddTbUidList(pExec->execHandle.pExecReader, qa); + } + taosArrayDestroy(qa); + } else { + // TODO handle delete table from stb + } } else { - // tq update id + ASSERT(0); } } while (1) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 5229b46815..e54b889a7a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -344,7 +344,8 @@ static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, return true; } -static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, int32_t rows) { +static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, + int32_t rows) { if (pTableScanInfo->pseudoSup.numOfExprs > 0) { SExprSupp* pSup = &pTableScanInfo->pseudoSup; @@ -1878,7 +1879,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { } #endif -#if 1 if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 || pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) { STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; @@ -1914,7 +1914,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { return NULL; } -#endif size_t total = taosArrayGetSize(pInfo->pBlockLists); // TODO: refactor @@ -2296,7 +2295,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT pInfo->vnode = pHandle->vnode; pInfo->sContext = pHandle->sContext; - pOperator->name = "RawStreamScanOperator"; + pOperator->name = "RawScanOperator"; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; @@ -4384,8 +4383,9 @@ static int32_t loadDataBlockFromOneTable2(SOperatorInfo* pOperator, STableMergeS // currently only the tbname pseudo column if (pTableScanInfo->pseudoSup.numOfExprs > 0) { - int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo, - pTableScanInfo->pseudoSup.numOfExprs, pBlock, pBlock->info.rows, GET_TASKID(pTaskInfo)); + int32_t code = + addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo, + pTableScanInfo->pseudoSup.numOfExprs, pBlock, pBlock->info.rows, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } @@ -4501,8 +4501,9 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc // currently only the tbname pseudo column if (pTableScanInfo->pseudoSup.numOfExprs > 0) { - int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo, - pTableScanInfo->pseudoSup.numOfExprs, pBlock, pBlock->info.rows, GET_TASKID(pTaskInfo)); + int32_t code = + addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo, + pTableScanInfo->pseudoSup.numOfExprs, pBlock, pBlock->info.rows, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 8f493ddd85..0cc9dad6b6 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -347,22 +347,46 @@ static int32_t walSkipFetchBodyNew(SWalReader *pRead) { int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { int64_t code; + int64_t contLen; + bool seeked = false; + + wDebug("vgId:%d try to fetch ver %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64, + pRead->pWal->cfg.vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, + pRead->pWal->vers.lastVer); // TODO: valid ver - if (ver > pRead->pWal->vers.commitVer) { + if (ver > pRead->pWal->vers.appliedVer) { return -1; } if (pRead->curInvalid || pRead->curVersion != ver) { code = walReadSeekVer(pRead, ver); - if (code < 0) return -1; + if (code < 0) { + pRead->curVersion = ver; + pRead->curInvalid = 1; + return -1; + } + seeked = true; } - ASSERT(taosValidFile(pRead->pLogFile) == true); - - code = taosReadFile(pRead->pLogFile, pHead, sizeof(SWalCkHead)); - if (code != sizeof(SWalCkHead)) { - return -1; + while (1) { + contLen = taosReadFile(pRead->pLogFile, pHead, sizeof(SWalCkHead)); + if (contLen == sizeof(SWalCkHead)) { + break; + } else if (contLen == 0 && !seeked) { + walReadSeekVerImpl(pRead, ver); + seeked = true; + continue; + } else { + if (contLen < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + } else { + terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + } + ASSERT(0); + pRead->curInvalid = 1; + return -1; + } } code = walValidHeadCksum(pHead); @@ -373,13 +397,15 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { return -1; } + pRead->curInvalid = 0; return 0; } int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) { int64_t code; - // ASSERT(pRead->curVersion == pHead->head.version); + ASSERT(pRead->curVersion == pHead->head.version); + ASSERT(pRead->curInvalid == 0); code = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR); if (code < 0) { @@ -409,19 +435,32 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) { } if (pReadHead->bodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen)) { + if (pReadHead->bodyLen < 0) { + ASSERT(0); + terrno = TAOS_SYSTEM_ERROR(errno); + wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s", + pRead->pWal->cfg.vgId, pReadHead->version, ver, tstrerror(terrno)); + } else { + wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since file corrupted", + pRead->pWal->cfg.vgId, pReadHead->version, ver); + terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + } + pRead->curInvalid = 1; ASSERT(0); return -1; } if (pReadHead->version != ver) { + ASSERT(0); wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId, - pRead->pHead->head.version, ver); + pReadHead->version, ver); pRead->curInvalid = 1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } if (walValidBodyCksum(*ppHead) != 0) { + ASSERT(0); wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver); pRead->curInvalid = 1;