fix(stream): add some logs.

This commit is contained in:
Haojun Liao 2023-08-24 23:27:48 +08:00
parent 218e2cc5f3
commit f7ea875af1
2 changed files with 5 additions and 53 deletions

View File

@ -237,55 +237,6 @@ void tqNotifyClose(STQ* pTq) {
tqDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%" PRId64 " ms", pMeta->vgId, el);
}
// static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch,
// int64_t consumerId, int32_t type) {
// int32_t len = 0;
// int32_t code = 0;
//
// if (type == TMQ_MSG_TYPE__POLL_DATA_RSP) {
// tEncodeSize(tEncodeMqDataRsp, pRsp, len, code);
// } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
// tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
// }
//
// if (code < 0) {
// return -1;
// }
//
// int32_t tlen = sizeof(SMqRspHead) + len;
// void* buf = rpcMallocCont(tlen);
// if (buf == NULL) {
// return -1;
// }
//
// ((SMqRspHead*)buf)->mqMsgType = type;
// ((SMqRspHead*)buf)->epoch = epoch;
// ((SMqRspHead*)buf)->consumerId = consumerId;
//
// void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
//
// SEncoder encoder = {0};
// tEncoderInit(&encoder, abuf, len);
//
// if (type == TMQ_MSG_TYPE__POLL_DATA_RSP) {
// tEncodeMqDataRsp(&encoder, pRsp);
// } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
// tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp);
// }
//
// tEncoderClear(&encoder);
//
// SRpcMsg rsp = {
// .info = *pRpcHandleInfo,
// .pCont = buf,
// .contLen = tlen,
// .code = 0,
// };
//
// tmsgSendRsp(&rsp);
// return 0;
// }
int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
SMqPollReq req = {0};
if (tDeserializeSMqPollReq(pHandle->msg->pCont, pHandle->msg->contLen, &req) < 0) {

View File

@ -274,7 +274,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId);
p->chkInfo.checkpointId = p->checkpointingId;
p->status.taskStatus = TASK_STATUS__NORMAL;
streamSetStatusNormal(p);
// save the task
streamMetaSaveTask(pMeta, p);
@ -308,14 +308,15 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
ASSERT(remain >= 0);
if (remain == 0) { // all tasks are in TASK_STATUS__CK_READY state
qDebug("s-task:%s is ready for checkpoint", pTask->id.idStr);
streamBackendDoCheckpoint(pMeta, pTask->checkpointingId);
streamSaveAllTaskStatus(pMeta, pTask->checkpointingId);
qDebug("vgId:%d vnode wide checkpoint completed, save all tasks status, checkpointId:%" PRId64, pMeta->vgId,
pTask->checkpointingId);
} else {
qDebug("vgId:%d vnode wide tasks not reach checkpoint ready status, not ready:%d/%d", pMeta->vgId, remain,
(int32_t)taosArrayGetSize(pMeta->pTaskList));
qDebug("vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, not ready:%d/%d",
pMeta->vgId, pTask->id.idStr, remain, (int32_t)taosArrayGetSize(pMeta->pTaskList));
}
// send check point response to upstream task