fix(stream): fix stream task id error.

This commit is contained in:
Haojun Liao 2023-09-17 01:19:59 +08:00
parent 7ccc273206
commit 0459a4628c
9 changed files with 71 additions and 71 deletions

View File

@ -237,7 +237,7 @@ typedef struct SStreamChildEpInfo {
typedef struct STaskId { typedef struct STaskId {
int64_t streamId; int64_t streamId;
int32_t taskId; int64_t taskId;
} STaskId; } STaskId;
typedef struct SStreamTaskId { typedef struct SStreamTaskId {
@ -393,7 +393,8 @@ typedef struct SStreamMeta {
TdThreadMutex backendMutex; TdThreadMutex backendMutex;
SMetaHbInfo* pHbInfo; SMetaHbInfo* pHbInfo;
SHashObj* pUpdateTaskSet; SHashObj* pUpdateTaskSet;
int32_t totalTasks; // this value should be increased when a new task is added into the meta int32_t numOfStreamTasks; // this value should be increased when a new task is added into the meta
int32_t numOfPausedTasks;
int32_t chkptNotReadyTasks; int32_t chkptNotReadyTasks;
int64_t rid; int64_t rid;
@ -402,7 +403,6 @@ typedef struct SStreamMeta {
SArray* chkpInUse; SArray* chkpInUse;
int32_t chkpCap; int32_t chkpCap;
SRWLatch chkpDirLock; SRWLatch chkpDirLock;
int32_t pauseTaskNum;
} SStreamMeta; } SStreamMeta;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
@ -553,8 +553,7 @@ int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpo
int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp); int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp);
typedef struct STaskStatusEntry { typedef struct STaskStatusEntry {
int64_t streamId; STaskId id;
int32_t taskId;
int32_t status; int32_t status;
} STaskStatusEntry; } STaskStatusEntry;

View File

@ -1194,7 +1194,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
STaskStatusEntry *p = taosArrayGet(execNodeList.pTaskList, i); STaskStatusEntry *p = taosArrayGet(execNodeList.pTaskList, i);
if (p->status != TASK_STATUS__NORMAL) { if (p->status != TASK_STATUS__NORMAL) {
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, create checkpoint msg not issued", mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, create checkpoint msg not issued",
p->streamId, p->taskId, 0, streamGetTaskStatusStr(p->status)); p->id.streamId, (int32_t)p->id.taskId, 0, streamGetTaskStatusStr(p->status));
ready = false; ready = false;
break; break;
} }
@ -1564,29 +1564,17 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
// status // status
char status[20 + VARSTR_HEADER_SIZE] = {0}; char status[20 + VARSTR_HEADER_SIZE] = {0};
int8_t taskStatus = atomic_load_8(&pTask->status.taskStatus);
if (taskStatus == TASK_STATUS__NORMAL) { STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
memcpy(varDataVal(status), "normal", 6); int32_t *index = taosHashGet(execNodeList.pTaskMap, &id, sizeof(id));
varDataSetLen(status, 6); if (index == NULL) {
} else if (taskStatus == TASK_STATUS__DROPPING) { continue;
memcpy(varDataVal(status), "dropping", 8);
varDataSetLen(status, 8);
} else if (taskStatus == TASK_STATUS__UNINIT) {
memcpy(varDataVal(status), "uninit", 6);
varDataSetLen(status, 4);
} else if (taskStatus == TASK_STATUS__STOP) {
memcpy(varDataVal(status), "stop", 4);
varDataSetLen(status, 4);
} else if (taskStatus == TASK_STATUS__SCAN_HISTORY) {
memcpy(varDataVal(status), "history", 7);
varDataSetLen(status, 7);
} else if (taskStatus == TASK_STATUS__HALT) {
memcpy(varDataVal(status), "halt", 4);
varDataSetLen(status, 4);
} else if (taskStatus == TASK_STATUS__PAUSE) {
memcpy(varDataVal(status), "pause", 5);
varDataSetLen(status, 5);
} }
STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, *index);
const char* pStatus = streamGetTaskStatusStr(pStatusEntry->status);
STR_TO_VARSTR(status, pStatus);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&status, false); colDataSetVal(pColInfo, numOfRows, (const char *)&status, false);
@ -2269,16 +2257,16 @@ static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *p
int32_t numOfTasks = taosArrayGetSize(pLevel); int32_t numOfTasks = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < numOfTasks; j++) { for (int32_t j = 0; j < numOfTasks; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j); SStreamTask *pTask = taosArrayGetP(pLevel, j);
int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId};
void *p = taosHashGet(pExecNode->pTaskMap, keys, sizeof(keys)); STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
if (p == NULL) { if (p == NULL) {
STaskStatusEntry entry = { STaskStatusEntry entry = {
.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .status = TASK_STATUS__STOP}; .id.streamId = pTask->id.streamId, .id.taskId = pTask->id.taskId, .status = TASK_STATUS__STOP};
taosArrayPush(pExecNode->pTaskList, &entry); taosArrayPush(pExecNode->pTaskList, &entry);
int32_t ordinal = taosArrayGetSize(pExecNode->pTaskList) - 1; int32_t ordinal = taosArrayGetSize(pExecNode->pTaskList) - 1;
taosHashPut(pExecNode->pTaskMap, keys, sizeof(keys), &ordinal, sizeof(ordinal)); taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &ordinal, sizeof(ordinal));
} }
} }
} }
@ -2311,8 +2299,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
for (int32_t i = 0; i < req.numOfTasks; ++i) { for (int32_t i = 0; i < req.numOfTasks; ++i) {
STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i); STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
int64_t k[2] = {p->streamId, p->taskId}; int32_t *index = taosHashGet(execNodeList.pTaskMap, &p->id, sizeof(p->id));
int32_t *index = taosHashGet(execNodeList.pTaskMap, &k, sizeof(k));
if (index == NULL) { if (index == NULL) {
continue; continue;
} }
@ -2320,7 +2307,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, *index); STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, *index);
pStatusEntry->status = p->status; pStatusEntry->status = p->status;
if (p->status != TASK_STATUS__NORMAL) { if (p->status != TASK_STATUS__NORMAL) {
mDebug("received s-task:0x%x not in ready status:%s", p->taskId, streamGetTaskStatusStr(p->status)); mDebug("received s-task:0x%"PRIx64" not in ready status:%s", p->id.taskId, streamGetTaskStatusStr(p->status));
} }
} }
taosThreadMutexUnlock(&execNodeList.lock); taosThreadMutexUnlock(&execNodeList.lock);

View File

@ -259,7 +259,6 @@ int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqCheckLogInWal(STQ* pTq, int64_t version);
// sma // sma
int32_t smaInit(); int32_t smaInit();

View File

@ -860,14 +860,14 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
" child id:%d, level:%d, status:%s fill-history:%d, related stream task:0x%x trigger:%" PRId64 " ms", " child id:%d, level:%d, status:%s fill-history:%d, related stream task:0x%x trigger:%" PRId64 " ms",
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus),
pTask->info.fillHistory, pTask->streamTaskId.taskId, pTask->info.triggerParam); pTask->info.fillHistory, (int32_t)pTask->streamTaskId.taskId, pTask->info.triggerParam);
} else { } else {
tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
" nextProcessVer:%" PRId64 " nextProcessVer:%" PRId64
" child id:%d, level:%d, status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64 " ms", " child id:%d, level:%d, status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64 " ms",
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus),
pTask->info.fillHistory, pTask->historyTaskId.taskId, pTask->info.triggerParam); pTask->info.fillHistory, (int32_t)pTask->historyTaskId.taskId, pTask->info.triggerParam);
} }
return 0; return 0;
@ -1094,7 +1094,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId); pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
if (pStreamTask == NULL) { if (pStreamTask == NULL) {
// todo delete this task, if the related stream task is dropped // todo delete this task, if the related stream task is dropped
qError("failed to find s-task:0x%x, it may have been destroyed, drop fill-history task:%s", qError("failed to find s-task:0x%"PRIx64", it may have been destroyed, drop fill-history task:%s",
pTask->streamTaskId.taskId, pTask->id.idStr); pTask->streamTaskId.taskId, pTask->id.idStr);
tqDebug("s-task:%s fill-history task set status to be dropping", id); tqDebug("s-task:%s fill-history task set status to be dropping", id);
@ -1380,7 +1380,8 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
if (pTask->historyTaskId.taskId != 0) { if (pTask->historyTaskId.taskId != 0) {
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId); pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId);
if (pHistoryTask == NULL) { if (pHistoryTask == NULL) {
tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%x, it may have been dropped already", tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%" PRIx64
", it may have been dropped already",
pMeta->vgId, pTask->historyTaskId.taskId); pMeta->vgId, pTask->historyTaskId.taskId);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
@ -1560,8 +1561,6 @@ FAIL:
return -1; return -1;
} }
int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) { return sversion <= pTq->walLogLastVer; }
// todo error code cannot be return, since this is invoked by an mnode-launched transaction. // todo error code cannot be return, since this is invoked by an mnode-launched transaction.
int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
@ -1611,11 +1610,10 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) {
// set the initial value for generating check point // set the initial value for generating check point
// set the mgmt epset info according to the checkout source msg from mnode, todo update mgmt epset if needed // set the mgmt epset info according to the checkout source msg from mnode, todo update mgmt epset if needed
if (pMeta->chkptNotReadyTasks == 0) { if (pMeta->chkptNotReadyTasks == 0) {
pMeta->chkptNotReadyTasks = streamMetaGetNumOfStreamTasks(pMeta); pMeta->chkptNotReadyTasks = pMeta->numOfStreamTasks;
pMeta->totalTasks = pMeta->chkptNotReadyTasks;
} }
total = taosArrayGetSize(pMeta->pTaskList); total = pMeta->numOfStreamTasks;
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
qDebug("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg, chkpt:%" PRId64 ", total checkpoint req:%d", qDebug("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg, chkpt:%" PRId64 ", total checkpoint req:%d",

View File

@ -166,7 +166,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
return 0; return 0;
} }
int32_t numOfPauseTasks = pTq->pStreamMeta->pauseTaskNum; int32_t numOfPauseTasks = pTq->pStreamMeta->numOfPausedTasks;
if (ckPause && numOfTasks == numOfPauseTasks) { if (ckPause && numOfTasks == numOfPauseTasks) {
tqDebug("vgId:%d ignore all submit, all streams had been paused, reset the walScanCounter", vgId); tqDebug("vgId:%d ignore all submit, all streams had been paused, reset the walScanCounter", vgId);

View File

@ -182,8 +182,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
if (pMeta->chkptNotReadyTasks == 0) { if (pMeta->chkptNotReadyTasks == 0) {
pMeta->chkptNotReadyTasks = streamMetaGetNumOfStreamTasks(pMeta); pMeta->chkptNotReadyTasks = pMeta->numOfStreamTasks;
pMeta->totalTasks = pMeta->chkptNotReadyTasks;
} }
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
@ -315,15 +314,13 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
if (remain == 0) { // all tasks are ready if (remain == 0) { // all tasks are ready
qDebug("s-task:%s is ready for checkpoint", pTask->id.idStr); qDebug("s-task:%s is ready for checkpoint", pTask->id.idStr);
pMeta->totalTasks = 0;
streamBackendDoCheckpoint(pMeta, pTask->checkpointingId); streamBackendDoCheckpoint(pMeta, pTask->checkpointingId);
streamSaveAllTaskStatus(pMeta, pTask->checkpointingId); streamSaveAllTaskStatus(pMeta, pTask->checkpointingId);
qDebug("vgId:%d vnode wide checkpoint completed, save all tasks status, checkpointId:%" PRId64, pMeta->vgId, qDebug("vgId:%d vnode wide checkpoint completed, save all tasks status, checkpointId:%" PRId64, pMeta->vgId,
pTask->checkpointingId); pTask->checkpointingId);
} else { } else {
qDebug("vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, not ready:%d/%d", pMeta->vgId, qDebug("vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, not ready:%d/%d", pMeta->vgId,
pTask->id.idStr, remain, pMeta->totalTasks); pTask->id.idStr, remain, pMeta->numOfStreamTasks);
} }
// send check point response to upstream task // send check point response to upstream task

View File

@ -205,8 +205,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
taosInitRWLatch(&pMeta->lock); taosInitRWLatch(&pMeta->lock);
taosThreadMutexInit(&pMeta->backendMutex, NULL); taosThreadMutexInit(&pMeta->backendMutex, NULL);
pMeta->pauseTaskNum = 0; pMeta->numOfPausedTasks = 0;
pMeta->numOfStreamTasks = 0;
qInfo("vgId:%d open stream meta successfully, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId, qInfo("vgId:%d open stream meta successfully, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId,
stage); stage);
return pMeta; return pMeta;
@ -412,6 +412,10 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
} }
taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES); taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES);
if (pTask->info.fillHistory == 0) {
atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
}
*pAdded = true; *pAdded = true;
return 0; return 0;
} }
@ -492,7 +496,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
if (ppTask) { if (ppTask) {
pTask = *ppTask; pTask = *ppTask;
if (streamTaskShouldPause(&pTask->status)) { if (streamTaskShouldPause(&pTask->status)) {
int32_t num = atomic_sub_fetch_32(&pMeta->pauseTaskNum, 1); int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
qInfo("vgId:%d s-task:%s drop stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num); qInfo("vgId:%d s-task:%s drop stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
} }
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
@ -641,8 +645,8 @@ static void doClear(void* pKey, void* pVal, TBC* pCur, SArray* pRecycleList) {
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
TBC* pCur = NULL; TBC* pCur = NULL;
qInfo("vgId:%d load stream tasks from meta files", pMeta->vgId); qInfo("vgId:%d load stream tasks from meta files", pMeta->vgId);
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
qError("vgId:%d failed to open stream meta, code:%s", pMeta->vgId, tstrerror(terrno)); qError("vgId:%d failed to open stream meta, code:%s", pMeta->vgId, tstrerror(terrno));
return -1; return -1;
@ -714,15 +718,17 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
return -1; return -1;
} }
if (pTask->info.fillHistory == 0) {
atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
}
if (streamTaskShouldPause(&pTask->status)) { if (streamTaskShouldPause(&pTask->status)) {
atomic_add_fetch_32(&pMeta->pauseTaskNum, 1); atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
} }
ASSERT(pTask->status.downstreamReady == 0); ASSERT(pTask->status.downstreamReady == 0);
} }
qInfo("vgId:%d pause task num:%d", pMeta->vgId, pMeta->pauseTaskNum);
tdbFree(pKey); tdbFree(pKey);
tdbFree(pVal); tdbFree(pVal);
if (tdbTbcClose(pCur) < 0) { if (tdbTbcClose(pCur) < 0) {
@ -738,7 +744,8 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
} }
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
qDebug("vgId:%d load %d tasks into meta from disk completed", pMeta->vgId, numOfTasks); qDebug("vgId:%d load %d tasks into meta from disk completed, streamTask:%d, paused:%d", pMeta->vgId, numOfTasks,
pMeta->numOfStreamTasks, pMeta->numOfPausedTasks);
taosArrayDestroy(pRecycleList); taosArrayDestroy(pRecycleList);
return 0; return 0;
} }
@ -750,8 +757,8 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
for (int32_t i = 0; i < pReq->numOfTasks; ++i) { for (int32_t i = 0; i < pReq->numOfTasks; ++i) {
STaskStatusEntry* ps = taosArrayGet(pReq->pTaskStatus, i); STaskStatusEntry* ps = taosArrayGet(pReq->pTaskStatus, i);
if (tEncodeI64(pEncoder, ps->streamId) < 0) return -1; if (tEncodeI64(pEncoder, ps->id.streamId) < 0) return -1;
if (tEncodeI32(pEncoder, ps->taskId) < 0) return -1; if (tEncodeI32(pEncoder, ps->id.taskId) < 0) return -1;
if (tEncodeI32(pEncoder, ps->status) < 0) return -1; if (tEncodeI32(pEncoder, ps->status) < 0) return -1;
} }
tEndEncode(pEncoder); tEndEncode(pEncoder);
@ -766,8 +773,11 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
pReq->pTaskStatus = taosArrayInit(pReq->numOfTasks, sizeof(STaskStatusEntry)); pReq->pTaskStatus = taosArrayInit(pReq->numOfTasks, sizeof(STaskStatusEntry));
for (int32_t i = 0; i < pReq->numOfTasks; ++i) { for (int32_t i = 0; i < pReq->numOfTasks; ++i) {
STaskStatusEntry hb = {0}; STaskStatusEntry hb = {0};
if (tDecodeI64(pDecoder, &hb.streamId) < 0) return -1; if (tDecodeI64(pDecoder, &hb.id.streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &hb.taskId) < 0) return -1; int32_t taskId = 0;
if (tDecodeI32(pDecoder, &taskId) < 0) return -1;
hb.id.taskId = taskId;
if (tDecodeI32(pDecoder, &hb.status) < 0) return -1; if (tDecodeI32(pDecoder, &hb.status) < 0) return -1;
taosArrayPush(pReq->pTaskStatus, &hb); taosArrayPush(pReq->pTaskStatus, &hb);
@ -839,7 +849,7 @@ void metaHbToMnode(void* param, void* tmrId) {
continue; continue;
} }
STaskStatusEntry entry = {.streamId = pId->streamId, .taskId = pId->taskId, .status = (*pTask)->status.taskStatus}; STaskStatusEntry entry = {.id = *pId, .status = (*pTask)->status.taskStatus};
taosArrayPush(hbMsg.pTaskStatus, &entry); taosArrayPush(hbMsg.pTaskStatus, &entry);
if (!hasValEpset) { if (!hasValEpset) {

View File

@ -73,6 +73,7 @@ const char* streamGetTaskStatusStr(int32_t status) {
case TASK_STATUS__CK: return "check-point"; case TASK_STATUS__CK: return "check-point";
case TASK_STATUS__DROPPING: return "dropping"; case TASK_STATUS__DROPPING: return "dropping";
case TASK_STATUS__STOP: return "stop"; case TASK_STATUS__STOP: return "stop";
case TASK_STATUS__UNINIT: return "uninitialized";
default:return ""; default:return "";
} }
} }
@ -244,6 +245,7 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {
ASSERT(pTask->historyTaskId.taskId == 0); ASSERT(pTask->historyTaskId.taskId == 0);
} else { } else {
qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str); qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str);
streamTaskEnablePause(pTask);
} }
} }
@ -818,7 +820,7 @@ void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) {
} }
if(pTask->info.taskLevel == TASK_LEVEL__SINK) { if(pTask->info.taskLevel == TASK_LEVEL__SINK) {
int32_t num = atomic_add_fetch_32(&pMeta->pauseTaskNum, 1); int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
qInfo("vgId:%d s-task:%s pause stream sink task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num); qInfo("vgId:%d s-task:%s pause stream sink task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
return; return;
} }
@ -852,7 +854,7 @@ void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) {
atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus); atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
int32_t num = atomic_add_fetch_32(&pMeta->pauseTaskNum, 1); int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
qInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num); qInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
@ -872,10 +874,10 @@ void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta) {
if (status == TASK_STATUS__PAUSE) { if (status == TASK_STATUS__PAUSE) {
pTask->status.taskStatus = pTask->status.keepTaskStatus; pTask->status.taskStatus = pTask->status.keepTaskStatus;
pTask->status.keepTaskStatus = TASK_STATUS__NORMAL; pTask->status.keepTaskStatus = TASK_STATUS__NORMAL;
int32_t num = atomic_sub_fetch_32(&pMeta->pauseTaskNum, 1); int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
qInfo("vgId:%d s-task:%s resume from pause, status:%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num); qInfo("vgId:%d s-task:%s resume from pause, status:%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num);
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
int32_t num = atomic_sub_fetch_32(&pMeta->pauseTaskNum, 1); int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
qInfo("vgId:%d s-task:%s sink task.resume from pause, status:%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num); qInfo("vgId:%d s-task:%s sink task.resume from pause, status:%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num);
} else { } else {
qError("s-task:%s not in pause, failed to resume, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status)); qError("s-task:%s not in pause, failed to resume, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status));

View File

@ -165,9 +165,14 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tDecodeI8(pDecoder, &pTask->info.fillHistory) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->info.fillHistory) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->historyTaskId.streamId)) return -1; if (tDecodeI64(pDecoder, &pTask->historyTaskId.streamId)) return -1;
if (tDecodeI32(pDecoder, &pTask->historyTaskId.taskId)) return -1;
int32_t taskId = pTask->historyTaskId.taskId;
if (tDecodeI32(pDecoder, &taskId)) return -1;
if (tDecodeI64(pDecoder, &pTask->streamTaskId.streamId)) return -1; if (tDecodeI64(pDecoder, &pTask->streamTaskId.streamId)) return -1;
if (tDecodeI32(pDecoder, &pTask->streamTaskId.taskId)) return -1;
taskId = pTask->streamTaskId.taskId;
if (tDecodeI32(pDecoder, &taskId)) return -1;
if (tDecodeU64(pDecoder, &pTask->dataRange.range.minVer)) return -1; if (tDecodeU64(pDecoder, &pTask->dataRange.range.minVer)) return -1;
if (tDecodeU64(pDecoder, &pTask->dataRange.range.maxVer)) return -1; if (tDecodeU64(pDecoder, &pTask->dataRange.range.maxVer)) return -1;
@ -259,8 +264,11 @@ int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId) {
if (ver != SSTREAM_TASK_VER) return -1; if (ver != SSTREAM_TASK_VER) return -1;
if (tDecodeI64(pDecoder, &pTaskId->streamId) < 0) return -1; if (tDecodeI64(pDecoder, &pTaskId->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTaskId->taskId) < 0) return -1;
int32_t taskId = 0;
if (tDecodeI32(pDecoder, &taskId) < 0) return -1;
pTaskId->taskId = taskId;
tEndDecode(pDecoder); tEndDecode(pDecoder);
return 0; return 0;
} }