diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index f6862621f9..208b5d3fa0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -506,7 +506,8 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { .initTqReader = true, .version = ver, }; - pHandle->execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols); + pHandle->execHandle.execCol.task[i] = + qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols); ASSERT(pHandle->execHandle.execCol.task[i]); void* scanner = NULL; qExtractStreamScanner(pHandle->execHandle.execCol.task[i], &scanner); @@ -679,9 +680,9 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { // SStreamTaskRunReq* pReq = pMsg->pCont; int32_t taskId = pReq->taskId; - SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - if (pTask) { - streamProcessRunReq(pTask); + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + if (ppTask) { + streamProcessRunReq(*ppTask); return 0; } else { return -1; @@ -696,14 +697,14 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) { SDecoder decoder; tDecoderInit(&decoder, msgBody, msgLen); tDecodeStreamDispatchReq(&decoder, &req); - int32_t taskId = req.taskId; - SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - if (pTask) { + int32_t taskId = req.taskId; + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + if (ppTask) { SRpcMsg rsp = { .info = pMsg->info, .code = 0, }; - streamProcessDispatchReq(pTask, &req, &rsp); + streamProcessDispatchReq(*ppTask, &req, &rsp); return 0; } else { return -1; @@ -713,9 +714,9 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTaskRecoverReq* pReq = pMsg->pCont; int32_t taskId = pReq->taskId; - SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - if (pTask) { - streamProcessRecoverReq(pTask, pReq, pMsg); + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + if (ppTask) { + streamProcessRecoverReq(*ppTask, pReq, pMsg); return 0; } else { return -1; @@ -725,9 +726,9 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t taskId = pRsp->taskId; - SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - if (pTask) { - streamProcessDispatchRsp(pTask, pRsp); + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + if (ppTask) { + streamProcessDispatchRsp(*ppTask, pRsp); return 0; } else { return -1; @@ -737,9 +738,9 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) { SStreamTaskRecoverRsp* pRsp = pMsg->pCont; int32_t taskId = pRsp->taskId; - SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - if (pTask) { - streamProcessRecoverRsp(pTask, pRsp); + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + if (ppTask) { + streamProcessRecoverRsp(*ppTask, pRsp); return 0; } else { return -1; @@ -749,10 +750,10 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; - SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t)); - if (pTask) { + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t)); + if (ppTask) { taosHashRemove(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t)); - atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING); + atomic_store_8(&(*ppTask)->taskStatus, TASK_STATUS__DROPPING); } // todo // clear queue @@ -780,16 +781,17 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { SDecoder decoder; tDecoderInit(&decoder, msgBody, msgLen); tDecodeStreamRetrieveReq(&decoder, &req); - int32_t taskId = req.dstTaskId; - SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) { - return 0; + int32_t taskId = req.dstTaskId; + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + if (ppTask) { + SRpcMsg rsp = { + .info = pMsg->info, + .code = 0, + }; + streamProcessRetrieveReq(*ppTask, &req, &rsp); + } else { + return -1; } - SRpcMsg rsp = { - .info = pMsg->info, - .code = 0, - }; - streamProcessRetrieveReq(pTask, &req, &rsp); return 0; } diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 84e5a58c1a..4bb6f07a47 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -94,7 +94,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) { haystack = candidate + 1; } if (found || offset == 0) break; - offset = TMIN(0, offset - readSize + 8); + offset = TMIN(0, offset - readSize + sizeof(uint64_t)); int64_t offset2 = taosLSeekFile(pFile, offset, SEEK_SET); ASSERT(offset == offset2); if (readSize != taosReadFile(pFile, buf, readSize)) {