From 2d6436c57d714af026b86061d11dd33f5a227172 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 3 Apr 2023 11:24:22 +0800 Subject: [PATCH] fix:modify log & sleep 500ms if data is inserting while consume --- source/client/src/clientSml.c | 3 +-- source/client/src/clientTmq.c | 2 ++ source/dnode/vnode/src/tq/tq.c | 20 +++++++++++--------- source/dnode/vnode/src/tq/tqScan.c | 5 +++-- source/libs/executor/src/scanoperator.c | 2 +- 5 files changed, 18 insertions(+), 14 deletions(-) diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index a4ecc3c3df..40a685faf5 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -533,8 +533,7 @@ static int32_t smlGenerateSchemaAction(SSchema *colField, SHashObj *colHash, SSm uint16_t *index = colHash ? (uint16_t *)taosHashGet(colHash, kv->key, kv->keyLen) : NULL; if (index) { if (colField[*index].type != kv->type) { - uError("SML:0x%" PRIx64 " point type and db type mismatch. key: %s. point type: %d, db type: %d", info->id, - kv->key, colField[*index].type, kv->type); + uError("SML:0x%" PRIx64 " point type and db type mismatch. point type: %d, db type: %d, key: %s", info->id, colField[*index].type, kv->type, kv->key); return TSDB_CODE_TSC_INVALID_VALUE; } diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 59a407656d..e08be619ef 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1269,6 +1269,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP; taosWriteQitem(tmq->mqueue, pRspWrapper); + }else if(code == TSDB_CODE_WAL_LOG_NOT_EXIST){ //poll data while insert + taosMsleep(500); } goto CREATE_MSG_FAIL; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 363aab8e45..f1935ad731 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -448,9 +448,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); int code = tqScanData(pTq, pHandle, &dataRsp, pOffset); if(code != 0) { - tDeleteSMqDataRsp(&dataRsp); - taosWUnLockLatch(&pTq->lock); - return TSDB_CODE_TMQ_CONSUMER_ERROR; + goto end; } // till now, all data has been transferred to consumer, new data needs to push client once arrived. @@ -461,16 +459,20 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, return code; } - taosWUnLockLatch(&pTq->lock); + code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP); // NOTE: this pHandle->consumerId may have been changed already. - tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64 - ", ts:%" PRId64 ", reqId:0x%" PRIx64, - consumerId, pHandle->subKey, vgId, dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid, - dataRsp.rspOffset.ts, pRequest->reqId); - tDeleteSMqDataRsp(&dataRsp); +end: + { + char buf[80] = {0}; + tFormatOffset(buf, 80, &dataRsp.rspOffset); + tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, reqId:0x%" PRIx64 " code:%d", + consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code); + taosWUnLockLatch(&pTq->lock); + tDeleteSMqDataRsp(&dataRsp); + } return code; } diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index e4acf49635..dff162d527 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -82,13 +82,13 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs SSDataBlock* pDataBlock = NULL; uint64_t ts = 0; - tqDebug("consumer:0x%"PRIx64" vgId:%d, tmq task start execute", pHandle->consumerId, vgId); + tqDebug("consumer:0x%"PRIx64" vgId:%d, tmq one task start execute", pHandle->consumerId, vgId); if (qExecTask(task, &pDataBlock, &ts) != TSDB_CODE_SUCCESS) { tqError("consumer:0x%"PRIx64" vgId:%d, task exec error since %s", pHandle->consumerId, vgId, terrstr()); return -1; } - tqDebug("consumer:0x%"PRIx64" vgId:%d tmq task executed, total blocks:%d, pDataBlock:%p", pHandle->consumerId, vgId, pRsp->blockNum, pDataBlock); + tqDebug("consumer:0x%"PRIx64" vgId:%d tmq one task end executed, pDataBlock:%p", pHandle->consumerId, vgId, pDataBlock); // current scan should be stopped asap, since the rebalance occurs. if (pDataBlock == NULL) { break; @@ -107,6 +107,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs } } + tqDebug("consumer:0x%"PRIx64" vgId:%d tmq task executed finished, total blocks:%d, totalRows:%d", pHandle->consumerId, vgId, pRsp->blockNum, totalRows); qStreamExtractOffset(task, &pRsp->rspOffset); return 0; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 89ca7510e9..f723127711 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1672,7 +1672,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pInfo->tqReader->pWalReader->curVersion - 1); //curVersion move to next, so currentOffset = curVersion - 1 if (ret.fetchType == FETCH_TYPE__DATA) { - qDebug("doQueueScan get data from log %"PRId64" rows, version:%" PRId64, pInfo->pRes->info.rows, pTaskInfo->streamInfo.currentOffset.version); + qDebug("doQueueScan get data from log %"PRId64" rows, version:%" PRId64, ret.data.info.rows, pTaskInfo->streamInfo.currentOffset.version); blockDataCleanup(pInfo->pRes); setBlockIntoRes(pInfo, &ret.data, true); if (pInfo->pRes->info.rows > 0) {