diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 6dd1e5c1c3..b54f19e4ce 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -113,25 +113,23 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32 int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask); int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask); int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask); +int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*); +STaskId streamTaskExtractKey(const SStreamTask* pTask); +void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo); +void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo); +int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, int32_t* pLen); +int32_t streamTaskFillHistoryFinished(SStreamTask* pTask); int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, int32_t* blockSize); int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem); void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size); const char* streamQueueItemGetTypeStr(int32_t type); +SStreamQueueItem* streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); -SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); - -int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, int32_t* pLen); int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq); int32_t streamNotifyUpstreamContinue(SStreamTask* pTask); -int32_t streamTaskFillHistoryFinished(SStreamTask* pTask); int32_t streamTransferStateToStreamTask(SStreamTask* pTask); -int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*); -STaskId streamTaskExtractKey(const SStreamTask* pTask); -void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo); -void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo); - SStreamQueue* streamQueueOpen(int64_t cap); void streamQueueClose(SStreamQueue* pQueue, int32_t taskId); void streamQueueProcessSuccess(SStreamQueue* queue); @@ -152,8 +150,8 @@ int downloadCheckpoint(char* id, char* path); int deleteCheckpoint(char* id); int deleteCheckpointFile(char* id, char* name); -int32_t onNormalTaskReady(SStreamTask* pTask); -int32_t onScanhistoryTaskReady(SStreamTask* pTask); +int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask); +int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask); #ifdef __cplusplus } diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index f6ec6e9fdb..bcda85e7a7 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -158,7 +158,7 @@ int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubm } // todo handle memory error -SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem) { +SStreamQueueItem* streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem) { terrno = 0; if (dst->type == STREAM_INPUT__DATA_BLOCK && pElem->type == STREAM_INPUT__DATA_BLOCK) { diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index d19dfc13bf..d1610362f9 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -221,7 +221,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu *pInput = qItem; } else { // merge current block failed, let's handle the already merged blocks. - void* newRet = streamMergeQueueItem(*pInput, qItem); + void* newRet = streamQueueMergeQueueItem(*pInput, qItem); if (newRet == NULL) { if (terrno != 0) { stError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks, diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 9b27281915..745d030e15 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -322,7 +322,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_ } } -int32_t onNormalTaskReady(SStreamTask* pTask) { +int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask) { const char* id = pTask->id.idStr; streamTaskSetReady(pTask); @@ -347,7 +347,7 @@ int32_t onNormalTaskReady(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } -int32_t onScanhistoryTaskReady(SStreamTask* pTask) { +int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) { const char* id = pTask->id.idStr; // set the state to be ready diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 1c951e1452..cac3766893 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -457,11 +457,11 @@ void doInitStateTransferTable(void) { streamTaskSMTrans = taosArrayInit(8, sizeof(STaskStateTrans)); // initialization event handle - STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, onNormalTaskReady, false, false); + STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, streamTaskOnNormalTaskReady, false, false); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, onScanhistoryTaskReady, false, false); + trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, false, false); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST, streamTaskInitStatus, onScanhistoryTaskReady, false, false); + trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, false, false); taosArrayPush(streamTaskSMTrans, &trans); // scan-history related event