diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index c41834bd82..4c359975ce 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -106,6 +106,7 @@ typedef struct { } SStreamQueueItem; typedef void FTbSink(SStreamTask* pTask, void* vnode, void* data); +typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data); typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver); typedef struct { @@ -154,8 +155,6 @@ typedef struct { int64_t size; } SStreamQueueRes; -void streamFreeQitem(SStreamQueueItem* data); - #if 0 bool streamQueueResEmpty(const SStreamQueueRes* pRes); int64_t streamQueueResSize(const SStreamQueueRes* pRes); @@ -185,12 +184,6 @@ typedef struct { int32_t streamInit(); void streamCleanUp(); -SStreamQueue* streamQueueOpen(int64_t cap); -void streamQueueClose(SStreamQueue* pQueue, int32_t taskId); -void streamQueueProcessSuccess(SStreamQueue* queue); -void streamQueueProcessFail(SStreamQueue* queue); -void* streamQueueNextItem(SStreamQueue* pQueue); - SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type); void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit); @@ -222,8 +215,6 @@ typedef struct { SSHashObj* pTblInfo; } STaskSinkTb; -typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data); - typedef struct { int64_t smaId; // following are not applicable to encoder and decoder @@ -244,10 +235,10 @@ typedef struct SStreamChildEpInfo { int64_t stage; // upstream task stage value, to denote if the upstream node has restart/replica changed/transfer } SStreamChildEpInfo; -typedef struct SStreamTaskKey { +typedef struct STaskId { int64_t streamId; int32_t taskId; -} SStreamTaskKey; +} STaskId; typedef struct SStreamTaskId { int64_t streamId; @@ -341,8 +332,8 @@ struct SStreamTask { SCheckpointInfo chkInfo; STaskExec exec; SDataRange dataRange; - SStreamTaskId historyTaskId; - SStreamTaskId streamTaskId; + STaskId historyTaskId; + STaskId streamTaskId; STaskExecStatisInfo taskExecInfo; SArray* pReadyMsgList; // SArray TdThreadMutex lock; // secure the operation of set task status and puting data into inputQ @@ -386,7 +377,7 @@ typedef struct SStreamMeta { TTB* pTaskDb; TTB* pCheckpointDb; SHashObj* pTasksMap; - SArray* pTaskList; // SArray + SArray* pTaskList; // SArray void* ahandle; TXN* txn; FTaskExpand* expandFunc; @@ -425,7 +416,7 @@ void tFreeStreamTask(SStreamTask* pTask); int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver); int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo); -int32_t tDecodeStreamTaskId(SDecoder* pDecoder, SStreamTaskId* pTaskId); +int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId); int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem); int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock); @@ -516,7 +507,7 @@ typedef struct { int32_t downstreamTaskId; int32_t upstreamNodeId; int32_t childId; -} SStreamScanHistoryFinishReq, SStreamTransferReq; +} SStreamScanHistoryFinishReq; int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq); int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistoryFinishReq* pReq); @@ -704,7 +695,7 @@ void streamMetaCleanup(); SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage); void streamMetaClose(SStreamMeta* streamMeta); int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); // save to stream meta store -int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int64_t* pKey); +int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pKey); int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded); int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 6e1b30d3c8..4bf4e64e52 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -240,7 +240,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch // internal int32_t tsTransPullupInterval = 2; int32_t tsMqRebalanceInterval = 2; -int32_t tsStreamCheckpointTickInterval = 30; +int32_t tsStreamCheckpointTickInterval = 300; int32_t tsStreamNodeCheckInterval = 10; int32_t tsTtlUnit = 86400; int32_t tsTtlPushIntervalSec = 10; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 2b1885fb0e..ef9c1ebe2e 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -189,15 +189,17 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) { int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) { SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg; qDebug("snode:%d receive msg to drop stream task:0x%x", pSnode->pMeta->vgId, pReq->taskId); - - SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, pReq->streamId, pReq->taskId); - if (pTask == NULL) { - qError("vgId:%d failed to acquire s-task:0x%x when dropping it", pSnode->pMeta->vgId, pReq->taskId); - return 0; - } - streamMetaUnregisterTask(pSnode->pMeta, pReq->streamId, pReq->taskId); - streamMetaReleaseTask(pSnode->pMeta, pTask); + + // commit the update + taosWLockLatch(&pSnode->pMeta->lock); + int32_t numOfTasks = streamMetaGetNumOfTasks(pSnode->pMeta); + qDebug("vgId:%d task:0x%x dropped, remain tasks:%d", pSnode->pMeta->vgId, pReq->taskId, numOfTasks); + + if (streamMetaCommit(pSnode->pMeta) < 0) { + // persist to disk + } + taosWUnLockLatch(&pSnode->pMeta->lock); return 0; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index e665e4c408..66d183440f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -726,7 +726,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { SStreamTask* pStateTask = pTask; SStreamTask task = {0}; if (pTask->info.fillHistory) { - task.id = pTask->streamTaskId; + task.id.streamId = pTask->streamTaskId.streamId; + task.id.taskId = pTask->streamTaskId.taskId; task.pMeta = pTask->pMeta; pStateTask = &task; } @@ -760,7 +761,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { SStreamTask* pSateTask = pTask; SStreamTask task = {0}; if (pTask->info.fillHistory) { - task.id = pTask->streamTaskId; + task.id.streamId = pTask->streamTaskId.streamId; + task.id.taskId = pTask->streamTaskId.taskId; task.pMeta = pTask->pMeta; pSateTask = &task; } @@ -1675,9 +1677,8 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { taosWLockLatch(&pMeta->lock); // the task epset may be updated again and again, when replaying the WAL, the task may be in stop status. - int64_t keys[2] = {req.streamId, req.taskId}; - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); - + STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask == NULL || *ppTask == NULL) { tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId, req.taskId); @@ -1695,10 +1696,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTask** ppHTask = NULL; if (pTask->historyTaskId.taskId != 0) { - keys[0] = pTask->historyTaskId.streamId; - keys[1] = pTask->historyTaskId.taskId; - - ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); + ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pTask->historyTaskId, sizeof(pTask->historyTaskId)); if (ppHTask == NULL || *ppHTask == NULL) { tqError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already", pMeta->vgId, req.taskId); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 1e66988aab..d82410e6ea 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -240,8 +240,8 @@ int32_t tqStartStreamTasks(STQ* pTq) { for (int32_t i = 0; i < numOfTasks; ++i) { SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); - int64_t key[2] = {pTaskId->streamId, pTaskId->taskId}; - SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, key, sizeof(key)); + STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId}; + SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); int8_t status = (*pTask)->status.taskStatus; if (status == TASK_STATUS__STOP && (*pTask)->info.fillHistory != 1) { diff --git a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c index f0da479700..458e2c74ff 100644 --- a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c @@ -227,23 +227,20 @@ _err: int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t nData) { int32_t code = 0; STQ* pTq = pWriter->pTq; - STqHandle handle; SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; if (pHdr->type == SNAP_DATA_STREAM_TASK) { - SStreamTaskId task = {0}; + STaskId taskId = {0}; SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); - - code = tDecodeStreamTaskId(&decoder, &task); + code = tDecodeStreamTaskId(&decoder, &taskId); if (code < 0) { tDecoderClear(&decoder); goto _err; } tDecoderClear(&decoder); - // tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn) - int64_t key[2] = {task.streamId, task.taskId}; + int64_t key[2] = {taskId.streamId, taskId.taskId}; taosWLockLatch(&pTq->pStreamMeta->lock); if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, key, sizeof(int64_t) << 1, (uint8_t*)pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr), pTq->pStreamMeta->txn) < 0) { diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index fb11ec4ea4..64df8e2f44 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -86,6 +86,15 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask); int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t cap, int32_t rate); +SStreamQueue* streamQueueOpen(int64_t cap); +void streamQueueClose(SStreamQueue* pQueue, int32_t taskId); +void streamQueueProcessSuccess(SStreamQueue* queue); +void streamQueueProcessFail(SStreamQueue* queue); +void* streamQueueNextItem(SStreamQueue* pQueue); +void streamFreeQitem(SStreamQueueItem* data); + +STaskId extractStreamTaskKey(const SStreamTask* pTask); + #ifdef __cplusplus } #endif diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index cfbfdb5da4..fce3526bee 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -266,11 +266,8 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { int64_t keys[2]; for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) { - SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); - keys[0] = pId->streamId; - keys[1] = pId->taskId; - - SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); + STaskId* pId = taosArrayGet(pMeta->pTaskList, i); + SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId)); if (ppTask == NULL) { continue; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 3b3dca7f5f..91c46c8ad9 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -300,10 +300,9 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { qError( "s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed, destroy the related " "fill-history task", - pTask->id.idStr, pTask->streamTaskId.taskId); + pTask->id.idStr, (int32_t) pTask->streamTaskId.taskId); // 1. free it and remove fill-history task from disk meta-store -// streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId); streamBuildAndSendDropTaskMsg(pStreamTask, pMeta->vgId, &pTask->id); // 2. save to disk @@ -371,6 +370,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { // 5. clear the link between fill-history task and stream task info pStreamTask->historyTaskId.taskId = 0; + pStreamTask->historyTaskId.streamId = 0; // 6. save to disk taosWLockLatch(&pMeta->lock); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 78eee339f1..afb8349234 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -36,7 +36,6 @@ static void metaHbToMnode(void* param, void* tmrId); static void streamMetaClear(SStreamMeta* pMeta); static int32_t streamMetaBegin(SStreamMeta* pMeta); static void streamMetaCloseImpl(void* arg); -static void extractStreamTaskKey(int64_t* pKey, const SStreamTask* pTask); typedef struct { TdThreadMutex mutex; @@ -361,10 +360,8 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { tEncodeStreamTask(&encoder, pTask); tEncoderClear(&encoder); - int64_t key[2] = {0}; - extractStreamTaskKey(key, pTask); - - if (tdbTbUpsert(pMeta->pTaskDb, key, STREAM_TASK_KEY_LEN, buf, len, pMeta->txn) < 0) { + int64_t id[2] = {pTask->id.streamId, pTask->id.taskId}; + if (tdbTbUpsert(pMeta->pTaskDb, id, STREAM_TASK_KEY_LEN, buf, len, pMeta->txn) < 0) { qError("s-task:%s save to disk failed, code:%s", pTask->id.idStr, tstrerror(terrno)); return -1; } @@ -373,18 +370,14 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { return 0; } -void extractStreamTaskKey(int64_t* pKey, const SStreamTask* pTask) { - pKey[0] = pTask->id.streamId; - pKey[1] = pTask->id.taskId; -} - -int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int64_t* pKey) { - int32_t code = tdbTbDelete(pMeta->pTaskDb, pKey, STREAM_TASK_KEY_LEN, pMeta->txn); +int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pTaskId) { + int64_t key[2] = {pTaskId->streamId, pTaskId->taskId}; + int32_t code = tdbTbDelete(pMeta->pTaskDb, key, STREAM_TASK_KEY_LEN, pMeta->txn); if (code != 0) { - qError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, (int32_t)pKey[1], + qError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, (int32_t) pTaskId->taskId, tstrerror(terrno)); } else { - qDebug("vgId:%d remove task:0x%x from metastore", pMeta->vgId, (int32_t)pKey[1]); + qDebug("vgId:%d remove task:0x%x from metastore", pMeta->vgId, (int32_t) pTaskId->taskId); } return code; @@ -394,8 +387,8 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int64_t* pKey) { int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded) { *pAdded = false; - int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId}; - void* p = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); + STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; + void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (p == NULL) { if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { tFreeStreamTask(pTask); @@ -417,7 +410,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa return 0; } - taosHashPut(pMeta->pTasksMap, keys, sizeof(keys), &pTask, POINTER_BYTES); + taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES); *pAdded = true; return 0; } @@ -432,10 +425,8 @@ int32_t streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta) { int32_t num = 0; size_t size = taosArrayGetSize(pMeta->pTaskList); for (int32_t i = 0; i < size; ++i) { - SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); - int64_t keys[2] = {pId->streamId, pId->taskId}; - - SStreamTask** p = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); + STaskId* pId = taosArrayGet(pMeta->pTaskList, i); + SStreamTask** p = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId)); if (p == NULL) { continue; } @@ -451,8 +442,8 @@ int32_t streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta) { SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { taosRLockLatch(&pMeta->lock); - int64_t keys[2] = {streamId, taskId}; - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); + STaskId id = {.streamId = streamId, .taskId = taskId}; + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask != NULL) { if (!streamTaskShouldStop(&(*ppTask)->status)) { int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1); @@ -495,8 +486,8 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t // pre-delete operation taosWLockLatch(&pMeta->lock); - int64_t keys[2] = {streamId, taskId}; - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); + STaskId id = {.streamId = streamId, .taskId = taskId}; + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask) { pTask = *ppTask; if (streamTaskShouldPause(&pTask->status)) { @@ -516,7 +507,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t while (1) { taosRLockLatch(&pMeta->lock); - ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); + ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask) { if ((*ppTask)->status.timerActive == 0) { @@ -535,9 +526,9 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t // let's do delete of stream task taosWLockLatch(&pMeta->lock); - ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); + ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask) { - taosHashRemove(pMeta->pTasksMap, keys, sizeof(keys)); + taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING); ASSERT(pTask->status.timerActive == 0); @@ -550,7 +541,17 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t streamMetaReleaseTask(pMeta, pTask); } - streamMetaRemoveTask(pMeta, keys); + // it is an fill-history task, remove the related stream task's id that points to it + if ((*ppTask)->info.fillHistory == 1) { + STaskId id1 = {.streamId = (*ppTask)->streamTaskId.streamId, .taskId = (*ppTask)->streamTaskId.taskId}; + SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id1, sizeof(id1)); + if (ppStreamTask != NULL) { + (*ppStreamTask)->historyTaskId.taskId = 0; + (*ppStreamTask)->historyTaskId.streamId = 0; + } + } + + streamMetaRemoveTask(pMeta, &id); streamMetaReleaseTask(pMeta, pTask); } else { qDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", pMeta->vgId, taskId); @@ -651,7 +652,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { void* pVal = NULL; int32_t vLen = 0; SDecoder decoder; - SArray* pRecycleList = taosArrayInit(4, STREAM_TASK_KEY_LEN); + SArray* pRecycleList = taosArrayInit(4, sizeof(STaskId)); tdbTbcMoveToFirst(pCur); while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { @@ -678,18 +679,17 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { int32_t taskId = pTask->id.taskId; tFreeStreamTask(pTask); - int64_t key[2] = {0}; - extractStreamTaskKey(key, pTask); + STaskId id = extractStreamTaskKey(pTask); - taosArrayPush(pRecycleList, key); + taosArrayPush(pRecycleList, &id); int32_t total = taosArrayGetSize(pRecycleList); qDebug("s-task:0x%x is already dropped, add into recycle list, total:%d", taskId, total); continue; } // do duplicate task check. - int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId}; - void* p = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); + STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; + void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (p == NULL) { // pTask->chkInfo.checkpointVer may be 0, when a follower is become a leader // In this case, we try not to start fill-history task anymore. @@ -707,7 +707,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { continue; } - if (taosHashPut(pMeta->pTasksMap, keys, sizeof(keys), &pTask, sizeof(void*)) < 0) { + if (taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES) < 0) { doClear(pKey, pVal, pCur, pRecycleList); tFreeStreamTask(pTask); return -1; @@ -731,7 +731,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { if (taosArrayGetSize(pRecycleList) > 0) { for (int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) { - int64_t* pId = taosArrayGet(pRecycleList, i); + STaskId* pId = taosArrayGet(pRecycleList, i); streamMetaRemoveTask(pMeta, pId); } } @@ -831,10 +831,9 @@ void metaHbToMnode(void* param, void* tmrId) { hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry)); for (int32_t i = 0; i < numOfTasks; ++i) { - SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); + STaskId* pId = taosArrayGet(pMeta->pTaskList, i); - int64_t keys[2] = {pId->streamId, pId->taskId}; - SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); + SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId)); if ((*pTask)->info.fillHistory == 1) { continue; } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index db2e418171..7a318e2310 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -21,8 +21,7 @@ typedef struct SStreamTaskRetryInfo { SStreamMeta* pMeta; - int32_t taskId; - int64_t streamId; + STaskId id; } SStreamTaskRetryInfo; static int32_t streamSetParamForScanHistory(SStreamTask* pTask); @@ -520,12 +519,10 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { SStreamTaskRetryInfo* pInfo = param; SStreamMeta* pMeta = pInfo->pMeta; - qDebug("s-task:0x%x in timer to launch related history task", pInfo->taskId); + qDebug("s-task:0x%x in timer to launch related history task", (int32_t) pInfo->id.taskId); taosWLockLatch(&pMeta->lock); - int64_t keys[2] = {pInfo->streamId, pInfo->taskId}; - - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pInfo->id, sizeof(pInfo->id)); if (ppTask) { ASSERT((*ppTask)->status.timerActive >= 1); @@ -541,7 +538,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { } taosWUnLockLatch(&pMeta->lock); - SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->streamId, pInfo->taskId); + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->id.streamId, pInfo->id.taskId); if (pTask != NULL) { ASSERT(pTask->status.timerActive >= 1); @@ -552,7 +549,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { qWarn( "s-task:%s vgId:%d status:%s failed to launch history task:0x%x, since it may not be built, or may have been " "destroyed, or should stop", - pTask->id.idStr, pMeta->vgId, pStatus, pTask->historyTaskId.taskId); + pTask->id.idStr, pMeta->vgId, pStatus, (int32_t) pTask->historyTaskId.taskId); taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer); streamMetaReleaseTask(pMeta, pTask); @@ -568,7 +565,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { atomic_sub_fetch_8(&pTask->status.timerActive, 1); streamMetaReleaseTask(pMeta, pTask); } else { - qError("s-task:0x%x failed to load task, it may have been destroyed", pInfo->taskId); + qError("s-task:0x%x failed to load task, it may have been destroyed", (int32_t) pInfo->id.taskId); } taosMemoryFree(pInfo); @@ -587,17 +584,15 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { qDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", pTask->id.idStr, pTask->historyTaskId.streamId, hTaskId); - int64_t keys[2] = {pTask->historyTaskId.streamId, hTaskId}; - // Set the execute conditions, including the query time window and the version range - SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); + SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->historyTaskId, sizeof(pTask->historyTaskId)); if (pHTask == NULL) { qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr, pMeta->vgId, hTaskId); SStreamTaskRetryInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamTaskRetryInfo)); - pInfo->taskId = pTask->id.taskId; - pInfo->streamId = pTask->id.streamId; + pInfo->id.taskId = pTask->id.taskId; + pInfo->id.streamId = pTask->id.streamId; pInfo->pMeta = pTask->pMeta; if (pTask->launchTaskTimer == NULL) { diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 23ace63d18..117c795a8d 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -251,7 +251,8 @@ int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) tEndDecode(pDecoder); return 0; } -int32_t tDecodeStreamTaskId(SDecoder* pDecoder, SStreamTaskId* pTaskId) { + +int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId) { int64_t ver; if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &ver) < 0) return -1; @@ -478,8 +479,8 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i); if (pInfo->nodeId == nodeId) { epsetAssign(&pInfo->epSet, pEpSet); - qDebug("s-task:0x%x update the upstreamInfo, nodeId:%d taskId:0x%x newEpset:%s", pTask->id.taskId, nodeId, - pInfo->taskId, buf); + qDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s", pTask->id.taskId, + pInfo->taskId, nodeId, buf); break; } } @@ -509,7 +510,8 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE if (pVgInfo->vgId == nodeId) { epsetAssign(&pVgInfo->epSet, pEpSet); - qDebug("s-task:0x%x update the dispatch info, nodeId:%d newEpset:%s", pTask->id.taskId, nodeId, buf); + qDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpset:%s", pTask->id.taskId, + pVgInfo->taskId, nodeId, buf); break; } } @@ -517,7 +519,8 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE STaskDispatcherFixedEp* pDispatcher = &pTask->fixedEpDispatcher; if (pDispatcher->nodeId == nodeId) { epsetAssign(&pDispatcher->epSet, pEpSet); - qDebug("s-task:0x%x update the dispatch info, nodeId:%d newEpSet:%s", pTask->id.taskId, nodeId, buf); + qDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpSet:%s", pTask->id.taskId, + pDispatcher->taskId, nodeId, buf); } } else { // do nothing @@ -567,17 +570,19 @@ int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) { int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) { STaskExecStatisInfo* p = &pTask->taskExecInfo; - qDebug("s-task:%s update task nodeEp epset, update count:%d, prevTs:%"PRId64, pTask->id.idStr, - p->updateCount + 1, p->latestUpdateTs); - p->updateCount += 1; + int32_t numOfNodes = taosArrayGetSize(pNodeList); + int64_t prevTs = p->latestUpdateTs; + p->latestUpdateTs = taosGetTimestampMs(); + p->updateCount += 1; + qDebug("s-task:%s update task nodeEp epset, updatedNodes:%d, updateCount:%d, prevTs:%" PRId64, pTask->id.idStr, + numOfNodes, p->updateCount, prevTs); for (int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) { SNodeUpdateInfo* pInfo = taosArrayGet(pNodeList, i); doUpdateTaskEpset(pTask, pInfo->nodeId, &pInfo->newEp); } - return 0; } @@ -649,3 +654,8 @@ int32_t streamBuildAndSendDropTaskMsg(SStreamTask* pTask, int32_t vgId, SStreamT qDebug("vgId:%d build and send drop table:0x%x msg", vgId, pTaskId->taskId); return code; } + +STaskId extractStreamTaskKey(const SStreamTask* pTask) { + STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; + return id; +} \ No newline at end of file