From c3acab20705de933847574db3adef191b44a1c32 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 21 Mar 2023 19:38:34 +0800 Subject: [PATCH 1/8] fix(tmq): do some internal refactor. --- source/client/src/clientTmq.c | 3 +- source/dnode/vnode/src/inc/tq.h | 27 ++-- source/dnode/vnode/src/tq/tq.c | 202 +++++++++++++++-------------- source/dnode/vnode/src/tq/tqPush.c | 6 +- 4 files changed, 120 insertions(+), 118 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 77bbd0be1a..82da067d8e 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1811,7 +1811,6 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { if (pRspWrapper == NULL) { taosReadAllQitems(tmq->mqueue, tmq->qall); taosGetQitem(tmq->qall, (void**)&pRspWrapper); - if (pRspWrapper == NULL) { return NULL; } @@ -1831,7 +1830,6 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { SMqDataRsp* pDataRsp = &pollRspWrapper->dataRsp; if (pDataRsp->head.epoch == consumerEpoch) { - // todo fix it: race condition SMqClientVg* pVg = pollRspWrapper->vgHandle; // update the epset @@ -1843,6 +1841,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { pVg->epSet = *pollRspWrapper->pEpset; } + // update the local offset value only for the returned values. 
pVg->currentOffset = pDataRsp->rspOffset; atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index c2b38f5cd1..379ea25ee6 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -109,23 +109,18 @@ typedef struct { } STqPushEntry; struct STQ { - SVnode* pVnode; - char* path; - int64_t walLogLastVer; - - SRWLatch pushLock; - - SHashObj* pPushMgr; // consumerId -> STqPushEntry - SHashObj* pHandle; // subKey -> STqHandle - SHashObj* pCheckInfo; // topic -> SAlterCheckInfo - + SVnode* pVnode; + char* path; + int64_t walLogLastVer; + SRWLatch lock; + SHashObj* pPushMgr; // consumerId -> STqPushEntry + SHashObj* pHandle; // subKey -> STqHandle + SHashObj* pCheckInfo; // topic -> SAlterCheckInfo STqOffsetStore* pOffsetStore; - - TDB* pMetaDB; - TTB* pExecStore; - TTB* pCheckStore; - - SStreamMeta* pStreamMeta; + TDB* pMetaDB; + TTB* pExecStore; + TTB* pCheckStore; + SStreamMeta* pStreamMeta; }; typedef struct { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c4800e5051..1282b0a94d 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -51,7 +51,7 @@ void tqCleanUp() { } } -static void destroySTqHandle(void* data) { +static void destroyTqHandle(void* data) { STqHandle* pData = (STqHandle*)data; qDestroyTask(pData->execHandle.task); if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { @@ -89,9 +89,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { pTq->walLogLastVer = pVnode->pWal->vers.lastVer; pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); - taosHashSetFreeFp(pTq->pHandle, destroySTqHandle); + taosHashSetFreeFp(pTq->pHandle, destroyTqHandle); - taosInitRWLatch(&pTq->pushLock); + taosInitRWLatch(&pTq->lock); pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); taosHashSetFreeFp(pTq->pPushMgr, tqPushEntryFree); @@ 
-236,38 +236,6 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) { } #endif -// int32_t len = 0; -// int32_t code = 0; -// tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code); -// if (code < 0) { -// return -1; -// } -// -// int32_t tlen = sizeof(SMqRspHead) + len; -// void* buf = rpcMallocCont(tlen); -// if (buf == NULL) { -// return -1; -// } -// -// memcpy(buf, &pPushEntry->dataRsp.head, sizeof(SMqRspHead)); -// -// void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); -// -// SEncoder encoder = {0}; -// tEncoderInit(&encoder, abuf, len); -// tEncodeSMqDataRsp(&encoder, pRsp); -// tEncoderClear(&encoder); -// -// SRpcMsg rsp = { -// .info = pPushEntry->pInfo, -// .pCont = buf, -// .contLen = tlen, -// .code = 0, -// }; -// -// tmsgSendRsp(&rsp); -// - SMqRspHead* pHeader = &pPushEntry->pDataRsp->head; doSendDataRsp(&pPushEntry->info, pRsp, pHeader->epoch, pHeader->consumerId, pHeader->mqMsgType); @@ -444,7 +412,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand char formatBuf[80]; tFormatOffset(formatBuf, 80, pOffsetVal); - tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, prev offset found, offset reset to %s and continue.", + tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue.", consumerId, pHandle->subKey, vgId, formatBuf); return 0; } else { @@ -502,7 +470,45 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand return 0; } -static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) { +#define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0) + +static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, + SRpcMsg* pMsg, STqOffsetVal* pOffset) { + int32_t code = 0; + uint64_t consumerId = pRequest->consumerId; + int32_t vgId = TD_VID(pTq->pVnode); + + SMqDataRsp dataRsp = {0}; + tqInitDataRsp(&dataRsp, pRequest, 
pHandle->execHandle.subType); + + // lock + taosWLockLatch(&pTq->lock); + + qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); + code = tqScanData(pTq, pHandle, &dataRsp, pOffset); + + // till now, all data has been transferred to consumer, new data needs to push client once arrived. + if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && + dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { + code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); + taosWUnLockLatch(&pTq->lock); + return code; + } + + taosWUnLockLatch(&pTq->lock); + code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP); + + // NOTE: this pHandle->consumerId may have been changed already. + tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64 + ", ts:%" PRId64 ", reqId:0x%" PRIx64, + consumerId, pHandle->subKey, vgId, dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid, + dataRsp.rspOffset.ts, pRequest->reqId); + + tDeleteSMqDataRsp(&dataRsp); + return code; +} + +static int32_t doPollDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) { int32_t code = -1; STqOffsetVal offset = {0}; SWalCkHead* pCkHead = NULL; @@ -512,9 +518,8 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* uint64_t consumerId = pRequest->consumerId; // 1. reset the offset if needed - if (reqOffset.type > 0) { - offset = reqOffset; - } else { // handle the reset offset cases, according to the consumer's choice. + if (IS_OFFSET_RESET_TYPE(reqOffset.type)) { + // handle the reset offset cases, according to the consumer's choice. 
bool blockReturned = false; code = extractResetOffsetVal(&offset, pTq, pHandle, pRequest, pMsg, &blockReturned); if (code != 0) { @@ -525,38 +530,41 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* if (blockReturned) { return 0; } + } else { // use the consumer specified offset + offset = reqOffset; } - // this is a normal subscription requirement + // this is a normal subscribe requirement if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - SMqDataRsp dataRsp = {0}; - tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType); - - // lock - taosWLockLatch(&pTq->pushLock); - - qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); - code = tqScanData(pTq, pHandle, &dataRsp, &offset); - - // till now, all data has been transferred to consumer, new data needs to push client once arrived. - if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && - dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { - code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); - taosWUnLockLatch(&pTq->pushLock); - return code; - } - - taosWUnLockLatch(&pTq->pushLock); - code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP); - - // NOTE: this pHandle->consumerId may have been changed already. 
- tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64 - ", ts:%" PRId64", reqId:0x%"PRIx64, - consumerId, pHandle->subKey, vgId, dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid, - dataRsp.rspOffset.ts, pRequest->reqId); - - tDeleteSMqDataRsp(&dataRsp); - return code; + return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &offset); +// SMqDataRsp dataRsp = {0}; +// tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType); +// +// // lock +// taosWLockLatch(&pTq->lock); +// +// qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); +// code = tqScanData(pTq, pHandle, &dataRsp, &offset); +// +// // till now, all data has been transferred to consumer, new data needs to push client once arrived. +// if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && +// dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { +// code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); +// taosWUnLockLatch(&pTq->lock); +// return code; +// } +// +// taosWUnLockLatch(&pTq->lock); +// code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP); +// +// // NOTE: this pHandle->consumerId may have been changed already. +// tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64 +// ", ts:%" PRId64", reqId:0x%"PRIx64, +// consumerId, pHandle->subKey, vgId, dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid, +// dataRsp.rspOffset.ts, pRequest->reqId); +// +// tDeleteSMqDataRsp(&dataRsp); +// return code; } // todo handle the case where re-balance occurs. @@ -700,31 +708,31 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } // 2. 
check re-balance status - taosRLockLatch(&pTq->pushLock); + taosRLockLatch(&pTq->lock); if (pHandle->consumerId != consumerId) { tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId); terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; - taosRUnLockLatch(&pTq->pushLock); + taosRUnLockLatch(&pTq->lock); return -1; } - taosRUnLockLatch(&pTq->pushLock); + taosRUnLockLatch(&pTq->lock); - taosWLockLatch(&pTq->pushLock); // 3. update the epoch value + taosWLockLatch(&pTq->lock); int32_t savedEpoch = pHandle->epoch; if (savedEpoch < reqEpoch) { tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, reqEpoch); pHandle->epoch = reqEpoch; } - taosWUnLockLatch(&pTq->pushLock); + taosWUnLockLatch(&pTq->lock); char buf[80]; tFormatOffset(buf, 80, &reqOffset); tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64, consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId); - return extractDataForMq(pTq, pHandle, &req, pMsg); + return doPollDataForMq(pTq, pHandle, &req, pMsg); } int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { @@ -732,12 +740,12 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey); - taosWLockLatch(&pTq->pushLock); + taosWLockLatch(&pTq->lock); int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey)); if (code != 0) { tqDebug("vgId:%d, tq remove push handle %s", pTq->pVnode->config.vgId, pReq->subKey); } - taosWUnLockLatch(&pTq->pushLock); + taosWUnLockLatch(&pTq->lock); STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); if (pHandle) { @@ -801,18 +809,18 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* 
msg, int32_t msg SVnode* pVnode = pTq->pVnode; int32_t vgId = TD_VID(pVnode); - tqDebug("vgId:%d, tq process sub req %s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey, + tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey, req.oldConsumerId, req.newConsumerId); STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); if (pHandle == NULL) { if (req.oldConsumerId != -1) { - tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId is %" PRId64 "", + tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64, req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId); } if (req.newConsumerId == -1) { - tqError("vgId:%d, tq invalid rebalance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId); + tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId); taosMemoryFree(req.qmsg); return 0; } @@ -902,28 +910,28 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg atomic_add_fetch_32(&pHandle->epoch, 1); taosMemoryFree(req.qmsg); return tqMetaSaveHandle(pTq, req.subKey, pHandle); - } + } else { + tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, + req.newConsumerId); - tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, - req.newConsumerId); + taosWLockLatch(&pTq->lock); + atomic_store_32(&pHandle->epoch, -1); - taosWLockLatch(&pTq->pushLock); - atomic_store_32(&pHandle->epoch, -1); + // remove if it has been register in the push manager, and return one empty block to consumer + tqRemovePushEntry(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true); - // remove if it has been register in the push manager, and return one empty block to consumer - tqRemovePushEntry(pTq, 
req.subKey, (int32_t) strlen(req.subKey), pHandle->consumerId, true); + atomic_store_64(&pHandle->consumerId, req.newConsumerId); + atomic_add_fetch_32(&pHandle->epoch, 1); - atomic_store_64(&pHandle->consumerId, req.newConsumerId); - atomic_add_fetch_32(&pHandle->epoch, 1); + if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { + qStreamCloseTsdbReader(pHandle->execHandle.task); + } - if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - qStreamCloseTsdbReader(pHandle->execHandle.task); - } - - taosWUnLockLatch(&pTq->pushLock); - if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { - taosMemoryFree(req.qmsg); - return -1; + taosWUnLockLatch(&pTq->lock); + if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { + taosMemoryFree(req.qmsg); + return -1; + } } } diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 01d8e7cf14..797aeb3f04 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -213,7 +213,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) if (msgType == TDMT_VND_SUBMIT) { // lock push mgr to avoid potential msg lost - taosWLockLatch(&pTq->pushLock); + taosWLockLatch(&pTq->lock); int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr); if (numOfRegisteredPush > 0) { @@ -231,7 +231,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) taosArrayDestroy(cachedKeyLens); // unlock - taosWUnLockLatch(&pTq->pushLock); + taosWUnLockLatch(&pTq->lock); return -1; } @@ -320,7 +320,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) taosMemoryFree(data); } // unlock - taosWUnLockLatch(&pTq->pushLock); + taosWUnLockLatch(&pTq->lock); } if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode)) { From 69ca2b2f7b741bf30702b4b53ae86d7ed94d66fa Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 21 Mar 2023 19:40:36 +0800 Subject: [PATCH 2/8] fix(tmq): do some internal refactor. 
--- source/dnode/vnode/src/tq/tq.c | 214 ++++++++++++++------------------- 1 file changed, 92 insertions(+), 122 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 1282b0a94d..8279ee7aeb 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -531,159 +531,129 @@ static int32_t doPollDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* p return 0; } } else { // use the consumer specified offset + // the offset value may not increase monotonically?? offset = reqOffset; } // this is a normal subscribe requirement if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &offset); -// SMqDataRsp dataRsp = {0}; -// tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType); -// -// // lock -// taosWLockLatch(&pTq->lock); -// -// qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); -// code = tqScanData(pTq, pHandle, &dataRsp, &offset); -// -// // till now, all data has been transferred to consumer, new data needs to push client once arrived. -// if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && -// dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { -// code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); -// taosWUnLockLatch(&pTq->lock); -// return code; -// } -// -// taosWUnLockLatch(&pTq->lock); -// code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP); -// -// // NOTE: this pHandle->consumerId may have been changed already. 
-// tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64 -// ", ts:%" PRId64", reqId:0x%"PRIx64, -// consumerId, pHandle->subKey, vgId, dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid, -// dataRsp.rspOffset.ts, pRequest->reqId); -// -// tDeleteSMqDataRsp(&dataRsp); -// return code; - } + } else { // for taosX + // todo handle the case where re-balance occurs. + SMqMetaRsp metaRsp = {0}; + STaosxRsp taosxRsp = {0}; + tqInitTaosxRsp(&taosxRsp, pRequest); - // todo handle the case where re-balance occurs. - // for taosx - SMqMetaRsp metaRsp = {0}; - STaosxRsp taosxRsp = {0}; - tqInitTaosxRsp(&taosxRsp, pRequest); - - if (offset.type != TMQ_OFFSET__LOG) { - if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, &offset) < 0) { - return -1; - } - - if (metaRsp.metaRspLen > 0) { - code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp); - tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 - ",ts:%" PRId64, - consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid, - metaRsp.rspOffset.ts); - taosMemoryFree(metaRsp.metaRsp); - tDeleteSTaosxRsp(&taosxRsp); - return code; - } - - if (taosxRsp.blockNum > 0) { - code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); - tDeleteSTaosxRsp(&taosxRsp); - return code; - } else { - offset = taosxRsp.rspOffset; - } - - tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64 - ",version:%" PRId64, - consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid, - taosxRsp.rspOffset.version); - } else { - -// if (offset.type == TMQ_OFFSET__LOG) { - int64_t fetchVer = offset.version + 1; - pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); - if (pCkHead == NULL) { - tDeleteSTaosxRsp(&taosxRsp); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - 
walSetReaderCapacity(pHandle->pWalReader, 2048); - - while (1) { - // todo refactor: this is not correct. - int32_t savedEpoch = atomic_load_32(&pHandle->epoch); - if (savedEpoch > pRequest->epoch) { - tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey:%s vgId:%d offset %" PRId64 - ", found new consumer epoch %d, discard req epoch %d", - consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch); - break; + if (offset.type != TMQ_OFFSET__LOG) { + if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, &offset) < 0) { + return -1; } - if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) { - tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); - code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); + if (metaRsp.metaRspLen > 0) { + code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp); + tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 + ",ts:%" PRId64, + consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid, metaRsp.rspOffset.ts); + taosMemoryFree(metaRsp.metaRsp); tDeleteSTaosxRsp(&taosxRsp); - taosMemoryFreeClear(pCkHead); return code; } - SWalCont* pHead = &pCkHead->head; - tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId, - pRequest->epoch, vgId, fetchVer, pHead->msgType); + if (taosxRsp.blockNum > 0) { + code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); + tDeleteSTaosxRsp(&taosxRsp); + return code; + } else { + offset = taosxRsp.rspOffset; + } - if (pHead->msgType == TDMT_VND_SUBMIT) { - SPackedData submit = { - .msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)), - .msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg), - .ver = pHead->version, - }; + tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64 + ",version:%" 
PRId64, + consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid, + taosxRsp.rspOffset.version); + } else { + // if (offset.type == TMQ_OFFSET__LOG) { + int64_t fetchVer = offset.version + 1; + pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); + if (pCkHead == NULL) { + tDeleteSTaosxRsp(&taosxRsp); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } - if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp) < 0) { - tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, vgId, - pRequest->subKey); - return -1; + walSetReaderCapacity(pHandle->pWalReader, 2048); + + while (1) { + // todo refactor: this is not correct. + int32_t savedEpoch = atomic_load_32(&pHandle->epoch); + if (savedEpoch > pRequest->epoch) { + tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey:%s vgId:%d offset %" PRId64 + ", found new consumer epoch %d, discard req epoch %d", + consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch); + break; } - if (taosxRsp.blockNum > 0) { + if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); tDeleteSTaosxRsp(&taosxRsp); taosMemoryFreeClear(pCkHead); return code; - } else { - fetchVer++; } - } else { - /*A(pHandle->fetchMeta);*/ - /*A(IS_META_MSG(pHead->msgType));*/ - tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType)); - tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer); - metaRsp.resMsgType = pHead->msgType; - metaRsp.metaRspLen = pHead->bodyLen; - metaRsp.metaRsp = pHead->body; - if (tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp) < 0) { - code = -1; + SWalCont* pHead = &pCkHead->head; + tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", + consumerId, pRequest->epoch, vgId, 
fetchVer, pHead->msgType); + + if (pHead->msgType == TDMT_VND_SUBMIT) { + SPackedData submit = { + .msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)), + .msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg), + .ver = pHead->version, + }; + + if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp) < 0) { + tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, vgId, + pRequest->subKey); + return -1; + } + + if (taosxRsp.blockNum > 0) { + tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); + code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); + tDeleteSTaosxRsp(&taosxRsp); + taosMemoryFreeClear(pCkHead); + return code; + } else { + fetchVer++; + } + + } else { + /*A(pHandle->fetchMeta);*/ + /*A(IS_META_MSG(pHead->msgType));*/ + tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType)); + tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer); + metaRsp.resMsgType = pHead->msgType; + metaRsp.metaRspLen = pHead->bodyLen; + metaRsp.metaRsp = pHead->body; + if (tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp) < 0) { + code = -1; + taosMemoryFreeClear(pCkHead); + tDeleteSTaosxRsp(&taosxRsp); + return code; + } + code = 0; taosMemoryFreeClear(pCkHead); tDeleteSTaosxRsp(&taosxRsp); return code; } - code = 0; - taosMemoryFreeClear(pCkHead); - tDeleteSTaosxRsp(&taosxRsp); - return code; } } - } - tDeleteSTaosxRsp(&taosxRsp); - taosMemoryFreeClear(pCkHead); - return 0; + tDeleteSTaosxRsp(&taosxRsp); + taosMemoryFreeClear(pCkHead); + return 0; + } } int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { From 40a78bde5038a385bcf32839106202ae6f00d830 Mon Sep 17 00:00:00 2001 From: slzhou Date: Thu, 23 Mar 2023 13:49:08 +0800 Subject: [PATCH 3/8] fix: unknow db error when query ins_tables with table_name="" --- source/libs/parser/src/parUtil.c | 2 +- tests/script/tsim/query/sys_tbname.sim | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git 
a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index 793d05721e..563bc5e780 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -444,7 +444,7 @@ static int32_t getInsTagsTableTargetNameFromOp(int32_t acctId, SOperatorNode* pO } else if (QUERY_NODE_VALUE == nodeType(pOper->pRight)) { pVal = (SValueNode*)pOper->pRight; } - if (NULL == pCol || NULL == pVal) { + if (NULL == pCol || NULL == pVal || NULL == pVal->literal || 0 == strcmp(pVal->literal, "")) { return TSDB_CODE_SUCCESS; } diff --git a/tests/script/tsim/query/sys_tbname.sim b/tests/script/tsim/query/sys_tbname.sim index 7b3953129a..c676f2b1e0 100644 --- a/tests/script/tsim/query/sys_tbname.sim +++ b/tests/script/tsim/query/sys_tbname.sim @@ -51,6 +51,11 @@ if $data00 != @ins_stables@ then return -1 endi +sql select * from information_schema.ins_tables where table_name=''; +if $rows != 0 then + return -1 +endi + sql select tbname from information_schema.ins_tables; print $rows $data00 if $rows != 33 then From a732ec7b3c96edf605a83e2f91f7c0fb0d943b31 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 24 Mar 2023 10:36:01 +0800 Subject: [PATCH 4/8] fix: change float display mode --- tools/shell/src/shellEngine.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 3080b15b8c..f9602aaa4d 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -541,7 +541,7 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t printf("%*" PRIu64, width, *((uint64_t *)val)); break; case TSDB_DATA_TYPE_FLOAT: - printf("%*.5f", width, GET_FLOAT_VAL(val)); + printf("%*ef", width, GET_FLOAT_VAL(val)); break; case TSDB_DATA_TYPE_DOUBLE: n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.9f", width, GET_DOUBLE_VAL(val)); From 4167e4c00cd031ba94f4663c53f9a606fa1b366b Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 24 Mar 2023 15:20:27 +0800 
Subject: [PATCH 5/8] fix: some columns sma are not set issue --- source/dnode/vnode/src/tsdb/tsdbRead.c | 17 ++- tests/parallel_test/cases.task | 1 + tests/script/tsim/query/nullColSma.sim | 139 +++++++++++++++++++++++++ 3 files changed, 154 insertions(+), 3 deletions(-) create mode 100644 tests/script/tsim/query/nullColSma.sim diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 5ad9276c6c..7432d400e6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -4507,6 +4507,7 @@ static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_ int32_t i = 0, j = 0; int32_t size = (int32_t)taosArrayGetSize(pSup->pColAgg); taosArrayInsert(pSup->pColAgg, 0, pTsAgg); + size++; while (j < numOfCols && i < size) { SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i); @@ -4519,10 +4520,20 @@ static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_ if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) { SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows}; taosArrayInsert(pSup->pColAgg, i, &nullColAgg); + i += 1; } j += 1; } } + + while (j < numOfCols) { + if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) { + SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows}; + taosArrayInsert(pSup->pColAgg, i, &nullColAgg); + i += 1; + } + j++; + } } int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock, bool* allHave) { @@ -4602,8 +4613,8 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock, } else if (pAgg->colId < pSup->colId[j]) { i += 1; } else if (pSup->colId[j] < pAgg->colId) { - // ASSERT(pSup->colId[j] == PRIMARYKEY_TIMESTAMP_COL_ID); - pResBlock->pBlockAgg[pSup->slotId[j]] = &pSup->tsColAgg; + pResBlock->pBlockAgg[pSup->slotId[j]] = NULL; + *allHave = false; j += 1; } } @@ -4996,4 +5007,4 @@ void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* 
pSnap, bool proacti void tsdbReaderSetId(STsdbReader* pReader, const char* idstr) { taosMemoryFreeClear(pReader->idStr); pReader->idStr = taosStrdup(idstr); -} \ No newline at end of file +} diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 8c8d41d5b6..58fa0cbfab 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -871,6 +871,7 @@ ,,y,script,./test.sh -f tsim/query/emptyTsRange.sim ,,y,script,./test.sh -f tsim/query/partitionby.sim ,,y,script,./test.sh -f tsim/query/tableCount.sim +,,y,script,./test.sh -f tsim/query/nullColSma.sim ,,y,script,./test.sh -f tsim/qnode/basic1.sim ,,y,script,./test.sh -f tsim/snode/basic1.sim ,,y,script,./test.sh -f tsim/mnode/basic1.sim diff --git a/tests/script/tsim/query/nullColSma.sim b/tests/script/tsim/query/nullColSma.sim new file mode 100644 index 0000000000..886274e7e5 --- /dev/null +++ b/tests/script/tsim/query/nullColSma.sim @@ -0,0 +1,139 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sql connect + +$dbPrefix = m_in_db +$tbPrefix = m_in_tb +$mtPrefix = m_in_mt +$tbNum = 1 +$rowNum = 200 +$totalNum = 400 + +print =============== step1 +$i = 0 +$db = $dbPrefix . $i +$mt = $mtPrefix . $i + +sql drop database if exists $db +sql create database $db vgroups 1 maxrows 200 minrows 10; +sql use $db +sql create table $mt (ts timestamp, f1 int, f2 float) TAGS(tgcol int) + +print ====== start create child tables and insert data +$i = 0 +while $i < $tbNum + $tb = $tbPrefix . $i + sql create table $tb using $mt tags( $i ) + + $x = 0 + while $x < $rowNum + $cc = $x * 1 + $ms = 1601481600000 + $cc + + sql insert into $tb values ($ms , NULL , $x ) + $x = $x + 1 + endw + + $i = $i + 1 +endw + +$i = 1 +$tb = $tbPrefix . 
$i +sql create table $tb using $mt tags( $i ) + +$x = 0 +while $x < $rowNum + $cc = $x * 1 + $ms = 1601481600000 + $cc + + sql insert into $tb values ($ms , $x , NULL ) + $x = $x + 1 +endw + +sql flush database $db + +print =============== step2 +$i = 0 +$tb = $tbPrefix . $i +sql select max(f1) from $tb +if $rows != 1 then + return -1 +endi +if $data00 != NULL then + return -1 +endi + +$i = 1 +$tb = $tbPrefix . $i +sql select max(f2) from $tb +if $rows != 1 then + return -1 +endi +if $data00 != NULL then + return -1 +endi + +$rowNum = 10 + +print ====== insert more data +$i = 0 +while $i < $tbNum + $tb = $tbPrefix . $i + + $x = 0 + while $x < $rowNum + $cc = $x * 1 + $ms = 1601481700000 + $cc + + sql insert into $tb values ($ms , $x , $x ) + $x = $x + 1 + endw + + $i = $i + 1 +endw + +$i = 1 +$tb = $tbPrefix . $i +$x = 0 +while $x < $rowNum + $cc = $x * 1 + $ms = 1601481700000 + $cc + + sql insert into $tb values ($ms , $x , $x ) + $x = $x + 1 +endw + +sql flush database $db + +print =============== step3 +$i = 0 +$tb = $tbPrefix . $i +sql select max(f1) from $tb +if $rows != 1 then + return -1 +endi +if $data00 != 9 then + return -1 +endi + +$i = 1 +$tb = $tbPrefix . $i +sql select max(f2) from $tb +if $rows != 1 then + return -1 +endi +if $data00 != 9.00000 then + print $data00 + return -1 +endi + + +print =============== clear +#sql drop database $db +#sql select * from information_schema.ins_databases +#if $rows != 0 then +# return -1 +#endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT From 8f6ac6d7715db8a57daa0049b717c43a0764e1ab Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 24 Mar 2023 15:36:42 +0800 Subject: [PATCH 6/8] fix(tmq): wait for 2mins when subscribe topics. 
--- source/client/src/clientTmq.c | 3 ++- source/dnode/vnode/src/inc/tq.h | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 82da067d8e..111ca28cdc 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1116,6 +1116,7 @@ _failed: } int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { + const int32_t MAX_RETRY_COUNT = 120 * 2; // let's wait for 2 mins at most const SArray* container = &topic_list->container; int32_t sz = taosArrayGetSize(container); void* buf = NULL; @@ -1209,7 +1210,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { int32_t retryCnt = 0; while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) { - if (retryCnt++ > 40) { + if (retryCnt++ > MAX_RETRY_COUNT) { goto FAIL; } diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 379ea25ee6..1d7c213e1a 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -159,7 +159,7 @@ typedef struct { int32_t size; } STqOffsetHead; -STqOffsetStore* tqOffsetOpen(); +STqOffsetStore* tqOffsetOpen(STQ* pTq); void tqOffsetClose(STqOffsetStore*); STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey); int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset); From e0d1a4054aba7fd56f684b4b116f21bd9468841b Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Fri, 24 Mar 2023 17:58:14 +0800 Subject: [PATCH 7/8] fix: taosbenchmark support same min/max for main (#20621) --- cmake/taostools_CMakeLists.txt.in | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/taostools_CMakeLists.txt.in b/cmake/taostools_CMakeLists.txt.in index 87f0579f44..897ccdd158 100644 --- a/cmake/taostools_CMakeLists.txt.in +++ b/cmake/taostools_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taos-tools ExternalProject_Add(taos-tools GIT_REPOSITORY https://github.com/taosdata/taos-tools.git - GIT_TAG 
d11f210 + GIT_TAG 04296a5 SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" BINARY_DIR "" #BUILD_IN_SOURCE TRUE From 481091bae685c46b8a35ee0dfa0760dbfd99435f Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 24 Mar 2023 19:01:19 +0800 Subject: [PATCH 8/8] fix: sma load issue --- source/dnode/vnode/src/tsdb/tsdbRead.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 7432d400e6..96bce02b67 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -4521,6 +4521,7 @@ static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_ SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows}; taosArrayInsert(pSup->pColAgg, i, &nullColAgg); i += 1; + size++; } j += 1; }