refactor: do some internal refactor.
This commit is contained in:
parent
7a32b3a209
commit
53b2158c54
|
@ -184,11 +184,7 @@ void qDestroyTask(qTaskInfo_t tinfo);
|
|||
|
||||
void qProcessRspMsg(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet);
|
||||
|
||||
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList /*,int32_t* resNum, SExplainExecInfo** pRes*/);
|
||||
|
||||
int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len);
|
||||
|
||||
int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len);
|
||||
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList);
|
||||
|
||||
void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order);
|
||||
void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery);
|
||||
|
|
|
@ -871,32 +871,6 @@ int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList) {
|
|||
return getOperatorExplainExecInfo(pTaskInfo->pRoot, pExecInfoList);
|
||||
}
|
||||
|
||||
int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) {
|
||||
SExecTaskInfo* pTaskInfo = (struct SExecTaskInfo*)tinfo;
|
||||
if (pTaskInfo->pRoot == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
int32_t nOptrWithVal = 0;
|
||||
// int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal);
|
||||
// if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal == 0)) {
|
||||
// taosMemoryFreeClear(*pOutput);
|
||||
// *len = 0;
|
||||
// }
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len) {
|
||||
SExecTaskInfo* pTaskInfo = (struct SExecTaskInfo*)tinfo;
|
||||
|
||||
if (pTaskInfo == NULL || pInput == NULL || len == 0) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
return 0;
|
||||
// return decodeOperator(pTaskInfo->pRoot, pInput, len);
|
||||
}
|
||||
|
||||
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
SOperatorInfo* pOperator = pTaskInfo->pRoot;
|
||||
|
|
|
@ -647,23 +647,25 @@ int32_t streamExecTask(SStreamTask* pTask) {
|
|||
int32_t streamTaskReleaseState(SStreamTask* pTask) {
|
||||
stDebug("s-task:%s release exec state", pTask->id.idStr);
|
||||
void* pExecutor = pTask->exec.pExecutor;
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (pExecutor != NULL) {
|
||||
int32_t code = qStreamOperatorReleaseState(pExecutor);
|
||||
return code;
|
||||
} else {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
code = qStreamOperatorReleaseState(pExecutor);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t streamTaskReloadState(SStreamTask* pTask) {
|
||||
stDebug("s-task:%s reload exec state", pTask->id.idStr);
|
||||
void* pExecutor = pTask->exec.pExecutor;
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (pExecutor != NULL) {
|
||||
int32_t code = qStreamOperatorReloadState(pExecutor);
|
||||
return code;
|
||||
} else {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
code = qStreamOperatorReloadState(pExecutor);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t streamAlignTransferState(SStreamTask* pTask) {
|
||||
|
|
Loading…
Reference in New Issue