enh(stream): add more info for sink task.

This commit is contained in:
Haojun Liao 2023-10-07 01:14:00 +08:00
parent 98659ad323
commit c20dd002fb
7 changed files with 44 additions and 34 deletions

View File

@ -602,8 +602,8 @@ typedef struct STaskStatusEntry {
int64_t offset; // only valid for source task int64_t offset; // only valid for source task
double inputQUsed; // in MiB double inputQUsed; // in MiB
double inputRate; double inputRate;
double outputQUsed; // in MiB double sinkQuota; // existed quota size for sink task
double outputRate; double sinkDataSize; // sink to dest data size
} STaskStatusEntry; } STaskStatusEntry;
typedef struct SStreamHbMsg { typedef struct SStreamHbMsg {

View File

@ -1603,9 +1603,15 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); // pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false); // colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false);
// offset version info if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
const char* offsetStr = "%"PRId64"[%"PRId64",%"PRId64"]"; const char* sinkStr = "Quota:%2.fMiB, SinkData:%.2fMiB";
sprintf(buf, offsetStr, pe->offset, pe->verStart, pe->verEnd); sprintf(buf, sinkStr, pe->sinkQuota, pe->sinkDataSize);
} else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
// offset version info
const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]";
sprintf(buf, offsetStr, pe->offset, pe->verStart, pe->verEnd);
}
STR_TO_VARSTR(vbuf, buf); STR_TO_VARSTR(vbuf, buf);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
@ -2459,11 +2465,13 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
pEntry->stage = p->stage; pEntry->stage = p->stage;
pEntry->inputQUsed = p->inputQUsed; pEntry->inputQUsed = p->inputQUsed;
pEntry->inputRate = p->inputRate; pEntry->inputRate = p->inputRate;
pEntry->outputQUsed = p->outputQUsed; // pEntry->outputQUsed = p->outputQUsed;
pEntry->outputRate = p->outputRate; // pEntry->outputRate = p->outputRate;
pEntry->offset = p->offset; pEntry->offset = p->offset;
pEntry->verStart = p->verStart; pEntry->verStart = p->verStart;
pEntry->verEnd = p->verEnd; pEntry->verEnd = p->verEnd;
pEntry->sinkQuota = p->sinkQuota;
pEntry->sinkDataSize = p->sinkDataSize;
} }
pEntry->status = p->status; pEntry->status = p->status;

View File

@ -24,6 +24,7 @@ typedef struct STableSinkInfo {
tstr name; tstr name;
} STableSinkInfo; } STableSinkInfo;
static bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks);
static int32_t tsAscendingSortFn(const void* p1, const void* p2); static int32_t tsAscendingSortFn(const void* p1, const void* p2);
static int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName, static int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName,
SSubmitTbData* pTableData); SSubmitTbData* pTableData);
@ -744,6 +745,17 @@ int32_t setDstTableDataPayload(SStreamTask* pTask, int32_t blockIndex, SSDataBlo
return code; return code;
} }
bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks) {
for(int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* p = taosArrayGet(pBlocks, i);
if (p->info.type == STREAM_DELETE_RESULT || p->info.type == STREAM_CREATE_CHILD_TABLE) {
return false;
}
}
return true;
}
void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
const SArray* pBlocks = (const SArray*)data; const SArray* pBlocks = (const SArray*)data;
SVnode* pVnode = (SVnode*)vnode; SVnode* pVnode = (SVnode*)vnode;
@ -755,19 +767,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
if (pTask->execInfo.start == 0) { bool onlySubmitData = hasOnlySubmitData(pBlocks, numOfBlocks);
pTask->execInfo.start = taosGetTimestampMs();
}
bool onlySubmitData = true;
for(int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* p = taosArrayGet(pBlocks, i);
if (p->info.type == STREAM_DELETE_RESULT || p->info.type == STREAM_CREATE_CHILD_TABLE) {
onlySubmitData = false;
break;
}
}
if (!onlySubmitData) { if (!onlySubmitData) {
tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, has delete block, submit one-by-one", vgId, id, tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, has delete block, submit one-by-one", vgId, id,
numOfBlocks); numOfBlocks);

View File

@ -372,10 +372,7 @@ static bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32
SStreamQueueItem* pItem = NULL; SStreamQueueItem* pItem = NULL;
int32_t code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, id); int32_t code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, id);
if ((code != TSDB_CODE_SUCCESS || pItem == NULL)/* && (numOfItems + numOfNewItems == 0)*/) { // failed, continue if (code != TSDB_CODE_SUCCESS || pItem == NULL) { // failed, continue
// handleFillhistoryScanComplete(pTask, walReaderGetCurrentVer(pTask->exec.pWalReader));
// streamMetaReleaseTask(pMeta, pTask);
// taosThreadMutexUnlock(&pTask->lock);
break; break;
} }
@ -459,7 +456,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
bool hasNewData = doPutDataIntoInputQFromWal(pTask, maxVer, &numOfItems); bool hasNewData = doPutDataIntoInputQFromWal(pTask, maxVer, &numOfItems);
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
if (/*(code == TSDB_CODE_SUCCESS) || */(numOfItems > 0) || hasNewData) { if ((numOfItems > 0) || hasNewData) {
noDataInWal = false; noDataInWal = false;
code = streamSchedExec(pTask); code = streamSchedExec(pTask);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {

View File

@ -40,8 +40,8 @@ extern "C" {
#define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec #define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec
#define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1) #define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1)
#define STREAM_TASK_QUEUE_CAPACITY 20480 #define STREAM_TASK_QUEUE_CAPACITY 20480
#define STREAM_TASK_QUEUE_CAPACITY_IN_SIZE (30) #define STREAM_TASK_QUEUE_CAPACITY_IN_SIZE (30)
// clang-format off // clang-format off
#define stFatal(...) do { if (stDebugFlag & DEBUG_FATAL) { taosPrintLog("STM FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) #define stFatal(...) do { if (stDebugFlag & DEBUG_FATAL) { taosPrintLog("STM FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)

View File

@ -776,8 +776,8 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
if (tEncodeI32(pEncoder, ps->nodeId) < 0) return -1; if (tEncodeI32(pEncoder, ps->nodeId) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->inputQUsed) < 0) return -1; if (tEncodeDouble(pEncoder, ps->inputQUsed) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->inputRate) < 0) return -1; if (tEncodeDouble(pEncoder, ps->inputRate) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->outputQUsed) < 0) return -1; if (tEncodeDouble(pEncoder, ps->sinkQuota) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->outputRate) < 0) return -1; if (tEncodeDouble(pEncoder, ps->sinkDataSize) < 0) return -1;
if (tEncodeI64(pEncoder, ps->offset) < 0) return -1; if (tEncodeI64(pEncoder, ps->offset) < 0) return -1;
if (tEncodeI64(pEncoder, ps->verStart) < 0) return -1; if (tEncodeI64(pEncoder, ps->verStart) < 0) return -1;
if (tEncodeI64(pEncoder, ps->verEnd) < 0) return -1; if (tEncodeI64(pEncoder, ps->verEnd) < 0) return -1;
@ -803,8 +803,8 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
if (tDecodeI32(pDecoder, &entry.nodeId) < 0) return -1; if (tDecodeI32(pDecoder, &entry.nodeId) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.inputQUsed) < 0) return -1; if (tDecodeDouble(pDecoder, &entry.inputQUsed) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.inputRate) < 0) return -1; if (tDecodeDouble(pDecoder, &entry.inputRate) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.outputQUsed) < 0) return -1; if (tDecodeDouble(pDecoder, &entry.sinkQuota) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.outputRate) < 0) return -1; if (tDecodeDouble(pDecoder, &entry.sinkDataSize) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.offset) < 0) return -1; if (tDecodeI64(pDecoder, &entry.offset) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.verStart) < 0) return -1; if (tDecodeI64(pDecoder, &entry.verStart) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.verEnd) < 0) return -1; if (tDecodeI64(pDecoder, &entry.verEnd) < 0) return -1;
@ -887,11 +887,16 @@ void metaHbToMnode(void* param, void* tmrId) {
.nodeId = pMeta->vgId, .nodeId = pMeta->vgId,
.stage = pMeta->stage, .stage = pMeta->stage,
.inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputInfo.queue)), .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputInfo.queue)),
.outputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->outputInfo.queue)), // .outputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->outputInfo.queue)),
}; };
entry.inputRate = entry.inputQUsed*100.0/STREAM_TASK_QUEUE_CAPACITY_IN_SIZE; entry.inputRate = entry.inputQUsed*100.0/STREAM_TASK_QUEUE_CAPACITY_IN_SIZE;
entry.outputRate = entry.outputQUsed*100.0/STREAM_TASK_QUEUE_CAPACITY_IN_SIZE; if ((*pTask)->info.taskLevel == TASK_LEVEL__SINK) {
entry.sinkQuota = (*pTask)->pTokenBucket->bytesRate;
entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize);
}
// entry.outputRate = entry.outputQUsed*100.0/STREAM_TASK_QUEUE_CAPACITY_IN_SIZE;
if ((*pTask)->exec.pWalReader != NULL) { if ((*pTask)->exec.pWalReader != NULL) {
entry.offset = walReaderGetCurrentVer((*pTask)->exec.pWalReader); entry.offset = walReaderGetCurrentVer((*pTask)->exec.pWalReader);

View File

@ -16,7 +16,7 @@
#include "streamInt.h" #include "streamInt.h"
#define MAX_STREAM_EXEC_BATCH_NUM 32 #define MAX_STREAM_EXEC_BATCH_NUM 32
#define MAX_SMOOTH_BURST_RATIO 5 // 20 sec #define MAX_SMOOTH_BURST_RATIO 5 // 5 sec
#define WAIT_FOR_DURATION 40 #define WAIT_FOR_DURATION 40
// todo refactor: // todo refactor: