refactor: do some internal refactor.
This commit is contained in:
parent
a8262a93b8
commit
f54fe2d6ef
|
@ -113,25 +113,23 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32
|
||||||
int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask);
|
int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask);
|
||||||
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask);
|
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask);
|
||||||
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask);
|
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask);
|
||||||
|
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*);
|
||||||
|
STaskId streamTaskExtractKey(const SStreamTask* pTask);
|
||||||
|
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo);
|
||||||
|
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);
|
||||||
|
int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, int32_t* pLen);
|
||||||
|
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);
|
||||||
|
|
||||||
int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, int32_t* blockSize);
|
int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, int32_t* blockSize);
|
||||||
int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem);
|
int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem);
|
||||||
void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size);
|
void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size);
|
||||||
const char* streamQueueItemGetTypeStr(int32_t type);
|
const char* streamQueueItemGetTypeStr(int32_t type);
|
||||||
|
SStreamQueueItem* streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
|
||||||
|
|
||||||
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
|
|
||||||
|
|
||||||
int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, int32_t* pLen);
|
|
||||||
int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq);
|
int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq);
|
||||||
int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
|
int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
|
||||||
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);
|
|
||||||
int32_t streamTransferStateToStreamTask(SStreamTask* pTask);
|
int32_t streamTransferStateToStreamTask(SStreamTask* pTask);
|
||||||
|
|
||||||
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*);
|
|
||||||
STaskId streamTaskExtractKey(const SStreamTask* pTask);
|
|
||||||
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo);
|
|
||||||
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);
|
|
||||||
|
|
||||||
SStreamQueue* streamQueueOpen(int64_t cap);
|
SStreamQueue* streamQueueOpen(int64_t cap);
|
||||||
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId);
|
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId);
|
||||||
void streamQueueProcessSuccess(SStreamQueue* queue);
|
void streamQueueProcessSuccess(SStreamQueue* queue);
|
||||||
|
@ -152,8 +150,8 @@ int downloadCheckpoint(char* id, char* path);
|
||||||
int deleteCheckpoint(char* id);
|
int deleteCheckpoint(char* id);
|
||||||
int deleteCheckpointFile(char* id, char* name);
|
int deleteCheckpointFile(char* id, char* name);
|
||||||
|
|
||||||
int32_t onNormalTaskReady(SStreamTask* pTask);
|
int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask);
|
||||||
int32_t onScanhistoryTaskReady(SStreamTask* pTask);
|
int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -158,7 +158,7 @@ int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubm
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo handle memory error
|
// todo handle memory error
|
||||||
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem) {
|
SStreamQueueItem* streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem) {
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
|
|
||||||
if (dst->type == STREAM_INPUT__DATA_BLOCK && pElem->type == STREAM_INPUT__DATA_BLOCK) {
|
if (dst->type == STREAM_INPUT__DATA_BLOCK && pElem->type == STREAM_INPUT__DATA_BLOCK) {
|
||||||
|
|
|
@ -221,7 +221,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
|
||||||
*pInput = qItem;
|
*pInput = qItem;
|
||||||
} else {
|
} else {
|
||||||
// merge current block failed, let's handle the already merged blocks.
|
// merge current block failed, let's handle the already merged blocks.
|
||||||
void* newRet = streamMergeQueueItem(*pInput, qItem);
|
void* newRet = streamQueueMergeQueueItem(*pInput, qItem);
|
||||||
if (newRet == NULL) {
|
if (newRet == NULL) {
|
||||||
if (terrno != 0) {
|
if (terrno != 0) {
|
||||||
stError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks,
|
stError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks,
|
||||||
|
|
|
@ -323,7 +323,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t onNormalTaskReady(SStreamTask* pTask) {
|
int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask) {
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
|
|
||||||
streamTaskSetReady(pTask);
|
streamTaskSetReady(pTask);
|
||||||
|
@ -348,7 +348,7 @@ int32_t onNormalTaskReady(SStreamTask* pTask) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t onScanhistoryTaskReady(SStreamTask* pTask) {
|
int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) {
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
|
|
||||||
// set the state to be ready
|
// set the state to be ready
|
||||||
|
|
|
@ -457,11 +457,11 @@ void doInitStateTransferTable(void) {
|
||||||
streamTaskSMTrans = taosArrayInit(8, sizeof(STaskStateTrans));
|
streamTaskSMTrans = taosArrayInit(8, sizeof(STaskStateTrans));
|
||||||
|
|
||||||
// initialization event handle
|
// initialization event handle
|
||||||
STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, onNormalTaskReady, false, false);
|
STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, streamTaskOnNormalTaskReady, false, false);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, onScanhistoryTaskReady, false, false);
|
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, false, false);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST, streamTaskInitStatus, onScanhistoryTaskReady, false, false);
|
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, false, false);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
|
|
||||||
// scan-history related event
|
// scan-history related event
|
||||||
|
|
Loading…
Reference in New Issue