fix(stream): fix stream task id error.

This commit is contained in:
Haojun Liao 2023-09-17 01:19:59 +08:00
parent 4846194bbb
commit 01144c58a7
9 changed files with 71 additions and 71 deletions

View File

@ -237,7 +237,7 @@ typedef struct SStreamChildEpInfo {
typedef struct STaskId {
int64_t streamId;
int32_t taskId;
int64_t taskId;
} STaskId;
typedef struct SStreamTaskId {
@ -393,7 +393,8 @@ typedef struct SStreamMeta {
TdThreadMutex backendMutex;
SMetaHbInfo* pHbInfo;
SHashObj* pUpdateTaskSet;
int32_t totalTasks; // this value should be increased when a new task is added into the meta
int32_t numOfStreamTasks; // this value should be increased when a new task is added into the meta
int32_t numOfPausedTasks;
int32_t chkptNotReadyTasks;
int64_t rid;
@ -402,7 +403,6 @@ typedef struct SStreamMeta {
SArray* chkpInUse;
int32_t chkpCap;
SRWLatch chkpDirLock;
int32_t pauseTaskNum;
} SStreamMeta;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
@ -553,8 +553,7 @@ int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpo
int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp);
typedef struct STaskStatusEntry {
int64_t streamId;
int32_t taskId;
STaskId id;
int32_t status;
} STaskStatusEntry;

View File

@ -1194,7 +1194,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
STaskStatusEntry *p = taosArrayGet(execNodeList.pTaskList, i);
if (p->status != TASK_STATUS__NORMAL) {
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, create checkpoint msg not issued",
p->streamId, p->taskId, 0, streamGetTaskStatusStr(p->status));
p->id.streamId, (int32_t)p->id.taskId, 0, streamGetTaskStatusStr(p->status));
ready = false;
break;
}
@ -1564,29 +1564,17 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
// status
char status[20 + VARSTR_HEADER_SIZE] = {0};
int8_t taskStatus = atomic_load_8(&pTask->status.taskStatus);
if (taskStatus == TASK_STATUS__NORMAL) {
memcpy(varDataVal(status), "normal", 6);
varDataSetLen(status, 6);
} else if (taskStatus == TASK_STATUS__DROPPING) {
memcpy(varDataVal(status), "dropping", 8);
varDataSetLen(status, 8);
} else if (taskStatus == TASK_STATUS__UNINIT) {
memcpy(varDataVal(status), "uninit", 6);
varDataSetLen(status, 4);
} else if (taskStatus == TASK_STATUS__STOP) {
memcpy(varDataVal(status), "stop", 4);
varDataSetLen(status, 4);
} else if (taskStatus == TASK_STATUS__SCAN_HISTORY) {
memcpy(varDataVal(status), "history", 7);
varDataSetLen(status, 7);
} else if (taskStatus == TASK_STATUS__HALT) {
memcpy(varDataVal(status), "halt", 4);
varDataSetLen(status, 4);
} else if (taskStatus == TASK_STATUS__PAUSE) {
memcpy(varDataVal(status), "pause", 5);
varDataSetLen(status, 5);
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
int32_t *index = taosHashGet(execNodeList.pTaskMap, &id, sizeof(id));
if (index == NULL) {
continue;
}
STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, *index);
const char* pStatus = streamGetTaskStatusStr(pStatusEntry->status);
STR_TO_VARSTR(status, pStatus);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&status, false);
@ -2269,16 +2257,16 @@ static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *p
int32_t numOfTasks = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < numOfTasks; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId};
void *p = taosHashGet(pExecNode->pTaskMap, keys, sizeof(keys));
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
if (p == NULL) {
STaskStatusEntry entry = {
.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .status = TASK_STATUS__STOP};
.id.streamId = pTask->id.streamId, .id.taskId = pTask->id.taskId, .status = TASK_STATUS__STOP};
taosArrayPush(pExecNode->pTaskList, &entry);
int32_t ordinal = taosArrayGetSize(pExecNode->pTaskList) - 1;
taosHashPut(pExecNode->pTaskMap, keys, sizeof(keys), &ordinal, sizeof(ordinal));
taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &ordinal, sizeof(ordinal));
}
}
}
@ -2311,8 +2299,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
for (int32_t i = 0; i < req.numOfTasks; ++i) {
STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
int64_t k[2] = {p->streamId, p->taskId};
int32_t *index = taosHashGet(execNodeList.pTaskMap, &k, sizeof(k));
int32_t *index = taosHashGet(execNodeList.pTaskMap, &p->id, sizeof(p->id));
if (index == NULL) {
continue;
}
@ -2320,7 +2307,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, *index);
pStatusEntry->status = p->status;
if (p->status != TASK_STATUS__NORMAL) {
mDebug("received s-task:0x%x not in ready status:%s", p->taskId, streamGetTaskStatusStr(p->status));
mDebug("received s-task:0x%"PRIx64" not in ready status:%s", p->id.taskId, streamGetTaskStatusStr(p->status));
}
}
taosThreadMutexUnlock(&execNodeList.lock);

View File

@ -259,7 +259,6 @@ int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqCheckLogInWal(STQ* pTq, int64_t version);
// sma
int32_t smaInit();

View File

@ -847,14 +847,14 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
" child id:%d, level:%d, status:%s fill-history:%d, related stream task:0x%x trigger:%" PRId64 " ms",
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus),
pTask->info.fillHistory, pTask->streamTaskId.taskId, pTask->info.triggerParam);
pTask->info.fillHistory, (int32_t)pTask->streamTaskId.taskId, pTask->info.triggerParam);
} else {
tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
" nextProcessVer:%" PRId64
" child id:%d, level:%d, status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64 " ms",
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus),
pTask->info.fillHistory, pTask->historyTaskId.taskId, pTask->info.triggerParam);
pTask->info.fillHistory, (int32_t)pTask->historyTaskId.taskId, pTask->info.triggerParam);
}
return 0;
@ -1081,7 +1081,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
if (pStreamTask == NULL) {
// todo delete this task, if the related stream task is dropped
qError("failed to find s-task:0x%x, it may have been destroyed, drop fill-history task:%s",
qError("failed to find s-task:0x%"PRIx64", it may have been destroyed, drop fill-history task:%s",
pTask->streamTaskId.taskId, pTask->id.idStr);
tqDebug("s-task:%s fill-history task set status to be dropping", id);
@ -1367,7 +1367,8 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
if (pTask->historyTaskId.taskId != 0) {
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId);
if (pHistoryTask == NULL) {
tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%x, it may have been dropped already",
tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%" PRIx64
", it may have been dropped already",
pMeta->vgId, pTask->historyTaskId.taskId);
streamMetaReleaseTask(pMeta, pTask);
@ -1547,8 +1548,6 @@ FAIL:
return -1;
}
int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) { return sversion <= pTq->walLogLastVer; }
// todo error code cannot be return, since this is invoked by an mnode-launched transaction.
int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t vgId = TD_VID(pTq->pVnode);
@ -1598,11 +1597,10 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) {
// set the initial value for generating check point
// set the mgmt epset info according to the checkout source msg from mnode, todo update mgmt epset if needed
if (pMeta->chkptNotReadyTasks == 0) {
pMeta->chkptNotReadyTasks = streamMetaGetNumOfStreamTasks(pMeta);
pMeta->totalTasks = pMeta->chkptNotReadyTasks;
pMeta->chkptNotReadyTasks = pMeta->numOfStreamTasks;
}
total = taosArrayGetSize(pMeta->pTaskList);
total = pMeta->numOfStreamTasks;
taosWUnLockLatch(&pMeta->lock);
qDebug("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg, chkpt:%" PRId64 ", total checkpoint req:%d",

View File

@ -166,7 +166,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
return 0;
}
int32_t numOfPauseTasks = pTq->pStreamMeta->pauseTaskNum;
int32_t numOfPauseTasks = pTq->pStreamMeta->numOfPausedTasks;
if (ckPause && numOfTasks == numOfPauseTasks) {
tqDebug("vgId:%d ignore all submit, all streams had been paused, reset the walScanCounter", vgId);

View File

@ -182,8 +182,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
taosWLockLatch(&pMeta->lock);
if (pMeta->chkptNotReadyTasks == 0) {
pMeta->chkptNotReadyTasks = streamMetaGetNumOfStreamTasks(pMeta);
pMeta->totalTasks = pMeta->chkptNotReadyTasks;
pMeta->chkptNotReadyTasks = pMeta->numOfStreamTasks;
}
taosWUnLockLatch(&pMeta->lock);
@ -315,15 +314,13 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
if (remain == 0) { // all tasks are ready
qDebug("s-task:%s is ready for checkpoint", pTask->id.idStr);
pMeta->totalTasks = 0;
streamBackendDoCheckpoint(pMeta, pTask->checkpointingId);
streamSaveAllTaskStatus(pMeta, pTask->checkpointingId);
qDebug("vgId:%d vnode wide checkpoint completed, save all tasks status, checkpointId:%" PRId64, pMeta->vgId,
pTask->checkpointingId);
} else {
qDebug("vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, not ready:%d/%d", pMeta->vgId,
pTask->id.idStr, remain, pMeta->totalTasks);
pTask->id.idStr, remain, pMeta->numOfStreamTasks);
}
// send check point response to upstream task

View File

@ -204,8 +204,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
taosInitRWLatch(&pMeta->lock);
taosThreadMutexInit(&pMeta->backendMutex, NULL);
pMeta->pauseTaskNum = 0;
pMeta->numOfPausedTasks = 0;
pMeta->numOfStreamTasks = 0;
qInfo("vgId:%d open stream meta successfully, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId,
stage);
return pMeta;
@ -411,6 +411,10 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
}
taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES);
if (pTask->info.fillHistory == 0) {
atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
}
*pAdded = true;
return 0;
}
@ -491,7 +495,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
if (ppTask) {
pTask = *ppTask;
if (streamTaskShouldPause(&pTask->status)) {
int32_t num = atomic_sub_fetch_32(&pMeta->pauseTaskNum, 1);
int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
qInfo("vgId:%d s-task:%s drop stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
}
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
@ -640,8 +644,8 @@ static void doClear(void* pKey, void* pVal, TBC* pCur, SArray* pRecycleList) {
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
TBC* pCur = NULL;
qInfo("vgId:%d load stream tasks from meta files", pMeta->vgId);
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
qError("vgId:%d failed to open stream meta, code:%s", pMeta->vgId, tstrerror(terrno));
return -1;
@ -713,15 +717,17 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
return -1;
}
if (pTask->info.fillHistory == 0) {
atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
}
if (streamTaskShouldPause(&pTask->status)) {
atomic_add_fetch_32(&pMeta->pauseTaskNum, 1);
atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
}
ASSERT(pTask->status.downstreamReady == 0);
}
qInfo("vgId:%d pause task num:%d", pMeta->vgId, pMeta->pauseTaskNum);
tdbFree(pKey);
tdbFree(pVal);
if (tdbTbcClose(pCur) < 0) {
@ -737,7 +743,8 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
}
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
qDebug("vgId:%d load %d tasks into meta from disk completed", pMeta->vgId, numOfTasks);
qDebug("vgId:%d load %d tasks into meta from disk completed, streamTask:%d, paused:%d", pMeta->vgId, numOfTasks,
pMeta->numOfStreamTasks, pMeta->numOfPausedTasks);
taosArrayDestroy(pRecycleList);
return 0;
}
@ -749,8 +756,8 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
for (int32_t i = 0; i < pReq->numOfTasks; ++i) {
STaskStatusEntry* ps = taosArrayGet(pReq->pTaskStatus, i);
if (tEncodeI64(pEncoder, ps->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, ps->taskId) < 0) return -1;
if (tEncodeI64(pEncoder, ps->id.streamId) < 0) return -1;
if (tEncodeI32(pEncoder, ps->id.taskId) < 0) return -1;
if (tEncodeI32(pEncoder, ps->status) < 0) return -1;
}
tEndEncode(pEncoder);
@ -765,8 +772,11 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
pReq->pTaskStatus = taosArrayInit(pReq->numOfTasks, sizeof(STaskStatusEntry));
for (int32_t i = 0; i < pReq->numOfTasks; ++i) {
STaskStatusEntry hb = {0};
if (tDecodeI64(pDecoder, &hb.streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &hb.taskId) < 0) return -1;
if (tDecodeI64(pDecoder, &hb.id.streamId) < 0) return -1;
int32_t taskId = 0;
if (tDecodeI32(pDecoder, &taskId) < 0) return -1;
hb.id.taskId = taskId;
if (tDecodeI32(pDecoder, &hb.status) < 0) return -1;
taosArrayPush(pReq->pTaskStatus, &hb);
@ -838,7 +848,7 @@ void metaHbToMnode(void* param, void* tmrId) {
continue;
}
STaskStatusEntry entry = {.streamId = pId->streamId, .taskId = pId->taskId, .status = (*pTask)->status.taskStatus};
STaskStatusEntry entry = {.id = *pId, .status = (*pTask)->status.taskStatus};
taosArrayPush(hbMsg.pTaskStatus, &entry);
if (!hasValEpset) {

View File

@ -73,6 +73,7 @@ const char* streamGetTaskStatusStr(int32_t status) {
case TASK_STATUS__CK: return "check-point";
case TASK_STATUS__DROPPING: return "dropping";
case TASK_STATUS__STOP: return "stop";
case TASK_STATUS__UNINIT: return "uninitialized";
default:return "";
}
}
@ -244,6 +245,7 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {
ASSERT(pTask->historyTaskId.taskId == 0);
} else {
qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str);
streamTaskEnablePause(pTask);
}
}
@ -818,7 +820,7 @@ void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) {
}
if(pTask->info.taskLevel == TASK_LEVEL__SINK) {
int32_t num = atomic_add_fetch_32(&pMeta->pauseTaskNum, 1);
int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
qInfo("vgId:%d s-task:%s pause stream sink task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
return;
}
@ -852,7 +854,7 @@ void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) {
atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
int32_t num = atomic_add_fetch_32(&pMeta->pauseTaskNum, 1);
int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
qInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
taosWUnLockLatch(&pMeta->lock);
@ -872,10 +874,10 @@ void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta) {
if (status == TASK_STATUS__PAUSE) {
pTask->status.taskStatus = pTask->status.keepTaskStatus;
pTask->status.keepTaskStatus = TASK_STATUS__NORMAL;
int32_t num = atomic_sub_fetch_32(&pMeta->pauseTaskNum, 1);
int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
qInfo("vgId:%d s-task:%s resume from pause, status:%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num);
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
int32_t num = atomic_sub_fetch_32(&pMeta->pauseTaskNum, 1);
int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
qInfo("vgId:%d s-task:%s sink task.resume from pause, status:%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num);
} else {
qError("s-task:%s not in pause, failed to resume, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status));

View File

@ -165,9 +165,14 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tDecodeI8(pDecoder, &pTask->info.fillHistory) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->historyTaskId.streamId)) return -1;
if (tDecodeI32(pDecoder, &pTask->historyTaskId.taskId)) return -1;
int32_t taskId = pTask->historyTaskId.taskId;
if (tDecodeI32(pDecoder, &taskId)) return -1;
if (tDecodeI64(pDecoder, &pTask->streamTaskId.streamId)) return -1;
if (tDecodeI32(pDecoder, &pTask->streamTaskId.taskId)) return -1;
taskId = pTask->streamTaskId.taskId;
if (tDecodeI32(pDecoder, &taskId)) return -1;
if (tDecodeU64(pDecoder, &pTask->dataRange.range.minVer)) return -1;
if (tDecodeU64(pDecoder, &pTask->dataRange.range.maxVer)) return -1;
@ -259,8 +264,11 @@ int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId) {
if (ver != SSTREAM_TASK_VER) return -1;
if (tDecodeI64(pDecoder, &pTaskId->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTaskId->taskId) < 0) return -1;
int32_t taskId = 0;
if (tDecodeI32(pDecoder, &taskId) < 0) return -1;
pTaskId->taskId = taskId;
tEndDecode(pDecoder);
return 0;
}