fix(stream): keep the status entry in hash table, instead entry index.

This commit is contained in:
Haojun Liao 2023-09-19 10:33:58 +08:00
parent 5c0b8ea804
commit ff3ea366e2
1 changed files with 29 additions and 17 deletions

View File

@ -125,7 +125,7 @@ int32_t mndInitStream(SMnode *pMnode) {
taosThreadMutexInit(&execNodeList.lock, NULL); taosThreadMutexInit(&execNodeList.lock, NULL);
execNodeList.pTaskMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK); execNodeList.pTaskMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
execNodeList.pTaskList = taosArrayInit(4, sizeof(STaskStatusEntry)); execNodeList.pTaskList = taosArrayInit(4, sizeof(STaskId));
return sdbSetTable(pMnode->pSdb, table); return sdbSetTable(pMnode->pSdb, table);
} }
@ -1183,10 +1183,15 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
taosThreadMutexLock(&execNodeList.lock); taosThreadMutexLock(&execNodeList.lock);
for (int32_t i = 0; i < taosArrayGetSize(execNodeList.pTaskList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(execNodeList.pTaskList); ++i) {
STaskStatusEntry *p = taosArrayGet(execNodeList.pTaskList, i); STaskId *p = taosArrayGet(execNodeList.pTaskList, i);
if (p->status != TASK_STATUS__NORMAL) { STaskStatusEntry* pEntry = taosHashGet(execNodeList.pTaskMap, p, sizeof(*p));
if (pEntry == NULL) {
continue;
}
if (pEntry->status != TASK_STATUS__NORMAL) {
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, create checkpoint msg not issued", mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, create checkpoint msg not issued",
p->id.streamId, (int32_t)p->id.taskId, 0, streamGetTaskStatusStr(p->status)); pEntry->id.streamId, (int32_t)pEntry->id.taskId, 0, streamGetTaskStatusStr(pEntry->status));
ready = false; ready = false;
break; break;
} }
@ -1557,13 +1562,12 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
char status[20 + VARSTR_HEADER_SIZE] = {0}; char status[20 + VARSTR_HEADER_SIZE] = {0};
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
int32_t *index = taosHashGet(execNodeList.pTaskMap, &id, sizeof(id)); STaskStatusEntry* pe = taosHashGet(execNodeList.pTaskMap, &id, sizeof(id));
if (index == NULL) { if (pe == NULL) {
continue; continue;
} }
STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, *index); const char* pStatus = streamGetTaskStatusStr(pe->status);
const char* pStatus = streamGetTaskStatusStr(pStatusEntry->status);
STR_TO_VARSTR(status, pStatus); STR_TO_VARSTR(status, pStatus);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
@ -2254,10 +2258,8 @@ void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNod
if (p == NULL) { if (p == NULL) {
STaskStatusEntry entry = { STaskStatusEntry entry = {
.id.streamId = pTask->id.streamId, .id.taskId = pTask->id.taskId, .status = TASK_STATUS__STOP}; .id.streamId = pTask->id.streamId, .id.taskId = pTask->id.taskId, .status = TASK_STATUS__STOP};
taosArrayPush(pExecNode->pTaskList, &entry); taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry));
taosArrayPush(pExecNode->pTaskList, &id);
int32_t ordinal = taosArrayGetSize(pExecNode->pTaskList) - 1;
taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &ordinal, sizeof(ordinal));
} }
} }
} }
@ -2275,11 +2277,21 @@ void removeStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* pExecN
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)); void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
if (p != NULL) { if (p != NULL) {
taosArrayRemove(pExecNode->pTaskList, *(int32_t*)p);
taosHashRemove(pExecNode->pTaskMap, &id, sizeof(id)); taosHashRemove(pExecNode->pTaskMap, &id, sizeof(id));
for(int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
STaskId* pId = taosArrayGet(pExecNode->pTaskList, k);
if (pId->taskId == id.taskId && pId->streamId == id.streamId) {
taosArrayRemove(pExecNode->pTaskList, k);
break;
}
}
} }
} }
} }
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
} }
// todo: this process should be executed by the write queue worker of the mnode // todo: this process should be executed by the write queue worker of the mnode
@ -2308,13 +2320,13 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
for (int32_t i = 0; i < req.numOfTasks; ++i) { for (int32_t i = 0; i < req.numOfTasks; ++i) {
STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i); STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
int32_t *index = taosHashGet(execNodeList.pTaskMap, &p->id, sizeof(p->id)); STaskStatusEntry* pEntry = taosHashGet(execNodeList.pTaskMap, &p->id, sizeof(p->id));
if (index == NULL) { if (pEntry == NULL) {
mError("s-task:0x%"PRIx64" not found in mnode task list", p->id.taskId);
continue; continue;
} }
STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, *index); pEntry->status = p->status;
pStatusEntry->status = p->status;
if (p->status != TASK_STATUS__NORMAL) { if (p->status != TASK_STATUS__NORMAL) {
mDebug("received s-task:0x%"PRIx64" not in ready status:%s", p->id.taskId, streamGetTaskStatusStr(p->status)); mDebug("received s-task:0x%"PRIx64" not in ready status:%s", p->id.taskId, streamGetTaskStatusStr(p->status));
} }