diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 6b09bf4899..fd5cec2931 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -23,20 +23,19 @@ extern "C" { #ifndef _STREAM_STATE_H_ #define _STREAM_STATE_H_ -typedef struct SStreamTask SStreamTask; - typedef bool (*state_key_cmpr_fn)(void* pKey1, void* pKey2); typedef struct STdbState { - SStreamTask* pOwner; - TDB* db; - TTB* pStateDb; - TTB* pFuncStateDb; - TTB* pFillStateDb; // todo refactor - TTB* pSessionStateDb; - TTB* pParNameDb; - TTB* pParTagDb; - TXN* txn; + struct SStreamTask* pOwner; + + TDB* db; + TTB* pStateDb; + TTB* pFuncStateDb; + TTB* pFillStateDb; // todo refactor + TTB* pSessionStateDb; + TTB* pParNameDb; + TTB* pParTagDb; + TXN* txn; } STdbState; // incremental state storage @@ -45,7 +44,7 @@ typedef struct { int32_t number; } SStreamState; -SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages); +SStreamState* streamStateOpen(char* path, struct SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages); void streamStateClose(SStreamState* pState); int32_t streamStateBegin(SStreamState* pState); int32_t streamStateCommit(SStreamState* pState); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 1d301623b1..d04e965c6f 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -295,7 +295,7 @@ typedef struct { SEpSet epSet; } SStreamChildEpInfo; -typedef struct SStreamTask { +struct SStreamTask { int64_t streamId; int32_t taskId; int32_t totalLevel; @@ -362,8 +362,7 @@ typedef struct SStreamTask { int64_t checkpointingId; int32_t checkpointAlignCnt; - -} SStreamTask; +}; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo); diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 8dc3f46ae3..9911752f8e 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -57,7 +57,7 @@ target_sources( # tq "src/tq/tq.c" - "src/tq/tqExec.c" + "src/tq/tqScan.c" "src/tq/tqMeta.c" "src/tq/tqRead.c" "src/tq/tqOffset.c" diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 0fe7f9a773..8b01ba237f 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -193,7 +193,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode); void tqClose(STQ*); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); int tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp, int32_t type); -int tqRemovePushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer); +int tqUnregisterPushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer); int tqCommit(STQ*); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 7997a9eb1a..4db53c1627 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -223,19 +223,6 @@ static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqData int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) { SMqDataRsp* pRsp = pPushEntry->pDataRsp; - -#if 0 - A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum); - A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum); - - A(!pRsp->withSchema); - A(taosArrayGetSize(pRsp->blockSchema) == 0); - - if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) { - A(pRsp->rspOffset.version > pRsp->reqOffset.version); - } -#endif - SMqRspHead* pHeader = &pPushEntry->pDataRsp->head; doSendDataRsp(&pPushEntry->info, pRsp, pHeader->epoch, pHeader->consumerId, pHeader->mqMsgType); @@ -486,6 +473,10 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); code = tqScanData(pTq, pHandle, &dataRsp, pOffset); + if (code != TSDB_CODE_SUCCESS) { + taosWUnLockLatch(&pTq->lock); + return code; + } // till now, all data has been transferred to consumer, new data needs to push client once arrived. if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && @@ -905,7 +896,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg atomic_store_32(&pHandle->epoch, -1); // remove if it has been register in the push manager, and return one empty block to consumer - tqRemovePushEntry(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true); + tqUnregisterPushEntry(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true); atomic_store_64(&pHandle->consumerId, req.newConsumerId); atomic_add_fetch_32(&pHandle->epoch, 1); diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 797aeb3f04..1619829115 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -206,7 +206,84 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ } #endif -int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { +typedef struct { + void* pKey; + int64_t keyLen; +} SItem; + +static void recordPushedEntry(SArray* cachedKey, void* pIter); + +static void freeItem(void* param) { + SItem* p = (SItem*) param; + taosMemoryFree(p->pKey); +} + +static void doRemovePushedEntry(SArray* pCachedKeys, STQ* pTq) { + int32_t vgId = TD_VID(pTq->pVnode); + int32_t numOfKeys = (int32_t) taosArrayGetSize(pCachedKeys); + + for (int32_t i = 0; i < numOfKeys; i++) { + SItem* pItem = taosArrayGet(pCachedKeys, i); + if (taosHashRemove(pTq->pPushMgr, pItem->pKey, pItem->keyLen) != 0) { + tqError("vgId:%d, tq push hash remove key error, key: %s", vgId, (char*) pItem->pKey); + } + } + + if (numOfKeys > 0) { + tqDebug("vgId:%d, pushed %d items and remain:%d", vgId, numOfKeys, (int32_t)taosHashGetSize(pTq->pPushMgr)); + } +} + +static void doPushDataForEntry(void* pIter, STqExecHandle* pExec, STQ* pTq, int64_t ver, int32_t vgId, char* pData, + int32_t dataLen, SArray* pCachedKey) { + STqPushEntry* pPushEntry = *(STqPushEntry**)pIter; + + SMqDataRsp* pRsp = pPushEntry->pDataRsp; + if (pRsp->reqOffset.version >= ver) { + tqDebug("vgId:%d, push entry req version %" PRId64 ", while push version %" PRId64 ", skip", vgId, + pRsp->reqOffset.version, ver); + return; + } + + qTaskInfo_t pTaskInfo = pExec->task; + + // prepare scan mem data + SPackedData submit = {.msgStr = pData, .msgLen = dataLen, .ver = ver}; + + if (qStreamSetScanMemData(pTaskInfo, submit) != 0) { + return; + } + + // here start to scan submit block to extract the subscribed data + int32_t totalRows = 0; + + while (1) { + SSDataBlock* pDataBlock = NULL; + uint64_t ts = 0; + if (qExecTask(pTaskInfo, &pDataBlock, &ts) < 0) { + tqDebug("vgId:%d, tq exec error since %s", vgId, terrstr()); + } + + if (pDataBlock == NULL) { + break; + } + + tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision); + pRsp->blockNum++; + totalRows += pDataBlock->info.rows; + } + + tqDebug("vgId:%d, tq handle push, subkey:%s, block num:%d, rows:%d", vgId, pPushEntry->subKey, pRsp->blockNum, + totalRows); + + if (pRsp->blockNum > 0) { + tqOffsetResetToLog(&pRsp->rspOffset, ver); + tqPushDataRsp(pTq, pPushEntry); + recordPushedEntry(pCachedKey, pIter); + } +} + +int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg)); int32_t len = msgLen - sizeof(SSubmitReq2Msg); int32_t vgId = TD_VID(pTq->pVnode); @@ -220,24 +297,19 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) tqDebug("vgId:%d tq push msg version:%" PRId64 " type:%s, head:%p, body:%p len:%d, numOfPushed consumers:%d", vgId, ver, TMSG_INFO(msgType), msg, pReq, len, numOfRegisteredPush); - SArray* cachedKeys = taosArrayInit(0, sizeof(void*)); - SArray* cachedKeyLens = taosArrayInit(0, sizeof(size_t)); - - void* data = taosMemoryMalloc(len); + void* data = taosMemoryMalloc(len); if (data == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("failed to copy data for stream since out of memory"); - taosArrayDestroyP(cachedKeys, (FDelete)taosMemoryFree); - taosArrayDestroy(cachedKeyLens); - - // unlock + tqError("failed to copy data for stream since out of memory, vgId:%d", vgId); taosWUnLockLatch(&pTq->lock); return -1; } memcpy(data, pReq, len); - void* pIter = NULL; + SArray* cachedKey = taosArrayInit(0, sizeof(SItem)); + void* pIter = NULL; + while (1) { pIter = taosHashIterate(pTq->pPushMgr, pIter); if (pIter == NULL) { @@ -248,83 +320,29 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) STqHandle* pHandle = taosHashGet(pTq->pHandle, pPushEntry->subKey, strlen(pPushEntry->subKey)); if (pHandle == NULL) { - tqDebug("vgId:%d, cannot find handle %s", pTq->pVnode->config.vgId, pPushEntry->subKey); - continue; - } - - SMqDataRsp* pRsp = pPushEntry->pDataRsp; - if (pRsp->reqOffset.version >= ver) { - tqDebug("vgId:%d, push entry req version %" PRId64 ", while push version %" PRId64 ", skip", vgId, - pRsp->reqOffset.version, ver); + tqDebug("vgId:%d, failed to find handle %s in pushing data to consumer, ignore", pTq->pVnode->config.vgId, pPushEntry->subKey); continue; } STqExecHandle* pExec = &pHandle->execHandle; - qTaskInfo_t task = pExec->task; - - // prepare scan mem data - SPackedData submit = { - .msgStr = data, - .msgLen = len, - .ver = ver, - }; - if(qStreamSetScanMemData(task, submit) != 0){ - continue; - } - - // here start to scan submit block to extract the subscribed data - while (1) { - SSDataBlock* pDataBlock = NULL; - uint64_t ts = 0; - if (qExecTask(task, &pDataBlock, &ts) < 0) { - tqDebug("vgId:%d, tq exec error since %s", vgId, terrstr()); - } - - if (pDataBlock == NULL) { - break; - } - - tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision); - pRsp->blockNum++; - } - - tqDebug("vgId:%d, tq handle push, subkey:%s, block num:%d", vgId, pPushEntry->subKey, pRsp->blockNum); - if (pRsp->blockNum > 0) { - // set offset - tqOffsetResetToLog(&pRsp->rspOffset, ver); - - // remove from hash - size_t kLen; - void* key = taosHashGetKey(pIter, &kLen); - void* keyCopy = taosMemoryCalloc(1, kLen + 1); - memcpy(keyCopy, key, kLen); - - taosArrayPush(cachedKeys, &keyCopy); - taosArrayPush(cachedKeyLens, &kLen); - - tqPushDataRsp(pTq, pPushEntry); - } + doPushDataForEntry(pIter, pExec, pTq, ver, vgId, data, len, cachedKey); } - // delete entry - for (int32_t i = 0; i < taosArrayGetSize(cachedKeys); i++) { - void* key = taosArrayGetP(cachedKeys, i); - size_t kLen = *(size_t*)taosArrayGet(cachedKeyLens, i); - if (taosHashRemove(pTq->pPushMgr, key, kLen) != 0) { - tqError("vgId:%d, tq push hash remove key error, key: %s", pTq->pVnode->config.vgId, (char*)key); - } - } - - taosArrayDestroyP(cachedKeys, (FDelete)taosMemoryFree); - taosArrayDestroy(cachedKeyLens); + doRemovePushedEntry(cachedKey, pTq); + taosArrayDestroyEx(cachedKey, freeItem); taosMemoryFree(data); } + // unlock taosWUnLockLatch(&pTq->lock); } + // push data for stream processing if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode)) { - if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) return 0; + if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) { + return 0; + } + if (msgType == TDMT_VND_SUBMIT) { void* data = taosMemoryMalloc(len); if (data == NULL) { @@ -332,12 +350,9 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) tqError("failed to copy data for stream since out of memory"); return -1; } + memcpy(data, pReq, len); - SPackedData submit = { - .msgStr = data, - .msgLen = len, - .ver = ver, - }; + SPackedData submit = {.msgStr = data, .msgLen = len, .ver = ver}; tqDebug("tq copy write msg %p %d %" PRId64 " from %p", data, len, ver, pReq); tqProcessSubmitReq(pTq, submit); @@ -351,6 +366,13 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) return 0; } +void recordPushedEntry(SArray* cachedKey, void* pIter) { + size_t kLen = 0; + void* key = taosHashGetKey(pIter, &kLen); + SItem item = {.pKey = strndup(key, kLen), .keyLen = kLen}; + taosArrayPush(cachedKey, &item); +} + int32_t tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp, int32_t type) { uint64_t consumerId = pRequest->consumerId; @@ -388,8 +410,8 @@ int32_t tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, return 0; } -int32_t tqRemovePushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer) { - int32_t vgId = TD_VID(pTq->pVnode); +int32_t tqUnregisterPushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer) { + int32_t vgId = TD_VID(pTq->pVnode); STqPushEntry** pEntry = taosHashGet(pTq->pPushMgr, pKey, keyLen); if (pEntry != NULL) { diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqScan.c similarity index 89% rename from source/dnode/vnode/src/tq/tqExec.c rename to source/dnode/vnode/src/tq/tqScan.c index f23b5f8526..1a166d326f 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -18,7 +18,9 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) { int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); void* buf = taosMemoryCalloc(1, dataStrLen); - if (buf == NULL) return -1; + if (buf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf; pRetrieve->useconds = 0; @@ -31,7 +33,8 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t actualLen += sizeof(SRetrieveTableRsp); taosArrayPush(pRsp->blockDataLen, &actualLen); taosArrayPush(pRsp->blockData, &buf); - return 0; + + return TSDB_CODE_SUCCESS; } static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, STaosxRsp* pRsp) { @@ -62,72 +65,85 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, in } int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) { - const STqExecHandle* pExec = &pHandle->execHandle; + const int32_t MAX_ROWS_TO_RETURN = 4096; + int32_t vgId = TD_VID(pTq->pVnode); + int32_t code = 0; + int32_t totalRows = 0; - qTaskInfo_t task = pExec->task; - int32_t vgId = TD_VID(pTq->pVnode); + const STqExecHandle* pExec = &pHandle->execHandle; + qTaskInfo_t task = pExec->task; if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) { tqDebug("prepare scan failed, return, consumer:0x%"PRIx64, pHandle->consumerId); if (pOffset->type == TMQ_OFFSET__LOG) { pRsp->rspOffset = *pOffset; - return 0; + return code; } else { tqOffsetResetToLog(pOffset, pHandle->snapshotVer); if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) { tqDebug("prepare scan failed, return, consumer:0x%"PRIx64, pHandle->consumerId); pRsp->rspOffset = *pOffset; - return 0; + return code; } } } - int32_t rowCnt = 0; while (1) { SSDataBlock* pDataBlock = NULL; uint64_t ts = 0; - tqDebug("vgId:%d, tmq task start to execute, consumer:0x%"PRIx64, vgId, pHandle->consumerId); - if (qExecTask(task, &pDataBlock, &ts) < 0) { + tqDebug("vgId:%d, tmq task start to execute, consumer:0x%" PRIx64, vgId, pHandle->consumerId); + + code = qExecTask(task, &pDataBlock, &ts); + if (code != TSDB_CODE_SUCCESS) { tqError("vgId:%d, task exec error since %s, consumer:0x%" PRIx64, vgId, terrstr(), pHandle->consumerId); - return -1; + return code; } - tqDebug("consumer:0x%"PRIx64" vgId:%d, tmq task executed, get %p", pHandle->consumerId, vgId, pDataBlock); - - // current scan should be stopped asap, since the rebalance occurs. + // current scan should be stopped ASAP, since the re-balance occurs. if (pDataBlock == NULL) { break; } - tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision); + code = tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision); + if (code != TSDB_CODE_SUCCESS) { + tqError("vgId:%d, failed to add block to rsp msg", vgId); + return code; + } + pRsp->blockNum++; + tqDebug("vgId:%d, consumer:0x%" PRIx64 " tmq task executed, rows:%d, total blocks:%d", vgId, pHandle->consumerId, + pDataBlock->info.rows, pRsp->blockNum); + if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { - rowCnt += pDataBlock->info.rows; - if (rowCnt >= 4096) { + totalRows += pDataBlock->info.rows; + if (totalRows >= MAX_ROWS_TO_RETURN) { break; } } } - if (qStreamExtractOffset(task, &pRsp->rspOffset) < 0) { - return -1; - } + qStreamExtractOffset(task, &pRsp->rspOffset); if (pRsp->rspOffset.type == 0) { + code = TSDB_CODE_INVALID_PARA; tqError("vgId:%d, expected rsp offset: type %d %" PRId64 " %" PRId64 " %" PRId64, vgId, pRsp->rspOffset.type, pRsp->rspOffset.ts, pRsp->rspOffset.uid, pRsp->rspOffset.version); - return -1; + return code; } if (pRsp->withTbName || pRsp->withSchema) { + code = TSDB_CODE_INVALID_PARA; tqError("vgId:%d, get column should not with meta:%d,%d", vgId, pRsp->withTbName, pRsp->withSchema); - return -1; + return code; } - return 0; + tqDebug("vgId:%d, consumer:0x%" PRIx64 " tmq task executed, total blocks:%d, rows:%d", vgId, pHandle->consumerId, + pRsp->blockNum, totalRows); + + return code; } int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* pOffset) {