refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2023-04-17 09:46:58 +08:00
parent 10b3fd9426
commit ac137b4b33
2 changed files with 16 additions and 15 deletions

View File

@ -335,17 +335,17 @@ struct SStreamTask {
// meta // meta
typedef struct SStreamMeta { typedef struct SStreamMeta {
char* path; char* path;
TDB* db; TDB* db;
TTB* pTaskDb; TTB* pTaskDb;
TTB* pCheckpointDb; TTB* pCheckpointDb;
SHashObj* pTasks; SHashObj* pTasks;
void* ahandle; void* ahandle;
TXN* txn; TXN* txn;
FTaskExpand* expandFunc; FTaskExpand* expandFunc;
int32_t vgId; int32_t vgId;
SRWLatch lock; SRWLatch lock;
int8_t walScan; int8_t walScan;
} SStreamMeta; } SStreamMeta;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
@ -358,10 +358,6 @@ void tFreeStreamTask(SStreamTask* pTask);
int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem); int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem);
bool tInputQueueIsFull(const SStreamTask* pTask); bool tInputQueueIsFull(const SStreamTask* pTask);
static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) {
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
}
typedef struct { typedef struct {
SMsgHead head; SMsgHead head;
int64_t streamId; int64_t streamId;
@ -537,6 +533,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg); int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
// int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp); // int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp);
void streamTaskInputFail(SStreamTask* pTask);
int32_t streamTryExec(SStreamTask* pTask); int32_t streamTryExec(SStreamTask* pTask);
int32_t streamSchedExec(SStreamTask* pTask); int32_t streamSchedExec(SStreamTask* pTask);
int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock); int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock);

View File

@ -352,4 +352,8 @@ void* streamQueueNextItem(SStreamQueue* queue) {
} }
return streamQueueCurItem(queue); return streamQueueCurItem(queue);
} }
}
void streamTaskInputFail(SStreamTask* pTask) {
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
} }