fix(stream): fix some typo.

This commit is contained in:
Haojun Liao 2024-06-17 14:39:46 +08:00
parent b44d87475d
commit 848b1e19b5
2 changed files with 10 additions and 7 deletions

View File

@ -605,12 +605,13 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b
numOfTasks = taosArrayGetSize(pMeta->pTaskList); numOfTasks = taosArrayGetSize(pMeta->pTaskList);
for (int32_t i = 0; i < numOfTasks; ++i) { for (int32_t i = 0; i < numOfTasks; ++i) {
STaskId* pId = taosArrayGet(pMeta->pTaskList, i); SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
if (pId->streamId != streamId) { if (pId->streamId != streamId) {
continue; continue;
} }
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId)); STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId};
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask == NULL) { if (ppTask == NULL) {
tqError("vgId:%d failed to acquire task:0x%" PRIx64 " in retrieving progress", pMeta->vgId, pId->taskId); tqError("vgId:%d failed to acquire task:0x%" PRIx64 " in retrieving progress", pMeta->vgId, pId->taskId);
continue; continue;

View File

@ -1007,9 +1007,10 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
hbMsg.pUpdateNodes = taosArrayInit(numOfTasks, sizeof(int32_t)); hbMsg.pUpdateNodes = taosArrayInit(numOfTasks, sizeof(int32_t));
for (int32_t i = 0; i < numOfTasks; ++i) { for (int32_t i = 0; i < numOfTasks; ++i) {
STaskId* pId = taosArrayGet(pMeta->pTaskList, i); SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId)); STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId};
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (pTask == NULL) { if (pTask == NULL) {
continue; continue;
} }
@ -1020,7 +1021,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
} }
STaskStatusEntry entry = { STaskStatusEntry entry = {
.id = *pId, .id = id,
.status = streamTaskGetStatus(*pTask)->state, .status = streamTaskGetStatus(*pTask)->state,
.nodeId = hbMsg.vgId, .nodeId = hbMsg.vgId,
.stage = pMeta->stage, .stage = pMeta->stage,
@ -1508,8 +1509,9 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
bool streamMetaAllTasksReady(const SStreamMeta* pMeta) { bool streamMetaAllTasksReady(const SStreamMeta* pMeta) {
int32_t num = taosArrayGetSize(pMeta->pTaskList); int32_t num = taosArrayGetSize(pMeta->pTaskList);
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
STaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, pTaskId, sizeof(*pTaskId)); STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId};
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask == NULL) { if (ppTask == NULL) {
continue; continue;
} }