enh(stream): add some logs and record the submit package performance.

This commit is contained in:
Haojun Liao 2023-09-06 14:34:36 +08:00
parent e607a0a5cb
commit 8783ebcb02
3 changed files with 76 additions and 43 deletions

View File

@ -312,10 +312,17 @@ typedef struct STaskSchedInfo {
void* pTimer;
} STaskSchedInfo;
typedef struct SSinkTaskRecorder {
int64_t numOfPackages;
int64_t numOfRows;
} SSinkTaskRecorder;
typedef struct {
int64_t created;
int64_t init;
int64_t step1Start;
int64_t step2Start;
int64_t sinkStart;
} STaskTimestamp;
struct SStreamTask {
@ -345,6 +352,7 @@ struct SStreamTask {
STaskSinkSma smaSink;
STaskSinkFetch fetchSink;
};
SSinkTaskRecorder sinkRecorder;
void* launchTaskTimer;
SMsgCb* pMsgCb; // msg handle

View File

@ -133,7 +133,6 @@ static int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) {
return TSDB_CODE_SUCCESS;
}
void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) {
const SArray* pBlocks = (const SArray*)data;
SVnode* pVnode = (SVnode*)vnode;
@ -144,6 +143,10 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) {
int32_t numOfBlocks = taosArrayGetSize(pBlocks);
int32_t code = TSDB_CODE_SUCCESS;
if (pTask->tsInfo.sinkStart == 0) {
pTask->tsInfo.sinkStart = taosGetTimestampMs();
}
tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table", vgId, pTask->id.idStr, numOfBlocks);
SArray* tagArray = NULL;
@ -259,10 +262,17 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) {
} else if (pDataBlock->info.type == STREAM_CHECKPOINT) {
continue;
} else {
pTask->sinkRecorder.numOfPackages += 1;
code = doSinkResultBlock(pVnode, i, stbFullName, suid, pDataBlock, pTask);
}
}
if ((pTask->sinkRecorder.numOfPackages % 5000) == 0) {
SSinkTaskRecorder* pRec = &pTask->sinkRecorder;
tqInfo("s-task:%s vgId:d write %d blocks (%" PRId64 " rows) into dst table, duration:%.2fSec.", pTask->id.idStr,
vgId, pRec->numOfPackages, pRec->numOfRows, (taosGetTimestampMs() - pTask->tsInfo.sinkStart) / 1000.0);
}
tqDebug("vgId:%d, s-task:%s write results completed", vgId, pTask->id.idStr);
_end:
@ -399,6 +409,55 @@ static int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSi
return code;
}
static int32_t tqBuildSubmitReq(SSubmitTbData* pTableData, int32_t vgId, void** pMsg, int32_t* msgLen) {
int32_t code = 0;
void* pBuf = NULL;
*msgLen = 0;
SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))};
if (submitReq.aSubmitTbData == NULL) {
tDestroySubmitTbData(pTableData, TSDB_MSG_FLG_ENCODE);
taosArrayDestroy(pTableData->aRowP);
return TSDB_CODE_OUT_OF_MEMORY;
}
taosArrayPush(submitReq.aSubmitTbData, pTableData);
// encode
int32_t len = 0;
tEncodeSize(tEncodeSubmitReq, &submitReq, len, code);
SEncoder encoder;
len += sizeof(SSubmitReq2Msg);
pBuf = rpcMallocCont(len);
if (NULL == pBuf) {
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
return TSDB_CODE_OUT_OF_MEMORY;
}
((SSubmitReq2Msg*)pBuf)->header.vgId = vgId;
((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len);
((SSubmitReq2Msg*)pBuf)->version = htobe64(1);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
if (tEncodeSubmitReq(&encoder, &submitReq) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to encode submit req, code:%s, ignore and continue", terrstr());
tEncoderClear(&encoder);
rpcFreeCont(pBuf);
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
return code;
}
tEncoderClear(&encoder);
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
*msgLen = len;
*pMsg = pBuf;
return TSDB_CODE_SUCCESS;
}
int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, int64_t suid, SSDataBlock* pDataBlock,
SStreamTask* pTask) {
int32_t numOfRows = pDataBlock->info.rows;
@ -406,7 +465,6 @@ int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName,
uint64_t groupId = pDataBlock->info.id.groupId;
STSchema* pTSchema = pTask->tbSink.pTSchema;
int32_t code = TSDB_CODE_SUCCESS;
void* pBuf = NULL;
SArray* pVals = NULL;
const char* id = pTask->id.idStr;
@ -573,7 +631,7 @@ int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName,
void* colData = colDataGetData(pColData, j);
if (IS_STR_DATA_TYPE(pCol->type)) {
// address copy, no value
SValue sv = (SValue){.nData = varDataLen(colData), .pData = varDataVal(colData)};
SValue sv = (SValue){.nData = varDataLen(colData), .pData = (uint8_t*) varDataVal(colData)};
SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
taosArrayPush(pVals, &cv);
} else {
@ -601,53 +659,20 @@ int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName,
taosArrayPush(tbData.aRowP, &pRow);
}
SSubmitReq2 submitReq = {0};
if (!(submitReq.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
taosArrayDestroy(tbData.aRowP);
taosArrayDestroy(pVals);
return TSDB_CODE_OUT_OF_MEMORY;
}
taosArrayPush(submitReq.aSubmitTbData, &tbData);
// encode
int32_t len = 0;
tEncodeSize(tEncodeSubmitReq, &submitReq, len, code);
SEncoder encoder;
len += sizeof(SSubmitReq2Msg);
pBuf = rpcMallocCont(len);
if (NULL == pBuf) {
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
taosArrayDestroy(tbData.aRowP);
void* pBuf = NULL;
code = tqBuildSubmitReq(&tbData, vgId, &pBuf, &len);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pVals);
}
((SSubmitReq2Msg*)pBuf)->header.vgId = vgId;
((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len);
((SSubmitReq2Msg*)pBuf)->version = htobe64(1);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
if (tEncodeSubmitReq(&encoder, &submitReq) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to encode submit req, code:%s, ignore and continue", terrstr());
tEncoderClear(&encoder);
rpcFreeCont(pBuf);
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
return code;
}
tEncoderClear(&encoder);
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
pTask->sinkRecorder.numOfRows += numOfRows;
SRpcMsg msg = { .msgType = TDMT_VND_SUBMIT, .pCont = pBuf, .contLen = len };
SRpcMsg msg = {.msgType = TDMT_VND_SUBMIT, .pCont = pBuf, .contLen = len};
code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg);
if(code == TSDB_CODE_SUCCESS) {
if (code == TSDB_CODE_SUCCESS) {
tqDebug("s-task:%s send submit msg to dstTable:%s, numOfRows:%d", id, pTableSinkInfo->name.data, numOfRows);
} else {
tqError("s-task:%s failed to put into write-queue since %s", id, terrstr());

View File

@ -375,7 +375,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
return -1;
}
pTask->tsInfo.init = taosGetTimestampMs();
pTask->tsInfo.created = taosGetTimestampMs();
pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL;
pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL;
pTask->pMeta = pMeta;