From 4846194bbbd21e35633d732443a95b24d2653f5a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 16 Sep 2023 21:37:30 +0800 Subject: [PATCH 1/4] refactor: do some internal refactor, remove the related fill-history task by sending mesg to replay it in follower node. --- include/libs/stream/tstream.h | 27 +++---- source/common/src/tglobal.c | 2 +- source/dnode/snode/src/snode.c | 18 +++-- source/dnode/vnode/src/tq/tq.c | 16 ++-- source/dnode/vnode/src/tq/tqStreamTask.c | 4 +- source/dnode/vnode/src/tq/tqStreamTaskSnap.c | 9 +-- source/libs/stream/inc/streamInt.h | 9 +++ source/libs/stream/src/streamCheckpoint.c | 7 +- source/libs/stream/src/streamExec.c | 4 +- source/libs/stream/src/streamMeta.c | 79 ++++++++++---------- source/libs/stream/src/streamRecover.c | 23 +++--- source/libs/stream/src/streamTask.c | 28 ++++--- 12 files changed, 112 insertions(+), 114 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index c41834bd82..4c359975ce 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -106,6 +106,7 @@ typedef struct { } SStreamQueueItem; typedef void FTbSink(SStreamTask* pTask, void* vnode, void* data); +typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data); typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver); typedef struct { @@ -154,8 +155,6 @@ typedef struct { int64_t size; } SStreamQueueRes; -void streamFreeQitem(SStreamQueueItem* data); - #if 0 bool streamQueueResEmpty(const SStreamQueueRes* pRes); int64_t streamQueueResSize(const SStreamQueueRes* pRes); @@ -185,12 +184,6 @@ typedef struct { int32_t streamInit(); void streamCleanUp(); -SStreamQueue* streamQueueOpen(int64_t cap); -void streamQueueClose(SStreamQueue* pQueue, int32_t taskId); -void streamQueueProcessSuccess(SStreamQueue* queue); -void streamQueueProcessFail(SStreamQueue* queue); -void* streamQueueNextItem(SStreamQueue* pQueue); - SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type); void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit); @@ -222,8 +215,6 @@ typedef struct { SSHashObj* pTblInfo; } STaskSinkTb; -typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data); - typedef struct { int64_t smaId; // following are not applicable to encoder and decoder @@ -244,10 +235,10 @@ typedef struct SStreamChildEpInfo { int64_t stage; // upstream task stage value, to denote if the upstream node has restart/replica changed/transfer } SStreamChildEpInfo; -typedef struct SStreamTaskKey { +typedef struct STaskId { int64_t streamId; int32_t taskId; -} SStreamTaskKey; +} STaskId; typedef struct SStreamTaskId { int64_t streamId; @@ -341,8 +332,8 @@ struct SStreamTask { SCheckpointInfo chkInfo; STaskExec exec; SDataRange dataRange; - SStreamTaskId historyTaskId; - SStreamTaskId streamTaskId; + STaskId historyTaskId; + STaskId streamTaskId; STaskExecStatisInfo taskExecInfo; SArray* pReadyMsgList; // SArray TdThreadMutex lock; // secure the operation of set task status and puting data into inputQ @@ -386,7 +377,7 @@ typedef struct SStreamMeta { TTB* pTaskDb; TTB* pCheckpointDb; SHashObj* pTasksMap; - SArray* pTaskList; // SArray + SArray* pTaskList; // SArray void* ahandle; TXN* txn; FTaskExpand* expandFunc; @@ -425,7 +416,7 @@ void tFreeStreamTask(SStreamTask* pTask); int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver); int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo); -int32_t tDecodeStreamTaskId(SDecoder* pDecoder, SStreamTaskId* pTaskId); +int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId); int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem); int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock); @@ -516,7 +507,7 @@ typedef struct { int32_t downstreamTaskId; int32_t upstreamNodeId; int32_t childId; -} SStreamScanHistoryFinishReq, SStreamTransferReq; +} SStreamScanHistoryFinishReq; int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq); int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistoryFinishReq* pReq); @@ -704,7 +695,7 @@ void streamMetaCleanup(); SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage); void streamMetaClose(SStreamMeta* streamMeta); int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); // save to stream meta store -int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int64_t* pKey); +int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pKey); int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded); int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 6e1b30d3c8..4bf4e64e52 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -240,7 +240,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch // internal int32_t tsTransPullupInterval = 2; int32_t tsMqRebalanceInterval = 2; -int32_t tsStreamCheckpointTickInterval = 30; +int32_t tsStreamCheckpointTickInterval = 300; int32_t tsStreamNodeCheckInterval = 10; int32_t tsTtlUnit = 86400; int32_t tsTtlPushIntervalSec = 10; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 2b1885fb0e..ef9c1ebe2e 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -189,15 +189,17 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) { int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) { SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg; qDebug("snode:%d receive msg to drop stream task:0x%x", pSnode->pMeta->vgId, pReq->taskId); - - SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, pReq->streamId, pReq->taskId); - if (pTask == NULL) { - qError("vgId:%d failed to acquire s-task:0x%x when dropping it", pSnode->pMeta->vgId, pReq->taskId); - return 0; - } - streamMetaUnregisterTask(pSnode->pMeta, pReq->streamId, pReq->taskId); - streamMetaReleaseTask(pSnode->pMeta, pTask); + + // commit the update + taosWLockLatch(&pSnode->pMeta->lock); + int32_t numOfTasks = streamMetaGetNumOfTasks(pSnode->pMeta); + qDebug("vgId:%d task:0x%x dropped, remain tasks:%d", pSnode->pMeta->vgId, pReq->taskId, numOfTasks); + + if (streamMetaCommit(pSnode->pMeta) < 0) { + // persist to disk + } + taosWUnLockLatch(&pSnode->pMeta->lock); return 0; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index e665e4c408..66d183440f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -726,7 +726,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { SStreamTask* pStateTask = pTask; SStreamTask task = {0}; if (pTask->info.fillHistory) { - task.id = pTask->streamTaskId; + task.id.streamId = pTask->streamTaskId.streamId; + task.id.taskId = pTask->streamTaskId.taskId; task.pMeta = pTask->pMeta; pStateTask = &task; } @@ -760,7 +761,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { SStreamTask* pSateTask = pTask; SStreamTask task = {0}; if (pTask->info.fillHistory) { - task.id = pTask->streamTaskId; + task.id.streamId = pTask->streamTaskId.streamId; + task.id.taskId = pTask->streamTaskId.taskId; task.pMeta = pTask->pMeta; pSateTask = &task; } @@ -1675,9 +1677,8 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { taosWLockLatch(&pMeta->lock); // the task epset may be updated again and again, when replaying the WAL, the task may be in stop status. - int64_t keys[2] = {req.streamId, req.taskId}; - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); - + STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask == NULL || *ppTask == NULL) { tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId, req.taskId); @@ -1695,10 +1696,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTask** ppHTask = NULL; if (pTask->historyTaskId.taskId != 0) { - keys[0] = pTask->historyTaskId.streamId; - keys[1] = pTask->historyTaskId.taskId; - - ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); + ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pTask->historyTaskId, sizeof(pTask->historyTaskId)); if (ppHTask == NULL || *ppHTask == NULL) { tqError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already", pMeta->vgId, req.taskId); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 1e66988aab..d82410e6ea 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -240,8 +240,8 @@ int32_t tqStartStreamTasks(STQ* pTq) { for (int32_t i = 0; i < numOfTasks; ++i) { SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); - int64_t key[2] = {pTaskId->streamId, pTaskId->taskId}; - SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, key, sizeof(key)); + STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId}; + SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); int8_t status = (*pTask)->status.taskStatus; if (status == TASK_STATUS__STOP && (*pTask)->info.fillHistory != 1) { diff --git a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c index f0da479700..458e2c74ff 100644 --- a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c @@ -227,23 +227,20 @@ _err: int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t nData) { int32_t code = 0; STQ* pTq = pWriter->pTq; - STqHandle handle; SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; if (pHdr->type == SNAP_DATA_STREAM_TASK) { - SStreamTaskId task = {0}; + STaskId taskId = {0}; SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); - - code = tDecodeStreamTaskId(&decoder, &task); + code = tDecodeStreamTaskId(&decoder, &taskId); if (code < 0) { tDecoderClear(&decoder); goto _err; } tDecoderClear(&decoder); - // tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn) - int64_t key[2] = {task.streamId, task.taskId}; + int64_t key[2] = {taskId.streamId, taskId.taskId}; taosWLockLatch(&pTq->pStreamMeta->lock); if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, key, sizeof(int64_t) << 1, (uint8_t*)pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr), pTq->pStreamMeta->txn) < 0) { diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index fb11ec4ea4..64df8e2f44 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -86,6 +86,15 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask); int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t cap, int32_t rate); +SStreamQueue* streamQueueOpen(int64_t cap); +void streamQueueClose(SStreamQueue* pQueue, int32_t taskId); +void streamQueueProcessSuccess(SStreamQueue* queue); +void streamQueueProcessFail(SStreamQueue* queue); +void* streamQueueNextItem(SStreamQueue* pQueue); +void streamFreeQitem(SStreamQueueItem* data); + +STaskId extractStreamTaskKey(const SStreamTask* pTask); + #ifdef __cplusplus } #endif diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index cfbfdb5da4..fce3526bee 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -266,11 +266,8 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { int64_t keys[2]; for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) { - SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); - keys[0] = pId->streamId; - keys[1] = pId->taskId; - - SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); + STaskId* pId = taosArrayGet(pMeta->pTaskList, i); + SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId)); if (ppTask == NULL) { continue; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 3b3dca7f5f..91c46c8ad9 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -300,10 +300,9 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { qError( "s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed, destroy the related " "fill-history task", - pTask->id.idStr, pTask->streamTaskId.taskId); + pTask->id.idStr, (int32_t) pTask->streamTaskId.taskId); // 1. free it and remove fill-history task from disk meta-store -// streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId); streamBuildAndSendDropTaskMsg(pStreamTask, pMeta->vgId, &pTask->id); // 2. save to disk @@ -371,6 +370,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { // 5. clear the link between fill-history task and stream task info pStreamTask->historyTaskId.taskId = 0; + pStreamTask->historyTaskId.streamId = 0; // 6. save to disk taosWLockLatch(&pMeta->lock); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 78eee339f1..afb8349234 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -36,7 +36,6 @@ static void metaHbToMnode(void* param, void* tmrId); static void streamMetaClear(SStreamMeta* pMeta); static int32_t streamMetaBegin(SStreamMeta* pMeta); static void streamMetaCloseImpl(void* arg); -static void extractStreamTaskKey(int64_t* pKey, const SStreamTask* pTask); typedef struct { TdThreadMutex mutex; @@ -361,10 +360,8 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { tEncodeStreamTask(&encoder, pTask); tEncoderClear(&encoder); - int64_t key[2] = {0}; - extractStreamTaskKey(key, pTask); - - if (tdbTbUpsert(pMeta->pTaskDb, key, STREAM_TASK_KEY_LEN, buf, len, pMeta->txn) < 0) { + int64_t id[2] = {pTask->id.streamId, pTask->id.taskId}; + if (tdbTbUpsert(pMeta->pTaskDb, id, STREAM_TASK_KEY_LEN, buf, len, pMeta->txn) < 0) { qError("s-task:%s save to disk failed, code:%s", pTask->id.idStr, tstrerror(terrno)); return -1; } @@ -373,18 +370,14 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { return 0; } -void extractStreamTaskKey(int64_t* pKey, const SStreamTask* pTask) { - pKey[0] = pTask->id.streamId; - pKey[1] = pTask->id.taskId; -} - -int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int64_t* pKey) { - int32_t code = tdbTbDelete(pMeta->pTaskDb, pKey, STREAM_TASK_KEY_LEN, pMeta->txn); +int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pTaskId) { + int64_t key[2] = {pTaskId->streamId, pTaskId->taskId}; + int32_t code = tdbTbDelete(pMeta->pTaskDb, key, STREAM_TASK_KEY_LEN, pMeta->txn); if (code != 0) { - qError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, (int32_t)pKey[1], + qError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, (int32_t) pTaskId->taskId, tstrerror(terrno)); } else { - qDebug("vgId:%d remove task:0x%x from metastore", pMeta->vgId, (int32_t)pKey[1]); + qDebug("vgId:%d remove task:0x%x from metastore", pMeta->vgId, (int32_t) pTaskId->taskId); } return code; @@ -394,8 +387,8 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int64_t* pKey) { int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded) { *pAdded = false; - int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId}; - void* p = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); + STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; + void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (p == NULL) { if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { tFreeStreamTask(pTask); @@ -417,7 +410,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa return 0; } - taosHashPut(pMeta->pTasksMap, keys, sizeof(keys), &pTask, POINTER_BYTES); + taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES); *pAdded = true; return 0; } @@ -432,10 +425,8 @@ int32_t streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta) { int32_t num = 0; size_t size = taosArrayGetSize(pMeta->pTaskList); for (int32_t i = 0; i < size; ++i) { - SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); - int64_t keys[2] = {pId->streamId, pId->taskId}; - - SStreamTask** p = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); + STaskId* pId = taosArrayGet(pMeta->pTaskList, i); + SStreamTask** p = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId)); if (p == NULL) { continue; } @@ -451,8 +442,8 @@ int32_t streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta) { SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { taosRLockLatch(&pMeta->lock); - int64_t keys[2] = {streamId, taskId}; - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); + STaskId id = {.streamId = streamId, .taskId = taskId}; + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask != NULL) { if (!streamTaskShouldStop(&(*ppTask)->status)) { int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1); @@ -495,8 +486,8 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t // pre-delete operation taosWLockLatch(&pMeta->lock); - int64_t keys[2] = {streamId, taskId}; - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); + STaskId id = {.streamId = streamId, .taskId = taskId}; + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask) { pTask = *ppTask; if (streamTaskShouldPause(&pTask->status)) { @@ -516,7 +507,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t while (1) { taosRLockLatch(&pMeta->lock); - ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); + ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask) { if ((*ppTask)->status.timerActive == 0) { @@ -535,9 +526,9 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t // let's do delete of stream task taosWLockLatch(&pMeta->lock); - ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); + ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask) { - taosHashRemove(pMeta->pTasksMap, keys, sizeof(keys)); + taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING); ASSERT(pTask->status.timerActive == 0); @@ -550,7 +541,17 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t streamMetaReleaseTask(pMeta, pTask); } - streamMetaRemoveTask(pMeta, keys); + // it is an fill-history task, remove the related stream task's id that points to it + if ((*ppTask)->info.fillHistory == 1) { + STaskId id1 = {.streamId = (*ppTask)->streamTaskId.streamId, .taskId = (*ppTask)->streamTaskId.taskId}; + SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id1, sizeof(id1)); + if (ppStreamTask != NULL) { + (*ppStreamTask)->historyTaskId.taskId = 0; + (*ppStreamTask)->historyTaskId.streamId = 0; + } + } + + streamMetaRemoveTask(pMeta, &id); streamMetaReleaseTask(pMeta, pTask); } else { qDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", pMeta->vgId, taskId); @@ -651,7 +652,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { void* pVal = NULL; int32_t vLen = 0; SDecoder decoder; - SArray* pRecycleList = taosArrayInit(4, STREAM_TASK_KEY_LEN); + SArray* pRecycleList = taosArrayInit(4, sizeof(STaskId)); tdbTbcMoveToFirst(pCur); while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { @@ -678,18 +679,17 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { int32_t taskId = pTask->id.taskId; tFreeStreamTask(pTask); - int64_t key[2] = {0}; - extractStreamTaskKey(key, pTask); + STaskId id = extractStreamTaskKey(pTask); - taosArrayPush(pRecycleList, key); + taosArrayPush(pRecycleList, &id); int32_t total = taosArrayGetSize(pRecycleList); qDebug("s-task:0x%x is already dropped, add into recycle list, total:%d", taskId, total); continue; } // do duplicate task check. - int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId}; - void* p = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); + STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; + void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (p == NULL) { // pTask->chkInfo.checkpointVer may be 0, when a follower is become a leader // In this case, we try not to start fill-history task anymore. @@ -707,7 +707,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { continue; } - if (taosHashPut(pMeta->pTasksMap, keys, sizeof(keys), &pTask, sizeof(void*)) < 0) { + if (taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES) < 0) { doClear(pKey, pVal, pCur, pRecycleList); tFreeStreamTask(pTask); return -1; @@ -731,7 +731,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { if (taosArrayGetSize(pRecycleList) > 0) { for (int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) { - int64_t* pId = taosArrayGet(pRecycleList, i); + STaskId* pId = taosArrayGet(pRecycleList, i); streamMetaRemoveTask(pMeta, pId); } } @@ -831,10 +831,9 @@ void metaHbToMnode(void* param, void* tmrId) { hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry)); for (int32_t i = 0; i < numOfTasks; ++i) { - SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); + STaskId* pId = taosArrayGet(pMeta->pTaskList, i); - int64_t keys[2] = {pId->streamId, pId->taskId}; - SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); + SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId)); if ((*pTask)->info.fillHistory == 1) { continue; } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index db2e418171..7a318e2310 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -21,8 +21,7 @@ typedef struct SStreamTaskRetryInfo { SStreamMeta* pMeta; - int32_t taskId; - int64_t streamId; + STaskId id; } SStreamTaskRetryInfo; static int32_t streamSetParamForScanHistory(SStreamTask* pTask); @@ -520,12 +519,10 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { SStreamTaskRetryInfo* pInfo = param; SStreamMeta* pMeta = pInfo->pMeta; - qDebug("s-task:0x%x in timer to launch related history task", pInfo->taskId); + qDebug("s-task:0x%x in timer to launch related history task", (int32_t) pInfo->id.taskId); taosWLockLatch(&pMeta->lock); - int64_t keys[2] = {pInfo->streamId, pInfo->taskId}; - - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pInfo->id, sizeof(pInfo->id)); if (ppTask) { ASSERT((*ppTask)->status.timerActive >= 1); @@ -541,7 +538,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { } taosWUnLockLatch(&pMeta->lock); - SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->streamId, pInfo->taskId); + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->id.streamId, pInfo->id.taskId); if (pTask != NULL) { ASSERT(pTask->status.timerActive >= 1); @@ -552,7 +549,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { qWarn( "s-task:%s vgId:%d status:%s failed to launch history task:0x%x, since it may not be built, or may have been " "destroyed, or should stop", - pTask->id.idStr, pMeta->vgId, pStatus, pTask->historyTaskId.taskId); + pTask->id.idStr, pMeta->vgId, pStatus, (int32_t) pTask->historyTaskId.taskId); taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer); streamMetaReleaseTask(pMeta, pTask); @@ -568,7 +565,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { atomic_sub_fetch_8(&pTask->status.timerActive, 1); streamMetaReleaseTask(pMeta, pTask); } else { - qError("s-task:0x%x failed to load task, it may have been destroyed", pInfo->taskId); + qError("s-task:0x%x failed to load task, it may have been destroyed", (int32_t) pInfo->id.taskId); } taosMemoryFree(pInfo); @@ -587,17 +584,15 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { qDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", pTask->id.idStr, pTask->historyTaskId.streamId, hTaskId); - int64_t keys[2] = {pTask->historyTaskId.streamId, hTaskId}; - // Set the execute conditions, including the query time window and the version range - SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); + SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->historyTaskId, sizeof(pTask->historyTaskId)); if (pHTask == NULL) { qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr, pMeta->vgId, hTaskId); SStreamTaskRetryInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamTaskRetryInfo)); - pInfo->taskId = pTask->id.taskId; - pInfo->streamId = pTask->id.streamId; + pInfo->id.taskId = pTask->id.taskId; + pInfo->id.streamId = pTask->id.streamId; pInfo->pMeta = pTask->pMeta; if (pTask->launchTaskTimer == NULL) { diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 23ace63d18..117c795a8d 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -251,7 +251,8 @@ int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) tEndDecode(pDecoder); return 0; } -int32_t tDecodeStreamTaskId(SDecoder* pDecoder, SStreamTaskId* pTaskId) { + +int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId) { int64_t ver; if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &ver) < 0) return -1; @@ -478,8 +479,8 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i); if (pInfo->nodeId == nodeId) { epsetAssign(&pInfo->epSet, pEpSet); - qDebug("s-task:0x%x update the upstreamInfo, nodeId:%d taskId:0x%x newEpset:%s", pTask->id.taskId, nodeId, - pInfo->taskId, buf); + qDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s", pTask->id.taskId, + pInfo->taskId, nodeId, buf); break; } } @@ -509,7 +510,8 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE if (pVgInfo->vgId == nodeId) { epsetAssign(&pVgInfo->epSet, pEpSet); - qDebug("s-task:0x%x update the dispatch info, nodeId:%d newEpset:%s", pTask->id.taskId, nodeId, buf); + qDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpset:%s", pTask->id.taskId, + pVgInfo->taskId, nodeId, buf); break; } } @@ -517,7 +519,8 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE STaskDispatcherFixedEp* pDispatcher = &pTask->fixedEpDispatcher; if (pDispatcher->nodeId == nodeId) { epsetAssign(&pDispatcher->epSet, pEpSet); - qDebug("s-task:0x%x update the dispatch info, nodeId:%d newEpSet:%s", pTask->id.taskId, nodeId, buf); + qDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpSet:%s", pTask->id.taskId, + pDispatcher->taskId, nodeId, buf); } } else { // do nothing @@ -567,17 +570,19 @@ int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) { int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) { STaskExecStatisInfo* p = &pTask->taskExecInfo; - qDebug("s-task:%s update task nodeEp epset, update count:%d, prevTs:%"PRId64, pTask->id.idStr, - p->updateCount + 1, p->latestUpdateTs); - p->updateCount += 1; + int32_t numOfNodes = taosArrayGetSize(pNodeList); + int64_t prevTs = p->latestUpdateTs; + p->latestUpdateTs = taosGetTimestampMs(); + p->updateCount += 1; + qDebug("s-task:%s update task nodeEp epset, updatedNodes:%d, updateCount:%d, prevTs:%" PRId64, pTask->id.idStr, + numOfNodes, p->updateCount, prevTs); for (int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) { SNodeUpdateInfo* pInfo = taosArrayGet(pNodeList, i); doUpdateTaskEpset(pTask, pInfo->nodeId, &pInfo->newEp); } - return 0; } @@ -649,3 +654,8 @@ int32_t streamBuildAndSendDropTaskMsg(SStreamTask* pTask, int32_t vgId, SStreamT qDebug("vgId:%d build and send drop table:0x%x msg", vgId, pTaskId->taskId); return code; } + +STaskId extractStreamTaskKey(const SStreamTask* pTask) { + STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; + return id; +} \ No newline at end of file From 01144c58a785c6bfd8f97afaa5aabb536cfd6e7f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 17 Sep 2023 01:19:59 +0800 Subject: [PATCH 2/4] fix(stream): fix stream task id error. --- include/libs/stream/tstream.h | 9 ++--- source/dnode/mnode/impl/src/mndStream.c | 47 ++++++++--------------- source/dnode/vnode/src/inc/vnodeInt.h | 1 - source/dnode/vnode/src/tq/tq.c | 16 ++++---- source/dnode/vnode/src/tq/tqStreamTask.c | 2 +- source/libs/stream/src/streamCheckpoint.c | 7 +--- source/libs/stream/src/streamMeta.c | 36 ++++++++++------- source/libs/stream/src/streamRecover.c | 10 +++-- source/libs/stream/src/streamTask.c | 14 +++++-- 9 files changed, 71 insertions(+), 71 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 4c359975ce..2d70bb1e1c 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -237,7 +237,7 @@ typedef struct SStreamChildEpInfo { typedef struct STaskId { int64_t streamId; - int32_t taskId; + int64_t taskId; } STaskId; typedef struct SStreamTaskId { @@ -393,7 +393,8 @@ typedef struct SStreamMeta { TdThreadMutex backendMutex; SMetaHbInfo* pHbInfo; SHashObj* pUpdateTaskSet; - int32_t totalTasks; // this value should be increased when a new task is added into the meta + int32_t numOfStreamTasks; // this value should be increased when a new task is added into the meta + int32_t numOfPausedTasks; int32_t chkptNotReadyTasks; int64_t rid; @@ -402,7 +403,6 @@ typedef struct SStreamMeta { SArray* chkpInUse; int32_t chkpCap; SRWLatch chkpDirLock; - int32_t pauseTaskNum; } SStreamMeta; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); @@ -553,8 +553,7 @@ int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpo int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp); typedef struct STaskStatusEntry { - int64_t streamId; - int32_t taskId; + STaskId id; int32_t status; } STaskStatusEntry; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 6ff78cd103..32f1fafdfc 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1194,7 +1194,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { STaskStatusEntry *p = taosArrayGet(execNodeList.pTaskList, i); if (p->status != TASK_STATUS__NORMAL) { mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, create checkpoint msg not issued", - p->streamId, p->taskId, 0, streamGetTaskStatusStr(p->status)); + p->id.streamId, (int32_t)p->id.taskId, 0, streamGetTaskStatusStr(p->status)); ready = false; break; } @@ -1564,29 +1564,17 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock // status char status[20 + VARSTR_HEADER_SIZE] = {0}; - int8_t taskStatus = atomic_load_8(&pTask->status.taskStatus); - if (taskStatus == TASK_STATUS__NORMAL) { - memcpy(varDataVal(status), "normal", 6); - varDataSetLen(status, 6); - } else if (taskStatus == TASK_STATUS__DROPPING) { - memcpy(varDataVal(status), "dropping", 8); - varDataSetLen(status, 8); - } else if (taskStatus == TASK_STATUS__UNINIT) { - memcpy(varDataVal(status), "uninit", 6); - varDataSetLen(status, 4); - } else if (taskStatus == TASK_STATUS__STOP) { - memcpy(varDataVal(status), "stop", 4); - varDataSetLen(status, 4); - } else if (taskStatus == TASK_STATUS__SCAN_HISTORY) { - memcpy(varDataVal(status), "history", 7); - varDataSetLen(status, 7); - } else if (taskStatus == TASK_STATUS__HALT) { - memcpy(varDataVal(status), "halt", 4); - varDataSetLen(status, 4); - } else if (taskStatus == TASK_STATUS__PAUSE) { - memcpy(varDataVal(status), "pause", 5); - varDataSetLen(status, 5); + + STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; + int32_t *index = taosHashGet(execNodeList.pTaskMap, &id, sizeof(id)); + if (index == NULL) { + continue; } + + STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, *index); + const char* pStatus = streamGetTaskStatusStr(pStatusEntry->status); + STR_TO_VARSTR(status, pStatus); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)&status, false); @@ -2269,16 +2257,16 @@ static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *p int32_t numOfTasks = taosArrayGetSize(pLevel); for (int32_t j = 0; j < numOfTasks; j++) { SStreamTask *pTask = taosArrayGetP(pLevel, j); - int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId}; - void *p = taosHashGet(pExecNode->pTaskMap, keys, sizeof(keys)); + STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; + void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)); if (p == NULL) { STaskStatusEntry entry = { - .streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .status = TASK_STATUS__STOP}; + .id.streamId = pTask->id.streamId, .id.taskId = pTask->id.taskId, .status = TASK_STATUS__STOP}; taosArrayPush(pExecNode->pTaskList, &entry); int32_t ordinal = taosArrayGetSize(pExecNode->pTaskList) - 1; - taosHashPut(pExecNode->pTaskMap, keys, sizeof(keys), &ordinal, sizeof(ordinal)); + taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &ordinal, sizeof(ordinal)); } } } @@ -2311,8 +2299,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { for (int32_t i = 0; i < req.numOfTasks; ++i) { STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i); - int64_t k[2] = {p->streamId, p->taskId}; - int32_t *index = taosHashGet(execNodeList.pTaskMap, &k, sizeof(k)); + int32_t *index = taosHashGet(execNodeList.pTaskMap, &p->id, sizeof(p->id)); if (index == NULL) { continue; } @@ -2320,7 +2307,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, *index); pStatusEntry->status = p->status; if (p->status != TASK_STATUS__NORMAL) { - mDebug("received s-task:0x%x not in ready status:%s", p->taskId, streamGetTaskStatusStr(p->status)); + mDebug("received s-task:0x%"PRIx64" not in ready status:%s", p->id.taskId, streamGetTaskStatusStr(p->status)); } } taosThreadMutexUnlock(&execNodeList.lock); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 536273c044..39f3d465f2 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -259,7 +259,6 @@ int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg); -int32_t tqCheckLogInWal(STQ* pTq, int64_t version); // sma int32_t smaInit(); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 66d183440f..e639e272fa 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -847,14 +847,14 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { " child id:%d, level:%d, status:%s fill-history:%d, related stream task:0x%x trigger:%" PRId64 " ms", vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), - pTask->info.fillHistory, pTask->streamTaskId.taskId, pTask->info.triggerParam); + pTask->info.fillHistory, (int32_t)pTask->streamTaskId.taskId, pTask->info.triggerParam); } else { tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64 " child id:%d, level:%d, status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64 " ms", vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), - pTask->info.fillHistory, pTask->historyTaskId.taskId, pTask->info.triggerParam); + pTask->info.fillHistory, (int32_t)pTask->historyTaskId.taskId, pTask->info.triggerParam); } return 0; @@ -1081,7 +1081,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId); if (pStreamTask == NULL) { // todo delete this task, if the related stream task is dropped - qError("failed to find s-task:0x%x, it may have been destroyed, drop fill-history task:%s", + qError("failed to find s-task:0x%"PRIx64", it may have been destroyed, drop fill-history task:%s", pTask->streamTaskId.taskId, pTask->id.idStr); tqDebug("s-task:%s fill-history task set status to be dropping", id); @@ -1367,7 +1367,8 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg if (pTask->historyTaskId.taskId != 0) { pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId); if (pHistoryTask == NULL) { - tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%x, it may have been dropped already", + tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%" PRIx64 + ", it may have been dropped already", pMeta->vgId, pTask->historyTaskId.taskId); streamMetaReleaseTask(pMeta, pTask); @@ -1547,8 +1548,6 @@ FAIL: return -1; } -int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) { return sversion <= pTq->walLogLastVer; } - // todo error code cannot be return, since this is invoked by an mnode-launched transaction. int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) { int32_t vgId = TD_VID(pTq->pVnode); @@ -1598,11 +1597,10 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) { // set the initial value for generating check point // set the mgmt epset info according to the checkout source msg from mnode, todo update mgmt epset if needed if (pMeta->chkptNotReadyTasks == 0) { - pMeta->chkptNotReadyTasks = streamMetaGetNumOfStreamTasks(pMeta); - pMeta->totalTasks = pMeta->chkptNotReadyTasks; + pMeta->chkptNotReadyTasks = pMeta->numOfStreamTasks; } - total = taosArrayGetSize(pMeta->pTaskList); + total = pMeta->numOfStreamTasks; taosWUnLockLatch(&pMeta->lock); qDebug("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg, chkpt:%" PRId64 ", total checkpoint req:%d", diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index d82410e6ea..3cba4567fe 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -166,7 +166,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { return 0; } - int32_t numOfPauseTasks = pTq->pStreamMeta->pauseTaskNum; + int32_t numOfPauseTasks = pTq->pStreamMeta->numOfPausedTasks; if (ckPause && numOfTasks == numOfPauseTasks) { tqDebug("vgId:%d ignore all submit, all streams had been paused, reset the walScanCounter", vgId); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index fce3526bee..a48f74ce86 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -182,8 +182,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc taosWLockLatch(&pMeta->lock); if (pMeta->chkptNotReadyTasks == 0) { - pMeta->chkptNotReadyTasks = streamMetaGetNumOfStreamTasks(pMeta); - pMeta->totalTasks = pMeta->chkptNotReadyTasks; + pMeta->chkptNotReadyTasks = pMeta->numOfStreamTasks; } taosWUnLockLatch(&pMeta->lock); @@ -315,15 +314,13 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { if (remain == 0) { // all tasks are ready qDebug("s-task:%s is ready for checkpoint", pTask->id.idStr); - pMeta->totalTasks = 0; - streamBackendDoCheckpoint(pMeta, pTask->checkpointingId); streamSaveAllTaskStatus(pMeta, pTask->checkpointingId); qDebug("vgId:%d vnode wide checkpoint completed, save all tasks status, checkpointId:%" PRId64, pMeta->vgId, pTask->checkpointingId); } else { qDebug("vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, not ready:%d/%d", pMeta->vgId, - pTask->id.idStr, remain, pMeta->totalTasks); + pTask->id.idStr, remain, pMeta->numOfStreamTasks); } // send check point response to upstream task diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index afb8349234..970af07aa1 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -204,8 +204,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF taosInitRWLatch(&pMeta->lock); taosThreadMutexInit(&pMeta->backendMutex, NULL); - pMeta->pauseTaskNum = 0; - + pMeta->numOfPausedTasks = 0; + pMeta->numOfStreamTasks = 0; qInfo("vgId:%d open stream meta successfully, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId, stage); return pMeta; @@ -411,6 +411,10 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa } taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES); + if (pTask->info.fillHistory == 0) { + atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1); + } + *pAdded = true; return 0; } @@ -491,7 +495,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t if (ppTask) { pTask = *ppTask; if (streamTaskShouldPause(&pTask->status)) { - int32_t num = atomic_sub_fetch_32(&pMeta->pauseTaskNum, 1); + int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1); qInfo("vgId:%d s-task:%s drop stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num); } atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING); @@ -640,8 +644,8 @@ static void doClear(void* pKey, void* pVal, TBC* pCur, SArray* pRecycleList) { int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { TBC* pCur = NULL; - qInfo("vgId:%d load stream tasks from meta files", pMeta->vgId); + if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { qError("vgId:%d failed to open stream meta, code:%s", pMeta->vgId, tstrerror(terrno)); return -1; @@ -713,15 +717,17 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { return -1; } + if (pTask->info.fillHistory == 0) { + atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1); + } + if (streamTaskShouldPause(&pTask->status)) { - atomic_add_fetch_32(&pMeta->pauseTaskNum, 1); + atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1); } ASSERT(pTask->status.downstreamReady == 0); } - qInfo("vgId:%d pause task num:%d", pMeta->vgId, pMeta->pauseTaskNum); - tdbFree(pKey); tdbFree(pVal); if (tdbTbcClose(pCur) < 0) { @@ -737,7 +743,8 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { } int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); - qDebug("vgId:%d load %d tasks into meta from disk completed", pMeta->vgId, numOfTasks); + qDebug("vgId:%d load %d tasks into meta from disk completed, streamTask:%d, paused:%d", pMeta->vgId, numOfTasks, + pMeta->numOfStreamTasks, pMeta->numOfPausedTasks); taosArrayDestroy(pRecycleList); return 0; } @@ -749,8 +756,8 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { for (int32_t i = 0; i < pReq->numOfTasks; ++i) { STaskStatusEntry* ps = taosArrayGet(pReq->pTaskStatus, i); - if (tEncodeI64(pEncoder, ps->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, ps->taskId) < 0) return -1; + if (tEncodeI64(pEncoder, ps->id.streamId) < 0) return -1; + if (tEncodeI32(pEncoder, ps->id.taskId) < 0) return -1; if (tEncodeI32(pEncoder, ps->status) < 0) return -1; } tEndEncode(pEncoder); @@ -765,8 +772,11 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { pReq->pTaskStatus = taosArrayInit(pReq->numOfTasks, sizeof(STaskStatusEntry)); for (int32_t i = 0; i < pReq->numOfTasks; ++i) { STaskStatusEntry hb = {0}; - if (tDecodeI64(pDecoder, &hb.streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &hb.taskId) < 0) return -1; + if (tDecodeI64(pDecoder, &hb.id.streamId) < 0) return -1; + int32_t taskId = 0; + if (tDecodeI32(pDecoder, &taskId) < 0) return -1; + + hb.id.taskId = taskId; if (tDecodeI32(pDecoder, &hb.status) < 0) return -1; taosArrayPush(pReq->pTaskStatus, &hb); @@ -838,7 +848,7 @@ void metaHbToMnode(void* param, void* tmrId) { continue; } - STaskStatusEntry entry = {.streamId = pId->streamId, .taskId = pId->taskId, .status = (*pTask)->status.taskStatus}; + STaskStatusEntry entry = {.id = *pId, .status = (*pTask)->status.taskStatus}; taosArrayPush(hbMsg.pTaskStatus, &entry); if (!hasValEpset) { diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 7a318e2310..d28ec85dd5 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -73,6 +73,7 @@ const char* streamGetTaskStatusStr(int32_t status) { case TASK_STATUS__CK: return "check-point"; case TASK_STATUS__DROPPING: return "dropping"; case TASK_STATUS__STOP: return "stop"; + case TASK_STATUS__UNINIT: return "uninitialized"; default:return ""; } } @@ -244,6 +245,7 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) { ASSERT(pTask->historyTaskId.taskId == 0); } else { qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str); + streamTaskEnablePause(pTask); } } @@ -818,7 +820,7 @@ void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) { } if(pTask->info.taskLevel == TASK_LEVEL__SINK) { - int32_t num = atomic_add_fetch_32(&pMeta->pauseTaskNum, 1); + int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1); qInfo("vgId:%d s-task:%s pause stream sink task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num); return; } @@ -852,7 +854,7 @@ void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) { atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); - int32_t num = atomic_add_fetch_32(&pMeta->pauseTaskNum, 1); + int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1); qInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num); taosWUnLockLatch(&pMeta->lock); @@ -872,10 +874,10 @@ void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta) { if (status == TASK_STATUS__PAUSE) { pTask->status.taskStatus = pTask->status.keepTaskStatus; pTask->status.keepTaskStatus = TASK_STATUS__NORMAL; - int32_t num = atomic_sub_fetch_32(&pMeta->pauseTaskNum, 1); + int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1); qInfo("vgId:%d s-task:%s resume from pause, status:%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num); } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { - int32_t num = atomic_sub_fetch_32(&pMeta->pauseTaskNum, 1); + int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1); qInfo("vgId:%d s-task:%s sink task.resume from pause, status:%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num); } else { qError("s-task:%s not in pause, failed to resume, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status)); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 117c795a8d..4f320c3de0 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -165,9 +165,14 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tDecodeI8(pDecoder, &pTask->info.fillHistory) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->historyTaskId.streamId)) return -1; - if (tDecodeI32(pDecoder, &pTask->historyTaskId.taskId)) return -1; + + int32_t taskId = pTask->historyTaskId.taskId; + if (tDecodeI32(pDecoder, &taskId)) return -1; + if (tDecodeI64(pDecoder, &pTask->streamTaskId.streamId)) return -1; - if (tDecodeI32(pDecoder, &pTask->streamTaskId.taskId)) return -1; + + taskId = pTask->streamTaskId.taskId; + if (tDecodeI32(pDecoder, &taskId)) return -1; if (tDecodeU64(pDecoder, &pTask->dataRange.range.minVer)) return -1; if (tDecodeU64(pDecoder, &pTask->dataRange.range.maxVer)) return -1; @@ -259,8 +264,11 @@ int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId) { if (ver != SSTREAM_TASK_VER) return -1; if (tDecodeI64(pDecoder, &pTaskId->streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pTaskId->taskId) < 0) return -1; + int32_t taskId = 0; + if (tDecodeI32(pDecoder, &taskId) < 0) return -1; + + pTaskId->taskId = taskId; tEndDecode(pDecoder); return 0; } From 301258784a3a6e4d8e277667ec479a315bb6b138 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 17 Sep 2023 13:59:06 +0800 Subject: [PATCH 3/4] fix(stream): fix invalid read. --- source/libs/stream/src/streamMeta.c | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 970af07aa1..31bf5a482b 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -532,6 +532,16 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t taosWLockLatch(&pMeta->lock); ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask) { + // it is an fill-history task, remove the related stream task's id that points to it + if ((*ppTask)->info.fillHistory == 1) { + STaskId streamTaskId = {.streamId = (*ppTask)->streamTaskId.streamId, .taskId = (*ppTask)->streamTaskId.taskId}; + SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &streamTaskId, sizeof(streamTaskId)); + if (ppStreamTask != NULL) { + (*ppStreamTask)->historyTaskId.taskId = 0; + (*ppStreamTask)->historyTaskId.streamId = 0; + } + } + taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING); @@ -545,16 +555,6 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t streamMetaReleaseTask(pMeta, pTask); } - // it is an fill-history task, remove the related stream task's id that points to it - if ((*ppTask)->info.fillHistory == 1) { - STaskId id1 = {.streamId = (*ppTask)->streamTaskId.streamId, .taskId = (*ppTask)->streamTaskId.taskId}; - SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id1, sizeof(id1)); - if (ppStreamTask != NULL) { - (*ppStreamTask)->historyTaskId.taskId = 0; - (*ppStreamTask)->historyTaskId.streamId = 0; - } - } - streamMetaRemoveTask(pMeta, &id); streamMetaReleaseTask(pMeta, pTask); } else { From 3dfdda328398ca4b2d3220741b641f6ec43dc9b4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 17 Sep 2023 18:07:26 +0800 Subject: [PATCH 4/4] fix(stream): fix error in decode stream task. --- source/dnode/mnode/impl/src/mndStream.c | 11 ----------- source/libs/stream/src/streamTask.c | 15 +++++++++------ 2 files changed, 9 insertions(+), 17 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 32f1fafdfc..f4110562a6 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -799,17 +799,6 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { } } - // pDb = mndAcquireDb(pMnode, streamObj.sourceDb); - // if (pDb->cfg.replications != 1) { - // mError("stream source db must have only 1 replica, but %s has %d", pDb->name, pDb->cfg.replications); - // terrno = TSDB_CODE_MND_MULTI_REPLICA_SOURCE_DB; - // mndReleaseDb(pMnode, pDb); - // pDb = NULL; - // goto _OVER; - // } - - // mndReleaseDb(pMnode, pDb); - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-stream"); if (pTrans == NULL) { mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 4f320c3de0..ba8578f98e 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -97,9 +97,12 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tEncodeI8(pEncoder, pTask->info.fillHistory) < 0) return -1; if (tEncodeI64(pEncoder, pTask->historyTaskId.streamId)) return -1; - if (tEncodeI32(pEncoder, pTask->historyTaskId.taskId)) return -1; + int32_t taskId = pTask->historyTaskId.taskId; + if (tEncodeI32(pEncoder, taskId)) return -1; + if (tEncodeI64(pEncoder, pTask->streamTaskId.streamId)) return -1; - if (tEncodeI32(pEncoder, pTask->streamTaskId.taskId)) return -1; + taskId = pTask->streamTaskId.taskId; + if (tEncodeI32(pEncoder, taskId)) return -1; if (tEncodeU64(pEncoder, pTask->dataRange.range.minVer)) return -1; if (tEncodeU64(pEncoder, pTask->dataRange.range.maxVer)) return -1; @@ -141,6 +144,8 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { } int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { + int32_t taskId = 0; + if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->ver) < 0) return -1; if (pTask->ver != SSTREAM_TASK_VER) return -1; @@ -165,14 +170,12 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tDecodeI8(pDecoder, &pTask->info.fillHistory) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->historyTaskId.streamId)) return -1; - - int32_t taskId = pTask->historyTaskId.taskId; if (tDecodeI32(pDecoder, &taskId)) return -1; + pTask->historyTaskId.taskId = taskId; if (tDecodeI64(pDecoder, &pTask->streamTaskId.streamId)) return -1; - - taskId = pTask->streamTaskId.taskId; if (tDecodeI32(pDecoder, &taskId)) return -1; + pTask->streamTaskId.taskId = taskId; if (tDecodeU64(pDecoder, &pTask->dataRange.range.minVer)) return -1; if (tDecodeU64(pDecoder, &pTask->dataRange.range.maxVer)) return -1;