From 68f310dd486421e6c577307c383054e8daf92bcc Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 28 Mar 2023 15:05:10 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/libs/stream/streamState.h | 23 +++++---- include/libs/stream/tstream.h | 5 +- source/dnode/vnode/CMakeLists.txt | 2 +- source/dnode/vnode/src/tq/tq.c | 4 ++ source/dnode/vnode/src/tq/tqPush.c | 20 +++----- .../dnode/vnode/src/tq/{tqExec.c => tqScan.c} | 47 +++++++++++-------- 6 files changed, 53 insertions(+), 48 deletions(-) rename source/dnode/vnode/src/tq/{tqExec.c => tqScan.c} (93%) diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 6b09bf4899..fd5cec2931 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -23,20 +23,19 @@ extern "C" { #ifndef _STREAM_STATE_H_ #define _STREAM_STATE_H_ -typedef struct SStreamTask SStreamTask; - typedef bool (*state_key_cmpr_fn)(void* pKey1, void* pKey2); typedef struct STdbState { - SStreamTask* pOwner; - TDB* db; - TTB* pStateDb; - TTB* pFuncStateDb; - TTB* pFillStateDb; // todo refactor - TTB* pSessionStateDb; - TTB* pParNameDb; - TTB* pParTagDb; - TXN* txn; + struct SStreamTask* pOwner; + + TDB* db; + TTB* pStateDb; + TTB* pFuncStateDb; + TTB* pFillStateDb; // todo refactor + TTB* pSessionStateDb; + TTB* pParNameDb; + TTB* pParTagDb; + TXN* txn; } STdbState; // incremental state storage @@ -45,7 +44,7 @@ typedef struct { int32_t number; } SStreamState; -SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages); +SStreamState* streamStateOpen(char* path, struct SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages); void streamStateClose(SStreamState* pState); int32_t streamStateBegin(SStreamState* pState); int32_t streamStateCommit(SStreamState* pState); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 1d301623b1..d04e965c6f 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -295,7 +295,7 @@ typedef struct { SEpSet epSet; } SStreamChildEpInfo; -typedef struct SStreamTask { +struct SStreamTask { int64_t streamId; int32_t taskId; int32_t totalLevel; @@ -362,8 +362,7 @@ typedef struct SStreamTask { int64_t checkpointingId; int32_t checkpointAlignCnt; - -} SStreamTask; +}; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo); diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 8dc3f46ae3..9911752f8e 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -57,7 +57,7 @@ target_sources( # tq "src/tq/tq.c" - "src/tq/tqExec.c" + "src/tq/tqScan.c" "src/tq/tqMeta.c" "src/tq/tqRead.c" "src/tq/tqOffset.c" diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 2d2c70c84a..e1149b48af 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -473,6 +473,10 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); code = tqScanData(pTq, pHandle, &dataRsp, pOffset); + if (code != TSDB_CODE_SUCCESS) { + taosWUnLockLatch(&pTq->lock); + return code; + } // till now, all data has been transferred to consumer, new data needs to push client once arrived. if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index e07ab06458..73d722dba5 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -220,7 +220,7 @@ static void freeItem(void* param) { static void doRemovePushedEntry(SArray* pCachedKeys, STQ* pTq) { int32_t vgId = TD_VID(pTq->pVnode); - size_t numOfKeys = taosArrayGetSize(pCachedKeys); + int32_t numOfKeys = (int32_t) taosArrayGetSize(pCachedKeys); for (int32_t i = 0; i < numOfKeys; i++) { SItem* pItem = taosArrayGet(pCachedKeys, i); @@ -248,13 +248,9 @@ static void doPushDataForEntry(void* pIter, STqExecHandle* pExec, STQ* pTq, int6 qTaskInfo_t pTaskInfo = pExec->task; // prepare scan mem data - SPackedData submit = { - .msgStr = pData, - .msgLen = dataLen, - .ver = ver, - }; + SPackedData submit = {.msgStr = pData, .msgLen = dataLen, .ver = ver}; - if(qStreamSetScanMemData(pTaskInfo, submit) != 0){ + if (qStreamSetScanMemData(pTaskInfo, submit) != 0) { return; } @@ -287,7 +283,7 @@ static void doPushDataForEntry(void* pIter, STqExecHandle* pExec, STQ* pTq, int6 } } -int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { +int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg)); int32_t len = msgLen - sizeof(SSubmitReq2Msg); int32_t vgId = TD_VID(pTq->pVnode); @@ -341,6 +337,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) taosWUnLockLatch(&pTq->lock); } + // push data for stream processing if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode)) { if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) { return 0; @@ -353,12 +350,9 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) tqError("failed to copy data for stream since out of memory"); return -1; } + memcpy(data, pReq, len); - SPackedData submit = { - .msgStr = data, - .msgLen = len, - .ver = ver, - }; + SPackedData submit = {.msgStr = data, .msgLen = len, .ver = ver}; tqDebug("tq copy write msg %p %d %" PRId64 " from %p", data, len, ver, pReq); tqProcessSubmitReq(pTq, submit); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqScan.c similarity index 93% rename from source/dnode/vnode/src/tq/tqExec.c rename to source/dnode/vnode/src/tq/tqScan.c index 6845350346..36d0631195 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -18,7 +18,9 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) { int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); void* buf = taosMemoryCalloc(1, dataStrLen); - if (buf == NULL) return -1; + if (buf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf; pRetrieve->useconds = 0; @@ -31,7 +33,8 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t actualLen += sizeof(SRetrieveTableRsp); taosArrayPush(pRsp->blockDataLen, &actualLen); taosArrayPush(pRsp->blockData, &buf); - return 0; + + return TSDB_CODE_SUCCESS; } static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, STaosxRsp* pRsp) { @@ -63,38 +66,39 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, in int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) { const int32_t MAX_ROWS_TO_RETURN = 4096; + int32_t vgId = TD_VID(pTq->pVnode); + int32_t code = 0; + int32_t totalRows = 0; const STqExecHandle* pExec = &pHandle->execHandle; - - qTaskInfo_t task = pExec->task; - int32_t vgId = TD_VID(pTq->pVnode); + qTaskInfo_t task = pExec->task; if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) { tqDebug("prepare scan failed, return, consumer:0x%"PRIx64, pHandle->consumerId); if (pOffset->type == TMQ_OFFSET__LOG) { pRsp->rspOffset = *pOffset; - return 0; + return code; } else { tqOffsetResetToLog(pOffset, pHandle->snapshotVer); if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) { tqDebug("prepare scan failed, return, consumer:0x%"PRIx64, pHandle->consumerId); pRsp->rspOffset = *pOffset; - return 0; + return code; } } } - int32_t totalRows = 0; - while (1) { SSDataBlock* pDataBlock = NULL; uint64_t ts = 0; - tqDebug("vgId:%d, tmq task start to execute, consumer:0x%"PRIx64, vgId, pHandle->consumerId); - if (qExecTask(task, &pDataBlock, &ts) < 0) { + tqDebug("vgId:%d, tmq task start to execute, consumer:0x%" PRIx64, vgId, pHandle->consumerId); + + code = qExecTask(task, &pDataBlock, &ts); + if (code != TSDB_CODE_SUCCESS) { tqError("vgId:%d, task exec error since %s, consumer:0x%" PRIx64, vgId, terrstr(), pHandle->consumerId); - return -1; + return code; } // current scan should be stopped ASAP, since the re-balance occurs. @@ -102,7 +106,12 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs break; } - tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision); + code = tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision); + if (code != TSDB_CODE_SUCCESS) { + tqError("vgId:%d, failed to add block to rsp msg", vgId); + return code; + } + pRsp->blockNum++; tqDebug("vgId:%d, consumer:0x%" PRIx64 " tmq task executed, rows:%d, total blocks:%d", vgId, pHandle->consumerId, @@ -116,25 +125,25 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs } } - if (qStreamExtractOffset(task, &pRsp->rspOffset) < 0) { - return -1; - } + qStreamExtractOffset(task, &pRsp->rspOffset); if (pRsp->rspOffset.type == 0) { + code = TSDB_CODE_INVALID_PARA; tqError("vgId:%d, expected rsp offset: type %d %" PRId64 " %" PRId64 " %" PRId64, vgId, pRsp->rspOffset.type, pRsp->rspOffset.ts, pRsp->rspOffset.uid, pRsp->rspOffset.version); - return -1; + return code; } if (pRsp->withTbName || pRsp->withSchema) { + code = TSDB_CODE_INVALID_PARA; tqError("vgId:%d, get column should not with meta:%d,%d", vgId, pRsp->withTbName, pRsp->withSchema); - return -1; + return code; } tqDebug("vgId:%d, consumer:0x%" PRIx64 " tmq task executed, rows:%d, total blocks:%d, rows:%d", vgId, pHandle->consumerId, pRsp->blockNum, totalRows); - return 0; + return code; } int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* pOffset) {