From 8783ebcb02b2840371d9ead18ff149f565b0fe40 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 6 Sep 2023 14:34:36 +0800 Subject: [PATCH] enh(stream): add some logs and record the submit package performance. --- include/libs/stream/tstream.h | 8 ++ source/dnode/vnode/src/tq/tqSink.c | 109 +++++++++++++++++----------- source/libs/stream/src/streamTask.c | 2 +- 3 files changed, 76 insertions(+), 43 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index ea3524d12d..86e79835ac 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -312,10 +312,17 @@ typedef struct STaskSchedInfo { void* pTimer; } STaskSchedInfo; +typedef struct SSinkTaskRecorder { + int64_t numOfPackages; + int64_t numOfRows; +} SSinkTaskRecorder; + typedef struct { + int64_t created; int64_t init; int64_t step1Start; int64_t step2Start; + int64_t sinkStart; } STaskTimestamp; struct SStreamTask { @@ -345,6 +352,7 @@ struct SStreamTask { STaskSinkSma smaSink; STaskSinkFetch fetchSink; }; + SSinkTaskRecorder sinkRecorder; void* launchTaskTimer; SMsgCb* pMsgCb; // msg handle diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index a738befec9..5596c958e9 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -133,7 +133,6 @@ static int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) { return TSDB_CODE_SUCCESS; } - void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) { const SArray* pBlocks = (const SArray*)data; SVnode* pVnode = (SVnode*)vnode; @@ -144,6 +143,10 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) { int32_t numOfBlocks = taosArrayGetSize(pBlocks); int32_t code = TSDB_CODE_SUCCESS; + if (pTask->tsInfo.sinkStart == 0) { + pTask->tsInfo.sinkStart = taosGetTimestampMs(); + } + tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table", vgId, pTask->id.idStr, numOfBlocks); SArray* tagArray = NULL; @@ -259,10 +262,17 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) { } else if (pDataBlock->info.type == STREAM_CHECKPOINT) { continue; } else { + pTask->sinkRecorder.numOfPackages += 1; code = doSinkResultBlock(pVnode, i, stbFullName, suid, pDataBlock, pTask); } } + if ((pTask->sinkRecorder.numOfPackages % 5000) == 0) { + SSinkTaskRecorder* pRec = &pTask->sinkRecorder; + tqInfo("s-task:%s vgId:d write %d blocks (%" PRId64 " rows) into dst table, duration:%.2fSec.", pTask->id.idStr, + vgId, pRec->numOfPackages, pRec->numOfRows, (taosGetTimestampMs() - pTask->tsInfo.sinkStart) / 1000.0); + } + tqDebug("vgId:%d, s-task:%s write results completed", vgId, pTask->id.idStr); _end: @@ -399,6 +409,55 @@ static int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSi return code; } +static int32_t tqBuildSubmitReq(SSubmitTbData* pTableData, int32_t vgId, void** pMsg, int32_t* msgLen) { + int32_t code = 0; + void* pBuf = NULL; + *msgLen = 0; + + SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))}; + if (submitReq.aSubmitTbData == NULL) { + tDestroySubmitTbData(pTableData, TSDB_MSG_FLG_ENCODE); + taosArrayDestroy(pTableData->aRowP); + return TSDB_CODE_OUT_OF_MEMORY; + } + + taosArrayPush(submitReq.aSubmitTbData, pTableData); + + // encode + int32_t len = 0; + tEncodeSize(tEncodeSubmitReq, &submitReq, len, code); + + SEncoder encoder; + len += sizeof(SSubmitReq2Msg); + + pBuf = rpcMallocCont(len); + if (NULL == pBuf) { + tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); + return TSDB_CODE_OUT_OF_MEMORY; + } + + ((SSubmitReq2Msg*)pBuf)->header.vgId = vgId; + ((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len); + ((SSubmitReq2Msg*)pBuf)->version = htobe64(1); + + tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg)); + if (tEncodeSubmitReq(&encoder, &submitReq) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tqError("failed to encode submit req, code:%s, ignore and continue", terrstr()); + tEncoderClear(&encoder); + rpcFreeCont(pBuf); + tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); + return code; + } + + tEncoderClear(&encoder); + tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); + + *msgLen = len; + *pMsg = pBuf; + return TSDB_CODE_SUCCESS; +} + int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, int64_t suid, SSDataBlock* pDataBlock, SStreamTask* pTask) { int32_t numOfRows = pDataBlock->info.rows; @@ -406,7 +465,6 @@ int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, uint64_t groupId = pDataBlock->info.id.groupId; STSchema* pTSchema = pTask->tbSink.pTSchema; int32_t code = TSDB_CODE_SUCCESS; - void* pBuf = NULL; SArray* pVals = NULL; const char* id = pTask->id.idStr; @@ -573,7 +631,7 @@ int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, void* colData = colDataGetData(pColData, j); if (IS_STR_DATA_TYPE(pCol->type)) { // address copy, no value - SValue sv = (SValue){.nData = varDataLen(colData), .pData = varDataVal(colData)}; + SValue sv = (SValue){.nData = varDataLen(colData), .pData = (uint8_t*) varDataVal(colData)}; SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv); taosArrayPush(pVals, &cv); } else { @@ -601,53 +659,20 @@ int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, taosArrayPush(tbData.aRowP, &pRow); } - SSubmitReq2 submitReq = {0}; - if (!(submitReq.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) { - tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE); - - taosArrayDestroy(tbData.aRowP); - taosArrayDestroy(pVals); - return TSDB_CODE_OUT_OF_MEMORY; - } - - taosArrayPush(submitReq.aSubmitTbData, &tbData); - - // encode int32_t len = 0; - tEncodeSize(tEncodeSubmitReq, &submitReq, len, code); - - SEncoder encoder; - len += sizeof(SSubmitReq2Msg); - - pBuf = rpcMallocCont(len); - if (NULL == pBuf) { - tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); - taosArrayDestroy(tbData.aRowP); + void* pBuf = NULL; + code = tqBuildSubmitReq(&tbData, vgId, &pBuf, &len); + if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(pVals); - } - - ((SSubmitReq2Msg*)pBuf)->header.vgId = vgId; - ((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len); - ((SSubmitReq2Msg*)pBuf)->version = htobe64(1); - - tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg)); - if (tEncodeSubmitReq(&encoder, &submitReq) < 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("failed to encode submit req, code:%s, ignore and continue", terrstr()); - tEncoderClear(&encoder); - rpcFreeCont(pBuf); - tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); - return code; } - tEncoderClear(&encoder); - tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); + pTask->sinkRecorder.numOfRows += numOfRows; - SRpcMsg msg = { .msgType = TDMT_VND_SUBMIT, .pCont = pBuf, .contLen = len }; + SRpcMsg msg = {.msgType = TDMT_VND_SUBMIT, .pCont = pBuf, .contLen = len}; code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg); - if(code == TSDB_CODE_SUCCESS) { + if (code == TSDB_CODE_SUCCESS) { tqDebug("s-task:%s send submit msg to dstTable:%s, numOfRows:%d", id, pTableSinkInfo->name.data, numOfRows); } else { tqError("s-task:%s failed to put into write-queue since %s", id, terrstr()); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 711dbf65e7..7ce5040257 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -375,7 +375,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i return -1; } - pTask->tsInfo.init = taosGetTimestampMs(); + pTask->tsInfo.created = taosGetTimestampMs(); pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL; pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL; pTask->pMeta = pMeta;