fix:conflicts from 3.0
This commit is contained in:
parent
a8e45440c0
commit
ee96127020
|
@ -222,13 +222,12 @@ static int32_t doAddSinkTask(SStreamObj* pStream, SMnode* pMnode, SVgObj* pVgrou
|
|||
int64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
|
||||
SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
|
||||
|
||||
SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SINK, isFillhistory, 0, *pTaskList, pStream->conf.fillHistory);
|
||||
SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SINK, pEpset, isFillhistory, 0, *pTaskList, pStream->conf.fillHistory);
|
||||
if (pTask == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
epsetAssign(&(pTask)->info.mnodeEpset, pEpset);
|
||||
mDebug("doAddSinkTask taskId:%s, vgId:%d, isFillHistory:%d", pTask->id.idStr, pVgroup->vgId, isFillhistory);
|
||||
|
||||
pTask->info.nodeId = pVgroup->vgId;
|
||||
|
@ -279,25 +278,60 @@ static int32_t doAddShuffleSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet*
|
|||
return TDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static SStreamTask* buildSourceTask(SStreamObj* pStream, SEpSet* pEpset, int64_t firstWindowSkey,
|
||||
static int64_t getVgroupLastVer(const SArray* pList, int32_t vgId) {
|
||||
for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
|
||||
SVgroupVer* pVer = taosArrayGet(pList, i);
|
||||
if (pVer->vgId == vgId) {
|
||||
return pVer->ver;
|
||||
}
|
||||
}
|
||||
|
||||
mError("failed to find the vgId:%d for extract last version", vgId);
|
||||
return -1;
|
||||
}
|
||||
|
||||
static void streamTaskSetDataRange(SStreamTask* pTask, int64_t skey, SArray* pVerList, int32_t vgId) {
|
||||
int64_t latestVer = getVgroupLastVer(pVerList, vgId);
|
||||
if (latestVer < 0) {
|
||||
latestVer = 0;
|
||||
}
|
||||
|
||||
// set the correct ts, which is the last key of queried table.
|
||||
SDataRange* pRange = &pTask->dataRange;
|
||||
STimeWindow* pWindow = &pRange->window;
|
||||
|
||||
if (pTask->info.fillHistory) {
|
||||
pWindow->skey = INT64_MIN;
|
||||
pWindow->ekey = skey - 1;
|
||||
|
||||
pRange->range.minVer = 0;
|
||||
pRange->range.maxVer = latestVer;
|
||||
mDebug("add fill-history source task 0x%x timeWindow:%" PRId64 "-%" PRId64 " verRange:%" PRId64 "-%" PRId64,
|
||||
pTask->id.taskId, pWindow->skey, pWindow->ekey, pRange->range.minVer, pRange->range.maxVer);
|
||||
} else {
|
||||
pWindow->skey = skey;
|
||||
pWindow->ekey = INT64_MAX;
|
||||
|
||||
pRange->range.minVer = latestVer + 1;
|
||||
pRange->range.maxVer = INT64_MAX;
|
||||
|
||||
mDebug("add source task 0x%x timeWindow:%" PRId64 "-%" PRId64 " verRange:%" PRId64 "-%" PRId64,
|
||||
pTask->id.taskId, pWindow->skey, pWindow->ekey, pRange->range.minVer, pRange->range.maxVer);
|
||||
}
|
||||
}
|
||||
|
||||
static SStreamTask* buildSourceTask(SStreamObj* pStream, SEpSet* pEpset,
|
||||
bool isFillhistory, bool useTriggerParam) {
|
||||
uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
|
||||
SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
|
||||
|
||||
SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE,
|
||||
SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, pEpset,
|
||||
isFillhistory, useTriggerParam ? pStream->conf.triggerParam : 0,
|
||||
*pTaskList, pStream->conf.fillHistory);
|
||||
if (pTask == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
epsetAssign(&pTask->info.mnodeEpset, pEpset);
|
||||
STimeWindow* pWindow = &pTask->dataRange.window;
|
||||
|
||||
pWindow->skey = INT64_MIN;
|
||||
pWindow->ekey = firstWindowSkey - 1;
|
||||
mDebug("add source task 0x%x window:%" PRId64 " - %" PRId64, pTask->id.taskId, pWindow->skey, pWindow->ekey);
|
||||
|
||||
return pTask;
|
||||
}
|
||||
|
||||
|
@ -331,15 +365,17 @@ static void setHTasksId(SStreamObj* pStream) {
|
|||
}
|
||||
|
||||
static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset,
|
||||
int64_t nextWindowSkey, SVgObj* pVgroup, bool isFillhistory, bool useTriggerParam ){
|
||||
int64_t skey, SArray* pVerList, SVgObj* pVgroup, bool isFillhistory, bool useTriggerParam ){
|
||||
// new stream task
|
||||
SStreamTask* pTask = buildSourceTask(pStream, pEpset, nextWindowSkey, isFillhistory, useTriggerParam);
|
||||
SStreamTask* pTask = buildSourceTask(pStream, pEpset, isFillhistory, useTriggerParam);
|
||||
if(pTask == NULL){
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return terrno;
|
||||
}
|
||||
mDebug("doAddSourceTask taskId:%s, vgId:%d, isFillHistory:%d", pTask->id.idStr, pVgroup->vgId, isFillhistory);
|
||||
|
||||
streamTaskSetDataRange(pTask, skey, pVerList, pVgroup->vgId);
|
||||
|
||||
int32_t code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup);
|
||||
if(code != 0){
|
||||
terrno = code;
|
||||
|
@ -380,7 +416,7 @@ static SSubplan* getAggSubPlan(const SQueryPlan* pPlan, int index){
|
|||
}
|
||||
|
||||
static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream,
|
||||
SEpSet* pEpset, int64_t nextWindowSkey, bool useTriggerParam) {
|
||||
SEpSet* pEpset, int64_t nextWindowSkey, SArray* pVerList, bool useTriggerParam) {
|
||||
addNewTaskList(pStream);
|
||||
|
||||
void* pIter = NULL;
|
||||
|
@ -397,14 +433,14 @@ static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream
|
|||
continue;
|
||||
}
|
||||
|
||||
int code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVgroup, false, useTriggerParam);
|
||||
int code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, false, useTriggerParam);
|
||||
if(code != 0){
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pStream->conf.fillHistory) {
|
||||
code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVgroup, true, useTriggerParam);
|
||||
code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, true, useTriggerParam);
|
||||
if(code != 0){
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
return code;
|
||||
|
@ -425,7 +461,7 @@ static SStreamTask* buildAggTask(SStreamObj* pStream, SEpSet* pEpset, bool isFil
|
|||
uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
|
||||
SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
|
||||
|
||||
SStreamTask* pAggTask = tNewStreamTask(uid, TASK_LEVEL__AGG, isFillhistory,
|
||||
SStreamTask* pAggTask = tNewStreamTask(uid, TASK_LEVEL__AGG, pEpset, isFillhistory,
|
||||
useTriggerParam ? pStream->conf.triggerParam : 0,
|
||||
*pTaskList, pStream->conf.fillHistory);
|
||||
if (pAggTask == NULL) {
|
||||
|
@ -433,7 +469,6 @@ static SStreamTask* buildAggTask(SStreamObj* pStream, SEpSet* pEpset, bool isFil
|
|||
return NULL;
|
||||
}
|
||||
|
||||
epsetAssign(&pAggTask->info.mnodeEpset, pEpset);
|
||||
return pAggTask;
|
||||
}
|
||||
|
||||
|
@ -566,7 +601,8 @@ static void bindTwoLevel(SArray* tasks, int32_t begin, int32_t end) {
|
|||
mDebug("bindTwoLevel task list(%d-%d) to taskId:%s", begin, end - 1, (*(pDownTask))->id.idStr);
|
||||
}
|
||||
|
||||
static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, int64_t nextWindowSkey, SEpSet* pEpset) {
|
||||
static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, SEpSet* pEpset, int64_t skey,
|
||||
SArray* pVerList) {
|
||||
SSdb* pSdb = pMnode->pSdb;
|
||||
int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
|
||||
bool hasExtraSink = false;
|
||||
|
@ -600,7 +636,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
|
|||
if (plan == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
int32_t code = addSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, numOfPlanLevel == 1);
|
||||
int32_t code = addSourceTask(pMnode, plan, pStream, pEpset, skey, pVerList, numOfPlanLevel == 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue