diff --git a/include/common/tmsg.h b/include/common/tmsg.h index cd63f7d278..8689e10203 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2684,7 +2684,7 @@ typedef struct { char subKey[TSDB_SUBSCRIBE_KEY_LEN]; int8_t subType; int8_t withMeta; - char* qmsg; + char* qmsg; //SubPlanToString int64_t suid; } SMqRebVgReq; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index ac0c68f652..345418801b 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -542,7 +542,7 @@ void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer typedef struct { int32_t vgId; - char* qmsg; + char* qmsg; //SubPlanToString SEpSet epSet; } SMqVgEp; diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 104e0945ba..792fed2309 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -67,7 +67,7 @@ typedef struct { // tqExec typedef struct { - char* qmsg; + char* qmsg; // SubPlanToString } STqExecCol; typedef struct { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c465617975..d07933c9f1 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -575,7 +575,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { return -1; } -#if 1 // till now, all data has been rsp to consumer, new data needs to push client once arrived. if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && dataRsp.reqOffset.version == dataRsp.rspOffset.version) { @@ -597,7 +596,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { return 0; } } -#endif taosWUnLockLatch(&pTq->pushLock); if (tqSendDataRsp(pTq, pMsg, &req, &dataRsp) < 0) { @@ -613,10 +611,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } // for taosx - /*A(pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN);*/ - SMqMetaRsp metaRsp = {0}; - STaosxRsp taosxRsp = {0}; tqInitTaosxRsp(&taosxRsp, &req); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 5352ebe6d4..ee00cdc205 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -307,10 +307,10 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { while (1) { if (!fromProcessedMsg) { - SWalReader* pWalReader = pReader->pWalReader; - - if (walNextValidMsg(pWalReader) < 0) { - pReader->ver = pWalReader->curVersion - (pWalReader->curInvalid | pWalReader->curStopped); + if (walNextValidMsg(pReader->pWalReader) < 0) { + pReader->ver = + pReader->pWalReader->curVersion - pReader->pWalReader->curStopped; +// pReader->pWalReader->curVersion - (pReader->pWalReader->curInvalid | pReader->pWalReader->curStopped); ret->offset.type = TMQ_OFFSET__LOG; ret->offset.version = pReader->ver; ret->fetchType = FETCH_TYPE__NONE; @@ -318,25 +318,11 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { return -1; } - void* body = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg)); - int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg); - int64_t ver = pWalReader->pHead->head.version; + void* body = POINTER_SHIFT(pReader->pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg)); + int32_t bodyLen = pReader->pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg); + int64_t ver = pReader->pWalReader->pHead->head.version; - tqDebug("tmq poll: extract submit msg from wal, version:%"PRId64" len:%d", ver, bodyLen); - -#if 0 - if (pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) { - // TODO do filter - ret->fetchType = FETCH_TYPE__META; - ret->meta = pWalReader->pHead->head.body; - return 0; - } else { -#endif tqReaderSetSubmitReq2(pReader, body, bodyLen, ver); - /*tqReaderSetDataMsg(pReader, body, pWalReader->pHead->head.version);*/ -#if 0 - } -#endif } while (tqNextDataBlock2(pReader)) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index e1594b13a8..7961d5518a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1651,8 +1651,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { } else if (ret.fetchType == FETCH_TYPE__NONE || (ret.fetchType == FETCH_TYPE__SEP && pOperator->status == OP_EXEC_RECV)) { pTaskInfo->streamInfo.lastStatus = ret.offset; - ASSERT(pTaskInfo->streamInfo.lastStatus.version >= pTaskInfo->streamInfo.prepareStatus.version); - ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 == pInfo->tqReader->pWalReader->curVersion); char formatBuf[80]; tFormatOffset(formatBuf, 80, &ret.offset); qDebug("queue scan log return null, offset %s", formatBuf); @@ -1660,16 +1658,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { return NULL; } } -#if 0 - } else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { - SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); - if (pResult && pResult->info.rows > 0) { - qDebug("stream scan tsdb return %d rows", pResult->info.rows); - return pResult; - } - qDebug("stream scan tsdb return null"); - return NULL; -#endif } else { qError("unexpected streamInfo prepare type: %d", pTaskInfo->streamInfo.prepareStatus.type); return NULL; diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 7bfd3df9e0..3e1e36ccc1 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -200,6 +200,7 @@ int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) { pReader->curVersion, pReader->curInvalid, ver); pReader->curVersion = ver; + pReader->curInvalid = 0; return 0; } @@ -210,8 +211,8 @@ int32_t walReadSeekVer(SWalReader *pReader, int64_t ver) { return 0; } - pReader->curInvalid = 1; - pReader->curVersion = ver; +// pReader->curInvalid = 1; +// pReader->curVersion = ver; if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) { wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId, @@ -219,8 +220,8 @@ int32_t walReadSeekVer(SWalReader *pReader, int64_t ver) { terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; return -1; } - if (ver < pWal->vers.snapshotVer) { - } +// if (ver < pWal->vers.snapshotVer) { +// } if (walReadSeekVerImpl(pReader, ver) < 0) { return -1; @@ -239,8 +240,8 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { if (pRead->curInvalid || pRead->curVersion != fetchVer) { if (walReadSeekVer(pRead, fetchVer) < 0) { - pRead->curVersion = fetchVer; - pRead->curInvalid = 1; +// pRead->curVersion = fetchVer; +// pRead->curInvalid = 1; return -1; } seeked = true; @@ -259,11 +260,11 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { } else { terrno = TSDB_CODE_WAL_FILE_CORRUPTED; } - pRead->curInvalid = 1; +// pRead->curInvalid = 1; return -1; } } - pRead->curInvalid = 0; +// pRead->curInvalid = 0; return 0; } @@ -295,13 +296,14 @@ static int32_t walFetchBodyNew(SWalReader *pReader) { pReader->pWal->cfg.vgId, pReader->pHead->head.version, ver); terrno = TSDB_CODE_WAL_FILE_CORRUPTED; } - pReader->curInvalid = 1; +// pRead->curInvalid = 1; return -1; } if (walValidBodyCksum(pReader->pHead) != 0) { wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pReader->pWal->cfg.vgId, ver); - pReader->curInvalid = 1; +// pRead->curInvalid = 1; + terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } @@ -317,7 +319,7 @@ static int32_t walSkipFetchBodyNew(SWalReader *pRead) { code = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR); if (code < 0) { terrno = TAOS_SYSTEM_ERROR(errno); - pRead->curInvalid = 1; +// pRead->curInvalid = 1; return -1; } @@ -345,8 +347,8 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { if (pRead->curInvalid || pRead->curVersion != ver) { code = walReadSeekVer(pRead, ver); if (code < 0) { - pRead->curVersion = ver; - pRead->curInvalid = 1; +// pRead->curVersion = ver; +// pRead->curInvalid = 1; return -1; } seeked = true; @@ -366,7 +368,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { } else { terrno = TSDB_CODE_WAL_FILE_CORRUPTED; } - pRead->curInvalid = 1; +// pRead->curInvalid = 1; return -1; } } @@ -379,7 +381,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { return -1; } - pRead->curInvalid = 0; +// pRead->curInvalid = 0; return 0; } @@ -394,7 +396,7 @@ int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) { code = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR); if (code < 0) { terrno = TAOS_SYSTEM_ERROR(errno); - pRead->curInvalid = 1; +// pRead->curInvalid = 1; return -1; } @@ -433,14 +435,14 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) { pRead->pWal->cfg.vgId, pReadHead->version, ver); terrno = TSDB_CODE_WAL_FILE_CORRUPTED; } - pRead->curInvalid = 1; +// pRead->curInvalid = 1; return -1; } if (pReadHead->version != ver) { wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId, pReadHead->version, ver); - pRead->curInvalid = 1; +// pRead->curInvalid = 1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } @@ -448,7 +450,7 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) { if (walValidBodyCksum(*ppHead) != 0) { wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver); - pRead->curInvalid = 1; +// pRead->curInvalid = 1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } @@ -544,7 +546,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { if (pReader->pHead->head.version != ver) { wError("vgId:%d, unexpected wal log, index:%" PRId64 ", read request index:%" PRId64, pReader->pWal->cfg.vgId, pReader->pHead->head.version, ver); - pReader->curInvalid = 1; +// pReader->curInvalid = 1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; taosThreadMutexUnlock(&pReader->mutex); return -1; @@ -557,7 +559,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { uint32_t readCkSum = walCalcBodyCksum(pReader->pHead->head.body, pReader->pHead->head.bodyLen); uint32_t logCkSum = pReader->pHead->cksumBody; wError("checksum written into log:%u, checksum calculated:%u", logCkSum, readCkSum); - pReader->curInvalid = 1; +// pReader->curInvalid = 1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; taosThreadMutexUnlock(&pReader->mutex); return -1; @@ -575,5 +577,6 @@ void walReadReset(SWalReader *pReader) { taosCloseFile(&pReader->pLogFile); pReader->curInvalid = 1; pReader->curFileFirstVer = -1; + pReader->curVersion = -1; taosThreadMutexUnlock(&pReader->mutex); } diff --git a/tests/system-test/7-tmq/tmqDelete-1ctb.py b/tests/system-test/7-tmq/tmqDelete-1ctb.py index 4b45b1a834..4c62bb757b 100644 --- a/tests/system-test/7-tmq/tmqDelete-1ctb.py +++ b/tests/system-test/7-tmq/tmqDelete-1ctb.py @@ -80,16 +80,16 @@ class TDTestCase: tdLog.debug("del data ............ [OK]") return - def threadFunctionForDeletaData(self, **paraDict): + def threadFunctionForDeletaData(self, paraDict): # create new connector for new tdSql instance in my thread newTdSql = tdCom.newTdSql() self.delData(newTdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["startTs"],paraDict["endTs"],paraDict["ctbStartIdx"]) return - def asyncDeleteData(self, paraDict): - pThread = threading.Thread(target=self.threadFunctionForDeletaData, kwargs=paraDict) - pThread.start() - return pThread + # def asyncDeleteData(self, paraDict): + # pThread = threading.Thread(target=self.threadFunctionForDeletaData, kwargs=paraDict) + # pThread.start() + # return pThread def tmqCase1(self): tdLog.printNoPrefix("======== test case 1: ") @@ -340,7 +340,8 @@ class TDTestCase: # del some data rowsOfDelete = int(self.rowsPerTbl / 4 ) paraDict["endTs"] = paraDict["startTs"] + rowsOfDelete - 1 - pDeleteThread = self.asyncDeleteData(paraDict) + # pDeleteThread = self.asyncDeleteData(paraDict) + self.threadFunctionForDeletaData(paraDict) tdLog.info("start consume processor") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])