diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 4c359975ce..2d70bb1e1c 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -237,7 +237,7 @@ typedef struct SStreamChildEpInfo { typedef struct STaskId { int64_t streamId; - int32_t taskId; + int64_t taskId; } STaskId; typedef struct SStreamTaskId { @@ -393,7 +393,8 @@ typedef struct SStreamMeta { TdThreadMutex backendMutex; SMetaHbInfo* pHbInfo; SHashObj* pUpdateTaskSet; - int32_t totalTasks; // this value should be increased when a new task is added into the meta + int32_t numOfStreamTasks; // this value should be increased when a new task is added into the meta + int32_t numOfPausedTasks; int32_t chkptNotReadyTasks; int64_t rid; @@ -402,7 +403,6 @@ typedef struct SStreamMeta { SArray* chkpInUse; int32_t chkpCap; SRWLatch chkpDirLock; - int32_t pauseTaskNum; } SStreamMeta; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); @@ -553,8 +553,7 @@ int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpo int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp); typedef struct STaskStatusEntry { - int64_t streamId; - int32_t taskId; + STaskId id; int32_t status; } STaskStatusEntry; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index cbc8e1e099..28d4716f0e 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1194,7 +1194,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { STaskStatusEntry *p = taosArrayGet(execNodeList.pTaskList, i); if (p->status != TASK_STATUS__NORMAL) { mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, create checkpoint msg not issued", - p->streamId, p->taskId, 0, streamGetTaskStatusStr(p->status)); + p->id.streamId, (int32_t)p->id.taskId, 0, streamGetTaskStatusStr(p->status)); ready = false; break; } @@ -1564,29 +1564,17 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock // status char status[20 + VARSTR_HEADER_SIZE] = {0}; - int8_t taskStatus = atomic_load_8(&pTask->status.taskStatus); - if (taskStatus == TASK_STATUS__NORMAL) { - memcpy(varDataVal(status), "normal", 6); - varDataSetLen(status, 6); - } else if (taskStatus == TASK_STATUS__DROPPING) { - memcpy(varDataVal(status), "dropping", 8); - varDataSetLen(status, 8); - } else if (taskStatus == TASK_STATUS__UNINIT) { - memcpy(varDataVal(status), "uninit", 6); - varDataSetLen(status, 4); - } else if (taskStatus == TASK_STATUS__STOP) { - memcpy(varDataVal(status), "stop", 4); - varDataSetLen(status, 4); - } else if (taskStatus == TASK_STATUS__SCAN_HISTORY) { - memcpy(varDataVal(status), "history", 7); - varDataSetLen(status, 7); - } else if (taskStatus == TASK_STATUS__HALT) { - memcpy(varDataVal(status), "halt", 4); - varDataSetLen(status, 4); - } else if (taskStatus == TASK_STATUS__PAUSE) { - memcpy(varDataVal(status), "pause", 5); - varDataSetLen(status, 5); + + STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; + int32_t *index = taosHashGet(execNodeList.pTaskMap, &id, sizeof(id)); + if (index == NULL) { + continue; } + + STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, *index); + const char* pStatus = streamGetTaskStatusStr(pStatusEntry->status); + STR_TO_VARSTR(status, pStatus); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)&status, false); @@ -2269,16 +2257,16 @@ static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *p int32_t numOfTasks = taosArrayGetSize(pLevel); for (int32_t j = 0; j < numOfTasks; j++) { SStreamTask *pTask = taosArrayGetP(pLevel, j); - int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId}; - void *p = taosHashGet(pExecNode->pTaskMap, keys, sizeof(keys)); + STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; + void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)); if (p == NULL) { STaskStatusEntry entry = { - .streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .status = TASK_STATUS__STOP}; + .id.streamId = pTask->id.streamId, .id.taskId = pTask->id.taskId, .status = TASK_STATUS__STOP}; taosArrayPush(pExecNode->pTaskList, &entry); int32_t ordinal = taosArrayGetSize(pExecNode->pTaskList) - 1; - taosHashPut(pExecNode->pTaskMap, keys, sizeof(keys), &ordinal, sizeof(ordinal)); + taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &ordinal, sizeof(ordinal)); } } } @@ -2311,8 +2299,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { for (int32_t i = 0; i < req.numOfTasks; ++i) { STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i); - int64_t k[2] = {p->streamId, p->taskId}; - int32_t *index = taosHashGet(execNodeList.pTaskMap, &k, sizeof(k)); + int32_t *index = taosHashGet(execNodeList.pTaskMap, &p->id, sizeof(p->id)); if (index == NULL) { continue; } @@ -2320,7 +2307,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, *index); pStatusEntry->status = p->status; if (p->status != TASK_STATUS__NORMAL) { - mDebug("received s-task:0x%x not in ready status:%s", p->taskId, streamGetTaskStatusStr(p->status)); + mDebug("received s-task:0x%"PRIx64" not in ready status:%s", p->id.taskId, streamGetTaskStatusStr(p->status)); } } taosThreadMutexUnlock(&execNodeList.lock); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 536273c044..39f3d465f2 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -259,7 +259,6 @@ int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg); -int32_t tqCheckLogInWal(STQ* pTq, int64_t version); // sma int32_t smaInit(); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 867508bc5f..fe1ef1637f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -860,14 +860,14 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { " child id:%d, level:%d, status:%s fill-history:%d, related stream task:0x%x trigger:%" PRId64 " ms", vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), - pTask->info.fillHistory, pTask->streamTaskId.taskId, pTask->info.triggerParam); + pTask->info.fillHistory, (int32_t)pTask->streamTaskId.taskId, pTask->info.triggerParam); } else { tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64 " child id:%d, level:%d, status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64 " ms", vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), - pTask->info.fillHistory, pTask->historyTaskId.taskId, pTask->info.triggerParam); + pTask->info.fillHistory, (int32_t)pTask->historyTaskId.taskId, pTask->info.triggerParam); } return 0; @@ -1094,7 +1094,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId); if (pStreamTask == NULL) { // todo delete this task, if the related stream task is dropped - qError("failed to find s-task:0x%x, it may have been destroyed, drop fill-history task:%s", + qError("failed to find s-task:0x%"PRIx64", it may have been destroyed, drop fill-history task:%s", pTask->streamTaskId.taskId, pTask->id.idStr); tqDebug("s-task:%s fill-history task set status to be dropping", id); @@ -1380,7 +1380,8 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg if (pTask->historyTaskId.taskId != 0) { pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId); if (pHistoryTask == NULL) { - tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%x, it may have been dropped already", + tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%" PRIx64 + ", it may have been dropped already", pMeta->vgId, pTask->historyTaskId.taskId); streamMetaReleaseTask(pMeta, pTask); @@ -1560,8 +1561,6 @@ FAIL: return -1; } -int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) { return sversion <= pTq->walLogLastVer; } - // todo error code cannot be return, since this is invoked by an mnode-launched transaction. int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) { int32_t vgId = TD_VID(pTq->pVnode); @@ -1611,11 +1610,10 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) { // set the initial value for generating check point // set the mgmt epset info according to the checkout source msg from mnode, todo update mgmt epset if needed if (pMeta->chkptNotReadyTasks == 0) { - pMeta->chkptNotReadyTasks = streamMetaGetNumOfStreamTasks(pMeta); - pMeta->totalTasks = pMeta->chkptNotReadyTasks; + pMeta->chkptNotReadyTasks = pMeta->numOfStreamTasks; } - total = taosArrayGetSize(pMeta->pTaskList); + total = pMeta->numOfStreamTasks; taosWUnLockLatch(&pMeta->lock); qDebug("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg, chkpt:%" PRId64 ", total checkpoint req:%d", diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index d82410e6ea..3cba4567fe 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -166,7 +166,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { return 0; } - int32_t numOfPauseTasks = pTq->pStreamMeta->pauseTaskNum; + int32_t numOfPauseTasks = pTq->pStreamMeta->numOfPausedTasks; if (ckPause && numOfTasks == numOfPauseTasks) { tqDebug("vgId:%d ignore all submit, all streams had been paused, reset the walScanCounter", vgId); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index fce3526bee..a48f74ce86 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -182,8 +182,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc taosWLockLatch(&pMeta->lock); if (pMeta->chkptNotReadyTasks == 0) { - pMeta->chkptNotReadyTasks = streamMetaGetNumOfStreamTasks(pMeta); - pMeta->totalTasks = pMeta->chkptNotReadyTasks; + pMeta->chkptNotReadyTasks = pMeta->numOfStreamTasks; } taosWUnLockLatch(&pMeta->lock); @@ -315,15 +314,13 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { if (remain == 0) { // all tasks are ready qDebug("s-task:%s is ready for checkpoint", pTask->id.idStr); - pMeta->totalTasks = 0; - streamBackendDoCheckpoint(pMeta, pTask->checkpointingId); streamSaveAllTaskStatus(pMeta, pTask->checkpointingId); qDebug("vgId:%d vnode wide checkpoint completed, save all tasks status, checkpointId:%" PRId64, pMeta->vgId, pTask->checkpointingId); } else { qDebug("vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, not ready:%d/%d", pMeta->vgId, - pTask->id.idStr, remain, pMeta->totalTasks); + pTask->id.idStr, remain, pMeta->numOfStreamTasks); } // send check point response to upstream task diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index d3e57433a4..70c46bf2ed 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -205,8 +205,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF taosInitRWLatch(&pMeta->lock); taosThreadMutexInit(&pMeta->backendMutex, NULL); - pMeta->pauseTaskNum = 0; - + pMeta->numOfPausedTasks = 0; + pMeta->numOfStreamTasks = 0; qInfo("vgId:%d open stream meta successfully, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId, stage); return pMeta; @@ -412,6 +412,10 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa } taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES); + if (pTask->info.fillHistory == 0) { + atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1); + } + *pAdded = true; return 0; } @@ -492,7 +496,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t if (ppTask) { pTask = *ppTask; if (streamTaskShouldPause(&pTask->status)) { - int32_t num = atomic_sub_fetch_32(&pMeta->pauseTaskNum, 1); + int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1); qInfo("vgId:%d s-task:%s drop stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num); } atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING); @@ -641,8 +645,8 @@ static void doClear(void* pKey, void* pVal, TBC* pCur, SArray* pRecycleList) { int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { TBC* pCur = NULL; - qInfo("vgId:%d load stream tasks from meta files", pMeta->vgId); + if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { qError("vgId:%d failed to open stream meta, code:%s", pMeta->vgId, tstrerror(terrno)); return -1; @@ -714,15 +718,17 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { return -1; } + if (pTask->info.fillHistory == 0) { + atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1); + } + if (streamTaskShouldPause(&pTask->status)) { - atomic_add_fetch_32(&pMeta->pauseTaskNum, 1); + atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1); } ASSERT(pTask->status.downstreamReady == 0); } - qInfo("vgId:%d pause task num:%d", pMeta->vgId, pMeta->pauseTaskNum); - tdbFree(pKey); tdbFree(pVal); if (tdbTbcClose(pCur) < 0) { @@ -738,7 +744,8 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { } int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); - qDebug("vgId:%d load %d tasks into meta from disk completed", pMeta->vgId, numOfTasks); + qDebug("vgId:%d load %d tasks into meta from disk completed, streamTask:%d, paused:%d", pMeta->vgId, numOfTasks, + pMeta->numOfStreamTasks, pMeta->numOfPausedTasks); taosArrayDestroy(pRecycleList); return 0; } @@ -750,8 +757,8 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { for (int32_t i = 0; i < pReq->numOfTasks; ++i) { STaskStatusEntry* ps = taosArrayGet(pReq->pTaskStatus, i); - if (tEncodeI64(pEncoder, ps->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, ps->taskId) < 0) return -1; + if (tEncodeI64(pEncoder, ps->id.streamId) < 0) return -1; + if (tEncodeI32(pEncoder, ps->id.taskId) < 0) return -1; if (tEncodeI32(pEncoder, ps->status) < 0) return -1; } tEndEncode(pEncoder); @@ -766,8 +773,11 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { pReq->pTaskStatus = taosArrayInit(pReq->numOfTasks, sizeof(STaskStatusEntry)); for (int32_t i = 0; i < pReq->numOfTasks; ++i) { STaskStatusEntry hb = {0}; - if (tDecodeI64(pDecoder, &hb.streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &hb.taskId) < 0) return -1; + if (tDecodeI64(pDecoder, &hb.id.streamId) < 0) return -1; + int32_t taskId = 0; + if (tDecodeI32(pDecoder, &taskId) < 0) return -1; + + hb.id.taskId = taskId; if (tDecodeI32(pDecoder, &hb.status) < 0) return -1; taosArrayPush(pReq->pTaskStatus, &hb); @@ -839,7 +849,7 @@ void metaHbToMnode(void* param, void* tmrId) { continue; } - STaskStatusEntry entry = {.streamId = pId->streamId, .taskId = pId->taskId, .status = (*pTask)->status.taskStatus}; + STaskStatusEntry entry = {.id = *pId, .status = (*pTask)->status.taskStatus}; taosArrayPush(hbMsg.pTaskStatus, &entry); if (!hasValEpset) { diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 7a318e2310..d28ec85dd5 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -73,6 +73,7 @@ const char* streamGetTaskStatusStr(int32_t status) { case TASK_STATUS__CK: return "check-point"; case TASK_STATUS__DROPPING: return "dropping"; case TASK_STATUS__STOP: return "stop"; + case TASK_STATUS__UNINIT: return "uninitialized"; default:return ""; } } @@ -244,6 +245,7 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) { ASSERT(pTask->historyTaskId.taskId == 0); } else { qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str); + streamTaskEnablePause(pTask); } } @@ -818,7 +820,7 @@ void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) { } if(pTask->info.taskLevel == TASK_LEVEL__SINK) { - int32_t num = atomic_add_fetch_32(&pMeta->pauseTaskNum, 1); + int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1); qInfo("vgId:%d s-task:%s pause stream sink task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num); return; } @@ -852,7 +854,7 @@ void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) { atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); - int32_t num = atomic_add_fetch_32(&pMeta->pauseTaskNum, 1); + int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1); qInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num); taosWUnLockLatch(&pMeta->lock); @@ -872,10 +874,10 @@ void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta) { if (status == TASK_STATUS__PAUSE) { pTask->status.taskStatus = pTask->status.keepTaskStatus; pTask->status.keepTaskStatus = TASK_STATUS__NORMAL; - int32_t num = atomic_sub_fetch_32(&pMeta->pauseTaskNum, 1); + int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1); qInfo("vgId:%d s-task:%s resume from pause, status:%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num); } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { - int32_t num = atomic_sub_fetch_32(&pMeta->pauseTaskNum, 1); + int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1); qInfo("vgId:%d s-task:%s sink task.resume from pause, status:%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num); } else { qError("s-task:%s not in pause, failed to resume, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status)); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 117c795a8d..4f320c3de0 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -165,9 +165,14 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tDecodeI8(pDecoder, &pTask->info.fillHistory) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->historyTaskId.streamId)) return -1; - if (tDecodeI32(pDecoder, &pTask->historyTaskId.taskId)) return -1; + + int32_t taskId = pTask->historyTaskId.taskId; + if (tDecodeI32(pDecoder, &taskId)) return -1; + if (tDecodeI64(pDecoder, &pTask->streamTaskId.streamId)) return -1; - if (tDecodeI32(pDecoder, &pTask->streamTaskId.taskId)) return -1; + + taskId = pTask->streamTaskId.taskId; + if (tDecodeI32(pDecoder, &taskId)) return -1; if (tDecodeU64(pDecoder, &pTask->dataRange.range.minVer)) return -1; if (tDecodeU64(pDecoder, &pTask->dataRange.range.maxVer)) return -1; @@ -259,8 +264,11 @@ int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId) { if (ver != SSTREAM_TASK_VER) return -1; if (tDecodeI64(pDecoder, &pTaskId->streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pTaskId->taskId) < 0) return -1; + int32_t taskId = 0; + if (tDecodeI32(pDecoder, &taskId) < 0) return -1; + + pTaskId->taskId = taskId; tEndDecode(pDecoder); return 0; }