refactor: update logs.
This commit is contained in:
parent
39cac9d308
commit
777ed1761f
|
@ -821,12 +821,11 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// do recovery step 1
|
// do recovery step 1
|
||||||
tqDebug("s-task:%s start recover step 1 scan", pTask->id.idStr);
|
tqDebug("s-task:%s start non-blocking recover stage(step 1) scan", pTask->id.idStr);
|
||||||
int64_t st = taosGetTimestampMs();
|
int64_t st = taosGetTimestampMs();
|
||||||
|
|
||||||
streamSourceRecoverScanStep1(pTask);
|
streamSourceRecoverScanStep1(pTask);
|
||||||
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
|
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
|
||||||
double el = (taosGetTimestampMs() - st) / 1000.0;
|
|
||||||
tqDebug("s-task:%s is dropped, abort recover in step1", pTask->id.idStr);
|
tqDebug("s-task:%s is dropped, abort recover in step1", pTask->id.idStr);
|
||||||
|
|
||||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||||
|
@ -834,7 +833,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
double el = (taosGetTimestampMs() - st) / 1000.0;
|
double el = (taosGetTimestampMs() - st) / 1000.0;
|
||||||
tqDebug("s-task:%s recover step 1 ended, elapsed time:%.2fs", pTask->id.idStr, el);
|
tqDebug("s-task:%s non-blocking recover stage(step 1) ended, elapsed time:%.2fs", pTask->id.idStr, el);
|
||||||
|
|
||||||
// build msg to launch next step
|
// build msg to launch next step
|
||||||
SStreamRecoverStep2Req req;
|
SStreamRecoverStep2Req req;
|
||||||
|
@ -861,7 +860,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
memcpy(serializedReq, &req, len);
|
memcpy(serializedReq, &req, len);
|
||||||
|
|
||||||
// dispatch msg
|
// dispatch msg
|
||||||
tqDebug("s-task:%s start recover block stage(step 2)", pTask->id.idStr);
|
tqDebug("s-task:%s step 1 finished, send msg to start blocking recover stage(step 2)", pTask->id.idStr);
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {
|
SRpcMsg rpcMsg = {
|
||||||
.code = 0, .contLen = len, .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, .pCont = serializedReq};
|
.code = 0, .contLen = len, .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, .pCont = serializedReq};
|
||||||
|
@ -902,6 +901,7 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t
|
||||||
}
|
}
|
||||||
|
|
||||||
// set status normal
|
// set status normal
|
||||||
|
tqDebug("s-task:%s blocking stage completed, set the status to be normal", pTask->id.idStr);
|
||||||
code = streamSetStatusNormal(pTask);
|
code = streamSetStatusNormal(pTask);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||||
|
@ -909,7 +909,7 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t
|
||||||
}
|
}
|
||||||
|
|
||||||
double el = (taosGetTimestampMs() - st)/ 1000.0;
|
double el = (taosGetTimestampMs() - st)/ 1000.0;
|
||||||
tqDebug("s-task:%s step2 recover finished, el:%.2f s", pTask->id.idStr, el);
|
tqDebug("s-task:%s step2 recover finished, el:%.2fs", pTask->id.idStr, el);
|
||||||
|
|
||||||
// dispatch recover finish req to all related downstream task
|
// dispatch recover finish req to all related downstream task
|
||||||
code = streamDispatchRecoverFinishReq(pTask);
|
code = streamDispatchRecoverFinishReq(pTask);
|
||||||
|
@ -1367,12 +1367,12 @@ int32_t tqStartStreamTasks(STQ* pTq) {
|
||||||
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
|
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
|
||||||
if (pRunReq == NULL) {
|
if (pRunReq == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
tqError("vgId:%d failed restore stream tasks, code:%s", vgId, terrstr());
|
tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
|
||||||
taosWUnLockLatch(&pTq->pStreamMeta->lock);
|
taosWUnLockLatch(&pTq->pStreamMeta->lock);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("vgId:%d start wal scan stream tasks, tasks:%d", vgId, numOfTasks);
|
tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d", vgId, numOfTasks);
|
||||||
pRunReq->head.vgId = vgId;
|
pRunReq->head.vgId = vgId;
|
||||||
pRunReq->streamId = 0;
|
pRunReq->streamId = 0;
|
||||||
pRunReq->taskId = WAL_READ_TASKS_ID;
|
pRunReq->taskId = WAL_READ_TASKS_ID;
|
||||||
|
|
|
@ -100,7 +100,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
|
|
||||||
int32_t status = pTask->status.taskStatus;
|
int32_t status = pTask->status.taskStatus;
|
||||||
if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
|
if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
|
||||||
tqDebug("s-task:%s not source task, no need to start", pTask->id.idStr);
|
tqDebug("s-task:%s level:%d not source task, no need to start", pTask->id.idStr, pTask->taskLevel);
|
||||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -113,7 +113,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tInputQueueIsFull(pTask)) {
|
if (tInputQueueIsFull(pTask)) {
|
||||||
tqDebug("vgId:%d s-task:%s input queue is full, do nothing", vgId, pTask->id.idStr);
|
tqDebug("s-task:%s input queue is full, do nothing", pTask->id.idStr);
|
||||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -216,8 +216,8 @@ int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
|
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
|
||||||
qDebug("vgId:%d s-task:%s receive dispatch req from taskId:%d", pReq->upstreamNodeId, pTask->id.idStr,
|
qDebug("s-task:%s receive dispatch msg from taskId:%d (vgId:%d)", pTask->id.idStr, pReq->upstreamTaskId,
|
||||||
pReq->upstreamTaskId);
|
pReq->upstreamNodeId);
|
||||||
|
|
||||||
streamTaskEnqueueBlocks(pTask, pReq, pRsp);
|
streamTaskEnqueueBlocks(pTask, pReq, pRsp);
|
||||||
tDeleteStreamDispatchReq(pReq);
|
tDeleteStreamDispatchReq(pReq);
|
||||||
|
|
Loading…
Reference in New Issue