refactor(stream): update logs.

This commit is contained in:
Haojun Liao 2025-02-20 16:55:10 +08:00
parent 99d6086c5a
commit 0a672f1b96
2 changed files with 10 additions and 6 deletions

View File

@ -388,7 +388,7 @@ void killChkptAndResetStreamTask(SMnode *pMnode, SArray* pLongChkpts) {
} }
double el = (now - pTrans->startTime) / 1000.0; double el = (now - pTrans->startTime) / 1000.0;
mInfo("stream:%s id:%" PRIx64 " ongoing checkpoint trans, id:%d, elapsed time:%.2fs killed", pTrans->name, mInfo("stream:0x%" PRIx64 " start to kill ongoing long checkpoint transId:%d, elapsed time:%.2fs. killed",
pTrans->streamId, pTrans->transId, el); pTrans->streamId, pTrans->transId, el);
SStreamObj *p = NULL; SStreamObj *p = NULL;
@ -396,9 +396,10 @@ void killChkptAndResetStreamTask(SMnode *pMnode, SArray* pLongChkpts) {
if (code == 0 && p != NULL) { if (code == 0 && p != NULL) {
mndKillTransImpl(pMnode, pTrans->transId, p->sourceDb); mndKillTransImpl(pMnode, pTrans->transId, p->sourceDb);
mDebug("create reset task trans for stream:%s 0x%" PRIx64, pTrans->name, pTrans->streamId); mDebug("stream:%s 0x%" PRIx64 " transId:%d checkpointId:%" PRId64 " create reset task trans", p->name,
mndCreateStreamResetStatusTrans(pMnode, p, p->checkpointId); pTrans->streamId, pTrans->transId, p->checkpointId);
mndCreateStreamResetStatusTrans(pMnode, p, p->checkpointId);
sdbRelease(pMnode->pSdb, p); sdbRelease(pMnode->pSdb, p);
} }
} }

View File

@ -777,7 +777,8 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
int32_t code = 0; int32_t code = 0;
// merge multiple input data if possible in the input queue. // merge multiple input data if possible in the input queue.
stDebug("s-task:%s start to extract data block from inputQ", id); int64_t st = taosGetTimestampMs();
stDebug("s-task:%s start to extract data block from inputQ, ts:%" PRId64, id, st);
while (1) { while (1) {
int32_t blockSize = 0; int32_t blockSize = 0;
@ -807,8 +808,6 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
return 0; return 0;
} }
int64_t st = taosGetTimestampMs();
EExtractDataCode ret = streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize); EExtractDataCode ret = streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize);
if (ret == EXEC_AFTER_IDLE) { if (ret == EXEC_AFTER_IDLE) {
streamTaskSetIdleInfo(pTask, MIN_INVOKE_INTERVAL); streamTaskSetIdleInfo(pTask, MIN_INVOKE_INTERVAL);
@ -825,6 +824,10 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
// dispatch checkpoint msg to all downstream tasks // dispatch checkpoint msg to all downstream tasks
int32_t type = pInput->type; int32_t type = pInput->type;
if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) { if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
// Injection error: for automatic kill long trans test
taosMsleep(50*1000);
code = streamProcessCheckpointTriggerBlock(pTask, (SStreamDataBlock*)pInput); code = streamProcessCheckpointTriggerBlock(pTask, (SStreamDataBlock*)pInput);
if (code != 0) { if (code != 0) {
stError("s-task:%s failed to process checkpoint-trigger block, code:%s", pTask->id.idStr, tstrerror(code)); stError("s-task:%s failed to process checkpoint-trigger block, code:%s", pTask->id.idStr, tstrerror(code));