From 30671eaadf73624e3aaa50c8850c8f9aa834771e Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Sun, 22 May 2022 20:35:57 +0800 Subject: [PATCH 1/2] fix(tmq): skip dropped table for db subscription --- include/util/taoserror.h | 1 + source/dnode/vnode/src/tq/tq.c | 1 + source/dnode/vnode/src/tq/tqRead.c | 4 +++- 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 1d7287ed0e..28ed637977 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -419,6 +419,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TQ_META_KEY_NOT_IN_TXN TAOS_DEF_ERROR_CODE(0, 0x0A09) #define TSDB_CODE_TQ_META_KEY_DUP_IN_TXN TAOS_DEF_ERROR_CODE(0, 0x0A0A) #define TSDB_CODE_TQ_GROUP_NOT_SET TAOS_DEF_ERROR_CODE(0, 0x0A0B) +#define TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0A0B) // wal #define TSDB_CODE_WAL_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x1000) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 0671044bad..25fa716d4e 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -600,6 +600,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { SSDataBlock block = {0}; if (tqRetrieveDataBlock(&block.pDataBlock, pReader, &block.info.groupId, &block.info.uid, &block.info.rows, &block.info.numOfCols) < 0) { + if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; ASSERT(0); } int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(&block); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 918660a9ec..2559631e05 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -91,7 +91,9 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p if (pHandle->sver != sversion || pHandle->cachedSchemaUid != pHandle->msgIter.suid) { pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion); if (pHandle->pSchema == NULL) { - tqError("cannot found schema for table: %ld, version %d", pHandle->msgIter.suid, pHandle->sver); + tqWarn("cannot found schema for table: %ld, version %d, possibly dropped table", pHandle->msgIter.suid, + pHandle->sver); + terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; return -1; } From 452b154eb931ed1e9ac78984a2f634bd318ae4f8 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Sun, 22 May 2022 21:20:36 +0800 Subject: [PATCH 2/2] enh(tmq): add error log --- source/dnode/vnode/src/tq/tqRead.c | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 2559631e05..be8d786de2 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -91,14 +91,22 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p if (pHandle->sver != sversion || pHandle->cachedSchemaUid != pHandle->msgIter.suid) { pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion); if (pHandle->pSchema == NULL) { - tqWarn("cannot found schema for table: %ld, version %d, possibly dropped table", pHandle->msgIter.suid, - pHandle->sver); + tqWarn("cannot found tsschema for table: uid: %ld (suid: %ld), version %d, possibly dropped table", + pHandle->msgIter.uid, pHandle->msgIter.suid, pHandle->sver); + /*ASSERT(0);*/ terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; return -1; } // this interface use suid instead of uid pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, pHandle->msgIter.suid, sversion, true); + if (pHandle->pSchemaWrapper == NULL) { + tqWarn("cannot found schema wrapper for table: suid: %ld, version %d, possibly dropped table", + pHandle->msgIter.suid, pHandle->sver); + /*ASSERT(0);*/ + terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; + return -1; + } pHandle->sver = sversion; pHandle->cachedSchemaUid = pHandle->msgIter.suid; }