refactor: do some internal refactor.
This commit is contained in:
parent
281f25fdcc
commit
1a1319908d
|
@ -113,25 +113,23 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32
|
|||
int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask);
|
||||
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask);
|
||||
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask);
|
||||
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*);
|
||||
STaskId streamTaskExtractKey(const SStreamTask* pTask);
|
||||
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo);
|
||||
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);
|
||||
int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, int32_t* pLen);
|
||||
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);
|
||||
|
||||
int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, int32_t* blockSize);
|
||||
int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem);
|
||||
void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size);
|
||||
const char* streamQueueItemGetTypeStr(int32_t type);
|
||||
SStreamQueueItem* streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
|
||||
|
||||
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
|
||||
|
||||
int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, int32_t* pLen);
|
||||
int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq);
|
||||
int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
|
||||
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);
|
||||
int32_t streamTransferStateToStreamTask(SStreamTask* pTask);
|
||||
|
||||
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*);
|
||||
STaskId streamTaskExtractKey(const SStreamTask* pTask);
|
||||
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo);
|
||||
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);
|
||||
|
||||
SStreamQueue* streamQueueOpen(int64_t cap);
|
||||
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId);
|
||||
void streamQueueProcessSuccess(SStreamQueue* queue);
|
||||
|
@ -152,8 +150,8 @@ int downloadCheckpoint(char* id, char* path);
|
|||
int deleteCheckpoint(char* id);
|
||||
int deleteCheckpointFile(char* id, char* name);
|
||||
|
||||
int32_t onNormalTaskReady(SStreamTask* pTask);
|
||||
int32_t onScanhistoryTaskReady(SStreamTask* pTask);
|
||||
int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask);
|
||||
int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -158,7 +158,7 @@ int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubm
|
|||
}
|
||||
|
||||
// todo handle memory error
|
||||
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem) {
|
||||
SStreamQueueItem* streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem) {
|
||||
terrno = 0;
|
||||
|
||||
if (dst->type == STREAM_INPUT__DATA_BLOCK && pElem->type == STREAM_INPUT__DATA_BLOCK) {
|
||||
|
|
|
@ -221,7 +221,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
|
|||
*pInput = qItem;
|
||||
} else {
|
||||
// merge current block failed, let's handle the already merged blocks.
|
||||
void* newRet = streamMergeQueueItem(*pInput, qItem);
|
||||
void* newRet = streamQueueMergeQueueItem(*pInput, qItem);
|
||||
if (newRet == NULL) {
|
||||
if (terrno != 0) {
|
||||
stError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks,
|
||||
|
|
|
@ -322,7 +322,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
|
|||
}
|
||||
}
|
||||
|
||||
int32_t onNormalTaskReady(SStreamTask* pTask) {
|
||||
int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask) {
|
||||
const char* id = pTask->id.idStr;
|
||||
|
||||
streamTaskSetReady(pTask);
|
||||
|
@ -347,7 +347,7 @@ int32_t onNormalTaskReady(SStreamTask* pTask) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t onScanhistoryTaskReady(SStreamTask* pTask) {
|
||||
int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) {
|
||||
const char* id = pTask->id.idStr;
|
||||
|
||||
// set the state to be ready
|
||||
|
|
|
@ -457,11 +457,11 @@ void doInitStateTransferTable(void) {
|
|||
streamTaskSMTrans = taosArrayInit(8, sizeof(STaskStateTrans));
|
||||
|
||||
// initialization event handle
|
||||
STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, onNormalTaskReady, false, false);
|
||||
STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, streamTaskOnNormalTaskReady, false, false);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, onScanhistoryTaskReady, false, false);
|
||||
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, false, false);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST, streamTaskInitStatus, onScanhistoryTaskReady, false, false);
|
||||
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, false, false);
|
||||
taosArrayPush(streamTaskSMTrans, &trans);
|
||||
|
||||
// scan-history related event
|
||||
|
|
Loading…
Reference in New Issue