From 4fd3064fd9bd97bc2cece0579d0ca9ae15d654ab Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 28 Mar 2023 14:43:37 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/dnode/vnode/src/inc/vnodeInt.h | 2 +- source/dnode/vnode/src/tq/tq.c | 15 +-- source/dnode/vnode/src/tq/tqExec.c | 19 ++- source/dnode/vnode/src/tq/tqPush.c | 180 +++++++++++++++----------- 4 files changed, 119 insertions(+), 97 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 0fe7f9a773..8b01ba237f 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -193,7 +193,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode); void tqClose(STQ*); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); int tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp, int32_t type); -int tqRemovePushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer); +int tqUnregisterPushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer); int tqCommit(STQ*); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ccd8b885a0..2d2c70c84a 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -223,19 +223,6 @@ static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqData int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) { SMqDataRsp* pRsp = pPushEntry->pDataRsp; - -#if 0 - A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum); - A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum); - - A(!pRsp->withSchema); - A(taosArrayGetSize(pRsp->blockSchema) == 0); - - if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) { - A(pRsp->rspOffset.version > pRsp->reqOffset.version); - } -#endif - SMqRspHead* pHeader = &pPushEntry->pDataRsp->head; doSendDataRsp(&pPushEntry->info, pRsp, pHeader->epoch, pHeader->consumerId, pHeader->mqMsgType); @@ -895,7 +882,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg atomic_store_32(&pHandle->epoch, -1); // remove if it has been register in the push manager, and return one empty block to consumer - tqRemovePushEntry(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true); + tqUnregisterPushEntry(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true); atomic_store_64(&pHandle->consumerId, req.newConsumerId); atomic_add_fetch_32(&pHandle->epoch, 1); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index ce86a6757e..6845350346 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -62,6 +62,8 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, in } int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) { + const int32_t MAX_ROWS_TO_RETURN = 4096; + const STqExecHandle* pExec = &pHandle->execHandle; qTaskInfo_t task = pExec->task; @@ -82,7 +84,8 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs } } - int32_t rowCnt = 0; + int32_t totalRows = 0; + while (1) { SSDataBlock* pDataBlock = NULL; uint64_t ts = 0; @@ -94,9 +97,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs return -1; } - tqDebug("consumer:0x%"PRIx64" vgId:%d, tmq task executed, get %p", pHandle->consumerId, vgId, pDataBlock); - - // current scan should be stopped asap, since the rebalance occurs. + // current scan should be stopped ASAP, since the re-balance occurs. if (pDataBlock == NULL) { break; } @@ -104,9 +105,12 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision); pRsp->blockNum++; + tqDebug("vgId:%d, consumer:0x%" PRIx64 " tmq task executed, rows:%d, total blocks:%d", vgId, pHandle->consumerId, + pDataBlock->info.rows, pRsp->blockNum); + if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { - rowCnt += pDataBlock->info.rows; - if (rowCnt >= 4096) { + totalRows += pDataBlock->info.rows; + if (totalRows >= MAX_ROWS_TO_RETURN) { break; } } @@ -127,6 +131,9 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs return -1; } + tqDebug("vgId:%d, consumer:0x%" PRIx64 " tmq task executed, rows:%d, total blocks:%d, rows:%d", vgId, pHandle->consumerId, + pRsp->blockNum, totalRows); + return 0; } diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 797aeb3f04..e07ab06458 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -206,6 +206,87 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ } #endif +typedef struct { + void* pKey; + int64_t keyLen; +} SItem; + +static void recordPushedEntry(SArray* cachedKey, void* pIter); + +static void freeItem(void* param) { + SItem* p = (SItem*) param; + taosMemoryFree(p->pKey); +} + +static void doRemovePushedEntry(SArray* pCachedKeys, STQ* pTq) { + int32_t vgId = TD_VID(pTq->pVnode); + size_t numOfKeys = taosArrayGetSize(pCachedKeys); + + for (int32_t i = 0; i < numOfKeys; i++) { + SItem* pItem = taosArrayGet(pCachedKeys, i); + if (taosHashRemove(pTq->pPushMgr, pItem->pKey, pItem->keyLen) != 0) { + tqError("vgId:%d, tq push hash remove key error, key: %s", vgId, (char*) pItem->pKey); + } + } + + if (numOfKeys > 0) { + tqDebug("vgId:%d, pushed %d items and remain:%d", vgId, numOfKeys, (int32_t)taosHashGetSize(pTq->pPushMgr)); + } +} + +static void doPushDataForEntry(void* pIter, STqExecHandle* pExec, STQ* pTq, int64_t ver, int32_t vgId, char* pData, + int32_t dataLen, SArray* pCachedKey) { + STqPushEntry* pPushEntry = *(STqPushEntry**)pIter; + + SMqDataRsp* pRsp = pPushEntry->pDataRsp; + if (pRsp->reqOffset.version >= ver) { + tqDebug("vgId:%d, push entry req version %" PRId64 ", while push version %" PRId64 ", skip", vgId, + pRsp->reqOffset.version, ver); + return; + } + + qTaskInfo_t pTaskInfo = pExec->task; + + // prepare scan mem data + SPackedData submit = { + .msgStr = pData, + .msgLen = dataLen, + .ver = ver, + }; + + if(qStreamSetScanMemData(pTaskInfo, submit) != 0){ + return; + } + + // here start to scan submit block to extract the subscribed data + int32_t totalRows = 0; + + while (1) { + SSDataBlock* pDataBlock = NULL; + uint64_t ts = 0; + if (qExecTask(pTaskInfo, &pDataBlock, &ts) < 0) { + tqDebug("vgId:%d, tq exec error since %s", vgId, terrstr()); + } + + if (pDataBlock == NULL) { + break; + } + + tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision); + pRsp->blockNum++; + totalRows += pDataBlock->info.rows; + } + + tqDebug("vgId:%d, tq handle push, subkey:%s, block num:%d, rows:%d", vgId, pPushEntry->subKey, pRsp->blockNum, + totalRows); + + if (pRsp->blockNum > 0) { + tqOffsetResetToLog(&pRsp->rspOffset, ver); + tqPushDataRsp(pTq, pPushEntry); + recordPushedEntry(pCachedKey, pIter); + } +} + int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg)); int32_t len = msgLen - sizeof(SSubmitReq2Msg); @@ -220,24 +301,19 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) tqDebug("vgId:%d tq push msg version:%" PRId64 " type:%s, head:%p, body:%p len:%d, numOfPushed consumers:%d", vgId, ver, TMSG_INFO(msgType), msg, pReq, len, numOfRegisteredPush); - SArray* cachedKeys = taosArrayInit(0, sizeof(void*)); - SArray* cachedKeyLens = taosArrayInit(0, sizeof(size_t)); - - void* data = taosMemoryMalloc(len); + void* data = taosMemoryMalloc(len); if (data == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("failed to copy data for stream since out of memory"); - taosArrayDestroyP(cachedKeys, (FDelete)taosMemoryFree); - taosArrayDestroy(cachedKeyLens); - - // unlock + tqError("failed to copy data for stream since out of memory, vgId:%d, consumer:0x%"PRIx64, vgId); taosWUnLockLatch(&pTq->lock); return -1; } memcpy(data, pReq, len); - void* pIter = NULL; + SArray* cachedKey = taosArrayInit(0, sizeof(SItem)); + void* pIter = NULL; + while (1) { pIter = taosHashIterate(pTq->pPushMgr, pIter); if (pIter == NULL) { @@ -248,83 +324,28 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) STqHandle* pHandle = taosHashGet(pTq->pHandle, pPushEntry->subKey, strlen(pPushEntry->subKey)); if (pHandle == NULL) { - tqDebug("vgId:%d, cannot find handle %s", pTq->pVnode->config.vgId, pPushEntry->subKey); - continue; - } - - SMqDataRsp* pRsp = pPushEntry->pDataRsp; - if (pRsp->reqOffset.version >= ver) { - tqDebug("vgId:%d, push entry req version %" PRId64 ", while push version %" PRId64 ", skip", vgId, - pRsp->reqOffset.version, ver); + tqDebug("vgId:%d, failed to find handle %s in pushing data to consumer, ignore", pTq->pVnode->config.vgId, pPushEntry->subKey); continue; } STqExecHandle* pExec = &pHandle->execHandle; - qTaskInfo_t task = pExec->task; - - // prepare scan mem data - SPackedData submit = { - .msgStr = data, - .msgLen = len, - .ver = ver, - }; - if(qStreamSetScanMemData(task, submit) != 0){ - continue; - } - - // here start to scan submit block to extract the subscribed data - while (1) { - SSDataBlock* pDataBlock = NULL; - uint64_t ts = 0; - if (qExecTask(task, &pDataBlock, &ts) < 0) { - tqDebug("vgId:%d, tq exec error since %s", vgId, terrstr()); - } - - if (pDataBlock == NULL) { - break; - } - - tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision); - pRsp->blockNum++; - } - - tqDebug("vgId:%d, tq handle push, subkey:%s, block num:%d", vgId, pPushEntry->subKey, pRsp->blockNum); - if (pRsp->blockNum > 0) { - // set offset - tqOffsetResetToLog(&pRsp->rspOffset, ver); - - // remove from hash - size_t kLen; - void* key = taosHashGetKey(pIter, &kLen); - void* keyCopy = taosMemoryCalloc(1, kLen + 1); - memcpy(keyCopy, key, kLen); - - taosArrayPush(cachedKeys, &keyCopy); - taosArrayPush(cachedKeyLens, &kLen); - - tqPushDataRsp(pTq, pPushEntry); - } + doPushDataForEntry(pIter, pExec, pTq, ver, vgId, data, len, cachedKey); } - // delete entry - for (int32_t i = 0; i < taosArrayGetSize(cachedKeys); i++) { - void* key = taosArrayGetP(cachedKeys, i); - size_t kLen = *(size_t*)taosArrayGet(cachedKeyLens, i); - if (taosHashRemove(pTq->pPushMgr, key, kLen) != 0) { - tqError("vgId:%d, tq push hash remove key error, key: %s", pTq->pVnode->config.vgId, (char*)key); - } - } - - taosArrayDestroyP(cachedKeys, (FDelete)taosMemoryFree); - taosArrayDestroy(cachedKeyLens); + doRemovePushedEntry(cachedKey, pTq); + taosArrayDestroyEx(cachedKey, freeItem); taosMemoryFree(data); } + // unlock taosWUnLockLatch(&pTq->lock); } if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode)) { - if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) return 0; + if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) { + return 0; + } + if (msgType == TDMT_VND_SUBMIT) { void* data = taosMemoryMalloc(len); if (data == NULL) { @@ -351,6 +372,13 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) return 0; } +void recordPushedEntry(SArray* cachedKey, void* pIter) { + size_t kLen = 0; + void* key = taosHashGetKey(pIter, &kLen); + SItem item = {.pKey = strndup(key, kLen), .keyLen = kLen}; + taosArrayPush(cachedKey, &item); +} + int32_t tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp, int32_t type) { uint64_t consumerId = pRequest->consumerId; @@ -388,8 +416,8 @@ int32_t tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, return 0; } -int32_t tqRemovePushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer) { - int32_t vgId = TD_VID(pTq->pVnode); +int32_t tqUnregisterPushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer) { + int32_t vgId = TD_VID(pTq->pVnode); STqPushEntry** pEntry = taosHashGet(pTq->pPushMgr, pKey, keyLen); if (pEntry != NULL) {