refactor: do some internal refactor.
This commit is contained in:
parent
0db357401d
commit
a248f3d088
|
@ -326,18 +326,21 @@ typedef struct SSinkRecorder {
|
||||||
int64_t numOfSubmit;
|
int64_t numOfSubmit;
|
||||||
int64_t numOfBlocks;
|
int64_t numOfBlocks;
|
||||||
int64_t numOfRows;
|
int64_t numOfRows;
|
||||||
int64_t bytes;
|
int64_t dataSize;
|
||||||
} SSinkRecorder;
|
} SSinkRecorder;
|
||||||
|
|
||||||
typedef struct STaskExecStatisInfo {
|
typedef struct STaskExecStatisInfo {
|
||||||
int64_t created;
|
int64_t created;
|
||||||
int64_t init;
|
int64_t init;
|
||||||
|
int64_t start;
|
||||||
int64_t step1Start;
|
int64_t step1Start;
|
||||||
int64_t step2Start;
|
int64_t step2Start;
|
||||||
int64_t start;
|
|
||||||
int32_t updateCount;
|
int32_t updateCount;
|
||||||
int32_t dispatch;
|
|
||||||
int64_t latestUpdateTs;
|
int64_t latestUpdateTs;
|
||||||
|
int32_t processDataBlocks;
|
||||||
|
int64_t processDataSize;
|
||||||
|
int32_t dispatch;
|
||||||
|
int64_t dispatchDataSize;
|
||||||
int32_t checkpoint;
|
int32_t checkpoint;
|
||||||
SSinkRecorder sink;
|
SSinkRecorder sink;
|
||||||
} STaskExecStatisInfo;
|
} STaskExecStatisInfo;
|
||||||
|
@ -415,7 +418,6 @@ typedef struct SStreamMeta {
|
||||||
FTaskExpand* expandFunc;
|
FTaskExpand* expandFunc;
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
int64_t stage;
|
int64_t stage;
|
||||||
// bool leader;
|
|
||||||
int32_t role;
|
int32_t role;
|
||||||
STaskStartInfo startInfo;
|
STaskStartInfo startInfo;
|
||||||
SRWLatch lock;
|
SRWLatch lock;
|
||||||
|
|
|
@ -272,11 +272,12 @@ int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2*
|
||||||
SSinkRecorder* pRec = &pTask->execInfo.sink;
|
SSinkRecorder* pRec = &pTask->execInfo.sink;
|
||||||
|
|
||||||
pRec->numOfSubmit += 1;
|
pRec->numOfSubmit += 1;
|
||||||
if ((pRec->numOfSubmit % 5000) == 0) {
|
if ((pRec->numOfSubmit % 1000) == 0) {
|
||||||
double el = (taosGetTimestampMs() - pTask->execInfo.start) / 1000.0;
|
double el = (taosGetTimestampMs() - pTask->execInfo.start) / 1000.0;
|
||||||
tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64
|
tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64
|
||||||
" submit into dst table, %.2fMiB duration:%.2f Sec.",
|
" submit into dst table, %.2fMiB duration:%.2f Sec.",
|
||||||
pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, SIZE_IN_MiB(pRec->bytes), el);
|
pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, SIZE_IN_MiB(pRec->dataSize),
|
||||||
|
el);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -556,7 +556,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
// here only handle the data block sink operation
|
// here only handle the data block sink operation
|
||||||
if (type == STREAM_INPUT__DATA_BLOCK) {
|
if (type == STREAM_INPUT__DATA_BLOCK) {
|
||||||
int32_t blockSize = streamQueueItemGetSize(pInput);
|
int32_t blockSize = streamQueueItemGetSize(pInput);
|
||||||
pTask->execInfo.sink.bytes += blockSize;
|
pTask->execInfo.sink.dataSize += blockSize;
|
||||||
|
|
||||||
stDebug("s-task:%s sink task start to sink %d blocks, size:%.2fKiB", id, numOfBlocks, SIZE_IN_KiB(blockSize));
|
stDebug("s-task:%s sink task start to sink %d blocks, size:%.2fKiB", id, numOfBlocks, SIZE_IN_KiB(blockSize));
|
||||||
doOutputResultBlockImpl(pTask, (SStreamDataBlock*)pInput);
|
doOutputResultBlockImpl(pTask, (SStreamDataBlock*)pInput);
|
||||||
|
|
|
@ -154,6 +154,12 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
|
||||||
int32_t taskLevel = pTask->info.taskLevel;
|
int32_t taskLevel = pTask->info.taskLevel;
|
||||||
*numOfBlocks = 0;
|
*numOfBlocks = 0;
|
||||||
|
|
||||||
|
// no available token in bucket for sink task, let's wait
|
||||||
|
if (taskLevel == TASK_LEVEL__SINK && (!streamTaskHasAvailableToken(pTask->pTokenBucket))) {
|
||||||
|
stDebug("s-task:%s no available token in bucket for sink data, wait", id);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
if (streamTaskShouldPause(&pTask->status) || streamTaskShouldStop(&pTask->status)) {
|
if (streamTaskShouldPause(&pTask->status) || streamTaskShouldStop(&pTask->status)) {
|
||||||
stDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks);
|
stDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks);
|
||||||
|
|
Loading…
Reference in New Issue