From c20dd002fb7adfdb8ec2b7f434becd3bc049f4f6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 7 Oct 2023 01:14:00 +0800 Subject: [PATCH] enh(stream): add more info for sink task. --- include/libs/stream/tstream.h | 4 ++-- source/dnode/mnode/impl/src/mndStream.c | 18 +++++++++++----- source/dnode/vnode/src/tq/tqSink.c | 26 ++++++++++++------------ source/dnode/vnode/src/tq/tqStreamTask.c | 7 ++----- source/libs/stream/inc/streamInt.h | 4 ++-- source/libs/stream/src/streamMeta.c | 17 ++++++++++------ source/libs/stream/src/streamQueue.c | 2 +- 7 files changed, 44 insertions(+), 34 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index bdc2c0a5ec..a67199a7d6 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -602,8 +602,8 @@ typedef struct STaskStatusEntry { int64_t offset; // only valid for source task double inputQUsed; // in MiB double inputRate; - double outputQUsed; // in MiB - double outputRate; + double sinkQuota; // existed quota size for sink task + double sinkDataSize; // sink to dest data size } STaskStatusEntry; typedef struct SStreamHbMsg { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index de4f6e85a3..7738bb5c41 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1603,9 +1603,15 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock // pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); // colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false); - // offset version info - const char* offsetStr = "%"PRId64"[%"PRId64",%"PRId64"]"; - sprintf(buf, offsetStr, pe->offset, pe->verStart, pe->verEnd); + if (pTask->info.taskLevel == TASK_LEVEL__SINK) { + const char* sinkStr = "Quota:%2.fMiB, SinkData:%.2fMiB"; + sprintf(buf, sinkStr, pe->sinkQuota, pe->sinkDataSize); + } else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + // offset version info + const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]"; + sprintf(buf, offsetStr, pe->offset, pe->verStart, pe->verEnd); + } + STR_TO_VARSTR(vbuf, buf); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); @@ -2459,11 +2465,13 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { pEntry->stage = p->stage; pEntry->inputQUsed = p->inputQUsed; pEntry->inputRate = p->inputRate; - pEntry->outputQUsed = p->outputQUsed; - pEntry->outputRate = p->outputRate; +// pEntry->outputQUsed = p->outputQUsed; +// pEntry->outputRate = p->outputRate; pEntry->offset = p->offset; pEntry->verStart = p->verStart; pEntry->verEnd = p->verEnd; + pEntry->sinkQuota = p->sinkQuota; + pEntry->sinkDataSize = p->sinkDataSize; } pEntry->status = p->status; diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 8009eccb1b..23b5aff7fa 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -24,6 +24,7 @@ typedef struct STableSinkInfo { tstr name; } STableSinkInfo; +static bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks); static int32_t tsAscendingSortFn(const void* p1, const void* p2); static int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName, SSubmitTbData* pTableData); @@ -744,6 +745,17 @@ int32_t setDstTableDataPayload(SStreamTask* pTask, int32_t blockIndex, SSDataBlo return code; } +bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks) { + for(int32_t i = 0; i < numOfBlocks; ++i) { + SSDataBlock* p = taosArrayGet(pBlocks, i); + if (p->info.type == STREAM_DELETE_RESULT || p->info.type == STREAM_CREATE_CHILD_TABLE) { + return false; + } + } + + return true; +} + void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { const SArray* pBlocks = (const SArray*)data; SVnode* pVnode = (SVnode*)vnode; @@ -755,19 +767,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { int32_t code = TSDB_CODE_SUCCESS; const char* id = pTask->id.idStr; - if (pTask->execInfo.start == 0) { - pTask->execInfo.start = taosGetTimestampMs(); - } - - bool onlySubmitData = true; - for(int32_t i = 0; i < numOfBlocks; ++i) { - SSDataBlock* p = taosArrayGet(pBlocks, i); - if (p->info.type == STREAM_DELETE_RESULT || p->info.type == STREAM_CREATE_CHILD_TABLE) { - onlySubmitData = false; - break; - } - } - + bool onlySubmitData = hasOnlySubmitData(pBlocks, numOfBlocks); if (!onlySubmitData) { tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, has delete block, submit one-by-one", vgId, id, numOfBlocks); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 3685435e34..8bee7d80a2 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -372,10 +372,7 @@ static bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32 SStreamQueueItem* pItem = NULL; int32_t code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, id); - if ((code != TSDB_CODE_SUCCESS || pItem == NULL)/* && (numOfItems + numOfNewItems == 0)*/) { // failed, continue -// handleFillhistoryScanComplete(pTask, walReaderGetCurrentVer(pTask->exec.pWalReader)); -// streamMetaReleaseTask(pMeta, pTask); -// taosThreadMutexUnlock(&pTask->lock); + if (code != TSDB_CODE_SUCCESS || pItem == NULL) { // failed, continue break; } @@ -459,7 +456,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { bool hasNewData = doPutDataIntoInputQFromWal(pTask, maxVer, &numOfItems); taosThreadMutexUnlock(&pTask->lock); - if (/*(code == TSDB_CODE_SUCCESS) || */(numOfItems > 0) || hasNewData) { + if ((numOfItems > 0) || hasNewData) { noDataInWal = false; code = streamSchedExec(pTask); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 2912c2954d..3ee88aaa1b 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -40,8 +40,8 @@ extern "C" { #define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec #define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1) -#define STREAM_TASK_QUEUE_CAPACITY 20480 -#define STREAM_TASK_QUEUE_CAPACITY_IN_SIZE (30) +#define STREAM_TASK_QUEUE_CAPACITY 20480 +#define STREAM_TASK_QUEUE_CAPACITY_IN_SIZE (30) // clang-format off #define stFatal(...) do { if (stDebugFlag & DEBUG_FATAL) { taosPrintLog("STM FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 19cdf9d6bd..35007bda1d 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -776,8 +776,8 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { if (tEncodeI32(pEncoder, ps->nodeId) < 0) return -1; if (tEncodeDouble(pEncoder, ps->inputQUsed) < 0) return -1; if (tEncodeDouble(pEncoder, ps->inputRate) < 0) return -1; - if (tEncodeDouble(pEncoder, ps->outputQUsed) < 0) return -1; - if (tEncodeDouble(pEncoder, ps->outputRate) < 0) return -1; + if (tEncodeDouble(pEncoder, ps->sinkQuota) < 0) return -1; + if (tEncodeDouble(pEncoder, ps->sinkDataSize) < 0) return -1; if (tEncodeI64(pEncoder, ps->offset) < 0) return -1; if (tEncodeI64(pEncoder, ps->verStart) < 0) return -1; if (tEncodeI64(pEncoder, ps->verEnd) < 0) return -1; @@ -803,8 +803,8 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { if (tDecodeI32(pDecoder, &entry.nodeId) < 0) return -1; if (tDecodeDouble(pDecoder, &entry.inputQUsed) < 0) return -1; if (tDecodeDouble(pDecoder, &entry.inputRate) < 0) return -1; - if (tDecodeDouble(pDecoder, &entry.outputQUsed) < 0) return -1; - if (tDecodeDouble(pDecoder, &entry.outputRate) < 0) return -1; + if (tDecodeDouble(pDecoder, &entry.sinkQuota) < 0) return -1; + if (tDecodeDouble(pDecoder, &entry.sinkDataSize) < 0) return -1; if (tDecodeI64(pDecoder, &entry.offset) < 0) return -1; if (tDecodeI64(pDecoder, &entry.verStart) < 0) return -1; if (tDecodeI64(pDecoder, &entry.verEnd) < 0) return -1; @@ -887,11 +887,16 @@ void metaHbToMnode(void* param, void* tmrId) { .nodeId = pMeta->vgId, .stage = pMeta->stage, .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputInfo.queue)), - .outputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->outputInfo.queue)), +// .outputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->outputInfo.queue)), }; entry.inputRate = entry.inputQUsed*100.0/STREAM_TASK_QUEUE_CAPACITY_IN_SIZE; - entry.outputRate = entry.outputQUsed*100.0/STREAM_TASK_QUEUE_CAPACITY_IN_SIZE; + if ((*pTask)->info.taskLevel == TASK_LEVEL__SINK) { + entry.sinkQuota = (*pTask)->pTokenBucket->bytesRate; + entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize); + } + +// entry.outputRate = entry.outputQUsed*100.0/STREAM_TASK_QUEUE_CAPACITY_IN_SIZE; if ((*pTask)->exec.pWalReader != NULL) { entry.offset = walReaderGetCurrentVer((*pTask)->exec.pWalReader); diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 7f12d471bc..2975d1f0f3 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -16,7 +16,7 @@ #include "streamInt.h" #define MAX_STREAM_EXEC_BATCH_NUM 32 -#define MAX_SMOOTH_BURST_RATIO 5 // 20 sec +#define MAX_SMOOTH_BURST_RATIO 5 // 5 sec #define WAIT_FOR_DURATION 40 // todo refactor: