fix(stream): add some logs.
This commit is contained in:
parent
5ba6a68e7a
commit
8edf86b3bb
|
@ -313,7 +313,8 @@ typedef struct STaskSchedInfo {
|
|||
} STaskSchedInfo;
|
||||
|
||||
typedef struct SSinkTaskRecorder {
|
||||
int64_t numOfPackages;
|
||||
int64_t numOfSubmit;
|
||||
int64_t numOfBlocks;
|
||||
int64_t numOfRows;
|
||||
} SSinkTaskRecorder;
|
||||
|
||||
|
|
|
@ -272,7 +272,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) {
|
|||
continue;
|
||||
} else {
|
||||
hasSubmit = true;
|
||||
pTask->sinkRecorder.numOfPackages += 1;
|
||||
pTask->sinkRecorder.numOfBlocks += 1;
|
||||
|
||||
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version};
|
||||
code = doSinkResultBlock(pVnode, i, stbFullName, suid, pDataBlock, pTask, &tbData);
|
||||
|
@ -300,10 +300,13 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) {
|
|||
tqError("s-task:%s failed to put into write-queue since %s", id, terrstr());
|
||||
}
|
||||
|
||||
if ((pTask->sinkRecorder.numOfPackages % 5000) == 0) {
|
||||
pTask->sinkRecorder.numOfSubmit += 1;
|
||||
|
||||
if ((pTask->sinkRecorder.numOfSubmit % 5000) == 0) {
|
||||
SSinkTaskRecorder* pRec = &pTask->sinkRecorder;
|
||||
tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) into dst table, duration:%.2fSec.",
|
||||
pTask->id.idStr, vgId, pRec->numOfPackages, pRec->numOfRows,
|
||||
tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64
|
||||
" submit into dst table, duration:%.2f Sec.",
|
||||
pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit,
|
||||
(taosGetTimestampMs() - pTask->tsInfo.sinkStart) / 1000.0);
|
||||
}
|
||||
} else {
|
||||
|
|
Loading…
Reference in New Issue